You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ma...@apache.org on 2016/04/20 19:32:02 UTC

spark git commit: [SPARK-14555] First cut of Python API for Structured Streaming

Repository: spark
Updated Branches:
  refs/heads/master 834277884 -> 80bf48f43


[SPARK-14555] First cut of Python API for Structured Streaming

## What changes were proposed in this pull request?

This patch provides a first cut of python APIs for structured streaming. This PR provides the new classes:
 - ContinuousQuery
 - Trigger
 - ProcessingTime
in pyspark under `pyspark.sql.streaming`.

In addition, it contains the new methods added under:
 -  `DataFrameWriter`
     a) `startStream`
     b) `trigger`
     c) `queryName`

 -  `DataFrameReader`
     a) `stream`

 - `DataFrame`
    a) `isStreaming`

This PR doesn't contain all methods exposed for `ContinuousQuery`, for example:
 - `exception`
 - `sourceStatuses`
 - `sinkStatus`

They may be added in a follow up.

This PR also contains some very minor doc fixes in the Scala side.

## How was this patch tested?

Python doc tests

TODO:
 - [ ] verify Python docs look good

Author: Burak Yavuz <br...@gmail.com>
Author: Burak Yavuz <bu...@databricks.com>

Closes #12320 from brkyvz/stream-python.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/80bf48f4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/80bf48f4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/80bf48f4

Branch: refs/heads/master
Commit: 80bf48f437939ddc3bb82c8c7530c8ae419f8427
Parents: 8342778
Author: Burak Yavuz <br...@gmail.com>
Authored: Wed Apr 20 10:32:01 2016 -0700
Committer: Michael Armbrust <mi...@databricks.com>
Committed: Wed Apr 20 10:32:01 2016 -0700

----------------------------------------------------------------------
 python/pyspark/__init__.py                      |  15 +++
 python/pyspark/ml/classification.py             |   1 +
 python/pyspark/ml/clustering.py                 |   2 +-
 python/pyspark/ml/evaluation.py                 |   3 +-
 python/pyspark/ml/feature.py                    |   4 +-
 python/pyspark/ml/pipeline.py                   |   5 +-
 python/pyspark/ml/recommendation.py             |   2 +-
 python/pyspark/ml/regression.py                 |   2 +-
 python/pyspark/ml/tests.py                      |   2 +-
 python/pyspark/ml/tuning.py                     |   4 +-
 python/pyspark/ml/util.py                       |  15 ---
 python/pyspark/sql/dataframe.py                 |  12 ++
 python/pyspark/sql/readwriter.py                | 121 +++++++++++++++++-
 python/pyspark/sql/streaming.py                 | 124 +++++++++++++++++++
 python/pyspark/sql/tests.py                     |  93 ++++++++++++++
 python/test_support/sql/streaming/text-test.txt |   2 +
 .../org/apache/spark/sql/ContinuousQuery.scala  |   4 +-
 .../org/apache/spark/sql/DataFrameWriter.scala  |   8 +-
 .../scala/org/apache/spark/sql/Trigger.scala    |  26 ++--
 19 files changed, 397 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/80bf48f4/python/pyspark/__init__.py
