You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Krishna Sangeeth KS (Jira)" <ji...@apache.org> on 2022/03/03 04:27:00 UTC
[jira] [Updated] (SPARK-38395) Pyspark issue in resolving column when there is dot (.)
[ https://issues.apache.org/jira/browse/SPARK-38395?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Krishna Sangeeth KS updated SPARK-38395:
----------------------------------------
Description:
PySpark's applyInPandas has some difficulty in resolving columns when there is a dot in the column name.
Here is an example that I have which reproduces the issue. Example taken by modifying doctest example [here|https://github.com/apache/spark/blob/branch-3.0/python/pyspark/sql/pandas/group_ops.py#L237-L248]
{code:python}
df1 = spark.createDataFrame(
[(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],
("abc|database|10.159.154|xef", "id", "v1"))
df2 = spark.createDataFrame(
[(20000101, 1, "x"), (20000101, 2, "y")],
("abc|database|10.159.154|xef", "id", "v2"))
def asof_join(l, r):
return pd.merge_asof(l, r, on="abc|database|10.159.154|xef", by="id")
df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(
asof_join, schema="`abc|database|10.159.154|xef` int, id int, v1 double, v2 string").show(){code}
This gives the below error
{code:python}
AnalysisException Traceback (most recent call last)
<ipython-input-126-b1807bb28ae3> in <module>
8 return pd.merge_asof(l, r, on="abc|database|10.159.154|xef", by="id")
9 df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(
---> 10 asof_join, schema="`abc|database|10.159.154|xef` int, id int, v1 double, v2 string").show()
~/anaconda3/envs/py37/lib/python3.7/site-packages/pyspark/sql/pandas/group_ops.py in applyInPandas(self, func, schema)
295 udf = pandas_udf(
296 func, returnType=schema, functionType=PythonEvalType.SQL_COGROUPED_MAP_PANDAS_UDF)
--> 297 all_cols = self._extract_cols(self._gd1) + self._extract_cols(self._gd2)
298 udf_column = udf(*all_cols)
299 jdf = self._gd1._jgd.flatMapCoGroupsInPandas(self._gd2._jgd, udf_column._jc.expr())
~/anaconda3/envs/py37/lib/python3.7/site-packages/pyspark/sql/pandas/group_ops.py in _extract_cols(gd)
303 def _extract_cols(gd):
304 df = gd._df
--> 305 return [df[col] for col in df.columns]
306
307
~/anaconda3/envs/py37/lib/python3.7/site-packages/pyspark/sql/pandas/group_ops.py in <listcomp>(.0)
303 def _extract_cols(gd):
304 df = gd._df
--> 305 return [df[col] for col in df.columns]
306
307
~/anaconda3/envs/py37/lib/python3.7/site-packages/pyspark/sql/dataframe.py in __getitem__(self, item)
1378 """
1379 if isinstance(item, basestring):
-> 1380 jc = self._jdf.apply(item)
1381 return Column(jc)
1382 elif isinstance(item, Column):
~/anaconda3/envs/py37/lib/python3.7/site-packages/py4j/java_gateway.py in __call__(self, *args)
1303 answer = self.gateway_client.send_command(command)
1304 return_value = get_return_value(
-> 1305 answer, self.gateway_client, self.target_id, self.name)
1306
1307 for temp_arg in temp_args:
~/anaconda3/envs/py37/lib/python3.7/site-packages/pyspark/sql/utils.py in deco(*a, **kw)
135 # Hide where the exception came from that shows a non-Pythonic
136 # JVM exception message.
--> 137 raise_from(converted)
138 else:
139 raise
~/anaconda3/envs/py37/lib/python3.7/site-packages/pyspark/sql/utils.py in raise_from(e)
AnalysisException: Cannot resolve column name "abc|database|10.159.154|xef" among (abc|database|10.159.154|xef, id, v1); did you mean to quote the `abc|database|10.159.154|xef` column?;
{code}
As we can see, the column is present in the `among` list.
When I replace `.` (dot) with `_` (underscore), the code actually works.
{code:python}
df1 = spark.createDataFrame(
[(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],
("abc|database|10_159_154|xef", "id", "v1"))
df2 = spark.createDataFrame(
[(20000101, 1, "x"), (20000101, 2, "y")],
("abc|database|10_159_154|xef", "id", "v2"))
def asof_join(l, r):
return pd.merge_asof(l, r, on="abc|database|10_159_154|xef", by="id")
df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(
asof_join, schema="`abc|database|10_159_154|xef` int, id int, v1 double, v2 string").show()
{code}
{code:java}
+---------------------------+---+---+---+
|abc|database|10_159_154|xef| id| v1| v2|
+---------------------------+---+---+---+
| 20000101| 1|1.0| x|
| 20000102| 1|3.0| x|
| 20000101| 2|2.0| y|
| 20000102| 2|4.0| y|
+---------------------------+---+---+---+
{code}
was:
PySpark's applyInPandas has some difficulty in resolving columns when there is a dot in the column name.
Here is an example that I have which reproduces the issue. Example taken by modifying doctest example [here|https://github.com/apache/spark/blob/branch-3.0/python/pyspark/sql/pandas/group_ops.py#L237-L248]
{code:python}
df1 = spark.createDataFrame(
[(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],
("abc|database|10.159.154|xef", "id", "v1"))
df2 = spark.createDataFrame(
[(20000101, 1, "x"), (20000101, 2, "y")],
("abc|database|10.159.154|xef", "id", "v2"))
def asof_join(l, r):
return pd.merge_asof(l, r, on="abc|database|10.159.154|xef", by="id")
df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(
asof_join, schema="`abc|database|10.159.154|xef` int, id int, v1 double, v2 string").show(){code}
This gives the below error
{code:python}
AnalysisException Traceback (most recent call last)
<ipython-input-126-b1807bb28ae3> in <module>
8 return pd.merge_asof(l, r, on="abc|database|10.159.154|xef", by="id")
9 df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(
---> 10 asof_join, schema="`abc|database|10.159.154|xef` int, id int, v1 double, v2 string").show()
~/anaconda3/envs/py37/lib/python3.7/site-packages/pyspark/sql/pandas/group_ops.py in applyInPandas(self, func, schema)
295 udf = pandas_udf(
296 func, returnType=schema, functionType=PythonEvalType.SQL_COGROUPED_MAP_PANDAS_UDF)
--> 297 all_cols = self._extract_cols(self._gd1) + self._extract_cols(self._gd2)
298 udf_column = udf(*all_cols)
299 jdf = self._gd1._jgd.flatMapCoGroupsInPandas(self._gd2._jgd, udf_column._jc.expr())
~/anaconda3/envs/py37/lib/python3.7/site-packages/pyspark/sql/pandas/group_ops.py in _extract_cols(gd)
303 def _extract_cols(gd):
304 df = gd._df
--> 305 return [df[col] for col in df.columns]
306
307
~/anaconda3/envs/py37/lib/python3.7/site-packages/pyspark/sql/pandas/group_ops.py in <listcomp>(.0)
303 def _extract_cols(gd):
304 df = gd._df
--> 305 return [df[col] for col in df.columns]
306
307
~/anaconda3/envs/py37/lib/python3.7/site-packages/pyspark/sql/dataframe.py in __getitem__(self, item)
1378 """
1379 if isinstance(item, basestring):
-> 1380 jc = self._jdf.apply(item)
1381 return Column(jc)
1382 elif isinstance(item, Column):
~/anaconda3/envs/py37/lib/python3.7/site-packages/py4j/java_gateway.py in __call__(self, *args)
1303 answer = self.gateway_client.send_command(command)
1304 return_value = get_return_value(
-> 1305 answer, self.gateway_client, self.target_id, self.name)
1306
1307 for temp_arg in temp_args:
~/anaconda3/envs/py37/lib/python3.7/site-packages/pyspark/sql/utils.py in deco(*a, **kw)
135 # Hide where the exception came from that shows a non-Pythonic
136 # JVM exception message.
--> 137 raise_from(converted)
138 else:
139 raise
~/anaconda3/envs/py37/lib/python3.7/site-packages/pyspark/sql/utils.py in raise_from(e)
AnalysisException: Cannot resolve column name "abc|database|10.159.154|xef" among (abc|database|10.159.154|xef, id, v1); did you mean to quote the `abc|database|10.159.154|xef` column?;
{code}
As we can see, the column is present in the `among` list.
When I replace `.` (dot) with `_` (underscore), the code actually works.
{code:python}
df1 = spark.createDataFrame(
[(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],
("abc|database|10_159_154|xef", "id", "v1"))
df2 = spark.createDataFrame(
[(20000101, 1, "x"), (20000101, 2, "y")],
("abc|database|10_159_154|xef", "id", "v2"))
def asof_join(l, r):
return pd.merge_asof(l, r, on="abc|database|10_159_154|xef", by="id")
df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(
asof_join, schema="`abc|database|10_159_154|xef` int, id int, v1 double, v2 string").show()
{code}
{code:java}
+---------------------------+---+---+---+
|abc|database|10_159_154|xef| id| v1| v2|
+---------------------------+---+---+---+
| 20000101| 1|1.0| x|
| 20000102| 1|3.0| x|
| 20000101| 2|2.0| y|
| 20000102| 2|4.0| y|
+---------------------------+---+---+---+
{code}
> Pyspark issue in resolving column when there is dot (.)
> -------------------------------------------------------
>
> Key: SPARK-38395
> URL: https://issues.apache.org/jira/browse/SPARK-38395
> Project: Spark
> Issue Type: Bug
> Components: PySpark
> Affects Versions: 3.0.0
> Environment: Issue found in Mac OS Catalina, Pyspark 3.0
> Reporter: Krishna Sangeeth KS
> Priority: Major
>
> PySpark's applyInPandas has some difficulty in resolving columns when there is a dot in the column name.
> Here is an example that I have which reproduces the issue. Example taken by modifying doctest example [here|https://github.com/apache/spark/blob/branch-3.0/python/pyspark/sql/pandas/group_ops.py#L237-L248]
> {code:python}
> df1 = spark.createDataFrame(
> [(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],
> ("abc|database|10.159.154|xef", "id", "v1"))
> df2 = spark.createDataFrame(
> [(20000101, 1, "x"), (20000101, 2, "y")],
> ("abc|database|10.159.154|xef", "id", "v2"))
> def asof_join(l, r):
> return pd.merge_asof(l, r, on="abc|database|10.159.154|xef", by="id")
> df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(
> asof_join, schema="`abc|database|10.159.154|xef` int, id int, v1 double, v2 string").show(){code}
> This gives the below error
> {code:python}
> AnalysisException Traceback (most recent call last)
> <ipython-input-126-b1807bb28ae3> in <module>
> 8 return pd.merge_asof(l, r, on="abc|database|10.159.154|xef", by="id")
> 9 df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(
> ---> 10 asof_join, schema="`abc|database|10.159.154|xef` int, id int, v1 double, v2 string").show()
> ~/anaconda3/envs/py37/lib/python3.7/site-packages/pyspark/sql/pandas/group_ops.py in applyInPandas(self, func, schema)
> 295 udf = pandas_udf(
> 296 func, returnType=schema, functionType=PythonEvalType.SQL_COGROUPED_MAP_PANDAS_UDF)
> --> 297 all_cols = self._extract_cols(self._gd1) + self._extract_cols(self._gd2)
> 298 udf_column = udf(*all_cols)
> 299 jdf = self._gd1._jgd.flatMapCoGroupsInPandas(self._gd2._jgd, udf_column._jc.expr())
> ~/anaconda3/envs/py37/lib/python3.7/site-packages/pyspark/sql/pandas/group_ops.py in _extract_cols(gd)
> 303 def _extract_cols(gd):
> 304 df = gd._df
> --> 305 return [df[col] for col in df.columns]
> 306
> 307
> ~/anaconda3/envs/py37/lib/python3.7/site-packages/pyspark/sql/pandas/group_ops.py in <listcomp>(.0)
> 303 def _extract_cols(gd):
> 304 df = gd._df
> --> 305 return [df[col] for col in df.columns]
> 306
> 307
> ~/anaconda3/envs/py37/lib/python3.7/site-packages/pyspark/sql/dataframe.py in __getitem__(self, item)
> 1378 """
> 1379 if isinstance(item, basestring):
> -> 1380 jc = self._jdf.apply(item)
> 1381 return Column(jc)
> 1382 elif isinstance(item, Column):
> ~/anaconda3/envs/py37/lib/python3.7/site-packages/py4j/java_gateway.py in __call__(self, *args)
> 1303 answer = self.gateway_client.send_command(command)
> 1304 return_value = get_return_value(
> -> 1305 answer, self.gateway_client, self.target_id, self.name)
> 1306
> 1307 for temp_arg in temp_args:
> ~/anaconda3/envs/py37/lib/python3.7/site-packages/pyspark/sql/utils.py in deco(*a, **kw)
> 135 # Hide where the exception came from that shows a non-Pythonic
> 136 # JVM exception message.
> --> 137 raise_from(converted)
> 138 else:
> 139 raise
> ~/anaconda3/envs/py37/lib/python3.7/site-packages/pyspark/sql/utils.py in raise_from(e)
> AnalysisException: Cannot resolve column name "abc|database|10.159.154|xef" among (abc|database|10.159.154|xef, id, v1); did you mean to quote the `abc|database|10.159.154|xef` column?;
> {code}
> As we can see, the column is present in the `among` list.
> When I replace `.` (dot) with `_` (underscore), the code actually works.
> {code:python}
> df1 = spark.createDataFrame(
> [(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],
> ("abc|database|10_159_154|xef", "id", "v1"))
> df2 = spark.createDataFrame(
> [(20000101, 1, "x"), (20000101, 2, "y")],
> ("abc|database|10_159_154|xef", "id", "v2"))
> def asof_join(l, r):
> return pd.merge_asof(l, r, on="abc|database|10_159_154|xef", by="id")
> df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(
> asof_join, schema="`abc|database|10_159_154|xef` int, id int, v1 double, v2 string").show()
> {code}
> {code:java}
> +---------------------------+---+---+---+
> |abc|database|10_159_154|xef| id| v1| v2|
> +---------------------------+---+---+---+
> | 20000101| 1|1.0| x|
> | 20000102| 1|3.0| x|
> | 20000101| 2|2.0| y|
> | 20000102| 2|4.0| y|
> +---------------------------+---+---+---+
> {code}
--
This message was sent by Atlassian Jira
(v8.20.1#820001)
---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org