You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by xu...@apache.org on 2014/10/31 03:33:09 UTC

svn commit: r1635656 [1/5] - in /hive/branches/spark: itests/src/test/resources/ ql/src/java/org/apache/hadoop/hive/ql/lib/ ql/src/java/org/apache/hadoop/hive/ql/optimizer/ ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/ ql/src/java/org/apache/h...

Author: xuefu
Date: Fri Oct 31 02:33:08 2014
New Revision: 1635656

URL: http://svn.apache.org/r1635656
Log:
HIVE-8202: Support SMB Join for Hive on Spark [Spark Branch] (Szehon via Xuefu)

Added:
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/lib/TypeRule.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSortMergeJoinFactory.java
    hive/branches/spark/ql/src/test/results/clientpositive/spark/auto_sortmerge_join_16.q.out
Modified:
    hive/branches/spark/itests/src/test/resources/testconfiguration.properties
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkProcContext.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkWork.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java
    hive/branches/spark/ql/src/test/results/clientpositive/spark/auto_join32.q.out
    hive/branches/spark/ql/src/test/results/clientpositive/spark/auto_smb_mapjoin_14.q.out
    hive/branches/spark/ql/src/test/results/clientpositive/spark/auto_sortmerge_join_1.q.out
    hive/branches/spark/ql/src/test/results/clientpositive/spark/auto_sortmerge_join_13.q.out
    hive/branches/spark/ql/src/test/results/clientpositive/spark/auto_sortmerge_join_14.q.out
    hive/branches/spark/ql/src/test/results/clientpositive/spark/auto_sortmerge_join_15.q.out
    hive/branches/spark/ql/src/test/results/clientpositive/spark/auto_sortmerge_join_2.q.out
    hive/branches/spark/ql/src/test/results/clientpositive/spark/auto_sortmerge_join_3.q.out
    hive/branches/spark/ql/src/test/results/clientpositive/spark/auto_sortmerge_join_4.q.out
    hive/branches/spark/ql/src/test/results/clientpositive/spark/auto_sortmerge_join_5.q.out
    hive/branches/spark/ql/src/test/results/clientpositive/spark/auto_sortmerge_join_6.q.out
    hive/branches/spark/ql/src/test/results/clientpositive/spark/auto_sortmerge_join_7.q.out
    hive/branches/spark/ql/src/test/results/clientpositive/spark/auto_sortmerge_join_8.q.out
    hive/branches/spark/ql/src/test/results/clientpositive/spark/auto_sortmerge_join_9.q.out
    hive/branches/spark/ql/src/test/results/clientpositive/spark/bucket2.q.out
    hive/branches/spark/ql/src/test/results/clientpositive/spark/bucket3.q.out
    hive/branches/spark/ql/src/test/results/clientpositive/spark/bucket4.q.out
    hive/branches/spark/ql/src/test/results/clientpositive/spark/disable_merge_for_bucketing.q.out
    hive/branches/spark/ql/src/test/results/clientpositive/spark/load_dyn_part2.q.out
    hive/branches/spark/ql/src/test/results/clientpositive/spark/parquet_join.q.out
    hive/branches/spark/ql/src/test/results/clientpositive/spark/smb_mapjoin_17.q.out
    hive/branches/spark/ql/src/test/results/clientpositive/spark/smb_mapjoin_20.q.out
    hive/branches/spark/ql/src/test/results/clientpositive/spark/smb_mapjoin_21.q.out
    hive/branches/spark/ql/src/test/results/clientpositive/spark/smb_mapjoin_25.q.out

Modified: hive/branches/spark/itests/src/test/resources/testconfiguration.properties
URL: http://svn.apache.org/viewvc/hive/branches/spark/itests/src/test/resources/testconfiguration.properties?rev=1635656&r1=1635655&r2=1635656&view=diff
==============================================================================
--- hive/branches/spark/itests/src/test/resources/testconfiguration.properties (original)
+++ hive/branches/spark/itests/src/test/resources/testconfiguration.properties Fri Oct 31 02:33:08 2014
@@ -463,6 +463,7 @@ spark.query.files=add_part_multiple.q, \
   auto_sortmerge_join_13.q, \
   auto_sortmerge_join_14.q, \
   auto_sortmerge_join_15.q, \
+  auto_sortmerge_join_16.q, \
   auto_sortmerge_join_2.q, \
   auto_sortmerge_join_3.q, \
   auto_sortmerge_join_4.q, \

