You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Hyukjin Kwon (Jira)" <ji...@apache.org> on 2022/05/20 04:05:00 UTC

[jira] [Resolved] (SPARK-39218) Python foreachBatch streaming query cannot be stopped gracefully after pin thread mode is enabled

     [ https://issues.apache.org/jira/browse/SPARK-39218?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Hyukjin Kwon resolved SPARK-39218.
----------------------------------
    Fix Version/s: 3.3.0
       Resolution: Fixed

Issue resolved by pull request 36589
[https://github.com/apache/spark/pull/36589]

> Python foreachBatch streaming query cannot be stopped gracefully after pin thread mode is enabled
> -------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-39218
>                 URL: https://issues.apache.org/jira/browse/SPARK-39218
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark, Structured Streaming
>    Affects Versions: 3.0.3, 3.1.2, 3.2.1, 3.3.0
>            Reporter: Hyukjin Kwon
>            Assignee: Hyukjin Kwon
>            Priority: Major
>             Fix For: 3.3.0
>
>
> For example,
> {code}
> import time
> def func(batch_df, batch_id):
>     time.sleep(10)
>     print(batch_df.count())
> q = spark.readStream.format("rate").load().writeStream.foreachBatch(func).start()
> time.sleep(5)
> q.stop()
> {code}
> works find with pinned thread mode is disabled. Whe pinned thread mode is enabled:
> {code}
> 22/05/18 15:23:24 ERROR MicroBatchExecution: Query [id = 2538f8a2-c6e4-44c9-bf38-e6dab555267e, runId = 1d500478-1d77-46aa-b35a-585264a809b9] terminated with error
> py4j.Py4JException: An exception was raised by the Python Proxy. Return Message: Traceback (most recent call last):
>   File "/.../spark/python/lib/py4j-0.10.9.5-src.zip/py4j/clientserver.py", line 617, in _call_proxy
>     return_value = getattr(self.pool[obj_id], method)(*params)
>   File "/.../spark/python/pyspark/sql/utils.py", line 272, in call
>     raise e
>   File "/.../spark/python/pyspark/sql/utils.py", line 269, in call
>     self.func(DataFrame(jdf, self.session), batch_id)
>   File "<stdin>", line 3, in func
>   File "/.../spark/python/pyspark/sql/dataframe.py", line 804, in count
>     return int(self._jdf.count())
>   File "/.../spark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py", line 1321, in __call__
>     return_value = get_return_value(
>   File "/.../spark/python/pyspark/sql/utils.py", line 190, in deco
>     return f(*a, **kw)
>   File "/.../spark/python/lib/py4j-0.10.9.5-src.zip/py4j/protocol.py", line 326, in get_return_value
>     raise Py4JJavaError(
> py4j.protocol.Py4JJavaError: An error occurred while calling o44.count.
> : java.lang.InterruptedException
> 	at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1302)
> 	at scala.concurrent.impl.Promise$DefaultPromise.tryAwait(Promise.scala:242)
> 	at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:258)
> 	at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:187)
> 	at org.apache.spark.util.ThreadUtils$.awaitReady(ThreadUtils.scala:334)
> 	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:943)
> 	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2227)
> {code}



--
This message was sent by Atlassian Jira
(v8.20.7#820007)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org