Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2022/08/10 12:40:42 UTC

[GitHub] [spark] HeartSaVioR commented on a diff in pull request #37461: [SPARK-40027][PYTHON][SS][DOCS] Add self-contained examples for pyspark.sql.streaming.readwriter

HeartSaVioR commented on code in PR #37461:
URL: https://github.com/apache/spark/pull/37461#discussion_r942398004


##########
python/pyspark/sql/streaming/readwriter.py:
##########
@@ -1176,15 +1459,28 @@ def toTable(
 
         Examples
         --------
-        >>> sdf.writeStream.format('parquet').queryName('query').toTable('output_table')
-        ... # doctest: +SKIP
-
-        >>> sdf.writeStream.trigger(processingTime='5 seconds').toTable(
-        ...     'output_table',
-        ...     queryName='that_query',
-        ...     outputMode="append",
-        ...     format='parquet',
-        ...     checkpointLocation='/tmp/checkpoint') # doctest: +SKIP
+        Save a data stream to a table.
+
+        >>> import tempfile
+        >>> import time
+        >>> _ = spark.sql("DROP TABLE IF EXISTS my_table2")
+        >>> with tempfile.TemporaryDirectory() as d:
+        ...     # Create a table with Rate source.
+        ...     q = spark.readStream.format("rate").option(
+        ...         "rowsPerSecond", 10).load().writeStream.toTable(
+        ...             "my_table2",
+        ...             queryName='that_query',
+        ...             outputMode="append",
+        ...             format='parquet',
+        ...             checkpointLocation=d)
+        ...     time.sleep(3)
+        ...     q.stop()
+        ...     spark.read.table("my_table2").show()
+        ...     _ = spark.sql("DROP TABLE my_table2")
+        +...---------+-----+

Review Comment:
   nit: here as well?
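
For readers following along: the Rate source used throughout these examples emits
a fixed two-column schema, which is what the elided show() output renders. A quick
way to inspect it (a side note, not part of the PR diff):

    >>> spark.readStream.format("rate").load().printSchema()
    root
     |-- timestamp: timestamp (nullable = true)
     |-- value: long (nullable = true)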



##########
python/pyspark/sql/streaming/readwriter.py:
##########
@@ -692,6 +912,28 @@ def partitionBy(self, *cols: str) -> "DataStreamWriter":  # type: ignore[misc]
         Notes
         -----
         This API is evolving.
+
+        Examples
+        --------
+        >>> df = spark.readStream.format("rate").load()
+        >>> df.writeStream.partitionBy("value")
+        <pyspark.sql.streaming.readwriter.DataStreamWriter object ...>
+
+        Partition by the timestamp column from the Rate source.
+
+        >>> import tempfile
+        >>> import time
+        >>> with tempfile.TemporaryDirectory() as d, tempfile.TemporaryDirectory() as cp:
+        ...     df = spark.readStream.format("rate").option("rowsPerSecond", 10).load()
+        ...     q = df.writeStream.partitionBy(
+        ...         "timestamp").format("parquet").option("checkpointLocation", cp).start(d)
+        ...     time.sleep(5)
+        ...     q.stop()
+        ...     spark.read.schema(df.schema).parquet(d).show()
+        +...---------+-----+

Review Comment:
   nit: same here
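
A note on what partitionBy produces: each distinct value of the partition column
becomes one sub-directory under the output path rather than a column in the data
files, which is presumably why the batch read-back passes df.schema explicitly. A
hypothetical check that could run inside the with-block above, reusing its `d`
(a sketch, not part of the PR):

    >>> import os
    >>> # Directory names are illustrative, e.g. timestamp=2022-08-10 12%3A40%3A40
    >>> sorted(p for p in os.listdir(d) if p.startswith("timestamp="))  # doctest: +SKIP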



##########
python/pyspark/sql/streaming/readwriter.py:
##########
@@ -638,7 +808,25 @@ def format(self, source: str) -> "DataStreamWriter":
 
         Examples
         --------
-        >>> writer = sdf.writeStream.format('json')
+        >>> df = spark.readStream.format("rate").load()
+        >>> df.writeStream.format("text")
+        <pyspark.sql.streaming.readwriter.DataStreamWriter object ...>
+
+        This API lets you configure the output data source. The example below writes
+        a CSV file from the Rate source in a streaming manner.
+
+        >>> import tempfile
+        >>> import time
+        >>> with tempfile.TemporaryDirectory() as d, tempfile.TemporaryDirectory() as cp:
+        ...     df = spark.readStream.format("rate").load()
+        ...     q = df.writeStream.format("csv").option("checkpointLocation", cp).start(d)
+        ...     time.sleep(5)
+        ...     q.stop()
+        ...     spark.read.schema("timestamp TIMESTAMP, value STRING").csv(d).show()
+        +...---------+-----+

Review Comment:
   nit: is it broken?
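
On the read-back line: DataFrameReader.schema() accepts a DDL-formatted string such
as "timestamp TIMESTAMP, value STRING", which is how the verification step can name
and type columns that the CSV files themselves do not carry. An equivalent, more
explicit spelling (a sketch, not part of the PR):

    >>> from pyspark.sql.types import StructType, StructField, TimestampType, StringType
    >>> schema = StructType([
    ...     StructField("timestamp", TimestampType()),
    ...     StructField("value", StringType()),
    ... ])
    >>> spark.read.schema(schema).csv(d).show()  # doctest: +SKIP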



##########
python/pyspark/sql/streaming/readwriter.py:
##########
@@ -775,14 +1022,27 @@ def trigger(
 
         Examples
         --------
-        >>> # trigger the query for execution every 5 seconds
-        >>> writer = sdf.writeStream.trigger(processingTime='5 seconds')
-        >>> # trigger the query for just once batch of data
-        >>> writer = sdf.writeStream.trigger(once=True)
-        >>> # trigger the query for execution every 5 seconds
-        >>> writer = sdf.writeStream.trigger(continuous='5 seconds')
-        >>> # trigger the query for reading all available data with multiple batches
-        >>> writer = sdf.writeStream.trigger(availableNow=True)
+        >>> df = spark.readStream.format("rate").load()
+
+        Trigger the query for execution every 5 seconds.
+
+        >>> df.writeStream.trigger(processingTime='5 seconds')
+        <pyspark.sql.streaming.readwriter.DataStreamWriter object ...>
+
+        Trigger the query to process just one batch of data.
+
+        >>> df.writeStream.trigger(once=True)

Review Comment:
   Yeah, better not to guide users toward it, or to explicitly mention the deprecation.
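
For context, the non-deprecated counterpart the review points toward appears to be
the available-now trigger, which also reads all data available at start but may
split it across multiple micro-batches instead of forcing a single one:

    >>> df.writeStream.trigger(availableNow=True)
    <pyspark.sql.streaming.readwriter.DataStreamWriter object ...>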



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org