You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Fabian Hueske <fh...@gmail.com> on 2017/12/04 10:51:17 UTC

Re: Maintain heavy hitters in Flink application

Hi Max,

state (keyed or operator state) is always local to the task.
By default it is not accessible (read or write) from the outside or other
tasks of the application.

You can expose keyed state as queryable state [1] to perform key look ups.
This feature was designed for external application to access the state of
Flink applications.
However, that should also work from inside the same job.

Best, Fabian

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/queryable_state.html

2017-11-30 6:01 GMT+01:00 m@xi <ma...@gmail.com>:

> Hello everyone!
>
> I want to implement a streaming algorithm like Misa-Gries or Space Saving
> in
> Flink. The goal is to maintain the heavy hitters for my (possibly
> unbounded)
> input streams throughout all the time my app runs. More precisely, I want
> to
> have a non-stop running task that runs the Space Saving algorithm and
> updates a data structure that should be accessible by other tasks like map,
> flatmap of my Flink application at ad-hoc times. Although I am not so sure
> of how I can achieve the aforementioned goal.
>
> First is it possible to have a structure in my main function that is
> updated
> by a task at all times and to be also accesible by others transformations
> at
> ad-hoc times??
>
> Any ideas on how I can implement the above are more than welcome.
>
> Thanks in advance.
>
> Best,
> Max
>
>
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.
> n4.nabble.com/
>

Re: Maintain heavy hitters in Flink application

Posted by "m@xi" <ma...@gmail.com>.
Hi Timo,

Thanks a lot for the advice. I am working on it.

Cheers,
Max



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Maintain heavy hitters in Flink application

Posted by Timo Walther <tw...@apache.org>.
Hi,

I think it would be easier to implement a custom key selector and 
introduce some artifical key that spreads the load more evenly. This 
would also allow you to use keyed state. You could use a ProcessFunction 
and set timers to define the "every now and then". Keyed state would 
also ease the state redistribution in case the parallelism changes. 
Maybe could could also do the summary merge in some downstream 
operators. Maybe this talk [1] gives you some additional inspiration.

Regards,
Timo

[1] https://www.youtube.com/watch?v=Do7C4UJyWCM



Am 2/1/18 um 9:31 AM schrieb m@xi:
> Anyone, someone, somebody?
>
>
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/



Re: Maintain heavy hitters in Flink application

Posted by "m@xi" <ma...@gmail.com>.
Anyone, someone, somebody? 



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Maintain heavy hitters in Flink application

Posted by "m@xi" <ma...@gmail.com>.
Hello everyone and Happy New Year!

Regarding the Heavy Hitter tracking...I wanna do it in a distributed manner. 

Thus,
1 -- Round Robin the input stream to a number of parallel map instances (say
p = env.parallelism)
2 -- Each one of the p mappers maintains approximately the HH of its
corresponding portion of the input, utilizing an algorithm like Space
Saving, Misha-Gries etc etc.
3 -- Every now and then I would like to concatenate the state of all the p
mappers into one, thus producing the global Space Saving summary for the
entire input stream.
4 -- Due to the fact that I wanna balance out things given to the p mappers
in the beginning, I wanna use rebalance(), i.e. round robin algorithm -->
Thus, its is not possible to use Keyed State.
5 -- So, I am going to use ListCheckpointed state as described in [1].
6 -- When the "every now and then" happens, I wanna merge the partial
summaries and I will emit them through a side output, as described in [2].

The question is the following: [1] shows an example of state-redistribution.
So...can I change the parallelism of the p instance parallel .map() from
within the operator, and merge the summaries for the HH there just before
emitting them to the side output???

Essentially, how should I implement the 6th bullet is my question.

Any advice, on it or on the general guideline implementation for getting the
aforementioned thing done, is more than welcome.

Cheer,
Max

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.4/api/java/
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/side_output.html



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Maintain heavy hitters in Flink application

Posted by "m@xi" <ma...@gmail.com>.
Kostas and Fabian,

Thanks for the advice. I guess I will find a workaround to do the state
redistribution.

I also read about side outputs in this thread, which might be also an option
that I will consider.

http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Share-state-across-operators-td17031.html

Best,
Makis



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Maintain heavy hitters in Flink application

Posted by Kostas Kloudas <k....@data-artisans.com>.
Hi Max,

You are right that Queryable State is not designed to be used as a means for a job to query its own state.
In fact, given that you do not know the jobId of your job from within the job itself, I do not think you can use 
queryable state in your scenario.

What you can do is to have a flatMap computing the hot keys or heavy hitters, and emit as main output the 
elements themselves for further processing, and as a side output the computed statistics. The side output 
is a data stream itself so you can store it in an external storage system (e.g. a KV store) and use AsyncIO to 
query that system downstream. This will solve the problem of having access to the state from all tasks. 

This is a simple solution but I am not sure about the performance implications. 
You can try it to see if it actually fits your needs.

Thanks, 
Kostas


> On Dec 5, 2017, at 10:32 AM, Fabian Hueske <fh...@gmail.com> wrote:
> 
> Hi,
> 
> I haven't done that before either. The query API will change with the next version (Flink 1.4.0) which is currently being prepared for releasing.
> Kostas (in CC) might be able to help you.
> 
> Best, Fabian
> 
> 2017-12-05 9:52 GMT+01:00 m@xi <makisntpap@gmail.com <ma...@gmail.com>>:
> Hi Fabian,
> 
> Thanks for your answer. Initially, I have excluded Queryable State as an
> option as it explicitly mentioned that it is used for querying state outside
> flink.
> 
> Now that I am reading the documentation I am not sure how I may achieve
> that. I have to set ports and addresses which I am not sure I should since I
> am reading the queryable state from inside the same job.
> 
> Can you or someone elaborate further how can I read the queryable state of a
> specific task from another task (e.g. map).
> 
> Best,
> Max
> 
> 
> 
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/>
> 


Re: Maintain heavy hitters in Flink application

Posted by Fabian Hueske <fh...@gmail.com>.
Hi,

I haven't done that before either. The query API will change with the next
version (Flink 1.4.0) which is currently being prepared for releasing.
Kostas (in CC) might be able to help you.

Best, Fabian

2017-12-05 9:52 GMT+01:00 m@xi <ma...@gmail.com>:

> Hi Fabian,
>
> Thanks for your answer. Initially, I have excluded Queryable State as an
> option as it explicitly mentioned that it is used for querying state
> outside
> flink.
>
> Now that I am reading the documentation I am not sure how I may achieve
> that. I have to set ports and addresses which I am not sure I should since
> I
> am reading the queryable state from inside the same job.
>
> Can you or someone elaborate further how can I read the queryable state of
> a
> specific task from another task (e.g. map).
>
> Best,
> Max
>
>
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.
> n4.nabble.com/
>

Re: Maintain heavy hitters in Flink application

Posted by "m@xi" <ma...@gmail.com>.
Hi Fabian,

Thanks for your answer. Initially, I have excluded Queryable State as an
option as it explicitly mentioned that it is used for querying state outside
flink.

Now that I am reading the documentation I am not sure how I may achieve
that. I have to set ports and addresses which I am not sure I should since I
am reading the queryable state from inside the same job.

Can you or someone elaborate further how can I read the queryable state of a
specific task from another task (e.g. map).

Best,
Max



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/