You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by li...@apache.org on 2017/11/06 02:17:52 UTC

[3/3] hive git commit: HIVE-17877: HoS: combine equivalent DPP sink works (Rui reviewed by Sahil)

HIVE-17877: HoS: combine equivalent DPP sink works (Rui reviewed by Sahil)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/aaacda47
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/aaacda47
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/aaacda47

Branch: refs/heads/master
Commit: aaacda474d881cef14f3d7c1ff5da32c28c5663f
Parents: 49f6814
Author: Rui Li <li...@apache.org>
Authored: Mon Nov 6 10:17:44 2017 +0800
Committer: Rui Li <li...@apache.org>
Committed: Mon Nov 6 10:17:44 2017 +0800

----------------------------------------------------------------------
 .../test/resources/testconfiguration.properties |    1 +
 .../exec/spark/SparkDynamicPartitionPruner.java |   15 +-
 .../DynamicPartitionPruningOptimization.java    |    4 +-
 .../SparkDynamicPartitionPruningResolver.java   |    2 +-
 .../spark/CombineEquivalentWorkResolver.java    |  103 +-
 .../spark/SparkPartitionPruningSinkDesc.java    |  127 +-
 .../hive/ql/parse/spark/GenSparkUtils.java      |    2 +-
 .../SparkPartitionPruningSinkOperator.java      |    5 +-
 .../spark_dynamic_partition_pruning_4.q         |  157 ++
 .../spark/spark_dynamic_partition_pruning.q.out |  246 +--
 .../spark_dynamic_partition_pruning_2.q.out     |   24 +-
 .../spark_dynamic_partition_pruning_3.q.out     |   78 +-
 .../spark_dynamic_partition_pruning_4.q.out     | 1891 ++++++++++++++++++
 ...dynamic_partition_pruning_mapjoin_only.q.out |   18 +-
 ...k_vectorized_dynamic_partition_pruning.q.out |  234 +--
 15 files changed, 2542 insertions(+), 365 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/aaacda47/itests/src/test/resources/testconfiguration.properties
----------------------------------------------------------------------
diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties
index 9642697..42c17f4 100644
--- a/itests/src/test/resources/testconfiguration.properties
+++ b/itests/src/test/resources/testconfiguration.properties
@@ -1461,6 +1461,7 @@ miniSparkOnYarn.only.query.files=spark_combine_equivalent_work.q,\
   spark_dynamic_partition_pruning.q,\
   spark_dynamic_partition_pruning_2.q,\
   spark_dynamic_partition_pruning_3.q,\
+  spark_dynamic_partition_pruning_4.q,\
   spark_dynamic_partition_pruning_mapjoin_only.q,\
   spark_constprog_dpp.q,\
   spark_dynamic_partition_pruning_recursive_mapjoin.q,\

