You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by sz...@apache.org on 2014/11/17 20:57:13 UTC
svn commit: r1640218 - in /hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark: MapInput.java ShuffleTran.java SparkPlanGenerator.java
Author: szehon
Date: Mon Nov 17 19:57:12 2014
New Revision: 1640218
URL: http://svn.apache.org/r1640218
Log:
HIVE-8892 : Use MEMORY_AND_DISK for RDD caching [Spark Branch] (Jimmy Xiang via Szehon)
Modified:
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapInput.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ShuffleTran.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapInput.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapInput.java?rev=1640218&r1=1640217&r2=1640218&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapInput.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapInput.java Mon Nov 17 19:57:12 2014
@@ -35,19 +35,19 @@ import scala.Tuple2;
public class MapInput implements SparkTran<WritableComparable, Writable,
WritableComparable, Writable> {
private JavaPairRDD<WritableComparable, Writable> hadoopRDD;
- private StorageLevel storageLevel;
+ private boolean toCache;
public MapInput(JavaPairRDD<WritableComparable, Writable> hadoopRDD) {
- this(hadoopRDD, null);
+ this(hadoopRDD, false);
}
- public MapInput(JavaPairRDD<WritableComparable, Writable> hadoopRDD, StorageLevel level) {
+ public MapInput(JavaPairRDD<WritableComparable, Writable> hadoopRDD, boolean toCache) {
this.hadoopRDD = hadoopRDD;
- setStorageLevel(level);
+ this.toCache = toCache;
}
- public void setStorageLevel(StorageLevel level) {
- storageLevel = level;
+ public void setToCache(boolean toCache) {
+ this.toCache = toCache;
}
@Override
@@ -55,8 +55,8 @@ public class MapInput implements SparkTr
JavaPairRDD<WritableComparable, Writable> input) {
Preconditions.checkArgument(input == null,
"AssertionError: MapInput doesn't take any input");
- return storageLevel == null || storageLevel.equals(StorageLevel.NONE()) ? hadoopRDD :
- hadoopRDD.mapToPair(new CopyFunction()).persist(storageLevel);
+ return toCache ? hadoopRDD.mapToPair(
+ new CopyFunction()).persist(StorageLevel.MEMORY_AND_DISK()) : hadoopRDD;
}
private static class CopyFunction implements PairFunction<Tuple2<WritableComparable, Writable>,
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ShuffleTran.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ShuffleTran.java?rev=1640218&r1=1640217&r2=1640218&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ShuffleTran.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ShuffleTran.java Mon Nov 17 19:57:12 2014
@@ -26,21 +26,21 @@ import org.apache.spark.storage.StorageL
public class ShuffleTran implements SparkTran<HiveKey, BytesWritable, HiveKey, Iterable<BytesWritable>> {
private final SparkShuffler shuffler;
private final int numOfPartitions;
- private final StorageLevel storageLevel;
+ private final boolean toCache;
public ShuffleTran(SparkShuffler sf, int n) {
- this(sf, n, null);
+ this(sf, n, false);
}
- public ShuffleTran(SparkShuffler sf, int n, StorageLevel level) {
+ public ShuffleTran(SparkShuffler sf, int n, boolean toCache) {
shuffler = sf;
numOfPartitions = n;
- storageLevel = level;
+ this.toCache = toCache;
}
@Override
public JavaPairRDD<HiveKey, Iterable<BytesWritable>> transform(JavaPairRDD<HiveKey, BytesWritable> input) {
JavaPairRDD<HiveKey, Iterable<BytesWritable>> result = shuffler.shuffle(input, numOfPartitions);
- return storageLevel == null || storageLevel.equals(StorageLevel.NONE()) ? result : result.persist(storageLevel);
+ return toCache ? result.persist(StorageLevel.MEMORY_AND_DISK()) : result;
}
}
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java?rev=1640218&r1=1640217&r2=1640218&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java Mon Nov 17 19:57:12 2014
@@ -23,7 +23,6 @@ import java.util.List;
import java.util.Map;
import com.google.common.base.Preconditions;
-
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -54,7 +53,6 @@ import org.apache.hadoop.io.WritableComp
import org.apache.hadoop.mapred.JobConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.storage.StorageLevel;
public class SparkPlanGenerator {
private static final Log LOG = LogFactory.getLog(SparkPlanGenerator.class);
@@ -133,8 +131,7 @@ public class SparkPlanGenerator {
sparkPlan.addTran(result);
} else if (work instanceof ReduceWork) {
List<BaseWork> parentWorks = sparkWork.getParents(work);
- StorageLevel level = cloneToWork.containsKey(work) ? getStorageLevel(jobConf) : null;
- result = generate(sparkWork.getEdgeProperty(parentWorks.get(0), work), level);
+ result = generate(sparkWork.getEdgeProperty(parentWorks.get(0), work), cloneToWork.containsKey(work));
sparkPlan.addTran(result);
for (BaseWork parentWork : parentWorks) {
sparkPlan.connect(workToTranMap.get(parentWork), result);
@@ -151,20 +148,6 @@ public class SparkPlanGenerator {
return result;
}
- private StorageLevel getStorageLevel(JobConf jobConf) {
- String storageLevel = jobConf.get("spark.storage.level");
- if (storageLevel == null || storageLevel.isEmpty()) {
- return StorageLevel.MEMORY_AND_DISK();
- }
- try {
- return StorageLevel.fromString(storageLevel);
- } catch (IllegalArgumentException iae) {
- LOG.error("Invalid configuraiton for 'spark.storage.level': "
- + storageLevel, iae);
- throw iae;
- }
- }
-
private Class getInputFormat(JobConf jobConf, MapWork mWork) throws HiveException {
// MergeFileWork is sub-class of MapWork, we don't need to distinguish here
if (mWork.getInputformat() != null) {
@@ -201,12 +184,11 @@ public class SparkPlanGenerator {
JavaPairRDD<WritableComparable, Writable> hadoopRDD = sc.hadoopRDD(jobConf, ifClass,
WritableComparable.class, Writable.class);
- StorageLevel level = cloneToWork.containsKey(mapWork) ? getStorageLevel(jobConf) : null;
- MapInput result = new MapInput(hadoopRDD, level);
+ MapInput result = new MapInput(hadoopRDD, cloneToWork.containsKey(mapWork));
return result;
}
- private ShuffleTran generate(SparkEdgeProperty edge, StorageLevel level) {
+ private ShuffleTran generate(SparkEdgeProperty edge, boolean toCache) {
Preconditions.checkArgument(!edge.isShuffleNone(),
"AssertionError: SHUFFLE_NONE should only be used for UnionWork.");
SparkShuffler shuffler;
@@ -217,7 +199,7 @@ public class SparkPlanGenerator {
} else {
shuffler = new GroupByShuffler();
}
- return new ShuffleTran(shuffler, edge.getNumPartitions(), level);
+ return new ShuffleTran(shuffler, edge.getNumPartitions(), toCache);
}
private MapTran generate(MapWork mw) throws Exception {