You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by rx...@apache.org on 2016/01/07 02:17:35 UTC

spark git commit: [SPARK-12604][CORE] Java count(ApproxDistinct)ByKey methods return Scala Long not Java

Repository: spark
Updated Branches:
  refs/heads/master 917d3fc06 -> ac56cf605


[SPARK-12604][CORE] Java count(ApproxDistinct)ByKey methods return Scala Long not Java

Change the return types of Java countByKey and countApproxDistinctByKey to use Java Long, not Scala Long; update similar methods to use java.lang.Long.valueOf consistently, with no API change

Author: Sean Owen <so...@cloudera.com>

Closes #10554 from srowen/SPARK-12604.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ac56cf60
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ac56cf60
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ac56cf60

Branch: refs/heads/master
Commit: ac56cf605b61803c26e0004b43c703cca7e02d61
Parents: 917d3fc
Author: Sean Owen <so...@cloudera.com>
Authored: Wed Jan 6 17:17:32 2016 -0800
Committer: Reynold Xin <rx...@databricks.com>
Committed: Wed Jan 6 17:17:32 2016 -0800

----------------------------------------------------------------------
 .../org/apache/spark/api/java/JavaPairRDD.scala | 32 ++++++++++++--------
 .../org/apache/spark/api/java/JavaRDDLike.scala | 16 +++++-----
 .../java/org/apache/spark/JavaAPISuite.java     | 18 +++++------
 .../streaming/api/java/JavaDStreamLike.scala    | 18 +++++------
 .../streaming/api/java/JavaPairDStream.scala    |  7 +++--
 5 files changed, 49 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/ac56cf60/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
index 91dc186..76752e1 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
@@ -17,8 +17,9 @@
 
 package org.apache.spark.api.java
 
+import java.{lang => jl}
 import java.lang.{Iterable => JIterable}
-import java.util.{Comparator, List => JList, Map => JMap}
+import java.util.{Comparator, List => JList}
 
 import scala.collection.JavaConverters._
 import scala.language.implicitConversions
@@ -139,7 +140,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
    * math.ceil(numItems * samplingRate) over all key values.
    */
   def sampleByKey(withReplacement: Boolean,
-      fractions: JMap[K, Double],
+      fractions: java.util.Map[K, Double],
       seed: Long): JavaPairRDD[K, V] =
     new JavaPairRDD[K, V](rdd.sampleByKey(withReplacement, fractions.asScala, seed))
 
@@ -154,7 +155,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
    * Use Utils.random.nextLong as the default seed for the random number generator.
    */
   def sampleByKey(withReplacement: Boolean,
-      fractions: JMap[K, Double]): JavaPairRDD[K, V] =
+      fractions: java.util.Map[K, Double]): JavaPairRDD[K, V] =
     sampleByKey(withReplacement, fractions, Utils.random.nextLong)
 
   /**
@@ -168,7 +169,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
    * two additional passes.
    */
   def sampleByKeyExact(withReplacement: Boolean,
-      fractions: JMap[K, Double],
+      fractions: java.util.Map[K, Double],
       seed: Long): JavaPairRDD[K, V] =
     new JavaPairRDD[K, V](rdd.sampleByKeyExact(withReplacement, fractions.asScala, seed))
 
@@ -184,7 +185,9 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
    *
    * Use Utils.random.nextLong as the default seed for the random number generator.
    */
-  def sampleByKeyExact(withReplacement: Boolean, fractions: JMap[K, Double]): JavaPairRDD[K, V] =
+  def sampleByKeyExact(
+      withReplacement: Boolean,
+      fractions: java.util.Map[K, Double]): JavaPairRDD[K, V] =
     sampleByKeyExact(withReplacement, fractions, Utils.random.nextLong)
 
   /**
@@ -292,7 +295,8 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
     mapAsSerializableJavaMap(rdd.reduceByKeyLocally(func))
 
   /** Count the number of elements for each key, and return the result to the master as a Map. */
-  def countByKey(): java.util.Map[K, Long] = mapAsSerializableJavaMap(rdd.countByKey())
+  def countByKey(): java.util.Map[K, jl.Long] =
+    mapAsSerializableJavaMap(rdd.countByKey().mapValues(jl.Long.valueOf))
 
   /**
    * Approximate version of countByKey that can return a partial result if it does
@@ -934,9 +938,10 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
    *                   It must be greater than 0.000017.
    * @param partitioner partitioner of the resulting RDD.
    */
