Posted to commits@hive.apache.org by br...@apache.org on 2014/08/10 03:33:55 UTC

svn commit: r1617040 [11/13] - in /hive/branches/spark: ./ ant/src/org/apache/hadoop/hive/ant/ beeline/ beeline/src/java/org/apache/hive/beeline/ common/src/java/org/apache/hadoop/hive/common/ common/src/java/org/apache/hadoop/hive/conf/ data/conf/ dat...

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/DataWritableReadSupport.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/DataWritableReadSupport.java?rev=1617040&r1=1617039&r2=1617040&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/DataWritableReadSupport.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/DataWritableReadSupport.java Sun Aug 10 01:33:50 2014
@@ -26,6 +26,7 @@ import org.apache.hadoop.hive.serde2.Col
 import org.apache.hadoop.io.ArrayWritable;
 import org.apache.hadoop.util.StringUtils;
 
+import parquet.column.ColumnDescriptor;
 import parquet.hadoop.api.ReadSupport;
 import parquet.io.api.RecordMaterializer;
 import parquet.schema.MessageType;
@@ -46,8 +47,8 @@ public class DataWritableReadSupport ext
 
   private static final String TABLE_SCHEMA = "table_schema";
   public static final String HIVE_SCHEMA_KEY = "HIVE_TABLE_SCHEMA";
-  public static final String PARQUET_COLUMN_INDEX_ACCESS = "parquet.column.index.access";  
-  
+  public static final String PARQUET_COLUMN_INDEX_ACCESS = "parquet.column.index.access";
+
   /**
    * From a string which columns names (including hive column), return a list
    * of string columns
@@ -75,12 +76,16 @@ public class DataWritableReadSupport ext
     final Map<String, String> contextMetadata = new HashMap<String, String>();
     if (columns != null) {
       final List<String> listColumns = getColumns(columns);
-
+      final Map<String, String> lowerCaseFileSchemaColumns = new HashMap<String,String>();
+      for (ColumnDescriptor c : fileSchema.getColumns()) {
+        lowerCaseFileSchemaColumns.put(c.getPath()[0].toLowerCase(), c.getPath()[0]);
+      }
       final List<Type> typeListTable = new ArrayList<Type>();
-      for (final String col : listColumns) {
+      for (String col : listColumns) {
+        col = col.toLowerCase();
         // listColumns contains partition columns which are metadata only
-        if (fileSchema.containsField(col)) {
-          typeListTable.add(fileSchema.getType(col));
+        if (lowerCaseFileSchemaColumns.containsKey(col)) {
+          typeListTable.add(fileSchema.getType(lowerCaseFileSchemaColumns.get(col)));
         } else {
           // below allows schema evolution
           typeListTable.add(new PrimitiveType(Repetition.OPTIONAL, PrimitiveTypeName.BINARY, col));
@@ -93,10 +98,24 @@ public class DataWritableReadSupport ext
       final List<Integer> indexColumnsWanted = ColumnProjectionUtils.getReadColumnIDs(configuration);
 
       final List<Type> typeListWanted = new ArrayList<Type>();
+      final boolean indexAccess = configuration.getBoolean(PARQUET_COLUMN_INDEX_ACCESS, false);
       for (final Integer idx : indexColumnsWanted) {
-        typeListWanted.add(tableSchema.getType(listColumns.get(idx)));
+        String col = listColumns.get(idx);
+        if (indexAccess) {
+          typeListWanted.add(tableSchema.getType(col));
+        } else {
+          col = col.toLowerCase();
+          if (lowerCaseFileSchemaColumns.containsKey(col)) {
+            typeListWanted.add(tableSchema.getType(lowerCaseFileSchemaColumns.get(col)));
+          } else {
+            // should never occur?
+            String msg = "Column " + col + " at index " + idx + " does not exist in " +
+              lowerCaseFileSchemaColumns;
+            throw new IllegalStateException(msg);
+          }
+        }
       }
-      requestedSchemaByUser = resolveSchemaAccess(new MessageType(fileSchema.getName(), 
+      requestedSchemaByUser = resolveSchemaAccess(new MessageType(fileSchema.getName(),
               typeListWanted), fileSchema, configuration);
 
       return new ReadContext(requestedSchemaByUser, contextMetadata);
@@ -127,29 +146,24 @@ public class DataWritableReadSupport ext
     }
     final MessageType tableSchema = resolveSchemaAccess(MessageTypeParser.
         parseMessageType(metadata.get(HIVE_SCHEMA_KEY)), fileSchema, configuration);
-    
     return new DataWritableRecordConverter(readContext.getRequestedSchema(), tableSchema);
   }
-  
+
   /**
-  * Determine the file column names based on the position within the requested columns and 
+  * Determine the file column names based on the position within the requested columns and
   * use that as the requested schema.
   */
-  private MessageType resolveSchemaAccess(MessageType requestedSchema, MessageType fileSchema, 
+  private MessageType resolveSchemaAccess(MessageType requestedSchema, MessageType fileSchema,
           Configuration configuration) {
-    if(configuration.getBoolean(PARQUET_COLUMN_INDEX_ACCESS, false)) {
+    if (configuration.getBoolean(PARQUET_COLUMN_INDEX_ACCESS, false)) {
       final List<String> listColumns = getColumns(configuration.get(IOConstants.COLUMNS));
-        
       List<Type> requestedTypes = new ArrayList<Type>();
-        
       for(Type t : requestedSchema.getFields()) {
         int index = listColumns.indexOf(t.getName());
         requestedTypes.add(fileSchema.getType(index));
       }
-          
       requestedSchema = new MessageType(requestedSchema.getName(), requestedTypes);
     }
-      
     return requestedSchema;
   }
 }
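
The change above resolves Hive's lower-cased column names against the Parquet file schema by building a map keyed on the lower-cased field names, and falls back to a placeholder type for columns the file does not contain (schema evolution). A minimal standalone sketch of that lookup, using plain Java collections and made-up column names:

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class CaseInsensitiveColumnResolution {
      public static void main(String[] args) {
        // Field names as the Parquet file footer stores them (illustrative only).
        List<String> fileSchemaColumns = Arrays.asList("Id", "EventTime", "Payload");

        // Hive hands the reader lower-cased column names.
        List<String> requestedColumns = Arrays.asList("id", "eventtime", "new_col");

        // Same idea as lowerCaseFileSchemaColumns above:
        // lower-cased name -> exact name stored in the file schema.
        Map<String, String> lowerCaseToActual = new HashMap<String, String>();
        for (String col : fileSchemaColumns) {
          lowerCaseToActual.put(col.toLowerCase(), col);
        }

        for (String col : requestedColumns) {
          String actual = lowerCaseToActual.get(col.toLowerCase());
          if (actual != null) {
            System.out.println(col + " -> read field '" + actual + "' from the file");
          } else {
            // Schema evolution: the column is not in this file, so the real code
            // substitutes an OPTIONAL BINARY placeholder type for it.
            System.out.println(col + " -> not in the file, use a placeholder column");
          }
        }
      }
    }

When parquet.column.index.access is set, the new code skips this name matching for the projected columns and resolves them by position instead.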

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java?rev=1617040&r1=1617039&r2=1617040&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java Sun Aug 10 01:33:50 2014
@@ -64,6 +64,7 @@ import org.apache.hadoop.hive.metastore.
 import org.apache.hadoop.hive.metastore.RetryingMetaStoreClient;
 import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.metastore.Warehouse;
+import org.apache.hadoop.hive.metastore.api.AggrStats;
 import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
 import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
 import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
@@ -1241,7 +1242,7 @@ public class Hive {
            */
           FileSystem oldPartPathFS = oldPartPath.getFileSystem(getConf());
           FileSystem loadPathFS = loadPath.getFileSystem(getConf());
-          if (oldPartPathFS.equals(loadPathFS)) {
+          if (FileUtils.equalsFileSystem(oldPartPathFS,loadPathFS)) {
             newPartPath = oldPartPath;
           }
         }
@@ -2576,6 +2577,16 @@ private void constructOneLBLocationMap(F
     }
   }
 
+  public AggrStats getAggrColStatsFor(String dbName, String tblName,
+    List<String> colNames, List<String> partName) {
+    try {
+      return getMSC().getAggrColStatsFor(dbName, tblName, colNames, partName);
+    } catch (Exception e) {
+      LOG.debug(StringUtils.stringifyException(e));
+      return null;
+    }
+  }
+
   public boolean deleteTableColumnStatistics(String dbName, String tableName, String colName)
     throws HiveException {
     try {
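
Two FileSystem instances obtained for the same cluster are not guaranteed to compare equal with equals(), so the check above that decides whether the old partition path can be reused now goes through FileUtils.equalsFileSystem(). A minimal sketch of the underlying idea, comparing file systems by the scheme and authority of their URIs (the URIs are invented, and the real helper may differ in detail):

    import java.net.URI;

    public class FileSystemUriComparison {
      // Two paths live on the same file system when their URIs agree on
      // scheme and authority; the path component is irrelevant.
      static boolean sameFileSystem(URI a, URI b) {
        return equalsIgnoreCaseOrBothNull(a.getScheme(), b.getScheme())
            && equalsIgnoreCaseOrBothNull(a.getAuthority(), b.getAuthority());
      }

      static boolean equalsIgnoreCaseOrBothNull(String x, String y) {
        return (x == null) ? (y == null) : x.equalsIgnoreCase(y);
      }

      public static void main(String[] args) {
        URI oldPartPath = URI.create("hdfs://nn1:8020/warehouse/t/ds=1");
        URI loadPath = URI.create("hdfs://nn1:8020/tmp/hive-staging/ds=1");
        URI otherCluster = URI.create("hdfs://nn2:8020/warehouse/t/ds=1");

        System.out.println(sameFileSystem(oldPartPath, loadPath));     // true
        System.out.println(sameFileSystem(oldPartPath, otherCluster)); // false
      }
    }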

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConstantPropagateProcFactory.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConstantPropagateProcFactory.java?rev=1617040&r1=1617039&r2=1617040&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConstantPropagateProcFactory.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConstantPropagateProcFactory.java Sun Aug 10 01:33:50 2014
@@ -348,9 +348,9 @@ public final class ConstantPropagateProc
         ExprNodeDesc childExpr = newExprs.get(i);
         if (childExpr instanceof ExprNodeConstantDesc) {
           ExprNodeConstantDesc c = (ExprNodeConstantDesc) childExpr;
-          if (c.getValue() == Boolean.TRUE) {
+          if (Boolean.TRUE.equals(c.getValue())) {
 
-            // if true, prune it
+        	  // if true, prune it
             return newExprs.get(Math.abs(i - 1));
           } else {
 
@@ -366,7 +366,7 @@ public final class ConstantPropagateProc
         ExprNodeDesc childExpr = newExprs.get(i);
         if (childExpr instanceof ExprNodeConstantDesc) {
           ExprNodeConstantDesc c = (ExprNodeConstantDesc) childExpr;
-          if (c.getValue() == Boolean.FALSE) {
+          if (Boolean.FALSE.equals(c.getValue())) {
 
             // if false, prune it
             return newExprs.get(Math.abs(i - 1));
@@ -565,10 +565,10 @@ public final class ConstantPropagateProc
       ExprNodeDesc newCondn = foldExpr(condn, constants, cppCtx, op, 0, true);
       if (newCondn instanceof ExprNodeConstantDesc) {
         ExprNodeConstantDesc c = (ExprNodeConstantDesc) newCondn;
-        if (c.getValue() == Boolean.TRUE) {
+        if (Boolean.TRUE.equals(c.getValue())) {
           cppCtx.addOpToDelete(op);
           LOG.debug("Filter expression " + condn + " holds true. Will delete it.");
-        } else if (c.getValue() == Boolean.FALSE) {
+        } else if (Boolean.FALSE.equals(c.getValue())) {
           LOG.warn("Filter expression " + condn + " holds false!");
         }
       }
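
The folding logic above switches from reference comparison against Boolean.TRUE/Boolean.FALSE to value comparison. ExprNodeConstantDesc.getValue() returns an Object, so == only matches when the constant happens to be the canonical boxed instance and never matches a separately boxed Boolean, whereas Boolean.TRUE.equals(...) compares by value and is also null-safe. A tiny illustration:

    public class BooleanComparison {
      public static void main(String[] args) {
        Object canonical = Boolean.TRUE;        // the interned constant
        Object boxedCopy = new Boolean(true);   // a distinct Boolean instance
        Object absent = null;                   // e.g. a constant that folded to null

        System.out.println(canonical == Boolean.TRUE);       // true
        System.out.println(boxedCopy == Boolean.TRUE);        // false: reference compare
        System.out.println(Boolean.TRUE.equals(boxedCopy));   // true: value compare
        System.out.println(Boolean.TRUE.equals(absent));      // false, and no NPE
      }
    }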

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java?rev=1617040&r1=1617039&r2=1617040&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java Sun Aug 10 01:33:50 2014
@@ -36,6 +36,7 @@ import org.apache.hadoop.hive.conf.HiveC
 import org.apache.hadoop.hive.ql.exec.*;
 import org.apache.hadoop.hive.ql.exec.mr.MapRedTask;
 import org.apache.hadoop.hive.ql.exec.tez.TezTask;
+import org.apache.hadoop.hive.ql.exec.vector.VectorExtractOperator;
 import org.apache.hadoop.hive.ql.exec.vector.VectorExpressionDescriptor;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizationContextRegion;
@@ -62,9 +63,12 @@ import org.apache.hadoop.hive.ql.plan.Ex
 import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
 import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
 import org.apache.hadoop.hive.ql.plan.MapWork;
+import org.apache.hadoop.hive.ql.plan.MapredWork;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.PartitionDesc;
+import org.apache.hadoop.hive.ql.plan.ReduceWork;
 import org.apache.hadoop.hive.ql.plan.SMBJoinDesc;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
 import org.apache.hadoop.hive.ql.plan.TableScanDesc;
 import org.apache.hadoop.hive.ql.plan.TezWork;
 import org.apache.hadoop.hive.ql.plan.api.OperatorType;
@@ -107,6 +111,12 @@ import org.apache.hadoop.hive.ql.udf.UDF
 import org.apache.hadoop.hive.ql.udf.UDFWeekOfYear;
 import org.apache.hadoop.hive.ql.udf.UDFYear;
 import org.apache.hadoop.hive.ql.udf.generic.*;
+import org.apache.hadoop.hive.serde2.Deserializer;
+import org.apache.hadoop.hive.serde2.SerDe;
+import org.apache.hadoop.hive.serde2.SerDeUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.util.ReflectionUtils;
 
 public class Vectorizer implements PhysicalPlanResolver {
 
@@ -133,6 +143,7 @@ public class Vectorizer implements Physi
     patternBuilder.append("|short");
     patternBuilder.append("|timestamp");
     patternBuilder.append("|boolean");
+    patternBuilder.append("|binary");
     patternBuilder.append("|string");
     patternBuilder.append("|byte");
     patternBuilder.append("|float");
@@ -256,7 +267,15 @@ public class Vectorizer implements Physi
 
   class VectorizationDispatcher implements Dispatcher {
 
+    private PhysicalContext pctx;
+
+    private int keyColCount;
+    private int valueColCount;
+
     public VectorizationDispatcher(PhysicalContext pctx) {
+      this.pctx = pctx;
+      keyColCount = 0;
+      valueColCount = 0;
     }
 
     @Override
@@ -270,6 +289,9 @@ public class Vectorizer implements Physi
         for (BaseWork w: work.getAllWork()) {
           if (w instanceof MapWork) {
             convertMapWork((MapWork)w);
+          } else if (w instanceof ReduceWork) {
+            // We are only vectorizing Reduce under Tez.
+            convertReduceWork((ReduceWork)w);
           }
         }
       }
@@ -283,6 +305,13 @@ public class Vectorizer implements Physi
       }
     }
 
+    private void addMapWorkRules(Map<Rule, NodeProcessor> opRules, NodeProcessor np) {
+      opRules.put(new RuleRegExp("R1", TableScanOperator.getOperatorName() + ".*"
+          + FileSinkOperator.getOperatorName()), np);
+      opRules.put(new RuleRegExp("R2", TableScanOperator.getOperatorName() + ".*"
+          + ReduceSinkOperator.getOperatorName()), np);
+    }
+
     private boolean validateMapWork(MapWork mapWork) throws SemanticException {
 
       // Validate the input format
@@ -297,11 +326,8 @@ public class Vectorizer implements Physi
         }
       }
       Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
-      ValidationNodeProcessor vnp = new ValidationNodeProcessor();
-      opRules.put(new RuleRegExp("R1", TableScanOperator.getOperatorName() + ".*"
-          + FileSinkOperator.getOperatorName()), vnp);
-      opRules.put(new RuleRegExp("R2", TableScanOperator.getOperatorName() + ".*"
-          + ReduceSinkOperator.getOperatorName()), vnp);
+      MapWorkValidationNodeProcessor vnp = new MapWorkValidationNodeProcessor();
+      addMapWorkRules(opRules, vnp);
       Dispatcher disp = new DefaultRuleDispatcher(vnp, opRules, null);
       GraphWalker ogw = new DefaultGraphWalker(disp);
       // iterator the mapper operator tree
@@ -320,14 +346,11 @@ public class Vectorizer implements Physi
     }
 
     private void vectorizeMapWork(MapWork mapWork) throws SemanticException {
-      LOG.info("Vectorizing task...");
+      LOG.info("Vectorizing MapWork...");
       mapWork.setVectorMode(true);
       Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
-      VectorizationNodeProcessor vnp = new VectorizationNodeProcessor(mapWork);
-      opRules.put(new RuleRegExp("R1", TableScanOperator.getOperatorName() + ".*" +
-          ReduceSinkOperator.getOperatorName()), vnp);
-      opRules.put(new RuleRegExp("R2", TableScanOperator.getOperatorName() + ".*"
-          + FileSinkOperator.getOperatorName()), vnp);
+      MapWorkVectorizationNodeProcessor vnp = new MapWorkVectorizationNodeProcessor(mapWork);
+      addMapWorkRules(opRules, vnp);
       Dispatcher disp = new DefaultRuleDispatcher(vnp, opRules, null);
       GraphWalker ogw = new PreOrderWalker(disp);
       // iterator the mapper operator tree
@@ -348,9 +371,114 @@ public class Vectorizer implements Physi
 
       return;
     }
+
+    private void convertReduceWork(ReduceWork reduceWork) throws SemanticException {
+      boolean ret = validateReduceWork(reduceWork);
+      if (ret) {
+        vectorizeReduceWork(reduceWork);
+      }
+    }
+
+    private boolean getOnlyStructObjectInspectors(ReduceWork reduceWork) throws SemanticException {
+      try {
+        // Check key ObjectInspector.
+        ObjectInspector keyObjectInspector = reduceWork.getKeyObjectInspector();
+        if (keyObjectInspector == null || !(keyObjectInspector instanceof StructObjectInspector)) {
+          return false;
+        }
+        StructObjectInspector keyStructObjectInspector = (StructObjectInspector)keyObjectInspector;
+        keyColCount = keyStructObjectInspector.getAllStructFieldRefs().size();
+
+        // Tez doesn't use tagging...
+        if (reduceWork.getNeedsTagging()) {
+          return false;
+        }
+
+        // Check value ObjectInspector.
+        ObjectInspector valueObjectInspector = reduceWork.getValueObjectInspector();
+        if (valueObjectInspector == null || !(valueObjectInspector instanceof StructObjectInspector)) {
+          return false;
+        }
+        StructObjectInspector valueStructObjectInspector = (StructObjectInspector)valueObjectInspector;
+        valueColCount = valueStructObjectInspector.getAllStructFieldRefs().size();
+      } catch (Exception e) {
+        throw new SemanticException(e);
+      }
+      return true;
+    }
+
+    private void addReduceWorkRules(Map<Rule, NodeProcessor> opRules, NodeProcessor np) {
+      opRules.put(new RuleRegExp("R1", ExtractOperator.getOperatorName() + ".*"), np);
+      opRules.put(new RuleRegExp("R2", GroupByOperator.getOperatorName() + ".*"), np);
+    }
+
+    private boolean validateReduceWork(ReduceWork reduceWork) throws SemanticException {
+      // Validate input to ReduceWork.
+      if (!getOnlyStructObjectInspectors(reduceWork)) {
+        return false;
+      }
+      // Now check the reduce operator tree.
+      Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
+      ReduceWorkValidationNodeProcessor vnp = new ReduceWorkValidationNodeProcessor();
+      addReduceWorkRules(opRules, vnp);
+      Dispatcher disp = new DefaultRuleDispatcher(vnp, opRules, null);
+      GraphWalker ogw = new DefaultGraphWalker(disp);
+      // iterate over the reduce operator tree
+      ArrayList<Node> topNodes = new ArrayList<Node>();
+      topNodes.add(reduceWork.getReducer());
+      HashMap<Node, Object> nodeOutput = new HashMap<Node, Object>();
+      ogw.startWalking(topNodes, nodeOutput);
+      for (Node n : nodeOutput.keySet()) {
+        if (nodeOutput.get(n) != null) {
+          if (!((Boolean)nodeOutput.get(n)).booleanValue()) {
+            return false;
+          }
+        }
+      }
+      return true;
+    }
+
+    private void vectorizeReduceWork(ReduceWork reduceWork) throws SemanticException {
+      LOG.info("Vectorizing ReduceWork...");
+      reduceWork.setVectorMode(true);
+ 
+      // For some reason, the DefaultGraphWalker does not descend from the reducer Operator as expected.
+      // We need to descend, otherwise it breaks our algorithm that determines the VectorizationContext...
+      // So we use PreOrderWalker instead of DefaultGraphWalker.
+      Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
+      ReduceWorkVectorizationNodeProcessor vnp = new ReduceWorkVectorizationNodeProcessor(reduceWork, keyColCount, valueColCount);
+      addReduceWorkRules(opRules, vnp);
+      Dispatcher disp = new DefaultRuleDispatcher(vnp, opRules, null);
+      GraphWalker ogw = new PreOrderWalker(disp);
+      // iterate over the reduce operator tree
+      ArrayList<Node> topNodes = new ArrayList<Node>();
+      topNodes.add(reduceWork.getReducer());
+      LOG.info("vectorizeReduceWork reducer Operator: " + reduceWork.getReducer().getName() + "...");
+      HashMap<Node, Object> nodeOutput = new HashMap<Node, Object>();
+      ogw.startWalking(topNodes, nodeOutput);
+
+      // Necessary since we are vectorizing the root operator in reduce.
+      reduceWork.setReducer(vnp.getRootVectorOp());
+
+      Operator<? extends OperatorDesc> reducer = reduceWork.getReducer();
+      if (reducer.getType().equals(OperatorType.EXTRACT)) {
+        ((VectorExtractOperator)reducer).setKeyAndValueColCounts(keyColCount, valueColCount);
+      }
+
+      Map<String, Map<Integer, String>> columnVectorTypes = vnp.getScratchColumnVectorTypes();
+      reduceWork.setScratchColumnVectorTypes(columnVectorTypes);
+      Map<String, Map<String, Integer>> columnMap = vnp.getScratchColumnMap();
+      reduceWork.setScratchColumnMap(columnMap);
+
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(String.format("vectorTypes: %s", columnVectorTypes.toString()));
+        LOG.debug(String.format("columnMap: %s", columnMap.toString()));
+      }
+    }
   }
 
-  class ValidationNodeProcessor implements NodeProcessor {
+  class MapWorkValidationNodeProcessor implements NodeProcessor {
 
     @Override
     public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
@@ -361,9 +489,9 @@ public class Vectorizer implements Physi
             op.getParentOperators().get(0).getType().equals(OperatorType.GROUPBY)) {
           return new Boolean(true);
         }
-        boolean ret = validateOperator(op);
+        boolean ret = validateMapWorkOperator(op);
         if (!ret) {
-          LOG.info("Operator: " + op.getName() + " could not be vectorized.");
+          LOG.info("MapWork Operator: " + op.getName() + " could not be vectorized.");
           return new Boolean(false);
         }
       }
@@ -371,24 +499,37 @@ public class Vectorizer implements Physi
     }
   }
 
-  class VectorizationNodeProcessor implements NodeProcessor {
+  class ReduceWorkValidationNodeProcessor implements NodeProcessor {
 
-    private final MapWork mWork;
+    @Override
+    public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
+        Object... nodeOutputs) throws SemanticException {
+      for (Node n : stack) {
+        Operator<? extends OperatorDesc> op = (Operator<? extends OperatorDesc>) n;
+        boolean ret = validateReduceWorkOperator(op);
+        if (!ret) {
+          LOG.info("ReduceWork Operator: " + op.getName() + " could not be vectorized.");
+          return new Boolean(false);
+        }
+      }
+      return new Boolean(true);
+    }
+  }
+
+  // This class has common code used by both MapWorkVectorizationNodeProcessor and
+  // ReduceWorkVectorizationNodeProcessor.
+  class VectorizationNodeProcessor implements NodeProcessor {
 
     // This is used to extract scratch column types for each file key
-    private final Map<String, VectorizationContext> scratchColumnContext =
+    protected final Map<String, VectorizationContext> scratchColumnContext =
         new HashMap<String, VectorizationContext>();
 
-    private final Map<Operator<? extends OperatorDesc>, VectorizationContext> vContextsByTSOp =
+    protected final Map<Operator<? extends OperatorDesc>, VectorizationContext> vContextsByTSOp =
         new HashMap<Operator<? extends OperatorDesc>, VectorizationContext>();
 
-    private final Set<Operator<? extends OperatorDesc>> opsDone =
+    protected final Set<Operator<? extends OperatorDesc>> opsDone =
         new HashSet<Operator<? extends OperatorDesc>>();
 
-    public VectorizationNodeProcessor(MapWork mWork) {
-      this.mWork = mWork;
-    }
-
     public Map<String, Map<Integer, String>> getScratchColumnVectorTypes() {
       Map<String, Map<Integer, String>> scratchColumnVectorTypes =
           new HashMap<String, Map<Integer, String>>();
@@ -411,16 +552,90 @@ public class Vectorizer implements Physi
       return scratchColumnMap;
     }
 
+    public VectorizationContext walkStackToFindVectorizationContext(Stack<Node> stack, Operator<? extends OperatorDesc> op)
+            throws SemanticException {
+      VectorizationContext vContext = null;
+      if (stack.size() <= 1) {
+        throw new SemanticException(String.format("Expected operator stack for operator %s to have at least 2 operators", op.getName()));
+      }
+      // Walk down the stack of operators until we find one willing to give us a context.
+      // At the bottom will be the root operator, guaranteed to have a context
+      int i= stack.size()-2;
+      while (vContext == null) {
+        if (i < 0) {
+          throw new SemanticException(String.format("Did not find vectorization context for operator %s in operator stack", op.getName()));
+        }
+        Operator<? extends OperatorDesc> opParent = (Operator<? extends OperatorDesc>) stack.get(i);
+        vContext = vContextsByTSOp.get(opParent);
+        --i;
+      }
+      return vContext;
+    }
+
+    public Boolean nonVectorizableChildOfGroupBy(Operator<? extends OperatorDesc> op) {
+      Operator<? extends OperatorDesc> currentOp = op;
+      while (currentOp.getParentOperators().size() > 0) {
+        currentOp = currentOp.getParentOperators().get(0);
+        if (currentOp.getType().equals(OperatorType.GROUPBY)) {
+          // No need to vectorize
+          if (!opsDone.contains(op)) {
+            opsDone.add(op);
+          }
+          return true;
+        }
+      }
+      return false;
+    }
+
+    public Operator<? extends OperatorDesc> doVectorize(Operator<? extends OperatorDesc> op, VectorizationContext vContext)
+            throws SemanticException {
+      Operator<? extends OperatorDesc> vectorOp = op;
+      try {
+        if (!opsDone.contains(op)) {
+          vectorOp = vectorizeOperator(op, vContext);
+          opsDone.add(op);
+          if (vectorOp != op) {
+            opsDone.add(vectorOp);
+          }
+          if (vectorOp instanceof VectorizationContextRegion) {
+            VectorizationContextRegion vcRegion = (VectorizationContextRegion) vectorOp;
+            VectorizationContext vOutContext = vcRegion.getOuputVectorizationContext();
+            vContextsByTSOp.put(op, vOutContext);
+            scratchColumnContext.put(vOutContext.getFileKey(), vOutContext);
+          }
+        }
+      } catch (HiveException e) {
+        throw new SemanticException(e);
+      }
+      return vectorOp;
+    }
+
+    @Override
+    public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
+        Object... nodeOutputs) throws SemanticException {
+      throw new SemanticException("Must be overridden");
+    }
+  }
+  
+  class MapWorkVectorizationNodeProcessor extends VectorizationNodeProcessor {
+
+    private final MapWork mWork;
+
+    public MapWorkVectorizationNodeProcessor(MapWork mWork) {
+      this.mWork = mWork;
+    }
+
     @Override
     public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
         Object... nodeOutputs) throws SemanticException {
 
       Operator<? extends OperatorDesc> op = (Operator<? extends OperatorDesc>) nd;
+      LOG.info("MapWorkVectorizationNodeProcessor processing Operator: " + op.getName() + "...");
 
       VectorizationContext vContext = null;
 
       if (op instanceof TableScanOperator) {
-        vContext = getVectorizationContext((TableScanOperator) op, physicalContext);
+        vContext = getVectorizationContext(op, physicalContext);
         for (String onefile : mWork.getPathToAliases().keySet()) {
           List<String> aliases = mWork.getPathToAliases().get(onefile);
           for (String alias : aliases) {
@@ -438,45 +653,76 @@ public class Vectorizer implements Physi
         }
         vContextsByTSOp.put(op, vContext);
       } else {
-        assert stack.size() > 1;
-        // Walk down the stack of operators until we found one willing to give us a context.
-        // At the bottom will be the TS operator, guaranteed to have a context
-        int i= stack.size()-2;
-        while (vContext == null) {
-          Operator<? extends OperatorDesc> opParent = (Operator<? extends OperatorDesc>) stack.get(i);
-          vContext = vContextsByTSOp.get(opParent);
-          --i;
-        }
+        vContext = walkStackToFindVectorizationContext(stack, op);
       }
 
       assert vContext != null;
 
-      if ((op.getType().equals(OperatorType.REDUCESINK) || op.getType().equals(OperatorType.FILESINK)) &&
-          op.getParentOperators().get(0).getType().equals(OperatorType.GROUPBY)) {
-        // No need to vectorize
-        if (!opsDone.contains(op)) {
-          opsDone.add(op);
-        }
+      // Currently, Vectorized GROUPBY outputs rows, not vectorized row batches.  So, don't vectorize
+      // any operators below GROUPBY.
+      if (nonVectorizableChildOfGroupBy(op)) {
+        return null;
+      }
+
+      doVectorize(op, vContext);
+
+      return null;
+    }
+  }
+
+  class ReduceWorkVectorizationNodeProcessor extends VectorizationNodeProcessor {
+
+    private final ReduceWork rWork;
+    private int keyColCount;
+    private int valueColCount;
+    private Map<String, Integer> reduceColumnNameMap;
+
+    private Operator<? extends OperatorDesc> rootVectorOp;
+
+    public Operator<? extends OperatorDesc> getRootVectorOp() {
+      return rootVectorOp;
+    }
+
+    public ReduceWorkVectorizationNodeProcessor(ReduceWork rWork, int keyColCount, int valueColCount) {
+      this.rWork = rWork;
+      reduceColumnNameMap = rWork.getReduceColumnNameMap();
+      this.keyColCount = keyColCount;
+      this.valueColCount = valueColCount;
+      rootVectorOp = null;
+    }
+
+    @Override
+    public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
+        Object... nodeOutputs) throws SemanticException {
+
+      Operator<? extends OperatorDesc> op = (Operator<? extends OperatorDesc>) nd;
+      LOG.info("ReduceWorkVectorizationNodeProcessor processing Operator: " + op.getName() + "...");
+
+      VectorizationContext vContext = null;
+
+      boolean saveRootVectorOp = false;
+
+      if (op.getParentOperators().size() == 0) {
+        vContext = getReduceVectorizationContext(reduceColumnNameMap);
+        vContextsByTSOp.put(op, vContext);
+        saveRootVectorOp = true;
       } else {
-        try {
-          if (!opsDone.contains(op)) {
-            Operator<? extends OperatorDesc> vectorOp =
-                vectorizeOperator(op, vContext);
-            opsDone.add(op);
-            if (vectorOp != op) {
-              opsDone.add(vectorOp);
-            }
-            if (vectorOp instanceof VectorizationContextRegion) {
-              VectorizationContextRegion vcRegion = (VectorizationContextRegion) vectorOp;
-              VectorizationContext vOutContext = vcRegion.getOuputVectorizationContext();
-              vContextsByTSOp.put(op, vOutContext);
-              scratchColumnContext.put(vOutContext.getFileKey(), vOutContext);
-            }
-          }
-        } catch (HiveException e) {
-          throw new SemanticException(e);
-        }
+        vContext = walkStackToFindVectorizationContext(stack, op);
+      }
+
+      assert vContext != null;
+
+      // Currently, Vectorized GROUPBY outputs rows, not vectorized row batches.  So, don't vectorize
+      // any operators below GROUPBY.
+      if (nonVectorizableChildOfGroupBy(op)) {
+        return null;
       }
+
+      Operator<? extends OperatorDesc> vectorOp = doVectorize(op, vContext);
+      if (saveRootVectorOp && op != vectorOp) {
+        rootVectorOp = vectorOp;
+      }
+
       return null;
     }
   }
@@ -519,7 +765,7 @@ public class Vectorizer implements Physi
     return pctx;
   }
 
-  boolean validateOperator(Operator<? extends OperatorDesc> op) {
+  boolean validateMapWorkOperator(Operator<? extends OperatorDesc> op) {
     boolean ret = false;
     switch (op.getType()) {
       case MAPJOIN:
@@ -555,6 +801,32 @@ public class Vectorizer implements Physi
     return ret;
   }
 
+  boolean validateReduceWorkOperator(Operator<? extends OperatorDesc> op) {
+    boolean ret = false;
+    switch (op.getType()) {
+      case EXTRACT:
+        ret = validateExtractOperator((ExtractOperator) op);
+        break;
+      case FILTER:
+        ret = validateFilterOperator((FilterOperator) op);
+        break;
+      case SELECT:
+        ret = validateSelectOperator((SelectOperator) op);
+        break;
+      case REDUCESINK:
+          ret = validateReduceSinkOperator((ReduceSinkOperator) op);
+          break;
+      case FILESINK:
+      case LIMIT:
+        ret = true;
+        break;
+      default:
+        ret = false;
+        break;
+    }
+    return ret;
+  }
+
   private boolean validateSMBMapJoinOperator(SMBMapJoinOperator op) {
     SMBJoinDesc desc = op.getConf();
     // Validation is the same as for map join, since the 'small' tables are not vectorized
@@ -617,6 +889,15 @@ public class Vectorizer implements Physi
     return validateAggregationDesc(op.getConf().getAggregators());
   }
 
+  private boolean validateExtractOperator(ExtractOperator op) {
+    ExprNodeDesc expr = op.getConf().getCol();
+    boolean ret = validateExprNodeDesc(expr);
+    if (!ret) {
+      return false;
+    }
+    return true;
+  }
+
   private boolean validateExprNodeDesc(List<ExprNodeDesc> descs) {
     return validateExprNodeDesc(descs, VectorExpressionDescriptor.Mode.PROJECTION);
   }
@@ -728,7 +1009,7 @@ public class Vectorizer implements Physi
     return supportedDataTypesPattern.matcher(type.toLowerCase()).matches();
   }
 
-  private VectorizationContext getVectorizationContext(TableScanOperator op,
+  private VectorizationContext getVectorizationContext(Operator op,
       PhysicalContext pctx) {
     RowSchema rs = op.getSchema();
 
@@ -741,8 +1022,26 @@ public class Vectorizer implements Physi
       }
     }
 
-    VectorizationContext vc =  new VectorizationContext(cmap, columnCount);
-    return vc;
+    return new VectorizationContext(cmap, columnCount);
+  }
+
+  private VectorizationContext getReduceVectorizationContext(Map<String, Integer> reduceColumnNameMap) {
+    return new VectorizationContext(reduceColumnNameMap, reduceColumnNameMap.size());
+  }
+
+  private void fixupParentChildOperators(Operator<? extends OperatorDesc> op, Operator<? extends OperatorDesc> vectorOp) {
+    if (op.getParentOperators() != null) {
+      vectorOp.setParentOperators(op.getParentOperators());
+      for (Operator<? extends OperatorDesc> p : op.getParentOperators()) {
+        p.replaceChild(op, vectorOp);
+      }
+    }
+    if (op.getChildOperators() != null) {
+      vectorOp.setChildOperators(op.getChildOperators());
+      for (Operator<? extends OperatorDesc> c : op.getChildOperators()) {
+        c.replaceParent(op, vectorOp);
+      }
+    }
   }
 
   Operator<? extends OperatorDesc> vectorizeOperator(Operator<? extends OperatorDesc> op,
@@ -757,6 +1056,7 @@ public class Vectorizer implements Physi
       case FILESINK:
       case REDUCESINK:
       case LIMIT:
+      case EXTRACT:
         vectorOp = OperatorFactory.getVectorOperator(op.getConf(), vContext);
         break;
       default:
@@ -765,18 +1065,7 @@ public class Vectorizer implements Physi
     }
 
     if (vectorOp != op) {
-      if (op.getParentOperators() != null) {
-        vectorOp.setParentOperators(op.getParentOperators());
-        for (Operator<? extends OperatorDesc> p : op.getParentOperators()) {
-          p.replaceChild(op, vectorOp);
-        }
-      }
-      if (op.getChildOperators() != null) {
-        vectorOp.setChildOperators(op.getChildOperators());
-        for (Operator<? extends OperatorDesc> c : op.getChildOperators()) {
-          c.replaceParent(op, vectorOp);
-        }
-      }
+      fixupParentChildOperators(op, vectorOp);
       ((AbstractOperatorDesc) vectorOp.getConf()).setVectorMode(true);
     }
     return vectorOp;
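
A pattern shared by the new map-side and reduce-side processors above is walkStackToFindVectorizationContext(): starting from the current operator's parent, walk toward the root of the walker's stack until some ancestor has registered a VectorizationContext (the root operator is expected to have one). A stripped-down sketch of that walk, with strings standing in for operators and contexts:

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class ContextStackWalk {
      public static void main(String[] args) {
        // Walker stack, root operator first, current operator last
        // (operator names are illustrative only).
        List<String> stack = Arrays.asList("TS", "FIL", "SEL", "RS");

        // Contexts registered so far; here only the root operator has one.
        Map<String, String> contextsByOp = new HashMap<String, String>();
        contextsByOp.put("TS", "vectorization context created at TS");

        // Start at the parent of the current operator and walk toward the root
        // until some ancestor yields a context.
        String vContext = null;
        int i = stack.size() - 2;
        while (vContext == null) {
          if (i < 0) {
            throw new IllegalStateException("no vectorization context found on the stack");
          }
          vContext = contextsByOp.get(stack.get(i));
          --i;
        }
        System.out.println("RS inherits: " + vContext);
      }
    }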

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/stats/annotation/AnnotateWithStatistics.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/stats/annotation/AnnotateWithStatistics.java?rev=1617040&r1=1617039&r2=1617040&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/stats/annotation/AnnotateWithStatistics.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/stats/annotation/AnnotateWithStatistics.java Sun Aug 10 01:33:50 2014
@@ -27,6 +27,7 @@ import org.apache.hadoop.hive.ql.exec.Fi
 import org.apache.hadoop.hive.ql.exec.GroupByOperator;
 import org.apache.hadoop.hive.ql.exec.LimitOperator;
 import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
 import org.apache.hadoop.hive.ql.exec.SelectOperator;
 import org.apache.hadoop.hive.ql.exec.TableScanOperator;
 import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
@@ -62,6 +63,8 @@ public class AnnotateWithStatistics impl
         + MapJoinOperator.getOperatorName() + "%"), StatsRulesProcFactory.getJoinRule());
     opRules.put(new RuleRegExp("LIM", LimitOperator.getOperatorName() + "%"),
         StatsRulesProcFactory.getLimitRule());
+    opRules.put(new RuleRegExp("RS", ReduceSinkOperator.getOperatorName() + "%"),
+        StatsRulesProcFactory.getReduceSinkRule());
 
     // The dispatcher fires the processor corresponding to the closest matching
     // rule and passes the context along
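
The rule registered above is defined further down in StatsRulesProcFactory: ReduceSink does not change the statistics themselves, but it renames the parent's column statistics to the ReduceSink output names, which are recorded in the column-expression map with KEY. and VALUE. prefixes. A simplified sketch of that renaming, using invented column names and plain NDV counts in place of ColStatistics objects:

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;

    public class ReduceSinkColumnRename {
      public static void main(String[] args) {
        // Parent column statistics keyed by the parent's column names (invented).
        Map<String, Long> parentNdv = new HashMap<String, Long>();
        parentNdv.put("userid", 1000L);
        parentNdv.put("amount", 250L);

        // ReduceSink output columns and the parent columns they come from, as the
        // column-expression map records them with KEY./VALUE. prefixes.
        Map<String, String> colExprMap = new LinkedHashMap<String, String>();
        colExprMap.put("KEY._col0", "userid");
        colExprMap.put("VALUE._col1", "amount");

        List<String> outKeyColNames = Arrays.asList("_col0");
        List<String> outValueColNames = Arrays.asList("_col1");

        // Rename the parent statistics to the output names so downstream
        // operators can look them up directly.
        Map<String, Long> outStats = new LinkedHashMap<String, Long>();
        for (String key : outKeyColNames) {
          String parentCol = colExprMap.get("KEY." + key);
          if (parentCol != null && parentNdv.containsKey(parentCol)) {
            outStats.put(key, parentNdv.get(parentCol));
          }
        }
        for (String val : outValueColNames) {
          String parentCol = colExprMap.get("VALUE." + val);
          if (parentCol != null && parentNdv.containsKey(parentCol)) {
            outStats.put(val, parentNdv.get(parentCol));
          }
        }
        System.out.println(outStats);   // {_col0=1000, _col1=250}
      }
    }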

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/stats/annotation/StatsRulesProcFactory.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/stats/annotation/StatsRulesProcFactory.java?rev=1617040&r1=1617039&r2=1617040&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/stats/annotation/StatsRulesProcFactory.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/stats/annotation/StatsRulesProcFactory.java Sun Aug 10 01:33:50 2014
@@ -100,9 +100,9 @@ public class StatsRulesProcFactory {
       }
       Table table = aspCtx.getParseContext().getTopToTable().get(tsop);
 
-      // gather statistics for the first time and the attach it to table scan operator
-      Statistics stats = StatsUtils.collectStatistics(aspCtx.getConf(), partList, table, tsop);
       try {
+        // gather statistics for the first time and the attach it to table scan operator
+        Statistics stats = StatsUtils.collectStatistics(aspCtx.getConf(), partList, table, tsop);
         tsop.setStatistics(stats.clone());
 
         if (LOG.isDebugEnabled()) {
@@ -110,6 +110,9 @@ public class StatsRulesProcFactory {
         }
       } catch (CloneNotSupportedException e) {
         throw new SemanticException(ErrorMsg.STATISTICS_CLONING_FAILED.getMsg());
+      } catch (HiveException e) {
+        LOG.debug(e);
+        throw new SemanticException(e);
       }
       return null;
     }
@@ -598,12 +601,18 @@ public class StatsRulesProcFactory {
               }
               dvProd *= dv;
             } else {
-
-              // partial column statistics on grouping attributes case.
-              // if column statistics on grouping attribute is missing, then
-              // assume worst case.
-              // GBY rule will emit half the number of rows if dvProd is 0
-              dvProd = 0;
+              if (parentStats.getColumnStatsState().equals(Statistics.State.COMPLETE)) {
+                // the column must be an aggregate column inserted by GBY. We
+                // don't have to account for this column when computing product
+                // of NDVs
+                continue;
+              } else {
+                // partial column statistics on grouping attributes case.
+                // if column statistics on grouping attribute is missing, then
+                // assume worst case.
+                // GBY rule will emit half the number of rows if dvProd is 0
+                dvProd = 0;
+              }
               break;
             }
           }
@@ -684,7 +693,17 @@ public class StatsRulesProcFactory {
               aggColStats.add(cs);
             }
           }
-          stats.addToColumnStats(aggColStats);
+
+          // add the new aggregate column and recompute data size
+          if (aggColStats.size() > 0) {
+            stats.addToColumnStats(aggColStats);
+
+            // only if the column stats is available, update the data size from
+            // the column stats
+            if (!stats.getColumnStatsState().equals(Statistics.State.NONE)) {
+              updateStats(stats, stats.getNumRows(), true);
+            }
+          }
 
           // if UDAF present and if column expression map is empty then it must
           // be full aggregation query like count(*) in which case number of
@@ -731,15 +750,24 @@ public class StatsRulesProcFactory {
    * <p>
    * In the absence of histograms, we can use the following general case
    * <p>
-   * <b>Single attribute</b>
+   * <b>2 Relations, 1 attribute</b>
    * <p>
    * T(RXS) = (T(R)*T(S))/max(V(R,Y), V(S,Y)) where Y is the join attribute
    * <p>
-   * <b>Multiple attributes</b>
+   * <b>2 Relations, 2 attributes</b>
    * <p>
    * T(RXS) = T(R)*T(S)/max(V(R,y1), V(S,y1)) * max(V(R,y2), V(S,y2)), where y1 and y2 are the join
    * attributes
    * <p>
+   * <b>3 Relations, 1 attribute</b>
+   * <p>
+   * T(RXSXQ) = T(R)*T(S)*T(Q)/top2largest(V(R,y), V(S,y), V(Q,y)), where y is the join attribute
+   * <p>
+   * <b>3 Relations, 2 attributes</b>
+   * <p>
+   * T(RXSXQ) = T(R)*T(S)*T(Q)/top2largest(V(R,y1), V(S,y1), V(Q,y1)) * top2largest(V(R,y2), V(S,y2), V(Q,y2)),
+   * where y1 and y2 are the join attributes
+   * <p>
    * <i>Worst case:</i> If no column statistics are available, then T(RXS) = joinFactor * max(T(R),
    * T(S)) * (numParents - 1) will be used as heuristics. joinFactor is from hive.stats.join.factor
    * hive config. In the worst case, since we do not know any information about join keys (and hence
@@ -780,9 +808,12 @@ public class StatsRulesProcFactory {
           // statistics object that is combination of statistics from all
           // relations involved in JOIN
           Statistics stats = new Statistics();
-          long prodRows = 1;
+          List<Long> rowCountParents = Lists.newArrayList();
           List<Long> distinctVals = Lists.newArrayList();
+
+          // 2 relations, multiple attributes
           boolean multiAttr = false;
+          int numAttr = 1;
 
           Map<String, ColStatistics> joinedColStats = Maps.newHashMap();
           Map<Integer, List<String>> joinKeys = Maps.newHashMap();
@@ -792,12 +823,13 @@ public class StatsRulesProcFactory {
             ReduceSinkOperator parent = (ReduceSinkOperator) jop.getParentOperators().get(pos);
 
             Statistics parentStats = parent.getStatistics();
-            prodRows *= parentStats.getNumRows();
+            rowCountParents.add(parentStats.getNumRows());
             List<ExprNodeDesc> keyExprs = parent.getConf().getKeyCols();
 
             // multi-attribute join key
             if (keyExprs.size() > 1) {
               multiAttr = true;
+              numAttr = keyExprs.size();
             }
 
             // compute fully qualified join key column names. this name will be
@@ -808,16 +840,9 @@ public class StatsRulesProcFactory {
                 StatsUtils.getFullQualifedColNameFromExprs(keyExprs, parent.getColumnExprMap());
             joinKeys.put(pos, fqCols);
 
-            Map<String, ExprNodeDesc> colExprMap = parent.getColumnExprMap();
-            RowSchema rs = parent.getSchema();
-
             // get column statistics for all output columns
-            List<ColStatistics> cs =
-                StatsUtils.getColStatisticsFromExprMap(conf, parentStats, colExprMap, rs);
-            for (ColStatistics c : cs) {
-              if (c != null) {
-                joinedColStats.put(c.getFullyQualifiedColName(), c);
-              }
+            for (ColStatistics cs : parentStats.getColumnStats()) {
+              joinedColStats.put(cs.getFullyQualifiedColName(), cs);
             }
 
             // since new statistics is derived from all relations involved in
@@ -831,10 +856,10 @@ public class StatsRulesProcFactory {
           long denom = 1;
           if (multiAttr) {
             List<Long> perAttrDVs = Lists.newArrayList();
-            int numAttr = joinKeys.get(0).size();
             for (int idx = 0; idx < numAttr; idx++) {
               for (Integer i : joinKeys.keySet()) {
                 String col = joinKeys.get(i).get(idx);
+                col = StatsUtils.stripPrefixFromColumnName(col);
                 ColStatistics cs = joinedColStats.get(col);
                 if (cs != null) {
                   perAttrDVs.add(cs.getCountDistint());
@@ -850,6 +875,7 @@ public class StatsRulesProcFactory {
           } else {
             for (List<String> jkeys : joinKeys.values()) {
               for (String jk : jkeys) {
+                jk = StatsUtils.stripPrefixFromColumnName(jk);
                 ColStatistics cs = joinedColStats.get(jk);
                 if (cs != null) {
                   distinctVals.add(cs.getCountDistint());
@@ -859,6 +885,11 @@ public class StatsRulesProcFactory {
             denom = getDenominator(distinctVals);
           }
 
+          // Update NDV of joined columns to be min(V(R,y), V(S,y))
+          if (multiAttr) {
+            updateJoinColumnsNDV(joinKeys, joinedColStats, numAttr);
+          }
+
           // column statistics from different sources are put together and rename
           // fully qualified column names based on output schema of join operator
           Map<String, ExprNodeDesc> colExprMap = jop.getColumnExprMap();
@@ -875,7 +906,6 @@ public class StatsRulesProcFactory {
               ColStatistics cs = joinedColStats.get(fqColName);
               String outColName = key;
               String outTabAlias = ci.getTabAlias();
-              outColName = StatsUtils.stripPrefixFromColumnName(outColName);
               if (cs != null) {
                 cs.setColumnName(outColName);
                 cs.setTableAlias(outTabAlias);
@@ -886,13 +916,21 @@ public class StatsRulesProcFactory {
 
           // update join statistics
           stats.setColumnStats(outColStats);
-          long newRowCount = prodRows / denom;
+          long newRowCount = computeNewRowCount(rowCountParents, denom);
+
+          if (newRowCount <= 0 && LOG.isDebugEnabled()) {
+            newRowCount = 0;
+            LOG.debug("[0] STATS-" + jop.toString() + ": Product of #rows might be greater than"
+                + " denominator or overflow might have occurred. Resetting row count to 0."
+                + " #Rows of parents: " + rowCountParents.toString() + ". Denominator: " + denom);
+          }
+
           stats.setNumRows(newRowCount);
           stats.setDataSize(StatsUtils.getDataSizeFromColumnStats(newRowCount, outColStats));
           jop.setStatistics(stats);
 
           if (LOG.isDebugEnabled()) {
-            LOG.debug("[0] STATS-" + jop.toString() + ": " + stats.extendedToString());
+            LOG.debug("[1] STATS-" + jop.toString() + ": " + stats.extendedToString());
           }
         } else {
 
@@ -927,13 +965,72 @@ public class StatsRulesProcFactory {
           jop.setStatistics(wcStats);
 
           if (LOG.isDebugEnabled()) {
-            LOG.debug("[1] STATS-" + jop.toString() + ": " + wcStats.extendedToString());
+            LOG.debug("[2] STATS-" + jop.toString() + ": " + wcStats.extendedToString());
           }
         }
       }
       return null;
     }
 
+    private long computeNewRowCount(List<Long> rowCountParents, long denom) {
+      double factor = 0.0d;
+      long result = 1;
+      long max = rowCountParents.get(0);
+      long maxIdx = 0;
+
+      // To avoid long overflow, we will divide the max row count by denominator
+      // and use that factor to multiply with other row counts
+      for (int i = 1; i < rowCountParents.size(); i++) {
+        if (rowCountParents.get(i) > max) {
+          max = rowCountParents.get(i);
+          maxIdx = i;
+        }
+      }
+
+      factor = (double) max / (double) denom;
+
+      for (int i = 0; i < rowCountParents.size(); i++) {
+        if (i != maxIdx) {
+          result *= rowCountParents.get(i);
+        }
+      }
+
+      result = (long) (result * factor);
+
+      return result;
+    }
+
+    private void updateJoinColumnsNDV(Map<Integer, List<String>> joinKeys,
+        Map<String, ColStatistics> joinedColStats, int numAttr) {
+      int joinColIdx = 0;
+      while (numAttr > 0) {
+        long minNDV = Long.MAX_VALUE;
+
+        // find min NDV for joining columns
+        for (Map.Entry<Integer, List<String>> entry : joinKeys.entrySet()) {
+          String key = entry.getValue().get(joinColIdx);
+          key = StatsUtils.stripPrefixFromColumnName(key);
+          ColStatistics cs = joinedColStats.get(key);
+          if (cs != null && cs.getCountDistint() < minNDV) {
+            minNDV = cs.getCountDistint();
+          }
+        }
+
+        // set min NDV value to both columns involved in join
+        if (minNDV != Long.MAX_VALUE) {
+          for (Map.Entry<Integer, List<String>> entry : joinKeys.entrySet()) {
+            String key = entry.getValue().get(joinColIdx);
+            key = StatsUtils.stripPrefixFromColumnName(key);
+            ColStatistics cs = joinedColStats.get(key);
+            cs.setCountDistint(minNDV);
+          }
+        }
+
+        joinColIdx++;
+        numAttr--;
+      }
+    }
+
     private long getDenominator(List<Long> distinctVals) {
 
       if (distinctVals.isEmpty()) {
@@ -951,16 +1048,23 @@ public class StatsRulesProcFactory {
         return Collections.max(distinctVals);
       } else {
 
+        // remember the min value and exclude it from the denominator
+        long minNDV = distinctVals.get(0);
+        int minIdx = 0;
+
+        for (int i = 1; i < distinctVals.size(); i++) {
+          if (distinctVals.get(i) < minNDV) {
+            minNDV = distinctVals.get(i);
+            minIdx = i;
+          }
+        }
+
         // join from multiple relations:
-        // denom = max(v1, v2) * max(v2, v3) * max(v3, v4)
+        // denom = Product of all NDVs except the least of all
         long denom = 1;
-        for (int i = 0; i < distinctVals.size() - 1; i++) {
-          long v1 = distinctVals.get(i);
-          long v2 = distinctVals.get(i + 1);
-          if (v1 >= v2) {
-            denom *= v1;
-          } else {
-            denom *= v2;
+        for (int i = 0; i < distinctVals.size(); i++) {
+          if (i != minIdx) {
+            denom *= distinctVals.get(i);
           }
         }
         return denom;
@@ -980,8 +1084,6 @@ public class StatsRulesProcFactory {
       LimitOperator lop = (LimitOperator) nd;
       Operator<? extends OperatorDesc> parent = lop.getParentOperators().get(0);
       Statistics parentStats = parent.getStatistics();
-      AnnotateStatsProcCtx aspCtx = (AnnotateStatsProcCtx) procCtx;
-      HiveConf conf = aspCtx.getConf();
 
       try {
         long limit = -1;
@@ -1029,6 +1131,73 @@ public class StatsRulesProcFactory {
   }
 
   /**
+   * ReduceSink operator does not change any of the statistics. But it renames
+   * the column statistics from its parent based on the output key and value
+   * column names to make it easy for the downstream operators. This is different
+   * from the default stats which just aggregates and passes along the statistics
+   * without actually renaming based on output schema of the operator.
+   */
+  public static class ReduceSinkStatsRule extends DefaultStatsRule implements NodeProcessor {
+
+    @Override
+    public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
+        Object... nodeOutputs) throws SemanticException {
+      ReduceSinkOperator rop = (ReduceSinkOperator) nd;
+      Operator<? extends OperatorDesc> parent = rop.getParentOperators().get(0);
+      Statistics parentStats = parent.getStatistics();
+      if (parentStats != null) {
+        AnnotateStatsProcCtx aspCtx = (AnnotateStatsProcCtx) procCtx;
+        HiveConf conf = aspCtx.getConf();
+
+        List<String> outKeyColNames = rop.getConf().getOutputKeyColumnNames();
+        List<String> outValueColNames = rop.getConf().getOutputValueColumnNames();
+        Map<String, ExprNodeDesc> colExprMap = rop.getColumnExprMap();
+        try {
+          Statistics outStats = parentStats.clone();
+          if (satisfyPrecondition(parentStats)) {
+            List<ColStatistics> colStats = Lists.newArrayList();
+            for (String key : outKeyColNames) {
+              String prefixedKey = "KEY." + key;
+              ExprNodeDesc end = colExprMap.get(prefixedKey);
+              if (end != null) {
+                ColStatistics cs = StatsUtils
+                    .getColStatisticsFromExpression(conf, parentStats, end);
+                if (cs != null) {
+                  cs.setColumnName(key);
+                  colStats.add(cs);
+                }
+              }
+            }
+
+            for (String val : outValueColNames) {
+              String prefixedVal = "VALUE." + val;
+              ExprNodeDesc end = colExprMap.get(prefixedVal);
+              if (end != null) {
+                ColStatistics cs = StatsUtils
+                    .getColStatisticsFromExpression(conf, parentStats, end);
+                if (cs != null) {
+                  cs.setColumnName(val);
+                  colStats.add(cs);
+                }
+              }
+            }
+
+            outStats.setColumnStats(colStats);
+          }
+          rop.setStatistics(outStats);
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("[0] STATS-" + rop.toString() + ": " + outStats.extendedToString());
+          }
+        } catch (CloneNotSupportedException e) {
+          throw new SemanticException(ErrorMsg.STATISTICS_CLONING_FAILED.getMsg());
+        }
+      }
+      return null;
+    }
+
+  }
+
+  /**
    * Default rule is to aggregate the statistics from all its parent operators.
    */
   public static class DefaultStatsRule implements NodeProcessor {
@@ -1105,6 +1274,10 @@ public class StatsRulesProcFactory {
     return new LimitStatsRule();
   }
 
+  public static NodeProcessor getReduceSinkRule() {
+    return new ReduceSinkStatsRule();
+  }
+
   public static NodeProcessor getDefaultRule() {
     return new DefaultStatsRule();
   }
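
The revised join estimate follows the formulas in the Javadoc above: the denominator is the product of all join-key NDVs except the smallest, and the row-count product is built around the largest parent row count so the intermediate value stays small and long overflow is less likely. A worked example with invented numbers for three relations joined on one attribute:

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;

    public class JoinCardinalitySketch {
      public static void main(String[] args) {
        // Three relations R, S, Q joined on one attribute y (numbers are made up).
        List<Long> rowCounts = Arrays.asList(1000L, 2000L, 500L);  // T(R), T(S), T(Q)
        List<Long> ndvs = Arrays.asList(100L, 200L, 50L);          // V(R,y), V(S,y), V(Q,y)

        // Denominator: product of all NDVs except the smallest,
        // i.e. top2largest(100, 200, 50) = 100 * 200 = 20000.
        long minNdv = Collections.min(ndvs);
        long denom = 1;
        boolean skippedMin = false;
        for (long v : ndvs) {
          if (!skippedMin && v == minNdv) {
            skippedMin = true;            // drop the least NDV exactly once
            continue;
          }
          denom *= v;
        }

        // Row count: divide the largest parent row count by the denominator first
        // and multiply the remaining row counts by that factor, which keeps the
        // running product small instead of multiplying everything before dividing.
        long maxRows = Collections.max(rowCounts);
        double factor = (double) maxRows / (double) denom;   // 2000 / 20000 = 0.1
        long product = 1;
        boolean skippedMax = false;
        for (long r : rowCounts) {
          if (!skippedMax && r == maxRows) {
            skippedMax = true;
            continue;
          }
          product *= r;                                      // 1000 * 500 = 500000
        }
        long estimatedRows = (long) (product * factor);      // 50000

        System.out.println("denominator = " + denom);                  // 20000
        System.out.println("estimated join rows = " + estimatedRows);  // 50000
      }
    }

This matches T(R)*T(S)*T(Q) / top2largest(V(R,y), V(S,y), V(Q,y)) = 1000*2000*500 / (200*100) = 50000.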

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/ColumnStatsSemanticAnalyzer.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/ColumnStatsSemanticAnalyzer.java?rev=1617040&r1=1617039&r2=1617040&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/ColumnStatsSemanticAnalyzer.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/ColumnStatsSemanticAnalyzer.java Sun Aug 10 01:33:50 2014
@@ -27,6 +27,7 @@ import java.util.Map;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.ErrorMsg;
@@ -102,6 +103,10 @@ public class ColumnStatsSemanticAnalyzer
   private Map<String,String> getPartKeyValuePairsFromAST(ASTNode tree) {
     ASTNode child = ((ASTNode) tree.getChild(0).getChild(1));
     Map<String,String> partSpec = new HashMap<String, String>();
+    if (null == child) {
+      // case of analyze table T compute statistics for columns;
+      return partSpec;
+    }
     String partKey;
     String partValue;
     for (int i = 0; i < child.getChildCount(); i++) {
@@ -361,6 +366,9 @@ public class ColumnStatsSemanticAnalyzer
       checkIfTemporaryTable();
       checkForPartitionColumns(colNames, Utilities.getColumnNamesFromFieldSchema(tbl.getPartitionKeys()));
       validateSpecifiedColumnNames(colNames);
+      if (conf.getBoolVar(ConfVars.HIVE_STATS_COLLECT_PART_LEVEL_STATS) && tbl.isPartitioned()) {
+        isPartitionStats = true;
+      }
 
       if (isPartitionStats) {
         isTableLevel = false;

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java?rev=1617040&r1=1617039&r2=1617040&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java Sun Aug 10 01:33:50 2014
@@ -52,6 +52,11 @@ public abstract class BaseWork extends A
 
   private String name;
 
+  // Vectorization.
+  protected Map<String, Map<Integer, String>> scratchColumnVectorTypes = null;
+  protected Map<String, Map<String, Integer>> scratchColumnMap = null;
+  protected boolean vectorMode = false;
+
   public void setGatheringStats(boolean gatherStats) {
     this.gatheringStats = gatherStats;
   }
@@ -107,5 +112,31 @@ public abstract class BaseWork extends A
     return returnSet;
   }
 
+  public Map<String, Map<Integer, String>> getScratchColumnVectorTypes() {
+    return scratchColumnVectorTypes;
+  }
+
+  public void setScratchColumnVectorTypes(
+      Map<String, Map<Integer, String>> scratchColumnVectorTypes) {
+    this.scratchColumnVectorTypes = scratchColumnVectorTypes;
+  }
+
+  public Map<String, Map<String, Integer>> getScratchColumnMap() {
+    return scratchColumnMap;
+  }
+
+  public void setScratchColumnMap(Map<String, Map<String, Integer>> scratchColumnMap) {
+    this.scratchColumnMap = scratchColumnMap;
+  }
+
+  @Override
+  public void setVectorMode(boolean vectorMode) {
+    this.vectorMode = vectorMode;
+  }
+
+  public boolean getVectorMode() {
+    return vectorMode;
+  }
+
   public abstract void configureJobConf(JobConf job);
 }

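Moving the scratch-column maps and the vectorMode flag up into BaseWork lets both map-side and reduce-side work objects carry the same vectorization metadata through the getters and setters shown above. The sketch below only illustrates the shapes of those two maps; the alias, column, and type names are invented, and the commented calls assume the accessors from the diff.

import java.util.HashMap;
import java.util.Map;

public class ScratchColumnSketch {
  public static void main(String[] args) {
    // operator/alias name -> (scratch column index -> vector column type name)
    Map<String, Map<Integer, String>> scratchColumnVectorTypes =
        new HashMap<String, Map<Integer, String>>();
    Map<Integer, String> typesForAlias = new HashMap<Integer, String>();
    typesForAlias.put(3, "bigint");              // invented index/type pair
    scratchColumnVectorTypes.put("alias_a", typesForAlias);

    // operator/alias name -> (column name -> column index)
    Map<String, Map<String, Integer>> scratchColumnMap =
        new HashMap<String, Map<String, Integer>>();
    Map<String, Integer> columnsForAlias = new HashMap<String, Integer>();
    columnsForAlias.put("_col0", 0);
    scratchColumnMap.put("alias_a", columnsForAlias);

    // With the real classes this would be, for any BaseWork subclass:
    //   work.setScratchColumnVectorTypes(scratchColumnVectorTypes);
    //   work.setScratchColumnMap(scratchColumnMap);
    //   work.setVectorMode(true);
    System.out.println(scratchColumnVectorTypes + " / " + scratchColumnMap);
  }
}
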
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java?rev=1617040&r1=1617039&r2=1617040&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java Sun Aug 10 01:33:50 2014
@@ -116,10 +116,6 @@ public class MapWork extends BaseWork {
 
   private boolean useOneNullRowInputFormat;
 
-  private Map<String, Map<Integer, String>> scratchColumnVectorTypes = null;
-  private Map<String, Map<String, Integer>> scratchColumnMap = null;
-  private boolean vectorMode = false;
-
   public MapWork() {}
 
   public MapWork(String name) {
@@ -519,32 +515,6 @@ public class MapWork extends BaseWork {
     }
   }
 
-  public Map<String, Map<Integer, String>> getScratchColumnVectorTypes() {
-    return scratchColumnVectorTypes;
-  }
-
-  public void setScratchColumnVectorTypes(
-      Map<String, Map<Integer, String>> scratchColumnVectorTypes) {
-    this.scratchColumnVectorTypes = scratchColumnVectorTypes;
-  }
-
-  public Map<String, Map<String, Integer>> getScratchColumnMap() {
-    return scratchColumnMap;
-  }
-
-  public void setScratchColumnMap(Map<String, Map<String, Integer>> scratchColumnMap) {
-    this.scratchColumnMap = scratchColumnMap;
-  }
-
-  public boolean getVectorMode() {
-    return vectorMode;
-  }
-
-  @Override
-  public void setVectorMode(boolean vectorMode) {
-    this.vectorMode = vectorMode;
-  }
-
   public void logPathToAliases() {
     if (LOG.isDebugEnabled()) {
       LOG.debug("LOGGING PATH TO ALIASES");

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceWork.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceWork.java?rev=1617040&r1=1617039&r2=1617040&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceWork.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceWork.java Sun Aug 10 01:33:50 2014
@@ -21,6 +21,7 @@ package org.apache.hadoop.hive.ql.plan;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.LinkedHashSet;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -30,7 +31,18 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.OperatorUtils;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.lib.NodeProcessor;
+import org.apache.hadoop.hive.ql.lib.Rule;
+import org.apache.hadoop.hive.serde2.Deserializer;
+import org.apache.hadoop.hive.serde2.SerDe;
+import org.apache.hadoop.hive.serde2.SerDeUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.util.ReflectionUtils;
 
 /**
  * ReduceWork represents all the information used to run a reduce task on the cluster.
@@ -84,6 +96,11 @@ public class ReduceWork extends BaseWork
   // for auto reduce parallelism - max reducers requested
   private int maxReduceTasks;
 
+  private ObjectInspector keyObjectInspector = null;
+  private ObjectInspector valueObjectInspector = null;
+
+  private Map<String, Integer> reduceColumnNameMap = new LinkedHashMap<String, Integer>();
+
   /**
    * If the plan has a reducer and correspondingly a reduce-sink, then store the TableDesc pointing
    * to keySerializeInfo of the ReduceSink
@@ -95,7 +112,90 @@ public class ReduceWork extends BaseWork
   }
 
   public TableDesc getKeyDesc() {
     return keyDesc;
+  }
+
+  private ObjectInspector getObjectInspector(TableDesc desc) {
+    ObjectInspector objectInspector;
+    try {
+      Deserializer deserializer = (SerDe) ReflectionUtils.newInstance(desc
+                .getDeserializerClass(), null);
+      SerDeUtils.initializeSerDe(deserializer, null, desc.getProperties(), null);
+      objectInspector = deserializer.getObjectInspector();
+    } catch (Exception e) {
+      return null;
+    }
+    return objectInspector;
+  }
+
+  public ObjectInspector getKeyObjectInspector() {
+    if (keyObjectInspector == null) {
+      keyObjectInspector = getObjectInspector(keyDesc);
+    }
+    return keyObjectInspector;
+  }
+
+  // Only works when not tagging.
+  public ObjectInspector getValueObjectInspector() {
+    if (needsTagging) {
+      return null;
+    }
+    if (valueObjectInspector == null) {
+      valueObjectInspector = getObjectInspector(tagToValueDesc.get(0));
+    }
+    return valueObjectInspector;
+  }
+
+  private int addToReduceColumnNameMap(StructObjectInspector structObjectInspector, int startIndex, String prefix) {
+    List<? extends StructField> fields = structObjectInspector.getAllStructFieldRefs();
+    int index = startIndex;
+    for (StructField field: fields) {
+      reduceColumnNameMap.put(prefix + "." + field.getFieldName(), index);
+      index++;
+    }
+    return index;
+  }
+
+  public boolean fillInReduceColumnNameMap() {
+    ObjectInspector keyObjectInspector = getKeyObjectInspector();
+    if (keyObjectInspector == null || !(keyObjectInspector instanceof StructObjectInspector)) {
+      return false;
+    }
+    StructObjectInspector keyStructObjectInspector = (StructObjectInspector) keyObjectInspector;
+
+    ObjectInspector valueObjectInspector = getValueObjectInspector();
+    if (valueObjectInspector == null || !(valueObjectInspector instanceof StructObjectInspector)) {
+      return false;
+    }
+    StructObjectInspector valueStructObjectInspector = (StructObjectInspector) valueObjectInspector;
+
+    int keyCount = addToReduceColumnNameMap(keyStructObjectInspector, 0, Utilities.ReduceField.KEY.toString());
+    addToReduceColumnNameMap(valueStructObjectInspector, keyCount, Utilities.ReduceField.VALUE.toString());
+    return true;
+  }
+
+  public Map<String, Integer> getReduceColumnNameMap() {
+    if (needsTagging) {
+      return null;
+    }
+    if (reduceColumnNameMap.size() == 0) {
+      if (!fillInReduceColumnNameMap()) {
+        return null;
+      }
+    }
+    return reduceColumnNameMap;
+  }
+
+  public List<String> getReduceColumnNames() {
+    if (needsTagging) {
+      return null;
+    }
+    if (reduceColumnNameMap.size() == 0) {
+      if (!fillInReduceColumnNameMap()) {
+        return null;
+      }
+    }
+    return new ArrayList<String>(reduceColumnNameMap.keySet());
   }
 
   public List<TableDesc> getTagToValueDesc() {

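fillInReduceColumnNameMap() above walks the key struct fields first and then the value struct fields, assigning one running index and qualifying each name with the KEY. or VALUE. prefix from Utilities.ReduceField. The sketch below reproduces only that naming scheme with invented field names; it is not the ReduceWork code itself.

import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ReduceColumnNameMapSketch {
  // Mirrors addToReduceColumnNameMap: assigns consecutive indexes to
  // prefix-qualified field names, continuing from startIndex.
  static int add(Map<String, Integer> map, List<String> fieldNames, int startIndex, String prefix) {
    int index = startIndex;
    for (String fieldName : fieldNames) {
      map.put(prefix + "." + fieldName, index);
      index++;
    }
    return index;
  }

  public static void main(String[] args) {
    Map<String, Integer> reduceColumnNameMap = new LinkedHashMap<String, Integer>();
    int keyCount = add(reduceColumnNameMap, Arrays.asList("reducesinkkey0"), 0, "KEY");
    add(reduceColumnNameMap, Arrays.asList("_col0", "_col1"), keyCount, "VALUE");
    // Prints {KEY.reducesinkkey0=0, VALUE._col0=1, VALUE._col1=2}
    System.out.println(reduceColumnNameMap);
  }
}
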
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/security/authorization/StorageBasedAuthorizationProvider.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/security/authorization/StorageBasedAuthorizationProvider.java?rev=1617040&r1=1617039&r2=1617040&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/security/authorization/StorageBasedAuthorizationProvider.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/security/authorization/StorageBasedAuthorizationProvider.java Sun Aug 10 01:33:50 2014
@@ -21,6 +21,7 @@ package org.apache.hadoop.hive.ql.securi
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.security.AccessControlException;
+import java.security.PrivilegedExceptionAction;
 import java.util.EnumSet;
 import java.util.List;
 
@@ -35,6 +36,9 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.hive.common.FileUtils;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.HiveMetaStore.HMSHandler;
 import org.apache.hadoop.hive.metastore.Warehouse;
 import org.apache.hadoop.hive.metastore.api.Database;
@@ -44,6 +48,7 @@ import org.apache.hadoop.hive.ql.metadat
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.Partition;
 import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.shims.ShimLoader;
 
 /**
  * StorageBasedAuthorizationProvider is an implementation of
@@ -288,7 +293,7 @@ public class StorageBasedAuthorizationPr
   * If the given path does not exist, it checks for its parent folder.
    */
   protected void checkPermissions(final Configuration conf, final Path path,
-      final EnumSet<FsAction> actions) throws IOException, LoginException {
+      final EnumSet<FsAction> actions) throws IOException, LoginException, HiveException {
 
     if (path == null) {
       throw new IllegalArgumentException("path is null");
@@ -297,8 +302,7 @@ public class StorageBasedAuthorizationPr
     final FileSystem fs = path.getFileSystem(conf);
 
     if (fs.exists(path)) {
-      checkPermissions(fs, path, actions,
-          authenticator.getUserName(), authenticator.getGroupNames());
+      checkPermissions(fs, path, actions, authenticator.getUserName());
     } else if (path.getParent() != null) {
       // find the ancestor which exists to check its permissions
       Path par = path.getParent();
@@ -309,8 +313,7 @@ public class StorageBasedAuthorizationPr
         par = par.getParent();
       }
 
-      checkPermissions(fs, par, actions,
-          authenticator.getUserName(), authenticator.getGroupNames());
+      checkPermissions(fs, par, actions, authenticator.getUserName());
     }
   }
 
@@ -320,56 +323,23 @@ public class StorageBasedAuthorizationPr
    */
   @SuppressWarnings("deprecation")
   protected static void checkPermissions(final FileSystem fs, final Path path,
-      final EnumSet<FsAction> actions, String user, List<String> groups) throws IOException,
-      AccessControlException {
-
-    String superGroupName = getSuperGroupName(fs.getConf());
-    if (userBelongsToSuperGroup(superGroupName, groups)) {
-      LOG.info("User \"" + user + "\" belongs to super-group \"" + superGroupName + "\". " +
-          "Permission granted for actions: (" + actions + ").");
-      return;
-    }
-
-    final FileStatus stat;
+      final EnumSet<FsAction> actions, String user) throws IOException,
+      AccessControlException, HiveException {
 
     try {
-      stat = fs.getFileStatus(path);
+      FileStatus stat = fs.getFileStatus(path);
+      for (FsAction action : actions) {
+        FileUtils.checkFileAccessWithImpersonation(fs, stat, action, user);
+      }
     } catch (FileNotFoundException fnfe) {
       // File named by path doesn't exist; nothing to validate.
       return;
     } catch (org.apache.hadoop.fs.permission.AccessControlException ace) {
       // Older hadoop version will throw this @deprecated Exception.
       throw accessControlException(ace);
+    } catch (Exception err) {
+      throw new HiveException(err);
     }
-
-    final FsPermission dirPerms = stat.getPermission();
-    final String grp = stat.getGroup();
-
-    for (FsAction action : actions) {
-      if (user.equals(stat.getOwner())) {
-        if (dirPerms.getUserAction().implies(action)) {
-          continue;
-        }
-      }
-      if (groups.contains(grp)) {
-        if (dirPerms.getGroupAction().implies(action)) {
-          continue;
-        }
-      }
-      if (dirPerms.getOtherAction().implies(action)) {
-        continue;
-      }
-      throw new AccessControlException("action " + action + " not permitted on path "
-          + path + " for user " + user);
-    }
-  }
-
-  private static String getSuperGroupName(Configuration configuration) {
-    return configuration.get(DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROUP_KEY, "");
-  }
-
-  private static boolean userBelongsToSuperGroup(String superGroupName, List<String> groups) {
-    return groups.contains(superGroupName);
   }
 
   protected Path getDbLocation(Database db) throws HiveException {

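For reference, the owner/group/other test that the deleted block performed can be written with standard Hadoop permission APIs as below; the new code delegates that check, plus user impersonation, to FileUtils.checkFileAccessWithImpersonation. This is an illustrative sketch of the old fall-through logic, not the helper's implementation.

import java.util.List;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;

public class PosixStyleCheckSketch {
  // Returns true if 'user' (member of 'groups') may perform 'action' on the
  // file described by 'stat', using only the owner/group/other bits.
  static boolean permits(FileStatus stat, FsAction action, String user, List<String> groups) {
    FsPermission perm = stat.getPermission();
    if (user.equals(stat.getOwner()) && perm.getUserAction().implies(action)) {
      return true;
    }
    if (groups.contains(stat.getGroup()) && perm.getGroupAction().implies(action)) {
      return true;
    }
    return perm.getOtherAction().implies(action);
  }
}
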
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/security/authorization/plugin/sqlstd/SQLAuthorizationUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/security/authorization/plugin/sqlstd/SQLAuthorizationUtils.java?rev=1617040&r1=1617039&r2=1617040&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/security/authorization/plugin/sqlstd/SQLAuthorizationUtils.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/security/authorization/plugin/sqlstd/SQLAuthorizationUtils.java Sun Aug 10 01:33:50 2014
@@ -394,7 +394,7 @@ public class SQLAuthorizationUtils {
       if (FileUtils.isActionPermittedForFileHierarchy(fs, fileStatus, userName, FsAction.READ)) {
         availPrivs.addPrivilege(SQLPrivTypeGrant.SELECT_NOGRANT);
       }
-    } catch (IOException e) {
+    } catch (Exception e) {
       String msg = "Error getting permissions for " + filePath + ": " + e.getMessage();
       throw new HiveAuthzPluginException(msg, e);
     }
@@ -412,5 +412,8 @@ public class SQLAuthorizationUtils {
     }
   }
 
+  static HiveAuthzPluginException getPluginException(String prefix, Exception e) {
+    return new HiveAuthzPluginException(prefix + ": " + e.getMessage(), e);
+  }
 
 }

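The getPluginException helper added above standardizes how SQLStdHiveAccessController (next file) wraps metastore failures with a contextual prefix. Below is a generic, hypothetical sketch of the same wrapping pattern; the exception type and names here are stand-ins, not the Hive classes.

public class WrapExceptionSketch {
  // Stand-in for the plugin exception: prepend a context prefix to the
  // cause's message and keep the cause for the stack trace.
  static RuntimeException wrap(String prefix, Exception e) {
    return new RuntimeException(prefix + ": " + e.getMessage(), e);
  }

  public static void main(String[] args) {
    try {
      throw new IllegalStateException("metastore unreachable");
    } catch (Exception e) {
      System.out.println(wrap("Error granting privileges", e).getMessage());
    }
  }
}
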
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/security/authorization/plugin/sqlstd/SQLStdHiveAccessController.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/security/authorization/plugin/sqlstd/SQLStdHiveAccessController.java?rev=1617040&r1=1617039&r2=1617040&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/security/authorization/plugin/sqlstd/SQLStdHiveAccessController.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/security/authorization/plugin/sqlstd/SQLStdHiveAccessController.java Sun Aug 10 01:33:50 2014
@@ -123,8 +123,8 @@ public class SQLStdHiveAccessController 
       }
       return currentRoles;
     } catch (Exception e) {
-      throw new HiveAuthzPluginException("Failed to retrieve roles for " + currentUserName + ": "
-          + e.getMessage(), e);
+      throw SQLAuthorizationUtils.getPluginException("Failed to retrieve roles for "
+          + currentUserName, e);
     }
   }
 
@@ -179,7 +179,7 @@ public class SQLStdHiveAccessController 
     try {
       metastoreClient.grant_privileges(privBag);
     } catch (Exception e) {
-      throw new HiveAuthzPluginException("Error granting privileges: " + e.getMessage(), e);
+      throw SQLAuthorizationUtils.getPluginException("Error granting privileges", e);
     }
   }
 
@@ -239,7 +239,7 @@ public class SQLStdHiveAccessController 
       // that has desired behavior.
       metastoreClient.revoke_privileges(new PrivilegeBag(revokePrivs), grantOption);
     } catch (Exception e) {
-      throw new HiveAuthzPluginException("Error revoking privileges", e);
+      throw SQLAuthorizationUtils.getPluginException("Error revoking privileges", e);
     }
   }
 
@@ -260,7 +260,7 @@ public class SQLStdHiveAccessController 
       metastoreClientFactory.getHiveMetastoreClient().create_role(
         new Role(roleName, 0, grantorName));
     } catch (TException e) {
-      throw new HiveAuthzPluginException("Error create role : " + e.getMessage(), e);
+      throw SQLAuthorizationUtils.getPluginException("Error create role", e);
     }
   }
 
@@ -274,7 +274,7 @@ public class SQLStdHiveAccessController 
     try {
       metastoreClientFactory.getHiveMetastoreClient().drop_role(roleName);
     } catch (Exception e) {
-      throw new HiveAuthzPluginException("Error dropping role", e);
+      throw SQLAuthorizationUtils.getPluginException("Error dropping role", e);
     }
   }
 
@@ -295,11 +295,11 @@ public class SQLStdHiveAccessController 
               grantorPrinc.getName(),
               AuthorizationUtils.getThriftPrincipalType(grantorPrinc.getType()), grantOption);
         } catch (MetaException e) {
-          throw new HiveAuthzPluginException(e.getMessage(), e);
+          throw SQLAuthorizationUtils.getPluginException("Error granting role", e);
         } catch (Exception e) {
           String msg = "Error granting roles for " + hivePrincipal.getName() + " to role "
-              + roleName + ": " + e.getMessage();
-          throw new HiveAuthzPluginException(msg, e);
+              + roleName;
+          throw SQLAuthorizationUtils.getPluginException(msg, e);
         }
       }
     }
@@ -321,8 +321,8 @@ public class SQLStdHiveAccessController 
               AuthorizationUtils.getThriftPrincipalType(hivePrincipal.getType()), grantOption);
         } catch (Exception e) {
           String msg = "Error revoking roles for " + hivePrincipal.getName() + " to role "
-              + roleName + ": " + e.getMessage();
-          throw new HiveAuthzPluginException(msg, e);
+              + roleName;
+          throw SQLAuthorizationUtils.getPluginException(msg, e);
         }
       }
     }
@@ -338,7 +338,7 @@ public class SQLStdHiveAccessController 
     try {
       return metastoreClientFactory.getHiveMetastoreClient().listRoleNames();
     } catch (Exception e) {
-      throw new HiveAuthzPluginException("Error listing all roles", e);
+      throw SQLAuthorizationUtils.getPluginException("Error listing all roles", e);
     }
   }
 
@@ -353,10 +353,12 @@ public class SQLStdHiveAccessController 
     try {
       return getHiveRoleGrants(metastoreClientFactory.getHiveMetastoreClient(), roleName);
     } catch (Exception e) {
-      throw new HiveAuthzPluginException("Error getting principals for all roles", e);
+      throw SQLAuthorizationUtils.getPluginException("Error getting principals for all roles", e);
     }
   }
 
+
+
   public static List<HiveRoleGrant> getHiveRoleGrants(IMetaStoreClient client, String roleName)
       throws Exception {
     GetPrincipalsInRoleRequest request = new GetPrincipalsInRoleRequest(roleName);
@@ -435,7 +437,7 @@ public class SQLStdHiveAccessController 
       return resPrivInfos;
 
     } catch (Exception e) {
-      throw new HiveAuthzPluginException("Error showing privileges: "+ e.getMessage(), e);
+      throw SQLAuthorizationUtils.getPluginException("Error showing privileges", e);
     }
 
   }
@@ -550,11 +552,7 @@ public class SQLStdHiveAccessController 
    */
   boolean isUserAdmin() throws HiveAuthzPluginException {
     List<HiveRoleGrant> roles;
-    try {
-      roles = getCurrentRoles();
-    } catch (Exception e) {
-      throw new HiveAuthzPluginException(e);
-    }
+    roles = getCurrentRoles();
     for (HiveRoleGrant role : roles) {
       if (role.getRoleName().equalsIgnoreCase(HiveMetaStore.ADMIN)) {
         return true;
@@ -565,11 +563,7 @@ public class SQLStdHiveAccessController 
 
   private boolean doesUserHasAdminOption(List<String> roleNames) throws HiveAuthzPluginException {
     List<HiveRoleGrant> currentRoles;
-    try {
-      currentRoles = getCurrentRoles();
-    } catch (Exception e) {
-        throw new HiveAuthzPluginException(e);
-    }
+    currentRoles = getCurrentRoles();
     for (String roleName : roleNames) {
       boolean roleFound = false;
       for (HiveRoleGrant currentRole : currentRoles) {
@@ -606,8 +600,8 @@ public class SQLStdHiveAccessController 
       }
       return hiveRoleGrants;
     } catch (Exception e) {
-      throw new HiveAuthzPluginException("Error getting role grant information for user "
-          + principal.getName() + ": " + e.getMessage(), e);
+      throw SQLAuthorizationUtils.getPluginException("Error getting role grant information for user "
+          + principal.getName(), e);
     }
   }
 

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUtils.java?rev=1617040&r1=1617039&r2=1617040&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUtils.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUtils.java Sun Aug 10 01:33:50 2014
@@ -20,7 +20,6 @@ package org.apache.hadoop.hive.ql.stats;
 
 import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
@@ -30,7 +29,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.StatsSetupConst;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
+import org.apache.hadoop.hive.metastore.api.AggrStats;
 import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData;
 import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
 import org.apache.hadoop.hive.ql.exec.ColumnInfo;
@@ -101,7 +100,7 @@ public class StatsUtils {
    * @throws HiveException
    */
   public static Statistics collectStatistics(HiveConf conf, PrunedPartitionList partList,
-      Table table, TableScanOperator tableScanOperator) {
+      Table table, TableScanOperator tableScanOperator) throws HiveException {
 
     Statistics stats = new Statistics();
 
@@ -197,7 +196,8 @@ public class StatsUtils {
       stats.addToDataSize(ds);
 
       // if at least a partition does not contain row count then mark basic stats state as PARTIAL
-      if (containsNonPositives(rowCounts)) {
+      if (containsNonPositives(rowCounts) &&
+          stats.getBasicStatsState().equals(State.COMPLETE)) {
         stats.setBasicStatsState(State.PARTIAL);
       }
       boolean haveFullStats = fetchColStats;
@@ -206,17 +206,26 @@ public class StatsUtils {
         for (Partition part : partList.getNotDeniedPartns()) {
           partNames.add(part.getName());
         }
-        Map<String, List<ColStatistics>> partStats =
-            getPartColumnStats(table, schema, partNames, neededColumns);
-        if (partStats != null) {
-          for (String partName : partNames) {
-            List<ColStatistics> partStat = partStats.get(partName);
-            haveFullStats &= (partStat != null);
-            if (partStat != null) {
-              stats.updateColumnStatsState(deriveStatType(partStat, neededColumns));
-              stats.addToColumnStats(partStat);
-            }
+        Map<String, String> colToTabAlias = new HashMap<String, String>();
+        neededColumns = processNeededColumns(schema, neededColumns, colToTabAlias);
+        AggrStats aggrStats = Hive.get().getAggrColStatsFor(table.getDbName(), table.getTableName(), neededColumns, partNames);
+        if (null == aggrStats) {
+          haveFullStats = false;
+        } else {
+          List<ColumnStatisticsObj> colStats = aggrStats.getColStats();
+          if (colStats.size() != neededColumns.size()) {
+            LOG.debug("Column stats requested for : " + neededColumns.size() + " columns. Able to retrieve"
+                + " for " + colStats.size() + " columns");
+          }
+          List<ColStatistics> columnStats = convertColStats(colStats, table.getTableName(), colToTabAlias);
+          stats.addToColumnStats(columnStats);
+          State colState = deriveStatType(columnStats, neededColumns);
+          if (aggrStats.getPartsFound() != partNames.size() && colState != State.NONE) {
+            LOG.debug("Column stats requested for : " + partNames.size() +" partitions. "
+              + "Able to retrieve for " + aggrStats.getPartsFound() + " partitions");
+            stats.updateColumnStatsState(State.PARTIAL);
           }
+          stats.setColumnStatsState(colState);
         }
       }
       // There are some partitions with no state (or we didn't fetch any state).
@@ -460,12 +469,7 @@ public class StatsUtils {
     try {
       List<ColumnStatisticsObj> colStat = Hive.get().getTableColumnStatistics(
           dbName, tabName, neededColsInTable);
-      stats = new ArrayList<ColStatistics>(colStat.size());
-      for (ColumnStatisticsObj statObj : colStat) {
-        ColStatistics cs = getColStatistics(statObj, tabName, statObj.getColName());
-        cs.setTableAlias(colToTabAlias.get(cs.getColumnName()));
-        stats.add(cs);
-      }
+      stats = convertColStats(colStat, tabName, colToTabAlias);
     } catch (HiveException e) {
       LOG.error("Failed to retrieve table statistics: ", e);
       stats = null;
@@ -473,43 +477,16 @@ public class StatsUtils {
     return stats;
   }
 
-  /**
-   * Get table level column statistics from metastore for needed columns
-   * @param table
-   *          - table
-   * @param schema
-   *          - output schema
-   * @param neededColumns
-   *          - list of needed columns
-   * @return column statistics
-   */
-  public static Map<String, List<ColStatistics>> getPartColumnStats(Table table,
-      List<ColumnInfo> schema, List<String> partNames, List<String> neededColumns) {
-    String dbName = table.getDbName();
-    String tabName = table.getTableName();
-    Map<String, String> colToTabAlias = new HashMap<String, String>(schema.size());
-    List<String> neededColsInTable = processNeededColumns(schema, neededColumns, colToTabAlias);
-    Map<String, List<ColStatistics>> stats = null;
-    try {
-      Map<String, List<ColumnStatisticsObj>> colStat = Hive.get().getPartitionColumnStatistics(
-          dbName, tabName, partNames, neededColsInTable);
-      stats = new HashMap<String, List<ColStatistics>>(colStat.size());
-      for (Map.Entry<String, List<ColumnStatisticsObj>> entry : colStat.entrySet()) {
-        List<ColStatistics> partStat = new ArrayList<ColStatistics>(entry.getValue().size());
-        for (ColumnStatisticsObj statObj : entry.getValue()) {
-          ColStatistics cs = getColStatistics(statObj, tabName, statObj.getColName());
-          cs.setTableAlias(colToTabAlias.get(cs.getColumnName()));
-          partStat.add(cs);
-        }
-        stats.put(entry.getKey(), partStat);
-      }
-    } catch (HiveException e) {
-      LOG.error("Failed to retrieve partitions statistics: ", e);
-      stats = null;
+  private static List<ColStatistics> convertColStats(List<ColumnStatisticsObj> colStats, String tabName,
+    Map<String,String> colToTabAlias) {
+    List<ColStatistics> stats = new ArrayList<ColStatistics>(colStats.size());
+    for (ColumnStatisticsObj statObj : colStats) {
+      ColStatistics cs = getColStatistics(statObj, tabName, statObj.getColName());
+      cs.setTableAlias(colToTabAlias.get(cs.getColumnName()));
+      stats.add(cs);
     }
     return stats;
   }
-
   private static List<String> processNeededColumns(List<ColumnInfo> schema,
       List<String> neededColumns, Map<String, String> colToTabAlias) {
     for (ColumnInfo col : schema) {
@@ -884,12 +861,9 @@ public class StatsUtils {
     if (colExprMap != null) {
       for (ColumnInfo ci : rowSchema.getSignature()) {
         String outColName = ci.getInternalName();
+        outColName = StatsUtils.stripPrefixFromColumnName(outColName);
         String outTabAlias = ci.getTabAlias();
         ExprNodeDesc end = colExprMap.get(outColName);
-        if (end == null) {
-          outColName = StatsUtils.stripPrefixFromColumnName(outColName);
-          end = colExprMap.get(outColName);
-        }
         ColStatistics colStat = getColStatisticsFromExpression(conf, parentStats, end);
         if (colStat != null) {
           outColName = StatsUtils.stripPrefixFromColumnName(outColName);
@@ -1150,7 +1124,7 @@ public class StatsUtils {
    */
   public static String stripPrefixFromColumnName(String colName) {
     String stripedName = colName;
-    if (colName.startsWith("KEY._") || colName.startsWith("VALUE._")) {
+    if (colName.startsWith("KEY") || colName.startsWith("VALUE")) {
       // strip off KEY./VALUE. from column name
       stripedName = colName.split("\\.")[1];
     }
@@ -1218,15 +1192,16 @@ public class StatsUtils {
         for (Map.Entry<String, ExprNodeDesc> entry : map.entrySet()) {
           if (entry.getValue().isSame(end)) {
             outColName = entry.getKey();
+            outColName = stripPrefixFromColumnName(outColName);
           }
         }
         if (end instanceof ExprNodeColumnDesc) {
           ExprNodeColumnDesc encd = (ExprNodeColumnDesc) end;
           if (outColName == null) {
             outColName = encd.getColumn();
+            outColName = stripPrefixFromColumnName(outColName);
           }
           String tabAlias = encd.getTabAlias();
-          outColName = stripPrefixFromColumnName(outColName);
           result.add(getFullyQualifiedColumnName(tabAlias, outColName));
         } else if (end instanceof ExprNodeGenericFuncDesc) {
           ExprNodeGenericFuncDesc enf = (ExprNodeGenericFuncDesc) end;