You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ma...@apache.org on 2015/02/17 19:22:51 UTC
spark git commit: [SPARK-5859] [PySpark] [SQL] fix DataFrame Python API
Repository: spark
Updated Branches:
refs/heads/master c74b07fa9 -> d8adefefc
[SPARK-5859] [PySpark] [SQL] fix DataFrame Python API
1. add explain()
2. add isLocal()
3. do not call show() in __repr__
4. add foreach() and foreachPartition()
5. add distinct()
6. fix functions.col()/column()/lit()
7. fix unit tests in sql/functions.py
8. fix unicode in showString()
Author: Davies Liu <da...@databricks.com>
Closes #4645 from davies/df6 and squashes the following commits:
6b46a2c [Davies Liu] fix DataFrame Python API
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d8adefef
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d8adefef
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d8adefef
Branch: refs/heads/master
Commit: d8adefefcc2a4af32295440ed1d4917a6968f017
Parents: c74b07f
Author: Davies Liu <da...@databricks.com>
Authored: Tue Feb 17 10:22:48 2015 -0800
Committer: Michael Armbrust <mi...@databricks.com>
Committed: Tue Feb 17 10:22:48 2015 -0800
----------------------------------------------------------------------
python/pyspark/sql/dataframe.py | 65 ++++++++++++++++++++++++++++++------
python/pyspark/sql/functions.py | 12 +++----
2 files changed, 59 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/d8adefef/python/pyspark/sql/dataframe.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index 28a59e7..8417240 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -238,6 +238,22 @@ class DataFrame(object):
"""
print (self._jdf.schema().treeString())
+ def explain(self, extended=False):
+ """
+ Prints the plans (logical and physical) to the console for
+ debugging purpose.
+
+ If extended is False, only prints the physical plan.
+ """
+ self._jdf.explain(extended)
+
+ def isLocal(self):
+ """
+ Returns True if the `collect` and `take` methods can be run locally
+ (without any Spark executors).
+ """
+ return self._jdf.isLocal()
+
def show(self):
"""
Print the first 20 rows.
@@ -247,14 +263,12 @@ class DataFrame(object):
2 Alice
5 Bob
>>> df
- age name
- 2 Alice
- 5 Bob
+ DataFrame[age: int, name: string]
"""
- print (self)
+ print self._jdf.showString().encode('utf8', 'ignore')
def __repr__(self):
- return self._jdf.showString()
+ return "DataFrame[%s]" % (", ".join("%s: %s" % c for c in self.dtypes))
def count(self):
"""Return the number of elements in this RDD.
@@ -336,6 +350,8 @@ class DataFrame(object):
"""
Return a new RDD by applying a function to each partition.
+ It's a shorthand for df.rdd.mapPartitions()
+
>>> rdd = sc.parallelize([1, 2, 3, 4], 4)
>>> def f(iterator): yield 1
>>> rdd.mapPartitions(f).sum()
@@ -343,6 +359,31 @@ class DataFrame(object):
"""
return self.rdd.mapPartitions(f, preservesPartitioning)
+ def foreach(self, f):
+ """
+ Applies a function to all rows of this DataFrame.
+
+ It's a shorthand for df.rdd.foreach()
+
+ >>> def f(person):
+ ... print person.name
+ >>> df.foreach(f)
+ """
+ return self.rdd.foreach(f)
+
+ def foreachPartition(self, f):
+ """
+ Applies a function to each partition of this DataFrame.
+
+ It's a shorthand for df.rdd.foreachPartition()
+
+ >>> def f(people):
+ ... for person in people:
+ ... print person.name
+ >>> df.foreachPartition(f)
+ """
+ return self.rdd.foreachPartition(f)
+
def cache(self):
""" Persist with the default storage level (C{MEMORY_ONLY_SER}).
"""
@@ -377,8 +418,13 @@ class DataFrame(object):
""" Return a new :class:`DataFrame` that has exactly `numPartitions`
partitions.
"""
- rdd = self._jdf.repartition(numPartitions, None)
- return DataFrame(rdd, self.sql_ctx)
+ return DataFrame(self._jdf.repartition(numPartitions, None), self.sql_ctx)
+
+ def distinct(self):
+ """
+ Return a new :class:`DataFrame` containing the distinct rows in this DataFrame.
+ """
+ return DataFrame(self._jdf.distinct(), self.sql_ctx)
def sample(self, withReplacement, fraction, seed=None):
"""
@@ -957,10 +1003,7 @@ class Column(DataFrame):
return Column(jc, self.sql_ctx)
def __repr__(self):
- if self._jdf.isComputable():
- return self._jdf.samples()
- else:
- return 'Column<%s>' % self._jdf.toString()
+ return 'Column<%s>' % self._jdf.toString().encode('utf8')
def toPandas(self):
"""
http://git-wip-us.apache.org/repos/asf/spark/blob/d8adefef/python/pyspark/sql/functions.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py
index d0e0906..fc61162 100644
--- a/python/pyspark/sql/functions.py
+++ b/python/pyspark/sql/functions.py
@@ -37,7 +37,7 @@ def _create_function(name, doc=""):
""" Create a function for aggregator by name"""
def _(col):
sc = SparkContext._active_spark_context
- jc = getattr(sc._jvm.functions, name)(_to_java_column(col))
+ jc = getattr(sc._jvm.functions, name)(col._jc if isinstance(col, Column) else col)
return Column(jc)
_.__name__ = name
_.__doc__ = doc
@@ -140,6 +140,7 @@ class UserDefinedFunction(object):
def udf(f, returnType=StringType()):
"""Create a user defined function (UDF)
+ >>> from pyspark.sql.types import IntegerType
>>> slen = udf(lambda s: len(s), IntegerType())
>>> df.select(slen(df.name).alias('slen')).collect()
[Row(slen=5), Row(slen=3)]
@@ -151,17 +152,14 @@ def _test():
import doctest
from pyspark.context import SparkContext
from pyspark.sql import Row, SQLContext
- import pyspark.sql.dataframe
- globs = pyspark.sql.dataframe.__dict__.copy()
+ import pyspark.sql.functions
+ globs = pyspark.sql.functions.__dict__.copy()
sc = SparkContext('local[4]', 'PythonTest')
globs['sc'] = sc
globs['sqlCtx'] = SQLContext(sc)
globs['df'] = sc.parallelize([Row(name='Alice', age=2), Row(name='Bob', age=5)]).toDF()
- globs['df2'] = sc.parallelize([Row(name='Tom', height=80), Row(name='Bob', height=85)]).toDF()
- globs['df3'] = sc.parallelize([Row(name='Alice', age=2, height=80),
- Row(name='Bob', age=5, height=85)]).toDF()
(failure_count, test_count) = doctest.testmod(
- pyspark.sql.dataframe, globs=globs,
+ pyspark.sql.functions, globs=globs,
optionflags=doctest.ELLIPSIS | doctest.NORMALIZE_WHITESPACE)
globs['sc'].stop()
if failure_count:
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org