Posted to commits@hive.apache.org by ha...@apache.org on 2013/09/12 03:21:29 UTC

svn commit: r1522098 [16/30] - in /hive/branches/vectorization: ./ beeline/src/test/org/apache/hive/beeline/src/test/ bin/ bin/ext/ common/src/java/org/apache/hadoop/hive/common/ common/src/java/org/apache/hadoop/hive/conf/ conf/ contrib/src/java/org/a...

Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java?rev=1522098&r1=1522097&r2=1522098&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java Thu Sep 12 01:21:10 2013
@@ -32,8 +32,8 @@ import java.util.Set;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
 import org.apache.hadoop.hive.ql.plan.MapWork;
@@ -310,7 +310,7 @@ public class MapOperator extends Operato
           tblRawRowObjectInspector =
               (StructObjectInspector) ObjectInspectorConverters.getConvertedOI(
                   partRawRowObjectInspector,
-                  tblDeserializer.getObjectInspector());
+                  tblDeserializer.getObjectInspector(), true);
 
           if (identityConverterTableDesc.contains(tableDesc)) {
             if (!partRawRowObjectInspector.equals(tblRawRowObjectInspector)) {
@@ -347,6 +347,8 @@ public class MapOperator extends Operato
     Path fpath = new Path(HiveConf.getVar(hconf,
         HiveConf.ConfVars.HADOOPMAPFILENAME));
 
+    boolean schemeless = fpath.toUri().getScheme() == null;
+
     List<Operator<? extends OperatorDesc>> children =
         new ArrayList<Operator<? extends OperatorDesc>>();
 
@@ -358,6 +360,10 @@ public class MapOperator extends Operato
         List<String> aliases = entry.getValue();
 
         Path onepath = new Path(onefile);
+        if (schemeless) {
+          onepath = new Path(onepath.toUri().getPath());
+        }
+
         PartitionDesc partDesc = conf.getPathToPartitionInfo().get(onefile);
 
         for (String onealias : aliases) {
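
Editor's note: the hunk above makes alias lookup tolerant of scheme-less input paths — when the current map input file (HADOOPMAPFILENAME) carries no URI scheme, each configured path is reduced to its bare path component before comparison. A minimal standalone sketch of that normalization, using only org.apache.hadoop.fs.Path; the class and method names below are illustrative, not Hive's:

import org.apache.hadoop.fs.Path;

public class PathNormalizer {
  // If the reference path carries no scheme (e.g. "/user/hive/warehouse/t"),
  // strip the scheme/authority from candidate paths so map lookups match.
  public static Path normalize(Path candidate, boolean referenceIsSchemeless) {
    if (referenceIsSchemeless) {
      return new Path(candidate.toUri().getPath());
    }
    return candidate;
  }

  public static void main(String[] args) {
    Path fpath = new Path("/user/hive/warehouse/t/part-00000");
    boolean schemeless = fpath.toUri().getScheme() == null;          // true here
    Path onepath = new Path("hdfs://nn:8020/user/hive/warehouse/t");
    System.out.println(normalize(onepath, schemeless));              // /user/hive/warehouse/t
  }
}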

Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java?rev=1522098&r1=1522097&r2=1522098&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java Thu Sep 12 01:21:10 2013
@@ -58,7 +58,7 @@ public abstract class Operator<T extends
 
   private static final long serialVersionUID = 1L;
 
-  private Configuration configuration;
+  private transient Configuration configuration;
   protected List<Operator<? extends OperatorDesc>> childOperators;
   protected List<Operator<? extends OperatorDesc>> parentOperators;
   protected String operatorId;
@@ -196,7 +196,7 @@ public abstract class Operator<T extends
   }
 
   // non-bean fields needed during compilation
-  private transient RowSchema rowSchema;
+  private RowSchema rowSchema;
 
   public void setSchema(RowSchema rowSchema) {
     this.rowSchema = rowSchema;
@@ -592,6 +592,9 @@ public abstract class Operator<T extends
     state = State.CLOSE;
     LOG.info(id + " finished. closing... ");
 
+    // call the operator specific close routine
+    closeOp(abort);
+
     if (counterNameToEnum != null) {
       incrCounter(numInputRowsCntr, inputRows);
       incrCounter(numOutputRowsCntr, outputRows);
@@ -600,9 +603,6 @@ public abstract class Operator<T extends
 
     LOG.info(id + " forwarded " + cntr + " rows");
 
-    // call the operator specific close routine
-    closeOp(abort);
-
     try {
       logStats();
       if (childOperators == null) {
@@ -816,13 +816,7 @@ public abstract class Operator<T extends
       }
     }
 
-    if (isLogInfoEnabled) {
-      cntr++;
-      if (cntr == nextCntr) {
-        LOG.info(id + " forwarding " + cntr + " rows");
-        nextCntr = getNextCntr(cntr);
-      }
-    }
+    increaseForward(1);
 
     // For debugging purposes:
     // System.out.println("" + this.getClass() + ": " +
@@ -855,6 +849,18 @@ public abstract class Operator<T extends
     }
   }
 
+  void increaseForward(long counter) {
+    if (isLogInfoEnabled) {
+      cntr += counter;
+      if (cntr >= nextCntr) {
+        LOG.info(id + " forwarding " + cntr + " rows");
+        do {
+          nextCntr = getNextCntr(nextCntr);
+        } while(cntr >= nextCntr);
+      }
+    }
+  }
+
   public void resetStats() {
     for (Enum<?> e : statsMap.keySet()) {
       statsMap.get(e).set(0L);
@@ -1550,6 +1556,17 @@ public abstract class Operator<T extends
     return true;
   }
 
+  /**
+   * Used by LimitPushdownOptimizer.
+   *
+   * If none of the operators between the limit and the reduce-sink removes any input rows
+   * within the limit count, the limit can be pushed down to the reduce-sink operator
+   * (e.g. forward, select, etc.).
+   */
+  public boolean acceptLimitPushdown() {
+    return false;
+  }
+
   @Override
   public String toString() {
     return getName() + "[" + getIdentifier() + "]";
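
Editor's note: the relocated closeOp call and the new increaseForward(long) above let callers (see ReduceSinkOperator.collect later in this commit) report forwarded rows in batches — the counter jumps by an arbitrary amount, and the log threshold is advanced until it passes the running total. A self-contained sketch of that batched-threshold pattern, assuming geometric threshold growth similar to Operator.getNextCntr (the exact growth factor below is an assumption):

public class ForwardCounter {
  private long cntr = 0;
  private long nextCntr = 1;

  // Grow the next reporting threshold geometrically; the factor is an assumption.
  private long getNextCntr(long current) {
    return current >= 1_000_000 ? current + 1_000_000 : current * 10;
  }

  // Batched increment: bump the count, and if we crossed the threshold,
  // log once and advance the threshold until it is ahead of the count again.
  void increaseForward(long counter) {
    cntr += counter;
    if (cntr >= nextCntr) {
      System.out.println("forwarding " + cntr + " rows");
      do {
        nextCntr = getNextCntr(nextCntr);
      } while (cntr >= nextCntr);
    }
  }

  public static void main(String[] args) {
    ForwardCounter fc = new ForwardCounter();
    fc.increaseForward(1);      // logs "forwarding 1 rows"
    fc.increaseForward(999);    // logs "forwarding 1000 rows"; threshold jumps past 1000
  }
}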

Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFOperator.java?rev=1522098&r1=1522097&r2=1522098&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFOperator.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFOperator.java Thu Sep 12 01:21:10 2013
@@ -34,8 +34,6 @@ import org.apache.hadoop.hive.ql.plan.PT
 import org.apache.hadoop.hive.ql.plan.PTFDesc.PTFInputDef;
 import org.apache.hadoop.hive.ql.plan.PTFDesc.PartitionDef;
 import org.apache.hadoop.hive.ql.plan.PTFDesc.PartitionedTableFunctionDef;
-import org.apache.hadoop.hive.ql.plan.PTFDesc.WindowExpressionDef;
-import org.apache.hadoop.hive.ql.plan.PTFDesc.WindowTableFunctionDef;
 import org.apache.hadoop.hive.ql.plan.PTFDeserializer;
 import org.apache.hadoop.hive.ql.plan.api.OperatorType;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDFLeadLag;
@@ -44,11 +42,9 @@ import org.apache.hadoop.hive.serde2.Ser
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
-import org.apache.hadoop.hive.serde2.objectinspector.StructField;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 
-public class PTFOperator extends Operator<PTFDesc> implements Serializable
-{
+public class PTFOperator extends Operator<PTFDesc> implements Serializable {
 
 	private static final long serialVersionUID = 1L;
 	PTFPartition inputPart;
@@ -67,8 +63,7 @@ public class PTFOperator extends Operato
 	 * 4. Create input partition to store rows coming from previous operator
 	 */
 	@Override
-	protected void initializeOp(Configuration jobConf) throws HiveException
-	{
+	protected void initializeOp(Configuration jobConf) throws HiveException {
 		hiveConf = new HiveConf(jobConf, PTFOperator.class);
 		// if the parent is ExtractOperator, this invocation is from reduce-side
 		Operator<? extends OperatorDesc> parentOp = getParentOperators().get(0);
@@ -78,13 +73,10 @@ public class PTFOperator extends Operato
     inputPart = createFirstPartitionForChain(
         inputObjInspectors[0], hiveConf, isMapOperator);
 
-		if (isMapOperator)
-		{
+		if (isMapOperator) {
 			PartitionedTableFunctionDef tDef = conf.getStartOfChain();
 			outputObjInspector = tDef.getRawInputShape().getOI();
-		}
-		else
-		{
+		} else {
 			outputObjInspector = conf.getFuncDef().getOutputShape().getOI();
 		}
 
@@ -94,16 +86,12 @@ public class PTFOperator extends Operato
 	}
 
 	@Override
-	protected void closeOp(boolean abort) throws HiveException
-	{
+	protected void closeOp(boolean abort) throws HiveException {
 		super.closeOp(abort);
     if(inputPart.size() != 0){
-      if (isMapOperator)
-      {
+      if (isMapOperator) {
         processMapFunction();
-      }
-      else
-      {
+      } else {
         processInputPartition();
       }
     }
@@ -113,8 +101,7 @@ public class PTFOperator extends Operato
 	@Override
 	public void processOp(Object row, int tag) throws HiveException
 	{
-	  if (!isMapOperator )
-    {
+	  if (!isMapOperator ) {
       /*
        * checkif current row belongs to the current accumulated Partition:
        * - If not:
@@ -122,24 +109,19 @@ public class PTFOperator extends Operato
        *  - reset input Partition
        * - set currentKey to the newKey if it is null or has changed.
        */
-      newKeys.getNewKey(row, inputPart.getOI());
+      newKeys.getNewKey(row, inputPart.getInputOI());
       boolean keysAreEqual = (currentKeys != null && newKeys != null)?
               newKeys.equals(currentKeys) : false;
 
-      if (currentKeys != null && !keysAreEqual)
-      {
+      if (currentKeys != null && !keysAreEqual) {
         processInputPartition();
         inputPart.reset();
       }
 
-      if (currentKeys == null || !keysAreEqual)
-      {
-        if (currentKeys == null)
-        {
+      if (currentKeys == null || !keysAreEqual) {
+        if (currentKeys == null) {
           currentKeys = newKeys.copyKey();
-        }
-        else
-        {
+        } else {
           currentKeys.copyKey(newKeys);
         }
       }
@@ -156,16 +138,14 @@ public class PTFOperator extends Operato
 	 * @param hiveConf
 	 * @throws HiveException
 	 */
-	protected void reconstructQueryDef(HiveConf hiveConf) throws HiveException
-	{
+	protected void reconstructQueryDef(HiveConf hiveConf) throws HiveException {
 
 	  PTFDeserializer dS =
 	      new PTFDeserializer(conf, (StructObjectInspector)inputObjInspectors[0], hiveConf);
 	  dS.initializePTFChain(conf.getFuncDef());
 	}
 
-	protected void setupKeysWrapper(ObjectInspector inputOI) throws HiveException
-	{
+	protected void setupKeysWrapper(ObjectInspector inputOI) throws HiveException {
 		PartitionDef pDef = conf.getStartOfChain().getPartition();
 		ArrayList<PTFExpressionDef> exprs = pDef.getExpressions();
 		int numExprs = exprs.size();
@@ -173,8 +153,7 @@ public class PTFOperator extends Operato
 		ObjectInspector[] keyOIs = new ObjectInspector[numExprs];
 		ObjectInspector[] currentKeyOIs = new ObjectInspector[numExprs];
 
-		for(int i=0; i<numExprs; i++)
-		{
+		for(int i=0; i<numExprs; i++) {
 		  PTFExpressionDef exprDef = exprs.get(i);
 			/*
 			 * Why cannot we just use the ExprNodeEvaluator on the column?
@@ -192,29 +171,20 @@ public class PTFOperator extends Operato
 	  newKeys = keyWrapperFactory.getKeyWrapper();
 	}
 
-	protected void processInputPartition() throws HiveException
-	{
+	protected void processInputPartition() throws HiveException {
 	  PTFPartition outPart = executeChain(inputPart);
-	  if ( conf.forWindowing() ) {
-	    executeWindowExprs(outPart);
-	  }
-	  else {
-	    PTFPartitionIterator<Object> pItr = outPart.iterator();
-	    while (pItr.hasNext())
-	    {
-	      Object oRow = pItr.next();
-	      forward(oRow, outputObjInspector);
-	    }
-	  }
+	  PTFPartitionIterator<Object> pItr = outPart.iterator();
+    while (pItr.hasNext()) {
+      Object oRow = pItr.next();
+      forward(oRow, outputObjInspector);
+    }
 	}
 
-	protected void processMapFunction() throws HiveException
-	{
+	protected void processMapFunction() throws HiveException {
 	  PartitionedTableFunctionDef tDef = conf.getStartOfChain();
     PTFPartition outPart = tDef.getTFunction().transformRawInput(inputPart);
     PTFPartitionIterator<Object> pItr = outPart.iterator();
-    while (pItr.hasNext())
-    {
+    while (pItr.hasNext()) {
       Object oRow = pItr.next();
       forward(oRow, outputObjInspector);
     }
@@ -234,8 +204,7 @@ public class PTFOperator extends Operato
 
 
 	@Override
-	public OperatorType getType()
-	{
+	public OperatorType getType() {
 		return OperatorType.PTF;
 	}
 
@@ -250,124 +219,23 @@ public class PTFOperator extends Operato
    * @throws HiveException
    */
   private PTFPartition executeChain(PTFPartition part)
-      throws HiveException
-  {
+      throws HiveException {
     Stack<PartitionedTableFunctionDef> fnDefs = new Stack<PartitionedTableFunctionDef>();
     PTFInputDef iDef = conf.getFuncDef();
-    while (true)
-    {
-      if (iDef instanceof PartitionedTableFunctionDef)
-      {
-        fnDefs.push((PartitionedTableFunctionDef) iDef);
-        iDef = ((PartitionedTableFunctionDef) iDef).getInput();
-      }
-      else
-      {
-        break;
-      }
+
+    while (iDef instanceof PartitionedTableFunctionDef) {
+      fnDefs.push((PartitionedTableFunctionDef) iDef);
+      iDef = ((PartitionedTableFunctionDef) iDef).getInput();
     }
 
     PartitionedTableFunctionDef currFnDef;
-    while (!fnDefs.isEmpty())
-    {
+    while (!fnDefs.isEmpty()) {
       currFnDef = fnDefs.pop();
       part = currFnDef.getTFunction().execute(part);
     }
     return part;
   }
 
-  /**
-   * If WindowingSpec contains any Windowing Expressions or has a
-   * Having condition then these are processed
-   * and forwarded on. Whereas when there is no having or WdwExprs
-   * just forward the rows in the output Partition.
-   *
-   * For e.g. Consider the following query:
-   * <pre>
-   * {@code
-   *  select rank(), lead(rank(),1),...
-   *  from xyz
-   *  ...
-   *  having rank() > 1
-   *  }
-   * </pre>
-   * rank() gets processed as a WdwFn; Its in the oPart(output partition)
-   * argument to executeWindowExprs. Here we first evaluate the having expression.
-   * So the first row of oPart gets filtered out.
-   * Next we evaluate lead(rank()) which is held as a WindowExpression and add it to the output.
-   *
-   * @param ptfDesc
-   * @param oPart output partition after Window Fns are processed.
-   * @param op
-   * @throws HiveException
-   */
-  private void executeWindowExprs(PTFPartition oPart)
-      throws HiveException
-  {
-    WindowTableFunctionDef wTFnDef = (WindowTableFunctionDef) conf.getFuncDef();
-    /*
-     * inputOI represents the row with WindowFn results present.
-     * So in the e.g. above it will have a column for 'rank()'
-     */
-    StructObjectInspector inputOI = wTFnDef.getOutputFromWdwFnProcessing().getOI();
-    /*
-     * outputOI represents the final row with the Windowing Expressions added.
-     * So in the e.g. above it will have a column for 'lead(rank())' in addition to
-     * all columns in inputOI.
-     */
-    StructObjectInspector outputOI = wTFnDef.getOutputShape().getOI();
-    int numCols = outputOI.getAllStructFieldRefs().size();
-    ArrayList<WindowExpressionDef> wdwExprs = wTFnDef.getWindowExpressions();
-    int numWdwExprs = wdwExprs == null ? 0 : wdwExprs.size();
-    Object[] output = new Object[numCols];
-
-    /*
-     * If this Windowing invocation has no Window Expressions and doesn't need to be filtered,
-     * we can just forward the row in the oPart partition.
-     */
-    boolean forwardRowsUntouched = (wdwExprs == null || wdwExprs.size() == 0 );
-
-    PTFPartitionIterator<Object> pItr = oPart.iterator();
-    PTFOperator.connectLeadLagFunctionsToPartition(conf, pItr);
-    while (pItr.hasNext())
-    {
-      int colCnt = 0;
-      Object oRow = pItr.next();
-
-      /*
-       * when there is no Windowing expressions or having;
-       * just forward the Object coming out of the Partition.
-       */
-      if ( forwardRowsUntouched ) {
-        forward(oRow, outputObjInspector);
-        continue;
-      }
-
-      /*
-       * Setup the output row columns in the following order
-       * - the columns in the SelectList processed by the PTF
-       * (ie the Select Exprs that have navigation expressions)
-       * - the columns from the final PTF.
-       */
-
-      if ( wdwExprs != null ) {
-        for (WindowExpressionDef wdwExpr : wdwExprs)
-        {
-          Object newCol = wdwExpr.getExprEvaluator().evaluate(oRow);
-          output[colCnt++] = newCol;
-        }
-      }
-
-      for(; colCnt < numCols; ) {
-        StructField field = inputOI.getAllStructFieldRefs().get(colCnt - numWdwExprs);
-        output[colCnt++] =
-            ObjectInspectorUtils.copyToStandardObject(inputOI.getStructFieldData(oRow, field),
-            field.getFieldObjectInspector());
-      }
-
-      forward(output, outputObjInspector);
-    }
-  }
 
   /**
    * Create a new Partition.
@@ -390,31 +258,31 @@ public class PTFOperator extends Operato
    * @throws HiveException
    */
   public PTFPartition createFirstPartitionForChain(ObjectInspector oi,
-      HiveConf hiveConf, boolean isMapSide) throws HiveException
-  {
+      HiveConf hiveConf, boolean isMapSide) throws HiveException {
     PartitionedTableFunctionDef tabDef = conf.getStartOfChain();
     TableFunctionEvaluator tEval = tabDef.getTFunction();
-    String partClassName = tEval.getPartitionClass();
-    int partMemSize = tEval.getPartitionMemSize();
 
     PTFPartition part = null;
     SerDe serde = isMapSide ? tabDef.getInput().getOutputShape().getSerde() :
       tabDef.getRawInputShape().getSerde();
-    part = new PTFPartition(partClassName, partMemSize, serde,
-        (StructObjectInspector) oi);
+    StructObjectInspector outputOI = isMapSide ? tabDef.getInput().getOutputShape().getOI() :
+      tabDef.getRawInputShape().getOI();
+    part = PTFPartition.create(conf.getCfg(),
+        serde,
+        (StructObjectInspector) oi,
+        outputOI);
+
     return part;
 
   }
 
   public static void connectLeadLagFunctionsToPartition(PTFDesc ptfDesc,
-      PTFPartitionIterator<Object> pItr) throws HiveException
-  {
+      PTFPartitionIterator<Object> pItr) throws HiveException {
     List<ExprNodeGenericFuncDesc> llFnDescs = ptfDesc.getLlInfo().getLeadLagExprs();
     if (llFnDescs == null) {
       return;
     }
-    for (ExprNodeGenericFuncDesc llFnDesc : llFnDescs)
-    {
+    for (ExprNodeGenericFuncDesc llFnDesc : llFnDescs) {
       GenericUDFLeadLag llFn = (GenericUDFLeadLag) llFnDesc
           .getGenericUDF();
       llFn.setpItr(pItr);
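
Editor's note: executeChain above is simplified to a plain while loop — descend the PTF definition chain to its leaf while pushing each definition on a stack, then pop and apply the functions bottom-up. The same traversal in isolation, with a hypothetical Node type standing in for PartitionedTableFunctionDef and a String standing in for PTFPartition:

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.UnaryOperator;

public class ChainExecutor {
  // Hypothetical stand-in for PartitionedTableFunctionDef: a function with an optional input.
  static final class Node {
    final UnaryOperator<String> fn;
    final Node input;               // null marks the start of the chain
    Node(UnaryOperator<String> fn, Node input) { this.fn = fn; this.input = input; }
  }

  // Push definitions while descending to the leaf, then pop and apply them in order.
  static String executeChain(Node def, String part) {
    Deque<Node> stack = new ArrayDeque<>();
    while (def != null) {
      stack.push(def);
      def = def.input;
    }
    while (!stack.isEmpty()) {
      part = stack.pop().fn.apply(part);
    }
    return part;
  }

  public static void main(String[] args) {
    Node leaf = new Node(s -> s + " -> rank", null);
    Node root = new Node(s -> s + " -> lead", leaf);
    System.out.println(executeChain(root, "partition"));   // partition -> rank -> lead
  }
}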

Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFPartition.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFPartition.java?rev=1522098&r1=1522097&r2=1522098&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFPartition.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFPartition.java Thu Sep 12 01:21:10 2013
@@ -20,227 +20,166 @@ package org.apache.hadoop.hive.ql.exec;
 
 import java.util.ConcurrentModificationException;
 import java.util.Iterator;
+import java.util.List;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
-import org.apache.hadoop.hive.ql.exec.PTFPersistence.ByteBasedList;
+import org.apache.hadoop.hive.ql.exec.persistence.PTFRowContainer;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.serde2.SerDe;
 import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
-import org.apache.hadoop.io.Writable;
 
 /*
  * represents a collection of rows that is acted upon by a TableFunction or a WindowFunction.
  */
-public class PTFPartition
-{
-  SerDe serDe;
-  StructObjectInspector OI;
-  private ByteBasedList elems;
-  private Writable wRow;
-  private int sz;
-
-  public PTFPartition(HiveConf cfg, SerDe serDe, StructObjectInspector oI) throws HiveException
-  {
-    String partitionClass = HiveConf.getVar(cfg, ConfVars.HIVE_PTF_PARTITION_PERSISTENCE_CLASS);
-    int partitionMemSize = HiveConf.getIntVar(cfg, ConfVars.HIVE_PTF_PARTITION_PERSISTENT_SIZE);
-    init(partitionClass, partitionMemSize, serDe, oI);
-  }
-
-  public PTFPartition(String partitionClass, int partitionMemSize, SerDe serDe, StructObjectInspector oI) throws HiveException
-  {
-    init(partitionClass, partitionMemSize, serDe, oI);
-  }
+@SuppressWarnings("deprecation")
+public class PTFPartition {
+  protected static Log LOG = LogFactory.getLog(PTFPartition.class);
 
-  private void init(String partitionClass, int partitionMemSize, SerDe serDe, StructObjectInspector oI) throws HiveException
-  {
+  SerDe serDe;
+  StructObjectInspector inputOI;
+  StructObjectInspector outputOI;
+  private final PTFRowContainer<List<Object>> elems;
+
+  protected PTFPartition(HiveConf cfg,
+      SerDe serDe, StructObjectInspector inputOI,
+      StructObjectInspector outputOI)
+      throws HiveException {
     this.serDe = serDe;
-    OI = oI;
-    elems = PTFPersistence.createList(partitionClass, partitionMemSize);
-    sz = 0;
-    wRow = createWritable();
+    this.inputOI = inputOI;
+    this.outputOI = outputOI;
+    int containerNumRows = HiveConf.getIntVar(cfg, ConfVars.HIVEJOINCACHESIZE);
+    elems = new PTFRowContainer<List<Object>>(containerNumRows, cfg, null);
+    elems.setSerDe(serDe, outputOI);
+    elems.setTableDesc(PTFRowContainer.createTableDesc(inputOI));
   }
 
   public void reset() throws HiveException {
-    sz = 0;
-    elems.reset(0);
+    elems.clear();
   }
 
-  public SerDe getSerDe()
-  {
+  public SerDe getSerDe() {
     return serDe;
   }
-  public void setSerDe(SerDe serDe)
-  {
-    this.serDe = serDe;
-  }
-  public StructObjectInspector getOI()
-  {
-    return OI;
-  }
-  public void setOI(StructObjectInspector oI)
-  {
-    OI = oI;
-  }
 
-  private Writable createWritable() throws HiveException
-  {
-    try
-    {
-      return serDe.getSerializedClass().newInstance();
-    }
-    catch(Throwable t)
-    {
-      throw new HiveException(t);
-    }
+  public StructObjectInspector getInputOI() {
+    return inputOI;
   }
 
-  public Object getAt(int i) throws HiveException
-  {
-    try
-    {
-      elems.get(i, wRow);
-      Object o = serDe.deserialize(wRow);
-      return o;
-    }
-    catch(SerDeException  se)
-    {
-      throw new HiveException(se);
-    }
+  public StructObjectInspector getOutputOI() {
+    return outputOI;
   }
 
-  public Object getWritableAt(int i) throws HiveException
+  public Object getAt(int i) throws HiveException
   {
-    elems.get(i, wRow);
-    return wRow;
+    return elems.getAt(i);
   }
 
-  public void append(Writable o) throws HiveException
-  {
-    elems.append(o);
-    sz++;
-  }
+  public void append(Object o) throws HiveException {
 
-  public void append(Object o) throws HiveException
-  {
-    try
-    {
-      append(serDe.serialize(o, OI));
-    }
-    catch(SerDeException e)
-    {
-      throw new HiveException(e);
+    if ( elems.size() == Integer.MAX_VALUE ) {
+      throw new HiveException(String.format("Cannot add more than %d elements to a PTFPartition",
+          Integer.MAX_VALUE));
     }
+
+    @SuppressWarnings("unchecked")
+    List<Object> l = (List<Object>)
+        ObjectInspectorUtils.copyToStandardObject(o, inputOI, ObjectInspectorCopyOption.WRITABLE);
+    elems.add(l);
   }
 
-  public int size()
-  {
-    return sz;
+  public int size() {
+    return (int) elems.size();
   }
 
-  public PTFPartitionIterator<Object> iterator()
-  {
+  public PTFPartitionIterator<Object> iterator() throws HiveException {
+    elems.first();
     return new PItr(0, size());
   }
 
-  public PTFPartitionIterator<Object> range(int start, int end)
-  {
-    assert(start >= 0);
-    assert(end <= size());
-    assert(start <= end);
+  public PTFPartitionIterator<Object> range(int start, int end) {
+    assert (start >= 0);
+    assert (end <= size());
+    assert (start <= end);
     return new PItr(start, end);
   }
 
   public void close() {
-    elems.close();
+    try {
+      elems.close();
+    } catch (HiveException e) {
+      LOG.error(e.toString(), e);
+    }
   }
 
-  class PItr implements PTFPartitionIterator<Object>
-  {
+  class PItr implements PTFPartitionIterator<Object> {
     int idx;
     final int start;
     final int end;
     final int createTimeSz;
 
-    PItr(int start, int end)
-    {
+    PItr(int start, int end) {
       this.idx = start;
       this.start = start;
       this.end = end;
       createTimeSz = PTFPartition.this.size();
     }
 
-    public boolean hasNext()
-    {
-      checkForComodification() ;
+    public boolean hasNext() {
+      checkForComodification();
       return idx < end;
     }
 
-    public Object next()
-    {
+    public Object next() {
       checkForComodification();
-      try
-      {
+      try {
         return PTFPartition.this.getAt(idx++);
-      }
-      catch(HiveException e)
-      {
+      } catch (HiveException e) {
         throw new RuntimeException(e);
       }
     }
 
-    public void remove()
-    {
+    public void remove() {
       throw new UnsupportedOperationException();
     }
 
-    final void checkForComodification()
-    {
-        if (createTimeSz != PTFPartition.this.size()) {
-          throw new ConcurrentModificationException();
-        }
+    final void checkForComodification() {
+      if (createTimeSz != PTFPartition.this.size()) {
+        throw new ConcurrentModificationException();
+      }
     }
 
     @Override
-    public int getIndex()
-    {
+    public int getIndex() {
       return idx;
     }
 
-    private Object getAt(int i)
-    {
-      try
-      {
-        return PTFPartition.this.getAt(i);
-      }
-      catch(HiveException e)
-      {
-        throw new RuntimeException(e);
-      }
+    private Object getAt(int i) throws HiveException {
+      return PTFPartition.this.getAt(i);
     }
 
     @Override
-    public Object lead(int amt)
-    {
+    public Object lead(int amt) throws HiveException {
       int i = idx + amt;
       i = i >= end ? end - 1 : i;
       return getAt(i);
     }
 
     @Override
-    public Object lag(int amt)
-    {
+    public Object lag(int amt) throws HiveException {
       int i = idx - amt;
       i = i < start ? start : i;
       return getAt(i);
     }
 
     @Override
-    public Object resetToIndex(int idx)
-    {
-      if ( idx < start || idx >= end )
-      {
+    public Object resetToIndex(int idx) throws HiveException {
+      if (idx < start || idx >= end) {
         return null;
       }
       Object o = getAt(idx);
@@ -249,14 +188,12 @@ public class PTFPartition
     }
 
     @Override
-    public PTFPartition getPartition()
-    {
+    public PTFPartition getPartition() {
       return PTFPartition.this;
     }
 
     @Override
-    public void reset()
-    {
+    public void reset() {
       idx = start;
     }
   };
@@ -266,24 +203,37 @@ public class PTFPartition
    * Iterator exposes the index of the next location.
    * Client can invoke lead/lag relative to the next location.
    */
-  public static interface PTFPartitionIterator<T> extends Iterator<T>
-  {
+  public static interface PTFPartitionIterator<T> extends Iterator<T> {
     int getIndex();
 
-    T lead(int amt);
+    T lead(int amt) throws HiveException;
 
-    T lag(int amt);
+    T lag(int amt) throws HiveException;
 
     /*
-     * after a lead and lag call, allow Object associated with SerDe and writable associated with partition to be reset
+     * after a lead and lag call, allow Object associated with SerDe and writable associated with
+     * partition to be reset
      * to the value for the current Index.
      */
-    Object resetToIndex(int idx);
+    Object resetToIndex(int idx) throws HiveException;
 
     PTFPartition getPartition();
 
-    void reset();
+    void reset() throws HiveException;
   }
 
+  public static PTFPartition create(HiveConf cfg,
+      SerDe serDe,
+      StructObjectInspector inputOI,
+      StructObjectInspector outputOI)
+      throws HiveException {
+    return new PTFPartition(cfg, serDe, inputOI, outputOI);
+  }
+
+  public static StructObjectInspector setupPartitionOutputOI(SerDe serDe,
+      StructObjectInspector tblFnOI) throws SerDeException {
+    return (StructObjectInspector) ObjectInspectorUtils.getStandardObjectInspector(tblFnOI,
+        ObjectInspectorCopyOption.WRITABLE);
+  }
 
 }
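
Editor's note: PTFPartition is now backed by a PTFRowContainer, and its iterator's lead/lag methods clamp the requested index to the [start, end) range instead of running off the partition. A self-contained sketch of that clamping over a plain in-memory list; the class below is illustrative, not Hive's API:

import java.util.Arrays;
import java.util.List;

public class LeadLagWindow {
  private final List<String> rows;
  private final int start;
  private final int end;    // exclusive
  private int idx;          // index of the next row

  LeadLagWindow(List<String> rows) {
    this.rows = rows;
    this.start = 0;
    this.end = rows.size();
    this.idx = 0;
  }

  String next() { return rows.get(idx++); }

  // lead(amt): look ahead, but never past the last row of the partition.
  String lead(int amt) {
    int i = idx + amt;
    return rows.get(i >= end ? end - 1 : i);
  }

  // lag(amt): look back, but never before the first row of the partition.
  String lag(int amt) {
    int i = idx - amt;
    return rows.get(i < start ? start : i);
  }

  public static void main(String[] args) {
    LeadLagWindow w = new LeadLagWindow(Arrays.asList("a", "b", "c"));
    w.next();                         // consume "a"; idx is now 1
    System.out.println(w.lead(1));    // "c"
    System.out.println(w.lead(5));    // "c" (clamped to the last row)
    System.out.println(w.lag(5));     // "a" (clamped to the first row)
  }
}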

Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/PartitionKeySampler.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/PartitionKeySampler.java?rev=1522098&r1=1522097&r2=1522098&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/PartitionKeySampler.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/PartitionKeySampler.java Thu Sep 12 01:21:10 2013
@@ -20,6 +20,13 @@
 
 package org.apache.hadoop.hive.ql.exec;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Random;
+
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -36,13 +43,6 @@ import org.apache.hadoop.io.WritableComp
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.OutputCollector;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Comparator;
-import java.util.List;
-import java.util.Random;
-
 public class PartitionKeySampler implements OutputCollector<HiveKey, Object> {
 
   public static final Comparator<byte[]> C = new Comparator<byte[]>() {
@@ -51,7 +51,7 @@ public class PartitionKeySampler impleme
     }
   };
 
-  private List<byte[]> sampled = new ArrayList<byte[]>();
+  private final List<byte[]> sampled = new ArrayList<byte[]>();
 
   public void addSampleFile(Path inputPath, JobConf job) throws IOException {
     FileSystem fs = inputPath.getFileSystem(job);
@@ -134,6 +134,8 @@ public class PartitionKeySampler impleme
     private float samplePercent = 0.1f;
     private final Random random = new Random();
 
+    private int sampled;
+
     public FetchSampler(FetchWork work, JobConf job, Operator<?> operator) {
       super(work, job, operator, null);
     }
@@ -148,12 +150,22 @@ public class PartitionKeySampler impleme
 
     @Override
     public boolean pushRow() throws IOException, HiveException {
-      InspectableObject row = getNextRow();
-      if (row != null && random.nextFloat() < samplePercent) {
-        sampleNum--;
-        pushRow(row);
+      if (!super.pushRow()) {
+        return false;
+      }
+      if (sampled < sampleNum) {
+        return true;
+      }
+      operator.flush();
+      return false;
+    }
+
+    @Override
+    protected void pushRow(InspectableObject row) throws HiveException {
+      if (random.nextFloat() < samplePercent) {
+        sampled++;
+        super.pushRow(row);
       }
-      return sampleNum > 0 && row != null;
     }
   }
 }
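
Editor's note: the reworked FetchSampler keeps each fetched row with probability samplePercent and stops once sampleNum rows have been accepted. A minimal standalone version of that sampling loop; the integer row source and list sink are placeholders for Hive's fetch operator and collector:

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import java.util.stream.IntStream;

public class BernoulliSampler {
  private final Random random = new Random();
  private final float samplePercent;
  private final int sampleNum;
  private int sampled;

  BernoulliSampler(float samplePercent, int sampleNum) {
    this.samplePercent = samplePercent;
    this.sampleNum = sampleNum;
  }

  // Pull rows from the source, keep each one with probability samplePercent,
  // and stop once sampleNum rows have been accepted or the source is exhausted.
  void run(Iterator<Integer> source, List<Integer> sink) {
    while (source.hasNext() && sampled < sampleNum) {
      Integer row = source.next();
      if (random.nextFloat() < samplePercent) {
        sampled++;
        sink.add(row);
      }
    }
  }

  public static void main(String[] args) {
    List<Integer> sink = new ArrayList<>();
    new BernoulliSampler(0.1f, 100).run(IntStream.range(0, 10_000).iterator(), sink);
    System.out.println("collected " + sink.size() + " sampled rows");
  }
}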

Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java?rev=1522098&r1=1522097&r2=1522098&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java Thu Sep 12 01:21:10 2013
@@ -33,7 +33,6 @@ import org.apache.hadoop.hive.ql.plan.Ex
 import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
 import org.apache.hadoop.hive.ql.plan.api.OperatorType;
-import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.hive.serde2.Serializer;
 import org.apache.hadoop.hive.serde2.objectinspector.InspectableObject;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
@@ -43,14 +42,14 @@ import org.apache.hadoop.hive.serde2.obj
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.UnionObjectInspector;
 import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.Text;
 
 /**
  * Reduce Sink Operator sends output to the reduce stage.
  **/
 public class ReduceSinkOperator extends TerminalOperator<ReduceSinkDesc>
-    implements Serializable {
+    implements Serializable, TopNHash.BinaryCollector {
 
   private static final long serialVersionUID = 1L;
 
@@ -90,6 +89,9 @@ public class ReduceSinkOperator extends 
     return inputAlias;
   }
 
+  // picks top-N K:V pairs from the input; can be null
+  private transient TopNHash reducerHash;
+
   @Override
   protected void initializeOp(Configuration hconf) throws HiveException {
 
@@ -131,6 +133,8 @@ public class ReduceSinkOperator extends 
           .newInstance();
       valueSerializer.initialize(null, valueTableDesc.getProperties());
 
+      reducerHash = createTopKHash();
+
       firstRow = true;
       initializeChildren(hconf);
     } catch (Exception e) {
@@ -139,6 +143,23 @@ public class ReduceSinkOperator extends 
     }
   }
 
+  private TopNHash createTopKHash() {
+    int limit = conf.getTopN();
+    float percent = conf.getTopNMemoryUsage();
+    if (limit < 0 || percent <= 0) {
+      return null;
+    }
+    if (limit == 0) {
+      return TopNHash.create0();
+    }
+    // limit * 64 : reserve headroom for the key/value/hashcode arrays
+    long threshold = (long) (percent * Runtime.getRuntime().maxMemory()) - limit * 64;
+    if (threshold < 0) {
+      return null;
+    }
+    return TopNHash.create(conf.isMapGroupBy(), limit, threshold, this);
+  }
+
   transient InspectableObject tempInspectableObject = new InspectableObject();
   protected transient HiveKey keyWritable = new HiveKey();
   protected transient Writable value;
@@ -147,12 +168,24 @@ public class ReduceSinkOperator extends 
   transient StructObjectInspector valueObjectInspector;
   transient ObjectInspector[] partitionObjectInspectors;
 
-  protected transient Object[][] cachedKeys;
   protected transient Object[] cachedValues;
   protected transient List<List<Integer>> distinctColIndices;
-
+  /**
+   * This two-dimensional array holds key data and a corresponding Union object
+   * which contains the tag identifying the aggregate expression for distinct columns.
+   *
+   * If there is no distinct expression, cachedKeys simply looks like this:
+   * cachedKeys[0] = [col0][col1]
+   *
+   * With two distinct expressions, union(tag:key) is attached for each distinct expression:
+   * cachedKeys[0] = [col0][col1][0:dist1]
+   * cachedKeys[1] = [col0][col1][1:dist2]
+   *
+   * In this case, the child GBY evaluates distinct values with an expression like KEY.col2:0.dist1;
+   * see {@link ExprNodeColumnEvaluator}.
+   */
+  protected transient Object[][] cachedKeys;
   boolean firstRow;
-
   protected transient Random random;
 
   /**
@@ -198,6 +231,7 @@ public class ReduceSinkOperator extends 
   }
 
   @Override
+  @SuppressWarnings("unchecked")
   public void processOp(Object row, int tag) throws HiveException {
     try {
       ObjectInspector rowInspector = inputObjInspectors[tag];
@@ -241,8 +275,6 @@ public class ReduceSinkOperator extends 
       for (int i = 0; i < valueEval.length; i++) {
         cachedValues[i] = valueEval[i].evaluate(row);
       }
-      // Serialize the value
-      value = valueSerializer.serialize(cachedValues, valueObjectInspector);
 
       // Evaluate the keys
       Object[] distributionKeys = new Object[numDistributionKeys];
@@ -267,6 +299,8 @@ public class ReduceSinkOperator extends 
         // no distinct key
         System.arraycopy(distributionKeys, 0, cachedKeys[0], 0, numDistributionKeys);
       }
+
+      BytesWritable value = null;
       // Serialize the keys and append the tag
       for (int i = 0; i < cachedKeys.length; i++) {
         if (keyIsText) {
@@ -294,26 +328,85 @@ public class ReduceSinkOperator extends 
           }
         }
         keyWritable.setHashCode(keyHashCode);
-        if (out != null) {
-          out.collect(keyWritable, value);
-          // Since this is a terminal operator, update counters explicitly -
-          // forward is not called
-          if (counterNameToEnum != null) {
-            ++outputRows;
-            if (outputRows % 1000 == 0) {
-              incrCounter(numOutputRowsCntr, outputRows);
-              outputRows = 0;
+
+        if (reducerHash == null) {
+          if (null != out) {
+            collect(keyWritable, value = getValue(row, value));
+          }
+       } else {
+          int index = reducerHash.indexOf(keyWritable);
+          if (index == TopNHash.EXCLUDED) {
+            continue;
+          }
+          value = getValue(row, value);
+          if (index >= 0) {
+            reducerHash.set(index, value);
+          } else {
+            if (index == TopNHash.FORWARD) {
+              collect(keyWritable, value);
+            } else if (index == TopNHash.FLUSH) {
+              LOG.info("Top-N hash is flushed");
+              reducerHash.flush();
+              // we can now retry adding key/value into hash, which is flushed.
+              // but for simplicity, just forward them
+              collect(keyWritable, value);
+            } else if (index == TopNHash.DISABLE) {
+              LOG.info("Top-N hash is disabled");
+              reducerHash.flush();
+              collect(keyWritable, value);
+              reducerHash = null;
             }
           }
         }
       }
-    } catch (SerDeException e) {
-      throw new HiveException(e);
-    } catch (IOException e) {
+    } catch (HiveException e) {
+      throw e;
+    } catch (Exception e) {
       throw new HiveException(e);
     }
   }
 
+  public void collect(BytesWritable key, BytesWritable value) throws IOException {
+    // Since this is a terminal operator, update counters explicitly -
+    // forward is not called
+    out.collect(key, value);
+    if (++outputRows % 1000 == 0) {
+      if (counterNameToEnum != null) {
+        incrCounter(numOutputRowsCntr, outputRows);
+      }
+      increaseForward(outputRows);
+      outputRows = 0;
+    }
+  }
+
+  // evaluate value lazily
+  private BytesWritable getValue(Object row, BytesWritable value) throws Exception {
+    if (value != null) {
+      return value;
+    }
+    // Evaluate the value
+    for (int i = 0; i < valueEval.length; i++) {
+      cachedValues[i] = valueEval[i].evaluate(row);
+    }
+    // Serialize the value
+    return (BytesWritable) valueSerializer.serialize(cachedValues, valueObjectInspector);
+  }
+
+  @Override
+  protected void closeOp(boolean abort) throws HiveException {
+    if (!abort && reducerHash != null) {
+      try {
+        reducerHash.flush();
+      } catch (IOException e) {
+        throw new HiveException(e);
+      } finally {
+        reducerHash = null;
+      }
+    }
+    reducerHash = null;
+    super.closeOp(abort);
+  }
+
   /**
    * @return the name of the operator
    */
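
Editor's note: ReduceSinkOperator now routes keys through a TopNHash before serializing values, so a value is only serialized (getValue) for keys that survive the top-N filter; EXCLUDED, FORWARD, FLUSH and DISABLE are the hash's verdicts. TopNHash itself is Hive-specific, but the underlying idea — keep only the N smallest keys seen so far and drop the rest — can be sketched with a bounded max-heap (an illustration only, not TopNHash's implementation):

import java.util.PriorityQueue;

public class BoundedTopN<K extends Comparable<K>> {
  private final int limit;
  // Max-heap on the key: the head is the current worst key we are keeping.
  private final PriorityQueue<K> heap;

  public BoundedTopN(int limit) {
    this.limit = limit;
    this.heap = new PriorityQueue<>((a, b) -> b.compareTo(a));
  }

  // Returns true if the key is within the current top-N and was retained,
  // false if it can be dropped (the analogue of TopNHash.EXCLUDED).
  public boolean offer(K key) {
    if (heap.size() < limit) {
      heap.add(key);
      return true;
    }
    if (key.compareTo(heap.peek()) < 0) {  // better (smaller) than the worst kept key
      heap.poll();
      heap.add(key);
      return true;
    }
    return false;
  }

  public static void main(String[] args) {
    BoundedTopN<Integer> topN = new BoundedTopN<>(3);
    int[] keys = {7, 1, 9, 4, 2, 8};
    for (int k : keys) {
      System.out.println(k + " kept=" + topN.offer(k));   // only keys in the running top 3 are kept
    }
  }
}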

Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/RowSchema.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/RowSchema.java?rev=1522098&r1=1522097&r2=1522098&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/RowSchema.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/RowSchema.java Thu Sep 12 01:21:10 2013
@@ -51,8 +51,10 @@ public class RowSchema implements Serial
   @Override
   public String toString() {
     StringBuilder sb = new StringBuilder('(');
-    for (ColumnInfo col: signature) {
-      sb.append(col.toString());
+    if (signature != null) {
+      for (ColumnInfo col : signature) {
+        sb.append(col.toString());
+      }
     }
     sb.append(')');
     return sb.toString();

Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java?rev=1522098&r1=1522097&r2=1522098&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java Thu Sep 12 01:21:10 2013
@@ -64,10 +64,10 @@ public class SMBMapJoinOperator extends 
   private MapredLocalWork localWork = null;
   private Map<String, MergeQueue> aliasToMergeQueue = Collections.emptyMap();
 
-  transient ArrayList<Object>[] keyWritables;
-  transient ArrayList<Object>[] nextKeyWritables;
-  RowContainer<ArrayList<Object>>[] nextGroupStorage;
-  RowContainer<ArrayList<Object>>[] candidateStorage;
+  transient List<Object>[] keyWritables;
+  transient List<Object>[] nextKeyWritables;
+  RowContainer<List<Object>>[] nextGroupStorage;
+  RowContainer<List<Object>>[] candidateStorage;
 
   transient String[] tagToAlias;
   private transient boolean[] fetchDone;
@@ -136,12 +136,12 @@ public class SMBMapJoinOperator extends 
     }
 
     for (byte pos = 0; pos < order.length; pos++) {
-      RowContainer rc = JoinUtil.getRowContainer(hconf,
+      RowContainer<List<Object>> rc = JoinUtil.getRowContainer(hconf,
           rowContainerStandardObjectInspectors[pos],
           pos, bucketSize,spillTableDesc, conf, !hasFilter(pos),
           reporter);
       nextGroupStorage[pos] = rc;
-      RowContainer candidateRC = JoinUtil.getRowContainer(hconf,
+      RowContainer<List<Object>> candidateRC = JoinUtil.getRowContainer(hconf,
           rowContainerStandardObjectInspectors[pos],
           pos, bucketSize,spillTableDesc, conf, !hasFilter(pos),
           reporter);
@@ -435,7 +435,7 @@ public class SMBMapJoinOperator extends 
   private void promoteNextGroupToCandidate(Byte t) throws HiveException {
     this.keyWritables[t] = this.nextKeyWritables[t];
     this.nextKeyWritables[t] = null;
-    RowContainer<ArrayList<Object>> oldRowContainer = this.candidateStorage[t];
+    RowContainer<List<Object>> oldRowContainer = this.candidateStorage[t];
     oldRowContainer.clear();
     this.candidateStorage[t] = this.nextGroupStorage[t];
     this.nextGroupStorage[t] = oldRowContainer;
@@ -479,10 +479,10 @@ public class SMBMapJoinOperator extends 
 
   private int[] findSmallestKey() {
     int[] result = new int[order.length];
-    ArrayList<Object> smallestOne = null;
+    List<Object> smallestOne = null;
 
     for (byte pos = 0; pos < order.length; pos++) {
-      ArrayList<Object> key = keyWritables[pos];
+      List<Object> key = keyWritables[pos];
       if (key == null) {
         continue;
       }
@@ -501,7 +501,7 @@ public class SMBMapJoinOperator extends 
 
   private boolean processKey(byte alias, ArrayList<Object> key)
       throws HiveException {
-    ArrayList<Object> keyWritable = keyWritables[alias];
+    List<Object> keyWritable = keyWritables[alias];
     if (keyWritable == null) {
       //the first group.
       keyWritables[alias] = key;

Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/SelectOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/SelectOperator.java?rev=1522098&r1=1522097&r2=1522098&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/SelectOperator.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/SelectOperator.java Thu Sep 12 01:21:10 2013
@@ -124,4 +124,9 @@ public class SelectOperator extends Oper
   public boolean supportUnionRemoveOptimization() {
     return true;
   }
+
+  @Override
+  public boolean acceptLimitPushdown() {
+    return true;
+  }
 }

Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java?rev=1522098&r1=1522097&r2=1522098&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java Thu Sep 12 01:21:10 2013
@@ -19,12 +19,16 @@
 
 package org.apache.hadoop.hive.ql.exec;
 
+import java.io.FileNotFoundException;
+import java.io.IOException;
 import java.io.Serializable;
+import java.net.URI;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.regex.Pattern;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -34,12 +38,12 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.Warehouse;
-import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.DriverContext;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.Partition;
 import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.optimizer.listbucketingpruner.ListBucketingPrunerUtils;
 import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.tableSpec;
 import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx;
 import org.apache.hadoop.hive.ql.plan.LoadTableDesc;
@@ -271,8 +275,6 @@ public class StatsTask extends Task<Stat
     try {
       // Stats setup:
       Warehouse wh = new Warehouse(conf);
-      FileSystem fileSys;
-      FileStatus[] fileStatus;
 
       if (!this.getWork().getNoStatsAggregator()) {
         String statsImplementationClass = HiveConf.getVar(conf, HiveConf.ConfVars.HIVESTATSDBCLASS);
@@ -322,16 +324,9 @@ public class StatsTask extends Task<Stat
         if (!tableStatsExist && atomic) {
           return 0;
         }
-        Path tablePath = wh.getTablePath(db.getDatabase(table.getDbName()), table.getTableName());
-        fileSys = tablePath.getFileSystem(conf);
-        fileStatus = Utilities.getFileStatusRecurse(tablePath, 1, fileSys);
-
-        tblStats.setStat(StatsSetupConst.NUM_FILES, fileStatus.length);
-        long tableSize = 0L;
-        for (int i = 0; i < fileStatus.length; i++) {
-          tableSize += fileStatus[i].getLen();
-        }
-        tblStats.setStat(StatsSetupConst.TOTAL_SIZE, tableSize);
+        long[] summary = summary(conf, table);
+        tblStats.setStat(StatsSetupConst.NUM_FILES, summary[0]);
+        tblStats.setStat(StatsSetupConst.TOTAL_SIZE, summary[1]);
 
         // In case of a non-partitioned table, the key for stats temporary store is "rootDir"
         if (statsAggregator != null) {
@@ -403,18 +398,9 @@ public class StatsTask extends Task<Stat
             }
           }
 
-          fileSys = partn.getPartitionPath().getFileSystem(conf);
-          /* consider sub-directory created from list bucketing. */
-          int listBucketingDepth = calculateListBucketingDMLDepth(partn);
-          fileStatus = Utilities.getFileStatusRecurse(partn.getPartitionPath(),
-              (1 + listBucketingDepth), fileSys);
-          newPartStats.setStat(StatsSetupConst.NUM_FILES, fileStatus.length);
-
-          long partitionSize = 0L;
-          for (int i = 0; i < fileStatus.length; i++) {
-            partitionSize += fileStatus[i].getLen();
-          }
-          newPartStats.setStat(StatsSetupConst.TOTAL_SIZE, partitionSize);
+          long[] summary = summary(conf, partn);
+          newPartStats.setStat(StatsSetupConst.NUM_FILES, summary[0]);
+          newPartStats.setStat(StatsSetupConst.TOTAL_SIZE, summary[1]);
 
           if (hasStats) {
             PartitionStatistics oldPartStats = new PartitionStatistics(currentValues);
@@ -478,26 +464,103 @@ public class StatsTask extends Task<Stat
     return ret;
   }
 
-  /**
-   * List bucketing will introduce sub-directories.
-   *
-   * calculate it here in order to go to the leaf directory
-   *
-   * so that we can count right number of files.
-   *
-   * @param partn
-   * @return
-   */
-  private int calculateListBucketingDMLDepth(Partition partn) {
-    // list bucketing will introduce more files
-    int listBucketingDepth = 0;
-    if ((partn.getSkewedColNames() != null) && (partn.getSkewedColNames().size() > 0)
-        && (partn.getSkewedColValues() != null) && (partn.getSkewedColValues().size() > 0)
-        && (partn.getSkewedColValueLocationMaps() != null)
-        && (partn.getSkewedColValueLocationMaps().size() > 0)) {
-      listBucketingDepth = partn.getSkewedColNames().size();
+  private long[] summary(HiveConf conf, Partition partn) throws IOException {
+    Path path = partn.getPartitionPath();
+    FileSystem fs = path.getFileSystem(conf);
+    List<String> skewedColNames = partn.getSkewedColNames();
+    if (skewedColNames == null || skewedColNames.isEmpty()) {
+      return summary(fs, path);
+    }
+    List<List<String>> skewColValues = table.getSkewedColValues();
+    if (skewColValues == null || skewColValues.isEmpty()) {
+      return summary(fs, toDefaultLBPath(path));
+    }
+    return summary(fs, path, skewedColNames);
+  }
+
+  private long[] summary(HiveConf conf, Table table) throws IOException {
+    Path path = table.getPath();
+    FileSystem fs = path.getFileSystem(conf);
+    List<String> skewedColNames = table.getSkewedColNames();
+    if (skewedColNames == null || skewedColNames.isEmpty()) {
+      return summary(fs, path);
+    }
+    List<List<String>> skewColValues = table.getSkewedColValues();
+    if (skewColValues == null || skewColValues.isEmpty()) {
+      return summary(fs, toDefaultLBPath(path));
+    }
+    return summary(fs, path, table.getSkewedColNames());
+  }
+
+  private Path toDefaultLBPath(Path path) {
+    return new Path(path, ListBucketingPrunerUtils.HIVE_LIST_BUCKETING_DEFAULT_DIR_NAME);
+  }
+
+  private long[] summary(FileSystem fs, Path path) throws IOException {
+    try {
+      FileStatus status = fs.getFileStatus(path);
+      if (!status.isDir()) {
+        return new long[] {1, status.getLen()};
+      }
+    } catch (FileNotFoundException e) {
+      return new long[] {0, 0};
+    }
+    FileStatus[] children = fs.listStatus(path);  // can be null
+    if (children == null) {
+      return new long[] {0, 0};
+    }
+    long numFiles = 0L;
+    long tableSize = 0L;
+    for (FileStatus child : children) {
+      if (!child.isDir()) {
+        tableSize += child.getLen();
+        numFiles++;
+      }
+    }
+    return new long[] {numFiles, tableSize};
+  }
+
+  private Pattern toPattern(List<String> skewCols) {
+    StringBuilder builder = new StringBuilder();
+    for (String skewCol : skewCols) {
+      if (builder.length() > 0) {
+        builder.append(Path.SEPARATOR_CHAR);
+      }
+      builder.append(skewCol).append('=');
+      builder.append("[^").append(Path.SEPARATOR_CHAR).append("]*");
+    }
+    builder.append(Path.SEPARATOR_CHAR);
+    builder.append("[^").append(Path.SEPARATOR_CHAR).append("]*$");
+    return Pattern.compile(builder.toString());
+  }
+
+  private long[] summary(FileSystem fs, Path path, List<String> skewCols) throws IOException {
+    long numFiles = 0L;
+    long tableSize = 0L;
+    Pattern pattern = toPattern(skewCols);
+    for (FileStatus status : Utilities.getFileStatusRecurse(path, skewCols.size() + 1, fs)) {
+      if (status.isDir()) {
+        continue;
+      }
+      String relative = toRelativePath(path, status.getPath());
+      if (relative == null) {
+        continue;
+      }
+      if (relative.startsWith(ListBucketingPrunerUtils.HIVE_LIST_BUCKETING_DEFAULT_DIR_NAME) ||
+        pattern.matcher(relative).matches()) {
+        tableSize += status.getLen();
+        numFiles++;
+      }
+    }
+    return new long[] {numFiles, tableSize};
+  }
+
+  private String toRelativePath(Path path1, Path path2) {
+    URI relative = path1.toUri().relativize(path2.toUri());
+    if (relative == path2.toUri()) {
+      return null;
     }
-    return listBucketingDepth;
+    return relative.getPath();
   }
 
   private boolean existStats(Map<String, String> parameters) {
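
Editor's note: the new summary helpers in StatsTask replace the fixed-depth recursive listing — a missing path counts as zero files, a plain file counts as itself, and for a directory only its direct non-directory children are summed. A compact standalone version using the same FileSystem calls (getFileStatus, listStatus); the wrapper class name is illustrative:

import java.io.FileNotFoundException;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class PathSummary {
  // Returns {numFiles, totalBytes} for the given path.
  public static long[] summary(FileSystem fs, Path path) throws IOException {
    FileStatus status;
    try {
      status = fs.getFileStatus(path);
    } catch (FileNotFoundException e) {
      return new long[] {0L, 0L};              // nothing there yet
    }
    if (!status.isDir()) {
      return new long[] {1L, status.getLen()}; // a single data file
    }
    long numFiles = 0L;
    long totalSize = 0L;
    FileStatus[] children = fs.listStatus(path);
    if (children != null) {                    // older FileSystems may return null
      for (FileStatus child : children) {
        if (!child.isDir()) {
          numFiles++;
          totalSize += child.getLen();
        }
      }
    }
    return new long[] {numFiles, totalSize};
  }

  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();
    Path path = new Path(args.length > 0 ? args[0] : "/tmp");
    long[] s = summary(path.getFileSystem(conf), path);
    System.out.println(s[0] + " files, " + s[1] + " bytes");
  }
}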

Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java?rev=1522098&r1=1522097&r2=1522098&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java Thu Sep 12 01:21:10 2013
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hive.ql.exec;
 
+import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -241,6 +242,7 @@ public class TableScanOperator extends O
   // and 2) it will fail some join and union queries if this is added forcibly
   // into tableScanDesc
   java.util.ArrayList<Integer> neededColumnIDs;
+  List<String> neededColumns;
 
   public void setNeededColumnIDs(java.util.ArrayList<Integer> orign_columns) {
     neededColumnIDs = orign_columns;
@@ -250,6 +252,14 @@ public class TableScanOperator extends O
     return neededColumnIDs;
   }
 
+  public void setNeededColumns(List<String> columnNames) {
+    neededColumns = columnNames;
+  }
+
+  public List<String> getNeededColumns() {
+    return neededColumns;
+  }
+
   @Override
   public OperatorType getType() {
     return OperatorType.TABLESCAN;

Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/UDTFOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/UDTFOperator.java?rev=1522098&r1=1522097&r2=1522098&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/UDTFOperator.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/UDTFOperator.java Thu Sep 12 01:21:10 2013
@@ -32,7 +32,6 @@ import org.apache.hadoop.hive.ql.plan.ap
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
 import org.apache.hadoop.hive.ql.udf.generic.UDTFCollector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.StructField;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 
@@ -66,7 +65,7 @@ public class UDTFOperator extends Operat
 
     // Make an object inspector [] of the arguments to the UDTF
     List<? extends StructField> inputFields =
-        ((StandardStructObjectInspector) inputObjInspectors[0]).getAllStructFieldRefs();
+        ((StructObjectInspector) inputObjInspectors[0]).getAllStructFieldRefs();
 
     udtfInputOIs = new ObjectInspector[inputFields.size()];
     for (int i = 0; i < inputFields.size(); i++) {
@@ -104,7 +103,7 @@ public class UDTFOperator extends Operat
   @Override
   public void processOp(Object row, int tag) throws HiveException {
     // The UDTF expects arguments in an object[]
-    StandardStructObjectInspector soi = (StandardStructObjectInspector) inputObjInspectors[tag];
+    StructObjectInspector soi = (StructObjectInspector) inputObjInspectors[tag];
     List<? extends StructField> fields = soi.getAllStructFieldRefs();
 
     for (int i = 0; i < fields.size(); i++) {

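The relaxed cast in UDTFOperator above depends only on the StructObjectInspector interface, so any struct inspector implementation now satisfies it, not just the standard one. A small self-contained sketch of iterating struct fields through the interface, using factory classes from hive-serde; the field names and the demo class are illustrative only:

import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

public class StructOIDemo {
  public static void main(String[] args) {
    List<String> names = Arrays.asList("word", "count");
    List<ObjectInspector> fieldOIs = Arrays.<ObjectInspector>asList(
        PrimitiveObjectInspectorFactory.javaStringObjectInspector,
        PrimitiveObjectInspectorFactory.javaIntObjectInspector);

    // Declared as the interface, just like the casts in the operator above.
    StructObjectInspector soi =
        ObjectInspectorFactory.getStandardStructObjectInspector(names, fieldOIs);

    // Collect one inspector per input field, as UDTFOperator does for its arguments.
    ObjectInspector[] inputOIs = new ObjectInspector[soi.getAllStructFieldRefs().size()];
    int i = 0;
    for (StructField field : soi.getAllStructFieldRefs()) {
      inputOIs[i++] = field.getFieldObjectInspector();
    }
    System.out.println("collected " + inputOIs.length + " field inspectors");
  }
}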
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java?rev=1522098&r1=1522097&r2=1522098&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java Thu Sep 12 01:21:10 2013
@@ -102,6 +102,8 @@ import org.apache.hadoop.hive.ql.ErrorMs
 import org.apache.hadoop.hive.ql.QueryPlan;
 import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
 import org.apache.hadoop.hive.ql.exec.mr.ExecDriver;
+import org.apache.hadoop.hive.ql.exec.mr.ExecMapper;
+import org.apache.hadoop.hive.ql.exec.mr.ExecReducer;
 import org.apache.hadoop.hive.ql.exec.mr.MapRedTask;
 import org.apache.hadoop.hive.ql.io.ContentSummaryInputFormat;
 import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
@@ -112,6 +114,13 @@ import org.apache.hadoop.hive.ql.io.Hive
 import org.apache.hadoop.hive.ql.io.OneNullRowInputFormat;
 import org.apache.hadoop.hive.ql.io.RCFile;
 import org.apache.hadoop.hive.ql.io.ReworkMapredInputFormat;
+import org.apache.hadoop.hive.ql.io.rcfile.merge.MergeWork;
+import org.apache.hadoop.hive.ql.io.rcfile.merge.RCFileMergeMapper;
+import org.apache.hadoop.hive.ql.io.rcfile.stats.PartialScanMapper;
+import org.apache.hadoop.hive.ql.io.rcfile.stats.PartialScanWork;
+import org.apache.hadoop.hive.ql.io.rcfile.truncate.ColumnTruncateMapper;
+import org.apache.hadoop.hive.ql.io.rcfile.truncate.ColumnTruncateWork;
+import org.apache.hadoop.hive.ql.log.PerfLogger;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.Partition;
 import org.apache.hadoop.hive.ql.metadata.Table;
@@ -165,6 +174,10 @@ import org.apache.hadoop.mapred.Sequence
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.Shell;
 
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+
 /**
  * Utilities.
  *
@@ -252,7 +265,7 @@ public final class Utilities {
     return (ReduceWork) getBaseWork(conf, REDUCE_PLAN_NAME);
   }
 
-  public static BaseWork getBaseWork(Configuration conf, String name) {
+  private static BaseWork getBaseWork(Configuration conf, String name) {
     BaseWork gWork = null;
     Path path = null;
     try {
@@ -260,16 +273,32 @@ public final class Utilities {
       assert path != null;
       gWork = gWorkMap.get(path);
       if (gWork == null) {
-        String jtConf = ShimLoader.getHadoopShims().getJobLauncherRpcAddress(conf);
         Path localPath;
-        if (jtConf.equals("local")) {
+        if (ShimLoader.getHadoopShims().isLocalMode(conf)) {
           localPath = path;
         } else {
           localPath = new Path(name);
         }
         InputStream in = new FileInputStream(localPath.toUri().getPath());
-        BaseWork ret = deserializeObject(in);
-        gWork = ret;
+        if(MAP_PLAN_NAME.equals(name)){
+          if (ExecMapper.class.getName().equals(conf.get("mapred.mapper.class"))){
+            gWork = deserializePlan(in, MapWork.class, conf);
+          } else if(RCFileMergeMapper.class.getName().equals(conf.get("mapred.mapper.class"))) {
+            gWork = deserializePlan(in, MergeWork.class, conf);
+          } else if(ColumnTruncateMapper.class.getName().equals(conf.get("mapred.mapper.class"))) {
+            gWork = deserializePlan(in, ColumnTruncateWork.class, conf);
+          } else if(PartialScanMapper.class.getName().equals(conf.get("mapred.mapper.class"))) {
+            gWork = deserializePlan(in, PartialScanWork.class,conf);
+          } else {
+            assert false;
+          }
+        } else {
+          if(ExecReducer.class.getName().equals(conf.get("mapred.reducer.class"))) {
+            gWork = deserializePlan(in, ReduceWork.class, conf);
+          } else {
+            assert false;
+          }
+        }
         gWorkMap.put(path, gWork);
       }
       return gWork;
@@ -479,7 +508,7 @@ public final class Utilities {
       // use the default file system of the conf
       FileSystem fs = planPath.getFileSystem(conf);
       FSDataOutputStream out = fs.create(planPath);
-      serializeObject(w, out);
+      serializePlan(w, out, conf);
 
       // Serialize the plan to the default hdfs instance
       // Except for hadoop local mode execution where we should be
@@ -586,11 +615,82 @@ public final class Utilities {
     }
   }
 
+  /**
+   * Custom Kryo serializer for java.sql.Date; without it Kryo can confuse
+   * java.sql.Date with java.util.Date during deserialization.
+   */
+  private static class SqlDateSerializer extends
+    com.esotericsoftware.kryo.Serializer<java.sql.Date> {
+
+    @Override
+    public java.sql.Date read(Kryo kryo, Input input, Class<java.sql.Date> clazz) {
+      return new java.sql.Date(input.readLong());
+    }
+
+    @Override
+    public void write(Kryo kryo, Output output, java.sql.Date sqlDate) {
+      output.writeLong(sqlDate.getTime());
+    }
+
+  }
+
+  /**
+   * Serializes the plan.
+   * @param plan The plan, such as QueryPlan, MapredWork, etc.
+   * @param out The stream to write to.
+   * @param conf Used to pick which serialization format is desired.
+   */
+  public static void serializePlan(Object plan, OutputStream out, Configuration conf) {
+    PerfLogger perfLogger = PerfLogger.getPerfLogger();
+    perfLogger.PerfLogBegin(LOG, PerfLogger.SERIALIZE_PLAN);
+    if(conf.get(HiveConf.ConfVars.PLAN_SERIALIZATION.varname, "kryo").equals("javaXML")) {
+      serializeObjectByJavaXML(plan, out);
+    } else {
+      serializeObjectByKryo(plan, out);
+    }
+    perfLogger.PerfLogEnd(LOG, PerfLogger.SERIALIZE_PLAN);
+  }
+
+  /**
+   * Deserializes the plan.
+   * @param in The stream to read from.
+   * @param planClass The class of the plan being deserialized.
+   * @param conf Used to determine which serialization format the plan is in.
+   * @return The plan, such as QueryPlan, MapredWork, etc.
+   */
+  public static <T> T deserializePlan(InputStream in, Class<T> planClass, Configuration conf) {
+    PerfLogger perfLogger = PerfLogger.getPerfLogger();
+    perfLogger.PerfLogBegin(LOG, PerfLogger.DESERIALIZE_PLAN);
+    T plan;
+    if(conf.get(HiveConf.ConfVars.PLAN_SERIALIZATION.varname, "kryo").equals("javaXML")) {
+      plan = deserializeObjectByJavaXML(in);
+    } else {
+      plan = deserializeObjectByKryo(in, planClass);
+    }
+    perfLogger.PerfLogEnd(LOG, PerfLogger.DESERIALIZE_PLAN);
+    return plan;
+  }
+
+  /**
+   * Clones using the powers of XML. Do not use unless necessary.
+   * @param plan The plan.
+   * @return The clone.
+   */
+  public static MapredWork clonePlan(MapredWork plan) {
+    // TODO: need proper clone. Meanwhile, let's at least keep this horror in one place
+    PerfLogger perfLogger = PerfLogger.getPerfLogger();
+    perfLogger.PerfLogBegin(LOG, PerfLogger.CLONE_PLAN);
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    Utilities.serializeObjectByJavaXML(plan, baos);
+    MapredWork newPlan = Utilities.deserializeObjectByJavaXML(
+      new ByteArrayInputStream(baos.toByteArray()));
+    perfLogger.PerfLogEnd(LOG, PerfLogger.CLONE_PLAN);
+    return newPlan;
+  }
+
   /**
    * Serialize the object. This helper function mainly makes sure that enums,
    * counters, etc are handled properly.
    */
-  public static void serializeObject(Object plan, OutputStream out) {
+  private static void serializeObjectByJavaXML(Object plan, OutputStream out) {
     XMLEncoder e = new XMLEncoder(out);
     e.setExceptionListener(new ExceptionListener() {
       public void exceptionThrown(Exception e) {
@@ -613,11 +713,21 @@ public final class Utilities {
   }
 
   /**
+   * @param plan Usually of type MapredWork, MapredLocalWork etc.
+   * @param out The stream to which the serialized plan is written.
+   */
+  private static void serializeObjectByKryo(Object plan, OutputStream out) {
+    Output output = new Output(out);
+    kryo.get().writeObject(output, plan);
+    output.close();
+  }
+
+  /**
    * De-serialize an object. This helper function mainly makes sure that enums,
    * counters, etc are handled properly.
    */
   @SuppressWarnings("unchecked")
-  public static <T> T deserializeObject(InputStream in) {
+  private static <T> T deserializeObjectByJavaXML(InputStream in) {
     XMLDecoder d = null;
     try {
       d = new XMLDecoder(in, null, null);
@@ -629,6 +739,27 @@ public final class Utilities {
     }
   }
 
+  private static <T> T deserializeObjectByKryo(InputStream in, Class<T> clazz ) {
+    Input inp = new Input(in);
+    T t = kryo.get().readObject(inp,clazz);
+    inp.close();
+    return t;
+  }
+
+  // Kryo is not thread-safe, and constructing a new Kryo instance is expensive,
+  // so we keep exactly one instance per thread.
+  private static ThreadLocal<Kryo> kryo = new ThreadLocal<Kryo>() {
+
+    @Override
+    protected synchronized Kryo initialValue() {
+      Kryo kryo = new Kryo();
+      kryo.setClassLoader(Thread.currentThread().getContextClassLoader());
+      kryo.register(java.sql.Date.class, new SqlDateSerializer());
+      return kryo;
+    };
+  };
+
+
   public static TableDesc defaultTd;
   static {
     // by default we expect ^A separated strings
@@ -1802,6 +1933,8 @@ public final class Utilities {
    */
   public static ContentSummary getInputSummary(Context ctx, MapWork work, PathFilter filter)
       throws IOException {
+    PerfLogger perfLogger = PerfLogger.getPerfLogger();
+    perfLogger.PerfLogBegin(LOG, PerfLogger.INPUT_SUMMARY);
 
     long[] summary = {0, 0, 0};
 
@@ -1937,6 +2070,7 @@ public final class Utilities {
               + cs.getFileCount() + " directory count: " + cs.getDirectoryCount());
         }
 
+        perfLogger.PerfLogEnd(LOG, PerfLogger.INPUT_SUMMARY);
         return new ContentSummary(summary[0], summary[1], summary[2]);
       } finally {
         HiveInterruptUtils.remove(interrup);
@@ -2090,7 +2224,7 @@ public final class Utilities {
       if (columnTypes.length() > 0) {
         columnTypes.append(",");
       }
-      columnTypes.append(colInfo.getType().getTypeName());
+      columnTypes.append(colInfo.getTypeName());
     }
     String columnTypesString = columnTypes.toString();
     jobConf.set(serdeConstants.LIST_COLUMN_TYPES, columnTypesString);

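To make the serialization switch in Utilities easier to follow, here is a self-contained sketch of the same Kryo pattern the change introduces: a per-thread Kryo instance registered with a custom java.sql.Date serializer, and a write/read round trip through Output and Input. It mirrors the code added above but is not the Hive API itself; the DemoPlan class is invented purely for illustration.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;

public class KryoPlanDemo {

  // Same idea as SqlDateSerializer above: keep java.sql.Date distinct from java.util.Date.
  static class SqlDateSerializer extends Serializer<java.sql.Date> {
    @Override
    public java.sql.Date read(Kryo kryo, Input input, Class<java.sql.Date> clazz) {
      return new java.sql.Date(input.readLong());
    }
    @Override
    public void write(Kryo kryo, Output output, java.sql.Date date) {
      output.writeLong(date.getTime());
    }
  }

  // Kryo is not thread-safe and is costly to construct, hence one instance per thread.
  private static final ThreadLocal<Kryo> KRYO = new ThreadLocal<Kryo>() {
    @Override
    protected Kryo initialValue() {
      Kryo kryo = new Kryo();
      kryo.register(java.sql.Date.class, new SqlDateSerializer());
      return kryo;
    }
  };

  // A stand-in for a plan object; the real code serializes MapWork, ReduceWork, etc.
  public static class DemoPlan {
    public String name = "stage-1";
    public java.sql.Date asOf = new java.sql.Date(0L);
  }

  public static void main(String[] args) {
    // Serialize: the Kryo equivalent of serializePlan().
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    Output output = new Output(bytes);
    KRYO.get().writeObject(output, new DemoPlan());
    output.close();

    // Deserialize: the Kryo equivalent of deserializePlan(), typed by the plan class.
    Input input = new Input(new ByteArrayInputStream(bytes.toByteArray()));
    DemoPlan plan = KRYO.get().readObject(input, DemoPlan.class);
    input.close();
    System.out.println(plan.name + " / " + plan.asOf);
  }
}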
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java?rev=1522098&r1=1522097&r2=1522098&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java Thu Sep 12 01:21:10 2013
@@ -717,12 +717,12 @@ public class ExecDriver extends Task<Map
     int ret;
     if (localtask) {
       memoryMXBean = ManagementFactory.getMemoryMXBean();
-      MapredLocalWork plan = (MapredLocalWork) Utilities.deserializeObject(pathData);
+      MapredLocalWork plan = Utilities.deserializePlan(pathData, MapredLocalWork.class, conf);
       MapredLocalTask ed = new MapredLocalTask(plan, conf, isSilent);
       ret = ed.executeFromChildJVM(new DriverContext());
 
     } else {
-      MapredWork plan = (MapredWork) Utilities.deserializeObject(pathData);
+      MapredWork plan = Utilities.deserializePlan(pathData, MapredWork.class, conf);
       ExecDriver ed = new ExecDriver(plan, conf, isSilent);
       ret = ed.execute(new DriverContext());
     }

Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java?rev=1522098&r1=1522097&r2=1522098&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java Thu Sep 12 01:21:10 2013
@@ -35,10 +35,10 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.JavaUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.MapRedStats;
+import org.apache.hadoop.hive.ql.exec.Operator.ProgressCounter;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.TaskHandle;
 import org.apache.hadoop.hive.ql.exec.Utilities;
-import org.apache.hadoop.hive.ql.exec.Operator.ProgressCounter;
 import org.apache.hadoop.hive.ql.history.HiveHistory.Keys;
 import org.apache.hadoop.hive.ql.plan.ReducerTimeStatsPerJob;
 import org.apache.hadoop.hive.ql.session.SessionState;

Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java?rev=1522098&r1=1522097&r2=1522098&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java Thu Sep 12 01:21:10 2013
@@ -39,14 +39,11 @@ import org.apache.hadoop.hive.ql.DriverC
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.Utilities.StreamPrinter;
-import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.MapredWork;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.ReduceWork;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.shims.ShimLoader;
-import org.apache.hadoop.mapred.JobConf;
 /**
  * Extension of ExecDriver:
  * - can optionally spawn a map-reduce task from a separate jvm
@@ -69,7 +66,6 @@ public class MapRedTask extends ExecDriv
   private transient ContentSummary inputSummary = null;
   private transient boolean runningViaChild = false;
 
-  private transient boolean inputSizeEstimated = false;
   private transient long totalInputFileSize;
   private transient long totalInputNumFiles;
 
@@ -79,10 +75,6 @@ public class MapRedTask extends ExecDriv
     super();
   }
 
-  public MapRedTask(MapredWork plan, JobConf job, boolean isSilent) throws HiveException {
-    throw new RuntimeException("Illegal Constructor call");
-  }
-
   @Override
   public int execute(DriverContext driverContext) {
 
@@ -181,7 +173,7 @@ public class MapRedTask extends ExecDriv
       OutputStream out = FileSystem.getLocal(conf).create(planPath);
       MapredWork plan = getWork();
       LOG.info("Generating plan file " + planPath.toString());
-      Utilities.serializeObject(plan, out);
+      Utilities.serializePlan(plan, out, conf);
 
       String isSilent = "true".equalsIgnoreCase(System
           .getProperty("test.silent")) ? "-nolog" : "";
@@ -408,7 +400,7 @@ public class MapRedTask extends ExecDriv
         if (inputSummary == null) {
           inputSummary =  Utilities.getInputSummary(driverContext.getCtx(), work.getMapWork(), null);
         }
-        int reducers = Utilities.estimateNumberOfReducers(conf, inputSummary, work.getMapWork(), 
+        int reducers = Utilities.estimateNumberOfReducers(conf, inputSummary, work.getMapWork(),
                                                           work.isFinalMapRed());
         rWork.setNumReduceTasks(reducers);
         console

Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java?rev=1522098&r1=1522097&r2=1522098&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java Thu Sep 12 01:21:10 2013
@@ -19,6 +19,7 @@ package org.apache.hadoop.hive.ql.exec.m
 
 import java.io.File;
 import java.io.IOException;
+import java.io.ObjectOutputStream;
 import java.io.OutputStream;
 import java.io.Serializable;
 import java.lang.management.ManagementFactory;
@@ -52,9 +53,8 @@ import org.apache.hadoop.hive.ql.exec.Ta
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.Utilities.StreamPrinter;
-import org.apache.hadoop.hive.ql.exec.persistence.AbstractMapJoinKey;
-import org.apache.hadoop.hive.ql.exec.persistence.HashMapWrapper;
-import org.apache.hadoop.hive.ql.exec.persistence.MapJoinObjectValue;
+import org.apache.hadoop.hive.ql.exec.mapjoin.MapJoinMemoryExhaustionException;
+import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainerSerDe;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.BucketMapJoinContext;
 import org.apache.hadoop.hive.ql.plan.FetchWork;
@@ -134,14 +134,13 @@ public class MapredLocalTask extends Tas
       String hiveJar = conf.getJar();
 
       String hadoopExec = conf.getVar(HiveConf.ConfVars.HADOOPBIN);
-      String libJarsOption;
 
       // write out the plan to a local file
       Path planPath = new Path(ctx.getLocalTmpFileURI(), "plan.xml");
       OutputStream out = FileSystem.getLocal(conf).create(planPath);
       MapredLocalWork plan = getWork();
       LOG.info("Generating plan file " + planPath.toString());
-      Utilities.serializeObject(plan, out);
+      Utilities.serializePlan(plan, out, conf);
 
       String isSilent = "true".equalsIgnoreCase(System.getProperty("test.silent")) ? "-nolog" : "";
 
@@ -319,14 +318,13 @@ public class MapredLocalTask extends Tas
       long elapsed = currentTime - startTime;
       console.printInfo(Utilities.now() + "\tEnd of local task; Time Taken: "
           + Utilities.showTime(elapsed) + " sec.");
-    } catch (Throwable e) {
-      if (e instanceof OutOfMemoryError
-          || (e instanceof HiveException && e.getMessage().equals("RunOutOfMeomoryUsage"))) {
-        // Don't create a new object if we are already out of memory
+    } catch (Throwable throwable) {
+      if (throwable instanceof OutOfMemoryError
+          || (throwable instanceof MapJoinMemoryExhaustionException)) {
+        l4j.error("Hive Runtime Error: Map local work exhausted memory", throwable);
         return 3;
       } else {
-        l4j.error("Hive Runtime Error: Map local work failed");
-        e.printStackTrace();
+        l4j.error("Hive Runtime Error: Map local work failed", throwable);
         return 2;
       }
     }
@@ -336,7 +334,6 @@ public class MapredLocalTask extends Tas
   private void startForward(boolean inputFileChangeSenstive, String bigTableBucket)
       throws Exception {
     for (Map.Entry<String, FetchOperator> entry : fetchOperators.entrySet()) {
-      int fetchOpRows = 0;
       String alias = entry.getKey();
       FetchOperator fetchOp = entry.getValue();
 
@@ -364,7 +361,6 @@ public class MapredLocalTask extends Tas
           forwardOp.close(false);
           break;
         }
-        fetchOpRows++;
         forwardOp.process(row.o, 0);
         // check if any operator had a fatal error or early exit during
         // execution
@@ -425,7 +421,8 @@ public class MapredLocalTask extends Tas
     }
   }
 
-  private void generateDummyHashTable(String alias, String bigBucketFileName) throws HiveException,IOException {
+  private void generateDummyHashTable(String alias, String bigBucketFileName)
+      throws HiveException,IOException {
     // find the (byte)tag for the map join(HashTableSinkOperator)
     Operator<? extends OperatorDesc> parentOp = work.getAliasToWork().get(alias);
     Operator<? extends OperatorDesc> childOp = parentOp.getChildOperators().get(0);
@@ -442,8 +439,6 @@ public class MapredLocalTask extends Tas
 
     // generate empty hashtable for this (byte)tag
     String tmpURI = this.getWork().getTmpFileURI();
-    HashMapWrapper<AbstractMapJoinKey, MapJoinObjectValue> hashTable =
-      new HashMapWrapper<AbstractMapJoinKey, MapJoinObjectValue>();
 
     String fileName = work.getBucketFileName(bigBucketFileName);
 
@@ -453,12 +448,14 @@ public class MapredLocalTask extends Tas
     console.printInfo(Utilities.now() + "\tDump the hashtable into file: " + tmpURIPath);
     Path path = new Path(tmpURIPath);
     FileSystem fs = path.getFileSystem(job);
-    File file = new File(path.toUri().getPath());
-    fs.create(path);
-    long fileLength = hashTable.flushMemoryCacheToPersistent(file);
+    ObjectOutputStream out = new ObjectOutputStream(fs.create(path));
+    try {
+      MapJoinTableContainerSerDe.persistDummyTable(out);
+    } finally {
+      out.close();
+    }
     console.printInfo(Utilities.now() + "\tUpload 1 File to: " + tmpURIPath + " File size: "
-        + fileLength);
-    hashTable.close();
+        + fs.getFileStatus(path).getLen());
   }
 
   private void setUpFetchOpContext(FetchOperator fetchOp, String alias, String currentInputFile)

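The dummy-hashtable path in MapredLocalTask now streams straight to the destination filesystem instead of bouncing through a local java.io.File. The general shape is sketched below against the Hadoop FileSystem API; the path and payload are placeholders, and persistDummyTable itself remains the Hive helper shown in the diff.

import java.io.IOException;
import java.io.ObjectOutputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class StreamToFsDemo {
  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();
    Path path = new Path("file:///tmp/dummy-hashtable.bin"); // placeholder location
    FileSystem fs = path.getFileSystem(conf);

    // Write through the FileSystem-provided stream; no intermediate local File needed.
    ObjectOutputStream out = new ObjectOutputStream(fs.create(path));
    try {
      out.writeObject(new java.util.HashMap<String, String>()); // stand-in for the dummy table
    } finally {
      out.close();
    }
    System.out.println("wrote " + fs.getFileStatus(path).getLen() + " bytes to " + path);
  }
}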
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/AbstractRowContainer.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/AbstractRowContainer.java?rev=1522098&r1=1522097&r2=1522098&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/AbstractRowContainer.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/AbstractRowContainer.java Thu Sep 12 01:21:10 2013
@@ -20,17 +20,17 @@ package org.apache.hadoop.hive.ql.exec.p
 
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 
-public abstract class AbstractRowContainer<Row> {
+public abstract class AbstractRowContainer<ROW> {
 
   public AbstractRowContainer() {
 
   }
 
-  public abstract void add(Row t) throws HiveException;
+  public abstract void add(ROW t) throws HiveException;
 
-  public abstract Row first() throws HiveException;
+  public abstract ROW first() throws HiveException;
 
-  public abstract Row next() throws HiveException;
+  public abstract ROW next() throws HiveException;
 
   /**
    * Get the number of elements in the RowContainer.

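The AbstractRowContainer change above only renames the type parameter (Row becomes ROW), but the add/first/next contract it exposes is the part worth keeping in mind: callers typically start with first() and call next() until null. A minimal in-memory illustration of that cursor style, written as a standalone class rather than a real subclass so it stays self-contained; the class and method names are invented.

import java.util.ArrayList;
import java.util.List;

// Illustrative only: shows the first()/next() cursor pattern used by row containers.
public class SimpleRowContainer<ROW> {
  private final List<ROW> rows = new ArrayList<ROW>();
  private int cursor;

  public void add(ROW row) {
    rows.add(row);
  }

  // Reset the cursor and return the first row, or null if the container is empty.
  public ROW first() {
    cursor = 0;
    return next();
  }

  // Return the next row, or null once all rows have been handed out.
  public ROW next() {
    return cursor < rows.size() ? rows.get(cursor++) : null;
  }

  public int rowCount() {
    return rows.size();
  }

  public static void main(String[] args) {
    SimpleRowContainer<String> container = new SimpleRowContainer<String>();
    container.add("a");
    container.add("b");
    for (String row = container.first(); row != null; row = container.next()) {
      System.out.println(row);
    }
  }
}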
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HashMapWrapper.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HashMapWrapper.java?rev=1522098&r1=1522097&r2=1522098&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HashMapWrapper.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HashMapWrapper.java Thu Sep 12 01:21:10 2013
@@ -18,26 +18,14 @@
 
 package org.apache.hadoop.hive.ql.exec.persistence;
 
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.BufferedInputStream;
-import java.io.BufferedOutputStream;
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
 import java.io.Serializable;
-import java.lang.management.ManagementFactory;
-import java.lang.management.MemoryMXBean;
-import java.text.NumberFormat;
 import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Set;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hive.ql.exec.Utilities;
-import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
 
 
 /**
@@ -47,26 +35,17 @@ import org.apache.hadoop.hive.ql.session
  * hash table.
  */
 
-public class HashMapWrapper<K, V> implements Serializable {
+public class HashMapWrapper extends AbstractMapJoinTableContainer implements Serializable {
 
   private static final long serialVersionUID = 1L;
-  protected Log LOG = LogFactory.getLog(this.getClass().getName());
+  protected static final Log LOG = LogFactory.getLog(HashMapWrapper.class);
 
   // default threshold for using main memory based HashMap
-
+  private static final String THESHOLD_NAME = "threshold";
+  private static final String LOAD_NAME = "load";
   private static final int THRESHOLD = 1000000;
   private static final float LOADFACTOR = 0.75f;
-  private static final float MEMORYUSAGE = 1;
-
-  private float maxMemoryUsage;
-  private HashMap<K, V> mHash; // main memory HashMap
-  protected transient LogHelper console;
-
-  private File dumpFile;
-  public static MemoryMXBean memoryMXBean;
-  private long maxMemory;
-  private long currentMemory;
-  private NumberFormat num;
+  private HashMap<MapJoinKey, MapJoinRowContainer> mHash; // main memory HashMap
 
   /**
    * Constructor.
@@ -74,163 +53,53 @@ public class HashMapWrapper<K, V> implem
    * @param threshold
    *          User specified threshold to store new values into persistent storage.
    */
-  public HashMapWrapper(int threshold, float loadFactor, float memoryUsage) {
-    maxMemoryUsage = memoryUsage;
-    mHash = new HashMap<K, V>(threshold, loadFactor);
-    memoryMXBean = ManagementFactory.getMemoryMXBean();
-    maxMemory = memoryMXBean.getHeapMemoryUsage().getMax();
-    LOG.info("maximum memory: " + maxMemory);
-    num = NumberFormat.getInstance();
-    num.setMinimumFractionDigits(2);
+  public HashMapWrapper(int threshold, float loadFactor) {
+    super(createConstructorMetaData(threshold, loadFactor));
+    mHash = new HashMap<MapJoinKey, MapJoinRowContainer>(threshold, loadFactor);
+
+  }
+  
+  public HashMapWrapper(Map<String, String> metaData) {
+    super(metaData);
+    int threshold = Integer.parseInt(metaData.get(THESHOLD_NAME));
+    float loadFactor = Float.parseFloat(metaData.get(LOAD_NAME));
+    mHash = new HashMap<MapJoinKey, MapJoinRowContainer>(threshold, loadFactor);
   }
 
   public HashMapWrapper(int threshold) {
-    this(threshold, LOADFACTOR, MEMORYUSAGE);
+    this(threshold, LOADFACTOR);
   }
 
   public HashMapWrapper() {
-    this(THRESHOLD, LOADFACTOR, MEMORYUSAGE);
+    this(THRESHOLD, LOADFACTOR);
   }
 
-  public V get(K key) {
+  @Override
+  public MapJoinRowContainer get(MapJoinKey key) {
     return mHash.get(key);
   }
 
-  public boolean put(K key, V value) throws HiveException {
-    // isAbort();
+  @Override
+  public void put(MapJoinKey key, MapJoinRowContainer value) {
     mHash.put(key, value);
-    return false;
-  }
-
-
-  public void remove(K key) {
-    mHash.remove(key);
-  }
-
-  /**
-   * Flush the main memory hash table into the persistent cache file
-   *
-   * @return persistent cache file
-   */
-  public long flushMemoryCacheToPersistent(File file) throws IOException {
-    ObjectOutputStream outputStream = null;
-    outputStream = new ObjectOutputStream(new BufferedOutputStream(new FileOutputStream(file), 4096));
-    outputStream.writeObject(mHash);
-    outputStream.flush();
-    outputStream.close();
-
-    return file.length();
-  }
-
-  public void initilizePersistentHash(String fileName) throws IOException, ClassNotFoundException {
-    ObjectInputStream inputStream = null;
-    inputStream = new ObjectInputStream(new BufferedInputStream(new FileInputStream(fileName), 4096));
-    HashMap<K, V> hashtable = (HashMap<K, V>) inputStream.readObject();
-    this.setMHash(hashtable);
-
-    inputStream.close();
   }
 
+  @Override
   public int size() {
     return mHash.size();
   }
-
-  public Set<K> keySet() {
-    return mHash.keySet();
-  }
-
-
-  /**
-   * Close the persistent hash table and clean it up.
-   *
-   * @throws HiveException
-   */
-  public void close() throws HiveException {
-    mHash.clear();
+  @Override
+  public Set<Entry<MapJoinKey, MapJoinRowContainer>> entrySet() {
+    return mHash.entrySet();
   }
-
-  public void clear() throws HiveException {
+  @Override
+  public void clear() {
     mHash.clear();
   }
-
-  public int getKeySize() {
-    return mHash.size();
-  }
-
-  public boolean isAbort(long numRows,LogHelper console) {
-    int size = mHash.size();
-    long usedMemory = memoryMXBean.getHeapMemoryUsage().getUsed();
-    double rate = (double) usedMemory / (double) maxMemory;
-    console.printInfo(Utilities.now() + "\tProcessing rows:\t" + numRows + "\tHashtable size:\t"
-        + size + "\tMemory usage:\t" + usedMemory + "\trate:\t" + num.format(rate));
-    if (rate > (double) maxMemoryUsage) {
-      return true;
-    }
-    return false;
-  }
-
-  public void setLOG(Log log) {
-    LOG = log;
-  }
-
-  public HashMap<K, V> getMHash() {
-    return mHash;
+  private static Map<String, String> createConstructorMetaData(int threshold, float loadFactor) {
+    Map<String, String> metaData = new HashMap<String, String>();
+    metaData.put(THESHOLD_NAME, String.valueOf(threshold));
+    metaData.put(LOAD_NAME, String.valueOf(loadFactor));
+    return metaData;
   }
-
-  public void setMHash(HashMap<K, V> hash) {
-    mHash = hash;
-  }
-
-  public LogHelper getConsole() {
-    return console;
-  }
-
-  public void setConsole(LogHelper console) {
-    this.console = console;
-  }
-
-  public File getDumpFile() {
-    return dumpFile;
-  }
-
-  public void setDumpFile(File dumpFile) {
-    this.dumpFile = dumpFile;
-  }
-
-  public static MemoryMXBean getMemoryMXBean() {
-    return memoryMXBean;
-  }
-
-  public static void setMemoryMXBean(MemoryMXBean memoryMXBean) {
-    HashMapWrapper.memoryMXBean = memoryMXBean;
-  }
-
-  public long getMaxMemory() {
-    return maxMemory;
-  }
-
-  public void setMaxMemory(long maxMemory) {
-    this.maxMemory = maxMemory;
-  }
-
-  public long getCurrentMemory() {
-    return currentMemory;
-  }
-
-  public void setCurrentMemory(long currentMemory) {
-    this.currentMemory = currentMemory;
-  }
-
-  public NumberFormat getNum() {
-    return num;
-  }
-
-  public void setNum(NumberFormat num) {
-    this.num = num;
-  }
-
-  public static int getTHRESHOLD() {
-    return THRESHOLD;
-  }
-
 }
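
One design point in the rewritten HashMapWrapper is that its construction parameters (threshold and load factor) travel as string metadata, so an equivalent container can be rebuilt later from a persisted Map<String, String>. The following standalone sketch shows that pattern in isolation, independent of the Hive map-join classes; every name in it is illustrative.

import java.util.HashMap;
import java.util.Map;

// Illustrates the "constructor metadata" idea: numeric settings round-trip as strings.
public class MetaDataBackedMap {
  private static final String THRESHOLD_NAME = "threshold";
  private static final String LOAD_NAME = "load";

  private final Map<String, String> metaData;
  private final HashMap<String, String> contents;

  public MetaDataBackedMap(int threshold, float loadFactor) {
    this(createConstructorMetaData(threshold, loadFactor));
  }

  // Rebuild an identically sized map from metadata persisted alongside the data.
  public MetaDataBackedMap(Map<String, String> metaData) {
    this.metaData = metaData;
    int threshold = Integer.parseInt(metaData.get(THRESHOLD_NAME));
    float loadFactor = Float.parseFloat(metaData.get(LOAD_NAME));
    this.contents = new HashMap<String, String>(threshold, loadFactor);
  }

  public Map<String, String> getMetaData() {
    return metaData;
  }

  private static Map<String, String> createConstructorMetaData(int threshold, float loadFactor) {
    Map<String, String> metaData = new HashMap<String, String>();
    metaData.put(THRESHOLD_NAME, String.valueOf(threshold));
    metaData.put(LOAD_NAME, String.valueOf(loadFactor));
    return metaData;
  }

  public static void main(String[] args) {
    MetaDataBackedMap original = new MetaDataBackedMap(1000000, 0.75f);
    // Persist original.getMetaData() next to the serialized entries, then later:
    MetaDataBackedMap restored = new MetaDataBackedMap(original.getMetaData());
    System.out.println("restored with metadata: " + restored.getMetaData());
  }
}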