You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2022/09/27 06:08:33 UTC

[GitHub] [spark] chaoqin-li1123 opened a new pull request, #38013: [SPARK-40509][SS][PYTHON] add example for applyInPandasWithState

chaoqin-li1123 opened a new pull request, #38013:
URL: https://github.com/apache/spark/pull/38013

   <!--
   Thanks for sending a pull request!  Here are some tips for you:
     1. If this is your first time, please read our contributor guidelines: https://spark.apache.org/contributing.html
     2. Ensure you have added or run the appropriate tests for your PR: https://spark.apache.org/developer-tools.html
     3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP][SPARK-XXXX] Your PR title ...'.
     4. Be sure to keep the PR description updated to reflect all changes.
     5. Please write your PR title to summarize what this PR proposes.
     6. If possible, provide a concise example to reproduce the issue for a faster review.
     7. If you want to add a new configuration, please read the guideline first for naming configurations in
        'core/src/main/scala/org/apache/spark/internal/config/ConfigEntry.scala'.
     8. If you want to add or modify an error type or message, please read the guideline first in
        'core/src/main/resources/error/README.md'.
   -->
   
   ### What changes were proposed in this pull request?
   An example for applyInPandasWithState usage. This example split lines into words, group by words as key and use the state per key to track session of each key.
   <!--
   Please clarify what changes you are proposing. The purpose of this section is to outline the changes and how this PR fixes the issue. 
   If possible, please consider writing useful notes for better and faster reviews in your PR. See the examples below.
     1. If you refactor some codes with changing classes, showing the class hierarchy will help reviewers.
     2. If you fix some SQL features, you can provide some references of other DBMSes.
     3. If there is design documentation, please add the link.
     4. If there is a discussion in the mailing list, please add the link.
   -->
   
   
   ### Why are the changes needed?
   To demonstrate the usage of applyInPandasWithState
   <!--
   Please clarify why the changes are needed. For instance,
     1. If you propose a new API, clarify the use case for a new API.
     2. If you fix a bug, you can clarify why it is a bug.
   -->
   
   
   ### Does this PR introduce _any_ user-facing change?
   No.
   <!--
   Note that it means *any* user-facing change including all aspects such as the documentation fix.
   If yes, please clarify the previous behavior and the change this PR proposes - provide the console output, description and/or an example to show the behavior difference if possible.
   If possible, please also clarify if this is a user-facing change compared to the released Spark versions or within the unreleased branches such as master.
   If no, write 'No'.
   -->
   
   
   ### How was this patch tested?
   This is an example that can be run manually.
   <!--
   If tests were added, say they were added here. Please make sure to add some test cases that check the changes thoroughly including negative and positive cases if possible.
   If it was tested in a way different from regular unit tests, please clarify how you tested step by step, ideally copy and paste-able, so that other reviewers can test and check, and descendants can verify in the future.
   If tests were not added, please describe why they were not added and/or why it was difficult to add.
   If benchmark tests were added, please run the benchmarks in GitHub Actions for the consistent environment, and the instructions could accord to: https://spark.apache.org/developer-tools.html#github-workflow-benchmarks.
   -->
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] HeartSaVioR commented on pull request #38013: [SPARK-40509][SS][PYTHON] add example for applyInPandasWithState

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on PR #38013:
URL: https://github.com/apache/spark/pull/38013#issuecomment-1260062541

   @chaoqin-li1123 
   https://github.com/chaoqin-li1123/spark/actions/runs/3138156803/jobs/5097193712
   
   Linter is still complaining. Could you take a look?
   
   You can install necessary python dependency requirements from `./dev/requirements.txt` and run `./dev/lint-python`, and ensure everything passes.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] HyukjinKwon commented on a diff in pull request #38013: [SPARK-40509][SS][PYTHON] Add example for applyInPandasWithState

Posted by GitBox <gi...@apache.org>.
HyukjinKwon commented on code in PR #38013:
URL: https://github.com/apache/spark/pull/38013#discussion_r983455212


##########
examples/src/main/python/sql/streaming/structured_network_wordcount_session_window.py:
##########
@@ -0,0 +1,139 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+r"""
+ Split lines into words, group by words and use the state per key to track session of each key.
+
+ Usage: structured_network_wordcount_windowed.py <hostname> <port>
+ <hostname> and <port> describe the TCP server that Structured Streaming
+ would connect to receive data.
+
+ To run this on your local machine, you need to first run a Netcat server
+    `$ nc -lk 9999`
+ and then run the example
+    `$ bin/spark-submit
+    examples/src/main/python/sql/streaming/structured_network_wordcount_session_window.py
+    localhost 9999`
+"""
+import sys
+import math
+from typing import Iterable, Any
+
+import pandas as pd
+
+from pyspark.sql import SparkSession
+from pyspark.sql.functions import explode
+from pyspark.sql.functions import split
+from pyspark.sql.types import (
+    LongType,
+    StringType,
+    StructType,
+    StructField,
+)
+from pyspark.sql.streaming.state import GroupStateTimeout, GroupState
+
+if __name__ == "__main__":
+    if len(sys.argv) != 3:
+        msg = "Usage: structured_network_wordcount_session_window.py <hostname> <port>"
+        print(msg, file=sys.stderr)
+        sys.exit(-1)
+
+    host = sys.argv[1]
+    port = int(sys.argv[2])
+
+    spark = SparkSession.builder.appName(
+        "StructuredNetworkWordCountSessionWindow"
+    ).getOrCreate()
+
+    # Create DataFrame representing the stream of input lines from connection to host:port
+    lines = (
+        spark.readStream.format("socket")
+        .option("host", host)
+        .option("port", port)
+        .option("includeTimestamp", "true")
+        .load()
+    )
+
+    # Split the lines into words, retaining timestamps, each word become a sessionId
+    events = lines.select(
+        explode(split(lines.value, " ")).alias("sessionId"),
+        lines.timestamp.cast("long"),
+    )
+
+    # Type of output records.
+    session_schema = StructType(
+        [
+            StructField("sessionId", StringType()),
+            StructField("count", LongType()),
+            StructField("start", LongType()),
+            StructField("end", LongType()),
+        ]
+    )
+    # Type of group state.
+    # Omit the session id in the state since it is available as group key
+    session_state_schema = StructType(
+        [
+            StructField("count", LongType()),
+            StructField("start", LongType()),
+            StructField("end", LongType()),
+        ]
+    )
+
+    def func(
+        key: Any, pdf_iter: Iterable[pd.DataFrame], state: GroupState
+    ) -> Iterable[pd.DataFrame]:
+        if state.hasTimedOut:
+            count, start, end = state.get
+            state.remove()
+            yield pd.DataFrame(
+                {
+                    "sessionId": [key[0]],
+                    "count": [count],
+                    "start": [start],
+                    "end": [end],

Review Comment:
   Can we cast this and show as a timestamp in the console? Numeric timestamp values look a bit difficult to read.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] HyukjinKwon commented on a diff in pull request #38013: [SPARK-40509][SS][PYTHON] Add example for applyInPandasWithState

Posted by GitBox <gi...@apache.org>.
HyukjinKwon commented on code in PR #38013:
URL: https://github.com/apache/spark/pull/38013#discussion_r981964990


##########
examples/src/main/python/sql/streaming/structured_network_wordcount_session_window.py:
##########
@@ -0,0 +1,130 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+r"""
+ split lines into words, group by words and use the state per key to track session of each key.
+
+ Usage: structured_network_wordcount_windowed.py <hostname> <port>
+ <hostname> and <port> describe the TCP server that Structured Streaming
+ would connect to receive data.
+
+ To run this on your local machine, you need to first run a Netcat server
+    `$ nc -lk 9999`
+ and then run the example
+    `$ bin/spark-submit
+    examples/src/main/python/sql/streaming/structured_network_wordcount_session_window.py
+    localhost 9999`
+"""
+import sys
+import math
+
+from pyspark.sql import SparkSession
+from pyspark.sql.functions import explode
+from pyspark.sql.functions import split
+from pyspark.sql.types import (
+    LongType,
+    StringType,
+    StructType,
+    StructField,
+)
+from pyspark.sql.streaming.state import GroupStateTimeout, GroupState
+import pandas as pd
+from typing import Iterable, Any
+
+if __name__ == "__main__":
+    if len(sys.argv) != 3:
+        msg = "Usage: structured_network_wordcount_session_window.py <hostname> <port>"
+        print(msg, file=sys.stderr)
+        sys.exit(-1)
+
+    host = sys.argv[1]
+    port = int(sys.argv[2])
+
+    spark = SparkSession.builder.appName(
+        "StructuredNetworkWordCountSessionWindow"
+    ).getOrCreate()
+
+    spark.sparkContext.setLogLevel("WARN")
+
+    # Create DataFrame representing the stream of input lines from connection to host:port
+    lines = (
+        spark.readStream.format("socket")
+        .option("host", host)
+        .option("port", port)
+        .option("includeTimestamp", "true")
+        .load()
+    )
+
+    # Split the lines into words, retaining timestamps
+    # split() splits each line into an array, and explode() turns the array into multiple rows
+    events = lines.select(
+        explode(split(lines.value, " ")).alias("sessionId"),
+        lines.timestamp.cast("long"),
+    )
+    session_fields = [
+        StructField("sessionId", StringType()),
+        StructField("count", LongType()),
+        StructField("start", LongType()),
+        StructField("end", LongType()),
+    ]
+
+    def func(
+        key: Any, pdf_iter: Iterable[pd.DataFrame], state: GroupState
+    ) -> Iterable[pd.DataFrame]:
+        if state.hasTimedOut:
+            count, start, end = state.get
+            state.remove()
+            yield pd.DataFrame(
+                {
+                    "sessionId": [key[0]],
+                    "count": [count],
+                    "start": [start],
+                    "end": [end],
+                }
+            )
+        else:
+            start = math.inf
+            end = 0
+            count = 0
+            for pdf in pdf_iter:
+                start = min(start, min(pdf["timestamp"]))
+                end = max(end, max(pdf["timestamp"]))
+                count = count + len(pdf)
+            if state.exists:
+                old_session = state.get
+                count = count + old_session[0]
+                start = old_session[1]
+                end = max(end, old_session[2])
+            state.update((count, start, end))
+            state.setTimeoutDuration(10000)
+            yield pd.DataFrame()
+
+    # Group the data by window and word and compute the count of each group
+    sessions = events.groupBy(events["sessionId"]).applyInPandasWithState(
+        func,
+        StructType(session_fields),

Review Comment:
   Since this is an example (for newbies), let's make it verbose.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] chaoqin-li1123 commented on pull request #38013: [SPARK-40509][SS][PYTHON] add example for applyInPandasWithState

Posted by GitBox <gi...@apache.org>.
chaoqin-li1123 commented on PR #38013:
URL: https://github.com/apache/spark/pull/38013#issuecomment-1259024291

   @HeartSaVioR The applyInPandasWithState session window example.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] HyukjinKwon commented on a diff in pull request #38013: [SPARK-40509][SS][PYTHON] Add example for applyInPandasWithState

Posted by GitBox <gi...@apache.org>.
HyukjinKwon commented on code in PR #38013:
URL: https://github.com/apache/spark/pull/38013#discussion_r983453988


##########
examples/src/main/python/sql/streaming/structured_network_wordcount_session_window.py:
##########
@@ -0,0 +1,139 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+r"""
+ Split lines into words, group by words and use the state per key to track session of each key.
+
+ Usage: structured_network_wordcount_windowed.py <hostname> <port>
+ <hostname> and <port> describe the TCP server that Structured Streaming
+ would connect to receive data.
+
+ To run this on your local machine, you need to first run a Netcat server
+    `$ nc -lk 9999`
+ and then run the example
+    `$ bin/spark-submit
+    examples/src/main/python/sql/streaming/structured_network_wordcount_session_window.py
+    localhost 9999`
+"""
+import sys
+import math
+from typing import Iterable, Any
+
+import pandas as pd
+
+from pyspark.sql import SparkSession
+from pyspark.sql.functions import explode
+from pyspark.sql.functions import split
+from pyspark.sql.types import (
+    LongType,
+    StringType,
+    StructType,
+    StructField,
+)
+from pyspark.sql.streaming.state import GroupStateTimeout, GroupState
+
+if __name__ == "__main__":
+    if len(sys.argv) != 3:
+        msg = "Usage: structured_network_wordcount_session_window.py <hostname> <port>"
+        print(msg, file=sys.stderr)
+        sys.exit(-1)
+
+    host = sys.argv[1]
+    port = int(sys.argv[2])
+
+    spark = SparkSession.builder.appName(
+        "StructuredNetworkWordCountSessionWindow"
+    ).getOrCreate()
+
+    # Create DataFrame representing the stream of input lines from connection to host:port
+    lines = (
+        spark.readStream.format("socket")
+        .option("host", host)
+        .option("port", port)
+        .option("includeTimestamp", "true")
+        .load()
+    )
+
+    # Split the lines into words, retaining timestamps, each word become a sessionId
+    events = lines.select(
+        explode(split(lines.value, " ")).alias("sessionId"),
+        lines.timestamp.cast("long"),
+    )
+
+    # Type of output records.
+    session_schema = StructType(
+        [
+            StructField("sessionId", StringType()),
+            StructField("count", LongType()),
+            StructField("start", LongType()),
+            StructField("end", LongType()),
+        ]
+    )
+    # Type of group state.
+    # Omit the session id in the state since it is available as group key
+    session_state_schema = StructType(
+        [
+            StructField("count", LongType()),
+            StructField("start", LongType()),
+            StructField("end", LongType()),
+        ]
+    )
+
+    def func(
+        key: Any, pdf_iter: Iterable[pd.DataFrame], state: GroupState
+    ) -> Iterable[pd.DataFrame]:
+        if state.hasTimedOut:
+            count, start, end = state.get

Review Comment:
   I would extract the key like:
   
   ```python
   (session_id,) = key
   ```
   
   or
   
   ```python
   (word,) = key
   ```
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] chaoqin-li1123 commented on a diff in pull request #38013: [SPARK-40509][SS][PYTHON] Add example for applyInPandasWithState

Posted by GitBox <gi...@apache.org>.
chaoqin-li1123 commented on code in PR #38013:
URL: https://github.com/apache/spark/pull/38013#discussion_r981960624


##########
examples/src/main/python/sql/streaming/structured_network_wordcount_session_window.py:
##########
@@ -0,0 +1,130 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+r"""
+ split lines into words, group by words and use the state per key to track session of each key.
+
+ Usage: structured_network_wordcount_windowed.py <hostname> <port>
+ <hostname> and <port> describe the TCP server that Structured Streaming
+ would connect to receive data.
+
+ To run this on your local machine, you need to first run a Netcat server
+    `$ nc -lk 9999`
+ and then run the example
+    `$ bin/spark-submit
+    examples/src/main/python/sql/streaming/structured_network_wordcount_session_window.py
+    localhost 9999`
+"""
+import sys
+import math
+
+from pyspark.sql import SparkSession
+from pyspark.sql.functions import explode
+from pyspark.sql.functions import split
+from pyspark.sql.types import (
+    LongType,
+    StringType,
+    StructType,
+    StructField,
+)
+from pyspark.sql.streaming.state import GroupStateTimeout, GroupState
+import pandas as pd
+from typing import Iterable, Any
+
+if __name__ == "__main__":
+    if len(sys.argv) != 3:
+        msg = "Usage: structured_network_wordcount_session_window.py <hostname> <port>"
+        print(msg, file=sys.stderr)
+        sys.exit(-1)
+
+    host = sys.argv[1]
+    port = int(sys.argv[2])
+
+    spark = SparkSession.builder.appName(
+        "StructuredNetworkWordCountSessionWindow"
+    ).getOrCreate()
+
+    spark.sparkContext.setLogLevel("WARN")

