You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this text extraction.
Posted to commits@hive.apache.org by li...@apache.org on 2015/05/21 04:14:06 UTC
hive git commit: HIVE-10458: Enable parallel order by for spark [Spark Branch] (Rui reviewed by Xuefu)
Repository: hive
Updated Branches:
refs/heads/spark 74ac99fa3 -> a318d5b00
HIVE-10458: Enable parallel order by for spark [Spark Branch] (Rui reviewed by Xuefu)
Signed-off-by: Rui Li <ru...@intel.com>
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/a318d5b0
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/a318d5b0
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/a318d5b0
Branch: refs/heads/spark
Commit: a318d5b00f92dd2130de6a45cc44f268fded2d9c
Parents: 74ac99f
Author: Rui Li <ru...@intel.com>
Authored: Thu May 21 10:13:17 2015 +0800
Committer: Rui Li <ru...@intel.com>
Committed: Thu May 21 10:13:17 2015 +0800
----------------------------------------------------------------------
.../correlation/ReduceSinkDeDuplication.java | 12 ++++-----
.../spark/SetSparkReducerParallelism.java | 28 +++++++++++++++++++-
.../hive/ql/parse/spark/GenSparkUtils.java | 2 +-
.../hadoop/hive/ql/plan/ReduceSinkDesc.java | 15 +++++------
.../clientpositive/spark/parallel_orderby.q.out | 6 ++---
5 files changed, 44 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/a318d5b0/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/ReduceSinkDeDuplication.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/ReduceSinkDeDuplication.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/ReduceSinkDeDuplication.java
index 404b759..8ac9ca7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/ReduceSinkDeDuplication.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/ReduceSinkDeDuplication.java
@@ -489,7 +489,7 @@ public class ReduceSinkDeDuplication implements Transform {
if (pRS != null && merge(cRS, pRS, dedupCtx.minReducer())) {
CorrelationUtilities.replaceReduceSinkWithSelectOperator(
cRS, dedupCtx.getPctx(), dedupCtx);
- pRS.getConf().setEnforceSort(true);
+ pRS.getConf().setDeduplicated(true);
return true;
}
return false;
@@ -512,7 +512,7 @@ public class ReduceSinkDeDuplication implements Transform {
if (pRS != null && merge(cRS, pRS, dedupCtx.minReducer())) {
CorrelationUtilities.removeReduceSinkForGroupBy(
cRS, cGBY, dedupCtx.getPctx(), dedupCtx);
- pRS.getConf().setEnforceSort(true);
+ pRS.getConf().setDeduplicated(true);
return true;
}
return false;
@@ -535,7 +535,7 @@ public class ReduceSinkDeDuplication implements Transform {
CorrelationUtilities.findPossibleParent(
pJoin, ReduceSinkOperator.class, dedupCtx.trustScript());
if (pRS != null) {
- pRS.getConf().setEnforceSort(true);
+ pRS.getConf().setDeduplicated(true);
}
return true;
}
@@ -559,7 +559,7 @@ public class ReduceSinkDeDuplication implements Transform {
CorrelationUtilities.findPossibleParent(
pJoin, ReduceSinkOperator.class, dedupCtx.trustScript());
if (pRS != null) {
- pRS.getConf().setEnforceSort(true);
+ pRS.getConf().setDeduplicated(true);
}
return true;
}
@@ -579,7 +579,7 @@ public class ReduceSinkDeDuplication implements Transform {
if (pRS != null && merge(cRS, pRS, dedupCtx.minReducer())) {
CorrelationUtilities.replaceReduceSinkWithSelectOperator(
cRS, dedupCtx.getPctx(), dedupCtx);
- pRS.getConf().setEnforceSort(true);
+ pRS.getConf().setDeduplicated(true);
return true;
}
return false;
@@ -596,7 +596,7 @@ public class ReduceSinkDeDuplication implements Transform {
start, ReduceSinkOperator.class, dedupCtx.trustScript());
if (pRS != null && merge(cRS, pRS, dedupCtx.minReducer())) {
CorrelationUtilities.removeReduceSinkForGroupBy(cRS, cGBY, dedupCtx.getPctx(), dedupCtx);
- pRS.getConf().setEnforceSort(true);
+ pRS.getConf().setDeduplicated(true);
return true;
}
return false;
http://git-wip-us.apache.org/repos/asf/hive/blob/a318d5b0/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java
index f9ef474..5f9225c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hive.ql.optimizer.spark;
+import java.util.List;
import java.util.Stack;
import org.apache.commons.logging.Log;
@@ -26,6 +27,7 @@ import org.apache.hadoop.hive.common.ObjectPair;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.exec.LimitOperator;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
import org.apache.hadoop.hive.ql.exec.Utilities;
@@ -75,7 +77,7 @@ public class SetSparkReducerParallelism implements NodeProcessor {
context.getVisitedReduceSinks().add(sink);
- if (desc.getNumReducers() <= 0) {
+ if (needSetParallelism(sink, context.getConf())) {
if (constantReducers > 0) {
LOG.info("Parallelism for reduce sink " + sink + " set by user to " + constantReducers);
desc.setNumReducers(constantReducers);
@@ -158,4 +160,28 @@ public class SetSparkReducerParallelism implements NodeProcessor {
return false;
}
+ // Decides whether this RS's reducer count should be set automatically: true if it is unset (<= 0), or if it is a single-reducer ORDER BY that may be parallelized (HIVESAMPLINGFORORDERBY on, RS not deduplicated, no LIMIT/branch downstream).
+ private boolean needSetParallelism(ReduceSinkOperator reduceSink, HiveConf hiveConf) {
+ ReduceSinkDesc desc = reduceSink.getConf();
+ if (desc.getNumReducers() <= 0) { // reducer count not set yet
+ return true;
+ }
+ if (desc.getNumReducers() == 1 && desc.hasOrderBy() &&
+ hiveConf.getBoolVar(HiveConf.ConfVars.HIVESAMPLINGFORORDERBY) && !desc.isDeduplicated()) { // deduplicated RSs are excluded
+ List<Operator<? extends OperatorDesc>> children = reduceSink.getChildOperators();
+ while (children != null && children.size() > 0) { // walk the single-child operator chain below this RS
+ if (children.size() != 1 || children.get(0) instanceof LimitOperator) { // branching plan or LIMIT: do not auto-set
+ return false;
+ }
+ if (children.get(0) instanceof ReduceSinkOperator ||
+ children.get(0) instanceof FileSinkOperator) { // chain ends at the next RS or a file sink
+ break;
+ }
+ children = children.get(0).getChildOperators();
+ }
+ return true; // no LIMIT or branch found (also reached when the RS has no children)
+ }
+ return false; // count already set and not an eligible single-reducer ORDER BY
+ }
+
}
http://git-wip-us.apache.org/repos/asf/hive/blob/a318d5b0/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
index e27ce0d..7992c88 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
@@ -399,7 +399,7 @@ public class GenSparkUtils {
*/
private static boolean groupByNeedParLevelOrder(ReduceSinkOperator reduceSinkOperator) {
// whether we have to enforce sort anyway, e.g. in case of RS deduplication
- if (reduceSinkOperator.getConf().isEnforceSort()) {
+ if (reduceSinkOperator.getConf().isDeduplicated()) {
return true;
}
List<Operator<? extends OperatorDesc>> children = reduceSinkOperator.getChildOperators();
http://git-wip-us.apache.org/repos/asf/hive/blob/a318d5b0/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java
index 1891dff..b4316ec 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java
@@ -21,7 +21,6 @@ package org.apache.hadoop.hive.ql.plan;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
-import java.util.Map.Entry;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -114,8 +113,8 @@ public class ReduceSinkDesc extends AbstractOperatorDesc {
// Write type, since this needs to calculate buckets differently for updates and deletes
private AcidUtils.Operation writeType;
- // whether we'll enforce the sort order of the RS
- private transient boolean enforceSort = false;
+ // whether this RS is deduplicated
+ private transient boolean isDeduplicated = false;
// used by spark mode to decide whether global order is needed
private transient boolean hasOrderBy = false;
@@ -174,7 +173,7 @@ public class ReduceSinkDesc extends AbstractOperatorDesc {
desc.setStatistics(this.getStatistics());
desc.setSkipTag(skipTag);
desc.reduceTraits = reduceTraits.clone();
- desc.setEnforceSort(enforceSort);
+ desc.setDeduplicated(isDeduplicated);
desc.setHasOrderBy(hasOrderBy);
return desc;
}
@@ -434,12 +433,12 @@ public class ReduceSinkDesc extends AbstractOperatorDesc {
return writeType;
}
- public boolean isEnforceSort() {
- return enforceSort;
+ public boolean isDeduplicated() {
+ return isDeduplicated;
}
- public void setEnforceSort(boolean isDeduplicated) {
- this.enforceSort = isDeduplicated;
+ public void setDeduplicated(boolean isDeduplicated) {
+ this.isDeduplicated = isDeduplicated;
}
public boolean hasOrderBy() {
http://git-wip-us.apache.org/repos/asf/hive/blob/a318d5b0/ql/src/test/results/clientpositive/spark/parallel_orderby.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/spark/parallel_orderby.q.out b/ql/src/test/results/clientpositive/spark/parallel_orderby.q.out
index 0194dbb..03314ea 100644
--- a/ql/src/test/results/clientpositive/spark/parallel_orderby.q.out
+++ b/ql/src/test/results/clientpositive/spark/parallel_orderby.q.out
@@ -38,7 +38,7 @@ STAGE PLANS:
Stage: Stage-1
Spark
Edges:
- Reducer 2 <- Map 1 (SORT, 1)
+ Reducer 2 <- Map 1 (SORT, 4)
#### A masked pattern was here ####
Vertices:
Map 1
@@ -117,7 +117,7 @@ Retention: 0
Table Type: MANAGED_TABLE
Table Parameters:
COLUMN_STATS_ACCURATE true
- numFiles 1
+ numFiles 4
numRows 48
rawDataSize 512
totalSize 560
@@ -231,7 +231,7 @@ Retention: 0
Table Type: MANAGED_TABLE
Table Parameters:
COLUMN_STATS_ACCURATE true
- numFiles 1
+ numFiles 4
numRows 48
rawDataSize 512
totalSize 560