Posted to user@spark.apache.org by Antoine Wendlinger <aw...@mytraffic.fr> on 2020/08/04 12:57:22 UTC

Renaming a DataFrame column makes Spark lose partitioning information

Hi,

When I rename a DataFrame column, Spark seems to lose track of the
partitioning information:

    Seq((1, 2))
      .toDF("a", "b")
      .repartition($"b")
      .withColumnRenamed("b", "c")
      .repartition($"c")
      .explain()

Gives the following plan:

    == Physical Plan ==
    Exchange hashpartitioning(c#40, 10)
    +- *(1) Project [a#36, b#37 AS c#40]
       +- Exchange hashpartitioning(b#37, 10)
          +- LocalTableScan [a#36, b#37]

As you can see, two shuffles are done, but the second one is unnecessary:
c is only an alias of b, so the data is already hash-partitioned on it.
Is there a reason for this behavior that I'm missing? Is there a way to
work around it (other than not renaming my columns)?

I'm using Spark 2.4.3.


Thanks for your help,

Antoine

Re: Renaming a DataFrame column makes Spark lose partitioning information

Posted by Antoine Wendlinger <aw...@mytraffic.fr>.
Well, that's great! Thank you very much :)


Antoine


Re: Renaming a DataFrame column makes Spark lose partitioning information

Posted by Terry Kim <yu...@gmail.com>.
This is fixed in Spark 3.0 by https://github.com/apache/spark/pull/26943:

scala> :paste
// Entering paste mode (ctrl-D to finish)

Seq((1, 2))
      .toDF("a", "b")
      .repartition($"b")
      .withColumnRenamed("b", "c")
      .repartition($"c")
      .explain()

// Exiting paste mode, now interpreting.

== Physical Plan ==
*(1) Project [a#7, b#8 AS c#11]
+- Exchange hashpartitioning(b#8, 200), false, [id=#12]
   +- LocalTableScan [a#7, b#8]

Thanks,
Terry
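
For readers who cannot move to Spark 3.0 yet, one possible workaround on 2.4 is to rename the column before repartitioning, whenever the pipeline allows the two steps to be reordered. The sketch below is only an illustration under that assumption: the object name and both helper functions are hypothetical, and counting shuffles by scanning the plan text for "Exchange hashpartitioning" is a rough heuristic, not an official API.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

// Hypothetical helper object, not part of Spark.
object RenameBeforeRepartition {

  // Heuristic: count the lines of the physical plan text that mention a
  // hash-partitioning Exchange (one such line per shuffle of this kind).
  def countShuffles(df: DataFrame): Int =
    df.queryExecution.executedPlan.toString
      .split("\n")
      .count(_.contains("Exchange hashpartitioning"))

  // Rename first, then repartition once on the new column name, so the
  // plan never needs to relate the partitioning on b to the alias c.
  def renameThenRepartition(spark: SparkSession): DataFrame = {
    import spark.implicits._
    Seq((1, 2))
      .toDF("a", "b")
      .withColumnRenamed("b", "c")
      .repartition($"c")
  }

  def main(args: Array[String]): Unit = {
    // Local session for experimentation only.
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("rename-before-repartition")
      .getOrCreate()

    val df = renameThenRepartition(spark)
    df.explain()
    println(s"shuffles in plan: ${countShuffles(df)}")

    spark.stop()
  }
}
```

This does not help when the rename has to come after an existing repartition (for example, when the partitioned DataFrame is produced by code you do not control). In that case upgrading to a release that includes the fix above is probably the simpler route.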
