Posted to issues@spark.apache.org by "Maciej Szymkiewicz (Jira)" <ji...@apache.org> on 2023/03/24 11:16:00 UTC

[jira] [Comment Edited] (SPARK-42910) Generic annotation of class attribute in abstract class is NOT initialized in inherited classes

    [ https://issues.apache.org/jira/browse/SPARK-42910?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17704575#comment-17704575 ] 

Maciej Szymkiewicz edited comment on SPARK-42910 at 3/24/23 11:15 AM:
----------------------------------------------------------------------

It is no longer generic, so that cannot be a problem. 

Additionally, the issue seems to disappear when classes are defined externally:

{code:python}
# foo.py

from abc import ABC
from typing import Generic, TypeVar, Callable

T = TypeVar("T")

class Foo:
    ...

class A(ABC, Generic[T]):
    base_record: Callable[..., T]

class B(A):
    base_record = Foo

class C(B):
    ...

def f(_: int) -> int:
    print(C.base_record)
    return 1
{code}

and then

{code:python}
from operator import add
from foo import C, f
from pyspark.sql import SparkSession

spark = SparkSession\
    .builder\
    .appName("schema_test")\
    .getOrCreate()

spark.sparkContext.parallelize(range(1, 100)).map(f).reduce(add) 
{code}

so it makes sense to focus further investigation on how we prepare locally defined classes for shipping over the wire.
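
For reference, a minimal diagnostic sketch of that path outside of Spark, assuming the relevant machinery is cloudpickle (PySpark ships locally defined classes via its vendored {{pyspark.cloudpickle}}). It only checks whether the class attribute survives a local dump/load round trip, not the full worker-side reconstruction:

{code:python}
# Diagnostic sketch: round-trip the locally defined class through cloudpickle
# and check whether the inherited class attribute survives.
import pickle
from abc import ABC
from typing import Callable, Generic, TypeVar

import cloudpickle  # stand-in for PySpark's vendored pyspark.cloudpickle

T = TypeVar("T")

class Foo:
    ...

class A(ABC, Generic[T]):
    base_record: Callable[..., T]

class B(A):
    base_record = Foo

class C(B):
    ...

# Classes defined in __main__ are pickled by value rather than by reference,
# which is the same treatment locally defined classes get when shipped to workers.
payload = cloudpickle.dumps(C)
restored = pickle.loads(payload)

# If this prints False, the attribute is already lost during serialization;
# if it prints True, the loss more likely happens on the worker side.
print(hasattr(restored, "base_record"))
{code}

If the attribute does survive this local round trip, the next place to look would be how the worker process rebuilds the pickled class hierarchy.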


> Generic annotation of class attribute in abstract class is NOT initialized in inherited classes
> -----------------------------------------------------------------------------------------------
>
>                 Key: SPARK-42910
>                 URL: https://issues.apache.org/jira/browse/SPARK-42910
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 3.3.0, 3.3.2
>         Environment: Tested in two environments:
>  # Databricks
> Pyspark Version: 3.3.0
> Python Version: 3.9.15
>  # Local
> Pyspark Version: 3.3.2
> Python Version: 3.3.10
>            Reporter: Jon Farzanfar
>            Priority: Minor
>
> We are trying to leverage generics to better type our code base. The example below shows the problem we are having: without generics this works completely fine in PySpark, but with generics it fails in PySpark even though it works locally without PySpark.
> Output for local: 
>  
> {code:java}
> <class '__main__.Foo'>{code}
>  
> Traceback for PySpark: 
> {code:java}
> AttributeError: type object 'C' has no attribute 'base_record'
> 	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:559)
> 	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:765)
> 	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:747)
> 	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:512)
> 	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
> 	at scala.collection.Iterator.foreach(Iterator.scala:943)
> 	at scala.collection.Iterator.foreach$(Iterator.scala:943)
> 	at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
> 	at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
> 	at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
> 	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:105)
> 	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:49)
> 	at scala.collection.TraversableOnce.to(TraversableOnce.scala:366)
> 	at scala.collection.TraversableOnce.to$(TraversableOnce.scala:364)
> 	at org.apache.spark.InterruptibleIterator.to(InterruptibleIterator.scala:28)
> 	at scala.collection.TraversableOnce.toBuffer(TraversableOnce.scala:358)
> 	at scala.collection.TraversableOnce.toBuffer$(TraversableOnce.scala:358)
> 	at org.apache.spark.InterruptibleIterator.toBuffer(InterruptibleIterator.scala:28)
> 	at scala.collection.TraversableOnce.toArray(TraversableOnce.scala:345)
> 	at scala.collection.TraversableOnce.toArray$(TraversableOnce.scala:339)
> 	at org.apache.spark.InterruptibleIterator.toArray(InterruptibleIterator.scala:28)
> 	at org.apache.spark.rdd.RDD.$anonfun$collect$2(RDD.scala:1021)
> 	at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2268)
> 	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
> 	at org.apache.spark.scheduler.Task.run(Task.scala:136)
> 	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
> 	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
> 	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> 	... 1 more {code}
>  
> Code:
>  
> {code:python}
> from abc import ABC
> from typing import Generic, TypeVar, Callable
> from operator import add
> from pyspark.sql import SparkSession
> 
> T = TypeVar("T")
> 
> class Foo:
>     ...
> 
> class A(ABC, Generic[T]):
>     base_record: Callable[..., T]
> 
> class B(A):
>     base_record = Foo
> 
> class C(B):
>     ...
> 
> def f(_: int) -> int:
>     print(C.base_record)
>     return 1
> 
> spark = SparkSession\
>     .builder\
>     .appName("schema_test")\
>     .getOrCreate()
> 
> spark.sparkContext.parallelize(range(1, 100)).map(f).reduce(add)
> {code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org