You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by br...@apache.org on 2014/09/08 06:38:26 UTC

svn commit: r1623263 [22/28] - in /hive/branches/spark: ./ accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/ ant/src/org/apache/hadoop/hive/ant/ beeline/src/java/org/apache/hive/beeline/ beeline/src/test/org/apache/hive/beeline/ bin/...

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/OptimizeTezProcContext.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/OptimizeTezProcContext.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/OptimizeTezProcContext.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/OptimizeTezProcContext.java Mon Sep  8 04:38:17 2014
@@ -23,13 +23,18 @@ import java.util.HashSet;
 import java.util.Set;
 
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.AppMasterEventOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
 import org.apache.hadoop.hive.ql.hooks.ReadEntity;
 import org.apache.hadoop.hive.ql.hooks.WriteEntity;
 import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
+
 /**
  * OptimizeTezProcContext. OptimizeTezProcContext maintains information
  * about the current operator plan as we walk the operator tree
@@ -47,19 +52,23 @@ public class OptimizeTezProcContext impl
   public final Set<ReduceSinkOperator> visitedReduceSinks
     = new HashSet<ReduceSinkOperator>();
 
+  public final Multimap<AppMasterEventOperator, TableScanOperator> eventOpToTableScanMap =
+      HashMultimap.create();
+
   // rootOperators are all the table scan operators in sequence
   // of traversal
-  public final Deque<Operator<? extends OperatorDesc>> rootOperators;
+  public Deque<Operator<? extends OperatorDesc>> rootOperators;
 
-  @SuppressWarnings("unchecked")
-  public OptimizeTezProcContext(HiveConf conf, ParseContext parseContext,
-      Set<ReadEntity> inputs, Set<WriteEntity> outputs,
-      Deque<Operator<?>> rootOperators) {
+  public OptimizeTezProcContext(HiveConf conf, ParseContext parseContext, Set<ReadEntity> inputs,
+      Set<WriteEntity> outputs) {
 
     this.conf = conf;
     this.parseContext = parseContext;
     this.inputs = inputs;
     this.outputs = outputs;
-    this.rootOperators = rootOperators;
+  }
+
+  public void setRootOperators(Deque<Operator<? extends OperatorDesc>> roots) {
+    this.rootOperators = roots;
   }
 }

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseUtils.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseUtils.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseUtils.java Mon Sep  8 04:38:17 2014
@@ -26,6 +26,7 @@ import org.apache.hadoop.hive.common.Jav
 import org.apache.hadoop.hive.common.type.HiveDecimal;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
 import org.apache.hadoop.hive.serde2.typeinfo.CharTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
@@ -221,7 +222,7 @@ public final class ParseUtils {
       return null;
     }
     try {
-      Class.forName(className, true, JavaUtils.getClassLoader());
+      Class.forName(className, true, Utilities.getSessionSpecifiedClassLoader());
     } catch (ClassNotFoundException e) {
       throw new SemanticException("Cannot find class '" + className + "'", e);
     }

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/QB.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/QB.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/QB.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/QB.java Mon Sep  8 04:38:17 2014
@@ -66,26 +66,26 @@ public class QB {
   private HashMap<String, WindowingSpec> destToWindowingSpec;
 
   /*
-   * If this QB represents a SubQuery predicate then this will point to the SubQuery object.
+   * If this QB represents a  SubQuery predicate then this will point to the SubQuery object.
    */
   private QBSubQuery subQueryPredicateDef;
-  
-	/*
-	 * used to give a unique name to each SubQuery QB Currently there can be at
-	 * most 2 SubQueries in a Query: 1 in the Where clause, and 1 in the Having
-	 * clause.
-	 */
-	private int numSubQueryPredicates;
-	
-	/*
-	 * for now a top level QB can have 1 where clause SQ predicate.
-	 */
-	private QBSubQuery whereClauseSubQueryPredicate;
-	
+
+  /*
+   * used to give a unique name to each SubQuery QB Currently there can be at
+   * most 2 SubQueries in a Query: 1 in the Where clause, and 1 in the Having
+   * clause.
+   */
+  private int numSubQueryPredicates;
+
   /*
    * for now a top level QB can have 1 where clause SQ predicate.
    */
-	private QBSubQuery havingClauseSubQueryPredicate;
+  private QBSubQuery whereClauseSubQueryPredicate;
+
+  /*
+   * for now a top level QB can have 1 where clause SQ predicate.
+   */
+  private QBSubQuery havingClauseSubQueryPredicate;
 
   // results
 