Added: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/lib/TypeRule.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/lib/TypeRule.java?rev=1635656&view=auto
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/lib/TypeRule.java (added)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/lib/TypeRule.java Fri Oct 31 02:33:08 2014
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.lib;
+
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+
+import java.util.Stack;
+import java.util.regex.Matcher;
+
+/**
+ * Rule that matches a particular type of node.
+ */
+public class TypeRule implements Rule {
+
+  private Class nodeClass;
+
+  public TypeRule(Class<?> nodeClass) {
+    this.nodeClass = nodeClass;
+  }
+
+  @Override
+  public int cost(Stack<Node> stack) throws SemanticException {
+    if (stack == null) {
+      return -1;
+    }
+    if (nodeClass.isInstance(stack.peek())) {
+      return 1;
+    }
+    return -1;
+  }
+
+  @Override
+  public String getName() {
+    return nodeClass.getName();
+  }
+}

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java?rev=1635656&r1=1635655&r2=1635656&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java Fri Oct 31 02:33:08 2014
@@ -111,7 +111,7 @@ public class Optimizer {
     // If optimize hive.optimize.bucketmapjoin.sortedmerge is set, add both
     // BucketMapJoinOptimizer and SortedMergeBucketMapJoinOptimizer
     if ((HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTSORTMERGEBUCKETMAPJOIN))
-        && !isTezExecEngine && !isSparkExecEngine) {
+        && !isTezExecEngine) {
       if (!bucketMapJoinOptimizer) {
         // No need to add BucketMapJoinOptimizer twice
         transformations.add(new BucketMapJoinOptimizer());

Added: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSortMergeJoinFactory.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSortMergeJoinFactory.java?rev=1635656&view=auto
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSortMergeJoinFactory.java (added)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkSortMergeJoinFactory.java Fri Oct 31 02:33:08 2014
@@ -0,0 +1,177 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.hadoop.hive.ql.optimizer.spark;
+
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.SMBMapJoinOperator;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
+import org.apache.hadoop.hive.ql.exec.spark.SparkTask;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.lib.NodeProcessor;
+import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
+import org.apache.hadoop.hive.ql.optimizer.GenMapRedUtils;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.parse.spark.GenSparkProcContext;
+import org.apache.hadoop.hive.ql.plan.BucketMapJoinContext;
+import org.apache.hadoop.hive.ql.plan.MapWork;
+import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Stack;
+
+/**
+* Operator factory for Spark SMBJoin processing.
+*/
+public final class SparkSortMergeJoinFactory {
+
+  private SparkSortMergeJoinFactory() {
+    // prevent instantiation
+  }
+
+  /**
+   * Get the branch on which we are invoked (walking) from.  See diagram below.
+   * We are at the SMBJoinOp and could have come from TS of any of the input tables.
+   */
+  public static int getPositionParent(SMBMapJoinOperator op,
+      Stack<Node> stack) {
+    int size = stack.size();
+    assert size >= 2 && stack.get(size - 1) == op;
+    Operator<? extends OperatorDesc> parent =
+        (Operator<? extends OperatorDesc>) stack.get(size - 2);
+    List<Operator<? extends OperatorDesc>> parOp = op.getParentOperators();
+    int pos = parOp.indexOf(parent);
+    return pos;
+  }
+
+  /**
+   * SortMergeMapJoin processor, input is a SMBJoinOp that is part of a MapWork:
+   *
+   *  MapWork:
+   *
+   *   (Big)   (Small)  (Small)
+   *    TS       TS       TS
+   *     \       |       /
+   *       \     DS     DS
+   *         \   |    /
+   *          SMBJoinOP
+   *
+   * 1. Initializes the MapWork's aliasToWork, pointing to big-table's TS.
+   * 2. Adds the bucketing information to the MapWork.
+   * 3. Adds localwork to the MapWork, with localWork's aliasToWork pointing to small-table's TS.
+   */
+  private static class SortMergeJoinProcessor implements NodeProcessor {
+
+    public static void setupBucketMapJoinInfo(MapWork plan, SMBMapJoinOperator currMapJoinOp) {
+      if (currMapJoinOp != null) {
+        Map<String, Map<String, List<String>>> aliasBucketFileNameMapping =
+            currMapJoinOp.getConf().getAliasBucketFileNameMapping();
+        if (aliasBucketFileNameMapping != null) {
+          MapredLocalWork localPlan = plan.getMapLocalWork();
+          if (localPlan == null) {
+            localPlan = currMapJoinOp.getConf().getLocalWork();
+          } else {
+            // local plan is not null, we want to merge it into SMBMapJoinOperator's local work
+            MapredLocalWork smbLocalWork = currMapJoinOp.getConf().getLocalWork();
+            if (smbLocalWork != null) {
+              localPlan.getAliasToFetchWork().putAll(smbLocalWork.getAliasToFetchWork());
+              localPlan.getAliasToWork().putAll(smbLocalWork.getAliasToWork());
+            }
+          }
+
+          if (localPlan == null) {
+            return;
+          }
+          plan.setMapLocalWork(null);
+          currMapJoinOp.getConf().setLocalWork(localPlan);
+
+          BucketMapJoinContext bucketMJCxt = new BucketMapJoinContext();
+          localPlan.setBucketMapjoinContext(bucketMJCxt);
+          bucketMJCxt.setAliasBucketFileNameMapping(aliasBucketFileNameMapping);
+          bucketMJCxt.setBucketFileNameMapping(
+              currMapJoinOp.getConf().getBigTableBucketNumMapping());
+          localPlan.setInputFileChangeSensitive(true);
+          bucketMJCxt.setMapJoinBigTableAlias(currMapJoinOp.getConf().getBigTableAlias());
+          bucketMJCxt
+              .setBucketMatcherClass(org.apache.hadoop.hive.ql.exec.DefaultBucketMatcher.class);
+          bucketMJCxt.setBigTablePartSpecToFileMapping(
+              currMapJoinOp.getConf().getBigTablePartSpecToFileMapping());
+
+          plan.setUseBucketizedHiveInputFormat(true);
+
+        }
+      }
+    }
+
+    /**
+     * Initialize the mapWork.
+     *
+     * @param opProcCtx
+     *          processing context
+     */
+    private static void initSMBJoinPlan(MapWork mapWork,
+                                        GenSparkProcContext opProcCtx, boolean local)
+            throws SemanticException {
+      TableScanOperator ts = (TableScanOperator) opProcCtx.currentRootOperator;
+      String currAliasId = findAliasId(opProcCtx, ts);
+      GenMapRedUtils.setMapWork(mapWork, opProcCtx.parseContext,
+         opProcCtx.inputs, null, ts, currAliasId, opProcCtx.conf, local);
+    }
+
+    private static String findAliasId(GenSparkProcContext opProcCtx, TableScanOperator ts) {
+      for (String alias : opProcCtx.topOps.keySet()) {
+        if (opProcCtx.topOps.get(alias) == ts) {
+          return alias;
+        }
+      }
+      return null;
+    }
+
+    /**
+     * 1. Initializes the MapWork's aliasToWork, pointing to big-table's TS.
+     * 2. Adds the bucketing information to the MapWork.
+     * 3. Adds localwork to the MapWork, with localWork's aliasToWork pointing to small-table's TS.
+     */
+    @Override
+    public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
+        Object... nodeOutputs) throws SemanticException {
+      SMBMapJoinOperator mapJoin = (SMBMapJoinOperator) nd;
+      GenSparkProcContext ctx = (GenSparkProcContext) procCtx;
+
+      SparkTask currTask = ctx.currentTask;
+
+      // find the branch on which this processor was invoked
+      int pos = getPositionParent(mapJoin, stack);
+      boolean local = pos != mapJoin.getConf().getPosBigTable();
+
+      MapWork mapWork = ctx.smbJoinWorkMap.get(mapJoin);
+      initSMBJoinPlan(mapWork, ctx, local);
+
+      // find the associated mapWork that contains this processor.
+      setupBucketMapJoinInfo(mapWork, mapJoin);
+
+      // local aliases need not hand over context any further
+      return false;
+    }
+  }
+
+  public static NodeProcessor getTableScanMapJoin() {
+    return new SortMergeJoinProcessor();
+  }
+}

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkProcContext.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkProcContext.java?rev=1635656&r1=1635655&r2=1635656&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkProcContext.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkProcContext.java Fri Oct 31 02:33:08 2014
@@ -18,20 +18,13 @@
 
 package org.apache.hadoop.hive.ql.parse.spark;
 
-import java.io.Serializable;
-import java.util.LinkedHashMap;
-import java.util.LinkedHashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.DependencyCollectionTask;
 import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
 import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.SMBMapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.TableScanOperator;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.TaskFactory;
@@ -49,6 +42,14 @@ import org.apache.hadoop.hive.ql.plan.Op
 import org.apache.hadoop.hive.ql.plan.SparkEdgeProperty;
 import org.apache.hadoop.hive.ql.plan.SparkWork;
 
+import java.io.Serializable;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
 /**
  * GenSparkProcContext maintains information about the tasks and operators
  * as we walk the operator tree to break them into SparkTasks.
@@ -100,6 +101,9 @@ public class GenSparkProcContext impleme
   // map that says which mapjoin belongs to which work item
   public final Map<MapJoinOperator, List<BaseWork>> mapJoinWorkMap;
 
+  // a map to keep track of which MapWork item holds which SMBMapJoinOp
+  public final Map<SMBMapJoinOperator, MapWork> smbJoinWorkMap;
+
   // a map to keep track of which root generated which work
   public final Map<Operator<?>, BaseWork> rootToWorkMap;
 
@@ -134,13 +138,6 @@ public class GenSparkProcContext impleme
   // This is necessary as sometimes semantic analyzer's mapping is different than operator's own alias.
   public final Map<String, Operator<? extends OperatorDesc>> topOps;
 
-  // Keep track of the current table alias (from last TableScan)
-  public String currentAliasId;
-
-  // Keep track of the current Table-Scan.
-  public TableScanOperator currentTs;
-
-
   @SuppressWarnings("unchecked")
   public GenSparkProcContext(HiveConf conf, ParseContext parseContext,
       List<Task<MoveWork>> moveTask, List<Task<? extends Serializable>> rootTasks,
@@ -158,6 +155,7 @@ public class GenSparkProcContext impleme
     this.leafOperatorToFollowingWork = new LinkedHashMap<Operator<?>, BaseWork>();
     this.linkOpWithWorkMap = new LinkedHashMap<Operator<?>, Map<BaseWork, SparkEdgeProperty>>();
     this.linkWorkWithReduceSinkMap = new LinkedHashMap<BaseWork, List<ReduceSinkOperator>>();
+    this.smbJoinWorkMap = new LinkedHashMap<SMBMapJoinOperator, MapWork>();
     this.mapJoinWorkMap = new LinkedHashMap<MapJoinOperator, List<BaseWork>>();
     this.rootToWorkMap = new LinkedHashMap<Operator<?>, BaseWork>();
     this.childToWorkMap = new LinkedHashMap<Operator<?>, List<BaseWork>>();

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java?rev=1635656&r1=1635655&r2=1635656&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java Fri Oct 31 02:33:08 2014
@@ -18,21 +18,14 @@
 
 package org.apache.hadoop.hive.ql.parse.spark;
 
-import java.util.ArrayList;
-import java.util.Deque;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.ql.exec.FetchTask;
 import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
 import org.apache.hadoop.hive.ql.exec.GroupByOperator;
@@ -49,15 +42,21 @@ import org.apache.hadoop.hive.ql.parse.P
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.plan.BaseWork;
 import org.apache.hadoop.hive.ql.plan.MapWork;
-import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.ReduceWork;
 import org.apache.hadoop.hive.ql.plan.SparkEdgeProperty;
 import org.apache.hadoop.hive.ql.plan.SparkWork;
 import org.apache.hadoop.hive.ql.plan.UnionWork;
 
-import com.google.common.base.Preconditions;
-import com.google.common.base.Strings;
+import java.util.ArrayList;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 
 /**
  * GenSparkUtils is a collection of shared helper methods to produce SparkWork
@@ -94,7 +93,7 @@ public class GenSparkUtils {
     return unionWork;
   }
 
-  public ReduceWork createReduceWork(GenSparkProcContext context, Operator<?> root, SparkWork sparkWork) {
+  public ReduceWork createReduceWork(GenSparkProcContext context, Operator<?> root, SparkWork sparkWork) throws SemanticException {
     Preconditions.checkArgument(!root.getParentOperators().isEmpty(),
         "AssertionError: expected root.getParentOperators() to be non-empty");
 
@@ -128,10 +127,19 @@ public class GenSparkUtils {
     }
 
     if (reduceWork.getReducer() instanceof JoinOperator) {
-      //reduce-side join
+      //reduce-side join, use MR-style shuffle
       edgeProp.setMRShuffle();
     }
 
+    //If it's a FileSink to bucketed files, also use MR-style shuffle to get a compatible taskId for the bucket name
+    FileSinkOperator fso = getChildOperator(reduceWork.getReducer(), FileSinkOperator.class);
+    if (fso != null) {
+      String bucketCount = fso.getConf().getTableInfo().getProperties().getProperty(hive_metastoreConstants.BUCKET_COUNT);
+      if (bucketCount != null && Integer.valueOf(bucketCount) > 1) {
+        edgeProp.setMRShuffle();
+      }
+    }
+
     sparkWork.connect(
         context.preceedingWork,
         reduceWork, edgeProp);
@@ -158,7 +166,12 @@ public class GenSparkUtils {
   }
 
   public MapWork createMapWork(GenSparkProcContext context, Operator<?> root,
-      SparkWork sparkWork, PrunedPartitionList partitions) throws SemanticException {
+    SparkWork sparkWork, PrunedPartitionList partitions) throws SemanticException {
+    return createMapWork(context, root, sparkWork, partitions, false);
+  }
+
+  public MapWork createMapWork(GenSparkProcContext context, Operator<?> root,
+      SparkWork sparkWork, PrunedPartitionList partitions, boolean deferSetup) throws SemanticException {
     Preconditions.checkArgument(root.getParentOperators().isEmpty(),
         "AssertionError: expected root.getParentOperators() to be empty");
     MapWork mapWork = new MapWork("Map "+ (++sequenceNumber));
@@ -170,7 +183,9 @@ public class GenSparkUtils {
             root.getClass().getName());
     String alias = ((TableScanOperator)root).getConf().getAlias();
 
-    setupMapWork(mapWork, context, partitions, root, alias);
+    if (!deferSetup) {
+      setupMapWork(mapWork, context, partitions, root, alias);
+    }
 
     // add new item to the Spark work
     sparkWork.add(mapWork);
@@ -322,27 +337,17 @@ public class GenSparkUtils {
     return true;
   }
 
-
-  /**
-   * Is an operator of the given class a child of the given operator.  This is more flexible
-   * than GraphWalker to tell apart subclasses such as SMBMapJoinOp vs MapJoinOp that have a common name.
-   * @param op parent operator to start search
-   * @param klazz given class
-   * @return
-   * @throws SemanticException
-   */
-  public static Operator<?> getChildOperator(Operator<?> op, Class klazz) throws SemanticException {
+  public static <T> T getChildOperator(Operator<?> op, Class<T> klazz) throws SemanticException {
     if (klazz.isInstance(op)) {
-      return op;
+      return (T) op;
     }
     List<Operator<?>> childOperators = op.getChildOperators();
     for (Operator<?> childOp : childOperators) {
-      Operator result = getChildOperator(childOp, klazz);
+      T result = getChildOperator(childOp, klazz);
       if (result != null) {
         return result;
       }
     }
     return null;
   }
-
 }

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkWork.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkWork.java?rev=1635656&r1=1635655&r2=1635656&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkWork.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkWork.java Fri Oct 31 02:33:08 2014
@@ -18,36 +18,40 @@
 
 package org.apache.hadoop.hive.ql.parse.spark;
 
-import java.util.ArrayList;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Stack;
-
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+import org.apache.hadoop.hive.ql.exec.DummyStoreOperator;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
 import org.apache.hadoop.hive.ql.exec.HashTableDummyOperator;
 import org.apache.hadoop.hive.ql.exec.JoinOperator;
 import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.OperatorFactory;
 import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.SMBMapJoinOperator;
 import org.apache.hadoop.hive.ql.lib.Node;
 import org.apache.hadoop.hive.ql.lib.NodeProcessor;
 import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
 import org.apache.hadoop.hive.ql.optimizer.GenMapRedUtils;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.plan.BaseWork;
+import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
 import org.apache.hadoop.hive.ql.plan.ReduceWork;
 import org.apache.hadoop.hive.ql.plan.SparkEdgeProperty;
 import org.apache.hadoop.hive.ql.plan.SparkWork;
 import org.apache.hadoop.hive.ql.plan.UnionWork;
 
-import com.google.common.base.Preconditions;
-import com.google.common.base.Strings;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Stack;
 
 /**
  * GenSparkWork separates the operator tree into spark tasks.
@@ -102,6 +106,26 @@ public class GenSparkWork implements Nod
 
     SparkWork sparkWork = context.currentTask.getWork();
 
+
+    if (GenSparkUtils.getChildOperator(root, DummyStoreOperator.class) != null) {
+      /*
+       *  SMB join case:
+       *
+       *   (Big)   (Small)  (Small)
+       *    TS       TS       TS
+       *     \       |       /
+       *       \     DS     DS
+       *         \   |    /
+       *          SMBJoinOP
+       *
+       * Only create MapWork rooted at TS of big table.
+       * If there are dummy-store operators anywhere in TS's children path, then this is for the small tables.
+       * No separate Map-Task needs to be created for a small-table TS, as it will be read by the MapWork of the big-table.
+       */
+      return null;
+    }
+    SMBMapJoinOperator smbOp = (SMBMapJoinOperator) GenSparkUtils.getChildOperator(root, SMBMapJoinOperator.class);
+
     // Right now the work graph is pretty simple. If there is no
     // Preceding work we have a root and will generate a map
     // vertex. If there is a preceding work we will generate
