You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by td...@apache.org on 2018/01/18 20:25:57 UTC

spark git commit: [SPARK-23143][SS][PYTHON] Added python API for setting continuous trigger

Repository: spark
Updated Branches:
  refs/heads/master 9678941f5 -> 2d41f040a


[SPARK-23143][SS][PYTHON] Added python API for setting continuous trigger

## What changes were proposed in this pull request?
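Adds a `continuous` keyword argument to `DataStreamWriter.trigger()` in PySpark. It must be a non-empty string and is passed to the JVM `Trigger.Continuous(interval)`. The check that exactly one of `processingTime`, `once`, or `continuous` is provided is now performed up front, before any trigger is built.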

## How was this patch tested?
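New Python tests: a case added to `python/pyspark/sql/tests.py` that calls `trigger()` with both `processingTime` and `continuous` set.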

Author: Tathagata Das <ta...@gmail.com>

Closes #20309 from tdas/SPARK-23143.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2d41f040
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2d41f040
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2d41f040

Branch: refs/heads/master
Commit: 2d41f040a34d6483919fd5d491cf90eee5429290
Parents: 9678941
Author: Tathagata Das <ta...@gmail.com>
Authored: Thu Jan 18 12:25:52 2018 -0800
Committer: Tathagata Das <ta...@gmail.com>
Committed: Thu Jan 18 12:25:52 2018 -0800

----------------------------------------------------------------------
 python/pyspark/sql/streaming.py | 23 +++++++++++++++++++----
 python/pyspark/sql/tests.py     |  6 ++++++
 2 files changed, 25 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/2d41f040/python/pyspark/sql/streaming.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py
index 24ae377..e2a97ac 100644
--- a/python/pyspark/sql/streaming.py
+++ b/python/pyspark/sql/streaming.py
@@ -786,7 +786,7 @@ class DataStreamWriter(object):
 
     @keyword_only
     @since(2.0)
-    def trigger(self, processingTime=None, once=None):
+    def trigger(self, processingTime=None, once=None, continuous=None):
         """Set the trigger for the stream query. If this is not set it will run the query as fast
         as possible, which is equivalent to setting the trigger to ``processingTime='0 seconds'``.
 
@@ -802,23 +802,38 @@ class DataStreamWriter(object):
         >>> writer = sdf.writeStream.trigger(processingTime='5 seconds')
         >>> # trigger the query for just once batch of data
         >>> writer = sdf.writeStream.trigger(once=True)
+        >>> # trigger the query for execution every 5 seconds
+        >>> writer = sdf.writeStream.trigger(continuous='5 seconds')
         """
+        params = [processingTime, once, continuous]
+
+        if params.count(None) == 3:
+            raise ValueError('No trigger provided')
+        elif params.count(None) < 2:
+            raise ValueError('Multiple triggers not allowed.')
+
         jTrigger = None
         if processingTime is not None:
-            if once is not None:
-                raise ValueError('Multiple triggers not allowed.')
             if type(processingTime) != str or len(processingTime.strip()) == 0:
                 raise ValueError('Value for processingTime must be a non empty string. Got: %s' %
                                  processingTime)
             interval = processingTime.strip()
             jTrigger = self._spark._sc._jvm.org.apache.spark.sql.streaming.Trigger.ProcessingTime(
                 interval)
+
         elif once is not None:
             if once is not True:
                 raise ValueError('Value for once must be True. Got: %s' % once)
             jTrigger = self._spark._sc._jvm.org.apache.spark.sql.streaming.Trigger.Once()
+
         else:
-            raise ValueError('No trigger provided')
+            if type(continuous) != str or len(continuous.strip()) == 0:
+                raise ValueError('Value for continuous must be a non empty string. Got: %s' %
+                                 continuous)
+            interval = continuous.strip()
+            jTrigger = self._spark._sc._jvm.org.apache.spark.sql.streaming.Trigger.Continuous(
+                interval)
+
         self._jwrite = self._jwrite.trigger(jTrigger)
         return self
 

http://git-wip-us.apache.org/repos/asf/spark/blob/2d41f040/python/pyspark/sql/tests.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index f84aa3d..2548359 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -1538,6 +1538,12 @@ class SQLTests(ReusedSQLTestCase):
         except ValueError:
             pass
 
+        # Should not take multiple args
+        try:
+            df.writeStream.trigger(processingTime='5 seconds', continuous='1 second')
+        except ValueError:
+            pass
+
         # Should take only keyword args
         try:
             df.writeStream.trigger('5 seconds')


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org