You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by zs...@apache.org on 2016/01/29 22:53:15 UTC
spark git commit: [SPARK-13082][PYSPARK] Backport the fix of
'read.json(rdd)' in #10559 to branch-1.6
Repository: spark
Updated Branches:
refs/heads/branch-1.6 96e32db5c -> 84dab7260
[SPARK-13082][PYSPARK] Backport the fix of 'read.json(rdd)' in #10559 to branch-1.6
SPARK-13082 was actually fixed by #10559. However, that is a big PR and was not backported to 1.6. This PR just backports the fix of 'read.json(rdd)' to branch-1.6.
Author: Shixiong Zhu <sh...@databricks.com>
Closes #10988 from zsxwing/json-rdd.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/84dab726
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/84dab726
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/84dab726
Branch: refs/heads/branch-1.6
Commit: 84dab7260e9a33586ad4002cd826a5ae7c8c4141
Parents: 96e32db
Author: Shixiong Zhu <sh...@databricks.com>
Authored: Fri Jan 29 13:53:11 2016 -0800
Committer: Shixiong Zhu <sh...@databricks.com>
Committed: Fri Jan 29 13:53:11 2016 -0800
----------------------------------------------------------------------
python/pyspark/sql/readwriter.py | 12 +++++++++++-
python/pyspark/sql/tests.py | 6 +++---
2 files changed, 14 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/84dab726/python/pyspark/sql/readwriter.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index a3d7eca..97da3d9 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -177,7 +177,17 @@ class DataFrameReader(object):
elif type(path) == list:
return self._df(self._jreader.json(self._sqlContext._sc._jvm.PythonUtils.toSeq(path)))
elif isinstance(path, RDD):
- return self._df(self._jreader.json(path._jrdd))
+ def func(iterator):
+ for x in iterator:
+ if not isinstance(x, basestring):
+ x = unicode(x)
+ if isinstance(x, unicode):
+ x = x.encode("utf-8")
+ yield x
+ keyed = path.mapPartitions(func)
+ keyed._bypass_serializer = True
+ jrdd = keyed._jrdd.map(self._sqlContext._jvm.BytesToString())
+ return self._df(self._jreader.json(jrdd))
else:
raise TypeError("path can be only string or RDD")
http://git-wip-us.apache.org/repos/asf/spark/blob/84dab726/python/pyspark/sql/tests.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index 8be9d92..6b06984 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -326,7 +326,7 @@ class SQLTests(ReusedPySparkTestCase):
def test_basic_functions(self):
rdd = self.sc.parallelize(['{"foo":"bar"}', '{"foo":"baz"}'])
- df = self.sqlCtx.jsonRDD(rdd)
+ df = self.sqlCtx.read.json(rdd)
df.count()
df.collect()
df.schema
@@ -345,7 +345,7 @@ class SQLTests(ReusedPySparkTestCase):
df.collect()
def test_apply_schema_to_row(self):
- df = self.sqlCtx.jsonRDD(self.sc.parallelize(["""{"a":2}"""]))
+ df = self.sqlCtx.read.json(self.sc.parallelize(["""{"a":2}"""]))
df2 = self.sqlCtx.createDataFrame(df.map(lambda x: x), df.schema)
self.assertEqual(df.collect(), df2.collect())
@@ -821,7 +821,7 @@ class SQLTests(ReusedPySparkTestCase):
def test_help_command(self):
# Regression test for SPARK-5464
rdd = self.sc.parallelize(['{"foo":"bar"}', '{"foo":"baz"}'])
- df = self.sqlCtx.jsonRDD(rdd)
+ df = self.sqlCtx.read.json(rdd)
# render_doc() reproduces the help() exception without printing output
pydoc.render_doc(df)
pydoc.render_doc(df.foo)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org