-  def countApproxDistinctByKey(relativeSD: Double, partitioner: Partitioner): JavaPairRDD[K, Long] =
-  {
-    fromRDD(rdd.countApproxDistinctByKey(relativeSD, partitioner))
+  def countApproxDistinctByKey(relativeSD: Double, partitioner: Partitioner)
+  : JavaPairRDD[K, jl.Long] = {
+    fromRDD(rdd.countApproxDistinctByKey(relativeSD, partitioner)).
+      asInstanceOf[JavaPairRDD[K, jl.Long]]
   }
 
   /**
@@ -950,8 +955,9 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
    *                   It must be greater than 0.000017.
    * @param numPartitions number of partitions of the resulting RDD.
    */
-  def countApproxDistinctByKey(relativeSD: Double, numPartitions: Int): JavaPairRDD[K, Long] = {
-    fromRDD(rdd.countApproxDistinctByKey(relativeSD, numPartitions))
+  def countApproxDistinctByKey(relativeSD: Double, numPartitions: Int): JavaPairRDD[K, jl.Long] = {
+    fromRDD(rdd.countApproxDistinctByKey(relativeSD, numPartitions)).
+      asInstanceOf[JavaPairRDD[K, jl.Long]]
   }
 
   /**
@@ -964,8 +970,8 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
    * @param relativeSD Relative accuracy. Smaller values create counters that require more space.
    *                   It must be greater than 0.000017.
    */
-  def countApproxDistinctByKey(relativeSD: Double): JavaPairRDD[K, Long] = {
-    fromRDD(rdd.countApproxDistinctByKey(relativeSD))
+  def countApproxDistinctByKey(relativeSD: Double): JavaPairRDD[K, jl.Long] = {
+    fromRDD(rdd.countApproxDistinctByKey(relativeSD)).asInstanceOf[JavaPairRDD[K, jl.Long]]
   }
 
   /** Assign a name to this RDD */

http://git-wip-us.apache.org/repos/asf/spark/blob/ac56cf60/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
index 6d3485d..1b1a9dc 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.api.java
 
 import java.{lang => jl}
-import java.lang.{Iterable => JIterable, Long => JLong}
+import java.lang.{Iterable => JIterable}
 import java.util.{Comparator, Iterator => JIterator, List => JList}
 
 import scala.collection.JavaConverters._
@@ -305,8 +305,8 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
    * 2*n+k, ..., where n is the number of partitions. So there may exist gaps, but this method
    * won't trigger a spark job, which is different from [[org.apache.spark.rdd.RDD#zipWithIndex]].
    */
-  def zipWithUniqueId(): JavaPairRDD[T, JLong] = {
-    JavaPairRDD.fromRDD(rdd.zipWithUniqueId()).asInstanceOf[JavaPairRDD[T, JLong]]
+  def zipWithUniqueId(): JavaPairRDD[T, jl.Long] = {
+    JavaPairRDD.fromRDD(rdd.zipWithUniqueId()).asInstanceOf[JavaPairRDD[T, jl.Long]]
   }
 
   /**
@@ -316,8 +316,8 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
    * This is similar to Scala's zipWithIndex but it uses Long instead of Int as the index type.
    * This method needs to trigger a spark job when this RDD contains more than one partitions.
    */
-  def zipWithIndex(): JavaPairRDD[T, JLong] = {
-    JavaPairRDD.fromRDD(rdd.zipWithIndex()).asInstanceOf[JavaPairRDD[T, JLong]]
+  def zipWithIndex(): JavaPairRDD[T, jl.Long] = {
+    JavaPairRDD.fromRDD(rdd.zipWithIndex()).asInstanceOf[JavaPairRDD[T, jl.Long]]
   }
 
   // Actions (launch a job to return a value to the user program)
@@ -448,7 +448,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
    * combine step happens locally on the master, equivalent to running a single reduce task.
    */
   def countByValue(): java.util.Map[T, jl.Long] =
-    mapAsSerializableJavaMap(rdd.countByValue().map((x => (x._1, new jl.Long(x._2)))))
+    mapAsSerializableJavaMap(rdd.countByValue().mapValues(jl.Long.valueOf))
 
   /**
    * (Experimental) Approximate version of countByValue().
@@ -631,8 +631,8 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
    * The asynchronous version of `count`, which returns a
    * future for counting the number of elements in this RDD.
    */
-  def countAsync(): JavaFutureAction[JLong] = {
-    new JavaFutureActionWrapper[Long, JLong](rdd.countAsync(), JLong.valueOf)
+  def countAsync(): JavaFutureAction[jl.Long] = {
+    new JavaFutureActionWrapper[Long, jl.Long](rdd.countAsync(), jl.Long.valueOf)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/ac56cf60/core/src/test/java/org/apache/spark/JavaAPISuite.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/spark/JavaAPISuite.java b/core/src/test/java/org/apache/spark/JavaAPISuite.java
index 502f86f..47382e4 100644
--- a/core/src/test/java/org/apache/spark/JavaAPISuite.java
+++ b/core/src/test/java/org/apache/spark/JavaAPISuite.java
@@ -1580,11 +1580,11 @@ public class JavaAPISuite implements Serializable {
     }
     double relativeSD = 0.001;
     JavaPairRDD<Integer, Integer> pairRdd = sc.parallelizePairs(arrayData);
-    List<Tuple2<Integer, Object>> res =  pairRdd.countApproxDistinctByKey(relativeSD, 8).collect();
-    for (Tuple2<Integer, Object> resItem : res) {
-      double count = (double)resItem._1();
-      Long resCount = (Long)resItem._2();
-      Double error = Math.abs((resCount - count) / count);
+    List<Tuple2<Integer, Long>> res =  pairRdd.countApproxDistinctByKey(relativeSD, 8).collect();
+    for (Tuple2<Integer, Long> resItem : res) {
+      double count = resItem._1();
+      long resCount = resItem._2();
+      double error = Math.abs((resCount - count) / count);
       Assert.assertTrue(error < 0.1);
     }
 
@@ -1633,12 +1633,12 @@ public class JavaAPISuite implements Serializable {
     fractions.put(0, 0.5);
     fractions.put(1, 1.0);
     JavaPairRDD<Integer, Integer> wr = rdd2.sampleByKey(true, fractions, 1L);
-    Map<Integer, Long> wrCounts = (Map<Integer, Long>) (Object) wr.countByKey();
+    Map<Integer, Long> wrCounts = wr.countByKey();
     Assert.assertEquals(2, wrCounts.size());
     Assert.assertTrue(wrCounts.get(0) > 0);
     Assert.assertTrue(wrCounts.get(1) > 0);
     JavaPairRDD<Integer, Integer> wor = rdd2.sampleByKey(false, fractions, 1L);
-    Map<Integer, Long> worCounts = (Map<Integer, Long>) (Object) wor.countByKey();
+    Map<Integer, Long> worCounts = wor.countByKey();
     Assert.assertEquals(2, worCounts.size());
     Assert.assertTrue(worCounts.get(0) > 0);
     Assert.assertTrue(worCounts.get(1) > 0);
@@ -1659,12 +1659,12 @@ public class JavaAPISuite implements Serializable {
     fractions.put(0, 0.5);
     fractions.put(1, 1.0);
     JavaPairRDD<Integer, Integer> wrExact = rdd2.sampleByKeyExact(true, fractions, 1L);
-    Map<Integer, Long> wrExactCounts = (Map<Integer, Long>) (Object) wrExact.countByKey();
+    Map<Integer, Long> wrExactCounts = wrExact.countByKey();
     Assert.assertEquals(2, wrExactCounts.size());
     Assert.assertTrue(wrExactCounts.get(0) == 2);
     Assert.assertTrue(wrExactCounts.get(1) == 4);
     JavaPairRDD<Integer, Integer> worExact = rdd2.sampleByKeyExact(false, fractions, 1L);
-    Map<Integer, Long> worExactCounts = (Map<Integer, Long>) (Object) worExact.countByKey();
+    Map<Integer, Long> worExactCounts = worExact.countByKey();
     Assert.assertEquals(2, worExactCounts.size());
     Assert.assertTrue(worExactCounts.get(0) == 2);
     Assert.assertTrue(worExactCounts.get(1) == 4);

http://git-wip-us.apache.org/repos/asf/spark/blob/ac56cf60/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
index 84acec7..733147f 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.streaming.api.java
 
-import java.lang.{Long => JLong}
+import java.{lang => jl}
 import java.util.{List => JList}
 
 import scala.collection.JavaConverters._
@@ -50,8 +50,8 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
 
   def wrapRDD(in: RDD[T]): R
 
-  implicit def scalaIntToJavaLong(in: DStream[Long]): JavaDStream[JLong] = {
-    in.map(new JLong(_))
+  implicit def scalaIntToJavaLong(in: DStream[Long]): JavaDStream[jl.Long] = {
+    in.map(jl.Long.valueOf)
   }
 
   /**
@@ -74,14 +74,14 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
    * Return a new DStream in which each RDD has a single element generated by counting each RDD
    * of this DStream.
    */
-  def count(): JavaDStream[JLong] = dstream.count()
+  def count(): JavaDStream[jl.Long] = dstream.count()
 
   /**
    * Return a new DStream in which each RDD contains the counts of each distinct value in
    * each RDD of this DStream.  Hash partitioning is used to generate the RDDs with
    * Spark's default number of partitions.
    */
-  def countByValue(): JavaPairDStream[T, JLong] = {
+  def countByValue(): JavaPairDStream[T, jl.Long] = {
     JavaPairDStream.scalaToJavaLong(dstream.countByValue())
   }
 
@@ -91,7 +91,7 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
    * partitions.
    * @param numPartitions  number of partitions of each RDD in the new DStream.
    */
-  def countByValue(numPartitions: Int): JavaPairDStream[T, JLong] = {
+  def countByValue(numPartitions: Int): JavaPairDStream[T, jl.Long] = {
     JavaPairDStream.scalaToJavaLong(dstream.countByValue(numPartitions))
   }
 
@@ -101,7 +101,7 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
    * of elements in a window over this DStream. windowDuration and slideDuration are as defined in
    * the window() operation. This is equivalent to window(windowDuration, slideDuration).count()
    */
-  def countByWindow(windowDuration: Duration, slideDuration: Duration) : JavaDStream[JLong] = {
+  def countByWindow(windowDuration: Duration, slideDuration: Duration) : JavaDStream[jl.Long] = {
     dstream.countByWindow(windowDuration, slideDuration)
   }
 
@@ -116,7 +116,7 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
    *                       DStream's batching interval
    */
   def countByValueAndWindow(windowDuration: Duration, slideDuration: Duration)
-    : JavaPairDStream[T, JLong] = {
+    : JavaPairDStream[T, jl.Long] = {
     JavaPairDStream.scalaToJavaLong(
       dstream.countByValueAndWindow(windowDuration, slideDuration))
   }
@@ -133,7 +133,7 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
    * @param numPartitions  number of partitions of each RDD in the new DStream.
    */
   def countByValueAndWindow(windowDuration: Duration, slideDuration: Duration, numPartitions: Int)
-    : JavaPairDStream[T, JLong] = {
+    : JavaPairDStream[T, jl.Long] = {
     JavaPairDStream.scalaToJavaLong(
       dstream.countByValueAndWindow(windowDuration, slideDuration, numPartitions))
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/ac56cf60/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
index 2bf3cce..af0d84b 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
@@ -17,7 +17,8 @@
 
 package org.apache.spark.streaming.api.java
 
-import java.lang.{Iterable => JIterable, Long => JLong}
+import java.{lang => jl}
+import java.lang.{Iterable => JIterable}
 import java.util.{List => JList}
 
 import scala.collection.JavaConverters._
@@ -847,7 +848,7 @@ object JavaPairDStream {
   }
 
   def scalaToJavaLong[K: ClassTag](dstream: JavaPairDStream[K, Long])
-  : JavaPairDStream[K, JLong] = {
-    DStream.toPairDStreamFunctions(dstream.dstream).mapValues(new JLong(_))
+  : JavaPairDStream[K, jl.Long] = {
+    DStream.toPairDStreamFunctions(dstream.dstream).mapValues(jl.Long.valueOf)
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org