Posted to user@spark.apache.org by Hammad <ha...@flexilogix.com> on 2017/10/01 17:55:27 UTC

Fwd: Spark Streaming - Multiple Spark Contexts (SparkSQL) Performance

Hello,

*Background:*

I have a Spark Streaming context:

SparkConf conf = new SparkConf()
        .setMaster("local[2]")
        .setAppName("TransformerStreamPOC");
conf.set("spark.driver.allowMultipleContexts", "true");   // <== this
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(60));


that subscribes to certain Kafka *topics*:

JavaInputDStream<ConsumerRecord<String, String>> stream =
        KafkaUtils.createDirectStream(
                jssc,
                LocationStrategies.PreferConsistent(),
                ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
        );
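
where kafkaParams is the usual consumer configuration map for the Kafka 0.10 direct stream; for illustration it looks roughly like the following (broker address and group id are just placeholders):

Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", "localhost:9092");            // placeholder broker
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", StringDeserializer.class);
kafkaParams.put("group.id", "transformer-stream-poc");             // placeholder group id
kafkaParams.put("auto.offset.reset", "latest");
kafkaParams.put("enable.auto.commit", false);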

When messages arrive on these topics, I process each micro-batch as follows (the code for the two scenarios in the question below is inserted inside this block):

stream.foreachRDD(rdd -> {
    // process here - the code for the two scenarios below is inserted here
});


*Question starts here:*

Since I need to apply Spark SQL to the received events, I create a SparkSession in one of two ways:

*1) One SparkSession per partition (after setting "spark.driver.allowMultipleContexts" to true), so all events in a partition are handled by the same SparkSession:*

rdd.foreachPartition(partition -> {
    SparkSession sparkSession = SparkSession
            .builder()
            .appName("Java Spark SQL basic example")
            .config("spark.some.config.option", "some-value")
            .getOrCreate();

    while (partition.hasNext()) {
        ConsumerRecord<String, String> record = partition.next();
        Dataset<Row> df = sparkSession.read().jdbc(MYSQL_CONNECTION_URL, "table_name", connectionProperties);
        // ... use df together with record ...
    }
});

*2) One SparkSession per event, so each event in each partition of each stream gets its own SparkSession:*

rdd.foreachPartition(partition -> {
    while (partition.hasNext()) {
        ConsumerRecord<String, String> record = partition.next();
        SparkSession sparkSession = SparkSession
                .builder()
                .appName("Java Spark SQL basic example")
                .config("spark.some.config.option", "some-value")
                .getOrCreate();

        Dataset<Row> df = sparkSession.read().jdbc(MYSQL_CONNECTION_URL, "table_name", connectionProperties);
        // ... use df together with record ...
    }
});


Is it good practice to create multiple contexts (let's say 10 or 100)?
How does the number of allowed SparkContexts relate to the number of worker nodes?
What are the performance considerations for scenario 1 versus scenario 2?

I am asking because I suspect there is more to understand about the performance impact of the SparkContexts a streaming application creates. I really appreciate your support.

Hammad

Re: Spark Streaming - Multiple Spark Contexts (SparkSQL) Performance

Posted by Gerard Maas <ge...@gmail.com>.
Hammad,

The recommended way to implement this logic is to:

1. Create a single SparkSession.
2. Create a StreamingContext from the SparkContext embedded in that SparkSession.
3. Use that single SparkSession instance for the SQL operations inside foreachRDD.

It's important to note that Spark SQL operations work on the complete dataset, so in this case there is no need for per-partition or per-element handling. (That would only be necessary if we were using the database driver APIs and connections directly.)

Reorganizing the code in the question a bit, we should have:

SparkSession sparkSession = SparkSession
        .builder()
        .master("local[2]")
        .appName("TransformerStreamPOC")
        .config("spark.some.config.option", "some-value")
        .getOrCreate();

JavaStreamingContext jssc = new JavaStreamingContext(
        JavaSparkContext.fromSparkContext(sparkSession.sparkContext()),
        Durations.seconds(60));

// This dataset doesn't seem to depend on the received data, so we can load it once.

Dataset<Row> baselineData = sparkSession.read()
        .jdbc(MYSQL_CONNECTION_URL, "table_name", connectionProperties);
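
(connectionProperties here is just a plain java.util.Properties object for the JDBC source; the URL and credentials below are placeholders:)

Properties connectionProperties = new Properties();
connectionProperties.put("user", "db_user");          // placeholder
connectionProperties.put("password", "db_password");  // placeholder

String MYSQL_CONNECTION_URL = "jdbc:mysql://localhost:3306/mydb";  // placeholder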

// create dstream

JavaDStream<???> dstream = ...

// ... operations on dstream ...

dstream.foreachRDD(rdd -> {

    Dataset<???> incomingData = sparkSession.createDataset(rdd.rdd(), ...);  // with an Encoder for the event type

    // ... do something with the incoming dataset, e.g. join it with the baseline ...

    Dataset<Row> joined = incomingData.join(baselineData, ...);

    // ... do something with joined ...

});
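
Putting it all together, a minimal self-contained sketch of this wiring (assuming Spark 2.x with spark-streaming-kafka-0-10; the topic, JDBC settings, the Event bean and the join column are illustrative placeholders, not part of the original code) could look like:

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;

public class TransformerStreamPOC {

    // Illustrative bean used to turn each Kafka record into a DataFrame row.
    public static class Event implements java.io.Serializable {
        private String id;
        private String payload;
        public Event() {}
        public Event(String id, String payload) { this.id = id; this.payload = payload; }
        public String getId() { return id; }
        public void setId(String id) { this.id = id; }
        public String getPayload() { return payload; }
        public void setPayload(String payload) { this.payload = payload; }
    }

    public static void main(String[] args) throws InterruptedException {
        // One SparkSession for the whole application.
        SparkSession spark = SparkSession.builder()
                .master("local[2]")
                .appName("TransformerStreamPOC")
                .getOrCreate();

        // Streaming context built on the SparkContext embedded in the session.
        JavaStreamingContext jssc = new JavaStreamingContext(
                JavaSparkContext.fromSparkContext(spark.sparkContext()),
                Durations.seconds(60));

        Map<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put("bootstrap.servers", "localhost:9092");   // placeholder
        kafkaParams.put("key.deserializer", StringDeserializer.class);
        kafkaParams.put("value.deserializer", StringDeserializer.class);
        kafkaParams.put("group.id", "transformer-stream-poc");    // placeholder

        JavaInputDStream<ConsumerRecord<String, String>> stream =
                KafkaUtils.createDirectStream(
                        jssc,
                        LocationStrategies.PreferConsistent(),
                        ConsumerStrategies.<String, String>Subscribe(
                                Collections.singletonList("events"), kafkaParams));  // placeholder topic

        // Load the reference table once, outside the streaming loop.
        Properties connectionProperties = new Properties();
        connectionProperties.put("user", "db_user");          // placeholder
        connectionProperties.put("password", "db_password");  // placeholder
        Dataset<Row> baselineData = spark.read()
                .jdbc("jdbc:mysql://localhost:3306/mydb", "table_name", connectionProperties);

        stream.foreachRDD(rdd -> {
            // Reuse the single SparkSession: turn the micro-batch into a DataFrame...
            Dataset<Row> incomingData = spark.createDataFrame(
                    rdd.map(record -> new Event(record.key(), record.value())), Event.class);

            // ...and join it with the baseline data (placeholder join column "id").
            Dataset<Row> joined = incomingData.join(baselineData, "id");
            joined.show();
        });

        jssc.start();
        jssc.awaitTermination();
    }
}

The key point is that all dataset operations inside foreachRDD go through the one SparkSession created at startup, so there is no need for "spark.driver.allowMultipleContexts" at all.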


kr, Gerard.
