Posted to user@spark.apache.org by jchen <jc...@pivotal.io> on 2014/09/07 08:24:12 UTC

Spark Streaming and database access (e.g. MySQL)

Hi,

Has anyone tried using Spark Streaming with MySQL (or any other
database/data store)? I can write to MySQL at the beginning of the driver
application. However, when I try to write the result of every
streaming processing window to MySQL, it fails with the following error:

org.apache.spark.SparkException: Job aborted due to stage failure: Task not
serializable: java.io.NotSerializableException:
com.mysql.jdbc.JDBC4PreparedStatement

I think this is because the statement object must be serializable in order
to be shipped to and executed on the worker nodes. Has anyone tried something
similar? Example code would be very helpful. My intention is to execute
INSERT/UPDATE/DELETE/SELECT statements for each sliding window.

Thanks,
JC



--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-and-database-access-e-g-MySQL-tp13644.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org
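
A note on the error above: Spark serializes the function passed to an RDD
operation, together with every object that function references, and ships it
to the executors. A JDBC statement created on the driver is tied to a live
connection and does not implement java.io.Serializable, so the task cannot be
shipped. The following is a minimal sketch of that failure using plain Java
serialization, without Spark or MySQL. FakeStatement, WriteTask, SafeTask and
trySerialize are hypothetical names invented for the illustration.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

object SerializationDemo {
  // Hypothetical stand-in for com.mysql.jdbc.JDBC4PreparedStatement:
  // like the real class, it does not implement java.io.Serializable.
  class FakeStatement(val sql: String)

  // Models a task that captured a statement created on the driver.
  class WriteTask(stmt: FakeStatement) extends Serializable {
    def run(row: String): String = s"${stmt.sql} <- $row"
  }

  // Models a task that carries only the SQL text; the statement would be
  // created on the worker after the task has been deserialized there.
  class SafeTask(sql: String) extends Serializable {
    def run(row: String): String = s"$sql <- $row"
  }

  // Attempts Java serialization. Returns Right(size in bytes) on success,
  // or Left(name of the offending class) if something is not serializable.
  def trySerialize(obj: AnyRef): Either[String, Int] = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try {
      out.writeObject(obj)
      out.flush()
      Right(bytes.size())
    } catch {
      case e: NotSerializableException => Left(e.getMessage)
    } finally {
      out.close()
    }
  }
}
```

Serializing a WriteTask yields a Left naming the statement class, while a
SafeTask serializes without trouble, which is why the replies in this thread
recommend creating the connection and statement on the worker.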


Re: Spark Streaming and database access (e.g. MySQL)

Posted by Mayur Rustagi <ma...@gmail.com>.
I think she is checking whether the RDD is empty?
But if the RDD is empty then nothing will happen anyway: no DB connections etc.

Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi <https://twitter.com/mayur_rustagi>


Re: Spark Streaming and database access (e.g. MySQL)

Posted by Tobias Pfeiffer <tg...@preferred.jp>.
Hi,

On Mon, Sep 8, 2014 at 4:39 PM, Sean Owen <so...@cloudera.com> wrote:
>
> >                 if (rdd.take (1).size == 1) {
> >                     rdd foreachPartition { iterator =>
>

I was wondering: Since take() is an action, isn't the RDD computed
twice (once for the take(1), once during the iteration)? Or will only a
single element be computed for take(1)?

Thanks
Tobias

Re: Spark Streaming and database access (e.g. MySQL)

Posted by Sean Owen <so...@cloudera.com>.
That should be OK, since by the end of the 'foreach' call the iterator has
definitely been consumed, and the connection is therefore finished with.
You might put the close in a finally block, so that the connection is
released even if a write fails.
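
The point about 'foreach' can be illustrated without Spark or a database. In
the sketch below, FakeConnection and both method names are hypothetical.
Iterator.foreach is eager, so every record is processed before the finally
block closes the connection. Iterator.map, by contrast, is lazy: a body
written for mapPartitions that maps over the iterator and then closes the
connection would close it before any record has been read.

```scala
object EagerVsLazyClose {
  // Hypothetical stand-in for a DB connection; it refuses work once closed.
  class FakeConnection {
    private var open = true
    def write(x: Int): Int = {
      if (!open) throw new IllegalStateException("connection already closed")
      x * 2
    }
    def close(): Unit = open = false
  }

  // foreachPartition-style body: foreach consumes the whole iterator
  // before the finally block runs, so closing there is safe.
  def eagerWrite(records: Iterator[Int]): Int = {
    val conn = new FakeConnection
    var total = 0
    try {
      records.foreach(x => total += conn.write(x))
    } finally {
      conn.close() // runs even if a write throws
    }
    total
  }

  // mapPartitions-style body: map is lazy, so close() runs before any
  // record is processed and the caller fails when it consumes the result.
  def lazyWrite(records: Iterator[Int]): Iterator[Int] = {
    val conn = new FakeConnection
    val out = records.map(x => conn.write(x))
    conn.close()
    out
  }
}
```

Consuming the result of lazyWrite throws IllegalStateException, whereas
eagerWrite completes and only then closes the connection.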


Re: Spark Streaming and database access (e.g. MySQL)

Posted by Soumitra Kumar <ku...@gmail.com>.
I have the following code:

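stream.foreachRDD { rdd =>
  // Skip empty batches: take(1) returns at most one element.
  if (rdd.take(1).size == 1) {
    rdd.foreachPartition { iterator =>
      initDbConnection()      // called once per partition
      iterator.foreach { record =>
        writeToDb(record)     // placeholder name for "write to db"
      }
      closeDbConnection()
    }
  }
}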

Re: Spark Streaming and database access (e.g. MySQL)

Posted by Sean Owen <so...@cloudera.com>.
... I'd call out that last bit as actually tricky: "close off the driver"

See this message for the right-est way to do that, along with the
right way to open DB connections remotely instead of trying to
serialize them:

http://mail-archives.apache.org/mod_mbox/spark-user/201407.mbox/%3CCAPH-c_O9kQO6yJ4khXUVdO=+D4vj=JfG2tP9eqn5RPko=dRNAg@mail.gmail.com%3E
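
As a rough sketch of opening the connection remotely rather than serializing
it: the function below is meant to be called from inside rdd.foreachPartition,
so the Connection and PreparedStatement are created on the executor and only
plain strings (URL, user, password) travel from the driver. The table
word_counts, its columns, the (String, Int) record type and the object name
are assumptions made for this example, and a MySQL JDBC driver is assumed to
be on the executor classpath. The hasNext check is there because
foreachPartition calls the function once per partition, including partitions
that hold no records.

```scala
import java.sql.{Connection, DriverManager}

object JdbcPartitionWriter {
  // Hypothetical table: word_counts(word VARCHAR, cnt INT).
  val InsertSql: String = "INSERT INTO word_counts (word, cnt) VALUES (?, ?)"

  // Writes one partition and returns the number of records sent.
  // Nothing JDBC-related is captured from the driver; the connection and
  // the statement are both created here, on the executor.
  def writePartition(records: Iterator[(String, Int)],
                     url: String, user: String, password: String): Int = {
    if (!records.hasNext) {
      0 // empty partition: do not open a connection at all
    } else {
      val conn: Connection = DriverManager.getConnection(url, user, password)
      try {
        val stmt = conn.prepareStatement(InsertSql)
        try {
          var n = 0
          records.foreach { case (word, cnt) =>
            stmt.setString(1, word)
            stmt.setInt(2, cnt)
            stmt.addBatch()
            n += 1
          }
          stmt.executeBatch() // one round trip for the whole partition
          n
        } finally {
          stmt.close()
        }
      } finally {
        conn.close()
      }
    }
  }
}

// Wiring in the streaming job (stream: DStream[(String, Int)] is assumed):
//   stream.foreachRDD { rdd =>
//     rdd.foreachPartition { it =>
//       JdbcPartitionWriter.writePartition(it, url, user, password)
//     }
//   }
```

Batching the inserts is a design choice of this sketch, not something the
thread prescribes; executing each statement individually would work as well,
at the cost of more round trips.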


Re: Spark Streaming and database access (e.g. MySQL)

Posted by Mayur Rustagi <ma...@gmail.com>.
The standard pattern is to initialize the MySQL JDBC driver inside your
mapPartitions call, update the database, and then close off the driver.
A couple of gotchas:
1. A new driver is initiated for each of your partitions.
2. The effect (inserts & updates) may not be idempotent, so if your server
crashes, Spark will replay the updates to MySQL, which may cause data
corruption.
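
One way to soften the second gotcha is to make each write idempotent, for
example by keying every row on the window it belongs to and using MySQL's
INSERT ... ON DUPLICATE KEY UPDATE, so that a replayed batch overwrites rows
instead of adding to them. The sketch below assumes a hypothetical table
window_counts with a unique key on (window_start, word). The in-memory Map
only models that table, to show the difference between a replayed upsert and
a replayed increment without needing a database.

```scala
object IdempotentWrites {
  // Hypothetical table with UNIQUE KEY (window_start, word).
  // Re-running this statement with the same values leaves the row unchanged.
  val UpsertSql: String =
    "INSERT INTO window_counts (window_start, word, cnt) VALUES (?, ?, ?) " +
      "ON DUPLICATE KEY UPDATE cnt = VALUES(cnt)"

  type Key = (Long, String)  // (window start, word)
  type Table = Map[Key, Int] // in-memory model of window_counts

  // Idempotent write: sets the value for the key.
  def upsert(table: Table, key: Key, cnt: Int): Table =
    table.updated(key, cnt)

  // Non-idempotent write: adds to whatever is already stored.
  def increment(table: Table, key: Key, cnt: Int): Table =
    table.updated(key, table.getOrElse(key, 0) + cnt)

  // Applies the same batch `times` times, as a replay after a crash would.
  def replay(table: Table, batch: Seq[(Key, Int)], times: Int)
            (write: (Table, Key, Int) => Table): Table =
    (1 to times).foldLeft(table) { (t, _) =>
      batch.foldLeft(t) { case (acc, (key, cnt)) => write(acc, key, cnt) }
    }
}
```

Replaying a batch through upsert gives the same table as applying it once,
while replaying it through increment doubles the counts, which is the kind
of corruption described above.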


Regards
Mayur