Review Comment:
   Removed.



##########
examples/src/main/python/sql/streaming/structured_network_wordcount_session_window.py:
##########
@@ -0,0 +1,130 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+r"""
+ split lines into words, group by words and use the state per key to track session of each key.
+
+ Usage: structured_network_wordcount_windowed.py <hostname> <port>
+ <hostname> and <port> describe the TCP server that Structured Streaming
+ would connect to receive data.
+
+ To run this on your local machine, you need to first run a Netcat server
+    `$ nc -lk 9999`
+ and then run the example
+    `$ bin/spark-submit
+    examples/src/main/python/sql/streaming/structured_network_wordcount_session_window.py
+    localhost 9999`
+"""
+import sys
+import math
+
+from pyspark.sql import SparkSession
+from pyspark.sql.functions import explode
+from pyspark.sql.functions import split
+from pyspark.sql.types import (
+    LongType,
+    StringType,
+    StructType,
+    StructField,
+)
+from pyspark.sql.streaming.state import GroupStateTimeout, GroupState
+import pandas as pd
+from typing import Iterable, Any

Review Comment:
   Thanks, import order changed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] HyukjinKwon commented on pull request #38013: [SPARK-40509][SS][PYTHON] Add example for applyInPandasWithState

Posted by GitBox <gi...@apache.org>.
HyukjinKwon commented on PR #38013:
URL: https://github.com/apache/spark/pull/38013#issuecomment-1262187962

   @chaoqin-li1123, I happened to nitpick some. Would you mind creating a followup PR with reusing the same JIRA?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] HeartSaVioR commented on a diff in pull request #38013: [SPARK-40509][SS][PYTHON] add example for applyInPandasWithState

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on code in PR #38013:
URL: https://github.com/apache/spark/pull/38013#discussion_r981724244


##########
examples/src/main/python/sql/streaming/structured_network_wordcount_session_window.py:
##########
@@ -0,0 +1,128 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+r"""
+ split lines into words, group by words and use the state per key to track session of each key.
+
+ Usage: structured_network_wordcount_windowed.py <hostname> <port>
+ <hostname> and <port> describe the TCP server that Structured Streaming
+ would connect to receive data.
+
+ To run this on your local machine, you need to first run a Netcat server
+    `$ nc -lk 9999`
+ and then run the example
+    `$ bin/spark-submit
+    examples/src/main/python/sql/streaming/structured_network_wordcount_session_window.py
+    localhost 9999`
+"""
+import sys
+import math
+
+from pyspark.sql import SparkSession
+from pyspark.sql.functions import explode
+from pyspark.sql.functions import split
+from pyspark.sql.types import (
+    LongType,
+    StringType,
+    StructType,
+    StructField,
+)
+from pyspark.sql.streaming.state import GroupStateTimeout
+import pandas as pd
+
+if __name__ == "__main__":
+    if len(sys.argv) != 3:
+        msg = "Usage: structured_network_wordcount_session_window.py <hostname> <port>"
+        print(msg, file=sys.stderr)
+        sys.exit(-1)
+
+    host = sys.argv[1]
+    port = int(sys.argv[2])
+
+    spark = SparkSession.builder.appName(
+        "StructuredNetworkWordCountSessionWindow"
+    ).getOrCreate()
+
+    spark.sparkContext.setLogLevel("WARN")
+
+    # Create DataFrame representing the stream of input lines from connection to host:port
+    lines = (
+        spark.readStream.format("socket")
+        .option("host", host)
+        .option("port", port)
+        .option("includeTimestamp", "true")
+        .load()
+    )
+
+    # Split the lines into words, retaining timestamps
+    # split() splits each line into an array, and explode() turns the array into multiple rows
+    events = lines.select(
+        explode(split(lines.value, " ")).alias("sessionId"),
+        lines.timestamp.cast("long"),
+    )
+
+    session_type = StructType(
+        [
+            StructField("sessionId", StringType()),
+            StructField("count", LongType()),
+            StructField("start", LongType()),
+            StructField("end", LongType()),
+        ]
+    )
+
+    def func(key, pdf_iter, state):
+        if state.hasTimedOut:
+            session_id, count, start, end = state.get
+            state.remove()
+            yield pd.DataFrame(
+                {
+                    "sessionId": [session_id],
+                    "count": [count],
+                    "start": [start],
+                    "end": [end],
+                }
+            )
+        else:
+            start = math.inf
+            end = 0
+            count = 0
+            for pdf in pdf_iter:
+                start = min(start, min(pdf["timestamp"]))
+                end = max(end, max(pdf["timestamp"]))
+                count = count + len(pdf)
+            if state.exists:
+                old_session = state.get
+                count = count + old_session[1]
+                start = old_session[2]
+                end = max(end, old_session[3])
+            state.update((key[0], count, start, end))

Review Comment:
   nit: you can always get key from parameter of user function, so adding key to state is redundant. I know it's more about porting the Scala example - it was also redundant.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] HyukjinKwon commented on a diff in pull request #38013: [SPARK-40509][SS][PYTHON] Add example for applyInPandasWithState

Posted by GitBox <gi...@apache.org>.
HyukjinKwon commented on code in PR #38013:
URL: https://github.com/apache/spark/pull/38013#discussion_r983456790


##########
examples/src/main/python/sql/streaming/structured_network_wordcount_session_window.py:
##########
@@ -0,0 +1,139 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+r"""
+ Split lines into words, group by words and use the state per key to track session of each key.
+
+ Usage: structured_network_wordcount_windowed.py <hostname> <port>
+ <hostname> and <port> describe the TCP server that Structured Streaming
+ would connect to receive data.
+
+ To run this on your local machine, you need to first run a Netcat server
+    `$ nc -lk 9999`
+ and then run the example
+    `$ bin/spark-submit
+    examples/src/main/python/sql/streaming/structured_network_wordcount_session_window.py
+    localhost 9999`
+"""
+import sys
+import math
+from typing import Iterable, Any
+
+import pandas as pd
+
+from pyspark.sql import SparkSession
+from pyspark.sql.functions import explode
+from pyspark.sql.functions import split
+from pyspark.sql.types import (
+    LongType,
+    StringType,
+    StructType,
+    StructField,
+)
+from pyspark.sql.streaming.state import GroupStateTimeout, GroupState
+
+if __name__ == "__main__":
+    if len(sys.argv) != 3:
+        msg = "Usage: structured_network_wordcount_session_window.py <hostname> <port>"
+        print(msg, file=sys.stderr)
+        sys.exit(-1)
+
+    host = sys.argv[1]
+    port = int(sys.argv[2])
+
+    spark = SparkSession.builder.appName(
+        "StructuredNetworkWordCountSessionWindow"
+    ).getOrCreate()
+
+    # Create DataFrame representing the stream of input lines from connection to host:port
+    lines = (
+        spark.readStream.format("socket")
+        .option("host", host)
+        .option("port", port)
+        .option("includeTimestamp", "true")
+        .load()
+    )
+
+    # Split the lines into words, retaining timestamps, each word become a sessionId
+    events = lines.select(
+        explode(split(lines.value, " ")).alias("sessionId"),
+        lines.timestamp.cast("long"),
+    )
+
+    # Type of output records.
+    session_schema = StructType(
+        [
+            StructField("sessionId", StringType()),
+            StructField("count", LongType()),
+            StructField("start", LongType()),
+            StructField("end", LongType()),
+        ]
+    )
+    # Type of group state.
+    # Omit the session id in the state since it is available as group key
+    session_state_schema = StructType(
+        [
+            StructField("count", LongType()),
+            StructField("start", LongType()),
+            StructField("end", LongType()),
+        ]
+    )
+
+    def func(
+        key: Any, pdf_iter: Iterable[pd.DataFrame], state: GroupState
+    ) -> Iterable[pd.DataFrame]:
+        if state.hasTimedOut:
+            count, start, end = state.get
+            state.remove()
+            yield pd.DataFrame(
+                {
+                    "sessionId": [key[0]],
+                    "count": [count],
+                    "start": [start],
+                    "end": [end],
+                }
+            )
+        else:
+            start = math.inf
+            end = 0
+            count = 0
+            for pdf in pdf_iter:
+                start = min(start, min(pdf["timestamp"]))

Review Comment:
   I would use pandas API here instead of built-in Python function to show users that we can use pandas. e.g.) `int(min(start, pdf["timestamp"].min()))` and `int(max(start, pdf["timestamp"].max()))`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] chaoqin-li1123 commented on a diff in pull request #38013: [SPARK-40509][SS][PYTHON] add example for applyInPandasWithState

Posted by GitBox <gi...@apache.org>.
chaoqin-li1123 commented on code in PR #38013:
URL: https://github.com/apache/spark/pull/38013#discussion_r981833418


##########
examples/src/main/python/sql/streaming/structured_network_wordcount_session_window.py:
##########
@@ -0,0 +1,128 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+r"""
+ split lines into words, group by words and use the state per key to track session of each key.
+
+ Usage: structured_network_wordcount_windowed.py <hostname> <port>
+ <hostname> and <port> describe the TCP server that Structured Streaming
+ would connect to receive data.
+
+ To run this on your local machine, you need to first run a Netcat server
+    `$ nc -lk 9999`
+ and then run the example
+    `$ bin/spark-submit
+    examples/src/main/python/sql/streaming/structured_network_wordcount_session_window.py
+    localhost 9999`
+"""
+import sys
+import math
+
+from pyspark.sql import SparkSession
+from pyspark.sql.functions import explode
+from pyspark.sql.functions import split
+from pyspark.sql.types import (
+    LongType,
+    StringType,
+    StructType,
+    StructField,
+)
+from pyspark.sql.streaming.state import GroupStateTimeout
+import pandas as pd
+
+if __name__ == "__main__":
+    if len(sys.argv) != 3:
+        msg = "Usage: structured_network_wordcount_session_window.py <hostname> <port>"
+        print(msg, file=sys.stderr)
+        sys.exit(-1)
+
+    host = sys.argv[1]
+    port = int(sys.argv[2])
+
+    spark = SparkSession.builder.appName(
+        "StructuredNetworkWordCountSessionWindow"
+    ).getOrCreate()
+
+    spark.sparkContext.setLogLevel("WARN")
+
+    # Create DataFrame representing the stream of input lines from connection to host:port
+    lines = (
+        spark.readStream.format("socket")
+        .option("host", host)
+        .option("port", port)
+        .option("includeTimestamp", "true")
+        .load()
+    )
+
+    # Split the lines into words, retaining timestamps
+    # split() splits each line into an array, and explode() turns the array into multiple rows
+    events = lines.select(
+        explode(split(lines.value, " ")).alias("sessionId"),
+        lines.timestamp.cast("long"),
+    )
+
+    session_type = StructType(
+        [
+            StructField("sessionId", StringType()),
+            StructField("count", LongType()),
+            StructField("start", LongType()),
+            StructField("end", LongType()),
+        ]
+    )
+
+    def func(key, pdf_iter, state):
+        if state.hasTimedOut:
+            session_id, count, start, end = state.get
+            state.remove()
+            yield pd.DataFrame(
+                {
+                    "sessionId": [session_id],
+                    "count": [count],
+                    "start": [start],
+                    "end": [end],
+                }
+            )
+        else:
+            start = math.inf
+            end = 0
+            count = 0
+            for pdf in pdf_iter:
+                start = min(start, min(pdf["timestamp"]))
+                end = max(end, max(pdf["timestamp"]))
+                count = count + len(pdf)
+            if state.exists:
+                old_session = state.get
+                count = count + old_session[1]
+                start = old_session[2]
+                end = max(end, old_session[3])
+            state.update((key[0], count, start, end))

Review Comment:
   Thanks, it makes sense to omit the key in the state. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] chaoqin-li1123 commented on a diff in pull request #38013: [SPARK-40509][SS][PYTHON] add example for applyInPandasWithState

Posted by GitBox <gi...@apache.org>.
chaoqin-li1123 commented on code in PR #38013:
URL: https://github.com/apache/spark/pull/38013#discussion_r981605902


##########
examples/src/main/python/sql/streaming/structured_network_wordcount_session_window.py:
##########
@@ -0,0 +1,114 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+r"""
+ split lines into words, group by words as key and use the state per key to track session of each key.
+
+ Usage: structured_network_wordcount_windowed.py <hostname> <port>
+ <hostname> and <port> describe the TCP server that Structured Streaming
+ would connect to receive data.
+
+ To run this on your local machine, you need to first run a Netcat server
+    `$ nc -lk 9999`
+ and then run the example
+    `$ bin/spark-submit
+    examples/src/main/python/sql/streaming/structured_network_wordcount_session_window.py
+    localhost 9999`
+"""
+import sys
+
+from pyspark.sql import SparkSession
+from pyspark.sql.functions import explode
+from pyspark.sql.functions import split
+from pyspark.sql.functions import window
+from pyspark.sql.types import (
+    LongType,
+    StringType,
+    StructType,
+    StructField,
+    Row,
+)
+from pyspark.sql.streaming.state import GroupStateTimeout, GroupState
+import pandas as pd
+
+if __name__ == "__main__":
+    if len(sys.argv) != 3:
+        msg = ("Usage: structured_network_wordcount_session_window.py <hostname> <port>")
+        print(msg, file=sys.stderr)
+        sys.exit(-1)
+
+    host = sys.argv[1]
+    port = int(sys.argv[2])
+
+    spark = SparkSession\
+        .builder\
+        .appName("StructuredNetworkWordCountSessionWindow")\
+        .getOrCreate()
+
+    spark.sparkContext.setLogLevel("WARN")
+
+    # Create DataFrame representing the stream of input lines from connection to host:port
+    lines = spark\
+        .readStream\
+        .format('socket')\
+        .option('host', host)\
+        .option('port', port)\
+        .option('includeTimestamp', 'true')\
+        .load()
+
+    # Split the lines into words, retaining timestamps
+    # split() splits each line into an array, and explode() turns the array into multiple rows
+    events = lines.select(
+        explode(split(lines.value, ' ')).alias('sessionId'),
+        lines.timestamp.cast("long")
+    )
+
+    session_type = StructType(
+    [StructField("sessionId", StringType()), StructField("count", LongType()),
+    StructField("start", LongType()), StructField("end", LongType())]
+    )
+
+    def func(key, pdf_iter, state):
+        if state.hasTimedOut:
+            finished_session = state.get
+            state.remove()
+            yield pd.DataFrame({"sessionId": [finished_session[0]], "count": [finished_session[1]], "start": [finished_session[2]], "end": [finished_session[3]]})

