You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by xu...@apache.org on 2015/08/02 05:19:29 UTC

[2/2] hive git commit: HIVE-10844: Combine equivalent Works for HoS[Spark Branch] (Chengxiang via Xuefu)

HIVE-10844: Combine equivalent Works for HoS[Spark Branch] (Chengxiang via Xuefu)

Conflicts:

	ql/src/test/results/clientpositive/spark/groupby10.q.out
	ql/src/test/results/clientpositive/spark/groupby8.q.out
	ql/src/test/results/clientpositive/spark/groupby8_map_skew.q.out
	ql/src/test/results/clientpositive/spark/union10.q.out
	ql/src/test/results/clientpositive/spark/union11.q.out
	ql/src/test/results/clientpositive/spark/union20.q.out
	ql/src/test/results/clientpositive/spark/union28.q.out
	ql/src/test/results/clientpositive/spark/union30.q.out
	ql/src/test/results/clientpositive/spark/union4.q.out
	ql/src/test/results/clientpositive/spark/union5.q.out
	ql/src/test/results/clientpositive/spark/union_remove_1.q.out
	ql/src/test/results/clientpositive/spark/union_remove_15.q.out
	ql/src/test/results/clientpositive/spark/union_remove_16.q.out
	ql/src/test/results/clientpositive/spark/union_remove_18.q.out
	ql/src/test/results/clientpositive/spark/union_remove_19.q.out
	ql/src/test/results/clientpositive/spark/union_remove_20.q.out
	ql/src/test/results/clientpositive/spark/union_remove_21.q.out
	ql/src/test/results/clientpositive/spark/union_remove_22.q.out
	ql/src/test/results/clientpositive/spark/union_remove_24.q.out
	ql/src/test/results/clientpositive/spark/union_remove_25.q.out
	ql/src/test/results/clientpositive/spark/union_remove_4.q.out
	ql/src/test/results/clientpositive/spark/union_remove_6_subq.q.out
	ql/src/test/results/clientpositive/spark/union_remove_7.q.out


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/9e13be3e
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/9e13be3e
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/9e13be3e

Branch: refs/heads/branch-1
Commit: 9e13be3e3314954991d0da0380c99b3156b8e4b7
Parents: 182c7f4
Author: Xuefu Zhang <xz...@Cloudera.com>
Authored: Wed Jun 24 06:43:26 2015 -0700
Committer: xzhang <xz...@xzdt>
Committed: Sat Aug 1 20:17:46 2015 -0700

----------------------------------------------------------------------
 .../ql/optimizer/OperatorComparatorFactory.java | 492 +++++++++++++++++++
 .../spark/CombineEquivalentWorkResolver.java    | 302 ++++++++++++
 .../hive/ql/parse/spark/SparkCompiler.java      |   3 +
 .../hadoop/hive/ql/plan/JoinCondDesc.java       |  14 +
 .../clientpositive/spark/auto_join30.q.out      |  51 +-
 .../spark/auto_smb_mapjoin_14.q.out             |  30 +-
 .../clientpositive/spark/groupby10.q.out        |  20 +
 .../clientpositive/spark/groupby7_map.q.out     |  23 +-
 .../spark/groupby7_map_skew.q.out               |  38 +-
 .../clientpositive/spark/groupby7_noskew.q.out  |  17 +-
 .../groupby7_noskew_multi_single_reducer.q.out  |  18 +-
 .../results/clientpositive/spark/groupby8.q.out |  36 ++
 .../spark/groupby8_map_skew.q.out               |  22 +-
 .../clientpositive/spark/insert_into3.q.out     |  33 +-
 .../results/clientpositive/spark/join22.q.out   |  19 +-
 .../clientpositive/spark/skewjoinopt11.q.out    |  60 +--
 .../results/clientpositive/spark/union28.q.out  |  23 +-
 .../results/clientpositive/spark/union3.q.out   |  45 +-
 .../results/clientpositive/spark/union30.q.out  |  23 +-
 .../clientpositive/spark/union_remove_1.q.out   |  23 +-
 .../clientpositive/spark/union_remove_15.q.out  |  23 +-
 .../clientpositive/spark/union_remove_16.q.out  |  23 +-
 .../clientpositive/spark/union_remove_18.q.out  |  23 +-
 .../clientpositive/spark/union_remove_19.q.out  |  72 +--
 .../clientpositive/spark/union_remove_20.q.out  |  23 +-
 .../clientpositive/spark/union_remove_21.q.out  |  23 +-
 .../clientpositive/spark/union_remove_22.q.out  |  46 +-
 .../clientpositive/spark/union_remove_24.q.out  |  23 +-
 .../clientpositive/spark/union_remove_25.q.out  |  59 +--
 .../clientpositive/spark/union_remove_4.q.out   |  23 +-
 .../clientpositive/spark/union_remove_6.q.out   |  23 +-
 .../spark/union_remove_6_subq.q.out             |  64 +--
 .../clientpositive/spark/union_remove_7.q.out   |  23 +-
 .../clientpositive/spark/union_top_level.q.out  |  59 +--
 34 files changed, 908 insertions(+), 891 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/9e13be3e/ql/src/java/org/apache/hadoop/hive/ql/optimizer/OperatorComparatorFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/OperatorComparatorFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/OperatorComparatorFactory.java
