Posted to user@spark.apache.org by "Yuri Oleynikov (יורי אולייניקוב)" <yurkao@gmail.com> on 2021/01/16 13:20:42 UTC

Dynamic Spark metrics creation

Hi all,
I have a Spark application with Arbitrary Stateful Aggregation implemented
with FlatMapGroupsWithStateFunction.

I want to make some statistics about incoming events inside
FlatMapGroupsWithStateFunction.
The statistics are made from an event property which on the one hand has
dynamic values, but on the other hand takes a small, finite (though unknown)
set of values (e.g. country name).
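The per-batch counting itself is simple; as a rough pure-Python model of the semantics (not Spark's flatMapGroupsWithState API — the event shape and the "country" field name are assumptions for illustration):

```python
from collections import Counter

def update_country_counts(events, state):
    # Model of the per-group state-update step: fold a batch of events
    # into a running Counter kept as the group state. Each event is
    # assumed to carry a "country" property (hypothetical field name).
    state = state if state is not None else Counter()
    for event in events:
        state[event["country"]] += 1
    return state

state = update_country_counts(
    [{"country": "USA"}, {"country": "Poland"}, {"country": "USA"}], None)
# state["USA"] == 2, state["Poland"] == 1
```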

So I thought of registering dynamic metrics inside
FlatMapGroupsWithStateFunction, but as far as I understand this requires
accessing the MetricsSystem via SparkEnv.get(), which is unavailable on
executors.

Any thoughts/suggestions?

With best regards,
Yurii

Re: Dynamic Spark metrics creation

Posted by Ivan Petrov <ca...@gmail.com>.
Would a custom accumulator work for you? It should be doable for
Map[String, Long] too:
https://stackoverflow.com/questions/42293798/how-to-create-custom-set-accumulator-i-e-setstring
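A map-valued accumulator fits this well. A pure-Python sketch of the add/merge semantics such an accumulator needs (Spark's real AccumulatorV2 is a JVM class; this only models its contract, not the actual API):

```python
class MapAccumulator:
    # Models AccumulatorV2-style semantics for a Map[String, Long]:
    # executors call add() per event on their local copy, and the
    # driver merges the partial maps back together.
    def __init__(self):
        self.value = {}

    def add(self, key, n=1):
        self.value[key] = self.value.get(key, 0) + n

    def merge(self, other):
        for k, n in other.value.items():
            self.value[k] = self.value.get(k, 0) + n

# Two "executor-side" partial accumulators merged on the "driver":
a, b = MapAccumulator(), MapAccumulator()
a.add("USA"); a.add("USA"); b.add("Poland")
driver = MapAccumulator()
driver.merge(a); driver.merge(b)
# driver.value == {"USA": 2, "Poland": 1}
```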


On Sun, 17 Jan 2021 at 15:16, "Yuri Oleynikov (יורי אולייניקוב)" <yurkao@gmail.com> wrote:

> Hey Jacek, I’ll clarify myself a bit:
> Bottom line: I need the following metrics reported by Structured
> Streaming:
> Country-USA: 7
> Country-Poland: 23
> Country-Brazil: 56
>
> The country names are included in incoming events and unknown at the very
> beginning / at application startup.
>
> Thus registering an accumulator and binding it to a metric source on the
> driver side at application startup is impossible (unless you register all
> possible country names, which is a waste of Spark memory, pollutes the
> metrics namespace with 99% of metrics having zero values, and wastes
> network bandwidth).
>
>
> Sent from my iPhone

Re: Dynamic Spark metrics creation

Posted by "Yuri Oleynikov (‫יורי אולייניקוב‬‎)" <yu...@gmail.com>.
Hey Jacek, I’ll clarify myself a bit:
Bottom line: I need the following metrics reported by Structured Streaming:
Country-USA: 7
Country-Poland: 23
Country-Brazil: 56

The country names are included in incoming events and unknown at the very beginning / at application startup.

Thus registering an accumulator and binding it to a metric source on the driver side at application startup is impossible (unless you register all possible country names, which is a waste of Spark memory, pollutes the metrics namespace with 99% of metrics having zero values, and wastes network bandwidth).
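One way to avoid pre-registering every country is to keep a single map-valued accumulator and have a driver-side reporter expand its contents into one metric line per key actually seen. A rough model with hypothetical names (this is not Spark's MetricsSystem/Source API):

```python
def report(counts, prefix="Country"):
    # Format a map of dynamic keys as individual metric lines,
    # only for keys actually observed -- no zero-valued placeholders.
    return [f"{prefix}-{k}: {v}" for k, v in sorted(counts.items())]

lines = report({"USA": 7, "Poland": 23, "Brazil": 56})
# lines == ['Country-Brazil: 56', 'Country-Poland: 23', 'Country-USA: 7']
```

Since the reporter only iterates over keys present in the map, the metrics namespace grows exactly with the set of countries seen so far.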


Sent from my iPhone

> On 17 Jan 2021, at 15:56, Jacek Laskowski <ja...@japila.pl> wrote:
> 
> 
> Hey Yurii,
> 
> > which is unavailable from executors.
> 
> Register it on the driver and use accumulators on executors to update the values (on the driver)?
> 
> Regards,
> Jacek Laskowski
> ----
> https://about.me/JacekLaskowski
> "The Internals Of" Online Books
> Follow me on https://twitter.com/jaceklaskowski

Re: Dynamic Spark metrics creation

Posted by Jacek Laskowski <ja...@japila.pl>.
Hey Yurii,

> which is unavailable from executors.

Register it on the driver and use accumulators on executors to update the
values (on the driver)?
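A rough model of this driver/executor split (plain Python, not Spark API): each task builds a local partial map and the driver merges the partials, which is essentially what Spark does with an AccumulatorV2 under the hood:

```python
def run_task(partition):
    # "Executor side": count occurrences within one partition locally.
    local = {}
    for event in partition:
        local[event] = local.get(event, 0) + 1
    return local

def merge_into(driver_counts, partials):
    # "Driver side": fold every task's partial map into the global one.
    for partial in partials:
        for k, n in partial.items():
            driver_counts[k] = driver_counts.get(k, 0) + n
    return driver_counts

partitions = [["USA", "USA", "Poland"], ["Brazil", "USA"]]
totals = merge_into({}, (run_task(p) for p in partitions))
# totals == {"USA": 3, "Poland": 1, "Brazil": 1}
```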

Regards,
Jacek Laskowski
----
https://about.me/JacekLaskowski
"The Internals Of" Online Books <https://books.japila.pl/>
Follow me on https://twitter.com/jaceklaskowski


