You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by zs...@apache.org on 2016/11/22 01:23:55 UTC

spark git commit: [SPARK-18493] Add missing python APIs: withWatermark and checkpoint to dataframe

Repository: spark
Updated Branches:
  refs/heads/master a2d464770 -> 97a8239a6


[SPARK-18493] Add missing python APIs: withWatermark and checkpoint to dataframe

## What changes were proposed in this pull request?

This PR adds two recently introduced `Dataset` methods to the Python `DataFrame` API:
`withWatermark` and `checkpoint`.

## How was this patch tested?

Doc tests

Author: Burak Yavuz <br...@gmail.com>

Closes #15921 from brkyvz/py-watermark.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/97a8239a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/97a8239a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/97a8239a

Branch: refs/heads/master
Commit: 97a8239a625df455d2c439f3628a529d6d9413ca
Parents: a2d4647
Author: Burak Yavuz <br...@gmail.com>
Authored: Mon Nov 21 17:24:02 2016 -0800
Committer: Shixiong Zhu <sh...@databricks.com>
Committed: Mon Nov 21 17:24:02 2016 -0800

----------------------------------------------------------------------
 python/pyspark/sql/dataframe.py                 | 57 ++++++++++++++++++--
 .../scala/org/apache/spark/sql/Dataset.scala    | 10 +++-
 2 files changed, 62 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/97a8239a/python/pyspark/sql/dataframe.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index 3899890..6fe6226 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -322,6 +322,54 @@ class DataFrame(object):
     def __repr__(self):
         return "DataFrame[%s]" % (", ".join("%s: %s" % c for c in self.dtypes))
 
+    @since(2.1)
+    def checkpoint(self, eager=True):
+        """Returns a checkpointed version of this Dataset. Checkpointing can be used to truncate the
+        logical plan of this DataFrame, which is especially useful in iterative algorithms where the
+        plan may grow exponentially. It will be saved to files inside the checkpoint
+        directory set with L{SparkContext.setCheckpointDir()}.
+
+        :param eager: Whether to checkpoint this DataFrame immediately
+
+        .. note:: Experimental
+        """
+        jdf = self._jdf.checkpoint(eager)
+        return DataFrame(jdf, self.sql_ctx)
+
+    @since(2.1)
+    def withWatermark(self, eventTime, delayThreshold):
+        """Defines an event time watermark for this :class:`DataFrame`. A watermark tracks a point
+        in time before which we assume no more late data is going to arrive.
+
+        Spark will use this watermark for several purposes:
+          - To know when a given time window aggregation can be finalized and thus can be emitted
+            when using output modes that do not allow updates.
+
+          - To minimize the amount of state that we need to keep for on-going aggregations.
+
+        The current watermark is computed by looking at the `MAX(eventTime)` seen across
+        all of the partitions in the query minus a user specified `delayThreshold`.  Due to the cost
+        of coordinating this value across partitions, the actual watermark used is only guaranteed
+        to be at least `delayThreshold` behind the actual event time.  In some cases we may still
+        process records that arrive more than `delayThreshold` late.
+
+        :param eventTime: the name of the column that contains the event time of the row.
+        :param delayThreshold: the minimum delay to wait for data to arrive late, relative to the
+            latest record that has been processed in the form of an interval
+            (e.g. "1 minute" or "5 hours").
+
+        .. note:: Experimental
+
+        >>> sdf.select('name', sdf.time.cast('timestamp')).withWatermark('time', '10 minutes')
+        DataFrame[name: string, time: timestamp]
+        """
+        if not eventTime or type(eventTime) is not str:
+            raise TypeError("eventTime should be provided as a string")
+        if not delayThreshold or type(delayThreshold) is not str:
+            raise TypeError("delayThreshold should be provided as a string interval")
+        jdf = self._jdf.withWatermark(eventTime, delayThreshold)
+        return DataFrame(jdf, self.sql_ctx)
+
     @since(1.3)
     def count(self):
         """Returns the number of rows in this :class:`DataFrame`.
@@ -1626,6 +1674,7 @@ def _test():
     from pyspark.context import SparkContext
     from pyspark.sql import Row, SQLContext, SparkSession
     import pyspark.sql.dataframe
+    from pyspark.sql.functions import from_unixtime
     globs = pyspark.sql.dataframe.__dict__.copy()
     sc = SparkContext('local[4]', 'PythonTest')
     globs['sc'] = sc
@@ -1638,9 +1687,11 @@ def _test():
     globs['df3'] = sc.parallelize([Row(name='Alice', age=2),
                                    Row(name='Bob', age=5)]).toDF()
     globs['df4'] = sc.parallelize([Row(name='Alice', age=10, height=80),
-                                  Row(name='Bob', age=5, height=None),
-                                  Row(name='Tom', age=None, height=None),
-                                  Row(name=None, age=None, height=None)]).toDF()
+                                   Row(name='Bob', age=5, height=None),
+                                   Row(name='Tom', age=None, height=None),
+                                   Row(name=None, age=None, height=None)]).toDF()
+    globs['sdf'] = sc.parallelize([Row(name='Tom', time=1479441846),
+                                   Row(name='Bob', time=1479442946)]).toDF()
 
     (failure_count, test_count) = doctest.testmod(
         pyspark.sql.dataframe, globs=globs,

http://git-wip-us.apache.org/repos/asf/spark/blob/97a8239a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 3c75a6a..7ba6ffc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -485,7 +485,10 @@ class Dataset[T] private[sql](
   def isStreaming: Boolean = logicalPlan.isStreaming
 
   /**
-   * Returns a checkpointed version of this Dataset.
+   * Eagerly checkpoint a Dataset and return the new Dataset. Checkpointing can be used to truncate
+   * the logical plan of this Dataset, which is especially useful in iterative algorithms where the
+   * plan may grow exponentially. It will be saved to files inside the checkpoint
+   * directory set with `SparkContext#setCheckpointDir`.
    *
    * @group basic
    * @since 2.1.0
@@ -495,7 +498,10 @@ class Dataset[T] private[sql](
   def checkpoint(): Dataset[T] = checkpoint(eager = true)
 
   /**
-   * Returns a checkpointed version of this Dataset.
+   * Returns a checkpointed version of this Dataset. Checkpointing can be used to truncate the
+   * logical plan of this Dataset, which is especially useful in iterative algorithms where the
+   * plan may grow exponentially. It will be saved to files inside the checkpoint
+   * directory set with `SparkContext#setCheckpointDir`.
    *
    * @group basic
    * @since 2.1.0


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org