Posted to commits@hive.apache.org by se...@apache.org on 2014/09/16 19:50:04 UTC

svn commit: r1625344 [4/4] - in /hive/branches/llap: ./ common/src/java/org/apache/hadoop/hive/conf/ data/conf/ data/conf/tez/ itests/qtest/ llap-client/ llap-client/src/ llap-client/src/java/ llap-client/src/java/org/ llap-client/src/java/org/apache/ ...

Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReader.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReader.java?rev=1625344&r1=1625343&r2=1625344&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReader.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReader.java Tue Sep 16 17:50:02 2014
@@ -19,6 +19,7 @@ package org.apache.hadoop.hive.ql.io.orc
 
 import java.io.IOException;
 
+import org.apache.hadoop.hive.llap.chunk.ChunkWriter;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 
 /**
@@ -75,4 +76,19 @@ public interface RecordReader {
    * Seek to a particular row number.
    */
   void seekToRow(long rowCount) throws IOException;
+
+  /**
+   * Prepares a context for reading the file column by column, stripe by stripe.
+   * TODO: change to interface rather than ctx obj?
+   * @return An opaque context object to pass to readNextColumnStripe.
+   */
+  Object prepareColumnRead();
+
+  /**
+   * Reads the next part of one column x stripe "cell" into the writer.
+   * @param ctxObj Context object obtained from prepareColumnRead.
+   * @param writer Chunk writer that receives the values.
+   * @return Whether more data remains in the current column and stripe.
+   */
+  boolean readNextColumnStripe(Object ctxObj, ChunkWriter writer)
+      throws IOException;
 }
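
For reference, the call pattern these two methods imply, given the implementation
in RecordReaderImpl below (a minimal sketch; the driver class is hypothetical, and
how writer space is replenished between calls is an assumption):

import java.io.IOException;
import org.apache.hadoop.hive.llap.chunk.ChunkWriter;
import org.apache.hadoop.hive.ql.io.orc.RecordReader;

class ChunkedReadDriver {
  // Drains one column x stripe "cell". A "true" return means the writer filled
  // up before the cell ended; the caller is assumed to free or swap chunk space
  // before calling again. A "false" return means the context has moved on to
  // the next column (or, after the last column, the next stripe).
  static void drainCell(RecordReader rows, Object ctx, ChunkWriter writer)
      throws IOException {
    while (rows.readNextColumnStripe(ctx, writer)) {
      // assumed: make room in the writer here, otherwise this loops forever
    }
  }
  // Usage: Object ctx = rows.prepareColumnRead(); then drain cell after cell,
  // column-major within each stripe.
}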

Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java?rev=1625344&r1=1625343&r2=1625344&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java Tue Sep 16 17:50:02 2014
@@ -42,6 +42,9 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.type.HiveDecimal;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.llap.api.Vector.Type;
+import org.apache.hadoop.hive.llap.chunk.ChunkWriter;
+import org.apache.hadoop.hive.llap.chunk.ChunkWriter.NullsState;
 import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
@@ -49,6 +52,7 @@ import org.apache.hadoop.hive.ql.exec.ve
 import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.StringExpr;
+import org.apache.hadoop.hive.ql.io.orc.LlapUtils.PresentStreamReadResult;
 import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgument.TruthValue;
@@ -283,7 +287,7 @@ class RecordReaderImpl implements Record
     reader = createTreeReader(path, 0, types, included, conf);
     indexes = new OrcProto.RowIndex[types.size()];
     rowIndexStride = strideRate;
-    advanceToNextRow(0L);
+    advanceToNextRow(reader, 0L, true);
   }
 
   private static final class PositionProviderImpl implements PositionProvider {
@@ -303,7 +307,7 @@ class RecordReaderImpl implements Record
   private abstract static class TreeReader {
     protected final Path path;
     protected final int columnId;
-    private BitFieldReader present = null;
+    protected BitFieldReader present = null;
     protected boolean valuePresent = false;
     protected final Configuration conf;
 
@@ -392,7 +396,6 @@ class RecordReaderImpl implements Record
      * @throws IOException
      */
     Object nextVector(Object previousVector, long batchSize) throws IOException {
-
       ColumnVector result = (ColumnVector) previousVector;
       if (present != null) {
         // Set noNulls and isNull vector of the ColumnVector based on
@@ -414,9 +417,11 @@ class RecordReaderImpl implements Record
       }
       return previousVector;
     }
+
+    public abstract long nextChunk(ChunkWriter writer, long rowsLeft) throws IOException;
   }
 
-  private static class BooleanTreeReader extends TreeReader{
+  private static class BooleanTreeReader extends TreeReader {
     private BitFieldReader reader = null;
 
     BooleanTreeReader(Path path, int columnId, Configuration conf) {
@@ -474,6 +479,11 @@ class RecordReaderImpl implements Record
       reader.nextVector(result, batchSize);
       return result;
     }
+
+    @Override
+    public long nextChunk(ChunkWriter writer, long rowsLeft) throws IOException {
+      return reader.nextChunk(writer, present, rowsLeft);
+    }
   }
 
   private static class ByteTreeReader extends TreeReader{
@@ -534,6 +544,11 @@ class RecordReaderImpl implements Record
     void skipRows(long items) throws IOException {
       reader.skip(countNonNulls(items));
     }
+
+    @Override
+    public long nextChunk(ChunkWriter writer, long rowsLeft) throws IOException {
+      return reader.nextChunk(writer, present, rowsLeft);
+    }
   }
 
   private static class ShortTreeReader extends TreeReader{
@@ -604,6 +619,11 @@ class RecordReaderImpl implements Record
     void skipRows(long items) throws IOException {
       reader.skip(countNonNulls(items));
     }
+
+    @Override
+    public long nextChunk(ChunkWriter writer, long rowsLeft) throws IOException {
+      return reader.nextChunk(writer, present, rowsLeft);
+    }
   }
 
   private static class IntTreeReader extends TreeReader{
@@ -674,6 +694,11 @@ class RecordReaderImpl implements Record
     void skipRows(long items) throws IOException {
       reader.skip(countNonNulls(items));
     }
+
+    @Override
+    public long nextChunk(ChunkWriter writer, long rowsLeft) throws IOException {
+      return reader.nextChunk(writer, present, rowsLeft);
+    }
   }
 
   private static class LongTreeReader extends TreeReader{
@@ -744,6 +769,11 @@ class RecordReaderImpl implements Record
     void skipRows(long items) throws IOException {
       reader.skip(countNonNulls(items));
     }
+
+    @Override
+    public long nextChunk(ChunkWriter writer, long rowsLeft) throws IOException {
+      return reader.nextChunk(writer, present, rowsLeft);
+    }
   }
 
   private static class FloatTreeReader extends TreeReader{
@@ -826,6 +856,45 @@ class RecordReaderImpl implements Record
         utils.readFloat(stream);
       }
     }
+
+    private double[] values;
+    private final PresentStreamReadResult presentHelper = new PresentStreamReadResult();
+    @Override
+    public long nextChunk(ChunkWriter writer, long rowsLeftToRead) throws IOException {
+      boolean mayHaveNulls = present != null;
+      NullsState nullState = mayHaveNulls ? NullsState.HAS_NULLS : NullsState.NO_NULLS;
+      int rowsLeftToWrite = writer.estimateValueCountThatFits(Type.DOUBLE, mayHaveNulls);
+      if (rowsLeftToWrite == 0) {
+        return 0; // Cannot write any rows into this writer.
+      }
+      // If we sent values to LLAP one by one, it would be hard for LLAP to decide
+      // how to store them with respect to nulls, so group values together and send
+      // them in runs (see the schematic loop after this file's diff).
+      if (values == null) {
+        values = new double[LlapUtils.DOUBLE_GROUP_SIZE];
+      }
+      long originalRowsLeft = rowsLeftToRead;
+      // Start the big loop to read rows until we run out of either input or space.
+      while (rowsLeftToRead > 0 && rowsLeftToWrite > 0) {
+        int rowsToTransfer = (int)Math.min(rowsLeftToRead, rowsLeftToWrite);
+        presentHelper.availLength = Math.min(values.length, rowsToTransfer);
+        if (mayHaveNulls) {
+          LlapUtils.readPresentStream(presentHelper, present, rowsToTransfer);
+        }
+        if (presentHelper.isNullsRun) {
+          writer.writeNulls(presentHelper.availLength, presentHelper.isFollowedByOther);
+        } else {
+          for (int i = 0; i < presentHelper.availLength; ++i) {
+            values[i] = utils.readFloat(stream);
+          }
+          writer.writeDoubles(values, 0, presentHelper.availLength,
+              presentHelper.isFollowedByOther ? NullsState.NEXT_NULL : nullState);
+        }
+        rowsLeftToWrite = writer.estimateValueCountThatFits(Type.DOUBLE, mayHaveNulls);
+        rowsLeftToRead -= presentHelper.availLength;
+      }
+      writer.finishCurrentSegment();
+      return originalRowsLeft - rowsLeftToRead;
+    }
   }
 
   private static class DoubleTreeReader extends TreeReader{
@@ -906,6 +975,46 @@ class RecordReaderImpl implements Record
       items = countNonNulls(items);
       stream.skip(items * 8);
     }
+
+    private double[] values;
+    private final PresentStreamReadResult presentHelper = new PresentStreamReadResult();
+    @Override
+    public long nextChunk(ChunkWriter writer, long rowsLeftToRead) throws IOException {
+      boolean mayHaveNulls = present != null;
+      NullsState nullState = mayHaveNulls ? NullsState.HAS_NULLS : NullsState.NO_NULLS;
+      int rowsLeftToWrite = writer.estimateValueCountThatFits(Type.DOUBLE, mayHaveNulls);
+      if (rowsLeftToWrite == 0) {
+        return 0; // Cannot write any rows into this writer.
+      }
+      // If we sent values to LLAP one by one, it would be hard for LLAP to decide
+      // how to store them with respect to nulls, so group values together and send
+      // them in runs.
+      if (values == null) {
+        values = new double[LlapUtils.DOUBLE_GROUP_SIZE];
+      }
+      long originalRowsLeft = rowsLeftToRead;
+      // Start the big loop to read rows until we run out of either input or space.
+      while (rowsLeftToRead > 0 && rowsLeftToWrite > 0) {
+        int rowsToTransfer = (int)Math.min(rowsLeftToRead, rowsLeftToWrite);
+        presentHelper.availLength = Math.min(values.length, rowsToTransfer);
+        if (mayHaveNulls) {
+          LlapUtils.readPresentStream(presentHelper, present, rowsToTransfer);
+        }
+        if (presentHelper.isNullsRun) {
+          writer.writeNulls(presentHelper.availLength, presentHelper.isFollowedByOther);
+        } else {
+          for (int i = 0; i < presentHelper.availLength; ++i) {
+            values[i] = utils.readDouble(stream);
+          }
+          writer.writeDoubles(values, 0, presentHelper.availLength,
+              presentHelper.isFollowedByOther ? NullsState.NEXT_NULL : nullState);
+        }
+        rowsLeftToWrite = writer.estimateValueCountThatFits(Type.DOUBLE, mayHaveNulls);
+        rowsLeftToRead -= presentHelper.availLength;
+      }
+      writer.finishCurrentSegment();
+      return originalRowsLeft - rowsLeftToRead;
+    }
   }
 
   private static class BinaryTreeReader extends TreeReader{
@@ -997,9 +1106,15 @@ class RecordReaderImpl implements Record
       }
       stream.skip(lengthToSkip);
     }
+
+    @Override
+    public long nextChunk(ChunkWriter writer, long rowsLeft) {
+      // TODO: string support would be here
+      throw new UnsupportedOperationException();
+    }
   }
 
-  private static class TimestampTreeReader extends TreeReader{
+  private static class TimestampTreeReader extends TreeReader {
     private IntegerReader data = null;
     private IntegerReader nanos = null;
     private final LongColumnVector nanoVector = new LongColumnVector();
@@ -1128,6 +1243,12 @@ class RecordReaderImpl implements Record
       data.skip(items);
       nanos.skip(items);
     }
+
+    @Override
+    public long nextChunk(ChunkWriter writer, long rowsLeft) {
+      // TODO: timestamp support would be here
+      throw new UnsupportedOperationException();
+    }
   }
 
   private static class DateTreeReader extends TreeReader{
@@ -1198,6 +1319,11 @@ class RecordReaderImpl implements Record
     void skipRows(long items) throws IOException {
       reader.skip(countNonNulls(items));
     }
+
+    @Override
+    public long nextChunk(ChunkWriter writer, long rowsLeft) throws IOException {
+      return reader.nextChunk(writer, present, rowsLeft);
+    }
   }
 
   private static class DecimalTreeReader extends TreeReader{
@@ -1315,6 +1441,12 @@ class RecordReaderImpl implements Record
       }
       scaleStream.skip(items);
     }
+
+    @Override
+    public long nextChunk(ChunkWriter writer, long rowsLeft) {
+      // TODO: decimal support would be here
+      throw new UnsupportedOperationException();
+    }
   }
 
   /**
@@ -1375,6 +1507,12 @@ class RecordReaderImpl implements Record
     void skipRows(long items) throws IOException {
       reader.skipRows(items);
     }
+
+    @Override
+    public long nextChunk(ChunkWriter writer, long rowsLeft) {
+      // TODO: string support would be here
+      throw new UnsupportedOperationException();
+    }
   }
 
   // This class collects together very similar methods for reading an ORC vector of byte arrays and
@@ -1542,6 +1680,12 @@ class RecordReaderImpl implements Record
       }
       stream.skip(lengthToSkip);
     }
+
+    @Override
+    public long nextChunk(ChunkWriter writer, long rowsLeft) {
+      // TODO: string support would be here
+      throw new UnsupportedOperationException();
+    }
   }
 
   /**
@@ -1717,6 +1861,12 @@ class RecordReaderImpl implements Record
     void skipRows(long items) throws IOException {
       reader.skip(countNonNulls(items));
     }
+
+    @Override
+    public long nextChunk(ChunkWriter writer, long rowsLeft) {
+      // TODO: string support would be here
+      throw new UnsupportedOperationException();
+    }
   }
 
   private static class CharTreeReader extends StringTreeReader {
@@ -1850,6 +2000,7 @@ class RecordReaderImpl implements Record
   private static class StructTreeReader extends TreeReader {
     private final TreeReader[] fields;
     private final String[] fieldNames;
+    private final List<TreeReader> readers;
 
     StructTreeReader(Path path, int columnId,
                      List<OrcProto.Type> types,
@@ -1859,10 +2010,12 @@ class RecordReaderImpl implements Record
       int fieldCount = type.getFieldNamesCount();
       this.fields = new TreeReader[fieldCount];
       this.fieldNames = new String[fieldCount];
+      this.readers = new ArrayList<TreeReader>();
       for(int i=0; i < fieldCount; ++i) {
         int subtype = type.getSubtypes(i);
         if (included == null || included[subtype]) {
           this.fields[i] = createTreeReader(path, subtype, types, included, conf);
+          readers.add(this.fields[i]);
         }
         this.fieldNames[i] = type.getFieldNames(i);
       }
@@ -1904,6 +2057,21 @@ class RecordReaderImpl implements Record
       return result;
     }
 
+    /**
+     * @return Total count of <b>non-null</b> field readers.
+     */
+    int getReaderCount() {
+      return readers.size();
+    }
+
+    /**
+     * @param readerIndex Index among <b>non-null</b> readers. Not a column index!
+     * @return The readerIndex-th <b>non-null</b> field reader.
+     */
+    TreeReader getColumnReader(int readerIndex) {
+      return readers.get(readerIndex);
+    }
+
     @Override
     Object nextVector(Object previousVector, long batchSize) throws IOException {
       ColumnVector[] result = null;
@@ -1947,6 +2115,11 @@ class RecordReaderImpl implements Record
         }
       }
     }
+
+    @Override
+    public long nextChunk(ChunkWriter writer, long rowsLeft) {
+      throw new UnsupportedOperationException("Non-primitives are not supported");
+    }
   }
 
   private static class UnionTreeReader extends TreeReader {
@@ -2026,6 +2199,11 @@ class RecordReaderImpl implements Record
         fields[i].skipRows(counts[i]);
       }
     }
+
+    @Override
+    public long nextChunk(ChunkWriter writer, long rowsLeft) {
+      throw new UnsupportedOperationException("Non-primitives are not supported");
+    }
   }
 
   private static class ListTreeReader extends TreeReader {
@@ -2115,6 +2293,11 @@ class RecordReaderImpl implements Record
       }
       elementReader.skipRows(childSkip);
     }
+
+    @Override
+    public long nextChunk(ChunkWriter writer, long rowsLeft) {
+      throw new UnsupportedOperationException("Non-primitives are not supported");
+    }
   }
 
   private static class MapTreeReader extends TreeReader {
@@ -2213,6 +2396,11 @@ class RecordReaderImpl implements Record
       keyReader.skipRows(childSkip);
       valueReader.skipRows(childSkip);
     }
+
+    @Override
+    public long nextChunk(ChunkWriter writer, long rowsLeft) {
+      throw new UnsupportedOperationException("Non-primitives are not supported");
+    }
   }
 
   private static TreeReader createTreeReader(Path path,
@@ -2669,7 +2857,7 @@ class RecordReaderImpl implements Record
       reader.startStripe(streams, stripeFooter.getColumnsList());
       // if we skipped the first row group, move the pointers forward
       if (rowInStripe != 0) {
-        seekToRowEntry((int) (rowInStripe / rowIndexStride));
+        seekToRowEntry(reader, (int) (rowInStripe / rowIndexStride));
       }
     }
   }
@@ -2680,7 +2868,7 @@ class RecordReaderImpl implements Record
     long end = start + stripe.getDataLength();
     // explicitly trigger 1 big read
     DiskRange[] ranges = new DiskRange[]{new DiskRange(start, end)};
-    bufferChunks = readDiskRanges(file, stripe.getOffset(), Arrays.asList(ranges));
+    bufferChunks = readDiskRanges(file, zcr, stripe.getOffset(), Arrays.asList(ranges));
     List<OrcProto.Stream> streamDescriptions = stripeFooter.getStreamsList();
     createStreams(streamDescriptions, bufferChunks, null, codec, bufferSize, streams);
   }
@@ -2936,7 +3124,8 @@ class RecordReaderImpl implements Record
    *    ranges
    * @throws IOException
    */
-  List<BufferChunk> readDiskRanges(FSDataInputStream file,
+  static List<BufferChunk> readDiskRanges(FSDataInputStream file,
+                                 ZeroCopyReaderShim zcr,
                                  long base,
                                  List<DiskRange> ranges) throws IOException {
     ArrayList<BufferChunk> result = new ArrayList<RecordReaderImpl.BufferChunk>(ranges.size());
@@ -3062,7 +3251,7 @@ class RecordReaderImpl implements Record
     if (LOG.isDebugEnabled()) {
       LOG.debug("merge = " + stringifyDiskRanges(chunks));
     }
-    bufferChunks = readDiskRanges(file, stripe.getOffset(), chunks);
+    bufferChunks = readDiskRanges(file, zcr, stripe.getOffset(), chunks);
     createStreams(streamList, bufferChunks, included, codec, bufferSize,
         streams);
   }
@@ -3091,7 +3280,8 @@ class RecordReaderImpl implements Record
    * @param nextRow the row we want to go to
    * @throws IOException
    */
-  private void advanceToNextRow(long nextRow) throws IOException {
+  private boolean advanceToNextRow(
+      TreeReader reader, long nextRow, boolean canAdvanceStripe) throws IOException {
     long nextRowInStripe = nextRow - rowBaseInStripe;
     // check for row skipping
     if (rowIndexStride != 0 &&
@@ -3099,32 +3289,35 @@ class RecordReaderImpl implements Record
         nextRowInStripe < rowCountInStripe) {
       int rowGroup = (int) (nextRowInStripe / rowIndexStride);
       if (!includedRowGroups[rowGroup]) {
-        while (rowGroup < includedRowGroups.length &&
-               !includedRowGroups[rowGroup]) {
+        while (rowGroup < includedRowGroups.length && !includedRowGroups[rowGroup]) {
           rowGroup += 1;
         }
-        // if we are off the end of the stripe, just move stripes
         if (rowGroup >= includedRowGroups.length) {
-          advanceStripe();
-          return;
+          if (canAdvanceStripe) {
+            advanceStripe();
+          }
+          return canAdvanceStripe;
         }
         nextRowInStripe = Math.min(rowCountInStripe, rowGroup * rowIndexStride);
       }
     }
-    if (nextRowInStripe < rowCountInStripe) {
-      if (nextRowInStripe != rowInStripe) {
-        if (rowIndexStride != 0) {
-          int rowGroup = (int) (nextRowInStripe / rowIndexStride);
-          seekToRowEntry(rowGroup);
-          reader.skipRows(nextRowInStripe - rowGroup * rowIndexStride);
-        } else {
-          reader.skipRows(nextRowInStripe - rowInStripe);
-        }
-        rowInStripe = nextRowInStripe;
+    if (nextRowInStripe >= rowCountInStripe) {
+      if (canAdvanceStripe) {
+        advanceStripe();
       }
-    } else {
-      advanceStripe();
+      return canAdvanceStripe;
+    }
+    if (nextRowInStripe != rowInStripe) {
+      if (rowIndexStride != 0) {
+        int rowGroup = (int) (nextRowInStripe / rowIndexStride);
+        seekToRowEntry(reader, rowGroup);
+        reader.skipRows(nextRowInStripe - rowGroup * rowIndexStride);
+      } else {
+        reader.skipRows(nextRowInStripe - rowInStripe);
+      }
+      rowInStripe = nextRowInStripe;
     }
+    return true;
   }
 
   @Override
@@ -3132,7 +3325,7 @@ class RecordReaderImpl implements Record
     Object result = reader.next(previous);
     // find the next row
     rowInStripe += 1;
-    advanceToNextRow(rowInStripe + rowBaseInStripe);
+    advanceToNextRow(reader, rowInStripe + rowBaseInStripe, true);
     if (LOG.isDebugEnabled()) {
       LOG.debug("row from " + reader.path);
       LOG.debug("orc row = " + result);
@@ -3148,8 +3341,26 @@ class RecordReaderImpl implements Record
       readStripe();
     }
 
-    long batchSize = 0;
+    long batchSize = computeBatchSize(VectorizedRowBatch.DEFAULT_SIZE);
+
+    rowInStripe += batchSize;
+    if (previous == null) {
+      ColumnVector[] cols = (ColumnVector[]) reader.nextVector(null, (int) batchSize);
+      result = new VectorizedRowBatch(cols.length);
+      result.cols = cols;
+    } else {
+      result = (VectorizedRowBatch) previous;
+      result.selectedInUse = false;
+      reader.nextVector(result.cols, (int) batchSize);
+    }
+
+    result.size = (int) batchSize;
+    advanceToNextRow(reader, rowInStripe + rowBaseInStripe, true);
+    return result;
+  }
 
+  private long computeBatchSize(long targetBatchSize) {
+    long batchSize = 0;
     // In case of PPD, batch size should be aware of row group boundaries. If only a subset of row
     // groups are selected then marker position is set to the end of range (subset of row groups
 // within stripe). Batch size computed out of marker position makes sure that batch size is
@@ -3170,29 +3381,15 @@ class RecordReaderImpl implements Record
 
       final long markerPosition = (endRowGroup * rowIndexStride) < rowCountInStripe ? (endRowGroup * rowIndexStride)
           : rowCountInStripe;
-      batchSize = Math.min(VectorizedRowBatch.DEFAULT_SIZE, (markerPosition - rowInStripe));
+      batchSize = Math.min(targetBatchSize, (markerPosition - rowInStripe));
 
-      if (LOG.isDebugEnabled() && batchSize < VectorizedRowBatch.DEFAULT_SIZE) {
+      if (LOG.isDebugEnabled() && batchSize < targetBatchSize) {
         LOG.debug("markerPosition: " + markerPosition + " batchSize: " + batchSize);
       }
     } else {
-      batchSize = Math.min(VectorizedRowBatch.DEFAULT_SIZE, (rowCountInStripe - rowInStripe));
-    }
-
-    rowInStripe += batchSize;
-    if (previous == null) {
-      ColumnVector[] cols = (ColumnVector[]) reader.nextVector(null, (int) batchSize);
-      result = new VectorizedRowBatch(cols.length);
-      result.cols = cols;
-    } else {
-      result = (VectorizedRowBatch) previous;
-      result.selectedInUse = false;
-      reader.nextVector(result.cols, (int) batchSize);
+      batchSize = Math.min(targetBatchSize, (rowCountInStripe - rowInStripe));
     }
-
-    result.size = (int) batchSize;
-    advanceToNextRow(rowInStripe + rowBaseInStripe);
-    return result;
+    return batchSize;
   }
 
   @Override
@@ -3257,12 +3454,11 @@ class RecordReaderImpl implements Record
     return indexes;
   }
 
-  private void seekToRowEntry(int rowEntry) throws IOException {
+  private void seekToRowEntry(TreeReader reader, int rowEntry) throws IOException {
     PositionProvider[] index = new PositionProvider[indexes.length];
-    for(int i=0; i < indexes.length; ++i) {
+    for (int i = 0; i < indexes.length; ++i) {
       if (indexes[i] != null) {
-        index[i]=
-            new PositionProviderImpl(indexes[i].getEntry(rowEntry));
+        index[i] = new PositionProviderImpl(indexes[i].getEntry(rowEntry));
       }
     }
     reader.seek(index);
@@ -3289,6 +3485,89 @@ class RecordReaderImpl implements Record
     readRowIndex(currentStripe);
 
    // if we aren't at the right row yet, advance in the stripe.
-    advanceToNextRow(rowNumber);
+    advanceToNextRow(reader, rowNumber, true);
+  }
+
+  /**
+   * Iterator-like context to read ORC as a sequence of column x stripe "cells".
+   * TODO: for this to actually be an iterator-like thing, we need to clone nested reader state.
+   *       As of now, we advance parent's shared column readers separately, which would cause
+   *       other calls (e.g. nextBatch) to break once readNextColumnStripe is called. Currently,
+   *       it is always called alone, so this is ok; context is merely a convenience class.
+   */
+  private static class ColumnReadContext {
+    public ColumnReadContext(StructTreeReader structReader) {
+      readers = new TreeReader[structReader.getReaderCount()];
+      for (int i = 0; i < readers.length; ++i) {
+        readers[i] = structReader.getColumnReader(i);
+      }
+    }
+    /** Readers for each separate column; no nulls, just the columns being read. */
+    private final TreeReader[] readers;
+    /** Remembered row offset after a partial read of one column from stripe. */
+    private long rowInStripe = 0;
+    /** Next column to be read (index into readers). */
+    private int columnIx = 0;
+    /** Remaining row count for current stripe; same for every column, so don't recompute. */
+    private long remainingToReadFromStart = -1;
+    /** Whether the next call will be the first for this column x stripe. TODO: derive? */
+    private boolean firstCall = true;
+  }
+
+  @Override
+  public Object prepareColumnRead() {
+    return new ColumnReadContext((StructTreeReader)this.reader);
+  }
+
+  @Override
+  public boolean readNextColumnStripe(
+      Object ctxObj, ChunkWriter writer) throws IOException {
+    ColumnReadContext ctx = (ColumnReadContext)ctxObj;
+    if (rowInStripe >= rowCountInStripe) {
+      assert ctx.columnIx == 0;
+      currentStripe += 1;
+      readStripe();
+    }
+    long rowInStripeGlobal = rowInStripe; // Remember the global state.
+    rowInStripe = ctx.rowInStripe;
+    if (ctx.columnIx == 0 && ctx.firstCall) {
+      // We are starting a new stripe - remember the number of rows to read (same for all cols).
+      // Doesn't take into account space remaining in ChunkWriter.
+      ctx.remainingToReadFromStart = computeBatchSize(Long.MAX_VALUE);
+    }
+    long remainingToRead =
+        ctx.firstCall ? ctx.remainingToReadFromStart : computeBatchSize(Long.MAX_VALUE);
+    TreeReader columnReader = ctx.readers[ctx.columnIx];
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Calling nextChunk for " + remainingToRead);
+    }
+    long rowsRead = columnReader.nextChunk(writer, remainingToRead);
+    assert rowsRead <= remainingToRead;
+    rowInStripe += rowsRead;
+    boolean doneWithColumnStripe = (rowsRead == remainingToRead);
+    ctx.firstCall = doneWithColumnStripe; // If we are not done, there will be more calls.
+    if (!doneWithColumnStripe) {
+      // Note that we are only advancing the reader for the current column.
+      boolean hasRows = advanceToNextRow(columnReader, rowInStripe + rowBaseInStripe, false);
+      ctx.rowInStripe = rowInStripe; // Remember the current value for next call.
+      if (!hasRows) {
+        throw new AssertionError("No rows after advance; read "
+            + rowsRead + " out of " + remainingToRead);
+      }
+    } else {
+      // Done with some column + stripe.
+      ++ctx.columnIx;
+      if (ctx.columnIx == ctx.readers.length) {
+        // Done with the last column in this stripe; advance the global rowInStripe.
+        ctx.columnIx = 0;
+        ctx.rowInStripe = rowInStripeGlobal = rowInStripe;
+      } else {
+        // Revert the state back to start of stripe.
+        ctx.rowInStripe = rowInStripeGlobal;
+      }
+    }
+    rowInStripe = rowInStripeGlobal; // Restore global state.
+    return !doneWithColumnStripe;
   }
 }
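
All the primitive nextChunk implementations added above share one loop shape;
condensed for reference (a sketch, assuming it lives in a TreeReader subclass in
this package; sourceRunLength() and writeRun() are placeholders for the per-type
peek and write calls, everything else is used as in the diff):

long nextChunkSkeleton(ChunkWriter writer, long rowsLeftToRead) throws IOException {
  boolean mayHaveNulls = present != null;
  int rowsLeftToWrite = writer.estimateValueCountThatFits(Type.DOUBLE, mayHaveNulls);
  if (rowsLeftToWrite == 0) return 0;               // no space in this writer
  long originalRowsLeft = rowsLeftToRead;
  while (rowsLeftToRead > 0 && rowsLeftToWrite > 0) {
    int rowsToTransfer = (int) Math.min(rowsLeftToRead, rowsLeftToWrite);
    // Propose a run no longer than the source can serve contiguously...
    presentHelper.availLength = Math.min(sourceRunLength(), rowsToTransfer);
    // ...then let the present stream shorten it further and classify it as a
    // run of nulls or a run of values (availLength is updated in place).
    if (mayHaveNulls) {
      LlapUtils.readPresentStream(presentHelper, present, rowsToTransfer);
    }
    if (presentHelper.isNullsRun) {
      writer.writeNulls(presentHelper.availLength, presentHelper.isFollowedByOther);
    } else {
      writeRun(writer, presentHelper.availLength);  // per-type values write
    }
    rowsLeftToWrite = writer.estimateValueCountThatFits(Type.DOUBLE, mayHaveNulls);
    rowsLeftToRead -= presentHelper.availLength;
  }
  writer.finishCurrentSegment();
  return originalRowsLeft - rowsLeftToRead;         // rows actually consumed
}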

Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthByteReader.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthByteReader.java?rev=1625344&r1=1625343&r2=1625344&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthByteReader.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthByteReader.java Tue Sep 16 17:50:02 2014
@@ -20,7 +20,12 @@ package org.apache.hadoop.hive.ql.io.orc
 import java.io.EOFException;
 import java.io.IOException;
 
+import org.apache.hadoop.hive.llap.api.Vector.Type;
+import org.apache.hadoop.hive.llap.chunk.ChunkWriter;
+import org.apache.hadoop.hive.llap.chunk.ChunkWriter.NullsState;
 import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.io.orc.LlapUtils.PresentStreamReadResult;
 
 /**
  * A reader that reads a sequence of bytes. A control byte is read before
@@ -39,11 +44,15 @@ class RunLengthByteReader {
     this.input = input;
   }
 
-  private void readValues() throws IOException {
+  private void readValues(boolean ignoreEof) throws IOException {
     int control = input.read();
     used = 0;
     if (control == -1) {
-      throw new EOFException("Read past end of buffer RLE byte from " + input);
+      if (!ignoreEof) {
+        throw new EOFException("Read past end of buffer RLE byte from " + input);
+      }
+      used = numLiterals = 0;
+      return;
     } else if (control < 0x80) {
       repeat = true;
       numLiterals = control + RunLengthByteWriter.MIN_REPEAT_SIZE;
@@ -73,17 +82,70 @@ class RunLengthByteReader {
   byte next() throws IOException {
     byte result;
     if (used == numLiterals) {
-      readValues();
+      readValues(false);
     }
     if (repeat) {
-      used += 1;
       result = literals[0];
     } else {
-      result = literals[used++];
+      result = literals[used];
     }
+    ++used;
     return result;
   }
 
+
+  private final PresentStreamReadResult presentHelper = new PresentStreamReadResult();
+  public int nextChunk(
+      ChunkWriter writer, BitFieldReader present, long rowsLeftToRead) throws IOException {
+    boolean mayHaveNulls = present != null;
+    int rowsLeftToWrite = writer.estimateValueCountThatFits(Type.LONG, mayHaveNulls);
+    if (rowsLeftToWrite == 0) {
+      return 0; // Cannot write any rows into this writer.
+    }
+    long originalRowsLeft = rowsLeftToRead;
+    // Start the big loop to read rows until we run out of either input or space.
+    while (rowsLeftToRead > 0 && rowsLeftToWrite > 0) {
+      int rowsToTransfer = (int)Math.min(rowsLeftToRead, rowsLeftToWrite);
+      presentHelper.availLength = Math.min(peekNextAvailLength(), rowsToTransfer);
+      if (mayHaveNulls) {
+        LlapUtils.readPresentStream(presentHelper, present, rowsToTransfer);
+      }
+      assert presentHelper.availLength > 0;
+      assert rowsLeftToRead >= presentHelper.availLength;
+
+      if (presentHelper.isNullsRun) {
+        writer.writeNulls(presentHelper.availLength, presentHelper.isFollowedByOther);
+      } else {
+        NullsState nullsState = !mayHaveNulls ? NullsState.NO_NULLS :
+              (presentHelper.isFollowedByOther ? NullsState.NEXT_NULL : NullsState.HAS_NULLS);
+        if (repeat) {
+          writer.writeRepeatedLongs(literals[0], presentHelper.availLength, nullsState);
+        } else {
+          writer.writeLongs(literals, used, presentHelper.availLength, nullsState);
+        }
+        skipCurrentLiterals(presentHelper.availLength);
+      }
+      rowsLeftToWrite = writer.estimateValueCountThatFits(Type.LONG, mayHaveNulls);
+      rowsLeftToRead -= presentHelper.availLength;
+    } // End of big loop.
+    writer.finishCurrentSegment();
+    return (int)(originalRowsLeft - rowsLeftToRead);
+  }
+
+  private int peekNextAvailLength() throws IOException {
+    if (used == numLiterals) {
+      readValues(true);
+    }
+    return numLiterals - used;
+  }
+
+  private void skipCurrentLiterals(int valuesToSkip) {
+    if ((used + valuesToSkip) > numLiterals) {
+      throw new AssertionError("Skipping " + valuesToSkip + "; used " + used + "/" + numLiterals);
+    }
+    used += valuesToSkip;
+  }
+
   void nextVector(LongColumnVector previous, long previousLen)
       throws IOException {
     previous.isRepeating = true;
@@ -113,7 +175,7 @@ class RunLengthByteReader {
     if (consumed != 0) {
       // a loop is required for cases where we break the run into two parts
       while (consumed > 0) {
-        readValues();
+        readValues(false);
         used = consumed;
         consumed -= numLiterals;
       }
@@ -126,7 +188,7 @@ class RunLengthByteReader {
   void skip(long items) throws IOException {
     while (items > 0) {
       if (used == numLiterals) {
-        readValues();
+        readValues(false);
       }
       long consume = Math.min(items, numLiterals - used);
       used += consume;
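
For readers unfamiliar with the byte RLE layout that readValues(boolean) parses: a
control byte below 0x80 announces control + MIN_REPEAT_SIZE (3) copies of the single
byte that follows, and a control byte of 0x80 or above announces 0x100 - control
literal bytes. A standalone illustration with a worked value:

class ByteRleDemo {
  // Decodes the control-byte stream the same way readValues does.
  static void decode(byte[] in) {
    int i = 0;
    while (i < in.length) {
      int control = in[i++] & 0xff;
      if (control < 0x80) {             // repeat run of control + 3 values
        int n = control + 3;
        System.out.println(n + " copies of " + in[i++]);
      } else {                          // literal run of 0x100 - control values
        int n = 0x100 - control;
        for (int j = 0; j < n; ++j) {
          System.out.println("literal " + in[i++]);
        }
      }
    }
  }
  // decode(new byte[]{0x02, 0x07}) prints "5 copies of 7"; 5 is also the run
  // length peekNextAvailLength() would report at that stream position.
}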

Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerReader.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerReader.java?rev=1625344&r1=1625343&r2=1625344&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerReader.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerReader.java Tue Sep 16 17:50:02 2014
@@ -20,7 +20,12 @@ package org.apache.hadoop.hive.ql.io.orc
 import java.io.EOFException;
 import java.io.IOException;
 
+import org.apache.hadoop.hive.llap.api.Vector.Type;
+import org.apache.hadoop.hive.llap.chunk.ChunkWriter;
+import org.apache.hadoop.hive.llap.chunk.ChunkWriter.NullsState;
 import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.io.orc.LlapUtils.PresentStreamReadResult;
 
 /**
  * A reader that reads a sequence of integers.
@@ -42,10 +47,14 @@ class RunLengthIntegerReader implements 
     this.utils = new SerializationUtils();
   }
 
-  private void readValues() throws IOException {
+  private void readValues(boolean ignoreEof) throws IOException {
     int control = input.read();
     if (control == -1) {
-      throw new EOFException("Read past end of RLE integer from " + input);
+      if (!ignoreEof) {
+        throw new EOFException("Read past end of RLE integer from " + input);
+      }
+      used = numLiterals = 0;
+      return;
     } else if (control < 0x80) {
       numLiterals = control + RunLengthIntegerWriter.MIN_REPEAT_SIZE;
       used = 0;
@@ -84,7 +93,7 @@ class RunLengthIntegerReader implements 
   public long next() throws IOException {
     long result;
     if (used == numLiterals) {
-      readValues();
+      readValues(false);
     }
     if (repeat) {
       result = literals[0] + (used++) * delta;
@@ -94,9 +103,58 @@ class RunLengthIntegerReader implements 
     return result;
   }
 
+  private final PresentStreamReadResult presentHelper = new PresentStreamReadResult();
+  // Scratch buffer for materializing repeat-with-delta runs (see nextChunk).
+  private final long[] scratch = new long[127 + RunLengthIntegerWriter.MIN_REPEAT_SIZE];
+  @Override
+  public int nextChunk(
+      ChunkWriter writer, BitFieldReader present, long rowsLeftToRead) throws IOException {
+    boolean mayHaveNulls = present != null;
+    int rowsLeftToWrite = writer.estimateValueCountThatFits(Type.LONG, mayHaveNulls);
+    if (rowsLeftToWrite == 0) {
+      return 0; // Cannot write any rows into this writer.
+    }
+    long originalRowsLeft = rowsLeftToRead;
+    // Start the big loop to read rows until we run out of either input or space.
+    while (rowsLeftToRead > 0 && rowsLeftToWrite > 0) {
+      int rowsToTransfer = (int)Math.min(rowsLeftToRead, rowsLeftToWrite);
+      presentHelper.availLength = Math.min(peekNextAvailLength(), rowsToTransfer);
+      if (mayHaveNulls) {
+        LlapUtils.readPresentStream(presentHelper, present, rowsToTransfer);
+      }
+      assert presentHelper.availLength > 0;
+      assert rowsLeftToRead >= presentHelper.availLength;
+
+      if (presentHelper.isNullsRun) {
+        writer.writeNulls(presentHelper.availLength, presentHelper.isFollowedByOther);
+      } else {
+        NullsState nullsState = !mayHaveNulls ? NullsState.NO_NULLS :
+              (presentHelper.isFollowedByOther ? NullsState.NEXT_NULL : NullsState.HAS_NULLS);
+        if (repeat && delta == 0) {
+          writer.writeRepeatedLongs(literals[0], presentHelper.availLength, nullsState);
+        } else if (repeat) {
+          // A repeat run with a non-zero delta is an arithmetic sequence and
+          // literals[] only holds its base value, so materialize it first.
+          for (int i = 0; i < presentHelper.availLength; ++i) {
+            scratch[i] = literals[0] + (used + i) * delta;
+          }
+          writer.writeLongs(scratch, 0, presentHelper.availLength, nullsState);
+        } else {
+          writer.writeLongs(literals, used, presentHelper.availLength, nullsState);
+        }
+        skipCurrentLiterals(presentHelper.availLength);
+      }
+      rowsLeftToWrite = writer.estimateValueCountThatFits(Type.LONG, mayHaveNulls);
+      rowsLeftToRead -= presentHelper.availLength;
+    } // End of big loop.
+    writer.finishCurrentSegment();
+    return (int)(originalRowsLeft - rowsLeftToRead);
+  }
+
+  private int peekNextAvailLength() throws IOException {
+    if (used == numLiterals) {
+      readValues(true);
+    }
+    return numLiterals - used;
+  }
+
+  private void skipCurrentLiterals(int valuesToSkip) {
+    assert (used + valuesToSkip) <= numLiterals;
+    used += valuesToSkip;
+  }
+
   @Override
-  public void nextVector(LongColumnVector previous, long previousLen)
-      throws IOException {
+  public void nextVector(LongColumnVector previous, long previousLen) throws IOException {
     previous.isRepeating = true;
     for (int i = 0; i < previousLen; i++) {
       if (!previous.isNull[i]) {
@@ -125,7 +183,7 @@ class RunLengthIntegerReader implements 
     if (consumed != 0) {
       // a loop is required for cases where we break the run into two parts
       while (consumed > 0) {
-        readValues();
+        readValues(false);
         used = consumed;
         consumed -= numLiterals;
       }
@@ -139,7 +197,7 @@ class RunLengthIntegerReader implements 
   public void skip(long numValues) throws IOException {
     while (numValues > 0) {
       if (used == numLiterals) {
-        readValues();
+        readValues(false);
       }
       long consume = Math.min(numValues, numLiterals - used);
       used += consume;
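
Note why the repeated-value fast path above requires delta == 0: as next() shows, a
V1 "repeat" run is really an arithmetic sequence starting at literals[0], so only a
zero delta yields a constant run. In isolation:

class V1RepeatDemo {
  // Value at position i within a V1 repeat run (cf. next() above).
  static long valueAt(long base, int delta, int i) {
    return base + (long) i * delta;
  }
  // base=10, delta=2 gives 10, 12, 14, ...; constant only when delta == 0,
  // which is why nextChunk materializes the run before writeLongs otherwise.
}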

Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerReaderV2.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerReaderV2.java?rev=1625344&r1=1625343&r2=1625344&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerReaderV2.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerReaderV2.java Tue Sep 16 17:50:02 2014
@@ -19,12 +19,19 @@ package org.apache.hadoop.hive.ql.io.orc
 
 import java.io.EOFException;
 import java.io.IOException;
+import java.util.Arrays;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.llap.api.Vector.Type;
+import org.apache.hadoop.hive.llap.chunk.ChunkWriter;
+import org.apache.hadoop.hive.llap.chunk.ChunkWriter.NullsState;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.io.orc.LlapUtils.PresentStreamReadResult;
 import org.apache.hadoop.hive.ql.io.orc.RunLengthIntegerWriterV2.EncodingType;
 
 /**
@@ -33,13 +40,17 @@ import org.apache.hadoop.hive.ql.io.orc.
  * compression techniques.
  */
 class RunLengthIntegerReaderV2 implements IntegerReader {
+  public static final Log LOG = LogFactory.getLog(RunLengthIntegerReaderV2.class);
+
   private final InStream input;
   private final boolean signed;
   private final long[] literals = new long[RunLengthIntegerWriterV2.MAX_SCOPE];
+  private boolean isRepeating = false;
   private int numLiterals = 0;
   private int used = 0;
   private final boolean skipCorrupt;
   private final SerializationUtils utils;
+  private EncodingType currentEncoding;
 
   RunLengthIntegerReaderV2(InStream input, boolean signed,
       Configuration conf) throws IOException {
@@ -49,22 +60,25 @@ class RunLengthIntegerReaderV2 implement
     this.utils = new SerializationUtils();
   }
 
-  private void readValues() throws IOException {
+  private final static EncodingType[] encodings = EncodingType.values();
+  private void readValues(boolean ignoreEof) throws IOException {
     // read the first 2 bits and determine the encoding type
+    isRepeating = false;
     int firstByte = input.read();
     if (firstByte < 0) {
-      throw new EOFException("Read past end of RLE integer from " + input);
-    } else {
-      int enc = (firstByte >>> 6) & 0x03;
-      if (EncodingType.SHORT_REPEAT.ordinal() == enc) {
-        readShortRepeatValues(firstByte);
-      } else if (EncodingType.DIRECT.ordinal() == enc) {
-        readDirectValues(firstByte);
-      } else if (EncodingType.PATCHED_BASE.ordinal() == enc) {
-        readPatchedBaseValues(firstByte);
-      } else {
-        readDeltaValues(firstByte);
+      if (!ignoreEof) {
+        throw new EOFException("Read past end of RLE integer from " + input);
       }
+      used = numLiterals = 0;
+      return;
+    }
+    currentEncoding = encodings[(firstByte >>> 6) & 0x03];
+    switch (currentEncoding) {
+    case SHORT_REPEAT: readShortRepeatValues(firstByte); break;
+    case DIRECT: readDirectValues(firstByte); break;
+    case PATCHED_BASE: readPatchedBaseValues(firstByte); break;
+    case DELTA: readDeltaValues(firstByte); break;
+    default: throw new IOException("Unknown encoding " + currentEncoding);
     }
   }
 
@@ -97,10 +111,16 @@ class RunLengthIntegerReaderV2 implement
       // read the fixed delta value stored as vint (deltas can be negative even
       // if all number are positive)
       long fd = utils.readVslong(input);
-
-      // add fixed deltas to adjacent values
-      for(int i = 0; i < len; i++) {
-        literals[numLiterals++] = literals[numLiterals - 2] + fd;
+      if (fd == 0) {
+        isRepeating = true;
+        assert numLiterals == 1;
+        Arrays.fill(literals, numLiterals, numLiterals + len, literals[0]);
+        numLiterals += len;
+      } else {
+        // add fixed deltas to adjacent values
+        for(int i = 0; i < len; i++) {
+          literals[numLiterals++] = literals[numLiterals - 2] + fd;
+        }
       }
     } else {
       long deltaBase = utils.readVslong(input);
@@ -282,10 +302,18 @@ class RunLengthIntegerReaderV2 implement
       val = utils.zigzagDecode(val);
     }
 
+    if (numLiterals != 0) {
+      // Currently this always holds, which makes peekNextAvailLength simpler.
+      // If this changes, peekNextAvailLength should be adjusted accordingly.
+      throw new AssertionError("readValues called with existing values present");
+    }
     // repeat the value for length times
+    isRepeating = true;
+    // TODO: this is not very useful, and the V1 reader doesn't do it. Fix? Same when delta == 0.
     for(int i = 0; i < len; i++) {
-      literals[numLiterals++] = val;
+      literals[i] = val;
     }
+    numLiterals = len;
   }
 
   @Override
@@ -299,7 +327,7 @@ class RunLengthIntegerReaderV2 implement
     if (used == numLiterals) {
       numLiterals = 0;
       used = 0;
-      readValues();
+      readValues(false);
     }
     result = literals[used++];
     return result;
@@ -314,7 +342,7 @@ class RunLengthIntegerReaderV2 implement
       // parts
       while (consumed > 0) {
         numLiterals = 0;
-        readValues();
+        readValues(false);
         used = consumed;
         consumed -= numLiterals;
       }
@@ -330,7 +358,7 @@ class RunLengthIntegerReaderV2 implement
       if (used == numLiterals) {
         numLiterals = 0;
         used = 0;
-        readValues();
+        readValues(false);
       }
       long consume = Math.min(numValues, numLiterals - used);
       used += consume;
@@ -338,6 +366,58 @@ class RunLengthIntegerReaderV2 implement
     }
   }
 
+  private final PresentStreamReadResult presentHelper = new PresentStreamReadResult();
+  @Override
+  public int nextChunk(
+      ChunkWriter writer, BitFieldReader present, long rowsLeftToRead) throws IOException {
+    boolean mayHaveNulls = present != null;
+    int rowsLeftToWrite = writer.estimateValueCountThatFits(Type.LONG, mayHaveNulls);
+    if (rowsLeftToWrite == 0) {
+      return 0; // Cannot write any rows into this writer.
+    }
+    long originalRowsLeft = rowsLeftToRead;
+    // Start the big loop to read rows until we run out of either input or space.
+    while (rowsLeftToRead > 0 && rowsLeftToWrite > 0) {
+      int rowsToTransfer = (int)Math.min(rowsLeftToRead, rowsLeftToWrite);
+      presentHelper.availLength = Math.min(peekNextAvailLength(), rowsToTransfer);
+      if (mayHaveNulls) {
+        LlapUtils.readPresentStream(presentHelper, present, rowsToTransfer);
+      }
+      assert presentHelper.availLength > 0;
+      assert rowsLeftToRead >= presentHelper.availLength;
+      if (presentHelper.isNullsRun) {
+        writer.writeNulls(presentHelper.availLength, presentHelper.isFollowedByOther);
+      } else {
+        NullsState nullsState = !mayHaveNulls ? NullsState.NO_NULLS :
+              (presentHelper.isFollowedByOther ? NullsState.NEXT_NULL : NullsState.HAS_NULLS);
+        if (isRepeating) {
+          writer.writeRepeatedLongs(literals[0], presentHelper.availLength, nullsState);
+        } else {
+          writer.writeLongs(literals, used, presentHelper.availLength, nullsState);
+        }
+        skipCurrentLiterals(presentHelper.availLength);
+      }
+      rowsLeftToWrite = writer.estimateValueCountThatFits(Type.LONG, mayHaveNulls);
+      rowsLeftToRead -= presentHelper.availLength;
+    } // End of big loop.
+    writer.finishCurrentSegment();
+    return (int)(originalRowsLeft - rowsLeftToRead);
+  }
+
+  private void skipCurrentLiterals(int valuesToSkip) {
+    assert (used + valuesToSkip) <= numLiterals;
+    used += valuesToSkip;
+  }
+
+  private int peekNextAvailLength() throws IOException {
+    if (used == numLiterals) {
+      numLiterals = 0;
+      used = 0;
+      readValues(true);
+    }
+    return numLiterals - used;
+  }
+
   @Override
   public void nextVector(LongColumnVector previous, long previousLen) throws IOException {
     previous.isRepeating = true;
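
The rewritten readValues dispatches on the top two bits of the header byte; the
2-bit tags match the EncodingType ordinal order (SHORT_REPEAT, DIRECT, PATCHED_BASE,
DELTA), which is what the precomputed encodings array relies on. In isolation (a
same-package sketch, since RunLengthIntegerWriterV2 is package-private):

import org.apache.hadoop.hive.ql.io.orc.RunLengthIntegerWriterV2.EncodingType;

class RleV2TagDemo {
  // The 2-bit tag in the header byte selects the encoding, in ordinal order.
  static EncodingType tagOf(int firstByte) {
    return EncodingType.values()[(firstByte >>> 6) & 0x03];
  }
  // tagOf(0x0a) == SHORT_REPEAT (top bits 00); tagOf(0xc3) == DELTA (top bits 11).
}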

Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java?rev=1625344&r1=1625343&r2=1625344&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java Tue Sep 16 17:50:02 2014
@@ -27,6 +27,9 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedInputFormatInterface;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
@@ -62,7 +65,7 @@ public class VectorizedOrcInputFormat ex
       this.offset = fileSplit.getStart();
       this.length = fileSplit.getLength();
       options.range(offset, length);
-      OrcInputFormat.setIncludedColumns(options, types, conf, true);
+      options.include(OrcInputFormat.genIncludedColumns(types, conf, true));
       OrcInputFormat.setSearchArgument(options, types, conf, true);
 
       this.reader = file.rowsOptions(options);

Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/sarg/SearchArgumentFactory.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/sarg/SearchArgumentFactory.java?rev=1625344&r1=1625343&r2=1625344&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/sarg/SearchArgumentFactory.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/sarg/SearchArgumentFactory.java Tue Sep 16 17:50:02 2014
@@ -1,13 +1,19 @@
 package org.apache.hadoop.hive.ql.io.sarg;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgument.Builder;
+import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
+import org.apache.hadoop.hive.ql.plan.TableScanDesc;
 
 /**
  * A factory for creating SearchArguments.
  */
 public class SearchArgumentFactory {
+  public static final String SARG_PUSHDOWN = "sarg.pushdown";
+
   public static SearchArgument create(ExprNodeGenericFuncDesc expression) {
     return new SearchArgumentImpl(expression);
   }
@@ -19,4 +25,14 @@ public class SearchArgumentFactory {
   public static SearchArgument create(String kryo) {
     return SearchArgumentImpl.fromKryo(kryo);
   }
+
+  public static SearchArgument createFromConf(Configuration conf) {
+    String sargString = null;
+    if ((sargString = conf.get(TableScanDesc.FILTER_EXPR_CONF_STR)) != null) {
+      return create(Utilities.deserializeExpression(sargString));
+    } else if ((sargString = conf.get(SARG_PUSHDOWN)) != null) {
+      return create(sargString);
+    }
+    return null;
+  }
 }
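
createFromConf gives the serialized filter expression (FILTER_EXPR_CONF_STR)
precedence over the Kryo-serialized SearchArgument (SARG_PUSHDOWN). A caller-side
sketch, using the same builder and toKryo() calls as the test change below; the
predicate itself is arbitrary:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory;

class SargFromConfDemo {
  static SearchArgument roundTrip() {
    SearchArgument sarg = SearchArgumentFactory.newBuilder()
        .startAnd().isNull("cost").end().build();
    Configuration conf = new Configuration();
    // SARG_PUSHDOWN is consulted only when no serialized filter expression
    // is present under TableScanDesc.FILTER_EXPR_CONF_STR.
    conf.set(SearchArgumentFactory.SARG_PUSHDOWN, sarg.toKryo());
    return SearchArgumentFactory.createFromConf(conf); // rebuilt from Kryo
  }
}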

Modified: hive/branches/llap/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java?rev=1625344&r1=1625343&r2=1625344&view=diff
==============================================================================
--- hive/branches/llap/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java (original)
+++ hive/branches/llap/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java Tue Sep 16 17:50:02 2014
@@ -1589,7 +1589,7 @@ public class TestInputOutputFormat {
     types.add(builder.build());
     SearchArgument isNull = SearchArgumentFactory.newBuilder()
         .startAnd().isNull("cost").end().build();
-    conf.set(OrcInputFormat.SARG_PUSHDOWN, isNull.toKryo());
+    conf.set(SearchArgumentFactory.SARG_PUSHDOWN, isNull.toKryo());
     conf.set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR,
         "url,cost");
     options.include(new boolean[]{true, true, false, true, false});

Added: hive/branches/llap/ql/src/test/queries/clientcompare/llap_0.q
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/test/queries/clientcompare/llap_0.q?rev=1625344&view=auto
==============================================================================
--- hive/branches/llap/ql/src/test/queries/clientcompare/llap_0.q (added)
+++ hive/branches/llap/ql/src/test/queries/clientcompare/llap_0.q Tue Sep 16 17:50:02 2014
@@ -0,0 +1,11 @@
+SET hive.vectorized.execution.enabled=true;
+
+SELECT   cfloat,
+         cint,
+         cdouble,
+         cbigint
+FROM     alltypesorc
+WHERE    (cbigint > -23)
+           AND ((cdouble != 988888)
+                OR (cint > -863.257))
+ORDER BY cbigint, cfloat;

Added: hive/branches/llap/ql/src/test/queries/clientcompare/llap_0_00.qv
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/test/queries/clientcompare/llap_0_00.qv?rev=1625344&view=auto
==============================================================================
--- hive/branches/llap/ql/src/test/queries/clientcompare/llap_0_00.qv (added)
+++ hive/branches/llap/ql/src/test/queries/clientcompare/llap_0_00.qv Tue Sep 16 17:50:02 2014
@@ -0,0 +1 @@
+set hive.llap.enabled=false;

Added: hive/branches/llap/ql/src/test/queries/clientcompare/llap_0_01.qv
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/test/queries/clientcompare/llap_0_01.qv?rev=1625344&view=auto
==============================================================================
--- hive/branches/llap/ql/src/test/queries/clientcompare/llap_0_01.qv (added)
+++ hive/branches/llap/ql/src/test/queries/clientcompare/llap_0_01.qv Tue Sep 16 17:50:02 2014
@@ -0,0 +1 @@
+set hive.llap.enabled=true;