Posted to issues@spark.apache.org by "Rafal Ganczarek (JIRA)" <ji...@apache.org> on 2018/05/08 11:20:00 UTC

[jira] [Created] (SPARK-24208) Cannot resolve column in self join after applying Pandas UDF

Rafal Ganczarek created SPARK-24208:
---------------------------------------

             Summary: Cannot resolve column in self join after applying Pandas UDF
                 Key: SPARK-24208
                 URL: https://issues.apache.org/jira/browse/SPARK-24208
             Project: Spark
          Issue Type: Bug
          Components: PySpark
    Affects Versions: 2.3.0
         Environment: AWS EMR 5.13.0
Amazon Hadoop distribution 2.8.3
Spark 2.3.0
Pandas 0.22.0
            Reporter: Rafal Ganczarek


I noticed that after applying a grouped-map Pandas UDF, a self join of the resulting DataFrame fails to resolve columns. The workaround I found is to recreate the DataFrame from its RDD and schema.

Below is Python code that reproduces the issue.
{code:python}
from pyspark.sql import Row
from pyspark.sql import functions as F
from pyspark.sql.functions import pandas_udf, PandasUDFType

# Identity grouped-map UDF: returns each group's key and col columns unchanged.
@pandas_udf('key long, col string', PandasUDFType.GROUPED_MAP)
def dummy_pandas_udf(test_df):
    return test_df[['key', 'col']]

df = spark.createDataFrame([Row(key=1,col='A'), Row(key=1,col='B'), Row(key=2,col='C')])

# transformation that causes the issue
df = df.groupBy('key').apply(dummy_pandas_udf)

# WORKAROUND that fixes the issue
# df = spark.createDataFrame(df.rdd, df.schema)

df.alias('temp0').join(df.alias('temp1'), F.col('temp0.key') == F.col('temp1.key')).show()
{code}
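As a side note, the plan produced by the groupBy/apply step (the same plan that later shows up inside the error message, with the grouping key projected twice) can be inspected before attempting any join. A minimal sketch, assuming the same {{spark}} session and {{df}} as above:
{code:python}
# Print the parsed/analyzed/optimized/physical plans of the DataFrame
# returned by groupBy('key').apply(dummy_pandas_udf); the
# FlatMapGroupsInPandas node appears in the extended output.
df.explain(True)
{code}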
If the workaround line stays commented out, the code above fails with the following error:
{code:java}
AnalysisExceptionTraceback (most recent call last)
<ipython-input-88-8de763656d6d> in <module>()
     12 # df = spark.createDataFrame(df.rdd, df.schema)
     13 
---> 14 df.alias('temp0').join(df.alias('temp1'), F.col('temp0.key') == F.col('temp1.key')).show()

/usr/lib/spark/python/pyspark/sql/dataframe.py in join(self, other, on, how)
    929                 on = self._jseq([])
    930             assert isinstance(how, basestring), "how should be basestring"
--> 931             jdf = self._jdf.join(other._jdf, on, how)
    932         return DataFrame(jdf, self.sql_ctx)
    933 

/usr/lib/spark/python/lib/py4j-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1158         answer = self.gateway_client.send_command(command)
   1159         return_value = get_return_value(
-> 1160             answer, self.gateway_client, self.target_id, self.name)
   1161 
   1162         for temp_arg in temp_args:

/usr/lib/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
     67                                              e.java_exception.getStackTrace()))
     68             if s.startswith('org.apache.spark.sql.AnalysisException: '):
---> 69                 raise AnalysisException(s.split(': ', 1)[1], stackTrace)
     70             if s.startswith('org.apache.spark.sql.catalyst.analysis'):
     71                 raise AnalysisException(s.split(': ', 1)[1], stackTrace)

AnalysisException: u"cannot resolve '`temp0.key`' given input columns: [temp0.key, temp0.col];;\n'Join Inner, ('temp0.key = 'temp1.key)\n:- AnalysisBarrier\n:     +- SubqueryAlias temp0\n:        +- FlatMapGroupsInPandas [key#4099L], dummy_pandas_udf(col#4098, key#4099L), [key#4104L, col#4105]\n:           +- Project [key#4099L, col#4098, key#4099L]\n:              +- LogicalRDD [col#4098, key#4099L], false\n+- AnalysisBarrier\n      +- SubqueryAlias temp1\n         +- FlatMapGroupsInPandas [key#4099L], dummy_pandas_udf(col#4098, key#4099L), [key#4104L, col#4105]\n            +- Project [key#4099L, col#4098, key#4099L]\n               +- LogicalRDD [col#4098, key#4099L], false\n"
{code}
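With the workaround line uncommented, the self join resolves; for this three-row dataset it should return the per-key matches, roughly as follows (row order may vary):
{code}
+---+---+---+---+
|key|col|key|col|
+---+---+---+---+
|  1|  A|  1|  A|
|  1|  A|  1|  B|
|  1|  B|  1|  A|
|  1|  B|  1|  B|
|  2|  C|  2|  C|
+---+---+---+---+
{code}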
The same resolution error also occurs when the join is expressed in Spark SQL:
{code}
df.createOrReplaceTempView('df')
spark.sql('''
    SELECT 
        *
    FROM df temp0
    LEFT JOIN df temp1 ON
        temp0.key == temp1.key
''').show()
{code}
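The RDD-and-schema workaround applies on the SQL path as well: rebuild the DataFrame before registering the temporary view. A minimal, untested sketch, assuming the same {{spark}} session and {{df}} as above ({{fixed_df}} is just an illustrative name):
{code:python}
# WORKAROUND (SQL variant): recreate the DataFrame from its RDD and
# schema, as in the DataFrame-API example, then register the view.
fixed_df = spark.createDataFrame(df.rdd, df.schema)
fixed_df.createOrReplaceTempView('df')

spark.sql('''
    SELECT *
    FROM df temp0
    LEFT JOIN df temp1 ON temp0.key = temp1.key
''').show()
{code}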


