You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Jungtaek Lim (Jira)" <ji...@apache.org> on 2020/08/01 01:32:00 UTC

[jira] [Resolved] (SPARK-32514) Pyspark: Issue using sql query in foreachBatch sink

     [ https://issues.apache.org/jira/browse/SPARK-32514?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jungtaek Lim resolved SPARK-32514.
----------------------------------
    Resolution: Not A Problem

Python doesn't allow abbreviating () with no param, whereas Scala does. Use `write()`, not `write`.


> Pyspark: Issue using sql query in foreachBatch sink
> ---------------------------------------------------
>
>                 Key: SPARK-32514
>                 URL: https://issues.apache.org/jira/browse/SPARK-32514
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 2.4.5, 2.4.6, 3.0.0
>            Reporter: Muru
>            Priority: Major
>
> In a pyspark SS job, trying to use sql query instead of DF API methods in foreachBatch sink
> throws AttributeError: 'JavaMember' object has no attribute 'format' exception.
>  
> However, the same thing works in Scala job. 
>  
> Please note, I tested in spark 2.4.5/2.4.6 and 3.0.0 and got the same exception.
>  
> I noticed that I could perform other operations except the write method.  
>  
> Please, let me know how to fix this issue.
>  
> See below code examples
> # Spark Scala method
> def processData(batchDF: DataFrame, batchId: Long) {
>    batchDF.createOrReplaceTempView("tbl")
>    val outdf=batchDF.sparkSession.sql("select action, count(*) as count from tbl where date='2020-06-20' group by 1")
>    outdf.printSchema()
>    outdf.show
>    outdf.coalesce(1).write.format("csv").save("/tmp/agg")
> }
>  
> ## pyspark python method
> def process_data(bdf, bid):
>   lspark = bdf._jdf.sparkSession()
>   bdf.createOrReplaceTempView("tbl")
>   outdf=lspark.sql("select action, count(*) as count from tbl where date='2020-06-20' group by 1")
>   outdf.printSchema()
>   # it works
>   outdf.show()
>   # throws AttributeError: 'JavaMember' object has no attribute 'format' exception
>   outdf.coalesce(1).write.format("csv").save("/tmp/agg1") 
>  
> Here is the full exception 
> 20/07/24 16:31:24 ERROR streaming.MicroBatchExecution: Query [id = 854a39d0-b944-4b52-bf05-cacf998e2cbd, runId = e3d4dc7d-80e1-4164-8310-805d7713fc96] terminated with error
> py4j.Py4JException: An exception was raised by the Python Proxy. Return Message: Traceback (most recent call last):
>   File "/Users/muru/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 2381, in _call_proxy
>     return_value = getattr(self.pool[obj_id], method)(*params)
>   File "/Users/muru/spark/python/pyspark/sql/utils.py", line 191, in call
>     raise e
> AttributeError: 'JavaMember' object has no attribute 'format'
> at py4j.Protocol.getReturnValue(Protocol.java:473)
> at py4j.reflection.PythonProxyHandler.invoke(PythonProxyHandler.java:108)
> at com.sun.proxy.$Proxy20.call(Unknown Source)
> at org.apache.spark.sql.execution.streaming.sources.PythonForeachBatchHelper$$anonfun$callForeachBatch$1.apply(ForeachBatchSink.scala:55)
> at org.apache.spark.sql.execution.streaming.sources.PythonForeachBatchHelper$$anonfun$callForeachBatch$1.apply(ForeachBatchSink.scala:55)
> at org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.addBatch(ForeachBatchSink.scala:35)
> at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$5$$anonfun$apply$17.apply(MicroBatchExecution.scala:537)
> at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:80)
> at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:127)
> at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:75)
> at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$5.apply(MicroBatchExecution.scala:535)
> at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
> at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
> at [org.apache.spark.sql.execution.streaming.MicroBatchExecution.org|http://org.apache.spark.sql.execution.streaming.microbatchexecution.org/]$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:534)
> at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:198)
> at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
> at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
> at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
> at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
> at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:166)
> at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
> at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:160)
> at [org.apache.spark.sql.execution.streaming.StreamExecution.org|http://org.apache.spark.sql.execution.streaming.streamexecution.org/]$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:281)
> at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:193)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org