Posted to commits@spark.apache.org by sr...@apache.org on 2016/02/11 10:33:15 UTC

spark git commit: [SPARK-13074][CORE] Add JavaSparkContext.getPersistentRDDs method

Repository: spark
Updated Branches:
  refs/heads/master c2f21d889 -> f9ae99fee


[SPARK-13074][CORE] Add JavaSparkContext.getPersistentRDDs method

The "getPersistentRDDs()" is a useful API of SparkContext to get cached RDDs. However, the JavaSparkContext does not have this API.

This change adds a simple getPersistentRDDs() that returns a java.util.Map<Integer, JavaRDD<?>> for Java users.
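
As a rough usage sketch (not part of this commit; the class name, app name, and sample data below are made up for illustration), a Java caller could exercise the new method like this:

    import java.util.Arrays;
    import java.util.Map;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;

    public class PersistentRddsExample {
      public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext("local", "PersistentRddsExample");
        JavaRDD<String> words = sc.parallelize(Arrays.asList("a", "b")).setName("words").cache();
        // The new API: every RDD currently marked as persistent, keyed by RDD id.
        Map<Integer, JavaRDD<?>> persistent = sc.getPersistentRDDs();
        for (Map.Entry<Integer, JavaRDD<?>> e : persistent.entrySet()) {
          System.out.println("RDD " + e.getKey() + ": " + e.getValue().name());
        }
        sc.stop();
      }
    }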

Author: Junyang <fl...@gmail.com>

Closes #10978 from flyjy/master.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f9ae99fe
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f9ae99fe
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f9ae99fe

Branch: refs/heads/master
Commit: f9ae99fee13681e436fde9899b6a189746348ba1
Parents: c2f21d8
Author: Junyang <fl...@gmail.com>
Authored: Thu Feb 11 09:33:11 2016 +0000
Committer: Sean Owen <so...@cloudera.com>
Committed: Thu Feb 11 09:33:11 2016 +0000

----------------------------------------------------------------------
 .../org/apache/spark/api/java/JavaSparkContext.scala    | 10 ++++++++++
 core/src/test/java/org/apache/spark/JavaAPISuite.java   | 12 ++++++++++++
 2 files changed, 22 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/f9ae99fe/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
index 01433ca..f1aebbc 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
@@ -774,6 +774,16 @@ class JavaSparkContext(val sc: SparkContext)
 
   /** Cancel all jobs that have been scheduled or are running. */
   def cancelAllJobs(): Unit = sc.cancelAllJobs()
+
+  /**
+   * Returns a Java map of JavaRDDs that have marked themselves as persistent via a cache() call.
+   * Note that this does not necessarily mean the caching or computation was successful.
+   */
+  def getPersistentRDDs: JMap[java.lang.Integer, JavaRDD[_]] = {
+    sc.getPersistentRDDs.mapValues(s => JavaRDD.fromRDD(s))
+      .asJava.asInstanceOf[JMap[java.lang.Integer, JavaRDD[_]]]
+  }
+
 }
 
 object JavaSparkContext {
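
A note on the implementation (my reading of the patch, not spelled out in the commit message): sc.getPersistentRDDs returns a Scala Map[Int, RDD[_]], so the patch wraps each value in a JavaRDD via mapValues, converts the result with asJava, and casts to bridge the primitive Int keys to boxed java.lang.Integer. One practical use of the returned map for Java callers is bulk cache management; a minimal sketch (the CacheUtil class and unpersistAll name are hypothetical):

    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;

    public final class CacheUtil {
      /** Hypothetical helper: evict every RDD currently cached in this context. */
      public static void unpersistAll(JavaSparkContext sc) {
        for (JavaRDD<?> rdd : sc.getPersistentRDDs().values()) {
          rdd.unpersist();  // remove the RDD's cached blocks
        }
      }
    }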

http://git-wip-us.apache.org/repos/asf/spark/blob/f9ae99fe/core/src/test/java/org/apache/spark/JavaAPISuite.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/spark/JavaAPISuite.java b/core/src/test/java/org/apache/spark/JavaAPISuite.java
index 8117ad9..e6a4ab7 100644
--- a/core/src/test/java/org/apache/spark/JavaAPISuite.java
+++ b/core/src/test/java/org/apache/spark/JavaAPISuite.java
@@ -1811,4 +1811,16 @@ public class JavaAPISuite implements Serializable {
         conf.get("spark.kryo.classesToRegister"));
   }
 
+  @Test
+  public void testGetPersistentRDDs() {
+    java.util.Map<Integer, JavaRDD<?>> cachedRddsMap = sc.getPersistentRDDs();
+    Assert.assertTrue(cachedRddsMap.isEmpty());
+    JavaRDD<String> rdd1 = sc.parallelize(Arrays.asList("a", "b")).setName("RDD1").cache();
+    JavaRDD<String> rdd2 = sc.parallelize(Arrays.asList("c", "d")).setName("RDD2").cache();
+    cachedRddsMap = sc.getPersistentRDDs();
+    Assert.assertEquals(2, cachedRddsMap.size());
+    Assert.assertEquals("RDD1", cachedRddsMap.get(0).name());
+    Assert.assertEquals("RDD2", cachedRddsMap.get(1).name());
+  }
+
 }
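
The assertions above look up keys 0 and 1 directly, which holds because each JavaAPISuite test starts a fresh SparkContext and RDD ids are assigned from a counter starting at zero. A variant that avoids hard-coding ids (a sketch, not part of this commit, reusing the suite's sc field and imports) could key off JavaRDD.id() instead:

    @Test
    public void testGetPersistentRDDsById() {
      JavaRDD<String> rdd = sc.parallelize(Arrays.asList("a", "b")).setName("byId").cache();
      java.util.Map<Integer, JavaRDD<?>> cached = sc.getPersistentRDDs();
      // id() is the RDD's unique id within its SparkContext, matching the map key.
      Assert.assertEquals("byId", cached.get(rdd.id()).name());
    }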

