Posted to user@spark.apache.org by jpocalan <jp...@gmail.com> on 2016/01/20 22:53:37 UTC

[Spark Streaming][Problem with DataFrame UDFs]

Hi,

I have an application that creates a Kafka direct stream from one topic
with 5 partitions.
As a result, each batch consists of an RDD with 5 partitions.
To transform each batch, I decided to convert the RDD to a DataFrame (DF)
so that I can easily add columns to the initial DF using custom UDFs.

However, whenever I apply a UDF to the DF, I notice that the UDF gets
executed multiple times, and the multiplier appears to be driven by the
number of partitions.
For example, with an RDD of 10 records and 5 partitions, my UDF should
ideally be called 10 times, yet it is consistently called 50 times. The
resulting DF is correct, and count() properly returns 10, as expected.

I have changed my code to work directly with RDDs using mapPartitions, and
the transformation now gets called the expected number of times.
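
For reference, the RDD-based approach described above might look roughly like the following. This is a minimal sketch rather than the poster's actual code: it assumes Spark 2.x or later running in local mode, and `MapPartitionsSketch`, `classify`, and `enrich` are hypothetical names introduced for illustration. An accumulator counts how many times the transformation runs.

```scala
import org.apache.spark.sql.SparkSession

object MapPartitionsSketch {
  // (url, semantic1, semantic2, semantic3)
  type Enriched = (String, Int, Int, Int)

  // Hypothetical stand-in for the real per-record transformation.
  def classify(url: String): Map[Int, Int] = Map(0 -> url.length, 1 -> 1, 2 -> 2)

  /** Returns the enriched records and the number of classify calls. */
  def enrich(spark: SparkSession, urls: Seq[String], numPartitions: Int): (Array[Enriched], Long) = {
    val sc = spark.sparkContext
    val calls = sc.longAccumulator("classifyCalls")

    val enriched = sc.parallelize(urls, numPartitions).mapPartitions { records =>
      // This function runs once per partition; the inner map runs once per record.
      records.map { url =>
        calls.add(1)
        val semantic = classify(url)
        (url, semantic(0), semantic(1), semantic(2))
      }
    }.collect() // a single action, so each record is processed once (barring task retries)

    (enriched, calls.value.longValue())
  }
}
```

Because the classification result is held in a local value and then unpacked, nothing in this version re-evaluates the transformation for each derived field.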

As additional information, I have set spark.speculation to false and no
tasks failed.

I am working on a smaller example that isolates this potential issue, but
in the meantime I would like to know whether anybody has encountered it.

Thank you.



--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-Problem-with-DataFrame-UDFs-tp26024.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org


Re: [Spark Streaming][Problem with DataFrame UDFs]

Posted by Jean-Pierre OCALAN <jp...@gmail.com>.
Quick correction to the code snippet I sent in my previous email. The line

  val enrichedDF = inputDF.withColumn("semantic", udf(col("url")))

should be replaced by

  val enrichedDF = inputDF.withColumn("semantic", enrichUDF(col("url")))



On Thu, Jan 21, 2016 at 11:07 AM, Jean-Pierre OCALAN <jp...@gmail.com>
wrote:

> Hi Cody,
>
> First of all thanks a lot for your quick reply, although I have removed
> this post couple of hours after posting it because I ended up finding it
> was due to the way I was using DataFrame UDFs.
>
> Essentially I didn't know that UDFs were purely lazy and in case of the
> example below the UDF gets executed 3 times on the entire data set.
>
> // let's imagine we have an input DataFrame inputDF with "url" column
>
> // semanticClassifier.classify(url) returns Map[Int, Int]
> val enrichUDF = udf { (url: String) => semanticClassifier.classify(url) }
>
> // enrichedDF will have all columns contained in inputDF + a semantic
> column which will contain result of execution of the classification on the
> url column value.
> val enrichedDF = inputDF.withColumn("semantic", udf(col("url")))
>
> val outputDF = enrichedDF.select(col("*"),
> col("semantic")(0).as("semantic1"), col("semantic")(1).as("semantic2"),
> col("semantic")(2).as("semantic3")).drop("semantic")
>
> // The udf will be executed 3 times on the entire dataset
> outputDF.count()
>
> By adding enrichedDF.persist() and unpersist() later on I was able to
> quickly solve the issue but don't really like it, so I will probably work
> directly with the RDD[Row] and schema maintenance in order to recreate
> DataFrame.
>
> Thanks again,
> JP.


-- 
jean-pierre ocalan
jpocalan@gmail.com

Re: [Spark Streaming][Problem with DataFrame UDFs]

Posted by Jean-Pierre OCALAN <jp...@gmail.com>.
Hi Cody,

First of all, thanks a lot for your quick reply. I removed this post a
couple of hours after posting it because I found that the problem was due
to the way I was using DataFrame UDFs.

Essentially, I didn't know that UDFs are evaluated purely lazily: in the
example below, the UDF gets executed 3 times over the entire data set, once
for each of the three references to the "semantic" column.

// Let's imagine we have an input DataFrame inputDF with a "url" column.
import org.apache.spark.sql.functions.{col, udf}

// semanticClassifier.classify(url) returns Map[Int, Int]
val enrichUDF = udf { (url: String) => semanticClassifier.classify(url) }

// enrichedDF has all the columns of inputDF plus a "semantic" column that
// holds the result of running the classification on the "url" column value.
// (This calls enrichUDF; the snippet as first posted called udf by mistake.)
val enrichedDF = inputDF.withColumn("semantic", enrichUDF(col("url")))

val outputDF = enrichedDF.select(
  col("*"),
  col("semantic")(0).as("semantic1"),
  col("semantic")(1).as("semantic2"),
  col("semantic")(2).as("semantic3")
).drop("semantic")

// The UDF will be executed 3 times over the entire dataset
outputDF.count()
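
One way to check how often the UDF really runs is to count invocations with an accumulator and print the query plan. The sketch below is an illustration under stated assumptions, not the original application: it assumes Spark 2.x or later in local mode, and `UdfCallCountSketch`, `classify`, and `countCalls` are hypothetical names. The number of calls observed depends on the Spark version and its optimizer rules, so no particular figure is promised here.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, udf}

object UdfCallCountSketch {
  // Hypothetical stand-in for semanticClassifier.classify(url).
  def classify(url: String): Map[Int, Int] = Map(0 -> url.length, 1 -> 1, 2 -> 2)

  /** Returns (number of UDF invocations, number of output rows). */
  def countCalls(spark: SparkSession, urls: Seq[String]): (Long, Long) = {
    import spark.implicits._
    val calls = spark.sparkContext.longAccumulator("udfCalls")

    // Same shape as enrichUDF in the thread, plus a counter.
    val enrichUDF = udf { (url: String) =>
      calls.add(1)
      classify(url)
    }

    val enrichedDF = urls.toDF("url").withColumn("semantic", enrichUDF(col("url")))
    val outputDF = enrichedDF.select(
      col("*"),
      col("semantic")(0).as("semantic1"),
      col("semantic")(1).as("semantic2"),
      col("semantic")(2).as("semantic3")
    ).drop("semantic")

    // Prints the logical and physical plans; look for repeated UDF expressions.
    outputDF.explain(true)

    val rows = outputDF.collect().length.toLong
    (calls.value.longValue(), rows)
  }
}
```

Comparing the returned call count with the row count shows whether the UDF is evaluated once per row or once per reference to the derived column.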

By adding enrichedDF.persist() and calling unpersist() later on, I was able
to quickly solve the issue, but I don't really like that approach, so I will
probably work directly with the RDD[Row] and maintain the schema myself in
order to recreate the DataFrame.
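
The persist()/unpersist() workaround mentioned above could be sketched as follows. This is a hedged illustration, assuming Spark 2.x or later; `PersistSketch`, `classify`, and `enrich` are hypothetical names, and wrapping the work in try/finally is one possible way to make sure the cache is released, not something the thread prescribes.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, udf}

object PersistSketch {
  // Hypothetical stand-in for semanticClassifier.classify(url).
  def classify(url: String): Map[Int, Int] = Map(0 -> url.length, 1 -> 1, 2 -> 2)

  /** Expects a DataFrame with a string "url" column. */
  def enrich(inputDF: DataFrame): Seq[(String, Int, Int, Int)] = {
    val enrichUDF = udf { (url: String) => classify(url) }

    // Cache the DataFrame that holds the UDF result, so the three lookups
    // below read the cached "semantic" column instead of recomputing it.
    val enrichedDF = inputDF.withColumn("semantic", enrichUDF(col("url"))).persist()
    try {
      val outputDF = enrichedDF.select(
        col("*"),
        col("semantic")(0).as("semantic1"),
        col("semantic")(1).as("semantic2"),
        col("semantic")(2).as("semantic3")
      ).drop("semantic")

      outputDF.collect().map { row =>
        (row.getAs[String]("url"),
         row.getAs[Int]("semantic1"),
         row.getAs[Int]("semantic2"),
         row.getAs[Int]("semantic3"))
      }.toSeq
    } finally {
      enrichedDF.unpersist() // release the cached data once the action has run
    }
  }
}
```

The trade-off is the one the message hints at: the cache costs memory (or disk) for every batch, and forgetting to unpersist leaves it occupied.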

Thanks again,
JP.



On Thu, Jan 21, 2016 at 10:45 AM, Cody Koeninger <co...@koeninger.org> wrote:

> If you can share an isolated example I'll take a look.  Not something I've
> run into before.

Re: [Spark Streaming][Problem with DataFrame UDFs]

Posted by Cody Koeninger <co...@koeninger.org>.
If you can share an isolated example I'll take a look.  Not something I've
run into before.