----------------------------------------------------------------------
diff --git a/python/pyspark/__init__.py b/python/pyspark/__init__.py
index 111ebaa..ec16874 100644
--- a/python/pyspark/__init__.py
+++ b/python/pyspark/__init__.py
@@ -37,6 +37,7 @@ Public classes:
 
 """
 
+from functools import wraps
 import types
 
 from pyspark.conf import SparkConf
@@ -84,6 +85,20 @@ def copy_func(f, name=None, sinceversion=None, doc=None):
     return fn
 
 
+def keyword_only(func):
+    """
+    A decorator that forces keyword arguments in the wrapped method
+    and saves actual input keyword arguments in `_input_kwargs`.
+    """
+    @wraps(func)
+    def wrapper(*args, **kwargs):
+        if len(args) > 1:
+            raise TypeError("Method %s forces keyword arguments." % func.__name__)
+        wrapper._input_kwargs = kwargs
+        return func(*args, **kwargs)
+    return wrapper
+
+
 # for back compatibility
 from pyspark.sql import SQLContext, HiveContext, Row
 

http://git-wip-us.apache.org/repos/asf/spark/blob/80bf48f4/python/pyspark/ml/classification.py
----------------------------------------------------------------------
diff --git a/python/pyspark/ml/classification.py b/python/pyspark/ml/classification.py
index de1321b..cc562d2 100644
--- a/python/pyspark/ml/classification.py
+++ b/python/pyspark/ml/classification.py
@@ -18,6 +18,7 @@
 import operator
 import warnings
 
+from pyspark import since, keyword_only
 from pyspark.ml import Estimator, Model
 from pyspark.ml.param.shared import *
 from pyspark.ml.regression import (

http://git-wip-us.apache.org/repos/asf/spark/blob/80bf48f4/python/pyspark/ml/clustering.py
----------------------------------------------------------------------
diff --git a/python/pyspark/ml/clustering.py b/python/pyspark/ml/clustering.py
index 05aa2df..ecdaa3a 100644
--- a/python/pyspark/ml/clustering.py
+++ b/python/pyspark/ml/clustering.py
@@ -15,7 +15,7 @@
 # limitations under the License.
 #
 
-from pyspark import since
+from pyspark import since, keyword_only
 from pyspark.ml.util import *
 from pyspark.ml.wrapper import JavaEstimator, JavaModel
 from pyspark.ml.param.shared import *

http://git-wip-us.apache.org/repos/asf/spark/blob/80bf48f4/python/pyspark/ml/evaluation.py
----------------------------------------------------------------------
diff --git a/python/pyspark/ml/evaluation.py b/python/pyspark/ml/evaluation.py
index 52a3fe8..455795f 100644
--- a/python/pyspark/ml/evaluation.py
+++ b/python/pyspark/ml/evaluation.py
@@ -17,11 +17,10 @@
 
 from abc import abstractmethod, ABCMeta
 
-from pyspark import since
+from pyspark import since, keyword_only
 from pyspark.ml.wrapper import JavaParams
 from pyspark.ml.param import Param, Params, TypeConverters
 from pyspark.ml.param.shared import HasLabelCol, HasPredictionCol, HasRawPredictionCol
-from pyspark.ml.util import keyword_only
 from pyspark.mllib.common import inherit_doc
 
 __all__ = ['Evaluator', 'BinaryClassificationEvaluator', 'RegressionEvaluator',

http://git-wip-us.apache.org/repos/asf/spark/blob/80bf48f4/python/pyspark/ml/feature.py
----------------------------------------------------------------------
diff --git a/python/pyspark/ml/feature.py b/python/pyspark/ml/feature.py
index 4310f15..a1911ce 100644
--- a/python/pyspark/ml/feature.py
+++ b/python/pyspark/ml/feature.py
@@ -21,10 +21,10 @@ if sys.version > '3':
 
 from py4j.java_collections import JavaArray
 
-from pyspark import since
+from pyspark import since, keyword_only
 from pyspark.rdd import ignore_unicode_prefix
 from pyspark.ml.param.shared import *
-from pyspark.ml.util import keyword_only, JavaMLReadable, JavaMLWritable
+from pyspark.ml.util import JavaMLReadable, JavaMLWritable
 from pyspark.ml.wrapper import JavaEstimator, JavaModel, JavaTransformer, _jvm
 from pyspark.mllib.common import inherit_doc
 from pyspark.mllib.linalg import _convert_to_vector

http://git-wip-us.apache.org/repos/asf/spark/blob/80bf48f4/python/pyspark/ml/pipeline.py
----------------------------------------------------------------------
diff --git a/python/pyspark/ml/pipeline.py b/python/pyspark/ml/pipeline.py
index e2651ae..146e403 100644
--- a/python/pyspark/ml/pipeline.py
+++ b/python/pyspark/ml/pipeline.py
@@ -20,11 +20,10 @@ import sys
 if sys.version > '3':
     basestring = str
 
-from pyspark import SparkContext
-from pyspark import since
+from pyspark import since, keyword_only, SparkContext
 from pyspark.ml import Estimator, Model, Transformer
 from pyspark.ml.param import Param, Params
-from pyspark.ml.util import keyword_only, JavaMLWriter, JavaMLReader, MLReadable, MLWritable
+from pyspark.ml.util import JavaMLWriter, JavaMLReader, MLReadable, MLWritable
 from pyspark.ml.wrapper import JavaParams
 from pyspark.mllib.common import inherit_doc
 

http://git-wip-us.apache.org/repos/asf/spark/blob/80bf48f4/python/pyspark/ml/recommendation.py
----------------------------------------------------------------------
diff --git a/python/pyspark/ml/recommendation.py b/python/pyspark/ml/recommendation.py
index 9d7f22a..4e42c46 100644
--- a/python/pyspark/ml/recommendation.py
+++ b/python/pyspark/ml/recommendation.py
@@ -15,7 +15,7 @@
 # limitations under the License.
 #
 
-from pyspark import since
+from pyspark import since, keyword_only
 from pyspark.ml.util import *
 from pyspark.ml.wrapper import JavaEstimator, JavaModel
 from pyspark.ml.param.shared import *

http://git-wip-us.apache.org/repos/asf/spark/blob/80bf48f4/python/pyspark/ml/regression.py
----------------------------------------------------------------------
diff --git a/python/pyspark/ml/regression.py b/python/pyspark/ml/regression.py
index 8b68622..da74ab5 100644
--- a/python/pyspark/ml/regression.py
+++ b/python/pyspark/ml/regression.py
@@ -17,7 +17,7 @@
 
 import warnings
 
-from pyspark import since
+from pyspark import since, keyword_only
 from pyspark.ml.param.shared import *
 from pyspark.ml.util import *
 from pyspark.ml.wrapper import JavaEstimator, JavaModel, JavaWrapper

http://git-wip-us.apache.org/repos/asf/spark/blob/80bf48f4/python/pyspark/ml/tests.py
----------------------------------------------------------------------
diff --git a/python/pyspark/ml/tests.py b/python/pyspark/ml/tests.py
index f1bca6e..e954586 100644
--- a/python/pyspark/ml/tests.py
+++ b/python/pyspark/ml/tests.py
@@ -42,6 +42,7 @@ from shutil import rmtree
 import tempfile
 import numpy as np
 
+from pyspark import keyword_only
 from pyspark.ml import Estimator, Model, Pipeline, PipelineModel, Transformer
 from pyspark.ml.classification import (
     LogisticRegression, DecisionTreeClassifier, OneVsRest, OneVsRestModel)
@@ -52,7 +53,6 @@ from pyspark.ml.param import Param, Params, TypeConverters
 from pyspark.ml.param.shared import HasMaxIter, HasInputCol, HasSeed
 from pyspark.ml.regression import LinearRegression, DecisionTreeRegressor
 from pyspark.ml.tuning import *
-from pyspark.ml.util import keyword_only
 from pyspark.ml.util import MLWritable, MLWriter
 from pyspark.ml.wrapper import JavaParams
 from pyspark.mllib.linalg import Vectors, DenseVector, SparseVector

http://git-wip-us.apache.org/repos/asf/spark/blob/80bf48f4/python/pyspark/ml/tuning.py
----------------------------------------------------------------------
diff --git a/python/pyspark/ml/tuning.py b/python/pyspark/ml/tuning.py
index 5ac539e..ef14da4 100644
--- a/python/pyspark/ml/tuning.py
+++ b/python/pyspark/ml/tuning.py
@@ -19,11 +19,11 @@ import itertools
 import numpy as np
 
 from pyspark import SparkContext
-from pyspark import since
+from pyspark import since, keyword_only
 from pyspark.ml import Estimator, Model
 from pyspark.ml.param import Params, Param, TypeConverters
 from pyspark.ml.param.shared import HasSeed
-from pyspark.ml.util import keyword_only, JavaMLWriter, JavaMLReader, MLReadable, MLWritable
+from pyspark.ml.util import JavaMLWriter, JavaMLReader, MLReadable, MLWritable
 from pyspark.ml.wrapper import JavaParams
 from pyspark.sql.functions import rand
 from pyspark.mllib.common import inherit_doc, _py2java

http://git-wip-us.apache.org/repos/asf/spark/blob/80bf48f4/python/pyspark/ml/util.py
----------------------------------------------------------------------
diff --git a/python/pyspark/ml/util.py b/python/pyspark/ml/util.py
index 841bfb4..7003e58 100644
--- a/python/pyspark/ml/util.py
+++ b/python/pyspark/ml/util.py
@@ -17,7 +17,6 @@
 
 import sys
 import uuid
-from functools import wraps
 
 if sys.version > '3':
     basestring = str
@@ -39,20 +38,6 @@ def _jvm():
         raise AttributeError("Cannot load _jvm from SparkContext. Is SparkContext initialized?")
 
 
-def keyword_only(func):
-    """
-    A decorator that forces keyword arguments in the wrapped method
-    and saves actual input keyword arguments in `_input_kwargs`.
-    """
-    @wraps(func)
-    def wrapper(*args, **kwargs):
-        if len(args) > 1:
-            raise TypeError("Method %s forces keyword arguments." % func.__name__)
-        wrapper._input_kwargs = kwargs
-        return func(*args, **kwargs)
-    return wrapper
-
-
 class Identifiable(object):
     """
     Object with a unique ID.

