Posted to common-commits@hadoop.apache.org by dh...@apache.org on 2008/11/11 02:50:18 UTC

svn commit: r712905 [5/38] - in /hadoop/core/trunk: ./ src/contrib/hive/ src/contrib/hive/cli/src/java/org/apache/hadoop/hive/cli/ src/contrib/hive/common/src/java/org/apache/hadoop/hive/conf/ src/contrib/hive/conf/ src/contrib/hive/data/files/ src/con...

Modified: hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java?rev=712905&r1=712904&r2=712905&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java (original)
+++ hadoop/core/trunk/src/contrib/hive/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java Mon Nov 10 17:50:06 2008
@@ -24,6 +24,7 @@
 import java.lang.reflect.Method;
 
 import org.antlr.runtime.tree.*;
+import org.apache.hadoop.hive.serde2.Deserializer;
 import org.apache.hadoop.hive.serde2.MetadataTypedColumnsetSerDe;
 import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
@@ -33,7 +34,9 @@
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.ql.Context;
+import org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat;
 import org.apache.hadoop.hive.ql.metadata.*;
+import org.apache.hadoop.hive.ql.optimizer.Optimizer;
 import org.apache.hadoop.hive.ql.plan.*;
 import org.apache.hadoop.hive.ql.typeinfo.TypeInfo;
 import org.apache.hadoop.hive.ql.typeinfo.TypeInfoFactory;
@@ -41,6 +44,7 @@
 import org.apache.hadoop.hive.ql.udf.UDFOPPositive;
 import org.apache.hadoop.hive.ql.exec.*;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.mapred.TextInputFormat;
 
 import org.apache.hadoop.fs.Path;
@@ -54,8 +58,12 @@
   private HashMap<String, PartitionPruner> aliasToPruner;
   private HashMap<String, SamplePruner> aliasToSamplePruner;
   private HashMap<String, Operator<? extends Serializable>> topOps;
+  private HashMap<String, Operator<? extends Serializable>> topSelOps;
+  private HashMap<Operator<? extends Serializable>, OpParseContext> opParseCtx;
   private List<loadTableDesc> loadTableWork;
   private List<loadFileDesc> loadFileWork;
+  private QB qb;
+  private CommonTree ast;
 
   private static class Phase1Ctx {
     String dest;
@@ -69,21 +77,42 @@
     this.aliasToPruner = new HashMap<String, PartitionPruner>();
     this.aliasToSamplePruner = new HashMap<String, SamplePruner>();
     this.topOps = new HashMap<String, Operator<? extends Serializable>>();
+    this.topSelOps = new HashMap<String, Operator<? extends Serializable>>();
     this.loadTableWork = new ArrayList<loadTableDesc>();
     this.loadFileWork = new ArrayList<loadFileDesc>();
+    opParseCtx = new HashMap<Operator<? extends Serializable>, OpParseContext>();
   }
 
   @Override
   protected void reset() {
     super.reset();
     this.aliasToPruner.clear();
-    this.topOps.clear();
     this.loadTableWork.clear();
     this.loadFileWork.clear();
+    this.topOps.clear();
+    this.topSelOps.clear();
+    qb = null;
+    ast = null;
+  }
+
+  public void init(ParseContext pctx) {
+    aliasToPruner = pctx.getAliasToPruner();
+    aliasToSamplePruner = pctx.getAliasToSamplePruner();
+    topOps = pctx.getTopOps();
+    topSelOps = pctx.getTopSelOps();
+    opParseCtx = pctx.getOpParseCtx();
+    loadTableWork = pctx.getLoadTableWork();
+    loadFileWork = pctx.getLoadFileWork();
+    ctx = pctx.getContext();
+  }
+
+  public ParseContext getParseContext() {
+    return new ParseContext(conf, qb, ast, aliasToPruner, aliasToSamplePruner, topOps, 
+                            topSelOps, opParseCtx, loadTableWork, loadFileWork, ctx);
   }
-
+  
   @SuppressWarnings("nls")
-  private void doPhase1QBExpr(CommonTree ast, QBExpr qbexpr, String id,
+  public void doPhase1QBExpr(CommonTree ast, QBExpr qbexpr, String id,
       String alias) throws SemanticException {
 
     assert (ast.getToken() != null);
@@ -142,7 +171,7 @@
         || expressionTree.getToken().getType() == HiveParser.TOK_FUNCTIONDI) {
       assert (expressionTree.getChildCount() != 0);
       assert (expressionTree.getChild(0).getType() == HiveParser.Identifier);
-      String functionName = expressionTree.getChild(0).getText();
+      String functionName = unescapeIdentifier(expressionTree.getChild(0).getText());
       if (FunctionRegistry.getUDAF(functionName) != null) {
         aggregations.put(expressionTree.toStringTree(), expressionTree);
         return;
@@ -195,17 +224,17 @@
       tableSamplePresent = true;
     }
     CommonTree tableTree = (CommonTree)(tabref.getChild(0));
-    String alias = tabref.getChild(aliasIndex).getText();
+    String alias = unescapeIdentifier(tabref.getChild(aliasIndex).getText());
     // If the alias is already there then we have a conflict
     if (qb.exists(alias)) {
       throw new SemanticException(ErrorMsg.AMBIGOUS_TABLE_ALIAS.getMsg(tabref.getChild(aliasIndex)));
     }
     if (tableSamplePresent) {
       CommonTree sampleClause = (CommonTree)tabref.getChild(1);
-      ArrayList<String> sampleCols = new ArrayList<String>();
+      ArrayList<CommonTree> sampleCols = new ArrayList<CommonTree>();
       if (sampleClause.getChildCount() > 2) {
         for (int i = 2; i < sampleClause.getChildCount(); i++) {
-          sampleCols.add(sampleClause.getChild(i).getText());
+          sampleCols.add((CommonTree)sampleClause.getChild(i));
         }
       }
       // TODO: For now only support sampling on up to two columns
@@ -214,13 +243,13 @@
         throw new SemanticException(ErrorMsg.SAMPLE_RESTRICTION.getMsg(tabref.getChild(0)));
       }
       qb.getParseInfo().setTabSample(alias, new TableSample(
-        sampleClause.getChild(0).getText(), 
-        sampleClause.getChild(1).getText(),
-        sampleCols)
+          unescapeIdentifier(sampleClause.getChild(0).getText()), 
+          unescapeIdentifier(sampleClause.getChild(1).getText()),
+          sampleCols)
       );
     }
     // Insert this map into the stats
-    String table_name = tabref.getChild(0).getText();
+    String table_name = unescapeIdentifier(tabref.getChild(0).getText());
     qb.setTabAlias(alias, table_name);
 
     qb.getParseInfo().setSrcForAlias(alias, tableTree);
@@ -233,7 +262,7 @@
       throw new SemanticException(ErrorMsg.NO_SUBQUERY_ALIAS.getMsg(subq));
     }
     CommonTree subqref = (CommonTree) subq.getChild(0);
-    String alias = subq.getChild(1).getText();
+    String alias = unescapeIdentifier(subq.getChild(1).getText());
 
     // Recursively do the first phase of semantic analysis for the subquery
     QBExpr qbexpr = new QBExpr(alias);
@@ -277,7 +306,7 @@
   }
 
   @SuppressWarnings({"fallthrough", "nls"})
-  private void doPhase1(CommonTree ast, QB qb, Phase1Ctx ctx_1)
+  public void doPhase1(CommonTree ast, QB qb, Phase1Ctx ctx_1)
       throws SemanticException {
 
     QBParseInfo qbp = qb.getParseInfo();
@@ -379,8 +408,20 @@
     }
   }
 
+  /**
+   * Generate partition pruners. The filters can occur in the where clause and in the JOIN conditions. First, walk over the
+   * filters in the join conditions and AND them, since all of them are needed. Then, for each where clause, traverse the
+   * filter.
+   * Note that we currently do not propagate filters through subqueries. For example, if the query is of the form:
+   * select ... FROM t1 JOIN (select ... t2) x where x.partition
+   * we will not recognize that the x.partition condition introduces a partition pruner on t2.
+   */
   @SuppressWarnings("nls")
   private void genPartitionPruners(QB qb) throws SemanticException {
+    Map<String, Boolean> joinPartnPruner = new HashMap<String, Boolean>();
+    QBParseInfo qbp = qb.getParseInfo();
+
     // Recursively prune subqueries
     for (String alias : qb.getSubqAliases()) {
       QBExpr qbexpr = qb.getSubqForAlias(alias);
@@ -389,21 +430,12 @@
 
     for (String alias : qb.getTabAliases()) {
       String alias_id = (qb.getId() == null ? alias : qb.getId() + ":" + alias);
-      PartitionPruner pruner = new PartitionPruner(alias,
-                                                   qb.getMetaData());
+
+      PartitionPruner pruner = new PartitionPruner(alias, qb.getMetaData());
       // Pass each where clause to the pruner
-      QBParseInfo qbp = qb.getParseInfo();
       for(String clause: qbp.getClauseNames()) {
 
         CommonTree whexp = (CommonTree)qbp.getWhrForClause(clause);
-
-        if (pruner.getTable().isPartitioned() &&
-            conf.getVar(HiveConf.ConfVars.HIVEPARTITIONPRUNER).equalsIgnoreCase("strict") &&
-            (whexp == null || !pruner.hasPartitionPredicate((CommonTree)whexp.getChild(0)))) {
-          throw new SemanticException(ErrorMsg.NO_PARTITION_PREDICATE.getMsg(whexp != null ? whexp : qbp.getSelForClause(clause), 
-                                                                             " for Alias " + alias + " Table " + pruner.getTable().getName()));
-        }
-
         if (whexp != null) {
           pruner.addExpression((CommonTree)whexp.getChild(0));
         }
@@ -412,6 +444,54 @@
       // Add the pruner to the list
       this.aliasToPruner.put(alias_id, pruner);
     }
+
+    if (!qb.getTabAliases().isEmpty() && qb.getQbJoinTree() != null) {
+      int pos = 0;
+      for (String alias : qb.getQbJoinTree().getBaseSrc()) {
+        if (alias != null) {
+          String alias_id = (qb.getId() == null ? alias : qb.getId() + ":" + alias);
+          PartitionPruner pruner = this.aliasToPruner.get(alias_id);
+          if(pruner == null) {
+            // this means that the alias is a subquery
+            pos++;
+            continue;
+          }
+          Vector<CommonTree> filters = qb.getQbJoinTree().getFilters().get(pos);
+          for (CommonTree cond : filters) {
+            pruner.addJoinOnExpression(cond);
+            if (pruner.hasPartitionPredicate(cond))
+              joinPartnPruner.put(alias_id, Boolean.TRUE);
+          }
+          if (qb.getQbJoinTree().getJoinSrc() != null) {
+            filters = qb.getQbJoinTree().getFilters().get(0);
+            for (CommonTree cond : filters) {
+              pruner.addJoinOnExpression(cond);
+              if (pruner.hasPartitionPredicate(cond))
+                joinPartnPruner.put(alias_id, Boolean.TRUE);
+            }
+          }
+        }
+        pos++;
+      }
+    }
+
+    for (String alias : qb.getTabAliases()) {
+      String alias_id = (qb.getId() == null ? alias : qb.getId() + ":" + alias);
+      PartitionPruner pruner = this.aliasToPruner.get(alias_id);
+      if (joinPartnPruner.get(alias_id) == null) {
+        // Pass each where clause to the pruner
+        for (String clause : qbp.getClauseNames()) {
+          CommonTree whexp = (CommonTree) qbp.getWhrForClause(clause);
+          if (pruner.getTable().isPartitioned() &&
+              conf.getVar(HiveConf.ConfVars.HIVEPARTITIONPRUNER).equalsIgnoreCase("strict") &&
+              (whexp == null || !pruner.hasPartitionPredicate((CommonTree) whexp.getChild(0)))) {
+            throw new SemanticException(ErrorMsg.NO_PARTITION_PREDICATE.getMsg(whexp != null ? whexp : qbp.getSelForClause(clause),
+                                                                               " for Alias " + alias + " Table " + pruner.getTable().getName()));
+          }
+        }
+      }
+    }
   }
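
A minimal, self-contained sketch of the strict-mode rule the rewritten genPartitionPruners enforces (class and method names below are illustrative, not part of the commit): once a join condition supplies a partition predicate for an alias, the per-clause strict check is skipped; otherwise every where clause must carry one.

    // Illustrative model only, not Hive code.
    class StrictPruningRule {
      static boolean allowed(boolean partitioned, boolean strict,
                             boolean joinCondHasPartitionPredicate,
                             boolean[] whereClauseHasPartitionPredicate) {
        if (!partitioned || !strict || joinCondHasPartitionPredicate)
          return true;  // non-partitioned, non-strict, or the join already prunes
        for (boolean b : whereClauseHasPartitionPredicate)
          if (!b) return false;  // some where clause lacks a partition predicate
        return true;
      }
      public static void main(String[] args) {
        // predicate only in the JOIN ... ON clause: strict check is skipped
        System.out.println(allowed(true, true, true, new boolean[]{false}));  // true
        // no predicate anywhere: strict mode rejects the query
        System.out.println(allowed(true, true, false, new boolean[]{false})); // false
      }
    }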
 
   private void genSamplePruners(QBExpr qbexpr) throws SemanticException {
@@ -451,7 +531,7 @@
   }
 
   @SuppressWarnings("nls")
-  private void getMetaData(QB qb) throws SemanticException {
+  public void getMetaData(QB qb) throws SemanticException {
     try {
 
       LOG.info("Get metadata for source tables");
@@ -572,7 +652,7 @@
     // String[] allAliases = joinTree.getAllAliases();
     switch (condn.getToken().getType()) {
     case HiveParser.TOK_COLREF:
-      String tblName = condn.getChild(0).getText();
+      String tblName = unescapeIdentifier(condn.getChild(0).getText().toLowerCase());
       if (isPresent(joinTree.getLeftAliases(), tblName)) {
         if (!leftAliases.contains(tblName))
           leftAliases.add(tblName);
@@ -632,62 +712,169 @@
       throw new SemanticException(ErrorMsg.INVALID_JOIN_CONDITION_2.getMsg(condn));
   }
 
-  private void parseJoinCondition(CommonTree joinParseTree,
-      QBJoinTree joinTree, CommonTree joinCond, Vector<String> leftSrc)
+  /**
+   * Parse the join condition.
+   * If the condition is a join condition, throw an error if it is not an equality. Otherwise, break it into left and
+   * right expressions and store them in the join tree.
+   * If the condition is a join filter, add it to the filter list of the join tree. The ON clause can contain conditions
+   * on both the left and right subtrees as well as filters on either one. Currently, we only support equi-joins, so we
+   * throw an error if a condition involves both subtrees and is not an equality. Also, we only support AND; ORs are not
+   * supported currently, as their semantics are unclear, they may lead to data explosion, and there is no use case.
+   * @param joinTree  join tree to be populated
+   * @param joinCond  join condition
+   * @param leftSrc   left sources
+   * @throws SemanticException
+   */
+  private void parseJoinCondition(QBJoinTree joinTree, CommonTree joinCond, Vector<String> leftSrc)
       throws SemanticException {
 
     switch (joinCond.getToken().getType()) {
+    case HiveParser.KW_OR:
+      throw new SemanticException(ErrorMsg.INVALID_JOIN_CONDITION_3.getMsg(joinCond));
+      
     case HiveParser.KW_AND:
-      parseJoinCondition(joinParseTree, joinTree, (CommonTree) joinCond
+      parseJoinCondition(joinTree, (CommonTree) joinCond
           .getChild(0), leftSrc);
-      parseJoinCondition(joinParseTree, joinTree, (CommonTree) joinCond
+      parseJoinCondition(joinTree, (CommonTree) joinCond
           .getChild(1), leftSrc);
       break;
 
     case HiveParser.EQUAL:
       CommonTree leftCondn = (CommonTree) joinCond.getChild(0);
-      Vector<String> leftAliases = new Vector<String>();
-      Vector<String> rightAliases = new Vector<String>();
-      parseJoinCondPopulateAlias(joinTree, leftCondn, leftAliases, rightAliases);
-      populateAliases(leftAliases, rightAliases, leftCondn, joinTree, leftSrc);
+      Vector<String> leftCondAl1 = new Vector<String>();
+      Vector<String> leftCondAl2 = new Vector<String>();
+      parseJoinCondPopulateAlias(joinTree, leftCondn, leftCondAl1, leftCondAl2);
 
       CommonTree rightCondn = (CommonTree) joinCond.getChild(1);
-      leftAliases.clear();
-      rightAliases.clear();
-      parseJoinCondPopulateAlias(joinTree, rightCondn, leftAliases,
-          rightAliases);
-      populateAliases(leftAliases, rightAliases, rightCondn, joinTree, leftSrc);
+      Vector<String> rightCondAl1 = new Vector<String>();
+      Vector<String> rightCondAl2 = new Vector<String>();
+      parseJoinCondPopulateAlias(joinTree, rightCondn, rightCondAl1, rightCondAl2);
+
+      // is it a filter or a join condition
+      if (((leftCondAl1.size() != 0) && (leftCondAl2.size() != 0)) ||
+          ((rightCondAl1.size() != 0) && (rightCondAl2.size() != 0)))
+        throw new SemanticException(ErrorMsg.INVALID_JOIN_CONDITION_1.getMsg(joinCond));
+
+      if (leftCondAl1.size() != 0) {
+        if ((rightCondAl1.size() != 0) || ((rightCondAl1.size() == 0) && (rightCondAl2.size() == 0)))
+          joinTree.getFilters().get(0).add(joinCond);
+        else if (rightCondAl2.size() != 0) {
+          populateAliases(leftCondAl1, leftCondAl2, leftCondn, joinTree, leftSrc);
+          populateAliases(rightCondAl1, rightCondAl2, rightCondn, joinTree, leftSrc);
+        }
+      }
+      else if (leftCondAl2.size() != 0) {
+        if ((rightCondAl2.size() != 0) || ((rightCondAl1.size() == 0) && (rightCondAl2.size() == 0)))
+          joinTree.getFilters().get(1).add(joinCond);
+        else if (rightCondAl1.size() != 0) {
+          populateAliases(leftCondAl1, leftCondAl2, leftCondn, joinTree, leftSrc);
+          populateAliases(rightCondAl1, rightCondAl2, rightCondn, joinTree, leftSrc);
+        }
+      }
+      else if (rightCondAl1.size() != 0)
+        joinTree.getFilters().get(0).add(joinCond);
+      else
+        joinTree.getFilters().get(1).add(joinCond);
+
       break;
 
     default:
+      boolean isFunction = (joinCond.getType() == HiveParser.TOK_FUNCTION);
+        
+      // Create all children
+      int childrenBegin = (isFunction ? 1 : 0);
+      ArrayList<Vector<String>> leftAlias = new ArrayList<Vector<String>>(joinCond.getChildCount() - childrenBegin);
+      ArrayList<Vector<String>> rightAlias = new ArrayList<Vector<String>>(joinCond.getChildCount() - childrenBegin);
+      for (int ci = 0; ci < joinCond.getChildCount() - childrenBegin; ci++) {
+        Vector<String> left  = new Vector<String>();
+        Vector<String> right = new Vector<String>();
+        leftAlias.add(left);
+        rightAlias.add(right);
+      }
+        
+      for (int ci=childrenBegin; ci<joinCond.getChildCount(); ci++)
+        parseJoinCondPopulateAlias(joinTree, (CommonTree)joinCond.getChild(ci), leftAlias.get(ci-childrenBegin), rightAlias.get(ci-childrenBegin));
+
+      boolean leftAliasNull = true;
+      for (Vector<String> left : leftAlias) {
+        if (left.size() != 0) {
+          leftAliasNull = false;
+          break;
+        }
+      }
+
+      boolean rightAliasNull = true;
+      for (Vector<String> right : rightAlias) {
+        if (right.size() != 0) {
+          rightAliasNull = false;
+          break;
+        }
+      }
+
+      if (!leftAliasNull && !rightAliasNull)
+        throw new SemanticException(ErrorMsg.INVALID_JOIN_CONDITION_1.getMsg(joinCond));
+
+      if (!leftAliasNull)
+        joinTree.getFilters().get(0).add(joinCond);
+      else
+        joinTree.getFilters().get(1).add(joinCond);
+        
       break;
     }
   }
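
A compact sketch of the classification rule parseJoinCondition applies to each equality (a simplified model; the integer counts stand in for the alias vectors leftCondAl1/leftCondAl2 and rightCondAl1/rightCondAl2 in the commit): an operand touching both subtrees is an error, two operands resolving to opposite subtrees form a join key, and everything else becomes a filter on the side it references.

    // Illustrative model only, not Hive code.
    class JoinCondRule {
      enum Kind { JOIN_KEY, LEFT_FILTER, RIGHT_FILTER, ERROR }
      static Kind classify(int lOpLeftRefs, int lOpRightRefs,
                           int rOpLeftRefs, int rOpRightRefs) {
        if ((lOpLeftRefs > 0 && lOpRightRefs > 0) ||
            (rOpLeftRefs > 0 && rOpRightRefs > 0))
          return Kind.ERROR;                  // one operand spans both subtrees
        if (lOpLeftRefs > 0)
          return (rOpRightRefs > 0) ? Kind.JOIN_KEY : Kind.LEFT_FILTER;
        if (lOpRightRefs > 0)
          return (rOpLeftRefs > 0) ? Kind.JOIN_KEY : Kind.RIGHT_FILTER;
        return (rOpLeftRefs > 0) ? Kind.LEFT_FILTER : Kind.RIGHT_FILTER;
      }
      public static void main(String[] args) {
        System.out.println(classify(1, 0, 0, 1)); // a.k = b.k -> JOIN_KEY
        System.out.println(classify(1, 0, 0, 0)); // a.x = 10  -> LEFT_FILTER
      }
    }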
 
+  @SuppressWarnings("nls")
+  private Operator<? extends Serializable> putOpInsertMap(Operator<? extends Serializable> op, RowResolver rr) {
+    OpParseContext ctx = new OpParseContext(rr);
+    opParseCtx.put(op, ctx);
+    return op;
+  }
   
   @SuppressWarnings("nls")
-  private OperatorInfo genFilterPlan(String dest, QB qb,
-      OperatorInfo input) throws SemanticException {
+  private Operator genFilterPlan(String dest, QB qb,
+      Operator input) throws SemanticException {
 
     CommonTree whereExpr = qb.getParseInfo().getWhrForClause(dest);
-    OperatorInfo output = (OperatorInfo)input.clone();
-    output.setOp(
-        OperatorFactory.getAndMakeChild(
-            new filterDesc(genExprNodeDesc((CommonTree)whereExpr.getChild(0),
-                                           qb.getParseInfo().getAlias(),
-                                           input.getRowResolver())),
-            new RowSchema(output.getRowResolver().getColumnInfos()),
-                          input.getOp()
-        )
-    );
-    LOG.debug("Created Filter Plan for " + qb.getId() + ":" + dest + " row schema: " + output.getRowResolver().toString());
+    OpParseContext inputCtx = opParseCtx.get(input);
+    RowResolver inputRR = inputCtx.getRR();
+    Operator output = putOpInsertMap(
+      OperatorFactory.getAndMakeChild(
+        new filterDesc(genExprNodeDesc(qb.getMetaData(), (CommonTree)whereExpr.getChild(0), inputRR)),
+          new RowSchema(inputRR.getColumnInfos()), input), inputRR);
+ 
+    LOG.debug("Created Filter Plan for " + qb.getId() + ":" + dest + " row schema: " + inputRR.toString());
+    return output;
+  }
+
+  /**
+   * Create a filter plan. The condition and the inputs are specified.
+   * @param qb    the current query block
+   * @param condn the condition to be resolved
+   * @param input the input operator
+   */
+  @SuppressWarnings("nls")
+  private Operator genFilterPlan(QB qb, CommonTree condn, Operator input) throws SemanticException {
+
+    OpParseContext inputCtx = opParseCtx.get(input);
+    RowResolver inputRR = inputCtx.getRR();
+    Operator output = putOpInsertMap(
+      OperatorFactory.getAndMakeChild(
+        new filterDesc(genExprNodeDesc(qb.getMetaData(), condn, inputRR)),
+          new RowSchema(inputRR.getColumnInfos()), input), inputRR);
+ 
+    LOG.debug("Created Filter Plan for " + qb.getId() + " row schema: " + inputRR.toString());
     return output;
   }
 
   @SuppressWarnings("nls")
-  private void genColList(String alias, CommonTree sel,
+  private void genColList(String tabAlias, String alias, CommonTree sel,
     ArrayList<exprNodeDesc> col_list, RowResolver input, Integer pos,
     RowResolver output) throws SemanticException {
+
+    // The table alias should exist
+    if (tabAlias != null && !input.hasTableAlias(tabAlias))
+      throw new SemanticException(ErrorMsg.INVALID_TABLE_ALIAS.getMsg(sel));
+    
     // TODO: Have to put in the support for AS clause
 
     // This is the tab.* case
@@ -703,42 +890,77 @@
     }
   }
 
-  @SuppressWarnings("nls")
-  private OperatorInfo genScriptPlan(CommonTree trfm, QB qb,
-      OperatorInfo input) throws SemanticException {
+  /**
+   * If the user's script command needs any modifications, apply them here.
+   */
+  private String getFixedCmd(String cmd) {
+    SessionState ss = SessionState.get();
+    if(ss == null)
+      return cmd;
+
+    // for local mode - replace any references to packaged files by name with 
+    // the reference to the original file path
+    if(ss.getConf().get("mapred.job.tracker", "local").equals("local")) {
+      Set<String> files = ss.list_resource(SessionState.ResourceType.FILE, null);
+      if((files != null) && !files.isEmpty()) {
+        int end = cmd.indexOf(" ");
+        String prog = (end == -1) ? cmd : cmd.substring(0, end);
+        String args = (end == -1) ? "" :  cmd.substring(end, cmd.length());
+
+        for(String oneFile: files) {
+          Path p = new Path(oneFile);
+          if(p.getName().equals(prog)) {
+            cmd = oneFile + args;
+            break;
+          }
+        }
+      }
+    }
 
-    OperatorInfo output = (OperatorInfo)input.clone();
+    return cmd;
+  }
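
A runnable sketch of the local-mode rewrite getFixedCmd performs; the file-name comparison below approximates new Path(f).getName().equals(prog), and the paths are hypothetical.

    import java.util.*;

    // Illustrative model only, not Hive code.
    class LocalCmdFix {
      static String fix(String cmd, Set<String> resourceFiles) {
        int end = cmd.indexOf(' ');
        String prog = (end == -1) ? cmd : cmd.substring(0, end);
        String args = (end == -1) ? "" : cmd.substring(end);
        for (String f : resourceFiles) {
          // stand-in for new Path(f).getName().equals(prog)
          String name = f.substring(f.lastIndexOf('/') + 1);
          if (name.equals(prog)) return f + args;  // substitute the full path
        }
        return cmd;
      }
      public static void main(String[] args) {
        Set<String> files = Collections.singleton("/home/u/scripts/wc.py");
        System.out.println(fix("wc.py -l", files)); // -> /home/u/scripts/wc.py -l
      }
    }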
 
-    // Change the rws in this case
-    CommonTree collist = (CommonTree) trfm.getChild(1);
-    int ccount = collist.getChildCount();
-    RowResolver out_rwsch = new RowResolver();
-    StringBuilder sb = new StringBuilder();
 
-    for (int i = 0; i < ccount; ++i) {
+  @SuppressWarnings("nls")
+  private Operator genScriptPlan(CommonTree trfm, QB qb,
+      Operator input) throws SemanticException {
+    // If there is no "AS" clause, the output schema will be "key,value"
+    ArrayList<String> outputColList = new ArrayList<String>();
+    boolean defaultOutputColList = (trfm.getChildCount() < 3);
+    if (defaultOutputColList) {
+      outputColList.add("key");
+      outputColList.add("value");
+    } else {
+      CommonTree collist = (CommonTree) trfm.getChild(2);
+      int ccount = collist.getChildCount();
+      for (int i=0; i < ccount; ++i) {
+        outputColList.add(unescapeIdentifier(((CommonTree)collist.getChild(i)).getText()));
+      }
+    }
+    
+    RowResolver out_rwsch = new RowResolver();
+    StringBuilder columns = new StringBuilder();
+    for (int i = 0; i < outputColList.size(); ++i) {
       if (i != 0) {
-        sb.append(",");
+        columns.append(",");
       }
-      sb.append(((CommonTree)collist.getChild(i)).getText());
+      columns.append(outputColList.get(i));
       out_rwsch.put(
         qb.getParseInfo().getAlias(),
-        ((CommonTree)collist.getChild(i)).getText(),
-        new ColumnInfo(((CommonTree)collist.getChild(i)).getText(),
-                       String.class)  // Everything is a string right now
+        outputColList.get(i),
+        new ColumnInfo(outputColList.get(i), String.class)  // Script output is always a string
       );
     }
 
-    output
-        .setOp(OperatorFactory
+    Operator output = putOpInsertMap(OperatorFactory
             .getAndMakeChild(
                 new scriptDesc(
-                    stripQuotes(trfm.getChild(2).getText()),
-                    PlanUtils.getDefaultTableDesc(Integer.toString(Utilities.tabCode), sb.toString()),
+                    getFixedCmd(stripQuotes(trfm.getChild(1).getText())),
+                    PlanUtils.getDefaultTableDesc(Integer.toString(Utilities.tabCode), columns.toString(), defaultOutputColList),
                     PlanUtils.getDefaultTableDesc(Integer.toString(Utilities.tabCode), "")),
                     new RowSchema(
-                        out_rwsch.getColumnInfos()), input.getOp()));
+                        out_rwsch.getColumnInfos()), input), out_rwsch);
 
-    output.setRowResolver(out_rwsch);
     return output;
   }
 
@@ -776,17 +998,22 @@
   private static String getColAlias(CommonTree selExpr, String defaultName) {
     if (selExpr.getChildCount() == 2) {
       // return zz for "xx + yy AS zz"
-      return selExpr.getChild(1).getText(); 
+      return unescapeIdentifier(selExpr.getChild(1).getText()); 
     }
 
     CommonTree root = (CommonTree)selExpr.getChild(0);
     while (root.getType() == HiveParser.DOT || root.getType() == HiveParser.TOK_COLREF) {
-      assert(root.getChildCount() == 2);
-      root = (CommonTree) root.getChild(1);
+      if (root.getType() == HiveParser.TOK_COLREF && root.getChildCount() == 1) {
+        root = (CommonTree) root.getChild(0);
+      }
+      else {
+        assert(root.getChildCount() == 2);
+        root = (CommonTree) root.getChild(1);
+      }
     }
     if (root.getType() == HiveParser.Identifier) {
       // Return zz for "xx.zz" and "xx.yy.zz"
-      return root.getText();
+      return unescapeIdentifier(root.getText());
     } else {
       // Return defaultName if selExpr is not a simple xx.yy.zz 
       return defaultName;
@@ -794,8 +1021,8 @@
   }
   
   @SuppressWarnings("nls")
-  private OperatorInfo genSelectPlan(String dest, QB qb,
-      OperatorInfo input) throws SemanticException {
+  private Operator genSelectPlan(String dest, QB qb,
+    Operator input) throws SemanticException {
 
     CommonTree selExprList = qb.getParseInfo().getSelForClause(dest);
 
@@ -804,7 +1031,9 @@
     CommonTree trfm = null;
     String alias = qb.getParseInfo().getAlias();
     Integer pos = Integer.valueOf(0);
-
+    RowResolver inputRR = opParseCtx.get(input).getRR();
+    boolean selectStar = false;
+    
     // Iterate over the selects
     for (int i = 0; i < selExprList.getChildCount(); ++i) {
 
@@ -812,10 +1041,13 @@
       CommonTree selExpr = (CommonTree) selExprList.getChild(i);
       String colAlias = getColAlias(selExpr, "_C" + i);
       CommonTree sel = (CommonTree)selExpr.getChild(0);
-
+      
       if (sel.getToken().getType() == HiveParser.TOK_ALLCOLREF) {
-        genColList(qb.getParseInfo().getAlias(), sel, col_list,
-            input.getRowResolver(), pos, out_rwsch);
+        String tabAlias = null;
+        if (sel.getChildCount() == 1)
+          tabAlias = unescapeIdentifier(sel.getChild(0).getText().toLowerCase());
+        genColList(tabAlias, alias, sel, col_list, inputRR, pos, out_rwsch);
+        selectStar = true;
       } else if (sel.getToken().getType() == HiveParser.TOK_TRANSFORM) {
         if (i > 0) {
           throw new SemanticException(ErrorMsg.INVALID_TRANSFORM.getMsg(sel));
@@ -825,26 +1057,28 @@
         for (int j = 0; j < cols.getChildCount(); ++j) {
           CommonTree expr = (CommonTree) cols.getChild(j);
           if (expr.getToken().getType() == HiveParser.TOK_ALLCOLREF) {
-            genColList(alias, expr,
-                       col_list, input.getRowResolver(),
-                       pos, out_rwsch);
+            String tabAlias = null;
+            if (sel.getChildCount() == 1)
+              tabAlias = unescapeIdentifier(sel.getChild(0).getText().toLowerCase());
+
+            genColList(tabAlias, alias, expr, col_list, inputRR, pos, out_rwsch);
+            selectStar = true;
           } else {
-            exprNodeDesc exp = genExprNodeDesc(expr, alias, input.getRowResolver());
+            exprNodeDesc exp = genExprNodeDesc(qb.getMetaData(), expr, inputRR);
             col_list.add(exp);
             if (!StringUtils.isEmpty(alias) &&
                 (out_rwsch.get(alias, colAlias) != null)) {
               throw new SemanticException(ErrorMsg.AMBIGOUS_COLUMN.getMsg(expr.getChild(1)));
             }
 
-            out_rwsch.put(alias, expr.getText(),
+            out_rwsch.put(alias, unescapeIdentifier(expr.getText()),
                           new ColumnInfo((Integer.valueOf(pos)).toString(),
-                                         exp.getTypeInfo())); // Everything is a string right now
+                                         exp.getTypeInfo()));
           }
         }
       } else {
         // Case when this is an expression
-        exprNodeDesc exp = genExprNodeDesc(sel, qb.getParseInfo()
-            .getAlias(), input.getRowResolver());
+        exprNodeDesc exp = genExprNodeDesc(qb.getMetaData(), sel, inputRR);
         col_list.add(exp);
         if (!StringUtils.isEmpty(alias) &&
             (out_rwsch.get(alias, colAlias) != null)) {
@@ -854,7 +1088,7 @@
         // of the expression as the column name
         out_rwsch.put(alias, colAlias,
                       new ColumnInfo((Integer.valueOf(pos)).toString(),
-                                     exp.getTypeInfo())); // Everything is a string right now
+                                     exp.getTypeInfo()));
       }
       pos = Integer.valueOf(pos.intValue() + 1);
     }
@@ -865,29 +1099,64 @@
       }
     }
     
-    OperatorInfo output = (OperatorInfo) input.clone();
-    output.setOp(OperatorFactory.getAndMakeChild(
-        new selectDesc(col_list), new RowSchema(out_rwsch.getColumnInfos()),
-        input.getOp()));
-
-    output.setRowResolver(out_rwsch);
+    Operator output = putOpInsertMap(OperatorFactory.getAndMakeChild(
+        new selectDesc(col_list, (selExprList.getChildCount() == 1) && selectStar), new RowSchema(out_rwsch.getColumnInfos()),
+        input), out_rwsch);
 
     if (trfm != null) {
       output = genScriptPlan(trfm, qb, output);
     }
 
-    LOG.debug("Created Select Plan for clause: " + dest + " row schema: "
-        + output.getRowResolver().toString());
+    LOG.debug("Created Select Plan for clause: " + dest + " row schema: " + out_rwsch.toString());
 
     return output;
   }
 
+  /**
+   * Class to store UDAF related information.
+   */
+  static class UDAFInfo {
+    ArrayList<exprNodeDesc> convertedParameters;
+    Method aggregateMethod;
+    Method evaluateMethod;
+  }
+
+  /**
+   * Returns the UDAFInfo struct for the aggregation.
+   * @param aggName  The name of the UDAF.
+   * @param mode     The mode of the aggregation. This affects the evaluate method.
+   * @param aggClasses  The classes of the parameters to the UDAF. 
+   * @param aggParameters  The actual exprNodeDesc of the parameters.
+   * @param aggTree   The CommonTree node of the UDAF in the query.
+   * @return UDAFInfo
+   * @throws SemanticException when the UDAF is not found or has problems.
+   */
+  UDAFInfo getUDAFInfo(String aggName, groupByDesc.Mode mode, ArrayList<Class<?>> aggClasses,
+      ArrayList<exprNodeDesc> aggParameters, CommonTree aggTree) throws SemanticException {
+    UDAFInfo r = new UDAFInfo();
+    r.aggregateMethod = FunctionRegistry.getUDAFMethod(aggName, aggClasses);
+    if (null == r.aggregateMethod) {
+      String reason = "Looking for UDAF \"" + aggName + "\" with parameters " + aggClasses;
+      throw new SemanticException(ErrorMsg.INVALID_FUNCTION_SIGNATURE.getMsg((CommonTree)aggTree.getChild(0), reason));
+    }
+
+    r.convertedParameters = convertParameters(r.aggregateMethod, aggParameters);
+    
+    r.evaluateMethod = FunctionRegistry.getUDAFEvaluateMethod(aggName, mode);
+    if (r.evaluateMethod == null) {
+      String reason = "UDAF \"" + aggName + "\" does not have evaluate()/evaluatePartial() methods.";
+      throw new SemanticException(ErrorMsg.INVALID_FUNCTION.getMsg((CommonTree)aggTree.getChild(0), reason)); 
+    }
+    
+    return r;
+  }
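
A small reflection sketch of the two lookups getUDAFInfo performs; SumUDAF here is a made-up stand-in, and the evaluate method's return type is what the group-by code below records as the output column type.

    import java.lang.reflect.Method;

    // Illustrative model only, not Hive code.
    public class UdafLookup {
      public static class SumUDAF {
        private long sum;
        public boolean aggregate(long v) { sum += v; return true; }
        public Long evaluate() { return sum; }
      }
      public static void main(String[] args) throws Exception {
        // aggregateMethod lookup; a failure maps to INVALID_FUNCTION_SIGNATURE
        Method agg = SumUDAF.class.getMethod("aggregate", long.class);
        // evaluateMethod lookup; a failure maps to INVALID_FUNCTION
        Method eval = SumUDAF.class.getMethod("evaluate");
        System.out.println(agg.getName() + " -> returns " + eval.getReturnType());
      }
    }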
+  
   @SuppressWarnings("nls")
-  private OperatorInfo genGroupByPlanGroupByOperator(
-        QBParseInfo parseInfo, String dest, OperatorInfo reduceSinkOperatorInfo,
+  private Operator genGroupByPlanGroupByOperator(
+        QBParseInfo parseInfo, String dest, Operator reduceSinkOperatorInfo,
         groupByDesc.Mode mode)
     throws SemanticException {
-    RowResolver groupByInputRowResolver = reduceSinkOperatorInfo.getRowResolver();
+    RowResolver groupByInputRowResolver = opParseCtx.get(reduceSinkOperatorInfo).getRR();
     RowResolver groupByOutputRowResolver = new RowResolver();
     groupByOutputRowResolver.setIsExprResolver(true);
     ArrayList<exprNodeDesc> groupByKeys = new ArrayList<exprNodeDesc>();
@@ -933,41 +1202,38 @@
         aggClasses.add(paraExprInfo.getType().getPrimitiveClass());
       }
 
-      if (null == FunctionRegistry.getUDAFMethod(aggName, aggClasses)) {
-        String reason = "Looking for UDAF \"" + aggName + "\" with parameters " + aggClasses;
-        throw new SemanticException(ErrorMsg.INVALID_FUNCTION_SIGNATURE.getMsg((CommonTree)value.getChild(0), reason));
-      }
+      UDAFInfo udaf = getUDAFInfo(aggName, mode, aggClasses, aggParameters, value);
       
-      aggregations.add(new aggregationDesc(aggClass, aggParameters,
+      aggregations.add(new aggregationDesc(aggClass, udaf.convertedParameters,
           value.getToken().getType() == HiveParser.TOK_FUNCTIONDI));
       groupByOutputRowResolver.put("",value.toStringTree(),
                                    new ColumnInfo(Integer.valueOf(groupByKeys.size() + aggregations.size() -1).toString(),
-                                                  String.class));  // Everything is a string right now
+                                       udaf.evaluateMethod.getReturnType()));
     }
 
-    return new OperatorInfo(
-        OperatorFactory.getAndMakeChild(new groupByDesc(mode, groupByKeys, aggregations),
-                                        new RowSchema(groupByOutputRowResolver.getColumnInfos()),
-                                        reduceSinkOperatorInfo.getOp()),
+    return 
+      putOpInsertMap(OperatorFactory.getAndMakeChild(new groupByDesc(mode, groupByKeys, aggregations),
+                                                     new RowSchema(groupByOutputRowResolver.getColumnInfos()),
+                                                     reduceSinkOperatorInfo),
         groupByOutputRowResolver
     );
   }
 
   @SuppressWarnings("nls")
-  private OperatorInfo genGroupByPlanGroupByOpForward(
-        QBParseInfo parseInfo, String dest, OperatorInfo forwardOpInfo,
+  private Operator genGroupByPlanGroupByOperator1(
+        QBParseInfo parseInfo, String dest, Operator reduceSinkOperatorInfo,
         groupByDesc.Mode mode)
     throws SemanticException {
-    RowResolver inputRS  = forwardOpInfo.getRowResolver();
-    RowResolver outputRS = new RowResolver();
-    outputRS.setIsExprResolver(true);
+    RowResolver groupByInputRowResolver = opParseCtx.get(reduceSinkOperatorInfo).getRR();
+    RowResolver groupByOutputRowResolver = new RowResolver();
+    groupByOutputRowResolver.setIsExprResolver(true);
     ArrayList<exprNodeDesc> groupByKeys = new ArrayList<exprNodeDesc>();
     ArrayList<aggregationDesc> aggregations = new ArrayList<aggregationDesc>();
     List<CommonTree> grpByExprs = getGroupByForClause(parseInfo, dest);
-    for (int i = 0; i < grpByExprs.size(); i++) {
+    for (int i = 0; i < grpByExprs.size(); ++i) {
       CommonTree grpbyExpr = grpByExprs.get(i);
       String text = grpbyExpr.toStringTree();
-      ColumnInfo exprInfo = inputRS.get("",text);
+      ColumnInfo exprInfo = groupByInputRowResolver.get("",text);
 
       if (exprInfo == null) {
         throw new SemanticException(ErrorMsg.INVALID_COLUMN.getMsg(grpbyExpr));
@@ -975,16 +1241,13 @@
 
       groupByKeys.add(new exprNodeColumnDesc(exprInfo.getType(), exprInfo.getInternalName()));
       String field = (Integer.valueOf(i)).toString();
-      outputRS.put("", text,
-                   new ColumnInfo(field, exprInfo.getType()));
+      groupByOutputRowResolver.put("",grpbyExpr.toStringTree(),
+                                   new ColumnInfo(field, exprInfo.getType()));
     }
 
-    // For each aggregation
-    HashMap<String, CommonTree> aggregationTrees = parseInfo
-        .getAggregationExprsForClause(dest);
-    assert (aggregationTrees != null);
-    for (Map.Entry<String, CommonTree> entry : aggregationTrees.entrySet()) {
-      CommonTree value = entry.getValue();
+    // If there is a distinctFuncExp, process that aggregation first; its parameters arrive as reduce keys.
+    if (parseInfo.getDistinctFuncExprForClause(dest) != null) {
+      CommonTree value = parseInfo.getDistinctFuncExprForClause(dest);
       String aggName = value.getChild(0).getText();
       Class<? extends UDAF> aggClass = FunctionRegistry.getUDAF(aggName);
       assert (aggClass != null);
@@ -994,7 +1257,7 @@
       for (int i = 1; i < value.getChildCount(); i++) {
         String text = value.getChild(i).toStringTree();
         CommonTree paraExpr = (CommonTree)value.getChild(i);
-        ColumnInfo paraExprInfo = inputRS.get("", text);
+        ColumnInfo paraExprInfo = groupByInputRowResolver.get("",text);
         if (paraExprInfo == null) {
           throw new SemanticException(ErrorMsg.INVALID_COLUMN.getMsg(paraExpr));
         }
@@ -1005,31 +1268,230 @@
         aggClasses.add(paraExprInfo.getType().getPrimitiveClass());
       }
 
-      if (null == FunctionRegistry.getUDAFMethod(aggName, aggClasses)) {
-        String reason = "Looking for UDAF \"" + aggName + "\" with parameters " + aggClasses;
-        throw new SemanticException(ErrorMsg.INVALID_FUNCTION_SIGNATURE.getMsg((CommonTree)value.getChild(0), reason));
-      }
+      UDAFInfo udaf = getUDAFInfo(aggName, mode, aggClasses, aggParameters, value);
       
-      aggregations.add(new aggregationDesc(aggClass, aggParameters,
-          value.getToken().getType() == HiveParser.TOK_FUNCTIONDI));
-      outputRS.put("",value.toStringTree(),
+      aggregations.add(new aggregationDesc(aggClass, udaf.convertedParameters, true));
+      groupByOutputRowResolver.put("",value.toStringTree(),
                                    new ColumnInfo(Integer.valueOf(groupByKeys.size() + aggregations.size() -1).toString(),
-                                                  String.class));  // Everything is a string right now
+                                       udaf.evaluateMethod.getReturnType()));
+    }
+
+    HashMap<String, CommonTree> aggregationTrees = parseInfo
+        .getAggregationExprsForClause(dest);
+    for (Map.Entry<String, CommonTree> entry : aggregationTrees.entrySet()) {
+      CommonTree value = entry.getValue();
+      if (value.getToken().getType() == HiveParser.TOK_FUNCTIONDI)
+        continue;
+
+      String aggName = value.getChild(0).getText();
+      Class<? extends UDAF> aggClass = FunctionRegistry.getUDAF(aggName);
+      assert (aggClass != null);
+      ArrayList<exprNodeDesc> aggParameters = new ArrayList<exprNodeDesc>();
+      String text = entry.getKey();
+      ColumnInfo paraExprInfo = groupByInputRowResolver.get("",text);
+      if (paraExprInfo == null) {
+        throw new SemanticException(ErrorMsg.INVALID_COLUMN.getMsg(value));
+      }
+      String paraExpression = paraExprInfo.getInternalName();
+      assert(paraExpression != null);
+      aggParameters.add(new exprNodeColumnDesc(paraExprInfo.getType(), paraExpression));
+      aggregations.add(new aggregationDesc(aggClass, aggParameters, ((mode == groupByDesc.Mode.FINAL) ? false : (value.getToken().getType() == HiveParser.TOK_FUNCTIONDI))));
+      groupByOutputRowResolver.put("", value.toStringTree(),
+                                    new ColumnInfo(Integer.valueOf(groupByKeys.size() + aggregations.size() - 1).toString(),
+                                                   paraExprInfo.getType()));
     }
 
-    return new OperatorInfo(
+    return putOpInsertMap(
         OperatorFactory.getAndMakeChild(new groupByDesc(mode, groupByKeys, aggregations),
-                                        new RowSchema(outputRS.getColumnInfos()),
-                                        forwardOpInfo.getOp()),
-        outputRS
-    );
+                                        new RowSchema(groupByOutputRowResolver.getColumnInfos()),
+                                        reduceSinkOperatorInfo),
+        groupByOutputRowResolver);
   }
 
   @SuppressWarnings("nls")
-  private OperatorInfo genGroupByPlanReduceSinkOperator(QBParseInfo parseInfo,
-      String dest, OperatorInfo inputOperatorInfo, int numPartitionFields)
+  private Operator genGroupByPlanMapGroupByOperator(QB qb, String dest, Operator inputOperatorInfo, 
+                                                    groupByDesc.Mode mode) throws SemanticException {
+
+    RowResolver groupByInputRowResolver = opParseCtx.get(inputOperatorInfo).getRR();
+    QBParseInfo parseInfo = qb.getParseInfo();
+    RowResolver groupByOutputRowResolver = new RowResolver();
+    groupByOutputRowResolver.setIsExprResolver(true);
+    ArrayList<exprNodeDesc> groupByKeys = new ArrayList<exprNodeDesc>();
+    ArrayList<aggregationDesc> aggregations = new ArrayList<aggregationDesc>();
+    List<CommonTree> grpByExprs = getGroupByForClause(parseInfo, dest);
+    for (int i = 0; i < grpByExprs.size(); ++i) {
+      CommonTree grpbyExpr = grpByExprs.get(i);
+      exprNodeDesc grpByExprNode = genExprNodeDesc(qb.getMetaData(), grpbyExpr, groupByInputRowResolver);
+
+      groupByKeys.add(grpByExprNode);
+      String field = (Integer.valueOf(i)).toString();
+      groupByOutputRowResolver.put("",grpbyExpr.toStringTree(),
+                                   new ColumnInfo(field, grpByExprNode.getTypeInfo()));
+    }
+
+    // If there is a distinctFuncExp, add all its parameters to the group-by keys.
+    if (parseInfo.getDistinctFuncExprForClause(dest) != null) {
+      CommonTree value = parseInfo.getDistinctFuncExprForClause(dest);
+      int numDistn=0;
+      // 0 is function name
+      for (int i = 1; i < value.getChildCount(); i++) {
+        CommonTree parameter = (CommonTree) value.getChild(i);
+        String text = parameter.toStringTree();
+        if (groupByOutputRowResolver.get("",text) == null) {
+          exprNodeDesc distExprNode = genExprNodeDesc(qb.getMetaData(), parameter, groupByInputRowResolver);
+          groupByKeys.add(distExprNode);
+          numDistn++;
+          String field = (Integer.valueOf(grpByExprs.size() + numDistn -1)).toString();
+          groupByOutputRowResolver.put("", text, new ColumnInfo(field, distExprNode.getTypeInfo()));
+        }
+      }
+    }
+
+    // For each aggregation
+    HashMap<String, CommonTree> aggregationTrees = parseInfo
+        .getAggregationExprsForClause(dest);
+    assert (aggregationTrees != null);
+
+    for (Map.Entry<String, CommonTree> entry : aggregationTrees.entrySet()) {
+      CommonTree value = entry.getValue();
+      String aggName = value.getChild(0).getText();
+      Class<? extends UDAF> aggClass = FunctionRegistry.getUDAF(aggName);
+      assert (aggClass != null);
+      ArrayList<exprNodeDesc> aggParameters = new ArrayList<exprNodeDesc>();
+      ArrayList<Class<?>> aggClasses = new ArrayList<Class<?>>();
+      // 0 is the function name
+      for (int i = 1; i < value.getChildCount(); i++) {
+        CommonTree paraExpr = (CommonTree)value.getChild(i);
+        exprNodeDesc paraExprNode = genExprNodeDesc(qb.getMetaData(), paraExpr, groupByInputRowResolver);
+
+        aggParameters.add(paraExprNode);
+        aggClasses.add(paraExprNode.getTypeInfo().getPrimitiveClass());
+      }
+
+      UDAFInfo udaf = getUDAFInfo(aggName, mode, aggClasses, aggParameters, value);
+      
+      aggregations.add(new aggregationDesc(aggClass, udaf.convertedParameters,
+                                           value.getToken().getType() == HiveParser.TOK_FUNCTIONDI));
+      groupByOutputRowResolver.put("",value.toStringTree(),
+                                   new ColumnInfo(Integer.valueOf(groupByKeys.size() + aggregations.size() -1).toString(),
+                                       udaf.evaluateMethod.getReturnType()));
+    }
+
+    return putOpInsertMap(
+      OperatorFactory.getAndMakeChild(new groupByDesc(mode, groupByKeys, aggregations),
+                                      new RowSchema(groupByOutputRowResolver.getColumnInfos()),
+                                      inputOperatorInfo),
+      groupByOutputRowResolver);
+  }
+
+  private ArrayList<exprNodeDesc> convertParameters(Method m, ArrayList<exprNodeDesc> aggParameters) {
+    
+    ArrayList<exprNodeDesc> newParameters = new ArrayList<exprNodeDesc>();
+    Class<?>[] pTypes = m.getParameterTypes();
+
+    // aggParameters holds only the parameters (no function name), so start at 0
+    for (int i = 0; i < aggParameters.size(); i++) {
+      exprNodeDesc desc = aggParameters.get(i);
+      Class<?> pType = ObjectInspectorUtils.generalizePrimitive(pTypes[i]);
+      if (desc instanceof exprNodeNullDesc) {
+        exprNodeConstantDesc newCh = new exprNodeConstantDesc(TypeInfoFactory.getPrimitiveTypeInfo(pType), null);
+        newParameters.add(newCh);
+      } else if (pType.isAssignableFrom(desc.getTypeInfo().getPrimitiveClass())) {
+        // no type conversion needed
+        newParameters.add(desc);
+      } else {
+        // must be implicit type conversion
+        Class<?> from = desc.getTypeInfo().getPrimitiveClass();
+        Class<?> to = pType;
+        assert(FunctionRegistry.implicitConvertable(from, to));
+        Method conv = FunctionRegistry.getUDFMethod(to.getName(), true, from);
+        assert(conv != null);
+        Class<? extends UDF> c = FunctionRegistry.getUDFClass(to.getName());
+        assert(c != null);
+        
+        // get the conversion method
+        ArrayList<exprNodeDesc> conversionArg = new ArrayList<exprNodeDesc>(1);
+        conversionArg.add(desc);
+        newParameters.add(new exprNodeFuncDesc(TypeInfoFactory.getPrimitiveTypeInfo(pType),
+                                               c, conv, conversionArg));
+      }
+    }
+
+    return newParameters;
+  }
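
A self-contained sketch of the conversion policy in convertParameters, using plain Java values in place of exprNodeDesc; the numeric-widening branch stands in for FunctionRegistry's implicit conversions.

    import java.util.*;

    // Illustrative model only, not Hive code.
    class ConvertParams {
      static List<Object> convert(List<Object> args, Class<?>[] declared) {
        List<Object> out = new ArrayList<Object>();
        for (int i = 0; i < args.size(); i++) {
          Object a = args.get(i);
          if (a == null || declared[i].isInstance(a)) {
            out.add(a);                           // null or already assignable
          } else if (a instanceof Number && declared[i] == Double.class) {
            out.add(((Number) a).doubleValue());  // implicit numeric widening
          } else {
            throw new IllegalArgumentException("no implicit conversion");
          }
        }
        return out;
      }
      public static void main(String[] args) {
        System.out.println(convert(Arrays.<Object>asList(3, 4.5),
                                   new Class<?>[]{Double.class, Double.class}));
        // prints [3.0, 4.5]: the int argument is widened to match the signature
      }
    }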
+
+  @SuppressWarnings("nls")
+  private Operator genGroupByPlanReduceSinkOperator(QBParseInfo parseInfo,
+      String dest, Operator inputOperatorInfo)
       throws SemanticException {
-    RowResolver reduceSinkInputRowResolver = inputOperatorInfo.getRowResolver();
+    RowResolver reduceSinkInputRowResolver = opParseCtx.get(inputOperatorInfo).getRR();
+    RowResolver reduceSinkOutputRowResolver = new RowResolver();
+    reduceSinkOutputRowResolver.setIsExprResolver(true);
+    ArrayList<exprNodeDesc> reduceKeys = new ArrayList<exprNodeDesc>();
+
+    // Pre-compute group-by keys and store in reduceKeys
+    List<CommonTree> grpByExprs = getGroupByForClause(parseInfo, dest);
+    for (int i = 0; i < grpByExprs.size(); ++i) {
+      CommonTree grpbyExpr = grpByExprs.get(i);
+      String text = grpbyExpr.toStringTree();
+
+      if (reduceSinkOutputRowResolver.get("", text) == null) {
+        ColumnInfo exprInfo = reduceSinkInputRowResolver.get("", text);
+        reduceKeys.add(new exprNodeColumnDesc(exprInfo.getType(), exprInfo.getInternalName()));
+        reduceSinkOutputRowResolver.put("", text,
+                                        new ColumnInfo(Utilities.ReduceField.KEY.toString() + "." + Integer.valueOf(reduceKeys.size() - 1).toString(),
+                                                       exprInfo.getType()));
+      }
+    }
+
+    // If there is a distinctFuncExp, add all parameters to the reduceKeys.
+    if (parseInfo.getDistinctFuncExprForClause(dest) != null) {
+      CommonTree value = parseInfo.getDistinctFuncExprForClause(dest);
+      // 0 is function name
+      for (int i = 1; i < value.getChildCount(); i++) {
+        CommonTree parameter = (CommonTree) value.getChild(i);
+        String text = parameter.toStringTree();
+        if (reduceSinkOutputRowResolver.get("",text) == null) {
+          ColumnInfo exprInfo = reduceSinkInputRowResolver.get("", text);
+          reduceKeys.add(new exprNodeColumnDesc(exprInfo.getType(), exprInfo.getInternalName()));
+          reduceSinkOutputRowResolver.put("", text,
+                                          new ColumnInfo(Utilities.ReduceField.KEY.toString() + "." + Integer.valueOf(reduceKeys.size() - 1).toString(),
+                                                         exprInfo.getType()));
+        }
+      }
+    }
+
+    // Put partial aggregation results in reduceValues
+    ArrayList<exprNodeDesc> reduceValues = new ArrayList<exprNodeDesc>();
+    HashMap<String, CommonTree> aggregationTrees = parseInfo
+        .getAggregationExprsForClause(dest);
+    int inputField = reduceKeys.size();
+
+    for (Map.Entry<String, CommonTree> entry : aggregationTrees.entrySet()) {
+
+      TypeInfo type = reduceSinkInputRowResolver.getColumnInfos().get(inputField).getType(); 
+      reduceValues.add(new exprNodeColumnDesc(
+          type, (Integer.valueOf(inputField)).toString()));
+      inputField++;
+      reduceSinkOutputRowResolver.put("", ((CommonTree)entry.getValue()).toStringTree(),
+                                      new ColumnInfo(Utilities.ReduceField.VALUE.toString() + "." + (Integer.valueOf(reduceValues.size()-1)).toString(),
+                                                     type));
+    }
+
+    return putOpInsertMap(
+      OperatorFactory.getAndMakeChild(
+        PlanUtils.getReduceSinkDesc(reduceKeys, reduceValues, -1,
+                                    (parseInfo.getDistinctFuncExprForClause(dest) == null ? -1 : Integer.MAX_VALUE), -1, false),
+        new RowSchema(reduceSinkOutputRowResolver.getColumnInfos()),
+        inputOperatorInfo),
+      reduceSinkOutputRowResolver);
+  }
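
For orientation, a sketch of the KEY/VALUE layout the reduce sink built above produces for a query like SELECT key, count(DISTINCT value) FROM t GROUP BY key; the column names are illustrative.

    import java.util.*;

    // Illustrative model only, not Hive code.
    class ReduceSinkLayout {
      public static void main(String[] args) {
        // group-by expressions, then distinct parameters, become reduce keys
        List<String> reduceKeys = Arrays.asList("key", "value");
        // partial aggregation results travel as reduce values
        List<String> reduceValues = Arrays.asList("partial_count");
        for (int i = 0; i < reduceKeys.size(); i++)
          System.out.println("KEY." + i + " <- " + reduceKeys.get(i));
        for (int i = 0; i < reduceValues.size(); i++)
          System.out.println("VALUE." + i + " <- " + reduceValues.get(i));
      }
    }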
+
+  @SuppressWarnings("nls")
+  private Operator genGroupByPlanReduceSinkOperator(QB qb,
+      String dest, Operator inputOperatorInfo, int numPartitionFields) throws SemanticException {
+    RowResolver reduceSinkInputRowResolver = opParseCtx.get(inputOperatorInfo).getRR();
+    QBParseInfo parseInfo = qb.getParseInfo();
     RowResolver reduceSinkOutputRowResolver = new RowResolver();
     reduceSinkOutputRowResolver.setIsExprResolver(true);
     ArrayList<exprNodeDesc> reduceKeys = new ArrayList<exprNodeDesc>();
@@ -1038,13 +1500,12 @@
     List<CommonTree> grpByExprs = getGroupByForClause(parseInfo, dest);
     for (int i = 0; i < grpByExprs.size(); ++i) {
       CommonTree grpbyExpr = grpByExprs.get(i);
-      reduceKeys.add(genExprNodeDesc(grpbyExpr, parseInfo.getAlias(),
-          reduceSinkInputRowResolver));
+      reduceKeys.add(genExprNodeDesc(qb.getMetaData(), grpbyExpr, reduceSinkInputRowResolver));
       String text = grpbyExpr.toStringTree();
       if (reduceSinkOutputRowResolver.get("", text) == null) {
         reduceSinkOutputRowResolver.put("", text,
                                         new ColumnInfo(Utilities.ReduceField.KEY.toString() + "." + Integer.valueOf(reduceKeys.size() - 1).toString(),
-                                                       String.class)); // Everything is a string right now
+                                            reduceKeys.get(reduceKeys.size()-1).getTypeInfo()));
       } else {
         throw new SemanticException(ErrorMsg.DUPLICATE_GROUPBY_KEY.getMsg(grpbyExpr));
       }
@@ -1058,10 +1519,10 @@
         CommonTree parameter = (CommonTree) value.getChild(i);
         String text = parameter.toStringTree();
         if (reduceSinkOutputRowResolver.get("",text) == null) {
-          reduceKeys.add(genExprNodeDesc(parameter, parseInfo.getAlias(), reduceSinkInputRowResolver));
+          reduceKeys.add(genExprNodeDesc(qb.getMetaData(), parameter, reduceSinkInputRowResolver));
           reduceSinkOutputRowResolver.put("", text,
                                           new ColumnInfo(Utilities.ReduceField.KEY.toString() + "." + Integer.valueOf(reduceKeys.size() - 1).toString(),
-                                                         String.class)); // Everything is a string right now
+                                              reduceKeys.get(reduceKeys.size()-1).getTypeInfo()));
         }
       }
     }
@@ -1077,130 +1538,28 @@
         CommonTree parameter = (CommonTree) value.getChild(i);
         String text = parameter.toStringTree();
         if (reduceSinkOutputRowResolver.get("",text) == null) {
-          reduceValues.add(genExprNodeDesc(parameter, parseInfo.getAlias(), reduceSinkInputRowResolver));
+          reduceValues.add(genExprNodeDesc(qb.getMetaData(), parameter, reduceSinkInputRowResolver));
           reduceSinkOutputRowResolver.put("", text,
                                           new ColumnInfo(Utilities.ReduceField.VALUE.toString() + "." + Integer.valueOf(reduceValues.size() - 1).toString(),
-                                                         String.class)); // Everything is a string right now
+                                              reduceValues.get(reduceValues.size()-1).getTypeInfo()));
         }
       }
     }
 
-    return new OperatorInfo(
+    return putOpInsertMap(
       OperatorFactory.getAndMakeChild(PlanUtils.getReduceSinkDesc(reduceKeys, reduceValues, -1, numPartitionFields,
                                                                   -1, false),
                                         new RowSchema(reduceSinkOutputRowResolver.getColumnInfos()),
-                                        inputOperatorInfo.getOp()),
+                                        inputOperatorInfo),
         reduceSinkOutputRowResolver
     );
   }
 
   @SuppressWarnings("nls")
-  private OperatorInfo genGroupByPlanReduceSinkOperator(QBParseInfo parseInfo,
-     OperatorInfo input, CommonTree distinctText, TreeSet<String> ks)
-     throws SemanticException {
-    RowResolver inputRS = input.getRowResolver();
-    RowResolver outputRS = new RowResolver();
-    outputRS.setIsExprResolver(true);
-    ArrayList<exprNodeDesc> reduceKeys = new ArrayList<exprNodeDesc>();
-
-    // Spray on distinctText first
-    if (distinctText != null)
-    {
-    	reduceKeys.add(genExprNodeDesc(distinctText, parseInfo.getAlias(), inputRS));
-      String text = distinctText.toStringTree();
-      assert (outputRS.get("", text) == null);
-      outputRS.put("", text,
-        new ColumnInfo(Utilities.ReduceField.KEY.toString() + "." + Integer.valueOf(reduceKeys.size() - 1).toString(),
-                       String.class));
-    }
-    else {
-      // dummy key
-      reduceKeys.add(new exprNodeConstantDesc(0));
-    }
-
-    // copy the input row resolver
-    ArrayList<exprNodeDesc> reduceValues = new ArrayList<exprNodeDesc>();
-    Iterator<String> keysIter = inputRS.getTableNames().iterator();
-    while (keysIter.hasNext())
-    {
-      String key = keysIter.next();
-      HashMap<String, ColumnInfo> map = inputRS.getFieldMap(key);
-      Iterator<String> fNamesIter = map.keySet().iterator();
-      while (fNamesIter.hasNext())
-      {
-        String field = fNamesIter.next();
-        ColumnInfo valueInfo = inputRS.get(key, field);
-        
-        if (outputRS.get(key, field) == null) 
-        {
-        	reduceValues.add(new exprNodeColumnDesc(valueInfo.getType(), valueInfo.getInternalName()));
-          outputRS.put(key, field, new ColumnInfo(Utilities.ReduceField.VALUE.toString() + "." + Integer.valueOf(reduceValues.size() - 1).toString(), 
-                                                  valueInfo.getType()));
-        }
-      }
-    }
-    
-    for (String dest : ks) {
-      List<CommonTree> grpByExprs = getGroupByForClause(parseInfo, dest);
-
-      // send all the group by expressions
-      for (int i = 0; i < grpByExprs.size(); ++i) {
-        CommonTree grpbyExpr = grpByExprs.get(i);
-        String text = grpbyExpr.toStringTree();
-        if (outputRS.get("", text) == null) {
-          exprNodeDesc grpbyExprNode = genExprNodeDesc(grpbyExpr, parseInfo.getAlias(), inputRS);
-          reduceValues.add(grpbyExprNode);
-          outputRS.put("", text,
-                       new ColumnInfo(Utilities.ReduceField.VALUE.toString() + "." + Integer.valueOf(reduceValues.size() - 1).toString(),
-                                      grpbyExprNode.getTypeInfo())); 
-        }
-      }
-
-      // send all the aggregation expressions
-      HashMap<String, CommonTree> aggregationTrees = parseInfo.getAggregationExprsForClause(dest);
-      for (Map.Entry<String, CommonTree> entry : aggregationTrees.entrySet()) {
-        CommonTree value = entry.getValue();
-        // 0 is function name
-        for (int i = 1; i < value.getChildCount(); i++) {
-          CommonTree parameter = (CommonTree) value.getChild(i);
-          String text = parameter.toStringTree();
-          if (outputRS.get("",text) == null) {
-            exprNodeDesc pNode = genExprNodeDesc(parameter, parseInfo.getAlias(), inputRS);
-            reduceValues.add(pNode);
-            outputRS.put("", text,
-                         new ColumnInfo(Utilities.ReduceField.VALUE.toString() + "." + Integer.valueOf(reduceValues.size() - 1).toString(),
-                                        pNode.getTypeInfo()));
-          }
-        }
-      }
-    }
-
-    return new OperatorInfo(
-      OperatorFactory.getAndMakeChild(PlanUtils.getReduceSinkDesc(reduceKeys, reduceValues, 
-                                                                  -1, distinctText == null ? -1 : 1, -1, false),
-                                        new RowSchema(outputRS.getColumnInfos()), input.getOp()),
-        outputRS);
-  }
-
-  @SuppressWarnings("nls")
-  private OperatorInfo genGroupByPlanForwardOperator(QBParseInfo parseInfo, OperatorInfo input)
-      throws SemanticException {
-    RowResolver outputRS = input.getRowResolver();
-
-    Operator<? extends Serializable> forward = OperatorFactory.get(forwardDesc.class,
-        new RowSchema(outputRS.getColumnInfos()));
-    // set forward operator as child of each of input
-    List<Operator<? extends Serializable>> child = new ArrayList<Operator<? extends Serializable>>();
-    child.add(forward);
-    input.getOp().setChildOperators(child);
-
-    return new OperatorInfo(forward, outputRS);
-  }
-
-  @SuppressWarnings("nls")
-  private OperatorInfo genGroupByPlanReduceSinkOperator2MR(
-      QBParseInfo parseInfo, String dest, OperatorInfo groupByOperatorInfo,
-      int numPartitionFields) {
+  private Operator genGroupByPlanReduceSinkOperator2MR(
+      QBParseInfo parseInfo, String dest, Operator groupByOperatorInfo, int numPartitionFields)
+    throws SemanticException {
+    RowResolver reduceSinkInputRowResolver2 = opParseCtx.get(groupByOperatorInfo).getRR();
     RowResolver reduceSinkOutputRowResolver2 = new RowResolver();
     reduceSinkOutputRowResolver2.setIsExprResolver(true);
     ArrayList<exprNodeDesc> reduceKeys = new ArrayList<exprNodeDesc>();
@@ -1209,10 +1568,11 @@
     for (int i = 0; i < grpByExprs.size(); ++i) {
       CommonTree grpbyExpr = grpByExprs.get(i);
       String field = (Integer.valueOf(i)).toString();
-      reduceKeys.add(new exprNodeColumnDesc(TypeInfoFactory.getPrimitiveTypeInfo(String.class), field));
+      TypeInfo typeInfo = reduceSinkInputRowResolver2.get("", grpbyExpr.toStringTree()).getType();
+      reduceKeys.add(new exprNodeColumnDesc(typeInfo, field));
       reduceSinkOutputRowResolver2.put("", grpbyExpr.toStringTree(),
                                        new ColumnInfo(Utilities.ReduceField.KEY.toString() + "." + field,
-                                                      String.class)); // Everything is a string right now
+                                           typeInfo));
     }
     // Get partial aggregation results and store in reduceValues
     ArrayList<exprNodeDesc> reduceValues = new ArrayList<exprNodeDesc>();
@@ -1220,28 +1580,30 @@
     HashMap<String, CommonTree> aggregationTrees = parseInfo
         .getAggregationExprsForClause(dest);
     for (Map.Entry<String, CommonTree> entry : aggregationTrees.entrySet()) {
-      reduceValues.add(new exprNodeColumnDesc(TypeInfoFactory.getPrimitiveTypeInfo(String.class),
-                                              (Integer.valueOf(inputField)).toString()));
+      String field = (Integer.valueOf(inputField)).toString();
+      CommonTree t = entry.getValue();
+      TypeInfo typeInfo = reduceSinkInputRowResolver2.get("", t.toStringTree()).getType();
+      reduceValues.add(new exprNodeColumnDesc(typeInfo, field));
       inputField++;
-      reduceSinkOutputRowResolver2.put("", ((CommonTree)entry.getValue()).toStringTree(),
+      reduceSinkOutputRowResolver2.put("", t.toStringTree(),
                                        new ColumnInfo(Utilities.ReduceField.VALUE.toString() + "." + (Integer.valueOf(reduceValues.size()-1)).toString(),
-                                                      String.class)); // Everything is a string right now
+                                           typeInfo));
     }
 
-    return new OperatorInfo(
+    return putOpInsertMap(
       OperatorFactory.getAndMakeChild(PlanUtils.getReduceSinkDesc(reduceKeys, reduceValues, -1, 
                                                                   numPartitionFields, -1, true),
                                         new RowSchema(reduceSinkOutputRowResolver2.getColumnInfos()),
-                                        groupByOperatorInfo.getOp()),
+                                        groupByOperatorInfo),
         reduceSinkOutputRowResolver2
     );
   }
 
   @SuppressWarnings("nls")
-  private OperatorInfo genGroupByPlanGroupByOperator2MR(
-      QBParseInfo parseInfo, String dest, OperatorInfo reduceSinkOperatorInfo2)
+  private Operator genGroupByPlanGroupByOperator2MR(
+    QBParseInfo parseInfo, String dest, Operator reduceSinkOperatorInfo2, groupByDesc.Mode mode)
     throws SemanticException {
-    RowResolver groupByInputRowResolver2 = reduceSinkOperatorInfo2.getRowResolver();
+    RowResolver groupByInputRowResolver2 = opParseCtx.get(reduceSinkOperatorInfo2).getRR();
     RowResolver groupByOutputRowResolver2 = new RowResolver();
     groupByOutputRowResolver2.setIsExprResolver(true);
     ArrayList<exprNodeDesc> groupByKeys = new ArrayList<exprNodeDesc>();
@@ -1274,20 +1636,19 @@
       if (paraExprInfo == null) {
         throw new SemanticException(ErrorMsg.INVALID_COLUMN.getMsg(value));
       }
-
       String paraExpression = paraExprInfo.getInternalName();
       assert(paraExpression != null);
       aggParameters.add(new exprNodeColumnDesc(paraExprInfo.getType(), paraExpression));
-      aggregations.add(new aggregationDesc(aggClass, aggParameters, false));
+      aggregations.add(new aggregationDesc(aggClass, aggParameters, ((mode == groupByDesc.Mode.FINAL) ? false : (value.getToken().getType() == HiveParser.TOK_FUNCTIONDI))));
       groupByOutputRowResolver2.put("", value.toStringTree(),
                                     new ColumnInfo(Integer.valueOf(groupByKeys.size() + aggregations.size() - 1).toString(),
-                                                   paraExprInfo.getType())); // Everything is a string right now
+                                                   paraExprInfo.getType()));
     }
 
-    return new OperatorInfo(
-        OperatorFactory.getAndMakeChild(new groupByDesc(groupByDesc.Mode.PARTIAL2, groupByKeys, aggregations),
-                                        new RowSchema(groupByOutputRowResolver2.getColumnInfos()),
-                                        reduceSinkOperatorInfo2.getOp()),
+    return putOpInsertMap(
+      OperatorFactory.getAndMakeChild(new groupByDesc(mode, groupByKeys, aggregations),
+                                      new RowSchema(groupByOutputRowResolver2.getColumnInfos()),
+                                      reduceSinkOperatorInfo2),
         groupByOutputRowResolver2
     );
   }
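
The aggregationDesc change just above decides whether the evaluator must still de-duplicate its input: in FINAL mode the distinct was already applied by the earlier stage, otherwise it depends on whether the parse node is TOK_FUNCTIONDI. A tiny sketch of that decision, with a boolean standing in for the token-type check:

    class DistinctFlag {
      enum Mode { PARTIAL2, FINAL }

      // In FINAL mode duplicates were already removed upstream, so the
      // aggregation must not apply DISTINCT a second time.
      static boolean applyDistinct(Mode mode, boolean isDistinctFunction) {
        return mode != Mode.FINAL && isDistinctFunction;
      }

      public static void main(String[] args) {
        System.out.println(applyDistinct(Mode.FINAL, true));    // false
        System.out.println(applyDistinct(Mode.PARTIAL2, true)); // true
      }
    }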
@@ -1308,20 +1669,18 @@
    * @throws SemanticException
    */
   @SuppressWarnings({ "unused", "nls" })
-  private OperatorInfo genGroupByPlan1MR(String dest, QB qb,
-      OperatorInfo input) throws SemanticException {
+  private Operator genGroupByPlan1MR(String dest, QB qb,
+      Operator input) throws SemanticException {
 
-    OperatorInfo inputOperatorInfo = input;
     QBParseInfo parseInfo = qb.getParseInfo();
 
     // ////// 1. Generate ReduceSinkOperator
-    OperatorInfo reduceSinkOperatorInfo = genGroupByPlanReduceSinkOperator(
-        parseInfo, dest, inputOperatorInfo,
-        getGroupByForClause(parseInfo, dest).size());
+    Operator reduceSinkOperatorInfo = genGroupByPlanReduceSinkOperator(
+        qb, dest, input, getGroupByForClause(parseInfo, dest).size());
 
 
     // ////// 2. Generate GroupbyOperator
-    OperatorInfo groupByOperatorInfo = genGroupByPlanGroupByOperator(parseInfo,
+    Operator groupByOperatorInfo = genGroupByPlanGroupByOperator(parseInfo,
         dest, reduceSinkOperatorInfo, groupByDesc.Mode.COMPLETE);
 
     return groupByOperatorInfo;
@@ -1345,10 +1704,9 @@
    * @throws SemanticException
    */
   @SuppressWarnings("nls")
-  private OperatorInfo genGroupByPlan2MR(String dest, QB qb,
-      OperatorInfo input) throws SemanticException {
+  private Operator genGroupByPlan2MR(String dest, QB qb,
+      Operator input) throws SemanticException {
 
-    OperatorInfo inputOperatorInfo = input;
     QBParseInfo parseInfo = qb.getParseInfo();
 
     // ////// 1. Generate ReduceSinkOperator
@@ -1356,59 +1714,78 @@
     // reducers for load balancing problem.  That happens when there is no DISTINCT
     // operator.  We set the numPartitionColumns to -1 for this purpose.  This is 
     // captured by WritableComparableHiveObject.hashCode() function. 
-    OperatorInfo reduceSinkOperatorInfo = genGroupByPlanReduceSinkOperator(
-        parseInfo, dest, inputOperatorInfo, (parseInfo
-            .getDistinctFuncExprForClause(dest) == null ? -1
+    Operator reduceSinkOperatorInfo = genGroupByPlanReduceSinkOperator(
+      qb, dest, input, (parseInfo.getDistinctFuncExprForClause(dest) == null ? -1
             : Integer.MAX_VALUE));
 
     // ////// 2. Generate GroupbyOperator
-    OperatorInfo groupByOperatorInfo = genGroupByPlanGroupByOperator(parseInfo,
+    Operator groupByOperatorInfo = genGroupByPlanGroupByOperator(parseInfo,
         dest, reduceSinkOperatorInfo, groupByDesc.Mode.PARTIAL1);
 
     // ////// 3. Generate ReduceSinkOperator2
-    OperatorInfo reduceSinkOperatorInfo2 = genGroupByPlanReduceSinkOperator2MR(
+    Operator reduceSinkOperatorInfo2 = genGroupByPlanReduceSinkOperator2MR(
         parseInfo, dest, groupByOperatorInfo,
         getGroupByForClause(parseInfo, dest).size());
 
     // ////// 4. Generate GroupbyOperator2
-    OperatorInfo groupByOperatorInfo2 = genGroupByPlanGroupByOperator2MR(
-        parseInfo, dest, reduceSinkOperatorInfo2);
+    Operator groupByOperatorInfo2 = 
+      genGroupByPlanGroupByOperator2MR(parseInfo, dest, reduceSinkOperatorInfo2, groupByDesc.Mode.FINAL);
 
     return groupByOperatorInfo2;
   }
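
For orientation, a hedged sketch of the plan shapes that genGroupByPlan1MR and genGroupByPlan2MR wire up; the Mode names mirror groupByDesc.Mode, everything else is a simplification:

    import java.util.List;

    class GroupByPlans {
      enum Mode { HASH, PARTIAL1, PARTIAL2, COMPLETE, FINAL }

      // 1-MR plan: one reduce-sink sprayed by the group keys, one COMPLETE group-by.
      static List<String> plan1MR() {
        return List.of("ReduceSink(spray by group keys)",
                       "GroupBy(" + Mode.COMPLETE + ")");
      }

      // 2-MR plan: spray by the distinct key (or randomly, when there is no
      // DISTINCT), compute partial aggregates, re-spray by the group keys, finish.
      static List<String> plan2MR() {
        return List.of("ReduceSink(spray by distinct key / random)",
                       "GroupBy(" + Mode.PARTIAL1 + ")",
                       "ReduceSink2(spray by group keys)",
                       "GroupBy(" + Mode.FINAL + ")");
      }

      public static void main(String[] args) {
        System.out.println(plan1MR());
        System.out.println(plan2MR());
      }
    }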
 
+  private boolean optimizeMapAggrGroupBy(String dest, QB qb) {
+    List<CommonTree> grpByExprs = getGroupByForClause(qb.getParseInfo(), dest);
+    if ((grpByExprs != null) && !grpByExprs.isEmpty())
+      return false;
+
+    if (qb.getParseInfo().getDistinctFuncExprForClause(dest) != null)
+      return false;
+
+    return true;
+  }
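
The check above amounts to: with map-side hash aggregation, the second map-reduce job is only needed when there are grouping keys or a DISTINCT. A small illustrative sketch, not the real signatures:

    import java.util.List;

    class MapAggrCheck {
      static boolean singleJobSuffices(List<String> groupByExprs, boolean hasDistinct) {
        return (groupByExprs == null || groupByExprs.isEmpty()) && !hasDistinct;
      }

      public static void main(String[] args) {
        // e.g. SELECT count(1) FROM src: no keys, no distinct, one MR job is enough
        System.out.println(singleJobSuffices(List.of(), false));      // true
        // e.g. SELECT key, count(1) FROM src GROUP BY key: needs both jobs
        System.out.println(singleJobSuffices(List.of("key"), false)); // false
      }
    }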
+
   /**
-   * Generate a Group-By plan using 2 map-reduce jobs. The first map-reduce
-   * job has already been constructed. Evaluate partial aggregates first,
-   * followed by actual aggregates. The first map-reduce stage will be
-   * shared by all groupbys.
+   * Generate a Group-By plan using 2 map-reduce jobs. First perform a map-side
+   * partial aggregation (to reduce the amount of data). Then spray by the
+   * distinct key (or a random number) in the hope of getting a uniform
+   * distribution, and compute partial aggregates grouped by that distinct key.
+   * Evaluate partial aggregates first, followed by actual aggregates.
    */
   @SuppressWarnings("nls")
-  private OperatorInfo genGroupByPlan3MR(String dest, QB qb, 
-    OperatorInfo input) throws SemanticException {
+  private Operator genGroupByPlan4MR(String dest, QB qb, 
+    Operator inputOperatorInfo) throws SemanticException {
 
-    OperatorInfo inputOperatorInfo = input;
     QBParseInfo parseInfo = qb.getParseInfo();
 
-    // ////// Generate GroupbyOperator
-    OperatorInfo groupByOperatorInfo = genGroupByPlanGroupByOpForward(parseInfo,
-      dest, inputOperatorInfo, groupByDesc.Mode.PARTIAL1);
-
-    // //////  Generate ReduceSinkOperator2
-    OperatorInfo reduceSinkOperatorInfo2 = genGroupByPlanReduceSinkOperator2MR(
-      parseInfo, dest, groupByOperatorInfo,
-      getGroupByForClause(parseInfo, dest).size());
-
-    // ////// Generate GroupbyOperator2
-    OperatorInfo groupByOperatorInfo2 = genGroupByPlanGroupByOperator2MR(
-      parseInfo, dest, reduceSinkOperatorInfo2);
+    // ////// Generate GroupbyOperator for a map-side partial aggregation
+    Operator groupByOperatorInfo = genGroupByPlanMapGroupByOperator(qb,
+      dest, inputOperatorInfo, groupByDesc.Mode.HASH);
+
+    // ////// Generate ReduceSink Operator
+    Operator reduceSinkOperatorInfo = 
+      genGroupByPlanReduceSinkOperator(parseInfo, dest, groupByOperatorInfo);
+
+    // Optimize the scenario when there are no grouping keys and no distinct - 2 map-reduce jobs are not needed
+    if (!optimizeMapAggrGroupBy(dest, qb)) {
+      // ////// Generate GroupbyOperator for a partial aggregation
+      Operator groupByOperatorInfo2 = genGroupByPlanGroupByOperator1(parseInfo, dest, reduceSinkOperatorInfo, 
+                                                                         groupByDesc.Mode.PARTIAL2);
+      
+      // //////  Generate ReduceSinkOperator2
+      Operator reduceSinkOperatorInfo2 = genGroupByPlanReduceSinkOperator2MR(parseInfo, dest, groupByOperatorInfo2,
+                                                                                 getGroupByForClause(parseInfo, dest).size());
 
-    return groupByOperatorInfo2;
+      // ////// Generate GroupbyOperator3
+      return genGroupByPlanGroupByOperator2MR(parseInfo, dest, reduceSinkOperatorInfo2, groupByDesc.Mode.FINAL);
+    }
+    else
+      return genGroupByPlanGroupByOperator2MR(parseInfo, dest, reduceSinkOperatorInfo, groupByDesc.Mode.FINAL);
   }
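
Put together, genGroupByPlan4MR emits one of two chains depending on that check; sketched below with plain strings standing in for the operators:

    import java.util.ArrayList;
    import java.util.List;

    class MapAggrPlan {
      static List<String> plan(boolean skipSecondJob) {
        List<String> ops = new ArrayList<>(List.of("GroupBy(HASH, map side)",
                                                   "ReduceSink"));
        if (!skipSecondJob) {
          ops.add("GroupBy(PARTIAL2)");
          ops.add("ReduceSink2(spray by group keys)");
        }
        ops.add("GroupBy(FINAL)");
        return ops;
      }

      public static void main(String[] args) {
        System.out.println(plan(false)); // grouping keys or DISTINCT present
        System.out.println(plan(true));  // neither: the reduce side finishes directly
      }
    }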
 
   @SuppressWarnings("nls")
-  private OperatorInfo genConversionOps(String dest, QB qb,
-      OperatorInfo input) throws SemanticException {
+  private Operator genConversionOps(String dest, QB qb,
+      Operator input) throws SemanticException {
 
     Integer dest_type = qb.getMetaData().getDestTypeForAlias(dest);
     Table dest_tab = null;
@@ -1433,9 +1810,10 @@
   }
 
   @SuppressWarnings("nls")
-  private OperatorInfo genFileSinkPlan(String dest, QB qb,
-      OperatorInfo input) throws SemanticException {
+  private Operator genFileSinkPlan(String dest, QB qb,
+      Operator input) throws SemanticException {
 
+    RowResolver inputRR = opParseCtx.get(input).getRR();
     // Generate the destination file
     String queryTmpdir = this.scratchDir + File.separator + this.randomid + '.' + this.pathid + '.' + dest ;
     this.pathid ++;
@@ -1471,10 +1849,8 @@
       }
     case QBMetaData.DEST_LOCAL_FILE:
     case QBMetaData.DEST_DFS_FILE: {
-        table_desc = Utilities.defaultTd;
         dest_path = qb.getMetaData().getDestFileForAlias(dest);
         String cols = new String();
-        RowResolver inputRR = input.getRowResolver();
         Vector<ColumnInfo> colInfos = inputRR.getColumnInfos();
     
         boolean first = true;
@@ -1492,54 +1868,129 @@
         
         this.loadFileWork.add(new loadFileDesc(queryTmpdir, dest_path,
                                           (dest_type.intValue() == QBMetaData.DEST_DFS_FILE), cols));
+        table_desc = PlanUtils.getDefaultTableDesc(Integer.toString(Utilities.ctrlaCode),
+            cols);
         break;
     }
     default:
       throw new SemanticException("Unknown destination type: " + dest_type);
     }
 
-    OperatorInfo output = (OperatorInfo)input.clone();
-    output.setOp(
+    input = genConversionSelectOperator(dest, qb, input, table_desc);
+
+    Operator output = putOpInsertMap(
       OperatorFactory.getAndMakeChild(
         new fileSinkDesc(queryTmpdir, table_desc),
-        new RowSchema(output.getRowResolver().getColumnInfos()), input.getOp()
-      )
-    );
+        new RowSchema(inputRR.getColumnInfos()), input), inputRR);
 
     LOG.debug("Created FileSink Plan for clause: " + dest + "dest_path: "
         + dest_path + " row schema: "
-        + output.getRowResolver().toString());
+        + inputRR.toString());
     return output;
   }
 
+  /**
+   * Generate the conversion SelectOperator that converts the columns into 
+   * the types that are expected by the table_desc.
+   */
+  Operator genConversionSelectOperator(String dest, QB qb,
+      Operator input, tableDesc table_desc) throws SemanticException {
+    StructObjectInspector oi = null;
+    try {
+      Deserializer deserializer = table_desc.getDeserializerClass().newInstance();
+      deserializer.initialize(null, table_desc.getProperties());
+      oi = (StructObjectInspector) deserializer.getObjectInspector();
+    } catch (Exception e) {
+      throw new SemanticException(e);
+    }
+    
+    // Check column number
+    List<? extends StructField> tableFields = oi.getAllStructFieldRefs();
+    Vector<ColumnInfo> rowFields = opParseCtx.get(input).getRR().getColumnInfos();
+    if (tableFields.size() != rowFields.size()) {
+      String reason = "Table " + dest + " has " + tableFields.size() + " columns but query has "
+          + rowFields + ".";
+      throw new SemanticException(ErrorMsg.TARGET_TABLE_COLUMN_MISMATCH.getMsg(
+          qb.getParseInfo().getDestForClause(dest), reason));
+    }
+    
+    // Check column types
+    boolean converted = false;
+    int columnNumber = tableFields.size();
+    ArrayList<exprNodeDesc> expressions = new ArrayList<exprNodeDesc>(columnNumber);
+    // MetadataTypedColumnsetSerDe does not need type conversions because it does
+    // the conversion to String by itself.
+    if (! table_desc.getDeserializerClass().equals(MetadataTypedColumnsetSerDe.class)) { 
+      for (int i=0; i<columnNumber; i++) {
+        ObjectInspector tableFieldOI = tableFields.get(i).getFieldObjectInspector();
+        TypeInfo tableFieldTypeInfo = TypeInfoUtils.getTypeInfoFromObjectInspector(tableFieldOI);
+        TypeInfo rowFieldTypeInfo = rowFields.get(i).getType();
+        exprNodeDesc column = new exprNodeColumnDesc(rowFieldTypeInfo, Integer.valueOf(i).toString());
+        if (! tableFieldTypeInfo.equals(rowFieldTypeInfo)) {
+          // need to do some conversions here
+          converted = true;
+          if (tableFieldTypeInfo.getCategory() != Category.PRIMITIVE) {
+            // cannot convert to complex types
+            column = null; 
+          } else {
+            column = getFuncExprNodeDesc(tableFieldTypeInfo.getPrimitiveClass().getName(), column);
+          }
+          if (column == null) {
+            String reason = "Cannot convert column " + i + " from " + rowFieldTypeInfo + " to " 
+                + tableFieldTypeInfo + ".";
+            throw new SemanticException(ErrorMsg.TARGET_TABLE_COLUMN_MISMATCH.getMsg(
+                qb.getParseInfo().getDestForClause(dest), reason));
+          }
+        }
+        expressions.add(column);
+      }
+    }
+    
+    if (converted) {
+      // add the select operator
+      RowResolver rowResolver = new RowResolver();
+      for (int i=0; i<expressions.size(); i++) {
+        String name = Integer.valueOf(i).toString();
+        rowResolver.put("", name, new ColumnInfo(name, expressions.get(i).getTypeInfo()));
+      }
+      Operator output = putOpInsertMap(OperatorFactory.getAndMakeChild(
+        new selectDesc(expressions), new RowSchema(rowResolver.getColumnInfos()), input), rowResolver);
+
+      return output;
+    } else {
+      // not converted
+      return input;
+    }
+  }
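
The conversion pass above can be illustrated with a self-contained sketch; the cast(...) strings stand in for the UDF expressions getFuncExprNodeDesc would build, and type names are plain strings rather than TypeInfo objects:

    import java.util.ArrayList;
    import java.util.List;

    class ConversionSelect {
      // Compare each destination column type with the query column type and wrap
      // a cast where they differ; mismatched counts are rejected up front.
      static List<String> convert(List<String> rowTypes, List<String> tableTypes) {
        if (rowTypes.size() != tableTypes.size())
          throw new IllegalArgumentException("column count mismatch");
        List<String> expressions = new ArrayList<>();
        for (int i = 0; i < tableTypes.size(); i++) {
          String column = "col" + i;
          if (!tableTypes.get(i).equals(rowTypes.get(i)))
            column = "cast(" + column + " as " + tableTypes.get(i) + ")";
          expressions.add(column);
        }
        return expressions;
      }

      public static void main(String[] args) {
        // [col0, cast(col1 as double)]
        System.out.println(convert(List.of("string", "int"),
                                   List.of("string", "double")));
      }
    }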
+
   @SuppressWarnings("nls")
-  private OperatorInfo genLimitPlan(String dest, QB qb, OperatorInfo input, int limit) throws SemanticException {
+  private Operator genLimitPlan(String dest, QB qb, Operator input, int limit) throws SemanticException {
     // A map-only job can be optimized - instead of converting it to a map-reduce job, we can have another map
     // job to do the same to avoid the cost of sorting in the map-reduce phase. A better approach would be to
     // write into a local file and then have a map-only job.
     // Add the limit operator to get the value fields
 
-    OperatorInfo limitMap = (OperatorInfo)input.clone();
-    limitMap.setOp(
-      OperatorFactory.getAndMakeChild(
-        new limitDesc(limit), new RowSchema(limitMap.getRowResolver().getColumnInfos()),
-        input.getOp()
-      )
-    );
+    RowResolver inputRR = opParseCtx.get(input).getRR();
+    Operator limitMap =
+      putOpInsertMap(OperatorFactory.getAndMakeChild(
+                       new limitDesc(limit), new RowSchema(inputRR.getColumnInfos()), input),
+                       inputRR);
 
     LOG.debug("Created LimitOperator Plan for clause: " + dest + " row schema: "
-        + limitMap.getRowResolver().toString());
+        + inputRR.toString());
 
     return limitMap;
   }
 
   @SuppressWarnings("nls")
-  private OperatorInfo genLimitMapRedPlan(String dest, QB qb, OperatorInfo input, int limit, boolean isOuterQuery) throws SemanticException {
+  private Operator genLimitMapRedPlan(String dest, QB qb, Operator input, int limit, boolean isOuterQuery) throws SemanticException {
     // A map-only job can be optimized - instead of converting it to a map-reduce job, we can have another map
     // job to do the same to avoid the cost of sorting in the map-reduce phase. A better approach would be to
     // write into a local file and then have a map-only job.
     // Add the limit operator to get the value fields
-    OperatorInfo curr = genLimitPlan(dest, qb, input, limit);
+    Operator curr = genLimitPlan(dest, qb, input, limit);
 
     if (isOuterQuery)
       return curr;
@@ -1550,20 +2001,21 @@
   }
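
The middle of genLimitMapRedPlan sits behind the hunk boundary above, but the shape of the two-stage LIMIT is visible from the comments: a map-side limit trims each task's output, and, for non-outermost queries, a single-reducer pass is assumed here to enforce the global limit. A sketch under that assumption, with strings standing in for operators:

    class LimitPlan {
      // Assumed two-stage shape: Limit on the map side, then (for non-outermost
      // queries) a single-reducer ReduceSink followed by a final Limit.
      static String plan(String input, int n, boolean isOuterQuery) {
        String mapLimit = "Limit(" + n + ", " + input + ")";
        if (isOuterQuery)
          return mapLimit;
        return "Limit(" + n + ", ReduceSink(numReducers=1, " + mapLimit + "))";
      }

      public static void main(String[] args) {
        System.out.println(plan("TableScan(src)", 10, false));
      }
    }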
 
   @SuppressWarnings("nls")
-  private OperatorInfo genReduceSinkPlan(String dest, QB qb,
-                                         OperatorInfo input, int numReducers) throws SemanticException {
+  private Operator genReduceSinkPlan(String dest, QB qb,
+                                     Operator input, int numReducers) throws SemanticException {
 
     // First generate the expression for the key
     // The cluster by clause has the aliases for the keys
     ArrayList<exprNodeDesc> keyCols = new ArrayList<exprNodeDesc>();
-
+    RowResolver inputRR = opParseCtx.get(input).getRR();
+    
     CommonTree clby = qb.getParseInfo().getClusterByForClause(dest);
     if (clby != null) {
       int ccount = clby.getChildCount();
       for(int i=0; i<ccount; ++i) {
         CommonTree cl = (CommonTree)clby.getChild(i);
-        ColumnInfo colInfo = input.getRowResolver().get(qb.getParseInfo().getAlias(),
-                                                        cl.getText());
+        ColumnInfo colInfo = inputRR.get(qb.getParseInfo().getAlias(),
+                                         unescapeIdentifier(cl.getText()));
         if (colInfo == null) {
           throw new SemanticException(ErrorMsg.INVALID_COLUMN.getMsg(cl));
         }
@@ -1576,22 +2028,19 @@
 
     // For the generation of the values expression just get the inputs
     // signature and generate field expressions for those
-    for(ColumnInfo colInfo: input.getRowResolver().getColumnInfos()) {
+    for(ColumnInfo colInfo: inputRR.getColumnInfos()) {
       valueCols.add(new exprNodeColumnDesc(colInfo.getType(), colInfo.getInternalName()));
     }
 
-    OperatorInfo interim = (OperatorInfo)input.clone();
-    interim.setOp(
+    Operator interim = putOpInsertMap(
       OperatorFactory.getAndMakeChild(
         PlanUtils.getReduceSinkDesc(keyCols, valueCols, -1, keyCols.size(), numReducers, false),
-        new RowSchema(interim.getRowResolver().getColumnInfos()),
-        input.getOp()
-      )
-    );
+        new RowSchema(inputRR.getColumnInfos()),
+        input), inputRR);
 
     // Add the extract operator to get the value fields
     RowResolver out_rwsch = new RowResolver();
-    RowResolver interim_rwsch = interim.getRowResolver();
+    RowResolver interim_rwsch = inputRR;
     Integer pos = Integer.valueOf(0);
     for(ColumnInfo colInfo: interim_rwsch.getColumnInfos()) {
       String [] info = interim_rwsch.reverseLookup(colInfo.getInternalName());
@@ -1600,23 +2049,19 @@
       pos = Integer.valueOf(pos.intValue() + 1);
     }
 
-    OperatorInfo output = (OperatorInfo)interim.clone();
-    output.setOp(
+    Operator output = putOpInsertMap(
       OperatorFactory.getAndMakeChild(
         new extractDesc(new exprNodeColumnDesc(String.class, Utilities.ReduceField.VALUE.toString())),
         new RowSchema(out_rwsch.getColumnInfos()),
-        interim.getOp()
-      )
-    );
-    output.setRowResolver(out_rwsch);
+        interim), out_rwsch);
 
     LOG.debug("Created ReduceSink Plan for clause: " + dest + " row schema: "
-        + output.getRowResolver().toString());
+        + out_rwsch.toString());
     return output;
   }
 
-  private OperatorInfo genJoinOperatorChildren(QBJoinTree join, OperatorInfo left,
-      OperatorInfo[] right) {
+  private Operator genJoinOperatorChildren(QBJoinTree join, Operator left, Operator[] right) 
+    throws SemanticException {
     RowResolver outputRS = new RowResolver();
     // all children are base classes
     Operator<?>[] rightOps = new Operator[right.length];
@@ -1625,13 +2070,13 @@
 
     HashMap<Byte, ArrayList<exprNodeDesc>> exprMap = new HashMap<Byte, ArrayList<exprNodeDesc>>();
 
-    for (OperatorInfo input : right)
+    for (Operator input : right)
     {
       ArrayList<exprNodeDesc> keyDesc = new ArrayList<exprNodeDesc>();
       if (input == null)
         input = left;
-      Byte tag = Byte.valueOf((byte)(((reduceSinkDesc)(input.getOp().getConf())).getTag()));
-      RowResolver inputRS = input.getRowResolver();
+      Byte tag = Byte.valueOf((byte)(((reduceSinkDesc)(input.getConf())).getTag()));
+      RowResolver inputRS = opParseCtx.get(input).getRR();
       Iterator<String> keysIter = inputRS.getTableNames().iterator();
       while (keysIter.hasNext())
       {
@@ -1650,7 +2095,7 @@
       }
 
       exprMap.put(tag, keyDesc);
-      rightOps[pos++] = input.getOp();
+      rightOps[pos++] = input;
     }
 
     org.apache.hadoop.hive.ql.plan.joinCond[] joinCondns = new org.apache.hadoop.hive.ql.plan.joinCond[join.getJoinCond().length];
@@ -1659,14 +2104,15 @@
       joinCondns[i] = new org.apache.hadoop.hive.ql.plan.joinCond(condn);
     }
 
-    return new OperatorInfo(OperatorFactory.getAndMakeChild(new joinDesc(exprMap, joinCondns),
-      new RowSchema(outputRS.getColumnInfos()), rightOps), outputRS);
+    return putOpInsertMap(
+      OperatorFactory.getAndMakeChild(new joinDesc(exprMap, joinCondns),
+                                      new RowSchema(outputRS.getColumnInfos()), rightOps), outputRS);
   }
 
   @SuppressWarnings("nls")
-  private OperatorInfo genJoinReduceSinkChild(QB qb, QBJoinTree joinTree,
-      OperatorInfo child, String srcName, int pos) throws SemanticException {
-    RowResolver inputRS = child.getRowResolver();
+  private Operator genJoinReduceSinkChild(QB qb, QBJoinTree joinTree,
+      Operator child, String srcName, int pos) throws SemanticException {
+    RowResolver inputRS = opParseCtx.get(child).getRR();
     RowResolver outputRS = new RowResolver();
     ArrayList<exprNodeDesc> reduceKeys = new ArrayList<exprNodeDesc>();
 
@@ -1674,7 +2120,7 @@
     Vector<CommonTree> exprs = joinTree.getExpressions().get(pos);
     for (int i = 0; i < exprs.size(); i++) {
       CommonTree expr = exprs.get(i);
-      reduceKeys.add(genExprNodeDesc(expr, srcName, inputRS));
+      reduceKeys.add(genExprNodeDesc(qb.getMetaData(), expr, inputRS));
     }
 
     // Walk over the input row resolver and copy in the output
@@ -1696,28 +2142,32 @@
       }
     }
 
-    return new OperatorInfo(
+    return putOpInsertMap(
       OperatorFactory.getAndMakeChild(
         PlanUtils.getReduceSinkDesc(reduceKeys, reduceValues, joinTree.getNextTag(), reduceKeys.size(), -1, false), 
         new RowSchema(outputRS.getColumnInfos()),
-        child.getOp()), outputRS);
+        child), outputRS);
   }
 
-  private OperatorInfo genJoinOperator(QB qb, QBJoinTree joinTree,
-      HashMap<String, OperatorInfo> map) throws SemanticException {
+  private Operator genJoinOperator(QB qb, QBJoinTree joinTree,
+      HashMap<String, Operator> map) throws SemanticException {
     QBJoinTree leftChild = joinTree.getJoinSrc();
-    OperatorInfo joinSrcOp = null;
+    Operator joinSrcOp = null;
     if (leftChild != null)
     {
-      OperatorInfo joinOp = genJoinOperator(qb, leftChild, map);
+      Operator joinOp = genJoinOperator(qb, leftChild, map);
+      Vector<CommonTree> filter = joinTree.getFilters().get(0);
+      for (CommonTree cond: filter) 
+        joinOp = genFilterPlan(qb, cond, joinOp);
+      
       joinSrcOp = genJoinReduceSinkChild(qb, joinTree, joinOp, null, 0);
     }
 
-    OperatorInfo[] srcOps = new OperatorInfo[joinTree.getBaseSrc().length];
+    Operator[] srcOps = new Operator[joinTree.getBaseSrc().length];
     int pos = 0;
     for (String src : joinTree.getBaseSrc()) {
       if (src != null) {
-        OperatorInfo srcOp = map.get(src);
+        Operator srcOp = map.get(src);
         srcOps[pos] = genJoinReduceSinkChild(qb, joinTree, srcOp, src, pos);
         pos++;
       } else {
@@ -1732,13 +2182,13 @@
     return genJoinOperatorChildren(joinTree, joinSrcOp, srcOps);
   }
 
-  private void genJoinOperatorTypeCheck(OperatorInfo left, OperatorInfo[] right) throws SemanticException {
+  private void genJoinOperatorTypeCheck(Operator left, Operator[] right) throws SemanticException {
     // keys[i] -> ArrayList<exprNodeDesc> for the i-th join operator key list 
     ArrayList<ArrayList<exprNodeDesc>> keys = new ArrayList<ArrayList<exprNodeDesc>>();
     int keyLength = 0;
     for (int i=0; i<right.length; i++) {
-      OperatorInfo oi = (i==0 && right[i] == null ? left : right[i]);
-      reduceSinkDesc now = ((ReduceSinkOperator)(oi.getOp())).getConf();
+      Operator oi = (i==0 && right[i] == null ? left : right[i]);
+      reduceSinkDesc now = ((ReduceSinkOperator)(oi)).getConf();
       if (i == 0) {
         keyLength = now.getKeyCols().size();
       } else {
@@ -1765,15 +2215,45 @@
         }
       }
     }
+    // regenerate keySerializationInfo because the ReduceSinkOperator's 
+    // output key types might have changed.
+    for (int i=0; i<right.length; i++) {
+      Operator oi = (i==0 && right[i] == null ? left : right[i]);
+      reduceSinkDesc now = ((ReduceSinkOperator)(oi)).getConf();
+      now.setKeySerializeInfo(PlanUtils.getBinarySortableTableDesc(
+          PlanUtils.getFieldSchemasFromColumnList(now.getKeyCols(), "joinkey")));
+    }
   }
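
A hedged sketch of why the key serialization info is regenerated above: the type check may rewrite join keys on either side to a common type, leaving the previously computed key schema stale. The widening rule below is an assumption for illustration only; the real code consults Hive's type and UDF machinery:

    import java.util.List;

    class JoinKeyTypes {
      // Assumed widening rule for the sketch only.
      static String commonType(String a, String b) {
        return a.equals(b) ? a : "double";
      }

      public static void main(String[] args) {
        List<String> leftKeys  = List.of("int");
        List<String> rightKeys = List.of("bigint");
        for (int i = 0; i < leftKeys.size(); i++) {
          String t = commonType(leftKeys.get(i), rightKeys.get(i));
          System.out.println("join key " + i + " converted to " + t
              + "; key serialization info rebuilt");
        }
      }
    }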
   
-  private OperatorInfo genJoinPlan(QB qb, HashMap<String, OperatorInfo> map)
+  private Operator genJoinPlan(QB qb, HashMap<String, Operator> map)
       throws SemanticException {
     QBJoinTree joinTree = qb.getQbJoinTree();
-    OperatorInfo joinOp = genJoinOperator(qb, joinTree, map);
+    Operator joinOp = genJoinOperator(qb, joinTree, map);
     return joinOp;
   }
 
+  /**
+   * Extract the filters from the join condition and push them on top of the
+   * source operators. This procedure traverses the query tree recursively.
+   */
+  private void pushJoinFilters(QB qb, QBJoinTree joinTree, HashMap<String, Operator> map) throws SemanticException {
+    Vector<Vector<CommonTree>> filters = joinTree.getFilters();
+    if (joinTree.getJoinSrc() != null)
+      pushJoinFilters(qb, joinTree.getJoinSrc(), map);
+
+    int pos = 0;
+    for (String src : joinTree.getBaseSrc()) {
+      if (src != null) {
+        Operator srcOp = map.get(src);
+        Vector<CommonTree> filter = filters.get(pos);
+        for (CommonTree cond: filter) 
+          srcOp = genFilterPlan(qb, cond, srcOp);
+        map.put(src, srcOp);
+      }
+      pos++;
+    }
+  }
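
A minimal sketch of what pushJoinFilters does, with strings standing in for operators: every filter expression attached to a join source is stacked as a FilterOperator directly on that source before the join consumes it.

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    class PushJoinFilters {
      static void push(Map<String, String> srcOps, Map<String, List<String>> filters) {
        for (Map.Entry<String, List<String>> e : filters.entrySet()) {
          String op = srcOps.get(e.getKey());
          for (String cond : e.getValue())
            op = "Filter(" + cond + ", " + op + ")"; // stack a filter on the source
          srcOps.put(e.getKey(), op);
        }
      }

      public static void main(String[] args) {
        Map<String, String> srcOps =
            new HashMap<>(Map.of("a", "TableScan(a)", "b", "TableScan(b)"));
        push(srcOps, Map.of("a", List.of("a.ds = '2008-04-08'")));
        System.out.println(srcOps);
      }
    }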
+  
   private QBJoinTree genJoinTree(CommonTree joinParseTree)
       throws SemanticException {
     QBJoinTree joinTree = new QBJoinTree();
@@ -1807,9 +2287,9 @@
 
     if ((left.getToken().getType() == HiveParser.TOK_TABREF)
         || (left.getToken().getType() == HiveParser.TOK_SUBQUERY)) {
-      String table_name = left.getChild(0).getText();
-      String alias = left.getChildCount() == 1 ? table_name : left.getChild(1)
-          .getText();
+      String table_name = unescapeIdentifier(left.getChild(0).getText());
+      String alias = left.getChildCount() == 1 ? table_name : 
+        unescapeIdentifier(left.getChild(1).getText().toLowerCase());
       joinTree.setLeftAlias(alias);
       String[] leftAliases = new String[1];
       leftAliases[0] = alias;
@@ -1832,9 +2312,9 @@
 
     if ((right.getToken().getType() == HiveParser.TOK_TABREF)
         || (right.getToken().getType() == HiveParser.TOK_SUBQUERY)) {
-      String table_name = right.getChild(0).getText();
-      String alias = right.getChildCount() == 1 ? table_name : right.getChild(1)
-          .getText();

[... 757 lines stripped ...]