Posted to dev@spark.apache.org by Peter Liu <pe...@gmail.com> on 2018/10/16 16:38:17 UTC

Re: configure yarn to use more vcores than the node provides?

Hi Khaled,

The 50-2-3g I mentioned below refers to the --conf spark.executor.*
settings, specifically spark.executor.instances=50, spark.executor.cores=2
and spark.executor.memory=3g.
For each run, I configured the streaming producer and the Kafka broker so
that the partition count matches the consumer side (100 partitions in this
case), as can be seen in the Spark UI.
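
To make the vcore arithmetic explicit, here is a minimal Scala sketch of the
three runs. It is only an illustration: ExecutorLayout, LayoutCheck and their
members are made-up names (not Spark API), it assumes the 35- and 40-executor
runs used the same 2 cores and 3g per executor, and it assumes a single
NodeManager, so that the 110 configured vcores are all that YARN can hand out.

```scala
// Illustrative only: ExecutorLayout and LayoutCheck are hypothetical names,
// not part of the streaming job or of Spark's API.
final case class ExecutorLayout(instances: Int, coresPerExecutor: Int, memory: String) {
  // Total vcores the executors request from YARN (driver/AM containers not counted).
  def totalVcores: Int = instances * coresPerExecutor

  // Render the layout as spark-submit --conf flags.
  def toSubmitArgs: Seq[String] = Seq(
    s"spark.executor.instances=$instances",
    s"spark.executor.cores=$coresPerExecutor",
    s"spark.executor.memory=$memory"
  ).flatMap(kv => Seq("--conf", kv))
}

object LayoutCheck {
  val hardwareThreads = 80 // logical cores on the node (value from this thread)
  val yarnVcores = 110     // yarn.nodemanager.resource.cpu-vcores (value from this thread)

  // Assumption: every run uses 2 cores and 3g per executor.
  val runs: Seq[ExecutorLayout] = Seq(
    ExecutorLayout(35, 2, "3g"),
    ExecutorLayout(40, 2, "3g"),
    ExecutorLayout(50, 2, "3g")
  )

  def main(args: Array[String]): Unit =
    runs.foreach { r =>
      val flags = r.toSubmitArgs.mkString(" ")
      val overHardware = r.totalVcores > hardwareThreads
      val fitsYarn = r.totalVcores <= yarnVcores
      println(s"$flags -> ${r.totalVcores} vcores, overHardware=$overHardware, fitsYarn=$fitsYarn")
    }
}
```

Under these assumptions only the 50-executor run asks for more vcores than the
80 hardware threads, and it still fits within the 110 vcores that YARN was
told about.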

The YARN scheduler does not seem to check the actual hardware threads
(logical cores) on the node. On the other hand, there seems to be some
mechanism in YARN that allows overcommit for certain jobs (with high or low
watermarks etc.).
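
As a rough way to quantify the overcommit, the sketch below divides the vcores
advertised in yarn-site.xml by the logical cores on the node. OvercommitRatio
is a hypothetical helper, not a YARN or Spark facility. The 110 and 80 are the
values from this thread, and Runtime.availableProcessors() only reports what
the local JVM sees, so the second line of output is meaningful only when the
code runs on the NodeManager host itself.

```scala
// Illustrative only: OvercommitRatio is a hypothetical helper, not a YARN or Spark API.
object OvercommitRatio {
  // Ratio of vcores advertised to YARN over logical cores actually present.
  def ratio(configuredVcores: Int, hardwareThreads: Int): Double = {
    require(hardwareThreads > 0, "hardwareThreads must be positive")
    configuredVcores.toDouble / hardwareThreads
  }

  def main(args: Array[String]): Unit = {
    // Values from this thread: 110 configured vcores on an 80-thread node.
    val fromThread = ratio(110, 80)
    println(f"thread values: $fromThread%.3f")

    // Logical cores as seen by the local JVM.
    val local = Runtime.getRuntime.availableProcessors()
    val onThisHost = ratio(110, local)
    println(f"this machine:  $onThisHost%.3f with $local%d logical cores")
  }
}
```

A ratio above 1.0 means YARN may schedule more vcores than there are hardware
threads, which is what I have been calling "overcommit".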

I am not sure how this should be done as a good practice. I would appreciate
any hints from the experts here.

Thanks for your reply!

Peter / Gang

On Tue, Oct 16, 2018 at 3:43 AM Khaled Zaouk <kh...@gmail.com> wrote:

> Hi Peter,
>
> I actually meant the spark configuration that you put in your spark-submit
> program (such as --conf spark.executor.instances= ..., --conf
> spark.executor.memory= ..., etc...).
>
> I advise you to check the number of partitions that you get in each stage
> of your workload in the Spark UI while the workload is running. I suspect
> this number is above 80, which would explain why overcommitting CPU cores
> can achieve better latency if the workload is not CPU intensive.
>
> Another question, did you try different values for spark.executor.cores?
> (for example 3, 4 or 5 cores per executor in addition to 2?) Try to play a
> little bit with this parameter and check how it affects your latency...
>
> Best,
>
> Khaled
>
>
>
> On Tue, Oct 16, 2018 at 3:06 AM Peter Liu <pe...@gmail.com> wrote:
>
>> Hi Khaled,
>>
>> I have attached the spark streaming config below in (a).
>> In case of the 100vcore run (see the initial email), I used 50 executors
>> where each executor has 2 vcores and 3g memory. For 70 vcore case, 35
>> executors, for 80 vcore case, 40 executors.
>> In the YARN config (yarn-site.xml, (b) below), the number of available
>> vcores is set above 80 (I call it "overcommit").
>>
>> Not sure if there is a more proper way to do this (overcommit) and what
>> would be the best practice in this type of situation (say, light cpu
>> workload in a dedicated yarn cluster) to increase the cpu utilization for a
>> better performance.
>>
>> Any help would be very much appreciated.
>>
>> Thanks ...
>>
>> Peter
>>
>> (a)
>>
>>    val df = spark
>>       .readStream
>>       .format("kafka")
>>       .option("kafka.bootstrap.servers", kafkaCluster.kafkaNodesString)
>>       .option("startingOffsets", "latest")
>>       .option("subscribe", Variables.EVENTS_TOPIC)
>>       .option("kafkaConsumer.pollTimeoutMs", "5000")
>>       .load()
>>       .selectExpr("CAST(value AS STRING)", "CAST(timestamp AS TIMESTAMP)")
>>       .as[(String, Timestamp)]
>>       .select(from_json($"value", mySchema).as("data"), $"timestamp")
>>       .select("data.*", "timestamp")
>>       .where($"event_type" === "view")
>>       .select($"ad_id", $"event_time")
>>       .join(campaigns.toSeq.toDS().cache(), Seq("ad_id"))
>>       .groupBy(millisTime(window($"event_time", "10 seconds").getField("start")) as 'time_window, $"campaign_id")
>>       .agg(count("*") as 'count, max('event_time) as 'lastUpdate)
>>       .select(to_json(struct("*")) as 'value)
>>       .writeStream
>>       .format("kafka")
>>       .option("kafka.bootstrap.servers", kafkaCluster.kafkaNodesString) //original
>>       .option("topic", Variables.OUTPUT_TOPIC)
>>       .option("checkpointLocation", s"/tmp/${java.util.UUID.randomUUID()}") //TBD: ram disk?
>>       .outputMode("update")
>>       .start()
>>
>> (b)
>> <property>
>>     <name>yarn.nodemanager.resource.cpu-vcores</name>
>>     <value>110</value>
>> </property>
>> <property>
>> <name>yarn.scheduler.maximum-allocation-vcores</name>
>> <value>110</value>
>> </property>
>>
>> On Mon, Oct 15, 2018 at 4:26 PM Khaled Zaouk <kh...@gmail.com>
>> wrote:
>>
>>> Hi Peter,
>>>
>>> What parameters are you putting in your spark streaming configuration?
>>> What are you putting as number of executor instances and how many cores per
>>> executor are you setting in your Spark job?
>>>
>>> Best,
>>>
>>> Khaled
>>>
>>> On Mon, Oct 15, 2018 at 9:18 PM Peter Liu <pe...@gmail.com> wrote:
>>>
>>>> Hi there,
>>>>
>>>> I have a system with 80 vcores and a relatively light Spark streaming
>>>> workload. Overcommitting the vcore resource (i.e. > 80) in the config (see (a)
>>>> below) seems to improve the average Spark batch time (see (b)
>>>> below).
>>>>
>>>> Is there any best practice guideline on resource overcommit with CPU /
>>>> vcores, such as YARN config options, candidate cases ideal for
>>>> overcommitting vcores etc.?
>>>>
>>>> The slides below (from 2016, though) seem to address memory overcommit
>>>> and hint at CPU overcommit as a "future" topic:
>>>>
>>>> https://www.slideshare.net/HadoopSummit/investing-the-effects-of-overcommitting-yarn-resources
>>>>
>>>> I would like to know whether this is a reasonable config practice and
>>>> why the same improvement is not achievable without overcommit. Any
>>>> help/hint would be very much appreciated!
>>>>
>>>> Thanks!
>>>>
>>>> Peter
>>>>
>>>> (a) yarn-site.xml
>>>> <property>
>>>>     <name>yarn.nodemanager.resource.cpu-vcores</name>
>>>>     <value>110</value>
>>>> </property>
>>>>
>>>> <property>
>>>> <name>yarn.scheduler.maximum-allocation-vcores</name>
>>>> <value>110</value>
>>>> </property>
>>>>
>>>>
>>>> (b)
>>>> FYI:
>>>> I have a system with 80 vcores and a relatively light Spark streaming
>>>> workload. Overcommitting the vcore resource (here 100) seems to help the
>>>> average Spark batch time. I need to understand this practice better.
>>>>
>>>> Skylake (1 x 900K msg/sec)  total batch# (avg)  avg batch time in ms (avg)  avg user cpu (%)  nw read (mb/sec)
>>>> 70 vcores                   178.20              8154.69                     n/a               n/a
>>>> 80 vcores                   177.40              7865.44                     27.85             222.31
>>>> 100 vcores                  177.00              7209.37                     30.02             220.86
>>>>
>>>>