You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "bruce_zhao (Jira)" <ji...@apache.org> on 2020/02/26 03:26:00 UTC

[jira] [Updated] (SPARK-30952) Grouped pandas_udf crashed when a group returned an empty DataFrame

     [ https://issues.apache.org/jira/browse/SPARK-30952?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

bruce_zhao updated SPARK-30952:
-------------------------------
    Description: 
We are trying to apply three-sigma rule in grouped data to detect anomaly data. We found that it's always crashed when a group returns an empty DataFrame (empty means no anomaly). 

 

Sample Code:
{code:java}
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import StructField, StructType, StringType, LongType
import pandas as pd
from pyspark.sql import SparkSession
import numpy as np


def check_pdf():
    schema = StructType([
        StructField("customer_id", StringType(), True),
        StructField("count", LongType(), True)
    ])

    @pandas_udf(schema, PandasUDFType.GROUPED_MAP)
    def handler(pdf):
        mean = float(np.mean(pdf["count"]))
        sigma = float(np.std(pdf["count"], ddof=1))
        return pdf[pdf["count"] > mean + 3 * sigma]

    return handler


def main():
    spark = SparkSession.builder \
        .appName("AppTest") \
        .master("local[4]") \
        .config("spark.driver.host", "localhost") \
        .config("spark.sql.shuffle.partitions", 2) \
        .getOrCreate()

    df = spark.createDataFrame([
        {
            "count": 15,
            "customer_id": "c1"
        },
        {
            "count": 11,
            "customer_id": "c1"
        },
        {
            "count": 11,
            "customer_id": "c2"
        }
    ])

    result = df.groupby("customer_id").apply(check_pdf()).collect()
    print(result)

    spark.stop()


if __name__ == '__main__':
    main()

{code}
Exception:
{code:java}
2020-02-26 10:56:45 ERROR Executor:91 - Exception in task 0.0 in stage 1.0 (TID 4)
org.apache.spark.SparkException: Python worker exited unexpectedly (crashed) 
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$3.applyOrElse(PythonRunner.scala:486) 
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$3.applyOrElse(PythonRunner.scala:475) 
at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36) at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.read(ArrowPythonRunner.scala:178) 
at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.read(ArrowPythonRunner.scala:122) 
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:406) 
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37) at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440) 
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409) 
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:255) at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247) at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836) 
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836) 
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) 
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) 
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288) 
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) 
at org.apache.spark.scheduler.Task.run(Task.scala:121) 
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360) 
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408) 
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748)Caused by: java.io.EOFException at java.io.DataInputStream.readInt(DataInputStream.java:392) at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.read(ArrowPythonRunner.scala:159) 
... 20 more



{code}
 

 

 

  was:
We are trying to apply three-sigma rule in grouped data to detect anomaly data. We found that it's always crashed when a group returns an empty DataFrame (empty means no anomaly). 

 

Sample Code:
{code:java}
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import StructField, StructType, StringType, LongType
import pandas as pd
from pyspark.sql import SparkSession
import numpy as np


def check_pdf():
    schema = StructType([
        StructField("customer_id", StringType(), True),
        StructField("count", LongType(), True)
    ])

    @pandas_udf(schema, PandasUDFType.GROUPED_MAP)
    def handler(pdf):
        mean = float(np.mean(pdf["count"]))
        sigma = float(np.std(pdf["count"], ddof=1))
        print(mean+3*sigma)
        return pdf[pdf["count"] > mean + 3 * sigma]

    return handler


def main():
    spark = SparkSession.builder \
        .appName("AppTest") \
        .master("local[4]") \
        .config("spark.driver.host", "localhost") \
        .config("spark.sql.shuffle.partitions", 2) \
        .getOrCreate()

    df = spark.createDataFrame([
        {
            "count": 15,
            "customer_id": "c1"
        },
        {
            "count": 11,
            "customer_id": "c1"
        },
        {
            "count": 11,
            "customer_id": "c2"
        }
    ])

    result = df.groupby("customer_id").apply(check_pdf()).collect()
    print(result)

    spark.stop()


