Posted to commits@spark.apache.org by ma...@apache.org on 2013/09/01 23:59:48 UTC
[64/69] [abbrv] Move some classes to more appropriate packages:
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/RDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/RDD.scala b/core/src/main/scala/org/apache/spark/RDD.scala
deleted file mode 100644
index 0d1f07f..0000000
--- a/core/src/main/scala/org/apache/spark/RDD.scala
+++ /dev/null
@@ -1,957 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark
-
-import java.util.Random
-
-import scala.collection.Map
-import scala.collection.JavaConversions.mapAsScalaMap
-import scala.collection.mutable.ArrayBuffer
-
-import org.apache.hadoop.io.BytesWritable
-import org.apache.hadoop.io.compress.CompressionCodec
-import org.apache.hadoop.io.NullWritable
-import org.apache.hadoop.io.Text
-import org.apache.hadoop.mapred.TextOutputFormat
-
-import it.unimi.dsi.fastutil.objects.{Object2LongOpenHashMap => OLMap}
-
-import org.apache.spark.Partitioner._
-import org.apache.spark.api.java.JavaRDD
-import org.apache.spark.partial.BoundedDouble
-import org.apache.spark.partial.CountEvaluator
-import org.apache.spark.partial.GroupedCountEvaluator
-import org.apache.spark.partial.PartialResult
-import org.apache.spark.rdd.CoalescedRDD
-import org.apache.spark.rdd.CartesianRDD
-import org.apache.spark.rdd.FilteredRDD
-import org.apache.spark.rdd.FlatMappedRDD
-import org.apache.spark.rdd.GlommedRDD
-import org.apache.spark.rdd.MappedRDD
-import org.apache.spark.rdd.MapPartitionsRDD
-import org.apache.spark.rdd.MapPartitionsWithIndexRDD
-import org.apache.spark.rdd.PipedRDD
-import org.apache.spark.rdd.SampledRDD
-import org.apache.spark.rdd.ShuffledRDD
-import org.apache.spark.rdd.UnionRDD
-import org.apache.spark.rdd.ZippedRDD
-import org.apache.spark.rdd.ZippedPartitionsRDD2
-import org.apache.spark.rdd.ZippedPartitionsRDD3
-import org.apache.spark.rdd.ZippedPartitionsRDD4
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.util.BoundedPriorityQueue
-
-import SparkContext._
-
-/**
- * A Resilient Distributed Dataset (RDD), the basic abstraction in Spark. Represents an immutable,
- * partitioned collection of elements that can be operated on in parallel. This class contains the
- * basic operations available on all RDDs, such as `map`, `filter`, and `persist`. In addition,
- * [[org.apache.spark.PairRDDFunctions]] contains operations available only on RDDs of key-value pairs, such
- * as `groupByKey` and `join`; [[org.apache.spark.DoubleRDDFunctions]] contains operations available only on
- * RDDs of Doubles; and [[org.apache.spark.SequenceFileRDDFunctions]] contains operations available on RDDs
- * that can be saved as SequenceFiles. These operations are automatically available on any RDD of
- * the right type (e.g. RDD[(Int, Int)]) through implicit conversions when you
- * `import org.apache.spark.SparkContext._`.
- *
- * Internally, each RDD is characterized by five main properties:
- *
- * - A list of partitions
- * - A function for computing each split
- * - A list of dependencies on other RDDs
- * - Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
- * - Optionally, a list of preferred locations to compute each split on (e.g. block locations for
- * an HDFS file)
- *
- * All of the scheduling and execution in Spark is done based on these methods, allowing each RDD
- * to implement its own way of computing itself. Indeed, users can implement custom RDDs (e.g. for
- * reading data from a new storage system) by overriding these functions. Please refer to the
- * [[http://www.cs.berkeley.edu/~matei/papers/2012/nsdi_spark.pdf Spark paper]] for more details
- * on RDD internals.
- */
-abstract class RDD[T: ClassManifest](
- @transient private var sc: SparkContext,
- @transient private var deps: Seq[Dependency[_]]
- ) extends Serializable with Logging {
-
- /** Construct an RDD with just a one-to-one dependency on one parent */
- def this(@transient oneParent: RDD[_]) =
- this(oneParent.context , List(new OneToOneDependency(oneParent)))
-
- // =======================================================================
- // Methods that should be implemented by subclasses of RDD
- // =======================================================================
-
- /** Implemented by subclasses to compute a given partition. */
- def compute(split: Partition, context: TaskContext): Iterator[T]
-
- /**
- * Implemented by subclasses to return the set of partitions in this RDD. This method will only
- * be called once, so it is safe to implement a time-consuming computation in it.
- */
- protected def getPartitions: Array[Partition]
-
- /**
- * Implemented by subclasses to return how this RDD depends on parent RDDs. This method will only
- * be called once, so it is safe to implement a time-consuming computation in it.
- */
- protected def getDependencies: Seq[Dependency[_]] = deps
-
- /** Optionally overridden by subclasses to specify placement preferences. */
- protected def getPreferredLocations(split: Partition): Seq[String] = Nil
-
- /** Optionally overridden by subclasses to specify how they are partitioned. */
- val partitioner: Option[Partitioner] = None
-
- // =======================================================================
- // Methods and fields available on all RDDs
- // =======================================================================
-
- /** The SparkContext that created this RDD. */
- def sparkContext: SparkContext = sc
-
- /** A unique ID for this RDD (within its SparkContext). */
- val id: Int = sc.newRddId()
-
- /** A friendly name for this RDD */
- var name: String = null
-
- /** Assign a name to this RDD */
- def setName(_name: String) = {
- name = _name
- this
- }
-
- /** User-defined generator of this RDD */
- var generator = Utils.getCallSiteInfo.firstUserClass
-
- /** Reset generator */
- def setGenerator(_generator: String) = {
- generator = _generator
- }
-
- /**
- * Set this RDD's storage level to persist its values across operations after the first time
- * it is computed. This can only be used to assign a new storage level if the RDD does not
- * have a storage level set yet.
- */
- def persist(newLevel: StorageLevel): RDD[T] = {
- // TODO: Handle changes of StorageLevel
- if (storageLevel != StorageLevel.NONE && newLevel != storageLevel) {
- throw new UnsupportedOperationException(
- "Cannot change storage level of an RDD after it was already assigned a level")
- }
- storageLevel = newLevel
- // Register the RDD with the SparkContext
- sc.persistentRdds(id) = this
- this
- }
-
- /** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
- def persist(): RDD[T] = persist(StorageLevel.MEMORY_ONLY)
-
- /** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
- def cache(): RDD[T] = persist()
-
- /**
- * Mark the RDD as non-persistent, and remove all blocks for it from memory and disk.
- *
- * @param blocking Whether to block until all blocks are deleted.
- * @return This RDD.
- */
- def unpersist(blocking: Boolean = true): RDD[T] = {
- logInfo("Removing RDD " + id + " from persistence list")
- sc.env.blockManager.master.removeRdd(id, blocking)
- sc.persistentRdds.remove(id)
- storageLevel = StorageLevel.NONE
- this
- }
-
- /** Get the RDD's current storage level, or StorageLevel.NONE if none is set. */
- def getStorageLevel = storageLevel
-
- // Our dependencies and partitions will be obtained by calling the subclass's methods below, and
- // will be overwritten when we're checkpointed
- private var dependencies_ : Seq[Dependency[_]] = null
- @transient private var partitions_ : Array[Partition] = null
-
- /** An Option holding our checkpoint RDD, if we are checkpointed */
- private def checkpointRDD: Option[RDD[T]] = checkpointData.flatMap(_.checkpointRDD)
-
- /**
- * Get the list of dependencies of this RDD, taking into account whether the
- * RDD is checkpointed or not.
- */
- final def dependencies: Seq[Dependency[_]] = {
- checkpointRDD.map(r => List(new OneToOneDependency(r))).getOrElse {
- if (dependencies_ == null) {
- dependencies_ = getDependencies
- }
- dependencies_
- }
- }
-
- /**
- * Get the array of partitions of this RDD, taking into account whether the
- * RDD is checkpointed or not.
- */
- final def partitions: Array[Partition] = {
- checkpointRDD.map(_.partitions).getOrElse {
- if (partitions_ == null) {
- partitions_ = getPartitions
- }
- partitions_
- }
- }
-
- /**
- * Get the preferred locations of a partition (as hostnames), taking into account whether the
- * RDD is checkpointed.
- */
- final def preferredLocations(split: Partition): Seq[String] = {
- checkpointRDD.map(_.getPreferredLocations(split)).getOrElse {
- getPreferredLocations(split)
- }
- }
-
- /**
- * Internal method to this RDD; will read from cache if applicable, or otherwise compute it.
- * This should ''not'' be called by users directly, but is available for implementors of custom
- * subclasses of RDD.
- */
- final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
- if (storageLevel != StorageLevel.NONE) {
- SparkEnv.get.cacheManager.getOrCompute(this, split, context, storageLevel)
- } else {
- computeOrReadCheckpoint(split, context)
- }
- }
-
- /**
- * Compute an RDD partition or read it from a checkpoint if the RDD is checkpointing.
- */
- private[spark] def computeOrReadCheckpoint(split: Partition, context: TaskContext): Iterator[T] = {
- if (isCheckpointed) {
- firstParent[T].iterator(split, context)
- } else {
- compute(split, context)
- }
- }
-
- // Transformations (return a new RDD)
-
- /**
- * Return a new RDD by applying a function to all elements of this RDD.
- */
- def map[U: ClassManifest](f: T => U): RDD[U] = new MappedRDD(this, sc.clean(f))
-
- /**
- * Return a new RDD by first applying a function to all elements of this
- * RDD, and then flattening the results.
- */
- def flatMap[U: ClassManifest](f: T => TraversableOnce[U]): RDD[U] =
- new FlatMappedRDD(this, sc.clean(f))
-
- /**
- * Return a new RDD containing only the elements that satisfy a predicate.
- */
- def filter(f: T => Boolean): RDD[T] = new FilteredRDD(this, sc.clean(f))
-
- /**
- * Return a new RDD containing the distinct elements in this RDD.
- */
- def distinct(numPartitions: Int): RDD[T] =
- map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)
-
- def distinct(): RDD[T] = distinct(partitions.size)
-
- /**
- * Return a new RDD that is reduced into `numPartitions` partitions.
- */
- def coalesce(numPartitions: Int, shuffle: Boolean = false): RDD[T] = {
- if (shuffle) {
- // include a shuffle step so that our upstream tasks are still distributed
- new CoalescedRDD(
- new ShuffledRDD[T, Null, (T, Null)](map(x => (x, null)),
- new HashPartitioner(numPartitions)),
- numPartitions).keys
- } else {
- new CoalescedRDD(this, numPartitions)
- }
- }
-
- /**
- * Return a sampled subset of this RDD.
- */
- def sample(withReplacement: Boolean, fraction: Double, seed: Int): RDD[T] =
- new SampledRDD(this, withReplacement, fraction, seed)
-
- def takeSample(withReplacement: Boolean, num: Int, seed: Int): Array[T] = {
- var fraction = 0.0
- var total = 0
- val multiplier = 3.0
- val initialCount = this.count()
- var maxSelected = 0
-
- if (num < 0) {
- throw new IllegalArgumentException("Negative number of elements requested")
- }
-
- if (initialCount > Integer.MAX_VALUE - 1) {
- maxSelected = Integer.MAX_VALUE - 1
- } else {
- maxSelected = initialCount.toInt
- }
-
- if (num > initialCount && !withReplacement) {
- total = maxSelected
- fraction = multiplier * (maxSelected + 1) / initialCount
- } else {
- fraction = multiplier * (num + 1) / initialCount
- total = num
- }
-
- val rand = new Random(seed)
- var samples = this.sample(withReplacement, fraction, rand.nextInt()).collect()
-
- // If the first sample didn't turn out large enough, keep trying to take samples;
- // this shouldn't happen often because we use a big multiplier for the initial size
- while (samples.length < total) {
- samples = this.sample(withReplacement, fraction, rand.nextInt()).collect()
- }
-
- Utils.randomizeInPlace(samples, rand).take(total)
- }
-
- /**
- * Return the union of this RDD and another one. Any identical elements will appear multiple
- * times (use `.distinct()` to eliminate them).
- */
- def union(other: RDD[T]): RDD[T] = new UnionRDD(sc, Array(this, other))
-
- /**
- * Return the union of this RDD and another one. Any identical elements will appear multiple
- * times (use `.distinct()` to eliminate them).
- */
- def ++(other: RDD[T]): RDD[T] = this.union(other)
-
- /**
- * Return an RDD created by coalescing all elements within each partition into an array.
- */
- def glom(): RDD[Array[T]] = new GlommedRDD(this)
-
- /**
- * Return the Cartesian product of this RDD and another one, that is, the RDD of all pairs of
- * elements (a, b) where a is in `this` and b is in `other`.
- */
- def cartesian[U: ClassManifest](other: RDD[U]): RDD[(T, U)] = new CartesianRDD(sc, this, other)
-
- /**
- * Return an RDD of grouped items.
- */
- def groupBy[K: ClassManifest](f: T => K): RDD[(K, Seq[T])] =
- groupBy[K](f, defaultPartitioner(this))
-
- /**
- * Return an RDD of grouped elements. Each group consists of a key and a sequence of elements
- * mapping to that key.
- */
- def groupBy[K: ClassManifest](f: T => K, numPartitions: Int): RDD[(K, Seq[T])] =
- groupBy(f, new HashPartitioner(numPartitions))
-
- /**
- * Return an RDD of grouped items.
- */
- def groupBy[K: ClassManifest](f: T => K, p: Partitioner): RDD[(K, Seq[T])] = {
- val cleanF = sc.clean(f)
- this.map(t => (cleanF(t), t)).groupByKey(p)
- }
-
- /**
- * Return an RDD created by piping elements to a forked external process.
- */
- def pipe(command: String): RDD[String] = new PipedRDD(this, command)
-
- /**
- * Return an RDD created by piping elements to a forked external process.
- */
- def pipe(command: String, env: Map[String, String]): RDD[String] =
- new PipedRDD(this, command, env)
-
-
- /**
- * Return an RDD created by piping elements to a forked external process.
- * The print behavior can be customized by providing two functions.
- *
- * @param command command to run in forked process.
- * @param env environment variables to set.
- * @param printPipeContext Before piping elements, this function is called as an opportunity
- * to pipe context data. Print line function (like out.println) will be
- * passed as printPipeContext's parameter.
- * @param printRDDElement Use this function to customize how to pipe elements. This function
- * will be called with each RDD element as the 1st parameter, and the
- * print line function (like out.println()) as the 2nd parameter.
- * An example of piping the RDD data of groupBy() in a streaming way,
- * instead of constructing a huge String to concatenate all the elements:
- * def printRDDElement(record:(String, Seq[String]), f:String=>Unit) =
- * for (e <- record._2){f(e)}
- * @return the result RDD
- */
- def pipe(
- command: Seq[String],
- env: Map[String, String] = Map(),
- printPipeContext: (String => Unit) => Unit = null,
- printRDDElement: (T, String => Unit) => Unit = null): RDD[String] =
- new PipedRDD(this, command, env,
- if (printPipeContext ne null) sc.clean(printPipeContext) else null,
- if (printRDDElement ne null) sc.clean(printRDDElement) else null)
-
- /**
- * Return a new RDD by applying a function to each partition of this RDD.
- */
- def mapPartitions[U: ClassManifest](f: Iterator[T] => Iterator[U],
- preservesPartitioning: Boolean = false): RDD[U] =
- new MapPartitionsRDD(this, sc.clean(f), preservesPartitioning)
-
- /**
- * Return a new RDD by applying a function to each partition of this RDD, while tracking the index
- * of the original partition.
- */
- def mapPartitionsWithIndex[U: ClassManifest](
- f: (Int, Iterator[T]) => Iterator[U],
- preservesPartitioning: Boolean = false): RDD[U] =
- new MapPartitionsWithIndexRDD(this, sc.clean(f), preservesPartitioning)
-
- /**
- * Return a new RDD by applying a function to each partition of this RDD, while tracking the index
- * of the original partition.
- */
- @deprecated("use mapPartitionsWithIndex", "0.7.0")
- def mapPartitionsWithSplit[U: ClassManifest](
- f: (Int, Iterator[T]) => Iterator[U],
- preservesPartitioning: Boolean = false): RDD[U] =
- new MapPartitionsWithIndexRDD(this, sc.clean(f), preservesPartitioning)
-
- /**
- * Maps f over this RDD, where f takes an additional parameter of type A. This
- * additional parameter is produced by constructA, which is called in each
- * partition with the index of that partition.
- */
- def mapWith[A: ClassManifest, U: ClassManifest](constructA: Int => A, preservesPartitioning: Boolean = false)
- (f:(T, A) => U): RDD[U] = {
- def iterF(index: Int, iter: Iterator[T]): Iterator[U] = {
- val a = constructA(index)
- iter.map(t => f(t, a))
- }
- new MapPartitionsWithIndexRDD(this, sc.clean(iterF _), preservesPartitioning)
- }
-
- /**
- * FlatMaps f over this RDD, where f takes an additional parameter of type A. This
- * additional parameter is produced by constructA, which is called in each
- * partition with the index of that partition.
- */
- def flatMapWith[A: ClassManifest, U: ClassManifest](constructA: Int => A, preservesPartitioning: Boolean = false)
- (f:(T, A) => Seq[U]): RDD[U] = {
- def iterF(index: Int, iter: Iterator[T]): Iterator[U] = {
- val a = constructA(index)
- iter.flatMap(t => f(t, a))
- }
- new MapPartitionsWithIndexRDD(this, sc.clean(iterF _), preservesPartitioning)
- }
-
- /**
- * Applies f to each element of this RDD, where f takes an additional parameter of type A.
- * This additional parameter is produced by constructA, which is called in each
- * partition with the index of that partition.
- */
- def foreachWith[A: ClassManifest](constructA: Int => A)
- (f:(T, A) => Unit) {
- def iterF(index: Int, iter: Iterator[T]): Iterator[T] = {
- val a = constructA(index)
- iter.map(t => {f(t, a); t})
- }
- (new MapPartitionsWithIndexRDD(this, sc.clean(iterF _), true)).foreach(_ => {})
- }
-
- /**
- * Filters this RDD with p, where p takes an additional parameter of type A. This
- * additional parameter is produced by constructA, which is called in each
- * partition with the index of that partition.
- */
- def filterWith[A: ClassManifest](constructA: Int => A)
- (p:(T, A) => Boolean): RDD[T] = {
- def iterF(index: Int, iter: Iterator[T]): Iterator[T] = {
- val a = constructA(index)
- iter.filter(t => p(t, a))
- }
- new MapPartitionsWithIndexRDD(this, sc.clean(iterF _), true)
- }
-
- /**
- * Zips this RDD with another one, returning key-value pairs with the first element in each RDD,
- * second element in each RDD, etc. Assumes that the two RDDs have the *same number of
- * partitions* and the *same number of elements in each partition* (e.g. one was made through
- * a map on the other).
- */
- def zip[U: ClassManifest](other: RDD[U]): RDD[(T, U)] = new ZippedRDD(sc, this, other)
-
- /**
- * Zip this RDD's partitions with one (or more) RDD(s) and return a new RDD by
- * applying a function to the zipped partitions. Assumes that all the RDDs have the
- * *same number of partitions*, but does *not* require them to have the same number
- * of elements in each partition.
- */
- def zipPartitions[B: ClassManifest, V: ClassManifest]
- (rdd2: RDD[B])
- (f: (Iterator[T], Iterator[B]) => Iterator[V]): RDD[V] =
- new ZippedPartitionsRDD2(sc, sc.clean(f), this, rdd2)
-
- def zipPartitions[B: ClassManifest, C: ClassManifest, V: ClassManifest]
- (rdd2: RDD[B], rdd3: RDD[C])
- (f: (Iterator[T], Iterator[B], Iterator[C]) => Iterator[V]): RDD[V] =
- new ZippedPartitionsRDD3(sc, sc.clean(f), this, rdd2, rdd3)
-
- def zipPartitions[B: ClassManifest, C: ClassManifest, D: ClassManifest, V: ClassManifest]
- (rdd2: RDD[B], rdd3: RDD[C], rdd4: RDD[D])
- (f: (Iterator[T], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V]): RDD[V] =
- new ZippedPartitionsRDD4(sc, sc.clean(f), this, rdd2, rdd3, rdd4)
-
-
- // Actions (launch a job to return a value to the user program)
-
- /**
- * Applies a function f to all elements of this RDD.
- */
- def foreach(f: T => Unit) {
- val cleanF = sc.clean(f)
- sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))
- }
-
- /**
- * Applies a function f to each partition of this RDD.
- */
- def foreachPartition(f: Iterator[T] => Unit) {
- val cleanF = sc.clean(f)
- sc.runJob(this, (iter: Iterator[T]) => cleanF(iter))
- }
-
- /**
- * Return an array that contains all of the elements in this RDD.
- */
- def collect(): Array[T] = {
- val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
- Array.concat(results: _*)
- }
-
- /**
- * Return an array that contains all of the elements in this RDD.
- */
- def toArray(): Array[T] = collect()
-
- /**
- * Return an RDD that contains all matching values by applying `f`.
- */
- def collect[U: ClassManifest](f: PartialFunction[T, U]): RDD[U] = {
- filter(f.isDefinedAt).map(f)
- }
-
- /**
- * Return an RDD with the elements from `this` that are not in `other`.
- *
- * Uses `this` RDD's partitioner/partition size, because even if `other` is huge, the resulting
- * RDD will be no larger than this one.
- */
- def subtract(other: RDD[T]): RDD[T] =
- subtract(other, partitioner.getOrElse(new HashPartitioner(partitions.size)))
-
- /**
- * Return an RDD with the elements from `this` that are not in `other`.
- */
- def subtract(other: RDD[T], numPartitions: Int): RDD[T] =
- subtract(other, new HashPartitioner(numPartitions))
-
- /**
- * Return an RDD with the elements from `this` that are not in `other`.
- */
- def subtract(other: RDD[T], p: Partitioner): RDD[T] = {
- if (partitioner == Some(p)) {
- // Our partitioner knows how to handle T (which, since we have a partitioner, is
- // really (K, V)) so make a new Partitioner that will de-tuple our fake tuples
- val p2 = new Partitioner() {
- override def numPartitions = p.numPartitions
- override def getPartition(k: Any) = p.getPartition(k.asInstanceOf[(Any, _)]._1)
- }
- // Unfortunately, since we're making a new p2, we'll get ShuffleDependencies
- // anyway, and when calling .keys, will not have a partitioner set, even though
- // the SubtractedRDD will, thanks to p2's de-tupled partitioning, already be
- // partitioned by the right/real keys (e.g. p).
- this.map(x => (x, null)).subtractByKey(other.map((_, null)), p2).keys
- } else {
- this.map(x => (x, null)).subtractByKey(other.map((_, null)), p).keys
- }
- }
-
- /**
- * Reduces the elements of this RDD using the specified commutative and associative binary operator.
- */
- def reduce(f: (T, T) => T): T = {
- val cleanF = sc.clean(f)
- val reducePartition: Iterator[T] => Option[T] = iter => {
- if (iter.hasNext) {
- Some(iter.reduceLeft(cleanF))
- } else {
- None
- }
- }
- var jobResult: Option[T] = None
- val mergeResult = (index: Int, taskResult: Option[T]) => {
- if (taskResult != None) {
- jobResult = jobResult match {
- case Some(value) => Some(f(value, taskResult.get))
- case None => taskResult
- }
- }
- }
- sc.runJob(this, reducePartition, mergeResult)
- // Get the final result out of our Option, or throw an exception if the RDD was empty
- jobResult.getOrElse(throw new UnsupportedOperationException("empty collection"))
- }
-
- /**
- * Aggregate the elements of each partition, and then the results for all the partitions, using a
- * given associative function and a neutral "zero value". The function op(t1, t2) is allowed to
- * modify t1 and return it as its result value to avoid object allocation; however, it should not
- * modify t2.
- */
- def fold(zeroValue: T)(op: (T, T) => T): T = {
- // Clone the zero value since we will also be serializing it as part of tasks
- var jobResult = Utils.clone(zeroValue, sc.env.closureSerializer.newInstance())
- val cleanOp = sc.clean(op)
- val foldPartition = (iter: Iterator[T]) => iter.fold(zeroValue)(cleanOp)
- val mergeResult = (index: Int, taskResult: T) => jobResult = op(jobResult, taskResult)
- sc.runJob(this, foldPartition, mergeResult)
- jobResult
- }
-
- /**
- * Aggregate the elements of each partition, and then the results for all the partitions, using
- * given combine functions and a neutral "zero value". This function can return a different result
- * type, U, than the type of this RDD, T. Thus, we need one operation for merging a T into an U
- * and one operation for merging two U's, as in scala.TraversableOnce. Both of these functions are
- * allowed to modify and return their first argument instead of creating a new U to avoid memory
- * allocation.
- */
- def aggregate[U: ClassManifest](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U = {
- // Clone the zero value since we will also be serializing it as part of tasks
- var jobResult = Utils.clone(zeroValue, sc.env.closureSerializer.newInstance())
- val cleanSeqOp = sc.clean(seqOp)
- val cleanCombOp = sc.clean(combOp)
- val aggregatePartition = (it: Iterator[T]) => it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp)
- val mergeResult = (index: Int, taskResult: U) => jobResult = combOp(jobResult, taskResult)
- sc.runJob(this, aggregatePartition, mergeResult)
- jobResult
- }
-
- /**
- * Return the number of elements in the RDD.
- */
- def count(): Long = {
- sc.runJob(this, (iter: Iterator[T]) => {
- var result = 0L
- while (iter.hasNext) {
- result += 1L
- iter.next()
- }
- result
- }).sum
- }
-
- /**
- * (Experimental) Approximate version of count() that returns a potentially incomplete result
- * within a timeout, even if not all tasks have finished.
- */
- def countApprox(timeout: Long, confidence: Double = 0.95): PartialResult[BoundedDouble] = {
- val countElements: (TaskContext, Iterator[T]) => Long = { (ctx, iter) =>
- var result = 0L
- while (iter.hasNext) {
- result += 1L
- iter.next()
- }
- result
- }
- val evaluator = new CountEvaluator(partitions.size, confidence)
- sc.runApproximateJob(this, countElements, evaluator, timeout)
- }
-
- /**
- * Return the count of each unique value in this RDD as a map of (value, count) pairs. The final
- * combine step happens locally on the master, equivalent to running a single reduce task.
- */
- def countByValue(): Map[T, Long] = {
- if (elementClassManifest.erasure.isArray) {
- throw new SparkException("countByValue() does not support arrays")
- }
- // TODO: This should perhaps be distributed by default.
- def countPartition(iter: Iterator[T]): Iterator[OLMap[T]] = {
- val map = new OLMap[T]
- while (iter.hasNext) {
- val v = iter.next()
- map.put(v, map.getLong(v) + 1L)
- }
- Iterator(map)
- }
- def mergeMaps(m1: OLMap[T], m2: OLMap[T]): OLMap[T] = {
- val iter = m2.object2LongEntrySet.fastIterator()
- while (iter.hasNext) {
- val entry = iter.next()
- m1.put(entry.getKey, m1.getLong(entry.getKey) + entry.getLongValue)
- }
- return m1
- }
- val myResult = mapPartitions(countPartition).reduce(mergeMaps)
- myResult.asInstanceOf[java.util.Map[T, Long]] // Will be wrapped as a Scala mutable Map
- }
-
- /**
- * (Experimental) Approximate version of countByValue().
- */
- def countByValueApprox(
- timeout: Long,
- confidence: Double = 0.95
- ): PartialResult[Map[T, BoundedDouble]] = {
- if (elementClassManifest.erasure.isArray) {
- throw new SparkException("countByValueApprox() does not support arrays")
- }
- val countPartition: (TaskContext, Iterator[T]) => OLMap[T] = { (ctx, iter) =>
- val map = new OLMap[T]
- while (iter.hasNext) {
- val v = iter.next()
- map.put(v, map.getLong(v) + 1L)
- }
- map
- }
- val evaluator = new GroupedCountEvaluator[T](partitions.size, confidence)
- sc.runApproximateJob(this, countPartition, evaluator, timeout)
- }
-
- /**
- * Take the first num elements of the RDD. This currently scans the partitions *one by one*, so
- * it will be slow if a lot of partitions are required. In that case, use collect() to get the
- * whole RDD instead.
- */
- def take(num: Int): Array[T] = {
- if (num == 0) {
- return new Array[T](0)
- }
- val buf = new ArrayBuffer[T]
- var p = 0
- while (buf.size < num && p < partitions.size) {
- val left = num - buf.size
- val res = sc.runJob(this, (it: Iterator[T]) => it.take(left).toArray, Array(p), true)
- buf ++= res(0)
- if (buf.size == num)
- return buf.toArray
- p += 1
- }
- return buf.toArray
- }
-
- /**
- * Return the first element in this RDD.
- */
- def first(): T = take(1) match {
- case Array(t) => t
- case _ => throw new UnsupportedOperationException("empty collection")
- }
-
- /**
- * Returns the top K elements from this RDD as defined by
- * the specified implicit Ordering[T].
- * @param num the number of top elements to return
- * @param ord the implicit ordering for T
- * @return an array of top elements
- */
- def top(num: Int)(implicit ord: Ordering[T]): Array[T] = {
- mapPartitions { items =>
- val queue = new BoundedPriorityQueue[T](num)
- queue ++= items
- Iterator.single(queue)
- }.reduce { (queue1, queue2) =>
- queue1 ++= queue2
- queue1
- }.toArray.sorted(ord.reverse)
- }
-
- /**
- * Returns the first K elements from this RDD as defined by
- * the specified implicit Ordering[T] and maintains the
- * ordering.
- * @param num the number of top elements to return
- * @param ord the implicit ordering for T
- * @return an array of top elements
- */
- def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] = top(num)(ord.reverse)
-
- /**
- * Save this RDD as a text file, using string representations of elements.
- */
- def saveAsTextFile(path: String) {
- this.map(x => (NullWritable.get(), new Text(x.toString)))
- .saveAsHadoopFile[TextOutputFormat[NullWritable, Text]](path)
- }
-
- /**
- * Save this RDD as a compressed text file, using string representations of elements.
- */
- def saveAsTextFile(path: String, codec: Class[_ <: CompressionCodec]) {
- this.map(x => (NullWritable.get(), new Text(x.toString)))
- .saveAsHadoopFile[TextOutputFormat[NullWritable, Text]](path, codec)
- }
-
- /**
- * Save this RDD as a SequenceFile of serialized objects.
- */
- def saveAsObjectFile(path: String) {
- this.mapPartitions(iter => iter.grouped(10).map(_.toArray))
- .map(x => (NullWritable.get(), new BytesWritable(Utils.serialize(x))))
- .saveAsSequenceFile(path)
- }
-
- /**
- * Creates tuples of the elements in this RDD by applying `f`.
- */
- def keyBy[K](f: T => K): RDD[(K, T)] = {
- map(x => (f(x), x))
- }
-
- /** A private method for tests, to look at the contents of each partition */
- private[spark] def collectPartitions(): Array[Array[T]] = {
- sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
- }
-
- /**
- * Mark this RDD for checkpointing. It will be saved to a file inside the checkpoint
- * directory set with SparkContext.setCheckpointDir() and all references to its parent
- * RDDs will be removed. This function must be called before any job has been
- * executed on this RDD. It is strongly recommended that this RDD is persisted in
- * memory, otherwise saving it to a file will require recomputation.
- */
- def checkpoint() {
- if (context.checkpointDir.isEmpty) {
- throw new Exception("Checkpoint directory has not been set in the SparkContext")
- } else if (checkpointData.isEmpty) {
- checkpointData = Some(new RDDCheckpointData(this))
- checkpointData.get.markForCheckpoint()
- }
- }
-
- /**
- * Return whether this RDD has been checkpointed or not
- */
- def isCheckpointed: Boolean = {
- checkpointData.map(_.isCheckpointed).getOrElse(false)
- }
-
- /**
- * Gets the name of the file to which this RDD was checkpointed
- */
- def getCheckpointFile: Option[String] = {
- checkpointData.flatMap(_.getCheckpointFile)
- }
-
- // =======================================================================
- // Other internal methods and fields
- // =======================================================================
-
- private var storageLevel: StorageLevel = StorageLevel.NONE
-
- /** Record user function generating this RDD. */
- private[spark] val origin = Utils.formatSparkCallSite
-
- private[spark] def elementClassManifest: ClassManifest[T] = classManifest[T]
-
- private[spark] var checkpointData: Option[RDDCheckpointData[T]] = None
-
- /** Returns the first parent RDD */
- protected[spark] def firstParent[U: ClassManifest] = {
- dependencies.head.rdd.asInstanceOf[RDD[U]]
- }
-
- /** The [[org.apache.spark.SparkContext]] that this RDD was created on. */
- def context = sc
-
- // Avoid handling doCheckpoint multiple times to prevent excessive recursion
- private var doCheckpointCalled = false
-
- /**
- * Performs the checkpointing of this RDD by saving this. It is called by the DAGScheduler
- * after a job using this RDD has completed (therefore the RDD has been materialized and
- * potentially stored in memory). doCheckpoint() is called recursively on the parent RDDs.
- */
- private[spark] def doCheckpoint() {
- if (!doCheckpointCalled) {
- doCheckpointCalled = true
- if (checkpointData.isDefined) {
- checkpointData.get.doCheckpoint()
- } else {
- dependencies.foreach(_.rdd.doCheckpoint())
- }
- }
- }
-
- /**
- * Changes the dependencies of this RDD from its original parents to a new RDD (`checkpointRDD`)
- * created from the checkpoint file, and forgets its old dependencies and partitions.
- */
- private[spark] def markCheckpointed(checkpointRDD: RDD[_]) {
- clearDependencies()
- partitions_ = null
- deps = null // Forget the constructor argument for dependencies too
- }
-
- /**
- * Clears the dependencies of this RDD. This method must ensure that all references
- * to the original parent RDDs are removed to enable the parent RDDs to be garbage
- * collected. Subclasses of RDD may override this method for implementing their own cleaning
- * logic. See [[org.apache.spark.rdd.UnionRDD]] for an example.
- */
- protected def clearDependencies() {
- dependencies_ = null
- }
-
- /** A description of this RDD and its recursive dependencies for debugging. */
- def toDebugString: String = {
- def debugString(rdd: RDD[_], prefix: String = ""): Seq[String] = {
- Seq(prefix + rdd + " (" + rdd.partitions.size + " partitions)") ++
- rdd.dependencies.flatMap(d => debugString(d.rdd, prefix + " "))
- }
- debugString(this).mkString("\n")
- }
-
- override def toString: String = "%s%s[%d] at %s".format(
- Option(name).map(_ + " ").getOrElse(""),
- getClass.getSimpleName,
- id,
- origin)
-
- def toJavaRDD() : JavaRDD[T] = {
- new JavaRDD(this)(elementClassManifest)
- }
-
-}
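
For context on the file moved above: a minimal sketch of how the basic RDD operations documented in its scaladoc (map, filter, persist, count) are typically driven from user code. The local master, application name and sample data below are illustrative assumptions, not part of this commit.

import org.apache.spark.SparkContext
import org.apache.spark.storage.StorageLevel

object RDDBasicsSketch {
  def main(args: Array[String]) {
    // "local" master and the app name are assumptions for a self-contained run
    val sc = new SparkContext("local", "RDDBasicsSketch")

    val nums = sc.parallelize(1 to 100, 4)

    // Transformations are lazy; persist() caches the result after it is first computed
    val evens = nums.filter(_ % 2 == 0).map(_ * 10).persist(StorageLevel.MEMORY_ONLY)

    // Actions such as count() and first() trigger job execution
    println("count = " + evens.count())
    println("first = " + evens.first())

    sc.stop()
  }
}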
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/RDDCheckpointData.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/RDDCheckpointData.scala b/core/src/main/scala/org/apache/spark/RDDCheckpointData.scala
deleted file mode 100644
index 0334de6..0000000
--- a/core/src/main/scala/org/apache/spark/RDDCheckpointData.scala
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark
-
-import org.apache.hadoop.fs.Path
-import org.apache.hadoop.conf.Configuration
-import rdd.{CheckpointRDD, CoalescedRDD}
-import scheduler.{ResultTask, ShuffleMapTask}
-
-/**
- * Enumeration to manage state transitions of an RDD through checkpointing
- * [ Initialized --> marked for checkpointing --> checkpointing in progress --> checkpointed ]
- */
-private[spark] object CheckpointState extends Enumeration {
- type CheckpointState = Value
- val Initialized, MarkedForCheckpoint, CheckpointingInProgress, Checkpointed = Value
-}
-
-/**
- * This class contains all the information related to RDD checkpointing. Each instance of this class
- * is associated with an RDD. It manages the process of checkpointing the associated RDD, and also
- * manages the post-checkpoint state by providing the updated partitions, iterator and preferred
- * locations of the checkpointed RDD.
- */
-private[spark] class RDDCheckpointData[T: ClassManifest](rdd: RDD[T])
- extends Logging with Serializable {
-
- import CheckpointState._
-
- // The checkpoint state of the associated RDD.
- var cpState = Initialized
-
- // The file to which the associated RDD has been checkpointed
- @transient var cpFile: Option[String] = None
-
- // The CheckpointRDD created from the checkpoint file, that is, the new parent of the associated RDD.
- var cpRDD: Option[RDD[T]] = None
-
- // Mark the RDD for checkpointing
- def markForCheckpoint() {
- RDDCheckpointData.synchronized {
- if (cpState == Initialized) cpState = MarkedForCheckpoint
- }
- }
-
- // Is the RDD already checkpointed
- def isCheckpointed: Boolean = {
- RDDCheckpointData.synchronized { cpState == Checkpointed }
- }
-
- // Get the file to which this RDD was checkpointed, as an Option
- def getCheckpointFile: Option[String] = {
- RDDCheckpointData.synchronized { cpFile }
- }
-
- // Do the checkpointing of the RDD. Called after the first job using that RDD is over.
- def doCheckpoint() {
- // If it is marked for checkpointing AND checkpointing is not already in progress,
- // then set it to be in progress, else return
- RDDCheckpointData.synchronized {
- if (cpState == MarkedForCheckpoint) {
- cpState = CheckpointingInProgress
- } else {
- return
- }
- }
-
- // Create the output path for the checkpoint
- val path = new Path(rdd.context.checkpointDir.get, "rdd-" + rdd.id)
- val fs = path.getFileSystem(new Configuration())
- if (!fs.mkdirs(path)) {
- throw new SparkException("Failed to create checkpoint path " + path)
- }
-
- // Save to file, and reload it as an RDD
- rdd.context.runJob(rdd, CheckpointRDD.writeToFile(path.toString) _)
- val newRDD = new CheckpointRDD[T](rdd.context, path.toString)
-
- // Change the dependencies and partitions of the RDD
- RDDCheckpointData.synchronized {
- cpFile = Some(path.toString)
- cpRDD = Some(newRDD)
- rdd.markCheckpointed(newRDD) // Update the RDD's dependencies and partitions
- cpState = Checkpointed
- RDDCheckpointData.clearTaskCaches()
- logInfo("Done checkpointing RDD " + rdd.id + ", new parent is RDD " + newRDD.id)
- }
- }
-
- // Get preferred location of a split after checkpointing
- def getPreferredLocations(split: Partition): Seq[String] = {
- RDDCheckpointData.synchronized {
- cpRDD.get.preferredLocations(split)
- }
- }
-
- def getPartitions: Array[Partition] = {
- RDDCheckpointData.synchronized {
- cpRDD.get.partitions
- }
- }
-
- def checkpointRDD: Option[RDD[T]] = {
- RDDCheckpointData.synchronized {
- cpRDD
- }
- }
-}
-
-private[spark] object RDDCheckpointData {
- def clearTaskCaches() {
- ShuffleMapTask.clearCache()
- ResultTask.clearCache()
- }
-}
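
A minimal sketch of the checkpointing flow that RDDCheckpointData manages, as described in the comments above: mark the RDD for checkpointing, run a job on it, and the data is written afterwards by doCheckpoint(). The checkpoint directory and local master below are illustrative assumptions.

import org.apache.spark.SparkContext

object CheckpointSketch {
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "CheckpointSketch")   // local master is an assumption
    sc.setCheckpointDir("/tmp/spark-checkpoints")            // hypothetical directory

    val data = sc.parallelize(1 to 1000).map(_ * 2)

    // checkpoint() must be called before any job has run on this RDD;
    // the actual write happens after the first job using it completes (doCheckpoint()).
    data.checkpoint()
    data.count()

    println("checkpointed: " + data.isCheckpointed)
    println("file: " + data.getCheckpointFile)
    sc.stop()
  }
}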
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/SequenceFileRDDFunctions.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SequenceFileRDDFunctions.scala b/core/src/main/scala/org/apache/spark/SequenceFileRDDFunctions.scala
deleted file mode 100644
index d58fb4e..0000000
--- a/core/src/main/scala/org/apache/spark/SequenceFileRDDFunctions.scala
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark
-
-import java.io.EOFException
-import java.net.URL
-import java.io.ObjectInputStream
-import java.util.concurrent.atomic.AtomicLong
-import java.util.HashSet
-import java.util.Random
-import java.util.Date
-
-import scala.collection.mutable.ArrayBuffer
-import scala.collection.mutable.Map
-import scala.collection.mutable.HashMap
-
-import org.apache.hadoop.mapred.JobConf
-import org.apache.hadoop.mapred.OutputFormat
-import org.apache.hadoop.mapred.TextOutputFormat
-import org.apache.hadoop.mapred.SequenceFileOutputFormat
-import org.apache.hadoop.mapred.OutputCommitter
-import org.apache.hadoop.mapred.FileOutputCommitter
-import org.apache.hadoop.io.compress.CompressionCodec
-import org.apache.hadoop.io.Writable
-import org.apache.hadoop.io.NullWritable
-import org.apache.hadoop.io.BytesWritable
-import org.apache.hadoop.io.Text
-
-import org.apache.spark.SparkContext._
-
-/**
- * Extra functions available on RDDs of (key, value) pairs to create a Hadoop SequenceFile,
- * through an implicit conversion. Note that this can't be part of PairRDDFunctions because
- * we need more implicit parameters to convert our keys and values to Writable.
- *
- * Users should import `org.apache.spark.SparkContext._` at the top of their program to use these functions.
- */
-class SequenceFileRDDFunctions[K <% Writable: ClassManifest, V <% Writable : ClassManifest](
- self: RDD[(K, V)])
- extends Logging
- with Serializable {
-
- private def getWritableClass[T <% Writable: ClassManifest](): Class[_ <: Writable] = {
- val c = {
- if (classOf[Writable].isAssignableFrom(classManifest[T].erasure)) {
- classManifest[T].erasure
- } else {
- // We get the type of the Writable class by looking at the apply method which converts
- // from T to Writable. Since we have two apply methods we filter out the one which
- // is not of the form "java.lang.Object apply(java.lang.Object)"
- implicitly[T => Writable].getClass.getDeclaredMethods().filter(
- m => m.getReturnType().toString != "class java.lang.Object" &&
- m.getName() == "apply")(0).getReturnType
-
- }
- // TODO: use something like WritableConverter to avoid reflection
- }
- c.asInstanceOf[Class[_ <: Writable]]
- }
-
- /**
- * Output the RDD as a Hadoop SequenceFile using the Writable types we infer from the RDD's key
- * and value types. If the key or value are Writable, then we use their classes directly;
- * otherwise we map primitive types such as Int and Double to IntWritable, DoubleWritable, etc,
- * byte arrays to BytesWritable, and Strings to Text. The `path` can be on any Hadoop-supported
- * file system.
- */
- def saveAsSequenceFile(path: String, codec: Option[Class[_ <: CompressionCodec]] = None) {
- def anyToWritable[U <% Writable](u: U): Writable = u
-
- val keyClass = getWritableClass[K]
- val valueClass = getWritableClass[V]
- val convertKey = !classOf[Writable].isAssignableFrom(self.getKeyClass)
- val convertValue = !classOf[Writable].isAssignableFrom(self.getValueClass)
-
- logInfo("Saving as sequence file of type (" + keyClass.getSimpleName + "," + valueClass.getSimpleName + ")" )
- val format = classOf[SequenceFileOutputFormat[Writable, Writable]]
- val jobConf = new JobConf(self.context.hadoopConfiguration)
- if (!convertKey && !convertValue) {
- self.saveAsHadoopFile(path, keyClass, valueClass, format, jobConf, codec)
- } else if (!convertKey && convertValue) {
- self.map(x => (x._1,anyToWritable(x._2))).saveAsHadoopFile(
- path, keyClass, valueClass, format, jobConf, codec)
- } else if (convertKey && !convertValue) {
- self.map(x => (anyToWritable(x._1),x._2)).saveAsHadoopFile(
- path, keyClass, valueClass, format, jobConf, codec)
- } else if (convertKey && convertValue) {
- self.map(x => (anyToWritable(x._1),anyToWritable(x._2))).saveAsHadoopFile(
- path, keyClass, valueClass, format, jobConf, codec)
- }
- }
-}
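
A minimal sketch of the implicit conversion that SequenceFileRDDFunctions provides: with SparkContext._ imported, an RDD of (Int, String) pairs can be written as a SequenceFile, with keys and values mapped to IntWritable and Text as described in the scaladoc above. The output path and sample data are illustrative assumptions.

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._   // brings saveAsSequenceFile into scope via implicit conversion

object SequenceFileSketch {
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "SequenceFileSketch")  // local master is an assumption

    // Int keys map to IntWritable and String values to Text automatically
    val pairs = sc.parallelize(Seq((1, "a"), (2, "b"), (3, "c")))
    pairs.saveAsSequenceFile("/tmp/pairs-seqfile")            // hypothetical output path

    sc.stop()
  }
}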
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/SizeEstimator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SizeEstimator.scala b/core/src/main/scala/org/apache/spark/SizeEstimator.scala
deleted file mode 100644
index 4bfc837..0000000
--- a/core/src/main/scala/org/apache/spark/SizeEstimator.scala
+++ /dev/null
@@ -1,283 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark
-
-import java.lang.reflect.Field
-import java.lang.reflect.Modifier
-import java.lang.reflect.{Array => JArray}
-import java.util.IdentityHashMap
-import java.util.concurrent.ConcurrentHashMap
-import java.util.Random
-
-import javax.management.MBeanServer
-import java.lang.management.ManagementFactory
-
-import scala.collection.mutable.ArrayBuffer
-
-import it.unimi.dsi.fastutil.ints.IntOpenHashSet
-
-/**
- * Estimates the sizes of Java objects (number of bytes of memory they occupy), for use in
- * memory-aware caches.
- *
- * Based on the following JavaWorld article:
- * http://www.javaworld.com/javaworld/javaqa/2003-12/02-qa-1226-sizeof.html
- */
-private[spark] object SizeEstimator extends Logging {
-
- // Sizes of primitive types
- private val BYTE_SIZE = 1
- private val BOOLEAN_SIZE = 1
- private val CHAR_SIZE = 2
- private val SHORT_SIZE = 2
- private val INT_SIZE = 4
- private val LONG_SIZE = 8
- private val FLOAT_SIZE = 4
- private val DOUBLE_SIZE = 8
-
- // Alignment boundary for objects
- // TODO: Is this arch dependent ?
- private val ALIGN_SIZE = 8
-
- // A cache of ClassInfo objects for each class
- private val classInfos = new ConcurrentHashMap[Class[_], ClassInfo]
-
- // Object and pointer sizes are arch dependent
- private var is64bit = false
-
- // Size of an object reference
- // Based on https://wikis.oracle.com/display/HotSpotInternals/CompressedOops
- private var isCompressedOops = false
- private var pointerSize = 4
-
- // Minimum size of a java.lang.Object
- private var objectSize = 8
-
- initialize()
-
- // Sets object size, pointer size based on architecture and CompressedOops settings
- // from the JVM.
- private def initialize() {
- is64bit = System.getProperty("os.arch").contains("64")
- isCompressedOops = getIsCompressedOops
-
- objectSize = if (!is64bit) 8 else {
- if(!isCompressedOops) {
- 16
- } else {
- 12
- }
- }
- pointerSize = if (is64bit && !isCompressedOops) 8 else 4
- classInfos.clear()
- classInfos.put(classOf[Object], new ClassInfo(objectSize, Nil))
- }
-
- private def getIsCompressedOops : Boolean = {
- if (System.getProperty("spark.test.useCompressedOops") != null) {
- return System.getProperty("spark.test.useCompressedOops").toBoolean
- }
-
- try {
- val hotSpotMBeanName = "com.sun.management:type=HotSpotDiagnostic"
- val server = ManagementFactory.getPlatformMBeanServer()
-
- // NOTE: This should throw an exception in non-Sun JVMs
- val hotSpotMBeanClass = Class.forName("com.sun.management.HotSpotDiagnosticMXBean")
- val getVMMethod = hotSpotMBeanClass.getDeclaredMethod("getVMOption",
- Class.forName("java.lang.String"))
-
- val bean = ManagementFactory.newPlatformMXBeanProxy(server,
- hotSpotMBeanName, hotSpotMBeanClass)
- // TODO: We could use reflection on the VMOption returned ?
- return getVMMethod.invoke(bean, "UseCompressedOops").toString.contains("true")
- } catch {
- case e: Exception => {
- // Guess whether they've enabled UseCompressedOops based on whether maxMemory < 32 GB
- val guess = Runtime.getRuntime.maxMemory < (32L*1024*1024*1024)
- val guessInWords = if (guess) "yes" else "not"
- logWarning("Failed to check whether UseCompressedOops is set; assuming " + guessInWords)
- return guess
- }
- }
- }
-
- /**
- * The state of an ongoing size estimation. Contains a stack of objects to visit as well as an
- * IdentityHashMap of visited objects, and provides utility methods for enqueueing new objects
- * to visit.
- */
- private class SearchState(val visited: IdentityHashMap[AnyRef, AnyRef]) {
- val stack = new ArrayBuffer[AnyRef]
- var size = 0L
-
- def enqueue(obj: AnyRef) {
- if (obj != null && !visited.containsKey(obj)) {
- visited.put(obj, null)
- stack += obj
- }
- }
-
- def isFinished(): Boolean = stack.isEmpty
-
- def dequeue(): AnyRef = {
- val elem = stack.last
- stack.trimEnd(1)
- return elem
- }
- }
-
- /**
- * Cached information about each class. We remember two things: the "shell size" of the class
- * (size of all non-static fields plus the java.lang.Object size), and any fields that are
- * pointers to objects.
- */
- private class ClassInfo(
- val shellSize: Long,
- val pointerFields: List[Field]) {}
-
- def estimate(obj: AnyRef): Long = estimate(obj, new IdentityHashMap[AnyRef, AnyRef])
-
- private def estimate(obj: AnyRef, visited: IdentityHashMap[AnyRef, AnyRef]): Long = {
- val state = new SearchState(visited)
- state.enqueue(obj)
- while (!state.isFinished) {
- visitSingleObject(state.dequeue(), state)
- }
- return state.size
- }
-
- private def visitSingleObject(obj: AnyRef, state: SearchState) {
- val cls = obj.getClass
- if (cls.isArray) {
- visitArray(obj, cls, state)
- } else if (obj.isInstanceOf[ClassLoader] || obj.isInstanceOf[Class[_]]) {
- // Hadoop JobConfs created in the interpreter have a ClassLoader, which greatly confuses
- // the size estimator since it references the whole REPL. Do nothing in this case. In
- // general all ClassLoaders and Classes will be shared between objects anyway.
- } else {
- val classInfo = getClassInfo(cls)
- state.size += classInfo.shellSize
- for (field <- classInfo.pointerFields) {
- state.enqueue(field.get(obj))
- }
- }
- }
-
- // Estimate the size of arrays larger than ARRAY_SIZE_FOR_SAMPLING by sampling.
- private val ARRAY_SIZE_FOR_SAMPLING = 200
- private val ARRAY_SAMPLE_SIZE = 100 // should be lower than ARRAY_SIZE_FOR_SAMPLING
-
- private def visitArray(array: AnyRef, cls: Class[_], state: SearchState) {
- val length = JArray.getLength(array)
- val elementClass = cls.getComponentType
-
- // Arrays have object header and length field which is an integer
- var arrSize: Long = alignSize(objectSize + INT_SIZE)
-
- if (elementClass.isPrimitive) {
- arrSize += alignSize(length * primitiveSize(elementClass))
- state.size += arrSize
- } else {
- arrSize += alignSize(length * pointerSize)
- state.size += arrSize
-
- if (length <= ARRAY_SIZE_FOR_SAMPLING) {
- for (i <- 0 until length) {
- state.enqueue(JArray.get(array, i))
- }
- } else {
- // Estimate the size of a large array by sampling elements without replacement.
- var size = 0.0
- val rand = new Random(42)
- val drawn = new IntOpenHashSet(ARRAY_SAMPLE_SIZE)
- for (i <- 0 until ARRAY_SAMPLE_SIZE) {
- var index = 0
- do {
- index = rand.nextInt(length)
- } while (drawn.contains(index))
- drawn.add(index)
- val elem = JArray.get(array, index)
- size += SizeEstimator.estimate(elem, state.visited)
- }
- state.size += ((length / (ARRAY_SAMPLE_SIZE * 1.0)) * size).toLong
- }
- }
- }
-
- private def primitiveSize(cls: Class[_]): Long = {
- if (cls == classOf[Byte])
- BYTE_SIZE
- else if (cls == classOf[Boolean])
- BOOLEAN_SIZE
- else if (cls == classOf[Char])
- CHAR_SIZE
- else if (cls == classOf[Short])
- SHORT_SIZE
- else if (cls == classOf[Int])
- INT_SIZE
- else if (cls == classOf[Long])
- LONG_SIZE
- else if (cls == classOf[Float])
- FLOAT_SIZE
- else if (cls == classOf[Double])
- DOUBLE_SIZE
- else throw new IllegalArgumentException(
- "Non-primitive class " + cls + " passed to primitiveSize()")
- }
-
- /**
- * Get or compute the ClassInfo for a given class.
- */
- private def getClassInfo(cls: Class[_]): ClassInfo = {
- // Check whether we've already cached a ClassInfo for this class
- val info = classInfos.get(cls)
- if (info != null) {
- return info
- }
-
- val parent = getClassInfo(cls.getSuperclass)
- var shellSize = parent.shellSize
- var pointerFields = parent.pointerFields
-
- for (field <- cls.getDeclaredFields) {
- if (!Modifier.isStatic(field.getModifiers)) {
- val fieldClass = field.getType
- if (fieldClass.isPrimitive) {
- shellSize += primitiveSize(fieldClass)
- } else {
- field.setAccessible(true) // Enable future get()'s on this field
- shellSize += pointerSize
- pointerFields = field :: pointerFields
- }
- }
- }
-
- shellSize = alignSize(shellSize)
-
- // Create and cache a new ClassInfo
- val newInfo = new ClassInfo(shellSize, pointerFields)
- classInfos.put(cls, newInfo)
- return newInfo
- }
-
- private def alignSize(size: Long): Long = {
- val rem = size % ALIGN_SIZE
- return if (rem == 0) size else (size + ALIGN_SIZE - rem)
- }
-}
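
SizeEstimator is private[spark], so only code inside the org.apache.spark package can call it; a minimal sketch of how internal, memory-aware code might use estimate(). The sample objects are illustrative assumptions.

package org.apache.spark

// SizeEstimator is private[spark], so this sketch has to live in the same package.
object SizeEstimatorSketch {
  def main(args: Array[String]) {
    val smallArray = new Array[Int](16)
    val largeArray = Array.fill(10000)("some string payload")   // sample data is illustrative

    // estimate() walks the object graph and returns an approximate size in bytes
    println("small array: " + SizeEstimator.estimate(smallArray) + " bytes")
    println("large array: " + SizeEstimator.estimate(largeArray) + " bytes")
  }
}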
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 1207b24..faf0c23 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -54,17 +54,15 @@ import org.apache.mesos.MesosNativeLibrary
import org.apache.spark.deploy.LocalSparkCluster
import org.apache.spark.partial.{ApproximateEvaluator, PartialResult}
-import org.apache.spark.rdd.{CheckpointRDD, HadoopRDD, NewHadoopRDD, UnionRDD, ParallelCollectionRDD,
- OrderedRDDFunctions}
+import org.apache.spark.rdd._
import org.apache.spark.scheduler._
import org.apache.spark.scheduler.cluster.{StandaloneSchedulerBackend, SparkDeploySchedulerBackend,
ClusterScheduler, Schedulable, SchedulingMode}
import org.apache.spark.scheduler.local.LocalScheduler
import org.apache.spark.scheduler.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend}
-import org.apache.spark.storage.{StorageStatus, StorageUtils, RDDInfo, BlockManagerSource}
+import org.apache.spark.storage.{StorageUtils, BlockManagerSource}
import org.apache.spark.ui.SparkUI
-import org.apache.spark.util.{MetadataCleaner, TimeStampedHashMap}
-import scala.Some
+import org.apache.spark.util.{ClosureCleaner, Utils, MetadataCleaner, TimeStampedHashMap}
import org.apache.spark.scheduler.StageInfo
import org.apache.spark.storage.RDDInfo
import org.apache.spark.storage.StorageStatus
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/SparkEnv.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index 6e6fe5d..478e5a0 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -29,7 +29,7 @@ import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.storage.{BlockManagerMasterActor, BlockManager, BlockManagerMaster}
import org.apache.spark.network.ConnectionManager
import org.apache.spark.serializer.{Serializer, SerializerManager}
-import org.apache.spark.util.AkkaUtils
+import org.apache.spark.util.{Utils, AkkaUtils}
import org.apache.spark.api.python.PythonWorkerFactory
@@ -155,10 +155,10 @@ object SparkEnv extends Logging {
val serializerManager = new SerializerManager
val serializer = serializerManager.setDefault(
- System.getProperty("spark.serializer", "org.apache.spark.JavaSerializer"))
+ System.getProperty("spark.serializer", "org.apache.spark.serializer.JavaSerializer"))
val closureSerializer = serializerManager.get(
- System.getProperty("spark.closure.serializer", "org.apache.spark.JavaSerializer"))
+ System.getProperty("spark.closure.serializer", "org.apache.spark.serializer.JavaSerializer"))
def registerOrLookup(name: String, newActor: => Actor): ActorRef = {
if (isDriver) {
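The hunk above relocates the default serializer class names; any application that sets these properties explicitly needs the new fully qualified names. A minimal, illustrative sketch (the property keys come from the diff; setting them is optional now that they are the defaults):

    // Point an application at the relocated serializer classes before creating a SparkContext.
    System.setProperty("spark.serializer", "org.apache.spark.serializer.JavaSerializer")
    System.setProperty("spark.closure.serializer", "org.apache.spark.serializer.JavaSerializer")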
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/Utils.scala b/core/src/main/scala/org/apache/spark/Utils.scala
deleted file mode 100644
index 1e17deb..0000000
--- a/core/src/main/scala/org/apache/spark/Utils.scala
+++ /dev/null
@@ -1,780 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark
-
-import java.io._
-import java.net.{InetAddress, URL, URI, NetworkInterface, Inet4Address, ServerSocket}
-import java.util.{Locale, Random, UUID}
-import java.util.concurrent.{ConcurrentHashMap, Executors, ThreadFactory, ThreadPoolExecutor}
-import java.util.regex.Pattern
-
-import scala.collection.Map
-import scala.collection.mutable.{ArrayBuffer, HashMap}
-import scala.collection.JavaConversions._
-import scala.io.Source
-
-import com.google.common.io.Files
-import com.google.common.util.concurrent.ThreadFactoryBuilder
-
-import org.apache.hadoop.fs.{Path, FileSystem, FileUtil}
-
-import org.apache.spark.serializer.{DeserializationStream, SerializationStream, SerializerInstance}
-import org.apache.spark.deploy.SparkHadoopUtil
-import java.nio.ByteBuffer
-
-
-/**
- * Various utility methods used by Spark.
- */
-private object Utils extends Logging {
-
- /** Serialize an object using Java serialization */
- def serialize[T](o: T): Array[Byte] = {
- val bos = new ByteArrayOutputStream()
- val oos = new ObjectOutputStream(bos)
- oos.writeObject(o)
- oos.close()
- return bos.toByteArray
- }
-
- /** Deserialize an object using Java serialization */
- def deserialize[T](bytes: Array[Byte]): T = {
- val bis = new ByteArrayInputStream(bytes)
- val ois = new ObjectInputStream(bis)
- return ois.readObject.asInstanceOf[T]
- }
-
- /** Deserialize an object using Java serialization and the given ClassLoader */
- def deserialize[T](bytes: Array[Byte], loader: ClassLoader): T = {
- val bis = new ByteArrayInputStream(bytes)
- val ois = new ObjectInputStream(bis) {
- override def resolveClass(desc: ObjectStreamClass) =
- Class.forName(desc.getName, false, loader)
- }
- return ois.readObject.asInstanceOf[T]
- }
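These helpers are a plain Java-serialization round trip. A minimal behavior sketch follows; the Person case class is hypothetical, and Utils is package-private, so this is illustrative only:

    case class Person(name: String, age: Int)          // case classes are Serializable
    val bytes: Array[Byte] = Utils.serialize(Person("Ada", 36))
    val copy: Person = Utils.deserialize[Person](bytes)
    // copy == Person("Ada", 36)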
-
- /** Serialize via nested stream using specific serializer */
- def serializeViaNestedStream(os: OutputStream, ser: SerializerInstance)(f: SerializationStream => Unit) = {
- val osWrapper = ser.serializeStream(new OutputStream {
- def write(b: Int) = os.write(b)
-
- override def write(b: Array[Byte], off: Int, len: Int) = os.write(b, off, len)
- })
- try {
- f(osWrapper)
- } finally {
- osWrapper.close()
- }
- }
-
- /** Deserialize via nested stream using specific serializer */
- def deserializeViaNestedStream(is: InputStream, ser: SerializerInstance)(f: DeserializationStream => Unit) = {
- val isWrapper = ser.deserializeStream(new InputStream {
- def read(): Int = is.read()
-
- override def read(b: Array[Byte], off: Int, len: Int): Int = is.read(b, off, len)
- })
- try {
- f(isWrapper)
- } finally {
- isWrapper.close()
- }
- }
-
- /**
- * Primitive often used when writing {@link java.nio.ByteBuffer} to {@link java.io.DataOutput}.
- */
- def writeByteBuffer(bb: ByteBuffer, out: ObjectOutput) = {
- if (bb.hasArray) {
- out.write(bb.array(), bb.arrayOffset() + bb.position(), bb.remaining())
- } else {
- val bbval = new Array[Byte](bb.remaining())
- bb.get(bbval)
- out.write(bbval)
- }
- }
-
- def isAlpha(c: Char): Boolean = {
- (c >= 'A' && c <= 'Z') || (c >= 'a' && c <= 'z')
- }
-
- /** Split a string into words at non-alphabetic characters */
- def splitWords(s: String): Seq[String] = {
- val buf = new ArrayBuffer[String]
- var i = 0
- while (i < s.length) {
- var j = i
- while (j < s.length && isAlpha(s.charAt(j))) {
- j += 1
- }
- if (j > i) {
- buf += s.substring(i, j)
- }
- i = j
- while (i < s.length && !isAlpha(s.charAt(i))) {
- i += 1
- }
- }
- return buf
- }
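Runs of non-alphabetic characters act purely as separators and are dropped; for example (illustrative only):

    Utils.splitWords("hello, world!")   // Seq("hello", "world")
    Utils.splitWords("3 red-foxes")     // Seq("red", "foxes") -- digits are not alphabetic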
-
- private val shutdownDeletePaths = new collection.mutable.HashSet[String]()
-
- // Register the path to be deleted via shutdown hook
- def registerShutdownDeleteDir(file: File) {
- val absolutePath = file.getAbsolutePath()
- shutdownDeletePaths.synchronized {
- shutdownDeletePaths += absolutePath
- }
- }
-
- // Is the path already registered to be deleted via a shutdown hook?
- def hasShutdownDeleteDir(file: File): Boolean = {
- val absolutePath = file.getAbsolutePath()
- shutdownDeletePaths.synchronized {
- shutdownDeletePaths.contains(absolutePath)
- }
- }
-
- // Note: if the file is a child of some registered path (but not equal to it), return true;
- // otherwise false. This ensures that two shutdown hooks do not try to delete each other's
- // paths, which would result in an IOException and incomplete cleanup.
- def hasRootAsShutdownDeleteDir(file: File): Boolean = {
- val absolutePath = file.getAbsolutePath()
- val retval = shutdownDeletePaths.synchronized {
- shutdownDeletePaths.find { path =>
- !absolutePath.equals(path) && absolutePath.startsWith(path)
- }.isDefined
- }
- if (retval) {
- logInfo("path = " + file + ", already present as root for deletion.")
- }
- retval
- }
-
- /** Create a temporary directory inside the given parent directory */
- def createTempDir(root: String = System.getProperty("java.io.tmpdir")): File = {
- var attempts = 0
- val maxAttempts = 10
- var dir: File = null
- while (dir == null) {
- attempts += 1
- if (attempts > maxAttempts) {
- throw new IOException("Failed to create a temp directory (under " + root + ") after " +
- maxAttempts + " attempts!")
- }
- try {
- dir = new File(root, "spark-" + UUID.randomUUID.toString)
- if (dir.exists() || !dir.mkdirs()) {
- dir = null
- }
- } catch { case e: IOException => ; }
- }
-
- registerShutdownDeleteDir(dir)
-
- // Add a shutdown hook to delete the temp dir when the JVM exits
- Runtime.getRuntime.addShutdownHook(new Thread("delete Spark temp dir " + dir) {
- override def run() {
- // Attempt to delete only if no parent path of this directory is already registered for deletion.
- if (! hasRootAsShutdownDeleteDir(dir)) Utils.deleteRecursively(dir)
- }
- })
- dir
- }
-
- /** Copy all data from an InputStream to an OutputStream */
- def copyStream(in: InputStream,
- out: OutputStream,
- closeStreams: Boolean = false)
- {
- val buf = new Array[Byte](8192)
- var n = 0
- while (n != -1) {
- n = in.read(buf)
- if (n != -1) {
- out.write(buf, 0, n)
- }
- }
- if (closeStreams) {
- in.close()
- out.close()
- }
- }
-
- /**
- * Download a file requested by the executor. Supports fetching the file in a variety of ways,
- * including HTTP, HDFS and files on a standard filesystem, based on the URL parameter.
- *
- * Throws SparkException if the target file already exists and has different contents than
- * the requested file.
- */
- def fetchFile(url: String, targetDir: File) {
- val filename = url.split("/").last
- val tempDir = getLocalDir
- val tempFile = File.createTempFile("fetchFileTemp", null, new File(tempDir))
- val targetFile = new File(targetDir, filename)
- val uri = new URI(url)
- uri.getScheme match {
- case "http" | "https" | "ftp" =>
- logInfo("Fetching " + url + " to " + tempFile)
- val in = new URL(url).openStream()
- val out = new FileOutputStream(tempFile)
- Utils.copyStream(in, out, true)
- if (targetFile.exists && !Files.equal(tempFile, targetFile)) {
- tempFile.delete()
- throw new SparkException(
- "File " + targetFile + " exists and does not match contents of" + " " + url)
- } else {
- Files.move(tempFile, targetFile)
- }
- case "file" | null =>
- // In the case of a local file, copy the local file to the target directory.
- // Note the difference between uri vs url.
- val sourceFile = if (uri.isAbsolute) new File(uri) else new File(url)
- if (targetFile.exists) {
- // If the target file already exists, fail if its contents differ from the requested file.
- if (!Files.equal(sourceFile, targetFile)) {
- throw new SparkException(
- "File " + targetFile + " exists and does not match contents of" + " " + url)
- } else {
- // Do nothing if the file contents are the same, i.e. this file has been copied
- // previously.
- logInfo(sourceFile.getAbsolutePath + " has been previously copied to "
- + targetFile.getAbsolutePath)
- }
- } else {
- // The file does not exist in the target directory. Copy it there.
- logInfo("Copying " + sourceFile.getAbsolutePath + " to " + targetFile.getAbsolutePath)
- Files.copy(sourceFile, targetFile)
- }
- case _ =>
- // Use the Hadoop filesystem library, which supports file://, hdfs://, s3://, and others
- val env = SparkEnv.get
- val uri = new URI(url)
- val conf = env.hadoop.newConfiguration()
- val fs = FileSystem.get(uri, conf)
- val in = fs.open(new Path(uri))
- val out = new FileOutputStream(tempFile)
- Utils.copyStream(in, out, true)
- if (targetFile.exists && !Files.equal(tempFile, targetFile)) {
- tempFile.delete()
- throw new SparkException("File " + targetFile + " exists and does not match contents of" +
- " " + url)
- } else {
- Files.move(tempFile, targetFile)
- }
- }
- // Decompress the file if it's a .tar, .tar.gz or .tgz archive
- if (filename.endsWith(".tar.gz") || filename.endsWith(".tgz")) {
- logInfo("Untarring " + filename)
- Utils.execute(Seq("tar", "-xzf", filename), targetDir)
- } else if (filename.endsWith(".tar")) {
- logInfo("Untarring " + filename)
- Utils.execute(Seq("tar", "-xf", filename), targetDir)
- }
- // Make the file executable; that's necessary for scripts
- FileUtil.chmod(targetFile.getAbsolutePath, "a+x")
- }
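A hedged usage sketch for the method above; the URL and target directory are placeholders:

    // Fetch a remote archive into a work directory; per the code above, fetchFile also
    // untars .tar/.tar.gz/.tgz archives and marks the fetched file executable.
    Utils.fetchFile("http://example.com/deps/tool.tgz", new File("/tmp/spark-work"))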
-
- /**
- * Get a temporary directory using Spark's spark.local.dir property, if set. This will always
- * return a single directory, even though the spark.local.dir property might be a list of
- * multiple paths.
- */
- def getLocalDir: String = {
- System.getProperty("spark.local.dir", System.getProperty("java.io.tmpdir")).split(',')(0)
- }
-
- /**
- * Shuffle the elements of a collection into a random order, returning the
- * result in a new collection. Unlike scala.util.Random.shuffle, this method
- * uses a local random number generator, avoiding inter-thread contention.
- */
- def randomize[T: ClassManifest](seq: TraversableOnce[T]): Seq[T] = {
- randomizeInPlace(seq.toArray)
- }
-
- /**
- * Shuffle the elements of an array into a random order, modifying the
- * original array. Returns the original array.
- */
- def randomizeInPlace[T](arr: Array[T], rand: Random = new Random): Array[T] = {
- for (i <- (arr.length - 1) to 1 by -1) {
- val j = rand.nextInt(i)
- val tmp = arr(j)
- arr(j) = arr(i)
- arr(i) = tmp
- }
- arr
- }
-
- /**
- * Get the local host's IP address in dotted-quad format (e.g. 1.2.3.4).
- * Note that this is typically not used from within core Spark.
- */
- lazy val localIpAddress: String = findLocalIpAddress()
- lazy val localIpAddressHostname: String = getAddressHostName(localIpAddress)
-
- private def findLocalIpAddress(): String = {
- val defaultIpOverride = System.getenv("SPARK_LOCAL_IP")
- if (defaultIpOverride != null) {
- defaultIpOverride
- } else {
- val address = InetAddress.getLocalHost
- if (address.isLoopbackAddress) {
- // Address resolves to something like 127.0.1.1, which happens on Debian; try to find
- // a better address using the local network interfaces
- for (ni <- NetworkInterface.getNetworkInterfaces) {
- for (addr <- ni.getInetAddresses if !addr.isLinkLocalAddress &&
- !addr.isLoopbackAddress && addr.isInstanceOf[Inet4Address]) {
- // We've found an address that looks reasonable!
- logWarning("Your hostname, " + InetAddress.getLocalHost.getHostName + " resolves to" +
- " a loopback address: " + address.getHostAddress + "; using " + addr.getHostAddress +
- " instead (on interface " + ni.getName + ")")
- logWarning("Set SPARK_LOCAL_IP if you need to bind to another address")
- return addr.getHostAddress
- }
- }
- logWarning("Your hostname, " + InetAddress.getLocalHost.getHostName + " resolves to" +
- " a loopback address: " + address.getHostAddress + ", but we couldn't find any" +
- " external IP address!")
- logWarning("Set SPARK_LOCAL_IP if you need to bind to another address")
- }
- address.getHostAddress
- }
- }
-
- private var customHostname: Option[String] = None
-
- /**
- * Allow setting a custom host name because when we run on Mesos we need to use the same
- * hostname it reports to the master.
- */
- def setCustomHostname(hostname: String) {
- // DEBUG code
- Utils.checkHost(hostname)
- customHostname = Some(hostname)
- }
-
- /**
- * Get the local machine's hostname.
- */
- def localHostName(): String = {
- customHostname.getOrElse(localIpAddressHostname)
- }
-
- def getAddressHostName(address: String): String = {
- InetAddress.getByName(address).getHostName
- }
-
- def localHostPort(): String = {
- val retval = System.getProperty("spark.hostPort", null)
- if (retval == null) {
- logErrorWithStack("spark.hostPort not set but invoking localHostPort")
- return localHostName()
- }
-
- retval
- }
-
- def checkHost(host: String, message: String = "") {
- assert(host.indexOf(':') == -1, message)
- }
-
- def checkHostPort(hostPort: String, message: String = "") {
- assert(hostPort.indexOf(':') != -1, message)
- }
-
- // Used by DEBUG code: remove when all testing is done
- def logErrorWithStack(msg: String) {
- try { throw new Exception } catch { case ex: Exception => { logError(msg, ex) } }
- }
-
- // Typically, this will be on the order of the number of nodes in the cluster.
- // If not, we should change it to an LRU cache or something similar.
- private val hostPortParseResults = new ConcurrentHashMap[String, (String, Int)]()
-
- def parseHostPort(hostPort: String): (String, Int) = {
- {
- // Check cache first.
- var cached = hostPortParseResults.get(hostPort)
- if (cached != null) return cached
- }
-
- val indx: Int = hostPort.lastIndexOf(':')
- // This is potentially broken when dealing with IPv6 addresses, for example, but Hadoop
- // does not support IPv6 right now.
- // For now, we assume that if a port is present it is valid; we do not check that it is an int > 0.
- if (-1 == indx) {
- val retval = (hostPort, 0)
- hostPortParseResults.put(hostPort, retval)
- return retval
- }
-
- val retval = (hostPort.substring(0, indx).trim(), hostPort.substring(indx + 1).trim().toInt)
- hostPortParseResults.putIfAbsent(hostPort, retval)
- hostPortParseResults.get(hostPort)
- }
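Given the last-colon split above, the behavior looks roughly like this (host names are illustrative):

    Utils.parseHostPort("worker1:5051")   // ("worker1", 5051)
    Utils.parseHostPort("worker1")        // ("worker1", 0) -- a missing port maps to 0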
-
- private[spark] val daemonThreadFactory: ThreadFactory =
- new ThreadFactoryBuilder().setDaemon(true).build()
-
- /**
- * Wrapper over newCachedThreadPool.
- */
- def newDaemonCachedThreadPool(): ThreadPoolExecutor =
- Executors.newCachedThreadPool(daemonThreadFactory).asInstanceOf[ThreadPoolExecutor]
-
- /**
- * Return a string reporting how much time has elapsed since startTimeMs. The parameter and
- * the result are both in milliseconds.
- */
- def getUsedTimeMs(startTimeMs: Long): String = {
- return " " + (System.currentTimeMillis - startTimeMs) + " ms"
- }
-
- /**
- * Wrapper over newFixedThreadPool.
- */
- def newDaemonFixedThreadPool(nThreads: Int): ThreadPoolExecutor =
- Executors.newFixedThreadPool(nThreads, daemonThreadFactory).asInstanceOf[ThreadPoolExecutor]
-
- /**
- * Delete a file or directory and its contents recursively.
- */
- def deleteRecursively(file: File) {
- if (file.isDirectory) {
- for (child <- file.listFiles()) {
- deleteRecursively(child)
- }
- }
- if (!file.delete()) {
- throw new IOException("Failed to delete: " + file)
- }
- }
-
- /**
- * Convert a Java memory parameter passed to -Xmx (such as 300m or 1g) to a number of megabytes.
- * This is used to figure out how much memory to claim from Mesos based on the SPARK_MEM
- * environment variable.
- */
- def memoryStringToMb(str: String): Int = {
- val lower = str.toLowerCase
- if (lower.endsWith("k")) {
- (lower.substring(0, lower.length-1).toLong / 1024).toInt
- } else if (lower.endsWith("m")) {
- lower.substring(0, lower.length-1).toInt
- } else if (lower.endsWith("g")) {
- lower.substring(0, lower.length-1).toInt * 1024
- } else if (lower.endsWith("t")) {
- lower.substring(0, lower.length-1).toInt * 1024 * 1024
- } else {// no suffix, so it's just a number in bytes
- (lower.toLong / 1024 / 1024).toInt
- }
- }
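A few conversions implied by the branches above (illustrative only):

    Utils.memoryStringToMb("300m")   // 300
    Utils.memoryStringToMb("1g")     // 1024
    Utils.memoryStringToMb("512k")   // 0 -- integer division rounds sub-megabyte values down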
-
- /**
- * Convert a quantity in bytes to a human-readable string such as "4.0 MB".
- */
- def bytesToString(size: Long): String = {
- val TB = 1L << 40
- val GB = 1L << 30
- val MB = 1L << 20
- val KB = 1L << 10
-
- val (value, unit) = {
- if (size >= 2*TB) {
- (size.asInstanceOf[Double] / TB, "TB")
- } else if (size >= 2*GB) {
- (size.asInstanceOf[Double] / GB, "GB")
- } else if (size >= 2*MB) {
- (size.asInstanceOf[Double] / MB, "MB")
- } else if (size >= 2*KB) {
- (size.asInstanceOf[Double] / KB, "KB")
- } else {
- (size.asInstanceOf[Double], "B")
- }
- }
- "%.1f %s".formatLocal(Locale.US, value, unit)
- }
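Note the 2x threshold per unit: a value is only promoted once it reaches two of the next unit. For example (illustrative only):

    Utils.bytesToString(4L * 1024 * 1024)   // "4.0 MB"
    Utils.bytesToString(1536L * 1024)       // "1536.0 KB" -- still below the 2 MB threshold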
-
- /**
- * Returns a human-readable string representing a duration such as "35 ms"
- */
- def msDurationToString(ms: Long): String = {
- val second = 1000
- val minute = 60 * second
- val hour = 60 * minute
-
- ms match {
- case t if t < second =>
- "%d ms".format(t)
- case t if t < minute =>
- "%.1f s".format(t.toFloat / second)
- case t if t < hour =>
- "%.1f m".format(t.toFloat / minute)
- case t =>
- "%.2f h".format(t.toFloat / hour)
- }
- }
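Illustrative outputs from the match above:

    Utils.msDurationToString(35)        // "35 ms"
    Utils.msDurationToString(90000)     // "1.5 m"
    Utils.msDurationToString(5400000)   // "1.50 h"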
-
- /**
- * Convert a quantity in megabytes to a human-readable string such as "4.0 MB".
- */
- def megabytesToString(megabytes: Long): String = {
- bytesToString(megabytes * 1024L * 1024L)
- }
-
- /**
- * Execute a command in the given working directory, throwing an exception if it completes
- * with an exit code other than 0.
- */
- def execute(command: Seq[String], workingDir: File) {
- val process = new ProcessBuilder(command: _*)
- .directory(workingDir)
- .redirectErrorStream(true)
- .start()
- new Thread("read stdout for " + command(0)) {
- override def run() {
- for (line <- Source.fromInputStream(process.getInputStream).getLines) {
- System.err.println(line)
- }
- }
- }.start()
- val exitCode = process.waitFor()
- if (exitCode != 0) {
- throw new SparkException("Process " + command + " exited with code " + exitCode)
- }
- }
-
- /**
- * Execute a command in the current working directory, throwing an exception if it completes
- * with an exit code other than 0.
- */
- def execute(command: Seq[String]) {
- execute(command, new File("."))
- }
-
- /**
- * Execute a command and get its output, throwing an exception if it yields a code other than 0.
- */
- def executeAndGetOutput(command: Seq[String], workingDir: File = new File("."),
- extraEnvironment: Map[String, String] = Map.empty): String = {
- val builder = new ProcessBuilder(command: _*)
- .directory(workingDir)
- val environment = builder.environment()
- for ((key, value) <- extraEnvironment) {
- environment.put(key, value)
- }
- val process = builder.start()
- new Thread("read stderr for " + command(0)) {
- override def run() {
- for (line <- Source.fromInputStream(process.getErrorStream).getLines) {
- System.err.println(line)
- }
- }
- }.start()
- val output = new StringBuffer
- val stdoutThread = new Thread("read stdout for " + command(0)) {
- override def run() {
- for (line <- Source.fromInputStream(process.getInputStream).getLines) {
- output.append(line)
- }
- }
- }
- stdoutThread.start()
- val exitCode = process.waitFor()
- stdoutThread.join() // Wait for it to finish reading output
- if (exitCode != 0) {
- throw new SparkException("Process " + command + " exited with code " + exitCode)
- }
- output.toString
- }
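A hedged usage sketch; the command is arbitrary, and only stdout is returned (stderr is echoed to this process's stderr by the reader thread above):

    val hostname = Utils.executeAndGetOutput(Seq("hostname"))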
-
- /**
- * A regular expression to match classes of the "core" Spark API that we want to skip when
- * finding the call site of a method.
- */
- private val SPARK_CLASS_REGEX = """^spark(\.api\.java)?(\.rdd)?\.[A-Z]""".r
-
- private[spark] class CallSiteInfo(val lastSparkMethod: String, val firstUserFile: String,
- val firstUserLine: Int, val firstUserClass: String)
-
- /**
- * When called inside a class in the spark package, returns the name of the user code class
- * (outside the spark package) that called into Spark, as well as which Spark method they called.
- * This is used, for example, to tell users where in their code each RDD got created.
- */
- def getCallSiteInfo: CallSiteInfo = {
- val trace = Thread.currentThread.getStackTrace().filter( el =>
- (!el.getMethodName.contains("getStackTrace")))
-
- // Keep crawling up the stack trace until we find the first function not inside of the spark
- // package. We track the last (shallowest) contiguous Spark method. This might be an RDD
- // transformation, a SparkContext function (such as parallelize), or anything else that leads
- // to instantiation of an RDD. We also track the first (deepest) user method, file, and line.
- var lastSparkMethod = "<unknown>"
- var firstUserFile = "<unknown>"
- var firstUserLine = 0
- var finished = false
- var firstUserClass = "<unknown>"
-
- for (el <- trace) {
- if (!finished) {
- if (SPARK_CLASS_REGEX.findFirstIn(el.getClassName) != None) {
- lastSparkMethod = if (el.getMethodName == "<init>") {
- // Spark method is a constructor; get its class name
- el.getClassName.substring(el.getClassName.lastIndexOf('.') + 1)
- } else {
- el.getMethodName
- }
- }
- else {
- firstUserLine = el.getLineNumber
- firstUserFile = el.getFileName
- firstUserClass = el.getClassName
- finished = true
- }
- }
- }
- new CallSiteInfo(lastSparkMethod, firstUserFile, firstUserLine, firstUserClass)
- }
-
- def formatSparkCallSite = {
- val callSiteInfo = getCallSiteInfo
- "%s at %s:%s".format(callSiteInfo.lastSparkMethod, callSiteInfo.firstUserFile,
- callSiteInfo.firstUserLine)
- }
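With hypothetical user code on the stack, the formatted call site would look something like the following (method, file, and line come from whatever getCallSiteInfo finds):

    val site = Utils.formatSparkCallSite   // e.g. "textFile at MyApp.scala:42"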
-
- /** Return a string containing part of a file from byte 'start' to 'end'. */
- def offsetBytes(path: String, start: Long, end: Long): String = {
- val file = new File(path)
- val length = file.length()
- val effectiveEnd = math.min(length, end)
- val effectiveStart = math.max(0, start)
- val buff = new Array[Byte]((effectiveEnd-effectiveStart).toInt)
- val stream = new FileInputStream(file)
-
- stream.skip(effectiveStart)
- stream.read(buff)
- stream.close()
- Source.fromBytes(buff).mkString
- }
-
- /**
- * Clone an object using a Spark serializer.
- */
- def clone[T](value: T, serializer: SerializerInstance): T = {
- serializer.deserialize[T](serializer.serialize(value))
- }
-
- /**
- * Detect whether this thread might be executing a shutdown hook. Will always return true if
- * the current thread is running a shutdown hook but may spuriously return true otherwise (e.g.
- * if System.exit was just called by a concurrent thread).
- *
- * Currently, this detects whether the JVM is shutting down by Runtime#addShutdownHook throwing
- * an IllegalStateException.
- */
- def inShutdown(): Boolean = {
- try {
- val hook = new Thread {
- override def run() {}
- }
- Runtime.getRuntime.addShutdownHook(hook)
- Runtime.getRuntime.removeShutdownHook(hook)
- } catch {
- case ise: IllegalStateException => return true
- }
- return false
- }
-
- def isSpace(c: Char): Boolean = {
- " \t\r\n".indexOf(c) != -1
- }
-
- /**
- * Split a string of potentially quoted arguments from the command line the way that a shell
- * would do it to determine arguments to a command. For example, if the string is 'a "b c" d',
- * then it would be parsed as three arguments: 'a', 'b c' and 'd'.
- */
- def splitCommandString(s: String): Seq[String] = {
- val buf = new ArrayBuffer[String]
- var inWord = false
- var inSingleQuote = false
- var inDoubleQuote = false
- var curWord = new StringBuilder
- def endWord() {
- buf += curWord.toString
- curWord.clear()
- }
- var i = 0
- while (i < s.length) {
- var nextChar = s.charAt(i)
- if (inDoubleQuote) {
- if (nextChar == '"') {
- inDoubleQuote = false
- } else if (nextChar == '\\') {
- if (i < s.length - 1) {
- // Append the next character directly, because only " and \ may be escaped in
- // double quotes after the shell's own expansion
- curWord.append(s.charAt(i + 1))
- i += 1
- }
- } else {
- curWord.append(nextChar)
- }
- } else if (inSingleQuote) {
- if (nextChar == '\'') {
- inSingleQuote = false
- } else {
- curWord.append(nextChar)
- }
- // Backslashes are not treated specially in single quotes
- } else if (nextChar == '"') {
- inWord = true
- inDoubleQuote = true
- } else if (nextChar == '\'') {
- inWord = true
- inSingleQuote = true
- } else if (!isSpace(nextChar)) {
- curWord.append(nextChar)
- inWord = true
- } else if (inWord && isSpace(nextChar)) {
- endWord()
- inWord = false
- }
- i += 1
- }
- if (inWord || inDoubleQuote || inSingleQuote) {
- endWord()
- }
- return buf
- }
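The docstring's example, spelled out with Scala string escaping (illustrative only):

    Utils.splitCommandString("a \"b c\" d")        // Seq("a", "b c", "d")
    Utils.splitCommandString("--opt=\"x y\" -v")   // Seq("--opt=x y", "-v")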
-
- /* Calculates 'x' modulo 'mod', taking the sign of x into consideration:
- * if 'x' is negative, then 'x' % 'mod' is negative too,
- * so the function returns (x % mod) + mod in that case.
- */
- def nonNegativeMod(x: Int, mod: Int): Int = {
- val rawMod = x % mod
- rawMod + (if (rawMod < 0) mod else 0)
- }
-}