You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by sz...@apache.org on 2014/11/15 06:30:58 UTC
svn commit: r1639836 - in
/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark:
MapInput.java ShuffleTran.java SparkPlanGenerator.java
Author: szehon
Date: Sat Nov 15 05:30:58 2014
New Revision: 1639836
URL: http://svn.apache.org/r1639836
Log:
HIVE-8844 : Choose a persistent policy for RDD caching [Spark Branch] (Jimmy Xiang via Szehon)
Modified:
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapInput.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ShuffleTran.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapInput.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapInput.java?rev=1639836&r1=1639835&r2=1639836&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapInput.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapInput.java Sat Nov 15 05:30:58 2014
@@ -23,27 +23,31 @@ import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableUtils;
import org.apache.spark.api.java.JavaPairRDD;
+
import com.google.common.base.Preconditions;
+
import org.apache.spark.api.java.function.PairFunction;
+import org.apache.spark.storage.StorageLevel;
+
import scala.Tuple2;
public class MapInput implements SparkTran<WritableComparable, Writable,
WritableComparable, Writable> {
private JavaPairRDD<WritableComparable, Writable> hadoopRDD;
- private boolean toCache;
+ private StorageLevel storageLevel;
public MapInput(JavaPairRDD<WritableComparable, Writable> hadoopRDD) {
- this(hadoopRDD, false);
+ this(hadoopRDD, null);
}
- public MapInput(JavaPairRDD<WritableComparable, Writable> hadoopRDD, boolean toCache) {
+ public MapInput(JavaPairRDD<WritableComparable, Writable> hadoopRDD, StorageLevel level) {
this.hadoopRDD = hadoopRDD;
- this.toCache = toCache;
+ setStorageLevel(level);
}
- public void setToCache(boolean toCache) {
- this.toCache = toCache;
+ public void setStorageLevel(StorageLevel level) {
+ storageLevel = level;
}
@Override
@@ -51,7 +55,8 @@ public class MapInput implements SparkTr
JavaPairRDD<WritableComparable, Writable> input) {
Preconditions.checkArgument(input == null,
"AssertionError: MapInput doesn't take any input");
- return toCache ? hadoopRDD.mapToPair(new CopyFunction()).cache() : hadoopRDD;
+ return storageLevel == null || storageLevel.equals(StorageLevel.NONE()) ? hadoopRDD :
+ hadoopRDD.mapToPair(new CopyFunction()).persist(storageLevel);
}
private static class CopyFunction implements PairFunction<Tuple2<WritableComparable, Writable>,
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ShuffleTran.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ShuffleTran.java?rev=1639836&r1=1639835&r2=1639836&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ShuffleTran.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ShuffleTran.java Sat Nov 15 05:30:58 2014
@@ -21,25 +21,26 @@ package org.apache.hadoop.hive.ql.exec.s
import org.apache.hadoop.hive.ql.io.HiveKey;
import org.apache.hadoop.io.BytesWritable;
import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.storage.StorageLevel;
public class ShuffleTran implements SparkTran<HiveKey, BytesWritable, HiveKey, Iterable<BytesWritable>> {
private final SparkShuffler shuffler;
private final int numOfPartitions;
- private final boolean toCache;
+ private final StorageLevel storageLevel;
public ShuffleTran(SparkShuffler sf, int n) {
- this(sf, n, false);
+ this(sf, n, null);
}
- public ShuffleTran(SparkShuffler sf, int n, boolean c) {
+ public ShuffleTran(SparkShuffler sf, int n, StorageLevel level) {
shuffler = sf;
numOfPartitions = n;
- toCache = c;
+ storageLevel = level;
}
@Override
public JavaPairRDD<HiveKey, Iterable<BytesWritable>> transform(JavaPairRDD<HiveKey, BytesWritable> input) {
JavaPairRDD<HiveKey, Iterable<BytesWritable>> result = shuffler.shuffle(input, numOfPartitions);
- return toCache ? result.cache() : result;
+ return storageLevel == null || storageLevel.equals(StorageLevel.NONE()) ? result : result.persist(storageLevel);
}
}
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java?rev=1639836&r1=1639835&r2=1639836&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java Sat Nov 15 05:30:58 2014
@@ -23,6 +23,7 @@ import java.util.List;
import java.util.Map;
import com.google.common.base.Preconditions;
+
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -53,6 +54,7 @@ import org.apache.hadoop.io.WritableComp
import org.apache.hadoop.mapred.JobConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.storage.StorageLevel;
public class SparkPlanGenerator {
private static final Log LOG = LogFactory.getLog(SparkPlanGenerator.class);
@@ -131,7 +133,8 @@ public class SparkPlanGenerator {
sparkPlan.addTran(result);
} else if (work instanceof ReduceWork) {
List<BaseWork> parentWorks = sparkWork.getParents(work);
- result = generate(sparkWork.getEdgeProperty(parentWorks.get(0), work), cloneToWork.containsKey(work));
+ StorageLevel level = cloneToWork.containsKey(work) ? getStorageLevel(jobConf) : null;
+ result = generate(sparkWork.getEdgeProperty(parentWorks.get(0), work), level);
sparkPlan.addTran(result);
for (BaseWork parentWork : parentWorks) {
sparkPlan.connect(workToTranMap.get(parentWork), result);
@@ -148,6 +151,20 @@ public class SparkPlanGenerator {
return result;
}
+ private StorageLevel getStorageLevel(JobConf jobConf) {
+ String storageLevel = jobConf.get("spark.storage.level");
+ if (storageLevel == null || storageLevel.isEmpty()) {
+ return StorageLevel.MEMORY_AND_DISK();
+ }
+ try {
+ return StorageLevel.fromString(storageLevel);
+ } catch (IllegalArgumentException iae) {
+ LOG.error("Invalid configuration for 'spark.storage.level': "
+ + storageLevel, iae);
+ throw iae;
+ }
+ }
+
private Class getInputFormat(JobConf jobConf, MapWork mWork) throws HiveException {
// MergeFileWork is sub-class of MapWork, we don't need to distinguish here
if (mWork.getInputformat() != null) {
@@ -184,11 +201,12 @@ public class SparkPlanGenerator {
JavaPairRDD<WritableComparable, Writable> hadoopRDD = sc.hadoopRDD(jobConf, ifClass,
WritableComparable.class, Writable.class);
- MapInput result = new MapInput(hadoopRDD, cloneToWork.containsKey(mapWork));
+ StorageLevel level = cloneToWork.containsKey(mapWork) ? getStorageLevel(jobConf) : null;
+ MapInput result = new MapInput(hadoopRDD, level);
return result;
}
- private ShuffleTran generate(SparkEdgeProperty edge, boolean needCache) {
+ private ShuffleTran generate(SparkEdgeProperty edge, StorageLevel level) {
Preconditions.checkArgument(!edge.isShuffleNone(),
"AssertionError: SHUFFLE_NONE should only be used for UnionWork.");
SparkShuffler shuffler;
@@ -199,7 +217,7 @@ public class SparkPlanGenerator {
} else {
shuffler = new GroupByShuffler();
}
- return new ShuffleTran(shuffler, edge.getNumPartitions(), needCache);
+ return new ShuffleTran(shuffler, edge.getNumPartitions(), level);
}
private MapTran generate(MapWork mw) throws Exception {