Posted to commits@hive.apache.org by ha...@apache.org on 2013/09/12 03:21:29 UTC

svn commit: r1522098 [17/30] - in /hive/branches/vectorization: ./ beeline/src/test/org/apache/hive/beeline/src/test/ bin/ bin/ext/ common/src/java/org/apache/hadoop/hive/common/ common/src/java/org/apache/hadoop/hive/conf/ conf/ contrib/src/java/org/a...

Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinRowContainer.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinRowContainer.java?rev=1522098&r1=1522097&r2=1522098&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinRowContainer.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinRowContainer.java Thu Sep 12 01:21:10 2013
@@ -18,30 +18,46 @@
 
 package org.apache.hadoop.hive.ql.exec.persistence;
 
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.AbstractList;
 import java.util.ArrayList;
+import java.util.ConcurrentModificationException;
 import java.util.List;
 
-import org.apache.hadoop.hive.ql.metadata.HiveException;
-
-public class MapJoinRowContainer<Row> extends AbstractRowContainer<Row> {
-
-  private List<Row> list;
-
+import org.apache.hadoop.hive.serde2.SerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.io.ShortWritable;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
+import org.apache.hadoop.io.Writable;
+
+@SuppressWarnings("deprecation")
+public class MapJoinRowContainer extends AbstractRowContainer<List<Object>> {
+  private static final Object[] EMPTY_OBJECT_ARRAY = new Object[0];
+  
+  private final List<List<Object>> list;
   private int index;
+  private byte aliasFilter = (byte) 0xff;
 
   public MapJoinRowContainer() {
     index = 0;
-    list = new ArrayList<Row>(1);
-  }
+    list = new ArrayList<List<Object>>(1);
+  } 
 
   @Override
-  public void add(Row t) throws HiveException {
+  public void add(List<Object> t) {
     list.add(t);
   }
 
+  public void add(Object[] t) {
+    add(toList(t));
+  }
 
   @Override
-  public Row first() throws HiveException {
+  public List<Object> first() {
     index = 0;
     if (index < list.size()) {
       return list.get(index);
@@ -50,13 +66,12 @@ public class MapJoinRowContainer<Row> ex
   }
 
   @Override
-  public Row next() throws HiveException {
+  public List<Object> next() {
     index++;
     if (index < list.size()) {
       return list.get(index);
     }
     return null;
-
   }
 
   /**
@@ -73,28 +88,88 @@ public class MapJoinRowContainer<Row> ex
    * Remove all elements in the RowContainer.
    */
   @Override
-  public void clear() throws HiveException {
+  public void clear() {
     list.clear();
     index = 0;
   }
-
-  public List<Row> getList() {
-    return list;
+  
+  public byte getAliasFilter() {
+    return aliasFilter;
+  }
+  
+  public MapJoinRowContainer copy() {
+    MapJoinRowContainer result = new MapJoinRowContainer();
+    for(List<Object> item : list) {
+      result.add(item);
+    }
+    return result;
   }
-
-  public void setList(List<Row> list) {
-    this.list = list;
+  
+  @SuppressWarnings({"unchecked"})
+  public void read(MapJoinObjectSerDeContext context, ObjectInputStream in, Writable container) 
+  throws IOException, SerDeException {
+    clear();
+    SerDe serde = context.getSerDe();
+    long numRows = in.readLong();
+    for (long rowIndex = 0L; rowIndex < numRows; rowIndex++) {
+      container.readFields(in);      
+      List<Object> value = (List<Object>)ObjectInspectorUtils.copyToStandardObject(serde.deserialize(container),
+          serde.getObjectInspector(), ObjectInspectorCopyOption.WRITABLE);
+      if(value == null) {
+        add(toList(EMPTY_OBJECT_ARRAY));
+      } else {
+        Object[] valuesArray = value.toArray();
+        if (context.hasFilterTag()) {
+          aliasFilter &= ((ShortWritable)valuesArray[valuesArray.length - 1]).get();
+        }
+        add(toList(valuesArray));
+      }
+    }
   }
+  
+  public void write(MapJoinObjectSerDeContext context, ObjectOutputStream out) 
+  throws IOException, SerDeException {
+    SerDe serde = context.getSerDe();
+    ObjectInspector valueObjectInspector = context.getStandardOI();
+    long numRows = size();
+    long numRowsWritten = 0L;
+    out.writeLong(numRows);
+    for (List<Object> row = first(); row != null; row = next()) {
+      serde.serialize(row.toArray(), valueObjectInspector).write(out);
+      ++numRowsWritten;      
+    }
+    if(numRows != size()) {
+      throw new ConcurrentModificationException("Values were modified while persisting");
+    }
+    if(numRowsWritten != numRows) {
+      throw new IllegalStateException("Expected to write " + numRows + " but wrote " + numRowsWritten);
+    }
+  }
+  
+  private List<Object> toList(Object[] array) {
+    return new NoCopyingArrayList(array);
+  }
+  /**
+   * In this use case our objects will not be modified
+   * so we don't care about copying in and out.
+   */
+  private static class NoCopyingArrayList extends AbstractList<Object> {
+    private Object[] array;
+    public NoCopyingArrayList(Object[] array) {
+      this.array = array;
+    }
+    @Override
+    public Object get(int index) {
+      return array[index];
+    }
 
-  public void reset(MapJoinRowContainer<Object[]> other) throws HiveException {
-    list.clear();
-    Object[] obj;
-    for (obj = other.first(); obj != null; obj = other.next()) {
-      ArrayList<Object> ele = new ArrayList(obj.length);
-      for (int i = 0; i < obj.length; i++) {
-        ele.add(obj[i]);
-      }
-      list.add((Row) ele);
+    @Override
+    public int size() {
+      return array.length;
     }
+    
+    public Object[] toArray() {
+      return array;
+    }    
   }
 }
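
The reworked MapJoinRowContainer is no longer generic: it always holds List<Object> rows, wraps Object[] values without copying, and tracks an alias filter byte that starts at 0xff and is AND-ed with each row's filter tag during read(). A minimal usage sketch of the public API added here (the row values are illustrative):

    import java.util.List;
    import org.apache.hadoop.hive.ql.exec.persistence.MapJoinRowContainer;

    public class MapJoinRowContainerExample {
      public static void main(String[] args) {
        MapJoinRowContainer rows = new MapJoinRowContainer();
        rows.add(new Object[] {"k1", 100});   // Object[] rows are wrapped, not copied
        rows.add(new Object[] {"k1", 200});

        // first()/next() cursor iteration, the same pattern write() uses internally
        for (List<Object> row = rows.first(); row != null; row = rows.next()) {
          System.out.println(row.get(0) + " -> " + row.get(1));
        }

        // Starts at (byte) 0xff; narrowed only when read() sees rows carrying filter tags.
        System.out.println("alias filter: " + rows.getAliasFilter());
      }
    }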

Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/RowContainer.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/RowContainer.java?rev=1522098&r1=1522097&r2=1522098&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/RowContainer.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/RowContainer.java Thu Sep 12 01:21:10 2013
@@ -30,8 +30,8 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
+import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
 import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -71,18 +71,18 @@ import org.apache.hadoop.util.Reflection
  * reading.
  *
  */
-public class RowContainer<Row extends List<Object>> extends AbstractRowContainer<Row> {
+public class RowContainer<ROW extends List<Object>> extends AbstractRowContainer<ROW> {
 
   protected static Log LOG = LogFactory.getLog(RowContainer.class);
 
   // max # of rows can be put into one block
   private static final int BLOCKSIZE = 25000;
 
-  private Row[] currentWriteBlock; // the last block that add() should append to
-  private Row[] currentReadBlock; // the current block where the cursor is in
+  private ROW[] currentWriteBlock; // the last block that add() should append to
+  private ROW[] currentReadBlock; // the current block where the cursor is in
   // since currentReadBlock may assigned to currentWriteBlock, we need to store
   // original read block
-  private Row[] firstReadBlockPointer;
+  private ROW[] firstReadBlockPointer;
   private int blockSize; // number of objects in the block before it is spilled
   // to disk
   private int numFlushedBlocks; // total # of blocks
@@ -108,7 +108,7 @@ public class RowContainer<Row extends Li
   RecordWriter rw = null;
   InputFormat<WritableComparable, Writable> inputFormat = null;
   InputSplit[] inputSplits = null;
-  private Row dummyRow = null;
+  private ROW dummyRow = null;
   private final Reporter reporter;
 
   Writable val = null; // cached to use serialize data
@@ -130,7 +130,7 @@ public class RowContainer<Row extends Li
     this.addCursor = 0;
     this.numFlushedBlocks = 0;
     this.tmpFile = null;
-    this.currentWriteBlock = (Row[]) new ArrayList[blockSize];
+    this.currentWriteBlock = (ROW[]) new ArrayList[blockSize];
     this.currentReadBlock = this.currentWriteBlock;
     this.firstReadBlockPointer = currentReadBlock;
     this.serde = null;
@@ -142,7 +142,7 @@ public class RowContainer<Row extends Li
       this.reporter = reporter;
     }
   }
-  
+
   private JobConf getLocalFSJobConfClone(Configuration jc) {
     if (this.jobCloneUsingLocalFs == null) {
       this.jobCloneUsingLocalFs = new JobConf(jc);
@@ -158,13 +158,13 @@ public class RowContainer<Row extends Li
   }
 
   @Override
-  public void add(Row t) throws HiveException {
+  public void add(ROW t) throws HiveException {
     if (this.tblDesc != null) {
-      if (addCursor >= blockSize) { // spill the current block to tmp file
+      if (willSpill()) { // spill the current block to tmp file
         spillBlock(currentWriteBlock, addCursor);
         addCursor = 0;
         if (numFlushedBlocks == 1) {
-          currentWriteBlock = (Row[]) new ArrayList[blockSize];
+          currentWriteBlock = (ROW[]) new ArrayList[blockSize];
         }
       }
       currentWriteBlock[addCursor++] = t;
@@ -178,7 +178,7 @@ public class RowContainer<Row extends Li
   }
 
   @Override
-  public Row first() throws HiveException {
+  public ROW first() throws HiveException {
     if (size == 0) {
       return null;
     }
@@ -221,10 +221,10 @@ public class RowContainer<Row extends Li
           localJc, reporter);
         currentSplitPointer++;
 
-        nextBlock();
+        nextBlock(0);
       }
       // we are guaranteed that we can get data here (since 'size' is not zero)
-      Row ret = currentReadBlock[itrCursor++];
+      ROW ret = currentReadBlock[itrCursor++];
       removeKeys(ret);
       return ret;
     } catch (Exception e) {
@@ -234,7 +234,7 @@ public class RowContainer<Row extends Li
   }
 
   @Override
-  public Row next() throws HiveException {
+  public ROW next() throws HiveException {
 
     if (!firstCalled) {
       throw new RuntimeException("Call first() then call next().");
@@ -252,19 +252,16 @@ public class RowContainer<Row extends Li
       return null;
     }
 
-    Row ret;
+    ROW ret;
     if (itrCursor < this.readBlockSize) {
       ret = this.currentReadBlock[itrCursor++];
       removeKeys(ret);
       return ret;
     } else {
-      nextBlock();
+      nextBlock(0);
       if (this.readBlockSize == 0) {
         if (currentWriteBlock != null && currentReadBlock != currentWriteBlock) {
-          this.itrCursor = 0;
-          this.readBlockSize = this.addCursor;
-          this.firstReadBlockPointer = this.currentReadBlock;
-          currentReadBlock = currentWriteBlock;
+          setWriteBlockAsReadBlock();
         } else {
           return null;
         }
@@ -273,7 +270,7 @@ public class RowContainer<Row extends Li
     }
   }
 
-  private void removeKeys(Row ret) {
+  private void removeKeys(ROW ret) {
     if (this.keyObject != null && this.currentReadBlock != this.currentWriteBlock) {
       int len = this.keyObject.size();
       int rowSize = ((ArrayList) ret).size();
@@ -285,39 +282,10 @@ public class RowContainer<Row extends Li
 
   private final ArrayList<Object> row = new ArrayList<Object>(2);
 
-  private void spillBlock(Row[] block, int length) throws HiveException {
+  private void spillBlock(ROW[] block, int length) throws HiveException {
     try {
       if (tmpFile == null) {
-
-        String suffix = ".tmp";
-        if (this.keyObject != null) {
-          suffix = "." + this.keyObject.toString() + suffix;
-        }
-
-        while (true) {
-          parentFile = File.createTempFile("hive-rowcontainer", "");
-          boolean success = parentFile.delete() && parentFile.mkdir();
-          if (success) {
-            break;
-          }
-          LOG.debug("retry creating tmp row-container directory...");
-        }
-
-        tmpFile = File.createTempFile("RowContainer", suffix, parentFile);
-        LOG.info("RowContainer created temp file " + tmpFile.getAbsolutePath());
-        // Delete the temp file if the JVM terminate normally through Hadoop job
-        // kill command.
-        // Caveat: it won't be deleted if JVM is killed by 'kill -9'.
-        parentFile.deleteOnExit();
-        tmpFile.deleteOnExit();
-
-        // rFile = new RandomAccessFile(tmpFile, "rw");
-        HiveOutputFormat<?, ?> hiveOutputFormat = tblDesc.getOutputFileFormatClass().newInstance();
-        tempOutPath = new Path(tmpFile.toString());
-        JobConf localJc = getLocalFSJobConfClone(jc);
-        rw = HiveFileFormatUtils.getRecordWriter(this.jobCloneUsingLocalFs,
-            hiveOutputFormat, serde.getSerializedClass(), false,
-            tblDesc.getProperties(), tempOutPath, reporter);
+        setupWriter();
       } else if (rw == null) {
         throw new HiveException("RowContainer has already been closed for writing.");
       }
@@ -329,14 +297,14 @@ public class RowContainer<Row extends Li
       if (this.keyObject != null) {
         row.set(1, this.keyObject);
         for (int i = 0; i < length; ++i) {
-          Row currentValRow = block[i];
+          ROW currentValRow = block[i];
           row.set(0, currentValRow);
           Writable outVal = serde.serialize(row, standardOI);
           rw.write(outVal);
         }
       } else {
         for (int i = 0; i < length; ++i) {
-          Row currentValRow = block[i];
+          ROW currentValRow = block[i];
           Writable outVal = serde.serialize(currentValRow, standardOI);
           rw.write(outVal);
         }
@@ -350,6 +318,9 @@ public class RowContainer<Row extends Li
     } catch (Exception e) {
       clear();
       LOG.error(e.toString(), e);
+      if ( e instanceof HiveException ) {
+        throw (HiveException) e;
+      }
       throw new HiveException(e);
     }
   }
@@ -364,7 +335,7 @@ public class RowContainer<Row extends Li
     return size;
   }
 
-  private boolean nextBlock() throws HiveException {
+  protected boolean nextBlock(int readIntoOffset) throws HiveException {
     itrCursor = 0;
     this.readBlockSize = 0;
     if (this.numFlushedBlocks == 0) {
@@ -376,13 +347,13 @@ public class RowContainer<Row extends Li
         val = serde.getSerializedClass().newInstance();
       }
       boolean nextSplit = true;
-      int i = 0;
+      int i = readIntoOffset;
 
       if (rr != null) {
         Object key = rr.createKey();
         while (i < this.currentReadBlock.length && rr.next(key, val)) {
           nextSplit = false;
-          this.currentReadBlock[i++] = (Row) ObjectInspectorUtils.copyToStandardObject(serde
+          this.currentReadBlock[i++] = (ROW) ObjectInspectorUtils.copyToStandardObject(serde
               .deserialize(val), serde.getObjectInspector(), ObjectInspectorCopyOption.WRITABLE);
         }
       }
@@ -393,7 +364,7 @@ public class RowContainer<Row extends Li
         rr = inputFormat.getRecordReader(inputSplits[currentSplitPointer], jobCloneUsingLocalFs,
             reporter);
         currentSplitPointer++;
-        return nextBlock();
+        return nextBlock(0);
       }
 
       this.readBlockSize = i;
@@ -504,4 +475,119 @@ public class RowContainer<Row extends Li
     this.tblDesc = tblDesc;
   }
 
+  protected int getAddCursor() {
+    return addCursor;
+  }
+
+  protected final boolean willSpill() {
+    return addCursor >= blockSize;
+  }
+
+  protected int getBlockSize() {
+    return blockSize;
+  }
+
+  protected void setupWriter() throws HiveException {
+    try {
+
+      if ( tmpFile != null ) {
+        return;
+      }
+
+      String suffix = ".tmp";
+      if (this.keyObject != null) {
+        suffix = "." + this.keyObject.toString() + suffix;
+      }
+
+      while (true) {
+        parentFile = File.createTempFile("hive-rowcontainer", "");
+        boolean success = parentFile.delete() && parentFile.mkdir();
+        if (success) {
+          break;
+        }
+        LOG.debug("retry creating tmp row-container directory...");
+      }
+
+      tmpFile = File.createTempFile("RowContainer", suffix, parentFile);
+      LOG.info("RowContainer created temp file " + tmpFile.getAbsolutePath());
+      // Delete the temp file if the JVM terminate normally through Hadoop job
+      // kill command.
+      // Caveat: it won't be deleted if JVM is killed by 'kill -9'.
+      parentFile.deleteOnExit();
+      tmpFile.deleteOnExit();
+
+      // rFile = new RandomAccessFile(tmpFile, "rw");
+      HiveOutputFormat<?, ?> hiveOutputFormat = tblDesc.getOutputFileFormatClass().newInstance();
+      tempOutPath = new Path(tmpFile.toString());
+      JobConf localJc = getLocalFSJobConfClone(jc);
+      rw = HiveFileFormatUtils.getRecordWriter(this.jobCloneUsingLocalFs,
+          hiveOutputFormat, serde.getSerializedClass(), false,
+          tblDesc.getProperties(), tempOutPath, reporter);
+    } catch (Exception e) {
+      clear();
+      LOG.error(e.toString(), e);
+      throw new HiveException(e);
+    }
+
+  }
+
+  protected RecordWriter getRecordWriter() {
+    return rw;
+  }
+
+  protected InputSplit[] getInputSplits() {
+    return inputSplits;
+  }
+
+  protected boolean endOfCurrentReadBlock() {
+    if (tblDesc == null) {
+      return false;
+    }
+    return itrCursor >= this.readBlockSize;
+  }
+
+  protected int getCurrentReadBlockSize() {
+    return readBlockSize;
+  }
+
+  protected void setWriteBlockAsReadBlock() {
+    this.itrCursor = 0;
+    this.readBlockSize = this.addCursor;
+    this.firstReadBlockPointer = this.currentReadBlock;
+    currentReadBlock = currentWriteBlock;
+  }
+
+  protected org.apache.hadoop.mapred.RecordReader setReaderAtSplit(int splitNum)
+      throws IOException {
+    JobConf localJc = getLocalFSJobConfClone(jc);
+    currentSplitPointer = splitNum;
+    if ( rr != null ) {
+      rr.close();
+    }
+    // open record reader to read next split
+    rr = inputFormat.getRecordReader(inputSplits[currentSplitPointer], jobCloneUsingLocalFs,
+        reporter);
+    currentSplitPointer++;
+    return rr;
+  }
+
+  protected ROW getReadBlockRow(int rowOffset) {
+    itrCursor = rowOffset + 1;
+    return currentReadBlock[rowOffset];
+  }
+
+  protected void resetCurrentReadBlockToFirstReadBlock() {
+    currentReadBlock = firstReadBlockPointer;
+  }
+
+  protected void resetReadBlocks() {
+    this.currentReadBlock = this.currentWriteBlock;
+    this.firstReadBlockPointer = currentReadBlock;
+  }
+
+  protected void close() throws HiveException {
+    clear();
+    currentReadBlock = firstReadBlockPointer = currentWriteBlock = null;
+  }
+
 }
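
Callers drive the container through the cursor protocol this refactoring preserves: first() positions the cursor (transparently re-reading spilled blocks from the temp file), next() advances it, null signals exhaustion, and calling next() before first() throws. A small iteration sketch, assuming the container has been constructed and populated elsewhere (the counting helper itself is illustrative, not part of this commit):

    import java.util.List;
    import org.apache.hadoop.hive.ql.exec.persistence.RowContainer;
    import org.apache.hadoop.hive.ql.metadata.HiveException;

    public class RowContainerScan {
      /** Counts rows across the in-memory write block and any spilled blocks. */
      static long countRows(RowContainer<List<Object>> container) throws HiveException {
        long n = 0;
        for (List<Object> row = container.first(); row != null; row = container.next()) {
          n++;
        }
        return n;
      }
    }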

Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/FilterStringColLikeStringScalar.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/FilterStringColLikeStringScalar.java?rev=1522098&r1=1522097&r2=1522098&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/FilterStringColLikeStringScalar.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/FilterStringColLikeStringScalar.java Thu Sep 12 01:21:10 2013
@@ -40,11 +40,11 @@ public class FilterStringColLikeStringSc
   private Pattern compiledPattern;
   private PatternType type = PatternType.NONE;
   private String simpleStringPattern;
+  private final Text simplePattern = new Text();
 
-  private transient Text simplePattern = new Text();
   private transient ByteBuffer byteBuffer;
   private transient CharBuffer charBuffer;
-  private transient CharsetDecoder decoder;
+  private transient final CharsetDecoder decoder;
 
   // Doing characters comparison directly instead of regular expression
   // matching for simple patterns like "%abc%".

Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistoryUtil.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistoryUtil.java?rev=1522098&r1=1522097&r2=1522098&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistoryUtil.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/history/HiveHistoryUtil.java Thu Sep 12 01:21:10 2013
@@ -70,9 +70,6 @@ public class HiveHistoryUtil {
  private static final Pattern pattern = Pattern.compile(KEY + "=" + "\""
      + VALUE + "\"");
 
- // temp buffer for parsed dataa
- private static Map<String, String> parseBuffer = new HashMap<String, String>();
-
  /**
   * Parse a single line of history.
   *
@@ -81,6 +78,8 @@ public class HiveHistoryUtil {
   * @throws IOException
   */
  private static void parseLine(String line, Listener l) throws IOException {
+   Map<String, String> parseBuffer = new HashMap<String, String>();
+   
    // extract the record type
    int idx = line.indexOf(' ');
    String recType = line.substring(0, idx);
@@ -96,8 +95,5 @@ public class HiveHistoryUtil {
    }
 
    l.handle(RecordTypes.valueOf(recType), parseBuffer);
-
-   parseBuffer.clear();
  }
-
 }

Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java?rev=1522098&r1=1522097&r2=1522098&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java Thu Sep 12 01:21:10 2013
@@ -39,6 +39,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.log.PerfLogger;
 import org.apache.hadoop.hive.ql.parse.SplitSample;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.PartitionDesc;
@@ -262,6 +263,8 @@ public class CombineHiveInputFormat<K ex
    */
   @Override
   public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
+    PerfLogger perfLogger = PerfLogger.getPerfLogger();
+    perfLogger.PerfLogBegin(LOG, PerfLogger.GET_SPLITS);
     init(job);
     Map<String, ArrayList<String>> pathToAliases = mrwork.getPathToAliases();
     Map<String, Operator<? extends OperatorDesc>> aliasToWork =
@@ -269,8 +272,11 @@ public class CombineHiveInputFormat<K ex
     CombineFileInputFormatShim combine = ShimLoader.getHadoopShims()
         .getCombineFileInputFormat();
 
+    InputSplit[] splits = null;
     if (combine == null) {
-      return super.getSplits(job, numSplits);
+      splits = super.getSplits(job, numSplits);
+      perfLogger.PerfLogEnd(LOG, PerfLogger.GET_SPLITS);
+      return splits;
     }
 
     if (combine.getInputPathsShim(job).length == 0) {
@@ -320,9 +326,10 @@ public class CombineHiveInputFormat<K ex
           // If path is a directory
           if (fStats.isDir()) {
             dirs.offer(path);
-          }
-          else if ((new CompressionCodecFactory(job)).getCodec(path) != null) {
-            return super.getSplits(job, numSplits);
+          } else if ((new CompressionCodecFactory(job)).getCodec(path) != null) {
+            splits = super.getSplits(job, numSplits);
+            perfLogger.PerfLogEnd(LOG, PerfLogger.GET_SPLITS);
+            return splits;
           }
 
           while (dirs.peek() != null) {
@@ -331,9 +338,11 @@ public class CombineHiveInputFormat<K ex
             for (int idx = 0; idx < fStatus.length; idx++) {
               if (fStatus[idx].isDir()) {
                 dirs.offer(fStatus[idx].getPath());
-              }
-              else if ((new CompressionCodecFactory(job)).getCodec(fStatus[idx].getPath()) != null) {
-                return super.getSplits(job, numSplits);
+              } else if ((new CompressionCodecFactory(job)).getCodec(
+                  fStatus[idx].getPath()) != null) {
+                splits = super.getSplits(job, numSplits);
+                perfLogger.PerfLogEnd(LOG, PerfLogger.GET_SPLITS);
+                return splits;
               }
             }
           }
@@ -341,7 +350,9 @@ public class CombineHiveInputFormat<K ex
       }
 
       if (inputFormat instanceof SymlinkTextInputFormat) {
-        return super.getSplits(job, numSplits);
+        splits = super.getSplits(job, numSplits);
+        perfLogger.PerfLogEnd(LOG, PerfLogger.GET_SPLITS);
+        return splits;
       }
 
       Path filterPath = path;
@@ -411,10 +422,11 @@ public class CombineHiveInputFormat<K ex
     }
 
     LOG.info("number of splits " + result.size());
+    perfLogger.PerfLogEnd(LOG, PerfLogger.GET_SPLITS);
     return result.toArray(new CombineHiveInputSplit[result.size()]);
   }
 
-    private void processPaths(JobConf job, CombineFileInputFormatShim combine,
+  private void processPaths(JobConf job, CombineFileInputFormatShim combine,
       List<InputSplitShim> iss, Path... path) throws IOException {
     JobConf currJob = new JobConf(job);
     FileInputFormat.setInputPaths(currJob, path);

Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java?rev=1522098&r1=1522097&r2=1522098&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java Thu Sep 12 01:21:10 2013
@@ -36,6 +36,7 @@ import org.apache.hadoop.hive.io.HiveIOE
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.TableScanOperator;
 import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.log.PerfLogger;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
 import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
@@ -257,7 +258,8 @@ public class HiveInputFormat<K extends W
   }
 
   public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
-
+    PerfLogger perfLogger = PerfLogger.getPerfLogger();
+    perfLogger.PerfLogBegin(LOG, PerfLogger.GET_SPLITS);
     init(job);
 
     Path[] dirs = FileInputFormat.getInputPaths(job);
@@ -296,7 +298,7 @@ public class HiveInputFormat<K extends W
     }
 
     LOG.info("number of splits " + result.size());
-
+    perfLogger.PerfLogEnd(LOG, PerfLogger.GET_SPLITS);
     return result.toArray(new HiveInputSplit[result.size()]);
   }
 
@@ -428,6 +430,8 @@ public class HiveInputFormat<K extends W
         } else {
           ColumnProjectionUtils.setFullyReadColumns(jobConf);
         }
+        ColumnProjectionUtils.appendReadColumnNames(jobConf,
+            tableScan.getNeededColumns());
 
         pushFilters(jobConf, tableScan);
       }
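
Both getSplits() implementations are now bracketed with PerfLogger.GET_SPLITS timing: PerfLogBegin on entry and PerfLogEnd on every return path. The sketch below shows the same bracketing in a compact try/finally form; the commit itself calls PerfLogEnd explicitly before each return rather than using finally, and computeSplits() is only a placeholder:

    import java.io.IOException;
    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;
    import org.apache.hadoop.hive.ql.log.PerfLogger;
    import org.apache.hadoop.mapred.InputSplit;
    import org.apache.hadoop.mapred.JobConf;

    class GetSplitsTiming {
      private static final Log LOG = LogFactory.getLog(GetSplitsTiming.class);

      InputSplit[] timedGetSplits(JobConf job, int numSplits) throws IOException {
        PerfLogger perfLogger = PerfLogger.getPerfLogger();
        perfLogger.PerfLogBegin(LOG, PerfLogger.GET_SPLITS);
        try {
          return computeSplits(job, numSplits);   // placeholder for the real split logic
        } finally {
          perfLogger.PerfLogEnd(LOG, PerfLogger.GET_SPLITS);
        }
      }

      private InputSplit[] computeSplits(JobConf job, int numSplits) {
        return new InputSplit[0];                  // placeholder
      }
    }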

Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/HiveKey.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/HiveKey.java?rev=1522098&r1=1522097&r2=1522098&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/HiveKey.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/HiveKey.java Thu Sep 12 01:21:10 2013
@@ -35,6 +35,12 @@ public class HiveKey extends BytesWritab
     hashCodeValid = false;
   }
 
+  public HiveKey(byte[] bytes, int hashcode) {
+    super(bytes);
+    myHashCode = hashcode;
+    hashCodeValid = true;
+  }
+
   protected int myHashCode;
 
   public void setHashCode(int myHashCode) {

Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java?rev=1522098&r1=1522097&r2=1522098&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java Thu Sep 12 01:21:10 2013
@@ -1601,7 +1601,7 @@ public class RCFile {
             }
           }
           /* move the last 16 bytes to the prefix area */
-          System.arraycopy(buffer, buffer.length - prefix - 1, buffer, 0, prefix);
+          System.arraycopy(buffer, buffer.length - prefix, buffer, 0, prefix);
           n = (int)Math.min(n, end - in.getPos());
         }
       } catch (ChecksumException e) { // checksum failure
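
The one-character RCFile change fixes an off-by-one: to move the last prefix bytes of the buffer to the front, the copy must start at buffer.length - prefix; the old start of buffer.length - prefix - 1 shifted the window by one and dropped the final byte. A tiny self-contained illustration:

    import java.util.Arrays;

    public class ArrayCopyPrefixDemo {
      public static void main(String[] args) {
        byte[] buffer = {0, 1, 2, 3, 4, 5, 6, 7};
        int prefix = 3;
        // Correct offset: copies the last three bytes (5, 6, 7) to the front.
        // The old offset (buffer.length - prefix - 1) would have copied 4, 5, 6.
        System.arraycopy(buffer, buffer.length - prefix, buffer, 0, prefix);
        System.out.println(Arrays.toString(buffer)); // [5, 6, 7, 3, 4, 5, 6, 7]
      }
    }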

Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/BitFieldReader.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/BitFieldReader.java?rev=1522098&r1=1522097&r2=1522098&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/BitFieldReader.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/BitFieldReader.java Thu Sep 12 01:21:10 2013
@@ -41,7 +41,7 @@ class BitFieldReader {
       current = 0xff & input.next();
       bitsLeft = 8;
     } else {
-      throw new EOFException("Read past end of bit field from " + input);
+      throw new EOFException("Read past end of bit field from " + this);
     }
   }
 
@@ -111,4 +111,10 @@ class BitFieldReader {
       bitsLeft = (int) (8 - (totalBits % 8));
     }
   }
+
+  @Override
+  public String toString() {
+    return "bit reader current: " + current + " bits left: " + bitsLeft +
+        " bit size: " + bitSize + " from " + input;
+  }
 }

Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java?rev=1522098&r1=1522097&r2=1522098&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java Thu Sep 12 01:21:10 2013
@@ -25,97 +25,149 @@ abstract class InStream extends InputStr
 
   private static class UncompressedStream extends InStream {
     private final String name;
-    private byte[] array;
-    private int offset;
-    private final int base;
-    private final int limit;
+    private final ByteBuffer[] bytes;
+    private final long[] offsets;
+    private final long length;
+    private long currentOffset;
+    private byte[] range;
+    private int currentRange;
+    private int offsetInRange;
+    private int limitInRange;
 
-    public UncompressedStream(String name, ByteBuffer input) {
+    public UncompressedStream(String name, ByteBuffer[] input, long[] offsets,
+                              long length) {
       this.name = name;
-      this.array = input.array();
-      base = input.arrayOffset() + input.position();
-      offset = base;
-      limit = input.arrayOffset() + input.limit();
+      this.bytes = input;
+      this.offsets = offsets;
+      this.length = length;
+      currentRange = 0;
+      offsetInRange = 0;
+      limitInRange = 0;
+      currentOffset = 0;
     }
 
     @Override
     public int read() {
-      if (offset == limit) {
-        return -1;
+      if (offsetInRange >= limitInRange) {
+        if (currentOffset == length) {
+          return -1;
+        }
+        seek(currentOffset);
       }
-      return 0xff & array[offset++];
+      currentOffset += 1;
+      return 0xff & range[offsetInRange++];
     }
 
     @Override
     public int read(byte[] data, int offset, int length) {
-      if (this.offset == limit) {
-        return -1;
+      if (offsetInRange >= limitInRange) {
+        if (currentOffset == this.length) {
+          return -1;
+        }
+        seek(currentOffset);
       }
-      int actualLength = Math.min(length, limit - this.offset);
-      System.arraycopy(array, this.offset, data, offset, actualLength);
-      this.offset += actualLength;
+      int actualLength = Math.min(length, limitInRange - offsetInRange);
+      System.arraycopy(range, offsetInRange, data, offset, actualLength);
+      offsetInRange += actualLength;
+      currentOffset += actualLength;
       return actualLength;
     }
 
     @Override
     public int available() {
-      return limit - offset;
+      if (offsetInRange < limitInRange) {
+        return limitInRange - offsetInRange;
+      }
+      return (int) (length - currentOffset);
     }
 
     @Override
     public void close() {
-      array = null;
-      offset = 0;
+      currentRange = bytes.length;
+      currentOffset = length;
     }
 
     @Override
     public void seek(PositionProvider index) throws IOException {
-      offset = base + (int) index.getNext();
+      seek(index.getNext());
+    }
+
+    public void seek(long desired) {
+      for(int i = 0; i < bytes.length; ++i) {
+        if (offsets[i] <= desired &&
+            desired - offsets[i] < bytes[i].remaining()) {
+          currentOffset = desired;
+          currentRange = i;
+          this.range = bytes[i].array();
+          offsetInRange = bytes[i].arrayOffset() + bytes[i].position();
+          limitInRange = bytes[i].arrayOffset() + bytes[i].limit();
+          offsetInRange += desired - offsets[i];
+          return;
+        }
+      }
+      throw new IllegalArgumentException("Seek in " + name + " to " +
+        desired + " is outside of the data");
     }
 
     @Override
     public String toString() {
-      return "uncompressed stream " + name + " base: " + base +
-         " offset: " + offset + " limit: " + limit;
+      return "uncompressed stream " + name + " position: " + currentOffset +
+          " length: " + length + " range: " + currentRange +
+          " offset: " + offsetInRange + " limit: " + limitInRange;
     }
   }
 
   private static class CompressedStream extends InStream {
     private final String name;
-    private byte[] array;
+    private final ByteBuffer[] bytes;
+    private final long[] offsets;
     private final int bufferSize;
+    private final long length;
     private ByteBuffer uncompressed = null;
     private final CompressionCodec codec;
-    private int offset;
-    private final int base;
-    private final int limit;
+    private byte[] compressed = null;
+    private long currentOffset;
+    private int currentRange;
+    private int offsetInCompressed;
+    private int limitInCompressed;
     private boolean isUncompressedOriginal;
 
-    public CompressedStream(String name, ByteBuffer input,
+    public CompressedStream(String name, ByteBuffer[] input,
+                            long[] offsets, long length,
                             CompressionCodec codec, int bufferSize
                            ) {
-      this.array = input.array();
+      this.bytes = input;
       this.name = name;
       this.codec = codec;
+      this.length = length;
+      this.offsets = offsets;
       this.bufferSize = bufferSize;
-      base = input.arrayOffset() + input.position();
-      offset = base;
-      limit = input.arrayOffset() + input.limit();
+      currentOffset = 0;
+      currentRange = 0;
+      offsetInCompressed = 0;
+      limitInCompressed = 0;
     }
 
     private void readHeader() throws IOException {
-      if (limit - offset > OutStream.HEADER_SIZE) {
-        int chunkLength = ((0xff & array[offset + 2]) << 15) |
-          ((0xff & array[offset + 1]) << 7) | ((0xff & array[offset]) >> 1);
+      if (compressed == null || offsetInCompressed >= limitInCompressed) {
+        seek(currentOffset);
+      }
+      if (limitInCompressed - offsetInCompressed > OutStream.HEADER_SIZE) {
+        int chunkLength = ((0xff & compressed[offsetInCompressed + 2]) << 15) |
+          ((0xff & compressed[offsetInCompressed + 1]) << 7) |
+            ((0xff & compressed[offsetInCompressed]) >> 1);
         if (chunkLength > bufferSize) {
           throw new IllegalArgumentException("Buffer size too small. size = " +
               bufferSize + " needed = " + chunkLength);
         }
-        boolean isOriginal = (array[offset] & 0x01) == 1;
-        offset += OutStream.HEADER_SIZE;
+        boolean isOriginal = (compressed[offsetInCompressed] & 0x01) == 1;
+        offsetInCompressed += OutStream.HEADER_SIZE;
         if (isOriginal) {
           isUncompressedOriginal = true;
-          uncompressed = ByteBuffer.wrap(array, offset, chunkLength);
+          uncompressed = bytes[currentRange].duplicate();
+          uncompressed.position(offsetInCompressed -
+              bytes[currentRange].arrayOffset());
+          uncompressed.limit(offsetInCompressed + chunkLength);
         } else {
           if (isUncompressedOriginal) {
             uncompressed = ByteBuffer.allocate(bufferSize);
@@ -125,19 +177,21 @@ abstract class InStream extends InputStr
           } else {
             uncompressed.clear();
           }
-          codec.decompress(ByteBuffer.wrap(array, offset, chunkLength),
+          codec.decompress(ByteBuffer.wrap(compressed, offsetInCompressed,
+              chunkLength),
             uncompressed);
         }
-        offset += chunkLength;
+        offsetInCompressed += chunkLength;
+        currentOffset += chunkLength + OutStream.HEADER_SIZE;
       } else {
-        throw new IllegalStateException("Can't read header");
+        throw new IllegalStateException("Can't read header at " + this);
       }
     }
 
     @Override
     public int read() throws IOException {
       if (uncompressed == null || uncompressed.remaining() == 0) {
-        if (offset == limit) {
+        if (currentOffset == length) {
           return -1;
         }
         readHeader();
@@ -148,7 +202,7 @@ abstract class InStream extends InputStr
     @Override
     public int read(byte[] data, int offset, int length) throws IOException {
       if (uncompressed == null || uncompressed.remaining() == 0) {
-        if (this.offset == this.limit) {
+        if (currentOffset == this.length) {
           return -1;
         }
         readHeader();
@@ -164,7 +218,7 @@ abstract class InStream extends InputStr
     @Override
     public int available() throws IOException {
       if (uncompressed == null || uncompressed.remaining() == 0) {
-        if (offset == limit) {
+        if (currentOffset == length) {
           return 0;
         }
         readHeader();
@@ -174,27 +228,74 @@ abstract class InStream extends InputStr
 
     @Override
     public void close() {
-      array = null;
       uncompressed = null;
-      offset = 0;
+      currentRange = bytes.length;
+      offsetInCompressed = 0;
+      limitInCompressed = 0;
+      currentOffset = length;
     }
 
     @Override
     public void seek(PositionProvider index) throws IOException {
-      offset = base + (int) index.getNext();
-      int uncompBytes = (int) index.getNext();
-      if (uncompBytes != 0) {
+      seek(index.getNext());
+      long uncompressedBytes = index.getNext();
+      if (uncompressedBytes != 0) {
         readHeader();
-        uncompressed.position(uncompressed.position() + uncompBytes);
+        uncompressed.position(uncompressed.position() +
+                              (int) uncompressedBytes);
       } else if (uncompressed != null) {
+        // mark the uncompressed buffer as done
         uncompressed.position(uncompressed.limit());
       }
     }
 
+    private void seek(long desired) throws IOException {
+      for(int i = 0; i < bytes.length; ++i) {
+        if (offsets[i] <= desired &&
+            desired - offsets[i] < bytes[i].remaining()) {
+          currentRange = i;
+          compressed = bytes[i].array();
+          offsetInCompressed = (int) (bytes[i].arrayOffset() +
+              bytes[i].position() + (desired - offsets[i]));
+          currentOffset = desired;
+          limitInCompressed = bytes[i].arrayOffset() + bytes[i].limit();
+          return;
+        }
+      }
+      // if they are seeking to the precise end, go ahead and let them go there
+      int segments = bytes.length;
+      if (segments != 0 &&
+          desired == offsets[segments - 1] + bytes[segments - 1].remaining()) {
+        currentRange = segments - 1;
+        compressed = bytes[currentRange].array();
+        offsetInCompressed = bytes[currentRange].arrayOffset() +
+          bytes[currentRange].limit();
+        currentOffset = desired;
+        limitInCompressed = offsetInCompressed;
+        return;
+      }
+      throw new IOException("Seek outside of data in " + this + " to " +
+        desired);
+    }
+
+    private String rangeString() {
+      StringBuilder builder = new StringBuilder();
+      for(int i=0; i < offsets.length; ++i) {
+        if (i != 0) {
+          builder.append("; ");
+        }
+        builder.append(" range " + i + " = " + offsets[i] + " to " +
+            bytes[i].remaining());
+      }
+      return builder.toString();
+    }
+
     @Override
     public String toString() {
-      return "compressed stream " + name + " base: " + base +
-          " offset: " + offset + " limit: " + limit +
+      return "compressed stream " + name + " position: " + currentOffset +
+          " length: " + length + " range: " + currentRange +
+          " offset: " + offsetInCompressed + " limit: " + limitInCompressed +
+          rangeString() +
           (uncompressed == null ? "" :
               " uncompressed: " + uncompressed.position() + " to " +
                   uncompressed.limit());
@@ -203,14 +304,29 @@ abstract class InStream extends InputStr
 
   public abstract void seek(PositionProvider index) throws IOException;
 
+  /**
+   * Create an input stream from a list of buffers.
+   * @param name the name of the stream
+   * @param input the list of ranges of bytes for the stream
+   * @param offsets a list of offsets (the same length as input) that must
+   *                contain the first offset of each set of bytes in input
+   * @param length the length in bytes of the stream
+   * @param codec the compression codec
+   * @param bufferSize the compression buffer size
+   * @return an input stream
+   * @throws IOException
+   */
   public static InStream create(String name,
-                                ByteBuffer input,
+                                ByteBuffer[] input,
+                                long[] offsets,
+                                long length,
                                 CompressionCodec codec,
                                 int bufferSize) throws IOException {
     if (codec == null) {
-      return new UncompressedStream(name, input);
+      return new UncompressedStream(name, input, offsets, length);
     } else {
-      return new CompressedStream(name, input, codec, bufferSize);
+      return new CompressedStream(name, input, offsets, length, codec,
+          bufferSize);
     }
   }
 }
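
InStream.create() now accepts a list of byte ranges rather than one contiguous ByteBuffer: each ByteBuffer in the input array is paired with its starting offset within the logical stream, and length gives the total stream size, so seeks land in whichever range covers the desired position. A minimal sketch for the degenerate single-range, uncompressed case (the helper class is illustrative and sits in the same package because InStream is package-private):

    package org.apache.hadoop.hive.ql.io.orc;

    import java.io.IOException;
    import java.nio.ByteBuffer;

    class InStreamExample {
      static InStream openSingleRange(byte[] data) throws IOException {
        ByteBuffer[] ranges = new ByteBuffer[] { ByteBuffer.wrap(data) };
        long[] offsets = new long[] { 0L };        // range 0 starts at stream offset 0
        return InStream.create("example-stream", ranges, offsets, data.length,
            null /* no codec -> UncompressedStream */, 0 /* bufferSize unused here */);
      }
    }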

Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java?rev=1522098&r1=1522097&r2=1522098&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java Thu Sep 12 01:21:10 2013
@@ -21,6 +21,7 @@ package org.apache.hadoop.hive.ql.io.orc
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 
 import java.io.IOException;
@@ -47,19 +48,67 @@ public final class OrcFile {
    * prevent the new reader from reading ORC files generated by any released
    * version of Hive.
    */
-  public static final int MAJOR_VERSION = 0;
-  public static final int MINOR_VERSION = 11;
+  public static enum Version {
+    V_0_11("0.11", 0, 11),
+    V_0_12("0.12", 0, 12);
+
+    public static final Version CURRENT = V_0_12;
+
+    private final String name;
+    private final int major;
+    private final int minor;
+
+    private Version(String name, int major, int minor) {
+      this.name = name;
+      this.major = major;
+      this.minor = minor;
+    }
+
+    public static Version byName(String name) {
+      for(Version version: values()) {
+        if (version.name.equals(name)) {
+          return version;
+        }
+      }
+      throw new IllegalArgumentException("Unknown ORC version " + name);
+    }
+
+    /**
+     * Get the human readable name for the version.
+     */
+    public String getName() {
+      return name;
+    }
+
+    /**
+     * Get the major version number.
+     */
+    public int getMajor() {
+      return major;
+    }
+
+    /**
+     * Get the minor version number.
+     */
+    public int getMinor() {
+      return minor;
+    }
+  }
 
   // the table properties that control ORC files
   public static final String COMPRESSION = "orc.compress";
-  static final String DEFAULT_COMPRESSION = "ZLIB";
   public static final String COMPRESSION_BLOCK_SIZE = "orc.compress.size";
-  static final String DEFAULT_COMPRESSION_BLOCK_SIZE = "262144";
   public static final String STRIPE_SIZE = "orc.stripe.size";
-  static final String DEFAULT_STRIPE_SIZE = "268435456";
   public static final String ROW_INDEX_STRIDE = "orc.row.index.stride";
-  static final String DEFAULT_ROW_INDEX_STRIDE = "10000";
   public static final String ENABLE_INDEXES = "orc.create.index";
+  public static final String BLOCK_PADDING = "orc.block.padding";
+
+  static final long DEFAULT_STRIPE_SIZE = 256 * 1024 * 1024;
+  static final CompressionKind DEFAULT_COMPRESSION_KIND =
+    CompressionKind.ZLIB;
+  static final int DEFAULT_BUFFER_SIZE = 256 * 1024;
+  static final int DEFAULT_ROW_INDEX_STRIDE = 10000;
+  static final boolean DEFAULT_BLOCK_PADDING = true;
 
   // unused
   private OrcFile() {}
@@ -77,7 +126,145 @@ public final class OrcFile {
   }
 
   /**
-   * Create an ORC file streamFactory.
+   * Options for creating ORC file writers.
+   */
+  public static class WriterOptions {
+    private final Configuration configuration;
+    private FileSystem fileSystemValue = null;
+    private ObjectInspector inspectorValue = null;
+    private long stripeSizeValue = DEFAULT_STRIPE_SIZE;
+    private int rowIndexStrideValue = DEFAULT_ROW_INDEX_STRIDE;
+    private int bufferSizeValue = DEFAULT_BUFFER_SIZE;
+    private boolean blockPaddingValue = DEFAULT_BLOCK_PADDING;
+    private CompressionKind compressValue = DEFAULT_COMPRESSION_KIND;
+    private MemoryManager memoryManagerValue;
+    private Version versionValue;
+
+    WriterOptions(Configuration conf) {
+      configuration = conf;
+      memoryManagerValue = getMemoryManager(conf);
+      String versionName =
+        conf.get(HiveConf.ConfVars.HIVE_ORC_WRITE_FORMAT.varname);
+      if (versionName == null) {
+        versionValue = Version.CURRENT;
+      } else {
+        versionValue = Version.byName(versionName);
+      }
+    }
+
+    /**
+     * Provide the filesystem for the path, if the client has it available.
+     * If it is not provided, it will be found from the path.
+     */
+    public WriterOptions fileSystem(FileSystem value) {
+      fileSystemValue = value;
+      return this;
+    }
+
+    /**
+     * Set the stripe size for the file. The writer stores the contents of the
+     * stripe in memory until this memory limit is reached and the stripe
+     * is flushed to the HDFS file and the next stripe is started.
+     */
+    public WriterOptions stripeSize(long value) {
+      stripeSizeValue = value;
+      return this;
+    }
+
+    /**
+     * Set the distance between entries in the row index. The minimum value is
+     * 1000 to prevent the index from overwhelming the data. If the stride is
+     * set to 0, no indexes will be included in the file.
+     */
+    public WriterOptions rowIndexStride(int value) {
+      rowIndexStrideValue = value;
+      return this;
+    }
+
+    /**
+     * The size of the memory buffers used for compressing and storing the
+     * stripe in memory.
+     */
+    public WriterOptions bufferSize(int value) {
+      bufferSizeValue = value;
+      return this;
+    }
+
+    /**
+     * Sets whether the HDFS blocks are padded to prevent stripes from
+     * straddling blocks. Padding improves locality and thus the speed of
+     * reading, but costs space.
+     */
+    public WriterOptions blockPadding(boolean value) {
+      blockPaddingValue = value;
+      return this;
+    }
+
+    /**
+     * Sets the generic compression that is used to compress the data.
+     */
+    public WriterOptions compress(CompressionKind value) {
+      compressValue = value;
+      return this;
+    }
+
+    /**
+     * A required option that sets the object inspector for the rows. Used
+     * to determine the schema for the file.
+     */
+    public WriterOptions inspector(ObjectInspector value) {
+      inspectorValue = value;
+      return this;
+    }
+
+    /**
+     * Sets the version of the file that will be written.
+     */
+    public WriterOptions version(Version value) {
+      versionValue = value;
+      return this;
+    }
+
+    /**
+     * A package local option to set the memory manager.
+     */
+    WriterOptions memory(MemoryManager value) {
+      memoryManagerValue = value;
+      return this;
+    }
+  }
+
+  /**
+   * Create a default set of write options that can be modified.
+   */
+  public static WriterOptions writerOptions(Configuration conf) {
+    return new WriterOptions(conf);
+  }
+
+  /**
+   * Create an ORC file writer. This is the public interface for creating
+   * writers going forward and new options will only be added to this method.
+   * @param path filename to write to
+   * @param options the options
+   * @return a new ORC file writer
+   * @throws IOException
+   */
+  public static Writer createWriter(Path path,
+                                    WriterOptions opts
+                                    ) throws IOException {
+    FileSystem fs = opts.fileSystemValue == null ?
+      path.getFileSystem(opts.configuration) : opts.fileSystemValue;
+
+    return new WriterImpl(fs, path, opts.configuration, opts.inspectorValue,
+                          opts.stripeSizeValue, opts.compressValue,
+                          opts.bufferSizeValue, opts.rowIndexStrideValue,
+                          opts.memoryManagerValue, opts.blockPaddingValue,
+                          opts.versionValue);
+  }
+
+  /**
+   * Create an ORC file writer. This method is provided for API backward
+   * compatibility with Hive 0.11.
    * @param fs file system
    * @param path filename to write to
    * @param inspector the ObjectInspector that inspects the rows
@@ -86,7 +273,7 @@ public final class OrcFile {
    * @param bufferSize the number of bytes to compress at once
    * @param rowIndexStride the number of rows between row index entries or
    *                       0 to suppress all indexes
-   * @return a new ORC file streamFactory
+   * @return a new ORC file writer
    * @throws IOException
    */
   public static Writer createWriter(FileSystem fs,
@@ -97,8 +284,14 @@ public final class OrcFile {
                                     CompressionKind compress,
                                     int bufferSize,
                                     int rowIndexStride) throws IOException {
-    return new WriterImpl(fs, path, conf, inspector, stripeSize, compress,
-      bufferSize, rowIndexStride, getMemoryManager(conf));
+    return createWriter(path,
+                        writerOptions(conf)
+                        .fileSystem(fs)
+                        .inspector(inspector)
+                        .stripeSize(stripeSize)
+                        .compress(compress)
+                        .bufferSize(bufferSize)
+                        .rowIndexStride(rowIndexStride));
   }
 
   private static MemoryManager memoryManager = null;
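
The new WriterOptions builder replaces the long positional createWriter() signature; anything not set falls back to the DEFAULT_* constants above, and the target file format version can be pinned explicitly. A sketch of the new entry point (the concrete option values are illustrative):

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.ql.io.orc.CompressionKind;
    import org.apache.hadoop.hive.ql.io.orc.OrcFile;
    import org.apache.hadoop.hive.ql.io.orc.Writer;
    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;

    public class OrcWriterExample {
      static Writer open(Configuration conf, Path path, ObjectInspector inspector)
          throws IOException {
        return OrcFile.createWriter(path,
            OrcFile.writerOptions(conf)
                .inspector(inspector)              // required: describes the rows
                .compress(CompressionKind.ZLIB)
                .stripeSize(128L * 1024 * 1024)
                .rowIndexStride(10000)
                .version(OrcFile.Version.V_0_12));
      }
    }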

Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java?rev=1522098&r1=1522097&r2=1522098&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java Thu Sep 12 01:21:10 2013
@@ -22,6 +22,9 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
+import com.google.common.collect.Lists;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -31,7 +34,12 @@ import org.apache.hadoop.hive.ql.exec.Ut
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedInputFormatInterface;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 import org.apache.hadoop.hive.ql.io.InputFormatChecker;
+import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
+import org.apache.hadoop.hive.ql.plan.TableScanDesc;
 import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapred.FileInputFormat;
 import org.apache.hadoop.mapred.FileSplit;
@@ -48,6 +56,8 @@ public class OrcInputFormat  extends Fil
 
   VectorizedOrcInputFormat voif = new VectorizedOrcInputFormat();
 
+  private static final Log LOG = LogFactory.getLog(OrcInputFormat.class);
+
   private static class OrcRecordReader
       implements RecordReader<NullWritable, OrcStruct> {
     private final org.apache.hadoop.hive.ql.io.orc.RecordReader reader;
@@ -59,14 +69,38 @@ public class OrcInputFormat  extends Fil
 
     OrcRecordReader(Reader file, Configuration conf,
                     long offset, long length) throws IOException {
-      this.reader = file.rows(offset, length,
-          findIncludedColumns(file.getTypes(), conf));
+      String serializedPushdown = conf.get(TableScanDesc.FILTER_EXPR_CONF_STR);
+      String columnNamesString =
+          conf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR);
+      String[] columnNames = null;
+      SearchArgument sarg = null;
       List<OrcProto.Type> types = file.getTypes();
       if (types.size() == 0) {
         numColumns = 0;
       } else {
         numColumns = types.get(0).getSubtypesCount();
       }
+      columnNames = new String[types.size()];
+      LOG.info("included column ids = " +
+          conf.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "null"));
+      LOG.info("included columns names = " +
+          conf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, "null"));
+      boolean[] includeColumn = findIncludedColumns(types, conf);
+      if (serializedPushdown != null && columnNamesString != null) {
+        sarg = SearchArgument.FACTORY.create
+            (Utilities.deserializeExpression(serializedPushdown, conf));
+        LOG.info("ORC pushdown predicate: " + sarg);
+        String[] neededColumnNames = columnNamesString.split(",");
+        int i = 0;
+        for(int columnId: types.get(0).getSubtypesList()) {
+          if (includeColumn[columnId]) {
+            columnNames[columnId] = neededColumnNames[i++];
+          }
+        }
+      } else {
+        LOG.info("No ORC pushdown predicate");
+      }
+      this.reader = file.rows(offset, length, includeColumn, sarg, columnNames);
       this.offset = offset;
       this.length = length;
     }
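
For readers following the hunk above: the projected column names arrive as a
flat comma-separated list (READ_COLUMN_NAMES_CONF_STR), while the
SearchArgument built from the serialized filter refers to columns by name, so
the loop re-indexes those names by ORC column id before handing both to the
rows() call. A minimal, standalone illustration of that re-indexing -- the
ids, names and include flags below are made up for the example:

  // Three top-level columns with ORC ids 1..3 (id 0 is the root struct);
  // only ids 1 and 3 are projected, arriving as "state,amount".
  boolean[] includeColumn = { true, true, false, true };
  java.util.List<Integer> topLevelIds = java.util.Arrays.asList(1, 2, 3);
  String[] neededColumnNames = "state,amount".split(",");
  String[] columnNames = new String[includeColumn.length];
  int i = 0;
  for (int columnId : topLevelIds) {
    if (includeColumn[columnId]) {
      columnNames[columnId] = neededColumnNames[i++];  // name lands at its ORC id slot
    }
  }
  // columnNames is now [null, "state", null, "amount"]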

Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java?rev=1522098&r1=1522097&r2=1522098&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java Thu Sep 12 01:21:10 2013
@@ -47,32 +47,20 @@ public class OrcOutputFormat extends Fil
       implements RecordWriter<NullWritable, OrcSerdeRow>,
                  FileSinkOperator.RecordWriter {
     private Writer writer = null;
-    private final FileSystem fs;
     private final Path path;
-    private final Configuration conf;
-    private final long stripeSize;
-    private final int compressionSize;
-    private final CompressionKind compress;
-    private final int rowIndexStride;
-
-    OrcRecordWriter(FileSystem fs, Path path, Configuration conf,
-                    String stripeSize, String compress,
-                    String compressionSize, String rowIndexStride) {
-      this.fs = fs;
+    private final OrcFile.WriterOptions options;
+
+    OrcRecordWriter(Path path, OrcFile.WriterOptions options) {
       this.path = path;
-      this.conf = conf;
-      this.stripeSize = Long.valueOf(stripeSize);
-      this.compress = CompressionKind.valueOf(compress);
-      this.compressionSize = Integer.valueOf(compressionSize);
-      this.rowIndexStride = Integer.valueOf(rowIndexStride);
+      this.options = options;
     }
 
     @Override
     public void write(NullWritable nullWritable,
                       OrcSerdeRow row) throws IOException {
       if (writer == null) {
-        writer = OrcFile.createWriter(fs, path, this.conf, row.getInspector(),
-            stripeSize, compress, compressionSize, rowIndexStride);
+        options.inspector(row.getInspector());
+        writer = OrcFile.createWriter(path, options);
       }
       writer.addRow(row.getRow());
     }
@@ -81,9 +69,8 @@ public class OrcOutputFormat extends Fil
     public void write(Writable row) throws IOException {
       OrcSerdeRow serdeRow = (OrcSerdeRow) row;
       if (writer == null) {
-        writer = OrcFile.createWriter(fs, path, this.conf,
-            serdeRow.getInspector(), stripeSize, compress, compressionSize,
-            rowIndexStride);
+        options.inspector(serdeRow.getInspector());
+        writer = OrcFile.createWriter(path, options);
       }
       writer.addRow(serdeRow.getRow());
     }
@@ -102,8 +89,8 @@ public class OrcOutputFormat extends Fil
         ObjectInspector inspector = ObjectInspectorFactory.
             getStandardStructObjectInspector(new ArrayList<String>(),
                 new ArrayList<ObjectInspector>());
-        writer = OrcFile.createWriter(fs, path, this.conf, inspector,
-            stripeSize, compress, compressionSize, rowIndexStride);
+        options.inspector(inspector);
+        writer = OrcFile.createWriter(path, options);
       }
       writer.close();
     }
@@ -113,9 +100,8 @@ public class OrcOutputFormat extends Fil
   public RecordWriter<NullWritable, OrcSerdeRow>
       getRecordWriter(FileSystem fileSystem, JobConf conf, String name,
                       Progressable reporter) throws IOException {
-    return new OrcRecordWriter(fileSystem,  new Path(name), conf,
-      OrcFile.DEFAULT_STRIPE_SIZE, OrcFile.DEFAULT_COMPRESSION,
-      OrcFile.DEFAULT_COMPRESSION_BLOCK_SIZE, OrcFile.DEFAULT_ROW_INDEX_STRIDE);
+    return new OrcRecordWriter(new Path(name),
+                               OrcFile.writerOptions(conf));
   }
 
   @Override
@@ -126,20 +112,42 @@ public class OrcOutputFormat extends Fil
                          boolean isCompressed,
                          Properties tableProperties,
                          Progressable reporter) throws IOException {
-    String stripeSize = tableProperties.getProperty(OrcFile.STRIPE_SIZE,
-        OrcFile.DEFAULT_STRIPE_SIZE);
-    String compression = tableProperties.getProperty(OrcFile.COMPRESSION,
-        OrcFile.DEFAULT_COMPRESSION);
-    String compressionSize =
-      tableProperties.getProperty(OrcFile.COMPRESSION_BLOCK_SIZE,
-        OrcFile.DEFAULT_COMPRESSION_BLOCK_SIZE);
-    String rowIndexStride =
-        tableProperties.getProperty(OrcFile.ROW_INDEX_STRIDE,
-            OrcFile.DEFAULT_ROW_INDEX_STRIDE);
-    if ("false".equals(tableProperties.getProperty(OrcFile.ENABLE_INDEXES))) {
-      rowIndexStride = "0";
+    OrcFile.WriterOptions options = OrcFile.writerOptions(conf);
+    if (tableProperties.containsKey(OrcFile.STRIPE_SIZE)) {
+      options.stripeSize(Long.parseLong
+                           (tableProperties.getProperty(OrcFile.STRIPE_SIZE)));
+    }
+
+    if (tableProperties.containsKey(OrcFile.COMPRESSION)) {
+      options.compress(CompressionKind.valueOf
+                           (tableProperties.getProperty(OrcFile.COMPRESSION)));
+    }
+
+    if (tableProperties.containsKey(OrcFile.COMPRESSION_BLOCK_SIZE)) {
+      options.bufferSize(Integer.parseInt
+                         (tableProperties.getProperty
+                            (OrcFile.COMPRESSION_BLOCK_SIZE)));
+    }
+
+    if (tableProperties.containsKey(OrcFile.ROW_INDEX_STRIDE)) {
+      options.rowIndexStride(Integer.parseInt
+                             (tableProperties.getProperty
+                              (OrcFile.ROW_INDEX_STRIDE)));
     }
-    return new OrcRecordWriter(path.getFileSystem(conf), path, conf,
-      stripeSize, compression, compressionSize, rowIndexStride);
+
+    if (tableProperties.containsKey(OrcFile.ENABLE_INDEXES)) {
+      if ("false".equals(tableProperties.getProperty
+                         (OrcFile.ENABLE_INDEXES))) {
+        options.rowIndexStride(0);
+      }
+    }
+
+    if (tableProperties.containsKey(OrcFile.BLOCK_PADDING)) {
+      options.blockPadding(Boolean.parseBoolean
+                           (tableProperties.getProperty
+                            (OrcFile.BLOCK_PADDING)));
+    }
+
+    return new OrcRecordWriter(path, options);
   }
 }
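
The rewritten getRecordWriter above only overrides a WriterOptions field when
the corresponding table property is present, so anything unset falls through
to the OrcFile.writerOptions(conf) defaults. A hedged sketch of table
properties that would exercise those branches -- the numeric value is
illustrative, and the compression name is assumed to match a CompressionKind
constant:

  Properties tableProperties = new Properties();
  tableProperties.setProperty(OrcFile.STRIPE_SIZE, "134217728");  // parsed with Long.parseLong
  tableProperties.setProperty(OrcFile.COMPRESSION, "ZLIB");       // fed to CompressionKind.valueOf
  tableProperties.setProperty(OrcFile.ENABLE_INDEXES, "false");   // becomes rowIndexStride(0)
  // OrcFile.COMPRESSION_BLOCK_SIZE and OrcFile.BLOCK_PADDING are left unset,
  // so the writerOptions(conf) defaults apply for those.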

Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSerde.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSerde.java?rev=1522098&r1=1522097&r2=1522098&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSerde.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSerde.java Thu Sep 12 01:21:10 2013
@@ -22,7 +22,8 @@ import java.io.DataOutput;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Properties;
-
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedSerde;
@@ -40,6 +41,9 @@ import org.apache.hadoop.io.Writable;
  * It transparently passes the object to/from the ORC file reader/writer.
  */
 public class OrcSerde implements SerDe, VectorizedSerde {
+
+  private static final Log LOG = LogFactory.getLog(OrcSerde.class);
+
   private final OrcSerdeRow row = new OrcSerdeRow();
   private ObjectInspector inspector = null;
 

Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java?rev=1522098&r1=1522097&r2=1522098&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java Thu Sep 12 01:21:10 2013
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hive.ql.io.orc;
 
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 
 import java.io.IOException;
@@ -118,8 +119,26 @@ public interface Reader {
    * @param include true for each column that should be included
    * @return a new RecordReader that will read the specified rows.
    * @throws IOException
+   * @deprecated
    */
   RecordReader rows(long offset, long length,
                     boolean[] include) throws IOException;
 
+  /**
+   * Create a RecordReader that will read a section of a file. It starts reading
+   * at the first stripe after the offset and continues to the stripe that
+   * starts at offset + length. It also accepts a list of columns to read and a
+   * search argument.
+   * @param offset the minimum offset of the first stripe to read
+   * @param length the number of bytes after offset at which to stop reading;
+   *               offset + length is the first address not read
+   * @param include true for each column that should be included
+   * @param sarg a search argument that limits the rows that should be read.
+   * @param neededColumns the names of the included columns
+   * @return the record reader for the rows
+   */
+  RecordReader rows(long offset, long length,
+                    boolean[] include, SearchArgument sarg,
+                    String[] neededColumns) throws IOException;
+
 }
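
A hedged usage sketch of the new overload. Passing null for both sarg and
neededColumns reproduces the old behaviour (the deprecated overload above is
wired exactly that way in ReaderImpl in this commit); OrcFile.createReader and
the all-true include array are assumptions made for the example:

  Reader reader = OrcFile.createReader(fs, path);          // assumed factory method
  boolean[] include = new boolean[reader.getTypes().size()];
  java.util.Arrays.fill(include, true);                    // read every column
  RecordReader rows = reader.rows(0, Long.MAX_VALUE, include,
                                  sarg,                    // may be null: no row filtering
                                  neededColumnNames);      // names indexed by ORC column id
  // With sarg == null and neededColumnNames == null this is equivalent to the
  // deprecated rows(offset, length, include) call.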

Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java?rev=1522098&r1=1522097&r2=1522098&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java Thu Sep 12 01:21:10 2013
@@ -24,6 +24,7 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.io.Text;
 
@@ -247,11 +248,13 @@ final class ReaderImpl implements Reader
       if (version.size() >= 2) {
         minor = version.get(1);
       }
-      if (major > OrcFile.MAJOR_VERSION ||
-          (major == OrcFile.MAJOR_VERSION && minor > OrcFile.MINOR_VERSION)) {
-        log.warn("ORC file " + path + " was written by a future Hive version " +
-            versionString(version) + ". This file may not be readable by " +
-            "this version of Hive.");
+      if (major > OrcFile.Version.CURRENT.getMajor() ||
+          (major == OrcFile.Version.CURRENT.getMajor() &&
+           minor > OrcFile.Version.CURRENT.getMinor())) {
+        log.warn("ORC file " + path +
+                 " was written by a future Hive version " +
+                 versionString(version) +
+                 ". This file may not be readable by this version of Hive.");
       }
     }
   }
@@ -307,7 +310,8 @@ final class ReaderImpl implements Reader
       buffer.position(psOffset - footerSize);
       buffer.limit(psOffset);
     }
-    InputStream instream = InStream.create("footer", buffer, codec, bufferSize);
+    InputStream instream = InStream.create("footer", new ByteBuffer[]{buffer},
+        new long[]{0L}, footerSize, codec, bufferSize);
     footer = OrcProto.Footer.parseFrom(instream);
     inspector = OrcStruct.createObjectInspector(0, footer.getTypesList());
     file.close();
@@ -315,15 +319,22 @@ final class ReaderImpl implements Reader
 
   @Override
   public RecordReader rows(boolean[] include) throws IOException {
-    return rows(0, Long.MAX_VALUE, include);
+    return rows(0, Long.MAX_VALUE, include, null, null);
   }
 
   @Override
   public RecordReader rows(long offset, long length, boolean[] include
                            ) throws IOException {
+    return rows(offset, length, include, null, null);
+  }
+
+  @Override
+  public RecordReader rows(long offset, long length, boolean[] include,
+                           SearchArgument sarg, String[] columnNames
+                           ) throws IOException {
     return new RecordReaderImpl(this.getStripes(), fileSystem,  path, offset,
-      length, footer.getTypesList(), codec, bufferSize,
-      include, footer.getRowIndexStride());
+        length, footer.getTypesList(), codec, bufferSize,
+        include, footer.getRowIndexStride(), sarg, columnNames);
   }
 
 }