You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by Gavin Ray <ra...@gmail.com> on 2022/05/16 16:55:38 UTC

[Spark SQL]: Configuring/Using Spark + Catalyst optimally for read-heavy transactional workloads in JDBC sources?

Hi all,

I've not got much experience with Spark, but have been reading the Catalyst
and
Datasources V2 code/tests to try to get a basic understanding.

I'm interested in trying Catalyst's query planner + optimizer for queries
spanning one-or-more JDBC sources.

Somewhat unusually, I'd like to do this with as minimal latency as possible
to
see what the experience for standard line-of-business apps is like (~90/10
read/write ratio).
Few rows would be returned in the reads (something on the order of
1-to-1,000).

My question is: What configuration settings would you want to use for
something
like this?

I imagine that doing codegen/JIT compilation of the query plan might not be
worth the cost, so maybe you'd want to disable that and do interpretation?

And possibly you'd want to use query plan config/rules that reduce the time
spent in planning, trading efficiency for latency?

Does anyone know how you'd configure Spark to test something like this?

Would greatly appreciate any input (even if it's "This is a bad idea and
will
never work well").

Thank you =)

Re: [Spark SQL]: Configuring/Using Spark + Catalyst optimally for read-heavy transactional workloads in JDBC sources?

Posted by Gavin Ray <ra...@gmail.com>.
I found a repo which replaces RDD's with native JVM iterators, and
optimizes Spark for single-node/in-memory workloads:
direct-spark-sql/direct-spark-sql: a hyper-optimized single-node(local)
version of spark sql engine, which's fundamental data structure is scala
Iterator rather than RDD. (github.com)
<https://github.com/direct-spark-sql/direct-spark-sql>

Updating this to last Spark 2.x and Scala 2.12, it's not any faster than
the latest Spark snapshot's default configuration:

val stopwatch = StopWatch.createStarted()
> val df = spark.sqlDirectly("SELECT name, ROW_NUMBER() OVER (PARTITION BY
> genda ORDER BY name) as row FROM people")
> stopwatch.stop()
> println("stopwatch:" + stopwatch.getTime(TimeUnit.MILLISECONDS))
> println(df.data.mkString(","))


Output:
===========
stopwatch:222
[a,1],[c,2],[bbb,1],[ddd,2],[e,3]

Looking at the profile, it seems like this might be because it's repeatedly
calling this, though?
"org.apache.spark.sql.execution.WholeStageCodegenExec.doCodeGen()"

There appear to be a bit over a dozen "Codegen completed" lines logged for
each query, which seems odd.

If this is solid code, then it would seem like the overhead of the
multi-node support in Spark (broadcasts, transfers, etc) are negligible in
local mode at least.

[image: image.png]

On Wed, May 18, 2022 at 9:35 PM Gavin Ray <ra...@gmail.com> wrote:

