Posted to user@eagle.apache.org by "Zhang, Edward (GDI Hadoop)" <yo...@ebay.com> on 2015/12/11 02:49:01 UTC

[Discuss] Eagle Policy State Management

This topic has been discussed offline for a while, and it is time we documented the problems and solutions. With clear problem statements and proposed solutions, I believe we can do better before we start implementing.

[Problem Statement] For Eagle, as a real-time big data monitoring framework, evaluating policies efficiently is the core functionality. Most meaningful policies are stateful, in that policy evaluation is based not on a single event but on both previous events and the current event. This brings two fundamental problems: one is the loss of policy state upon machine failure or topology restart; the other is the lack of historical data upon a fresh start. A simple example is a policy like “from userActivity[cmd==‘delete’]time.window(1 month) select user, count() as cnt group by user having cnt > 1000”: if the task is restarted, the accumulated user/count map is lost. Also, when the topology is started for the first time, the window is empty even if we have historical data in the database.

[Proposed Solutions] The natural way of solving the above two problems is:
1) Persist policy state periodically and restore it after the task is restarted
The underlying policy engine should support snapshot and restore operations. Siddhi 3.x already supports snapshot and restore, though I found some bugs in its state management: https://wso2.org/jira/browse/CEP-1433
Restore is not straightforward unless all input events to the policy evaluator are backed by reliable and rewindable storage such as Kafka.
If the input events to the policy evaluator are backed by Kafka, then each time Eagle takes a snapshot, we record the current offset along with it and persist both to deep storage.
If the input events to the policy evaluator are not backed by Kafka, then we need to record every event since the last snapshot, which looks very expensive. Apache Flink uses an efficient algorithm called stream barriers to do group acknowledgement, but Storm does not have this feature. Keep in mind, though, that Apache Flink requires every task to take a snapshot, not only the policy evaluator.
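
To illustrate the Kafka-backed case: a minimal sketch, in Java like the Siddhi test snippets quoted later in this thread, of bundling the engine snapshot and the Kafka offset into one blob so that both are persisted together. The class name PolicyCheckpoint, its fields, and the byte layout are assumptions made for illustration; they are not part of Eagle or Siddhi.

```java
import java.nio.ByteBuffer;

// Hypothetical checkpoint record: the engine snapshot plus the Kafka offset
// to resume from, stored as a single blob so the two cannot diverge.
final class PolicyCheckpoint {
    final long nextOffset;     // first offset NOT yet reflected in the snapshot
    final byte[] engineState;  // opaque bytes produced by the policy engine

    PolicyCheckpoint(long nextOffset, byte[] engineState) {
        this.nextOffset = nextOffset;
        this.engineState = engineState.clone();
    }

    // Layout: 8-byte offset, 4-byte state length, then the state bytes.
    byte[] toBytes() {
        ByteBuffer buf = ByteBuffer.allocate(Long.BYTES + Integer.BYTES + engineState.length);
        buf.putLong(nextOffset).putInt(engineState.length).put(engineState);
        return buf.array();
    }

    // Inverse of toBytes(): reads the offset, then the length-prefixed state.
    static PolicyCheckpoint fromBytes(byte[] blob) {
        ByteBuffer buf = ByteBuffer.wrap(blob);
        long offset = buf.getLong();
        byte[] state = new byte[buf.getInt()];
        buf.get(state);
        return new PolicyCheckpoint(offset, state);
    }
}
```

On restart, a task would read the blob back with fromBytes, hand engineState to the engine's restore call, and resume reading Kafka from nextOffset.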

2) Transparently load historical data when the topology is started for the first time
If the policy evaluator accepts data that is already persisted in a database or Kafka, we can provide an API to retrieve data from the database or Kafka. This loading is transparent to the developer, but the developer/user needs to specify the deep storage that holds the historical data.
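
A rough sketch of such a warm-up on first start, assuming the stored events carry a timestamp. TimedEvent, HistoryBootstrap and preload are hypothetical names, and the in-memory list stands in for whatever database or Kafka reader is configured as deep storage.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical event type; only the timestamp matters for this sketch.
final class TimedEvent {
    final long timestamp;
    final String user;

    TimedEvent(long timestamp, String user) {
        this.timestamp = timestamp;
        this.user = user;
    }
}

final class HistoryBootstrap {
    // Feeds stored events that still fall inside the policy window to the
    // evaluator, oldest first, and returns how many were replayed.
    static int preload(List<TimedEvent> stored, long nowMillis, long windowMillis,
                       Consumer<TimedEvent> evaluator) {
        long cutoff = nowMillis - windowMillis;
        List<TimedEvent> inWindow = new ArrayList<>();
        for (TimedEvent e : stored) {
            if (e.timestamp >= cutoff && e.timestamp <= nowMillis) {
                inWindow.add(e);
            }
        }
        // Replay in event-time order so the window sees history as it happened.
        inWindow.sort((a, b) -> Long.compare(a.timestamp, b.timestamp));
        for (TimedEvent e : inWindow) {
            evaluator.accept(e);
        }
        return inWindow.size();
    }
}
```

Events older than the window are skipped because they would expire immediately anyway; live events would only be fed to the evaluator after preload returns.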

Also be aware that if we have the right solution for the policy evaluator, the same solution should also be applied to event aggregation: https://cwiki.apache.org/confluence/display/EAG/Stream+Analyze

Another, more aggressive, way is to use a solution similar to Flink's stream barriers (http://data-artisans.com/high-throughput-low-latency-and-exactly-once-stream-processing-with-apache-flink/) to take snapshots of all Eagle tasks (typically spouts and bolts) while turning off the Storm ACK mechanism.

trait StormStreamExecutor[R <: EagleTuple] extends FlatMapper[Seq[AnyRef], R] {
  def prepareConfig(config : Config)
  def init
  def fields : Array[String]
}


==>


trait StormStreamExecutor[R <: EagleTuple] extends FlatMapper[Seq[AnyRef], R] {
  def prepareConfig(config : Config)
  def init
  def fields : Array[String]

  def snapshot : Array[Byte]

  def restore(state: Array[Byte])
}

This is quite important to Eagle if we want Eagle to be a fault-tolerant monitoring framework.
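
As a rough illustration of the two new methods, here is a sketch in Java of an executor that keeps a per-user count and can snapshot and restore it. CountingExecutor and its serialization format are assumptions for illustration only; a real Eagle executor would more likely delegate to the policy engine's own snapshot support.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stateful executor mirroring snapshot/restore from the trait.
final class CountingExecutor {
    private final Map<String, Long> countsByUser = new HashMap<>();

    void onEvent(String user) {
        countsByUser.merge(user, 1L, Long::sum);
    }

    long countFor(String user) {
        return countsByUser.getOrDefault(user, 0L);
    }

    // Serializes the map as: entry count, then (user, count) pairs.
    byte[] snapshot() {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            out.writeInt(countsByUser.size());
            for (Map.Entry<String, Long> entry : countsByUser.entrySet()) {
                out.writeUTF(entry.getKey());
                out.writeLong(entry.getValue());
            }
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Replaces the current state with the state encoded by snapshot().
    void restore(byte[] state) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(state));
            countsByUser.clear();
            int entries = in.readInt();
            for (int i = 0; i < entries; i++) {
                String user = in.readUTF();
                countsByUser.put(user, in.readLong());
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

A restarted task would create a fresh executor, call restore with the last persisted bytes, and then replay any events received after that snapshot.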

Thanks
Edward
From: Sriskandarajah Suhothayan <su...@wso2.com>
Date: Thursday, December 10, 2015 at 9:30
To: "Zhang, Edward (GDI Hadoop)" <yo...@ebay.com>
Cc: "dev@eagle.incubator.apache.org" <de...@eagle.incubator.apache.org>, Edward Zhang <yo...@apache.org>, Srinath Perera <sr...@wso2.com>, WSO2 Developers' List <de...@wso2.org>
Subject: Re: [Dev] [Siddhi] what events is left in the window

Thanks for pointing it out,

We are looking into this.
Will update you ASAP

Suho

On Thu, Dec 10, 2015 at 12:58 AM, Zhang, Edward (GDI Hadoop) <yo...@ebay.com> wrote:
By the way, we use Siddhi version 3.0.2.

Also, to track this issue, I created a JIRA:
https://wso2.org/jira/browse/CEP-1433 (snapshot/restore does not work for
aggregation on time based window)

Thanks
Edward

On 12/8/15, 17:57, "Zhang, Edward (GDI Hadoop)" <yo...@ebay.com> wrote:

>Thanks for this suggestion, Suho.
>
>I did some testing on state persist and restore; it looks like most use cases
>work except group by. I am not sure whether the Siddhi team is aware of this.
>
>Please look at my test case testTimeSlideWindowWithGroupby:
>https://github.com/yonzhang/incubator-eagle/commit/606b65705ea20ce1592a20df9a1f85758168efcb
>
>The query is like the following
>String cseEventStream = "define stream testStream (timeStamp long, user string, cmd string);";
>String query = "@info(name = 'query1') from testStream[cmd == 'open']#window.externalTime(timeStamp,3 sec)"
>        + " select user, timeStamp, count() as cnt"
>        + " group by user"
>        + " having cnt > 2"
>        + " insert all events into outputStream;";
>
>The basic issue could be the following:
>1) When taking a snapshot, Siddhi persists a Count executor per key. But it
>looks like Siddhi adds the same Count executor to the snapshot list multiple
>times (the count aggregator elementId is $planName+keyname).
>2) When restoring a snapshot, Siddhi does not restore the Count executor for
>a key because snapshotableList does not contain that key. The key is only
>generated when an event comes in, and at restoration time we don't know the
>previous events.
>
>for (Snapshotable snapshotable : snapshotableList) {
>    snapshotable.restoreState(snapshots.get(snapshotable.getElementId()));
>}
>
>Please let me know if there is an issue with my test program or if
>something is wrong with Siddhi group by/aggregator snapshot.
>
>Thanks
>Edward
>
>From: Sriskandarajah Suhothayan <su...@wso2.com>
>Date: Wednesday, November 25, 2015 at 21:07
>To: Edward Zhang <yo...@apache.org>
>Cc: Srinath Perera <sr...@wso2.com>, WSO2
>Developers' List <de...@wso2.org>
>Subject: Re: [Dev] [Siddhi] what events is left in the window
>
>Hi
>
>Currently the concepts of current events and expired events live within the
>query, and all events going out to a stream are converted back to current
>events. So it's hard for the application to keep track of the window and
>aggregation states like count, avg, std, etc.
>Furthermore, window behavior can vary based on its implementation;
>hence in some cases knowing what events entered and exited will not be
>enough to recreate the window after a failure.
>
>The recommended way to keep track of state in Siddhi is via snapshots.
>You can take snapshots of the Siddhi runtime at a reasonable time
>interval, and also buffer a copy of the events sent to Siddhi after each
>snapshot. With this method, when there is a failure we restore the
>latest snapshot and replay the events that were sent after that
>snapshot. The tricky part is how you buffer events after a snapshot;
>using Kafka and replaying is one option.
>
>Regards
>Suho
>
>On Thu, Nov 26, 2015 at 10:01 AM, Edward Zhang
><yo...@apache.org> wrote:
>I tried expired events before; they only work for queries without
>group by. If the query contains group by and having clauses, then it only
>emits an expired event when the having condition is satisfied.
>
>For example
>
>String cseEventStream = "define stream TempStream (user string, cmd
>string);";
>String query = "@info(name = 'query1') from TempStream#window.length(4) "
>        + " select user, cmd, count(user) as cnt " +
>        " group by user " +
>        "having cnt > 3 "
>        + " insert all events into DelayedTempStream";
>
>If we send events as follows, it will not generate expired events at all.
>
>inputHandler.send(new Object[]{"user", "open1"});
>inputHandler.send(new Object[]{"user", "open2"});
>inputHandler.send(new Object[]{"user", "open3"});
>inputHandler.send(new Object[]{"user", "open4"});
>inputHandler.send(new Object[]{"user", "open5"});
>
>
>Thanks
>Edward Zhang
>
>On Wed, Nov 25, 2015 at 6:50 PM, Srinath Perera
><sr...@wso2.com> wrote:
>Adding Suho
>
>Hi Edward,
>
>Each window gives you a stream of expired events as well. Would that work?
>
>https://docs.wso2.com/display/CEP400/SiddhiQL+Guide+3.0#SiddhiQLGuide3.0-Window
>
>Thanks
>Srinath
>
>On Thu, Nov 19, 2015 at 5:37 AM, Edward Zhang
><yo...@apache.org> wrote:
>Hi Siddhi team,
>
>Do we have any way of tracking what events are removed from any type of
>window, length(batch) or time(batch)? I investigated removeEvents, but it
>may not be the right solution.
>
>We have a requirement to migrate a policy from one machine to another
>while keeping its internal state.
>
>Eagle runs policies on Storm infrastructure, and if a machine that holds
>a policy fails, then the already-populated events in the window are also
>gone. That is bad, especially when we have built up a long window
>such as a month of data.
>
>One possible way is to snapshot all events in the Siddhi window
>into the application domain. Another way is to keep track of what
>events are coming in and out, so the application can track what is left in
>the Siddhi window.
>
>Here is the ticket for Eagle:
>https://issues.apache.org/jira/browse/EAGLE-39
>
>Have you had a similar request before? Or what do you suggest?
>
>Thanks
>Edward Zhang
>
>_______________________________________________
>Dev mailing list
>Dev@wso2.org
>http://wso2.org/cgi-bin/mailman/listinfo/dev
>
>
>
>
>--
>============================
>Srinath Perera, Ph.D.
>   http://people.apache.org/~hemapani/
>   http://srinathsview.blogspot.com/
>
>
>
>
>--
>S. Suhothayan
>Technical Lead & Team Lead of WSO2 Complex Event Processor
>WSO2 Inc. http://wso2.com
>lean . enterprise . middleware
>
>cell: (+94) 779 756 757 | blog: http://suhothayan.blogspot.com/
>twitter: http://twitter.com/suhothayan | linked-in:
>http://lk.linkedin.com/in/suhothayan





Re: [Discuss] Eagle Policy State Management

Posted by "Zhang, Edward (GDI Hadoop)" <yo...@ebay.com>.
That is a valid concern, which we are also aware of. It would be great if you
could introduce us to some leaders of those projects for the discussions.

The good thing is that Eagle has abstracted the interface, so it is
possible to change the underlying streaming infrastructure (though it is not
trivial work).

But since Eagle’s policy framework is the core part, which should be resilient
to faults, Eagle still needs some investigation into persisting/restoring
policy state. That is why we have asked the Siddhi team to fix some bugs.

Thanks
Edward

On 12/11/15, 11:27, "Julian Hyde" <jh...@apache.org> wrote:

>State management of streams (including what I’d call “derived streams”)
>is a hard distributed systems problem. Ideally it would be solved by the
>stream provider, not by the Eagle project. I think you should talk to the
>various streaming projects (Storm, Samza, Kafka, Flink) and find out
>whether they can solve this, or whether it is on their roadmap.
>
>I can make introductions to the leaders of those projects if needed.
>
>If the problem is solved at source, Eagle can focus on the actual problem
>rather than infrastructure.
>
>Julian
>
>
>> On Dec 10, 2015, at 7:48 PM, Liangfei.Su <su...@gmail.com> wrote:
>> 
>> Great proposal. This is important and could serve both the policy
>> capability and the analytics feature in general.
>> 
>> Periodically taking a snapshot independently on each bolt could make
>> state recoverable from recent history, but from the point of view of the
>> whole topology's state, this could not handle dependencies between bolt
>> states exactly.
>> 
>> Another point: should the state restore be triggered not only when the
>> topology restarts but also on
>> a. a topology rebalance
>> b. movement of a single bolt by the underlying stream framework from one
>> executor to another?
>> 
>> Thanks,
>> Ralph
>> 
>> 


Re: [Discuss] Eagle Policy State Management

Posted by "Zhang, Edward (GDI Hadoop)" <yo...@ebay.com>.
That’s cool; I will look into those new features. But do they provide
automatic snapshot and restore?

Thanks
Edward

On 12/11/15, 12:18, "P. Taylor Goetz" <pt...@gmail.com> wrote:

>Storm 1.0 (which we hope to release in the next month or so) adds
>distributed cache/blobstore functionality that could be leveraged to
>solve a lot of the problems described in this thread. Another relevant
>feature is native windowing with persistent state (currently under
>development).
>
>Documentation of these features is a little light, but I’ll try to
>forward it on to this list when it’s more fully baked.
>
>-Taylor
>
>>> On Fri, Dec 11, 2015 at 9:49 AM, Zhang, Edward (GDI Hadoop) <
>>> yonzhang@ebay.com> wrote:
>>> 
>>>> This topic has been discussed offline for a while and it is time we
>>>> document problems and solutions. With clear problem statements and
>>>>proposed
>>>> solutions, I believe we can do better before we start implementing.
>>>> 
>>>> [Problem Statement] For Eagle as real-time big data monitoring
>>>>framework
>>>> evaluating policies efficiently is the core functionality. Most of
>>>> meaningful polices are stateful in that policy evaluation is not
>>>>based on a
>>>> single event but on both previous events and current event. This
>>>> potentially brings 2 fundamental problems, one is policy state loss
>>>>upon
>>>> machine failures or topology restart, the other is lacking history
>>>>data
>>>> upon fresh start. One simple example is for a policy like “from
>>>> userActivity[cmd==‘delete’]time.window(1 month) select user, count()
>>>>as cnt
>>>> group by user having cnt > 1000”, if the task is restarted, the state
>>>>of
>>>> accumulated user/count map is missing. Also when the topology is
>>>>started at
>>>> the first time, the window is empty even if we have historic data in
>>>> database.
>>>> 
>>>> [Proposed Solutions] The natural way of solving the above 2 problems
>>>>is
>>>> 1) do policy state persist periodically and restore policy state after
>>>> task is restarted
>>>> Underlying policy engine should support snapshot and restore
>>>>operations.
>>>> In Siddhi 3.x, it already supports snapshot and restore, though I
>>>>found
>>>> some bugs with their state management.
>>>> https://wso2.org/jira/browse/CEP-1433
>>>> For restore, it is not that straight-forward unless all input events
>>>>to
>>>> policy evaluator are backed by a reliable and rewindable storage like
>>>>Kafka.
>>>> If input events to policy evaluator is backed by Kafka, then each time
>>>> when EAGLE takes snapshot, we records the current offset together and
>>>> persist both of them to deep storage.
>>>> If input events to policy evaluator is not backed by Kafka, then we
>>>>need
>>>> record every event since last snapshot. That looks very expensive.
>>>>Apache
>>>> Flink uses efficient algorithm called stream barrier to do group
>>>> acknowledgement, but in Storm we don’t have this feature. But remember
>>>> Apache Flink requires that each task do snapshot not only for policy
>>>> evaluator.
>>>> 
>>>> 2) transparently load historical data when topology is started at the
>>>> first time
>>>> If policy evaluator accepts data which is already persisted in
>>>>database or
>>>> Kafka, we can provide API to retrieve data from database or Kafka.
>>>>This
>>>> loading is transparent to developer, but developer/user needs to
>>>>specify
>>>> the deep storage for storing historical data.
>>>> 
>>>> Also be aware that if we have the right solution for policy
>>>>evaluator, the
>>>> solution should be also applied to event aggregation.
>>>> https://cwiki.apache.org/confluence/display/EAG/Stream+Analyze
>>>> 
>>>> Another aggressive way is to use Flink stream barrier similar solution
>>>> 
>>>>http://data-artisans.com/high-throughput-low-latency-and-exactly-once-s
>>>>tream-processing-with-apache-flink/
>>>> to take snapshot to all eagle tasks(typically spout and bolt) but
>>>>turn off
>>>> storm ACK mechanism.
>>>> 
>>>> trait StormStreamExecutor[R <: EagleTuple] extends
>>>>FlatMapper[Seq[AnyRef],
>>>> R] {
>>>> def prepareConfig(config : Config)
>>>> def init
>>>> def fields : Array[String]
>>>> }
>>>> 
>>>> 
>>>> ==>
>>>> 
>>>> 
>>>> trait StormStreamExecutor[R <: EagleTuple] extends
>>>>FlatMapper[Seq[AnyRef],
>>>> R] {
>>>> def prepareConfig(config : Config)
>>>> def init
>>>> def fields : Array[String]
>>>> 
>>>> def snapshot : Array[Byte]
>>>> 
>>>> def restore(state: Array[Byte])
>>>> }
>>>> 
>>>> This is pretty much important to Eagle if we want Eagle to be a
>>>>monitoring
>>>> framework with fault-tolerance.
>>>> 
>>>> Thanks
>>>> Edward
>>>> From: Sriskandarajah Suhothayan <su...@wso2.com>>
>>>> Date: Thursday, December 10, 2015 at 9:30
>>>> To: "Zhang, Edward (GDI Hadoop)" <yonzhang@ebay.com<mailto:
>>>> yonzhang@ebay.com>>
>>>> Cc: 
>>>>"dev@eagle.incubator.apache.org<ma...@eagle.incubator.apache.org>"
>>>> 
>>>><de...@eagle.incubator.apache.org>>
>>>>,
>>>> Edward Zhang 
>>>><yo...@apache.org>>,
>>>> Srinath Perera <sr...@wso2.com>>, WSO2
>>>> Developers' List <de...@wso2.org>>
>>>> Subject: Re: [Dev] [Siddhi] what events is left in the window
>>>> 
>>>> Thanks for pointing it out,
>>>> 
>>>> We are looking into this.
>>>> Will update you ASAP
>>>> 
>>>> Suho
>>>> 
>>>> On Thu, Dec 10, 2015 at 12:58 AM, Zhang, Edward (GDI Hadoop) <
>>>> yonzhang@ebay.com<ma...@ebay.com>> wrote:
>>>> By the way, we use siddhi version 3.0.2.
>>>> 
>>>> Also for tracking this issue, I created jira
>>>> https://wso2.org/jira/browse/CEP-1433 snapshot/restore does not work
>>>>for
>>>> aggregation on time based window
>>>> 
>>>> Thanks
>>>> Edward
>>>> 
>>>> On 12/8/15, 17:57, "Zhang, Edward (GDI Hadoop)" <yonzhang@ebay.com> wrote:
>>>> 
>>>>> Thanks for this suggestion, Suho.
>>>>> 
>>>>> I did some testing on state persist and restore. Most use cases
>>>>> work, except group by. I am not sure whether the Siddhi team already
>>>>> knows about this.
>>>>> 
>>>>> Please look at my test case, testTimeSlideWindowWithGroupby:
>>>>> https://github.com/yonzhang/incubator-eagle/commit/606b65705ea20ce1592a20df9a1f85758168efcb
>>>>> 
>>>>> The query is like the following:
>>>>> String cseEventStream = "define stream testStream (timeStamp long, user string, cmd string);";
>>>>> String query = "@info(name = 'query1') from testStream[cmd == 'open']#window.externalTime(timeStamp,3 sec)"
>>>>>         + " select user, timeStamp, count() as cnt"
>>>>>         + " group by user"
>>>>>         + " having cnt > 2"
>>>>>         + " insert all events into outputStream;";
>>>>> 
>>>>> The basic issue could be the following:
>>>>> 1) When taking a snapshot, it persists all Count executors per key. But
>>>>> it looks like Siddhi adds the same Count executor to the snapshot list
>>>>> multiple times (the count aggregator elementId is $planName+keyname).
>>>>> 2) When restoring a snapshot, it does not restore the Count executor for
>>>>> a key, because snapshotableList does not have the above key. That key is
>>>>> only generated when an event comes in. When doing restoration, we don't
>>>>> know the previous events.
>>>>> 
>>>>> for (Snapshotable snapshotable : snapshotableList) {
>>>>>     snapshotable.restoreState(snapshots.get(snapshotable.getElementId()));
>>>>> }
>>>>> 
>>>>> Please let me know if there is some issue with my test program or
>>>>> if something is wrong with Siddhi group by/aggregator snapshots.
>>>>> 
>>>>> Thanks
>>>>> Edward
>>>>> 
>>>>> From: Sriskandarajah Suhothayan <suho@wso2.com>
>>>>> Date: Wednesday, November 25, 2015 at 21:07
>>>>> To: Edward Zhang <yonzhang2012@apache.org>
>>>>> Cc: Srinath Perera <srinath@wso2.com>, WSO2 Developers' List <dev@wso2.org>
>>>>> Subject: Re: [Dev] [Siddhi] what events is left in the window
>>>>> 
>>>>> Hi
>>>>> 
>>>>> Currently the concepts of current events and expired events live within
>>>>> the query, and all events going out to a stream are converted back to
>>>>> current events. So it's hard for the application to keep track of the
>>>>> window and aggregation states like count, avg, std, etc.
>>>>> Further, window implementations can vary, hence in some cases knowing
>>>>> what events entered and exited will not be enough to recreate the
>>>>> window during failure.
>>>>> 
>>>>> The recommended way to keep track of state in Siddhi is via snapshots.
>>>>> You can take snapshots of the Siddhi runtime at a reasonable time
>>>>> interval, and also buffer a copy of the events sent to Siddhi after that
>>>>> snapshot. With this method, when there is a failure we should restore
>>>>> the latest snapshot and replay the events which were sent after the last
>>>>> snapshot. The tricky part is the way you buffer events after the
>>>>> snapshot; using Kafka and replaying is one option.
>>>>> 
>>>>> Regards
>>>>> Suho
>>>>> 
>>>>> On Thu, Nov 26, 2015 at 10:01 AM, Edward Zhang <yonzhang2012@apache.org> wrote:
>>>>> I tried expired events before; it only works for queries without
>>>>> group by. If the query contains group by and a having clause, then it
>>>>> only emits expired events when the having condition is satisfied.
>>>>> 
>>>>> For example
>>>>> 
>>>>> String cseEventStream = "define stream TempStream (user string, cmd string);";
>>>>> String query = "@info(name = 'query1') from TempStream#window.length(4) "
>>>>>         + " select user, cmd, count(user) as cnt "
>>>>>         + " group by user "
>>>>>         + "having cnt > 3 "
>>>>>         + " insert all events into DelayedTempStream";
>>>>> 
>>>>> If we send events as follows, it will not generate expired events at all.
>>>>> 
>>>>> inputHandler.send(new Object[]{"user", "open1"});
>>>>> inputHandler.send(new Object[]{"user", "open2"});
>>>>> inputHandler.send(new Object[]{"user", "open3"});
>>>>> inputHandler.send(new Object[]{"user", "open4"});
>>>>> inputHandler.send(new Object[]{"user", "open5"});
>>>>> 
>>>>> 
>>>>> Thanks
>>>>> Edward Zhang
>>>>> 
>>>>> On Wed, Nov 25, 2015 at 6:50 PM, Srinath Perera <srinath@wso2.com> wrote:
>>>>> Adding Suho
>>>>> 
>>>>> Hi Edward,
>>>>> 
>>>>> Each window gives you a stream of expired events as well. Would that work?
>>>>> 
>>>>> https://docs.wso2.com/display/CEP400/SiddhiQL+Guide+3.0#SiddhiQLGuide3.0-Window
>>>>> 
>>>>> Thanks
>>>>> Srinath
>>>>> 
>>>>> On Thu, Nov 19, 2015 at 5:37 AM, Edward Zhang <yonzhang2012@apache.org> wrote:
>>>>> Hi Siddhi team,
>>>>> 
>>>>> Do we have any way of tracking what events are removed from any type of
>>>>> window, length(batch) or time(batch)? I investigated, and it seems that
>>>>> removeEvents may not be the right solution.
>>>>> 
>>>>> We have a requirement to migrate a policy from one machine to another
>>>>> while keeping its internal state.
>>>>> 
>>>>> Eagle runs policies on Storm infrastructure, but if the machine that
>>>>> holds a policy fails, the events already populated in the window are
>>>>> also gone. That is especially bad when we have built up a long window,
>>>>> such as a month of data.
>>>>> 
>>>>> One possible way is to snapshot all events in the Siddhi window
>>>>> into the application domain. Another way is to keep tracking what
>>>>> events come in and go out, so the application can track what is left in
>>>>> the Siddhi window.
>>>>> 
>>>>> Here is the ticket for Eagle:
>>>>> https://issues.apache.org/jira/browse/EAGLE-39
>>>>> 
>>>>> Have you had similar requests before? What do you suggest?
>>>>> 
>>>>> Thanks
>>>>> Edward Zhang
>>>>> 
>>>>> _______________________________________________
>>>>> Dev mailing list
>>>>> Dev@wso2.org
>>>>> http://wso2.org/cgi-bin/mailman/listinfo/dev
>>>>> 
>>>>> 
>>>>> 
>>>>> 
>>>>> --
>>>>> ============================
>>>>> Srinath Perera, Ph.D.
>>>>> http://people.apache.org/~hemapani/
>>>>> http://srinathsview.blogspot.com/
>>>>> 
>>>>> 
>>>>> 
>>>>> 
>>>>> --
>>>>> S. Suhothayan
>>>>> Technical Lead & Team Lead of WSO2 Complex Event Processor
>>>>> WSO2 Inc. http://wso2.com
>>>>> lean . enterprise . middleware
>>>>> 
>>>>> cell: (+94) 779 756 757 | blog: http://suhothayan.blogspot.com/
>>>>> twitter: http://twitter.com/suhothayan | linked-in:
>>>>> http://lk.linkedin.com/in/suhothayan
>>>> 
>>>> 
>>>> 
>>>> 
>>>> --
>>>> S. Suhothayan
>>>> Technical Lead & Team Lead of WSO2 Complex Event Processor
>>>> WSO2 Inc. http://wso2.com
>>>> lean . enterprise . middleware
>>>> 
>>>> cell: (+94) 779 756 757 | blog: http://suhothayan.blogspot.com/
>>>> twitter: http://twitter.com/suhothayan | linked-in:
>>>> http://lk.linkedin.com/in/suhothayan
>>>> 
>> 
>


Re: [Discuss] Eagle Policy State Management

Posted by "Zhang, Edward (GDI Hadoop)" <yo...@ebay.com>.
That’s cool, I will look into those new features. But do they provide
automatic snapshot and restore?

Thanks
Edward

On 12/11/15, 12:18, "P. Taylor Goetz" <pt...@gmail.com> wrote:

>Storm 1.0 (which we hope to release in the next month or so) adds
>distributed cache/blobstore functionality that could be leveraged to
>solve a lot of the problems described in this thread. Another relevant
>feature is native windowing with persistent state (currently under
>development).
>
>Documentation of these features is a little light, but I’ll try to
>forward it on to this list when it’s more fully baked.
>
>-Taylor


Re: [Discuss] Eagle Policy State Management

Posted by "P. Taylor Goetz" <pt...@gmail.com>.
Storm 1.0 (which we hope to release in the next month or so) adds distributed cache/blobstore functionality that could be leveraged to solve a lot of the problems described in this thread. Another relevant feature is native windowing with persistent state (currently under development).

Documentation of these features is a little light, but I’ll try to forward it on to this list when it’s more fully baked.

-Taylor

> On Dec 11, 2015, at 2:27 PM, Julian Hyde <jh...@apache.org> wrote:
> 
> State management of streams (including what I’d call “derived streams”) is a hard distributed systems problem. Ideally it would be solved by the stream provider, not by the Eagle project. I think you should talk to the various streaming projects — Storm, Samza, Kafka, Flink — and find out whether they can solve this, or whether it is on their roadmap.
> 
> I can make introductions to the leaders of those projects if needed.
> 
> If the problem is solved at source, Eagle can focus on the actual problem rather than infrastructure.
> 
> Julian
> 
> 
>> On Dec 10, 2015, at 7:48 PM, Liangfei.Su <su...@gmail.com> wrote:
>> 
>> Great proposal. This is important and could serve generally for policy
>> capabilities and analytic features.
>> 
>> Periodically taking a snapshot independently on each bolt could make
>> state recoverable from recent history, but from a whole-topology store
>> point of view, this could not handle state dependencies between bolts
>> exactly.
>> 
>> Another point: should the state restore be triggered not only when the
>> topology restarts but also on
>> a. topology rebalance
>> b. movement of a single bolt by the underlying stream framework from one
>> executor to another?
>> 
>> Thanks,
>> Ralph
>> 
>> 
>> On Fri, Dec 11, 2015 at 9:49 AM, Zhang, Edward (GDI Hadoop) <
>> yonzhang@ebay.com> wrote:
>> 
>>> This topic has been discussed offline for a while and it is time we
>>> document problems and solutions. With clear problem statements and proposed
>>> solutions, I believe we can do better before we start implementing.
>>> 
>>> [Problem Statement] For Eagle, as a real-time big data monitoring framework,
>>> evaluating policies efficiently is the core functionality. Most
>>> meaningful policies are stateful, in that policy evaluation is based not on a
>>> single event but on both previous events and the current event. This
>>> potentially brings 2 fundamental problems: one is policy state loss upon
>>> machine failure or topology restart, the other is missing history data
>>> upon a fresh start. One simple example is a policy like “from
>>> userActivity[cmd==‘delete’]time.window(1 month) select user, count() as cnt
>>> group by user having cnt > 1000”: if the task is restarted, the
>>> accumulated user/count map is lost. Also, when the topology is started for
>>> the first time, the window is empty even if we have historical data in a
>>> database.
>>> 
>>> [Proposed Solutions] The natural way of solving the above 2 problems is
>>> 1) persist policy state periodically and restore policy state after
>>> a task is restarted
>>> The underlying policy engine should support snapshot and restore operations.
>>> Siddhi 3.x already supports snapshot and restore, though I found
>>> some bugs in its state management.
>>> https://wso2.org/jira/browse/CEP-1433
>>> Restore is not that straightforward unless all input events to the
>>> policy evaluator are backed by reliable and rewindable storage like Kafka.
>>> If input events to the policy evaluator are backed by Kafka, then each time
>>> Eagle takes a snapshot, we record the current offset with it and
>>> persist both of them to deep storage.
>>> If input events to the policy evaluator are not backed by Kafka, then we need
>>> to record every event since the last snapshot. That looks very expensive.
>>> Apache Flink uses an efficient algorithm called stream barriers to do group
>>> acknowledgement, but Storm doesn’t have this feature. Remember, though, that
>>> Apache Flink requires every task to take snapshots, not only the policy
>>> evaluator.
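
A minimal sketch of the Kafka-backed case described above, in plain Java with no Kafka, Storm or Siddhi dependency. The names `ReplayableLog`, `Checkpoint` and `CountingPolicy` are hypothetical and exist only for illustration; the in-memory list merely stands in for a rewindable partition. The point it tries to show is that state and input offset are captured together, so recovery is "restore the snapshot, then replay from the saved offset".

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch: checkpoint = (policy state, log offset); recovery = restore + replay. */
public class OffsetCheckpointSketch {

    /** Stand-in for a rewindable log such as a Kafka partition: events addressed by offset. */
    static final class ReplayableLog {
        private final List<String> events = new ArrayList<>();
        void append(String user) { events.add(user); }
        List<String> readFrom(int offset) { return events.subList(offset, events.size()); }
    }

    /** State and input position captured together, so both describe the same moment. */
    static final class Checkpoint {
        final Map<String, Long> counts;
        final int nextOffset;
        Checkpoint(Map<String, Long> counts, int nextOffset) {
            this.counts = new HashMap<>(counts); // defensive copy of the live state
            this.nextOffset = nextOffset;
        }
    }

    /** Toy stateful policy: count events per user. */
    static final class CountingPolicy {
        private final Map<String, Long> counts = new HashMap<>();
        private int nextOffset = 0;

        /** Consume everything in the log that has not been processed yet. */
        void consume(ReplayableLog log) {
            for (String user : log.readFrom(nextOffset)) {
                counts.merge(user, 1L, Long::sum);
                nextOffset++;
            }
        }

        Checkpoint snapshot() { return new Checkpoint(counts, nextOffset); }

        static CountingPolicy restore(Checkpoint cp) {
            CountingPolicy p = new CountingPolicy();
            p.counts.putAll(cp.counts);
            p.nextOffset = cp.nextOffset;
            return p;
        }

        long countOf(String user) { return counts.getOrDefault(user, 0L); }
    }

    public static void main(String[] args) {
        ReplayableLog log = new ReplayableLog();
        CountingPolicy policy = new CountingPolicy();
        log.append("alice"); log.append("bob"); log.append("alice");
        policy.consume(log);
        Checkpoint cp = policy.snapshot();   // in a real system this would go to deep storage
        log.append("alice");                 // arrives after the snapshot
        policy.consume(log);

        // Simulated task failure: rebuild from the checkpoint, then replay from the saved offset.
        CountingPolicy recovered = CountingPolicy.restore(cp);
        recovered.consume(log);
        System.out.println(recovered.countOf("alice")); // prints 3
    }
}
```

Without a rewindable source, the `readFrom(offset)` step has nothing to read from, which is why the non-Kafka case needs every event since the last snapshot to be recorded somewhere.
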
>>> 
>>> 2) transparently load historical data when the topology is started for the
>>> first time
>>> If the policy evaluator accepts data which is already persisted in a database
>>> or Kafka, we can provide an API to retrieve data from the database or Kafka.
>>> This loading is transparent to the developer, but the developer/user needs to
>>> specify the deep storage that holds the historical data.
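
One way to picture this warm-up step, as a hedged sketch in plain Java: before live traffic starts, historical events are pushed through the same window logic, oldest first. `TimeWindow`, `Event` and `warmUp` are hypothetical names, not Eagle or Siddhi APIs, and the historical list stands in for whatever the database or Kafka would return.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

/** Hypothetical sketch: pre-fill a sliding time window from stored events on first start. */
public class WindowWarmUpSketch {

    static final class Event {
        final long timestampMs;
        final String user;
        Event(long timestampMs, String user) {
            this.timestampMs = timestampMs;
            this.user = user;
        }
    }

    /** Sliding window keeping events strictly newer than (latest timestamp - windowMs). */
    static final class TimeWindow {
        private final long windowMs;
        private final Deque<Event> events = new ArrayDeque<>();

        TimeWindow(long windowMs) { this.windowMs = windowMs; }

        void add(Event e) {
            events.addLast(e);
            // Evict everything that has fallen out of the window relative to the new event.
            while (!events.isEmpty()
                    && events.peekFirst().timestampMs <= e.timestampMs - windowMs) {
                events.removeFirst();
            }
        }

        int size() { return events.size(); }
    }

    /** Replay stored events (assumed sorted by timestamp) before any live event is seen. */
    static TimeWindow warmUp(long windowMs, List<Event> historical) {
        TimeWindow window = new TimeWindow(windowMs);
        for (Event e : historical) {
            window.add(e);
        }
        return window;
    }

    public static void main(String[] args) {
        List<Event> historical = Arrays.asList(
                new Event(0L, "alice"), new Event(5_000L, "alice"), new Event(9_000L, "bob"));
        TimeWindow window = warmUp(10_000L, historical);
        window.add(new Event(12_000L, "alice")); // first live event evicts the event at t=0
        System.out.println(window.size());       // prints 3
    }
}
```

The sketch assumes the historical store can return events in timestamp order; if it cannot, the loader would have to sort them before replaying.
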
>>> 
>>> Also be aware that if we have the right solution for the policy evaluator,
>>> the solution should also be applied to event aggregation.
>>> https://cwiki.apache.org/confluence/display/EAG/Stream+Analyze
>>> 
>>> Another, more aggressive way is to use a solution similar to Flink stream barriers
>>> http://data-artisans.com/high-throughput-low-latency-and-exactly-once-stream-processing-with-apache-flink/
>>> to take snapshots of all Eagle tasks (typically spouts and bolts) but turn off
>>> the Storm ACK mechanism.
>>> 
>>> trait StormStreamExecutor[R <: EagleTuple] extends FlatMapper[Seq[AnyRef], R] {
>>>   def prepareConfig(config : Config)
>>>   def init
>>>   def fields : Array[String]
>>> }
>>> 
>>> ==>
>>> 
>>> trait StormStreamExecutor[R <: EagleTuple] extends FlatMapper[Seq[AnyRef], R] {
>>>   def prepareConfig(config : Config)
>>>   def init
>>>   def fields : Array[String]
>>> 
>>>   def snapshot : Array[Byte]
>>> 
>>>   def restore(state: Array[Byte])
>>> }
>>> 
>>> This is very important to Eagle if we want it to be a fault-tolerant
>>> monitoring framework.
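
To make the proposed `snapshot`/`restore` pair concrete, here is a rough Java analogue of what an implementing executor might look like. It is only a sketch under assumptions: the `Snapshotable` interface and `UserCountExecutor` class are hypothetical, the state is a single per-user counter map, and the byte layout (entry count, then key/value pairs) is an arbitrary choice, not anything Eagle defines.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of an executor that can serialize and reload its own state. */
public class SnapshotableExecutorSketch {

    /** Java analogue of the two methods proposed for StormStreamExecutor. */
    interface Snapshotable {
        byte[] snapshot();
        void restore(byte[] state);
    }

    /** Executor whose only state is a per-user counter map. */
    static final class UserCountExecutor implements Snapshotable {
        private final Map<String, Long> counts = new HashMap<>();

        void onEvent(String user) { counts.merge(user, 1L, Long::sum); }

        long countOf(String user) { return counts.getOrDefault(user, 0L); }

        @Override
        public byte[] snapshot() {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (DataOutputStream out = new DataOutputStream(bytes)) {
                out.writeInt(counts.size());          // number of entries first
                for (Map.Entry<String, Long> e : counts.entrySet()) {
                    out.writeUTF(e.getKey());
                    out.writeLong(e.getValue());
                }
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
            return bytes.toByteArray();
        }

        @Override
        public void restore(byte[] state) {
            counts.clear();                           // restored state replaces current state
            try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(state))) {
                int entries = in.readInt();
                for (int i = 0; i < entries; i++) {
                    String user = in.readUTF();
                    long count = in.readLong();
                    counts.put(user, count);
                }
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
    }

    public static void main(String[] args) {
        UserCountExecutor before = new UserCountExecutor();
        before.onEvent("alice");
        before.onEvent("alice");
        before.onEvent("bob");
        byte[] state = before.snapshot();

        UserCountExecutor after = new UserCountExecutor(); // e.g. the task restarted elsewhere
        after.restore(state);
        System.out.println(after.countOf("alice") + " " + after.countOf("bob")); // prints "2 1"
    }
}
```

Returning an opaque byte array keeps the framework independent of each executor's internal state layout, at the cost of every executor owning its own serialization format.
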
>>> 
>>> Thanks
>>> Edward
>>> From: Sriskandarajah Suhothayan <su...@wso2.com>
>>> Date: Thursday, December 10, 2015 at 9:30
>>> To: "Zhang, Edward (GDI Hadoop)" <yonzhang@ebay.com>
>>> Cc: "dev@eagle.incubator.apache.org" <de...@eagle.incubator.apache.org>,
>>> Edward Zhang <yo...@apache.org>, Srinath Perera <sr...@wso2.com>,
>>> WSO2 Developers' List <de...@wso2.org>
>>> Subject: Re: [Dev] [Siddhi] what events is left in the window
>>> 
>>> Thanks for pointing it out,
>>> 
>>> We are looking into this.
>>> Will update you ASAP
>>> 
>>> Suho
>>> 
>>> On Thu, Dec 10, 2015 at 12:58 AM, Zhang, Edward (GDI Hadoop)
>>> <yonzhang@ebay.com> wrote:
>>> By the way, we use Siddhi version 3.0.2.
>>> 
>>> Also, to track this issue, I created a JIRA ticket:
>>> https://wso2.org/jira/browse/CEP-1433 (snapshot/restore does not work for
>>> aggregation on time-based windows).
>>> 
>>> Thanks
>>> Edward
>>> 
>>> On 12/8/15, 17:57, "Zhang, Edward (GDI Hadoop)" <yonzhang@ebay.com> wrote:
>>> 
>>>> Thanks for this suggestion, Suho.
>>>> 
>>>> I did some testing on state persist and restore. Most use cases
>>>> work, except group by. I am not sure whether the Siddhi team already
>>>> knows about this.
>>>> 
>>>> Please look at my test case, testTimeSlideWindowWithGroupby:
>>>> https://github.com/yonzhang/incubator-eagle/commit/606b65705ea20ce1592a20df9a1f85758168efcb
>>>> 
>>>> The query is like the following:
>>>> String cseEventStream = "define stream testStream (timeStamp long, user string, cmd string);";
>>>> String query = "@info(name = 'query1') from testStream[cmd == 'open']#window.externalTime(timeStamp,3 sec)"
>>>>         + " select user, timeStamp, count() as cnt"
>>>>         + " group by user"
>>>>         + " having cnt > 2"
>>>>         + " insert all events into outputStream;";
>>>> 
>>>> The basic issue could be the following:
>>>> 1) When taking a snapshot, it persists all Count executors per key. But
>>>> it looks like Siddhi adds the same Count executor to the snapshot list
>>>> multiple times (the count aggregator elementId is $planName+keyname).
>>>> 2) When restoring a snapshot, it does not restore the Count executor for
>>>> a key, because snapshotableList does not have the above key. That key is
>>>> only generated when an event comes in. When doing restoration, we don't
>>>> know the previous events.
>>>> 
>>>> for (Snapshotable snapshotable : snapshotableList) {
>>>>     snapshotable.restoreState(snapshots.get(snapshotable.getElementId()));
>>>> }
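
The suspected failure mode in point 2 can be reproduced in a few lines of plain Java. This is a toy model, not Siddhi code: `GroupByCounter` and `CountAggregator` are hypothetical stand-ins for a group-by query and its lazily created per-key aggregators. `restoreExistingOnly` mirrors the shape of the loop quoted above (iterate over what is currently registered), while `restoreFromSnapshotKeys` is one possible alternative that iterates over the snapshot instead.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical model of per-key aggregators that are created lazily on first event. */
public class KeyedRestoreSketch {

    static final class CountAggregator {
        long count;
    }

    static final class GroupByCounter {
        private final Map<String, CountAggregator> aggregators = new HashMap<>();

        void onEvent(String key) {
            aggregators.computeIfAbsent(key, k -> new CountAggregator()).count++;
        }

        long countOf(String key) {
            CountAggregator aggregator = aggregators.get(key);
            return aggregator == null ? 0L : aggregator.count;
        }

        /** One snapshot entry per group-by key. */
        Map<String, Long> snapshot() {
            Map<String, Long> out = new HashMap<>();
            aggregators.forEach((key, aggregator) -> out.put(key, aggregator.count));
            return out;
        }

        /** Restores only aggregators that already exist; a fresh instance has none. */
        void restoreExistingOnly(Map<String, Long> snapshots) {
            for (Map.Entry<String, CountAggregator> e : aggregators.entrySet()) {
                Long saved = snapshots.get(e.getKey());
                if (saved != null) {
                    e.getValue().count = saved;
                }
            }
        }

        /** Drives the loop from the snapshot and recreates any missing aggregator. */
        void restoreFromSnapshotKeys(Map<String, Long> snapshots) {
            aggregators.clear();
            for (Map.Entry<String, Long> e : snapshots.entrySet()) {
                aggregators.computeIfAbsent(e.getKey(), k -> new CountAggregator()).count = e.getValue();
            }
        }
    }

    public static void main(String[] args) {
        GroupByCounter original = new GroupByCounter();
        original.onEvent("alice");
        original.onEvent("alice");
        original.onEvent("bob");
        Map<String, Long> saved = original.snapshot();

        GroupByCounter lossy = new GroupByCounter();
        lossy.restoreExistingOnly(saved);      // nothing registered yet, so nothing restored

        GroupByCounter fixed = new GroupByCounter();
        fixed.restoreFromSnapshotKeys(saved);  // keys come from the snapshot itself

        System.out.println(lossy.countOf("alice") + " vs " + fixed.countOf("alice")); // prints "0 vs 2"
    }
}
```

Whether Siddhi's actual bug has exactly this shape is not confirmed in this thread; the sketch only illustrates why iterating over live elements loses state for keys that have not been seen since the restart.
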
>>>> 
>>>> Please let me know if there is some issue with my test program or
>>>> if something is wrong with Siddhi group by/aggregator snapshots.
>>>> 
>>>> Thanks
>>>> Edward
>>>> 
>>>> From: Sriskandarajah Suhothayan <suho@wso2.com>
>>>> Date: Wednesday, November 25, 2015 at 21:07
>>>> To: Edward Zhang <yonzhang2012@apache.org>
>>>> Cc: Srinath Perera <srinath@wso2.com>, WSO2 Developers' List <dev@wso2.org>
>>>> Subject: Re: [Dev] [Siddhi] what events is left in the window
>>>> 
>>>> Hi
>>>> 
>>>> Currently the concept of current event & expired events live within the
>>>> query and all events going out to a stream are converted back to current
>>>> events. So its hard for the application to keep track of the window and
>>>> aggregation states like count, avg, std, etc...
>>>> Further window implementations can very based on its implementations
>>>> hence in some cases knowing what events entered and existed will not be
>>>> enough to recreate the window during failure.
>>>> 
>>>> The recommended way to keep track of state in Siddhi is via snapshots.
>>>> You can take snapshots of the Siddhi runtime at a reasonable interval
>>>> and also buffer a copy of the events sent to Siddhi after each
>>>> snapshot. With this method, when there is a failure we restore the
>>>> latest snapshot and replay the events that were sent after it.
>>>> The tricky part is how you buffer events after the snapshot;
>>>> using Kafka and replaying is one option.
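
The snapshot-plus-replay recovery described above can be sketched roughly as follows. This is a minimal illustration in plain Java, not Siddhi or Kafka code: the class SnapshotReplaySketch, its Checkpoint type, and the in-memory list standing in for a replayable log are all assumptions made for the example. The point it shows is that a snapshot is only useful when it is stored together with the log position it reflects.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class SnapshotReplaySketch {
    /** A snapshot pairs the engine state with the log position it reflects. */
    static final class Checkpoint {
        final Map<String, Long> counts;
        final int nextOffset;

        Checkpoint(Map<String, Long> counts, int nextOffset) {
            this.counts = new HashMap<>(counts); // defensive copy
            this.nextOffset = nextOffset;
        }
    }

    private Map<String, Long> counts = new HashMap<>();
    private int nextOffset = 0;

    /** Apply the event stored at the given offset of the replayable log. */
    void process(List<String> log, int offset) {
        counts.merge(log.get(offset), 1L, Long::sum);
        nextOffset = offset + 1;
    }

    Checkpoint checkpoint() {
        return new Checkpoint(counts, nextOffset);
    }

    /** Restore the snapshot, then replay everything logged after it. */
    static SnapshotReplaySketch recover(Checkpoint cp, List<String> log) {
        SnapshotReplaySketch engine = new SnapshotReplaySketch();
        engine.counts = new HashMap<>(cp.counts);
        engine.nextOffset = cp.nextOffset;
        for (int i = cp.nextOffset; i < log.size(); i++) {
            engine.process(log, i);
        }
        return engine;
    }

    long countFor(String user) {
        return counts.getOrDefault(user, 0L);
    }

    public static void main(String[] args) {
        // The list stands in for a rewindable source such as a Kafka partition.
        List<String> log = new ArrayList<>(Arrays.asList("alice", "bob", "alice"));
        SnapshotReplaySketch engine = new SnapshotReplaySketch();
        for (int i = 0; i < log.size(); i++) {
            engine.process(log, i);
        }
        Checkpoint cp = engine.checkpoint();

        // Two more events are logged after the snapshot, then the task fails.
        log.add("alice");
        log.add("bob");

        SnapshotReplaySketch recovered = recover(cp, log);
        System.out.println("alice = " + recovered.countFor("alice"));
        System.out.println("bob = " + recovered.countFor("bob"));
    }
}
```

Without the stored offset, the recovering task could not tell which logged events are already reflected in the snapshot and which still need to be replayed.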
>>>> 
>>>> Regards
>>>> Suho
>>>> 
>>>> On Thu, Nov 26, 2015 at 10:01 AM, Edward Zhang
>>>> <yonzhang2012@apache.org> wrote:
>>>> I tried expired events before; it only works for queries without
>>>> group by. If the query contains group by and a having clause, then it
>>>> emits expired events only when the having condition is satisfied.
>>>> 
>>>> For example
>>>> 
>>>> String cseEventStream =
>>>>      "define stream TempStream (user string, cmd string);";
>>>> String query = "@info(name = 'query1') from TempStream#window.length(4) "
>>>>      + " select user, cmd, count(user) as cnt "
>>>>      + " group by user "
>>>>      + "having cnt > 3 "
>>>>      + " insert all events into DelayedTempStream";
>>>> 
>>>> If we send events as follows, it will not generate expired events at all.
>>>> 
>>>> inputHandler.send(new Object[]{"user", "open1"});
>>>> inputHandler.send(new Object[]{"user", "open2"});
>>>> inputHandler.send(new Object[]{"user", "open3"});
>>>> inputHandler.send(new Object[]{"user", "open4"});
>>>> inputHandler.send(new Object[]{"user", "open5"});
>>>> 
>>>> 
>>>> Thanks
>>>> Edward Zhang
>>>> 
>>>> On Wed, Nov 25, 2015 at 6:50 PM, Srinath Perera
>>>> <srinath@wso2.com> wrote:
>>>> Adding Suho
>>>> 
>>>> Hi Edward,
>>>> 
>>>> Each window gives you a stream of expired events as well. Would that work?
>>>> 
>>>> 
>>>> https://docs.wso2.com/display/CEP400/SiddhiQL+Guide+3.0#SiddhiQLGuide3.0-Window
>>>> 
>>>> Thanks
>>>> Srinath
>>>> 
>>>> On Thu, Nov 19, 2015 at 5:37 AM, Edward Zhang
>>>> <yonzhang2012@apache.org> wrote:
>>>> Hi Siddhi team,
>>>> 
>>>> Do we have any way of tracking which events are removed from any type of
>>>> window, length(batch) or time(batch)? I investigated removeEvents, and it
>>>> may not be the right solution.
>>>> 
>>>> We have a requirement to migrate a policy from one machine to another
>>>> while keeping its internal state.
>>>> 
>>>> Eagle runs policies on Storm infrastructure, but if the machine that holds
>>>> a policy fails, then the events already populated in the window are also
>>>> gone. This is especially bad when we have built up a long window,
>>>> like a month of data.
>>>> 
>>>> One possible way is to snapshot all events in the Siddhi window into the
>>>> application domain. Another way is to keep tracking which events come
>>>> in and go out, so the application can track what is left in the
>>>> Siddhi window.
>>>> 
>>>> Here is the ticket for Eagle
>>>> https://issues.apache.org/jira/browse/EAGLE-39
>>>> 
>>>> Have you had a similar request before? What do you suggest?
>>>> 
>>>> Thanks
>>>> Edward Zhang
>>>> 
>>>> _______________________________________________
>>>> Dev mailing list
>>>> Dev@wso2.org
>>>> http://wso2.org/cgi-bin/mailman/listinfo/dev
>>>> 
>>>> 
>>>> 
>>>> 
>>>> --
>>>> ============================
>>>> Srinath Perera, Ph.D.
>>>> http://people.apache.org/~hemapani/
>>>> http://srinathsview.blogspot.com/
>>>> 
>>>> 
>>>> 
>>>> 
>>>> --
>>>> S. Suhothayan
>>>> Technical Lead & Team Lead of WSO2 Complex Event Processor
>>>> WSO2 Inc. http://wso2.com
>>>> lean . enterprise . middleware
>>>> 
>>>> cell: (+94) 779 756 757 | blog: http://suhothayan.blogspot.com/
>>>> twitter: http://twitter.com/suhothayan | linked-in:
>>>> http://lk.linkedin.com/in/suhothayan
>>> 
> 


Re: [Discuss] Eagle Policy State Management

Posted by "P. Taylor Goetz" <pt...@gmail.com>.
Storm 1.0 (which we hope to release in the next month or so) adds distributed cache/blobstore functionality that could be leveraged to solve a lot of the problems described in this thread. Another relevant feature is native windowing with persistent state (currently under development).

Documentation of these features is a little light, but I’ll try to forward it on to this list when it’s more fully baked.

-Taylor

> On Dec 11, 2015, at 2:27 PM, Julian Hyde <jh...@apache.org> wrote:
> 
> State management of streams (including what I’d call “derived streams”) is a hard distributed systems problem. Ideally it would be solved by the stream provider, not by the Eagle project. I think you should talk to the various streaming projects — Storm, Samza, Kafka, Flink — and find out whether they can solve this, or whether it is on their roadmap.
> 
> I can make introductions to the leaders of those projects if needed.
> 
> If the problem is solved at source, Eagle can focus on the actual problem rather than infrastructure.
> 
> Julian
> 
> 
>> On Dec 10, 2015, at 7:48 PM, Liangfei.Su <su...@gmail.com> wrote:
>> 
>> Great proposal. This is important and could serve generally for both policy
>> capability and analytic features.
>> 
>> Periodically taking snapshots independently on each bolt could make
>> state recoverable from recent history, but from the point of view of the
>> whole topology's state, this could not handle dependencies between bolt
>> states exactly.
>> 
>> Another point: should the state restore be triggered not only when the
>> topology restarts but also on
>> a. topology rebalance
>> b. movement of a single bolt by the underlying stream framework from one
>> executor to another?
>> 
>> Thanks,
>> Ralph
>> 
>> 
>> On Fri, Dec 11, 2015 at 9:49 AM, Zhang, Edward (GDI Hadoop) <
>> yonzhang@ebay.com> wrote:
>> 
>>> This topic has been discussed offline for a while and it is time we
>>> document problems and solutions. With clear problem statements and proposed
>>> solutions, I believe we can do better before we start implementing.
>>> 
>>> [Problem Statement] For Eagle as a real-time big data monitoring framework,
>>> evaluating policies efficiently is the core functionality. Most
>>> meaningful policies are stateful in that policy evaluation is based not on a
>>> single event but on both previous events and the current event. This
>>> potentially brings two fundamental problems: one is policy state loss upon
>>> machine failure or topology restart, the other is the lack of history data
>>> upon a fresh start. One simple example is a policy like “from
>>> userActivity[cmd==‘delete’]time.window(1 month) select user, count() as cnt
>>> group by user having cnt > 1000”: if the task is restarted, the
>>> accumulated user/count map is lost. Also, when the topology is started for
>>> the first time, the window is empty even if we have historical data in a
>>> database.
>>> 
>>> [Proposed Solutions] The natural way of solving the above two problems is:
>>> 1) persist policy state periodically and restore it after the
>>> task is restarted
>>> The underlying policy engine should support snapshot and restore operations.
>>> Siddhi 3.x already supports snapshot and restore, though I found
>>> some bugs in its state management.
>>> https://wso2.org/jira/browse/CEP-1433
>>> Restore is not that straightforward unless all input events to the
>>> policy evaluator are backed by reliable and rewindable storage like Kafka.
>>> If input events to the policy evaluator are backed by Kafka, then each time
>>> Eagle takes a snapshot, we record the current offset along with it and
>>> persist both of them to deep storage.
>>> If input events to the policy evaluator are not backed by Kafka, then we
>>> need to record every event since the last snapshot. That looks very
>>> expensive. Apache Flink uses an efficient algorithm called stream barriers
>>> to do group acknowledgement, but Storm does not have this feature. Note,
>>> however, that Apache Flink requires every task to take snapshots, not only
>>> the policy evaluator.
>>> 
>>> 2) transparently load historical data when the topology is started for the
>>> first time
>>> If the policy evaluator accepts data that is already persisted in a database
>>> or Kafka, we can provide an API to retrieve data from the database or Kafka.
>>> This loading is transparent to the developer, but the developer/user needs
>>> to specify the deep storage that holds the historical data.
>>> 
>>> Also be aware that if we have the right solution for the policy evaluator,
>>> the solution should also be applied to event aggregation.
>>> https://cwiki.apache.org/confluence/display/EAG/Stream+Analyze
>>> 
>>> Another, more aggressive way is to use a solution similar to Flink's stream
>>> barriers
>>> http://data-artisans.com/high-throughput-low-latency-and-exactly-once-stream-processing-with-apache-flink/
>>> to take snapshots of all Eagle tasks (typically spouts and bolts) while
>>> turning off the Storm ACK mechanism.
>>> 
>>> trait StormStreamExecutor[R <: EagleTuple] extends FlatMapper[Seq[AnyRef], R] {
>>>   def prepareConfig(config : Config)
>>>   def init
>>>   def fields : Array[String]
>>> }
>>> 
>>> 
>>> ==>
>>> 
>>> 
>>> trait StormStreamExecutor[R <: EagleTuple] extends FlatMapper[Seq[AnyRef], R] {
>>>   def prepareConfig(config : Config)
>>>   def init
>>>   def fields : Array[String]
>>> 
>>>   def snapshot : Array[Byte]
>>> 
>>>   def restore(state: Array[Byte])
>>> }
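
For illustration, the snapshot/restore pair added to the trait above could be implemented along these lines for a per-user counting executor. The sketch is in Java rather than Scala, and CountingExecutorSketch, its field names, and the byte layout are hypothetical; Eagle's actual executors may look quite different. It only shows that the whole state must round-trip through a byte array.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;

public class CountingExecutorSketch {
    // Hypothetical executor state: number of matching events seen per user.
    private final Map<String, Long> countsByUser = new HashMap<>();

    void onEvent(String user) {
        countsByUser.merge(user, 1L, Long::sum);
    }

    long countFor(String user) {
        return countsByUser.getOrDefault(user, 0L);
    }

    /** Serialize the state as: entry count, then (user, count) pairs. */
    byte[] snapshot() {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(countsByUser.size());
            for (Map.Entry<String, Long> e : countsByUser.entrySet()) {
                out.writeUTF(e.getKey());
                out.writeLong(e.getValue());
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** Replace the current state with the one encoded in the byte array. */
    void restore(byte[] state) {
        countsByUser.clear();
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(state))) {
            int entries = in.readInt();
            for (int i = 0; i < entries; i++) {
                String user = in.readUTF();
                long count = in.readLong();
                countsByUser.put(user, count);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        CountingExecutorSketch before = new CountingExecutorSketch();
        before.onEvent("alice");
        before.onEvent("alice");
        before.onEvent("bob");
        byte[] saved = before.snapshot();

        // A fresh instance, as after a task restart, picks up the saved state.
        CountingExecutorSketch after = new CountingExecutorSketch();
        after.restore(saved);
        System.out.println("alice = " + after.countFor("alice"));
        System.out.println("bob = " + after.countFor("bob"));
    }
}
```

A byte array keeps the interface independent of where the state is stored, so the same pair of methods could write to any deep storage.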
>>> 
>>> This is very important to Eagle if we want it to be a fault-tolerant
>>> monitoring framework.
>>> 
>>> Thanks
>>> Edward
>>> From: Sriskandarajah Suhothayan <su...@wso2.com>
>>> Date: Thursday, December 10, 2015 at 9:30
>>> To: "Zhang, Edward (GDI Hadoop)" <yonzhang@ebay.com>
>>> Cc: "dev@eagle.incubator.apache.org" <de...@eagle.incubator.apache.org>,
>>> Edward Zhang <yo...@apache.org>,
>>> Srinath Perera <sr...@wso2.com>, WSO2
>>> Developers' List <de...@wso2.org>
>>> Subject: Re: [Dev] [Siddhi] what events is left in the window
>>> 
>>> Thanks for pointing it out,
>>> 
>>> We are looking into this.
>>> Will update you ASAP
>>> 
>>> Suho
>>> 
>>> On Thu, Dec 10, 2015 at 12:58 AM, Zhang, Edward (GDI Hadoop) <
>>> yonzhang@ebay.com> wrote:
>>> By the way, we use Siddhi version 3.0.2.
>>> 
>>> Also, to track this issue, I created a JIRA ticket,
>>> https://wso2.org/jira/browse/CEP-1433 ("snapshot/restore does not work for
>>> aggregation on time based window").
>>> 
>>> Thanks
>>> Edward
>>> 
>>> On 12/8/15, 17:57, "Zhang, Edward (GDI Hadoop)" <yonzhang@ebay.com> wrote:
>>> 
>>>> Thanks for this suggestion, Suho.
>>>> 
>>>> I did some testing on state persist and restore; it looks like most use
>>>> cases work except group by. I am not sure whether the Siddhi team is
>>>> aware of this.
>>>> 
>>>> Please look at my test case: testTimeSlideWindowWithGroupby
>>>> 
>>>> https://github.com/yonzhang/incubator-eagle/commit/606b65705ea20ce1592a20df9a1f85758168efcb
>>>> 
>>>> The query is like the following:
>>>> String cseEventStream =
>>>>     "define stream testStream (timeStamp long, user string, cmd string);";
>>>> String query = "@info(name = 'query1') from "
>>>>     + "testStream[cmd == 'open']#window.externalTime(timeStamp,3 sec)"
>>>>     + " select user, timeStamp, count() as cnt"
>>>>     + " group by user"
>>>>     + " having cnt > 2"
>>>>     + " insert all events into outputStream;";
>>>> 
>>>> The basic issue could be the following:
>>>> 1) when taking a snapshot, it persists every Count executor per key. But
>>>> it looks like Siddhi adds the same Count executor to the snapshot list
>>>> multiple times (the count aggregator elementId is $planName+keyname)
>>>> 2) when restoring a snapshot, it will not restore the Count executor for
>>>> a key because snapshotableList does not contain that key. The key is only
>>>> generated when an event comes in, and at restoration time we don't know
>>>> the previous events.
>>>> 
>>>> for (Snapshotable snapshotable : snapshotableList) {
>>>>  snapshotable.restoreState(snapshots.get(snapshotable.getElementId()));
>>>> }
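
A toy model of the second point, assuming per-key counters are registered lazily as events arrive. The class and method names below are invented for illustration and are not Siddhi's; the sketch only shows why a restore loop driven by the currently registered elements finds nothing to restore after a restart, and one possible alternative that is driven by the keys stored in the snapshot.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

public class LazyKeyRestoreSketch {
    /** Toy stand-in for a per-key count aggregator; not Siddhi's class. */
    static final class Counter {
        long count;
    }

    // Counters are registered lazily, the first time a group-by key is seen.
    private final Map<String, Counter> registered = new LinkedHashMap<>();

    void onEvent(String user) {
        registered.computeIfAbsent(user, k -> new Counter()).count++;
    }

    Map<String, Long> snapshot() {
        Map<String, Long> state = new HashMap<>();
        registered.forEach((id, c) -> state.put(id, c.count));
        return state;
    }

    /** Mirrors the quoted loop: only already-registered elements are restored. */
    void restoreRegisteredOnly(Map<String, Long> state) {
        for (Map.Entry<String, Counter> e : registered.entrySet()) {
            Long saved = state.get(e.getKey());
            if (saved != null) {
                e.getValue().count = saved;
            }
        }
    }

    /** One possible alternative: drive the loop from the snapshot's keys. */
    void restoreFromSnapshotKeys(Map<String, Long> state) {
        state.forEach((id, saved) ->
                registered.computeIfAbsent(id, k -> new Counter()).count = saved);
    }

    long countFor(String user) {
        Counter c = registered.get(user);
        return c == null ? 0 : c.count;
    }

    public static void main(String[] args) {
        LazyKeyRestoreSketch before = new LazyKeyRestoreSketch();
        before.onEvent("alice");
        before.onEvent("alice");
        before.onEvent("bob");
        Map<String, Long> state = before.snapshot();

        // After a restart no key has been seen yet, so nothing is registered.
        LazyKeyRestoreSketch lossy = new LazyKeyRestoreSketch();
        lossy.restoreRegisteredOnly(state);
        System.out.println("registered-only restore, alice = " + lossy.countFor("alice"));

        LazyKeyRestoreSketch fixed = new LazyKeyRestoreSketch();
        fixed.restoreFromSnapshotKeys(state);
        System.out.println("snapshot-driven restore, alice = " + fixed.countFor("alice"));
    }
}
```

Whether the real engine can recreate per-key aggregators from snapshot keys alone is a question for the Siddhi team; the sketch assumes it can.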
>>>> 
>>>> Please let me know if there is some issue with my test program or
>>>> something is wrong with Siddhi group by/aggregator snapshot
>>>> 
>>>> Thanks
>>>> Edward
>>>> 


Re: [Discuss] Eagle Policy State Management

Posted by "Zhang, Edward (GDI Hadoop)" <yo...@ebay.com>.
That is a valid concern, which we are also aware of. It would be great if you
could introduce us to some leaders of those projects for the discussion.

The good thing is that Eagle has abstracted the interface, so it is
possible to change the underlying streaming infrastructure (but it is not
trivial work).

But as Eagle’s policy framework is the core part which should be resilient
to faults, Eagle still needs some investigation on persisting/restoring
policy state. That is why we have asked the Siddhi team to fix some bugs.

Thanks
Edward

On 12/11/15, 11:27, "Julian Hyde" <jh...@apache.org> wrote:

>State management of streams (including what I’d call “derived streams”)
>is a hard distributed systems problem. Ideally it would be solved by the
>stream provider, not by the Eagle project. I think you should talk to the
>various streaming projects ― Storm, Samza, Kafka, Flink ― and find out
>whether they can solve this, or whether it is on their roadmap.
>
>I can make introductions to the leaders of those projects if needed.
>
>If the problem is solved at source, Eagle can focus on the actual problem
>rather than infrastructure.
>
>Julian
>
>


Re: [Discuss] Eagle Policy State Management

Posted by Julian Hyde <jh...@apache.org>.
State management of streams (including what I’d call “derived streams”) is a hard distributed systems problem. Ideally it would be solved by the stream provider, not by the Eagle project. I think you should talk to the various streaming projects — Storm, Samza, Kafka, Flink — and find out whether they can solve this, or whether it is on their roadmap. 

I can make introductions to the leaders of those projects if needed.

If the problem is solved at source, Eagle can focus on the actual problem rather than infrastructure.

Julian


> On Dec 10, 2015, at 7:48 PM, Liangfei.Su <su...@gmail.com> wrote:
> 
> Great proposal. This is important and could serve generally for both policy
> capability and analytic features.
> 
> Taking snapshots periodically and independently on each bolt would make
> state recoverable from recent history, but from the point of view of the
> whole topology, this cannot exactly handle state dependencies between bolts.
> 
> Another point: should state restore be triggered not only when the
> topology restarts but also when
> a. the topology is rebalanced
> b. a single bolt is moved by the underlying stream framework from one
> executor to another?
> 
> Thanks,
> Ralph
> 
> 
> On Fri, Dec 11, 2015 at 9:49 AM, Zhang, Edward (GDI Hadoop) <
> yonzhang@ebay.com> wrote:
> 
>> This topic has been discussed offline for a while, and it is time we
>> documented the problems and solutions. With clear problem statements and
>> proposed solutions, I believe we can do better before we start implementing.
>> 
>> [Problem Statement] For Eagle, as a real-time big data monitoring framework,
>> evaluating policies efficiently is the core functionality. Most
>> meaningful policies are stateful, in that policy evaluation is based not on a
>> single event but on both previous events and the current event. This
>> brings 2 fundamental problems: one is policy state loss upon
>> machine failure or topology restart; the other is missing history data
>> upon a fresh start. One simple example is a policy like “from
>> userActivity[cmd==‘delete’]time.window(1 month) select user, count() as cnt
>> group by user having cnt > 1000”: if the task is restarted, the accumulated
>> user/count map is lost. Also, when the topology is started for
>> the first time, the window is empty even if we have historical data in a
>> database.
>> 
>> [Proposed Solutions] The natural way of solving the above 2 problems is
>> 1) persist policy state periodically and restore policy state after the
>> task is restarted
>> The underlying policy engine should support snapshot and restore operations.
>> Siddhi 3.x already supports snapshot and restore, though I found
>> some bugs in its state management.
>> https://wso2.org/jira/browse/CEP-1433
>> Restore is not that straightforward unless all input events to the
>> policy evaluator are backed by reliable and rewindable storage like Kafka.
>> If input events to the policy evaluator are backed by Kafka, then each time
>> Eagle takes a snapshot, we record the current offset along with it and
>> persist both of them to deep storage.
>> If input events to the policy evaluator are not backed by Kafka, then we need
>> to record every event since the last snapshot. That looks very expensive. Apache
>> Flink uses an efficient algorithm called stream barriers to do group
>> acknowledgement, but Storm does not have this feature. But remember that
>> Apache Flink requires every task to take snapshots, not only the policy
>> evaluator.
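
To make the snapshot-plus-offset idea above concrete, here is a minimal Java sketch. It is only an illustration under stated assumptions: Checkpoint and CountingEvaluator are hypothetical names (not Eagle, Siddhi, or Kafka APIs), an in-memory list stands in for a rewindable Kafka partition, and the policy state is reduced to a per-user count map.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical checkpoint: evaluator state plus the input offset it reflects. */
final class Checkpoint {
    final Map<String, Long> counts;   // copy of the per-user counts
    final long nextOffset;            // first offset NOT yet applied to counts

    Checkpoint(Map<String, Long> counts, long nextOffset) {
        this.counts = new HashMap<>(counts);
        this.nextOffset = nextOffset;
    }
}

/** Toy stateful evaluator that counts events per user. */
final class CountingEvaluator {
    private final Map<String, Long> counts = new HashMap<>();
    private long nextOffset = 0;

    void process(String user) {
        counts.merge(user, 1L, Long::sum);
        nextOffset++;
    }

    /** State and offset are captured together so they can never disagree. */
    Checkpoint snapshot() {
        return new Checkpoint(counts, nextOffset);
    }

    /** Rebuild state from a checkpoint, then replay the log from the saved offset. */
    static CountingEvaluator recover(Checkpoint cp, List<String> log) {
        CountingEvaluator e = new CountingEvaluator();
        e.counts.putAll(cp.counts);
        e.nextOffset = cp.nextOffset;
        for (long i = cp.nextOffset; i < log.size(); i++) {
            e.process(log.get((int) i));
        }
        return e;
    }

    long countFor(String user) {
        return counts.getOrDefault(user, 0L);
    }

    long nextOffset() {
        return nextOffset;
    }
}

final class CheckpointDemo {
    public static void main(String[] args) {
        // The list plays the role of a rewindable input log.
        List<String> log = new ArrayList<>(Arrays.asList("alice", "bob", "alice"));
        CountingEvaluator live = new CountingEvaluator();
        for (String user : log) {
            live.process(user);
        }
        Checkpoint cp = live.snapshot();

        // Two more events arrive after the snapshot, then the task "fails".
        log.add("alice");
        log.add("bob");

        CountingEvaluator recovered = CountingEvaluator.recover(cp, log);
        System.out.println("alice=" + recovered.countFor("alice"));
        System.out.println("bob=" + recovered.countFor("bob"));
    }
}
```

The point of the design is that the offset is stored inside the same record as the state. If they were persisted separately, a failure between the two writes could cause events to be skipped or counted twice on replay.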
>> 
>> 2) transparently load historical data when the topology is started for the
>> first time
>> If the policy evaluator accepts data that is already persisted in a database or
>> Kafka, we can provide an API to retrieve data from the database or Kafka. This
>> loading is transparent to the developer, but the developer/user needs to specify
>> the deep storage for storing historical data.
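
As a rough illustration of point 2, the Java sketch below selects which stored events would need to be replayed into an empty window on first start. HistoricalEvent and WindowBootstrap are hypothetical names, and the list argument stands in for whatever the database or Kafka query returns; nothing here is an existing Eagle API.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical historical record: who acted and when (epoch milliseconds). */
final class HistoricalEvent {
    final String user;
    final long timestampMs;

    HistoricalEvent(String user, long timestampMs) {
        this.user = user;
        this.timestampMs = timestampMs;
    }
}

/** Hypothetical helper that picks stored events still inside the time window. */
final class WindowBootstrap {
    static List<HistoricalEvent> eventsToReplay(List<HistoricalEvent> stored,
                                                long nowMs, long windowMs) {
        long cutoff = nowMs - windowMs;
        List<HistoricalEvent> inWindow = new ArrayList<>();
        for (HistoricalEvent e : stored) {
            // Keep only events newer than the cutoff and not in the future.
            if (e.timestampMs > cutoff && e.timestampMs <= nowMs) {
                inWindow.add(e);
            }
        }
        return inWindow;
    }
}
```

The selected events would then be fed to the policy evaluator in timestamp order before live traffic is attached, so the window starts warm instead of empty.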
>> 
>> Also be aware that if we have the right solution for the policy evaluator, the
>> solution should also be applied to event aggregation.
>> https://cwiki.apache.org/confluence/display/EAG/Stream+Analyze
>> 
>> Another, more aggressive way is to use a solution similar to Flink stream barriers
>> http://data-artisans.com/high-throughput-low-latency-and-exactly-once-stream-processing-with-apache-flink/
>> to take snapshots of all Eagle tasks (typically spouts and bolts) but turn off
>> the Storm ACK mechanism.
>> 
>> trait StormStreamExecutor[R <: EagleTuple] extends FlatMapper[Seq[AnyRef], R] {
>>  def prepareConfig(config : Config)
>>  def init
>>  def fields : Array[String]
>> }
>> 
>> 
>> ==>
>> 
>> 
>> trait StormStreamExecutor[R <: EagleTuple] extends FlatMapper[Seq[AnyRef], R] {
>>  def prepareConfig(config : Config)
>>  def init
>>  def fields : Array[String]
>> 
>>  def snapshot : Array[Byte]
>> 
>>  def restore(state: Array[Byte])
>> }
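
The two new methods in the trait above exchange state as a byte array. The following Java sketch shows one way an executor could satisfy that contract. It is a stand-in, not Eagle code: Snapshottable mirrors only the proposed snapshot/restore pair, UserCountState is a hypothetical executor state, and the length-prefixed binary layout is an arbitrary choice made for the example.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;

/** Java stand-in for the two methods proposed for StormStreamExecutor. */
interface Snapshottable {
    byte[] snapshot();

    void restore(byte[] state);
}

/** Hypothetical executor state: an event count per user. */
final class UserCountState implements Snapshottable {
    private final Map<String, Long> counts = new HashMap<>();

    void increment(String user) {
        counts.merge(user, 1L, Long::sum);
    }

    long get(String user) {
        return counts.getOrDefault(user, 0L);
    }

    @Override
    public byte[] snapshot() {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            // Layout: entry count, then (user, count) pairs.
            out.writeInt(counts.size());
            for (Map.Entry<String, Long> e : counts.entrySet()) {
                out.writeUTF(e.getKey());
                out.writeLong(e.getValue());
            }
        } catch (IOException ex) {
            throw new UncheckedIOException(ex);
        }
        return bytes.toByteArray();
    }

    @Override
    public void restore(byte[] state) {
        counts.clear();
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(state))) {
            int n = in.readInt();
            for (int i = 0; i < n; i++) {
                String user = in.readUTF();
                counts.put(user, in.readLong());
            }
        } catch (IOException ex) {
            throw new UncheckedIOException(ex);
        }
    }
}
```

Keeping the interface at the byte-array level means the framework can persist the state to any deep storage without knowing what each executor holds.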
>> 
>> This is very important to Eagle if we want it to be a fault-tolerant
>> monitoring framework.
>> 
>> Thanks
>> Edward
>> From: Sriskandarajah Suhothayan <su...@wso2.com>>
>> Date: Thursday, December 10, 2015 at 9:30
>> To: "Zhang, Edward (GDI Hadoop)" <yonzhang@ebay.com<mailto:
>> yonzhang@ebay.com>>
>> Cc: "dev@eagle.incubator.apache.org<ma...@eagle.incubator.apache.org>"
>> <de...@eagle.incubator.apache.org>>,
>> Edward Zhang <yo...@apache.org>>,
>> Srinath Perera <sr...@wso2.com>>, WSO2
>> Developers' List <de...@wso2.org>>
>> Subject: Re: [Dev] [Siddhi] what events is left in the window
>> 
>> Thanks for pointing it out,
>> 
>> We are looking into this.
>> Will update you ASAP
>> 
>> Suho
>> 
>> On Thu, Dec 10, 2015 at 12:58 AM, Zhang, Edward (GDI Hadoop) <
>> yonzhang@ebay.com<ma...@ebay.com>> wrote:
>> By the way, we use siddhi version 3.0.2.
>> 
>> Also for tracking this issue, I created jira
>> https://wso2.org/jira/browse/CEP-1433 snapshot/restore does not work for
>> aggregation on time based window
>> 
>> Thanks
>> Edward
>> 
>> On 12/8/15, 17:57, "Zhang, Edward (GDI Hadoop)" <yonzhang@ebay.com<mailto:
>> yonzhang@ebay.com>> wrote:
>> 
>>> Thanks for this suggestion, Suho.
>>> 
>>> I did some testing on state persist and restore. It looks like most use cases
>>> work except group by. I am not sure if the Siddhi team knows about this.
>>> 
>>> Please look at my test cases : testTimeSlideWindowWithGroupby
>>> 
>>> https://github.com/yonzhang/incubator-eagle/commit/606b65705ea20ce1592a20df9a1f85758168efcb
>>> 
>>> The query is like the following
>>> String cseEventStream = "define stream testStream (timeStamp long, user string, cmd string);";
>>> String query = "@info(name = 'query1') from testStream[cmd == 'open']#window.externalTime(timeStamp,3 sec)"
>>>         + " select user, timeStamp, count() as cnt"
>>>         + " group by user"
>>>         + " having cnt > 2"
>>>         + " insert all events into outputStream;";
>>> 
>>> The basic issue could be the following:
>>> 1) When taking a snapshot, it persists one Count executor per key. But
>>> it looks like Siddhi adds the same Count executor to the snapshot list
>>> multiple times (the count aggregator elementId is $planName+keyname).
>>> 2) When restoring a snapshot, it does not restore the Count executor for a
>>> key, because snapshotableList does not contain that key. The key is only
>>> generated when an event comes in, and at restoration time we don't know the
>>> previous events.
>>> 
>>> for (Snapshotable snapshotable : snapshotableList) {
>>>   snapshotable.restoreState(snapshots.get(snapshotable.getElementId()));
>>> }
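
The toy Java model below illustrates one reading of point 2: if per-key aggregators are registered only when their first event arrives, a restore loop like the one quoted above finds nothing to restore on a freshly started runtime. ToyRuntime and KeyedCounter are hypothetical and are not Siddhi classes; the real internals may differ.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy counter identified by an element id, e.g. plan name plus group-by key. */
final class KeyedCounter {
    final String elementId;
    long count;

    KeyedCounter(String elementId) {
        this.elementId = elementId;
    }
}

/** Toy runtime whose per-key counters are registered lazily, on first event. */
final class ToyRuntime {
    private final List<KeyedCounter> snapshotableList = new ArrayList<>();
    private final Map<String, KeyedCounter> byKey = new HashMap<>();

    void onEvent(String key) {
        KeyedCounter c = byKey.computeIfAbsent(key, k -> {
            KeyedCounter created = new KeyedCounter("plan1-" + k);
            snapshotableList.add(created);
            return created;
        });
        c.count++;
    }

    Map<String, Long> snapshot() {
        Map<String, Long> out = new HashMap<>();
        for (KeyedCounter c : snapshotableList) {
            out.put(c.elementId, c.count);
        }
        return out;
    }

    /** Same shape as the quoted loop: only already-registered elements are visited. */
    int restore(Map<String, Long> snapshots) {
        int restored = 0;
        for (KeyedCounter c : snapshotableList) {
            Long saved = snapshots.get(c.elementId);
            if (saved != null) {
                c.count = saved;
                restored++;
            }
        }
        return restored;
    }

    long countFor(String key) {
        KeyedCounter c = byKey.get(key);
        return c == null ? 0L : c.count;
    }
}
```

In this model the snapshot does contain an entry per key, but a new ToyRuntime has an empty snapshotableList, so restore visits no elements and every saved count is silently dropped. A fix would have to iterate over the saved keys and recreate the counters, rather than iterate over the registered elements.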
>>> 
>>> Please let me know if there is some issue with my test program or
>>> something is wrong with Siddhi group by/aggregator snapshot
>>> 
>>> Thanks
>>> Edward
>>> 
>>> From: Sriskandarajah Suhothayan <suho@wso2.com<mailto:suho@wso2.com
>>> <ma...@wso2.com>>>
>>> Date: Wednesday, November 25, 2015 at 21:07
>>> To: Edward Zhang <yonzhang2012@apache.org<mailto:yonzhang2012@apache.org
>>> <ma...@apache.org>>>
>>> Cc: Srinath Perera <sr...@wso2.com><mailto:
>> srinath@wso2.com<ma...@wso2.com>>>, WSO2
>>> Developers' List <de...@wso2.org><mailto:dev@wso2.org
>> <ma...@wso2.org>>>
>>> Subject: Re: [Dev] [Siddhi] what events is left in the window
>>> 
>>> Hi
>>> 
>>> Currently the concepts of current events & expired events live within the
>>> query, and all events going out to a stream are converted back to current
>>> events. So it's hard for the application to keep track of the window and
>>> aggregation states like count, avg, std, etc.
>>> Further, window implementations can vary,
>>> hence in some cases knowing which events entered and exited will not be
>>> enough to recreate the window after a failure.
>>> 
>>> The recommended way to keep track of state in Siddhi is via snapshots.
>>> You can take snapshots of the Siddhi runtime at a reasonable time
>>> interval, and also buffer a copy of the events sent to Siddhi after each
>>> snapshot. With this method, when there is a failure we restore the
>>> latest snapshot and replay the events that were sent after that
>>> snapshot. The tricky part is how you buffer events after a snapshot;
>>> using Kafka and replaying is one option.
>>> 
>>> Regards
>>> Suho
>>> 
>>> On Thu, Nov 26, 2015 at 10:01 AM, Edward Zhang
>>> <yo...@apache.org><mailto:
>> yonzhang2012@apache.org<ma...@apache.org>>> wrote:
>>> I tried expired events before; it only works for queries without
>>> group by. If the query contains group by and a having clause, then it only
>>> emits an expired event when the having condition is satisfied.
>>> 
>>> For example
>>> 
>>> String cseEventStream = "define stream TempStream (user string, cmd string);";
>>> String query = "@info(name = 'query1') from TempStream#window.length(4) "
>>>       + " select user, cmd, count(user) as cnt "
>>>       + " group by user "
>>>       + "having cnt > 3 "
>>>       + " insert all events into DelayedTempStream";
>>> 
>>> If we send events as follows, it will not generate expired events at all.
>>> 
>>> inputHandler.send(new Object[]{"user", "open1"});
>>> inputHandler.send(new Object[]{"user", "open2"});
>>> inputHandler.send(new Object[]{"user", "open3"});
>>> inputHandler.send(new Object[]{"user", "open4"});
>>> inputHandler.send(new Object[]{"user", "open5"});
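
A small Java model can show why these five events would produce no expired output. This is not Siddhi code: LengthWindowModel is hypothetical, it tracks a single group for brevity, and it assumes the evicted event is evaluated against the having clause after the count has been decremented and before the new event is counted.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Toy model of a length window with count() and a HAVING filter. */
final class LengthWindowModel {
    private final int length;
    private final long havingThreshold;
    private final Deque<String> window = new ArrayDeque<>();
    private long count = 0;                       // single group, for brevity
    final List<String> emitted = new ArrayList<>();

    LengthWindowModel(int length, long havingThreshold) {
        this.length = length;
        this.havingThreshold = havingThreshold;
    }

    void send(String cmd) {
        if (window.size() == length) {
            // Assumption: the evicted event is processed before the new one.
            String expired = window.removeFirst();
            count--;
            emitIfHaving("EXPIRED", expired);
        }
        window.addLast(cmd);
        count++;
        emitIfHaving("CURRENT", cmd);
    }

    private void emitIfHaving(String type, String cmd) {
        // HAVING cnt > threshold applies to current and expired events alike.
        if (count > havingThreshold) {
            emitted.add(type + ":" + cmd + ":cnt=" + count);
        }
    }
}
```

With a window length of 4 and a threshold of 3, the fifth event evicts "open1" and drops the count to 3, so the expired event fails the having clause. The count then returns to 4 for the current event, which passes. Under these assumptions the application only ever sees current events and cannot reconstruct what left the window.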
>>> 
>>> 
>>> Thanks
>>> Edward Zhang
>>> 
>>> On Wed, Nov 25, 2015 at 6:50 PM, Srinath Perera
>>> <sr...@wso2.com><mailto:srinath@wso2.com
>> <ma...@wso2.com>>> wrote:
>>> Adding Suho
>>> 
>>> Hi Edward,
>>> 
>>> Each window gives you a stream of expired events as well. Would that work?
>>> 
>>> 
>>> https://docs.wso2.com/display/CEP400/SiddhiQL+Guide+3.0#SiddhiQLGuide3.0-Window
>>> 
>>> Thanks
>>> Srinath
>>> 
>>> On Thu, Nov 19, 2015 at 5:37 AM, Edward Zhang
>>> <yo...@apache.org><mailto:
>> yonzhang2012@apache.org<ma...@apache.org>>> wrote:
>>> Hi Siddhi team,
>>> 
>>> Do we have any way of tracking which events are removed from any type of
>>> window, length(batch) or time(batch)? From what I investigated, removeEvents
>>> may not be the right solution.
>>> 
>>> We have one requirement: migrating a policy from one machine to another
>>> machine while keeping its internal state.
>>> 
>>> Eagle runs policies on Storm infrastructure, but if the machine that holds
>>> the policy fails, the events already populated in the window are also
>>> gone. This is especially bad when we have built up a long window,
>>> such as a month of data.
>>> 
>>> One possible way is to snapshot all events in the Siddhi window into the
>>> application domain. Another way is to keep tracking which events come in
>>> and go out, so the application can track what is left in the Siddhi window.
>>> 
>>> Here is the ticket for Eagle
>>> https://issues.apache.org/jira/browse/EAGLE-39
>>> 
>>> Have you had a similar request before? Or what do you suggest?
>>> 
>>> Thanks
>>> Edward Zhang
>>> 
>>> _______________________________________________
>>> Dev mailing list
>>> Dev@wso2.org
>>> http://wso2.org/cgi-bin/mailman/listinfo/dev
>>> 
>>> 
>>> 
>>> 
>>> --
>>> ============================
>>> Srinath Perera, Ph.D.
>>>  http://people.apache.org/~hemapani/
>>>  http://srinathsview.blogspot.com/
>>> 
>>> 
>>> 
>>> 
>>> --
>>> S. Suhothayan
>>> Technical Lead & Team Lead of WSO2 Complex Event Processor
>>> WSO2 Inc. http://wso2.com<http://wso2.com/>
>>> <http://wso2.com/>
>>> lean . enterprise . middleware
>>> 
>>> cell: (+94) 779 756 757 | blog: http://suhothayan.blogspot.com/
>>> twitter: http://twitter.com/suhothayan | linked-in:
>>> http://lk.linkedin.com/in/suhothayan
>> 
>> 
>> 
>> 
>> --
>> S. Suhothayan
>> Technical Lead & Team Lead of WSO2 Complex Event Processor
>> WSO2 Inc. http://wso2.com<http://wso2.com/>
>> <http://wso2.com/>
>> lean . enterprise . middleware
>> 
>> cell: (+94) 779 756 757 | blog: http://suhothayan.blogspot.com/
>> twitter: http://twitter.com/suhothayan | linked-in:
>> http://lk.linkedin.com/in/suhothayan
>> 


Re: [Discuss] Eagle Policy State Management

Posted by "Liangfei.Su" <su...@gmail.com>.
Great proposal. This is important and could serve generally for both policy
capability and analytic features.

Taking snapshots periodically and independently on each bolt would make
state recoverable from recent history, but from the point of view of the
whole topology, this cannot exactly handle state dependencies between bolts.

Another point: should state restore be triggered not only when the
topology restarts but also when
a. the topology is rebalanced
b. a single bolt is moved by the underlying stream framework from one
executor to another?

Thanks,
Ralph


On Fri, Dec 11, 2015 at 9:49 AM, Zhang, Edward (GDI Hadoop) <
yonzhang@ebay.com> wrote:

> This topic has been discussed offline for a while, and it is time we
> documented the problems and solutions. With clear problem statements and
> proposed solutions, I believe we can do better before we start implementing.
>
> [Problem Statement] For Eagle, as a real-time big data monitoring framework,
> evaluating policies efficiently is the core functionality. Most
> meaningful policies are stateful, in that policy evaluation is based not on a
> single event but on both previous events and the current event. This
> brings 2 fundamental problems: one is policy state loss upon
> machine failure or topology restart; the other is missing history data
> upon a fresh start. One simple example is a policy like “from
> userActivity[cmd==‘delete’]time.window(1 month) select user, count() as cnt
> group by user having cnt > 1000”: if the task is restarted, the accumulated
> user/count map is lost. Also, when the topology is started for
> the first time, the window is empty even if we have historical data in a
> database.
>
> [Proposed Solutions] The natural way of solving the above 2 problems is
> 1) persist policy state periodically and restore policy state after the
> task is restarted
> The underlying policy engine should support snapshot and restore operations.
> Siddhi 3.x already supports snapshot and restore, though I found
> some bugs in its state management.
> https://wso2.org/jira/browse/CEP-1433
> Restore is not that straightforward unless all input events to the
> policy evaluator are backed by reliable and rewindable storage like Kafka.
> If input events to the policy evaluator are backed by Kafka, then each time
> Eagle takes a snapshot, we record the current offset along with it and
> persist both of them to deep storage.
> If input events to the policy evaluator are not backed by Kafka, then we need
> to record every event since the last snapshot. That looks very expensive. Apache
> Flink uses an efficient algorithm called stream barriers to do group
> acknowledgement, but Storm does not have this feature. But remember that
> Apache Flink requires every task to take snapshots, not only the policy
> evaluator.
>
> 2) transparently load historical data when the topology is started for the
> first time
> If the policy evaluator accepts data that is already persisted in a database or
> Kafka, we can provide an API to retrieve data from the database or Kafka. This
> loading is transparent to the developer, but the developer/user needs to specify
> the deep storage that holds the historical data.
>
> Also be aware that if we have the right solution for the policy evaluator, the
> solution should also be applied to event aggregation.
> https://cwiki.apache.org/confluence/display/EAG/Stream+Analyze
>
> Another, more aggressive way is to use a solution similar to Flink stream barriers
> http://data-artisans.com/high-throughput-low-latency-and-exactly-once-stream-processing-with-apache-flink/
> to take snapshots of all Eagle tasks (typically spouts and bolts) but turn off
> the Storm ACK mechanism.
>
> trait StormStreamExecutor[R <: EagleTuple] extends FlatMapper[Seq[AnyRef], R] {
>   def prepareConfig(config : Config)
>   def init
>   def fields : Array[String]
> }
>
>
> ==>
>
>
> trait StormStreamExecutor[R <: EagleTuple] extends FlatMapper[Seq[AnyRef], R] {
>   def prepareConfig(config : Config)
>   def init
>   def fields : Array[String]
>
>   def snapshot : Array[Byte]
>
>   def restore(state: Array[Byte])
> }
>
> This is very important to Eagle if we want it to be a fault-tolerant
> monitoring framework.
>
> Thanks
> Edward
> From: Sriskandarajah Suhothayan <su...@wso2.com>>
> Date: Thursday, December 10, 2015 at 9:30
> To: "Zhang, Edward (GDI Hadoop)" <yonzhang@ebay.com<mailto:
> yonzhang@ebay.com>>
> Cc: "dev@eagle.incubator.apache.org<ma...@eagle.incubator.apache.org>"
> <de...@eagle.incubator.apache.org>>,
> Edward Zhang <yo...@apache.org>>,
> Srinath Perera <sr...@wso2.com>>, WSO2
> Developers' List <de...@wso2.org>>
> Subject: Re: [Dev] [Siddhi] what events is left in the window
>
> Thanks for pointing it out,
>
> We are looking into this.
> Will update you ASAP
>
> Suho
>
> On Thu, Dec 10, 2015 at 12:58 AM, Zhang, Edward (GDI Hadoop) <
> yonzhang@ebay.com<ma...@ebay.com>> wrote:
> By the way, we use siddhi version 3.0.2.
>
> Also for tracking this issue, I created jira
> https://wso2.org/jira/browse/CEP-1433 snapshot/restore does not work for
> aggregation on time based window
>
> Thanks
> Edward
>
> On 12/8/15, 17:57, "Zhang, Edward (GDI Hadoop)" <yonzhang@ebay.com<mailto:
> yonzhang@ebay.com>> wrote:
>
> >Thanks for this suggestion, Suho.
> >
> >I did some testing on state persist and restore. It looks like most use cases
> >work except group by. I am not sure if the Siddhi team knows about this.
> >
> >Please look at my test cases : testTimeSlideWindowWithGroupby
> >
> >https://github.com/yonzhang/incubator-eagle/commit/606b65705ea20ce1592a20df9a1f85758168efcb
> >
> >The query is like the following
> >String cseEventStream = "define stream testStream (timeStamp long, user string, cmd string);";
> >String query = "@info(name = 'query1') from testStream[cmd == 'open']#window.externalTime(timeStamp,3 sec)"
> >        + " select user, timeStamp, count() as cnt"
> >        + " group by user"
> >        + " having cnt > 2"
> >        + " insert all events into outputStream;";
> >
> >The basic issue could be the following:
> >1) When taking a snapshot, Siddhi persists all Count executors, one per
> >key, but it seems to add the same Count executor to the snapshot list
> >multiple times (the count aggregator elementId is $planName+keyname).
> >2) When restoring a snapshot, Siddhi does not restore the Count executor
> >for a key because snapshotableList does not contain that key. The key is
> >only generated when an event comes in, and at restore time we don't know
> >the previous events.
> >
> >for (Snapshotable snapshotable : snapshotableList) {
> >    snapshotable.restoreState(snapshots.get(snapshotable.getElementId()));
> >}
> >
> >Please let me know if there is some issue with my test program or
> >something is wrong with Siddhi group by/aggregator snapshot
> >
> >Thanks
> >Edward
> >
> >From: Sriskandarajah Suhothayan <suho@wso2.com>
> >Date: Wednesday, November 25, 2015 at 21:07
> >To: Edward Zhang <yonzhang2012@apache.org>
> >Cc: Srinath Perera <srinath@wso2.com>, WSO2
> >Developers' List <dev@wso2.org>
> >Subject: Re: [Dev] [Siddhi] what events is left in the window
> >
> >Hi
> >
> >Currently the concepts of current events and expired events live within
> >the query, and all events going out to a stream are converted back to
> >current events. So it's hard for the application to keep track of the
> >window and of aggregation states like count, avg, std, etc.
> >Further, window behavior can vary between implementations, so in some
> >cases knowing which events entered and exited will not be enough to
> >recreate the window after a failure.
> >
> >The recommended way to keep track of state in Siddhi is via snapshots.
> >You can take snapshots of the Siddhi runtime at a reasonable interval
> >and also buffer a copy of the events sent to Siddhi after each snapshot.
> >With this method, when there is a failure, we restore the latest
> >snapshot and replay the events that were sent after it. The tricky part
> >is how you buffer events after the snapshot; using Kafka and replaying
> >is one option.
> >
> >Regards
> >Suho
> >
> >On Thu, Nov 26, 2015 at 10:01 AM, Edward Zhang
> ><yonzhang2012@apache.org> wrote:
> >I tried expired events before; they only work for queries without
> >group by. If the query contains group by and a having clause, an expired
> >event is emitted only when the having condition is satisfied.
> >
> >For example
> >
> >String cseEventStream = "define stream TempStream (user string, cmd string);";
> >String query = "@info(name = 'query1') from TempStream#window.length(4) "
> >        + " select user, cmd, count(user) as cnt "
> >        + " group by user "
> >        + "having cnt > 3 "
> >        + " insert all events into DelayedTempStream";
> >
> >If we send events as follows, it will not generate expired events at all.
> >
> >inputHandler.send(new Object[]{"user", "open1"});
> >inputHandler.send(new Object[]{"user", "open2"});
> >inputHandler.send(new Object[]{"user", "open3"});
> >inputHandler.send(new Object[]{"user", "open4"});
> >inputHandler.send(new Object[]{"user", "open5"});
> >
> >
> >Thanks
> >Edward Zhang
> >
> >On Wed, Nov 25, 2015 at 6:50 PM, Srinath Perera
> ><srinath@wso2.com> wrote:
> >Adding Suho
> >
> >Hi Edward,
> >
> >Each window gives you a stream of expired events as well. Would that work?
> >
> >https://docs.wso2.com/display/CEP400/SiddhiQL+Guide+3.0#SiddhiQLGuide3.0-Window
> >
> >Thanks
> >Srinath
> >
> >On Thu, Nov 19, 2015 at 5:37 AM, Edward Zhang
> ><yonzhang2012@apache.org> wrote:
> >Hi Siddhi team,
> >
> >Do we have any way of tracking which events are removed from any type of
> >window, length(batch) or time(batch)? From my investigation, removeEvents
> >may not be the right solution.
> >
> >We have a requirement to migrate a policy from one machine to another
> >while keeping its internal state.
> >
> >Eagle runs policies on Storm infrastructure, and if the machine that
> >holds a policy fails, the events already populated in the window are
> >gone as well. This is especially bad when we have built up a long
> >window, such as a month of data.
> >
> >One possible way is to snapshot all events in the Siddhi window into the
> >application domain. Another way is to keep tracking which events come in
> >and go out, so the application can track what is left in the Siddhi
> >window.
> >
> >Here is the ticket for Eagle
> >https://issues.apache.org/jira/browse/EAGLE-39
> >
> >Have you had a similar request before? Or what do you suggest?
> >
> >Thanks
> >Edward Zhang
> >
> >
> >--
> >============================
> >Srinath Perera, Ph.D.
> >   http://people.apache.org/~hemapani/
> >   http://srinathsview.blogspot.com/
> >
> >
> >
> >
> >--
> >S. Suhothayan
> >Technical Lead & Team Lead of WSO2 Complex Event Processor
> >WSO2 Inc. http://wso2.com
> >lean . enterprise . middleware
> >
> >cell: (+94) 779 756 757 | blog: http://suhothayan.blogspot.com/
> >twitter: http://twitter.com/suhothayan | linked-in:
> >http://lk.linkedin.com/in/suhothayan
>

Re: [Discuss] Eagle Policy State Management

Posted by "Liangfei.Su" <su...@gmail.com>.
Great proposal. This is important and could serve as a general capability
for both policy evaluation and analytics features.

Taking snapshots periodically and independently on each bolt would make
state recoverable from recent history, but from the point of view of the
whole topology's state, this cannot handle dependencies between bolt
states exactly.
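
To make the per-bolt option concrete, here is a minimal Java sketch of one bolt's state being snapshotted together with the input offset it reflects, as the proposal suggests for Kafka-backed input. The names CountState, apply, and lastOffset are hypothetical, not Eagle, Storm, or Siddhi APIs, and the byte layout is only an illustration. It covers a single bolt, so it does not solve the cross-bolt dependency problem described above.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;

// Hypothetical per-bolt state: per-user counts plus the offset of the last event applied.
final class CountState {
    private final Map<String, Long> counts = new HashMap<>();
    private long lastOffset = -1L; // -1 means no event has been applied yet

    // Apply one input event read at the given offset (assumed to increase monotonically).
    void apply(long offset, String user) {
        if (offset <= lastOffset) {
            return; // already reflected in the state, so skip it during replay
        }
        counts.merge(user, 1L, Long::sum);
        lastOffset = offset;
    }

    long count(String user) { return counts.getOrDefault(user, 0L); }

    long lastOffset() { return lastOffset; }

    // Serialize counts and offset together so the two are always consistent.
    byte[] snapshot() {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeLong(lastOffset);
            out.writeInt(counts.size());
            for (Map.Entry<String, Long> e : counts.entrySet()) {
                out.writeUTF(e.getKey());
                out.writeLong(e.getValue());
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Replace the current state with a previously taken snapshot.
    void restore(byte[] state) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(state))) {
            counts.clear();
            lastOffset = in.readLong();
            int n = in.readInt();
            for (int i = 0; i < n; i++) {
                String user = in.readUTF();
                counts.put(user, in.readLong());
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

After restore, the task would resume reading input from lastOffset() + 1. Events at or below the restored offset are ignored by apply, so replaying from a slightly earlier position does not double count.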

Another point: should state restore be triggered not only when the
topology restarts, but also on
a. topology rebalance
b. movement of a single bolt by the underlying streaming framework from
one executor to another?
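
One way to cover all of these cases is to tie restore to task start-up rather than to a specific trigger. The Java sketch below assumes that the framework calls a start hook every time a task instance comes up (in Storm this would presumably be the bolt's prepare method, but that mapping is an assumption here). InMemorySnapshotStore and RestoringTask are hypothetical names, and the in-memory map stands in for deep storage.

```java
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for deep storage, keyed by a stable task key.
final class InMemorySnapshotStore {
    private final Map<String, byte[]> latest = new ConcurrentHashMap<>();

    void save(String taskKey, byte[] state) {
        latest.put(taskKey, state.clone());
    }

    Optional<byte[]> load(String taskKey) {
        return Optional.ofNullable(latest.get(taskKey)).map(b -> b.clone());
    }
}

// Hypothetical task wrapper: restore happens in start(), whatever caused the start.
final class RestoringTask {
    private final String taskKey;
    private final InMemorySnapshotStore store;
    private long eventCount = 0L;

    RestoringTask(String taskKey, InMemorySnapshotStore store) {
        this.taskKey = taskKey;
        this.store = store;
    }

    // Called once whenever a new instance of this task comes up
    // (topology restart, rebalance, or the task being moved).
    void start() {
        store.load(taskKey)
             .ifPresent(bytes -> eventCount = ByteBuffer.wrap(bytes).getLong());
    }

    void onEvent() {
        eventCount++;
    }

    // Persist the current state under the task key.
    void checkpoint() {
        store.save(taskKey, ByteBuffer.allocate(Long.BYTES).putLong(eventCount).array());
    }

    long eventCount() {
        return eventCount;
    }
}
```

The important design choice is that the snapshot key is stable across machines (for example, derived from the component name and task index rather than the host), so a task that comes up elsewhere finds the state of its predecessor.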

Thanks,
Ralph

