Posted to commits@spark.apache.org by rx...@apache.org on 2014/09/29 03:33:44 UTC

git commit: Revert "[SPARK-1021] Defer the data-driven computation of partition bounds in so..."

Repository: spark
Updated Branches:
  refs/heads/master 1f13a40cc -> 8e874185e


Revert "[SPARK-1021] Defer the data-driven computation of partition bounds in so..."

This reverts commit 2d972fd84ac54a89e416442508a6d4eaeff452c1.

The commit was causing the correlationoptimizer14 test to hang.
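
For context, the reverted change had made RangePartitioner compute its range bounds lazily, so the sampling job could run as a cancellable future; this revert restores eager computation in the constructor. A minimal sketch of the two initialization styles, using toy names and a stand-in computation rather than the real Spark internals:

    class EagerBounds(partitions: Int) {
      // Restored behaviour: bounds are computed as soon as the object is
      // constructed, so the sampling work runs immediately and blocks.
      private var rangeBounds: Array[Int] = computeBounds(partitions)

      private def computeBounds(n: Int): Array[Int] =
        Array.tabulate(math.max(n - 1, 0))(_ + 1)
    }

    class DeferredBounds(partitions: Int) {
      // Reverted behaviour: a volatile field plus a synchronized accessor
      // defers the computation until the bounds are first needed.
      @volatile private var valRB: Array[Int] = null

      private def rangeBounds: Array[Int] = this.synchronized {
        if (valRB == null) {
          valRB = Array.tabulate(math.max(partitions - 1, 0))(_ + 1)
        }
        valRB
      }
    }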


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8e874185
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8e874185
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8e874185

Branch: refs/heads/master
Commit: 8e874185ed9efae8a1dc6b61d56ff401d72bb087
Parents: 1f13a40
Author: Reynold Xin <rx...@apache.org>
Authored: Sun Sep 28 18:33:11 2014 -0700
Committer: Reynold Xin <rx...@apache.org>
Committed: Sun Sep 28 18:33:11 2014 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/FutureAction.scala   |  7 +--
 .../scala/org/apache/spark/Partitioner.scala    | 29 ++-------
 .../org/apache/spark/rdd/AsyncRDDActions.scala  | 64 ++++++++------------
 3 files changed, 34 insertions(+), 66 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/8e874185/core/src/main/scala/org/apache/spark/FutureAction.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/FutureAction.scala b/core/src/main/scala/org/apache/spark/FutureAction.scala
index c277c3a..75ea535 100644
--- a/core/src/main/scala/org/apache/spark/FutureAction.scala
+++ b/core/src/main/scala/org/apache/spark/FutureAction.scala
@@ -208,7 +208,7 @@ class ComplexFutureAction[T] extends FutureAction[T] {
       processPartition: Iterator[T] => U,
       partitions: Seq[Int],
       resultHandler: (Int, U) => Unit,
-      resultFunc: => R): R = {
+      resultFunc: => R) {
     // If the action hasn't been cancelled yet, submit the job. The check and the submitJob
     // command need to be in an atomic block.
     val job = this.synchronized {
@@ -223,10 +223,7 @@ class ComplexFutureAction[T] extends FutureAction[T] {
     // cancel the job and stop the execution. This is not in a synchronized block because
     // Await.ready eventually waits on the monitor in FutureJob.jobWaiter.
     try {
-      Await.ready(job, Duration.Inf).value.get match {
-        case scala.util.Failure(e) => throw e
-        case scala.util.Success(v) => v
-      }
+      Await.ready(job, Duration.Inf)
     } catch {
       case e: InterruptedException =>
         job.cancel()

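The hunk above reverts runJob to fire-and-wait semantics: Await.ready only blocks until the job completes, whereas the removed code also inspected the completed value to rethrow failures. A self-contained sketch of the difference, using plain scala.concurrent rather than Spark's FutureAction:

    import scala.concurrent.{Await, Future}
    import scala.concurrent.duration.Duration
    import scala.concurrent.ExecutionContext.Implicits.global

    val job = Future[Int] { throw new RuntimeException("boom") }

    // Restored behaviour: wait for completion only; a failed future does
    // not throw here, so the failure must surface through another path.
    Await.ready(job, Duration.Inf)

    // Removed behaviour: inspect the completed Try and rethrow a failure,
    // so the caller sees the job's exception (or its value) directly.
    Await.ready(job, Duration.Inf).value.get match {
      case scala.util.Success(v) => v
      case scala.util.Failure(e) => throw e
    }
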
http://git-wip-us.apache.org/repos/asf/spark/blob/8e874185/core/src/main/scala/org/apache/spark/Partitioner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/Partitioner.scala b/core/src/main/scala/org/apache/spark/Partitioner.scala
index d40b152..37053bb 100644
--- a/core/src/main/scala/org/apache/spark/Partitioner.scala
+++ b/core/src/main/scala/org/apache/spark/Partitioner.scala
@@ -29,10 +29,6 @@ import org.apache.spark.serializer.JavaSerializer
 import org.apache.spark.util.{CollectionsUtils, Utils}
 import org.apache.spark.util.random.{XORShiftRandom, SamplingUtils}
 
-import org.apache.spark.SparkContext.rddToAsyncRDDActions
-import scala.concurrent.Await
-import scala.concurrent.duration.Duration
-
 /**
  * An object that defines how the elements in a key-value pair RDD are partitioned by key.
  * Maps each key to a partition ID, from 0 to `numPartitions - 1`.
@@ -117,12 +113,8 @@ class RangePartitioner[K : Ordering : ClassTag, V](
   private var ordering = implicitly[Ordering[K]]
 
   // An array of upper bounds for the first (partitions - 1) partitions
-  @volatile private var valRB: Array[K] = null
-
-  private def rangeBounds: Array[K] = this.synchronized {
-    if (valRB != null) return valRB
-
-    valRB = if (partitions <= 1) {
+  private var rangeBounds: Array[K] = {
+    if (partitions <= 1) {
       Array.empty
     } else {
       // This is the sample size we need to have roughly balanced output partitions, capped at 1M.
@@ -160,8 +152,6 @@ class RangePartitioner[K : Ordering : ClassTag, V](
         RangePartitioner.determineBounds(candidates, partitions)
       }
     }
-
-    valRB
   }
 
   def numPartitions = rangeBounds.length + 1
@@ -232,8 +222,7 @@ class RangePartitioner[K : Ordering : ClassTag, V](
   }
 
   @throws(classOf[IOException])
-  private def readObject(in: ObjectInputStream): Unit = this.synchronized {
-    if (valRB != null) return
+  private def readObject(in: ObjectInputStream) {
     val sfactory = SparkEnv.get.serializer
     sfactory match {
       case js: JavaSerializer => in.defaultReadObject()
@@ -245,7 +234,7 @@ class RangePartitioner[K : Ordering : ClassTag, V](
         val ser = sfactory.newInstance()
         Utils.deserializeViaNestedStream(in, ser) { ds =>
           implicit val classTag = ds.readObject[ClassTag[Array[K]]]()
-          valRB = ds.readObject[Array[K]]()
+          rangeBounds = ds.readObject[Array[K]]()
         }
     }
   }
@@ -265,18 +254,12 @@ private[spark] object RangePartitioner {
       sampleSizePerPartition: Int): (Long, Array[(Int, Int, Array[K])]) = {
     val shift = rdd.id
     // val classTagK = classTag[K] // to avoid serializing the entire partitioner object
-    // use collectAsync here to run this job as a future, which is cancellable
-    val sketchFuture = rdd.mapPartitionsWithIndex { (idx, iter) =>
+    val sketched = rdd.mapPartitionsWithIndex { (idx, iter) =>
       val seed = byteswap32(idx ^ (shift << 16))
       val (sample, n) = SamplingUtils.reservoirSampleAndCount(
         iter, sampleSizePerPartition, seed)
       Iterator((idx, n, sample))
-    }.collectAsync()
-    // We do need the future's value to continue any further
-    val sketched = Await.ready(sketchFuture, Duration.Inf).value.get match {
-      case scala.util.Success(v) => v.toArray
-      case scala.util.Failure(e) => throw e
-    }
+    }.collect()
     val numItems = sketched.map(_._2.toLong).sum
     (numItems, sketched)
   }

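The sketch method above draws a fixed-size sample from each partition in one pass via SamplingUtils.reservoirSampleAndCount, and the revert swaps the cancellable collectAsync() back to a plain blocking collect(). A minimal sketch of what the per-partition reservoir sampling does (a toy version for illustration, not the actual Spark utility):

    import scala.collection.mutable.ArrayBuffer
    import scala.reflect.ClassTag
    import scala.util.Random

    // One pass over the iterator: keep a uniform sample of up to k items
    // and count the total number of items seen (Algorithm R).
    def reservoirSampleAndCount[T: ClassTag](
        iter: Iterator[T], k: Int, seed: Long): (Array[T], Long) = {
      val rng = new Random(seed)
      val reservoir = new ArrayBuffer[T](k)
      var n = 0L
      iter.foreach { item =>
        n += 1
        if (reservoir.size < k) {
          reservoir += item
        } else {
          // Replace a kept item with probability k / n to stay uniform.
          val j = (rng.nextDouble() * n).toLong
          if (j < k) reservoir(j.toInt) = item
        }
      }
      (reservoir.toArray, n)
    }
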
http://git-wip-us.apache.org/repos/asf/spark/blob/8e874185/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala b/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala
index 7a68b3a..b62f3fb 100644
--- a/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala
@@ -23,7 +23,6 @@ import scala.collection.mutable.ArrayBuffer
 import scala.concurrent.ExecutionContext.Implicits.global
 import scala.reflect.ClassTag
 
-import org.apache.spark.util.Utils
 import org.apache.spark.{ComplexFutureAction, FutureAction, Logging}
 import org.apache.spark.annotation.Experimental
 
@@ -39,30 +38,29 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Loggi
    * Returns a future for counting the number of elements in the RDD.
    */
   def countAsync(): FutureAction[Long] = {
-    val f = new ComplexFutureAction[Long]
-    f.run {
-      val totalCount = new AtomicLong
-      f.runJob(self,
-               (iter: Iterator[T]) => Utils.getIteratorSize(iter),
-               Range(0, self.partitions.size),
-               (index: Int, data: Long) => totalCount.addAndGet(data),
-               totalCount.get())
-    }
+    val totalCount = new AtomicLong
+    self.context.submitJob(
+      self,
+      (iter: Iterator[T]) => {
+        var result = 0L
+        while (iter.hasNext) {
+          result += 1L
+          iter.next()
+        }
+        result
+      },
+      Range(0, self.partitions.size),
+      (index: Int, data: Long) => totalCount.addAndGet(data),
+      totalCount.get())
   }
 
   /**
    * Returns a future for retrieving all elements of this RDD.
    */
   def collectAsync(): FutureAction[Seq[T]] = {
-    val f = new ComplexFutureAction[Seq[T]]
-    f.run {
-      val results = new Array[Array[T]](self.partitions.size)
-      f.runJob(self,
-               (iter: Iterator[T]) => iter.toArray,
-               Range(0, self.partitions.size),
-               (index: Int, data: Array[T]) => results(index) = data,
-               results.flatten.toSeq)
-    }
+    val results = new Array[Array[T]](self.partitions.size)
+    self.context.submitJob[T, Array[T], Seq[T]](self, _.toArray, Range(0, self.partitions.size),
+      (index, data) => results(index) = data, results.flatten.toSeq)
   }
 
   /**
@@ -106,34 +104,24 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Loggi
       }
       results.toSeq
     }
+
+    f
   }
 
   /**
    * Applies a function f to all elements of this RDD.
    */
-  def foreachAsync(expr: T => Unit): FutureAction[Unit] = {
-    val f = new ComplexFutureAction[Unit]
-    val exprClean = self.context.clean(expr)
-    f.run {
-      f.runJob(self,
-               (iter: Iterator[T]) => iter.foreach(exprClean),
-               Range(0, self.partitions.size),
-               (index: Int, data: Unit) => Unit,
-               Unit)
-    }
+  def foreachAsync(f: T => Unit): FutureAction[Unit] = {
+    val cleanF = self.context.clean(f)
+    self.context.submitJob[T, Unit, Unit](self, _.foreach(cleanF), Range(0, self.partitions.size),
+      (index, data) => Unit, Unit)
   }
 
   /**
    * Applies a function f to each partition of this RDD.
    */
-  def foreachPartitionAsync(expr: Iterator[T] => Unit): FutureAction[Unit] = {
-    val f = new ComplexFutureAction[Unit]
-    f.run {
-      f.runJob(self,
-               expr,
-               Range(0, self.partitions.size),
-               (index: Int, data: Unit) => Unit,
-               Unit)
-    }
+  def foreachPartitionAsync(f: Iterator[T] => Unit): FutureAction[Unit] = {
+    self.context.submitJob[T, Unit, Unit](self, f, Range(0, self.partitions.size),
+      (index, data) => Unit, Unit)
   }
 }

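Taken together, the revert moves the async actions back onto SparkContext.submitJob while keeping their public signatures. A hedged usage sketch, assuming a running SparkContext named sc (the rddToAsyncRDDActions implicit comes in via the SparkContext._ import, as in the removed Partitioner code above):

    import scala.concurrent.Await
    import scala.concurrent.duration.Duration
    import org.apache.spark.SparkContext._  // brings rddToAsyncRDDActions into scope

    val rdd = sc.parallelize(1 to 1000000, 8)

    val countF   = rdd.countAsync()      // FutureAction[Long]
    val collectF = rdd.collectAsync()    // FutureAction[Seq[Int]]

    // A FutureAction can be cancelled while its job is still running...
    collectF.cancel()

    // ...or awaited like any scala.concurrent.Future.
    val total = Await.result(countF, Duration.Inf)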
