Posted to commits@spark.apache.org by do...@apache.org on 2020/01/18 08:18:47 UTC

[spark] branch master updated: [SPARK-30539][PYTHON][SQL] Add DataFrame.tail in PySpark

This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new a6bdea3  [SPARK-30539][PYTHON][SQL] Add DataFrame.tail in PySpark
a6bdea3 is described below

commit a6bdea3ad4a5dde8a68aaf1db0d870ec36040c67
Author: HyukjinKwon <gu...@apache.org>
AuthorDate: Sat Jan 18 00:18:12 2020 -0800

    [SPARK-30539][PYTHON][SQL] Add DataFrame.tail in PySpark
    
    ### What changes were proposed in this pull request?
    
    https://github.com/apache/spark/pull/26809 added the `Dataset.tail` API. It would be good to have it in the PySpark API as well.
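    
    For example, a minimal usage sketch (assuming an active `spark` session; the two-row `df` matches the doctest fixture added below):
    
        >>> df = spark.createDataFrame([(2, "Alice"), (5, "Bob")], ["age", "name"])
        >>> df.tail(1)
        [Row(age=5, name=u'Bob')]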
    
    ### Why are the changes needed?
    
    To keep the PySpark API consistent with the Scala `Dataset` API.
    
    ### Does this PR introduce any user-facing change?
    
    No change to existing behavior; it only adds a new API.
    
    ### How was this patch tested?
    
    Manually tested, and a doctest was added.
    
    Closes #27251 from HyukjinKwon/SPARK-30539.
    
    Authored-by: HyukjinKwon <gu...@apache.org>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 python/pyspark/sql/dataframe.py                          | 16 ++++++++++++++++
 .../src/main/scala/org/apache/spark/sql/Dataset.scala    | 10 ++++++++++
 2 files changed, 26 insertions(+)

diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index 669de26..2432b81 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -605,6 +605,22 @@ class DataFrame(PandasMapOpsMixin, PandasConversionMixin):
         """
         return self.limit(num).collect()
 
+    @ignore_unicode_prefix
+    @since(3.0)
+    def tail(self, num):
+        """
+        Returns the last ``num`` rows as a :class:`list` of :class:`Row`.
+
+        Running tail requires moving data into the application's driver process, and doing so with
+        a very large ``num`` can crash the driver process with OutOfMemoryError.
+
+        >>> df.tail(1)
+        [Row(age=5, name=u'Bob')]
+        """
+        with SCCallSiteSync(self._sc):
+            sock_info = self._jdf.tailToPython(num)
+        return list(_load_from_socket(sock_info, BatchedSerializer(PickleSerializer())))
+
     @since(1.3)
     def foreach(self, f):
         """Applies the ``f`` function to all :class:`Row` of this :class:`DataFrame`.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index dd3d0f5..0eb1a26 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -3331,6 +3331,16 @@ class Dataset[T] private[sql](
     }
   }
 
+  private[sql] def tailToPython(n: Int): Array[Any] = {
+    EvaluatePython.registerPicklers()
+    withAction("tailToPython", queryExecution) { plan =>
+      val toJava: (Any) => Any = EvaluatePython.toJava(_, schema)
+      val iter: Iterator[Array[Byte]] = new SerDeUtil.AutoBatchedPickler(
+        plan.executeTail(n).iterator.map(toJava))
+      PythonRDD.serveIterator(iter, "serve-DataFrame")
+    }
+  }
+
   private[sql] def getRowsToPython(
       _numRows: Int,
       truncate: Int): Array[Any] = {
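
A self-contained way to exercise the new API end to end (a sketch, not part of the commit; assumes a Spark build that includes this change):

    from pyspark.sql import SparkSession

    # A single local partition keeps row order deterministic for this demo.
    spark = SparkSession.builder.master("local[1]").appName("tail-demo").getOrCreate()
    df = spark.range(10)      # rows with id 0..9
    print(df.tail(3))         # expected: [Row(id=7), Row(id=8), Row(id=9)]
    spark.stop()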


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org