Review Comment:
   Fixed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] HeartSaVioR commented on pull request #38013: [SPARK-40509][SS][PYTHON] add example for applyInPandasWithState

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on PR #38013:
URL: https://github.com/apache/spark/pull/38013#issuecomment-1259090417

   Thanks for the contribution @chaoqin-li1123 ! Looks like python linter is complaining - could you please look into this?
   https://github.com/chaoqin-li1123/spark/actions/runs/3133327292/jobs/5086593368
   
   Also would be good to explicitly document how you ran the example in the section of `How was this patch tested?`.
   
   Thanks in advance!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] HyukjinKwon commented on a diff in pull request #38013: [SPARK-40509][SS][PYTHON] Add example for applyInPandasWithState

Posted by GitBox <gi...@apache.org>.
HyukjinKwon commented on code in PR #38013:
URL: https://github.com/apache/spark/pull/38013#discussion_r983459557


##########
examples/src/main/python/sql/streaming/structured_network_wordcount_session_window.py:
##########
@@ -0,0 +1,139 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+r"""
+ Split lines into words, group by words and use the state per key to track session of each key.
+
+ Usage: structured_network_wordcount_windowed.py <hostname> <port>
+ <hostname> and <port> describe the TCP server that Structured Streaming
+ would connect to receive data.
+
+ To run this on your local machine, you need to first run a Netcat server
+    `$ nc -lk 9999`
+ and then run the example
+    `$ bin/spark-submit
+    examples/src/main/python/sql/streaming/structured_network_wordcount_session_window.py
+    localhost 9999`
+"""
+import sys
+import math
+from typing import Iterable, Any
+
+import pandas as pd
+
+from pyspark.sql import SparkSession
+from pyspark.sql.functions import explode
+from pyspark.sql.functions import split
+from pyspark.sql.types import (
+    LongType,
+    StringType,
+    StructType,
+    StructField,
+)
+from pyspark.sql.streaming.state import GroupStateTimeout, GroupState
+
+if __name__ == "__main__":
+    if len(sys.argv) != 3:
+        msg = "Usage: structured_network_wordcount_session_window.py <hostname> <port>"
+        print(msg, file=sys.stderr)
+        sys.exit(-1)
+
+    host = sys.argv[1]
+    port = int(sys.argv[2])
+
+    spark = SparkSession.builder.appName(
+        "StructuredNetworkWordCountSessionWindow"
+    ).getOrCreate()
+
+    # Create DataFrame representing the stream of input lines from connection to host:port
+    lines = (
+        spark.readStream.format("socket")
+        .option("host", host)
+        .option("port", port)
+        .option("includeTimestamp", "true")
+        .load()
+    )
+
+    # Split the lines into words, retaining timestamps, each word become a sessionId
+    events = lines.select(
+        explode(split(lines.value, " ")).alias("sessionId"),
+        lines.timestamp.cast("long"),
+    )
+
+    # Type of output records.
+    session_schema = StructType(
+        [
+            StructField("sessionId", StringType()),
+            StructField("count", LongType()),
+            StructField("start", LongType()),
+            StructField("end", LongType()),
+        ]
+    )
+    # Type of group state.
+    # Omit the session id in the state since it is available as group key
+    session_state_schema = StructType(
+        [
+            StructField("count", LongType()),
+            StructField("start", LongType()),
+            StructField("end", LongType()),
+        ]
+    )
+
+    def func(
+        key: Any, pdf_iter: Iterable[pd.DataFrame], state: GroupState
+    ) -> Iterable[pd.DataFrame]:

Review Comment:
   ditto. output should better be `Iterator` in this case.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] HyukjinKwon commented on a diff in pull request #38013: [SPARK-40509][SS][PYTHON] Add example for applyInPandasWithState

Posted by GitBox <gi...@apache.org>.
HyukjinKwon commented on code in PR #38013:
URL: https://github.com/apache/spark/pull/38013#discussion_r983455641


##########
examples/src/main/python/sql/streaming/structured_network_wordcount_session_window.py:
##########
@@ -0,0 +1,139 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+r"""
+ Split lines into words, group by words and use the state per key to track session of each key.
+
+ Usage: structured_network_wordcount_windowed.py <hostname> <port>
+ <hostname> and <port> describe the TCP server that Structured Streaming
+ would connect to receive data.
+
+ To run this on your local machine, you need to first run a Netcat server
+    `$ nc -lk 9999`
+ and then run the example
+    `$ bin/spark-submit
+    examples/src/main/python/sql/streaming/structured_network_wordcount_session_window.py
+    localhost 9999`
+"""
+import sys
+import math
+from typing import Iterable, Any
+
+import pandas as pd
+
+from pyspark.sql import SparkSession
+from pyspark.sql.functions import explode
+from pyspark.sql.functions import split
+from pyspark.sql.types import (
+    LongType,
+    StringType,
+    StructType,
+    StructField,
+)
+from pyspark.sql.streaming.state import GroupStateTimeout, GroupState
+
+if __name__ == "__main__":
+    if len(sys.argv) != 3:
+        msg = "Usage: structured_network_wordcount_session_window.py <hostname> <port>"
+        print(msg, file=sys.stderr)
+        sys.exit(-1)
+
+    host = sys.argv[1]
+    port = int(sys.argv[2])
+
+    spark = SparkSession.builder.appName(
+        "StructuredNetworkWordCountSessionWindow"
+    ).getOrCreate()
+
+    # Create DataFrame representing the stream of input lines from connection to host:port
+    lines = (
+        spark.readStream.format("socket")
+        .option("host", host)
+        .option("port", port)
+        .option("includeTimestamp", "true")
+        .load()
+    )
+
+    # Split the lines into words, retaining timestamps, each word become a sessionId
+    events = lines.select(
+        explode(split(lines.value, " ")).alias("sessionId"),
+        lines.timestamp.cast("long"),
+    )
+
+    # Type of output records.
+    session_schema = StructType(
+        [
+            StructField("sessionId", StringType()),
+            StructField("count", LongType()),
+            StructField("start", LongType()),
+            StructField("end", LongType()),
+        ]
+    )
+    # Type of group state.
+    # Omit the session id in the state since it is available as group key
+    session_state_schema = StructType(
+        [
+            StructField("count", LongType()),
+            StructField("start", LongType()),
+            StructField("end", LongType()),
+        ]
+    )
+
+    def func(
+        key: Any, pdf_iter: Iterable[pd.DataFrame], state: GroupState

Review Comment:
   BTW, I would name `pdf_iter` -> `pdfs`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] chaoqin-li1123 commented on pull request #38013: [SPARK-40509][SS][PYTHON] add example for applyInPandasWithState

Posted by GitBox <gi...@apache.org>.
chaoqin-li1123 commented on PR #38013:
URL: https://github.com/apache/spark/pull/38013#issuecomment-1259918456

   > One tip, unlike Scala/Java code, we can leverage `dev/reformat-python` to reformat python code automatically.
   
   It seems  that `dev/reformat-python` skip this file, I need to run 'python3 -m black examples/src/main/python/sql/streaming/structured_network_wordcount_session_window.py'   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] chaoqin-li1123 commented on a diff in pull request #38013: [SPARK-40509][SS][PYTHON] add example for applyInPandasWithState

Posted by GitBox <gi...@apache.org>.
chaoqin-li1123 commented on code in PR #38013:
URL: https://github.com/apache/spark/pull/38013#discussion_r981605599


##########
examples/src/main/python/sql/streaming/structured_network_wordcount_session_window.py:
##########
@@ -0,0 +1,114 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+r"""
+ split lines into words, group by words as key and use the state per key to track session of each key.
+
+ Usage: structured_network_wordcount_windowed.py <hostname> <port>
+ <hostname> and <port> describe the TCP server that Structured Streaming
+ would connect to receive data.
+
+ To run this on your local machine, you need to first run a Netcat server
+    `$ nc -lk 9999`
+ and then run the example
+    `$ bin/spark-submit
+    examples/src/main/python/sql/streaming/structured_network_wordcount_session_window.py
+    localhost 9999`
+"""
+import sys
+
+from pyspark.sql import SparkSession
+from pyspark.sql.functions import explode
+from pyspark.sql.functions import split
+from pyspark.sql.functions import window
+from pyspark.sql.types import (
+    LongType,
+    StringType,
+    StructType,
+    StructField,
+    Row,
+)
+from pyspark.sql.streaming.state import GroupStateTimeout, GroupState
+import pandas as pd
+
+if __name__ == "__main__":
+    if len(sys.argv) != 3:
+        msg = ("Usage: structured_network_wordcount_session_window.py <hostname> <port>")
+        print(msg, file=sys.stderr)
+        sys.exit(-1)
+
+    host = sys.argv[1]
+    port = int(sys.argv[2])
+
+    spark = SparkSession\
+        .builder\
+        .appName("StructuredNetworkWordCountSessionWindow")\
+        .getOrCreate()
+
+    spark.sparkContext.setLogLevel("WARN")
+
+    # Create DataFrame representing the stream of input lines from connection to host:port
+    lines = spark\
+        .readStream\
+        .format('socket')\
+        .option('host', host)\
+        .option('port', port)\
+        .option('includeTimestamp', 'true')\
+        .load()
+
+    # Split the lines into words, retaining timestamps
+    # split() splits each line into an array, and explode() turns the array into multiple rows
+    events = lines.select(
+        explode(split(lines.value, ' ')).alias('sessionId'),
+        lines.timestamp.cast("long")
+    )
+
+    session_type = StructType(
+    [StructField("sessionId", StringType()), StructField("count", LongType()),
+    StructField("start", LongType()), StructField("end", LongType())]
+    )
+
+    def func(key, pdf_iter, state):
+        if state.hasTimedOut:
+            finished_session = state.get
+            state.remove()
+            yield pd.DataFrame({"sessionId": [finished_session[0]], "count": [finished_session[1]], "start": [finished_session[2]], "end": [finished_session[3]]})
+        else:
+            pdfs = list(pdf_iter)

Review Comment:
   Thanks, refactored.



##########
examples/src/main/python/sql/streaming/structured_network_wordcount_session_window.py:
##########
@@ -0,0 +1,114 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+r"""
+ split lines into words, group by words as key and use the state per key to track session of each key.
+
+ Usage: structured_network_wordcount_windowed.py <hostname> <port>
+ <hostname> and <port> describe the TCP server that Structured Streaming
+ would connect to receive data.
+
+ To run this on your local machine, you need to first run a Netcat server
+    `$ nc -lk 9999`
+ and then run the example
+    `$ bin/spark-submit
+    examples/src/main/python/sql/streaming/structured_network_wordcount_session_window.py
+    localhost 9999`
+"""
+import sys
+
+from pyspark.sql import SparkSession
+from pyspark.sql.functions import explode
+from pyspark.sql.functions import split
+from pyspark.sql.functions import window
+from pyspark.sql.types import (
+    LongType,
+    StringType,
+    StructType,
+    StructField,
+    Row,
+)
+from pyspark.sql.streaming.state import GroupStateTimeout, GroupState
+import pandas as pd
+
+if __name__ == "__main__":
+    if len(sys.argv) != 3:
+        msg = ("Usage: structured_network_wordcount_session_window.py <hostname> <port>")
+        print(msg, file=sys.stderr)
+        sys.exit(-1)
+
+    host = sys.argv[1]
+    port = int(sys.argv[2])
+
+    spark = SparkSession\
+        .builder\
+        .appName("StructuredNetworkWordCountSessionWindow")\
+        .getOrCreate()
+
+    spark.sparkContext.setLogLevel("WARN")
+
+    # Create DataFrame representing the stream of input lines from connection to host:port
+    lines = spark\
+        .readStream\
+        .format('socket')\
+        .option('host', host)\
+        .option('port', port)\
+        .option('includeTimestamp', 'true')\
+        .load()
+
+    # Split the lines into words, retaining timestamps
+    # split() splits each line into an array, and explode() turns the array into multiple rows
+    events = lines.select(
+        explode(split(lines.value, ' ')).alias('sessionId'),
+        lines.timestamp.cast("long")
+    )
+
+    session_type = StructType(
+    [StructField("sessionId", StringType()), StructField("count", LongType()),
+    StructField("start", LongType()), StructField("end", LongType())]
+    )
+
+    def func(key, pdf_iter, state):
+        if state.hasTimedOut:
+            finished_session = state.get
+            state.remove()
+            yield pd.DataFrame({"sessionId": [finished_session[0]], "count": [finished_session[1]], "start": [finished_session[2]], "end": [finished_session[3]]})
+        else:
+            pdfs = list(pdf_iter)
+            count = sum(len(pdf) for pdf in pdfs)
+            start = min(min(pdf['timestamp']) for pdf in pdfs)
+            end = max(max(pdf['timestamp']) for pdf in pdfs)
+            if state.exists:
+                old_session = state.get
+                count = count + old_session[1]
+                start = old_session[2]
+                end = max(end, old_session[3])
+            state.update((key[0], count, start, end))
+            state.setTimeoutDuration(30000)
+            yield pd.DataFrame({"sessionId": [], "count": [], "start": [], "end": []})

Review Comment:
   Thanks, cleaned up.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] HyukjinKwon commented on a diff in pull request #38013: [SPARK-40509][SS][PYTHON] Add example for applyInPandasWithState

Posted by GitBox <gi...@apache.org>.
HyukjinKwon commented on code in PR #38013:
URL: https://github.com/apache/spark/pull/38013#discussion_r984199174


