Posted to user@spark.apache.org by Yan Fang <ya...@gmail.com> on 2014/07/17 22:37:18 UTC

unserializable object in Spark Streaming context

Hi guys,

I need some help with this problem. In our use case, we need to continuously
insert values into a database. Our approach is to create the JDBC object in
the main method and then do the inserts in the DStream foreachRDD operation.
Is this approach reasonable?

The problem: since we are using com.mysql.jdbc.java, which is not
serializable, we keep seeing a NotSerializableException. I think that is
because Spark Streaming tries to serialize and then checkpoint the whole
class that contains the StreamingContext, not only the StreamingContext
object, right? Or does something else trigger the serialization? Is there
any workaround for this (other than not using com.mysql.jdbc.java)?

Thank you.

Cheers,
Fang, Yan
yanfang724@gmail.com
+1 (206) 849-4108

Re: unserializable object in Spark Streaming context

Posted by Tathagata Das <ta...@gmail.com>.
Actually, let me clarify further. There are a number of possibilities.

1. The easier, less efficient way is to create a connection object every
time you do foreachPartition (as shown in the pseudocode earlier in the
thread). For each partition, you create a connection, use it to push all
the records in the partition, and then close it. You don't even need to
create a singleton in that case. The cons of this method are:
   - You are not reusing connections across tasks and jobs, so you will be
creating a lot of connections to the database, which may or may not be fine.
   - It gets worse if your partitions are tiny and pushing each partition
takes only a few hundred milliseconds or a few seconds (as is possible with
Spark Streaming).

2. The slightly harder, but more efficient way is to use singletons, each of
which can hold one connection or maintain a connection pool. Connections
in the pool are then created on demand, are not explicitly closed at the end
of the task, and are reused across tasks and jobs. In that case, closing
the connections requires some kind of timeout mechanism, as I explained in
the previous post. Care also needs to be taken over whether these
connections are thread-safe.
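
The two options above can be sketched without Spark. This is only an illustration under stated assumptions: FakeConnection, PerPartitionWriter and SharedConnection are hypothetical names, a "partition" is just a list of strings, and a counter stands in for the cost of opening a real database connection.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for a real database connection (not a JDBC class). */
final class FakeConnection implements AutoCloseable {
    /** Counts how many connections have been opened in this JVM. */
    static final AtomicInteger OPENED = new AtomicInteger();
    private boolean closed = false;

    FakeConnection() { OPENED.incrementAndGet(); }

    void insert(String record) {
        if (closed) throw new IllegalStateException("connection is closed");
        // a real implementation would write the record to the database here
    }

    @Override public void close() { closed = true; }
}

/** Option 1: open a connection for each partition and close it afterwards. */
final class PerPartitionWriter {
    static void write(List<String> partition) {
        try (FakeConnection conn = new FakeConnection()) {
            for (String record : partition) conn.insert(record);
        }
    }
}

/** Option 2: one lazily created connection per JVM, reused across calls. */
final class SharedConnection {
    private static FakeConnection instance;

    static synchronized FakeConnection get() {
        if (instance == null) instance = new FakeConnection();
        return instance;
    }

    static void write(List<String> partition) {
        FakeConnection conn = get();
        synchronized (conn) {  // the connection is not assumed to be thread-safe
            for (String record : partition) conn.insert(record);
        }
    }
}

final class ConnectionStrategiesDemo {
    public static void main(String[] args) {
        List<List<String>> partitions = Arrays.asList(
                Arrays.asList("a", "b"), Arrays.asList("c"), Arrays.asList("d", "e"));

        for (List<String> p : partitions) PerPartitionWriter.write(p);
        int perPartition = FakeConnection.OPENED.getAndSet(0);

        for (List<String> p : partitions) SharedConnection.write(p);
        int shared = FakeConnection.OPENED.get();

        System.out.println("per-partition: " + perPartition + ", shared: " + shared);
    }
}
```

With three partitions the first strategy opens three connections and the second opens one. In a Spark job the static holder would live once per executor JVM, so "shared" means shared among the tasks of one executor, not across the cluster.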

Hope this helps!

TD

On Fri, Jul 18, 2014 at 8:14 PM, Tathagata Das <ta...@gmail.com>
wrote:

> That's a good question. My first reaction is a timeout. Timing out after tens
> of seconds should be sufficient. So there should be a timer in the singleton
> that checks every second when the singleton was last used and closes the
> connections after a timeout. Any attempt to use the connection again will
> create a new connection.
>
> TD

Re: unserializable object in Spark Streaming context

Posted by Tathagata Das <ta...@gmail.com>.
That's a good question. My first reaction is a timeout. Timing out after tens
of seconds should be sufficient. So there should be a timer in the singleton
that checks every second when the singleton was last used and closes the
connections after a timeout. Any attempt to use the connection again will
create a new connection.
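
A minimal sketch of that timer idea, assuming a hypothetical IdleConn class in place of a real connection and much shorter intervals than the ones suggested above so that it can be tried quickly:

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for a real database connection. */
final class IdleConn {
    private volatile boolean open = true;
    boolean isOpen() { return open; }
    void close() { open = false; }
}

/** Singleton that drops its connection after a period without use. */
final class TimedSingleton {
    // Short values so the demo finishes quickly; the thread suggests
    // tens of seconds for the timeout and one second for the check.
    private static final long IDLE_TIMEOUT_MS = 200;
    private static final long CHECK_PERIOD_MS = 50;

    private static IdleConn conn;
    private static long lastUsedNanos;

    // Daemon thread so the timer never keeps the JVM alive on its own.
    private static final ScheduledExecutorService REAPER =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "conn-reaper");
                t.setDaemon(true);
                return t;
            });

    static {
        REAPER.scheduleAtFixedRate(TimedSingleton::closeIfIdle,
                CHECK_PERIOD_MS, CHECK_PERIOD_MS, TimeUnit.MILLISECONDS);
    }

    private TimedSingleton() { }

    /** Returns the shared connection, creating a new one if none is open. */
    static synchronized IdleConn get() {
        if (conn == null) {
            conn = new IdleConn();
        }
        lastUsedNanos = System.nanoTime();
        return conn;
    }

    /** Runs on the timer thread: closes the connection once it has been idle too long. */
    private static synchronized void closeIfIdle() {
        long idleNanos = System.nanoTime() - lastUsedNanos;
        if (conn != null && idleNanos > TimeUnit.MILLISECONDS.toNanos(IDLE_TIMEOUT_MS)) {
            conn.close();
            conn = null;
        }
    }
}

final class TimedSingletonDemo {
    public static void main(String[] args) throws InterruptedException {
        IdleConn first = TimedSingleton.get();
        Thread.sleep(600);                       // stay idle past the timeout
        IdleConn second = TimedSingleton.get();  // a fresh connection is created
        System.out.println(first.isOpen() + " " + second.isOpen());
    }
}
```

One limitation of this sketch: a caller that holds the connection for longer than the timeout without calling get() again could see it closed mid-use. A fuller version would track borrow and return, or use a connection pool that already implements idle eviction.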

TD


On Fri, Jul 18, 2014 at 7:59 PM, Gino Bustelo <lb...@gmail.com> wrote:

> I get TD's recommendation of sharing a connection among tasks. Now, is
> there a good way to determine when to close connections?
>
> Gino B.

Re: unserializable object in Spark Streaming context

Posted by Gino Bustelo <lb...@gmail.com>.
I get TD's recommendation of sharing a connection among tasks. Now, is there a good way to determine when to close connections? 

Gino B.

> On Jul 17, 2014, at 7:05 PM, Yan Fang <ya...@gmail.com> wrote:
> 
> Hi Sean,
> 
> Thank you. I see your point. My thinking was to do the computation in a distributed fashion and do the storing from a single place. But you are right, having multiple DB connections is actually fine.
> 
> Thanks for answering my questions. That helps me understand the system.
> 
> Cheers,
> 
> Fang, Yan
> yanfang724@gmail.com
> +1 (206) 849-4108

Re: unserializable object in Spark Streaming context

Posted by Yan Fang <ya...@gmail.com>.
Hi Sean,

Thank you. I see your point. My thinking was to do the computation in a
distributed fashion and do the storing from a single place. But you are
right, having multiple DB connections is actually fine.

Thanks for answering my questions. That helps me understand the system.

Cheers,

Fang, Yan
yanfang724@gmail.com
+1 (206) 849-4108


On Thu, Jul 17, 2014 at 2:53 PM, Sean Owen <so...@cloudera.com> wrote:

> On Thu, Jul 17, 2014 at 10:39 PM, Yan Fang <ya...@gmail.com> wrote:
> > Thank you for the help. If I use TD's approach, it works and there is no
> > exception. The only drawback is that it will create many connections to
> > the DB, which I was trying to avoid.
>
> Connection-like objects aren't data that can be serialized. What would
> it mean to share one connection with N workers? That they all connect
> back to the driver, and go through one DB connection there? This defeats
> the purpose of distributed computing. You want multiple DB
> connections. You can limit the number of partitions if needed.
>
>
> > Here is a snapshot of my code, with the important code marked in red. What
> > I was thinking is that, if I call the collect() method, Spark Streaming
> > will bring the data to the driver and then the db object does not need to
> > be sent
>
> The Function you pass to foreachRDD() has a reference to db though.
> That's what is causing it to be serialized.
>
> > to executors. My observation is that, though exceptions are thrown, the
> > insert function still works. Any thoughts about that? I have also pasted
> > the log in case it helps: http://pastebin.com/T1bYvLWB
>
> Any executors that run locally might skip the serialization and
> succeed (?), but I don't think the remote executors can be succeeding.
>

Re: unserializable object in Spark Streaming context

Posted by Sean Owen <so...@cloudera.com>.
On Thu, Jul 17, 2014 at 10:39 PM, Yan Fang <ya...@gmail.com> wrote:
> Thank you for the help. If I use TD's approach, it works and there is no
> exception. The only drawback is that it will create many connections to the
> DB, which I was trying to avoid.

Connection-like objects aren't data that can be serialized. What would
it mean to share one connection with N workers? That they all connect
back to the driver, and go through one DB connection there? This defeats
the purpose of distributed computing. You want multiple DB
connections. You can limit the number of partitions if needed.


> Here is a snapshot of my code, with the important code marked in red. What I
> was thinking is that, if I call the collect() method, Spark Streaming will
> bring the data to the driver and then the db object does not need to be sent

The Function you pass to foreachRDD() has a reference to db though.
That's what is causing it to be serialized.

> to executors. My observation is that, though exceptions are thrown, the
> insert function still works. Any thoughts about that? I have also pasted the
> log in case it helps: http://pastebin.com/T1bYvLWB

Any executors that run locally might skip the serialization and
succeed (?), but I don't think the remote executors can be succeeding.
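
The capture problem can be reproduced with plain Java serialization, without Spark. The sketch below is an illustration only: DbHelper, Task and ClosureCapture are hypothetical names, with Task standing in for a serializable callback such as the Function passed to foreachRDD().

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

/** Hypothetical helper wrapping a database connection; deliberately not Serializable. */
final class DbHelper {
    void insert(String row) {
        // a real implementation would write the row to the database here
    }
}

/** A serializable callback, similar in spirit to the Function passed to foreachRDD(). */
interface Task extends Serializable {
    void run(String row);
}

final class ClosureCapture {
    /** Returns null on success, or the name of the class that could not be serialized. */
    static String trySerialize(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return null;
        } catch (NotSerializableException e) {
            return e.getMessage();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /** The anonymous class captures db, so db becomes a field that must be serialized too. */
    static Task capturing() {
        final DbHelper db = new DbHelper();
        return new Task() {
            @Override public void run(String row) { db.insert(row); }
        };
    }

    /** Nothing is captured; the helper is created only where the task actually runs. */
    static Task lazy() {
        return new Task() {
            @Override public void run(String row) { new DbHelper().insert(row); }
        };
    }
}

final class ClosureCaptureDemo {
    public static void main(String[] args) {
        System.out.println("capturing: " + ClosureCapture.trySerialize(ClosureCapture.capturing()));
        System.out.println("lazy: " + ClosureCapture.trySerialize(ClosureCapture.lazy()));
    }
}
```

The first task fails to serialize because of the captured helper, while the second serializes cleanly. Calling collect() inside the callback does not change this, because the callback object itself still holds the reference.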

Re: unserializable object in Spark Streaming context

Posted by Yan Fang <ya...@gmail.com>.
Hi Marcelo and TD,

Thank you for the help. If I use TD's approach, it works and there is no
exception. The only drawback is that it will create many connections to the
DB, which I was trying to avoid.

Here is a snapshot of my code, with the important code marked in red. What I
was thinking is that, if I call the collect() method, Spark Streaming will
bring the data to the driver and then the db object does not need to be
sent to executors. My observation is that, though exceptions are thrown,
the insert function still works. Any thoughts about that? I have also pasted
the log in case it helps: http://pastebin.com/T1bYvLWB

================== code ==================

        SparkConf sparkConf = new SparkConf().setAppName("balababala");
        JavaStreamingContext jssc = new JavaStreamingContext(sparkConf,
                new Duration(2000));

        // This class instantiates the JDBC driver.
        final MySQLHelper db = new MySQLHelper();

        /*
         * a few DStream transformations
         */

        JavaPairDStream<String, MachineState> noiseState = machineIdNoise
                .updateStateByKey(getUpdateFunction());

        JavaPairDStream<String, Tuple2<MachineState, Integer>> noiseStateTemperature =
                noiseState.join(machineIdTemperature);

        noiseStateTemperature.foreachRDD(
                new Function<JavaPairRDD<String, Tuple2<MachineState, Integer>>, Void>() {
            @Override
            public Void call(JavaPairRDD<String, Tuple2<MachineState, Integer>> arg0)
                    throws Exception {
                List<Tuple2<String, Tuple2<MachineState, Integer>>> list = arg0.collect();
                for (Tuple2<String, Tuple2<MachineState, Integer>> tuple : list) {
                    String machineId
                    String machineState
                    db.insertAverages(machineId, machineState);
                }
                return null;
            }
        });

============ end code ===============

Thank you. If there is no other workaround, I may use TD's approach because
it is the only option.

Best,

Fang, Yan
yanfang724@gmail.com
+1 (206) 849-4108


On Thu, Jul 17, 2014 at 1:54 PM, Tathagata Das <ta...@gmail.com>
wrote:

> And if Marcelo's guess is correct, then the right way to do this would be
> to lazily / dynamically create the JDBC connection as a singleton in the
> workers/executors and use that. Something like this:
>
>
> dstream.foreachRDD(rdd => {
>    rdd.foreachPartition((iterator: Iterator[...]) => {
>        // this will create the single JDBC connection in the worker, if it
>        // does not exist
>        val driver = JDBCDriver.getSingleton()
>        // loop through the iterator to get the records in the partition and
>        // use the driver to push them out to the DB
>    })
> })
>
> This avoids the JDBC connection being serialized as part of the closure /
> DStream checkpoint.
>
> TD

Re: unserializable object in Spark Streaming context

Posted by Tathagata Das <ta...@gmail.com>.
And if Marcelo's guess is correct, then the right way to do this would be
to lazily / dynamically create the JDBC connection as a singleton in the
workers/executors and use that. Something like this:


dstream.foreachRDD(rdd => {
   rdd.foreachPartition((iterator: Iterator[...]) => {
       // this will create the single JDBC connection in the worker, if it
       // does not exist
       val driver = JDBCDriver.getSingleton()
       // loop through the iterator to get the records in the partition and
       // use the driver to push them out to the DB
   })
})

This avoids the JDBC connection being serialized as part of the closure /
DStream checkpoint.

TD


On Thu, Jul 17, 2014 at 1:42 PM, Marcelo Vanzin <va...@cloudera.com> wrote:

> Could you share some code (or pseudo-code)?
>
> Sounds like you're instantiating the JDBC connection in the driver,
> and using it inside a closure that would be run in a remote executor.
> That means that the connection object would need to be serializable.
> If that sounds like what you're doing, it won't work.

Re: unserializable object in Spark Streaming context

Posted by Marcelo Vanzin <va...@cloudera.com>.
Could you share some code (or pseudo-code)?

Sounds like you're instantiating the JDBC connection in the driver,
and using it inside a closure that would be run in a remote executor.
That means that the connection object would need to be serializable.
If that sounds like what you're doing, it won't work.


On Thu, Jul 17, 2014 at 1:37 PM, Yan Fang <ya...@gmail.com> wrote:
> Hi guys,
>
> I need some help with this problem. In our use case, we need to continuously
> insert values into a database. Our approach is to create the JDBC object in
> the main method and then do the inserts in the DStream foreachRDD operation.
> Is this approach reasonable?
>
> The problem: since we are using com.mysql.jdbc.java, which is not
> serializable, we keep seeing a NotSerializableException. I think that is
> because Spark Streaming tries to serialize and then checkpoint the whole
> class that contains the StreamingContext, not only the StreamingContext
> object, right? Or does something else trigger the serialization? Is there
> any workaround for this (other than not using com.mysql.jdbc.java)?
>
> Thank you.
>
> Cheers,
> Fang, Yan
> yanfang724@gmail.com
> +1 (206) 849-4108



-- 
Marcelo