Posted to commits@hive.apache.org by xu...@apache.org on 2014/10/12 03:03:19 UTC

svn commit: r1631140 - in /hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark: IdentityTran.java MapInput.java MapTran.java ReduceTran.java ShuffleTran.java SparkPlanGenerator.java SparkTran.java

Author: xuefu
Date: Sun Oct 12 01:03:19 2014
New Revision: 1631140

URL: http://svn.apache.org/r1631140
Log:
HIVE-8276: Separate shuffle from ReduceTran and so create ShuffleTran [Spark Branch] (Chao via Xuefu)
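
Summary of the refactoring: before this change, ReduceTran both shuffled its input and applied the reduce function; this commit moves the shuffle into a new ShuffleTran, so a plan can be wired as MapInput -> MapTran -> ShuffleTran -> ReduceTran. To make that chain type-check, SparkTran gains separate input/output value-type parameters. Read directly off the diffs below, the implementations now instantiate SparkTran<KI, VI, KO, VO> as:

    IdentityTran<K, V>  SparkTran<K, V, K, V>
    MapInput            SparkTran<BytesWritable, BytesWritable, HiveKey, BytesWritable>
    MapTran             SparkTran<BytesWritable, BytesWritable, HiveKey, BytesWritable>
    ShuffleTran         SparkTran<HiveKey, BytesWritable, HiveKey, Iterable<BytesWritable>>
    ReduceTran          SparkTran<HiveKey, Iterable<BytesWritable>, HiveKey, BytesWritable>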

Added:
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ShuffleTran.java
Modified:
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/IdentityTran.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapInput.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapTran.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ReduceTran.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTran.java

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/IdentityTran.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/IdentityTran.java?rev=1631140&r1=1631139&r2=1631140&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/IdentityTran.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/IdentityTran.java Sun Oct 12 01:03:19 2014
@@ -18,15 +18,13 @@
 
 package org.apache.hadoop.hive.ql.exec.spark;
 
-import org.apache.hadoop.hive.ql.io.HiveKey;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.spark.api.java.JavaPairRDD;
 
-public class IdentityTran implements SparkTran<HiveKey, HiveKey> {
+public class IdentityTran<K extends BytesWritable, V> implements SparkTran<K, V, K, V> {
 
   @Override
-  public JavaPairRDD<HiveKey, BytesWritable> transform(
-      JavaPairRDD<HiveKey, BytesWritable> input) {
+  public JavaPairRDD<K, V> transform(JavaPairRDD<K, V> input) {
     return input;
   }
 }
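
The generic form above lets the identity tran pass through any key/value pair type rather than only HiveKey/BytesWritable. A minimal, illustrative use (variable names are not from this commit):

    IdentityTran<HiveKey, BytesWritable> passThrough = new IdentityTran<HiveKey, BytesWritable>();
    JavaPairRDD<HiveKey, BytesWritable> out = passThrough.transform(in);  // returns its input unchanged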

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapInput.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapInput.java?rev=1631140&r1=1631139&r2=1631140&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapInput.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapInput.java Sun Oct 12 01:03:19 2014
@@ -24,7 +24,7 @@ import org.apache.spark.api.java.JavaPai
 
 import com.google.common.base.Preconditions;
 
-public class MapInput implements SparkTran<BytesWritable, HiveKey> {
+public class MapInput implements SparkTran<BytesWritable, BytesWritable, HiveKey, BytesWritable> {
   private JavaPairRDD<HiveKey, BytesWritable> hadoopRDD;
   private boolean toCache;
 

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapTran.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapTran.java?rev=1631140&r1=1631139&r2=1631140&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapTran.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapTran.java Sun Oct 12 01:03:19 2014
@@ -22,7 +22,7 @@ import org.apache.hadoop.hive.ql.io.Hive
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.spark.api.java.JavaPairRDD;
 
-public class MapTran implements SparkTran<BytesWritable,HiveKey> {
+public class MapTran implements SparkTran<BytesWritable, BytesWritable, HiveKey, BytesWritable> {
   private HiveMapFunction mapFunc;
 
   @Override

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ReduceTran.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ReduceTran.java?rev=1631140&r1=1631139&r2=1631140&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ReduceTran.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ReduceTran.java Sun Oct 12 01:03:19 2014
@@ -22,27 +22,16 @@ import org.apache.hadoop.hive.ql.io.Hive
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.spark.api.java.JavaPairRDD;
 
-public class ReduceTran implements SparkTran<HiveKey, HiveKey> {
-  private SparkShuffler shuffler;
+public class ReduceTran implements SparkTran<HiveKey, Iterable<BytesWritable>, HiveKey, BytesWritable> {
   private HiveReduceFunction reduceFunc;
-  private int numPartitions;
 
   @Override
   public JavaPairRDD<HiveKey, BytesWritable> transform(
-      JavaPairRDD<HiveKey, BytesWritable> input) {
-    return shuffler.shuffle(input, numPartitions).mapPartitionsToPair(reduceFunc);
+      JavaPairRDD<HiveKey, Iterable<BytesWritable>> input) {
+    return input.mapPartitionsToPair(reduceFunc);
   }
 
   public void setReduceFunction(HiveReduceFunction redFunc) {
     this.reduceFunc = redFunc;
   }
-
-  public void setShuffler(SparkShuffler shuffler) {
-    this.shuffler = shuffler;
-  }
-
-  public void setNumPartitions(int numPartitions) {
-    this.numPartitions = numPartitions;
-  }
-
 }
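
With the shuffler and partition count removed, ReduceTran is now a pure mapPartitionsToPair over input that a ShuffleTran has already shuffled. Setting one up reduces to the following (illustrative; reduceFunc stands for the HiveReduceFunction the generator builds from the ReduceWork):

    ReduceTran reduceTran = new ReduceTran();
    reduceTran.setReduceFunction(reduceFunc);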

Added: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ShuffleTran.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ShuffleTran.java?rev=1631140&view=auto
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ShuffleTran.java (added)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ShuffleTran.java Sun Oct 12 01:03:19 2014
@@ -0,0 +1,45 @@
+/**
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.spark;
+
+import org.apache.hadoop.hive.ql.io.HiveKey;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.spark.api.java.JavaPairRDD;
+
+public class ShuffleTran implements SparkTran<HiveKey, BytesWritable, HiveKey, Iterable<BytesWritable>> {
+  private final SparkShuffler shuffler;
+  private final int numOfPartitions;
+  private final boolean toCache;
+
+  public ShuffleTran(SparkShuffler sf, int n) {
+    this(sf, n, false);
+  }
+
+  public ShuffleTran(SparkShuffler sf, int n, boolean c) {
+    shuffler = sf;
+    numOfPartitions = n;
+    toCache = c;
+  }
+
+  @Override
+  public JavaPairRDD<HiveKey, Iterable<BytesWritable>> transform(JavaPairRDD<HiveKey, BytesWritable> input) {
+    JavaPairRDD<HiveKey, Iterable<BytesWritable>> result = shuffler.shuffle(input, numOfPartitions);
+    return toCache ? result.cache() : result;
+  }
+}
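
Construction follows what SparkPlanGenerator does below; for example (illustrative, with a hand-picked partition count):

    SparkShuffler shuffler = new SortByShuffler(true);               // the shuffler chosen for isShuffleSort() edges below
    ShuffleTran shuffle = new ShuffleTran(shuffler, 10);             // 10 partitions, result not cached
    ShuffleTran cachedShuffle = new ShuffleTran(shuffler, 10, true); // same, but cache the shuffled RDD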

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java?rev=1631140&r1=1631139&r2=1631140&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java Sun Oct 12 01:03:19 2014
@@ -82,12 +82,23 @@ public class SparkPlanGenerator {
       if (work instanceof MapWork) {
         MapInput mapInput = generateMapInput((MapWork)work);
         sparkPlan.addTran(mapInput);
-        tran = generate(work, null);
+        tran = generate((MapWork)work);
         sparkPlan.addTran(tran);
         sparkPlan.connect(mapInput, tran);
+      } else if (work instanceof ReduceWork) {
+        List<BaseWork> parentWorks = sparkWork.getParents(work);
+        tran = generate((ReduceWork)work);
+        sparkPlan.addTran(tran);
+        ShuffleTran shuffleTran = generate(sparkWork.getEdgeProperty(parentWorks.get(0), work));
+        sparkPlan.addTran(shuffleTran);
+        sparkPlan.connect(shuffleTran, tran);
+        for (BaseWork parentWork : parentWorks) {
+          SparkTran parentTran = workToTranMap.get(parentWork);
+          sparkPlan.connect(parentTran, shuffleTran);
+        }
       } else {
         List<BaseWork> parentWorks = sparkWork.getParents(work);
-        tran = generate(work, sparkWork.getEdgeProperty(parentWorks.get(0), work));
+        tran = new IdentityTran();
         sparkPlan.addTran(tran);
         for (BaseWork parentWork : parentWorks) {
           SparkTran parentTran = workToTranMap.get(parentWork);
@@ -129,24 +140,6 @@ public class SparkPlanGenerator {
     return inputFormatClass;
   }
 
-  public SparkTran generate(BaseWork work, SparkEdgeProperty edge) throws Exception {
-    if (work instanceof MapWork) {
-      MapWork mw = (MapWork) work;
-      return generate(mw);
-    } else if (work instanceof ReduceWork) {
-      ReduceWork rw = (ReduceWork) work;
-      ReduceTran tran = generate(rw);
-      SparkShuffler shuffler = generate(edge);
-      tran.setShuffler(shuffler);
-      tran.setNumPartitions(edge.getNumPartitions());
-      return tran;
-    } else if (work instanceof UnionWork) {
-      return new IdentityTran();
-    } else {
-      throw new HiveException("Unexpected work: " + work.getName());
-    }
-  }
-
   private MapInput generateMapInput(MapWork mapWork)
       throws Exception {
     JobConf jobConf = cloneJobConf(mapWork);
@@ -157,15 +150,18 @@ public class SparkPlanGenerator {
     return new MapInput(hadoopRDD);
   }
 
-  private SparkShuffler generate(SparkEdgeProperty edge) {
+  private ShuffleTran generate(SparkEdgeProperty edge) {
     Preconditions.checkArgument(!edge.isShuffleNone(),
         "AssertionError: SHUFFLE_NONE should only be used for UnionWork.");
+    SparkShuffler shuffler;
     if (edge.isMRShuffle()) {
-      return new SortByShuffler(false);
+      shuffler = new SortByShuffler(false);
     } else if (edge.isShuffleSort()) {
-      return new SortByShuffler(true);
+      shuffler = new SortByShuffler(true);
+    } else {
+      shuffler = new GroupByShuffler();
     }
-    return new GroupByShuffler();
+    return new ShuffleTran(shuffler, edge.getNumPartitions());
   }
 
   private MapTran generate(MapWork mw) throws Exception {
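
Net effect of the generator changes: a simple MapWork -> ReduceWork plan is wired as MapInput -> MapTran -> ShuffleTran -> ReduceTran, and a reduce with several parents fans each parent tran into a single ShuffleTran. A hedged wiring sketch following the generateSparkPlan hunk above (it assumes a no-arg SparkPlan constructor, which this diff does not show):

    SparkPlan plan = new SparkPlan();       // assumption: no-arg constructor
    plan.addTran(mapTran1);
    plan.addTran(mapTran2);
    plan.addTran(shuffleTran);
    plan.addTran(reduceTran);
    plan.connect(mapTran1, shuffleTran);    // every parent feeds the one shuffle...
    plan.connect(mapTran2, shuffleTran);
    plan.connect(shuffleTran, reduceTran);  // ...and the shuffle feeds the reduce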

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTran.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTran.java?rev=1631140&r1=1631139&r2=1631140&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTran.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTran.java Sun Oct 12 01:03:19 2014
@@ -21,7 +21,7 @@ package org.apache.hadoop.hive.ql.exec.s
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.spark.api.java.JavaPairRDD;
 
-public interface SparkTran<KI extends BytesWritable, KO extends BytesWritable> {
-  JavaPairRDD<KO, BytesWritable> transform(
-      JavaPairRDD<KI, BytesWritable> input);
+public interface SparkTran<KI extends BytesWritable, VI, KO extends BytesWritable, VO> {
+  JavaPairRDD<KO, VO> transform(
+      JavaPairRDD<KI, VI> input);
 }
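
Putting the pieces together: each stage's output pair type is the next stage's input pair type, so a map-shuffle-reduce chain type-checks end to end. A self-contained sketch, assuming the classes from this commit are on the classpath (class, method, and variable names here are illustrative only):

    package org.apache.hadoop.hive.ql.exec.spark;

    import org.apache.hadoop.hive.ql.io.HiveKey;
    import org.apache.hadoop.io.BytesWritable;
    import org.apache.spark.api.java.JavaPairRDD;

    public class TranChainSketch {
      static JavaPairRDD<HiveKey, BytesWritable> run(
          JavaPairRDD<BytesWritable, BytesWritable> source,
          MapTran mapTran, ShuffleTran shuffleTran, ReduceTran reduceTran) {
        // MapTran:     <BytesWritable, BytesWritable> -> <HiveKey, BytesWritable>
        JavaPairRDD<HiveKey, BytesWritable> mapped = mapTran.transform(source);
        // ShuffleTran: <HiveKey, BytesWritable> -> <HiveKey, Iterable<BytesWritable>>
        JavaPairRDD<HiveKey, Iterable<BytesWritable>> shuffled = shuffleTran.transform(mapped);
        // ReduceTran:  <HiveKey, Iterable<BytesWritable>> -> <HiveKey, BytesWritable>
        return reduceTran.transform(shuffled);
      }
    }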