##########
examples/src/main/python/sql/streaming/structured_network_wordcount_session_window.py:
##########
@@ -0,0 +1,139 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+r"""
+ Split lines into words, group by words and use the state per key to track session of each key.
+
+ Usage: structured_network_wordcount_windowed.py <hostname> <port>
+ <hostname> and <port> describe the TCP server that Structured Streaming
+ would connect to receive data.
+
+ To run this on your local machine, you need to first run a Netcat server
+    `$ nc -lk 9999`
+ and then run the example
+    `$ bin/spark-submit
+    examples/src/main/python/sql/streaming/structured_network_wordcount_session_window.py
+    localhost 9999`
+"""
+import sys
+import math
+from typing import Iterable, Any
+
+import pandas as pd
+
+from pyspark.sql import SparkSession
+from pyspark.sql.functions import explode
+from pyspark.sql.functions import split
+from pyspark.sql.types import (
+    LongType,
+    StringType,
+    StructType,
+    StructField,
+)
+from pyspark.sql.streaming.state import GroupStateTimeout, GroupState
+
+if __name__ == "__main__":
+    if len(sys.argv) != 3:
+        msg = "Usage: structured_network_wordcount_session_window.py <hostname> <port>"
+        print(msg, file=sys.stderr)
+        sys.exit(-1)
+
+    host = sys.argv[1]
+    port = int(sys.argv[2])
+
+    spark = SparkSession.builder.appName(
+        "StructuredNetworkWordCountSessionWindow"
+    ).getOrCreate()
+
+    # Create DataFrame representing the stream of input lines from connection to host:port
+    lines = (
+        spark.readStream.format("socket")
+        .option("host", host)
+        .option("port", port)
+        .option("includeTimestamp", "true")
+        .load()
+    )
+
+    # Split the lines into words, retaining timestamps, each word become a sessionId
+    events = lines.select(
+        explode(split(lines.value, " ")).alias("sessionId"),
+        lines.timestamp.cast("long"),
+    )
+
+    # Type of output records.
+    session_schema = StructType(
+        [
+            StructField("sessionId", StringType()),
+            StructField("count", LongType()),
+            StructField("start", LongType()),
+            StructField("end", LongType()),
+        ]
+    )
+    # Type of group state.
+    # Omit the session id in the state since it is available as group key
+    session_state_schema = StructType(
+        [
+            StructField("count", LongType()),
+            StructField("start", LongType()),
+            StructField("end", LongType()),
+        ]
+    )
+
+    def func(
+        key: Any, pdf_iter: Iterable[pd.DataFrame], state: GroupState

Review Comment:
   Ohh, okay NVM. Let's leave it as is for now 👍 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] HeartSaVioR commented on a diff in pull request #38013: [SPARK-40509][SS][PYTHON] add example for applyInPandasWithState

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on code in PR #38013:
URL: https://github.com/apache/spark/pull/38013#discussion_r980856329


##########
examples/src/main/python/sql/streaming/structured_network_wordcount_session_window.py:
##########
@@ -0,0 +1,114 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+r"""
+ split lines into words, group by words as key and use the state per key to track session of each key.
+
+ Usage: structured_network_wordcount_windowed.py <hostname> <port>
+ <hostname> and <port> describe the TCP server that Structured Streaming
+ would connect to receive data.
+
+ To run this on your local machine, you need to first run a Netcat server
+    `$ nc -lk 9999`
+ and then run the example
+    `$ bin/spark-submit
+    examples/src/main/python/sql/streaming/structured_network_wordcount_session_window.py
+    localhost 9999`
+"""
+import sys
+
+from pyspark.sql import SparkSession
+from pyspark.sql.functions import explode
+from pyspark.sql.functions import split
+from pyspark.sql.functions import window
+from pyspark.sql.types import (
+    LongType,
+    StringType,
+    StructType,
+    StructField,
+    Row,
+)
+from pyspark.sql.streaming.state import GroupStateTimeout, GroupState
+import pandas as pd
+
+if __name__ == "__main__":
+    if len(sys.argv) != 3:
+        msg = ("Usage: structured_network_wordcount_session_window.py <hostname> <port>")
+        print(msg, file=sys.stderr)
+        sys.exit(-1)
+
+    host = sys.argv[1]
+    port = int(sys.argv[2])
+
+    spark = SparkSession\
+        .builder\
+        .appName("StructuredNetworkWordCountSessionWindow")\
+        .getOrCreate()
+
+    spark.sparkContext.setLogLevel("WARN")
+
+    # Create DataFrame representing the stream of input lines from connection to host:port
+    lines = spark\
+        .readStream\
+        .format('socket')\
+        .option('host', host)\
+        .option('port', port)\
+        .option('includeTimestamp', 'true')\
+        .load()
+
+    # Split the lines into words, retaining timestamps
+    # split() splits each line into an array, and explode() turns the array into multiple rows
+    events = lines.select(
+        explode(split(lines.value, ' ')).alias('sessionId'),
+        lines.timestamp.cast("long")
+    )
+
+    session_type = StructType(
+    [StructField("sessionId", StringType()), StructField("count", LongType()),
+    StructField("start", LongType()), StructField("end", LongType())]
+    )
+
+    def func(key, pdf_iter, state):
+        if state.hasTimedOut:
+            finished_session = state.get
+            state.remove()
+            yield pd.DataFrame({"sessionId": [finished_session[0]], "count": [finished_session[1]], "start": [finished_session[2]], "end": [finished_session[3]]})
+        else:
+            pdfs = list(pdf_iter)
+            count = sum(len(pdf) for pdf in pdfs)
+            start = min(min(pdf['timestamp']) for pdf in pdfs)
+            end = max(max(pdf['timestamp']) for pdf in pdfs)
+            if state.exists:
+                old_session = state.get
+                count = count + old_session[1]
+                start = old_session[2]
+                end = max(end, old_session[3])
+            state.update((key[0], count, start, end))
+            state.setTimeoutDuration(30000)
+            yield pd.DataFrame({"sessionId": [], "count": [], "start": [], "end": []})

Review Comment:
   nit: pandas DataFrame with empty column list would work as well.



##########
examples/src/main/python/sql/streaming/structured_network_wordcount_session_window.py:
##########
@@ -0,0 +1,114 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+r"""
+ split lines into words, group by words as key and use the state per key to track session of each key.
+
+ Usage: structured_network_wordcount_windowed.py <hostname> <port>
+ <hostname> and <port> describe the TCP server that Structured Streaming
+ would connect to receive data.
+
+ To run this on your local machine, you need to first run a Netcat server
+    `$ nc -lk 9999`
+ and then run the example
+    `$ bin/spark-submit
+    examples/src/main/python/sql/streaming/structured_network_wordcount_session_window.py
+    localhost 9999`
+"""
+import sys
+
+from pyspark.sql import SparkSession
+from pyspark.sql.functions import explode
+from pyspark.sql.functions import split
+from pyspark.sql.functions import window
+from pyspark.sql.types import (
+    LongType,
+    StringType,
+    StructType,
+    StructField,
+    Row,
+)
+from pyspark.sql.streaming.state import GroupStateTimeout, GroupState
+import pandas as pd
+
+if __name__ == "__main__":
+    if len(sys.argv) != 3:
+        msg = ("Usage: structured_network_wordcount_session_window.py <hostname> <port>")
+        print(msg, file=sys.stderr)
+        sys.exit(-1)
+
+    host = sys.argv[1]
+    port = int(sys.argv[2])
+
+    spark = SparkSession\
+        .builder\
+        .appName("StructuredNetworkWordCountSessionWindow")\
+        .getOrCreate()
+
+    spark.sparkContext.setLogLevel("WARN")
+
+    # Create DataFrame representing the stream of input lines from connection to host:port
+    lines = spark\
+        .readStream\
+        .format('socket')\
+        .option('host', host)\
+        .option('port', port)\
+        .option('includeTimestamp', 'true')\
+        .load()
+
+    # Split the lines into words, retaining timestamps
+    # split() splits each line into an array, and explode() turns the array into multiple rows
+    events = lines.select(
+        explode(split(lines.value, ' ')).alias('sessionId'),
+        lines.timestamp.cast("long")
+    )
+
+    session_type = StructType(
+    [StructField("sessionId", StringType()), StructField("count", LongType()),
+    StructField("start", LongType()), StructField("end", LongType())]
+    )
+
+    def func(key, pdf_iter, state):
+        if state.hasTimedOut:
+            finished_session = state.get
+            state.remove()
+            yield pd.DataFrame({"sessionId": [finished_session[0]], "count": [finished_session[1]], "start": [finished_session[2]], "end": [finished_session[3]]})
+        else:
+            pdfs = list(pdf_iter)
+            count = sum(len(pdf) for pdf in pdfs)
+            start = min(min(pdf['timestamp']) for pdf in pdfs)
+            end = max(max(pdf['timestamp']) for pdf in pdfs)
+            if state.exists:
+                old_session = state.get
+                count = count + old_session[1]
+                start = old_session[2]
+                end = max(end, old_session[3])
+            state.update((key[0], count, start, end))
+            state.setTimeoutDuration(30000)
+            yield pd.DataFrame({"sessionId": [], "count": [], "start": [], "end": []})
+
+    # Group the data by window and word and compute the count of each group
+    sessions = events.groupBy(events["sessionId"]).applyInPandasWithState(func, session_type, session_type, "Update", GroupStateTimeout.ProcessingTimeTimeout)
+
+    # Start running the query that prints the windowed word counts to the console
+    query = sessions\

Review Comment:
   Same here.



##########
examples/src/main/python/sql/streaming/structured_network_wordcount_session_window.py:
##########
@@ -0,0 +1,114 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+r"""
+ split lines into words, group by words as key and use the state per key to track session of each key.
+
+ Usage: structured_network_wordcount_windowed.py <hostname> <port>
+ <hostname> and <port> describe the TCP server that Structured Streaming
+ would connect to receive data.
+
+ To run this on your local machine, you need to first run a Netcat server
+    `$ nc -lk 9999`
+ and then run the example
+    `$ bin/spark-submit
+    examples/src/main/python/sql/streaming/structured_network_wordcount_session_window.py
+    localhost 9999`
+"""
+import sys
+
+from pyspark.sql import SparkSession
+from pyspark.sql.functions import explode
+from pyspark.sql.functions import split
+from pyspark.sql.functions import window
+from pyspark.sql.types import (
+    LongType,
+    StringType,
+    StructType,
+    StructField,
+    Row,
+)
+from pyspark.sql.streaming.state import GroupStateTimeout, GroupState
+import pandas as pd
+
+if __name__ == "__main__":
+    if len(sys.argv) != 3:
+        msg = ("Usage: structured_network_wordcount_session_window.py <hostname> <port>")
+        print(msg, file=sys.stderr)
+        sys.exit(-1)
+
+    host = sys.argv[1]
+    port = int(sys.argv[2])
+
+    spark = SparkSession\
+        .builder\
+        .appName("StructuredNetworkWordCountSessionWindow")\
+        .getOrCreate()
+
+    spark.sparkContext.setLogLevel("WARN")
+
+    # Create DataFrame representing the stream of input lines from connection to host:port
+    lines = spark\

Review Comment:
   Same here.



##########
examples/src/main/python/sql/streaming/structured_network_wordcount_session_window.py:
##########
@@ -0,0 +1,114 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+r"""
+ split lines into words, group by words as key and use the state per key to track session of each key.
+
+ Usage: structured_network_wordcount_windowed.py <hostname> <port>
+ <hostname> and <port> describe the TCP server that Structured Streaming
+ would connect to receive data.
+
+ To run this on your local machine, you need to first run a Netcat server
+    `$ nc -lk 9999`
+ and then run the example
+    `$ bin/spark-submit
+    examples/src/main/python/sql/streaming/structured_network_wordcount_session_window.py
+    localhost 9999`
+"""
+import sys
+
+from pyspark.sql import SparkSession
+from pyspark.sql.functions import explode
+from pyspark.sql.functions import split
+from pyspark.sql.functions import window
+from pyspark.sql.types import (
+    LongType,
+    StringType,
+    StructType,
+    StructField,
+    Row,
+)
+from pyspark.sql.streaming.state import GroupStateTimeout, GroupState
+import pandas as pd
+
+if __name__ == "__main__":
+    if len(sys.argv) != 3:
+        msg = ("Usage: structured_network_wordcount_session_window.py <hostname> <port>")
+        print(msg, file=sys.stderr)
+        sys.exit(-1)
+
+    host = sys.argv[1]
+    port = int(sys.argv[2])
+
+    spark = SparkSession\

Review Comment:
   Looks like Python community encourages to use () (like you use {} for your convenient language) and not use \ explicitly to do line break. Something along the line:
   
   ```
   spark = (
       SparkSession
           .builder
           .appName("StructuredNetworkWordCountSessionWindow")
           .getOrCreate()
   )
   ```
   
   If there is indentation issue, we can leverage formatter for python code.



##########
examples/src/main/python/sql/streaming/structured_network_wordcount_session_window.py:
##########
@@ -0,0 +1,114 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+r"""
+ split lines into words, group by words as key and use the state per key to track session of each key.
+
+ Usage: structured_network_wordcount_windowed.py <hostname> <port>
+ <hostname> and <port> describe the TCP server that Structured Streaming
+ would connect to receive data.
+
+ To run this on your local machine, you need to first run a Netcat server
+    `$ nc -lk 9999`
+ and then run the example
+    `$ bin/spark-submit
+    examples/src/main/python/sql/streaming/structured_network_wordcount_session_window.py
+    localhost 9999`
+"""
+import sys
+
+from pyspark.sql import SparkSession
+from pyspark.sql.functions import explode
+from pyspark.sql.functions import split
+from pyspark.sql.functions import window
+from pyspark.sql.types import (
+    LongType,
+    StringType,
+    StructType,
+    StructField,
+    Row,
+)
+from pyspark.sql.streaming.state import GroupStateTimeout, GroupState
+import pandas as pd
+
+if __name__ == "__main__":
+    if len(sys.argv) != 3:
+        msg = ("Usage: structured_network_wordcount_session_window.py <hostname> <port>")
+        print(msg, file=sys.stderr)
+        sys.exit(-1)
+
+    host = sys.argv[1]
+    port = int(sys.argv[2])
+
+    spark = SparkSession\
+        .builder\
+        .appName("StructuredNetworkWordCountSessionWindow")\
+        .getOrCreate()
+
+    spark.sparkContext.setLogLevel("WARN")
+
+    # Create DataFrame representing the stream of input lines from connection to host:port
+    lines = spark\
+        .readStream\
+        .format('socket')\
+        .option('host', host)\
+        .option('port', port)\
+        .option('includeTimestamp', 'true')\
+        .load()
+
+    # Split the lines into words, retaining timestamps
+    # split() splits each line into an array, and explode() turns the array into multiple rows
+    events = lines.select(
+        explode(split(lines.value, ' ')).alias('sessionId'),
+        lines.timestamp.cast("long")
+    )
+
+    session_type = StructType(
+    [StructField("sessionId", StringType()), StructField("count", LongType()),

Review Comment:
   probably not a good indentation - maybe python linter won't be happy.



##########
examples/src/main/python/sql/streaming/structured_network_wordcount_session_window.py:
##########
@@ -0,0 +1,114 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+r"""
+ split lines into words, group by words as key and use the state per key to track session of each key.
+
+ Usage: structured_network_wordcount_windowed.py <hostname> <port>
+ <hostname> and <port> describe the TCP server that Structured Streaming
+ would connect to receive data.
+
+ To run this on your local machine, you need to first run a Netcat server
+    `$ nc -lk 9999`
+ and then run the example
+    `$ bin/spark-submit
+    examples/src/main/python/sql/streaming/structured_network_wordcount_session_window.py
+    localhost 9999`
+"""
+import sys
+
+from pyspark.sql import SparkSession
+from pyspark.sql.functions import explode
+from pyspark.sql.functions import split
+from pyspark.sql.functions import window
+from pyspark.sql.types import (
+    LongType,
+    StringType,
+    StructType,
+    StructField,
+    Row,
+)
+from pyspark.sql.streaming.state import GroupStateTimeout, GroupState
+import pandas as pd
+
+if __name__ == "__main__":
+    if len(sys.argv) != 3:
+        msg = ("Usage: structured_network_wordcount_session_window.py <hostname> <port>")
+        print(msg, file=sys.stderr)
+        sys.exit(-1)
+
+    host = sys.argv[1]
+    port = int(sys.argv[2])
+
+    spark = SparkSession\
+        .builder\
+        .appName("StructuredNetworkWordCountSessionWindow")\
+        .getOrCreate()
+
+    spark.sparkContext.setLogLevel("WARN")
+
+    # Create DataFrame representing the stream of input lines from connection to host:port
+    lines = spark\
+        .readStream\
+        .format('socket')\
+        .option('host', host)\
+        .option('port', port)\
+        .option('includeTimestamp', 'true')\
+        .load()
+
+    # Split the lines into words, retaining timestamps
+    # split() splits each line into an array, and explode() turns the array into multiple rows
+    events = lines.select(
+        explode(split(lines.value, ' ')).alias('sessionId'),
+        lines.timestamp.cast("long")
+    )
+
+    session_type = StructType(
+    [StructField("sessionId", StringType()), StructField("count", LongType()),
+    StructField("start", LongType()), StructField("end", LongType())]
+    )
+
+    def func(key, pdf_iter, state):
+        if state.hasTimedOut:
+            finished_session = state.get
+            state.remove()
+            yield pd.DataFrame({"sessionId": [finished_session[0]], "count": [finished_session[1]], "start": [finished_session[2]], "end": [finished_session[3]]})
+        else:
+            pdfs = list(pdf_iter)

Review Comment:
   I know we won't encounter memory issue even we materialize all elements in iterator, but would be good to follow the traditional practice of using for-each. I don't think this will complicate the logic too much.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] HeartSaVioR commented on pull request #38013: [SPARK-40509][SS][PYTHON] add example for applyInPandasWithState

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on PR #38013:
URL: https://github.com/apache/spark/pull/38013#issuecomment-1259259980

   One tip, unlike Scala/Java code, we can leverage `dev/reformat-python` to reformat python code automatically.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] HeartSaVioR commented on a diff in pull request #38013: [SPARK-40509][SS][PYTHON] add example for applyInPandasWithState

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on code in PR #38013:
URL: https://github.com/apache/spark/pull/38013#discussion_r980870512


##########
examples/src/main/python/sql/streaming/structured_network_wordcount_session_window.py:
##########
@@ -0,0 +1,114 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+r"""
+ split lines into words, group by words as key and use the state per key to track session of each key.
+
+ Usage: structured_network_wordcount_windowed.py <hostname> <port>
+ <hostname> and <port> describe the TCP server that Structured Streaming
+ would connect to receive data.
+
+ To run this on your local machine, you need to first run a Netcat server
+    `$ nc -lk 9999`
+ and then run the example
+    `$ bin/spark-submit
+    examples/src/main/python/sql/streaming/structured_network_wordcount_session_window.py
+    localhost 9999`
+"""
+import sys
+
+from pyspark.sql import SparkSession
+from pyspark.sql.functions import explode
+from pyspark.sql.functions import split
+from pyspark.sql.functions import window
+from pyspark.sql.types import (
+    LongType,
+    StringType,
+    StructType,
+    StructField,
+    Row,
+)
+from pyspark.sql.streaming.state import GroupStateTimeout, GroupState
+import pandas as pd
+
+if __name__ == "__main__":
+    if len(sys.argv) != 3:
+        msg = ("Usage: structured_network_wordcount_session_window.py <hostname> <port>")
+        print(msg, file=sys.stderr)
+        sys.exit(-1)
+
+    host = sys.argv[1]
+    port = int(sys.argv[2])
+
+    spark = SparkSession\
+        .builder\
+        .appName("StructuredNetworkWordCountSessionWindow")\
+        .getOrCreate()
+
+    spark.sparkContext.setLogLevel("WARN")
+
+    # Create DataFrame representing the stream of input lines from connection to host:port
+    lines = spark\

Review Comment:
   Same here. See https://github.com/apache/spark/pull/38013#discussion_r980870370



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] HyukjinKwon commented on a diff in pull request #38013: [SPARK-40509][SS][PYTHON] Add example for applyInPandasWithState

