Posted to dev@spark.apache.org by Tony Jin <li...@gmail.com> on 2016/05/11 13:55:08 UTC

dataframe udf function will be executed twice when filter on new column created by withColumn

Hi guys,

I have a problem with Spark DataFrames. My Spark version is 1.6.1.
Basically, I used a udf and df.withColumn to create a "new" column, then
filtered on this new column and called show (an action). I see that the
udf (which withColumn uses to create the new column) is called twice
(duplicated). If I filter on an "old" column instead, the udf runs only
once, which is expected. I attached the example code; lines 30~38 (the
filteredOnNewColumnDF.show call and its output) show the problem.

Does anyone know the internal reason? Can you give me any advice? Thank you
very much.



scala> import org.apache.spark.sql.functions._
import org.apache.spark.sql.functions._

scala> val df = sc.parallelize(Seq(("a", "b"), ("a1", "b1"))).toDF("old","old1")
df: org.apache.spark.sql.DataFrame = [old: string, old1: string]

scala> val udfFunc = udf((s: String) => {println(s"running udf($s)"); s })
udfFunc: org.apache.spark.sql.UserDefinedFunction = UserDefinedFunction(<function1>,StringType,List(StringType))

scala> val newDF = df.withColumn("new", udfFunc(df("old")))
newDF: org.apache.spark.sql.DataFrame = [old: string, old1: string, new: string]

scala> newDF.show
running udf(a)
running udf(a1)
+---+----+---+
|old|old1|new|
+---+----+---+
|  a|   b|  a|
| a1|  b1| a1|
+---+----+---+


scala> val filteredOnNewColumnDF = newDF.filter("new <> 'a1'")
filteredOnNewColumnDF: org.apache.spark.sql.DataFrame = [old: string, old1: string, new: string]

scala> val filteredOnOldColumnDF = newDF.filter("old <> 'a1'")
filteredOnOldColumnDF: org.apache.spark.sql.DataFrame = [old: string, old1: string, new: string]

scala> filteredOnNewColumnDF.show
running udf(a)
running udf(a)
running udf(a1)
+---+----+---+
|old|old1|new|
+---+----+---+
|  a|   b|  a|
+---+----+---+


scala> filteredOnOldColumnDF.show
running udf(a)
+---+----+---+
|old|old1|new|
+---+----+---+
|  a|   b|  a|
+---+----+---+



Best wishes.
By Linbo

Re: dataframe udf function will be executed twice when filter on new column created by withColumn

Posted by James Hammerton <ja...@gluru.co>.
This may be related to: https://issues.apache.org/jira/browse/SPARK-13773
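
One possible explanation, not confirmed in this thread, is that the optimizer pushes the filter below the projection and replaces the reference to "new" with the udf expression that defines it, so the udf is evaluated once in the filter and once more in the projection. The sketch below is a plain-Scala model of that idea, not Spark code: UdfPushdownModel, log, projectThenFilter and filterPushedBelowProject are hypothetical names made up for illustration. Running filteredOnNewColumnDF.explain(true) in the shell session above would be one way to check whether the real plan looks like this.

```scala
import scala.collection.mutable.ListBuffer

// Simplified model of the suspected behaviour; none of this is Spark code
// and every name in this object is hypothetical.
object UdfPushdownModel {
  // Records the argument of every call to the stand-in udf, in call order.
  val log = ListBuffer.empty[String]

  // Stand-in for the thread's udfFunc: returns its input and logs the call.
  def udfFunc(s: String): String = { log += s; s }

  // Rows mirror the thread's DataFrame: (old, old1).
  val rows = Seq(("a", "b"), ("a1", "b1"))

  // Plan as written: compute the "new" column first, then filter on it.
  // Iterators are used so that rows are processed one at a time.
  def projectThenFilter(): List[(String, String, String)] =
    rows.iterator
      .map { case (old, old1) => (old, old1, udfFunc(old)) }
      .filter { case (_, _, n) => n != "a1" }
      .toList

  // Suspected plan: the filter runs below the projection, with "new"
  // replaced by the udf call, so surviving rows trigger the udf twice.
  def filterPushedBelowProject(): List[(String, String, String)] =
    rows.iterator
      .filter { case (old, _) => udfFunc(old) != "a1" }
      .map { case (old, old1) => (old, old1, udfFunc(old)) }
      .toList

  def main(args: Array[String]): Unit = {
    log.clear()
    val asWritten = projectThenFilter()
    println(s"project then filter: $asWritten, udf calls: ${log.mkString(", ")}")

    log.clear()
    val pushed = filterPushedBelowProject()
    println(s"filter pushed down:  $pushed, udf calls: ${log.mkString(", ")}")
  }
}
```

Both variants return the same single row, but the first logs the calls a, a1 while the second logs a, a, a1, which is the same sequence of "running udf" lines reported for filteredOnNewColumnDF.show.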

Regards,

James

On 11 May 2016 at 15:49, Ted Yu <yu...@gmail.com> wrote:

> In master branch, behavior is the same.
>
> Suggest opening a JIRA if you haven't done so.
>

Re: dataframe udf function will be executed twice when filter on new column created by withColumn

Posted by Ted Yu <yu...@gmail.com>.
In master branch, behavior is the same.

Suggest opening a JIRA if you haven't done so.