http://git-wip-us.apache.org/repos/asf/hive/blob/aaacda47/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkDynamicPartitionPruner.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkDynamicPartitionPruner.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkDynamicPartitionPruner.java
index 2d3d756..e8c019a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkDynamicPartitionPruner.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkDynamicPartitionPruner.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hive.ql.exec.spark;
 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -124,18 +125,23 @@ public class SparkDynamicPartitionPruner {
         for (FileStatus fstatus : fs.listStatus(sourceDir)) {
           LOG.info("Start processing pruning file: " + fstatus.getPath());
           in = new ObjectInputStream(fs.open(fstatus.getPath()));
-          String columnName = in.readUTF();
+          final int numName = in.readInt();
           SourceInfo info = null;
 
+          Set<String> columnNames = new HashSet<>();
+          for (int i = 0; i < numName; i++) {
+            columnNames.add(in.readUTF());
+          }
           for (SourceInfo si : sourceInfoMap.get(name)) {
-            if (columnName.equals(si.columnName)) {
+            if (columnNames.contains(si.columnName)) {
               info = si;
               break;
             }
           }
 
           Preconditions.checkArgument(info != null,
-              "AssertionError: no source info for the column: " + columnName);
+              "AssertionError: no source info for the column: " +
+                  Arrays.toString(columnNames.toArray()));
 
           // Read fields
           while (in.available() > 0) {
@@ -172,7 +178,8 @@ public class SparkDynamicPartitionPruner {
   private void prunePartitionSingleSource(SourceInfo info, MapWork work)
       throws HiveException {
     Set<Object> values = info.values;
-    String columnName = info.columnName;
+    // strip the targetId prefix ("<targetId>:") from the column name
+    String columnName = info.columnName.substring(info.columnName.indexOf(':') + 1);
 
     ObjectInspector oi =
         PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(TypeInfoFactory

http://git-wip-us.apache.org/repos/asf/hive/blob/aaacda47/ql/src/java/org/apache/hadoop/hive/ql/optimizer/DynamicPartitionPruningOptimization.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/DynamicPartitionPruningOptimization.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/DynamicPartitionPruningOptimization.java
index cf355d4..46c1a12 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/DynamicPartitionPruningOptimization.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/DynamicPartitionPruningOptimization.java
@@ -460,9 +460,7 @@ public class DynamicPartitionPruningOptimization implements NodeProcessor {
       desc.setTableScan(ts);
       desc.setTable(PlanUtils.getReduceValueTableDesc(PlanUtils
           .getFieldSchemasFromColumnList(keyExprs, "key")));
-      desc.setTargetColumnName(column);
-      desc.setTargetColumnType(columnType);
-      desc.setPartKey(partKey);
+      desc.addTarget(column, columnType, partKey, null);
       OperatorFactory.getAndMakeChild(desc, groupByOp);
     }
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/aaacda47/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SparkDynamicPartitionPruningResolver.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SparkDynamicPartitionPruningResolver.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SparkDynamicPartitionPruningResolver.java
index 4d5c234..bcd3825 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SparkDynamicPartitionPruningResolver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SparkDynamicPartitionPruningResolver.java
@@ -122,7 +122,7 @@ public class SparkDynamicPartitionPruningResolver implements PhysicalPlanResolve
     targetMapWork.getEventSourceTableDescMap().get(sourceWorkId).remove(pruningSinkDesc.getTable());
     targetMapWork.getEventSourceColumnNameMap().get(sourceWorkId).remove(pruningSinkDesc.getTargetColumnName());
     targetMapWork.getEventSourceColumnTypeMap().get(sourceWorkId).remove(pruningSinkDesc.getTargetColumnType());
-    targetMapWork.getEventSourcePartKeyExprMap().get(sourceWorkId).remove(pruningSinkDesc.getPartKey());
+    targetMapWork.getEventSourcePartKeyExprMap().get(sourceWorkId).remove(pruningSinkDesc.getTargetPartKey());
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hive/blob/aaacda47/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/CombineEquivalentWorkResolver.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/CombineEquivalentWorkResolver.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/CombineEquivalentWorkResolver.java
index 2641c1a..988579e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/CombineEquivalentWorkResolver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/CombineEquivalentWorkResolver.java
@@ -35,8 +35,10 @@ import org.apache.hadoop.hive.ql.exec.OperatorUtils;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.spark.SparkUtilities;
 import org.apache.hadoop.hive.ql.parse.spark.SparkPartitionPruningSinkOperator;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
 import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.PartitionDesc;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
@@ -82,13 +84,14 @@ public class CombineEquivalentWorkResolver implements PhysicalPlanResolver {
       }
     };
 
+    // maps from a work to the DPPs it contains
+    private Map<BaseWork, List<SparkPartitionPruningSinkDesc>> workToDpps = new HashMap<>();
+
     @Override
     public Object dispatch(Node nd, Stack<Node> stack, Object... nodeOutputs) throws SemanticException {
       if (nd instanceof SparkTask) {
         SparkTask sparkTask = (SparkTask) nd;
         SparkWork sparkWork = sparkTask.getWork();
-        Set<BaseWork> roots = sparkWork.getRoots();
-        compareWorksRecursively(roots, sparkWork);
         // For dpp case, dpp sink will appear in Task1 and the target work of dpp sink will appear in Task2.
         // Task2 is the child task of Task1. Task2 will be traversed before task1 because TaskGraphWalker will first
         // put children task in the front of task queue.
@@ -100,11 +103,15 @@ public class CombineEquivalentWorkResolver implements PhysicalPlanResolver {
             removeEmptySparkTask(sparkTask);
           }
         }
+
+        Set<BaseWork> roots = sparkWork.getRoots();
+        compareWorksRecursively(roots, sparkWork);
       }
       return null;
     }
 
     private void compareWorksRecursively(Set<BaseWork> works, SparkWork sparkWork) {
+      workToDpps.clear();
       // find out all equivalent works in the Set.
       Set<Set<BaseWork>> equivalentWorks = compareChildWorks(works, sparkWork);
       // combine equivalent work into single one in SparkWork's work graph.
@@ -154,14 +161,72 @@ public class CombineEquivalentWorkResolver implements PhysicalPlanResolver {
       return false;
     }
 
+    // merge the second into the first
+    private void combineEquivalentDPPSinks(SparkPartitionPruningSinkDesc first,
+        SparkPartitionPruningSinkDesc second, String firstId, String secondId) {
+      MapWork target2 = second.getTargetMapWork();
+
+      first.addTarget(second.getTargetColumnName(), second.getTargetColumnType(),
+          second.getTargetPartKey(), target2);
+
+      // update the target map work of the second
+      target2.setTmpPathForPartitionPruning(first.getTmpPathOfTargetWork());
+
+      List<ExprNodeDesc> partKey = target2.getEventSourcePartKeyExprMap().get(secondId);
+      partKey.remove(second.getTargetPartKey());
+      if (partKey.isEmpty()) {
+        target2.getEventSourcePartKeyExprMap().remove(secondId);
+      }
+      List<ExprNodeDesc> newPartKey = target2.getEventSourcePartKeyExprMap().computeIfAbsent(
+          firstId, v -> new ArrayList<>());
+      newPartKey.add(second.getTargetPartKey());
+
+      List<TableDesc> tableDesc = target2.getEventSourceTableDescMap().get(secondId);
+      tableDesc.remove(second.getTable());
+      if (tableDesc.isEmpty()) {
+        target2.getEventSourceTableDescMap().remove(secondId);
+      }
+      List<TableDesc> newTableDesc = target2.getEventSourceTableDescMap().computeIfAbsent(
+          firstId, v -> new ArrayList<>());
+      newTableDesc.add(second.getTable());
+
+      List<String> columnName = target2.getEventSourceColumnNameMap().get(secondId);
+      columnName.remove(second.getTargetColumnName());
+      if (columnName.isEmpty()) {
+        target2.getEventSourceColumnNameMap().remove(secondId);
+      }
+      List<String> newColumnName = target2.getEventSourceColumnNameMap().computeIfAbsent(
+          firstId, v -> new ArrayList<>());
+      newColumnName.add(second.getTargetColumnName());
+
+      List<String> columnType = target2.getEventSourceColumnTypeMap().get(secondId);
+      columnType.remove(second.getTargetColumnType());
+      if (columnType.isEmpty()) {
+        target2.getEventSourceColumnTypeMap().remove(secondId);
+      }
+      List<String> newColumnType = target2.getEventSourceColumnTypeMap().computeIfAbsent(
+          firstId, v -> new ArrayList<>());
+      newColumnType.add(second.getTargetColumnType());
+    }
+
     private Set<BaseWork> combineEquivalentWorks(Set<Set<BaseWork>> equivalentWorks, SparkWork sparkWork) {
       Set<BaseWork> removedWorks = Sets.newHashSet();
       for (Set<BaseWork> workSet : equivalentWorks) {
         if (workSet.size() > 1) {
           Iterator<BaseWork> iterator = workSet.iterator();
           BaseWork first = iterator.next();
+          List<SparkPartitionPruningSinkDesc> dppList1 = workToDpps.get(first);
+          String firstId = SparkUtilities.getWorkId(first);
           while (iterator.hasNext()) {
             BaseWork next = iterator.next();
+            if (dppList1 != null) {
+              List<SparkPartitionPruningSinkDesc> dppList2 = workToDpps.get(next);
+              // equivalent works must have dpp lists of the same size
+              for (int i = 0; i < dppList1.size(); i++) {
+                combineEquivalentDPPSinks(dppList1.get(i), dppList2.get(i),
+                    firstId, SparkUtilities.getWorkId(next));
+              }
+            }
             replaceWork(next, first, sparkWork);
             removedWorks.add(next);
           }
@@ -231,7 +296,14 @@ public class CombineEquivalentWorkResolver implements PhysicalPlanResolver {
       // leave work's output may be read in further SparkWork/FetchWork, we should not combine
       // leave works without notifying further SparkWork/FetchWork.
       if (sparkWork.getLeaves().contains(first) && sparkWork.getLeaves().contains(second)) {
-        return false;
+        Set<Operator<? extends OperatorDesc>> leafOps = first.getAllLeafOperators();
+        leafOps.addAll(second.getAllLeafOperators());
+        for (Operator operator : leafOps) {
+          // we know how to handle DPP sinks
+          if (!(operator instanceof SparkPartitionPruningSinkOperator)) {
+            return false;
+          }
+        }
       }
 
       // need to check paths and partition desc for MapWorks
@@ -248,9 +320,10 @@ public class CombineEquivalentWorkResolver implements PhysicalPlanResolver {
       Iterator<Operator<?>> firstIterator = firstRootOperators.iterator();
       Iterator<Operator<?>> secondIterator = secondRootOperators.iterator();
       while (firstIterator.hasNext()) {
-        boolean result = compareOperatorChain(firstIterator.next(), secondIterator.next());
+        boolean result = compareOperatorChain(firstIterator.next(), secondIterator.next(),
+            first, second);
         if (!result) {
-          return result;
+          return false;
         }
       }
 
@@ -290,10 +363,11 @@ public class CombineEquivalentWorkResolver implements PhysicalPlanResolver {
       return result;
     }
 
-    private boolean compareOperatorChain(Operator<?> firstOperator, Operator<?> secondOperator) {
+    private boolean compareOperatorChain(Operator<?> firstOperator, Operator<?> secondOperator,
+        BaseWork first, BaseWork second) {
       boolean result = compareCurrentOperator(firstOperator, secondOperator);
       if (!result) {
-        return result;
+        return false;
       }
 
       List<Operator<? extends OperatorDesc>> firstOperatorChildOperators = firstOperator.getChildOperators();
@@ -302,19 +376,26 @@ public class CombineEquivalentWorkResolver implements PhysicalPlanResolver {
         return false;
       } else if (firstOperatorChildOperators != null && secondOperatorChildOperators == null) {
         return false;
-      } else if (firstOperatorChildOperators != null && secondOperatorChildOperators != null) {
+      } else if (firstOperatorChildOperators != null) {
         if (firstOperatorChildOperators.size() != secondOperatorChildOperators.size()) {
           return false;
         }
         int size = firstOperatorChildOperators.size();
         for (int i = 0; i < size; i++) {
-          result = compareOperatorChain(firstOperatorChildOperators.get(i), secondOperatorChildOperators.get(i));
+          result = compareOperatorChain(firstOperatorChildOperators.get(i),
+              secondOperatorChildOperators.get(i), first, second);
           if (!result) {
             return false;
           }
         }
       }
 
+      if (firstOperator instanceof SparkPartitionPruningSinkOperator) {
+        List<SparkPartitionPruningSinkDesc> dpps = workToDpps.computeIfAbsent(first, k -> new ArrayList<>());
+        dpps.add(((SparkPartitionPruningSinkOperator) firstOperator).getConf());
+        dpps = workToDpps.computeIfAbsent(second, k -> new ArrayList<>());
+        dpps.add(((SparkPartitionPruningSinkOperator) secondOperator).getConf());
+      }
       return true;
     }
 
@@ -347,9 +428,9 @@ public class CombineEquivalentWorkResolver implements PhysicalPlanResolver {
           SparkUtilities.collectOp(pruningList, root, SparkPartitionPruningSinkOperator.class);
           for (Operator pruneSinkOp : pruningList) {
             SparkPartitionPruningSinkOperator sparkPruneSinkOp = (SparkPartitionPruningSinkOperator) pruneSinkOp;
-            if (removedMapWorkList.contains(sparkPruneSinkOp.getConf().getTargetWork())) {
+            if (removedMapWorkList.contains(sparkPruneSinkOp.getConf().getTargetMapWork().getName())) {
               LOG.debug("ready to remove the sparkPruneSinkOp which target work is " +
-                  sparkPruneSinkOp.getConf().getTargetWork() + " because the MapWork is equals to other map work and " +
+                  sparkPruneSinkOp.getConf().getTargetWorks() + " because the MapWork is equals to other map work and " +
                   "has been deleted!");
               // If there is branch, remove prune sink operator branch in the baseWork
               // If there is no branch, remove the whole baseWork

http://git-wip-us.apache.org/repos/asf/hive/blob/aaacda47/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkPartitionPruningSinkDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkPartitionPruningSinkDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkPartitionPruningSinkDesc.java
index baf85cf..1329e13 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkPartitionPruningSinkDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkPartitionPruningSinkDesc.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -18,91 +18,120 @@
 
 package org.apache.hadoop.hive.ql.optimizer.spark;
 
+import com.google.common.base.Preconditions;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.ql.exec.TableScanOperator;
+import org.apache.hadoop.hive.ql.exec.spark.SparkUtilities;
 import org.apache.hadoop.hive.ql.plan.AbstractOperatorDesc;
 import org.apache.hadoop.hive.ql.plan.Explain;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
 import org.apache.hadoop.hive.ql.plan.MapWork;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
 
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
 @Explain(displayName = "Spark Partition Pruning Sink Operator")
 public class SparkPartitionPruningSinkDesc extends AbstractOperatorDesc {
 
-  // column in the target table that will be pruned against
-  private String targetColumnName;
+  public static class DPPTargetInfo {
+    // column in the target table that will be pruned against
+    public String columnName;
+    // type of target column
+    public String columnType;
+    // the partition column we're interested in
+    public ExprNodeDesc partKey;
+    public MapWork work;
+
+    DPPTargetInfo(String columnName, String columnType, ExprNodeDesc partKey, MapWork work) {
+      this.columnName = columnName;
+      this.columnType = columnType;
+      this.partKey = partKey;
+      this.work = work;
+    }
+  }
 
-  // type of target column
-  private String targetColumnType;
+  private List<DPPTargetInfo> targetInfos = new ArrayList<>();
 
   private TableDesc table;
 
   private transient TableScanOperator tableScan;
 
-  // the partition column we're interested in
-  private ExprNodeDesc partKey;
-
   private Path path;
 
-  private MapWork targetMapWork;
+  public List<DPPTargetInfo> getTargetInfos() {
+    return targetInfos;
+  }
 
-  @Explain(displayName = "tmp Path", explainLevels = { Explain.Level.EXTENDED })
-  public Path getPath() {
-    return path;
+  private void assertSingleTarget() {
+    Preconditions.checkState(targetInfos.size() < 2, "The DPP sink has multiple targets.");
   }
 
-  public void setPath(Path path) {
-    this.path = path;
+  public String getTargetColumnName() {
+    assertSingleTarget();
+    return targetInfos.isEmpty() ? null : targetInfos.get(0).columnName;
   }
 
-  @Explain(displayName = "target work")
-  public String getTargetWork() {
-    return this.targetMapWork.getName();
+  public String getTargetColumnType() {
+    assertSingleTarget();
+    return targetInfos.isEmpty() ? null : targetInfos.get(0).columnType;
   }
 
-  public MapWork getTargetMapWork() {
-    return this.targetMapWork;
+  public ExprNodeDesc getTargetPartKey() {
+    assertSingleTarget();
+    return targetInfos.isEmpty() ? null : targetInfos.get(0).partKey;
   }
 
-  public void setTargetMapWork(MapWork targetMapWork) {
-    this.targetMapWork = targetMapWork;
+  public MapWork getTargetMapWork() {
+    assertSingleTarget();
+    return targetInfos.isEmpty() ? null : targetInfos.get(0).work;
   }
 
-  public TableScanOperator getTableScan() {
-    return tableScan;
+  public void addTarget(String colName, String colType, ExprNodeDesc partKey, MapWork mapWork) {
+    targetInfos.add(new DPPTargetInfo(colName, colType, partKey, mapWork));
   }
 
-  public void setTableScan(TableScanOperator tableScan) {
-    this.tableScan = tableScan;
+  public void setTargetMapWork(MapWork mapWork) {
+    Preconditions.checkState(targetInfos.size() == 1,
+        "The DPP sink should have exactly one target.");
+    targetInfos.get(0).work = mapWork;
+    // in order to make the col name unique, prepend the targetId
+    targetInfos.get(0).columnName = SparkUtilities.getWorkId(mapWork) + ":" +
+        targetInfos.get(0).columnName;
   }
 
-  @Explain(displayName = "Target column")
-  public String displayTargetColumn() {
-    return targetColumnName + " (" + targetColumnType + ")";
+  Path getTmpPathOfTargetWork() {
+    return targetInfos.isEmpty() ? null : targetInfos.get(0).work.getTmpPathForPartitionPruning();
   }
 
-  public String getTargetColumnName() {
-    return targetColumnName;
+  @Explain(displayName = "tmp Path", explainLevels = {Explain.Level.EXTENDED})
+  public Path getPath() {
+    return path;
   }
 
-  public void setTargetColumnName(String targetColumnName) {
-    this.targetColumnName = targetColumnName;
+  public void setPath(Path path) {
+    this.path = path;
   }
 
-  public String getTargetColumnType() {
-    return targetColumnType;
+  @Explain(displayName = "target works")
+  public String getTargetWorks() {
+    return Arrays.toString(targetInfos.stream().map(info -> info.work.getName()).toArray());
   }
 
-  public void setTargetColumnType(String columnType) {
-    this.targetColumnType = columnType;
+  public TableScanOperator getTableScan() {
+    return tableScan;
   }
 
-  public ExprNodeDesc getPartKey() {
-    return partKey;
+  public void setTableScan(TableScanOperator tableScan) {
+    this.tableScan = tableScan;
   }
 
-  public void setPartKey(ExprNodeDesc partKey) {
-    this.partKey = partKey;
+  @Explain(displayName = "Target column")
+  public String displayTargetColumns() {
+    return Arrays.toString(targetInfos.stream().map(
+        info -> info.columnName + " (" + info.columnType + ")").toArray());
   }
 
   public TableDesc getTable() {
@@ -114,7 +143,17 @@ public class SparkPartitionPruningSinkDesc extends AbstractOperatorDesc {
   }
 
   @Explain(displayName = "partition key expr")
-  public String getPartKeyString() {
-    return partKey.getExprString();
+  public String getPartKeyStrings() {
+    return Arrays.toString(targetInfos.stream().map(
+        info -> info.partKey.getExprString()).toArray());
+  }
+
+  @Override
+  public boolean isSame(OperatorDesc other) {
+    if (getClass().getName().equals(other.getClass().getName())) {
+      SparkPartitionPruningSinkDesc otherDesc = (SparkPartitionPruningSinkDesc) other;
+      return getTable().equals(otherDesc.getTable());
+    }
+    return false;
   }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/aaacda47/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
index 24c8baf..9bea4dd 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
@@ -505,7 +505,7 @@ public class GenSparkUtils {
       targetWork.getEventSourcePartKeyExprMap().put(sourceId, new LinkedList<ExprNodeDesc>());
     }
     List<ExprNodeDesc> keys = targetWork.getEventSourcePartKeyExprMap().get(sourceId);
-    keys.add(desc.getPartKey());
+    keys.add(desc.getTargetPartKey());
   }
 
   public static SparkEdgeProperty getEdgeProperty(HiveConf conf, ReduceSinkOperator reduceSink,

http://git-wip-us.apache.org/repos/asf/hive/blob/aaacda47/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkPartitionPruningSinkOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkPartitionPruningSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkPartitionPruningSinkOperator.java
index e3146cf..bd9de09 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkPartitionPruningSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkPartitionPruningSinkOperator.java
@@ -166,7 +166,10 @@ public class SparkPartitionPruningSinkOperator extends Operator<SparkPartitionPr
     try {
       fsout = fs.create(path, numOfRepl);
       out = new ObjectOutputStream(new BufferedOutputStream(fsout));
-      out.writeUTF(conf.getTargetColumnName());
+      out.writeInt(conf.getTargetInfos().size());
+      for (SparkPartitionPruningSinkDesc.DPPTargetInfo info : conf.getTargetInfos()) {
+        out.writeUTF(info.columnName);
+      }
       buffer.writeTo(out);
     } catch (Exception e) {
       try {

http://git-wip-us.apache.org/repos/asf/hive/blob/aaacda47/ql/src/test/queries/clientpositive/spark_dynamic_partition_pruning_4.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/spark_dynamic_partition_pruning_4.q b/ql/src/test/queries/clientpositive/spark_dynamic_partition_pruning_4.q
new file mode 100644
index 0000000..240128f
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/spark_dynamic_partition_pruning_4.q
@@ -0,0 +1,157 @@
+set hive.spark.dynamic.partition.pruning=true;
+set hive.combine.equivalent.work.optimization=true;
+
+-- This qfile tests whether equivalent DPP sink works are combined.
+-- When combined, one DPP sink operator will have multiple target columns/works.
+
+-- SORT_QUERY_RESULTS
+
+create table part1(key string, value string) partitioned by (p string, q string);
+insert into table part1 partition (p='1', q='1') values ('1','1'), ('2','2');
+insert into table part1 partition (p='1', q='2') values ('3','3'), ('4','4');
+insert into table part1 partition (p='2', q='1') values ('5','5'), ('6','6');
+insert into table part1 partition (p='2', q='2') values ('7','7'), ('8','8');
+
+create table part2(key string, value string) partitioned by (p string, q string);
+insert into table part2 partition (p='3', q='3') values ('a','a'), ('b','b');
+insert into table part2 partition (p='3', q='4') values ('c','c'), ('d','d');
+insert into table part2 partition (p='4', q='3') values ('e','e'), ('f','f');
+insert into table part2 partition (p='4', q='4') values ('g','g'), ('h','h');
+
+-- dpp works should be combined
+explain
+select * from
+  (select part1.key, part1.value from part1 join src on part1.p=src.key) a
+union all
+  (select part2.key, part2.value from part2 join src on part2.p=src.key);
+
+select * from
+  (select part1.key, part1.value from part1 join src on part1.p=src.key) a
+union all
+  (select part2.key, part2.value from part2 join src on part2.p=src.key);
+
+-- verify result
+set hive.spark.dynamic.partition.pruning=false;
+
+select * from
+  (select part1.key, part1.value from part1 join src on part1.p=src.key) a
+union all
+  (select part2.key, part2.value from part2 join src on part2.p=src.key);
+
+set hive.spark.dynamic.partition.pruning=true;
+
+-- dpp works should be combined
+explain
+select * from
+  (select part1.key, part1.value from part1 join src on part1.p=src.key) a
+union all
+  (select part2.key, part2.value from part2 join src on part2.q=src.key);
+
+select * from
+  (select part1.key, part1.value from part1 join src on part1.p=src.key) a
+union all
+  (select part2.key, part2.value from part2 join src on part2.q=src.key);
+
+-- verify result
+set hive.spark.dynamic.partition.pruning=false;
+
+select * from
+  (select part1.key, part1.value from part1 join src on part1.p=src.key) a
+union all
+  (select part2.key, part2.value from part2 join src on part2.q=src.key);
+
+set hive.spark.dynamic.partition.pruning=true;
+
+-- target works are already combined
+explain
+select * from
+  (select part1.key, part1.value from part1 join src on part1.q=src.key) a
+union all
+  (select part1.key, part1.value from part1 join src on part1.q=src.key);
+
+select * from
+  (select part1.key, part1.value from part1 join src on part1.q=src.key) a
+union all
+  (select part1.key, part1.value from part1 join src on part1.q=src.key);
+
+-- dpp works shouldn't be combined
+explain
+select * from
+  (select part1.key, part1.value from part1 join src on part1.p=src.key) a
+union all
+  (select part2.key, part2.value from part2 join src on part2.p=src.value);
+
+-- dpp works shouldn't be combined
+explain
+select * from
+  (select part1.key, part1.value from part1 join src on part1.p=upper(src.key)) a
+union all
+  (select part2.key, part2.value from part2 join src on part2.p=src.key);
+
+-- dpp works should be combined
+explain
+with top as
+(select key from src order by key limit 200)
+select * from
+  (select part1.key, part1.value from part1 join top on part1.q=top.key) a
+union all
+  (select part2.key, part2.value from part2 join top on part2.q=top.key);
+  
+with top as
+(select key from src order by key limit 200)
+select * from
+  (select part1.key, part1.value from part1 join top on part1.q=top.key) a
+union all
+  (select part2.key, part2.value from part2 join top on part2.q=top.key);
+  
+-- verify result
+set hive.spark.dynamic.partition.pruning=false;
+
+with top as
+(select key from src order by key limit 200)
+select * from
+  (select part1.key, part1.value from part1 join top on part1.q=top.key) a
+union all
+  (select part2.key, part2.value from part2 join top on part2.q=top.key);
+  
+set hive.spark.dynamic.partition.pruning=true;
+
+-- dpp works should be combined
+explain
+with top as
+(select key, value from src order by key, value limit 200)
+select * from
+  (select part1.key, part1.value from part1 join top on part1.p=top.key and part1.q=top.key) a
+union all
+  (select part2.key, part2.value from part2 join top on part2.p=top.key and part2.q=top.key);
+
+with top as
+(select key, value from src order by key, value limit 200)
+select * from
+  (select part1.key, part1.value from part1 join top on part1.p=top.key and part1.q=top.key) a
+union all
+  (select part2.key, part2.value from part2 join top on part2.p=top.key and part2.q=top.key);
+
+-- verify result
+set hive.spark.dynamic.partition.pruning=false;
+
+with top as
+(select key, value from src order by key, value limit 200)
+select * from
+  (select part1.key, part1.value from part1 join top on part1.p=top.key and part1.q=top.key) a
+union all
+  (select part2.key, part2.value from part2 join top on part2.p=top.key and part2.q=top.key);
+
+set hive.spark.dynamic.partition.pruning=true;
+
+-- dpp works shouldn't be combined
+explain
+with top as
+(select key, value from src order by key, value limit 200)
+select * from
+  (select part1.key, part1.value from part1 join top on part1.p=top.key and part1.q=top.key) a
+union all
+  (select part2.key, part2.value from part2 join top on part2.p=top.key and part2.q=top.value);
+
+drop table part1;
+drop table part2;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/aaacda47/ql/src/test/results/clientpositive/spark/spark_dynamic_partition_pruning.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/spark/spark_dynamic_partition_pruning.q.out b/ql/src/test/results/clientpositive/spark/spark_dynamic_partition_pruning.q.out
index 95efa39..7e97988 100644
--- a/ql/src/test/results/clientpositive/spark/spark_dynamic_partition_pruning.q.out
+++ b/ql/src/test/results/clientpositive/spark/spark_dynamic_partition_pruning.q.out
@@ -227,10 +227,10 @@ STAGE PLANS:
                           outputColumnNames: _col0
                           Statistics: Num rows: 1 Data size: 21 Basic stats: COMPLETE Column stats: NONE
                           Spark Partition Pruning Sink Operator
-                            Target column: ds (string)
-                            partition key expr: ds
+                            Target column: [1:ds (string)]
+                            partition key expr: [ds]
                             Statistics: Num rows: 1 Data size: 21 Basic stats: COMPLETE Column stats: NONE
-                            target work: Map 1
+                            target works: [Map 1]
 
   Stage: Stage-1
     Spark
@@ -483,10 +483,10 @@ STAGE PLANS:
                           outputColumnNames: _col0
                           Statistics: Num rows: 1 Data size: 21 Basic stats: COMPLETE Column stats: NONE
                           Spark Partition Pruning Sink Operator
-                            Target column: ds (string)
-                            partition key expr: day(ds)
+                            Target column: [1:ds (string)]
+                            partition key expr: [day(ds)]
                             Statistics: Num rows: 1 Data size: 21 Basic stats: COMPLETE Column stats: NONE
-                            target work: Map 1
+                            target works: [Map 1]
 
   Stage: Stage-1
     Spark
@@ -730,10 +730,10 @@ STAGE PLANS:
                           outputColumnNames: _col0
                           Statistics: Num rows: 1 Data size: 21 Basic stats: COMPLETE Column stats: NONE
                           Spark Partition Pruning Sink Operator
-                            Target column: ds (string)
-                            partition key expr: abs(((- UDFToLong(concat(UDFToString(day(ds)), '0'))) + 10))
+                            Target column: [1:ds (string)]
+                            partition key expr: [abs(((- UDFToLong(concat(UDFToString(day(ds)), '0'))) + 10))]
                             Statistics: Num rows: 1 Data size: 21 Basic stats: COMPLETE Column stats: NONE
-                            target work: Map 1
+                            target works: [Map 1]
 
   Stage: Stage-1
     Spark
@@ -873,10 +873,10 @@ STAGE PLANS:
                           outputColumnNames: _col0
                           Statistics: Num rows: 1 Data size: 21 Basic stats: COMPLETE Column stats: NONE
                           Spark Partition Pruning Sink Operator
-                            Target column: ds (string)
-                            partition key expr: CAST( UDFToShort(day(ds)) AS decimal(10,0))
+                            Target column: [1:ds (string)]
+                            partition key expr: [CAST( UDFToShort(day(ds)) AS decimal(10,0))]
                             Statistics: Num rows: 1 Data size: 21 Basic stats: COMPLETE Column stats: NONE
-                            target work: Map 1
+                            target works: [Map 1]
 
   Stage: Stage-1
     Spark
@@ -1015,10 +1015,10 @@ STAGE PLANS:
                           outputColumnNames: _col0
                           Statistics: Num rows: 1 Data size: 21 Basic stats: COMPLETE Column stats: NONE
                           Spark Partition Pruning Sink Operator
-                            Target column: ds (string)
-                            partition key expr: ds
+                            Target column: [1:ds (string)]
+                            partition key expr: [ds]
                             Statistics: Num rows: 1 Data size: 21 Basic stats: COMPLETE Column stats: NONE
-                            target work: Map 1
+                            target works: [Map 1]
         Map 8 
             Map Operator Tree:
                 TableScan
@@ -1042,10 +1042,10 @@ STAGE PLANS:
                           outputColumnNames: _col0
                           Statistics: Num rows: 1 Data size: 5 Basic stats: COMPLETE Column stats: NONE
                           Spark Partition Pruning Sink Operator
-                            Target column: hr (string)
-                            partition key expr: hr
+                            Target column: [1:hr (string)]
+                            partition key expr: [hr]
                             Statistics: Num rows: 1 Data size: 5 Basic stats: COMPLETE Column stats: NONE
-                            target work: Map 1
+                            target works: [Map 1]
 
   Stage: Stage-1
     Spark
@@ -1377,10 +1377,10 @@ STAGE PLANS:
                           outputColumnNames: _col0
                           Statistics: Num rows: 1 Data size: 27 Basic stats: COMPLETE Column stats: NONE
                           Spark Partition Pruning Sink Operator
-                            Target column: ds (string)
-                            partition key expr: ds
+                            Target column: [1:ds (string)]
+                            partition key expr: [ds]
                             Statistics: Num rows: 1 Data size: 27 Basic stats: COMPLETE Column stats: NONE
-                            target work: Map 1
+                            target works: [Map 1]
                       Select Operator
                         expressions: _col2 (type: string)
                         outputColumnNames: _col0
@@ -1391,10 +1391,10 @@ STAGE PLANS:
                           outputColumnNames: _col0
                           Statistics: Num rows: 1 Data size: 27 Basic stats: COMPLETE Column stats: NONE
                           Spark Partition Pruning Sink Operator
-                            Target column: hr (string)
-                            partition key expr: hr
+                            Target column: [1:hr (string)]
+                            partition key expr: [hr]
                             Statistics: Num rows: 1 Data size: 27 Basic stats: COMPLETE Column stats: NONE
-                            target work: Map 1
+                            target works: [Map 1]
 
   Stage: Stage-1
     Spark
@@ -1646,10 +1646,10 @@ STAGE PLANS:
                           outputColumnNames: _col0
                           Statistics: Num rows: 1 Data size: 21 Basic stats: COMPLETE Column stats: NONE
                           Spark Partition Pruning Sink Operator
-                            Target column: ds (string)
-                            partition key expr: ds
+                            Target column: [1:ds (string)]
+                            partition key expr: [ds]
                             Statistics: Num rows: 1 Data size: 21 Basic stats: COMPLETE Column stats: NONE
-                            target work: Map 1
+                            target works: [Map 1]
 
   Stage: Stage-1
     Spark
@@ -1902,10 +1902,10 @@ STAGE PLANS:
                           outputColumnNames: _col0
                           Statistics: Num rows: 1 Data size: 7 Basic stats: COMPLETE Column stats: NONE
                           Spark Partition Pruning Sink Operator
-                            Target column: hr (string)
-                            partition key expr: UDFToDouble(hr)
+                            Target column: [1:hr (string)]
+                            partition key expr: [UDFToDouble(hr)]
                             Statistics: Num rows: 1 Data size: 7 Basic stats: COMPLETE Column stats: NONE
-                            target work: Map 1
+                            target works: [Map 1]
 
   Stage: Stage-1
     Spark
@@ -2042,10 +2042,10 @@ STAGE PLANS:
                           outputColumnNames: _col0
                           Statistics: Num rows: 1 Data size: 7 Basic stats: COMPLETE Column stats: NONE
                           Spark Partition Pruning Sink Operator
-                            Target column: hr (string)
-                            partition key expr: (UDFToDouble(hr) * 2.0)
+                            Target column: [1:hr (string)]
+                            partition key expr: [(UDFToDouble(hr) * 2.0)]
                             Statistics: Num rows: 1 Data size: 7 Basic stats: COMPLETE Column stats: NONE
-                            target work: Map 1
+                            target works: [Map 1]
 
   Stage: Stage-1
     Spark
@@ -2405,10 +2405,10 @@ STAGE PLANS:
                           outputColumnNames: _col0
                           Statistics: Num rows: 1 Data size: 7 Basic stats: COMPLETE Column stats: NONE
                           Spark Partition Pruning Sink Operator
-                            Target column: hr (string)
-                            partition key expr: UDFToString((UDFToDouble(hr) * 2.0))
+                            Target column: [1:hr (string)]
+                            partition key expr: [UDFToString((UDFToDouble(hr) * 2.0))]
                             Statistics: Num rows: 1 Data size: 7 Basic stats: COMPLETE Column stats: NONE
-                            target work: Map 1
+                            target works: [Map 1]
 
   Stage: Stage-1
     Spark
@@ -2787,10 +2787,10 @@ STAGE PLANS:
                           outputColumnNames: _col0
                           Statistics: Num rows: 1 Data size: 27 Basic stats: COMPLETE Column stats: NONE
                           Spark Partition Pruning Sink Operator
-                            Target column: ds (string)
-                            partition key expr: ds
+                            Target column: [1:ds (string)]
+                            partition key expr: [ds]
                             Statistics: Num rows: 1 Data size: 27 Basic stats: COMPLETE Column stats: NONE
-                            target work: Map 1
+                            target works: [Map 1]
                       Select Operator
                         expressions: _col2 (type: string)
                         outputColumnNames: _col0
@@ -2801,10 +2801,10 @@ STAGE PLANS:
                           outputColumnNames: _col0
                           Statistics: Num rows: 1 Data size: 27 Basic stats: COMPLETE Column stats: NONE
                           Spark Partition Pruning Sink Operator
-                            Target column: hr (string)
-                            partition key expr: hr
+                            Target column: [1:hr (string)]
+                            partition key expr: [hr]
                             Statistics: Num rows: 1 Data size: 27 Basic stats: COMPLETE Column stats: NONE
-                            target work: Map 1
+                            target works: [Map 1]
 
   Stage: Stage-1
     Spark
@@ -2940,10 +2940,10 @@ STAGE PLANS:
                           outputColumnNames: _col0
                           Statistics: Num rows: 1 Data size: 21 Basic stats: COMPLETE Column stats: NONE
                           Spark Partition Pruning Sink Operator
-                            Target column: ds (string)
-                            partition key expr: ds
+                            Target column: [1:ds (string)]
+                            partition key expr: [ds]
                             Statistics: Num rows: 1 Data size: 21 Basic stats: COMPLETE Column stats: NONE
-                            target work: Map 1
+                            target works: [Map 1]
 
   Stage: Stage-1
     Spark
@@ -3061,10 +3061,10 @@ STAGE PLANS:
                           outputColumnNames: _col0
                           Statistics: Num rows: 1 Data size: 21 Basic stats: COMPLETE Column stats: NONE
                           Spark Partition Pruning Sink Operator
-                            Target column: ds (string)
-                            partition key expr: ds
+                            Target column: [4:ds (string)]
+                            partition key expr: [ds]
                             Statistics: Num rows: 1 Data size: 21 Basic stats: COMPLETE Column stats: NONE
-                            target work: Map 4
+                            target works: [Map 4]
 
   Stage: Stage-1
     Spark
@@ -3181,10 +3181,10 @@ STAGE PLANS:
                           outputColumnNames: _col0
                           Statistics: Num rows: 1 Data size: 21 Basic stats: COMPLETE Column stats: NONE
                           Spark Partition Pruning Sink Operator
-                            Target column: ds (string)
-                            partition key expr: ds
+                            Target column: [1:ds (string)]
+                            partition key expr: [ds]
                             Statistics: Num rows: 1 Data size: 21 Basic stats: COMPLETE Column stats: NONE
-                            target work: Map 1
+                            target works: [Map 1]
 
   Stage: Stage-1
     Spark
@@ -3303,10 +3303,10 @@ STAGE PLANS:
                           outputColumnNames: _col0
                           Statistics: Num rows: 1 Data size: 21 Basic stats: COMPLETE Column stats: NONE
                           Spark Partition Pruning Sink Operator
-                            Target column: ds (string)
-                            partition key expr: ds
+                            Target column: [1:ds (string)]
+                            partition key expr: [ds]
                             Statistics: Num rows: 1 Data size: 21 Basic stats: COMPLETE Column stats: NONE
-                            target work: Map 1
+                            target works: [Map 1]
         Map 8 
             Map Operator Tree:
                 TableScan
@@ -3330,10 +3330,10 @@ STAGE PLANS:
                           outputColumnNames: _col0
                           Statistics: Num rows: 1 Data size: 5 Basic stats: COMPLETE Column stats: NONE
                           Spark Partition Pruning Sink Operator
-                            Target column: hr (string)
-                            partition key expr: hr
+                            Target column: [1:hr (string)]
+                            partition key expr: [hr]
                             Statistics: Num rows: 1 Data size: 5 Basic stats: COMPLETE Column stats: NONE
-                            target work: Map 1
+                            target works: [Map 1]
 
   Stage: Stage-1
     Spark
@@ -3688,10 +3688,10 @@ STAGE PLANS:
                         outputColumnNames: _col0
                         Statistics: Num rows: 2 Data size: 368 Basic stats: COMPLETE Column stats: NONE
                         Spark Partition Pruning Sink Operator
-                          Target column: ds (string)
-                          partition key expr: ds
+                          Target column: [1:ds (string)]
+                          partition key expr: [ds]
                           Statistics: Num rows: 2 Data size: 368 Basic stats: COMPLETE Column stats: NONE
-                          target work: Map 1
+                          target works: [Map 1]
         Reducer 9 
             Reduce Operator Tree:
               Group By Operator
@@ -3717,10 +3717,10 @@ STAGE PLANS:
                         outputColumnNames: _col0
                         Statistics: Num rows: 2 Data size: 368 Basic stats: COMPLETE Column stats: NONE
                         Spark Partition Pruning Sink Operator
-                          Target column: ds (string)
-                          partition key expr: ds
+                          Target column: [1:ds (string)]
+                          partition key expr: [ds]
                           Statistics: Num rows: 2 Data size: 368 Basic stats: COMPLETE Column stats: NONE
-                          target work: Map 1
+                          target works: [Map 1]
 
   Stage: Stage-1
     Spark
@@ -3956,10 +3956,10 @@ STAGE PLANS:
                         outputColumnNames: _col0
                         Statistics: Num rows: 2 Data size: 368 Basic stats: COMPLETE Column stats: NONE
                         Spark Partition Pruning Sink Operator
-                          Target column: ds (string)
-                          partition key expr: ds
+                          Target column: [1:ds (string)]
+                          partition key expr: [ds]
                           Statistics: Num rows: 2 Data size: 368 Basic stats: COMPLETE Column stats: NONE
-                          target work: Map 1
+                          target works: [Map 1]
         Reducer 9 
             Reduce Operator Tree:
               Group By Operator
@@ -3985,10 +3985,10 @@ STAGE PLANS:
                         outputColumnNames: _col0
                         Statistics: Num rows: 2 Data size: 368 Basic stats: COMPLETE Column stats: NONE
                         Spark Partition Pruning Sink Operator
-                          Target column: ds (string)
-                          partition key expr: ds
+                          Target column: [1:ds (string)]
+                          partition key expr: [ds]
                           Statistics: Num rows: 2 Data size: 368 Basic stats: COMPLETE Column stats: NONE
-                          target work: Map 1
+                          target works: [Map 1]
 
   Stage: Stage-1
     Spark
@@ -4227,10 +4227,10 @@ STAGE PLANS:
                         outputColumnNames: _col0
                         Statistics: Num rows: 2 Data size: 368 Basic stats: COMPLETE Column stats: NONE
                         Spark Partition Pruning Sink Operator
-                          Target column: ds (string)
-                          partition key expr: ds
+                          Target column: [1:ds (string)]
+                          partition key expr: [ds]
                           Statistics: Num rows: 2 Data size: 368 Basic stats: COMPLETE Column stats: NONE
-                          target work: Map 1
+                          target works: [Map 1]
         Reducer 13 
             Reduce Operator Tree:
               Group By Operator
@@ -4256,10 +4256,10 @@ STAGE PLANS:
                         outputColumnNames: _col0
                         Statistics: Num rows: 2 Data size: 368 Basic stats: COMPLETE Column stats: NONE
                         Spark Partition Pruning Sink Operator
-                          Target column: ds (string)
-                          partition key expr: ds
+                          Target column: [1:ds (string)]
+                          partition key expr: [ds]
                           Statistics: Num rows: 2 Data size: 368 Basic stats: COMPLETE Column stats: NONE
-                          target work: Map 1
+                          target works: [Map 1]
 
   Stage: Stage-1
     Spark
@@ -4459,10 +4459,10 @@ STAGE PLANS:
                           outputColumnNames: _col0
                           Statistics: Num rows: 1 Data size: 21 Basic stats: COMPLETE Column stats: NONE
                           Spark Partition Pruning Sink Operator
-                            Target column: ds (string)
-                            partition key expr: ds
+                            Target column: [1:ds (string)]
+                            partition key expr: [ds]
                             Statistics: Num rows: 1 Data size: 21 Basic stats: COMPLETE Column stats: NONE
-                            target work: Map 1
+                            target works: [Map 1]
             Local Work:
               Map Reduce Local Work
 
@@ -4592,10 +4592,10 @@ STAGE PLANS:
                           outputColumnNames: _col0
                           Statistics: Num rows: 1 Data size: 21 Basic stats: COMPLETE Column stats: NONE
                           Spark Partition Pruning Sink Operator
-                            Target column: ds (string)
-                            partition key expr: day(ds)
+                            Target column: [1:ds (string)]
+                            partition key expr: [day(ds)]
                             Statistics: Num rows: 1 Data size: 21 Basic stats: COMPLETE Column stats: NONE
-                            target work: Map 1
+                            target works: [Map 1]
             Local Work:
               Map Reduce Local Work
 
@@ -4718,10 +4718,10 @@ STAGE PLANS:
                           outputColumnNames: _col0
                           Statistics: Num rows: 1 Data size: 21 Basic stats: COMPLETE Column stats: NONE
                           Spark Partition Pruning Sink Operator
-                            Target column: ds (string)
-                            partition key expr: ds
+                            Target column: [1:ds (string)]
+                            partition key expr: [ds]
                             Statistics: Num rows: 1 Data size: 21 Basic stats: COMPLETE Column stats: NONE
-                            target work: Map 1
+                            target works: [Map 1]
             Local Work:
               Map Reduce Local Work
         Map 4 
@@ -4751,10 +4751,10 @@ STAGE PLANS:
                           outputColumnNames: _col0
                           Statistics: Num rows: 1 Data size: 5 Basic stats: COMPLETE Column stats: NONE
                           Spark Partition Pruning Sink Operator
-                            Target column: hr (string)
-                            partition key expr: hr
+                            Target column: [1:hr (string)]
+                            partition key expr: [hr]
                             Statistics: Num rows: 1 Data size: 5 Basic stats: COMPLETE Column stats: NONE
-                            target work: Map 1
+                            target works: [Map 1]
             Local Work:
               Map Reduce Local Work
 
@@ -4897,10 +4897,10 @@ STAGE PLANS:
                           outputColumnNames: _col0
                           Statistics: Num rows: 1 Data size: 27 Basic stats: COMPLETE Column stats: NONE
                           Spark Partition Pruning Sink Operator
-                            Target column: ds (string)
-                            partition key expr: ds
+                            Target column: [1:ds (string)]
+                            partition key expr: [ds]
                             Statistics: Num rows: 1 Data size: 27 Basic stats: COMPLETE Column stats: NONE
-                            target work: Map 1
+                            target works: [Map 1]
                       Select Operator
                         expressions: _col2 (type: string)
                         outputColumnNames: _col0
@@ -4911,10 +4911,10 @@ STAGE PLANS:
                           outputColumnNames: _col0
                           Statistics: Num rows: 1 Data size: 27 Basic stats: COMPLETE Column stats: NONE
                           Spark Partition Pruning Sink Operator
-                            Target column: hr (string)
-                            partition key expr: hr
+                            Target column: [1:hr (string)]
+                            partition key expr: [hr]
                             Statistics: Num rows: 1 Data size: 27 Basic stats: COMPLETE Column stats: NONE
-                            target work: Map 1
+                            target works: [Map 1]
             Local Work:
               Map Reduce Local Work
 
@@ -5043,10 +5043,10 @@ STAGE PLANS:
                           outputColumnNames: _col0
                           Statistics: Num rows: 1 Data size: 21 Basic stats: COMPLETE Column stats: NONE
                           Spark Partition Pruning Sink Operator
-                            Target column: ds (string)
-                            partition key expr: ds
+                            Target column: [1:ds (string)]
+                            partition key expr: [ds]
                             Statistics: Num rows: 1 Data size: 21 Basic stats: COMPLETE Column stats: NONE
-                            target work: Map 1
+                            target works: [Map 1]
             Local Work:
               Map Reduce Local Work
 
@@ -5167,10 +5167,10 @@ STAGE PLANS:
                           outputColumnNames: _col0
                           Statistics: Num rows: 1 Data size: 7 Basic stats: COMPLETE Column stats: NONE
                           Spark Partition Pruning Sink Operator
-                            Target column: hr (string)
-                            partition key expr: UDFToDouble(hr)
+                            Target column: [1:hr (string)]
+                            partition key expr: [UDFToDouble(hr)]
                             Statistics: Num rows: 1 Data size: 7 Basic stats: COMPLETE Column stats: NONE
-                            target work: Map 1
+                            target works: [Map 1]
             Local Work:
               Map Reduce Local Work
 
@@ -5291,10 +5291,10 @@ STAGE PLANS:
                           outputColumnNames: _col0
                           Statistics: Num rows: 1 Data size: 7 Basic stats: COMPLETE Column stats: NONE
                           Spark Partition Pruning Sink Operator
-                            Target column: hr (string)
-                            partition key expr: (UDFToDouble(hr) * 2.0)
+                            Target column: [1:hr (string)]
+                            partition key expr: [(UDFToDouble(hr) * 2.0)]
                             Statistics: Num rows: 1 Data size: 7 Basic stats: COMPLETE Column stats: NONE
-                            target work: Map 1
+                            target works: [Map 1]
             Local Work:
               Map Reduce Local Work
 
@@ -5553,10 +5553,10 @@ STAGE PLANS:
                           outputColumnNames: _col0
                           Statistics: Num rows: 1 Data size: 21 Basic stats: COMPLETE Column stats: NONE
                           Spark Partition Pruning Sink Operator
-                            Target column: ds (string)
-                            partition key expr: ds
+                            Target column: [1:ds (string)]
+                            partition key expr: [ds]
                             Statistics: Num rows: 1 Data size: 21 Basic stats: COMPLETE Column stats: NONE
-                            target work: Map 1
+                            target works: [Map 1]
             Local Work:
               Map Reduce Local Work
 
@@ -5840,10 +5840,10 @@ STAGE PLANS:
                           outputColumnNames: _col0
                           Statistics: Num rows: 1 Data size: 21 Basic stats: COMPLETE Column stats: NONE
                           Spark Partition Pruning Sink Operator
-                            Target column: ds (string)
-                            partition key expr: ds
+                            Target column: [1:ds (string)]
+                            partition key expr: [ds]
                             Statistics: Num rows: 1 Data size: 21 Basic stats: COMPLETE Column stats: NONE
-                            target work: Map 1
+                            target works: [Map 1]
             Local Work:
               Map Reduce Local Work
         Map 4 
@@ -5873,10 +5873,10 @@ STAGE PLANS:
                           outputColumnNames: _col0
                           Statistics: Num rows: 1 Data size: 5 Basic stats: COMPLETE Column stats: NONE
                           Spark Partition Pruning Sink Operator
-                            Target column: hr (string)
-                            partition key expr: hr
+                            Target column: [1:hr (string)]
+                            partition key expr: [hr]
                             Statistics: Num rows: 1 Data size: 5 Basic stats: COMPLETE Column stats: NONE
-                            target work: Map 1
+                            target works: [Map 1]
             Local Work:
               Map Reduce Local Work
 
@@ -6189,10 +6189,10 @@ STAGE PLANS:
                         outputColumnNames: _col0
                         Statistics: Num rows: 2 Data size: 368 Basic stats: COMPLETE Column stats: NONE
                         Spark Partition Pruning Sink Operator
-                          Target column: ds (string)
-                          partition key expr: ds
+                          Target column: [1:ds (string)]
+                          partition key expr: [ds]
                           Statistics: Num rows: 2 Data size: 368 Basic stats: COMPLETE Column stats: NONE
-                          target work: Map 1
+                          target works: [Map 1]
         Reducer 9 
             Reduce Operator Tree:
               Group By Operator
@@ -6218,10 +6218,10 @@ STAGE PLANS:
                         outputColumnNames: _col0
                         Statistics: Num rows: 2 Data size: 368 Basic stats: COMPLETE Column stats: NONE
                         Spark Partition Pruning Sink Operator
-                          Target column: ds (string)
-                          partition key expr: ds
+                          Target column: [1:ds (string)]
+                          partition key expr: [ds]
                           Statistics: Num rows: 2 Data size: 368 Basic stats: COMPLETE Column stats: NONE
-                          target work: Map 1
+                          target works: [Map 1]
 
   Stage: Stage-1
     Spark
@@ -6458,10 +6458,10 @@ STAGE PLANS:
                           outputColumnNames: _col0
                           Statistics: Num rows: 2 Data size: 54 Basic stats: COMPLETE Column stats: NONE
                           Spark Partition Pruning Sink Operator
-                            Target column: ds (string)
-                            partition key expr: ds
+                            Target column: [1:ds (string)]
+                            partition key expr: [ds]
                             Statistics: Num rows: 2 Data size: 54 Basic stats: COMPLETE Column stats: NONE
-                            target work: Map 1
+                            target works: [Map 1]
                       Select Operator
                         expressions: UDFToDouble(_col2) (type: double)
                         outputColumnNames: _col0
@@ -6472,10 +6472,10 @@ STAGE PLANS:
                           outputColumnNames: _col0
                           Statistics: Num rows: 2 Data size: 54 Basic stats: COMPLETE Column stats: NONE
                           Spark Partition Pruning Sink Operator
-                            Target column: hr (int)
-                            partition key expr: UDFToDouble(hr)
+                            Target column: [1:hr (int)]
+                            partition key expr: [UDFToDouble(hr)]
                             Statistics: Num rows: 2 Data size: 54 Basic stats: COMPLETE Column stats: NONE
-                            target work: Map 1
+                            target works: [Map 1]
             Local Work:
               Map Reduce Local Work
 

http://git-wip-us.apache.org/repos/asf/hive/blob/aaacda47/ql/src/test/results/clientpositive/spark/spark_dynamic_partition_pruning_2.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/spark/spark_dynamic_partition_pruning_2.q.out b/ql/src/test/results/clientpositive/spark/spark_dynamic_partition_pruning_2.q.out
index cc7819c..075aaff 100644
--- a/ql/src/test/results/clientpositive/spark/spark_dynamic_partition_pruning_2.q.out
+++ b/ql/src/test/results/clientpositive/spark/spark_dynamic_partition_pruning_2.q.out
@@ -184,10 +184,10 @@ STAGE PLANS:
                           outputColumnNames: _col0
                           Statistics: Num rows: 2 Data size: 10 Basic stats: COMPLETE Column stats: NONE
                           Spark Partition Pruning Sink Operator
-                            Target column: dim_shops_id (int)
-                            partition key expr: dim_shops_id
+                            Target column: [1:dim_shops_id (int)]
+                            partition key expr: [dim_shops_id]
                             Statistics: Num rows: 2 Data size: 10 Basic stats: COMPLETE Column stats: NONE
-                            target work: Map 1
+                            target works: [Map 1]
             Local Work:
               Map Reduce Local Work
 
@@ -726,10 +726,10 @@ STAGE PLANS:
                           outputColumnNames: _col0
                           Statistics: Num rows: 2 Data size: 10 Basic stats: COMPLETE Column stats: NONE
                           Spark Partition Pruning Sink Operator
-                            Target column: dim_shops_id (int)
-                            partition key expr: dim_shops_id
+                            Target column: [1:dim_shops_id (int)]
+                            partition key expr: [dim_shops_id]
                             Statistics: Num rows: 2 Data size: 10 Basic stats: COMPLETE Column stats: NONE
-                            target work: Map 1
+                            target works: [Map 1]
             Local Work:
               Map Reduce Local Work
 
@@ -887,10 +887,10 @@ STAGE PLANS:
                           outputColumnNames: _col0
                           Statistics: Num rows: 1 Data size: 5 Basic stats: COMPLETE Column stats: NONE
                           Spark Partition Pruning Sink Operator
-                            Target column: dim_shops_id (int)
-                            partition key expr: dim_shops_id
+                            Target column: [1:dim_shops_id (int)]
+                            partition key expr: [dim_shops_id]
                             Statistics: Num rows: 1 Data size: 5 Basic stats: COMPLETE Column stats: NONE
-                            target work: Map 1
+                            target works: [Map 1]
             Local Work:
               Map Reduce Local Work
 
@@ -925,10 +925,10 @@ STAGE PLANS:
                           outputColumnNames: _col0
                           Statistics: Num rows: 1 Data size: 5 Basic stats: COMPLETE Column stats: NONE
                           Spark Partition Pruning Sink Operator
-                            Target column: dim_shops_id (int)
-                            partition key expr: dim_shops_id
+                            Target column: [3:dim_shops_id (int)]
+                            partition key expr: [dim_shops_id]
                             Statistics: Num rows: 1 Data size: 5 Basic stats: COMPLETE Column stats: NONE
-                            target work: Map 3
+                            target works: [Map 3]
             Local Work:
               Map Reduce Local Work
 

http://git-wip-us.apache.org/repos/asf/hive/blob/aaacda47/ql/src/test/results/clientpositive/spark/spark_dynamic_partition_pruning_3.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/spark/spark_dynamic_partition_pruning_3.q.out b/ql/src/test/results/clientpositive/spark/spark_dynamic_partition_pruning_3.q.out
index cad45e1..53f25ce 100644
--- a/ql/src/test/results/clientpositive/spark/spark_dynamic_partition_pruning_3.q.out
+++ b/ql/src/test/results/clientpositive/spark/spark_dynamic_partition_pruning_3.q.out
@@ -331,10 +331,10 @@ STAGE PLANS:
                           outputColumnNames: _col0
                           Statistics: Num rows: 1 Data size: 3 Basic stats: COMPLETE Column stats: NONE
                           Spark Partition Pruning Sink Operator
-                            Target column: part_col (int)
-                            partition key expr: part_col
+                            Target column: [4:part_col (int)]
+                            partition key expr: [part_col]
                             Statistics: Num rows: 1 Data size: 3 Basic stats: COMPLETE Column stats: NONE
-                            target work: Map 4
+                            target works: [Map 4]
             Local Work:
               Map Reduce Local Work
 
@@ -383,10 +383,10 @@ STAGE PLANS:
                               outputColumnNames: _col0
                               Statistics: Num rows: 4 Data size: 4 Basic stats: COMPLETE Column stats: NONE
                               Spark Partition Pruning Sink Operator
-                                Target column: part_col (int)
-                                partition key expr: part_col
+                                Target column: [1:part_col (int)]
+                                partition key expr: [part_col]
                                 Statistics: Num rows: 4 Data size: 4 Basic stats: COMPLETE Column stats: NONE
-                                target work: Map 1
+                                target works: [Map 1]
             Local Work:
               Map Reduce Local Work
 
@@ -524,10 +524,10 @@ STAGE PLANS:
                           outputColumnNames: _col0
                           Statistics: Num rows: 1 Data size: 3 Basic stats: COMPLETE Column stats: NONE
                           Spark Partition Pruning Sink Operator
-                            Target column: part_col (int)
-                            partition key expr: part_col
+                            Target column: [1:part_col (int)]
+                            partition key expr: [part_col]
                             Statistics: Num rows: 1 Data size: 3 Basic stats: COMPLETE Column stats: NONE
-                            target work: Map 1
+                            target works: [Map 1]
             Local Work:
               Map Reduce Local Work
         Map 4 
@@ -557,10 +557,10 @@ STAGE PLANS:
                           outputColumnNames: _col0
                           Statistics: Num rows: 1 Data size: 3 Basic stats: COMPLETE Column stats: NONE
                           Spark Partition Pruning Sink Operator
-                            Target column: part_col (int)
-                            partition key expr: part_col
+                            Target column: [1:part_col (int)]
+                            partition key expr: [part_col]
                             Statistics: Num rows: 1 Data size: 3 Basic stats: COMPLETE Column stats: NONE
-                            target work: Map 1
+                            target works: [Map 1]
             Local Work:
               Map Reduce Local Work
 
@@ -700,10 +700,10 @@ STAGE PLANS:
                           outputColumnNames: _col0
                           Statistics: Num rows: 1 Data size: 3 Basic stats: COMPLETE Column stats: NONE
                           Spark Partition Pruning Sink Operator
-                            Target column: part_col (int)
-                            partition key expr: part_col
+                            Target column: [1:part_col (int)]
+                            partition key expr: [part_col]
                             Statistics: Num rows: 1 Data size: 3 Basic stats: COMPLETE Column stats: NONE
-                            target work: Map 1
+                            target works: [Map 1]
             Local Work:
               Map Reduce Local Work
 
@@ -737,10 +737,10 @@ STAGE PLANS:
                           outputColumnNames: _col0
                           Statistics: Num rows: 1 Data size: 3 Basic stats: COMPLETE Column stats: NONE
                           Spark Partition Pruning Sink Operator
-                            Target column: part_col (int)
-                            partition key expr: part_col
+                            Target column: [5:part_col (int)]
+                            partition key expr: [part_col]
                             Statistics: Num rows: 1 Data size: 3 Basic stats: COMPLETE Column stats: NONE
-                            target work: Map 5
+                            target works: [Map 5]
             Local Work:
               Map Reduce Local Work
 
@@ -920,10 +920,10 @@ STAGE PLANS:
                           outputColumnNames: _col0
                           Statistics: Num rows: 1 Data size: 3 Basic stats: COMPLETE Column stats: NONE
                           Spark Partition Pruning Sink Operator
-                            Target column: part_col (int)
-                            partition key expr: part_col
+                            Target column: [2:part_col (int)]
+                            partition key expr: [part_col]
                             Statistics: Num rows: 1 Data size: 3 Basic stats: COMPLETE Column stats: NONE
-                            target work: Map 2
+                            target works: [Map 2]
             Local Work:
               Map Reduce Local Work
 
@@ -1054,10 +1054,10 @@ STAGE PLANS:
                           outputColumnNames: _col0
                           Statistics: Num rows: 1 Data size: 3 Basic stats: COMPLETE Column stats: NONE
                           Spark Partition Pruning Sink Operator
-                            Target column: part_col (int)
-                            partition key expr: part_col
+                            Target column: [2:part_col (int)]
+                            partition key expr: [part_col]
                             Statistics: Num rows: 1 Data size: 3 Basic stats: COMPLETE Column stats: NONE
-                            target work: Map 2
+                            target works: [Map 2]
             Local Work:
               Map Reduce Local Work
 
@@ -1201,10 +1201,10 @@ STAGE PLANS:
                           outputColumnNames: _col0
                           Statistics: Num rows: 6 Data size: 6 Basic stats: COMPLETE Column stats: NONE
                           Spark Partition Pruning Sink Operator
-                            Target column: part_col (int)
-                            partition key expr: part_col
+                            Target column: [1:part_col (int)]
+                            partition key expr: [part_col]
                             Statistics: Num rows: 6 Data size: 6 Basic stats: COMPLETE Column stats: NONE
-                            target work: Map 1
+                            target works: [Map 1]
             Local Work:
               Map Reduce Local Work
 
@@ -1340,10 +1340,10 @@ STAGE PLANS:
                         outputColumnNames: _col0
                         Statistics: Num rows: 18 Data size: 18 Basic stats: COMPLETE Column stats: NONE
                         Spark Partition Pruning Sink Operator
-                          Target column: part_col1 (int)
-                          partition key expr: part_col1
+                          Target column: [1:part_col1 (int)]
+                          partition key expr: [part_col1]
                           Statistics: Num rows: 18 Data size: 18 Basic stats: COMPLETE Column stats: NONE
-                          target work: Map 1
+                          target works: [Map 1]
                     Select Operator
                       expressions: _col1 (type: int)
                       outputColumnNames: _col0
@@ -1354,10 +1354,10 @@ STAGE PLANS:
                         outputColumnNames: _col0
                         Statistics: Num rows: 18 Data size: 18 Basic stats: COMPLETE Column stats: NONE
                         Spark Partition Pruning Sink Operator
-                          Target column: part_col2 (int)
-                          partition key expr: part_col2
+                          Target column: [1:part_col2 (int)]
+                          partition key expr: [part_col2]
                           Statistics: Num rows: 18 Data size: 18 Basic stats: COMPLETE Column stats: NONE
-                          target work: Map 1
+                          target works: [Map 1]
             Local Work:
               Map Reduce Local Work
 
@@ -1493,10 +1493,10 @@ STAGE PLANS:
                           outputColumnNames: _col0
                           Statistics: Num rows: 3 Data size: 9 Basic stats: COMPLETE Column stats: NONE
                           Spark Partition Pruning Sink Operator
-                            Target column: part_col (int)
-                            partition key expr: part_col
+                            Target column: [3:part_col (int)]
+                            partition key expr: [part_col]
                             Statistics: Num rows: 3 Data size: 9 Basic stats: COMPLETE Column stats: NONE
-                            target work: Map 3
+                            target works: [Map 3]
             Local Work:
               Map Reduce Local Work
         Map 2 
@@ -1526,10 +1526,10 @@ STAGE PLANS:
                           outputColumnNames: _col0
                           Statistics: Num rows: 3 Data size: 9 Basic stats: COMPLETE Column stats: NONE
                           Spark Partition Pruning Sink Operator
-                            Target column: part_col (int)
-                            partition key expr: part_col
+                            Target column: [3:part_col (int)]
+                            partition key expr: [part_col]
                             Statistics: Num rows: 3 Data size: 9 Basic stats: COMPLETE Column stats: NONE
-                            target work: Map 3
+                            target works: [Map 3]
             Local Work:
               Map Reduce Local Work