Posted to user@spark.apache.org by Wush Wu <wu...@gmail.com> on 2015/07/15 06:15:21 UTC

Efficiency of leftOuterJoin on a Cassandra RDD

Dear all,

I am trying to join two RDDs, named rdd1 and rdd2.

rdd1 is loaded from a text file with about 33,000 records.

rdd2 is loaded from a Cassandra table with about 3 billion records.

I tried the following code:

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.cassandra.CassandraSQLContext

val rdd1: RDD[(String, XXX)] = sc.textFile(...).map(...)

val cc = new CassandraSQLContext(sc)  // Cassandra-aware SQLContext
cc.setKeyspace("xxx")
val rdd2: RDD[(String, String)] = cc.sql("SELECT x, y FROM xxx").map(r => ...)

val result = rdd1.leftOuterJoin(rdd2)
result.take(20)
```

However, the log shows that Spark loaded all 3 billion records from
Cassandra, even though only about 33,000 records remained after the join.

Is there a way to query Cassandra using only the keys in rdd1?

Here is some information about our system:

- Spark version: 1.3.1
- Cassandra version: 2.0.14
- The join key is the primary key of the Cassandra table.

Best,
Wush



Re: Efficiency of leftOuterJoin on a Cassandra RDD

Posted by Wush Wu <wu...@gmail.com>.
Dear Sujit,

Thanks for your suggestion.

After testing, `joinWithCassandraTable` does the trick, along the lines
you suggested.

Now rdd2 only reads the rows whose keys appear in rdd1.

Best,
Wush
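
For reference, a minimal sketch of the `joinWithCassandraTable` approach,
assuming the join key is the table's single partition-key column (the file
name and key shape below are illustrative, not from the original code):

```scala
import com.datastax.spark.connector._

// joinWithCassandraTable expects an RDD of tuples (or case classes) whose
// fields map onto the table's primary key columns; Tuple1 for a single key.
val keys = sc.textFile("keys.txt").map(line => Tuple1(line))

// Each key is looked up directly in Cassandra, so only the matching
// partitions are read instead of the whole 3-billion-row table.
val joined = keys.joinWithCassandraTable("xxx", "xxx")
joined.take(20)
```

One caveat: unlike leftOuterJoin, this behaves like an inner join, so keys
missing from the table simply produce no output row.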

2015-07-16 0:00 GMT+08:00 Sujit Pal <su...@gmail.com>:
> Hi Wush,
>
> One option may be to try a replicated join. Since your rdd1 is small, read
> it into a collection and broadcast it to the workers, then filter your
> larger rdd2 against the collection on the workers.
>
> -sujit


Re: Efficiency of leftOuterJoin on a Cassandra RDD

Posted by Sujit Pal <su...@gmail.com>.
Hi Wush,

One option may be to try a replicated join. Since your rdd1 is small, read
it into a collection and broadcast it to the workers, then filter your
larger rdd2 against the collection on the workers.

-sujit
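
A minimal sketch of this replicated (map-side) join, assuming rdd1 fits
comfortably in driver memory (the value types here are illustrative):

```scala
// Collect the small RDD (about 33,000 rows) to the driver and broadcast
// one read-only copy to every executor.
val small: Map[String, String] = rdd1.collect().toMap
val smallB = sc.broadcast(small)

// Join on the workers without a shuffle: keep only the rows of rdd2 whose
// key appears in the broadcast map.
val joined = rdd2.flatMap { case (k, v) =>
  smallB.value.get(k).map(left => (k, (left, v)))
}
```

Note that this removes the shuffle but still scans all of rdd2, so with a
3-billion-row Cassandra source the full read remains; that is why
`joinWithCassandraTable` turned out to be the better fit in this thread.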



Re: Efficiency of leftOuterJoin on a Cassandra RDD

Posted by Deepak Jain <de...@gmail.com>.
leftOuterJoin and the other join APIs are super slow in Spark; 100x slower than Hadoop.

Sent from my iPhone



Re: Efficiency of leftOuterJoin on a Cassandra RDD

Posted by Wush Wu <wu...@gmail.com>.
I don't understand.

By the way, `joinWithCassandraTable` does improve my query time
from 40 minutes to 3 minutes.


2015-07-15 13:19 GMT+08:00 ÐΞ€ρ@Ҝ (๏̯͡๏) <de...@gmail.com>:
> I have explored Spark joins for the last few months (you can search my posts)
> and it is frustratingly useless.


Re: Efficiency of leftOuterJoin on a Cassandra RDD

Posted by ÐΞ€ρ@Ҝ (๏̯͡๏) <de...@gmail.com>.
I have explored Spark joins for the last few months (you can search my posts)
and it is frustratingly useless.

-- 
Deepak

Re: Efficiency of leftOuterJoin on a Cassandra RDD

Posted by Wush Wu <wu...@gmail.com>.
Dear all,

I have found a post discussing the same thing:
https://groups.google.com/a/lists.datastax.com/forum/#!searchin/spark-connector-user/join/spark-connector-user/q3GotS-n0Wk/g-LPTteCEg0J

The solution is to use `joinWithCassandraTable`; the documentation is here:
https://github.com/datastax/spark-cassandra-connector/blob/v1.3.0-M2/doc/2_loading.md

Wush
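
That documentation also describes a couple of refinements. A hedged sketch,
assuming the key column is named `x` (a guess based on the SELECT above);
`repartitionByCassandraReplica` and `.on` come from the connector
documentation, not from this thread:

```scala
import com.datastax.spark.connector._

val keys = sc.textFile("keys.txt").map(Tuple1(_))

// Optionally align Spark partitions with the Cassandra replicas that own
// each key, so the lookups that follow are node-local.
val localKeys = keys.repartitionByCassandraReplica("xxx", "xxx")

// Join against the table, naming the join column explicitly.
val joined = localKeys
  .joinWithCassandraTable("xxx", "xxx")
  .on(SomeColumns("x"))
```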
