Posted to reviews@spark.apache.org by cloud-fan <gi...@git.apache.org> on 2018/08/10 00:41:59 UTC

[GitHub] spark pull request #22011: [SPARK-24822][PySpark] Python support for barrier...

Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22011#discussion_r209118962
  
    --- Diff: python/pyspark/rdd.py ---
    @@ -2429,6 +2449,36 @@ def _wrap_function(sc, func, deserializer, serializer, profiler=None):
                                       sc.pythonVer, broadcast_vars, sc._javaAccumulator)
     
     
    +class RDDBarrier(object):
    +
    +    """
    +    .. note:: Experimental
    +
    +    An RDDBarrier turns an RDD into a barrier RDD, which forces Spark to launch tasks of the
    +    stage that contains this RDD together.
    +
    +    .. versionadded:: 2.4.0
    +    """
    +
    +    def __init__(self, rdd):
    +        self.rdd = rdd
    +        self._jrdd = rdd._jrdd
    +
    +    def mapPartitions(self, f, preservesPartitioning=False):
    +        """
    +        .. note:: Experimental
    +
    +        Return a new RDD by applying a function to each partition of this RDD.
    +
    +        .. versionadded:: 2.4.0
    +        """
    +        def func(s, iterator):
    +            return f(iterator)
    +        jBarrierRdd = self._jrdd.rdd().barrier().toJavaRDD()
    --- End diff --
    
    This will materialize the Java RDD, which means the map functions before and after the barrier will be executed by two separate Python workers.
    
    We should not materialize the Java RDD here, but just set an isBarrier flag in the Python `PipelinedRDD`.
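    The suggested approach can be sketched with a simplified, self-contained model. This is not the actual PySpark implementation: `SimpleRDD`, the constructor signatures, and the `is_from_barrier` parameter name are all illustrative stand-ins. The point it demonstrates is that recording a flag and composing the Python functions keeps the maps before and after the barrier pipelined into a single Python worker, instead of eagerly materializing an intermediate Java RDD.
    
    ```python
    class SimpleRDD(object):
        """Stand-in for pyspark.rdd.RDD, holding its data in memory."""
        def __init__(self, data):
            self.data = data
            self.is_barrier = False
    
        def collect(self):
            return list(self.data)
    
    
    class PipelinedRDD(SimpleRDD):
        """Stand-in for pyspark's PipelinedRDD: chains partition functions
        lazily instead of materializing an intermediate RDD."""
        def __init__(self, prev, func, is_from_barrier=False):
            if isinstance(prev, PipelinedRDD):
                # Compose with the previous pipelined function: one worker
                # applies both maps, with no intermediate materialization.
                prev_func = prev.func
    
                def pipelined(iterator):
                    return func(prev_func(iterator))
    
                self.func = pipelined
                self.prev_data = prev.prev_data
            else:
                self.func = func
                self.prev_data = prev.data
            # The barrier flag survives pipelining.
            self.is_barrier = is_from_barrier or getattr(prev, "is_barrier", False)
    
        def collect(self):
            return list(self.func(iter(self.prev_data)))
    
    
    class RDDBarrier(object):
        """Marks the next mapPartitions as a barrier stage via a flag only,
        rather than calling into the JVM to build a barrier RDD eagerly."""
        def __init__(self, rdd):
            self.rdd = rdd
    
        def mapPartitions(self, f):
            return PipelinedRDD(self.rdd, f, is_from_barrier=True)
    
    
    rdd = PipelinedRDD(SimpleRDD([1, 2, 3]), lambda it: (x * 10 for x in it))
    result = RDDBarrier(rdd).mapPartitions(lambda it: (x + 1 for x in it))
    print(result.is_barrier)   # True: the flag is set, no JVM call needed
    print(result.collect())    # [11, 21, 31]: both maps ran in one pipeline
    ```
    
    In the sketch, `result` holds a single composed function over the original data, so both map steps execute in the same pipeline while the barrier property is still visible to the scheduler via the flag.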


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org