You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by sr...@apache.org on 2016/12/16 17:47:15 UTC
spark git commit: [SPARK-18708][CORE] Improvement/improve docs in
spark context file
Repository: spark
Updated Branches:
refs/heads/master 836c95b10 -> f7a574a6c
[SPARK-18708][CORE] Improvement/improve docs in spark context file
## What changes were proposed in this pull request?
SparkContext.scala was created a long time ago and contains several types of Scaladocs/Javadocs mixed together. Public methods/fields should have a Scaladoc that is formatted in the same way everywhere. This pull request also adds scaladoc to methods/fields that did not have it before.
## How was this patch tested?
No actual code was modified, only comments.
Please review http://spark.apache.org/contributing.html before opening a pull request.
Author: Aliaksandr.Bedrytski <al...@valtech.co.uk>
Closes #16137 from Mironor/improvement/improve-docs-in-spark-context-file.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f7a574a6
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f7a574a6
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f7a574a6
Branch: refs/heads/master
Commit: f7a574a6cbfbf7adce677819ddc892ceab905ce2
Parents: 836c95b
Author: Aliaksandr.Bedrytski <al...@valtech.co.uk>
Authored: Fri Dec 16 17:47:08 2016 +0000
Committer: Sean Owen <so...@cloudera.com>
Committed: Fri Dec 16 17:47:08 2016 +0000
----------------------------------------------------------------------
.../scala/org/apache/spark/SparkContext.scala | 198 ++++++++++++++++---
1 file changed, 169 insertions(+), 29 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/f7a574a6/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index bd3f454..cae22d7 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -673,10 +673,10 @@ class SparkContext(config: SparkConf) extends Logging {
* sc.cancelJobGroup("some_job_to_cancel")
* }}}
*
- * If interruptOnCancel is set to true for the job group, then job cancellation will result
- * in Thread.interrupt() being called on the job's executor threads. This is useful to help ensure
- * that the tasks are actually stopped in a timely manner, but is off by default due to HDFS-1208,
- * where HDFS may respond to Thread.interrupt() by marking nodes as dead.
+ * @param interruptOnCancel If true, then job cancellation will result in `Thread.interrupt()`
+ * being called on the job's executor threads. This is useful to help ensure that the tasks
+ * are actually stopped in a timely manner, but is off by default due to HDFS-1208, where HDFS
+ * may respond to Thread.interrupt() by marking nodes as dead.
*/
def setJobGroup(groupId: String, description: String, interruptOnCancel: Boolean = false) {
setLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION, description)
@@ -712,6 +712,9 @@ class SparkContext(config: SparkConf) extends Logging {
* modified collection. Pass a copy of the argument to avoid this.
* @note avoid using `parallelize(Seq())` to create an empty `RDD`. Consider `emptyRDD` for an
* RDD with no partitions, or `parallelize(Seq[T]())` for an RDD of `T` with empty partitions.
+ * @param seq Scala collection to distribute
+ * @param numSlices number of partitions to divide the collection into
+ * @return RDD representing distributed collection
*/
def parallelize[T: ClassTag](
seq: Seq[T],
@@ -729,8 +732,8 @@ class SparkContext(config: SparkConf) extends Logging {
* @param start the start value.
* @param end the end value.
* @param step the incremental step
- * @param numSlices the partition number of the new RDD.
- * @return
+ * @param numSlices number of partitions to divide the collection into
+ * @return RDD representing distributed range
*/
def range(
start: Long,
@@ -795,6 +798,9 @@ class SparkContext(config: SparkConf) extends Logging {
/** Distribute a local Scala collection to form an RDD.
*
* This method is identical to `parallelize`.
+ * @param seq Scala collection to distribute
+ * @param numSlices number of partitions to divide the collection into
+ * @return RDD representing distributed collection
*/
def makeRDD[T: ClassTag](
seq: Seq[T],
@@ -806,6 +812,8 @@ class SparkContext(config: SparkConf) extends Logging {
* Distribute a local Scala collection to form an RDD, with one or more
* location preferences (hostnames of Spark nodes) for each object.
* Create a new partition for each collection item.
+ * @param seq list of tuples of data and location preferences (hostnames of Spark nodes)
+ * @return RDD representing data partitioned according to location preferences
*/
def makeRDD[T: ClassTag](seq: Seq[(T, Seq[String])]): RDD[T] = withScope {
assertNotStopped()
@@ -816,6 +824,9 @@ class SparkContext(config: SparkConf) extends Logging {
/**
* Read a text file from HDFS, a local file system (available on all nodes), or any
* Hadoop-supported file system URI, and return it as an RDD of Strings.
+ * @param path path to the text file on a supported file system
+ * @param minPartitions suggested minimum number of partitions for the resulting RDD
+ * @return RDD of lines of the text file
*/
def textFile(
path: String,
@@ -857,6 +868,7 @@ class SparkContext(config: SparkConf) extends Logging {
* @param path Directory to the input data files, the path can be comma separated paths as the
* list of inputs.
* @param minPartitions A suggestion value of the minimal splitting number for input data.
+ * @return RDD representing tuples of file path and the corresponding file content
*/
def wholeTextFiles(
path: String,
@@ -908,6 +920,7 @@ class SparkContext(config: SparkConf) extends Logging {
* @param path Directory to the input data files, the path can be comma separated paths as the
* list of inputs.
* @param minPartitions A suggestion value of the minimal splitting number for input data.
+ * @return RDD representing tuples of file path and corresponding file content
*/
def binaryFiles(
path: String,
@@ -968,10 +981,11 @@ class SparkContext(config: SparkConf) extends Logging {
* Therefore if you plan to reuse this conf to create multiple RDDs, you need to make
* sure you won't modify the conf. A safe approach is always creating a new conf for
* a new RDD.
- * @param inputFormatClass Class of the InputFormat
- * @param keyClass Class of the keys
- * @param valueClass Class of the values
+ * @param inputFormatClass storage format of the data to be read
+ * @param keyClass `Class` of the key associated with the `inputFormatClass` parameter
+ * @param valueClass `Class` of the value associated with the `inputFormatClass` parameter
* @param minPartitions Minimum number of Hadoop Splits to generate.
+ * @return RDD of tuples of key and corresponding value
*
* @note Because Hadoop's RecordReader class re-uses the same Writable object for each
* record, directly caching the returned RDD or directly passing it to an aggregation or shuffle
@@ -1003,6 +1017,13 @@ class SparkContext(config: SparkConf) extends Logging {
* operation will create many references to the same object.
* If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first
* copy them using a `map` function.
+ * @param path directory to the input data files, the path can be comma separated paths
+ * as a list of inputs
+ * @param inputFormatClass storage format of the data to be read
+ * @param keyClass `Class` of the key associated with the `inputFormatClass` parameter
+ * @param valueClass `Class` of the value associated with the `inputFormatClass` parameter
+ * @param minPartitions suggested minimum number of partitions for the resulting RDD
+ * @return RDD of tuples of key and corresponding value
*/
def hadoopFile[K, V](
path: String,
@@ -1042,6 +1063,10 @@ class SparkContext(config: SparkConf) extends Logging {
* operation will create many references to the same object.
* If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first
* copy them using a `map` function.
+ * @param path directory to the input data files, the path can be comma separated paths
+ * as a list of inputs
+ * @param minPartitions suggested minimum number of partitions for the resulting RDD
+ * @return RDD of tuples of key and corresponding value
*/
def hadoopFile[K, V, F <: InputFormat[K, V]]
(path: String, minPartitions: Int)
@@ -1066,13 +1091,32 @@ class SparkContext(config: SparkConf) extends Logging {
* operation will create many references to the same object.
* If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first
* copy them using a `map` function.
+ * @param path directory to the input data files, the path can be comma separated paths as
+ * a list of inputs
+ * @return RDD of tuples of key and corresponding value
*/
def hadoopFile[K, V, F <: InputFormat[K, V]](path: String)
(implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)] = withScope {
hadoopFile[K, V, F](path, defaultMinPartitions)
}
- /** Get an RDD for a Hadoop file with an arbitrary new API InputFormat. */
+ /**
+ * Smarter version of `newAPIHadoopFile` that uses class tags to figure out the classes of keys,
+ * values and the `org.apache.hadoop.mapreduce.InputFormat` (new MapReduce API) so that users
+ * don't need to pass them directly. Instead, callers can just write, for example:
+ * {{{
+ * val file = sparkContext.newAPIHadoopFile[LongWritable, Text, TextInputFormat](path)
+ * }}}
+ *
+ * @note Because Hadoop's RecordReader class re-uses the same Writable object for each
+ * record, directly caching the returned RDD or directly passing it to an aggregation or shuffle
+ * operation will create many references to the same object.
+ * If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first
+ * copy them using a `map` function.
+ * @param path directory to the input data files, the path can be comma separated paths
+ * as a list of inputs
+ * @return RDD of tuples of key and corresponding value
+ */
def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]]
(path: String)
(implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)] = withScope {
@@ -1092,6 +1136,13 @@ class SparkContext(config: SparkConf) extends Logging {
* operation will create many references to the same object.
* If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first
* copy them using a `map` function.
+ * @param path directory to the input data files, the path can be comma separated paths
+ * as a list of inputs
+ * @param fClass storage format of the data to be read
+ * @param kClass `Class` of the key associated with the `fClass` parameter
+ * @param vClass `Class` of the value associated with the `fClass` parameter
+ * @param conf Hadoop configuration
+ * @return RDD of tuples of key and corresponding value
*/
def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]](
path: String,
@@ -1123,9 +1174,9 @@ class SparkContext(config: SparkConf) extends Logging {
* Therefore if you plan to reuse this conf to create multiple RDDs, you need to make
* sure you won't modify the conf. A safe approach is always creating a new conf for
* a new RDD.
- * @param fClass Class of the InputFormat
- * @param kClass Class of the keys
- * @param vClass Class of the values
+ * @param fClass storage format of the data to be read
+ * @param kClass `Class` of the key associated with the `fClass` parameter
+ * @param vClass `Class` of the value associated with the `fClass` parameter
*
* @note Because Hadoop's RecordReader class re-uses the same Writable object for each
* record, directly caching the returned RDD or directly passing it to an aggregation or shuffle
@@ -1158,6 +1209,12 @@ class SparkContext(config: SparkConf) extends Logging {
* operation will create many references to the same object.
* If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first
* copy them using a `map` function.
+ * @param path directory to the input data files, the path can be comma separated paths
+ * as a list of inputs
+ * @param keyClass `Class` of the key associated with `SequenceFileInputFormat`
+ * @param valueClass `Class` of the value associated with `SequenceFileInputFormat`
+ * @param minPartitions suggested minimum number of partitions for the resulting RDD
+ * @return RDD of tuples of key and corresponding value
*/
def sequenceFile[K, V](path: String,
keyClass: Class[K],
@@ -1177,6 +1234,11 @@ class SparkContext(config: SparkConf) extends Logging {
* operation will create many references to the same object.
* If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first
* copy them using a `map` function.
+ * @param path directory to the input data files, the path can be comma separated paths
+ * as a list of inputs
+ * @param keyClass `Class` of the key associated with `SequenceFileInputFormat`
+ * @param valueClass `Class` of the value associated with `SequenceFileInputFormat`
+ * @return RDD of tuples of key and corresponding value
*/
def sequenceFile[K, V](
path: String,
@@ -1207,6 +1269,10 @@ class SparkContext(config: SparkConf) extends Logging {
* operation will create many references to the same object.
* If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first
* copy them using a `map` function.
+ * @param path directory to the input data files, the path can be comma separated paths
+ * as a list of inputs
+ * @param minPartitions suggested minimum number of partitions for the resulting RDD
+ * @return RDD of tuples of key and corresponding value
*/
def sequenceFile[K, V]
(path: String, minPartitions: Int = defaultMinPartitions)
@@ -1231,6 +1297,11 @@ class SparkContext(config: SparkConf) extends Logging {
* be pretty slow if you use the default serializer (Java serialization),
* though the nice thing about it is that there's very little effort required to save arbitrary
* objects.
+ *
+ * @param path directory to the input data files, the path can be comma separated paths
+ * as a list of inputs
+ * @param minPartitions suggested minimum number of partitions for the resulting RDD
+ * @return RDD representing deserialized data from the file(s)
*/
def objectFile[T: ClassTag](
path: String,
@@ -1410,6 +1481,9 @@ class SparkContext(config: SparkConf) extends Logging {
* Broadcast a read-only variable to the cluster, returning a
* [[org.apache.spark.broadcast.Broadcast]] object for reading it in distributed functions.
* The variable will be sent to each cluster only once.
+ *
+ * @param value value to broadcast to the Spark nodes
+ * @return `Broadcast` object, a read-only variable cached on each machine
*/
def broadcast[T: ClassTag](value: T): Broadcast[T] = {
assertNotStopped()
@@ -1424,8 +1498,9 @@ class SparkContext(config: SparkConf) extends Logging {
/**
* Add a file to be downloaded with this Spark job on every node.
- * The `path` passed can be either a local file, a file in HDFS (or other Hadoop-supported
- * filesystems), or an HTTP, HTTPS or FTP URI. To access the file in Spark jobs,
+ *
+ * @param path can be either a local file, a file in HDFS (or other Hadoop-supported
+ * filesystems), or an HTTP, HTTPS or FTP URI. To access the file in Spark jobs,
* use `SparkFiles.get(fileName)` to find its download location.
*/
def addFile(path: String): Unit = {
@@ -1439,12 +1514,12 @@ class SparkContext(config: SparkConf) extends Logging {
/**
* Add a file to be downloaded with this Spark job on every node.
- * The `path` passed can be either a local file, a file in HDFS (or other Hadoop-supported
- * filesystems), or an HTTP, HTTPS or FTP URI. To access the file in Spark jobs,
- * use `SparkFiles.get(fileName)` to find its download location.
*
- * A directory can be given if the recursive option is set to true. Currently directories are only
- * supported for Hadoop-supported filesystems.
+ * @param path can be either a local file, a file in HDFS (or other Hadoop-supported
+ * filesystems), or an HTTP, HTTPS or FTP URI. To access the file in Spark jobs,
+ * use `SparkFiles.get(fileName)` to find its download location.
+ * @param recursive if true, a directory can be given in `path`. Currently directories are
+ * only supported for Hadoop-supported filesystems.
*/
def addFile(path: String, recursive: Boolean): Unit = {
val uri = new Path(path).toUri
@@ -1715,9 +1790,9 @@ class SparkContext(config: SparkConf) extends Logging {
}
/**
- * Adds a JAR dependency for all tasks to be executed on this SparkContext in the future.
- * The `path` passed can be either a local file, a file in HDFS (or other Hadoop-supported
- * filesystems), an HTTP, HTTPS or FTP URI, or local:/path for a file on every worker node.
+ * Adds a JAR dependency for all tasks to be executed on this `SparkContext` in the future.
+ * @param path can be either a local file, a file in HDFS (or other Hadoop-supported filesystems),
+ * an HTTP, HTTPS or FTP URI, or local:/path for a file on every worker node.
*/
def addJar(path: String) {
if (path == null) {
@@ -1907,6 +1982,12 @@ class SparkContext(config: SparkConf) extends Logging {
/**
* Run a function on a given set of partitions in an RDD and pass the results to the given
* handler function. This is the main entry point for all actions in Spark.
+ *
+ * @param rdd target RDD to run tasks on
+ * @param func a function to run on each partition of the RDD
+ * @param partitions set of partitions to run on; some jobs may not want to compute on all
+ * partitions of the target RDD, e.g. for operations like `first()`
+ * @param resultHandler callback to pass each result to
*/
def runJob[T, U: ClassTag](
rdd: RDD[T],
@@ -1929,6 +2010,14 @@ class SparkContext(config: SparkConf) extends Logging {
/**
* Run a function on a given set of partitions in an RDD and return the results as an array.
+ * The function that is run against each partition additionally takes `TaskContext` argument.
+ *
+ * @param rdd target RDD to run tasks on
+ * @param func a function to run on each partition of the RDD
+ * @param partitions set of partitions to run on; some jobs may not want to compute on all
+ * partitions of the target RDD, e.g. for operations like `first()`
+ * @return in-memory collection with a result of the job (each collection element will contain
+ * a result from one partition)
*/
def runJob[T, U: ClassTag](
rdd: RDD[T],
@@ -1940,8 +2029,14 @@ class SparkContext(config: SparkConf) extends Logging {
}
/**
- * Run a job on a given set of partitions of an RDD, but take a function of type
- * `Iterator[T] => U` instead of `(TaskContext, Iterator[T]) => U`.
+ * Run a function on a given set of partitions in an RDD and return the results as an array.
+ *
+ * @param rdd target RDD to run tasks on
+ * @param func a function to run on each partition of the RDD
+ * @param partitions set of partitions to run on; some jobs may not want to compute on all
+ * partitions of the target RDD, e.g. for operations like `first()`
+ * @return in-memory collection with a result of the job (each collection element will contain
+ * a result from one partition)
*/
def runJob[T, U: ClassTag](
rdd: RDD[T],
@@ -1952,7 +2047,13 @@ class SparkContext(config: SparkConf) extends Logging {
}
/**
- * Run a job on all partitions in an RDD and return the results in an array.
+ * Run a job on all partitions in an RDD and return the results in an array. The function
+ * that is run against each partition additionally takes `TaskContext` argument.
+ *
+ * @param rdd target RDD to run tasks on
+ * @param func a function to run on each partition of the RDD
+ * @return in-memory collection with a result of the job (each collection element will contain
+ * a result from one partition)
*/
def runJob[T, U: ClassTag](rdd: RDD[T], func: (TaskContext, Iterator[T]) => U): Array[U] = {
runJob(rdd, func, 0 until rdd.partitions.length)
@@ -1960,13 +2061,23 @@ class SparkContext(config: SparkConf) extends Logging {
/**
* Run a job on all partitions in an RDD and return the results in an array.
+ *
+ * @param rdd target RDD to run tasks on
+ * @param func a function to run on each partition of the RDD
+ * @return in-memory collection with a result of the job (each collection element will contain
+ * a result from one partition)
*/
def runJob[T, U: ClassTag](rdd: RDD[T], func: Iterator[T] => U): Array[U] = {
runJob(rdd, func, 0 until rdd.partitions.length)
}
/**
- * Run a job on all partitions in an RDD and pass the results to a handler function.
+ * Run a job on all partitions in an RDD and pass the results to a handler function. The function
+ * that is run against each partition additionally takes `TaskContext` argument.
+ *
+ * @param rdd target RDD to run tasks on
+ * @param processPartition a function to run on each partition of the RDD
+ * @param resultHandler callback to pass each result to
*/
def runJob[T, U: ClassTag](
rdd: RDD[T],
@@ -1978,6 +2089,10 @@ class SparkContext(config: SparkConf) extends Logging {
/**
* Run a job on all partitions in an RDD and pass the results to a handler function.
+ *
+ * @param rdd target RDD to run tasks on
+ * @param processPartition a function to run on each partition of the RDD
+ * @param resultHandler callback to pass each result to
*/
def runJob[T, U: ClassTag](
rdd: RDD[T],
@@ -1991,6 +2106,13 @@ class SparkContext(config: SparkConf) extends Logging {
/**
* :: DeveloperApi ::
* Run a job that can return approximate results.
+ *
+ * @param rdd target RDD to run tasks on
+ * @param func a function to run on each partition of the RDD
+ * @param evaluator `ApproximateEvaluator` to receive the partial results
+ * @param timeout maximum time to wait for the job, in milliseconds
+ * @return partial result (how partial depends on whether the job was finished before or
+ * after timeout)
*/
@DeveloperApi
def runApproximateJob[T, U, R](
@@ -2012,6 +2134,13 @@ class SparkContext(config: SparkConf) extends Logging {
/**
* Submit a job for execution and return a FutureJob holding the result.
+ *
+ * @param rdd target RDD to run tasks on
+ * @param processPartition a function to run on each partition of the RDD
+ * @param partitions set of partitions to run on; some jobs may not want to compute on all
+ * partitions of the target RDD, e.g. for operations like `first()`
+ * @param resultHandler callback to pass each result to
+ * @param resultFunc function to be executed when the result is ready
*/
def submitJob[T, U, R](
rdd: RDD[T],
@@ -2096,6 +2225,7 @@ class SparkContext(config: SparkConf) extends Logging {
* @param checkSerializable whether or not to immediately check <tt>f</tt> for serializability
* @throws SparkException if <tt>checkSerializable</tt> is set but <tt>f</tt> is not
* serializable
+ * @return the cleaned closure
*/
private[spark] def clean[F <: AnyRef](f: F, checkSerializable: Boolean = true): F = {
ClosureCleaner.clean(f, checkSerializable)
@@ -2103,8 +2233,9 @@ class SparkContext(config: SparkConf) extends Logging {
}
/**
- * Set the directory under which RDDs are going to be checkpointed. The directory must
- * be a HDFS path if running on a cluster.
+ * Set the directory under which RDDs are going to be checkpointed.
+ * @param directory path to the directory where checkpoint files will be stored
+ * (must be HDFS path if running in cluster)
*/
def setCheckpointDir(directory: String) {
@@ -2311,6 +2442,8 @@ object SparkContext extends Logging {
*
* @note This function cannot be used to create multiple SparkContext instances
* even if multiple contexts are allowed.
+ * @param config `SparkConf` that will be used for initialization of the `SparkContext`
+ * @return current `SparkContext` (or a new one if it wasn't created before the function call)
*/
def getOrCreate(config: SparkConf): SparkContext = {
// Synchronize to ensure that multiple create requests don't trigger an exception
@@ -2336,6 +2469,7 @@ object SparkContext extends Logging {
*
* @note This function cannot be used to create multiple SparkContext instances
* even if multiple contexts are allowed.
+ * @return current `SparkContext` (or a new one if it wasn't created before the function call)
*/
def getOrCreate(): SparkContext = {
SPARK_CONTEXT_CONSTRUCTOR_LOCK.synchronized {
@@ -2416,6 +2550,9 @@ object SparkContext extends Logging {
/**
* Find the JAR from which a given class was loaded, to make it easy for users to pass
* their JARs to SparkContext.
+ *
+ * @param cls class that should be inside of the jar
+ * @return jar that contains the Class, `None` if not found
*/
def jarOfClass(cls: Class[_]): Option[String] = {
val uri = cls.getResource("/" + cls.getName.replace('.', '/') + ".class")
@@ -2437,6 +2574,9 @@ object SparkContext extends Logging {
* Find the JAR that contains the class of a particular object, to make it easy for users
* to pass their JARs to SparkContext. In most cases you can call jarOfObject(this) in
* your driver program.
+ *
+ * @param obj reference to an instance which class should be inside of the jar
+ * @return jar that contains the class of the instance, `None` if not found
*/
def jarOfObject(obj: AnyRef): Option[String] = jarOfClass(obj.getClass)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org