You are viewing a plain-text version of this content. The canonical link to it (the "here" hyperlink on the original page) was not preserved in this plain-text version.
Posted to commits@spark.apache.org by ma...@apache.org on 2013/09/01 23:59:13 UTC
[29/69] [abbrv] [partial] Initial work to rename package to org.apache.spark
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/Accumulators.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/Accumulators.scala b/core/src/main/scala/spark/Accumulators.scala
deleted file mode 100644
index 6ff92ce..0000000
--- a/core/src/main/scala/spark/Accumulators.scala
+++ /dev/null
@@ -1,256 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark
-
-import java.io._
-
-import scala.collection.mutable.Map
-import scala.collection.generic.Growable
-
-/**
- * A datatype that can be accumulated, i.e. has an commutative and associative "add" operation,
- * but where the result type, `R`, may be different from the element type being added, `T`.
- *
- * You must define how to add data, and how to merge two of these together. For some datatypes,
- * such as a counter, these might be the same operation. In that case, you can use the simpler
- * [[spark.Accumulator]]. They won't always be the same, though -- e.g., imagine you are
- * accumulating a set. You will add items to the set, and you will union two sets together.
- *
- * @param initialValue initial value of accumulator
- * @param param helper object defining how to add elements of type `R` and `T`
- * @tparam R the full accumulated data (result type)
- * @tparam T partial data that can be added in
- */
-class Accumulable[R, T] (
- @transient initialValue: R,
- param: AccumulableParam[R, T])
- extends Serializable {
-
- val id = Accumulators.newId
- @transient private var value_ = initialValue // Current value on master
- val zero = param.zero(initialValue) // Zero value to be passed to workers
- var deserialized = false
-
- Accumulators.register(this, true)
-
- /**
- * Add more data to this accumulator / accumulable
- * @param term the data to add
- */
- def += (term: T) { value_ = param.addAccumulator(value_, term) }
-
- /**
- * Add more data to this accumulator / accumulable
- * @param term the data to add
- */
- def add(term: T) { value_ = param.addAccumulator(value_, term) }
-
- /**
- * Merge two accumulable objects together
- *
- * Normally, a user will not want to use this version, but will instead call `+=`.
- * @param term the other `R` that will get merged with this
- */
- def ++= (term: R) { value_ = param.addInPlace(value_, term)}
-
- /**
- * Merge two accumulable objects together
- *
- * Normally, a user will not want to use this version, but will instead call `add`.
- * @param term the other `R` that will get merged with this
- */
- def merge(term: R) { value_ = param.addInPlace(value_, term)}
-
- /**
- * Access the accumulator's current value; only allowed on master.
- */
- def value: R = {
- if (!deserialized) {
- value_
- } else {
- throw new UnsupportedOperationException("Can't read accumulator value in task")
- }
- }
-
- /**
- * Get the current value of this accumulator from within a task.
- *
- * This is NOT the global value of the accumulator. To get the global value after a
- * completed operation on the dataset, call `value`.
- *
- * The typical use of this method is to directly mutate the local value, eg., to add
- * an element to a Set.
- */
- def localValue = value_
-
- /**
- * Set the accumulator's value; only allowed on master.
- */
- def value_= (newValue: R) {
- if (!deserialized) value_ = newValue
- else throw new UnsupportedOperationException("Can't assign accumulator value in task")
- }
-
- /**
- * Set the accumulator's value; only allowed on master
- */
- def setValue(newValue: R) {
- this.value = newValue
- }
-
- // Called by Java when deserializing an object
- private def readObject(in: ObjectInputStream) {
- in.defaultReadObject()
- value_ = zero
- deserialized = true
- Accumulators.register(this, false)
- }
-
- override def toString = value_.toString
-}
-
-/**
- * Helper object defining how to accumulate values of a particular type. An implicit
- * AccumulableParam needs to be available when you create Accumulables of a specific type.
- *
- * @tparam R the full accumulated data (result type)
- * @tparam T partial data that can be added in
- */
-trait AccumulableParam[R, T] extends Serializable {
- /**
- * Add additional data to the accumulator value. Is allowed to modify and return `r`
- * for efficiency (to avoid allocating objects).
- *
- * @param r the current value of the accumulator
- * @param t the data to be added to the accumulator
- * @return the new value of the accumulator
- */
- def addAccumulator(r: R, t: T): R
-
- /**
- * Merge two accumulated values together. Is allowed to modify and return the first value
- * for efficiency (to avoid allocating objects).
- *
- * @param r1 one set of accumulated data
- * @param r2 another set of accumulated data
- * @return both data sets merged together
- */
- def addInPlace(r1: R, r2: R): R
-
- /**
- * Return the "zero" (identity) value for an accumulator type, given its initial value. For
- * example, if R was a vector of N dimensions, this would return a vector of N zeroes.
- */
- def zero(initialValue: R): R
-}
-
-private[spark]
-class GrowableAccumulableParam[R <% Growable[T] with TraversableOnce[T] with Serializable, T]
- extends AccumulableParam[R,T] {
-
- def addAccumulator(growable: R, elem: T): R = {
- growable += elem
- growable
- }
-
- def addInPlace(t1: R, t2: R): R = {
- t1 ++= t2
- t1
- }
-
- def zero(initialValue: R): R = {
- // We need to clone initialValue, but it's hard to specify that R should also be Cloneable.
- // Instead we'll serialize it to a buffer and load it back.
- val ser = (new spark.JavaSerializer).newInstance()
- val copy = ser.deserialize[R](ser.serialize(initialValue))
- copy.clear() // In case it contained stuff
- copy
- }
-}
-
-/**
- * A simpler value of [[spark.Accumulable]] where the result type being accumulated is the same
- * as the types of elements being merged.
- *
- * @param initialValue initial value of accumulator
- * @param param helper object defining how to add elements of type `T`
- * @tparam T result type
- */
-class Accumulator[T](@transient initialValue: T, param: AccumulatorParam[T])
- extends Accumulable[T,T](initialValue, param)
-
-/**
- * A simpler version of [[spark.AccumulableParam]] where the only datatype you can add in is the same type
- * as the accumulated value. An implicit AccumulatorParam object needs to be available when you create
- * Accumulators of a specific type.
- *
- * @tparam T type of value to accumulate
- */
-trait AccumulatorParam[T] extends AccumulableParam[T, T] {
- def addAccumulator(t1: T, t2: T): T = {
- addInPlace(t1, t2)
- }
-}
-
-// TODO: The multi-thread support in accumulators is kind of lame; check
-// if there's a more intuitive way of doing it right
-private object Accumulators {
- // TODO: Use soft references? => need to make readObject work properly then
- val originals = Map[Long, Accumulable[_, _]]()
- val localAccums = Map[Thread, Map[Long, Accumulable[_, _]]]()
- var lastId: Long = 0
-
- def newId: Long = synchronized {
- lastId += 1
- return lastId
- }
-
- def register(a: Accumulable[_, _], original: Boolean): Unit = synchronized {
- if (original) {
- originals(a.id) = a
- } else {
- val accums = localAccums.getOrElseUpdate(Thread.currentThread, Map())
- accums(a.id) = a
- }
- }
-
- // Clear the local (non-original) accumulators for the current thread
- def clear() {
- synchronized {
- localAccums.remove(Thread.currentThread)
- }
- }
-
- // Get the values of the local accumulators for the current thread (by ID)
- def values: Map[Long, Any] = synchronized {
- val ret = Map[Long, Any]()
- for ((id, accum) <- localAccums.getOrElse(Thread.currentThread, Map())) {
- ret(id) = accum.localValue
- }
- return ret
- }
-
- // Add values to the original accumulators with some given IDs
- def add(values: Map[Long, Any]): Unit = synchronized {
- for ((id, value) <- values) {
- if (originals.contains(id)) {
- originals(id).asInstanceOf[Accumulable[Any, Any]] ++= value
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/Aggregator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/Aggregator.scala b/core/src/main/scala/spark/Aggregator.scala
deleted file mode 100644
index 9af4019..0000000
--- a/core/src/main/scala/spark/Aggregator.scala
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark
-
-import java.util.{HashMap => JHashMap}
-
-import scala.collection.JavaConversions._
-
-/** A set of functions used to aggregate data.
- *
- * @param createCombiner function to create the initial value of the aggregation.
- * @param mergeValue function to merge a new value into the aggregation result.
- * @param mergeCombiners function to merge outputs from multiple mergeValue function.
- */
-case class Aggregator[K, V, C] (
- createCombiner: V => C,
- mergeValue: (C, V) => C,
- mergeCombiners: (C, C) => C) {
-
- def combineValuesByKey(iter: Iterator[_ <: Product2[K, V]]) : Iterator[(K, C)] = {
- val combiners = new JHashMap[K, C]
- for (kv <- iter) {
- val oldC = combiners.get(kv._1)
- if (oldC == null) {
- combiners.put(kv._1, createCombiner(kv._2))
- } else {
- combiners.put(kv._1, mergeValue(oldC, kv._2))
- }
- }
- combiners.iterator
- }
-
- def combineCombinersByKey(iter: Iterator[(K, C)]) : Iterator[(K, C)] = {
- val combiners = new JHashMap[K, C]
- iter.foreach { case(k, c) =>
- val oldC = combiners.get(k)
- if (oldC == null) {
- combiners.put(k, c)
- } else {
- combiners.put(k, mergeCombiners(oldC, c))
- }
- }
- combiners.iterator
- }
-}
-
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala b/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala
deleted file mode 100644
index 1ec95ed..0000000
--- a/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark
-
-import scala.collection.mutable.ArrayBuffer
-import scala.collection.mutable.HashMap
-
-import spark.executor.{ShuffleReadMetrics, TaskMetrics}
-import spark.serializer.Serializer
-import spark.storage.BlockManagerId
-import spark.util.CompletionIterator
-
-
-private[spark] class BlockStoreShuffleFetcher extends ShuffleFetcher with Logging {
-
- override def fetch[T](shuffleId: Int, reduceId: Int, metrics: TaskMetrics, serializer: Serializer)
- : Iterator[T] =
- {
-
- logDebug("Fetching outputs for shuffle %d, reduce %d".format(shuffleId, reduceId))
- val blockManager = SparkEnv.get.blockManager
-
- val startTime = System.currentTimeMillis
- val statuses = SparkEnv.get.mapOutputTracker.getServerStatuses(shuffleId, reduceId)
- logDebug("Fetching map output location for shuffle %d, reduce %d took %d ms".format(
- shuffleId, reduceId, System.currentTimeMillis - startTime))
-
- val splitsByAddress = new HashMap[BlockManagerId, ArrayBuffer[(Int, Long)]]
- for (((address, size), index) <- statuses.zipWithIndex) {
- splitsByAddress.getOrElseUpdate(address, ArrayBuffer()) += ((index, size))
- }
-
- val blocksByAddress: Seq[(BlockManagerId, Seq[(String, Long)])] = splitsByAddress.toSeq.map {
- case (address, splits) =>
- (address, splits.map(s => ("shuffle_%d_%d_%d".format(shuffleId, s._1, reduceId), s._2)))
- }
-
- def unpackBlock(blockPair: (String, Option[Iterator[Any]])) : Iterator[T] = {
- val blockId = blockPair._1
- val blockOption = blockPair._2
- blockOption match {
- case Some(block) => {
- block.asInstanceOf[Iterator[T]]
- }
- case None => {
- val regex = "shuffle_([0-9]*)_([0-9]*)_([0-9]*)".r
- blockId match {
- case regex(shufId, mapId, _) =>
- val address = statuses(mapId.toInt)._1
- throw new FetchFailedException(address, shufId.toInt, mapId.toInt, reduceId, null)
- case _ =>
- throw new SparkException(
- "Failed to get block " + blockId + ", which is not a shuffle block")
- }
- }
- }
- }
-
- val blockFetcherItr = blockManager.getMultiple(blocksByAddress, serializer)
- val itr = blockFetcherItr.flatMap(unpackBlock)
-
- CompletionIterator[T, Iterator[T]](itr, {
- val shuffleMetrics = new ShuffleReadMetrics
- shuffleMetrics.shuffleFinishTime = System.currentTimeMillis
- shuffleMetrics.remoteFetchTime = blockFetcherItr.remoteFetchTime
- shuffleMetrics.fetchWaitTime = blockFetcherItr.fetchWaitTime
- shuffleMetrics.remoteBytesRead = blockFetcherItr.remoteBytesRead
- shuffleMetrics.totalBlocksFetched = blockFetcherItr.totalBlocks
- shuffleMetrics.localBlocksFetched = blockFetcherItr.numLocalBlocks
- shuffleMetrics.remoteBlocksFetched = blockFetcherItr.numRemoteBlocks
- metrics.shuffleReadMetrics = Some(shuffleMetrics)
- })
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/CacheManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/CacheManager.scala b/core/src/main/scala/spark/CacheManager.scala
deleted file mode 100644
index 8131480..0000000
--- a/core/src/main/scala/spark/CacheManager.scala
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark
-
-import scala.collection.mutable.{ArrayBuffer, HashSet}
-import spark.storage.{BlockManager, StorageLevel}
-
-
-/** Spark class responsible for passing RDDs split contents to the BlockManager and making
- sure a node doesn't load two copies of an RDD at once.
- */
-private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
- private val loading = new HashSet[String]
-
- /** Gets or computes an RDD split. Used by RDD.iterator() when an RDD is cached. */
- def getOrCompute[T](rdd: RDD[T], split: Partition, context: TaskContext, storageLevel: StorageLevel)
- : Iterator[T] = {
- val key = "rdd_%d_%d".format(rdd.id, split.index)
- logInfo("Cache key is " + key)
- blockManager.get(key) match {
- case Some(cachedValues) =>
- // Partition is in cache, so just return its values
- logInfo("Found partition in cache!")
- return cachedValues.asInstanceOf[Iterator[T]]
-
- case None =>
- // Mark the split as loading (unless someone else marks it first)
- loading.synchronized {
- if (loading.contains(key)) {
- logInfo("Loading contains " + key + ", waiting...")
- while (loading.contains(key)) {
- try {loading.wait()} catch {case _ : Throwable =>}
- }
- logInfo("Loading no longer contains " + key + ", so returning cached result")
- // See whether someone else has successfully loaded it. The main way this would fail
- // is for the RDD-level cache eviction policy if someone else has loaded the same RDD
- // partition but we didn't want to make space for it. However, that case is unlikely
- // because it's unlikely that two threads would work on the same RDD partition. One
- // downside of the current code is that threads wait serially if this does happen.
- blockManager.get(key) match {
- case Some(values) =>
- return values.asInstanceOf[Iterator[T]]
- case None =>
- logInfo("Whoever was loading " + key + " failed; we'll try it ourselves")
- loading.add(key)
- }
- } else {
- loading.add(key)
- }
- }
- try {
- // If we got here, we have to load the split
- val elements = new ArrayBuffer[Any]
- logInfo("Computing partition " + split)
- elements ++= rdd.computeOrReadCheckpoint(split, context)
- // Try to put this block in the blockManager
- blockManager.put(key, elements, storageLevel, true)
- return elements.iterator.asInstanceOf[Iterator[T]]
- } finally {
- loading.synchronized {
- loading.remove(key)
- loading.notifyAll()
- }
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/ClosureCleaner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/ClosureCleaner.scala b/core/src/main/scala/spark/ClosureCleaner.scala
deleted file mode 100644
index 8b39241..0000000
--- a/core/src/main/scala/spark/ClosureCleaner.scala
+++ /dev/null
@@ -1,231 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark
-
-import java.lang.reflect.Field
-
-import scala.collection.mutable.Map
-import scala.collection.mutable.Set
-
-import org.objectweb.asm.{ClassReader, ClassVisitor, MethodVisitor, Type}
-import org.objectweb.asm.Opcodes._
-import java.io.{InputStream, IOException, ByteArrayOutputStream, ByteArrayInputStream, BufferedInputStream}
-
-private[spark] object ClosureCleaner extends Logging {
- // Get an ASM class reader for a given class from the JAR that loaded it
- private def getClassReader(cls: Class[_]): ClassReader = {
- // Copy data over, before delegating to ClassReader - else we can run out of open file handles.
- val className = cls.getName.replaceFirst("^.*\\.", "") + ".class"
- val resourceStream = cls.getResourceAsStream(className)
- // todo: Fixme - continuing with earlier behavior ...
- if (resourceStream == null) return new ClassReader(resourceStream)
-
- val baos = new ByteArrayOutputStream(128)
- Utils.copyStream(resourceStream, baos, true)
- new ClassReader(new ByteArrayInputStream(baos.toByteArray))
- }
-
- // Check whether a class represents a Scala closure
- private def isClosure(cls: Class[_]): Boolean = {
- cls.getName.contains("$anonfun$")
- }
-
- // Get a list of the classes of the outer objects of a given closure object, obj;
- // the outer objects are defined as any closures that obj is nested within, plus
- // possibly the class that the outermost closure is in, if any. We stop searching
- // for outer objects beyond that because cloning the user's object is probably
- // not a good idea (whereas we can clone closure objects just fine since we
- // understand how all their fields are used).
- private def getOuterClasses(obj: AnyRef): List[Class[_]] = {
- for (f <- obj.getClass.getDeclaredFields if f.getName == "$outer") {
- f.setAccessible(true)
- if (isClosure(f.getType)) {
- return f.getType :: getOuterClasses(f.get(obj))
- } else {
- return f.getType :: Nil // Stop at the first $outer that is not a closure
- }
- }
- return Nil
- }
-
- // Get a list of the outer objects for a given closure object.
- private def getOuterObjects(obj: AnyRef): List[AnyRef] = {
- for (f <- obj.getClass.getDeclaredFields if f.getName == "$outer") {
- f.setAccessible(true)
- if (isClosure(f.getType)) {
- return f.get(obj) :: getOuterObjects(f.get(obj))
- } else {
- return f.get(obj) :: Nil // Stop at the first $outer that is not a closure
- }
- }
- return Nil
- }
-
- private def getInnerClasses(obj: AnyRef): List[Class[_]] = {
- val seen = Set[Class[_]](obj.getClass)
- var stack = List[Class[_]](obj.getClass)
- while (!stack.isEmpty) {
- val cr = getClassReader(stack.head)
- stack = stack.tail
- val set = Set[Class[_]]()
- cr.accept(new InnerClosureFinder(set), 0)
- for (cls <- set -- seen) {
- seen += cls
- stack = cls :: stack
- }
- }
- return (seen - obj.getClass).toList
- }
-
- private def createNullValue(cls: Class[_]): AnyRef = {
- if (cls.isPrimitive) {
- new java.lang.Byte(0: Byte) // Should be convertible to any primitive type
- } else {
- null
- }
- }
-
- def clean(func: AnyRef) {
- // TODO: cache outerClasses / innerClasses / accessedFields
- val outerClasses = getOuterClasses(func)
- val innerClasses = getInnerClasses(func)
- val outerObjects = getOuterObjects(func)
-
- val accessedFields = Map[Class[_], Set[String]]()
- for (cls <- outerClasses)
- accessedFields(cls) = Set[String]()
- for (cls <- func.getClass :: innerClasses)
- getClassReader(cls).accept(new FieldAccessFinder(accessedFields), 0)
- //logInfo("accessedFields: " + accessedFields)
-
- val inInterpreter = {
- try {
- val interpClass = Class.forName("spark.repl.Main")
- interpClass.getMethod("interp").invoke(null) != null
- } catch {
- case _: ClassNotFoundException => true
- }
- }
-
- var outerPairs: List[(Class[_], AnyRef)] = (outerClasses zip outerObjects).reverse
- var outer: AnyRef = null
- if (outerPairs.size > 0 && !isClosure(outerPairs.head._1)) {
- // The closure is ultimately nested inside a class; keep the object of that
- // class without cloning it since we don't want to clone the user's objects.
- outer = outerPairs.head._2
- outerPairs = outerPairs.tail
- }
- // Clone the closure objects themselves, nulling out any fields that are not
- // used in the closure we're working on or any of its inner closures.
- for ((cls, obj) <- outerPairs) {
- outer = instantiateClass(cls, outer, inInterpreter)
- for (fieldName <- accessedFields(cls)) {
- val field = cls.getDeclaredField(fieldName)
- field.setAccessible(true)
- val value = field.get(obj)
- //logInfo("1: Setting " + fieldName + " on " + cls + " to " + value);
- field.set(outer, value)
- }
- }
-
- if (outer != null) {
- //logInfo("2: Setting $outer on " + func.getClass + " to " + outer);
- val field = func.getClass.getDeclaredField("$outer")
- field.setAccessible(true)
- field.set(func, outer)
- }
- }
-
- private def instantiateClass(cls: Class[_], outer: AnyRef, inInterpreter: Boolean): AnyRef = {
- //logInfo("Creating a " + cls + " with outer = " + outer)
- if (!inInterpreter) {
- // This is a bona fide closure class, whose constructor has no effects
- // other than to set its fields, so use its constructor
- val cons = cls.getConstructors()(0)
- val params = cons.getParameterTypes.map(createNullValue).toArray
- if (outer != null)
- params(0) = outer // First param is always outer object
- return cons.newInstance(params: _*).asInstanceOf[AnyRef]
- } else {
- // Use reflection to instantiate object without calling constructor
- val rf = sun.reflect.ReflectionFactory.getReflectionFactory()
- val parentCtor = classOf[java.lang.Object].getDeclaredConstructor()
- val newCtor = rf.newConstructorForSerialization(cls, parentCtor)
- val obj = newCtor.newInstance().asInstanceOf[AnyRef]
- if (outer != null) {
- //logInfo("3: Setting $outer on " + cls + " to " + outer);
- val field = cls.getDeclaredField("$outer")
- field.setAccessible(true)
- field.set(obj, outer)
- }
- return obj
- }
- }
-}
-
-private[spark] class FieldAccessFinder(output: Map[Class[_], Set[String]]) extends ClassVisitor(ASM4) {
- override def visitMethod(access: Int, name: String, desc: String,
- sig: String, exceptions: Array[String]): MethodVisitor = {
- return new MethodVisitor(ASM4) {
- override def visitFieldInsn(op: Int, owner: String, name: String, desc: String) {
- if (op == GETFIELD) {
- for (cl <- output.keys if cl.getName == owner.replace('/', '.')) {
- output(cl) += name
- }
- }
- }
-
- override def visitMethodInsn(op: Int, owner: String, name: String,
- desc: String) {
- // Check for calls a getter method for a variable in an interpreter wrapper object.
- // This means that the corresponding field will be accessed, so we should save it.
- if (op == INVOKEVIRTUAL && owner.endsWith("$iwC") && !name.endsWith("$outer")) {
- for (cl <- output.keys if cl.getName == owner.replace('/', '.')) {
- output(cl) += name
- }
- }
- }
- }
- }
-}
-
-private[spark] class InnerClosureFinder(output: Set[Class[_]]) extends ClassVisitor(ASM4) {
- var myName: String = null
-
- override def visit(version: Int, access: Int, name: String, sig: String,
- superName: String, interfaces: Array[String]) {
- myName = name
- }
-
- override def visitMethod(access: Int, name: String, desc: String,
- sig: String, exceptions: Array[String]): MethodVisitor = {
- return new MethodVisitor(ASM4) {
- override def visitMethodInsn(op: Int, owner: String, name: String,
- desc: String) {
- val argTypes = Type.getArgumentTypes(desc)
- if (op == INVOKESPECIAL && name == "<init>" && argTypes.length > 0
- && argTypes(0).toString.startsWith("L") // is it an object?
- && argTypes(0).getInternalName == myName)
- output += Class.forName(
- owner.replace('/', '.'),
- false,
- Thread.currentThread.getContextClassLoader)
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/Dependency.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/Dependency.scala b/core/src/main/scala/spark/Dependency.scala
deleted file mode 100644
index d5a9606..0000000
--- a/core/src/main/scala/spark/Dependency.scala
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark
-
-/**
- * Base class for dependencies.
- */
-abstract class Dependency[T](val rdd: RDD[T]) extends Serializable
-
-
-/**
- * Base class for dependencies where each partition of the parent RDD is used by at most one
- * partition of the child RDD. Narrow dependencies allow for pipelined execution.
- */
-abstract class NarrowDependency[T](rdd: RDD[T]) extends Dependency(rdd) {
- /**
- * Get the parent partitions for a child partition.
- * @param partitionId a partition of the child RDD
- * @return the partitions of the parent RDD that the child partition depends upon
- */
- def getParents(partitionId: Int): Seq[Int]
-}
-
-
-/**
- * Represents a dependency on the output of a shuffle stage.
- * @param rdd the parent RDD
- * @param partitioner partitioner used to partition the shuffle output
- * @param serializerClass class name of the serializer to use
- */
-class ShuffleDependency[K, V](
- @transient rdd: RDD[_ <: Product2[K, V]],
- val partitioner: Partitioner,
- val serializerClass: String = null)
- extends Dependency(rdd.asInstanceOf[RDD[Product2[K, V]]]) {
-
- val shuffleId: Int = rdd.context.newShuffleId()
-}
-
-
-/**
- * Represents a one-to-one dependency between partitions of the parent and child RDDs.
- */
-class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) {
- override def getParents(partitionId: Int) = List(partitionId)
-}
-
-
-/**
- * Represents a one-to-one dependency between ranges of partitions in the parent and child RDDs.
- * @param rdd the parent RDD
- * @param inStart the start of the range in the parent RDD
- * @param outStart the start of the range in the child RDD
- * @param length the length of the range
- */
-class RangeDependency[T](rdd: RDD[T], inStart: Int, outStart: Int, length: Int)
- extends NarrowDependency[T](rdd) {
-
- override def getParents(partitionId: Int) = {
- if (partitionId >= outStart && partitionId < outStart + length) {
- List(partitionId - outStart + inStart)
- } else {
- Nil
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/DoubleRDDFunctions.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/DoubleRDDFunctions.scala b/core/src/main/scala/spark/DoubleRDDFunctions.scala
deleted file mode 100644
index 104168e..0000000
--- a/core/src/main/scala/spark/DoubleRDDFunctions.scala
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark
-
-import spark.partial.BoundedDouble
-import spark.partial.MeanEvaluator
-import spark.partial.PartialResult
-import spark.partial.SumEvaluator
-import spark.util.StatCounter
-
-/**
- * Extra functions available on RDDs of Doubles through an implicit conversion.
- * Import `spark.SparkContext._` at the top of your program to use these functions.
- *
- * @param self the RDD whose elements are summarized
- */
-class DoubleRDDFunctions(self: RDD[Double]) extends Logging with Serializable {
-  /**
-   * Add up the elements in this RDD.
-   *
-   * Folds from a zero of 0.0 instead of using `reduce`, which needs at least one element, so an
-   * empty RDD sums to 0.0 rather than throwing.
-   */
-  def sum(): Double = {
-    self.fold(0.0)(_ + _)
-  }
-
-  /**
-   * Return a [[spark.util.StatCounter]] object that captures the mean, variance and count
-   * of the RDD's elements in one operation.
-   *
-   * Each partition is summarized into its own StatCounter, and the partial counters are then
-   * merged pairwise.
-   */
-  def stats(): StatCounter = {
-    self.mapPartitions(nums => Iterator(StatCounter(nums))).reduce((a, b) => a.merge(b))
-  }
-
-  /** Compute the mean of this RDD's elements (runs a full [[stats]] pass). */
-  def mean(): Double = stats().mean
-
-  /** Compute the variance of this RDD's elements (runs a full [[stats]] pass). */
-  def variance(): Double = stats().variance
-
-  /** Compute the standard deviation of this RDD's elements (runs a full [[stats]] pass). */
-  def stdev(): Double = stats().stdev
-
-  /**
-   * Compute the sample standard deviation of this RDD's elements (which corrects for bias in
-   * estimating the standard deviation by dividing by N-1 instead of N).
-   */
-  def sampleStdev(): Double = stats().sampleStdev
-
-  /**
-   * Compute the sample variance of this RDD's elements (which corrects for bias in
-   * estimating the variance by dividing by N-1 instead of N).
-   */
-  def sampleVariance(): Double = stats().sampleVariance
-
-  /**
-   * (Experimental) Approximate operation to return the mean within a timeout.
-   *
-   * @param timeout how long to wait for the job, passed to `runApproximateJob` (presumably
-   *                milliseconds -- TODO confirm)
-   * @param confidence confidence level handed to the evaluator for the returned bounds
-   */
-  def meanApprox(timeout: Long, confidence: Double = 0.95): PartialResult[BoundedDouble] = {
-    val processPartition = (ctx: TaskContext, ns: Iterator[Double]) => StatCounter(ns)
-    val evaluator = new MeanEvaluator(self.partitions.size, confidence)
-    self.context.runApproximateJob(self, processPartition, evaluator, timeout)
-  }
-
-  /**
-   * (Experimental) Approximate operation to return the sum within a timeout.
-   *
-   * @param timeout how long to wait for the job, passed to `runApproximateJob` (presumably
-   *                milliseconds -- TODO confirm)
-   * @param confidence confidence level handed to the evaluator for the returned bounds
-   */
-  def sumApprox(timeout: Long, confidence: Double = 0.95): PartialResult[BoundedDouble] = {
-    val processPartition = (ctx: TaskContext, ns: Iterator[Double]) => StatCounter(ns)
-    val evaluator = new SumEvaluator(self.partitions.size, confidence)
-    self.context.runApproximateJob(self, processPartition, evaluator, timeout)
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/FetchFailedException.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/FetchFailedException.scala b/core/src/main/scala/spark/FetchFailedException.scala
deleted file mode 100644
index a2dae6c..0000000
--- a/core/src/main/scala/spark/FetchFailedException.scala
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark
-
-import spark.storage.BlockManagerId
-
-/**
- * Thrown when a task cannot obtain the shuffle map output it needs. It carries the
- * [[TaskEndReason]] to report for the failed task.
- *
- * The message and cause are passed to the `Exception` superclass, so `getMessage`, `getCause`,
- * `toString` and stack-trace printing see them through the standard `Throwable` machinery.
- *
- * @param taskEndReason reason returned by [[toTaskEndReason]]
- * @param message human-readable description of the failure
- * @param cause underlying error (`Exception` accepts null here)
- */
-private[spark] class FetchFailedException(
-    taskEndReason: TaskEndReason,
-    message: String,
-    cause: Throwable)
-  extends Exception(message, cause) {
-
-  /** Failure to fetch the output of map task `mapId` for reduce task `reduceId` from `bmAddress`. */
-  def this (bmAddress: BlockManagerId, shuffleId: Int, mapId: Int, reduceId: Int, cause: Throwable) =
-    this(FetchFailed(bmAddress, shuffleId, mapId, reduceId),
-      "Fetch failed: %s %d %d %d".format(bmAddress, shuffleId, mapId, reduceId),
-      cause)
-
-  /**
-   * Failure to obtain the map output locations themselves. No block manager address is known
-   * (null) and the map ID is recorded as -1.
-   */
-  def this (shuffleId: Int, reduceId: Int, cause: Throwable) =
-    this(FetchFailed(null, shuffleId, -1, reduceId),
-      "Unable to fetch locations from master: %d %d".format(shuffleId, reduceId), cause)
-
-  def toTaskEndReason: TaskEndReason = taskEndReason
-}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/HttpFileServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/HttpFileServer.scala b/core/src/main/scala/spark/HttpFileServer.scala
deleted file mode 100644
index a13a7a2..0000000
--- a/core/src/main/scala/spark/HttpFileServer.scala
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark
-
-import java.io.{File}
-import com.google.common.io.Files
-
-/**
- * Publishes files and JARs to other nodes over HTTP from a fresh temporary directory.
- *
- * After [[initialize]], files are copied under `<baseDir>/files` and JARs under `<baseDir>/jars`.
- * They are then reachable at `<serverUri>/files/<name>` and `<serverUri>/jars/<name>`.
- */
-private[spark] class HttpFileServer extends Logging {
-
-  var baseDir : File = null
-  var fileDir : File = null
-  var jarDir : File = null
-  var httpServer : HttpServer = null
-  var serverUri : String = null
-
-  /** Lays out the temp directory, starts the HTTP server on it and records its URI. */
-  def initialize() {
-    baseDir = Utils.createTempDir()
-    fileDir = new File(baseDir, "files")
-    jarDir = new File(baseDir, "jars")
-    // mkdir's boolean result is deliberately not checked.
-    Seq(fileDir, jarDir).foreach(_.mkdir())
-    logInfo("HTTP File server directory is " + baseDir)
-    httpServer = new HttpServer(baseDir)
-    httpServer.start()
-    serverUri = httpServer.uri
-  }
-
-  /** Shuts down the underlying HTTP server. */
-  def stop() {
-    httpServer.stop()
-  }
-
-  /** Copies `file` into the served files directory; returns the URL to fetch it from. */
-  def addFile(file: File) : String = publish(file, fileDir, "/files/")
-
-  /** Copies `file` into the served JARs directory; returns the URL to fetch it from. */
-  def addJar(file: File) : String = publish(file, jarDir, "/jars/")
-
-  /** Copies `file` into `dir` under its own name and returns the resulting local path. */
-  def addFileToDir(file: File, dir: File) : String = {
-    val name = file.getName
-    Files.copy(file, new File(dir, name))
-    dir + "/" + name
-  }
-
-  // Shared body of addFile/addJar: copy, then build the URL from the server URI and prefix.
-  private def publish(file: File, dir: File, urlPrefix: String): String = {
-    addFileToDir(file, dir)
-    serverUri + urlPrefix + file.getName
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/HttpServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/HttpServer.scala b/core/src/main/scala/spark/HttpServer.scala
deleted file mode 100644
index c9dffbc..0000000
--- a/core/src/main/scala/spark/HttpServer.scala
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark
-
-import java.io.File
-import java.net.InetAddress
-
-import org.eclipse.jetty.server.Server
-import org.eclipse.jetty.server.bio.SocketConnector
-import org.eclipse.jetty.server.handler.DefaultHandler
-import org.eclipse.jetty.server.handler.HandlerList
-import org.eclipse.jetty.server.handler.ResourceHandler
-import org.eclipse.jetty.util.thread.QueuedThreadPool
-
-/**
- * Exception type thrown by HttpServer when it is in the wrong state for an operation.
- */
-private[spark] class ServerStateException(message: String) extends Exception(message)
-
-/**
- * An HTTP server for static content used to allow worker nodes to access JARs added to SparkContext
- * as well as classes created by the interpreter when the user types in code. This is just a wrapper
- * around a Jetty server.
- *
- * Call [[start]] before [[uri]] or [[stop]]; calls made in the wrong state throw
- * [[ServerStateException]].
- *
- * @param resourceBase directory whose contents are served
- */
-private[spark] class HttpServer(resourceBase: File) extends Logging {
-  private var server: Server = null
-  private var port: Int = -1
-
-  /**
-   * Starts Jetty on an OS-chosen port (port 0) and records the port actually bound.
-   *
-   * The new server is stored in `server` only after it has started. If startup throws, this object
-   * stays in the stopped state and `start()` can be retried.
-   *
-   * @throws ServerStateException if the server is already running
-   */
-  def start() {
-    if (server != null) {
-      throw new ServerStateException("Server is already started")
-    }
-    val newServer = new Server()
-    val connector = new SocketConnector
-    connector.setMaxIdleTime(60 * 1000)
-    connector.setSoLingerTime(-1)
-    connector.setPort(0)
-    newServer.addConnector(connector)
-
-    val threadPool = new QueuedThreadPool
-    // Daemon threads do not, on their own, keep the JVM alive.
-    threadPool.setDaemon(true)
-    newServer.setThreadPool(threadPool)
-    val resHandler = new ResourceHandler
-    resHandler.setResourceBase(resourceBase.getAbsolutePath)
-    val handlerList = new HandlerList
-    // Static resources first; DefaultHandler is the fallback for anything they do not serve.
-    handlerList.setHandlers(Array(resHandler, new DefaultHandler))
-    newServer.setHandler(handlerList)
-    newServer.start()
-    port = newServer.getConnectors()(0).getLocalPort()
-    server = newServer
-  }
-
-  /**
-   * Stops the server and forgets its port.
-   *
-   * @throws ServerStateException if the server is not running
-   */
-  def stop() {
-    if (server == null) {
-      throw new ServerStateException("Server is already stopped")
-    }
-    server.stop()
-    port = -1
-    server = null
-  }
-
-  /**
-   * Get the URI of this HTTP server (http://host:port)
-   *
-   * @throws ServerStateException if the server is not running
-   */
-  def uri: String = {
-    if (server == null) {
-      throw new ServerStateException("Server is not started")
-    }
-    "http://" + Utils.localIpAddress + ":" + port
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/JavaSerializer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/JavaSerializer.scala b/core/src/main/scala/spark/JavaSerializer.scala
deleted file mode 100644
index 04c5f44..0000000
--- a/core/src/main/scala/spark/JavaSerializer.scala
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark
-
-import java.io._
-import java.nio.ByteBuffer
-
-import serializer.{Serializer, SerializerInstance, DeserializationStream, SerializationStream}
-import spark.util.ByteBufferInputStream
-
-/** [[SerializationStream]] that writes objects through a plain `java.io.ObjectOutputStream`. */
-private[spark] class JavaSerializationStream(out: OutputStream) extends SerializationStream {
-  val objOut = new ObjectOutputStream(out)
-
-  /** Writes `t` and returns this stream so that writes can be chained. */
-  def writeObject[T](t: T): SerializationStream = {
-    objOut.writeObject(t)
-    this
-  }
-
-  def flush() {
-    objOut.flush()
-  }
-
-  /** Closes the object stream, which also closes the wrapped `out`. */
-  def close() {
-    objOut.close()
-  }
-}
-
-/**
- * [[DeserializationStream]] backed by a plain `java.io.ObjectInputStream` that resolves classes
- * through `loader` rather than the JVM's default lookup.
- *
- * @param in stream of Java-serialized objects
- * @param loader class loader used to resolve the classes named in the stream
- */
-private[spark] class JavaDeserializationStream(in: InputStream, loader: ClassLoader)
-  extends DeserializationStream {
-
-  val objIn = new ObjectInputStream(in) {
-    override def resolveClass(desc: ObjectStreamClass): Class[_] =
-      try {
-        Class.forName(desc.getName, false, loader)
-      } catch {
-        // Class.forName cannot load primitive types ("int", "boolean", ...). These appear when,
-        // for example, a `classOf[Int]` value is serialized. Fall back to a fixed table, as the
-        // default ObjectInputStream.resolveClass does.
-        case e: ClassNotFoundException =>
-          JavaDeserializationStream.primitiveClasses.getOrElse(desc.getName, throw e)
-      }
-  }
-
-  def readObject[T](): T = objIn.readObject().asInstanceOf[T]
-  def close() { objIn.close() }
-}
-
-private[spark] object JavaDeserializationStream {
-  /** Primitive classes, keyed by the names `ObjectStreamClass.getName` reports for them. */
-  val primitiveClasses: Map[String, Class[_]] = Map(
-    "boolean" -> classOf[Boolean],
-    "byte" -> classOf[Byte],
-    "char" -> classOf[Char],
-    "short" -> classOf[Short],
-    "int" -> classOf[Int],
-    "long" -> classOf[Long],
-    "float" -> classOf[Float],
-    "double" -> classOf[Double],
-    "void" -> classOf[Unit])
-}
-
-/** [[SerializerInstance]] built on Java's built-in serialization; created by [[JavaSerializer]]. */
-private[spark] class JavaSerializerInstance extends SerializerInstance {
-
-  /** Serializes `t` and wraps the complete byte image in a heap ByteBuffer. */
-  def serialize[T](t: T): ByteBuffer = {
-    val buffer = new ByteArrayOutputStream()
-    val stream = serializeStream(buffer)
-    stream.writeObject(t)
-    // Closing flushes the object stream so every byte has reached `buffer`.
-    stream.close()
-    ByteBuffer.wrap(buffer.toByteArray)
-  }
-
-  /** Reads one object from `bytes`, resolving classes with the thread's context class loader. */
-  def deserialize[T](bytes: ByteBuffer): T =
-    deserializeStream(new ByteBufferInputStream(bytes)).readObject[T]()
-
-  /** Reads one object from `bytes`, resolving classes with `loader`. */
-  def deserialize[T](bytes: ByteBuffer, loader: ClassLoader): T =
-    deserializeStream(new ByteBufferInputStream(bytes), loader).readObject[T]()
-
-  def serializeStream(s: OutputStream): SerializationStream = new JavaSerializationStream(s)
-
-  /** Deserialization stream that resolves classes with the thread's context class loader. */
-  def deserializeStream(s: InputStream): DeserializationStream =
-    deserializeStream(s, Thread.currentThread.getContextClassLoader)
-
-  def deserializeStream(s: InputStream, loader: ClassLoader): DeserializationStream =
-    new JavaDeserializationStream(s, loader)
-}
-
-/**
- * A Spark serializer that uses Java's built-in serialization.
- */
-class JavaSerializer extends Serializer {
-  /** Every call yields a new, independent [[JavaSerializerInstance]]. */
-  def newInstance(): SerializerInstance = new JavaSerializerInstance
-}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/KryoSerializer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/KryoSerializer.scala b/core/src/main/scala/spark/KryoSerializer.scala
deleted file mode 100644
index eeb2993..0000000
--- a/core/src/main/scala/spark/KryoSerializer.scala
+++ /dev/null
@@ -1,156 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark
-
-import java.io._
-import java.nio.ByteBuffer
-import com.esotericsoftware.kryo.{Kryo, KryoException}
-import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput}
-import com.esotericsoftware.kryo.serializers.{JavaSerializer => KryoJavaSerializer}
-import com.twitter.chill.ScalaKryoInstantiator
-import serializer.{SerializerInstance, DeserializationStream, SerializationStream}
-import spark.broadcast._
-import spark.storage._
-
-/** [[SerializationStream]] that writes each object, tagged with its class, through Kryo. */
-private[spark]
-class KryoSerializationStream(kryo: Kryo, outStream: OutputStream) extends SerializationStream {
-  val output = new KryoOutput(outStream)
-
-  /** Writes the class followed by the object (read back by `readClassAndObject`). */
-  def writeObject[T](t: T): SerializationStream = { kryo.writeClassAndObject(output, t); this }
-
-  def flush() {
-    output.flush()
-  }
-
-  def close() {
-    output.close()
-  }
-}
-
-/**
- * [[DeserializationStream]] reading objects written with `writeClassAndObject`.
- *
- * Running out of input is reported as `java.io.EOFException`, the end-of-stream signal used by
- * DeserializationStream. Any other Kryo failure (e.g. an unregistered or unknown class) is
- * rethrown unchanged, so corrupt data is not mistaken for a normal end of stream.
- */
-private[spark]
-class KryoDeserializationStream(kryo: Kryo, inStream: InputStream) extends DeserializationStream {
-  val input = new KryoInput(inStream)
-
-  def readObject[T](): T = {
-    try {
-      kryo.readClassAndObject(input).asInstanceOf[T]
-    } catch {
-      // DeserializationStream uses the EOF exception to indicate stopping condition.
-      case e: KryoException if isEndOfInput(e) => throw new EOFException
-    }
-  }
-
-  def close() {
-    // Kryo's Input automatically closes the input stream it is using.
-    input.close()
-  }
-
-  // Kryo's Input reports exhausted input as KryoException("Buffer underflow."). The message may
-  // be followed by a serialization trace, hence `contains`. Null messages are not EOF.
-  private def isEndOfInput(e: KryoException): Boolean =
-    Option(e.getMessage).exists(_.toLowerCase(java.util.Locale.ROOT).contains("buffer underflow"))
-}
-
-/**
- * Kryo-backed [[SerializerInstance]]. It holds one Kryo instance plus reusable input/output
- * buffers obtained from `ks`, and mutates them on every call without synchronization.
- * NOTE(review): confirm that callers never share one instance across threads.
- */
-private[spark] class KryoSerializerInstance(ks: KryoSerializer) extends SerializerInstance {
-  val kryo = ks.newKryo()
-  val output = ks.newKryoOutput()
-  val input = ks.newKryoInput()
-
-  /** Serializes `t` (with its class) into the reused output buffer and copies the bytes out. */
-  def serialize[T](t: T): ByteBuffer = {
-    output.clear()
-    kryo.writeClassAndObject(output, t)
-    ByteBuffer.wrap(output.toBytes)
-  }
-
-  /** Reads one object from the readable bytes of `bytes` (position to limit). */
-  def deserialize[T](bytes: ByteBuffer): T = {
-    setInput(bytes)
-    kryo.readClassAndObject(input).asInstanceOf[T]
-  }
-
-  /**
-   * Like `deserialize(bytes)`, but resolves classes with `loader`. Kryo's previous class loader is
-   * restored even if reading throws, so a failure cannot leave this reused Kryo instance pointing
-   * at the wrong loader.
-   */
-  def deserialize[T](bytes: ByteBuffer, loader: ClassLoader): T = {
-    val oldClassLoader = kryo.getClassLoader
-    kryo.setClassLoader(loader)
-    try {
-      setInput(bytes)
-      kryo.readClassAndObject(input).asInstanceOf[T]
-    } finally {
-      kryo.setClassLoader(oldClassLoader)
-    }
-  }
-
-  def serializeStream(s: OutputStream): SerializationStream = {
-    new KryoSerializationStream(kryo, s)
-  }
-
-  def deserializeStream(s: InputStream): DeserializationStream = {
-    new KryoDeserializationStream(kryo, s)
-  }
-
-  // Points `input` at the readable bytes of `bytes` without changing the caller's buffer position.
-  private def setInput(bytes: ByteBuffer) {
-    if (bytes.hasArray && bytes.arrayOffset + bytes.position == 0) {
-      // Common case (e.g. ByteBuffer.wrap): the data starts at index 0 of the backing array.
-      input.setBuffer(bytes.array)
-    } else {
-      // Sliced, partially consumed, direct or read-only buffer: copy just the readable bytes.
-      val copy = new Array[Byte](bytes.remaining)
-      bytes.duplicate().get(copy)
-      input.setBuffer(copy)
-    }
-  }
-}
-
-/**
- * Hook for user code to register its classes with Kryo when Kryo serialization is in use.
- * The implementation class is named by the `spark.kryo.registrator` system property and is
- * instantiated reflectively with `newInstance()`, so it needs a public no-argument constructor.
- */
-trait KryoRegistrator {
-  /** Registers the application's classes on `kryo`. */
-  def registerClasses(kryo: Kryo): Unit
-}
-
-/**
- * A Spark serializer that uses the [[http://code.google.com/p/kryo/ Kryo serialization library]].
- *
- * Configured through JVM system properties:
- *  - `spark.kryoserializer.buffer.mb` (default 2): size in MB of the input/output buffers
- *  - `spark.kryo.registrator`: optional [[KryoRegistrator]] class, run for every new Kryo
- *  - `spark.kryo.referenceTracking` (default true): false disables Kryo reference tracking
- */
-class KryoSerializer extends spark.serializer.Serializer with Logging {
-  private val bufferSize = System.getProperty("spark.kryoserializer.buffer.mb", "2").toInt * 1024 * 1024
-
-  def newKryoOutput() = new KryoOutput(bufferSize)
-
-  def newKryoInput() = new KryoInput(bufferSize)
-
-  /**
-   * Builds a Kryo instance with Spark's common classes registered, then runs the user's
-   * registrator if one is configured. It resolves classes with the creating thread's context
-   * class loader.
-   */
-  def newKryo(): Kryo = {
-    val instantiator = new ScalaKryoInstantiator
-    val kryo = instantiator.newKryo()
-    val classLoader = Thread.currentThread.getContextClassLoader
-
-    // Register some commonly used classes
-    val toRegister: Seq[AnyRef] = Seq(
-      ByteBuffer.allocate(1),
-      StorageLevel.MEMORY_ONLY,
-      PutBlock("1", ByteBuffer.allocate(1), StorageLevel.MEMORY_ONLY),
-      GotBlock("1", ByteBuffer.allocate(1)),
-      GetBlock("1")
-    )
-
-    for (obj <- toRegister) kryo.register(obj.getClass)
-
-    // Allow sending SerializableWritable
-    kryo.register(classOf[SerializableWritable[_]], new KryoJavaSerializer())
-    kryo.register(classOf[HttpBroadcast[_]], new KryoJavaSerializer())
-
-    // Allow the user to register their own classes by setting spark.kryo.registrator
-    Option(System.getProperty("spark.kryo.registrator")).foreach { regCls =>
-      try {
-        logDebug("Running user registrator: " + regCls)
-        val reg = Class.forName(regCls, true, classLoader).newInstance().asInstanceOf[KryoRegistrator]
-        reg.registerClasses(kryo)
-      } catch {
-        // A broken registrator does not prevent creating the Kryo instance; the built-in
-        // registrations above still apply. The failure goes to the log with its stack trace.
-        case e: Exception => logError("Failed to register spark.kryo.registrator " + regCls, e)
-      }
-    }
-
-    kryo.setClassLoader(classLoader)
-
-    // Allow disabling Kryo reference tracking if user knows their object graphs don't have loops
-    kryo.setReferences(System.getProperty("spark.kryo.referenceTracking", "true").toBoolean)
-
-    kryo
-  }
-
-  def newInstance(): SerializerInstance = {
-    new KryoSerializerInstance(this)
-  }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/Logging.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/Logging.scala b/core/src/main/scala/spark/Logging.scala
deleted file mode 100644
index 79b0362..0000000
--- a/core/src/main/scala/spark/Logging.scala
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark
-
-import org.slf4j.Logger
-import org.slf4j.LoggerFactory
-
-/**
- * Utility trait for classes that want to log data. Creates a SLF4J logger for the class and allows
- * logging messages at different levels using methods that only evaluate parameters lazily if the
- * log level is enabled.
- */
-trait Logging {
-  // Transient so that objects mixing in Logging can be serialized and used on another machine;
-  // `log` recreates the logger there on first use.
-  @transient private var log_ : Logger = null
-
-  /** This object's logger, created on first access and cached in `log_`. */
-  protected def log: Logger = {
-    if (log_ == null) {
-      // Scala `object`s compile to classes whose names end in "$"; drop that suffix.
-      log_ = LoggerFactory.getLogger(getClass.getName.stripSuffix("$"))
-    }
-    log_
-  }
-
-  // Every `msg` below is by-name (`=> String`), so it is only built when its level is enabled.
-
-  protected def logTrace(msg: => String) { if (log.isTraceEnabled) log.trace(msg) }
-  protected def logTrace(msg: => String, throwable: Throwable) {
-    if (log.isTraceEnabled) log.trace(msg, throwable)
-  }
-
-  protected def logDebug(msg: => String) { if (log.isDebugEnabled) log.debug(msg) }
-  protected def logDebug(msg: => String, throwable: Throwable) {
-    if (log.isDebugEnabled) log.debug(msg, throwable)
-  }
-
-  protected def logInfo(msg: => String) { if (log.isInfoEnabled) log.info(msg) }
-  protected def logInfo(msg: => String, throwable: Throwable) {
-    if (log.isInfoEnabled) log.info(msg, throwable)
-  }
-
-  protected def logWarning(msg: => String) { if (log.isWarnEnabled) log.warn(msg) }
-  protected def logWarning(msg: => String, throwable: Throwable) {
-    if (log.isWarnEnabled) log.warn(msg, throwable)
-  }
-
-  protected def logError(msg: => String) { if (log.isErrorEnabled) log.error(msg) }
-  protected def logError(msg: => String, throwable: Throwable) {
-    if (log.isErrorEnabled) log.error(msg, throwable)
-  }
-
-  /** Whether TRACE is enabled for this object's logger. */
-  protected def isTraceEnabled(): Boolean = log.isTraceEnabled
-
-  // Touches `log` once so the logger exists before several threads start logging at once
-  // (SLF4J initialization is not thread safe).
-  protected def initLogging() { log }
-}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/MapOutputTracker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/MapOutputTracker.scala b/core/src/main/scala/spark/MapOutputTracker.scala
deleted file mode 100644
index 0cd0341..0000000
--- a/core/src/main/scala/spark/MapOutputTracker.scala
+++ /dev/null
@@ -1,338 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark
-
-import java.io._
-import java.util.zip.{GZIPInputStream, GZIPOutputStream}
-
-import scala.collection.mutable.HashMap
-import scala.collection.mutable.HashSet
-
-import akka.actor._
-import akka.dispatch._
-import akka.pattern.ask
-import akka.remote._
-import akka.util.Duration
-
-
-import spark.scheduler.MapStatus
-import spark.storage.BlockManagerId
-import spark.util.{MetadataCleaner, TimeStampedHashMap}
-
-
-/** Messages handled by [[MapOutputTrackerActor]]. */
-private[spark] sealed trait MapOutputTrackerMessage
-/**
- * Request for the serialized map output statuses of `shuffleId`. `requester` identifies the asking
- * node (its host:port in MapOutputTracker.getServerStatuses); the actor here only logs it.
- */
-private[spark] case class GetMapOutputStatuses(shuffleId: Int, requester: String)
-  extends MapOutputTrackerMessage
-/** Tells the tracker actor to stop itself; it replies `true` before stopping. */
-private[spark] case object StopMapOutputTracker extends MapOutputTrackerMessage
-
-/**
- * Actor on the driver that answers [[GetMapOutputStatuses]] with the tracker's serialized
- * statuses and shuts itself down on [[StopMapOutputTracker]].
- */
-private[spark] class MapOutputTrackerActor(tracker: MapOutputTracker) extends Actor with Logging {
-  def receive = {
-    case StopMapOutputTracker =>
-      logInfo("MapOutputTrackerActor stopped!")
-      // Acknowledge first so the asker (MapOutputTracker.communicate) gets its `true`.
-      sender ! true
-      context.stop(self)
-
-    case GetMapOutputStatuses(shuffleId, requester) =>
-      logInfo("Asked to send map output locations for shuffle " + shuffleId + " to " + requester)
-      val serialized = tracker.getSerializedLocations(shuffleId)
-      sender ! serialized
-  }
-}
-
-private[spark] class MapOutputTracker extends Logging {
-
- private val timeout = Duration.create(System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds")
-
- // Set to the MapOutputTrackerActor living on the driver
- var trackerActor: ActorRef = _
-
- private var mapStatuses = new TimeStampedHashMap[Int, Array[MapStatus]]
-
- // Incremented every time a fetch fails so that client nodes know to clear
- // their cache of map output locations if this happens.
- private var epoch: Long = 0
- private val epochLock = new java.lang.Object
-
- // Cache a serialized version of the output statuses for each shuffle to send them out faster
- var cacheEpoch = epoch
- private val cachedSerializedStatuses = new TimeStampedHashMap[Int, Array[Byte]]
-
- val metadataCleaner = new MetadataCleaner("MapOutputTracker", this.cleanup)
-
- // Send a message to the trackerActor and get its result within a default timeout, or
- // throw a SparkException if this fails.
- def askTracker(message: Any): Any = {
- try {
- val future = trackerActor.ask(message)(timeout)
- return Await.result(future, timeout)
- } catch {
- case e: Exception =>
- throw new SparkException("Error communicating with MapOutputTracker", e)
- }
- }
-
- // Send a one-way message to the trackerActor, to which we expect it to reply with true.
- def communicate(message: Any) {
- if (askTracker(message) != true) {
- throw new SparkException("Error reply received from MapOutputTracker")
- }
- }
-
- def registerShuffle(shuffleId: Int, numMaps: Int) {
- if (mapStatuses.putIfAbsent(shuffleId, new Array[MapStatus](numMaps)).isDefined) {
- throw new IllegalArgumentException("Shuffle ID " + shuffleId + " registered twice")
- }
- }
-
- def registerMapOutput(shuffleId: Int, mapId: Int, status: MapStatus) {
- var array = mapStatuses(shuffleId)
- array.synchronized {
- array(mapId) = status
- }
- }
-
- def registerMapOutputs(
- shuffleId: Int,
- statuses: Array[MapStatus],
- changeEpoch: Boolean = false) {
- mapStatuses.put(shuffleId, Array[MapStatus]() ++ statuses)
- if (changeEpoch) {
- incrementEpoch()
- }
- }
-
- def unregisterMapOutput(shuffleId: Int, mapId: Int, bmAddress: BlockManagerId) {
- var arrayOpt = mapStatuses.get(shuffleId)
- if (arrayOpt.isDefined && arrayOpt.get != null) {
- var array = arrayOpt.get
- array.synchronized {
- if (array(mapId) != null && array(mapId).location == bmAddress) {
- array(mapId) = null
- }
- }
- incrementEpoch()
- } else {
- throw new SparkException("unregisterMapOutput called for nonexistent shuffle ID")
- }
- }
-
- // Remembers which map output locations are currently being fetched on a worker
- private val fetching = new HashSet[Int]
-
- // Called on possibly remote nodes to get the server URIs and output sizes for a given shuffle
- def getServerStatuses(shuffleId: Int, reduceId: Int): Array[(BlockManagerId, Long)] = {
- val statuses = mapStatuses.get(shuffleId).orNull
- if (statuses == null) {
- logInfo("Don't have map outputs for shuffle " + shuffleId + ", fetching them")
- var fetchedStatuses: Array[MapStatus] = null
- fetching.synchronized {
- if (fetching.contains(shuffleId)) {
- // Someone else is fetching it; wait for them to be done
- while (fetching.contains(shuffleId)) {
- try {
- fetching.wait()
- } catch {
- case e: InterruptedException =>
- }
- }
- }
-
- // Either while we waited the fetch happened successfully, or
- // someone fetched it in between the get and the fetching.synchronized.
- fetchedStatuses = mapStatuses.get(shuffleId).orNull
- if (fetchedStatuses == null) {
- // We have to do the fetch, get others to wait for us.
- fetching += shuffleId
- }
- }
-
- if (fetchedStatuses == null) {
- // We won the race to fetch the output locs; do so
- logInfo("Doing the fetch; tracker actor = " + trackerActor)
- val hostPort = Utils.localHostPort()
- // This try-finally prevents hangs due to timeouts:
- try {
- val fetchedBytes =
- askTracker(GetMapOutputStatuses(shuffleId, hostPort)).asInstanceOf[Array[Byte]]
- fetchedStatuses = deserializeStatuses(fetchedBytes)
- logInfo("Got the output locations")
- mapStatuses.put(shuffleId, fetchedStatuses)
- } finally {
- fetching.synchronized {
- fetching -= shuffleId
- fetching.notifyAll()
- }
- }
- }
- if (fetchedStatuses != null) {
- fetchedStatuses.synchronized {
- return MapOutputTracker.convertMapStatuses(shuffleId, reduceId, fetchedStatuses)
- }
- }
- else{
- throw new FetchFailedException(null, shuffleId, -1, reduceId,
- new Exception("Missing all output locations for shuffle " + shuffleId))
- }
- } else {
- statuses.synchronized {
- return MapOutputTracker.convertMapStatuses(shuffleId, reduceId, statuses)
- }
- }
- }
-
- private def cleanup(cleanupTime: Long) {
- mapStatuses.clearOldValues(cleanupTime)
- cachedSerializedStatuses.clearOldValues(cleanupTime)
- }
-
- def stop() {
- communicate(StopMapOutputTracker)
- mapStatuses.clear()
- metadataCleaner.cancel()
- trackerActor = null
- }
-
- // Called on master to increment the epoch number
- def incrementEpoch() {
- epochLock.synchronized {
- epoch += 1
- logDebug("Increasing epoch to " + epoch)
- }
- }
-
- // Called on master or workers to get current epoch number
- def getEpoch: Long = {
- epochLock.synchronized {
- return epoch
- }
- }
-
- // Called on workers to update the epoch number, potentially clearing old outputs
- // because of a fetch failure. (Each worker task calls this with the latest epoch
- // number on the master at the time it was created.)
- def updateEpoch(newEpoch: Long) {
- epochLock.synchronized {
- if (newEpoch > epoch) {
- logInfo("Updating epoch to " + newEpoch + " and clearing cache")
- // mapStatuses = new TimeStampedHashMap[Int, Array[MapStatus]]
- mapStatuses.clear()
- epoch = newEpoch
- }
- }
- }
-
- /**
-  * Returns the GZIP-compressed, Java-serialized map statuses for `shuffleId`.
-  *
-  * Serialized bytes are cached per shuffle:
-  *  - the cache is wiped whenever the epoch has advanced past `cacheEpoch`;
-  *  - freshly serialized bytes are stored only if the epoch did not change while they
-  *    were being built, so bytes built from an outdated snapshot are never cached.
-  *
-  * Looks up `mapStatuses(shuffleId)` on a cache miss. What that does for an unknown
-  * shuffle is defined by the map type, not here — presumably it throws; confirm.
-  */
- def getSerializedLocations(shuffleId: Int): Array[Byte] = {
-   // Under the lock: invalidate the cache if the epoch moved, then either hand back the
-   // cached bytes (Left) or snapshot the statuses together with the epoch they were
-   // read at (Right).
-   val cachedOrSnapshot: Either[Array[Byte], (Array[MapStatus], Long)] = epochLock.synchronized {
-     if (epoch > cacheEpoch) {
-       cachedSerializedStatuses.clear()
-       cacheEpoch = epoch
-     }
-     cachedSerializedStatuses.get(shuffleId) match {
-       case Some(bytes) => Left(bytes)
-       case None => Right((mapStatuses(shuffleId), epoch))
-     }
-   }
-   cachedOrSnapshot match {
-     case Left(bytes) =>
-       bytes
-     case Right((statuses, epochGotten)) =>
-       // Cache miss: serialize the snapshot outside the lock so other callers are not
-       // blocked meanwhile.
-       val bytes = serializeStatuses(statuses)
-       logInfo("Size of output statuses for shuffle %d is %d bytes".format(shuffleId, bytes.length))
-       // Add them into the table only if the epoch hasn't changed while we were working.
-       epochLock.synchronized {
-         if (epoch == epochGotten) {
-           cachedSerializedStatuses(shuffleId) = bytes
-         }
-       }
-       bytes
-   }
- }
-
- /**
-  * Serializes map output statuses into a compact byte array to ship to reduce tasks.
-  *
-  * The array is Java-serialized through a GZIP stream. The data generally compresses well
-  * because many map outputs live on the same host.
-  *
-  * The stream is closed in `finally`. Closing writes the GZIP trailer, and it also releases
-  * the Deflater's native memory when writeObject throws.
-  */
- private def serializeStatuses(statuses: Array[MapStatus]): Array[Byte] = {
-   val out = new ByteArrayOutputStream
-   val objOut = new ObjectOutputStream(new GZIPOutputStream(out))
-   try {
-     // Since statuses can be modified in parallel, sync on it
-     statuses.synchronized {
-       objOut.writeObject(statuses)
-     }
-   } finally {
-     objOut.close()
-   }
-   // Read only after close(), so the compressed stream is complete.
-   out.toByteArray
- }
-
- /**
-  * Inverse of serializeStatuses: gunzips and Java-deserializes an Array[MapStatus].
-  *
-  * Null entries are deliberately kept rather than filtered. They appear to mark missing
-  * map outputs; convertMapStatuses turns a null entry into a FetchFailedException.
-  *
-  * The stream is closed in `finally` to release the GZIPInputStream's native Inflater.
-  *
-  * @throws ClassCastException if the payload is not an Array[MapStatus]
-  */
- def deserializeStatuses(bytes: Array[Byte]): Array[MapStatus] = {
-   val objIn = new ObjectInputStream(new GZIPInputStream(new ByteArrayInputStream(bytes)))
-   try {
-     objIn.readObject().asInstanceOf[Array[MapStatus]]
-   } finally {
-     objIn.close()
-   }
- }
-}
-
- private[spark] object MapOutputTracker {
-   private val LOG_BASE = 1.1
-
-   /**
-    * Maps each map status to (location, decompressed size of the `reduceId` output).
-    * A null status marks a missing map output (e.g. a failed mapper) and aborts the
-    * conversion with a FetchFailedException.
-    */
-   private def convertMapStatuses(
-       shuffleId: Int,
-       reduceId: Int,
-       statuses: Array[MapStatus]): Array[(BlockManagerId, Long)] = {
-     assert (statuses != null)
-     statuses.map {
-       case null =>
-         throw new FetchFailedException(null, shuffleId, -1, reduceId,
-           new Exception("Missing an output location for shuffle " + shuffleId))
-       case status =>
-         (status.location, decompressSize(status.compressedSizes(reduceId)))
-     }
-   }
-
-   /**
-    * Encodes a byte count in one byte as ceil(log base 1.1 of size).
-    *
-    * Encodings:
-    *  - 0 is reserved for an empty output.
-    *  - Sizes of at most 1 map to 1.
-    *  - Larger sizes saturate at 255, i.e. about 1.1^255 ≈ 3.6e10 bytes (~35 GB).
-    *
-    * Because of the ceiling, the decoded value overestimates the size by less than 10%.
-    */
-   def compressSize(size: Long): Byte = size match {
-     case 0L => 0
-     case s if s <= 1L => 1
-     case s => math.min(255, math.ceil(math.log(s) / math.log(LOG_BASE)).toInt).toByte
-   }
-
-   /**
-    * Inverse of compressSize. The byte is read as unsigned (0-255) and expanded
-    * to 1.1^k bytes; 0 stays 0.
-    */
-   def decompressSize(compressedSize: Byte): Long =
-     if (compressedSize == 0) 0
-     else math.pow(LOG_BASE, compressedSize & 0xFF).toLong
- }