Posted to dev@spark.apache.org by Christiaan Ras <ch...@semmelwise.nl> on 2018/07/19 10:51:07 UTC

[STRUCTURED STREAM] Join static dataset in state function (flatMapGroupsWithState)

I use the state function flatMapGroupsWithState to track the state of a Kafka stream. To further customize the state function, I'd like to use a static data source (JDBC) inside it. This data source contains data that I'd like to join with the stream (i.e. with the Iterator of values passed to the function) within flatMapGroupsWithState.

When I try to access the JDBC source within flatMapGroupsWithState, Spark execution freezes without any exceptions or log output.
To verify that the JDBC connection works, I also tried accessing the source outside the state function, and that works. So for now I join the static source with the streaming source before feeding the result to flatMapGroupsWithState. It seems to work so far…

Any ideas why accessing the JDBC source within flatMapGroupsWithState could fail (i.e. freeze Spark execution)? Is it wise to use external data sources within flatMapGroupsWithState at all?

Thanks,
Chris



Re: [STRUCTURED STREAM] Join static dataset in state function (flatMapGroupsWithState)

Posted by Christiaan Ras <ch...@semmelwise.nl>.
Hi Gerard,

First, I'd like to thank you for your fast reply and for redirecting my question to the proper mailing list!
I established the JDBC connection in the context of the state function (flatMapGroupsWithState), using spark.read on the SparkSession, as below:
import org.apache.spark.sql.Encoders

// `connection` holds the database settings; `Class` stands for the case class describing one row.
spark.read
  .format("jdbc")
  .option("url", s"jdbc:postgresql://${connection.host}/${connection.db}")
  .option("dbtable", "table")
  .option("user", connection.user)
  .option("password", connection.pwd)
  .option("driver", "org.postgresql.Driver")
  .option("numPartitions", "5")
  .load()
  .as(Encoders.product[Class])

I use a ‘shared’ SparkSession, initialized in main (which runs on the driver, I believe) and made accessible to other classes through a singleton. The class that implements the state function fetches the shared session from this singleton.
My test ran locally with a single thread, so all log output should have been visible on the console.

BTW: I have now implemented the approach of joining this data source with the streaming source before feeding the result to the state function. That works! But I am still curious how to do this inside flatMapGroupsWithState, or whether state functions are simply not designed for this kind of thing…
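A minimal sketch of this join-before-state approach, assuming Spark 2.2 or later. The case classes Reading, DeviceInfo, Enriched and ExceedanceCount, and the "count readings above a threshold" logic, are hypothetical and do not come from the thread. The static Dataset would be the JDBC read shown above, and the streaming Dataset would be parsed from Kafka. Both are passed in as parameters here. The join is planned on the driver, so the state function only ever sees already-enriched rows and never needs the SparkSession.

```scala
import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode}

// Hypothetical row types, for illustration only.
case class Reading(deviceId: String, value: Double)
case class DeviceInfo(deviceId: String, threshold: Double)
case class Enriched(deviceId: String, value: Double, threshold: Double)
case class ExceedanceCount(deviceId: String, count: Long)

// Serializable in case the compiled state function captures this object.
object JoinBeforeState extends Serializable {

  /** Pure update logic, kept separate so it can be tested without Spark. */
  def updateCount(previous: Long, rows: Iterator[Enriched]): Long =
    previous + rows.count(r => r.value > r.threshold)

  def build(
      spark: SparkSession,
      readings: Dataset[Reading],  // streaming, e.g. parsed from the Kafka source
      devices: Dataset[DeviceInfo] // static, e.g. loaded with spark.read.format("jdbc")
  ): Dataset[ExceedanceCount] = {
    import spark.implicits._

    // Stream-static inner join on the shared column; resulting columns map onto Enriched by name.
    val enriched: Dataset[Enriched] = readings
      .join(devices, "deviceId")
      .as[Enriched]

    enriched
      .groupByKey(_.deviceId)
      .flatMapGroupsWithState[Long, ExceedanceCount](OutputMode.Update(), GroupStateTimeout.NoTimeout()) {
        (deviceId: String, rows: Iterator[Enriched], state: GroupState[Long]) =>
          // Only joined rows arrive here: no Dataset or SparkSession is touched on the executor.
          val total = updateCount(state.getOption.getOrElse(0L), rows)
          state.update(total)
          Iterator(ExceedanceCount(deviceId, total))
      }
  }
}
```

In a real job the result would be written with writeStream.outputMode("update"), matching the OutputMode.Update() passed to the operator.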

Regards,

Chris

From: Gerard Maas <ge...@gmail.com>
Date: Thursday, 19 July 2018 at 15:20
To: Christiaan Ras <ch...@semmelwise.nl>, spark users <us...@spark.apache.org>
Subject: Re: [STRUCTURED STREAM] Join static dataset in state function (flatMapGroupsWithState)

Hi Chris,

Could you show the code you are using? When you mention "I like to use a static datasource (JDBC) in the state function" do you refer to a DataFrame from a JDBC source or an independent JDBC connection?

The key point to consider is that the function passed to flatMapGroupsWithState must be serializable, because it is executed on the workers (executors) of a Spark job.
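To illustrate why serializability matters, here is a plain-Scala sketch (no Spark needed) of the check a function has to pass before it can be shipped to executors. Spark serializes task closures with Java serialization, so a function that captures a non-serializable object cannot be shipped. FakeConnection is a hypothetical stand-in for java.sql.Connection, and ClosureShipping is an illustrative name.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for java.sql.Connection: deliberately NOT Serializable.
final class FakeConnection {
  def lookup(key: String): String = s"info-for-$key"
}

// Extends Serializable so that only the values a function captures decide the outcome.
object ClosureShipping extends java.io.Serializable {

  /** True if `obj` survives Java serialization, roughly what Spark requires of task closures. */
  def isSerializable(obj: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      try out.writeObject(obj) finally out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }

  /** Anti-pattern: the connection is created outside the function and captured by it. */
  def capturingFunction(): String => String = {
    val conn = new FakeConnection
    key => conn.lookup(key)
  }

  /** Pattern from this thread: the connection is created inside the function itself. */
  val selfContainedFunction: String => String = { key =>
    val conn = new FakeConnection
    conn.lookup(key)
  }
}
```

isSerializable returns false for capturingFunction(), because the captured FakeConnection cannot be written. It returns true for selfContainedFunction, which captures nothing.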

If you are using a JDBC connection, you need to make sure the connection is created in the context of the function, because JDBC connections are not serializable.
Likewise, Datasets/DataFrames only work in the driver where they are defined. They are bound to the SparkSession in the driver, and it does not make sense to access them in a remote executor.
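A minimal sketch of the first option, where an independent JDBC connection is opened inside the state function. It assumes a JDBC driver on the executors' classpath and a hypothetical table device_info(device_id, threshold). The case classes, JdbcConf and the threshold logic are also illustrative. Only plain strings are captured by the state function, and the connection is opened and closed inside it, on the executor. spark.read is deliberately not used, since it only works on the driver.

```scala
import java.sql.DriverManager
import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode}

// Hypothetical types, for illustration only.
case class Event(deviceId: String, reading: Double)
case class Alert(deviceId: String, exceedances: Long)
// Only plain strings: serializable, so the state function may capture it.
case class JdbcConf(url: String, user: String, password: String)

// Serializable in case the compiled state function captures this object.
object JdbcInStateFunction extends Serializable {

  /** Looks up one key's threshold with a connection opened (and closed) right here. */
  def lookupThreshold(conf: JdbcConf, deviceId: String): Option[Double] = {
    val conn = DriverManager.getConnection(conf.url, conf.user, conf.password)
    try {
      val stmt = conn.prepareStatement("SELECT threshold FROM device_info WHERE device_id = ?")
      try {
        stmt.setString(1, deviceId)
        val rs = stmt.executeQuery()
        try { if (rs.next()) Some(rs.getDouble(1)) else None } finally rs.close()
      } finally stmt.close()
    } finally conn.close()
  }

  /** Pure update logic, testable without Spark or a database. */
  def countExceedances(readings: Iterator[Double], threshold: Double, previous: Long): Long =
    previous + readings.count(_ > threshold)

  def alerts(spark: SparkSession, events: Dataset[Event], conf: JdbcConf): Dataset[Alert] = {
    import spark.implicits._
    events
      .groupByKey(_.deviceId)
      .flatMapGroupsWithState[Long, Alert](OutputMode.Update(), GroupStateTimeout.NoTimeout()) {
        (deviceId: String, rows: Iterator[Event], state: GroupState[Long]) =>
          // Runs on an executor: the JDBC connection is created inside the function.
          lookupThreshold(conf, deviceId) match {
            case Some(threshold) =>
              val previous = state.getOption.getOrElse(0L)
              val total = countExceedances(rows.map(_.reading), threshold, previous)
              state.update(total)
              Iterator(Alert(deviceId, total))
            case None =>
              Iterator.empty // unknown device: emit nothing, keep state unchanged
          }
      }
  }
}
```

This sketch opens one connection per key and per micro-batch, which is simple but costly. Reusing connections, for example through a pool held in an executor-side singleton, is a possible refinement that the sketch does not show.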

Make sure you check the executor logs as well. There might be a NullPointerException lurking somewhere in your logs.

Kind regards, Gerard.

PS: spark-dev (dev@spark.apache.org) is for discussions about the open source development of the Spark project.
For general questions like this, use the user mailing list (user@spark.apache.org) (note that I changed that address in the To: field).