Posted by GitBox <gi...@apache.org>.
HyukjinKwon commented on code in PR #38013:
URL: https://github.com/apache/spark/pull/38013#discussion_r983459290


##########
examples/src/main/python/sql/streaming/structured_network_wordcount_session_window.py:
##########
@@ -0,0 +1,139 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+r"""
+ Split lines into words, group by words and use the state per key to track session of each key.
+
+ Usage: structured_network_wordcount_windowed.py <hostname> <port>
+ <hostname> and <port> describe the TCP server that Structured Streaming
+ would connect to receive data.
+
+ To run this on your local machine, you need to first run a Netcat server
+    `$ nc -lk 9999`
+ and then run the example
+    `$ bin/spark-submit
+    examples/src/main/python/sql/streaming/structured_network_wordcount_session_window.py
+    localhost 9999`
+"""
+import sys
+import math
+from typing import Iterable, Any
+
+import pandas as pd
+
+from pyspark.sql import SparkSession
+from pyspark.sql.functions import explode
+from pyspark.sql.functions import split
+from pyspark.sql.types import (
+    LongType,
+    StringType,
+    StructType,
+    StructField,
+)
+from pyspark.sql.streaming.state import GroupStateTimeout, GroupState
+
+if __name__ == "__main__":
+    if len(sys.argv) != 3:
+        msg = "Usage: structured_network_wordcount_session_window.py <hostname> <port>"
+        print(msg, file=sys.stderr)
+        sys.exit(-1)
+
+    host = sys.argv[1]
+    port = int(sys.argv[2])
+
+    spark = SparkSession.builder.appName(
+        "StructuredNetworkWordCountSessionWindow"
+    ).getOrCreate()
+
+    # Create DataFrame representing the stream of input lines from connection to host:port
+    lines = (
+        spark.readStream.format("socket")
+        .option("host", host)
+        .option("port", port)
+        .option("includeTimestamp", "true")
+        .load()
+    )
+
+    # Split the lines into words, retaining timestamps, each word become a sessionId
+    events = lines.select(
+        explode(split(lines.value, " ")).alias("sessionId"),
+        lines.timestamp.cast("long"),
+    )
+
+    # Type of output records.
+    session_schema = StructType(
+        [
+            StructField("sessionId", StringType()),
+            StructField("count", LongType()),
+            StructField("start", LongType()),
+            StructField("end", LongType()),
+        ]
+    )
+    # Type of group state.
+    # Omit the session id in the state since it is available as group key
+    session_state_schema = StructType(
+        [
+            StructField("count", LongType()),
+            StructField("start", LongType()),
+            StructField("end", LongType()),
+        ]
+    )
+
+    def func(
+        key: Any, pdf_iter: Iterable[pd.DataFrame], state: GroupState
+    ) -> Iterable[pd.DataFrame]:
+        if state.hasTimedOut:
+            count, start, end = state.get
+            state.remove()
+            yield pd.DataFrame(
+                {
+                    "sessionId": [key[0]],
+                    "count": [count],
+                    "start": [start],
+                    "end": [end],
+                }
+            )
+        else:
+            start = math.inf
+            end = 0
+            count = 0
+            for pdf in pdf_iter:
+                start = min(start, min(pdf["timestamp"]))
+                end = max(end, max(pdf["timestamp"]))
+                count = count + len(pdf)
+            if state.exists:
+                old_session = state.get
+                count = count + old_session[0]
+                start = old_session[1]
+                end = max(end, old_session[2])
+            state.update((count, start, end))
+            state.setTimeoutDuration(10000)
+            yield pd.DataFrame()
+
+    # Group the data by window and word and compute the count of each group
+    sessions = events.groupBy(events["sessionId"]).applyInPandasWithState(
+        func,
+        session_schema,
+        session_state_schema,

Review Comment:
   FYI, you can just pass a DDL formatted string schema too. e.g., `"sessionId STRING, count LONG, start LONG, end LONG"` which will be shorter.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] HeartSaVioR closed pull request #38013: [SPARK-40509][SS][PYTHON] Add example for applyInPandasWithState

Posted by GitBox <gi...@apache.org>.
HeartSaVioR closed pull request #38013: [SPARK-40509][SS][PYTHON] Add example for applyInPandasWithState
URL: https://github.com/apache/spark/pull/38013


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] HyukjinKwon commented on a diff in pull request #38013: [SPARK-40509][SS][PYTHON] add example for applyInPandasWithState

Posted by GitBox <gi...@apache.org>.
HyukjinKwon commented on code in PR #38013:
URL: https://github.com/apache/spark/pull/38013#discussion_r981863988


##########
examples/src/main/python/sql/streaming/structured_network_wordcount_session_window.py:
##########
@@ -0,0 +1,130 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+r"""
+ split lines into words, group by words and use the state per key to track session of each key.
+
+ Usage: structured_network_wordcount_windowed.py <hostname> <port>
+ <hostname> and <port> describe the TCP server that Structured Streaming
+ would connect to receive data.
+
+ To run this on your local machine, you need to first run a Netcat server
+    `$ nc -lk 9999`
+ and then run the example
+    `$ bin/spark-submit
+    examples/src/main/python/sql/streaming/structured_network_wordcount_session_window.py
+    localhost 9999`
+"""
+import sys
+import math
+
+from pyspark.sql import SparkSession
+from pyspark.sql.functions import explode
+from pyspark.sql.functions import split
+from pyspark.sql.types import (
+    LongType,
+    StringType,
+    StructType,
+    StructField,
+)
+from pyspark.sql.streaming.state import GroupStateTimeout, GroupState
+import pandas as pd
+from typing import Iterable, Any

Review Comment:
   Import order should be
   ```
   builtin
   
   thridparty
   
   pyspark
   ```



