Posted to user@spark.apache.org by Shiyuan <gs...@gmail.com> on 2018/04/09 17:50:34 UTC

A bug triggered by a particular sequence of "select", "groupby" and "join" in Spark 2.3.0

Hi Spark Users,
    The following code snippet fails with an "attribute missing" error even though
the attribute exists. The bug is triggered by a particular sequence of
"select", "groupby" and "join". Note that if I take away the "select" in
#line B, the code runs without error. However, the "select" in #line B
includes all columns in the dataframe and hence should not affect the
final result.


import pyspark.sql.functions as F
df = spark.createDataFrame([{'score':1.0,'ID':'abc','LABEL':True,'k':2},
                            {'score':1.0,'ID':'abc','LABEL':False,'k':3}])

df = df.withColumnRenamed("k","kk")\
  .select("ID","score","LABEL","kk")    #line B

df_t = df.groupby("ID").agg(F.countDistinct("LABEL").alias("nL")).filter(F.col("nL")>1)
df = df.join(df_t.select("ID"),["ID"])
df_sw = df.groupby(["ID","kk"]).count().withColumnRenamed("count", "cnt1")
df = df.join(df_sw, ["ID","kk"])
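
For comparison, here is the variant without the "select" in #line B which, as noted
above, runs without error (a minimal sketch assuming the same spark session; not
re-verified here):

import pyspark.sql.functions as F
df = spark.createDataFrame([{'score':1.0,'ID':'abc','LABEL':True,'k':2},
                            {'score':1.0,'ID':'abc','LABEL':False,'k':3}])

df = df.withColumnRenamed("k","kk")    # line B without the trailing select

df_t = df.groupby("ID").agg(F.countDistinct("LABEL").alias("nL")).filter(F.col("nL")>1)
df = df.join(df_t.select("ID"),["ID"])
df_sw = df.groupby(["ID","kk"]).count().withColumnRenamed("count", "cnt1")
df = df.join(df_sw, ["ID","kk"])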

Re: A bug triggered by a particular sequence of "select", "groupby" and "join" in Spark 2.3.0

Posted by Shiyuan <gs...@gmail.com>.
Here it is:
https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/2991198123660769/823198936734135/866038034322120/latest.html


On Wed, Apr 11, 2018 at 10:55 AM, Alessandro Solimando <
alessandro.solimando@gmail.com> wrote:

> Hi Shiyuan,
> can you show us the output of "explain" over df (as a last step)?
>
> On 11 April 2018 at 19:47, Shiyuan <gs...@gmail.com> wrote:
>
>> Variable name binding is a python thing, and Spark should not care how
>> the variable is named. What matters is the dependency graph. Spark fails to
>> handle this dependency graph correctly for which I am quite surprised: this
>> is just a simple combination of three very common sql operations.
>>
>>
>> On Tue, Apr 10, 2018 at 9:03 PM, Gourav Sengupta <
>> gourav.sengupta@gmail.com> wrote:
>>
>>> Hi Shiyuan,
>>>
>>> I do not know whether I am right, but I would prefer to avoid
>>> expressions in Spark as:
>>>
>>> df = <<some transformation on df>>
>>>
>>>
>>> Regards,
>>> Gourav Sengupta
>>>
>>> On Tue, Apr 10, 2018 at 10:42 PM, Shiyuan <gs...@gmail.com> wrote:
>>>
>>>> Here is the pretty print of the physical plan which reveals some
>>>> details about what causes the bug (see the lines highlighted in bold):
>>>> WithColumnRenamed() fails to update the dependency graph correctly:
>>>>
>>>>
>>>> 'Resolved attribute(s) kk#144L missing from
>>>> ID#118,LABEL#119,kk#96L,score#121 in operator !Project [ID#118,
>>>> score#121, LABEL#119, kk#144L]. Attribute(s) with the same name appear in
>>>> the operation: kk. Please check if the right attribute(s) are used
>>>>
>>>> Project [ID#64, kk#73L, score#67, LABEL#65, cnt1#123L]
>>>> +- Join Inner, ((ID#64 = ID#135) && (kk#73L = kk#128L))
>>>>    :- Project [ID#64, score#67, LABEL#65, kk#73L]
>>>>    :  +- Join Inner, (ID#64 = ID#99)
>>>>    :     :- Project [ID#64, score#67, LABEL#65, kk#73L]
>>>>    :     :  +- Project [ID#64, LABEL#65, k#66L AS kk#73L, score#67]
>>>>    :     :     +- LogicalRDD [ID#64, LABEL#65, k#66L, score#67]
>>>>    :     +- Project [ID#99]
>>>>    :        +- Filter (nL#90L > cast(1 as bigint))
>>>>    :           +- Aggregate [ID#99], [ID#99, count(distinct LABEL#100)
>>>> AS nL#90L]
>>>>    :              +- Project [ID#99, score#102, LABEL#100, kk#73L]
>>>>    :                 +- Project [ID#99, LABEL#100, k#101L AS kk#73L,
>>>> score#102]
>>>>    :                    +- LogicalRDD [ID#99, LABEL#100, k#101L,
>>>> score#102]
>>>>    +- Project [ID#135, kk#128L, count#118L AS cnt1#123L]
>>>>       +- Aggregate [ID#135, kk#128L], [ID#135, kk#128L, count(1) AS
>>>> count#118L]
>>>>          +- Project [ID#135, score#138, LABEL#136, kk#128L]
>>>>             +- Join Inner, (ID#135 = ID#99)
>>>>                :- Project [ID#135, score#138, LABEL#136, kk#128L]
>>>>                :  +- *Project [ID#135, LABEL#136, k#137L AS kk#128L,
>>>> score#138]*
>>>>                :     +- LogicalRDD [ID#135, LABEL#136, k#137L,
>>>> score#138]
>>>>                +- Project [ID#99]
>>>>                   +- Filter (nL#90L > cast(1 as bigint))
>>>>                      +- Aggregate [ID#99], [ID#99, count(distinct
>>>> LABEL#100) AS nL#90L]
>>>>                         +- *!Project [ID#99, score#102, LABEL#100,
>>>> kk#128L]*
>>>>                            +-* Project [ID#99, LABEL#100, k#101L AS
>>>> kk#73L, score#102]*
>>>>                               +- LogicalRDD [ID#99, LABEL#100, k#101L,
>>>> score#102]
>>>>
>>>> Here is the code which generates the error:
>>>>
>>>> import pyspark.sql.functions as F
>>>> from pyspark.sql import Row
>>>> df = spark.createDataFrame([Row(score=1.0,ID='abc',LABEL=True,k=2),Row(score=1.0,ID='abc',LABEL=False,k=3)])\
>>>>     .withColumnRenamed("k","kk").select("ID","score","LABEL","kk")
>>>> df_t = df.groupby("ID").agg(F.countDistinct("LABEL").alias("nL")).filter(F.col("nL")>1)
>>>> df = df.join(df_t.select("ID"),["ID"])
>>>> df_sw = df.groupby(["ID","kk"]).count().withColumnRenamed("count", "cnt1")
>>>> df = df.join(df_sw, ["ID","kk"])
>>>>
>>>>
>>>> On Tue, Apr 10, 2018 at 1:37 PM, Shiyuan <gs...@gmail.com> wrote:
>>>>
>>>>> The spark warning about Row instead of Dict is not the culprit. The
>>>>> problem still persists after I use Row instead of Dict to generate the
>>>>> dataframe.
>>>>>
>>>>> Here is the explain() output regarding the reassignment of df that Gourav
>>>>> suggested running. They look the same except that the serial numbers
>>>>> following the columns are different (e.g. ID#7273 vs. ID#7344).
>>>>>
>>>>> this is the output of df.explain() after df =
>>>>> df.join(df_t.select("ID"),["ID"])
>>>>> == Physical Plan == *(6) Project [ID#7273, score#7276, LABEL#7274,
>>>>> kk#7281L] +- *(6) SortMergeJoin [ID#7273], [ID#7303], Inner :- *(2) Sort
>>>>> [ID#7273 ASC NULLS FIRST], false, 0 : +- Exchange hashpartitioning(ID#7273,
>>>>> 200) : +- *(1) Project [ID#7273, score#7276, LABEL#7274, k#7275L AS
>>>>> kk#7281L] : +- *(1) Filter isnotnull(ID#7273) : +- *(1) Scan
>>>>> ExistingRDD[ID#7273,LABEL#7274,k#7275L,score#7276] +- *(5) Sort
>>>>> [ID#7303 ASC NULLS FIRST], false, 0 +- *(5) Project [ID#7303] +- *(5)
>>>>> Filter (nL#7295L > 1) +- *(5) HashAggregate(keys=[ID#7303],
>>>>> functions=[finalmerge_count(distinct merge count#7314L) AS
>>>>> count(LABEL#7304)#7294L]) +- Exchange hashpartitioning(ID#7303, 200) +-
>>>>> *(4) HashAggregate(keys=[ID#7303], functions=[partial_count(distinct
>>>>> LABEL#7304) AS count#7314L]) +- *(4) HashAggregate(keys=[ID#7303,
>>>>> LABEL#7304], functions=[]) +- Exchange hashpartitioning(ID#7303,
>>>>> LABEL#7304, 200) +- *(3) HashAggregate(keys=[ID#7303, LABEL#7304],
>>>>> functions=[]) +- *(3) Project [ID#7303, LABEL#7304] +- *(3) Filter
>>>>> isnotnull(ID#7303) +- *(3) Scan ExistingRDD[ID#7303,LABEL#7304,k#7305L,score#7306]
>>>>>
>>>>>
>>>>> In comparison, this is the output of df1.explain() after  df1 =
>>>>> df.join(df_t.select("ID"),["ID"])?
>>>>> == Physical Plan == *(6) Project [ID#7344, score#7347, LABEL#7345,
>>>>> kk#7352L] +- *(6) SortMergeJoin [ID#7344], [ID#7374], Inner :- *(2) Sort
>>>>> [ID#7344 ASC NULLS FIRST], false, 0 : +- Exchange hashpartitioning(ID#7344,
>>>>> 200) : +- *(1) Project [ID#7344, score#7347, LABEL#7345, k#7346L AS
>>>>> kk#7352L] : +- *(1) Filter isnotnull(ID#7344) : +- *(1) Scan
>>>>> ExistingRDD[ID#7344,LABEL#7345,k#7346L,score#7347] +- *(5) Sort
>>>>> [ID#7374 ASC NULLS FIRST], false, 0 +- *(5) Project [ID#7374] +- *(5)
>>>>> Filter (nL#7366L > 1) +- *(5) HashAggregate(keys=[ID#7374],
>>>>> functions=[finalmerge_count(distinct merge count#7385L) AS
>>>>> count(LABEL#7375)#7365L]) +- Exchange hashpartitioning(ID#7374, 200) +-
>>>>> *(4) HashAggregate(keys=[ID#7374], functions=[partial_count(distinct
>>>>> LABEL#7375) AS count#7385L]) +- *(4) HashAggregate(keys=[ID#7374,
>>>>> LABEL#7375], functions=[]) +- Exchange hashpartitioning(ID#7374,
>>>>> LABEL#7375, 200) +- *(3) HashAggregate(keys=[ID#7374, LABEL#7375],
>>>>> functions=[]) +- *(3) Project [ID#7374, LABEL#7375] +- *(3) Filter
>>>>> isnotnull(ID#7374) +- *(3) Scan ExistingRDD[ID#7374,LABEL#7375
>>>>> ,k#7376L,score#7377]
>>>>>
>>>>>
>>>>> Here is the code I run and the error I get in Spark 2.3.0. By looking
>>>>> at the error,  the cause seems to be that  spark doesn't look up the column
>>>>> by its name but by a serial number and  the serial number somehow is messed
>>>>> up.
>>>>>
>>>>> import pyspark.sql.functions as F
>>>>> from pyspark.sql import Row
>>>>> df = spark.createDataFrame([Row(score=1.0,ID='abc',LABEL=True,k=2),Row(score=1.0,ID='abc',LABEL=False,k=3)])
>>>>>
>>>>> df = df.withColumnRenamed("k","kk").select("ID","score","LABEL","kk")   #line B
>>>>> df_t = df.groupby("ID").agg(F.countDistinct("LABEL").alias("nL")).filter(F.col("nL")>1)
>>>>> df = df.join(df_t.select("ID"),["ID"])
>>>>> df_sw = df.groupby(["ID","kk"]).count().withColumnRenamed("count", "cnt1")
>>>>> df = df.join(df_sw, ["ID","kk"])
>>>>>
>>>>> This is the error:
>>>>> 'Resolved attribute(s) kk#144L missing from
>>>>> ID#118,LABEL#119,kk#96L,score#121 in operator !Project [ID#118,
>>>>> score#121, LABEL#119, kk#144L]. Attribute(s) with the same name appear in
>>>>> the operation: kk. Please check if the right attribute(s) are
>>>>> used.;;\nProject [ID#88, kk#96L, score#91, LABEL#89, cnt1#140L]\n+- Join
>>>>> Inner, ((ID#88 = ID#150) && (kk#96L = kk#144L))\n :- Project [ID#88,
>>>>> score#91, LABEL#89, kk#96L]\n : +- Join Inner, (ID#88 = ID#118)\n : :-
>>>>> Project [ID#88, score#91, LABEL#89, kk#96L]\n : : +- Project [ID#88,
>>>>> LABEL#89, k#90L AS kk#96L, score#91]\n : : +- LogicalRDD [ID#88, LABEL#89,
>>>>> k#90L, score#91], false\n : +- Project [ID#118]\n : +- Filter (nL#110L >
>>>>> cast(1 as bigint))\n : +- Aggregate [ID#118], [ID#118, count(distinct
>>>>> LABEL#119) AS nL#110L]\n : +- Project [ID#118, score#121, LABEL#119,
>>>>> kk#96L]\n : +- Project [ID#118, LABEL#119, k#120L AS kk#96L, score#121]\n :
>>>>> +- LogicalRDD [ID#118, LABEL#119, k#120L, score#121], false\n +- Project
>>>>> [ID#150, kk#144L, count#134L AS cnt1#140L]\n +- Aggregate [ID#150,
>>>>> kk#144L], [ID#150, kk#144L, count(1) AS count#134L]\n +- Project [ID#150,
>>>>> score#153, LABEL#151, kk#144L]\n +- Join Inner, (ID#150 = ID#118)\n :-
>>>>> Project [ID#150, score#153, LABEL#151, kk#144L]\n : +- Project [ID#150,
>>>>> LABEL#151, k#152L AS kk#144L, score#153]\n : +- LogicalRDD [ID#150,
>>>>> LABEL#151, k#152L, score#153], false\n +- Project [ID#118]\n +- Filter
>>>>> (nL#110L > cast(1 as bigint))\n +- Aggregate [ID#118], [ID#118,
>>>>> count(distinct LABEL#119) AS nL#110L]\n +- !Project [ID#118, score#121,
>>>>> LABEL#119, kk#144L]\n +- Project [ID#118, LABEL#119, k#120L AS kk#96L,
>>>>> score#121]\n +- LogicalRDD [ID#118, LABEL#119, k#120L, score#121], false\n'
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Mon, Apr 9, 2018 at 3:21 PM, Gourav Sengupta <
>>>>> gourav.sengupta@gmail.com> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> what I am curious about is the reassignment of df.
>>>>>>
>>>>>> Can you please look into the explain plan of df after the statement
>>>>>> df = df.join(df_t.select("ID"),["ID"])? And then compare with the
>>>>>> explain plan of df1 after the statement df1 = df.join(df_t.select("ID"),["ID
>>>>>> "])?
>>>>>>
>>>>>> It's late here, but I am yet to go through this completely. But I
>>>>>> think that SPARK does throw a warning mentioning us to use Row instead of
>>>>>> Dictionary.
>>>>>>
>>>>>> It will be of help if you could kindly try using the below statement
>>>>>> and go through your use case once again (I am yet to go through all the
>>>>>> lines):
>>>>>>
>>>>>>
>>>>>>
>>>>>> from pyspark.sql import Row
>>>>>>
>>>>>> df = spark.createDataFrame([Row(score =
>>>>>> 1.0,ID="abc",LABEL=True,k=2), Row(score = 1.0,ID="abc",LABEL=True,k=3)])
>>>>>>
>>>>>> Regards,
>>>>>> Gourav Sengupta
>>>>>>
>>>>>>
>>>>>> On Mon, Apr 9, 2018 at 6:50 PM, Shiyuan <gs...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Spark Users,
>>>>>>>     The following code snippet has an "attribute missing" error
>>>>>>> while the attribute exists.  This bug is  triggered by a particular
>>>>>>> sequence of "select", "groupby" and "join". Note that if I take away
>>>>>>> the "select"  in #line B,  the code runs without error.   However, the
>>>>>>> "select" in #line B  includes all columns in the dataframe and hence
>>>>>>> should  not affect the final result.
>>>>>>>
>>>>>>>
>>>>>>> import pyspark.sql.functions as F
>>>>>>> df = spark.createDataFrame([{'score':1.0,'ID':'abc','LABEL':True,
>>>>>>> 'k':2},{'score':1.0,'ID':'abc','LABEL':False,'k':3}])
>>>>>>>
>>>>>>> df = df.withColumnRenamed("k","kk")\
>>>>>>>   .select("ID","score","LABEL","kk")    #line B
>>>>>>>
>>>>>>> df_t = df.groupby("ID").agg(F.countDistinct("LABEL").alias("nL")).filter(F.col("nL")>1)
>>>>>>> df = df.join(df_t.select("ID"),["ID"])
>>>>>>> df_sw = df.groupby(["ID","kk"]).count().withColumnRenamed("count", "cnt1")
>>>>>>> df = df.join(df_sw, ["ID","kk"])
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Re: A bug triggered by a particular sequence of "select", "groupby" and "join" in Spark 2.3.0

Posted by Alessandro Solimando <al...@gmail.com>.
Hi Shiyuan,
can you show us the output of "explain" over df (as a last step)?
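
A minimal sketch of how that output can be produced, assuming the df built in the
quoted snippets below; passing extended=True also prints the logical plans, which
carry the attribute IDs (e.g. kk#144L) discussed in this thread:

df.explain(True)   # parsed/analyzed/optimized logical plans plus the physical plan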

On 11 April 2018 at 19:47, Shiyuan <gs...@gmail.com> wrote:

> Variable name binding is a python thing, and Spark should not care how the
> variable is named. What matters is the dependency graph. Spark fails to
> handle this dependency graph correctly for which I am quite surprised: this
> is just a simple combination of three very common sql operations.
>
>
> On Tue, Apr 10, 2018 at 9:03 PM, Gourav Sengupta <
> gourav.sengupta@gmail.com> wrote:
>
>> Hi Shiyuan,
>>
>> I do not know whether I am right, but I would prefer to avoid expressions
>> in Spark as:
>>
>> df = <<some transformation on df>>
>>
>>
>> Regards,
>> Gourav Sengupta
>>
>> On Tue, Apr 10, 2018 at 10:42 PM, Shiyuan <gs...@gmail.com> wrote:
>>
>>> Here is the pretty print of the physical plan which reveals some details
>>> about what causes the bug (see the lines highlighted in bold):
>>> WithColumnRenamed() fails to update the dependency graph correctly:
>>>
>>>
>>> 'Resolved attribute(s) kk#144L missing from
>>> ID#118,LABEL#119,kk#96L,score#121 in operator !Project [ID#118,
>>> score#121, LABEL#119, kk#144L]. Attribute(s) with the same name appear in
>>> the operation: kk. Please check if the right attribute(s) are used
>>>
>>> Project [ID#64, kk#73L, score#67, LABEL#65, cnt1#123L]
>>> +- Join Inner, ((ID#64 = ID#135) && (kk#73L = kk#128L))
>>>    :- Project [ID#64, score#67, LABEL#65, kk#73L]
>>>    :  +- Join Inner, (ID#64 = ID#99)
>>>    :     :- Project [ID#64, score#67, LABEL#65, kk#73L]
>>>    :     :  +- Project [ID#64, LABEL#65, k#66L AS kk#73L, score#67]
>>>    :     :     +- LogicalRDD [ID#64, LABEL#65, k#66L, score#67]
>>>    :     +- Project [ID#99]
>>>    :        +- Filter (nL#90L > cast(1 as bigint))
>>>    :           +- Aggregate [ID#99], [ID#99, count(distinct LABEL#100)
>>> AS nL#90L]
>>>    :              +- Project [ID#99, score#102, LABEL#100, kk#73L]
>>>    :                 +- Project [ID#99, LABEL#100, k#101L AS kk#73L,
>>> score#102]
>>>    :                    +- LogicalRDD [ID#99, LABEL#100, k#101L,
>>> score#102]
>>>    +- Project [ID#135, kk#128L, count#118L AS cnt1#123L]
>>>       +- Aggregate [ID#135, kk#128L], [ID#135, kk#128L, count(1) AS
>>> count#118L]
>>>          +- Project [ID#135, score#138, LABEL#136, kk#128L]
>>>             +- Join Inner, (ID#135 = ID#99)
>>>                :- Project [ID#135, score#138, LABEL#136, kk#128L]
>>>                :  +- *Project [ID#135, LABEL#136, k#137L AS kk#128L,
>>> score#138]*
>>>                :     +- LogicalRDD [ID#135, LABEL#136, k#137L, score#138]
>>>                +- Project [ID#99]
>>>                   +- Filter (nL#90L > cast(1 as bigint))
>>>                      +- Aggregate [ID#99], [ID#99, count(distinct
>>> LABEL#100) AS nL#90L]
>>>                         +- *!Project [ID#99, score#102, LABEL#100,
>>> kk#128L]*
>>>                            +-* Project [ID#99, LABEL#100, k#101L AS
>>> kk#73L, score#102]*
>>>                               +- LogicalRDD [ID#99, LABEL#100, k#101L,
>>> score#102]
>>>
>>> Here is the code which generates the error:
>>>
>>> import pyspark.sql.functions as F
>>> from pyspark.sql import Row
>>> df = spark.createDataFrame([Row(score=1.0,ID='abc',LABEL=True,k=2),Row(score=1.0,ID='abc',LABEL=False,k=3)])\
>>>     .withColumnRenamed("k","kk").select("ID","score","LABEL","kk")
>>> df_t = df.groupby("ID").agg(F.countDistinct("LABEL").alias("nL")).filter(F.col("nL")>1)
>>> df = df.join(df_t.select("ID"),["ID"])
>>> df_sw = df.groupby(["ID","kk"]).count().withColumnRenamed("count", "cnt1")
>>> df = df.join(df_sw, ["ID","kk"])
>>>
>>>
>>> On Tue, Apr 10, 2018 at 1:37 PM, Shiyuan <gs...@gmail.com> wrote:
>>>
>>>> The spark warning about Row instead of Dict is not the culprit. The
>>>> problem still persists after I use Row instead of Dict to generate the
>>>> dataframe.
>>>>
>>>> Here is the explain() output regarding the reassignment of df that Gourav
>>>> suggested running. They look the same except that the serial numbers
>>>> following the columns are different (e.g. ID#7273 vs. ID#7344).
>>>>
>>>> this is the output of df.explain() after df =
>>>> df.join(df_t.select("ID"),["ID"])
>>>> == Physical Plan == *(6) Project [ID#7273, score#7276, LABEL#7274,
>>>> kk#7281L] +- *(6) SortMergeJoin [ID#7273], [ID#7303], Inner :- *(2) Sort
>>>> [ID#7273 ASC NULLS FIRST], false, 0 : +- Exchange hashpartitioning(ID#7273,
>>>> 200) : +- *(1) Project [ID#7273, score#7276, LABEL#7274, k#7275L AS
>>>> kk#7281L] : +- *(1) Filter isnotnull(ID#7273) : +- *(1) Scan
>>>> ExistingRDD[ID#7273,LABEL#7274,k#7275L,score#7276] +- *(5) Sort
>>>> [ID#7303 ASC NULLS FIRST], false, 0 +- *(5) Project [ID#7303] +- *(5)
>>>> Filter (nL#7295L > 1) +- *(5) HashAggregate(keys=[ID#7303],
>>>> functions=[finalmerge_count(distinct merge count#7314L) AS
>>>> count(LABEL#7304)#7294L]) +- Exchange hashpartitioning(ID#7303, 200) +-
>>>> *(4) HashAggregate(keys=[ID#7303], functions=[partial_count(distinct
>>>> LABEL#7304) AS count#7314L]) +- *(4) HashAggregate(keys=[ID#7303,
>>>> LABEL#7304], functions=[]) +- Exchange hashpartitioning(ID#7303,
>>>> LABEL#7304, 200) +- *(3) HashAggregate(keys=[ID#7303, LABEL#7304],
>>>> functions=[]) +- *(3) Project [ID#7303, LABEL#7304] +- *(3) Filter
>>>> isnotnull(ID#7303) +- *(3) Scan ExistingRDD[ID#7303,LABEL#7304,k#7305L,score#7306]
>>>>
>>>>
>>>> In comparison, this is the output of df1.explain() after  df1 =
>>>> df.join(df_t.select("ID"),["ID"])?
>>>> == Physical Plan == *(6) Project [ID#7344, score#7347, LABEL#7345,
>>>> kk#7352L] +- *(6) SortMergeJoin [ID#7344], [ID#7374], Inner :- *(2) Sort
>>>> [ID#7344 ASC NULLS FIRST], false, 0 : +- Exchange hashpartitioning(ID#7344,
>>>> 200) : +- *(1) Project [ID#7344, score#7347, LABEL#7345, k#7346L AS
>>>> kk#7352L] : +- *(1) Filter isnotnull(ID#7344) : +- *(1) Scan
>>>> ExistingRDD[ID#7344,LABEL#7345,k#7346L,score#7347] +- *(5) Sort
>>>> [ID#7374 ASC NULLS FIRST], false, 0 +- *(5) Project [ID#7374] +- *(5)
>>>> Filter (nL#7366L > 1) +- *(5) HashAggregate(keys=[ID#7374],
>>>> functions=[finalmerge_count(distinct merge count#7385L) AS
>>>> count(LABEL#7375)#7365L]) +- Exchange hashpartitioning(ID#7374, 200) +-
>>>> *(4) HashAggregate(keys=[ID#7374], functions=[partial_count(distinct
>>>> LABEL#7375) AS count#7385L]) +- *(4) HashAggregate(keys=[ID#7374,
>>>> LABEL#7375], functions=[]) +- Exchange hashpartitioning(ID#7374,
>>>> LABEL#7375, 200) +- *(3) HashAggregate(keys=[ID#7374, LABEL#7375],
>>>> functions=[]) +- *(3) Project [ID#7374, LABEL#7375] +- *(3) Filter
>>>> isnotnull(ID#7374) +- *(3) Scan ExistingRDD[ID#7374,LABEL#7375
>>>> ,k#7376L,score#7377]
>>>>
>>>>
>>>> Here is the code I run and the error I get in Spark 2.3.0. By looking
>>>> at the error,  the cause seems to be that  spark doesn't look up the column
>>>> by its name but by a serial number and  the serial number somehow is messed
>>>> up.
>>>>
>>>> import pyspark.sql.functions as F
>>>> from pyspark.sql import Row
>>>> df = spark.createDataFrame([Row(score=1.0,ID='abc',LABEL=True,k=2),Row(score=1.0,ID='abc',LABEL=False,k=3)])
>>>>
>>>> df = df.withColumnRenamed("k","kk").select("ID","score","LABEL","kk")   #line B
>>>> df_t = df.groupby("ID").agg(F.countDistinct("LABEL").alias("nL")).filter(F.col("nL")>1)
>>>> df = df.join(df_t.select("ID"),["ID"])
>>>> df_sw = df.groupby(["ID","kk"]).count().withColumnRenamed("count", "cnt1")
>>>> df = df.join(df_sw, ["ID","kk"])
>>>>
>>>> This is the error:
>>>> 'Resolved attribute(s) kk#144L missing from
>>>> ID#118,LABEL#119,kk#96L,score#121 in operator !Project [ID#118,
>>>> score#121, LABEL#119, kk#144L]. Attribute(s) with the same name appear in
>>>> the operation: kk. Please check if the right attribute(s) are
>>>> used.;;\nProject [ID#88, kk#96L, score#91, LABEL#89, cnt1#140L]\n+- Join
>>>> Inner, ((ID#88 = ID#150) && (kk#96L = kk#144L))\n :- Project [ID#88,
>>>> score#91, LABEL#89, kk#96L]\n : +- Join Inner, (ID#88 = ID#118)\n : :-
>>>> Project [ID#88, score#91, LABEL#89, kk#96L]\n : : +- Project [ID#88,
>>>> LABEL#89, k#90L AS kk#96L, score#91]\n : : +- LogicalRDD [ID#88, LABEL#89,
>>>> k#90L, score#91], false\n : +- Project [ID#118]\n : +- Filter (nL#110L >
>>>> cast(1 as bigint))\n : +- Aggregate [ID#118], [ID#118, count(distinct
>>>> LABEL#119) AS nL#110L]\n : +- Project [ID#118, score#121, LABEL#119,
>>>> kk#96L]\n : +- Project [ID#118, LABEL#119, k#120L AS kk#96L, score#121]\n :
>>>> +- LogicalRDD [ID#118, LABEL#119, k#120L, score#121], false\n +- Project
>>>> [ID#150, kk#144L, count#134L AS cnt1#140L]\n +- Aggregate [ID#150,
>>>> kk#144L], [ID#150, kk#144L, count(1) AS count#134L]\n +- Project [ID#150,
>>>> score#153, LABEL#151, kk#144L]\n +- Join Inner, (ID#150 = ID#118)\n :-
>>>> Project [ID#150, score#153, LABEL#151, kk#144L]\n : +- Project [ID#150,
>>>> LABEL#151, k#152L AS kk#144L, score#153]\n : +- LogicalRDD [ID#150,
>>>> LABEL#151, k#152L, score#153], false\n +- Project [ID#118]\n +- Filter
>>>> (nL#110L > cast(1 as bigint))\n +- Aggregate [ID#118], [ID#118,
>>>> count(distinct LABEL#119) AS nL#110L]\n +- !Project [ID#118, score#121,
>>>> LABEL#119, kk#144L]\n +- Project [ID#118, LABEL#119, k#120L AS kk#96L,
>>>> score#121]\n +- LogicalRDD [ID#118, LABEL#119, k#120L, score#121], false\n'
>>>>
>>>>
>>>>
>>>>
>>>> On Mon, Apr 9, 2018 at 3:21 PM, Gourav Sengupta <
>>>> gourav.sengupta@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> what I am curious about is the reassignment of df.
>>>>>
>>>>> Can you please look into the explain plan of df after the statement df
>>>>> = df.join(df_t.select("ID"),["ID"])? And then compare with the
>>>>> explain plan of df1 after the statement df1 = df.join(df_t.select("ID"),["ID
>>>>> "])?
>>>>>
>>>>> It's late here, but I am yet to go through this completely. But I
>>>>> think that SPARK does throw a warning mentioning us to use Row instead of
>>>>> Dictionary.
>>>>>
>>>>> It will be of help if you could kindly try using the below statement
>>>>> and go through your use case once again (I am yet to go through all the
>>>>> lines):
>>>>>
>>>>>
>>>>>
>>>>> from pyspark.sql import Row
>>>>>
>>>>> df = spark.createDataFrame([Row(score = 1.0,ID="abc",LABEL=True,k=2),
>>>>> Row(score = 1.0,ID="abc",LABEL=True,k=3)])
>>>>>
>>>>> Regards,
>>>>> Gourav Sengupta
>>>>>
>>>>>
>>>>> On Mon, Apr 9, 2018 at 6:50 PM, Shiyuan <gs...@gmail.com> wrote:
>>>>>
>>>>>> Hi Spark Users,
>>>>>>     The following code snippet has an "attribute missing" error while
>>>>>> the attribute exists.  This bug is  triggered by a particular sequence of
>>>>>> of "select", "groupby" and "join".  Note that if I take away the "select"
>>>>>> in #line B,  the code runs without error.   However, the "select" in #line
>>>>>> B  includes all columns in the dataframe and hence should  not affect the
>>>>>> final result.
>>>>>>
>>>>>>
>>>>>> import pyspark.sql.functions as F
>>>>>> df = spark.createDataFrame([{'score':1.0,'ID':'abc','LABEL':True,
>>>>>> 'k':2},{'score':1.0,'ID':'abc','LABEL':False,'k':3}])
>>>>>>
>>>>>> df = df.withColumnRenamed("k","kk")\
>>>>>>   .select("ID","score","LABEL","kk")    #line B
>>>>>>
>>>>>> df_t = df.groupby("ID").agg(F.countDistinct("LABEL").alias("nL")).filter(F.col("nL")>1)
>>>>>> df = df.join(df_t.select("ID"),["ID"])
>>>>>> df_sw = df.groupby(["ID","kk"]).count().withColumnRenamed("count", "cnt1")
>>>>>> df = df.join(df_sw, ["ID","kk"])
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>

Re: A bug triggered by a particular sequence of "select", "groupby" and "join" in Spark 2.3.0

Posted by Shiyuan <gs...@gmail.com>.
Variable name binding is a Python thing, and Spark should not care how the
variable is named. What matters is the dependency graph. Spark fails to
handle this dependency graph correctly, which surprises me quite a bit: this
is just a simple combination of three very common SQL operations.
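
A minimal sketch of that point, using the df and df_t from the snippets quoted
below (the names are illustrative only):

df2 = df.join(df_t.select("ID"), ["ID"])   # bound to a fresh name
df  = df.join(df_t.select("ID"), ["ID"])   # rebound to the old name

# Both joins are built from the same parent DataFrame objects, so df2 and the new
# df carry the same logical plan; only the Python variable names differ.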


On Tue, Apr 10, 2018 at 9:03 PM, Gourav Sengupta <go...@gmail.com>
wrote:

> Hi Shiyuan,
>
> I do not know whether I am right, but I would prefer to avoid expressions
> in Spark as:
>
> df = <<some transformation on df>>
>
>
> Regards,
> Gourav Sengupta
>
> On Tue, Apr 10, 2018 at 10:42 PM, Shiyuan <gs...@gmail.com> wrote:
>
>> Here is the pretty print of the physical plan which reveals some details
>> about what causes the bug (see the lines highlighted in bold):
>> WithColumnRenamed() fails to update the dependency graph correctly:
>>
>>
>> 'Resolved attribute(s) kk#144L missing from ID#118,LABEL#119,kk#96L,score#121
>> in operator !Project [ID#118, score#121, LABEL#119, kk#144L]. Attribute(s)
>> with the same name appear in the operation: kk. Please check if the right
>> attribute(s) are used
>>
>> Project [ID#64, kk#73L, score#67, LABEL#65, cnt1#123L]
>> +- Join Inner, ((ID#64 = ID#135) && (kk#73L = kk#128L))
>>    :- Project [ID#64, score#67, LABEL#65, kk#73L]
>>    :  +- Join Inner, (ID#64 = ID#99)
>>    :     :- Project [ID#64, score#67, LABEL#65, kk#73L]
>>    :     :  +- Project [ID#64, LABEL#65, k#66L AS kk#73L, score#67]
>>    :     :     +- LogicalRDD [ID#64, LABEL#65, k#66L, score#67]
>>    :     +- Project [ID#99]
>>    :        +- Filter (nL#90L > cast(1 as bigint))
>>    :           +- Aggregate [ID#99], [ID#99, count(distinct LABEL#100) AS
>> nL#90L]
>>    :              +- Project [ID#99, score#102, LABEL#100, kk#73L]
>>    :                 +- Project [ID#99, LABEL#100, k#101L AS kk#73L,
>> score#102]
>>    :                    +- LogicalRDD [ID#99, LABEL#100, k#101L,
>> score#102]
>>    +- Project [ID#135, kk#128L, count#118L AS cnt1#123L]
>>       +- Aggregate [ID#135, kk#128L], [ID#135, kk#128L, count(1) AS
>> count#118L]
>>          +- Project [ID#135, score#138, LABEL#136, kk#128L]
>>             +- Join Inner, (ID#135 = ID#99)
>>                :- Project [ID#135, score#138, LABEL#136, kk#128L]
>>                :  +- *Project [ID#135, LABEL#136, k#137L AS kk#128L,
>> score#138]*
>>                :     +- LogicalRDD [ID#135, LABEL#136, k#137L, score#138]
>>                +- Project [ID#99]
>>                   +- Filter (nL#90L > cast(1 as bigint))
>>                      +- Aggregate [ID#99], [ID#99, count(distinct
>> LABEL#100) AS nL#90L]
>>                         +- *!Project [ID#99, score#102, LABEL#100,
>> kk#128L]*
>>                            +-* Project [ID#99, LABEL#100, k#101L AS
>> kk#73L, score#102]*
>>                               +- LogicalRDD [ID#99, LABEL#100, k#101L,
>> score#102]
>>
>> Here is the code which generates the error:
>>
>> import pyspark.sql.functions as F
>> from pyspark.sql import Row
>> df = spark.createDataFrame([Row(score=1.0,ID='abc',LABEL=True,k=2),Row(score=1.0,ID='abc',LABEL=False,k=3)])\
>>     .withColumnRenamed("k","kk").select("ID","score","LABEL","kk")
>> df_t = df.groupby("ID").agg(F.countDistinct("LABEL").alias("nL")).filter(F.col("nL")>1)
>> df = df.join(df_t.select("ID"),["ID"])
>> df_sw = df.groupby(["ID","kk"]).count().withColumnRenamed("count", "cnt1")
>> df = df.join(df_sw, ["ID","kk"])
>>
>>
>> On Tue, Apr 10, 2018 at 1:37 PM, Shiyuan <gs...@gmail.com> wrote:
>>
>>> The spark warning about Row instead of Dict is not the culprit. The
>>> problem still persists after I use Row instead of Dict to generate the
>>> dataframe.
>>>
>>> Here is the explain() output regarding the reassignment of df that Gourav
>>> suggested running. They look the same except that the serial numbers
>>> following the columns are different (e.g. ID#7273 vs. ID#7344).
>>>
>>> this is the output of df.explain() after df =
>>> df.join(df_t.select("ID"),["ID"])
>>> == Physical Plan == *(6) Project [ID#7273, score#7276, LABEL#7274,
>>> kk#7281L] +- *(6) SortMergeJoin [ID#7273], [ID#7303], Inner :- *(2) Sort
>>> [ID#7273 ASC NULLS FIRST], false, 0 : +- Exchange hashpartitioning(ID#7273,
>>> 200) : +- *(1) Project [ID#7273, score#7276, LABEL#7274, k#7275L AS
>>> kk#7281L] : +- *(1) Filter isnotnull(ID#7273) : +- *(1) Scan
>>> ExistingRDD[ID#7273,LABEL#7274,k#7275L,score#7276] +- *(5) Sort
>>> [ID#7303 ASC NULLS FIRST], false, 0 +- *(5) Project [ID#7303] +- *(5)
>>> Filter (nL#7295L > 1) +- *(5) HashAggregate(keys=[ID#7303],
>>> functions=[finalmerge_count(distinct merge count#7314L) AS
>>> count(LABEL#7304)#7294L]) +- Exchange hashpartitioning(ID#7303, 200) +-
>>> *(4) HashAggregate(keys=[ID#7303], functions=[partial_count(distinct
>>> LABEL#7304) AS count#7314L]) +- *(4) HashAggregate(keys=[ID#7303,
>>> LABEL#7304], functions=[]) +- Exchange hashpartitioning(ID#7303,
>>> LABEL#7304, 200) +- *(3) HashAggregate(keys=[ID#7303, LABEL#7304],
>>> functions=[]) +- *(3) Project [ID#7303, LABEL#7304] +- *(3) Filter
>>> isnotnull(ID#7303) +- *(3) Scan ExistingRDD[ID#7303,LABEL#7304,k#7305L,score#7306]
>>>
>>>
>>> In comparison, this is the output of df1.explain() after  df1 =
>>> df.join(df_t.select("ID"),["ID"])?
>>> == Physical Plan == *(6) Project [ID#7344, score#7347, LABEL#7345,
>>> kk#7352L] +- *(6) SortMergeJoin [ID#7344], [ID#7374], Inner :- *(2) Sort
>>> [ID#7344 ASC NULLS FIRST], false, 0 : +- Exchange hashpartitioning(ID#7344,
>>> 200) : +- *(1) Project [ID#7344, score#7347, LABEL#7345, k#7346L AS
>>> kk#7352L] : +- *(1) Filter isnotnull(ID#7344) : +- *(1) Scan
>>> ExistingRDD[ID#7344,LABEL#7345,k#7346L,score#7347] +- *(5) Sort
>>> [ID#7374 ASC NULLS FIRST], false, 0 +- *(5) Project [ID#7374] +- *(5)
>>> Filter (nL#7366L > 1) +- *(5) HashAggregate(keys=[ID#7374],
>>> functions=[finalmerge_count(distinct merge count#7385L) AS
>>> count(LABEL#7375)#7365L]) +- Exchange hashpartitioning(ID#7374, 200) +-
>>> *(4) HashAggregate(keys=[ID#7374], functions=[partial_count(distinct
>>> LABEL#7375) AS count#7385L]) +- *(4) HashAggregate(keys=[ID#7374,
>>> LABEL#7375], functions=[]) +- Exchange hashpartitioning(ID#7374,
>>> LABEL#7375, 200) +- *(3) HashAggregate(keys=[ID#7374, LABEL#7375],
>>> functions=[]) +- *(3) Project [ID#7374, LABEL#7375] +- *(3) Filter
>>> isnotnull(ID#7374) +- *(3) Scan ExistingRDD[ID#7374,LABEL#7375
>>> ,k#7376L,score#7377]
>>>
>>>
>>> Here is the code I run and the error I get in Spark 2.3.0. By looking at
>>> the error,  the cause seems to be that  spark doesn't look up the column by
>>> its name but by a serial number and  the serial number somehow is messed
>>> up.
>>>
>>> import pyspark.sql.functions as F
>>> from pyspark.sql import Row
>>> df = spark.createDataFrame([Row(score=1.0,ID='abc',LABEL=True,k=2),Row(score=1.0,ID='abc',LABEL=False,k=3)])
>>>
>>> df = df.withColumnRenamed("k","kk").select("ID","score","LABEL","kk")   #line B
>>> df_t = df.groupby("ID").agg(F.countDistinct("LABEL").alias("nL")).filter(F.col("nL")>1)
>>> df = df.join(df_t.select("ID"),["ID"])
>>> df_sw = df.groupby(["ID","kk"]).count().withColumnRenamed("count", "cnt1")
>>> df = df.join(df_sw, ["ID","kk"])
>>>
>>> This is the error:
>>> 'Resolved attribute(s) kk#144L missing from
>>> ID#118,LABEL#119,kk#96L,score#121 in operator !Project [ID#118,
>>> score#121, LABEL#119, kk#144L]. Attribute(s) with the same name appear in
>>> the operation: kk. Please check if the right attribute(s) are
>>> used.;;\nProject [ID#88, kk#96L, score#91, LABEL#89, cnt1#140L]\n+- Join
>>> Inner, ((ID#88 = ID#150) && (kk#96L = kk#144L))\n :- Project [ID#88,
>>> score#91, LABEL#89, kk#96L]\n : +- Join Inner, (ID#88 = ID#118)\n : :-
>>> Project [ID#88, score#91, LABEL#89, kk#96L]\n : : +- Project [ID#88,
>>> LABEL#89, k#90L AS kk#96L, score#91]\n : : +- LogicalRDD [ID#88, LABEL#89,
>>> k#90L, score#91], false\n : +- Project [ID#118]\n : +- Filter (nL#110L >
>>> cast(1 as bigint))\n : +- Aggregate [ID#118], [ID#118, count(distinct
>>> LABEL#119) AS nL#110L]\n : +- Project [ID#118, score#121, LABEL#119,
>>> kk#96L]\n : +- Project [ID#118, LABEL#119, k#120L AS kk#96L, score#121]\n :
>>> +- LogicalRDD [ID#118, LABEL#119, k#120L, score#121], false\n +- Project
>>> [ID#150, kk#144L, count#134L AS cnt1#140L]\n +- Aggregate [ID#150,
>>> kk#144L], [ID#150, kk#144L, count(1) AS count#134L]\n +- Project [ID#150,
>>> score#153, LABEL#151, kk#144L]\n +- Join Inner, (ID#150 = ID#118)\n :-
>>> Project [ID#150, score#153, LABEL#151, kk#144L]\n : +- Project [ID#150,
>>> LABEL#151, k#152L AS kk#144L, score#153]\n : +- LogicalRDD [ID#150,
>>> LABEL#151, k#152L, score#153], false\n +- Project [ID#118]\n +- Filter
>>> (nL#110L > cast(1 as bigint))\n +- Aggregate [ID#118], [ID#118,
>>> count(distinct LABEL#119) AS nL#110L]\n +- !Project [ID#118, score#121,
>>> LABEL#119, kk#144L]\n +- Project [ID#118, LABEL#119, k#120L AS kk#96L,
>>> score#121]\n +- LogicalRDD [ID#118, LABEL#119, k#120L, score#121], false\n'
>>>
>>>
>>>
>>>
>>> On Mon, Apr 9, 2018 at 3:21 PM, Gourav Sengupta <
>>> gourav.sengupta@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> what I am curious about is the reassignment of df.
>>>>
>>>> Can you please look into the explain plan of df after the statement df
>>>> = df.join(df_t.select("ID"),["ID"])? And then compare with the explain
>>>> plan of df1 after the statement df1 = df.join(df_t.select("ID"),["ID
>>>> "])?
>>>>
>>>> It's late here, but I am yet to go through this completely. But I think
>>>> that SPARK does throw a warning mentioning us to use Row instead of
>>>> Dictionary.
>>>>
>>>> It will be of help if you could kindly try using the below statement
>>>> and go through your use case once again (I am yet to go through all the
>>>> lines):
>>>>
>>>>
>>>>
>>>> from pyspark.sql import Row
>>>>
>>>> df = spark.createDataFrame([Row(score = 1.0,ID="abc",LABEL=True,k=2),
>>>> Row(score = 1.0,ID="abc",LABEL=True,k=3)])
>>>>
>>>> Regards,
>>>> Gourav Sengupta
>>>>
>>>>
>>>> On Mon, Apr 9, 2018 at 6:50 PM, Shiyuan <gs...@gmail.com> wrote:
>>>>
>>>>> Hi Spark Users,
>>>>>     The following code snippet has an "attribute missing" error while
>>>>> the attribute exists.  This bug is  triggered by a particular sequence of
>>>>> of "select", "groupby" and "join".  Note that if I take away the "select"
>>>>> in #line B,  the code runs without error.   However, the "select" in #line
>>>>> B  includes all columns in the dataframe and hence should  not affect the
>>>>> final result.
>>>>>
>>>>>
>>>>> import pyspark.sql.functions as F
>>>>> df = spark.createDataFrame([{'score':1.0,'ID':'abc','LABEL':True,
>>>>> 'k':2},{'score':1.0,'ID':'abc','LABEL':False,'k':3}])
>>>>>
>>>>> df = df.withColumnRenamed("k","kk")\
>>>>>   .select("ID","score","LABEL","kk")    #line B
>>>>>
>>>>> df_t = df.groupby("ID").agg(F.countDistinct("LABEL").alias("nL")).filter(F.col("nL")>1)
>>>>> df = df.join(df_t.select("ID"),["ID"])
>>>>> df_sw = df.groupby(["ID","kk"]).count().withColumnRenamed("count", "cnt1")
>>>>> df = df.join(df_sw, ["ID","kk"])
>>>>>
>>>>
>>>>
>>>
>>
>

Re: A bug triggered by a particular sequence of "select", "groupby" and "join" in Spark 2.3.0

Posted by Gourav Sengupta <go...@gmail.com>.
Hi Shiyuan,

I do not know whether I am right, but I would prefer to avoid expressions
in Spark such as:

df = <<some transformation on df>>
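
A minimal sketch of that style, giving every intermediate result a fresh name
(df_raw and the other names are illustrative only; this only shows the style and
is not claimed to avoid the error discussed here):

import pyspark.sql.functions as F

df_renamed = df_raw.withColumnRenamed("k", "kk").select("ID", "score", "LABEL", "kk")
df_t       = df_renamed.groupby("ID").agg(F.countDistinct("LABEL").alias("nL")).filter(F.col("nL") > 1)
df_joined  = df_renamed.join(df_t.select("ID"), ["ID"])
df_sw      = df_joined.groupby(["ID", "kk"]).count().withColumnRenamed("count", "cnt1")
df_final   = df_joined.join(df_sw, ["ID", "kk"])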


Regards,
Gourav Sengupta

On Tue, Apr 10, 2018 at 10:42 PM, Shiyuan <gs...@gmail.com> wrote:

> Here is the pretty print of the physical plan which reveals some details
> about what causes the bug (see the lines highlighted in bold):
> WithColumnRenamed() fails to update the dependency graph correctly:
>
>
> 'Resolved attribute(s) kk#144L missing from ID#118,LABEL#119,kk#96L,score#121
> in operator !Project [ID#118, score#121, LABEL#119, kk#144L]. Attribute(s)
> with the same name appear in the operation: kk. Please check if the right
> attribute(s) are used
>
> Project [ID#64, kk#73L, score#67, LABEL#65, cnt1#123L]
> +- Join Inner, ((ID#64 = ID#135) && (kk#73L = kk#128L))
>    :- Project [ID#64, score#67, LABEL#65, kk#73L]
>    :  +- Join Inner, (ID#64 = ID#99)
>    :     :- Project [ID#64, score#67, LABEL#65, kk#73L]
>    :     :  +- Project [ID#64, LABEL#65, k#66L AS kk#73L, score#67]
>    :     :     +- LogicalRDD [ID#64, LABEL#65, k#66L, score#67]
>    :     +- Project [ID#99]
>    :        +- Filter (nL#90L > cast(1 as bigint))
>    :           +- Aggregate [ID#99], [ID#99, count(distinct LABEL#100) AS
> nL#90L]
>    :              +- Project [ID#99, score#102, LABEL#100, kk#73L]
>    :                 +- Project [ID#99, LABEL#100, k#101L AS kk#73L,
> score#102]
>    :                    +- LogicalRDD [ID#99, LABEL#100, k#101L, score#102]
>    +- Project [ID#135, kk#128L, count#118L AS cnt1#123L]
>       +- Aggregate [ID#135, kk#128L], [ID#135, kk#128L, count(1) AS
> count#118L]
>          +- Project [ID#135, score#138, LABEL#136, kk#128L]
>             +- Join Inner, (ID#135 = ID#99)
>                :- Project [ID#135, score#138, LABEL#136, kk#128L]
>                :  +- *Project [ID#135, LABEL#136, k#137L AS kk#128L,
> score#138]*
>                :     +- LogicalRDD [ID#135, LABEL#136, k#137L, score#138]
>                +- Project [ID#99]
>                   +- Filter (nL#90L > cast(1 as bigint))
>                      +- Aggregate [ID#99], [ID#99, count(distinct
> LABEL#100) AS nL#90L]
>                         +- *!Project [ID#99, score#102, LABEL#100,
> kk#128L]*
>                            +-* Project [ID#99, LABEL#100, k#101L AS
> kk#73L, score#102]*
>                               +- LogicalRDD [ID#99, LABEL#100, k#101L,
> score#102]
>
> Here is the code which generates the error:
>
> import pyspark.sql.functions as F
> from pyspark.sql import Row
> df = spark.createDataFrame([Row(score=1.0,ID='abc',LABEL=True,k=2),Row(score=1.0,ID='abc',LABEL=False,k=3)])\
>     .withColumnRenamed("k","kk").select("ID","score","LABEL","kk")
> df_t = df.groupby("ID").agg(F.countDistinct("LABEL").alias("nL")).filter(F.col("nL")>1)
> df = df.join(df_t.select("ID"),["ID"])
> df_sw = df.groupby(["ID","kk"]).count().withColumnRenamed("count", "cnt1")
> df = df.join(df_sw, ["ID","kk"])
>
>
> On Tue, Apr 10, 2018 at 1:37 PM, Shiyuan <gs...@gmail.com> wrote:
>
>> The spark warning about Row instead of Dict is not the culprit. The
>> problem still persists after I use Row instead of Dict to generate the
>> dataframe.
>>
>> Here is the explain() output regarding the reassignment of df that Gourav
>> suggested running. They look the same except that the serial numbers
>> following the columns are different (e.g. ID#7273 vs. ID#7344).
>>
>> this is the output of df.explain() after df =
>> df.join(df_t.select("ID"),["ID"])
>> == Physical Plan == *(6) Project [ID#7273, score#7276, LABEL#7274,
>> kk#7281L] +- *(6) SortMergeJoin [ID#7273], [ID#7303], Inner :- *(2) Sort
>> [ID#7273 ASC NULLS FIRST], false, 0 : +- Exchange hashpartitioning(ID#7273,
>> 200) : +- *(1) Project [ID#7273, score#7276, LABEL#7274, k#7275L AS
>> kk#7281L] : +- *(1) Filter isnotnull(ID#7273) : +- *(1) Scan
>> ExistingRDD[ID#7273,LABEL#7274,k#7275L,score#7276] +- *(5) Sort [ID#7303
>> ASC NULLS FIRST], false, 0 +- *(5) Project [ID#7303] +- *(5) Filter
>> (nL#7295L > 1) +- *(5) HashAggregate(keys=[ID#7303],
>> functions=[finalmerge_count(distinct merge count#7314L) AS
>> count(LABEL#7304)#7294L]) +- Exchange hashpartitioning(ID#7303, 200) +-
>> *(4) HashAggregate(keys=[ID#7303], functions=[partial_count(distinct
>> LABEL#7304) AS count#7314L]) +- *(4) HashAggregate(keys=[ID#7303,
>> LABEL#7304], functions=[]) +- Exchange hashpartitioning(ID#7303,
>> LABEL#7304, 200) +- *(3) HashAggregate(keys=[ID#7303, LABEL#7304],
>> functions=[]) +- *(3) Project [ID#7303, LABEL#7304] +- *(3) Filter
>> isnotnull(ID#7303) +- *(3) Scan ExistingRDD[ID#7303,LABEL#7304,k#7305L,score#7306]
>>
>>
>> In comparison, this is the output of df1.explain() after  df1 =
>> df.join(df_t.select("ID"),["ID"])?
>> == Physical Plan == *(6) Project [ID#7344, score#7347, LABEL#7345,
>> kk#7352L] +- *(6) SortMergeJoin [ID#7344], [ID#7374], Inner :- *(2) Sort
>> [ID#7344 ASC NULLS FIRST], false, 0 : +- Exchange hashpartitioning(ID#7344,
>> 200) : +- *(1) Project [ID#7344, score#7347, LABEL#7345, k#7346L AS
>> kk#7352L] : +- *(1) Filter isnotnull(ID#7344) : +- *(1) Scan
>> ExistingRDD[ID#7344,LABEL#7345,k#7346L,score#7347] +- *(5) Sort [ID#7374
>> ASC NULLS FIRST], false, 0 +- *(5) Project [ID#7374] +- *(5) Filter
>> (nL#7366L > 1) +- *(5) HashAggregate(keys=[ID#7374],
>> functions=[finalmerge_count(distinct merge count#7385L) AS
>> count(LABEL#7375)#7365L]) +- Exchange hashpartitioning(ID#7374, 200) +-
>> *(4) HashAggregate(keys=[ID#7374], functions=[partial_count(distinct
>> LABEL#7375) AS count#7385L]) +- *(4) HashAggregate(keys=[ID#7374,
>> LABEL#7375], functions=[]) +- Exchange hashpartitioning(ID#7374,
>> LABEL#7375, 200) +- *(3) HashAggregate(keys=[ID#7374, LABEL#7375],
>> functions=[]) +- *(3) Project [ID#7374, LABEL#7375] +- *(3) Filter
>> isnotnull(ID#7374) +- *(3) Scan ExistingRDD[ID#7374,LABEL#7375
>> ,k#7376L,score#7377]
>>
>>
>> Here is the code I run and the error I get in Spark 2.3.0. By looking at
>> the error,  the cause seems to be that  spark doesn't look up the column by
>> its name but by a serial number and  the serial number somehow is messed
>> up.
>>
>> import pyspark.sql.functions as F
>> from pyspark.sql import Row
>> df = spark.createDataFrame([Row(score=1.0,ID='abc',LABEL=True,k=2),Row(score=1.0,ID='abc',LABEL=False,k=3)])
>>
>> df = df.withColumnRenamed("k","kk").select("ID","score","LABEL","kk")   #line B
>> df_t = df.groupby("ID").agg(F.countDistinct("LABEL").alias("nL")).filter(F.col("nL")>1)
>> df = df.join(df_t.select("ID"),["ID"])
>> df_sw = df.groupby(["ID","kk"]).count().withColumnRenamed("count", "cnt1")
>> df = df.join(df_sw, ["ID","kk"])
>>
>> This is the error:
>> 'Resolved attribute(s) kk#144L missing from ID#118,LABEL#119,kk#96L,score#121
>> in operator !Project [ID#118, score#121, LABEL#119, kk#144L]. Attribute(s)
>> with the same name appear in the operation: kk. Please check if the right
>> attribute(s) are used.;;\nProject [ID#88, kk#96L, score#91, LABEL#89,
>> cnt1#140L]\n+- Join Inner, ((ID#88 = ID#150) && (kk#96L = kk#144L))\n :-
>> Project [ID#88, score#91, LABEL#89, kk#96L]\n : +- Join Inner, (ID#88 =
>> ID#118)\n : :- Project [ID#88, score#91, LABEL#89, kk#96L]\n : : +- Project
>> [ID#88, LABEL#89, k#90L AS kk#96L, score#91]\n : : +- LogicalRDD [ID#88,
>> LABEL#89, k#90L, score#91], false\n : +- Project [ID#118]\n : +- Filter
>> (nL#110L > cast(1 as bigint))\n : +- Aggregate [ID#118], [ID#118,
>> count(distinct LABEL#119) AS nL#110L]\n : +- Project [ID#118, score#121,
>> LABEL#119, kk#96L]\n : +- Project [ID#118, LABEL#119, k#120L AS kk#96L,
>> score#121]\n : +- LogicalRDD [ID#118, LABEL#119, k#120L, score#121],
>> false\n +- Project [ID#150, kk#144L, count#134L AS cnt1#140L]\n +-
>> Aggregate [ID#150, kk#144L], [ID#150, kk#144L, count(1) AS count#134L]\n +-
>> Project [ID#150, score#153, LABEL#151, kk#144L]\n +- Join Inner, (ID#150 =
>> ID#118)\n :- Project [ID#150, score#153, LABEL#151, kk#144L]\n : +- Project
>> [ID#150, LABEL#151, k#152L AS kk#144L, score#153]\n : +- LogicalRDD
>> [ID#150, LABEL#151, k#152L, score#153], false\n +- Project [ID#118]\n +-
>> Filter (nL#110L > cast(1 as bigint))\n +- Aggregate [ID#118], [ID#118,
>> count(distinct LABEL#119) AS nL#110L]\n +- !Project [ID#118, score#121,
>> LABEL#119, kk#144L]\n +- Project [ID#118, LABEL#119, k#120L AS kk#96L,
>> score#121]\n +- LogicalRDD [ID#118, LABEL#119, k#120L, score#121], false\n'
>>
>>
>>
>>
>> On Mon, Apr 9, 2018 at 3:21 PM, Gourav Sengupta <
>> gourav.sengupta@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> what I am curious about is the reassignment of df.
>>>
>>> Can you please look into the explain plan of df after the statement df =
>>> df.join(df_t.select("ID"),["ID"])? And then compare with the explain
>>> plan of df1 after the statement df1 = df.join(df_t.select("ID"),["ID"])?
>>>
>>> It's late here, but I am yet to go through this completely. But I think
>>> that SPARK does throw a warning mentioning us to use Row instead of
>>> Dictionary.
>>>
>>> It will be of help if you could kindly try using the below statement and
>>> go through your use case once again (I am yet to go through all the lines):
>>>
>>>
>>>
>>> from pyspark.sql import Row
>>>
>>> df = spark.createDataFrame([Row(score = 1.0,ID="abc",LABEL=True,k=2),
>>> Row(score = 1.0,ID="abc",LABEL=True,k=3)])
>>>
>>> Regards,
>>> Gourav Sengupta
>>>
>>>
>>> On Mon, Apr 9, 2018 at 6:50 PM, Shiyuan <gs...@gmail.com> wrote:
>>>
>>>> Hi Spark Users,
>>>>     The following code snippet has an "attribute missing" error while
>>>> the attribute exists.  This bug is  triggered by a particular sequence of
>>>> of "select", "groupby" and "join".  Note that if I take away the "select"
>>>> in #line B,  the code runs without error.   However, the "select" in #line
>>>> B  includes all columns in the dataframe and hence should  not affect the
>>>> final result.
>>>>
>>>>
>>>> import pyspark.sql.functions as F
>>>> df = spark.createDataFrame([{'score':1.0,'ID':'abc','LABEL':True,
>>>> 'k':2},{'score':1.0,'ID':'abc','LABEL':False,'k':3}])
>>>>
>>>> df = df.withColumnRenamed("k","kk")\
>>>>   .select("ID","score","LABEL","kk")    #line B
>>>>
>>>> df_t = df.groupby("ID").agg(F.countDistinct("LABEL").alias("nL")).filter(F.col("nL")>1)
>>>> df = df.join(df_t.select("ID"),["ID"])
>>>> df_sw = df.groupby(["ID","kk"]).count().withColumnRenamed("count", "cnt1")
>>>> df = df.join(df_sw, ["ID","kk"])
>>>>
>>>
>>>
>>
>

Re: A bug triggered by a particular sequence of "select", "groupby" and "join" in Spark 2.3.0

Posted by Shiyuan <gs...@gmail.com>.
Here is the pretty print of the physical plan, which reveals some details
about what causes the bug (the relevant lines, highlighted in bold in the
original message, appear between asterisks below): withColumnRenamed() fails
to update the dependency graph correctly:


'Resolved attribute(s) kk#144L missing from ID#118,LABEL#119,kk#96L,score#121
in operator !Project [ID#118, score#121, LABEL#119, kk#144L]. Attribute(s)
with the same name appear in the operation: kk. Please check if the right
attribute(s) are used

Project [ID#64, kk#73L, score#67, LABEL#65, cnt1#123L]
+- Join Inner, ((ID#64 = ID#135) && (kk#73L = kk#128L))
   :- Project [ID#64, score#67, LABEL#65, kk#73L]
   :  +- Join Inner, (ID#64 = ID#99)
   :     :- Project [ID#64, score#67, LABEL#65, kk#73L]
   :     :  +- Project [ID#64, LABEL#65, k#66L AS kk#73L, score#67]
   :     :     +- LogicalRDD [ID#64, LABEL#65, k#66L, score#67]
   :     +- Project [ID#99]
   :        +- Filter (nL#90L > cast(1 as bigint))
   :           +- Aggregate [ID#99], [ID#99, count(distinct LABEL#100) AS
nL#90L]
   :              +- Project [ID#99, score#102, LABEL#100, kk#73L]
   :                 +- Project [ID#99, LABEL#100, k#101L AS kk#73L,
score#102]
   :                    +- LogicalRDD [ID#99, LABEL#100, k#101L, score#102]
   +- Project [ID#135, kk#128L, count#118L AS cnt1#123L]
      +- Aggregate [ID#135, kk#128L], [ID#135, kk#128L, count(1) AS
count#118L]
         +- Project [ID#135, score#138, LABEL#136, kk#128L]
            +- Join Inner, (ID#135 = ID#99)
               :- Project [ID#135, score#138, LABEL#136, kk#128L]
               :  +- *Project [ID#135, LABEL#136, k#137L AS kk#128L,
score#138]*
               :     +- LogicalRDD [ID#135, LABEL#136, k#137L, score#138]
               +- Project [ID#99]
                  +- Filter (nL#90L > cast(1 as bigint))
                     +- Aggregate [ID#99], [ID#99, count(distinct
LABEL#100) AS nL#90L]
                        +- *!Project [ID#99, score#102, LABEL#100, kk#128L]*
                           +-* Project [ID#99, LABEL#100, k#101L AS kk#73L,
score#102]*
                              +- LogicalRDD [ID#99, LABEL#100, k#101L,
score#102]

Here is the code which generates the error:

import pyspark.sql.functions as F
from pyspark.sql import Row
df = spark.createDataFrame([Row(score=1.0,ID='abc',LABEL=True,k=2),Row(score=1.0,ID='abc',LABEL=False,k=3)])\
    .withColumnRenamed("k","kk").select("ID","score","LABEL","kk")
df_t = df.groupby("ID").agg(F.countDistinct("LABEL").alias("nL")).filter(F.col("nL")>1)
df = df.join(df_t.select("ID"),["ID"])
df_sw = df.groupby(["ID","kk"]).count().withColumnRenamed("count", "cnt1")
df = df.join(df_sw, ["ID","kk"])
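
One workaround sometimes used for this class of "Resolved attribute(s) ... missing"
errors is to cut the lineage before the dataframe is joined back onto an aggregate
of itself, so that the second branch gets fresh attribute IDs. A hypothetical
sketch, not taken from this thread and not verified against this exact case:

df_fresh = spark.createDataFrame(df.rdd, df.schema)   # rebuild from rdd + schema, dropping the old plan
df_sw = df_fresh.groupby(["ID","kk"]).count().withColumnRenamed("count", "cnt1")
df = df_fresh.join(df_sw, ["ID","kk"])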


On Tue, Apr 10, 2018 at 1:37 PM, Shiyuan <gs...@gmail.com> wrote:

> The spark warning about Row instead of Dict is not the culprit. The
> problem still persists after I use Row instead of Dict to generate the
> dataframe.
>
> Here is the explain() output regarding the reassignment of df that Gourav
> suggested running. They look the same except that the serial numbers
> following the columns are different (e.g. ID#7273 vs. ID#7344).
>
> this is the output of df.explain() after df = df.join(df_t.select("ID"),["
> ID"])
> == Physical Plan == *(6) Project [ID#7273, score#7276, LABEL#7274,
> kk#7281L] +- *(6) SortMergeJoin [ID#7273], [ID#7303], Inner :- *(2) Sort
> [ID#7273 ASC NULLS FIRST], false, 0 : +- Exchange hashpartitioning(ID#7273,
> 200) : +- *(1) Project [ID#7273, score#7276, LABEL#7274, k#7275L AS
> kk#7281L] : +- *(1) Filter isnotnull(ID#7273) : +- *(1) Scan
> ExistingRDD[ID#7273,LABEL#7274,k#7275L,score#7276] +- *(5) Sort [ID#7303
> ASC NULLS FIRST], false, 0 +- *(5) Project [ID#7303] +- *(5) Filter
> (nL#7295L > 1) +- *(5) HashAggregate(keys=[ID#7303],
> functions=[finalmerge_count(distinct merge count#7314L) AS
> count(LABEL#7304)#7294L]) +- Exchange hashpartitioning(ID#7303, 200) +-
> *(4) HashAggregate(keys=[ID#7303], functions=[partial_count(distinct
> LABEL#7304) AS count#7314L]) +- *(4) HashAggregate(keys=[ID#7303,
> LABEL#7304], functions=[]) +- Exchange hashpartitioning(ID#7303,
> LABEL#7304, 200) +- *(3) HashAggregate(keys=[ID#7303, LABEL#7304],
> functions=[]) +- *(3) Project [ID#7303, LABEL#7304] +- *(3) Filter
> isnotnull(ID#7303) +- *(3) Scan ExistingRDD[ID#7303,LABEL#7304,k#7305L,score#7306]
>
>
> In comparison, this is the output of df1.explain() after  df1 =
> df.join(df_t.select("ID"),["ID"])?
> == Physical Plan == *(6) Project [ID#7344, score#7347, LABEL#7345,
> kk#7352L] +- *(6) SortMergeJoin [ID#7344], [ID#7374], Inner :- *(2) Sort
> [ID#7344 ASC NULLS FIRST], false, 0 : +- Exchange hashpartitioning(ID#7344,
> 200) : +- *(1) Project [ID#7344, score#7347, LABEL#7345, k#7346L AS
> kk#7352L] : +- *(1) Filter isnotnull(ID#7344) : +- *(1) Scan
> ExistingRDD[ID#7344,LABEL#7345,k#7346L,score#7347] +- *(5) Sort [ID#7374
> ASC NULLS FIRST], false, 0 +- *(5) Project [ID#7374] +- *(5) Filter
> (nL#7366L > 1) +- *(5) HashAggregate(keys=[ID#7374],
> functions=[finalmerge_count(distinct merge count#7385L) AS
> count(LABEL#7375)#7365L]) +- Exchange hashpartitioning(ID#7374, 200) +-
> *(4) HashAggregate(keys=[ID#7374], functions=[partial_count(distinct
> LABEL#7375) AS count#7385L]) +- *(4) HashAggregate(keys=[ID#7374,
> LABEL#7375], functions=[]) +- Exchange hashpartitioning(ID#7374,
> LABEL#7375, 200) +- *(3) HashAggregate(keys=[ID#7374, LABEL#7375],
> functions=[]) +- *(3) Project [ID#7374, LABEL#7375] +- *(3) Filter
> isnotnull(ID#7374) +- *(3) Scan ExistingRDD[ID#7374,LABEL#
> 7375,k#7376L,score#7377]
>
>
> Here is the code I run and the error I get in Spark 2.3.0. By looking at
> the error,  the cause seems to be that  spark doesn't look up the column by
> its name but by a serial number and  the serial number somehow is messed
> up.
>
> import pyspark.sql.functions as F
> from pyspark.sql import Row
> df = spark.createDataFrame([Row(score=1.0,ID='abc',LABEL=True,
> k=2),Row(score=1.0,ID='abc',LABEL=False,k=3)])
>
> df = df.withColumnRenamed("k","kk").select("ID","score","LABEL","kk")   #line B
> df_t = df.groupby("ID").agg(F.countDistinct("LABEL").alias("nL")).filter(F.col("nL")>1)
> df = df.join(df_t.select("ID"),["ID"])
> df_sw = df.groupby(["ID","kk"]).count().withColumnRenamed("count", "cnt1")
> df = df.join(df_sw, ["ID","kk"])
>
> This is the error:
> 'Resolved attribute(s) kk#144L missing from ID#118,LABEL#119,kk#96L,score#121
> in operator !Project [ID#118, score#121, LABEL#119, kk#144L]. Attribute(s)
> with the same name appear in the operation: kk. Please check if the right
> attribute(s) are used.;;\nProject [ID#88, kk#96L, score#91, LABEL#89,
> cnt1#140L]\n+- Join Inner, ((ID#88 = ID#150) && (kk#96L = kk#144L))\n :-
> Project [ID#88, score#91, LABEL#89, kk#96L]\n : +- Join Inner, (ID#88 =
> ID#118)\n : :- Project [ID#88, score#91, LABEL#89, kk#96L]\n : : +- Project
> [ID#88, LABEL#89, k#90L AS kk#96L, score#91]\n : : +- LogicalRDD [ID#88,
> LABEL#89, k#90L, score#91], false\n : +- Project [ID#118]\n : +- Filter
> (nL#110L > cast(1 as bigint))\n : +- Aggregate [ID#118], [ID#118,
> count(distinct LABEL#119) AS nL#110L]\n : +- Project [ID#118, score#121,
> LABEL#119, kk#96L]\n : +- Project [ID#118, LABEL#119, k#120L AS kk#96L,
> score#121]\n : +- LogicalRDD [ID#118, LABEL#119, k#120L, score#121],
> false\n +- Project [ID#150, kk#144L, count#134L AS cnt1#140L]\n +-
> Aggregate [ID#150, kk#144L], [ID#150, kk#144L, count(1) AS count#134L]\n +-
> Project [ID#150, score#153, LABEL#151, kk#144L]\n +- Join Inner, (ID#150 =
> ID#118)\n :- Project [ID#150, score#153, LABEL#151, kk#144L]\n : +- Project
> [ID#150, LABEL#151, k#152L AS kk#144L, score#153]\n : +- LogicalRDD
> [ID#150, LABEL#151, k#152L, score#153], false\n +- Project [ID#118]\n +-
> Filter (nL#110L > cast(1 as bigint))\n +- Aggregate [ID#118], [ID#118,
> count(distinct LABEL#119) AS nL#110L]\n +- !Project [ID#118, score#121,
> LABEL#119, kk#144L]\n +- Project [ID#118, LABEL#119, k#120L AS kk#96L,
> score#121]\n +- LogicalRDD [ID#118, LABEL#119, k#120L, score#121], false\n'
>
>
>
>
> On Mon, Apr 9, 2018 at 3:21 PM, Gourav Sengupta <gourav.sengupta@gmail.com
> > wrote:
>
>> Hi,
>>
>> what I am curious about is the reassignment of df.
>>
>> Can you please look into the explain plan of df after the statement df =
>> df.join(df_t.select("ID"),["ID"])? And then compare with the explain
>> plan of df1 after the statement df1 = df.join(df_t.select("ID"),["ID"])?
>>
>> It's late here, but I am yet to go through this completely. But I think
>> that SPARK does throw a warning mentioning us to use Row instead of
>> Dictionary.
>>
>> It will be of help if you could kindly try using the below statement and
>> go through your use case once again (I am yet to go through all the lines):
>>
>>
>>
>> from pyspark.sql import Row
>>
>> df = spark.createDataFrame([Row(score = 1.0,ID="abc",LABEL=True,k=2),
>> Row(score = 1.0,ID="abc",LABEL=True,k=3)])
>>
>> Regards,
>> Gourav Sengupta
>>
>>
>> On Mon, Apr 9, 2018 at 6:50 PM, Shiyuan <gs...@gmail.com> wrote:
>>
>>> Hi Spark Users,
>>>     The following code snippet has an "attribute missing" error while
>>> the attribute exists.  This bug is  triggered by a particular sequence of
>>> of "select", "groupby" and "join".  Note that if I take away the "select"
>>> in #line B,  the code runs without error.   However, the "select" in #line
>>> B  includes all columns in the dataframe and hence should  not affect the
>>> final result.
>>>
>>>
>>> import pyspark.sql.functions as F
>>> df = spark.createDataFrame([{'score':1.0,'ID':'abc','LABEL':True,
>>> 'k':2},{'score':1.0,'ID':'abc','LABEL':False,'k':3}])
>>>
>>> df = df.withColumnRenamed("k","kk")\
>>>   .select("ID","score","LABEL","kk")    #line B
>>>
>>> df_t = df.groupby("ID").agg(F.countDistinct("LABEL").alias("nL")).filter(F.col("nL")>1)
>>> df = df.join(df_t.select("ID"),["ID"])
>>> df_sw = df.groupby(["ID","kk"]).count().withColumnRenamed("count", "cnt1")
>>> df = df.join(df_sw, ["ID","kk"])
>>>
>>
>>
>

Re: A bug triggered by a particular sequence of "select", "groupby" and "join" in Spark 2.3.0

Posted by Shiyuan <gs...@gmail.com>.
The Spark warning about Row instead of dict is not the culprit. The problem
still persists after I use Row instead of dict to generate the dataframe.

Here is the explain() output for the reassignment of df that Gourav suggested
running. The two plans look the same except that the serial numbers following
the columns differ (e.g. ID#7273 vs. ID#7344).
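
(As a side note, and only as a rough sketch assuming the same df and df_t as in
the snippet further below: the extended form of explain also prints the parsed,
analyzed and optimized logical plans, which is where conflicting attribute
numbers such as kk#96L vs. kk#144L can be compared directly.)

# rough sketch, not part of the original reproduction: print logical + physical plans
df.join(df_t.select("ID"), ["ID"]).explain(True)   # extended=True in Spark 2.3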

This is the output of df.explain() after df = df.join(df_t.select("ID"),["ID"]):
== Physical Plan ==
*(6) Project [ID#7273, score#7276, LABEL#7274, kk#7281L]
+- *(6) SortMergeJoin [ID#7273], [ID#7303], Inner
   :- *(2) Sort [ID#7273 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(ID#7273, 200)
   :     +- *(1) Project [ID#7273, score#7276, LABEL#7274, k#7275L AS kk#7281L]
   :        +- *(1) Filter isnotnull(ID#7273)
   :           +- *(1) Scan ExistingRDD[ID#7273,LABEL#7274,k#7275L,score#7276]
   +- *(5) Sort [ID#7303 ASC NULLS FIRST], false, 0
      +- *(5) Project [ID#7303]
         +- *(5) Filter (nL#7295L > 1)
            +- *(5) HashAggregate(keys=[ID#7303], functions=[finalmerge_count(distinct merge count#7314L) AS count(LABEL#7304)#7294L])
               +- Exchange hashpartitioning(ID#7303, 200)
                  +- *(4) HashAggregate(keys=[ID#7303], functions=[partial_count(distinct LABEL#7304) AS count#7314L])
                     +- *(4) HashAggregate(keys=[ID#7303, LABEL#7304], functions=[])
                        +- Exchange hashpartitioning(ID#7303, LABEL#7304, 200)
                           +- *(3) HashAggregate(keys=[ID#7303, LABEL#7304], functions=[])
                              +- *(3) Project [ID#7303, LABEL#7304]
                                 +- *(3) Filter isnotnull(ID#7303)
                                    +- *(3) Scan ExistingRDD[ID#7303,LABEL#7304,k#7305L,score#7306]

In comparison, this is the output of df1.explain() after df1 = df.join(df_t.select("ID"),["ID"]):
== Physical Plan ==
*(6) Project [ID#7344, score#7347, LABEL#7345, kk#7352L]
+- *(6) SortMergeJoin [ID#7344], [ID#7374], Inner
   :- *(2) Sort [ID#7344 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(ID#7344, 200)
   :     +- *(1) Project [ID#7344, score#7347, LABEL#7345, k#7346L AS kk#7352L]
   :        +- *(1) Filter isnotnull(ID#7344)
   :           +- *(1) Scan ExistingRDD[ID#7344,LABEL#7345,k#7346L,score#7347]
   +- *(5) Sort [ID#7374 ASC NULLS FIRST], false, 0
      +- *(5) Project [ID#7374]
         +- *(5) Filter (nL#7366L > 1)
            +- *(5) HashAggregate(keys=[ID#7374], functions=[finalmerge_count(distinct merge count#7385L) AS count(LABEL#7375)#7365L])
               +- Exchange hashpartitioning(ID#7374, 200)
                  +- *(4) HashAggregate(keys=[ID#7374], functions=[partial_count(distinct LABEL#7375) AS count#7385L])
                     +- *(4) HashAggregate(keys=[ID#7374, LABEL#7375], functions=[])
                        +- Exchange hashpartitioning(ID#7374, LABEL#7375, 200)
                           +- *(3) HashAggregate(keys=[ID#7374, LABEL#7375], functions=[])
                              +- *(3) Project [ID#7374, LABEL#7375]
                                 +- *(3) Filter isnotnull(ID#7374)
                                    +- *(3) Scan ExistingRDD[ID#7374,LABEL#7375,k#7376L,score#7377]


Here is the code I ran and the error I get in Spark 2.3.0. Judging from the
error, the cause seems to be that Spark looks a column up not by its name but
by an internal serial number, and that serial number somehow gets mixed up.

import pyspark.sql.functions as F
from pyspark.sql import Row

df = spark.createDataFrame([Row(score=1.0, ID='abc', LABEL=True, k=2),
                            Row(score=1.0, ID='abc', LABEL=False, k=3)])

df = df.withColumnRenamed("k", "kk").select("ID", "score", "LABEL", "kk")    # line B
df_t = df.groupby("ID").agg(F.countDistinct("LABEL").alias("nL")).filter(F.col("nL") > 1)
df = df.join(df_t.select("ID"), ["ID"])
df_sw = df.groupby(["ID", "kk"]).count().withColumnRenamed("count", "cnt1")
df = df.join(df_sw, ["ID", "kk"])

This is the error:
Resolved attribute(s) kk#144L missing from ID#118,LABEL#119,kk#96L,score#121 in operator
!Project [ID#118, score#121, LABEL#119, kk#144L]. Attribute(s) with the same name appear
in the operation: kk. Please check if the right attribute(s) are used.;;
Project [ID#88, kk#96L, score#91, LABEL#89, cnt1#140L]
+- Join Inner, ((ID#88 = ID#150) && (kk#96L = kk#144L))
   :- Project [ID#88, score#91, LABEL#89, kk#96L]
   :  +- Join Inner, (ID#88 = ID#118)
   :     :- Project [ID#88, score#91, LABEL#89, kk#96L]
   :     :  +- Project [ID#88, LABEL#89, k#90L AS kk#96L, score#91]
   :     :     +- LogicalRDD [ID#88, LABEL#89, k#90L, score#91], false
   :     +- Project [ID#118]
   :        +- Filter (nL#110L > cast(1 as bigint))
   :           +- Aggregate [ID#118], [ID#118, count(distinct LABEL#119) AS nL#110L]
   :              +- Project [ID#118, score#121, LABEL#119, kk#96L]
   :                 +- Project [ID#118, LABEL#119, k#120L AS kk#96L, score#121]
   :                    +- LogicalRDD [ID#118, LABEL#119, k#120L, score#121], false
   +- Project [ID#150, kk#144L, count#134L AS cnt1#140L]
      +- Aggregate [ID#150, kk#144L], [ID#150, kk#144L, count(1) AS count#134L]
         +- Project [ID#150, score#153, LABEL#151, kk#144L]
            +- Join Inner, (ID#150 = ID#118)
               :- Project [ID#150, score#153, LABEL#151, kk#144L]
               :  +- Project [ID#150, LABEL#151, k#152L AS kk#144L, score#153]
               :     +- LogicalRDD [ID#150, LABEL#151, k#152L, score#153], false
               +- Project [ID#118]
                  +- Filter (nL#110L > cast(1 as bigint))
                     +- Aggregate [ID#118], [ID#118, count(distinct LABEL#119) AS nL#110L]
                        +- !Project [ID#118, score#121, LABEL#119, kk#144L]
                           +- Project [ID#118, LABEL#119, k#120L AS kk#96L, score#121]
                              +- LogicalRDD [ID#118, LABEL#119, k#120L, score#121], false
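
(One possible way to sidestep this class of analyzer problem -- an untested
sketch, not a confirmed fix for this particular bug -- is to break the lineage
between df and df_sw before the second join, so that df_sw no longer shares its
attribute lineage with df:)

# untested workaround sketch: rebuild df from its RDD and schema so that the
# second groupby/join runs against a plan with fresh attribute numbers
df_fresh = spark.createDataFrame(df.rdd, df.schema)
df_sw = df_fresh.groupby(["ID", "kk"]).count().withColumnRenamed("count", "cnt1")
df = df_fresh.join(df_sw, ["ID", "kk"])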




On Mon, Apr 9, 2018 at 3:21 PM, Gourav Sengupta <go...@gmail.com>
wrote:

> Hi,
>
> What I am curious about is the reassignment of df.
>
> Can you please look into the explain plan of df after the statement df =
> df.join(df_t.select("ID"),["ID"])? And then compare with the explain plan
> of df1 after the statement df1 = df.join(df_t.select("ID"),["ID"])?
>
> It's late here, but I have yet to go through this completely.  But I think
> that Spark does throw a warning advising us to use Row instead of a
> dictionary.
>
> It would help if you could kindly try the statement below and go through
> your use case once again (I have yet to go through all the lines):
>
>
>
> from pyspark.sql import Row
>
> df = spark.createDataFrame([Row(score = 1.0,ID="abc",LABEL=True,k=2),
> Row(score = 1.0,ID="abc",LABEL=True,k=3)])
>
> Regards,
> Gourav Sengupta
>
>
> On Mon, Apr 9, 2018 at 6:50 PM, Shiyuan <gs...@gmail.com> wrote:
>
>> Hi Spark Users,
>>     The following code snippet fails with an "attribute missing" error even
>> though the attribute exists.  This bug is triggered by a particular sequence
>> of "select", "groupby" and "join".  Note that if I take away the "select" in
>> #line B, the code runs without error.  However, the "select" in #line B
>> includes all columns in the dataframe and hence should not affect the
>> final result.
>>
>>
>> import pyspark.sql.functions as F
>> df = spark.createDataFrame([{'score':1.0,'ID':'abc','LABEL':True,'k':2},{'score':1.0,'ID':'abc','LABEL':False,'k':3}])
>>
>> df = df.withColumnRenamed("k","kk")\
>>   .select("ID","score","LABEL","kk")    #line B
>>
>> df_t = df.groupby("ID").agg(F.countDistinct("LABEL").alias("nL")).filter(F.col("nL")>1)
>> df = df.join(df_t.select("ID"),["ID"])
>> df_sw = df.groupby(["ID","kk"]).count().withColumnRenamed("count", "cnt1")
>> df = df.join(df_sw, ["ID","kk"])
>>
>
>

Re: A bug triggered by a particular sequence of "select", "groupby" and "join" in Spark 2.3.0

Posted by Gourav Sengupta <go...@gmail.com>.
Hi,

What I am curious about is the reassignment of df.

Can you please look into the explain plan of df after the statement df =
df.join(df_t.select("ID"),["ID"])? And then compare with the explain plan
of df1 after the statement df1 = df.join(df_t.select("ID"),["ID"])?
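
For example, something along these lines (a rough sketch; df and df_t refer to
the snippet quoted below):

df_reassigned = df.join(df_t.select("ID"), ["ID"])   # what "df = df.join(...)" would produce
df_reassigned.explain()
df1 = df.join(df_t.select("ID"), ["ID"])             # the same join, bound to a new name
df1.explain()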

It's late here, but I have yet to go through this completely.  But I think
that Spark does throw a warning advising us to use Row instead of a
dictionary.

It would help if you could kindly try the statement below and go through your
use case once again (I have yet to go through all the lines):



from pyspark.sql import Row

df = spark.createDataFrame([Row(score = 1.0,ID="abc",LABEL=True,k=2),
Row(score = 1.0,ID="abc",LABEL=True,k=3)])

Regards,
Gourav Sengupta


On Mon, Apr 9, 2018 at 6:50 PM, Shiyuan <gs...@gmail.com> wrote:

> Hi Spark Users,
>     The following code snippet fails with an "attribute missing" error even
> though the attribute exists.  This bug is triggered by a particular sequence
> of "select", "groupby" and "join".  Note that if I take away the "select" in
> #line B, the code runs without error.  However, the "select" in #line B
> includes all columns in the dataframe and hence should not affect the
> final result.
>
>
> import pyspark.sql.functions as F
> df = spark.createDataFrame([{'score':1.0,'ID':'abc','LABEL':True,'k':2},{'score':1.0,'ID':'abc','LABEL':False,'k':3}])
>
> df = df.withColumnRenamed("k","kk")\
>   .select("ID","score","LABEL","kk")    #line B
>
> df_t = df.groupby("ID").agg(F.countDistinct("LABEL").alias("nL")).filter(F.col("nL")>1)
> df = df.join(df_t.select("ID"),["ID"])
> df_sw = df.groupby(["ID","kk"]).count().withColumnRenamed("count", "cnt1")
> df = df.join(df_sw, ["ID","kk"])
>