Posted to commits@spark.apache.org by jo...@apache.org on 2014/09/11 20:50:44 UTC

git commit: [SPARK-3047] [PySpark] add an option to use str in textFileRDD

Repository: spark
Updated Branches:
  refs/heads/master ed1980ffa -> 1ef656ea8


[SPARK-3047] [PySpark] add an option to use str in textFileRDD

str is much more efficient than unicode (in both CPU and memory), so it's better to use str in textFileRDD. To keep compatibility, unicode is used by default. (This may change in the future.)
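
For illustration (not part of the commit), a minimal usage sketch of the new flag; the input path is hypothetical:

from pyspark import SparkContext

sc = SparkContext("local", "demo")

# Default: each line is decoded to a unicode object.
lines = sc.textFile("hdfs:///data/sample.txt")

# Opt out of decoding: each line stays a utf-8 encoded str,
# which is faster to produce and smaller in memory.
raw_lines = sc.textFile("hdfs:///data/sample.txt", use_unicode=False)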

use_unicode=True:

daviesliudm:~/work/spark$ time python wc.py
(u'./universe/spark/sql/core/target/java/org/apache/spark/sql/execution/ExplainCommand$.java', 7776)

real	2m8.298s
user	0m0.185s
sys	0m0.064s

use_unicode=False:

daviesliudm:~/work/spark$ time python wc.py
('./universe/spark/sql/core/target/java/org/apache/spark/sql/execution/ExplainCommand$.java', 7776)

real	1m26.402s
user	0m0.182s
sys	0m0.062s

We can see that dropping to str cuts the wall-clock time from 2m8s to 1m26s, a roughly 32% improvement!
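
wc.py itself is not included in the commit; a hypothetical sketch in its spirit, consistent with the output above (the directory path and the aggregation are assumptions):

from pyspark import SparkContext

sc = SparkContext("local", "wc")

# Hypothetical benchmark: count words per file, then pick the file
# with the most words. With use_unicode=False the file paths and
# contents come back as str instead of unicode.
files = sc.wholeTextFiles("./universe", use_unicode=False)
counts = files.mapValues(lambda content: len(content.split()))
print(counts.reduce(lambda a, b: a if a[1] >= b[1] else b))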

Author: Davies Liu <da...@gmail.com>

Closes #1951 from davies/unicode and squashes the following commits:

8352d57 [Davies Liu] update version number
a286f2f [Davies Liu] rollback loads()
85246e5 [Davies Liu] add docs for use_unicode
a0295e1 [Davies Liu] add an option to use str in textFile()


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1ef656ea
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1ef656ea
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1ef656ea

Branch: refs/heads/master
Commit: 1ef656ea85b4b93c7b0f3cf8042b63a0de0901cb
Parents: ed1980f
Author: Davies Liu <da...@gmail.com>
Authored: Thu Sep 11 11:50:36 2014 -0700
Committer: Josh Rosen <jo...@apache.org>
Committed: Thu Sep 11 11:50:36 2014 -0700

----------------------------------------------------------------------
 python/pyspark/context.py     | 16 ++++++++++++----
 python/pyspark/serializers.py | 18 +++++++++++-------
 2 files changed, 23 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/1ef656ea/python/pyspark/context.py
----------------------------------------------------------------------
diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index 84bc0a3..3ab98e2 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -331,12 +331,16 @@ class SparkContext(object):
         return RDD(self._jsc.objectFile(name, minPartitions), self,
                    BatchedSerializer(PickleSerializer()))
 
-    def textFile(self, name, minPartitions=None):
+    def textFile(self, name, minPartitions=None, use_unicode=True):
         """
         Read a text file from HDFS, a local file system (available on all
         nodes), or any Hadoop-supported file system URI, and return it as an
         RDD of Strings.
 
+        If use_unicode is False, the strings will be kept as `str` (encoding
+        as `utf-8`), which is faster and smaller than unicode. (Added in
+        Spark 1.2)
+
         >>> path = os.path.join(tempdir, "sample-text.txt")
         >>> with open(path, "w") as testFile:
         ...    testFile.write("Hello world!")
@@ -346,9 +350,9 @@ class SparkContext(object):
         """
         minPartitions = minPartitions or min(self.defaultParallelism, 2)
         return RDD(self._jsc.textFile(name, minPartitions), self,
-                   UTF8Deserializer())
+                   UTF8Deserializer(use_unicode))
 
-    def wholeTextFiles(self, path, minPartitions=None):
+    def wholeTextFiles(self, path, minPartitions=None, use_unicode=True):
         """
         Read a directory of text files from HDFS, a local file system
         (available on all nodes), or any  Hadoop-supported file system
@@ -356,6 +360,10 @@ class SparkContext(object):
         key-value pair, where the key is the path of each file, the
         value is the content of each file.
 
+        If use_unicode is False, the strings will be kept as `str` (encoding
+        as `utf-8`), which is faster and smaller than unicode. (Added in
+        Spark 1.2)
+
         For example, if you have the following files::
 
           hdfs://a-hdfs-path/part-00000
@@ -386,7 +394,7 @@ class SparkContext(object):
         """
         minPartitions = minPartitions or self.defaultMinPartitions
         return RDD(self._jsc.wholeTextFiles(path, minPartitions), self,
-                   PairDeserializer(UTF8Deserializer(), UTF8Deserializer()))
+                   PairDeserializer(UTF8Deserializer(use_unicode), UTF8Deserializer(use_unicode)))
 
     def _dictToJavaMap(self, d):
         jm = self._jvm.java.util.HashMap()
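
As a hedged illustration of what the flag changes here (the path is the one from the docstring above):

# With use_unicode=False, both the path key and the file content are
# returned as utf-8 encoded str rather than unicode.
pairs = sc.wholeTextFiles("hdfs://a-hdfs-path", use_unicode=False)
path, content = pairs.first()
assert isinstance(path, str) and isinstance(content, str)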

http://git-wip-us.apache.org/repos/asf/spark/blob/1ef656ea/python/pyspark/serializers.py
----------------------------------------------------------------------
diff --git a/python/pyspark/serializers.py b/python/pyspark/serializers.py
index 55e6cf3..7b2710b 100644
--- a/python/pyspark/serializers.py
+++ b/python/pyspark/serializers.py
@@ -429,18 +429,22 @@ class UTF8Deserializer(Serializer):
     Deserializes streams written by String.getBytes.
     """
 
+    def __init__(self, use_unicode=False):
+        self.use_unicode = use_unicode
+
     def loads(self, stream):
         length = read_int(stream)
-        return stream.read(length).decode('utf8')
+        s = stream.read(length)
+        return s.decode("utf-8") if self.use_unicode else s
 
     def load_stream(self, stream):
-        while True:
-            try:
+        try:
+            while True:
                 yield self.loads(stream)
-            except struct.error:
-                return
-            except EOFError:
-                return
+        except struct.error:
+            return
+        except EOFError:
+            return
 
 
 def read_long(stream):
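
To see the deserializer change end to end, a minimal sketch of the length-prefixed framing it consumes (read_int unpacks a 4-byte big-endian length); the sample strings are arbitrary:

import struct
from io import BytesIO
from pyspark.serializers import UTF8Deserializer

# Write two records in the framing UTF8Deserializer.loads() expects:
# a 4-byte big-endian length followed by that many utf-8 bytes.
buf = BytesIO()
for s in [u"Hello world!", u"Hello"]:
    data = s.encode("utf-8")
    buf.write(struct.pack("!i", len(data)))
    buf.write(data)
buf.seek(0)

# With use_unicode=False the raw str bytes come back undecoded;
# load_stream stops cleanly at end of stream (struct.error/EOFError).
print(list(UTF8Deserializer(use_unicode=False).load_stream(buf)))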

