Posted to commits@hive.apache.org by li...@apache.org on 2017/11/06 02:17:52 UTC
[3/3] hive git commit: HIVE-17877: HoS: combine equivalent DPP sink works (Rui reviewed by Sahil)
HIVE-17877: HoS: combine equivalent DPP sink works (Rui reviewed by Sahil)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/aaacda47
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/aaacda47
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/aaacda47
Branch: refs/heads/master
Commit: aaacda474d881cef14f3d7c1ff5da32c28c5663f
Parents: 49f6814
Author: Rui Li <li...@apache.org>
Authored: Mon Nov 6 10:17:44 2017 +0800
Committer: Rui Li <li...@apache.org>
Committed: Mon Nov 6 10:17:44 2017 +0800
----------------------------------------------------------------------
.../test/resources/testconfiguration.properties | 1 +
.../exec/spark/SparkDynamicPartitionPruner.java | 15 +-
.../DynamicPartitionPruningOptimization.java | 4 +-
.../SparkDynamicPartitionPruningResolver.java | 2 +-
.../spark/CombineEquivalentWorkResolver.java | 103 +-
.../spark/SparkPartitionPruningSinkDesc.java | 127 +-
.../hive/ql/parse/spark/GenSparkUtils.java | 2 +-
.../SparkPartitionPruningSinkOperator.java | 5 +-
.../spark_dynamic_partition_pruning_4.q | 157 ++
.../spark/spark_dynamic_partition_pruning.q.out | 246 +--
.../spark_dynamic_partition_pruning_2.q.out | 24 +-
.../spark_dynamic_partition_pruning_3.q.out | 78 +-
.../spark_dynamic_partition_pruning_4.q.out | 1891 ++++++++++++++++++
...dynamic_partition_pruning_mapjoin_only.q.out | 18 +-
...k_vectorized_dynamic_partition_pruning.q.out | 234 +--
15 files changed, 2542 insertions(+), 365 deletions(-)
----------------------------------------------------------------------
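HIVE-17877 in a nutshell: before this commit a SparkPartitionPruningSinkDesc described exactly one pruning target (column name, column type, partition key expression, target MapWork), so CombineEquivalentWorkResolver could not merge two otherwise identical source works whose DPP sinks pointed at different targets. With this commit the desc holds a list of DPPTargetInfo entries and equivalent sinks are folded together by moving the second sink's targets onto the first. A minimal standalone sketch of the resulting data shape, using made-up work names and the "workId:column" naming convention introduced below (not the actual Hive classes):

    import java.util.ArrayList;
    import java.util.List;

    public class MultiTargetDppSketch {
      // Simplified stand-in for SparkPartitionPruningSinkDesc.DPPTargetInfo.
      static class Target {
        final String columnName;   // e.g. "1:ds" -- target work id + ":" + column
        final String columnType;   // e.g. "string"
        final String workName;     // e.g. "Map 1"
        Target(String columnName, String columnType, String workName) {
          this.columnName = columnName;
          this.columnType = columnType;
          this.workName = workName;
        }
      }

      public static void main(String[] args) {
        // One combined sink now prunes two target map works instead of one.
        List<Target> targets = new ArrayList<>();
        targets.add(new Target("1:ds", "string", "Map 1"));
        targets.add(new Target("5:ds", "string", "Map 5"));
        targets.forEach(t ->
            System.out.println(t.workName + " pruned on " + t.columnName + " (" + t.columnType + ")"));
      }
    }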
http://git-wip-us.apache.org/repos/asf/hive/blob/aaacda47/itests/src/test/resources/testconfiguration.properties
----------------------------------------------------------------------
diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties
index 9642697..42c17f4 100644
--- a/itests/src/test/resources/testconfiguration.properties
+++ b/itests/src/test/resources/testconfiguration.properties
@@ -1461,6 +1461,7 @@ miniSparkOnYarn.only.query.files=spark_combine_equivalent_work.q,\
spark_dynamic_partition_pruning.q,\
spark_dynamic_partition_pruning_2.q,\
spark_dynamic_partition_pruning_3.q,\
+ spark_dynamic_partition_pruning_4.q,\
spark_dynamic_partition_pruning_mapjoin_only.q,\
spark_constprog_dpp.q,\
spark_dynamic_partition_pruning_recursive_mapjoin.q,\
http://git-wip-us.apache.org/repos/asf/hive/blob/aaacda47/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkDynamicPartitionPruner.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkDynamicPartitionPruner.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkDynamicPartitionPruner.java
index 2d3d756..e8c019a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkDynamicPartitionPruner.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkDynamicPartitionPruner.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hive.ql.exec.spark;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -124,18 +125,23 @@ public class SparkDynamicPartitionPruner {
for (FileStatus fstatus : fs.listStatus(sourceDir)) {
LOG.info("Start processing pruning file: " + fstatus.getPath());
in = new ObjectInputStream(fs.open(fstatus.getPath()));
- String columnName = in.readUTF();
+ final int numName = in.readInt();
SourceInfo info = null;
+ Set<String> columnNames = new HashSet<>();
+ for (int i = 0; i < numName; i++) {
+ columnNames.add(in.readUTF());
+ }
for (SourceInfo si : sourceInfoMap.get(name)) {
- if (columnName.equals(si.columnName)) {
+ if (columnNames.contains(si.columnName)) {
info = si;
break;
}
}
Preconditions.checkArgument(info != null,
- "AssertionError: no source info for the column: " + columnName);
+ "AssertionError: no source info for the column: " +
+ Arrays.toString(columnNames.toArray()));
// Read fields
while (in.available() > 0) {
@@ -172,7 +178,8 @@ public class SparkDynamicPartitionPruner {
private void prunePartitionSingleSource(SourceInfo info, MapWork work)
throws HiveException {
Set<Object> values = info.values;
- String columnName = info.columnName;
+ // strip the column name of the targetId
+ String columnName = info.columnName.substring(info.columnName.indexOf(':') + 1);
ObjectInspector oi =
PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(TypeInfoFactory
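The pruning file header changes with this commit: instead of a single writeUTF'd column name, the sink (see SparkPartitionPruningSinkOperator further down) now writes an int count followed by one UTF string per target column, and each name carries a "targetWorkId:" prefix that the pruner strips before matching it against partition columns. A minimal standalone round-trip sketch of that header layout, with hypothetical column names (the real streams also carry the serialized pruning values after the header):

    import java.io.*;
    import java.util.*;

    public class DppHeaderFormatSketch {
      public static void main(String[] args) throws IOException {
        // Hypothetical names; the "1:" / "5:" prefixes stand for target work ids,
        // mirroring how setTargetMapWork() prepends the work id to the column name.
        List<String> columnNames = Arrays.asList("1:ds", "5:ds");

        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
          out.writeInt(columnNames.size());   // header: number of target columns
          for (String name : columnNames) {
            out.writeUTF(name);               // one prefixed column name per target
          }
        }

        try (ObjectInputStream in =
            new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray()))) {
          int numNames = in.readInt();
          Set<String> names = new HashSet<>();
          for (int i = 0; i < numNames; i++) {
            names.add(in.readUTF());
          }
          // strip the "targetId:" prefix before using the name as a partition column
          for (String name : names) {
            String columnName = name.substring(name.indexOf(':') + 1);
            System.out.println(name + " -> " + columnName);
          }
        }
      }
    }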
http://git-wip-us.apache.org/repos/asf/hive/blob/aaacda47/ql/src/java/org/apache/hadoop/hive/ql/optimizer/DynamicPartitionPruningOptimization.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/DynamicPartitionPruningOptimization.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/DynamicPartitionPruningOptimization.java
index cf355d4..46c1a12 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/DynamicPartitionPruningOptimization.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/DynamicPartitionPruningOptimization.java
@@ -460,9 +460,7 @@ public class DynamicPartitionPruningOptimization implements NodeProcessor {
desc.setTableScan(ts);
desc.setTable(PlanUtils.getReduceValueTableDesc(PlanUtils
.getFieldSchemasFromColumnList(keyExprs, "key")));
- desc.setTargetColumnName(column);
- desc.setTargetColumnType(columnType);
- desc.setPartKey(partKey);
+ desc.addTarget(column, columnType, partKey, null);
OperatorFactory.getAndMakeChild(desc, groupByOp);
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/aaacda47/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SparkDynamicPartitionPruningResolver.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SparkDynamicPartitionPruningResolver.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SparkDynamicPartitionPruningResolver.java
index 4d5c234..bcd3825 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SparkDynamicPartitionPruningResolver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SparkDynamicPartitionPruningResolver.java
@@ -122,7 +122,7 @@ public class SparkDynamicPartitionPruningResolver implements PhysicalPlanResolve
targetMapWork.getEventSourceTableDescMap().get(sourceWorkId).remove(pruningSinkDesc.getTable());
targetMapWork.getEventSourceColumnNameMap().get(sourceWorkId).remove(pruningSinkDesc.getTargetColumnName());
targetMapWork.getEventSourceColumnTypeMap().get(sourceWorkId).remove(pruningSinkDesc.getTargetColumnType());
- targetMapWork.getEventSourcePartKeyExprMap().get(sourceWorkId).remove(pruningSinkDesc.getPartKey());
+ targetMapWork.getEventSourcePartKeyExprMap().get(sourceWorkId).remove(pruningSinkDesc.getTargetPartKey());
}
/**
http://git-wip-us.apache.org/repos/asf/hive/blob/aaacda47/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/CombineEquivalentWorkResolver.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/CombineEquivalentWorkResolver.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/CombineEquivalentWorkResolver.java
index 2641c1a..988579e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/CombineEquivalentWorkResolver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/CombineEquivalentWorkResolver.java
@@ -35,8 +35,10 @@ import org.apache.hadoop.hive.ql.exec.OperatorUtils;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.spark.SparkUtilities;
import org.apache.hadoop.hive.ql.parse.spark.SparkPartitionPruningSinkOperator;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.hive.ql.plan.PartitionDesc;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
@@ -82,13 +84,14 @@ public class CombineEquivalentWorkResolver implements PhysicalPlanResolver {
}
};
+ // maps from a work to the DPPs it contains
+ private Map<BaseWork, List<SparkPartitionPruningSinkDesc>> workToDpps = new HashMap<>();
+
@Override
public Object dispatch(Node nd, Stack<Node> stack, Object... nodeOutputs) throws SemanticException {
if (nd instanceof SparkTask) {
SparkTask sparkTask = (SparkTask) nd;
SparkWork sparkWork = sparkTask.getWork();
- Set<BaseWork> roots = sparkWork.getRoots();
- compareWorksRecursively(roots, sparkWork);
// For dpp case, dpp sink will appear in Task1 and the target work of dpp sink will appear in Task2.
// Task2 is the child task of Task1. Task2 will be traversed before task1 because TaskGraphWalker will first
// put children task in the front of task queue.
@@ -100,11 +103,15 @@ public class CombineEquivalentWorkResolver implements PhysicalPlanResolver {
removeEmptySparkTask(sparkTask);
}
}
+
+ Set<BaseWork> roots = sparkWork.getRoots();
+ compareWorksRecursively(roots, sparkWork);
}
return null;
}
private void compareWorksRecursively(Set<BaseWork> works, SparkWork sparkWork) {
+ workToDpps.clear();
// find out all equivalent works in the Set.
Set<Set<BaseWork>> equivalentWorks = compareChildWorks(works, sparkWork);
// combine equivalent work into single one in SparkWork's work graph.
@@ -154,14 +161,72 @@ public class CombineEquivalentWorkResolver implements PhysicalPlanResolver {
return false;
}
+ // merge the second into the first
+ private void combineEquivalentDPPSinks(SparkPartitionPruningSinkDesc first,
+ SparkPartitionPruningSinkDesc second, String firstId, String secondId) {
+ MapWork target2 = second.getTargetMapWork();
+
+ first.addTarget(second.getTargetColumnName(), second.getTargetColumnType(),
+ second.getTargetPartKey(), target2);
+
+ // update the target map work of the second
+ target2.setTmpPathForPartitionPruning(first.getTmpPathOfTargetWork());
+
+ List<ExprNodeDesc> partKey = target2.getEventSourcePartKeyExprMap().get(secondId);
+ partKey.remove(second.getTargetPartKey());
+ if (partKey.isEmpty()) {
+ target2.getEventSourcePartKeyExprMap().remove(secondId);
+ }
+ List<ExprNodeDesc> newPartKey = target2.getEventSourcePartKeyExprMap().computeIfAbsent(
+ firstId, v -> new ArrayList<>());
+ newPartKey.add(second.getTargetPartKey());
+
+ List<TableDesc> tableDesc = target2.getEventSourceTableDescMap().get(secondId);
+ tableDesc.remove(second.getTable());
+ if (tableDesc.isEmpty()) {
+ target2.getEventSourceTableDescMap().remove(secondId);
+ }
+ List<TableDesc> newTableDesc = target2.getEventSourceTableDescMap().computeIfAbsent(
+ firstId, v -> new ArrayList<>());
+ newTableDesc.add(second.getTable());
+
+ List<String> columnName = target2.getEventSourceColumnNameMap().get(secondId);
+ columnName.remove(second.getTargetColumnName());
+ if (columnName.isEmpty()) {
+ target2.getEventSourceColumnNameMap().remove(secondId);
+ }
+ List<String> newColumnName = target2.getEventSourceColumnNameMap().computeIfAbsent(
+ firstId, v -> new ArrayList<>());
+ newColumnName.add(second.getTargetColumnName());
+
+ List<String> columnType = target2.getEventSourceColumnTypeMap().get(secondId);
+ columnType.remove(second.getTargetColumnType());
+ if (columnType.isEmpty()) {
+ target2.getEventSourceColumnTypeMap().remove(secondId);
+ }
+ List<String> newColumnType = target2.getEventSourceColumnTypeMap().computeIfAbsent(
+ firstId, v -> new ArrayList<>());
+ newColumnType.add(second.getTargetColumnType());
+ }
+
private Set<BaseWork> combineEquivalentWorks(Set<Set<BaseWork>> equivalentWorks, SparkWork sparkWork) {
Set<BaseWork> removedWorks = Sets.newHashSet();
for (Set<BaseWork> workSet : equivalentWorks) {
if (workSet.size() > 1) {
Iterator<BaseWork> iterator = workSet.iterator();
BaseWork first = iterator.next();
+ List<SparkPartitionPruningSinkDesc> dppList1 = workToDpps.get(first);
+ String firstId = SparkUtilities.getWorkId(first);
while (iterator.hasNext()) {
BaseWork next = iterator.next();
+ if (dppList1 != null) {
+ List<SparkPartitionPruningSinkDesc> dppList2 = workToDpps.get(next);
+ // equivalent works must have dpp lists of same size
+ for (int i = 0; i < dppList1.size(); i++) {
+ combineEquivalentDPPSinks(dppList1.get(i), dppList2.get(i),
+ firstId, SparkUtilities.getWorkId(next));
+ }
+ }
replaceWork(next, first, sparkWork);
removedWorks.add(next);
}
@@ -231,7 +296,14 @@ public class CombineEquivalentWorkResolver implements PhysicalPlanResolver {
// leave work's output may be read in further SparkWork/FetchWork, we should not combine
// leave works without notifying further SparkWork/FetchWork.
if (sparkWork.getLeaves().contains(first) && sparkWork.getLeaves().contains(second)) {
- return false;
+ Set<Operator<? extends OperatorDesc>> leafOps = first.getAllLeafOperators();
+ leafOps.addAll(second.getAllLeafOperators());
+ for (Operator operator : leafOps) {
+ // we know how to handle DPP sinks
+ if (!(operator instanceof SparkPartitionPruningSinkOperator)) {
+ return false;
+ }
+ }
}
// need to check paths and partition desc for MapWorks
@@ -248,9 +320,10 @@ public class CombineEquivalentWorkResolver implements PhysicalPlanResolver {
Iterator<Operator<?>> firstIterator = firstRootOperators.iterator();
Iterator<Operator<?>> secondIterator = secondRootOperators.iterator();
while (firstIterator.hasNext()) {
- boolean result = compareOperatorChain(firstIterator.next(), secondIterator.next());
+ boolean result = compareOperatorChain(firstIterator.next(), secondIterator.next(),
+ first, second);
if (!result) {
- return result;
+ return false;
}
}
@@ -290,10 +363,11 @@ public class CombineEquivalentWorkResolver implements PhysicalPlanResolver {
return result;
}
- private boolean compareOperatorChain(Operator<?> firstOperator, Operator<?> secondOperator) {
+ private boolean compareOperatorChain(Operator<?> firstOperator, Operator<?> secondOperator,
+ BaseWork first, BaseWork second) {
boolean result = compareCurrentOperator(firstOperator, secondOperator);
if (!result) {
- return result;
+ return false;
}
List<Operator<? extends OperatorDesc>> firstOperatorChildOperators = firstOperator.getChildOperators();
@@ -302,19 +376,26 @@ public class CombineEquivalentWorkResolver implements PhysicalPlanResolver {
return false;
} else if (firstOperatorChildOperators != null && secondOperatorChildOperators == null) {
return false;
- } else if (firstOperatorChildOperators != null && secondOperatorChildOperators != null) {
+ } else if (firstOperatorChildOperators != null) {
if (firstOperatorChildOperators.size() != secondOperatorChildOperators.size()) {
return false;
}
int size = firstOperatorChildOperators.size();
for (int i = 0; i < size; i++) {
- result = compareOperatorChain(firstOperatorChildOperators.get(i), secondOperatorChildOperators.get(i));
+ result = compareOperatorChain(firstOperatorChildOperators.get(i),
+ secondOperatorChildOperators.get(i), first, second);
if (!result) {
return false;
}
}
}
+ if (firstOperator instanceof SparkPartitionPruningSinkOperator) {
+ List<SparkPartitionPruningSinkDesc> dpps = workToDpps.computeIfAbsent(first, k -> new ArrayList<>());
+ dpps.add(((SparkPartitionPruningSinkOperator) firstOperator).getConf());
+ dpps = workToDpps.computeIfAbsent(second, k -> new ArrayList<>());
+ dpps.add(((SparkPartitionPruningSinkOperator) secondOperator).getConf());
+ }
return true;
}
@@ -347,9 +428,9 @@ public class CombineEquivalentWorkResolver implements PhysicalPlanResolver {
SparkUtilities.collectOp(pruningList, root, SparkPartitionPruningSinkOperator.class);
for (Operator pruneSinkOp : pruningList) {
SparkPartitionPruningSinkOperator sparkPruneSinkOp = (SparkPartitionPruningSinkOperator) pruneSinkOp;
- if (removedMapWorkList.contains(sparkPruneSinkOp.getConf().getTargetWork())) {
+ if (removedMapWorkList.contains(sparkPruneSinkOp.getConf().getTargetMapWork().getName())) {
LOG.debug("ready to remove the sparkPruneSinkOp which target work is " +
- sparkPruneSinkOp.getConf().getTargetWork() + " because the MapWork is equals to other map work and " +
+ sparkPruneSinkOp.getConf().getTargetWorks() + " because the MapWork is equals to other map work and " +
"has been deleted!");
// If there is branch, remove prune sink operator branch in the baseWork
// If there is no branch, remove the whole baseWork
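When two equivalent DPP sinks are merged, combineEquivalentDPPSinks re-keys the target MapWork's four event-source maps (partition key expressions, table descs, column names, column types) from the removed work's id to the surviving work's id, dropping the old key once its list is empty. A minimal standalone sketch of that move-and-clean-up pattern on a plain map, with made-up ids:

    import java.util.*;

    public class EventSourceRelocationSketch {
      public static void main(String[] args) {
        // Hypothetical event-source map of a target MapWork: source work id -> column names.
        Map<String, List<String>> eventSourceColumnNameMap = new HashMap<>();
        eventSourceColumnNameMap.put("5", new ArrayList<>(Arrays.asList("5:ds")));

        String firstId = "1";       // id of the work that is kept
        String secondId = "5";      // id of the equivalent work being removed
        String movedColumn = "5:ds";

        // Drop the entry under the removed work's id, then re-register it
        // under the surviving work's id (same pattern for all four maps).
        List<String> old = eventSourceColumnNameMap.get(secondId);
        old.remove(movedColumn);
        if (old.isEmpty()) {
          eventSourceColumnNameMap.remove(secondId);
        }
        eventSourceColumnNameMap
            .computeIfAbsent(firstId, k -> new ArrayList<>())
            .add(movedColumn);

        System.out.println(eventSourceColumnNameMap); // {1=[5:ds]}
      }
    }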
http://git-wip-us.apache.org/repos/asf/hive/blob/aaacda47/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkPartitionPruningSinkDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkPartitionPruningSinkDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkPartitionPruningSinkDesc.java
index baf85cf..1329e13 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkPartitionPruningSinkDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SparkPartitionPruningSinkDesc.java
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -18,91 +18,120 @@
package org.apache.hadoop.hive.ql.optimizer.spark;
+import com.google.common.base.Preconditions;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.TableScanOperator;
+import org.apache.hadoop.hive.ql.exec.spark.SparkUtilities;
import org.apache.hadoop.hive.ql.plan.AbstractOperatorDesc;
import org.apache.hadoop.hive.ql.plan.Explain;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
import org.apache.hadoop.hive.ql.plan.MapWork;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.TableDesc;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
@Explain(displayName = "Spark Partition Pruning Sink Operator")
public class SparkPartitionPruningSinkDesc extends AbstractOperatorDesc {
- // column in the target table that will be pruned against
- private String targetColumnName;
+ public static class DPPTargetInfo {
+ // column in the target table that will be pruned against
+ public String columnName;
+ // type of target column
+ public String columnType;
+ // the partition column we're interested in
+ public ExprNodeDesc partKey;
+ public MapWork work;
+
+ DPPTargetInfo(String columnName, String columnType, ExprNodeDesc partKey, MapWork work) {
+ this.columnName = columnName;
+ this.columnType = columnType;
+ this.partKey = partKey;
+ this.work = work;
+ }
+ }
- // type of target column
- private String targetColumnType;
+ private List<DPPTargetInfo> targetInfos = new ArrayList<>();
private TableDesc table;
private transient TableScanOperator tableScan;
- // the partition column we're interested in
- private ExprNodeDesc partKey;
-
private Path path;
- private MapWork targetMapWork;
+ public List<DPPTargetInfo> getTargetInfos() {
+ return targetInfos;
+ }
- @Explain(displayName = "tmp Path", explainLevels = { Explain.Level.EXTENDED })
- public Path getPath() {
- return path;
+ private void assertSingleTarget() {
+ Preconditions.checkState(targetInfos.size() < 2, "The DPP sink has multiple targets.");
}
- public void setPath(Path path) {
- this.path = path;
+ public String getTargetColumnName() {
+ assertSingleTarget();
+ return targetInfos.isEmpty() ? null : targetInfos.get(0).columnName;
}
- @Explain(displayName = "target work")
- public String getTargetWork() {
- return this.targetMapWork.getName();
+ public String getTargetColumnType() {
+ assertSingleTarget();
+ return targetInfos.isEmpty() ? null : targetInfos.get(0).columnType;
}
- public MapWork getTargetMapWork() {
- return this.targetMapWork;
+ public ExprNodeDesc getTargetPartKey() {
+ assertSingleTarget();
+ return targetInfos.isEmpty() ? null : targetInfos.get(0).partKey;
}
- public void setTargetMapWork(MapWork targetMapWork) {
- this.targetMapWork = targetMapWork;
+ public MapWork getTargetMapWork() {
+ assertSingleTarget();
+ return targetInfos.isEmpty() ? null : targetInfos.get(0).work;
}
- public TableScanOperator getTableScan() {
- return tableScan;
+ public void addTarget(String colName, String colType, ExprNodeDesc partKey, MapWork mapWork) {
+ targetInfos.add(new DPPTargetInfo(colName, colType, partKey, mapWork));
}
- public void setTableScan(TableScanOperator tableScan) {
- this.tableScan = tableScan;
+ public void setTargetMapWork(MapWork mapWork) {
+ Preconditions.checkState(targetInfos.size() == 1,
+ "The DPP sink should have exactly one target.");
+ targetInfos.get(0).work = mapWork;
+ // in order to make the col name unique, prepend the targetId
+ targetInfos.get(0).columnName = SparkUtilities.getWorkId(mapWork) + ":" +
+ targetInfos.get(0).columnName;
}
- @Explain(displayName = "Target column")
- public String displayTargetColumn() {
- return targetColumnName + " (" + targetColumnType + ")";
+ Path getTmpPathOfTargetWork() {
+ return targetInfos.isEmpty() ? null : targetInfos.get(0).work.getTmpPathForPartitionPruning();
}
- public String getTargetColumnName() {
- return targetColumnName;
+ @Explain(displayName = "tmp Path", explainLevels = {Explain.Level.EXTENDED})
+ public Path getPath() {
+ return path;
}
- public void setTargetColumnName(String targetColumnName) {
- this.targetColumnName = targetColumnName;
+ public void setPath(Path path) {
+ this.path = path;
}
- public String getTargetColumnType() {
- return targetColumnType;
+ @Explain(displayName = "target works")
+ public String getTargetWorks() {
+ return Arrays.toString(targetInfos.stream().map(info -> info.work.getName()).toArray());
}
- public void setTargetColumnType(String columnType) {
- this.targetColumnType = columnType;
+ public TableScanOperator getTableScan() {
+ return tableScan;
}
- public ExprNodeDesc getPartKey() {
- return partKey;
+ public void setTableScan(TableScanOperator tableScan) {
+ this.tableScan = tableScan;
}
- public void setPartKey(ExprNodeDesc partKey) {
- this.partKey = partKey;
+ @Explain(displayName = "Target column")
+ public String displayTargetColumns() {
+ return Arrays.toString(targetInfos.stream().map(
+ info -> info.columnName + " (" + info.columnType + ")").toArray());
}
public TableDesc getTable() {
@@ -114,7 +143,17 @@ public class SparkPartitionPruningSinkDesc extends AbstractOperatorDesc {
}
@Explain(displayName = "partition key expr")
- public String getPartKeyString() {
- return partKey.getExprString();
+ public String getPartKeyStrings() {
+ return Arrays.toString(targetInfos.stream().map(
+ info -> info.partKey.getExprString()).toArray());
+ }
+
+ @Override
+ public boolean isSame(OperatorDesc other) {
+ if (getClass().getName().equals(other.getClass().getName())) {
+ SparkPartitionPruningSinkDesc otherDesc = (SparkPartitionPruningSinkDesc) other;
+ return getTable().equals(otherDesc.getTable());
+ }
+ return false;
}
}
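Because a sink can now carry several targets, the @Explain getters render lists instead of single values, which is why the golden files below change from "target work: Map 1" / "Target column: ds (string)" to the bracketed forms "target works: [Map 1]" / "Target column: [1:ds (string)]". A small standalone sketch of that rendering, with hypothetical values:

    import java.util.Arrays;
    import java.util.List;

    public class ExplainRenderingSketch {
      public static void main(String[] args) {
        // Hypothetical combined sink with two targets; mirrors getTargetWorks()
        // and displayTargetColumns().
        List<String> workNames = Arrays.asList("Map 1", "Map 5");
        List<String> columns = Arrays.asList("1:ds (string)", "5:ds (string)");
        System.out.println("target works: " + Arrays.toString(workNames.toArray()));
        System.out.println("Target column: " + Arrays.toString(columns.toArray()));
      }
    }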
http://git-wip-us.apache.org/repos/asf/hive/blob/aaacda47/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
index 24c8baf..9bea4dd 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
@@ -505,7 +505,7 @@ public class GenSparkUtils {
targetWork.getEventSourcePartKeyExprMap().put(sourceId, new LinkedList<ExprNodeDesc>());
}
List<ExprNodeDesc> keys = targetWork.getEventSourcePartKeyExprMap().get(sourceId);
- keys.add(desc.getPartKey());
+ keys.add(desc.getTargetPartKey());
}
public static SparkEdgeProperty getEdgeProperty(HiveConf conf, ReduceSinkOperator reduceSink,
http://git-wip-us.apache.org/repos/asf/hive/blob/aaacda47/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkPartitionPruningSinkOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkPartitionPruningSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkPartitionPruningSinkOperator.java
index e3146cf..bd9de09 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkPartitionPruningSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/SparkPartitionPruningSinkOperator.java
@@ -166,7 +166,10 @@ public class SparkPartitionPruningSinkOperator extends Operator<SparkPartitionPr
try {
fsout = fs.create(path, numOfRepl);
out = new ObjectOutputStream(new BufferedOutputStream(fsout));
- out.writeUTF(conf.getTargetColumnName());
+ out.writeInt(conf.getTargetInfos().size());
+ for (SparkPartitionPruningSinkDesc.DPPTargetInfo info : conf.getTargetInfos()) {
+ out.writeUTF(info.columnName);
+ }
buffer.writeTo(out);
} catch (Exception e) {
try {
http://git-wip-us.apache.org/repos/asf/hive/blob/aaacda47/ql/src/test/queries/clientpositive/spark_dynamic_partition_pruning_4.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/spark_dynamic_partition_pruning_4.q b/ql/src/test/queries/clientpositive/spark_dynamic_partition_pruning_4.q
new file mode 100644
index 0000000..240128f
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/spark_dynamic_partition_pruning_4.q
@@ -0,0 +1,157 @@
+set hive.spark.dynamic.partition.pruning=true;
+set hive.combine.equivalent.work.optimization=true;
+
+-- This qfile tests whether equivalent DPP sink works are combined.
+-- When combined, one DPP sink operator will have multiple target columns/works.
+
+-- SORT_QUERY_RESULTS
+
+create table part1(key string, value string) partitioned by (p string, q string);
+insert into table part1 partition (p='1', q='1') values ('1','1'), ('2','2');
+insert into table part1 partition (p='1', q='2') values ('3','3'), ('4','4');
+insert into table part1 partition (p='2', q='1') values ('5','5'), ('6','6');
+insert into table part1 partition (p='2', q='2') values ('7','7'), ('8','8');
+
+create table part2(key string, value string) partitioned by (p string, q string);
+insert into table part2 partition (p='3', q='3') values ('a','a'), ('b','b');
+insert into table part2 partition (p='3', q='4') values ('c','c'), ('d','d');
+insert into table part2 partition (p='4', q='3') values ('e','e'), ('f','f');
+insert into table part2 partition (p='4', q='4') values ('g','g'), ('h','h');
+
+-- dpp works should be combined
+explain
+select * from
+ (select part1.key, part1.value from part1 join src on part1.p=src.key) a
+union all
+ (select part2.key, part2.value from part2 join src on part2.p=src.key);
+
+select * from
+ (select part1.key, part1.value from part1 join src on part1.p=src.key) a
+union all
+ (select part2.key, part2.value from part2 join src on part2.p=src.key);
+
+-- verify result
+set hive.spark.dynamic.partition.pruning=false;
+
+select * from
+ (select part1.key, part1.value from part1 join src on part1.p=src.key) a
+union all
+ (select part2.key, part2.value from part2 join src on part2.p=src.key);
+
+set hive.spark.dynamic.partition.pruning=true;
+
+-- dpp works should be combined
+explain
+select * from
+ (select part1.key, part1.value from part1 join src on part1.p=src.key) a
+union all
+ (select part2.key, part2.value from part2 join src on part2.q=src.key);
+
+select * from
+ (select part1.key, part1.value from part1 join src on part1.p=src.key) a
+union all
+ (select part2.key, part2.value from part2 join src on part2.q=src.key);
+
+-- verify result
+set hive.spark.dynamic.partition.pruning=false;
+
+select * from
+ (select part1.key, part1.value from part1 join src on part1.p=src.key) a
+union all
+ (select part2.key, part2.value from part2 join src on part2.q=src.key);
+
+set hive.spark.dynamic.partition.pruning=true;
+
+-- target works are already combined
+explain
+select * from
+ (select part1.key, part1.value from part1 join src on part1.q=src.key) a
+union all
+ (select part1.key, part1.value from part1 join src on part1.q=src.key);
+
+select * from
+ (select part1.key, part1.value from part1 join src on part1.q=src.key) a
+union all
+ (select part1.key, part1.value from part1 join src on part1.q=src.key);
+
+-- dpp works shouldn't be combined
+explain
+select * from
+ (select part1.key, part1.value from part1 join src on part1.p=src.key) a
+union all
+ (select part2.key, part2.value from part2 join src on part2.p=src.value);
+
+-- dpp works shouldn't be combined
+explain
+select * from
+ (select part1.key, part1.value from part1 join src on part1.p=upper(src.key)) a
+union all
+ (select part2.key, part2.value from part2 join src on part2.p=src.key);
+
+-- dpp works should be combined
+explain
+with top as
+(select key from src order by key limit 200)
+select * from
+ (select part1.key, part1.value from part1 join top on part1.q=top.key) a
+union all
+ (select part2.key, part2.value from part2 join top on part2.q=top.key);
+
+with top as
+(select key from src order by key limit 200)
+select * from
+ (select part1.key, part1.value from part1 join top on part1.q=top.key) a
+union all
+ (select part2.key, part2.value from part2 join top on part2.q=top.key);
+
+-- verify result
+set hive.spark.dynamic.partition.pruning=false;
+
+with top as
+(select key from src order by key limit 200)
+select * from
+ (select part1.key, part1.value from part1 join top on part1.q=top.key) a
+union all
+ (select part2.key, part2.value from part2 join top on part2.q=top.key);
+
+set hive.spark.dynamic.partition.pruning=true;
+
+-- dpp works should be combined
+explain
+with top as
+(select key, value from src order by key, value limit 200)
+select * from
+ (select part1.key, part1.value from part1 join top on part1.p=top.key and part1.q=top.key) a
+union all
+ (select part2.key, part2.value from part2 join top on part2.p=top.key and part2.q=top.key);
+
+with top as
+(select key, value from src order by key, value limit 200)
+select * from
+ (select part1.key, part1.value from part1 join top on part1.p=top.key and part1.q=top.key) a
+union all
+ (select part2.key, part2.value from part2 join top on part2.p=top.key and part2.q=top.key);
+
+-- verify result
+set hive.spark.dynamic.partition.pruning=false;
+
+with top as
+(select key, value from src order by key, value limit 200)
+select * from
+ (select part1.key, part1.value from part1 join top on part1.p=top.key and part1.q=top.key) a
+union all
+ (select part2.key, part2.value from part2 join top on part2.p=top.key and part2.q=top.key);
+
+set hive.spark.dynamic.partition.pruning=true;
+
+-- dpp works shouldn't be combined
+explain
+with top as
+(select key, value from src order by key, value limit 200)
+select * from
+ (select part1.key, part1.value from part1 join top on part1.p=top.key and part1.q=top.key) a
+union all
+ (select part2.key, part2.value from part2 join top on part2.p=top.key and part2.q=top.value);
+
+drop table part1;
+drop table part2;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hive/blob/aaacda47/ql/src/test/results/clientpositive/spark/spark_dynamic_partition_pruning.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/spark/spark_dynamic_partition_pruning.q.out b/ql/src/test/results/clientpositive/spark/spark_dynamic_partition_pruning.q.out
index 95efa39..7e97988 100644
--- a/ql/src/test/results/clientpositive/spark/spark_dynamic_partition_pruning.q.out
+++ b/ql/src/test/results/clientpositive/spark/spark_dynamic_partition_pruning.q.out
@@ -227,10 +227,10 @@ STAGE PLANS:
outputColumnNames: _col0
Statistics: Num rows: 1 Data size: 21 Basic stats: COMPLETE Column stats: NONE
Spark Partition Pruning Sink Operator
- Target column: ds (string)
- partition key expr: ds
+ Target column: [1:ds (string)]
+ partition key expr: [ds]
Statistics: Num rows: 1 Data size: 21 Basic stats: COMPLETE Column stats: NONE
- target work: Map 1
+ target works: [Map 1]
Stage: Stage-1
Spark
@@ -483,10 +483,10 @@ STAGE PLANS:
outputColumnNames: _col0
Statistics: Num rows: 1 Data size: 21 Basic stats: COMPLETE Column stats: NONE
Spark Partition Pruning Sink Operator
- Target column: ds (string)
- partition key expr: day(ds)
+ Target column: [1:ds (string)]
+ partition key expr: [day(ds)]
Statistics: Num rows: 1 Data size: 21 Basic stats: COMPLETE Column stats: NONE
- target work: Map 1
+ target works: [Map 1]
Stage: Stage-1
Spark
@@ -730,10 +730,10 @@ STAGE PLANS:
outputColumnNames: _col0
Statistics: Num rows: 1 Data size: 21 Basic stats: COMPLETE Column stats: NONE
Spark Partition Pruning Sink Operator
- Target column: ds (string)
- partition key expr: abs(((- UDFToLong(concat(UDFToString(day(ds)), '0'))) + 10))
+ Target column: [1:ds (string)]
+ partition key expr: [abs(((- UDFToLong(concat(UDFToString(day(ds)), '0'))) + 10))]
Statistics: Num rows: 1 Data size: 21 Basic stats: COMPLETE Column stats: NONE
- target work: Map 1
+ target works: [Map 1]
Stage: Stage-1
Spark
@@ -873,10 +873,10 @@ STAGE PLANS:
outputColumnNames: _col0
Statistics: Num rows: 1 Data size: 21 Basic stats: COMPLETE Column stats: NONE
Spark Partition Pruning Sink Operator
- Target column: ds (string)
- partition key expr: CAST( UDFToShort(day(ds)) AS decimal(10,0))
+ Target column: [1:ds (string)]
+ partition key expr: [CAST( UDFToShort(day(ds)) AS decimal(10,0))]
Statistics: Num rows: 1 Data size: 21 Basic stats: COMPLETE Column stats: NONE
- target work: Map 1
+ target works: [Map 1]
Stage: Stage-1
Spark
@@ -1015,10 +1015,10 @@ STAGE PLANS:
outputColumnNames: _col0
Statistics: Num rows: 1 Data size: 21 Basic stats: COMPLETE Column stats: NONE
Spark Partition Pruning Sink Operator
- Target column: ds (string)
- partition key expr: ds
+ Target column: [1:ds (string)]
+ partition key expr: [ds]
Statistics: Num rows: 1 Data size: 21 Basic stats: COMPLETE Column stats: NONE
- target work: Map 1
+ target works: [Map 1]
Map 8
Map Operator Tree:
TableScan
@@ -1042,10 +1042,10 @@ STAGE PLANS:
outputColumnNames: _col0
Statistics: Num rows: 1 Data size: 5 Basic stats: COMPLETE Column stats: NONE
Spark Partition Pruning Sink Operator
- Target column: hr (string)
- partition key expr: hr
+ Target column: [1:hr (string)]
+ partition key expr: [hr]
Statistics: Num rows: 1 Data size: 5 Basic stats: COMPLETE Column stats: NONE
- target work: Map 1
+ target works: [Map 1]
Stage: Stage-1
Spark
@@ -1377,10 +1377,10 @@ STAGE PLANS:
outputColumnNames: _col0
Statistics: Num rows: 1 Data size: 27 Basic stats: COMPLETE Column stats: NONE
Spark Partition Pruning Sink Operator
- Target column: ds (string)
- partition key expr: ds
+ Target column: [1:ds (string)]
+ partition key expr: [ds]
Statistics: Num rows: 1 Data size: 27 Basic stats: COMPLETE Column stats: NONE
- target work: Map 1
+ target works: [Map 1]
Select Operator
expressions: _col2 (type: string)
outputColumnNames: _col0
@@ -1391,10 +1391,10 @@ STAGE PLANS:
outputColumnNames: _col0
Statistics: Num rows: 1 Data size: 27 Basic stats: COMPLETE Column stats: NONE
Spark Partition Pruning Sink Operator
- Target column: hr (string)
- partition key expr: hr
+ Target column: [1:hr (string)]
+ partition key expr: [hr]
Statistics: Num rows: 1 Data size: 27 Basic stats: COMPLETE Column stats: NONE
- target work: Map 1
+ target works: [Map 1]
Stage: Stage-1
Spark
@@ -1646,10 +1646,10 @@ STAGE PLANS:
outputColumnNames: _col0
Statistics: Num rows: 1 Data size: 21 Basic stats: COMPLETE Column stats: NONE
Spark Partition Pruning Sink Operator
- Target column: ds (string)
- partition key expr: ds
+ Target column: [1:ds (string)]
+ partition key expr: [ds]
Statistics: Num rows: 1 Data size: 21 Basic stats: COMPLETE Column stats: NONE
- target work: Map 1
+ target works: [Map 1]
Stage: Stage-1
Spark
@@ -1902,10 +1902,10 @@ STAGE PLANS:
outputColumnNames: _col0
Statistics: Num rows: 1 Data size: 7 Basic stats: COMPLETE Column stats: NONE
Spark Partition Pruning Sink Operator
- Target column: hr (string)
- partition key expr: UDFToDouble(hr)
+ Target column: [1:hr (string)]
+ partition key expr: [UDFToDouble(hr)]
Statistics: Num rows: 1 Data size: 7 Basic stats: COMPLETE Column stats: NONE
- target work: Map 1
+ target works: [Map 1]
Stage: Stage-1
Spark
@@ -2042,10 +2042,10 @@ STAGE PLANS:
outputColumnNames: _col0
Statistics: Num rows: 1 Data size: 7 Basic stats: COMPLETE Column stats: NONE
Spark Partition Pruning Sink Operator
- Target column: hr (string)
- partition key expr: (UDFToDouble(hr) * 2.0)
+ Target column: [1:hr (string)]
+ partition key expr: [(UDFToDouble(hr) * 2.0)]
Statistics: Num rows: 1 Data size: 7 Basic stats: COMPLETE Column stats: NONE
- target work: Map 1
+ target works: [Map 1]
Stage: Stage-1
Spark
@@ -2405,10 +2405,10 @@ STAGE PLANS:
outputColumnNames: _col0
Statistics: Num rows: 1 Data size: 7 Basic stats: COMPLETE Column stats: NONE
Spark Partition Pruning Sink Operator
- Target column: hr (string)
- partition key expr: UDFToString((UDFToDouble(hr) * 2.0))
+ Target column: [1:hr (string)]
+ partition key expr: [UDFToString((UDFToDouble(hr) * 2.0))]
Statistics: Num rows: 1 Data size: 7 Basic stats: COMPLETE Column stats: NONE
- target work: Map 1
+ target works: [Map 1]
Stage: Stage-1
Spark
@@ -2787,10 +2787,10 @@ STAGE PLANS:
outputColumnNames: _col0
Statistics: Num rows: 1 Data size: 27 Basic stats: COMPLETE Column stats: NONE
Spark Partition Pruning Sink Operator
- Target column: ds (string)
- partition key expr: ds
+ Target column: [1:ds (string)]
+ partition key expr: [ds]
Statistics: Num rows: 1 Data size: 27 Basic stats: COMPLETE Column stats: NONE
- target work: Map 1
+ target works: [Map 1]
Select Operator
expressions: _col2 (type: string)
outputColumnNames: _col0
@@ -2801,10 +2801,10 @@ STAGE PLANS:
outputColumnNames: _col0
Statistics: Num rows: 1 Data size: 27 Basic stats: COMPLETE Column stats: NONE
Spark Partition Pruning Sink Operator
- Target column: hr (string)
- partition key expr: hr
+ Target column: [1:hr (string)]
+ partition key expr: [hr]
Statistics: Num rows: 1 Data size: 27 Basic stats: COMPLETE Column stats: NONE
- target work: Map 1
+ target works: [Map 1]
Stage: Stage-1
Spark
@@ -2940,10 +2940,10 @@ STAGE PLANS:
outputColumnNames: _col0
Statistics: Num rows: 1 Data size: 21 Basic stats: COMPLETE Column stats: NONE
Spark Partition Pruning Sink Operator
- Target column: ds (string)
- partition key expr: ds
+ Target column: [1:ds (string)]
+ partition key expr: [ds]
Statistics: Num rows: 1 Data size: 21 Basic stats: COMPLETE Column stats: NONE
- target work: Map 1
+ target works: [Map 1]
Stage: Stage-1
Spark
@@ -3061,10 +3061,10 @@ STAGE PLANS:
outputColumnNames: _col0
Statistics: Num rows: 1 Data size: 21 Basic stats: COMPLETE Column stats: NONE
Spark Partition Pruning Sink Operator
- Target column: ds (string)
- partition key expr: ds
+ Target column: [4:ds (string)]
+ partition key expr: [ds]
Statistics: Num rows: 1 Data size: 21 Basic stats: COMPLETE Column stats: NONE
- target work: Map 4
+ target works: [Map 4]
Stage: Stage-1
Spark
@@ -3181,10 +3181,10 @@ STAGE PLANS:
outputColumnNames: _col0
Statistics: Num rows: 1 Data size: 21 Basic stats: COMPLETE Column stats: NONE
Spark Partition Pruning Sink Operator
- Target column: ds (string)
- partition key expr: ds
+ Target column: [1:ds (string)]
+ partition key expr: [ds]
Statistics: Num rows: 1 Data size: 21 Basic stats: COMPLETE Column stats: NONE
- target work: Map 1
+ target works: [Map 1]
Stage: Stage-1
Spark
@@ -3303,10 +3303,10 @@ STAGE PLANS:
outputColumnNames: _col0
Statistics: Num rows: 1 Data size: 21 Basic stats: COMPLETE Column stats: NONE
Spark Partition Pruning Sink Operator
- Target column: ds (string)
- partition key expr: ds
+ Target column: [1:ds (string)]
+ partition key expr: [ds]
Statistics: Num rows: 1 Data size: 21 Basic stats: COMPLETE Column stats: NONE
- target work: Map 1
+ target works: [Map 1]
Map 8
Map Operator Tree:
TableScan
@@ -3330,10 +3330,10 @@ STAGE PLANS:
outputColumnNames: _col0
Statistics: Num rows: 1 Data size: 5 Basic stats: COMPLETE Column stats: NONE
Spark Partition Pruning Sink Operator
- Target column: hr (string)
- partition key expr: hr
+ Target column: [1:hr (string)]
+ partition key expr: [hr]
Statistics: Num rows: 1 Data size: 5 Basic stats: COMPLETE Column stats: NONE
- target work: Map 1
+ target works: [Map 1]
Stage: Stage-1
Spark
@@ -3688,10 +3688,10 @@ STAGE PLANS:
outputColumnNames: _col0
Statistics: Num rows: 2 Data size: 368 Basic stats: COMPLETE Column stats: NONE
Spark Partition Pruning Sink Operator
- Target column: ds (string)
- partition key expr: ds
+ Target column: [1:ds (string)]
+ partition key expr: [ds]
Statistics: Num rows: 2 Data size: 368 Basic stats: COMPLETE Column stats: NONE
- target work: Map 1
+ target works: [Map 1]
Reducer 9
Reduce Operator Tree:
Group By Operator
@@ -3717,10 +3717,10 @@ STAGE PLANS:
outputColumnNames: _col0
Statistics: Num rows: 2 Data size: 368 Basic stats: COMPLETE Column stats: NONE
Spark Partition Pruning Sink Operator
- Target column: ds (string)
- partition key expr: ds
+ Target column: [1:ds (string)]
+ partition key expr: [ds]
Statistics: Num rows: 2 Data size: 368 Basic stats: COMPLETE Column stats: NONE
- target work: Map 1
+ target works: [Map 1]
Stage: Stage-1
Spark
@@ -3956,10 +3956,10 @@ STAGE PLANS:
outputColumnNames: _col0
Statistics: Num rows: 2 Data size: 368 Basic stats: COMPLETE Column stats: NONE
Spark Partition Pruning Sink Operator
- Target column: ds (string)
- partition key expr: ds
+ Target column: [1:ds (string)]
+ partition key expr: [ds]
Statistics: Num rows: 2 Data size: 368 Basic stats: COMPLETE Column stats: NONE
- target work: Map 1
+ target works: [Map 1]
Reducer 9
Reduce Operator Tree:
Group By Operator
@@ -3985,10 +3985,10 @@ STAGE PLANS:
outputColumnNames: _col0
Statistics: Num rows: 2 Data size: 368 Basic stats: COMPLETE Column stats: NONE
Spark Partition Pruning Sink Operator
- Target column: ds (string)
- partition key expr: ds
+ Target column: [1:ds (string)]
+ partition key expr: [ds]
Statistics: Num rows: 2 Data size: 368 Basic stats: COMPLETE Column stats: NONE
- target work: Map 1
+ target works: [Map 1]
Stage: Stage-1
Spark
@@ -4227,10 +4227,10 @@ STAGE PLANS:
outputColumnNames: _col0
Statistics: Num rows: 2 Data size: 368 Basic stats: COMPLETE Column stats: NONE
Spark Partition Pruning Sink Operator
- Target column: ds (string)
- partition key expr: ds
+ Target column: [1:ds (string)]
+ partition key expr: [ds]
Statistics: Num rows: 2 Data size: 368 Basic stats: COMPLETE Column stats: NONE
- target work: Map 1
+ target works: [Map 1]
Reducer 13
Reduce Operator Tree:
Group By Operator
@@ -4256,10 +4256,10 @@ STAGE PLANS:
outputColumnNames: _col0
Statistics: Num rows: 2 Data size: 368 Basic stats: COMPLETE Column stats: NONE
Spark Partition Pruning Sink Operator
- Target column: ds (string)
- partition key expr: ds
+ Target column: [1:ds (string)]
+ partition key expr: [ds]
Statistics: Num rows: 2 Data size: 368 Basic stats: COMPLETE Column stats: NONE
- target work: Map 1
+ target works: [Map 1]
Stage: Stage-1
Spark
@@ -4459,10 +4459,10 @@ STAGE PLANS:
outputColumnNames: _col0
Statistics: Num rows: 1 Data size: 21 Basic stats: COMPLETE Column stats: NONE
Spark Partition Pruning Sink Operator
- Target column: ds (string)
- partition key expr: ds
+ Target column: [1:ds (string)]
+ partition key expr: [ds]
Statistics: Num rows: 1 Data size: 21 Basic stats: COMPLETE Column stats: NONE
- target work: Map 1
+ target works: [Map 1]
Local Work:
Map Reduce Local Work
@@ -4592,10 +4592,10 @@ STAGE PLANS:
outputColumnNames: _col0
Statistics: Num rows: 1 Data size: 21 Basic stats: COMPLETE Column stats: NONE
Spark Partition Pruning Sink Operator
- Target column: ds (string)
- partition key expr: day(ds)
+ Target column: [1:ds (string)]
+ partition key expr: [day(ds)]
Statistics: Num rows: 1 Data size: 21 Basic stats: COMPLETE Column stats: NONE
- target work: Map 1
+ target works: [Map 1]
Local Work:
Map Reduce Local Work
@@ -4718,10 +4718,10 @@ STAGE PLANS:
outputColumnNames: _col0
Statistics: Num rows: 1 Data size: 21 Basic stats: COMPLETE Column stats: NONE
Spark Partition Pruning Sink Operator
- Target column: ds (string)
- partition key expr: ds
+ Target column: [1:ds (string)]
+ partition key expr: [ds]
Statistics: Num rows: 1 Data size: 21 Basic stats: COMPLETE Column stats: NONE
- target work: Map 1
+ target works: [Map 1]
Local Work:
Map Reduce Local Work
Map 4
@@ -4751,10 +4751,10 @@ STAGE PLANS:
outputColumnNames: _col0
Statistics: Num rows: 1 Data size: 5 Basic stats: COMPLETE Column stats: NONE
Spark Partition Pruning Sink Operator
- Target column: hr (string)
- partition key expr: hr
+ Target column: [1:hr (string)]
+ partition key expr: [hr]
Statistics: Num rows: 1 Data size: 5 Basic stats: COMPLETE Column stats: NONE
- target work: Map 1
+ target works: [Map 1]
Local Work:
Map Reduce Local Work
@@ -4897,10 +4897,10 @@ STAGE PLANS:
outputColumnNames: _col0
Statistics: Num rows: 1 Data size: 27 Basic stats: COMPLETE Column stats: NONE
Spark Partition Pruning Sink Operator
- Target column: ds (string)
- partition key expr: ds
+ Target column: [1:ds (string)]
+ partition key expr: [ds]
Statistics: Num rows: 1 Data size: 27 Basic stats: COMPLETE Column stats: NONE
- target work: Map 1
+ target works: [Map 1]
Select Operator
expressions: _col2 (type: string)
outputColumnNames: _col0
@@ -4911,10 +4911,10 @@ STAGE PLANS:
outputColumnNames: _col0
Statistics: Num rows: 1 Data size: 27 Basic stats: COMPLETE Column stats: NONE
Spark Partition Pruning Sink Operator
- Target column: hr (string)
- partition key expr: hr
+ Target column: [1:hr (string)]
+ partition key expr: [hr]
Statistics: Num rows: 1 Data size: 27 Basic stats: COMPLETE Column stats: NONE
- target work: Map 1
+ target works: [Map 1]
Local Work:
Map Reduce Local Work
@@ -5043,10 +5043,10 @@ STAGE PLANS:
outputColumnNames: _col0
Statistics: Num rows: 1 Data size: 21 Basic stats: COMPLETE Column stats: NONE
Spark Partition Pruning Sink Operator
- Target column: ds (string)
- partition key expr: ds
+ Target column: [1:ds (string)]
+ partition key expr: [ds]
Statistics: Num rows: 1 Data size: 21 Basic stats: COMPLETE Column stats: NONE
- target work: Map 1
+ target works: [Map 1]
Local Work:
Map Reduce Local Work
@@ -5167,10 +5167,10 @@ STAGE PLANS:
outputColumnNames: _col0
Statistics: Num rows: 1 Data size: 7 Basic stats: COMPLETE Column stats: NONE
Spark Partition Pruning Sink Operator
- Target column: hr (string)
- partition key expr: UDFToDouble(hr)
+ Target column: [1:hr (string)]
+ partition key expr: [UDFToDouble(hr)]
Statistics: Num rows: 1 Data size: 7 Basic stats: COMPLETE Column stats: NONE
- target work: Map 1
+ target works: [Map 1]
Local Work:
Map Reduce Local Work
@@ -5291,10 +5291,10 @@ STAGE PLANS:
outputColumnNames: _col0
Statistics: Num rows: 1 Data size: 7 Basic stats: COMPLETE Column stats: NONE
Spark Partition Pruning Sink Operator
- Target column: hr (string)
- partition key expr: (UDFToDouble(hr) * 2.0)
+ Target column: [1:hr (string)]
+ partition key expr: [(UDFToDouble(hr) * 2.0)]
Statistics: Num rows: 1 Data size: 7 Basic stats: COMPLETE Column stats: NONE
- target work: Map 1
+ target works: [Map 1]
Local Work:
Map Reduce Local Work
@@ -5553,10 +5553,10 @@ STAGE PLANS:
outputColumnNames: _col0
Statistics: Num rows: 1 Data size: 21 Basic stats: COMPLETE Column stats: NONE
Spark Partition Pruning Sink Operator
- Target column: ds (string)
- partition key expr: ds
+ Target column: [1:ds (string)]
+ partition key expr: [ds]
Statistics: Num rows: 1 Data size: 21 Basic stats: COMPLETE Column stats: NONE
- target work: Map 1
+ target works: [Map 1]
Local Work:
Map Reduce Local Work
@@ -5840,10 +5840,10 @@ STAGE PLANS:
outputColumnNames: _col0
Statistics: Num rows: 1 Data size: 21 Basic stats: COMPLETE Column stats: NONE
Spark Partition Pruning Sink Operator
- Target column: ds (string)
- partition key expr: ds
+ Target column: [1:ds (string)]
+ partition key expr: [ds]
Statistics: Num rows: 1 Data size: 21 Basic stats: COMPLETE Column stats: NONE
- target work: Map 1
+ target works: [Map 1]
Local Work:
Map Reduce Local Work
Map 4
@@ -5873,10 +5873,10 @@ STAGE PLANS:
outputColumnNames: _col0
Statistics: Num rows: 1 Data size: 5 Basic stats: COMPLETE Column stats: NONE
Spark Partition Pruning Sink Operator
- Target column: hr (string)
- partition key expr: hr
+ Target column: [1:hr (string)]
+ partition key expr: [hr]
Statistics: Num rows: 1 Data size: 5 Basic stats: COMPLETE Column stats: NONE
- target work: Map 1
+ target works: [Map 1]
Local Work:
Map Reduce Local Work
@@ -6189,10 +6189,10 @@ STAGE PLANS:
outputColumnNames: _col0
Statistics: Num rows: 2 Data size: 368 Basic stats: COMPLETE Column stats: NONE
Spark Partition Pruning Sink Operator
- Target column: ds (string)
- partition key expr: ds
+ Target column: [1:ds (string)]
+ partition key expr: [ds]
Statistics: Num rows: 2 Data size: 368 Basic stats: COMPLETE Column stats: NONE
- target work: Map 1
+ target works: [Map 1]
Reducer 9
Reduce Operator Tree:
Group By Operator
@@ -6218,10 +6218,10 @@ STAGE PLANS:
outputColumnNames: _col0
Statistics: Num rows: 2 Data size: 368 Basic stats: COMPLETE Column stats: NONE
Spark Partition Pruning Sink Operator
- Target column: ds (string)
- partition key expr: ds
+ Target column: [1:ds (string)]
+ partition key expr: [ds]
Statistics: Num rows: 2 Data size: 368 Basic stats: COMPLETE Column stats: NONE
- target work: Map 1
+ target works: [Map 1]
Stage: Stage-1
Spark
@@ -6458,10 +6458,10 @@ STAGE PLANS:
outputColumnNames: _col0
Statistics: Num rows: 2 Data size: 54 Basic stats: COMPLETE Column stats: NONE
Spark Partition Pruning Sink Operator
- Target column: ds (string)
- partition key expr: ds
+ Target column: [1:ds (string)]
+ partition key expr: [ds]
Statistics: Num rows: 2 Data size: 54 Basic stats: COMPLETE Column stats: NONE
- target work: Map 1
+ target works: [Map 1]
Select Operator
expressions: UDFToDouble(_col2) (type: double)
outputColumnNames: _col0
@@ -6472,10 +6472,10 @@ STAGE PLANS:
outputColumnNames: _col0
Statistics: Num rows: 2 Data size: 54 Basic stats: COMPLETE Column stats: NONE
Spark Partition Pruning Sink Operator
- Target column: hr (int)
- partition key expr: UDFToDouble(hr)
+ Target column: [1:hr (int)]
+ partition key expr: [UDFToDouble(hr)]
Statistics: Num rows: 2 Data size: 54 Basic stats: COMPLETE Column stats: NONE
- target work: Map 1
+ target works: [Map 1]
Local Work:
Map Reduce Local Work
http://git-wip-us.apache.org/repos/asf/hive/blob/aaacda47/ql/src/test/results/clientpositive/spark/spark_dynamic_partition_pruning_2.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/spark/spark_dynamic_partition_pruning_2.q.out b/ql/src/test/results/clientpositive/spark/spark_dynamic_partition_pruning_2.q.out
index cc7819c..075aaff 100644
--- a/ql/src/test/results/clientpositive/spark/spark_dynamic_partition_pruning_2.q.out
+++ b/ql/src/test/results/clientpositive/spark/spark_dynamic_partition_pruning_2.q.out
@@ -184,10 +184,10 @@ STAGE PLANS:
outputColumnNames: _col0
Statistics: Num rows: 2 Data size: 10 Basic stats: COMPLETE Column stats: NONE
Spark Partition Pruning Sink Operator
- Target column: dim_shops_id (int)
- partition key expr: dim_shops_id
+ Target column: [1:dim_shops_id (int)]
+ partition key expr: [dim_shops_id]
Statistics: Num rows: 2 Data size: 10 Basic stats: COMPLETE Column stats: NONE
- target work: Map 1
+ target works: [Map 1]
Local Work:
Map Reduce Local Work
@@ -726,10 +726,10 @@ STAGE PLANS:
outputColumnNames: _col0
Statistics: Num rows: 2 Data size: 10 Basic stats: COMPLETE Column stats: NONE
Spark Partition Pruning Sink Operator
- Target column: dim_shops_id (int)
- partition key expr: dim_shops_id
+ Target column: [1:dim_shops_id (int)]
+ partition key expr: [dim_shops_id]
Statistics: Num rows: 2 Data size: 10 Basic stats: COMPLETE Column stats: NONE
- target work: Map 1
+ target works: [Map 1]
Local Work:
Map Reduce Local Work
@@ -887,10 +887,10 @@ STAGE PLANS:
outputColumnNames: _col0
Statistics: Num rows: 1 Data size: 5 Basic stats: COMPLETE Column stats: NONE
Spark Partition Pruning Sink Operator
- Target column: dim_shops_id (int)
- partition key expr: dim_shops_id
+ Target column: [1:dim_shops_id (int)]
+ partition key expr: [dim_shops_id]
Statistics: Num rows: 1 Data size: 5 Basic stats: COMPLETE Column stats: NONE
- target work: Map 1
+ target works: [Map 1]
Local Work:
Map Reduce Local Work
@@ -925,10 +925,10 @@ STAGE PLANS:
outputColumnNames: _col0
Statistics: Num rows: 1 Data size: 5 Basic stats: COMPLETE Column stats: NONE
Spark Partition Pruning Sink Operator
- Target column: dim_shops_id (int)
- partition key expr: dim_shops_id
+ Target column: [3:dim_shops_id (int)]
+ partition key expr: [dim_shops_id]
Statistics: Num rows: 1 Data size: 5 Basic stats: COMPLETE Column stats: NONE
- target work: Map 3
+ target works: [Map 3]
Local Work:
Map Reduce Local Work
http://git-wip-us.apache.org/repos/asf/hive/blob/aaacda47/ql/src/test/results/clientpositive/spark/spark_dynamic_partition_pruning_3.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/spark/spark_dynamic_partition_pruning_3.q.out b/ql/src/test/results/clientpositive/spark/spark_dynamic_partition_pruning_3.q.out
index cad45e1..53f25ce 100644
--- a/ql/src/test/results/clientpositive/spark/spark_dynamic_partition_pruning_3.q.out
+++ b/ql/src/test/results/clientpositive/spark/spark_dynamic_partition_pruning_3.q.out
@@ -331,10 +331,10 @@ STAGE PLANS:
outputColumnNames: _col0
Statistics: Num rows: 1 Data size: 3 Basic stats: COMPLETE Column stats: NONE
Spark Partition Pruning Sink Operator
- Target column: part_col (int)
- partition key expr: part_col
+ Target column: [4:part_col (int)]
+ partition key expr: [part_col]
Statistics: Num rows: 1 Data size: 3 Basic stats: COMPLETE Column stats: NONE
- target work: Map 4
+ target works: [Map 4]
Local Work:
Map Reduce Local Work
@@ -383,10 +383,10 @@ STAGE PLANS:
outputColumnNames: _col0
Statistics: Num rows: 4 Data size: 4 Basic stats: COMPLETE Column stats: NONE
Spark Partition Pruning Sink Operator
- Target column: part_col (int)
- partition key expr: part_col
+ Target column: [1:part_col (int)]
+ partition key expr: [part_col]
Statistics: Num rows: 4 Data size: 4 Basic stats: COMPLETE Column stats: NONE
- target work: Map 1
+ target works: [Map 1]
Local Work:
Map Reduce Local Work
@@ -524,10 +524,10 @@ STAGE PLANS:
outputColumnNames: _col0
Statistics: Num rows: 1 Data size: 3 Basic stats: COMPLETE Column stats: NONE
Spark Partition Pruning Sink Operator
- Target column: part_col (int)
- partition key expr: part_col
+ Target column: [1:part_col (int)]
+ partition key expr: [part_col]
Statistics: Num rows: 1 Data size: 3 Basic stats: COMPLETE Column stats: NONE
- target work: Map 1
+ target works: [Map 1]
Local Work:
Map Reduce Local Work
Map 4
@@ -557,10 +557,10 @@ STAGE PLANS:
outputColumnNames: _col0
Statistics: Num rows: 1 Data size: 3 Basic stats: COMPLETE Column stats: NONE
Spark Partition Pruning Sink Operator
- Target column: part_col (int)
- partition key expr: part_col
+ Target column: [1:part_col (int)]
+ partition key expr: [part_col]
Statistics: Num rows: 1 Data size: 3 Basic stats: COMPLETE Column stats: NONE
- target work: Map 1
+ target works: [Map 1]
Local Work:
Map Reduce Local Work
@@ -700,10 +700,10 @@ STAGE PLANS:
outputColumnNames: _col0
Statistics: Num rows: 1 Data size: 3 Basic stats: COMPLETE Column stats: NONE
Spark Partition Pruning Sink Operator
- Target column: part_col (int)
- partition key expr: part_col
+ Target column: [1:part_col (int)]
+ partition key expr: [part_col]
Statistics: Num rows: 1 Data size: 3 Basic stats: COMPLETE Column stats: NONE
- target work: Map 1
+ target works: [Map 1]
Local Work:
Map Reduce Local Work
@@ -737,10 +737,10 @@ STAGE PLANS:
outputColumnNames: _col0
Statistics: Num rows: 1 Data size: 3 Basic stats: COMPLETE Column stats: NONE
Spark Partition Pruning Sink Operator
- Target column: part_col (int)
- partition key expr: part_col
+ Target column: [5:part_col (int)]
+ partition key expr: [part_col]
Statistics: Num rows: 1 Data size: 3 Basic stats: COMPLETE Column stats: NONE
- target work: Map 5
+ target works: [Map 5]
Local Work:
Map Reduce Local Work
@@ -920,10 +920,10 @@ STAGE PLANS:
outputColumnNames: _col0
Statistics: Num rows: 1 Data size: 3 Basic stats: COMPLETE Column stats: NONE
Spark Partition Pruning Sink Operator
- Target column: part_col (int)
- partition key expr: part_col
+ Target column: [2:part_col (int)]
+ partition key expr: [part_col]
Statistics: Num rows: 1 Data size: 3 Basic stats: COMPLETE Column stats: NONE
- target work: Map 2
+ target works: [Map 2]
Local Work:
Map Reduce Local Work
@@ -1054,10 +1054,10 @@ STAGE PLANS:
outputColumnNames: _col0
Statistics: Num rows: 1 Data size: 3 Basic stats: COMPLETE Column stats: NONE
Spark Partition Pruning Sink Operator
- Target column: part_col (int)
- partition key expr: part_col
+ Target column: [2:part_col (int)]
+ partition key expr: [part_col]
Statistics: Num rows: 1 Data size: 3 Basic stats: COMPLETE Column stats: NONE
- target work: Map 2
+ target works: [Map 2]
Local Work:
Map Reduce Local Work
@@ -1201,10 +1201,10 @@ STAGE PLANS:
outputColumnNames: _col0
Statistics: Num rows: 6 Data size: 6 Basic stats: COMPLETE Column stats: NONE
Spark Partition Pruning Sink Operator
- Target column: part_col (int)
- partition key expr: part_col
+ Target column: [1:part_col (int)]
+ partition key expr: [part_col]
Statistics: Num rows: 6 Data size: 6 Basic stats: COMPLETE Column stats: NONE
- target work: Map 1
+ target works: [Map 1]
Local Work:
Map Reduce Local Work
@@ -1340,10 +1340,10 @@ STAGE PLANS:
outputColumnNames: _col0
Statistics: Num rows: 18 Data size: 18 Basic stats: COMPLETE Column stats: NONE
Spark Partition Pruning Sink Operator
- Target column: part_col1 (int)
- partition key expr: part_col1
+ Target column: [1:part_col1 (int)]
+ partition key expr: [part_col1]
Statistics: Num rows: 18 Data size: 18 Basic stats: COMPLETE Column stats: NONE
- target work: Map 1
+ target works: [Map 1]
Select Operator
expressions: _col1 (type: int)
outputColumnNames: _col0
@@ -1354,10 +1354,10 @@ STAGE PLANS:
outputColumnNames: _col0
Statistics: Num rows: 18 Data size: 18 Basic stats: COMPLETE Column stats: NONE
Spark Partition Pruning Sink Operator
- Target column: part_col2 (int)
- partition key expr: part_col2
+ Target column: [1:part_col2 (int)]
+ partition key expr: [part_col2]
Statistics: Num rows: 18 Data size: 18 Basic stats: COMPLETE Column stats: NONE
- target work: Map 1
+ target works: [Map 1]
Local Work:
Map Reduce Local Work
@@ -1493,10 +1493,10 @@ STAGE PLANS:
outputColumnNames: _col0
Statistics: Num rows: 3 Data size: 9 Basic stats: COMPLETE Column stats: NONE
Spark Partition Pruning Sink Operator
- Target column: part_col (int)
- partition key expr: part_col
+ Target column: [3:part_col (int)]
+ partition key expr: [part_col]
Statistics: Num rows: 3 Data size: 9 Basic stats: COMPLETE Column stats: NONE
- target work: Map 3
+ target works: [Map 3]
Local Work:
Map Reduce Local Work
Map 2
@@ -1526,10 +1526,10 @@ STAGE PLANS:
outputColumnNames: _col0
Statistics: Num rows: 3 Data size: 9 Basic stats: COMPLETE Column stats: NONE
Spark Partition Pruning Sink Operator
- Target column: part_col (int)
- partition key expr: part_col
+ Target column: [3:part_col (int)]
+ partition key expr: [part_col]
Statistics: Num rows: 3 Data size: 9 Basic stats: COMPLETE Column stats: NONE
- target work: Map 3
+ target works: [Map 3]
Local Work:
Map Reduce Local Work