You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by an...@apache.org on 2015/09/16 02:01:45 UTC
spark git commit: [SPARK-10575] [SPARK CORE] Wrapped RDD.takeSample with Scope
Repository: spark
Updated Branches:
refs/heads/master a63cdc769 -> 99ecfa594
[SPARK-10575] [SPARK CORE] Wrapped RDD.takeSample with Scope
Remove return statements in RDD.takeSample and wrap it withScope
Author: vinodkc <vi...@gmail.com>
Author: vinodkc <vi...@users.noreply.github.com>
Author: Vinod K C <vi...@huawei.com>
Closes #8730 from vinodkc/fix_takesample_return.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/99ecfa59
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/99ecfa59
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/99ecfa59
Branch: refs/heads/master
Commit: 99ecfa5945aedaa71765ecf5cce59964ae52eebe
Parents: a63cdc7
Author: vinodkc <vi...@gmail.com>
Authored: Tue Sep 15 17:01:10 2015 -0700
Committer: Andrew Or <an...@databricks.com>
Committed: Tue Sep 15 17:01:39 2015 -0700
----------------------------------------------------------------------
.../main/scala/org/apache/spark/rdd/RDD.scala | 68 +++++++++-----------
1 file changed, 31 insertions(+), 37 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/99ecfa59/core/src/main/scala/org/apache/spark/rdd/RDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 7dd2bc5..a56e542 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -469,50 +469,44 @@ abstract class RDD[T: ClassTag](
* @param seed seed for the random number generator
* @return sample of specified size in an array
*/
- // TODO: rewrite this without return statements so we can wrap it in a scope
def takeSample(
withReplacement: Boolean,
num: Int,
- seed: Long = Utils.random.nextLong): Array[T] = {
+ seed: Long = Utils.random.nextLong): Array[T] = withScope {
val numStDev = 10.0
- if (num < 0) {
- throw new IllegalArgumentException("Negative number of elements requested")
- } else if (num == 0) {
- return new Array[T](0)
- }
-
- val initialCount = this.count()
- if (initialCount == 0) {
- return new Array[T](0)
- }
-
- val maxSampleSize = Int.MaxValue - (numStDev * math.sqrt(Int.MaxValue)).toInt
- if (num > maxSampleSize) {
- throw new IllegalArgumentException("Cannot support a sample size > Int.MaxValue - " +
- s"$numStDev * math.sqrt(Int.MaxValue)")
- }
-
- val rand = new Random(seed)
- if (!withReplacement && num >= initialCount) {
- return Utils.randomizeInPlace(this.collect(), rand)
- }
-
- val fraction = SamplingUtils.computeFractionForSampleSize(num, initialCount,
- withReplacement)
-
- var samples = this.sample(withReplacement, fraction, rand.nextInt()).collect()
+ require(num >= 0, "Negative number of elements requested")
+ require(num <= (Int.MaxValue - (numStDev * math.sqrt(Int.MaxValue)).toInt),
+ "Cannot support a sample size > Int.MaxValue - " +
+ s"$numStDev * math.sqrt(Int.MaxValue)")
- // If the first sample didn't turn out large enough, keep trying to take samples;
- // this shouldn't happen often because we use a big multiplier for the initial size
- var numIters = 0
- while (samples.length < num) {
- logWarning(s"Needed to re-sample due to insufficient sample size. Repeat #$numIters")
- samples = this.sample(withReplacement, fraction, rand.nextInt()).collect()
- numIters += 1
+ if (num == 0) { // nested if/else instead of early `return`s: the body runs inside withScope
+ new Array[T](0)
+ } else {
+ val initialCount = this.count()
+ if (initialCount == 0) {
+ new Array[T](0)
+ } else {
+ val rand = new Random(seed)
+ if (!withReplacement && num >= initialCount) {
+ Utils.randomizeInPlace(this.collect(), rand)
+ } else {
+ val fraction = SamplingUtils.computeFractionForSampleSize(num, initialCount,
+ withReplacement)
+ var samples = this.sample(withReplacement, fraction, rand.nextInt()).collect()
+
+ // If the first sample didn't turn out large enough, keep trying to take samples;
+ // this shouldn't happen often because we use a big multiplier for the initial size
+ var numIters = 0
+ while (samples.length < num) {
+ logWarning(s"Needed to re-sample due to insufficient sample size. Repeat #$numIters")
+ samples = this.sample(withReplacement, fraction, rand.nextInt()).collect()
+ numIters += 1
+ }
+ Utils.randomizeInPlace(samples, rand).take(num)
+ }
+ }
}
-
- Utils.randomizeInPlace(samples, rand).take(num)
}
/**
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org