Posted to commits@hive.apache.org by br...@apache.org on 2014/09/08 06:38:26 UTC
svn commit: r1623263 [22/28] - in /hive/branches/spark: ./
accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/
ant/src/org/apache/hadoop/hive/ant/ beeline/src/java/org/apache/hive/beeline/
beeline/src/test/org/apache/hive/beeline/ bin/...
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/OptimizeTezProcContext.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/OptimizeTezProcContext.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/OptimizeTezProcContext.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/OptimizeTezProcContext.java Mon Sep 8 04:38:17 2014
@@ -23,13 +23,18 @@ import java.util.HashSet;
import java.util.Set;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.AppMasterEventOperator;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
import org.apache.hadoop.hive.ql.hooks.ReadEntity;
import org.apache.hadoop.hive.ql.hooks.WriteEntity;
import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
+
/**
* OptimizeTezProcContext. OptimizeTezProcContext maintains information
* about the current operator plan as we walk the operator tree
@@ -47,19 +52,23 @@ public class OptimizeTezProcContext impl
public final Set<ReduceSinkOperator> visitedReduceSinks
= new HashSet<ReduceSinkOperator>();
+ public final Multimap<AppMasterEventOperator, TableScanOperator> eventOpToTableScanMap =
+ HashMultimap.create();
+
// rootOperators are all the table scan operators in sequence
// of traversal
- public final Deque<Operator<? extends OperatorDesc>> rootOperators;
+ public Deque<Operator<? extends OperatorDesc>> rootOperators;
- @SuppressWarnings("unchecked")
- public OptimizeTezProcContext(HiveConf conf, ParseContext parseContext,
- Set<ReadEntity> inputs, Set<WriteEntity> outputs,
- Deque<Operator<?>> rootOperators) {
+ public OptimizeTezProcContext(HiveConf conf, ParseContext parseContext, Set<ReadEntity> inputs,
+ Set<WriteEntity> outputs) {
this.conf = conf;
this.parseContext = parseContext;
this.inputs = inputs;
this.outputs = outputs;
- this.rootOperators = rootOperators;
+ }
+
+ public void setRootOperators(Deque<Operator<? extends OperatorDesc>> roots) {
+ this.rootOperators = roots;
}
}
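
For context: the new eventOpToTableScanMap is a Guava Multimap, presumably because one AppMasterEventOperator can be tied to several TableScanOperators (and one scan to several events). A minimal sketch of HashMultimap behavior, with plain strings standing in for the operator objects:

    import com.google.common.collect.HashMultimap;
    import com.google.common.collect.Multimap;

    public class MultimapSketch {
      public static void main(String[] args) {
        // Each key maps to a collection of values; put() never overwrites.
        Multimap<String, String> eventToScans = HashMultimap.create();
        eventToScans.put("EVENT_1", "TS_3");
        eventToScans.put("EVENT_1", "TS_7"); // second scan for the same event
        eventToScans.put("EVENT_2", "TS_3"); // a scan may serve many events

        // get() returns the whole collection for a key (empty, never null).
        System.out.println(eventToScans.get("EVENT_1")); // e.g. [TS_3, TS_7]
      }
    }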
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseUtils.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseUtils.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseUtils.java Mon Sep 8 04:38:17 2014
@@ -26,6 +26,7 @@ import org.apache.hadoop.hive.common.Jav
import org.apache.hadoop.hive.common.type.HiveDecimal;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
import org.apache.hadoop.hive.serde2.typeinfo.CharTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
@@ -221,7 +222,7 @@ public final class ParseUtils {
return null;
}
try {
- Class.forName(className, true, JavaUtils.getClassLoader());
+ Class.forName(className, true, Utilities.getSessionSpecifiedClassLoader());
} catch (ClassNotFoundException e) {
throw new SemanticException("Cannot find class '" + className + "'", e);
}
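
This swap from JavaUtils.getClassLoader() to Utilities.getSessionSpecifiedClassLoader() recurs throughout the patch: classes are now resolved against a loader that reflects the current session, so jars added after startup become visible. A rough sketch of the general pattern; the jar path and SerDe name are hypothetical, and the snippet loads a JDK class so it runs as-is:

    import java.net.URL;
    import java.net.URLClassLoader;

    public class SessionLoaderSketch {
      public static void main(String[] args) throws Exception {
        ClassLoader parent = SessionLoaderSketch.class.getClassLoader();

        // A session-scoped loader layered over the parent. The URL is a
        // stand-in for a jar added during the session; URLClassLoader does
        // not require it to exist at construction time.
        URLClassLoader sessionLoader = new URLClassLoader(
            new URL[] { new URL("file:///tmp/session-added-serde.jar") }, parent);

        // Same call shape as the patch: initialize the class and resolve it
        // through the session loader. A real call would pass something like
        // "com.example.MySerDe"; we load a JDK class so the sketch executes.
        Class<?> cls = Class.forName("java.util.ArrayList", true, sessionLoader);
        System.out.println(cls.getName());
      }
    }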
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/QB.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/QB.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/QB.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/QB.java Mon Sep 8 04:38:17 2014
@@ -66,26 +66,26 @@ public class QB {
private HashMap<String, WindowingSpec> destToWindowingSpec;
/*
- * If this QB represents a SubQuery predicate then this will point to the SubQuery object.
+ * If this QB represents a SubQuery predicate then this will point to the SubQuery object.
*/
private QBSubQuery subQueryPredicateDef;
-
- /*
- * used to give a unique name to each SubQuery QB. Currently there can be at
- * most 2 SubQueries in a Query: 1 in the Where clause, and 1 in the Having
- * clause.
- */
- private int numSubQueryPredicates;
-
- /*
- * for now a top level QB can have 1 where clause SQ predicate.
- */
- private QBSubQuery whereClauseSubQueryPredicate;
-
+
+ /*
+ * used to give a unique name to each SubQuery QB. Currently there can be at
+ * most 2 SubQueries in a Query: 1 in the Where clause, and 1 in the Having
+ * clause.
+ */
+ private int numSubQueryPredicates;
+
/*
* for now a top level QB can have 1 where clause SQ predicate.
*/
- private QBSubQuery havingClauseSubQueryPredicate;
+ private QBSubQuery whereClauseSubQueryPredicate;
+
+ /*
+ * for now a top level QB can have 1 having clause SQ predicate.
+ */
+ private QBSubQuery havingClauseSubQueryPredicate;
// results
@@ -341,28 +341,28 @@ public class QB {
protected QBSubQuery getSubQueryPredicateDef() {
return subQueryPredicateDef;
}
-
- protected int getNumSubQueryPredicates() {
- return numSubQueryPredicates;
- }
-
- protected int incrNumSubQueryPredicates() {
- return ++numSubQueryPredicates;
- }
-
- void setWhereClauseSubQueryPredicate(QBSubQuery sq) {
- whereClauseSubQueryPredicate = sq;
- }
-
- public QBSubQuery getWhereClauseSubQueryPredicate() {
- return whereClauseSubQueryPredicate;
- }
-
- void setHavingClauseSubQueryPredicate(QBSubQuery sq) {
+
+ protected int getNumSubQueryPredicates() {
+ return numSubQueryPredicates;
+ }
+
+ protected int incrNumSubQueryPredicates() {
+ return ++numSubQueryPredicates;
+ }
+
+ void setWhereClauseSubQueryPredicate(QBSubQuery sq) {
+ whereClauseSubQueryPredicate = sq;
+ }
+
+ public QBSubQuery getWhereClauseSubQueryPredicate() {
+ return whereClauseSubQueryPredicate;
+ }
+
+ void setHavingClauseSubQueryPredicate(QBSubQuery sq) {
havingClauseSubQueryPredicate = sq;
}
-
- public QBSubQuery getHavingClauseSubQueryPredicate() {
+
+ public QBSubQuery getHavingClauseSubQueryPredicate() {
return havingClauseSubQueryPredicate;
}
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/QBSubQuery.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/QBSubQuery.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/QBSubQuery.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/QBSubQuery.java Mon Sep 8 04:38:17 2014
@@ -38,7 +38,7 @@ import org.apache.hadoop.hive.serde2.typ
import org.apache.hadoop.hive.ql.parse.SubQueryDiagnostic.QBSubQueryRewrite;
public class QBSubQuery implements ISubQueryJoinInfo {
-
+
public static enum SubQueryType {
EXISTS,
NOT_EXISTS,
@@ -149,16 +149,16 @@ public class QBSubQuery implements ISubQ
}
/*
- * This class captures the information about a
+ * This class captures the information about a
* conjunct in the where clause of the SubQuery.
* For an equality predicate it captures for each side:
* - the AST
* - the type of Expression (basically what columns are referenced)
- * - for Expressions that refer to the parent it captures the
+ * - for Expressions that refer to the parent it captures the
* parent's ColumnInfo. In case of outer Aggregation expressions
* we need this to introduce a new mapping in the OuterQuery
* RowResolver. A join condition must use qualified column references,
- * so we generate a new name for the aggr expression and use it in the
+ * so we generate a new name for the aggr expression and use it in the
* joining condition.
* For e.g.
* having exists ( select x from R2 where y = min(R1.z) )
@@ -174,8 +174,8 @@ public class QBSubQuery implements ISubQ
private final ColumnInfo leftOuterColInfo;
private final ColumnInfo rightOuterColInfo;
- Conjunct(ASTNode leftExpr,
- ASTNode rightExpr,
+ Conjunct(ASTNode leftExpr,
+ ASTNode rightExpr,
ExprType leftExprType,
ExprType rightExprType,
ColumnInfo leftOuterColInfo,
@@ -239,8 +239,8 @@ public class QBSubQuery implements ISubQ
Stack<Node> stack;
ConjunctAnalyzer(RowResolver parentQueryRR,
- boolean forHavingClause,
- String parentQueryNewAlias) {
+ boolean forHavingClause,
+ String parentQueryNewAlias) {
this.parentQueryRR = parentQueryRR;
defaultExprProcessor = new DefaultExprProcessor();
this.forHavingClause = forHavingClause;
@@ -260,13 +260,13 @@ public class QBSubQuery implements ISubQ
private ObjectPair<ExprType,ColumnInfo> analyzeExpr(ASTNode expr) {
ColumnInfo cInfo = null;
if ( forHavingClause ) {
- try {
- cInfo = parentQueryRR.getExpression(expr);
- if ( cInfo != null) {
- return ObjectPair.create(ExprType.REFERS_PARENT, cInfo);
- }
- } catch(SemanticException se) {
- }
+ try {
+ cInfo = parentQueryRR.getExpression(expr);
+ if ( cInfo != null) {
+ return ObjectPair.create(ExprType.REFERS_PARENT, cInfo);
+ }
+ } catch(SemanticException se) {
+ }
}
if ( expr.getType() == HiveParser.DOT) {
ASTNode dot = firstDot(expr);
@@ -308,12 +308,12 @@ public class QBSubQuery implements ISubQ
ObjectPair<ExprType,ColumnInfo> leftInfo = analyzeExpr(left);
ObjectPair<ExprType,ColumnInfo> rightInfo = analyzeExpr(right);
- return new Conjunct(left, right,
+ return new Conjunct(left, right,
leftInfo.getFirst(), rightInfo.getFirst(),
leftInfo.getSecond(), rightInfo.getSecond());
} else {
ObjectPair<ExprType,ColumnInfo> sqExprInfo = analyzeExpr(conjunct);
- return new Conjunct(conjunct, null,
+ return new Conjunct(conjunct, null,
sqExprInfo.getFirst(), null,
sqExprInfo.getSecond(), sqExprInfo.getSecond());
}
@@ -354,86 +354,86 @@ public class QBSubQuery implements ISubQ
}
/*
- * When transforming a Not In SubQuery we need to check for nulls in the
+ * When transforming a Not In SubQuery we need to check for nulls in the
* Joining expressions of the SubQuery. If there are nulls then the SubQuery always
- * returns false. For more details see
+ * returns false. For more details see
* https://issues.apache.org/jira/secure/attachment/12614003/SubQuerySpec.pdf
- *
+ *
* Basically, SQL semantics say that:
* - R1.A not in (null, 1, 2, ...)
- * is always false.
- * A 'not in' operator is equivalent to a '<> all'. Since a not equal check with null
+ * is always false.
+ * A 'not in' operator is equivalent to a '<> all'. Since a not equal check with null
* returns false, a not in predicate against a set with a 'null' value always returns false.
- *
+ *
* So for not in SubQuery predicates:
* - we join in a null count predicate.
* - And the joining condition is that the 'Null Count' query has a count of 0.
- *
+ *
*/
class NotInCheck implements ISubQueryJoinInfo {
-
+
private static final String CNT_ALIAS = "c1";
-
+
/*
* expressions in SubQ that are joined to the Outer Query.
*/
List<ASTNode> subQryCorrExprs;
-
+
/*
* row resolver of the SubQuery.
* Set by the SemanticAnalyzer after the Plan for the SubQuery is genned.
* This is needed in case the SubQuery select list contains a TOK_ALLCOLREF
*/
RowResolver sqRR;
-
+
NotInCheck() {
subQryCorrExprs = new ArrayList<ASTNode>();
}
-
+
void addCorrExpr(ASTNode corrExpr) {
subQryCorrExprs.add(corrExpr);
}
-
+
public ASTNode getSubQueryAST() {
ASTNode ast = SubQueryUtils.buildNotInNullCheckQuery(
- QBSubQuery.this.getSubQueryAST(),
- QBSubQuery.this.getAlias(),
- CNT_ALIAS,
+ QBSubQuery.this.getSubQueryAST(),
+ QBSubQuery.this.getAlias(),
+ CNT_ALIAS,
subQryCorrExprs,
sqRR);
SubQueryUtils.setOriginDeep(ast, QBSubQuery.this.originalSQASTOrigin);
return ast;
}
-
+
public String getAlias() {
return QBSubQuery.this.getAlias() + "_notin_nullcheck";
}
-
+
public JoinType getJoinType() {
return JoinType.LEFTSEMI;
}
-
+
public ASTNode getJoinConditionAST() {
- ASTNode ast =
+ ASTNode ast =
SubQueryUtils.buildNotInNullJoinCond(getAlias(), CNT_ALIAS);
SubQueryUtils.setOriginDeep(ast, QBSubQuery.this.originalSQASTOrigin);
return ast;
}
-
+
public QBSubQuery getSubQuery() {
return QBSubQuery.this;
}
-
+
public String getOuterQueryId() {
return QBSubQuery.this.getOuterQueryId();
}
-
+
void setSQRR(RowResolver sqRR) {
this.sqRR = sqRR;
}
-
+
}
-
+
private final String outerQueryId;
private final int sqIdx;
private final String alias;
@@ -455,11 +455,11 @@ public class QBSubQuery implements ISubQ
private int numOfCorrelationExprsAddedToSQSelect;
private boolean groupbyAddedToSQ;
-
+
private int numOuterCorrExprsForHaving;
-
+
private NotInCheck notInCheck;
-
+
private QBSubQueryRewrite subQueryDiagnostic;
public QBSubQuery(String outerQueryId,
@@ -483,11 +483,11 @@ public class QBSubQuery implements ISubQ
originalSQASTOrigin = new ASTNodeOrigin("SubQuery", alias, s, alias, originalSQAST);
numOfCorrelationExprsAddedToSQSelect = 0;
groupbyAddedToSQ = false;
-
+
if ( operator.getType() == SubQueryType.NOT_IN ) {
notInCheck = new NotInCheck();
}
-
+
subQueryDiagnostic = SubQueryDiagnostic.getRewrite(this, ctx.getTokenRewriteStream(), ctx);
}
@@ -500,18 +500,18 @@ public class QBSubQuery implements ISubQ
public SubQueryTypeDef getOperator() {
return operator;
}
-
+
public ASTNode getOriginalSubQueryASTForRewrite() {
return (operator.getType() == SubQueryType.NOT_EXISTS
- || operator.getType() == SubQueryType.NOT_IN ?
- (ASTNode) originalSQASTOrigin.getUsageNode().getParent() :
+ || operator.getType() == SubQueryType.NOT_IN ?
+ (ASTNode) originalSQASTOrigin.getUsageNode().getParent() :
originalSQASTOrigin.getUsageNode());
}
void validateAndRewriteAST(RowResolver outerQueryRR,
- boolean forHavingClause,
- String outerQueryAlias,
- Set<String> outerQryAliases) throws SemanticException {
+ boolean forHavingClause,
+ String outerQueryAlias,
+ Set<String> outerQryAliases) throws SemanticException {
ASTNode selectClause = (ASTNode) subQueryAST.getChild(1).getChild(1);
@@ -519,12 +519,12 @@ public class QBSubQuery implements ISubQ
if ( selectClause.getChild(0).getType() == HiveParser.TOK_HINTLIST ) {
selectExprStart = 1;
}
-
+
/*
* Restriction.16.s :: Correlated Expression in Outer Query must not contain
* unqualified column references.
*/
- if ( parentQueryExpression != null && !forHavingClause ) {
+ if ( parentQueryExpression != null && !forHavingClause ) {
ASTNode u = SubQueryUtils.hasUnQualifiedColumnReferences(parentQueryExpression);
if ( u != null ) {
subQueryAST.setOrigin(originalSQASTOrigin);
@@ -532,7 +532,7 @@ public class QBSubQuery implements ISubQ
u, "Correlating expression cannot contain unqualified column references."));
}
}
-
+
/*
* Restriction 17.s :: SubQuery cannot use the same table alias as one used in
* the Outer Query.
@@ -546,14 +546,14 @@ public class QBSubQuery implements ISubQ
}
if ( sharedAlias != null) {
ASTNode whereClause = SubQueryUtils.subQueryWhere(subQueryAST);
-
+
if ( whereClause != null ) {
ASTNode u = SubQueryUtils.hasUnQualifiedColumnReferences(whereClause);
if ( u != null ) {
subQueryAST.setOrigin(originalSQASTOrigin);
throw new SemanticException(ErrorMsg.UNSUPPORTED_SUBQUERY_EXPRESSION.getMsg(
u, "SubQuery cannot use the table alias: " + sharedAlias + "; " +
- "this is also an alias in the Outer Query and SubQuery contains a unqualified column reference"));
+ "this is also an alias in the Outer Query and SubQuery contains a unqualified column reference"));
}
}
}
@@ -641,25 +641,25 @@ public class QBSubQuery implements ISubQ
}
void buildJoinCondition(RowResolver outerQueryRR, RowResolver sqRR,
- boolean forHavingClause,
- String outerQueryAlias) throws SemanticException {
+ boolean forHavingClause,
+ String outerQueryAlias) throws SemanticException {
ASTNode parentQueryJoinCond = null;
if ( parentQueryExpression != null ) {
-
+
ColumnInfo outerQueryCol = null;
try {
outerQueryCol = outerQueryRR.getExpression(parentQueryExpression);
} catch(SemanticException se) {
}
-
+
parentQueryJoinCond = SubQueryUtils.buildOuterQryToSQJoinCond(
getOuterQueryExpression(),
alias,
sqRR);
-
+
if ( outerQueryCol != null ) {
- rewriteCorrConjunctForHaving(parentQueryJoinCond, true,
+ rewriteCorrConjunctForHaving(parentQueryJoinCond, true,
outerQueryAlias, outerQueryRR, outerQueryCol);
}
subQueryDiagnostic.addJoinCondition(parentQueryJoinCond, outerQueryCol != null, true);
@@ -682,10 +682,10 @@ public class QBSubQuery implements ISubQ
ASTNode updateOuterQueryFilter(ASTNode outerQryFilter) {
if (postJoinConditionAST == null ) {
return outerQryFilter;
- }
-
+ }
+
subQueryDiagnostic.addPostJoinCondition(postJoinConditionAST);
-
+
if ( outerQryFilter == null ) {
return postJoinConditionAST;
}
@@ -738,7 +738,7 @@ public class QBSubQuery implements ISubQ
* Additional things for Having clause:
* - A correlation predicate may refer to an aggregation expression.
* - This introduces 2 twists to the rewrite:
- * a. When analyzing equality predicates we need to analyze each side
+ * a. When analyzing equality predicates we need to analyze each side
* to see if it is an aggregation expression from the Outer Query.
* So for e.g. this is a valid correlation predicate:
* R2.x = min(R1.y)
@@ -748,12 +748,12 @@ public class QBSubQuery implements ISubQ
* to contain a qualified column references.
* We handle this by generating a new name for the aggregation expression,
* like R1._gby_sq_col_1 and adding this mapping to the Outer Query's
- * Row Resolver. Then we construct a joining predicate using this new
+ * Row Resolver. Then we construct a joining predicate using this new
* name; so in our e.g. the condition would be: R2.x = R1._gby_sq_col_1
*/
private void rewrite(RowResolver parentQueryRR,
- boolean forHavingClause,
- String outerQueryAlias) throws SemanticException {
+ boolean forHavingClause,
+ String outerQueryAlias) throws SemanticException {
ASTNode selectClause = (ASTNode) subQueryAST.getChild(1).getChild(1);
ASTNode whereClause = SubQueryUtils.subQueryWhere(subQueryAST);
@@ -766,7 +766,7 @@ public class QBSubQuery implements ISubQ
SubQueryUtils.extractConjuncts(searchCond, conjuncts);
ConjunctAnalyzer conjunctAnalyzer = new ConjunctAnalyzer(parentQueryRR,
- forHavingClause, outerQueryAlias);
+ forHavingClause, outerQueryAlias);
ASTNode sqNewSearchCond = null;
for(ASTNode conjunctAST : conjuncts) {
@@ -805,7 +805,7 @@ public class QBSubQuery implements ISubQ
corrCondLeftIsRewritten = true;
if ( forHavingClause && conjunct.getRightOuterColInfo() != null ) {
corrCondRightIsRewritten = true;
- rewriteCorrConjunctForHaving(conjunctAST, false, outerQueryAlias,
+ rewriteCorrConjunctForHaving(conjunctAST, false, outerQueryAlias,
parentQueryRR, conjunct.getRightOuterColInfo());
}
ASTNode joinPredciate = SubQueryUtils.alterCorrelatedPredicate(
@@ -829,7 +829,7 @@ public class QBSubQuery implements ISubQ
corrCondRightIsRewritten = true;
if ( forHavingClause && conjunct.getLeftOuterColInfo() != null ) {
corrCondLeftIsRewritten = true;
- rewriteCorrConjunctForHaving(conjunctAST, true, outerQueryAlias,
+ rewriteCorrConjunctForHaving(conjunctAST, true, outerQueryAlias,
parentQueryRR, conjunct.getLeftOuterColInfo());
}
ASTNode joinPredciate = SubQueryUtils.alterCorrelatedPredicate(
@@ -901,7 +901,7 @@ public class QBSubQuery implements ISubQ
for(ASTNode child : newChildren ) {
subQueryAST.addChild(child);
}
-
+
subQueryDiagnostic.setAddGroupByClause();
return groupBy;
@@ -927,26 +927,26 @@ public class QBSubQuery implements ISubQ
public int getNumOfCorrelationExprsAddedToSQSelect() {
return numOfCorrelationExprsAddedToSQSelect;
}
-
-
+
+
public QBSubQueryRewrite getDiagnostic() {
return subQueryDiagnostic;
}
-
+
public QBSubQuery getSubQuery() {
return this;
}
-
+
NotInCheck getNotInCheck() {
return notInCheck;
}
-
+
private void rewriteCorrConjunctForHaving(ASTNode conjunctASTNode,
boolean refersLeft,
String outerQueryAlias,
RowResolver outerQueryRR,
ColumnInfo outerQueryCol) {
-
+
String newColAlias = "_gby_sq_col_" + numOuterCorrExprsForHaving++;
ASTNode outerExprForCorr = SubQueryUtils.createColRefAST(outerQueryAlias, newColAlias);
if ( refersLeft ) {
@@ -956,5 +956,5 @@ public class QBSubQuery implements ISubQ
}
outerQueryRR.put(outerQueryAlias, newColAlias, outerQueryCol);
}
-
+
}
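
The comments above describe the HAVING-clause renaming trick: a correlated predicate like "R2.x = min(R1.y)" cannot use the aggregate directly in a join condition, so the rewrite assigns the aggregation expression a fresh qualified column alias and joins on that. A minimal sketch of the naming scheme (table and expression names hypothetical; the real code registers the mapping in a RowResolver rather than a plain map):

    import java.util.HashMap;
    import java.util.Map;

    public class GbyAliasSketch {
      public static void main(String[] args) {
        int counter = 0;
        Map<String, String> outerRowResolver = new HashMap<String, String>();

        String aggExpr = "min(R1.y)";
        String newColAlias = "_gby_sq_col_" + counter++;    // same scheme as the patch
        outerRowResolver.put("R1." + newColAlias, aggExpr); // register the mapping

        // The join condition then uses the qualified alias:
        System.out.println("R2.x = R1." + newColAlias);     // R2.x = R1._gby_sq_col_0
      }
    }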
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/RowResolver.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/RowResolver.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/RowResolver.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/RowResolver.java Mon Sep 8 04:38:17 2014
@@ -98,7 +98,7 @@ public class RowResolver implements Seri
public void put(String tab_alias, String col_alias, ColumnInfo colInfo) {
if (!addMappingOnly(tab_alias, col_alias, colInfo)) {
- rowSchema.getSignature().add(colInfo);
+ rowSchema.getSignature().add(colInfo);
}
}
@@ -289,7 +289,7 @@ public class RowResolver implements Seri
public boolean getIsExprResolver() {
return isExprResolver;
}
-
+
public String[] getAlternateMappings(String internalName) {
return altInvRslvMap.get(internalName);
}
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java Mon Sep 8 04:38:17 2014
@@ -2239,8 +2239,8 @@ public class SemanticAnalyzer extends Ba
String havingInputAlias = null;
if ( forHavingClause ) {
- havingInputAlias = "gby_sq" + sqIdx;
- aliasToOpInfo.put(havingInputAlias, input);
+ havingInputAlias = "gby_sq" + sqIdx;
+ aliasToOpInfo.put(havingInputAlias, input);
}
subQuery.validateAndRewriteAST(inputRR, forHavingClause, havingInputAlias, aliasToOpInfo.keySet());
@@ -2351,7 +2351,10 @@ public class SemanticAnalyzer extends Ba
ExprNodeDesc filterPred = null;
List<Boolean> nullSafes = joinTree.getNullSafes();
for (int i = 0; i < joinKeys.length; i++) {
- if ( nullSafes.get(i)) {
+ if (nullSafes.get(i) || (joinKeys[i] instanceof ExprNodeColumnDesc &&
+ ((ExprNodeColumnDesc)joinKeys[i]).getIsPartitionColOrVirtualCol())) {
+ // no need to generate is not null predicate for partitioning or
+ // virtual column, since those columns can never be null.
continue;
}
List<ExprNodeDesc> args = new ArrayList<ExprNodeDesc>();
@@ -2531,7 +2534,7 @@ public class SemanticAnalyzer extends Ba
try {
serdeClass = (Class<? extends Deserializer>) Class.forName(serdeName,
- true, JavaUtils.getClassLoader());
+ true, Utilities.getSessionSpecifiedClassLoader());
} catch (ClassNotFoundException e) {
throw new SemanticException(e);
}
@@ -2720,7 +2723,7 @@ public class SemanticAnalyzer extends Ba
try {
serde = (Class<? extends Deserializer>) Class.forName(defaultSerdeName,
- true, JavaUtils.getClassLoader());
+ true, Utilities.getSessionSpecifiedClassLoader());
} catch (ClassNotFoundException e) {
throw new SemanticException(e);
}
@@ -2787,7 +2790,7 @@ public class SemanticAnalyzer extends Ba
try {
return (Class<? extends RecordReader>) Class.forName(name, true,
- JavaUtils.getClassLoader());
+ Utilities.getSessionSpecifiedClassLoader());
} catch (ClassNotFoundException e) {
throw new SemanticException(e);
}
@@ -2801,7 +2804,7 @@ public class SemanticAnalyzer extends Ba
try {
return (Class<? extends RecordReader>) Class.forName(name, true,
- JavaUtils.getClassLoader());
+ Utilities.getSessionSpecifiedClassLoader());
} catch (ClassNotFoundException e) {
throw new SemanticException(e);
}
@@ -2819,7 +2822,7 @@ public class SemanticAnalyzer extends Ba
try {
return (Class<? extends RecordWriter>) Class.forName(name, true,
- JavaUtils.getClassLoader());
+ Utilities.getSessionSpecifiedClassLoader());
} catch (ClassNotFoundException e) {
throw new SemanticException(e);
}
@@ -11586,40 +11589,40 @@ public class SemanticAnalyzer extends Ba
}
private void addAlternateGByKeyMappings(ASTNode gByExpr, ColumnInfo colInfo,
- Operator<? extends OperatorDesc> reduceSinkOp, RowResolver gByRR) {
- if ( gByExpr.getType() == HiveParser.DOT
+ Operator<? extends OperatorDesc> reduceSinkOp, RowResolver gByRR) {
+ if ( gByExpr.getType() == HiveParser.DOT
&& gByExpr.getChild(0).getType() == HiveParser.TOK_TABLE_OR_COL ) {
- String tab_alias = BaseSemanticAnalyzer.unescapeIdentifier(gByExpr
- .getChild(0).getChild(0).getText());
- String col_alias = BaseSemanticAnalyzer.unescapeIdentifier(
- gByExpr.getChild(1).getText());
- gByRR.put(tab_alias, col_alias, colInfo);
- } else if ( gByExpr.getType() == HiveParser.TOK_TABLE_OR_COL ) {
- String col_alias = BaseSemanticAnalyzer.unescapeIdentifier(gByExpr
- .getChild(0).getText());
- String tab_alias = null;
- /*
- * If the input to the GBy has a tab alias for the column, then add an entry
- * based on that tab_alias.
- * For e.g. this query:
- * select b.x, count(*) from t1 b group by x
- * needs (tab_alias=b, col_alias=x) in the GBy RR.
- * tab_alias=b comes from looking at the RowResolver that is the ancestor
- * before any GBy/ReduceSinks added for the GBY operation.
- */
- Operator<? extends OperatorDesc> parent = reduceSinkOp;
- while ( parent instanceof ReduceSinkOperator ||
- parent instanceof GroupByOperator ) {
- parent = parent.getParentOperators().get(0);
- }
- RowResolver parentRR = opParseCtx.get(parent).getRowResolver();
- try {
- ColumnInfo pColInfo = parentRR.get(tab_alias, col_alias);
- tab_alias = pColInfo == null ? null : pColInfo.getTabAlias();
- } catch(SemanticException se) {
- }
- gByRR.put(tab_alias, col_alias, colInfo);
- }
+ String tab_alias = BaseSemanticAnalyzer.unescapeIdentifier(gByExpr
+ .getChild(0).getChild(0).getText());
+ String col_alias = BaseSemanticAnalyzer.unescapeIdentifier(
+ gByExpr.getChild(1).getText());
+ gByRR.put(tab_alias, col_alias, colInfo);
+ } else if ( gByExpr.getType() == HiveParser.TOK_TABLE_OR_COL ) {
+ String col_alias = BaseSemanticAnalyzer.unescapeIdentifier(gByExpr
+ .getChild(0).getText());
+ String tab_alias = null;
+ /*
+ * If the input to the GBy has a tab alias for the column, then add an entry
+ * based on that tab_alias.
+ * For e.g. this query:
+ * select b.x, count(*) from t1 b group by x
+ * needs (tab_alias=b, col_alias=x) in the GBy RR.
+ * tab_alias=b comes from looking at the RowResolver that is the ancestor
+ * before any GBy/ReduceSinks added for the GBY operation.
+ */
+ Operator<? extends OperatorDesc> parent = reduceSinkOp;
+ while ( parent instanceof ReduceSinkOperator ||
+ parent instanceof GroupByOperator ) {
+ parent = parent.getParentOperators().get(0);
+ }
+ RowResolver parentRR = opParseCtx.get(parent).getRowResolver();
+ try {
+ ColumnInfo pColInfo = parentRR.get(tab_alias, col_alias);
+ tab_alias = pColInfo == null ? null : pColInfo.getTabAlias();
+ } catch(SemanticException se) {
+ }
+ gByRR.put(tab_alias, col_alias, colInfo);
+ }
}
private WriteEntity.WriteType determineWriteType(LoadTableDesc ltd, boolean isNonNativeTable) {
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/TezCompiler.java Mon Sep 8 04:38:17 2014
@@ -21,20 +21,24 @@ import java.io.Serializable;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Stack;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.ql.Context;
+import org.apache.hadoop.hive.ql.exec.AppMasterEventOperator;
import org.apache.hadoop.hive.ql.exec.ConditionalTask;
import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.exec.FilterOperator;
import org.apache.hadoop.hive.ql.exec.JoinOperator;
import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
import org.apache.hadoop.hive.ql.exec.Operator;
@@ -52,20 +56,25 @@ import org.apache.hadoop.hive.ql.lib.For
import org.apache.hadoop.hive.ql.lib.GraphWalker;
import org.apache.hadoop.hive.ql.lib.Node;
import org.apache.hadoop.hive.ql.lib.NodeProcessor;
-import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
import org.apache.hadoop.hive.ql.lib.Rule;
import org.apache.hadoop.hive.ql.lib.RuleRegExp;
import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.optimizer.ConstantPropagate;
import org.apache.hadoop.hive.ql.optimizer.ConvertJoinMapJoin;
+import org.apache.hadoop.hive.ql.optimizer.DynamicPartitionPruningOptimization;
import org.apache.hadoop.hive.ql.optimizer.ReduceSinkMapJoinProc;
+import org.apache.hadoop.hive.ql.optimizer.RemoveDynamicPruningBySize;
import org.apache.hadoop.hive.ql.optimizer.SetReducerParallelism;
+import org.apache.hadoop.hive.ql.optimizer.metainfo.annotation.AnnotateWithOpTraits;
import org.apache.hadoop.hive.ql.optimizer.physical.CrossProductCheck;
import org.apache.hadoop.hive.ql.optimizer.physical.MetadataOnlyOptimizer;
import org.apache.hadoop.hive.ql.optimizer.physical.NullScanOptimizer;
import org.apache.hadoop.hive.ql.optimizer.physical.PhysicalContext;
-import org.apache.hadoop.hive.ql.optimizer.physical.Vectorizer;
import org.apache.hadoop.hive.ql.optimizer.physical.StageIDsRearranger;
+import org.apache.hadoop.hive.ql.optimizer.physical.Vectorizer;
+import org.apache.hadoop.hive.ql.optimizer.stats.annotation.AnnotateWithStatistics;
import org.apache.hadoop.hive.ql.plan.BaseWork;
+import org.apache.hadoop.hive.ql.plan.DynamicPruningEventDesc;
import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.hive.ql.plan.MoveWork;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
@@ -85,7 +94,7 @@ public class TezCompiler extends TaskCom
@Override
public void init(HiveConf conf, LogHelper console, Hive db) {
super.init(conf, console, db);
-
+
// Tez requires us to use RPC for the query plan
HiveConf.setBoolVar(conf, ConfVars.HIVE_RPC_QUERY_PLAN, true);
@@ -98,31 +107,203 @@ public class TezCompiler extends TaskCom
protected void optimizeOperatorPlan(ParseContext pCtx, Set<ReadEntity> inputs,
Set<WriteEntity> outputs) throws SemanticException {
- // Sequence of TableScan operators to be walked
+ // Create the context for the walker
+ OptimizeTezProcContext procCtx = new OptimizeTezProcContext(conf, pCtx, inputs, outputs);
+
+ // setup dynamic partition pruning where possible
+ runDynamicPartitionPruning(procCtx, inputs, outputs);
+
+ // setup stats in the operator plan
+ runStatsAnnotation(procCtx);
+
+ // run the optimizations that use stats for optimization
+ runStatsDependentOptimizations(procCtx, inputs, outputs);
+
+ // after the stats phase we might have some cyclic dependencies that we need
+ // to take care of.
+ runCycleAnalysisForPartitionPruning(procCtx, inputs, outputs);
+
+ }
+
+ private void runCycleAnalysisForPartitionPruning(OptimizeTezProcContext procCtx,
+ Set<ReadEntity> inputs, Set<WriteEntity> outputs) throws SemanticException {
+
+ if (!procCtx.conf.getBoolVar(ConfVars.TEZ_DYNAMIC_PARTITION_PRUNING)) {
+ return;
+ }
+
+ boolean cycleFree = false;
+ while (!cycleFree) {
+ cycleFree = true;
+ Set<Set<Operator<?>>> components = getComponents(procCtx);
+ for (Set<Operator<?>> component : components) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Component: ");
+ for (Operator<?> co : component) {
+ LOG.debug("Operator: " + co.getName() + ", " + co.getIdentifier());
+ }
+ }
+ if (component.size() != 1) {
+ LOG.info("Found cycle in operator plan...");
+ cycleFree = false;
+ removeEventOperator(component);
+ }
+ }
+ LOG.info("Cycle free: " + cycleFree);
+ }
+ }
+
+ private void removeEventOperator(Set<Operator<?>> component) {
+ AppMasterEventOperator victim = null;
+ for (Operator<?> o : component) {
+ if (o instanceof AppMasterEventOperator) {
+ if (victim == null
+ || o.getConf().getStatistics().getDataSize() < victim.getConf().getStatistics()
+ .getDataSize()) {
+ victim = (AppMasterEventOperator) o;
+ }
+ }
+ }
+
+ Operator<?> child = victim;
+ Operator<?> curr = victim;
+
+ while (curr.getChildOperators().size() <= 1) {
+ child = curr;
+ curr = curr.getParentOperators().get(0);
+ }
+
+ // at this point we've found the fork in the op pipeline that has the
+ // pruning as a child plan.
+ LOG.info("Disabling dynamic pruning for: "
+ + ((DynamicPruningEventDesc) victim.getConf()).getTableScan().toString()
+ + ". Needed to break cyclic dependency");
+ curr.removeChild(child);
+ }
+
+ // Tarjan's algo
+ private Set<Set<Operator<?>>> getComponents(OptimizeTezProcContext procCtx) {
Deque<Operator<?>> deque = new LinkedList<Operator<?>>();
- deque.addAll(pCtx.getTopOps().values());
+ deque.addAll(procCtx.parseContext.getTopOps().values());
- // Create the context for the walker
- OptimizeTezProcContext procCtx
- = new OptimizeTezProcContext(conf, pCtx, inputs, outputs, deque);
+ AtomicInteger index = new AtomicInteger();
+ Map<Operator<?>, Integer> indexes = new HashMap<Operator<?>, Integer>();
+ Map<Operator<?>, Integer> lowLinks = new HashMap<Operator<?>, Integer>();
+ Stack<Operator<?>> nodes = new Stack<Operator<?>>();
+ Set<Set<Operator<?>>> components = new HashSet<Set<Operator<?>>>();
+
+ for (Operator<?> o : deque) {
+ if (!indexes.containsKey(o)) {
+ connect(o, index, nodes, indexes, lowLinks, components);
+ }
+ }
+
+ return components;
+ }
+
+ private void connect(Operator<?> o, AtomicInteger index, Stack<Operator<?>> nodes,
+ Map<Operator<?>, Integer> indexes, Map<Operator<?>, Integer> lowLinks,
+ Set<Set<Operator<?>>> components) {
+
+ indexes.put(o, index.get());
+ lowLinks.put(o, index.get());
+ index.incrementAndGet();
+ nodes.push(o);
+
+ List<Operator<?>> children;
+ if (o instanceof AppMasterEventOperator) {
+ children = new ArrayList<Operator<?>>();
+ children.addAll(o.getChildOperators());
+ TableScanOperator ts = ((DynamicPruningEventDesc) o.getConf()).getTableScan();
+ LOG.debug("Adding special edge: " + o.getName() + " --> " + ts.toString());
+ children.add(ts);
+ } else {
+ children = o.getChildOperators();
+ }
+
+ for (Operator<?> child : children) {
+ if (!indexes.containsKey(child)) {
+ connect(child, index, nodes, indexes, lowLinks, components);
+ lowLinks.put(o, Math.min(lowLinks.get(o), lowLinks.get(child)));
+ } else if (nodes.contains(child)) {
+ lowLinks.put(o, Math.min(lowLinks.get(o), indexes.get(child)));
+ }
+ }
+
+ if (lowLinks.get(o).equals(indexes.get(o))) {
+ Set<Operator<?>> component = new HashSet<Operator<?>>();
+ components.add(component);
+ Operator<?> current;
+ do {
+ current = nodes.pop();
+ component.add(current);
+ } while (current != o);
+ }
+ }
+
+ private void runStatsAnnotation(OptimizeTezProcContext procCtx) throws SemanticException {
+ new AnnotateWithStatistics().transform(procCtx.parseContext);
+ new AnnotateWithOpTraits().transform(procCtx.parseContext);
+ }
+
+ private void runStatsDependentOptimizations(OptimizeTezProcContext procCtx,
+ Set<ReadEntity> inputs, Set<WriteEntity> outputs) throws SemanticException {
+
+ // Sequence of TableScan operators to be walked
+ Deque<Operator<?>> deque = new LinkedList<Operator<?>>();
+ deque.addAll(procCtx.parseContext.getTopOps().values());
// create a walker which walks the tree in a DFS manner while maintaining
// the operator stack.
Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
- opRules.put(new RuleRegExp(new String("Set parallelism - ReduceSink"),
+ opRules.put(new RuleRegExp("Set parallelism - ReduceSink",
ReduceSinkOperator.getOperatorName() + "%"),
new SetReducerParallelism());
- opRules.put(new RuleRegExp(new String("Convert Join to Map-join"),
+ opRules.put(new RuleRegExp("Convert Join to Map-join",
JoinOperator.getOperatorName() + "%"), new ConvertJoinMapJoin());
+ opRules.put(
+ new RuleRegExp("Remove dynamic pruning by size",
+ AppMasterEventOperator.getOperatorName() + "%"),
+ new RemoveDynamicPruningBySize());
+
// The dispatcher fires the processor corresponding to the closest matching
// rule and passes the context along
Dispatcher disp = new DefaultRuleDispatcher(null, opRules, procCtx);
List<Node> topNodes = new ArrayList<Node>();
- topNodes.addAll(pCtx.getTopOps().values());
+ topNodes.addAll(procCtx.parseContext.getTopOps().values());
+ GraphWalker ogw = new ForwardWalker(disp);
+ ogw.startWalking(topNodes, null);
+ }
+
+ private void runDynamicPartitionPruning(OptimizeTezProcContext procCtx, Set<ReadEntity> inputs,
+ Set<WriteEntity> outputs) throws SemanticException {
+
+ if (!procCtx.conf.getBoolVar(ConfVars.TEZ_DYNAMIC_PARTITION_PRUNING)) {
+ return;
+ }
+
+ // Sequence of TableScan operators to be walked
+ Deque<Operator<?>> deque = new LinkedList<Operator<?>>();
+ deque.addAll(procCtx.parseContext.getTopOps().values());
+
+ Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
+ opRules.put(
+ new RuleRegExp("Dynamic Partition Pruning", FilterOperator.getOperatorName()
+ + "%"), new DynamicPartitionPruningOptimization());
+
+ // The dispatcher fires the processor corresponding to the closest matching
+ // rule and passes the context along
+ Dispatcher disp = new DefaultRuleDispatcher(null, opRules, procCtx);
+ List<Node> topNodes = new ArrayList<Node>();
+ topNodes.addAll(procCtx.parseContext.getTopOps().values());
GraphWalker ogw = new ForwardWalker(disp);
ogw.startWalking(topNodes, null);
+
+ // need a new run of the constant folding because we might have created lots
+ // of "and true and true" conditions.
+ new ConstantPropagate().transform(procCtx.parseContext);
}
@Override
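
The getComponents/connect pair above is a direct application of Tarjan's strongly-connected-components algorithm over the operator DAG, plus one extra edge from each AppMasterEventOperator back to the TableScanOperator it prunes, so any pruning-induced cycle surfaces as a component with more than one member. A self-contained sketch over a toy graph (node names hypothetical):

    import java.util.*;

    public class TarjanSketch {
      static int counter = 0;
      static final Map<String, Integer> indexes = new HashMap<String, Integer>();
      static final Map<String, Integer> lowLinks = new HashMap<String, Integer>();
      static final Deque<String> stack = new ArrayDeque<String>();
      static final Map<String, List<String>> graph =
          new LinkedHashMap<String, List<String>>();
      static final List<Set<String>> components = new ArrayList<Set<String>>();

      public static void main(String[] args) {
        // TS1 -> EV1 -> TS2 -> RS -> TS1 is a cycle; EV1 -> TS2 models the
        // "special edge" from an event operator to the scan it prunes.
        graph.put("TS1", Arrays.asList("EV1"));
        graph.put("EV1", Arrays.asList("TS2"));
        graph.put("TS2", Arrays.asList("RS"));
        graph.put("RS", Arrays.asList("TS1"));
        graph.put("TS3", Collections.<String>emptyList());

        for (String node : graph.keySet()) {
          if (!indexes.containsKey(node)) {
            connect(node);
          }
        }
        // Prints one 4-node component (the cycle) and the singleton [TS3].
        System.out.println(components);
      }

      static void connect(String v) {
        indexes.put(v, counter);
        lowLinks.put(v, counter);
        counter++;
        stack.push(v);
        for (String w : graph.get(v)) {
          if (!indexes.containsKey(w)) {
            connect(w);
            lowLinks.put(v, Math.min(lowLinks.get(v), lowLinks.get(w)));
          } else if (stack.contains(w)) {
            // w is still on the stack, so v and w share a component.
            lowLinks.put(v, Math.min(lowLinks.get(v), indexes.get(w)));
          }
        }
        if (lowLinks.get(v).equals(indexes.get(v))) {
          Set<String> component = new HashSet<String>();
          String w;
          do {
            w = stack.pop();
            component.add(w);
          } while (!w.equals(v));
          components.add(component);
        }
      }
    }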
@@ -158,19 +339,12 @@ public class TezCompiler extends TaskCom
new ProcessAnalyzeTable(GenTezUtils.getUtils()));
opRules.put(new RuleRegExp("Remember union",
- UnionOperator.getOperatorName() + "%"), new NodeProcessor()
- {
- @Override
- public Object process(Node n, Stack<Node> s,
- NodeProcessorCtx procCtx, Object... os) throws SemanticException {
- GenTezProcContext context = (GenTezProcContext) procCtx;
- UnionOperator union = (UnionOperator) n;
-
- // simply need to remember that we've seen a union.
- context.currentUnionOperators.add(union);
- return null;
- }
- });
+ UnionOperator.getOperatorName() + "%"),
+ new UnionProcessor());
+
+ opRules.put(new RuleRegExp("AppMasterEventOperator",
+ AppMasterEventOperator.getOperatorName() + "%"),
+ new AppMasterEventProcessor());
// The dispatcher fires the processor corresponding to the closest matching
// rule and passes the context along
@@ -185,10 +359,17 @@ public class TezCompiler extends TaskCom
GenTezUtils.getUtils().removeUnionOperators(conf, procCtx, w);
}
- // finally make sure the file sink operators are set up right
+ // then we make sure the file sink operators are set up right
for (FileSinkOperator fileSink: procCtx.fileSinkSet) {
GenTezUtils.getUtils().processFileSink(procCtx, fileSink);
}
+
+ // and finally we hook up any events that need to be sent to the tez AM
+ LOG.debug("There are " + procCtx.eventOperatorSet.size() + " app master events.");
+ for (AppMasterEventOperator event : procCtx.eventOperatorSet) {
+ LOG.debug("Handling AppMasterEventOperator: " + event);
+ GenTezUtils.getUtils().processAppMasterEvent(procCtx, event);
+ }
}
@Override
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/AggregationDesc.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/AggregationDesc.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/AggregationDesc.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/AggregationDesc.java Mon Sep 8 04:38:17 2014
@@ -23,6 +23,7 @@ import java.io.Serializable;
import org.apache.hadoop.hive.common.JavaUtils;
import org.apache.hadoop.hive.ql.exec.PTFUtils;
+import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
import org.apache.hadoop.util.ReflectionUtils;
@@ -93,7 +94,7 @@ public class AggregationDesc implements
try {
return genericUDAFEvaluator =
ReflectionUtils.newInstance(Class.forName(genericUDAFEvaluatorClassName, true,
- JavaUtils.getClassLoader()).asSubclass(GenericUDAFEvaluator.class), null);
+ Utilities.getSessionSpecifiedClassLoader()).asSubclass(GenericUDAFEvaluator.class), null);
} catch (ClassNotFoundException e) {
throw new RuntimeException(e);
}
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/ColStatistics.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/ColStatistics.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/ColStatistics.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/ColStatistics.java Mon Sep 8 04:38:17 2014
@@ -32,6 +32,7 @@ public class ColStatistics {
private double avgColLen;
private long numTrues;
private long numFalses;
+ private Range range;
public ColStatistics(String tabAlias, String colName, String colType) {
this.setTableAlias(tabAlias);
@@ -118,6 +119,17 @@ public class ColStatistics {
this.numFalses = numFalses;
}
+ public Range getRange() {
+ return range;
+ }
+
+ public void setRange(Number minVal, Number maxVal) {
+ this.range = new Range(minVal, maxVal);
+ }
+
+ public void setRange(Range r) {
+ this.range = r;
+ }
@Override
public String toString() {
@@ -150,7 +162,24 @@ public class ColStatistics {
clone.setNumNulls(numNulls);
clone.setNumTrues(numTrues);
clone.setNumFalses(numFalses);
+ if (range != null ) {
+ clone.setRange(range.clone());
+ }
return clone;
}
+ public static class Range {
+ public final Number minValue;
+ public final Number maxValue;
+ Range(Number minValue, Number maxValue) {
+ super();
+ this.minValue = minValue;
+ this.maxValue = maxValue;
+ }
+ @Override
+ public Range clone() {
+ return new Range(minValue, maxValue);
+ }
+ }
+
}
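
The new Range holder carries a column's min/max values. The commit itself only adds the container; a typical consumer (assumed here, not shown in this patch) is selectivity estimation, e.g. sizing a filter like "col < 40" from the recorded bounds under a uniform-distribution assumption:

    public class RangeSketch {
      // Mirrors the shape of the new ColStatistics.Range holder.
      static class Range {
        final Number minValue;
        final Number maxValue;
        Range(Number minValue, Number maxValue) {
          this.minValue = minValue;
          this.maxValue = maxValue;
        }
      }

      public static void main(String[] args) {
        Range r = new Range(0, 100);   // stats recorded col in [0, 100]
        double cut = 40.0;
        // Fraction of the range below the cut, assuming uniform values.
        double selectivity = (cut - r.minValue.doubleValue())
            / (r.maxValue.doubleValue() - r.minValue.doubleValue());
        System.out.println("estimated selectivity: " + selectivity); // 0.4
      }
    }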
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/CreateTableDesc.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/CreateTableDesc.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/CreateTableDesc.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/CreateTableDesc.java Mon Sep 8 04:38:17 2014
@@ -419,7 +419,7 @@ public class CreateTableDesc extends DDL
if (this.getStorageHandler() == null) {
try {
Class<?> origin = Class.forName(this.getOutputFormat(), true,
- JavaUtils.getClassLoader());
+ Utilities.getSessionSpecifiedClassLoader());
Class<? extends HiveOutputFormat> replaced = HiveFileFormatUtils
.getOutputFormatSubstitute(origin,false);
if (replaced == null) {
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java Mon Sep 8 04:38:17 2014
@@ -22,6 +22,7 @@ import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
/**
* FileSinkDesc.
@@ -84,6 +85,10 @@ public class FileSinkDesc extends Abstra
private boolean statsCollectRawDataSize;
+ // Record what type of write this is. Default is non-ACID (ie old style).
+ private AcidUtils.Operation writeType = AcidUtils.Operation.NOT_ACID;
+ private long txnId = 0; // transaction id for this operation
+
public FileSinkDesc() {
}
@@ -137,6 +142,8 @@ public class FileSinkDesc extends Abstra
ret.setMaxStatsKeyPrefixLength(maxStatsKeyPrefixLength);
ret.setStatsCollectRawDataSize(statsCollectRawDataSize);
ret.setDpSortState(dpSortState);
+ ret.setWriteType(writeType);
+ ret.setTransactionId(txnId);
return (Object) ret;
}
@@ -398,4 +405,20 @@ public class FileSinkDesc extends Abstra
public void setDpSortState(DPSortState dpSortState) {
this.dpSortState = dpSortState;
}
+
+ public void setWriteType(AcidUtils.Operation type) {
+ writeType = type;
+ }
+
+ public AcidUtils.Operation getWriteType() {
+ return writeType;
+ }
+
+ public void setTransactionId(long id) {
+ txnId = id;
+ }
+
+ public long getTransactionId() {
+ return txnId;
+ }
}
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadFileDesc.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadFileDesc.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadFileDesc.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadFileDesc.java Mon Sep 8 04:38:17 2014
@@ -37,7 +37,7 @@ public class LoadFileDesc extends LoadDe
private String destinationCreateTable;
static {
- PTFUtils.makeTransient(LoadFileDesc.class, "targetDir");
+ PTFUtils.makeTransient(LoadFileDesc.class, "targetDir");
}
public LoadFileDesc() {
}
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadMultiFilesDesc.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadMultiFilesDesc.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadMultiFilesDesc.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadMultiFilesDesc.java Mon Sep 8 04:38:17 2014
@@ -38,7 +38,7 @@ public class LoadMultiFilesDesc implemen
private transient List<Path> srcDirs;
static {
- PTFUtils.makeTransient(LoadMultiFilesDesc.class, "targetDirs");
+ PTFUtils.makeTransient(LoadMultiFilesDesc.class, "targetDirs");
}
public LoadMultiFilesDesc() {
}
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java Mon Sep 8 04:38:17 2014
@@ -26,9 +26,9 @@ import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
-import java.util.Set;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -116,6 +116,16 @@ public class MapWork extends BaseWork {
private boolean useOneNullRowInputFormat;
+ private boolean dummyTableScan = false;
+
+ // used for dynamic partitioning
+ private Map<String, List<TableDesc>> eventSourceTableDescMap =
+ new LinkedHashMap<String, List<TableDesc>>();
+ private Map<String, List<String>> eventSourceColumnNameMap =
+ new LinkedHashMap<String, List<String>>();
+ private Map<String, List<ExprNodeDesc>> eventSourcePartKeyExprMap =
+ new LinkedHashMap<String, List<ExprNodeDesc>>();
+
public MapWork() {}
public MapWork(String name) {
@@ -525,4 +535,36 @@ public class MapWork extends BaseWork {
}
}
}
+
+ public void setDummyTableScan(boolean dummyTableScan) {
+ this.dummyTableScan = dummyTableScan;
+ }
+
+ public boolean getDummyTableScan() {
+ return dummyTableScan;
+ }
+
+ public void setEventSourceTableDescMap(Map<String, List<TableDesc>> map) {
+ this.eventSourceTableDescMap = map;
+ }
+
+ public Map<String, List<TableDesc>> getEventSourceTableDescMap() {
+ return eventSourceTableDescMap;
+ }
+
+ public void setEventSourceColumnNameMap(Map<String, List<String>> map) {
+ this.eventSourceColumnNameMap = map;
+ }
+
+ public Map<String, List<String>> getEventSourceColumnNameMap() {
+ return eventSourceColumnNameMap;
+ }
+
+ public Map<String, List<ExprNodeDesc>> getEventSourcePartKeyExprMap() {
+ return eventSourcePartKeyExprMap;
+ }
+
+ public void setEventSourcePartKeyExprMap(Map<String, List<ExprNodeDesc>> map) {
+ this.eventSourcePartKeyExprMap = map;
+ }
}
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceWork.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceWork.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceWork.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceWork.java Mon Sep 8 04:38:17 2014
@@ -20,8 +20,8 @@ package org.apache.hadoop.hive.ql.plan;
import java.util.ArrayList;
import java.util.HashMap;
-import java.util.LinkedHashSet;
import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -32,13 +32,9 @@ import org.apache.hadoop.hive.ql.exec.Fi
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.OperatorUtils;
import org.apache.hadoop.hive.ql.exec.Utilities;
-import org.apache.hadoop.hive.ql.lib.NodeProcessor;
-import org.apache.hadoop.hive.ql.lib.Rule;
import org.apache.hadoop.hive.serde2.Deserializer;
-import org.apache.hadoop.hive.serde2.SerDe;
import org.apache.hadoop.hive.serde2.SerDeUtils;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.mapred.JobConf;
@@ -99,7 +95,7 @@ public class ReduceWork extends BaseWork
private ObjectInspector keyObjectInspector = null;
private ObjectInspector valueObjectInspector = null;
- private Map<String, Integer> reduceColumnNameMap = new LinkedHashMap<String, Integer>();
+ private final Map<String, Integer> reduceColumnNameMap = new LinkedHashMap<String, Integer>();
/**
* If the plan has a reducer and correspondingly a reduce-sink, then store the TableDesc pointing
@@ -118,7 +114,7 @@ public class ReduceWork extends BaseWork
private ObjectInspector getObjectInspector(TableDesc desc) {
ObjectInspector objectInspector;
try {
- Deserializer deserializer = (SerDe) ReflectionUtils.newInstance(desc
+ Deserializer deserializer = ReflectionUtils.newInstance(desc
.getDeserializerClass(), null);
SerDeUtils.initializeSerDe(deserializer, null, desc.getProperties(), null);
objectInspector = deserializer.getObjectInspector();
@@ -239,7 +235,6 @@ public class ReduceWork extends BaseWork
@Override
public void replaceRoots(Map<Operator<?>, Operator<?>> replacementMap) {
- assert replacementMap.size() == 1;
setReducer(replacementMap.get(getReducer()));
}
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/TableDesc.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/TableDesc.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/TableDesc.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/plan/TableDesc.java Mon Sep 8 04:38:17 2014
@@ -26,6 +26,7 @@ import java.util.Properties;
import org.apache.hadoop.hive.common.JavaUtils;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
import org.apache.hadoop.hive.ql.io.HivePassThroughOutputFormat;
@@ -65,7 +66,7 @@ public class TableDesc implements Serial
public Class<? extends Deserializer> getDeserializerClass() {
try {
return (Class<? extends Deserializer>) Class.forName(
- getSerdeClassName(), true, JavaUtils.getClassLoader());
+ getSerdeClassName(), true, Utilities.getSessionSpecifiedClassLoader());
} catch (ClassNotFoundException e) {
throw new RuntimeException(e);
}
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/ppd/OpProcFactory.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/ppd/OpProcFactory.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/ppd/OpProcFactory.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/ppd/OpProcFactory.java Mon Sep 8 04:38:17 2014
@@ -906,11 +906,8 @@ public final class OpProcFactory {
}
ExprNodeDesc condn = ExprNodeDescUtils.mergePredicates(preds);
- if(!(condn instanceof ExprNodeGenericFuncDesc)) {
- return null;
- }
- if (op instanceof TableScanOperator) {
+ if (op instanceof TableScanOperator && condn instanceof ExprNodeGenericFuncDesc) {
boolean pushFilterToStorage;
HiveConf hiveConf = owi.getParseContext().getConf();
pushFilterToStorage =
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/processors/CommandProcessorFactory.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/processors/CommandProcessorFactory.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/processors/CommandProcessorFactory.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/processors/CommandProcessorFactory.java Mon Sep 8 04:38:17 2014
@@ -80,6 +80,8 @@ public final class CommandProcessorFacto
return new DeleteResourceProcessor();
case COMPILE:
return new CompileProcessor();
+ case RELOAD:
+ return new ReloadProcessor();
default:
throw new AssertionError("Unknown HiveCommand " + hiveCommand);
}
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/processors/CommandProcessorResponse.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/processors/CommandProcessorResponse.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/processors/CommandProcessorResponse.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/processors/CommandProcessorResponse.java Mon Sep 8 04:38:17 2014
@@ -76,4 +76,9 @@ public class CommandProcessorResponse {
public String getSQLState() { return SQLState; }
public Schema getSchema() { return resSchema; }
public Throwable getException() { return exception; }
+ public String toString() {
+ return "(" + responseCode + "," + errorMessage + "," + SQLState +
+ (resSchema == null ? "" : "," + resSchema) +
+ (exception == null ? "" : "," + exception.getMessage()) + ")";
+ }
}
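
For reference, the toString() added above renders the mandatory fields comma-separated and appends the schema and exception segments only when present; a quick usage sketch (the three-argument constructor takes responseCode, errorMessage, SQLState):

    // Sketch, not part of the patch: sample rendering of the response.
    import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;

    public class ResponseToStringSketch {
      public static void main(String[] args) {
        // schema and exception are null, so their segments are omitted
        System.out.println(new CommandProcessorResponse(0, null, null)); // (0,null,null)
      }
    }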
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/processors/HiveCommand.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/processors/HiveCommand.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/processors/HiveCommand.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/processors/HiveCommand.java Mon Sep 8 04:38:17 2014
@@ -31,6 +31,7 @@ public enum HiveCommand {
DFS(),
ADD(),
LIST(),
+ RELOAD(),
DELETE(),
COMPILE();
private static final Set<String> COMMANDS = new HashSet<String>();
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java Mon Sep 8 04:38:17 2014
@@ -24,14 +24,8 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.PrintStream;
import java.net.URI;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
+import java.net.URLClassLoader;
+import java.util.*;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang.StringUtils;
@@ -45,7 +39,6 @@ import org.apache.hadoop.fs.permission.F
import org.apache.hadoop.hive.common.JavaUtils;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
-import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
import org.apache.hadoop.hive.ql.MapRedStats;
import org.apache.hadoop.hive.ql.exec.Utilities;
@@ -56,6 +49,9 @@ import org.apache.hadoop.hive.ql.exec.te
import org.apache.hadoop.hive.ql.history.HiveHistory;
import org.apache.hadoop.hive.ql.history.HiveHistoryImpl;
import org.apache.hadoop.hive.ql.history.HiveHistoryProxyHandler;
+import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
+import org.apache.hadoop.hive.ql.lockmgr.LockException;
+import org.apache.hadoop.hive.ql.lockmgr.TxnManagerFactory;
import org.apache.hadoop.hive.ql.log.PerfLogger;
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -71,6 +67,7 @@ import org.apache.hadoop.hive.ql.securit
import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveMetastoreClientFactoryImpl;
import org.apache.hadoop.hive.ql.util.DosToUnix;
import org.apache.hadoop.hive.shims.ShimLoader;
+import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.ReflectionUtils;
import com.google.common.base.Preconditions;
@@ -212,6 +209,36 @@ public class SessionState {
*/
private Path localSessionPath;
+ private String hdfsScratchDirURIString;
+
+ /**
+ * Transaction manager to use for this session. This is instantiated lazily by
+ * {@link #initTxnMgr(org.apache.hadoop.hive.conf.HiveConf)}
+ */
+ private HiveTxnManager txnMgr = null;
+
+ /**
+ * When {@link #setCurrentTxn(long)} is passed this value or {@link #getCurrentTxn()}
+ * returns it, there is no current transaction in this session.
+ */
+ public static final long NO_CURRENT_TXN = -1L;
+
+ /**
+ * Transaction currently open
+ */
+ private long currentTxn = NO_CURRENT_TXN;
+
+ /**
+ * Whether we are in auto-commit state or not. Currently we are always in auto-commit,
+ * so there are no setters for this yet.
+ */
+ private boolean txnAutoCommit = true;
+
+ /**
+ * Jars loaded by the previous reload, kept so they can be removed before the next one.
+ */
+ private final Set<String> preReloadableAuxJars = new HashSet<String>();
+
/**
* Get the lineage state stored in this session.
*
@@ -314,6 +341,37 @@ public class SessionState {
}
/**
+ * Initialize the transaction manager. This is done lazily to avoid hard-wiring a
+ * transaction manager into the session at startup. In general users shouldn't change
+ * this, but it's useful for testing.
+ * @param conf Hive configuration to initialize transaction manager
+ * @return transaction manager
+ * @throws LockException
+ */
+ public HiveTxnManager initTxnMgr(HiveConf conf) throws LockException {
+ if (txnMgr == null) {
+ txnMgr = TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
+ }
+ return txnMgr;
+ }
+
+ public HiveTxnManager getTxnMgr() {
+ return txnMgr;
+ }
+
+ public long getCurrentTxn() {
+ return currentTxn;
+ }
+
+ public void setCurrentTxn(long currTxn) {
+ currentTxn = currTxn;
+ }
+
+ public boolean isAutoCommit() {
+ return txnAutoCommit;
+ }
+
+ /**
* Singleton Session object per thread.
*
**/
@@ -348,36 +406,39 @@ public class SessionState {
setCurrentSessionState(startSs);
- if(startSs.hiveHist == null){
+ if (startSs.hiveHist == null) {
if (startSs.getConf().getBoolVar(HiveConf.ConfVars.HIVE_SESSION_HISTORY_ENABLED)) {
startSs.hiveHist = new HiveHistoryImpl(startSs);
- }else {
- //Hive history is disabled, create a no-op proxy
+ } else {
+ // Hive history is disabled, create a no-op proxy
startSs.hiveHist = HiveHistoryProxyHandler.getNoOpHiveHistoryProxy();
}
}
- if (startSs.getTmpOutputFile() == null) {
- // set temp file containing results to be sent to HiveClient
- try {
- startSs.setTmpOutputFile(createTempFile(startSs.getConf()));
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
-
// Get the following out of the way when you start the session these take a
// while and should be done when we start up.
try {
- //Hive object instance should be created with a copy of the conf object. If the conf is
+ // Hive object instance should be created with a copy of the conf object. If the conf is
// shared with SessionState, other parts of the code might update the config, but
// Hive.get(HiveConf) would not recognize the case when it needs refreshing
Hive.get(new HiveConf(startSs.conf)).getMSC();
- ShimLoader.getHadoopShims().getUGIForConf(startSs.conf);
+ UserGroupInformation sessionUGI = ShimLoader.getHadoopShims().getUGIForConf(startSs.conf);
FileSystem.get(startSs.conf);
- startSs.createSessionPaths(startSs.conf);
+
+ // Create scratch dirs for this session
+ startSs.createSessionDirs(sessionUGI.getShortUserName());
+
+ // Set temp file containing results to be sent to HiveClient
+ if (startSs.getTmpOutputFile() == null) {
+ try {
+ startSs.setTmpOutputFile(createTempFile(startSs.getConf()));
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
} catch (Exception e) {
- // catch-all due to some exec time dependencies on session state
+ // Catch-all due to some exec time dependencies on session state
// that would cause ClassNotFoundException otherwise
throw new RuntimeException(e);
}
@@ -400,6 +461,88 @@ public class SessionState {
return startSs;
}
+ /**
+ * Create dirs & session paths for this session:
+ * 1. HDFS scratch dir
+ * 2. Local scratch dir
+ * 3. Local downloaded resource dir
+ * 4. HDFS session path
+ * 5. Local session path
+ * 6. HDFS temp table space
+ * @param userName short user name used to scope the per-user scratch directories
+ * @throws IOException
+ */
+ private void createSessionDirs(String userName) throws IOException {
+ HiveConf conf = getConf();
+ // First create the root scratch dir on hdfs (if it doesn't already exist) and make it writable
+ Path rootHDFSDirPath = new Path(HiveConf.getVar(conf, HiveConf.ConfVars.SCRATCHDIR));
+ String rootHDFSDirPermission = "777";
+ createPath(conf, rootHDFSDirPath, rootHDFSDirPermission, false, false);
+ // Now create session specific dirs
+ String scratchDirPermission = HiveConf.getVar(conf, HiveConf.ConfVars.SCRATCHDIRPERMISSION);
+ Path path;
+ // 1. HDFS scratch dir
+ path = new Path(rootHDFSDirPath, userName);
+ hdfsScratchDirURIString = path.toUri().toString();
+ createPath(conf, path, scratchDirPermission, false, false);
+ // 2. Local scratch dir
+ path = new Path(HiveConf.getVar(conf, HiveConf.ConfVars.LOCALSCRATCHDIR));
+ createPath(conf, path, scratchDirPermission, true, false);
+ // 3. Download resources dir
+ path = new Path(HiveConf.getVar(conf, HiveConf.ConfVars.DOWNLOADED_RESOURCES_DIR));
+ createPath(conf, path, scratchDirPermission, true, false);
+ // Finally, create session paths for this session
+ // Local & non-local tmp locations are configurable; however, the layout is the same across
+ // all external file systems
+ String sessionId = getSessionId();
+ // 4. HDFS session path
+ hdfsSessionPath = new Path(hdfsScratchDirURIString, sessionId);
+ createPath(conf, hdfsSessionPath, scratchDirPermission, false, true);
+ conf.set(HDFS_SESSION_PATH_KEY, hdfsSessionPath.toUri().toString());
+ // 5. Local session path
+ localSessionPath = new Path(HiveConf.getVar(conf, HiveConf.ConfVars.LOCALSCRATCHDIR), sessionId);
+ createPath(conf, localSessionPath, scratchDirPermission, true, true);
+ conf.set(LOCAL_SESSION_PATH_KEY, localSessionPath.toUri().toString());
+ // 6. HDFS temp table space
+ hdfsTmpTableSpace = new Path(hdfsSessionPath, TMP_PREFIX);
+ createPath(conf, hdfsTmpTableSpace, scratchDirPermission, false, true);
+ conf.set(TMP_TABLE_SPACE_KEY, hdfsTmpTableSpace.toUri().toString());
+ }
+
+ /**
+ * Create a given path if it doesn't exist.
+ *
+ * @param conf the session configuration
+ * @param path the path to create
+ * @param permission permission string for the directory, e.g. "777"
+ * @param isLocal whether the path is on the local file system
+ * @param isCleanUp whether to schedule the path for deletion on JVM exit
+ * @throws IOException
+ */
+ private void createPath(HiveConf conf, Path path, String permission, boolean isLocal,
+ boolean isCleanUp) throws IOException {
+ FsPermission fsPermission = new FsPermission(permission);
+ FileSystem fs;
+ if (isLocal) {
+ fs = FileSystem.getLocal(conf);
+ } else {
+ fs = path.getFileSystem(conf);
+ }
+ if (!fs.exists(path)) {
+ fs.mkdirs(path, fsPermission);
+ String dirType = isLocal ? "local" : "HDFS";
+ LOG.info("Created " + dirType + " directory: " + path.toString());
+ }
+ if (isCleanUp) {
+ fs.deleteOnExit(path);
+ }
+ }
+
+ public String getHdfsScratchDirURIString() {
+ return hdfsScratchDirURIString;
+ }
+
public static Path getLocalSessionPath(Configuration conf) {
SessionState ss = SessionState.get();
if (ss == null) {
@@ -452,43 +595,6 @@ public class SessionState {
}
}
- private void createSessionPaths(Configuration conf) throws IOException {
-
- String scratchDirPermission = HiveConf.getVar(conf, HiveConf.ConfVars.SCRATCHDIRPERMISSION);
- String sessionId = getSessionId();
-
- // local & non-local tmp location is configurable. however it is the same across
- // all external file systems
- hdfsSessionPath =
- new Path(HiveConf.getVar(conf, HiveConf.ConfVars.SCRATCHDIR),
- sessionId);
- createPath(conf, hdfsSessionPath, scratchDirPermission);
- conf.set(HDFS_SESSION_PATH_KEY, hdfsSessionPath.toUri().toString());
-
- localSessionPath = new Path(HiveConf.getVar(conf, HiveConf.ConfVars.LOCALSCRATCHDIR),
- sessionId);
- createPath(conf, localSessionPath, scratchDirPermission);
- conf.set(LOCAL_SESSION_PATH_KEY, localSessionPath.toUri().toString());
- hdfsTmpTableSpace = new Path(hdfsSessionPath, TMP_PREFIX);
- createPath(conf, hdfsTmpTableSpace, scratchDirPermission);
- conf.set(TMP_TABLE_SPACE_KEY, hdfsTmpTableSpace.toUri().toString());
- }
-
- private void createPath(Configuration conf, Path p, String perm) throws IOException {
- FileSystem fs = p.getFileSystem(conf);
- p = new Path(fs.makeQualified(p).toString());
- FsPermission fsPermission = new FsPermission(Short.parseShort(perm.trim(), 8));
-
- if (!Utilities.createDirsWithPermission(conf, p, fsPermission)) {
- throw new IOException("Cannot create directory: "
- + p.toString());
- }
-
- // best effort to clean up if we don't shut down properly
- fs.deleteOnExit(p);
- }
-
-
/**
* Setup authentication and authorization plugins for this session.
*/
@@ -727,7 +833,6 @@ public class SessionState {
SessionState ss = SessionState.get();
Configuration conf = (ss == null) ? new Configuration() : ss.getConf();
- LogHelper console = getConsole();
for (String newFile : newFiles) {
try {
if (Utilities.realFile(newFile, conf) == null) {
@@ -741,6 +846,52 @@ public class SessionState {
}
}
+ // Reload the jars under the path specified by the hive.reloadable.aux.jars.path property
+ public void reloadAuxJars() throws IOException {
+ final Set<String> reloadedAuxJars = new HashSet<String>();
+
+ final String renewableJarPath = conf.getVar(ConfVars.HIVERELOADABLEJARS);
+ // do nothing if this property is not specified or empty
+ if (renewableJarPath == null || renewableJarPath.isEmpty()) {
+ return;
+ }
+
+ Set<String> jarPaths = Utilities.getJarFilesByPath(renewableJarPath);
+
+ // load jars under the hive.reloadable.aux.jars.path
+ if (!jarPaths.isEmpty()) {
+ reloadedAuxJars.addAll(jarPaths);
+ }
+
+ // remove the previous renewable jars
+ try {
+ if (!preReloadableAuxJars.isEmpty()) {
+ Utilities.removeFromClassPath(preReloadableAuxJars.toArray(new String[0]));
+ }
+ } catch (Exception e) {
+ String msg = "Failed to remove the jars loaded in the previous reload: " + e;
+ throw new IOException(msg, e);
+ }
+
+ try {
+ if (!reloadedAuxJars.isEmpty()) {
+ URLClassLoader currentCLoader =
+ (URLClassLoader) SessionState.get().getConf().getClassLoader();
+ currentCLoader =
+ (URLClassLoader) Utilities.addToClassPath(currentCLoader,
+ reloadedAuxJars.toArray(new String[0]));
+ conf.setClassLoader(currentCLoader);
+ Thread.currentThread().setContextClassLoader(currentCLoader);
+ }
+ preReloadableAuxJars.clear();
+ preReloadableAuxJars.addAll(reloadedAuxJars);
+ } catch (Exception e) {
+ String msg =
+ "Failed to add jars from the path specified by the hive.reloadable.aux.jars.path property: " + e;
+ throw new IOException(msg, e);
+ }
+ }
+
static void registerJars(List<String> newJars) throws IllegalArgumentException {
LogHelper console = getConsole();
try {
@@ -1054,6 +1205,7 @@ public class SessionState {
}
public void close() throws IOException {
+ if (txnMgr != null) txnMgr.closeTxnManager();
JavaUtils.closeClassLoadersTo(conf.getClassLoader(), parentLoader);
File resourceDir =
new File(getConf().getVar(HiveConf.ConfVars.DOWNLOADED_RESOURCES_DIR));
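
For orientation, the six locations created by createSessionDirs() above nest as follows, expressed in terms of the ConfVars used in the code (userName, sessionId, and the TMP_PREFIX constant are filled in at runtime; deleteOnExit marks the paths scheduled for best-effort cleanup):

    SCRATCHDIR                                    root HDFS scratch dir, permission 777
    SCRATCHDIR/userName                           1. HDFS scratch dir
    LOCALSCRATCHDIR                               2. local scratch dir
    DOWNLOADED_RESOURCES_DIR                      3. downloaded resources dir
    SCRATCHDIR/userName/sessionId                 4. HDFS session path, deleteOnExit
    LOCALSCRATCHDIR/sessionId                     5. local session path, deleteOnExit
    SCRATCHDIR/userName/sessionId/TMP_PREFIX      6. HDFS temp table space, deleteOnExit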
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsFactory.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsFactory.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsFactory.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsFactory.java Mon Sep 8 04:38:17 2014
@@ -26,6 +26,7 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.hive.common.JavaUtils;
import org.apache.hadoop.hive.common.StatsSetupConst.StatDB;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.util.ReflectionUtils;
import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVESTATSDBCLASS;
@@ -87,7 +88,7 @@ public final class StatsFactory {
}
private boolean initialize(String type) {
- ClassLoader classLoader = JavaUtils.getClassLoader();
+ ClassLoader classLoader = Utilities.getSessionSpecifiedClassLoader();
try {
StatDB statDB = type.startsWith("jdbc") ? StatDB.jdbc : StatDB.valueOf(type);
publisherImplementation = (Class<? extends Serializable>)
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUtils.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUtils.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/stats/StatsUtils.java Mon Sep 8 04:38:17 2014
@@ -25,10 +25,12 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.StatsSetupConst;
+import org.apache.hadoop.hive.common.type.HiveDecimal;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.AggrStats;
import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData;
import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
+import org.apache.hadoop.hive.metastore.api.Decimal;
import org.apache.hadoop.hive.ql.exec.ColumnInfo;
import org.apache.hadoop.hive.ql.exec.RowSchema;
import org.apache.hadoop.hive.ql.exec.TableScanOperator;
@@ -39,8 +41,10 @@ import org.apache.hadoop.hive.ql.metadat
import org.apache.hadoop.hive.ql.parse.PrunedPartitionList;
import org.apache.hadoop.hive.ql.plan.ColStatistics;
import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeColumnListDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeFieldDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeNullDesc;
import org.apache.hadoop.hive.ql.plan.Statistics;
@@ -76,6 +80,8 @@ import org.apache.hadoop.hive.serde2.obj
import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableTimestampObjectInspector;
import org.apache.hadoop.io.BytesWritable;
+import java.math.BigDecimal;
+import java.math.BigInteger;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
@@ -404,18 +410,22 @@ public class StatsUtils {
cs.setCountDistint(csd.getLongStats().getNumDVs());
cs.setNumNulls(csd.getLongStats().getNumNulls());
cs.setAvgColLen(JavaDataModel.get().primitive1());
+ cs.setRange(csd.getLongStats().getLowValue(), csd.getLongStats().getHighValue());
} else if (colType.equalsIgnoreCase(serdeConstants.BIGINT_TYPE_NAME)) {
cs.setCountDistint(csd.getLongStats().getNumDVs());
cs.setNumNulls(csd.getLongStats().getNumNulls());
cs.setAvgColLen(JavaDataModel.get().primitive2());
+ cs.setRange(csd.getLongStats().getLowValue(), csd.getLongStats().getHighValue());
} else if (colType.equalsIgnoreCase(serdeConstants.FLOAT_TYPE_NAME)) {
cs.setCountDistint(csd.getDoubleStats().getNumDVs());
cs.setNumNulls(csd.getDoubleStats().getNumNulls());
cs.setAvgColLen(JavaDataModel.get().primitive1());
+ cs.setRange(csd.getDoubleStats().getLowValue(), csd.getDoubleStats().getHighValue());
} else if (colType.equalsIgnoreCase(serdeConstants.DOUBLE_TYPE_NAME)) {
cs.setCountDistint(csd.getDoubleStats().getNumDVs());
cs.setNumNulls(csd.getDoubleStats().getNumNulls());
cs.setAvgColLen(JavaDataModel.get().primitive2());
+ cs.setRange(csd.getDoubleStats().getLowValue(), csd.getDoubleStats().getHighValue());
} else if (colType.equalsIgnoreCase(serdeConstants.STRING_TYPE_NAME)
|| colType.startsWith(serdeConstants.CHAR_TYPE_NAME)
|| colType.startsWith(serdeConstants.VARCHAR_TYPE_NAME)) {
@@ -441,6 +451,13 @@ public class StatsUtils {
cs.setAvgColLen(JavaDataModel.get().lengthOfDecimal());
cs.setCountDistint(csd.getDecimalStats().getNumDVs());
cs.setNumNulls(csd.getDecimalStats().getNumNulls());
+ Decimal val = csd.getDecimalStats().getHighValue();
+ BigDecimal maxVal = HiveDecimal.
+ create(new BigInteger(val.getUnscaled()), val.getScale()).bigDecimalValue();
+ val = csd.getDecimalStats().getLowValue();
+ BigDecimal minVal = HiveDecimal.
+ create(new BigInteger(val.getUnscaled()), val.getScale()).bigDecimalValue();
+ cs.setRange(minVal, maxVal);
} else if (colType.equalsIgnoreCase(serdeConstants.DATE_TYPE_NAME)) {
cs.setAvgColLen(JavaDataModel.get().lengthOfDate());
} else {
@@ -960,6 +977,22 @@ public class StatsUtils {
colName = ennd.getName();
colType = "null";
numNulls = numRows;
+ } else if (end instanceof ExprNodeColumnListDesc) {
+
+ // column list
+ ExprNodeColumnListDesc encd = (ExprNodeColumnListDesc) end;
+ colName = Joiner.on(",").join(encd.getCols());
+ colType = "array";
+ countDistincts = numRows;
+ oi = encd.getWritableObjectInspector();
+ } else if (end instanceof ExprNodeFieldDesc) {
+
+ // field within complex type
+ ExprNodeFieldDesc enfd = (ExprNodeFieldDesc) end;
+ colName = enfd.getFieldName();
+ colType = enfd.getTypeString();
+ countDistincts = numRows;
+ oi = enfd.getWritableObjectInspector();
}
if (colType.equalsIgnoreCase(serdeConstants.STRING_TYPE_NAME)
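
The decimal branch added above rebuilds a numeric range from the metastore's wire format, which stores an unscaled byte array plus a scale; HiveDecimal.create(...).bigDecimalValue() additionally applies Hive's precision/scale bounds. A standalone sketch of the underlying arithmetic, with hypothetical values:

    // Sketch, not part of the patch: unscaled bytes + scale => BigDecimal.
    import java.math.BigDecimal;
    import java.math.BigInteger;

    public class DecimalRangeSketch {
      public static void main(String[] args) {
        byte[] unscaled = BigInteger.valueOf(12345).toByteArray();
        int scale = 2;
        // 12345 with scale 2 reads as 123.45
        System.out.println(new BigDecimal(new BigInteger(unscaled), scale));
      }
    }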
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/stats/jdbc/JDBCStatsAggregator.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/stats/jdbc/JDBCStatsAggregator.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/stats/jdbc/JDBCStatsAggregator.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/stats/jdbc/JDBCStatsAggregator.java Mon Sep 8 04:38:17 2014
@@ -27,6 +27,7 @@ import java.sql.SQLRecoverableException;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
+import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -46,7 +47,8 @@ public class JDBCStatsAggregator impleme
private final Log LOG = LogFactory.getLog(this.getClass().getName());
private int timeout = 30;
private final String comment = "Hive stats aggregation: " + this.getClass().getName();
- private int maxRetries, waitWindow;
+ private int maxRetries;
+ private long waitWindow;
private final Random r;
public JDBCStatsAggregator() {
@@ -57,11 +59,14 @@ public class JDBCStatsAggregator impleme
@Override
public boolean connect(Configuration hiveconf, Task sourceTask) {
this.hiveconf = hiveconf;
- timeout = HiveConf.getIntVar(hiveconf, HiveConf.ConfVars.HIVE_STATS_JDBC_TIMEOUT);
+ timeout = (int) HiveConf.getTimeVar(
+ hiveconf, HiveConf.ConfVars.HIVE_STATS_JDBC_TIMEOUT, TimeUnit.SECONDS);
connectionString = HiveConf.getVar(hiveconf, HiveConf.ConfVars.HIVESTATSDBCONNECTIONSTRING);
String driver = HiveConf.getVar(hiveconf, HiveConf.ConfVars.HIVESTATSJDBCDRIVER);
maxRetries = HiveConf.getIntVar(hiveconf, HiveConf.ConfVars.HIVE_STATS_RETRIES_MAX);
- waitWindow = HiveConf.getIntVar(hiveconf, HiveConf.ConfVars.HIVE_STATS_RETRIES_WAIT);
+ waitWindow = HiveConf.getTimeVar(
+ hiveconf, HiveConf.ConfVars.HIVE_STATS_RETRIES_WAIT, TimeUnit.MILLISECONDS);
+ this.sourceTask = sourceTask;
try {
Class.forName(driver).newInstance();
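
The getTimeVar() calls above resolve time-typed settings, which may now carry explicit units (e.g. "30s" or "3000ms"), into the requested TimeUnit, so waitWindow is always a millisecond count whatever form the user wrote. A hedged sketch of how such a wait window might feed a randomized retry backoff; the actual retry logic in these classes is not shown in this section and may differ:

    // Sketch, not part of the patch: randomized wait within the configured
    // window, scaled by the number of failures so far.
    import java.util.Random;

    public class RetryWaitSketch {
      public static void waitForRetry(int failures, long waitWindowMs, Random r)
          throws InterruptedException {
        long waitMs = (long) (r.nextDouble() * waitWindowMs * (failures + 1));
        Thread.sleep(waitMs);
      }
    }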
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/stats/jdbc/JDBCStatsPublisher.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/stats/jdbc/JDBCStatsPublisher.java?rev=1623263&r1=1623262&r2=1623263&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/stats/jdbc/JDBCStatsPublisher.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/stats/jdbc/JDBCStatsPublisher.java Mon Sep 8 04:38:17 2014
@@ -30,6 +30,7 @@ import java.sql.Statement;
import java.util.List;
import java.util.Map;
import java.util.Random;
+import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -48,7 +49,8 @@ public class JDBCStatsPublisher implemen
private int timeout; // default timeout in sec. for JDBC connection and statements
// SQL comment that identifies where the SQL statement comes from
private final String comment = "Hive stats publishing: " + this.getClass().getName();
- private int maxRetries, waitWindow;
+ private int maxRetries;
+ private long waitWindow;
private final Random r;
public JDBCStatsPublisher() {
@@ -59,9 +61,11 @@ public class JDBCStatsPublisher implemen
public boolean connect(Configuration hiveconf) {
this.hiveconf = hiveconf;
maxRetries = HiveConf.getIntVar(hiveconf, HiveConf.ConfVars.HIVE_STATS_RETRIES_MAX);
- waitWindow = HiveConf.getIntVar(hiveconf, HiveConf.ConfVars.HIVE_STATS_RETRIES_WAIT);
+ waitWindow = HiveConf.getTimeVar(
+ hiveconf, HiveConf.ConfVars.HIVE_STATS_RETRIES_WAIT, TimeUnit.MILLISECONDS);
connectionString = HiveConf.getVar(hiveconf, HiveConf.ConfVars.HIVESTATSDBCONNECTIONSTRING);
- timeout = HiveConf.getIntVar(hiveconf, HiveConf.ConfVars.HIVE_STATS_JDBC_TIMEOUT);
+ timeout = (int) HiveConf.getTimeVar(
+ hiveconf, HiveConf.ConfVars.HIVE_STATS_JDBC_TIMEOUT, TimeUnit.SECONDS);
String driver = HiveConf.getVar(hiveconf, HiveConf.ConfVars.HIVESTATSJDBCDRIVER);
try {