> Following up on this in case anyone runs across it in the archives in the
> future
> From reading through the config docs and trying various combinations, I've
> discovered that:
>
> - You don't want to disable codegen. This roughly doubled the time to
> perform simple, few-column/few-row queries from basic testing
>   -  Can test this by setting an internal property after setting
> "spark.testing" to "true" in system properties
>
>
>> System.setProperty("spark.testing", "true")
>> val spark = SparkSession.builder()
>>   .config("spark.sql.codegen.wholeStage", "false")
>>   .config("spark.sql.codegen.factoryMode", "NO_CODEGEN")
>>
>
> -  The following gave the best performance. I don't know if enabling CBO
> did much.
>
> val spark = SparkSession.builder()
>> .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
>> .config("spark.kryo.unsafe", "true")
>> .config("spark.sql.adaptive.enabled", "true")
>> .config("spark.sql.cbo.enabled", "true")
>> .config("spark.sql.cbo.joinReorder.dp.star.filter", "true")
>> .config("spark.sql.cbo.joinReorder.enabled", "true")
>> .config("spark.sql.cbo.planStats.enabled", "true")
>> .config("spark.sql.cbo.starSchemaDetection", "true")
>
>
> If you're running on more recent JDK's, you'll need to set "--add-opens"
> flags for a few namespaces for "kryo.unsafe" to work.
>
>
>
> On Mon, May 16, 2022 at 12:55 PM Gavin Ray <ra...@gmail.com> wrote:
>
>> Hi all,
>>
>> I've not got much experience with Spark, but have been reading the
>> Catalyst and
>> Datasources V2 code/tests to try to get a basic understanding.
>>
>> I'm interested in trying Catalyst's query planner + optimizer for queries
>> spanning one-or-more JDBC sources.
>>
>> Somewhat unusually, I'd like to do this with as minimal latency as
>> possible to
>> see what the experience for standard line-of-business apps is like
>> (~90/10 read/write ratio).
>> Few rows would be returned in the reads (something on the order of
>> 1-to-1,000).
>>
>> My question is: What configuration settings would you want to use for
>> something
>> like this?
>>
>> I imagine that doing codegen/JIT compilation of the query plan might not
>> be
>> worth the cost, so maybe you'd want to disable that and do interpretation?
>>
>> And possibly you'd want to use query plan config/rules that reduce the
>> time
>> spent in planning, trading efficiency for latency?
>>
>> Does anyone know how you'd configure Spark to test something like this?
>>
>> Would greatly appreciate any input (even if it's "This is a bad idea and
>> will
>> never work well").
>>
>> Thank you =)
>>
>

Re: [Spark SQL]: Configuring/Using Spark + Catalyst optimally for read-heavy transactional workloads in JDBC sources?

Posted by Gavin Ray <ra...@gmail.com>.
Following up on this in case anyone runs across it in the archives in the
future
From reading through the config docs and trying various combinations, I've
discovered that:

- You don't want to disable codegen. This roughly doubled the time to
perform simple, few-column/few-row queries from basic testing
  -  Can test this by setting an internal property after setting
"spark.testing" to "true" in system properties


> System.setProperty("spark.testing", "true")
> val spark = SparkSession.builder()
>   .config("spark.sql.codegen.wholeStage", "false")
>   .config("spark.sql.codegen.factoryMode", "NO_CODEGEN")
>

-  The following gave the best performance. I don't know if enabling CBO
did much.

val spark = SparkSession.builder()
> .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
> .config("spark.kryo.unsafe", "true")
> .config("spark.sql.adaptive.enabled", "true")
> .config("spark.sql.cbo.enabled", "true")
> .config("spark.sql.cbo.joinReorder.dp.star.filter", "true")
> .config("spark.sql.cbo.joinReorder.enabled", "true")
> .config("spark.sql.cbo.planStats.enabled", "true")
> .config("spark.sql.cbo.starSchemaDetection", "true")


If you're running on more recent JDK's, you'll need to set "--add-opens"
flags for a few namespaces for "kryo.unsafe" to work.



On Mon, May 16, 2022 at 12:55 PM Gavin Ray <ra...@gmail.com> wrote:

> Hi all,
>
> I've not got much experience with Spark, but have been reading the
> Catalyst and
> Datasources V2 code/tests to try to get a basic understanding.
>
> I'm interested in trying Catalyst's query planner + optimizer for queries
> spanning one-or-more JDBC sources.
>
> Somewhat unusually, I'd like to do this with as minimal latency as
> possible to
> see what the experience for standard line-of-business apps is like (~90/10
> read/write ratio).
> Few rows would be returned in the reads (something on the order of
> 1-to-1,000).
>
> My question is: What configuration settings would you want to use for
> something
> like this?
>
> I imagine that doing codegen/JIT compilation of the query plan might not be
> worth the cost, so maybe you'd want to disable that and do interpretation?
>
> And possibly you'd want to use query plan config/rules that reduce the time
> spent in planning, trading efficiency for latency?
>
> Does anyone know how you'd configure Spark to test something like this?
>
> Would greatly appreciate any input (even if it's "This is a bad idea and
> will
> never work well").
>
> Thank you =)
>