You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by li...@apache.org on 2015/05/21 04:14:06 UTC

hive git commit: HIVE-10458: Enable parallel order by for spark [Spark Branch] (Rui reviewed by Xuefu)

Repository: hive
Updated Branches:
  refs/heads/spark 74ac99fa3 -> a318d5b00


HIVE-10458: Enable parallel order by for spark [Spark Branch] (Rui reviewed by Xuefu)

Signed-off-by: Rui Li <ru...@intel.com>


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/a318d5b0
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/a318d5b0
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/a318d5b0

Branch: refs/heads/spark
Commit: a318d5b00f92dd2130de6a45cc44f268fded2d9c
Parents: 74ac99f
Author: Rui Li <ru...@intel.com>
Authored: Thu May 21 10:13:17 2015 +0800
Committer: Rui Li <ru...@intel.com>
Committed: Thu May 21 10:13:17 2015 +0800

----------------------------------------------------------------------
 .../correlation/ReduceSinkDeDuplication.java    | 12 ++++-----
 .../spark/SetSparkReducerParallelism.java       | 28 +++++++++++++++++++-
 .../hive/ql/parse/spark/GenSparkUtils.java      |  2 +-
 .../hadoop/hive/ql/plan/ReduceSinkDesc.java     | 15 +++++------
 .../clientpositive/spark/parallel_orderby.q.out |  6 ++---
 5 files changed, 44 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/a318d5b0/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/ReduceSinkDeDuplication.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/ReduceSinkDeDuplication.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/ReduceSinkDeDuplication.java
