You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by xu...@apache.org on 2014/10/12 03:03:19 UTC
svn commit: r1631140 - in
/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark:
IdentityTran.java MapInput.java MapTran.java ReduceTran.java
ShuffleTran.java SparkPlanGenerator.java SparkTran.java
Author: xuefu
Date: Sun Oct 12 01:03:19 2014
New Revision: 1631140
URL: http://svn.apache.org/r1631140
Log:
HIVE-8276: Separate shuffle from ReduceTran and so create ShuffleTran [Spark Branch] (Chao via Xuefu)
Added:
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ShuffleTran.java
Modified:
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/IdentityTran.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapInput.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapTran.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ReduceTran.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTran.java
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/IdentityTran.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/IdentityTran.java?rev=1631140&r1=1631139&r2=1631140&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/IdentityTran.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/IdentityTran.java Sun Oct 12 01:03:19 2014
@@ -18,15 +18,13 @@
package org.apache.hadoop.hive.ql.exec.spark;
-import org.apache.hadoop.hive.ql.io.HiveKey;
import org.apache.hadoop.io.BytesWritable;
import org.apache.spark.api.java.JavaPairRDD;
-public class IdentityTran implements SparkTran<HiveKey, HiveKey> {
+public class IdentityTran<K extends BytesWritable, V> implements SparkTran<K, V, K, V> {
@Override
- public JavaPairRDD<HiveKey, BytesWritable> transform(
- JavaPairRDD<HiveKey, BytesWritable> input) {
+ public JavaPairRDD<K, V> transform(JavaPairRDD<K, V> input) {
return input;
}
}
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapInput.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapInput.java?rev=1631140&r1=1631139&r2=1631140&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapInput.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapInput.java Sun Oct 12 01:03:19 2014
@@ -24,7 +24,7 @@ import org.apache.spark.api.java.JavaPai
import com.google.common.base.Preconditions;
-public class MapInput implements SparkTran<BytesWritable, HiveKey> {
+public class MapInput implements SparkTran<BytesWritable, BytesWritable, HiveKey, BytesWritable> {
private JavaPairRDD<HiveKey, BytesWritable> hadoopRDD;
private boolean toCache;
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapTran.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapTran.java?rev=1631140&r1=1631139&r2=1631140&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapTran.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapTran.java Sun Oct 12 01:03:19 2014
@@ -22,7 +22,7 @@ import org.apache.hadoop.hive.ql.io.Hive
import org.apache.hadoop.io.BytesWritable;
import org.apache.spark.api.java.JavaPairRDD;
-public class MapTran implements SparkTran<BytesWritable,HiveKey> {
+public class MapTran implements SparkTran<BytesWritable, BytesWritable, HiveKey, BytesWritable> {
private HiveMapFunction mapFunc;
@Override
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ReduceTran.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ReduceTran.java?rev=1631140&r1=1631139&r2=1631140&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ReduceTran.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ReduceTran.java Sun Oct 12 01:03:19 2014
@@ -22,27 +22,16 @@ import org.apache.hadoop.hive.ql.io.Hive
import org.apache.hadoop.io.BytesWritable;
import org.apache.spark.api.java.JavaPairRDD;
-public class ReduceTran implements SparkTran<HiveKey, HiveKey> {
- private SparkShuffler shuffler;
+public class ReduceTran implements SparkTran<HiveKey, Iterable<BytesWritable>, HiveKey, BytesWritable> {
private HiveReduceFunction reduceFunc;
- private int numPartitions;
@Override
public JavaPairRDD<HiveKey, BytesWritable> transform(
- JavaPairRDD<HiveKey, BytesWritable> input) {
- return shuffler.shuffle(input, numPartitions).mapPartitionsToPair(reduceFunc);
+ JavaPairRDD<HiveKey, Iterable<BytesWritable>> input) {
+ return input.mapPartitionsToPair(reduceFunc);
}
public void setReduceFunction(HiveReduceFunction redFunc) {
this.reduceFunc = redFunc;
}
-
- public void setShuffler(SparkShuffler shuffler) {
- this.shuffler = shuffler;
- }
-
- public void setNumPartitions(int numPartitions) {
- this.numPartitions = numPartitions;
- }
-
}
Added: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ShuffleTran.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ShuffleTran.java?rev=1631140&view=auto
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ShuffleTran.java (added)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ShuffleTran.java Sun Oct 12 01:03:19 2014
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.spark;
+
+import org.apache.hadoop.hive.ql.io.HiveKey;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.spark.api.java.JavaPairRDD;
+
+public class ShuffleTran implements SparkTran<HiveKey, BytesWritable, HiveKey, Iterable<BytesWritable>> {
+ private final SparkShuffler shuffler;
+ private final int numOfPartitions;
+ private final boolean toCache;
+
+ public ShuffleTran(SparkShuffler sf, int n) {
+ this(sf, n, false);
+ }
+
+ public ShuffleTran(SparkShuffler sf, int n, boolean c) {
+ shuffler = sf;
+ numOfPartitions = n;
+ toCache = c;
+ }
+
+ @Override
+ public JavaPairRDD<HiveKey, Iterable<BytesWritable>> transform(JavaPairRDD<HiveKey, BytesWritable> input) {
+ JavaPairRDD<HiveKey, Iterable<BytesWritable>> result = shuffler.shuffle(input, numOfPartitions);
+ return toCache ? result.cache() : result;
+ }
+}
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java?rev=1631140&r1=1631139&r2=1631140&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java Sun Oct 12 01:03:19 2014
@@ -82,12 +82,23 @@ public class SparkPlanGenerator {
if (work instanceof MapWork) {
MapInput mapInput = generateMapInput((MapWork)work);
sparkPlan.addTran(mapInput);
- tran = generate(work, null);
+ tran = generate((MapWork)work);
sparkPlan.addTran(tran);
sparkPlan.connect(mapInput, tran);
+ } else if (work instanceof ReduceWork) {
+ List<BaseWork> parentWorks = sparkWork.getParents(work);
+ tran = generate((ReduceWork)work);
+ sparkPlan.addTran(tran);
+ ShuffleTran shuffleTran = generate(sparkWork.getEdgeProperty(parentWorks.get(0), work));
+ sparkPlan.addTran(shuffleTran);
+ sparkPlan.connect(shuffleTran, tran);
+ for (BaseWork parentWork : parentWorks) {
+ SparkTran parentTran = workToTranMap.get(parentWork);
+ sparkPlan.connect(parentTran, shuffleTran);
+ }
} else {
List<BaseWork> parentWorks = sparkWork.getParents(work);
- tran = generate(work, sparkWork.getEdgeProperty(parentWorks.get(0), work));
+ tran = new IdentityTran();
sparkPlan.addTran(tran);
for (BaseWork parentWork : parentWorks) {
SparkTran parentTran = workToTranMap.get(parentWork);
@@ -129,24 +140,6 @@ public class SparkPlanGenerator {
return inputFormatClass;
}
- public SparkTran generate(BaseWork work, SparkEdgeProperty edge) throws Exception {
- if (work instanceof MapWork) {
- MapWork mw = (MapWork) work;
- return generate(mw);
- } else if (work instanceof ReduceWork) {
- ReduceWork rw = (ReduceWork) work;
- ReduceTran tran = generate(rw);
- SparkShuffler shuffler = generate(edge);
- tran.setShuffler(shuffler);
- tran.setNumPartitions(edge.getNumPartitions());
- return tran;
- } else if (work instanceof UnionWork) {
- return new IdentityTran();
- } else {
- throw new HiveException("Unexpected work: " + work.getName());
- }
- }
-
private MapInput generateMapInput(MapWork mapWork)
throws Exception {
JobConf jobConf = cloneJobConf(mapWork);
@@ -157,15 +150,18 @@ public class SparkPlanGenerator {
return new MapInput(hadoopRDD);
}
- private SparkShuffler generate(SparkEdgeProperty edge) {
+ private ShuffleTran generate(SparkEdgeProperty edge) {
Preconditions.checkArgument(!edge.isShuffleNone(),
"AssertionError: SHUFFLE_NONE should only be used for UnionWork.");
+ SparkShuffler shuffler;
if (edge.isMRShuffle()) {
- return new SortByShuffler(false);
+ shuffler = new SortByShuffler(false);
} else if (edge.isShuffleSort()) {
- return new SortByShuffler(true);
+ shuffler = new SortByShuffler(true);
+ } else {
+ shuffler = new GroupByShuffler();
}
- return new GroupByShuffler();
+ return new ShuffleTran(shuffler, edge.getNumPartitions());
}
private MapTran generate(MapWork mw) throws Exception {
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTran.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTran.java?rev=1631140&r1=1631139&r2=1631140&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTran.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTran.java Sun Oct 12 01:03:19 2014
@@ -21,7 +21,7 @@ package org.apache.hadoop.hive.ql.exec.s
import org.apache.hadoop.io.BytesWritable;
import org.apache.spark.api.java.JavaPairRDD;
-public interface SparkTran<KI extends BytesWritable, KO extends BytesWritable> {
- JavaPairRDD<KO, BytesWritable> transform(
- JavaPairRDD<KI, BytesWritable> input);
+public interface SparkTran<KI extends BytesWritable, VI, KO extends BytesWritable, VO> {
+ JavaPairRDD<KO, VO> transform(
+ JavaPairRDD<KI, VI> input);
}