You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by do...@apache.org on 2020/01/18 08:18:47 UTC
[spark] branch master updated: [SPARK-30539][PYTHON][SQL] Add
DataFrame.tail in PySpark
This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new a6bdea3 [SPARK-30539][PYTHON][SQL] Add DataFrame.tail in PySpark
a6bdea3 is described below
commit a6bdea3ad4a5dde8a68aaf1db0d870ec36040c67
Author: HyukjinKwon <gu...@apache.org>
AuthorDate: Sat Jan 18 00:18:12 2020 -0800
[SPARK-30539][PYTHON][SQL] Add DataFrame.tail in PySpark
### What changes were proposed in this pull request?
https://github.com/apache/spark/pull/26809 added the `Dataset.tail` API. It would be good to have it in the PySpark API as well.
### Why are the changes needed?
To keep the PySpark API consistent with the Scala `Dataset` API.
### Does this PR introduce any user-facing change?
Yes. It adds a new API, `DataFrame.tail`, to PySpark.
### How was this patch tested?
Manually tested, and a doctest was added.
Closes #27251 from HyukjinKwon/SPARK-30539.
Authored-by: HyukjinKwon <gu...@apache.org>
Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
python/pyspark/sql/dataframe.py | 16 ++++++++++++++++
.../src/main/scala/org/apache/spark/sql/Dataset.scala | 10 ++++++++++
2 files changed, 26 insertions(+)
diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index 669de26..2432b81 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -605,6 +605,22 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
"""
return self.limit(num).collect()
+ @ignore_unicode_prefix
+ @since(3.0)
+ def tail(self, num):
+ """
+ Returns the last ``num`` rows as a :class:`list` of :class:`Row`.
+
+ Running tail requires moving data into the application's driver process, and doing so with
+ a very large ``num`` can crash the driver process with OutOfMemoryError.
+
+ >>> df.tail(1)
+ [Row(age=5, name=u'Bob')]
+ """
+ with SCCallSiteSync(self._sc):  # NOTE(review): presumably ties the JVM job's call site to this Python call; confirm
+ sock_info = self._jdf.tailToPython(num)  # JVM side (Dataset.tailToPython) runs executeTail(num) and serves the pickled rows over a socket
+ return list(_load_from_socket(sock_info, BatchedSerializer(PickleSerializer())))  # read and unpickle the served batches into a list of Rows
+
@since(1.3)
def foreach(self, f):
"""Applies the ``f`` function to all :class:`Row` of this :class:`DataFrame`.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index dd3d0f5..0eb1a26 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -3331,6 +3331,16 @@ class Dataset[T] private[sql](
}
}
+ private[sql] def tailToPython(n: Int): Array[Any] = { // Backs PySpark's DataFrame.tail: fetches the last n rows to the driver and serves them, pickled, to Python.
+ EvaluatePython.registerPicklers() // NOTE(review): presumably registers picklers for Spark SQL types before any row is pickled; confirm
+ withAction("tailToPython", queryExecution) { plan => // runs under withAction with the action name "tailToPython"
+ val toJava: (Any) => Any = EvaluatePython.toJava(_, schema) // converts each row to a Java object according to this Dataset's schema
+ val iter: Iterator[Array[Byte]] = new SerDeUtil.AutoBatchedPickler( // pickles the converted rows into byte-array batches
+ plan.executeTail(n).iterator.map(toJava)) // the last n rows, brought to the driver (large n risks driver OOM)
+ PythonRDD.serveIterator(iter, "serve-DataFrame") // serves the batches over a socket; the returned info is read by _load_from_socket in PySpark
+ }
+ }
+
private[sql] def getRowsToPython(
_numRows: Int,
truncate: Int): Array[Any] = {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org