Posted to issues@spark.apache.org by "Abdeali Kothari (JIRA)" <ji...@apache.org> on 2018/10/08 04:59:00 UTC
[jira] [Updated] (SPARK-25591) PySpark Accumulators with multiple PythonUDFs
[ https://issues.apache.org/jira/browse/SPARK-25591?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Abdeali Kothari updated SPARK-25591:
------------------------------------
Description:
When a query uses multiple Python UDFs, only the accumulator updates made by the last Python UDF are applied.
{code:python}
import pyspark
from pyspark.sql import SparkSession, Row
from pyspark.sql import functions as F
from pyspark.sql import types as T
from pyspark import AccumulatorParam

spark = SparkSession.builder.getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
test_accum = spark.sparkContext.accumulator(0.0)

SHUFFLE = False


def main(data):
    print(">>> Check0", test_accum.value)

    def test(x):
        global test_accum
        test_accum += 1.0
        return x

    print(">>> Check1", test_accum.value)

    def test2(x):
        global test_accum
        test_accum += 100.0
        return x

    print(">>> Check2", test_accum.value)
    func_udf = F.udf(test, T.DoubleType())
    print(">>> Check3", test_accum.value)
    func_udf2 = F.udf(test2, T.DoubleType())
    print(">>> Check4", test_accum.value)
    data = data.withColumn("out1", func_udf(data["a"]))
    if SHUFFLE:
        data = data.repartition(2)
    print(">>> Check5", test_accum.value)
    data = data.withColumn("out2", func_udf2(data["b"]))
    if SHUFFLE:
        data = data.repartition(2)
    print(">>> Check6", test_accum.value)
    data.show()  # ACTION
    print(">>> Check7", test_accum.value)
    return data


df = spark.createDataFrame([
    [1.0, 2.0]
], schema=T.StructType([T.StructField(field_name, T.DoubleType(), True) for field_name in ["a", "b"]]))
df2 = main(df)
{code}
{code:python}
######## Output 1 - with SHUFFLE=False
...
# >>> Check7 100.0
######## Output 2 - with SHUFFLE=True
...
# >>> Check7 101.0
{code}
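In both cases the expected value is 101.0 (one row: `test` adds 1.0 and `test2` adds 100.0). Basically, it looks like:
- Only the accumulator updates made by the last UDF before a shuffle-like operation (e.g. `repartition`) are kept.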
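To make the pattern above concrete, here is a pure-Python toy model (no Spark required) of one hypothesis that would produce exactly these numbers: each UDF shipped to the Python worker carries its own deserialized copy of `test_accum`, every copy is registered under the same accumulator id, and a later registration overwrites an earlier one, so only the last copy's updates in a stage are reported back to the driver. This is an assumption made for illustration, not PySpark's actual implementation; `WorkerAccumulator`, `run_stage` and `driver_total` are hypothetical names.

{code:python}
# Hypothetical toy model (NOT PySpark code) of accumulator updates being lost
# when several Python UDFs are evaluated in the same worker stage.


class WorkerAccumulator:
    """Hypothetical worker-side copy of a driver accumulator."""

    def __init__(self, aid):
        self.aid = aid
        self.local_value = 0.0

    def add(self, amount):
        self.local_value += amount


def run_stage(udf_increments, rows, reuse_existing):
    """Run every UDF of one stage over `rows` in one simulated worker.

    udf_increments: one increment per UDF (1.0 for `test`, 100.0 for `test2`).
    reuse_existing: if False, each UDF registers a fresh accumulator copy under
    the same id, overwriting the earlier one (the assumed buggy behaviour).
    Returns the update the worker reports back to the driver.
    """
    registry = {}  # accumulator id -> worker-side copy
    copies = []
    for _ in udf_increments:
        # Assumption: deserializing each UDF closure creates or looks up the copy.
        if reuse_existing and 0 in registry:
            acc = registry[0]
        else:
            acc = WorkerAccumulator(aid=0)
            registry[0] = acc  # overwrites any earlier copy with the same id
        copies.append(acc)
    for _ in rows:
        for acc, increment in zip(copies, udf_increments):
            acc.add(increment)
    # Only copies still present in the registry are reported to the driver.
    return sum(a.local_value for a in registry.values())


def driver_total(stages, rows, reuse_existing):
    """Each stage (separated by a shuffle) runs in its own simulated worker."""
    return sum(run_stage(s, rows, reuse_existing) for s in stages)


rows = [(1.0, 2.0)]              # the single row from the report
no_shuffle = [[1.0, 100.0]]      # both UDFs evaluated in one stage
with_shuffle = [[1.0], [100.0]]  # repartition() puts each UDF in its own stage

print(driver_total(no_shuffle, rows, reuse_existing=False))    # 100.0
print(driver_total(with_shuffle, rows, reuse_existing=False))  # 101.0
print(driver_total(no_shuffle, rows, reuse_existing=True))     # 101.0
{code}

Under this model, `repartition` hides the problem only because it splits the two UDFs into separate stages, each holding a single accumulator copy; reusing the already-registered copy (`reuse_existing=True`) yields 101.0 even without a shuffle.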
> PySpark Accumulators with multiple PythonUDFs
> ---------------------------------------------
>
> Key: SPARK-25591
> URL: https://issues.apache.org/jira/browse/SPARK-25591
> Project: Spark
> Issue Type: Bug
> Components: PySpark
> Affects Versions: 2.3.2
> Reporter: Abdeali Kothari
> Priority: Major
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)