You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by li...@apache.org on 2016/05/17 07:44:37 UTC

hive git commit: HIVE-13293: Cache RDD to improve parallel ORDER BY performance for Hive on Spark (HoS) (Rui, reviewed by Xuefu)

Repository: hive
Updated Branches:
  refs/heads/master e73891415 -> 3f316cb5a


HIVE-13293: Cache RDD to improve parallel ORDER BY performance for Hive on Spark (HoS) (Rui, reviewed by Xuefu)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/3f316cb5
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/3f316cb5
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/3f316cb5

Branch: refs/heads/master
Commit: 3f316cb5aeeefecf2c432c77442164a7dd6514d3
Parents: e738914
Author: Rui Li <ru...@intel.com>
Authored: Tue May 17 15:43:43 2016 +0800
Committer: Rui Li <ru...@intel.com>
Committed: Tue May 17 15:44:16 2016 +0800

----------------------------------------------------------------------
 .../apache/hadoop/hive/ql/exec/spark/SortByShuffler.java    | 9 ++++++++-
 .../hadoop/hive/ql/exec/spark/SparkPlanGenerator.java       | 4 ++--
 2 files changed, 10 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/3f316cb5/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SortByShuffler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SortByShuffler.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SortByShuffler.java
index 766813c..a6350d3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SortByShuffler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SortByShuffler.java
@@ -24,6 +24,7 @@ import org.apache.spark.HashPartitioner;
 import org.apache.spark.Partitioner;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.function.PairFlatMapFunction;
+import org.apache.spark.storage.StorageLevel;
 import scala.Tuple2;
 
 import java.util.*;
@@ -31,12 +32,14 @@ import java.util.*;
 public class SortByShuffler implements SparkShuffler {
 
   private final boolean totalOrder;
+  private final SparkPlan sparkPlan;
 
   /**
    * @param totalOrder whether this shuffler provides total order shuffle.
    */
-  public SortByShuffler(boolean totalOrder) {
+  public SortByShuffler(boolean totalOrder, SparkPlan sparkPlan) {
     this.totalOrder = totalOrder;
+    this.sparkPlan = sparkPlan;
   }
 
   @Override
@@ -45,6 +48,10 @@ public class SortByShuffler implements SparkShuffler {
     JavaPairRDD<HiveKey, BytesWritable> rdd;
     if (totalOrder) {
       if (numPartitions > 0) {
+        if (numPartitions > 1 && input.getStorageLevel() == StorageLevel.NONE()) {
+          input.persist(StorageLevel.DISK_ONLY());
+          sparkPlan.addCachedRDDId(input.id());
+        }
         rdd = input.sortByKey(true, numPartitions);
       } else {
         rdd = input.sortByKey(true);

http://git-wip-us.apache.org/repos/asf/hive/blob/3f316cb5/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
index 6abef4e..66ffe5d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
@@ -215,9 +215,9 @@ public class SparkPlanGenerator {
         "AssertionError: SHUFFLE_NONE should only be used for UnionWork.");
     SparkShuffler shuffler;
     if (edge.isMRShuffle()) {
-      shuffler = new SortByShuffler(false);
+      shuffler = new SortByShuffler(false, sparkPlan);
     } else if (edge.isShuffleSort()) {
-      shuffler = new SortByShuffler(true);
+      shuffler = new SortByShuffler(true, sparkPlan);
     } else {
       shuffler = new GroupByShuffler();
     }