Posted to user@spark.apache.org by RodrigoB <ro...@aspect.com> on 2014/07/09 15:24:14 UTC

Cassandra driver Spark question

Hi all,

I am currently trying to save to Cassandra after some Spark Streaming
computation.

I call myDStream.foreachRDD so that I can collect each RDD in the driver
app at runtime, and inside I do something like this:
myDStream.foreachRDD(rdd => {

  var someCol = Seq[MyType]()

  rdd.collect().foreach(kv => {
    someCol = someCol :+ kv._2 // I only want the RDD value and not the key
  })
  // THIS IS WHY IT FAILS TRYING TO RUN THE WORKER
  val collectionRDD = sc.parallelize(someCol)
  collectionRDD.saveToCassandra(...)
})

I get a NotSerializableException when the job tries to run on the node (I
also tried someCol as a shared variable).
I believe this happens because myDStream doesn't exist yet when the code
is pushed to the node, so the parallelize call doesn't have any structure to
relate to. Inside this foreachRDD I should only make RDD calls that
relate to other RDDs. I guess this was just a desperate attempt...

So I have a question:
Using the Cassandra Spark driver, can we only write to Cassandra from an
RDD? In my case I only want to write once all the computation is finished, in
a single batch from the driver app.

Thanks in advance.

Rod

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Cassandra-driver-Spark-question-tp9177.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: Cassandra driver Spark question

Posted by RodrigoB <ro...@aspect.com>.

Thanks to both for the comments and the debugging suggestion; I will try
it.

Regarding your comment, yes, I do agree the current solution was not
efficient, but to use the saveToCassandra method I need an RDD, hence the
parallelize call. Piotr finally directed me to use the CassandraConnect and
I got this fixed in the meantime.
Bottom line: I started using the new Cassandra Spark driver with async
calls, prepared statements and batch executions in the transformation that
runs on the nodes, and performance improved greatly.
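
For readers looking for what such a node-side write could look like, here is a
minimal sketch, not the code actually used in this thread. It assumes the
DataStax spark-cassandra-connector's CassandraConnector class and a 2.x-style
Java driver Session API; the keyspace, table, column names and the
(String, Long) row shape are hypothetical, and batching is left out for
brevity.

```scala
import org.apache.spark.rdd.RDD
import com.datastax.spark.connector.cql.CassandraConnector

object AsyncWriter {
  // Hypothetical keyspace, table and columns; adjust to the real schema.
  val insertCql = "INSERT INTO my_keyspace.my_table (id, total) VALUES (?, ?)"

  // connector is serializable, so it can be captured by the closure below.
  def write(rdd: RDD[(String, Long)], connector: CassandraConnector): Unit =
    rdd.foreachPartition { rows =>
      // This block runs on the worker: one session and one prepared
      // statement per partition, never per row.
      connector.withSessionDo { session =>
        val stmt = session.prepare(insertCql)
        // toList forces the lazy iterator so every async write is issued.
        val pending = rows.map { case (id, total) =>
          session.executeAsync(stmt.bind(id, Long.box(total)))
        }.toList
        // Wait for all writes to finish before the task completes.
        pending.foreach(_.getUninterruptibly())
      }
    }
}
```

From a streaming job this would be called as something like
myDStream.foreachRDD(rdd => AsyncWriter.write(rdd, CassandraConnector(sc.getConf))).
Holding every pending future in a list is only reasonable for modest partition
sizes; a real job would likely cap the number of in-flight writes.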


Thanks,
Rod



--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Cassandra-driver-Spark-question-tp9177p9990.html

Re: Cassandra driver Spark question

Posted by Tathagata Das <ta...@gmail.com>.
Can you find out which class is causing the NotSerializable
exception? In fact, you can enable extended serialization debugging
<http://stackoverflow.com/questions/1660441/java-flag-to-enable-extended-serialization-debugging-info>
(the JVM flag -Dsun.io.serialization.extendedDebugInfo=true) to
figure out which object reachable from the foreachRDD closure is
causing it.

On a not-so-related note, why are you collecting and then parallelizing?
That is a really inefficient thing to do, as all the data is brought
back to the driver. If you just want the values to be saved to Cassandra, why
can't you map the pair RDD to have only values and save that to Cassandra?
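
The suggestion above can be sketched roughly as follows. This is a minimal
sketch under stated assumptions, not code from the thread: it assumes the
DataStax spark-cassandra-connector is on the classpath, that the stream holds
(String, MyType) pairs, and that MyType is a case class whose field names match
the columns of a hypothetical table my_keyspace.my_table.

```scala
import org.apache.spark.streaming.dstream.DStream
import com.datastax.spark.connector._ // adds saveToCassandra to RDDs

// Hypothetical row type; field names are assumed to match the table columns.
case class MyType(id: String, total: Long)

object SaveValues {
  def saveValues(myDStream: DStream[(String, MyType)]): Unit =
    myDStream.foreachRDD { rdd =>
      // Drop the key on the workers and write from there:
      // no collect() to the driver and no sc.parallelize.
      rdd.map(_._2).saveToCassandra("my_keyspace", "my_table")
    }
}
```

Because the data never leaves the workers, the closure only has to carry the
mapping function, which also narrows down what needs to be serializable.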

TD


On Wed, Jul 9, 2014 at 7:52 AM, Luis Ángel Vicente Sánchez <
langel.groups@gmail.com> wrote:

> Yes, I'm using it to count concurrent users from a Kafka stream of events
> without problems. I'm currently testing it in local mode, but any
> serialization problem would have already appeared, so I don't expect any
> serialization issue when I deploy it to my cluster.

Re: Cassandra driver Spark question

Posted by Luis Ángel Vicente Sánchez <la...@gmail.com>.
Yes, I'm using it to count concurrent users from a Kafka stream of events
without problems. I'm currently testing it in local mode, but any
serialization problem would have already appeared, so I don't expect any
serialization issue when I deploy it to my cluster.


2014-07-09 15:39 GMT+01:00 RodrigoB <ro...@aspect.com>:

> Hi Luis,
>
> Yes, it's actually an output of the previous RDD.
> Have you ever used the Cassandra Spark Driver in the driver app? I believe
> these limitations stem from that: it's designed to save RDDs from the
> nodes.
>
> Thanks,
> Rod

Re: Cassandra driver Spark question

Posted by RodrigoB <ro...@aspect.com>.
Hi Luis,

Yes, it's actually an output of the previous RDD.
Have you ever used the Cassandra Spark Driver in the driver app? I believe
these limitations stem from that: it's designed to save RDDs from the
nodes.

Thanks,
Rod



--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Cassandra-driver-Spark-question-tp9177p9187.html

Re: Cassandra driver Spark question

Posted by Luis Ángel Vicente Sánchez <la...@gmail.com>.
Is MyType serializable? Everything inside the foreachRDD closure has to be
serializable.
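
One quick way to answer that question outside Spark is to push a value through
plain Java serialization, which Spark uses by default when shipping closures.
The sketch below is only an illustration: PlainType and GoodType are
hypothetical stand-ins for MyType, and offendingClass is a helper name made up
for this example.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

object SerializationCheck {
  // Hypothetical stand-ins for the thread's MyType.
  class PlainType(val value: Int)  // does not extend Serializable
  case class GoodType(value: Int)  // case classes are Serializable

  /** Returns None if obj survives Java serialization; otherwise the message
    * of the NotSerializableException, which names the offending class. */
  def offendingClass(obj: AnyRef): Option[String] = {
    // In-memory stream: nothing is written to disk and no OS resource is held.
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try {
      out.writeObject(obj)
      None
    } catch {
      case e: NotSerializableException => Some(e.getMessage)
    }
  }

  def main(args: Array[String]): Unit = {
    println(offendingClass(GoodType(1)))
    println(offendingClass(new PlainType(1)))
    // A collection is only serializable if all of its elements are.
    println(offendingClass(Seq(GoodType(1), new PlainType(2))))
  }
}
```

If the helper returns a class name for MyType, or for a collection holding it,
making that class a case class or having it extend Serializable is the usual
first thing to try.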