Posted to user@spark.apache.org by 李斌松 <li...@gmail.com> on 2018/03/26 13:18:20 UTC

Spark 2.3 DataFrame join bug

Hi Spark community,
     I'm using Spark 2.3 and have found a bug in DataFrame joins. Here is my
code:

        # Assumed setup (not in the original report): obtain a SparkSession.
        from pyspark.sql import SparkSession
        sparkSession = SparkSession.builder.getOrCreate()

        sc = sparkSession.sparkContext

        # Build the root table and cache it.
        tmp = sparkSession.createDataFrame(sc.parallelize(
            [[1, 2, 3, 4], [1, 2, 5, 6], [2, 3, 4, 5], [2, 3, 5, 6]]
        )).toDF('a', 'b', 'c', 'd')
        tmp.createOrReplaceTempView('tdl_spark_test')
        sparkSession.sql('cache table tdl_spark_test')

        df = sparkSession.sql('select a, b from tdl_spark_test group by a, b')
        df.printSchema()

        # First child dataframe: collect_set(array(c)) grouped by a, b.
        df1 = sparkSession.sql('select a, b, collect_set(array(c)) as c '
                               'from tdl_spark_test group by a, b')
        df1 = df1.withColumnRenamed('a', 'a1').withColumnRenamed('b', 'b1')
        cond = [df.a == df1.a1, df.b == df1.b1]
        df = df.join(df1, cond, 'inner').drop('a1', 'b1')

        # Second child dataframe: collect_set(array(d)) grouped by a, b.
        df2 = sparkSession.sql('select a, b, collect_set(array(d)) as d '
                               'from tdl_spark_test group by a, b')
        df2 = df2.withColumnRenamed('a', 'a1').withColumnRenamed('b', 'b1')
        cond = [df.a == df2.a1, df.b == df2.b1]
        df = df.join(df2, cond, 'inner').drop('a1', 'b1')

        df.show()
        sparkSession.sql('uncache table tdl_spark_test')


        As you can see, the above code just creates a root dataframe and two
child dataframes. The expected answer is:
        +---+---+----------+----------+
        |  a|  b|         c|         d|
        +---+---+----------+----------+
        |  2|  3|[[5], [4]]|[[5], [6]]|
        |  1|  2|[[5], [3]]|[[6], [4]]|
        +---+---+----------+----------+

        However, we got this unexpected answer instead:
        +---+---+----------+----------+
        |  a|  b|         c|         d|
        +---+---+----------+----------+
        |  2|  3|[[5], [4]]|[[5], [4]]|
        |  1|  2|[[5], [3]]|[[5], [3]]|
        +---+---+----------+----------+

         It seems that the column from the first child dataframe has
overwritten the corresponding column from the second child dataframe.
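
         One optional diagnostic (not part of the original report) is to show
each child dataframe on its own and print the plan of the joined result; this
can help show whether the wrong values come from df2 itself or only appear
after the join:

        # Optional diagnostic (not in the original report): inspect each child
        # dataframe separately and the plan of the joined result, to see
        # whether the wrong values come from df2 itself or from the final join.
        df1.show()
        df2.show()
        df.explain(True)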

         In addition, this error occurs only when all of the following
conditions hold at the same time:
         1. The root table is cached.
         2. "group by" is used in the child dataframes.
         3. "array" is used inside "collect_set" in the child dataframes.
         4. The join condition is "df.a==df2.a1, df.b==df2.b1" instead of
"['a', 'b']" (see the sketch after this list).