http://git-wip-us.apache.org/repos/asf/spark/blob/80bf48f4/python/pyspark/sql/dataframe.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index 328bda6..bbe15f5 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -197,6 +197,18 @@ class DataFrame(object):
         """
         return self._jdf.isLocal()
 
+    @property
+    @since(2.0)
+    def isStreaming(self):
+        """Returns true if this :class:`Dataset` contains one or more sources that continuously
+        return data as it arrives. A :class:`Dataset` that reads data from a streaming source
+        must be executed as a :class:`ContinuousQuery` using the :func:`startStream` method in
+        :class:`DataFrameWriter`.  Methods that return a single answer, (e.g., :func:`count` or
+        :func:`collect`) will throw an :class:`AnalysisException` when there is a streaming
+        source present.
+        """
+        return self._jdf.isStreaming()
+
     @since(1.3)
     def show(self, n=20, truncate=True):
         """Prints the first ``n`` rows to the console.

http://git-wip-us.apache.org/repos/asf/spark/blob/80bf48f4/python/pyspark/sql/readwriter.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index 0cef37e..6c809d1 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -22,7 +22,7 @@ if sys.version >= '3':
 
 from py4j.java_gateway import JavaClass
 
-from pyspark import RDD, since
+from pyspark import RDD, since, keyword_only
 from pyspark.rdd import ignore_unicode_prefix
 from pyspark.sql.column import _to_seq
 from pyspark.sql.types import *
