Posted to commits@bahir.apache.org by lr...@apache.org on 2016/06/10 15:23:57 UTC

[18/50] [abbrv] bahir git commit: [SPARK-12353][STREAMING][PYSPARK] Fix countByValue inconsistent output in Python API

[SPARK-12353][STREAMING][PYSPARK] Fix countByValue inconsistent output in Python API

The semantics of the Python countByValue differ from the Scala API: the Python version behaves more like a count of distinct values. This change makes it consistent with the Scala/Java API.
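
To illustrate the difference, here is a minimal plain-Python sketch that mimics both behaviours on a single batch. It does not use PySpark; a list stands in for one RDD of the stream, and the function names count_by_value_old and count_by_value_new are hypothetical, chosen only for this example.

```python
from collections import Counter


def count_by_value_old(batch):
    """Mimic the pre-fix pipeline on one batch (a plain list).

    map(x -> (x, None)).reduceByKey(...).count() collapses duplicates
    and then counts the remaining keys, so the result is a single
    number: how many distinct values the batch holds.
    """
    return len(set(batch))


def count_by_value_new(batch):
    """Mimic the fixed pipeline on one batch (a plain list).

    map(x -> (x, 1)).reduceByKey(lambda x, y: x + y) yields one
    (value, count) pair per distinct value. The pairs are sorted here
    only to make the output deterministic.
    """
    return sorted(Counter(batch).items())


batch = ["a", "b", "a", "c", "a"]
print(count_by_value_old(batch))  # 3
print(count_by_value_new(batch))  # [('a', 3), ('b', 1), ('c', 1)]
```

The second hunk of the diff applies the same idea to the windowed variant: dropping the trailing .count() leaves the (value, count) pairs in place instead of reducing them to a single number.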

Author: jerryshao <ss...@hortonworks.com>

Closes #10350 from jerryshao/SPARK-12353.


Project: http://git-wip-us.apache.org/repos/asf/bahir/repo
Commit: http://git-wip-us.apache.org/repos/asf/bahir/commit/47e0e0ed
Tree: http://git-wip-us.apache.org/repos/asf/bahir/tree/47e0e0ed
Diff: http://git-wip-us.apache.org/repos/asf/bahir/diff/47e0e0ed

Branch: refs/heads/master
Commit: 47e0e0edd2974140e7ca88924681319487cfbc92
Parents: e2d30ef
Author: jerryshao <ss...@hortonworks.com>
Authored: Mon Dec 28 10:43:23 2015 +0000
Committer: Sean Owen <so...@cloudera.com>
Committed: Mon Dec 28 10:43:23 2015 +0000

----------------------------------------------------------------------
 streaming-mqtt/python/dstream.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/bahir/blob/47e0e0ed/streaming-mqtt/python/dstream.py
----------------------------------------------------------------------
diff --git a/streaming-mqtt/python/dstream.py b/streaming-mqtt/python/dstream.py
index adc2651..86447f5 100644
--- a/streaming-mqtt/python/dstream.py
+++ b/streaming-mqtt/python/dstream.py
@@ -247,7 +247,7 @@ class DStream(object):
         Return a new DStream in which each RDD contains the counts of each
         distinct value in each RDD of this DStream.
         """
-        return self.map(lambda x: (x, None)).reduceByKey(lambda x, y: None).count()
+        return self.map(lambda x: (x, 1)).reduceByKey(lambda x, y: x+y)
 
     def saveAsTextFiles(self, prefix, suffix=None):
         """
@@ -493,7 +493,7 @@ class DStream(object):
         keyed = self.map(lambda x: (x, 1))
         counted = keyed.reduceByKeyAndWindow(operator.add, operator.sub,
                                              windowDuration, slideDuration, numPartitions)
-        return counted.filter(lambda kv: kv[1] > 0).count()
+        return counted.filter(lambda kv: kv[1] > 0)
 
     def groupByKeyAndWindow(self, windowDuration, slideDuration, numPartitions=None):
         """