Posted to commits@hive.apache.org by br...@apache.org on 2014/10/06 05:44:26 UTC
svn commit: r1629562 [8/38] - in /hive/branches/spark: ./ accumulo-handler/
beeline/ beeline/src/java/org/apache/hive/beeline/ bin/ext/ common/
common/src/java/org/apache/hadoop/hive/conf/
common/src/test/org/apache/hadoop/hive/common/type/ contrib/src...
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java?rev=1629562&r1=1629561&r2=1629562&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java Mon Oct 6 03:44:13 2014
@@ -28,8 +28,6 @@ import java.util.Stack;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.exec.CommonMergeJoinOperator;
-import org.apache.hadoop.hive.ql.exec.DummyStoreOperator;
import org.apache.hadoop.hive.ql.exec.HashTableDummyOperator;
import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
import org.apache.hadoop.hive.ql.exec.Operator;
@@ -40,14 +38,11 @@ import org.apache.hadoop.hive.ql.lib.Nod
import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
import org.apache.hadoop.hive.ql.optimizer.GenMapRedUtils;
import org.apache.hadoop.hive.ql.plan.BaseWork;
-import org.apache.hadoop.hive.ql.plan.MergeJoinWork;
-import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
import org.apache.hadoop.hive.ql.plan.ReduceWork;
import org.apache.hadoop.hive.ql.plan.TezEdgeProperty;
import org.apache.hadoop.hive.ql.plan.TezEdgeProperty.EdgeType;
import org.apache.hadoop.hive.ql.plan.TezWork;
-import org.apache.hadoop.hive.ql.plan.TezWork.VertexType;
import org.apache.hadoop.hive.ql.plan.UnionWork;
/**
@@ -131,48 +126,6 @@ public class GenTezWork implements NodeP
context.childToWorkMap.get(operator).add(work);
}
- // this transformation needs to be first because it changes the work item itself.
- // which can affect the working of all downstream transformations.
- if (context.currentMergeJoinOperator != null) {
- // we are currently walking the big table side of the merge join. we need to create or hook up
- // merge join work.
- MergeJoinWork mergeJoinWork = null;
- if (context.opMergeJoinWorkMap.containsKey(operator)) {
- // we have found a merge work corresponding to this closing operator. Hook up this work.
- mergeJoinWork = context.opMergeJoinWorkMap.get(operator);
- } else {
- // we need to create the merge join work
- mergeJoinWork = new MergeJoinWork();
- mergeJoinWork.setMergeJoinOperator(context.currentMergeJoinOperator);
- tezWork.add(mergeJoinWork);
- context.opMergeJoinWorkMap.put(operator, mergeJoinWork);
- }
- // connect the work correctly.
- mergeJoinWork.addMergedWork(work, null);
- Operator<? extends OperatorDesc> parentOp =
- getParentFromStack(context.currentMergeJoinOperator, stack);
- int pos = context.currentMergeJoinOperator.getTagForOperator(parentOp);
- work.setTag(pos);
- tezWork.setVertexType(work, VertexType.MULTI_INPUT_UNINITIALIZED_EDGES);
- for (BaseWork parentWork : tezWork.getParents(work)) {
- TezEdgeProperty edgeProp = tezWork.getEdgeProperty(parentWork, work);
- tezWork.disconnect(parentWork, work);
- tezWork.connect(parentWork, mergeJoinWork, edgeProp);
- }
-
- for (BaseWork childWork : tezWork.getChildren(work)) {
- TezEdgeProperty edgeProp = tezWork.getEdgeProperty(work, childWork);
- tezWork.disconnect(work, childWork);
- tezWork.connect(mergeJoinWork, childWork, edgeProp);
- }
- tezWork.remove(work);
- context.rootToWorkMap.put(root, mergeJoinWork);
- context.childToWorkMap.get(operator).remove(work);
- context.childToWorkMap.get(operator).add(mergeJoinWork);
- work = mergeJoinWork;
- context.currentMergeJoinOperator = null;
- }
-
// remember which mapjoin operator links with which work
if (!context.currentMapJoinOperators.isEmpty()) {
for (MapJoinOperator mj: context.currentMapJoinOperators) {
@@ -216,9 +169,6 @@ public class GenTezWork implements NodeP
LOG.debug("connecting "+parentWork.getName()+" with "+work.getName());
TezEdgeProperty edgeProp = parentWorkMap.getValue();
tezWork.connect(parentWork, work, edgeProp);
- if (edgeProp.getEdgeType() == EdgeType.CUSTOM_EDGE) {
- tezWork.setVertexType(work, VertexType.INITIALIZED_EDGES);
- }
// need to set up output name for reduce sink now that we know the name
// of the downstream work
@@ -242,6 +192,14 @@ public class GenTezWork implements NodeP
context.currentMapJoinOperators.clear();
}
+ // This is where we cut the tree as described above. We also remember that
+ // we might have to connect parent work with this work later.
+ for (Operator<?> parent: new ArrayList<Operator<?>>(root.getParentOperators())) {
+ context.leafOperatorToFollowingWork.put(parent, work);
+ LOG.debug("Removing " + parent + " as parent from " + root);
+ root.removeParent(parent);
+ }
+
if (!context.currentUnionOperators.isEmpty()) {
// if there are union all operators we need to add the work to the set
// of union operators.
@@ -271,21 +229,6 @@ public class GenTezWork implements NodeP
work = unionWork;
}
-
- // This is where we cut the tree as described above. We also remember that
- // we might have to connect parent work with this work later.
- boolean removeParents = false;
- for (Operator<?> parent: new ArrayList<Operator<?>>(root.getParentOperators())) {
- removeParents = true;
- context.leafOperatorToFollowingWork.put(parent, work);
- LOG.debug("Removing " + parent + " as parent from " + root);
- }
- if (removeParents) {
- for (Operator<?> parent : new ArrayList<Operator<?>>(root.getParentOperators())) {
- root.removeParent(parent);
- }
- }
-
// We're scanning a tree from roots to leaf (this is not technically
// correct, demux and mux operators might form a diamond shape, but
// we will only scan one path and ignore the others, because the
@@ -305,64 +248,31 @@ public class GenTezWork implements NodeP
LOG.debug("Second pass. Leaf operator: "+operator
+" has common downstream work:"+followingWork);
- if (operator instanceof DummyStoreOperator) {
- // this is the small table side.
- assert (followingWork instanceof MergeJoinWork);
- MergeJoinWork mergeJoinWork = (MergeJoinWork) followingWork;
- CommonMergeJoinOperator mergeJoinOp = mergeJoinWork.getMergeJoinOperator();
- work.setTag(mergeJoinOp.getTagForOperator(operator));
- mergeJoinWork.addMergedWork(null, work);
- tezWork.setVertexType(mergeJoinWork, VertexType.MULTI_INPUT_UNINITIALIZED_EDGES);
- for (BaseWork parentWork : tezWork.getParents(work)) {
- TezEdgeProperty edgeProp = tezWork.getEdgeProperty(parentWork, work);
- tezWork.disconnect(parentWork, work);
- tezWork.connect(parentWork, mergeJoinWork, edgeProp);
- }
- work = mergeJoinWork;
- } else {
- // need to add this branch to the key + value info
- assert operator instanceof ReduceSinkOperator
- && ((followingWork instanceof ReduceWork) || (followingWork instanceof MergeJoinWork)
- || followingWork instanceof UnionWork);
- ReduceSinkOperator rs = (ReduceSinkOperator) operator;
- ReduceWork rWork = null;
- if (followingWork instanceof MergeJoinWork) {
- MergeJoinWork mergeJoinWork = (MergeJoinWork) followingWork;
- rWork = (ReduceWork) mergeJoinWork.getMainWork();
- } else if (followingWork instanceof UnionWork) {
- // this can only be possible if there is merge work followed by the union
- UnionWork unionWork = (UnionWork) followingWork;
- int index = getMergeIndex(tezWork, unionWork, rs);
- // guaranteed to be instance of MergeJoinWork if index is valid
- MergeJoinWork mergeJoinWork = (MergeJoinWork) tezWork.getChildren(unionWork).get(index);
- // disconnect the connection to union work and connect to merge work
- followingWork = mergeJoinWork;
- rWork = (ReduceWork) mergeJoinWork.getMainWork();
+ // need to add this branch to the key + value info
+ assert operator instanceof ReduceSinkOperator
+ && followingWork instanceof ReduceWork;
+ ReduceSinkOperator rs = (ReduceSinkOperator) operator;
+ ReduceWork rWork = (ReduceWork) followingWork;
+ GenMapRedUtils.setKeyAndValueDesc(rWork, rs);
+
+ // remember which parent belongs to which tag
+ rWork.getTagToInput().put(rs.getConf().getTag(), work.getName());
+
+ // remember the output name of the reduce sink
+ rs.getConf().setOutputName(rWork.getName());
+
+ if (!context.connectedReduceSinks.contains(rs)) {
+ // add dependency between the two work items
+ TezEdgeProperty edgeProp;
+ if (rWork.isAutoReduceParallelism()) {
+ edgeProp =
+ new TezEdgeProperty(context.conf, EdgeType.SIMPLE_EDGE, true,
+ rWork.getMinReduceTasks(), rWork.getMaxReduceTasks(), bytesPerReducer);
} else {
- rWork = (ReduceWork) followingWork;
- }
- GenMapRedUtils.setKeyAndValueDesc(rWork, rs);
-
- // remember which parent belongs to which tag
- int tag = rs.getConf().getTag();
- rWork.getTagToInput().put(tag == -1 ? 0 : tag, work.getName());
-
- // remember the output name of the reduce sink
- rs.getConf().setOutputName(rWork.getName());
-
- if (!context.connectedReduceSinks.contains(rs)) {
- // add dependency between the two work items
- TezEdgeProperty edgeProp;
- if (rWork.isAutoReduceParallelism()) {
- edgeProp =
- new TezEdgeProperty(context.conf, EdgeType.SIMPLE_EDGE, true,
- rWork.getMinReduceTasks(), rWork.getMaxReduceTasks(), bytesPerReducer);
- } else {
- edgeProp = new TezEdgeProperty(EdgeType.SIMPLE_EDGE);
- }
- tezWork.connect(work, followingWork, edgeProp);
- context.connectedReduceSinks.add(rs);
+ edgeProp = new TezEdgeProperty(EdgeType.SIMPLE_EDGE);
}
+ tezWork.connect(work, rWork, edgeProp);
+ context.connectedReduceSinks.add(rs);
}
} else {
LOG.debug("First pass. Leaf operator: "+operator);
@@ -379,28 +289,4 @@ public class GenTezWork implements NodeP
return null;
}
-
- private int getMergeIndex(TezWork tezWork, UnionWork unionWork, ReduceSinkOperator rs) {
- int index = 0;
- for (BaseWork baseWork : tezWork.getChildren(unionWork)) {
- if (baseWork instanceof MergeJoinWork) {
- MergeJoinWork mergeJoinWork = (MergeJoinWork) baseWork;
- int tag = mergeJoinWork.getMergeJoinOperator().getTagForOperator(rs);
- if (tag != -1) {
- return index;
- } else {
- index++;
- }
- }
- }
-
- return -1;
- }
-
- @SuppressWarnings("unchecked")
- private Operator<? extends OperatorDesc> getParentFromStack(Node currentMergeJoinOperator,
- Stack<Node> stack) {
- int pos = stack.indexOf(currentMergeJoinOperator);
- return (Operator<? extends OperatorDesc>) stack.get(pos - 1);
- }
}
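
Net effect of the GenTezWork.java hunks above: the CommonMergeJoinOperator / MergeJoinWork wiring is removed on this branch, the tree-cut loop that disconnects a root from its parents is hoisted ahead of the union handling (and simplified, since the removeParents flag was redundant), and the second pass now assumes every leaf with known downstream work is a ReduceSinkOperator feeding a ReduceWork directly. A minimal sketch of the connection pattern the simplified second pass performs; the names (work, rWork, rs, tezWork, bytesPerReducer, context) are the ones used in the hunk, and this is an illustration, not an excerpt:

    // Connect the upstream work item to its ReduceWork once per reduce sink.
    TezEdgeProperty edgeProp;
    if (rWork.isAutoReduceParallelism()) {
      // Auto reduce parallelism: Tez may pick a reducer count between the
      // configured min and max, sized by bytesPerReducer.
      edgeProp = new TezEdgeProperty(context.conf, EdgeType.SIMPLE_EDGE, true,
          rWork.getMinReduceTasks(), rWork.getMaxReduceTasks(), bytesPerReducer);
    } else {
      edgeProp = new TezEdgeProperty(EdgeType.SIMPLE_EDGE);
    }
    tezWork.connect(work, rWork, edgeProp);
    context.connectedReduceSinks.add(rs);

One behavioral detail worth noting: the removed version normalized a reduce-sink tag of -1 to 0 before recording it in rWork.getTagToInput(), while the retained version stores rs.getConf().getTag() as-is.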
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g?rev=1629562&r1=1629561&r2=1629562&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g Mon Oct 6 03:44:13 2014
@@ -479,9 +479,8 @@ import java.util.HashMap;
xlateMap.put("KW_SUBQUERY", "SUBQUERY");
xlateMap.put("KW_REWRITE", "REWRITE");
xlateMap.put("KW_UPDATE", "UPDATE");
- xlateMap.put("KW_VALUES", "VALUES");
- xlateMap.put("KW_PURGE", "PURGE");
+ xlateMap.put("KW_VALUES", "VALUES");
// Operators
xlateMap.put("DOT", ".");
@@ -930,7 +929,7 @@ dropIndexStatement
dropTableStatement
@init { pushMsg("drop statement", state); }
@after { popMsg(state); }
- : KW_DROP KW_TABLE ifExists? tableName KW_PURGE? -> ^(TOK_DROPTABLE tableName ifExists? KW_PURGE?)
+ : KW_DROP KW_TABLE ifExists? tableName -> ^(TOK_DROPTABLE tableName ifExists?)
;
alterStatement
@@ -946,6 +945,8 @@ alterTableStatementSuffix
@init { pushMsg("alter table statement", state); }
@after { popMsg(state); }
: alterStatementSuffixRename[true]
+ | alterStatementSuffixAddCol
+ | alterStatementSuffixRenameCol
| alterStatementSuffixUpdateStatsCol
| alterStatementSuffixDropPartitions[true]
| alterStatementSuffixAddPartitions[true]
@@ -973,8 +974,6 @@ alterTblPartitionStatementSuffix
| alterStatementSuffixClusterbySortby
| alterStatementSuffixCompact
| alterStatementSuffixUpdateStatsCol
- | alterStatementSuffixRenameCol
- | alterStatementSuffixAddCol
;
alterStatementPartitionKeyType
@@ -2238,7 +2237,7 @@ deleteStatement
/*SET <columName> = (3 + col2)*/
columnAssignmentClause
:
- tableOrColumn EQUAL^ precedencePlusExpression
+ tableOrColumn EQUAL^ atomExpression
;
/*SET col1 = 5, col2 = (4 + col4), ...*/
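
The HiveParser.g hunks make three grammar changes on this branch: KW_PURGE is dropped (from both the keyword translation map and dropTableStatement, so DROP TABLE ... PURGE no longer parses), the add-column and rename-column suffixes move from the per-partition rule alterTblPartitionStatementSuffix back up to alterTableStatementSuffix, and the right-hand side of an UPDATE ... SET assignment narrows from precedencePlusExpression to atomExpression, so a bare arithmetic right-hand side would presumably need to be parenthesized, matching the (3 + col2) form in the comment above. A quick way to check such changes is to feed statements through ParseDriver; a minimal sketch, assuming the rebuilt parser is on the classpath (the statements are illustrative):

    import org.apache.hadoop.hive.ql.parse.ASTNode;
    import org.apache.hadoop.hive.ql.parse.ParseDriver;
    import org.apache.hadoop.hive.ql.parse.ParseException;

    public class GrammarCheck {
      public static void main(String[] args) throws ParseException {
        ParseDriver pd = new ParseDriver();
        // Still parses after the change.
        ASTNode ast = pd.parse("DROP TABLE IF EXISTS t");
        System.out.println(ast.dump());
        // "DROP TABLE t PURGE" would now throw ParseException instead.
      }
    }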
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseDriver.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseDriver.java?rev=1629562&r1=1629561&r2=1629562&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseDriver.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseDriver.java Mon Oct 6 03:44:13 2014
@@ -131,7 +131,7 @@ public class ParseDriver {
* so that the graph walking algorithms and the rules framework defined in
* ql.lib can be used with the AST Nodes.
*/
- public static final TreeAdaptor adaptor = new CommonTreeAdaptor() {
+ static final TreeAdaptor adaptor = new CommonTreeAdaptor() {
/**
* Creates an ASTNode for the given token. The ASTNode is a wrapper around
* antlr's CommonTree class that implements the Node interface.
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseUtils.java?rev=1629562&r1=1629561&r2=1629562&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseUtils.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseUtils.java Mon Oct 6 03:44:13 2014
@@ -111,7 +111,7 @@ public final class ParseUtils {
* @param tableFieldTypeInfo TypeInfo to convert to
* @return Expression converting column to the type specified by tableFieldTypeInfo
*/
- public static ExprNodeDesc createConversionCast(ExprNodeDesc column, PrimitiveTypeInfo tableFieldTypeInfo)
+ static ExprNodeDesc createConversionCast(ExprNodeDesc column, PrimitiveTypeInfo tableFieldTypeInfo)
throws SemanticException {
// Get base type, since type string may be parameterized
String baseType = TypeInfoUtils.getBaseName(tableFieldTypeInfo.getTypeName());
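
The ParseDriver.java and ParseUtils.java hunks are both visibility tightenings: ParseDriver.adaptor and ParseUtils.createConversionCast drop their public modifier and become package-private, reachable only from classes in org.apache.hadoop.hive.ql.parse. A sketch of what stops compiling (Caller and its package are hypothetical):

    package some.other.pkg;

    import org.apache.hadoop.hive.ql.parse.ParseDriver;

    class Caller {
      // Fails to compile after this change: 'adaptor' now has default
      // (package-private) access in ParseDriver.
      Object adaptor = ParseDriver.adaptor;
    }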
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/RowResolver.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/RowResolver.java?rev=1629562&r1=1629561&r2=1629562&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/RowResolver.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/RowResolver.java Mon Oct 6 03:44:13 2014
@@ -49,7 +49,7 @@ public class RowResolver implements Seri
* The primary(first) mapping is still only held in
* invRslvMap.
*/
- private final Map<String, String[]> altInvRslvMap;
+ private Map<String, String[]> altInvRslvMap;
private Map<String, ASTNode> expressionMap;
// TODO: Refactor this and do in a more object oriented manner
@@ -351,73 +351,4 @@ public class RowResolver implements Seri
this.expressionMap = expressionMap;
}
-
- // TODO: 1) How to handle collisions? 2) Should we be cloning ColumnInfo or
- // not?
- public static int add(RowResolver rrToAddTo, RowResolver rrToAddFrom,
- int outputColPos, int numColumns) throws SemanticException {
- String tabAlias;
- String colAlias;
- String[] qualifiedColName;
- int i = 0;
-
- for (ColumnInfo cInfoFrmInput : rrToAddFrom.getRowSchema().getSignature()) {
- if ( numColumns >= 0 && i == numColumns ) {
- break;
- }
- ColumnInfo newCI = null;
- qualifiedColName = rrToAddFrom.getInvRslvMap().get(
- cInfoFrmInput.getInternalName());
- tabAlias = qualifiedColName[0];
- colAlias = qualifiedColName[1];
-
- newCI = new ColumnInfo(cInfoFrmInput);
- newCI.setInternalName(SemanticAnalyzer
- .getColumnInternalName(outputColPos));
-
- outputColPos++;
-
- if (rrToAddTo.get(tabAlias, colAlias) != null) {
- LOG.debug("Found duplicate column alias in RR: " + rrToAddTo.get(tabAlias, colAlias));
- } else {
- rrToAddTo.put(tabAlias, colAlias, newCI);
- }
-
- qualifiedColName = rrToAddFrom.getAlternateMappings(cInfoFrmInput
- .getInternalName());
- if (qualifiedColName != null) {
- tabAlias = qualifiedColName[0];
- colAlias = qualifiedColName[1];
- rrToAddTo.put(tabAlias, colAlias, newCI);
- }
- i++;
- }
-
- return outputColPos;
- }
-
- public static int add(RowResolver rrToAddTo, RowResolver rrToAddFrom,
- int outputColPos) throws SemanticException {
- return add(rrToAddTo, rrToAddFrom, outputColPos, -1);
- }
-
- /**
- * Return a new row resolver that is combination of left RR and right RR.
- * The schema will be schema of left, schema of right
- *
- * @param leftRR
- * @param rightRR
- * @return
- * @throws SemanticException
- */
- public static RowResolver getCombinedRR(RowResolver leftRR,
- RowResolver rightRR) throws SemanticException {
- int outputColPos = 0;
-
- RowResolver combinedRR = new RowResolver();
- outputColPos = add(combinedRR, leftRR, outputColPos);
- outputColPos = add(combinedRR, rightRR, outputColPos);
-
- return combinedRR;
- }
}
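
The RowResolver.java hunks drop the final modifier on altInvRslvMap and remove three static helpers: both add(...) overloads and getCombinedRR(...), which built a combined resolver by appending the right resolver's columns after the left's. For reference, the removed API was used roughly like this (leftRR and rightRR stand in for real resolvers):

    // Pre-change usage sketch; RowResolver.getCombinedRR no longer exists on
    // this branch after the hunk above.
    RowResolver combined = RowResolver.getCombinedRR(leftRR, rightRR);
    // Internally it renumbered internal column names via
    // SemanticAnalyzer.getColumnInternalName(outputColPos), carried over any
    // alternate tab/col mappings, and logged (rather than failing) when it
    // hit a duplicate column alias in the target resolver.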