##########
examples/src/main/python/sql/streaming/structured_network_wordcount_session_window.py:
##########
@@ -0,0 +1,130 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+r"""
+ split lines into words, group by words and use the state per key to track session of each key.

Review Comment:
   ```suggestion
    Split lines into words, group by words and use the state per key to track session of each key.
   ```



##########
examples/src/main/python/sql/streaming/structured_network_wordcount_session_window.py:
##########
@@ -0,0 +1,130 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+r"""
+ split lines into words, group by words and use the state per key to track session of each key.
+
+ Usage: structured_network_wordcount_windowed.py <hostname> <port>
+ <hostname> and <port> describe the TCP server that Structured Streaming
+ would connect to receive data.
+
+ To run this on your local machine, you need to first run a Netcat server
+    `$ nc -lk 9999`
+ and then run the example
+    `$ bin/spark-submit
+    examples/src/main/python/sql/streaming/structured_network_wordcount_session_window.py
+    localhost 9999`
+"""
+import sys
+import math
+
+from pyspark.sql import SparkSession
+from pyspark.sql.functions import explode
+from pyspark.sql.functions import split
+from pyspark.sql.types import (
+    LongType,
+    StringType,
+    StructType,
+    StructField,
+)
+from pyspark.sql.streaming.state import GroupStateTimeout, GroupState
+import pandas as pd
+from typing import Iterable, Any
+
+if __name__ == "__main__":
+    if len(sys.argv) != 3:
+        msg = "Usage: structured_network_wordcount_session_window.py <hostname> <port>"
+        print(msg, file=sys.stderr)
+        sys.exit(-1)
+
+    host = sys.argv[1]
+    port = int(sys.argv[2])
+
+    spark = SparkSession.builder.appName(
+        "StructuredNetworkWordCountSessionWindow"
+    ).getOrCreate()
+
+    spark.sparkContext.setLogLevel("WARN")
+
+    # Create DataFrame representing the stream of input lines from connection to host:port
+    lines = (
+        spark.readStream.format("socket")
+        .option("host", host)
+        .option("port", port)
+        .option("includeTimestamp", "true")
+        .load()
+    )
+
+    # Split the lines into words, retaining timestamps
+    # split() splits each line into an array, and explode() turns the array into multiple rows

Review Comment:
   Can we comment that what becomes the `sessionId`?



##########
examples/src/main/python/sql/streaming/structured_network_wordcount_session_window.py:
##########
@@ -0,0 +1,130 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+r"""
+ split lines into words, group by words and use the state per key to track session of each key.
+
+ Usage: structured_network_wordcount_windowed.py <hostname> <port>
+ <hostname> and <port> describe the TCP server that Structured Streaming
+ would connect to receive data.
+
+ To run this on your local machine, you need to first run a Netcat server
+    `$ nc -lk 9999`
+ and then run the example
+    `$ bin/spark-submit
+    examples/src/main/python/sql/streaming/structured_network_wordcount_session_window.py
+    localhost 9999`
+"""
+import sys
+import math
+
+from pyspark.sql import SparkSession
+from pyspark.sql.functions import explode
+from pyspark.sql.functions import split
+from pyspark.sql.types import (
+    LongType,
+    StringType,
+    StructType,
+    StructField,
+)
+from pyspark.sql.streaming.state import GroupStateTimeout, GroupState
+import pandas as pd
+from typing import Iterable, Any
+
+if __name__ == "__main__":
+    if len(sys.argv) != 3:
+        msg = "Usage: structured_network_wordcount_session_window.py <hostname> <port>"
+        print(msg, file=sys.stderr)
+        sys.exit(-1)
+
+    host = sys.argv[1]
+    port = int(sys.argv[2])
+
+    spark = SparkSession.builder.appName(
+        "StructuredNetworkWordCountSessionWindow"
+    ).getOrCreate()
+
+    spark.sparkContext.setLogLevel("WARN")
+
+    # Create DataFrame representing the stream of input lines from connection to host:port
+    lines = (
+        spark.readStream.format("socket")
+        .option("host", host)
+        .option("port", port)
+        .option("includeTimestamp", "true")
+        .load()
+    )
+
+    # Split the lines into words, retaining timestamps
+    # split() splits each line into an array, and explode() turns the array into multiple rows
+    events = lines.select(
+        explode(split(lines.value, " ")).alias("sessionId"),
+        lines.timestamp.cast("long"),
+    )
+    session_fields = [
+        StructField("sessionId", StringType()),
+        StructField("count", LongType()),
+        StructField("start", LongType()),
+        StructField("end", LongType()),
+    ]
+
+    def func(
+        key: Any, pdf_iter: Iterable[pd.DataFrame], state: GroupState
+    ) -> Iterable[pd.DataFrame]:
+        if state.hasTimedOut:
+            count, start, end = state.get
+            state.remove()
+            yield pd.DataFrame(
+                {
+                    "sessionId": [key[0]],
+                    "count": [count],
+                    "start": [start],
+                    "end": [end],
+                }
+            )
+        else:
+            start = math.inf
+            end = 0
+            count = 0
+            for pdf in pdf_iter:
+                start = min(start, min(pdf["timestamp"]))
+                end = max(end, max(pdf["timestamp"]))
+                count = count + len(pdf)
+            if state.exists:
+                old_session = state.get
+                count = count + old_session[0]
+                start = old_session[1]
+                end = max(end, old_session[2])
+            state.update((count, start, end))
+            state.setTimeoutDuration(10000)
+            yield pd.DataFrame()
+
+    # Group the data by window and word and compute the count of each group
+    sessions = events.groupBy(events["sessionId"]).applyInPandasWithState(
+        func,
+        StructType(session_fields),

Review Comment:
   Can we have two separate variables for these schemas?



##########
examples/src/main/python/sql/streaming/structured_network_wordcount_session_window.py:
##########
@@ -0,0 +1,130 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+r"""
+ split lines into words, group by words and use the state per key to track session of each key.
+
+ Usage: structured_network_wordcount_windowed.py <hostname> <port>
+ <hostname> and <port> describe the TCP server that Structured Streaming
+ would connect to receive data.
+
+ To run this on your local machine, you need to first run a Netcat server
+    `$ nc -lk 9999`
+ and then run the example
+    `$ bin/spark-submit
+    examples/src/main/python/sql/streaming/structured_network_wordcount_session_window.py
+    localhost 9999`
+"""
+import sys
+import math
+
+from pyspark.sql import SparkSession
+from pyspark.sql.functions import explode
+from pyspark.sql.functions import split
+from pyspark.sql.types import (
+    LongType,
+    StringType,
+    StructType,
+    StructField,
+)
+from pyspark.sql.streaming.state import GroupStateTimeout, GroupState
+import pandas as pd
+from typing import Iterable, Any
+
+if __name__ == "__main__":
+    if len(sys.argv) != 3:
+        msg = "Usage: structured_network_wordcount_session_window.py <hostname> <port>"
+        print(msg, file=sys.stderr)
+        sys.exit(-1)
+
+    host = sys.argv[1]
+    port = int(sys.argv[2])
+
+    spark = SparkSession.builder.appName(
+        "StructuredNetworkWordCountSessionWindow"
+    ).getOrCreate()
+
+    spark.sparkContext.setLogLevel("WARN")

Review Comment:
   Seems like we don;t do this in other examples.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] HyukjinKwon commented on a diff in pull request #38013: [SPARK-40509][SS][PYTHON] Add example for applyInPandasWithState

Posted by GitBox <gi...@apache.org>.
HyukjinKwon commented on code in PR #38013:
URL: https://github.com/apache/spark/pull/38013#discussion_r983453630


##########
examples/src/main/python/sql/streaming/structured_network_wordcount_session_window.py:
##########
@@ -0,0 +1,139 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+r"""
+ Split lines into words, group by words and use the state per key to track session of each key.
+
+ Usage: structured_network_wordcount_windowed.py <hostname> <port>
+ <hostname> and <port> describe the TCP server that Structured Streaming
+ would connect to receive data.
+
+ To run this on your local machine, you need to first run a Netcat server
+    `$ nc -lk 9999`
+ and then run the example
+    `$ bin/spark-submit
+    examples/src/main/python/sql/streaming/structured_network_wordcount_session_window.py
+    localhost 9999`
+"""
+import sys
+import math
+from typing import Iterable, Any
+
+import pandas as pd
+
+from pyspark.sql import SparkSession
+from pyspark.sql.functions import explode
+from pyspark.sql.functions import split
+from pyspark.sql.types import (
+    LongType,
+    StringType,
+    StructType,
+    StructField,
+)
+from pyspark.sql.streaming.state import GroupStateTimeout, GroupState
+
+if __name__ == "__main__":
+    if len(sys.argv) != 3:
+        msg = "Usage: structured_network_wordcount_session_window.py <hostname> <port>"
+        print(msg, file=sys.stderr)
+        sys.exit(-1)
+
+    host = sys.argv[1]
+    port = int(sys.argv[2])
+
+    spark = SparkSession.builder.appName(
+        "StructuredNetworkWordCountSessionWindow"
+    ).getOrCreate()
+
+    # Create DataFrame representing the stream of input lines from connection to host:port
+    lines = (
+        spark.readStream.format("socket")
+        .option("host", host)
+        .option("port", port)
+        .option("includeTimestamp", "true")
+        .load()
+    )
+
+    # Split the lines into words, retaining timestamps, each word become a sessionId
+    events = lines.select(
+        explode(split(lines.value, " ")).alias("sessionId"),
+        lines.timestamp.cast("long"),
+    )
+
+    # Type of output records.
+    session_schema = StructType(
+        [
+            StructField("sessionId", StringType()),
+            StructField("count", LongType()),
+            StructField("start", LongType()),
+            StructField("end", LongType()),
+        ]
+    )
+    # Type of group state.
+    # Omit the session id in the state since it is available as group key
+    session_state_schema = StructType(
+        [
+            StructField("count", LongType()),
+            StructField("start", LongType()),
+            StructField("end", LongType()),
+        ]
+    )
+
+    def func(
+        key: Any, pdf_iter: Iterable[pd.DataFrame], state: GroupState

Review Comment:
   Sorry for post-hoc reviews. 
   
   Let's change `Iterable` to `Iterator`. This can only be an iterator.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] chaoqin-li1123 commented on a diff in pull request #38013: [SPARK-40509][SS][PYTHON] Add example for applyInPandasWithState

Posted by GitBox <gi...@apache.org>.
chaoqin-li1123 commented on code in PR #38013:
URL: https://github.com/apache/spark/pull/38013#discussion_r984197172


##########
examples/src/main/python/sql/streaming/structured_network_wordcount_session_window.py:
##########
@@ -0,0 +1,139 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+r"""
+ Split lines into words, group by words and use the state per key to track session of each key.
+
+ Usage: structured_network_wordcount_windowed.py <hostname> <port>
+ <hostname> and <port> describe the TCP server that Structured Streaming
+ would connect to receive data.
+
+ To run this on your local machine, you need to first run a Netcat server
+    `$ nc -lk 9999`
+ and then run the example
+    `$ bin/spark-submit
+    examples/src/main/python/sql/streaming/structured_network_wordcount_session_window.py
+    localhost 9999`
+"""
+import sys
+import math
+from typing import Iterable, Any
+
+import pandas as pd
+
+from pyspark.sql import SparkSession
+from pyspark.sql.functions import explode
+from pyspark.sql.functions import split
+from pyspark.sql.types import (
+    LongType,
+    StringType,
+    StructType,
+    StructField,
+)
+from pyspark.sql.streaming.state import GroupStateTimeout, GroupState
+
+if __name__ == "__main__":
+    if len(sys.argv) != 3:
+        msg = "Usage: structured_network_wordcount_session_window.py <hostname> <port>"
+        print(msg, file=sys.stderr)
+        sys.exit(-1)
+
+    host = sys.argv[1]
+    port = int(sys.argv[2])
+
+    spark = SparkSession.builder.appName(
+        "StructuredNetworkWordCountSessionWindow"
+    ).getOrCreate()
+
+    # Create DataFrame representing the stream of input lines from connection to host:port
+    lines = (
+        spark.readStream.format("socket")
+        .option("host", host)
+        .option("port", port)
+        .option("includeTimestamp", "true")
+        .load()
+    )
+
+    # Split the lines into words, retaining timestamps, each word become a sessionId
+    events = lines.select(
+        explode(split(lines.value, " ")).alias("sessionId"),
+        lines.timestamp.cast("long"),
+    )
+
+    # Type of output records.
+    session_schema = StructType(
+        [
+            StructField("sessionId", StringType()),
+            StructField("count", LongType()),
+            StructField("start", LongType()),
+            StructField("end", LongType()),
+        ]
+    )
+    # Type of group state.
+    # Omit the session id in the state since it is available as group key
+    session_state_schema = StructType(
+        [
+            StructField("count", LongType()),
+            StructField("start", LongType()),
+            StructField("end", LongType()),
+        ]
+    )
+
+    def func(
+        key: Any, pdf_iter: Iterable[pd.DataFrame], state: GroupState

Review Comment:
   The linter force me to annotate the type here as Iterable instead of Iterator, maybe we should do some investigation?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] HyukjinKwon commented on a diff in pull request #38013: [SPARK-40509][SS][PYTHON] add example for applyInPandasWithState

Posted by GitBox <gi...@apache.org>.
HyukjinKwon commented on code in PR #38013:
URL: https://github.com/apache/spark/pull/38013#discussion_r981249089


##########
examples/src/main/python/sql/streaming/structured_network_wordcount_session_window.py:
##########
@@ -0,0 +1,114 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+r"""
+ split lines into words, group by words as key and use the state per key to track session of each key.
+
+ Usage: structured_network_wordcount_windowed.py <hostname> <port>
+ <hostname> and <port> describe the TCP server that Structured Streaming
+ would connect to receive data.
+
+ To run this on your local machine, you need to first run a Netcat server
+    `$ nc -lk 9999`
+ and then run the example
+    `$ bin/spark-submit
+    examples/src/main/python/sql/streaming/structured_network_wordcount_session_window.py
+    localhost 9999`
+"""
+import sys
+
+from pyspark.sql import SparkSession
+from pyspark.sql.functions import explode
+from pyspark.sql.functions import split
+from pyspark.sql.functions import window
+from pyspark.sql.types import (
+    LongType,
+    StringType,
+    StructType,
+    StructField,
+    Row,
+)
+from pyspark.sql.streaming.state import GroupStateTimeout, GroupState
+import pandas as pd
+
+if __name__ == "__main__":
+    if len(sys.argv) != 3:
+        msg = ("Usage: structured_network_wordcount_session_window.py <hostname> <port>")
+        print(msg, file=sys.stderr)
+        sys.exit(-1)
+
+    host = sys.argv[1]
+    port = int(sys.argv[2])
+
+    spark = SparkSession\
+        .builder\
+        .appName("StructuredNetworkWordCountSessionWindow")\
+        .getOrCreate()
+
+    spark.sparkContext.setLogLevel("WARN")
+
+    # Create DataFrame representing the stream of input lines from connection to host:port
+    lines = spark\
+        .readStream\
+        .format('socket')\
+        .option('host', host)\
+        .option('port', port)\
+        .option('includeTimestamp', 'true')\
+        .load()
+
+    # Split the lines into words, retaining timestamps
+    # split() splits each line into an array, and explode() turns the array into multiple rows
+    events = lines.select(
+        explode(split(lines.value, ' ')).alias('sessionId'),
+        lines.timestamp.cast("long")
+    )
+
+    session_type = StructType(
+    [StructField("sessionId", StringType()), StructField("count", LongType()),
+    StructField("start", LongType()), StructField("end", LongType())]
+    )
+
+    def func(key, pdf_iter, state):
+        if state.hasTimedOut:
+            finished_session = state.get
+            state.remove()
+            yield pd.DataFrame({"sessionId": [finished_session[0]], "count": [finished_session[1]], "start": [finished_session[2]], "end": [finished_session[3]]})
+        else:
+            pdfs = list(pdf_iter)

Review Comment:
   Yes, this is an example so should better to have a best practice.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] HyukjinKwon commented on a diff in pull request #38013: [SPARK-40509][SS][PYTHON] Add example for applyInPandasWithState

Posted by GitBox <gi...@apache.org>.
HyukjinKwon commented on code in PR #38013:
URL: https://github.com/apache/spark/pull/38013#discussion_r983460896


##########
examples/src/main/python/sql/streaming/structured_network_wordcount_session_window.py:
##########
@@ -0,0 +1,139 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+r"""
+ Split lines into words, group by words and use the state per key to track session of each key.
+

Review Comment:
   Can we add a bit of more explanation? e.g.) the timeout is set as 10 seconds so each session window lasts until there is no more input to the key for 10 seconds.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] chaoqin-li1123 commented on a diff in pull request #38013: [SPARK-40509][SS][PYTHON] Add example for applyInPandasWithState

Posted by GitBox <gi...@apache.org>.
chaoqin-li1123 commented on code in PR #38013:
URL: https://github.com/apache/spark/pull/38013#discussion_r981978112


##########
examples/src/main/python/sql/streaming/structured_network_wordcount_session_window.py:
##########
@@ -0,0 +1,130 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+r"""
+ split lines into words, group by words and use the state per key to track session of each key.
+
+ Usage: structured_network_wordcount_windowed.py <hostname> <port>
+ <hostname> and <port> describe the TCP server that Structured Streaming
+ would connect to receive data.
+
+ To run this on your local machine, you need to first run a Netcat server
+    `$ nc -lk 9999`
+ and then run the example
+    `$ bin/spark-submit
+    examples/src/main/python/sql/streaming/structured_network_wordcount_session_window.py
+    localhost 9999`
+"""
+import sys
+import math
+
+from pyspark.sql import SparkSession
+from pyspark.sql.functions import explode
+from pyspark.sql.functions import split
+from pyspark.sql.types import (
+    LongType,
+    StringType,
+    StructType,
+    StructField,
+)
+from pyspark.sql.streaming.state import GroupStateTimeout, GroupState
+import pandas as pd
+from typing import Iterable, Any
+
+if __name__ == "__main__":
+    if len(sys.argv) != 3:
+        msg = "Usage: structured_network_wordcount_session_window.py <hostname> <port>"
+        print(msg, file=sys.stderr)
+        sys.exit(-1)
+
+    host = sys.argv[1]
+    port = int(sys.argv[2])
+
+    spark = SparkSession.builder.appName(
+        "StructuredNetworkWordCountSessionWindow"
+    ).getOrCreate()
+
+    spark.sparkContext.setLogLevel("WARN")
+
+    # Create DataFrame representing the stream of input lines from connection to host:port
+    lines = (
+        spark.readStream.format("socket")
+        .option("host", host)
+        .option("port", port)
+        .option("includeTimestamp", "true")
+        .load()
+    )
+
+    # Split the lines into words, retaining timestamps
+    # split() splits each line into an array, and explode() turns the array into multiple rows
+    events = lines.select(
+        explode(split(lines.value, " ")).alias("sessionId"),
+        lines.timestamp.cast("long"),
+    )
+    session_fields = [
+        StructField("sessionId", StringType()),
+        StructField("count", LongType()),
+        StructField("start", LongType()),
+        StructField("end", LongType()),
+    ]
+
+    def func(
+        key: Any, pdf_iter: Iterable[pd.DataFrame], state: GroupState
+    ) -> Iterable[pd.DataFrame]:
+        if state.hasTimedOut:
+            count, start, end = state.get
+            state.remove()
+            yield pd.DataFrame(
+                {
+                    "sessionId": [key[0]],
+                    "count": [count],
+                    "start": [start],
+                    "end": [end],
+                }
+            )
+        else:
+            start = math.inf
+            end = 0
+            count = 0
+            for pdf in pdf_iter:
+                start = min(start, min(pdf["timestamp"]))
+                end = max(end, max(pdf["timestamp"]))
+                count = count + len(pdf)
+            if state.exists:
+                old_session = state.get
+                count = count + old_session[0]
+                start = old_session[1]
+                end = max(end, old_session[2])
+            state.update((count, start, end))
+            state.setTimeoutDuration(10000)
+            yield pd.DataFrame()
+
+    # Group the data by window and word and compute the count of each group
+    sessions = events.groupBy(events["sessionId"]).applyInPandasWithState(
+        func,
+        StructType(session_fields),

Review Comment:
   Makes sense, I just make the code more readable, PTAL.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] HeartSaVioR commented on pull request #38013: [SPARK-40509][SS][PYTHON] Add example for applyInPandasWithState

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on PR #38013:
URL: https://github.com/apache/spark/pull/38013#issuecomment-1260507874

   Thanks! Merging to master.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] leandrohmvieira-db commented on a diff in pull request #38013: [SPARK-40509][SS][PYTHON] Add example for applyInPandasWithState

Posted by GitBox <gi...@apache.org>.
leandrohmvieira-db commented on code in PR #38013:
URL: https://github.com/apache/spark/pull/38013#discussion_r1029725786