@@ -136,6 +136,32 @@ class DataFrameReader(object):
         else:
             return self._df(self._jreader.load())
 
+    @since(2.0)
+    def stream(self, path=None, format=None, schema=None, **options):
+        """Loads a data stream from a data source and returns it as a :class`DataFrame`.
+
+        :param path: optional string for file-system backed data sources.
+        :param format: optional string for format of the data source. Default to 'parquet'.
+        :param schema: optional :class:`StructType` for the input schema.
+        :param options: all other string options
+
+        >>> df = sqlContext.read.format('text').stream('python/test_support/sql/streaming')
+        >>> df.isStreaming
+        True
+        """
+        if format is not None:
+            self.format(format)
+        if schema is not None:
+            self.schema(schema)
+        self.options(**options)
+        if path is not None:
+            if type(path) != str or len(path.strip()) == 0:
+                raise ValueError("If the path is provided for stream, it needs to be a " +
+                                 "non-empty string. List of paths are not supported.")
+            return self._df(self._jreader.stream(path))
+        else:
+            return self._df(self._jreader.stream())
+
     @since(1.4)
     def json(self, path, schema=None):
         """
@@ -334,6 +360,10 @@ class DataFrameWriter(object):
         self._sqlContext = df.sql_ctx
         self._jwrite = df._jdf.write()
 
+    def _cq(self, jcq):
+        from pyspark.sql.streaming import ContinuousQuery
+        return ContinuousQuery(jcq, self._sqlContext)
+
     @since(1.4)
     def mode(self, saveMode):
         """Specifies the behavior when data or table already exists.
@@ -395,6 +425,44 @@ class DataFrameWriter(object):
         self._jwrite = self._jwrite.partitionBy(_to_seq(self._sqlContext._sc, cols))
         return self
 
