You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ma...@apache.org on 2013/11/26 09:00:35 UTC

[1/9] git commit: Initial commit of adding histogram functionality to the DoubleRDDFunctions.

Updated Branches:
  refs/heads/master 0e2109ddb -> 18d6df0e1


Initial commit of adding histogram functionality to the DoubleRDDFunctions.


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/2a372358
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/2a372358
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/2a372358

Branch: refs/heads/master
Commit: 2a37235825cecd3f75286d11456c6e3cb13d4327
Parents: dca8009
Author: Holden Karau <ho...@pigscanfly.ca>
Authored: Fri Oct 18 00:07:49 2013 -0700
Committer: Holden Karau <ho...@pigscanfly.ca>
Committed: Sat Oct 19 00:57:25 2013 -0700

----------------------------------------------------------------------
 .../apache/spark/api/java/JavaDoubleRDD.scala   |  32 +++
 .../apache/spark/rdd/DoubleRDDFunctions.scala   | 134 +++++++++++
 .../org/apache/spark/rdd/DoubleRDDSuite.scala   | 233 +++++++++++++++++++
 3 files changed, 399 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/2a372358/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala
index 5fd1fab..d2a2818 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala
@@ -26,6 +26,8 @@ import org.apache.spark.storage.StorageLevel
 import java.lang.Double
 import org.apache.spark.Partitioner
 
