Posted to commits@spark.apache.org by gu...@apache.org on 2023/06/09 04:11:50 UTC

[spark] branch master updated: [SPARK-44010][PYTHON][SS][MINOR] Python StreamingQueryProgress rowsPerSecond type fix

This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new efa1eb98ef3 [SPARK-44010][PYTHON][SS][MINOR] Python StreamingQueryProgress rowsPerSecond type fix
efa1eb98ef3 is described below

commit efa1eb98ef302fa4e16801a0d7027769c7ac845d
Author: Wei Liu <we...@databricks.com>
AuthorDate: Fri Jun 9 13:11:34 2023 +0900

    [SPARK-44010][PYTHON][SS][MINOR] Python StreamingQueryProgress rowsPerSecond type fix
    
    ### What changes were proposed in this pull request?
    
    Fix the return types of Python `StreamingQueryProgress`'s `inputRowsPerSecond` and `processedRowsPerSecond`. They should be `float` according to `org/apache/spark/sql/streaming/progress.scala`. Also lifted initialization of the two values into the initializer, to align with the implementation of `SourceProgress`.
    
    ### Why are the changes needed?
    
    Better typed API
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Existing unit tests
    
    Closes #41522 from WweiL/SPARK-44010-sqp-type-fix.
    
    Authored-by: Wei Liu <we...@databricks.com>
    Signed-off-by: Hyukjin Kwon <gu...@apache.org>
---
 python/pyspark/sql/streaming/listener.py | 10 ++++++----
 1 file changed, 6 insertions(+), 4 deletions(-)

diff --git a/python/pyspark/sql/streaming/listener.py b/python/pyspark/sql/streaming/listener.py
index 9b0ae4938a1..33482664a7b 100644
--- a/python/pyspark/sql/streaming/listener.py
+++ b/python/pyspark/sql/streaming/listener.py
@@ -331,6 +331,8 @@ class StreamingQueryProgress:
         self._name: Optional[str] = jprogress.name()
         self._timestamp: str = jprogress.timestamp()
         self._batchId: int = jprogress.batchId()
+        self._inputRowsPerSecond: float = jprogress.inputRowsPerSecond()
+        self._processedRowsPerSecond: float = jprogress.processedRowsPerSecond()
         self._batchDuration: int = jprogress.batchDuration()
         self._durationMs: Dict[str, int] = dict(jprogress.durationMs())
         self._eventTime: Dict[str, str] = dict(jprogress.eventTime())
@@ -453,18 +455,18 @@ class StreamingQueryProgress:
         return self._jprogress.numInputRows()
 
     @property
-    def inputRowsPerSecond(self) -> str:
+    def inputRowsPerSecond(self) -> float:
         """
         The aggregate (across all sources) rate of data arriving.
         """
-        return self._jprogress.inputRowsPerSecond()
+        return self._inputRowsPerSecond
 
     @property
-    def processedRowsPerSecond(self) -> str:
+    def processedRowsPerSecond(self) -> float:
         """
         The aggregate (across all sources) rate at which Spark is processing data..
         """
-        return self._jprogress.processedRowsPerSecond()
+        return self._processedRowsPerSecond
 
     @property
     def json(self) -> str:
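
A minimal sketch of the pattern this patch applies: cache values from the JVM-backed progress object once in the initializer and expose them through float-typed read-only properties, rather than calling back into the JVM on every property access. `FakeJProgress` below is a hypothetical stand-in for the Py4J `jprogress` object, not part of pyspark.

```python
class FakeJProgress:
    """Hypothetical stand-in for the JVM-side StreamingQueryProgress."""

    def inputRowsPerSecond(self) -> float:
        return 120.5

    def processedRowsPerSecond(self) -> float:
        return 98.25


class StreamingQueryProgressSketch:
    def __init__(self, jprogress: FakeJProgress) -> None:
        # Values are captured once here, mirroring how SourceProgress
        # initializes its fields, instead of hitting the JVM object on
        # every property access.
        self._inputRowsPerSecond: float = jprogress.inputRowsPerSecond()
        self._processedRowsPerSecond: float = jprogress.processedRowsPerSecond()

    @property
    def inputRowsPerSecond(self) -> float:
        """Aggregate rate (across all sources) at which data is arriving."""
        return self._inputRowsPerSecond

    @property
    def processedRowsPerSecond(self) -> float:
        """Aggregate rate (across all sources) at which Spark processes data."""
        return self._processedRowsPerSecond


progress = StreamingQueryProgressSketch(FakeJProgress())
print(type(progress.inputRowsPerSecond).__name__)  # float, not str
```

The property annotations now match the actual runtime type coming from the Scala side, so type checkers no longer see the rates as `str`.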


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org