Posted to commits@spark.apache.org by pw...@apache.org on 2014/01/14 07:54:22 UTC

[1/2] git commit: Adjusted visibility of various components.

Updated Branches:
  refs/heads/master 0ca0d4d65 -> 68641bce6


Adjusted visibility of various components.


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/33022d66
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/33022d66
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/33022d66

Branch: refs/heads/master
Commit: 33022d6656530ffd272ed447af543473fb8de5e9
Parents: 30328c3
Author: Reynold Xin <rx...@apache.org>
Authored: Mon Jan 13 19:58:53 2014 -0800
Committer: Reynold Xin <rx...@apache.org>
Committed: Mon Jan 13 19:58:53 2014 -0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/Accumulators.scala   |  4 +-
 .../scala/org/apache/spark/FutureAction.scala   |  8 +-
 .../apache/spark/InterruptibleIterator.scala    |  2 +-
 .../main/scala/org/apache/spark/Logging.scala   |  2 +-
 .../org/apache/spark/broadcast/Broadcast.scala  |  1 +
 .../spark/broadcast/BroadcastFactory.scala      |  2 +-
 .../apache/spark/broadcast/HttpBroadcast.scala  |  5 +-
 .../spark/broadcast/TorrentBroadcast.scala      |  6 +-
 .../scala/org/apache/spark/deploy/Client.scala  |  3 +-
 .../spark/deploy/worker/CommandUtils.scala      |  3 +-
 .../main/scala/org/apache/spark/package.scala   |  3 +
 .../scala/org/apache/spark/rdd/PipedRDD.scala   |  3 +-
 .../apache/spark/scheduler/SparkListener.scala  | 13 ++--
 .../spark/storage/BlockObjectWriter.scala       |  4 +-
 .../org/apache/spark/storage/StorageLevel.scala |  4 +
 .../apache/spark/util/CompletionIterator.scala  | 11 +--
 .../org/apache/spark/util/MetadataCleaner.scala |  8 +-
 .../spark/util/RateLimitedOutputStream.scala    | 79 --------------------
 .../util/RateLimitedOutputStreamSuite.scala     | 40 ----------
 project/SparkBuild.scala                        |  7 ++
 .../util/RateLimitedOutputStream.scala          | 79 ++++++++++++++++++++
 .../spark/streaming/util/RawTextSender.scala    | 13 ++--
 .../util/RateLimitedOutputStreamSuite.scala     | 40 ++++++++++
 23 files changed, 183 insertions(+), 157 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/33022d66/core/src/main/scala/org/apache/spark/Accumulators.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/Accumulators.scala b/core/src/main/scala/org/apache/spark/Accumulators.scala
index e89ac28..2ba871a 100644
--- a/core/src/main/scala/org/apache/spark/Accumulators.scala
+++ b/core/src/main/scala/org/apache/spark/Accumulators.scala
@@ -24,7 +24,7 @@ import scala.collection.generic.Growable
 import org.apache.spark.serializer.JavaSerializer
 
 /**
- * A datatype that can be accumulated, i.e. has an commutative and associative "add" operation,
+ * A datatype that can be accumulated, ie has an commutative and associative "add" operation,
  * but where the result type, `R`, may be different from the element type being added, `T`.
  *
  * You must define how to add data, and how to merge two of these together.  For some datatypes,
@@ -185,7 +185,7 @@ class GrowableAccumulableParam[R <% Growable[T] with TraversableOnce[T] with Ser
 }
 
 /**
- * A simpler value of [[org.apache.spark.Accumulable]] where the result type being accumulated is the same
+ * A simpler value of [[Accumulable]] where the result type being accumulated is the same
  * as the types of elements being merged.
  *
  * @param initialValue initial value of accumulator

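For context on the Accumulable/AccumulableParam pairing described above, a minimal usage
sketch against the 0.9-era API (assuming a live SparkContext named sc; illustrative, not
part of this commit). It leans on the GrowableAccumulableParam shown in this hunk:

    import scala.collection.mutable

    // R = mutable.ArrayBuffer[Int], T = Int: the result type differs from the element type
    val evens = sc.accumulableCollection(mutable.ArrayBuffer[Int]())
    sc.parallelize(1 to 10).foreach { x =>
      if (x % 2 == 0) evens += x   // "+=" adds a T into the growing R
    }
    // evens.value on the driver now holds the merged buffer
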
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/33022d66/core/src/main/scala/org/apache/spark/FutureAction.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/FutureAction.scala b/core/src/main/scala/org/apache/spark/FutureAction.scala
index c6b4ac5..d7d1028 100644
--- a/core/src/main/scala/org/apache/spark/FutureAction.scala
+++ b/core/src/main/scala/org/apache/spark/FutureAction.scala
@@ -27,8 +27,8 @@ import org.apache.spark.rdd.RDD
 
 
 /**
- * A future for the result of an action. This is an extension of the Scala Future interface to
- * support cancellation.
+ * A future for the result of an action to support cancellation. This is an extension of the
+ * Scala Future interface to support cancellation.
  */
 trait FutureAction[T] extends Future[T] {
   // Note that we redefine methods of the Future trait here explicitly so we can specify a different
@@ -86,7 +86,7 @@ trait FutureAction[T] extends Future[T] {
 
 
 /**
- * The future holding the result of an action that triggers a single job. Examples include
+ * A [[FutureAction]] holding the result of an action that triggers a single job. Examples include
  * count, collect, reduce.
  */
 class SimpleFutureAction[T] private[spark](jobWaiter: JobWaiter[_], resultFunc: => T)
@@ -150,7 +150,7 @@ class SimpleFutureAction[T] private[spark](jobWaiter: JobWaiter[_], resultFunc:
 
 
 /**
- * A FutureAction for actions that could trigger multiple Spark jobs. Examples include take,
+ * A [[FutureAction]] for actions that could trigger multiple Spark jobs. Examples include take,
  * takeSample. Cancellation works by setting the cancelled flag to true and interrupting the
  * action thread if it is being blocked by a job.
  */

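A minimal sketch of the cancellation flow this trait exposes (assuming a SparkContext
named sc; illustrative, not from this commit):

    import org.apache.spark.SparkContext._   // brings rddToAsyncRDDActions into scope

    val f = sc.parallelize(1 to 1000000).countAsync()   // a SimpleFutureAction[Long]
    // ... later, if the result is no longer needed:
    f.cancel()
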
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/33022d66/core/src/main/scala/org/apache/spark/InterruptibleIterator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/InterruptibleIterator.scala b/core/src/main/scala/org/apache/spark/InterruptibleIterator.scala
index 56e0b8d..9b1601d 100644
--- a/core/src/main/scala/org/apache/spark/InterruptibleIterator.scala
+++ b/core/src/main/scala/org/apache/spark/InterruptibleIterator.scala
@@ -19,7 +19,7 @@ package org.apache.spark
 
 /**
  * An iterator that wraps around an existing iterator to provide task killing functionality.
- * It works by checking the interrupted flag in TaskContext.
+ * It works by checking the interrupted flag in [[TaskContext]].
  */
 class InterruptibleIterator[+T](val context: TaskContext, val delegate: Iterator[T])
   extends Iterator[T] {

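The mechanism is easy to picture; paraphrased (not the verbatim source), the wrapper
behaves roughly like:

    import org.apache.spark.TaskContext

    class InterruptibleIterator[+T](context: TaskContext, delegate: Iterator[T])
      extends Iterator[T] {
      def hasNext: Boolean = {
        // TaskContext.interrupted is flipped when the task is killed
        if (context.interrupted) throw new InterruptedException("task interrupted")
        delegate.hasNext
      }
      def next(): T = delegate.next()
    }
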
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/33022d66/core/src/main/scala/org/apache/spark/Logging.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/Logging.scala b/core/src/main/scala/org/apache/spark/Logging.scala
index 9063cae..b749e54 100644
--- a/core/src/main/scala/org/apache/spark/Logging.scala
+++ b/core/src/main/scala/org/apache/spark/Logging.scala
@@ -122,7 +122,7 @@ trait Logging {
   }
 }
 
-object Logging {
+private object Logging {
   @volatile private var initialized = false
   val initLock = new Object()
 }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/33022d66/core/src/main/scala/org/apache/spark/broadcast/Broadcast.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/broadcast/Broadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/Broadcast.scala
index 0fc478a..6bfe2cb 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/Broadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/Broadcast.scala
@@ -22,6 +22,7 @@ import java.util.concurrent.atomic.AtomicLong
 
 import org.apache.spark._
 
+private[spark]
 abstract class Broadcast[T](private[spark] val id: Long) extends Serializable {
   def value: T
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/33022d66/core/src/main/scala/org/apache/spark/broadcast/BroadcastFactory.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/broadcast/BroadcastFactory.scala b/core/src/main/scala/org/apache/spark/broadcast/BroadcastFactory.scala
index fb161ce..940e5ab 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/BroadcastFactory.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/BroadcastFactory.scala
@@ -25,7 +25,7 @@ import org.apache.spark.SparkConf
  * BroadcastFactory implementation to instantiate a particular broadcast for the
  * entire Spark job.
  */
-private[spark] trait BroadcastFactory {
+trait BroadcastFactory {
   def initialize(isDriver: Boolean, conf: SparkConf): Unit
   def newBroadcast[T](value: T, isLocal: Boolean, id: Long): Broadcast[T]
   def stop(): Unit

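Opening up this trait means third parties can plug in their own broadcast mechanism.
A skeletal sketch (MyBroadcast is a hypothetical Broadcast[T] subclass, not a real class):

    import org.apache.spark.SparkConf
    import org.apache.spark.broadcast.{Broadcast, BroadcastFactory}

    class MyBroadcastFactory extends BroadcastFactory {
      def initialize(isDriver: Boolean, conf: SparkConf) { /* set up shared state */ }
      def newBroadcast[T](value: T, isLocal: Boolean, id: Long): Broadcast[T] =
        new MyBroadcast[T](value, isLocal, id)   // hypothetical implementation
      def stop() { /* tear down */ }
    }
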
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/33022d66/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
index 0eacda3..39ee0db 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
@@ -63,7 +63,10 @@ private[spark] class HttpBroadcast[T](@transient var value_ : T, isLocal: Boolea
   }
 }
 
-private[spark] class HttpBroadcastFactory extends BroadcastFactory {
+/**
+ * A [[BroadcastFactory]] implementation that uses a HTTP server as the broadcast medium.
+ */
+class HttpBroadcastFactory extends BroadcastFactory {
   def initialize(isDriver: Boolean, conf: SparkConf) { HttpBroadcast.initialize(isDriver, conf) }
 
   def newBroadcast[T](value_ : T, isLocal: Boolean, id: Long) =

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/33022d66/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
index 1d295c6..d351dfc 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
@@ -236,8 +236,10 @@ private[spark] case class TorrentInfo(
   @transient var hasBlocks = 0
 }
 
-private[spark] class TorrentBroadcastFactory
-  extends BroadcastFactory {
+/**
+ * A [[BroadcastFactory]] that creates a torrent-based implementation of broadcast.
+ */
+class TorrentBroadcastFactory extends BroadcastFactory {
 
   def initialize(isDriver: Boolean, conf: SparkConf) { TorrentBroadcast.initialize(isDriver, conf) }
 

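With both factories now public, either can be selected through configuration; a sketch
using the 0.9-era spark.broadcast.factory key:

    import org.apache.spark.SparkConf

    val conf = new SparkConf().set("spark.broadcast.factory",
      "org.apache.spark.broadcast.TorrentBroadcastFactory")  // or ...HttpBroadcastFactory
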
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/33022d66/core/src/main/scala/org/apache/spark/deploy/Client.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/Client.scala b/core/src/main/scala/org/apache/spark/deploy/Client.scala
index e133893..9987e23 100644
--- a/core/src/main/scala/org/apache/spark/deploy/Client.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/Client.scala
@@ -29,13 +29,12 @@ import org.apache.spark.{Logging, SparkConf}
 import org.apache.spark.deploy.DeployMessages._
 import org.apache.spark.deploy.master.{DriverState, Master}
 import org.apache.spark.util.{AkkaUtils, Utils}
-import akka.actor.Actor.emptyBehavior
 import akka.remote.{AssociationErrorEvent, DisassociatedEvent, RemotingLifecycleEvent}
 
 /**
  * Proxy that relays messages to the driver.
  */
-class ClientActor(driverArgs: ClientArguments, conf: SparkConf) extends Actor with Logging {
+private class ClientActor(driverArgs: ClientArguments, conf: SparkConf) extends Actor with Logging {
   var masterActor: ActorSelection = _
   val timeout = AkkaUtils.askTimeout(conf)
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/33022d66/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala b/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala
index 7507bf8..cf6a233 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala
@@ -10,8 +10,9 @@ import org.apache.spark.util.Utils
 /**
  ** Utilities for running commands with the spark classpath.
  */
+private[spark]
 object CommandUtils extends Logging {
-  private[spark] def buildCommandSeq(command: Command, memory: Int, sparkHome: String): Seq[String] = {
+  def buildCommandSeq(command: Command, memory: Int, sparkHome: String): Seq[String] = {
     val runner = getEnv("JAVA_HOME", command).map(_ + "/bin/java").getOrElse("java")
 
     // SPARK-698: do not call the run.cmd script, as process.destroy()

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/33022d66/core/src/main/scala/org/apache/spark/package.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/package.scala b/core/src/main/scala/org/apache/spark/package.scala
index 70a5a8c..2625a7f 100644
--- a/core/src/main/scala/org/apache/spark/package.scala
+++ b/core/src/main/scala/org/apache/spark/package.scala
@@ -29,6 +29,9 @@ package org.apache
  * be saved as SequenceFiles. These operations are automatically available on any RDD of the right
  * type (e.g. RDD[(Int, Int)] through implicit conversions when you
  * `import org.apache.spark.SparkContext._`.
+ *
+ * Java programmers should reference the [[spark.api.java]] package
+ * for Spark programming APIs in Java.
  */
 package object spark {
   // For package docs only

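As an aside, a sketch of what that SparkContext._ import unlocks (assuming a SparkContext
named sc; illustrative, not from this commit):

    import org.apache.spark.SparkContext._   // e.g. rddToPairRDDFunctions

    val pairs = sc.parallelize(Seq((1, "a"), (1, "b"), (2, "c")))
    val grouped = pairs.groupByKey()   // available only via the implicit conversion
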
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/33022d66/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala
index d4f396a..8ef919c 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala
@@ -27,7 +27,6 @@ import scala.io.Source
 import scala.reflect.ClassTag
 
 import org.apache.spark.{SparkEnv, Partition, TaskContext}
-import org.apache.spark.broadcast.Broadcast
 
 
 /**
@@ -113,7 +112,7 @@ class PipedRDD[T: ClassTag](
   }
 }
 
-object PipedRDD {
+private object PipedRDD {
   // Split a string into words using a standard StringTokenizer
   def tokenize(command: String): Seq[String] = {
     val buf = new ArrayBuffer[String]

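PipedRDD.tokenize only splits a command string on whitespace, so hiding it does not affect
the public entry point; a usage sketch (assuming a SparkContext named sc):

    val piped = sc.parallelize(Seq("hello", "world")).pipe("cat")
    // "cat" is tokenized as above, then each partition is fed through the process
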
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/33022d66/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
index 55a40a9..d8e97c3 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
@@ -19,7 +19,7 @@ package org.apache.spark.scheduler
 
 import java.util.Properties
 import org.apache.spark.util.{Utils, Distribution}
-import org.apache.spark.{Logging, SparkContext, TaskEndReason}
+import org.apache.spark.{Logging, TaskEndReason}
 import org.apache.spark.executor.TaskMetrics
 
 sealed trait SparkListenerEvents
@@ -27,7 +27,7 @@ sealed trait SparkListenerEvents
 case class SparkListenerStageSubmitted(stage: StageInfo, properties: Properties)
      extends SparkListenerEvents
 
-case class SparkListenerStageCompleted(val stage: StageInfo) extends SparkListenerEvents
+case class SparkListenerStageCompleted(stage: StageInfo) extends SparkListenerEvents
 
 case class SparkListenerTaskStart(task: Task[_], taskInfo: TaskInfo) extends SparkListenerEvents
 
@@ -46,6 +46,9 @@ case class SparkListenerJobEnd(job: ActiveJob, jobResult: JobResult)
 /** An event used in the listener to shutdown the listener daemon thread. */
 private[scheduler] case object SparkListenerShutdown extends SparkListenerEvents
 
+/**
+ * Interface for listening to events from the Spark scheduler.
+ */
 trait SparkListener {
   /**
    * Called when a stage is completed, with information on the completed stage
@@ -115,7 +118,7 @@ class StatsReportListener extends SparkListener with Logging {
 
 }
 
-object StatsReportListener extends Logging {
+private[spark] object StatsReportListener extends Logging {
 
   //for profiling, the extremes are more interesting
   val percentiles = Array[Int](0,5,10,25,50,75,90,95,100)
@@ -202,9 +205,9 @@ object StatsReportListener extends Logging {
   }
 }
 
+private case class RuntimePercentage(executorPct: Double, fetchPct: Option[Double], other: Double)
 
-case class RuntimePercentage(executorPct: Double, fetchPct: Option[Double], other: Double)
-object RuntimePercentage {
+private object RuntimePercentage {
   def apply(totalTime: Long, metrics: TaskMetrics): RuntimePercentage = {
     val denom = totalTime.toDouble
     val fetchTime = metrics.shuffleReadMetrics.map{_.fetchWaitTime}

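A sketch of a user-defined listener against the now-documented trait (method and event
names per the 0.9-era API shown in this diff; sc is an assumed SparkContext):

    import org.apache.spark.scheduler.{SparkListener, SparkListenerStageCompleted}

    class MyListener extends SparkListener {
      override def onStageCompleted(stageCompleted: SparkListenerStageCompleted) {
        println("Stage completed: " + stageCompleted.stage)
      }
    }
    // registered on a live context: sc.addSparkListener(new MyListener)
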
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/33022d66/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala b/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
index 369a277..48cec4b 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
@@ -32,7 +32,7 @@ import org.apache.spark.serializer.{SerializationStream, Serializer}
  *
  * This interface does not support concurrent writes.
  */
-abstract class BlockObjectWriter(val blockId: BlockId) {
+private[spark] abstract class BlockObjectWriter(val blockId: BlockId) {
 
   def open(): BlockObjectWriter
 
@@ -69,7 +69,7 @@ abstract class BlockObjectWriter(val blockId: BlockId) {
 }
 
 /** BlockObjectWriter which writes directly to a file on disk. Appends to the given file. */
-class DiskBlockObjectWriter(
+private[spark] class DiskBlockObjectWriter(
     blockId: BlockId,
     file: File,
     serializer: Serializer,

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/33022d66/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala b/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala
index 0f84810..1b7934d 100644
--- a/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala
+++ b/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala
@@ -108,6 +108,10 @@ class StorageLevel private(
 }
 
 
+/**
+ * Various [[org.apache.spark.storage.StorageLevel]] defined and utility functions for creating
+ * new storage levels.
+ */
 object StorageLevel {
   val NONE = new StorageLevel(false, false, false)
   val DISK_ONLY = new StorageLevel(true, false, false)

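Typical use of the now-documented object (assuming an existing RDD named rdd):

    import org.apache.spark.storage.StorageLevel

    rdd.persist(StorageLevel.MEMORY_AND_DISK)
    // or a custom level: disk + memory, serialized, replicated twice
    val level = StorageLevel(true, true, false, 2)
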
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/33022d66/core/src/main/scala/org/apache/spark/util/CompletionIterator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/CompletionIterator.scala b/core/src/main/scala/org/apache/spark/util/CompletionIterator.scala
index dc15a38..fcc1ca9 100644
--- a/core/src/main/scala/org/apache/spark/util/CompletionIterator.scala
+++ b/core/src/main/scala/org/apache/spark/util/CompletionIterator.scala
@@ -18,14 +18,15 @@
 package org.apache.spark.util
 
 /**
- * Wrapper around an iterator which calls a completion method after it successfully iterates through all the elements
+ * Wrapper around an iterator which calls a completion method after it successfully iterates
+ * through all the elements.
  */
-abstract class CompletionIterator[+A, +I <: Iterator[A]](sub: I) extends Iterator[A]{
-  def next = sub.next
+private[spark] abstract class CompletionIterator[+A, +I <: Iterator[A]](sub: I) extends Iterator[A]{
+  def next() = sub.next()
   def hasNext = {
     val r = sub.hasNext
     if (!r) {
-      completion
+      completion()
     }
     r
   }
@@ -33,7 +34,7 @@ abstract class CompletionIterator[+A, +I <: Iterator[A]](sub: I) extends Iterato
   def completion()
 }
 
-object CompletionIterator {
+private[spark] object CompletionIterator {
   def apply[A, I <: Iterator[A]](sub: I, completionFunction: => Unit) : CompletionIterator[A,I] = {
     new CompletionIterator[A,I](sub) {
       def completion() = completionFunction

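Internally the companion's apply is used along these lines; a paraphrased sketch matching
the signature above:

    val it = CompletionIterator[Int, Iterator[Int]](Iterator(1, 2, 3), println("done"))
    it.foreach(println)   // prints 1, 2, 3, then "done" once the sub-iterator is exhausted
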
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/33022d66/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala b/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala
index ac07a55..b0febe9 100644
--- a/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala
+++ b/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala
@@ -18,13 +18,13 @@
 package org.apache.spark.util
 
 import java.util.{TimerTask, Timer}
-import org.apache.spark.{SparkConf, SparkContext, Logging}
+import org.apache.spark.{SparkConf, Logging}
 
 
 /**
  * Runs a timer task to periodically clean up metadata (e.g. old files or hashtable entries)
  */
-class MetadataCleaner(
+private[spark] class MetadataCleaner(
     cleanerType: MetadataCleanerType.MetadataCleanerType,
     cleanupFunc: (Long) => Unit,
     conf: SparkConf)
@@ -60,7 +60,7 @@ class MetadataCleaner(
   }
 }
 
-object MetadataCleanerType extends Enumeration {
+private[spark] object MetadataCleanerType extends Enumeration {
 
   val MAP_OUTPUT_TRACKER, SPARK_CONTEXT, HTTP_BROADCAST, DAG_SCHEDULER, RESULT_TASK,
     SHUFFLE_MAP_TASK, BLOCK_MANAGER, SHUFFLE_BLOCK_MANAGER, BROADCAST_VARS = Value
@@ -72,7 +72,7 @@ object MetadataCleanerType extends Enumeration {
 
 // TODO: This mutates a Conf to set properties right now, which is kind of ugly when used in the
 // initialization of StreamingContext. It's okay for users trying to configure stuff themselves.
-object MetadataCleaner {
+private[spark] object MetadataCleaner {
   def getDelaySeconds(conf: SparkConf) = {
     conf.getInt("spark.cleaner.ttl", -1)
   }

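The cleaners are all driven by one knob, per getDelaySeconds above:

    import org.apache.spark.SparkConf

    val conf = new SparkConf().set("spark.cleaner.ttl", "3600")  // drop metadata older than 1h
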
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/33022d66/core/src/main/scala/org/apache/spark/util/RateLimitedOutputStream.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/RateLimitedOutputStream.scala b/core/src/main/scala/org/apache/spark/util/RateLimitedOutputStream.scala
deleted file mode 100644
index 47e1b45..0000000
--- a/core/src/main/scala/org/apache/spark/util/RateLimitedOutputStream.scala
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.util
-
-import scala.annotation.tailrec
-
-import java.io.OutputStream
-import java.util.concurrent.TimeUnit._
-
-class RateLimitedOutputStream(out: OutputStream, bytesPerSec: Int) extends OutputStream {
-  val SYNC_INTERVAL = NANOSECONDS.convert(10, SECONDS)
-  val CHUNK_SIZE = 8192
-  var lastSyncTime = System.nanoTime
-  var bytesWrittenSinceSync: Long = 0
-
-  override def write(b: Int) {
-    waitToWrite(1)
-    out.write(b)
-  }
-
-  override def write(bytes: Array[Byte]) {
-    write(bytes, 0, bytes.length)
-  }
-
-  @tailrec
-  override final def write(bytes: Array[Byte], offset: Int, length: Int) {
-    val writeSize = math.min(length - offset, CHUNK_SIZE)
-    if (writeSize > 0) {
-      waitToWrite(writeSize)
-      out.write(bytes, offset, writeSize)
-      write(bytes, offset + writeSize, length)
-    }
-  }
-
-  override def flush() {
-    out.flush()
-  }
-
-  override def close() {
-    out.close()
-  }
-
-  @tailrec
-  private def waitToWrite(numBytes: Int) {
-    val now = System.nanoTime
-    val elapsedSecs = SECONDS.convert(math.max(now - lastSyncTime, 1), NANOSECONDS)
-    val rate = bytesWrittenSinceSync.toDouble / elapsedSecs
-    if (rate < bytesPerSec) {
-      // It's okay to write; just update some variables and return
-      bytesWrittenSinceSync += numBytes
-      if (now > lastSyncTime + SYNC_INTERVAL) {
-        // Sync interval has passed; let's resync
-        lastSyncTime = now
-        bytesWrittenSinceSync = numBytes
-      }
-    } else {
-      // Calculate how much time we should sleep to bring ourselves to the desired rate.
-      // Based on throttler in Kafka (https://github.com/kafka-dev/kafka/blob/master/core/src/main/scala/kafka/utils/Throttler.scala)
-      val sleepTime = MILLISECONDS.convert((bytesWrittenSinceSync / bytesPerSec - elapsedSecs), SECONDS)
-      if (sleepTime > 0) Thread.sleep(sleepTime)
-      waitToWrite(numBytes)
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/33022d66/core/src/test/scala/org/apache/spark/util/RateLimitedOutputStreamSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/RateLimitedOutputStreamSuite.scala b/core/src/test/scala/org/apache/spark/util/RateLimitedOutputStreamSuite.scala
deleted file mode 100644
index a9dd0b1..0000000
--- a/core/src/test/scala/org/apache/spark/util/RateLimitedOutputStreamSuite.scala
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.util
-
-import org.scalatest.FunSuite
-import java.io.ByteArrayOutputStream
-import java.util.concurrent.TimeUnit._
-
-class RateLimitedOutputStreamSuite extends FunSuite {
-
-  private def benchmark[U](f: => U): Long = {
-    val start = System.nanoTime
-    f
-    System.nanoTime - start
-  }
-
-  test("write") {
-    val underlying = new ByteArrayOutputStream
-    val data = "X" * 41000
-    val stream = new RateLimitedOutputStream(underlying, 10000)
-    val elapsedNs = benchmark { stream.write(data.getBytes("UTF-8")) }
-    assert(SECONDS.convert(elapsedNs, NANOSECONDS) == 4)
-    assert(underlying.toString("UTF-8") == data)
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/33022d66/project/SparkBuild.scala
----------------------------------------------------------------------
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index c8b5f09..d4e06dd 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -136,6 +136,13 @@ object SparkBuild extends Build {
     javaOptions += "-Xmx3g",
     // Show full stack trace and duration in test cases.
     testOptions in Test += Tests.Argument("-oDF"),
+    // Remove certain packages from Scaladoc
+    scalacOptions in (Compile,doc) := Seq("-skip-packages", Seq(
+      "akka",
+      "org.apache.spark.network",
+      "org.apache.spark.deploy",
+      "org.apache.spark.util.collection"
+      ).mkString(":")),
 
     // Only allow one test at a time, even across projects, since they run in the same JVM
     concurrentRestrictions in Global += Tags.limit(Tags.Test, 1),

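For reference, the mkString(":") above expands to a single option, -skip-packages
akka:org.apache.spark.network:org.apache.spark.deploy:org.apache.spark.util.collection,
so those packages are omitted from the published Scaladoc.
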
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/33022d66/streaming/src/main/scala/org/apache/spark/streaming/util/RateLimitedOutputStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/RateLimitedOutputStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/RateLimitedOutputStream.scala
new file mode 100644
index 0000000..b9c0596
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/RateLimitedOutputStream.scala
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.util
+
+import scala.annotation.tailrec
+
+import java.io.OutputStream
+import java.util.concurrent.TimeUnit._
+
+class RateLimitedOutputStream(out: OutputStream, bytesPerSec: Int) extends OutputStream {
+  val SYNC_INTERVAL = NANOSECONDS.convert(10, SECONDS)
+  val CHUNK_SIZE = 8192
+  var lastSyncTime = System.nanoTime
+  var bytesWrittenSinceSync: Long = 0
+
+  override def write(b: Int) {
+    waitToWrite(1)
+    out.write(b)
+  }
+
+  override def write(bytes: Array[Byte]) {
+    write(bytes, 0, bytes.length)
+  }
+
+  @tailrec
+  override final def write(bytes: Array[Byte], offset: Int, length: Int) {
+    val writeSize = math.min(length - offset, CHUNK_SIZE)
+    if (writeSize > 0) {
+      waitToWrite(writeSize)
+      out.write(bytes, offset, writeSize)
+      write(bytes, offset + writeSize, length)
+    }
+  }
+
+  override def flush() {
+    out.flush()
+  }
+
+  override def close() {
+    out.close()
+  }
+
+  @tailrec
+  private def waitToWrite(numBytes: Int) {
+    val now = System.nanoTime
+    val elapsedSecs = SECONDS.convert(math.max(now - lastSyncTime, 1), NANOSECONDS)
+    val rate = bytesWrittenSinceSync.toDouble / elapsedSecs
+    if (rate < bytesPerSec) {
+      // It's okay to write; just update some variables and return
+      bytesWrittenSinceSync += numBytes
+      if (now > lastSyncTime + SYNC_INTERVAL) {
+        // Sync interval has passed; let's resync
+        lastSyncTime = now
+        bytesWrittenSinceSync = numBytes
+      }
+    } else {
+      // Calculate how much time we should sleep to bring ourselves to the desired rate.
+      // Based on throttler in Kafka (https://github.com/kafka-dev/kafka/blob/master/core/src/main/scala/kafka/utils/Throttler.scala)
+      val sleepTime = MILLISECONDS.convert((bytesWrittenSinceSync / bytesPerSec - elapsedSecs), SECONDS)
+      if (sleepTime > 0) Thread.sleep(sleepTime)
+      waitToWrite(numBytes)
+    }
+  }
+}

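A minimal usage sketch of the relocated class (illustrative, not from this commit):

    import java.io.ByteArrayOutputStream

    val out = new RateLimitedOutputStream(new ByteArrayOutputStream, 10000)  // 10000 bytes/sec
    out.write(("X" * 25000).getBytes("UTF-8"))  // blocks so the average rate stays near the cap
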
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/33022d66/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextSender.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextSender.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextSender.scala
index 6585d49..463617a 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextSender.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextSender.scala
@@ -17,14 +17,17 @@
 
 package org.apache.spark.streaming.util
 
-import java.nio.ByteBuffer
-import org.apache.spark.util.{RateLimitedOutputStream, IntParam}
+import java.io.IOException
 import java.net.ServerSocket
-import org.apache.spark.{SparkConf, Logging}
-import it.unimi.dsi.fastutil.io.FastByteArrayOutputStream
+import java.nio.ByteBuffer
+
 import scala.io.Source
-import java.io.IOException
+
+import it.unimi.dsi.fastutil.io.FastByteArrayOutputStream
+
+import org.apache.spark.{SparkConf, Logging}
 import org.apache.spark.serializer.KryoSerializer
+import org.apache.spark.util.IntParam
 
 /**
  * A helper program that sends blocks of Kryo-serialized text strings out on a socket at a

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/33022d66/streaming/src/test/scala/org/apache/spark/streaming/util/RateLimitedOutputStreamSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/util/RateLimitedOutputStreamSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/util/RateLimitedOutputStreamSuite.scala
new file mode 100644
index 0000000..15f13d5
--- /dev/null
+++ b/streaming/src/test/scala/org/apache/spark/streaming/util/RateLimitedOutputStreamSuite.scala
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.util
+
+import org.scalatest.FunSuite
+import java.io.ByteArrayOutputStream
+import java.util.concurrent.TimeUnit._
+
+class RateLimitedOutputStreamSuite extends FunSuite {
+
+  private def benchmark[U](f: => U): Long = {
+    val start = System.nanoTime
+    f
+    System.nanoTime - start
+  }
+
+  test("write") {
+    val underlying = new ByteArrayOutputStream
+    val data = "X" * 41000
+    val stream = new RateLimitedOutputStream(underlying, 10000)
+    val elapsedNs = benchmark { stream.write(data.getBytes("UTF-8")) }
+    assert(SECONDS.convert(elapsedNs, NANOSECONDS) == 4)
+    assert(underlying.toString("UTF-8") == data)
+  }
+}

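The expected value in that assertion is simple arithmetic: 41000 bytes at a cap of 10000
bytes/sec takes about 4.1 seconds, and SECONDS.convert truncates toward zero, so the
elapsed time converts to exactly 4.
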

[2/2] git commit: Merge pull request #413 from rxin/scaladoc

Posted by pw...@apache.org.
Merge pull request #413 from rxin/scaladoc

Adjusted visibility of various components and documentation for 0.9.0 release.


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/68641bce
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/68641bce
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/68641bce

Branch: refs/heads/master
Commit: 68641bce612d9a4c4adc34d4dd9cab6735f28335
Parents: 0ca0d4d 33022d6
Author: Patrick Wendell <pw...@gmail.com>
Authored: Mon Jan 13 22:54:13 2014 -0800
Committer: Patrick Wendell <pw...@gmail.com>
Committed: Mon Jan 13 22:54:13 2014 -0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/Accumulators.scala   |  4 +-
 .../scala/org/apache/spark/FutureAction.scala   |  8 +-
 .../apache/spark/InterruptibleIterator.scala    |  2 +-
 .../main/scala/org/apache/spark/Logging.scala   |  2 +-
 .../org/apache/spark/broadcast/Broadcast.scala  |  1 +
 .../spark/broadcast/BroadcastFactory.scala      |  2 +-
 .../apache/spark/broadcast/HttpBroadcast.scala  |  5 +-
 .../spark/broadcast/TorrentBroadcast.scala      |  6 +-
 .../scala/org/apache/spark/deploy/Client.scala  |  3 +-
 .../spark/deploy/worker/CommandUtils.scala      |  3 +-
 .../main/scala/org/apache/spark/package.scala   |  3 +
 .../scala/org/apache/spark/rdd/PipedRDD.scala   |  3 +-
 .../apache/spark/scheduler/SparkListener.scala  | 13 ++--
 .../spark/storage/BlockObjectWriter.scala       |  4 +-
 .../org/apache/spark/storage/StorageLevel.scala |  4 +
 .../apache/spark/util/CompletionIterator.scala  | 11 +--
 .../org/apache/spark/util/MetadataCleaner.scala |  8 +-
 .../spark/util/RateLimitedOutputStream.scala    | 79 --------------------
 .../util/RateLimitedOutputStreamSuite.scala     | 40 ----------
 project/SparkBuild.scala                        |  7 ++
 .../util/RateLimitedOutputStream.scala          | 79 ++++++++++++++++++++
 .../spark/streaming/util/RawTextSender.scala    | 13 ++--
 .../util/RateLimitedOutputStreamSuite.scala     | 40 ++++++++++
 23 files changed, 183 insertions(+), 157 deletions(-)
----------------------------------------------------------------------