Posted to commits@hive.apache.org by gu...@apache.org on 2013/08/15 21:05:36 UTC

svn commit: r1514438 [1/3] - in /hive/trunk: ql/src/java/org/apache/hadoop/hive/ql/exec/ ql/src/java/org/apache/hadoop/hive/ql/io/ ql/src/java/org/apache/hadoop/hive/ql/io/orc/ ql/src/java/org/apache/hadoop/hive/ql/io/sarg/ ql/src/java/org/apache/hadoo...

Author: gunther
Date: Thu Aug 15 19:05:35 2013
New Revision: 1514438

URL: http://svn.apache.org/r1514438
Log:
HIVE-4246: Implement predicate pushdown for ORC (Owen O'Malley via Gunther Hagleitner)
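
For orientation, a minimal usage sketch of the read path this patch adds
(not part of the commit). OrcFile.createReader(), the new Reader.rows()
overload, and SearchArgument.FACTORY.create() are real entry points from
the diff below; the file path, column layout, and the construction of
'sarg' are illustrative assumptions:

    // Sketch: open an ORC file and read it with a pushdown predicate.
    FileSystem fs = FileSystem.get(conf);
    Reader reader = OrcFile.createReader(fs, new Path("/tmp/example.orc"));
    // 'sarg' would normally be built from the query's filter, e.g.
    // SearchArgument.FACTORY.create(Utilities.deserializeExpression(...)).
    boolean[] include = {true, true, true};       // root struct + 2 columns
    String[] columnNames = {null, "id", "name"};  // names indexed by column id
    RecordReader rows =
        reader.rows(0, Long.MAX_VALUE, include, sarg, columnNames);
    Object row = null;
    while (rows.hasNext()) {
      row = rows.next(row);  // row groups whose stats contradict sarg are skipped
    }
    rows.close();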

Added:
    hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestRecordReaderImpl.java
Modified:
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/BitFieldReader.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSerde.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthByteReader.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/sarg/SearchArgument.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/sarg/SearchArgumentImpl.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java
    hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestBitFieldReader.java
    hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestBitPack.java
    hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInStream.java
    hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestIntegerCompressionReader.java
    hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcFile.java
    hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestRunLengthByteReader.java
    hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestRunLengthIntegerReader.java
    hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/sarg/TestSearchArgumentImpl.java
    hive/trunk/ql/src/test/results/compiler/plan/case_sensitivity.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/cast1.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/groupby1.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/groupby2.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/groupby3.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/groupby4.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/groupby5.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/groupby6.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/input1.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/input2.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/input20.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/input3.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/input4.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/input5.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/input6.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/input7.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/input8.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/input9.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/input_part1.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/input_testsequencefile.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/input_testxpath.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/input_testxpath2.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/join1.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/join2.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/join3.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/join4.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/join5.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/join6.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/join7.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/join8.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/sample1.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/sample2.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/sample3.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/sample4.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/sample5.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/sample6.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/sample7.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/subq.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/udf1.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/udf4.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/udf6.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/udf_case.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/udf_when.q.xml
    hive/trunk/ql/src/test/results/compiler/plan/union.q.xml
    hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/ColumnProjectionUtils.java

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java?rev=1514438&r1=1514437&r2=1514438&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java Thu Aug 15 19:05:35 2013
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hive.ql.exec;
 
+import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -241,6 +242,7 @@ public class TableScanOperator extends O
   // and 2) it will fail some join and union queries if this is added forcibly
   // into tableScanDesc
   java.util.ArrayList<Integer> neededColumnIDs;
+  List<String> neededColumns;
 
   public void setNeededColumnIDs(java.util.ArrayList<Integer> orign_columns) {
     neededColumnIDs = orign_columns;
@@ -250,6 +252,14 @@ public class TableScanOperator extends O
     return neededColumnIDs;
   }
 
+  public void setNeededColumns(List<String> columnNames) {
+    neededColumns = columnNames;
+  }
+
+  public List<String> getNeededColumns() {
+    return neededColumns;
+  }
+
   @Override
   public OperatorType getType() {
     return OperatorType.TABLESCAN;
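
The optimizer fills in both the id and name lists on the table scan. A
minimal sketch of the calls (values are illustrative; in this patch the
real caller is ColumnPrunerProcFactory):

    tableScan.setNeededColumnIDs(new ArrayList<Integer>(Arrays.asList(1, 2)));
    tableScan.setNeededColumns(Arrays.asList("id", "name"));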

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java?rev=1514438&r1=1514437&r2=1514438&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java Thu Aug 15 19:05:35 2013
@@ -428,6 +428,8 @@ public class HiveInputFormat<K extends W
         } else {
           ColumnProjectionUtils.setFullyReadColumns(jobConf);
         }
+        ColumnProjectionUtils.appendReadColumnNames(jobConf,
+            tableScan.getNeededColumns());
 
         pushFilters(jobConf, tableScan);
       }
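
The appended names travel through the JobConf to the record reader. A
minimal sketch of the round trip (the column names are illustrative;
appendReadColumnNames and READ_COLUMN_NAMES_CONF_STR are the real hooks
touched by this patch):

    ColumnProjectionUtils.appendReadColumnNames(jobConf,
        Arrays.asList("id", "name"));
    // OrcInputFormat later reads the same key back and splits on commas:
    String names =
        jobConf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR);
    String[] neededColumnNames = names.split(",");  // {"id", "name"}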

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/BitFieldReader.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/BitFieldReader.java?rev=1514438&r1=1514437&r2=1514438&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/BitFieldReader.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/BitFieldReader.java Thu Aug 15 19:05:35 2013
@@ -39,7 +39,7 @@ class BitFieldReader {
       current = 0xff & input.next();
       bitsLeft = 8;
     } else {
-      throw new EOFException("Read past end of bit field from " + input);
+      throw new EOFException("Read past end of bit field from " + this);
     }
   }
 
@@ -85,4 +85,10 @@ class BitFieldReader {
       bitsLeft = (int) (8 - (totalBits % 8));
     }
   }
+
+  @Override
+  public String toString() {
+    return "bit reader current: " + current + " bits left: " + bitsLeft +
+        " bit size: " + bitSize + " from " + input;
+  }
 }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java?rev=1514438&r1=1514437&r2=1514438&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java Thu Aug 15 19:05:35 2013
@@ -25,97 +25,149 @@ abstract class InStream extends InputStr
 
   private static class UncompressedStream extends InStream {
     private final String name;
-    private byte[] array;
-    private int offset;
-    private final int base;
-    private final int limit;
+    private final ByteBuffer[] bytes;
+    private final long[] offsets;
+    private final long length;
+    private long currentOffset;
+    private byte[] range;
+    private int currentRange;
+    private int offsetInRange;
+    private int limitInRange;
 
-    public UncompressedStream(String name, ByteBuffer input) {
+    public UncompressedStream(String name, ByteBuffer[] input, long[] offsets,
+                              long length) {
       this.name = name;
-      this.array = input.array();
-      base = input.arrayOffset() + input.position();
-      offset = base;
-      limit = input.arrayOffset() + input.limit();
+      this.bytes = input;
+      this.offsets = offsets;
+      this.length = length;
+      currentRange = 0;
+      offsetInRange = 0;
+      limitInRange = 0;
+      currentOffset = 0;
     }
 
     @Override
     public int read() {
-      if (offset == limit) {
-        return -1;
+      if (offsetInRange >= limitInRange) {
+        if (currentOffset == length) {
+          return -1;
+        }
+        seek(currentOffset);
       }
-      return 0xff & array[offset++];
+      currentOffset += 1;
+      return 0xff & range[offsetInRange++];
     }
 
     @Override
     public int read(byte[] data, int offset, int length) {
-      if (this.offset == limit) {
-        return -1;
+      if (offsetInRange >= limitInRange) {
+        if (currentOffset == this.length) {
+          return -1;
+        }
+        seek(currentOffset);
       }
-      int actualLength = Math.min(length, limit - this.offset);
-      System.arraycopy(array, this.offset, data, offset, actualLength);
-      this.offset += actualLength;
+      int actualLength = Math.min(length, limitInRange - offsetInRange);
+      System.arraycopy(range, offsetInRange, data, offset, actualLength);
+      offsetInRange += actualLength;
+      currentOffset += actualLength;
       return actualLength;
     }
 
     @Override
     public int available() {
-      return limit - offset;
+      if (offsetInRange < limitInRange) {
+        return limitInRange - offsetInRange;
+      }
+      return (int) (length - currentOffset);
     }
 
     @Override
     public void close() {
-      array = null;
-      offset = 0;
+      currentRange = bytes.length;
+      currentOffset = length;
     }
 
     @Override
     public void seek(PositionProvider index) throws IOException {
-      offset = base + (int) index.getNext();
+      seek(index.getNext());
+    }
+
+    public void seek(long desired) {
+      for(int i = 0; i < bytes.length; ++i) {
+        if (offsets[i] <= desired &&
+            desired - offsets[i] < bytes[i].remaining()) {
+          currentOffset = desired;
+          currentRange = i;
+          this.range = bytes[i].array();
+          offsetInRange = bytes[i].arrayOffset() + bytes[i].position();
+          limitInRange = bytes[i].arrayOffset() + bytes[i].limit();
+          offsetInRange += desired - offsets[i];
+          return;
+        }
+      }
+      throw new IllegalArgumentException("Seek in " + name + " to " +
+        desired + " is outside of the data");
     }
 
     @Override
     public String toString() {
-      return "uncompressed stream " + name + " base: " + base +
-         " offset: " + offset + " limit: " + limit;
+      return "uncompressed stream " + name + " position: " + currentOffset +
+          " length: " + length + " range: " + currentRange +
+          " offset: " + offsetInRange + " limit: " + limitInRange;
     }
   }
 
   private static class CompressedStream extends InStream {
     private final String name;
-    private byte[] array;
+    private final ByteBuffer[] bytes;
+    private final long[] offsets;
     private final int bufferSize;
+    private final long length;
     private ByteBuffer uncompressed = null;
     private final CompressionCodec codec;
-    private int offset;
-    private final int base;
-    private final int limit;
+    private byte[] compressed = null;
+    private long currentOffset;
+    private int currentRange;
+    private int offsetInCompressed;
+    private int limitInCompressed;
     private boolean isUncompressedOriginal;
 
-    public CompressedStream(String name, ByteBuffer input,
+    public CompressedStream(String name, ByteBuffer[] input,
+                            long[] offsets, long length,
                             CompressionCodec codec, int bufferSize
                            ) {
-      this.array = input.array();
+      this.bytes = input;
       this.name = name;
       this.codec = codec;
+      this.length = length;
+      this.offsets = offsets;
       this.bufferSize = bufferSize;
-      base = input.arrayOffset() + input.position();
-      offset = base;
-      limit = input.arrayOffset() + input.limit();
+      currentOffset = 0;
+      currentRange = 0;
+      offsetInCompressed = 0;
+      limitInCompressed = 0;
     }
 
     private void readHeader() throws IOException {
-      if (limit - offset > OutStream.HEADER_SIZE) {
-        int chunkLength = ((0xff & array[offset + 2]) << 15) |
-          ((0xff & array[offset + 1]) << 7) | ((0xff & array[offset]) >> 1);
+      if (compressed == null || offsetInCompressed >= limitInCompressed) {
+        seek(currentOffset);
+      }
+      if (limitInCompressed - offsetInCompressed > OutStream.HEADER_SIZE) {
+        int chunkLength = ((0xff & compressed[offsetInCompressed + 2]) << 15) |
+          ((0xff & compressed[offsetInCompressed + 1]) << 7) |
+            ((0xff & compressed[offsetInCompressed]) >> 1);
         if (chunkLength > bufferSize) {
           throw new IllegalArgumentException("Buffer size too small. size = " +
               bufferSize + " needed = " + chunkLength);
         }
-        boolean isOriginal = (array[offset] & 0x01) == 1;
-        offset += OutStream.HEADER_SIZE;
+        boolean isOriginal = (compressed[offsetInCompressed] & 0x01) == 1;
+        offsetInCompressed += OutStream.HEADER_SIZE;
         if (isOriginal) {
           isUncompressedOriginal = true;
-          uncompressed = ByteBuffer.wrap(array, offset, chunkLength);
+          uncompressed = bytes[currentRange].duplicate();
+          uncompressed.position(offsetInCompressed -
+              bytes[currentRange].arrayOffset());
+          uncompressed.limit(offsetInCompressed + chunkLength);
         } else {
           if (isUncompressedOriginal) {
             uncompressed = ByteBuffer.allocate(bufferSize);
@@ -125,19 +177,21 @@ abstract class InStream extends InputStr
           } else {
             uncompressed.clear();
           }
-          codec.decompress(ByteBuffer.wrap(array, offset, chunkLength),
+          codec.decompress(ByteBuffer.wrap(compressed, offsetInCompressed,
+              chunkLength),
             uncompressed);
         }
-        offset += chunkLength;
+        offsetInCompressed += chunkLength;
+        currentOffset += chunkLength + OutStream.HEADER_SIZE;
       } else {
-        throw new IllegalStateException("Can't read header");
+        throw new IllegalStateException("Can't read header at " + this);
       }
     }
 
     @Override
     public int read() throws IOException {
       if (uncompressed == null || uncompressed.remaining() == 0) {
-        if (offset == limit) {
+        if (currentOffset == length) {
           return -1;
         }
         readHeader();
@@ -148,7 +202,7 @@ abstract class InStream extends InputStr
     @Override
     public int read(byte[] data, int offset, int length) throws IOException {
       if (uncompressed == null || uncompressed.remaining() == 0) {
-        if (this.offset == this.limit) {
+        if (currentOffset == this.length) {
           return -1;
         }
         readHeader();
@@ -164,7 +218,7 @@ abstract class InStream extends InputStr
     @Override
     public int available() throws IOException {
       if (uncompressed == null || uncompressed.remaining() == 0) {
-        if (offset == limit) {
+        if (currentOffset == length) {
           return 0;
         }
         readHeader();
@@ -174,27 +228,74 @@ abstract class InStream extends InputStr
 
     @Override
     public void close() {
-      array = null;
       uncompressed = null;
-      offset = 0;
+      currentRange = bytes.length;
+      offsetInCompressed = 0;
+      limitInCompressed = 0;
+      currentOffset = length;
     }
 
     @Override
     public void seek(PositionProvider index) throws IOException {
-      offset = base + (int) index.getNext();
-      int uncompBytes = (int) index.getNext();
-      if (uncompBytes != 0) {
+      seek(index.getNext());
+      long uncompressedBytes = index.getNext();
+      if (uncompressedBytes != 0) {
         readHeader();
-        uncompressed.position(uncompressed.position() + uncompBytes);
+        uncompressed.position(uncompressed.position() +
+                              (int) uncompressedBytes);
       } else if (uncompressed != null) {
+        // mark the uncompressed buffer as done
         uncompressed.position(uncompressed.limit());
       }
     }
 
+    private void seek(long desired) throws IOException {
+      for(int i = 0; i < bytes.length; ++i) {
+        if (offsets[i] <= desired &&
+            desired - offsets[i] < bytes[i].remaining()) {
+          currentRange = i;
+          compressed = bytes[i].array();
+          offsetInCompressed = (int) (bytes[i].arrayOffset() +
+              bytes[i].position() + (desired - offsets[i]));
+          currentOffset = desired;
+          limitInCompressed = bytes[i].arrayOffset() + bytes[i].limit();
+          return;
+        }
+      }
+      // if they are seeking to the precise end, go ahead and let them go there
+      int segments = bytes.length;
+      if (segments != 0 &&
+          desired == offsets[segments - 1] + bytes[segments - 1].remaining()) {
+        currentRange = segments - 1;
+        compressed = bytes[currentRange].array();
+        offsetInCompressed = bytes[currentRange].arrayOffset() +
+          bytes[currentRange].limit();
+        currentOffset = desired;
+        limitInCompressed = offsetInCompressed;
+        return;
+      }
+      throw new IOException("Seek outside of data in " + this + " to " +
+        desired);
+    }
+
+    private String rangeString() {
+      StringBuilder builder = new StringBuilder();
+      for(int i=0; i < offsets.length; ++i) {
+        if (i != 0) {
+          builder.append("; ");
+        }
+        builder.append(" range " + i + " = " + offsets[i] + " to " +
+            bytes[i].remaining());
+      }
+      return builder.toString();
+    }
+
     @Override
     public String toString() {
-      return "compressed stream " + name + " base: " + base +
-          " offset: " + offset + " limit: " + limit +
+      return "compressed stream " + name + " position: " + currentOffset +
+          " length: " + length + " range: " + currentRange +
+          " offset: " + offsetInCompressed + " limit: " + limitInCompressed +
+          rangeString() +
           (uncompressed == null ? "" :
               " uncompressed: " + uncompressed.position() + " to " +
                   uncompressed.limit());
@@ -203,14 +304,29 @@ abstract class InStream extends InputStr
 
   public abstract void seek(PositionProvider index) throws IOException;
 
+  /**
+   * Create an input stream from a list of buffers.
+   * @param name the name of the stream
+   * @param input the list of ranges of bytes for the stream
+   * @param offsets a list of offsets (the same length as input) that must
+   *                contain the first offset of each set of bytes in input
+   * @param length the length in bytes of the stream
+   * @param codec the compression codec
+   * @param bufferSize the compression buffer size
+   * @return an input stream
+   * @throws IOException
+   */
   public static InStream create(String name,
-                                ByteBuffer input,
+                                ByteBuffer[] input,
+                                long[] offsets,
+                                long length,
                                 CompressionCodec codec,
                                 int bufferSize) throws IOException {
     if (codec == null) {
-      return new UncompressedStream(name, input);
+      return new UncompressedStream(name, input, offsets, length);
     } else {
-      return new CompressedStream(name, input, codec, bufferSize);
+      return new CompressedStream(name, input, offsets, length, codec,
+          bufferSize);
     }
   }
 }
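
The new factory signature assembles one logical stream from several byte
ranges, which is what lets the reader skip row groups without loading
their bytes. A minimal sketch for an uncompressed stream built from two
contiguous ranges (sizes are illustrative):

    byte[] part1 = new byte[100];  // stream bytes [0, 100)
    byte[] part2 = new byte[50];   // stream bytes [100, 150)
    InStream in = InStream.create("data",
        new ByteBuffer[]{ByteBuffer.wrap(part1), ByteBuffer.wrap(part2)},
        new long[]{0L, 100L},  // offset of each range within the stream
        150L,                  // total stream length
        null,                  // codec == null selects UncompressedStream
        0);

When row groups are skipped the ranges become discontiguous; reads must
then be positioned inside a loaded range via seek(), and seeking into a
gap throws.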

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java?rev=1514438&r1=1514437&r2=1514438&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java Thu Aug 15 19:05:35 2013
@@ -18,13 +18,22 @@
 
 package org.apache.hadoop.hive.ql.io.orc;
 
+import com.google.common.collect.Lists;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.io.InputFormatChecker;
+import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
+import org.apache.hadoop.hive.ql.plan.TableScanDesc;
 import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapred.FileInputFormat;
 import org.apache.hadoop.mapred.FileSplit;
@@ -43,6 +52,8 @@ import java.util.List;
 public class OrcInputFormat  extends FileInputFormat<NullWritable, OrcStruct>
   implements InputFormatChecker {
 
+  private static final Log LOG = LogFactory.getLog(OrcInputFormat.class);
+
   private static class OrcRecordReader
       implements RecordReader<NullWritable, OrcStruct> {
     private final org.apache.hadoop.hive.ql.io.orc.RecordReader reader;
@@ -53,14 +64,38 @@ public class OrcInputFormat  extends Fil
 
     OrcRecordReader(Reader file, Configuration conf,
                     long offset, long length) throws IOException {
-      this.reader = file.rows(offset, length,
-          findIncludedColumns(file.getTypes(), conf));
+      String serializedPushdown = conf.get(TableScanDesc.FILTER_EXPR_CONF_STR);
+      String columnNamesString =
+          conf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR);
+      String[] columnNames = null;
+      SearchArgument sarg = null;
       List<OrcProto.Type> types = file.getTypes();
       if (types.size() == 0) {
         numColumns = 0;
       } else {
         numColumns = types.get(0).getSubtypesCount();
       }
+      columnNames = new String[types.size()];
+      LOG.info("included column ids = " +
+          conf.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "null"));
+      LOG.info("included columns names = " +
+          conf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, "null"));
+      boolean[] includeColumn = findIncludedColumns(types, conf);
+      if (serializedPushdown != null && columnNamesString != null) {
+        sarg = SearchArgument.FACTORY.create
+            (Utilities.deserializeExpression(serializedPushdown, conf));
+        LOG.info("ORC pushdown predicate: " + sarg);
+        String[] neededColumnNames = columnNamesString.split(",");
+        int i = 0;
+        for(int columnId: types.get(0).getSubtypesList()) {
+          if (includeColumn[columnId]) {
+            columnNames[columnId] = neededColumnNames[i++];
+          }
+        }
+      } else {
+        LOG.info("No ORC pushdown predicate");
+      }
+      this.reader = file.rows(offset, length, includeColumn, sarg, columnNames);
       this.offset = offset;
       this.length = length;
     }
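
Pushdown only engages when both configuration keys are present. A minimal
sketch of what the planner side sets (values are illustrative; the keys
are the ones read above):

    jobConf.set(TableScanDesc.FILTER_EXPR_CONF_STR,
        Utilities.serializeExpression(filterExpr));  // serialized predicate
    jobConf.set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, "id,name");

If either key is missing, the reader logs "No ORC pushdown predicate" and
scans every row group.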

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSerde.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSerde.java?rev=1514438&r1=1514437&r2=1514438&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSerde.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSerde.java Thu Aug 15 19:05:35 2013
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hive.ql.io.orc;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.serde2.SerDe;
 import org.apache.hadoop.hive.serde2.SerDeException;
@@ -38,6 +40,9 @@ import java.util.Properties;
  * It transparently passes the object to/from the ORC file reader/writer.
  */
 public class OrcSerde implements SerDe {
+
+  private static final Log LOG = LogFactory.getLog(OrcSerde.class);
+
   private final OrcSerdeRow row = new OrcSerdeRow();
   private ObjectInspector inspector = null;
 
@@ -129,4 +134,5 @@ public class OrcSerde implements SerDe {
   public SerDeStats getSerDeStats() {
     return null;
   }
+
 }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java?rev=1514438&r1=1514437&r2=1514438&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java Thu Aug 15 19:05:35 2013
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hive.ql.io.orc;
 
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 
 import java.io.IOException;
@@ -118,8 +119,26 @@ public interface Reader {
    * @param include true for each column that should be included
    * @return a new RecordReader that will read the specified rows.
    * @throws IOException
+   * @deprecated
    */
   RecordReader rows(long offset, long length,
                     boolean[] include) throws IOException;
 
+  /**
+   * Create a RecordReader that will read a section of a file. It starts reading
+   * at the first stripe after the offset and continues to the stripe that
+   * starts at offset + length. It also accepts a list of columns to read and a
+   * search argument.
+   * @param offset the minimum offset of the first stripe to read
+   * @param length the distance from offset of the first address to stop reading
+   *               at
+   * @param include true for each column that should be included
+   * @param sarg a search argument that limits the rows that should be read.
+   * @param neededColumns the names of the included columns
+   * @return the record reader for the rows
+   */
+  RecordReader rows(long offset, long length,
+                    boolean[] include, SearchArgument sarg,
+                    String[] neededColumns) throws IOException;
+
 }
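
A minimal sketch of the new overload from a split-based caller ('split' is
a hypothetical FileSplit over this file; include, sarg, and neededColumns
are as documented above):

    RecordReader rows = reader.rows(split.getStart(), split.getLength(),
        include, sarg, neededColumns);

Only stripes that start inside [offset, offset + length) are read, so
adjacent splits never read the same stripe twice.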

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java?rev=1514438&r1=1514437&r2=1514438&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java Thu Aug 15 19:05:35 2013
@@ -24,6 +24,7 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.io.Text;
 
@@ -307,7 +308,8 @@ final class ReaderImpl implements Reader
       buffer.position(psOffset - footerSize);
       buffer.limit(psOffset);
     }
-    InputStream instream = InStream.create("footer", buffer, codec, bufferSize);
+    InputStream instream = InStream.create("footer", new ByteBuffer[]{buffer},
+        new long[]{0L}, footerSize, codec, bufferSize);
     footer = OrcProto.Footer.parseFrom(instream);
     inspector = OrcStruct.createObjectInspector(0, footer.getTypesList());
     file.close();
@@ -315,15 +317,22 @@ final class ReaderImpl implements Reader
 
   @Override
   public RecordReader rows(boolean[] include) throws IOException {
-    return rows(0, Long.MAX_VALUE, include);
+    return rows(0, Long.MAX_VALUE, include, null, null);
   }
 
   @Override
   public RecordReader rows(long offset, long length, boolean[] include
                            ) throws IOException {
+    return rows(offset, length, include, null, null);
+  }
+
+  @Override
+  public RecordReader rows(long offset, long length, boolean[] include,
+                           SearchArgument sarg, String[] columnNames
+                           ) throws IOException {
     return new RecordReaderImpl(this.getStripes(), fileSystem,  path, offset,
-      length, footer.getTypesList(), codec, bufferSize,
-      include, footer.getRowIndexStride());
+        length, footer.getTypesList(), codec, bufferSize,
+        include, footer.getRowIndexStride(), sarg, columnNames);
   }
 
 }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java?rev=1514438&r1=1514437&r2=1514438&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java Thu Aug 15 19:05:35 2013
@@ -27,12 +27,18 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.type.HiveDecimal;
 import org.apache.hadoop.hive.ql.io.orc.RunLengthIntegerWriterV2.EncodingType;
+import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgument.TruthValue;
 import org.apache.hadoop.hive.serde2.io.ByteWritable;
 import org.apache.hadoop.hive.serde2.io.DateWritable;
 import org.apache.hadoop.hive.serde2.io.DoubleWritable;
@@ -45,6 +51,9 @@ import org.apache.hadoop.io.LongWritable
 import org.apache.hadoop.io.Text;
 
 class RecordReaderImpl implements RecordReader {
+
+  private static final Log LOG = LogFactory.getLog(RecordReaderImpl.class);
+
   private final FSDataInputStream file;
   private final long firstRow;
   private final List<StripeInformation> stripes =
@@ -52,17 +61,25 @@ class RecordReaderImpl implements Record
   private OrcProto.StripeFooter stripeFooter;
   private final long totalRowCount;
   private final CompressionCodec codec;
+  private final List<OrcProto.Type> types;
   private final int bufferSize;
   private final boolean[] included;
   private final long rowIndexStride;
   private long rowInStripe = 0;
-  private int currentStripe = 0;
+  private int currentStripe = -1;
   private long rowBaseInStripe = 0;
   private long rowCountInStripe = 0;
   private final Map<StreamName, InStream> streams =
       new HashMap<StreamName, InStream>();
   private final TreeReader reader;
   private final OrcProto.RowIndex[] indexes;
+  private final SearchArgument sarg;
+  // the leaf predicates for the sarg
+  private final List<PredicateLeaf> sargLeaves;
+  // an array the same length as the sargLeaves that map them to column ids
+  private final int[] filterColumns;
+  // an array about which row groups aren't skipped
+  private boolean[] includedRowGroups = null;
 
   RecordReaderImpl(Iterable<StripeInformation> stripes,
                    FileSystem fileSystem,
@@ -72,12 +89,27 @@ class RecordReaderImpl implements Record
                    CompressionCodec codec,
                    int bufferSize,
                    boolean[] included,
-                   long strideRate
+                   long strideRate,
+                   SearchArgument sarg,
+                   String[] columnNames
                   ) throws IOException {
     this.file = fileSystem.open(path);
     this.codec = codec;
+    this.types = types;
     this.bufferSize = bufferSize;
     this.included = included;
+    this.sarg = sarg;
+    if (sarg != null) {
+      sargLeaves = sarg.getLeaves();
+      filterColumns = new int[sargLeaves.size()];
+      for(int i=0; i < filterColumns.length; ++i) {
+        String colName = sargLeaves.get(i).getColumnName();
+        filterColumns[i] = findColumns(columnNames, colName);
+      }
+    } else {
+      sargLeaves = null;
+      filterColumns = null;
+    }
     long rows = 0;
     long skippedRows = 0;
     for(StripeInformation stripe: stripes) {
@@ -94,9 +126,17 @@ class RecordReaderImpl implements Record
     reader = createTreeReader(path, 0, types, included);
     indexes = new OrcProto.RowIndex[types.size()];
     rowIndexStride = strideRate;
-    if (this.stripes.size() > 0) {
-      readStripe();
+    advanceToNextRow(0L);
+  }
+
+  private static int findColumns(String[] columnNames,
+                                 String columnName) {
+    for(int i=0; i < columnNames.length; ++i) {
+      if (columnName.equals(columnNames[i])) {
+        return i;
+      }
     }
+    return -1;
   }
 
   private static final class PositionProviderImpl implements PositionProvider {
@@ -1418,113 +1458,707 @@ class RecordReaderImpl implements Record
     ByteBuffer tailBuf = ByteBuffer.allocate(tailLength);
     file.seek(offset);
     file.readFully(tailBuf.array(), tailBuf.arrayOffset(), tailLength);
-    return OrcProto.StripeFooter.parseFrom(InStream.create("footer", tailBuf,
-      codec, bufferSize));
+    return OrcProto.StripeFooter.parseFrom(InStream.create("footer",
+        new ByteBuffer[]{tailBuf}, new long[]{0}, tailLength, codec,
+        bufferSize));
   }
 
-  private void readStripe() throws IOException {
-    StripeInformation stripe = stripes.get(currentStripe);
-    stripeFooter = readStripeFooter(stripe);
-    long offset = stripe.getOffset();
-    streams.clear();
+  static enum Location {
+    BEFORE, MIN, MIDDLE, MAX, AFTER
+  }
 
-    // if we aren't projecting columns, just read the whole stripe
-    if (included == null) {
-      byte[] buffer =
-        new byte[(int) (stripe.getDataLength())];
-      file.seek(offset + stripe.getIndexLength());
-      file.readFully(buffer, 0, buffer.length);
-      int sectionOffset = 0;
-      for(OrcProto.Stream section: stripeFooter.getStreamsList()) {
-        if (StreamName.getArea(section.getKind()) == StreamName.Area.DATA) {
-          int sectionLength = (int) section.getLength();
-          ByteBuffer sectionBuffer = ByteBuffer.wrap(buffer, sectionOffset,
-              sectionLength);
-          StreamName name = new StreamName(section.getColumn(),
-              section.getKind());
-          streams.put(name,
-              InStream.create(name.toString(), sectionBuffer, codec,
-                  bufferSize));
-          sectionOffset += sectionLength;
-        }
+  /**
+   * Given a point and min and max, determine if the point is before, at the
+   * min, in the middle, at the max, or after the range.
+   * @param point the point to test
+   * @param min the minimum point
+   * @param max the maximum point
+   * @param <T> the type of the comparision
+   * @return the location of the point
+   */
+  static <T> Location compareToRange(Comparable<T> point, T min, T max) {
+    int minCompare = point.compareTo(min);
+    if (minCompare < 0) {
+      return Location.BEFORE;
+    } else if (minCompare == 0) {
+      return Location.MIN;
+    }
+    int maxCompare = point.compareTo(max);
+    if (maxCompare > 0) {
+      return Location.AFTER;
+    } else if (maxCompare == 0) {
+      return Location.MAX;
+    }
+    return Location.MIDDLE;
+  }
+
+  /**
+   * Get the minimum value out of an index entry.
+   * @param index the index entry
+   * @return the object for the minimum value or null if there isn't one
+   */
+  static Object getMin(OrcProto.ColumnStatistics index) {
+    if (index.hasIntStatistics()) {
+      OrcProto.IntegerStatistics stat = index.getIntStatistics();
+      if (stat.hasMinimum()) {
+        return stat.getMinimum();
       }
-    } else {
-      List<OrcProto.Stream> streamList = stripeFooter.getStreamsList();
-      // the index of the current section
-      int currentSection = 0;
-      while (currentSection < streamList.size() &&
-          StreamName.getArea(streamList.get(currentSection).getKind()) !=
-              StreamName.Area.DATA) {
-        currentSection += 1;
-      }
-      // byte position of the current section relative to the stripe start
-      long sectionOffset = stripe.getIndexLength();
-      while (currentSection < streamList.size()) {
-        int bytes = 0;
-
-        // find the first section that shouldn't be read
-        int excluded=currentSection;
-        while (excluded < streamList.size() &&
-               included[streamList.get(excluded).getColumn()]) {
-          bytes += streamList.get(excluded).getLength();
-          excluded += 1;
-        }
-
-        // actually read the bytes as a big chunk
-        if (bytes != 0) {
-          byte[] buffer = new byte[bytes];
-          file.seek(offset + sectionOffset);
-          file.readFully(buffer, 0, bytes);
-          sectionOffset += bytes;
-
-          // create the streams for the sections we just read
-          bytes = 0;
-          while (currentSection < excluded) {
-            OrcProto.Stream section = streamList.get(currentSection);
-            StreamName name =
-              new StreamName(section.getColumn(), section.getKind());
-            this.streams.put(name,
-                InStream.create(name.toString(),
-                    ByteBuffer.wrap(buffer, bytes,
-                        (int) section.getLength()), codec, bufferSize));
-            currentSection += 1;
-            bytes += section.getLength();
+    }
+    if (index.hasStringStatistics()) {
+      OrcProto.StringStatistics stat = index.getStringStatistics();
+      if (stat.hasMinimum()) {
+        return stat.getMinimum();
+      }
+    }
+    if (index.hasDoubleStatistics()) {
+      OrcProto.DoubleStatistics stat = index.getDoubleStatistics();
+      if (stat.hasMinimum()) {
+        return stat.getMinimum();
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Get the maximum value out of an index entry.
+   * @param index the index entry
+   * @return the object for the maximum value or null if there isn't one
+   */
+  static Object getMax(OrcProto.ColumnStatistics index) {
+    if (index.hasIntStatistics()) {
+      OrcProto.IntegerStatistics stat = index.getIntStatistics();
+      if (stat.hasMaximum()) {
+        return stat.getMaximum();
+      }
+    }
+    if (index.hasStringStatistics()) {
+      OrcProto.StringStatistics stat = index.getStringStatistics();
+      if (stat.hasMaximum()) {
+        return stat.getMaximum();
+      }
+    }
+    if (index.hasDoubleStatistics()) {
+      OrcProto.DoubleStatistics stat = index.getDoubleStatistics();
+      if (stat.hasMaximum()) {
+        return stat.getMaximum();
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Evaluate a predicate with respect to the statistics from the column
+   * that is referenced in the predicate.
+   * @param index the statistics for the column mentioned in the predicate
+   * @param predicate the leaf predicate we need to evaluate
+   * @return the set of truth values that may be returned for the given
+   *   predicate.
+   */
+  static TruthValue evaluatePredicate(OrcProto.ColumnStatistics index,
+                               PredicateLeaf predicate) {
+    Object minValue = getMin(index);
+    // if we didn't have any values, everything must have been null
+    if (minValue == null) {
+      if (predicate.getOperator() == PredicateLeaf.Operator.IS_NULL) {
+        return TruthValue.YES;
+      } else {
+        return TruthValue.NULL;
+      }
+    }
+    Object maxValue = getMax(index);
+    Location loc;
+    switch (predicate.getOperator()) {
+      case NULL_SAFE_EQUALS:
+        loc = compareToRange((Comparable) predicate.getLiteral(),
+            minValue, maxValue);
+        if (loc == Location.BEFORE || loc == Location.AFTER) {
+          return TruthValue.NO;
+        } else {
+          return TruthValue.YES_NO;
+        }
+      case EQUALS:
+        loc = compareToRange((Comparable) predicate.getLiteral(),
+            minValue, maxValue);
+        if (minValue.equals(maxValue) && loc == Location.MIN) {
+          return TruthValue.YES_NULL;
+        } else if (loc == Location.BEFORE || loc == Location.AFTER) {
+          return TruthValue.NO_NULL;
+        } else {
+          return TruthValue.YES_NO_NULL;
+        }
+      case LESS_THAN:
+        loc = compareToRange((Comparable) predicate.getLiteral(),
+            minValue, maxValue);
+        if (loc == Location.AFTER) {
+          return TruthValue.YES_NULL;
+        } else if (loc == Location.BEFORE || loc == Location.MIN) {
+          return TruthValue.NO_NULL;
+        } else {
+          return TruthValue.YES_NO_NULL;
+        }
+      case LESS_THAN_EQUALS:
+        loc = compareToRange((Comparable) predicate.getLiteral(),
+            minValue, maxValue);
+        if (loc == Location.AFTER || loc == Location.MAX) {
+          return TruthValue.YES_NULL;
+        } else if (loc == Location.BEFORE) {
+          return TruthValue.NO_NULL;
+        } else {
+          return TruthValue.YES_NO_NULL;
+        }
+      case IN:
+        if (minValue.equals(maxValue)) {
+          // for a single value, look through to see if that value is in the
+          // set
+          for(Object arg: predicate.getLiteralList()) {
+            loc = compareToRange((Comparable) arg, minValue, maxValue);
+            if (loc == Location.MIN) {
+              return TruthValue.YES_NULL;
+            }
+          }
+          return TruthValue.NO_NULL;
+        } else {
+          // are all of the values outside of the range?
+          for(Object arg: predicate.getLiteralList()) {
+            loc = compareToRange((Comparable) arg, minValue, maxValue);
+            if (loc == Location.MIN || loc == Location.MIDDLE ||
+                loc == Location.MAX) {
+              return TruthValue.YES_NO_NULL;
+            }
           }
+          return TruthValue.NO_NULL;
         }
-
-        // skip forward until we get back to a section that we need
-        while (currentSection < streamList.size() &&
-               !included[streamList.get(currentSection).getColumn()]) {
-          sectionOffset += streamList.get(currentSection).getLength();
-          currentSection += 1;
+      case BETWEEN:
+        List<Object> args = predicate.getLiteralList();
+        loc = compareToRange((Comparable) args.get(0), minValue, maxValue);
+        if (loc == Location.BEFORE || loc == Location.MIN) {
+          Location loc2 = compareToRange((Comparable) args.get(1), minValue,
+              maxValue);
+          if (loc2 == Location.AFTER || loc2 == Location.MAX) {
+            return TruthValue.YES_NULL;
+          } else if (loc2 == Location.BEFORE) {
+            return TruthValue.NO_NULL;
+          } else {
+            return TruthValue.YES_NO_NULL;
+          }
+        } else if (loc == Location.AFTER) {
+          return TruthValue.NO_NULL;
+        } else {
+          return TruthValue.YES_NO_NULL;
         }
+      case IS_NULL:
+        return TruthValue.YES_NO;
+      default:
+        return TruthValue.YES_NO_NULL;
+    }
+  }
+
+  /**
+   * Pick the row groups that we need to load from the current stripe.
+   * @return an array with a boolean for each row group or null if all of the
+   *    row groups must be read.
+   * @throws IOException
+   */
+  private boolean[] pickRowGroups() throws IOException {
+    // if we don't have a sarg or indexes, we read everything
+    if (sarg == null || rowIndexStride == 0) {
+      return null;
+    }
+    readRowIndex();
+    long rowsInStripe = stripes.get(currentStripe).getNumberOfRows();
+    int groupsInStripe = (int) ((rowsInStripe + rowIndexStride - 1) /
+        rowIndexStride);
+    boolean[] result = new boolean[groupsInStripe];
+    TruthValue[] leafValues = new TruthValue[sargLeaves.size()];
+    for(int rowGroup=0; rowGroup < result.length; ++rowGroup) {
+      for(int pred=0; pred < leafValues.length; ++pred) {
+        OrcProto.ColumnStatistics stats =
+            indexes[filterColumns[pred]].getEntry(rowGroup).getStatistics();
+        leafValues[pred] = evaluatePredicate(stats, sargLeaves.get(pred));
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Stats = " + stats);
+          LOG.debug("Setting " + sargLeaves.get(pred) + " to " +
+              leafValues[pred]);
+        }
+      }
+      result[rowGroup] = sarg.evaluate(leafValues).isNeeded();
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Row group " + (rowIndexStride * rowGroup) + " to " +
+            (rowIndexStride * (rowGroup+1) - 1) + " is " +
+            (result[rowGroup] ? "" : "not ") + "included.");
       }
     }
-    reader.startStripe(streams, stripeFooter.getColumnsList());
-    rowInStripe = 0;
+
+    // if we found something to skip, use the array. otherwise, return null.
+    for(boolean b: result) {
+      if (!b) {
+        return result;
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Read the current stripe into memory.
+   * @throws IOException
+   */
+  private void readStripe() throws IOException {
+    StripeInformation stripe = stripes.get(currentStripe);
+    stripeFooter = readStripeFooter(stripe);
+    streams.clear();
+    // setup the position in the stripe
     rowCountInStripe = stripe.getNumberOfRows();
+    rowInStripe = 0;
     rowBaseInStripe = 0;
     for(int i=0; i < currentStripe; ++i) {
       rowBaseInStripe += stripes.get(i).getNumberOfRows();
     }
+    // reset all of the indexes
     for(int i=0; i < indexes.length; ++i) {
       indexes[i] = null;
     }
+    includedRowGroups = pickRowGroups();
+
+    // move forward to the first unskipped row
+    if (includedRowGroups != null) {
+      while (rowInStripe < rowCountInStripe &&
+             !includedRowGroups[(int) (rowInStripe / rowIndexStride)]) {
+        rowInStripe = Math.min(rowCountInStripe, rowInStripe + rowIndexStride);
+      }
+    }
+
+    // if we haven't skipped the whole stripe, read the data
+    if (rowInStripe < rowCountInStripe) {
+      // if we aren't projecting columns or filtering rows, just read it all
+      if (included == null && includedRowGroups == null) {
+        readAllDataStreams(stripe);
+      } else {
+        readPartialDataStreams(stripe);
+      }
+      reader.startStripe(streams, stripeFooter.getColumnsList());
+      // if we skipped the first row group, move the pointers forward
+      if (rowInStripe != 0) {
+        seekToRowEntry((int) (rowInStripe / rowIndexStride));
+      }
+    }
+  }
+
+  private void readAllDataStreams(StripeInformation stripe
+                                  ) throws IOException {
+    byte[] buffer =
+      new byte[(int) (stripe.getDataLength())];
+    file.seek(stripe.getOffset() + stripe.getIndexLength());
+    file.readFully(buffer, 0, buffer.length);
+    int sectionOffset = 0;
+    for(OrcProto.Stream section: stripeFooter.getStreamsList()) {
+      if (StreamName.getArea(section.getKind()) == StreamName.Area.DATA) {
+        int sectionLength = (int) section.getLength();
+        ByteBuffer sectionBuffer = ByteBuffer.wrap(buffer, sectionOffset,
+            sectionLength);
+        StreamName name = new StreamName(section.getColumn(),
+            section.getKind());
+        streams.put(name,
+            InStream.create(name.toString(), new ByteBuffer[]{sectionBuffer},
+                new long[]{0}, sectionLength, codec, bufferSize));
+        sectionOffset += sectionLength;
+      }
+    }
+  }
+
+  /**
+   * The sections of the stripe that we need to read.
+   */
+  static class DiskRange {
+    /** the first address we need to read. */
+    long offset;
+    /** the first address afterwards. */
+    long end;
+
+    DiskRange(long offset, long end) {
+      this.offset = offset;
+      this.end = end;
+      if (end < offset) {
+        throw new IllegalArgumentException("invalid range " + this);
+      }
+    }
+
+    @Override
+    public boolean equals(Object other) {
+      if (other == null || other.getClass() != getClass()) {
+        return false;
+      }
+      DiskRange otherR = (DiskRange) other;
+      return otherR.offset == offset && otherR.end == end;
+    }
+
+    @Override
+    public String toString() {
+      return "range start: " + offset + " end: " + end;
+    }
+  }
+
+  private static final int BYTE_STREAM_POSITIONS = 1;
+  private static final int RUN_LENGTH_BYTE_POSITIONS =
+      BYTE_STREAM_POSITIONS + 1;
+  private static final int BITFIELD_POSITIONS = RUN_LENGTH_BYTE_POSITIONS + 1;
+  private static final int RUN_LENGTH_INT_POSITIONS =
+    BYTE_STREAM_POSITIONS + 1;
+
+  /**
+   * Get the offset in the index positions for the column at which the
+   * given stream starts.
+   * @param encoding the encoding of the column
+   * @param type the type of the column
+   * @param stream the kind of the stream
+   * @param isCompressed is the file compressed
+   * @param hasNulls does the column have a PRESENT stream?
+   * @return the number of positions that will be used for that stream
+   */
+  static int getIndexPosition(OrcProto.ColumnEncoding.Kind encoding,
+                              OrcProto.Type.Kind type,
+                              OrcProto.Stream.Kind stream,
+                              boolean isCompressed,
+                              boolean hasNulls) {
+    if (stream == OrcProto.Stream.Kind.PRESENT) {
+      return 0;
+    }
+    int compressionValue = isCompressed ? 1 : 0;
+    int base = hasNulls ? (BITFIELD_POSITIONS + compressionValue) : 0;
+    switch (type) {
+      case BOOLEAN:
+      case BYTE:
+      case SHORT:
+      case INT:
+      case LONG:
+      case FLOAT:
+      case DOUBLE:
+      case STRUCT:
+      case MAP:
+      case LIST:
+      case UNION:
+        return base;
+      case STRING:
+        if (encoding == OrcProto.ColumnEncoding.Kind.DICTIONARY ||
+            encoding == OrcProto.ColumnEncoding.Kind.DICTIONARY_V2) {
+          return base;
+        } else {
+          if (stream == OrcProto.Stream.Kind.DATA) {
+            return base;
+          } else {
+            return base + BYTE_STREAM_POSITIONS + compressionValue;
+          }
+        }
+      case BINARY:
+        if (stream == OrcProto.Stream.Kind.DATA) {
+          return base;
+        }
+        return base + BYTE_STREAM_POSITIONS + compressionValue;
+      case DECIMAL:
+        if (stream == OrcProto.Stream.Kind.DATA) {
+          return base;
+        }
+        return base + BYTE_STREAM_POSITIONS + compressionValue;
+      case TIMESTAMP:
+        if (stream == OrcProto.Stream.Kind.DATA) {
+          return base;
+        }
+        return base + RUN_LENGTH_INT_POSITIONS + compressionValue;
+      default:
+        throw new IllegalArgumentException("Unknown type " + type);
+    }
+  }
+
+  // for uncompressed streams, what is the most overlap with the following set
+  // of rows (long vint literal group).
+  static final int WORST_UNCOMPRESSED_SLOP = 2 + 8 * 512;
+
+  /**
+   * Is this stream part of a dictionary?
+   * @return is this part of a dictionary?
+   */
+  static boolean isDictionary(OrcProto.Stream.Kind kind,
+                              OrcProto.ColumnEncoding encoding) {
+    OrcProto.ColumnEncoding.Kind encodingKind = encoding.getKind();
+    return kind == OrcProto.Stream.Kind.DICTIONARY_DATA ||
+      (kind == OrcProto.Stream.Kind.LENGTH &&
+       (encodingKind == OrcProto.ColumnEncoding.Kind.DICTIONARY ||
+        encodingKind == OrcProto.ColumnEncoding.Kind.DICTIONARY_V2));
+  }
+
+  /**
+   * Plan the ranges of the file that we need to read given the list of
+   * columns and row groups.
+   * @param streamList the list of streams available
+   * @param indexes the indexes that have been loaded
+   * @param includedColumns which columns are needed
+   * @param includedRowGroups which row groups are needed
+   * @param isCompressed does the file have generic compression
+   * @param encodings the encodings for each column
+   * @param types the types of the columns
+   * @param compressionSize the compression block size
+   * @return the list of disk ranges that will be loaded
+   */
+  static List<DiskRange> planReadPartialDataStreams
+      (List<OrcProto.Stream> streamList,
+       OrcProto.RowIndex[] indexes,
+       boolean[] includedColumns,
+       boolean[] includedRowGroups,
+       boolean isCompressed,
+       List<OrcProto.ColumnEncoding> encodings,
+       List<OrcProto.Type> types,
+       int compressionSize) {
+    List<DiskRange> result = new ArrayList<DiskRange>();
+    long offset = 0;
+    // figure out which columns have a present stream
+    boolean[] hasNull = new boolean[types.size()];
+    for(OrcProto.Stream stream: streamList) {
+      if (stream.getKind() == OrcProto.Stream.Kind.PRESENT) {
+        hasNull[stream.getColumn()] = true;
+      }
+    }
+    for(OrcProto.Stream stream: streamList) {
+      long length = stream.getLength();
+      int column = stream.getColumn();
+      OrcProto.Stream.Kind streamKind = stream.getKind();
+      if (StreamName.getArea(streamKind) == StreamName.Area.DATA &&
+          includedColumns[column]) {
+        // if we aren't filtering or it is a dictionary, load it.
+        if (includedRowGroups == null ||
+            isDictionary(streamKind, encodings.get(column))) {
+          result.add(new DiskRange(offset, offset + length));
+        } else {
+          for(int group=0; group < includedRowGroups.length; ++group) {
+            if (includedRowGroups[group]) {
+              int posn = getIndexPosition(encodings.get(column).getKind(),
+                  types.get(column).getKind(), stream.getKind(), isCompressed,
+                  hasNull[column]);
+              long start = indexes[column].getEntry(group).getPositions(posn);
+              // figure out the worst case last location
+              long end = (group == includedRowGroups.length - 1) ?
+                  length : Math.min(length,
+                                    indexes[column].getEntry(group + 1)
+                                        .getPositions(posn)
+                                        + (isCompressed ?
+                                            (OutStream.HEADER_SIZE
+                                              + compressionSize) :
+                                            WORST_UNCOMPRESSED_SLOP));
+              result.add(new DiskRange(offset + start, offset + end));
+            }
+          }
+        }
+      }
+      offset += length;
+    }
+    return result;
+  }
+
+  /**
+   * Update the disk ranges to collapse adjacent or overlapping ranges. It
+   * assumes that the ranges are sorted.
+   * @param ranges the list of disk ranges to merge
+   */
+  static void mergeDiskRanges(List<DiskRange> ranges) {
+    DiskRange prev = null;
+    for(int i=0; i < ranges.size(); ++i) {
+      DiskRange current = ranges.get(i);
+      if (prev != null && overlap(prev.offset, prev.end,
+          current.offset, current.end)) {
+        prev.offset = Math.min(prev.offset, current.offset);
+        prev.end = Math.max(prev.end, current.end);
+        ranges.remove(i);
+        i -= 1;
+      } else {
+        prev = current;
+      }
+    }
+  }
+
+  /**
+   * Read the list of ranges from the file.
+   * @param file the file to read
+   * @param base the base of the stripe
+   * @param ranges the disk ranges within the stripe to read
+   * @return the bytes read for each disk range, which is the same length as
+   *    ranges
+   * @throws IOException
+   */
+  static byte[][] readDiskRanges(FSDataInputStream file,
+                                 long base,
+                                 List<DiskRange> ranges) throws IOException {
+    byte[][] result = new byte[ranges.size()][];
+    int i = 0;
+    for(DiskRange range: ranges) {
+      int len = (int) (range.end - range.offset);
+      result[i] = new byte[len];
+      file.seek(base + range.offset);
+      file.readFully(result[i]);
+      i += 1;
+    }
+    return result;
+  }
+
+  /**
+   * Does region A overlap region B? The end points are inclusive on both sides.
+   * @param leftA A's left point
+   * @param rightA A's right point
+   * @param leftB B's left point
+   * @param rightB B's right point
+   * @return Does region A overlap region B?
+   */
+  static boolean overlap(long leftA, long rightA, long leftB, long rightB) {
+    if (leftA <= leftB) {
+      return rightA >= leftB;
+    }
+    return rightB >= leftA;
+  }
+
+  /**
+   * Build a string representation of a list of disk ranges.
+   * @param ranges ranges to stringify
+   * @return the resulting string
+   */
+  static String stringifyDiskRanges(List<DiskRange> ranges) {
+    StringBuilder buffer = new StringBuilder();
+    buffer.append("[");
+    for(int i=0; i < ranges.size(); ++i) {
+      if (i != 0) {
+        buffer.append(", ");
+      }
+      buffer.append(ranges.get(i).toString());
+    }
+    buffer.append("]");
+    return buffer.toString();
+  }
+
+  static void createStreams(List<OrcProto.Stream> streamDescriptions,
+                            List<DiskRange> ranges,
+                            byte[][] bytes,
+                            boolean[] includeColumn,
+                            CompressionCodec codec,
+                            int bufferSize,
+                            Map<StreamName, InStream> streams
+                           ) throws IOException {
+    long offset = 0;
+    for(OrcProto.Stream streamDesc: streamDescriptions) {
+      int column = streamDesc.getColumn();
+      if (includeColumn[column] &&
+          StreamName.getArea(streamDesc.getKind()) == StreamName.Area.DATA) {
+        long length = streamDesc.getLength();
+        int first = -1;
+        int last = -2;
+        for(int i=0; i < bytes.length; ++i) {
+          DiskRange range = ranges.get(i);
+          if (overlap(offset, offset+length, range.offset, range.end)) {
+            if (first == -1) {
+              first = i;
+            }
+            last = i;
+          }
+        }
+        ByteBuffer[] buffers = new ByteBuffer[last - first + 1];
+        long[] offsets = new long[last - first + 1];
+        for(int i=0; i < buffers.length; ++i) {
+          DiskRange range = ranges.get(i + first);
+          long start = Math.max(range.offset, offset);
+          long end = Math.min(range.end, offset+length);
+          buffers[i] = ByteBuffer.wrap(bytes[first + i],
+              Math.max(0, (int) (offset - range.offset)), (int) (end - start));
+          offsets[i] = Math.max(0, range.offset - offset);
+        }
+        StreamName name = new StreamName(column, streamDesc.getKind());
+        streams.put(name, InStream.create(name.toString(), buffers, offsets,
+            length, codec, bufferSize));
+      }
+      offset += streamDesc.getLength();
+    }
+  }
+
+  private void readPartialDataStreams(StripeInformation stripe
+                                      ) throws IOException {
+    List<OrcProto.Stream> streamList = stripeFooter.getStreamsList();
+    List<DiskRange> chunks =
+        planReadPartialDataStreams(streamList,
+            indexes, included, includedRowGroups, codec != null,
+            stripeFooter.getColumnsList(), types, bufferSize);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("chunks = " + stringifyDiskRanges(chunks));
+    }
+    mergeDiskRanges(chunks);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("merge = " + stringifyDiskRanges(chunks));
+    }
+    byte[][] bytes = readDiskRanges(file, stripe.getOffset(), chunks);
+    createStreams(streamList, chunks, bytes, included, codec, bufferSize,
+        streams);
   }
 
   @Override
   public boolean hasNext() throws IOException {
-    return rowInStripe < rowCountInStripe || currentStripe < stripes.size() - 1;
+    return rowInStripe < rowCountInStripe;
   }
 
-  @Override
-  public Object next(Object previous) throws IOException {
-    if (rowInStripe >= rowCountInStripe) {
+  /**
+   * Read stripes until we find a row that we don't skip.
+   * @throws IOException
+   */
+  private void advanceStripe() throws IOException {
+    rowInStripe = rowCountInStripe;
+    while (rowInStripe >= rowCountInStripe &&
+        currentStripe < stripes.size() - 1) {
       currentStripe += 1;
       readStripe();
     }
+  }
+
+  /**
+   * Skip over rows that we aren't selecting, so that the next row is
+   * one that we will read.
+   * @param nextRow the row we want to go to
+   * @throws IOException
+   */
+  private void advanceToNextRow(long nextRow) throws IOException {
+    long nextRowInStripe = nextRow - rowBaseInStripe;
+    // check for row skipping
+    if (rowIndexStride != 0 &&
+        includedRowGroups != null &&
+        nextRowInStripe < rowCountInStripe) {
+      int rowGroup = (int) (nextRowInStripe / rowIndexStride);
+      if (!includedRowGroups[rowGroup]) {
+        while (rowGroup < includedRowGroups.length &&
+               !includedRowGroups[rowGroup]) {
+          rowGroup += 1;
+        }
+        // if we are off the end of the stripe, just move stripes
+        if (rowGroup >= includedRowGroups.length) {
+          advanceStripe();
+          return;
+        }
+        nextRowInStripe = Math.min(rowCountInStripe, rowGroup * rowIndexStride);
+      }
+    }
+    if (nextRowInStripe < rowCountInStripe) {
+      if (nextRowInStripe != rowInStripe) {
+        if (rowIndexStride != 0) {
+          int rowGroup = (int) (nextRowInStripe / rowIndexStride);
+          seekToRowEntry(rowGroup);
+          reader.skipRows(nextRowInStripe - rowGroup * rowIndexStride);
+        } else {
+          reader.skipRows(nextRowInStripe - rowInStripe);
+        }
+        rowInStripe = nextRowInStripe;
+      }
+    } else {
+      advanceStripe();
+    }
+  }
+
+  @Override
+  public Object next(Object previous) throws IOException {
+    Object result = reader.next(previous);
+    // find the next row
     rowInStripe += 1;
-    return reader.next(previous);
+    advanceToNextRow(rowInStripe + rowBaseInStripe);
+    return result;
   }
 
   @Override
@@ -1548,14 +2182,6 @@ class RecordReaderImpl implements Record
   }
 
   private int findStripe(long rowNumber) {
-    if (rowNumber < 0) {
-      throw new IllegalArgumentException("Seek to a negative row number " +
-          rowNumber);
-    } else if (rowNumber < firstRow) {
-      throw new IllegalArgumentException("Seek before reader range " +
-          rowNumber);
-    }
-    rowNumber -= firstRow;
     for(int i=0; i < stripes.size(); i++) {
       StripeInformation stripe = stripes.get(i);
       if (stripe.getNumberOfRows() > rowNumber) {
@@ -1576,7 +2202,8 @@ class RecordReaderImpl implements Record
           file.seek(offset);
           file.readFully(buffer);
           indexes[col] = OrcProto.RowIndex.parseFrom(InStream.create("index",
-              ByteBuffer.wrap(buffer), codec, bufferSize));
+              new ByteBuffer[] {ByteBuffer.wrap(buffer)}, new long[]{0},
+              stream.getLength(), codec, bufferSize));
         }
       }
       offset += stream.getLength();
@@ -1596,19 +2223,25 @@ class RecordReaderImpl implements Record
 
   @Override
   public void seekToRow(long rowNumber) throws IOException {
+    if (rowNumber < 0) {
+      throw new IllegalArgumentException("Seek to a negative row number " +
+                                         rowNumber);
+    } else if (rowNumber < firstRow) {
+      throw new IllegalArgumentException("Seek before reader range " +
+                                         rowNumber);
+    }
+    // convert to our internal form (rows from the beginning of the slice)
+    rowNumber -= firstRow;
+
+    // move to the right stripe
     int rightStripe = findStripe(rowNumber);
     if (rightStripe != currentStripe) {
       currentStripe = rightStripe;
       readStripe();
     }
     readRowIndex();
-    rowInStripe = rowNumber - rowBaseInStripe - firstRow;
-    if (rowIndexStride != 0) {
-      long entry = rowInStripe / rowIndexStride;
-      seekToRowEntry((int) entry);
-      reader.skipRows(rowInStripe - entry * rowIndexStride);
-    } else {
-      reader.skipRows(rowInStripe);
-    }
+
+    // if we aren't at the right row yet, advance within the stripe.
+    advanceToNextRow(rowNumber);
   }
 }

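For illustration, the range-merging logic added above treats both end points as inclusive, so ranges that merely touch are coalesced along with ranges that overlap. Below is a minimal standalone sketch of the same behavior; the DiskRange fields mirror the patch, while the demo class and main method are illustrative only:

    import java.util.ArrayList;
    import java.util.List;

    public class MergeRangesSketch {
      // mirrors RecordReaderImpl.DiskRange: a byte range within the stripe
      static class DiskRange {
        long offset;
        long end;
        DiskRange(long offset, long end) { this.offset = offset; this.end = end; }
        @Override
        public String toString() { return "[" + offset + ", " + end + ")"; }
      }

      // inclusive on both ends, as in RecordReaderImpl.overlap
      static boolean overlap(long leftA, long rightA, long leftB, long rightB) {
        if (leftA <= leftB) {
          return rightA >= leftB;
        }
        return rightB >= leftA;
      }

      // collapse sorted adjacent or overlapping ranges in place
      static void mergeDiskRanges(List<DiskRange> ranges) {
        DiskRange prev = null;
        for (int i = 0; i < ranges.size(); ++i) {
          DiskRange current = ranges.get(i);
          if (prev != null &&
              overlap(prev.offset, prev.end, current.offset, current.end)) {
            prev.offset = Math.min(prev.offset, current.offset);
            prev.end = Math.max(prev.end, current.end);
            ranges.remove(i);
            i -= 1;
          } else {
            prev = current;
          }
        }
      }

      public static void main(String[] args) {
        List<DiskRange> ranges = new ArrayList<DiskRange>();
        ranges.add(new DiskRange(0, 100));
        ranges.add(new DiskRange(100, 250));  // touches the first range: merged
        ranges.add(new DiskRange(200, 300));  // overlaps: merged as well
        ranges.add(new DiskRange(400, 500));  // disjoint: left alone
        mergeDiskRanges(ranges);
        System.out.println(ranges);           // [[0, 300), [400, 500)]
      }
    }

Fewer, larger reads mean fewer seeks against HDFS, which is the point of merging before readDiskRanges issues the actual I/O.
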
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthByteReader.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthByteReader.java?rev=1514438&r1=1514437&r2=1514438&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthByteReader.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthByteReader.java Thu Aug 15 19:05:35 2013
@@ -57,7 +57,7 @@ class RunLengthByteReader {
       while (bytes < numLiterals) {
         int result = input.read(literals, bytes, numLiterals - bytes);
         if (result == -1) {
-          throw new EOFException("Reading RLE byte literal got EOF");
+          throw new EOFException("Reading RLE byte literal got EOF in " + this);
         }
         bytes += result;
       }
@@ -108,4 +108,10 @@ class RunLengthByteReader {
       items -= consume;
     }
   }
+
+  @Override
+  public String toString() {
+    return "byte rle " + (repeat ? "repeat" : "literal") + " used: " +
+        used + "/" + numLiterals + " from " + input;
+  }
 }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/sarg/SearchArgument.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/sarg/SearchArgument.java?rev=1514438&r1=1514437&r2=1514438&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/sarg/SearchArgument.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/sarg/SearchArgument.java Thu Aug 15 19:05:35 2013
@@ -134,6 +134,21 @@ public interface SearchArgument {
           throw new IllegalArgumentException("Unknown value: " + this);
       }
     }
+
+    /**
+     * Does the RecordReader need to include this set of records?
+     * @return true unless none of the rows qualify
+     */
+    public boolean isNotNeeded() {
+      switch (this) {
+        case NO:
+        case NULL:
+        case NO_NULL:
+          return false;
+        default:
+          return true;
+      }
+    }
   }
 
   /**

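To make the new method concrete: a reader evaluates the predicate against each row group's statistics to get a TruthValue, and keeps the group whenever isNotNeeded() returns true; only NO, NULL, and NO_NULL prove that no row in the group can qualify. A self-contained sketch (the enum mirrors SearchArgument.TruthValue; the per-group results here are made up for the demo):

    import java.util.Arrays;

    public class RowGroupFilterSketch {
      // mirrors SearchArgument.TruthValue and its new isNotNeeded() method
      enum TruthValue {
        YES, NO, NULL, YES_NULL, NO_NULL, YES_NO, YES_NO_NULL;

        boolean isNotNeeded() {
          switch (this) {
            case NO:
            case NULL:
            case NO_NULL:
              return false;
            default:
              return true;
          }
        }
      }

      public static void main(String[] args) {
        // hypothetical results of evaluating a predicate against the
        // column statistics of four row groups
        TruthValue[] perGroup = {TruthValue.YES_NO, TruthValue.NO,
                                 TruthValue.NO_NULL, TruthValue.YES};
        boolean[] includedRowGroups = new boolean[perGroup.length];
        for (int group = 0; group < perGroup.length; ++group) {
          includedRowGroups[group] = perGroup[group].isNotNeeded();
        }
        // prints [true, false, false, true]: only groups that might
        // contain qualifying rows get read
        System.out.println(Arrays.toString(includedRowGroups));
      }
    }

Note that, despite its name, isNotNeeded() returns true exactly when the rows may be needed; the uncertain values (YES_NULL, YES_NO, YES_NO_NULL) all keep the group, since statistics can only rule rows out, never rule them in.
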
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/sarg/SearchArgumentImpl.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/sarg/SearchArgumentImpl.java?rev=1514438&r1=1514437&r2=1514438&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/sarg/SearchArgumentImpl.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/sarg/SearchArgumentImpl.java Thu Aug 15 19:05:35 2013
@@ -36,16 +36,12 @@ import org.apache.hadoop.hive.ql.udf.gen
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPNotNull;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPNull;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPOr;
-import org.apache.hadoop.hive.serde2.io.DoubleWritable;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
 
 import java.util.ArrayDeque;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.Deque;
 import java.util.HashMap;
@@ -57,7 +53,7 @@ import java.util.Map;
  */
 final class SearchArgumentImpl implements SearchArgument {
 
-  private static final class PredicateLeafImpl implements PredicateLeaf {
+  static final class PredicateLeafImpl implements PredicateLeaf {
     private final Operator operator;
     private final Type type;
     private final String columnName;
@@ -270,7 +266,6 @@ final class SearchArgumentImpl implement
   }
 
   static class ExpressionBuilder {
-    private ExpressionTree expression = null;
     private final List<PredicateLeaf> leaves = new ArrayList<PredicateLeaf>();
 
     /**
@@ -321,11 +316,11 @@ final class SearchArgumentImpl implement
     private static Object boxLiteral(ExprNodeConstantDesc lit) {
       switch (getType(lit)) {
         case INTEGER:
-          return new LongWritable(((Number) lit.getValue()).longValue());
+          return ((Number) lit.getValue()).longValue();
         case STRING:
-          return new Text(lit.getValue().toString());
+          return lit.getValue().toString();
         case FLOAT:
-          return new DoubleWritable(((Number) lit.getValue()).doubleValue());
+          return ((Number) lit.getValue()).doubleValue();
         default:
           throw new IllegalArgumentException("Unknown literal " + getType(lit));
       }

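The boxLiteral change above means predicate literals now travel as plain java.lang values (Long, String, Double) instead of Hadoop Writables, which is why the Writable imports are removed at the top of the file. A small sketch of the resulting behavior, with an illustrative Type enum standing in for PredicateLeaf.Type:

    public class BoxLiteralSketch {
      enum Type { INTEGER, STRING, FLOAT }  // stands in for PredicateLeaf.Type

      static Object box(Object rawValue, Type type) {
        switch (type) {
          case INTEGER:
            return ((Number) rawValue).longValue();    // java.lang.Long, not LongWritable
          case STRING:
            return rawValue.toString();                // java.lang.String, not Text
          case FLOAT:
            return ((Number) rawValue).doubleValue();  // java.lang.Double, not DoubleWritable
          default:
            throw new IllegalArgumentException("Unknown literal " + type);
        }
      }

      public static void main(String[] args) {
        System.out.println(box(42, Type.INTEGER).getClass());  // class java.lang.Long
        System.out.println(box(3.5f, Type.FLOAT).getClass());  // class java.lang.Double
      }
    }
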
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java?rev=1514438&r1=1514437&r2=1514438&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java Thu Aug 15 19:05:35 2013
@@ -309,6 +309,7 @@ public final class ColumnPrunerProcFacto
       cppCtx.getPrunedColLists().put((Operator<? extends OperatorDesc>) nd,
           cols);
       ArrayList<Integer> needed_columns = new ArrayList<Integer>();
+      List<String> neededColumnNames = new ArrayList<String>();
       RowResolver inputRR = cppCtx.getOpToParseCtxMap().get(scanOp).getRowResolver();
       TableScanDesc desc = scanOp.getConf();
       List<VirtualColumn> virtualCols = desc.getVirtualCols();
@@ -339,12 +340,15 @@ public final class ColumnPrunerProcFacto
         }
         int position = inputRR.getPosition(cols.get(i));
         if (position >=0) {
+          // get the needed columns by id and name
           needed_columns.add(position);
+          neededColumnNames.add(cols.get(i));
         }
       }
 
       desc.setVirtualCols(newVirtualCols);
       scanOp.setNeededColumnIDs(needed_columns);
+      scanOp.setNeededColumns(neededColumnNames);
       return null;
     }
   }

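The pruner change keeps a list of column names in step with the existing list of column ids, so a reader that works by name (such as one pushing a SearchArgument down to ORC) can map predicate columns without consulting the row resolver again. A minimal sketch of the parallel bookkeeping, using a plain list as a hypothetical stand-in for the RowResolver lookup:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public class NeededColumnsSketch {
      public static void main(String[] args) {
        // hypothetical table schema standing in for the RowResolver
        List<String> schema = Arrays.asList("id", "name", "ds", "value");
        List<String> cols = Arrays.asList("value", "id");  // columns the query needs

        List<Integer> neededColumnIds = new ArrayList<Integer>();
        List<String> neededColumnNames = new ArrayList<String>();
        for (String col : cols) {
          int position = schema.indexOf(col);
          if (position >= 0) {
            // record the needed column by id and name, kept in step
            neededColumnIds.add(position);
            neededColumnNames.add(col);
          }
        }
        System.out.println(neededColumnIds);    // [3, 0]
        System.out.println(neededColumnNames);  // [value, id]
      }
    }
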
Modified: hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestBitFieldReader.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestBitFieldReader.java?rev=1514438&r1=1514437&r2=1514438&view=diff
==============================================================================
--- hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestBitFieldReader.java (original)
+++ hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestBitFieldReader.java Thu Aug 15 19:05:35 2013
@@ -47,7 +47,8 @@ public class TestBitFieldReader {
     ByteBuffer inBuf = ByteBuffer.allocate(collect.buffer.size());
     collect.buffer.setByteBuffer(inBuf, 0, collect.buffer.size());
     inBuf.flip();
-    BitFieldReader in = new BitFieldReader(InStream.create("test", inBuf,
+    BitFieldReader in = new BitFieldReader(InStream.create("test",
+        new ByteBuffer[]{inBuf}, new long[]{0}, inBuf.remaining(),
         codec, 500), 1);
     for(int i=0; i < COUNT; ++i) {
       int x = in.next();
@@ -96,7 +97,8 @@ public class TestBitFieldReader {
     ByteBuffer inBuf = ByteBuffer.allocate(collect.buffer.size());
     collect.buffer.setByteBuffer(inBuf, 0, collect.buffer.size());
     inBuf.flip();
-    BitFieldReader in = new BitFieldReader(InStream.create("test", inBuf,
+    BitFieldReader in = new BitFieldReader(InStream.create("test",
+        new ByteBuffer[]{inBuf}, new long[]{0}, inBuf.remaining(),
         null, 500), 3);
     for(int i=0; i < COUNT; ++i) {
       int x = in.next();
@@ -126,7 +128,8 @@ public class TestBitFieldReader {
     collect.buffer.setByteBuffer(inBuf, 0, collect.buffer.size());
     inBuf.flip();
     BitFieldReader in = new BitFieldReader(InStream.create
-        ("test", inBuf, null, 100), 1);
+        ("test", new ByteBuffer[]{inBuf}, new long[]{0}, inBuf.remaining(),
+            null, 100), 1);
     for(int i=0; i < COUNT; i += 5) {
       int x = (int) in.next();
       if (i < COUNT/2) {

Modified: hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestBitPack.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestBitPack.java?rev=1514438&r1=1514437&r2=1514438&view=diff
==============================================================================
--- hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestBitPack.java (original)
+++ hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestBitPack.java Thu Aug 15 19:05:35 2013
@@ -84,7 +84,11 @@ public class TestBitPack {
     inBuf.flip();
     long[] buff = new long[SIZE];
     SerializationUtils.readInts(buff, 0, SIZE, fixedWidth,
-        InStream.create("test", inBuf, null, SIZE));
+                                InStream.create("test",
+                                                new ByteBuffer[]{inBuf},
+                                                new long[]{0},
+                                                inBuf.remaining(),
+                                                null, SIZE));
     for(int i = 0; i < SIZE; i++) {
       buff[i] = SerializationUtils.zigzagDecode(buff[i]);
     }

Modified: hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInStream.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInStream.java?rev=1514438&r1=1514437&r2=1514438&view=diff
==============================================================================
--- hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInStream.java (original)
+++ hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInStream.java Thu Aug 15 19:05:35 2013
@@ -20,6 +20,9 @@ package org.apache.hadoop.hive.ql.io.orc
 
 import org.junit.Test;
 
+import java.io.DataInputStream;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -40,7 +43,8 @@ public class TestInStream {
     }
   }
 
-  static class PositionCollector implements PositionProvider, PositionRecorder {
+  static class PositionCollector
+      implements PositionProvider, PositionRecorder {
     private List<Long> positions = new ArrayList<Long>();
     private int index = 0;
 
@@ -53,6 +57,22 @@ public class TestInStream {
     public void addPosition(long offset) {
       positions.add(offset);
     }
+
+    public void reset() {
+      index = 0;
+    }
+
+    @Override
+    public String toString() {
+      StringBuilder builder = new StringBuilder("position: ");
+      for(int i=0; i < positions.size(); ++i) {
+        if (i != 0) {
+          builder.append(", ");
+        }
+        builder.append(positions.get(i));
+      }
+      return builder.toString();
+    }
   }
 
   @Test
@@ -73,9 +93,11 @@ public class TestInStream {
     ByteBuffer inBuf = ByteBuffer.allocate(collect.buffer.size());
     collect.buffer.setByteBuffer(inBuf, 0, collect.buffer.size());
     inBuf.flip();
-    InStream in = InStream.create("test", inBuf, null, 100);
-    assertEquals("uncompressed stream test base: 0 offset: 0 limit: 1024",
-        in.toString());
+    InStream in = InStream.create("test", new ByteBuffer[]{inBuf},
+        new long[]{0}, inBuf.remaining(), null, 100);
+    assertEquals("uncompressed stream test position: 0 length: 1024" +
+                 " range: 0 offset: 0 limit: 0",
+                 in.toString());
     for(int i=0; i < 1024; ++i) {
       int x = in.read();
       assertEquals(i & 0xff, x);
@@ -103,9 +125,11 @@ public class TestInStream {
     ByteBuffer inBuf = ByteBuffer.allocate(collect.buffer.size());
     collect.buffer.setByteBuffer(inBuf, 0, collect.buffer.size());
     inBuf.flip();
-    InStream in = InStream.create("test", inBuf, codec, 300);
-    assertEquals("compressed stream test base: 0 offset: 0 limit: 961",
-        in.toString());
+    InStream in = InStream.create("test", new ByteBuffer[]{inBuf},
+        new long[]{0}, inBuf.remaining(), codec, 300);
+    assertEquals("compressed stream test position: 0 length: 961 range: 0" +
+                 " offset: 0 limit: 0 range 0 = 0 to 961",
+                 in.toString());
     for(int i=0; i < 1024; ++i) {
       int x = in.read();
       assertEquals(i & 0xff, x);
@@ -134,7 +158,8 @@ public class TestInStream {
     ByteBuffer inBuf = ByteBuffer.allocate(collect.buffer.size());
     collect.buffer.setByteBuffer(inBuf, 0, collect.buffer.size());
     inBuf.flip();
-    InStream in = InStream.create("test", inBuf, codec, 100);
+    InStream in = InStream.create("test", new ByteBuffer[]{inBuf},
+        new long[]{0}, inBuf.remaining(), codec, 100);
     byte[] contents = new byte[1024];
     try {
       in.read(contents);
@@ -148,7 +173,8 @@ public class TestInStream {
     inBuf.put((byte) 32);
     inBuf.put((byte) 0);
     inBuf.flip();
-    in = InStream.create("test2", inBuf, codec, 300);
+    in = InStream.create("test2", new ByteBuffer[]{inBuf}, new long[]{0},
+        inBuf.remaining(), codec, 300);
     try {
       in.read();
       fail();
@@ -156,4 +182,132 @@ public class TestInStream {
       // EXPECTED
     }
   }
+
+  @Test
+  public void testDisjointBuffers() throws Exception {
+    OutputCollector collect = new OutputCollector();
+    CompressionCodec codec = new ZlibCodec();
+    OutStream out = new OutStream("test", 400, codec, collect);
+    PositionCollector[] positions = new PositionCollector[1024];
+    DataOutput stream = new DataOutputStream(out);
+    for(int i=0; i < 1024; ++i) {
+      positions[i] = new PositionCollector();
+      out.getPosition(positions[i]);
+      stream.writeInt(i);
+    }
+    out.flush();
+    assertEquals("test", out.toString());
+    assertEquals(1674, collect.buffer.size());
+    ByteBuffer[] inBuf = new ByteBuffer[3];
+    inBuf[0] = ByteBuffer.allocate(500);
+    inBuf[1] = ByteBuffer.allocate(1200);
+    inBuf[2] = ByteBuffer.allocate(500);
+    collect.buffer.setByteBuffer(inBuf[0], 0, 483);
+    collect.buffer.setByteBuffer(inBuf[1], 483, 1625 - 483);
+    collect.buffer.setByteBuffer(inBuf[2], 1625, 1674 - 1625);
+
+    for(int i=0; i < inBuf.length; ++i) {
+      inBuf[i].flip();
+    }
+    InStream in = InStream.create("test", inBuf,
+        new long[]{0,483, 1625}, 1674, codec, 400);
+    assertEquals("compressed stream test position: 0 length: 1674 range: 0" +
+                 " offset: 0 limit: 0 range 0 = 0 to 483;" +
+                 "  range 1 = 483 to 1142;  range 2 = 1625 to 49",
+                 in.toString());
+    DataInputStream inStream = new DataInputStream(in);
+    for(int i=0; i < 1024; ++i) {
+      int x = inStream.readInt();
+      assertEquals(i, x);
+    }
+    assertEquals(0, in.available());
+    for(int i=1023; i >= 0; --i) {
+      in.seek(positions[i]);
+      assertEquals(i, inStream.readInt());
+    }
+
+    in = InStream.create("test", new ByteBuffer[]{inBuf[1], inBuf[2]},
+        new long[]{483, 1625}, 1674, codec, 400);
+    inStream = new DataInputStream(in);
+    positions[303].reset();
+    in.seek(positions[303]);
+    for(int i=303; i < 1024; ++i) {
+      assertEquals(i, inStream.readInt());
+    }
+
+    in = InStream.create("test", new ByteBuffer[]{inBuf[0], inBuf[2]},
+        new long[]{0, 1625}, 1674, codec, 400);
+    inStream = new DataInputStream(in);
+    positions[1001].reset();
+    for(int i=0; i < 300; ++i) {
+      assertEquals(i, inStream.readInt());
+    }
+    in.seek(positions[1001]);
+    for(int i=1001; i < 1024; ++i) {
+      assertEquals(i, inStream.readInt());
+    }
+  }
+
+  @Test
+  public void testUncompressedDisjointBuffers() throws Exception {
+    OutputCollector collect = new OutputCollector();
+    OutStream out = new OutStream("test", 400, null, collect);
+    PositionCollector[] positions = new PositionCollector[1024];
+    DataOutput stream = new DataOutputStream(out);
+    for(int i=0; i < 1024; ++i) {
+      positions[i] = new PositionCollector();
+      out.getPosition(positions[i]);
+      stream.writeInt(i);
+    }
+    out.flush();
+    assertEquals("test", out.toString());
+    assertEquals(4096, collect.buffer.size());
+    ByteBuffer[] inBuf = new ByteBuffer[3];
+    inBuf[0] = ByteBuffer.allocate(1100);
+    inBuf[1] = ByteBuffer.allocate(2200);
+    inBuf[2] = ByteBuffer.allocate(1100);
+    collect.buffer.setByteBuffer(inBuf[0], 0, 1024);
+    collect.buffer.setByteBuffer(inBuf[1], 1024, 2048);
+    collect.buffer.setByteBuffer(inBuf[2], 3072, 1024);
+
+    for(int i=0; i < inBuf.length; ++i) {
+      inBuf[i].flip();
+    }
+    InStream in = InStream.create("test", inBuf,
+        new long[]{0, 1024, 3072}, 4096, null, 400);
+    assertEquals("uncompressed stream test position: 0 length: 4096" +
+                 " range: 0 offset: 0 limit: 0",
+                 in.toString());
+    DataInputStream inStream = new DataInputStream(in);
+    for(int i=0; i < 1024; ++i) {
+      int x = inStream.readInt();
+      assertEquals(i, x);
+    }
+    assertEquals(0, in.available());
+    for(int i=1023; i >= 0; --i) {
+      in.seek(positions[i]);
+      assertEquals(i, inStream.readInt());
+    }
+
+    in = InStream.create("test", new ByteBuffer[]{inBuf[1], inBuf[2]},
+        new long[]{1024, 3072}, 4096, null, 400);
+    inStream = new DataInputStream(in);
+    positions[256].reset();
+    in.seek(positions[256]);
+    for(int i=256; i < 1024; ++i) {
+      assertEquals(i, inStream.readInt());
+    }
+
+    in = InStream.create("test", new ByteBuffer[]{inBuf[0], inBuf[2]},
+        new long[]{0, 3072}, 4096, null, 400);
+    inStream = new DataInputStream(in);
+    positions[768].reset();
+    for(int i=0; i < 256; ++i) {
+      assertEquals(i, inStream.readInt());
+    }
+    in.seek(positions[768]);
+    for(int i=768; i < 1024; ++i) {
+      assertEquals(i, inStream.readInt());
+    }
+  }
 }

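The thread running through these test updates is the new InStream.create signature: it now takes an array of buffers plus each buffer's offset within the logical stream and the stream's total length, rather than one contiguous ByteBuffer, so a stream can be reassembled from the disjoint disk ranges planned in RecordReaderImpl. In outline (not compilable on its own, since InStream is package-private; the values echo testDisjointBuffers above):

    // one contiguous, uncompressed buffer: the simple case
    InStream in = InStream.create("test", new ByteBuffer[]{inBuf},
        new long[]{0}, inBuf.remaining(), null, 100);

    // three disjoint pieces of a single 1674-byte compressed stream;
    // the long[] gives each piece's starting offset within the stream
    InStream disjoint = InStream.create("test", pieces,
        new long[]{0, 483, 1625}, 1674, codec, 400);

Seeks work across the gaps: a PositionProvider can land in any of the supplied pieces, as the disjoint-buffer tests above verify.
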
Modified: hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestIntegerCompressionReader.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestIntegerCompressionReader.java?rev=1514438&r1=1514437&r2=1514438&view=diff
==============================================================================
--- hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestIntegerCompressionReader.java (original)
+++ hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestIntegerCompressionReader.java Thu Aug 15 19:05:35 2013
@@ -53,8 +53,11 @@ public class TestIntegerCompressionReade
     ByteBuffer inBuf = ByteBuffer.allocate(collect.buffer.size());
     collect.buffer.setByteBuffer(inBuf, 0, collect.buffer.size());
     inBuf.flip();
-    RunLengthIntegerReaderV2 in = new RunLengthIntegerReaderV2(InStream.create
-        ("test", inBuf, codec, 1000), true);
+    RunLengthIntegerReaderV2 in =
+      new RunLengthIntegerReaderV2(InStream.create
+                                   ("test", new ByteBuffer[]{inBuf},
+                                    new long[]{0}, inBuf.remaining(),
+                                    codec, 1000), true);
     for(int i=0; i < 2048; ++i) {
       int x = (int) in.next();
       if (i < 1024) {
@@ -104,8 +107,12 @@ public class TestIntegerCompressionReade
     ByteBuffer inBuf = ByteBuffer.allocate(collect.buffer.size());
     collect.buffer.setByteBuffer(inBuf, 0, collect.buffer.size());
     inBuf.flip();
-    RunLengthIntegerReaderV2 in = new RunLengthIntegerReaderV2(InStream.create
-        ("test", inBuf, null, 100), true);
+    RunLengthIntegerReaderV2 in =
+      new RunLengthIntegerReaderV2(InStream.create("test",
+                                                   new ByteBuffer[]{inBuf},
+                                                   new long[]{0},
+                                                   inBuf.remaining(),
+                                                   null, 100), true);
     for(int i=0; i < 2048; i += 10) {
       int x = (int) in.next();
       if (i < 1024) {