@@ -119,7 +143,18 @@ public class GenSparkWork implements Nod
     } else {
       // create a new vertex
       if (context.preceedingWork == null) {
-        work = utils.createMapWork(context, root, sparkWork, null);
+        if (smbOp != null) {
+          //This logic is for SortMergeBucket MapJoin case.
+          //This MapWork (of big-table, see above) is later initialized by the SparkSortMergeJoinFactory processor, so don't initialize it here.
+          //Just keep track of it in the context, for later processing.
+          work = utils.createMapWork(context, root, sparkWork, null, true);
+          if (context.smbJoinWorkMap.get(smbOp) != null) {
+            throw new SemanticException("Each SMBMapJoin should be associated only with one Mapwork");
+          }
+          context.smbJoinWorkMap.put(smbOp, (MapWork) work);
+        } else {
+          work = utils.createMapWork(context, root, sparkWork, null);
+        }
       } else {
         work = utils.createReduceWork(context, root, sparkWork);
       }
@@ -284,9 +319,17 @@ public class GenSparkWork implements Nod
           edgeProp.setShuffleSort();
         }
         if (rWork.getReducer() instanceof JoinOperator) {
-          //reduce-side join
+          //reduce-side join, use MR-style shuffle
           edgeProp.setMRShuffle();
         }
