Posted to issues@spark.apache.org by "Abdeali Kothari (JIRA)" <ji...@apache.org> on 2018/12/09 13:03:00 UTC
[jira] [Commented] (SPARK-25992) Accumulators giving KeyError in pyspark
[ https://issues.apache.org/jira/browse/SPARK-25992?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16713965#comment-16713965 ]
Abdeali Kothari commented on SPARK-25992:
-----------------------------------------
Here is a reproducible example in pyspark where using accumulators inside multiprocessing causes errors:
{code:python}
import findspark
findspark.init()  # noqa
import pyspark
from pyspark.sql import functions as F
import multiprocessing

spark = pyspark.sql.SparkSession.builder.getOrCreate()
print("appId=", spark.sparkContext.applicationId)


def myfunc(x):
    print("myfunc({}): appId={}".format(x, spark.sparkContext.applicationId))
    myaccum = spark.sparkContext.accumulator(
        [], pyspark.accumulators.AddingAccumulatorParam([]))
    df = spark.createDataFrame(
        [['a1', 'b1', 'c1', x * 1],
         ['a2', 'b2', 'c2', x * 2],
         ['a3', 'b3', 'c3', x * 3]],
        ['a', 'b', 'c', 'x'])
    df = df.withColumn("rnd", F.rand(42))

    @pyspark.sql.functions.udf
    def myudf(x):
        nonlocal myaccum
        myaccum += ["myudf({})".format(x)]
        return x

    df = df.withColumn("x2", myudf(df['x']))
    df.show()
    print("Accum value:", myaccum.value)
    return myaccum


print("Without multiproc:")
print("Got return value:", myfunc(1).value)
print("====")
print("With multiproc:")
pool = multiprocessing.Pool(1)
print("Got return value:", [i.value for i in pool.map(myfunc, [1, 2, 3])])
print("====")
{code}
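The failure mode appears to be a per-process registry mismatch: each Pool worker is a separate Python process with its own copy of pyspark's module-level accumulator registry, so an accumulator id registered inside a worker is unknown to the process whose update server tries to merge it. A minimal sketch of that mismatch, with no Spark required (the registry, id counter, and function names below are stand-ins for illustration, not the real pyspark internals):

```python
import multiprocessing

# Stand-ins for pyspark.accumulators._accumulatorRegistry and its id counter
# (hypothetical names for illustration only).
_registry = {}
_next_id = [0]


def register(value):
    aid = _next_id[0]
    _next_id[0] += 1
    _registry[aid] = value
    return aid


def child(queue):
    # The worker process registers a new accumulator id in ITS OWN copy
    # of the registry, then reports the id back for merging.
    queue.put(register([]))


def merge_in_parent():
    q = multiprocessing.Queue()
    p = multiprocessing.Process(target=child, args=(q,))
    p.start()
    aid = q.get()
    p.join()
    try:
        _registry[aid] += ["update"]  # parent's registry never saw this id
        return None
    except KeyError:
        return aid  # the missing id, analogous to "KeyError: 0" above


if __name__ == "__main__":
    print("KeyError for accumulator id:", merge_in_parent())
```

Under this reading, the merge in the driver's accumulator server fails for the same reason the dict lookup fails here, which would also explain the "EOF reached before Python server acknowledged" error from the JVM side.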
> Accumulators giving KeyError in pyspark
> ---------------------------------------
>
> Key: SPARK-25992
> URL: https://issues.apache.org/jira/browse/SPARK-25992
> Project: Spark
> Issue Type: Bug
> Components: PySpark
> Affects Versions: 2.3.1
> Reporter: Abdeali Kothari
> Priority: Major
>
> I am using accumulators, and when I run my code I sometimes get warning messages. When I checked, nothing had been accumulated - I am not sure whether I lost data from the accumulator, or whether it worked and I can ignore this error?
> The message:
> {noformat}
> Exception happened during processing of request from
> ('127.0.0.1', 62099)
> Traceback (most recent call last):
>   File "/Users/abdealijk/anaconda3/lib/python3.6/socketserver.py", line 317, in _handle_request_noblock
>     self.process_request(request, client_address)
>   File "/Users/abdealijk/anaconda3/lib/python3.6/socketserver.py", line 348, in process_request
>     self.finish_request(request, client_address)
>   File "/Users/abdealijk/anaconda3/lib/python3.6/socketserver.py", line 361, in finish_request
>     self.RequestHandlerClass(request, client_address, self)
>   File "/Users/abdealijk/anaconda3/lib/python3.6/socketserver.py", line 696, in __init__
>     self.handle()
>   File "/usr/local/hadoop/spark2.3.1/python/pyspark/accumulators.py", line 238, in handle
>     _accumulatorRegistry[aid] += update
> KeyError: 0
> ----------------------------------------
> 2018-11-09 19:09:08 ERROR DAGScheduler:91 - Failed to update accumulators for task 0
> org.apache.spark.SparkException: EOF reached before Python server acknowledged
>     at org.apache.spark.api.python.PythonAccumulatorV2.merge(PythonRDD.scala:634)
>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$updateAccumulators$1.apply(DAGScheduler.scala:1131)
>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$updateAccumulators$1.apply(DAGScheduler.scala:1123)
>     at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>     at org.apache.spark.scheduler.DAGScheduler.updateAccumulators(DAGScheduler.scala:1123)
>     at org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:1206)
>     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1820)
>     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1772)
>     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1761)
>     at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
> {noformat}
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)