Posted to commits@hive.apache.org by br...@apache.org on 2014/10/06 00:26:58 UTC
svn commit: r1629544 [11/33] - in /hive/branches/spark-new: ./
accumulo-handler/ beeline/ beeline/src/java/org/apache/hive/beeline/ bin/
bin/ext/ common/ common/src/java/org/apache/hadoop/hive/conf/
common/src/test/org/apache/hadoop/hive/common/type/ c...
Modified: hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java?rev=1629544&r1=1629543&r2=1629544&view=diff
==============================================================================
--- hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java (original)
+++ hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java Sun Oct 5 22:26:43 2014
@@ -167,7 +167,8 @@ public class GenTezUtils {
GenMapRedUtils.setKeyAndValueDesc(reduceWork, reduceSink);
// remember which parent belongs to which tag
- reduceWork.getTagToInput().put(reduceSink.getConf().getTag(),
+ int tag = reduceSink.getConf().getTag();
+ reduceWork.getTagToInput().put(tag == -1 ? 0 : tag,
context.preceedingWork.getName());
// remember the output name of the reduce sink
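
The tag normalization above appears to cover the single-input case: a ReduceSinkOperator whose tag was never assigned keeps the default of -1, which would otherwise end up as the key in tagToInput. A minimal sketch of the intent, with a hypothetical helper name (not part of the patch):

    // Minimal sketch, assuming -1 is the default tag of a ReduceSinkOperator
    // that has a single downstream input; normalizeTag is a made-up name.
    static int normalizeTag(int tag) {
      return tag == -1 ? 0 : tag;
    }

    // Mirrors the change above:
    //   reduceWork.getTagToInput().put(normalizeTag(reduceSink.getConf().getTag()),
    //       context.preceedingWork.getName());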
Modified: hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java
URL: http://svn.apache.org/viewvc/hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java?rev=1629544&r1=1629543&r2=1629544&view=diff
==============================================================================
--- hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java (original)
+++ hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java Sun Oct 5 22:26:43 2014
@@ -28,6 +28,8 @@ import java.util.Stack;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.CommonMergeJoinOperator;
+import org.apache.hadoop.hive.ql.exec.DummyStoreOperator;
import org.apache.hadoop.hive.ql.exec.HashTableDummyOperator;
import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
import org.apache.hadoop.hive.ql.exec.Operator;
@@ -38,11 +40,14 @@ import org.apache.hadoop.hive.ql.lib.Nod
import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
import org.apache.hadoop.hive.ql.optimizer.GenMapRedUtils;
import org.apache.hadoop.hive.ql.plan.BaseWork;
+import org.apache.hadoop.hive.ql.plan.MergeJoinWork;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
import org.apache.hadoop.hive.ql.plan.ReduceWork;
import org.apache.hadoop.hive.ql.plan.TezEdgeProperty;
import org.apache.hadoop.hive.ql.plan.TezEdgeProperty.EdgeType;
import org.apache.hadoop.hive.ql.plan.TezWork;
+import org.apache.hadoop.hive.ql.plan.TezWork.VertexType;
import org.apache.hadoop.hive.ql.plan.UnionWork;
/**
@@ -126,6 +131,48 @@ public class GenTezWork implements NodeP
context.childToWorkMap.get(operator).add(work);
}
+ // this transformation needs to come first because it changes the work item itself,
+ // which can affect all downstream transformations.
+ if (context.currentMergeJoinOperator != null) {
+ // We are currently walking the big table side of the merge join; we need to create
+ // or hook up the merge join work.
+ MergeJoinWork mergeJoinWork = null;
+ if (context.opMergeJoinWorkMap.containsKey(operator)) {
+ // we have found a merge work corresponding to this closing operator. Hook up this work.
+ mergeJoinWork = context.opMergeJoinWorkMap.get(operator);
+ } else {
+ // we need to create the merge join work
+ mergeJoinWork = new MergeJoinWork();
+ mergeJoinWork.setMergeJoinOperator(context.currentMergeJoinOperator);
+ tezWork.add(mergeJoinWork);
+ context.opMergeJoinWorkMap.put(operator, mergeJoinWork);
+ }
+ // connect the work correctly.
+ mergeJoinWork.addMergedWork(work, null);
+ Operator<? extends OperatorDesc> parentOp =
+ getParentFromStack(context.currentMergeJoinOperator, stack);
+ int pos = context.currentMergeJoinOperator.getTagForOperator(parentOp);
+ work.setTag(pos);
+ tezWork.setVertexType(work, VertexType.MULTI_INPUT_UNINITIALIZED_EDGES);
+ for (BaseWork parentWork : tezWork.getParents(work)) {
+ TezEdgeProperty edgeProp = tezWork.getEdgeProperty(parentWork, work);
+ tezWork.disconnect(parentWork, work);
+ tezWork.connect(parentWork, mergeJoinWork, edgeProp);
+ }
+
+ for (BaseWork childWork : tezWork.getChildren(work)) {
+ TezEdgeProperty edgeProp = tezWork.getEdgeProperty(work, childWork);
+ tezWork.disconnect(work, childWork);
+ tezWork.connect(mergeJoinWork, childWork, edgeProp);
+ }
+ tezWork.remove(work);
+ context.rootToWorkMap.put(root, mergeJoinWork);
+ context.childToWorkMap.get(operator).remove(work);
+ context.childToWorkMap.get(operator).add(mergeJoinWork);
+ work = mergeJoinWork;
+ context.currentMergeJoinOperator = null;
+ }
+
// remember which mapjoin operator links with which work
if (!context.currentMapJoinOperators.isEmpty()) {
for (MapJoinOperator mj: context.currentMapJoinOperators) {
@@ -169,6 +216,9 @@ public class GenTezWork implements NodeP
LOG.debug("connecting "+parentWork.getName()+" with "+work.getName());
TezEdgeProperty edgeProp = parentWorkMap.getValue();
tezWork.connect(parentWork, work, edgeProp);
+ if (edgeProp.getEdgeType() == EdgeType.CUSTOM_EDGE) {
+ tezWork.setVertexType(work, VertexType.INITIALIZED_EDGES);
+ }
// need to set up output name for reduce sink now that we know the name
// of the downstream work
@@ -192,14 +242,6 @@ public class GenTezWork implements NodeP
context.currentMapJoinOperators.clear();
}
- // This is where we cut the tree as described above. We also remember that
- // we might have to connect parent work with this work later.
- for (Operator<?> parent: new ArrayList<Operator<?>>(root.getParentOperators())) {
- context.leafOperatorToFollowingWork.put(parent, work);
- LOG.debug("Removing " + parent + " as parent from " + root);
- root.removeParent(parent);
- }
-
if (!context.currentUnionOperators.isEmpty()) {
// if there are union all operators we need to add the work to the set
// of union operators.
@@ -229,6 +271,21 @@ public class GenTezWork implements NodeP
work = unionWork;
}
+
+ // This is where we cut the tree as described above. We also remember that
+ // we might have to connect parent work with this work later.
+ boolean removeParents = false;
+ for (Operator<?> parent: new ArrayList<Operator<?>>(root.getParentOperators())) {
+ removeParents = true;
+ context.leafOperatorToFollowingWork.put(parent, work);
+ LOG.debug("Removing " + parent + " as parent from " + root);
+ }
+ if (removeParents) {
+ for (Operator<?> parent : new ArrayList<Operator<?>>(root.getParentOperators())) {
+ root.removeParent(parent);
+ }
+ }
+
// We're scanning a tree from roots to leaf (this is not technically
// correct, demux and mux operators might form a diamond shape, but
// we will only scan one path and ignore the others, because the
@@ -248,31 +305,64 @@ public class GenTezWork implements NodeP
LOG.debug("Second pass. Leaf operator: "+operator
+" has common downstream work:"+followingWork);
- // need to add this branch to the key + value info
- assert operator instanceof ReduceSinkOperator
- && followingWork instanceof ReduceWork;
- ReduceSinkOperator rs = (ReduceSinkOperator) operator;
- ReduceWork rWork = (ReduceWork) followingWork;
- GenMapRedUtils.setKeyAndValueDesc(rWork, rs);
-
- // remember which parent belongs to which tag
- rWork.getTagToInput().put(rs.getConf().getTag(), work.getName());
-
- // remember the output name of the reduce sink
- rs.getConf().setOutputName(rWork.getName());
-
- if (!context.connectedReduceSinks.contains(rs)) {
- // add dependency between the two work items
- TezEdgeProperty edgeProp;
- if (rWork.isAutoReduceParallelism()) {
- edgeProp =
- new TezEdgeProperty(context.conf, EdgeType.SIMPLE_EDGE, true,
- rWork.getMinReduceTasks(), rWork.getMaxReduceTasks(), bytesPerReducer);
+ if (operator instanceof DummyStoreOperator) {
+ // this is the small table side.
+ assert (followingWork instanceof MergeJoinWork);
+ MergeJoinWork mergeJoinWork = (MergeJoinWork) followingWork;
+ CommonMergeJoinOperator mergeJoinOp = mergeJoinWork.getMergeJoinOperator();
+ work.setTag(mergeJoinOp.getTagForOperator(operator));
+ mergeJoinWork.addMergedWork(null, work);
+ tezWork.setVertexType(mergeJoinWork, VertexType.MULTI_INPUT_UNINITIALIZED_EDGES);
+ for (BaseWork parentWork : tezWork.getParents(work)) {
+ TezEdgeProperty edgeProp = tezWork.getEdgeProperty(parentWork, work);
+ tezWork.disconnect(parentWork, work);
+ tezWork.connect(parentWork, mergeJoinWork, edgeProp);
+ }
+ work = mergeJoinWork;
+ } else {
+ // need to add this branch to the key + value info
+ assert operator instanceof ReduceSinkOperator
+ && ((followingWork instanceof ReduceWork) || (followingWork instanceof MergeJoinWork)
+ || followingWork instanceof UnionWork);
+ ReduceSinkOperator rs = (ReduceSinkOperator) operator;
+ ReduceWork rWork = null;
+ if (followingWork instanceof MergeJoinWork) {
+ MergeJoinWork mergeJoinWork = (MergeJoinWork) followingWork;
+ rWork = (ReduceWork) mergeJoinWork.getMainWork();
+ } else if (followingWork instanceof UnionWork) {
+ // this is only possible when the union is followed by a merge join work
+ UnionWork unionWork = (UnionWork) followingWork;
+ int index = getMergeIndex(tezWork, unionWork, rs);
+ // guaranteed to be instance of MergeJoinWork if index is valid
+ MergeJoinWork mergeJoinWork = (MergeJoinWork) tezWork.getChildren(unionWork).get(index);
+ // disconnect the connection to union work and connect to merge work
+ followingWork = mergeJoinWork;
+ rWork = (ReduceWork) mergeJoinWork.getMainWork();
} else {
- edgeProp = new TezEdgeProperty(EdgeType.SIMPLE_EDGE);
+ rWork = (ReduceWork) followingWork;
+ }
+ GenMapRedUtils.setKeyAndValueDesc(rWork, rs);
+
+ // remember which parent belongs to which tag
+ int tag = rs.getConf().getTag();
+ rWork.getTagToInput().put(tag == -1 ? 0 : tag, work.getName());
+
+ // remember the output name of the reduce sink
+ rs.getConf().setOutputName(rWork.getName());
+
+ if (!context.connectedReduceSinks.contains(rs)) {
+ // add dependency between the two work items
+ TezEdgeProperty edgeProp;
+ if (rWork.isAutoReduceParallelism()) {
+ edgeProp =
+ new TezEdgeProperty(context.conf, EdgeType.SIMPLE_EDGE, true,
+ rWork.getMinReduceTasks(), rWork.getMaxReduceTasks(), bytesPerReducer);
+ } else {
+ edgeProp = new TezEdgeProperty(EdgeType.SIMPLE_EDGE);
+ }
+ tezWork.connect(work, followingWork, edgeProp);
+ context.connectedReduceSinks.add(rs);
}
- tezWork.connect(work, rWork, edgeProp);
- context.connectedReduceSinks.add(rs);
}
} else {
LOG.debug("First pass. Leaf operator: "+operator);
@@ -289,4 +379,28 @@ public class GenTezWork implements NodeP
return null;
}
+
+ private int getMergeIndex(TezWork tezWork, UnionWork unionWork, ReduceSinkOperator rs) {
+ int index = 0;
+ for (BaseWork baseWork : tezWork.getChildren(unionWork)) {
+ if (baseWork instanceof MergeJoinWork) {
+ MergeJoinWork mergeJoinWork = (MergeJoinWork) baseWork;
+ int tag = mergeJoinWork.getMergeJoinOperator().getTagForOperator(rs);
+ if (tag != -1) {
+ return index;
+ } else {
+ index++;
+ }
+ }
+ }
+
+ return -1;
+ }
+
+ @SuppressWarnings("unchecked")
+ private Operator<? extends OperatorDesc> getParentFromStack(Node currentMergeJoinOperator,
+ Stack<Node> stack) {
+ int pos = stack.indexOf(currentMergeJoinOperator);
+ return (Operator<? extends OperatorDesc>) stack.get(pos - 1);
+ }
}
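
Both new code paths above (the big-table branch at the top of the method and the DummyStoreOperator branch in the second pass) share the same rewiring step: edges that touched the original work item are re-pointed at the MergeJoinWork. A sketch of that pattern, using TezWork's graph API as it appears in the patch; the helper class and method are hypothetical:

    import org.apache.hadoop.hive.ql.plan.BaseWork;
    import org.apache.hadoop.hive.ql.plan.TezEdgeProperty;
    import org.apache.hadoop.hive.ql.plan.TezWork;

    final class EdgeRewireSketch {
      // Move every incoming and outgoing edge of oldWork onto newWork and
      // drop oldWork from the DAG, as the big-table branch does above.
      static void moveEdges(TezWork tezWork, BaseWork oldWork, BaseWork newWork) {
        for (BaseWork parent : tezWork.getParents(oldWork)) {
          TezEdgeProperty edge = tezWork.getEdgeProperty(parent, oldWork);
          tezWork.disconnect(parent, oldWork);
          tezWork.connect(parent, newWork, edge);
        }
        for (BaseWork child : tezWork.getChildren(oldWork)) {
          TezEdgeProperty edge = tezWork.getEdgeProperty(oldWork, child);
          tezWork.disconnect(oldWork, child);
          tezWork.connect(newWork, child, edge);
        }
        tezWork.remove(oldWork);
      }
    }

The small-table branch above performs only the parent half of this rewiring.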
Modified: hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g
URL: http://svn.apache.org/viewvc/hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g?rev=1629544&r1=1629543&r2=1629544&view=diff
==============================================================================
--- hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g (original)
+++ hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g Sun Oct 5 22:26:43 2014
@@ -479,8 +479,9 @@ import java.util.HashMap;
xlateMap.put("KW_SUBQUERY", "SUBQUERY");
xlateMap.put("KW_REWRITE", "REWRITE");
xlateMap.put("KW_UPDATE", "UPDATE");
-
xlateMap.put("KW_VALUES", "VALUES");
+ xlateMap.put("KW_PURGE", "PURGE");
+
// Operators
xlateMap.put("DOT", ".");
@@ -929,7 +930,7 @@ dropIndexStatement
dropTableStatement
@init { pushMsg("drop statement", state); }
@after { popMsg(state); }
- : KW_DROP KW_TABLE ifExists? tableName -> ^(TOK_DROPTABLE tableName ifExists?)
+ : KW_DROP KW_TABLE ifExists? tableName KW_PURGE? -> ^(TOK_DROPTABLE tableName ifExists? KW_PURGE?)
;
alterStatement
@@ -945,8 +946,6 @@ alterTableStatementSuffix
@init { pushMsg("alter table statement", state); }
@after { popMsg(state); }
: alterStatementSuffixRename[true]
- | alterStatementSuffixAddCol
- | alterStatementSuffixRenameCol
| alterStatementSuffixUpdateStatsCol
| alterStatementSuffixDropPartitions[true]
| alterStatementSuffixAddPartitions[true]
@@ -974,6 +973,8 @@ alterTblPartitionStatementSuffix
| alterStatementSuffixClusterbySortby
| alterStatementSuffixCompact
| alterStatementSuffixUpdateStatsCol
+ | alterStatementSuffixRenameCol
+ | alterStatementSuffixAddCol
;
alterStatementPartitionKeyType
@@ -2237,7 +2238,7 @@ deleteStatement
/*SET <columName> = (3 + col2)*/
columnAssignmentClause
:
- tableOrColumn EQUAL^ atomExpression
+ tableOrColumn EQUAL^ precedencePlusExpression
;
/*SET col1 = 5, col2 = (4 + col4), ...*/
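
The grammar change to dropTableStatement makes the optional PURGE keyword part of DROP TABLE, carried through as a KW_PURGE child of TOK_DROPTABLE. A quick way to see the resulting AST, assuming ParseDriver's parse(String) entry point and ASTNode.dump(); the table name is made up:

    import org.apache.hadoop.hive.ql.parse.ASTNode;
    import org.apache.hadoop.hive.ql.parse.ParseDriver;

    public class DropPurgeParseSketch {
      public static void main(String[] args) throws Exception {
        ParseDriver pd = new ParseDriver();
        // PURGE is now accepted after the table name; without it the
        // statement parses exactly as before.
        ASTNode ast = pd.parse("DROP TABLE IF EXISTS tmp_events PURGE");
        System.out.println(ast.dump());
      }
    }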
Modified: hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseDriver.java
URL: http://svn.apache.org/viewvc/hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseDriver.java?rev=1629544&r1=1629543&r2=1629544&view=diff
==============================================================================
--- hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseDriver.java (original)
+++ hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseDriver.java Sun Oct 5 22:26:43 2014
@@ -131,7 +131,7 @@ public class ParseDriver {
* so that the graph walking algorithms and the rules framework defined in
* ql.lib can be used with the AST Nodes.
*/
- static final TreeAdaptor adaptor = new CommonTreeAdaptor() {
+ public static final TreeAdaptor adaptor = new CommonTreeAdaptor() {
/**
* Creates an ASTNode for the given token. The ASTNode is a wrapper around
* antlr's CommonTree class that implements the Node interface.
Modified: hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseUtils.java?rev=1629544&r1=1629543&r2=1629544&view=diff
==============================================================================
--- hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseUtils.java (original)
+++ hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseUtils.java Sun Oct 5 22:26:43 2014
@@ -111,7 +111,7 @@ public final class ParseUtils {
* @param tableFieldTypeInfo TypeInfo to convert to
* @return Expression converting column to the type specified by tableFieldTypeInfo
*/
- static ExprNodeDesc createConversionCast(ExprNodeDesc column, PrimitiveTypeInfo tableFieldTypeInfo)
+ public static ExprNodeDesc createConversionCast(ExprNodeDesc column, PrimitiveTypeInfo tableFieldTypeInfo)
throws SemanticException {
// Get base type, since type string may be parameterized
String baseType = TypeInfoUtils.getBaseName(tableFieldTypeInfo.getTypeName());
Modified: hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/parse/QBMetaData.java
URL: http://svn.apache.org/viewvc/hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/parse/QBMetaData.java?rev=1629544&r1=1629543&r2=1629544&view=diff
==============================================================================
--- hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/parse/QBMetaData.java (original)
+++ hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/parse/QBMetaData.java Sun Oct 5 22:26:43 2014
@@ -104,10 +104,18 @@ public class QBMetaData {
return nameToDestTable.get(alias.toLowerCase());
}
+ public Map<String, Table> getNameToDestTable() {
+ return nameToDestTable;
+ }
+
public Partition getDestPartitionForAlias(String alias) {
return nameToDestPartition.get(alias.toLowerCase());
}
+ public Map<String, Partition> getNameToDestPartition() {
+ return nameToDestPartition;
+ }
+
public String getDestFileForAlias(String alias) {
return nameToDestFile.get(alias.toLowerCase());
}
Modified: hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/parse/RowResolver.java
URL: http://svn.apache.org/viewvc/hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/parse/RowResolver.java?rev=1629544&r1=1629543&r2=1629544&view=diff
==============================================================================
--- hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/parse/RowResolver.java (original)
+++ hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/parse/RowResolver.java Sun Oct 5 22:26:43 2014
@@ -49,7 +49,7 @@ public class RowResolver implements Seri
* The primary(first) mapping is still only held in
* invRslvMap.
*/
- private Map<String, String[]> altInvRslvMap;
+ private final Map<String, String[]> altInvRslvMap;
private Map<String, ASTNode> expressionMap;
// TODO: Refactor this and do in a more object oriented manner
@@ -351,4 +351,73 @@ public class RowResolver implements Seri
this.expressionMap = expressionMap;
}
+
+ // TODO: 1) How to handle collisions? 2) Should we be cloning ColumnInfo or
+ // not?
+ public static int add(RowResolver rrToAddTo, RowResolver rrToAddFrom,
+ int outputColPos, int numColumns) throws SemanticException {
+ String tabAlias;
+ String colAlias;
+ String[] qualifiedColName;
+ int i = 0;
+
+ for (ColumnInfo cInfoFrmInput : rrToAddFrom.getRowSchema().getSignature()) {
+ if ( numColumns >= 0 && i == numColumns ) {
+ break;
+ }
+ ColumnInfo newCI = null;
+ qualifiedColName = rrToAddFrom.getInvRslvMap().get(
+ cInfoFrmInput.getInternalName());
+ tabAlias = qualifiedColName[0];
+ colAlias = qualifiedColName[1];
+
+ newCI = new ColumnInfo(cInfoFrmInput);
+ newCI.setInternalName(SemanticAnalyzer
+ .getColumnInternalName(outputColPos));
+
+ outputColPos++;
+
+ if (rrToAddTo.get(tabAlias, colAlias) != null) {
+ LOG.debug("Found duplicate column alias in RR: " + rrToAddTo.get(tabAlias, colAlias));
+ } else {
+ rrToAddTo.put(tabAlias, colAlias, newCI);
+ }
+
+ qualifiedColName = rrToAddFrom.getAlternateMappings(cInfoFrmInput
+ .getInternalName());
+ if (qualifiedColName != null) {
+ tabAlias = qualifiedColName[0];
+ colAlias = qualifiedColName[1];
+ rrToAddTo.put(tabAlias, colAlias, newCI);
+ }
+ i++;
+ }
+
+ return outputColPos;
+ }
+
+ public static int add(RowResolver rrToAddTo, RowResolver rrToAddFrom,
+ int outputColPos) throws SemanticException {
+ return add(rrToAddTo, rrToAddFrom, outputColPos, -1);
+ }
+
+ /**
+ * Return a new row resolver that is a combination of the left and right RRs.
+ * The resulting schema is the schema of the left followed by the schema of the right.
+ *
+ * @param leftRR
+ * @param rightRR
+ * @return
+ * @throws SemanticException
+ */
+ public static RowResolver getCombinedRR(RowResolver leftRR,
+ RowResolver rightRR) throws SemanticException {
+ int outputColPos = 0;
+
+ RowResolver combinedRR = new RowResolver();
+ outputColPos = add(combinedRR, leftRR, outputColPos);
+ outputColPos = add(combinedRR, rightRR, outputColPos);
+
+ return combinedRR;
+ }
}
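
A short usage sketch for the new static helpers, with made-up aliases and column names; it assumes the existing RowResolver.put and ColumnInfo APIs:

    import org.apache.hadoop.hive.ql.exec.ColumnInfo;
    import org.apache.hadoop.hive.ql.parse.RowResolver;
    import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;

    public class CombinedRRSketch {
      public static void main(String[] args) throws Exception {
        RowResolver left = new RowResolver();
        left.put("a", "id", new ColumnInfo("_col0", TypeInfoFactory.longTypeInfo, "a", false));

        RowResolver right = new RowResolver();
        right.put("b", "name", new ColumnInfo("_col0", TypeInfoFactory.stringTypeInfo, "b", false));

        // The combined resolver lists the left columns first, then the right;
        // internal names are renumbered (_col0, _col1, ...) and a (tabAlias,
        // colAlias) pair already present on the left is logged and skipped.
        RowResolver combined = RowResolver.getCombinedRR(left, right);
        System.out.println(combined.getRowSchema().getSignature());
      }
    }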