+import scala.collection.JavaConverters._
+
 class JavaDoubleRDD(val srdd: RDD[scala.Double]) extends JavaRDDLike[Double, JavaDoubleRDD] {
 
   override val classManifest: ClassManifest[Double] = implicitly[ClassManifest[Double]]
@@ -158,6 +160,36 @@ class JavaDoubleRDD(val srdd: RDD[scala.Double]) extends JavaRDDLike[Double, Jav
 
   /** (Experimental) Approximate operation to return the sum within a timeout. */
   def sumApprox(timeout: Long): PartialResult[BoundedDouble] = srdd.sumApprox(timeout)
+
+  /**
+   * Compute a histogram of the data using bucketCount number of buckets evenly
+   *  spaced between the minimum and maximum of the RDD. For example if the min
+   *  value is 0 and the max is 100 and there are two buckets the resulting
+   *  buckets will be [0,50) [50,100]. bucketCount must be at least 1
+   * If the RDD contains infinity, NaN throws an exception
+   * If the elements in RDD do not vary (max == min) throws an exception
+   */
+  def histogram(bucketCount: Int): Pair[Array[Double], Array[Long]] = {
+    val result = srdd.histogram(bucketCount)
+    (result._1.map(scala.Double.box(_)), result._2)
+  }
+  /**
+   * Compute a histogram using the provided buckets. The buckets are all open
+   * to the left except for the last which is closed
+   *  e.g. for the array
+   *  [1,10,20,50] the buckets are [1,10) [10,20) [20,50]
+   *  e.g 1<=x<10 , 10<=x<20, 20<=x<50
+   *  And on the input of 1 and 50 we would have a histogram of 1,0,0 
+   * 
+   * Note: if your histogram is evenly spaced (e.g. [0,10,20,30]) this switches
+   * from an O(log n) inseration to O(1) per element. (where n = # buckets)
+   * buckets must be sorted and not contain any duplicates.
+   * buckets array must be at least two elements 
+   * All NaN entries are treated the same.
+   */
+  def histogram(buckets: Array[Double]): Array[Long] = {
+    srdd.histogram(buckets.map(_.toDouble))
+  }
 }
 
 object JavaDoubleRDD {

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/2a372358/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala
index a4bec41..776a83c 100644
--- a/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala
@@ -24,6 +24,8 @@ import org.apache.spark.partial.SumEvaluator
 import org.apache.spark.util.StatCounter
 import org.apache.spark.{TaskContext, Logging}
 
+import scala.collection.immutable.NumericRange
+
 /**
  * Extra functions available on RDDs of Doubles through an implicit conversion.
  * Import `org.apache.spark.SparkContext._` at the top of your program to use these functions.
@@ -76,4 +78,136 @@ class DoubleRDDFunctions(self: RDD[Double]) extends Logging with Serializable {
     val evaluator = new SumEvaluator(self.partitions.size, confidence)
     self.context.runApproximateJob(self, processPartition, evaluator, timeout)
   }
+
+  /**
+   * Compute a histogram of the data using bucketCount number of buckets evenly
+   *  spaced between the minimum and maximum of the RDD. For example if the min
+   *  value is 0 and the max is 100 and there are two buckets the resulting
+   *  buckets will be [0,50) [50,100]. bucketCount must be at least 1
+   * If the RDD contains infinity, NaN throws an exception
+   * If the elements in RDD do not vary (max == min) throws an exception
+   */
+  def histogram(bucketCount: Int): Pair[Array[Double], Array[Long]] = {
+    // Compute the minimum and the maxium
+    val (max: Double, min: Double) = self.mapPartitions { items =>
+      Iterator(items.foldRight(-1/0.0, Double.NaN)((e: Double, x: Pair[Double, Double]) =>
+        (x._1.max(e),x._2.min(e))))
+    }.reduce { (maxmin1, maxmin2) =>
+      (maxmin1._1.max(maxmin2._1), maxmin1._2.min(maxmin2._2))
+    }
+    if (max.isNaN() || max.isInfinity || min.isInfinity ) {
+      throw new UnsupportedOperationException("Histogram on either an empty RDD or RDD containing +-infinity or NaN")
+    }
+    if (max == min) {
+      throw new UnsupportedOperationException("Histogram with no range in elements")
+    }
+    val increment: Double = (max-min)/bucketCount.toDouble
+    val range = Range.Double.inclusive(min, max, increment)
+    val buckets: Array[Double] = range.toArray
+    (buckets,histogram(buckets))
+  }
+  /**
+   * Compute a histogram using the provided buckets. The buckets are all open
+   * to the left except for the last which is closed
+   *  e.g. for the array
+   *  [1,10,20,50] the buckets are [1,10) [10,20) [20,50]
+   *  e.g 1<=x<10 , 10<=x<20, 20<=x<50
+   *  And on the input of 1 and 50 we would have a histogram of 1,0,0 
+   * 
+   * Note: if your histogram is evenly spaced (e.g. [0,10,20,30]) this switches
+   * from an O(log n) inseration to O(1) per element. (where n = # buckets)
+   * buckets must be sorted and not contain any duplicates.
+   * buckets array must be at least two elements 
+   * All NaN entries are treated the same.
+   */
+  def histogram(buckets: Array[Double]): Array[Long] = {
+    if (buckets.length < 2) {
+      throw new IllegalArgumentException("buckets array must have at least two elements")
+    }
+    // The histogramPartition function computes the partail histogram for a given
+    // partition. The provided bucketFunction determines which bucket in the array
+    // to increment or returns None if there is no bucket. This is done so we can
+    // specialize for uniformly distributed buckets and save the O(log n) binary
+    // search cost.
+    def histogramPartition(bucketFunction: (Double) => Option[Int])(iter: Iterator[Double]): Iterator[Array[Long]] = {
+      val counters = new Array[Long](buckets.length-1)
+      while (iter.hasNext) {
+        bucketFunction(iter.next()) match {
+          case Some(x: Int) => {counters(x)+=1}
+          case _ => {}
+        }
+      }
+      Iterator(counters)
+    }
+    // Merge the counters.
+    def mergeCounters(a1: Array[Long], a2: Array[Long]): Array[Long] = {
+      a1.indices.foreach(i => a1(i) += a2(i))
+      a1
+    }
+    // Basic bucket function. This works using Java's built in Array
+    // binary search. Takes log(size(buckets))
+    def basicBucketFunction(e: Double): Option[Int] = {
+      val location = java.util.Arrays.binarySearch(buckets, e)
+      if (location < 0) {
+        // If the location is less than 0 then the insertion point in the array
+        // to keep it sorted is -location-1
+        val insertionPoint = -location-1
+        // If we have to insert before the first element or after the last one
+        // its out of bounds.
+        // We do this rather than buckets.lengthCompare(insertionPoint)
+        // because Array[Double] fails to override it (for now).
+        if (insertionPoint > 0 && insertionPoint < buckets.length) {
+          Some(insertionPoint-1)
+        } else {
+          None
+        }
+      } else if (location < buckets.length-1) {
+        // Exact match, just insert here
+        Some(location)
+      } else {
+        // Exact match to the last element
+        Some(location-1)
+      }
+    }
+    // Determine the bucket function in constant time. Requires that buckets are evenly spaced
+    def fastBucketFunction(min: Double, increment: Double, count: Int)(e: Double): Option[Int] = {
+      // If our input is not a number unless the increment is also NaN then we fail fast
+      if (e.isNaN()) {
+        return None
+      }
+      val bucketNumber = (e-min)/(increment)
+      // We do this rather than buckets.lengthCompare(bucketNumber)
+      // because Array[Double] fails to override it (for now).
+      if (bucketNumber > count || bucketNumber < 0) {
+        None
+      } else {
+        Some(bucketNumber.toInt.min(count-1))
+      }
+    }
+    def evenlySpaced(buckets: Array[Double]): Boolean = {
+      val delta = buckets(1)-buckets(0)
+      // Technically you could have an evenly spaced bucket with NaN
+      // increments but then its a single bucket and this makes the
+      // fastBucketFunction simpler.
+      if (delta.isNaN() || delta.isInfinite()) {
+        return false
+      }
+      for (i <- 1 to buckets.length-1) {
+        if (buckets(i)-buckets(i-1) != delta) {
+          return false
+        }
+      }
+      true
+    }
+    // Decide which bucket function to pass to histogramPartition. We decide here
+    // rather than having a general function so that the decission need only be made
+    // once rather than once per shard
+    val bucketFunction = if (evenlySpaced(buckets)) {
+      fastBucketFunction(buckets(0), buckets(1)-buckets(0), buckets.length-1) _
+    } else {
+      basicBucketFunction _
+    }
+    self.mapPartitions(histogramPartition(bucketFunction)).reduce(mergeCounters)
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/2a372358/core/src/test/scala/org/apache/spark/rdd/DoubleRDDSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/rdd/DoubleRDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/DoubleRDDSuite.scala
new file mode 100644
index 0000000..2ec7173
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/rdd/DoubleRDDSuite.scala
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rdd
+
+import scala.math.abs
+import scala.collection.mutable.ArrayBuffer
+
+import org.scalatest.FunSuite
+
+import org.apache.spark.SparkContext._
+import org.apache.spark.rdd._
+import org.apache.spark._
+
+class DoubleRDDSuite extends FunSuite with SharedSparkContext {
+  // Verify tests on the histogram functionality. We test with both evenly
+  // and non-evenly spaced buckets as the bucket lookup function changes.
+  test("WorksOnEmpty") {
+    // Make sure that it works on an empty input
+    val rdd: RDD[Double] = sc.parallelize(Seq())
+    val buckets: Array[Double] = Array(0.0, 10.0)
+    val histogramResults: Array[Long] = rdd.histogram(buckets)
+    val expectedHistogramResults: Array[Long] = Array(0)
+    assert(histogramResults === expectedHistogramResults)
+  }
+  test("WorksWithOutOfRangeWithOneBucket") {
+    // Verify that if all of the elements are out of range the counts are zero
+    val rdd: RDD[Double] = sc.parallelize(Seq(10.01,-0.01))
+    val buckets: Array[Double] = Array(0.0, 10.0)
+    val histogramResults: Array[Long] = rdd.histogram(buckets)
+    val expectedHistogramResults: Array[Long] = Array(0)
+    assert(histogramResults === expectedHistogramResults)
+  }
+  test("WorksInRangeWithOneBucket") {
+    // Verify the basic case of one bucket and all elements in that bucket works
+    val rdd: RDD[Double] = sc.parallelize(Seq(1,2,3,4))
+    val buckets: Array[Double] = Array(0.0, 10.0)
+    val histogramResults: Array[Long] = rdd.histogram(buckets)
+    val expectedHistogramResults: Array[Long] = Array(4)
+    assert(histogramResults === expectedHistogramResults)
+  }
+  test("WorksInRangeWithOneBucketExactMatch") {
+    // Verify the basic case of one bucket and all elements in that bucket works
+    val rdd: RDD[Double] = sc.parallelize(Seq(1,2,3,4))
+    val buckets: Array[Double] = Array(1.0, 4.0)
+    val histogramResults: Array[Long] = rdd.histogram(buckets)
+    val expectedHistogramResults: Array[Long] = Array(4)
+    assert(histogramResults === expectedHistogramResults)
+  }
+  test("WorksWithOutOfRangeWithTwoBuckets") {
+    // Verify that out of range works with two buckets
+    val rdd: RDD[Double] = sc.parallelize(Seq(10.01,-0.01))
+    val buckets: Array[Double] = Array(0.0, 5.0, 10.0)
+    val histogramResults: Array[Long] = rdd.histogram(buckets)
+    val expectedHistogramResults: Array[Long] = Array(0,0)
+    assert(histogramResults === expectedHistogramResults)
+  }
+  test("WorksWithOutOfRangeWithTwoUnEvenBuckets") {
+    // Verify that out of range works with two un even buckets
+    val rdd: RDD[Double] = sc.parallelize(Seq(10.01,-0.01))
+    val buckets: Array[Double] = Array(0.0, 4.0, 10.0)
+    val histogramResults: Array[Long] = rdd.histogram(buckets)
+    val expectedHistogramResults: Array[Long] = Array(0,0)
+    assert(histogramResults === expectedHistogramResults)
+  }
+  test("WorksInRangeWithTwoBuckets") {
+    // Make sure that it works with two equally spaced buckets and elements in each
+    val rdd: RDD[Double] = sc.parallelize(Seq(1,2,3,5,6))
+    val buckets: Array[Double] = Array(0.0, 5.0, 10.0)
+    val histogramResults: Array[Long] = rdd.histogram(buckets)
+    val expectedHistogramResults: Array[Long] = Array(3,2)
+    assert(histogramResults === expectedHistogramResults)
+  }
+  test("WorksInRangeWithTwoBucketsAndNaN") {
+    // Make sure that it works with two equally spaced buckets and elements in each
+    val rdd: RDD[Double] = sc.parallelize(Seq(1,2,3,5,6,Double.NaN))
+    val buckets: Array[Double] = Array(0.0, 5.0, 10.0)
+    val histogramResults: Array[Long] = rdd.histogram(buckets)
+    val expectedHistogramResults: Array[Long] = Array(3,2)
+    assert(histogramResults === expectedHistogramResults)
+  }
+  test("WorksInRangeWithTwoUnevenBuckets") {
+    // Make sure that it works with two unequally spaced buckets and elements in each
+    val rdd: RDD[Double] = sc.parallelize(Seq(1,2,3,5,6))
+    val buckets: Array[Double] = Array(0.0, 5.0, 11.0)
+    val histogramResults: Array[Long] = rdd.histogram(buckets)
+    val expectedHistogramResults: Array[Long] = Array(3,2)
+    assert(histogramResults === expectedHistogramResults)
+  }
+  test("WorksMixedRangeWithTwoUnevenBuckets") {
+    // Make sure that it works with two unequally spaced buckets and elements in each
+    val rdd: RDD[Double] = sc.parallelize(Seq(-0.01,0.0,1,2,3,5,6,11.0,11.01))
+    val buckets: Array[Double] = Array(0.0, 5.0, 11.0)
+    val histogramResults: Array[Long] = rdd.histogram(buckets)
+    val expectedHistogramResults: Array[Long] = Array(4,3)
+    assert(histogramResults === expectedHistogramResults)
+  }
+  test("WorksMixedRangeWithFourUnevenBuckets") {
+    // Make sure that it works with two unequally spaced buckets and elements in each
+    val rdd: RDD[Double] = sc.parallelize(Seq(-0.01,0.0,1,2,3,5,6,11.01,12.0,199.0,200.0,200.1))
+    val buckets: Array[Double] = Array(0.0, 5.0, 11.0,12.0,200.0)
+    val histogramResults: Array[Long] = rdd.histogram(buckets)
+    val expectedHistogramResults: Array[Long] = Array(4,2,1,3)
+    assert(histogramResults === expectedHistogramResults)
+  }
+  test("WorksMixedRangeWithUnevenBucketsAndNaN") {
+    // Make sure that it works with two unequally spaced buckets and elements in each
+    val rdd: RDD[Double] = sc.parallelize(Seq(-0.01,0.0,1,2,3,5,6,11.01,12.0,199.0,200.0,200.1,Double.NaN))
+    val buckets: Array[Double] = Array(0.0, 5.0, 11.0,12.0,200.0)
+    val histogramResults: Array[Long] = rdd.histogram(buckets)
+    val expectedHistogramResults: Array[Long] = Array(4,2,1,3)
+    assert(histogramResults === expectedHistogramResults)
+  }
+  // Make sure this works with a NaN end bucket
+  test("WorksMixedRangeWithUnevenBucketsAndNaNAndNaNRange") {
+    // Make sure that it works with two unequally spaced buckets and elements in each
+    val rdd: RDD[Double] = sc.parallelize(Seq(-0.01,0.0,1,2,3,5,6,11.01,12.0,199.0,200.0,200.1,Double.NaN))
+    val buckets: Array[Double] = Array(0.0, 5.0, 11.0,12.0,200.0,Double.NaN)
+    val histogramResults: Array[Long] = rdd.histogram(buckets)
+    val expectedHistogramResults: Array[Long] = Array(4,2,1,2,3)
+    assert(histogramResults === expectedHistogramResults)
+  }
+  // Make sure this works with a NaN end bucket and an inifity
+  test("WorksMixedRangeWithUnevenBucketsAndNaNAndNaNRangeAndInfity") {
+    // Make sure that it works with two unequally spaced buckets and elements in each
+    val rdd: RDD[Double] = sc.parallelize(Seq(-0.01,0.0,1,2,3,5,6,11.01,12.0,199.0,200.0,200.1,1.0/0.0,-1.0/0.0,Double.NaN))
+    val buckets: Array[Double] = Array(0.0, 5.0, 11.0,12.0,200.0,Double.NaN)
+    val histogramResults: Array[Long] = rdd.histogram(buckets)
+    val expectedHistogramResults: Array[Long] = Array(4,2,1,2,4)
+    assert(histogramResults === expectedHistogramResults)
+  }
+  test("WorksWithOutOfRangeWithInfiniteBuckets") {
+    // Verify that out of range works with two buckets
+    val rdd: RDD[Double] = sc.parallelize(Seq(10.01,-0.01,Double.NaN))
+    val buckets: Array[Double] = Array(-1.0/0.0 ,0.0, 1.0/0.0)
+    val histogramResults: Array[Long] = rdd.histogram(buckets)
+    val expectedHistogramResults: Array[Long] = Array(1,1)
+    assert(histogramResults === expectedHistogramResults)
+  }
+  // Test the failure mode with an invalid bucket array
+  test("ThrowsExceptionOnInvalidBucketArray") {
+    val rdd: RDD[Double] = sc.parallelize(Seq(1.0))
+    // Empty array
+    intercept[IllegalArgumentException]{
+      val buckets: Array[Double] = Array.empty[Double]
+      val result = rdd.histogram(buckets)
+    }
+    // Single element array
+    intercept[IllegalArgumentException]
+    {
+      val buckets: Array[Double] = Array(1.0)
+      val result = rdd.histogram(buckets)
+    }
+  }
+
+  // Test automatic histogram function
+  test("WorksWithoutBucketsBasic") {
+    // Verify the basic case of one bucket and all elements in that bucket works
+    val rdd: RDD[Double] = sc.parallelize(Seq(1,2,3,4))
+    val (histogramBuckets, histogramResults) = rdd.histogram(1)
+    val expectedHistogramResults: Array[Long] = Array(4)
+    val expectedHistogramBuckets: Array[Double] = Array(1.0,4.0)
+    assert(histogramResults === expectedHistogramResults)
+    assert(histogramBuckets === expectedHistogramBuckets)
+  }
+  test("WorksWithoutBucketsBasicTwo") {
+    // Verify the basic case of one bucket and all elements in that bucket works
+    val rdd: RDD[Double] = sc.parallelize(Seq(1,2,3,4))
+    val (histogramBuckets, histogramResults) = rdd.histogram(2)
+    val expectedHistogramResults: Array[Long] = Array(2,2)
+    val expectedHistogramBuckets: Array[Double] = Array(1.0,2.5,4.0)
+    assert(histogramResults === expectedHistogramResults)
+    assert(histogramBuckets === expectedHistogramBuckets)
+  }
+  test("WorksWithoutBucketsWithMoreRequestedThanElements") {
+    // Verify the basic case of one bucket and all elements in that bucket works
+    val rdd: RDD[Double] = sc.parallelize(Seq(1,2))
+    val (histogramBuckets, histogramResults) = rdd.histogram(10)
+    val expectedHistogramResults: Array[Long] =
+      Array(1, 0, 0, 0, 0, 0, 0, 0, 0, 1)
+    val expectedHistogramBuckets: Array[Double] =
+      Array(1.0, 1.1, 1.2, 1.3, 1.4, 1.5, 1.6, 1.7, 1.8, 1.9, 2.0)
+    assert(histogramResults === expectedHistogramResults)
+    assert(histogramBuckets === expectedHistogramBuckets)
+  }
+  // Test the failure mode with an invalid RDD
+  test("ThrowsExceptionOnInvalidRDDs") {
+    // infinity
+    intercept[UnsupportedOperationException]{
+      val rdd: RDD[Double] = sc.parallelize(Seq(1,1.0/0.0))
+      val result = rdd.histogram(1)
+    }
+    // NaN
+    intercept[UnsupportedOperationException]
+    {
+      val rdd: RDD[Double] = sc.parallelize(Seq(1,Double.NaN))
+      val result = rdd.histogram(1)
+    }
+    // Empty
+    intercept[UnsupportedOperationException]
+    {
+      val rdd: RDD[Double] = sc.parallelize(Seq())
+      val result = rdd.histogram(1)
+    }
+    // Single element
+    intercept[UnsupportedOperationException]
+    {
+      val rdd: RDD[Double] = sc.parallelize(Seq(1))
+      val result = rdd.histogram(1)
+    }
+    // No Range
+    intercept[UnsupportedOperationException]
+    {
+      val rdd: RDD[Double] = sc.parallelize(Seq(1,1,1))
+      val result = rdd.histogram(1)
+    }
+  }
+
+}


[4/9] git commit: Remove extranious type definitions from inside of tests

Posted by ma...@apache.org.
Remove extranious type definitions from inside of tests


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/092b87e7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/092b87e7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/092b87e7

Branch: refs/heads/master
Commit: 092b87e7c8f723a0c4ecf1dfb5379cad4c39d37f
Parents: 699f7d2
Author: Holden Karau <ho...@pigscanfly.ca>
Authored: Mon Oct 21 00:20:15 2013 -0700
Committer: Holden Karau <ho...@pigscanfly.ca>
Committed: Mon Oct 21 00:20:15 2013 -0700

----------------------------------------------------------------------
 .../org/apache/spark/rdd/DoubleRDDSuite.scala   | 172 +++++++++----------
 1 file changed, 86 insertions(+), 86 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/092b87e7/core/src/test/scala/org/apache/spark/rdd/DoubleRDDSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/rdd/DoubleRDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/DoubleRDDSuite.scala
index 0710844..0d8ac19 100644
--- a/core/src/test/scala/org/apache/spark/rdd/DoubleRDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/DoubleRDDSuite.scala
@@ -32,154 +32,154 @@ class DoubleRDDSuite extends FunSuite with SharedSparkContext {
   test("WorksOnEmpty") {
     // Make sure that it works on an empty input
     val rdd: RDD[Double] = sc.parallelize(Seq())
-    val buckets: Array[Double] = Array(0.0, 10.0)
-    val histogramResults: Array[Long] = rdd.histogram(buckets)
-    val histogramResults2: Array[Long] = rdd.histogram(buckets, true)
-    val expectedHistogramResults: Array[Long] = Array(0)
+    val buckets = Array(0.0, 10.0)
+    val histogramResults = rdd.histogram(buckets)
+    val histogramResults2 = rdd.histogram(buckets, true)
+    val expectedHistogramResults = Array(0)
     assert(histogramResults === expectedHistogramResults)
     assert(histogramResults2 === expectedHistogramResults)
   }
   test("WorksWithOutOfRangeWithOneBucket") {
     // Verify that if all of the elements are out of range the counts are zero
-    val rdd: RDD[Double] = sc.parallelize(Seq(10.01, -0.01))
-    val buckets: Array[Double] = Array(0.0, 10.0)
-    val histogramResults: Array[Long] = rdd.histogram(buckets)
-    val histogramResults2: Array[Long] = rdd.histogram(buckets, true)
-    val expectedHistogramResults: Array[Long] = Array(0)
+    val rdd = sc.parallelize(Seq(10.01, -0.01))
+    val buckets = Array(0.0, 10.0)
+    val histogramResults = rdd.histogram(buckets)
+    val histogramResults2 = rdd.histogram(buckets, true)
+    val expectedHistogramResults = Array(0)
     assert(histogramResults === expectedHistogramResults)
     assert(histogramResults2 === expectedHistogramResults)
   }
   test("WorksInRangeWithOneBucket") {
     // Verify the basic case of one bucket and all elements in that bucket works
-    val rdd: RDD[Double] = sc.parallelize(Seq(1, 2, 3, 4))
-    val buckets: Array[Double] = Array(0.0, 10.0)
-    val histogramResults: Array[Long] = rdd.histogram(buckets)
-    val histogramResults2: Array[Long] = rdd.histogram(buckets, true)
-    val expectedHistogramResults: Array[Long] = Array(4)
+    val rdd = sc.parallelize(Seq(1, 2, 3, 4))
+    val buckets = Array(0.0, 10.0)
+    val histogramResults = rdd.histogram(buckets)
+    val histogramResults2 = rdd.histogram(buckets, true)
+    val expectedHistogramResults = Array(4)
     assert(histogramResults === expectedHistogramResults)
     assert(histogramResults2 === expectedHistogramResults)
   }
   test("WorksInRangeWithOneBucketExactMatch") {
     // Verify the basic case of one bucket and all elements in that bucket works
-    val rdd: RDD[Double] = sc.parallelize(Seq(1, 2, 3, 4))
-    val buckets: Array[Double] = Array(1.0, 4.0)
-    val histogramResults: Array[Long] = rdd.histogram(buckets)
-    val histogramResults2: Array[Long] = rdd.histogram(buckets, true)
-    val expectedHistogramResults: Array[Long] = Array(4)
+    val rdd = sc.parallelize(Seq(1, 2, 3, 4))
+    val buckets = Array(1.0, 4.0)
+    val histogramResults = rdd.histogram(buckets)
+    val histogramResults2 = rdd.histogram(buckets, true)
+    val expectedHistogramResults = Array(4)
     assert(histogramResults === expectedHistogramResults)
     assert(histogramResults2 === expectedHistogramResults)
   }
   test("WorksWithOutOfRangeWithTwoBuckets") {
     // Verify that out of range works with two buckets
-    val rdd: RDD[Double] = sc.parallelize(Seq(10.01, -0.01))
-    val buckets: Array[Double] = Array(0.0, 5.0, 10.0)
-    val histogramResults: Array[Long] = rdd.histogram(buckets)
-    val histogramResults2: Array[Long] = rdd.histogram(buckets, true)
-    val expectedHistogramResults: Array[Long] = Array(0, 0)
+    val rdd = sc.parallelize(Seq(10.01, -0.01))
+    val buckets = Array(0.0, 5.0, 10.0)
+    val histogramResults = rdd.histogram(buckets)
+    val histogramResults2 = rdd.histogram(buckets, true)
+    val expectedHistogramResults = Array(0, 0)
     assert(histogramResults === expectedHistogramResults)
     assert(histogramResults2 === expectedHistogramResults)
   }
   test("WorksWithOutOfRangeWithTwoUnEvenBuckets") {
     // Verify that out of range works with two un even buckets
-    val rdd: RDD[Double] = sc.parallelize(Seq(10.01, -0.01))
-    val buckets: Array[Double] = Array(0.0, 4.0, 10.0)
-    val histogramResults: Array[Long] = rdd.histogram(buckets)
-    val expectedHistogramResults: Array[Long] = Array(0, 0)
+    val rdd = sc.parallelize(Seq(10.01, -0.01))
+    val buckets = Array(0.0, 4.0, 10.0)
+    val histogramResults = rdd.histogram(buckets)
+    val expectedHistogramResults = Array(0, 0)
     assert(histogramResults === expectedHistogramResults)
   }
   test("WorksInRangeWithTwoBuckets") {
     // Make sure that it works with two equally spaced buckets and elements in each
-    val rdd: RDD[Double] = sc.parallelize(Seq(1, 2, 3, 5, 6))
-    val buckets: Array[Double] = Array(0.0, 5.0, 10.0)
-    val histogramResults: Array[Long] = rdd.histogram(buckets)
-    val histogramResults2: Array[Long] = rdd.histogram(buckets, true)
-    val expectedHistogramResults: Array[Long] = Array(3, 2)
+    val rdd = sc.parallelize(Seq(1, 2, 3, 5, 6))
+    val buckets = Array(0.0, 5.0, 10.0)
+    val histogramResults = rdd.histogram(buckets)
+    val histogramResults2 = rdd.histogram(buckets, true)
+    val expectedHistogramResults = Array(3, 2)
     assert(histogramResults === expectedHistogramResults)
     assert(histogramResults2 === expectedHistogramResults)
   }
   test("WorksInRangeWithTwoBucketsAndNaN") {
     // Make sure that it works with two equally spaced buckets and elements in each
-    val rdd: RDD[Double] = sc.parallelize(Seq(1, 2, 3, 5, 6, Double.NaN))
-    val buckets: Array[Double] = Array(0.0, 5.0, 10.0)
-    val histogramResults: Array[Long] = rdd.histogram(buckets)
-    val histogramResults2: Array[Long] = rdd.histogram(buckets, true)
-    val expectedHistogramResults: Array[Long] = Array(3, 2)
+    val rdd = sc.parallelize(Seq(1, 2, 3, 5, 6, Double.NaN))
+    val buckets = Array(0.0, 5.0, 10.0)
+    val histogramResults = rdd.histogram(buckets)
+    val histogramResults2 = rdd.histogram(buckets, true)
+    val expectedHistogramResults = Array(3, 2)
     assert(histogramResults === expectedHistogramResults)
     assert(histogramResults2 === expectedHistogramResults)
   }
   test("WorksInRangeWithTwoUnevenBuckets") {
     // Make sure that it works with two unequally spaced buckets and elements in each
-    val rdd: RDD[Double] = sc.parallelize(Seq(1, 2, 3, 5, 6))
-    val buckets: Array[Double] = Array(0.0, 5.0, 11.0)
-    val histogramResults: Array[Long] = rdd.histogram(buckets)
-    val expectedHistogramResults: Array[Long] = Array(3, 2)
+    val rdd = sc.parallelize(Seq(1, 2, 3, 5, 6))
+    val buckets = Array(0.0, 5.0, 11.0)
+    val histogramResults = rdd.histogram(buckets)
+    val expectedHistogramResults = Array(3, 2)
     assert(histogramResults === expectedHistogramResults)
   }
   test("WorksMixedRangeWithTwoUnevenBuckets") {
     // Make sure that it works with two unequally spaced buckets and elements in each
-    val rdd: RDD[Double] = sc.parallelize(Seq(-0.01, 0.0, 1, 2, 3, 5, 6, 11.0, 11.01))
-    val buckets: Array[Double] = Array(0.0, 5.0, 11.0)
-    val histogramResults: Array[Long] = rdd.histogram(buckets)
-    val expectedHistogramResults: Array[Long] = Array(4, 3)
+    val rdd = sc.parallelize(Seq(-0.01, 0.0, 1, 2, 3, 5, 6, 11.0, 11.01))
+    val buckets = Array(0.0, 5.0, 11.0)
+    val histogramResults = rdd.histogram(buckets)
+    val expectedHistogramResults = Array(4, 3)
     assert(histogramResults === expectedHistogramResults)
   }
   test("WorksMixedRangeWithFourUnevenBuckets") {
     // Make sure that it works with two unequally spaced buckets and elements in each
-    val rdd: RDD[Double] = sc.parallelize(Seq(-0.01, 0.0, 1, 2, 3, 5, 6, 11.01, 12.0, 199.0,
+    val rdd = sc.parallelize(Seq(-0.01, 0.0, 1, 2, 3, 5, 6, 11.01, 12.0, 199.0,
       200.0, 200.1))
-    val buckets: Array[Double] = Array(0.0, 5.0, 11.0, 12.0, 200.0)
-    val histogramResults: Array[Long] = rdd.histogram(buckets)
-    val expectedHistogramResults: Array[Long] = Array(4, 2, 1, 3)
+    val buckets = Array(0.0, 5.0, 11.0, 12.0, 200.0)
+    val histogramResults = rdd.histogram(buckets)
+    val expectedHistogramResults = Array(4, 2, 1, 3)
     assert(histogramResults === expectedHistogramResults)
   }
   test("WorksMixedRangeWithUnevenBucketsAndNaN") {
     // Make sure that it works with two unequally spaced buckets and elements in each
-    val rdd: RDD[Double] = sc.parallelize(Seq(-0.01, 0.0, 1, 2, 3, 5, 6, 11.01, 12.0, 199.0,
+    val rdd = sc.parallelize(Seq(-0.01, 0.0, 1, 2, 3, 5, 6, 11.01, 12.0, 199.0,
       200.0, 200.1, Double.NaN))
-    val buckets: Array[Double] = Array(0.0, 5.0, 11.0, 12.0, 200.0)
-    val histogramResults: Array[Long] = rdd.histogram(buckets)
-    val expectedHistogramResults: Array[Long] = Array(4, 2, 1, 3)
+    val buckets = Array(0.0, 5.0, 11.0, 12.0, 200.0)
+    val histogramResults = rdd.histogram(buckets)
+    val expectedHistogramResults = Array(4, 2, 1, 3)
     assert(histogramResults === expectedHistogramResults)
   }
   // Make sure this works with a NaN end bucket
   test("WorksMixedRangeWithUnevenBucketsAndNaNAndNaNRange") {
     // Make sure that it works with two unequally spaced buckets and elements in each
-    val rdd: RDD[Double] = sc.parallelize(Seq(-0.01, 0.0, 1, 2, 3, 5, 6, 11.01, 12.0, 199.0,
+    val rdd = sc.parallelize(Seq(-0.01, 0.0, 1, 2, 3, 5, 6, 11.01, 12.0, 199.0,
       200.0, 200.1, Double.NaN))
-    val buckets: Array[Double] = Array(0.0, 5.0, 11.0, 12.0, 200.0, Double.NaN)
-    val histogramResults: Array[Long] = rdd.histogram(buckets)
-    val expectedHistogramResults: Array[Long] = Array(4, 2, 1, 2, 3)
+    val buckets = Array(0.0, 5.0, 11.0, 12.0, 200.0, Double.NaN)
+    val histogramResults = rdd.histogram(buckets)
+    val expectedHistogramResults = Array(4, 2, 1, 2, 3)
     assert(histogramResults === expectedHistogramResults)
   }
   // Make sure this works with a NaN end bucket and an inifity
   test("WorksMixedRangeWithUnevenBucketsAndNaNAndNaNRangeAndInfity") {
     // Make sure that it works with two unequally spaced buckets and elements in each
-    val rdd: RDD[Double] = sc.parallelize(Seq(-0.01, 0.0, 1, 2, 3, 5, 6, 11.01, 12.0, 199.0,
+    val rdd = sc.parallelize(Seq(-0.01, 0.0, 1, 2, 3, 5, 6, 11.01, 12.0, 199.0,
       200.0, 200.1, 1.0/0.0, -1.0/0.0, Double.NaN))
-    val buckets: Array[Double] = Array(0.0, 5.0, 11.0, 12.0, 200.0, Double.NaN)
-    val histogramResults: Array[Long] = rdd.histogram(buckets)
-    val expectedHistogramResults: Array[Long] = Array(4, 2, 1, 2, 4)
+    val buckets = Array(0.0, 5.0, 11.0, 12.0, 200.0, Double.NaN)
+    val histogramResults = rdd.histogram(buckets)
+    val expectedHistogramResults = Array(4, 2, 1, 2, 4)
     assert(histogramResults === expectedHistogramResults)
   }
   test("WorksWithOutOfRangeWithInfiniteBuckets") {
     // Verify that out of range works with two buckets
-    val rdd: RDD[Double] = sc.parallelize(Seq(10.01, -0.01, Double.NaN))
-    val buckets: Array[Double] = Array(-1.0/0.0 , 0.0, 1.0/0.0)
-    val histogramResults: Array[Long] = rdd.histogram(buckets)
-    val expectedHistogramResults: Array[Long] = Array(1, 1)
+    val rdd = sc.parallelize(Seq(10.01, -0.01, Double.NaN))
+    val buckets = Array(-1.0/0.0 , 0.0, 1.0/0.0)
+    val histogramResults = rdd.histogram(buckets)
+    val expectedHistogramResults = Array(1, 1)
     assert(histogramResults === expectedHistogramResults)
   }
   // Test the failure mode with an invalid bucket array
   test("ThrowsExceptionOnInvalidBucketArray") {
-    val rdd: RDD[Double] = sc.parallelize(Seq(1.0))
+    val rdd = sc.parallelize(Seq(1.0))
     // Empty array
     intercept[IllegalArgumentException] {
-      val buckets: Array[Double] = Array.empty[Double]
+      val buckets = Array.empty[Double]
       val result = rdd.histogram(buckets)
     }
     // Single element array
     intercept[IllegalArgumentException] {
-      val buckets: Array[Double] = Array(1.0)
+      val buckets = Array(1.0)
       val result = rdd.histogram(buckets)
     }
   }
@@ -187,49 +187,49 @@ class DoubleRDDSuite extends FunSuite with SharedSparkContext {
   // Test automatic histogram function
   test("WorksWithoutBucketsBasic") {
     // Verify the basic case of one bucket and all elements in that bucket works
-    val rdd: RDD[Double] = sc.parallelize(Seq(1, 2, 3, 4))
+    val rdd = sc.parallelize(Seq(1, 2, 3, 4))
     val (histogramBuckets, histogramResults) = rdd.histogram(1)
-    val expectedHistogramResults: Array[Long] = Array(4)
-    val expectedHistogramBuckets: Array[Double] = Array(1.0, 4.0)
+    val expectedHistogramResults = Array(4)
+    val expectedHistogramBuckets = Array(1.0, 4.0)
     assert(histogramResults === expectedHistogramResults)
     assert(histogramBuckets === expectedHistogramBuckets)
   }
   // Test automatic histogram function with a single element
   test("WorksWithoutBucketsBasicSingleElement") {
     // Verify the basic case of one bucket and all elements in that bucket works
-    val rdd: RDD[Double] = sc.parallelize(Seq(1))
+    val rdd = sc.parallelize(Seq(1))
     val (histogramBuckets, histogramResults) = rdd.histogram(1)
-    val expectedHistogramResults: Array[Long] = Array(1)
-    val expectedHistogramBuckets: Array[Double] = Array(1.0, 1.0)
+    val expectedHistogramResults = Array(1)
+    val expectedHistogramBuckets = Array(1.0, 1.0)
     assert(histogramResults === expectedHistogramResults)
     assert(histogramBuckets === expectedHistogramBuckets)
   }
   // Test automatic histogram function with a single element
   test("WorksWithoutBucketsBasicNoRange") {
     // Verify the basic case of one bucket and all elements in that bucket works
-    val rdd: RDD[Double] = sc.parallelize(Seq(1, 1, 1, 1))
+    val rdd = sc.parallelize(Seq(1, 1, 1, 1))
     val (histogramBuckets, histogramResults) = rdd.histogram(1)
-    val expectedHistogramResults: Array[Long] = Array(4)
-    val expectedHistogramBuckets: Array[Double] = Array(1.0, 1.0)
+    val expectedHistogramResults = Array(4)
+    val expectedHistogramBuckets = Array(1.0, 1.0)
     assert(histogramResults === expectedHistogramResults)
     assert(histogramBuckets === expectedHistogramBuckets)
   }
   test("WorksWithoutBucketsBasicTwo") {
     // Verify the basic case of one bucket and all elements in that bucket works
-    val rdd: RDD[Double] = sc.parallelize(Seq(1, 2, 3, 4))
+    val rdd = sc.parallelize(Seq(1, 2, 3, 4))
     val (histogramBuckets, histogramResults) = rdd.histogram(2)
-    val expectedHistogramResults: Array[Long] = Array(2, 2)
-    val expectedHistogramBuckets: Array[Double] = Array(1.0, 2.5, 4.0)
+    val expectedHistogramResults = Array(2, 2)
+    val expectedHistogramBuckets = Array(1.0, 2.5, 4.0)
     assert(histogramResults === expectedHistogramResults)
     assert(histogramBuckets === expectedHistogramBuckets)
   }
   test("WorksWithoutBucketsWithMoreRequestedThanElements") {
     // Verify the basic case of one bucket and all elements in that bucket works
-    val rdd: RDD[Double] = sc.parallelize(Seq(1, 2))
+    val rdd = sc.parallelize(Seq(1, 2))
     val (histogramBuckets, histogramResults) = rdd.histogram(10)
-    val expectedHistogramResults: Array[Long] =
+    val expectedHistogramResults =
       Array(1, 0, 0, 0, 0, 0, 0, 0, 0, 1)
-    val expectedHistogramBuckets: Array[Double] =
+    val expectedHistogramBuckets =
       Array(1.0, 1.1, 1.2, 1.3, 1.4, 1.5, 1.6, 1.7, 1.8, 1.9, 2.0)
     assert(histogramResults === expectedHistogramResults)
     assert(histogramBuckets === expectedHistogramBuckets)
@@ -239,12 +239,12 @@ class DoubleRDDSuite extends FunSuite with SharedSparkContext {
   test("ThrowsExceptionOnInvalidRDDs") {
     // infinity
     intercept[UnsupportedOperationException] {
-      val rdd: RDD[Double] = sc.parallelize(Seq(1, 1.0/0.0))
+      val rdd = sc.parallelize(Seq(1, 1.0/0.0))
       val result = rdd.histogram(1)
     }
     // NaN
     intercept[UnsupportedOperationException] {
-      val rdd: RDD[Double] = sc.parallelize(Seq(1, Double.NaN))
+      val rdd = sc.parallelize(Seq(1, Double.NaN))
       val result = rdd.histogram(1)
     }
     // Empty


[2/9] git commit: Add tests for the Java implementation.

Posted by ma...@apache.org.
Add tests for the Java implementation.


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/e58c69d9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/e58c69d9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/e58c69d9

Branch: refs/heads/master
Commit: e58c69d955ef8faacb794a0c1666b21c1606453e
Parents: 2a37235
Author: Holden Karau <ho...@pigscanfly.ca>
Authored: Sun Oct 20 01:17:13 2013 -0700
Committer: Holden Karau <ho...@pigscanfly.ca>
Committed: Sun Oct 20 01:17:13 2013 -0700

----------------------------------------------------------------------
 .../src/test/scala/org/apache/spark/JavaAPISuite.java | 14 ++++++++++++++
 1 file changed, 14 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/e58c69d9/core/src/test/scala/org/apache/spark/JavaAPISuite.java
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/JavaAPISuite.java b/core/src/test/scala/org/apache/spark/JavaAPISuite.java
index 591c1d4..8a9c6e6 100644
--- a/core/src/test/scala/org/apache/spark/JavaAPISuite.java
+++ b/core/src/test/scala/org/apache/spark/JavaAPISuite.java
@@ -365,6 +365,20 @@ public class JavaAPISuite implements Serializable {
   }
 
   @Test
+  public void javaDoubleRDDHistoGram() {
+   JavaDoubleRDD rdd = sc.parallelizeDoubles(Arrays.asList(1.0, 2.0, 3.0, 4.0));
+   // Test using generated buckets
+   Tuple2<Double[], long[]> results = rdd.histogram(2);
+   Double[] expected_buckets = {1.0, 2.5, 4.0};
+   long[] expected_counts = {2, 2};
+   Assert.assertArrayEquals(expected_buckets, results._1);
+   Assert.assertArrayEquals(expected_counts, results._2);
+   // Test with provided buckets
+   long[] histogram = rdd.histogram(expected_buckets);
+   Assert.assertArrayEquals(expected_counts, histogram);
+  }
+
+  @Test
   public void map() {
     JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
     JavaDoubleRDD doubles = rdd.map(new DoubleFunction<Integer>() {


[8/9] git commit: Fix the test

Posted by ma...@apache.org.
Fix the test


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/7222ee29
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/7222ee29
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/7222ee29

Branch: refs/heads/master
Commit: 7222ee29779c3c5146aa5a3d6d060f3b039c1ff7
Parents: e163e31
Author: Holden Karau <ho...@pigscanfly.ca>
Authored: Mon Nov 25 21:06:42 2013 -0800
Committer: Holden Karau <ho...@pigscanfly.ca>
Committed: Mon Nov 25 21:06:42 2013 -0800

----------------------------------------------------------------------
 .../main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala   | 4 ++--
 core/src/test/scala/org/apache/spark/JavaAPISuite.java         | 6 +++---
 2 files changed, 5 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/7222ee29/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala
index 70f7f01..dad5c72 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala
@@ -191,8 +191,8 @@ class JavaDoubleRDD(val srdd: RDD[scala.Double]) extends JavaRDDLike[Double, Jav
    * the maximum value of the last position and all NaN entries will be counted
    * in that bucket.
    */
-  def histogram(buckets: Array[Double]): Array[Long] = {
-    srdd.histogram(buckets.map(_.toDouble), false)
+  def histogram(buckets: Array[scala.Double]): Array[Long] = {
+    srdd.histogram(buckets, false)
   }
 
   def histogram(buckets: Array[Double], evenBuckets: Boolean): Array[Long] = {

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/7222ee29/core/src/test/scala/org/apache/spark/JavaAPISuite.java
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/JavaAPISuite.java b/core/src/test/scala/org/apache/spark/JavaAPISuite.java
index 8a9c6e6..44483fd 100644
--- a/core/src/test/scala/org/apache/spark/JavaAPISuite.java
+++ b/core/src/test/scala/org/apache/spark/JavaAPISuite.java
@@ -368,10 +368,10 @@ public class JavaAPISuite implements Serializable {
   public void javaDoubleRDDHistoGram() {
    JavaDoubleRDD rdd = sc.parallelizeDoubles(Arrays.asList(1.0, 2.0, 3.0, 4.0));
    // Test using generated buckets
-   Tuple2<Double[], long[]> results = rdd.histogram(2);
-   Double[] expected_buckets = {1.0, 2.5, 4.0};
+   Tuple2<double[], long[]> results = rdd.histogram(2);
+   double[] expected_buckets = {1.0, 2.5, 4.0};
    long[] expected_counts = {2, 2};
-   Assert.assertArrayEquals(expected_buckets, results._1);
+   Assert.assertArrayEquals(expected_buckets, results._1, 0.1);
    Assert.assertArrayEquals(expected_counts, results._2);
    // Test with provided buckets
    long[] histogram = rdd.histogram(expected_buckets);


[5/9] git commit: Remove extranious type declerations

Posted by ma...@apache.org.
Remove extranious type declerations


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/20b33bc4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/20b33bc4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/20b33bc4

Branch: refs/heads/master
Commit: 20b33bc4b5de1addd943c7a1e6d5d2366d9cd445
Parents: 092b87e
Author: Holden Karau <ho...@pigscanfly.ca>
Authored: Mon Oct 21 00:21:37 2013 -0700
Committer: Holden Karau <ho...@pigscanfly.ca>
Committed: Mon Oct 21 00:21:37 2013 -0700

----------------------------------------------------------------------
 .../src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/20b33bc4/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala
index 33738ee..02d75ec 100644
--- a/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala
@@ -99,13 +99,13 @@ class DoubleRDDFunctions(self: RDD[Double]) extends Logging with Serializable {
       throw new UnsupportedOperationException(
         "Histogram on either an empty RDD or RDD containing +/-infinity or NaN")
     }
-    val increment: Double = (max-min)/bucketCount.toDouble
+    val increment = (max-min)/bucketCount.toDouble
     val range = if (increment != 0) {
       Range.Double.inclusive(min, max, increment)
     } else {
       List(min, min)
     }
-    val buckets: Array[Double] = range.toArray
+    val buckets = range.toArray
     (buckets, histogram(buckets, true))
   }
 


[7/9] git commit: Add spaces

Posted by ma...@apache.org.
Add spaces


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/e163e31c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/e163e31c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/e163e31c

Branch: refs/heads/master
Commit: e163e31c2003558d304ba5ac7b67361956037041
Parents: 7de180f
Author: Holden Karau <ho...@pigscanfly.ca>
Authored: Mon Nov 18 20:13:25 2013 -0800
Committer: Holden Karau <ho...@pigscanfly.ca>
Committed: Mon Nov 18 20:13:25 2013 -0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/rdd/DoubleRDDSuite.scala   | 14 ++++++++++++++
 1 file changed, 14 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/e163e31c/core/src/test/scala/org/apache/spark/rdd/DoubleRDDSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/rdd/DoubleRDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/DoubleRDDSuite.scala
index 0d8ac19..7f50a5a 100644
--- a/core/src/test/scala/org/apache/spark/rdd/DoubleRDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/DoubleRDDSuite.scala
@@ -39,6 +39,7 @@ class DoubleRDDSuite extends FunSuite with SharedSparkContext {
     assert(histogramResults === expectedHistogramResults)
     assert(histogramResults2 === expectedHistogramResults)
   }
+
   test("WorksWithOutOfRangeWithOneBucket") {
     // Verify that if all of the elements are out of range the counts are zero
     val rdd = sc.parallelize(Seq(10.01, -0.01))
@@ -49,6 +50,7 @@ class DoubleRDDSuite extends FunSuite with SharedSparkContext {
     assert(histogramResults === expectedHistogramResults)
     assert(histogramResults2 === expectedHistogramResults)
   }
+
   test("WorksInRangeWithOneBucket") {
     // Verify the basic case of one bucket and all elements in that bucket works
     val rdd = sc.parallelize(Seq(1, 2, 3, 4))
@@ -59,6 +61,7 @@ class DoubleRDDSuite extends FunSuite with SharedSparkContext {
     assert(histogramResults === expectedHistogramResults)
     assert(histogramResults2 === expectedHistogramResults)
   }
+
   test("WorksInRangeWithOneBucketExactMatch") {
     // Verify the basic case of one bucket and all elements in that bucket works
     val rdd = sc.parallelize(Seq(1, 2, 3, 4))
@@ -69,6 +72,7 @@ class DoubleRDDSuite extends FunSuite with SharedSparkContext {
     assert(histogramResults === expectedHistogramResults)
     assert(histogramResults2 === expectedHistogramResults)
   }
+
   test("WorksWithOutOfRangeWithTwoBuckets") {
     // Verify that out of range works with two buckets
     val rdd = sc.parallelize(Seq(10.01, -0.01))
@@ -79,6 +83,7 @@ class DoubleRDDSuite extends FunSuite with SharedSparkContext {
     assert(histogramResults === expectedHistogramResults)
     assert(histogramResults2 === expectedHistogramResults)
   }
+
   test("WorksWithOutOfRangeWithTwoUnEvenBuckets") {
     // Verify that out of range works with two un even buckets
     val rdd = sc.parallelize(Seq(10.01, -0.01))
@@ -87,6 +92,7 @@ class DoubleRDDSuite extends FunSuite with SharedSparkContext {
     val expectedHistogramResults = Array(0, 0)
     assert(histogramResults === expectedHistogramResults)
   }
+
   test("WorksInRangeWithTwoBuckets") {
     // Make sure that it works with two equally spaced buckets and elements in each
     val rdd = sc.parallelize(Seq(1, 2, 3, 5, 6))
@@ -97,6 +103,7 @@ class DoubleRDDSuite extends FunSuite with SharedSparkContext {
     assert(histogramResults === expectedHistogramResults)
     assert(histogramResults2 === expectedHistogramResults)
   }
+
   test("WorksInRangeWithTwoBucketsAndNaN") {
     // Make sure that it works with two equally spaced buckets and elements in each
     val rdd = sc.parallelize(Seq(1, 2, 3, 5, 6, Double.NaN))
@@ -107,6 +114,7 @@ class DoubleRDDSuite extends FunSuite with SharedSparkContext {
     assert(histogramResults === expectedHistogramResults)
     assert(histogramResults2 === expectedHistogramResults)
   }
+
   test("WorksInRangeWithTwoUnevenBuckets") {
     // Make sure that it works with two unequally spaced buckets and elements in each
     val rdd = sc.parallelize(Seq(1, 2, 3, 5, 6))
@@ -115,6 +123,7 @@ class DoubleRDDSuite extends FunSuite with SharedSparkContext {
     val expectedHistogramResults = Array(3, 2)
     assert(histogramResults === expectedHistogramResults)
   }
+
   test("WorksMixedRangeWithTwoUnevenBuckets") {
     // Make sure that it works with two unequally spaced buckets and elements in each
     val rdd = sc.parallelize(Seq(-0.01, 0.0, 1, 2, 3, 5, 6, 11.0, 11.01))
@@ -123,6 +132,7 @@ class DoubleRDDSuite extends FunSuite with SharedSparkContext {
     val expectedHistogramResults = Array(4, 3)
     assert(histogramResults === expectedHistogramResults)
   }
+
   test("WorksMixedRangeWithFourUnevenBuckets") {
     // Make sure that it works with two unequally spaced buckets and elements in each
     val rdd = sc.parallelize(Seq(-0.01, 0.0, 1, 2, 3, 5, 6, 11.01, 12.0, 199.0,
@@ -132,6 +142,7 @@ class DoubleRDDSuite extends FunSuite with SharedSparkContext {
     val expectedHistogramResults = Array(4, 2, 1, 3)
     assert(histogramResults === expectedHistogramResults)
   }
+
   test("WorksMixedRangeWithUnevenBucketsAndNaN") {
     // Make sure that it works with two unequally spaced buckets and elements in each
     val rdd = sc.parallelize(Seq(-0.01, 0.0, 1, 2, 3, 5, 6, 11.01, 12.0, 199.0,
@@ -161,6 +172,7 @@ class DoubleRDDSuite extends FunSuite with SharedSparkContext {
     val expectedHistogramResults = Array(4, 2, 1, 2, 4)
     assert(histogramResults === expectedHistogramResults)
   }
+
   test("WorksWithOutOfRangeWithInfiniteBuckets") {
     // Verify that out of range works with two buckets
     val rdd = sc.parallelize(Seq(10.01, -0.01, Double.NaN))
@@ -214,6 +226,7 @@ class DoubleRDDSuite extends FunSuite with SharedSparkContext {
     assert(histogramResults === expectedHistogramResults)
     assert(histogramBuckets === expectedHistogramBuckets)
   }
+
   test("WorksWithoutBucketsBasicTwo") {
     // Verify the basic case of one bucket and all elements in that bucket works
     val rdd = sc.parallelize(Seq(1, 2, 3, 4))
@@ -223,6 +236,7 @@ class DoubleRDDSuite extends FunSuite with SharedSparkContext {
     assert(histogramResults === expectedHistogramResults)
     assert(histogramBuckets === expectedHistogramBuckets)
   }
+
   test("WorksWithoutBucketsWithMoreRequestedThanElements") {
     // Verify the basic case of one bucket and all elements in that bucket works
     val rdd = sc.parallelize(Seq(1, 2))


[3/9] git commit: CR feedback

Posted by ma...@apache.org.
CR feedback


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/699f7d28
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/699f7d28
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/699f7d28

Branch: refs/heads/master
Commit: 699f7d28c0347cb516fa17f94b53d7bc50f18346
Parents: e58c69d
Author: Holden Karau <ho...@pigscanfly.ca>
Authored: Mon Oct 21 00:10:03 2013 -0700
Committer: Holden Karau <ho...@pigscanfly.ca>
Committed: Mon Oct 21 00:10:03 2013 -0700

----------------------------------------------------------------------
 .../apache/spark/api/java/JavaDoubleRDD.scala   |  18 ++-
 .../apache/spark/rdd/DoubleRDDFunctions.scala   |  68 ++++-----
 .../org/apache/spark/rdd/DoubleRDDSuite.scala   | 140 +++++++++++--------
 3 files changed, 125 insertions(+), 101 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/699f7d28/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala
index d2a2818..b002468 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala
@@ -167,12 +167,13 @@ class JavaDoubleRDD(val srdd: RDD[scala.Double]) extends JavaRDDLike[Double, Jav
    *  value is 0 and the max is 100 and there are two buckets the resulting
    *  buckets will be [0,50) [50,100]. bucketCount must be at least 1
    * If the RDD contains infinity, NaN throws an exception
-   * If the elements in RDD do not vary (max == min) throws an exception
+   * If the elements in RDD do not vary (max == min) always returns a single bucket.
    */
   def histogram(bucketCount: Int): Pair[Array[Double], Array[Long]] = {
     val result = srdd.histogram(bucketCount)
     (result._1.map(scala.Double.box(_)), result._2)
   }
+
   /**
    * Compute a histogram using the provided buckets. The buckets are all open
    * to the left except for the last which is closed
@@ -181,14 +182,21 @@ class JavaDoubleRDD(val srdd: RDD[scala.Double]) extends JavaRDDLike[Double, Jav
    *  e.g 1<=x<10 , 10<=x<20, 20<=x<50
    *  And on the input of 1 and 50 we would have a histogram of 1,0,0 
    * 
-   * Note: if your histogram is evenly spaced (e.g. [0,10,20,30]) this switches
-   * from an O(log n) inseration to O(1) per element. (where n = # buckets)
+   * Note: if your histogram is evenly spaced (e.g. [0, 10, 20, 30]) this can be switched
+   * from an O(log n) inseration to O(1) per element. (where n = # buckets) if you set evenBuckets
+   * to true.
    * buckets must be sorted and not contain any duplicates.
    * buckets array must be at least two elements 
-   * All NaN entries are treated the same.
+   * All NaN entries are treated the same. If you have a NaN bucket it must be
+   * the maximum value of the last position and all NaN entries will be counted
+   * in that bucket.
    */
   def histogram(buckets: Array[Double]): Array[Long] = {
-    srdd.histogram(buckets.map(_.toDouble))
+    srdd.histogram(buckets.map(_.toDouble), false)
+  }
+
+  def histogram(buckets: Array[Double], evenBuckets: Boolean): Array[Long] = {
+    srdd.histogram(buckets.map(_.toDouble), evenBuckets)
   }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/699f7d28/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala
index 776a83c..33738ee 100644
--- a/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala
@@ -83,44 +83,50 @@ class DoubleRDDFunctions(self: RDD[Double]) extends Logging with Serializable {
    * Compute a histogram of the data using bucketCount number of buckets evenly
    *  spaced between the minimum and maximum of the RDD. For example if the min
    *  value is 0 and the max is 100 and there are two buckets the resulting
-   *  buckets will be [0,50) [50,100]. bucketCount must be at least 1
+   *  buckets will be [0, 50) [50, 100]. bucketCount must be at least 1
    * If the RDD contains infinity, NaN throws an exception
-   * If the elements in RDD do not vary (max == min) throws an exception
+   * If the elements in RDD do not vary (max == min) always returns a single bucket.
    */
   def histogram(bucketCount: Int): Pair[Array[Double], Array[Long]] = {
     // Compute the minimum and the maxium
     val (max: Double, min: Double) = self.mapPartitions { items =>
       Iterator(items.foldRight(-1/0.0, Double.NaN)((e: Double, x: Pair[Double, Double]) =>
-        (x._1.max(e),x._2.min(e))))
+        (x._1.max(e), x._2.min(e))))
     }.reduce { (maxmin1, maxmin2) =>
       (maxmin1._1.max(maxmin2._1), maxmin1._2.min(maxmin2._2))
     }
     if (max.isNaN() || max.isInfinity || min.isInfinity ) {
-      throw new UnsupportedOperationException("Histogram on either an empty RDD or RDD containing +-infinity or NaN")
-    }
-    if (max == min) {
-      throw new UnsupportedOperationException("Histogram with no range in elements")
+      throw new UnsupportedOperationException(
+        "Histogram on either an empty RDD or RDD containing +/-infinity or NaN")
     }
     val increment: Double = (max-min)/bucketCount.toDouble
-    val range = Range.Double.inclusive(min, max, increment)
+    val range = if (increment != 0) {
+      Range.Double.inclusive(min, max, increment)
+    } else {
+      List(min, min)
+    }
     val buckets: Array[Double] = range.toArray
-    (buckets,histogram(buckets))
+    (buckets, histogram(buckets, true))
   }
+
   /**
    * Compute a histogram using the provided buckets. The buckets are all open
    * to the left except for the last which is closed
    *  e.g. for the array
-   *  [1,10,20,50] the buckets are [1,10) [10,20) [20,50]
+   *  [1, 10, 20, 50] the buckets are [1, 10) [10, 20) [20, 50]
    *  e.g 1<=x<10 , 10<=x<20, 20<=x<50
-   *  And on the input of 1 and 50 we would have a histogram of 1,0,0 
+   *  And on the input of 1 and 50 we would have a histogram of 1, 0, 0 
    * 
-   * Note: if your histogram is evenly spaced (e.g. [0,10,20,30]) this switches
-   * from an O(log n) inseration to O(1) per element. (where n = # buckets)
+   * Note: if your histogram is evenly spaced (e.g. [0, 10, 20, 30]) this can be switched
+   * from an O(log n) inseration to O(1) per element. (where n = # buckets) if you set evenBuckets
+   * to true.
    * buckets must be sorted and not contain any duplicates.
    * buckets array must be at least two elements 
-   * All NaN entries are treated the same.
+   * All NaN entries are treated the same. If you have a NaN bucket it must be
+   * the maximum value of the last position and all NaN entries will be counted
+   * in that bucket.
    */
-  def histogram(buckets: Array[Double]): Array[Long] = {
+  def histogram(buckets: Array[Double], evenBuckets: Boolean = false): Array[Long] = {
     if (buckets.length < 2) {
       throw new IllegalArgumentException("buckets array must have at least two elements")
     }
@@ -129,11 +135,12 @@ class DoubleRDDFunctions(self: RDD[Double]) extends Logging with Serializable {
     // to increment or returns None if there is no bucket. This is done so we can
     // specialize for uniformly distributed buckets and save the O(log n) binary
     // search cost.
-    def histogramPartition(bucketFunction: (Double) => Option[Int])(iter: Iterator[Double]): Iterator[Array[Long]] = {
-      val counters = new Array[Long](buckets.length-1)
+    def histogramPartition(bucketFunction: (Double) => Option[Int])(iter: Iterator[Double]):
+        Iterator[Array[Long]] = {
+      val counters = new Array[Long](buckets.length - 1)
       while (iter.hasNext) {
         bucketFunction(iter.next()) match {
-          case Some(x: Int) => {counters(x)+=1}
+          case Some(x: Int) => {counters(x) += 1}
           case _ => {}
         }
       }
@@ -161,12 +168,12 @@ class DoubleRDDFunctions(self: RDD[Double]) extends Logging with Serializable {
         } else {
           None
         }
-      } else if (location < buckets.length-1) {
+      } else if (location < buckets.length - 1) {
         // Exact match, just insert here
         Some(location)
       } else {
         // Exact match to the last element
-        Some(location-1)
+        Some(location - 1)
       }
     }
     // Determine the bucket function in constant time. Requires that buckets are evenly spaced
@@ -175,34 +182,19 @@ class DoubleRDDFunctions(self: RDD[Double]) extends Logging with Serializable {
       if (e.isNaN()) {
         return None
       }
-      val bucketNumber = (e-min)/(increment)
+      val bucketNumber = (e - min)/(increment)
       // We do this rather than buckets.lengthCompare(bucketNumber)
       // because Array[Double] fails to override it (for now).
       if (bucketNumber > count || bucketNumber < 0) {
         None
       } else {
-        Some(bucketNumber.toInt.min(count-1))
-      }
-    }
-    def evenlySpaced(buckets: Array[Double]): Boolean = {
-      val delta = buckets(1)-buckets(0)
-      // Technically you could have an evenly spaced bucket with NaN
-      // increments but then its a single bucket and this makes the
-      // fastBucketFunction simpler.
-      if (delta.isNaN() || delta.isInfinite()) {
-        return false
-      }
-      for (i <- 1 to buckets.length-1) {
-        if (buckets(i)-buckets(i-1) != delta) {
-          return false
-        }
+        Some(bucketNumber.toInt.min(count - 1))
       }
-      true
     }
     // Decide which bucket function to pass to histogramPartition. We decide here
     // rather than having a general function so that the decission need only be made
     // once rather than once per shard
-    val bucketFunction = if (evenlySpaced(buckets)) {
+    val bucketFunction = if (evenBuckets) {
       fastBucketFunction(buckets(0), buckets(1)-buckets(0), buckets.length-1) _
     } else {
       basicBucketFunction _

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/699f7d28/core/src/test/scala/org/apache/spark/rdd/DoubleRDDSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/rdd/DoubleRDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/DoubleRDDSuite.scala
index 2ec7173..0710844 100644
--- a/core/src/test/scala/org/apache/spark/rdd/DoubleRDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/DoubleRDDSuite.scala
@@ -34,134 +34,151 @@ class DoubleRDDSuite extends FunSuite with SharedSparkContext {
     val rdd: RDD[Double] = sc.parallelize(Seq())
     val buckets: Array[Double] = Array(0.0, 10.0)
     val histogramResults: Array[Long] = rdd.histogram(buckets)
+    val histogramResults2: Array[Long] = rdd.histogram(buckets, true)
     val expectedHistogramResults: Array[Long] = Array(0)
     assert(histogramResults === expectedHistogramResults)
+    assert(histogramResults2 === expectedHistogramResults)
   }
   test("WorksWithOutOfRangeWithOneBucket") {
     // Verify that if all of the elements are out of range the counts are zero
-    val rdd: RDD[Double] = sc.parallelize(Seq(10.01,-0.01))
+    val rdd: RDD[Double] = sc.parallelize(Seq(10.01, -0.01))
     val buckets: Array[Double] = Array(0.0, 10.0)
     val histogramResults: Array[Long] = rdd.histogram(buckets)
+    val histogramResults2: Array[Long] = rdd.histogram(buckets, true)
     val expectedHistogramResults: Array[Long] = Array(0)
     assert(histogramResults === expectedHistogramResults)
+    assert(histogramResults2 === expectedHistogramResults)
   }
   test("WorksInRangeWithOneBucket") {
     // Verify the basic case of one bucket and all elements in that bucket works
-    val rdd: RDD[Double] = sc.parallelize(Seq(1,2,3,4))
+    val rdd: RDD[Double] = sc.parallelize(Seq(1, 2, 3, 4))
     val buckets: Array[Double] = Array(0.0, 10.0)
     val histogramResults: Array[Long] = rdd.histogram(buckets)
+    val histogramResults2: Array[Long] = rdd.histogram(buckets, true)
     val expectedHistogramResults: Array[Long] = Array(4)
     assert(histogramResults === expectedHistogramResults)
+    assert(histogramResults2 === expectedHistogramResults)
   }
   test("WorksInRangeWithOneBucketExactMatch") {
     // Verify the basic case of one bucket and all elements in that bucket works
-    val rdd: RDD[Double] = sc.parallelize(Seq(1,2,3,4))
+    val rdd: RDD[Double] = sc.parallelize(Seq(1, 2, 3, 4))
     val buckets: Array[Double] = Array(1.0, 4.0)
     val histogramResults: Array[Long] = rdd.histogram(buckets)
+    val histogramResults2: Array[Long] = rdd.histogram(buckets, true)
     val expectedHistogramResults: Array[Long] = Array(4)
     assert(histogramResults === expectedHistogramResults)
+    assert(histogramResults2 === expectedHistogramResults)
   }
   test("WorksWithOutOfRangeWithTwoBuckets") {
     // Verify that out of range works with two buckets
-    val rdd: RDD[Double] = sc.parallelize(Seq(10.01,-0.01))
+    val rdd: RDD[Double] = sc.parallelize(Seq(10.01, -0.01))
     val buckets: Array[Double] = Array(0.0, 5.0, 10.0)
     val histogramResults: Array[Long] = rdd.histogram(buckets)
-    val expectedHistogramResults: Array[Long] = Array(0,0)
+    val histogramResults2: Array[Long] = rdd.histogram(buckets, true)
+    val expectedHistogramResults: Array[Long] = Array(0, 0)
     assert(histogramResults === expectedHistogramResults)
+    assert(histogramResults2 === expectedHistogramResults)
   }
   test("WorksWithOutOfRangeWithTwoUnEvenBuckets") {
     // Verify that out of range works with two un even buckets
-    val rdd: RDD[Double] = sc.parallelize(Seq(10.01,-0.01))
+    val rdd: RDD[Double] = sc.parallelize(Seq(10.01, -0.01))
     val buckets: Array[Double] = Array(0.0, 4.0, 10.0)
     val histogramResults: Array[Long] = rdd.histogram(buckets)
-    val expectedHistogramResults: Array[Long] = Array(0,0)
+    val expectedHistogramResults: Array[Long] = Array(0, 0)
     assert(histogramResults === expectedHistogramResults)
   }
   test("WorksInRangeWithTwoBuckets") {
     // Make sure that it works with two equally spaced buckets and elements in each
-    val rdd: RDD[Double] = sc.parallelize(Seq(1,2,3,5,6))
+    val rdd: RDD[Double] = sc.parallelize(Seq(1, 2, 3, 5, 6))
     val buckets: Array[Double] = Array(0.0, 5.0, 10.0)
     val histogramResults: Array[Long] = rdd.histogram(buckets)
-    val expectedHistogramResults: Array[Long] = Array(3,2)
+    val histogramResults2: Array[Long] = rdd.histogram(buckets, true)
+    val expectedHistogramResults: Array[Long] = Array(3, 2)
     assert(histogramResults === expectedHistogramResults)
+    assert(histogramResults2 === expectedHistogramResults)
   }
   test("WorksInRangeWithTwoBucketsAndNaN") {
     // Make sure that it works with two equally spaced buckets and elements in each
-    val rdd: RDD[Double] = sc.parallelize(Seq(1,2,3,5,6,Double.NaN))
+    val rdd: RDD[Double] = sc.parallelize(Seq(1, 2, 3, 5, 6, Double.NaN))
     val buckets: Array[Double] = Array(0.0, 5.0, 10.0)
     val histogramResults: Array[Long] = rdd.histogram(buckets)
-    val expectedHistogramResults: Array[Long] = Array(3,2)
+    val histogramResults2: Array[Long] = rdd.histogram(buckets, true)
+    val expectedHistogramResults: Array[Long] = Array(3, 2)
     assert(histogramResults === expectedHistogramResults)
+    assert(histogramResults2 === expectedHistogramResults)
   }
   test("WorksInRangeWithTwoUnevenBuckets") {
     // Make sure that it works with two unequally spaced buckets and elements in each
-    val rdd: RDD[Double] = sc.parallelize(Seq(1,2,3,5,6))
+    val rdd: RDD[Double] = sc.parallelize(Seq(1, 2, 3, 5, 6))
     val buckets: Array[Double] = Array(0.0, 5.0, 11.0)
     val histogramResults: Array[Long] = rdd.histogram(buckets)
-    val expectedHistogramResults: Array[Long] = Array(3,2)
+    val expectedHistogramResults: Array[Long] = Array(3, 2)
     assert(histogramResults === expectedHistogramResults)
   }
   test("WorksMixedRangeWithTwoUnevenBuckets") {
     // Make sure that it works with two unequally spaced buckets and elements in each
-    val rdd: RDD[Double] = sc.parallelize(Seq(-0.01,0.0,1,2,3,5,6,11.0,11.01))
+    val rdd: RDD[Double] = sc.parallelize(Seq(-0.01, 0.0, 1, 2, 3, 5, 6, 11.0, 11.01))
     val buckets: Array[Double] = Array(0.0, 5.0, 11.0)
     val histogramResults: Array[Long] = rdd.histogram(buckets)
-    val expectedHistogramResults: Array[Long] = Array(4,3)
+    val expectedHistogramResults: Array[Long] = Array(4, 3)
     assert(histogramResults === expectedHistogramResults)
   }
   test("WorksMixedRangeWithFourUnevenBuckets") {
     // Make sure that it works with two unequally spaced buckets and elements in each
-    val rdd: RDD[Double] = sc.parallelize(Seq(-0.01,0.0,1,2,3,5,6,11.01,12.0,199.0,200.0,200.1))
-    val buckets: Array[Double] = Array(0.0, 5.0, 11.0,12.0,200.0)
+    val rdd: RDD[Double] = sc.parallelize(Seq(-0.01, 0.0, 1, 2, 3, 5, 6, 11.01, 12.0, 199.0,
+      200.0, 200.1))
+    val buckets: Array[Double] = Array(0.0, 5.0, 11.0, 12.0, 200.0)
     val histogramResults: Array[Long] = rdd.histogram(buckets)
-    val expectedHistogramResults: Array[Long] = Array(4,2,1,3)
+    val expectedHistogramResults: Array[Long] = Array(4, 2, 1, 3)
     assert(histogramResults === expectedHistogramResults)
   }
   test("WorksMixedRangeWithUnevenBucketsAndNaN") {
     // Make sure that it works with two unequally spaced buckets and elements in each
-    val rdd: RDD[Double] = sc.parallelize(Seq(-0.01,0.0,1,2,3,5,6,11.01,12.0,199.0,200.0,200.1,Double.NaN))
-    val buckets: Array[Double] = Array(0.0, 5.0, 11.0,12.0,200.0)
+    val rdd: RDD[Double] = sc.parallelize(Seq(-0.01, 0.0, 1, 2, 3, 5, 6, 11.01, 12.0, 199.0,
+      200.0, 200.1, Double.NaN))
+    val buckets: Array[Double] = Array(0.0, 5.0, 11.0, 12.0, 200.0)
     val histogramResults: Array[Long] = rdd.histogram(buckets)
-    val expectedHistogramResults: Array[Long] = Array(4,2,1,3)
+    val expectedHistogramResults: Array[Long] = Array(4, 2, 1, 3)
     assert(histogramResults === expectedHistogramResults)
   }
   // Make sure this works with a NaN end bucket
   test("WorksMixedRangeWithUnevenBucketsAndNaNAndNaNRange") {
     // Make sure that it works with two unequally spaced buckets and elements in each
-    val rdd: RDD[Double] = sc.parallelize(Seq(-0.01,0.0,1,2,3,5,6,11.01,12.0,199.0,200.0,200.1,Double.NaN))
-    val buckets: Array[Double] = Array(0.0, 5.0, 11.0,12.0,200.0,Double.NaN)
+    val rdd: RDD[Double] = sc.parallelize(Seq(-0.01, 0.0, 1, 2, 3, 5, 6, 11.01, 12.0, 199.0,
+      200.0, 200.1, Double.NaN))
+    val buckets: Array[Double] = Array(0.0, 5.0, 11.0, 12.0, 200.0, Double.NaN)
     val histogramResults: Array[Long] = rdd.histogram(buckets)
-    val expectedHistogramResults: Array[Long] = Array(4,2,1,2,3)
+    val expectedHistogramResults: Array[Long] = Array(4, 2, 1, 2, 3)
     assert(histogramResults === expectedHistogramResults)
   }
   // Make sure this works with a NaN end bucket and an inifity
   test("WorksMixedRangeWithUnevenBucketsAndNaNAndNaNRangeAndInfity") {
     // Make sure that it works with two unequally spaced buckets and elements in each
-    val rdd: RDD[Double] = sc.parallelize(Seq(-0.01,0.0,1,2,3,5,6,11.01,12.0,199.0,200.0,200.1,1.0/0.0,-1.0/0.0,Double.NaN))
-    val buckets: Array[Double] = Array(0.0, 5.0, 11.0,12.0,200.0,Double.NaN)
+    val rdd: RDD[Double] = sc.parallelize(Seq(-0.01, 0.0, 1, 2, 3, 5, 6, 11.01, 12.0, 199.0,
+      200.0, 200.1, 1.0/0.0, -1.0/0.0, Double.NaN))
+    val buckets: Array[Double] = Array(0.0, 5.0, 11.0, 12.0, 200.0, Double.NaN)
     val histogramResults: Array[Long] = rdd.histogram(buckets)
-    val expectedHistogramResults: Array[Long] = Array(4,2,1,2,4)
+    val expectedHistogramResults: Array[Long] = Array(4, 2, 1, 2, 4)
     assert(histogramResults === expectedHistogramResults)
   }
   test("WorksWithOutOfRangeWithInfiniteBuckets") {
     // Verify that out of range works with two buckets
-    val rdd: RDD[Double] = sc.parallelize(Seq(10.01,-0.01,Double.NaN))
-    val buckets: Array[Double] = Array(-1.0/0.0 ,0.0, 1.0/0.0)
+    val rdd: RDD[Double] = sc.parallelize(Seq(10.01, -0.01, Double.NaN))
+    val buckets: Array[Double] = Array(-1.0/0.0 , 0.0, 1.0/0.0)
     val histogramResults: Array[Long] = rdd.histogram(buckets)
-    val expectedHistogramResults: Array[Long] = Array(1,1)
+    val expectedHistogramResults: Array[Long] = Array(1, 1)
     assert(histogramResults === expectedHistogramResults)
   }
   // Test the failure mode with an invalid bucket array
   test("ThrowsExceptionOnInvalidBucketArray") {
     val rdd: RDD[Double] = sc.parallelize(Seq(1.0))
     // Empty array
-    intercept[IllegalArgumentException]{
+    intercept[IllegalArgumentException] {
       val buckets: Array[Double] = Array.empty[Double]
       val result = rdd.histogram(buckets)
     }
     // Single element array
-    intercept[IllegalArgumentException]
-    {
+    intercept[IllegalArgumentException] {
       val buckets: Array[Double] = Array(1.0)
       val result = rdd.histogram(buckets)
     }
@@ -170,25 +187,45 @@ class DoubleRDDSuite extends FunSuite with SharedSparkContext {
   // Test automatic histogram function
   test("WorksWithoutBucketsBasic") {
     // Verify the basic case of one bucket and all elements in that bucket works
-    val rdd: RDD[Double] = sc.parallelize(Seq(1,2,3,4))
+    val rdd: RDD[Double] = sc.parallelize(Seq(1, 2, 3, 4))
     val (histogramBuckets, histogramResults) = rdd.histogram(1)
     val expectedHistogramResults: Array[Long] = Array(4)
-    val expectedHistogramBuckets: Array[Double] = Array(1.0,4.0)
+    val expectedHistogramBuckets: Array[Double] = Array(1.0, 4.0)
+    assert(histogramResults === expectedHistogramResults)
+    assert(histogramBuckets === expectedHistogramBuckets)
+  }
+  // Test automatic histogram function with a single element
+  test("WorksWithoutBucketsBasicSingleElement") {
+    // Verify the basic case of one bucket and all elements in that bucket works
+    val rdd: RDD[Double] = sc.parallelize(Seq(1))
+    val (histogramBuckets, histogramResults) = rdd.histogram(1)
+    val expectedHistogramResults: Array[Long] = Array(1)
+    val expectedHistogramBuckets: Array[Double] = Array(1.0, 1.0)
+    assert(histogramResults === expectedHistogramResults)
+    assert(histogramBuckets === expectedHistogramBuckets)
+  }
+  // Test automatic histogram function with a single element
+  test("WorksWithoutBucketsBasicNoRange") {
+    // Verify the basic case of one bucket and all elements in that bucket works
+    val rdd: RDD[Double] = sc.parallelize(Seq(1, 1, 1, 1))
+    val (histogramBuckets, histogramResults) = rdd.histogram(1)
+    val expectedHistogramResults: Array[Long] = Array(4)
+    val expectedHistogramBuckets: Array[Double] = Array(1.0, 1.0)
     assert(histogramResults === expectedHistogramResults)
     assert(histogramBuckets === expectedHistogramBuckets)
   }
   test("WorksWithoutBucketsBasicTwo") {
     // Verify the basic case of one bucket and all elements in that bucket works
-    val rdd: RDD[Double] = sc.parallelize(Seq(1,2,3,4))
+    val rdd: RDD[Double] = sc.parallelize(Seq(1, 2, 3, 4))
     val (histogramBuckets, histogramResults) = rdd.histogram(2)
-    val expectedHistogramResults: Array[Long] = Array(2,2)
-    val expectedHistogramBuckets: Array[Double] = Array(1.0,2.5,4.0)
+    val expectedHistogramResults: Array[Long] = Array(2, 2)
+    val expectedHistogramBuckets: Array[Double] = Array(1.0, 2.5, 4.0)
     assert(histogramResults === expectedHistogramResults)
     assert(histogramBuckets === expectedHistogramBuckets)
   }
   test("WorksWithoutBucketsWithMoreRequestedThanElements") {
     // Verify the basic case of one bucket and all elements in that bucket works
-    val rdd: RDD[Double] = sc.parallelize(Seq(1,2))
+    val rdd: RDD[Double] = sc.parallelize(Seq(1, 2))
     val (histogramBuckets, histogramResults) = rdd.histogram(10)
     val expectedHistogramResults: Array[Long] =
       Array(1, 0, 0, 0, 0, 0, 0, 0, 0, 1)
@@ -197,37 +234,24 @@ class DoubleRDDSuite extends FunSuite with SharedSparkContext {
     assert(histogramResults === expectedHistogramResults)
     assert(histogramBuckets === expectedHistogramBuckets)
   }
+
   // Test the failure mode with an invalid RDD
   test("ThrowsExceptionOnInvalidRDDs") {
     // infinity
-    intercept[UnsupportedOperationException]{
-      val rdd: RDD[Double] = sc.parallelize(Seq(1,1.0/0.0))
+    intercept[UnsupportedOperationException] {
+      val rdd: RDD[Double] = sc.parallelize(Seq(1, 1.0/0.0))
       val result = rdd.histogram(1)
     }
     // NaN
-    intercept[UnsupportedOperationException]
-    {
-      val rdd: RDD[Double] = sc.parallelize(Seq(1,Double.NaN))
+    intercept[UnsupportedOperationException] {
+      val rdd: RDD[Double] = sc.parallelize(Seq(1, Double.NaN))
       val result = rdd.histogram(1)
     }
     // Empty
-    intercept[UnsupportedOperationException]
-    {
+    intercept[UnsupportedOperationException] {
       val rdd: RDD[Double] = sc.parallelize(Seq())
       val result = rdd.histogram(1)
     }
-    // Single element
-    intercept[UnsupportedOperationException]
-    {
-      val rdd: RDD[Double] = sc.parallelize(Seq(1))
-      val result = rdd.histogram(1)
-    }
-    // No Range
-    intercept[UnsupportedOperationException]
-    {
-      val rdd: RDD[Double] = sc.parallelize(Seq(1,1,1))
-      val result = rdd.histogram(1)
-    }
   }
 
 }


[9/9] git commit: Merge pull request #86 from holdenk/master

Posted by ma...@apache.org.
Merge pull request #86 from holdenk/master

Add histogram functionality to DoubleRDDFunctions

This pull request add histogram functionality to the DoubleRDDFunctions.


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/18d6df0e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/18d6df0e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/18d6df0e

Branch: refs/heads/master
Commit: 18d6df0e171454ada4d260bfe8b909eedf25304f
Parents: 0e2109d 7222ee2
Author: Matei Zaharia <ma...@eecs.berkeley.edu>
Authored: Tue Nov 26 00:00:07 2013 -0800
Committer: Matei Zaharia <ma...@eecs.berkeley.edu>
Committed: Tue Nov 26 00:00:07 2013 -0800

----------------------------------------------------------------------
 .../apache/spark/api/java/JavaDoubleRDD.scala   |  40 +++
 .../apache/spark/rdd/DoubleRDDFunctions.scala   | 126 +++++++++
 .../scala/org/apache/spark/JavaAPISuite.java    |  14 +
 .../org/apache/spark/rdd/DoubleRDDSuite.scala   | 271 +++++++++++++++++++
 4 files changed, 451 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/18d6df0e/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/18d6df0e/core/src/test/scala/org/apache/spark/JavaAPISuite.java
----------------------------------------------------------------------


[6/9] git commit: Remove explicit boxing

Posted by ma...@apache.org.
Remove explicit boxing


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/7de180fd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/7de180fd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/7de180fd

Branch: refs/heads/master
Commit: 7de180fd13fda2e5d4486dfca9e2a9997ec7f4d0
Parents: 20b33bc
Author: Holden Karau <ho...@pigscanfly.ca>
Authored: Mon Nov 18 20:05:05 2013 -0800
Committer: Holden Karau <ho...@pigscanfly.ca>
Committed: Mon Nov 18 20:05:05 2013 -0800

----------------------------------------------------------------------
 .../src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/7de180fd/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala
index b002468..70f7f01 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala
@@ -169,9 +169,9 @@ class JavaDoubleRDD(val srdd: RDD[scala.Double]) extends JavaRDDLike[Double, Jav
    * If the RDD contains infinity, NaN throws an exception
    * If the elements in RDD do not vary (max == min) always returns a single bucket.
    */
-  def histogram(bucketCount: Int): Pair[Array[Double], Array[Long]] = {
+  def histogram(bucketCount: Int): Pair[Array[scala.Double], Array[Long]] = {
     val result = srdd.histogram(bucketCount)
-    (result._1.map(scala.Double.box(_)), result._2)
+    (result._1, result._2)
   }
 
   /**