Posted to commits@spark.apache.org by rx...@apache.org on 2015/02/04 05:17:13 UTC

spark git commit: [SPARK-4795][Core] Redesign the "primitive type => Writable" implicit APIs to make them be activated automatically

Repository: spark
Updated Branches:
  refs/heads/master 1077f2e1d -> d37978d8a


[SPARK-4795][Core] Redesign the "primitive type => Writable" implicit APIs to make them be activated automatically

Redesign the "primitive type => Writable" implicit APIs so that they are activated automatically, without breaking binary compatibility.

However, this PR does break source compatibility for code that happens to rely on the `xxxToXxxWritable` conversions. See the unit test in `graphx`.
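
For illustration, a minimal spark-shell-style sketch of the user-facing effect (assuming a SparkContext `sc` and a path the job can write to; both are placeholders):

    // A minimal sketch, assuming a SparkContext `sc` and a writable output path.
    // Before this change, saving an RDD of primitives as a SequenceFile required
    // `import org.apache.spark.SparkContext._`; afterwards the needed implicits are
    // found automatically in the `RDD` and `WritableFactory` companion objects.
    val pairs: org.apache.spark.rdd.RDD[(Int, String)] = sc.parallelize(Seq((1, "a"), (2, "b")))
    pairs.saveAsSequenceFile("/tmp/seqfile-example")   // no extra import needed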

Author: zsxwing <zs...@gmail.com>

Closes #3642 from zsxwing/SPARK-4795 and squashes the following commits:

914b2d6 [zsxwing] Add implicit back to the Writables methods
0b9017f [zsxwing] Add some docs
a0e8509 [zsxwing] Merge branch 'master' into SPARK-4795
39343de [zsxwing] Fix the unit test
64853af [zsxwing] Reorganize the rest 'implicit' methods in SparkContext


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d37978d8
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d37978d8
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d37978d8

Branch: refs/heads/master
Commit: d37978d8aafef8a2e637687f3848ca0a8b935b33
Parents: 1077f2e
Author: zsxwing <zs...@gmail.com>
Authored: Tue Feb 3 20:17:12 2015 -0800
Committer: Reynold Xin <rx...@databricks.com>
Committed: Tue Feb 3 20:17:12 2015 -0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/SparkContext.scala   | 70 +++++++++++++++++++-
 .../WriteInputFormatTestDataGenerator.scala     |  1 -
 .../main/scala/org/apache/spark/package.scala   |  3 +-
 .../main/scala/org/apache/spark/rdd/RDD.scala   | 22 +++---
 .../spark/rdd/SequenceFileRDDFunctions.scala    | 49 ++++++++++----
 .../test/scala/org/apache/spark/FileSuite.scala |  1 -
 .../org/apache/sparktest/ImplicitSuite.scala    | 14 +++-
 .../spark/graphx/lib/ShortestPathsSuite.scala   |  2 +-
 8 files changed, 129 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/d37978d8/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 6a16a31..16c6fdb 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1749,8 +1749,14 @@ object SparkContext extends Logging {
   @deprecated("Replaced by implicit functions in the RDD companion object. This is " +
     "kept here only for backward compatibility.", "1.3.0")
   def rddToSequenceFileRDDFunctions[K <% Writable: ClassTag, V <% Writable: ClassTag](
-      rdd: RDD[(K, V)]) =
+      rdd: RDD[(K, V)]) = {
+    val kf = implicitly[K => Writable]
+    val vf = implicitly[V => Writable]
+    // Set the Writable class to null; `SequenceFileRDDFunctions` will then use reflection to get it
+    implicit val keyWritableFactory = new WritableFactory[K](_ => null, kf)
+    implicit val valueWritableFactory = new WritableFactory[V](_ => null, vf)
     RDD.rddToSequenceFileRDDFunctions(rdd)
+  }
 
   @deprecated("Replaced by implicit functions in the RDD companion object. This is " +
     "kept here only for backward compatibility.", "1.3.0")
@@ -1767,20 +1773,35 @@ object SparkContext extends Logging {
   def numericRDDToDoubleRDDFunctions[T](rdd: RDD[T])(implicit num: Numeric[T]) =
     RDD.numericRDDToDoubleRDDFunctions(rdd)
 
-  // Implicit conversions to common Writable types, for saveAsSequenceFile
+  // The following deprecated functions have already been moved to `object WritableFactory` to
+  // make the compiler find them automatically. They are still kept here for backward compatibility.
 
+  @deprecated("Replaced by implicit functions in the WritableFactory companion object. This is " +
+    "kept here only for backward compatibility.", "1.3.0")
   implicit def intToIntWritable(i: Int): IntWritable = new IntWritable(i)
 
+  @deprecated("Replaced by implicit functions in the WritableFactory companion object. This is " +
+    "kept here only for backward compatibility.", "1.3.0")
   implicit def longToLongWritable(l: Long): LongWritable = new LongWritable(l)
 
+  @deprecated("Replaced by implicit functions in the WritableFactory companion object. This is " +
+    "kept here only for backward compatibility.", "1.3.0")
   implicit def floatToFloatWritable(f: Float): FloatWritable = new FloatWritable(f)
 
+  @deprecated("Replaced by implicit functions in the WritableFactory companion object. This is " +
+    "kept here only for backward compatibility.", "1.3.0")
   implicit def doubleToDoubleWritable(d: Double): DoubleWritable = new DoubleWritable(d)
 
+  @deprecated("Replaced by implicit functions in the WritableFactory companion object. This is " +
+    "kept here only for backward compatibility.", "1.3.0")
   implicit def boolToBoolWritable (b: Boolean): BooleanWritable = new BooleanWritable(b)
 
+  @deprecated("Replaced by implicit functions in the WritableFactory companion object. This is " +
+    "kept here only for backward compatibility.", "1.3.0")
   implicit def bytesToBytesWritable (aob: Array[Byte]): BytesWritable = new BytesWritable(aob)
 
+  @deprecated("Replaced by implicit functions in the WritableFactory companion object. This is " +
+    "kept here only for backward compatibility.", "1.3.0")
   implicit def stringToText(s: String): Text = new Text(s)
 
   private implicit def arrayToArrayWritable[T <% Writable: ClassTag](arr: Traversable[T])
@@ -2070,7 +2091,7 @@ object WritableConverter {
     new WritableConverter[T](_ => wClass, x => convert(x.asInstanceOf[W]))
   }
 
-  // The following implicit functions were in SparkContext before 1.2 and users had to
+  // The following implicit functions were in SparkContext before 1.3 and users had to
   // `import SparkContext._` to enable them. Now we move them here to make the compiler find
   // them automatically. However, we still keep the old functions in SparkContext for backward
   // compatibility and forward to the following functions directly.
@@ -2103,3 +2124,46 @@ object WritableConverter {
   implicit def writableWritableConverter[T <: Writable](): WritableConverter[T] =
     new WritableConverter[T](_.runtimeClass.asInstanceOf[Class[T]], _.asInstanceOf[T])
 }
+
+/**
+ * A class encapsulating how to convert some type T to Writable. It stores both the Writable class
+ * corresponding to T (e.g. IntWritable for Int) and a function for doing the conversion.
+ * The Writable class will be used in `SequenceFileRDDFunctions`.
+ */
+private[spark] class WritableFactory[T](
+    val writableClass: ClassTag[T] => Class[_ <: Writable],
+    val convert: T => Writable) extends Serializable
+
+object WritableFactory {
+
+  private[spark] def simpleWritableFactory[T: ClassTag, W <: Writable : ClassTag](convert: T => W)
+    : WritableFactory[T] = {
+    val writableClass = implicitly[ClassTag[W]].runtimeClass.asInstanceOf[Class[W]]
+    new WritableFactory[T](_ => writableClass, convert)
+  }
+
+  implicit def intWritableFactory: WritableFactory[Int] =
+    simpleWritableFactory(new IntWritable(_))
+
+  implicit def longWritableFactory: WritableFactory[Long] =
+    simpleWritableFactory(new LongWritable(_))
+
+  implicit def floatWritableFactory: WritableFactory[Float] =
+    simpleWritableFactory(new FloatWritable(_))
+
+  implicit def doubleWritableFactory: WritableFactory[Double] =
+    simpleWritableFactory(new DoubleWritable(_))
+
+  implicit def booleanWritableFactory: WritableFactory[Boolean] =
+    simpleWritableFactory(new BooleanWritable(_))
+
+  implicit def bytesWritableFactory: WritableFactory[Array[Byte]] =
+    simpleWritableFactory(new BytesWritable(_))
+
+  implicit def stringWritableFactory: WritableFactory[String] =
+    simpleWritableFactory(new Text(_))
+
+  implicit def writableWritableFactory[T <: Writable: ClassTag]: WritableFactory[T] =
+    simpleWritableFactory(w => w)
+
+}
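
As a hedged illustration of how these factories are picked up (a sketch compiled against the org.apache.spark package, since `WritableFactory` itself is `private[spark]`; the object name is hypothetical):

    // Sketch only: because the implicits live in `object WritableFactory`, they sit in the
    // implicit scope of `WritableFactory[T]` and resolve without any import.
    package org.apache.spark

    import org.apache.hadoop.io.Writable

    object WritableFactoryResolutionSketch {
      val toWritable: Int => Writable = implicitly[WritableFactory[Int]].convert
      val w: Writable = toWritable(42)   // an IntWritable wrapping 42
    }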

http://git-wip-us.apache.org/repos/asf/spark/blob/d37978d8/core/src/main/scala/org/apache/spark/api/python/WriteInputFormatTestDataGenerator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/python/WriteInputFormatTestDataGenerator.scala b/core/src/main/scala/org/apache/spark/api/python/WriteInputFormatTestDataGenerator.scala
index c0cbd28..cf289fb 100644
--- a/core/src/main/scala/org/apache/spark/api/python/WriteInputFormatTestDataGenerator.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/WriteInputFormatTestDataGenerator.scala
@@ -107,7 +107,6 @@ private[python] class WritableToDoubleArrayConverter extends Converter[Any, Arra
  * given directory (probably a temp directory)
  */
 object WriteInputFormatTestDataGenerator {
-  import SparkContext._
 
   def main(args: Array[String]) {
     val path = args(0)

http://git-wip-us.apache.org/repos/asf/spark/blob/d37978d8/core/src/main/scala/org/apache/spark/package.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/package.scala b/core/src/main/scala/org/apache/spark/package.scala
index 5ad73c3..b6249b4 100644
--- a/core/src/main/scala/org/apache/spark/package.scala
+++ b/core/src/main/scala/org/apache/spark/package.scala
@@ -27,8 +27,7 @@ package org.apache
  * contains operations available only on RDDs of Doubles; and
  * [[org.apache.spark.rdd.SequenceFileRDDFunctions]] contains operations available on RDDs that can
  * be saved as SequenceFiles. These operations are automatically available on any RDD of the right
- * type (e.g. RDD[(Int, Int)] through implicit conversions except `saveAsSequenceFile`. You need to
- * `import org.apache.spark.SparkContext._` to make `saveAsSequenceFile` work.
+ * type (e.g. RDD[(Int, Int)]) through implicit conversions.
  *
  * Java programmers should reference the [[org.apache.spark.api.java]] package
  * for Spark programming APIs in Java.

http://git-wip-us.apache.org/repos/asf/spark/blob/d37978d8/core/src/main/scala/org/apache/spark/rdd/RDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 97aee58..fe55a51 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -25,11 +25,8 @@ import scala.language.implicitConversions
 import scala.reflect.{classTag, ClassTag}
 
 import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus
-import org.apache.hadoop.io.BytesWritable
+import org.apache.hadoop.io.{Writable, BytesWritable, NullWritable, Text}
 import org.apache.hadoop.io.compress.CompressionCodec
-import org.apache.hadoop.io.NullWritable
-import org.apache.hadoop.io.Text
-import org.apache.hadoop.io.Writable
 import org.apache.hadoop.mapred.TextOutputFormat
 
 import org.apache.spark._
@@ -57,8 +54,7 @@ import org.apache.spark.util.random.{BernoulliSampler, PoissonSampler, Bernoulli
  * [[org.apache.spark.rdd.SequenceFileRDDFunctions]] contains operations available on RDDs that
  * can be saved as SequenceFiles.
  * All operations are automatically available on any RDD of the right type (e.g. RDD[(Int, Int)]
- * through implicit conversions except `saveAsSequenceFile`. You need to
- * `import org.apache.spark.SparkContext._` to make `saveAsSequenceFile` work.
+ * through implicit conversions.
  *
  * Internally, each RDD is characterized by five main properties:
  *
@@ -1527,7 +1523,7 @@ abstract class RDD[T: ClassTag](
  */
 object RDD {
 
-  // The following implicit functions were in SparkContext before 1.2 and users had to
+  // The following implicit functions were in SparkContext before 1.3 and users had to
   // `import SparkContext._` to enable them. Now we move them here to make the compiler find
   // them automatically. However, we still keep the old functions in SparkContext for backward
   // compatibility and forward to the following functions directly.
@@ -1541,9 +1537,15 @@ object RDD {
     new AsyncRDDActions(rdd)
   }
 
-  implicit def rddToSequenceFileRDDFunctions[K <% Writable: ClassTag, V <% Writable: ClassTag](
-      rdd: RDD[(K, V)]): SequenceFileRDDFunctions[K, V] = {
-    new SequenceFileRDDFunctions(rdd)
+  implicit def rddToSequenceFileRDDFunctions[K, V](rdd: RDD[(K, V)])
+      (implicit kt: ClassTag[K], vt: ClassTag[V],
+                keyWritableFactory: WritableFactory[K],
+                valueWritableFactory: WritableFactory[V])
+    : SequenceFileRDDFunctions[K, V] = {
+    implicit val keyConverter = keyWritableFactory.convert
+    implicit val valueConverter = valueWritableFactory.convert
+    new SequenceFileRDDFunctions(rdd,
+      keyWritableFactory.writableClass(kt), valueWritableFactory.writableClass(vt))
   }
 
   implicit def rddToOrderedRDDFunctions[K : Ordering : ClassTag, V: ClassTag](rdd: RDD[(K, V)])
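
The shape of this change follows a general Scala rule, shown here with a hypothetical, non-Spark sketch: implicits defined in a type's companion object are part of its implicit scope, so call sites need no `import SparkContext._`-style wildcard import:

    // Hypothetical sketch of the pattern: the implicit conversion lives in Box's companion
    // object, so the compiler finds it for any Box[T] without an import at the call site.
    import scala.language.implicitConversions

    class Box[T](val value: T)

    class RichBox[T](box: Box[T]) {
      def describe: String = s"Box(${box.value})"
    }

    object Box {
      implicit def boxToRichBox[T](box: Box[T]): RichBox[T] = new RichBox(box)
    }

    object CompanionImplicitSketch extends App {
      println(new Box(42).describe)   // compiles with no wildcard import in scope
    }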

http://git-wip-us.apache.org/repos/asf/spark/blob/d37978d8/core/src/main/scala/org/apache/spark/rdd/SequenceFileRDDFunctions.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/SequenceFileRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/SequenceFileRDDFunctions.scala
index 2b48916..059f896 100644
--- a/core/src/main/scala/org/apache/spark/rdd/SequenceFileRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/SequenceFileRDDFunctions.scala
@@ -30,13 +30,35 @@ import org.apache.spark.Logging
  * through an implicit conversion. Note that this can't be part of PairRDDFunctions because
  * we need more implicit parameters to convert our keys and values to Writable.
  *
- * Import `org.apache.spark.SparkContext._` at the top of their program to use these functions.
  */
 class SequenceFileRDDFunctions[K <% Writable: ClassTag, V <% Writable : ClassTag](
-    self: RDD[(K, V)])
+    self: RDD[(K, V)],
+    _keyWritableClass: Class[_ <: Writable],
+    _valueWritableClass: Class[_ <: Writable])
   extends Logging
   with Serializable {
 
+  @deprecated("It's used to provide backward compatibility for pre 1.3.0.", "1.3.0")
+  def this(self: RDD[(K, V)]) {
+    this(self, null, null)
+  }
+
+  private val keyWritableClass =
+    if (_keyWritableClass == null) {
+      // pre 1.3.0, we need to use Reflection to get the Writable class
+      getWritableClass[K]()
+    } else {
+      _keyWritableClass
+    }
+
+  private val valueWritableClass =
+    if (_valueWritableClass == null) {
+      // pre 1.3.0, we need to use Reflection to get the Writable class
+      getWritableClass[V]()
+    } else {
+      _valueWritableClass
+    }
+
   private def getWritableClass[T <% Writable: ClassTag](): Class[_ <: Writable] = {
     val c = {
       if (classOf[Writable].isAssignableFrom(classTag[T].runtimeClass)) {
@@ -55,6 +77,7 @@ class SequenceFileRDDFunctions[K <% Writable: ClassTag, V <% Writable : ClassTag
     c.asInstanceOf[Class[_ <: Writable]]
   }
 
+
   /**
    * Output the RDD as a Hadoop SequenceFile using the Writable types we infer from the RDD's key
    * and value types. If the key or value are Writable, then we use their classes directly;
@@ -65,26 +88,28 @@ class SequenceFileRDDFunctions[K <% Writable: ClassTag, V <% Writable : ClassTag
   def saveAsSequenceFile(path: String, codec: Option[Class[_ <: CompressionCodec]] = None) {
     def anyToWritable[U <% Writable](u: U): Writable = u
 
-    val keyClass = getWritableClass[K]
-    val valueClass = getWritableClass[V]
-    val convertKey = !classOf[Writable].isAssignableFrom(self.keyClass)
-    val convertValue = !classOf[Writable].isAssignableFrom(self.valueClass)
+    // TODO We cannot force the return type of `anyToWritable` to be the same as keyWritableClass
+    // and valueWritableClass at compile time. To do that, we would need to add type parameters to
+    // SequenceFileRDDFunctions. However, SequenceFileRDDFunctions is a public class, so that would
+    // be a breaking change.
+    val convertKey = self.keyClass != keyWritableClass
+    val convertValue = self.valueClass != valueWritableClass
 
-    logInfo("Saving as sequence file of type (" + keyClass.getSimpleName + "," +
-      valueClass.getSimpleName + ")" )
+    logInfo("Saving as sequence file of type (" + keyWritableClass.getSimpleName + "," +
+      valueWritableClass.getSimpleName + ")" )
     val format = classOf[SequenceFileOutputFormat[Writable, Writable]]
     val jobConf = new JobConf(self.context.hadoopConfiguration)
     if (!convertKey && !convertValue) {
-      self.saveAsHadoopFile(path, keyClass, valueClass, format, jobConf, codec)
+      self.saveAsHadoopFile(path, keyWritableClass, valueWritableClass, format, jobConf, codec)
     } else if (!convertKey && convertValue) {
       self.map(x => (x._1,anyToWritable(x._2))).saveAsHadoopFile(
-        path, keyClass, valueClass, format, jobConf, codec)
+        path, keyWritableClass, valueWritableClass, format, jobConf, codec)
     } else if (convertKey && !convertValue) {
       self.map(x => (anyToWritable(x._1),x._2)).saveAsHadoopFile(
-        path, keyClass, valueClass, format, jobConf, codec)
+        path, keyWritableClass, valueWritableClass, format, jobConf, codec)
     } else if (convertKey && convertValue) {
       self.map(x => (anyToWritable(x._1),anyToWritable(x._2))).saveAsHadoopFile(
-        path, keyClass, valueClass, format, jobConf, codec)
+        path, keyWritableClass, valueWritableClass, format, jobConf, codec)
     }
   }
 }
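
A hedged, standalone sketch (hypothetical names, not part of the class) of the decision the branches above encode: a side is converted only when its runtime class differs from the Writable class supplied by its factory:

    import org.apache.hadoop.io.{IntWritable, Text, Writable}

    object ConversionDecisionSketch {
      // Convert a key or value only when its class is not already the target Writable class.
      def needsConversion(actualClass: Class[_], writableClass: Class[_ <: Writable]): Boolean =
        actualClass != writableClass

      // For an RDD[(Int, Text)] saved with IntWritable/Text as the target classes:
      val convertKey   = needsConversion(classOf[Int], classOf[IntWritable])   // true: wrap the Ints
      val convertValue = needsConversion(classOf[Text], classOf[Text])         // false: already Writable
    }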

http://git-wip-us.apache.org/repos/asf/spark/blob/d37978d8/core/src/test/scala/org/apache/spark/FileSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/FileSuite.scala b/core/src/test/scala/org/apache/spark/FileSuite.scala
index 5e24196..7acd27c 100644
--- a/core/src/test/scala/org/apache/spark/FileSuite.scala
+++ b/core/src/test/scala/org/apache/spark/FileSuite.scala
@@ -32,7 +32,6 @@ import org.apache.hadoop.mapreduce.lib.input.{FileSplit => NewFileSplit, TextInp
 import org.apache.hadoop.mapreduce.lib.output.{TextOutputFormat => NewTextOutputFormat}
 import org.scalatest.FunSuite
 
-import org.apache.spark.SparkContext._
 import org.apache.spark.rdd.{NewHadoopRDD, HadoopRDD}
 import org.apache.spark.util.Utils
 

http://git-wip-us.apache.org/repos/asf/spark/blob/d37978d8/core/src/test/scala/org/apache/sparktest/ImplicitSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/sparktest/ImplicitSuite.scala b/core/src/test/scala/org/apache/sparktest/ImplicitSuite.scala
index 4918e2d..daa795a 100644
--- a/core/src/test/scala/org/apache/sparktest/ImplicitSuite.scala
+++ b/core/src/test/scala/org/apache/sparktest/ImplicitSuite.scala
@@ -44,13 +44,21 @@ class ImplicitSuite {
   }
 
   def testRddToSequenceFileRDDFunctions(): Unit = {
-    // TODO eliminating `import intToIntWritable` needs refactoring SequenceFileRDDFunctions.
-    // That will be a breaking change.
-    import org.apache.spark.SparkContext.intToIntWritable
     val rdd: org.apache.spark.rdd.RDD[(Int, Int)] = mockRDD
     rdd.saveAsSequenceFile("/a/test/path")
   }
 
+  def testRddToSequenceFileRDDFunctionsWithWritable(): Unit = {
+    val rdd: org.apache.spark.rdd.RDD[(org.apache.hadoop.io.IntWritable, org.apache.hadoop.io.Text)]
+      = mockRDD
+    rdd.saveAsSequenceFile("/a/test/path")
+  }
+
+  def testRddToSequenceFileRDDFunctionsWithBytesArray(): Unit = {
+    val rdd: org.apache.spark.rdd.RDD[(Int, Array[Byte])] = mockRDD
+    rdd.saveAsSequenceFile("/a/test/path")
+  }
+
   def testRddToOrderedRDDFunctions(): Unit = {
     val rdd: org.apache.spark.rdd.RDD[(Int, Int)] = mockRDD
     rdd.sortByKey()

http://git-wip-us.apache.org/repos/asf/spark/blob/d37978d8/graphx/src/test/scala/org/apache/spark/graphx/lib/ShortestPathsSuite.scala
----------------------------------------------------------------------
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/lib/ShortestPathsSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/lib/ShortestPathsSuite.scala
index 265827b..f2c38e7 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/lib/ShortestPathsSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/lib/ShortestPathsSuite.scala
@@ -40,7 +40,7 @@ class ShortestPathsSuite extends FunSuite with LocalSparkContext {
       val graph = Graph.fromEdgeTuples(edges, 1)
       val landmarks = Seq(1, 4).map(_.toLong)
       val results = ShortestPaths.run(graph, landmarks).vertices.collect.map {
-        case (v, spMap) => (v, spMap.mapValues(_.get))
+        case (v, spMap) => (v, spMap.mapValues(i => i))
       }
       assert(results.toSet === shortestPaths)
     }
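
This one-line test change is the source-compatibility note from the description: `_.get` only compiled because each Int value was implicitly wrapped in an IntWritable (which has a `get` method) by one of the now-deprecated `xxxToXxxWritable` conversions. A hedged sketch with plain Scala collections (hypothetical values standing in for the test's `spMap`):

    object SourceCompatSketch {
      // Hypothetical stand-in for the shortest-paths map in the test: landmark -> distance.
      val spMap: Map[Long, Int] = Map(1L -> 0, 4L -> 2)

      // Pre-change style (relied on the accidental Int => IntWritable conversion being in scope):
      //   spMap.mapValues(_.get)
      // After the redesign the test no longer depends on that conversion and uses the value directly:
      val distances = spMap.mapValues(i => i)
    }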

