Posted to commits@hive.apache.org by om...@apache.org on 2016/05/20 21:22:50 UTC

[12/27] hive git commit: HIVE-11417. Move the ReaderImpl and RowReaderImpl to the ORC module by making shims for the row-by-row reader. (omalley reviewed by prasanth_j)

http://git-wip-us.apache.org/repos/asf/hive/blob/ffb79509/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
index 2199b11..e46ca51 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
@@ -18,1218 +18,923 @@
 package org.apache.hadoop.hive.ql.io.orc;
 
 import java.io.IOException;
-import java.math.BigDecimal;
-import java.sql.Date;
-import java.sql.Timestamp;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.orc.BooleanColumnStatistics;
-import org.apache.orc.impl.BufferChunk;
-import org.apache.orc.ColumnStatistics;
-import org.apache.orc.impl.ColumnStatisticsImpl;
-import org.apache.orc.CompressionCodec;
-import org.apache.orc.DataReader;
-import org.apache.orc.DateColumnStatistics;
-import org.apache.orc.DecimalColumnStatistics;
-import org.apache.orc.DoubleColumnStatistics;
-import org.apache.orc.impl.DataReaderProperties;
-import org.apache.orc.impl.InStream;
-import org.apache.orc.IntegerColumnStatistics;
-import org.apache.orc.OrcConf;
-import org.apache.orc.impl.OrcIndex;
-import org.apache.orc.impl.PositionProvider;
-import org.apache.orc.impl.StreamName;
-import org.apache.orc.StringColumnStatistics;
-import org.apache.orc.StripeInformation;
-import org.apache.orc.TimestampColumnStatistics;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.common.io.DiskRange;
-import org.apache.hadoop.hive.common.io.DiskRangeList;
-import org.apache.hadoop.hive.common.io.DiskRangeList.CreateHelper;
-import org.apache.hadoop.hive.common.type.HiveDecimal;
-import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
-import org.apache.orc.BloomFilterIO;
-import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
-import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
-import org.apache.hadoop.hive.ql.io.sarg.SearchArgument.TruthValue;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.UnionColumnVector;
+import org.apache.hadoop.hive.serde2.io.ByteWritable;
 import org.apache.hadoop.hive.serde2.io.DateWritable;
+import org.apache.hadoop.hive.serde2.io.DoubleWritable;
+import org.apache.hadoop.hive.serde2.io.HiveCharWritable;
 import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
+import org.apache.hadoop.hive.serde2.io.HiveVarcharWritable;
+import org.apache.hadoop.hive.serde2.io.ShortWritable;
 import org.apache.hadoop.hive.serde2.io.TimestampWritable;
+import org.apache.hadoop.io.BooleanWritable;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
-import org.apache.orc.OrcProto;
+import org.apache.orc.TypeDescription;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 
-public class RecordReaderImpl implements RecordReader {
+public class RecordReaderImpl extends org.apache.orc.impl.RecordReaderImpl
+                              implements RecordReader {
   static final Logger LOG = LoggerFactory.getLogger(RecordReaderImpl.class);
-  private static final boolean isLogDebugEnabled = LOG.isDebugEnabled();
-  private static final Object UNKNOWN_VALUE = new Object();
-  private final Path path;
-  private final long firstRow;
-  private final List<StripeInformation> stripes =
-      new ArrayList<StripeInformation>();
-  private OrcProto.StripeFooter stripeFooter;
-  private final long totalRowCount;
-  private final CompressionCodec codec;
-  private final List<OrcProto.Type> types;
-  private final int bufferSize;
-  private final boolean[] included;
-  private final long rowIndexStride;
-  private long rowInStripe = 0;
-  private int currentStripe = -1;
-  private long rowBaseInStripe = 0;
-  private long rowCountInStripe = 0;
-  private final Map<StreamName, InStream> streams =
-      new HashMap<StreamName, InStream>();
-  DiskRangeList bufferChunks = null;
-  private final TreeReaderFactory.TreeReader reader;
-  private final OrcProto.RowIndex[] indexes;
-  private final OrcProto.BloomFilterIndex[] bloomFilterIndices;
-  private final SargApplier sargApp;
-  // an array about which row groups aren't skipped
-  private boolean[] includedRowGroups = null;
-  private final DataReader dataReader;
+  private final VectorizedRowBatch batch;
+  private int rowInBatch;
+  private long baseRow;
 
-  /**
-   * Given a list of column names, find the given column and return the index.
-   *
-   * @param columnNames the list of potential column names
-   * @param columnName  the column name to look for
-   * @param rootColumn  offset the result with the rootColumn
-   * @return the column number or -1 if the column wasn't found
-   */
-  static int findColumns(String[] columnNames,
-                         String columnName,
-                         int rootColumn) {
-    for(int i=0; i < columnNames.length; ++i) {
-      if (columnName.equals(columnNames[i])) {
-        return i + rootColumn;
-      }
-    }
-    return -1;
+  protected RecordReaderImpl(ReaderImpl fileReader,
+                             Reader.Options options) throws IOException {
+    super(fileReader, options);
+    batch = this.schema.createRowBatch();
+    rowInBatch = 0;
   }
 
   /**
-   * Find the mapping from predicate leaves to columns.
-   * @param sargLeaves the search argument that we need to map
-   * @param columnNames the names of the columns
-   * @param rootColumn the offset of the top level row, which offsets the
-   *                   result
-   * @return an array mapping the sarg leaves to concrete column numbers
+   * If the current batch is empty, get a new one.
+   * @return true if we have rows available.
+   * @throws IOException
    */
-  public static int[] mapSargColumnsToOrcInternalColIdx(List<PredicateLeaf> sargLeaves,
-                             String[] columnNames,
-                             int rootColumn) {
-    int[] result = new int[sargLeaves.size()];
-    Arrays.fill(result, -1);
-    for(int i=0; i < result.length; ++i) {
-      String colName = sargLeaves.get(i).getColumnName();
-      result[i] = findColumns(columnNames, colName, rootColumn);
+  boolean ensureBatch() throws IOException {
+    if (rowInBatch >= batch.size) {
+      baseRow = super.getRowNumber();
+      rowInBatch = 0;
+      return super.nextBatch(batch);
     }
-    return result;
+    return true;
   }
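
A minimal consumption sketch (not part of this diff; assumes a Path "path" and Configuration "conf" in scope): hasNext() refills the internal batch through ensureBatch(), and next() then hands out one row at a time.

    Reader reader = OrcFile.createReader(path, OrcFile.readerOptions(conf));
    RecordReader rows = reader.rows();
    Object row = null;
    while (rows.hasNext()) {   // hasNext() refills via ensureBatch()
      row = rows.next(row);    // next() picks one row out of the current batch
    }
    rows.close();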
 
-  protected RecordReaderImpl(ReaderImpl fileReader,
-                             Reader.Options options) throws IOException {
-    SchemaEvolution treeReaderSchema;
-    this.included = options.getInclude();
-    included[0] = true;
-    if (options.getSchema() == null) {
-      if (LOG.isInfoEnabled()) {
-        LOG.info("Schema on read not provided -- using file schema " +
-            fileReader.getSchema());
-      }
-      treeReaderSchema = new SchemaEvolution(fileReader.getSchema(), included);
-    } else {
+  @Override
+  public long getRowNumber() {
+    return baseRow + rowInBatch;
+  }
 
-      // Now that we are creating a record reader for a file, validate that the schema to read
-      // is compatible with the file schema.
-      //
-      treeReaderSchema = new SchemaEvolution(fileReader.getSchema(),
-          options.getSchema(),
-          included);
-    }
-    this.path = fileReader.path;
-    this.codec = fileReader.codec;
-    this.types = fileReader.types;
-    this.bufferSize = fileReader.bufferSize;
-    this.rowIndexStride = fileReader.rowIndexStride;
-    FileSystem fileSystem = fileReader.fileSystem;
-    SearchArgument sarg = options.getSearchArgument();
-    if (sarg != null && rowIndexStride != 0) {
-      sargApp = new SargApplier(
-          sarg, options.getColumnNames(), rowIndexStride, types, included.length);
-    } else {
-      sargApp = null;
-    }
-    long rows = 0;
-    long skippedRows = 0;
-    long offset = options.getOffset();
-    long maxOffset = options.getMaxOffset();
-    for(StripeInformation stripe: fileReader.getStripes()) {
-      long stripeStart = stripe.getOffset();
-      if (offset > stripeStart) {
-        skippedRows += stripe.getNumberOfRows();
-      } else if (stripeStart < maxOffset) {
-        this.stripes.add(stripe);
-        rows += stripe.getNumberOfRows();
-      }
-    }
+  @Override
+  public boolean hasNext() throws IOException {
+    return ensureBatch();
+  }
 
-    Boolean zeroCopy = options.getUseZeroCopy();
-    if (zeroCopy == null) {
-      zeroCopy = OrcConf.USE_ZEROCOPY.getBoolean(fileReader.conf);
-    }
-    if (options.getDataReader() == null) {
-      dataReader = RecordReaderUtils.createDefaultDataReader(
-          DataReaderProperties.builder()
-              .withBufferSize(bufferSize)
-              .withCompression(fileReader.compressionKind)
-              .withFileSystem(fileSystem)
-              .withPath(path)
-              .withTypeCount(types.size())
-              .withZeroCopy(zeroCopy)
-              .build());
+  @Override
+  public void seekToRow(long row) throws IOException {
+    if (row >= baseRow && row < baseRow + batch.size) {
+      rowInBatch = (int) (row - baseRow);
     } else {
-      dataReader = options.getDataReader();
+      super.seekToRow(row);
+      batch.size = 0;
+      ensureBatch();
     }
-    firstRow = skippedRows;
-    totalRowCount = rows;
-    Boolean skipCorrupt = options.getSkipCorruptRecords();
-    if (skipCorrupt == null) {
-      skipCorrupt = OrcConf.SKIP_CORRUPT_DATA.getBoolean(fileReader.conf);
-    }
-
-    reader = TreeReaderFactory.createTreeReader(treeReaderSchema.getReaderSchema(),
-        treeReaderSchema, included, skipCorrupt);
-    indexes = new OrcProto.RowIndex[types.size()];
-    bloomFilterIndices = new OrcProto.BloomFilterIndex[types.size()];
-    advanceToNextRow(reader, 0L, true);
   }
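
An illustration of the two seek paths above (row numbers invented): a target that falls inside the buffered batch only moves rowInBatch, while anything else delegates to the base reader and forces a refill.

    rows.seekToRow(100000);            // outside the batch: super.seekToRow() plus refill
    long here = rows.getRowNumber();   // equals baseRow + rowInBatch
    rows.seekToRow(here + 1);          // typically inside the batch: index bump only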
 
-  public static final class PositionProviderImpl implements PositionProvider {
-    private final OrcProto.RowIndexEntry entry;
-    private int index;
-
-    public PositionProviderImpl(OrcProto.RowIndexEntry entry) {
-      this(entry, 0);
+  @Override
+  public Object next(Object previous) throws IOException {
+    if (!ensureBatch()) {
+      return null;
     }
-
-    public PositionProviderImpl(OrcProto.RowIndexEntry entry, int startPos) {
-      this.entry = entry;
-      this.index = startPos;
+    if (schema.getCategory() == TypeDescription.Category.STRUCT) {
+      OrcStruct result;
+      List<TypeDescription> children = schema.getChildren();
+      int numberOfChildren = children.size();
+      if (previous == null || previous.getClass() != OrcStruct.class) {
+        result = new OrcStruct(numberOfChildren);
+        previous = result;
+      } else {
+        result = (OrcStruct) previous;
+        if (result.getNumFields() != numberOfChildren) {
+          result.setNumFields(numberOfChildren);
+        }
+      }
+      for(int i=0; i < numberOfChildren; ++i) {
+        result.setFieldValue(i, nextValue(batch.cols[i], rowInBatch,
+            children.get(i), result.getFieldValue(i)));
+      }
+    } else {
+      previous = nextValue(batch.cols[0], rowInBatch, schema, previous);
     }
+    rowInBatch += 1;
+    return previous;
+  }
 
-    @Override
-    public long getNext() {
-      return entry.getPositions(index++);
+  public boolean nextBatch(VectorizedRowBatch theirBatch) throws IOException {
+    // If the user hasn't been reading by row, use the fast path.
+    if (rowInBatch >= batch.size) {
+      return super.nextBatch(theirBatch);
     }
+    copyIntoBatch(theirBatch, batch, rowInBatch);
+    rowInBatch += theirBatch.size;
+    return theirBatch.size > 0;
   }
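
Batch-at-a-time callers keep the fast path, since rowInBatch >= batch.size holds whenever the row API is never used. A sketch, assuming the reader exposes its TypeDescription via getSchema() and that the returned reader is this implementation class:

    VectorizedRowBatch vrb = reader.getSchema().createRowBatch();
    RecordReaderImpl rows = (RecordReaderImpl) reader.rows();
    while (rows.nextBatch(vrb)) {      // delegates straight to the ORC module
      for (int r = 0; r < vrb.size; ++r) {
        // consume vrb.cols[c] values at row r
      }
    }
    rows.close();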
 
-  OrcProto.StripeFooter readStripeFooter(StripeInformation stripe) throws IOException {
-    return dataReader.readStripeFooter(stripe);
+  @Override
+  public void close() throws IOException {
+    super.close();
+    // free the memory for the column vectors
+    batch.cols = null;
   }
 
-  enum Location {
-    BEFORE, MIN, MIDDLE, MAX, AFTER
-  }
+  /* Routines for stubbing into Writables */
 
-  /**
-   * Given a point and min and max, determine if the point is before, at the
-   * min, in the middle, at the max, or after the range.
-   * @param point the point to test
-   * @param min the minimum point
-   * @param max the maximum point
-   * @param <T> the type of the comparison
-   * @return the location of the point
-   */
-  static <T> Location compareToRange(Comparable<T> point, T min, T max) {
-    int minCompare = point.compareTo(min);
-    if (minCompare < 0) {
-      return Location.BEFORE;
-    } else if (minCompare == 0) {
-      return Location.MIN;
+  static BooleanWritable nextBoolean(ColumnVector vector,
+                                     int row,
+                                     Object previous) {
+    if (vector.isRepeating) {
+      row = 0;
     }
-    int maxCompare = point.compareTo(max);
-    if (maxCompare > 0) {
-      return Location.AFTER;
-    } else if (maxCompare == 0) {
-      return Location.MAX;
+    if (vector.noNulls || !vector.isNull[row]) {
+      BooleanWritable result;
+      if (previous == null || previous.getClass() != BooleanWritable.class) {
+        result = new BooleanWritable();
+      } else {
+        result = (BooleanWritable) previous;
+      }
+      result.set(((LongColumnVector) vector).vector[row] != 0);
+      return result;
+    } else {
+      return null;
     }
-    return Location.MIDDLE;
   }
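
nextBoolean above and every converter that follows repeat the same three conventions, so one invented illustration up front: isRepeating collapses the column to row 0, noNulls short-circuits the null mask, and the previous Writable is reused only when its class matches exactly.

    LongColumnVector v = new LongColumnVector(1024);
    v.vector[0] = 1;       // with isRepeating set, row 0 stands for every row
    v.isRepeating = true;
    v.noNulls = false;     // isNull[] is only meaningful when noNulls == false
    v.isNull[0] = false;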
 
-  /**
-   * Get the maximum value out of an index entry.
-   * @param index
-   *          the index entry
-   * @return the object for the maximum value or null if there isn't one
-   */
-  static Object getMax(ColumnStatistics index) {
-    if (index instanceof IntegerColumnStatistics) {
-      return ((IntegerColumnStatistics) index).getMaximum();
-    } else if (index instanceof DoubleColumnStatistics) {
-      return ((DoubleColumnStatistics) index).getMaximum();
-    } else if (index instanceof StringColumnStatistics) {
-      return ((StringColumnStatistics) index).getMaximum();
-    } else if (index instanceof DateColumnStatistics) {
-      return ((DateColumnStatistics) index).getMaximum();
-    } else if (index instanceof DecimalColumnStatistics) {
-      return ((DecimalColumnStatistics) index).getMaximum();
-    } else if (index instanceof TimestampColumnStatistics) {
-      return ((TimestampColumnStatistics) index).getMaximum();
-    } else if (index instanceof BooleanColumnStatistics) {
-      if (((BooleanColumnStatistics)index).getTrueCount()!=0) {
-        return Boolean.TRUE;
+  static ByteWritable nextByte(ColumnVector vector,
+                               int row,
+                               Object previous) {
+    if (vector.isRepeating) {
+      row = 0;
+    }
+    if (vector.noNulls || !vector.isNull[row]) {
+      ByteWritable result;
+      if (previous == null || previous.getClass() != ByteWritable.class) {
+        result = new ByteWritable();
       } else {
-        return Boolean.FALSE;
+        result = (ByteWritable) previous;
       }
+      result.set((byte) ((LongColumnVector) vector).vector[row]);
+      return result;
     } else {
       return null;
     }
   }
 
-  /**
-   * Get the minimum value out of an index entry.
-   * @param index
-   *          the index entry
-   * @return the object for the minimum value or null if there isn't one
-   */
-  static Object getMin(ColumnStatistics index) {
-    if (index instanceof IntegerColumnStatistics) {
-      return ((IntegerColumnStatistics) index).getMinimum();
-    } else if (index instanceof DoubleColumnStatistics) {
-      return ((DoubleColumnStatistics) index).getMinimum();
-    } else if (index instanceof StringColumnStatistics) {
-      return ((StringColumnStatistics) index).getMinimum();
-    } else if (index instanceof DateColumnStatistics) {
-      return ((DateColumnStatistics) index).getMinimum();
-    } else if (index instanceof DecimalColumnStatistics) {
-      return ((DecimalColumnStatistics) index).getMinimum();
-    } else if (index instanceof TimestampColumnStatistics) {
-      return ((TimestampColumnStatistics) index).getMinimum();
-    } else if (index instanceof BooleanColumnStatistics) {
-      if (((BooleanColumnStatistics)index).getFalseCount()!=0) {
-        return Boolean.FALSE;
+  static ShortWritable nextShort(ColumnVector vector,
+                                 int row,
+                                 Object previous) {
+    if (vector.isRepeating) {
+      row = 0;
+    }
+    if (vector.noNulls || !vector.isNull[row]) {
+      ShortWritable result;
+      if (previous == null || previous.getClass() != ShortWritable.class) {
+        result = new ShortWritable();
       } else {
-        return Boolean.TRUE;
+        result = (ShortWritable) previous;
       }
+      result.set((short) ((LongColumnVector) vector).vector[row]);
+      return result;
     } else {
-      return UNKNOWN_VALUE; // null is not safe here
+      return null;
     }
   }
 
-  /**
-   * Evaluate a predicate with respect to the statistics from the column
-   * that is referenced in the predicate.
-   * @param statsProto the statistics for the column mentioned in the predicate
-   * @param predicate the leaf predicate we need to evaluate
-   * @param bloomFilter
-   * @return the set of truth values that may be returned for the given
-   *   predicate.
-   */
-  static TruthValue evaluatePredicateProto(OrcProto.ColumnStatistics statsProto,
-      PredicateLeaf predicate, OrcProto.BloomFilter bloomFilter) {
-    ColumnStatistics cs = ColumnStatisticsImpl.deserialize(statsProto);
-    Object minValue = getMin(cs);
-    Object maxValue = getMax(cs);
-    BloomFilterIO bf = null;
-    if (bloomFilter != null) {
-      bf = new BloomFilterIO(bloomFilter);
+  static IntWritable nextInt(ColumnVector vector,
+                             int row,
+                             Object previous) {
+    if (vector.isRepeating) {
+      row = 0;
     }
-    return evaluatePredicateRange(predicate, minValue, maxValue, cs.hasNull(), bf);
-  }
-
-  /**
-   * Evaluate a predicate with respect to the statistics from the column
-   * that is referenced in the predicate.
-   * @param stats the statistics for the column mentioned in the predicate
-   * @param predicate the leaf predicate we need to evaluate
-   * @return the set of truth values that may be returned for the given
-   *   predicate.
-   */
-  static TruthValue evaluatePredicate(ColumnStatistics stats,
-      PredicateLeaf predicate, BloomFilterIO bloomFilter) {
-    Object minValue = getMin(stats);
-    Object maxValue = getMax(stats);
-    return evaluatePredicateRange(predicate, minValue, maxValue, stats.hasNull(), bloomFilter);
-  }
-
-  static TruthValue evaluatePredicateRange(PredicateLeaf predicate, Object min,
-      Object max, boolean hasNull, BloomFilterIO bloomFilter) {
-    // if we didn't have any values, everything must have been null
-    if (min == null) {
-      if (predicate.getOperator() == PredicateLeaf.Operator.IS_NULL) {
-        return TruthValue.YES;
+    if (vector.noNulls || !vector.isNull[row]) {
+      IntWritable result;
+      if (previous == null || previous.getClass() != IntWritable.class) {
+        result = new IntWritable();
       } else {
-        return TruthValue.NULL;
+        result = (IntWritable) previous;
       }
-    } else if (min == UNKNOWN_VALUE) {
-      return TruthValue.YES_NO_NULL;
+      result.set((int) ((LongColumnVector) vector).vector[row]);
+      return result;
+    } else {
+      return null;
     }
+  }
 
-    TruthValue result;
-    Object baseObj = predicate.getLiteral();
-    try {
-      // Predicate object and stats objects are converted to the type of the predicate object.
-      Object minValue = getBaseObjectForComparison(predicate.getType(), min);
-      Object maxValue = getBaseObjectForComparison(predicate.getType(), max);
-      Object predObj = getBaseObjectForComparison(predicate.getType(), baseObj);
-
-      result = evaluatePredicateMinMax(predicate, predObj, minValue, maxValue, hasNull);
-      if (shouldEvaluateBloomFilter(predicate, result, bloomFilter)) {
-        result = evaluatePredicateBloomFilter(predicate, predObj, bloomFilter, hasNull);
-      }
-      // in case failed conversion, return the default YES_NO_NULL truth value
-    } catch (Exception e) {
-      if (LOG.isWarnEnabled()) {
-        final String statsType = min == null ?
-            (max == null ? "null" : max.getClass().getSimpleName()) :
-            min.getClass().getSimpleName();
-        final String predicateType = baseObj == null ? "null" : baseObj.getClass().getSimpleName();
-        final String reason = e.getClass().getSimpleName() + " when evaluating predicate." +
-            " Skipping ORC PPD." +
-            " Exception: " + e.getMessage() +
-            " StatsType: " + statsType +
-            " PredicateType: " + predicateType;
-        LOG.warn(reason);
-        LOG.debug(reason, e);
-      }
-      if (predicate.getOperator().equals(PredicateLeaf.Operator.NULL_SAFE_EQUALS) || !hasNull) {
-        result = TruthValue.YES_NO;
+  static LongWritable nextLong(ColumnVector vector,
+                               int row,
+                               Object previous) {
+    if (vector.isRepeating) {
+      row = 0;
+    }
+    if (vector.noNulls || !vector.isNull[row]) {
+      LongWritable result;
+      if (previous == null || previous.getClass() != LongWritable.class) {
+        result = new LongWritable();
       } else {
-        result = TruthValue.YES_NO_NULL;
+        result = (LongWritable) previous;
       }
+      result.set(((LongColumnVector) vector).vector[row]);
+      return result;
+    } else {
+      return null;
     }
-    return result;
   }
 
-  private static boolean shouldEvaluateBloomFilter(PredicateLeaf predicate,
-      TruthValue result, BloomFilterIO bloomFilter) {
-    // evaluate bloom filter only when
-    // 1) Bloom filter is available
-    // 2) Min/Max evaluation yield YES or MAYBE
-    // 3) Predicate is EQUALS or IN list
-    if (bloomFilter != null
-        && result != TruthValue.NO_NULL && result != TruthValue.NO
-        && (predicate.getOperator().equals(PredicateLeaf.Operator.EQUALS)
-            || predicate.getOperator().equals(PredicateLeaf.Operator.NULL_SAFE_EQUALS)
-            || predicate.getOperator().equals(PredicateLeaf.Operator.IN))) {
-      return true;
+  static FloatWritable nextFloat(ColumnVector vector,
+                                 int row,
+                                 Object previous) {
+    if (vector.isRepeating) {
+      row = 0;
     }
-    return false;
-  }
-
-  private static TruthValue evaluatePredicateMinMax(PredicateLeaf predicate, Object predObj,
-      Object minValue,
-      Object maxValue,
-      boolean hasNull) {
-    Location loc;
-
-    switch (predicate.getOperator()) {
-      case NULL_SAFE_EQUALS:
-        loc = compareToRange((Comparable) predObj, minValue, maxValue);
-        if (loc == Location.BEFORE || loc == Location.AFTER) {
-          return TruthValue.NO;
-        } else {
-          return TruthValue.YES_NO;
-        }
-      case EQUALS:
-        loc = compareToRange((Comparable) predObj, minValue, maxValue);
-        if (minValue.equals(maxValue) && loc == Location.MIN) {
-          return hasNull ? TruthValue.YES_NULL : TruthValue.YES;
-        } else if (loc == Location.BEFORE || loc == Location.AFTER) {
-          return hasNull ? TruthValue.NO_NULL : TruthValue.NO;
-        } else {
-          return hasNull ? TruthValue.YES_NO_NULL : TruthValue.YES_NO;
-        }
-      case LESS_THAN:
-        loc = compareToRange((Comparable) predObj, minValue, maxValue);
-        if (loc == Location.AFTER) {
-          return hasNull ? TruthValue.YES_NULL : TruthValue.YES;
-        } else if (loc == Location.BEFORE || loc == Location.MIN) {
-          return hasNull ? TruthValue.NO_NULL : TruthValue.NO;
-        } else {
-          return hasNull ? TruthValue.YES_NO_NULL : TruthValue.YES_NO;
-        }
-      case LESS_THAN_EQUALS:
-        loc = compareToRange((Comparable) predObj, minValue, maxValue);
-        if (loc == Location.AFTER || loc == Location.MAX) {
-          return hasNull ? TruthValue.YES_NULL : TruthValue.YES;
-        } else if (loc == Location.BEFORE) {
-          return hasNull ? TruthValue.NO_NULL : TruthValue.NO;
-        } else {
-          return hasNull ? TruthValue.YES_NO_NULL : TruthValue.YES_NO;
-        }
-      case IN:
-        if (minValue.equals(maxValue)) {
-          // for a single value, look through to see if that value is in the
-          // set
-          for (Object arg : predicate.getLiteralList()) {
-            predObj = getBaseObjectForComparison(predicate.getType(), arg);
-            loc = compareToRange((Comparable) predObj, minValue, maxValue);
-            if (loc == Location.MIN) {
-              return hasNull ? TruthValue.YES_NULL : TruthValue.YES;
-            }
-          }
-          return hasNull ? TruthValue.NO_NULL : TruthValue.NO;
-        } else {
-          // are all of the values outside of the range?
-          for (Object arg : predicate.getLiteralList()) {
-            predObj = getBaseObjectForComparison(predicate.getType(), arg);
-            loc = compareToRange((Comparable) predObj, minValue, maxValue);
-            if (loc == Location.MIN || loc == Location.MIDDLE ||
-                loc == Location.MAX) {
-              return hasNull ? TruthValue.YES_NO_NULL : TruthValue.YES_NO;
-            }
-          }
-          return hasNull ? TruthValue.NO_NULL : TruthValue.NO;
-        }
-      case BETWEEN:
-        List<Object> args = predicate.getLiteralList();
-        Object predObj1 = getBaseObjectForComparison(predicate.getType(), args.get(0));
-
-        loc = compareToRange((Comparable) predObj1, minValue, maxValue);
-        if (loc == Location.BEFORE || loc == Location.MIN) {
-          Object predObj2 = getBaseObjectForComparison(predicate.getType(), args.get(1));
-
-          Location loc2 = compareToRange((Comparable) predObj2, minValue, maxValue);
-          if (loc2 == Location.AFTER || loc2 == Location.MAX) {
-            return hasNull ? TruthValue.YES_NULL : TruthValue.YES;
-          } else if (loc2 == Location.BEFORE) {
-            return hasNull ? TruthValue.NO_NULL : TruthValue.NO;
-          } else {
-            return hasNull ? TruthValue.YES_NO_NULL : TruthValue.YES_NO;
-          }
-        } else if (loc == Location.AFTER) {
-          return hasNull ? TruthValue.NO_NULL : TruthValue.NO;
-        } else {
-          return hasNull ? TruthValue.YES_NO_NULL : TruthValue.YES_NO;
-        }
-      case IS_NULL:
-        // min = null condition above handles the all-nulls YES case
-        return hasNull ? TruthValue.YES_NO : TruthValue.NO;
-      default:
-        return hasNull ? TruthValue.YES_NO_NULL : TruthValue.YES_NO;
+    if (vector.noNulls || !vector.isNull[row]) {
+      FloatWritable result;
+      if (previous == null || previous.getClass() != FloatWritable.class) {
+        result = new FloatWritable();
+      } else {
+        result = (FloatWritable) previous;
+      }
+      result.set((float) ((DoubleColumnVector) vector).vector[row]);
+      return result;
+    } else {
+      return null;
     }
   }
 
-  private static TruthValue evaluatePredicateBloomFilter(PredicateLeaf predicate,
-      final Object predObj, BloomFilterIO bloomFilter, boolean hasNull) {
-    switch (predicate.getOperator()) {
-      case NULL_SAFE_EQUALS:
-        // null safe equals does not return *_NULL variant. So set hasNull to false
-        return checkInBloomFilter(bloomFilter, predObj, false);
-      case EQUALS:
-        return checkInBloomFilter(bloomFilter, predObj, hasNull);
-      case IN:
-        for (Object arg : predicate.getLiteralList()) {
-          // if at least one value in the IN list exists in the bloom filter, qualify the row group/stripe
-          Object predObjItem = getBaseObjectForComparison(predicate.getType(), arg);
-          TruthValue result = checkInBloomFilter(bloomFilter, predObjItem, hasNull);
-          if (result == TruthValue.YES_NO_NULL || result == TruthValue.YES_NO) {
-            return result;
-          }
-        }
-        return hasNull ? TruthValue.NO_NULL : TruthValue.NO;
-      default:
-        return hasNull ? TruthValue.YES_NO_NULL : TruthValue.YES_NO;
+  static DoubleWritable nextDouble(ColumnVector vector,
+                                   int row,
+                                   Object previous) {
+    if (vector.isRepeating) {
+      row = 0;
     }
-  }
-
-  private static TruthValue checkInBloomFilter(BloomFilterIO bf, Object predObj, boolean hasNull) {
-    TruthValue result = hasNull ? TruthValue.NO_NULL : TruthValue.NO;
-
-    if (predObj instanceof Long) {
-      if (bf.testLong(((Long) predObj).longValue())) {
-        result = TruthValue.YES_NO_NULL;
-      }
-    } else if (predObj instanceof Double) {
-      if (bf.testDouble(((Double) predObj).doubleValue())) {
-        result = TruthValue.YES_NO_NULL;
-      }
-    } else if (predObj instanceof String || predObj instanceof Text ||
-        predObj instanceof HiveDecimalWritable ||
-        predObj instanceof BigDecimal) {
-      if (bf.testString(predObj.toString())) {
-        result = TruthValue.YES_NO_NULL;
-      }
-    } else if (predObj instanceof Timestamp) {
-      if (bf.testLong(((Timestamp) predObj).getTime())) {
-        result = TruthValue.YES_NO_NULL;
-      }
-    } else if (predObj instanceof TimestampWritable) {
-      if (bf.testLong(((TimestampWritable) predObj).getTimestamp().getTime())) {
-        result = TruthValue.YES_NO_NULL;
-      }
-    } else if (predObj instanceof Date) {
-      if (bf.testLong(DateWritable.dateToDays((Date) predObj))) {
-        result = TruthValue.YES_NO_NULL;
+    if (vector.noNulls || !vector.isNull[row]) {
+      DoubleWritable result;
+      if (previous == null || previous.getClass() != DoubleWritable.class) {
+        result = new DoubleWritable();
+      } else {
+        result = (DoubleWritable) previous;
       }
+      result.set(((DoubleColumnVector) vector).vector[row]);
+      return result;
     } else {
-        // if the predicate object is null and if hasNull says there are no nulls then return NO
-        if (predObj == null && !hasNull) {
-          result = TruthValue.NO;
-        } else {
-          result = TruthValue.YES_NO_NULL;
-        }
-      }
-
-    if (result == TruthValue.YES_NO_NULL && !hasNull) {
-      result = TruthValue.YES_NO;
-    }
-
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Bloom filter evaluation: " + result.toString());
+      return null;
     }
-
-    return result;
   }
 
-  private static Object getBaseObjectForComparison(PredicateLeaf.Type type, Object obj) {
-    if (obj == null) {
-      return null;
+  static Text nextString(ColumnVector vector,
+                         int row,
+                         Object previous) {
+    if (vector.isRepeating) {
+      row = 0;
     }
-    switch (type) {
-      case BOOLEAN:
-        if (obj instanceof Boolean) {
-          return obj;
-        } else {
-          // will only be true if the string conversion yields "true", all other values are
-          // considered false
-          return Boolean.valueOf(obj.toString());
-        }
-      case DATE:
-        if (obj instanceof Date) {
-          return obj;
-        } else if (obj instanceof String) {
-          return Date.valueOf((String) obj);
-        } else if (obj instanceof Timestamp) {
-          return DateWritable.timeToDate(((Timestamp) obj).getTime() / 1000L);
-        }
-        // always string, but prevent the comparison to numbers (are they days/seconds/milliseconds?)
-        break;
-      case DECIMAL:
-        if (obj instanceof Boolean) {
-          return new HiveDecimalWritable(((Boolean) obj).booleanValue() ?
-              HiveDecimal.ONE : HiveDecimal.ZERO);
-        } else if (obj instanceof Integer) {
-          return new HiveDecimalWritable(((Integer) obj).intValue());
-        } else if (obj instanceof Long) {
-          return new HiveDecimalWritable(((Long) obj));
-        } else if (obj instanceof Float || obj instanceof Double ||
-            obj instanceof String) {
-          return new HiveDecimalWritable(obj.toString());
-        } else if (obj instanceof BigDecimal) {
-          return new HiveDecimalWritable(HiveDecimal.create((BigDecimal) obj));
-        } else if (obj instanceof HiveDecimal) {
-          return new HiveDecimalWritable((HiveDecimal) obj);
-        } else if (obj instanceof HiveDecimalWritable) {
-          return obj;
-        } else if (obj instanceof Timestamp) {
-          return new HiveDecimalWritable(
-              new Double(new TimestampWritable((Timestamp) obj).getDouble()).toString());
-        }
-        break;
-      case FLOAT:
-        if (obj instanceof Number) {
-          // widening conversion
-          return ((Number) obj).doubleValue();
-        } else if (obj instanceof HiveDecimal) {
-          return ((HiveDecimal) obj).doubleValue();
-        } else if (obj instanceof String) {
-          return Double.valueOf(obj.toString());
-        } else if (obj instanceof Timestamp) {
-          return new TimestampWritable((Timestamp)obj).getDouble();
-        } else if (obj instanceof HiveDecimal) {
-          return ((HiveDecimal) obj).doubleValue();
-        } else if (obj instanceof BigDecimal) {
-          return ((BigDecimal) obj).doubleValue();
-        }
-        break;
-      case LONG:
-        if (obj instanceof Number) {
-          // widening conversion
-          return ((Number) obj).longValue();
-        } else if (obj instanceof HiveDecimal) {
-          return ((HiveDecimal) obj).longValue();
-        } else if (obj instanceof String) {
-          return Long.valueOf(obj.toString());
-        }
-        break;
-      case STRING:
-        if (obj != null) {
-          return (obj.toString());
-        }
-        break;
-      case TIMESTAMP:
-        if (obj instanceof Timestamp) {
-          return obj;
-        } else if (obj instanceof Integer) {
-          return TimestampWritable.longToTimestamp(((Number) obj).longValue(), false);
-        } else if (obj instanceof Float) {
-          return TimestampWritable.doubleToTimestamp(((Float) obj).doubleValue());
-        } else if (obj instanceof Double) {
-          return TimestampWritable.doubleToTimestamp(((Double) obj).doubleValue());
-        } else if (obj instanceof HiveDecimal) {
-          return TimestampWritable.decimalToTimestamp((HiveDecimal) obj);
-        } else if (obj instanceof HiveDecimalWritable) {
-          return TimestampWritable.decimalToTimestamp(((HiveDecimalWritable) obj).getHiveDecimal());
-        } else if (obj instanceof Date) {
-          return new Timestamp(((Date) obj).getTime());
-        }
-        // float/double conversion to timestamp is interpreted as seconds whereas integer conversion
-        // to timestamp is interpreted as milliseconds by default. The integer to timestamp casting
-        // is also config driven. The filter operator changes its promotion based on config:
-        // "int.timestamp.conversion.in.seconds". Disable PPD for integer cases.
-        break;
-      default:
-        break;
+    if (vector.noNulls || !vector.isNull[row]) {
+      Text result;
+      if (previous == null || previous.getClass() != Text.class) {
+        result = new Text();
+      } else {
+        result = (Text) previous;
+      }
+      BytesColumnVector bytes = (BytesColumnVector) vector;
+      result.set(bytes.vector[row], bytes.start[row], bytes.length[row]);
+      return result;
+    } else {
+      return null;
     }
-
-    throw new IllegalArgumentException(String.format(
-        "ORC SARGS could not convert from %s to %s", obj == null ? "(null)" : obj.getClass()
-            .getSimpleName(), type));
   }
 
-  public static class SargApplier {
-    public final static boolean[] READ_ALL_RGS = null;
-    public final static boolean[] READ_NO_RGS = new boolean[0];
-
-    private final SearchArgument sarg;
-    private final List<PredicateLeaf> sargLeaves;
-    private final int[] filterColumns;
-    private final long rowIndexStride;
-    // same as the above array, but indices are set to true
-    private final boolean[] sargColumns;
-
-    public SargApplier(SearchArgument sarg, String[] columnNames, long rowIndexStride,
-        List<OrcProto.Type> types, int includedCount) {
-      this.sarg = sarg;
-      sargLeaves = sarg.getLeaves();
-      filterColumns = mapSargColumnsToOrcInternalColIdx(sargLeaves, columnNames, 0);
-      this.rowIndexStride = rowIndexStride;
-      // included will not be null, row options will fill the array with trues if null
-      sargColumns = new boolean[includedCount];
-      for (int i : filterColumns) {
-        // filter columns may have -1 as index which could be partition column in SARG.
-        if (i > 0) {
-          sargColumns[i] = true;
-        }
-      }
+  static HiveCharWritable nextChar(ColumnVector vector,
+                                   int row,
+                                   int size,
+                                   Object previous) {
+    if (vector.isRepeating) {
+      row = 0;
     }
-
-    /**
-     * Pick the row groups that we need to load from the current stripe.
-     *
-     * @return an array with a boolean for each row group or null if all of the
-     * row groups must be read.
-     * @throws IOException
-     */
-    public boolean[] pickRowGroups(StripeInformation stripe, OrcProto.RowIndex[] indexes,
-        OrcProto.BloomFilterIndex[] bloomFilterIndices, boolean returnNone) throws IOException {
-      long rowsInStripe = stripe.getNumberOfRows();
-      int groupsInStripe = (int) ((rowsInStripe + rowIndexStride - 1) / rowIndexStride);
-      boolean[] result = new boolean[groupsInStripe]; // TODO: avoid alloc?
-      TruthValue[] leafValues = new TruthValue[sargLeaves.size()];
-      boolean hasSelected = false, hasSkipped = false;
-      for (int rowGroup = 0; rowGroup < result.length; ++rowGroup) {
-        for (int pred = 0; pred < leafValues.length; ++pred) {
-          int columnIx = filterColumns[pred];
-          if (columnIx != -1) {
-            if (indexes[columnIx] == null) {
-              throw new AssertionError("Index is not populated for " + columnIx);
-            }
-            OrcProto.RowIndexEntry entry = indexes[columnIx].getEntry(rowGroup);
-            if (entry == null) {
-              throw new AssertionError("RG is not populated for " + columnIx + " rg " + rowGroup);
-            }
-            OrcProto.ColumnStatistics stats = entry.getStatistics();
-            OrcProto.BloomFilter bf = null;
-            if (bloomFilterIndices != null && bloomFilterIndices[filterColumns[pred]] != null) {
-              bf = bloomFilterIndices[filterColumns[pred]].getBloomFilter(rowGroup);
-            }
-            leafValues[pred] = evaluatePredicateProto(stats, sargLeaves.get(pred), bf);
-            if (LOG.isTraceEnabled()) {
-              LOG.trace("Stats = " + stats);
-              LOG.trace("Setting " + sargLeaves.get(pred) + " to " + leafValues[pred]);
-            }
-          } else {
-            // the column is a virtual column
-            leafValues[pred] = TruthValue.YES_NO_NULL;
-          }
-        }
-        result[rowGroup] = sarg.evaluate(leafValues).isNeeded();
-        hasSelected = hasSelected || result[rowGroup];
-        hasSkipped = hasSkipped || (!result[rowGroup]);
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Row group " + (rowIndexStride * rowGroup) + " to " +
-              (rowIndexStride * (rowGroup + 1) - 1) + " is " +
-              (result[rowGroup] ? "" : "not ") + "included.");
-        }
+    if (vector.noNulls || !vector.isNull[row]) {
+      HiveCharWritable result;
+      if (previous == null || previous.getClass() != HiveCharWritable.class) {
+        result = new HiveCharWritable();
+      } else {
+        result = (HiveCharWritable) previous;
       }
-
-      return hasSkipped ? ((hasSelected || !returnNone) ? result : READ_NO_RGS) : READ_ALL_RGS;
+      BytesColumnVector bytes = (BytesColumnVector) vector;
+      result.set(bytes.toString(row), size);
+      return result;
+    } else {
+      return null;
     }
   }
 
-  /**
-   * Pick the row groups that we need to load from the current stripe.
-   *
-   * @return an array with a boolean for each row group or null if all of the
-   * row groups must be read.
-   * @throws IOException
-   */
-  protected boolean[] pickRowGroups() throws IOException {
-    // if we don't have a sarg or indexes, we read everything
-    if (sargApp == null) {
+  static HiveVarcharWritable nextVarchar(ColumnVector vector,
+                                         int row,
+                                         int size,
+                                         Object previous) {
+    if (vector.isRepeating) {
+      row = 0;
+    }
+    if (vector.noNulls || !vector.isNull[row]) {
+      HiveVarcharWritable result;
+      if (previous == null || previous.getClass() != HiveVarcharWritable.class) {
+        result = new HiveVarcharWritable();
+      } else {
+        result = (HiveVarcharWritable) previous;
+      }
+      BytesColumnVector bytes = (BytesColumnVector) vector;
+      result.set(bytes.toString(row), size);
+      return result;
+    } else {
       return null;
     }
-    readRowIndex(currentStripe, included, sargApp.sargColumns);
-    return sargApp.pickRowGroups(stripes.get(currentStripe), indexes, bloomFilterIndices, false);
   }
 
-  private void clearStreams()  {
-    // explicit close of all streams to de-ref ByteBuffers
-    for (InStream is : streams.values()) {
-      is.close();
+  static BytesWritable nextBinary(ColumnVector vector,
+                                  int row,
+                                  Object previous) {
+    if (vector.isRepeating) {
+      row = 0;
     }
-    if (bufferChunks != null) {
-      if (dataReader.isTrackingDiskRanges()) {
-        for (DiskRangeList range = bufferChunks; range != null; range = range.next) {
-          if (!(range instanceof BufferChunk)) {
-            continue;
-          }
-          dataReader.releaseBuffer(((BufferChunk) range).getChunk());
-        }
+    if (vector.noNulls || !vector.isNull[row]) {
+      BytesWritable result;
+      if (previous == null || previous.getClass() != BytesWritable.class) {
+        result = new BytesWritable();
+      } else {
+        result = (BytesWritable) previous;
       }
+      BytesColumnVector bytes = (BytesColumnVector) vector;
+      result.set(bytes.vector[row], bytes.start[row], bytes.length[row]);
+      return result;
+    } else {
+      return null;
     }
-    bufferChunks = null;
-    streams.clear();
   }
 
-  /**
-   * Read the current stripe into memory.
-   *
-   * @throws IOException
-   */
-  private void readStripe() throws IOException {
-    StripeInformation stripe = beginReadStripe();
-    includedRowGroups = pickRowGroups();
-
-    // move forward to the first unskipped row
-    if (includedRowGroups != null) {
-      while (rowInStripe < rowCountInStripe &&
-          !includedRowGroups[(int) (rowInStripe / rowIndexStride)]) {
-        rowInStripe = Math.min(rowCountInStripe, rowInStripe + rowIndexStride);
-      }
+  static HiveDecimalWritable nextDecimal(ColumnVector vector,
+                                         int row,
+                                         Object previous) {
+    if (vector.isRepeating) {
+      row = 0;
     }
-
-    // if we haven't skipped the whole stripe, read the data
-    if (rowInStripe < rowCountInStripe) {
-      // if we aren't projecting columns or filtering rows, just read it all
-      if (included == null && includedRowGroups == null) {
-        readAllDataStreams(stripe);
+    if (vector.noNulls || !vector.isNull[row]) {
+      HiveDecimalWritable result;
+      if (previous == null || previous.getClass() != HiveDecimalWritable.class) {
+        result = new HiveDecimalWritable();
       } else {
-        readPartialDataStreams(stripe);
-      }
-      reader.startStripe(streams, stripeFooter);
-      // if we skipped the first row group, move the pointers forward
-      if (rowInStripe != 0) {
-        seekToRowEntry(reader, (int) (rowInStripe / rowIndexStride));
+        result = (HiveDecimalWritable) previous;
       }
+      result.set(((DecimalColumnVector) vector).vector[row]);
+      return result;
+    } else {
+      return null;
     }
   }
 
-  private StripeInformation beginReadStripe() throws IOException {
-    StripeInformation stripe = stripes.get(currentStripe);
-    stripeFooter = readStripeFooter(stripe);
-    clearStreams();
-    // setup the position in the stripe
-    rowCountInStripe = stripe.getNumberOfRows();
-    rowInStripe = 0;
-    rowBaseInStripe = 0;
-    for (int i = 0; i < currentStripe; ++i) {
-      rowBaseInStripe += stripes.get(i).getNumberOfRows();
+  static DateWritable nextDate(ColumnVector vector,
+                               int row,
+                               Object previous) {
+    if (vector.isRepeating) {
+      row = 0;
     }
-    // reset all of the indexes
-    for (int i = 0; i < indexes.length; ++i) {
-      indexes[i] = null;
+    if (vector.noNulls || !vector.isNull[row]) {
+      DateWritable result;
+      if (previous == null || previous.getClass() != DateWritable.class) {
+        result = new DateWritable();
+      } else {
+        result = (DateWritable) previous;
+      }
+      int date = (int) ((LongColumnVector) vector).vector[row];
+      result.set(date);
+      return result;
+    } else {
+      return null;
     }
-    return stripe;
   }
 
-  private void readAllDataStreams(StripeInformation stripe) throws IOException {
-    long start = stripe.getIndexLength();
-    long end = start + stripe.getDataLength();
-    // explicitly trigger 1 big read
-    DiskRangeList toRead = new DiskRangeList(start, end);
-    bufferChunks = dataReader.readFileData(toRead, stripe.getOffset(), false);
-    List<OrcProto.Stream> streamDescriptions = stripeFooter.getStreamsList();
-    createStreams(streamDescriptions, bufferChunks, null, codec, bufferSize, streams);
-  }
-
-  /**
-   * Plan the ranges of the file that we need to read given the list of
-   * columns and row groups.
-   *
-   * @param streamList        the list of streams available
-   * @param indexes           the indexes that have been loaded
-   * @param includedColumns   which columns are needed
-   * @param includedRowGroups which row groups are needed
-   * @param isCompressed      does the file have generic compression
-   * @param encodings         the encodings for each column
-   * @param types             the types of the columns
-   * @param compressionSize   the compression block size
-   * @return the list of disk ranges that will be loaded
-   */
-  static DiskRangeList planReadPartialDataStreams
-  (List<OrcProto.Stream> streamList,
-      OrcProto.RowIndex[] indexes,
-      boolean[] includedColumns,
-      boolean[] includedRowGroups,
-      boolean isCompressed,
-      List<OrcProto.ColumnEncoding> encodings,
-      List<OrcProto.Type> types,
-      int compressionSize,
-      boolean doMergeBuffers) {
-    long offset = 0;
-    // figure out which columns have a present stream
-    boolean[] hasNull = RecordReaderUtils.findPresentStreamsByColumn(streamList, types);
-    CreateHelper list = new CreateHelper();
-    for (OrcProto.Stream stream : streamList) {
-      long length = stream.getLength();
-      int column = stream.getColumn();
-      OrcProto.Stream.Kind streamKind = stream.getKind();
-      // since stream kind is optional, first check if it exists
-      if (stream.hasKind() &&
-          (StreamName.getArea(streamKind) == StreamName.Area.DATA) &&
-          (column < includedColumns.length && includedColumns[column])) {
-        // if we aren't filtering or it is a dictionary, load it.
-        if (includedRowGroups == null
-            || RecordReaderUtils.isDictionary(streamKind, encodings.get(column))) {
-          RecordReaderUtils.addEntireStreamToRanges(offset, length, list, doMergeBuffers);
-        } else {
-          RecordReaderUtils.addRgFilteredStreamToRanges(stream, includedRowGroups,
-              isCompressed, indexes[column], encodings.get(column), types.get(column),
-              compressionSize, hasNull[column], offset, length, list, doMergeBuffers);
-        }
+  static TimestampWritable nextTimestamp(ColumnVector vector,
+                                         int row,
+                                         Object previous) {
+    if (vector.isRepeating) {
+      row = 0;
+    }
+    if (vector.noNulls || !vector.isNull[row]) {
+      TimestampWritable result;
+      if (previous == null || previous.getClass() != TimestampWritable.class) {
+        result = new TimestampWritable();
+      } else {
+        result = (TimestampWritable) previous;
       }
-      offset += length;
+      TimestampColumnVector tcv = (TimestampColumnVector) vector;
+      result.setInternal(tcv.time[row], tcv.nanos[row]);
+      return result;
+    } else {
+      return null;
     }
-    return list.extract();
   }
 
-  void createStreams(List<OrcProto.Stream> streamDescriptions,
-      DiskRangeList ranges,
-      boolean[] includeColumn,
-      CompressionCodec codec,
-      int bufferSize,
-      Map<StreamName, InStream> streams) throws IOException {
-    long streamOffset = 0;
-    for (OrcProto.Stream streamDesc : streamDescriptions) {
-      int column = streamDesc.getColumn();
-      if ((includeColumn != null &&
-          (column < included.length && !includeColumn[column])) ||
-          streamDesc.hasKind() &&
-              (StreamName.getArea(streamDesc.getKind()) != StreamName.Area.DATA)) {
-        streamOffset += streamDesc.getLength();
-        continue;
+  static OrcStruct nextStruct(ColumnVector vector,
+                              int row,
+                              TypeDescription schema,
+                              Object previous) {
+    if (vector.isRepeating) {
+      row = 0;
+    }
+    if (vector.noNulls || !vector.isNull[row]) {
+      OrcStruct result;
+      List<TypeDescription> childrenTypes = schema.getChildren();
+      int numChildren = childrenTypes.size();
+      if (previous == null || previous.getClass() != OrcStruct.class) {
+        result = new OrcStruct(numChildren);
+      } else {
+        result = (OrcStruct) previous;
+        result.setNumFields(numChildren);
+      }
+      StructColumnVector struct = (StructColumnVector) vector;
+      for(int f=0; f < numChildren; ++f) {
+        result.setFieldValue(f, nextValue(struct.fields[f], row,
+            childrenTypes.get(f), result.getFieldValue(f)));
       }
-      List<DiskRange> buffers = RecordReaderUtils.getStreamBuffers(
-          ranges, streamOffset, streamDesc.getLength());
-      StreamName name = new StreamName(column, streamDesc.getKind());
-      streams.put(name, InStream.create(name.toString(), buffers,
-          streamDesc.getLength(), codec, bufferSize));
-      streamOffset += streamDesc.getLength();
+      return result;
+    } else {
+      return null;
     }
   }
 
-  private void readPartialDataStreams(StripeInformation stripe) throws IOException {
-    List<OrcProto.Stream> streamList = stripeFooter.getStreamsList();
-    DiskRangeList toRead = planReadPartialDataStreams(streamList,
-        indexes, included, includedRowGroups, codec != null,
-        stripeFooter.getColumnsList(), types, bufferSize, true);
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("chunks = " + RecordReaderUtils.stringifyDiskRanges(toRead));
+  static OrcUnion nextUnion(ColumnVector vector,
+                            int row,
+                            TypeDescription schema,
+                            Object previous) {
+    if (vector.isRepeating) {
+      row = 0;
     }
-    bufferChunks = dataReader.readFileData(toRead, stripe.getOffset(), false);
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("merge = " + RecordReaderUtils.stringifyDiskRanges(bufferChunks));
+    if (vector.noNulls || !vector.isNull[row]) {
+      OrcUnion result;
+      List<TypeDescription> childrenTypes = schema.getChildren();
+      if (previous == null || previous.getClass() != OrcUnion.class) {
+        result = new OrcUnion();
+      } else {
+        result = (OrcUnion) previous;
+      }
+      UnionColumnVector union = (UnionColumnVector) vector;
+      byte tag = (byte) union.tags[row];
+      result.set(tag, nextValue(union.fields[tag], row, childrenTypes.get(tag),
+          result.getObject()));
+      return result;
+    } else {
+      return null;
     }
-
-    createStreams(streamList, bufferChunks, included, codec, bufferSize, streams);
-  }
-
-  @Override
-  public boolean hasNext() throws IOException {
-    return rowInStripe < rowCountInStripe;
   }
 
-  /**
-   * Read the next stripe until we find a row that we don't skip.
-   *
-   * @throws IOException
-   */
-  private void advanceStripe() throws IOException {
-    rowInStripe = rowCountInStripe;
-    while (rowInStripe >= rowCountInStripe &&
-        currentStripe < stripes.size() - 1) {
-      currentStripe += 1;
-      readStripe();
+  static ArrayList<Object> nextList(ColumnVector vector,
+                                    int row,
+                                    TypeDescription schema,
+                                    Object previous) {
+    if (vector.isRepeating) {
+      row = 0;
     }
-  }
-
-  /**
-   * Skip over rows that we aren't selecting, so that the next row is
-   * one that we will read.
-   *
-   * @param nextRow the row we want to go to
-   * @throws IOException
-   */
-  private boolean advanceToNextRow(
-      TreeReaderFactory.TreeReader reader, long nextRow, boolean canAdvanceStripe)
-      throws IOException {
-    long nextRowInStripe = nextRow - rowBaseInStripe;
-    // check for row skipping
-    if (rowIndexStride != 0 &&
-        includedRowGroups != null &&
-        nextRowInStripe < rowCountInStripe) {
-      int rowGroup = (int) (nextRowInStripe / rowIndexStride);
-      if (!includedRowGroups[rowGroup]) {
-        while (rowGroup < includedRowGroups.length && !includedRowGroups[rowGroup]) {
-          rowGroup += 1;
-        }
-        if (rowGroup >= includedRowGroups.length) {
-          if (canAdvanceStripe) {
-            advanceStripe();
-          }
-          return canAdvanceStripe;
+    if (vector.noNulls || !vector.isNull[row]) {
+      ArrayList<Object> result;
+      if (previous == null || previous.getClass() != ArrayList.class) {
+        result = new ArrayList<>();
+      } else {
+        result = (ArrayList<Object>) previous;
+      }
+      ListColumnVector list = (ListColumnVector) vector;
+      int length = (int) list.lengths[row];
+      int offset = (int) list.offsets[row];
+      result.ensureCapacity(length);
+      int oldLength = result.size();
+      int idx = 0;
+      TypeDescription childType = schema.getChildren().get(0);
+      while (idx < length && idx < oldLength) {
+        result.set(idx, nextValue(list.child, offset + idx, childType,
+            result.get(idx)));
+        idx += 1;
+      }
+      if (length < oldLength) {
+        result.subList(length, result.size()).clear();
+      } else if (oldLength < length) {
+        while (idx < length) {
+          result.add(nextValue(list.child, offset + idx, childType, null));
+          idx += 1;
         }
-        nextRowInStripe = Math.min(rowCountInStripe, rowGroup * rowIndexStride);
       }
+      return result;
+    } else {
+      return null;
     }
-    if (nextRowInStripe >= rowCountInStripe) {
-      if (canAdvanceStripe) {
-        advanceStripe();
-      }
-      return canAdvanceStripe;
+  }
+
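nextList reuses the caller's previous ArrayList where it can: the shared
prefix is overwritten in place, a longer old list is trimmed, and a longer
new row is extended, so steady-state reads allocate nothing for the list
itself. The effect, sketched with hypothetical listCol and listType values
(package-private access assumed):

    ArrayList<Object> row = nextList(listCol, 0, listType, null); // fresh list
    row = nextList(listCol, 1, listType, row); // same instance, rewritten
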
+  static HashMap<Object,Object> nextMap(ColumnVector vector,
+                                        int row,
+                                        TypeDescription schema,
+                                        Object previous) {
+    if (vector.isRepeating) {
+      row = 0;
     }
-    if (nextRowInStripe != rowInStripe) {
-      if (rowIndexStride != 0) {
-        int rowGroup = (int) (nextRowInStripe / rowIndexStride);
-        seekToRowEntry(reader, rowGroup);
-        reader.skipRows(nextRowInStripe - rowGroup * rowIndexStride);
+    if (vector.noNulls || !vector.isNull[row]) {
+      MapColumnVector map = (MapColumnVector) vector;
+      int length = (int) map.lengths[row];
+      int offset = (int) map.offsets[row];
+      TypeDescription keyType = schema.getChildren().get(0);
+      TypeDescription valueType = schema.getChildren().get(1);
+      HashMap<Object,Object> result;
+      if (previous == null || previous.getClass() != HashMap.class) {
+        result = new HashMap<Object,Object>(length);
       } else {
-        reader.skipRows(nextRowInStripe - rowInStripe);
+        result = (HashMap<Object,Object>) previous;
+        // Reusing the key and value objects would require even more
+        // allocations, so take the easy and safe approach and rebuild
+        // the map's entries from scratch.
+        result.clear();
       }
-      rowInStripe = nextRowInStripe;
+      for(int e=0; e < length; ++e) {
+        result.put(nextValue(map.keys, e + offset, keyType, null),
+                   nextValue(map.values, e + offset, valueType, null));
+      }
+      return result;
+    } else {
+      return null;
     }
-    return true;
   }
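
Because HashMap keys participate in hashing, mutating a reused key object in
place would corrupt the table, which is why nextMap rebuilds each row's
entries instead of recycling them. One row might be materialized as in this
sketch, where batch is a hypothetical VectorizedRowBatch and package-private
access is assumed:

    TypeDescription mapType = TypeDescription.createMap(
        TypeDescription.createString(), TypeDescription.createInt());
    MapColumnVector mapCol = (MapColumnVector) batch.cols[0];
    HashMap<Object, Object> row0 = nextMap(mapCol, 0, mapType, null);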
 
-  @Override
-  public Object next(Object previous) throws IOException {
-    try {
-      final Object result = reader.next(previous);
-      // find the next row
-      rowInStripe += 1;
-      advanceToNextRow(reader, rowInStripe + rowBaseInStripe, true);
-      return result;
-    } catch (IOException e) {
-      // Rethrow exception with file name in log message
-      throw new IOException("Error reading file: " + path, e);
+  static Object nextValue(ColumnVector vector,
+                          int row,
+                          TypeDescription schema,
+                          Object previous) {
+    switch (schema.getCategory()) {
+      case BOOLEAN:
+        return nextBoolean(vector, row, previous);
+      case BYTE:
+        return nextByte(vector, row, previous);
+      case SHORT:
+        return nextShort(vector, row, previous);
+      case INT:
+        return nextInt(vector, row, previous);
+      case LONG:
+        return nextLong(vector, row, previous);
+      case FLOAT:
+        return nextFloat(vector, row, previous);
+      case DOUBLE:
+        return nextDouble(vector, row, previous);
+      case STRING:
+        return nextString(vector, row, previous);
+      case CHAR:
+        return nextChar(vector, row, schema.getMaxLength(), previous);
+      case VARCHAR:
+        return nextVarchar(vector, row, schema.getMaxLength(), previous);
+      case BINARY:
+        return nextBinary(vector, row, previous);
+      case DECIMAL:
+        return nextDecimal(vector, row, previous);
+      case DATE:
+        return nextDate(vector, row, previous);
+      case TIMESTAMP:
+        return nextTimestamp(vector, row, previous);
+      case STRUCT:
+        return nextStruct(vector, row, schema, previous);
+      case UNION:
+        return nextUnion(vector, row, schema, previous);
+      case LIST:
+        return nextList(vector, row, schema, previous);
+      case MAP:
+        return nextMap(vector, row, schema, previous);
+      default:
+        throw new IllegalArgumentException("Unknown type " + schema);
     }
   }
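
nextValue is the single dispatch point for materializing any ORC value from
a vector. A sketch of draining one int column, reusing the returned object
across rows (batch is hypothetical; the earlier helpers in this patch are
assumed to hand back Writable instances such as IntWritable):

    TypeDescription intType = TypeDescription.createInt();
    Object reused = null;
    for(int r = 0; r < batch.size; ++r) {
      reused = nextValue(batch.cols[0], r, intType, reused);
      // reused now holds the value for row r, or null for a null row
    }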
 
-  @Override
-  public boolean nextBatch(VectorizedRowBatch batch) throws IOException {
-    try {
-      if (rowInStripe >= rowCountInStripe) {
-        currentStripe += 1;
-        if (currentStripe >= stripes.size()) {
-          batch.size = 0;
-          return false;
+  /* Routines for copying between VectorizedRowBatches */
+
+  void copyLongColumn(ColumnVector destination,
+                      ColumnVector source,
+                      int sourceOffset,
+                      int length) {
+    LongColumnVector lsource = (LongColumnVector) source;
+    LongColumnVector ldest = (LongColumnVector) destination;
+    ldest.isRepeating = lsource.isRepeating;
+    ldest.noNulls = lsource.noNulls;
+    if (source.isRepeating) {
+      ldest.isNull[0] = lsource.isNull[0];
+      ldest.vector[0] = lsource.vector[0];
+    } else {
+      if (!lsource.noNulls) {
+        for(int r=0; r < length; ++r) {
+          ldest.isNull[r] = lsource.isNull[sourceOffset + r];
+          ldest.vector[r] = lsource.vector[sourceOffset + r];
+        }
+      } else {
+        for (int r = 0; r < length; ++r) {
+          ldest.vector[r] = lsource.vector[sourceOffset + r];
         }
-        readStripe();
       }
-
-      int batchSize = computeBatchSize(batch.getMaxSize());
-
-      rowInStripe += batchSize;
-      reader.setVectorColumnCount(batch.getDataColumnCount());
-      reader.nextBatch(batch, batchSize);
-
-      batch.size = (int) batchSize;
-      batch.selectedInUse = false;
-      advanceToNextRow(reader, rowInStripe + rowBaseInStripe, true);
-      return batch.size  != 0;
-    } catch (IOException e) {
-      // Rethrow exception with file name in log message
-      throw new IOException("Error reading file: " + path, e);
     }
   }
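
The copy routines below all follow copyLongColumn's shape: propagate
isRepeating and noNulls, copy only slot 0 for a repeating source, and copy
the isNull flags alongside the values only when nulls are possible. A small
sketch of calling it from inside the reader, with hypothetical vectors:

    LongColumnVector src = new LongColumnVector(1024);
    LongColumnVector dst = new LongColumnVector(1024);
    for(int r = 0; r < 1024; ++r) {
      src.vector[r] = r;
    }
    src.noNulls = false;
    src.isNull[7] = true;            // one null row in the source
    copyLongColumn(dst, src, 5, 5);  // dst rows 0..4 <- src rows 5..9
    // now dst.vector[2] == 7 and dst.isNull[2] == true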
 
-  private int computeBatchSize(long targetBatchSize) {
-    final int batchSize;
-    // In case of PPD, batch size should be aware of row group boundaries. If only a subset of row
-    // groups are selected then marker position is set to the end of range (subset of row groups
-    // within strip). Batch size computed out of marker position makes sure that batch size is
-    // aware of row group boundary and will not cause overflow when reading rows
-    // illustration of this case is here https://issues.apache.org/jira/browse/HIVE-6287
-    if (rowIndexStride != 0 && includedRowGroups != null && rowInStripe < rowCountInStripe) {
-      int startRowGroup = (int) (rowInStripe / rowIndexStride);
-      if (!includedRowGroups[startRowGroup]) {
-        while (startRowGroup < includedRowGroups.length && !includedRowGroups[startRowGroup]) {
-          startRowGroup += 1;
+  void copyDoubleColumn(ColumnVector destination,
+                        ColumnVector source,
+                        int sourceOffset,
+                        int length) {
+    DoubleColumnVector castedSource = (DoubleColumnVector) source;
+    DoubleColumnVector castedDestination = (DoubleColumnVector) destination;
+    castedDestination.isRepeating = castedSource.isRepeating;
+    castedDestination.noNulls = castedSource.noNulls;
+    if (source.isRepeating) {
+      castedDestination.isNull[0] = castedSource.isNull[0];
+      castedDestination.vector[0] = castedSource.vector[0];
+    } else {
+      if (!castedSource.noNulls) {
+        for(int r=0; r < length; ++r) {
+          castedDestination.isNull[r] = castedSource.isNull[sourceOffset + r];
+        }
       }
-
-      int endRowGroup = startRowGroup;
-      while (endRowGroup < includedRowGroups.length && includedRowGroups[endRowGroup]) {
-        endRowGroup += 1;
-      }
-
-      final long markerPosition =
-          (endRowGroup * rowIndexStride) < rowCountInStripe ? (endRowGroup * rowIndexStride)
-              : rowCountInStripe;
-      batchSize = (int) Math.min(targetBatchSize, (markerPosition - rowInStripe));
-
-      if (isLogDebugEnabled && batchSize < targetBatchSize) {
-        LOG.debug("markerPosition: " + markerPosition + " batchSize: " + batchSize);
+      for(int r=0; r < length; ++r) {
+        castedDestination.vector[r] = castedSource.vector[sourceOffset + r];
       }
-    } else {
-      batchSize = (int) Math.min(targetBatchSize, (rowCountInStripe - rowInStripe));
     }
-    return batchSize;
-  }
-
-  @Override
-  public void close() throws IOException {
-    clearStreams();
-    dataReader.close();
-  }
-
-  @Override
-  public long getRowNumber() {
-    return rowInStripe + rowBaseInStripe + firstRow;
-  }
-
-  /**
-   * Return the fraction of rows that have been read from the selected.
-   * section of the file
-   *
-   * @return fraction between 0.0 and 1.0 of rows consumed
-   */
-  @Override
-  public float getProgress() {
-    return ((float) rowBaseInStripe + rowInStripe) / totalRowCount;
   }
 
-  private int findStripe(long rowNumber) {
-    for (int i = 0; i < stripes.size(); i++) {
-      StripeInformation stripe = stripes.get(i);
-      if (stripe.getNumberOfRows() > rowNumber) {
-        return i;
+  void copyTimestampColumn(ColumnVector destination,
+                           ColumnVector source,
+                           int sourceOffset,
+                           int length) {
+    TimestampColumnVector castedSource = (TimestampColumnVector) source;
+    TimestampColumnVector castedDestination = (TimestampColumnVector) destination;
+    castedDestination.isRepeating = castedSource.isRepeating;
+    castedDestination.noNulls = castedSource.noNulls;
+    if (source.isRepeating) {
+      castedDestination.isNull[0] = castedSource.isNull[0];
+      castedDestination.time[0] = castedSource.time[0];
+      castedDestination.nanos[0] = castedSource.nanos[0];
+    } else {
+      if (!castedSource.noNulls) {
+        for(int r=0; r < length; ++r) {
+          castedDestination.isNull[r] = castedSource.isNull[sourceOffset + r];
+          castedDestination.time[r] = castedSource.time[sourceOffset + r];
+          castedDestination.nanos[r] = castedSource.nanos[sourceOffset + r];
+        }
+      } else {
+        for (int r = 0; r < length; ++r) {
+          castedDestination.time[r] = castedSource.time[sourceOffset + r];
+          castedDestination.nanos[r] = castedSource.nanos[sourceOffset + r];
+        }
       }
-      rowNumber -= stripe.getNumberOfRows();
     }
-    throw new IllegalArgumentException("Seek after the end of reader range");
   }
 
-  OrcIndex readRowIndex(
-      int stripeIndex, boolean[] included, boolean[] sargColumns) throws IOException {
-    return readRowIndex(stripeIndex, included, null, null, sargColumns);
+  void copyDecimalColumn(ColumnVector destination,
+                         ColumnVector source,
+                         int sourceOffset,
+                         int length) {
+    DecimalColumnVector castedSource = (DecimalColumnVector) source;
+    DecimalColumnVector castedDestination = (DecimalColumnVector) destination;
+    castedDestination.isRepeating = castedSource.isRepeating;
+    castedDestination.noNulls = castedSource.noNulls;
+    if (source.isRepeating) {
+      castedDestination.isNull[0] = castedSource.isNull[0];
+      if (!castedSource.isNull[0]) {
+        castedDestination.set(0, castedSource.vector[0]);
+      }
+    } else {
+      if (!castedSource.noNulls) {
+        for(int r=0; r < length; ++r) {
+          castedDestination.isNull[r] = castedSource.isNull[sourceOffset + r];
+          if (!castedDestination.isNull[r]) {
+            castedDestination.set(r, castedSource.vector[sourceOffset + r]);
+          }
+        }
+      } else {
+        for (int r = 0; r < length; ++r) {
+          castedDestination.set(r, castedSource.vector[sourceOffset + r]);
+        }
+      }
+    }
   }
 
-  OrcIndex readRowIndex(int stripeIndex, boolean[] included, OrcProto.RowIndex[] indexes,
-      OrcProto.BloomFilterIndex[] bloomFilterIndex, boolean[] sargColumns) throws IOException {
-    StripeInformation stripe = stripes.get(stripeIndex);
-    OrcProto.StripeFooter stripeFooter = null;
-    // if this is the current stripe, use the cached objects.
-    if (stripeIndex == currentStripe) {
-      stripeFooter = this.stripeFooter;
-      indexes = indexes == null ? this.indexes : indexes;
-      bloomFilterIndex = bloomFilterIndex == null ? this.bloomFilterIndices : bloomFilterIndex;
-      sargColumns = sargColumns == null ?
-          (sargApp == null ? null : sargApp.sargColumns) : sargColumns;
+  void copyBytesColumn(ColumnVector destination,
+                       ColumnVector source,
+                       int sourceOffset,
+                       int length) {
+    BytesColumnVector castedSource = (BytesColumnVector) source;
+    BytesColumnVector castedDestination = (BytesColumnVector) destination;
+    castedDestination.isRepeating = castedSource.isRepeating;
+    castedDestination.noNulls = castedSource.noNulls;
+    if (source.isRepeating) {
+      castedDestination.isNull[0] = castedSource.isNull[0];
+      if (!castedSource.isNull[0]) {
+        castedDestination.setVal(0, castedSource.vector[0],
+            castedSource.start[0], castedSource.length[0]);
+      }
+    } else {
+      if (!castedSource.noNulls) {
+        for(int r=0; r < length; ++r) {
+          castedDestination.isNull[r] = castedSource.isNull[sourceOffset + r];
+          if (!castedDestination.isNull[r]) {
+            castedDestination.setVal(r, castedSource.vector[sourceOffset + r],
+                castedSource.start[sourceOffset + r],
+                castedSource.length[sourceOffset + r]);
+          }
+        }
+      } else {
+        for (int r = 0; r < length; ++r) {
+          castedDestination.setVal(r, castedSource.vector[sourceOffset + r],
+              castedSource.start[sourceOffset + r],
+              castedSource.length[sourceOffset + r]);
+        }
+      }
     }
-    return dataReader.readRowIndex(stripe, stripeFooter, included, indexes,
-        sargColumns, bloomFilterIndex);
   }
 
-  private void seekToRowEntry(TreeReaderFactory.TreeReader reader, int rowEntry)
-      throws IOException {
-    PositionProvider[] index = new PositionProvider[indexes.length];
-    for (int i = 0; i < indexes.length; ++i) {
-      if (indexes[i] != null) {
-        index[i] = new PositionProviderImpl(indexes[i].getEntry(rowEntry));
+  void copyStructColumn(ColumnVector destination,
+                        ColumnVector source,
+                        int sourceOffset,
+                        int length) {
+    StructColumnVector castedSource = (StructColumnVector) source;
+    StructColumnVector castedDestination = (StructColumnVector) destination;
+    castedDestination.isRepeating = castedSource.isRepeating;
+    castedDestination.noNulls = castedSource.noNulls;
+    if (source.isRepeating) {
+      castedDestination.isNull[0] = castedSource.isNull[0];
+      for(int c=0; c < castedSource.fields.length; ++c) {
+        copyColumn(castedDestination.fields[c], castedSource.fields[c], 0, 1);
+      }
+    } else {
+      if (!castedSource.noNulls) {
+        for (int r = 0; r < length; ++r) {
+          castedDestination.isNull[r] = castedSource.isNull[sourceOffset + r];
+        }
+      }
+      // copy the children regardless of nulls so that the field vectors
+      // stay consistent with the parent
+      for (int c = 0; c < castedSource.fields.length; ++c) {
+        copyColumn(castedDestination.fields[c], castedSource.fields[c],
+            sourceOffset, length);
       }
     }
-    reader.seek(index);
   }
 
-  @Override
-  public void seekToRow(long rowNumber) throws IOException {
-    if (rowNumber < 0) {
-      throw new IllegalArgumentException("Seek to a negative row number " +
-          rowNumber);
-    } else if (rowNumber < firstRow) {
-      throw new IllegalArgumentException("Seek before reader range " +
-          rowNumber);
+  void copyUnionColumn(ColumnVector destination,
+                       ColumnVector source,
+                       int sourceOffset,
+                       int length) {
+    UnionColumnVector castedSource = (UnionColumnVector) source;
+    UnionColumnVector castedDestination = (UnionColumnVector) destination;
+    castedDestination.isRepeating = castedSource.isRepeating;
+    castedDestination.noNulls = castedSource.noNulls;
+    if (source.isRepeating) {
+      castedDestination.isNull[0] = castedSource.isNull[0];
+      int tag = castedSource.tags[0];
+      castedDestination.tags[0] = tag;
+      if (!castedDestination.isNull[0]) {
+        copyColumn(castedDestination.fields[tag], castedSource.fields[tag], 0,
+            1);
+      }
+    } else {
+      if (!castedSource.noNulls) {
+        for (int r = 0; r < length; ++r) {
+          castedDestination.isNull[r] = castedSource.isNull[sourceOffset + r];
+          castedDestination.tags[r] = castedSource.tags[sourceOffset + r];
+        }
+      } else {
+        for(int r=0; r < length; ++r) {
+          castedDestination.tags[r] = castedSource.tags[sourceOffset + r];
+        }
+      }
+      for(int c=0; c < castedSource.fields.length; ++c) {
+        copyColumn(castedDestination.fields[c], castedSource.fields[c],
+            sourceOffset, length);
+      }
     }
-    // convert to our internal form (rows from the beginning of slice)
-    rowNumber -= firstRow;
+  }
 
-    // move to the right stripe
-    int rightStripe = findStripe(rowNumber);
-    if (rightStripe != currentStripe) {
-      currentStripe = rightStripe;
-      readStripe();
+  void copyListColumn(ColumnVector destination,
+                      ColumnVector source,
+                      int sourceOffset,
+                      int length) {
+    ListColumnVector castedSource = (ListColumnVector) source;
+    ListColumnVector castedDestination = (ListColumnVector) destination;
+    castedDestination.isRepeating = castedSource.isRepeating;
+    castedDestination.noNulls = castedSource.noNulls;
+    if (source.isRepeating) {
+      castedDestination.isNull[0] = castedSource.isNull[0];
+      castedDestination.offsets[0] = 0;
+      castedDestination.lengths[0] = castedSource.lengths[0];
+      copyColumn(castedDestination.child, castedSource.child,
+          (int) castedSource.offsets[0], (int) castedSource.lengths[0]);
+    } else {
+      if (!castedSource.noNulls) {
+        for (int r = 0; r < length; ++r) {
+          castedDestination.isNull[r] = castedSource.isNull[sourceOffset + r];
+        }
+      }
+      int minOffset = Integer.MAX_VALUE;
+      int maxOffset = Integer.MIN_VALUE;
+      for(int r=0; r < length; ++r) {
+        int childOffset = (int) castedSource.offsets[r + sourceOffset];
+        int childLength = (int) castedSource.lengths[r + sourceOffset];
+        castedDestination.lengths[r] = childLength;
+        minOffset = Math.min(minOffset, childOffset);
+        maxOffset = Math.max(maxOffset, childOffset + childLength);
+      }
+      if (minOffset <= maxOffset) {
+        // the child values are copied to the front of the destination's
+        // child vector, so rebase the offsets by minOffset
+        for(int r=0; r < length; ++r) {
+          castedDestination.offsets[r] =
+              castedSource.offsets[r + sourceOffset] - minOffset;
+        }
+        // maxOffset is an exclusive bound, so the count needs no +1
+        castedDestination.childCount = maxOffset - minOffset;
+        copyColumn(castedDestination.child, castedSource.child,
+            minOffset, castedDestination.childCount);
+      } else {
+        castedDestination.childCount = 0;
+      }
+    }
+  }
+
+  void copyMapColumn(ColumnVector destination,
+                     ColumnVector source,
+                     int sourceOffset,
+                     int length) {
+    MapColumnVector castedSource = (MapColumnVector) source;
+    MapColumnVector castedDestination = (MapColumnVector) destination;
+    castedDestination.isRepeating = castedSource.isRepeating;
+    castedDestination.noNulls = castedSource.noNulls;
+    if (source.isRepeating) {
+      castedDestination.isNull[0] = castedSource.isNull[0];
+      castedDestination.offsets[0] = 0;
+      castedDestination.lengths[0] = castedSource.lengths[0];
+      copyColumn(castedDestination.keys, castedSource.keys,
+          (int) castedSource.offsets[0], (int) castedSource.lengths[0]);
+      copyColumn(castedDestination.values, castedSource.values,
+          (int) castedSource.offsets[0], (int) castedSource.lengths[0]);
+    } else {
+      if (!castedSource.noNulls) {
+        for (int r = 0; r < length; ++r) {
+          castedDestination.isNull[r] = castedSource.isNull[sourceOffset + r];
+        }
+      }
+      int minOffset = Integer.MAX_VALUE;
+      int maxOffset = Integer.MIN_VALUE;
+      for(int r=0; r < length; ++r) {
+        int childOffset = (int) castedSource.offsets[r + sourceOffset];
+        int childLength = (int) castedSource.lengths[r + sourceOffset];
+        castedDestination.lengths[r] = childLength;
+        minOffset = Math.min(minOffset, childOffset);
+        maxOffset = Math.max(maxOffset, childOffset + childLength);
+      }
+      if (minOffset <= maxOffset) {
+        // the keys and values are copied to the front of the destination's
+        // child vectors, so rebase the offsets by minOffset
+        for(int r=0; r < length; ++r) {
+          castedDestination.offsets[r] =
+              castedSource.offsets[r + sourceOffset] - minOffset;
+        }
+        // maxOffset is an exclusive bound, so the count needs no +1
+        castedDestination.childCount = maxOffset - minOffset;
+        copyColumn(castedDestination.keys, castedSource.keys,
+            minOffset, castedDestination.childCount);
+        copyColumn(castedDestination.values, castedSource.values,
+            minOffset, castedDestination.childCount);
+      } else {
+        castedDestination.childCount = 0;
+      }
     }
-    readRowIndex(currentStripe, included, sargApp == null ? null : sargApp.sargColumns);
-
-    // if we aren't to the right row yet, advance in the stripe.
-    advanceToNextRow(reader, rowNumber, true);
   }
 
-  private static final String TRANSLATED_SARG_SEPARATOR = "_";
-  public static String encodeTranslatedSargColumn(int rootColumn, Integer indexInSourceTable) {
-    return rootColumn + TRANSLATED_SARG_SEPARATOR
-        + ((indexInSourceTable == null) ? -1 : indexInSourceTable);
+  void copyColumn(ColumnVector destination,
+                  ColumnVector source,
+                  int sourceOffset,
+                  int length) {
+    if (source.getClass() == LongColumnVector.class) {
+      copyLongColumn(destination, source, sourceOffset, length);
+    } else if (source.getClass() == DoubleColumnVector.class) {
+      copyDoubleColumn(destination, source, sourceOffset, length);
+    } else if (source.getClass() == BytesColumnVector.class) {
+      copyBytesColumn(destination, source, sourceOffset, length);
+    } else if (source.getClass() == TimestampColumnVector.class) {
+      copyTimestampColumn(destination, source, sourceOffset, length);
+    } else if (source.getClass() == DecimalColumnVector.class) {
+      copyDecimalColumn(destination, source, sourceOffset, length);
+    } else if (source.getClass() == StructColumnVector.class) {
+      copyStructColumn(destination, source, sourceOffset, length);
+    } else if (source.getClass() == UnionColumnVector.class) {
+      copyUnionColumn(destination, source, sourceOffset, length);
+    } else if (source.getClass() == ListColumnVector.class) {
+      copyListColumn(destination, source, sourceOffset, length);
+    } else if (source.getClass() == MapColumnVector.class) {
+      copyMapColumn(destination, source, sourceOffset, length);
+    } else {
+      throw new IllegalArgumentException("Unknown column vector " +
+          source.getClass().getName());
+    }
   }
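
copyColumn dispatches on the concrete ColumnVector class rather than on the
ORC schema, so logical types that share a physical representation take the
same path; DATE columns, for instance, are long-backed and route through
copyLongColumn just like BIGINT. A sketch with hypothetical vectors:

    copyColumn(dstDates, srcDates, 0, n);  // LongColumnVector -> copyLongColumn
    copyColumn(dstDecs, srcDecs, 0, n);    // DecimalColumnVector -> copyDecimalColumn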
 
-  public static int[] mapTranslatedSargColumns(
-      List<OrcProto.Type> types, List<PredicateLeaf> sargLeaves) {
-    int[] result = new int[sargLeaves.size()];
-    OrcProto.Type lastRoot = null; // Root will be the same for everyone as of now.
-    String lastRootStr = null;
-    for (int i = 0; i < result.length; ++i) {
-      String[] rootAndIndex = sargLeaves.get(i).getColumnName().split(TRANSLATED_SARG_SEPARATOR);
-      assert rootAndIndex.length == 2;
-      String rootStr = rootAndIndex[0], indexStr = rootAndIndex[1];
-      int index = Integer.parseInt(indexStr);
-      // First, check if the column even maps to anything.
-      if (index == -1) {
-        result[i] = -1;
-        continue;
-      }
-      assert index >= 0;
-      // Then, find the root type if needed.
-      if (!rootStr.equals(lastRootStr)) {
-        lastRoot = types.get(Integer.parseInt(rootStr));
-        lastRootStr = rootStr;
-      }
-      // Subtypes of the root types correspond, in order, to the columns in the table schema
-      // (disregarding schema evolution that doesn't presently work). Get the index for the
-      // corresponding subtype.
-      result[i] = lastRoot.getSubtypes(index);
-    }
-    return result;
+  /**
+   * Copy part of a batch into the destination batch. The number of rows
+   * copied is recorded in destination.size.
+   * @param destination the batch to copy into
+   * @param source the batch to copy from
+   * @param sourceStart the row number to start from in the source
+   */
+  void copyIntoBatch(VectorizedRowBatch destination,
+                     VectorizedRowBatch source,
+                     int sourceStart) {
+    int rows = Math.min(source.size - sourceStart, destination.getMaxSize());
+    for(int c=0; c < source.cols.length; ++c) {
+      destination.cols[c].reset();
+      copyColumn(destination.cols[c], source.cols[c], sourceStart, rows);
+    }
+    destination.size = rows;
   }
 }
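
copyIntoBatch caps the copy at the destination's maximum size and records
the number of rows copied in destination.size, so a caller can drain an
oversized source batch in slices. A sketch of that calling pattern, where
bigBatch, outBatch, and process are hypothetical:

    int consumed = 0;
    while (consumed < bigBatch.size) {
      copyIntoBatch(outBatch, bigBatch, consumed);
      process(outBatch);              // hypothetical downstream consumer
      consumed += outBatch.size;
    }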