Posted to commits@spark.apache.org by rx...@apache.org on 2014/01/09 09:56:22 UTC

[1/3] git commit: Minor style cleanup. Mostly on indenting & line width changes.

Updated Branches:
  refs/heads/master 73c724e12 -> 365cac946


Minor style cleanup. Mostly on indenting & line width changes.


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/46f6a3b6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/46f6a3b6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/46f6a3b6

Branch: refs/heads/master
Commit: 46f6a3b6aa23aa6dada66423d2851de5a4e8bee5
Parents: 04d83fc
Author: Reynold Xin <rx...@apache.org>
Authored: Wed Jan 8 14:55:04 2014 -0800
Committer: Reynold Xin <rx...@apache.org>
Committed: Wed Jan 8 14:55:04 2014 -0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/Partitioner.scala    |  7 +--
 .../scala/org/apache/spark/SparkContext.scala   | 30 ++++++-----
 .../org/apache/spark/rdd/PairRDDFunctions.scala | 52 +++++++++++---------
 .../main/scala/org/apache/spark/rdd/RDD.scala   | 32 ++++++------
 4 files changed, 67 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46f6a3b6/core/src/main/scala/org/apache/spark/Partitioner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/Partitioner.scala b/core/src/main/scala/org/apache/spark/Partitioner.scala
index 31b0773..9b043f0 100644
--- a/core/src/main/scala/org/apache/spark/Partitioner.scala
+++ b/core/src/main/scala/org/apache/spark/Partitioner.scala
@@ -61,7 +61,8 @@ object Partitioner {
 }
 
 /**
- * A [[org.apache.spark.Partitioner]] that implements hash-based partitioning using Java's `Object.hashCode`.
+ * A [[org.apache.spark.Partitioner]] that implements hash-based partitioning using
+ * Java's `Object.hashCode`.
  *
  * Java arrays have hashCodes that are based on the arrays' identities rather than their contents,
  * so attempting to partition an RDD[Array[_]] or RDD[(Array[_], _)] using a HashPartitioner will
@@ -84,8 +85,8 @@ class HashPartitioner(partitions: Int) extends Partitioner {
 }
 
 /**
- * A [[org.apache.spark.Partitioner]] that partitions sortable records by range into roughly equal ranges.
- * Determines the ranges by sampling the RDD passed in.
+ * A [[org.apache.spark.Partitioner]] that partitions sortable records by range into roughly
+ * equal ranges. The ranges are determined by sampling the content of the RDD passed in.
  */
 class RangePartitioner[K <% Ordered[K]: ClassTag, V](
     partitions: Int,

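For readers meeting these classes for the first time, here is a minimal usage sketch of the partitioners documented in the hunk above (not part of this commit; the app name and data are invented, and it assumes a local SparkContext plus the pair-RDD implicits from SparkContext._):

import org.apache.spark.{HashPartitioner, SparkContext}
import org.apache.spark.SparkContext._

object PartitionerExample {
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "partitioner-example")  // hypothetical app name
    val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))
    // Hash-partition by key using Java's Object.hashCode, as described above.
    // Avoid Array[_] keys: arrays hash by identity, not by content.
    val partitioned = pairs.partitionBy(new HashPartitioner(4))
    println(partitioned.partitions.size)  // 4
    sc.stop()
  }
}
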
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46f6a3b6/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 0e47f4e..47574ab 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -31,9 +31,9 @@ import scala.reflect.{ClassTag, classTag}
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.io.{ArrayWritable, BooleanWritable, BytesWritable, DoubleWritable,
-FloatWritable, IntWritable, LongWritable, NullWritable, Text, Writable}
+  FloatWritable, IntWritable, LongWritable, NullWritable, Text, Writable}
 import org.apache.hadoop.mapred.{FileInputFormat, InputFormat, JobConf, SequenceFileInputFormat,
-TextInputFormat}
+  TextInputFormat}
 import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat, Job => NewHadoopJob}
 import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFormat}
 import org.apache.mesos.MesosNativeLibrary
@@ -49,7 +49,7 @@ import org.apache.spark.scheduler.local.LocalBackend
 import org.apache.spark.storage.{BlockManagerSource, RDDInfo, StorageStatus, StorageUtils}
 import org.apache.spark.ui.SparkUI
 import org.apache.spark.util.{Utils, TimeStampedHashMap, MetadataCleaner, MetadataCleanerType,
-ClosureCleaner}
+  ClosureCleaner}
 
 /**
  * Main entry point for Spark functionality. A SparkContext represents the connection to a Spark
@@ -517,15 +517,15 @@ class SparkContext(
   // Methods for creating shared variables
 
   /**
-   * Create an [[org.apache.spark.Accumulator]] variable of a given type, which tasks can "add" values
-   * to using the `+=` method. Only the driver can access the accumulator's `value`.
+   * Create an [[org.apache.spark.Accumulator]] variable of a given type, which tasks can "add"
+   * values to using the `+=` method. Only the driver can access the accumulator's `value`.
    */
   def accumulator[T](initialValue: T)(implicit param: AccumulatorParam[T]) =
     new Accumulator(initialValue, param)
 
   /**
-   * Create an [[org.apache.spark.Accumulable]] shared variable, to which tasks can add values with `+=`.
-   * Only the driver can access the accumuable's `value`.
+   * Create an [[org.apache.spark.Accumulable]] shared variable, to which tasks can add values
+   * with `+=`. Only the driver can access the accumuable's `value`.
    * @tparam T accumulator type
    * @tparam R type that can be added to the accumulator
    */
@@ -538,13 +538,15 @@ class SparkContext(
    * Growable and TraversableOnce are the standard APIs that guarantee += and ++=, implemented by
    * standard mutable collections. So you can use this with mutable Map, Set, etc.
    */
-  def accumulableCollection[R <% Growable[T] with TraversableOnce[T] with Serializable, T](initialValue: R) = {
+  def accumulableCollection[R <% Growable[T] with TraversableOnce[T] with Serializable, T]
+      (initialValue: R) = {
     val param = new GrowableAccumulableParam[R,T]
     new Accumulable(initialValue, param)
   }
 
   /**
-   * Broadcast a read-only variable to the cluster, returning a [[org.apache.spark.broadcast.Broadcast]] object for
+   * Broadcast a read-only variable to the cluster, returning a
+   * [[org.apache.spark.broadcast.Broadcast]] object for
    * reading it in distributed functions. The variable will be sent to each cluster only once.
    */
   def broadcast[T](value: T) = env.broadcastManager.newBroadcast[T](value, isLocal)
@@ -1010,7 +1012,8 @@ object SparkContext {
 
   implicit def stringToText(s: String) = new Text(s)
 
-  private implicit def arrayToArrayWritable[T <% Writable: ClassTag](arr: Traversable[T]): ArrayWritable = {
+  private implicit def arrayToArrayWritable[T <% Writable: ClassTag](arr: Traversable[T])
+    : ArrayWritable = {
     def anyToWritable[U <% Writable](u: U): Writable = u
 
     new ArrayWritable(classTag[T].runtimeClass.asInstanceOf[Class[Writable]],
@@ -1033,7 +1036,9 @@ object SparkContext {
 
   implicit def booleanWritableConverter() = simpleWritableConverter[Boolean, BooleanWritable](_.get)
 
-  implicit def bytesWritableConverter() = simpleWritableConverter[Array[Byte], BytesWritable](_.getBytes)
+  implicit def bytesWritableConverter() = {
+    simpleWritableConverter[Array[Byte], BytesWritable](_.getBytes)
+  }
 
   implicit def stringWritableConverter() = simpleWritableConverter[String, Text](_.toString)
 
@@ -1049,7 +1054,8 @@ object SparkContext {
     if (uri != null) {
       val uriStr = uri.toString
       if (uriStr.startsWith("jar:file:")) {
-        // URI will be of the form "jar:file:/path/foo.jar!/package/cls.class", so pull out the /path/foo.jar
+        // URI will be of the form "jar:file:/path/foo.jar!/package/cls.class",
+        // so pull out the /path/foo.jar
         List(uriStr.substring("jar:file:".length, uriStr.indexOf('!')))
       } else {
         Nil

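As a quick illustration of the accumulator docs reflowed above, a minimal sketch (not part of this commit; assumes a local SparkContext and the implicit IntAccumulatorParam brought in by SparkContext._):

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

object AccumulatorExample {
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "accumulator-example")  // hypothetical app name
    val errors = sc.accumulator(0)
    // Tasks can only "add" to the accumulator with +=.
    sc.parallelize(1 to 100).foreach { i =>
      if (i % 10 == 0) errors += 1
    }
    // Only the driver can read the accumulator's value.
    println(errors.value)  // 10
    sc.stop()
  }
}
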
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46f6a3b6/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index a0809cd..c118ddf 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -18,35 +18,34 @@
 package org.apache.spark.rdd
 
 import java.nio.ByteBuffer
-import java.util.Date
 import java.text.SimpleDateFormat
+import java.util.Date
 import java.util.{HashMap => JHashMap}
 
-import scala.collection.{mutable, Map}
+import scala.collection.Map
+import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
 import scala.collection.JavaConversions._
 import scala.reflect.{ClassTag, classTag}
 
-import org.apache.hadoop.mapred._
-import org.apache.hadoop.io.compress.CompressionCodec
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.io.SequenceFile.CompressionType
-import org.apache.hadoop.mapred.FileOutputFormat
-import org.apache.hadoop.mapred.OutputFormat
+import org.apache.hadoop.io.compress.CompressionCodec
+import org.apache.hadoop.mapred.{FileOutputCommitter, FileOutputFormat, JobConf, OutputFormat}
 import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat}
-import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat => NewFileOutputFormat}
-import org.apache.hadoop.mapreduce.SparkHadoopMapReduceUtil
 import org.apache.hadoop.mapreduce.{Job => NewAPIHadoopJob}
 import org.apache.hadoop.mapreduce.{RecordWriter => NewRecordWriter}
+import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat => NewFileOutputFormat}
 
 import com.clearspring.analytics.stream.cardinality.HyperLogLog
 
+// SparkHadoopWriter and SparkHadoopMapReduceUtil are actually source files defined in Spark.
+import org.apache.hadoop.mapred.SparkHadoopWriter
+import org.apache.hadoop.mapreduce.SparkHadoopMapReduceUtil
 import org.apache.spark._
 import org.apache.spark.SparkContext._
 import org.apache.spark.partial.{BoundedDouble, PartialResult}
-import org.apache.spark.Aggregator
-import org.apache.spark.Partitioner
 import org.apache.spark.Partitioner.defaultPartitioner
 import org.apache.spark.util.SerializableHyperLogLog
 
@@ -120,9 +119,9 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
   }
 
   /**
-   * Merge the values for each key using an associative function and a neutral "zero value" which may
-   * be added to the result an arbitrary number of times, and must not change the result (e.g., Nil for
-   * list concatenation, 0 for addition, or 1 for multiplication.).
+   * Merge the values for each key using an associative function and a neutral "zero value" which
+   * may be added to the result an arbitrary number of times, and must not change the result
+   * (e.g., Nil for list concatenation, 0 for addition, or 1 for multiplication.).
    */
   def foldByKey(zeroValue: V, partitioner: Partitioner)(func: (V, V) => V): RDD[(K, V)] = {
     // Serialize the zero value to a byte array so that we can get a new clone of it on each key
@@ -138,18 +137,18 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
   }
 
   /**
-   * Merge the values for each key using an associative function and a neutral "zero value" which may
-   * be added to the result an arbitrary number of times, and must not change the result (e.g., Nil for
-   * list concatenation, 0 for addition, or 1 for multiplication.).
+   * Merge the values for each key using an associative function and a neutral "zero value" which
+   * may be added to the result an arbitrary number of times, and must not change the result
+   * (e.g., Nil for list concatenation, 0 for addition, or 1 for multiplication.).
    */
   def foldByKey(zeroValue: V, numPartitions: Int)(func: (V, V) => V): RDD[(K, V)] = {
     foldByKey(zeroValue, new HashPartitioner(numPartitions))(func)
   }
 
   /**
-   * Merge the values for each key using an associative function and a neutral "zero value" which may
-   * be added to the result an arbitrary number of times, and must not change the result (e.g., Nil for
-   * list concatenation, 0 for addition, or 1 for multiplication.).
+   * Merge the values for each key using an associative function and a neutral "zero value" which
+   * may be added to the result an arbitrary number of times, and must not change the result
+   * (e.g., Nil for list concatenation, 0 for addition, or 1 for multiplication.).
    */
   def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)] = {
     foldByKey(zeroValue, defaultPartitioner(self))(func)
@@ -226,7 +225,7 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
   }
 
   /**
-   * Return approximate number of distinct values for each key in this RDD. 
+   * Return approximate number of distinct values for each key in this RDD.
    * The accuracy of approximation can be controlled through the relative standard deviation
    * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in
    * more accurate counts but increase the memory footprint and vise versa. HashPartitions the
@@ -579,7 +578,8 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
    */
   def saveAsHadoopFile[F <: OutputFormat[K, V]](
       path: String, codec: Class[_ <: CompressionCodec]) (implicit fm: ClassTag[F]) {
-    saveAsHadoopFile(path, getKeyClass, getValueClass, fm.runtimeClass.asInstanceOf[Class[F]], codec)
+    val runtimeClass = fm.runtimeClass
+    saveAsHadoopFile(path, getKeyClass, getValueClass, runtimeClass.asInstanceOf[Class[F]], codec)
   }
 
   /**
@@ -599,7 +599,8 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
       keyClass: Class[_],
       valueClass: Class[_],
       outputFormatClass: Class[_ <: NewOutputFormat[_, _]],
-      conf: Configuration = self.context.hadoopConfiguration) {
+      conf: Configuration = self.context.hadoopConfiguration)
+  {
     val job = new NewAPIHadoopJob(conf)
     job.setOutputKeyClass(keyClass)
     job.setOutputValueClass(valueClass)
@@ -668,7 +669,9 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
       codec: Option[Class[_ <: CompressionCodec]] = None) {
     conf.setOutputKeyClass(keyClass)
     conf.setOutputValueClass(valueClass)
-    // conf.setOutputFormat(outputFormatClass) // Doesn't work in Scala 2.9 due to what may be a generics bug
+    // Doesn't work in Scala 2.9 due to what may be a generics bug
+    // TODO: Should we uncomment this for Scala 2.10?
+    // conf.setOutputFormat(outputFormatClass)
     conf.set("mapred.output.format.class", outputFormatClass.getName)
     for (c <- codec) {
       conf.setCompressMapOutput(true)
@@ -702,7 +705,8 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
       throw new SparkException("Output value class not set")
     }
 
-    logInfo("Saving as hadoop file of type (" + keyClass.getSimpleName+ ", " + valueClass.getSimpleName+ ")")
+    logDebug("Saving as hadoop file of type (" + keyClass.getSimpleName + ", " +
+      valueClass.getSimpleName+ ")")
 
     val writer = new SparkHadoopWriter(conf)
     writer.preSetup()

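To make the reflowed foldByKey doc concrete, a small sketch of the neutral "zero value" idea (not part of this commit; the data and app name are invented):

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

object FoldByKeyExample {
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "foldByKey-example")  // hypothetical app name
    val pairs = sc.parallelize(Seq(("a", 1), ("a", 2), ("b", 5)))
    // 0 is a neutral zero value for addition: folding it in any number of
    // times cannot change the per-key result.
    val sums = pairs.foldByKey(0)(_ + _)
    println(sums.collect().toMap)  // Map(a -> 3, b -> 5)
    sc.stop()
  }
}
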
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46f6a3b6/core/src/main/scala/org/apache/spark/rdd/RDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 3f41b66..f9dc12e 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -23,7 +23,6 @@ import scala.collection.Map
 import scala.collection.JavaConversions.mapAsScalaMap
 import scala.collection.mutable.ArrayBuffer
 
-import scala.collection.mutable.HashMap
 import scala.reflect.{classTag, ClassTag}
 
 import org.apache.hadoop.io.BytesWritable
@@ -52,11 +51,13 @@ import org.apache.spark._
  * partitioned collection of elements that can be operated on in parallel. This class contains the
  * basic operations available on all RDDs, such as `map`, `filter`, and `persist`. In addition,
  * [[org.apache.spark.rdd.PairRDDFunctions]] contains operations available only on RDDs of key-value
- * pairs, such as `groupByKey` and `join`; [[org.apache.spark.rdd.DoubleRDDFunctions]] contains
- * operations available only on RDDs of Doubles; and [[org.apache.spark.rdd.SequenceFileRDDFunctions]]
- * contains operations available on RDDs that can be saved as SequenceFiles. These operations are
- * automatically available on any RDD of the right type (e.g. RDD[(Int, Int)] through implicit
- * conversions when you `import org.apache.spark.SparkContext._`.
+ * pairs, such as `groupByKey` and `join`;
+ * [[org.apache.spark.rdd.DoubleRDDFunctions]] contains operations available only on RDDs of
+ * Doubles; and
+ * [[org.apache.spark.rdd.SequenceFileRDDFunctions]] contains operations available on RDDs that
+ * can be saved as SequenceFiles.
+ * These operations are automatically available on any RDD of the right type (e.g. RDD[(Int, Int)]
+ * through implicit conversions when you `import org.apache.spark.SparkContext._`.
  *
  * Internally, each RDD is characterized by five main properties:
  *
@@ -235,12 +236,9 @@ abstract class RDD[T: ClassTag](
   /**
    * Compute an RDD partition or read it from a checkpoint if the RDD is checkpointing.
    */
-  private[spark] def computeOrReadCheckpoint(split: Partition, context: TaskContext): Iterator[T] = {
-    if (isCheckpointed) {
-      firstParent[T].iterator(split, context)
-    } else {
-      compute(split, context)
-    }
+  private[spark] def computeOrReadCheckpoint(split: Partition, context: TaskContext): Iterator[T] =
+  {
+    if (isCheckpointed) firstParent[T].iterator(split, context) else compute(split, context)
   }
 
   // Transformations (return a new RDD)
@@ -268,6 +266,9 @@ abstract class RDD[T: ClassTag](
   def distinct(numPartitions: Int): RDD[T] =
     map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)
 
+  /**
+   * Return a new RDD containing the distinct elements in this RDD.
+   */
   def distinct(): RDD[T] = distinct(partitions.size)
 
   /**
@@ -280,7 +281,7 @@ abstract class RDD[T: ClassTag](
    * which can avoid performing a shuffle.
    */
   def repartition(numPartitions: Int): RDD[T] = {
-    coalesce(numPartitions, true)
+    coalesce(numPartitions, shuffle = true)
   }
 
   /**
@@ -646,7 +647,8 @@ abstract class RDD[T: ClassTag](
   }
 
   /**
-   * Reduces the elements of this RDD using the specified commutative and associative binary operator.
+   * Reduces the elements of this RDD using the specified commutative and
+   * associative binary operator.
    */
   def reduce(f: (T, T) => T): T = {
     val cleanF = sc.clean(f)
@@ -953,7 +955,7 @@ abstract class RDD[T: ClassTag](
   private var storageLevel: StorageLevel = StorageLevel.NONE
 
   /** Record user function generating this RDD. */
-  @transient private[spark] val origin = sc.getCallSite
+  @transient private[spark] val origin = sc.getCallSite()
 
   private[spark] def elementClassTag: ClassTag[T] = classTag[T]
 

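For context on the RDD methods touched above (distinct, repartition, reduce), a minimal usage sketch (not part of this commit; assumes a local SparkContext and made-up data):

import org.apache.spark.SparkContext

object RDDBasicsExample {
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "rdd-example")  // hypothetical app name
    val nums = sc.parallelize(Seq(1, 2, 2, 3, 3, 3), 2)
    val unique = nums.distinct()        // keeps one copy of each element
    val wider = unique.repartition(4)   // same as coalesce(4, shuffle = true)
    val total = wider.reduce(_ + _)     // operator must be commutative and associative
    println(total)  // 6
    sc.stop()
  }
}
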

[3/3] git commit: Merge pull request #361 from rxin/clean

Posted by rx...@apache.org.
Merge pull request #361 from rxin/clean

Minor style cleanup. Mostly on indenting & line width changes.

Focused on a few important files, since these are the files that new contributors usually read first.


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/365cac94
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/365cac94
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/365cac94

Branch: refs/heads/master
Commit: 365cac94652cd012bf3783f74eed98c95b884bbb
Parents: 73c724e 295d825
Author: Reynold Xin <rx...@apache.org>
Authored: Thu Jan 9 00:56:16 2014 -0800
Committer: Reynold Xin <rx...@apache.org>
Committed: Thu Jan 9 00:56:16 2014 -0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/Partitioner.scala    |  7 +--
 .../scala/org/apache/spark/SparkContext.scala   | 32 +++++++-----
 .../org/apache/spark/rdd/PairRDDFunctions.scala | 52 +++++++++++---------
 .../main/scala/org/apache/spark/rdd/RDD.scala   | 32 ++++++------
 4 files changed, 68 insertions(+), 55 deletions(-)
----------------------------------------------------------------------



[2/3] git commit: Minor update on SparkContext.broadcast's JavaDoc.

Posted by rx...@apache.org.
Minor update on SparkContext.broadcast's JavaDoc.


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/295d8258
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/295d8258
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/295d8258

Branch: refs/heads/master
Commit: 295d82583a8e162a41a7cc59a6612490580ffe09
Parents: 46f6a3b
Author: Reynold Xin <rx...@apache.org>
Authored: Thu Jan 9 00:30:22 2014 -0800
Committer: Reynold Xin <rx...@apache.org>
Committed: Thu Jan 9 00:30:22 2014 -0800

----------------------------------------------------------------------
 core/src/main/scala/org/apache/spark/SparkContext.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/295d8258/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 47574ab..fce8f2d 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -546,8 +546,8 @@ class SparkContext(
 
   /**
    * Broadcast a read-only variable to the cluster, returning a
-   * [[org.apache.spark.broadcast.Broadcast]] object for
-   * reading it in distributed functions. The variable will be sent to each cluster only once.
+   * [[org.apache.spark.broadcast.Broadcast]] object for reading it in distributed functions.
+   * The variable will be sent to each cluster only once.
    */
   def broadcast[T](value: T) = env.broadcastManager.newBroadcast[T](value, isLocal)
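
To illustrate the broadcast semantics described in the JavaDoc above, a minimal sketch (not part of this commit; the lookup table and app name are invented):

import org.apache.spark.SparkContext

object BroadcastExample {
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "broadcast-example")  // hypothetical app name
    // The read-only table is shipped to the workers once, not once per task.
    val lookup = sc.broadcast(Map(1 -> "one", 2 -> "two"))
    val words = sc.parallelize(Seq(1, 2, 1)).map(i => lookup.value.getOrElse(i, "?"))
    println(words.collect().mkString(", "))  // one, two, one
    sc.stop()
  }
}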