Posted to reviews@spark.apache.org by cloud-fan <gi...@git.apache.org> on 2018/08/10 00:41:59 UTC
[GitHub] spark pull request #22011: [SPARK-24822][PySpark] Python support for barrier...
Github user cloud-fan commented on a diff in the pull request:
https://github.com/apache/spark/pull/22011#discussion_r209118962
--- Diff: python/pyspark/rdd.py ---
@@ -2429,6 +2449,36 @@ def _wrap_function(sc, func, deserializer, serializer, profiler=None):
sc.pythonVer, broadcast_vars, sc._javaAccumulator)
+class RDDBarrier(object):
+
+ """
+ .. note:: Experimental
+
+ An RDDBarrier turns an RDD into a barrier RDD, which forces Spark to launch tasks of the
+ stage containing this RDD together.
+
+ .. versionadded:: 2.4.0
+ """
+
+ def __init__(self, rdd):
+ self.rdd = rdd
+ self._jrdd = rdd._jrdd
+
+ def mapPartitions(self, f, preservesPartitioning=False):
+ """
+ .. note:: Experimental
+
+ Return a new RDD by applying a function to each partition of this RDD.
+
+ .. versionadded:: 2.4.0
+ """
+ def func(s, iterator):
+ return f(iterator)
+ jBarrierRdd = self._jrdd.rdd().barrier().toJavaRDD()
--- End diff --
This will materialize the Java RDD, which means the map functions before and after the barrier will be executed by 2 Python workers.
We should not materialize the Java RDD here; instead, just set an isBarrier flag in the Python `PipelinedRDD`.
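To illustrate the idea, here is a minimal standalone sketch (the names `PipelinedRDD`, `is_barrier`, and the composition logic are simplified stand-ins, not Spark's actual internals): rather than materializing an intermediate RDD at `barrier()`, chained map functions are composed into a single pipeline function, and a flag marks the stage as a barrier stage, so everything can run in one Python worker.

```python
class PipelinedRDD(object):
    """Toy model of pipelining map functions across a barrier() call."""

    def __init__(self, prev, func, is_barrier=False):
        if isinstance(prev, PipelinedRDD):
            # Compose with the previous function so maps before and after
            # the barrier run in one pass -- no intermediate materialization.
            prev_func = prev.func
            self.func = lambda it: func(prev_func(it))
            self.data = prev.data
            self.is_barrier = prev.is_barrier or is_barrier
        else:
            self.func = func
            self.data = prev
            self.is_barrier = is_barrier

    def mapPartitions(self, f):
        return PipelinedRDD(self, f)

    def barrier(self):
        # Only flips the flag; does not evaluate anything.
        return PipelinedRDD(self, lambda it: it, is_barrier=True)

    def collect(self):
        return list(self.func(iter(self.data)))


rdd = PipelinedRDD([1, 2, 3], lambda it: (x * 2 for x in it))
barriered = rdd.barrier().mapPartitions(lambda it: (x + 1 for x in it))
result = barriered.collect()  # both maps run through one composed function
```

In this sketch `result` is `[3, 5, 7]` and `barriered.is_barrier` is `True`: the barrier is purely metadata carried on the pipelined RDD, which is the behavior the comment asks for.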
---
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org