@@ -341,28 +341,28 @@ public class QB {
   protected QBSubQuery getSubQueryPredicateDef() {
     return subQueryPredicateDef;
   }
-  
-	protected int getNumSubQueryPredicates() {
-		return numSubQueryPredicates;
-	}
-
-	protected int incrNumSubQueryPredicates() {
-		return ++numSubQueryPredicates;
-	}
-	
-	void setWhereClauseSubQueryPredicate(QBSubQuery sq) {
-	  whereClauseSubQueryPredicate = sq;
-  }
-	
-	public QBSubQuery getWhereClauseSubQueryPredicate() {
-	  return whereClauseSubQueryPredicate;
-	}
-	
-	void setHavingClauseSubQueryPredicate(QBSubQuery sq) {
+
+  protected int getNumSubQueryPredicates() {
+    return numSubQueryPredicates;
+  }
+
+  protected int incrNumSubQueryPredicates() {
+    return ++numSubQueryPredicates;
+  }
+
+  void setWhereClauseSubQueryPredicate(QBSubQuery sq) {
+    whereClauseSubQueryPredicate = sq;
+  }
+
+  public QBSubQuery getWhereClauseSubQueryPredicate() {
+    return whereClauseSubQueryPredicate;
+  }
+
+  void setHavingClauseSubQueryPredicate(QBSubQuery sq) {
     havingClauseSubQueryPredicate = sq;
   }
-	
-	public QBSubQuery getHavingClauseSubQueryPredicate() {
+
+  public QBSubQuery getHavingClauseSubQueryPredicate() {
     return havingClauseSubQueryPredicate;
   }
 

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/QBSubQuery.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/QBSubQuery.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/QBSubQuery.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/QBSubQuery.java Mon Sep  8 04:38:17 2014
@@ -38,7 +38,7 @@ import org.apache.hadoop.hive.serde2.typ
 import org.apache.hadoop.hive.ql.parse.SubQueryDiagnostic.QBSubQueryRewrite;
 
 public class QBSubQuery implements ISubQueryJoinInfo {
-  
+
   public static enum SubQueryType {
     EXISTS,
     NOT_EXISTS,
@@ -149,16 +149,16 @@ public class QBSubQuery implements ISubQ
   }
 
   /*
-   * This class captures the information about a 
+   * This class captures the information about a
    * conjunct in the where clause of the SubQuery.
    * For a equality predicate it capture for each side:
    * - the AST
    * - the type of Expression (basically what columns are referenced)
-   * - for Expressions that refer the parent it captures the 
+   * - for Expressions that refer the parent it captures the
    *   parent's ColumnInfo. In case of outer Aggregation expressions
    *   we need this to introduce a new mapping in the OuterQuery
    *   RowResolver. A join condition must use qualified column references,
-   *   so we generate a new name for the aggr expression and use it in the 
+   *   so we generate a new name for the aggr expression and use it in the
    *   joining condition.
    *   For e.g.
    *   having exists ( select x from R2 where y = min(R1.z) )
@@ -174,8 +174,8 @@ public class QBSubQuery implements ISubQ
     private final ColumnInfo leftOuterColInfo;
     private final ColumnInfo rightOuterColInfo;
 
-   Conjunct(ASTNode leftExpr, 
-        ASTNode rightExpr, 
+   Conjunct(ASTNode leftExpr,
+        ASTNode rightExpr,
         ExprType leftExprType,
         ExprType rightExprType,
         ColumnInfo leftOuterColInfo,
@@ -239,8 +239,8 @@ public class QBSubQuery implements ISubQ
     Stack<Node> stack;
 
     ConjunctAnalyzer(RowResolver parentQueryRR,
-    		boolean forHavingClause,
-    		String parentQueryNewAlias) {
+        boolean forHavingClause,
+        String parentQueryNewAlias) {
       this.parentQueryRR = parentQueryRR;
       defaultExprProcessor = new DefaultExprProcessor();
       this.forHavingClause = forHavingClause;
@@ -260,13 +260,13 @@ public class QBSubQuery implements ISubQ
     private ObjectPair<ExprType,ColumnInfo> analyzeExpr(ASTNode expr) {
       ColumnInfo cInfo = null;
       if ( forHavingClause ) {
-      	try {
-      	  cInfo = parentQueryRR.getExpression(expr);
-      		if ( cInfo != null) {
-      		    return ObjectPair.create(ExprType.REFERS_PARENT, cInfo);
-      	    }
-      	} catch(SemanticException se) {
-      	}
+        try {
+          cInfo = parentQueryRR.getExpression(expr);
+          if ( cInfo != null) {
+              return ObjectPair.create(ExprType.REFERS_PARENT, cInfo);
+            }
+        } catch(SemanticException se) {
+        }
       }
       if ( expr.getType() == HiveParser.DOT) {
         ASTNode dot = firstDot(expr);
@@ -308,12 +308,12 @@ public class QBSubQuery implements ISubQ
         ObjectPair<ExprType,ColumnInfo> leftInfo = analyzeExpr(left);
         ObjectPair<ExprType,ColumnInfo> rightInfo = analyzeExpr(right);
 
-        return new Conjunct(left, right, 
+        return new Conjunct(left, right,
             leftInfo.getFirst(), rightInfo.getFirst(),
             leftInfo.getSecond(), rightInfo.getSecond());
       } else {
         ObjectPair<ExprType,ColumnInfo> sqExprInfo = analyzeExpr(conjunct);
-        return new Conjunct(conjunct, null, 
+        return new Conjunct(conjunct, null,
             sqExprInfo.getFirst(), null,
             sqExprInfo.getSecond(), sqExprInfo.getSecond());
       }
@@ -354,86 +354,86 @@ public class QBSubQuery implements ISubQ
   }
 
   /*
-   * When transforming a Not In SubQuery we need to check for nulls in the 
+   * When transforming a Not In SubQuery we need to check for nulls in the
    * Joining expressions of the SubQuery. If there are nulls then the SubQuery always
-   * return false. For more details see 
+   * return false. For more details see
    * https://issues.apache.org/jira/secure/attachment/12614003/SubQuerySpec.pdf
-   * 
+   *
    * Basically, SQL semantics say that:
    * - R1.A not in (null, 1, 2, ...)
-   *   is always false. 
-   *   A 'not in' operator is equivalent to a '<> all'. Since a not equal check with null 
+   *   is always false.
+   *   A 'not in' operator is equivalent to a '<> all'. Since a not equal check with null
    *   returns false, a not in predicate against aset with a 'null' value always returns false.
-   *   
+   *
    * So for not in SubQuery predicates:
    * - we join in a null count predicate.
    * - And the joining condition is that the 'Null Count' query has a count of 0.
-   *   
+   *
    */
   class NotInCheck implements ISubQueryJoinInfo {
-    
+
     private static final String CNT_ALIAS = "c1";
-    
+
     /*
      * expressions in SubQ that are joined to the Outer Query.
      */
     List<ASTNode> subQryCorrExprs;
-    
+
     /*
      * row resolver of the SubQuery.
      * Set by the SemanticAnalyzer after the Plan for the SubQuery is genned.
      * This is neede in case the SubQuery select list contains a TOK_ALLCOLREF
      */
     RowResolver sqRR;
-    
+
     NotInCheck() {
       subQryCorrExprs = new ArrayList<ASTNode>();
     }
-    
+
     void addCorrExpr(ASTNode corrExpr) {
       subQryCorrExprs.add(corrExpr);
     }
-    
+
     public ASTNode getSubQueryAST() {
       ASTNode ast = SubQueryUtils.buildNotInNullCheckQuery(
-          QBSubQuery.this.getSubQueryAST(), 
-          QBSubQuery.this.getAlias(), 
-          CNT_ALIAS, 
+          QBSubQuery.this.getSubQueryAST(),
+          QBSubQuery.this.getAlias(),
+          CNT_ALIAS,
           subQryCorrExprs,
           sqRR);
       SubQueryUtils.setOriginDeep(ast, QBSubQuery.this.originalSQASTOrigin);
       return ast;
     }
-    
+
     public String getAlias() {
       return QBSubQuery.this.getAlias() + "_notin_nullcheck";
     }
-    
+
     public JoinType getJoinType() {
       return JoinType.LEFTSEMI;
     }
-    
+
     public ASTNode getJoinConditionAST() {
-      ASTNode ast = 
+      ASTNode ast =
           SubQueryUtils.buildNotInNullJoinCond(getAlias(), CNT_ALIAS);
       SubQueryUtils.setOriginDeep(ast, QBSubQuery.this.originalSQASTOrigin);
       return ast;
     }
-    
+
     public QBSubQuery getSubQuery() {
       return QBSubQuery.this;
     }
-    
+
     public String getOuterQueryId() {
       return QBSubQuery.this.getOuterQueryId();
     }
-    
+
     void setSQRR(RowResolver sqRR) {
       this.sqRR = sqRR;
     }
-        
+
   }
-  
+
   private final String outerQueryId;
   private final int sqIdx;
   private final String alias;
@@ -455,11 +455,11 @@ public class QBSubQuery implements ISubQ
   private int numOfCorrelationExprsAddedToSQSelect;
 
   private boolean groupbyAddedToSQ;
-  
+
   private int numOuterCorrExprsForHaving;
-  
+
   private NotInCheck notInCheck;
-  
+
   private QBSubQueryRewrite subQueryDiagnostic;
 
   public QBSubQuery(String outerQueryId,
@@ -483,11 +483,11 @@ public class QBSubQuery implements ISubQ
     originalSQASTOrigin = new ASTNodeOrigin("SubQuery", alias, s, alias, originalSQAST);
     numOfCorrelationExprsAddedToSQSelect = 0;
     groupbyAddedToSQ = false;
-    
+
     if ( operator.getType() == SubQueryType.NOT_IN ) {
       notInCheck = new NotInCheck();
     }
-    
+
     subQueryDiagnostic = SubQueryDiagnostic.getRewrite(this, ctx.getTokenRewriteStream(), ctx);
   }
 
@@ -500,18 +500,18 @@ public class QBSubQuery implements ISubQ
   public SubQueryTypeDef getOperator() {
     return operator;
   }
-  
+
   public ASTNode getOriginalSubQueryASTForRewrite() {
     return (operator.getType() == SubQueryType.NOT_EXISTS
-        || operator.getType() == SubQueryType.NOT_IN ? 
-        (ASTNode) originalSQASTOrigin.getUsageNode().getParent() : 
+        || operator.getType() == SubQueryType.NOT_IN ?
+        (ASTNode) originalSQASTOrigin.getUsageNode().getParent() :
         originalSQASTOrigin.getUsageNode());
   }
 
   void validateAndRewriteAST(RowResolver outerQueryRR,
-		  boolean forHavingClause,
-		  String outerQueryAlias,
-		  Set<String> outerQryAliases) throws SemanticException {
+      boolean forHavingClause,
+      String outerQueryAlias,
+      Set<String> outerQryAliases) throws SemanticException {
 
     ASTNode selectClause = (ASTNode) subQueryAST.getChild(1).getChild(1);
 
@@ -519,12 +519,12 @@ public class QBSubQuery implements ISubQ
     if ( selectClause.getChild(0).getType() == HiveParser.TOK_HINTLIST ) {
       selectExprStart = 1;
     }
-    
+
     /*
      * Restriction.16.s :: Correlated Expression in Outer Query must not contain
      * unqualified column references.
      */
-    if ( parentQueryExpression != null && !forHavingClause ) { 
+    if ( parentQueryExpression != null && !forHavingClause ) {
         ASTNode u = SubQueryUtils.hasUnQualifiedColumnReferences(parentQueryExpression);
         if ( u != null ) {
           subQueryAST.setOrigin(originalSQASTOrigin);
@@ -532,7 +532,7 @@ public class QBSubQuery implements ISubQ
               u, "Correlating expression cannot contain unqualified column references."));
         }
     }
-    
+
     /*
      * Restriction 17.s :: SubQuery cannot use the same table alias as one used in
      * the Outer Query.
@@ -546,14 +546,14 @@ public class QBSubQuery implements ISubQ
     }
     if ( sharedAlias != null) {
       ASTNode whereClause = SubQueryUtils.subQueryWhere(subQueryAST);
-      
+
       if ( whereClause != null ) {
         ASTNode u = SubQueryUtils.hasUnQualifiedColumnReferences(whereClause);
         if ( u != null ) {
           subQueryAST.setOrigin(originalSQASTOrigin);
           throw new SemanticException(ErrorMsg.UNSUPPORTED_SUBQUERY_EXPRESSION.getMsg(
               u, "SubQuery cannot use the table alias: " + sharedAlias + "; " +
-              		"this is also an alias in the Outer Query and SubQuery contains a unqualified column reference"));
+                  "this is also an alias in the Outer Query and SubQuery contains a unqualified column reference"));
         }
       }
     }
@@ -641,25 +641,25 @@ public class QBSubQuery implements ISubQ
   }
 
   void buildJoinCondition(RowResolver outerQueryRR, RowResolver sqRR,
-		  boolean forHavingClause,
-		  String outerQueryAlias) throws SemanticException {
+      boolean forHavingClause,
+      String outerQueryAlias) throws SemanticException {
     ASTNode parentQueryJoinCond = null;
 
     if ( parentQueryExpression != null ) {
-      
+
       ColumnInfo outerQueryCol = null;
       try {
         outerQueryCol = outerQueryRR.getExpression(parentQueryExpression);
       } catch(SemanticException se) {
       }
-      
+
       parentQueryJoinCond = SubQueryUtils.buildOuterQryToSQJoinCond(
         getOuterQueryExpression(),
         alias,
         sqRR);
-      
+
       if ( outerQueryCol != null ) {
-        rewriteCorrConjunctForHaving(parentQueryJoinCond, true, 
+        rewriteCorrConjunctForHaving(parentQueryJoinCond, true,
             outerQueryAlias, outerQueryRR, outerQueryCol);
       }
       subQueryDiagnostic.addJoinCondition(parentQueryJoinCond, outerQueryCol != null, true);
@@ -682,10 +682,10 @@ public class QBSubQuery implements ISubQ
   ASTNode updateOuterQueryFilter(ASTNode outerQryFilter) {
     if (postJoinConditionAST == null ) {
       return outerQryFilter;
-    }  
-    
+    }
+
     subQueryDiagnostic.addPostJoinCondition(postJoinConditionAST);
-    
+
     if ( outerQryFilter == null ) {
       return postJoinConditionAST;
     }
@@ -738,7 +738,7 @@ public class QBSubQuery implements ISubQ
    * Additional things for Having clause:
    * - A correlation predicate may refer to an aggregation expression.
    * - This introduces 2 twists to the rewrite:
-   *   a. When analyzing equality predicates we need to analyze each side 
+   *   a. When analyzing equality predicates we need to analyze each side
    *      to see if it is an aggregation expression from the Outer Query.
    *      So for e.g. this is a valid correlation predicate:
    *         R2.x = min(R1.y)
@@ -748,12 +748,12 @@ public class QBSubQuery implements ISubQ
    *      to contain a qualified column references.
    *      We handle this by generating a new name for the aggregation expression,
    *      like R1._gby_sq_col_1 and adding this mapping to the Outer Query's
-   *      Row Resolver. Then we construct a joining predicate using this new 
+   *      Row Resolver. Then we construct a joining predicate using this new
    *      name; so in our e.g. the condition would be: R2.x = R1._gby_sq_col_1
    */
   private void rewrite(RowResolver parentQueryRR,
-		  boolean forHavingClause,
-		  String outerQueryAlias) throws SemanticException {
+      boolean forHavingClause,
+      String outerQueryAlias) throws SemanticException {
     ASTNode selectClause = (ASTNode) subQueryAST.getChild(1).getChild(1);
     ASTNode whereClause = SubQueryUtils.subQueryWhere(subQueryAST);
 
@@ -766,7 +766,7 @@ public class QBSubQuery implements ISubQ
     SubQueryUtils.extractConjuncts(searchCond, conjuncts);
 
     ConjunctAnalyzer conjunctAnalyzer = new ConjunctAnalyzer(parentQueryRR,
-    		forHavingClause, outerQueryAlias);
+        forHavingClause, outerQueryAlias);
     ASTNode sqNewSearchCond = null;
 
     for(ASTNode conjunctAST : conjuncts) {
@@ -805,7 +805,7 @@ public class QBSubQuery implements ISubQ
           corrCondLeftIsRewritten = true;
           if ( forHavingClause && conjunct.getRightOuterColInfo() != null ) {
             corrCondRightIsRewritten = true;
-            rewriteCorrConjunctForHaving(conjunctAST, false, outerQueryAlias, 
+            rewriteCorrConjunctForHaving(conjunctAST, false, outerQueryAlias,
                 parentQueryRR, conjunct.getRightOuterColInfo());
           }
           ASTNode joinPredciate = SubQueryUtils.alterCorrelatedPredicate(
@@ -829,7 +829,7 @@ public class QBSubQuery implements ISubQ
           corrCondRightIsRewritten = true;
           if ( forHavingClause && conjunct.getLeftOuterColInfo() != null ) {
             corrCondLeftIsRewritten = true;
-            rewriteCorrConjunctForHaving(conjunctAST, true, outerQueryAlias, 
+            rewriteCorrConjunctForHaving(conjunctAST, true, outerQueryAlias,
                 parentQueryRR, conjunct.getLeftOuterColInfo());
           }
           ASTNode joinPredciate = SubQueryUtils.alterCorrelatedPredicate(
@@ -901,7 +901,7 @@ public class QBSubQuery implements ISubQ
     for(ASTNode child : newChildren ) {
       subQueryAST.addChild(child);
     }
-    
+
     subQueryDiagnostic.setAddGroupByClause();
 
     return groupBy;
@@ -927,26 +927,26 @@ public class QBSubQuery implements ISubQ
   public int getNumOfCorrelationExprsAddedToSQSelect() {
     return numOfCorrelationExprsAddedToSQSelect;
   }
-  
-    
+
+
   public QBSubQueryRewrite getDiagnostic() {
     return subQueryDiagnostic;
   }
-  
+
   public QBSubQuery getSubQuery() {
     return this;
   }
-  
+
   NotInCheck getNotInCheck() {
     return notInCheck;
   }
-  
+
   private void rewriteCorrConjunctForHaving(ASTNode conjunctASTNode,
       boolean refersLeft,
       String outerQueryAlias,
       RowResolver outerQueryRR,
       ColumnInfo outerQueryCol) {
-    
+
     String newColAlias = "_gby_sq_col_" + numOuterCorrExprsForHaving++;
     ASTNode outerExprForCorr = SubQueryUtils.createColRefAST(outerQueryAlias, newColAlias);
     if ( refersLeft ) {
@@ -956,5 +956,5 @@ public class QBSubQuery implements ISubQ
     }
     outerQueryRR.put(outerQueryAlias, newColAlias, outerQueryCol);
   }
-      
+
 }

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/RowResolver.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/RowResolver.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/RowResolver.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/RowResolver.java Mon Sep  8 04:38:17 2014
@@ -98,7 +98,7 @@ public class RowResolver implements Seri
 
   public void put(String tab_alias, String col_alias, ColumnInfo colInfo) {
     if (!addMappingOnly(tab_alias, col_alias, colInfo)) {
-    	rowSchema.getSignature().add(colInfo);
+      rowSchema.getSignature().add(colInfo);
     }
   }
 
@@ -289,7 +289,7 @@ public class RowResolver implements Seri
   public boolean getIsExprResolver() {
     return isExprResolver;
   }
-  
+
   public String[] getAlternateMappings(String internalName) {
     return altInvRslvMap.get(internalName);
   }

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java Mon Sep  8 04:38:17 2014
@@ -2239,8 +2239,8 @@ public class SemanticAnalyzer extends Ba
         String havingInputAlias = null;
 
         if ( forHavingClause ) {
-        	havingInputAlias = "gby_sq" + sqIdx;
-        	aliasToOpInfo.put(havingInputAlias, input);
+          havingInputAlias = "gby_sq" + sqIdx;
+          aliasToOpInfo.put(havingInputAlias, input);
         }
 
         subQuery.validateAndRewriteAST(inputRR, forHavingClause, havingInputAlias, aliasToOpInfo.keySet());
@@ -2351,7 +2351,10 @@ public class SemanticAnalyzer extends Ba
     ExprNodeDesc filterPred = null;
     List<Boolean> nullSafes = joinTree.getNullSafes();
     for (int i = 0; i < joinKeys.length; i++) {
-      if ( nullSafes.get(i)) {
+      if (nullSafes.get(i) || (joinKeys[i] instanceof ExprNodeColumnDesc &&
+         ((ExprNodeColumnDesc)joinKeys[i]).getIsPartitionColOrVirtualCol())) {
+        // no need to generate is not null predicate for partitioning or
+        // virtual column, since those columns can never be null.
         continue;
       }
       List<ExprNodeDesc> args = new ArrayList<ExprNodeDesc>();
@@ -2531,7 +2534,7 @@ public class SemanticAnalyzer extends Ba
 
       try {
         serdeClass = (Class<? extends Deserializer>) Class.forName(serdeName,
-            true, JavaUtils.getClassLoader());
+            true, Utilities.getSessionSpecifiedClassLoader());
       } catch (ClassNotFoundException e) {
         throw new SemanticException(e);
       }
@@ -2720,7 +2723,7 @@ public class SemanticAnalyzer extends Ba
 
     try {
       serde = (Class<? extends Deserializer>) Class.forName(defaultSerdeName,
-          true, JavaUtils.getClassLoader());
+          true, Utilities.getSessionSpecifiedClassLoader());
     } catch (ClassNotFoundException e) {
       throw new SemanticException(e);
     }
@@ -2787,7 +2790,7 @@ public class SemanticAnalyzer extends Ba
 
     try {
       return (Class<? extends RecordReader>) Class.forName(name, true,
-          JavaUtils.getClassLoader());
+          Utilities.getSessionSpecifiedClassLoader());
     } catch (ClassNotFoundException e) {
       throw new SemanticException(e);
     }
@@ -2801,7 +2804,7 @@ public class SemanticAnalyzer extends Ba
 
     try {
       return (Class<? extends RecordReader>) Class.forName(name, true,
-          JavaUtils.getClassLoader());
+          Utilities.getSessionSpecifiedClassLoader());
     } catch (ClassNotFoundException e) {
       throw new SemanticException(e);
     }
@@ -2819,7 +2822,7 @@ public class SemanticAnalyzer extends Ba
 
     try {
       return (Class<? extends RecordWriter>) Class.forName(name, true,
-          JavaUtils.getClassLoader());
+          Utilities.getSessionSpecifiedClassLoader());
     } catch (ClassNotFoundException e) {
       throw new SemanticException(e);
     }
@@ -11586,40 +11589,40 @@ public class SemanticAnalyzer extends Ba
   }
 
   private void addAlternateGByKeyMappings(ASTNode gByExpr, ColumnInfo colInfo,
-		  Operator<? extends OperatorDesc> reduceSinkOp, RowResolver gByRR) {
-	  if ( gByExpr.getType() == HiveParser.DOT
+      Operator<? extends OperatorDesc> reduceSinkOp, RowResolver gByRR) {
+    if ( gByExpr.getType() == HiveParser.DOT
           && gByExpr.getChild(0).getType() == HiveParser.TOK_TABLE_OR_COL ) {
-		  String tab_alias = BaseSemanticAnalyzer.unescapeIdentifier(gByExpr
-		            .getChild(0).getChild(0).getText());
-		  String col_alias = BaseSemanticAnalyzer.unescapeIdentifier(
-				  gByExpr.getChild(1).getText());
-		  gByRR.put(tab_alias, col_alias, colInfo);
-	  } else if ( gByExpr.getType() == HiveParser.TOK_TABLE_OR_COL ) {
-		  String col_alias = BaseSemanticAnalyzer.unescapeIdentifier(gByExpr
-		          .getChild(0).getText());
-		  String tab_alias = null;
-		  /*
-		   * If the input to the GBy has a tab alias for the column, then add an entry
-		   * based on that tab_alias.
-		   * For e.g. this query:
-		   * select b.x, count(*) from t1 b group by x
-		   * needs (tab_alias=b, col_alias=x) in the GBy RR.
-		   * tab_alias=b comes from looking at the RowResolver that is the ancestor
-		   * before any GBy/ReduceSinks added for the GBY operation.
-		   */
-		  Operator<? extends OperatorDesc> parent = reduceSinkOp;
-		  while ( parent instanceof ReduceSinkOperator ||
-				  parent instanceof GroupByOperator ) {
-			  parent = parent.getParentOperators().get(0);
-		  }
-		  RowResolver parentRR = opParseCtx.get(parent).getRowResolver();
-		  try {
-			  ColumnInfo pColInfo = parentRR.get(tab_alias, col_alias);
-			  tab_alias = pColInfo == null ? null : pColInfo.getTabAlias();
-		  } catch(SemanticException se) {
-		  }
-		  gByRR.put(tab_alias, col_alias, colInfo);
-	  }
+      String tab_alias = BaseSemanticAnalyzer.unescapeIdentifier(gByExpr
+                .getChild(0).getChild(0).getText());
+      String col_alias = BaseSemanticAnalyzer.unescapeIdentifier(
+          gByExpr.getChild(1).getText());
+      gByRR.put(tab_alias, col_alias, colInfo);
+    } else if ( gByExpr.getType() == HiveParser.TOK_TABLE_OR_COL ) {
+      String col_alias = BaseSemanticAnalyzer.unescapeIdentifier(gByExpr
+              .getChild(0).getText());
+      String tab_alias = null;
+      /*
+       * If the input to the GBy has a tab alias for the column, then add an entry
+       * based on that tab_alias.
+       * For e.g. this query:
+       * select b.x, count(*) from t1 b group by x
+       * needs (tab_alias=b, col_alias=x) in the GBy RR.
+       * tab_alias=b comes from looking at the RowResolver that is the ancestor
+       * before any GBy/ReduceSinks added for the GBY operation.
+       */
+      Operator<? extends OperatorDesc> parent = reduceSinkOp;
+      while ( parent instanceof ReduceSinkOperator ||
+          parent instanceof GroupByOperator ) {
+        parent = parent.getParentOperators().get(0);
+      }
+      RowResolver parentRR = opParseCtx.get(parent).getRowResolver();
+      try {
+        ColumnInfo pColInfo = parentRR.get(tab_alias, col_alias);
+        tab_alias = pColInfo == null ? null : pColInfo.getTabAlias();
+      } catch(SemanticException se) {
+      }
+      gByRR.put(tab_alias, col_alias, colInfo);
+    }
   }
 
   private WriteEntity.WriteType determineWriteType(LoadTableDesc ltd, boolean isNonNativeTable) {

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java Mon Sep  8 04:38:17 2014
@@ -21,20 +21,24 @@ import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Deque;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.Stack;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.ql.Context;
+import org.apache.hadoop.hive.ql.exec.AppMasterEventOperator;
 import org.apache.hadoop.hive.ql.exec.ConditionalTask;
 import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.exec.FilterOperator;
 import org.apache.hadoop.hive.ql.exec.JoinOperator;
 import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
@@ -52,20 +56,25 @@ import org.apache.hadoop.hive.ql.lib.For
 import org.apache.hadoop.hive.ql.lib.GraphWalker;
 import org.apache.hadoop.hive.ql.lib.Node;
 import org.apache.hadoop.hive.ql.lib.NodeProcessor;
-import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
 import org.apache.hadoop.hive.ql.lib.Rule;
 import org.apache.hadoop.hive.ql.lib.RuleRegExp;
 import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.optimizer.ConstantPropagate;
 import org.apache.hadoop.hive.ql.optimizer.ConvertJoinMapJoin;
+import org.apache.hadoop.hive.ql.optimizer.DynamicPartitionPruningOptimization;
 import org.apache.hadoop.hive.ql.optimizer.ReduceSinkMapJoinProc;
+import org.apache.hadoop.hive.ql.optimizer.RemoveDynamicPruningBySize;
 import org.apache.hadoop.hive.ql.optimizer.SetReducerParallelism;
+import org.apache.hadoop.hive.ql.optimizer.metainfo.annotation.AnnotateWithOpTraits;
 import org.apache.hadoop.hive.ql.optimizer.physical.CrossProductCheck;
 import org.apache.hadoop.hive.ql.optimizer.physical.MetadataOnlyOptimizer;
 import org.apache.hadoop.hive.ql.optimizer.physical.NullScanOptimizer;
 import org.apache.hadoop.hive.ql.optimizer.physical.PhysicalContext;
-import org.apache.hadoop.hive.ql.optimizer.physical.Vectorizer;
 import org.apache.hadoop.hive.ql.optimizer.physical.StageIDsRearranger;
+import org.apache.hadoop.hive.ql.optimizer.physical.Vectorizer;
+import org.apache.hadoop.hive.ql.optimizer.stats.annotation.AnnotateWithStatistics;
 import org.apache.hadoop.hive.ql.plan.BaseWork;
+import org.apache.hadoop.hive.ql.plan.DynamicPruningEventDesc;
 import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.MoveWork;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
@@ -85,7 +94,7 @@ public class TezCompiler extends TaskCom
   @Override
   public void init(HiveConf conf, LogHelper console, Hive db) {
     super.init(conf, console, db);
-    
+
     // Tez requires us to use RPC for the query plan
     HiveConf.setBoolVar(conf, ConfVars.HIVE_RPC_QUERY_PLAN, true);
 
@@ -98,31 +107,203 @@ public class TezCompiler extends TaskCom
   protected void optimizeOperatorPlan(ParseContext pCtx, Set<ReadEntity> inputs,
       Set<WriteEntity> outputs) throws SemanticException {
 
-    // Sequence of TableScan operators to be walked
+    // Create the context for the walker
+    OptimizeTezProcContext procCtx = new OptimizeTezProcContext(conf, pCtx, inputs, outputs);
+
+    // setup dynamic partition pruning where possible
+    runDynamicPartitionPruning(procCtx, inputs, outputs);
+
+    // setup stats in the operator plan
+    runStatsAnnotation(procCtx);
+
+    // run the optimizations that use stats for optimization
+    runStatsDependentOptimizations(procCtx, inputs, outputs);
+
+    // after the stats phase we might have some cyclic dependencies that we need
+    // to take care of.
+    runCycleAnalysisForPartitionPruning(procCtx, inputs, outputs);
+
+  }
+
+  private void runCycleAnalysisForPartitionPruning(OptimizeTezProcContext procCtx,
+      Set<ReadEntity> inputs, Set<WriteEntity> outputs) throws SemanticException {
+
+    if (!procCtx.conf.getBoolVar(ConfVars.TEZ_DYNAMIC_PARTITION_PRUNING)) {
+      return;
+    }
+
+    boolean cycleFree = false;
+    while (!cycleFree) {
+      cycleFree = true;
+      Set<Set<Operator<?>>> components = getComponents(procCtx);
+      for (Set<Operator<?>> component : components) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Component: ");
+          for (Operator<?> co : component) {
+            LOG.debug("Operator: " + co.getName() + ", " + co.getIdentifier());
+          }
+        }
+        if (component.size() != 1) {
+          LOG.info("Found cycle in operator plan...");
+          cycleFree = false;
+          removeEventOperator(component);
+        }
+      }
+      LOG.info("Cycle free: " + cycleFree);
+    }
+  }
+
+  private void removeEventOperator(Set<Operator<?>> component) {
+    AppMasterEventOperator victim = null;
+    for (Operator<?> o : component) {
+      if (o instanceof AppMasterEventOperator) {
+        if (victim == null
+            || o.getConf().getStatistics().getDataSize() < victim.getConf().getStatistics()
+                .getDataSize()) {
+          victim = (AppMasterEventOperator) o;
+        }
+      }
+    }
+
+    Operator<?> child = victim;
+    Operator<?> curr = victim;
+
+    while (curr.getChildOperators().size() <= 1) {
+      child = curr;
+      curr = curr.getParentOperators().get(0);
+    }
+
+    // at this point we've found the fork in the op pipeline that has the
+    // pruning as a child plan.
+    LOG.info("Disabling dynamic pruning for: "
+        + ((DynamicPruningEventDesc) victim.getConf()).getTableScan().toString()
+        + ". Needed to break cyclic dependency");
+    curr.removeChild(child);
+  }
+
+  // Tarjan's algo
+  private Set<Set<Operator<?>>> getComponents(OptimizeTezProcContext procCtx) {
     Deque<Operator<?>> deque = new LinkedList<Operator<?>>();
-    deque.addAll(pCtx.getTopOps().values());
+    deque.addAll(procCtx.parseContext.getTopOps().values());
 
-    // Create the context for the walker
-    OptimizeTezProcContext procCtx
-      = new OptimizeTezProcContext(conf, pCtx, inputs, outputs, deque);
+    AtomicInteger index = new AtomicInteger();
+    Map<Operator<?>, Integer> indexes = new HashMap<Operator<?>, Integer>();
+    Map<Operator<?>, Integer> lowLinks = new HashMap<Operator<?>, Integer>();
+    Stack<Operator<?>> nodes = new Stack<Operator<?>>();
+    Set<Set<Operator<?>>> components = new HashSet<Set<Operator<?>>>();
+
+    for (Operator<?> o : deque) {
+      if (!indexes.containsKey(o)) {
+        connect(o, index, nodes, indexes, lowLinks, components);
+      }
+    }
+
+    return components;
+  }
+
+  private void connect(Operator<?> o, AtomicInteger index, Stack<Operator<?>> nodes,
+      Map<Operator<?>, Integer> indexes, Map<Operator<?>, Integer> lowLinks,
+      Set<Set<Operator<?>>> components) {
+
+    indexes.put(o, index.get());
+    lowLinks.put(o, index.get());
+    index.incrementAndGet();
+    nodes.push(o);
+
+    List<Operator<?>> children;
+    if (o instanceof AppMasterEventOperator) {
+      children = new ArrayList<Operator<?>>();
+      children.addAll(o.getChildOperators());
+      TableScanOperator ts = ((DynamicPruningEventDesc) o.getConf()).getTableScan();
+      LOG.debug("Adding special edge: " + o.getName() + " --> " + ts.toString());
+      children.add(ts);
+    } else {
+      children = o.getChildOperators();
+    }
+
+    for (Operator<?> child : children) {
+      if (!indexes.containsKey(child)) {
+        connect(child, index, nodes, indexes, lowLinks, components);
+        lowLinks.put(child, Math.min(lowLinks.get(o), lowLinks.get(child)));
+      } else if (nodes.contains(child)) {
+        lowLinks.put(o, Math.min(lowLinks.get(o), indexes.get(child)));
+      }
+    }
+
+    if (lowLinks.get(o).equals(indexes.get(o))) {
+      Set<Operator<?>> component = new HashSet<Operator<?>>();
+      components.add(component);
+      Operator<?> current;
+      do {
+        current = nodes.pop();
+        component.add(current);
+      } while (current != o);
+    }
+  }
+
+  private void runStatsAnnotation(OptimizeTezProcContext procCtx) throws SemanticException {
+    new AnnotateWithStatistics().transform(procCtx.parseContext);
+    new AnnotateWithOpTraits().transform(procCtx.parseContext);
+  }
+
+  private void runStatsDependentOptimizations(OptimizeTezProcContext procCtx,
+      Set<ReadEntity> inputs, Set<WriteEntity> outputs) throws SemanticException {
+
+    // Sequence of TableScan operators to be walked
+    Deque<Operator<?>> deque = new LinkedList<Operator<?>>();
+    deque.addAll(procCtx.parseContext.getTopOps().values());
 
     // create a walker which walks the tree in a DFS manner while maintaining
     // the operator stack.
     Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
-    opRules.put(new RuleRegExp(new String("Set parallelism - ReduceSink"),
+    opRules.put(new RuleRegExp("Set parallelism - ReduceSink",
         ReduceSinkOperator.getOperatorName() + "%"),
         new SetReducerParallelism());
 
-    opRules.put(new RuleRegExp(new String("Convert Join to Map-join"),
+    opRules.put(new RuleRegExp("Convert Join to Map-join",
         JoinOperator.getOperatorName() + "%"), new ConvertJoinMapJoin());
 
+    opRules.put(
+        new RuleRegExp("Remove dynamic pruning by size",
+        AppMasterEventOperator.getOperatorName() + "%"),
+        new RemoveDynamicPruningBySize());
+
     // The dispatcher fires the processor corresponding to the closest matching
     // rule and passes the context along
     Dispatcher disp = new DefaultRuleDispatcher(null, opRules, procCtx);
     List<Node> topNodes = new ArrayList<Node>();
-    topNodes.addAll(pCtx.getTopOps().values());
+    topNodes.addAll(procCtx.parseContext.getTopOps().values());
+    GraphWalker ogw = new ForwardWalker(disp);
+    ogw.startWalking(topNodes, null);
+  }
+
+  private void runDynamicPartitionPruning(OptimizeTezProcContext procCtx, Set<ReadEntity> inputs,
+      Set<WriteEntity> outputs) throws SemanticException {
+
+    if (!procCtx.conf.getBoolVar(ConfVars.TEZ_DYNAMIC_PARTITION_PRUNING)) {
+      return;
+    }
+
+    // Sequence of TableScan operators to be walked
+    Deque<Operator<?>> deque = new LinkedList<Operator<?>>();
+    deque.addAll(procCtx.parseContext.getTopOps().values());
+
+    Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
+    opRules.put(
+        new RuleRegExp(new String("Dynamic Partition Pruning"), FilterOperator.getOperatorName()
+            + "%"), new DynamicPartitionPruningOptimization());
+
+    // The dispatcher fires the processor corresponding to the closest matching
+    // rule and passes the context along
+    Dispatcher disp = new DefaultRuleDispatcher(null, opRules, procCtx);
+    List<Node> topNodes = new ArrayList<Node>();
+    topNodes.addAll(procCtx.parseContext.getTopOps().values());
     GraphWalker ogw = new ForwardWalker(disp);
     ogw.startWalking(topNodes, null);
+
+    // need a new run of the constant folding because we might have created lots
+    // of "and true and true" conditions.
+    new ConstantPropagate().transform(procCtx.parseContext);
   }
 
   @Override
@@ -158,19 +339,12 @@ public class TezCompiler extends TaskCom
         new ProcessAnalyzeTable(GenTezUtils.getUtils()));
 
     opRules.put(new RuleRegExp("Remember union",
-        UnionOperator.getOperatorName() + "%"), new NodeProcessor()
-    {
-      @Override
-      public Object process(Node n, Stack<Node> s,
-          NodeProcessorCtx procCtx, Object... os) throws SemanticException {
-        GenTezProcContext context = (GenTezProcContext) procCtx;
-        UnionOperator union = (UnionOperator) n;
-
-        // simply need to remember that we've seen a union.
-        context.currentUnionOperators.add(union);
-        return null;
-      }
-    });
+        UnionOperator.getOperatorName() + "%"),
+        new UnionProcessor());
+
+    opRules.put(new RuleRegExp("AppMasterEventOperator",
+        AppMasterEventOperator.getOperatorName() + "%"),
+        new AppMasterEventProcessor());
 
     // The dispatcher fires the processor corresponding to the closest matching
     // rule and passes the context along
@@ -185,10 +359,17 @@ public class TezCompiler extends TaskCom
       GenTezUtils.getUtils().removeUnionOperators(conf, procCtx, w);
     }
 
-    // finally make sure the file sink operators are set up right
+    // then we make sure the file sink operators are set up right
     for (FileSinkOperator fileSink: procCtx.fileSinkSet) {
       GenTezUtils.getUtils().processFileSink(procCtx, fileSink);
     }
+
+    // and finally we hook up any events that need to be sent to the tez AM
+    LOG.debug("There are " + procCtx.eventOperatorSet.size() + " app master events.");
+    for (AppMasterEventOperator event : procCtx.eventOperatorSet) {
+      LOG.debug("Handling AppMasterEventOperator: " + event);
+      GenTezUtils.getUtils().processAppMasterEvent(procCtx, event);
+    }
   }
 
   @Override

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/AggregationDesc.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/AggregationDesc.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/AggregationDesc.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/AggregationDesc.java Mon Sep  8 04:38:17 2014
@@ -23,6 +23,7 @@ import java.io.Serializable;
 
 import org.apache.hadoop.hive.common.JavaUtils;
 import org.apache.hadoop.hive.ql.exec.PTFUtils;
+import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
 import org.apache.hadoop.util.ReflectionUtils;
 
@@ -93,7 +94,7 @@ public class AggregationDesc implements 
     try {
       return genericUDAFEvaluator =
           ReflectionUtils.newInstance(Class.forName(genericUDAFEvaluatorClassName, true,
-          JavaUtils.getClassLoader()).asSubclass(GenericUDAFEvaluator.class), null);
+          Utilities.getSessionSpecifiedClassLoader()).asSubclass(GenericUDAFEvaluator.class), null);
     } catch (ClassNotFoundException e) {
       throw new RuntimeException(e);
     }

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/ColStatistics.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/ColStatistics.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/ColStatistics.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/ColStatistics.java Mon Sep  8 04:38:17 2014
@@ -32,6 +32,7 @@ public class ColStatistics {
   private double avgColLen;
   private long numTrues;
   private long numFalses;
+  private Range range;
 
   public ColStatistics(String tabAlias, String colName, String colType) {
     this.setTableAlias(tabAlias);
@@ -118,6 +119,17 @@ public class ColStatistics {
     this.numFalses = numFalses;
   }
 
+  public Range getRange() {
+    return range;
+  }
+
+  public void setRange(Number minVal, Number maxVal) {
+    this.range = new Range(minVal, maxVal);
+  }
+
+  public void setRange(Range r) {
+    this.range = r;
+  }
 
   @Override
   public String toString() {
@@ -150,7 +162,24 @@ public class ColStatistics {
     clone.setNumNulls(numNulls);
     clone.setNumTrues(numTrues);
     clone.setNumFalses(numFalses);
+    if (range != null ) {
+      clone.setRange(range.clone());
+    }
     return clone;
   }
 
+  public static class Range {
+    public final Number minValue;
+    public final Number maxValue;
+    Range(Number minValue, Number maxValue) {
+      super();
+      this.minValue = minValue;
+      this.maxValue = maxValue;
+    }
+    @Override
+    public Range clone() {
+      return new Range(minValue, maxValue);
+    }
+  }
+
 }

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/CreateTableDesc.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/CreateTableDesc.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/CreateTableDesc.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/CreateTableDesc.java Mon Sep  8 04:38:17 2014
@@ -419,7 +419,7 @@ public class CreateTableDesc extends DDL
     if (this.getStorageHandler() == null) {
       try {
         Class<?> origin = Class.forName(this.getOutputFormat(), true,
-          JavaUtils.getClassLoader());
+          Utilities.getSessionSpecifiedClassLoader());
         Class<? extends HiveOutputFormat> replaced = HiveFileFormatUtils
           .getOutputFormatSubstitute(origin,false);
         if (replaced == null) {

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java Mon Sep  8 04:38:17 2014
@@ -22,6 +22,7 @@ import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
 
 /**
  * FileSinkDesc.
@@ -84,6 +85,10 @@ public class FileSinkDesc extends Abstra
 
   private boolean statsCollectRawDataSize;
 
+  // Record what type of write this is.  Default is non-ACID (ie old style).
+  private AcidUtils.Operation writeType = AcidUtils.Operation.NOT_ACID;
+  private long txnId = 0;  // transaction id for this operation
+
   public FileSinkDesc() {
   }
 
@@ -137,6 +142,8 @@ public class FileSinkDesc extends Abstra
     ret.setMaxStatsKeyPrefixLength(maxStatsKeyPrefixLength);
     ret.setStatsCollectRawDataSize(statsCollectRawDataSize);
     ret.setDpSortState(dpSortState);
+    ret.setWriteType(writeType);
+    ret.setTransactionId(txnId);
     return (Object) ret;
   }
 
@@ -398,4 +405,20 @@ public class FileSinkDesc extends Abstra
   public void setDpSortState(DPSortState dpSortState) {
     this.dpSortState = dpSortState;
   }
+
+  public void setWriteType(AcidUtils.Operation type) {
+    writeType = type;
+  }
+
+  public AcidUtils.Operation getWriteType() {
+    return writeType;
+  }
+
+  public void setTransactionId(long id) {
+    txnId = id;
+  }
+
+  public long getTransactionId() {
+    return txnId;
+  }
 }

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadFileDesc.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadFileDesc.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadFileDesc.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadFileDesc.java Mon Sep  8 04:38:17 2014
@@ -37,7 +37,7 @@ public class LoadFileDesc extends LoadDe
   private String destinationCreateTable;
 
   static {
-	  PTFUtils.makeTransient(LoadFileDesc.class, "targetDir");
+    PTFUtils.makeTransient(LoadFileDesc.class, "targetDir");
   }
   public LoadFileDesc() {
   }

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadMultiFilesDesc.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadMultiFilesDesc.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadMultiFilesDesc.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadMultiFilesDesc.java Mon Sep  8 04:38:17 2014
@@ -38,7 +38,7 @@ public class LoadMultiFilesDesc implemen
   private transient List<Path> srcDirs;
 
   static {
-	  PTFUtils.makeTransient(LoadMultiFilesDesc.class, "targetDirs");
+    PTFUtils.makeTransient(LoadMultiFilesDesc.class, "targetDirs");
   }
   public LoadMultiFilesDesc() {
   }

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java Mon Sep  8 04:38:17 2014
@@ -26,9 +26,9 @@ import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.LinkedHashSet;
 import java.util.List;
-import java.util.Set;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Set;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -116,6 +116,16 @@ public class MapWork extends BaseWork {
 
   private boolean useOneNullRowInputFormat;
 
+  private boolean dummyTableScan = false;
+
+  // used for dynamic partitioning
+  private Map<String, List<TableDesc>> eventSourceTableDescMap =
+      new LinkedHashMap<String, List<TableDesc>>();
+  private Map<String, List<String>> eventSourceColumnNameMap =
+      new LinkedHashMap<String, List<String>>();
+  private Map<String, List<ExprNodeDesc>> eventSourcePartKeyExprMap =
+      new LinkedHashMap<String, List<ExprNodeDesc>>();
+
   public MapWork() {}
 
   public MapWork(String name) {
@@ -525,4 +535,36 @@ public class MapWork extends BaseWork {
       }
     }
   }
+
+  public void setDummyTableScan(boolean dummyTableScan) {
+    this.dummyTableScan = dummyTableScan;
+  }
+
+  public boolean getDummyTableScan() {
+    return dummyTableScan;
+  }
+
+  public void setEventSourceTableDescMap(Map<String, List<TableDesc>> map) {
+    this.eventSourceTableDescMap = map;
+  }
+
+  public Map<String, List<TableDesc>> getEventSourceTableDescMap() {
+    return eventSourceTableDescMap;
+  }
+
+  public void setEventSourceColumnNameMap(Map<String, List<String>> map) {
+    this.eventSourceColumnNameMap = map;
+  }
+
+  public Map<String, List<String>> getEventSourceColumnNameMap() {
+    return eventSourceColumnNameMap;
+  }
+
+  public Map<String, List<ExprNodeDesc>> getEventSourcePartKeyExprMap() {
+    return eventSourcePartKeyExprMap;
+  }
+
+  public void setEventSourcePartKeyExprMap(Map<String, List<ExprNodeDesc>> map) {
+    this.eventSourcePartKeyExprMap = map;
+  }
 }

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceWork.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceWork.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceWork.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceWork.java Mon Sep  8 04:38:17 2014
@@ -20,8 +20,8 @@ package org.apache.hadoop.hive.ql.plan;
 
 import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.LinkedHashSet;
 import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -32,13 +32,9 @@ import org.apache.hadoop.hive.ql.exec.Fi
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.OperatorUtils;
 import org.apache.hadoop.hive.ql.exec.Utilities;
-import org.apache.hadoop.hive.ql.lib.NodeProcessor;
-import org.apache.hadoop.hive.ql.lib.Rule;
 import org.apache.hadoop.hive.serde2.Deserializer;
-import org.apache.hadoop.hive.serde2.SerDe;
 import org.apache.hadoop.hive.serde2.SerDeUtils;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
 import org.apache.hadoop.hive.serde2.objectinspector.StructField;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.mapred.JobConf;
@@ -99,7 +95,7 @@ public class ReduceWork extends BaseWork
   private ObjectInspector keyObjectInspector = null;
   private ObjectInspector valueObjectInspector = null;
 
-  private Map<String, Integer> reduceColumnNameMap = new LinkedHashMap<String, Integer>();
+  private final Map<String, Integer> reduceColumnNameMap = new LinkedHashMap<String, Integer>();
 
   /**
    * If the plan has a reducer and correspondingly a reduce-sink, then store the TableDesc pointing
@@ -118,7 +114,7 @@ public class ReduceWork extends BaseWork
   private ObjectInspector getObjectInspector(TableDesc desc) {
     ObjectInspector objectInspector;
     try {
-      Deserializer deserializer = (SerDe) ReflectionUtils.newInstance(desc
+      Deserializer deserializer = ReflectionUtils.newInstance(desc
                 .getDeserializerClass(), null);
       SerDeUtils.initializeSerDe(deserializer, null, desc.getProperties(), null);
       objectInspector = deserializer.getObjectInspector();
@@ -239,7 +235,6 @@ public class ReduceWork extends BaseWork
 
   @Override
   public void replaceRoots(Map<Operator<?>, Operator<?>> replacementMap) {
-    assert replacementMap.size() == 1;
     setReducer(replacementMap.get(getReducer()));
   }
 

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/TableDesc.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/TableDesc.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/TableDesc.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/TableDesc.java Mon Sep  8 04:38:17 2014
@@ -26,6 +26,7 @@ import java.util.Properties;
 
 import org.apache.hadoop.hive.common.JavaUtils;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
 import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
 import org.apache.hadoop.hive.ql.io.HivePassThroughOutputFormat;
@@ -65,7 +66,7 @@ public class TableDesc implements Serial
   public Class<? extends Deserializer> getDeserializerClass() {
     try {
       return (Class<? extends Deserializer>) Class.forName(
-          getSerdeClassName(), true, JavaUtils.getClassLoader());
+          getSerdeClassName(), true, Utilities.getSessionSpecifiedClassLoader());
     } catch (ClassNotFoundException e) {
       throw new RuntimeException(e);
     }

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/ppd/OpProcFactory.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/ppd/OpProcFactory.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/ppd/OpProcFactory.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/ppd/OpProcFactory.java Mon Sep  8 04:38:17 2014
@@ -906,11 +906,8 @@ public final class OpProcFactory {
     }
 
     ExprNodeDesc condn = ExprNodeDescUtils.mergePredicates(preds);
-    if(!(condn instanceof ExprNodeGenericFuncDesc)) {
-      return null;
-    }
 
-    if (op instanceof TableScanOperator) {
+    if (op instanceof TableScanOperator && condn instanceof ExprNodeGenericFuncDesc) {
       boolean pushFilterToStorage;
       HiveConf hiveConf = owi.getParseContext().getConf();
       pushFilterToStorage =

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/processors/CommandProcessorFactory.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/processors/CommandProcessorFactory.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/processors/CommandProcessorFactory.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/processors/CommandProcessorFactory.java Mon Sep  8 04:38:17 2014
@@ -80,6 +80,8 @@ public final class CommandProcessorFacto
         return new DeleteResourceProcessor();
       case COMPILE:
         return new CompileProcessor();
+      case RELOAD:
+        return new ReloadProcessor();
       default:
         throw new AssertionError("Unknown HiveCommand " + hiveCommand);
     }

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/processors/CommandProcessorResponse.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/processors/CommandProcessorResponse.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/processors/CommandProcessorResponse.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/processors/CommandProcessorResponse.java Mon Sep  8 04:38:17 2014
@@ -76,4 +76,9 @@ public class CommandProcessorResponse {
   public String getSQLState() { return SQLState; }
   public Schema getSchema() { return resSchema; }
   public Throwable getException() { return exception; }
+  public String toString() {
+    return "(" + responseCode + "," + errorMessage + "," + SQLState + 
+      (resSchema == null ? "" : ",") +
+      (exception == null ? "" : exception.getMessage()) + ")";
+  }
 }

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/processors/HiveCommand.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/processors/HiveCommand.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/processors/HiveCommand.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/processors/HiveCommand.java Mon Sep  8 04:38:17 2014
@@ -31,6 +31,7 @@ public enum HiveCommand {
   DFS(),
   ADD(),
   LIST(),
+  RELOAD(),
   DELETE(),
   COMPILE();
   private static final Set<String> COMMANDS = new HashSet<String>();

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java Mon Sep  8 04:38:17 2014
@@ -24,14 +24,8 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.PrintStream;
 import java.net.URI;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
+import java.net.URLClassLoader;
+import java.util.*;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang.StringUtils;
@@ -45,7 +39,6 @@ import org.apache.hadoop.fs.permission.F
 import org.apache.hadoop.hive.common.JavaUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
-import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
 import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
 import org.apache.hadoop.hive.ql.MapRedStats;
 import org.apache.hadoop.hive.ql.exec.Utilities;
@@ -56,6 +49,9 @@ import org.apache.hadoop.hive.ql.exec.te
 import org.apache.hadoop.hive.ql.history.HiveHistory;
 import org.apache.hadoop.hive.ql.history.HiveHistoryImpl;
 import org.apache.hadoop.hive.ql.history.HiveHistoryProxyHandler;
+import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
+import org.apache.hadoop.hive.ql.lockmgr.LockException;
+import org.apache.hadoop.hive.ql.lockmgr.TxnManagerFactory;
 import org.apache.hadoop.hive.ql.log.PerfLogger;
 import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -71,6 +67,7 @@ import org.apache.hadoop.hive.ql.securit
 import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveMetastoreClientFactoryImpl;
 import org.apache.hadoop.hive.ql.util.DosToUnix;
 import org.apache.hadoop.hive.shims.ShimLoader;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.ReflectionUtils;
 
 import com.google.common.base.Preconditions;
@@ -212,6 +209,36 @@ public class SessionState {
    */
   private Path localSessionPath;
 
+  private String hdfsScratchDirURIString;
+
+  /**
+   * Transaction manager to use for this session.  This is instantiated lazily by
+   * {@link #initTxnMgr(org.apache.hadoop.hive.conf.HiveConf)}
+   */
+  private HiveTxnManager txnMgr = null;
+
+  /**
+   * When {@link #setCurrentTxn(long)} is set to this or {@link #getCurrentTxn()}} returns this it
+   * indicates that there is not a current transaction in this session.
+  */
+  public static final long NO_CURRENT_TXN = -1L;
+
+  /**
+   * Transaction currently open
+   */
+  private long currentTxn = NO_CURRENT_TXN;
+
+  /**
+   * Whether we are in auto-commit state or not.  Currently we are always in auto-commit,
+   * so there are not setters for this yet.
+   */
+  private boolean txnAutoCommit = true;
+
+  /**
+   * store the jars loaded last time
+   */
+  private final Set<String> preReloadableAuxJars = new HashSet<String>();
+
   /**
    * Get the lineage state stored in this session.
    *
@@ -314,6 +341,37 @@ public class SessionState {
   }
 
   /**
+   * Initialize the transaction manager.  This is done lazily to avoid hard wiring one
+   * transaction manager at the beginning of the session.  In general users shouldn't change
+   * this, but it's useful for testing.
+   * @param conf Hive configuration to initialize transaction manager
+   * @return transaction manager
+   * @throws LockException
+   */
+  public HiveTxnManager initTxnMgr(HiveConf conf) throws LockException {
+    if (txnMgr == null) {
+      txnMgr = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
+    }
+    return txnMgr;
+  }
+
+  public HiveTxnManager getTxnMgr() {
+    return txnMgr;
+  }
+
+  public long getCurrentTxn() {
+    return currentTxn;
+  }
+
+  public void setCurrentTxn(long currTxn) {
+    currentTxn = currTxn;
+  }
+
+  public boolean isAutoCommit() {
+    return txnAutoCommit;
+  }
+
+  /**
    * Singleton Session object per thread.
    *
    **/
@@ -348,36 +406,39 @@ public class SessionState {
 
     setCurrentSessionState(startSs);
 
-    if(startSs.hiveHist == null){
+    if (startSs.hiveHist == null){
       if (startSs.getConf().getBoolVar(HiveConf.ConfVars.HIVE_SESSION_HISTORY_ENABLED)) {
         startSs.hiveHist = new HiveHistoryImpl(startSs);
-      }else {
-        //Hive history is disabled, create a no-op proxy
+      } else {
+        // Hive history is disabled, create a no-op proxy
         startSs.hiveHist = HiveHistoryProxyHandler.getNoOpHiveHistoryProxy();
       }
     }
 
-    if (startSs.getTmpOutputFile() == null) {
-      // set temp file containing results to be sent to HiveClient
-      try {
-        startSs.setTmpOutputFile(createTempFile(startSs.getConf()));
-      } catch (IOException e) {
-        throw new RuntimeException(e);
-      }
-    }
-
     // Get the following out of the way when you start the session these take a
     // while and should be done when we start up.
     try {
-      //Hive object instance should be created with a copy of the conf object. If the conf is
+      // Hive object instance should be created with a copy of the conf object. If the conf is
       // shared with SessionState, other parts of the code might update the config, but
       // Hive.get(HiveConf) would not recognize the case when it needs refreshing
       Hive.get(new HiveConf(startSs.conf)).getMSC();
-      ShimLoader.getHadoopShims().getUGIForConf(startSs.conf);
+      UserGroupInformation sessionUGI = ShimLoader.getHadoopShims().getUGIForConf(startSs.conf);
       FileSystem.get(startSs.conf);
-      startSs.createSessionPaths(startSs.conf);
+
+      // Create scratch dirs for this session
+      startSs.createSessionDirs(sessionUGI.getShortUserName());
+
+      // Set temp file containing results to be sent to HiveClient
+      if (startSs.getTmpOutputFile() == null) {
+        try {
+          startSs.setTmpOutputFile(createTempFile(startSs.getConf()));
+        } catch (IOException e) {
+          throw new RuntimeException(e);
+        }
+      }
+
     } catch (Exception e) {
-      // catch-all due to some exec time dependencies on session state
+      // Catch-all due to some exec time dependencies on session state
       // that would cause ClassNoFoundException otherwise
       throw new RuntimeException(e);
     }
@@ -400,6 +461,88 @@ public class SessionState {
     return startSs;
   }
 
+  /**
+   * Create dirs & session paths for this session:
+   * 1. HDFS scratch dir
+   * 2. Local scratch dir
+   * 3. Local downloaded resource dir
+   * 4. HDFS session path
+   * 5. Local session path
+   * 6. HDFS temp table space
+   * @param userName
+   * @throws IOException
+   */
+  private void createSessionDirs(String userName) throws IOException {
+    HiveConf conf = getConf();
+    // First create the root scratch dir on hdfs (if it doesn't already exist) and make it writable
+    Path rootHDFSDirPath = new Path(HiveConf.getVar(conf, HiveConf.ConfVars.SCRATCHDIR));
+    String rootHDFSDirPermission = "777";
+    createPath(conf, rootHDFSDirPath, rootHDFSDirPermission, false, false);
+    // Now create session specific dirs
+    String scratchDirPermission = HiveConf.getVar(conf, HiveConf.ConfVars.SCRATCHDIRPERMISSION);
+    Path path;
+    // 1. HDFS scratch dir
+    path = new Path(rootHDFSDirPath, userName);
+    hdfsScratchDirURIString = path.toUri().toString();
+    createPath(conf, path, scratchDirPermission, false, false);
+    // 2. Local scratch dir
+    path = new Path(HiveConf.getVar(conf, HiveConf.ConfVars.LOCALSCRATCHDIR));
+    createPath(conf, path, scratchDirPermission, true, false);
+    // 3. Download resources dir
+    path = new Path(HiveConf.getVar(conf, HiveConf.ConfVars.DOWNLOADED_RESOURCES_DIR));
+    createPath(conf, path, scratchDirPermission, true, false);
+    // Finally, create session paths for this session
+    // Local & non-local tmp location is configurable. however it is the same across
+    // all external file systems
+    String sessionId = getSessionId();
+    // 4. HDFS session path
+    hdfsSessionPath = new Path(hdfsScratchDirURIString, sessionId);
+    createPath(conf, hdfsSessionPath, scratchDirPermission, false, true);
+    conf.set(HDFS_SESSION_PATH_KEY, hdfsSessionPath.toUri().toString());
+    // 5. Local session path
+    localSessionPath = new Path(HiveConf.getVar(conf, HiveConf.ConfVars.LOCALSCRATCHDIR), sessionId);
+    createPath(conf, localSessionPath, scratchDirPermission, true, true);
+    conf.set(LOCAL_SESSION_PATH_KEY, localSessionPath.toUri().toString());
+    // 6. HDFS temp table space
+    hdfsTmpTableSpace = new Path(hdfsSessionPath, TMP_PREFIX);
+    createPath(conf, hdfsTmpTableSpace, scratchDirPermission, false, true);
+    conf.set(TMP_TABLE_SPACE_KEY, hdfsTmpTableSpace.toUri().toString());
+  }
+
+  /**
+   * Create a given path if it doesn't exist.
+   *
+   * @param conf
+   * @param pathString
+   * @param permission
+   * @param isLocal
+   * @param isCleanUp
+   * @return
+   * @throws IOException
+   */
+  private void createPath(HiveConf conf, Path path, String permission, boolean isLocal,
+      boolean isCleanUp) throws IOException {
+    FsPermission fsPermission = new FsPermission(permission);
+    FileSystem fs;
+    if (isLocal) {
+      fs = FileSystem.getLocal(conf);
+    } else {
+      fs = path.getFileSystem(conf);
+    }
+    if (!fs.exists(path)) {
+      fs.mkdirs(path, fsPermission);
+      String dirType = isLocal ? "local" : "HDFS";
+      LOG.info("Created " + dirType + " directory: " + path.toString());
+    }
+    if (isCleanUp) {
+      fs.deleteOnExit(path);
+    }
+  }
+
+  public String getHdfsScratchDirURIString() {
+    return hdfsScratchDirURIString;
+  }
+
   public static Path getLocalSessionPath(Configuration conf) {
     SessionState ss = SessionState.get();
     if (ss == null) {
@@ -452,43 +595,6 @@ public class SessionState {
     }
   }
 
-  private void createSessionPaths(Configuration conf) throws IOException {
-
-    String scratchDirPermission = HiveConf.getVar(conf, HiveConf.ConfVars.SCRATCHDIRPERMISSION);
-    String sessionId = getSessionId();
-
-    // local & non-local tmp location is configurable. however it is the same across
-    // all external file systems
-    hdfsSessionPath =
-      new Path(HiveConf.getVar(conf, HiveConf.ConfVars.SCRATCHDIR),
-               sessionId);
-    createPath(conf, hdfsSessionPath, scratchDirPermission);
-    conf.set(HDFS_SESSION_PATH_KEY, hdfsSessionPath.toUri().toString());
-
-    localSessionPath = new Path(HiveConf.getVar(conf, HiveConf.ConfVars.LOCALSCRATCHDIR),
-                                sessionId);
-    createPath(conf, localSessionPath, scratchDirPermission);
-    conf.set(LOCAL_SESSION_PATH_KEY, localSessionPath.toUri().toString());
-    hdfsTmpTableSpace = new Path(hdfsSessionPath, TMP_PREFIX);
-    createPath(conf, hdfsTmpTableSpace, scratchDirPermission);
-    conf.set(TMP_TABLE_SPACE_KEY, hdfsTmpTableSpace.toUri().toString());
-  }
-
-  private void createPath(Configuration conf, Path p, String perm) throws IOException {
-    FileSystem fs = p.getFileSystem(conf);
-    p = new Path(fs.makeQualified(p).toString());
-    FsPermission fsPermission = new FsPermission(Short.parseShort(perm.trim(), 8));
-
-    if (!Utilities.createDirsWithPermission(conf, p, fsPermission)) {
-      throw new IOException("Cannot create directory: "
-                            + p.toString());
-    }
-
-    // best effort to clean up if we don't shut down properly
-    fs.deleteOnExit(p);
-  }
-
-
   /**
    * Setup authentication and authorization plugins for this session.
    */
@@ -727,7 +833,6 @@ public class SessionState {
     SessionState ss = SessionState.get();
     Configuration conf = (ss == null) ? new Configuration() : ss.getConf();
 
-    LogHelper console = getConsole();
     for (String newFile : newFiles) {
       try {
         if (Utilities.realFile(newFile, conf) == null) {
@@ -741,6 +846,52 @@ public class SessionState {
     }
   }
 
+  // reloading the jars under the path specified in hive.reloadable.aux.jars.path property
+  public void reloadAuxJars() throws IOException {
+    final Set<String> reloadedAuxJars = new HashSet<String>();
+
+    final String renewableJarPath = conf.getVar(ConfVars.HIVERELOADABLEJARS);
+    // do nothing if this property is not specified or empty
+    if (renewableJarPath == null || renewableJarPath.isEmpty()) {
+      return;
+    }
+
+    Set<String> jarPaths = Utilities.getJarFilesByPath(renewableJarPath);
+
+    // load jars under the hive.reloadable.aux.jars.path
+    if(!jarPaths.isEmpty()){
+      reloadedAuxJars.addAll(jarPaths);
+    }
+
+    // remove the previous renewable jars
+    try {
+      if (preReloadableAuxJars != null && !preReloadableAuxJars.isEmpty()) {
+        Utilities.removeFromClassPath(preReloadableAuxJars.toArray(new String[0]));
+      }
+    } catch (Exception e) {
+      String msg = "Fail to remove the reloaded jars loaded last time: " + e;
+      throw new IOException(msg, e);
+    }
+
+    try {
+      if (reloadedAuxJars != null && !reloadedAuxJars.isEmpty()) {
+        URLClassLoader currentCLoader =
+            (URLClassLoader) SessionState.get().getConf().getClassLoader();
+        currentCLoader =
+            (URLClassLoader) Utilities.addToClassPath(currentCLoader,
+                reloadedAuxJars.toArray(new String[0]));
+        conf.setClassLoader(currentCLoader);
+        Thread.currentThread().setContextClassLoader(currentCLoader);
+      }
+      preReloadableAuxJars.clear();
+      preReloadableAuxJars.addAll(reloadedAuxJars);
+    } catch (Exception e) {
+      String msg =
+          "Fail to add jars from the path specified in hive.reloadable.aux.jars.path property: " + e;
+      throw new IOException(msg, e);
+    }
+  }
+
   static void registerJars(List<String> newJars) throws IllegalArgumentException {
     LogHelper console = getConsole();
     try {
@@ -1054,6 +1205,7 @@ public class SessionState {
   }
 
   public void close() throws IOException {
+    if (txnMgr != null) txnMgr.closeTxnManager();
     JavaUtils.closeClassLoadersTo(conf.getClassLoader(), parentLoader);
     File resourceDir =
         new File(getConf().getVar(HiveConf.ConfVars.DOWNLOADED_RESOURCES_DIR));

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsFactory.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsFactory.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsFactory.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsFactory.java Mon Sep  8 04:38:17 2014
@@ -26,6 +26,7 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.hive.common.JavaUtils;
 import org.apache.hadoop.hive.common.StatsSetupConst.StatDB;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.util.ReflectionUtils;
 
 import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVESTATSDBCLASS;
@@ -87,7 +88,7 @@ public final class StatsFactory {
   }
 
   private boolean initialize(String type) {
-    ClassLoader classLoader = JavaUtils.getClassLoader();
+    ClassLoader classLoader = Utilities.getSessionSpecifiedClassLoader();
     try {
       StatDB statDB = type.startsWith("jdbc") ? StatDB.jdbc : StatDB.valueOf(type);
       publisherImplementation = (Class<? extends Serializable>)

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUtils.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUtils.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUtils.java Mon Sep  8 04:38:17 2014
@@ -25,10 +25,12 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.StatsSetupConst;
+import org.apache.hadoop.hive.common.type.HiveDecimal;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.AggrStats;
 import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData;
 import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
+import org.apache.hadoop.hive.metastore.api.Decimal;
 import org.apache.hadoop.hive.ql.exec.ColumnInfo;
 import org.apache.hadoop.hive.ql.exec.RowSchema;
 import org.apache.hadoop.hive.ql.exec.TableScanOperator;
@@ -39,8 +41,10 @@ import org.apache.hadoop.hive.ql.metadat
 import org.apache.hadoop.hive.ql.parse.PrunedPartitionList;
 import org.apache.hadoop.hive.ql.plan.ColStatistics;
 import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeColumnListDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeFieldDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeNullDesc;
 import org.apache.hadoop.hive.ql.plan.Statistics;
@@ -76,6 +80,8 @@ import org.apache.hadoop.hive.serde2.obj
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableTimestampObjectInspector;
 import org.apache.hadoop.io.BytesWritable;
 
+import java.math.BigDecimal;
+import java.math.BigInteger;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -404,18 +410,22 @@ public class StatsUtils {
       cs.setCountDistint(csd.getLongStats().getNumDVs());
       cs.setNumNulls(csd.getLongStats().getNumNulls());
       cs.setAvgColLen(JavaDataModel.get().primitive1());
+      cs.setRange(csd.getLongStats().getLowValue(), csd.getLongStats().getHighValue());
     } else if (colType.equalsIgnoreCase(serdeConstants.BIGINT_TYPE_NAME)) {
       cs.setCountDistint(csd.getLongStats().getNumDVs());
       cs.setNumNulls(csd.getLongStats().getNumNulls());
       cs.setAvgColLen(JavaDataModel.get().primitive2());
+      cs.setRange(csd.getLongStats().getLowValue(), csd.getLongStats().getHighValue());
     } else if (colType.equalsIgnoreCase(serdeConstants.FLOAT_TYPE_NAME)) {
       cs.setCountDistint(csd.getDoubleStats().getNumDVs());
       cs.setNumNulls(csd.getDoubleStats().getNumNulls());
       cs.setAvgColLen(JavaDataModel.get().primitive1());
+      cs.setRange(csd.getDoubleStats().getLowValue(), csd.getDoubleStats().getHighValue());
     } else if (colType.equalsIgnoreCase(serdeConstants.DOUBLE_TYPE_NAME)) {
       cs.setCountDistint(csd.getDoubleStats().getNumDVs());
       cs.setNumNulls(csd.getDoubleStats().getNumNulls());
       cs.setAvgColLen(JavaDataModel.get().primitive2());
+      cs.setRange(csd.getDoubleStats().getLowValue(), csd.getDoubleStats().getHighValue());
     } else if (colType.equalsIgnoreCase(serdeConstants.STRING_TYPE_NAME)
         || colType.startsWith(serdeConstants.CHAR_TYPE_NAME)
         || colType.startsWith(serdeConstants.VARCHAR_TYPE_NAME)) {
@@ -441,6 +451,13 @@ public class StatsUtils {
       cs.setAvgColLen(JavaDataModel.get().lengthOfDecimal());
       cs.setCountDistint(csd.getDecimalStats().getNumDVs());
       cs.setNumNulls(csd.getDecimalStats().getNumNulls());
+      Decimal val = csd.getDecimalStats().getHighValue();
+      BigDecimal maxVal = HiveDecimal.
+          create(new BigInteger(val.getUnscaled()), val.getScale()).bigDecimalValue();
+      val = csd.getDecimalStats().getLowValue();
+      BigDecimal minVal = HiveDecimal.
+          create(new BigInteger(val.getUnscaled()), val.getScale()).bigDecimalValue();
+      cs.setRange(minVal, maxVal);
     } else if (colType.equalsIgnoreCase(serdeConstants.DATE_TYPE_NAME)) {
       cs.setAvgColLen(JavaDataModel.get().lengthOfDate());
     } else {
@@ -960,6 +977,22 @@ public class StatsUtils {
       colName = ennd.getName();
       colType = "null";
       numNulls = numRows;
+    } else if (end instanceof ExprNodeColumnListDesc) {
+
+      // column list
+      ExprNodeColumnListDesc encd = (ExprNodeColumnListDesc) end;
+      colName = Joiner.on(",").join(encd.getCols());
+      colType = "array";
+      countDistincts = numRows;
+      oi = encd.getWritableObjectInspector();
+    } else if (end instanceof ExprNodeFieldDesc) {
+
+      // field within complex type
+      ExprNodeFieldDesc enfd = (ExprNodeFieldDesc) end;
+      colName = enfd.getFieldName();
+      colType = enfd.getTypeString();
+      countDistincts = numRows;
+      oi = enfd.getWritableObjectInspector();
     }
 
     if (colType.equalsIgnoreCase(serdeConstants.STRING_TYPE_NAME)

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/stats/jdbc/JDBCStatsAggregator.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/stats/jdbc/JDBCStatsAggregator.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/stats/jdbc/JDBCStatsAggregator.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/stats/jdbc/JDBCStatsAggregator.java Mon Sep  8 04:38:17 2014
@@ -27,6 +27,7 @@ import java.sql.SQLRecoverableException;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Random;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -46,7 +47,8 @@ public class JDBCStatsAggregator impleme
   private final Log LOG = LogFactory.getLog(this.getClass().getName());
   private int timeout = 30;
   private final String comment = "Hive stats aggregation: " + this.getClass().getName();
-  private int maxRetries, waitWindow;
+  private int maxRetries;
+  private long waitWindow;
   private final Random r;
 
   public JDBCStatsAggregator() {
@@ -57,11 +59,14 @@ public class JDBCStatsAggregator impleme
   @Override
   public boolean connect(Configuration hiveconf, Task sourceTask) {
     this.hiveconf = hiveconf;
-    timeout = HiveConf.getIntVar(hiveconf, HiveConf.ConfVars.HIVE_STATS_JDBC_TIMEOUT);
+    timeout = (int) HiveConf.getTimeVar(
+        hiveconf, HiveConf.ConfVars.HIVE_STATS_JDBC_TIMEOUT, TimeUnit.SECONDS);
     connectionString = HiveConf.getVar(hiveconf, HiveConf.ConfVars.HIVESTATSDBCONNECTIONSTRING);
     String driver = HiveConf.getVar(hiveconf, HiveConf.ConfVars.HIVESTATSJDBCDRIVER);
     maxRetries = HiveConf.getIntVar(hiveconf, HiveConf.ConfVars.HIVE_STATS_RETRIES_MAX);
-    waitWindow = HiveConf.getIntVar(hiveconf, HiveConf.ConfVars.HIVE_STATS_RETRIES_WAIT);
+    waitWindow = HiveConf.getTimeVar(
+        hiveconf, HiveConf.ConfVars.HIVE_STATS_RETRIES_WAIT, TimeUnit.MILLISECONDS);
+    this.sourceTask = sourceTask;
 
     try {
       Class.forName(driver).newInstance();

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/stats/jdbc/JDBCStatsPublisher.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/stats/jdbc/JDBCStatsPublisher.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/stats/jdbc/JDBCStatsPublisher.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/stats/jdbc/JDBCStatsPublisher.java Mon Sep  8 04:38:17 2014
@@ -30,6 +30,7 @@ import java.sql.Statement;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -48,7 +49,8 @@ public class JDBCStatsPublisher implemen
   private int timeout; // default timeout in sec. for JDBC connection and statements
   // SQL comment that identifies where the SQL statement comes from
   private final String comment = "Hive stats publishing: " + this.getClass().getName();
-  private int maxRetries, waitWindow;
+  private int maxRetries;
+  private long waitWindow;
   private final Random r;
 
   public JDBCStatsPublisher() {
@@ -59,9 +61,11 @@ public class JDBCStatsPublisher implemen
   public boolean connect(Configuration hiveconf) {
     this.hiveconf = hiveconf;
     maxRetries = HiveConf.getIntVar(hiveconf, HiveConf.ConfVars.HIVE_STATS_RETRIES_MAX);
-    waitWindow = HiveConf.getIntVar(hiveconf, HiveConf.ConfVars.HIVE_STATS_RETRIES_WAIT);
+    waitWindow = HiveConf.getTimeVar(
+        hiveconf, HiveConf.ConfVars.HIVE_STATS_RETRIES_WAIT, TimeUnit.MILLISECONDS);
     connectionString = HiveConf.getVar(hiveconf, HiveConf.ConfVars.HIVESTATSDBCONNECTIONSTRING);
-    timeout = HiveConf.getIntVar(hiveconf, HiveConf.ConfVars.HIVE_STATS_JDBC_TIMEOUT);
+    timeout = (int) HiveConf.getTimeVar(
+        hiveconf, HiveConf.ConfVars.HIVE_STATS_JDBC_TIMEOUT, TimeUnit.SECONDS);
     String driver = HiveConf.getVar(hiveconf, HiveConf.ConfVars.HIVESTATSJDBCDRIVER);
 
     try {