+    @since(2.0)
+    def queryName(self, queryName):
+        """Specifies the name of the :class:`ContinuousQuery` that can be started with
+        :func:`startStream`. This name must be unique among all the currently active queries
+        in the associated SQLContext.
+
+        :param queryName: unique name for the query
+
+        >>> writer = sdf.write.queryName('streaming_query')
+        """
+        if not queryName or type(queryName) != str or len(queryName.strip()) == 0:
+            raise ValueError('The queryName must be a non-empty string. Got: %s' % queryName)
+        self._jwrite = self._jwrite.queryName(queryName)
+        return self
+
+    @keyword_only
+    @since(2.0)
+    def trigger(self, processingTime=None):
+        """Set the trigger for the stream query. If this is not set it will run the query as fast
+        as possible, which is equivalent to setting the trigger to ``processingTime='0 seconds'``.
+
+        :param processingTime: a processing time interval as a string, e.g. '5 seconds', '1 minute'.
+
+        >>> # trigger the query for execution every 5 seconds
+        >>> writer = sdf.write.trigger(processingTime='5 seconds')
+        """
+        from pyspark.sql.streaming import ProcessingTime
+        trigger = None
+        if processingTime is not None:
+            if type(processingTime) != str or len(processingTime.strip()) == 0:
+                raise ValueError('The processing time must be a non empty string. Got: %s' %
+                                 processingTime)
+            trigger = ProcessingTime(processingTime)
+        if trigger is None:
+            raise ValueError('A trigger was not provided. Supported triggers: processingTime.')
+        self._jwrite = self._jwrite.trigger(trigger._to_java_trigger(self._sqlContext))
+        return self
+
     @since(1.4)
     def save(self, path=None, format=None, mode=None, partitionBy=None, **options):
         """Saves the contents of the :class:`DataFrame` to a data source.
@@ -426,6 +494,55 @@ class DataFrameWriter(object):
         else:
             self._jwrite.save(path)
 
+    @ignore_unicode_prefix
+    @since(2.0)
+    def startStream(self, path=None, format=None, partitionBy=None, queryName=None, **options):
+        """Streams the contents of the :class:`DataFrame` to a data source.
+
+        The data source is specified by the ``format`` and a set of ``options``.
+        If ``format`` is not specified, the default data source configured by
+        ``spark.sql.sources.default`` will be used.
+
+        :param path: the path in a Hadoop supported file system
+        :param format: the format used to save
+
+            * ``append``: Append contents of this :class:`DataFrame` to existing data.
+            * ``overwrite``: Overwrite existing data.
+            * ``ignore``: Silently ignore this operation if data already exists.
+            * ``error`` (default case): Throw an exception if data already exists.
+        :param partitionBy: names of partitioning columns
+        :param queryName: unique name for the query
+        :param options: All other string options. You may want to provide a `checkpointLocation`
+            for most streams, however it is not required for a `memory` stream.
+
+        >>> cq = sdf.write.format('memory').queryName('this_query').startStream()
+        >>> cq.isActive
+        True
+        >>> cq.name
+        u'this_query'
+        >>> cq.stop()
+        >>> cq.isActive
+        False
+        >>> cq = sdf.write.trigger(processingTime='5 seconds').startStream(
+        ...     queryName='that_query', format='memory')
+        >>> cq.name
+        u'that_query'
+        >>> cq.isActive
+        True
+        >>> cq.stop()
+        """
+        self.options(**options)
+        if partitionBy is not None:
+            self.partitionBy(partitionBy)
+        if format is not None:
+            self.format(format)
+        if queryName is not None:
+            self.queryName(queryName)
+        if path is None:
+            return self._cq(self._jwrite.startStream())
+        else:
+            return self._cq(self._jwrite.startStream(path))
+
     @since(1.4)
     def insertInto(self, tableName, overwrite=False):
         """Inserts the content of the :class:`DataFrame` to the specified table.
@@ -625,6 +742,8 @@ def _test():
     globs['sqlContext'] = SQLContext(sc)
     globs['hiveContext'] = HiveContext(sc)
     globs['df'] = globs['sqlContext'].read.parquet('python/test_support/sql/parquet_partitioned')
