Posted to commits@spark.apache.org by gu...@apache.org on 2023/06/09 04:11:50 UTC
[spark] branch master updated: [SPARK-44010][PYTHON][SS][MINOR] Python StreamingQueryProgress rowsPerSecond type fix
This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new efa1eb98ef3 [SPARK-44010][PYTHON][SS][MINOR] Python StreamingQueryProgress rowsPerSecond type fix
efa1eb98ef3 is described below
commit efa1eb98ef302fa4e16801a0d7027769c7ac845d
Author: Wei Liu <we...@databricks.com>
AuthorDate: Fri Jun 9 13:11:34 2023 +0900
[SPARK-44010][PYTHON][SS][MINOR] Python StreamingQueryProgress rowsPerSecond type fix
### What changes were proposed in this pull request?
Fix the return type of `inputRowsPerSecond` and `processedRowsPerSecond` on Python's `StreamingQueryProgress`. They should be `float` according to `org/apache/spark/sql/streaming/progress.scala`. Also lifted initialization of the two values into the initializer, to align with the implementation of `SourceProgress`.
### Why are the changes needed?
Better typed API
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Existing unit tests
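The pattern this change applies — reading the Java-side values once in the initializer and returning the cached floats from typed properties, instead of calling back into the JVM on every access — can be sketched in plain Python. The `JProgress` class below is a hypothetical stand-in for the Py4J-backed `jprogress` object; the values it returns are made up for illustration:

```python
class JProgress:
    """Hypothetical stand-in for the Py4J-backed Java progress object."""

    def inputRowsPerSecond(self) -> float:
        return 120.5

    def processedRowsPerSecond(self) -> float:
        return 98.25


class StreamingQueryProgress:
    def __init__(self, jprogress: JProgress) -> None:
        # Read once here, mirroring SourceProgress, rather than on each
        # property access.
        self._inputRowsPerSecond: float = jprogress.inputRowsPerSecond()
        self._processedRowsPerSecond: float = jprogress.processedRowsPerSecond()

    @property
    def inputRowsPerSecond(self) -> float:
        """The aggregate (across all sources) rate of data arriving."""
        return self._inputRowsPerSecond

    @property
    def processedRowsPerSecond(self) -> float:
        """The aggregate (across all sources) rate of data processing."""
        return self._processedRowsPerSecond


progress = StreamingQueryProgress(JProgress())
print(isinstance(progress.inputRowsPerSecond, float))  # True
print(isinstance(progress.processedRowsPerSecond, float))  # True
```

Before this patch the property annotations said `str` even though the underlying Java methods return doubles, so type checkers flagged correct float usage of these fields.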
Closes #41522 from WweiL/SPARK-44010-sqp-type-fix.
Authored-by: Wei Liu <we...@databricks.com>
Signed-off-by: Hyukjin Kwon <gu...@apache.org>
---
python/pyspark/sql/streaming/listener.py | 10 ++++++----
1 file changed, 6 insertions(+), 4 deletions(-)
diff --git a/python/pyspark/sql/streaming/listener.py b/python/pyspark/sql/streaming/listener.py
index 9b0ae4938a1..33482664a7b 100644
--- a/python/pyspark/sql/streaming/listener.py
+++ b/python/pyspark/sql/streaming/listener.py
@@ -331,6 +331,8 @@ class StreamingQueryProgress:
self._name: Optional[str] = jprogress.name()
self._timestamp: str = jprogress.timestamp()
self._batchId: int = jprogress.batchId()
+ self._inputRowsPerSecond: float = jprogress.inputRowsPerSecond()
+ self._processedRowsPerSecond: float = jprogress.processedRowsPerSecond()
self._batchDuration: int = jprogress.batchDuration()
self._durationMs: Dict[str, int] = dict(jprogress.durationMs())
self._eventTime: Dict[str, str] = dict(jprogress.eventTime())
@@ -453,18 +455,18 @@ class StreamingQueryProgress:
return self._jprogress.numInputRows()
@property
- def inputRowsPerSecond(self) -> str:
+ def inputRowsPerSecond(self) -> float:
"""
The aggregate (across all sources) rate of data arriving.
"""
- return self._jprogress.inputRowsPerSecond()
+ return self._inputRowsPerSecond
@property
- def processedRowsPerSecond(self) -> str:
+ def processedRowsPerSecond(self) -> float:
"""
The aggregate (across all sources) rate at which Spark is processing data..
"""
- return self._jprogress.processedRowsPerSecond()
+ return self._processedRowsPerSecond
@property
def json(self) -> str: