You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by zs...@apache.org on 2016/01/29 22:53:15 UTC

spark git commit: [SPARK-13082][PYSPARK] Backport the fix of 'read.json(rdd)' in #10559 to branch-1.6

Repository: spark
Updated Branches:
  refs/heads/branch-1.6 96e32db5c -> 84dab7260


[SPARK-13082][PYSPARK] Backport the fix of 'read.json(rdd)' in #10559 to branch-1.6

SPARK-13082 actually fixed by  #10559. However, it's a big PR and not backported to 1.6. This PR just backported the fix of 'read.json(rdd)' to branch-1.6.

Author: Shixiong Zhu <sh...@databricks.com>

Closes #10988 from zsxwing/json-rdd.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/84dab726
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/84dab726
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/84dab726

Branch: refs/heads/branch-1.6
Commit: 84dab7260e9a33586ad4002cd826a5ae7c8c4141
Parents: 96e32db
Author: Shixiong Zhu <sh...@databricks.com>
Authored: Fri Jan 29 13:53:11 2016 -0800
Committer: Shixiong Zhu <sh...@databricks.com>
Committed: Fri Jan 29 13:53:11 2016 -0800

----------------------------------------------------------------------
 python/pyspark/sql/readwriter.py | 12 +++++++++++-
 python/pyspark/sql/tests.py      |  6 +++---
 2 files changed, 14 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/84dab726/python/pyspark/sql/readwriter.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index a3d7eca..97da3d9 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -177,7 +177,17 @@ class DataFrameReader(object):
         elif type(path) == list:
             return self._df(self._jreader.json(self._sqlContext._sc._jvm.PythonUtils.toSeq(path)))
         elif isinstance(path, RDD):
-            return self._df(self._jreader.json(path._jrdd))
+            def func(iterator):
+                for x in iterator:
+                    if not isinstance(x, basestring):
+                        x = unicode(x)
+                    if isinstance(x, unicode):
+                        x = x.encode("utf-8")
+                    yield x
+            keyed = path.mapPartitions(func)
+            keyed._bypass_serializer = True
+            jrdd = keyed._jrdd.map(self._sqlContext._jvm.BytesToString())
+            return self._df(self._jreader.json(jrdd))
         else:
             raise TypeError("path can be only string or RDD")
 

http://git-wip-us.apache.org/repos/asf/spark/blob/84dab726/python/pyspark/sql/tests.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index 8be9d92..6b06984 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -326,7 +326,7 @@ class SQLTests(ReusedPySparkTestCase):
 
     def test_basic_functions(self):
         rdd = self.sc.parallelize(['{"foo":"bar"}', '{"foo":"baz"}'])
-        df = self.sqlCtx.jsonRDD(rdd)
+        df = self.sqlCtx.read.json(rdd)
         df.count()
         df.collect()
         df.schema
@@ -345,7 +345,7 @@ class SQLTests(ReusedPySparkTestCase):
         df.collect()
 
     def test_apply_schema_to_row(self):
-        df = self.sqlCtx.jsonRDD(self.sc.parallelize(["""{"a":2}"""]))
+        df = self.sqlCtx.read.json(self.sc.parallelize(["""{"a":2}"""]))
         df2 = self.sqlCtx.createDataFrame(df.map(lambda x: x), df.schema)
         self.assertEqual(df.collect(), df2.collect())
 
@@ -821,7 +821,7 @@ class SQLTests(ReusedPySparkTestCase):
     def test_help_command(self):
         # Regression test for SPARK-5464
         rdd = self.sc.parallelize(['{"foo":"bar"}', '{"foo":"baz"}'])
-        df = self.sqlCtx.jsonRDD(rdd)
+        df = self.sqlCtx.read.json(rdd)
         # render_doc() reproduces the help() exception without printing output
         pydoc.render_doc(df)
         pydoc.render_doc(df.foo)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org