You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by Shiyuan <gs...@gmail.com> on 2018/03/21 04:58:21 UTC

strange behavior of joining dataframes

Hi Spark-users:
I have a dataframe "df_t" which was generated from other dataframes by
several transformations. And then I  did something very simple,  just
counting the rows, that is the following code:

(A)
df_t_1 =  df_t.groupby(["Id","key"]).count().withColumnRenamed("count",
"cnt1")
df_t_2 = df_t.groupby("Id").count().withColumnRenamed("count", "cnt2")
df_t_3 = df_t_1.join(df_t_2, ["Id"])
df_t.join(df_t_3, ["Id","key"])

When I run this query, I got the error that  "key" is missing during
joining. However, the column "key" is clearly in the dataframe dt.  What is
strange is that: if I first do this:

 data = df_t.collect(); df_t = spark.createDataFrame(data);  (B)

then (A) can run without error.  However,  the code (B) should not change
the dataframe dt_t at all.  Why the snippet (A) can run with (B) but
failed without (B)?  Also, A different joining sequence can also complete
without error:

(C)
df_t_1 =  df_t.groupby(["Id","key"]).count().withColumnRenamed("count",
"cnt1")
df_t_2 = df_t.groupby("Id").count().withColumnRenamed("count", "cnt2")
df_t.join(df_t_1, ["Id","key"]).join(df_t_2, ["Id"])

But (A) and (C) are conceptually the same and  should produce the same
result.      What could possibly go wrong here?  Any hints to track down
the problem is appreciated.  I am using spark 2.1.

Re: strange behavior of joining dataframes

Posted by Shiyuan <gs...@gmail.com>.
Here is a simple example that reproduces the problem. This code has a
missing attribute('kk') error.  Is it  a bug?     Note that if the `select`
in line B is removed, this code would run.

import pyspark.sql.functions as F
df =
spark.createDataFrame([{'score':1.0,'ID':'abc','LABEL':True,'k':2},{'score':1.0,'ID':'abc','LABEL':False,'k':3}])

df = df.withColumnRenamed("k","kk")\
  .select("ID","score","LABEL","kk")    #line B

df_t =
df.groupby("ID").agg(F.countDistinct("LABEL").alias("nL")).filter(F.col("nL")>1)
df = df.join(df_t.select("ID"),["ID"])
df_sw = df.groupby(["ID","kk"]).count().withColumnRenamed("count", "cnt1")
df = df.join(df_sw, ["ID","kk"])

On Tue, Mar 20, 2018 at 9:58 PM, Shiyuan <gs...@gmail.com> wrote:

> Hi Spark-users:
> I have a dataframe "df_t" which was generated from other dataframes by
> several transformations. And then I  did something very simple,  just
> counting the rows, that is the following code:
>
> (A)
> df_t_1 =  df_t.groupby(["Id","key"]).count().withColumnRenamed("count",
> "cnt1")
> df_t_2 = df_t.groupby("Id").count().withColumnRenamed("count", "cnt2")
> df_t_3 = df_t_1.join(df_t_2, ["Id"])
> df_t.join(df_t_3, ["Id","key"])
>
> When I run this query, I got the error that  "key" is missing during
> joining. However, the column "key" is clearly in the dataframe dt.  What is
> strange is that: if I first do this:
>
>  data = df_t.collect(); df_t = spark.createDataFrame(data);  (B)
>
> then (A) can run without error.  However,  the code (B) should not change
> the dataframe dt_t at all.  Why the snippet (A) can run with (B) but
> failed without (B)?  Also, A different joining sequence can also complete
> without error:
>
> (C)
> df_t_1 =  df_t.groupby(["Id","key"]).count().withColumnRenamed("count",
> "cnt1")
> df_t_2 = df_t.groupby("Id").count().withColumnRenamed("count", "cnt2")
> df_t.join(df_t_1, ["Id","key"]).join(df_t_2, ["Id"])
>
> But (A) and (C) are conceptually the same and  should produce the same
> result.      What could possibly go wrong here?  Any hints to track down
> the problem is appreciated.  I am using spark 2.1.
>
>
>
>
>