You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Luke (Jira)" <ji...@apache.org> on 2022/08/22 20:14:00 UTC

[jira] [Created] (SPARK-40181) DataFrame.intersect and .intersectAll are inconsistently dropping rows

Luke created SPARK-40181:
----------------------------

             Summary: DataFrame.intersect and .intersectAll are inconsistently dropping rows
                 Key: SPARK-40181
                 URL: https://issues.apache.org/jira/browse/SPARK-40181
             Project: Spark
          Issue Type: Bug
          Components: PySpark
    Affects Versions: 3.0.1
            Reporter: Luke


I don't have a minimal reproducible example for this, but the place where it shows up in our workflow is very simple.

The data in "COLUMN" are a few hundred million distinct strings (gets deduplicated in the plan also) and it is being compared against itself using intersect.

The code that is failing is essentially:
{quote}values = [...] # python list containing many unique strings, none of which are None

df = spark.createDataFrame(
    spark.sparkContext.parallelize(
        [(value,) for value in values], numSlices=2 + len(values) // 10000
    ),
    schema=StructType([StructField("COLUMN", StringType())]),
)

df = df.distinct()

assert df.count() == df.intersect(df).count()

assert df.count() == df.intersectAll(df).count()
{quote}
The issue is that both of the above asserts sometimes pass, and sometimes fail (technically we haven't seen intersectAll pass yet, but we have only tried a few times). One thing which is striking is that if you call df.intersect(df).count() multiple times, the returned count is not always the same. Sometimes it is exactly df.count(), sometimes it is ~1% lower, but how much lower exactly seems random.

In particular, we have called df.intersect(df).count() twice in a row, and got two different counts, which is very surprising given that df should be deterministic, and suggests maybe there is some kind of concurrency/inconsistent hashing issue?

One other thing which is possibly noteworthy is that using df.join(df, df.columns, how="inner") does seem to reliably have the desired behavior (not dropping any rows).

Here is the resulting plan from df.intersect(df)
{quote}== Parsed Logical Plan ==
'Intersect false
:- Deduplicate [COLUMN#144487]
:  +- LogicalRDD [COLUMN#144487], false
+- Deduplicate [COLUMN#144487]
   +- LogicalRDD [COLUMN#144487], false

== Analyzed Logical Plan ==
COLUMN: string
Intersect false
:- Deduplicate [COLUMN#144487]
:  +- LogicalRDD [COLUMN#144487], false
+- Deduplicate [COLUMN#144523]
   +- LogicalRDD [COLUMN#144523], false

== Optimized Logical Plan ==
Aggregate [COLUMN#144487], [COLUMN#144487]
+- Join LeftSemi, (COLUMN#144487 <=> COLUMN#144523)
   :- LogicalRDD [COLUMN#144487], false
   +- Aggregate [COLUMN#144523], [COLUMN#144523]
      +- LogicalRDD [COLUMN#144523], false

== Physical Plan ==
*(7) HashAggregate(keys=[COLUMN#144487], functions=[], output=[COLUMN#144487])
+- Exchange hashpartitioning(COLUMN#144487, 200), true, [id=#22790]
   +- *(6) HashAggregate(keys=[COLUMN#144487], functions=[], output=[COLUMN#144487])
      +- *(6) SortMergeJoin [coalesce(COLUMN#144487, ), isnull(COLUMN#144487)], [coalesce(COLUMN#144523, ), isnull(COLUMN#144523)], LeftSemi
         :- *(2) Sort [coalesce(COLUMN#144487, ) ASC NULLS FIRST, isnull(COLUMN#144487) ASC NULLS FIRST], false, 0
         :  +- Exchange hashpartitioning(coalesce(COLUMN#144487, ), isnull(COLUMN#144487), 200), true, [id=#22772]
         :     +- *(1) Scan ExistingRDD[COLUMN#144487]
         +- *(5) Sort [coalesce(COLUMN#144523, ) ASC NULLS FIRST, isnull(COLUMN#144523) ASC NULLS FIRST], false, 0
            +- Exchange hashpartitioning(coalesce(COLUMN#144523, ), isnull(COLUMN#144523), 200), true, [id=#22782]
               +- *(4) HashAggregate(keys=[COLUMN#144523], functions=[], output=[COLUMN#144523])
                  +- Exchange hashpartitioning(COLUMN#144523, 200), true, [id=#22778]
                     +- *(3) HashAggregate(keys=[COLUMN#144523], functions=[], output=[COLUMN#144523])
                        +- *(3) Scan ExistingRDD[COLUMN#144523]
{quote}
and for df.intersectAll(df)
{quote}== Parsed Logical Plan ==
'IntersectAll true
:- Deduplicate [COLUMN#144487]
:  +- LogicalRDD [COLUMN#144487], false
+- Deduplicate [COLUMN#144487]
   +- LogicalRDD [COLUMN#144487], false

== Analyzed Logical Plan ==
COLUMN: string
IntersectAll true
:- Deduplicate [COLUMN#144487]
:  +- LogicalRDD [COLUMN#144487], false
+- Deduplicate [COLUMN#144533]
   +- LogicalRDD [COLUMN#144533], false

== Optimized Logical Plan ==
Project [COLUMN#144487]
+- Generate replicaterows(min_count#144566L, COLUMN#144487), [1], false, [COLUMN#144487]
   +- Project [COLUMN#144487, if ((vcol1_count#144563L > vcol2_count#144565L)) vcol2_count#144565L else vcol1_count#144563L AS min_count#144566L]
      +- Filter ((vcol1_count#144563L >= 1) AND (vcol2_count#144565L >= 1))
         +- Aggregate [COLUMN#144487], [count(vcol1#144558) AS vcol1_count#144563L, count(vcol2#144561) AS vcol2_count#144565L, COLUMN#144487]
            +- Union
               :- Aggregate [COLUMN#144487], [true AS vcol1#144558, null AS vcol2#144561, COLUMN#144487]
               :  +- LogicalRDD [COLUMN#144487], false
               +- Aggregate [COLUMN#144533], [null AS vcol1#144559, true AS vcol2#144560, COLUMN#144533]
                  +- LogicalRDD [COLUMN#144533], false

== Physical Plan ==
*(7) Project [COLUMN#144487]
+- Generate replicaterows(min_count#144566L, COLUMN#144487), [COLUMN#144487], false, [COLUMN#144487]
   +- *(6) Project [COLUMN#144487, if ((vcol1_count#144563L > vcol2_count#144565L)) vcol2_count#144565L else vcol1_count#144563L AS min_count#144566L]
      +- *(6) Filter ((vcol1_count#144563L >= 1) AND (vcol2_count#144565L >= 1))
         +- *(6) HashAggregate(keys=[COLUMN#144487], functions=[count(vcol1#144558), count(vcol2#144561)], output=[vcol1_count#144563L, vcol2_count#144565L, COLUMN#144487])
            +- Exchange hashpartitioning(COLUMN#144487, 200), true, [id=#23310]
               +- *(5) HashAggregate(keys=[COLUMN#144487], functions=[partial_count(vcol1#144558), partial_count(vcol2#144561)], output=[COLUMN#144487, count#144569L, count#144570L])
                  +- Union
                     :- *(2) HashAggregate(keys=[COLUMN#144487], functions=[], output=[vcol1#144558, vcol2#144561, COLUMN#144487])
                     :  +- Exchange hashpartitioning(COLUMN#144487, 200), true, [id=#23267]
                     :     +- *(1) HashAggregate(keys=[COLUMN#144487], functions=[], output=[COLUMN#144487])
                     :        +- *(1) Scan ExistingRDD[COLUMN#144487]
                     +- *(4) HashAggregate(keys=[COLUMN#144533], functions=[], output=[vcol1#144559, vcol2#144560, COLUMN#144533])
                        +- ReusedExchange [COLUMN#144533], Exchange hashpartitioning(COLUMN#144487, 200), true, [id=#23267] 
{quote}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org