Posted to commits@hive.apache.org by sz...@apache.org on 2014/11/15 06:30:58 UTC

svn commit: r1639836 - in /hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark: MapInput.java ShuffleTran.java SparkPlanGenerator.java

Author: szehon
Date: Sat Nov 15 05:30:58 2014
New Revision: 1639836

URL: http://svn.apache.org/r1639836
Log:
HIVE-8844 : Choose a persistent policy for RDD caching [Spark Branch] (Jimmy Xiang via Szehon)

Modified:
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapInput.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ShuffleTran.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapInput.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapInput.java?rev=1639836&r1=1639835&r2=1639836&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapInput.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/MapInput.java Sat Nov 15 05:30:58 2014
@@ -23,27 +23,31 @@ import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.spark.api.java.JavaPairRDD;
+
 import com.google.common.base.Preconditions;
+
 import org.apache.spark.api.java.function.PairFunction;
+import org.apache.spark.storage.StorageLevel;
+
 import scala.Tuple2;
 
 
 public class MapInput implements SparkTran<WritableComparable, Writable,
     WritableComparable, Writable> {
   private JavaPairRDD<WritableComparable, Writable> hadoopRDD;
-  private boolean toCache;
+  private StorageLevel storageLevel;
 
   public MapInput(JavaPairRDD<WritableComparable, Writable> hadoopRDD) {
-    this(hadoopRDD, false);
+    this(hadoopRDD, null);
   }
 
-  public MapInput(JavaPairRDD<WritableComparable, Writable> hadoopRDD, boolean toCache) {
+  public MapInput(JavaPairRDD<WritableComparable, Writable> hadoopRDD, StorageLevel level) {
     this.hadoopRDD = hadoopRDD;
-    this.toCache = toCache;
+    setStorageLevel(level);
   }
 
-  public void setToCache(boolean toCache) {
-    this.toCache = toCache;
+  public void setStorageLevel(StorageLevel level) {
+    storageLevel = level;
   }
 
   @Override
@@ -51,7 +55,8 @@ public class MapInput implements SparkTr
       JavaPairRDD<WritableComparable, Writable> input) {
     Preconditions.checkArgument(input == null,
         "AssertionError: MapInput doesn't take any input");
-    return toCache ? hadoopRDD.mapToPair(new CopyFunction()).cache() : hadoopRDD;
+    return storageLevel == null || storageLevel.equals(StorageLevel.NONE()) ? hadoopRDD :
+      hadoopRDD.mapToPair(new CopyFunction()).persist(storageLevel);
   }
 
   private static class CopyFunction implements PairFunction<Tuple2<WritableComparable, Writable>,
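
For reviewers who want to see the new policy in isolation: the persist-or-passthrough decision above reduces to the sketch below. This is a hedged, stand-alone illustration, not code from the patch; the PersistPolicySketch class and maybePersist helper are made up for the example, while StorageLevel.NONE() and persist() are the Spark calls the patch actually uses.

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.storage.StorageLevel;

public class PersistPolicySketch {
  // Mirrors MapInput.transform(): persist only when a concrete level is configured.
  static <K, V> JavaPairRDD<K, V> maybePersist(JavaPairRDD<K, V> rdd, StorageLevel level) {
    if (level == null || level.equals(StorageLevel.NONE())) {
      return rdd;               // no caching requested; pass the RDD through
    }
    return rdd.persist(level);  // e.g. MEMORY_AND_DISK, MEMORY_ONLY_SER, DISK_ONLY
  }
}

Note that MapInput itself additionally maps through CopyFunction before persisting; Hadoop input formats reuse Writable instances, so rows have to be copied before they can be cached safely.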

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ShuffleTran.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ShuffleTran.java?rev=1639836&r1=1639835&r2=1639836&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ShuffleTran.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/ShuffleTran.java Sat Nov 15 05:30:58 2014
@@ -21,25 +21,26 @@ package org.apache.hadoop.hive.ql.exec.s
 import org.apache.hadoop.hive.ql.io.HiveKey;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.storage.StorageLevel;
 
 public class ShuffleTran implements SparkTran<HiveKey, BytesWritable, HiveKey, Iterable<BytesWritable>> {
   private final SparkShuffler shuffler;
   private final int numOfPartitions;
-  private final boolean toCache;
+  private final StorageLevel storageLevel;
 
   public ShuffleTran(SparkShuffler sf, int n) {
-    this(sf, n, false);
+    this(sf, n, null);
   }
 
-  public ShuffleTran(SparkShuffler sf, int n, boolean c) {
+  public ShuffleTran(SparkShuffler sf, int n, StorageLevel level) {
     shuffler = sf;
     numOfPartitions = n;
-    toCache = c;
+    storageLevel = level;
   }
 
   @Override
   public JavaPairRDD<HiveKey, Iterable<BytesWritable>> transform(JavaPairRDD<HiveKey, BytesWritable> input) {
     JavaPairRDD<HiveKey, Iterable<BytesWritable>> result = shuffler.shuffle(input, numOfPartitions);
-    return toCache ? result.cache() : result;
+    return storageLevel == null || storageLevel.equals(StorageLevel.NONE()) ? result : result.persist(storageLevel);
   }
 }
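
ShuffleTran follows the same null/NONE convention after a shuffle. A hedged usage sketch, assuming the org.apache.hadoop.hive.ql.exec.spark classes from this patch are on the classpath; the partition count of 10 is an arbitrary example value:

import org.apache.spark.storage.StorageLevel;

SparkShuffler shuffler = new GroupByShuffler();
// Persist the shuffle output, e.g. when cloneToWork routes it to several children:
ShuffleTran cached = new ShuffleTran(shuffler, 10, StorageLevel.MEMORY_AND_DISK());
// A null level (or StorageLevel.NONE()) leaves the shuffled RDD un-persisted:
ShuffleTran plain = new ShuffleTran(shuffler, 10, null);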

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java?rev=1639836&r1=1639835&r2=1639836&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java Sat Nov 15 05:30:58 2014
@@ -23,6 +23,7 @@ import java.util.List;
 import java.util.Map;
 
 import com.google.common.base.Preconditions;
+
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -53,6 +54,7 @@ import org.apache.hadoop.io.WritableComp
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.storage.StorageLevel;
 
 public class SparkPlanGenerator {
   private static final Log LOG = LogFactory.getLog(SparkPlanGenerator.class);
@@ -131,7 +133,8 @@ public class SparkPlanGenerator {
       sparkPlan.addTran(result);
     } else if (work instanceof ReduceWork) {
       List<BaseWork> parentWorks = sparkWork.getParents(work);
-      result = generate(sparkWork.getEdgeProperty(parentWorks.get(0), work), cloneToWork.containsKey(work));
+      StorageLevel level = cloneToWork.containsKey(work) ? getStorageLevel(jobConf) : null;
+      result = generate(sparkWork.getEdgeProperty(parentWorks.get(0), work), level);
       sparkPlan.addTran(result);
       for (BaseWork parentWork : parentWorks) {
         sparkPlan.connect(workToTranMap.get(parentWork), result);
@@ -148,6 +151,20 @@ public class SparkPlanGenerator {
     return result;
   }
 
+  private StorageLevel getStorageLevel(JobConf jobConf) {
+    String storageLevel = jobConf.get("spark.storage.level");
+    if (storageLevel == null || storageLevel.isEmpty()) {
+      return StorageLevel.MEMORY_AND_DISK();
+    }
+    try {
+      return StorageLevel.fromString(storageLevel);
+    } catch (IllegalArgumentException iae) {
+      LOG.error("Invalid configuraiton for 'spark.storage.level': "
+        +  storageLevel, iae);
+      throw iae;
+    }
+  }
+
   private Class getInputFormat(JobConf jobConf, MapWork mWork) throws HiveException {
     // MergeFileWork is sub-class of MapWork, we don't need to distinguish here
     if (mWork.getInputformat() != null) {
@@ -184,11 +201,12 @@ public class SparkPlanGenerator {
 
     JavaPairRDD<WritableComparable, Writable> hadoopRDD = sc.hadoopRDD(jobConf, ifClass,
         WritableComparable.class, Writable.class);
-    MapInput result = new MapInput(hadoopRDD, cloneToWork.containsKey(mapWork));
+    StorageLevel level = cloneToWork.containsKey(mapWork) ? getStorageLevel(jobConf) : null;
+    MapInput result = new MapInput(hadoopRDD, level);
     return result;
   }
 
-  private ShuffleTran generate(SparkEdgeProperty edge, boolean needCache) {
+  private ShuffleTran generate(SparkEdgeProperty edge, StorageLevel level) {
     Preconditions.checkArgument(!edge.isShuffleNone(),
         "AssertionError: SHUFFLE_NONE should only be used for UnionWork.");
     SparkShuffler shuffler;
@@ -199,7 +217,7 @@ public class SparkPlanGenerator {
     } else {
       shuffler = new GroupByShuffler();
     }
-    return new ShuffleTran(shuffler, edge.getNumPartitions(), needCache);
+    return new ShuffleTran(shuffler, edge.getNumPartitions(), level);
   }
 
   private MapTran generate(MapWork mw) throws Exception {
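
With this change the cache policy is configuration-driven rather than a hard-coded cache() call. A hedged sketch of selecting a level for a job: the property name "spark.storage.level" is exactly what getStorageLevel() reads above, while the JobConf setup is illustrative.

import org.apache.hadoop.mapred.JobConf;

JobConf jobConf = new JobConf();
// Any name accepted by StorageLevel.fromString(), e.g. MEMORY_ONLY,
// MEMORY_AND_DISK, MEMORY_ONLY_SER, DISK_ONLY.
jobConf.set("spark.storage.level", "MEMORY_ONLY_SER");
// Unset or empty: getStorageLevel() falls back to MEMORY_AND_DISK().
// Unrecognized name: StorageLevel.fromString() throws IllegalArgumentException,
// which getStorageLevel() logs and rethrows.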