##########
examples/src/main/python/sql/streaming/structured_network_wordcount_session_window.py:
##########
@@ -0,0 +1,139 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+r"""
+ Split lines into words, group by words and use the state per key to track session of each key.
+
+ Usage: structured_network_wordcount_windowed.py <hostname> <port>
+ <hostname> and <port> describe the TCP server that Structured Streaming
+ would connect to receive data.
+
+ To run this on your local machine, you need to first run a Netcat server
+    `$ nc -lk 9999`
+ and then run the example
+    `$ bin/spark-submit
+    examples/src/main/python/sql/streaming/structured_network_wordcount_session_window.py
+    localhost 9999`
+"""
+import sys
+import math
+from typing import Iterable, Any
+
+import pandas as pd
+
+from pyspark.sql import SparkSession
+from pyspark.sql.functions import explode
+from pyspark.sql.functions import split
+from pyspark.sql.types import (
+    LongType,
+    StringType,
+    StructType,
+    StructField,
+)
+from pyspark.sql.streaming.state import GroupStateTimeout, GroupState
+
+if __name__ == "__main__":
+    if len(sys.argv) != 3:
+        msg = "Usage: structured_network_wordcount_session_window.py <hostname> <port>"
+        print(msg, file=sys.stderr)
+        sys.exit(-1)
+
+    host = sys.argv[1]
+    port = int(sys.argv[2])
+
+    spark = SparkSession.builder.appName(
+        "StructuredNetworkWordCountSessionWindow"
+    ).getOrCreate()
+
+    # Create DataFrame representing the stream of input lines from connection to host:port
+    lines = (
+        spark.readStream.format("socket")
+        .option("host", host)
+        .option("port", port)
+        .option("includeTimestamp", "true")
+        .load()
+    )
+
+    # Split the lines into words, retaining timestamps, each word become a sessionId
+    events = lines.select(
+        explode(split(lines.value, " ")).alias("sessionId"),
+        lines.timestamp.cast("long"),
+    )
+
+    # Type of output records.
+    session_schema = StructType(
+        [
+            StructField("sessionId", StringType()),
+            StructField("count", LongType()),
+            StructField("start", LongType()),
+            StructField("end", LongType()),
+        ]
+    )
+    # Type of group state.
+    # Omit the session id in the state since it is available as group key
+    session_state_schema = StructType(
+        [
+            StructField("count", LongType()),
+            StructField("start", LongType()),
+            StructField("end", LongType()),
+        ]
+    )
+
+    def func(
+        key: Any, pdf_iter: Iterable[pd.DataFrame], state: GroupState
+    ) -> Iterable[pd.DataFrame]:
+        if state.hasTimedOut:
+            count, start, end = state.get
+            state.remove()
+            yield pd.DataFrame(
+                {
+                    "sessionId": [key[0]],
+                    "count": [count],
+                    "start": [start],
+                    "end": [end],
+                }
+            )
+        else:
+            start = math.inf
+            end = 0
+            count = 0
+            for pdf in pdf_iter:
+                start = min(start, min(pdf["timestamp"]))

Review Comment:
   Just tried a similar scenario, this causes data type errors downstream:
   
   ```executor driver: net.razorvine.pickle.PickleException (expected zero arguments for construction of ClassDict (for numpy.dtype). This happens when an unsupported/unregistered class is being unpickled that requires construction arguments. Fix it by registering a custom IObjectConstructor for this class.)```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] leandrohmvieira-db commented on a diff in pull request #38013: [SPARK-40509][SS][PYTHON] Add example for applyInPandasWithState

Posted by GitBox <gi...@apache.org>.
leandrohmvieira-db commented on code in PR #38013:
URL: https://github.com/apache/spark/pull/38013#discussion_r1029725786


##########
examples/src/main/python/sql/streaming/structured_network_wordcount_session_window.py:
##########
@@ -0,0 +1,139 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+r"""
+ Split lines into words, group by words and use the state per key to track session of each key.
+
+ Usage: structured_network_wordcount_windowed.py <hostname> <port>
+ <hostname> and <port> describe the TCP server that Structured Streaming
+ would connect to receive data.
+
+ To run this on your local machine, you need to first run a Netcat server
+    `$ nc -lk 9999`
+ and then run the example
+    `$ bin/spark-submit
+    examples/src/main/python/sql/streaming/structured_network_wordcount_session_window.py
+    localhost 9999`
+"""
+import sys
+import math
+from typing import Iterable, Any
+
+import pandas as pd
+
+from pyspark.sql import SparkSession
+from pyspark.sql.functions import explode
+from pyspark.sql.functions import split
+from pyspark.sql.types import (
+    LongType,
+    StringType,
+    StructType,
+    StructField,
+)
+from pyspark.sql.streaming.state import GroupStateTimeout, GroupState
+
+if __name__ == "__main__":
+    if len(sys.argv) != 3:
+        msg = "Usage: structured_network_wordcount_session_window.py <hostname> <port>"
+        print(msg, file=sys.stderr)
+        sys.exit(-1)
+
+    host = sys.argv[1]
+    port = int(sys.argv[2])
+
+    spark = SparkSession.builder.appName(
+        "StructuredNetworkWordCountSessionWindow"
+    ).getOrCreate()
+
+    # Create DataFrame representing the stream of input lines from connection to host:port
+    lines = (
+        spark.readStream.format("socket")
+        .option("host", host)
+        .option("port", port)
+        .option("includeTimestamp", "true")
+        .load()
+    )
+
+    # Split the lines into words, retaining timestamps, each word become a sessionId
+    events = lines.select(
+        explode(split(lines.value, " ")).alias("sessionId"),
+        lines.timestamp.cast("long"),
+    )
+
+    # Type of output records.
+    session_schema = StructType(
+        [
+            StructField("sessionId", StringType()),
+            StructField("count", LongType()),
+            StructField("start", LongType()),
+            StructField("end", LongType()),
+        ]
+    )
+    # Type of group state.
+    # Omit the session id in the state since it is available as group key
+    session_state_schema = StructType(
+        [
+            StructField("count", LongType()),
+            StructField("start", LongType()),
+            StructField("end", LongType()),
+        ]
+    )
+
+    def func(
+        key: Any, pdf_iter: Iterable[pd.DataFrame], state: GroupState
+    ) -> Iterable[pd.DataFrame]:
+        if state.hasTimedOut:
+            count, start, end = state.get
+            state.remove()
+            yield pd.DataFrame(
+                {
+                    "sessionId": [key[0]],
+                    "count": [count],
+                    "start": [start],
+                    "end": [end],
+                }
+            )
+        else:
+            start = math.inf
+            end = 0
+            count = 0
+            for pdf in pdf_iter:
+                start = min(start, min(pdf["timestamp"]))

Review Comment:
   Just tried this, this causes data type errors downstream:
   
   ```executor driver: net.razorvine.pickle.PickleException (expected zero arguments for construction of ClassDict (for numpy.dtype). This happens when an unsupported/unregistered class is being unpickled that requires construction arguments. Fix it by registering a custom IObjectConstructor for this class.)```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] HeartSaVioR commented on a diff in pull request #38013: [SPARK-40509][SS][PYTHON] add example for applyInPandasWithState

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on code in PR #38013:
URL: https://github.com/apache/spark/pull/38013#discussion_r980858543


##########
examples/src/main/python/sql/streaming/structured_network_wordcount_session_window.py:
##########
@@ -0,0 +1,114 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+r"""
+ split lines into words, group by words as key and use the state per key to track session of each key.
+
+ Usage: structured_network_wordcount_windowed.py <hostname> <port>
+ <hostname> and <port> describe the TCP server that Structured Streaming
+ would connect to receive data.
+
+ To run this on your local machine, you need to first run a Netcat server
+    `$ nc -lk 9999`
+ and then run the example
+    `$ bin/spark-submit
+    examples/src/main/python/sql/streaming/structured_network_wordcount_session_window.py
+    localhost 9999`
+"""
+import sys
+
+from pyspark.sql import SparkSession
+from pyspark.sql.functions import explode
+from pyspark.sql.functions import split
+from pyspark.sql.functions import window
+from pyspark.sql.types import (
+    LongType,
+    StringType,
+    StructType,
+    StructField,
+    Row,
+)
+from pyspark.sql.streaming.state import GroupStateTimeout, GroupState
+import pandas as pd
+
+if __name__ == "__main__":
+    if len(sys.argv) != 3:
+        msg = ("Usage: structured_network_wordcount_session_window.py <hostname> <port>")
+        print(msg, file=sys.stderr)
+        sys.exit(-1)
+
+    host = sys.argv[1]
+    port = int(sys.argv[2])
+
+    spark = SparkSession\
+        .builder\
+        .appName("StructuredNetworkWordCountSessionWindow")\
+        .getOrCreate()
+
+    spark.sparkContext.setLogLevel("WARN")
+
+    # Create DataFrame representing the stream of input lines from connection to host:port
+    lines = spark\
+        .readStream\
+        .format('socket')\
+        .option('host', host)\
+        .option('port', port)\
+        .option('includeTimestamp', 'true')\
+        .load()
+
+    # Split the lines into words, retaining timestamps
+    # split() splits each line into an array, and explode() turns the array into multiple rows
+    events = lines.select(
+        explode(split(lines.value, ' ')).alias('sessionId'),
+        lines.timestamp.cast("long")
+    )
+
+    session_type = StructType(
+    [StructField("sessionId", StringType()), StructField("count", LongType()),
+    StructField("start", LongType()), StructField("end", LongType())]
+    )
+
+    def func(key, pdf_iter, state):
+        if state.hasTimedOut:
+            finished_session = state.get
+            state.remove()
+            yield pd.DataFrame({"sessionId": [finished_session[0]], "count": [finished_session[1]], "start": [finished_session[2]], "end": [finished_session[3]]})
+        else:
+            pdfs = list(pdf_iter)
+            count = sum(len(pdf) for pdf in pdfs)
+            start = min(min(pdf['timestamp']) for pdf in pdfs)
+            end = max(max(pdf['timestamp']) for pdf in pdfs)
+            if state.exists:
+                old_session = state.get
+                count = count + old_session[1]
+                start = old_session[2]
+                end = max(end, old_session[3])
+            state.update((key[0], count, start, end))
+            state.setTimeoutDuration(30000)
+            yield pd.DataFrame({"sessionId": [], "count": [], "start": [], "end": []})
+
+    # Group the data by window and word and compute the count of each group
+    sessions = events.groupBy(events["sessionId"]).applyInPandasWithState(func, session_type, session_type, "Update", GroupStateTimeout.ProcessingTimeTimeout)
+
+    # Start running the query that prints the windowed word counts to the console
+    query = sessions\

Review Comment:
   (Comment order is messed up, see https://github.com/apache/spark/pull/38013#discussion_r980870370)
   
   Same here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] chaoqin-li1123 commented on pull request #38013: [SPARK-40509][SS][PYTHON] Add example for applyInPandasWithState

Posted by GitBox <gi...@apache.org>.
chaoqin-li1123 commented on PR #38013:
URL: https://github.com/apache/spark/pull/38013#issuecomment-1263089245

   No problem, I will create a new PR with the suggested change.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] HyukjinKwon commented on a diff in pull request #38013: [SPARK-40509][SS][PYTHON] Add example for applyInPandasWithState

Posted by GitBox <gi...@apache.org>.
HyukjinKwon commented on code in PR #38013:
URL: https://github.com/apache/spark/pull/38013#discussion_r983457969


##########
examples/src/main/python/sql/streaming/structured_network_wordcount_session_window.py:
##########
@@ -0,0 +1,139 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+r"""
+ Split lines into words, group by words and use the state per key to track session of each key.
+
+ Usage: structured_network_wordcount_windowed.py <hostname> <port>
+ <hostname> and <port> describe the TCP server that Structured Streaming
+ would connect to receive data.
+
+ To run this on your local machine, you need to first run a Netcat server
+    `$ nc -lk 9999`
+ and then run the example
+    `$ bin/spark-submit
+    examples/src/main/python/sql/streaming/structured_network_wordcount_session_window.py
+    localhost 9999`
+"""
+import sys
+import math
+from typing import Iterable, Any
+
+import pandas as pd
+
+from pyspark.sql import SparkSession
+from pyspark.sql.functions import explode
+from pyspark.sql.functions import split
+from pyspark.sql.types import (
+    LongType,
+    StringType,
+    StructType,
+    StructField,
+)
+from pyspark.sql.streaming.state import GroupStateTimeout, GroupState
+
+if __name__ == "__main__":
+    if len(sys.argv) != 3:
+        msg = "Usage: structured_network_wordcount_session_window.py <hostname> <port>"
+        print(msg, file=sys.stderr)
+        sys.exit(-1)
+
+    host = sys.argv[1]
+    port = int(sys.argv[2])
+
+    spark = SparkSession.builder.appName(
+        "StructuredNetworkWordCountSessionWindow"
+    ).getOrCreate()
+
+    # Create DataFrame representing the stream of input lines from connection to host:port
+    lines = (
+        spark.readStream.format("socket")
+        .option("host", host)
+        .option("port", port)
+        .option("includeTimestamp", "true")
+        .load()
+    )
+
+    # Split the lines into words, retaining timestamps, each word become a sessionId
+    events = lines.select(
+        explode(split(lines.value, " ")).alias("sessionId"),
+        lines.timestamp.cast("long"),
+    )
+
+    # Type of output records.
+    session_schema = StructType(
+        [
+            StructField("sessionId", StringType()),
+            StructField("count", LongType()),
+            StructField("start", LongType()),
+            StructField("end", LongType()),
+        ]
+    )
+    # Type of group state.
+    # Omit the session id in the state since it is available as group key
+    session_state_schema = StructType(
+        [
+            StructField("count", LongType()),
+            StructField("start", LongType()),
+            StructField("end", LongType()),
+        ]
+    )
+
+    def func(
+        key: Any, pdf_iter: Iterable[pd.DataFrame], state: GroupState
+    ) -> Iterable[pd.DataFrame]:
+        if state.hasTimedOut:
+            count, start, end = state.get
+            state.remove()
+            yield pd.DataFrame(
+                {
+                    "sessionId": [key[0]],
+                    "count": [count],
+                    "start": [start],
+                    "end": [end],
+                }
+            )
+        else:
+            start = math.inf
+            end = 0
+            count = 0
+            for pdf in pdf_iter:
+                start = min(start, min(pdf["timestamp"]))
+                end = max(end, max(pdf["timestamp"]))
+                count = count + len(pdf)
+            if state.exists:
+                old_session = state.get

Review Comment:
   I would do:
   
   ```
   (old_count, start, old_end) = state.get
   count = count + old_count
   end = max(end, old_end)
   ```