+    globs['sdf'] =\
+        globs['sqlContext'].read.format('text').stream('python/test_support/sql/streaming')
 
     (failure_count, test_count) = doctest.testmod(
         pyspark.sql.readwriter, globs=globs,

http://git-wip-us.apache.org/repos/asf/spark/blob/80bf48f4/python/pyspark/sql/streaming.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py
new file mode 100644
index 0000000..5495616
--- /dev/null
+++ b/python/pyspark/sql/streaming.py
@@ -0,0 +1,124 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from abc import ABCMeta, abstractmethod
+
+from pyspark import since
+
+__all__ = ["ContinuousQuery"]
+
+
+class ContinuousQuery(object):
+    """
+    A handle to a query that is executing continuously in the background as new data arrives.
+    All these methods are thread-safe.
+
+    .. note:: Experimental
+
+    .. versionadded:: 2.0
+    """
+
+    def __init__(self, jcq, sqlContext):
+        self._jcq = jcq
+        self._sqlContext = sqlContext
+
+    @property
+    @since(2.0)
+    def name(self):
+        """The name of the continuous query.
+        """
+        return self._jcq.name()
+
+    @property
+    @since(2.0)
+    def isActive(self):
+        """Whether this continuous query is currently active or not.
+        """
+        return self._jcq.isActive()
+
+    @since(2.0)
+    def awaitTermination(self, timeoutMs=None):
+        """Waits for the termination of `this` query, either by :func:`query.stop()` or by an
+        exception. If the query has terminated with an exception, then the exception will be thrown.
+        If `timeoutMs` is set, it returns whether the query has terminated or not within the
+        `timeoutMs` milliseconds.
+
+        If the query has terminated, then all subsequent calls to this method will either return
+        immediately (if the query was terminated by :func:`stop()`), or throw the exception
+        immediately (if the query has terminated with exception).
+
+        throws ContinuousQueryException, if `this` query has terminated with an exception
+        """
+        if timeoutMs is not None:
+            if type(timeoutMs) != int or timeoutMs < 0:
+                raise ValueError("timeoutMs must be a positive integer. Got %s" % timeoutMs)
+            return self._jcq.awaitTermination(timeoutMs)
+        else:
+            return self._jcq.awaitTermination()
+
+    @since(2.0)
+    def processAllAvailable(self):
+        """Blocks until all available data in the source has been processed an committed to the
+        sink. This method is intended for testing. Note that in the case of continually arriving
+        data, this method may block forever. Additionally, this method is only guaranteed to block
+        until data that has been synchronously appended data to a stream source prior to invocation.
+        (i.e. `getOffset` must immediately reflect the addition).
+        """
+        return self._jcq.processAllAvailable()
+
+    @since(2.0)
+    def stop(self):
+        """Stop this continuous query.
+        """
+        self._jcq.stop()
+
+
+class Trigger(object):
+    """Used to indicate how often results should be produced by a :class:`ContinuousQuery`.
+
+    .. note:: Experimental
+
+    .. versionadded:: 2.0
+    """
+
+    __metaclass__ = ABCMeta
+
+    @abstractmethod
+    def _to_java_trigger(self, sqlContext):
+        """Internal method to construct the trigger on the jvm.
+        """
+        pass
+
+
+class ProcessingTime(Trigger):
+    """A trigger that runs a query periodically based on the processing time. If `interval` is 0,
+    the query will run as fast as possible.
+
+    The interval should be given as a string, e.g. '2 seconds', '5 minutes', ...
+
+    .. note:: Experimental
+
+    .. versionadded:: 2.0
+    """
+
+    def __init__(self, interval):
+        if interval is None or type(interval) != str or len(interval.strip()) == 0:
+            raise ValueError("interval should be a non empty interval string, e.g. '2 seconds'.")
+        self.interval = interval
+
+    def _to_java_trigger(self, sqlContext):
+        return sqlContext._sc._jvm.org.apache.spark.sql.ProcessingTime.create(self.interval)

http://git-wip-us.apache.org/repos/asf/spark/blob/80bf48f4/python/pyspark/sql/tests.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index d4c221d..1e864b4 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -879,6 +879,99 @@ class SQLTests(ReusedPySparkTestCase):
 
         shutil.rmtree(tmpPath)
 
+    def test_stream_trigger_takes_keyword_args(self):
+        df = self.sqlCtx.read.format('text').stream('python/test_support/sql/streaming')
+        try:
+            df.write.trigger('5 seconds')
+            self.fail("Should have thrown an exception")
+        except TypeError:
+            # should throw error
+            pass
+
+    def test_stream_read_options(self):
+        schema = StructType([StructField("data", StringType(), False)])
+        df = self.sqlCtx.read.format('text').option('path', 'python/test_support/sql/streaming')\
+            .schema(schema).stream()
+        self.assertTrue(df.isStreaming)
+        self.assertEqual(df.schema.simpleString(), "struct<data:string>")
+
+    def test_stream_read_options_overwrite(self):
+        bad_schema = StructType([StructField("test", IntegerType(), False)])
+        schema = StructType([StructField("data", StringType(), False)])
+        df = self.sqlCtx.read.format('csv').option('path', 'python/test_support/sql/fake') \
+            .schema(bad_schema).stream(path='python/test_support/sql/streaming',
+                                       schema=schema, format='text')
+        self.assertTrue(df.isStreaming)
+        self.assertEqual(df.schema.simpleString(), "struct<data:string>")
+
+    def test_stream_save_options(self):
+        df = self.sqlCtx.read.format('text').stream('python/test_support/sql/streaming')
+        tmpPath = tempfile.mkdtemp()
+        shutil.rmtree(tmpPath)
+        self.assertTrue(df.isStreaming)
+        out = os.path.join(tmpPath, 'out')
+        chk = os.path.join(tmpPath, 'chk')
+        cq = df.write.option('checkpointLocation', chk).queryName('this_query')\
+            .format('parquet').option('path', out).startStream()
+        self.assertEqual(cq.name, 'this_query')
+        self.assertTrue(cq.isActive)
+        cq.processAllAvailable()
+        output_files = []
+        for _, _, files in os.walk(out):
+            output_files.extend([f for f in files if 'parquet' in f and not f.startswith('.')])
+        self.assertTrue(len(output_files) > 0)
+        self.assertTrue(len(os.listdir(chk)) > 0)
+        cq.stop()
+        shutil.rmtree(tmpPath)
+
+    def test_stream_save_options_overwrite(self):
+        df = self.sqlCtx.read.format('text').stream('python/test_support/sql/streaming')
+        tmpPath = tempfile.mkdtemp()
+        shutil.rmtree(tmpPath)
+        self.assertTrue(df.isStreaming)
+        out = os.path.join(tmpPath, 'out')
+        chk = os.path.join(tmpPath, 'chk')
+        fake1 = os.path.join(tmpPath, 'fake1')
+        fake2 = os.path.join(tmpPath, 'fake2')
+        cq = df.write.option('checkpointLocation', fake1).format('memory').option('path', fake2) \
+            .queryName('fake_query').startStream(path=out, format='parquet', queryName='this_query',
+                                                 checkpointLocation=chk)
+        self.assertEqual(cq.name, 'this_query')
+        self.assertTrue(cq.isActive)
+        cq.processAllAvailable()
+        output_files = []
+        for _, _, files in os.walk(out):
+            output_files.extend([f for f in files if 'parquet' in f and not f.startswith('.')])
+        self.assertTrue(len(output_files) > 0)
+        self.assertTrue(len(os.listdir(chk)) > 0)
+        self.assertFalse(os.path.isdir(fake1))  # should not have been created
+        self.assertFalse(os.path.isdir(fake2))  # should not have been created
+        cq.stop()
+        shutil.rmtree(tmpPath)
+
+    def test_stream_await_termination(self):
+        df = self.sqlCtx.read.format('text').stream('python/test_support/sql/streaming')
+        tmpPath = tempfile.mkdtemp()
+        shutil.rmtree(tmpPath)
+        self.assertTrue(df.isStreaming)
+        out = os.path.join(tmpPath, 'out')
+        chk = os.path.join(tmpPath, 'chk')
+        cq = df.write.startStream(path=out, format='parquet', queryName='this_query',
+                                  checkpointLocation=chk)
+        self.assertTrue(cq.isActive)
+        try:
+            cq.awaitTermination("hello")
+            self.fail("Expected a value exception")
+        except ValueError:
+            pass
+        now = time.time()
+        res = cq.awaitTermination(2600)  # test should take at least 2 seconds
+        duration = time.time() - now
+        self.assertTrue(duration >= 2)
+        self.assertFalse(res)
+        cq.stop()
+        shutil.rmtree(tmpPath)
+
     def test_help_command(self):
         # Regression test for SPARK-5464
         rdd = self.sc.parallelize(['{"foo":"bar"}', '{"foo":"baz"}'])

http://git-wip-us.apache.org/repos/asf/spark/blob/80bf48f4/python/test_support/sql/streaming/text-test.txt
----------------------------------------------------------------------
diff --git a/python/test_support/sql/streaming/text-test.txt b/python/test_support/sql/streaming/text-test.txt
new file mode 100644
index 0000000..ae1e76c
--- /dev/null
+++ b/python/test_support/sql/streaming/text-test.txt
@@ -0,0 +1,2 @@
+hello
+this
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/spark/blob/80bf48f4/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQuery.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQuery.scala b/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQuery.scala
index d9973b0..953169b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQuery.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/ContinuousQuery.scala
@@ -56,7 +56,7 @@ trait ContinuousQuery {
    * Returns current status of all the sources.
    * @since 2.0.0
    */
-   def sourceStatuses: Array[SourceStatus]
+  def sourceStatuses: Array[SourceStatus]
 
   /** Returns current status of the sink. */
   def sinkStatus: SinkStatus
@@ -77,7 +77,7 @@ trait ContinuousQuery {
 
   /**
    * Waits for the termination of `this` query, either by `query.stop()` or by an exception.
-   * If the query has terminated with an exception, then the exception will be throw.
+   * If the query has terminated with an exception, then the exception will be thrown.
    * Otherwise, it returns whether the query has terminated or not within the `timeoutMs`
    * milliseconds.
    *

http://git-wip-us.apache.org/repos/asf/spark/blob/80bf48f4/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 1deeb8a..0745ef4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -85,18 +85,18 @@ final class DataFrameWriter private[sql](df: DataFrame) {
    *
    * Scala Example:
    * {{{
-   *   def.writer.trigger(ProcessingTime("10 seconds"))
+   *   df.write.trigger(ProcessingTime("10 seconds"))
    *
    *   import scala.concurrent.duration._
-   *   def.writer.trigger(ProcessingTime(10.seconds))
+   *   df.write.trigger(ProcessingTime(10.seconds))
    * }}}
    *
    * Java Example:
    * {{{
-   *   def.writer.trigger(ProcessingTime.create("10 seconds"))
+   *   df.write.trigger(ProcessingTime.create("10 seconds"))
    *
    *   import java.util.concurrent.TimeUnit
-   *   def.writer.trigger(ProcessingTime.create(10, TimeUnit.SECONDS))
+   *   df.write.trigger(ProcessingTime.create(10, TimeUnit.SECONDS))
    * }}}
    *
    * @since 2.0.0

http://git-wip-us.apache.org/repos/asf/spark/blob/80bf48f4/sql/core/src/main/scala/org/apache/spark/sql/Trigger.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Trigger.scala b/sql/core/src/main/scala/org/apache/spark/sql/Trigger.scala
index c4e54b3..256e8a4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Trigger.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Trigger.scala
@@ -35,23 +35,23 @@ sealed trait Trigger {}
 
 /**
  * :: Experimental ::
- * A trigger that runs a query periodically based on the processing time. If `intervalMs` is 0,
+ * A trigger that runs a query periodically based on the processing time. If `interval` is 0,
  * the query will run as fast as possible.
  *
  * Scala Example:
  * {{{
- *   def.writer.trigger(ProcessingTime("10 seconds"))
+ *   df.write.trigger(ProcessingTime("10 seconds"))
  *
  *   import scala.concurrent.duration._
- *   def.writer.trigger(ProcessingTime(10.seconds))
+ *   df.write.trigger(ProcessingTime(10.seconds))
  * }}}
  *
  * Java Example:
  * {{{
- *   def.writer.trigger(ProcessingTime.create("10 seconds"))
+ *   df.write.trigger(ProcessingTime.create("10 seconds"))
  *
  *   import java.util.concurrent.TimeUnit
- *   def.writer.trigger(ProcessingTime.create(10, TimeUnit.SECONDS))
+ *   df.write.trigger(ProcessingTime.create(10, TimeUnit.SECONDS))
  * }}}
  */
 @Experimental
@@ -67,11 +67,11 @@ case class ProcessingTime(intervalMs: Long) extends Trigger {
 object ProcessingTime {
 
   /**
-   * Create a [[ProcessingTime]]. If `intervalMs` is 0, the query will run as fast as possible.
+   * Create a [[ProcessingTime]]. If `interval` is 0, the query will run as fast as possible.
    *
    * Example:
    * {{{
-   *   def.writer.trigger(ProcessingTime("10 seconds"))
+   *   df.write.trigger(ProcessingTime("10 seconds"))
    * }}}
    */
   def apply(interval: String): ProcessingTime = {
@@ -94,12 +94,12 @@ object ProcessingTime {
   }
 
   /**
-   * Create a [[ProcessingTime]]. If `intervalMs` is 0, the query will run as fast as possible.
+   * Create a [[ProcessingTime]]. If `interval` is 0, the query will run as fast as possible.
    *
    * Example:
    * {{{
    *   import scala.concurrent.duration._
-   *   def.writer.trigger(ProcessingTime(10.seconds))
+   *   df.write.trigger(ProcessingTime(10.seconds))
    * }}}
    */
   def apply(interval: Duration): ProcessingTime = {
@@ -107,11 +107,11 @@ object ProcessingTime {
   }
 
   /**
-   * Create a [[ProcessingTime]]. If `intervalMs` is 0, the query will run as fast as possible.
+   * Create a [[ProcessingTime]]. If `interval` is 0, the query will run as fast as possible.
    *
    * Example:
    * {{{
-   *   def.writer.trigger(ProcessingTime.create("10 seconds"))
+   *   df.write.trigger(ProcessingTime.create("10 seconds"))
    * }}}
    */
   def create(interval: String): ProcessingTime = {
@@ -119,12 +119,12 @@ object ProcessingTime {
   }
 
   /**
-   * Create a [[ProcessingTime]]. If `intervalMs` is 0, the query will run as fast as possible.
+   * Create a [[ProcessingTime]]. If `interval` is 0, the query will run as fast as possible.
    *
    * Example:
    * {{{
    *   import java.util.concurrent.TimeUnit
-   *   def.writer.trigger(ProcessingTime.create(10, TimeUnit.SECONDS))
+   *   df.write.trigger(ProcessingTime.create(10, TimeUnit.SECONDS))
    * }}}
    */
   def create(interval: Long, unit: TimeUnit): ProcessingTime = {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org