You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by br...@apache.org on 2014/10/30 17:22:48 UTC

svn commit: r1635536 [12/28] - in /hive/branches/spark: ./ accumulo-handler/ accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/columns/ accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/columns/ accumulo-handler/src/test/org/apache/hado...

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java Thu Oct 30 16:22:33 2014
@@ -21,6 +21,7 @@ package org.apache.hadoop.hive.ql.optimi
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedHashMap;
@@ -28,6 +29,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.Stack;
+import java.util.TreeMap;
 import java.util.regex.Pattern;
 
 import org.apache.commons.logging.Log;
@@ -67,7 +69,6 @@ import org.apache.hadoop.hive.ql.plan.Ex
 import org.apache.hadoop.hive.ql.plan.GroupByDesc;
 import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
 import org.apache.hadoop.hive.ql.plan.MapWork;
-import org.apache.hadoop.hive.ql.plan.MapredWork;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.PartitionDesc;
 import org.apache.hadoop.hive.ql.plan.ReduceWork;
@@ -121,7 +122,11 @@ import org.apache.hadoop.hive.serde2.Des
 import org.apache.hadoop.hive.serde2.SerDe;
 import org.apache.hadoop.hive.serde2.SerDeUtils;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
 import org.apache.hadoop.util.ReflectionUtils;
 
 public class Vectorizer implements PhysicalPlanResolver {
@@ -282,13 +287,13 @@ public class Vectorizer implements Physi
 
     private PhysicalContext pctx;
 
-    private int keyColCount;
-    private int valueColCount;
+    private List<String> reduceColumnNames;
+    private List<TypeInfo> reduceTypeInfos;
 
     public VectorizationDispatcher(PhysicalContext pctx) {
       this.pctx = pctx;
-      keyColCount = 0;
-      valueColCount = 0;
+      reduceColumnNames = null;
+      reduceTypeInfos = null;
     }
 
     @Override
@@ -385,14 +390,13 @@ public class Vectorizer implements Physi
       HashMap<Node, Object> nodeOutput = new HashMap<Node, Object>();
       ogw.startWalking(topNodes, nodeOutput);
 
-      Map<String, Map<Integer, String>> columnVectorTypes = vnp.getScratchColumnVectorTypes();
-      mapWork.setScratchColumnVectorTypes(columnVectorTypes);
-      Map<String, Map<String, Integer>> columnMap = vnp.getScratchColumnMap();
-      mapWork.setScratchColumnMap(columnMap);
+      Map<String, Map<Integer, String>> allScratchColumnVectorTypeMaps = vnp.getAllScratchColumnVectorTypeMaps();
+      mapWork.setAllScratchColumnVectorTypeMaps(allScratchColumnVectorTypeMaps);
+      Map<String, Map<String, Integer>> allColumnVectorMaps = vnp.getAllColumnVectorMaps();
+      mapWork.setAllColumnVectorMaps(allColumnVectorMaps);
 
       if (LOG.isDebugEnabled()) {
-        LOG.debug(String.format("vectorTypes: %s", columnVectorTypes.toString()));
-        LOG.debug(String.format("columnMap: %s", columnMap.toString()));
+        debugDisplayAllMaps(allColumnVectorMaps, allScratchColumnVectorTypeMaps);
       }
 
       return;
@@ -413,7 +417,7 @@ public class Vectorizer implements Physi
           return false;
         }
         StructObjectInspector keyStructObjectInspector = (StructObjectInspector)keyObjectInspector;
-        keyColCount = keyStructObjectInspector.getAllStructFieldRefs().size();
+        List<? extends StructField> keyFields = keyStructObjectInspector.getAllStructFieldRefs();
 
         // Tez doesn't use tagging...
         if (reduceWork.getNeedsTagging()) {
@@ -426,9 +430,20 @@ public class Vectorizer implements Physi
                 !(valueObjectInspector instanceof StructObjectInspector)) {
           return false;
         }
-        StructObjectInspector valueStructObjectInspector =
-                (StructObjectInspector)valueObjectInspector;
-        valueColCount = valueStructObjectInspector.getAllStructFieldRefs().size();
+        StructObjectInspector valueStructObjectInspector = (StructObjectInspector)valueObjectInspector;
+        List<? extends StructField> valueFields = valueStructObjectInspector.getAllStructFieldRefs();
+
+        reduceColumnNames = new ArrayList<String>();
+        reduceTypeInfos = new ArrayList<TypeInfo>();
+
+        for (StructField field: keyFields) {
+          reduceColumnNames.add(Utilities.ReduceField.KEY.toString() + "." + field.getFieldName());
+          reduceTypeInfos.add(TypeInfoUtils.getTypeInfoFromTypeString(field.getFieldObjectInspector().getTypeName()));
+        }
+        for (StructField field: valueFields) {
+          reduceColumnNames.add(Utilities.ReduceField.VALUE.toString() + "." + field.getFieldName());
+          reduceTypeInfos.add(TypeInfoUtils.getTypeInfoFromTypeString(field.getFieldObjectInspector().getTypeName()));
+        }
       } catch (Exception e) {
         throw new SemanticException(e);
       }
@@ -478,7 +493,7 @@ public class Vectorizer implements Physi
       // VectorizationContext...  Do we use PreOrderWalker instead of DefaultGraphWalker.
       Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
       ReduceWorkVectorizationNodeProcessor vnp =
-              new ReduceWorkVectorizationNodeProcessor(reduceWork, keyColCount, valueColCount);
+              new ReduceWorkVectorizationNodeProcessor(reduceColumnNames);
       addReduceWorkRules(opRules, vnp);
       Dispatcher disp = new DefaultRuleDispatcher(vnp, opRules, null);
       GraphWalker ogw = new PreOrderWalker(disp);
@@ -495,18 +510,17 @@ public class Vectorizer implements Physi
 
       Operator<? extends OperatorDesc> reducer = reduceWork.getReducer();
       if (reducer.getType().equals(OperatorType.EXTRACT)) {
-        ((VectorExtractOperator)reducer).setKeyAndValueColCounts(keyColCount, valueColCount);
+        ((VectorExtractOperator)reducer).setReduceTypeInfos(reduceTypeInfos);
       }
 
-      Map<String, Map<Integer, String>> columnVectorTypes = vnp.getScratchColumnVectorTypes();
-      reduceWork.setScratchColumnVectorTypes(columnVectorTypes);
-      Map<String, Map<String, Integer>> columnMap = vnp.getScratchColumnMap();
-      reduceWork.setScratchColumnMap(columnMap);
+      Map<String, Map<Integer, String>> allScratchColumnVectorTypeMaps = vnp.getAllScratchColumnVectorTypeMaps();
+      reduceWork.setAllScratchColumnVectorTypeMaps(allScratchColumnVectorTypeMaps);
+      Map<String, Map<String, Integer>> allColumnVectorMaps = vnp.getAllColumnVectorMaps();
+      reduceWork.setAllColumnVectorMaps(allColumnVectorMaps);
 
 
       if (LOG.isDebugEnabled()) {
-        LOG.debug(String.format("vectorTypes: %s", columnVectorTypes.toString()));
-        LOG.debug(String.format("columnMap: %s", columnMap.toString()));
+        debugDisplayAllMaps(allColumnVectorMaps, allScratchColumnVectorTypeMaps);
       }
     }
   }
@@ -571,26 +585,26 @@ public class Vectorizer implements Physi
     protected final Set<Operator<? extends OperatorDesc>> opsDone =
         new HashSet<Operator<? extends OperatorDesc>>();
 
