Posted to user@spark.apache.org by Cyril Scetbon <cy...@free.fr> on 2016/06/01 16:00:08 UTC
Using data frames to join separate RDDs in spark streaming
Hi guys,
I have two input data streams that I want to join using DataFrames. Unfortunately, I get the error message from https://issues.apache.org/jira/browse/SPARK-5063, because I can't reference rdd1 inside (2):
(1)
val rdd1 = sc.esRDD(es_resource.toLowerCase, query)
  .map(r => (r._1, r._2))

(2)
mgs.map(x => x._1)
  .foreachRDD { rdd =>
    val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)
    import sqlContext.implicits._
    val df_aids = rdd.toDF("id")
    val df = rdd1.toDF("id", "aid")
    df.select(explode(df("aid")).as("aid"), df("id"))
      .join(df_aids, $"aid" === df_aids("id"))
      .select(df("id"), df_aids("id"))
      .....
  }
Is there a way to still use DataFrames for this, or do I need to do everything with RDD joins only?
And if I need to use RDD joins only, how do I do it, given that I have an RDD (rdd1) and a DStream (mgs)?
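The DataFrame query in (2) can be checked on its own, outside the streaming job. The following is a minimal batch sketch, assuming Spark 2.x or later (it uses SparkSession instead of the SQLContext of the original code) and assuming each element of rdd1 can be reduced to an id string plus an array of aid strings. ExplodeJoinSketch, explodeJoin, doc_id and matched_id are hypothetical names. The two inputs correspond to rdd1.toDF("id", "aid") and rdd.toDF("id") in (2). The sketch only reproduces the query; it does not explain or avoid the SPARK-5063 error.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, explode}

// Hypothetical batch-only sketch of the explode-and-join in (2); names are illustrative.
object ExplodeJoinSketch {
  // lookup: columns "id" (string) and "aid" (array<string>), the assumed shape of rdd1.toDF("id", "aid")
  // ids:    one column "id" (string), the shape of rdd.toDF("id") for a single micro-batch
  // Returns one (doc_id, matched_id) row per element of "aid" that equals a batch id.
  def explodeJoin(lookup: DataFrame, ids: DataFrame): DataFrame = {
    // one row per (doc_id, aid) element, like explode(df("aid")) in (2)
    val exploded = lookup.select(col("id").as("doc_id"), explode(col("aid")).as("aid"))
    // rename the batch column so the joined result has no two columns called "id"
    val batch = ids.select(col("id").as("matched_id"))
    exploded
      .join(batch, col("aid") === col("matched_id"))
      .select("doc_id", "matched_id")
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("explode-join-sketch").getOrCreate()
    import spark.implicits._
    val lookup = Seq(("doc1", Seq("a", "b")), ("doc2", Seq("c"))).toDF("id", "aid")
    val ids = Seq("b", "c", "z").toDF("id")
    explodeJoin(lookup, ids).orderBy("doc_id").show()
    spark.stop()
  }
}
```

Renaming the columns up front is a design choice: it keeps the final select unambiguous without relying on references like df("id") and df_aids("id") from before the join.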
Thanks
--
Cyril SCETBON
---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org
Re: Using data frames to join separate RDDs in spark streaming
Posted by Cyril Scetbon <cy...@free.fr>.
Problem solved by creating only one RDD.
> On Jun 1, 2016, at 14:05, Cyril Scetbon <cy...@free.fr> wrote:
>> On Jun 1, 2016, at 12:00, Cyril Scetbon <cy...@free.fr> wrote:
Re: Using data frames to join separate RDDs in spark streaming
Posted by Cyril Scetbon <cy...@free.fr>.
It seems that to join a DStream with an RDD I can use:
mgs.transform(rdd => rdd.join(rdd1))
or
mgs.foreachRDD(rdd => rdd.join(rdd1))
But I can't see why rdd1.toDF("id", "aid") actually causes SPARK-5063.
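For the RDD-only route, the explode-and-join from the first message can be written with flatMap and join and applied to each batch through transform. The function passed to transform runs on the driver once per batch interval, so it can reference a static RDD such as rdd1. A minimal sketch, assuming rdd1 can be reduced to (id, Seq of aid strings) pairs and that mgs yields string ids after mgs.map(x => x._1); TransformJoinSketch, joinBatch and joinStream are hypothetical names:

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.DStream

// Hypothetical RDD-only equivalent of the DataFrame query in (2); names are illustrative.
object TransformJoinSketch {
  // batchIds: the ids of one micro-batch (assumed shape of mgs.map(x => x._1))
  // lookup:   (docId, aids) pairs, the assumed shape of rdd1
  // Returns (docId, matchedId) pairs, like select(df("id"), df_aids("id")) in (2).
  def joinBatch(batchIds: RDD[String], lookup: RDD[(String, Seq[String])]): RDD[(String, String)] = {
    // "explode": one (aid, docId) pair per element of aids, keyed by aid
    val byAid: RDD[(String, String)] =
      lookup.flatMap { case (docId, aids) => aids.map(aid => (aid, docId)) }
    // key each batch id by itself so both sides are pair RDDs
    val keyedIds: RDD[(String, Unit)] = batchIds.map(id => (id, ()))
    // inner join on aid == batch id, then keep (docId, matched id)
    byAid.join(keyedIds).map { case (aid, (docId, _)) => (docId, aid) }
  }

  // transform calls joinBatch on the driver for every batch and returns a new DStream
  def joinStream(ids: DStream[String], lookup: RDD[(String, Seq[String])]): DStream[(String, String)] =
    ids.transform((rdd: RDD[String]) => joinBatch(rdd, lookup))
}
```

In the foreachRDD variant above, rdd.join(rdd1) only defines a new RDD: transformations are lazy and the function given to foreachRDD returns Unit, so nothing is computed unless an action such as count() or saveAsTextFile(...) is called on the joined RDD. The transform variant returns a DStream, which still needs an output operation (for example print() or foreachRDD) downstream.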
> On Jun 1, 2016, at 12:00, Cyril Scetbon <cy...@free.fr> wrote: