Posted to commits@spark.apache.org by ma...@apache.org on 2013/09/01 23:59:21 UTC
[37/69] [abbrv] [partial] Initial work to rename package to org.apache.spark
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/partial/ApproximateActionListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/partial/ApproximateActionListener.scala b/core/src/main/scala/org/apache/spark/partial/ApproximateActionListener.scala
new file mode 100644
index 0000000..c5d51be
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/partial/ApproximateActionListener.scala
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.partial
+
+import org.apache.spark._
+import org.apache.spark.scheduler.JobListener
+
+/**
+ * A JobListener for an approximate single-result action, such as count() or non-parallel reduce().
+ * This listener waits up to timeout milliseconds and will return a partial answer even if the
+ * complete answer is not available by then.
+ *
+ * This class assumes that the action is performed on an entire RDD[T] via a function that computes
+ * a result of type U for each partition, and that the action returns a partial or complete result
+ * of type R. Note that the type R must *include* any error bars on it (e.g. see BoundedDouble).
+ */
+private[spark] class ApproximateActionListener[T, U, R](
+ rdd: RDD[T],
+ func: (TaskContext, Iterator[T]) => U,
+ evaluator: ApproximateEvaluator[U, R],
+ timeout: Long)
+ extends JobListener {
+
+ val startTime = System.currentTimeMillis()
+ val totalTasks = rdd.partitions.size
+ var finishedTasks = 0
+ var failure: Option[Exception] = None // Set if the job has failed (permanently)
+ var resultObject: Option[PartialResult[R]] = None // Set if we've already returned a PartialResult
+
+ override def taskSucceeded(index: Int, result: Any) {
+ synchronized {
+ evaluator.merge(index, result.asInstanceOf[U])
+ finishedTasks += 1
+ if (finishedTasks == totalTasks) {
+ // If we had already returned a PartialResult, set its final value
+ resultObject.foreach(r => r.setFinalValue(evaluator.currentResult()))
+ // Notify any waiting thread that may have called awaitResult
+ this.notifyAll()
+ }
+ }
+ }
+
+ override def jobFailed(exception: Exception) {
+ synchronized {
+ failure = Some(exception)
+ this.notifyAll()
+ }
+ }
+
+ /**
+ * Waits for up to timeout milliseconds since the listener was created and then returns a
+ * PartialResult with the result so far. This may be complete if the whole job is done.
+ */
+ def awaitResult(): PartialResult[R] = synchronized {
+ val finishTime = startTime + timeout
+ while (true) {
+ val time = System.currentTimeMillis()
+ if (failure != None) {
+ throw failure.get
+ } else if (finishedTasks == totalTasks) {
+ return new PartialResult(evaluator.currentResult(), true)
+ } else if (time >= finishTime) {
+ resultObject = Some(new PartialResult(evaluator.currentResult(), false))
+ return resultObject.get
+ } else {
+ this.wait(finishTime - time)
+ }
+ }
+ // Should never be reached, but required to keep the compiler happy
+ return null
+ }
+}
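
For orientation, a minimal sketch of how this listener is driven (illustrative only, not part of
this commit): the scheduler invokes taskSucceeded()/jobFailed() from its event loop while the
caller blocks in awaitResult(). In Spark proper, SparkContext.runApproximateJob wires this up;
countFunc and the submission step below are assumed.

    val countFunc = (ctx: TaskContext, it: Iterator[Int]) => {
      var n = 0L
      while (it.hasNext) { n += 1; it.next() }
      n
    }
    val evaluator = new CountEvaluator(rdd.partitions.size, 0.95)
    val listener = new ApproximateActionListener(rdd, countFunc, evaluator, 10000L)
    // ... submit one task per partition with this listener attached ...
    val partial: PartialResult[BoundedDouble] = listener.awaitResult() // returns within ~10s
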
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/partial/ApproximateEvaluator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/partial/ApproximateEvaluator.scala b/core/src/main/scala/org/apache/spark/partial/ApproximateEvaluator.scala
new file mode 100644
index 0000000..9c2859c
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/partial/ApproximateEvaluator.scala
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.partial
+
+/**
+ * An object that computes a function incrementally by merging in results of type U from multiple
+ * tasks. Allows partial evaluation at any point by calling currentResult().
+ */
+private[spark] trait ApproximateEvaluator[U, R] {
+ def merge(outputId: Int, taskResult: U): Unit
+ def currentResult(): R
+}
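
The contract is easiest to see in a toy implementation (hypothetical, not part of this commit):
merge() is called once per finished task, and currentResult() may be called at any point in
between.

    // A degenerate evaluator with no error bars: the result type R is a plain Long.
    private[spark] class ExactSumEvaluator extends ApproximateEvaluator[Long, Long] {
      private var sum = 0L
      override def merge(outputId: Int, taskResult: Long) { sum += taskResult }
      override def currentResult(): Long = sum
    }
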
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/partial/BoundedDouble.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/partial/BoundedDouble.scala b/core/src/main/scala/org/apache/spark/partial/BoundedDouble.scala
new file mode 100644
index 0000000..5f44508
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/partial/BoundedDouble.scala
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.partial
+
+/**
+ * A Double with error bars on it.
+ */
+class BoundedDouble(val mean: Double, val confidence: Double, val low: Double, val high: Double) {
+ override def toString(): String = "[%.3f, %.3f]".format(low, high)
+}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/partial/CountEvaluator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/partial/CountEvaluator.scala b/core/src/main/scala/org/apache/spark/partial/CountEvaluator.scala
new file mode 100644
index 0000000..3155dfe
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/partial/CountEvaluator.scala
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.partial
+
+import cern.jet.stat.Probability
+
+/**
+ * An ApproximateEvaluator for counts.
+ *
+ * TODO: There's currently a lot of shared code between this and GroupedCountEvaluator. It might
+ * be best to make this a special case of GroupedCountEvaluator with one group.
+ */
+private[spark] class CountEvaluator(totalOutputs: Int, confidence: Double)
+ extends ApproximateEvaluator[Long, BoundedDouble] {
+
+ var outputsMerged = 0
+ var sum: Long = 0
+
+ override def merge(outputId: Int, taskResult: Long) {
+ outputsMerged += 1
+ sum += taskResult
+ }
+
+ override def currentResult(): BoundedDouble = {
+ if (outputsMerged == totalOutputs) {
+ new BoundedDouble(sum, 1.0, sum, sum)
+ } else if (outputsMerged == 0) {
+ new BoundedDouble(0, 0.0, Double.NegativeInfinity, Double.PositiveInfinity)
+ } else {
+ val p = outputsMerged.toDouble / totalOutputs
+ val mean = (sum + 1 - p) / p
+ val variance = (sum + 1) * (1 - p) / (p * p)
+ val stdev = math.sqrt(variance)
+ val confFactor = Probability.normalInverse(1 - (1 - confidence) / 2)
+ val low = mean - confFactor * stdev
+ val high = mean + confFactor * stdev
+ new BoundedDouble(mean, confidence, low, high)
+ }
+ }
+}
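
To make the extrapolation concrete (a worked example, not part of the commit): suppose 40 of 100
partitions have been merged (p = 0.4) and sum = 1000. Then

    mean       = (1000 + 1 - 0.4) / 0.4          ~= 2501.5
    variance   = (1000 + 1) * (1 - 0.4) / 0.4^2  ~= 3753.75  (stdev ~= 61.3)
    confFactor = normalInverse(0.975)            ~= 1.96     (for confidence = 0.95)

giving a 95% interval of roughly [2381.4, 2621.6] around the estimated total count.
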
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/partial/GroupedCountEvaluator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/partial/GroupedCountEvaluator.scala b/core/src/main/scala/org/apache/spark/partial/GroupedCountEvaluator.scala
new file mode 100644
index 0000000..e519e3a
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/partial/GroupedCountEvaluator.scala
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.partial
+
+import java.util.{HashMap => JHashMap}
+import java.util.{Map => JMap}
+
+import scala.collection.Map
+import scala.collection.mutable.HashMap
+import scala.collection.JavaConversions.mapAsScalaMap
+
+import cern.jet.stat.Probability
+
+import it.unimi.dsi.fastutil.objects.{Object2LongOpenHashMap => OLMap}
+
+/**
+ * An ApproximateEvaluator for counts by key. Returns a map of key to confidence interval.
+ */
+private[spark] class GroupedCountEvaluator[T](totalOutputs: Int, confidence: Double)
+ extends ApproximateEvaluator[OLMap[T], Map[T, BoundedDouble]] {
+
+ var outputsMerged = 0
+ var sums = new OLMap[T] // Sum of counts for each key
+
+ override def merge(outputId: Int, taskResult: OLMap[T]) {
+ outputsMerged += 1
+ val iter = taskResult.object2LongEntrySet.fastIterator()
+ while (iter.hasNext) {
+ val entry = iter.next()
+ sums.put(entry.getKey, sums.getLong(entry.getKey) + entry.getLongValue)
+ }
+ }
+
+ override def currentResult(): Map[T, BoundedDouble] = {
+ if (outputsMerged == totalOutputs) {
+ val result = new JHashMap[T, BoundedDouble](sums.size)
+ val iter = sums.object2LongEntrySet.fastIterator()
+ while (iter.hasNext) {
+ val entry = iter.next()
+ val sum = entry.getLongValue()
+ result(entry.getKey) = new BoundedDouble(sum, 1.0, sum, sum)
+ }
+ result
+ } else if (outputsMerged == 0) {
+ new HashMap[T, BoundedDouble]
+ } else {
+ val p = outputsMerged.toDouble / totalOutputs
+ val confFactor = Probability.normalInverse(1 - (1 - confidence) / 2)
+ val result = new JHashMap[T, BoundedDouble](sums.size)
+ val iter = sums.object2LongEntrySet.fastIterator()
+ while (iter.hasNext) {
+ val entry = iter.next()
+ val sum = entry.getLongValue
+ val mean = (sum + 1 - p) / p
+ val variance = (sum + 1) * (1 - p) / (p * p)
+ val stdev = math.sqrt(variance)
+ val low = mean - confFactor * stdev
+ val high = mean + confFactor * stdev
+ result(entry.getKey) = new BoundedDouble(mean, confidence, low, high)
+ }
+ result
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/partial/GroupedMeanEvaluator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/partial/GroupedMeanEvaluator.scala b/core/src/main/scala/org/apache/spark/partial/GroupedMeanEvaluator.scala
new file mode 100644
index 0000000..cf8a568
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/partial/GroupedMeanEvaluator.scala
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.partial
+
+import java.util.{HashMap => JHashMap}
+import java.util.{Map => JMap}
+
+import scala.collection.mutable.HashMap
+import scala.collection.Map
+import scala.collection.JavaConversions.mapAsScalaMap
+
+import org.apache.spark.util.StatCounter
+
+/**
+ * An ApproximateEvaluator for means by key. Returns a map of key to confidence interval.
+ */
+private[spark] class GroupedMeanEvaluator[T](totalOutputs: Int, confidence: Double)
+ extends ApproximateEvaluator[JHashMap[T, StatCounter], Map[T, BoundedDouble]] {
+
+ var outputsMerged = 0
+ var sums = new JHashMap[T, StatCounter] // StatCounter for each key
+
+ override def merge(outputId: Int, taskResult: JHashMap[T, StatCounter]) {
+ outputsMerged += 1
+ val iter = taskResult.entrySet.iterator()
+ while (iter.hasNext) {
+ val entry = iter.next()
+ val old = sums.get(entry.getKey)
+ if (old != null) {
+ old.merge(entry.getValue)
+ } else {
+ sums.put(entry.getKey, entry.getValue)
+ }
+ }
+ }
+
+ override def currentResult(): Map[T, BoundedDouble] = {
+ if (outputsMerged == totalOutputs) {
+ val result = new JHashMap[T, BoundedDouble](sums.size)
+ val iter = sums.entrySet.iterator()
+ while (iter.hasNext) {
+ val entry = iter.next()
+ val mean = entry.getValue.mean
+ result(entry.getKey) = new BoundedDouble(mean, 1.0, mean, mean)
+ }
+ result
+ } else if (outputsMerged == 0) {
+ new HashMap[T, BoundedDouble]
+ } else {
+ val p = outputsMerged.toDouble / totalOutputs
+ val studentTCacher = new StudentTCacher(confidence)
+ val result = new JHashMap[T, BoundedDouble](sums.size)
+ val iter = sums.entrySet.iterator()
+ while (iter.hasNext) {
+ val entry = iter.next()
+ val counter = entry.getValue
+ val mean = counter.mean
+ val stdev = math.sqrt(counter.sampleVariance / counter.count)
+ val confFactor = studentTCacher.get(counter.count)
+ val low = mean - confFactor * stdev
+ val high = mean + confFactor * stdev
+ result(entry.getKey) = new BoundedDouble(mean, confidence, low, high)
+ }
+ result
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/partial/GroupedSumEvaluator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/partial/GroupedSumEvaluator.scala b/core/src/main/scala/org/apache/spark/partial/GroupedSumEvaluator.scala
new file mode 100644
index 0000000..8225a5d
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/partial/GroupedSumEvaluator.scala
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.partial
+
+import java.util.{HashMap => JHashMap}
+import java.util.{Map => JMap}
+
+import scala.collection.mutable.HashMap
+import scala.collection.Map
+import scala.collection.JavaConversions.mapAsScalaMap
+
+import org.apache.spark.util.StatCounter
+
+/**
+ * An ApproximateEvaluator for sums by key. Returns a map of key to confidence interval.
+ */
+private[spark] class GroupedSumEvaluator[T](totalOutputs: Int, confidence: Double)
+ extends ApproximateEvaluator[JHashMap[T, StatCounter], Map[T, BoundedDouble]] {
+
+ var outputsMerged = 0
+ var sums = new JHashMap[T, StatCounter] // StatCounter for each key
+
+ override def merge(outputId: Int, taskResult: JHashMap[T, StatCounter]) {
+ outputsMerged += 1
+ val iter = taskResult.entrySet.iterator()
+ while (iter.hasNext) {
+ val entry = iter.next()
+ val old = sums.get(entry.getKey)
+ if (old != null) {
+ old.merge(entry.getValue)
+ } else {
+ sums.put(entry.getKey, entry.getValue)
+ }
+ }
+ }
+
+ override def currentResult(): Map[T, BoundedDouble] = {
+ if (outputsMerged == totalOutputs) {
+ val result = new JHashMap[T, BoundedDouble](sums.size)
+ val iter = sums.entrySet.iterator()
+ while (iter.hasNext) {
+ val entry = iter.next()
+ val sum = entry.getValue.sum
+ result(entry.getKey) = new BoundedDouble(sum, 1.0, sum, sum)
+ }
+ result
+ } else if (outputsMerged == 0) {
+ new HashMap[T, BoundedDouble]
+ } else {
+ val p = outputsMerged.toDouble / totalOutputs
+ val studentTCacher = new StudentTCacher(confidence)
+ val result = new JHashMap[T, BoundedDouble](sums.size)
+ val iter = sums.entrySet.iterator()
+ while (iter.hasNext) {
+ val entry = iter.next()
+ val counter = entry.getValue
+ val meanEstimate = counter.mean
+ val meanVar = counter.sampleVariance / counter.count
+ val countEstimate = (counter.count + 1 - p) / p
+ val countVar = (counter.count + 1) * (1 - p) / (p * p)
+ val sumEstimate = meanEstimate * countEstimate
+ val sumVar = (meanEstimate * meanEstimate * countVar) +
+ (countEstimate * countEstimate * meanVar) +
+ (meanVar * countVar)
+ val sumStdev = math.sqrt(sumVar)
+ val confFactor = studentTCacher.get(counter.count)
+ val low = sumEstimate - confFactor * sumStdev
+ val high = sumEstimate + confFactor * sumStdev
+ result(entry.getKey) = new BoundedDouble(sumEstimate, confidence, low, high)
+ }
+ result
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/partial/MeanEvaluator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/partial/MeanEvaluator.scala b/core/src/main/scala/org/apache/spark/partial/MeanEvaluator.scala
new file mode 100644
index 0000000..d24959c
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/partial/MeanEvaluator.scala
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.partial
+
+import cern.jet.stat.Probability
+
+import org.apache.spark.util.StatCounter
+
+/**
+ * An ApproximateEvaluator for means.
+ */
+private[spark] class MeanEvaluator(totalOutputs: Int, confidence: Double)
+ extends ApproximateEvaluator[StatCounter, BoundedDouble] {
+
+ var outputsMerged = 0
+ var counter = new StatCounter
+
+ override def merge(outputId: Int, taskResult: StatCounter) {
+ outputsMerged += 1
+ counter.merge(taskResult)
+ }
+
+ override def currentResult(): BoundedDouble = {
+ if (outputsMerged == totalOutputs) {
+ new BoundedDouble(counter.mean, 1.0, counter.mean, counter.mean)
+ } else if (outputsMerged == 0) {
+ new BoundedDouble(0, 0.0, Double.NegativeInfinity, Double.PositiveInfinity)
+ } else {
+ val mean = counter.mean
+ val stdev = math.sqrt(counter.sampleVariance / counter.count)
+ val confFactor = {
+ if (counter.count > 100) {
+ Probability.normalInverse(1 - (1 - confidence) / 2)
+ } else {
+ Probability.studentTInverse(1 - confidence, (counter.count - 1).toInt)
+ }
+ }
+ val low = mean - confFactor * stdev
+ val high = mean + confFactor * stdev
+ new BoundedDouble(mean, confidence, low, high)
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/partial/PartialResult.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/partial/PartialResult.scala b/core/src/main/scala/org/apache/spark/partial/PartialResult.scala
new file mode 100644
index 0000000..5ce49b8
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/partial/PartialResult.scala
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.partial
+
+class PartialResult[R](initialVal: R, isFinal: Boolean) {
+ private var finalValue: Option[R] = if (isFinal) Some(initialVal) else None
+ private var failure: Option[Exception] = None
+ private var completionHandler: Option[R => Unit] = None
+ private var failureHandler: Option[Exception => Unit] = None
+
+ def initialValue: R = initialVal
+
+ def isInitialValueFinal: Boolean = isFinal
+
+ /**
+ * Blocking method to wait for and return the final value.
+ */
+ def getFinalValue(): R = synchronized {
+ while (finalValue == None && failure == None) {
+ this.wait()
+ }
+ if (finalValue != None) {
+ return finalValue.get
+ } else {
+ throw failure.get
+ }
+ }
+
+ /**
+ * Set a handler to be called when this PartialResult completes. Only one completion handler
+ * is supported per PartialResult.
+ */
+ def onComplete(handler: R => Unit): PartialResult[R] = synchronized {
+ if (completionHandler != None) {
+ throw new UnsupportedOperationException("onComplete cannot be called twice")
+ }
+ completionHandler = Some(handler)
+ if (finalValue != None) {
+ // We already have a final value, so let's call the handler
+ handler(finalValue.get)
+ }
+ return this
+ }
+
+ /**
+ * Set a handler to be called if this PartialResult's job fails. Only one failure handler
+ * is supported per PartialResult.
+ */
+ def onFail(handler: Exception => Unit) {
+ synchronized {
+ if (failureHandler != None) {
+ throw new UnsupportedOperationException("onFail cannot be called twice")
+ }
+ failureHandler = Some(handler)
+ if (failure != None) {
+ // We already have a failure, so let's call the handler
+ handler(failure.get)
+ }
+ }
+ }
+
+ /**
+ * Transform this PartialResult into a PartialResult of type T.
+ */
+ def map[T](f: R => T) : PartialResult[T] = {
+ new PartialResult[T](f(initialVal), isFinal) {
+ override def getFinalValue() : T = synchronized {
+ f(PartialResult.this.getFinalValue())
+ }
+ override def onComplete(handler: T => Unit): PartialResult[T] = synchronized {
+ PartialResult.this.onComplete(handler.compose(f)).map(f)
+ }
+ override def onFail(handler: Exception => Unit) {
+ synchronized {
+ PartialResult.this.onFail(handler)
+ }
+ }
+ override def toString : String = synchronized {
+ PartialResult.this.getFinalValueInternal() match {
+ case Some(value) => "(final: " + f(value) + ")"
+ case None => "(partial: " + initialValue + ")"
+ }
+ }
+ def getFinalValueInternal() = PartialResult.this.getFinalValueInternal().map(f)
+ }
+ }
+
+ private[spark] def setFinalValue(value: R) {
+ synchronized {
+ if (finalValue != None) {
+ throw new UnsupportedOperationException("setFinalValue called twice on a PartialResult")
+ }
+ finalValue = Some(value)
+ // Call the completion handler if it was set
+ completionHandler.foreach(h => h(value))
+ // Notify any threads that may be calling getFinalValue()
+ this.notifyAll()
+ }
+ }
+
+ private def getFinalValueInternal() = finalValue
+
+ private[spark] def setFailure(exception: Exception) {
+ synchronized {
+ if (failure != None) {
+ throw new UnsupportedOperationException("setFailure called twice on a PartialResult")
+ }
+ failure = Some(exception)
+ // Call the failure handler if it was set
+ failureHandler.foreach(h => h(exception))
+ // Notify any threads that may be calling getFinalValue()
+ this.notifyAll()
+ }
+ }
+
+ override def toString: String = synchronized {
+ finalValue match {
+ case Some(value) => "(final: " + value + ")"
+ case None => "(partial: " + initialValue + ")"
+ }
+ }
+}
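
Putting PartialResult together with the evaluators above, a caller-side sketch (countApprox is
the public entry point built on these classes; values shown are illustrative):

    val result: PartialResult[BoundedDouble] = rdd.countApprox(5000, 0.95)
    println(result)                                    // e.g. (partial: [2381.400, 2621.600])
    result.onComplete(b => println("final: " + b))     // fires immediately if already final
    val exact: BoundedDouble = result.getFinalValue()  // blocks until every task has finished
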
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/partial/StudentTCacher.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/partial/StudentTCacher.scala b/core/src/main/scala/org/apache/spark/partial/StudentTCacher.scala
new file mode 100644
index 0000000..92915ee
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/partial/StudentTCacher.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.partial
+
+import cern.jet.stat.Probability
+
+/**
+ * A utility class for caching Student's t-distribution quantiles for a given confidence level
+ * and various sample sizes. This is used by the grouped evaluators (GroupedMeanEvaluator and
+ * GroupedSumEvaluator) to efficiently calculate confidence intervals for many keys.
+ */
+private[spark] class StudentTCacher(confidence: Double) {
+ val NORMAL_APPROX_SAMPLE_SIZE = 100 // For samples bigger than this, use Gaussian approximation
+ val normalApprox = Probability.normalInverse(1 - (1 - confidence) / 2)
+ val cache = Array.fill[Double](NORMAL_APPROX_SAMPLE_SIZE)(-1.0)
+
+ def get(sampleSize: Long): Double = {
+ if (sampleSize >= NORMAL_APPROX_SAMPLE_SIZE) {
+ normalApprox
+ } else {
+ val size = sampleSize.toInt
+ if (cache(size) < 0) {
+ cache(size) = Probability.studentTInverse(1 - confidence, size - 1)
+ }
+ cache(size)
+ }
+ }
+}
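
Usage is a one-line lookup per key (illustrative):

    val cacher = new StudentTCacher(0.95)
    cacher.get(10)    // t quantile for 9 degrees of freedom, computed once and cached
    cacher.get(10)    // same sample size again: served from the cache array
    cacher.get(5000)  // >= 100 samples: the precomputed normal approximation is returned
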
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/partial/SumEvaluator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/partial/SumEvaluator.scala b/core/src/main/scala/org/apache/spark/partial/SumEvaluator.scala
new file mode 100644
index 0000000..a74f800
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/partial/SumEvaluator.scala
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.partial
+
+import cern.jet.stat.Probability
+
+import org.apache.spark.util.StatCounter
+
+/**
+ * An ApproximateEvaluator for sums. It estimates the mean and the count and multiplies them
+ * together, then uses the formula for the variance of the product of two independent random
+ * variables to get a variance for the result and compute a confidence interval.
+ */
+private[spark] class SumEvaluator(totalOutputs: Int, confidence: Double)
+ extends ApproximateEvaluator[StatCounter, BoundedDouble] {
+
+ var outputsMerged = 0
+ var counter = new StatCounter
+
+ override def merge(outputId: Int, taskResult: StatCounter) {
+ outputsMerged += 1
+ counter.merge(taskResult)
+ }
+
+ override def currentResult(): BoundedDouble = {
+ if (outputsMerged == totalOutputs) {
+ new BoundedDouble(counter.sum, 1.0, counter.sum, counter.sum)
+ } else if (outputsMerged == 0) {
+ new BoundedDouble(0, 0.0, Double.NegativeInfinity, Double.PositiveInfinity)
+ } else {
+ val p = outputsMerged.toDouble / totalOutputs
+ val meanEstimate = counter.mean
+ val meanVar = counter.sampleVariance / counter.count
+ val countEstimate = (counter.count + 1 - p) / p
+ val countVar = (counter.count + 1) * (1 - p) / (p * p)
+ val sumEstimate = meanEstimate * countEstimate
+ val sumVar = (meanEstimate * meanEstimate * countVar) +
+ (countEstimate * countEstimate * meanVar) +
+ (meanVar * countVar)
+ val sumStdev = math.sqrt(sumVar)
+ val confFactor = {
+ if (counter.count > 100) {
+ Probability.normalInverse(1 - (1 - confidence) / 2)
+ } else {
+ Probability.studentTInverse(1 - confidence, (counter.count - 1).toInt)
+ }
+ }
+ val low = sumEstimate - confFactor * sumStdev
+ val high = sumEstimate + confFactor * sumStdev
+ new BoundedDouble(sumEstimate, confidence, low, high)
+ }
+ }
+}
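
The sumVar expression is the exact variance of a product of two independent random variables.
Writing C for the extrapolated count and M for the mean (a restatement of the code above,
assuming C and M are independent):

    Var(C * M) = E[M]^2 * Var(C) + E[C]^2 * Var(M) + Var(C) * Var(M)

which matches meanEstimate^2 * countVar + countEstimate^2 * meanVar + meanVar * countVar term
for term.
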
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala b/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala
new file mode 100644
index 0000000..4bb01ef
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rdd
+
+import org.apache.spark.{RDD, SparkContext, SparkEnv, Partition, TaskContext}
+import org.apache.spark.storage.BlockManager
+
+private[spark] class BlockRDDPartition(val blockId: String, idx: Int) extends Partition {
+ val index = idx
+}
+
+private[spark]
+class BlockRDD[T: ClassManifest](sc: SparkContext, @transient blockIds: Array[String])
+ extends RDD[T](sc, Nil) {
+
+ @transient lazy val locations_ = BlockManager.blockIdsToHosts(blockIds, SparkEnv.get)
+
+ override def getPartitions: Array[Partition] = (0 until blockIds.size).map(i => {
+ new BlockRDDPartition(blockIds(i), i).asInstanceOf[Partition]
+ }).toArray
+
+ override def compute(split: Partition, context: TaskContext): Iterator[T] = {
+ val blockManager = SparkEnv.get.blockManager
+ val blockId = split.asInstanceOf[BlockRDDPartition].blockId
+ blockManager.get(blockId) match {
+ case Some(block) => block.asInstanceOf[Iterator[T]]
+ case None =>
+ throw new Exception("Could not compute split, block " + blockId + " not found")
+ }
+ }
+
+ override def getPreferredLocations(split: Partition): Seq[String] = {
+ locations_(split.asInstanceOf[BlockRDDPartition].blockId)
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala
new file mode 100644
index 0000000..9b0c882
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rdd
+
+import java.io.{ObjectOutputStream, IOException}
+import org.apache.spark._
+
+
+private[spark]
+class CartesianPartition(
+ idx: Int,
+ @transient rdd1: RDD[_],
+ @transient rdd2: RDD[_],
+ s1Index: Int,
+ s2Index: Int
+ ) extends Partition {
+ var s1 = rdd1.partitions(s1Index)
+ var s2 = rdd2.partitions(s2Index)
+ override val index: Int = idx
+
+ @throws(classOf[IOException])
+ private def writeObject(oos: ObjectOutputStream) {
+ // Update the reference to parent split at the time of task serialization
+ s1 = rdd1.partitions(s1Index)
+ s2 = rdd2.partitions(s2Index)
+ oos.defaultWriteObject()
+ }
+}
+
+private[spark]
+class CartesianRDD[T: ClassManifest, U:ClassManifest](
+ sc: SparkContext,
+ var rdd1 : RDD[T],
+ var rdd2 : RDD[U])
+ extends RDD[Pair[T, U]](sc, Nil)
+ with Serializable {
+
+ val numPartitionsInRdd2 = rdd2.partitions.size
+
+ override def getPartitions: Array[Partition] = {
+ // create the cross product split
+ val array = new Array[Partition](rdd1.partitions.size * rdd2.partitions.size)
+ for (s1 <- rdd1.partitions; s2 <- rdd2.partitions) {
+ val idx = s1.index * numPartitionsInRdd2 + s2.index
+ array(idx) = new CartesianPartition(idx, rdd1, rdd2, s1.index, s2.index)
+ }
+ array
+ }
+
+ override def getPreferredLocations(split: Partition): Seq[String] = {
+ val currSplit = split.asInstanceOf[CartesianPartition]
+ (rdd1.preferredLocations(currSplit.s1) ++ rdd2.preferredLocations(currSplit.s2)).distinct
+ }
+
+ override def compute(split: Partition, context: TaskContext) = {
+ val currSplit = split.asInstanceOf[CartesianPartition]
+ for (x <- rdd1.iterator(currSplit.s1, context);
+ y <- rdd2.iterator(currSplit.s2, context)) yield (x, y)
+ }
+
+ override def getDependencies: Seq[Dependency[_]] = List(
+ new NarrowDependency(rdd1) {
+ def getParents(id: Int): Seq[Int] = List(id / numPartitionsInRdd2)
+ },
+ new NarrowDependency(rdd2) {
+ def getParents(id: Int): Seq[Int] = List(id % numPartitionsInRdd2)
+ }
+ )
+
+ override def clearDependencies() {
+ super.clearDependencies()
+ rdd1 = null
+ rdd2 = null
+ }
+}
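
The two NarrowDependencies above simply invert the index arithmetic: output partition
idx = s1.index * numPartitionsInRdd2 + s2.index, so idx / numPartitionsInRdd2 recovers the rdd1
parent and idx % numPartitionsInRdd2 the rdd2 parent. A sketch (sc and the partition counts are
assumptions):

    val rdd1 = sc.parallelize(1 to 3, 3)
    val rdd2 = sc.parallelize(Seq("a", "b"), 2)
    val pairs = rdd1.cartesian(rdd2)        // builds a CartesianRDD underneath
    assert(pairs.partitions.size == 3 * 2)
    // output partition 5 reads rdd1 partition 5 / 2 = 2 and rdd2 partition 5 % 2 = 1
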
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
new file mode 100644
index 0000000..3311757
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rdd
+
+import org.apache.spark._
+import org.apache.hadoop.mapred.{FileInputFormat, SequenceFileInputFormat, JobConf, Reporter}
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.io.{NullWritable, BytesWritable}
+import org.apache.hadoop.util.ReflectionUtils
+import org.apache.hadoop.fs.Path
+import java.io.{File, IOException, EOFException}
+import java.text.NumberFormat
+
+private[spark] class CheckpointRDDPartition(val index: Int) extends Partition {}
+
+/**
+ * This RDD represents an RDD checkpoint file (similar to HadoopRDD).
+ */
+private[spark]
+class CheckpointRDD[T: ClassManifest](sc: SparkContext, val checkpointPath: String)
+ extends RDD[T](sc, Nil) {
+
+ @transient val fs = new Path(checkpointPath).getFileSystem(sc.hadoopConfiguration)
+
+ override def getPartitions: Array[Partition] = {
+ val cpath = new Path(checkpointPath)
+ val numPartitions =
+ // listStatus can throw exception if path does not exist.
+ if (fs.exists(cpath)) {
+ val dirContents = fs.listStatus(cpath)
+ val partitionFiles = dirContents.map(_.getPath.toString).filter(_.contains("part-")).sorted
+ val numPart = partitionFiles.size
+ if (numPart > 0 && (! partitionFiles(0).endsWith(CheckpointRDD.splitIdToFile(0)) ||
+ ! partitionFiles(numPart-1).endsWith(CheckpointRDD.splitIdToFile(numPart-1)))) {
+ throw new SparkException("Invalid checkpoint directory: " + checkpointPath)
+ }
+ numPart
+ } else 0
+
+ Array.tabulate(numPartitions)(i => new CheckpointRDDPartition(i))
+ }
+
+ checkpointData = Some(new RDDCheckpointData[T](this))
+ checkpointData.get.cpFile = Some(checkpointPath)
+
+ override def getPreferredLocations(split: Partition): Seq[String] = {
+ val status = fs.getFileStatus(new Path(checkpointPath, CheckpointRDD.splitIdToFile(split.index)))
+ val locations = fs.getFileBlockLocations(status, 0, status.getLen)
+ locations.headOption.toList.flatMap(_.getHosts).filter(_ != "localhost")
+ }
+
+ override def compute(split: Partition, context: TaskContext): Iterator[T] = {
+ val file = new Path(checkpointPath, CheckpointRDD.splitIdToFile(split.index))
+ CheckpointRDD.readFromFile(file, context)
+ }
+
+ override def checkpoint() {
+ // Do nothing. CheckpointRDD should not be checkpointed.
+ }
+}
+
+private[spark] object CheckpointRDD extends Logging {
+
+ def splitIdToFile(splitId: Int): String = {
+ "part-%05d".format(splitId)
+ }
+
+ def writeToFile[T](path: String, blockSize: Int = -1)(ctx: TaskContext, iterator: Iterator[T]) {
+ val env = SparkEnv.get
+ val outputDir = new Path(path)
+ val fs = outputDir.getFileSystem(env.hadoop.newConfiguration())
+
+ val finalOutputName = splitIdToFile(ctx.splitId)
+ val finalOutputPath = new Path(outputDir, finalOutputName)
+ val tempOutputPath = new Path(outputDir, "." + finalOutputName + "-attempt-" + ctx.attemptId)
+
+ if (fs.exists(tempOutputPath)) {
+ throw new IOException("Checkpoint failed: temporary path " +
+ tempOutputPath + " already exists")
+ }
+ val bufferSize = System.getProperty("spark.buffer.size", "65536").toInt
+
+ val fileOutputStream = if (blockSize < 0) {
+ fs.create(tempOutputPath, false, bufferSize)
+ } else {
+ // This is mainly for testing purposes
+ fs.create(tempOutputPath, false, bufferSize, fs.getDefaultReplication, blockSize)
+ }
+ val serializer = env.serializer.newInstance()
+ val serializeStream = serializer.serializeStream(fileOutputStream)
+ serializeStream.writeAll(iterator)
+ serializeStream.close()
+
+ if (!fs.rename(tempOutputPath, finalOutputPath)) {
+ if (!fs.exists(finalOutputPath)) {
+ logInfo("Deleting tempOutputPath " + tempOutputPath)
+ fs.delete(tempOutputPath, false)
+ throw new IOException("Checkpoint failed: failed to save output of task: "
+ + ctx.attemptId + " and final output path does not exist")
+ } else {
+ // Some other copy of this task must've finished before us and renamed it
+ logInfo("Final output path " + finalOutputPath + " already exists; not overwriting it")
+ fs.delete(tempOutputPath, false)
+ }
+ }
+ }
+
+ def readFromFile[T](path: Path, context: TaskContext): Iterator[T] = {
+ val env = SparkEnv.get
+ val fs = path.getFileSystem(env.hadoop.newConfiguration())
+ val bufferSize = System.getProperty("spark.buffer.size", "65536").toInt
+ val fileInputStream = fs.open(path, bufferSize)
+ val serializer = env.serializer.newInstance()
+ val deserializeStream = serializer.deserializeStream(fileInputStream)
+
+ // Register an on-task-completion callback to close the input stream.
+ context.addOnCompleteCallback(() => deserializeStream.close())
+
+ deserializeStream.asIterator.asInstanceOf[Iterator[T]]
+ }
+
+ // Test whether CheckpointRDD generates the expected number of partitions despite
+ // each split file having multiple blocks. This needs to be run on a
+ // cluster (mesos or standalone) using HDFS.
+ def main(args: Array[String]) {
+ import org.apache.spark._
+
+ val Array(cluster, hdfsPath) = args
+ val env = SparkEnv.get
+ val sc = new SparkContext(cluster, "CheckpointRDD Test")
+ val rdd = sc.makeRDD(1 to 10, 10).flatMap(x => 1 to 10000)
+ val path = new Path(hdfsPath, "temp")
+ val fs = path.getFileSystem(env.hadoop.newConfiguration())
+ sc.runJob(rdd, CheckpointRDD.writeToFile(path.toString, 1024) _)
+ val cpRDD = new CheckpointRDD[Int](sc, path.toString)
+ assert(cpRDD.partitions.length == rdd.partitions.length, "Number of partitions is not the same")
+ assert(cpRDD.collect.toList == rdd.collect.toList, "Data of partitions not the same")
+ fs.delete(path, true)
+ }
+}
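
For context, the user-facing path that exercises this class (a sketch; the HDFS URL is a
placeholder):

    sc.setCheckpointDir("hdfs://namenode:8020/tmp/checkpoints")
    val rdd = sc.makeRDD(1 to 1000, 10)
    rdd.checkpoint()   // only marks the RDD; nothing is written yet
    rdd.count()        // the first job materializes part-NNNNN files via writeToFile
    // afterwards the RDD's lineage is truncated to a CheckpointRDD over those files
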
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
new file mode 100644
index 0000000..dcc35e8
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rdd
+
+import java.io.{ObjectOutputStream, IOException}
+import java.util.{HashMap => JHashMap}
+
+import scala.collection.JavaConversions
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.{Partition, Partitioner, RDD, SparkEnv, TaskContext}
+import org.apache.spark.{Dependency, OneToOneDependency, ShuffleDependency}
+
+
+private[spark] sealed trait CoGroupSplitDep extends Serializable
+
+private[spark] case class NarrowCoGroupSplitDep(
+ rdd: RDD[_],
+ splitIndex: Int,
+ var split: Partition
+ ) extends CoGroupSplitDep {
+
+ @throws(classOf[IOException])
+ private def writeObject(oos: ObjectOutputStream) {
+ // Update the reference to parent split at the time of task serialization
+ split = rdd.partitions(splitIndex)
+ oos.defaultWriteObject()
+ }
+}
+
+private[spark] case class ShuffleCoGroupSplitDep(shuffleId: Int) extends CoGroupSplitDep
+
+private[spark]
+class CoGroupPartition(idx: Int, val deps: Array[CoGroupSplitDep])
+ extends Partition with Serializable {
+ override val index: Int = idx
+ override def hashCode(): Int = idx
+}
+
+
+/**
+ * An RDD that cogroups its parents. For each key k in parent RDDs, the resulting RDD contains a
+ * tuple with the list of values for that key.
+ *
+ * @param rdds parent RDDs.
+ * @param part partitioner used to partition the shuffle output.
+ */
+class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part: Partitioner)
+ extends RDD[(K, Seq[Seq[_]])](rdds.head.context, Nil) {
+
+ private var serializerClass: String = null
+
+ def setSerializer(cls: String): CoGroupedRDD[K] = {
+ serializerClass = cls
+ this
+ }
+
+ override def getDependencies: Seq[Dependency[_]] = {
+ rdds.map { rdd: RDD[_ <: Product2[K, _]] =>
+ if (rdd.partitioner == Some(part)) {
+ logDebug("Adding one-to-one dependency with " + rdd)
+ new OneToOneDependency(rdd)
+ } else {
+ logDebug("Adding shuffle dependency with " + rdd)
+ new ShuffleDependency[Any, Any](rdd, part, serializerClass)
+ }
+ }
+ }
+
+ override def getPartitions: Array[Partition] = {
+ val array = new Array[Partition](part.numPartitions)
+ for (i <- 0 until array.size) {
+ // Each CoGroupPartition will have a dependency per contributing RDD
+ array(i) = new CoGroupPartition(i, rdds.zipWithIndex.map { case (rdd, j) =>
+ // Assume each RDD contributed a single dependency, and get it
+ dependencies(j) match {
+ case s: ShuffleDependency[_, _] =>
+ new ShuffleCoGroupSplitDep(s.shuffleId)
+ case _ =>
+ new NarrowCoGroupSplitDep(rdd, i, rdd.partitions(i))
+ }
+ }.toArray)
+ }
+ array
+ }
+
+ override val partitioner = Some(part)
+
+ override def compute(s: Partition, context: TaskContext): Iterator[(K, Seq[Seq[_]])] = {
+ val split = s.asInstanceOf[CoGroupPartition]
+ val numRdds = split.deps.size
+ // e.g. for `(k, a) cogroup (k, b)`, K -> Seq(ArrayBuffer as, ArrayBuffer bs)
+ val map = new JHashMap[K, Seq[ArrayBuffer[Any]]]
+
+ def getSeq(k: K): Seq[ArrayBuffer[Any]] = {
+ val seq = map.get(k)
+ if (seq != null) {
+ seq
+ } else {
+ val seq = Array.fill(numRdds)(new ArrayBuffer[Any])
+ map.put(k, seq)
+ seq
+ }
+ }
+
+ val ser = SparkEnv.get.serializerManager.get(serializerClass)
+ for ((dep, depNum) <- split.deps.zipWithIndex) dep match {
+ case NarrowCoGroupSplitDep(rdd, _, itsSplit) => {
+ // Read them from the parent
+ rdd.iterator(itsSplit, context).asInstanceOf[Iterator[Product2[K, Any]]].foreach { kv =>
+ getSeq(kv._1)(depNum) += kv._2
+ }
+ }
+ case ShuffleCoGroupSplitDep(shuffleId) => {
+ // Read map outputs of shuffle
+ val fetcher = SparkEnv.get.shuffleFetcher
+ fetcher.fetch[Product2[K, Any]](shuffleId, split.index, context.taskMetrics, ser).foreach {
+ kv => getSeq(kv._1)(depNum) += kv._2
+ }
+ }
+ }
+ JavaConversions.mapAsScalaMap(map).iterator
+ }
+
+ override def clearDependencies() {
+ super.clearDependencies()
+ rdds = null
+ }
+}
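
Concretely, the Seq[Seq[_]] value holds one buffer per parent RDD, in rdds order. Via the
cogroup API (a sketch):

    val a = sc.parallelize(Seq((1, "x"), (1, "y"), (2, "z")))
    val b = sc.parallelize(Seq((1, 10), (3, 30)))
    val grouped = a.cogroup(b)
    // 1 -> (Seq(x, y), Seq(10));  2 -> (Seq(z), Seq());  3 -> (Seq(), Seq(30))
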
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
new file mode 100644
index 0000000..c5de636
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
@@ -0,0 +1,342 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rdd
+
+import org.apache.spark._
+import java.io.{ObjectOutputStream, IOException}
+import scala.collection.mutable
+import scala.Some
+import scala.collection.mutable.ArrayBuffer
+
+/**
+ * Class that captures a coalesced RDD by essentially keeping track of parent partitions
+ * @param index of this coalesced partition
+ * @param rdd which it belongs to
+ * @param parentsIndices list of indices in the parent that have been coalesced into this partition
+ * @param preferredLocation the preferred location for this partition
+ */
+case class CoalescedRDDPartition(
+ index: Int,
+ @transient rdd: RDD[_],
+ parentsIndices: Array[Int],
+ @transient preferredLocation: String = ""
+ ) extends Partition {
+ var parents: Seq[Partition] = parentsIndices.map(rdd.partitions(_))
+
+ @throws(classOf[IOException])
+ private def writeObject(oos: ObjectOutputStream) {
+ // Update the reference to parent partition at the time of task serialization
+ parents = parentsIndices.map(rdd.partitions(_))
+ oos.defaultWriteObject()
+ }
+
+ /**
+ * Computes how many of the parents partitions have getPreferredLocation
+ * as one of their preferredLocations
+ * @return locality of this coalesced partition between 0 and 1
+ */
+ def localFraction: Double = {
+ val loc = parents.count(p =>
+ rdd.context.getPreferredLocs(rdd, p.index).map(tl => tl.host).contains(preferredLocation))
+
+ if (parents.size == 0) 0.0 else (loc.toDouble / parents.size.toDouble)
+ }
+}
+
+/**
+ * Represents a coalesced RDD that has fewer partitions than its parent RDD
+ * This class uses the PartitionCoalescer class to find a good partitioning of the parent RDD
+ * so that each new partition has roughly the same number of parent partitions and that
+ * the preferred location of each new partition overlaps with as many preferred locations of its
+ * parent partitions as possible.
+ * @param prev RDD to be coalesced
+ * @param maxPartitions number of desired partitions in the coalesced RDD
+ * @param balanceSlack used to trade-off balance and locality. 1.0 is all locality, 0 is all balance
+ */
+class CoalescedRDD[T: ClassManifest](
+ @transient var prev: RDD[T],
+ maxPartitions: Int,
+ balanceSlack: Double = 0.10)
+ extends RDD[T](prev.context, Nil) { // Nil since we implement getDependencies
+
+ override def getPartitions: Array[Partition] = {
+ val pc = new PartitionCoalescer(maxPartitions, prev, balanceSlack)
+
+ pc.run().zipWithIndex.map {
+ case (pg, i) =>
+ val ids = pg.arr.map(_.index).toArray
+ new CoalescedRDDPartition(i, prev, ids, pg.prefLoc)
+ }
+ }
+
+ override def compute(partition: Partition, context: TaskContext): Iterator[T] = {
+ partition.asInstanceOf[CoalescedRDDPartition].parents.iterator.flatMap { parentPartition =>
+ firstParent[T].iterator(parentPartition, context)
+ }
+ }
+
+ override def getDependencies: Seq[Dependency[_]] = {
+ Seq(new NarrowDependency(prev) {
+ def getParents(id: Int): Seq[Int] =
+ partitions(id).asInstanceOf[CoalescedRDDPartition].parentsIndices
+ })
+ }
+
+ override def clearDependencies() {
+ super.clearDependencies()
+ prev = null
+ }
+
+ /**
+ * Returns the preferred machine for the partition. If split is of type CoalescedRDDPartition,
+ * then the preferred machine will be one which most parent splits prefer too.
+ * @param partition
+ * @return the machine most preferred by split
+ */
+ override def getPreferredLocations(partition: Partition): Seq[String] = {
+ List(partition.asInstanceOf[CoalescedRDDPartition].preferredLocation)
+ }
+}
+
+/**
+ * Coalesce the partitions of a parent RDD (`prev`) into fewer partitions, so that each partition of
+ * this RDD computes one or more of the parent ones. It will produce exactly `maxPartitions` if the
+ * parent had more than maxPartitions, or fewer if the parent had fewer.
+ *
+ * This transformation is useful when an RDD with many partitions gets filtered into a smaller one,
+ * or to avoid having a large number of small tasks when processing a directory with many files.
+ *
+ * If there is no locality information (no preferredLocations) in the parent, then the coalescing
+ * is very simple: group parents that are adjacent in the Array into chunks.
+ * If there is locality information, it proceeds to pack them with the following four goals:
+ *
+ * (1) Balance the groups so they roughly have the same number of parent partitions
+ * (2) Achieve locality per partition, i.e. find one machine which most parent partitions prefer
+ * (3) Be efficient, i.e. O(n) algorithm for n parent partitions (problem is likely NP-hard)
+ * (4) Balance preferred machines, i.e. avoid as much as possible picking the same preferred machine
+ *
+ * Furthermore, it is assumed that the parent RDD may have many partitions, e.g. 100 000.
+ * We assume the final number of desired partitions is small, e.g. less than 1000.
+ *
+ * The algorithm tries to assign unique preferred machines to each partition. If the number of
+ * desired partitions is greater than the number of preferred machines (which can happen), it
+ * needs to start picking duplicate preferred machines. This is determined using coupon collector
+ * estimation (2n log(n) draws). The load balancing is done using power-of-two-choices randomized
+ * balls-into-bins with one twist: it tries to also achieve locality. This is done by allowing a
+ * slack (balanceSlack) between two bins. If two bins are within the slack in terms of balance,
+ * the algorithm will assign partitions according to locality. (contact alig for questions)
+ *
+ */
+
+private[spark] class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanceSlack: Double) {
+
+ def compare(o1: PartitionGroup, o2: PartitionGroup): Boolean = o1.size < o2.size
+ def compare(o1: Option[PartitionGroup], o2: Option[PartitionGroup]): Boolean =
+ if (o1.isEmpty) false else if (o2.isEmpty) true else compare(o1.get, o2.get)
+
+ val rnd = new scala.util.Random(7919) // keep this class deterministic
+
+ // each element of groupArr represents one coalesced partition
+ val groupArr = ArrayBuffer[PartitionGroup]()
+
+ // hash used to check whether some machine is already in groupArr
+ val groupHash = mutable.Map[String, ArrayBuffer[PartitionGroup]]()
+
+ // hash used for the first maxPartitions (to avoid duplicates)
+ val initialHash = mutable.Set[Partition]()
+
+ // determines the tradeoff between load-balancing the partitions sizes and their locality
+ // e.g. balanceSlack=0.10 means that it allows up to 10% imbalance in favor of locality
+ val slack = (balanceSlack * prev.partitions.size).toInt
+
+ var noLocality = true // true iff no preferredLocations exist for the parent RDD
+
+ // gets the *current* preferred locations from the DAGScheduler (as opposed to the static ones)
+ def currPrefLocs(part: Partition): Seq[String] = {
+ prev.context.getPreferredLocs(prev, part.index).map(tl => tl.host)
+ }
+
+ // this class just keeps iterating and rotating infinitely over the partitions of the RDD
+ // next() returns the next preferred machine that a partition is replicated on
+ // the rotator first goes through the first replica copy of each partition, then second, third
+ // the iterator's return type is a tuple: (replicaString, partition)
+ class LocationIterator(prev: RDD[_]) extends Iterator[(String, Partition)] {
+
+ var it: Iterator[(String, Partition)] = resetIterator()
+
+ override val isEmpty = !it.hasNext
+
+ // initializes/resets to start iterating from the beginning
+ def resetIterator() = {
+ val iterators = (0 to 2).map( x =>
+ prev.partitions.iterator.flatMap(p => {
+ if (currPrefLocs(p).size > x) Some((currPrefLocs(p)(x), p)) else None
+ } )
+ )
+ iterators.reduceLeft((x, y) => x ++ y)
+ }
+
+ // hasNext() is false iff there are no preferredLocations for any of the partitions of the RDD
+ def hasNext(): Boolean = { !isEmpty }
+
+ // return the next preferredLocation of some partition of the RDD
+ def next(): (String, Partition) = {
+ if (it.hasNext)
+ it.next()
+ else {
+ it = resetIterator() // ran out of preferred locations, reset and rotate to the beginning
+ it.next()
+ }
+ }
+ }
+
+ /**
+ * Sorts and gets the least element of the list associated with key in groupHash
+ * The returned PartitionGroup is the least loaded of all groups that represent the machine "key"
+ * @param key string identifying a preferred machine (host)
+ * @return the least-loaded PartitionGroup for that machine, if one exists
+ */
+ def getLeastGroupHash(key: String): Option[PartitionGroup] = {
+ groupHash.get(key).map(_.sortWith(compare).head)
+ }
+
+ def addPartToPGroup(part: Partition, pgroup: PartitionGroup): Boolean = {
+ if (!initialHash.contains(part)) {
+ pgroup.arr += part // assign this element to the group
+ initialHash += part // needed to avoid assigning partitions to multiple buckets
+ true
+ } else { false }
+ }
+
+ /**
+ * Initializes targetLen partition groups and assigns a preferredLocation
+ * This uses coupon collector to estimate how many preferredLocations it must rotate through
+ * until it has seen most of the preferred locations (2 * n log(n))
+ * @param targetLen the number of partition groups to create
+ */
+ def setupGroups(targetLen: Int) {
+ val rotIt = new LocationIterator(prev)
+
+ // deal with empty case, just create targetLen partition groups with no preferred location
+ if (!rotIt.hasNext()) {
+ (1 to targetLen).foreach(x => groupArr += PartitionGroup())
+ return
+ }
+
+ noLocality = false
+
+ // number of iterations needed to be certain that we've seen most preferred locations
+ val expectedCoupons2 = 2 * (math.log(targetLen)*targetLen + targetLen + 0.5).toInt
+ var numCreated = 0
+ var tries = 0
+
+ // rotate through until either targetLen unique/distinct preferred locations have been created
+ // OR we've rotated expectedCoupons2 times, in which case we have likely seen all preferred locations,
+ // i.e. likely targetLen >> number of preferred locations (more buckets than there are machines)
+ while (numCreated < targetLen && tries < expectedCoupons2) {
+ tries += 1
+ val (nxt_replica, nxt_part) = rotIt.next()
+ if (!groupHash.contains(nxt_replica)) {
+ val pgroup = PartitionGroup(nxt_replica)
+ groupArr += pgroup
+ addPartToPGroup(nxt_part, pgroup)
+ groupHash += (nxt_replica -> (ArrayBuffer(pgroup))) // list in case we have multiple
+ numCreated += 1
+ }
+ }
+
+ while (numCreated < targetLen) { // if we don't have enough partition groups, create duplicates
+ var (nxt_replica, nxt_part) = rotIt.next()
+ val pgroup = PartitionGroup(nxt_replica)
+ groupArr += pgroup
+ groupHash(nxt_replica) += pgroup
+ var tries = 0
+ while (!addPartToPGroup(nxt_part, pgroup) && tries < targetLen) { // ensure at least one part
+ nxt_part = rotIt.next()._2
+ tries += 1
+ }
+ numCreated += 1
+ }
+
+ }
+
+ /**
+ * Takes a parent RDD partition and decides which of the partition groups to put it in
+ * Takes locality into account, but also uses power of 2 choices to load balance
+ * It strikes a balance between the two using the balanceSlack variable
+ * @param p partition (ball to be thrown)
+ * @return partition group (bin to be put in)
+ */
+ def pickBin(p: Partition): PartitionGroup = {
+ val pref = currPrefLocs(p).map(getLeastGroupHash(_)).sortWith(compare) // least loaded pref locs
+ val prefPart = if (pref.isEmpty) None else pref.head
+
+ val r1 = rnd.nextInt(groupArr.size)
+ val r2 = rnd.nextInt(groupArr.size)
+ val minPowerOfTwo = if (groupArr(r1).size < groupArr(r2).size) groupArr(r1) else groupArr(r2)
+ if (prefPart.isEmpty) // if no preferred locations, just use basic power of two
+ return minPowerOfTwo
+
+ val prefPartActual = prefPart.get
+
+ if (minPowerOfTwo.size + slack <= prefPartActual.size) // more imbalance than the slack allows
+ return minPowerOfTwo // prefer balance over locality
+ else {
+ return prefPartActual // prefer locality over balance
+ }
+ }
+
+ def throwBalls() {
+ if (noLocality) { // no preferredLocations in parent RDD, no randomization needed
+ if (maxPartitions > groupArr.size) { // just return prev.partitions
+ for ((p,i) <- prev.partitions.zipWithIndex) {
+ groupArr(i).arr += p
+ }
+ } else { // no locality available, so simply split partitions into ranges based on array position
+ for (i <- 0 until maxPartitions) {
+ val rangeStart = ((i.toLong * prev.partitions.length) / maxPartitions).toInt
+ val rangeEnd = (((i.toLong + 1) * prev.partitions.length) / maxPartitions).toInt
+ (rangeStart until rangeEnd).foreach{ j => groupArr(i).arr += prev.partitions(j) }
+ }
+ }
+ } else {
+ for (p <- prev.partitions if (!initialHash.contains(p))) { // throw every partition into group
+ pickBin(p).arr += p
+ }
+ }
+ }
+
+ def getPartitions: Array[PartitionGroup] = groupArr.filter(pg => pg.size > 0).toArray
+
+ /**
+ * Runs the packing algorithm and returns an array of PartitionGroups that, if possible, are
+ * load balanced and grouped by locality
+ * @return array of partition groups
+ */
+ def run(): Array[PartitionGroup] = {
+ setupGroups(math.min(prev.partitions.length, maxPartitions)) // set up the groups (bins)
+ throwBalls() // assign partitions (balls) to each group (bin)
+ getPartitions
+ }
+}
+
+private[spark] case class PartitionGroup(prefLoc: String = "") {
+ var arr = mutable.ArrayBuffer[Partition]()
+
+ def size = arr.size
+}
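
For illustration, a minimal sketch of how the coalescer above is exercised through the public
coalesce() API; this assumes an in-scope SparkContext named sc, and the partition counts are
illustrative only:

    val manyParts = sc.parallelize(1 to 4000, 400) // 400 small partitions
    val fewParts = manyParts.coalesce(4)           // builds a CoalescedRDD of at most 4 partitions
    assert(fewParts.partitions.size == 4)
    assert(fewParts.count() == 4000)               // same data, far fewer tasks
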
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/rdd/EmptyRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/EmptyRDD.scala b/core/src/main/scala/org/apache/spark/rdd/EmptyRDD.scala
new file mode 100644
index 0000000..24ce4ab
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/rdd/EmptyRDD.scala
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rdd
+
+import org.apache.spark.{RDD, SparkContext, SparkEnv, Partition, TaskContext}
+
+
+/**
+ * An RDD that is empty, i.e. has no elements in it.
+ */
+class EmptyRDD[T: ClassManifest](sc: SparkContext) extends RDD[T](sc, Nil) {
+
+ override def getPartitions: Array[Partition] = Array.empty
+
+ override def compute(split: Partition, context: TaskContext): Iterator[T] = {
+ throw new UnsupportedOperationException("empty RDD")
+ }
+}
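
A short behavioral sketch, assuming an in-scope SparkContext named sc: with zero partitions,
actions that only aggregate per-partition results return immediately.

    val empty = new EmptyRDD[Int](sc)
    assert(empty.partitions.isEmpty) // no partitions, so no tasks are launched
    assert(empty.count() == 0)
    // empty.first() or empty.reduce(_ + _) would throw: there is nothing to compute
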
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/rdd/FilteredRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/FilteredRDD.scala b/core/src/main/scala/org/apache/spark/rdd/FilteredRDD.scala
new file mode 100644
index 0000000..4df8ceb
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/rdd/FilteredRDD.scala
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rdd
+
+import org.apache.spark.{OneToOneDependency, RDD, Partition, TaskContext}
+
+private[spark] class FilteredRDD[T: ClassManifest](
+ prev: RDD[T],
+ f: T => Boolean)
+ extends RDD[T](prev) {
+
+ override def getPartitions: Array[Partition] = firstParent[T].partitions
+
+ override val partitioner = prev.partitioner // Since filter cannot change a partition's keys
+
+ override def compute(split: Partition, context: TaskContext) =
+ firstParent[T].iterator(split, context).filter(f)
+}
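
Because FilteredRDD reuses the parent's partitioner, filtering a partitioned pair RDD keeps its
co-location. A minimal sketch, assuming an in-scope SparkContext named sc:

    import org.apache.spark.HashPartitioner
    import org.apache.spark.SparkContext._ // for pair RDD operations such as partitionBy

    val pairs = sc.parallelize(Seq((1, "a"), (2, "b"), (3, "c")))
                  .partitionBy(new HashPartitioner(2))
    val kept = pairs.filter { case (k, _) => k != 2 } // backed by FilteredRDD
    assert(kept.partitioner == pairs.partitioner)     // co-location preserved
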
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/rdd/FlatMappedRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/FlatMappedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/FlatMappedRDD.scala
new file mode 100644
index 0000000..2bf7653
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/rdd/FlatMappedRDD.scala
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rdd
+
+import org.apache.spark.{RDD, Partition, TaskContext}
+
+
+private[spark]
+class FlatMappedRDD[U: ClassManifest, T: ClassManifest](
+ prev: RDD[T],
+ f: T => TraversableOnce[U])
+ extends RDD[U](prev) {
+
+ override def getPartitions: Array[Partition] = firstParent[T].partitions
+
+ override def compute(split: Partition, context: TaskContext) =
+ firstParent[T].iterator(split, context).flatMap(f)
+}
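
A quick sketch of the one-to-many expansion FlatMappedRDD implements, with sc an assumed
in-scope SparkContext:

    val lines = sc.parallelize(Seq("to be", "or not"), 2)
    val words = lines.flatMap(_.split(" "))                 // backed by FlatMappedRDD
    assert(words.count() == 4)                              // to, be, or, not
    assert(words.partitions.size == lines.partitions.size) // partition count unchanged
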
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/rdd/FlatMappedValuesRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/FlatMappedValuesRDD.scala b/core/src/main/scala/org/apache/spark/rdd/FlatMappedValuesRDD.scala
new file mode 100644
index 0000000..e544720
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/rdd/FlatMappedValuesRDD.scala
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rdd
+
+import org.apache.spark.{TaskContext, Partition, RDD}
+
+
+private[spark]
+class FlatMappedValuesRDD[K, V, U](prev: RDD[_ <: Product2[K, V]], f: V => TraversableOnce[U])
+ extends RDD[(K, U)](prev) {
+
+ override def getPartitions = firstParent[Product2[K, V]].partitions
+
+ override val partitioner = firstParent[Product2[K, V]].partitioner
+
+ override def compute(split: Partition, context: TaskContext) = {
+ firstParent[Product2[K, V]].iterator(split, context).flatMap { case Product2(k, v) =>
+ f(v).map(x => (k, x))
+ }
+ }
+}
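
FlatMappedValuesRDD keeps the parent's partitioner because only values change, never keys.
A sketch via the flatMapValues API, assuming an in-scope SparkContext named sc:

    import org.apache.spark.HashPartitioner
    import org.apache.spark.SparkContext._ // for pair RDD operations

    val scores = sc.parallelize(Seq(("a", "1,2"), ("b", "3")))
                   .partitionBy(new HashPartitioner(2))
    val expanded = scores.flatMapValues(_.split(","))  // ("a","1"), ("a","2"), ("b","3")
    assert(expanded.partitioner == scores.partitioner) // keys untouched, partitioner kept
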
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/rdd/GlommedRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/GlommedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/GlommedRDD.scala
new file mode 100644
index 0000000..2ce9419
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/rdd/GlommedRDD.scala
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rdd
+
+import org.apache.spark.{RDD, Partition, TaskContext}
+
+private[spark] class GlommedRDD[T: ClassManifest](prev: RDD[T])
+ extends RDD[Array[T]](prev) {
+
+ override def getPartitions: Array[Partition] = firstParent[T].partitions
+
+ override def compute(split: Partition, context: TaskContext) =
+ Array(firstParent[T].iterator(split, context).toArray).iterator
+}
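
glom() materializes each partition as one array, which is useful for inspecting partition
boundaries. A minimal sketch, with sc an assumed in-scope SparkContext:

    val nums = sc.parallelize(1 to 6, 3)
    val byPartition = nums.glom() // RDD[Array[Int]]: one array per partition
    assert(byPartition.partitions.size == 3)
    // collect() returns Array(Array(1, 2), Array(3, 4), Array(5, 6)) under even range slicing
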
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
new file mode 100644
index 0000000..08e6154
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rdd
+
+import java.io.EOFException
+import java.util.NoSuchElementException
+
+import org.apache.hadoop.io.LongWritable
+import org.apache.hadoop.io.NullWritable
+import org.apache.hadoop.io.Text
+import org.apache.hadoop.mapred.FileInputFormat
+import org.apache.hadoop.mapred.InputFormat
+import org.apache.hadoop.mapred.InputSplit
+import org.apache.hadoop.mapred.JobConf
+import org.apache.hadoop.mapred.TextInputFormat
+import org.apache.hadoop.mapred.RecordReader
+import org.apache.hadoop.mapred.Reporter
+import org.apache.hadoop.util.ReflectionUtils
+
+import org.apache.spark.{Dependency, Logging, Partition, RDD, SerializableWritable, SparkContext, SparkEnv, TaskContext}
+import org.apache.spark.util.NextIterator
+import org.apache.hadoop.conf.{Configuration, Configurable}
+
+
+/**
+ * A Spark split class that wraps around a Hadoop InputSplit.
+ */
+private[spark] class HadoopPartition(rddId: Int, idx: Int, @transient s: InputSplit)
+ extends Partition {
+
+ val inputSplit = new SerializableWritable[InputSplit](s)
+
+ override def hashCode(): Int = (41 * (41 + rddId) + idx).toInt
+
+ override val index: Int = idx
+}
+
+/**
+ * An RDD that reads a Hadoop dataset as specified by a JobConf (e.g. files in HDFS, the local file
+ * system, or S3, tables in HBase, etc).
+ */
+class HadoopRDD[K, V](
+ sc: SparkContext,
+ @transient conf: JobConf,
+ inputFormatClass: Class[_ <: InputFormat[K, V]],
+ keyClass: Class[K],
+ valueClass: Class[V],
+ minSplits: Int)
+ extends RDD[(K, V)](sc, Nil) with Logging {
+
+ // A Hadoop JobConf can be about 10 KB, which is pretty big, so broadcast it
+ private val confBroadcast = sc.broadcast(new SerializableWritable(conf))
+
+ override def getPartitions: Array[Partition] = {
+ val env = SparkEnv.get
+ env.hadoop.addCredentials(conf)
+ val inputFormat = createInputFormat(conf)
+ if (inputFormat.isInstanceOf[Configurable]) {
+ inputFormat.asInstanceOf[Configurable].setConf(conf)
+ }
+ val inputSplits = inputFormat.getSplits(conf, minSplits)
+ val array = new Array[Partition](inputSplits.size)
+ for (i <- 0 until inputSplits.size) {
+ array(i) = new HadoopPartition(id, i, inputSplits(i))
+ }
+ array
+ }
+
+ def createInputFormat(conf: JobConf): InputFormat[K, V] = {
+ ReflectionUtils.newInstance(inputFormatClass.asInstanceOf[Class[_]], conf)
+ .asInstanceOf[InputFormat[K, V]]
+ }
+
+ override def compute(theSplit: Partition, context: TaskContext) = new NextIterator[(K, V)] {
+ val split = theSplit.asInstanceOf[HadoopPartition]
+ logInfo("Input split: " + split.inputSplit)
+ var reader: RecordReader[K, V] = null
+
+ val conf = confBroadcast.value.value
+ val fmt = createInputFormat(conf)
+ if (fmt.isInstanceOf[Configurable]) {
+ fmt.asInstanceOf[Configurable].setConf(conf)
+ }
+ reader = fmt.getRecordReader(split.inputSplit.value, conf, Reporter.NULL)
+
+ // Register an on-task-completion callback to close the input stream.
+ context.addOnCompleteCallback{ () => closeIfNeeded() }
+
+ val key: K = reader.createKey()
+ val value: V = reader.createValue()
+
+ override def getNext() = {
+ try {
+ finished = !reader.next(key, value)
+ } catch {
+ case eof: EOFException =>
+ finished = true
+ }
+ (key, value)
+ }
+
+ override def close() {
+ try {
+ reader.close()
+ } catch {
+ case e: Exception => logWarning("Exception in RecordReader.close()", e)
+ }
+ }
+ }
+
+ override def getPreferredLocations(split: Partition): Seq[String] = {
+ // TODO: Filtering out "localhost" in case of file:// URLs
+ val hadoopSplit = split.asInstanceOf[HadoopPartition]
+ hadoopSplit.inputSplit.value.getLocations.filter(_ != "localhost")
+ }
+
+ override def checkpoint() {
+ // Do nothing. Hadoop RDD should not be checkpointed.
+ }
+
+ def getConf: Configuration = confBroadcast.value.value
+}
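
HadoopRDD is normally constructed indirectly through SparkContext. A sketch of the two common
entry points, assuming an in-scope SparkContext named sc; the HDFS path is illustrative:

    import org.apache.hadoop.io.{LongWritable, Text}
    import org.apache.hadoop.mapred.TextInputFormat

    // High level: textFile wraps a HadoopRDD over TextInputFormat and keeps only the values.
    val lines = sc.textFile("hdfs:///data/input.txt")

    // Lower level: the same file as raw (byte offset, line) records.
    val records = sc.hadoopFile[LongWritable, Text, TextInputFormat]("hdfs:///data/input.txt")
    val lines2 = records.map(_._2.toString)
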
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala b/core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala
new file mode 100644
index 0000000..3db460b
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rdd
+
+import java.sql.{Connection, ResultSet}
+
+import org.apache.spark.{Logging, Partition, RDD, SparkContext, TaskContext}
+import org.apache.spark.util.NextIterator
+
+private[spark] class JdbcPartition(idx: Int, val lower: Long, val upper: Long) extends Partition {
+ override def index = idx
+}
+
+/**
+ * An RDD that executes an SQL query on a JDBC connection and reads results.
+ * For a usage example, see the test case JdbcRDDSuite.
+ *
+ * @param getConnection a function that returns an open Connection.
+ * The RDD takes care of closing the connection.
+ * @param sql the text of the query.
+ * The query must contain two ? placeholders for parameters used to partition the results.
+ * E.g. "select title, author from books where ? <= id and id <= ?"
+ * @param lowerBound the minimum value of the first placeholder
+ * @param upperBound the maximum value of the second placeholder
+ * The lower and upper bounds are inclusive.
+ * @param numPartitions the number of partitions.
+ * Given a lowerBound of 1, an upperBound of 20, and a numPartitions of 2,
+ * the query would be executed twice, once with (1, 10) and once with (11, 20)
+ * @param mapRow a function from a ResultSet to a single row of the desired result type(s).
+ * This should only call getInt, getString, etc; the RDD takes care of calling next.
+ * The default maps a ResultSet to an array of Object.
+ */
+class JdbcRDD[T: ClassManifest](
+ sc: SparkContext,
+ getConnection: () => Connection,
+ sql: String,
+ lowerBound: Long,
+ upperBound: Long,
+ numPartitions: Int,
+ mapRow: (ResultSet) => T = JdbcRDD.resultSetToObjectArray _)
+ extends RDD[T](sc, Nil) with Logging {
+
+ override def getPartitions: Array[Partition] = {
+ // bounds are inclusive, hence the + 1 here and - 1 on end
+ val length = 1 + upperBound - lowerBound
+ (0 until numPartitions).map(i => {
+ val start = lowerBound + ((i * length) / numPartitions).toLong
+ val end = lowerBound + (((i + 1) * length) / numPartitions).toLong - 1
+ new JdbcPartition(i, start, end)
+ }).toArray
+ }
+
+ override def compute(thePart: Partition, context: TaskContext) = new NextIterator[T] {
+ context.addOnCompleteCallback{ () => closeIfNeeded() }
+ val part = thePart.asInstanceOf[JdbcPartition]
+ val conn = getConnection()
+ val stmt = conn.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)
+
+ // setFetchSize(Integer.MIN_VALUE) is a MySQL-driver-specific way to force streaming results,
+ // rather than pulling the entire result set into memory.
+ // see http://dev.mysql.com/doc/refman/5.0/en/connector-j-reference-implementation-notes.html
+ if (conn.getMetaData.getURL.matches("jdbc:mysql:.*")) {
+ stmt.setFetchSize(Integer.MIN_VALUE)
+ logInfo("statement fetch size set to: " + stmt.getFetchSize + " to force MySQL streaming ")
+ }
+
+ stmt.setLong(1, part.lower)
+ stmt.setLong(2, part.upper)
+ val rs = stmt.executeQuery()
+
+ override def getNext: T = {
+ if (rs.next()) {
+ mapRow(rs)
+ } else {
+ finished = true
+ null.asInstanceOf[T]
+ }
+ }
+
+ override def close() {
+ try {
+ if (null != rs && ! rs.isClosed()) rs.close()
+ } catch {
+ case e: Exception => logWarning("Exception closing resultset", e)
+ }
+ try {
+ if (null != stmt && ! stmt.isClosed()) stmt.close()
+ } catch {
+ case e: Exception => logWarning("Exception closing statement", e)
+ }
+ try {
+ if (null != conn && ! conn.isClosed()) conn.close()
+ logInfo("closed connection")
+ } catch {
+ case e: Exception => logWarning("Exception closing connection", e)
+ }
+ }
+ }
+}
+
+object JdbcRDD {
+ def resultSetToObjectArray(rs: ResultSet) = {
+ Array.tabulate[Object](rs.getMetaData.getColumnCount)(i => rs.getObject(i + 1))
+ }
+}
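
Since the scaladoc above defers to JdbcRDDSuite, here is a self-contained sketch; the JDBC URL,
table, and column names are illustrative assumptions, and sc is an assumed in-scope SparkContext:

    import java.sql.DriverManager

    val rows = new JdbcRDD(
      sc,
      () => DriverManager.getConnection("jdbc:h2:mem:testdb"), // a fresh connection per task
      "SELECT ID, NAME FROM PEOPLE WHERE ? <= ID AND ID <= ?", // both placeholders are required
      1, 100, 3,                             // lowerBound, upperBound, numPartitions
      rs => (rs.getInt(1), rs.getString(2))) // mapRow, invoked once per row
    // rows is an RDD[(Int, String)]; each of the 3 partitions runs the query once with its own
    // inclusive bounds, here (1, 33), (34, 66), and (67, 100)
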
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala b/core/src/main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala
new file mode 100644
index 0000000..13009d3
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rdd
+
+import org.apache.spark.{RDD, Partition, TaskContext}
+
+
+private[spark]
+class MapPartitionsRDD[U: ClassManifest, T: ClassManifest](
+ prev: RDD[T],
+ f: Iterator[T] => Iterator[U],
+ preservesPartitioning: Boolean = false)
+ extends RDD[U](prev) {
+
+ override val partitioner =
+ if (preservesPartitioning) firstParent[T].partitioner else None
+
+ override def getPartitions: Array[Partition] = firstParent[T].partitions
+
+ override def compute(split: Partition, context: TaskContext) =
+ f(firstParent[T].iterator(split, context))
+}
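
MapPartitionsRDD backs mapPartitions(); the preservesPartitioning flag is the caller's promise
that the function leaves keys alone. A minimal sketch, with sc an assumed in-scope SparkContext:

    val nums = sc.parallelize(1 to 10, 2) // even range slicing: 1..5 and 6..10
    val sums = nums.mapPartitions { iter =>
      // any per-partition setup (e.g. one expensive resource) would go here
      Iterator(iter.sum) // emit a single sum per partition
    }
    assert(sums.collect().toSeq == Seq(15, 40))
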