Posted to issues@spark.apache.org by "Hyukjin Kwon (Jira)" <ji...@apache.org> on 2022/05/20 04:05:00 UTC
[jira] [Resolved] (SPARK-39218) Python foreachBatch streaming query cannot be stopped gracefully after pin thread mode is enabled
[ https://issues.apache.org/jira/browse/SPARK-39218?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Hyukjin Kwon resolved SPARK-39218.
----------------------------------
Fix Version/s: 3.3.0
Resolution: Fixed
Issue resolved by pull request 36589
[https://github.com/apache/spark/pull/36589]
> Python foreachBatch streaming query cannot be stopped gracefully after pin thread mode is enabled
> -------------------------------------------------------------------------------------------------
>
> Key: SPARK-39218
> URL: https://issues.apache.org/jira/browse/SPARK-39218
> Project: Spark
> Issue Type: Bug
> Components: PySpark, Structured Streaming
> Affects Versions: 3.0.3, 3.1.2, 3.2.1, 3.3.0
> Reporter: Hyukjin Kwon
> Assignee: Hyukjin Kwon
> Priority: Major
> Fix For: 3.3.0
>
>
> For example,
> {code}
> import time
>
> def func(batch_df, batch_id):
>     time.sleep(10)
>     print(batch_df.count())
>
> q = spark.readStream.format("rate").load().writeStream.foreachBatch(func).start()
> time.sleep(5)
> q.stop()
> {code}
> works fine when pinned thread mode is disabled. When pinned thread mode is enabled:
> {code}
> {code}
> 22/05/18 15:23:24 ERROR MicroBatchExecution: Query [id = 2538f8a2-c6e4-44c9-bf38-e6dab555267e, runId = 1d500478-1d77-46aa-b35a-585264a809b9] terminated with error
> py4j.Py4JException: An exception was raised by the Python Proxy. Return Message: Traceback (most recent call last):
>   File "/.../spark/python/lib/py4j-0.10.9.5-src.zip/py4j/clientserver.py", line 617, in _call_proxy
>     return_value = getattr(self.pool[obj_id], method)(*params)
>   File "/.../spark/python/pyspark/sql/utils.py", line 272, in call
>     raise e
>   File "/.../spark/python/pyspark/sql/utils.py", line 269, in call
>     self.func(DataFrame(jdf, self.session), batch_id)
>   File "<stdin>", line 3, in func
>   File "/.../spark/python/pyspark/sql/dataframe.py", line 804, in count
>     return int(self._jdf.count())
>   File "/.../spark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py", line 1321, in __call__
>     return_value = get_return_value(
>   File "/.../spark/python/pyspark/sql/utils.py", line 190, in deco
>     return f(*a, **kw)
>   File "/.../spark/python/lib/py4j-0.10.9.5-src.zip/py4j/protocol.py", line 326, in get_return_value
>     raise Py4JJavaError(
> py4j.protocol.Py4JJavaError: An error occurred while calling o44.count.
> : java.lang.InterruptedException
> 	at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1302)
> 	at scala.concurrent.impl.Promise$DefaultPromise.tryAwait(Promise.scala:242)
> 	at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:258)
> 	at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:187)
> 	at org.apache.spark.util.ThreadUtils$.awaitReady(ThreadUtils.scala:334)
> 	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:943)
> 	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2227)
> {code}
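
Editor's note: for context, pinned thread mode makes each Python thread map to its own dedicated JVM thread instead of sharing one. It is toggled via the PYSPARK_PIN_THREAD environment variable, which must be set before the JVM is launched (to my understanding it became the default in Spark 3.2.0; check the migration guide for your version). A minimal config sketch to reproduce the two behaviors:

{code}
# Pinned thread mode enabled (reproduces the InterruptedException above)
export PYSPARK_PIN_THREAD=true
./bin/pyspark

# Pinned thread mode disabled (the repro stops gracefully)
export PYSPARK_PIN_THREAD=false
./bin/pyspark
{code}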
--
This message was sent by Atlassian Jira
(v8.20.7#820007)