if __name__ == '__main__':
    main()

{code}
Exception:
{code:java}

2020-02-26 10:56:45 ERROR Executor:91 - Exception in task 0.0 in stage 1.0 (TID 4)
org.apache.spark.SparkException: Python worker exited unexpectedly (crashed) 
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$3.applyOrElse(PythonRunner.scala:486) 
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$3.applyOrElse(PythonRunner.scala:475) 
at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36) at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.read(ArrowPythonRunner.scala:178) 
at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.read(ArrowPythonRunner.scala:122) 
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:406) 
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37) at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440) 
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409) 
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:255) at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247) at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836) 
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836) 
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) 
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) 
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288) 
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) 
at org.apache.spark.scheduler.Task.run(Task.scala:121) 
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360) 
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408) 
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748)Caused by: java.io.EOFException at java.io.DataInputStream.readInt(DataInputStream.java:392) at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.read(ArrowPythonRunner.scala:159) 
... 20 more



{code}
 

 

 


> Grouped pandas_udf crashed when a group returned an empty DataFrame 
> --------------------------------------------------------------------
>
>                 Key: SPARK-30952
>                 URL: https://issues.apache.org/jira/browse/SPARK-30952
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 2.4.0
>            Reporter: bruce_zhao
>            Priority: Major
>
> We are trying to apply three-sigma rule in grouped data to detect anomaly data. We found that it's always crashed when a group returns an empty DataFrame (empty means no anomaly). 
>  
> Sample Code:
> {code:java}
> from pyspark.sql.functions import pandas_udf, PandasUDFType
> from pyspark.sql.types import StructField, StructType, StringType, LongType
> import pandas as pd
> from pyspark.sql import SparkSession
> import numpy as np
> def check_pdf():
>     schema = StructType([
>         StructField("customer_id", StringType(), True),
>         StructField("count", LongType(), True)
>     ])
>     @pandas_udf(schema, PandasUDFType.GROUPED_MAP)
>     def handler(pdf):
>         mean = float(np.mean(pdf["count"]))
>         sigma = float(np.std(pdf["count"], ddof=1))
>         return pdf[pdf["count"] > mean + 3 * sigma]
>     return handler
> def main():
>     spark = SparkSession.builder \
>         .appName("AppTest") \
>         .master("local[4]") \
>         .config("spark.driver.host", "localhost") \
>         .config("spark.sql.shuffle.partitions", 2) \
>         .getOrCreate()
>     df = spark.createDataFrame([
>         {
>             "count": 15,
>             "customer_id": "c1"
>         },
>         {
>             "count": 11,
>             "customer_id": "c1"
>         },
>         {
>             "count": 11,
>             "customer_id": "c2"
>         }
>     ])
>     result = df.groupby("customer_id").apply(check_pdf()).collect()
>     print(result)
>     spark.stop()
> if __name__ == '__main__':
>     main()
> {code}
> Exception:
> {code:java}
> 2020-02-26 10:56:45 ERROR Executor:91 - Exception in task 0.0 in stage 1.0 (TID 4)
> org.apache.spark.SparkException: Python worker exited unexpectedly (crashed) 
> at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$3.applyOrElse(PythonRunner.scala:486) 
> at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$3.applyOrElse(PythonRunner.scala:475) 
> at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36) at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.read(ArrowPythonRunner.scala:178) 
> at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.read(ArrowPythonRunner.scala:122) 
> at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:406) 
> at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37) at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440) 
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409) 
> at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:255) at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247) at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836) 
> at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836) 
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) 
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) 
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:288) 
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) 
> at org.apache.spark.scheduler.Task.run(Task.scala:121) 
> at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360) 
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408) 
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748)Caused by: java.io.EOFException at java.io.DataInputStream.readInt(DataInputStream.java:392) at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.read(ArrowPythonRunner.scala:159) 
> ... 20 more
> {code}
>  
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org