+        //If it's a FileSink to bucketed files, also use MR-style shuffle to get a compatible taskId for the bucket name
+        FileSinkOperator fso = GenSparkUtils.getChildOperator(rWork.getReducer(), FileSinkOperator.class);
+        if (fso != null) {
+          String bucketCount = fso.getConf().getTableInfo().getProperties().getProperty(hive_metastoreConstants.BUCKET_COUNT);
+          if (bucketCount != null && Integer.valueOf(bucketCount) > 1) {
+            edgeProp.setMRShuffle();
+          }
+        }
         sparkWork.connect(work, rWork, edgeProp);
         context.connectedReduceSinks.add(rs);
       }

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java?rev=1635656&r1=1635655&r2=1635656&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java Fri Oct 31 02:33:08 2014
@@ -17,27 +17,15 @@
  */
 package org.apache.hadoop.hive.ql.parse.spark;
 
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Deque;
-import java.util.HashMap;
-import java.util.LinkedHashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.Stack;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.exec.ConditionalTask;
 import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
-import org.apache.hadoop.hive.ql.exec.JoinOperator;
-import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.SMBMapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.TableScanOperator;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.UnionOperator;
@@ -54,6 +42,7 @@ import org.apache.hadoop.hive.ql.lib.Nod
 import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
 import org.apache.hadoop.hive.ql.lib.Rule;
 import org.apache.hadoop.hive.ql.lib.RuleRegExp;
