Posted to commits@spark.apache.org by gu...@apache.org on 2022/03/31 23:31:19 UTC
[spark] branch master updated: [SPARK-38711][PYTHON][SS] Refactor pyspark.sql.streaming module
This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 43af6f9 [SPARK-38711][PYTHON][SS] Refactor pyspark.sql.streaming module
43af6f9 is described below
commit 43af6f9c7fe3feec9dd1aa26a9988f217ad4a8ab
Author: Hyukjin Kwon <gu...@apache.org>
AuthorDate: Fri Apr 1 08:29:30 2022 +0900
[SPARK-38711][PYTHON][SS] Refactor pyspark.sql.streaming module
### What changes were proposed in this pull request?
This PR converts the `pyspark/sql/streaming.py` module into a package with two files: `pyspark/sql/streaming/query.py` and `pyspark/sql/streaming/readwriter.py`. This is similar to the existing split between `pyspark/sql/dataframe.py` and `pyspark/sql/readwriter.py`.
There should be no user-facing change because I kept the existing imports in `pyspark/sql/streaming/__init__.py`.
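A minimal sketch of that compatibility guarantee (assuming a PySpark build from this branch): the names importable from the old module path are re-exported by the new package and refer to the same classes.

    from pyspark.sql.streaming import StreamingQuery, DataStreamWriter
    from pyspark.sql.streaming.query import StreamingQuery as Q
    from pyspark.sql.streaming.readwriter import DataStreamWriter as W

    # The package-level re-exports and the submodule definitions are the same objects.
    assert StreamingQuery is Q
    assert DataStreamWriter is W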
### Why are the changes needed?
To make the code easier to read.
### Does this PR introduce _any_ user-facing change?
No, dev-only.
### How was this patch tested?
Existing test cases should cover this change.
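Each new submodule also keeps its own doctest harness (the `_test()` functions below), so the doctests can still be exercised directly; a hedged sketch, assuming SPARK_HOME is set and a local Spark build is importable:

    import runpy

    # Runs each module's __main__ guard, which invokes its _test() doctest harness.
    runpy.run_module("pyspark.sql.streaming.query", run_name="__main__")
    runpy.run_module("pyspark.sql.streaming.readwriter", run_name="__main__")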
Closes #36023 from HyukjinKwon/refactoring-streaming-module.
Authored-by: Hyukjin Kwon <gu...@apache.org>
Signed-off-by: Hyukjin Kwon <gu...@apache.org>
---
.github/labeler.yml | 6 +-
dev/sparktestsupport/modules.py | 3 +-
python/pyspark/sql/streaming/__init__.py | 19 ++
python/pyspark/sql/streaming/query.py | 329 +++++++++++++++++++++
.../sql/{streaming.py => streaming/readwriter.py} | 289 +-----------------
python/setup.py | 1 +
6 files changed, 362 insertions(+), 285 deletions(-)
diff --git a/.github/labeler.yml b/.github/labeler.yml
index bd61902..b3f33cf 100644
--- a/.github/labeler.yml
+++ b/.github/labeler.yml
@@ -84,11 +84,11 @@ SPARK SHELL:
- "repl/**/*"
- "bin/spark-shell*"
SQL:
-#- any: ["**/sql/**/*", "!python/pyspark/sql/avro/**/*", "!python/pyspark/sql/streaming.py", "!python/pyspark/sql/tests/test_streaming.py"]
+#- any: ["**/sql/**/*", "!python/pyspark/sql/avro/**/*", "!python/pyspark/sql/streaming/**/*", "!python/pyspark/sql/tests/test_streaming.py"]
- "**/sql/**/*"
- "common/unsafe/**/*"
#- "!python/pyspark/sql/avro/**/*"
- #- "!python/pyspark/sql/streaming.py"
+ #- "!python/pyspark/sql/streaming/**/*"
#- "!python/pyspark/sql/tests/test_streaming.py"
- "bin/spark-sql*"
- "bin/beeline*"
@@ -124,7 +124,7 @@ MLLIB:
STRUCTURED STREAMING:
- "**/sql/**/streaming/**/*"
- "external/kafka-0-10-sql/**/*"
- - "python/pyspark/sql/streaming.py"
+ - "python/pyspark/sql/streaming/**/*"
- "python/pyspark/sql/tests/test_streaming.py"
- "**/*streaming.R"
PYTHON:
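The labeler globs above change because the module is now a directory, so the old single-file pattern can no longer match. A rough illustration, using Python's fnmatch as a simplified stand-in for the labeler's glob engine (an assumption; note that fnmatch's `*` also crosses `/`):

    from fnmatch import fnmatch

    new_path = "python/pyspark/sql/streaming/query.py"
    # The old single-file pattern misses everything under the new package...
    print(fnmatch(new_path, "python/pyspark/sql/streaming.py"))  # False
    # ...while a directory-scoped pattern matches it.
    print(fnmatch(new_path, "python/pyspark/sql/streaming/*"))   # True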
diff --git a/dev/sparktestsupport/modules.py b/dev/sparktestsupport/modules.py
index aab6305..d8a3508 100644
--- a/dev/sparktestsupport/modules.py
+++ b/dev/sparktestsupport/modules.py
@@ -423,7 +423,8 @@ pyspark_sql = Module(
"pyspark.sql.group",
"pyspark.sql.functions",
"pyspark.sql.readwriter",
- "pyspark.sql.streaming",
+ "pyspark.sql.streaming.query",
+ "pyspark.sql.streaming.readwriter",
"pyspark.sql.udf",
"pyspark.sql.window",
"pyspark.sql.avro.functions",
diff --git a/python/pyspark/sql/streaming/__init__.py b/python/pyspark/sql/streaming/__init__.py
new file mode 100644
index 0000000..1c610e3
--- /dev/null
+++ b/python/pyspark/sql/streaming/__init__.py
@@ -0,0 +1,19 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from pyspark.sql.streaming.query import StreamingQuery, StreamingQueryManager # noqa: F401
+from pyspark.sql.streaming.readwriter import DataStreamReader, DataStreamWriter # noqa: F401
diff --git a/python/pyspark/sql/streaming/query.py b/python/pyspark/sql/streaming/query.py
new file mode 100644
index 0000000..8565435
--- /dev/null
+++ b/python/pyspark/sql/streaming/query.py
@@ -0,0 +1,329 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import json
+import sys
+from typing import Any, Dict, List, Optional
+
+from py4j.java_gateway import JavaObject
+
+from pyspark import since
+from pyspark.sql.utils import StreamingQueryException
+
+
+__all__ = ["StreamingQuery", "StreamingQueryManager"]
+
+
+class StreamingQuery:
+ """
+ A handle to a query that is executing continuously in the background as new data arrives.
+ All these methods are thread-safe.
+
+ .. versionadded:: 2.0.0
+
+ Notes
+ -----
+ This API is evolving.
+ """
+
+ def __init__(self, jsq: JavaObject) -> None:
+ self._jsq = jsq
+
+ @property # type: ignore[misc]
+ @since(2.0)
+ def id(self) -> str:
+ """Returns the unique id of this query that persists across restarts from checkpoint data.
+ That is, this id is generated when a query is started for the first time, and
+ will be the same every time it is restarted from checkpoint data.
+ There can only be one query with the same id active in a Spark cluster.
+ See also `runId`.
+ """
+ return self._jsq.id().toString()
+
+ @property # type: ignore[misc]
+ @since(2.1)
+ def runId(self) -> str:
+ """Returns the unique id of this query that does not persist across restarts. That is, every
+ query that is started (or restarted from checkpoint) will have a different runId.
+ """
+ return self._jsq.runId().toString()
+
+ @property # type: ignore[misc]
+ @since(2.0)
+ def name(self) -> str:
+ """Returns the user-specified name of the query, or null if not specified.
+ This name can be specified in the `org.apache.spark.sql.streaming.DataStreamWriter`
+ as `dataframe.writeStream.queryName("query").start()`.
+ This name, if set, must be unique across all active queries.
+ """
+ return self._jsq.name()
+
+ @property # type: ignore[misc]
+ @since(2.0)
+ def isActive(self) -> bool:
+ """Whether this streaming query is currently active or not."""
+ return self._jsq.isActive()
+
+ @since(2.0)
+ def awaitTermination(self, timeout: Optional[int] = None) -> Optional[bool]:
+ """Waits for the termination of `this` query, either by :func:`query.stop()` or by an
+ exception. If the query has terminated with an exception, then the exception will be thrown.
+ If `timeout` is set, it returns whether the query has terminated or not within the
+ `timeout` seconds.
+
+ If the query has terminated, then all subsequent calls to this method will either return
+ immediately (if the query was terminated by :func:`stop()`), or throw the exception
+ immediately (if the query has terminated with exception).
+
+ throws :class:`StreamingQueryException` if `this` query has terminated with an exception
+ """
+ if timeout is not None:
+ if not isinstance(timeout, (int, float)) or timeout < 0:
+ raise ValueError("timeout must be a non-negative integer or float. Got %s" % timeout)
+ return self._jsq.awaitTermination(int(timeout * 1000))
+ else:
+ return self._jsq.awaitTermination()
+
+ @property # type: ignore[misc]
+ @since(2.1)
+ def status(self) -> Dict[str, Any]:
+ """
+ Returns the current status of the query.
+ """
+ return json.loads(self._jsq.status().json())
+
+ @property # type: ignore[misc]
+ @since(2.1)
+ def recentProgress(self) -> List[Dict[str, Any]]:
+ """Returns an array of the most recent [[StreamingQueryProgress]] updates for this query.
+ The number of progress updates retained for each stream is configured by Spark session
+ configuration `spark.sql.streaming.numRecentProgressUpdates`.
+ """
+ return [json.loads(p.json()) for p in self._jsq.recentProgress()]
+
+ @property
+ def lastProgress(self) -> Optional[Dict[str, Any]]:
+ """
+ Returns the most recent :class:`StreamingQueryProgress` update of this streaming query or
+ None if there were no progress updates.
+
+ .. versionadded:: 2.1.0
+
+ Returns
+ -------
+ dict
+ """
+ lastProgress = self._jsq.lastProgress()
+ if lastProgress:
+ return json.loads(lastProgress.json())
+ else:
+ return None
+
+ def processAllAvailable(self) -> None:
+ """Blocks until all available data in the source has been processed and committed to the
+ sink. This method is intended for testing.
+
+ .. versionadded:: 2.0.0
+
+ Notes
+ -----
+ In the case of continually arriving data, this method may block forever.
+ Additionally, this method is only guaranteed to block until data that has been
+ synchronously appended to a stream source prior to invocation has been processed
+ (i.e. `getOffset` must immediately reflect the addition).
+ """
+ return self._jsq.processAllAvailable()
+
+ @since(2.0)
+ def stop(self) -> None:
+ """Stop this streaming query."""
+ self._jsq.stop()
+
+ def explain(self, extended: bool = False) -> None:
+ """Prints the (logical and physical) plans to the console for debugging purpose.
+
+ .. versionadded:: 2.1.0
+
+ Parameters
+ ----------
+ extended : bool, optional
+ default ``False``. If ``False``, prints only the physical plan.
+
+ Examples
+ --------
+ >>> sq = sdf.writeStream.format('memory').queryName('query_explain').start()
+ >>> sq.processAllAvailable() # Wait a bit to generate the runtime plans.
+ >>> sq.explain()
+ == Physical Plan ==
+ ...
+ >>> sq.explain(True)
+ == Parsed Logical Plan ==
+ ...
+ == Analyzed Logical Plan ==
+ ...
+ == Optimized Logical Plan ==
+ ...
+ == Physical Plan ==
+ ...
+ >>> sq.stop()
+ """
+ # Cannot call `_jsq.explain(...)` because it will print in the JVM process.
+ # We should print it in the Python process.
+ print(self._jsq.explainInternal(extended))
+
+ def exception(self) -> Optional[StreamingQueryException]:
+ """
+ .. versionadded:: 2.1.0
+
+ Returns
+ -------
+ :class:`StreamingQueryException`
+ the StreamingQueryException if the query was terminated by an exception, or None.
+ """
+ if self._jsq.exception().isDefined():
+ je = self._jsq.exception().get()
+ msg = je.toString().split(": ", 1)[1] # Drop the Java StreamingQueryException type info
+ stackTrace = "\n\t at ".join(map(lambda x: x.toString(), je.getStackTrace()))
+ return StreamingQueryException(msg, stackTrace, je.getCause())
+ else:
+ return None
+
+
+class StreamingQueryManager:
+ """A class to manage all the :class:`StreamingQuery` StreamingQueries active.
+
+ .. versionadded:: 2.0.0
+
+ Notes
+ -----
+ This API is evolving.
+ """
+
+ def __init__(self, jsqm: JavaObject) -> None:
+ self._jsqm = jsqm
+
+ @property
+ def active(self) -> List[StreamingQuery]:
+ """Returns a list of active queries associated with this SQLContext
+
+ .. versionadded:: 2.0.0
+
+ Examples
+ --------
+ >>> sq = sdf.writeStream.format('memory').queryName('this_query').start()
+ >>> sqm = spark.streams
+ >>> # get the list of active streaming queries
+ >>> [q.name for q in sqm.active]
+ ['this_query']
+ >>> sq.stop()
+ """
+ return [StreamingQuery(jsq) for jsq in self._jsqm.active()]
+
+ def get(self, id: str) -> StreamingQuery:
+ """Returns an active query from this SQLContext or throws exception if an active query
+ with this name doesn't exist.
+
+ .. versionadded:: 2.0.0
+
+ Examples
+ --------
+ >>> sq = sdf.writeStream.format('memory').queryName('this_query').start()
+ >>> sq.name
+ 'this_query'
+ >>> sq = spark.streams.get(sq.id)
+ >>> sq.isActive
+ True
+ >>> sq = sqlContext.streams.get(sq.id)
+ >>> sq.isActive
+ True
+ >>> sq.stop()
+ """
+ return StreamingQuery(self._jsqm.get(id))
+
+ @since(2.0)
+ def awaitAnyTermination(self, timeout: Optional[int] = None) -> Optional[bool]:
+ """Wait until any of the queries on the associated SQLContext has terminated since the
+ creation of the context, or since :func:`resetTerminated()` was called. If any query was
+ terminated with an exception, then the exception will be thrown.
+ If `timeout` is set, it returns whether the query has terminated or not within the
+ `timeout` seconds.
+
+ If a query has terminated, then subsequent calls to :func:`awaitAnyTermination()` will
+ either return immediately (if the query was terminated by :func:`query.stop()`),
+ or throw the exception immediately (if the query was terminated with exception). Use
+ :func:`resetTerminated()` to clear past terminations and wait for new terminations.
+
+ In the case where multiple queries have terminated since :func:`resetTerminated()`
+ was called, if any query has terminated with an exception, then
+ :func:`awaitAnyTermination()` will throw any of the exceptions. To correctly handle
+ exceptions across multiple queries, users need to stop all of them after any of them
+ terminates with an exception, and then check `query.exception()` for each query.
+
+ throws :class:`StreamingQueryException` if any query has terminated with an exception
+ """
+ if timeout is not None:
+ if not isinstance(timeout, (int, float)) or timeout < 0:
+ raise ValueError("timeout must be a non-negative integer or float. Got %s" % timeout)
+ return self._jsqm.awaitAnyTermination(int(timeout * 1000))
+ else:
+ return self._jsqm.awaitAnyTermination()
+
+ def resetTerminated(self) -> None:
+ """Forget about past terminated queries so that :func:`awaitAnyTermination()` can be used
+ again to wait for new terminations.
+
+ .. versionadded:: 2.0.0
+
+ Examples
+ --------
+ >>> spark.streams.resetTerminated()
+ """
+ self._jsqm.resetTerminated()
+
+
+def _test() -> None:
+ import doctest
+ import os
+ from pyspark.sql import SparkSession, SQLContext
+ import pyspark.sql.streaming.query
+ from py4j.protocol import Py4JError
+
+ os.chdir(os.environ["SPARK_HOME"])
+
+ globs = pyspark.sql.streaming.query.__dict__.copy()
+ try:
+ spark = SparkSession._getActiveSessionOrCreate()
+ except Py4JError: # noqa: F821
+ spark = SparkSession(sc) # type: ignore[name-defined] # noqa: F821
+
+ globs["spark"] = spark
+ globs["sqlContext"] = SQLContext.getOrCreate(spark.sparkContext)
+ globs["sdf"] = spark.readStream.format("text").load("python/test_support/sql/streaming")
+
+ (failure_count, test_count) = doctest.testmod(
+ pyspark.sql.streaming.query,
+ globs=globs,
+ optionflags=doctest.ELLIPSIS | doctest.NORMALIZE_WHITESPACE | doctest.REPORT_NDIFF,
+ )
+ globs["spark"].stop()
+
+ if failure_count:
+ sys.exit(-1)
+
+
+if __name__ == "__main__":
+ _test()
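Taken together, the class above covers the usual query lifecycle. A hedged end-to-end sketch (assuming a local SparkSession; the `rate` source, memory sink, and query name are illustrative choices, not part of this change):

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.master("local[2]").getOrCreate()
    sdf = spark.readStream.format("rate").option("rowsPerSecond", 10).load()

    query = sdf.writeStream.format("memory").queryName("demo").start()
    query.awaitTermination(5)        # returns False: still active after 5 seconds
    print(query.id, query.runId)     # id persists across restarts; runId does not
    print(query.status)              # current status as a dict
    print(query.lastProgress)        # most recent progress dict, or None
    query.stop()
    spark.streams.resetTerminated()  # forget the terminated query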
diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming/readwriter.py
similarity index 80%
rename from python/pyspark/sql/streaming.py
rename to python/pyspark/sql/streaming/readwriter.py
index 7517a41..e87b600 100644
--- a/python/pyspark/sql/streaming.py
+++ b/python/pyspark/sql/streaming/readwriter.py
@@ -16,291 +16,23 @@
#
import sys
-import json
from collections.abc import Iterator
-from typing import cast, overload, Any, Callable, Dict, List, Optional, TYPE_CHECKING, Union
+from typing import cast, overload, Any, Callable, List, Optional, TYPE_CHECKING, Union
from py4j.java_gateway import java_import, JavaObject
-from pyspark import since
from pyspark.sql.column import _to_seq
from pyspark.sql.readwriter import OptionUtils, to_str
+from pyspark.sql.streaming.query import StreamingQuery
from pyspark.sql.types import Row, StructType, StructField, StringType
-from pyspark.sql.utils import ForeachBatchFunction, StreamingQueryException
+from pyspark.sql.utils import ForeachBatchFunction
if TYPE_CHECKING:
from pyspark.sql.session import SparkSession
from pyspark.sql._typing import SupportsProcess, OptionalPrimitiveType
from pyspark.sql.dataframe import DataFrame
-__all__ = ["StreamingQuery", "StreamingQueryManager", "DataStreamReader", "DataStreamWriter"]
-
-
-class StreamingQuery:
- """
- A handle to a query that is executing continuously in the background as new data arrives.
- All these methods are thread-safe.
-
- .. versionadded:: 2.0.0
-
- Notes
- -----
- This API is evolving.
- """
-
- def __init__(self, jsq: JavaObject) -> None:
- self._jsq = jsq
-
- @property # type: ignore[misc]
- @since(2.0)
- def id(self) -> str:
- """Returns the unique id of this query that persists across restarts from checkpoint data.
- That is, this id is generated when a query is started for the first time, and
- will be the same every time it is restarted from checkpoint data.
- There can only be one query with the same id active in a Spark cluster.
- Also see, `runId`.
- """
- return self._jsq.id().toString()
-
- @property # type: ignore[misc]
- @since(2.1)
- def runId(self) -> str:
- """Returns the unique id of this query that does not persist across restarts. That is, every
- query that is started (or restarted from checkpoint) will have a different runId.
- """
- return self._jsq.runId().toString()
-
- @property # type: ignore[misc]
- @since(2.0)
- def name(self) -> str:
- """Returns the user-specified name of the query, or null if not specified.
- This name can be specified in the `org.apache.spark.sql.streaming.DataStreamWriter`
- as `dataframe.writeStream.queryName("query").start()`.
- This name, if set, must be unique across all active queries.
- """
- return self._jsq.name()
-
- @property # type: ignore[misc]
- @since(2.0)
- def isActive(self) -> bool:
- """Whether this streaming query is currently active or not."""
- return self._jsq.isActive()
-
- @since(2.0)
- def awaitTermination(self, timeout: Optional[int] = None) -> Optional[bool]:
- """Waits for the termination of `this` query, either by :func:`query.stop()` or by an
- exception. If the query has terminated with an exception, then the exception will be thrown.
- If `timeout` is set, it returns whether the query has terminated or not within the
- `timeout` seconds.
-
- If the query has terminated, then all subsequent calls to this method will either return
- immediately (if the query was terminated by :func:`stop()`), or throw the exception
- immediately (if the query has terminated with exception).
-
- throws :class:`StreamingQueryException`, if `this` query has terminated with an exception
- """
- if timeout is not None:
- if not isinstance(timeout, (int, float)) or timeout < 0:
- raise ValueError("timeout must be a positive integer or float. Got %s" % timeout)
- return self._jsq.awaitTermination(int(timeout * 1000))
- else:
- return self._jsq.awaitTermination()
-
- @property # type: ignore[misc]
- @since(2.1)
- def status(self) -> Dict[str, Any]:
- """
- Returns the current status of the query.
- """
- return json.loads(self._jsq.status().json())
-
- @property # type: ignore[misc]
- @since(2.1)
- def recentProgress(self) -> List[Dict[str, Any]]:
- """Returns an array of the most recent [[StreamingQueryProgress]] updates for this query.
- The number of progress updates retained for each stream is configured by Spark session
- configuration `spark.sql.streaming.numRecentProgressUpdates`.
- """
- return [json.loads(p.json()) for p in self._jsq.recentProgress()]
-
- @property
- def lastProgress(self) -> Optional[Dict[str, Any]]:
- """
- Returns the most recent :class:`StreamingQueryProgress` update of this streaming query or
- None if there were no progress updates
-
- .. versionadded:: 2.1.0
-
- Returns
- -------
- dict
- """
- lastProgress = self._jsq.lastProgress()
- if lastProgress:
- return json.loads(lastProgress.json())
- else:
- return None
-
- def processAllAvailable(self) -> None:
- """Blocks until all available data in the source has been processed and committed to the
- sink. This method is intended for testing.
-
- .. versionadded:: 2.0.0
-
- Notes
- -----
- In the case of continually arriving data, this method may block forever.
- Additionally, this method is only guaranteed to block until data that has been
- synchronously appended data to a stream source prior to invocation.
- (i.e. `getOffset` must immediately reflect the addition).
- """
- return self._jsq.processAllAvailable()
-
- @since(2.0)
- def stop(self) -> None:
- """Stop this streaming query."""
- self._jsq.stop()
-
- def explain(self, extended: bool = False) -> None:
- """Prints the (logical and physical) plans to the console for debugging purpose.
-
- .. versionadded:: 2.1.0
-
- Parameters
- ----------
- extended : bool, optional
- default ``False``. If ``False``, prints only the physical plan.
-
- Examples
- --------
- >>> sq = sdf.writeStream.format('memory').queryName('query_explain').start()
- >>> sq.processAllAvailable() # Wait a bit to generate the runtime plans.
- >>> sq.explain()
- == Physical Plan ==
- ...
- >>> sq.explain(True)
- == Parsed Logical Plan ==
- ...
- == Analyzed Logical Plan ==
- ...
- == Optimized Logical Plan ==
- ...
- == Physical Plan ==
- ...
- >>> sq.stop()
- """
- # Cannot call `_jsq.explain(...)` because it will print in the JVM process.
- # We should print it in the Python process.
- print(self._jsq.explainInternal(extended))
-
- def exception(self) -> Optional[StreamingQueryException]:
- """
- .. versionadded:: 2.1.0
-
- Returns
- -------
- :class:`StreamingQueryException`
- the StreamingQueryException if the query was terminated by an exception, or None.
- """
- if self._jsq.exception().isDefined():
- je = self._jsq.exception().get()
- msg = je.toString().split(": ", 1)[1] # Drop the Java StreamingQueryException type info
- stackTrace = "\n\t at ".join(map(lambda x: x.toString(), je.getStackTrace()))
- return StreamingQueryException(msg, stackTrace, je.getCause())
- else:
- return None
-
-
-class StreamingQueryManager:
- """A class to manage all the :class:`StreamingQuery` StreamingQueries active.
-
- .. versionadded:: 2.0.0
-
- Notes
- -----
- This API is evolving.
- """
-
- def __init__(self, jsqm: JavaObject) -> None:
- self._jsqm = jsqm
-
- @property
- def active(self) -> List[StreamingQuery]:
- """Returns a list of active queries associated with this SQLContext
-
- .. versionadded:: 2.0.0
-
- Examples
- --------
- >>> sq = sdf.writeStream.format('memory').queryName('this_query').start()
- >>> sqm = spark.streams
- >>> # get the list of active streaming queries
- >>> [q.name for q in sqm.active]
- ['this_query']
- >>> sq.stop()
- """
- return [StreamingQuery(jsq) for jsq in self._jsqm.active()]
-
- def get(self, id: str) -> StreamingQuery:
- """Returns an active query from this SQLContext or throws exception if an active query
- with this name doesn't exist.
-
- .. versionadded:: 2.0.0
-
- Examples
- --------
- >>> sq = sdf.writeStream.format('memory').queryName('this_query').start()
- >>> sq.name
- 'this_query'
- >>> sq = spark.streams.get(sq.id)
- >>> sq.isActive
- True
- >>> sq = sqlContext.streams.get(sq.id)
- >>> sq.isActive
- True
- >>> sq.stop()
- """
- return StreamingQuery(self._jsqm.get(id))
-
- @since(2.0)
- def awaitAnyTermination(self, timeout: Optional[int] = None) -> Optional[bool]:
- """Wait until any of the queries on the associated SQLContext has terminated since the
- creation of the context, or since :func:`resetTerminated()` was called. If any query was
- terminated with an exception, then the exception will be thrown.
- If `timeout` is set, it returns whether the query has terminated or not within the
- `timeout` seconds.
-
- If a query has terminated, then subsequent calls to :func:`awaitAnyTermination()` will
- either return immediately (if the query was terminated by :func:`query.stop()`),
- or throw the exception immediately (if the query was terminated with exception). Use
- :func:`resetTerminated()` to clear past terminations and wait for new terminations.
-
- In the case where multiple queries have terminated since :func:`resetTermination()`
- was called, if any query has terminated with exception, then :func:`awaitAnyTermination()`
- will throw any of the exception. For correctly documenting exceptions across multiple
- queries, users need to stop all of them after any of them terminates with exception, and
- then check the `query.exception()` for each query.
-
- throws :class:`StreamingQueryException`, if `this` query has terminated with an exception
- """
- if timeout is not None:
- if not isinstance(timeout, (int, float)) or timeout < 0:
- raise ValueError("timeout must be a positive integer or float. Got %s" % timeout)
- return self._jsqm.awaitAnyTermination(int(timeout * 1000))
- else:
- return self._jsqm.awaitAnyTermination()
-
- def resetTerminated(self) -> None:
- """Forget about past terminated queries so that :func:`awaitAnyTermination()` can be used
- again to wait for new terminations.
-
- .. versionadded:: 2.0.0
-
- Examples
- --------
- >>> spark.streams.resetTerminated()
- """
- self._jsqm.resetTerminated()
+__all__ = ["DataStreamReader", "DataStreamWriter"]
class DataStreamReader(OptionUtils):
@@ -860,8 +592,6 @@ class DataStreamWriter:
self._jwrite = df._jdf.writeStream()
def _sq(self, jsq: JavaObject) -> StreamingQuery:
- from pyspark.sql.streaming import StreamingQuery
-
return StreamingQuery(jsq)
def outputMode(self, outputMode: str) -> "DataStreamWriter":
@@ -1472,28 +1202,25 @@ def _test() -> None:
import doctest
import os
import tempfile
- from pyspark.sql import SparkSession, SQLContext
- import pyspark.sql.streaming
+ from pyspark.sql import SparkSession
+ import pyspark.sql.streaming.readwriter
from py4j.protocol import Py4JError
os.chdir(os.environ["SPARK_HOME"])
- globs = pyspark.sql.streaming.__dict__.copy()
+ globs = pyspark.sql.streaming.readwriter.__dict__.copy()
try:
spark = SparkSession._getActiveSessionOrCreate()
except Py4JError: # noqa: F821
spark = SparkSession(sc) # type: ignore[name-defined] # noqa: F821
globs["tempfile"] = tempfile
- globs["os"] = os
globs["spark"] = spark
- globs["sqlContext"] = SQLContext.getOrCreate(spark.sparkContext)
globs["sdf"] = spark.readStream.format("text").load("python/test_support/sql/streaming")
globs["sdf_schema"] = StructType([StructField("data", StringType(), True)])
- globs["df"] = globs["spark"].readStream.format("text").load("python/test_support/sql/streaming")
(failure_count, test_count) = doctest.testmod(
- pyspark.sql.streaming,
+ pyspark.sql.streaming.readwriter,
globs=globs,
optionflags=doctest.ELLIPSIS | doctest.NORMALIZE_WHITESPACE | doctest.REPORT_NDIFF,
)
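After the move, this module carries only the reader/writer pair. A hedged usage sketch of the two (the console sink is illustrative, and the input path assumes the working directory is SPARK_HOME, as in the doctests):

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.master("local[2]").getOrCreate()

    # DataStreamReader: build a streaming DataFrame from a directory of text files.
    sdf = spark.readStream.format("text").load("python/test_support/sql/streaming")

    # DataStreamWriter: stream each micro-batch to the console sink.
    query = sdf.writeStream.outputMode("append").format("console").start()
    query.awaitTermination(5)  # wait up to 5 seconds, then stop explicitly
    query.stop()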
diff --git a/python/setup.py b/python/setup.py
index a8d16ff..6128b20 100755
--- a/python/setup.py
+++ b/python/setup.py
@@ -216,6 +216,7 @@ try:
'pyspark.sql',
'pyspark.sql.avro',
'pyspark.sql.pandas',
+ 'pyspark.sql.streaming',
'pyspark.streaming',
'pyspark.bin',
'pyspark.sbin',
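Listing the new package in setup.py matters because setuptools only bundles the packages named here; without this entry, pip-installed distributions would silently omit the directory. A quick post-install smoke test (a sketch, assuming a build from this branch):

    import importlib

    for mod in ("pyspark.sql.streaming",
                "pyspark.sql.streaming.query",
                "pyspark.sql.streaming.readwriter"):
        importlib.import_module(mod)  # raises ImportError if packaging missed it
    print("pyspark.sql.streaming subpackage is packaged correctly")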