Posted to commits@spark.apache.org by pw...@apache.org on 2014/01/14 07:54:22 UTC
[1/2] git commit: Adjusted visibility of various components.
Updated Branches:
refs/heads/master 0ca0d4d65 -> 68641bce6
Adjusted visibility of various components.
Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/33022d66
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/33022d66
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/33022d66
Branch: refs/heads/master
Commit: 33022d6656530ffd272ed447af543473fb8de5e9
Parents: 30328c3
Author: Reynold Xin <rx...@apache.org>
Authored: Mon Jan 13 19:58:53 2014 -0800
Committer: Reynold Xin <rx...@apache.org>
Committed: Mon Jan 13 19:58:53 2014 -0800
----------------------------------------------------------------------
.../scala/org/apache/spark/Accumulators.scala | 4 +-
.../scala/org/apache/spark/FutureAction.scala | 8 +-
.../apache/spark/InterruptibleIterator.scala | 2 +-
.../main/scala/org/apache/spark/Logging.scala | 2 +-
.../org/apache/spark/broadcast/Broadcast.scala | 1 +
.../spark/broadcast/BroadcastFactory.scala | 2 +-
.../apache/spark/broadcast/HttpBroadcast.scala | 5 +-
.../spark/broadcast/TorrentBroadcast.scala | 6 +-
.../scala/org/apache/spark/deploy/Client.scala | 3 +-
.../spark/deploy/worker/CommandUtils.scala | 3 +-
.../main/scala/org/apache/spark/package.scala | 3 +
.../scala/org/apache/spark/rdd/PipedRDD.scala | 3 +-
.../apache/spark/scheduler/SparkListener.scala | 13 ++--
.../spark/storage/BlockObjectWriter.scala | 4 +-
.../org/apache/spark/storage/StorageLevel.scala | 4 +
.../apache/spark/util/CompletionIterator.scala | 11 +--
.../org/apache/spark/util/MetadataCleaner.scala | 8 +-
.../spark/util/RateLimitedOutputStream.scala | 79 --------------------
.../util/RateLimitedOutputStreamSuite.scala | 40 ----------
project/SparkBuild.scala | 7 ++
.../util/RateLimitedOutputStream.scala | 79 ++++++++++++++++++++
.../spark/streaming/util/RawTextSender.scala | 13 ++--
.../util/RateLimitedOutputStreamSuite.scala | 40 ++++++++++
23 files changed, 183 insertions(+), 157 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/33022d66/core/src/main/scala/org/apache/spark/Accumulators.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/Accumulators.scala b/core/src/main/scala/org/apache/spark/Accumulators.scala
index e89ac28..2ba871a 100644
--- a/core/src/main/scala/org/apache/spark/Accumulators.scala
+++ b/core/src/main/scala/org/apache/spark/Accumulators.scala
@@ -24,7 +24,7 @@ import scala.collection.generic.Growable
import org.apache.spark.serializer.JavaSerializer
/**
- * A datatype that can be accumulated, i.e. has an commutative and associative "add" operation,
+ * A datatype that can be accumulated, ie has an commutative and associative "add" operation,
* but where the result type, `R`, may be different from the element type being added, `T`.
*
* You must define how to add data, and how to merge two of these together. For some datatypes,
@@ -185,7 +185,7 @@ class GrowableAccumulableParam[R <% Growable[T] with TraversableOnce[T] with Ser
}
/**
- * A simpler value of [[org.apache.spark.Accumulable]] where the result type being accumulated is the same
+ * A simpler value of [[Accumulable]] where the result type being accumulated is the same
* as the types of elements being merged.
*
* @param initialValue initial value of accumulator
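For context, the Accumulable contract described above can be sketched as follows (a minimal, illustrative AccumulableParam for the 0.9-era API; the Vector-of-Int accumulation is hypothetical):

    import org.apache.spark.AccumulableParam

    // R = Vector[Int] (result type), T = Int (element type being added).
    object VectorParam extends AccumulableParam[Vector[Int], Int] {
      // Add one element of type T into the accumulated value of type R.
      def addAccumulator(acc: Vector[Int], elem: Int): Vector[Int] = acc :+ elem
      // Merge two partial results; must be commutative and associative.
      def addInPlace(a: Vector[Int], b: Vector[Int]): Vector[Int] = a ++ b
      // Zero value from which accumulation starts.
      def zero(initial: Vector[Int]): Vector[Int] = Vector.empty[Int]
    }
    // With an existing SparkContext sc:
    //   val acc = sc.accumulable(Vector.empty[Int])(VectorParam)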
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/33022d66/core/src/main/scala/org/apache/spark/FutureAction.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/FutureAction.scala b/core/src/main/scala/org/apache/spark/FutureAction.scala
index c6b4ac5..d7d1028 100644
--- a/core/src/main/scala/org/apache/spark/FutureAction.scala
+++ b/core/src/main/scala/org/apache/spark/FutureAction.scala
@@ -27,8 +27,8 @@ import org.apache.spark.rdd.RDD
/**
- * A future for the result of an action. This is an extension of the Scala Future interface to
- * support cancellation.
+ * A future for the result of an action to support cancellation. This is an extension of the
+ * Scala Future interface to support cancellation.
*/
trait FutureAction[T] extends Future[T] {
// Note that we redefine methods of the Future trait here explicitly so we can specify a different
@@ -86,7 +86,7 @@ trait FutureAction[T] extends Future[T] {
/**
- * The future holding the result of an action that triggers a single job. Examples include
+ * A [[FutureAction]] holding the result of an action that triggers a single job. Examples include
* count, collect, reduce.
*/
class SimpleFutureAction[T] private[spark](jobWaiter: JobWaiter[_], resultFunc: => T)
@@ -150,7 +150,7 @@ class SimpleFutureAction[T] private[spark](jobWaiter: JobWaiter[_], resultFunc:
/**
- * A FutureAction for actions that could trigger multiple Spark jobs. Examples include take,
+ * A [[FutureAction]] for actions that could trigger multiple Spark jobs. Examples include take,
* takeSample. Cancellation works by setting the cancelled flag to true and interrupting the
* action thread if it is being blocked by a job.
*/
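To illustrate the cancellation support mentioned above, a usage sketch (assumes the async actions available via the SparkContext._ implicits in this era; rdd is an existing RDD):

    import org.apache.spark.SparkContext._

    val future = rdd.countAsync()  // a FutureAction[Long] backed by a single job
    // ... later, if the result is no longer needed:
    future.cancel()                // sets the cancelled flag and interrupts the job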
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/33022d66/core/src/main/scala/org/apache/spark/InterruptibleIterator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/InterruptibleIterator.scala b/core/src/main/scala/org/apache/spark/InterruptibleIterator.scala
index 56e0b8d..9b1601d 100644
--- a/core/src/main/scala/org/apache/spark/InterruptibleIterator.scala
+++ b/core/src/main/scala/org/apache/spark/InterruptibleIterator.scala
@@ -19,7 +19,7 @@ package org.apache.spark
/**
* An iterator that wraps around an existing iterator to provide task killing functionality.
- * It works by checking the interrupted flag in TaskContext.
+ * It works by checking the interrupted flag in [[TaskContext]].
*/
class InterruptibleIterator[+T](val context: TaskContext, val delegate: Iterator[T])
extends Iterator[T] {
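The mechanism is simple enough to sketch (a simplified illustration, not the actual Spark source; the exception type is illustrative):

    class InterruptibleIteratorSketch[T](interrupted: () => Boolean, delegate: Iterator[T])
      extends Iterator[T] {
      // Check the kill flag once per element, before delegating.
      def hasNext: Boolean = {
        if (interrupted()) throw new InterruptedException("task killed")
        delegate.hasNext
      }
      def next(): T = delegate.next()
    }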
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/33022d66/core/src/main/scala/org/apache/spark/Logging.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/Logging.scala b/core/src/main/scala/org/apache/spark/Logging.scala
index 9063cae..b749e54 100644
--- a/core/src/main/scala/org/apache/spark/Logging.scala
+++ b/core/src/main/scala/org/apache/spark/Logging.scala
@@ -122,7 +122,7 @@ trait Logging {
}
}
-object Logging {
+private object Logging {
@volatile private var initialized = false
val initLock = new Object()
}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/33022d66/core/src/main/scala/org/apache/spark/broadcast/Broadcast.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/broadcast/Broadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/Broadcast.scala
index 0fc478a..6bfe2cb 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/Broadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/Broadcast.scala
@@ -22,6 +22,7 @@ import java.util.concurrent.atomic.AtomicLong
import org.apache.spark._
+private[spark]
abstract class Broadcast[T](private[spark] val id: Long) extends Serializable {
def value: T
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/33022d66/core/src/main/scala/org/apache/spark/broadcast/BroadcastFactory.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/broadcast/BroadcastFactory.scala b/core/src/main/scala/org/apache/spark/broadcast/BroadcastFactory.scala
index fb161ce..940e5ab 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/BroadcastFactory.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/BroadcastFactory.scala
@@ -25,7 +25,7 @@ import org.apache.spark.SparkConf
* BroadcastFactory implementation to instantiate a particular broadcast for the
* entire Spark job.
*/
-private[spark] trait BroadcastFactory {
+trait BroadcastFactory {
def initialize(isDriver: Boolean, conf: SparkConf): Unit
def newBroadcast[T](value: T, isLocal: Boolean, id: Long): Broadcast[T]
def stop(): Unit
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/33022d66/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
index 0eacda3..39ee0db 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
@@ -63,7 +63,10 @@ private[spark] class HttpBroadcast[T](@transient var value_ : T, isLocal: Boolea
}
}
-private[spark] class HttpBroadcastFactory extends BroadcastFactory {
+/**
+ * A [[BroadcastFactory]] implementation that uses a HTTP server as the broadcast medium.
+ */
+class HttpBroadcastFactory extends BroadcastFactory {
def initialize(isDriver: Boolean, conf: SparkConf) { HttpBroadcast.initialize(isDriver, conf) }
def newBroadcast[T](value_ : T, isLocal: Boolean, id: Long) =
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/33022d66/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
index 1d295c6..d351dfc 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
@@ -236,8 +236,10 @@ private[spark] case class TorrentInfo(
@transient var hasBlocks = 0
}
-private[spark] class TorrentBroadcastFactory
- extends BroadcastFactory {
+/**
+ * A [[BroadcastFactory]] that creates a torrent-based implementation of broadcast.
+ */
+class TorrentBroadcastFactory extends BroadcastFactory {
def initialize(isDriver: Boolean, conf: SparkConf) { TorrentBroadcast.initialize(isDriver, conf) }
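The two factories above are chosen through configuration rather than instantiated directly; a sketch (spark.broadcast.factory is the 0.9-era setting, the app name is illustrative):

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .setAppName("broadcast-demo")
      // HttpBroadcastFactory is the default in this era; switch to the torrent one:
      .set("spark.broadcast.factory", "org.apache.spark.broadcast.TorrentBroadcastFactory")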
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/33022d66/core/src/main/scala/org/apache/spark/deploy/Client.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/Client.scala b/core/src/main/scala/org/apache/spark/deploy/Client.scala
index e133893..9987e23 100644
--- a/core/src/main/scala/org/apache/spark/deploy/Client.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/Client.scala
@@ -29,13 +29,12 @@ import org.apache.spark.{Logging, SparkConf}
import org.apache.spark.deploy.DeployMessages._
import org.apache.spark.deploy.master.{DriverState, Master}
import org.apache.spark.util.{AkkaUtils, Utils}
-import akka.actor.Actor.emptyBehavior
import akka.remote.{AssociationErrorEvent, DisassociatedEvent, RemotingLifecycleEvent}
/**
* Proxy that relays messages to the driver.
*/
-class ClientActor(driverArgs: ClientArguments, conf: SparkConf) extends Actor with Logging {
+private class ClientActor(driverArgs: ClientArguments, conf: SparkConf) extends Actor with Logging {
var masterActor: ActorSelection = _
val timeout = AkkaUtils.askTimeout(conf)
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/33022d66/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala b/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala
index 7507bf8..cf6a233 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala
@@ -10,8 +10,9 @@ import org.apache.spark.util.Utils
/**
** Utilities for running commands with the spark classpath.
*/
+private[spark]
object CommandUtils extends Logging {
- private[spark] def buildCommandSeq(command: Command, memory: Int, sparkHome: String): Seq[String] = {
+ def buildCommandSeq(command: Command, memory: Int, sparkHome: String): Seq[String] = {
val runner = getEnv("JAVA_HOME", command).map(_ + "/bin/java").getOrElse("java")
// SPARK-698: do not call the run.cmd script, as process.destroy()
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/33022d66/core/src/main/scala/org/apache/spark/package.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/package.scala b/core/src/main/scala/org/apache/spark/package.scala
index 70a5a8c..2625a7f 100644
--- a/core/src/main/scala/org/apache/spark/package.scala
+++ b/core/src/main/scala/org/apache/spark/package.scala
@@ -29,6 +29,9 @@ package org.apache
* be saved as SequenceFiles. These operations are automatically available on any RDD of the right
* type (e.g. RDD[(Int, Int)] through implicit conversions when you
* `import org.apache.spark.SparkContext._`.
+ *
+ * Java programmers should reference the [[spark.api.java]] package
+ * for Spark programming APIs in Java.
*/
package object spark {
// For package docs only
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/33022d66/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala
index d4f396a..8ef919c 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala
@@ -27,7 +27,6 @@ import scala.io.Source
import scala.reflect.ClassTag
import org.apache.spark.{SparkEnv, Partition, TaskContext}
-import org.apache.spark.broadcast.Broadcast
/**
@@ -113,7 +112,7 @@ class PipedRDD[T: ClassTag](
}
}
-object PipedRDD {
+private object PipedRDD {
// Split a string into words using a standard StringTokenizer
def tokenize(command: String): Seq[String] = {
val buf = new ArrayBuffer[String]
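Since the object is now private, tokenize is internal-only; its behavior, restated as a standalone sketch of the same StringTokenizer loop:

    import java.util.StringTokenizer
    import scala.collection.mutable.ArrayBuffer

    def tokenize(command: String): Seq[String] = {
      val buf = new ArrayBuffer[String]
      val tok = new StringTokenizer(command)
      while (tok.hasMoreElements) buf += tok.nextToken()
      buf
    }
    // tokenize("grep -i error") == Seq("grep", "-i", "error")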
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/33022d66/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
index 55a40a9..d8e97c3 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
@@ -19,7 +19,7 @@ package org.apache.spark.scheduler
import java.util.Properties
import org.apache.spark.util.{Utils, Distribution}
-import org.apache.spark.{Logging, SparkContext, TaskEndReason}
+import org.apache.spark.{Logging, TaskEndReason}
import org.apache.spark.executor.TaskMetrics
sealed trait SparkListenerEvents
@@ -27,7 +27,7 @@ sealed trait SparkListenerEvents
case class SparkListenerStageSubmitted(stage: StageInfo, properties: Properties)
extends SparkListenerEvents
-case class SparkListenerStageCompleted(val stage: StageInfo) extends SparkListenerEvents
+case class SparkListenerStageCompleted(stage: StageInfo) extends SparkListenerEvents
case class SparkListenerTaskStart(task: Task[_], taskInfo: TaskInfo) extends SparkListenerEvents
@@ -46,6 +46,9 @@ case class SparkListenerJobEnd(job: ActiveJob, jobResult: JobResult)
/** An event used in the listener to shutdown the listener daemon thread. */
private[scheduler] case object SparkListenerShutdown extends SparkListenerEvents
+/**
+ * Interface for listening to events from the Spark scheduler.
+ */
trait SparkListener {
/**
* Called when a stage is completed, with information on the completed stage
@@ -115,7 +118,7 @@ class StatsReportListener extends SparkListener with Logging {
}
-object StatsReportListener extends Logging {
+private[spark] object StatsReportListener extends Logging {
//for profiling, the extremes are more interesting
val percentiles = Array[Int](0,5,10,25,50,75,90,95,100)
@@ -202,9 +205,9 @@ object StatsReportListener extends Logging {
}
}
+private case class RuntimePercentage(executorPct: Double, fetchPct: Option[Double], other: Double)
-case class RuntimePercentage(executorPct: Double, fetchPct: Option[Double], other: Double)
-object RuntimePercentage {
+private object RuntimePercentage {
def apply(totalTime: Long, metrics: TaskMetrics): RuntimePercentage = {
val denom = totalTime.toDouble
val fetchTime = metrics.shuffleReadMetrics.map{_.fetchWaitTime}
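With SparkListener now documented as the scheduler-event interface, a minimal listener sketch (assumes the 0.9-era callback signature shown above; registration via sc.addSparkListener on an existing SparkContext):

    import org.apache.spark.scheduler.{SparkListener, SparkListenerStageCompleted}

    class StageLogger extends SparkListener {
      // Called once per completed stage with its StageInfo.
      override def onStageCompleted(stageCompleted: SparkListenerStageCompleted) {
        println("Completed: " + stageCompleted.stage)
      }
    }
    // sc.addSparkListener(new StageLogger)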
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/33022d66/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala b/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
index 369a277..48cec4b 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
@@ -32,7 +32,7 @@ import org.apache.spark.serializer.{SerializationStream, Serializer}
*
* This interface does not support concurrent writes.
*/
-abstract class BlockObjectWriter(val blockId: BlockId) {
+private[spark] abstract class BlockObjectWriter(val blockId: BlockId) {
def open(): BlockObjectWriter
@@ -69,7 +69,7 @@ abstract class BlockObjectWriter(val blockId: BlockId) {
}
/** BlockObjectWriter which writes directly to a file on disk. Appends to the given file. */
-class DiskBlockObjectWriter(
+private[spark] class DiskBlockObjectWriter(
blockId: BlockId,
file: File,
serializer: Serializer,
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/33022d66/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala b/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala
index 0f84810..1b7934d 100644
--- a/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala
+++ b/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala
@@ -108,6 +108,10 @@ class StorageLevel private(
}
+/**
+ * Various [[org.apache.spark.storage.StorageLevel]] defined and utility functions for creating
+ * new storage levels.
+ */
object StorageLevel {
val NONE = new StorageLevel(false, false, false)
val DISK_ONLY = new StorageLevel(true, false, false)
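Usage of the predefined levels and the factory functions the new scaladoc refers to (rdd is an existing RDD; the four-argument apply signature is assumed from the 0.9-era overloads):

    import org.apache.spark.storage.StorageLevel

    rdd.persist(StorageLevel.DISK_ONLY)  // a predefined level

    // A custom level: disk + memory, serialized in memory, replicated twice.
    val twoCopies = StorageLevel(true, true, false, 2)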
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/33022d66/core/src/main/scala/org/apache/spark/util/CompletionIterator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/CompletionIterator.scala b/core/src/main/scala/org/apache/spark/util/CompletionIterator.scala
index dc15a38..fcc1ca9 100644
--- a/core/src/main/scala/org/apache/spark/util/CompletionIterator.scala
+++ b/core/src/main/scala/org/apache/spark/util/CompletionIterator.scala
@@ -18,14 +18,15 @@
package org.apache.spark.util
/**
- * Wrapper around an iterator which calls a completion method after it successfully iterates through all the elements
+ * Wrapper around an iterator which calls a completion method after it successfully iterates
+ * through all the elements.
*/
-abstract class CompletionIterator[+A, +I <: Iterator[A]](sub: I) extends Iterator[A]{
- def next = sub.next
+private[spark] abstract class CompletionIterator[+A, +I <: Iterator[A]](sub: I) extends Iterator[A]{
+ def next() = sub.next()
def hasNext = {
val r = sub.hasNext
if (!r) {
- completion
+ completion()
}
r
}
@@ -33,7 +34,7 @@ abstract class CompletionIterator[+A, +I <: Iterator[A]](sub: I) extends Iterato
def completion()
}
-object CompletionIterator {
+private[spark] object CompletionIterator {
def apply[A, I <: Iterator[A]](sub: I, completionFunction: => Unit) : CompletionIterator[A,I] = {
new CompletionIterator[A,I](sub) {
def completion() = completionFunction
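A usage sketch of the companion's apply shown above: run a cleanup action exactly once, when the wrapped iterator is exhausted (internal-only now that the class is private[spark]):

    val iter = CompletionIterator[Int, Iterator[Int]](Iterator(1, 2, 3), println("done"))
    iter.foreach(println)  // prints 1, 2, 3, then "done" once hasNext first returns false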
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/33022d66/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala b/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala
index ac07a55..b0febe9 100644
--- a/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala
+++ b/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala
@@ -18,13 +18,13 @@
package org.apache.spark.util
import java.util.{TimerTask, Timer}
-import org.apache.spark.{SparkConf, SparkContext, Logging}
+import org.apache.spark.{SparkConf, Logging}
/**
* Runs a timer task to periodically clean up metadata (e.g. old files or hashtable entries)
*/
-class MetadataCleaner(
+private[spark] class MetadataCleaner(
cleanerType: MetadataCleanerType.MetadataCleanerType,
cleanupFunc: (Long) => Unit,
conf: SparkConf)
@@ -60,7 +60,7 @@ class MetadataCleaner(
}
}
-object MetadataCleanerType extends Enumeration {
+private[spark] object MetadataCleanerType extends Enumeration {
val MAP_OUTPUT_TRACKER, SPARK_CONTEXT, HTTP_BROADCAST, DAG_SCHEDULER, RESULT_TASK,
SHUFFLE_MAP_TASK, BLOCK_MANAGER, SHUFFLE_BLOCK_MANAGER, BROADCAST_VARS = Value
@@ -72,7 +72,7 @@ object MetadataCleanerType extends Enumeration {
// TODO: This mutates a Conf to set properties right now, which is kind of ugly when used in the
// initialization of StreamingContext. It's okay for users trying to configure stuff themselves.
-object MetadataCleaner {
+private[spark] object MetadataCleaner {
def getDelaySeconds(conf: SparkConf) = {
conf.getInt("spark.cleaner.ttl", -1)
}
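Per the getDelaySeconds helper above, the cleaner is driven by a single setting; for example:

    import org.apache.spark.SparkConf

    // TTL in seconds for metadata cleanup; -1 (the default) disables the cleaner.
    val conf = new SparkConf().set("spark.cleaner.ttl", "3600")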
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/33022d66/core/src/main/scala/org/apache/spark/util/RateLimitedOutputStream.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/RateLimitedOutputStream.scala b/core/src/main/scala/org/apache/spark/util/RateLimitedOutputStream.scala
deleted file mode 100644
index 47e1b45..0000000
--- a/core/src/main/scala/org/apache/spark/util/RateLimitedOutputStream.scala
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.util
-
-import scala.annotation.tailrec
-
-import java.io.OutputStream
-import java.util.concurrent.TimeUnit._
-
-class RateLimitedOutputStream(out: OutputStream, bytesPerSec: Int) extends OutputStream {
- val SYNC_INTERVAL = NANOSECONDS.convert(10, SECONDS)
- val CHUNK_SIZE = 8192
- var lastSyncTime = System.nanoTime
- var bytesWrittenSinceSync: Long = 0
-
- override def write(b: Int) {
- waitToWrite(1)
- out.write(b)
- }
-
- override def write(bytes: Array[Byte]) {
- write(bytes, 0, bytes.length)
- }
-
- @tailrec
- override final def write(bytes: Array[Byte], offset: Int, length: Int) {
- val writeSize = math.min(length - offset, CHUNK_SIZE)
- if (writeSize > 0) {
- waitToWrite(writeSize)
- out.write(bytes, offset, writeSize)
- write(bytes, offset + writeSize, length)
- }
- }
-
- override def flush() {
- out.flush()
- }
-
- override def close() {
- out.close()
- }
-
- @tailrec
- private def waitToWrite(numBytes: Int) {
- val now = System.nanoTime
- val elapsedSecs = SECONDS.convert(math.max(now - lastSyncTime, 1), NANOSECONDS)
- val rate = bytesWrittenSinceSync.toDouble / elapsedSecs
- if (rate < bytesPerSec) {
- // It's okay to write; just update some variables and return
- bytesWrittenSinceSync += numBytes
- if (now > lastSyncTime + SYNC_INTERVAL) {
- // Sync interval has passed; let's resync
- lastSyncTime = now
- bytesWrittenSinceSync = numBytes
- }
- } else {
- // Calculate how much time we should sleep to bring ourselves to the desired rate.
- // Based on throttler in Kafka (https://github.com/kafka-dev/kafka/blob/master/core/src/main/scala/kafka/utils/Throttler.scala)
- val sleepTime = MILLISECONDS.convert((bytesWrittenSinceSync / bytesPerSec - elapsedSecs), SECONDS)
- if (sleepTime > 0) Thread.sleep(sleepTime)
- waitToWrite(numBytes)
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/33022d66/core/src/test/scala/org/apache/spark/util/RateLimitedOutputStreamSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/RateLimitedOutputStreamSuite.scala b/core/src/test/scala/org/apache/spark/util/RateLimitedOutputStreamSuite.scala
deleted file mode 100644
index a9dd0b1..0000000
--- a/core/src/test/scala/org/apache/spark/util/RateLimitedOutputStreamSuite.scala
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.util
-
-import org.scalatest.FunSuite
-import java.io.ByteArrayOutputStream
-import java.util.concurrent.TimeUnit._
-
-class RateLimitedOutputStreamSuite extends FunSuite {
-
- private def benchmark[U](f: => U): Long = {
- val start = System.nanoTime
- f
- System.nanoTime - start
- }
-
- test("write") {
- val underlying = new ByteArrayOutputStream
- val data = "X" * 41000
- val stream = new RateLimitedOutputStream(underlying, 10000)
- val elapsedNs = benchmark { stream.write(data.getBytes("UTF-8")) }
- assert(SECONDS.convert(elapsedNs, NANOSECONDS) == 4)
- assert(underlying.toString("UTF-8") == data)
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/33022d66/project/SparkBuild.scala
----------------------------------------------------------------------
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index c8b5f09..d4e06dd 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -136,6 +136,13 @@ object SparkBuild extends Build {
javaOptions += "-Xmx3g",
// Show full stack trace and duration in test cases.
testOptions in Test += Tests.Argument("-oDF"),
+ // Remove certain packages from Scaladoc
+ scalacOptions in (Compile,doc) := Seq("-skip-packages", Seq(
+ "akka",
+ "org.apache.spark.network",
+ "org.apache.spark.deploy",
+ "org.apache.spark.util.collection"
+ ).mkString(":")),
// Only allow one test at a time, even across projects, since they run in the same JVM
concurrentRestrictions in Global += Tags.limit(Tags.Test, 1),
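The mkString(":") above collapses the package list into the single colon-separated argument that scaladoc's -skip-packages option expects:

    val skipped = Seq(
      "akka",
      "org.apache.spark.network",
      "org.apache.spark.deploy",
      "org.apache.spark.util.collection"
    ).mkString(":")
    // skipped == "akka:org.apache.spark.network:org.apache.spark.deploy:org.apache.spark.util.collection"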
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/33022d66/streaming/src/main/scala/org/apache/spark/streaming/util/RateLimitedOutputStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/RateLimitedOutputStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/RateLimitedOutputStream.scala
new file mode 100644
index 0000000..b9c0596
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/RateLimitedOutputStream.scala
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.util
+
+import scala.annotation.tailrec
+
+import java.io.OutputStream
+import java.util.concurrent.TimeUnit._
+
+class RateLimitedOutputStream(out: OutputStream, bytesPerSec: Int) extends OutputStream {
+ val SYNC_INTERVAL = NANOSECONDS.convert(10, SECONDS)
+ val CHUNK_SIZE = 8192
+ var lastSyncTime = System.nanoTime
+ var bytesWrittenSinceSync: Long = 0
+
+ override def write(b: Int) {
+ waitToWrite(1)
+ out.write(b)
+ }
+
+ override def write(bytes: Array[Byte]) {
+ write(bytes, 0, bytes.length)
+ }
+
+ @tailrec
+ override final def write(bytes: Array[Byte], offset: Int, length: Int) {
+ val writeSize = math.min(length - offset, CHUNK_SIZE)
+ if (writeSize > 0) {
+ waitToWrite(writeSize)
+ out.write(bytes, offset, writeSize)
+ write(bytes, offset + writeSize, length)
+ }
+ }
+
+ override def flush() {
+ out.flush()
+ }
+
+ override def close() {
+ out.close()
+ }
+
+ @tailrec
+ private def waitToWrite(numBytes: Int) {
+ val now = System.nanoTime
+ val elapsedSecs = SECONDS.convert(math.max(now - lastSyncTime, 1), NANOSECONDS)
+ val rate = bytesWrittenSinceSync.toDouble / elapsedSecs
+ if (rate < bytesPerSec) {
+ // It's okay to write; just update some variables and return
+ bytesWrittenSinceSync += numBytes
+ if (now > lastSyncTime + SYNC_INTERVAL) {
+ // Sync interval has passed; let's resync
+ lastSyncTime = now
+ bytesWrittenSinceSync = numBytes
+ }
+ } else {
+ // Calculate how much time we should sleep to bring ourselves to the desired rate.
+ // Based on throttler in Kafka (https://github.com/kafka-dev/kafka/blob/master/core/src/main/scala/kafka/utils/Throttler.scala)
+ val sleepTime = MILLISECONDS.convert((bytesWrittenSinceSync / bytesPerSec - elapsedSecs), SECONDS)
+ if (sleepTime > 0) Thread.sleep(sleepTime)
+ waitToWrite(numBytes)
+ }
+ }
+}
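A usage sketch for the class added above (the file path and rate are illustrative):

    import java.io.FileOutputStream
    import org.apache.spark.streaming.util.RateLimitedOutputStream

    // Cap writes at roughly 64 KB per second.
    val out = new RateLimitedOutputStream(new FileOutputStream("/tmp/data.bin"), 64 * 1024)
    out.write("hello".getBytes("UTF-8"))
    out.flush()
    out.close()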
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/33022d66/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextSender.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextSender.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextSender.scala
index 6585d49..463617a 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextSender.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextSender.scala
@@ -17,14 +17,17 @@
package org.apache.spark.streaming.util
-import java.nio.ByteBuffer
-import org.apache.spark.util.{RateLimitedOutputStream, IntParam}
+import java.io.IOException
import java.net.ServerSocket
-import org.apache.spark.{SparkConf, Logging}
-import it.unimi.dsi.fastutil.io.FastByteArrayOutputStream
+import java.nio.ByteBuffer
+
import scala.io.Source
-import java.io.IOException
+
+import it.unimi.dsi.fastutil.io.FastByteArrayOutputStream
+
+import org.apache.spark.{SparkConf, Logging}
import org.apache.spark.serializer.KryoSerializer
+import org.apache.spark.util.IntParam
/**
* A helper program that sends blocks of Kryo-serialized text strings out on a socket at a
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/33022d66/streaming/src/test/scala/org/apache/spark/streaming/util/RateLimitedOutputStreamSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/util/RateLimitedOutputStreamSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/util/RateLimitedOutputStreamSuite.scala
new file mode 100644
index 0000000..15f13d5
--- /dev/null
+++ b/streaming/src/test/scala/org/apache/spark/streaming/util/RateLimitedOutputStreamSuite.scala
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.util
+
+import org.scalatest.FunSuite
+import java.io.ByteArrayOutputStream
+import java.util.concurrent.TimeUnit._
+
+class RateLimitedOutputStreamSuite extends FunSuite {
+
+ private def benchmark[U](f: => U): Long = {
+ val start = System.nanoTime
+ f
+ System.nanoTime - start
+ }
+
+ test("write") {
+ val underlying = new ByteArrayOutputStream
+ val data = "X" * 41000
+ val stream = new RateLimitedOutputStream(underlying, 10000)
+ val elapsedNs = benchmark { stream.write(data.getBytes("UTF-8")) }
+ assert(SECONDS.convert(elapsedNs, NANOSECONDS) == 4)
+ assert(underlying.toString("UTF-8") == data)
+ }
+}
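The assertion's expected value follows from the rate math: writing 41000 bytes at 10000 bytes per second takes about 4.1 seconds, and SECONDS.convert truncates toward zero:

    // 41000 / 10000 = 4.1 seconds of budget; the conversion truncates to 4.
    val expectedSecs = 41000L / 10000L  // == 4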
[2/2] git commit: Merge pull request #413 from rxin/scaladoc
Posted by pw...@apache.org.
Merge pull request #413 from rxin/scaladoc
Adjusted visibility of various components and documentation for the 0.9.0 release.
Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/68641bce
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/68641bce
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/68641bce
Branch: refs/heads/master
Commit: 68641bce612d9a4c4adc34d4dd9cab6735f28335
Parents: 0ca0d4d 33022d6
Author: Patrick Wendell <pw...@gmail.com>
Authored: Mon Jan 13 22:54:13 2014 -0800
Committer: Patrick Wendell <pw...@gmail.com>
Committed: Mon Jan 13 22:54:13 2014 -0800
----------------------------------------------------------------------
.../scala/org/apache/spark/Accumulators.scala | 4 +-
.../scala/org/apache/spark/FutureAction.scala | 8 +-
.../apache/spark/InterruptibleIterator.scala | 2 +-
.../main/scala/org/apache/spark/Logging.scala | 2 +-
.../org/apache/spark/broadcast/Broadcast.scala | 1 +
.../spark/broadcast/BroadcastFactory.scala | 2 +-
.../apache/spark/broadcast/HttpBroadcast.scala | 5 +-
.../spark/broadcast/TorrentBroadcast.scala | 6 +-
.../scala/org/apache/spark/deploy/Client.scala | 3 +-
.../spark/deploy/worker/CommandUtils.scala | 3 +-
.../main/scala/org/apache/spark/package.scala | 3 +
.../scala/org/apache/spark/rdd/PipedRDD.scala | 3 +-
.../apache/spark/scheduler/SparkListener.scala | 13 ++--
.../spark/storage/BlockObjectWriter.scala | 4 +-
.../org/apache/spark/storage/StorageLevel.scala | 4 +
.../apache/spark/util/CompletionIterator.scala | 11 +--
.../org/apache/spark/util/MetadataCleaner.scala | 8 +-
.../spark/util/RateLimitedOutputStream.scala | 79 --------------------
.../util/RateLimitedOutputStreamSuite.scala | 40 ----------
project/SparkBuild.scala | 7 ++
.../util/RateLimitedOutputStream.scala | 79 ++++++++++++++++++++
.../spark/streaming/util/RawTextSender.scala | 13 ++--
.../util/RateLimitedOutputStreamSuite.scala | 40 ++++++++++
23 files changed, 183 insertions(+), 157 deletions(-)
----------------------------------------------------------------------