Posted to issues@spark.apache.org by "Heather Miller (JIRA)" <ji...@apache.org> on 2014/03/31 02:45:15 UTC

[jira] [Commented] (SPARK-1296) Make RDDs Covariant

    [ https://issues.apache.org/jira/browse/SPARK-1296?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13954887#comment-13954887 ] 

Heather Miller commented on SPARK-1296:
---------------------------------------

Rephrasing Matei's question, I believe he's asking: 

{quote}“I understand that you’d like to potentially change the signature of methods used to create RDDs to take an implicit pickler in addition. However, such creation methods, including the RDD constructors, already take an implicit ClassTag. If such a ClassTag\[T\] is not enough to create a Pickler\[T\], I am wondering whether it would be enough to change that ClassTag\[T\] to a TypeTag\[T\], so that more type information is made available. Would this change be enough to support what you suggested in the ticket?”{quote}

So his suggestion boils down to:
- *not* adding implicit picklers, but instead
- changing the implicit ClassTag to an implicit TypeTag.


The problem is that ClassTags/TypeTags don’t help when generating picklers statically; they can only be used to create runtime picklers. The reason is that the implicit macro that generates picklers only kicks in when implicit search tries to find an implicit SPickler\[T\] or an implicit DPickler\[T\] -- a ClassTag or TypeTag that merely happens to be in scope never triggers that search.
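To make the mechanism concrete, here is a minimal sketch (a plain implicit def stands in for the pickler-generation macro, so the names and signatures below are illustrative only, not the real scala-pickling API):

{code}
import scala.reflect.ClassTag

// Stand-in for the pickler type class.
trait SPickler[T] { def pickle(x: T): String }

object SPickler {
  // Stand-in for the implicit generation macro: it is consulted only when
  // the compiler searches for an SPickler[T]; a ClassTag or TypeTag that
  // merely happens to be in scope never invokes it.
  implicit def generate[T]: SPickler[T] = new SPickler[T] {
    def pickle(x: T): String = "pickled " + x
  }
}

// The ClassTag context bound does not, by itself, yield a pickler; it is the
// implicitly[SPickler[T]] call below that triggers the generation.
def pickleAll[T: ClassTag](xs: T*): Seq[String] = {
  val p = implicitly[SPickler[T]]
  xs.map(p.pickle)
}

// pickleAll(1, 2, 3)  ==>  Seq("pickled 1", "pickled 2", "pickled 3")
{code}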

However, the fact that ClassTags are already obtained for an RDD’s element type is still interesting, because it means that within the corresponding constructor we can trigger the static generation macro, e.g., using implicitly\[DPickler\[T\]\]. *The obtained pickler could then be stored in a private field of the RDD.*

If RDDs were made covariant, the field’s type would need to be DPickler\[T @uncheckedVariance\] to disable variance checking. This is safe, since T is the actual element type the RDD was constructed with; the only thing that can happen is that some stored elements are of a subtype of T, and a DPickler\[T\] can pickle elements of type T as well as of its subtypes.

This is roughly what it could look like (not using real RDDs/DPicklers):

{code}
    scala> import scala.reflect.ClassTag
    import scala.reflect.ClassTag

    scala> import scala.annotation.unchecked.uncheckedVariance
    import scala.annotation.unchecked.uncheckedVariance

    scala> trait DPickler[T] {
         |   def pickle(x: T): Unit = { println("pickling " + x + " with class " + x.getClass) }
         | }
    defined trait DPickler

    scala> class RDD[+T : ClassTag](xs: T*) {
         |   val elems: List[T] = xs.toList
         |   val p: DPickler[T @uncheckedVariance] = new DPickler[T] {}
         | }
    defined class RDD

    scala> val rddi = new RDD[Int](1,2,3)
    rddi: RDD[Int] = RDD@3eaf6fe7

    scala> val rdda: RDD[Any] = rddi
    rdda: RDD[Any] = RDD@3eaf6fe7

    scala> val thePickler = rdda.p
    thePickler: DPickler[Any] = RDD$$anon$1@b2c3415

    scala> rdda.elems.foreach(x => thePickler.pickle(x))
    pickling 1 with class class java.lang.Integer
    pickling 2 with class class java.lang.Integer
    pickling 3 with class class java.lang.Integer
{code}


> Make RDDs Covariant
> -------------------
>
>                 Key: SPARK-1296
>                 URL: https://issues.apache.org/jira/browse/SPARK-1296
>             Project: Spark
>          Issue Type: Improvement
>          Components: Spark Core
>            Reporter: Michael Armbrust
>            Assignee: Michael Armbrust
>             Fix For: 1.0.0
>
>
> First, what is the problem with RDDs not being covariant?
> {code}
> // Consider a function that takes a Seq of some trait.
> scala> trait A { val a = 1 }
> scala> def f(as: Seq[A]) = as.map(_.a)
> // A list of a concrete version of that trait can be used in this function.
> scala> class B extends A
> scala> f(new B :: Nil)
> res0: Seq[Int] = List(1)
> // Now lets try the same thing with RDDs
> scala> def f(as: org.apache.spark.rdd.RDD[A]) = as.map(_.a)
> scala> val rdd = sc.parallelize(new B :: Nil)
> rdd: org.apache.spark.rdd.RDD[B] = ParallelCollectionRDD[2] at parallelize at <console>:42
> // :(
> scala> f(rdd)
> <console>:45: error: type mismatch;
>  found   : org.apache.spark.rdd.RDD[B]
>  required: org.apache.spark.rdd.RDD[A]
> Note: B <: A, but class RDD is invariant in type T.
> You may wish to define T as +T instead. (SLS 4.5)
>               f(rdd)
> {code}
> h2. Is it possible to make RDDs covariant?
> Probably?  In terms of the public user interface, they are *mostly* covariant. (Internally we use the type parameter T in a lot of mutable state that breaks the covariance contract, but I think with casting we can 'promise' the compiler that we are behaving).  There are also a lot of complications with other types that we return which are invariant.
> h2. What will it take to make RDDs covariant?
> As I mention above, all of our mutable internal state is going to require casting to avoid using T.  This seems to be okay; it makes our life only slightly harder. This extra work is required because we are basically promising the compiler that even if an RDD is implicitly upcast, internally we are keeping all the checkpointed data of the correct type. Since an RDD is immutable, we are okay!
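> For illustration, here is roughly what that looks like on a toy covariant container (a sketch only, not the actual RDD internals): the mutable field is kept at a supertype so T never appears in a forbidden position, and we cast back when handing data out.
> {code}
> class Box[+T](init: List[T]) {
>   // Mutable internal state held as List[Any] so the covariant T does not
>   // appear in an invariant/contravariant position.
>   private var cached: List[Any] = init
>   // Safe cast: we only ever store elements that really are of the element
>   // type the Box was constructed with.
>   def get: List[T] = cached.asInstanceOf[List[T]]
> }
> {code}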
> We also need to modify all the places where we use T in function parameters.  So for example:
> {code}
> def ++[U >: T : ClassTag](other: RDD[U]): RDD[U] = this.union(other).asInstanceOf[RDD[U]]
> {code}
> We are now allowing you to append an RDD of a less specific type, and then returning a less specific new RDD.  This I would argue is a good change. We are strictly improving the power of the RDD interface, while maintaining reasonable type semantics.
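> For example, with the trait A and class B from the snippet above, and assuming the covariant RDD with this signature (a sketch of the call site, not tested code):
> {code}
> val bs: RDD[B] = sc.parallelize(new B :: Nil)
> val as: RDD[A] = sc.parallelize(List[A](new B))
> // U is inferred as A (a supertype of B), so the result is the less specific RDD[A]:
> val combined: RDD[A] = bs ++ as
> {code}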
> h2. So, why wouldn't we do it?
> There are a lot of places where we interact with invariant types.  We return both Maps and Arrays from a lot of public functions.  Arrays are invariant (but if we returned immutable sequences instead... we would be good), and Maps are invariant in the key (once again, immutable sequences of tuples would be great here).
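> The Array part is easy to see in isolation (a quick illustration, not from the original ticket):
> {code}
> val ints: Array[Int] = Array(1, 2, 3)
> // val anys: Array[Any] = ints   // does not compile: Array is invariant
> val anys: Seq[Any] = ints.toList // an immutable, covariant Seq upcasts fine
> {code}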
> I don't think this is a deal breaker, and we may even be able to get away with it without changing the return types of these functions.  For example, I think that this should work, though once again it requires making promises to the compiler:
> {code}
>   /**
>    * Return an array that contains all of the elements in this RDD.
>    */
>   def collect[U >: T](): Array[U] = {
>     val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
>     Array.concat(results: _*).asInstanceOf[Array[U]]
>   }
> {code}
> I started working on this [here|https://github.com/marmbrus/spark/tree/coveriantRDD].  Thoughts / suggestions are welcome!



--
This message was sent by Atlassian JIRA
(v6.2#6252)