Posted to user@flink.apache.org by Robert Metzger <rm...@apache.org> on 2016/12/05 11:30:44 UTC

Re: CEP issue

Hi Kieran,

Which state backend are you using for your CEP job? Using RocksDB as the state
backend might fix the issue, since it keeps state on local disk instead of on
the JVM heap.
Also, how many keys are in your stream?
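
On the single-stream question in the quoted message below, here is a rough illustration, not a Flink CEP feature and not something confirmed in this thread. The idea is to keep the per-metric thresholds as data and look them up by account, so that one keyed stream can serve all checks. Everything in it (`RuleTable`, `ThresholdRule`, `matches`) is hypothetical plain Java with no Flink dependency. It only covers single-event threshold checks like the `where` clause in the quoted code, not multi-event sequences.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: rules are plain data, grouped by account id.
public class RuleTable {

    /** Hypothetical rule: fires when one named value of an account exceeds a threshold. */
    public static final class ThresholdRule {
        final String checkName;
        final String accountId;
        final String valueKey;
        final double threshold;

        public ThresholdRule(String checkName, String accountId,
                             String valueKey, double threshold) {
            this.checkName = checkName;
            this.accountId = accountId;
            this.valueKey = valueKey;
            this.threshold = threshold;
        }
    }

    private final Map<String, List<ThresholdRule>> rulesByAccount = new HashMap<>();

    /** Registers a rule under its account id. */
    public void add(ThresholdRule rule) {
        rulesByAccount.computeIfAbsent(rule.accountId, k -> new ArrayList<>()).add(rule);
    }

    /** Returns the names of all checks for this account that the given values exceed. */
    public List<String> matches(String accountId, Map<String, Double> values) {
        List<String> fired = new ArrayList<>();
        List<ThresholdRule> rules =
                rulesByAccount.getOrDefault(accountId, Collections.<ThresholdRule>emptyList());
        for (ThresholdRule rule : rules) {
            Double value = values.get(rule.valueKey);
            if (value != null && value > rule.threshold) {
                fired.add(rule.checkName);
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        RuleTable table = new RuleTable();
        table.add(new ThresholdRule("cpu-max", "acme", "max", 50.0));

        Map<String, Double> values = new HashMap<>();
        values.put("max", 72.0);

        // Prints the checks that fired for account "acme".
        System.out.println(table.matches("acme", values));
    }
}
```

In a Flink job, a lookup like `matches` would presumably sit inside a single function applied to the stream keyed by `account_id`, so adding a check means adding a rule rather than adding another PatternStream.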


On Tue, Nov 29, 2016 at 3:18 PM, kieran . <ki...@hotmail.com> wrote:

> Hello,
>
> I am currently building a multi-tenant monitoring application and
> exploring the effectiveness of different Complex Event Processors (CEP) and
> whether or not this would be a potential solution for what I want to
> achieve. I have created a small test application which utilises Flink and
> its CEP but I have come across some issues when dealing with a large number
> of metrics to monitor when using patterns/pattern streams. Flink seems to
> operate as expected with one or several patterns, each consuming its own
> PatternStream, but as soon as more are introduced the memory usage of Flink
> seems to rise rather quickly and eventually it throws an OutOfMemoryError. My
> initial idea was to create one pattern/pattern stream for each metric that
> I need to monitor, but there could be many thousands of these.
>
> I create the PatternStream per Pattern like this to monitor a metric:
>
>     Pattern<MetricData, ?> pattern = Pattern.<MetricData> begin( patternName )
>             .subtype( MetricData.class )
>             .where(
>                 (evt -> evt.getValues().get( "max" ).longValue() > 50.0
>                         && evt.account_id.equals( accountName )) );
>
>     check.withPattern( pattern )
>             .withTimePeriod( Integer.valueOf( 1 ) )
>             .withCooldown( Integer.valueOf( 1 ) )
>             .withName( checkName )
>             .withAlertStatus( AlertStatus.OK )
>             .setPatternStream(CEP.pattern(messageStream.keyBy("account_id"), pattern));
>
> To trigger these patterns, I use
>
>     PatternSelectFunction<MetricData, MetricWarning> psf =
>             new PatternSelectFunction<MetricData, MetricWarning>()
>     {
>         @Override
>         public MetricWarning select( Map<String, MetricData> map ) throws Exception
>         {
>             return new MetricWarning(map.get(patternKey), name, accountId);
>         }
>     };
>
>     try
>     {
>         check.getPatternStream().select(psf);
>     }
>     catch( Exception exception )
>     {
>         exception.printStackTrace();
>     }
>
> The pattern in the above example is tied to a specific stream, which
> results in one stream per pattern, and this seems to be the problem with
> this approach. If it were possible to run one pattern stream and switch
> out the patterns when needed, then perhaps this would be a viable solution.
> Am I approaching this in the right way by creating a stream for each
> pattern?
>
> Would it be possible to create a set of Pattern processors that could be
> run against a single PatternStream or is there anything you could suggest
> which would allow me to do this with Flink?
>
> Thanks,
> - Kieran
>