You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by zs...@apache.org on 2015/12/17 07:10:31 UTC
spark git commit: [SPARK-11904][PYSPARK] reduceByKeyAndWindow does
not require checkpointing when invFunc is None
Repository: spark
Updated Branches:
refs/heads/master 97678edea -> 437583f69
[SPARK-11904][PYSPARK] reduceByKeyAndWindow does not require checkpointing when invFunc is None
when invFunc is None, `reduceByKeyAndWindow(func, None, winsize, slidesize)` is equivalent to
reduceByKey(func).window(winsize, slidesize).reduceByKey(func, numPartitions)
and no checkpoint is necessary. The corresponding Scala code does exactly that, but Python code always creates a windowed stream with obligatory checkpointing. The patch fixes this.
I do not know how to unit-test this.
Author: David Tolpin <da...@gmail.com>
Closes #9888 from dtolpin/master.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/437583f6
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/437583f6
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/437583f6
Branch: refs/heads/master
Commit: 437583f692e30b8dc03b339a34e92595d7b992ba
Parents: 97678ed
Author: David Tolpin <da...@gmail.com>
Authored: Wed Dec 16 22:10:24 2015 -0800
Committer: Shixiong Zhu <sh...@databricks.com>
Committed: Wed Dec 16 22:10:24 2015 -0800
----------------------------------------------------------------------
python/pyspark/streaming/dstream.py | 45 ++++++++++++++++----------------
1 file changed, 23 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/437583f6/python/pyspark/streaming/dstream.py
----------------------------------------------------------------------
diff --git a/python/pyspark/streaming/dstream.py b/python/pyspark/streaming/dstream.py
index f61137c..b994a53 100644
--- a/python/pyspark/streaming/dstream.py
+++ b/python/pyspark/streaming/dstream.py
@@ -542,31 +542,32 @@ class DStream(object):
reduced = self.reduceByKey(func, numPartitions)
- def reduceFunc(t, a, b):
- b = b.reduceByKey(func, numPartitions)
- r = a.union(b).reduceByKey(func, numPartitions) if a else b
- if filterFunc:
- r = r.filter(filterFunc)
- return r
-
- def invReduceFunc(t, a, b):
- b = b.reduceByKey(func, numPartitions)
- joined = a.leftOuterJoin(b, numPartitions)
- return joined.mapValues(lambda kv: invFunc(kv[0], kv[1])
- if kv[1] is not None else kv[0])
-
- jreduceFunc = TransformFunction(self._sc, reduceFunc, reduced._jrdd_deserializer)
if invFunc:
+ def reduceFunc(t, a, b):
+ b = b.reduceByKey(func, numPartitions)
+ r = a.union(b).reduceByKey(func, numPartitions) if a else b
+ if filterFunc:
+ r = r.filter(filterFunc)
+ return r
+
+ def invReduceFunc(t, a, b):
+ b = b.reduceByKey(func, numPartitions)
+ joined = a.leftOuterJoin(b, numPartitions)
+ return joined.mapValues(lambda kv: invFunc(kv[0], kv[1])
+ if kv[1] is not None else kv[0])
+
+ jreduceFunc = TransformFunction(self._sc, reduceFunc, reduced._jrdd_deserializer)
jinvReduceFunc = TransformFunction(self._sc, invReduceFunc, reduced._jrdd_deserializer)
+ if slideDuration is None:
+ slideDuration = self._slideDuration
+ dstream = self._sc._jvm.PythonReducedWindowedDStream(
+ reduced._jdstream.dstream(),
+ jreduceFunc, jinvReduceFunc,
+ self._ssc._jduration(windowDuration),
+ self._ssc._jduration(slideDuration))
+ return DStream(dstream.asJavaDStream(), self._ssc, self._sc.serializer)
else:
- jinvReduceFunc = None
- if slideDuration is None:
- slideDuration = self._slideDuration
- dstream = self._sc._jvm.PythonReducedWindowedDStream(reduced._jdstream.dstream(),
- jreduceFunc, jinvReduceFunc,
- self._ssc._jduration(windowDuration),
- self._ssc._jduration(slideDuration))
- return DStream(dstream.asJavaDStream(), self._ssc, self._sc.serializer)
+ return reduced.window(windowDuration, slideDuration).reduceByKey(func, numPartitions)
def updateStateByKey(self, updateFunc, numPartitions=None, initialRDD=None):
"""
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org