Posted to commits@spark.apache.org by td...@apache.org on 2018/01/18 20:25:57 UTC
spark git commit: [SPARK-23143][SS][PYTHON] Added python API for setting continuous trigger
Repository: spark
Updated Branches:
refs/heads/master 9678941f5 -> 2d41f040a
[SPARK-23143][SS][PYTHON] Added python API for setting continuous trigger
## What changes were proposed in this pull request?
Added a continuous parameter to DataStreamWriter.trigger() in PySpark so that a continuous-processing trigger (Trigger.Continuous on the JVM side) can be set from Python, matching the Scala/Java API. Exactly one of processingTime, once, or continuous must be supplied.
## How was this patch tested?
New Python tests.
Author: Tathagata Das <ta...@gmail.com>
Closes #20309 from tdas/SPARK-23143.
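For context, a minimal end-to-end sketch of the new keyword in use. This is an illustration, not part of the patch: it assumes a local Spark build (2.3 or later) containing this change, and uses the built-in rate source and console sink, both of which support continuous processing.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("continuous-trigger-demo").getOrCreate()

# The built-in rate source generates rows continuously.
sdf = spark.readStream.format("rate").load()

# New in this patch: a continuous-processing trigger with a
# 5-second checkpoint interval.
query = (sdf.writeStream
            .format("console")
            .trigger(continuous="5 seconds")
            .start())

query.awaitTermination(30)  # let the query run for up to 30 seconds
query.stop()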
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2d41f040
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2d41f040
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2d41f040
Branch: refs/heads/master
Commit: 2d41f040a34d6483919fd5d491cf90eee5429290
Parents: 9678941
Author: Tathagata Das <ta...@gmail.com>
Authored: Thu Jan 18 12:25:52 2018 -0800
Committer: Tathagata Das <ta...@gmail.com>
Committed: Thu Jan 18 12:25:52 2018 -0800
----------------------------------------------------------------------
python/pyspark/sql/streaming.py | 23 +++++++++++++++++++----
python/pyspark/sql/tests.py | 6 ++++++
2 files changed, 25 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/2d41f040/python/pyspark/sql/streaming.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py
index 24ae377..e2a97ac 100644
--- a/python/pyspark/sql/streaming.py
+++ b/python/pyspark/sql/streaming.py
@@ -786,7 +786,7 @@ class DataStreamWriter(object):
@keyword_only
@since(2.0)
- def trigger(self, processingTime=None, once=None):
+ def trigger(self, processingTime=None, once=None, continuous=None):
"""Set the trigger for the stream query. If this is not set it will run the query as fast
as possible, which is equivalent to setting the trigger to ``processingTime='0 seconds'``.
@@ -802,23 +802,38 @@ class DataStreamWriter(object):
>>> writer = sdf.writeStream.trigger(processingTime='5 seconds')
>>> # trigger the query for just one batch of data
>>> writer = sdf.writeStream.trigger(once=True)
+ >>> # trigger the query with continuous processing, checkpointing every 5 seconds
+ >>> writer = sdf.writeStream.trigger(continuous='5 seconds')
"""
+ params = [processingTime, once, continuous]
+
+ if params.count(None) == 3:
+ raise ValueError('No trigger provided')
+ elif params.count(None) < 2:
+ raise ValueError('Multiple triggers not allowed.')
+
jTrigger = None
if processingTime is not None:
- if once is not None:
- raise ValueError('Multiple triggers not allowed.')
if type(processingTime) != str or len(processingTime.strip()) == 0:
raise ValueError('Value for processingTime must be a non empty string. Got: %s' %
processingTime)
interval = processingTime.strip()
jTrigger = self._spark._sc._jvm.org.apache.spark.sql.streaming.Trigger.ProcessingTime(
interval)
+
elif once is not None:
if once is not True:
raise ValueError('Value for once must be True. Got: %s' % once)
jTrigger = self._spark._sc._jvm.org.apache.spark.sql.streaming.Trigger.Once()
+
else:
- raise ValueError('No trigger provided')
+ if type(continuous) != str or len(continuous.strip()) == 0:
+ raise ValueError('Value for continuous must be a non empty string. Got: %s' %
+ continuous)
+ interval = continuous.strip()
+ jTrigger = self._spark._sc._jvm.org.apache.spark.sql.streaming.Trigger.Continuous(
+ interval)
+
self._jwrite = self._jwrite.trigger(jTrigger)
return self
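Taken together, the checks above require exactly one of the three keywords. A quick illustration of the error paths (a sketch; assumes sdf is a streaming DataFrame as in the docstring):

# Each call below raises ValueError per the validation added above.
for bad_kwargs in [
    {},                                        # 'No trigger provided'
    {'once': True, 'continuous': '1 second'},  # 'Multiple triggers not allowed.'
    {'continuous': '   '},                     # blank string fails the non-empty check
]:
    try:
        sdf.writeStream.trigger(**bad_kwargs)
    except ValueError as e:
        print(e)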
http://git-wip-us.apache.org/repos/asf/spark/blob/2d41f040/python/pyspark/sql/tests.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index f84aa3d..2548359 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -1538,6 +1538,12 @@ class SQLTests(ReusedSQLTestCase):
except ValueError:
pass
+ # Should not take multiple args
+ try:
+ df.writeStream.trigger(processingTime='5 seconds', continuous='1 second')
+ except ValueError:
+ pass
+
# Should take only keyword args
try:
df.writeStream.trigger('5 seconds')
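One observation on the test style: the try/except-pass pattern above still passes when no exception is raised at all. unittest's assertRaises makes the expectation explicit; a tighter equivalent would be (a sketch, not part of this patch):

# Fails the test if the call does NOT raise ValueError.
with self.assertRaises(ValueError):
    df.writeStream.trigger(processingTime='5 seconds', continuous='1 second')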