Posted to user@spark.apache.org by Something Something <ma...@gmail.com> on 2020/05/14 21:36:45 UTC

Using Spark Accumulators with Structured Streaming

In my Structured Streaming job I update Spark accumulators in the
updateAcrossEvents method, but they are always 0 when I print them in
my StreamingQueryListener. Here's the code:

.mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
        updateAcrossEvents
      )

The accumulators are incremented in 'updateAcrossEvents'. I have a
StreamingQueryListener that writes the accumulator values in its
'onQueryProgress' method, but in that method the accumulators are ALWAYS
ZERO!

When I added log statements to updateAcrossEvents, I could see that
the accumulators were being incremented as expected.

This only happens when I run in 'Cluster' mode. In Local mode it works
fine, which suggests that the accumulators are not getting distributed
correctly - or something like that!

Note: I've seen quite a few answers on the Web that tell me to perform an
"Action". That's not a solution here. This is a 'Stateful Structured
Streaming' job. Yes, I am also 'registering' them in SparkContext.


Re: Using Spark Accumulators with Structured Streaming

Posted by Something Something <ma...@gmail.com>.
*Honestly, I don't know how to do this in Scala.* I tried something like
this...



.mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(new StateUpdater(myAcc))

StateUpdater is similar to what Zhang has provided, but it's NOT
compiling because I need to return a 'Dataset'.


Here's the definition of mapGroupsWithState in Scala:

def mapGroupsWithState[S: Encoder, U: Encoder](
    timeoutConf: GroupStateTimeout)(
    func: (K, Iterator[V], GroupState[S]) => U): Dataset[U] = {
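
For what it's worth, the Java StateUpdateTask quoted below takes the
accumulator through its constructor, so the accumulator is a field of the
function object and is serialized along with it. The sketch below
illustrates only that pattern, in plain Java with no Spark dependency.
CountingTask, roundTrip and the AtomicLong counter are hypothetical
stand-ins (not Spark's MapGroupsWithStateFunction or LongAccumulator). It
shows that a constructor-supplied field travels with the serialized object
and that the deserialized copy counts independently of the original. How
Spark merges updates from registered accumulators back to the driver is not
modelled here, and nothing in this thread confirms that this is the cause of
the zeros.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for a state-update function that owns its counter.
final class CountingTask implements Serializable {
    private static final long serialVersionUID = 1L;

    // Supplied through the constructor, so it is part of the serialized object.
    private final AtomicLong counter;

    CountingTask(AtomicLong counter) {
        this.counter = counter;
    }

    // Stand-in for call(key, values, state): count the invocation, return a result.
    String call(String key) {
        counter.incrementAndGet();
        return key + ":seen";
    }

    long count() {
        return counter.get();
    }

    // Serialize and deserialize the task, roughly what shipping it to another JVM involves.
    static CountingTask roundTrip(CountingTask task) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(task);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (CountingTask) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("serialization round trip failed", e);
        }
    }
}
```

Calling roundTrip on a task and then invoking call on the copy changes only
the copy's count; the original keeps its own value. In this plain-Java
analogy the two counts would have to be combined by hand.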



On Mon, Jun 8, 2020 at 12:07 PM Srinivas V <sr...@gmail.com> wrote:

> Ya, I had asked this question before. No one responded. By the way, what’s
> your actual name “Something something” if you don’t mind me asking?
>
> On Tue, Jun 9, 2020 at 12:27 AM Something Something <
> mailinglists19@gmail.com> wrote:
>
>> What is scary is this interface is marked as "experimental"
>>
>> @Experimental
>> @InterfaceStability.Evolving
>> public interface MapGroupsWithStateFunction<K, V, S, R> extends Serializable {
>>   R call(K key, Iterator<V> values, GroupState<S> state) throws Exception;
>> }
>>
>>
>>
>>
>> On Mon, Jun 8, 2020 at 11:54 AM Something Something <
>> mailinglists19@gmail.com> wrote:
>>
>>> Right, this is exactly how I've it right now. Problem is in the cluster
>>> mode 'myAcc' does NOT get distributed. Try it out in the cluster mode & you
>>> will see what I mean.
>>>
>>> I think how Zhang is using will work. Will try & revert.
>>>
>>> On Mon, Jun 8, 2020 at 10:58 AM Srinivas V <sr...@gmail.com> wrote:
>>>
>>>>
>>>> You don’t need to have a separate class. I created that as it has lot
>>>> of code and logic in my case.
>>>> For you to quickly test you can use Zhang’s Scala code in this chain.
>>>> Pasting it below for your quick reference:
>>>>
>>>> ```scala
>>>>     spark.streams.addListener(new StreamingQueryListener {
>>>>       override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
>>>>         println(event.progress.id + " is on progress")
>>>>         println(s"My accu is ${myAcc.value} on query progress")
>>>>       }
>>>>         ...
>>>>     })
>>>>
>>>>     def mappingFunc(key: Long, values: Iterator[String], state: GroupState[Long]): ... = {
>>>>       myAcc.add(1)
>>>>       println(s">>> key: $key => state: ${state}")
>>>>         ...
>>>>     }
>>>>
>>>>     val wordCounts = words
>>>>       .groupByKey(v => ...)
>>>>       .mapGroupsWithState(timeoutConf = GroupStateTimeout.ProcessingTimeTimeout)(func = mappingFunc)
>>>>
>>>>     val query = wordCounts.writeStream
>>>>       .outputMode(OutputMode.Update)
>>>>         ...
>>>> ```
>>>>
>>>> On Mon, Jun 8, 2020 at 11:14 AM Something Something <
>>>> mailinglists19@gmail.com> wrote:
>>>>
>>>>> Great. I guess the trick is to use a separate class such as
>>>>> 'StateUpdateTask'. I will try that. My challenge is to convert this into
>>>>> Scala. Will try it out & revert. Thanks for the tips.
>>>>>
>>>>> On Wed, Jun 3, 2020 at 11:56 PM ZHANG Wei <we...@outlook.com> wrote:
>>>>>
>>>>>> The following Java codes can work in my cluster environment:
>>>>>> ```
>>>>>>     .mapGroupsWithState((MapGroupsWithStateFunction<String, String, Long, LeadingCharCount>) (key, values, state) -> {
>>>>>>                 myAcc.add(1);
>>>>>>                 <...>
>>>>>>                 state.update(newState);
>>>>>>                 return new LeadingCharCount(key, newState);
>>>>>>             },
>>>>>>             Encoders.LONG(),
>>>>>>             Encoders.bean(LeadingCharCount.class),
>>>>>>             GroupStateTimeout.ProcessingTimeTimeout())
>>>>>> ```
>>>>>>
>>>>>> Also works fine with my `StateUpdateTask`:
>>>>>> ```
>>>>>>     .mapGroupsWithState(
>>>>>>             new StateUpdateTask(myAcc),
>>>>>>             Encoders.LONG(),
>>>>>>             Encoders.bean(LeadingCharCount.class),
>>>>>>             GroupStateTimeout.ProcessingTimeTimeout());
>>>>>>
>>>>>> public class StateUpdateTask
>>>>>>             implements MapGroupsWithStateFunction<String, String, Long, LeadingCharCount> {
>>>>>>         private LongAccumulator myAccInTask;
>>>>>>
>>>>>>         public StateUpdateTask(LongAccumulator acc) {
>>>>>>             this.myAccInTask = acc;
>>>>>>         }
>>>>>>
>>>>>>         @Override
>>>>>>         public LeadingCharCount call(String key, Iterator<String> values, GroupState<Long> state) throws Exception {
>>>>>>             myAccInTask.add(1);
>>>>>>             <...>
>>>>>>             state.update(newState);
>>>>>>             return new LeadingCharCount(key, newState);
>>>>>>         }
>>>>>> }
>>>>>> ```
>>>>>>
>>>>>> --
>>>>>> Cheers,
>>>>>> -z
>>>>>>
>>>>>> On Tue, 2 Jun 2020 10:28:36 +0800
>>>>>> ZHANG Wei <we...@outlook.com> wrote:
>>>>>>
>>>>>> > Yes, verified on the cluster with 5 executors.
>>>>>> >
>>>>>> > --
>>>>>> > Cheers,
>>>>>> > -z
>>>>>> >
>>>>>> > On Fri, 29 May 2020 11:16:12 -0700
>>>>>> > Something Something <ma...@gmail.com> wrote:
>>>>>> >
>>>>>> > > Did you try this on the Cluster? Note: This works just fine under
>>>>>> > > 'Local' mode.
>>>>>> > >
>>>>>> > > On Thu, May 28, 2020 at 9:12 PM ZHANG Wei <we...@outlook.com> wrote:
>>>>>> > >
>>>>>> > > > I can't reproduce the issue with my simple code:
>>>>>> > > > ```scala
>>>>>> > > >     spark.streams.addListener(new StreamingQueryListener {
>>>>>> > > >       override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
>>>>>> > > >         println(event.progress.id + " is on progress")
>>>>>> > > >         println(s"My accu is ${myAcc.value} on query progress")
>>>>>> > > >       }
>>>>>> > > >         ...
>>>>>> > > >     })
>>>>>> > > >
>>>>>> > > >     def mappingFunc(key: Long, values: Iterator[String], state: GroupState[Long]): ... = {
>>>>>> > > >       myAcc.add(1)
>>>>>> > > >       println(s">>> key: $key => state: ${state}")
>>>>>> > > >         ...
>>>>>> > > >     }
>>>>>> > > >
>>>>>> > > >     val wordCounts = words
>>>>>> > > >       .groupByKey(v => ...)
>>>>>> > > >       .mapGroupsWithState(timeoutConf = GroupStateTimeout.ProcessingTimeTimeout)(func = mappingFunc)
>>>>>> > > >
>>>>>> > > >     val query = wordCounts.writeStream
>>>>>> > > >       .outputMode(OutputMode.Update)
>>>>>> > > >         ...
>>>>>> > > > ```
>>>>>> > > >
>>>>>> > > > I'm wondering whether any errors can be found in the driver logs.
>>>>>> > > > Micro-batch exceptions won't terminate the running streaming job.
>>>>>> > > >
>>>>>> > > > For the following code, we have to make sure that `StateUpdateTask` is
>>>>>> > > > started:
>>>>>> > > > >                 .mapGroupsWithState(
>>>>>> > > > >                         new StateUpdateTask(Long.parseLong(appConfig.getSparkStructuredStreamingConfig().STATE_TIMEOUT),
>>>>>> > > > >                                 appConfig, accumulators),
>>>>>> > > > >                         Encoders.bean(ModelStateInfo.class),
>>>>>> > > > >                         Encoders.bean(ModelUpdate.class),
>>>>>> > > > >                         GroupStateTimeout.ProcessingTimeTimeout());
>>>>>> > > >
>>>>>> > > > --
>>>>>> > > > Cheers,
>>>>>> > > > -z
>>>>>> > > >
>>>>>> > > > On Thu, 28 May 2020 19:59:31 +0530
>>>>>> > > > Srinivas V <sr...@gmail.com> wrote:
>>>>>> > > >
>>>>>> > > > > Giving the code below:
>>>>>> > > > > // accumulators is a class-level variable in the driver.
>>>>>> > > > >
>>>>>> > > > > sparkSession.streams().addListener(new StreamingQueryListener() {
>>>>>> > > > >     @Override
>>>>>> > > > >     public void onQueryStarted(QueryStartedEvent queryStarted) {
>>>>>> > > > >         logger.info("Query started: " + queryStarted.id());
>>>>>> > > > >     }
>>>>>> > > > >     @Override
>>>>>> > > > >     public void onQueryTerminated(QueryTerminatedEvent queryTerminated) {
>>>>>> > > > >         logger.info("Query terminated: " + queryTerminated.id());
>>>>>> > > > >     }
>>>>>> > > > >     @Override
>>>>>> > > > >     public void onQueryProgress(QueryProgressEvent queryProgress) {
>>>>>> > > > >
>>>>>> > > > >         accumulators.eventsReceived(queryProgress.progress().numInputRows());
>>>>>> > > > >         long eventsReceived = 0;
>>>>>> > > > >         long eventsExpired = 0;
>>>>>> > > > >         long eventSentSuccess = 0;
>>>>>> > > > >         try {
>>>>>> > > > >             eventsReceived = accumulators.getLong(InstrumentationCounters.EVENTS_RECEIVED);
>>>>>> > > > >             eventsExpired = accumulators.getLong(InstrumentationCounters.EVENTS_STATE_EXPIRED);
>>>>>> > > > >             eventSentSuccess = accumulators.getLong(InstrumentationCounters.EVENTS_SENT);
>>>>>> > > > >         } catch (MissingKeyException e) {
>>>>>> > > > >             logger.error("Accumulator key not found due to Exception {}", e.getMessage());
>>>>>> > > > >         }
>>>>>> > > > >         logger.info("Events Received:{}", eventsReceived);
>>>>>> > > > >         logger.info("Events State Expired:{}", eventsExpired);
>>>>>> > > > >         logger.info("Events Sent Success:{}", eventSentSuccess);
>>>>>> > > > >         logger.info("Query made progress - batchId: {} numInputRows:{} inputRowsPerSecond:{} processedRowsPerSecond:{} durationMs:{}",
>>>>>> > > > >                 queryProgress.progress().batchId(),
>>>>>> > > > >                 queryProgress.progress().numInputRows(),
>>>>>> > > > >                 queryProgress.progress().inputRowsPerSecond(),
>>>>>> > > > >                 queryProgress.progress().processedRowsPerSecond(),
>>>>>> > > > >                 queryProgress.progress().durationMs());
>>>>>> > > > >     }
>>>>>> > > > > });
>>>>>> > > > >
>>>>>> > > > >
>>>>>> > > > > On Thu, May 28, 2020 at 7:04 PM ZHANG Wei <
>>>>>> wezhang@outlook.com> wrote:
>>>>>> > > > >
>>>>>> > > > > > May I ask how the accumulator is accessed in the method
>>>>>> > > > > > `onQueryProgress()`?
>>>>>> > > > > >
>>>>>> > > > > > AFAICT, the accumulator is incremented correctly. One way to verify
>>>>>> > > > > > that on a cluster is:
>>>>>> > > > > > ```
>>>>>> > > > > >     // Add the following while loop before invoking awaitTermination
>>>>>> > > > > >     while (true) {
>>>>>> > > > > >       println("My acc: " + myAcc.value)
>>>>>> > > > > >       Thread.sleep(5 * 1000)
>>>>>> > > > > >     }
>>>>>> > > > > >
>>>>>> > > > > >     //query.awaitTermination()
>>>>>> > > > > > ```
>>>>>> > > > > >
>>>>>> > > > > > The updated accumulator value can then be found in the driver stdout.
>>>>>> > > > > >
>>>>>> > > > > > --
>>>>>> > > > > > Cheers,
>>>>>> > > > > > -z
>>>>>> > > > > >
>>>>>> > > > > > On Thu, 28 May 2020 17:12:48 +0530
>>>>>> > > > > > Srinivas V <sr...@gmail.com> wrote:
>>>>>> > > > > >
>>>>>> > > > > > > yes, I am using stateful structured streaming. Yes similar to what
>>>>>> > > > > > > you do.
>>>>>> > > > > > > This is in Java
>>>>>> > > > > > > I do it this way:
>>>>>> > > > > > >     Dataset<ModelUpdate> productUpdates = watermarkedDS
>>>>>> > > > > > >                 .groupByKey(
>>>>>> > > > > > >                         (MapFunction<InputEventModel, String>) event -> event.getId(), Encoders.STRING())
>>>>>> > > > > > >                 .mapGroupsWithState(
>>>>>> > > > > > >                         new StateUpdateTask(Long.parseLong(appConfig.getSparkStructuredStreamingConfig().STATE_TIMEOUT),
>>>>>> > > > > > >                                 appConfig, accumulators),
>>>>>> > > > > > >                         Encoders.bean(ModelStateInfo.class),
>>>>>> > > > > > >                         Encoders.bean(ModelUpdate.class),
>>>>>> > > > > > >                         GroupStateTimeout.ProcessingTimeTimeout());
>>>>>> > > > > > >
>>>>>> > > > > > > StateUpdateTask contains the update method.
>>>>>> > > > > > >
>>>>>> > > > > > > On Thu, May 28, 2020 at 4:41 AM Something Something <
>>>>>> > > > > > > mailinglists19@gmail.com> wrote:
>>>>>> > > > > > >
>>>>>> > > > > > > > Yes, that's exactly how I am creating them.
>>>>>> > > > > > > >
>>>>>> > > > > > > > Question... Are you using 'Stateful Structured Streaming' in which
>>>>>> > > > > > > > you've something like this?
>>>>>> > > > > > > >
>>>>>> > > > > > > > .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
>>>>>> > > > > > > >         updateAcrossEvents
>>>>>> > > > > > > >       )
>>>>>> > > > > > > >
>>>>>> > > > > > > > And updating the Accumulator inside 'updateAcrossEvents'? We're
>>>>>> > > > > > > > experiencing this only under 'Stateful Structured Streaming'. In other
>>>>>> > > > > > > > streaming applications it works as expected.
>>>>>> > > > > > > >
>>>>>> > > > > > > >
>>>>>> > > > > > > >
>>>>>> > > > > > > > On Wed, May 27, 2020 at 9:01 AM Srinivas V <
>>>>>> srini.vyr@gmail.com>
>>>>>> > > > > > wrote:
>>>>>> > > > > > > >
>>>>>> > > > > > > >> Yes, I am talking about application-specific accumulators. Actually I am
>>>>>> > > > > > > >> getting the values printed in my driver log as well as sent to Grafana. Not
>>>>>> > > > > > > >> sure where and when I saw 0 before. My deploy mode is “client” on a YARN
>>>>>> > > > > > > >> cluster (not local Mac) where I submit from the master node. It should
>>>>>> > > > > > > >> work the same for cluster mode as well.
>>>>>> > > > > > > >> Create accumulators like this:
>>>>>> > > > > > > >> AccumulatorV2 accumulator = sparkContext.longAccumulator(name);
>>>>>> > > > > > > >>
>>>>>> > > > > > > >>
>>>>>> > > > > > > >> On Tue, May 26, 2020 at 8:42 PM Something Something <
>>>>>> > > > > > > >> mailinglists19@gmail.com> wrote:
>>>>>> > > > > > > >>
>>>>>> > > > > > > >>> Hmm... how would they go to Grafana if they are not getting computed in
>>>>>> > > > > > > >>> your code? I am talking about the application-specific accumulators. The
>>>>>> > > > > > > >>> other standard counters such as 'event.progress.inputRowsPerSecond' are
>>>>>> > > > > > > >>> getting populated correctly!
>>>>>> > > > > > > >>>
>>>>>> > > > > > > >>> On Mon, May 25, 2020 at 8:39 PM Srinivas V <
>>>>>> srini.vyr@gmail.com>
>>>>>> > > > > > wrote:
>>>>>> > > > > > > >>>
>>>>>> > > > > > > >>>> Hello,
>>>>>> > > > > > > >>>> Even for me it comes as 0 when I print in onQueryProgress. I use
>>>>>> > > > > > > >>>> LongAccumulator as well. Yes, it prints on my local but not on cluster.
>>>>>> > > > > > > >>>> But one consolation is that when I send metrics to Grafana, the values
>>>>>> > > > > > > >>>> are coming there.
>>>>>> > > > > > > >>>>
>>>>>> > > > > > > >>>> On Tue, May 26, 2020 at 3:10 AM Something Something <
>>>>>> > > > > > > >>>> mailinglists19@gmail.com> wrote:
>>>>>> > > > > > > >>>>
>>>>>> > > > > > > >>>>> No this is not working even if I use
>>>>>> LongAccumulator.
>>>>>> > > > > > > >>>>>
>>>>>> > > > > > > >>>>> On Fri, May 15, 2020 at 9:54 PM ZHANG Wei <
>>>>>> wezhang@outlook.com
>>>>>> > > > >
>>>>>> > > > > > wrote:
>>>>>> > > > > > > >>>>>
>>>>>> > > > > > > >>>>>> There is a restriction in the AccumulatorV2 API [1]: the OUT type should
>>>>>> > > > > > > >>>>>> be atomic or thread safe. I'm wondering whether the implementation for
>>>>>> > > > > > > >>>>>> `java.util.Map[T, Long]` can meet it or not. Is there any chance to replace
>>>>>> > > > > > > >>>>>> CollectionLongAccumulator with CollectionAccumulator [2] or LongAccumulator [3]
>>>>>> > > > > > > >>>>>> and test whether the StreamingListener and other code are able to work?
>>>>>> > > > > > > >>>>>>
>>>>>> > > > > > > >>>>>> ---
>>>>>> > > > > > > >>>>>> Cheers,
>>>>>> > > > > > > >>>>>> -z
>>>>>> > > > > > > >>>>>> [1] http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.AccumulatorV2
>>>>>> > > > > > > >>>>>> [2] http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.CollectionAccumulator
>>>>>> > > > > > > >>>>>> [3] http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.LongAccumulator
>>>>>> > > > > > > >>>>>>
>>>>>> > > > > > > >>>>>> ________________________________________
>>>>>> > > > > > > >>>>>> From: Something Something <
>>>>>> mailinglists19@gmail.com>
>>>>>> > > > > > > >>>>>> Sent: Saturday, May 16, 2020 0:38
>>>>>> > > > > > > >>>>>> To: spark-user
>>>>>> > > > > > > >>>>>> Subject: Re: Using Spark Accumulators with
>>>>>> Structured
>>>>>> > > > Streaming
>>>>>> > > > > > > >>>>>>
>>>>>> > > > > > > >>>>>> Can someone from the Spark development team tell me if this functionality
>>>>>> > > > > > > >>>>>> is supported and tested? I've spent a lot of time on this but can't get it
>>>>>> > > > > > > >>>>>> to work. Just to add more context, we've our own Accumulator class that
>>>>>> > > > > > > >>>>>> extends from AccumulatorV2. In this class we keep track of one or more
>>>>>> > > > > > > >>>>>> accumulators. Here's the definition:
>>>>>> > > > > > > >>>>>>
>>>>>> > > > > > > >>>>>> class CollectionLongAccumulator[T]
>>>>>> > > > > > > >>>>>>     extends AccumulatorV2[T, java.util.Map[T, Long]]
>>>>>> > > > > > > >>>>>>
>>>>>> > > > > > > >>>>>> When the job begins we register an instance of this class:
>>>>>> > > > > > > >>>>>>
>>>>>> > > > > > > >>>>>> spark.sparkContext.register(myAccumulator, "MyAccumulator")
>>>>>> > > > > > > >>>>>>
>>>>>> > > > > > > >>>>>> Is this working under Structured Streaming?
>>>>>> > > > > > > >>>>>>
>>>>>> > > > > > > >>>>>> I will keep looking for alternate approaches but any help would be
>>>>>> > > > > > > >>>>>> greatly appreciated. Thanks.
>>>>>> > > > > > > >>>>>>
>>>>>> > > > > > > >>>>>>
>>>>>> > > > > > > >>>>>>
>>>>>> > > > > > > >>>>>> On Thu, May 14, 2020 at 2:36 PM Something Something <mailinglists19@gmail.com> wrote:
>>>>>> > > > > > > >>>>>>
>>>>>> > > > > > > >>>>>> In my structured streaming job I am updating Spark Accumulators in
>>>>>> > > > > > > >>>>>> the updateAcrossEvents method but they are always 0 when I try to print
>>>>>> > > > > > > >>>>>> them in my StreamingListener. Here's the code:
>>>>>> > > > > > > >>>>>>
>>>>>> > > > > > > >>>>>> .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
>>>>>> > > > > > > >>>>>>         updateAcrossEvents
>>>>>> > > > > > > >>>>>>       )
>>>>>> > > > > > > >>>>>>
>>>>>> > > > > > > >>>>>> The accumulators get incremented in 'updateAcrossEvents'. I've a
>>>>>> > > > > > > >>>>>> StreamingListener which writes values of the accumulators in
>>>>>> > > > > > > >>>>>> 'onQueryProgress' method but in this method the Accumulators are ALWAYS
>>>>>> > > > > > > >>>>>> ZERO!
>>>>>> > > > > > > >>>>>>
>>>>>> > > > > > > >>>>>> When I added log statements in the updateAcrossEvents, I could see
>>>>>> > > > > > > >>>>>> that these accumulators are getting incremented as expected.
>>>>>> > > > > > > >>>>>>
>>>>>> > > > > > > >>>>>> This only happens when I run in the 'Cluster' mode. In Local mode it
>>>>>> > > > > > > >>>>>> works fine which implies that the Accumulators are not getting distributed
>>>>>> > > > > > > >>>>>> correctly - or something like that!
>>>>>> > > > > > > >>>>>>
>>>>>> > > > > > > >>>>>> Note: I've seen quite a few answers on the Web that tell me to
>>>>>> > > > > > > >>>>>> perform an "Action". That's not a solution here. This is a 'Stateful
>>>>>> > > > > > > >>>>>> Structured Streaming' job. Yes, I am also 'registering' them in
>>>>>> > > > > > > >>>>>> SparkContext.
>>>>>> > > > > > > >>>>>>
>>>>>> > > > > > > >>>>>>
>>>>>> > > > > > > >>>>>>
>>>>>> > > > > > > >>>>>>
>>>>>> > > > > >
>>>>>> > > >
>>>>>> >
>>>>>> >
>>>>>> >
>>>>>>
>>>>>
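
On the AccumulatorV2 restriction quoted above (the OUT type should be atomic
or thread safe): a plain java.util.Map updated with get-then-put is not safe
once more than one thread touches it. Below is a minimal sketch, in plain
Java with no Spark dependency, of a keyed counter whose add, merge and value
operations are thread safe. The name KeyedLongCounter and its methods are
hypothetical; they only mirror the add/merge/value shape of an accumulator.
The class does not extend Spark's AccumulatorV2 and is not the
CollectionLongAccumulator discussed in this thread.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for a keyed counter; NOT Spark's AccumulatorV2.
final class KeyedLongCounter<T> {
    // ConcurrentHashMap.merge applies the remapping function atomically per key.
    private final ConcurrentHashMap<T, Long> counts = new ConcurrentHashMap<>();

    // Count one occurrence of the given key.
    void add(T key) {
        counts.merge(key, 1L, Long::sum);
    }

    // Fold another counter's totals into this one.
    void merge(KeyedLongCounter<T> other) {
        other.counts.forEach((k, v) -> counts.merge(k, v, Long::sum));
    }

    // Return a snapshot copy so callers never hold the live map.
    Map<T, Long> value() {
        return new HashMap<>(counts);
    }
}

final class KeyedLongCounterDemo {
    public static void main(String[] args) throws InterruptedException {
        KeyedLongCounter<String> counter = new KeyedLongCounter<>();
        // Two threads add to the same key concurrently.
        Runnable work = () -> {
            for (int i = 0; i < 1000; i++) {
                counter.add("events");
            }
        };
        Thread a = new Thread(work);
        Thread b = new Thread(work);
        a.start();
        b.start();
        a.join();
        b.join();
        System.out.println(counter.value());
    }
}
```

Whether the Map-based value of CollectionLongAccumulator has anything to do
with the zeros is not established in this thread; the sketch only shows one
way to keep such a value safe to read from another thread.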

Re: Using Spark Accumulators with Structured Streaming

Posted by Srinivas V <sr...@gmail.com>.
Ya, I had asked this question before. No one responded. By the way, what’s
your actual name “Something something” if you don’t mind me asking?

On Tue, Jun 9, 2020 at 12:27 AM Something Something <
mailinglists19@gmail.com> wrote:

> What is scary is this interface is marked as "experimental"
>
> @Experimental
> @InterfaceStability.Evolving
> public interface MapGroupsWithStateFunction<K, V, S, R> extends Serializable {
>   R call(K key, Iterator<V> values, GroupState<S> state) throws Exception;
> }
>
>
>
>
> On Mon, Jun 8, 2020 at 11:54 AM Something Something <
> mailinglists19@gmail.com> wrote:
>
>> Right, this is exactly how I've it right now. Problem is in the cluster
>> mode 'myAcc' does NOT get distributed. Try it out in the cluster mode & you
>> will see what I mean.
>>
>> I think how Zhang is using will work. Will try & revert.
>>
>> On Mon, Jun 8, 2020 at 10:58 AM Srinivas V <sr...@gmail.com> wrote:
>>
>>>
>>> You don’t need to have a separate class. I created that as it has lot of
>>> code and logic in my case.
>>> For you to quickly test you can use Zhang’s Scala code in this chain.
>>> Pasting it below for your quick reference:
>>>
>>> ```scala
>>>     spark.streams.addListener(new StreamingQueryListener {
>>>       override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
>>>         println(event.progress.id + " is on progress")
>>>         println(s"My accu is ${myAcc.value} on query progress")
>>>       }
>>>         ...
>>>     })
>>>
>>>     def mappingFunc(key: Long, values: Iterator[String], state: GroupState[Long]): ... = {
>>>       myAcc.add(1)
>>>       println(s">>> key: $key => state: ${state}")
>>>         ...
>>>     }
>>>
>>>     val wordCounts = words
>>>       .groupByKey(v => ...)
>>>       .mapGroupsWithState(timeoutConf = GroupStateTimeout.ProcessingTimeTimeout)(func = mappingFunc)
>>>
>>>     val query = wordCounts.writeStream
>>>       .outputMode(OutputMode.Update)
>>>         ...
>>> ```
>>>
>>> On Mon, Jun 8, 2020 at 11:14 AM Something Something <
>>> mailinglists19@gmail.com> wrote:
>>>
>>>> Great. I guess the trick is to use a separate class such as
>>>> 'StateUpdateTask'. I will try that. My challenge is to convert this into
>>>> Scala. Will try it out & revert. Thanks for the tips.
>>>>
>>>> On Wed, Jun 3, 2020 at 11:56 PM ZHANG Wei <we...@outlook.com> wrote:
>>>>
>>>>> The following Java codes can work in my cluster environment:
>>>>> ```
>>>>>     .mapGroupsWithState((MapGroupsWithStateFunction<String, String, Long, LeadingCharCount>) (key, values, state) -> {
>>>>>                 myAcc.add(1);
>>>>>                 <...>
>>>>>                 state.update(newState);
>>>>>                 return new LeadingCharCount(key, newState);
>>>>>             },
>>>>>             Encoders.LONG(),
>>>>>             Encoders.bean(LeadingCharCount.class),
>>>>>             GroupStateTimeout.ProcessingTimeTimeout())
>>>>> ```
>>>>>
>>>>> Also works fine with my `StateUpdateTask`:
>>>>> ```
>>>>>     .mapGroupsWithState(
>>>>>             new StateUpdateTask(myAcc),
>>>>>             Encoders.LONG(),
>>>>>             Encoders.bean(LeadingCharCount.class),
>>>>>             GroupStateTimeout.ProcessingTimeTimeout());
>>>>>
>>>>> public class StateUpdateTask
>>>>>             implements MapGroupsWithStateFunction<String, String, Long, LeadingCharCount> {
>>>>>         private LongAccumulator myAccInTask;
>>>>>
>>>>>         public StateUpdateTask(LongAccumulator acc) {
>>>>>             this.myAccInTask = acc;
>>>>>         }
>>>>>
>>>>>         @Override
>>>>>         public LeadingCharCount call(String key, Iterator<String> values, GroupState<Long> state) throws Exception {
>>>>>             myAccInTask.add(1);
>>>>>             <...>
>>>>>             state.update(newState);
>>>>>             return new LeadingCharCount(key, newState);
>>>>>         }
>>>>> }
>>>>> ```
>>>>>
>>>>> --
>>>>> Cheers,
>>>>> -z
>>>>>
>>>>> On Tue, 2 Jun 2020 10:28:36 +0800
>>>>> ZHANG Wei <we...@outlook.com> wrote:
>>>>>
>>>>> > Yes, verified on the cluster with 5 executors.
>>>>> >
>>>>> > --
>>>>> > Cheers,
>>>>> > -z
>>>>> >
>>>>> > On Fri, 29 May 2020 11:16:12 -0700
>>>>> > Something Something <ma...@gmail.com> wrote:
>>>>> >
>>>>> > > Did you try this on the Cluster? Note: This works just fine under
>>>>> > > 'Local' mode.
>>>>> > >
>>>>> > > On Thu, May 28, 2020 at 9:12 PM ZHANG Wei <we...@outlook.com> wrote:
>>>>> > >
>>>>> > > > I can't reproduce the issue with my simple code:
>>>>> > > > ```scala
>>>>> > > >     spark.streams.addListener(new StreamingQueryListener {
>>>>> > > >       override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
>>>>> > > >         println(event.progress.id + " is on progress")
>>>>> > > >         println(s"My accu is ${myAcc.value} on query progress")
>>>>> > > >       }
>>>>> > > >         ...
>>>>> > > >     })
>>>>> > > >
>>>>> > > >     def mappingFunc(key: Long, values: Iterator[String], state: GroupState[Long]): ... = {
>>>>> > > >       myAcc.add(1)
>>>>> > > >       println(s">>> key: $key => state: ${state}")
>>>>> > > >         ...
>>>>> > > >     }
>>>>> > > >
>>>>> > > >     val wordCounts = words
>>>>> > > >       .groupByKey(v => ...)
>>>>> > > >       .mapGroupsWithState(timeoutConf = GroupStateTimeout.ProcessingTimeTimeout)(func = mappingFunc)
>>>>> > > >
>>>>> > > >     val query = wordCounts.writeStream
>>>>> > > >       .outputMode(OutputMode.Update)
>>>>> > > >         ...
>>>>> > > > ```
>>>>> > > >
>>>>> > > > I'm wondering whether any errors can be found in the driver logs.
>>>>> > > > Micro-batch exceptions won't terminate the running streaming job.
>>>>> > > >
>>>>> > > > For the following code, we have to make sure that `StateUpdateTask` is
>>>>> > > > started:
>>>>> > > > >                 .mapGroupsWithState(
>>>>> > > > >                         new StateUpdateTask(Long.parseLong(appConfig.getSparkStructuredStreamingConfig().STATE_TIMEOUT),
>>>>> > > > >                                 appConfig, accumulators),
>>>>> > > > >                         Encoders.bean(ModelStateInfo.class),
>>>>> > > > >                         Encoders.bean(ModelUpdate.class),
>>>>> > > > >                         GroupStateTimeout.ProcessingTimeTimeout());
>>>>> > > >
>>>>> > > > --
>>>>> > > > Cheers,
>>>>> > > > -z
>>>>> > > >
>>>>> > > > On Thu, 28 May 2020 19:59:31 +0530
>>>>> > > > Srinivas V <sr...@gmail.com> wrote:
>>>>> > > >
>>>>> > > > > Giving the code below:
>>>>> > > > > // accumulators is a class-level variable in the driver.
>>>>> > > > >
>>>>> > > > > sparkSession.streams().addListener(new StreamingQueryListener() {
>>>>> > > > >     @Override
>>>>> > > > >     public void onQueryStarted(QueryStartedEvent queryStarted) {
>>>>> > > > >         logger.info("Query started: " + queryStarted.id());
>>>>> > > > >     }
>>>>> > > > >     @Override
>>>>> > > > >     public void onQueryTerminated(QueryTerminatedEvent queryTerminated) {
>>>>> > > > >         logger.info("Query terminated: " + queryTerminated.id());
>>>>> > > > >     }
>>>>> > > > >     @Override
>>>>> > > > >     public void onQueryProgress(QueryProgressEvent queryProgress) {
>>>>> > > > >
>>>>> > > > >         accumulators.eventsReceived(queryProgress.progress().numInputRows());
>>>>> > > > >         long eventsReceived = 0;
>>>>> > > > >         long eventsExpired = 0;
>>>>> > > > >         long eventSentSuccess = 0;
>>>>> > > > >         try {
>>>>> > > > >             eventsReceived = accumulators.getLong(InstrumentationCounters.EVENTS_RECEIVED);
>>>>> > > > >             eventsExpired = accumulators.getLong(InstrumentationCounters.EVENTS_STATE_EXPIRED);
>>>>> > > > >             eventSentSuccess = accumulators.getLong(InstrumentationCounters.EVENTS_SENT);
>>>>> > > > >         } catch (MissingKeyException e) {
>>>>> > > > >             logger.error("Accumulator key not found due to Exception {}", e.getMessage());
>>>>> > > > >         }
>>>>> > > > >         logger.info("Events Received:{}", eventsReceived);
>>>>> > > > >         logger.info("Events State Expired:{}", eventsExpired);
>>>>> > > > >         logger.info("Events Sent Success:{}", eventSentSuccess);
>>>>> > > > >         logger.info("Query made progress - batchId: {} numInputRows:{} inputRowsPerSecond:{} processedRowsPerSecond:{} durationMs:{}",
>>>>> > > > >                 queryProgress.progress().batchId(),
>>>>> > > > >                 queryProgress.progress().numInputRows(),
>>>>> > > > >                 queryProgress.progress().inputRowsPerSecond(),
>>>>> > > > >                 queryProgress.progress().processedRowsPerSecond(),
>>>>> > > > >                 queryProgress.progress().durationMs());
>>>>> > > > >     }
>>>>> > > > > });
>>>>> > > > >
>>>>> > > > >
>>>>> > > > > On Thu, May 28, 2020 at 7:04 PM ZHANG Wei <we...@outlook.com>
>>>>> wrote:
>>>>> > > > >
>>>>> > > > > > May I ask how the accumulator is accessed in the method
>>>>> > > > > > `onQueryProgress()`?
>>>>> > > > > >
>>>>> > > > > > AFAICT, the accumulator is incremented correctly. One way to
>>>>> > > > > > verify that on a cluster is:
>>>>> > > > > > ```
>>>>> > > > > >     // Add the following while loop before invoking awaitTermination
>>>>> > > > > >     while (true) {
>>>>> > > > > >       println("My acc: " + myAcc.value)
>>>>> > > > > >       Thread.sleep(5 * 1000)
>>>>> > > > > >     }
>>>>> > > > > >
>>>>> > > > > >     //query.awaitTermination()
>>>>> > > > > > ```
>>>>> > > > > >
>>>>> > > > > > The updated accumulator value can then be found in the driver
>>>>> > > > > > stdout.
>>>>> > > > > >
>>>>> > > > > > --
>>>>> > > > > > Cheers,
>>>>> > > > > > -z
>>>>> > > > > >
>>>>> > > > > > On Thu, 28 May 2020 17:12:48 +0530
>>>>> > > > > > Srinivas V <sr...@gmail.com> wrote:
>>>>> > > > > >
>>>>> > > > > > > yes, I am using stateful structured streaming. Yes similar
>>>>> to what
>>>>> > > > you
>>>>> > > > > > do.
>>>>> > > > > > > This is in Java
>>>>> > > > > > > I do it this way:
>>>>> > > > > > >     Dataset<ModelUpdate> productUpdates = watermarkedDS
>>>>> > > > > > >                 .groupByKey(
>>>>> > > > > > >                         (MapFunction<InputEventModel, String>) event -> event.getId(), Encoders.STRING())
>>>>> > > > > > >                 .mapGroupsWithState(
>>>>> > > > > > >                         new StateUpdateTask(Long.parseLong(appConfig.getSparkStructuredStreamingConfig().STATE_TIMEOUT), appConfig, accumulators),
>>>>> > > > > > >                         Encoders.bean(ModelStateInfo.class),
>>>>> > > > > > >                         Encoders.bean(ModelUpdate.class),
>>>>> > > > > > >                         GroupStateTimeout.ProcessingTimeTimeout());
>>>>> > > > > > >
>>>>> > > > > > > StateUpdateTask contains the update method.
>>>>> > > > > > >
>>>>> > > > > > > On Thu, May 28, 2020 at 4:41 AM Something Something <
>>>>> > > > > > > mailinglists19@gmail.com> wrote:
>>>>> > > > > > >
>>>>> > > > > > > > Yes, that's exactly how I am creating them.
>>>>> > > > > > > >
>>>>> > > > > > > > Question... Are you using 'Stateful Structured
>>>>> Streaming' in which
>>>>> > > > > > you've
>>>>> > > > > > > > something like this?
>>>>> > > > > > > >
>>>>> > > > > > > >
>>>>> .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
>>>>> > > > > > > >         updateAcrossEvents
>>>>> > > > > > > >       )
>>>>> > > > > > > >
>>>>> > > > > > > > And updating the Accumulator inside
>>>>> 'updateAcrossEvents'? We're
>>>>> > > > > > experiencing this only under 'Stateful Structured
>>>>> Streaming'. In other
>>>>> > > > > > streaming applications it works as expected.
>>>>> > > > > > > >
>>>>> > > > > > > >
>>>>> > > > > > > >
>>>>> > > > > > > > On Wed, May 27, 2020 at 9:01 AM Srinivas V <
>>>>> srini.vyr@gmail.com>
>>>>> > > > > > wrote:
>>>>> > > > > > > >
>>>>> > > > > > > >> Yes, I am talking about Application specific
>>>>> Accumulators.
>>>>> > > > Actually I
>>>>> > > > > > am
>>>>> > > > > > > >> getting the values printed in my driver log as well as
>>>>> sent to
>>>>> > > > > > Grafana. Not
>>>>> > > > > > > >> sure where and when I saw 0 before. My deploy mode is
>>>>> “client” on
>>>>> > > > a
>>>>> > > > > > yarn
>>>>> > > > > > > >> cluster(not local Mac) where I submit from master node.
>>>>> It should
>>>>> > > > > > work the
>>>>> > > > > > > >> same for cluster mode as well.
>>>>> > > > > > > >> Create accumulators like this:
>>>>> > > > > > > >> AccumulatorV2 accumulator =
>>>>> sparkContext.longAccumulator(name);
>>>>> > > > > > > >>
>>>>> > > > > > > >>
>>>>> > > > > > > >> On Tue, May 26, 2020 at 8:42 PM Something Something <
>>>>> > > > > > > >> mailinglists19@gmail.com> wrote:
>>>>> > > > > > > >>
>>>>> > > > > > > >>> Hmm... how would they go to Graphana if they are not
>>>>> getting
>>>>> > > > > > computed in
>>>>> > > > > > > >>> your code? I am talking about the Application Specific
>>>>> > > > Accumulators.
>>>>> > > > > > The
>>>>> > > > > > > >>> other standard counters such as
>>>>> > > > 'event.progress.inputRowsPerSecond'
>>>>> > > > > > are
>>>>> > > > > > > >>> getting populated correctly!
>>>>> > > > > > > >>>
>>>>> > > > > > > >>> On Mon, May 25, 2020 at 8:39 PM Srinivas V <
>>>>> srini.vyr@gmail.com>
>>>>> > > > > > wrote:
>>>>> > > > > > > >>>
>>>>> > > > > > > >>>> Hello,
>>>>> > > > > > > >>>> Even for me it comes as 0 when I print in
>>>>> OnQueryProgress. I use
>>>>> > > > > > > >>>> LongAccumulator as well. Yes, it prints on my local
>>>>> but not on
>>>>> > > > > > cluster.
>>>>> > > > > > > >>>> But one consolation is that when I send metrics to
>>>>> Graphana, the
>>>>> > > > > > values
>>>>> > > > > > > >>>> are coming there.
>>>>> > > > > > > >>>>
>>>>> > > > > > > >>>> On Tue, May 26, 2020 at 3:10 AM Something Something <
>>>>> > > > > > > >>>> mailinglists19@gmail.com> wrote:
>>>>> > > > > > > >>>>
>>>>> > > > > > > >>>>> No this is not working even if I use LongAccumulator.
>>>>> > > > > > > >>>>>
>>>>> > > > > > > >>>>> On Fri, May 15, 2020 at 9:54 PM ZHANG Wei <
>>>>> wezhang@outlook.com
>>>>> > > > >
>>>>> > > > > > wrote:
>>>>> > > > > > > >>>>>
>>>>> > > > > > > >>>>>> There is a restriction in AccumulatorV2 API [1],
>>>>> the OUT type
>>>>> > > > > > should
>>>>> > > > > > > >>>>>> be atomic or thread safe. I'm wondering if the
>>>>> implementation
>>>>> > > > for
>>>>> > > > > > > >>>>>> `java.util.Map[T, Long]` can meet it or not. Is
>>>>> there any
>>>>> > > > chance
>>>>> > > > > > to replace
>>>>> > > > > > > >>>>>> CollectionLongAccumulator by
>>>>> CollectionAccumulator[2] or
>>>>> > > > > > LongAccumulator[3]
>>>>> > > > > > > >>>>>> and test if the StreamingListener and other codes
>>>>> are able to
>>>>> > > > > > work?
>>>>> > > > > > > >>>>>>
>>>>> > > > > > > >>>>>> ---
>>>>> > > > > > > >>>>>> Cheers,
>>>>> > > > > > > >>>>>> -z
>>>>> > > > > > > >>>>>> [1] http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.AccumulatorV2
>>>>> > > > > > > >>>>>> [2] http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.CollectionAccumulator
>>>>> > > > > > > >>>>>> [3] http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.LongAccumulator
>>>>> > > > > > > >>>>>>
>>>>> > > > > > > >>>>>> ________________________________________
>>>>> > > > > > > >>>>>> From: Something Something <mailinglists19@gmail.com
>>>>> >
>>>>> > > > > > > >>>>>> Sent: Saturday, May 16, 2020 0:38
>>>>> > > > > > > >>>>>> To: spark-user
>>>>> > > > > > > >>>>>> Subject: Re: Using Spark Accumulators with
>>>>> Structured
>>>>> > > > Streaming
>>>>> > > > > > > >>>>>>
>>>>> > > > > > > >>>>>> Can someone from Spark Development team tell me if
>>>>> this
>>>>> > > > > > functionality
>>>>> > > > > > > >>>>>> is supported and tested? I've spent a lot of time
>>>>> on this but
>>>>> > > > > > can't get it
>>>>> > > > > > > >>>>>> to work. Just to add more context, we've our own
>>>>> Accumulator
>>>>> > > > > > class that
>>>>> > > > > > > >>>>>> extends from AccumulatorV2. In this class we keep
>>>>> track of
>>>>> > > > one or
>>>>> > > > > > more
>>>>> > > > > > > >>>>>> accumulators. Here's the definition:
>>>>> > > > > > > >>>>>>
>>>>> > > > > > > >>>>>>
>>>>> > > > > > > >>>>>> class CollectionLongAccumulator[T]
>>>>> > > > > > > >>>>>>     extends AccumulatorV2[T, java.util.Map[T, Long]]
>>>>> > > > > > > >>>>>>
>>>>> > > > > > > >>>>>> When the job begins we register an instance of this
>>>>> class:
>>>>> > > > > > > >>>>>>
>>>>> > > > > > > >>>>>> spark.sparkContext.register(myAccumulator,
>>>>> "MyAccumulator")
>>>>> > > > > > > >>>>>>
>>>>> > > > > > > >>>>>> Is this working under Structured Streaming?
>>>>> > > > > > > >>>>>>
>>>>> > > > > > > >>>>>> I will keep looking for alternate approaches but
>>>>> any help
>>>>> > > > would be
>>>>> > > > > > > >>>>>> greatly appreciated. Thanks.
>>>>> > > > > > > >>>>>>
>>>>> > > > > > > >>>>>>
>>>>> > > > > > > >>>>>>
>>>>> > > > > > > >>>>>> On Thu, May 14, 2020 at 2:36 PM Something Something
>>>>> <
>>>>> > > > > > > >>>>>> mailinglists19@gmail.com<mailto:
>>>>> mailinglists19@gmail.com>>
>>>>> > > > wrote:
>>>>> > > > > > > >>>>>>
>>>>> > > > > > > >>>>>> In my structured streaming job I am updating Spark
>>>>> > > > Accumulators in
>>>>> > > > > > > >>>>>> the updateAcrossEvents method but they are always 0
>>>>> when I
>>>>> > > > try to
>>>>> > > > > > print
>>>>> > > > > > > >>>>>> them in my StreamingListener. Here's the code:
>>>>> > > > > > > >>>>>>
>>>>> > > > > > > >>>>>>
>>>>> > > > .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
>>>>> > > > > > > >>>>>>         updateAcrossEvents
>>>>> > > > > > > >>>>>>       )
>>>>> > > > > > > >>>>>>
>>>>> > > > > > > >>>>>>
>>>>> > > > > > > >>>>>> The accumulators get incremented in
>>>>> 'updateAcrossEvents'.
>>>>> > > > I've a
>>>>> > > > > > > >>>>>> StreamingListener which writes values of the
>>>>> accumulators in
>>>>> > > > > > > >>>>>> 'onQueryProgress' method but in this method the
>>>>> Accumulators
>>>>> > > > are
>>>>> > > > > > ALWAYS
>>>>> > > > > > > >>>>>> ZERO!
>>>>> > > > > > > >>>>>>
>>>>> > > > > > > >>>>>> When I added log statements in the
>>>>> updateAcrossEvents, I
>>>>> > > > could see
>>>>> > > > > > > >>>>>> that these accumulators are getting incremented as
>>>>> expected.
>>>>> > > > > > > >>>>>>
>>>>> > > > > > > >>>>>> This only happens when I run in the 'Cluster' mode.
>>>>> In Local
>>>>> > > > mode
>>>>> > > > > > it
>>>>> > > > > > > >>>>>> works fine which implies that the Accumulators are
>>>>> not getting
>>>>> > > > > > distributed
>>>>> > > > > > > >>>>>> correctly - or something like that!
>>>>> > > > > > > >>>>>>
>>>>> > > > > > > >>>>>> Note: I've seen quite a few answers on the Web that
>>>>> tell me to
>>>>> > > > > > > >>>>>> perform an "Action". That's not a solution here.
>>>>> This is a
>>>>> > > > > > 'Stateful
>>>>> > > > > > > >>>>>> Structured Streaming' job. Yes, I am also
>>>>> 'registering' them
>>>>> > > > in
>>>>> > > > > > > >>>>>> SparkContext.
>>>>> > > > > > > >>>>>>
>>>>> > > > > > > >>>>>>
>>>>> > > > > > > >>>>>>
>>>>> > > > > > > >>>>>>
>>>>> > > > > >
>>>>> > > >
>>>>> >
>>>>> > ---------------------------------------------------------------------
>>>>> > To unsubscribe e-mail: user-unsubscribe@spark.apache.org
>>>>> >
>>>>>
>>>>

Re: Using Spark Accumulators with Structured Streaming

Posted by Something Something <ma...@gmail.com>.
What is scary is that this interface is marked as "experimental":

@Experimental
@InterfaceStability.Evolving
public interface MapGroupsWithStateFunction<K, V, S, R> extends Serializable {
  R call(K key, Iterator<V> values, GroupState<S> state) throws Exception;
}




On Mon, Jun 8, 2020 at 11:54 AM Something Something <
mailinglists19@gmail.com> wrote:

> Right, this is exactly how I have it right now. The problem is that in
> cluster mode 'myAcc' does NOT get distributed. Try it out in cluster mode
> and you will see what I mean.
>
> I think the way Zhang is using it will work. I will try it and report back.
>
> On Mon, Jun 8, 2020 at 10:58 AM Srinivas V <sr...@gmail.com> wrote:
>
>>
>> You don’t need to have a separate class. I created one because it holds a
>> lot of code and logic in my case.
>> To test quickly, you can use Zhang’s Scala code from this thread.
>> Pasting it below for quick reference:
>>
>> ```scala
>>     spark.streams.addListener(new StreamingQueryListener {
>>       override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
>>         println(event.progress.id + " is on progress")
>>         println(s"My accu is ${myAcc.value} on query progress")
>>       }
>>         ...
>>     })
>>
>>     def mappingFunc(key: Long, values: Iterator[String], state: GroupState[Long]): ... = {
>>       myAcc.add(1)
>>       println(s">>> key: $key => state: ${state}")
>>         ...
>>     }
>>
>>     val wordCounts = words
>>       .groupByKey(v => ...)
>>       .mapGroupsWithState(timeoutConf = GroupStateTimeout.ProcessingTimeTimeout)(func = mappingFunc)
>>
>>     val query = wordCounts.writeStream
>>       .outputMode(OutputMode.Update)
>>         ...
>> ```
>>
>>
>> On Mon, Jun 8, 2020 at 11:14 AM Something Something <
>> mailinglists19@gmail.com> wrote:
>>
>>> Great. I guess the trick is to use a separate class such as
>>> 'StateUpdateTask'. I will try that. My challenge is to convert this into
>>> Scala. I will try it out and report back. Thanks for the tips.
>>>
>>> On Wed, Jun 3, 2020 at 11:56 PM ZHANG Wei <we...@outlook.com> wrote:
>>>
>>>> The following Java code works in my cluster environment:
>>>> ```
>>>>     .mapGroupsWithState((MapGroupsWithStateFunction<String, String, Long, LeadingCharCount>) (key, values, state) -> {
>>>>                 myAcc.add(1);
>>>>                 <...>
>>>>                 state.update(newState);
>>>>                 return new LeadingCharCount(key, newState);
>>>>             },
>>>>             Encoders.LONG(),
>>>>             Encoders.bean(LeadingCharCount.class),
>>>>             GroupStateTimeout.ProcessingTimeTimeout())
>>>> ```
>>>>
>>>> Also works fine with my `StateUpdateTask`:
>>>> ```
>>>>     .mapGroupsWithState(
>>>>             new StateUpdateTask(myAcc),
>>>>             Encoders.LONG(),
>>>>             Encoders.bean(LeadingCharCount.class),
>>>>             GroupStateTimeout.ProcessingTimeTimeout());
>>>>
>>>> public class StateUpdateTask
>>>>             implements MapGroupsWithStateFunction<String, String, Long, LeadingCharCount> {
>>>>         private LongAccumulator myAccInTask;
>>>>
>>>>         public StateUpdateTask(LongAccumulator acc) {
>>>>             this.myAccInTask = acc;
>>>>         }
>>>>
>>>>         @Override
>>>>         public LeadingCharCount call(String key, Iterator<String> values, GroupState<Long> state) throws Exception {
>>>>             myAccInTask.add(1);
>>>>             <...>
>>>>             state.update(newState);
>>>>             return new LeadingCharCount(key, newState);
>>>>         }
>>>> }
>>>> ```
>>>>
>>>> --
>>>> Cheers,
>>>> -z
>>>>
>>>> On Tue, 2 Jun 2020 10:28:36 +0800
>>>> ZHANG Wei <we...@outlook.com> wrote:
>>>>
>>>> > Yes, verified on the cluster with 5 executors.
>>>> >
>>>> > --
>>>> > Cheers,
>>>> > -z
>>>> >
>>>> > On Fri, 29 May 2020 11:16:12 -0700
>>>> > Something Something <ma...@gmail.com> wrote:
>>>> >
>>>> > > Did you try this on the Cluster? Note: This works just fine under
>>>> 'Local'
>>>> > > mode.
>>>> > >
>>>> > > On Thu, May 28, 2020 at 9:12 PM ZHANG Wei <we...@outlook.com>
>>>> wrote:
>>>> > >
>>>> > > > I can't reproduce the issue with my simple code:
>>>> > > > ```scala
>>>> > > >     spark.streams.addListener(new StreamingQueryListener {
>>>> > > >       override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
>>>> > > >         println(event.progress.id + " is on progress")
>>>> > > >         println(s"My accu is ${myAcc.value} on query progress")
>>>> > > >       }
>>>> > > >         ...
>>>> > > >     })
>>>> > > >
>>>> > > >     def mappingFunc(key: Long, values: Iterator[String], state: GroupState[Long]): ... = {
>>>> > > >       myAcc.add(1)
>>>> > > >       println(s">>> key: $key => state: ${state}")
>>>> > > >         ...
>>>> > > >     }
>>>> > > >
>>>> > > >     val wordCounts = words
>>>> > > >       .groupByKey(v => ...)
>>>> > > >       .mapGroupsWithState(timeoutConf = GroupStateTimeout.ProcessingTimeTimeout)(func = mappingFunc)
>>>> > > >
>>>> > > >     val query = wordCounts.writeStream
>>>> > > >       .outputMode(OutputMode.Update)
>>>> > > >         ...
>>>> > > > ```
>>>> > > >
>>>> > > > Are there any errors in the driver logs? Micro-batch exceptions
>>>> > > > won't terminate the running streaming job.
>>>> > > >
>>>> > > > For the following code, we have to make sure that `StateUpdateTask` is
>>>> > > > started:
>>>> > > > >                 .mapGroupsWithState(
>>>> > > > >                         new StateUpdateTask(Long.parseLong(appConfig.getSparkStructuredStreamingConfig().STATE_TIMEOUT), appConfig, accumulators),
>>>> > > > >                         Encoders.bean(ModelStateInfo.class),
>>>> > > > >                         Encoders.bean(ModelUpdate.class),
>>>> > > > >                         GroupStateTimeout.ProcessingTimeTimeout());
>>>> > > >
>>>> > > > --
>>>> > > > Cheers,
>>>> > > > -z
>>>> > > >
>>>> > > > On Thu, 28 May 2020 19:59:31 +0530
>>>> > > > Srinivas V <sr...@gmail.com> wrote:
>>>> > > >
>>>> > > > > Giving the code below:
>>>> > > > > //accumulators is a class level variable in driver.
>>>> > > > >
>>>> > > > >  sparkSession.streams().addListener(new StreamingQueryListener() {
>>>> > > > >             @Override
>>>> > > > >             public void onQueryStarted(QueryStartedEvent queryStarted) {
>>>> > > > >                 logger.info("Query started: " + queryStarted.id());
>>>> > > > >             }
>>>> > > > >             @Override
>>>> > > > >             public void onQueryTerminated(QueryTerminatedEvent queryTerminated) {
>>>> > > > >                 logger.info("Query terminated: " + queryTerminated.id());
>>>> > > > >             }
>>>> > > > >             @Override
>>>> > > > >             public void onQueryProgress(QueryProgressEvent queryProgress) {
>>>> > > > >
>>>> > > > >                 accumulators.eventsReceived(queryProgress.progress().numInputRows());
>>>> > > > >                 long eventsReceived = 0;
>>>> > > > >                 long eventsExpired = 0;
>>>> > > > >                 long eventSentSuccess = 0;
>>>> > > > >                 try {
>>>> > > > >                     eventsReceived = accumulators.getLong(InstrumentationCounters.EVENTS_RECEIVED);
>>>> > > > >                     eventsExpired = accumulators.getLong(InstrumentationCounters.EVENTS_STATE_EXPIRED);
>>>> > > > >                     eventSentSuccess = accumulators.getLong(InstrumentationCounters.EVENTS_SENT);
>>>> > > > >                 } catch (MissingKeyException e) {
>>>> > > > >                     logger.error("Accumulator key not found due to Exception {}", e.getMessage());
>>>> > > > >                 }
>>>> > > > >                 logger.info("Events Received:{}", eventsReceived);
>>>> > > > >                 logger.info("Events State Expired:{}", eventsExpired);
>>>> > > > >                 logger.info("Events Sent Success:{}", eventSentSuccess);
>>>> > > > >                 logger.info("Query made progress - batchId: {} numInputRows:{} inputRowsPerSecond:{} processedRowsPerSecond:{} durationMs:{}",
>>>> > > > >                         queryProgress.progress().batchId(),
>>>> > > > >                         queryProgress.progress().numInputRows(),
>>>> > > > >                         queryProgress.progress().inputRowsPerSecond(),
>>>> > > > >                         queryProgress.progress().processedRowsPerSecond(),
>>>> > > > >                         queryProgress.progress().durationMs());
>>>> > > > >
>>>> > > > >
>>>> > > > > On Thu, May 28, 2020 at 7:04 PM ZHANG Wei <we...@outlook.com>
>>>> wrote:
>>>> > > > >
>>>> > > > > > May I ask how the accumulator is accessed in the method
>>>> > > > > > `onQueryProgress()`?
>>>> > > > > >
>>>> > > > > > AFAICT, the accumulator is incremented correctly. One way to
>>>> > > > > > verify that on a cluster is:
>>>> > > > > > ```
>>>> > > > > >     // Add the following while loop before invoking awaitTermination
>>>> > > > > >     while (true) {
>>>> > > > > >       println("My acc: " + myAcc.value)
>>>> > > > > >       Thread.sleep(5 * 1000)
>>>> > > > > >     }
>>>> > > > > >
>>>> > > > > >     //query.awaitTermination()
>>>> > > > > > ```
>>>> > > > > >
>>>> > > > > > The updated accumulator value can then be found in the driver
>>>> > > > > > stdout.
>>>> > > > > >
>>>> > > > > > --
>>>> > > > > > Cheers,
>>>> > > > > > -z
>>>> > > > > >
>>>> > > > > > On Thu, 28 May 2020 17:12:48 +0530
>>>> > > > > > Srinivas V <sr...@gmail.com> wrote:
>>>> > > > > >
>>>> > > > > > > yes, I am using stateful structured streaming. Yes similar
>>>> to what
>>>> > > > you
>>>> > > > > > do.
>>>> > > > > > > This is in Java
>>>> > > > > > > I do it this way:
>>>> > > > > > >     Dataset<ModelUpdate> productUpdates = watermarkedDS
>>>> > > > > > >                 .groupByKey(
>>>> > > > > > >                         (MapFunction<InputEventModel, String>) event -> event.getId(), Encoders.STRING())
>>>> > > > > > >                 .mapGroupsWithState(
>>>> > > > > > >                         new StateUpdateTask(Long.parseLong(appConfig.getSparkStructuredStreamingConfig().STATE_TIMEOUT), appConfig, accumulators),
>>>> > > > > > >                         Encoders.bean(ModelStateInfo.class),
>>>> > > > > > >                         Encoders.bean(ModelUpdate.class),
>>>> > > > > > >                         GroupStateTimeout.ProcessingTimeTimeout());
>>>> > > > > > >
>>>> > > > > > > StateUpdateTask contains the update method.
>>>> > > > > > >
>>>> > > > > > > On Thu, May 28, 2020 at 4:41 AM Something Something <
>>>> > > > > > > mailinglists19@gmail.com> wrote:
>>>> > > > > > >
>>>> > > > > > > > Yes, that's exactly how I am creating them.
>>>> > > > > > > >
>>>> > > > > > > > Question... Are you using 'Stateful Structured Streaming'
>>>> in which
>>>> > > > > > you've
>>>> > > > > > > > something like this?
>>>> > > > > > > >
>>>> > > > > > > >
>>>> .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
>>>> > > > > > > >         updateAcrossEvents
>>>> > > > > > > >       )
>>>> > > > > > > >
>>>> > > > > > > > And updating the Accumulator inside 'updateAcrossEvents'?
>>>> We're
>>>> > > > > > experiencing this only under 'Stateful Structured Streaming'.
>>>> In other
>>>> > > > > > streaming applications it works as expected.
>>>> > > > > > > >
>>>> > > > > > > >
>>>> > > > > > > >
>>>> > > > > > > > On Wed, May 27, 2020 at 9:01 AM Srinivas V <
>>>> srini.vyr@gmail.com>
>>>> > > > > > wrote:
>>>> > > > > > > >
>>>> > > > > > > >> Yes, I am talking about Application specific
>>>> Accumulators.
>>>> > > > Actually I
>>>> > > > > > am
>>>> > > > > > > >> getting the values printed in my driver log as well as
>>>> sent to
>>>> > > > > > Grafana. Not
>>>> > > > > > > >> sure where and when I saw 0 before. My deploy mode is
>>>> “client” on
>>>> > > > a
>>>> > > > > > yarn
>>>> > > > > > > >> cluster(not local Mac) where I submit from master node.
>>>> It should
>>>> > > > > > work the
>>>> > > > > > > >> same for cluster mode as well.
>>>> > > > > > > >> Create accumulators like this:
>>>> > > > > > > >> AccumulatorV2 accumulator =
>>>> sparkContext.longAccumulator(name);
>>>> > > > > > > >>
>>>> > > > > > > >>
>>>> > > > > > > >> On Tue, May 26, 2020 at 8:42 PM Something Something <
>>>> > > > > > > >> mailinglists19@gmail.com> wrote:
>>>> > > > > > > >>
>>>> > > > > > > >>> Hmm... how would they go to Graphana if they are not
>>>> getting
>>>> > > > > > computed in
>>>> > > > > > > >>> your code? I am talking about the Application Specific
>>>> > > > Accumulators.
>>>> > > > > > The
>>>> > > > > > > >>> other standard counters such as
>>>> > > > 'event.progress.inputRowsPerSecond'
>>>> > > > > > are
>>>> > > > > > > >>> getting populated correctly!
>>>> > > > > > > >>>
>>>> > > > > > > >>> On Mon, May 25, 2020 at 8:39 PM Srinivas V <
>>>> srini.vyr@gmail.com>
>>>> > > > > > wrote:
>>>> > > > > > > >>>
>>>> > > > > > > >>>> Hello,
>>>> > > > > > > >>>> Even for me it comes as 0 when I print in
>>>> OnQueryProgress. I use
>>>> > > > > > > >>>> LongAccumulator as well. Yes, it prints on my local
>>>> but not on
>>>> > > > > > cluster.
>>>> > > > > > > >>>> But one consolation is that when I send metrics to
>>>> Graphana, the
>>>> > > > > > values
>>>> > > > > > > >>>> are coming there.
>>>> > > > > > > >>>>
>>>> > > > > > > >>>> On Tue, May 26, 2020 at 3:10 AM Something Something <
>>>> > > > > > > >>>> mailinglists19@gmail.com> wrote:
>>>> > > > > > > >>>>
>>>> > > > > > > >>>>> No this is not working even if I use LongAccumulator.
>>>> > > > > > > >>>>>
>>>> > > > > > > >>>>> On Fri, May 15, 2020 at 9:54 PM ZHANG Wei <
>>>> wezhang@outlook.com
>>>> > > > >
>>>> > > > > > wrote:
>>>> > > > > > > >>>>>
>>>> > > > > > > >>>>>> There is a restriction in AccumulatorV2 API [1], the
>>>> OUT type
>>>> > > > > > should
>>>> > > > > > > >>>>>> be atomic or thread safe. I'm wondering if the
>>>> implementation
>>>> > > > for
>>>> > > > > > > >>>>>> `java.util.Map[T, Long]` can meet it or not. Is
>>>> there any
>>>> > > > chance
>>>> > > > > > to replace
>>>> > > > > > > >>>>>> CollectionLongAccumulator by
>>>> CollectionAccumulator[2] or
>>>> > > > > > LongAccumulator[3]
>>>> > > > > > > >>>>>> and test if the StreamingListener and other codes
>>>> are able to
>>>> > > > > > work?
>>>> > > > > > > >>>>>>
>>>> > > > > > > >>>>>> ---
>>>> > > > > > > >>>>>> Cheers,
>>>> > > > > > > >>>>>> -z
>>>> > > > > > > >>>>>> [1] http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.AccumulatorV2
>>>> > > > > > > >>>>>> [2] http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.CollectionAccumulator
>>>> > > > > > > >>>>>> [3] http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.LongAccumulator
>>>> > > > > > > >>>>>>
>>>> > > > > > > >>>>>> ________________________________________
>>>> > > > > > > >>>>>> From: Something Something <ma...@gmail.com>
>>>> > > > > > > >>>>>> Sent: Saturday, May 16, 2020 0:38
>>>> > > > > > > >>>>>> To: spark-user
>>>> > > > > > > >>>>>> Subject: Re: Using Spark Accumulators with Structured
>>>> > > > Streaming
>>>> > > > > > > >>>>>>
>>>> > > > > > > >>>>>> Can someone from Spark Development team tell me if
>>>> this
>>>> > > > > > functionality
>>>> > > > > > > >>>>>> is supported and tested? I've spent a lot of time on
>>>> this but
>>>> > > > > > can't get it
>>>> > > > > > > >>>>>> to work. Just to add more context, we've our own
>>>> Accumulator
>>>> > > > > > class that
>>>> > > > > > > >>>>>> extends from AccumulatorV2. In this class we keep
>>>> track of
>>>> > > > one or
>>>> > > > > > more
>>>> > > > > > > >>>>>> accumulators. Here's the definition:
>>>> > > > > > > >>>>>>
>>>> > > > > > > >>>>>>
>>>> > > > > > > >>>>>> class CollectionLongAccumulator[T]
>>>> > > > > > > >>>>>>     extends AccumulatorV2[T, java.util.Map[T, Long]]
>>>> > > > > > > >>>>>>
>>>> > > > > > > >>>>>> When the job begins we register an instance of this
>>>> class:
>>>> > > > > > > >>>>>>
>>>> > > > > > > >>>>>> spark.sparkContext.register(myAccumulator,
>>>> "MyAccumulator")
>>>> > > > > > > >>>>>>
>>>> > > > > > > >>>>>> Is this working under Structured Streaming?
>>>> > > > > > > >>>>>>
>>>> > > > > > > >>>>>> I will keep looking for alternate approaches but any
>>>> help
>>>> > > > would be
>>>> > > > > > > >>>>>> greatly appreciated. Thanks.
>>>> > > > > > > >>>>>>

Re: Using Spark Accumulators with Structured Streaming

Posted by Something Something <ma...@gmail.com>.
Right, this is exactly how I have it right now. The problem is that in cluster
mode 'myAcc' does NOT get distributed. Try it out in cluster mode and you
will see what I mean.

I think the way Zhang is using it will work. I will try it and report back.
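
A side note for readers on what "distributed" means for an accumulator. The following is a minimal, Spark-free sketch in plain Java, not Spark's implementation: `Counter`, `UpdateTask`, `ship`, and `runOnce` are hypothetical names made up for illustration. It assumes only what the AccumulatorV2 API documents, namely that copies are updated with `add` and combined on the driver with `merge`. A task object that holds the counter in a field, like Zhang's `StateUpdateTask`, is serialized before it runs elsewhere, so the remote side increments its own copy and the driver-side object changes only once that copy is merged back.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.concurrent.atomic.AtomicLong;

public class AccumulatorModel {

    // Hypothetical stand-in for a long accumulator (NOT Spark's LongAccumulator).
    static class Counter implements Serializable {
        private static final long serialVersionUID = 1L;
        private final AtomicLong sum = new AtomicLong();

        void add(long v) { sum.addAndGet(v); }
        long value() { return sum.get(); }
        // Folds another copy's total into this one, in place.
        void merge(Counter other) { sum.addAndGet(other.value()); }
    }

    // Hypothetical task that keeps the counter in a field, similar in shape
    // to the StateUpdateTask quoted in this thread.
    static class UpdateTask implements Serializable {
        private static final long serialVersionUID = 1L;
        final Counter acc;

        UpdateTask(Counter acc) { this.acc = acc; }
        void call(String key) { acc.add(1); }
    }

    // Round-trips an object through Java serialization, as if it were sent
    // to another JVM. The result is a separate copy of the whole object graph.
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T ship(T obj) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(obj);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("serialization round trip failed", e);
        }
    }

    // Runs `events` updates on a shipped copy of the task and returns the
    // driver-side value as {before merge, after merge}.
    static long[] runOnce(int events) {
        Counter driverAcc = new Counter();
        UpdateTask remote = ship(new UpdateTask(driverAcc));
        for (int i = 0; i < events; i++) {
            remote.call("key-" + i);
        }
        long before = driverAcc.value();   // remote updates are not visible yet
        driverAcc.merge(remote.acc);       // the driver folds the remote copy back in
        return new long[] {before, driverAcc.value()};
    }

    public static void main(String[] args) {
        long[] r = runOnce(5);
        System.out.println("driver value before merge: " + r[0]);
        System.out.println("driver value after merge:  " + r[1]);
    }
}
```

Running `main` prints 0 for the driver-side value before the merge and 5 after it. In a real job Spark does the shipping and merging itself; this model says nothing about why the values reported in this thread stay at 0 in cluster mode.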

On Mon, Jun 8, 2020 at 10:58 AM Srinivas V <sr...@gmail.com> wrote:

>
> You don’t need a separate class. I created one because my case has a lot
> of code and logic.
> To test quickly, you can use Zhang’s Scala code from this thread.
> Pasting it below for quick reference:
>
> ```scala
>     spark.streams.addListener(new StreamingQueryListener {
>       override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
>         println(event.progress.id + " is on progress")
>         println(s"My accu is ${myAcc.value} on query progress")
>       }
>         ...
>     })
>
>     def mappingFunc(key: Long, values: Iterator[String], state: GroupState[Long]): ... = {
>       myAcc.add(1)
>       println(s">>> key: $key => state: ${state}")
>         ...
>     }
>
>     val wordCounts = words
>       .groupByKey(v => ...)
>       .mapGroupsWithState(timeoutConf = GroupStateTimeout.ProcessingTimeTimeout)(func = mappingFunc)
>
>     val query = wordCounts.writeStream
>       .outputMode(OutputMode.Update)
>         ...
> ```

Re: Using Spark Accumulators with Structured Streaming

Posted by Srinivas V <sr...@gmail.com>.
You don’t need a separate class. I created one because my case has a lot
of code and logic.
To test quickly, you can use Zhang’s Scala code from this thread.
Pasting it below for quick reference:

```scala
    spark.streams.addListener(new StreamingQueryListener {
      override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
        println(event.progress.id + " is on progress")
        println(s"My accu is ${myAcc.value} on query progress")
      }
        ...
    })

    def mappingFunc(key: Long, values: Iterator[String], state: GroupState[Long]): ... = {
      myAcc.add(1)
      println(s">>> key: $key => state: ${state}")
        ...
    }

    val wordCounts = words
      .groupByKey(v => ...)
      .mapGroupsWithState(timeoutConf = GroupStateTimeout.ProcessingTimeTimeout)(func = mappingFunc)

    val query = wordCounts.writeStream
      .outputMode(OutputMode.Update)
        ...
```

On Mon, Jun 8, 2020 at 11:14 AM Something Something <
mailinglists19@gmail.com> wrote:

> Great. I guess the trick is to use a separate class such as
> 'StateUpdateTask'. I will try that. My challenge is to convert this into
> Scala. Will try it out & revert. Thanks for the tips.
>
> On Wed, Jun 3, 2020 at 11:56 PM ZHANG Wei <we...@outlook.com> wrote:
>
>> The following Java code works in my cluster environment:
>> ```
>>     .mapGroupsWithState((MapGroupsWithStateFunction<String, String, Long,
>> LeadingCharCount>) (key, values, state) -> {
>>                 myAcc.add(1);
>>                 <...>
>>                 state.update(newState);
>>                 return new LeadingCharCount(key, newState);
>>             },
>>             Encoders.LONG(),
>>             Encoders.bean(LeadingCharCount.class),
>>             GroupStateTimeout.ProcessingTimeTimeout())
>> ```
>>
>> Also works fine with my `StateUpdateTask`:
>> ```
>>     .mapGroupsWithState(
>>             new StateUpdateTask(myAcc),
>>             Encoders.LONG(),
>>             Encoders.bean(LeadingCharCount.class),
>>             GroupStateTimeout.ProcessingTimeTimeout());
>>
>> public class StateUpdateTask
>>             implements MapGroupsWithStateFunction<String, String, Long,
>> LeadingCharCount> {
>>         private LongAccumulator myAccInTask;
>>
>>         public StateUpdateTask(LongAccumulator acc) {
>>             this.myAccInTask = acc;
>>         }
>>
>>         @Override
>>         public LeadingCharCount call(String key, Iterator<String> values,
>> GroupState<Long> state) throws Exception {
>>             myAccInTask.add(1);
>>             <...>
>>             state.update(newState);
>>             return new LeadingCharCount(key, newState);
>>         }
>> }
>> ```
>>
>> --
>> Cheers,
>> -z
>>
>> On Tue, 2 Jun 2020 10:28:36 +0800
>> ZHANG Wei <we...@outlook.com> wrote:
>>
>> > Yes, verified on the cluster with 5 executors.
>> >
>> > --
>> > Cheers,
>> > -z
>> >
>> > On Fri, 29 May 2020 11:16:12 -0700
>> > Something Something <ma...@gmail.com> wrote:
>> >
>> > > Did you try this on the Cluster? Note: This works just fine under 'Local'
>> > > mode.
>> > >
>> > > On Thu, May 28, 2020 at 9:12 PM ZHANG Wei <we...@outlook.com> wrote:
>> > >
>> > > > I can't reproduce the issue with my simple code:
>> > > > ```scala
>> > > >     spark.streams.addListener(new StreamingQueryListener {
>> > > >       override def onQueryProgress(event:
>> > > > StreamingQueryListener.QueryProgressEvent): Unit = {
>> > > >         println(event.progress.id + " is on progress")
>> > > >         println(s"My accu is ${myAcc.value} on query progress")
>> > > >       }
>> > > >         ...
>> > > >     })
>> > > >
>> > > >     def mappingFunc(key: Long, values: Iterator[String], state:
>> > > > GroupState[Long]): ... = {
>> > > >       myAcc.add(1)
>> > > >       println(s">>> key: $key => state: ${state}")
>> > > >         ...
>> > > >     }
>> > > >
>> > > >     val wordCounts = words
>> > > >       .groupByKey(v => ...)
>> > > >       .mapGroupsWithState(timeoutConf =
>> > > > GroupStateTimeout.ProcessingTimeTimeout)(func = mappingFunc)
>> > > >
>> > > >     val query = wordCounts.writeStream
>> > > >       .outputMode(OutputMode.Update)
>> > > >         ...
>> > > > ```
>> > > >
>> > > > I'm wondering whether any errors can be found in the driver logs. The
>> > > > micro-batch exceptions won't terminate the running streaming job.
>> > > >
>> > > > For the following code, we have to make sure that `StateUpdateTask` is
>> > > > started:
>> > > > >                 .mapGroupsWithState(
>> > > > >                         new StateUpdateTask(Long.parseLong(appConfig.getSparkStructuredStreamingConfig().STATE_TIMEOUT),
>> > > > >                                 appConfig, accumulators),
>> > > > >                         Encoders.bean(ModelStateInfo.class),
>> > > > >                         Encoders.bean(ModelUpdate.class),
>> > > > >                         GroupStateTimeout.ProcessingTimeTimeout());
>> > > >
>> > > > --
>> > > > Cheers,
>> > > > -z
>> > > >
>> > > > On Thu, 28 May 2020 19:59:31 +0530
>> > > > Srinivas V <sr...@gmail.com> wrote:
>> > > >
>> > > > > Giving the code below:
>> > > > > //accumulators is a class level variable in driver.
>> > > > >
>> > > > >  sparkSession.streams().addListener(new StreamingQueryListener() {
>> > > > >             @Override
>> > > > >             public void onQueryStarted(QueryStartedEvent queryStarted) {
>> > > > >                 logger.info("Query started: " + queryStarted.id());
>> > > > >             }
>> > > > >             @Override
>> > > > >             public void onQueryTerminated(QueryTerminatedEvent queryTerminated) {
>> > > > >                 logger.info("Query terminated: " + queryTerminated.id());
>> > > > >             }
>> > > > >             @Override
>> > > > >             public void onQueryProgress(QueryProgressEvent queryProgress) {
>> > > > >                 accumulators.eventsReceived(queryProgress.progress().numInputRows());
>> > > > >                 long eventsReceived = 0;
>> > > > >                 long eventsExpired = 0;
>> > > > >                 long eventSentSuccess = 0;
>> > > > >                 try {
>> > > > >                     eventsReceived = accumulators.getLong(InstrumentationCounters.EVENTS_RECEIVED);
>> > > > >                     eventsExpired = accumulators.getLong(InstrumentationCounters.EVENTS_STATE_EXPIRED);
>> > > > >                     eventSentSuccess = accumulators.getLong(InstrumentationCounters.EVENTS_SENT);
>> > > > >                 } catch (MissingKeyException e) {
>> > > > >                     logger.error("Accumulator key not found due to Exception {}", e.getMessage());
>> > > > >                 }
>> > > > >                 logger.info("Events Received:{}", eventsReceived);
>> > > > >                 logger.info("Events State Expired:{}", eventsExpired);
>> > > > >                 logger.info("Events Sent Success:{}", eventSentSuccess);
>> > > > >                 logger.info("Query made progress - batchId: {} numInputRows:{} inputRowsPerSecond:{} processedRowsPerSecond:{} durationMs:{}",
>> > > > >                         queryProgress.progress().batchId(),
>> > > > >                         queryProgress.progress().numInputRows(),
>> > > > >                         queryProgress.progress().inputRowsPerSecond(),
>> > > > >                         queryProgress.progress().processedRowsPerSecond(),
>> > > > >                         queryProgress.progress().durationMs());
>> > > > >
>> > > > >
>> > > > > On Thu, May 28, 2020 at 7:04 PM ZHANG Wei <we...@outlook.com> wrote:
>> > > > >
>> > > > > > May I ask how the accumulator is accessed in the method
>> > > > > > `onQueryProgress()`?
>> > > > > >
>> > > > > > AFAICT, the accumulator is incremented correctly. One way to verify that
>> > > > > > on a cluster is like this:
>> > > > > > ```
>> > > > > >     // Add the following while loop before invoking awaitTermination
>> > > > > >     while (true) {
>> > > > > >       println("My acc: " + myAcc.value)
>> > > > > >       Thread.sleep(5 * 1000)
>> > > > > >     }
>> > > > > >
>> > > > > >     //query.awaitTermination()
>> > > > > > ```
>> > > > > >
>> > > > > > The updated accumulator value can then be found in the driver stdout.
>> > > > > >
>> > > > > > --
>> > > > > > Cheers,
>> > > > > > -z
>> > > > > >
>> > > > > > On Thu, 28 May 2020 17:12:48 +0530
>> > > > > > Srinivas V <sr...@gmail.com> wrote:
>> > > > > >
>> > > > > > > Yes, I am using stateful structured streaming. Yes, similar to what you
>> > > > > > > do.
>> > > > > > > This is in Java.
>> > > > > > > I do it this way:
>> > > > > > >     Dataset<ModelUpdate> productUpdates = watermarkedDS
>> > > > > > >                 .groupByKey(
>> > > > > > >                         (MapFunction<InputEventModel, String>) event -> event.getId(),
>> > > > > > >                         Encoders.STRING())
>> > > > > > >                 .mapGroupsWithState(
>> > > > > > >                         new StateUpdateTask(Long.parseLong(appConfig.getSparkStructuredStreamingConfig().STATE_TIMEOUT),
>> > > > > > >                                 appConfig, accumulators),
>> > > > > > >                         Encoders.bean(ModelStateInfo.class),
>> > > > > > >                         Encoders.bean(ModelUpdate.class),
>> > > > > > >                         GroupStateTimeout.ProcessingTimeTimeout());
>> > > > > > >
>> > > > > > > StateUpdateTask contains the update method.
>> > > > > > >
>> > > > > > > On Thu, May 28, 2020 at 4:41 AM Something Something <
>> > > > > > > mailinglists19@gmail.com> wrote:
>> > > > > > >
>> > > > > > > > Yes, that's exactly how I am creating them.
>> > > > > > > >
>> > > > > > > > Question... Are you using 'Stateful Structured Streaming' in which
>> > > > > > > > you've something like this?
>> > > > > > > >
>> > > > > > > > .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
>> > > > > > > >         updateAcrossEvents
>> > > > > > > >       )
>> > > > > > > >
>> > > > > > > > And updating the Accumulator inside 'updateAcrossEvents'? We're
>> > > > > > > > experiencing this only under 'Stateful Structured Streaming'. In other
>> > > > > > > > streaming applications it works as expected.
>> > > > > > > >
>> > > > > > > >
>> > > > > > > >
>> > > > > > > > On Wed, May 27, 2020 at 9:01 AM Srinivas V <srini.vyr@gmail.com> wrote:
>> > > > > > > >
>> > > > > > > >> Yes, I am talking about Application specific Accumulators. Actually I am
>> > > > > > > >> getting the values printed in my driver log as well as sent to Grafana. Not
>> > > > > > > >> sure where and when I saw 0 before. My deploy mode is “client” on a yarn
>> > > > > > > >> cluster (not local Mac) where I submit from master node. It should work the
>> > > > > > > >> same for cluster mode as well.
>> > > > > > > >> Create accumulators like this:
>> > > > > > > >> AccumulatorV2 accumulator = sparkContext.longAccumulator(name);
>> > > > > > > >>
>> > > > > > > >>
>> > > > > > > >> On Tue, May 26, 2020 at 8:42 PM Something Something <
>> > > > > > > >> mailinglists19@gmail.com> wrote:
>> > > > > > > >>
>> > > > > > > >>> Hmm... how would they go to Grafana if they are not getting computed in
>> > > > > > > >>> your code? I am talking about the Application Specific Accumulators. The
>> > > > > > > >>> other standard counters such as 'event.progress.inputRowsPerSecond' are
>> > > > > > > >>> getting populated correctly!
>> > > > > > > >>>
>> > > > > > > >>> On Mon, May 25, 2020 at 8:39 PM Srinivas V <srini.vyr@gmail.com> wrote:
>> > > > > > > >>>
>> > > > > > > >>>> Hello,
>> > > > > > > >>>> Even for me it comes as 0 when I print in OnQueryProgress. I use
>> > > > > > > >>>> LongAccumulator as well. Yes, it prints on my local but not on cluster.
>> > > > > > > >>>> But one consolation is that when I send metrics to Grafana, the values
>> > > > > > > >>>> are coming there.
>> > > > > > > >>>>
>> > > > > > > >>>> On Tue, May 26, 2020 at 3:10 AM Something Something <
>> > > > > > > >>>> mailinglists19@gmail.com> wrote:
>> > > > > > > >>>>
>> > > > > > > >>>>> No this is not working even if I use LongAccumulator.
>> > > > > > > >>>>>
>> > > > > > > >>>>> On Fri, May 15, 2020 at 9:54 PM ZHANG Wei <
>> wezhang@outlook.com
>> > > > >
>> > > > > > wrote:
>> > > > > > > >>>>>
>> > > > > > > >>>>>> There is a restriction in AccumulatorV2 API [1], the
>> OUT type
>> > > > > > should
>> > > > > > > >>>>>> be atomic or thread safe. I'm wondering if the
>> implementation
>> > > > for
>> > > > > > > >>>>>> `java.util.Map[T, Long]` can meet it or not. Is there
>> any
>> > > > chance
>> > > > > > to replace
>> > > > > > > >>>>>> CollectionLongAccumulator by CollectionAccumulator[2]
>> or
>> > > > > > LongAccumulator[3]
>> > > > > > > >>>>>> and test if the StreamingListener and other codes are
>> able to
>> > > > > > work?
>> > > > > > > >>>>>>
>> > > > > > > >>>>>> ---
>> > > > > > > >>>>>> Cheers,
>> > > > > > > >>>>>> -z
>> > > > > > > >>>>>> [1] http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.AccumulatorV2
>> > > > > > > >>>>>> [2] http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.CollectionAccumulator
>> > > > > > > >>>>>> [3] http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.LongAccumulator
>> > > > > > > >>>>>>
>> > > > > > > >>>>>> ________________________________________
>> > > > > > > >>>>>> From: Something Something <ma...@gmail.com>
>> > > > > > > >>>>>> Sent: Saturday, May 16, 2020 0:38
>> > > > > > > >>>>>> To: spark-user
>> > > > > > > >>>>>> Subject: Re: Using Spark Accumulators with Structured
>> > > > Streaming
>> > > > > > > >>>>>>
>> > > > > > > >>>>>> Can someone from Spark Development team tell me if this
>> > > > > > functionality
>> > > > > > > >>>>>> is supported and tested? I've spent a lot of time on
>> this but
>> > > > > > can't get it
>> > > > > > > >>>>>> to work. Just to add more context, we've our own
>> Accumulator
>> > > > > > class that
>> > > > > > > >>>>>> extends from AccumulatorV2. In this class we keep
>> track of
>> > > > one or
>> > > > > > more
>> > > > > > > >>>>>> accumulators. Here's the definition:
>> > > > > > > >>>>>>
>> > > > > > > >>>>>>
>> > > > > > > >>>>>> class CollectionLongAccumulator[T]
>> > > > > > > >>>>>>     extends AccumulatorV2[T, java.util.Map[T, Long]]
>> > > > > > > >>>>>>
>> > > > > > > >>>>>> When the job begins we register an instance of this
>> class:
>> > > > > > > >>>>>>
>> > > > > > > >>>>>> spark.sparkContext.register(myAccumulator,
>> "MyAccumulator")
>> > > > > > > >>>>>>
>> > > > > > > >>>>>> Is this working under Structured Streaming?
>> > > > > > > >>>>>>
>> > > > > > > >>>>>> I will keep looking for alternate approaches but any
>> help
>> > > > would be
>> > > > > > > >>>>>> greatly appreciated. Thanks.
>> > > > > > > >>>>>>
>> > > > > > > >>>>>>
>> > > > > > > >>>>>>
>> > > > > > > >>>>>> On Thu, May 14, 2020 at 2:36 PM Something Something <
>> > > > > > > >>>>>> mailinglists19@gmail.com<mailto:
>> mailinglists19@gmail.com>>
>> > > > wrote:
>> > > > > > > >>>>>>
>> > > > > > > >>>>>> In my structured streaming job I am updating Spark
>> > > > Accumulators in
>> > > > > > > >>>>>> the updateAcrossEvents method but they are always 0
>> when I
>> > > > try to
>> > > > > > print
>> > > > > > > >>>>>> them in my StreamingListener. Here's the code:
>> > > > > > > >>>>>>
>> > > > > > > >>>>>>
>> > > > .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
>> > > > > > > >>>>>>         updateAcrossEvents
>> > > > > > > >>>>>>       )
>> > > > > > > >>>>>>
>> > > > > > > >>>>>>
>> > > > > > > >>>>>> The accumulators get incremented in
>> 'updateAcrossEvents'.
>> > > > I've a
>> > > > > > > >>>>>> StreamingListener which writes values of the
>> accumulators in
>> > > > > > > >>>>>> 'onQueryProgress' method but in this method the
>> Accumulators
>> > > > are
>> > > > > > ALWAYS
>> > > > > > > >>>>>> ZERO!
>> > > > > > > >>>>>>
>> > > > > > > >>>>>> When I added log statements in the updateAcrossEvents,
>> I
>> > > > could see
>> > > > > > > >>>>>> that these accumulators are getting incremented as
>> expected.
>> > > > > > > >>>>>>
>> > > > > > > >>>>>> This only happens when I run in the 'Cluster' mode. In
>> Local
>> > > > mode
>> > > > > > it
>> > > > > > > >>>>>> works fine which implies that the Accumulators are not
>> getting
>> > > > > > distributed
>> > > > > > > >>>>>> correctly - or something like that!
>> > > > > > > >>>>>>
>> > > > > > > >>>>>> Note: I've seen quite a few answers on the Web that
>> tell me to
>> > > > > > > >>>>>> perform an "Action". That's not a solution here. This
>> is a
>> > > > > > 'Stateful
>> > > > > > > >>>>>> Structured Streaming' job. Yes, I am also
>> 'registering' them
>> > > > in
>> > > > > > > >>>>>> SparkContext.
>> > > > > > > >>>>>>
>> > > > > > > >>>>>>
>> > > > > > > >>>>>>
>> > > > > > > >>>>>>
>> > > > > >
>> > > >
>> >
>> > ---------------------------------------------------------------------
>> > To unsubscribe e-mail: user-unsubscribe@spark.apache.org
>> >
>>
>

Re: Using Spark Accumulators with Structured Streaming

Posted by Something Something <ma...@gmail.com>.
Great. I guess the trick is to use a separate class such as
'StateUpdateTask'. I will try that. My challenge is to convert this into
Scala. I will try it out and report back. Thanks for the tips.

On Wed, Jun 3, 2020 at 11:56 PM ZHANG Wei <we...@outlook.com> wrote:

> The following Java code works in my cluster environment:
> ```
>     .mapGroupsWithState((MapGroupsWithStateFunction<String, String, Long, LeadingCharCount>) (key, values, state) -> {
>                 myAcc.add(1);
>                 <...>
>                 state.update(newState);
>                 return new LeadingCharCount(key, newState);
>             },
>             Encoders.LONG(),
>             Encoders.bean(LeadingCharCount.class),
>             GroupStateTimeout.ProcessingTimeTimeout())
> ```
>
> Also works fine with my `StateUpdateTask`:
> ```
>     .mapGroupsWithState(
>             new StateUpdateTask(myAcc),
>             Encoders.LONG(),
>             Encoders.bean(LeadingCharCount.class),
>             GroupStateTimeout.ProcessingTimeTimeout());
>
> public class StateUpdateTask
>             implements MapGroupsWithStateFunction<String, String, Long, LeadingCharCount> {
>         private LongAccumulator myAccInTask;
>
>         public StateUpdateTask(LongAccumulator acc) {
>             this.myAccInTask = acc;
>         }
>
>         @Override
>         public LeadingCharCount call(String key, Iterator<String> values, GroupState<Long> state) throws Exception {
>             myAccInTask.add(1);
>             <...>
>             state.update(newState);
>             return new LeadingCharCount(key, newState);
>         }
> }
> ```
>
> --
> Cheers,
> -z

Re: Using Spark Accumulators with Structured Streaming

Posted by ZHANG Wei <we...@outlook.com>.
The following Java code works in my cluster environment:
```
    .mapGroupsWithState((MapGroupsWithStateFunction<String, String, Long, LeadingCharCount>) (key, values, state) -> {
                myAcc.add(1);
                <...>
                state.update(newState);
                return new LeadingCharCount(key, newState);
            },
            Encoders.LONG(),
            Encoders.bean(LeadingCharCount.class),
            GroupStateTimeout.ProcessingTimeTimeout())
```

Also works fine with my `StateUpdateTask`:
```
    .mapGroupsWithState(
            new StateUpdateTask(myAcc),
            Encoders.LONG(),
            Encoders.bean(LeadingCharCount.class),
            GroupStateTimeout.ProcessingTimeTimeout());

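// Requires: java.util.Iterator,
// org.apache.spark.api.java.function.MapGroupsWithStateFunction,
// org.apache.spark.sql.streaming.GroupState, org.apache.spark.util.LongAccumulator
public class StateUpdateTask
            implements MapGroupsWithStateFunction<String, String, Long, LeadingCharCount> {
        // Accumulator created on the driver and passed in through the constructor.
        private final LongAccumulator myAccInTask;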

        public StateUpdateTask(LongAccumulator acc) {
            this.myAccInTask = acc;
        }

        @Override
        public LeadingCharCount call(String key, Iterator<String> values, GroupState<Long> state) throws Exception {
            myAccInTask.add(1);
            <...>
            state.update(newState);
            return new LeadingCharCount(key, newState);
        }
}
```
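
For anyone who needs the same pattern in Scala: the Scala overload of
`mapGroupsWithState` takes a plain function instead of a
`MapGroupsWithStateFunction`, so a rough counterpart of `StateUpdateTask` can be
a serializable function class. This is only a sketch under assumptions and has
not been run on a cluster as part of this thread. The `LeadingCharCount` case
class, the socket source on localhost:9999, the grouping by first character,
and the counting logic are hypothetical stand-ins for the parts elided as
`<...>` above.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, StreamingQueryListener}
import org.apache.spark.util.LongAccumulator

// Hypothetical output type, standing in for the Java bean of the same name.
case class LeadingCharCount(key: String, count: Long)

// A function object that holds the accumulator; it is serialized together
// with the accumulator when the stateful operator is shipped to executors.
class StateUpdateTask(acc: LongAccumulator)
    extends ((String, Iterator[String], GroupState[Long]) => LeadingCharCount)
    with Serializable {

  override def apply(key: String,
                     values: Iterator[String],
                     state: GroupState[Long]): LeadingCharCount = {
    acc.add(1) // one increment per group invocation, as in the Java version
    // Assumed logic: keep a running count of values seen for this key.
    val newState = state.getOption.getOrElse(0L) + values.size
    state.update(newState)
    LeadingCharCount(key, newState)
  }
}

object AccumulatorSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("AccumulatorSketch").getOrCreate()
    import spark.implicits._

    // Created and registered on the driver.
    val myAcc = spark.sparkContext.longAccumulator("MyAccumulator")

    // Read the accumulator on the driver after each micro-batch.
    spark.streams.addListener(new StreamingQueryListener {
      override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = ()
      override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit =
        println(s"batch ${event.progress.batchId}: myAcc = ${myAcc.value}")
      override def onQueryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit = ()
    })

    // Assumed input: lines of text from a local socket.
    val lines = spark.readStream
      .format("socket")
      .option("host", "localhost")
      .option("port", 9999)
      .load()
      .as[String]

    val counts = lines
      .groupByKey(line => line.take(1))
      .mapGroupsWithState[Long, LeadingCharCount](
        GroupStateTimeout.ProcessingTimeTimeout())(new StateUpdateTask(myAcc))

    val query = counts.writeStream.outputMode("update").format("console").start()
    query.awaitTermination()
  }
}
```

A plain closure that captures `myAcc` should behave the same way; the class is
used here only because it is the closest analogue to the Java `StateUpdateTask`.
The state and output encoders come from `spark.implicits._`, which is why no
`Encoders.LONG()` or `Encoders.bean(...)` arguments appear in the Scala call.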

-- 
Cheers,
-z

On Tue, 2 Jun 2020 10:28:36 +0800
ZHANG Wei <we...@outlook.com> wrote:

> Yes, verified on the cluster with 5 executors.
> 
> -- 
> Cheers,
> -z
> 
> On Fri, 29 May 2020 11:16:12 -0700
> Something Something <ma...@gmail.com> wrote:
> 
> > Did you try this on the Cluster? Note: This works just fine under 'Local'
> > mode.
> > 
> > On Thu, May 28, 2020 at 9:12 PM ZHANG Wei <we...@outlook.com> wrote:
> > 
> > > I can't reproduce the issue with my simple code:
> > > ```scala
> > >     spark.streams.addListener(new StreamingQueryListener {
> > >       override def onQueryProgress(event:
> > > StreamingQueryListener.QueryProgressEvent): Unit = {
> > >         println(event.progress.id + " is on progress")
> > >         println(s"My accu is ${myAcc.value} on query progress")
> > >       }
> > >         ...
> > >     })
> > >
> > >     def mappingFunc(key: Long, values: Iterator[String], state:
> > > GroupState[Long]): ... = {
> > >       myAcc.add(1)
> > >       println(s">>> key: $key => state: ${state}")
> > >         ...
> > >     }
> > >
> > >     val wordCounts = words
> > >       .groupByKey(v => ...)
> > >       .mapGroupsWithState(timeoutConf =
> > > GroupStateTimeout.ProcessingTimeTimeout)(func = mappingFunc)
> > >
> > >     val query = wordCounts.writeStream
> > >       .outputMode(OutputMode.Update)
> > >         ...
> > > ```
> > >
> > > I'm wondering if there were any errors can be found from driver logs? The
> > > micro-batch
> > > exceptions won't terminate the streaming job running.
> > >
> > > For the following code, we have to make sure that `StateUpdateTask` is
> > > started:
> > > >                 .mapGroupsWithState(
> > > >                         new
> > > StateUpdateTask(Long.parseLong(appConfig.getSparkStructuredStreamingConfig().STATE_TIMEOUT),
> > > > appConfig, accumulators),
> > > >                         Encoders.bean(ModelStateInfo.class),
> > > >                         Encoders.bean(ModelUpdate.class),
> > > >                         GroupStateTimeout.ProcessingTimeTimeout());
> > >
> > > --
> > > Cheers,
> > > -z
> > >
> > > On Thu, 28 May 2020 19:59:31 +0530
> > > Srinivas V <sr...@gmail.com> wrote:
> > >
> > > > Giving the code below:
> > > > //accumulators is a class level variable in driver.
> > > >
> > > >  sparkSession.streams().addListener(new StreamingQueryListener() {
> > > >             @Override
> > > >             public void onQueryStarted(QueryStartedEvent queryStarted) {
> > > >                 logger.info("Query started: " + queryStarted.id());
> > > >             }
> > > >             @Override
> > > >             public void onQueryTerminated(QueryTerminatedEvent
> > > > queryTerminated) {
> > > >                 logger.info("Query terminated: " +
> > > queryTerminated.id());
> > > >             }
> > > >             @Override
> > > >             public void onQueryProgress(QueryProgressEvent
> > > queryProgress) {
> > > >
> > > > accumulators.eventsReceived(queryProgress.progress().numInputRows());
> > > >                 long eventsReceived = 0;
> > > >                 long eventsExpired = 0;
> > > >                 long eventSentSuccess = 0;
> > > >                 try {
> > > >                     eventsReceived =
> > > > accumulators.getLong(InstrumentationCounters.EVENTS_RECEIVED);
> > > >                     eventsExpired =
> > > > accumulators.getLong(InstrumentationCounters.EVENTS_STATE_EXPIRED);
> > > >                     eventSentSuccess =
> > > > accumulators.getLong(InstrumentationCounters.EVENTS_SENT);
> > > >                 } catch (MissingKeyException e) {
> > > >                     logger.error("Accumulator key not found due to
> > > > Exception {}", e.getMessage());
> > > >                 }
> > > >                 logger.info("Events Received:{}", eventsReceived);
> > > >                 logger.info("Events State Expired:{}", eventsExpired);
> > > >                 logger.info("Events Sent Success:{}", eventSentSuccess);
> > > >                 logger.info("Query made progress - batchId: {}
> > > > numInputRows:{} inputRowsPerSecond:{} processedRowsPerSecond:{}
> > > > durationMs:{}" ,
> > > >                         queryProgress.progress().batchId(),
> > > > queryProgress.progress().numInputRows(),
> > > > queryProgress.progress().inputRowsPerSecond(),
> > > >
> > >  queryProgress.progress().processedRowsPerSecond(),
> > > > queryProgress.progress().durationMs());
> > > >
> > > >
> > > > On Thu, May 28, 2020 at 7:04 PM ZHANG Wei <we...@outlook.com> wrote:
> > > >
> > > > > May I get how the accumulator is accessed in the method
> > > > > `onQueryProgress()`?
> > > > >
> > > > > AFAICT, the accumulator is incremented well. There is a way to verify
> > > that
> > > > > in cluster like this:
> > > > > ```
> > > > >     // Add the following while loop before invoking awaitTermination
> > > > >     while (true) {
> > > > >       println("My acc: " + myAcc.value)
> > > > >       Thread.sleep(5 * 1000)
> > > > >     }
> > > > >
> > > > >     //query.awaitTermination()
> > > > > ```
> > > > >
> > > > > And the accumulator value updated can be found from driver stdout.
> > > > >
> > > > > --
> > > > > Cheers,
> > > > > -z
> > > > >
> > > > > On Thu, 28 May 2020 17:12:48 +0530
> > > > > Srinivas V <sr...@gmail.com> wrote:
> > > > >
> > > > > > yes, I am using stateful structured streaming. Yes similar to what
> > > you
> > > > > do.
> > > > > > This is in Java
> > > > > > I do it this way:
> > > > > >     Dataset<ModelUpdate> productUpdates = watermarkedDS
> > > > > >                 .groupByKey(
> > > > > >                         (MapFunction<InputEventModel, String>) event
> > > ->
> > > > > > event.getId(), Encoders.STRING())
> > > > > >                 .mapGroupsWithState(
> > > > > >                         new
> > > > > >
> > > > >
> > > StateUpdateTask(Long.parseLong(appConfig.getSparkStructuredStreamingConfig().STATE_TIMEOUT),
> > > > > > appConfig, accumulators),
> > > > > >                         Encoders.bean(ModelStateInfo.class),
> > > > > >                         Encoders.bean(ModelUpdate.class),
> > > > > >                         GroupStateTimeout.ProcessingTimeTimeout());
> > > > > >
> > > > > > StateUpdateTask contains the update method.
> > > > > >
> > > > > > On Thu, May 28, 2020 at 4:41 AM Something Something <
> > > > > > mailinglists19@gmail.com> wrote:
> > > > > >
> > > > > > > Yes, that's exactly how I am creating them.
> > > > > > >
> > > > > > > Question... Are you using 'Stateful Structured Streaming' in which
> > > > > you've
> > > > > > > something like this?
> > > > > > >
> > > > > > > .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
> > > > > > >         updateAcrossEvents
> > > > > > >       )
> > > > > > >
> > > > > > > And updating the Accumulator inside 'updateAcrossEvents'? We're
> > > > > experiencing this only under 'Stateful Structured Streaming'. In other
> > > > > streaming applications it works as expected.
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > On Wed, May 27, 2020 at 9:01 AM Srinivas V <sr...@gmail.com>
> > > > > wrote:
> > > > > > >
> > > > > > >> Yes, I am talking about Application specific Accumulators.
> > > Actually I
> > > > > am
> > > > > > >> getting the values printed in my driver log as well as sent to
> > > > > Grafana. Not
> > > > > > >> sure where and when I saw 0 before. My deploy mode is “client” on
> > > a
> > > > > yarn
> > > > > > >> cluster(not local Mac) where I submit from master node. It should
> > > > > work the
> > > > > > >> same for cluster mode as well.
> > > > > > >> Create accumulators like this:
> > > > > > >> AccumulatorV2 accumulator = sparkContext.longAccumulator(name);
> > > > > > >>
> > > > > > >>
> > > > > > >> On Tue, May 26, 2020 at 8:42 PM Something Something <
> > > > > > >> mailinglists19@gmail.com> wrote:
> > > > > > >>
> > > > > > >>> Hmm... how would they go to Graphana if they are not getting
> > > > > computed in
> > > > > > >>> your code? I am talking about the Application Specific
> > > Accumulators.
> > > > > The
> > > > > > >>> other standard counters such as
> > > 'event.progress.inputRowsPerSecond'
> > > > > are
> > > > > > >>> getting populated correctly!
> > > > > > >>>
> > > > > > >>> On Mon, May 25, 2020 at 8:39 PM Srinivas V <sr...@gmail.com>
> > > > > wrote:
> > > > > > >>>
> > > > > > >>>> Hello,
> > > > > > >>>> Even for me it comes as 0 when I print in OnQueryProgress. I use
> > > > > > >>>> LongAccumulator as well. Yes, it prints on my local but not on
> > > > > cluster.
> > > > > > >>>> But one consolation is that when I send metrics to Graphana, the
> > > > > values
> > > > > > >>>> are coming there.
> > > > > > >>>>
> > > > > > >>>> On Tue, May 26, 2020 at 3:10 AM Something Something <
> > > > > > >>>> mailinglists19@gmail.com> wrote:
> > > > > > >>>>
> > > > > > >>>>> No this is not working even if I use LongAccumulator.
> > > > > > >>>>>
> > > > > > >>>>> On Fri, May 15, 2020 at 9:54 PM ZHANG Wei <wezhang@outlook.com
> > > >
> > > > > wrote:
> > > > > > >>>>>
> > > > > > >>>>>> There is a restriction in AccumulatorV2 API [1], the OUT type
> > > > > should
> > > > > > >>>>>> be atomic or thread safe. I'm wondering if the implementation
> > > for
> > > > > > >>>>>> `java.util.Map[T, Long]` can meet it or not. Is there any
> > > chance
> > > > > to replace
> > > > > > >>>>>> CollectionLongAccumulator by CollectionAccumulator[2] or
> > > > > LongAccumulator[3]
> > > > > > >>>>>> and test if the StreamingListener and other codes are able to
> > > > > work?
> > > > > > >>>>>>
> > > > > > >>>>>> ---
> > > > > > >>>>>> Cheers,
> > > > > > >>>>>> -z
> > > > > > >>>>>> [1]
> > > > > > >>>>>>
> > > > >
> > > https://nam05.safelinks.protection.outlook.com/?url=http%3A%2F%2Fspark.apache.org%2Fdocs%2Flatest%2Fapi%2Fscala%2Findex.html%23org.apache.spark.util.AccumulatorV2&amp;data=02%7C01%7C%7Cf802f480bbab46ae07b308d803fc661f%7C84df9e7fe9f640afb435aaaaaaaaaaaa%7C1%7C0%7C637263729860033353&amp;sdata=NPpiZC%2Bnx9rec6G35QvMDV1D3FgvD%2FnIct6OJ06I728%3D&amp;reserved=0
> > > > > > >>>>>> [2]
> > > > > > >>>>>>
> > > > >
> > > https://nam05.safelinks.protection.outlook.com/?url=http%3A%2F%2Fspark.apache.org%2Fdocs%2Flatest%2Fapi%2Fscala%2Findex.html%23org.apache.spark.util.CollectionAccumulator&amp;data=02%7C01%7C%7Cf802f480bbab46ae07b308d803fc661f%7C84df9e7fe9f640afb435aaaaaaaaaaaa%7C1%7C0%7C637263729860038343&amp;sdata=wMskE72per9Js0V7UHJ0qi4UzCEEYh%2Fk53fuP2e92mA%3D&amp;reserved=0
> > > > > > >>>>>> [3]
> > > > > > >>>>>>
> > > > >
> > > https://nam05.safelinks.protection.outlook.com/?url=http%3A%2F%2Fspark.apache.org%2Fdocs%2Flatest%2Fapi%2Fscala%2Findex.html%23org.apache.spark.util.LongAccumulator&amp;data=02%7C01%7C%7Cf802f480bbab46ae07b308d803fc661f%7C84df9e7fe9f640afb435aaaaaaaaaaaa%7C1%7C0%7C637263729860038343&amp;sdata=INgHzc0rc6jj7UapB%2FRLfCiGNWEBSKWfgmuJ2dUZ3eM%3D&amp;reserved=0
> > > > > > >>>>>>
> > > > > > >>>>>> ________________________________________
> > > > > > >>>>>> From: Something Something <ma...@gmail.com>
> > > > > > >>>>>> Sent: Saturday, May 16, 2020 0:38
> > > > > > >>>>>> To: spark-user
> > > > > > >>>>>> Subject: Re: Using Spark Accumulators with Structured
> > > Streaming
> > > > > > >>>>>>
> > > > > > >>>>>> Can someone from Spark Development team tell me if this
> > > > > functionality
> > > > > > >>>>>> is supported and tested? I've spent a lot of time on this but
> > > > > can't get it
> > > > > > >>>>>> to work. Just to add more context, we've our own Accumulator
> > > > > class that
> > > > > > >>>>>> extends from AccumulatorV2. In this class we keep track of
> > > one or
> > > > > more
> > > > > > >>>>>> accumulators. Here's the definition:
> > > > > > >>>>>>
> > > > > > >>>>>>
> > > > > > >>>>>> class CollectionLongAccumulator[T]
> > > > > > >>>>>>     extends AccumulatorV2[T, java.util.Map[T, Long]]
> > > > > > >>>>>>
> > > > > > >>>>>> When the job begins we register an instance of this class:
> > > > > > >>>>>>
> > > > > > >>>>>> spark.sparkContext.register(myAccumulator, "MyAccumulator")
> > > > > > >>>>>>
> > > > > > >>>>>> Is this working under Structured Streaming?
> > > > > > >>>>>>
> > > > > > >>>>>> I will keep looking for alternate approaches but any help
> > > would be
> > > > > > >>>>>> greatly appreciated. Thanks.
> > > > > > >>>>>>
> > > > > > >>>>>>
> > > > > > >>>>>>
> > > > > > >>>>>> On Thu, May 14, 2020 at 2:36 PM Something Something <
> > > > > > >>>>>> mailinglists19@gmail.com<ma...@gmail.com>>
> > > wrote:
> > > > > > >>>>>>
> > > > > > >>>>>> In my structured streaming job I am updating Spark
> > > Accumulators in
> > > > > > >>>>>> the updateAcrossEvents method but they are always 0 when I
> > > try to
> > > > > print
> > > > > > >>>>>> them in my StreamingListener. Here's the code:
> > > > > > >>>>>>
> > > > > > >>>>>>
> > > .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
> > > > > > >>>>>>         updateAcrossEvents
> > > > > > >>>>>>       )
> > > > > > >>>>>>
> > > > > > >>>>>>
> > > > > > >>>>>> The accumulators get incremented in 'updateAcrossEvents'.
> > > I've a
> > > > > > >>>>>> StreamingListener which writes values of the accumulators in
> > > > > > >>>>>> 'onQueryProgress' method but in this method the Accumulators
> > > are
> > > > > ALWAYS
> > > > > > >>>>>> ZERO!
> > > > > > >>>>>>
> > > > > > >>>>>> When I added log statements in the updateAcrossEvents, I
> > > could see
> > > > > > >>>>>> that these accumulators are getting incremented as expected.
> > > > > > >>>>>>
> > > > > > >>>>>> This only happens when I run in the 'Cluster' mode. In Local
> > > mode
> > > > > it
> > > > > > >>>>>> works fine which implies that the Accumulators are not getting
> > > > > distributed
> > > > > > >>>>>> correctly - or something like that!
> > > > > > >>>>>>
> > > > > > >>>>>> Note: I've seen quite a few answers on the Web that tell me to
> > > > > > >>>>>> perform an "Action". That's not a solution here. This is a
> > > > > 'Stateful
> > > > > > >>>>>> Structured Streaming' job. Yes, I am also 'registering' them
> > > in
> > > > > > >>>>>> SparkContext.
> > > > > > >>>>>>
> > > > > > >>>>>>
> > > > > > >>>>>>
> > > > > > >>>>>>
> > > > >
> > >
> 
> ---------------------------------------------------------------------
> To unsubscribe e-mail: user-unsubscribe@spark.apache.org
> 



Re: Using Spark Accumulators with Structured Streaming

Posted by ZHANG Wei <we...@outlook.com>.
Yes, verified on the cluster with 5 executors.

-- 
Cheers,
-z

On Fri, 29 May 2020 11:16:12 -0700
Something Something <ma...@gmail.com> wrote:

> Did you try this on the Cluster? Note: This works just fine under 'Local'
> mode.



Re: Using Spark Accumulators with Structured Streaming

Posted by Something Something <ma...@gmail.com>.
Did you try this on the Cluster? Note: This works just fine under 'Local'
mode.

On Thu, May 28, 2020 at 9:12 PM ZHANG Wei <we...@outlook.com> wrote:

> I can't reproduce the issue with my simple code:
> ```scala
>     spark.streams.addListener(new StreamingQueryListener {
>       override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
>         println(event.progress.id + " is on progress")
>         println(s"My accu is ${myAcc.value} on query progress")
>       }
>         ...
>     })
>
>     def mappingFunc(key: Long, values: Iterator[String], state: GroupState[Long]): ... = {
>       myAcc.add(1)
>       println(s">>> key: $key => state: ${state}")
>         ...
>     }
>
>     val wordCounts = words
>       .groupByKey(v => ...)
>       .mapGroupsWithState(timeoutConf = GroupStateTimeout.ProcessingTimeTimeout)(func = mappingFunc)
>
>     val query = wordCounts.writeStream
>       .outputMode(OutputMode.Update)
>         ...
> ```
>
> I'm wondering if there were any errors can be found from driver logs? The micro-batch
> exceptions won't terminate the streaming job running.
>
> For the following code, we have to make sure that `StateUpdateTask` is
> started:
> >                 .mapGroupsWithState(
> >                         new StateUpdateTask(Long.parseLong(appConfig.getSparkStructuredStreamingConfig().STATE_TIMEOUT), appConfig, accumulators),
> >                         Encoders.bean(ModelStateInfo.class),
> >                         Encoders.bean(ModelUpdate.class),
> >                         GroupStateTimeout.ProcessingTimeTimeout());
>
> --
> Cheers,
> -z
>
> On Thu, 28 May 2020 19:59:31 +0530
> Srinivas V <sr...@gmail.com> wrote:
>
> > Giving the code below:
> > //accumulators is a class level variable in driver.
> >
> >  sparkSession.streams().addListener(new StreamingQueryListener() {
> >             @Override
> >             public void onQueryStarted(QueryStartedEvent queryStarted) {
> >                 logger.info("Query started: " + queryStarted.id());
> >             }
> >             @Override
> >             public void onQueryTerminated(QueryTerminatedEvent queryTerminated) {
> >                 logger.info("Query terminated: " + queryTerminated.id());
> >             }
> >             @Override
> >             public void onQueryProgress(QueryProgressEvent queryProgress) {
> >                 accumulators.eventsReceived(queryProgress.progress().numInputRows());
> >                 long eventsReceived = 0;
> >                 long eventsExpired = 0;
> >                 long eventSentSuccess = 0;
> >                 try {
> >                     eventsReceived = accumulators.getLong(InstrumentationCounters.EVENTS_RECEIVED);
> >                     eventsExpired = accumulators.getLong(InstrumentationCounters.EVENTS_STATE_EXPIRED);
> >                     eventSentSuccess = accumulators.getLong(InstrumentationCounters.EVENTS_SENT);
> >                 } catch (MissingKeyException e) {
> >                     logger.error("Accumulator key not found due to Exception {}", e.getMessage());
> >                 }
> >                 logger.info("Events Received:{}", eventsReceived);
> >                 logger.info("Events State Expired:{}", eventsExpired);
> >                 logger.info("Events Sent Success:{}", eventSentSuccess);
> >                 logger.info("Query made progress - batchId: {} numInputRows:{} inputRowsPerSecond:{} processedRowsPerSecond:{} durationMs:{}",
> >                         queryProgress.progress().batchId(),
> >                         queryProgress.progress().numInputRows(),
> >                         queryProgress.progress().inputRowsPerSecond(),
> >                         queryProgress.progress().processedRowsPerSecond(),
> >                         queryProgress.progress().durationMs());
> >
> >
> > On Thu, May 28, 2020 at 7:04 PM ZHANG Wei <we...@outlook.com> wrote:
> >
> > > May I get how the accumulator is accessed in the method
> > > `onQueryProgress()`?
> > >
> > > AFAICT, the accumulator is incremented well. There is a way to verify that
> > > in cluster like this:
> > > ```
> > >     // Add the following while loop before invoking awaitTermination
> > >     while (true) {
> > >       println("My acc: " + myAcc.value)
> > >       Thread.sleep(5 * 1000)
> > >     }
> > >
> > >     //query.awaitTermination()
> > > ```
> > >
> > > And the accumulator value updated can be found from driver stdout.
> > >
> > > --
> > > Cheers,
> > > -z
> > >
> > > On Thu, 28 May 2020 17:12:48 +0530
> > > Srinivas V <sr...@gmail.com> wrote:
> > >
> > > > yes, I am using stateful structured streaming. Yes similar to what you do.
> > > > This is in Java
> > > > I do it this way:
> > > >     Dataset<ModelUpdate> productUpdates = watermarkedDS
> > > >                 .groupByKey(
> > > >                         (MapFunction<InputEventModel, String>) event -> event.getId(), Encoders.STRING())
> > > >                 .mapGroupsWithState(
> > > >                         new StateUpdateTask(Long.parseLong(appConfig.getSparkStructuredStreamingConfig().STATE_TIMEOUT), appConfig, accumulators),
> > > >                         Encoders.bean(ModelStateInfo.class),
> > > >                         Encoders.bean(ModelUpdate.class),
> > > >                         GroupStateTimeout.ProcessingTimeTimeout());
> > > >
> > > > StateUpdateTask contains the update method.
> > > >
> > > > On Thu, May 28, 2020 at 4:41 AM Something Something <
> > > > mailinglists19@gmail.com> wrote:
> > > >
> > > > > Yes, that's exactly how I am creating them.
> > > > >
> > > > > Question... Are you using 'Stateful Structured Streaming' in which
> > > you've
> > > > > something like this?
> > > > >
> > > > > .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
> > > > >         updateAcrossEvents
> > > > >       )
> > > > >
> > > > > And updating the Accumulator inside 'updateAcrossEvents'? We're
> > > experiencing this only under 'Stateful Structured Streaming'. In other
> > > streaming applications it works as expected.
> > > > >
> > > > >
> > > > >
> > > > > On Wed, May 27, 2020 at 9:01 AM Srinivas V <sr...@gmail.com>
> > > wrote:
> > > > >
> > > > >> Yes, I am talking about Application specific Accumulators.
> Actually I
> > > am
> > > > >> getting the values printed in my driver log as well as sent to
> > > Grafana. Not
> > > > >> sure where and when I saw 0 before. My deploy mode is “client” on
> a
> > > yarn
> > > > >> cluster(not local Mac) where I submit from master node. It should
> > > work the
> > > > >> same for cluster mode as well.
> > > > >> Create accumulators like this:
> > > > >> AccumulatorV2 accumulator = sparkContext.longAccumulator(name);
> > > > >>
> > > > >>
> > > > >> On Tue, May 26, 2020 at 8:42 PM Something Something <
> > > > >> mailinglists19@gmail.com> wrote:
> > > > >>
> > > > >>> Hmm... how would they go to Graphana if they are not getting
> > > computed in
> > > > >>> your code? I am talking about the Application Specific
> Accumulators.
> > > The
> > > > >>> other standard counters such as
> 'event.progress.inputRowsPerSecond'
> > > are
> > > > >>> getting populated correctly!
> > > > >>>
> > > > >>> On Mon, May 25, 2020 at 8:39 PM Srinivas V <sr...@gmail.com>
> > > wrote:
> > > > >>>
> > > > >>>> Hello,
> > > > >>>> Even for me it comes as 0 when I print in OnQueryProgress. I use
> > > > >>>> LongAccumulator as well. Yes, it prints on my local but not on
> > > cluster.
> > > > >>>> But one consolation is that when I send metrics to Graphana, the
> > > values
> > > > >>>> are coming there.
> > > > >>>>
> > > > >>>> On Tue, May 26, 2020 at 3:10 AM Something Something <
> > > > >>>> mailinglists19@gmail.com> wrote:
> > > > >>>>
> > > > >>>>> No this is not working even if I use LongAccumulator.
> > > > >>>>>
> > > > >>>>> On Fri, May 15, 2020 at 9:54 PM ZHANG Wei <wezhang@outlook.com
> >
> > > wrote:
> > > > >>>>>
> > > > >>>>>> There is a restriction in AccumulatorV2 API [1], the OUT type
> > > should
> > > > >>>>>> be atomic or thread safe. I'm wondering if the implementation
> for
> > > > >>>>>> `java.util.Map[T, Long]` can meet it or not. Is there any
> chance
> > > to replace
> > > > >>>>>> CollectionLongAccumulator by CollectionAccumulator[2] or
> > > LongAccumulator[3]
> > > > >>>>>> and test if the StreamingListener and other codes are able to
> > > work?
> > > > >>>>>>
> > > > >>>>>> ---
> > > > >>>>>> Cheers,
> > > > >>>>>> -z
> > > > >>>>>> [1] http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.AccumulatorV2
> > > > >>>>>> [2] http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.CollectionAccumulator
> > > > >>>>>> [3] http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.LongAccumulator
> > > > >>>>>>
> > > > >>>>>> ________________________________________
> > > > >>>>>> From: Something Something <ma...@gmail.com>
> > > > >>>>>> Sent: Saturday, May 16, 2020 0:38
> > > > >>>>>> To: spark-user
> > > > >>>>>> Subject: Re: Using Spark Accumulators with Structured
> Streaming
> > > > >>>>>>
> > > > >>>>>> Can someone from Spark Development team tell me if this
> > > functionality
> > > > >>>>>> is supported and tested? I've spent a lot of time on this but
> > > can't get it
> > > > >>>>>> to work. Just to add more context, we've our own Accumulator
> > > class that
> > > > >>>>>> extends from AccumulatorV2. In this class we keep track of
> one or
> > > more
> > > > >>>>>> accumulators. Here's the definition:
> > > > >>>>>>
> > > > >>>>>>
> > > > >>>>>> class CollectionLongAccumulator[T]
> > > > >>>>>>     extends AccumulatorV2[T, java.util.Map[T, Long]]
> > > > >>>>>>
> > > > >>>>>> When the job begins we register an instance of this class:
> > > > >>>>>>
> > > > >>>>>> spark.sparkContext.register(myAccumulator, "MyAccumulator")
> > > > >>>>>>
> > > > >>>>>> Is this working under Structured Streaming?
> > > > >>>>>>
> > > > >>>>>> I will keep looking for alternate approaches but any help
> would be
> > > > >>>>>> greatly appreciated. Thanks.
> > > > >>>>>>
> > > > >>>>>>
> > > > >>>>>>
> > > > >>>>>> On Thu, May 14, 2020 at 2:36 PM Something Something <
> > > > >>>>>> mailinglists19@gmail.com<ma...@gmail.com>>
> wrote:
> > > > >>>>>>
> > > > >>>>>> In my structured streaming job I am updating Spark
> Accumulators in
> > > > >>>>>> the updateAcrossEvents method but they are always 0 when I
> try to
> > > print
> > > > >>>>>> them in my StreamingListener. Here's the code:
> > > > >>>>>>
> > > > >>>>>>
> .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
> > > > >>>>>>         updateAcrossEvents
> > > > >>>>>>       )
> > > > >>>>>>
> > > > >>>>>>
> > > > >>>>>> The accumulators get incremented in 'updateAcrossEvents'.
> I've a
> > > > >>>>>> StreamingListener which writes values of the accumulators in
> > > > >>>>>> 'onQueryProgress' method but in this method the Accumulators
> are
> > > ALWAYS
> > > > >>>>>> ZERO!
> > > > >>>>>>
> > > > >>>>>> When I added log statements in the updateAcrossEvents, I
> could see
> > > > >>>>>> that these accumulators are getting incremented as expected.
> > > > >>>>>>
> > > > >>>>>> This only happens when I run in the 'Cluster' mode. In Local
> mode
> > > it
> > > > >>>>>> works fine which implies that the Accumulators are not getting
> > > distributed
> > > > >>>>>> correctly - or something like that!
> > > > >>>>>>
> > > > >>>>>> Note: I've seen quite a few answers on the Web that tell me to
> > > > >>>>>> perform an "Action". That's not a solution here. This is a
> > > 'Stateful
> > > > >>>>>> Structured Streaming' job. Yes, I am also 'registering' them
> in
> > > > >>>>>> SparkContext.
> > > > >>>>>>
> > > > >>>>>>
> > > > >>>>>>
> > > > >>>>>>
> > >
>

Re: Using Spark Accumulators with Structured Streaming

Posted by ZHANG Wei <we...@outlook.com>.
I can't reproduce the issue with my simple code:
```scala
    spark.streams.addListener(new StreamingQueryListener {
      override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
        println(event.progress.id + " is on progress")
        println(s"My accu is ${myAcc.value} on query progress")
      }
	...
    })

    def mappingFunc(key: Long, values: Iterator[String], state: GroupState[Long]): ... = {
      myAcc.add(1)
      println(s">>> key: $key => state: ${state}")
	...
    }

    val wordCounts = words
      .groupByKey(v => ...)
      .mapGroupsWithState(timeoutConf = GroupStateTimeout.ProcessingTimeTimeout)(func = mappingFunc)

    val query = wordCounts.writeStream
      .outputMode(OutputMode.Update)
	...
```

Are there any errors in the driver logs? Exceptions in a micro-batch won't
terminate the running streaming job.

For the following code, we have to make sure that `StateUpdateTask` is started:
>                 .mapGroupsWithState(
>                         new StateUpdateTask(Long.parseLong(appConfig.getSparkStructuredStreamingConfig().STATE_TIMEOUT),
> appConfig, accumulators),
>                         Encoders.bean(ModelStateInfo.class),
>                         Encoders.bean(ModelUpdate.class),
>                         GroupStateTimeout.ProcessingTimeTimeout());

-- 
Cheers,
-z

On Thu, 28 May 2020 19:59:31 +0530
Srinivas V <sr...@gmail.com> wrote:

> Giving the code below:
> //accumulators is a class level variable in driver.
> 
>  sparkSession.streams().addListener(new StreamingQueryListener() {
>             @Override
>             public void onQueryStarted(QueryStartedEvent queryStarted) {
>                 logger.info("Query started: " + queryStarted.id());
>             }
>             @Override
>             public void onQueryTerminated(QueryTerminatedEvent
> queryTerminated) {
>                 logger.info("Query terminated: " + queryTerminated.id());
>             }
>             @Override
>             public void onQueryProgress(QueryProgressEvent queryProgress) {
> 
> accumulators.eventsReceived(queryProgress.progress().numInputRows());
>                 long eventsReceived = 0;
>                 long eventsExpired = 0;
>                 long eventSentSuccess = 0;
>                 try {
>                     eventsReceived =
> accumulators.getLong(InstrumentationCounters.EVENTS_RECEIVED);
>                     eventsExpired =
> accumulators.getLong(InstrumentationCounters.EVENTS_STATE_EXPIRED);
>                     eventSentSuccess =
> accumulators.getLong(InstrumentationCounters.EVENTS_SENT);
>                 } catch (MissingKeyException e) {
>                     logger.error("Accumulator key not found due to
> Exception {}", e.getMessage());
>                 }
>                 logger.info("Events Received:{}", eventsReceived);
>                 logger.info("Events State Expired:{}", eventsExpired);
>                 logger.info("Events Sent Success:{}", eventSentSuccess);
>                 logger.info("Query made progress - batchId: {}
> numInputRows:{} inputRowsPerSecond:{} processedRowsPerSecond:{}
> durationMs:{}" ,
>                         queryProgress.progress().batchId(),
> queryProgress.progress().numInputRows(),
> queryProgress.progress().inputRowsPerSecond(),
>                         queryProgress.progress().processedRowsPerSecond(),
> queryProgress.progress().durationMs());
> 
> 
> On Thu, May 28, 2020 at 7:04 PM ZHANG Wei <we...@outlook.com> wrote:
> 
> > May I get how the accumulator is accessed in the method
> > `onQueryProgress()`?
> >
> > AFAICT, the accumulator is incremented well. There is a way to verify that
> > in cluster like this:
> > ```
> >     // Add the following while loop before invoking awaitTermination
> >     while (true) {
> >       println("My acc: " + myAcc.value)
> >       Thread.sleep(5 * 1000)
> >     }
> >
> >     //query.awaitTermination()
> > ```
> >
> > And the accumulator value updated can be found from driver stdout.
> >
> > --
> > Cheers,
> > -z
> >
> > On Thu, 28 May 2020 17:12:48 +0530
> > Srinivas V <sr...@gmail.com> wrote:
> >
> > > yes, I am using stateful structured streaming. Yes similar to what you
> > do.
> > > This is in Java
> > > I do it this way:
> > >     Dataset<ModelUpdate> productUpdates = watermarkedDS
> > >                 .groupByKey(
> > >                         (MapFunction<InputEventModel, String>) event ->
> > > event.getId(), Encoders.STRING())
> > >                 .mapGroupsWithState(
> > >                         new
> > >
> > StateUpdateTask(Long.parseLong(appConfig.getSparkStructuredStreamingConfig().STATE_TIMEOUT),
> > > appConfig, accumulators),
> > >                         Encoders.bean(ModelStateInfo.class),
> > >                         Encoders.bean(ModelUpdate.class),
> > >                         GroupStateTimeout.ProcessingTimeTimeout());
> > >
> > > StateUpdateTask contains the update method.
> > >
> > > On Thu, May 28, 2020 at 4:41 AM Something Something <
> > > mailinglists19@gmail.com> wrote:
> > >
> > > > Yes, that's exactly how I am creating them.
> > > >
> > > > Question... Are you using 'Stateful Structured Streaming' in which
> > you've
> > > > something like this?
> > > >
> > > > .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
> > > >         updateAcrossEvents
> > > >       )
> > > >
> > > > And updating the Accumulator inside 'updateAcrossEvents'? We're
> > experiencing this only under 'Stateful Structured Streaming'. In other
> > streaming applications it works as expected.
> > > >
> > > >
> > > >
> > > > On Wed, May 27, 2020 at 9:01 AM Srinivas V <sr...@gmail.com>
> > wrote:
> > > >
> > > >> Yes, I am talking about Application specific Accumulators. Actually I
> > am
> > > >> getting the values printed in my driver log as well as sent to
> > Grafana. Not
> > > >> sure where and when I saw 0 before. My deploy mode is “client” on a
> > yarn
> > > >> cluster(not local Mac) where I submit from master node. It should
> > work the
> > > >> same for cluster mode as well.
> > > >> Create accumulators like this:
> > > >> AccumulatorV2 accumulator = sparkContext.longAccumulator(name);
> > > >>
> > > >>
> > > >> On Tue, May 26, 2020 at 8:42 PM Something Something <
> > > >> mailinglists19@gmail.com> wrote:
> > > >>
> > > > >>> Hmm... how would they go to Grafana if they are not getting
> > computed in
> > > >>> your code? I am talking about the Application Specific Accumulators.
> > The
> > > >>> other standard counters such as 'event.progress.inputRowsPerSecond'
> > are
> > > >>> getting populated correctly!
> > > >>>
> > > >>> On Mon, May 25, 2020 at 8:39 PM Srinivas V <sr...@gmail.com>
> > wrote:
> > > >>>
> > > >>>> Hello,
> > > >>>> Even for me it comes as 0 when I print in OnQueryProgress. I use
> > > >>>> LongAccumulator as well. Yes, it prints on my local but not on
> > cluster.
> > > > >>>> But one consolation is that when I send metrics to Grafana, the
> > values
> > > >>>> are coming there.
> > > >>>>
> > > >>>> On Tue, May 26, 2020 at 3:10 AM Something Something <
> > > >>>> mailinglists19@gmail.com> wrote:
> > > >>>>
> > > >>>>> No this is not working even if I use LongAccumulator.
> > > >>>>>
> > > >>>>> On Fri, May 15, 2020 at 9:54 PM ZHANG Wei <we...@outlook.com>
> > wrote:
> > > >>>>>
> > > >>>>>> There is a restriction in AccumulatorV2 API [1], the OUT type
> > should
> > > >>>>>> be atomic or thread safe. I'm wondering if the implementation for
> > > >>>>>> `java.util.Map[T, Long]` can meet it or not. Is there any chance
> > to replace
> > > >>>>>> CollectionLongAccumulator by CollectionAccumulator[2] or
> > LongAccumulator[3]
> > > >>>>>> and test if the StreamingListener and other codes are able to
> > work?
> > > >>>>>>
> > > >>>>>> ---
> > > >>>>>> Cheers,
> > > >>>>>> -z
> > > >>>>>> [1]
> > > >>>>>>
> > http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.AccumulatorV2
> > > >>>>>> [2]
> > > >>>>>>
> > http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.CollectionAccumulator
> > > >>>>>> [3]
> > > >>>>>>
> > http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.LongAccumulator
> > > >>>>>>
> > > >>>>>> ________________________________________
> > > >>>>>> From: Something Something <ma...@gmail.com>
> > > >>>>>> Sent: Saturday, May 16, 2020 0:38
> > > >>>>>> To: spark-user
> > > >>>>>> Subject: Re: Using Spark Accumulators with Structured Streaming
> > > >>>>>>
> > > >>>>>> Can someone from Spark Development team tell me if this
> > functionality
> > > >>>>>> is supported and tested? I've spent a lot of time on this but
> > can't get it
> > > >>>>>> to work. Just to add more context, we've our own Accumulator
> > class that
> > > >>>>>> extends from AccumulatorV2. In this class we keep track of one or
> > more
> > > >>>>>> accumulators. Here's the definition:
> > > >>>>>>
> > > >>>>>>
> > > >>>>>> class CollectionLongAccumulator[T]
> > > >>>>>>     extends AccumulatorV2[T, java.util.Map[T, Long]]
> > > >>>>>>
> > > >>>>>> When the job begins we register an instance of this class:
> > > >>>>>>
> > > >>>>>> spark.sparkContext.register(myAccumulator, "MyAccumulator")
> > > >>>>>>
> > > >>>>>> Is this working under Structured Streaming?
> > > >>>>>>
> > > >>>>>> I will keep looking for alternate approaches but any help would be
> > > >>>>>> greatly appreciated. Thanks.
> > > >>>>>>
> > > >>>>>>
> > > >>>>>>
> > > >>>>>> On Thu, May 14, 2020 at 2:36 PM Something Something <
> > > >>>>>> mailinglists19@gmail.com<ma...@gmail.com>> wrote:
> > > >>>>>>
> > > >>>>>> In my structured streaming job I am updating Spark Accumulators in
> > > >>>>>> the updateAcrossEvents method but they are always 0 when I try to
> > print
> > > >>>>>> them in my StreamingListener. Here's the code:
> > > >>>>>>
> > > >>>>>> .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
> > > >>>>>>         updateAcrossEvents
> > > >>>>>>       )
> > > >>>>>>
> > > >>>>>>
> > > >>>>>> The accumulators get incremented in 'updateAcrossEvents'. I've a
> > > >>>>>> StreamingListener which writes values of the accumulators in
> > > >>>>>> 'onQueryProgress' method but in this method the Accumulators are
> > ALWAYS
> > > >>>>>> ZERO!
> > > >>>>>>
> > > >>>>>> When I added log statements in the updateAcrossEvents, I could see
> > > >>>>>> that these accumulators are getting incremented as expected.
> > > >>>>>>
> > > >>>>>> This only happens when I run in the 'Cluster' mode. In Local mode
> > it
> > > >>>>>> works fine which implies that the Accumulators are not getting
> > distributed
> > > >>>>>> correctly - or something like that!
> > > >>>>>>
> > > >>>>>> Note: I've seen quite a few answers on the Web that tell me to
> > > >>>>>> perform an "Action". That's not a solution here. This is a
> > 'Stateful
> > > >>>>>> Structured Streaming' job. Yes, I am also 'registering' them in
> > > >>>>>> SparkContext.
> > > >>>>>>
> > > >>>>>>
> > > >>>>>>
> > > >>>>>>
> >

---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscribe@spark.apache.org


Re: Using Spark Accumulators with Structured Streaming

Posted by Srinivas V <sr...@gmail.com>.
Here is the code:
// accumulators is a class-level variable in the driver.

sparkSession.streams().addListener(new StreamingQueryListener() {
    @Override
    public void onQueryStarted(QueryStartedEvent queryStarted) {
        logger.info("Query started: " + queryStarted.id());
    }

    @Override
    public void onQueryTerminated(QueryTerminatedEvent queryTerminated) {
        logger.info("Query terminated: " + queryTerminated.id());
    }

    @Override
    public void onQueryProgress(QueryProgressEvent queryProgress) {
        accumulators.eventsReceived(queryProgress.progress().numInputRows());
        long eventsReceived = 0;
        long eventsExpired = 0;
        long eventSentSuccess = 0;
        try {
            eventsReceived = accumulators.getLong(InstrumentationCounters.EVENTS_RECEIVED);
            eventsExpired = accumulators.getLong(InstrumentationCounters.EVENTS_STATE_EXPIRED);
            eventSentSuccess = accumulators.getLong(InstrumentationCounters.EVENTS_SENT);
        } catch (MissingKeyException e) {
            logger.error("Accumulator key not found due to Exception {}", e.getMessage());
        }
        logger.info("Events Received:{}", eventsReceived);
        logger.info("Events State Expired:{}", eventsExpired);
        logger.info("Events Sent Success:{}", eventSentSuccess);
        logger.info("Query made progress - batchId: {} numInputRows:{} inputRowsPerSecond:{} processedRowsPerSecond:{} durationMs:{}",
                queryProgress.progress().batchId(),
                queryProgress.progress().numInputRows(),
                queryProgress.progress().inputRowsPerSecond(),
                queryProgress.progress().processedRowsPerSecond(),
                queryProgress.progress().durationMs());
    }
});
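
The thread never shows the class behind `accumulators`, so the following is only a guess at its general shape: a minimal, thread-safe holder of named counters in plain Java. The names `KeyedCounters`, `register`, `add` and `merge` are hypothetical, and `IllegalArgumentException` stands in for the application's own `MissingKeyException`. A plain object like this is not a Spark accumulator: on a cluster, executor-side updates only reach the driver through accumulators registered with the SparkContext, which may be why a counter can look right in local mode and stay at zero elsewhere.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical stand-in for the `accumulators` object read by the listener above.
final class KeyedCounters {
    // ConcurrentHashMap + LongAdder keep concurrent add() and getLong() calls safe.
    private final Map<String, LongAdder> counters = new ConcurrentHashMap<>();

    // Create a counter up front so that later reads find the key.
    void register(String key) {
        counters.putIfAbsent(key, new LongAdder());
    }

    // Increment a counter, creating it on first use.
    void add(String key, long delta) {
        counters.computeIfAbsent(key, k -> new LongAdder()).add(delta);
    }

    // Read a counter; unknown keys are reported instead of silently returning 0.
    long getLong(String key) {
        LongAdder adder = counters.get(key);
        if (adder == null) {
            throw new IllegalArgumentException("Unknown counter: " + key);
        }
        return adder.sum();
    }

    // Fold another instance's totals into this one, the role merge() plays
    // for a Spark AccumulatorV2 when task results come back to the driver.
    void merge(KeyedCounters other) {
        other.counters.forEach((key, adder) -> add(key, adder.sum()));
    }
}
```

Throwing on an unknown key keeps a misspelled counter name from being mistaken for a counter that is genuinely zero.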


On Thu, May 28, 2020 at 7:04 PM ZHANG Wei <we...@outlook.com> wrote:

> May I get how the accumulator is accessed in the method
> `onQueryProgress()`?
>
> AFAICT, the accumulator is incremented well. There is a way to verify that
> in cluster like this:
> ```
>     // Add the following while loop before invoking awaitTermination
>     while (true) {
>       println("My acc: " + myAcc.value)
>       Thread.sleep(5 * 1000)
>     }
>
>     //query.awaitTermination()
> ```
>
> And the accumulator value updated can be found from driver stdout.
>
> --
> Cheers,
> -z
>
> On Thu, 28 May 2020 17:12:48 +0530
> Srinivas V <sr...@gmail.com> wrote:
>
> > yes, I am using stateful structured streaming. Yes similar to what you
> do.
> > This is in Java
> > I do it this way:
> >     Dataset<ModelUpdate> productUpdates = watermarkedDS
> >                 .groupByKey(
> >                         (MapFunction<InputEventModel, String>) event ->
> > event.getId(), Encoders.STRING())
> >                 .mapGroupsWithState(
> >                         new
> >
> StateUpdateTask(Long.parseLong(appConfig.getSparkStructuredStreamingConfig().STATE_TIMEOUT),
> > appConfig, accumulators),
> >                         Encoders.bean(ModelStateInfo.class),
> >                         Encoders.bean(ModelUpdate.class),
> >                         GroupStateTimeout.ProcessingTimeTimeout());
> >
> > StateUpdateTask contains the update method.
> >
> > On Thu, May 28, 2020 at 4:41 AM Something Something <
> > mailinglists19@gmail.com> wrote:
> >
> > > Yes, that's exactly how I am creating them.
> > >
> > > Question... Are you using 'Stateful Structured Streaming' in which
> you've
> > > something like this?
> > >
> > > .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
> > >         updateAcrossEvents
> > >       )
> > >
> > > And updating the Accumulator inside 'updateAcrossEvents'? We're
> experiencing this only under 'Stateful Structured Streaming'. In other
> streaming applications it works as expected.
> > >
> > >
> > >
> > > On Wed, May 27, 2020 at 9:01 AM Srinivas V <sr...@gmail.com>
> wrote:
> > >
> > >> Yes, I am talking about Application specific Accumulators. Actually I
> am
> > >> getting the values printed in my driver log as well as sent to
> Grafana. Not
> > >> sure where and when I saw 0 before. My deploy mode is “client” on a
> yarn
> > >> cluster(not local Mac) where I submit from master node. It should
> work the
> > >> same for cluster mode as well.
> > >> Create accumulators like this:
> > >> AccumulatorV2 accumulator = sparkContext.longAccumulator(name);
> > >>
> > >>
> > >> On Tue, May 26, 2020 at 8:42 PM Something Something <
> > >> mailinglists19@gmail.com> wrote:
> > >>
> > >>> Hmm... how would they go to Grafana if they are not getting
> computed in
> > >>> your code? I am talking about the Application Specific Accumulators.
> The
> > >>> other standard counters such as 'event.progress.inputRowsPerSecond'
> are
> > >>> getting populated correctly!
> > >>>
> > >>> On Mon, May 25, 2020 at 8:39 PM Srinivas V <sr...@gmail.com>
> wrote:
> > >>>
> > >>>> Hello,
> > >>>> Even for me it comes as 0 when I print in OnQueryProgress. I use
> > >>>> LongAccumulator as well. Yes, it prints on my local but not on
> cluster.
> > >>>> But one consolation is that when I send metrics to Grafana, the
> values
> > >>>> are coming there.
> > >>>>
> > >>>> On Tue, May 26, 2020 at 3:10 AM Something Something <
> > >>>> mailinglists19@gmail.com> wrote:
> > >>>>
> > >>>>> No this is not working even if I use LongAccumulator.
> > >>>>>
> > >>>>> On Fri, May 15, 2020 at 9:54 PM ZHANG Wei <we...@outlook.com>
> wrote:
> > >>>>>
> > >>>>>> There is a restriction in AccumulatorV2 API [1], the OUT type
> should
> > >>>>>> be atomic or thread safe. I'm wondering if the implementation for
> > >>>>>> `java.util.Map[T, Long]` can meet it or not. Is there any chance
> to replace
> > >>>>>> CollectionLongAccumulator by CollectionAccumulator[2] or
> LongAccumulator[3]
> > >>>>>> and test if the StreamingListener and other codes are able to
> work?
> > >>>>>>
> > >>>>>> ---
> > >>>>>> Cheers,
> > >>>>>> -z
> > >>>>>> [1]
> > >>>>>>
> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.AccumulatorV2
> > >>>>>> [2]
> > >>>>>>
> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.CollectionAccumulator
> > >>>>>> [3]
> > >>>>>>
> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.LongAccumulator
> > >>>>>>
> > >>>>>> ________________________________________
> > >>>>>> From: Something Something <ma...@gmail.com>
> > >>>>>> Sent: Saturday, May 16, 2020 0:38
> > >>>>>> To: spark-user
> > >>>>>> Subject: Re: Using Spark Accumulators with Structured Streaming
> > >>>>>>
> > >>>>>> Can someone from Spark Development team tell me if this
> functionality
> > >>>>>> is supported and tested? I've spent a lot of time on this but
> can't get it
> > >>>>>> to work. Just to add more context, we've our own Accumulator
> class that
> > >>>>>> extends from AccumulatorV2. In this class we keep track of one or
> more
> > >>>>>> accumulators. Here's the definition:
> > >>>>>>
> > >>>>>>
> > >>>>>> class CollectionLongAccumulator[T]
> > >>>>>>     extends AccumulatorV2[T, java.util.Map[T, Long]]
> > >>>>>>
> > >>>>>> When the job begins we register an instance of this class:
> > >>>>>>
> > >>>>>> spark.sparkContext.register(myAccumulator, "MyAccumulator")
> > >>>>>>
> > >>>>>> Is this working under Structured Streaming?
> > >>>>>>
> > >>>>>> I will keep looking for alternate approaches but any help would be
> > >>>>>> greatly appreciated. Thanks.
> > >>>>>>
> > >>>>>>
> > >>>>>>
> > >>>>>> On Thu, May 14, 2020 at 2:36 PM Something Something <
> > >>>>>> mailinglists19@gmail.com<ma...@gmail.com>> wrote:
> > >>>>>>
> > >>>>>> In my structured streaming job I am updating Spark Accumulators in
> > >>>>>> the updateAcrossEvents method but they are always 0 when I try to
> print
> > >>>>>> them in my StreamingListener. Here's the code:
> > >>>>>>
> > >>>>>> .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
> > >>>>>>         updateAcrossEvents
> > >>>>>>       )
> > >>>>>>
> > >>>>>>
> > >>>>>> The accumulators get incremented in 'updateAcrossEvents'. I've a
> > >>>>>> StreamingListener which writes values of the accumulators in
> > >>>>>> 'onQueryProgress' method but in this method the Accumulators are
> ALWAYS
> > >>>>>> ZERO!
> > >>>>>>
> > >>>>>> When I added log statements in the updateAcrossEvents, I could see
> > >>>>>> that these accumulators are getting incremented as expected.
> > >>>>>>
> > >>>>>> This only happens when I run in the 'Cluster' mode. In Local mode
> it
> > >>>>>> works fine which implies that the Accumulators are not getting
> distributed
> > >>>>>> correctly - or something like that!
> > >>>>>>
> > >>>>>> Note: I've seen quite a few answers on the Web that tell me to
> > >>>>>> perform an "Action". That's not a solution here. This is a
> 'Stateful
> > >>>>>> Structured Streaming' job. Yes, I am also 'registering' them in
> > >>>>>> SparkContext.
> > >>>>>>
> > >>>>>>
> > >>>>>>
> > >>>>>>
>

Re: Using Spark Accumulators with Structured Streaming

Posted by ZHANG Wei <we...@outlook.com>.
Could you show how the accumulator is accessed in the `onQueryProgress()` method?

AFAICT, the accumulator is incremented correctly. One way to verify that
on a cluster is:
```
    // Add the following while loop before invoking awaitTermination
    while (true) {
      println("My acc: " + myAcc.value)
      Thread.sleep(5 * 1000)
    }

    //query.awaitTermination()
```

The updated accumulator value should then show up in the driver's stdout.
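
A possible variant of the check above, sketched in Java: instead of blocking the driver's main thread in `while (true)`, a daemon thread prints the counter on a fixed schedule, so `query.awaitTermination()` can stay in place. `CounterPoller` and `pollOnce` are hypothetical names, and the sketch uses only the JDK, with a `LongSupplier` standing in for reading the accumulator.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;

// Hypothetical helper: periodically prints a counter from a background thread.
final class CounterPoller implements AutoCloseable {
    private final String label;
    private final LongSupplier reader;
    private final ScheduledExecutorService scheduler;

    CounterPoller(String label, LongSupplier reader, long periodMillis) {
        this.label = label;
        this.reader = reader;
        this.scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "counter-poller");
            t.setDaemon(true); // never keeps the JVM alive on its own
            return t;
        });
        // First print happens immediately, then once every periodMillis.
        this.scheduler.scheduleAtFixedRate(this::pollOnce, 0, periodMillis, TimeUnit.MILLISECONDS);
    }

    // Read the counter once, print it, and return what was read.
    long pollOnce() {
        long value = reader.getAsLong();
        System.out.println(label + ": " + value);
        return value;
    }

    @Override
    public void close() {
        scheduler.shutdownNow();
    }
}
```

In the job this would be created just before `query.awaitTermination()`, for example `new CounterPoller("My acc", () -> myAcc.value(), 5000)`, assuming `myAcc` is a Spark `LongAccumulator` visible in the driver.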

-- 
Cheers,
-z

On Thu, 28 May 2020 17:12:48 +0530
Srinivas V <sr...@gmail.com> wrote:

> yes, I am using stateful structured streaming. Yes similar to what you do.
> This is in Java
> I do it this way:
>     Dataset<ModelUpdate> productUpdates = watermarkedDS
>                 .groupByKey(
>                         (MapFunction<InputEventModel, String>) event ->
> event.getId(), Encoders.STRING())
>                 .mapGroupsWithState(
>                         new
> StateUpdateTask(Long.parseLong(appConfig.getSparkStructuredStreamingConfig().STATE_TIMEOUT),
> appConfig, accumulators),
>                         Encoders.bean(ModelStateInfo.class),
>                         Encoders.bean(ModelUpdate.class),
>                         GroupStateTimeout.ProcessingTimeTimeout());
> 
> StateUpdateTask contains the update method.
> 
> On Thu, May 28, 2020 at 4:41 AM Something Something <
> mailinglists19@gmail.com> wrote:
> 
> > Yes, that's exactly how I am creating them.
> >
> > Question... Are you using 'Stateful Structured Streaming' in which you've
> > something like this?
> >
> > .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
> >         updateAcrossEvents
> >       )
> >
> > And updating the Accumulator inside 'updateAcrossEvents'? We're experiencing this only under 'Stateful Structured Streaming'. In other streaming applications it works as expected.
> >
> >
> >
> > On Wed, May 27, 2020 at 9:01 AM Srinivas V <sr...@gmail.com> wrote:
> >
> >> Yes, I am talking about Application specific Accumulators. Actually I am
> >> getting the values printed in my driver log as well as sent to Grafana. Not
> >> sure where and when I saw 0 before. My deploy mode is “client” on a yarn
> >> cluster(not local Mac) where I submit from master node. It should work the
> >> same for cluster mode as well.
> >> Create accumulators like this:
> >> AccumulatorV2 accumulator = sparkContext.longAccumulator(name);
> >>
> >>
> >> On Tue, May 26, 2020 at 8:42 PM Something Something <
> >> mailinglists19@gmail.com> wrote:
> >>
> >>> Hmm... how would they go to Grafana if they are not getting computed in
> >>> your code? I am talking about the Application Specific Accumulators. The
> >>> other standard counters such as 'event.progress.inputRowsPerSecond' are
> >>> getting populated correctly!
> >>>
> >>> On Mon, May 25, 2020 at 8:39 PM Srinivas V <sr...@gmail.com> wrote:
> >>>
> >>>> Hello,
> >>>> Even for me it comes as 0 when I print in OnQueryProgress. I use
> >>>> LongAccumulator as well. Yes, it prints on my local but not on cluster.
> >>>> But one consolation is that when I send metrics to Grafana, the values
> >>>> are coming there.
> >>>>
> >>>> On Tue, May 26, 2020 at 3:10 AM Something Something <
> >>>> mailinglists19@gmail.com> wrote:
> >>>>
> >>>>> No this is not working even if I use LongAccumulator.
> >>>>>
> >>>>> On Fri, May 15, 2020 at 9:54 PM ZHANG Wei <we...@outlook.com> wrote:
> >>>>>
> >>>>>> There is a restriction in AccumulatorV2 API [1], the OUT type should
> >>>>>> be atomic or thread safe. I'm wondering if the implementation for
> >>>>>> `java.util.Map[T, Long]` can meet it or not. Is there any chance to replace
> >>>>>> CollectionLongAccumulator by CollectionAccumulator[2] or LongAccumulator[3]
> >>>>>> and test if the StreamingListener and other codes are able to work?
> >>>>>>
> >>>>>> ---
> >>>>>> Cheers,
> >>>>>> -z
> >>>>>> [1]
> >>>>>> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.AccumulatorV2
> >>>>>> [2]
> >>>>>> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.CollectionAccumulator
> >>>>>> [3]
> >>>>>> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.LongAccumulator
> >>>>>>
> >>>>>> ________________________________________
> >>>>>> From: Something Something <ma...@gmail.com>
> >>>>>> Sent: Saturday, May 16, 2020 0:38
> >>>>>> To: spark-user
> >>>>>> Subject: Re: Using Spark Accumulators with Structured Streaming
> >>>>>>
> >>>>>> Can someone from Spark Development team tell me if this functionality
> >>>>>> is supported and tested? I've spent a lot of time on this but can't get it
> >>>>>> to work. Just to add more context, we've our own Accumulator class that
> >>>>>> extends from AccumulatorV2. In this class we keep track of one or more
> >>>>>> accumulators. Here's the definition:
> >>>>>>
> >>>>>>
> >>>>>> class CollectionLongAccumulator[T]
> >>>>>>     extends AccumulatorV2[T, java.util.Map[T, Long]]
> >>>>>>
> >>>>>> When the job begins we register an instance of this class:
> >>>>>>
> >>>>>> spark.sparkContext.register(myAccumulator, "MyAccumulator")
> >>>>>>
> >>>>>> Is this working under Structured Streaming?
> >>>>>>
> >>>>>> I will keep looking for alternate approaches but any help would be
> >>>>>> greatly appreciated. Thanks.
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>> On Thu, May 14, 2020 at 2:36 PM Something Something <
> >>>>>> mailinglists19@gmail.com<ma...@gmail.com>> wrote:
> >>>>>>
> >>>>>> In my structured streaming job I am updating Spark Accumulators in
> >>>>>> the updateAcrossEvents method but they are always 0 when I try to print
> >>>>>> them in my StreamingListener. Here's the code:
> >>>>>>
> >>>>>> .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
> >>>>>>         updateAcrossEvents
> >>>>>>       )
> >>>>>>
> >>>>>>
> >>>>>> The accumulators get incremented in 'updateAcrossEvents'. I've a
> >>>>>> StreamingListener which writes values of the accumulators in
> >>>>>> 'onQueryProgress' method but in this method the Accumulators are ALWAYS
> >>>>>> ZERO!
> >>>>>>
> >>>>>> When I added log statements in the updateAcrossEvents, I could see
> >>>>>> that these accumulators are getting incremented as expected.
> >>>>>>
> >>>>>> This only happens when I run in the 'Cluster' mode. In Local mode it
> >>>>>> works fine which implies that the Accumulators are not getting distributed
> >>>>>> correctly - or something like that!
> >>>>>>
> >>>>>> Note: I've seen quite a few answers on the Web that tell me to
> >>>>>> perform an "Action". That's not a solution here. This is a 'Stateful
> >>>>>> Structured Streaming' job. Yes, I am also 'registering' them in
> >>>>>> SparkContext.
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>>



Re: Using Spark Accumulators with Structured Streaming

Posted by Srinivas V <sr...@gmail.com>.
It's in the constructor.
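
To make that concrete, here is a minimal sketch of the pattern, using only the JDK so it compiles on its own. `GroupFunction` is a simplified, hypothetical stand-in for Spark's `MapGroupsWithStateFunction` (the `GroupState` argument is left out), `CountingUpdateTask` is an illustrative name, and a `LongAdder` plays the part of the accumulator. The counter never appears in the class header or its type parameters: it comes in through the constructor, is kept in a field, and is updated inside `call`.

```java
import java.util.Iterator;
import java.util.concurrent.atomic.LongAdder;

// Simplified stand-in for Spark's MapGroupsWithStateFunction (state argument omitted).
interface GroupFunction<K, V, R> {
    R call(K key, Iterator<V> values);
}

final class CountingUpdateTask implements GroupFunction<String, String, Integer> {
    // In the Spark job this field would hold the accumulator(s) created by the driver.
    private final LongAdder eventsSeen;

    // The driver hands the counter over when it constructs the function object.
    CountingUpdateTask(LongAdder eventsSeen) {
        this.eventsSeen = eventsSeen;
    }

    @Override
    public Integer call(String key, Iterator<String> values) {
        int count = 0;
        while (values.hasNext()) {
            values.next();
            count++;
        }
        eventsSeen.add(count); // the update happens here, inside call()
        return count;
    }
}
```

In the real job the field would presumably be the object passed as `accumulators` to `new StateUpdateTask(...)`; since Spark serializes the function object when it ships it to executors, whatever is stored in that field needs to be a registered Spark accumulator (or wrap one) for the updates to come back to the driver.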

On Sat, May 30, 2020 at 4:15 AM Something Something <
mailinglists19@gmail.com> wrote:

> I mean... I don't see any reference to 'accumulator' in your Class
> *definition*. How can you access it in the class if it's not in your
> definition of class:
>
> public class StateUpdateTask implements MapGroupsWithStateFunction<*String,
> InputEventModel, ModelStateInfo, ModelUpdate*> {.  *--> I was expecting
> to see 'accumulator' here in the definition.*
>
>     @Override
>     public ModelUpdate call(String productId, Iterator<InputEventModel>
> eventsIterator, GroupState<ModelStateInfo> state) {
>     }
> }
>
> On Fri, May 29, 2020 at 1:08 PM Srinivas V <sr...@gmail.com> wrote:
>
>>
>> Yes, accumulators are updated in the call method of StateUpdateTask. Like
>> when state times out or when the data is pushed to next Kafka topic etc.
>>
>> On Fri, May 29, 2020 at 11:55 PM Something Something <
>> mailinglists19@gmail.com> wrote:
>>
>>> Thanks! I will take a look at the link. Just one question, you seem to
>>> be passing 'accumulators' in the constructor but where do you use it in the
>>> StateUpdateTask class? I am still missing that connection. Sorry, if my
>>> question is dumb. I must be missing something. Thanks for your help so far.
>>> It's been useful.
>>>
>>>
>>> On Fri, May 29, 2020 at 6:51 AM Srinivas V <sr...@gmail.com> wrote:
>>>
>>>> Yes it is application specific class. This is how java Spark Functions
>>>> work.
>>>> You can refer to this code in the documentation:
>>>> https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredSessionization.java
>>>>
>>>> public class StateUpdateTask implements
>>>> MapGroupsWithStateFunction<String, InputEventModel, ModelStateInfo,
>>>> ModelUpdate> {
>>>>
>>>>     @Override
>>>>     public ModelUpdate call(String productId, Iterator<InputEventModel>
>>>> eventsIterator, GroupState<ModelStateInfo> state) {
>>>>     }
>>>> }
>>>>
>>>> On Thu, May 28, 2020 at 10:59 PM Something Something <
>>>> mailinglists19@gmail.com> wrote:
>>>>
>>>>> I am assuming StateUpdateTask is your application specific class. Does
>>>>> it have 'updateState' method or something? I googled but couldn't find any
>>>>> documentation about doing it this way. Can you please direct me to some
>>>>> documentation. Thanks.
>>>>>
>>>>> On Thu, May 28, 2020 at 4:43 AM Srinivas V <sr...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> yes, I am using stateful structured streaming. Yes similar to what
>>>>>> you do. This is in Java
>>>>>> I do it this way:
>>>>>>     Dataset<ModelUpdate> productUpdates = watermarkedDS
>>>>>>                 .groupByKey(
>>>>>>                         (MapFunction<InputEventModel, String>) event
>>>>>> -> event.getId(), Encoders.STRING())
>>>>>>                 .mapGroupsWithState(
>>>>>>                         new
>>>>>> StateUpdateTask(Long.parseLong(appConfig.getSparkStructuredStreamingConfig().STATE_TIMEOUT),
>>>>>> appConfig, accumulators),
>>>>>>                         Encoders.bean(ModelStateInfo.class),
>>>>>>                         Encoders.bean(ModelUpdate.class),
>>>>>>                         GroupStateTimeout.ProcessingTimeTimeout());
>>>>>>
>>>>>> StateUpdateTask contains the update method.
>>>>>>
>>>>>> On Thu, May 28, 2020 at 4:41 AM Something Something <
>>>>>> mailinglists19@gmail.com> wrote:
>>>>>>
>>>>>>> Yes, that's exactly how I am creating them.
>>>>>>>
>>>>>>> Question... Are you using 'Stateful Structured Streaming' in which
>>>>>>> you've something like this?
>>>>>>>
>>>>>>> .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
>>>>>>>         updateAcrossEvents
>>>>>>>       )
>>>>>>>
>>>>>>> And updating the Accumulator inside 'updateAcrossEvents'? We're experiencing this only under 'Stateful Structured Streaming'. In other streaming applications it works as expected.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Wed, May 27, 2020 at 9:01 AM Srinivas V <sr...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Yes, I am talking about Application specific Accumulators. Actually
>>>>>>>> I am getting the values printed in my driver log as well as sent to
>>>>>>>> Grafana. Not sure where and when I saw 0 before. My deploy mode is “client”
>>>>>>>> on a yarn cluster(not local Mac) where I submit from master node. It should
>>>>>>>> work the same for cluster mode as well.
>>>>>>>> Create accumulators like this:
>>>>>>>> AccumulatorV2 accumulator = sparkContext.longAccumulator(name);
>>>>>>>>
>>>>>>>>
>>>>>>>> On Tue, May 26, 2020 at 8:42 PM Something Something <
>>>>>>>> mailinglists19@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hmm... how would they go to Graphana if they are not getting
>>>>>>>>> computed in your code? I am talking about the Application Specific
>>>>>>>>> Accumulators. The other standard counters such as
>>>>>>>>> 'event.progress.inputRowsPerSecond' are getting populated correctly!
>>>>>>>>>
>>>>>>>>> On Mon, May 25, 2020 at 8:39 PM Srinivas V <sr...@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Hello,
>>>>>>>>>> Even for me it comes as 0 when I print in OnQueryProgress. I use
>>>>>>>>>> LongAccumulator as well. Yes, it prints on my local but not on cluster.
>>>>>>>>>> But one consolation is that when I send metrics to Graphana, the
>>>>>>>>>> values are coming there.
>>>>>>>>>>
>>>>>>>>>> On Tue, May 26, 2020 at 3:10 AM Something Something <
>>>>>>>>>> mailinglists19@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> No this is not working even if I use LongAccumulator.
>>>>>>>>>>>
>>>>>>>>>>> On Fri, May 15, 2020 at 9:54 PM ZHANG Wei <zw...@msn.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> There is a restriction in AccumulatorV2 API [1], the OUT type
>>>>>>>>>>>> should be atomic or thread safe. I'm wondering if the implementation for
>>>>>>>>>>>> `java.util.Map[T, Long]` can meet it or not. Is there any chance to replace
>>>>>>>>>>>> CollectionLongAccumulator by CollectionAccumulator[2] or LongAccumulator[3]
>>>>>>>>>>>> and test if the StreamingListener and other codes are able to work?
>>>>>>>>>>>>
>>>>>>>>>>>> ---
>>>>>>>>>>>> Cheers,
>>>>>>>>>>>> -z
>>>>>>>>>>>> [1]
>>>>>>>>>>>> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.AccumulatorV2
>>>>>>>>>>>> [2]
>>>>>>>>>>>> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.CollectionAccumulator
>>>>>>>>>>>> [3]
>>>>>>>>>>>> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.LongAccumulator
>>>>>>>>>>>>
>>>>>>>>>>>> ________________________________________
>>>>>>>>>>>> From: Something Something <ma...@gmail.com>
>>>>>>>>>>>> Sent: Saturday, May 16, 2020 0:38
>>>>>>>>>>>> To: spark-user
>>>>>>>>>>>> Subject: Re: Using Spark Accumulators with Structured Streaming
>>>>>>>>>>>>
>>>>>>>>>>>> Can someone from Spark Development team tell me if this
>>>>>>>>>>>> functionality is supported and tested? I've spent a lot of time on this but
>>>>>>>>>>>> can't get it to work. Just to add more context, we've our own Accumulator
>>>>>>>>>>>> class that extends from AccumulatorV2. In this class we keep track of one
>>>>>>>>>>>> or more accumulators. Here's the definition:
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> class CollectionLongAccumulator[T]
>>>>>>>>>>>>     extends AccumulatorV2[T, java.util.Map[T, Long]]
>>>>>>>>>>>>
>>>>>>>>>>>> When the job begins we register an instance of this class:
>>>>>>>>>>>>
>>>>>>>>>>>> spark.sparkContext.register(myAccumulator, "MyAccumulator")
>>>>>>>>>>>>
>>>>>>>>>>>> Is this working under Structured Streaming?
>>>>>>>>>>>>
>>>>>>>>>>>> I will keep looking for alternate approaches but any help would
>>>>>>>>>>>> be greatly appreciated. Thanks.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Thu, May 14, 2020 at 2:36 PM Something Something <
>>>>>>>>>>>> mailinglists19@gmail.com<ma...@gmail.com>>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>> In my structured streaming job I am updating Spark Accumulators
>>>>>>>>>>>> in the updateAcrossEvents method but they are always 0 when I try to print
>>>>>>>>>>>> them in my StreamingListener. Here's the code:
>>>>>>>>>>>>
>>>>>>>>>>>> .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
>>>>>>>>>>>>         updateAcrossEvents
>>>>>>>>>>>>       )
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> The accumulators get incremented in 'updateAcrossEvents'. I've
>>>>>>>>>>>> a StreamingListener which writes values of the accumulators in
>>>>>>>>>>>> 'onQueryProgress' method but in this method the Accumulators are ALWAYS
>>>>>>>>>>>> ZERO!
>>>>>>>>>>>>
>>>>>>>>>>>> When I added log statements in the updateAcrossEvents, I could
>>>>>>>>>>>> see that these accumulators are getting incremented as expected.
>>>>>>>>>>>>
>>>>>>>>>>>> This only happens when I run in the 'Cluster' mode. In Local
>>>>>>>>>>>> mode it works fine which implies that the Accumulators are not getting
>>>>>>>>>>>> distributed correctly - or something like that!
>>>>>>>>>>>>
>>>>>>>>>>>> Note: I've seen quite a few answers on the Web that tell me to
>>>>>>>>>>>> perform an "Action". That's not a solution here. This is a 'Stateful
>>>>>>>>>>>> Structured Streaming' job. Yes, I am also 'registering' them in
>>>>>>>>>>>> SparkContext.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>

Re: Using Spark Accumulators with Structured Streaming

Posted by Something Something <ma...@gmail.com>.
I mean... I don't see any reference to 'accumulator' in your class
definition. How can you access it in the class if it's not in the class
definition?

public class StateUpdateTask implements MapGroupsWithStateFunction<String,
InputEventModel, ModelStateInfo, ModelUpdate> {
    // --> I was expecting to see 'accumulator' here in the definition.

    @Override
    public ModelUpdate call(String productId, Iterator<InputEventModel>
eventsIterator, GroupState<ModelStateInfo> state) {
    }
}

On Fri, May 29, 2020 at 1:08 PM Srinivas V <sr...@gmail.com> wrote:

>
> Yes, accumulators are updated in the call method of StateUpdateTask. Like
> when state times out or when the data is pushed to next Kafka topic etc.
>
> On Fri, May 29, 2020 at 11:55 PM Something Something <
> mailinglists19@gmail.com> wrote:
>
>> Thanks! I will take a look at the link. Just one question, you seem to be
>> passing 'accumulators' in the constructor but where do you use it in the
>> StateUpdateTask class? I am still missing that connection. Sorry, if my
>> question is dumb. I must be missing something. Thanks for your help so far.
>> It's been useful.
>>
>>
>> On Fri, May 29, 2020 at 6:51 AM Srinivas V <sr...@gmail.com> wrote:
>>
>>> Yes it is application specific class. This is how java Spark Functions
>>> work.
>>> You can refer to this code in the documentation:
>>> https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredSessionization.java
>>>
>>> public class StateUpdateTask implements
>>> MapGroupsWithStateFunction<String, InputEventModel, ModelStateInfo,
>>> ModelUpdate> {
>>>
>>>     @Override
>>>     public ModelUpdate call(String productId, Iterator<InputEventModel>
>>> eventsIterator, GroupState<ModelStateInfo> state) {
>>>     }
>>> }
>>>
>>> On Thu, May 28, 2020 at 10:59 PM Something Something <
>>> mailinglists19@gmail.com> wrote:
>>>
>>>> I am assuming StateUpdateTask is your application specific class. Does
>>>> it have 'updateState' method or something? I googled but couldn't find any
>>>> documentation about doing it this way. Can you please direct me to some
>>>> documentation. Thanks.
>>>>
>>>> On Thu, May 28, 2020 at 4:43 AM Srinivas V <sr...@gmail.com> wrote:
>>>>
>>>>> yes, I am using stateful structured streaming. Yes similar to what you
>>>>> do. This is in Java
>>>>> I do it this way:
>>>>>     Dataset<ModelUpdate> productUpdates = watermarkedDS
>>>>>                 .groupByKey(
>>>>>                         (MapFunction<InputEventModel, String>) event
>>>>> -> event.getId(), Encoders.STRING())
>>>>>                 .mapGroupsWithState(
>>>>>                         new
>>>>> StateUpdateTask(Long.parseLong(appConfig.getSparkStructuredStreamingConfig().STATE_TIMEOUT),
>>>>> appConfig, accumulators),
>>>>>                         Encoders.bean(ModelStateInfo.class),
>>>>>                         Encoders.bean(ModelUpdate.class),
>>>>>                         GroupStateTimeout.ProcessingTimeTimeout());
>>>>>
>>>>> StateUpdateTask contains the update method.
>>>>>
>>>>> On Thu, May 28, 2020 at 4:41 AM Something Something <
>>>>> mailinglists19@gmail.com> wrote:
>>>>>
>>>>>> Yes, that's exactly how I am creating them.
>>>>>>
>>>>>> Question... Are you using 'Stateful Structured Streaming' in which
>>>>>> you've something like this?
>>>>>>
>>>>>> .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
>>>>>>         updateAcrossEvents
>>>>>>       )
>>>>>>
>>>>>> And updating the Accumulator inside 'updateAcrossEvents'? We're experiencing this only under 'Stateful Structured Streaming'. In other streaming applications it works as expected.
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Wed, May 27, 2020 at 9:01 AM Srinivas V <sr...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Yes, I am talking about Application specific Accumulators. Actually
>>>>>>> I am getting the values printed in my driver log as well as sent to
>>>>>>> Grafana. Not sure where and when I saw 0 before. My deploy mode is “client”
>>>>>>> on a yarn cluster(not local Mac) where I submit from master node. It should
>>>>>>> work the same for cluster mode as well.
>>>>>>> Create accumulators like this:
>>>>>>> AccumulatorV2 accumulator = sparkContext.longAccumulator(name);
>>>>>>>
>>>>>>>
>>>>>>> On Tue, May 26, 2020 at 8:42 PM Something Something <
>>>>>>> mailinglists19@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hmm... how would they go to Graphana if they are not getting
>>>>>>>> computed in your code? I am talking about the Application Specific
>>>>>>>> Accumulators. The other standard counters such as
>>>>>>>> 'event.progress.inputRowsPerSecond' are getting populated correctly!
>>>>>>>>
>>>>>>>> On Mon, May 25, 2020 at 8:39 PM Srinivas V <sr...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hello,
>>>>>>>>> Even for me it comes as 0 when I print in OnQueryProgress. I use
>>>>>>>>> LongAccumulator as well. Yes, it prints on my local but not on cluster.
>>>>>>>>> But one consolation is that when I send metrics to Graphana, the
>>>>>>>>> values are coming there.
>>>>>>>>>
>>>>>>>>> On Tue, May 26, 2020 at 3:10 AM Something Something <
>>>>>>>>> mailinglists19@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> No this is not working even if I use LongAccumulator.
>>>>>>>>>>
>>>>>>>>>> On Fri, May 15, 2020 at 9:54 PM ZHANG Wei <zw...@msn.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> There is a restriction in AccumulatorV2 API [1], the OUT type
>>>>>>>>>>> should be atomic or thread safe. I'm wondering if the implementation for
>>>>>>>>>>> `java.util.Map[T, Long]` can meet it or not. Is there any chance to replace
>>>>>>>>>>> CollectionLongAccumulator by CollectionAccumulator[2] or LongAccumulator[3]
>>>>>>>>>>> and test if the StreamingListener and other codes are able to work?
>>>>>>>>>>>
>>>>>>>>>>> ---
>>>>>>>>>>> Cheers,
>>>>>>>>>>> -z
>>>>>>>>>>> [1]
>>>>>>>>>>> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.AccumulatorV2
>>>>>>>>>>> [2]
>>>>>>>>>>> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.CollectionAccumulator
>>>>>>>>>>> [3]
>>>>>>>>>>> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.LongAccumulator
>>>>>>>>>>>
>>>>>>>>>>> ________________________________________
>>>>>>>>>>> From: Something Something <ma...@gmail.com>
>>>>>>>>>>> Sent: Saturday, May 16, 2020 0:38
>>>>>>>>>>> To: spark-user
>>>>>>>>>>> Subject: Re: Using Spark Accumulators with Structured Streaming
>>>>>>>>>>>
>>>>>>>>>>> Can someone from Spark Development team tell me if this
>>>>>>>>>>> functionality is supported and tested? I've spent a lot of time on this but
>>>>>>>>>>> can't get it to work. Just to add more context, we've our own Accumulator
>>>>>>>>>>> class that extends from AccumulatorV2. In this class we keep track of one
>>>>>>>>>>> or more accumulators. Here's the definition:
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> class CollectionLongAccumulator[T]
>>>>>>>>>>>     extends AccumulatorV2[T, java.util.Map[T, Long]]
>>>>>>>>>>>
>>>>>>>>>>> When the job begins we register an instance of this class:
>>>>>>>>>>>
>>>>>>>>>>> spark.sparkContext.register(myAccumulator, "MyAccumulator")
>>>>>>>>>>>
>>>>>>>>>>> Is this working under Structured Streaming?
>>>>>>>>>>>
>>>>>>>>>>> I will keep looking for alternate approaches but any help would
>>>>>>>>>>> be greatly appreciated. Thanks.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Thu, May 14, 2020 at 2:36 PM Something Something <
>>>>>>>>>>> mailinglists19@gmail.com<ma...@gmail.com>>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>> In my structured streaming job I am updating Spark Accumulators
>>>>>>>>>>> in the updateAcrossEvents method but they are always 0 when I try to print
>>>>>>>>>>> them in my StreamingListener. Here's the code:
>>>>>>>>>>>
>>>>>>>>>>> .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
>>>>>>>>>>>         updateAcrossEvents
>>>>>>>>>>>       )
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> The accumulators get incremented in 'updateAcrossEvents'. I've a
>>>>>>>>>>> StreamingListener which writes values of the accumulators in
>>>>>>>>>>> 'onQueryProgress' method but in this method the Accumulators are ALWAYS
>>>>>>>>>>> ZERO!
>>>>>>>>>>>
>>>>>>>>>>> When I added log statements in the updateAcrossEvents, I could
>>>>>>>>>>> see that these accumulators are getting incremented as expected.
>>>>>>>>>>>
>>>>>>>>>>> This only happens when I run in the 'Cluster' mode. In Local
>>>>>>>>>>> mode it works fine which implies that the Accumulators are not getting
>>>>>>>>>>> distributed correctly - or something like that!
>>>>>>>>>>>
>>>>>>>>>>> Note: I've seen quite a few answers on the Web that tell me to
>>>>>>>>>>> perform an "Action". That's not a solution here. This is a 'Stateful
>>>>>>>>>>> Structured Streaming' job. Yes, I am also 'registering' them in
>>>>>>>>>>> SparkContext.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>

Re: Using Spark Accumulators with Structured Streaming

Posted by Srinivas V <sr...@gmail.com>.
Yes, accumulators are updated in the call method of StateUpdateTask, for
example when state times out or when the data is pushed to the next Kafka
topic.
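
As a rough illustration of that description, and not the code from the actual job, the sketch below increments one counter when a group has timed out and another for each event that would be forwarded. TimeoutAware is a hypothetical stand-in for the hasTimedOut() check on Spark's GroupState, LongAdder stands in for LongAccumulator, and the Kafka publish is only indicated by a comment.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical stand-in for the single GroupState method this sketch relies on.
interface TimeoutAware {
    boolean hasTimedOut();
}

class CountingUpdateTask {
    private final LongAdder timedOutGroups;  // stand-in for a LongAccumulator
    private final LongAdder forwardedEvents; // stand-in for a LongAccumulator

    CountingUpdateTask(LongAdder timedOutGroups, LongAdder forwardedEvents) {
        this.timedOutGroups = timedOutGroups;
        this.forwardedEvents = forwardedEvents;
    }

    void call(String productId, Iterator<String> events, TimeoutAware state) {
        if (state.hasTimedOut()) {
            // Timeout branch: no new events arrive, only the counter changes.
            timedOutGroups.increment();
            return;
        }
        while (events.hasNext()) {
            events.next(); // a real job would publish the event to Kafka here
            forwardedEvents.increment();
        }
    }
}

class CallCountersDemo {
    // Returns {timed-out groups, forwarded events} after one normal call
    // and one timeout call.
    static long[] run() {
        LongAdder timedOut = new LongAdder();
        LongAdder forwarded = new LongAdder();
        CountingUpdateTask task = new CountingUpdateTask(timedOut, forwarded);
        task.call("p1", Arrays.asList("a", "b").iterator(), () -> false);
        task.call("p1", Collections.<String>emptyIterator(), () -> true);
        return new long[] {timedOut.sum(), forwarded.sum()};
    }

    public static void main(String[] args) {
        long[] counts = run();
        System.out.println(counts[0] + " " + counts[1]);
    }
}
```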

On Fri, May 29, 2020 at 11:55 PM Something Something <
mailinglists19@gmail.com> wrote:

> Thanks! I will take a look at the link. Just one question, you seem to be
> passing 'accumulators' in the constructor but where do you use it in the
> StateUpdateTask class? I am still missing that connection. Sorry, if my
> question is dumb. I must be missing something. Thanks for your help so far.
> It's been useful.
>
>
> On Fri, May 29, 2020 at 6:51 AM Srinivas V <sr...@gmail.com> wrote:
>
>> Yes it is application specific class. This is how java Spark Functions
>> work.
>> You can refer to this code in the documentation:
>> https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredSessionization.java
>>
>> public class StateUpdateTask implements
>> MapGroupsWithStateFunction<String, InputEventModel, ModelStateInfo,
>> ModelUpdate> {
>>
>>     @Override
>>     public ModelUpdate call(String productId, Iterator<InputEventModel>
>> eventsIterator, GroupState<ModelStateInfo> state) {
>>     }
>> }
>>
>> On Thu, May 28, 2020 at 10:59 PM Something Something <
>> mailinglists19@gmail.com> wrote:
>>
>>> I am assuming StateUpdateTask is your application specific class. Does
>>> it have 'updateState' method or something? I googled but couldn't find any
>>> documentation about doing it this way. Can you please direct me to some
>>> documentation. Thanks.
>>>
>>> On Thu, May 28, 2020 at 4:43 AM Srinivas V <sr...@gmail.com> wrote:
>>>
>>>> yes, I am using stateful structured streaming. Yes similar to what you
>>>> do. This is in Java
>>>> I do it this way:
>>>>     Dataset<ModelUpdate> productUpdates = watermarkedDS
>>>>                 .groupByKey(
>>>>                         (MapFunction<InputEventModel, String>) event ->
>>>> event.getId(), Encoders.STRING())
>>>>                 .mapGroupsWithState(
>>>>                         new
>>>> StateUpdateTask(Long.parseLong(appConfig.getSparkStructuredStreamingConfig().STATE_TIMEOUT),
>>>> appConfig, accumulators),
>>>>                         Encoders.bean(ModelStateInfo.class),
>>>>                         Encoders.bean(ModelUpdate.class),
>>>>                         GroupStateTimeout.ProcessingTimeTimeout());
>>>>
>>>> StateUpdateTask contains the update method.
>>>>
>>>> On Thu, May 28, 2020 at 4:41 AM Something Something <
>>>> mailinglists19@gmail.com> wrote:
>>>>
>>>>> Yes, that's exactly how I am creating them.
>>>>>
>>>>> Question... Are you using 'Stateful Structured Streaming' in which
>>>>> you've something like this?
>>>>>
>>>>> .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
>>>>>         updateAcrossEvents
>>>>>       )
>>>>>
>>>>> And updating the Accumulator inside 'updateAcrossEvents'? We're experiencing this only under 'Stateful Structured Streaming'. In other streaming applications it works as expected.
>>>>>
>>>>>
>>>>>
>>>>> On Wed, May 27, 2020 at 9:01 AM Srinivas V <sr...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Yes, I am talking about Application specific Accumulators. Actually I
>>>>>> am getting the values printed in my driver log as well as sent to Grafana.
>>>>>> Not sure where and when I saw 0 before. My deploy mode is “client” on a
>>>>>> yarn cluster(not local Mac) where I submit from master node. It should work
>>>>>> the same for cluster mode as well.
>>>>>> Create accumulators like this:
>>>>>> AccumulatorV2 accumulator = sparkContext.longAccumulator(name);
>>>>>>
>>>>>>
>>>>>> On Tue, May 26, 2020 at 8:42 PM Something Something <
>>>>>> mailinglists19@gmail.com> wrote:
>>>>>>
>>>>>>> Hmm... how would they go to Graphana if they are not getting
>>>>>>> computed in your code? I am talking about the Application Specific
>>>>>>> Accumulators. The other standard counters such as
>>>>>>> 'event.progress.inputRowsPerSecond' are getting populated correctly!
>>>>>>>
>>>>>>> On Mon, May 25, 2020 at 8:39 PM Srinivas V <sr...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hello,
>>>>>>>> Even for me it comes as 0 when I print in OnQueryProgress. I use
>>>>>>>> LongAccumulator as well. Yes, it prints on my local but not on cluster.
>>>>>>>> But one consolation is that when I send metrics to Graphana, the
>>>>>>>> values are coming there.
>>>>>>>>
>>>>>>>> On Tue, May 26, 2020 at 3:10 AM Something Something <
>>>>>>>> mailinglists19@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> No this is not working even if I use LongAccumulator.
>>>>>>>>>
>>>>>>>>> On Fri, May 15, 2020 at 9:54 PM ZHANG Wei <zw...@msn.com> wrote:
>>>>>>>>>
>>>>>>>>>> There is a restriction in AccumulatorV2 API [1], the OUT type
>>>>>>>>>> should be atomic or thread safe. I'm wondering if the implementation for
>>>>>>>>>> `java.util.Map[T, Long]` can meet it or not. Is there any chance to replace
>>>>>>>>>> CollectionLongAccumulator by CollectionAccumulator[2] or LongAccumulator[3]
>>>>>>>>>> and test if the StreamingListener and other codes are able to work?
>>>>>>>>>>
>>>>>>>>>> ---
>>>>>>>>>> Cheers,
>>>>>>>>>> -z
>>>>>>>>>> [1]
>>>>>>>>>> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.AccumulatorV2
>>>>>>>>>> [2]
>>>>>>>>>> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.CollectionAccumulator
>>>>>>>>>> [3]
>>>>>>>>>> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.LongAccumulator
>>>>>>>>>>
>>>>>>>>>> ________________________________________
>>>>>>>>>> From: Something Something <ma...@gmail.com>
>>>>>>>>>> Sent: Saturday, May 16, 2020 0:38
>>>>>>>>>> To: spark-user
>>>>>>>>>> Subject: Re: Using Spark Accumulators with Structured Streaming
>>>>>>>>>>
>>>>>>>>>> Can someone from Spark Development team tell me if this
>>>>>>>>>> functionality is supported and tested? I've spent a lot of time on this but
>>>>>>>>>> can't get it to work. Just to add more context, we've our own Accumulator
>>>>>>>>>> class that extends from AccumulatorV2. In this class we keep track of one
>>>>>>>>>> or more accumulators. Here's the definition:
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> class CollectionLongAccumulator[T]
>>>>>>>>>>     extends AccumulatorV2[T, java.util.Map[T, Long]]
>>>>>>>>>>
>>>>>>>>>> When the job begins we register an instance of this class:
>>>>>>>>>>
>>>>>>>>>> spark.sparkContext.register(myAccumulator, "MyAccumulator")
>>>>>>>>>>
>>>>>>>>>> Is this working under Structured Streaming?
>>>>>>>>>>
>>>>>>>>>> I will keep looking for alternate approaches but any help would
>>>>>>>>>> be greatly appreciated. Thanks.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Thu, May 14, 2020 at 2:36 PM Something Something <
>>>>>>>>>> mailinglists19@gmail.com<ma...@gmail.com>> wrote:
>>>>>>>>>>
>>>>>>>>>> In my structured streaming job I am updating Spark Accumulators
>>>>>>>>>> in the updateAcrossEvents method but they are always 0 when I try to print
>>>>>>>>>> them in my StreamingListener. Here's the code:
>>>>>>>>>>
>>>>>>>>>> .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
>>>>>>>>>>         updateAcrossEvents
>>>>>>>>>>       )
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> The accumulators get incremented in 'updateAcrossEvents'. I've a
>>>>>>>>>> StreamingListener which writes values of the accumulators in
>>>>>>>>>> 'onQueryProgress' method but in this method the Accumulators are ALWAYS
>>>>>>>>>> ZERO!
>>>>>>>>>>
>>>>>>>>>> When I added log statements in the updateAcrossEvents, I could
>>>>>>>>>> see that these accumulators are getting incremented as expected.
>>>>>>>>>>
>>>>>>>>>> This only happens when I run in the 'Cluster' mode. In Local mode
>>>>>>>>>> it works fine which implies that the Accumulators are not getting
>>>>>>>>>> distributed correctly - or something like that!
>>>>>>>>>>
>>>>>>>>>> Note: I've seen quite a few answers on the Web that tell me to
>>>>>>>>>> perform an "Action". That's not a solution here. This is a 'Stateful
>>>>>>>>>> Structured Streaming' job. Yes, I am also 'registering' them in
>>>>>>>>>> SparkContext.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>

Re: Using Spark Accumulators with Structured Streaming

Posted by Something Something <ma...@gmail.com>.
Thanks! I will take a look at the link. Just one question, you seem to be
passing 'accumulators' in the constructor but where do you use it in the
StateUpdateTask class? I am still missing that connection. Sorry, if my
question is dumb. I must be missing something. Thanks for your help so far.
It's been useful.


On Fri, May 29, 2020 at 6:51 AM Srinivas V <sr...@gmail.com> wrote:

> Yes it is application specific class. This is how java Spark Functions
> work.
> You can refer to this code in the documentation:
> https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredSessionization.java
>
> public class StateUpdateTask implements MapGroupsWithStateFunction<String,
> InputEventModel, ModelStateInfo, ModelUpdate> {
>
>     @Override
>     public ModelUpdate call(String productId, Iterator<InputEventModel>
> eventsIterator, GroupState<ModelStateInfo> state) {
>     }
> }
>
> On Thu, May 28, 2020 at 10:59 PM Something Something <
> mailinglists19@gmail.com> wrote:
>
>> I am assuming StateUpdateTask is your application specific class. Does it
>> have 'updateState' method or something? I googled but couldn't find any
>> documentation about doing it this way. Can you please direct me to some
>> documentation. Thanks.
>>
>> On Thu, May 28, 2020 at 4:43 AM Srinivas V <sr...@gmail.com> wrote:
>>
>>> yes, I am using stateful structured streaming. Yes similar to what you
>>> do. This is in Java
>>> I do it this way:
>>>     Dataset<ModelUpdate> productUpdates = watermarkedDS
>>>                 .groupByKey(
>>>                         (MapFunction<InputEventModel, String>) event ->
>>> event.getId(), Encoders.STRING())
>>>                 .mapGroupsWithState(
>>>                         new
>>> StateUpdateTask(Long.parseLong(appConfig.getSparkStructuredStreamingConfig().STATE_TIMEOUT),
>>> appConfig, accumulators),
>>>                         Encoders.bean(ModelStateInfo.class),
>>>                         Encoders.bean(ModelUpdate.class),
>>>                         GroupStateTimeout.ProcessingTimeTimeout());
>>>
>>> StateUpdateTask contains the update method.
>>>
>>> On Thu, May 28, 2020 at 4:41 AM Something Something <
>>> mailinglists19@gmail.com> wrote:
>>>
>>>> Yes, that's exactly how I am creating them.
>>>>
>>>> Question... Are you using 'Stateful Structured Streaming' in which
>>>> you've something like this?
>>>>
>>>> .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
>>>>         updateAcrossEvents
>>>>       )
>>>>
>>>> And updating the Accumulator inside 'updateAcrossEvents'? We're experiencing this only under 'Stateful Structured Streaming'. In other streaming applications it works as expected.
>>>>
>>>>
>>>>
>>>> On Wed, May 27, 2020 at 9:01 AM Srinivas V <sr...@gmail.com> wrote:
>>>>
>>>>> Yes, I am talking about Application specific Accumulators. Actually I
>>>>> am getting the values printed in my driver log as well as sent to Grafana.
>>>>> Not sure where and when I saw 0 before. My deploy mode is “client” on a
>>>>> yarn cluster(not local Mac) where I submit from master node. It should work
>>>>> the same for cluster mode as well.
>>>>> Create accumulators like this:
>>>>> AccumulatorV2 accumulator = sparkContext.longAccumulator(name);
>>>>>
>>>>>
>>>>> On Tue, May 26, 2020 at 8:42 PM Something Something <
>>>>> mailinglists19@gmail.com> wrote:
>>>>>
>>>>>> Hmm... how would they go to Graphana if they are not getting computed
>>>>>> in your code? I am talking about the Application Specific Accumulators. The
>>>>>> other standard counters such as 'event.progress.inputRowsPerSecond' are
>>>>>> getting populated correctly!
>>>>>>
>>>>>> On Mon, May 25, 2020 at 8:39 PM Srinivas V <sr...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hello,
>>>>>>> Even for me it comes as 0 when I print in OnQueryProgress. I use
>>>>>>> LongAccumulator as well. Yes, it prints on my local but not on cluster.
>>>>>>> But one consolation is that when I send metrics to Graphana, the
>>>>>>> values are coming there.
>>>>>>>
>>>>>>> On Tue, May 26, 2020 at 3:10 AM Something Something <
>>>>>>> mailinglists19@gmail.com> wrote:
>>>>>>>
>>>>>>>> No this is not working even if I use LongAccumulator.
>>>>>>>>
>>>>>>>> On Fri, May 15, 2020 at 9:54 PM ZHANG Wei <zw...@msn.com> wrote:
>>>>>>>>
>>>>>>>>> There is a restriction in AccumulatorV2 API [1], the OUT type
>>>>>>>>> should be atomic or thread safe. I'm wondering if the implementation for
>>>>>>>>> `java.util.Map[T, Long]` can meet it or not. Is there any chance to replace
>>>>>>>>> CollectionLongAccumulator by CollectionAccumulator[2] or LongAccumulator[3]
>>>>>>>>> and test if the StreamingListener and other codes are able to work?
>>>>>>>>>
>>>>>>>>> ---
>>>>>>>>> Cheers,
>>>>>>>>> -z
>>>>>>>>> [1]
>>>>>>>>> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.AccumulatorV2
>>>>>>>>> [2]
>>>>>>>>> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.CollectionAccumulator
>>>>>>>>> [3]
>>>>>>>>> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.LongAccumulator
>>>>>>>>>
>>>>>>>>> ________________________________________
>>>>>>>>> From: Something Something <ma...@gmail.com>
>>>>>>>>> Sent: Saturday, May 16, 2020 0:38
>>>>>>>>> To: spark-user
>>>>>>>>> Subject: Re: Using Spark Accumulators with Structured Streaming
>>>>>>>>>
>>>>>>>>> Can someone from Spark Development team tell me if this
>>>>>>>>> functionality is supported and tested? I've spent a lot of time on this but
>>>>>>>>> can't get it to work. Just to add more context, we've our own Accumulator
>>>>>>>>> class that extends from AccumulatorV2. In this class we keep track of one
>>>>>>>>> or more accumulators. Here's the definition:
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> class CollectionLongAccumulator[T]
>>>>>>>>>     extends AccumulatorV2[T, java.util.Map[T, Long]]
>>>>>>>>>
>>>>>>>>> When the job begins we register an instance of this class:
>>>>>>>>>
>>>>>>>>> spark.sparkContext.register(myAccumulator, "MyAccumulator")
>>>>>>>>>
>>>>>>>>> Is this working under Structured Streaming?
>>>>>>>>>
>>>>>>>>> I will keep looking for alternate approaches but any help would be
>>>>>>>>> greatly appreciated. Thanks.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Thu, May 14, 2020 at 2:36 PM Something Something <
>>>>>>>>> mailinglists19@gmail.com<ma...@gmail.com>> wrote:
>>>>>>>>>
>>>>>>>>> In my structured streaming job I am updating Spark Accumulators in
>>>>>>>>> the updateAcrossEvents method but they are always 0 when I try to print
>>>>>>>>> them in my StreamingListener. Here's the code:
>>>>>>>>>
>>>>>>>>> .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
>>>>>>>>>         updateAcrossEvents
>>>>>>>>>       )
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> The accumulators get incremented in 'updateAcrossEvents'. I've a
>>>>>>>>> StreamingListener which writes values of the accumulators in
>>>>>>>>> 'onQueryProgress' method but in this method the Accumulators are ALWAYS
>>>>>>>>> ZERO!
>>>>>>>>>
>>>>>>>>> When I added log statements in the updateAcrossEvents, I could see
>>>>>>>>> that these accumulators are getting incremented as expected.
>>>>>>>>>
>>>>>>>>> This only happens when I run in the 'Cluster' mode. In Local mode
>>>>>>>>> it works fine which implies that the Accumulators are not getting
>>>>>>>>> distributed correctly - or something like that!
>>>>>>>>>
>>>>>>>>> Note: I've seen quite a few answers on the Web that tell me to
>>>>>>>>> perform an "Action". That's not a solution here. This is a 'Stateful
>>>>>>>>> Structured Streaming' job. Yes, I am also 'registering' them in
>>>>>>>>> SparkContext.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>

Re: Using Spark Accumulators with Structured Streaming

Posted by Srinivas V <sr...@gmail.com>.
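Yes, it is an application-specific class. This is how Spark's Java function
interfaces work. You can refer to this code in the Spark examples:
https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredSessionization.java

import java.util.Iterator;
import org.apache.spark.api.java.function.MapGroupsWithStateFunction;
import org.apache.spark.sql.streaming.GroupState;

public class StateUpdateTask implements
        MapGroupsWithStateFunction<String, InputEventModel, ModelStateInfo, ModelUpdate> {

    @Override
    public ModelUpdate call(String productId, Iterator<InputEventModel> eventsIterator,
            GroupState<ModelStateInfo> state) {
        // Body omitted in this post: update the state and the accumulators here,
        // then return a ModelUpdate.
    }
}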
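For readers who have not seen this pattern, the shape of such a class can be sketched without any Spark dependency: a serializable function object that receives its counter through the constructor and updates it inside call(). The names FunctionObjectSketch, GroupFunction, Counter and CountingTask below are hypothetical stand-ins, not Spark types; in a real job the interface would be MapGroupsWithStateFunction and the counter a LongAccumulator.

```java
import java.io.Serializable;
import java.util.Arrays;
import java.util.Iterator;

public class FunctionObjectSketch {

    /** Hypothetical stand-in for a Spark Java function interface. */
    interface GroupFunction<K, V, R> extends Serializable {
        R call(K key, Iterator<V> values);
    }

    /** Hypothetical stand-in for a LongAccumulator. */
    static class Counter implements Serializable {
        private long value;

        void add(long n) { value += n; }

        long value() { return value; }
    }

    /** The counter is a constructor argument, so it is a field of the function object. */
    static class CountingTask implements GroupFunction<String, Integer, Integer> {
        private final Counter events;

        CountingTask(Counter events) { this.events = events; }

        @Override
        public Integer call(String key, Iterator<Integer> values) {
            int sum = 0;
            while (values.hasNext()) {
                sum += values.next();
                events.add(1); // one increment per event seen for this key
            }
            return sum;
        }
    }

    public static void main(String[] args) {
        Counter events = new Counter();
        CountingTask task = new CountingTask(events);
        int sum = task.call("product-1", Arrays.asList(1, 2, 3).iterator());
        System.out.println(sum + " " + events.value()); // prints "6 3"
    }
}
```

Because the counter is an instance field rather than a global, it is part of whatever gets serialized along with the function object.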

On Thu, May 28, 2020 at 10:59 PM Something Something <
mailinglists19@gmail.com> wrote:

> I am assuming StateUpdateTask is your application specific class. Does it
> have 'updateState' method or something? I googled but couldn't find any
> documentation about doing it this way. Can you please direct me to some
> documentation. Thanks.

Re: Using Spark Accumulators with Structured Streaming

Posted by Something Something <ma...@gmail.com>.
I am assuming StateUpdateTask is your application-specific class. Does it
have an 'updateState' method or something similar? I googled but couldn't
find any documentation about doing it this way. Can you please direct me to
some documentation? Thanks.

On Thu, May 28, 2020 at 4:43 AM Srinivas V <sr...@gmail.com> wrote:

> yes, I am using stateful structured streaming. Yes similar to what you do.
> This is in Java
> I do it this way:
>     Dataset<ModelUpdate> productUpdates = watermarkedDS
>                 .groupByKey(
>                         (MapFunction<InputEventModel, String>) event ->
> event.getId(), Encoders.STRING())
>                 .mapGroupsWithState(
>                         new
> StateUpdateTask(Long.parseLong(appConfig.getSparkStructuredStreamingConfig().STATE_TIMEOUT),
> appConfig, accumulators),
>                         Encoders.bean(ModelStateInfo.class),
>                         Encoders.bean(ModelUpdate.class),
>                         GroupStateTimeout.ProcessingTimeTimeout());
>
> StateUpdateTask contains the update method.

Re: Using Spark Accumulators with Structured Streaming

Posted by Srinivas V <sr...@gmail.com>.
Yes, I am using stateful structured streaming, similar to what you do, but
in Java. I do it this way:

    Dataset<ModelUpdate> productUpdates = watermarkedDS
            .groupByKey(
                    (MapFunction<InputEventModel, String>) event -> event.getId(),
                    Encoders.STRING())
            .mapGroupsWithState(
                    new StateUpdateTask(
                            Long.parseLong(appConfig.getSparkStructuredStreamingConfig().STATE_TIMEOUT),
                            appConfig, accumulators),
                    Encoders.bean(ModelStateInfo.class),
                    Encoders.bean(ModelUpdate.class),
                    GroupStateTimeout.ProcessingTimeTimeout());

StateUpdateTask contains the update method.

On Thu, May 28, 2020 at 4:41 AM Something Something <
mailinglists19@gmail.com> wrote:

> Yes, that's exactly how I am creating them.
>
> Question... Are you using 'Stateful Structured Streaming' in which you've
> something like this?
>
> .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
>         updateAcrossEvents
>       )
>
> And updating the Accumulator inside 'updateAcrossEvents'? We're experiencing this only under 'Stateful Structured Streaming'. In other streaming applications it works as expected.

Re: Using Spark Accumulators with Structured Streaming

Posted by Something Something <ma...@gmail.com>.
Yes, that's exactly how I am creating them.

Question... Are you using 'Stateful Structured Streaming', in which you have
something like this?

.mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
        updateAcrossEvents
      )

And are you updating the Accumulator inside 'updateAcrossEvents'? We're
experiencing this only under 'Stateful Structured Streaming'. In other
streaming applications it works as expected.



On Wed, May 27, 2020 at 9:01 AM Srinivas V <sr...@gmail.com> wrote:

> Yes, I am talking about Application specific Accumulators. Actually I am
> getting the values printed in my driver log as well as sent to Grafana. Not
> sure where and when I saw 0 before. My deploy mode is “client” on a yarn
> cluster(not local Mac) where I submit from master node. It should work the
> same for cluster mode as well.
> Create accumulators like this:
> AccumulatorV2 accumulator = sparkContext.longAccumulator(name);

Re: Using Spark Accumulators with Structured Streaming

Posted by Srinivas V <sr...@gmail.com>.
Yes, I am talking about application-specific accumulators. Actually, I am
getting the values printed in my driver log as well as sent to Grafana. I am
not sure where and when I saw 0 before. My deploy mode is “client” on a YARN
cluster (not a local Mac), where I submit from the master node. It should
work the same for cluster mode as well.
Create accumulators like this:
LongAccumulator accumulator = sparkContext.longAccumulator(name);
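As background on why local and cluster runs can behave differently, here is a simplified, dependency-free model of the accumulator contract: a task works on its own copy of the counter, and the driver-side object only changes once that copy is merged back. This is a sketch, not Spark's implementation; AccumulatorModel, LongCounter and ship() are hypothetical names, and the Java serialization round trip merely stands in for shipping an object to another JVM.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class AccumulatorModel {

    /** Hypothetical counter with the add/merge/value shape of an accumulator. */
    static class LongCounter implements Serializable {
        private long sum;

        void add(long v) { sum += v; }

        void merge(LongCounter other) { sum += other.sum; }

        long value() { return sum; }
    }

    /** Serializes and deserializes the counter, mimicking a trip to another JVM. */
    static LongCounter ship(LongCounter original) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(original);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (LongCounter) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("serialization round trip failed", e);
        }
    }

    public static void main(String[] args) {
        LongCounter driverSide = new LongCounter();
        LongCounter executorSide = ship(driverSide); // an independent copy

        executorSide.add(5);
        System.out.println(driverSide.value()); // prints 0: the copy was updated, not the original

        driverSide.merge(executorSide);
        System.out.println(driverSide.value()); // prints 5: visible only after the merge
    }
}
```

In a single JVM without the round trip, both names would refer to the same object and the first print would already show 5, which is one way a problem can stay hidden in local mode.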


On Tue, May 26, 2020 at 8:42 PM Something Something <
mailinglists19@gmail.com> wrote:

> Hmm... how would they get to Grafana if they are not getting computed in
> your code? I am talking about the application-specific accumulators. The
> other standard counters such as 'event.progress.inputRowsPerSecond' are
> getting populated correctly!

Re: Using Spark Accumulators with Structured Streaming

Posted by Something Something <ma...@gmail.com>.
Hmm... how would they get to Grafana if they are not getting computed in
your code? I am talking about the application-specific accumulators. The
other standard counters such as 'event.progress.inputRowsPerSecond' are
getting populated correctly!

On Mon, May 25, 2020 at 8:39 PM Srinivas V <sr...@gmail.com> wrote:

> Hello,
> Even for me it comes as 0 when I print in onQueryProgress. I use
> LongAccumulator as well. Yes, it prints on my local machine but not on the
> cluster. But one consolation is that when I send metrics to Grafana, the
> values are coming through there.
>
> On Tue, May 26, 2020 at 3:10 AM Something Something <
> mailinglists19@gmail.com> wrote:
>
>> No this is not working even if I use LongAccumulator.
>>
>> On Fri, May 15, 2020 at 9:54 PM ZHANG Wei <zw...@msn.com> wrote:
>>
>>> There is a restriction in AccumulatorV2 API [1], the OUT type should be
>>> atomic or thread safe. I'm wondering if the implementation for
>>> `java.util.Map[T, Long]` can meet it or not. Is there any chance to replace
>>> CollectionLongAccumulator by CollectionAccumulator[2] or LongAccumulator[3]
>>> and test if the StreamingListener and other codes are able to work?
>>>
>>> ---
>>> Cheers,
>>> -z
>>> [1]
>>> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.AccumulatorV2
>>> [2]
>>> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.CollectionAccumulator
>>> [3]
>>> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.LongAccumulator
>>>
>>> ________________________________________
>>> From: Something Something <ma...@gmail.com>
>>> Sent: Saturday, May 16, 2020 0:38
>>> To: spark-user
>>> Subject: Re: Using Spark Accumulators with Structured Streaming
>>>
>>> Can someone from Spark Development team tell me if this functionality is
>>> supported and tested? I've spent a lot of time on this but can't get it to
>>> work. Just to add more context, we've our own Accumulator class that
>>> extends from AccumulatorV2. In this class we keep track of one or more
>>> accumulators. Here's the definition:
>>>
>>>
>>> class CollectionLongAccumulator[T]
>>>     extends AccumulatorV2[T, java.util.Map[T, Long]]
>>>
>>> When the job begins we register an instance of this class:
>>>
>>> spark.sparkContext.register(myAccumulator, "MyAccumulator")
>>>
>>> Is this working under Structured Streaming?
>>>
>>> I will keep looking for alternate approaches but any help would be
>>> greatly appreciated. Thanks.
>>>
>>>
>>>
>>> On Thu, May 14, 2020 at 2:36 PM Something Something <
>>> mailinglists19@gmail.com> wrote:
>>>
>>> In my structured streaming job I am updating Spark Accumulators in the
>>> updateAcrossEvents method but they are always 0 when I try to print them in
>>> my StreamingListener. Here's the code:
>>>
>>> .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
>>>         updateAcrossEvents
>>>       )
>>>
>>>
>>> The accumulators get incremented in 'updateAcrossEvents'. I've a
>>> StreamingListener which writes values of the accumulators in
>>> 'onQueryProgress' method but in this method the Accumulators are ALWAYS
>>> ZERO!
>>>
>>> When I added log statements in the updateAcrossEvents, I could see that
>>> these accumulators are getting incremented as expected.
>>>
>>> This only happens when I run in the 'Cluster' mode. In Local mode it
>>> works fine which implies that the Accumulators are not getting distributed
>>> correctly - or something like that!
>>>
>>> Note: I've seen quite a few answers on the Web that tell me to perform
>>> an "Action". That's not a solution here. This is a 'Stateful Structured
>>> Streaming' job. Yes, I am also 'registering' them in SparkContext.
>>>
>>>
>>>
>>>

Re: Using Spark Accumulators with Structured Streaming

Posted by Srinivas V <sr...@gmail.com>.
Hello,
I also get 0 when I print the value in onQueryProgress, and I use
LongAccumulator as well. It prints correctly locally but not on the cluster.
One consolation is that when I send the metrics to Grafana, the values do
show up there.

On Tue, May 26, 2020 at 3:10 AM Something Something <
mailinglists19@gmail.com> wrote:

> No, this does not work even when I use LongAccumulator.
>
> On Fri, May 15, 2020 at 9:54 PM ZHANG Wei <zw...@msn.com> wrote:
>
>> There is a restriction in AccumulatorV2 API [1], the OUT type should be
>> atomic or thread safe. I'm wondering if the implementation for
>> `java.util.Map[T, Long]` can meet it or not. Is there any chance to replace
>> CollectionLongAccumulator by CollectionAccumulator[2] or LongAccumulator[3]
>> and test if the StreamingListener and other codes are able to work?
>>
>> ---
>> Cheers,
>> -z
>> [1]
>> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.AccumulatorV2
>> [2]
>> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.CollectionAccumulator
>> [3]
>> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.LongAccumulator
>>
>> ________________________________________
>> From: Something Something <ma...@gmail.com>
>> Sent: Saturday, May 16, 2020 0:38
>> To: spark-user
>> Subject: Re: Using Spark Accumulators with Structured Streaming
>>
>> Can someone from Spark Development team tell me if this functionality is
>> supported and tested? I've spent a lot of time on this but can't get it to
>> work. Just to add more context, we've our own Accumulator class that
>> extends from AccumulatorV2. In this class we keep track of one or more
>> accumulators. Here's the definition:
>>
>>
>> class CollectionLongAccumulator[T]
>>     extends AccumulatorV2[T, java.util.Map[T, Long]]
>>
>> When the job begins we register an instance of this class:
>>
>> spark.sparkContext.register(myAccumulator, "MyAccumulator")
>>
>> Is this working under Structured Streaming?
>>
>> I will keep looking for alternate approaches but any help would be
>> greatly appreciated. Thanks.
>>
>>
>>
>> On Thu, May 14, 2020 at 2:36 PM Something Something <
>> mailinglists19@gmail.com> wrote:
>>
>> In my structured streaming job I am updating Spark Accumulators in the
>> updateAcrossEvents method but they are always 0 when I try to print them in
>> my StreamingListener. Here's the code:
>>
>> .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
>>         updateAcrossEvents
>>       )
>>
>>
>> The accumulators get incremented in 'updateAcrossEvents'. I've a
>> StreamingListener which writes values of the accumulators in
>> 'onQueryProgress' method but in this method the Accumulators are ALWAYS
>> ZERO!
>>
>> When I added log statements in the updateAcrossEvents, I could see that
>> these accumulators are getting incremented as expected.
>>
>> This only happens when I run in the 'Cluster' mode. In Local mode it
>> works fine which implies that the Accumulators are not getting distributed
>> correctly - or something like that!
>>
>> Note: I've seen quite a few answers on the Web that tell me to perform an
>> "Action". That's not a solution here. This is a 'Stateful Structured
>> Streaming' job. Yes, I am also 'registering' them in SparkContext.
>>
>>
>>
>>

Re: Using Spark Accumulators with Structured Streaming

Posted by Something Something <ma...@gmail.com>.
No, this does not work even when I use LongAccumulator.

On Fri, May 15, 2020 at 9:54 PM ZHANG Wei <zw...@msn.com> wrote:

> There is a restriction in AccumulatorV2 API [1], the OUT type should be
> atomic or thread safe. I'm wondering if the implementation for
> `java.util.Map[T, Long]` can meet it or not. Is there any chance to replace
> CollectionLongAccumulator by CollectionAccumulator[2] or LongAccumulator[3]
> and test if the StreamingListener and other codes are able to work?
>
> ---
> Cheers,
> -z
> [1]
> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.AccumulatorV2
> [2]
> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.CollectionAccumulator
> [3]
> http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.LongAccumulator
>
> ________________________________________
> From: Something Something <ma...@gmail.com>
> Sent: Saturday, May 16, 2020 0:38
> To: spark-user
> Subject: Re: Using Spark Accumulators with Structured Streaming
>
> Can someone from Spark Development team tell me if this functionality is
> supported and tested? I've spent a lot of time on this but can't get it to
> work. Just to add more context, we've our own Accumulator class that
> extends from AccumulatorV2. In this class we keep track of one or more
> accumulators. Here's the definition:
>
>
> class CollectionLongAccumulator[T]
>     extends AccumulatorV2[T, java.util.Map[T, Long]]
>
> When the job begins we register an instance of this class:
>
> spark.sparkContext.register(myAccumulator, "MyAccumulator")
>
> Is this working under Structured Streaming?
>
> I will keep looking for alternate approaches but any help would be greatly
> appreciated. Thanks.
>
>
>
> On Thu, May 14, 2020 at 2:36 PM Something Something <
> mailinglists19@gmail.com> wrote:
>
> In my structured streaming job I am updating Spark Accumulators in the
> updateAcrossEvents method but they are always 0 when I try to print them in
> my StreamingListener. Here's the code:
>
> .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
>         updateAcrossEvents
>       )
>
>
> The accumulators get incremented in 'updateAcrossEvents'. I've a
> StreamingListener which writes values of the accumulators in
> 'onQueryProgress' method but in this method the Accumulators are ALWAYS
> ZERO!
>
> When I added log statements in the updateAcrossEvents, I could see that
> these accumulators are getting incremented as expected.
>
> This only happens when I run in the 'Cluster' mode. In Local mode it works
> fine which implies that the Accumulators are not getting distributed
> correctly - or something like that!
>
> Note: I've seen quite a few answers on the Web that tell me to perform an
> "Action". That's not a solution here. This is a 'Stateful Structured
> Streaming' job. Yes, I am also 'registering' them in SparkContext.
>
>
>
>

Re: Using Spark Accumulators with Structured Streaming

Posted by ZHANG Wei <zw...@msn.com>.
The AccumulatorV2 API [1] has a restriction: the OUT type should be atomic or thread-safe. I am not sure whether the `java.util.Map[T, Long]` implementation meets it. Could you replace CollectionLongAccumulator with CollectionAccumulator [2] or LongAccumulator [3] and test whether the StreamingListener and the rest of the code work?
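
To illustrate the thread-safety point, here is a minimal sketch of what a synchronized variant of such a map-backed accumulator might look like. The body of the original CollectionLongAccumulator is not shown in this thread, so everything below (the `counts` field, the per-key increment in `add`) is an assumption made for illustration. It only shows one way to satisfy the restriction in [1]; it is not established here that this changes what the listener sees.

```scala
import java.util.{Collections, HashMap => JHashMap, Map => JMap}
import org.apache.spark.util.AccumulatorV2

// Hypothetical reimplementation: counts how many times each key was added.
// Every access to the backing map is guarded by this.synchronized, and
// value returns a defensive, read-only copy.
class CollectionLongAccumulator[T] extends AccumulatorV2[T, JMap[T, Long]] {

  private val counts = new JHashMap[T, Long]()

  override def isZero: Boolean = this.synchronized(counts.isEmpty)

  override def copy(): CollectionLongAccumulator[T] = {
    val acc = new CollectionLongAccumulator[T]
    this.synchronized(acc.counts.putAll(counts))
    acc
  }

  override def reset(): Unit = this.synchronized(counts.clear())

  // Assumption: adding a key increments its counter by one.
  override def add(key: T): Unit = this.synchronized {
    counts.put(key, counts.getOrDefault(key, 0L) + 1L)
  }

  override def merge(other: AccumulatorV2[T, JMap[T, Long]]): Unit = {
    // Take the other side's snapshot first, then lock only this instance.
    val snapshot = other.value
    this.synchronized {
      val it = snapshot.entrySet().iterator()
      while (it.hasNext) {
        val e = it.next()
        counts.put(e.getKey, counts.getOrDefault(e.getKey, 0L) + e.getValue)
      }
    }
  }

  override def value: JMap[T, Long] = this.synchronized {
    Collections.unmodifiableMap(new JHashMap[T, Long](counts))
  }
}
```

Coarse locking on the instance is the same general approach the built-in CollectionAccumulator takes; a ConcurrentHashMap would be another option.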

---
Cheers,
-z
[1] http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.AccumulatorV2
[2] http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.CollectionAccumulator
[3] http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.util.LongAccumulator

________________________________________
From: Something Something <ma...@gmail.com>
Sent: Saturday, May 16, 2020 0:38
To: spark-user
Subject: Re: Using Spark Accumulators with Structured Streaming

Can someone from Spark Development team tell me if this functionality is supported and tested? I've spent a lot of time on this but can't get it to work. Just to add more context, we've our own Accumulator class that extends from AccumulatorV2. In this class we keep track of one or more accumulators. Here's the definition:


class CollectionLongAccumulator[T]
    extends AccumulatorV2[T, java.util.Map[T, Long]]

When the job begins we register an instance of this class:

spark.sparkContext.register(myAccumulator, "MyAccumulator")

Is this working under Structured Streaming?

I will keep looking for alternate approaches but any help would be greatly appreciated. Thanks.



On Thu, May 14, 2020 at 2:36 PM Something Something <ma...@gmail.com> wrote:

In my structured streaming job I am updating Spark Accumulators in the updateAcrossEvents method but they are always 0 when I try to print them in my StreamingListener. Here's the code:

.mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
        updateAcrossEvents
      )


The accumulators get incremented in 'updateAcrossEvents'. I've a StreamingListener which writes values of the accumulators in 'onQueryProgress' method but in this method the Accumulators are ALWAYS ZERO!

When I added log statements in the updateAcrossEvents, I could see that these accumulators are getting incremented as expected.

This only happens when I run in the 'Cluster' mode. In Local mode it works fine which implies that the Accumulators are not getting distributed correctly - or something like that!

Note: I've seen quite a few answers on the Web that tell me to perform an "Action". That's not a solution here. This is a 'Stateful Structured Streaming' job. Yes, I am also 'registering' them in SparkContext.






Re: Using Spark Accumulators with Structured Streaming

Posted by Something Something <ma...@gmail.com>.
Can someone from the Spark development team tell me whether this
functionality is supported and tested? I've spent a lot of time on this but
can't get it to work. To add more context, we have our own accumulator
class that extends AccumulatorV2. In this class we keep track of one or more
accumulators. Here's the definition:

class CollectionLongAccumulator[T]
    extends AccumulatorV2[T, java.util.Map[T, Long]]

When the job begins we register an instance of this class:

spark.sparkContext.register(myAccumulator, "MyAccumulator")
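
For reference, a minimal sketch of how the registration and the listener might be wired together, using the built-in LongAccumulator instead of our custom class. It assumes the listener is a StreamingQueryListener (the type that defines onQueryProgress); the names `AccumulatorListenerSketch`, `AccumulatorListener` and `eventCount` are made up for this example, and the streaming query itself is left out. This shows the setup only and is not a confirmed fix.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._
import org.apache.spark.util.LongAccumulator

object AccumulatorListenerSketch {

  // Hypothetical listener: prints the driver-side value of the accumulator
  // each time a micro-batch reports progress.
  class AccumulatorListener(acc: LongAccumulator) extends StreamingQueryListener {
    override def onQueryStarted(event: QueryStartedEvent): Unit = ()

    override def onQueryProgress(event: QueryProgressEvent): Unit =
      println(s"batch=${event.progress.batchId} " +
        s"${acc.name.getOrElse("unnamed")}=${acc.value}")

    override def onQueryTerminated(event: QueryTerminatedEvent): Unit = ()
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("accumulator-listener-sketch")
      .getOrCreate()

    // longAccumulator creates the accumulator and registers it in one step.
    val eventCount = spark.sparkContext.longAccumulator("eventCount")

    // The listener holds the same driver-side instance that was registered.
    spark.streams.addListener(new AccumulatorListener(eventCount))

    // The streaming query would be built here. The state-update function
    // must receive this same eventCount instance so that executor-side
    // updates are merged back into it on the driver.
  }
}
```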

Does this work under Structured Streaming?

I will keep looking for alternate approaches but any help would be
greatly appreciated. Thanks.




On Thu, May 14, 2020 at 2:36 PM Something Something <
mailinglists19@gmail.com> wrote:

> In my structured streaming job I am updating Spark Accumulators in the
> updateAcrossEvents method but they are always 0 when I try to print them in
> my StreamingListener. Here's the code:
>
> .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
>         updateAcrossEvents
>       )
>
> The accumulators get incremented in 'updateAcrossEvents'. I've a
> StreamingListener which writes values of the accumulators in
> 'onQueryProgress' method but in this method the Accumulators are ALWAYS
> ZERO!
>
> When I added log statements in the updateAcrossEvents, I could see that
> these accumulators are getting incremented as expected.
>
> This only happens when I run in the 'Cluster' mode. In Local mode it works
> fine which implies that the Accumulators are not getting distributed
> correctly - or something like that!
>
> Note: I've seen quite a few answers on the Web that tell me to perform an
> "Action". That's not a solution here. This is a 'Stateful Structured
> Streaming' job. Yes, I am also 'registering' them in SparkContext.
>
>
>
>