-    public Map<String, Map<Integer, String>> getScratchColumnVectorTypes() {
-      Map<String, Map<Integer, String>> scratchColumnVectorTypes =
+    public Map<String, Map<Integer, String>> getAllScratchColumnVectorTypeMaps() {
+      Map<String, Map<Integer, String>> allScratchColumnVectorTypeMaps =
           new HashMap<String, Map<Integer, String>>();
       for (String onefile : scratchColumnContext.keySet()) {
         VectorizationContext vc = scratchColumnContext.get(onefile);
-        Map<Integer, String> cmap = vc.getOutputColumnTypeMap();
-        scratchColumnVectorTypes.put(onefile, cmap);
+        Map<Integer, String> cmap = vc.getScratchColumnTypeMap();
+        allScratchColumnVectorTypeMaps.put(onefile, cmap);
       }
-      return scratchColumnVectorTypes;
+      return allScratchColumnVectorTypeMaps;
     }
 
-    public Map<String, Map<String, Integer>> getScratchColumnMap() {
-      Map<String, Map<String, Integer>> scratchColumnMap =
+    public Map<String, Map<String, Integer>> getAllColumnVectorMaps() {
+      Map<String, Map<String, Integer>> allColumnVectorMaps =
           new HashMap<String, Map<String, Integer>>();
       for(String oneFile: scratchColumnContext.keySet()) {
         VectorizationContext vc = scratchColumnContext.get(oneFile);
-        Map<String, Integer> cmap = vc.getColumnMap();
-        scratchColumnMap.put(oneFile, cmap);
+        Map<String, Integer> cmap = vc.getProjectionColumnMap();
+        allColumnVectorMaps.put(oneFile, cmap);
       }
-      return scratchColumnMap;
+      return allColumnVectorMaps;
     }
 
     public VectorizationContext walkStackToFindVectorizationContext(Stack<Node> stack,
@@ -676,10 +690,7 @@ public class Vectorizer implements Physi
               vContext.setFileKey(onefile);
               scratchColumnContext.put(onefile, vContext);
               if (LOG.isDebugEnabled()) {
-                LOG.debug("Vectorized MapWork operator " + op.getName() +
-                        " with vectorization context key=" + vContext.getFileKey() +
-                        ", vectorTypes: " + vContext.getOutputColumnTypeMap().toString() +
-                        ", columnMap: " + vContext.getColumnMap().toString());
+                LOG.debug("Vectorized MapWork operator " + op.getName() + " vectorization context " + vContext.toString());
               }
               break;
             }
@@ -710,17 +721,11 @@ public class Vectorizer implements Physi
       Operator<? extends OperatorDesc> vectorOp = doVectorize(op, vContext);
 
       if (LOG.isDebugEnabled()) {
-        LOG.debug("Vectorized MapWork operator " + vectorOp.getName() +
-                " with vectorization context key=" + vContext.getFileKey() +
-                ", vectorTypes: " + vContext.getOutputColumnTypeMap().toString() +
-                ", columnMap: " + vContext.getColumnMap().toString());
+        LOG.debug("Vectorized MapWork operator " + vectorOp.getName() + " vectorization context " + vContext.toString());
         if (vectorOp instanceof VectorizationContextRegion) {
           VectorizationContextRegion vcRegion = (VectorizationContextRegion) vectorOp;
           VectorizationContext vOutContext = vcRegion.getOuputVectorizationContext();
-          LOG.debug("Vectorized MapWork operator " + vectorOp.getName() +
-                  " added new vectorization context key=" + vOutContext.getFileKey() +
-                  ", vectorTypes: " + vOutContext.getOutputColumnTypeMap().toString() +
-                  ", columnMap: " + vOutContext.getColumnMap().toString());
+          LOG.debug("Vectorized MapWork operator " + vectorOp.getName() + " added vectorization context " + vOutContext.toString());
         }
       }
 
@@ -730,10 +735,7 @@ public class Vectorizer implements Physi
 
   class ReduceWorkVectorizationNodeProcessor extends VectorizationNodeProcessor {
 
-    private final ReduceWork rWork;
-    private int keyColCount;
-    private int valueColCount;
-    private Map<String, Integer> reduceColumnNameMap;
+    private List<String> reduceColumnNames;
     
     private VectorizationContext reduceShuffleVectorizationContext;
 
@@ -743,12 +745,8 @@ public class Vectorizer implements Physi
       return rootVectorOp;
     }
 
-    public ReduceWorkVectorizationNodeProcessor(ReduceWork rWork, int keyColCount,
-            int valueColCount) {
-      this.rWork = rWork;
-      reduceColumnNameMap = rWork.getReduceColumnNameMap();
-      this.keyColCount = keyColCount;
-      this.valueColCount = valueColCount;
+    public ReduceWorkVectorizationNodeProcessor(List<String> reduceColumnNames) {
+      this.reduceColumnNames =  reduceColumnNames;
       rootVectorOp = null;
       reduceShuffleVectorizationContext = null;
     }
@@ -766,17 +764,16 @@ public class Vectorizer implements Physi
       boolean saveRootVectorOp = false;
 
       if (op.getParentOperators().size() == 0) {
-        vContext = getReduceVectorizationContext(reduceColumnNameMap);
+        LOG.info("ReduceWorkVectorizationNodeProcessor process reduceColumnNames " + reduceColumnNames.toString());
+
+        vContext = new VectorizationContext(reduceColumnNames);
         vContext.setFileKey("_REDUCE_SHUFFLE_");
         scratchColumnContext.put("_REDUCE_SHUFFLE_", vContext);
         reduceShuffleVectorizationContext = vContext;
         saveRootVectorOp = true;
 
         if (LOG.isDebugEnabled()) {
-          LOG.debug("Vectorized ReduceWork reduce shuffle vectorization context key=" +
-                  vContext.getFileKey() +
-                  ", vectorTypes: " + vContext.getOutputColumnTypeMap().toString() +
-                  ", columnMap: " + vContext.getColumnMap().toString());
+          LOG.debug("Vectorized ReduceWork reduce shuffle vectorization context " + vContext.toString());
         }
       } else {
         vContext = walkStackToFindVectorizationContext(stack, op);
@@ -802,17 +799,11 @@ public class Vectorizer implements Physi
       Operator<? extends OperatorDesc> vectorOp = doVectorize(op, vContext);
 
       if (LOG.isDebugEnabled()) {
-        LOG.debug("Vectorized ReduceWork operator " + vectorOp.getName() +
-                " with vectorization context key=" + vContext.getFileKey() +
-                ", vectorTypes: " + vContext.getOutputColumnTypeMap().toString() +
-                ", columnMap: " + vContext.getColumnMap().toString());
+        LOG.debug("Vectorized ReduceWork operator " + vectorOp.getName() + " vectorization context " + vContext.toString());
         if (vectorOp instanceof VectorizationContextRegion) {
           VectorizationContextRegion vcRegion = (VectorizationContextRegion) vectorOp;
           VectorizationContext vOutContext = vcRegion.getOuputVectorizationContext();
-          LOG.debug("Vectorized ReduceWork operator " + vectorOp.getName() +
-                  " added new vectorization context key=" + vOutContext.getFileKey() +
-                  ", vectorTypes: " + vOutContext.getOutputColumnTypeMap().toString() +
-                  ", columnMap: " + vOutContext.getColumnMap().toString());
+          LOG.debug("Vectorized ReduceWork operator " + vectorOp.getName() + " added vectorization context " + vOutContext.toString());
         }
       }
       if (vectorOp instanceof VectorGroupByOperator) {
@@ -830,7 +821,7 @@ public class Vectorizer implements Physi
 
   private static class ValidatorVectorizationContext extends VectorizationContext {
     private ValidatorVectorizationContext() {
-      super(null, -1);
+      super();
     }
 
     @Override
@@ -918,7 +909,12 @@ public class Vectorizer implements Physi
         }
         break;
       case GROUPBY:
-        ret = validateGroupByOperator((GroupByOperator) op, true, true);
+        if (HiveConf.getBoolVar(physicalContext.getConf(),
+                    HiveConf.ConfVars.HIVE_VECTORIZATION_REDUCE_GROUPBY_ENABLED)) {
+          ret = validateGroupByOperator((GroupByOperator) op, true, true);
+        } else {
+          ret = false;
+        }
         break;
       case FILTER:
         ret = validateFilterOperator((FilterOperator) op);
@@ -1026,12 +1022,10 @@ public class Vectorizer implements Physi
     if (!ret) {
       return false;
     }
-    boolean isVectorOutput = isTez && aggregatorsOutputIsPrimitive(desc.getAggregators(), isReduce);
-    vectorDesc.setVectorOutput(isVectorOutput);
     if (isReduce) {
       if (desc.isDistinct()) {
         LOG.info("Distinct not supported in reduce vector mode");
-        return false;    
+        return false;
       }
       // Sort-based GroupBy?
       if (desc.getMode() != GroupByDesc.Mode.COMPLETE &&
@@ -1044,21 +1038,24 @@ public class Vectorizer implements Physi
       LOG.info("Reduce GROUP BY mode is " + desc.getMode().name());
       if (desc.getGroupKeyNotReductionKey()) {
         LOG.info("Reduce vector mode not supported when group key is not reduction key");
-        return false;    
+        return false;
       }
-      if (!isVectorOutput) {
+      if (!aggregatorsOutputIsPrimitive(desc.getAggregators(), isReduce)) {
         LOG.info("Reduce vector mode only supported when aggregate outputs are primitive types");
-        return false;    
+        return false;
       }
       if (desc.getKeys().size() > 0) {
+        if (op.getParentOperators().size() > 0) {
+          LOG.info("Reduce vector mode can only handle a key group GROUP BY operator when it is fed by reduce-shuffle");
+          return false;
+        }
         LOG.info("Reduce-side GROUP BY will process key groups");
         vectorDesc.setVectorGroupBatches(true);
       } else {
         LOG.info("Reduce-side GROUP BY will do global aggregation");
       }
+      vectorDesc.setVectorOutput(true);
       vectorDesc.setIsReduce(true);
-    } else {
-      LOG.info("Downstream operators of map-side GROUP BY will be vectorized: " + isVectorOutput);
     }
     return true;
   }
@@ -1227,21 +1224,17 @@ public class Vectorizer implements Physi
       PhysicalContext pctx) {
     RowSchema rs = op.getSchema();
 
-    Map<String, Integer> cmap = new HashMap<String, Integer>();
-    int columnCount = 0;
+    // Add all non-virtual columns to make a vectorization context for
+    // the TableScan operator.
+    VectorizationContext vContext = new VectorizationContext();
     for (ColumnInfo c : rs.getSignature()) {
       // Earlier, validation code should have eliminated virtual columns usage (HIVE-5560).
       if (!isVirtualColumn(c)) {
-        cmap.put(c.getInternalName(), columnCount++);
+        vContext.addInitialColumn(c.getInternalName());
       }
     }
-
-    return new VectorizationContext(cmap, columnCount);
-  }
-
-  private VectorizationContext getReduceVectorizationContext(
-          Map<String, Integer> reduceColumnNameMap) {
-    return new VectorizationContext(reduceColumnNameMap, reduceColumnNameMap.size());
+    vContext.finishedAddingInitialColumns();
+    return vContext;
   }
 
   private void fixupParentChildOperators(Operator<? extends OperatorDesc> op, 
@@ -1297,4 +1290,41 @@ public class Vectorizer implements Physi
     }
     return false;
   }
+
+  /**
+   * Logs both map collections at debug level in a stable order: outer keys
+   * shortest-first, inner entries by their Integer key.
+   */
+  public void debugDisplayAllMaps(Map<String, Map<String, Integer>> allColumnVectorMaps,
+          Map<String, Map<Integer, String>> allScratchColumnVectorTypeMaps) {
+
+    // Context keys grow in length since they are a path, so shorter keys sort first.
+    // Ties fall back to String order: a TreeMap treats keys that compare as 0 as one
+    // key, so comparing by length alone would silently drop same-length context keys.
+    Comparator<String> comparerShorterString = new Comparator<String>() {
+      @Override
+      public int compare(String o1, String o2) {
+        if (o1.length() != o2.length()) {
+          return o1.length() < o2.length() ? -1 : 1;
+        }
+        return o1.compareTo(o2);
+      }};
+
+    Map<String, Map<Integer, String>> sortedAllColumnVectorMaps = new TreeMap<String, Map<Integer, String>>(comparerShorterString);
+    for (Map.Entry<String, Map<String, Integer>> entry : allColumnVectorMaps.entrySet()) {
+      // Invert the String -> Integer map so entries print ordered by the Integer value.
+      Map<Integer, String> sortedColumnMap = new TreeMap<Integer, String>();
+      for (Map.Entry<String, Integer> innerEntry : entry.getValue().entrySet()) {
+        sortedColumnMap.put(innerEntry.getValue(), innerEntry.getKey());
+      }
+      sortedAllColumnVectorMaps.put(entry.getKey(), sortedColumnMap);
+    }
+    LOG.debug("sortedAllColumnVectorMaps " + sortedAllColumnVectorMaps);
+
+    Map<String, Map<Integer, String>> sortedAllScratchColumnVectorTypeMap = new TreeMap<String, Map<Integer, String>>(comparerShorterString);
+    for (Map.Entry<String, Map<Integer, String>> entry : allScratchColumnVectorTypeMaps.entrySet()) {
+      sortedAllScratchColumnVectorTypeMap.put(entry.getKey(), new TreeMap<Integer, String>(entry.getValue()));
+    }
+    LOG.debug("sortedAllScratchColumnVectorTypeMap " + sortedAllScratchColumnVectorTypeMap);
+  }
 }

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ppr/PartitionPruner.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ppr/PartitionPruner.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ppr/PartitionPruner.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ppr/PartitionPruner.java Thu Oct 30 16:22:33 2014
@@ -56,7 +56,9 @@ import org.apache.hadoop.hive.ql.plan.Ex
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPAnd;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPOr;
+import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
 
 /**
@@ -188,12 +190,18 @@ public class PartitionPruner implements 
     // Replace virtual columns with nulls. See javadoc for details.
     prunerExpr = removeNonPartCols(prunerExpr, extractPartColNames(tab), partColsUsedInFilter);
     // Remove all parts that are not partition columns. See javadoc for details.
-    ExprNodeGenericFuncDesc compactExpr = (ExprNodeGenericFuncDesc)compactExpr(prunerExpr.clone());
+    ExprNodeDesc compactExpr = compactExpr(prunerExpr.clone());
     String oldFilter = prunerExpr.getExprString();
-    if (compactExpr == null) {
-      // Non-strict mode, and all the predicates are on non-partition columns - get everything.
-      LOG.debug("Filter " + oldFilter + " was null after compacting");
-      return getAllPartsFromCacheOrServer(tab, key, true, prunedPartitionsMap);
+    if (isBooleanExpr(compactExpr)) {
+    	// For null and true values, return every partition
+    	if (!isFalseExpr(compactExpr)) {
+    		// Non-strict mode, and all the predicates are on non-partition columns - get everything.
+    		LOG.debug("Filter " + oldFilter + " was null after compacting");
+    		return getAllPartsFromCacheOrServer(tab, key, true, prunedPartitionsMap);
+    	} else {
+    		return new PrunedPartitionList(tab, new LinkedHashSet<Partition>(new ArrayList<Partition>()),
+    				new ArrayList<String>(), false);
+    	}
     }
     LOG.debug("Filter w/ compacting: " + compactExpr.getExprString()
         + "; filter w/o compacting: " + oldFilter);
@@ -204,7 +212,7 @@ public class PartitionPruner implements 
       return ppList;
     }
 
-    ppList = getPartitionsFromServer(tab, compactExpr, conf, alias, partColsUsedInFilter, oldFilter.equals(compactExpr.getExprString()));
+    ppList = getPartitionsFromServer(tab, (ExprNodeGenericFuncDesc)compactExpr, conf, alias, partColsUsedInFilter, oldFilter.equals(compactExpr.getExprString()));
     prunedPartitionsMap.put(key, ppList);
     return ppList;
   }
@@ -225,16 +233,22 @@ public class PartitionPruner implements 
     partsCache.put(key, ppList);
     return ppList;
   }
-
-  private static ExprNodeDesc removeTruePredciates(ExprNodeDesc e) {
-    if (e instanceof ExprNodeConstantDesc) {
-      ExprNodeConstantDesc eC = (ExprNodeConstantDesc) e;
-      if (e.getTypeInfo() == TypeInfoFactory.booleanTypeInfo
-          && eC.getValue() == Boolean.TRUE) {
-        return null;
-      }
-    }
-    return e;
+  
+  // True for any constant typed boolean; the value (TRUE, FALSE or null) is not checked.
+  static private boolean isBooleanExpr(ExprNodeDesc expr) {
+    // instanceof is false for null, so no separate null check is needed.
+    return expr instanceof ExprNodeConstantDesc && expr.getTypeInfo() instanceof PrimitiveTypeInfo
+        && ((PrimitiveTypeInfo) expr.getTypeInfo()).getTypeName().equals(serdeConstants.BOOLEAN_TYPE_NAME);
+  }
+  // True only for a boolean constant whose value is TRUE; a null value is not true.
+  static private boolean isTrueExpr(ExprNodeDesc expr) {
+    Object value = isBooleanExpr(expr) ? ((ExprNodeConstantDesc) expr).getValue() : null;
+    return value != null && value.equals(Boolean.TRUE);
+  }
+  // True only for a boolean constant whose value is FALSE; a null value is not false.
+  static private boolean isFalseExpr(ExprNodeDesc expr) {
+    Object value = isBooleanExpr(expr) ? ((ExprNodeConstantDesc) expr).getValue() : null;
+    return value != null && value.equals(Boolean.FALSE);
   }
 
   /**
@@ -245,10 +259,13 @@ public class PartitionPruner implements 
    * @return partition pruning expression that only contains partition columns.
    */
   static private ExprNodeDesc compactExpr(ExprNodeDesc expr) {
-    if (expr instanceof ExprNodeConstantDesc) {
-      expr = removeTruePredciates(expr);
-      if (expr == null || ((ExprNodeConstantDesc)expr).getValue() == null) {
-        return null;
+    // If this is a constant boolean expression, return the value.
+	if (expr == null) {
+		return null;
+	}
+	if (expr instanceof ExprNodeConstantDesc) {
+      if (isBooleanExpr(expr)) {
+        return expr;
       } else {
         throw new IllegalStateException("Unexpected non-null ExprNodeConstantDesc: "
           + expr.getExprString());
@@ -256,22 +273,29 @@ public class PartitionPruner implements 
     } else if (expr instanceof ExprNodeGenericFuncDesc) {
       GenericUDF udf = ((ExprNodeGenericFuncDesc)expr).getGenericUDF();
       boolean isAnd = udf instanceof GenericUDFOPAnd;
-      if (isAnd || udf instanceof GenericUDFOPOr) {
+      boolean isOr = udf instanceof GenericUDFOPOr;
+      
+      if (isAnd || isOr) {
         List<ExprNodeDesc> children = expr.getChildren();
-        ExprNodeDesc left = removeTruePredciates(children.get(0));
-        children.set(0, left == null ? null : compactExpr(left));
-        ExprNodeDesc right = removeTruePredciates(children.get(1));
-        children.set(1, right == null ? null : compactExpr(right));
-
-        // Note that one does not simply compact (not-null or null) to not-null.
-        // Only if we have an "and" is it valid to send one side to metastore.
-        if (children.get(0) == null && children.get(1) == null) {
-          return null;
-        } else if (children.get(0) == null) {
-          return isAnd ? children.get(1) : null;
-        } else if (children.get(1) == null) {
-          return isAnd ? children.get(0) : null;
-        }
+        ExprNodeDesc left = children.get(0);
+        children.set(0, compactExpr(left));
+        ExprNodeDesc right = children.get(1);
+        children.set(1, compactExpr(right));
+
+        if (isTrueExpr(children.get(0)) && isTrueExpr(children.get(1))) {
+        	return new ExprNodeConstantDesc(Boolean.TRUE);
+        } else if (isTrueExpr(children.get(0)))  {
+        	return isAnd ? children.get(1) :  new ExprNodeConstantDesc(Boolean.TRUE);
+        } else if (isTrueExpr(children.get(1))) {
+        	return isAnd ? children.get(0) : new ExprNodeConstantDesc(Boolean.TRUE);
+        } else if (isFalseExpr(children.get(0)) && isFalseExpr(children.get(1))) {
+        	return new ExprNodeConstantDesc(Boolean.FALSE);
+        } else if (isFalseExpr(children.get(0)))  {
+            return isAnd ? new ExprNodeConstantDesc(Boolean.FALSE) : children.get(1);
+        } else if (isFalseExpr(children.get(1))) {
+            return isAnd ? new ExprNodeConstantDesc(Boolean.FALSE) : children.get(0);
+        } 
+        
       }
       return expr;
     } else {
@@ -296,9 +320,9 @@ public class PartitionPruner implements 
       if (!partCols.contains(column)) {
         // Column doesn't appear to be a partition column for the table.
         return new ExprNodeConstantDesc(expr.getTypeInfo(), null);
-      }
+      } 
       referred.add(column);
-    }
+    }	        
     if (expr instanceof ExprNodeGenericFuncDesc) {
       List<ExprNodeDesc> children = expr.getChildren();
       for (int i = 0; i < children.size(); ++i) {

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/stats/annotation/StatsRulesProcFactory.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/stats/annotation/StatsRulesProcFactory.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/stats/annotation/StatsRulesProcFactory.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/stats/annotation/StatsRulesProcFactory.java Thu Oct 30 16:22:33 2014
@@ -42,7 +42,6 @@ import org.apache.hadoop.hive.ql.exec.Re
 import org.apache.hadoop.hive.ql.exec.RowSchema;
 import org.apache.hadoop.hive.ql.exec.SelectOperator;
 import org.apache.hadoop.hive.ql.exec.TableScanOperator;
-import org.apache.hadoop.hive.ql.exec.tez.DagUtils;
 import org.apache.hadoop.hive.ql.lib.Node;
 import org.apache.hadoop.hive.ql.lib.NodeProcessor;
 import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
@@ -151,28 +150,28 @@ public class StatsRulesProcFactory {
       Statistics parentStats = parent.getStatistics();
       AnnotateStatsProcCtx aspCtx = (AnnotateStatsProcCtx) procCtx;
       HiveConf conf = aspCtx.getConf();
+      Statistics stats = null;
 
-      // SELECT (*) does not change the statistics. Just pass on the parent statistics
-      if (sop.getConf().isSelectStar()) {
+      if (parentStats != null) {
         try {
-          if (parentStats != null) {
-            sop.setStatistics(parentStats.clone());
-          }
+          stats = parentStats.clone();
         } catch (CloneNotSupportedException e) {
           throw new SemanticException(ErrorMsg.STATISTICS_CLONING_FAILED.getMsg());
         }
-        return null;
       }
 
       try {
         if (satisfyPrecondition(parentStats)) {
-          Statistics stats = parentStats.clone();
-          List<ColStatistics> colStats =
-              StatsUtils.getColStatisticsFromExprMap(conf, parentStats, sop.getColumnExprMap(),
-                  sop.getSchema());
-          long dataSize = StatsUtils.getDataSizeFromColumnStats(stats.getNumRows(), colStats);
+          // this will take care of mapping between input column names and output column names. The
+          // returned column stats will have the output column names.
+          List<ColStatistics> colStats = StatsUtils.getColStatisticsFromExprMap(conf, parentStats,
+              sop.getColumnExprMap(), sop.getSchema());
           stats.setColumnStats(colStats);
-          stats.setDataSize(setMaxIfInvalid(dataSize));
+          // in case of select(*) the data size does not change
+          if (!sop.getConf().isSelectStar() && !sop.getConf().isSelStarNoCompute()) {
+            long dataSize = StatsUtils.getDataSizeFromColumnStats(stats.getNumRows(), colStats);
+            stats.setDataSize(setMaxIfInvalid(dataSize));
+          }
           sop.setStatistics(stats);
 
           if (isDebugEnabled) {
@@ -889,16 +888,12 @@ public class StatsRulesProcFactory {
       GroupByDesc.Mode mode = desc.getMode();
 
       if (mode.equals(GroupByDesc.Mode.HASH)) {
-        float hashAggMem = conf.getFloatVar(
-            HiveConf.ConfVars.HIVEMAPAGGRHASHMEMORY);
-        float hashAggMaxThreshold = conf.getFloatVar(
-            HiveConf.ConfVars.HIVEMAPAGGRMEMORYTHRESHOLD);
-
-        // get memory for container. May be use mapreduce.map.java.opts instead?
-        long totalMemory =
-            DagUtils.getContainerResource(conf).getMemory() * 1000L * 1000L;
-        long maxMemHashAgg = Math
-            .round(totalMemory * hashAggMem * hashAggMaxThreshold);
+        float hashAggMem = conf.getFloatVar(HiveConf.ConfVars.HIVEMAPAGGRHASHMEMORY);
+        float hashAggMaxThreshold = conf.getFloatVar(HiveConf.ConfVars.HIVEMAPAGGRMEMORYTHRESHOLD);
+
+        // get available map memory
+        long totalMemory = StatsUtils.getAvailableMemory(conf) * 1000L * 1000L;
+        long maxMemHashAgg = Math.round(totalMemory * hashAggMem * hashAggMaxThreshold);
 
         // estimated number of rows will be product of NDVs
         long numEstimatedRows = 1;
@@ -1026,11 +1021,17 @@ public class StatsRulesProcFactory {
    */
   public static class JoinStatsRule extends DefaultStatsRule implements NodeProcessor {
 
+    private boolean pkfkInferred = false;
+    private long newNumRows = 0;
+    private List<Operator<? extends OperatorDesc>> parents;
+    private CommonJoinOperator<? extends JoinDesc> jop;
+    private int numAttr = 1;
+
     @Override
     public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
         Object... nodeOutputs) throws SemanticException {
-      CommonJoinOperator<? extends JoinDesc> jop = (CommonJoinOperator<? extends JoinDesc>) nd;
-      List<Operator<? extends OperatorDesc>> parents = jop.getParentOperators();
+      jop = (CommonJoinOperator<? extends JoinDesc>) nd;
+      parents = jop.getParentOperators();
       AnnotateStatsProcCtx aspCtx = (AnnotateStatsProcCtx) procCtx;
       HiveConf conf = aspCtx.getConf();
       boolean allStatsAvail = true;
@@ -1057,22 +1058,26 @@ public class StatsRulesProcFactory {
           Statistics stats = new Statistics();
           Map<String, Long> rowCountParents = new HashMap<String, Long>();
           List<Long> distinctVals = Lists.newArrayList();
-
-          // 2 relations, multiple attributes
-          boolean multiAttr = false;
-          int numAttr = 1;
           int numParent = parents.size();
-
           Map<String, ColStatistics> joinedColStats = Maps.newHashMap();
           Map<Integer, List<String>> joinKeys = Maps.newHashMap();
           List<Long> rowCounts = Lists.newArrayList();
 
+          // detect if there are multiple attributes in join key
+          ReduceSinkOperator rsOp = (ReduceSinkOperator) jop.getParentOperators().get(0);
+          List<ExprNodeDesc> keyExprs = rsOp.getConf().getKeyCols();
+          numAttr = keyExprs.size();
+
+          // infer PK-FK relationship in single attribute join case
+          pkfkInferred = false;
+          inferPKFKRelationship();
+
           // get the join keys from parent ReduceSink operators
           for (int pos = 0; pos < parents.size(); pos++) {
             ReduceSinkOperator parent = (ReduceSinkOperator) jop.getParentOperators().get(pos);
 
             Statistics parentStats = parent.getStatistics();
-            List<ExprNodeDesc> keyExprs = parent.getConf().getKeyCols();
+            keyExprs = parent.getConf().getKeyCols();
 
             // Parent RS may have column statistics from multiple parents.
             // Populate table alias to row count map, this will be used later to
@@ -1087,12 +1092,6 @@ public class StatsRulesProcFactory {
             }
             rowCounts.add(parentStats.getNumRows());
 
-            // multi-attribute join key
-            if (keyExprs.size() > 1) {
-              multiAttr = true;
-              numAttr = keyExprs.size();
-            }
-
             // compute fully qualified join key column names. this name will be
             // used to quickly look-up for column statistics of join key.
             // TODO: expressions in join condition will be ignored. assign
@@ -1115,7 +1114,7 @@ public class StatsRulesProcFactory {
           // attribute join, else max(V(R,y1), V(S,y1)) * max(V(R,y2), V(S,y2))
           // in case of multi-attribute join
           long denom = 1;
-          if (multiAttr) {
+          if (numAttr > 1) {
             List<Long> perAttrDVs = Lists.newArrayList();
             for (int idx = 0; idx < numAttr; idx++) {
               for (Integer i : joinKeys.keySet()) {
@@ -1154,9 +1153,7 @@ public class StatsRulesProcFactory {
           }
 
           // Update NDV of joined columns to be min(V(R,y), V(S,y))
-          if (multiAttr) {
-            updateJoinColumnsNDV(joinKeys, joinedColStats, numAttr);
-          }
+          updateJoinColumnsNDV(joinKeys, joinedColStats, numAttr);
 
           // column statistics from different sources are put together and rename
           // fully qualified column names based on output schema of join operator
@@ -1186,10 +1183,8 @@ public class StatsRulesProcFactory {
 
           // update join statistics
           stats.setColumnStats(outColStats);
-          long newRowCount = computeNewRowCount(rowCounts, denom);
-
-          updateStatsForJoinType(stats, newRowCount, jop, rowCountParents,
-              outInTabAlias);
+          long newRowCount = pkfkInferred ? newNumRows : computeNewRowCount(rowCounts, denom);
+          updateStatsForJoinType(stats, newRowCount, jop, rowCountParents,outInTabAlias);
           jop.setStatistics(stats);
 
           if (isDebugEnabled) {
@@ -1234,6 +1229,195 @@ public class StatsRulesProcFactory {
       return null;
     }
 
+    private void inferPKFKRelationship() {
+      // PK-FK inference is only attempted for single attribute join keys
+      if (numAttr != 1) {
+        return;
+      }
+      // in case of fact to many dimensional tables join, the join key in fact table will be
+      // mostly foreign key which will have corresponding primary key in dimension table.
+      // The selectivity of fact table in that case will be product of all selectivities of
+      // dimension tables (assumes conjunctivity)
+      List<Integer> parentsWithPK = getPrimaryKeyCandidates(parents);
+      for (Integer pkPos : parentsWithPK) {
+        // use the first column statistics of this parent that is flagged as primary key
+        ColStatistics csPK = null;
+        for (ColStatistics cs : parents.get(pkPos).getStatistics().getColumnStats()) {
+          if (cs.isPrimaryKey()) {
+            csPK = cs;
+            break;
+          }
+        }
+
+        // infer foreign key candidates positions. The relationship is accepted only when there
+        // is exactly one such candidate and it plus the primary key parents cover all parents
+        List<Integer> parentsWithFK = getForeignKeyCandidates(parents, csPK);
+        if (parentsWithFK.size() != 1
+            || parentsWithFK.size() + parentsWithPK.size() != parents.size()) {
+          continue;
+        }
+
+        Operator<? extends OperatorDesc> parentWithFK = parents.get(parentsWithFK.get(0));
+        float prodSelectivity = 1.0f;
+        for (Float selectivity : getSelectivity(parents, parentsWithPK)) {
+          prodSelectivity *= selectivity;
+        }
+        newNumRows = (long) Math.ceil(
+            parentWithFK.getStatistics().getNumRows() * prodSelectivity);
+        pkfkInferred = true;
+        // some debug information
+        if (isDebugEnabled) {
+          // print primary key containing parents
+          List<String> pkParentIds = Lists.newArrayList();
+          for (Integer i : parentsWithPK) {
+            pkParentIds.add(parents.get(i).toString());
+          }
+          LOG.debug("STATS-" + jop.toString() + ": PK parent id(s) - " + pkParentIds);
+          // print foreign key containing parents
+          List<String> fkParentIds = Lists.newArrayList();
+          for (Integer i : parentsWithFK) {
+            fkParentIds.add(parents.get(i).toString());
+          }
+          LOG.debug("STATS-" + jop.toString() + ": FK parent id(s) - " + fkParentIds);
+        }
+      }
+    }
+
+    /**
+     * Computes the selectivity of each primary key containing reduce sink operator.
+     * @param ops - reduce sink operators
+     * @param opsWithPK - positions within ops of the operators with primary keys
+     * @return - one selectivity per entry of opsWithPK, in the same order
+     */
+    private List<Float> getSelectivity(List<Operator<? extends OperatorDesc>> ops,
+        List<Integer> opsWithPK) {
+      List<Float> selectivities = Lists.newArrayList();
+      for (int pos = 0; pos < opsWithPK.size(); pos++) {
+        // opsWithPK holds positions into ops, not the operators themselves
+        int parentIdx = opsWithPK.get(pos);
+        selectivities.add(getSelectivitySimpleTree(ops.get(parentIdx)));
+      }
+      return selectivities;
+    }
+
+    private float getSelectivitySimpleTree(Operator<? extends OperatorDesc> op) {
+      TableScanOperator scan =
+          OperatorUtils.findSingleOperatorUpstream(op, TableScanOperator.class);
+      if (scan != null) {
+        // simple tree with single parent: ratio of rows leaving op to rows of the table scan
+        float rowsIn = (float) scan.getStatistics().getNumRows();
+        float rowsOut = (float) op.getStatistics().getNumRows();
+        return rowsOut / rowsIn;
+      }
+
+      // complex tree with multiple parents
+      return getSelectivityComplexTree(op);
+    }
+
+    /**
+     * Selectivity of an operator that sits below an operator with multiple parents.
+     * @param op - operator whose selectivity is computed
+     */
+    private float getSelectivityComplexTree(Operator<? extends OperatorDesc> op) {
+      // TS-1      TS-2
+      //  |          |
+      // RS-1      RS-2
+      //    \      /
+      //      JOIN
+      //        |
+      //       FIL
+      //        |
+      //       RS-3
+      //
+      // For the above complex operator tree,
+      // selectivity(JOIN) = selectivity(RS-1) * selectivity(RS-2) and
+      // selectivity(RS-3) = numRows(RS-3)/numRows(JOIN) * selectivity(JOIN)
+
+      // walk up from op to the first operator that has more than one parent (JOIN above)
+      Operator<? extends OperatorDesc> multiParentOp = op;
+      while (multiParentOp.getParentOperators().size() <= 1) {
+        multiParentOp = multiParentOp.getParentOperators().get(0);
+      }
+
+      // In the above example, TS-1 -> RS-1 and TS-2 -> RS-2 are simple trees
+      float selMultiParent = 1.0f;
+      for (Operator<? extends OperatorDesc> parent : multiParentOp.getParentOperators()) {
+        selMultiParent *= getSelectivitySimpleTree(parent);
+      }
+
+      // scale by the share of the multi-parent operator's rows that remain at op
+      float rowsAtOp = (float) op.getStatistics().getNumRows();
+      float rowsAtMultiParent = (float) multiParentOp.getStatistics().getNumRows();
+      return (rowsAtOp / rowsAtMultiParent) * selMultiParent;
+    }
+
+    /**
+     * Returns the index of parents whose join key column is inferred to be a foreign key of the
+     * specified primary key. A parent qualifies when it is a reduce sink with exactly one fully
+     * qualified join key column, that column has statistics, is not itself marked as a primary
+     * key, and StatsUtils.inferForeignKey() accepts it against csPK.
+     * @param ops - operators
+     * @param csPK - column statistics of primary key
+     * @return - list of foreign key containing parent ids; empty when ops or csPK is null
+     */
+    private List<Integer> getForeignKeyCandidates(List<Operator<? extends OperatorDesc>> ops,
+        ColStatistics csPK) {
+      List<Integer> fkParents = Lists.newArrayList();
+      if (ops == null || csPK == null) {
+        return fkParents;
+      }
+
+      for (int pos = 0; pos < ops.size(); pos++) {
+        Operator<? extends OperatorDesc> op = ops.get(pos);
+        // instanceof is false for null, so null parents are skipped here as well
+        if (!(op instanceof ReduceSinkOperator)) {
+          continue;
+        }
+        ReduceSinkOperator rs = (ReduceSinkOperator) op;
+        List<String> fqCols = StatsUtils.getFullQualifedColNameFromExprs(
+            rs.getConf().getKeyCols(), rs.getColumnExprMap());
+        // only single attribute join keys with available statistics are considered
+        if (fqCols.size() != 1 || rs.getStatistics() == null) {
+          continue;
+        }
+        ColStatistics cs = rs.getStatistics().getColumnStatisticsFromFQColName(fqCols.get(0));
+        if (cs != null && !cs.isPrimaryKey() && StatsUtils.inferForeignKey(csPK, cs)) {
+          fkParents.add(pos);
+        }
+      }
+      return fkParents;
+    }
+
+    /**
+     * Returns the index of parents whose join key columns are inferred as primary keys.
+     * Only reduce sink parents with a single join key column and statistics are examined.
+     * @param ops - operators
+     * @return - list of primary key containing parent ids; empty when ops is null or empty
+     */
+    private List<Integer> getPrimaryKeyCandidates(List<Operator<? extends OperatorDesc>> ops) {
+      List<Integer> pkParents = Lists.newArrayList();
+      if (ops == null || ops.isEmpty()) {
+        return pkParents;
+      }
+      for (int pos = 0; pos < ops.size(); pos++) {
+        Operator<? extends OperatorDesc> op = ops.get(pos);
+        if (!(op instanceof ReduceSinkOperator)) {
+          continue;
+        }
+        ReduceSinkOperator rs = (ReduceSinkOperator) op;
+        List<String> fqCols = StatsUtils.getFullQualifedColNameFromExprs(
+            rs.getConf().getKeyCols(), rs.getColumnExprMap());
+        if (fqCols.size() != 1 || rs.getStatistics() == null) {
+          continue;
+        }
+        ColStatistics cs = rs.getStatistics().getColumnStatisticsFromFQColName(fqCols.get(0));
+        if (cs != null && cs.isPrimaryKey()) {
+          pkParents.add(pos);
+        }
+      }
+      return pkParents;
+    }
+
     private Long getEasedOutDenominator(List<Long> distinctVals) {
       // Exponential back-off for NDVs.
       // 1) Descending order sort of NDVs
@@ -1253,7 +1437,7 @@ public class StatsRulesProcFactory {
         Map<String, Long> rowCountParents,
         Map<String, String> outInTabAlias) {
 
-      if (newNumRows <= 0) {
+      if (newNumRows < 0) {
         LOG.info("STATS-" + jop.toString() + ": Overflow in number of rows."
           + newNumRows + " rows will be set to Long.MAX_VALUE");
       }
@@ -1528,6 +1712,8 @@ public class StatsRulesProcFactory {
         Object... nodeOutputs) throws SemanticException {
       Operator<? extends OperatorDesc> op = (Operator<? extends OperatorDesc>) nd;
       OperatorDesc conf = op.getConf();
+      AnnotateStatsProcCtx aspCtx = (AnnotateStatsProcCtx) procCtx;
+      HiveConf hconf = aspCtx.getConf();
 
       if (conf != null) {
         Statistics stats = conf.getStatistics();
@@ -1544,7 +1730,9 @@ public class StatsRulesProcFactory {
                   stats.addToNumRows(parentStats.getNumRows());
                   stats.addToDataSize(parentStats.getDataSize());
                   stats.updateColumnStatsState(parentStats.getColumnStatsState());
-                  stats.addToColumnStats(parentStats.getColumnStats());
+                  List<ColStatistics> colStats = StatsUtils.getColStatisticsFromExprMap(hconf,
+                      parentStats, op.getColumnExprMap(), op.getSchema());
+                  stats.addToColumnStats(colStats);
                   op.getConf().setStatistics(stats);
 
                   if (isDebugEnabled) {
@@ -1622,7 +1810,7 @@ public class StatsRulesProcFactory {
       boolean useColStats, Operator<? extends OperatorDesc> op,
       boolean updateNDV) {
 
-    if (newNumRows <= 0) {
+    if (newNumRows < 0) {
       LOG.info("STATS-" + op.toString() + ": Overflow in number of rows."
           + newNumRows + " rows will be set to Long.MAX_VALUE");
     }

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java Thu Oct 30 16:22:33 2014
@@ -79,7 +79,7 @@ import com.google.common.annotations.Vis
  *
  */
 public abstract class BaseSemanticAnalyzer {
-  private static final Log STATIC_LOG = LogFactory.getLog(BaseSemanticAnalyzer.class.getName());
+  protected static final Log STATIC_LOG = LogFactory.getLog(BaseSemanticAnalyzer.class.getName());
   protected final Hive db;
   protected final HiveConf conf;
   protected List<Task<? extends Serializable>> rootTasks;

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java Thu Oct 30 16:22:33 2014
@@ -53,6 +53,7 @@ import org.apache.hadoop.hive.metastore.
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.Index;
 import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
 import org.apache.hadoop.hive.metastore.api.Order;
 import org.apache.hadoop.hive.metastore.api.SkewedInfo;
 import org.apache.hadoop.hive.ql.Driver;
@@ -214,24 +215,6 @@ public class DDLSemanticAnalyzer extends
     return typeName;
   }
 
-  static class TablePartition {
-    String tableName;
-    HashMap<String, String> partSpec = null;
-
-    public TablePartition() {
-    }
-
-    public TablePartition(ASTNode tblPart) throws SemanticException {
-      tableName = getDotName((getQualifiedTableName((ASTNode) tblPart.getChild(0))));
-      if (tblPart.getChildCount() > 1) {
-        ASTNode part = (ASTNode) tblPart.getChild(1);
-        if (part.getToken().getType() == HiveParser.TOK_PARTSPEC) {
-          this.partSpec = DDLSemanticAnalyzer.getPartSpec(part);
-        }
-      }
-    }
-  }
-
   public DDLSemanticAnalyzer(HiveConf conf) throws SemanticException {
     this(conf, createHiveDB(conf));
   }
@@ -1034,7 +1017,7 @@ public class DDLSemanticAnalyzer extends
     rootTasks.add(truncateTask);
   }
 
-  private boolean isFullSpec(Table table, Map<String, String> partSpec) {
+  public static boolean isFullSpec(Table table, Map<String, String> partSpec) {
     for (FieldSchema partCol : table.getPartCols()) {
       if (partSpec.get(partCol.getName()) == null) {
         return false;
@@ -1139,20 +1122,25 @@ public class DDLSemanticAnalyzer extends
     // configured not to ignore this
     boolean throwException =
         !ifExists && !HiveConf.getBoolVar(conf, ConfVars.DROPIGNORESNONEXISTENT);
-    if (throwException) {
-      try {
-        Index idx = db.getIndex(tableName, indexName);
-      } catch (HiveException e) {
+    Table tbl = getTable(tableName, false);
+    if (throwException && tbl == null) {
+      throw new SemanticException(ErrorMsg.INVALID_TABLE.getMsg(tableName));
+    }
+    try {
+      Index idx = db.getIndex(tableName, indexName);
+    } catch (HiveException e) {
+      if (!(e.getCause() instanceof NoSuchObjectException)) {
+        throw new SemanticException(ErrorMsg.GENERIC_ERROR.getMsg("dropping index"), e);
+      }
+      if (throwException) {
         throw new SemanticException(ErrorMsg.INVALID_INDEX.getMsg(indexName));
       }
     }
-
-    Table tbl = getTable(tableName, false);
     if (tbl != null) {
-      inputs.add(new ReadEntity(getTable(tableName)));
+      inputs.add(new ReadEntity(tbl));
     }
 
-    DropIndexDesc dropIdxDesc = new DropIndexDesc(indexName, tableName);
+    DropIndexDesc dropIdxDesc = new DropIndexDesc(indexName, tableName, throwException);
     rootTasks.add(TaskFactory.get(new DDLWork(getInputs(), getOutputs(),
         dropIdxDesc), conf));
   }
@@ -1399,11 +1387,22 @@ public class DDLSemanticAnalyzer extends
       // ReadEntity as no lock.
       re.noLockNeeded();
       inputs.add(re);
-      if (desc == null || desc.getOp() != AlterTableDesc.AlterTableTypes.ALTERPROTECTMODE) {
+
+      if (isFullSpec(tab, partSpec)) {
+        // Fully specified partition spec
         Partition part = getPartition(tab, partSpec, true);
-        outputs.add(new WriteEntity(part, writeType));
-      }
-      else {
+        outputs.add(new WriteEntity(part, writeType));        
+      } else {
+        // Partial partition spec supplied. Make sure this is allowed.
+        if (desc == null
+            || !AlterTableDesc.doesAlterTableTypeSupportPartialPartitionSpec(desc.getOp())) {
+          String alterTabletype = (desc != null) ? desc.getOp().name() : "";
+          throw new SemanticException(
+              ErrorMsg.ALTER_TABLE_TYPE_PARTIAL_PARTITION_SPEC_NO_SUPPORTED, alterTabletype);
+        } else if (!conf.getBoolVar(HiveConf.ConfVars.DYNAMICPARTITIONING)) {
+          throw new SemanticException(ErrorMsg.DYNAMIC_PARTITION_DISABLED);
+        }
+
         for (Partition part : getPartitions(tab, partSpec, true)) {
           outputs.add(new WriteEntity(part, writeType));
         }
@@ -2240,6 +2239,10 @@ public class DDLSemanticAnalyzer extends
     if (ast.getChildCount() == 1) {
       String funcNames = stripQuotes(ast.getChild(0).getText());
       showFuncsDesc = new ShowFunctionsDesc(ctx.getResFile(), funcNames);
+    } else if (ast.getChildCount() == 2) {
+      assert (ast.getChild(0).getType() == HiveParser.KW_LIKE);
+      String funcNames = stripQuotes(ast.getChild(1).getText());
+      showFuncsDesc = new ShowFunctionsDesc(ctx.getResFile(), funcNames, true);
     } else {
       showFuncsDesc = new ShowFunctionsDesc(ctx.getResFile());
     }
@@ -2805,7 +2808,7 @@ public class DDLSemanticAnalyzer extends
    * @param ast
    *          The parsed command tree.
    * @throws SemanticException
-   *           Parsin failed
+   *           Parsing failed
    */
   private void analyzeAlterTableTouch(String[] qualified, CommonTree ast)
       throws SemanticException {
@@ -2929,8 +2932,8 @@ public class DDLSemanticAnalyzer extends
    *
    * @param ast Tree to extract partitions from.
    * @param tab Table.
-   * @param result Map of partitions by prefix length. Most of the time prefix length will
-   *               be the same for all partition specs, so we can just OR the expressions.
+   * @return    Map of partitions by prefix length. Most of the time prefix length will
+   *            be the same for all partition specs, so we can just OR the expressions.
    */
   private Map<Integer, List<ExprNodeGenericFuncDesc>> getFullPartitionSpecs(
       CommonTree ast, Table tab, boolean canGroupExprs) throws SemanticException {

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java Thu Oct 30 16:22:33 2014
@@ -57,6 +57,8 @@ import org.apache.hadoop.hive.ql.plan.Un
 import com.google.common.collect.BiMap;
 import com.google.common.collect.HashBiMap;
 
+import static org.apache.hadoop.hive.ql.plan.ReduceSinkDesc.ReducerTraits.AUTOPARALLEL;
+
 /**
  * GenTezUtils is a collection of shared helper methods to produce
  * TezWork
@@ -117,7 +119,7 @@ public class GenTezUtils {
 
     reduceWork.setNumReduceTasks(reduceSink.getConf().getNumReducers());
 
-    if (isAutoReduceParallelism && reduceSink.getConf().isAutoParallel()) {
+    if (isAutoReduceParallelism && reduceSink.getConf().getReducerTraits().contains(AUTOPARALLEL)) {
       reduceWork.setAutoReduceParallelism(true);
 
       // configured limit for reducers

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java Thu Oct 30 16:22:33 2014
@@ -123,6 +123,11 @@ public class GenTezWork implements NodeP
       context.rootToWorkMap.put(root, work);
     }
 
+    // this is where we set the sort columns that we will be using for KeyValueInputMerge
+    if (operator instanceof DummyStoreOperator) {
+      work.addSortCols(root.getOpTraits().getSortCols().get(0));
+    }
+
     if (!context.childToWorkMap.containsKey(operator)) {
       List<BaseWork> workItems = new LinkedList<BaseWork>();
       workItems.add(work);
@@ -137,17 +142,18 @@ public class GenTezWork implements NodeP
       // we are currently walking the big table side of the merge join. we need to create or hook up
       // merge join work.
       MergeJoinWork mergeJoinWork = null;
-      if (context.opMergeJoinWorkMap.containsKey(operator)) {
+      if (context.opMergeJoinWorkMap.containsKey(context.currentMergeJoinOperator)) {
         // we have found a merge work corresponding to this closing operator. Hook up this work.
-        mergeJoinWork = context.opMergeJoinWorkMap.get(operator);
+        mergeJoinWork = context.opMergeJoinWorkMap.get(context.currentMergeJoinOperator);
       } else {
         // we need to create the merge join work
         mergeJoinWork = new MergeJoinWork();
         mergeJoinWork.setMergeJoinOperator(context.currentMergeJoinOperator);
         tezWork.add(mergeJoinWork);
-        context.opMergeJoinWorkMap.put(operator, mergeJoinWork);
+        context.opMergeJoinWorkMap.put(context.currentMergeJoinOperator, mergeJoinWork);
       }
       // connect the work correctly.
+      work.addSortCols(root.getOpTraits().getSortCols().get(0));
       mergeJoinWork.addMergedWork(work, null);
       Operator<? extends OperatorDesc> parentOp =
           getParentFromStack(context.currentMergeJoinOperator, stack);
@@ -334,10 +340,16 @@ public class GenTezWork implements NodeP
           UnionWork unionWork = (UnionWork) followingWork;
           int index = getMergeIndex(tezWork, unionWork, rs);
           // guaranteed to be instance of MergeJoinWork if index is valid
-          MergeJoinWork mergeJoinWork = (MergeJoinWork) tezWork.getChildren(unionWork).get(index);
-          // disconnect the connection to union work and connect to merge work
-          followingWork = mergeJoinWork;
-          rWork = (ReduceWork) mergeJoinWork.getMainWork();
+          BaseWork baseWork = tezWork.getChildren(unionWork).get(index);
+          if (baseWork instanceof MergeJoinWork) {
+            MergeJoinWork mergeJoinWork = (MergeJoinWork) baseWork;
+            // disconnect the connection to union work and connect to merge work
+            followingWork = mergeJoinWork;
+            rWork = (ReduceWork) mergeJoinWork.getMainWork();
+          } else {
+            throw new SemanticException("Unknown work type found: "
+                + baseWork.getClass().getCanonicalName());
+          }
         } else {
           rWork = (ReduceWork) followingWork;
         }
@@ -391,6 +403,8 @@ public class GenTezWork implements NodeP
         } else {
           index++;
         }
+      } else {
+        index++;
       }
     }
 

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g Thu Oct 30 16:22:33 2014
@@ -1334,7 +1334,7 @@ showStatement
     | KW_SHOW KW_TABLES ((KW_FROM|KW_IN) db_name=identifier)? (KW_LIKE showStmtIdentifier|showStmtIdentifier)?  -> ^(TOK_SHOWTABLES (TOK_FROM $db_name)? showStmtIdentifier?)
     | KW_SHOW KW_COLUMNS (KW_FROM|KW_IN) tableName ((KW_FROM|KW_IN) db_name=identifier)?
     -> ^(TOK_SHOWCOLUMNS tableName $db_name?)
-    | KW_SHOW KW_FUNCTIONS showFunctionIdentifier?  -> ^(TOK_SHOWFUNCTIONS showFunctionIdentifier?)
+    | KW_SHOW KW_FUNCTIONS (KW_LIKE showFunctionIdentifier|showFunctionIdentifier)?  -> ^(TOK_SHOWFUNCTIONS KW_LIKE? showFunctionIdentifier?)
     | KW_SHOW KW_PARTITIONS tabName=tableName partitionSpec? -> ^(TOK_SHOWPARTITIONS $tabName partitionSpec?) 
     | KW_SHOW KW_CREATE KW_TABLE tabName=tableName -> ^(TOK_SHOW_CREATETABLE $tabName)
     | KW_SHOW KW_TABLE KW_EXTENDED ((KW_FROM|KW_IN) db_name=identifier)? KW_LIKE showStmtIdentifier partitionSpec?

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveSemanticAnalyzerHookContext.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveSemanticAnalyzerHookContext.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveSemanticAnalyzerHookContext.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveSemanticAnalyzerHookContext.java Thu Oct 30 16:22:33 2014
@@ -57,4 +57,12 @@ public interface HiveSemanticAnalyzerHoo
   public String getUserName();
 
   public void setUserName(String userName);
+
+  public String getIpAddress();
+
+  public void setIpAddress(String ipAddress);
+
+  public String getCommand();
+
+  public void setCommand(String command);
 }

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveSemanticAnalyzerHookContextImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveSemanticAnalyzerHookContextImpl.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveSemanticAnalyzerHookContextImpl.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveSemanticAnalyzerHookContextImpl.java Thu Oct 30 16:22:33 2014
@@ -33,6 +33,8 @@ public class HiveSemanticAnalyzerHookCon
   Set<ReadEntity> inputs = null;
   Set<WriteEntity> outputs = null;
   private String userName;
+  private String ipAddress;
+  private String command;
 
   @Override
   public Hive getHive() throws HiveException {
@@ -73,4 +75,24 @@ public class HiveSemanticAnalyzerHookCon
   public void setUserName(String userName) {
     this.userName = userName;
   }
+
+  @Override
+  public String getIpAddress() {
+    return ipAddress;
+  }
+
+  @Override
+  public void setIpAddress(String ipAddress) {
+    this.ipAddress = ipAddress;
+  }
+
+  @Override
+  public String getCommand() {
+    return command;
+  }
+
+  @Override
+  public void setCommand(String command) {
+    this.command = command;
+  }
 }

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/QB.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/QB.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/QB.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/QB.java Thu Oct 30 16:22:33 2014
@@ -130,6 +130,10 @@ public class QB {
     return (outer_id == null ? alias : outer_id + ":" + alias);
   }
 
+  public String getAlias() {
+    return qbp.getAlias();
+  }
+
   public QBParseInfo getParseInfo() {
     return qbp;
   }
@@ -248,6 +252,12 @@ public class QB {
     return isQuery;
   }
 
+  // to decide whether to rewrite RR of subquery
+  public boolean isTopLevelSelectStarQuery() {
+    return !isCTAS() && qbp.isTopLevelSimpleSelectStarQuery();
+  }
+
+  // to find target for fetch task conversion optimizer (does not allow subqueries)
   public boolean isSimpleSelectQuery() {
     return qbp.isSimpleSelectQuery() && aliasToSubq.isEmpty() && !isCTAS() &&
         !qbp.isAnalyzeCommand();

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/QBParseInfo.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/QBParseInfo.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/QBParseInfo.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/QBParseInfo.java Thu Oct 30 16:22:33 2014
@@ -27,6 +27,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import org.antlr.runtime.tree.Tree;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.tableSpec;
@@ -449,39 +450,49 @@ public class QBParseInfo {
     this.outerQueryLimit = outerQueryLimit;
   }
 
+  public boolean isTopLevelSimpleSelectStarQuery() {
+    if (alias != null || destToSelExpr.size() != 1 || !isSimpleSelectQuery()) {
+      return false;
+    }
+    for (ASTNode selExprs : destToSelExpr.values()) {
+      if (selExprs.getChildCount() != 1) {
+        return false;
+      }
+      Tree sel = selExprs.getChild(0).getChild(0);
+      if (sel == null || sel.getType() != HiveParser.TOK_ALLCOLREF) {
+        return false;
+      }
+    }
+    return true;
+  }
+
   public boolean isSimpleSelectQuery() {
-    if (isSubQ || (joinExpr != null)
-        || (!destToGroupby.isEmpty()) || (!destToClusterby.isEmpty())
-        || (!aliasToLateralViews.isEmpty())) {
+    if (isSubQ || joinExpr != null || !destToOrderby.isEmpty() || !destToSortby.isEmpty()
+        || !destToGroupby.isEmpty() || !destToClusterby.isEmpty() || !destToDistributeby.isEmpty()
+        || !aliasToLateralViews.isEmpty() || !destToLateralView.isEmpty()) {
       return false;
     }
 
-    Iterator<Map.Entry<String, LinkedHashMap<String, ASTNode>>> aggrIter = destToAggregationExprs
-        .entrySet().iterator();
-    while (aggrIter.hasNext()) {
-      HashMap<String, ASTNode> h = aggrIter.next().getValue();
-      if ((h != null) && (!h.isEmpty())) {
+    for (Map<String, ASTNode> entry : destToAggregationExprs.values()) {
+      if (entry != null && !entry.isEmpty()) {
         return false;
       }
     }
 
-    if (!destToDistinctFuncExprs.isEmpty()) {
-      Iterator<Map.Entry<String, List<ASTNode>>> distn = destToDistinctFuncExprs
-          .entrySet().iterator();
-      while (distn.hasNext()) {
-        List<ASTNode> ct = distn.next().getValue();
-        if (!ct.isEmpty()) {
-          return false;
-        }
+    for (Map<String, ASTNode> entry : destToWindowingExprs.values()) {
+      if (entry != null && !entry.isEmpty()) {
+        return false;
+      }
+    }
+
+    for (List<ASTNode> ct : destToDistinctFuncExprs.values()) {
+      if (!ct.isEmpty()) {
+        return false;
       }
     }
 
-    Iterator<Map.Entry<String, ASTNode>> iter = nameToDest.entrySet()
-        .iterator();
-    while (iter.hasNext()) {
-      Map.Entry<String, ASTNode> entry = iter.next();
-      ASTNode v = entry.getValue();
-      if (!(((ASTNode)v.getChild(0)).getToken().getType() == HiveParser.TOK_TMP_FILE)) {
+    for (ASTNode v : nameToDest.values()) {
+      if (!(v.getChild(0).getType() == HiveParser.TOK_TMP_FILE)) {
         return false;
       }
     }

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/RowResolver.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/RowResolver.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/RowResolver.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/RowResolver.java Thu Oct 30 16:22:33 2014
@@ -29,7 +29,6 @@ import java.util.Set;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.exec.ColumnInfo;
 import org.apache.hadoop.hive.ql.exec.RowSchema;
 
@@ -120,7 +119,11 @@ public class RowResolver implements Seri
       f_map = new LinkedHashMap<String, ColumnInfo>();
       rslvMap.put(tab_alias, f_map);
     }
-    f_map.put(col_alias, colInfo);
+    ColumnInfo oldColInfo = f_map.put(col_alias, colInfo);
+    if (oldColInfo != null) {
+      LOG.warn("Duplicate column info for " + tab_alias + "." + col_alias
+          + " was overwritten in RowResolver map: " + oldColInfo + " by " + colInfo);
+    }
 
     String[] qualifiedAlias = new String[2];
     qualifiedAlias[0] = tab_alias;
@@ -195,17 +198,6 @@ public class RowResolver implements Seri
     return ret;
   }
 
-  /**
-   * check if column name is already exist in RR
-   */
-  public void checkColumn(String tableAlias, String columnAlias) throws SemanticException {
-    ColumnInfo prev = get(null, columnAlias);
-    if (prev != null &&
-        (tableAlias == null || !tableAlias.equalsIgnoreCase(prev.getTabAlias()))) {
-      throw new SemanticException(ErrorMsg.AMBIGUOUS_COLUMN.getMsg(columnAlias));
-    }
-  }
-
   public ArrayList<ColumnInfo> getColumnInfos() {
     return rowSchema.getSignature();
   }
@@ -351,40 +343,44 @@ public class RowResolver implements Seri
     this.expressionMap = expressionMap;
   }
 
+  private static class IntRef {
+    public int val = 0;
+  }
 
-  // TODO: 1) How to handle collisions? 2) Should we be cloning ColumnInfo or
-  // not?
-  public static int add(RowResolver rrToAddTo, RowResolver rrToAddFrom,
-      int outputColPos, int numColumns) throws SemanticException {
+  public static boolean add(RowResolver rrToAddTo, RowResolver rrToAddFrom, int numColumns)
+      throws SemanticException {
+    return add(rrToAddTo, rrToAddFrom, null, numColumns);
+  }
+
+  // TODO: 1) How to handle collisions? 2) Should we be cloning ColumnInfo or not?
+  private static boolean add(RowResolver rrToAddTo, RowResolver rrToAddFrom,
+      IntRef outputColPosRef, int numColumns) throws SemanticException {
+    boolean hasDuplicates = false;
     String tabAlias;
     String colAlias;
     String[] qualifiedColName;
     int i = 0;
 
+    int outputColPos = outputColPosRef == null ? 0 : outputColPosRef.val;
     for (ColumnInfo cInfoFrmInput : rrToAddFrom.getRowSchema().getSignature()) {
       if ( numColumns >= 0 && i == numColumns ) {
         break;
       }
       ColumnInfo newCI = null;
-      qualifiedColName = rrToAddFrom.getInvRslvMap().get(
-          cInfoFrmInput.getInternalName());
+      String internalName = cInfoFrmInput.getInternalName();
+      qualifiedColName = rrToAddFrom.reverseLookup(internalName);
       tabAlias = qualifiedColName[0];
       colAlias = qualifiedColName[1];
 
       newCI = new ColumnInfo(cInfoFrmInput);
-      newCI.setInternalName(SemanticAnalyzer
-          .getColumnInternalName(outputColPos));
+      newCI.setInternalName(SemanticAnalyzer.getColumnInternalName(outputColPos));
 
       outputColPos++;
 
-      if (rrToAddTo.get(tabAlias, colAlias) != null) {
-        LOG.debug("Found duplicate column alias in RR: " + rrToAddTo.get(tabAlias, colAlias));
-      } else {
-        rrToAddTo.put(tabAlias, colAlias, newCI);
-      }
+      boolean isUnique = rrToAddTo.putWithCheck(tabAlias, colAlias, internalName, newCI);
+      hasDuplicates |= (!isUnique);
 
-      qualifiedColName = rrToAddFrom.getAlternateMappings(cInfoFrmInput
-          .getInternalName());
+      qualifiedColName = rrToAddFrom.getAlternateMappings(internalName);
       if (qualifiedColName != null) {
         tabAlias = qualifiedColName[0];
         colAlias = qualifiedColName[1];
@@ -393,31 +389,73 @@ public class RowResolver implements Seri
       i++;
     }
 
-    return outputColPos;
-	}
+    if (outputColPosRef != null) {
+      outputColPosRef.val = outputColPos;
+    }
+    return !hasDuplicates;
+  }
 
-  public static int add(RowResolver rrToAddTo, RowResolver rrToAddFrom,
-      int outputColPos) throws SemanticException {
-    return add(rrToAddTo, rrToAddFrom, outputColPos, -1);
-  }
-
-	/**
-	 * Return a new row resolver that is combination of left RR and right RR.
-	 * The schema will be schema of left, schema of right
-	 *
-	 * @param leftRR
-	 * @param rightRR
-	 * @return
-	 * @throws SemanticException
-	 */
-	public static RowResolver getCombinedRR(RowResolver leftRR,
-			RowResolver rightRR) throws SemanticException {
-		int outputColPos = 0;
-
-		RowResolver combinedRR = new RowResolver();
-		outputColPos = add(combinedRR, leftRR, outputColPos);
-		outputColPos = add(combinedRR, rightRR, outputColPos);
+  /**
+   * Adds column to RR, checking for duplicate columns. Needed because CBO cannot handle the Hive
+   * behavior of blindly overwriting old mapping in RR and still somehow working after that.
+   * @return True if mapping was added without duplicates.
+   */
+  public boolean putWithCheck(String tabAlias, String colAlias,
+      String internalName, ColumnInfo newCI) throws SemanticException {
+    ColumnInfo existing = get(tabAlias, colAlias);
+    // Hive adds the same mapping twice... I wish we could fix stuff like that.
+    if (existing == null) {
+      put(tabAlias, colAlias, newCI);
+      return true;
+    } else if (existing.isSameColumnForRR(newCI)) {
+      return true;
+    }
+    LOG.warn("Found duplicate column alias in RR: "
+        + existing.toMappingString(tabAlias, colAlias) + " adding "
+        + newCI.toMappingString(tabAlias, colAlias));
+    if (internalName != null) {
+      existing = get(tabAlias, internalName);
+      if (existing == null) {
+        put(tabAlias, internalName, newCI);
+        return true;
+      } else if (existing.isSameColumnForRR(newCI)) {
+        return true;
+      }
+      LOG.warn("Failed to use internal name after finding a duplicate: "
+          + existing.toMappingString(tabAlias, internalName));
+    }
+    return false;
+  }
+
+  private static boolean add(RowResolver rrToAddTo, RowResolver rrToAddFrom,
+      IntRef outputColPosRef) throws SemanticException {
+    return add(rrToAddTo, rrToAddFrom, outputColPosRef, -1);
+  }
+
+  public static boolean add(RowResolver rrToAddTo, RowResolver rrToAddFrom)
+      throws SemanticException {
+    return add(rrToAddTo, rrToAddFrom, null, -1);
+  }
 
-		return combinedRR;
-	}
+  /**
+   * Return a new row resolver that is combination of left RR and right RR.
+   * The combined schema is the schema of left followed by the schema of right.
+   *
+   * @param leftRR
+   * @param rightRR
+   * @return
+   * @throws SemanticException
+   */
+  public static RowResolver getCombinedRR(RowResolver leftRR,
+      RowResolver rightRR) throws SemanticException {
+    RowResolver combinedRR = new RowResolver();
+    IntRef outputColPos = new IntRef();
+    if (!add(combinedRR, leftRR, outputColPos)) {
+      LOG.warn("Duplicates detected when adding columns to RR: see previous message");
+    }
+    if (!add(combinedRR, rightRR, outputColPos)) {
+      LOG.warn("Duplicates detected when adding columns to RR: see previous message");
+    }
+    return combinedRR;
+  }
 }