Posted to commits@hive.apache.org by br...@apache.org on 2014/10/06 00:26:58 UTC

svn commit: r1629544 [11/33] - in /hive/branches/spark-new: ./ accumulo-handler/ beeline/ beeline/src/java/org/apache/hive/beeline/ bin/ bin/ext/ common/ common/src/java/org/apache/hadoop/hive/conf/ common/src/test/org/apache/hadoop/hive/common/type/ c...

Modified: hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java?rev=1629544&r1=1629543&r2=1629544&view=diff
==============================================================================
--- hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java (original)
+++ hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezUtils.java Sun Oct  5 22:26:43 2014
@@ -167,7 +167,8 @@ public class GenTezUtils {
     GenMapRedUtils.setKeyAndValueDesc(reduceWork, reduceSink);
 
     // remember which parent belongs to which tag
-    reduceWork.getTagToInput().put(reduceSink.getConf().getTag(),
+    int tag = reduceSink.getConf().getTag();
+    reduceWork.getTagToInput().put(tag == -1 ? 0 : tag,
          context.preceedingWork.getName());
 
     // remember the output name of the reduce sink
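
Note on the hunk above: the one-line change normalizes the reduce sink's tag before it is used as a key. A ReduceSinkOperator that is not feeding a tagged (join) input carries the default tag -1, and mapping it to 0 keeps tagToInput recording the preceding work under a valid input position. A minimal, self-contained sketch of the same normalization, using a plain map in place of Hive's ReduceWork (the names below are illustrative stand-ins, not the actual API):

    // Illustrative sketch only: stands in for the ReduceWork.getTagToInput() bookkeeping.
    import java.util.HashMap;
    import java.util.Map;

    public class TagNormalizationSketch {
      public static void main(String[] args) {
        Map<Integer, String> tagToInput = new HashMap<Integer, String>();

        // A reduce sink that is not part of a join reports tag -1; map it to 0 so the
        // reducer still finds its single input under a valid, non-negative tag.
        int tag = -1;                    // what rs.getConf().getTag() can return
        String precedingWork = "Map 1";  // stand-in for context.preceedingWork.getName()

        tagToInput.put(tag == -1 ? 0 : tag, precedingWork);

        System.out.println(tagToInput);  // prints {0=Map 1}
      }
    }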

Modified: hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java
URL: http://svn.apache.org/viewvc/hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java?rev=1629544&r1=1629543&r2=1629544&view=diff
==============================================================================
--- hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java (original)
+++ hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java Sun Oct  5 22:26:43 2014
@@ -28,6 +28,8 @@ import java.util.Stack;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.CommonMergeJoinOperator;
+import org.apache.hadoop.hive.ql.exec.DummyStoreOperator;
 import org.apache.hadoop.hive.ql.exec.HashTableDummyOperator;
 import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
@@ -38,11 +40,14 @@ import org.apache.hadoop.hive.ql.lib.Nod
 import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
 import org.apache.hadoop.hive.ql.optimizer.GenMapRedUtils;
 import org.apache.hadoop.hive.ql.plan.BaseWork;
+import org.apache.hadoop.hive.ql.plan.MergeJoinWork;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
 import org.apache.hadoop.hive.ql.plan.ReduceWork;
 import org.apache.hadoop.hive.ql.plan.TezEdgeProperty;
 import org.apache.hadoop.hive.ql.plan.TezEdgeProperty.EdgeType;
 import org.apache.hadoop.hive.ql.plan.TezWork;
+import org.apache.hadoop.hive.ql.plan.TezWork.VertexType;
 import org.apache.hadoop.hive.ql.plan.UnionWork;
 
 /**
@@ -126,6 +131,48 @@ public class GenTezWork implements NodeP
       context.childToWorkMap.get(operator).add(work);
     }
 
+    // this transformation needs to happen first because it changes the work item itself,
+    // which can affect the working of all downstream transformations.
+    if (context.currentMergeJoinOperator != null) {
+      // we are currently walking the big table side of the merge join. we need to create or hook up
+      // merge join work.
+      MergeJoinWork mergeJoinWork = null;
+      if (context.opMergeJoinWorkMap.containsKey(operator)) {
+        // we have found a merge work corresponding to this closing operator. Hook up this work.
+        mergeJoinWork = context.opMergeJoinWorkMap.get(operator);
+      } else {
+        // we need to create the merge join work
+        mergeJoinWork = new MergeJoinWork();
+        mergeJoinWork.setMergeJoinOperator(context.currentMergeJoinOperator);
+        tezWork.add(mergeJoinWork);
+        context.opMergeJoinWorkMap.put(operator, mergeJoinWork);
+      }
+      // connect the work correctly.
+      mergeJoinWork.addMergedWork(work, null);
+      Operator<? extends OperatorDesc> parentOp =
+          getParentFromStack(context.currentMergeJoinOperator, stack);
+      int pos = context.currentMergeJoinOperator.getTagForOperator(parentOp);
+      work.setTag(pos);
+      tezWork.setVertexType(work, VertexType.MULTI_INPUT_UNINITIALIZED_EDGES);
+      for (BaseWork parentWork : tezWork.getParents(work)) {
+        TezEdgeProperty edgeProp = tezWork.getEdgeProperty(parentWork, work);
+        tezWork.disconnect(parentWork, work);
+        tezWork.connect(parentWork, mergeJoinWork, edgeProp);
+      }
+
+      for (BaseWork childWork : tezWork.getChildren(work)) {
+        TezEdgeProperty edgeProp = tezWork.getEdgeProperty(work, childWork);
+        tezWork.disconnect(work, childWork);
+        tezWork.connect(mergeJoinWork, childWork, edgeProp);
+      }
+      tezWork.remove(work);
+      context.rootToWorkMap.put(root, mergeJoinWork);
+      context.childToWorkMap.get(operator).remove(work);
+      context.childToWorkMap.get(operator).add(mergeJoinWork);
+      work = mergeJoinWork;
+      context.currentMergeJoinOperator = null;
+    }
+
     // remember which mapjoin operator links with which work
     if (!context.currentMapJoinOperators.isEmpty()) {
       for (MapJoinOperator mj: context.currentMapJoinOperators) {
@@ -169,6 +216,9 @@ public class GenTezWork implements NodeP
               LOG.debug("connecting "+parentWork.getName()+" with "+work.getName());
               TezEdgeProperty edgeProp = parentWorkMap.getValue();
               tezWork.connect(parentWork, work, edgeProp);
+              if (edgeProp.getEdgeType() == EdgeType.CUSTOM_EDGE) {
+                tezWork.setVertexType(work, VertexType.INITIALIZED_EDGES);
+              }
 
               // need to set up output name for reduce sink now that we know the name
               // of the downstream work
@@ -192,14 +242,6 @@ public class GenTezWork implements NodeP
       context.currentMapJoinOperators.clear();
     }
 
-    // This is where we cut the tree as described above. We also remember that
-    // we might have to connect parent work with this work later.
-    for (Operator<?> parent: new ArrayList<Operator<?>>(root.getParentOperators())) {
-      context.leafOperatorToFollowingWork.put(parent, work);
-      LOG.debug("Removing " + parent + " as parent from " + root);
-      root.removeParent(parent);
-    }
-
     if (!context.currentUnionOperators.isEmpty()) {
       // if there are union all operators we need to add the work to the set
       // of union operators.
@@ -229,6 +271,21 @@ public class GenTezWork implements NodeP
       work = unionWork;
     }
 
+
+    // This is where we cut the tree as described above. We also remember that
+    // we might have to connect parent work with this work later.
+    boolean removeParents = false;
+    for (Operator<?> parent: new ArrayList<Operator<?>>(root.getParentOperators())) {
+      removeParents = true;
+      context.leafOperatorToFollowingWork.put(parent, work);
+      LOG.debug("Removing " + parent + " as parent from " + root);
+    }
+    if (removeParents) {
+      for (Operator<?> parent : new ArrayList<Operator<?>>(root.getParentOperators())) {
+        root.removeParent(parent);
+      }
+    }
+
     // We're scanning a tree from roots to leaf (this is not technically
     // correct, demux and mux operators might form a diamond shape, but
     // we will only scan one path and ignore the others, because the
@@ -248,31 +305,64 @@ public class GenTezWork implements NodeP
       LOG.debug("Second pass. Leaf operator: "+operator
         +" has common downstream work:"+followingWork);
 
-      // need to add this branch to the key + value info
-      assert operator instanceof ReduceSinkOperator
-        && followingWork instanceof ReduceWork;
-      ReduceSinkOperator rs = (ReduceSinkOperator) operator;
-      ReduceWork rWork = (ReduceWork) followingWork;
-      GenMapRedUtils.setKeyAndValueDesc(rWork, rs);
-
-      // remember which parent belongs to which tag
-      rWork.getTagToInput().put(rs.getConf().getTag(), work.getName());
-
-      // remember the output name of the reduce sink
-      rs.getConf().setOutputName(rWork.getName());
-
-      if (!context.connectedReduceSinks.contains(rs)) {
-        // add dependency between the two work items
-        TezEdgeProperty edgeProp;
-        if (rWork.isAutoReduceParallelism()) {
-          edgeProp =
-              new TezEdgeProperty(context.conf, EdgeType.SIMPLE_EDGE, true,
-                  rWork.getMinReduceTasks(), rWork.getMaxReduceTasks(), bytesPerReducer);
+      if (operator instanceof DummyStoreOperator) {
+        // this is the small table side.
+        assert (followingWork instanceof MergeJoinWork);
+        MergeJoinWork mergeJoinWork = (MergeJoinWork) followingWork;
+        CommonMergeJoinOperator mergeJoinOp = mergeJoinWork.getMergeJoinOperator();
+        work.setTag(mergeJoinOp.getTagForOperator(operator));
+        mergeJoinWork.addMergedWork(null, work);
+        tezWork.setVertexType(mergeJoinWork, VertexType.MULTI_INPUT_UNINITIALIZED_EDGES);
+        for (BaseWork parentWork : tezWork.getParents(work)) {
+          TezEdgeProperty edgeProp = tezWork.getEdgeProperty(parentWork, work);
+          tezWork.disconnect(parentWork, work);
+          tezWork.connect(parentWork, mergeJoinWork, edgeProp);
+        }
+        work = mergeJoinWork;
+      } else {
+        // need to add this branch to the key + value info
+        assert operator instanceof ReduceSinkOperator
+            && ((followingWork instanceof ReduceWork) || (followingWork instanceof MergeJoinWork)
+                || followingWork instanceof UnionWork);
+        ReduceSinkOperator rs = (ReduceSinkOperator) operator;
+        ReduceWork rWork = null;
+        if (followingWork instanceof MergeJoinWork) {
+          MergeJoinWork mergeJoinWork = (MergeJoinWork) followingWork;
+          rWork = (ReduceWork) mergeJoinWork.getMainWork();
+        } else if (followingWork instanceof UnionWork) {
+          // this can only be possible if there is merge work followed by the union
+          UnionWork unionWork = (UnionWork) followingWork;
+          int index = getMergeIndex(tezWork, unionWork, rs);
+          // guaranteed to be instance of MergeJoinWork if index is valid
+          MergeJoinWork mergeJoinWork = (MergeJoinWork) tezWork.getChildren(unionWork).get(index);
+          // disconnect the connection to union work and connect to merge work
+          followingWork = mergeJoinWork;
+          rWork = (ReduceWork) mergeJoinWork.getMainWork();
         } else {
-          edgeProp = new TezEdgeProperty(EdgeType.SIMPLE_EDGE);
+          rWork = (ReduceWork) followingWork;
+        }
+        GenMapRedUtils.setKeyAndValueDesc(rWork, rs);
+
+        // remember which parent belongs to which tag
+        int tag = rs.getConf().getTag();
+        rWork.getTagToInput().put(tag == -1 ? 0 : tag, work.getName());
+
+        // remember the output name of the reduce sink
+        rs.getConf().setOutputName(rWork.getName());
+
+        if (!context.connectedReduceSinks.contains(rs)) {
+          // add dependency between the two work items
+          TezEdgeProperty edgeProp;
+          if (rWork.isAutoReduceParallelism()) {
+            edgeProp =
+                new TezEdgeProperty(context.conf, EdgeType.SIMPLE_EDGE, true,
+                    rWork.getMinReduceTasks(), rWork.getMaxReduceTasks(), bytesPerReducer);
+          } else {
+            edgeProp = new TezEdgeProperty(EdgeType.SIMPLE_EDGE);
+          }
+          tezWork.connect(work, followingWork, edgeProp);
+          context.connectedReduceSinks.add(rs);
         }
-        tezWork.connect(work, rWork, edgeProp);
-        context.connectedReduceSinks.add(rs);
       }
     } else {
       LOG.debug("First pass. Leaf operator: "+operator);
@@ -289,4 +379,28 @@ public class GenTezWork implements NodeP
 
     return null;
   }
+
+  private int getMergeIndex(TezWork tezWork, UnionWork unionWork, ReduceSinkOperator rs) {
+    int index = 0;
+    for (BaseWork baseWork : tezWork.getChildren(unionWork)) {
+      if (baseWork instanceof MergeJoinWork) {
+        MergeJoinWork mergeJoinWork = (MergeJoinWork) baseWork;
+        int tag = mergeJoinWork.getMergeJoinOperator().getTagForOperator(rs);
+        if (tag != -1) {
+          return index;
+        } else {
+          index++;
+        }
+      }
+    }
+
+    return -1;
+  }
+
+  @SuppressWarnings("unchecked")
+  private Operator<? extends OperatorDesc> getParentFromStack(Node currentMergeJoinOperator,
+      Stack<Node> stack) {
+    int pos = stack.indexOf(currentMergeJoinOperator);
+    return (Operator<? extends OperatorDesc>) stack.get(pos - 1);
+  }
 }
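
Note on the GenTezWork changes above: the merge-join handling follows one splice pattern in both places it appears. Every edge that used to touch the current work item is disconnected and re-attached to the enclosing MergeJoinWork, after which the original work is removed from the graph (or simply replaced as the "current" work). The toy sketch below shows the same re-routing on a small directed graph; the graph class is an illustrative stand-in, not TezWork's API:

    // Illustrative only: re-route a vertex's edges through a new vertex, the way
    // GenTezWork folds a work item into MergeJoinWork. Not Hive's TezWork API.
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class EdgeSpliceSketch {
      private final Map<String, List<String>> children = new HashMap<String, List<String>>();
      private final Map<String, List<String>> parents = new HashMap<String, List<String>>();

      private List<String> list(Map<String, List<String>> m, String k) {
        List<String> l = m.get(k);
        if (l == null) { l = new ArrayList<String>(); m.put(k, l); }
        return l;
      }

      void connect(String from, String to) { list(children, from).add(to); list(parents, to).add(from); }
      void disconnect(String from, String to) { list(children, from).remove(to); list(parents, to).remove(from); }

      public static void main(String[] args) {
        EdgeSpliceSketch g = new EdgeSpliceSketch();
        g.connect("Map 1", "Reducer 2");      // parent -> work
        g.connect("Reducer 2", "Reducer 3");  // work -> child

        String work = "Reducer 2";
        String mergeJoinWork = "MergeJoin 2";

        // same shape as the two loops in GenTezWork: re-route parents, then children
        for (String parent : new ArrayList<String>(g.list(g.parents, work))) {
          g.disconnect(parent, work);
          g.connect(parent, mergeJoinWork);
        }
        for (String child : new ArrayList<String>(g.list(g.children, work))) {
          g.disconnect(work, child);
          g.connect(mergeJoinWork, child);
        }

        // Map 1 now feeds MergeJoin 2, which feeds Reducer 3; Reducer 2 is isolated.
        System.out.println(g.children);
      }
    }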

Modified: hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g
URL: http://svn.apache.org/viewvc/hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g?rev=1629544&r1=1629543&r2=1629544&view=diff
==============================================================================
--- hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g (original)
+++ hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g Sun Oct  5 22:26:43 2014
@@ -479,8 +479,9 @@ import java.util.HashMap;
     xlateMap.put("KW_SUBQUERY", "SUBQUERY");
     xlateMap.put("KW_REWRITE", "REWRITE");
     xlateMap.put("KW_UPDATE", "UPDATE");
-
     xlateMap.put("KW_VALUES", "VALUES");
+    xlateMap.put("KW_PURGE", "PURGE");
+
 
     // Operators
     xlateMap.put("DOT", ".");
@@ -929,7 +930,7 @@ dropIndexStatement
 dropTableStatement
 @init { pushMsg("drop statement", state); }
 @after { popMsg(state); }
-    : KW_DROP KW_TABLE ifExists? tableName -> ^(TOK_DROPTABLE tableName ifExists?)
+    : KW_DROP KW_TABLE ifExists? tableName KW_PURGE? -> ^(TOK_DROPTABLE tableName ifExists? KW_PURGE?)
     ;
 
 alterStatement
@@ -945,8 +946,6 @@ alterTableStatementSuffix
 @init { pushMsg("alter table statement", state); }
 @after { popMsg(state); }
     : alterStatementSuffixRename[true]
-    | alterStatementSuffixAddCol
-    | alterStatementSuffixRenameCol
     | alterStatementSuffixUpdateStatsCol
     | alterStatementSuffixDropPartitions[true]
     | alterStatementSuffixAddPartitions[true]
@@ -974,6 +973,8 @@ alterTblPartitionStatementSuffix
   | alterStatementSuffixClusterbySortby
   | alterStatementSuffixCompact
   | alterStatementSuffixUpdateStatsCol
+  | alterStatementSuffixRenameCol
+  | alterStatementSuffixAddCol
   ;
 
 alterStatementPartitionKeyType
@@ -2237,7 +2238,7 @@ deleteStatement
 /*SET <columName> = (3 + col2)*/
 columnAssignmentClause
    :
-   tableOrColumn EQUAL^ atomExpression
+   tableOrColumn EQUAL^ precedencePlusExpression
    ;
 
 /*SET col1 = 5, col2 = (4 + col4), ...*/
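
Note on the HiveParser.g hunks above: DROP TABLE now accepts an optional PURGE keyword (so dropped table data can skip the trash directory), the add-column and rename-column suffixes appear to move from the table-level alternatives to the per-partition suffix so they can target a specific partition, and columnAssignmentClause in the UPDATE ... SET list switches from atomExpression to precedencePlusExpression, which lets an assignment such as col2 = col2 + 1 parse without extra parentheses. A quick way to exercise the new DROP TABLE syntax is to push it through ParseDriver; the sketch below assumes the existing ParseDriver#parse(String) and ASTNode#dump() entry points, which are not shown in this diff:

    // Illustrative: parse the new optional PURGE clause. Assumes ParseDriver#parse(String)
    // and ASTNode#dump() are available, as in the existing parser (not part of this diff).
    import org.apache.hadoop.hive.ql.parse.ASTNode;
    import org.apache.hadoop.hive.ql.parse.ParseDriver;

    public class PurgeParseSketch {
      public static void main(String[] args) throws Exception {
        ParseDriver pd = new ParseDriver();
        // With the grammar change, KW_PURGE shows up under TOK_DROPTABLE in the tree,
        // where DDL analysis can pick it up; without the change this statement fails to parse.
        ASTNode tree = pd.parse("DROP TABLE IF EXISTS old_logs PURGE");
        System.out.println(tree.dump());
      }
    }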

Modified: hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseDriver.java
URL: http://svn.apache.org/viewvc/hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseDriver.java?rev=1629544&r1=1629543&r2=1629544&view=diff
==============================================================================
--- hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseDriver.java (original)
+++ hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseDriver.java Sun Oct  5 22:26:43 2014
@@ -131,7 +131,7 @@ public class ParseDriver {
    * so that the graph walking algorithms and the rules framework defined in
    * ql.lib can be used with the AST Nodes.
    */
-  static final TreeAdaptor adaptor = new CommonTreeAdaptor() {
+  public static final TreeAdaptor adaptor = new CommonTreeAdaptor() {
     /**
      * Creates an ASTNode for the given token. The ASTNode is a wrapper around
      * antlr's CommonTree class that implements the Node interface.

Modified: hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseUtils.java?rev=1629544&r1=1629543&r2=1629544&view=diff
==============================================================================
--- hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseUtils.java (original)
+++ hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/parse/ParseUtils.java Sun Oct  5 22:26:43 2014
@@ -111,7 +111,7 @@ public final class ParseUtils {
    * @param tableFieldTypeInfo TypeInfo to convert to
    * @return Expression converting column to the type specified by tableFieldTypeInfo
    */
-  static ExprNodeDesc createConversionCast(ExprNodeDesc column, PrimitiveTypeInfo tableFieldTypeInfo)
+  public static ExprNodeDesc createConversionCast(ExprNodeDesc column, PrimitiveTypeInfo tableFieldTypeInfo)
       throws SemanticException {
     // Get base type, since type string may be parameterized
     String baseType = TypeInfoUtils.getBaseName(tableFieldTypeInfo.getTypeName());

Modified: hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/parse/QBMetaData.java
URL: http://svn.apache.org/viewvc/hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/parse/QBMetaData.java?rev=1629544&r1=1629543&r2=1629544&view=diff
==============================================================================
--- hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/parse/QBMetaData.java (original)
+++ hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/parse/QBMetaData.java Sun Oct  5 22:26:43 2014
@@ -104,10 +104,18 @@ public class QBMetaData {
     return nameToDestTable.get(alias.toLowerCase());
   }
 
+  public Map<String, Table> getNameToDestTable() {
+    return nameToDestTable;
+  }
+
   public Partition getDestPartitionForAlias(String alias) {
     return nameToDestPartition.get(alias.toLowerCase());
   }
 
+  public Map<String, Partition> getNameToDestPartition() {
+    return nameToDestPartition;
+  }
+
   public String getDestFileForAlias(String alias) {
     return nameToDestFile.get(alias.toLowerCase());
   }

Modified: hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/parse/RowResolver.java
URL: http://svn.apache.org/viewvc/hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/parse/RowResolver.java?rev=1629544&r1=1629543&r2=1629544&view=diff
==============================================================================
--- hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/parse/RowResolver.java (original)
+++ hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/parse/RowResolver.java Sun Oct  5 22:26:43 2014
@@ -49,7 +49,7 @@ public class RowResolver implements Seri
    * The primary(first) mapping is still only held in
    * invRslvMap.
    */
-  private Map<String, String[]> altInvRslvMap;
+  private final Map<String, String[]> altInvRslvMap;
   private  Map<String, ASTNode> expressionMap;
 
   // TODO: Refactor this and do in a more object oriented manner
@@ -351,4 +351,73 @@ public class RowResolver implements Seri
     this.expressionMap = expressionMap;
   }
 
+
+  // TODO: 1) How to handle collisions? 2) Should we be cloning ColumnInfo or
+  // not?
+  public static int add(RowResolver rrToAddTo, RowResolver rrToAddFrom,
+      int outputColPos, int numColumns) throws SemanticException {
+    String tabAlias;
+    String colAlias;
+    String[] qualifiedColName;
+    int i = 0;
+
+    for (ColumnInfo cInfoFrmInput : rrToAddFrom.getRowSchema().getSignature()) {
+      if ( numColumns >= 0 && i == numColumns ) {
+        break;
+      }
+      ColumnInfo newCI = null;
+      qualifiedColName = rrToAddFrom.getInvRslvMap().get(
+          cInfoFrmInput.getInternalName());
+      tabAlias = qualifiedColName[0];
+      colAlias = qualifiedColName[1];
+
+      newCI = new ColumnInfo(cInfoFrmInput);
+      newCI.setInternalName(SemanticAnalyzer
+          .getColumnInternalName(outputColPos));
+
+      outputColPos++;
+
+      if (rrToAddTo.get(tabAlias, colAlias) != null) {
+        LOG.debug("Found duplicate column alias in RR: " + rrToAddTo.get(tabAlias, colAlias));
+      } else {
+        rrToAddTo.put(tabAlias, colAlias, newCI);
+      }
+
+      qualifiedColName = rrToAddFrom.getAlternateMappings(cInfoFrmInput
+          .getInternalName());
+      if (qualifiedColName != null) {
+        tabAlias = qualifiedColName[0];
+        colAlias = qualifiedColName[1];
+        rrToAddTo.put(tabAlias, colAlias, newCI);
+      }
+      i++;
+    }
+
+    return outputColPos;
+  }
+
+  public static int add(RowResolver rrToAddTo, RowResolver rrToAddFrom,
+      int outputColPos) throws SemanticException {
+    return add(rrToAddTo, rrToAddFrom, outputColPos, -1);
+  }
+
+  /**
+   * Return a new row resolver that is a combination of the left RR and right RR.
+   * The schema will be: schema of left, then schema of right.
+   *
+   * @param leftRR
+   * @param rightRR
+   * @return
+   * @throws SemanticException
+   */
+  public static RowResolver getCombinedRR(RowResolver leftRR,
+      RowResolver rightRR) throws SemanticException {
+    int outputColPos = 0;
+
+    RowResolver combinedRR = new RowResolver();
+    outputColPos = add(combinedRR, leftRR, outputColPos);
+    outputColPos = add(combinedRR, rightRR, outputColPos);
+
+    return combinedRR;
+  }
 }
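
Note on the RowResolver additions above: the new static add() copies every column of one resolver into another, assigning fresh sequential internal names and keeping the first mapping when a table/column alias collides (the duplicate is only logged), and the position advances even when a duplicate is skipped. getCombinedRR() then builds a resolver whose schema is the left schema followed by the right. A toy sketch of that renumber-and-merge pattern, with a stand-in class rather than Hive's RowResolver/ColumnInfo:

    // Illustrative only: the renumber-and-merge pattern behind RowResolver.add() and
    // getCombinedRR(), using a toy resolver instead of Hive's RowResolver/ColumnInfo.
    import java.util.LinkedHashMap;
    import java.util.Map;

    public class CombineSketch {
      static class Resolver {
        // column alias -> internal name, in schema order
        final Map<String, String> cols = new LinkedHashMap<String, String>();
      }

      // copy columns of 'from' into 'to', renumbering internal names as _colN;
      // the position advances even when a duplicate alias is skipped, as in the real add()
      static int add(Resolver to, Resolver from, int outputColPos) {
        for (String alias : from.cols.keySet()) {
          String internalName = "_col" + outputColPos;
          outputColPos++;
          if (!to.cols.containsKey(alias)) {
            to.cols.put(alias, internalName);
          }
        }
        return outputColPos;
      }

      static Resolver combine(Resolver left, Resolver right) {
        Resolver combined = new Resolver();
        int pos = add(combined, left, 0);
        add(combined, right, pos);
        return combined;
      }

      public static void main(String[] args) {
        Resolver left = new Resolver();
        left.cols.put("key", "_col0");
        left.cols.put("value", "_col1");
        Resolver right = new Resolver();
        right.cols.put("cnt", "_col0");

        // prints {key=_col0, value=_col1, cnt=_col2}
        System.out.println(combine(left, right).cols);
      }
    }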