+import org.apache.hadoop.hive.ql.lib.TypeRule;
 import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.optimizer.physical.CrossProductCheck;
 import org.apache.hadoop.hive.ql.optimizer.physical.NullScanOptimizer;
@@ -61,8 +50,7 @@ import org.apache.hadoop.hive.ql.optimiz
 import org.apache.hadoop.hive.ql.optimizer.physical.StageIDsRearranger;
 import org.apache.hadoop.hive.ql.optimizer.physical.Vectorizer;
 import org.apache.hadoop.hive.ql.optimizer.spark.SetSparkReducerParallelism;
-import org.apache.hadoop.hive.ql.optimizer.spark.SparkMapJoinOptimizer;
-import org.apache.hadoop.hive.ql.optimizer.spark.SparkReduceSinkMapJoinProc;
+import org.apache.hadoop.hive.ql.optimizer.spark.SparkSortMergeJoinFactory;
 import org.apache.hadoop.hive.ql.parse.GlobalLimitCtx;
 import org.apache.hadoop.hive.ql.parse.ParseContext;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
@@ -73,6 +61,17 @@ import org.apache.hadoop.hive.ql.plan.Mo
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.SparkWork;
 import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.Stack;
 /**
  * SparkCompiler translates the operator plan into SparkTasks.
  *
@@ -181,6 +180,20 @@ public class SparkCompiler extends TaskC
     GraphWalker ogw = new GenSparkWorkWalker(disp, procCtx);
     ogw.startWalking(topNodes, null);
 
+
+
+    // ------------------- Second Pass -----------------------
+    // SMB Join optimizations to add the "localWork" and bucketing data structures to MapWork.
+    opRules.clear();
+    opRules.put(new TypeRule(SMBMapJoinOperator.class),
+       SparkSortMergeJoinFactory.getTableScanMapJoin());
+
+    disp = new DefaultRuleDispatcher(null, opRules, procCtx);
+    topNodes = new ArrayList<Node>();
+    topNodes.addAll(pCtx.getTopOps().values());
+    ogw = new GenSparkWorkWalker(disp, procCtx);
+    ogw.startWalking(topNodes, null);
+
     // we need to clone some operator plans and remove union operators still
     for (BaseWork w: procCtx.workWithUnionOperators) {
       GenSparkUtils.getUtils().removeUnionOperators(conf, procCtx, w);

Modified: hive/branches/spark/ql/src/test/results/clientpositive/spark/auto_join32.q.out
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/test/results/clientpositive/spark/auto_join32.q.out?rev=1635656&r1=1635655&r2=1635656&view=diff
==============================================================================
--- hive/branches/spark/ql/src/test/results/clientpositive/spark/auto_join32.q.out (original)
+++ hive/branches/spark/ql/src/test/results/clientpositive/spark/auto_join32.q.out Fri Oct 31 02:33:08 2014
@@ -168,77 +168,51 @@ STAGE PLANS:
   Stage: Stage-1
     Spark
       Edges:
-        Reducer 2 <- Map 1 (GROUP PARTITION-LEVEL SORT, 1), Map 4 (GROUP PARTITION-LEVEL SORT, 1)
-        Reducer 3 <- Reducer 2 (GROUP SORT, 1)
+        Reducer 2 <- Map 1 (GROUP SORT, 1)
 #### A masked pattern was here ####
       Vertices:
         Map 1 
             Map Operator Tree:
                 TableScan
-                  alias: v
-                  Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
-                  Filter Operator
-                    predicate: name is not null (type: boolean)
-                    Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
-                    Reduce Output Operator
-                      key expressions: name (type: string)
-                      sort order: +
-                      Map-reduce partition columns: name (type: string)
-                      Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
-                      value expressions: registration (type: string)
-        Map 4 
-            Map Operator Tree:
-                TableScan
                   alias: s
                   Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
                   Filter Operator
                     predicate: name is not null (type: boolean)
                     Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
-                    Reduce Output Operator
-                      key expressions: name (type: string)
-                      sort order: +
-                      Map-reduce partition columns: name (type: string)
-                      Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
+                    Sorted Merge Bucket Map Join Operator
+                      condition map:
+                           Inner Join 0 to 1
+                      condition expressions:
+                        0 {name}
+                        1 {registration}
+                      keys:
+                        0 name (type: string)
+                        1 name (type: string)
+                      outputColumnNames: _col0, _col8
+                      Select Operator
+                        expressions: _col0 (type: string), _col8 (type: string)
+                        outputColumnNames: _col0, _col8
+                        Group By Operator
+                          aggregations: count(DISTINCT _col8)
+                          keys: _col0 (type: string), _col8 (type: string)
+                          mode: hash
+                          outputColumnNames: _col0, _col1, _col2
+                          Reduce Output Operator
+                            key expressions: _col0 (type: string), _col1 (type: string)
+                            sort order: ++
+                            Map-reduce partition columns: _col0 (type: string)
         Reducer 2 
             Reduce Operator Tree:
-              Join Operator
-                condition map:
-                     Inner Join 0 to 1
-                condition expressions:
-                  0 {KEY.reducesinkkey0}
-                  1 {VALUE._col1}
-                outputColumnNames: _col0, _col8
-                Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
-                Select Operator
-                  expressions: _col0 (type: string), _col8 (type: string)
-                  outputColumnNames: _col0, _col8
-                  Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
-                  Group By Operator
-                    aggregations: count(DISTINCT _col8)
-                    keys: _col0 (type: string), _col8 (type: string)
-                    mode: hash
-                    outputColumnNames: _col0, _col1, _col2
-                    Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
-                    Reduce Output Operator
-                      key expressions: _col0 (type: string), _col1 (type: string)
-                      sort order: ++
-                      Map-reduce partition columns: _col0 (type: string)
-                      Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
-        Reducer 3 
-            Reduce Operator Tree:
               Group By Operator
                 aggregations: count(DISTINCT KEY._col1:0._col0)
                 keys: KEY._col0 (type: string)
                 mode: mergepartial
                 outputColumnNames: _col0, _col1
-                Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
                 Select Operator
                   expressions: _col0 (type: string), _col1 (type: bigint)
                   outputColumnNames: _col0, _col1
-                  Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
                   File Output Operator
                     compressed: false
-                    Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
                     table:
                         input format: org.apache.hadoop.mapred.TextInputFormat
                         output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
@@ -316,77 +290,51 @@ STAGE PLANS:
   Stage: Stage-1
     Spark
       Edges:
-        Reducer 2 <- Map 1 (GROUP PARTITION-LEVEL SORT, 1), Map 4 (GROUP PARTITION-LEVEL SORT, 1)
-        Reducer 3 <- Reducer 2 (GROUP SORT, 1)
+        Reducer 2 <- Map 1 (GROUP SORT, 1)
 #### A masked pattern was here ####
       Vertices:
         Map 1 
             Map Operator Tree:
                 TableScan
-                  alias: v
-                  Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
-                  Filter Operator
-                    predicate: name is not null (type: boolean)
-                    Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
-                    Reduce Output Operator
-                      key expressions: name (type: string)
-                      sort order: +
-                      Map-reduce partition columns: name (type: string)
-                      Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
-                      value expressions: registration (type: string)
-        Map 4 
-            Map Operator Tree:
-                TableScan
                   alias: s
                   Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
                   Filter Operator
                     predicate: name is not null (type: boolean)
                     Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
-                    Reduce Output Operator
-                      key expressions: name (type: string)
-                      sort order: +
-                      Map-reduce partition columns: name (type: string)
-                      Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
+                    Sorted Merge Bucket Map Join Operator
+                      condition map:
+                           Inner Join 0 to 1
+                      condition expressions:
+                        0 {name}
+                        1 {registration}
+                      keys:
+                        0 name (type: string)
+                        1 name (type: string)
+                      outputColumnNames: _col0, _col8
+                      Select Operator
+                        expressions: _col0 (type: string), _col8 (type: string)
+                        outputColumnNames: _col0, _col8
+                        Group By Operator
+                          aggregations: count(DISTINCT _col8)
+                          keys: _col0 (type: string), _col8 (type: string)
+                          mode: hash
+                          outputColumnNames: _col0, _col1, _col2
+                          Reduce Output Operator
+                            key expressions: _col0 (type: string), _col1 (type: string)
+                            sort order: ++
+                            Map-reduce partition columns: _col0 (type: string)
         Reducer 2 
             Reduce Operator Tree:
-              Join Operator
-                condition map:
-                     Inner Join 0 to 1
-                condition expressions:
-                  0 {KEY.reducesinkkey0}
-                  1 {VALUE._col1}
-                outputColumnNames: _col0, _col8
-                Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
-                Select Operator
-                  expressions: _col0 (type: string), _col8 (type: string)
-                  outputColumnNames: _col0, _col8
-                  Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
-                  Group By Operator
-                    aggregations: count(DISTINCT _col8)
-                    keys: _col0 (type: string), _col8 (type: string)
-                    mode: hash
-                    outputColumnNames: _col0, _col1, _col2
-                    Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
-                    Reduce Output Operator
-                      key expressions: _col0 (type: string), _col1 (type: string)
-                      sort order: ++
-                      Map-reduce partition columns: _col0 (type: string)
-                      Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
-        Reducer 3 
-            Reduce Operator Tree:
               Group By Operator
                 aggregations: count(DISTINCT KEY._col1:0._col0)
                 keys: KEY._col0 (type: string)
                 mode: mergepartial
                 outputColumnNames: _col0, _col1
-                Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
                 Select Operator
                   expressions: _col0 (type: string), _col1 (type: bigint)
                   outputColumnNames: _col0, _col1
-                  Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
                   File Output Operator
                     compressed: false
-                    Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
                     table:
                         input format: org.apache.hadoop.mapred.TextInputFormat
                         output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
@@ -488,52 +436,22 @@ STAGE PLANS:
   Stage: Stage-1
     Spark
       Edges:
-        Reducer 2 <- Map 1 (GROUP PARTITION-LEVEL SORT, 1), Map 4 (GROUP PARTITION-LEVEL SORT, 1)
-        Reducer 3 <- Reducer 2 (GROUP SORT, 1)
+        Reducer 2 <- Map 1 (GROUP SORT, 1)
 #### A masked pattern was here ####
       Vertices:
         Map 1 
-        Map 4 
         Reducer 2 
             Reduce Operator Tree:
-              Join Operator
-                condition map:
-                     Inner Join 0 to 1
-                condition expressions:
-                  0 {KEY.reducesinkkey0}
-                  1 {VALUE._col1}
-                outputColumnNames: _col0, _col9
-                Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
-                Select Operator
-                  expressions: _col0 (type: string), _col9 (type: string)
-                  outputColumnNames: _col0, _col9
-                  Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
-                  Group By Operator
-                    aggregations: count(DISTINCT _col9)
-                    keys: _col0 (type: string), _col9 (type: string)
-                    mode: hash
-                    outputColumnNames: _col0, _col1, _col2
-                    Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
-                    Reduce Output Operator
-                      key expressions: _col0 (type: string), _col1 (type: string)
-                      sort order: ++
-                      Map-reduce partition columns: _col0 (type: string)
-                      Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
-        Reducer 3 
-            Reduce Operator Tree:
               Group By Operator
                 aggregations: count(DISTINCT KEY._col1:0._col0)
                 keys: KEY._col0 (type: string)
                 mode: mergepartial
                 outputColumnNames: _col0, _col1
-                Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
                 Select Operator
                   expressions: _col0 (type: string), _col1 (type: bigint)
                   outputColumnNames: _col0, _col1
-                  Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
                   File Output Operator
                     compressed: false
-                    Statistics: Num rows: 0 Data size: 0 Basic stats: NONE Column stats: NONE
                     table:
                         input format: org.apache.hadoop.mapred.TextInputFormat
                         output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat