Posted to commits@spark.apache.org by ma...@apache.org on 2013/09/01 23:59:04 UTC
[20/69] [abbrv] [partial] Initial work to rename package to org.apache.spark
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/partial/ApproximateEvaluator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/partial/ApproximateEvaluator.scala b/core/src/main/scala/spark/partial/ApproximateEvaluator.scala
deleted file mode 100644
index 5eae144..0000000
--- a/core/src/main/scala/spark/partial/ApproximateEvaluator.scala
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.partial
-
-/**
- * An object that computes a function incrementally by merging in results of type U from multiple
- * tasks. Allows partial evaluation at any point by calling currentResult().
- */
-private[spark] trait ApproximateEvaluator[U, R] {
- def merge(outputId: Int, taskResult: U): Unit
- def currentResult(): R
-}
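For context, any concrete evaluator just folds task results into mutable state and exposes a snapshot on demand. A minimal hypothetical sketch (ExactSumEvaluator is invented here for illustration, not part of this commit):

    // Hypothetical sketch: an evaluator that merges Long task results
    // exactly, with no error bounds on the snapshot it reports.
    private[spark] class ExactSumEvaluator extends ApproximateEvaluator[Long, Long] {
      private var total = 0L
      override def merge(outputId: Int, taskResult: Long): Unit = { total += taskResult }
      override def currentResult(): Long = total
    }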
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/partial/BoundedDouble.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/partial/BoundedDouble.scala b/core/src/main/scala/spark/partial/BoundedDouble.scala
deleted file mode 100644
index 8bdbe6c..0000000
--- a/core/src/main/scala/spark/partial/BoundedDouble.scala
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.partial
-
-/**
- * A Double with error bars on it.
- */
-class BoundedDouble(val mean: Double, val confidence: Double, val low: Double, val high: Double) {
- override def toString(): String = "[%.3f, %.3f]".format(low, high)
-}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/partial/CountEvaluator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/partial/CountEvaluator.scala b/core/src/main/scala/spark/partial/CountEvaluator.scala
deleted file mode 100644
index 6aa9209..0000000
--- a/core/src/main/scala/spark/partial/CountEvaluator.scala
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.partial
-
-import cern.jet.stat.Probability
-
-/**
- * An ApproximateEvaluator for counts.
- *
- * TODO: There's currently a lot of shared code between this and GroupedCountEvaluator. It might
- * be best to make this a special case of GroupedCountEvaluator with one group.
- */
-private[spark] class CountEvaluator(totalOutputs: Int, confidence: Double)
- extends ApproximateEvaluator[Long, BoundedDouble] {
-
- var outputsMerged = 0
- var sum: Long = 0
-
- override def merge(outputId: Int, taskResult: Long) {
- outputsMerged += 1
- sum += taskResult
- }
-
- override def currentResult(): BoundedDouble = {
- if (outputsMerged == totalOutputs) {
- new BoundedDouble(sum, 1.0, sum, sum)
- } else if (outputsMerged == 0) {
- new BoundedDouble(0, 0.0, Double.NegativeInfinity, Double.PositiveInfinity)
- } else {
- val p = outputsMerged.toDouble / totalOutputs
- val mean = (sum + 1 - p) / p
- val variance = (sum + 1) * (1 - p) / (p * p)
- val stdev = math.sqrt(variance)
- val confFactor = Probability.normalInverse(1 - (1 - confidence) / 2)
- val low = mean - confFactor * stdev
- val high = mean + confFactor * stdev
- new BoundedDouble(mean, confidence, low, high)
- }
- }
-}
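To make the extrapolation above concrete, a worked example with hypothetical numbers: with half the outputs merged (p = 0.5), an observed sum of 100, and confidence = 0.95 (confFactor ≈ 1.96):

    mean     = (100 + 1 - 0.5) / 0.5            = 201.0
    variance = (100 + 1) * (1 - 0.5) / (0.5^2)  = 202.0
    stdev    = sqrt(202.0)                      ≈ 14.21
    interval = 201.0 ± 1.96 * 14.21             ≈ [173.1, 228.9]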
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/partial/GroupedCountEvaluator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/partial/GroupedCountEvaluator.scala b/core/src/main/scala/spark/partial/GroupedCountEvaluator.scala
deleted file mode 100644
index ebe2e5a..0000000
--- a/core/src/main/scala/spark/partial/GroupedCountEvaluator.scala
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.partial
-
-import java.util.{HashMap => JHashMap}
-import java.util.{Map => JMap}
-
-import scala.collection.Map
-import scala.collection.mutable.HashMap
-import scala.collection.JavaConversions.mapAsScalaMap
-
-import cern.jet.stat.Probability
-
-import it.unimi.dsi.fastutil.objects.{Object2LongOpenHashMap => OLMap}
-
-/**
- * An ApproximateEvaluator for counts by key. Returns a map of key to confidence interval.
- */
-private[spark] class GroupedCountEvaluator[T](totalOutputs: Int, confidence: Double)
- extends ApproximateEvaluator[OLMap[T], Map[T, BoundedDouble]] {
-
- var outputsMerged = 0
- var sums = new OLMap[T] // Sum of counts for each key
-
- override def merge(outputId: Int, taskResult: OLMap[T]) {
- outputsMerged += 1
- val iter = taskResult.object2LongEntrySet.fastIterator()
- while (iter.hasNext) {
- val entry = iter.next()
- sums.put(entry.getKey, sums.getLong(entry.getKey) + entry.getLongValue)
- }
- }
-
- override def currentResult(): Map[T, BoundedDouble] = {
- if (outputsMerged == totalOutputs) {
- val result = new JHashMap[T, BoundedDouble](sums.size)
- val iter = sums.object2LongEntrySet.fastIterator()
- while (iter.hasNext) {
- val entry = iter.next()
- val sum = entry.getLongValue()
- result(entry.getKey) = new BoundedDouble(sum, 1.0, sum, sum)
- }
- result
- } else if (outputsMerged == 0) {
- new HashMap[T, BoundedDouble]
- } else {
- val p = outputsMerged.toDouble / totalOutputs
- val confFactor = Probability.normalInverse(1 - (1 - confidence) / 2)
- val result = new JHashMap[T, BoundedDouble](sums.size)
- val iter = sums.object2LongEntrySet.fastIterator()
- while (iter.hasNext) {
- val entry = iter.next()
- val sum = entry.getLongValue
- val mean = (sum + 1 - p) / p
- val variance = (sum + 1) * (1 - p) / (p * p)
- val stdev = math.sqrt(variance)
- val low = mean - confFactor * stdev
- val high = mean + confFactor * stdev
- result(entry.getKey) = new BoundedDouble(mean, confidence, low, high)
- }
- result
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/partial/GroupedMeanEvaluator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/partial/GroupedMeanEvaluator.scala b/core/src/main/scala/spark/partial/GroupedMeanEvaluator.scala
deleted file mode 100644
index 2dadbbd..0000000
--- a/core/src/main/scala/spark/partial/GroupedMeanEvaluator.scala
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.partial
-
-import java.util.{HashMap => JHashMap}
-import java.util.{Map => JMap}
-
-import scala.collection.mutable.HashMap
-import scala.collection.Map
-import scala.collection.JavaConversions.mapAsScalaMap
-
-import spark.util.StatCounter
-
-/**
- * An ApproximateEvaluator for means by key. Returns a map of key to confidence interval.
- */
-private[spark] class GroupedMeanEvaluator[T](totalOutputs: Int, confidence: Double)
- extends ApproximateEvaluator[JHashMap[T, StatCounter], Map[T, BoundedDouble]] {
-
- var outputsMerged = 0
- var sums = new JHashMap[T, StatCounter] // Merged StatCounter for each key
-
- override def merge(outputId: Int, taskResult: JHashMap[T, StatCounter]) {
- outputsMerged += 1
- val iter = taskResult.entrySet.iterator()
- while (iter.hasNext) {
- val entry = iter.next()
- val old = sums.get(entry.getKey)
- if (old != null) {
- old.merge(entry.getValue)
- } else {
- sums.put(entry.getKey, entry.getValue)
- }
- }
- }
-
- override def currentResult(): Map[T, BoundedDouble] = {
- if (outputsMerged == totalOutputs) {
- val result = new JHashMap[T, BoundedDouble](sums.size)
- val iter = sums.entrySet.iterator()
- while (iter.hasNext) {
- val entry = iter.next()
- val mean = entry.getValue.mean
- result(entry.getKey) = new BoundedDouble(mean, 1.0, mean, mean)
- }
- result
- } else if (outputsMerged == 0) {
- new HashMap[T, BoundedDouble]
- } else {
- val p = outputsMerged.toDouble / totalOutputs
- val studentTCacher = new StudentTCacher(confidence)
- val result = new JHashMap[T, BoundedDouble](sums.size)
- val iter = sums.entrySet.iterator()
- while (iter.hasNext) {
- val entry = iter.next()
- val counter = entry.getValue
- val mean = counter.mean
- val stdev = math.sqrt(counter.sampleVariance / counter.count)
- val confFactor = studentTCacher.get(counter.count)
- val low = mean - confFactor * stdev
- val high = mean + confFactor * stdev
- result(entry.getKey) = new BoundedDouble(mean, confidence, low, high)
- }
- result
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/partial/GroupedSumEvaluator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/partial/GroupedSumEvaluator.scala b/core/src/main/scala/spark/partial/GroupedSumEvaluator.scala
deleted file mode 100644
index ae2b63f..0000000
--- a/core/src/main/scala/spark/partial/GroupedSumEvaluator.scala
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.partial
-
-import java.util.{HashMap => JHashMap}
-import java.util.{Map => JMap}
-
-import scala.collection.mutable.HashMap
-import scala.collection.Map
-import scala.collection.JavaConversions.mapAsScalaMap
-
-import spark.util.StatCounter
-
-/**
- * An ApproximateEvaluator for sums by key. Returns a map of key to confidence interval.
- */
-private[spark] class GroupedSumEvaluator[T](totalOutputs: Int, confidence: Double)
- extends ApproximateEvaluator[JHashMap[T, StatCounter], Map[T, BoundedDouble]] {
-
- var outputsMerged = 0
- var sums = new JHashMap[T, StatCounter] // Merged StatCounter for each key
-
- override def merge(outputId: Int, taskResult: JHashMap[T, StatCounter]) {
- outputsMerged += 1
- val iter = taskResult.entrySet.iterator()
- while (iter.hasNext) {
- val entry = iter.next()
- val old = sums.get(entry.getKey)
- if (old != null) {
- old.merge(entry.getValue)
- } else {
- sums.put(entry.getKey, entry.getValue)
- }
- }
- }
-
- override def currentResult(): Map[T, BoundedDouble] = {
- if (outputsMerged == totalOutputs) {
- val result = new JHashMap[T, BoundedDouble](sums.size)
- val iter = sums.entrySet.iterator()
- while (iter.hasNext) {
- val entry = iter.next()
- val sum = entry.getValue.sum
- result(entry.getKey) = new BoundedDouble(sum, 1.0, sum, sum)
- }
- result
- } else if (outputsMerged == 0) {
- new HashMap[T, BoundedDouble]
- } else {
- val p = outputsMerged.toDouble / totalOutputs
- val studentTCacher = new StudentTCacher(confidence)
- val result = new JHashMap[T, BoundedDouble](sums.size)
- val iter = sums.entrySet.iterator()
- while (iter.hasNext) {
- val entry = iter.next()
- val counter = entry.getValue
- val meanEstimate = counter.mean
- val meanVar = counter.sampleVariance / counter.count
- val countEstimate = (counter.count + 1 - p) / p
- val countVar = (counter.count + 1) * (1 - p) / (p * p)
- val sumEstimate = meanEstimate * countEstimate
- val sumVar = (meanEstimate * meanEstimate * countVar) +
- (countEstimate * countEstimate * meanVar) +
- (meanVar * countVar)
- val sumStdev = math.sqrt(sumVar)
- val confFactor = studentTCacher.get(counter.count)
- val low = sumEstimate - confFactor * sumStdev
- val high = sumEstimate + confFactor * sumStdev
- result(entry.getKey) = new BoundedDouble(sumEstimate, confidence, low, high)
- }
- result
- }
- }
-}
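The sumVar expression above is the standard formula for the variance of a product of independent random variables, with X the per-partition mean estimate and Y the extrapolated count:

    Var(XY) = E[X]^2 * Var(Y) + E[Y]^2 * Var(X) + Var(X) * Var(Y)

SumEvaluator below applies the same identity to a single, ungrouped sum.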
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/partial/MeanEvaluator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/partial/MeanEvaluator.scala b/core/src/main/scala/spark/partial/MeanEvaluator.scala
deleted file mode 100644
index 5ddcad7..0000000
--- a/core/src/main/scala/spark/partial/MeanEvaluator.scala
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.partial
-
-import cern.jet.stat.Probability
-
-import spark.util.StatCounter
-
-/**
- * An ApproximateEvaluator for means.
- */
-private[spark] class MeanEvaluator(totalOutputs: Int, confidence: Double)
- extends ApproximateEvaluator[StatCounter, BoundedDouble] {
-
- var outputsMerged = 0
- var counter = new StatCounter
-
- override def merge(outputId: Int, taskResult: StatCounter) {
- outputsMerged += 1
- counter.merge(taskResult)
- }
-
- override def currentResult(): BoundedDouble = {
- if (outputsMerged == totalOutputs) {
- new BoundedDouble(counter.mean, 1.0, counter.mean, counter.mean)
- } else if (outputsMerged == 0) {
- new BoundedDouble(0, 0.0, Double.NegativeInfinity, Double.PositiveInfinity)
- } else {
- val mean = counter.mean
- val stdev = math.sqrt(counter.sampleVariance / counter.count)
- val confFactor = {
- if (counter.count > 100) {
- Probability.normalInverse(1 - (1 - confidence) / 2)
- } else {
- Probability.studentTInverse(1 - confidence, (counter.count - 1).toInt)
- }
- }
- val low = mean - confFactor * stdev
- val high = mean + confFactor * stdev
- new BoundedDouble(mean, confidence, low, high)
- }
- }
-}
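A worked example with hypothetical numbers: for a partially merged counter with count = 25, mean = 10.0, and sampleVariance = 4.0 at confidence = 0.95 (count ≤ 100, so the Student's t branch is taken):

    stdev      = sqrt(4.0 / 25)              = 0.4
    confFactor = studentTInverse(0.05, 24)   ≈ 2.064
    interval   = 10.0 ± 2.064 * 0.4          ≈ [9.17, 10.83]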
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/partial/PartialResult.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/partial/PartialResult.scala b/core/src/main/scala/spark/partial/PartialResult.scala
deleted file mode 100644
index 922a9f9..0000000
--- a/core/src/main/scala/spark/partial/PartialResult.scala
+++ /dev/null
@@ -1,137 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.partial
-
-class PartialResult[R](initialVal: R, isFinal: Boolean) {
- private var finalValue: Option[R] = if (isFinal) Some(initialVal) else None
- private var failure: Option[Exception] = None
- private var completionHandler: Option[R => Unit] = None
- private var failureHandler: Option[Exception => Unit] = None
-
- def initialValue: R = initialVal
-
- def isInitialValueFinal: Boolean = isFinal
-
- /**
- * Blocking method to wait for and return the final value.
- */
- def getFinalValue(): R = synchronized {
- while (finalValue == None && failure == None) {
- this.wait()
- }
- if (finalValue != None) {
- return finalValue.get
- } else {
- throw failure.get
- }
- }
-
- /**
- * Set a handler to be called when this PartialResult completes. Only one completion handler
- * is supported per PartialResult.
- */
- def onComplete(handler: R => Unit): PartialResult[R] = synchronized {
- if (completionHandler != None) {
- throw new UnsupportedOperationException("onComplete cannot be called twice")
- }
- completionHandler = Some(handler)
- if (finalValue != None) {
- // We already have a final value, so let's call the handler
- handler(finalValue.get)
- }
- return this
- }
-
- /**
- * Set a handler to be called if this PartialResult's job fails. Only one failure handler
- * is supported per PartialResult.
- */
- def onFail(handler: Exception => Unit) {
- synchronized {
- if (failureHandler != None) {
- throw new UnsupportedOperationException("onFail cannot be called twice")
- }
- failureHandler = Some(handler)
- if (failure != None) {
- // We already have a failure, so let's call the handler
- handler(failure.get)
- }
- }
- }
-
- /**
- * Transform this PartialResult into a PartialResult of type T.
- */
- def map[T](f: R => T) : PartialResult[T] = {
- new PartialResult[T](f(initialVal), isFinal) {
- override def getFinalValue() : T = synchronized {
- f(PartialResult.this.getFinalValue())
- }
- override def onComplete(handler: T => Unit): PartialResult[T] = synchronized {
- PartialResult.this.onComplete(handler.compose(f)).map(f)
- }
- override def onFail(handler: Exception => Unit) {
- synchronized {
- PartialResult.this.onFail(handler)
- }
- }
- override def toString : String = synchronized {
- PartialResult.this.getFinalValueInternal() match {
- case Some(value) => "(final: " + f(value) + ")"
- case None => "(partial: " + initialValue + ")"
- }
- }
- def getFinalValueInternal() = PartialResult.this.getFinalValueInternal().map(f)
- }
- }
-
- private[spark] def setFinalValue(value: R) {
- synchronized {
- if (finalValue != None) {
- throw new UnsupportedOperationException("setFinalValue called twice on a PartialResult")
- }
- finalValue = Some(value)
- // Call the completion handler if it was set
- completionHandler.foreach(h => h(value))
- // Notify any threads that may be calling getFinalValue()
- this.notifyAll()
- }
- }
-
- private def getFinalValueInternal() = finalValue
-
- private[spark] def setFailure(exception: Exception) {
- synchronized {
- if (failure != None) {
- throw new UnsupportedOperationException("setFailure called twice on a PartialResult")
- }
- failure = Some(exception)
- // Call the failure handler if it was set
- failureHandler.foreach(h => h(exception))
- // Notify any threads that may be calling getFinalValue()
- this.notifyAll()
- }
- }
-
- override def toString: String = synchronized {
- finalValue match {
- case Some(value) => "(final: " + value + ")"
- case None => "(partial: " + initialValue + ")"
- }
- }
-}
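A driver-side usage sketch (hypothetical code; countApprox is one producer of these objects, and ??? stands in for a real RDD):

    import spark.RDD
    import spark.partial.{BoundedDouble, PartialResult}

    val rdd: RDD[Int] = ???
    val result: PartialResult[BoundedDouble] =
      rdd.countApprox(timeout = 1000, confidence = 0.95)
    println("so far: " + result.initialValue)      // approximate answer, available now
    result.onComplete(exact => println("final: " + exact))
    val exact = result.getFinalValue()             // blocks until the job finishes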
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/partial/StudentTCacher.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/partial/StudentTCacher.scala b/core/src/main/scala/spark/partial/StudentTCacher.scala
deleted file mode 100644
index f3bb987..0000000
--- a/core/src/main/scala/spark/partial/StudentTCacher.scala
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.partial
-
-import cern.jet.stat.Probability
-
-/**
- * A utility class for caching Student's T distribution values for a given confidence level
- * and various sample sizes. This is used by the grouped evaluators (GroupedMeanEvaluator
- * and GroupedSumEvaluator) to efficiently calculate confidence intervals for many keys.
- */
-private[spark] class StudentTCacher(confidence: Double) {
- val NORMAL_APPROX_SAMPLE_SIZE = 100 // For samples bigger than this, use Gaussian approximation
- val normalApprox = Probability.normalInverse(1 - (1 - confidence) / 2)
- val cache = Array.fill[Double](NORMAL_APPROX_SAMPLE_SIZE)(-1.0)
-
- def get(sampleSize: Long): Double = {
- if (sampleSize >= NORMAL_APPROX_SAMPLE_SIZE) {
- normalApprox
- } else {
- val size = sampleSize.toInt
- if (cache(size) < 0) {
- cache(size) = Probability.studentTInverse(1 - confidence, size - 1)
- }
- cache(size)
- }
- }
-}
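A usage sketch: the grouped evaluators create one cacher per currentResult() call and share it across all keys, so each distinct sample size below the cutoff computes its t quantile only once:

    val t = new StudentTCacher(0.95)
    t.get(30)    // studentTInverse(0.05, 29), computed and cached
    t.get(30)    // served from the cache
    t.get(500)   // >= NORMAL_APPROX_SAMPLE_SIZE, normal approximation instead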
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/partial/SumEvaluator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/partial/SumEvaluator.scala b/core/src/main/scala/spark/partial/SumEvaluator.scala
deleted file mode 100644
index 4083abe..0000000
--- a/core/src/main/scala/spark/partial/SumEvaluator.scala
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.partial
-
-import cern.jet.stat.Probability
-
-import spark.util.StatCounter
-
-/**
- * An ApproximateEvaluator for sums. It estimates the mean and the count and multiplies them
- * together, then uses the formula for the variance of two independent random variables to get
- * a variance for the result and compute a confidence interval.
- */
-private[spark] class SumEvaluator(totalOutputs: Int, confidence: Double)
- extends ApproximateEvaluator[StatCounter, BoundedDouble] {
-
- var outputsMerged = 0
- var counter = new StatCounter
-
- override def merge(outputId: Int, taskResult: StatCounter) {
- outputsMerged += 1
- counter.merge(taskResult)
- }
-
- override def currentResult(): BoundedDouble = {
- if (outputsMerged == totalOutputs) {
- new BoundedDouble(counter.sum, 1.0, counter.sum, counter.sum)
- } else if (outputsMerged == 0) {
- new BoundedDouble(0, 0.0, Double.NegativeInfinity, Double.PositiveInfinity)
- } else {
- val p = outputsMerged.toDouble / totalOutputs
- val meanEstimate = counter.mean
- val meanVar = counter.sampleVariance / counter.count
- val countEstimate = (counter.count + 1 - p) / p
- val countVar = (counter.count + 1) * (1 - p) / (p * p)
- val sumEstimate = meanEstimate * countEstimate
- val sumVar = (meanEstimate * meanEstimate * countVar) +
- (countEstimate * countEstimate * meanVar) +
- (meanVar * countVar)
- val sumStdev = math.sqrt(sumVar)
- val confFactor = {
- if (counter.count > 100) {
- Probability.normalInverse(1 - (1 - confidence) / 2)
- } else {
- Probability.studentTInverse(1 - confidence, (counter.count - 1).toInt)
- }
- }
- val low = sumEstimate - confFactor * sumStdev
- val high = sumEstimate + confFactor * sumStdev
- new BoundedDouble(sumEstimate, confidence, low, high)
- }
- }
-}
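A worked example with hypothetical numbers, at confidence = 0.95 with half the outputs merged (p = 0.5) and counter.count = 50, mean = 10.0, sampleVariance = 4.0:

    meanVar       = 4.0 / 50                        = 0.08
    countEstimate = (50 + 1 - 0.5) / 0.5            = 101.0
    countVar      = (50 + 1) * 0.5 / 0.25           = 102.0
    sumEstimate   = 10.0 * 101.0                    = 1010.0
    sumVar        = 10.0^2 * 102.0 + 101.0^2 * 0.08 + 0.08 * 102.0 ≈ 11024.2
    interval      = 1010 ± 2.01 * sqrt(11024.2)     ≈ [799, 1221]    (t, 49 dof)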
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/rdd/BlockRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/rdd/BlockRDD.scala b/core/src/main/scala/spark/rdd/BlockRDD.scala
deleted file mode 100644
index 0380058..0000000
--- a/core/src/main/scala/spark/rdd/BlockRDD.scala
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.rdd
-
-import spark.{RDD, SparkContext, SparkEnv, Partition, TaskContext}
-import spark.storage.BlockManager
-
-private[spark] class BlockRDDPartition(val blockId: String, idx: Int) extends Partition {
- val index = idx
-}
-
-private[spark]
-class BlockRDD[T: ClassManifest](sc: SparkContext, @transient blockIds: Array[String])
- extends RDD[T](sc, Nil) {
-
- @transient lazy val locations_ = BlockManager.blockIdsToHosts(blockIds, SparkEnv.get)
-
- override def getPartitions: Array[Partition] = (0 until blockIds.size).map(i => {
- new BlockRDDPartition(blockIds(i), i).asInstanceOf[Partition]
- }).toArray
-
- override def compute(split: Partition, context: TaskContext): Iterator[T] = {
- val blockManager = SparkEnv.get.blockManager
- val blockId = split.asInstanceOf[BlockRDDPartition].blockId
- blockManager.get(blockId) match {
- case Some(block) => block.asInstanceOf[Iterator[T]]
- case None =>
- throw new Exception("Could not compute split, block " + blockId + " not found")
- }
- }
-
- override def getPreferredLocations(split: Partition): Seq[String] = {
- locations_(split.asInstanceOf[BlockRDDPartition].blockId)
- }
-}
-
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/rdd/CartesianRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/rdd/CartesianRDD.scala b/core/src/main/scala/spark/rdd/CartesianRDD.scala
deleted file mode 100644
index 91b3e69..0000000
--- a/core/src/main/scala/spark/rdd/CartesianRDD.scala
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.rdd
-
-import java.io.{ObjectOutputStream, IOException}
-import spark._
-
-
-private[spark]
-class CartesianPartition(
- idx: Int,
- @transient rdd1: RDD[_],
- @transient rdd2: RDD[_],
- s1Index: Int,
- s2Index: Int
- ) extends Partition {
- var s1 = rdd1.partitions(s1Index)
- var s2 = rdd2.partitions(s2Index)
- override val index: Int = idx
-
- @throws(classOf[IOException])
- private def writeObject(oos: ObjectOutputStream) {
- // Update the reference to parent split at the time of task serialization
- s1 = rdd1.partitions(s1Index)
- s2 = rdd2.partitions(s2Index)
- oos.defaultWriteObject()
- }
-}
-
-private[spark]
-class CartesianRDD[T: ClassManifest, U:ClassManifest](
- sc: SparkContext,
- var rdd1 : RDD[T],
- var rdd2 : RDD[U])
- extends RDD[Pair[T, U]](sc, Nil)
- with Serializable {
-
- val numPartitionsInRdd2 = rdd2.partitions.size
-
- override def getPartitions: Array[Partition] = {
- // create the cross product split
- val array = new Array[Partition](rdd1.partitions.size * rdd2.partitions.size)
- for (s1 <- rdd1.partitions; s2 <- rdd2.partitions) {
- val idx = s1.index * numPartitionsInRdd2 + s2.index
- array(idx) = new CartesianPartition(idx, rdd1, rdd2, s1.index, s2.index)
- }
- array
- }
-
- override def getPreferredLocations(split: Partition): Seq[String] = {
- val currSplit = split.asInstanceOf[CartesianPartition]
- (rdd1.preferredLocations(currSplit.s1) ++ rdd2.preferredLocations(currSplit.s2)).distinct
- }
-
- override def compute(split: Partition, context: TaskContext) = {
- val currSplit = split.asInstanceOf[CartesianPartition]
- for (x <- rdd1.iterator(currSplit.s1, context);
- y <- rdd2.iterator(currSplit.s2, context)) yield (x, y)
- }
-
- override def getDependencies: Seq[Dependency[_]] = List(
- new NarrowDependency(rdd1) {
- def getParents(id: Int): Seq[Int] = List(id / numPartitionsInRdd2)
- },
- new NarrowDependency(rdd2) {
- def getParents(id: Int): Seq[Int] = List(id % numPartitionsInRdd2)
- }
- )
-
- override def clearDependencies() {
- super.clearDependencies()
- rdd1 = null
- rdd2 = null
- }
-}
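A short worked example of the index arithmetic (hypothetical sizes): if rdd2 has 4 partitions, the split at idx = 10 pairs rdd1 partition 2 with rdd2 partition 2, and the two NarrowDependency getParents calls simply invert the encoding:

    idx      = s1.index * numPartitionsInRdd2 + s2.index = 2 * 4 + 2 = 10
    s1.index = idx / numPartitionsInRdd2                 = 10 / 4    = 2
    s2.index = idx % numPartitionsInRdd2                 = 10 % 4    = 2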
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/rdd/CheckpointRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/rdd/CheckpointRDD.scala b/core/src/main/scala/spark/rdd/CheckpointRDD.scala
deleted file mode 100644
index 1ad5fe6..0000000
--- a/core/src/main/scala/spark/rdd/CheckpointRDD.scala
+++ /dev/null
@@ -1,155 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.rdd
-
-import spark._
-import org.apache.hadoop.mapred.{FileInputFormat, SequenceFileInputFormat, JobConf, Reporter}
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.io.{NullWritable, BytesWritable}
-import org.apache.hadoop.util.ReflectionUtils
-import org.apache.hadoop.fs.Path
-import java.io.{File, IOException, EOFException}
-import java.text.NumberFormat
-
-private[spark] class CheckpointRDDPartition(val index: Int) extends Partition {}
-
-/**
- * This RDD represents an RDD checkpoint file (similar to HadoopRDD).
- */
-private[spark]
-class CheckpointRDD[T: ClassManifest](sc: SparkContext, val checkpointPath: String)
- extends RDD[T](sc, Nil) {
-
- @transient val fs = new Path(checkpointPath).getFileSystem(sc.hadoopConfiguration)
-
- override def getPartitions: Array[Partition] = {
- val cpath = new Path(checkpointPath)
- val numPartitions =
- // listStatus can throw exception if path does not exist.
- if (fs.exists(cpath)) {
- val dirContents = fs.listStatus(cpath)
- val partitionFiles = dirContents.map(_.getPath.toString).filter(_.contains("part-")).sorted
- val numPart = partitionFiles.size
- if (numPart > 0 && (! partitionFiles(0).endsWith(CheckpointRDD.splitIdToFile(0)) ||
- ! partitionFiles(numPart-1).endsWith(CheckpointRDD.splitIdToFile(numPart-1)))) {
- throw new SparkException("Invalid checkpoint directory: " + checkpointPath)
- }
- numPart
- } else 0
-
- Array.tabulate(numPartitions)(i => new CheckpointRDDPartition(i))
- }
-
- checkpointData = Some(new RDDCheckpointData[T](this))
- checkpointData.get.cpFile = Some(checkpointPath)
-
- override def getPreferredLocations(split: Partition): Seq[String] = {
- val status = fs.getFileStatus(new Path(checkpointPath, CheckpointRDD.splitIdToFile(split.index)))
- val locations = fs.getFileBlockLocations(status, 0, status.getLen)
- locations.headOption.toList.flatMap(_.getHosts).filter(_ != "localhost")
- }
-
- override def compute(split: Partition, context: TaskContext): Iterator[T] = {
- val file = new Path(checkpointPath, CheckpointRDD.splitIdToFile(split.index))
- CheckpointRDD.readFromFile(file, context)
- }
-
- override def checkpoint() {
- // Do nothing. CheckpointRDD should not be checkpointed.
- }
-}
-
-private[spark] object CheckpointRDD extends Logging {
-
- def splitIdToFile(splitId: Int): String = {
- "part-%05d".format(splitId)
- }
-
- def writeToFile[T](path: String, blockSize: Int = -1)(ctx: TaskContext, iterator: Iterator[T]) {
- val env = SparkEnv.get
- val outputDir = new Path(path)
- val fs = outputDir.getFileSystem(env.hadoop.newConfiguration())
-
- val finalOutputName = splitIdToFile(ctx.splitId)
- val finalOutputPath = new Path(outputDir, finalOutputName)
- val tempOutputPath = new Path(outputDir, "." + finalOutputName + "-attempt-" + ctx.attemptId)
-
- if (fs.exists(tempOutputPath)) {
- throw new IOException("Checkpoint failed: temporary path " +
- tempOutputPath + " already exists")
- }
- val bufferSize = System.getProperty("spark.buffer.size", "65536").toInt
-
- val fileOutputStream = if (blockSize < 0) {
- fs.create(tempOutputPath, false, bufferSize)
- } else {
- // This is mainly for testing purpose
- fs.create(tempOutputPath, false, bufferSize, fs.getDefaultReplication, blockSize)
- }
- val serializer = env.serializer.newInstance()
- val serializeStream = serializer.serializeStream(fileOutputStream)
- serializeStream.writeAll(iterator)
- serializeStream.close()
-
- if (!fs.rename(tempOutputPath, finalOutputPath)) {
- if (!fs.exists(finalOutputPath)) {
- logInfo("Deleting tempOutputPath " + tempOutputPath)
- fs.delete(tempOutputPath, false)
- throw new IOException("Checkpoint failed: failed to save output of task: "
- + ctx.attemptId + " and final output path does not exist")
- } else {
- // Some other copy of this task must've finished before us and renamed it
- logInfo("Final output path " + finalOutputPath + " already exists; not overwriting it")
- fs.delete(tempOutputPath, false)
- }
- }
- }
-
- def readFromFile[T](path: Path, context: TaskContext): Iterator[T] = {
- val env = SparkEnv.get
- val fs = path.getFileSystem(env.hadoop.newConfiguration())
- val bufferSize = System.getProperty("spark.buffer.size", "65536").toInt
- val fileInputStream = fs.open(path, bufferSize)
- val serializer = env.serializer.newInstance()
- val deserializeStream = serializer.deserializeStream(fileInputStream)
-
- // Register an on-task-completion callback to close the input stream.
- context.addOnCompleteCallback(() => deserializeStream.close())
-
- deserializeStream.asIterator.asInstanceOf[Iterator[T]]
- }
-
- // Test whether CheckpointRDD generates the expected number of partitions despite
- // each split file having multiple blocks. This needs to be run on a
- // cluster (mesos or standalone) using HDFS.
- def main(args: Array[String]) {
- import spark._
-
- val Array(cluster, hdfsPath) = args
- val env = SparkEnv.get
- val sc = new SparkContext(cluster, "CheckpointRDD Test")
- val rdd = sc.makeRDD(1 to 10, 10).flatMap(x => 1 to 10000)
- val path = new Path(hdfsPath, "temp")
- val fs = path.getFileSystem(env.hadoop.newConfiguration())
- sc.runJob(rdd, CheckpointRDD.writeToFile(path.toString, 1024) _)
- val cpRDD = new CheckpointRDD[Int](sc, path.toString)
- assert(cpRDD.partitions.length == rdd.partitions.length, "Number of partitions is not the same")
- assert(cpRDD.collect.toList == rdd.collect.toList, "Data of partitions not the same")
- fs.delete(path, true)
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/rdd/CoGroupedRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/spark/rdd/CoGroupedRDD.scala
deleted file mode 100644
index 01b6c23..0000000
--- a/core/src/main/scala/spark/rdd/CoGroupedRDD.scala
+++ /dev/null
@@ -1,144 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.rdd
-
-import java.io.{ObjectOutputStream, IOException}
-import java.util.{HashMap => JHashMap}
-
-import scala.collection.JavaConversions
-import scala.collection.mutable.ArrayBuffer
-
-import spark.{Partition, Partitioner, RDD, SparkEnv, TaskContext}
-import spark.{Dependency, OneToOneDependency, ShuffleDependency}
-
-
-private[spark] sealed trait CoGroupSplitDep extends Serializable
-
-private[spark] case class NarrowCoGroupSplitDep(
- rdd: RDD[_],
- splitIndex: Int,
- var split: Partition
- ) extends CoGroupSplitDep {
-
- @throws(classOf[IOException])
- private def writeObject(oos: ObjectOutputStream) {
- // Update the reference to parent split at the time of task serialization
- split = rdd.partitions(splitIndex)
- oos.defaultWriteObject()
- }
-}
-
-private[spark] case class ShuffleCoGroupSplitDep(shuffleId: Int) extends CoGroupSplitDep
-
-private[spark]
-class CoGroupPartition(idx: Int, val deps: Array[CoGroupSplitDep])
- extends Partition with Serializable {
- override val index: Int = idx
- override def hashCode(): Int = idx
-}
-
-
-/**
- * An RDD that cogroups its parents. For each key k in parent RDDs, the resulting RDD contains a
- * tuple with the list of values for that key.
- *
- * @param rdds parent RDDs.
- * @param part partitioner used to partition the shuffle output.
- */
-class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part: Partitioner)
- extends RDD[(K, Seq[Seq[_]])](rdds.head.context, Nil) {
-
- private var serializerClass: String = null
-
- def setSerializer(cls: String): CoGroupedRDD[K] = {
- serializerClass = cls
- this
- }
-
- override def getDependencies: Seq[Dependency[_]] = {
- rdds.map { rdd: RDD[_ <: Product2[K, _]] =>
- if (rdd.partitioner == Some(part)) {
- logDebug("Adding one-to-one dependency with " + rdd)
- new OneToOneDependency(rdd)
- } else {
- logDebug("Adding shuffle dependency with " + rdd)
- new ShuffleDependency[Any, Any](rdd, part, serializerClass)
- }
- }
- }
-
- override def getPartitions: Array[Partition] = {
- val array = new Array[Partition](part.numPartitions)
- for (i <- 0 until array.size) {
- // Each CoGroupPartition will have a dependency per contributing RDD
- array(i) = new CoGroupPartition(i, rdds.zipWithIndex.map { case (rdd, j) =>
- // Assume each RDD contributed a single dependency, and get it
- dependencies(j) match {
- case s: ShuffleDependency[_, _] =>
- new ShuffleCoGroupSplitDep(s.shuffleId)
- case _ =>
- new NarrowCoGroupSplitDep(rdd, i, rdd.partitions(i))
- }
- }.toArray)
- }
- array
- }
-
- override val partitioner = Some(part)
-
- override def compute(s: Partition, context: TaskContext): Iterator[(K, Seq[Seq[_]])] = {
- val split = s.asInstanceOf[CoGroupPartition]
- val numRdds = split.deps.size
- // e.g. for `(k, a) cogroup (k, b)`, K -> Seq(ArrayBuffer as, ArrayBuffer bs)
- val map = new JHashMap[K, Seq[ArrayBuffer[Any]]]
-
- def getSeq(k: K): Seq[ArrayBuffer[Any]] = {
- val seq = map.get(k)
- if (seq != null) {
- seq
- } else {
- val seq = Array.fill(numRdds)(new ArrayBuffer[Any])
- map.put(k, seq)
- seq
- }
- }
-
- val ser = SparkEnv.get.serializerManager.get(serializerClass)
- for ((dep, depNum) <- split.deps.zipWithIndex) dep match {
- case NarrowCoGroupSplitDep(rdd, _, itsSplit) => {
- // Read them from the parent
- rdd.iterator(itsSplit, context).asInstanceOf[Iterator[Product2[K, Any]]].foreach { kv =>
- getSeq(kv._1)(depNum) += kv._2
- }
- }
- case ShuffleCoGroupSplitDep(shuffleId) => {
- // Read map outputs of shuffle
- val fetcher = SparkEnv.get.shuffleFetcher
- fetcher.fetch[Product2[K, Any]](shuffleId, split.index, context.taskMetrics, ser).foreach {
- kv => getSeq(kv._1)(depNum) += kv._2
- }
- }
- }
- JavaConversions.mapAsScalaMap(map).iterator
- }
-
- override def clearDependencies() {
- super.clearDependencies()
- rdds = null
- }
-}
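A usage sketch (hypothetical driver code, with ??? standing in for real pair RDDs) that cogroups two RDDs with a hash partitioner:

    import spark.{HashPartitioner, RDD}
    import spark.rdd.CoGroupedRDD

    val a: RDD[(String, Int)]    = ???
    val b: RDD[(String, Double)] = ???
    val grouped = new CoGroupedRDD[String](Seq(a, b), new HashPartitioner(4))
    // each record: (key, Seq(values of a for that key, values of b for that key))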
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/rdd/CoalescedRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/rdd/CoalescedRDD.scala b/core/src/main/scala/spark/rdd/CoalescedRDD.scala
deleted file mode 100644
index e612d02..0000000
--- a/core/src/main/scala/spark/rdd/CoalescedRDD.scala
+++ /dev/null
@@ -1,342 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.rdd
-
-import spark._
-import java.io.{ObjectOutputStream, IOException}
-import scala.collection.mutable
-import scala.Some
-import scala.collection.mutable.ArrayBuffer
-
-/**
- * Class that captures a coalesced RDD by essentially keeping track of parent partitions
- * @param index of this coalesced partition
- * @param rdd which it belongs to
- * @param parentsIndices list of indices in the parent that have been coalesced into this partition
- * @param preferredLocation the preferred location for this partition
- */
-case class CoalescedRDDPartition(
- index: Int,
- @transient rdd: RDD[_],
- parentsIndices: Array[Int],
- @transient preferredLocation: String = ""
- ) extends Partition {
- var parents: Seq[Partition] = parentsIndices.map(rdd.partitions(_))
-
- @throws(classOf[IOException])
- private def writeObject(oos: ObjectOutputStream) {
- // Update the reference to parent partition at the time of task serialization
- parents = parentsIndices.map(rdd.partitions(_))
- oos.defaultWriteObject()
- }
-
- /**
- * Computes how many of the parents partitions have getPreferredLocation
- * as one of their preferredLocations
- * @return locality of this coalesced partition between 0 and 1
- */
- def localFraction: Double = {
- val loc = parents.count(p =>
- rdd.context.getPreferredLocs(rdd, p.index).map(tl => tl.host).contains(preferredLocation))
-
- if (parents.size == 0) 0.0 else (loc.toDouble / parents.size.toDouble)
- }
-}
-
-/**
- * Represents a coalesced RDD that has fewer partitions than its parent RDD
- * This class uses the PartitionCoalescer class to find a good partitioning of the parent RDD
- * so that each new partition has roughly the same number of parent partitions and that
- * the preferred location of each new partition overlaps with as many preferred locations of its
- * parent partitions
- * @param prev RDD to be coalesced
- * @param maxPartitions number of desired partitions in the coalesced RDD
- * @param balanceSlack used to trade-off balance and locality. 1.0 is all locality, 0 is all balance
- */
-class CoalescedRDD[T: ClassManifest](
- @transient var prev: RDD[T],
- maxPartitions: Int,
- balanceSlack: Double = 0.10)
- extends RDD[T](prev.context, Nil) { // Nil since we implement getDependencies
-
- override def getPartitions: Array[Partition] = {
- val pc = new PartitionCoalescer(maxPartitions, prev, balanceSlack)
-
- pc.run().zipWithIndex.map {
- case (pg, i) =>
- val ids = pg.arr.map(_.index).toArray
- new CoalescedRDDPartition(i, prev, ids, pg.prefLoc)
- }
- }
-
- override def compute(partition: Partition, context: TaskContext): Iterator[T] = {
- partition.asInstanceOf[CoalescedRDDPartition].parents.iterator.flatMap { parentPartition =>
- firstParent[T].iterator(parentPartition, context)
- }
- }
-
- override def getDependencies: Seq[Dependency[_]] = {
- Seq(new NarrowDependency(prev) {
- def getParents(id: Int): Seq[Int] =
- partitions(id).asInstanceOf[CoalescedRDDPartition].parentsIndices
- })
- }
-
- override def clearDependencies() {
- super.clearDependencies()
- prev = null
- }
-
- /**
- * Returns the preferred machine for the partition. If split is of type CoalescedRDDPartition,
- * then the preferred machine will be one which most parent splits prefer too.
- * @param partition
- * @return the machine most preferred by split
- */
- override def getPreferredLocations(partition: Partition): Seq[String] = {
- List(partition.asInstanceOf[CoalescedRDDPartition].preferredLocation)
- }
-}
-
-/**
- * Coalesce the partitions of a parent RDD (`prev`) into fewer partitions, so that each partition of
- * this RDD computes one or more of the parent ones. It will produce exactly `maxPartitions` if the
- * parent had more than maxPartitions, or fewer if the parent had fewer.
- *
- * This transformation is useful when an RDD with many partitions gets filtered into a smaller one,
- * or to avoid having a large number of small tasks when processing a directory with many files.
- *
- * If there is no locality information (no preferredLocations) in the parent, then the coalescing
- * is very simple: group parent partitions that are adjacent in the array into chunks.
- * If there is locality information, it proceeds to pack them with the following four goals:
- *
- * (1) Balance the groups so they roughly have the same number of parent partitions
- * (2) Achieve locality per partition, i.e. find one machine which most parent partitions prefer
- * (3) Be efficient, i.e. O(n) algorithm for n parent partitions (problem is likely NP-hard)
- * (4) Balance preferred machines, i.e. avoid as much as possible picking the same preferred machine
- *
- * Furthermore, it is assumed that the parent RDD may have many partitions, e.g. 100 000.
- * We assume the final number of desired partitions is small, e.g. less than 1000.
- *
- * The algorithm tries to assign unique preferred machines to each partition. If the number of
- * desired partitions is greater than the number of preferred machines (can happen), it needs to
- * start picking duplicate preferred machines. This is determined using coupon collector estimation
- * (2n log(n)). The load balancing is done using power-of-two randomized bins-balls with one twist:
- * it tries to also achieve locality. This is done by allowing a slack (balanceSlack) between two
- * bins. If two bins are within the slack in terms of balance, the algorithm will assign partitions
- * according to locality. (contact alig for questions)
- *
- */
-
-private[spark] class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanceSlack: Double) {
-
- def compare(o1: PartitionGroup, o2: PartitionGroup): Boolean = o1.size < o2.size
- def compare(o1: Option[PartitionGroup], o2: Option[PartitionGroup]): Boolean =
- if (o1 == None) false else if (o2 == None) true else compare(o1.get, o2.get)
-
- val rnd = new scala.util.Random(7919) // keep this class deterministic
-
- // each element of groupArr represents one coalesced partition
- val groupArr = ArrayBuffer[PartitionGroup]()
-
- // hash used to check whether some machine is already in groupArr
- val groupHash = mutable.Map[String, ArrayBuffer[PartitionGroup]]()
-
- // hash used for the first maxPartitions (to avoid duplicates)
- val initialHash = mutable.Set[Partition]()
-
- // determines the tradeoff between load-balancing the partitions sizes and their locality
- // e.g. balanceSlack=0.10 means that it allows up to 10% imbalance in favor of locality
- val slack = (balanceSlack * prev.partitions.size).toInt
-
- var noLocality = true // true if no preferredLocations exist for the parent RDD
-
- // gets the *current* preferred locations from the DAGScheduler (as opposed to the static ones)
- def currPrefLocs(part: Partition): Seq[String] = {
- prev.context.getPreferredLocs(prev, part.index).map(tl => tl.host)
- }
-
- // this class just keeps iterating and rotating infinitely over the partitions of the RDD
- // next() returns the next preferred machine that a partition is replicated on
- // the rotator first goes through the first replica copy of each partition, then second, third
- // the iterator's return type is a tuple: (replicaString, partition)
- class LocationIterator(prev: RDD[_]) extends Iterator[(String, Partition)] {
-
- var it: Iterator[(String, Partition)] = resetIterator()
-
- override val isEmpty = !it.hasNext
-
- // initializes/resets to start iterating from the beginning
- def resetIterator() = {
- val iterators = (0 to 2).map( x =>
- prev.partitions.iterator.flatMap(p => {
- if (currPrefLocs(p).size > x) Some((currPrefLocs(p)(x), p)) else None
- } )
- )
- iterators.reduceLeft((x, y) => x ++ y)
- }
-
- // hasNext() is false iff there are no preferredLocations for any of the partitions of the RDD
- def hasNext(): Boolean = { !isEmpty }
-
- // return the next preferredLocation of some partition of the RDD
- def next(): (String, Partition) = {
- if (it.hasNext)
- it.next()
- else {
- it = resetIterator() // ran out of preferred locations, reset and rotate to the beginning
- it.next()
- }
- }
- }
-
- /**
- * Sorts and gets the least element of the list associated with key in groupHash
- * The returned PartitionGroup is the least loaded of all groups that represent the machine "key"
- * @param key string representing a partitioned group on preferred machine key
- * @return Option of PartitionGroup that has least elements for key
- */
- def getLeastGroupHash(key: String): Option[PartitionGroup] = {
- groupHash.get(key).map(_.sortWith(compare).head)
- }
-
- def addPartToPGroup(part: Partition, pgroup: PartitionGroup): Boolean = {
- if (!initialHash.contains(part)) {
- pgroup.arr += part // assign this element to this group
- initialHash += part // needed to avoid assigning partitions to multiple buckets
- true
- } else { false }
- }
-
- /**
- * Initializes targetLen partition groups and assigns a preferredLocation
- * This uses coupon collector to estimate how many preferredLocations it must rotate through
- * until it has seen most of the preferred locations (2 * n log(n))
- * @param targetLen
- */
- def setupGroups(targetLen: Int) {
- val rotIt = new LocationIterator(prev)
-
- // deal with empty case, just create targetLen partition groups with no preferred location
- if (!rotIt.hasNext()) {
- (1 to targetLen).foreach(x => groupArr += PartitionGroup())
- return
- }
-
- noLocality = false
-
- // number of iterations needed to be certain that we've seen most preferred locations
- val expectedCoupons2 = 2 * (math.log(targetLen)*targetLen + targetLen + 0.5).toInt
- var numCreated = 0
- var tries = 0
-
- // rotate through until either targetLen distinct preferred locations have been created
- // OR we've rotated expectedCoupons2 times, in which case we have likely seen all preferred
- // locations, i.e. likely targetLen >> number of preferred locations (more buckets than machines)
- while (numCreated < targetLen && tries < expectedCoupons2) {
- tries += 1
- val (nxt_replica, nxt_part) = rotIt.next()
- if (!groupHash.contains(nxt_replica)) {
- val pgroup = PartitionGroup(nxt_replica)
- groupArr += pgroup
- addPartToPGroup(nxt_part, pgroup)
- groupHash += (nxt_replica -> (ArrayBuffer(pgroup))) // list in case we have multiple
- numCreated += 1
- }
- }
-
- while (numCreated < targetLen) { // if we don't have enough partition groups, create duplicates
- var (nxt_replica, nxt_part) = rotIt.next()
- val pgroup = PartitionGroup(nxt_replica)
- groupArr += pgroup
- groupHash(nxt_replica) += pgroup
- var tries = 0
- while (!addPartToPGroup(nxt_part, pgroup) && tries < targetLen) { // ensure at least one part
- nxt_part = rotIt.next()._2
- tries += 1
- }
- numCreated += 1
- }
-
- }
-
- /**
- * Takes a parent RDD partition and decides which of the partition groups to put it in
- * Takes locality into account, but also uses power of 2 choices to load balance
- * It strikes a balance between the two using the balanceSlack variable
- * @param p partition (ball to be thrown)
- * @return partition group (bin to be put in)
- */
- def pickBin(p: Partition): PartitionGroup = {
- val pref = currPrefLocs(p).map(getLeastGroupHash(_)).sortWith(compare) // least loaded pref locs
- val prefPart = if (pref == Nil) None else pref.head
-
- val r1 = rnd.nextInt(groupArr.size)
- val r2 = rnd.nextInt(groupArr.size)
- val minPowerOfTwo = if (groupArr(r1).size < groupArr(r2).size) groupArr(r1) else groupArr(r2)
- if (prefPart == None) // if no preferred locations, just use basic power of two
- return minPowerOfTwo
-
- val prefPartActual = prefPart.get
-
- if (minPowerOfTwo.size + slack <= prefPartActual.size) // more imbalance than the slack allows
- return minPowerOfTwo // prefer balance over locality
- else {
- return prefPartActual // prefer locality over balance
- }
- }
-
- def throwBalls() {
- if (noLocality) { // no preferredLocations in parent RDD, no randomization needed
- if (maxPartitions > groupArr.size) { // just return prev.partitions
- for ((p,i) <- prev.partitions.zipWithIndex) {
- groupArr(i).arr += p
- }
- } else { // no locality available, so simply split partitions based on position in the array
- for(i <- 0 until maxPartitions) {
- val rangeStart = ((i.toLong * prev.partitions.length) / maxPartitions).toInt
- val rangeEnd = (((i.toLong + 1) * prev.partitions.length) / maxPartitions).toInt
- (rangeStart until rangeEnd).foreach{ j => groupArr(i).arr += prev.partitions(j) }
- }
- }
- } else {
- for (p <- prev.partitions if (!initialHash.contains(p))) { // throw each remaining partition into a group
- pickBin(p).arr += p
- }
- }
- }
-
- def getPartitions: Array[PartitionGroup] = groupArr.filter(pg => pg.size > 0).toArray
-
- /**
- * Runs the packing algorithm and returns an array of PartitionGroups that are, where
- * possible, load balanced and grouped by locality
- * @return array of partition groups
- */
- def run(): Array[PartitionGroup] = {
- setupGroups(math.min(prev.partitions.length, maxPartitions)) // setup the groups (bins)
- throwBalls() // assign partitions (balls) to each group (bins)
- getPartitions
- }
-}
-
-private[spark] case class PartitionGroup(prefLoc: String = "") {
- var arr = mutable.ArrayBuffer[Partition]()
-
- def size = arr.size
-}
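
Note on the deleted packing code above: pickBin leans on the classic "power of
two choices" result, where sampling two random bins and placing each ball in
the less loaded one keeps the maximum load close to the average. A minimal,
self-contained sketch in plain Scala, illustrative only and not part of the
deleted file:

import scala.collection.mutable.ArrayBuffer
import scala.util.Random

object PowerOfTwoChoicesSketch {
  def main(args: Array[String]) {
    val rnd = new Random(7)
    val bins = Array.fill(10)(new ArrayBuffer[Int]())
    for (ball <- 0 until 10000) {
      // sample two bins uniformly at random ...
      val r1 = rnd.nextInt(bins.length)
      val r2 = rnd.nextInt(bins.length)
      // ... and drop the ball into the less loaded of the two
      val target = if (bins(r1).size < bins(r2).size) bins(r1) else bins(r2)
      target += ball
    }
    // sizes land very close to the 1000-ball average
    println(bins.map(_.size).mkString(" "))
  }
}

CoalescedRDD's pickBin additionally caps the locality preference with the
balanceSlack budget, falling back to the emptier of the two sampled groups
when the preferred group is already too far ahead.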
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/rdd/EmptyRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/rdd/EmptyRDD.scala b/core/src/main/scala/spark/rdd/EmptyRDD.scala
deleted file mode 100644
index d7d4db5..0000000
--- a/core/src/main/scala/spark/rdd/EmptyRDD.scala
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.rdd
-
-import spark.{RDD, SparkContext, SparkEnv, Partition, TaskContext}
-
-
-/**
- * An RDD that is empty, i.e. has no element in it.
- */
-class EmptyRDD[T: ClassManifest](sc: SparkContext) extends RDD[T](sc, Nil) {
-
- override def getPartitions: Array[Partition] = Array.empty
-
- override def compute(split: Partition, context: TaskContext): Iterator[T] = {
- throw new UnsupportedOperationException("empty RDD")
- }
-}
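
A hypothetical usage sketch for the class above; `sc` stands for an existing
SparkContext and is not defined in this file. EmptyRDD is handy as a neutral
element when unioning a variable number of RDDs:

val empty = new EmptyRDD[Int](sc)
empty.count()                              // 0: no partitions, so no tasks run
sc.makeRDD(1 to 3).union(empty).collect()  // Array(1, 2, 3)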
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/rdd/FilteredRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/rdd/FilteredRDD.scala b/core/src/main/scala/spark/rdd/FilteredRDD.scala
deleted file mode 100644
index 783508c..0000000
--- a/core/src/main/scala/spark/rdd/FilteredRDD.scala
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.rdd
-
-import spark.{OneToOneDependency, RDD, Partition, TaskContext}
-
-private[spark] class FilteredRDD[T: ClassManifest](
- prev: RDD[T],
- f: T => Boolean)
- extends RDD[T](prev) {
-
- override def getPartitions: Array[Partition] = firstParent[T].partitions
-
- override val partitioner = prev.partitioner // Since filter cannot change a partition's keys
-
- override def compute(split: Partition, context: TaskContext) =
- firstParent[T].iterator(split, context).filter(f)
-}
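
The partitioner override above is the interesting line: filter can drop
elements but never rewrites keys, so the child keeps the parent's partitioner
and co-partitioned operations downstream can skip a shuffle. A hedged sketch,
assuming a live SparkContext `sc` and the usual pair-RDD implicits:

import spark.SparkContext._   // pair-RDD functions (assumed import)

val pairs = sc.makeRDD(Seq((1, "a"), (2, "b"), (3, "c")))
  .partitionBy(new spark.HashPartitioner(4))
val odd = pairs.filter { case (k, _) => k % 2 == 1 }
odd.partitioner == pairs.partitioner   // true: partitioning survives the filter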
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/rdd/FlatMappedRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/rdd/FlatMappedRDD.scala b/core/src/main/scala/spark/rdd/FlatMappedRDD.scala
deleted file mode 100644
index ed75eac..0000000
--- a/core/src/main/scala/spark/rdd/FlatMappedRDD.scala
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.rdd
-
-import spark.{RDD, Partition, TaskContext}
-
-
-private[spark]
-class FlatMappedRDD[U: ClassManifest, T: ClassManifest](
- prev: RDD[T],
- f: T => TraversableOnce[U])
- extends RDD[U](prev) {
-
- override def getPartitions: Array[Partition] = firstParent[T].partitions
-
- override def compute(split: Partition, context: TaskContext) =
- firstParent[T].iterator(split, context).flatMap(f)
-}
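
For contrast with MappedRDD further below: flatMap lets each input element
contribute zero or more outputs. An illustrative sketch, assuming a live
SparkContext `sc`:

sc.makeRDD(Seq("to be", "or not")).flatMap(_.split(" ")).collect()
// Array(to, be, or, not)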
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/rdd/FlatMappedValuesRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/rdd/FlatMappedValuesRDD.scala b/core/src/main/scala/spark/rdd/FlatMappedValuesRDD.scala
deleted file mode 100644
index a6bdce8..0000000
--- a/core/src/main/scala/spark/rdd/FlatMappedValuesRDD.scala
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.rdd
-
-import spark.{TaskContext, Partition, RDD}
-
-
-private[spark]
-class FlatMappedValuesRDD[K, V, U](prev: RDD[_ <: Product2[K, V]], f: V => TraversableOnce[U])
- extends RDD[(K, U)](prev) {
-
- override def getPartitions = firstParent[Product2[K, V]].partitions
-
- override val partitioner = firstParent[Product2[K, V]].partitioner
-
- override def compute(split: Partition, context: TaskContext) = {
- firstParent[Product2[K, V]].iterator(split, context).flatMap { case Product2(k, v) =>
- f(v).map(x => (k, x))
- }
- }
-}
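
Unlike plain flatMap, flatMapValues keeps each key attached to every value its
function produces, which is why the partitioner override above is safe. A
sketch under the same assumptions (live `sc`, pair-RDD implicits in scope):

import spark.SparkContext._   // pair-RDD functions (assumed import)

sc.makeRDD(Seq(("a", "1,2"), ("b", "3"))).flatMapValues(_.split(",")).collect()
// Array((a,1), (a,2), (b,3)); the parent's partitioner, if any, is preserved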
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/rdd/GlommedRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/rdd/GlommedRDD.scala b/core/src/main/scala/spark/rdd/GlommedRDD.scala
deleted file mode 100644
index 1573f8a..0000000
--- a/core/src/main/scala/spark/rdd/GlommedRDD.scala
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.rdd
-
-import spark.{RDD, Partition, TaskContext}
-
-private[spark] class GlommedRDD[T: ClassManifest](prev: RDD[T])
- extends RDD[Array[T]](prev) {
-
- override def getPartitions: Array[Partition] = firstParent[T].partitions
-
- override def compute(split: Partition, context: TaskContext) =
- Array(firstParent[T].iterator(split, context).toArray).iterator
-}
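
The one-line compute above is dense: it materializes the whole partition into
a single Array and wraps that array in a one-element iterator, turning RDD[T]
into RDD[Array[T]] with exactly one array per partition. A sketch, assuming a
live SparkContext `sc`:

val nums = sc.makeRDD(1 to 6, 3)          // 3 partitions
nums.glom().map(_.length).collect()       // Array(2, 2, 2): per-partition sizes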
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/rdd/HadoopRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/rdd/HadoopRDD.scala b/core/src/main/scala/spark/rdd/HadoopRDD.scala
deleted file mode 100644
index e512423..0000000
--- a/core/src/main/scala/spark/rdd/HadoopRDD.scala
+++ /dev/null
@@ -1,137 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.rdd
-
-import java.io.EOFException
-import java.util.NoSuchElementException
-
-import org.apache.hadoop.io.LongWritable
-import org.apache.hadoop.io.NullWritable
-import org.apache.hadoop.io.Text
-import org.apache.hadoop.mapred.FileInputFormat
-import org.apache.hadoop.mapred.InputFormat
-import org.apache.hadoop.mapred.InputSplit
-import org.apache.hadoop.mapred.JobConf
-import org.apache.hadoop.mapred.TextInputFormat
-import org.apache.hadoop.mapred.RecordReader
-import org.apache.hadoop.mapred.Reporter
-import org.apache.hadoop.util.ReflectionUtils
-
-import spark.{Dependency, Logging, Partition, RDD, SerializableWritable, SparkContext, SparkEnv, TaskContext}
-import spark.util.NextIterator
-import org.apache.hadoop.conf.{Configuration, Configurable}
-
-
-/**
- * A Spark split class that wraps around a Hadoop InputSplit.
- */
-private[spark] class HadoopPartition(rddId: Int, idx: Int, @transient s: InputSplit)
- extends Partition {
-
- val inputSplit = new SerializableWritable[InputSplit](s)
-
- override def hashCode(): Int = (41 * (41 + rddId) + idx).toInt
-
- override val index: Int = idx
-}
-
-/**
- * An RDD that reads a Hadoop dataset as specified by a JobConf (e.g. files in HDFS, the local file
- * system, or S3, tables in HBase, etc).
- */
-class HadoopRDD[K, V](
- sc: SparkContext,
- @transient conf: JobConf,
- inputFormatClass: Class[_ <: InputFormat[K, V]],
- keyClass: Class[K],
- valueClass: Class[V],
- minSplits: Int)
- extends RDD[(K, V)](sc, Nil) with Logging {
-
- // A Hadoop JobConf can be about 10 KB, which is pretty big, so broadcast it
- private val confBroadcast = sc.broadcast(new SerializableWritable(conf))
-
- override def getPartitions: Array[Partition] = {
- val env = SparkEnv.get
- env.hadoop.addCredentials(conf)
- val inputFormat = createInputFormat(conf)
- if (inputFormat.isInstanceOf[Configurable]) {
- inputFormat.asInstanceOf[Configurable].setConf(conf)
- }
- val inputSplits = inputFormat.getSplits(conf, minSplits)
- val array = new Array[Partition](inputSplits.size)
- for (i <- 0 until inputSplits.size) {
- array(i) = new HadoopPartition(id, i, inputSplits(i))
- }
- array
- }
-
- def createInputFormat(conf: JobConf): InputFormat[K, V] = {
- ReflectionUtils.newInstance(inputFormatClass.asInstanceOf[Class[_]], conf)
- .asInstanceOf[InputFormat[K, V]]
- }
-
- override def compute(theSplit: Partition, context: TaskContext) = new NextIterator[(K, V)] {
- val split = theSplit.asInstanceOf[HadoopPartition]
- logInfo("Input split: " + split.inputSplit)
- var reader: RecordReader[K, V] = null
-
- val conf = confBroadcast.value.value
- val fmt = createInputFormat(conf)
- if (fmt.isInstanceOf[Configurable]) {
- fmt.asInstanceOf[Configurable].setConf(conf)
- }
- reader = fmt.getRecordReader(split.inputSplit.value, conf, Reporter.NULL)
-
- // Register an on-task-completion callback to close the input stream.
- context.addOnCompleteCallback{ () => closeIfNeeded() }
-
- val key: K = reader.createKey()
- val value: V = reader.createValue()
-
- override def getNext() = {
- try {
- finished = !reader.next(key, value)
- } catch {
- case eof: EOFException =>
- finished = true
- }
- (key, value)
- }
-
- override def close() {
- try {
- reader.close()
- } catch {
- case e: Exception => logWarning("Exception in RecordReader.close()", e)
- }
- }
- }
-
- override def getPreferredLocations(split: Partition): Seq[String] = {
- // TODO: Filtering out "localhost" in case of file:// URLs
- val hadoopSplit = split.asInstanceOf[HadoopPartition]
- hadoopSplit.inputSplit.value.getLocations.filter(_ != "localhost")
- }
-
- override def checkpoint() {
- // Do nothing. Hadoop RDD should not be checkpointed.
- }
-
- def getConf: Configuration = confBroadcast.value.value
-}
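
A hedged construction sketch matching the constructor above; `sc`, the path,
and the cluster details are placeholders, not values from this commit:

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.{FileInputFormat, JobConf, TextInputFormat}

val conf = new JobConf()
FileInputFormat.setInputPaths(conf, "hdfs://namenode:8020/data/logs")  // placeholder
val records = new HadoopRDD(sc, conf, classOf[TextInputFormat],
  classOf[LongWritable], classOf[Text], 2)
// getNext() above reuses the same Writable key/value objects, so copy before caching:
val lines = records.map(_._2.toString)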
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/rdd/JdbcRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/rdd/JdbcRDD.scala b/core/src/main/scala/spark/rdd/JdbcRDD.scala
deleted file mode 100644
index 5913243..0000000
--- a/core/src/main/scala/spark/rdd/JdbcRDD.scala
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.rdd
-
-import java.sql.{Connection, ResultSet}
-
-import spark.{Logging, Partition, RDD, SparkContext, TaskContext}
-import spark.util.NextIterator
-
-private[spark] class JdbcPartition(idx: Int, val lower: Long, val upper: Long) extends Partition {
- override def index = idx
-}
-
-/**
- * An RDD that executes an SQL query on a JDBC connection and reads results.
- * For usage example, see test case JdbcRDDSuite.
- *
- * @param getConnection a function that returns an open Connection.
- * The RDD takes care of closing the connection.
- * @param sql the text of the query.
- * The query must contain two ? placeholders for parameters used to partition the results.
- * E.g. "select title, author from books where ? <= id and id <= ?"
- * @param lowerBound the minimum value of the first placeholder
- * @param upperBound the maximum value of the second placeholder
- * The lower and upper bounds are inclusive.
- * @param numPartitions the number of partitions.
- * Given a lowerBound of 1, an upperBound of 20, and a numPartitions of 2,
- * the query would be executed twice, once with (1, 10) and once with (11, 20)
- * @param mapRow a function from a ResultSet to a single row of the desired result type(s).
- * This should only call getInt, getString, etc; the RDD takes care of calling next.
- * The default maps a ResultSet to an array of Object.
- */
-class JdbcRDD[T: ClassManifest](
- sc: SparkContext,
- getConnection: () => Connection,
- sql: String,
- lowerBound: Long,
- upperBound: Long,
- numPartitions: Int,
- mapRow: (ResultSet) => T = JdbcRDD.resultSetToObjectArray _)
- extends RDD[T](sc, Nil) with Logging {
-
- override def getPartitions: Array[Partition] = {
- // bounds are inclusive, hence the + 1 here and - 1 on end
- val length = 1 + upperBound - lowerBound
- (0 until numPartitions).map(i => {
- val start = lowerBound + ((i * length) / numPartitions).toLong
- val end = lowerBound + (((i + 1) * length) / numPartitions).toLong - 1
- new JdbcPartition(i, start, end)
- }).toArray
- }
-
- override def compute(thePart: Partition, context: TaskContext) = new NextIterator[T] {
- context.addOnCompleteCallback{ () => closeIfNeeded() }
- val part = thePart.asInstanceOf[JdbcPartition]
- val conn = getConnection()
- val stmt = conn.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)
-
- // setFetchSize(Integer.MIN_VALUE) is a mysql driver specific way to force streaming results,
- // rather than pulling entire resultset into memory.
- // see http://dev.mysql.com/doc/refman/5.0/en/connector-j-reference-implementation-notes.html
- if (conn.getMetaData.getURL.matches("jdbc:mysql:.*")) {
- stmt.setFetchSize(Integer.MIN_VALUE)
- logInfo("statement fetch size set to: " + stmt.getFetchSize + " to force MySQL streaming ")
- }
-
- stmt.setLong(1, part.lower)
- stmt.setLong(2, part.upper)
- val rs = stmt.executeQuery()
-
- override def getNext: T = {
- if (rs.next()) {
- mapRow(rs)
- } else {
- finished = true
- null.asInstanceOf[T]
- }
- }
-
- override def close() {
- try {
- if (null != rs && ! rs.isClosed()) rs.close()
- } catch {
- case e: Exception => logWarning("Exception closing resultset", e)
- }
- try {
- if (null != stmt && ! stmt.isClosed()) stmt.close()
- } catch {
- case e: Exception => logWarning("Exception closing statement", e)
- }
- try {
- if (null != conn && ! conn.isClosed()) conn.close()
- logInfo("closed connection")
- } catch {
- case e: Exception => logWarning("Exception closing connection", e)
- }
- }
- }
-}
-
-object JdbcRDD {
- def resultSetToObjectArray(rs: ResultSet) = {
- Array.tabulate[Object](rs.getMetaData.getColumnCount)(i => rs.getObject(i + 1))
- }
-}
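
Following the scaladoc above, a usage sketch; the connection URL and table are
placeholders and the matching JDBC driver is assumed to be on the classpath:

import java.sql.DriverManager

val books = new JdbcRDD(
  sc,                                                                // live SparkContext (assumed)
  () => DriverManager.getConnection("jdbc:mysql://dbhost/library"),  // placeholder URL
  "select title, author from books where ? <= id and id <= ?",
  1, 20, 2,            // lowerBound, upperBound, numPartitions: runs (1,10) and (11,20)
  rs => (rs.getString(1), rs.getString(2)))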
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/rdd/MapPartitionsRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/rdd/MapPartitionsRDD.scala b/core/src/main/scala/spark/rdd/MapPartitionsRDD.scala
deleted file mode 100644
index af8f0a1..0000000
--- a/core/src/main/scala/spark/rdd/MapPartitionsRDD.scala
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.rdd
-
-import spark.{RDD, Partition, TaskContext}
-
-
-private[spark]
-class MapPartitionsRDD[U: ClassManifest, T: ClassManifest](
- prev: RDD[T],
- f: Iterator[T] => Iterator[U],
- preservesPartitioning: Boolean = false)
- extends RDD[U](prev) {
-
- override val partitioner =
- if (preservesPartitioning) firstParent[T].partitioner else None
-
- override def getPartitions: Array[Partition] = firstParent[T].partitions
-
- override def compute(split: Partition, context: TaskContext) =
- f(firstParent[T].iterator(split, context))
-}
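
mapPartitions, which constructs this class, is the hook for amortizing
per-partition setup such as opening one connection or parser per partition
instead of per element. An illustrative sketch with a stand-in for the
expensive resource, assuming a live SparkContext `sc`:

sc.makeRDD(1 to 4, 2).mapPartitions { iter =>
  val ctx = new StringBuilder("ctx")   // stand-in for costly per-partition setup
  iter.map(n => ctx.toString + ":" + n)
}.collect()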
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/rdd/MapPartitionsWithIndexRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/rdd/MapPartitionsWithIndexRDD.scala b/core/src/main/scala/spark/rdd/MapPartitionsWithIndexRDD.scala
deleted file mode 100644
index 3b4e951..0000000
--- a/core/src/main/scala/spark/rdd/MapPartitionsWithIndexRDD.scala
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.rdd
-
-import spark.{RDD, Partition, TaskContext}
-
-
-/**
- * A variant of the MapPartitionsRDD that passes the partition index into the
- * closure. This can be used to generate or collect partition specific
- * information such as the number of tuples in a partition.
- */
-private[spark]
-class MapPartitionsWithIndexRDD[U: ClassManifest, T: ClassManifest](
- prev: RDD[T],
- f: (Int, Iterator[T]) => Iterator[U],
- preservesPartitioning: Boolean
- ) extends RDD[U](prev) {
-
- override def getPartitions: Array[Partition] = firstParent[T].partitions
-
- override val partitioner = if (preservesPartitioning) prev.partitioner else None
-
- override def compute(split: Partition, context: TaskContext) =
- f(split.index, firstParent[T].iterator(split, context))
-}
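
Putting the scaladoc's example into code: counting the tuples in each
partition. A sketch assuming a live SparkContext `sc` and the
mapPartitionsWithIndex wrapper of this codebase:

sc.makeRDD(1 to 5, 2).mapPartitionsWithIndex { (idx, iter) =>
  Iterator((idx, iter.size))   // (partition index, element count)
}.collect()                    // e.g. Array((0,2), (1,3))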
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/rdd/MappedRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/rdd/MappedRDD.scala b/core/src/main/scala/spark/rdd/MappedRDD.scala
deleted file mode 100644
index 8b411dd..0000000
--- a/core/src/main/scala/spark/rdd/MappedRDD.scala
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.rdd
-
-import spark.{RDD, Partition, TaskContext}
-
-private[spark]
-class MappedRDD[U: ClassManifest, T: ClassManifest](prev: RDD[T], f: T => U)
- extends RDD[U](prev) {
-
- override def getPartitions: Array[Partition] = firstParent[T].partitions
-
- override def compute(split: Partition, context: TaskContext) =
- firstParent[T].iterator(split, context).map(f)
-}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/rdd/MappedValuesRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/rdd/MappedValuesRDD.scala b/core/src/main/scala/spark/rdd/MappedValuesRDD.scala
deleted file mode 100644
index 8334e3b..0000000
--- a/core/src/main/scala/spark/rdd/MappedValuesRDD.scala
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.rdd
-
-
-import spark.{TaskContext, Partition, RDD}
-
-private[spark]
-class MappedValuesRDD[K, V, U](prev: RDD[_ <: Product2[K, V]], f: V => U)
- extends RDD[(K, U)](prev) {
-
- override def getPartitions = firstParent[Product2[K, V]].partitions
-
- override val partitioner = firstParent[Product2[K, V]].partitioner
-
- override def compute(split: Partition, context: TaskContext): Iterator[(K, U)] = {
- firstParent[Product2[K, V]].iterator(split, context).map { case Product2(k, v) => (k, f(v)) }
- }
-}
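
The contrast with MappedRDD above is the partitioner line: map may rewrite
keys, so MappedRDD drops the partitioner, while mapValues touches only values
and keeps it. A sketch under the usual assumptions (live `sc`, pair-RDD
implicits in scope):

import spark.SparkContext._   // pair-RDD functions (assumed import)

val pairs = sc.makeRDD(Seq((1, 2), (3, 4))).partitionBy(new spark.HashPartitioner(2))
pairs.mapValues(_ * 10).partitioner                   // preserved: Some(HashPartitioner)
pairs.map { case (k, v) => (k, v * 10) }.partitioner  // None: map may change keys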