index 404b759..8ac9ca7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/ReduceSinkDeDuplication.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/correlation/ReduceSinkDeDuplication.java
@@ -489,7 +489,7 @@ public class ReduceSinkDeDuplication implements Transform {
       if (pRS != null && merge(cRS, pRS, dedupCtx.minReducer())) {
         CorrelationUtilities.replaceReduceSinkWithSelectOperator(
             cRS, dedupCtx.getPctx(), dedupCtx);
-        pRS.getConf().setEnforceSort(true);
+        pRS.getConf().setDeduplicated(true);
         return true;
       }
       return false;
@@ -512,7 +512,7 @@ public class ReduceSinkDeDuplication implements Transform {
       if (pRS != null && merge(cRS, pRS, dedupCtx.minReducer())) {
         CorrelationUtilities.removeReduceSinkForGroupBy(
             cRS, cGBY, dedupCtx.getPctx(), dedupCtx);
-        pRS.getConf().setEnforceSort(true);
+        pRS.getConf().setDeduplicated(true);
         return true;
       }
       return false;
@@ -535,7 +535,7 @@ public class ReduceSinkDeDuplication implements Transform {
             CorrelationUtilities.findPossibleParent(
                 pJoin, ReduceSinkOperator.class, dedupCtx.trustScript());
         if (pRS != null) {
-          pRS.getConf().setEnforceSort(true);
+          pRS.getConf().setDeduplicated(true);
         }
         return true;
       }
@@ -559,7 +559,7 @@ public class ReduceSinkDeDuplication implements Transform {
             CorrelationUtilities.findPossibleParent(
                 pJoin, ReduceSinkOperator.class, dedupCtx.trustScript());
         if (pRS != null) {
-          pRS.getConf().setEnforceSort(true);
+          pRS.getConf().setDeduplicated(true);
         }
         return true;
       }
@@ -579,7 +579,7 @@ public class ReduceSinkDeDuplication implements Transform {
       if (pRS != null && merge(cRS, pRS, dedupCtx.minReducer())) {
         CorrelationUtilities.replaceReduceSinkWithSelectOperator(
             cRS, dedupCtx.getPctx(), dedupCtx);
-        pRS.getConf().setEnforceSort(true);
+        pRS.getConf().setDeduplicated(true);
         return true;
       }
       return false;
@@ -596,7 +596,7 @@ public class ReduceSinkDeDuplication implements Transform {
               start, ReduceSinkOperator.class, dedupCtx.trustScript());
       if (pRS != null && merge(cRS, pRS, dedupCtx.minReducer())) {
         CorrelationUtilities.removeReduceSinkForGroupBy(cRS, cGBY, dedupCtx.getPctx(), dedupCtx);
-        pRS.getConf().setEnforceSort(true);
+        pRS.getConf().setDeduplicated(true);
         return true;
       }
       return false;

http://git-wip-us.apache.org/repos/asf/hive/blob/a318d5b0/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java
index f9ef474..5f9225c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/spark/SetSparkReducerParallelism.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hive.ql.optimizer.spark;
 
+import java.util.List;
 import java.util.Stack;
 
 import org.apache.commons.logging.Log;
@@ -26,6 +27,7 @@ import org.apache.hadoop.hive.common.ObjectPair;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.exec.LimitOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
 import org.apache.hadoop.hive.ql.exec.Utilities;
@@ -75,7 +77,7 @@ public class SetSparkReducerParallelism implements NodeProcessor {
 
     context.getVisitedReduceSinks().add(sink);
 
-    if (desc.getNumReducers() <= 0) {
+    if (needSetParallelism(sink, context.getConf())) {
       if (constantReducers > 0) {
         LOG.info("Parallelism for reduce sink " + sink + " set by user to " + constantReducers);
         desc.setNumReducers(constantReducers);
@@ -158,4 +160,28 @@ public class SetSparkReducerParallelism implements NodeProcessor {
     return false;
   }
 
+  // tests whether the RS needs automatic setting parallelism
+  private boolean needSetParallelism(ReduceSinkOperator reduceSink, HiveConf hiveConf) {
+    ReduceSinkDesc desc = reduceSink.getConf();
+    if (desc.getNumReducers() <= 0) {
+      return true;
+    }
+    if (desc.getNumReducers() == 1 && desc.hasOrderBy() &&
+        hiveConf.getBoolVar(HiveConf.ConfVars.HIVESAMPLINGFORORDERBY) && !desc.isDeduplicated()) {
+      List<Operator<? extends OperatorDesc>> children = reduceSink.getChildOperators();
+      while (children != null && children.size() > 0) {
+        if (children.size() != 1 || children.get(0) instanceof LimitOperator) {
+          return false;
+        }
+        if (children.get(0) instanceof ReduceSinkOperator ||
+            children.get(0) instanceof FileSinkOperator) {
+          break;
+        }
+        children = children.get(0).getChildOperators();
+      }
+      return true;
+    }
+    return false;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/a318d5b0/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
index e27ce0d..7992c88 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/spark/GenSparkUtils.java
@@ -399,7 +399,7 @@ public class GenSparkUtils {
    */
   private static boolean groupByNeedParLevelOrder(ReduceSinkOperator reduceSinkOperator) {
     // whether we have to enforce sort anyway, e.g. in case of RS deduplication
-    if (reduceSinkOperator.getConf().isEnforceSort()) {
+    if (reduceSinkOperator.getConf().isDeduplicated()) {
       return true;
     }
     List<Operator<? extends OperatorDesc>> children = reduceSinkOperator.getChildOperators();

http://git-wip-us.apache.org/repos/asf/hive/blob/a318d5b0/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java
index 1891dff..b4316ec 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/ReduceSinkDesc.java
@@ -21,7 +21,6 @@ package org.apache.hadoop.hive.ql.plan;
 import java.util.ArrayList;
 import java.util.EnumSet;
 import java.util.List;
-import java.util.Map.Entry;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -114,8 +113,8 @@ public class ReduceSinkDesc extends AbstractOperatorDesc {
   // Write type, since this needs to calculate buckets differently for updates and deletes
   private AcidUtils.Operation writeType;
 
-  // whether we'll enforce the sort order of the RS
-  private transient boolean enforceSort = false;
+  // whether this RS is deduplicated
+  private transient boolean isDeduplicated = false;
 
   // used by spark mode to decide whether global order is needed
   private transient boolean hasOrderBy = false;
@@ -174,7 +173,7 @@ public class ReduceSinkDesc extends AbstractOperatorDesc {
     desc.setStatistics(this.getStatistics());
     desc.setSkipTag(skipTag);
     desc.reduceTraits = reduceTraits.clone();
-    desc.setEnforceSort(enforceSort);
+    desc.setDeduplicated(isDeduplicated);
     desc.setHasOrderBy(hasOrderBy);
     return desc;
   }
@@ -434,12 +433,12 @@ public class ReduceSinkDesc extends AbstractOperatorDesc {
     return writeType;
   }
 
-  public boolean isEnforceSort() {
-    return enforceSort;
+  public boolean isDeduplicated() {
+    return isDeduplicated;
   }
 
-  public void setEnforceSort(boolean isDeduplicated) {
-    this.enforceSort = isDeduplicated;
+  public void setDeduplicated(boolean isDeduplicated) {
+    this.isDeduplicated = isDeduplicated;
   }
 
   public boolean hasOrderBy() {

http://git-wip-us.apache.org/repos/asf/hive/blob/a318d5b0/ql/src/test/results/clientpositive/spark/parallel_orderby.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/spark/parallel_orderby.q.out b/ql/src/test/results/clientpositive/spark/parallel_orderby.q.out
index 0194dbb..03314ea 100644
--- a/ql/src/test/results/clientpositive/spark/parallel_orderby.q.out
+++ b/ql/src/test/results/clientpositive/spark/parallel_orderby.q.out
@@ -38,7 +38,7 @@ STAGE PLANS:
   Stage: Stage-1
     Spark
       Edges:
-        Reducer 2 <- Map 1 (SORT, 1)
+        Reducer 2 <- Map 1 (SORT, 4)
 #### A masked pattern was here ####
       Vertices:
         Map 1 
@@ -117,7 +117,7 @@ Retention:          	0
 Table Type:         	MANAGED_TABLE       	 
 Table Parameters:	 	 
 	COLUMN_STATS_ACCURATE	true                
-	numFiles            	1                   
+	numFiles            	4                   
 	numRows             	48                  
 	rawDataSize         	512                 
 	totalSize           	560                 
@@ -231,7 +231,7 @@ Retention:          	0
 Table Type:         	MANAGED_TABLE       	 
 Table Parameters:	 	 
 	COLUMN_STATS_ACCURATE	true                
-	numFiles            	1                   
+	numFiles            	4                   
 	numRows             	48                  
 	rawDataSize         	512                 
 	totalSize           	560