##########
examples/src/main/python/sql/streaming/structured_network_wordcount_session_window.py:
##########
@@ -0,0 +1,139 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+r"""
+ Split lines into words, group by words and use the state per key to track session of each key.
+
+ Usage: structured_network_wordcount_windowed.py <hostname> <port>
+ <hostname> and <port> describe the TCP server that Structured Streaming
+ would connect to receive data.
+
+ To run this on your local machine, you need to first run a Netcat server
+    `$ nc -lk 9999`
+ and then run the example
+    `$ bin/spark-submit
+    examples/src/main/python/sql/streaming/structured_network_wordcount_session_window.py
+    localhost 9999`
+"""
+import sys
+import math
+from typing import Iterable, Any
+
+import pandas as pd
+
+from pyspark.sql import SparkSession
+from pyspark.sql.functions import explode
+from pyspark.sql.functions import split
+from pyspark.sql.types import (
+    LongType,
+    StringType,
+    StructType,
+    StructField,
+)
+from pyspark.sql.streaming.state import GroupStateTimeout, GroupState
+
+if __name__ == "__main__":
+    if len(sys.argv) != 3:
+        msg = "Usage: structured_network_wordcount_session_window.py <hostname> <port>"
+        print(msg, file=sys.stderr)
+        sys.exit(-1)
+
+    host = sys.argv[1]
+    port = int(sys.argv[2])
+
+    spark = SparkSession.builder.appName(
+        "StructuredNetworkWordCountSessionWindow"
+    ).getOrCreate()
+
+    # Create DataFrame representing the stream of input lines from connection to host:port
+    lines = (
+        spark.readStream.format("socket")
+        .option("host", host)
+        .option("port", port)
+        .option("includeTimestamp", "true")
+        .load()
+    )
+
+    # Split the lines into words, retaining timestamps, each word become a sessionId
+    events = lines.select(
+        explode(split(lines.value, " ")).alias("sessionId"),
+        lines.timestamp.cast("long"),
+    )
+
+    # Type of output records.
+    session_schema = StructType(
+        [
+            StructField("sessionId", StringType()),
+            StructField("count", LongType()),
+            StructField("start", LongType()),
+            StructField("end", LongType()),
+        ]
+    )
+    # Type of group state.
+    # Omit the session id in the state since it is available as group key
+    session_state_schema = StructType(
+        [
+            StructField("count", LongType()),
+            StructField("start", LongType()),
+            StructField("end", LongType()),
+        ]
+    )
+
+    def func(
+        key: Any, pdf_iter: Iterable[pd.DataFrame], state: GroupState
+    ) -> Iterable[pd.DataFrame]:
+        if state.hasTimedOut:
+            count, start, end = state.get
+            state.remove()
+            yield pd.DataFrame(
+                {
+                    "sessionId": [key[0]],
+                    "count": [count],
+                    "start": [start],
+                    "end": [end],
+                }
+            )
+        else:
+            start = math.inf
+            end = 0
+            count = 0
+            for pdf in pdf_iter:
+                start = min(start, min(pdf["timestamp"]))
+                end = max(end, max(pdf["timestamp"]))
+                count = count + len(pdf)
+            if state.exists:
+                old_session = state.get

Review Comment:
   I would do:
   
   ```python
   (old_count, start, old_end) = state.get
   count = count + old_count
   end = max(end, old_end)
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] chaoqin-li1123 commented on a diff in pull request #38013: [SPARK-40509][SS][PYTHON] Add example for applyInPandasWithState

Posted by GitBox <gi...@apache.org>.
chaoqin-li1123 commented on code in PR #38013:
URL: https://github.com/apache/spark/pull/38013#discussion_r981960722


##########
examples/src/main/python/sql/streaming/structured_network_wordcount_session_window.py:
##########
@@ -0,0 +1,130 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+r"""
+ split lines into words, group by words and use the state per key to track session of each key.
+
+ Usage: structured_network_wordcount_windowed.py <hostname> <port>
+ <hostname> and <port> describe the TCP server that Structured Streaming
+ would connect to receive data.
+
+ To run this on your local machine, you need to first run a Netcat server
+    `$ nc -lk 9999`
+ and then run the example
+    `$ bin/spark-submit
+    examples/src/main/python/sql/streaming/structured_network_wordcount_session_window.py
+    localhost 9999`
+"""
+import sys
+import math
+
+from pyspark.sql import SparkSession
+from pyspark.sql.functions import explode
+from pyspark.sql.functions import split
+from pyspark.sql.types import (
+    LongType,
+    StringType,
+    StructType,
+    StructField,
+)
+from pyspark.sql.streaming.state import GroupStateTimeout, GroupState
+import pandas as pd
+from typing import Iterable, Any
+
+if __name__ == "__main__":
+    if len(sys.argv) != 3:
+        msg = "Usage: structured_network_wordcount_session_window.py <hostname> <port>"
+        print(msg, file=sys.stderr)
+        sys.exit(-1)
+
+    host = sys.argv[1]
+    port = int(sys.argv[2])
+
+    spark = SparkSession.builder.appName(
+        "StructuredNetworkWordCountSessionWindow"
+    ).getOrCreate()
+
+    spark.sparkContext.setLogLevel("WARN")
+
+    # Create DataFrame representing the stream of input lines from connection to host:port
+    lines = (
+        spark.readStream.format("socket")
+        .option("host", host)
+        .option("port", port)
+        .option("includeTimestamp", "true")
+        .load()
+    )
+
+    # Split the lines into words, retaining timestamps
+    # split() splits each line into an array, and explode() turns the array into multiple rows
+    events = lines.select(
+        explode(split(lines.value, " ")).alias("sessionId"),
+        lines.timestamp.cast("long"),
+    )
+    session_fields = [
+        StructField("sessionId", StringType()),
+        StructField("count", LongType()),
+        StructField("start", LongType()),
+        StructField("end", LongType()),
+    ]
+
+    def func(
+        key: Any, pdf_iter: Iterable[pd.DataFrame], state: GroupState
+    ) -> Iterable[pd.DataFrame]:
+        if state.hasTimedOut:
+            count, start, end = state.get
+            state.remove()
+            yield pd.DataFrame(
+                {
+                    "sessionId": [key[0]],
+                    "count": [count],
+                    "start": [start],
+                    "end": [end],
+                }
+            )
+        else:
+            start = math.inf
+            end = 0
+            count = 0
+            for pdf in pdf_iter:
+                start = min(start, min(pdf["timestamp"]))
+                end = max(end, max(pdf["timestamp"]))
+                count = count + len(pdf)
+            if state.exists:
+                old_session = state.get
+                count = count + old_session[0]
+                start = old_session[1]
+                end = max(end, old_session[2])
+            state.update((count, start, end))
+            state.setTimeoutDuration(10000)
+            yield pd.DataFrame()
+
+    # Group the data by window and word and compute the count of each group
+    sessions = events.groupBy(events["sessionId"]).applyInPandasWithState(
+        func,
+        StructType(session_fields),

Review Comment:
   I prefer to keep it because all fields are the same except the first, make the code less verbose.



##########
examples/src/main/python/sql/streaming/structured_network_wordcount_session_window.py:
##########
@@ -0,0 +1,130 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+r"""
+ split lines into words, group by words and use the state per key to track session of each key.
+
+ Usage: structured_network_wordcount_windowed.py <hostname> <port>
+ <hostname> and <port> describe the TCP server that Structured Streaming
+ would connect to receive data.
+
+ To run this on your local machine, you need to first run a Netcat server
+    `$ nc -lk 9999`
+ and then run the example
+    `$ bin/spark-submit
+    examples/src/main/python/sql/streaming/structured_network_wordcount_session_window.py
+    localhost 9999`
+"""
+import sys
+import math
+
+from pyspark.sql import SparkSession
+from pyspark.sql.functions import explode
+from pyspark.sql.functions import split
+from pyspark.sql.types import (
+    LongType,
+    StringType,
+    StructType,
+    StructField,
+)
+from pyspark.sql.streaming.state import GroupStateTimeout, GroupState
+import pandas as pd
+from typing import Iterable, Any
+
+if __name__ == "__main__":
+    if len(sys.argv) != 3:
+        msg = "Usage: structured_network_wordcount_session_window.py <hostname> <port>"
+        print(msg, file=sys.stderr)
+        sys.exit(-1)
+
+    host = sys.argv[1]
+    port = int(sys.argv[2])
+
+    spark = SparkSession.builder.appName(
+        "StructuredNetworkWordCountSessionWindow"
+    ).getOrCreate()
+
+    spark.sparkContext.setLogLevel("WARN")
+
+    # Create DataFrame representing the stream of input lines from connection to host:port
+    lines = (
+        spark.readStream.format("socket")
+        .option("host", host)
+        .option("port", port)
+        .option("includeTimestamp", "true")
+        .load()
+    )
+
+    # Split the lines into words, retaining timestamps
+    # split() splits each line into an array, and explode() turns the array into multiple rows

Review Comment:
   Comments added.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] HyukjinKwon commented on a diff in pull request #38013: [SPARK-40509][SS][PYTHON] add example for applyInPandasWithState

Posted by GitBox <gi...@apache.org>.
HyukjinKwon commented on code in PR #38013:
URL: https://github.com/apache/spark/pull/38013#discussion_r981247894


##########
examples/src/main/python/sql/streaming/structured_network_wordcount_session_window.py:
##########
@@ -0,0 +1,114 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+r"""
+ split lines into words, group by words as key and use the state per key to track session of each key.
+
+ Usage: structured_network_wordcount_windowed.py <hostname> <port>
+ <hostname> and <port> describe the TCP server that Structured Streaming
+ would connect to receive data.
+
+ To run this on your local machine, you need to first run a Netcat server
+    `$ nc -lk 9999`
+ and then run the example
+    `$ bin/spark-submit
+    examples/src/main/python/sql/streaming/structured_network_wordcount_session_window.py
+    localhost 9999`
+"""
+import sys
+
+from pyspark.sql import SparkSession
+from pyspark.sql.functions import explode
+from pyspark.sql.functions import split
+from pyspark.sql.functions import window
+from pyspark.sql.types import (
+    LongType,
+    StringType,
+    StructType,
+    StructField,
+    Row,
+)
+from pyspark.sql.streaming.state import GroupStateTimeout, GroupState
+import pandas as pd
+
+if __name__ == "__main__":
+    if len(sys.argv) != 3:
+        msg = ("Usage: structured_network_wordcount_session_window.py <hostname> <port>")
+        print(msg, file=sys.stderr)
+        sys.exit(-1)
+
+    host = sys.argv[1]
+    port = int(sys.argv[2])
+
+    spark = SparkSession\
+        .builder\
+        .appName("StructuredNetworkWordCountSessionWindow")\
+        .getOrCreate()
+
+    spark.sparkContext.setLogLevel("WARN")
+
+    # Create DataFrame representing the stream of input lines from connection to host:port
+    lines = spark\
+        .readStream\
+        .format('socket')\
+        .option('host', host)\
+        .option('port', port)\
+        .option('includeTimestamp', 'true')\
+        .load()
+
+    # Split the lines into words, retaining timestamps
+    # split() splits each line into an array, and explode() turns the array into multiple rows
+    events = lines.select(
+        explode(split(lines.value, ' ')).alias('sessionId'),
+        lines.timestamp.cast("long")
+    )
+
+    session_type = StructType(
+    [StructField("sessionId", StringType()), StructField("count", LongType()),
+    StructField("start", LongType()), StructField("end", LongType())]
+    )
+
+    def func(key, pdf_iter, state):
+        if state.hasTimedOut:
+            finished_session = state.get
+            state.remove()
+            yield pd.DataFrame({"sessionId": [finished_session[0]], "count": [finished_session[1]], "start": [finished_session[2]], "end": [finished_session[3]]})

Review Comment:
   I would do this more in a Pythonic way. e.g.)
   
   ```python
   session_id, count, start, end = state.get
   pd.DataFrame({"sessionId": session_id, "count": count, "start": start, "end", end})
   ```
   
   or
   
   ```python
   pd.DataFrame(dict(zip(("sessionId", "count", "start", "end"), state.get)))
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] AmplabJenkins commented on pull request #38013: [SPARK-40509][SS][PYTHON] add example for applyInPandasWithState

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on PR #38013:
URL: https://github.com/apache/spark/pull/38013#issuecomment-1259256936

   Can one of the admins verify this patch?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] HyukjinKwon commented on a diff in pull request #38013: [SPARK-40509][SS][PYTHON] Add example for applyInPandasWithState

Posted by GitBox <gi...@apache.org>.
HyukjinKwon commented on code in PR #38013:
URL: https://github.com/apache/spark/pull/38013#discussion_r1029932042


##########
examples/src/main/python/sql/streaming/structured_network_wordcount_session_window.py:
##########
@@ -0,0 +1,139 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+r"""
+ Split lines into words, group by words and use the state per key to track session of each key.
+
+ Usage: structured_network_wordcount_windowed.py <hostname> <port>
+ <hostname> and <port> describe the TCP server that Structured Streaming
+ would connect to receive data.
+
+ To run this on your local machine, you need to first run a Netcat server
+    `$ nc -lk 9999`
+ and then run the example
+    `$ bin/spark-submit
+    examples/src/main/python/sql/streaming/structured_network_wordcount_session_window.py
+    localhost 9999`
+"""
+import sys
+import math
+from typing import Iterable, Any
+
+import pandas as pd
+
+from pyspark.sql import SparkSession
+from pyspark.sql.functions import explode
+from pyspark.sql.functions import split
+from pyspark.sql.types import (
+    LongType,
+    StringType,
+    StructType,
+    StructField,
+)
+from pyspark.sql.streaming.state import GroupStateTimeout, GroupState
+
+if __name__ == "__main__":
+    if len(sys.argv) != 3:
+        msg = "Usage: structured_network_wordcount_session_window.py <hostname> <port>"
+        print(msg, file=sys.stderr)
+        sys.exit(-1)
+
+    host = sys.argv[1]
+    port = int(sys.argv[2])
+
+    spark = SparkSession.builder.appName(
+        "StructuredNetworkWordCountSessionWindow"
+    ).getOrCreate()
+
+    # Create DataFrame representing the stream of input lines from connection to host:port
+    lines = (
+        spark.readStream.format("socket")
+        .option("host", host)
+        .option("port", port)
+        .option("includeTimestamp", "true")
+        .load()
+    )
+
+    # Split the lines into words, retaining timestamps, each word become a sessionId
+    events = lines.select(
+        explode(split(lines.value, " ")).alias("sessionId"),
+        lines.timestamp.cast("long"),
+    )
+
+    # Type of output records.
+    session_schema = StructType(
+        [
+            StructField("sessionId", StringType()),
+            StructField("count", LongType()),
+            StructField("start", LongType()),
+            StructField("end", LongType()),
+        ]
+    )
+    # Type of group state.
+    # Omit the session id in the state since it is available as group key
+    session_state_schema = StructType(
+        [
+            StructField("count", LongType()),
+            StructField("start", LongType()),
+            StructField("end", LongType()),
+        ]
+    )
+
+    def func(
+        key: Any, pdf_iter: Iterable[pd.DataFrame], state: GroupState
+    ) -> Iterable[pd.DataFrame]:
+        if state.hasTimedOut:
+            count, start, end = state.get
+            state.remove()
+            yield pd.DataFrame(
+                {
+                    "sessionId": [key[0]],
+                    "count": [count],
+                    "start": [start],
+                    "end": [end],
+                }
+            )
+        else:
+            start = math.inf
+            end = 0
+            count = 0
+            for pdf in pdf_iter:
+                start = min(start, min(pdf["timestamp"]))

Review Comment:
   I suspect the return type is not matched to the SQL type provided. Do you mind show the reproducer?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org