Posted to commits@hive.apache.org by sz...@apache.org on 2014/11/17 20:57:13 UTC

svn commit: r1640218 - in /hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark: MapInput.java ShuffleTran.java SparkPlanGenerator.java

Author: szehon
Date: Mon Nov 17 19:57:12 2014
New Revision: 1640218

URL: http://svn.apache.org/r1640218
Log:
HIVE-8892 : Use MEMORY_AND_DISK for RDD caching [Spark Branch] (Jimmy Xiang via Szehon)

Modified:
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapInput.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ShuffleTran.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java

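In short, the patch drops the configurable StorageLevel plumbing and simply persists an RDD with MEMORY_AND_DISK whenever caching is requested. A minimal sketch of that pattern against the public Spark Java API (the helper class and method names here are illustrative, not part of the patch):

    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.storage.StorageLevel;

    final class CachingSketch {
      // Keep partitions in memory and spill the ones that do not fit to disk,
      // rather than honoring a user-supplied (possibly memory-only) level.
      static <K, V> JavaPairRDD<K, V> maybeCache(JavaPairRDD<K, V> rdd, boolean toCache) {
        return toCache ? rdd.persist(StorageLevel.MEMORY_AND_DISK()) : rdd;
      }
    }
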
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapInput.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapInput.java?rev=1640218&r1=1640217&r2=1640218&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapInput.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapInput.java Mon Nov 17 19:57:12 2014
@@ -35,19 +35,19 @@ import scala.Tuple2;
 public class MapInput implements SparkTran<WritableComparable, Writable,
     WritableComparable, Writable> {
   private JavaPairRDD<WritableComparable, Writable> hadoopRDD;
-  private StorageLevel storageLevel;
+  private boolean toCache;
 
   public MapInput(JavaPairRDD<WritableComparable, Writable> hadoopRDD) {
-    this(hadoopRDD, null);
+    this(hadoopRDD, false);
   }
 
-  public MapInput(JavaPairRDD<WritableComparable, Writable> hadoopRDD, StorageLevel level) {
+  public MapInput(JavaPairRDD<WritableComparable, Writable> hadoopRDD, boolean toCache) {
     this.hadoopRDD = hadoopRDD;
-    setStorageLevel(level);
+    this.toCache = toCache;
   }
 
-  public void setStorageLevel(StorageLevel level) {
-    storageLevel = level;
+  public void setToCache(boolean toCache) {
+    this.toCache = toCache;
   }
 
   @Override
@@ -55,8 +55,8 @@ public class MapInput implements SparkTr
       JavaPairRDD<WritableComparable, Writable> input) {
     Preconditions.checkArgument(input == null,
         "AssertionError: MapInput doesn't take any input");
-    return storageLevel == null || storageLevel.equals(StorageLevel.NONE()) ? hadoopRDD :
-      hadoopRDD.mapToPair(new CopyFunction()).persist(storageLevel);
+    return toCache ? hadoopRDD.mapToPair(
+      new CopyFunction()).persist(StorageLevel.MEMORY_AND_DISK()) : hadoopRDD;
   }
 
   private static class CopyFunction implements PairFunction<Tuple2<WritableComparable, Writable>,

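Note that in MapInput the RDD is only persisted after a mapToPair(new CopyFunction()) step. That copy matters because Spark's hadoopRDD reuses the same Writable instances for every record, so caching the raw records would leave the cached partition full of references to the last row read. A hypothetical copy function along those lines (an illustration, not Hive's actual CopyFunction):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.Writable;
    import org.apache.hadoop.io.WritableComparable;
    import org.apache.hadoop.io.WritableUtils;
    import org.apache.spark.api.java.function.PairFunction;
    import scala.Tuple2;

    class CopySketch implements PairFunction<Tuple2<WritableComparable, Writable>,
        WritableComparable, Writable> {
      private transient Configuration conf;  // not serializable, rebuilt per task

      @Override
      public Tuple2<WritableComparable, Writable> call(
          Tuple2<WritableComparable, Writable> row) throws Exception {
        if (conf == null) {
          conf = new Configuration();
        }
        // Deep-copy key and value so the cached RDD does not alias reused objects.
        return new Tuple2<WritableComparable, Writable>(
            WritableUtils.clone(row._1(), conf),
            WritableUtils.clone(row._2(), conf));
      }
    }
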
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ShuffleTran.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ShuffleTran.java?rev=1640218&r1=1640217&r2=1640218&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ShuffleTran.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ShuffleTran.java Mon Nov 17 19:57:12 2014
@@ -26,21 +26,21 @@ import org.apache.spark.storage.StorageL
 public class ShuffleTran implements SparkTran<HiveKey, BytesWritable, HiveKey, Iterable<BytesWritable>> {
   private final SparkShuffler shuffler;
   private final int numOfPartitions;
-  private final StorageLevel storageLevel;
+  private final boolean toCache;
 
   public ShuffleTran(SparkShuffler sf, int n) {
-    this(sf, n, null);
+    this(sf, n, false);
   }
 
-  public ShuffleTran(SparkShuffler sf, int n, StorageLevel level) {
+  public ShuffleTran(SparkShuffler sf, int n, boolean toCache) {
     shuffler = sf;
     numOfPartitions = n;
-    storageLevel = level;
+    this.toCache = toCache;
   }
 
   @Override
   public JavaPairRDD<HiveKey, Iterable<BytesWritable>> transform(JavaPairRDD<HiveKey, BytesWritable> input) {
     JavaPairRDD<HiveKey, Iterable<BytesWritable>> result = shuffler.shuffle(input, numOfPartitions);
-    return storageLevel == null || storageLevel.equals(StorageLevel.NONE()) ? result : result.persist(storageLevel);
+    return toCache ? result.persist(StorageLevel.MEMORY_AND_DISK()) : result;
   }
 }

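Given the constructors shown above, a call site that wants the shuffle output cached now passes a boolean instead of a StorageLevel. Assuming code in the same package as ShuffleTran and GroupByShuffler, usage would look roughly like:

    // Cache the shuffled RDD with MEMORY_AND_DISK.
    ShuffleTran cached = new ShuffleTran(new GroupByShuffler(), 10, true);
    // Leave the shuffled RDD uncached (equivalent to the old no-StorageLevel case).
    ShuffleTran uncached = new ShuffleTran(new GroupByShuffler(), 10);
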
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java?rev=1640218&r1=1640217&r2=1640218&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java Mon Nov 17 19:57:12 2014
@@ -23,7 +23,6 @@ import java.util.List;
 import java.util.Map;
 
 import com.google.common.base.Preconditions;
-
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -54,7 +53,6 @@ import org.apache.hadoop.io.WritableComp
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.storage.StorageLevel;
 
 public class SparkPlanGenerator {
   private static final Log LOG = LogFactory.getLog(SparkPlanGenerator.class);
@@ -133,8 +131,7 @@ public class SparkPlanGenerator {
       sparkPlan.addTran(result);
     } else if (work instanceof ReduceWork) {
       List<BaseWork> parentWorks = sparkWork.getParents(work);
-      StorageLevel level = cloneToWork.containsKey(work) ? getStorageLevel(jobConf) : null;
-      result = generate(sparkWork.getEdgeProperty(parentWorks.get(0), work), level);
+      result = generate(sparkWork.getEdgeProperty(parentWorks.get(0), work), cloneToWork.containsKey(work));
       sparkPlan.addTran(result);
       for (BaseWork parentWork : parentWorks) {
         sparkPlan.connect(workToTranMap.get(parentWork), result);
@@ -151,20 +148,6 @@ public class SparkPlanGenerator {
     return result;
   }
 
-  private StorageLevel getStorageLevel(JobConf jobConf) {
-    String storageLevel = jobConf.get("spark.storage.level");
-    if (storageLevel == null || storageLevel.isEmpty()) {
-      return StorageLevel.MEMORY_AND_DISK();
-    }
-    try {
-      return StorageLevel.fromString(storageLevel);
-    } catch (IllegalArgumentException iae) {
-      LOG.error("Invalid configuraiton for 'spark.storage.level': "
-        +  storageLevel, iae);
-      throw iae;
-    }
-  }
-
   private Class getInputFormat(JobConf jobConf, MapWork mWork) throws HiveException {
     // MergeFileWork is sub-class of MapWork, we don't need to distinguish here
     if (mWork.getInputformat() != null) {
@@ -201,12 +184,11 @@ public class SparkPlanGenerator {
 
     JavaPairRDD<WritableComparable, Writable> hadoopRDD = sc.hadoopRDD(jobConf, ifClass,
         WritableComparable.class, Writable.class);
-    StorageLevel level = cloneToWork.containsKey(mapWork) ? getStorageLevel(jobConf) : null;
-    MapInput result = new MapInput(hadoopRDD, level);
+    MapInput result = new MapInput(hadoopRDD, cloneToWork.containsKey(mapWork));
     return result;
   }
 
-  private ShuffleTran generate(SparkEdgeProperty edge, StorageLevel level) {
+  private ShuffleTran generate(SparkEdgeProperty edge, boolean toCache) {
     Preconditions.checkArgument(!edge.isShuffleNone(),
         "AssertionError: SHUFFLE_NONE should only be used for UnionWork.");
     SparkShuffler shuffler;
@@ -217,7 +199,7 @@ public class SparkPlanGenerator {
     } else {
       shuffler = new GroupByShuffler();
     }
-    return new ShuffleTran(shuffler, edge.getNumPartitions(), level);
+    return new ShuffleTran(shuffler, edge.getNumPartitions(), toCache);
   }
 
   private MapTran generate(MapWork mw) throws Exception {
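As a quick, purely illustrative way to observe the effect of this change, the storage level Spark reports for the transformed RDD can be inspected after the plan runs:

    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.storage.StorageLevel;

    final class StorageCheckSketch {
      // With caching enabled the RDD should report a memory-and-disk level;
      // with caching disabled it stays at StorageLevel.NONE.
      static void report(JavaPairRDD<?, ?> rdd) {
        StorageLevel level = rdd.getStorageLevel();
        System.out.println("useMemory=" + level.useMemory()
            + ", useDisk=" + level.useDisk());
      }
    }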