new file mode 100644
index 0000000..c6a43d9
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/OperatorComparatorFactory.java
@@ -0,0 +1,492 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.optimizer;
+
+import java.util.List;
+import java.util.Map;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import org.apache.hadoop.hive.ql.exec.CollectOperator;
+import org.apache.hadoop.hive.ql.exec.CommonMergeJoinOperator;
+import org.apache.hadoop.hive.ql.exec.DemuxOperator;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.exec.FilterOperator;
+import org.apache.hadoop.hive.ql.exec.ForwardOperator;
+import org.apache.hadoop.hive.ql.exec.GroupByOperator;
+import org.apache.hadoop.hive.ql.exec.HashTableSinkOperator;
+import org.apache.hadoop.hive.ql.exec.JoinOperator;
+import org.apache.hadoop.hive.ql.exec.LateralViewForwardOperator;
+import org.apache.hadoop.hive.ql.exec.LateralViewJoinOperator;
+import org.apache.hadoop.hive.ql.exec.LimitOperator;
+import org.apache.hadoop.hive.ql.exec.ListSinkOperator;
+import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
+import org.apache.hadoop.hive.ql.exec.MuxOperator;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.PTFOperator;
+import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.SMBMapJoinOperator;
+import org.apache.hadoop.hive.ql.exec.ScriptOperator;
+import org.apache.hadoop.hive.ql.exec.SelectOperator;
+import org.apache.hadoop.hive.ql.exec.SparkHashTableSinkOperator;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
+import org.apache.hadoop.hive.ql.exec.TemporaryHashSinkOperator;
+import org.apache.hadoop.hive.ql.exec.UDTFOperator;
+import org.apache.hadoop.hive.ql.exec.UnionOperator;
+import org.apache.hadoop.hive.ql.exec.vector.VectorFilterOperator;
+import org.apache.hadoop.hive.ql.exec.vector.VectorGroupByOperator;
+import org.apache.hadoop.hive.ql.exec.vector.VectorLimitOperator;
+import org.apache.hadoop.hive.ql.exec.vector.VectorSelectOperator;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
+import org.apache.hadoop.hive.ql.plan.FilterDesc;
+import org.apache.hadoop.hive.ql.plan.GroupByDesc;
+import org.apache.hadoop.hive.ql.plan.HashTableSinkDesc;
+import org.apache.hadoop.hive.ql.plan.JoinDesc;
+import org.apache.hadoop.hive.ql.plan.LateralViewJoinDesc;
+import org.apache.hadoop.hive.ql.plan.LimitDesc;
+import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
+import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
+import org.apache.hadoop.hive.ql.plan.SMBJoinDesc;
+import org.apache.hadoop.hive.ql.plan.ScriptDesc;
+import org.apache.hadoop.hive.ql.plan.SelectDesc;
+import org.apache.hadoop.hive.ql.plan.SparkHashTableSinkDesc;
+import org.apache.hadoop.hive.ql.plan.TableScanDesc;
+import org.apache.hadoop.hive.ql.plan.UDTFDesc;
+
+public class OperatorComparatorFactory {
+  private static final Map<Class<?>, OperatorComparator> comparatorMapping = Maps.newHashMap();
+
+  static {
+    comparatorMapping.put(TableScanOperator.class, new TableScanOperatorComparator());
+    comparatorMapping.put(SelectOperator.class, new SelectOperatorComparator());
+    comparatorMapping.put(FilterOperator.class, new FilterOperatorComparator());
+    comparatorMapping.put(GroupByOperator.class, new GroupByOperatorComparator());
+    comparatorMapping.put(ReduceSinkOperator.class, new ReduceSinkOperatorComparator());
+    comparatorMapping.put(FileSinkOperator.class, new FileSinkOperatorComparator());
+    comparatorMapping.put(JoinOperator.class, new JoinOperatorComparator());
+    comparatorMapping.put(MapJoinOperator.class, new MapJoinOperatorComparator());
+    comparatorMapping.put(SMBMapJoinOperator.class, new SMBMapJoinOperatorComparator());
+    comparatorMapping.put(LimitOperator.class, new LimitOperatorComparator());
+    comparatorMapping.put(SparkHashTableSinkOperator.class, new SparkHashTableSinkOperatorComparator());
+    comparatorMapping.put(LateralViewJoinOperator.class, new LateralViewJoinOperatorComparator());
+    comparatorMapping.put(VectorGroupByOperator.class, new GroupByOperatorComparator());
+    comparatorMapping.put(CommonMergeJoinOperator.class, new MapJoinOperatorComparator());
+    comparatorMapping.put(VectorFilterOperator.class, new FilterOperatorComparator());
+    comparatorMapping.put(UDTFOperator.class, new UDTFOperatorComparator());
+    comparatorMapping.put(VectorSelectOperator.class, new SelectOperatorComparator());
+    comparatorMapping.put(VectorLimitOperator.class, new LimitOperatorComparator());
+    comparatorMapping.put(ScriptOperator.class, new ScriptOperatorComparator());
+    comparatorMapping.put(TemporaryHashSinkOperator.class, new HashTableSinkOperatorComparator());
+    // these operators does not have state, so they always equal with the same kind.
+    comparatorMapping.put(UnionOperator.class, new AlwaysTrueOperatorComparator());
+    comparatorMapping.put(ForwardOperator.class, new AlwaysTrueOperatorComparator());
+    comparatorMapping.put(LateralViewForwardOperator.class, new AlwaysTrueOperatorComparator());
+    comparatorMapping.put(DemuxOperator.class, new AlwaysTrueOperatorComparator());
+    comparatorMapping.put(MuxOperator.class, new AlwaysTrueOperatorComparator());
+    comparatorMapping.put(ListSinkOperator.class, new AlwaysTrueOperatorComparator());
+    comparatorMapping.put(CollectOperator.class, new AlwaysTrueOperatorComparator());
+    // do not support PTFOperator comparing now.
+    comparatorMapping.put(PTFOperator.class, new AlwaysFalseOperatorComparator());
+  }
+
+  public static OperatorComparator getOperatorComparator(Class<? extends Operator> operatorClass) {
+    OperatorComparator operatorComparator = comparatorMapping.get(operatorClass);
+    if (operatorComparator == null) {
+      throw new RuntimeException("No OperatorComparator is registered for " + operatorClass.getName() + "yet.");
+    }
+
+    return operatorComparator;
+  }
+
+  public interface OperatorComparator<T extends Operator<?>> {
+    boolean equals(T op1, T op2);
+  }
+
+  static class AlwaysTrueOperatorComparator implements OperatorComparator<Operator<?>> {
+
+    @Override
+    public boolean equals(Operator<?> op1, Operator<?> op2) {
+      Preconditions.checkNotNull(op1);
+      Preconditions.checkNotNull(op2);
+      return true;
+    }
+  }
+
+  static class AlwaysFalseOperatorComparator implements OperatorComparator<Operator<?>> {
+
+    @Override
+    public boolean equals(Operator<?> op1, Operator<?> op2) {
+      Preconditions.checkNotNull(op1);
+      Preconditions.checkNotNull(op2);
+      return false;
+    }
+  }
+
+  static class TableScanOperatorComparator implements OperatorComparator<TableScanOperator> {
+
+    @Override
+    public boolean equals(TableScanOperator op1, TableScanOperator op2) {
+      Preconditions.checkNotNull(op1);
+      Preconditions.checkNotNull(op2);
+      TableScanDesc op1Conf = op1.getConf();
+      TableScanDesc op2Conf = op2.getConf();
+
+      if (compareString(op1Conf.getAlias(), op2Conf.getAlias()) &&
+        compareExprNodeDesc(op1Conf.getFilterExpr(), op2Conf.getFilterExpr()) &&
+        op1Conf.getRowLimit() == op2Conf.getRowLimit() &&
+        op1Conf.isGatherStats() == op2Conf.isGatherStats()) {
+        return true;
+      } else {
+        return false;
+      }
+    }
+  }
+
+  static class SelectOperatorComparator implements OperatorComparator<SelectOperator> {
+
+    @Override
+    public boolean equals(SelectOperator op1, SelectOperator op2) {
+      Preconditions.checkNotNull(op1);
+      Preconditions.checkNotNull(op2);
+      SelectDesc op1Conf = op1.getConf();
+      SelectDesc op2Conf = op2.getConf();
+
+      if (compareString(op1Conf.getColListString(), op2Conf.getColListString()) &&
+        compareObject(op1Conf.getOutputColumnNames(), op2Conf.getOutputColumnNames()) &&
+        compareString(op1Conf.explainNoCompute(), op2Conf.explainNoCompute())) {
+        return true;
+      } else {
+        return false;
+      }
+    }
+  }
+
+  static class FilterOperatorComparator implements OperatorComparator<FilterOperator> {
+
+    @Override
+    public boolean equals(FilterOperator op1, FilterOperator op2) {
+      Preconditions.checkNotNull(op1);
+      Preconditions.checkNotNull(op2);
+      FilterDesc op1Conf = op1.getConf();
+      FilterDesc op2Conf = op2.getConf();
+
+      if (compareString(op1Conf.getPredicateString(), op2Conf.getPredicateString()) &&
+        (op1Conf.getIsSamplingPred() == op2Conf.getIsSamplingPred()) &&
+        compareString(op1Conf.getSampleDescExpr(), op2Conf.getSampleDescExpr())) {
+        return true;
+      } else {
+        return false;
+      }
+    }
+  }
+
+  static class GroupByOperatorComparator implements OperatorComparator<GroupByOperator> {
+
+    @Override
+    public boolean equals(GroupByOperator op1, GroupByOperator op2) {
+      Preconditions.checkNotNull(op1);
+      Preconditions.checkNotNull(op2);
+      GroupByDesc op1Conf = op1.getConf();
+      GroupByDesc op2Conf = op2.getConf();
+
+      if (compareString(op1Conf.getModeString(), op2Conf.getModeString()) &&
+        compareString(op1Conf.getKeyString(), op2Conf.getKeyString()) &&
+        compareObject(op1Conf.getOutputColumnNames(), op2Conf.getOutputColumnNames()) &&
+        op1Conf.pruneGroupingSetId() == op2Conf.pruneGroupingSetId() &&
+        compareObject(op1Conf.getAggregatorStrings(), op2Conf.getAggregatorStrings()) &&
+        op1Conf.getBucketGroup() == op2Conf.getBucketGroup()) {
+        return true;
+      } else {
+        return false;
+      }
+    }
+  }
+
+  static class ReduceSinkOperatorComparator implements OperatorComparator<ReduceSinkOperator> {
+
+    @Override
+    public boolean equals(ReduceSinkOperator op1, ReduceSinkOperator op2) {
+      Preconditions.checkNotNull(op1);
+      Preconditions.checkNotNull(op2);
+      ReduceSinkDesc op1Conf = op1.getConf();
+      ReduceSinkDesc op2Conf = op2.getConf();
+
+      if (compareExprNodeDescList(op1Conf.getKeyCols(), op2Conf.getKeyCols()) &&
+        compareExprNodeDescList(op1Conf.getValueCols(), op2Conf.getValueCols()) &&
+        compareExprNodeDescList(op1Conf.getPartitionCols(), op2Conf.getPartitionCols()) &&
+        op1Conf.getTag() == op2Conf.getTag() &&
+        compareString(op1Conf.getOrder(), op2Conf.getOrder()) &&
+        op1Conf.getTopN() == op2Conf.getTopN() &&
+        op1Conf.isAutoParallel() == op2Conf.isAutoParallel()) {
+        return true;
+      } else {
+        return false;
+      }
+    }
+  }
+
+  static class FileSinkOperatorComparator implements OperatorComparator<FileSinkOperator> {
+
+    @Override
+    public boolean equals(FileSinkOperator op1, FileSinkOperator op2) {
+      Preconditions.checkNotNull(op1);
+      Preconditions.checkNotNull(op2);
+      FileSinkDesc op1Conf = op1.getConf();
+      FileSinkDesc op2Conf = op2.getConf();
+
+      if (compareObject(op1Conf.getDirName(), op2Conf.getDirName()) &&
+        compareObject(op1Conf.getTableInfo(), op2Conf.getTableInfo()) &&
+        op1Conf.getCompressed() == op2Conf.getCompressed() &&
+        op1Conf.getDestTableId() == op2Conf.getDestTableId() &&
+        op1Conf.isMultiFileSpray() == op2Conf.isMultiFileSpray() &&
+        op1Conf.getTotalFiles() == op2Conf.getTotalFiles() &&
+        op1Conf.getNumFiles() == op2Conf.getNumFiles() &&
+        compareString(op1Conf.getStaticSpec(), op2Conf.getStaticSpec()) &&
+        op1Conf.isGatherStats() == op2Conf.isGatherStats() &&
+        compareString(op1Conf.getStatsAggPrefix(), op2Conf.getStatsAggPrefix())) {
+        return true;
+      } else {
+        return false;
+      }
+    }
+  }
+
+  static class JoinOperatorComparator implements OperatorComparator<JoinOperator> {
+
+    @Override
+    public boolean equals(JoinOperator op1, JoinOperator op2) {
+      Preconditions.checkNotNull(op1);
+      Preconditions.checkNotNull(op2);
+      JoinDesc desc1 = op1.getConf();
+      JoinDesc desc2 = op2.getConf();
+
+      if (compareObject(desc1.getKeysString(), desc2.getKeysString()) &&
+        compareObject(desc1.getFiltersStringMap(), desc2.getFiltersStringMap()) &&
+        compareObject(desc1.getOutputColumnNames(), desc2.getOutputColumnNames()) &&
+        compareObject(desc1.getCondsList(), desc2.getCondsList()) &&
+        desc1.getHandleSkewJoin() == desc2.getHandleSkewJoin() &&
+        compareString(desc1.getNullSafeString(), desc2.getNullSafeString())) {
+        return true;
+      } else {
+        return false;
+      }
+    }
+  }
+
+  static class MapJoinOperatorComparator implements OperatorComparator<MapJoinOperator> {
+
+    @Override
+    public boolean equals(MapJoinOperator op1, MapJoinOperator op2) {
+      Preconditions.checkNotNull(op1);
+      Preconditions.checkNotNull(op2);
+      MapJoinDesc desc1 = op1.getConf();
+      MapJoinDesc desc2 = op2.getConf();
+
+      if (compareObject(desc1.getParentToInput(), desc2.getParentToInput()) &&
+        compareString(desc1.getKeyCountsExplainDesc(), desc2.getKeyCountsExplainDesc()) &&
+        compareObject(desc1.getKeysString(), desc2.getKeysString()) &&
+        desc1.getPosBigTable() == desc2.getPosBigTable() &&
+        desc1.isBucketMapJoin() == desc2.isBucketMapJoin() &&
+        compareObject(desc1.getKeysString(), desc2.getKeysString()) &&
+        compareObject(desc1.getFiltersStringMap(), desc2.getFiltersStringMap()) &&
+        compareObject(desc1.getOutputColumnNames(), desc2.getOutputColumnNames()) &&
+        compareObject(desc1.getCondsList(), desc2.getCondsList()) &&
+        desc1.getHandleSkewJoin() == desc2.getHandleSkewJoin() &&
+        compareString(desc1.getNullSafeString(), desc2.getNullSafeString())) {
+        return true;
+      } else {
+        return false;
+      }
+    }
+  }
+
+  static class SMBMapJoinOperatorComparator implements OperatorComparator<SMBMapJoinOperator> {
+
+    @Override
+    public boolean equals(SMBMapJoinOperator op1, SMBMapJoinOperator op2) {
+      Preconditions.checkNotNull(op1);
+      Preconditions.checkNotNull(op2);
+      SMBJoinDesc desc1 = op1.getConf();
+      SMBJoinDesc desc2 = op2.getConf();
+
+      if (compareObject(desc1.getParentToInput(), desc2.getParentToInput()) &&
+        compareString(desc1.getKeyCountsExplainDesc(), desc2.getKeyCountsExplainDesc()) &&
+        compareObject(desc1.getKeysString(), desc2.getKeysString()) &&
+        desc1.getPosBigTable() == desc2.getPosBigTable() &&
+        desc1.isBucketMapJoin() == desc2.isBucketMapJoin() &&
+        compareObject(desc1.getKeysString(), desc2.getKeysString()) &&
+        compareObject(desc1.getFiltersStringMap(), desc2.getFiltersStringMap()) &&
+        compareObject(desc1.getOutputColumnNames(), desc2.getOutputColumnNames()) &&
+        compareObject(desc1.getCondsList(), desc2.getCondsList()) &&
+        desc1.getHandleSkewJoin() == desc2.getHandleSkewJoin() &&
+        compareString(desc1.getNullSafeString(), desc2.getNullSafeString())) {
+        return true;
+      } else {
+        return false;
+      }
+    }
+  }
+
+  static class LimitOperatorComparator implements OperatorComparator<LimitOperator> {
+
+    @Override
+    public boolean equals(LimitOperator op1, LimitOperator op2) {
+      Preconditions.checkNotNull(op1);
+      Preconditions.checkNotNull(op2);
+      LimitDesc desc1 = op1.getConf();
+      LimitDesc desc2 = op2.getConf();
+
+      return desc1.getLimit() == desc2.getLimit();
+    }
+  }
+
+  static class SparkHashTableSinkOperatorComparator implements OperatorComparator<SparkHashTableSinkOperator> {
+
+    @Override
+    public boolean equals(SparkHashTableSinkOperator op1, SparkHashTableSinkOperator op2) {
+      Preconditions.checkNotNull(op1);
+      Preconditions.checkNotNull(op2);
+      SparkHashTableSinkDesc desc1 = op1.getConf();
+      SparkHashTableSinkDesc desc2 = op2.getConf();
+
+      if (compareObject(desc1.getFilterMapString(), desc2.getFilterMapString()) &&
+        compareObject(desc1.getKeysString(), desc2.getKeysString()) &&
+        desc1.getPosBigTable() == desc2.getPosBigTable() &&
+        compareObject(desc1.getKeysString(), desc2.getKeysString()) &&
+        compareObject(desc1.getFiltersStringMap(), desc2.getFiltersStringMap()) &&
+        compareObject(desc1.getOutputColumnNames(), desc2.getOutputColumnNames()) &&
+        compareObject(desc1.getCondsList(), desc2.getCondsList()) &&
+        desc1.getHandleSkewJoin() == desc2.getHandleSkewJoin() &&
+        compareString(desc1.getNullSafeString(), desc2.getNullSafeString())) {
+        return true;
+      } else {
+        return false;
+      }
+    }
+  }
+
+  static class HashTableSinkOperatorComparator implements OperatorComparator<HashTableSinkOperator> {
+
+    @Override
+    public boolean equals(HashTableSinkOperator op1, HashTableSinkOperator op2) {
+      Preconditions.checkNotNull(op1);
+      Preconditions.checkNotNull(op2);
+      HashTableSinkDesc desc1 = op1.getConf();
+      HashTableSinkDesc desc2 = op2.getConf();
+
+      if (compareObject(desc1.getFilterMapString(), desc2.getFilterMapString()) &&
+        compareObject(desc1.getKeysString(), desc2.getKeysString()) &&
+        desc1.getPosBigTable() == desc2.getPosBigTable() &&
+        compareObject(desc1.getKeysString(), desc2.getKeysString()) &&
+        compareObject(desc1.getFiltersStringMap(), desc2.getFiltersStringMap()) &&
+        compareObject(desc1.getOutputColumnNames(), desc2.getOutputColumnNames()) &&
+        compareObject(desc1.getCondsList(), desc2.getCondsList()) &&
+        desc1.getHandleSkewJoin() == desc2.getHandleSkewJoin() &&
+        compareString(desc1.getNullSafeString(), desc2.getNullSafeString())) {
+        return true;
+      } else {
+        return false;
+      }
+    }
+  }
+
+  static class LateralViewJoinOperatorComparator implements OperatorComparator<LateralViewJoinOperator> {
+
+    @Override
+    public boolean equals(LateralViewJoinOperator op1, LateralViewJoinOperator op2) {
+      Preconditions.checkNotNull(op1);
+      Preconditions.checkNotNull(op2);
+      LateralViewJoinDesc desc1 = op1.getConf();
+      LateralViewJoinDesc desc2 = op2.getConf();
+
+      return compareObject(desc1.getOutputInternalColNames(), desc2.getOutputInternalColNames());
+    }
+  }
+
+  static class ScriptOperatorComparator implements OperatorComparator<ScriptOperator> {
+
+    @Override
+    public boolean equals(ScriptOperator op1, ScriptOperator op2) {
+      Preconditions.checkNotNull(op1);
+      Preconditions.checkNotNull(op2);
+      ScriptDesc desc1 = op1.getConf();
+      ScriptDesc desc2 = op2.getConf();
+
+      if (compareString(desc1.getScriptCmd(), desc2.getScriptCmd()) &&
+        compareObject(desc1.getScriptOutputInfo(), desc2.getScriptOutputInfo())) {
+        return true;
+      } else {
+        return false;
+      }
+    }
+  }
+
+  static class UDTFOperatorComparator implements OperatorComparator<UDTFOperator> {
+
+    @Override
+    public boolean equals(UDTFOperator op1, UDTFOperator op2) {
+      Preconditions.checkNotNull(op1);
+      Preconditions.checkNotNull(op2);
+      UDTFDesc desc1 = op1.getConf();
+      UDTFDesc desc2 = op2.getConf();
+
+      if (compareString(desc1.getUDTFName(), desc2.getUDTFName()) &&
+        compareString(desc1.isOuterLateralView(), desc2.isOuterLateralView())) {
+        return true;
+      } else {
+        return false;
+      }
+    }
+  }
+
+  static boolean compareString(String first, String second) {
+    return compareObject(first, second);
+  }
+
+  /*
+   * Compare Objects which implements its own meaningful equals methods.
+   */
+  static boolean compareObject(Object first, Object second) {
+    return first == null ? second == null : first.equals(second);
+  }
+
+  static boolean compareExprNodeDesc(ExprNodeDesc first, ExprNodeDesc second) {
+    return first == null ? second == null : first.isSame(second);
+  }
+
+  static boolean compareExprNodeDescList(List<ExprNodeDesc> first, List<ExprNodeDesc> second) {
+    if (first == null && second == null) {
+      return true;
+    }
+    if ((first == null && second != null) || (first != null && second == null)) {
+      return false;
+    }
+    if (first.size() != second.size()) {
+      return false;
+    } else {
+      for (int i = 0; i < first.size(); i++) {
+        if (!first.get(i).isSame(second.get(i))) {
+          return false;
+        }
+      }
+    }
+    return true;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/9e13be3e/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/CombineEquivalentWorkResolver.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/CombineEquivalentWorkResolver.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/CombineEquivalentWorkResolver.java
new file mode 100644
index 0000000..b7c57e8
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/CombineEquivalentWorkResolver.java
@@ -0,0 +1,302 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.optimizer.spark;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.Stack;
+
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.spark.SparkTask;
+import org.apache.hadoop.hive.ql.lib.Dispatcher;
+import org.apache.hadoop.hive.ql.lib.Node;
+import org.apache.hadoop.hive.ql.lib.TaskGraphWalker;
+import org.apache.hadoop.hive.ql.optimizer.OperatorComparatorFactory;
+import org.apache.hadoop.hive.ql.optimizer.physical.PhysicalContext;
+import org.apache.hadoop.hive.ql.optimizer.physical.PhysicalPlanResolver;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.BaseWork;
+import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.SparkEdgeProperty;
+import org.apache.hadoop.hive.ql.plan.SparkWork;
+
+
+/**
+ * CombineEquivalentWorkResolver would search inside SparkWork, find and combine equivalent
+ * works.
+ */
+public class CombineEquivalentWorkResolver implements PhysicalPlanResolver {
+  protected static transient Log LOG = LogFactory.getLog(CombineEquivalentWorkResolver.class);
+
+  @Override
+  public PhysicalContext resolve(PhysicalContext pctx) throws SemanticException {
+    List<Node> topNodes = new ArrayList<Node>();
+    topNodes.addAll(pctx.getRootTasks());
+    TaskGraphWalker taskWalker = new TaskGraphWalker(new EquivalentWorkMatcher());
+    HashMap<Node, Object> nodeOutput = Maps.newHashMap();
+    taskWalker.startWalking(topNodes, nodeOutput);
+    return pctx;
+  }
+
+  class EquivalentWorkMatcher implements Dispatcher {
+    private Comparator<BaseWork> baseWorkComparator = new Comparator<BaseWork>() {
+      @Override
+      public int compare(BaseWork o1, BaseWork o2) {
+        return o1.getName().compareTo(o2.getName());
+      }
+    };
+
+    @Override
+    public Object dispatch(Node nd, Stack<Node> stack, Object... nodeOutputs) throws SemanticException {
+      if (nd instanceof SparkTask) {
+        SparkTask sparkTask = (SparkTask) nd;
+        SparkWork sparkWork = sparkTask.getWork();
+        Set<BaseWork> roots = sparkWork.getRoots();
+        compareWorksRecursively(roots, sparkWork);
+      }
+      return null;
+    }
+
+    private void compareWorksRecursively(Set<BaseWork> works, SparkWork sparkWork) {
+      // find out all equivalent works in the Set.
+      Set<Set<BaseWork>> equivalentWorks = compareChildWorks(works, sparkWork);
+      // combine equivalent work into single one in SparkWork's work graph.
+      Set<BaseWork> removedWorks = combineEquivalentWorks(equivalentWorks, sparkWork);
+
+      // try to combine next level works recursively.
+      for (BaseWork work : works) {
+        if (!removedWorks.contains(work)) {
+          Set<BaseWork> children = Sets.newHashSet();
+          children.addAll(sparkWork.getChildren(work));
+          if (children.size() > 0) {
+            compareWorksRecursively(children, sparkWork);
+          }
+        }
+      }
+    }
+
+    private Set<Set<BaseWork>> compareChildWorks(Set<BaseWork> children, SparkWork sparkWork) {
+      Set<Set<BaseWork>> equivalentChildren = Sets.newHashSet();
+      if (children.size() > 1) {
+        for (BaseWork work : children) {
+          boolean assigned = false;
+          for (Set<BaseWork> set : equivalentChildren) {
+            if (belongToSet(set, work, sparkWork)) {
+              set.add(work);
+              assigned = true;
+              break;
+            }
+          }
+          if (!assigned) {
+            // sort the works so that we get consistent query plan for multi executions(for test verification).
+            Set<BaseWork> newSet = Sets.newTreeSet(baseWorkComparator);
+            newSet.add(work);
+            equivalentChildren.add(newSet);
+          }
+        }
+      }
+      return equivalentChildren;
+    }
+
+    private boolean belongToSet(Set<BaseWork> set, BaseWork work, SparkWork sparkWork) {
+      if (set.isEmpty()) {
+        return true;
+      } else if (compareWork(set.iterator().next(), work, sparkWork)) {
+        return true;
+      }
+      return false;
+    }
+
+    private Set<BaseWork> combineEquivalentWorks(Set<Set<BaseWork>> equivalentWorks, SparkWork sparkWork) {
+      Set<BaseWork> removedWorks = Sets.newHashSet();
+      for (Set<BaseWork> workSet : equivalentWorks) {
+        if (workSet.size() > 1) {
+          Iterator<BaseWork> iterator = workSet.iterator();
+          BaseWork first = iterator.next();
+          while (iterator.hasNext()) {
+            BaseWork next = iterator.next();
+            replaceWork(next, first, sparkWork);
+            removedWorks.add(next);
+          }
+        }
+      }
+      return removedWorks;
+    }
+
+    private void replaceWork(BaseWork previous, BaseWork current, SparkWork sparkWork) {
+      updateReference(previous, current, sparkWork);
+      List<BaseWork> parents = sparkWork.getParents(previous);
+      List<BaseWork> children = sparkWork.getChildren(previous);
+      if (parents != null) {
+        for (BaseWork parent : parents) {
+          // we do not need to connect its parent to its counterpart, as they have the same parents.
+          sparkWork.disconnect(parent, previous);
+        }
+      }
+      if (children != null) {
+        for (BaseWork child : children) {
+          SparkEdgeProperty edgeProperty = sparkWork.getEdgeProperty(previous, child);
+          sparkWork.disconnect(previous, child);
+          sparkWork.connect(current, child, edgeProperty);
+        }
+      }
+      sparkWork.remove(previous);
+    }
+
+    /*
+    * update the Work name which referred by Operators in following Works.
+    */
+    private void updateReference(BaseWork previous, BaseWork current, SparkWork sparkWork) {
+      String previousName = previous.getName();
+      String currentName = current.getName();
+      List<BaseWork> children = sparkWork.getAllWork();
+      for (BaseWork child : children) {
+        Set<Operator<?>> allOperators = child.getAllOperators();
+        for (Operator<?> operator : allOperators) {
+          if (operator instanceof MapJoinOperator) {
+            MapJoinDesc mapJoinDesc = ((MapJoinOperator) operator).getConf();
+            Map<Integer, String> parentToInput = mapJoinDesc.getParentToInput();
+            for (Integer id : parentToInput.keySet()) {
+              String parent = parentToInput.get(id);
+              if (parent.equals(previousName)) {
+                parentToInput.put(id, currentName);
+              }
+            }
+          }
+        }
+      }
+    }
+
+    private boolean compareWork(BaseWork first, BaseWork second, SparkWork sparkWork) {
+
+      if (!first.getClass().getName().equals(second.getClass().getName())) {
+        return false;
+      }
+
+      if (!hasSameParent(first, second, sparkWork)) {
+        return false;
+      }
+
+      // leave work's output may be read in further SparkWork/FetchWork, we should not combine
+      // leave works without notifying further SparkWork/FetchWork.
+      if (sparkWork.getLeaves().contains(first) && sparkWork.getLeaves().contains(second)) {
+        return false;
+      }
+
+      // If these two Works share the same child, we can not combine them as SparkPlan does not
+      // support multi edge between two Works.
+      List<BaseWork> firstChildren = sparkWork.getChildren(first);
+      List<BaseWork> secondChildren = sparkWork.getChildren(second);
+      for (BaseWork child : firstChildren) {
+        if (secondChildren.contains(child)) {
+          return false;
+        }
+      }
+
+      Set<Operator<?>> firstRootOperators = first.getAllRootOperators();
+      Set<Operator<?>> secondRootOperators = second.getAllRootOperators();
+      if (firstRootOperators.size() != secondRootOperators.size()) {
+        return false;
+      }
+
+      Iterator<Operator<?>> firstIterator = firstRootOperators.iterator();
+      Iterator<Operator<?>> secondIterator = secondRootOperators.iterator();
+      while (firstIterator.hasNext()) {
+        boolean result = compareOperatorChain(firstIterator.next(), secondIterator.next());
+        if (!result) {
+          return result;
+        }
+      }
+
+      return true;
+    }
+
+    private boolean hasSameParent(BaseWork first, BaseWork second, SparkWork sparkWork) {
+      boolean result = true;
+      List<BaseWork> firstParents = sparkWork.getParents(first);
+      List<BaseWork> secondParents = sparkWork.getParents(second);
+      if (firstParents.size() != secondParents.size()) {
+        result = false;
+      }
+      for (BaseWork parent : firstParents) {
+        if (!secondParents.contains(parent)) {
+          result = false;
+          break;
+        }
+      }
+      return result;
+    }
+
+    private boolean compareOperatorChain(Operator<?> firstOperator, Operator<?> secondOperator) {
+      boolean result = compareCurrentOperator(firstOperator, secondOperator);
+      if (!result) {
+        return result;
+      }
+
+      List<Operator<? extends OperatorDesc>> firstOperatorChildOperators = firstOperator.getChildOperators();
+      List<Operator<? extends OperatorDesc>> secondOperatorChildOperators = secondOperator.getChildOperators();
+      if (firstOperatorChildOperators == null && secondOperatorChildOperators != null) {
+        return false;
+      } else if (firstOperatorChildOperators != null && secondOperatorChildOperators == null) {
+        return false;
+      } else if (firstOperatorChildOperators != null && secondOperatorChildOperators != null) {
+        if (firstOperatorChildOperators.size() != secondOperatorChildOperators.size()) {
+          return false;
+        }
+        int size = firstOperatorChildOperators.size();
+        for (int i = 0; i < size; i++) {
+          result = compareOperatorChain(firstOperatorChildOperators.get(i), secondOperatorChildOperators.get(i));
+          if (!result) {
+            return false;
+          }
+        }
+      }
+
+      return true;
+    }
+
+    /**
+     * Compare Operators through their Explain output string.
+     *
+     * @param firstOperator
+     * @param secondOperator
+     * @return
+     */
+    private boolean compareCurrentOperator(Operator<?> firstOperator, Operator<?> secondOperator) {
+      if (!firstOperator.getClass().getName().equals(secondOperator.getClass().getName())) {
+        return false;
+      }
+
+      OperatorComparatorFactory.OperatorComparator operatorComparator =
+        OperatorComparatorFactory.getOperatorComparator(firstOperator.getClass());
+      return operatorComparator.equals(firstOperator, secondOperator);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/9e13be3e/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java
index 19aae70..7f2c079 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkCompiler.java
@@ -66,6 +66,7 @@ import org.apache.hadoop.hive.ql.optimizer.physical.SparkCrossProductCheck;
 import org.apache.hadoop.hive.ql.optimizer.physical.SparkMapJoinResolver;
 import org.apache.hadoop.hive.ql.optimizer.physical.StageIDsRearranger;
 import org.apache.hadoop.hive.ql.optimizer.physical.Vectorizer;
+import org.apache.hadoop.hive.ql.optimizer.spark.CombineEquivalentWorkResolver;
 import org.apache.hadoop.hive.ql.optimizer.spark.SetSparkReducerParallelism;
 import org.apache.hadoop.hive.ql.optimizer.spark.SparkJoinHintOptimizer;
 import org.apache.hadoop.hive.ql.optimizer.spark.SparkJoinOptimizer;
@@ -337,6 +338,8 @@ public class SparkCompiler extends TaskCompiler {
       LOG.debug("Skipping stage id rearranger");
     }
 
+    new CombineEquivalentWorkResolver().resolve(physicalCtx);
+
     PERF_LOGGER.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_OPTIMIZE_TASK_TREE);
     return;
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/9e13be3e/ql/src/java/org/apache/hadoop/hive/ql/plan/JoinCondDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/JoinCondDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/JoinCondDesc.java
index b307b16..45931b9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/JoinCondDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/JoinCondDesc.java
@@ -143,4 +143,18 @@ public class JoinCondDesc implements Serializable {
 
     return sb.toString();
   }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (!(obj instanceof JoinCondDesc)) {
+      return false;
+    }
+
+    JoinCondDesc other = (JoinCondDesc) obj;
+    if (this.type != other.type || this.left != other.left ||
+      this.right != other.right || this.preserved != other.preserved) {
+      return false;
+    }
+    return true;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/9e13be3e/ql/src/test/results/clientpositive/spark/auto_join30.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/spark/auto_join30.q.out b/ql/src/test/results/clientpositive/spark/auto_join30.q.out
index 7b5c5e7..4b67445 100644
--- a/ql/src/test/results/clientpositive/spark/auto_join30.q.out
+++ b/ql/src/test/results/clientpositive/spark/auto_join30.q.out
@@ -462,7 +462,7 @@ STAGE PLANS:
     Spark
       Edges:
         Reducer 5 <- Map 4 (PARTITION-LEVEL SORT, 2)
-        Reducer 7 <- Map 6 (PARTITION-LEVEL SORT, 2)
+        Reducer 7 <- Map 4 (PARTITION-LEVEL SORT, 2)
 #### A masked pattern was here ####
       Vertices:
         Map 4 
@@ -482,23 +482,6 @@ STAGE PLANS:
                         sort order: +
                         Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
                         value expressions: _col0 (type: string)
-        Map 6 
-            Map Operator Tree:
-                TableScan
-                  alias: src
-                  Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
-                  Filter Operator
-                    predicate: key is not null (type: boolean)
-                    Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
-                    Select Operator
-                      expressions: key (type: string), value (type: string)
-                      outputColumnNames: _col0, _col1
-                      Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
-                      Reduce Output Operator
-                        key expressions: _col1 (type: string)
-                        sort order: +
-                        Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
-                        value expressions: _col0 (type: string)
         Reducer 5 
             Local Work:
               Map Reduce Local Work
@@ -657,7 +640,7 @@ STAGE PLANS:
     Spark
       Edges:
         Reducer 5 <- Map 4 (PARTITION-LEVEL SORT, 2)
-        Reducer 7 <- Map 6 (PARTITION-LEVEL SORT, 2)
+        Reducer 7 <- Map 4 (PARTITION-LEVEL SORT, 2)
 #### A masked pattern was here ####
       Vertices:
         Map 4 
@@ -674,20 +657,6 @@ STAGE PLANS:
                       sort order: +
                       Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
                       value expressions: _col0 (type: string)
-        Map 6 
-            Map Operator Tree:
-                TableScan
-                  alias: src
-                  Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
-                  Select Operator
-                    expressions: key (type: string), value (type: string)
-                    outputColumnNames: _col0, _col1
-                    Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
-                    Reduce Output Operator
-                      key expressions: _col1 (type: string)
-                      sort order: +
-                      Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
-                      value expressions: _col0 (type: string)
         Reducer 5 
             Local Work:
               Map Reduce Local Work
@@ -843,7 +812,7 @@ STAGE PLANS:
     Spark
       Edges:
         Reducer 5 <- Map 4 (PARTITION-LEVEL SORT, 2)
-        Reducer 7 <- Map 6 (PARTITION-LEVEL SORT, 2)
+        Reducer 7 <- Map 4 (PARTITION-LEVEL SORT, 2)
 #### A masked pattern was here ####
       Vertices:
         Map 4 
@@ -860,20 +829,6 @@ STAGE PLANS:
                       sort order: +
                       Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
                       value expressions: _col0 (type: string)
-        Map 6 
-            Map Operator Tree:
-                TableScan
-                  alias: src
-                  Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
-                  Select Operator
-                    expressions: key (type: string), value (type: string)
-                    outputColumnNames: _col0, _col1
-                    Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
-                    Reduce Output Operator
-                      key expressions: _col1 (type: string)
-                      sort order: +
-                      Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
-                      value expressions: _col0 (type: string)
         Reducer 5 
             Local Work:
               Map Reduce Local Work

http://git-wip-us.apache.org/repos/asf/hive/blob/9e13be3e/ql/src/test/results/clientpositive/spark/auto_smb_mapjoin_14.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/spark/auto_smb_mapjoin_14.q.out b/ql/src/test/results/clientpositive/spark/auto_smb_mapjoin_14.q.out
index 8a43d78..714d098 100644
--- a/ql/src/test/results/clientpositive/spark/auto_smb_mapjoin_14.q.out
+++ b/ql/src/test/results/clientpositive/spark/auto_smb_mapjoin_14.q.out
@@ -299,7 +299,7 @@ STAGE PLANS:
     Spark
       Edges:
         Reducer 2 <- Map 1 (GROUP, 1)
-        Reducer 5 <- Map 4 (GROUP, 1)
+        Reducer 5 <- Map 1 (GROUP, 1)
         Reducer 3 <- Reducer 2 (PARTITION-LEVEL SORT, 1), Reducer 5 (PARTITION-LEVEL SORT, 1)
 #### A masked pattern was here ####
       Vertices:
@@ -331,34 +331,6 @@ STAGE PLANS:
                           Map-reduce partition columns: _col0 (type: int)
                           Statistics: Num rows: 5 Data size: 38 Basic stats: COMPLETE Column stats: NONE
                           value expressions: _col1 (type: bigint)
-        Map 4 
-            Map Operator Tree:
-                TableScan
-                  alias: a
-                  Statistics: Num rows: 10 Data size: 70 Basic stats: COMPLETE Column stats: NONE
-                  Filter Operator
-                    predicate: key is not null (type: boolean)
-                    Statistics: Num rows: 5 Data size: 35 Basic stats: COMPLETE Column stats: NONE
-                    Sorted Merge Bucket Map Join Operator
-                      condition map:
-                           Inner Join 0 to 1
-                      keys:
-                        0 key (type: int)
-                        1 key (type: int)
-                      outputColumnNames: _col0
-                      Statistics: Num rows: 5 Data size: 38 Basic stats: COMPLETE Column stats: NONE
-                      Group By Operator
-                        aggregations: count()
-                        keys: _col0 (type: int)
-                        mode: hash
-                        outputColumnNames: _col0, _col1
-                        Statistics: Num rows: 5 Data size: 38 Basic stats: COMPLETE Column stats: NONE
-                        Reduce Output Operator
-                          key expressions: _col0 (type: int)
-                          sort order: +
-                          Map-reduce partition columns: _col0 (type: int)
-                          Statistics: Num rows: 5 Data size: 38 Basic stats: COMPLETE Column stats: NONE
-                          value expressions: _col1 (type: bigint)
         Reducer 2 
             Reduce Operator Tree:
               Group By Operator

http://git-wip-us.apache.org/repos/asf/hive/blob/9e13be3e/ql/src/test/results/clientpositive/spark/groupby10.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/spark/groupby10.q.out b/ql/src/test/results/clientpositive/spark/groupby10.q.out
index 9d3cf36..95d7bba 100644
--- a/ql/src/test/results/clientpositive/spark/groupby10.q.out
+++ b/ql/src/test/results/clientpositive/spark/groupby10.q.out
@@ -55,8 +55,15 @@ STAGE PLANS:
   Stage: Stage-2
     Spark
       Edges:
+<<<<<<< HEAD
         Reducer 2 <- Map 4 (GROUP PARTITION-LEVEL SORT, 2)
         Reducer 3 <- Map 5 (GROUP PARTITION-LEVEL SORT, 2)
+=======
+        Reducer 2 <- Map 6 (GROUP PARTITION-LEVEL SORT, 2)
+        Reducer 4 <- Map 6 (GROUP PARTITION-LEVEL SORT, 2)
+        Reducer 3 <- Reducer 2 (GROUP, 2)
+        Reducer 5 <- Reducer 4 (GROUP, 2)
+>>>>>>> 47f796e... HIVE-10844: Combine equivalent Works for HoS[Spark Branch] (Chengxiang via Xuefu)
 #### A masked pattern was here ####
       Vertices:
         Map 4 
@@ -73,6 +80,7 @@ STAGE PLANS:
                       sort order: ++
                       Map-reduce partition columns: key (type: int)
                       Statistics: Num rows: 2 Data size: 280 Basic stats: COMPLETE Column stats: NONE
+<<<<<<< HEAD
         Map 5 
             Map Operator Tree:
                 TableScan
@@ -87,6 +95,8 @@ STAGE PLANS:
                       sort order: ++
                       Map-reduce partition columns: key (type: int)
                       Statistics: Num rows: 2 Data size: 280 Basic stats: COMPLETE Column stats: NONE
+=======
+>>>>>>> 47f796e... HIVE-10844: Combine equivalent Works for HoS[Spark Branch] (Chengxiang via Xuefu)
         Reducer 2 
             Reduce Operator Tree:
               Group By Operator
@@ -251,8 +261,15 @@ STAGE PLANS:
   Stage: Stage-2
     Spark
       Edges:
+<<<<<<< HEAD
         Reducer 2 <- Map 4 (GROUP PARTITION-LEVEL SORT, 2)
         Reducer 3 <- Map 5 (GROUP PARTITION-LEVEL SORT, 2)
+=======
+        Reducer 2 <- Map 6 (GROUP PARTITION-LEVEL SORT, 2)
+        Reducer 4 <- Map 6 (GROUP PARTITION-LEVEL SORT, 2)
+        Reducer 3 <- Reducer 2 (GROUP, 2)
+        Reducer 5 <- Reducer 4 (GROUP, 2)
+>>>>>>> 47f796e... HIVE-10844: Combine equivalent Works for HoS[Spark Branch] (Chengxiang via Xuefu)
 #### A masked pattern was here ####
       Vertices:
         Map 4 
@@ -269,6 +286,7 @@ STAGE PLANS:
                       sort order: ++
                       Map-reduce partition columns: key (type: int)
                       Statistics: Num rows: 2 Data size: 280 Basic stats: COMPLETE Column stats: NONE
+<<<<<<< HEAD
         Map 5 
             Map Operator Tree:
                 TableScan
@@ -283,6 +301,8 @@ STAGE PLANS:
                       sort order: ++
                       Map-reduce partition columns: key (type: int)
                       Statistics: Num rows: 2 Data size: 280 Basic stats: COMPLETE Column stats: NONE
+=======
+>>>>>>> 47f796e... HIVE-10844: Combine equivalent Works for HoS[Spark Branch] (Chengxiang via Xuefu)
         Reducer 2 
             Reduce Operator Tree:
               Group By Operator

http://git-wip-us.apache.org/repos/asf/hive/blob/9e13be3e/ql/src/test/results/clientpositive/spark/groupby7_map.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/spark/groupby7_map.q.out b/ql/src/test/results/clientpositive/spark/groupby7_map.q.out
index abd6459..3b5c22a 100644
--- a/ql/src/test/results/clientpositive/spark/groupby7_map.q.out
+++ b/ql/src/test/results/clientpositive/spark/groupby7_map.q.out
@@ -40,7 +40,7 @@ STAGE PLANS:
     Spark
       Edges:
         Reducer 2 <- Map 4 (GROUP, 31)
-        Reducer 3 <- Map 5 (GROUP, 31)
+        Reducer 3 <- Map 4 (GROUP, 31)
 #### A masked pattern was here ####
       Vertices:
         Map 4 
@@ -64,27 +64,6 @@ STAGE PLANS:
                         Map-reduce partition columns: _col0 (type: string)
                         Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
                         value expressions: _col1 (type: double)
-        Map 5 
-            Map Operator Tree:
-                TableScan
-                  alias: src
-                  Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
-                  Select Operator
-                    expressions: key (type: string), value (type: string)
-                    outputColumnNames: key, value
-                    Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
-                    Group By Operator
-                      aggregations: sum(substr(value, 5))
-                      keys: key (type: string)
-                      mode: hash
-                      outputColumnNames: _col0, _col1
-                      Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
-                      Reduce Output Operator
-                        key expressions: _col0 (type: string)
-                        sort order: +
-                        Map-reduce partition columns: _col0 (type: string)
-                        Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
-                        value expressions: _col1 (type: double)
         Reducer 2 
             Reduce Operator Tree:
               Group By Operator

http://git-wip-us.apache.org/repos/asf/hive/blob/9e13be3e/ql/src/test/results/clientpositive/spark/groupby7_map_skew.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/spark/groupby7_map_skew.q.out b/ql/src/test/results/clientpositive/spark/groupby7_map_skew.q.out
index 5e69b31..3ba0022 100644
--- a/ql/src/test/results/clientpositive/spark/groupby7_map_skew.q.out
+++ b/ql/src/test/results/clientpositive/spark/groupby7_map_skew.q.out
@@ -40,9 +40,8 @@ STAGE PLANS:
     Spark
       Edges:
         Reducer 2 <- Map 6 (GROUP PARTITION-LEVEL SORT, 31)
-        Reducer 4 <- Map 7 (GROUP PARTITION-LEVEL SORT, 31)
         Reducer 3 <- Reducer 2 (GROUP, 31)
-        Reducer 5 <- Reducer 4 (GROUP, 31)
+        Reducer 5 <- Reducer 2 (GROUP, 31)
 #### A masked pattern was here ####
       Vertices:
         Map 6 
@@ -66,27 +65,6 @@ STAGE PLANS:
                         Map-reduce partition columns: rand() (type: double)
                         Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
                         value expressions: _col1 (type: double)
-        Map 7 
-            Map Operator Tree:
-                TableScan
-                  alias: src
-                  Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
-                  Select Operator
-                    expressions: key (type: string), value (type: string)
-                    outputColumnNames: key, value
-                    Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
-                    Group By Operator
-                      aggregations: sum(substr(value, 5))
-                      keys: key (type: string)
-                      mode: hash
-                      outputColumnNames: _col0, _col1
-                      Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
-                      Reduce Output Operator
-                        key expressions: _col0 (type: string)
-                        sort order: +
-                        Map-reduce partition columns: rand() (type: double)
-                        Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
-                        value expressions: _col1 (type: double)
         Reducer 2 
             Reduce Operator Tree:
               Group By Operator
@@ -121,20 +99,6 @@ STAGE PLANS:
                         output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
                         serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
                         name: default.dest1
-        Reducer 4 
-            Reduce Operator Tree:
-              Group By Operator
-                aggregations: sum(VALUE._col0)
-                keys: KEY._col0 (type: string)
-                mode: partials
-                outputColumnNames: _col0, _col1
-                Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
-                Reduce Output Operator
-                  key expressions: _col0 (type: string)
-                  sort order: +
-                  Map-reduce partition columns: _col0 (type: string)
-                  Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
-                  value expressions: _col1 (type: double)
         Reducer 5 
             Reduce Operator Tree:
               Group By Operator

http://git-wip-us.apache.org/repos/asf/hive/blob/9e13be3e/ql/src/test/results/clientpositive/spark/groupby7_noskew.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/spark/groupby7_noskew.q.out b/ql/src/test/results/clientpositive/spark/groupby7_noskew.q.out
index 3418b99..8c985c5 100644
--- a/ql/src/test/results/clientpositive/spark/groupby7_noskew.q.out
+++ b/ql/src/test/results/clientpositive/spark/groupby7_noskew.q.out
@@ -40,7 +40,7 @@ STAGE PLANS:
     Spark
       Edges:
         Reducer 2 <- Map 4 (GROUP, 31)
-        Reducer 3 <- Map 5 (GROUP, 31)
+        Reducer 3 <- Map 4 (GROUP, 31)
 #### A masked pattern was here ####
       Vertices:
         Map 4 
@@ -58,21 +58,6 @@ STAGE PLANS:
                       Map-reduce partition columns: key (type: string)
                       Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
                       value expressions: substr(value, 5) (type: string)
-        Map 5 
-            Map Operator Tree:
-                TableScan
-                  alias: src
-                  Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
-                  Select Operator
-                    expressions: key (type: string), value (type: string)
-                    outputColumnNames: key, value
-                    Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
-                    Reduce Output Operator
-                      key expressions: key (type: string)
-                      sort order: +
-                      Map-reduce partition columns: key (type: string)
-                      Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
-                      value expressions: substr(value, 5) (type: string)
         Reducer 2 
             Reduce Operator Tree:
               Group By Operator

http://git-wip-us.apache.org/repos/asf/hive/blob/9e13be3e/ql/src/test/results/clientpositive/spark/groupby7_noskew_multi_single_reducer.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/spark/groupby7_noskew_multi_single_reducer.q.out b/ql/src/test/results/clientpositive/spark/groupby7_noskew_multi_single_reducer.q.out
index 2cb126d..a6ea423 100644
--- a/ql/src/test/results/clientpositive/spark/groupby7_noskew_multi_single_reducer.q.out
+++ b/ql/src/test/results/clientpositive/spark/groupby7_noskew_multi_single_reducer.q.out
@@ -40,9 +40,8 @@ STAGE PLANS:
     Spark
       Edges:
         Reducer 5 <- Map 1 (GROUP PARTITION-LEVEL SORT, 31)
-        Reducer 6 <- Map 1 (GROUP PARTITION-LEVEL SORT, 31)
         Reducer 3 <- Reducer 5 (SORT, 1)
-        Reducer 4 <- Reducer 6 (SORT, 1)
+        Reducer 4 <- Reducer 5 (SORT, 1)
 #### A masked pattern was here ####
       Vertices:
         Map 1 
@@ -117,21 +116,6 @@ STAGE PLANS:
                     sort order: +
                     Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
                     value expressions: _col1 (type: double)
-        Reducer 6 
-            Reduce Operator Tree:
-              Forward
-                Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
-                Group By Operator
-                  aggregations: sum(VALUE._col0)
-                  keys: KEY._col0 (type: string)
-                  mode: complete
-                  outputColumnNames: _col0, _col1
-                  Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
-                  Reduce Output Operator
-                    key expressions: _col0 (type: string)
-                    sort order: +
-                    Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
-                    value expressions: _col1 (type: double)
 
   Stage: Stage-0
     Move Operator

http://git-wip-us.apache.org/repos/asf/hive/blob/9e13be3e/ql/src/test/results/clientpositive/spark/groupby8.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/spark/groupby8.q.out b/ql/src/test/results/clientpositive/spark/groupby8.q.out
index 307395f..ced7f07 100644
--- a/ql/src/test/results/clientpositive/spark/groupby8.q.out
+++ b/ql/src/test/results/clientpositive/spark/groupby8.q.out
@@ -39,8 +39,14 @@ STAGE PLANS:
   Stage: Stage-2
     Spark
       Edges:
+<<<<<<< HEAD
         Reducer 2 <- Map 4 (GROUP PARTITION-LEVEL SORT, 2)
         Reducer 3 <- Map 5 (GROUP PARTITION-LEVEL SORT, 2)
+=======
+        Reducer 2 <- Map 6 (GROUP PARTITION-LEVEL SORT, 2)
+        Reducer 3 <- Reducer 2 (GROUP, 2)
+        Reducer 5 <- Reducer 2 (GROUP, 2)
+>>>>>>> 47f796e... HIVE-10844: Combine equivalent Works for HoS[Spark Branch] (Chengxiang via Xuefu)
 #### A masked pattern was here ####
       Vertices:
         Map 4 
@@ -57,6 +63,7 @@ STAGE PLANS:
                       sort order: ++
                       Map-reduce partition columns: key (type: string)
                       Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+<<<<<<< HEAD
         Map 5 
             Map Operator Tree:
                 TableScan
@@ -71,6 +78,8 @@ STAGE PLANS:
                       sort order: ++
                       Map-reduce partition columns: key (type: string)
                       Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+=======
+>>>>>>> 47f796e... HIVE-10844: Combine equivalent Works for HoS[Spark Branch] (Chengxiang via Xuefu)
         Reducer 2 
             Reduce Operator Tree:
               Group By Operator
@@ -91,12 +100,21 @@ STAGE PLANS:
                         output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
                         serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
                         name: default.dest1
+<<<<<<< HEAD
         Reducer 3 
             Reduce Operator Tree:
               Group By Operator
                 aggregations: count(DISTINCT KEY._col1:0._col0)
                 keys: KEY._col0 (type: string)
                 mode: complete
+=======
+        Reducer 5 
+            Reduce Operator Tree:
+              Group By Operator
+                aggregations: count(VALUE._col0)
+                keys: KEY._col0 (type: string)
+                mode: final
+>>>>>>> 47f796e... HIVE-10844: Combine equivalent Works for HoS[Spark Branch] (Chengxiang via Xuefu)
                 outputColumnNames: _col0, _col1
                 Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
                 Select Operator
@@ -811,8 +829,14 @@ STAGE PLANS:
   Stage: Stage-2
     Spark
       Edges:
+<<<<<<< HEAD
         Reducer 2 <- Map 4 (GROUP PARTITION-LEVEL SORT, 2)
         Reducer 3 <- Map 5 (GROUP PARTITION-LEVEL SORT, 2)
+=======
+        Reducer 2 <- Map 6 (GROUP PARTITION-LEVEL SORT, 2)
+        Reducer 3 <- Reducer 2 (GROUP, 2)
+        Reducer 5 <- Reducer 2 (GROUP, 2)
+>>>>>>> 47f796e... HIVE-10844: Combine equivalent Works for HoS[Spark Branch] (Chengxiang via Xuefu)
 #### A masked pattern was here ####
       Vertices:
         Map 4 
@@ -829,6 +853,7 @@ STAGE PLANS:
                       sort order: ++
                       Map-reduce partition columns: key (type: string)
                       Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+<<<<<<< HEAD
         Map 5 
             Map Operator Tree:
                 TableScan
@@ -843,6 +868,8 @@ STAGE PLANS:
                       sort order: ++
                       Map-reduce partition columns: key (type: string)
                       Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
+=======
+>>>>>>> 47f796e... HIVE-10844: Combine equivalent Works for HoS[Spark Branch] (Chengxiang via Xuefu)
         Reducer 2 
             Reduce Operator Tree:
               Group By Operator
@@ -863,12 +890,21 @@ STAGE PLANS:
                         output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
                         serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
                         name: default.dest1
+<<<<<<< HEAD
         Reducer 3 
             Reduce Operator Tree:
               Group By Operator
                 aggregations: count(DISTINCT KEY._col1:0._col0)
                 keys: KEY._col0 (type: string)
                 mode: complete
+=======
+        Reducer 5 
+            Reduce Operator Tree:
+              Group By Operator
+                aggregations: count(VALUE._col0)
+                keys: KEY._col0 (type: string)
+                mode: final
+>>>>>>> 47f796e... HIVE-10844: Combine equivalent Works for HoS[Spark Branch] (Chengxiang via Xuefu)
                 outputColumnNames: _col0, _col1
                 Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
                 Select Operator

http://git-wip-us.apache.org/repos/asf/hive/blob/9e13be3e/ql/src/test/results/clientpositive/spark/groupby8_map_skew.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/spark/groupby8_map_skew.q.out b/ql/src/test/results/clientpositive/spark/groupby8_map_skew.q.out
index ba04a57..4c89acf 100644
--- a/ql/src/test/results/clientpositive/spark/groupby8_map_skew.q.out
+++ b/ql/src/test/results/clientpositive/spark/groupby8_map_skew.q.out
@@ -40,7 +40,7 @@ STAGE PLANS:
     Spark
       Edges:
         Reducer 2 <- Map 4 (GROUP PARTITION-LEVEL SORT, 31)
-        Reducer 3 <- Map 5 (GROUP PARTITION-LEVEL SORT, 31)
+        Reducer 3 <- Map 4 (GROUP PARTITION-LEVEL SORT, 31)
 #### A masked pattern was here ####
       Vertices:
         Map 4 
@@ -63,26 +63,6 @@ STAGE PLANS:
                         sort order: ++
                         Map-reduce partition columns: _col0 (type: string)
                         Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
-        Map 5 
-            Map Operator Tree:
-                TableScan
-                  alias: src
-                  Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
-                  Select Operator
-                    expressions: key (type: string), value (type: string)
-                    outputColumnNames: key, value
-                    Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
-                    Group By Operator
-                      aggregations: count(DISTINCT substr(value, 5))
-                      keys: key (type: string), substr(value, 5) (type: string)
-                      mode: hash
-                      outputColumnNames: _col0, _col1, _col2
-                      Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
-                      Reduce Output Operator
-                        key expressions: _col0 (type: string), _col1 (type: string)
-                        sort order: ++
-                        Map-reduce partition columns: _col0 (type: string)
-                        Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
         Reducer 2 
             Reduce Operator Tree:
               Group By Operator

http://git-wip-us.apache.org/repos/asf/hive/blob/9e13be3e/ql/src/test/results/clientpositive/spark/insert_into3.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/spark/insert_into3.q.out b/ql/src/test/results/clientpositive/spark/insert_into3.q.out
index 7df5ba8..0531556 100644
--- a/ql/src/test/results/clientpositive/spark/insert_into3.q.out
+++ b/ql/src/test/results/clientpositive/spark/insert_into3.q.out
@@ -40,7 +40,7 @@ STAGE PLANS:
     Spark
       Edges:
         Reducer 2 <- Map 4 (SORT, 1)
-        Reducer 3 <- Map 5 (SORT, 1)
+        Reducer 3 <- Map 4 (SORT, 1)
 #### A masked pattern was here ####
       Vertices:
         Map 4 
@@ -56,19 +56,6 @@ STAGE PLANS:
                       key expressions: _col0 (type: string), _col1 (type: string)
                       sort order: ++
                       Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
-        Map 5 
-            Map Operator Tree:
-                TableScan
-                  alias: src
-                  Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
-                  Select Operator
-                    expressions: key (type: string), value (type: string)
-                    outputColumnNames: _col0, _col1
-                    Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
-                    Reduce Output Operator
-                      key expressions: _col0 (type: string), _col1 (type: string)
-                      sort order: ++
-                      Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
         Reducer 2 
             Reduce Operator Tree:
               Select Operator
@@ -198,7 +185,7 @@ STAGE PLANS:
     Spark
       Edges:
         Reducer 2 <- Map 4 (GROUP, 1)
-        Reducer 3 <- Map 5 (GROUP, 1)
+        Reducer 3 <- Map 4 (GROUP, 1)
 #### A masked pattern was here ####
       Vertices:
         Map 4 
@@ -217,22 +204,6 @@ STAGE PLANS:
                         sort order: 
                         Statistics: Num rows: 10 Data size: 100 Basic stats: COMPLETE Column stats: NONE
                         value expressions: _col0 (type: string), _col1 (type: string)
-        Map 5 
-            Map Operator Tree:
-                TableScan
-                  alias: src
-                  Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
-                  Select Operator
-                    expressions: key (type: string), value (type: string)
-                    outputColumnNames: _col0, _col1
-                    Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
-                    Limit
-                      Number of rows: 10
-                      Statistics: Num rows: 10 Data size: 100 Basic stats: COMPLETE Column stats: NONE
-                      Reduce Output Operator
-                        sort order: 
-                        Statistics: Num rows: 10 Data size: 100 Basic stats: COMPLETE Column stats: NONE
-                        value expressions: _col0 (type: string), _col1 (type: string)
         Reducer 2 
             Reduce Operator Tree:
               Select Operator

http://git-wip-us.apache.org/repos/asf/hive/blob/9e13be3e/ql/src/test/results/clientpositive/spark/join22.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/spark/join22.q.out b/ql/src/test/results/clientpositive/spark/join22.q.out
index b1e5b67..dfbb714 100644
--- a/ql/src/test/results/clientpositive/spark/join22.q.out
+++ b/ql/src/test/results/clientpositive/spark/join22.q.out
@@ -13,7 +13,7 @@ STAGE PLANS:
     Spark
       Edges:
         Reducer 2 <- Map 1 (PARTITION-LEVEL SORT, 2), Map 4 (PARTITION-LEVEL SORT, 2)
-        Reducer 3 <- Map 5 (PARTITION-LEVEL SORT, 2), Reducer 2 (PARTITION-LEVEL SORT, 2)
+        Reducer 3 <- Map 4 (PARTITION-LEVEL SORT, 2), Reducer 2 (PARTITION-LEVEL SORT, 2)
 #### A masked pattern was here ####
       Vertices:
         Map 1 
@@ -51,23 +51,6 @@ STAGE PLANS:
                         sort order: +
                         Map-reduce partition columns: _col0 (type: string)
                         Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
-        Map 5 
-            Map Operator Tree:
-                TableScan
-                  alias: src4
-                  Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
-                  Filter Operator
-                    predicate: key is not null (type: boolean)
-                    Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
-                    Select Operator
-                      expressions: key (type: string)
-                      outputColumnNames: _col0
-                      Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
-                      Reduce Output Operator
-                        key expressions: _col0 (type: string)
-                        sort order: +
-                        Map-reduce partition columns: _col0 (type: string)
-                        Statistics: Num rows: 250 Data size: 2656 Basic stats: COMPLETE Column stats: NONE
         Reducer 2 
             Reduce Operator Tree:
               Join Operator

http://git-wip-us.apache.org/repos/asf/hive/blob/9e13be3e/ql/src/test/results/clientpositive/spark/skewjoinopt11.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/spark/skewjoinopt11.q.out b/ql/src/test/results/clientpositive/spark/skewjoinopt11.q.out
index 8a278ef..47a7d56 100644
--- a/ql/src/test/results/clientpositive/spark/skewjoinopt11.q.out
+++ b/ql/src/test/results/clientpositive/spark/skewjoinopt11.q.out
@@ -68,8 +68,8 @@ STAGE PLANS:
       Edges:
         Reducer 11 <- Map 10 (PARTITION-LEVEL SORT, 2), Map 12 (PARTITION-LEVEL SORT, 2)
         Reducer 2 <- Map 1 (PARTITION-LEVEL SORT, 2), Map 3 (PARTITION-LEVEL SORT, 2)
-        Reducer 5 <- Map 4 (PARTITION-LEVEL SORT, 2), Map 6 (PARTITION-LEVEL SORT, 2)
-        Reducer 8 <- Map 7 (PARTITION-LEVEL SORT, 2), Map 9 (PARTITION-LEVEL SORT, 2)
+        Reducer 5 <- Map 1 (PARTITION-LEVEL SORT, 2), Map 3 (PARTITION-LEVEL SORT, 2)
+        Reducer 8 <- Map 10 (PARTITION-LEVEL SORT, 2), Map 12 (PARTITION-LEVEL SORT, 2)
 #### A masked pattern was here ####
       Vertices:
         Map 1 
@@ -128,62 +128,6 @@ STAGE PLANS:
                       Map-reduce partition columns: key (type: string)
                       Statistics: Num rows: 1 Data size: 30 Basic stats: COMPLETE Column stats: NONE
                       value expressions: val (type: string)
-        Map 4 
-            Map Operator Tree:
-                TableScan
-                  alias: a
-                  Statistics: Num rows: 1 Data size: 30 Basic stats: COMPLETE Column stats: NONE
-                  Filter Operator
-                    predicate: (key is not null and (key = '2')) (type: boolean)
-                    Statistics: Num rows: 1 Data size: 30 Basic stats: COMPLETE Column stats: NONE
-                    Reduce Output Operator
-                      key expressions: key (type: string)
-                      sort order: +
-                      Map-reduce partition columns: key (type: string)
-                      Statistics: Num rows: 1 Data size: 30 Basic stats: COMPLETE Column stats: NONE
-                      value expressions: val (type: string)
-        Map 6 
-            Map Operator Tree:
-                TableScan
-                  alias: b
-                  Statistics: Num rows: 1 Data size: 30 Basic stats: COMPLETE Column stats: NONE
-                  Filter Operator
-                    predicate: (key is not null and (key = '2')) (type: boolean)
-                    Statistics: Num rows: 1 Data size: 30 Basic stats: COMPLETE Column stats: NONE
-                    Reduce Output Operator
-                      key expressions: key (type: string)
-                      sort order: +
-                      Map-reduce partition columns: key (type: string)
-                      Statistics: Num rows: 1 Data size: 30 Basic stats: COMPLETE Column stats: NONE
-                      value expressions: val (type: string)
-        Map 7 
-            Map Operator Tree:
-                TableScan
-                  alias: a
-                  Statistics: Num rows: 1 Data size: 30 Basic stats: COMPLETE Column stats: NONE
-                  Filter Operator
-                    predicate: (key is not null and (not (key = '2'))) (type: boolean)
-                    Statistics: Num rows: 1 Data size: 30 Basic stats: COMPLETE Column stats: NONE
-                    Reduce Output Operator
-                      key expressions: key (type: string)
-                      sort order: +
-                      Map-reduce partition columns: key (type: string)
-                      Statistics: Num rows: 1 Data size: 30 Basic stats: COMPLETE Column stats: NONE
-                      value expressions: val (type: string)
-        Map 9 
-            Map Operator Tree:
-                TableScan
-                  alias: b
-                  Statistics: Num rows: 1 Data size: 30 Basic stats: COMPLETE Column stats: NONE
-                  Filter Operator
-                    predicate: (key is not null and (not (key = '2'))) (type: boolean)
-                    Statistics: Num rows: 1 Data size: 30 Basic stats: COMPLETE Column stats: NONE
-                    Reduce Output Operator
-                      key expressions: key (type: string)
-                      sort order: +
-                      Map-reduce partition columns: key (type: string)
-                      Statistics: Num rows: 1 Data size: 30 Basic stats: COMPLETE Column stats: NONE
-                      value expressions: val (type: string)
         Reducer 11 
             Reduce Operator Tree:
               Join Operator

http://git-wip-us.apache.org/repos/asf/hive/blob/9e13be3e/ql/src/test/results/clientpositive/spark/union28.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/spark/union28.q.out b/ql/src/test/results/clientpositive/spark/union28.q.out
index fd1c233..3902a3d 100644
--- a/ql/src/test/results/clientpositive/spark/union28.q.out
+++ b/ql/src/test/results/clientpositive/spark/union28.q.out
@@ -42,7 +42,7 @@ STAGE PLANS:
     Spark
       Edges:
         Reducer 3 <- Map 2 (GROUP, 2)
-        Reducer 5 <- Map 4 (GROUP, 2)
+        Reducer 5 <- Map 2 (GROUP, 2)
 #### A masked pattern was here ####
       Vertices:
         Map 1 
@@ -87,27 +87,6 @@ STAGE PLANS:
                         Map-reduce partition columns: _col0 (type: string), _col1 (type: string)
                         Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
                         value expressions: _col2 (type: bigint)
-        Map 4 
-            Map Operator Tree:
-                TableScan
-                  alias: src
-                  Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
-                  Select Operator
-                    expressions: key (type: string), value (type: string)
-                    outputColumnNames: key, value
-                    Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
-                    Group By Operator
-                      aggregations: count(1)
-                      keys: key (type: string), value (type: string)
-                      mode: hash
-                      outputColumnNames: _col0, _col1, _col2
-                      Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
-                      Reduce Output Operator
-                        key expressions: _col0 (type: string), _col1 (type: string)
-                        sort order: ++
-                        Map-reduce partition columns: _col0 (type: string), _col1 (type: string)
-                        Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
-                        value expressions: _col2 (type: bigint)
         Reducer 3 
             Reduce Operator Tree:
               Group By Operator

http://git-wip-us.apache.org/repos/asf/hive/blob/9e13be3e/ql/src/test/results/clientpositive/spark/union3.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/spark/union3.q.out b/ql/src/test/results/clientpositive/spark/union3.q.out
index 834b6d4..b437920 100644
--- a/ql/src/test/results/clientpositive/spark/union3.q.out
+++ b/ql/src/test/results/clientpositive/spark/union3.q.out
@@ -45,9 +45,9 @@ STAGE PLANS:
     Spark
       Edges:
         Reducer 2 <- Map 1 (GROUP, 1)
-        Reducer 5 <- Map 4 (GROUP, 1)
-        Reducer 7 <- Map 6 (GROUP, 1)
-        Reducer 9 <- Map 8 (GROUP, 1)
+        Reducer 5 <- Map 1 (GROUP, 1)
+        Reducer 7 <- Map 1 (GROUP, 1)
+        Reducer 9 <- Map 1 (GROUP, 1)
         Reducer 3 <- Reducer 2 (PARTITION-LEVEL SORT, 2), Reducer 5 (PARTITION-LEVEL SORT, 2), Reducer 7 (PARTITION-LEVEL SORT, 2), Reducer 9 (PARTITION-LEVEL SORT, 2)
 #### A masked pattern was here ####
       Vertices:
@@ -64,45 +64,6 @@ STAGE PLANS:
                       Reduce Output Operator
                         sort order: 
                         Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: COMPLETE
-        Map 4 
-            Map Operator Tree:
-                TableScan
-                  alias: src
-                  Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: COMPLETE
-                  Select Operator
-                    Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: COMPLETE
-                    Limit
-                      Number of rows: 1
-                      Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: COMPLETE
-                      Reduce Output Operator
-                        sort order: 
-                        Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: COMPLETE
-        Map 6 
-            Map Operator Tree:
-                TableScan
-                  alias: src
-                  Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: COMPLETE
-                  Select Operator
-                    Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: COMPLETE
-                    Limit
-                      Number of rows: 1
-                      Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: COMPLETE
-                      Reduce Output Operator
-                        sort order: 
-                        Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: COMPLETE
-        Map 8 
-            Map Operator Tree:
-                TableScan
-                  alias: src
-                  Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: COMPLETE
-                  Select Operator
-                    Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: COMPLETE
-                    Limit
-                      Number of rows: 1
-                      Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: COMPLETE
-                      Reduce Output Operator
-                        sort order: 
-                        Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: COMPLETE
         Reducer 2 
             Reduce Operator Tree:
               Limit

http://git-wip-us.apache.org/repos/asf/hive/blob/9e13be3e/ql/src/test/results/clientpositive/spark/union30.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/spark/union30.q.out b/ql/src/test/results/clientpositive/spark/union30.q.out
index f21ff0b..06f777f 100644
--- a/ql/src/test/results/clientpositive/spark/union30.q.out
+++ b/ql/src/test/results/clientpositive/spark/union30.q.out
@@ -56,7 +56,7 @@ STAGE PLANS:
     Spark
       Edges:
         Reducer 3 <- Map 2 (GROUP, 2)
-        Reducer 5 <- Map 4 (GROUP, 2)
+        Reducer 5 <- Map 2 (GROUP, 2)
 #### A masked pattern was here ####
       Vertices:
         Map 1 
@@ -101,27 +101,6 @@ STAGE PLANS:
                         Map-reduce partition columns: _col0 (type: string), _col1 (type: string)
                         Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
                         value expressions: _col2 (type: bigint)
-        Map 4 
-            Map Operator Tree:
-                TableScan
-                  alias: src
-                  Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
-                  Select Operator
-                    expressions: key (type: string), value (type: string)
-                    outputColumnNames: key, value
-                    Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
-                    Group By Operator
-                      aggregations: count(1)
-                      keys: key (type: string), value (type: string)
-                      mode: hash
-                      outputColumnNames: _col0, _col1, _col2
-                      Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
-                      Reduce Output Operator
-                        key expressions: _col0 (type: string), _col1 (type: string)
-                        sort order: ++
-                        Map-reduce partition columns: _col0 (type: string), _col1 (type: string)
-                        Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: NONE
-                        value expressions: _col2 (type: bigint)
         Map 6 
             Map Operator Tree:
                 TableScan

http://git-wip-us.apache.org/repos/asf/hive/blob/9e13be3e/ql/src/test/results/clientpositive/spark/union_remove_1.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/spark/union_remove_1.q.out b/ql/src/test/results/clientpositive/spark/union_remove_1.q.out
index 12e3275..efd1554 100644
--- a/ql/src/test/results/clientpositive/spark/union_remove_1.q.out
+++ b/ql/src/test/results/clientpositive/spark/union_remove_1.q.out
@@ -69,7 +69,7 @@ STAGE PLANS:
     Spark
       Edges:
         Reducer 2 <- Map 1 (GROUP, 2)
-        Reducer 4 <- Map 3 (GROUP, 2)
+        Reducer 4 <- Map 1 (GROUP, 2)
 #### A masked pattern was here ####
       Vertices:
         Map 1 
@@ -93,27 +93,6 @@ STAGE PLANS:
                         Map-reduce partition columns: _col0 (type: string)
                         Statistics: Num rows: 1 Data size: 30 Basic stats: COMPLETE Column stats: NONE
                         value expressions: _col1 (type: bigint)
-        Map 3 
-            Map Operator Tree:
-                TableScan
-                  alias: inputtbl1
-                  Statistics: Num rows: 1 Data size: 30 Basic stats: COMPLETE Column stats: NONE
-                  Select Operator
-                    expressions: key (type: string)
-                    outputColumnNames: key
-                    Statistics: Num rows: 1 Data size: 30 Basic stats: COMPLETE Column stats: NONE
-                    Group By Operator
-                      aggregations: count(1)
-                      keys: key (type: string)
-                      mode: hash
-                      outputColumnNames: _col0, _col1
-                      Statistics: Num rows: 1 Data size: 30 Basic stats: COMPLETE Column stats: NONE
-                      Reduce Output Operator
-                        key expressions: _col0 (type: string)
-                        sort order: +
-                        Map-reduce partition columns: _col0 (type: string)
-                        Statistics: Num rows: 1 Data size: 30 Basic stats: COMPLETE Column stats: NONE
-                        value expressions: _col1 (type: bigint)
         Reducer 2 
             Reduce Operator Tree:
               Group By Operator