Posted to user@spark.apache.org by Cyril Scetbon <cy...@free.fr> on 2016/05/08 06:41:16 UTC

Joining a RDD to a Dataframe

Hi,

I have an RDD built during a Spark Streaming job and I'd like to join it to a DataFrame (E/S input) to enrich it.
It seems that I can't join the RDD and the DF without first converting the RDD to a DF (tell me if I'm wrong). Here are the schemas of both DFs:

scala> df
res32: org.apache.spark.sql.DataFrame = [f1: string, addresses: array<struct<sf1:int,sf2:string,sf3:string,id:string>>, id: string]

scala> df_input
res33: org.apache.spark.sql.DataFrame = [id: string]

scala> df_input.collect
res34: Array[org.apache.spark.sql.Row] = Array([idaddress2], [idaddress12])

I can get the ids I want if I know the value to look for in addresses.id, using:

scala> df.filter(array_contains(df("addresses.id"), "idaddress2")).select("id").collect
res35: Array[org.apache.spark.sql.Row] = Array([XXXX], [YY])

However, when I try to join df_input and df using the previous filter as the join condition, I get an exception:

scala> df.join(df_input, array_contains(df("addresses.id"), df_input("id")))
java.lang.RuntimeException: Unsupported literal type class org.apache.spark.sql.Column id
	at org.apache.spark.sql.catalyst.expressions.Literal$.apply(literals.scala:50)
	at org.apache.spark.sql.functions$.array_contains(functions.scala:2452)
	...

It seems that array_contains only accepts a literal value as its second argument and does not evaluate a sql.Column row by row.
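
One possible workaround, offered as a sketch and not as an answer confirmed in this thread: write the join condition as a SQL expression string with `expr`, so that both arguments of `array_contains` are parsed as column references instead of one being wrapped in a literal. The example assumes the `SparkSession` API of Spark 2.x or later (the stack traces here suggest an older release, where `sqlContext` would be the entry point). The sample rows, the case classes `Address` and `Doc`, the object `ArrayContainsJoin`, and the renamed key `input_id` are all hypothetical.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.expr

// Hypothetical stand-ins mirroring the schema of df shown above.
case class Address(sf1: Int, sf2: String, sf3: String, id: String)
case class Doc(f1: String, addresses: Seq[Address], id: String)

object ArrayContainsJoin {
  // Returns the ids of documents whose addresses contain one of the input ids.
  def matchingIds(): Seq[String] = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("array-contains-join")
      .getOrCreate()
    import spark.implicits._

    try {
      // Made-up sample data; only the first document holds a wanted address id.
      val df = Seq(
        Doc("a", Seq(Address(1, "x", "y", "idaddress2")), "XXXX"),
        Doc("b", Seq(Address(2, "x", "y", "idaddress7")), "ZZ")
      ).toDF()

      // The key is renamed so it cannot be confused with df's own "id" column.
      val dfInput = Seq("idaddress2", "idaddress12").toDF("input_id")

      // The condition is a SQL expression, so input_id is resolved as a column.
      val joined = df.join(dfInput, expr("array_contains(addresses.id, input_id)"))

      joined.select("id").as[String].collect().toSeq
    } finally {
      spark.stop()
    }
  }

  def main(args: Array[String]): Unit =
    matchingIds().foreach(println)
}
```

Because this condition is not an equality between two columns, Spark cannot plan it as a hash or sort-merge join and will most likely fall back to a nested-loop style join, which may be slow when both sides are large.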

What's the best way to achieve what I want to do? (Also in terms of performance.)

Thanks
---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org


Re: Joining a RDD to a Dataframe

Posted by Xinh Huynh <xi...@gmail.com>.
Hi Cyril,

In the case where there are no documents, it looks like there is a typo in
"addresses" (check the number of "d"s):

| scala> df.select(explode(df("addresses.id")).as("aid"), df("id"))  <== addresses
| org.apache.spark.sql.AnalysisException: Cannot resolve column name "id" among (adresses);  <== adresses

As for your question about joining on a nested array column, I don't know
if it is possible. Is it supported in normal SQL? Exploding seems the right
way because then there is only one join key per row, as opposed to the
array, which could have multiple join keys inside the array.
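
As a rough illustration of the explode approach described above (a sketch under assumptions, not a tested recommendation from this thread): each address id becomes its own row, the join is then a plain equality on a single key, and `distinct()` removes the duplicates that appear when one document matches several input ids. The example assumes the `SparkSession` API of Spark 2.x or later; the sample rows, the case classes `Address` and `Doc`, the object `ExplodeJoin`, and the column name `input_id` are hypothetical.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.explode

// Hypothetical stand-ins mirroring the schema of df from the original post.
case class Address(sf1: Int, sf2: String, sf3: String, id: String)
case class Doc(f1: String, addresses: Seq[Address], id: String)

object ExplodeJoin {
  // Returns the distinct ids of documents having at least one wanted address id.
  def matchingIds(): Seq[String] = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("explode-join")
      .getOrCreate()
    import spark.implicits._

    try {
      // Made-up sample data; the first document matches both input ids.
      val df = Seq(
        Doc("a", Seq(Address(1, "x", "y", "idaddress2"),
                     Address(2, "x", "y", "idaddress12")), "XXXX"),
        Doc("b", Seq(Address(3, "x", "y", "idaddress7")), "ZZ")
      ).toDF()

      val dfInput = Seq("idaddress2", "idaddress12").toDF("input_id")

      // One output row per (address id, document id) pair.
      val exploded = df.select(explode($"addresses.id").as("aid"), $"id")

      // Equality condition on a single key per row.
      exploded.join(dfInput, $"aid" === $"input_id")
        .select("id")
        .distinct()
        .as[String]
        .collect()
        .toSeq
    } finally {
      spark.stop()
    }
  }

  def main(args: Array[String]): Unit =
    matchingIds().foreach(println)
}
```

An equality condition like this lets Spark choose a hash-based or sort-merge join, at the cost of producing one intermediate row per array element.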

Xinh


Re: Joining a RDD to a Dataframe

Posted by Cyril Scetbon <cy...@free.fr>.
Does nobody have an answer?

Another thing I've seen is what happens when I have no documents at all:

scala> df.select(explode(df("addresses.id")).as("aid")).collect
res27: Array[org.apache.spark.sql.Row] = Array()

Then

scala> df.select(explode(df("addresses.id")).as("aid"), df("id"))
org.apache.spark.sql.AnalysisException: Cannot resolve column name "id" among (adresses);
        at org.apache.spark.sql.DataFrame$$anonfun$resolve$1.apply(DataFrame.scala:152)
        at org.apache.spark.sql.DataFrame$$anonfun$resolve$1.apply(DataFrame.scala:152)

Is there a better way to query nested objects and to join a DF containing nested objects with another, regular DataFrame? (Yes, that's the current case.)


Re: Joining a RDD to a Dataframe

Posted by Cyril Scetbon <cy...@free.fr>.
Hi Ashish,

The issue is not related to converting an RDD to a DF. I did that. I was just asking whether I should do it differently.

The issue is the exception thrown when using array_contains with a sql.Column instead of a value.

I found another way to do it, using explode as follows:

df.select(explode(df("addresses.id")).as("aid"), df("id")).join(df_input, $"aid" === df_input("id")).select(df("id"))

However, I'm wondering whether it does almost the same thing or whether the query is different and worse in terms of performance.

I'd appreciate it if someone could comment on it and maybe give me some advice.

Thank you.



Re: Joining a RDD to a Dataframe

Posted by Ashish Dubey <as...@gmail.com>.
Is there any reason you don't want to convert this? I don't think a join
between an RDD and a DF is supported.
