You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@apex.apache.org by Bhupesh Chawda <bh...@datatorrent.com> on 2016/06/27 06:04:47 UTC

APEXMALHAR-1701 Deduper in Malhar

Hi All,

I am working on adding a De-duplication operator in Malhar library based on
managed state APIs. I will be working off the already created JIRA -
https://issues.apache.org/jira/browse/APEXMALHAR-1701 and the initial pull
request for an AbstractDeduper here:
https://github.com/apache/apex-malhar/pull/260/files

I am planning to include the following features in the first version:
1. Time based de-duplication. Assumption: Tuple_Key -> Tuple_Time
correlation holds.
2. Option to maintain order of incoming tuples.
3. Duplicate and Expired ports to emit duplicate and expired tuples
respectively.

Thanks.

~ Bhupesh

Re: APEXMALHAR-1701 Deduper in Malhar

Posted by Mohit Jotwani <mo...@datatorrent.com>.
Dear Community,

+1 to Bhupesh's suggestion.

I would suggest to go ahead with the Managed State and once we have proper
analysis on the windowed operator + large storage backed windowed operator
- we should implement operators such as dedup with it.

Regards,
Mohit

On Mon, Jul 18, 2016 at 12:35 PM, Bhupesh Chawda <bh...@apache.org> wrote:

> I can see that Dedup seems like a case where state is continuously merged
> with older state. State in this case is the set of unique tuples. However,
> for Dedup use case, the event windows are, in a way, fixed, and do not
> depend on the incoming tuples. In-coming tuples are just *assigned* to
> these windows. The point I am trying to make is that the older event
> windows will be purged (depending on the lateness configuration and
> watermarks) irrespective of the incoming tuples. Session windows on the
> other hand depend on the incoming tuples and are not fixed, and change with
> incoming data. Perhaps we should not model this use case as a session
> window.
>
> I agree that we cannot decide the approach to be followed with the current
> memory backed storage implementation. Actually, even when we have seen a
> managed state backed implementation for windowed storage, I am worried that
> the interfaces won't still be flexible enough as compared to direct usage
> of managed state and will need custom changes to fit the Dedup use case. I
> am looking at it from the perspective of asynchronous processing which will
> be necessary once we have disk IO involved for processing incoming tuples.
>
> I will suggest we move ahead with the managed state implementation for
> Deduper. We can pick up the Windowed operator based implementation once we
> have all the necessary features like windowed storage backed by managed
> state, input operators with watermark tuple support etc.
>
> Suggestions?
>
> ~ Bhupesh
>
>
> On Mon, Jul 18, 2016 at 11:29 AM, Thomas Weise <th...@datatorrent.com>
> wrote:
>
> > Hi Bhupesh,
> >
> > Dedup is different with regard to state accumulation. For other windowed
> > operations, we collect state and then emit a result after a period of
> time
> > (trigger or watermark). Here, we only need the state to detect the
> > duplicate. Hence, it is inefficient to collect a list of tuples to
> > determine that a subsequently arriving tuple is a duplicate or not. But
> > isn't this scenario similar to the session window, where state is
> > continuously merged.
> >
> > I would prefer to see more analysis on performance and scalability to
> large
> > key cardinality. The window operator only has the memory backed window
> > store at this time. Until there is a managed state backed implementation
> > that has seen benchmarking, we cannot really use it as baseline for
> further
> > implementations on top of it.
> >
> > Thomas
> >
> >
> > On Thu, Jul 14, 2016 at 7:55 PM, Bhupesh Chawda <bh...@apache.org>
> > wrote:
> >
> > > Hi All,
> > >
> > > I also implemented a De-duplication operator using Windowed Operator.
> Now
> > > we have two implementations, one with Managed state and another using
> > > Windowed operator. Here are their details:
> > >
> > >    1. *With Managed State - *
> > >    - The operator is implemented using managed state as the storage for
> > >       buckets into which the tuples will be stored.
> > >       - *TimeBucketAssigner* is used to assign an incoming tuple to
> > >       different buckets based on the event time. It is also used to
> > > identify
> > >       whether a particular tuple is expired and should be sent to the
> > > expired
> > >       port / dropped.
> > >       - For managed state, the *ManagedTimeUnifiedStateImpl*
> > implementation
> > >       is used which just requires the user to specify the event time
> > > and a bucket
> > >       is automatically assigned based on that. The structure of the
> > bucket
> > > data
> > >       on storage is as follows: /operator_id /time_bucket
> > >       - An advantage of using Managed State approach is that we don't
> > have
> > >       to assume the correlation of event time to the de-duplication key
> > of
> > > the
> > >       tuple. For example, if we get two tuples like: (K1, T1), and (K1,
> > > T2), we
> > >       can still use ManagedStateImpl and conclude that these tuples are
> > >       duplicates based on the Key K1.
> > >       2. *With Windowed Operator - *
> > >    - The operator uses the WindowedOperatorImpl as the base operator.
> > >       - Accumulation, for the deduper, basically amounts to storing a
> > list
> > >       of tuples in the data storage. Every time we get a unique tuple,
> we
> > >       *accumulate* it in the list.
> > >       - Event windows are modeled using the *TimeWindow* option.
> Although
> > >       SlidingTimeWIndows seems to be intuitive for data buckets, it
> seems
> > > to be
> > >       the costly option as the accumulation in this case is not just
> > > an aggregate
> > >       value but a list of values in that bucket.
> > >       - Watermarks are not assumed to be sent from an input operator
> > >       (although it is okay if an upstream operator sends them). The
> > >       *fixedWatermark* feature is used to assume watermarks which are
> > >       relative to the window time.
> > >       - One of the issues I found with using WindowedOperator for Dedup
> > is
> > >       that event time is tightly coupled with the de-duplication key.
> In
> > > the
> > >       above example, (K1, T1), and (K1, T2) *might* be concluded as two
> > >       unique tuples since T1 and T2 may fall into two different time
> > > buckets.
> > >
> > > Here are the PRs for both of them.
> > >
> > >    - Using Managed State:
> https://github.com/apache/apex-malhar/pull/335
> > >    - Using Windowed Operator:
> > > https://github.com/apache/apex-malhar/pull/343
> > >
> > > Please review them and suggest on the correct approach for the final
> > > implementation which should be used to add other features like fault
> > > tolerance, scalability, optimizations etc.
> > > Thanks.
> > >
> > > ~ Bhupesh
> > >
> > > On Fri, Jul 8, 2016 at 11:30 PM, David Yan <da...@datatorrent.com>
> > wrote:
> > >
> > > > No problem.
> > > >
> > > > By the way, I changed the method name to setFixedWatermark. And also,
> > if
> > > > you want to drop any tuples that are considered late, you need to set
> > the
> > > > allowed lateness to be 0.
> > > >
> > > > David
> > > >
> > > > On Fri, Jul 8, 2016 at 4:55 AM, Bhupesh Chawda <bh...@apache.org>
> > > wrote:
> > > >
> > > > > Thanks David.
> > > > > I'll try to create an implementation for Deduper which uses
> > > > > WindowedOperator. Will open a PR soon for review.
> > > > >
> > > > > ~ Bhupesh
> > > > >
> > > > > On Fri, Jul 8, 2016 at 2:23 AM, David Yan <da...@datatorrent.com>
> > > wrote:
> > > > >
> > > > > > Hi Bhupesh,
> > > > > >
> > > > > > I just added the method setFixedLateness(long millis) to
> > > > > > AbstractWindowedOperator in my PR. This will allow you to specify
> > the
> > > > > > lateness with respect to the timestamp from the window ID without
> > > > > watermark
> > > > > > tuples from upstream.
> > > > > >
> > > > > > David
> > > > > >
> > > > > > On Thu, Jul 7, 2016 at 11:49 AM, David Yan <
> david@datatorrent.com>
> > > > > wrote:
> > > > > >
> > > > > > > Hi Bhupesh,
> > > > > > >
> > > > > > > Yes, the windowed operator currently depends on the watermark
> > > tuples
> > > > > > > upstream for any "lateness" related operation. If there is no
> > > > > watermark,
> > > > > > > nothing will be considered late. We can add support for
> lateness
> > > > > handling
> > > > > > > without incoming watermark tuples. Let me add that to the pull
> > > > request.
> > > > > > >
> > > > > > > David
> > > > > > >
> > > > > > >
> > > > > > > On Wed, Jul 6, 2016 at 10:48 PM, Bhupesh Chawda <
> > > bhupesh@apache.org>
> > > > > > > wrote:
> > > > > > >
> > > > > > >> Hi David,
> > > > > > >>
> > > > > > >> Thanks for your reply.
> > > > > > >>
> > > > > > >> If I am to use a windowed operator for the Dedup operator,
> there
> > > > > should
> > > > > > be
> > > > > > >> some operator (upstream to Deduper) which sends the watermark
> > > > tuples.
> > > > > > >> These
> > > > > > >> tuples (along with allowed lateness), will be the ones
> deciding
> > > > which
> > > > > > >> incoming tuples are too late and will be dropped. I have the
> > > > following
> > > > > > >> questions:
> > > > > > >>
> > > > > > >> Is a windowed operator (which needs watermarks) dependent upon
> > > some
> > > > > > other
> > > > > > >> operator for these tuples? What happens when there are no
> > > watermark
> > > > > > tuples
> > > > > > >> sent from upstream?
> > > > > > >>
> > > > > > >> Can a windowed operator "*assume*" the watermark tuples based
> on
> > > > some
> > > > > > >> notion of time? For example, can the Deduper, use the
> streaming
> > > > window
> > > > > > >> time
> > > > > > >> as the reference to advance the watermark?
> > > > > > >>
> > > > > > >> Thanks.
> > > > > > >>
> > > > > > >> ~ Bhupesh
> > > > > > >>
> > > > > > >> On Thu, Jul 7, 2016 at 4:07 AM, David Yan <
> > david@datatorrent.com>
> > > > > > wrote:
> > > > > > >>
> > > > > > >> > Hi Bhupesh,
> > > > > > >> >
> > > > > > >> > FYI, there is a JIRA open for a scalable implementation of
> > > > > > >> WindowedStorage
> > > > > > >> > and WindowedKeyedStorage:
> > > > > > >> >
> > > > > > >> > https://issues.apache.org/jira/browse/APEXMALHAR-2130
> > > > > > >> >
> > > > > > >> > We expect either to use ManagedState directly, or Spillable
> > > > > > structures,
> > > > > > >> > which in turn uses ManagedState.
> > > > > > >> >
> > > > > > >> > I'm not very familiar with the dedup operator. but in order
> to
> > > use
> > > > > the
> > > > > > >> > WindowedOperator, it sounds to me that we can use
> > SlidingWindows
> > > > > with
> > > > > > an
> > > > > > >> > implementation of WindowedKeyedStorage that uses a Bloom
> > filter
> > > to
> > > > > > cover
> > > > > > >> > most of the false cases.
> > > > > > >> >
> > > > > > >> > David
> > > > > > >> >
> > > > > > >> > On Mon, Jul 4, 2016 at 4:42 AM, Bhupesh Chawda <
> > > > bhupesh@apache.org>
> > > > > > >> wrote:
> > > > > > >> >
> > > > > > >> > > Hi All,
> > > > > > >> > >
> > > > > > >> > > I have looked into Windowing concepts from Apache Beam and
> > the
> > > > PR
> > > > > > >> #319 by
> > > > > > >> > > David. Looks like there are a lot of advanced concepts
> which
> > > > could
> > > > > > be
> > > > > > >> > used
> > > > > > >> > > by operators using event time windowing.
> > > > > > >> > > Additionally I also looked at the Managed State
> > > implementation.
> > > > > > >> > >
> > > > > > >> > > One of the things I noticed is that there is an overlap of
> > > > > > >> functionality
> > > > > > >> > > between Managed State and Windowing Support in terms of
> the
> > > > > > following:
> > > > > > >> > >
> > > > > > >> > >    - *Discarding / Dropping of tuples* from the system -
> > > Managed
> > > > > > State
> > > > > > >> > uses
> > > > > > >> > >    the concept of expiry while a Windowed operator uses
> the
> > > > > concepts
> > > > > > >> of
> > > > > > >> > >    Watermarks and allowed lateness. If I try to reconcile
> > the
> > > > > above
> > > > > > >> two,
> > > > > > >> > it
> > > > > > >> > >    seems like Managed State (through TimeBucketAssigner)
> is
> > > > trying
> > > > > > to
> > > > > > >> > >    implement some sort of implicit heuristic Watermarks
> > based
> > > on
> > > > > > >> either
> > > > > > >> > the
> > > > > > >> > >    user supplied time or the event time.
> > > > > > >> > >    - *Global Window* support - Once we have an option to
> > > disable
> > > > > > >> purging
> > > > > > >> > in
> > > > > > >> > >    Managed State, it will have similar semantics to the
> > Global
> > > > > > Window
> > > > > > >> > > option
> > > > > > >> > >    in Windowing support.
> > > > > > >> > >
> > > > > > >> > > If I understand correctly, is the suggestion to implement
> > the
> > > > > Dedup
> > > > > > >> > > operator as a Windowed operator and to use managed state
> > only
> > > > as a
> > > > > > >> > storage
> > > > > > >> > > medium (through WindowedStorage) ? What could be a better
> > way
> > > of
> > > > > > going
> > > > > > >> > > about this?
> > > > > > >> > >
> > > > > > >> > > Thanks.
> > > > > > >> > >
> > > > > > >> > > ~ Bhupesh
> > > > > > >> > >
> > > > > > >> > > On Wed, Jun 29, 2016 at 10:35 PM, Bhupesh Chawda <
> > > > > > bhupesh@apache.org>
> > > > > > >> > > wrote:
> > > > > > >> > >
> > > > > > >> > > > Hi Thomas,
> > > > > > >> > > >
> > > > > > >> > > > I agree that the case of processing bounded data is a
> > > special
> > > > > case
> > > > > > >> of
> > > > > > >> > > > unbounded data.
> > > > > > >> > > > Th difference I was pointing out was in terms of expiry.
> > > This
> > > > is
> > > > > > not
> > > > > > >> > > > applicable in case of bounded data sets, while unbounded
> > > data
> > > > > sets
> > > > > > >> will
> > > > > > >> > > > inherently use expiry for limiting the amount of data to
> > be
> > > > > > stored.
> > > > > > >> > > >
> > > > > > >> > > > For idempotency when applying expiry on the streaming
> > data,
> > > I
> > > > > need
> > > > > > >> to
> > > > > > >> > > > explore more on the using the window timestamp that you
> > > > proposed
> > > > > > as
> > > > > > >> > > opposed
> > > > > > >> > > > to the system time which I was planning to use.
> > > > > > >> > > >
> > > > > > >> > > > Thanks.
> > > > > > >> > > > ~ Bhupesh
> > > > > > >> > > >
> > > > > > >> > > > On Wed, Jun 29, 2016 at 8:39 PM, Thomas Weise <
> > > > > > >> thomas@datatorrent.com>
> > > > > > >> > > > wrote:
> > > > > > >> > > >
> > > > > > >> > > >> Bhupesh,
> > > > > > >> > > >>
> > > > > > >> > > >> Why is there a distinction between bounded and
> unbounded
> > > > data?
> > > > > I
> > > > > > >> see
> > > > > > >> > the
> > > > > > >> > > >> former as a special case of the latter?
> > > > > > >> > > >>
> > > > > > >> > > >> When rewinding the stream or reprocessing the stream in
> > > > another
> > > > > > run
> > > > > > >> > the
> > > > > > >> > > >> operator should produce the same result.
> > > > > > >> > > >>
> > > > > > >> > > >> This operator should be idempotent also. That implies
> > that
> > > > code
> > > > > > >> does
> > > > > > >> > not
> > > > > > >> > > >> rely on current system time but the window timestamp
> > > instead.
> > > > > > >> > > >>
> > > > > > >> > > >> All of this should be accomplished by using the
> windowing
> > > > > > support:
> > > > > > >> > > >> https://github.com/apache/apex-malhar/pull/319
> > > > > > >> > > >>
> > > > > > >> > > >> Thanks,
> > > > > > >> > > >> Thomas
> > > > > > >> > > >>
> > > > > > >> > > >>
> > > > > > >> > > >>
> > > > > > >> > > >>
> > > > > > >> > > >>
> > > > > > >> > > >>
> > > > > > >> > > >> On Wed, Jun 29, 2016 at 4:32 AM, Bhupesh Chawda <
> > > > > > >> > > bhupesh@datatorrent.com>
> > > > > > >> > > >> wrote:
> > > > > > >> > > >>
> > > > > > >> > > >> > Hi All,
> > > > > > >> > > >> >
> > > > > > >> > > >> > I want to validate the use cases for de-duplication
> > that
> > > > will
> > > > > > be
> > > > > > >> > going
> > > > > > >> > > >> as
> > > > > > >> > > >> > part of this implementation.
> > > > > > >> > > >> >
> > > > > > >> > > >> >    - *Bounded data set*
> > > > > > >> > > >> >       - This is de-duplication for bounded data. For
> > > > example,
> > > > > > >> data
> > > > > > >> > > sets
> > > > > > >> > > >> >       which are old or fixed or which may not have a
> > time
> > > > > field
> > > > > > >> at
> > > > > > >> > > >> > all. Example:
> > > > > > >> > > >> >       Last year's transaction records or Customer
> data
> > > etc.
> > > > > > >> > > >> >       - Concept of expiry is not needed as this is
> > > bounded
> > > > > data
> > > > > > >> set.
> > > > > > >> > > >> >       - *Unbounded data set*
> > > > > > >> > > >> >       - This is de-duplication of online streaming
> data
> > > > > > >> > > >> >       - Expiry is needed because here incoming tuples
> > may
> > > > > > arrive
> > > > > > >> > later
> > > > > > >> > > >> than
> > > > > > >> > > >> >       what they are expected. Expiry is always
> computed
> > > by
> > > > > > taking
> > > > > > >> > the
> > > > > > >> > > >> > difference
> > > > > > >> > > >> >       in System time and the Event time.
> > > > > > >> > > >> >
> > > > > > >> > > >> > Any feedback is appreciated.
> > > > > > >> > > >> >
> > > > > > >> > > >> > Thanks.
> > > > > > >> > > >> >
> > > > > > >> > > >> > ~ Bhupesh
> > > > > > >> > > >> >
> > > > > > >> > > >> > On Mon, Jun 27, 2016 at 11:34 AM, Bhupesh Chawda <
> > > > > > >> > > >> bhupesh@datatorrent.com>
> > > > > > >> > > >> > wrote:
> > > > > > >> > > >> >
> > > > > > >> > > >> > > Hi All,
> > > > > > >> > > >> > >
> > > > > > >> > > >> > > I am working on adding a De-duplication operator in
> > > > Malhar
> > > > > > >> library
> > > > > > >> > > >> based
> > > > > > >> > > >> > > on managed state APIs. I will be working off the
> > > already
> > > > > > >> created
> > > > > > >> > > JIRA
> > > > > > >> > > >> -
> > > > > > >> > > >> > >
> > https://issues.apache.org/jira/browse/APEXMALHAR-1701
> > > > and
> > > > > > the
> > > > > > >> > > initial
> > > > > > >> > > >> > > pull request for an AbstractDeduper here:
> > > > > > >> > > >> > >
> https://github.com/apache/apex-malhar/pull/260/files
> > > > > > >> > > >> > >
> > > > > > >> > > >> > > I am planning to include the following features in
> > the
> > > > > first
> > > > > > >> > > version:
> > > > > > >> > > >> > > 1. Time based de-duplication. Assumption: Tuple_Key
> > ->
> > > > > > >> Tuple_Time
> > > > > > >> > > >> > > correlation holds.
> > > > > > >> > > >> > > 2. Option to maintain order of incoming tuples.
> > > > > > >> > > >> > > 3. Duplicate and Expired ports to emit duplicate
> and
> > > > > expired
> > > > > > >> > tuples
> > > > > > >> > > >> > > respectively.
> > > > > > >> > > >> > >
> > > > > > >> > > >> > > Thanks.
> > > > > > >> > > >> > >
> > > > > > >> > > >> > > ~ Bhupesh
> > > > > > >> > > >> > >
> > > > > > >> > > >> >
> > > > > > >> > > >>
> > > > > > >> > > >
> > > > > > >> > > >
> > > > > > >> > >
> > > > > > >> >
> > > > > > >>
> > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>

Fwd: APEXMALHAR-1701 Deduper in Malhar

Posted by Bhupesh Chawda <bh...@apache.org>.
Yes, in that case, it can be modelled as a session window. A session for
every key can start with the first occurrence of that key and lasts for
some specific time duration.

This leads to another question: How exactly do we want to model Dedup
expiry?

   - Are the windows fixed on the event time axis and events just fall into
   these windows?
   - Or are the windows specific to each key and defined by the first
   occurrence of an event key?​​ Also, in the later case, if the key arrives a
   second time, but after the configured expiry time has passed after its
   first occurrence, will the key be considered an expired key or a unique key?

Thanks.

~ Bhupesh


On Mon, Jul 18, 2016 at 12:43 PM, Thomas Weise <th...@datatorrent.com>
wrote:

> +1 on the suggested way forward
>
> No clear why you say the windows are fixed though. What if I want the dedup
> to happen based on the most recent event with a given key + n time units?
>
>
> On Mon, Jul 18, 2016 at 9:05 AM, Bhupesh Chawda <bh...@apache.org>
> wrote:
>
> > I can see that Dedup seems like a case where state is continuously merged
> > with older state. State in this case is the set of unique tuples.
> However,
> > for Dedup use case, the event windows are, in a way, fixed, and do not
> > depend on the incoming tuples. In-coming tuples are just *assigned* to
> > these windows. The point I am trying to make is that the older event
> > windows will be purged (depending on the lateness configuration and
> > watermarks) irrespective of the incoming tuples. Session windows on the
> > other hand depend on the incoming tuples and are not fixed, and change
> with
> > incoming data. Perhaps we should not model this use case as a session
> > window.
> >
> > I agree that we cannot decide the approach to be followed with the
> current
> > memory backed storage implementation. Actually, even when we have seen a
> > managed state backed implementation for windowed storage, I am worried
> that
> > the interfaces won't still be flexible enough as compared to direct usage
> > of managed state and will need custom changes to fit the Dedup use case.
> I
> > am looking at it from the perspective of asynchronous processing which
> will
> > be necessary once we have disk IO involved for processing incoming
> tuples.
> >
> > I will suggest we move ahead with the managed state implementation for
> > Deduper. We can pick up the Windowed operator based implementation once
> we
> > have all the necessary features like windowed storage backed by managed
> > state, input operators with watermark tuple support etc.
> >
> > Suggestions?
> >
> > ~ Bhupesh
> >
> >
> > On Mon, Jul 18, 2016 at 11:29 AM, Thomas Weise <th...@datatorrent.com>
> > wrote:
> >
> > > Hi Bhupesh,
> > >
> > > Dedup is different with regard to state accumulation. For other
> windowed
> > > operations, we collect state and then emit a result after a period of
> > time
> > > (trigger or watermark). Here, we only need the state to detect the
> > > duplicate. Hence, it is inefficient to collect a list of tuples to
> > > determine that a subsequently arriving tuple is a duplicate or not. But
> > > isn't this scenario similar to the session window, where state is
> > > continuously merged.
> > >
> > > I would prefer to see more analysis on performance and scalability to
> > large
> > > key cardinality. The window operator only has the memory backed window
> > > store at this time. Until there is a managed state backed
> implementation
> > > that has seen benchmarking, we cannot really use it as baseline for
> > further
> > > implementations on top of it.
> > >
> > > Thomas
> > >
> > >
> > > On Thu, Jul 14, 2016 at 7:55 PM, Bhupesh Chawda <bh...@apache.org>
> > > wrote:
> > >
> > > > Hi All,
> > > >
> > > > I also implemented a De-duplication operator using Windowed Operator.
> > Now
> > > > we have two implementations, one with Managed state and another using
> > > > Windowed operator. Here are their details:
> > > >
> > > >    1. *With Managed State - *
> > > >    - The operator is implemented using managed state as the storage
> for
> > > >       buckets into which the tuples will be stored.
> > > >       - *TimeBucketAssigner* is used to assign an incoming tuple to
> > > >       different buckets based on the event time. It is also used to
> > > > identify
> > > >       whether a particular tuple is expired and should be sent to the
> > > > expired
> > > >       port / dropped.
> > > >       - For managed state, the *ManagedTimeUnifiedStateImpl*
> > > implementation
> > > >       is used which just requires the user to specify the event time
> > > > and a bucket
> > > >       is automatically assigned based on that. The structure of the
> > > bucket
> > > > data
> > > >       on storage is as follows: /operator_id /time_bucket
> > > >       - An advantage of using Managed State approach is that we don't
> > > have
> > > >       to assume the correlation of event time to the de-duplication
> key
> > > of
> > > > the
> > > >       tuple. For example, if we get two tuples like: (K1, T1), and
> (K1,
> > > > T2), we
> > > >       can still use ManagedStateImpl and conclude that these tuples
> are
> > > >       duplicates based on the Key K1.
> > > >       2. *With Windowed Operator - *
> > > >    - The operator uses the WindowedOperatorImpl as the base operator.
> > > >       - Accumulation, for the deduper, basically amounts to storing a
> > > list
> > > >       of tuples in the data storage. Every time we get a unique
> tuple,
> > we
> > > >       *accumulate* it in the list.
> > > >       - Event windows are modeled using the *TimeWindow* option.
> > Although
> > > >       SlidingTimeWIndows seems to be intuitive for data buckets, it
> > seems
> > > > to be
> > > >       the costly option as the accumulation in this case is not just
> > > > an aggregate
> > > >       value but a list of values in that bucket.
> > > >       - Watermarks are not assumed to be sent from an input operator
> > > >       (although it is okay if an upstream operator sends them). The
> > > >       *fixedWatermark* feature is used to assume watermarks which are
> > > >       relative to the window time.
> > > >       - One of the issues I found with using WindowedOperator for
> Dedup
> > > is
> > > >       that event time is tightly coupled with the de-duplication key.
> > In
> > > > the
> > > >       above example, (K1, T1), and (K1, T2) *might* be concluded as
> two
> > > >       unique tuples since T1 and T2 may fall into two different time
> > > > buckets.
> > > >
> > > > Here are the PRs for both of them.
> > > >
> > > >    - Using Managed State:
> > https://github.com/apache/apex-malhar/pull/335
> > > >    - Using Windowed Operator:
> > > > https://github.com/apache/apex-malhar/pull/343
> > > >
> > > > Please review them and suggest on the correct approach for the final
> > > > implementation which should be used to add other features like fault
> > > > tolerance, scalability, optimizations etc.
> > > > Thanks.
> > > >
> > > > ~ Bhupesh
> > > >
> > > > On Fri, Jul 8, 2016 at 11:30 PM, David Yan <da...@datatorrent.com>
> > > wrote:
> > > >
> > > > > No problem.
> > > > >
> > > > > By the way, I changed the method name to setFixedWatermark. And
> also,
> > > if
> > > > > you want to drop any tuples that are considered late, you need to
> set
> > > the
> > > > > allowed lateness to be 0.
> > > > >
> > > > > David
> > > > >
> > > > > On Fri, Jul 8, 2016 at 4:55 AM, Bhupesh Chawda <bhupesh@apache.org
> >
> > > > wrote:
> > > > >
> > > > > > Thanks David.
> > > > > > I'll try to create an implementation for Deduper which uses
> > > > > > WindowedOperator. Will open a PR soon for review.
> > > > > >
> > > > > > ~ Bhupesh
> > > > > >
> > > > > > On Fri, Jul 8, 2016 at 2:23 AM, David Yan <david@datatorrent.com
> >
> > > > wrote:
> > > > > >
> > > > > > > Hi Bhupesh,
> > > > > > >
> > > > > > > I just added the method setFixedLateness(long millis) to
> > > > > > > AbstractWindowedOperator in my PR. This will allow you to
> specify
> > > the
> > > > > > > lateness with respect to the timestamp from the window ID
> without
> > > > > > watermark
> > > > > > > tuples from upstream.
> > > > > > >
> > > > > > > David
> > > > > > >
> > > > > > > On Thu, Jul 7, 2016 at 11:49 AM, David Yan <
> > david@datatorrent.com>
> > > > > > wrote:
> > > > > > >
> > > > > > > > Hi Bhupesh,
> > > > > > > >
> > > > > > > > Yes, the windowed operator currently depends on the watermark
> > > > tuples
> > > > > > > > upstream for any "lateness" related operation. If there is no
> > > > > > watermark,
> > > > > > > > nothing will be considered late. We can add support for
> > lateness
> > > > > > handling
> > > > > > > > without incoming watermark tuples. Let me add that to the
> pull
> > > > > request.
> > > > > > > >
> > > > > > > > David
> > > > > > > >
> > > > > > > >
> > > > > > > > On Wed, Jul 6, 2016 at 10:48 PM, Bhupesh Chawda <
> > > > bhupesh@apache.org>
> > > > > > > > wrote:
> > > > > > > >
> > > > > > > >> Hi David,
> > > > > > > >>
> > > > > > > >> Thanks for your reply.
> > > > > > > >>
> > > > > > > >> If I am to use a windowed operator for the Dedup operator,
> > there
> > > > > > should
> > > > > > > be
> > > > > > > >> some operator (upstream to Deduper) which sends the
> watermark
> > > > > tuples.
> > > > > > > >> These
> > > > > > > >> tuples (along with allowed lateness), will be the ones
> > deciding
> > > > > which
> > > > > > > >> incoming tuples are too late and will be dropped. I have the
> > > > > following
> > > > > > > >> questions:
> > > > > > > >>
> > > > > > > >> Is a windowed operator (which needs watermarks) dependent
> upon
> > > > some
> > > > > > > other
> > > > > > > >> operator for these tuples? What happens when there are no
> > > > watermark
> > > > > > > tuples
> > > > > > > >> sent from upstream?
> > > > > > > >>
> > > > > > > >> Can a windowed operator "*assume*" the watermark tuples
> based
> > on
> > > > > some
> > > > > > > >> notion of time? For example, can the Deduper, use the
> > streaming
> > > > > window
> > > > > > > >> time
> > > > > > > >> as the reference to advance the watermark?
> > > > > > > >>
> > > > > > > >> Thanks.
> > > > > > > >>
> > > > > > > >> ~ Bhupesh
> > > > > > > >>
> > > > > > > >> On Thu, Jul 7, 2016 at 4:07 AM, David Yan <
> > > david@datatorrent.com>
> > > > > > > wrote:
> > > > > > > >>
> > > > > > > >> > Hi Bhupesh,
> > > > > > > >> >
> > > > > > > >> > FYI, there is a JIRA open for a scalable implementation of
> > > > > > > >> WindowedStorage
> > > > > > > >> > and WindowedKeyedStorage:
> > > > > > > >> >
> > > > > > > >> > https://issues.apache.org/jira/browse/APEXMALHAR-2130
> > > > > > > >> >
> > > > > > > >> > We expect either to use ManagedState directly, or
> Spillable
> > > > > > > structures,
> > > > > > > >> > which in turn uses ManagedState.
> > > > > > > >> >
> > > > > > > >> > I'm not very familiar with the dedup operator. but in
> order
> > to
> > > > use
> > > > > > the
> > > > > > > >> > WindowedOperator, it sounds to me that we can use
> > > SlidingWindows
> > > > > > with
> > > > > > > an
> > > > > > > >> > implementation of WindowedKeyedStorage that uses a Bloom
> > > filter
> > > > to
> > > > > > > cover
> > > > > > > >> > most of the false cases.
> > > > > > > >> >
> > > > > > > >> > David
> > > > > > > >> >
> > > > > > > >> > On Mon, Jul 4, 2016 at 4:42 AM, Bhupesh Chawda <
> > > > > bhupesh@apache.org>
> > > > > > > >> wrote:
> > > > > > > >> >
> > > > > > > >> > > Hi All,
> > > > > > > >> > >
> > > > > > > >> > > I have looked into Windowing concepts from Apache Beam
> and
> > > the
> > > > > PR
> > > > > > > >> #319 by
> > > > > > > >> > > David. Looks like there are a lot of advanced concepts
> > which
> > > > > could
> > > > > > > be
> > > > > > > >> > used
> > > > > > > >> > > by operators using event time windowing.
> > > > > > > >> > > Additionally I also looked at the Managed State
> > > > implementation.
> > > > > > > >> > >
> > > > > > > >> > > One of the things I noticed is that there is an overlap
> of
> > > > > > > >> functionality
> > > > > > > >> > > between Managed State and Windowing Support in terms of
> > the
> > > > > > > following:
> > > > > > > >> > >
> > > > > > > >> > >    - *Discarding / Dropping of tuples* from the system -
> > > > Managed
> > > > > > > State
> > > > > > > >> > uses
> > > > > > > >> > >    the concept of expiry while a Windowed operator uses
> > the
> > > > > > concepts
> > > > > > > >> of
> > > > > > > >> > >    Watermarks and allowed lateness. If I try to
> reconcile
> > > the
> > > > > > above
> > > > > > > >> two,
> > > > > > > >> > it
> > > > > > > >> > >    seems like Managed State (through TimeBucketAssigner)
> > is
> > > > > trying
> > > > > > > to
> > > > > > > >> > >    implement some sort of implicit heuristic Watermarks
> > > based
> > > > on
> > > > > > > >> either
> > > > > > > >> > the
> > > > > > > >> > >    user supplied time or the event time.
> > > > > > > >> > >    - *Global Window* support - Once we have an option to
> > > > disable
> > > > > > > >> purging
> > > > > > > >> > in
> > > > > > > >> > >    Managed State, it will have similar semantics to the
> > > Global
> > > > > > > Window
> > > > > > > >> > > option
> > > > > > > >> > >    in Windowing support.
> > > > > > > >> > >
> > > > > > > >> > > If I understand correctly, is the suggestion to
> implement
> > > the
> > > > > > Dedup
> > > > > > > >> > > operator as a Windowed operator and to use managed state
> > > only
> > > > > as a
> > > > > > > >> > storage
> > > > > > > >> > > medium (through WindowedStorage) ? What could be a
> better
> > > way
> > > > of
> > > > > > > going
> > > > > > > >> > > about this?
> > > > > > > >> > >
> > > > > > > >> > > Thanks.
> > > > > > > >> > >
> > > > > > > >> > > ~ Bhupesh
> > > > > > > >> > >
> > > > > > > >> > > On Wed, Jun 29, 2016 at 10:35 PM, Bhupesh Chawda <
> > > > > > > bhupesh@apache.org>
> > > > > > > >> > > wrote:
> > > > > > > >> > >
> > > > > > > >> > > > Hi Thomas,
> > > > > > > >> > > >
> > > > > > > >> > > > I agree that the case of processing bounded data is a
> > > > special
> > > > > > case
> > > > > > > >> of
> > > > > > > >> > > > unbounded data.
> > > > > > > >> > > > Th difference I was pointing out was in terms of
> expiry.
> > > > This
> > > > > is
> > > > > > > not
> > > > > > > >> > > > applicable in case of bounded data sets, while
> unbounded
> > > > data
> > > > > > sets
> > > > > > > >> will
> > > > > > > >> > > > inherently use expiry for limiting the amount of data
> to
> > > be
> > > > > > > stored.
> > > > > > > >> > > >
> > > > > > > >> > > > For idempotency when applying expiry on the streaming
> > > data,
> > > > I
> > > > > > need
> > > > > > > >> to
> > > > > > > >> > > > explore more on the using the window timestamp that
> you
> > > > > proposed
> > > > > > > as
> > > > > > > >> > > opposed
> > > > > > > >> > > > to the system time which I was planning to use.
> > > > > > > >> > > >
> > > > > > > >> > > > Thanks.
> > > > > > > >> > > > ~ Bhupesh
> > > > > > > >> > > >
> > > > > > > >> > > > On Wed, Jun 29, 2016 at 8:39 PM, Thomas Weise <
> > > > > > > >> thomas@datatorrent.com>
> > > > > > > >> > > > wrote:
> > > > > > > >> > > >
> > > > > > > >> > > >> Bhupesh,
> > > > > > > >> > > >>
> > > > > > > >> > > >> Why is there a distinction between bounded and
> > unbounded
> > > > > data?
> > > > > > I
> > > > > > > >> see
> > > > > > > >> > the
> > > > > > > >> > > >> former as a special case of the latter?
> > > > > > > >> > > >>
> > > > > > > >> > > >> When rewinding the stream or reprocessing the stream
> in
> > > > > another
> > > > > > > run
> > > > > > > >> > the
> > > > > > > >> > > >> operator should produce the same result.
> > > > > > > >> > > >>
> > > > > > > >> > > >> This operator should be idempotent also. That implies
> > > that
> > > > > code
> > > > > > > >> does
> > > > > > > >> > not
> > > > > > > >> > > >> rely on current system time but the window timestamp
> > > > instead.
> > > > > > > >> > > >>
> > > > > > > >> > > >> All of this should be accomplished by using the
> > windowing
> > > > > > > support:
> > > > > > > >> > > >> https://github.com/apache/apex-malhar/pull/319
> > > > > > > >> > > >>
> > > > > > > >> > > >> Thanks,
> > > > > > > >> > > >> Thomas
> > > > > > > >> > > >>
> > > > > > > >> > > >>
> > > > > > > >> > > >>
> > > > > > > >> > > >>
> > > > > > > >> > > >>
> > > > > > > >> > > >>
> > > > > > > >> > > >> On Wed, Jun 29, 2016 at 4:32 AM, Bhupesh Chawda <
> > > > > > > >> > > bhupesh@datatorrent.com>
> > > > > > > >> > > >> wrote:
> > > > > > > >> > > >>
> > > > > > > >> > > >> > Hi All,
> > > > > > > >> > > >> >
> > > > > > > >> > > >> > I want to validate the use cases for de-duplication
> > > that
> > > > > will
> > > > > > > be
> > > > > > > >> > going
> > > > > > > >> > > >> as
> > > > > > > >> > > >> > part of this implementation.
> > > > > > > >> > > >> >
> > > > > > > >> > > >> >    - *Bounded data set*
> > > > > > > >> > > >> >       - This is de-duplication for bounded data.
> For
> > > > > example,
> > > > > > > >> data
> > > > > > > >> > > sets
> > > > > > > >> > > >> >       which are old or fixed or which may not have
> a
> > > time
> > > > > > field
> > > > > > > >> at
> > > > > > > >> > > >> > all. Example:
> > > > > > > >> > > >> >       Last year's transaction records or Customer
> > data
> > > > etc.
> > > > > > > >> > > >> >       - Concept of expiry is not needed as this is
> > > > bounded
> > > > > > data
> > > > > > > >> set.
> > > > > > > >> > > >> >       - *Unbounded data set*
> > > > > > > >> > > >> >       - This is de-duplication of online streaming
> > data
> > > > > > > >> > > >> >       - Expiry is needed because here incoming
> tuples
> > > may
> > > > > > > arrive
> > > > > > > >> > later
> > > > > > > >> > > >> than
> > > > > > > >> > > >> >       what they are expected. Expiry is always
> > computed
> > > > by
> > > > > > > taking
> > > > > > > >> > the
> > > > > > > >> > > >> > difference
> > > > > > > >> > > >> >       in System time and the Event time.
> > > > > > > >> > > >> >
> > > > > > > >> > > >> > Any feedback is appreciated.
> > > > > > > >> > > >> >
> > > > > > > >> > > >> > Thanks.
> > > > > > > >> > > >> >
> > > > > > > >> > > >> > ~ Bhupesh
> > > > > > > >> > > >> >
> > > > > > > >> > > >> > On Mon, Jun 27, 2016 at 11:34 AM, Bhupesh Chawda <
> > > > > > > >> > > >> bhupesh@datatorrent.com>
> > > > > > > >> > > >> > wrote:
> > > > > > > >> > > >> >
> > > > > > > >> > > >> > > Hi All,
> > > > > > > >> > > >> > >
> > > > > > > >> > > >> > > I am working on adding a De-duplication operator
> in
> > > > > Malhar
> > > > > > > >> library
> > > > > > > >> > > >> based
> > > > > > > >> > > >> > > on managed state APIs. I will be working off the
> > > > already
> > > > > > > >> created
> > > > > > > >> > > JIRA
> > > > > > > >> > > >> -
> > > > > > > >> > > >> > >
> > > https://issues.apache.org/jira/browse/APEXMALHAR-1701
> > > > > and
> > > > > > > the
> > > > > > > >> > > initial
> > > > > > > >> > > >> > > pull request for an AbstractDeduper here:
> > > > > > > >> > > >> > >
> > https://github.com/apache/apex-malhar/pull/260/files
> > > > > > > >> > > >> > >
> > > > > > > >> > > >> > > I am planning to include the following features
> in
> > > the
> > > > > > first
> > > > > > > >> > > version:
> > > > > > > >> > > >> > > 1. Time based de-duplication. Assumption:
> Tuple_Key
> > > ->
> > > > > > > >> Tuple_Time
> > > > > > > >> > > >> > > correlation holds.
> > > > > > > >> > > >> > > 2. Option to maintain order of incoming tuples.
> > > > > > > >> > > >> > > 3. Duplicate and Expired ports to emit duplicate
> > and
> > > > > > expired
> > > > > > > >> > tuples
> > > > > > > >> > > >> > > respectively.
> > > > > > > >> > > >> > >
> > > > > > > >> > > >> > > Thanks.
> > > > > > > >> > > >> > >
> > > > > > > >> > > >> > > ~ Bhupesh
> > > > > > > >> > > >> > >
> > > > > > > >> > > >> >
> > > > > > > >> > > >>
> > > > > > > >> > > >
> > > > > > > >> > > >
> > > > > > > >> > >
> > > > > > > >> >
> > > > > > > >>
> > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>

Re: APEXMALHAR-1701 Deduper in Malhar

Posted by Thomas Weise <th...@datatorrent.com>.
+1 on the suggested way forward

No clear why you say the windows are fixed though. What if I want the dedup
to happen based on the most recent event with a given key + n time units?


On Mon, Jul 18, 2016 at 9:05 AM, Bhupesh Chawda <bh...@apache.org> wrote:

> I can see that Dedup seems like a case where state is continuously merged
> with older state. State in this case is the set of unique tuples. However,
> for Dedup use case, the event windows are, in a way, fixed, and do not
> depend on the incoming tuples. In-coming tuples are just *assigned* to
> these windows. The point I am trying to make is that the older event
> windows will be purged (depending on the lateness configuration and
> watermarks) irrespective of the incoming tuples. Session windows on the
> other hand depend on the incoming tuples and are not fixed, and change with
> incoming data. Perhaps we should not model this use case as a session
> window.
>
> I agree that we cannot decide the approach to be followed with the current
> memory backed storage implementation. Actually, even when we have seen a
> managed state backed implementation for windowed storage, I am worried that
> the interfaces won't still be flexible enough as compared to direct usage
> of managed state and will need custom changes to fit the Dedup use case. I
> am looking at it from the perspective of asynchronous processing which will
> be necessary once we have disk IO involved for processing incoming tuples.
>
> I will suggest we move ahead with the managed state implementation for
> Deduper. We can pick up the Windowed operator based implementation once we
> have all the necessary features like windowed storage backed by managed
> state, input operators with watermark tuple support etc.
>
> Suggestions?
>
> ~ Bhupesh
>
>
> On Mon, Jul 18, 2016 at 11:29 AM, Thomas Weise <th...@datatorrent.com>
> wrote:
>
> > Hi Bhupesh,
> >
> > Dedup is different with regard to state accumulation. For other windowed
> > operations, we collect state and then emit a result after a period of
> time
> > (trigger or watermark). Here, we only need the state to detect the
> > duplicate. Hence, it is inefficient to collect a list of tuples to
> > determine that a subsequently arriving tuple is a duplicate or not. But
> > isn't this scenario similar to the session window, where state is
> > continuously merged.
> >
> > I would prefer to see more analysis on performance and scalability to
> large
> > key cardinality. The window operator only has the memory backed window
> > store at this time. Until there is a managed state backed implementation
> > that has seen benchmarking, we cannot really use it as baseline for
> further
> > implementations on top of it.
> >
> > Thomas
> >
> >
> > On Thu, Jul 14, 2016 at 7:55 PM, Bhupesh Chawda <bh...@apache.org>
> > wrote:
> >
> > > Hi All,
> > >
> > > I also implemented a De-duplication operator using Windowed Operator.
> Now
> > > we have two implementations, one with Managed state and another using
> > > Windowed operator. Here are their details:
> > >
> > >    1. *With Managed State - *
> > >    - The operator is implemented using managed state as the storage for
> > >       buckets into which the tuples will be stored.
> > >       - *TimeBucketAssigner* is used to assign an incoming tuple to
> > >       different buckets based on the event time. It is also used to
> > > identify
> > >       whether a particular tuple is expired and should be sent to the
> > > expired
> > >       port / dropped.
> > >       - For managed state, the *ManagedTimeUnifiedStateImpl*
> > implementation
> > >       is used which just requires the user to specify the event time
> > > and a bucket
> > >       is automatically assigned based on that. The structure of the
> > bucket
> > > data
> > >       on storage is as follows: /operator_id /time_bucket
> > >       - An advantage of using Managed State approach is that we don't
> > have
> > >       to assume the correlation of event time to the de-duplication key
> > of
> > > the
> > >       tuple. For example, if we get two tuples like: (K1, T1), and (K1,
> > > T2), we
> > >       can still use ManagedStateImpl and conclude that these tuples are
> > >       duplicates based on the Key K1.
> > >       2. *With Windowed Operator - *
> > >    - The operator uses the WindowedOperatorImpl as the base operator.
> > >       - Accumulation, for the deduper, basically amounts to storing a
> > list
> > >       of tuples in the data storage. Every time we get a unique tuple,
> we
> > >       *accumulate* it in the list.
> > >       - Event windows are modeled using the *TimeWindow* option.
> Although
> > >       SlidingTimeWIndows seems to be intuitive for data buckets, it
> seems
> > > to be
> > >       the costly option as the accumulation in this case is not just
> > > an aggregate
> > >       value but a list of values in that bucket.
> > >       - Watermarks are not assumed to be sent from an input operator
> > >       (although it is okay if an upstream operator sends them). The
> > >       *fixedWatermark* feature is used to assume watermarks which are
> > >       relative to the window time.
> > >       - One of the issues I found with using WindowedOperator for Dedup
> > is
> > >       that event time is tightly coupled with the de-duplication key.
> In
> > > the
> > >       above example, (K1, T1), and (K1, T2) *might* be concluded as two
> > >       unique tuples since T1 and T2 may fall into two different time
> > > buckets.
> > >
> > > Here are the PRs for both of them.
> > >
> > >    - Using Managed State:
> https://github.com/apache/apex-malhar/pull/335
> > >    - Using Windowed Operator:
> > > https://github.com/apache/apex-malhar/pull/343
> > >
> > > Please review them and suggest on the correct approach for the final
> > > implementation which should be used to add other features like fault
> > > tolerance, scalability, optimizations etc.
> > > Thanks.
> > >
> > > ~ Bhupesh
> > >
> > > On Fri, Jul 8, 2016 at 11:30 PM, David Yan <da...@datatorrent.com>
> > wrote:
> > >
> > > > No problem.
> > > >
> > > > By the way, I changed the method name to setFixedWatermark. And also,
> > if
> > > > you want to drop any tuples that are considered late, you need to set
> > the
> > > > allowed lateness to be 0.
> > > >
> > > > David
> > > >
> > > > On Fri, Jul 8, 2016 at 4:55 AM, Bhupesh Chawda <bh...@apache.org>
> > > wrote:
> > > >
> > > > > Thanks David.
> > > > > I'll try to create an implementation for Deduper which uses
> > > > > WindowedOperator. Will open a PR soon for review.
> > > > >
> > > > > ~ Bhupesh
> > > > >
> > > > > On Fri, Jul 8, 2016 at 2:23 AM, David Yan <da...@datatorrent.com>
> > > wrote:
> > > > >
> > > > > > Hi Bhupesh,
> > > > > >
> > > > > > I just added the method setFixedLateness(long millis) to
> > > > > > AbstractWindowedOperator in my PR. This will allow you to specify
> > the
> > > > > > lateness with respect to the timestamp from the window ID without
> > > > > watermark
> > > > > > tuples from upstream.
> > > > > >
> > > > > > David
> > > > > >
> > > > > > On Thu, Jul 7, 2016 at 11:49 AM, David Yan <
> david@datatorrent.com>
> > > > > wrote:
> > > > > >
> > > > > > > Hi Bhupesh,
> > > > > > >
> > > > > > > Yes, the windowed operator currently depends on the watermark
> > > tuples
> > > > > > > upstream for any "lateness" related operation. If there is no
> > > > > watermark,
> > > > > > > nothing will be considered late. We can add support for
> lateness
> > > > > handling
> > > > > > > without incoming watermark tuples. Let me add that to the pull
> > > > request.
> > > > > > >
> > > > > > > David
> > > > > > >
> > > > > > >
> > > > > > > On Wed, Jul 6, 2016 at 10:48 PM, Bhupesh Chawda <
> > > bhupesh@apache.org>
> > > > > > > wrote:
> > > > > > >
> > > > > > >> Hi David,
> > > > > > >>
> > > > > > >> Thanks for your reply.
> > > > > > >>
> > > > > > >> If I am to use a windowed operator for the Dedup operator,
> there
> > > > > should
> > > > > > be
> > > > > > >> some operator (upstream to Deduper) which sends the watermark
> > > > tuples.
> > > > > > >> These
> > > > > > >> tuples (along with allowed lateness), will be the ones
> deciding
> > > > which
> > > > > > >> incoming tuples are too late and will be dropped. I have the
> > > > following
> > > > > > >> questions:
> > > > > > >>
> > > > > > >> Is a windowed operator (which needs watermarks) dependent upon
> > > some
> > > > > > other
> > > > > > >> operator for these tuples? What happens when there are no
> > > watermark
> > > > > > tuples
> > > > > > >> sent from upstream?
> > > > > > >>
> > > > > > >> Can a windowed operator "*assume*" the watermark tuples based
> on
> > > > some
> > > > > > >> notion of time? For example, can the Deduper, use the
> streaming
> > > > window
> > > > > > >> time
> > > > > > >> as the reference to advance the watermark?
> > > > > > >>
> > > > > > >> Thanks.
> > > > > > >>
> > > > > > >> ~ Bhupesh
> > > > > > >>
> > > > > > >> On Thu, Jul 7, 2016 at 4:07 AM, David Yan <
> > david@datatorrent.com>
> > > > > > wrote:
> > > > > > >>
> > > > > > >> > Hi Bhupesh,
> > > > > > >> >
> > > > > > >> > FYI, there is a JIRA open for a scalable implementation of
> > > > > > >> WindowedStorage
> > > > > > >> > and WindowedKeyedStorage:
> > > > > > >> >
> > > > > > >> > https://issues.apache.org/jira/browse/APEXMALHAR-2130
> > > > > > >> >
> > > > > > >> > We expect either to use ManagedState directly, or Spillable
> > > > > > structures,
> > > > > > >> > which in turn uses ManagedState.
> > > > > > >> >
> > > > > > >> > I'm not very familiar with the dedup operator. but in order
> to
> > > use
> > > > > the
> > > > > > >> > WindowedOperator, it sounds to me that we can use
> > SlidingWindows
> > > > > with
> > > > > > an
> > > > > > >> > implementation of WindowedKeyedStorage that uses a Bloom
> > filter
> > > to
> > > > > > cover
> > > > > > >> > most of the false cases.
> > > > > > >> >
> > > > > > >> > David
> > > > > > >> >
> > > > > > >> > On Mon, Jul 4, 2016 at 4:42 AM, Bhupesh Chawda <
> > > > bhupesh@apache.org>
> > > > > > >> wrote:
> > > > > > >> >
> > > > > > >> > > Hi All,
> > > > > > >> > >
> > > > > > >> > > I have looked into Windowing concepts from Apache Beam and
> > the
> > > > PR
> > > > > > >> #319 by
> > > > > > >> > > David. Looks like there are a lot of advanced concepts
> which
> > > > could
> > > > > > be
> > > > > > >> > used
> > > > > > >> > > by operators using event time windowing.
> > > > > > >> > > Additionally I also looked at the Managed State
> > > implementation.
> > > > > > >> > >
> > > > > > >> > > One of the things I noticed is that there is an overlap of
> > > > > > >> functionality
> > > > > > >> > > between Managed State and Windowing Support in terms of
> the
> > > > > > following:
> > > > > > >> > >
> > > > > > >> > >    - *Discarding / Dropping of tuples* from the system -
> > > Managed
> > > > > > State
> > > > > > >> > uses
> > > > > > >> > >    the concept of expiry while a Windowed operator uses
> the
> > > > > concepts
> > > > > > >> of
> > > > > > >> > >    Watermarks and allowed lateness. If I try to reconcile
> > the
> > > > > above
> > > > > > >> two,
> > > > > > >> > it
> > > > > > >> > >    seems like Managed State (through TimeBucketAssigner)
> is
> > > > trying
> > > > > > to
> > > > > > >> > >    implement some sort of implicit heuristic Watermarks
> > based
> > > on
> > > > > > >> either
> > > > > > >> > the
> > > > > > >> > >    user supplied time or the event time.
> > > > > > >> > >    - *Global Window* support - Once we have an option to
> > > disable
> > > > > > >> purging
> > > > > > >> > in
> > > > > > >> > >    Managed State, it will have similar semantics to the
> > Global
> > > > > > Window
> > > > > > >> > > option
> > > > > > >> > >    in Windowing support.
> > > > > > >> > >
> > > > > > >> > > If I understand correctly, is the suggestion to implement
> > the
> > > > > Dedup
> > > > > > >> > > operator as a Windowed operator and to use managed state
> > only
> > > > as a
> > > > > > >> > storage
> > > > > > >> > > medium (through WindowedStorage) ? What could be a better
> > way
> > > of
> > > > > > going
> > > > > > >> > > about this?
> > > > > > >> > >
> > > > > > >> > > Thanks.
> > > > > > >> > >
> > > > > > >> > > ~ Bhupesh
> > > > > > >> > >
> > > > > > >> > > On Wed, Jun 29, 2016 at 10:35 PM, Bhupesh Chawda <
> > > > > > bhupesh@apache.org>
> > > > > > >> > > wrote:
> > > > > > >> > >
> > > > > > >> > > > Hi Thomas,
> > > > > > >> > > >
> > > > > > >> > > > I agree that the case of processing bounded data is a
> > > special
> > > > > case
> > > > > > >> of
> > > > > > >> > > > unbounded data.
> > > > > > >> > > > Th difference I was pointing out was in terms of expiry.
> > > This
> > > > is
> > > > > > not
> > > > > > >> > > > applicable in case of bounded data sets, while unbounded
> > > data
> > > > > sets
> > > > > > >> will
> > > > > > >> > > > inherently use expiry for limiting the amount of data to
> > be
> > > > > > stored.
> > > > > > >> > > >
> > > > > > >> > > > For idempotency when applying expiry on the streaming
> > data,
> > > I
> > > > > need
> > > > > > >> to
> > > > > > >> > > > explore more on the using the window timestamp that you
> > > > proposed
> > > > > > as
> > > > > > >> > > opposed
> > > > > > >> > > > to the system time which I was planning to use.
> > > > > > >> > > >
> > > > > > >> > > > Thanks.
> > > > > > >> > > > ~ Bhupesh
> > > > > > >> > > >
> > > > > > >> > > > On Wed, Jun 29, 2016 at 8:39 PM, Thomas Weise <
> > > > > > >> thomas@datatorrent.com>
> > > > > > >> > > > wrote:
> > > > > > >> > > >
> > > > > > >> > > >> Bhupesh,
> > > > > > >> > > >>
> > > > > > >> > > >> Why is there a distinction between bounded and
> unbounded
> > > > data?
> > > > > I
> > > > > > >> see
> > > > > > >> > the
> > > > > > >> > > >> former as a special case of the latter?
> > > > > > >> > > >>
> > > > > > >> > > >> When rewinding the stream or reprocessing the stream in
> > > > another
> > > > > > run
> > > > > > >> > the
> > > > > > >> > > >> operator should produce the same result.
> > > > > > >> > > >>
> > > > > > >> > > >> This operator should be idempotent also. That implies
> > that
> > > > code
> > > > > > >> does
> > > > > > >> > not
> > > > > > >> > > >> rely on current system time but the window timestamp
> > > instead.
> > > > > > >> > > >>
> > > > > > >> > > >> All of this should be accomplished by using the
> windowing
> > > > > > support:
> > > > > > >> > > >> https://github.com/apache/apex-malhar/pull/319
> > > > > > >> > > >>
> > > > > > >> > > >> Thanks,
> > > > > > >> > > >> Thomas
> > > > > > >> > > >>
> > > > > > >> > > >>
> > > > > > >> > > >>
> > > > > > >> > > >>
> > > > > > >> > > >>
> > > > > > >> > > >>
> > > > > > >> > > >> On Wed, Jun 29, 2016 at 4:32 AM, Bhupesh Chawda <
> > > > > > >> > > bhupesh@datatorrent.com>
> > > > > > >> > > >> wrote:
> > > > > > >> > > >>
> > > > > > >> > > >> > Hi All,
> > > > > > >> > > >> >
> > > > > > >> > > >> > I want to validate the use cases for de-duplication
> > that
> > > > will
> > > > > > be
> > > > > > >> > going
> > > > > > >> > > >> as
> > > > > > >> > > >> > part of this implementation.
> > > > > > >> > > >> >
> > > > > > >> > > >> >    - *Bounded data set*
> > > > > > >> > > >> >       - This is de-duplication for bounded data. For
> > > > example,
> > > > > > >> data
> > > > > > >> > > sets
> > > > > > >> > > >> >       which are old or fixed or which may not have a
> > time
> > > > > field
> > > > > > >> at
> > > > > > >> > > >> > all. Example:
> > > > > > >> > > >> >       Last year's transaction records or Customer
> data
> > > etc.
> > > > > > >> > > >> >       - Concept of expiry is not needed as this is
> > > bounded
> > > > > data
> > > > > > >> set.
> > > > > > >> > > >> >       - *Unbounded data set*
> > > > > > >> > > >> >       - This is de-duplication of online streaming
> data
> > > > > > >> > > >> >       - Expiry is needed because here incoming tuples
> > may
> > > > > > arrive
> > > > > > >> > later
> > > > > > >> > > >> than
> > > > > > >> > > >> >       what they are expected. Expiry is always
> computed
> > > by
> > > > > > taking
> > > > > > >> > the
> > > > > > >> > > >> > difference
> > > > > > >> > > >> >       in System time and the Event time.
> > > > > > >> > > >> >
> > > > > > >> > > >> > Any feedback is appreciated.
> > > > > > >> > > >> >
> > > > > > >> > > >> > Thanks.
> > > > > > >> > > >> >
> > > > > > >> > > >> > ~ Bhupesh
> > > > > > >> > > >> >
> > > > > > >> > > >> > On Mon, Jun 27, 2016 at 11:34 AM, Bhupesh Chawda <
> > > > > > >> > > >> bhupesh@datatorrent.com>
> > > > > > >> > > >> > wrote:
> > > > > > >> > > >> >
> > > > > > >> > > >> > > Hi All,
> > > > > > >> > > >> > >
> > > > > > >> > > >> > > I am working on adding a De-duplication operator in
> > > > Malhar
> > > > > > >> library
> > > > > > >> > > >> based
> > > > > > >> > > >> > > on managed state APIs. I will be working off the
> > > already
> > > > > > >> created
> > > > > > >> > > JIRA
> > > > > > >> > > >> -
> > > > > > >> > > >> > >
> > https://issues.apache.org/jira/browse/APEXMALHAR-1701
> > > > and
> > > > > > the
> > > > > > >> > > initial
> > > > > > >> > > >> > > pull request for an AbstractDeduper here:
> > > > > > >> > > >> > >
> https://github.com/apache/apex-malhar/pull/260/files
> > > > > > >> > > >> > >
> > > > > > >> > > >> > > I am planning to include the following features in
> > the
> > > > > first
> > > > > > >> > > version:
> > > > > > >> > > >> > > 1. Time based de-duplication. Assumption: Tuple_Key
> > ->
> > > > > > >> Tuple_Time
> > > > > > >> > > >> > > correlation holds.
> > > > > > >> > > >> > > 2. Option to maintain order of incoming tuples.
> > > > > > >> > > >> > > 3. Duplicate and Expired ports to emit duplicate
> and
> > > > > expired
> > > > > > >> > tuples
> > > > > > >> > > >> > > respectively.
> > > > > > >> > > >> > >
> > > > > > >> > > >> > > Thanks.
> > > > > > >> > > >> > >
> > > > > > >> > > >> > > ~ Bhupesh
> > > > > > >> > > >> > >
> > > > > > >> > > >> >
> > > > > > >> > > >>
> > > > > > >> > > >
> > > > > > >> > > >
> > > > > > >> > >
> > > > > > >> >
> > > > > > >>
> > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>

Fwd: APEXMALHAR-1701 Deduper in Malhar

Posted by Bhupesh Chawda <bh...@apache.org>.
I can see that Dedup seems like a case where state is continuously merged
with older state. State in this case is the set of unique tuples. However,
for Dedup use case, the event windows are, in a way, fixed, and do not
depend on the incoming tuples. In-coming tuples are just *assigned* to
these windows. The point I am trying to make is that the older event
windows will be purged (depending on the lateness configuration and
watermarks) irrespective of the incoming tuples. Session windows on the
other hand depend on the incoming tuples and are not fixed, and change with
incoming data. Perhaps we should not model this use case as a session
window.

I agree that we cannot decide the approach to be followed with the current
memory backed storage implementation. Actually, even when we have seen a
managed state backed implementation for windowed storage, I am worried that
the interfaces won't still be flexible enough as compared to direct usage
of managed state and will need custom changes to fit the Dedup use case. I
am looking at it from the perspective of asynchronous processing which will
be necessary once we have disk IO involved for processing incoming tuples.

I will suggest we move ahead with the managed state implementation for
Deduper. We can pick up the Windowed operator based implementation once we
have all the necessary features like windowed storage backed by managed
state, input operators with watermark tuple support etc.

Suggestions?

~ Bhupesh


On Mon, Jul 18, 2016 at 11:29 AM, Thomas Weise <th...@datatorrent.com>
wrote:

> Hi Bhupesh,
>
> Dedup is different with regard to state accumulation. For other windowed
> operations, we collect state and then emit a result after a period of time
> (trigger or watermark). Here, we only need the state to detect the
> duplicate. Hence, it is inefficient to collect a list of tuples to
> determine that a subsequently arriving tuple is a duplicate or not. But
> isn't this scenario similar to the session window, where state is
> continuously merged.
>
> I would prefer to see more analysis on performance and scalability to large
> key cardinality. The window operator only has the memory backed window
> store at this time. Until there is a managed state backed implementation
> that has seen benchmarking, we cannot really use it as baseline for further
> implementations on top of it.
>
> Thomas
>
>
> On Thu, Jul 14, 2016 at 7:55 PM, Bhupesh Chawda <bh...@apache.org>
> wrote:
>
> > Hi All,
> >
> > I also implemented a De-duplication operator using Windowed Operator. Now
> > we have two implementations, one with Managed state and another using
> > Windowed operator. Here are their details:
> >
> >    1. *With Managed State - *
> >    - The operator is implemented using managed state as the storage for
> >       buckets into which the tuples will be stored.
> >       - *TimeBucketAssigner* is used to assign an incoming tuple to
> >       different buckets based on the event time. It is also used to
> > identify
> >       whether a particular tuple is expired and should be sent to the
> > expired
> >       port / dropped.
> >       - For managed state, the *ManagedTimeUnifiedStateImpl*
> implementation
> >       is used which just requires the user to specify the event time
> > and a bucket
> >       is automatically assigned based on that. The structure of the
> bucket
> > data
> >       on storage is as follows: /operator_id /time_bucket
> >       - An advantage of using Managed State approach is that we don't
> have
> >       to assume the correlation of event time to the de-duplication key
> of
> > the
> >       tuple. For example, if we get two tuples like: (K1, T1), and (K1,
> > T2), we
> >       can still use ManagedStateImpl and conclude that these tuples are
> >       duplicates based on the Key K1.
> >       2. *With Windowed Operator - *
> >    - The operator uses the WindowedOperatorImpl as the base operator.
> >       - Accumulation, for the deduper, basically amounts to storing a
> list
> >       of tuples in the data storage. Every time we get a unique tuple, we
> >       *accumulate* it in the list.
> >       - Event windows are modeled using the *TimeWindow* option. Although
> >       SlidingTimeWIndows seems to be intuitive for data buckets, it seems
> > to be
> >       the costly option as the accumulation in this case is not just
> > an aggregate
> >       value but a list of values in that bucket.
> >       - Watermarks are not assumed to be sent from an input operator
> >       (although it is okay if an upstream operator sends them). The
> >       *fixedWatermark* feature is used to assume watermarks which are
> >       relative to the window time.
> >       - One of the issues I found with using WindowedOperator for Dedup
> is
> >       that event time is tightly coupled with the de-duplication key. In
> > the
> >       above example, (K1, T1), and (K1, T2) *might* be concluded as two
> >       unique tuples since T1 and T2 may fall into two different time
> > buckets.
> >
> > Here are the PRs for both of them.
> >
> >    - Using Managed State: https://github.com/apache/apex-malhar/pull/335
> >    - Using Windowed Operator:
> > https://github.com/apache/apex-malhar/pull/343
> >
> > Please review them and suggest on the correct approach for the final
> > implementation which should be used to add other features like fault
> > tolerance, scalability, optimizations etc.
> > Thanks.
> >
> > ~ Bhupesh
> >
> > On Fri, Jul 8, 2016 at 11:30 PM, David Yan <da...@datatorrent.com>
> wrote:
> >
> > > No problem.
> > >
> > > By the way, I changed the method name to setFixedWatermark. And also,
> if
> > > you want to drop any tuples that are considered late, you need to set
> the
> > > allowed lateness to be 0.
> > >
> > > David
> > >
> > > On Fri, Jul 8, 2016 at 4:55 AM, Bhupesh Chawda <bh...@apache.org>
> > wrote:
> > >
> > > > Thanks David.
> > > > I'll try to create an implementation for Deduper which uses
> > > > WindowedOperator. Will open a PR soon for review.
> > > >
> > > > ~ Bhupesh
> > > >
> > > > On Fri, Jul 8, 2016 at 2:23 AM, David Yan <da...@datatorrent.com>
> > wrote:
> > > >
> > > > > Hi Bhupesh,
> > > > >
> > > > > I just added the method setFixedLateness(long millis) to
> > > > > AbstractWindowedOperator in my PR. This will allow you to specify
> the
> > > > > lateness with respect to the timestamp from the window ID without
> > > > watermark
> > > > > tuples from upstream.
> > > > >
> > > > > David
> > > > >
> > > > > On Thu, Jul 7, 2016 at 11:49 AM, David Yan <da...@datatorrent.com>
> > > > wrote:
> > > > >
> > > > > > Hi Bhupesh,
> > > > > >
> > > > > > Yes, the windowed operator currently depends on the watermark
> > tuples
> > > > > > upstream for any "lateness" related operation. If there is no
> > > > watermark,
> > > > > > nothing will be considered late. We can add support for lateness
> > > > handling
> > > > > > without incoming watermark tuples. Let me add that to the pull
> > > request.
> > > > > >
> > > > > > David
> > > > > >
> > > > > >
> > > > > > On Wed, Jul 6, 2016 at 10:48 PM, Bhupesh Chawda <
> > bhupesh@apache.org>
> > > > > > wrote:
> > > > > >
> > > > > >> Hi David,
> > > > > >>
> > > > > >> Thanks for your reply.
> > > > > >>
> > > > > >> If I am to use a windowed operator for the Dedup operator, there
> > > > should
> > > > > be
> > > > > >> some operator (upstream to Deduper) which sends the watermark
> > > tuples.
> > > > > >> These
> > > > > >> tuples (along with allowed lateness), will be the ones deciding
> > > which
> > > > > >> incoming tuples are too late and will be dropped. I have the
> > > following
> > > > > >> questions:
> > > > > >>
> > > > > >> Is a windowed operator (which needs watermarks) dependent upon
> > some
> > > > > other
> > > > > >> operator for these tuples? What happens when there are no
> > watermark
> > > > > tuples
> > > > > >> sent from upstream?
> > > > > >>
> > > > > >> Can a windowed operator "*assume*" the watermark tuples based on
> > > some
> > > > > >> notion of time? For example, can the Deduper, use the streaming
> > > window
> > > > > >> time
> > > > > >> as the reference to advance the watermark?
> > > > > >>
> > > > > >> Thanks.
> > > > > >>
> > > > > >> ~ Bhupesh
> > > > > >>
> > > > > >> On Thu, Jul 7, 2016 at 4:07 AM, David Yan <
> david@datatorrent.com>
> > > > > wrote:
> > > > > >>
> > > > > >> > Hi Bhupesh,
> > > > > >> >
> > > > > >> > FYI, there is a JIRA open for a scalable implementation of
> > > > > >> WindowedStorage
> > > > > >> > and WindowedKeyedStorage:
> > > > > >> >
> > > > > >> > https://issues.apache.org/jira/browse/APEXMALHAR-2130
> > > > > >> >
> > > > > >> > We expect either to use ManagedState directly, or Spillable
> > > > > structures,
> > > > > >> > which in turn uses ManagedState.
> > > > > >> >
> > > > > >> > I'm not very familiar with the dedup operator. but in order to
> > use
> > > > the
> > > > > >> > WindowedOperator, it sounds to me that we can use
> SlidingWindows
> > > > with
> > > > > an
> > > > > >> > implementation of WindowedKeyedStorage that uses a Bloom
> filter
> > to
> > > > > cover
> > > > > >> > most of the false cases.
> > > > > >> >
> > > > > >> > David
> > > > > >> >
> > > > > >> > On Mon, Jul 4, 2016 at 4:42 AM, Bhupesh Chawda <
> > > bhupesh@apache.org>
> > > > > >> wrote:
> > > > > >> >
> > > > > >> > > Hi All,
> > > > > >> > >
> > > > > >> > > I have looked into Windowing concepts from Apache Beam and
> the
> > > PR
> > > > > >> #319 by
> > > > > >> > > David. Looks like there are a lot of advanced concepts which
> > > could
> > > > > be
> > > > > >> > used
> > > > > >> > > by operators using event time windowing.
> > > > > >> > > Additionally I also looked at the Managed State
> > implementation.
> > > > > >> > >
> > > > > >> > > One of the things I noticed is that there is an overlap of
> > > > > >> functionality
> > > > > >> > > between Managed State and Windowing Support in terms of the
> > > > > following:
> > > > > >> > >
> > > > > >> > >    - *Discarding / Dropping of tuples* from the system -
> > Managed
> > > > > State
> > > > > >> > uses
> > > > > >> > >    the concept of expiry while a Windowed operator uses the
> > > > concepts
> > > > > >> of
> > > > > >> > >    Watermarks and allowed lateness. If I try to reconcile
> the
> > > > above
> > > > > >> two,
> > > > > >> > it
> > > > > >> > >    seems like Managed State (through TimeBucketAssigner) is
> > > trying
> > > > > to
> > > > > >> > >    implement some sort of implicit heuristic Watermarks
> based
> > on
> > > > > >> either
> > > > > >> > the
> > > > > >> > >    user supplied time or the event time.
> > > > > >> > >    - *Global Window* support - Once we have an option to
> > disable
> > > > > >> purging
> > > > > >> > in
> > > > > >> > >    Managed State, it will have similar semantics to the
> Global
> > > > > Window
> > > > > >> > > option
> > > > > >> > >    in Windowing support.
> > > > > >> > >
> > > > > >> > > If I understand correctly, is the suggestion to implement
> the
> > > > Dedup
> > > > > >> > > operator as a Windowed operator and to use managed state
> only
> > > as a
> > > > > >> > storage
> > > > > >> > > medium (through WindowedStorage) ? What could be a better
> way
> > of
> > > > > going
> > > > > >> > > about this?
> > > > > >> > >
> > > > > >> > > Thanks.
> > > > > >> > >
> > > > > >> > > ~ Bhupesh
> > > > > >> > >
> > > > > >> > > On Wed, Jun 29, 2016 at 10:35 PM, Bhupesh Chawda <
> > > > > bhupesh@apache.org>
> > > > > >> > > wrote:
> > > > > >> > >
> > > > > >> > > > Hi Thomas,
> > > > > >> > > >
> > > > > >> > > > I agree that the case of processing bounded data is a
> > special
> > > > case
> > > > > >> of
> > > > > >> > > > unbounded data.
> > > > > >> > > > Th difference I was pointing out was in terms of expiry.
> > This
> > > is
> > > > > not
> > > > > >> > > > applicable in case of bounded data sets, while unbounded
> > data
> > > > sets
> > > > > >> will
> > > > > >> > > > inherently use expiry for limiting the amount of data to
> be
> > > > > stored.
> > > > > >> > > >
> > > > > >> > > > For idempotency when applying expiry on the streaming
> data,
> > I
> > > > need
> > > > > >> to
> > > > > >> > > > explore more on the using the window timestamp that you
> > > proposed
> > > > > as
> > > > > >> > > opposed
> > > > > >> > > > to the system time which I was planning to use.
> > > > > >> > > >
> > > > > >> > > > Thanks.
> > > > > >> > > > ~ Bhupesh
> > > > > >> > > >
> > > > > >> > > > On Wed, Jun 29, 2016 at 8:39 PM, Thomas Weise <
> > > > > >> thomas@datatorrent.com>
> > > > > >> > > > wrote:
> > > > > >> > > >
> > > > > >> > > >> Bhupesh,
> > > > > >> > > >>
> > > > > >> > > >> Why is there a distinction between bounded and unbounded
> > > data?
> > > > I
> > > > > >> see
> > > > > >> > the
> > > > > >> > > >> former as a special case of the latter?
> > > > > >> > > >>
> > > > > >> > > >> When rewinding the stream or reprocessing the stream in
> > > another
> > > > > run
> > > > > >> > the
> > > > > >> > > >> operator should produce the same result.
> > > > > >> > > >>
> > > > > >> > > >> This operator should be idempotent also. That implies
> that
> > > code
> > > > > >> does
> > > > > >> > not
> > > > > >> > > >> rely on current system time but the window timestamp
> > instead.
> > > > > >> > > >>
> > > > > >> > > >> All of this should be accomplished by using the windowing
> > > > > support:
> > > > > >> > > >> https://github.com/apache/apex-malhar/pull/319
> > > > > >> > > >>
> > > > > >> > > >> Thanks,
> > > > > >> > > >> Thomas
> > > > > >> > > >>
> > > > > >> > > >>
> > > > > >> > > >>
> > > > > >> > > >>
> > > > > >> > > >>
> > > > > >> > > >>
> > > > > >> > > >> On Wed, Jun 29, 2016 at 4:32 AM, Bhupesh Chawda <
> > > > > >> > > bhupesh@datatorrent.com>
> > > > > >> > > >> wrote:
> > > > > >> > > >>
> > > > > >> > > >> > Hi All,
> > > > > >> > > >> >
> > > > > >> > > >> > I want to validate the use cases for de-duplication
> that
> > > will
> > > > > be
> > > > > >> > going
> > > > > >> > > >> as
> > > > > >> > > >> > part of this implementation.
> > > > > >> > > >> >
> > > > > >> > > >> >    - *Bounded data set*
> > > > > >> > > >> >       - This is de-duplication for bounded data. For
> > > example,
> > > > > >> data
> > > > > >> > > sets
> > > > > >> > > >> >       which are old or fixed or which may not have a
> time
> > > > field
> > > > > >> at
> > > > > >> > > >> > all. Example:
> > > > > >> > > >> >       Last year's transaction records or Customer data
> > etc.
> > > > > >> > > >> >       - Concept of expiry is not needed as this is
> > bounded
> > > > data
> > > > > >> set.
> > > > > >> > > >> >       - *Unbounded data set*
> > > > > >> > > >> >       - This is de-duplication of online streaming data
> > > > > >> > > >> >       - Expiry is needed because here incoming tuples
> may
> > > > > arrive
> > > > > >> > later
> > > > > >> > > >> than
> > > > > >> > > >> >       what they are expected. Expiry is always computed
> > by
> > > > > taking
> > > > > >> > the
> > > > > >> > > >> > difference
> > > > > >> > > >> >       in System time and the Event time.
> > > > > >> > > >> >
> > > > > >> > > >> > Any feedback is appreciated.
> > > > > >> > > >> >
> > > > > >> > > >> > Thanks.
> > > > > >> > > >> >
> > > > > >> > > >> > ~ Bhupesh
> > > > > >> > > >> >
> > > > > >> > > >> > On Mon, Jun 27, 2016 at 11:34 AM, Bhupesh Chawda <
> > > > > >> > > >> bhupesh@datatorrent.com>
> > > > > >> > > >> > wrote:
> > > > > >> > > >> >
> > > > > >> > > >> > > Hi All,
> > > > > >> > > >> > >
> > > > > >> > > >> > > I am working on adding a De-duplication operator in
> > > Malhar
> > > > > >> library
> > > > > >> > > >> based
> > > > > >> > > >> > > on managed state APIs. I will be working off the
> > already
> > > > > >> created
> > > > > >> > > JIRA
> > > > > >> > > >> -
> > > > > >> > > >> > >
> https://issues.apache.org/jira/browse/APEXMALHAR-1701
> > > and
> > > > > the
> > > > > >> > > initial
> > > > > >> > > >> > > pull request for an AbstractDeduper here:
> > > > > >> > > >> > > https://github.com/apache/apex-malhar/pull/260/files
> > > > > >> > > >> > >
> > > > > >> > > >> > > I am planning to include the following features in
> the
> > > > first
> > > > > >> > > version:
> > > > > >> > > >> > > 1. Time based de-duplication. Assumption: Tuple_Key
> ->
> > > > > >> Tuple_Time
> > > > > >> > > >> > > correlation holds.
> > > > > >> > > >> > > 2. Option to maintain order of incoming tuples.
> > > > > >> > > >> > > 3. Duplicate and Expired ports to emit duplicate and
> > > > expired
> > > > > >> > tuples
> > > > > >> > > >> > > respectively.
> > > > > >> > > >> > >
> > > > > >> > > >> > > Thanks.
> > > > > >> > > >> > >
> > > > > >> > > >> > > ~ Bhupesh
> > > > > >> > > >> > >
> > > > > >> > > >> >
> > > > > >> > > >>
> > > > > >> > > >
> > > > > >> > > >
> > > > > >> > >
> > > > > >> >
> > > > > >>
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>

Re: APEXMALHAR-1701 Deduper in Malhar

Posted by Thomas Weise <th...@datatorrent.com>.
Hi Bhupesh,

Dedup is different with regard to state accumulation. For other windowed
operations, we collect state and then emit a result after a period of time
(trigger or watermark). Here, we only need the state to detect the
duplicate. Hence, it is inefficient to collect a list of tuples to
determine that a subsequently arriving tuple is a duplicate or not. But
isn't this scenario similar to the session window, where state is
continuously merged.

I would prefer to see more analysis on performance and scalability to large
key cardinality. The window operator only has the memory backed window
store at this time. Until there is a managed state backed implementation
that has seen benchmarking, we cannot really use it as baseline for further
implementations on top of it.

Thomas


On Thu, Jul 14, 2016 at 7:55 PM, Bhupesh Chawda <bh...@apache.org> wrote:

> Hi All,
>
> I also implemented a De-duplication operator using Windowed Operator. Now
> we have two implementations, one with Managed state and another using
> Windowed operator. Here are their details:
>
>    1. *With Managed State - *
>    - The operator is implemented using managed state as the storage for
>       buckets into which the tuples will be stored.
>       - *TimeBucketAssigner* is used to assign an incoming tuple to
>       different buckets based on the event time. It is also used to
> identify
>       whether a particular tuple is expired and should be sent to the
> expired
>       port / dropped.
>       - For managed state, the *ManagedTimeUnifiedStateImpl* implementation
>       is used which just requires the user to specify the event time
> and a bucket
>       is automatically assigned based on that. The structure of the bucket
> data
>       on storage is as follows: /operator_id /time_bucket
>       - An advantage of using Managed State approach is that we don't have
>       to assume the correlation of event time to the de-duplication key of
> the
>       tuple. For example, if we get two tuples like: (K1, T1), and (K1,
> T2), we
>       can still use ManagedStateImpl and conclude that these tuples are
>       duplicates based on the Key K1.
>       2. *With Windowed Operator - *
>    - The operator uses the WindowedOperatorImpl as the base operator.
>       - Accumulation, for the deduper, basically amounts to storing a list
>       of tuples in the data storage. Every time we get a unique tuple, we
>       *accumulate* it in the list.
>       - Event windows are modeled using the *TimeWindow* option. Although
>       SlidingTimeWIndows seems to be intuitive for data buckets, it seems
> to be
>       the costly option as the accumulation in this case is not just
> an aggregate
>       value but a list of values in that bucket.
>       - Watermarks are not assumed to be sent from an input operator
>       (although it is okay if an upstream operator sends them). The
>       *fixedWatermark* feature is used to assume watermarks which are
>       relative to the window time.
>       - One of the issues I found with using WindowedOperator for Dedup is
>       that event time is tightly coupled with the de-duplication key. In
> the
>       above example, (K1, T1), and (K1, T2) *might* be concluded as two
>       unique tuples since T1 and T2 may fall into two different time
> buckets.
>
> Here are the PRs for both of them.
>
>    - Using Managed State: https://github.com/apache/apex-malhar/pull/335
>    - Using Windowed Operator:
> https://github.com/apache/apex-malhar/pull/343
>
> Please review them and suggest on the correct approach for the final
> implementation which should be used to add other features like fault
> tolerance, scalability, optimizations etc.
> Thanks.
>
> ~ Bhupesh
>
> On Fri, Jul 8, 2016 at 11:30 PM, David Yan <da...@datatorrent.com> wrote:
>
> > No problem.
> >
> > By the way, I changed the method name to setFixedWatermark. And also, if
> > you want to drop any tuples that are considered late, you need to set the
> > allowed lateness to be 0.
> >
> > David
> >
> > On Fri, Jul 8, 2016 at 4:55 AM, Bhupesh Chawda <bh...@apache.org>
> wrote:
> >
> > > Thanks David.
> > > I'll try to create an implementation for Deduper which uses
> > > WindowedOperator. Will open a PR soon for review.
> > >
> > > ~ Bhupesh
> > >
> > > On Fri, Jul 8, 2016 at 2:23 AM, David Yan <da...@datatorrent.com>
> wrote:
> > >
> > > > Hi Bhupesh,
> > > >
> > > > I just added the method setFixedLateness(long millis) to
> > > > AbstractWindowedOperator in my PR. This will allow you to specify the
> > > > lateness with respect to the timestamp from the window ID without
> > > watermark
> > > > tuples from upstream.
> > > >
> > > > David
> > > >
> > > > On Thu, Jul 7, 2016 at 11:49 AM, David Yan <da...@datatorrent.com>
> > > wrote:
> > > >
> > > > > Hi Bhupesh,
> > > > >
> > > > > Yes, the windowed operator currently depends on the watermark
> tuples
> > > > > upstream for any "lateness" related operation. If there is no
> > > watermark,
> > > > > nothing will be considered late. We can add support for lateness
> > > handling
> > > > > without incoming watermark tuples. Let me add that to the pull
> > request.
> > > > >
> > > > > David
> > > > >
> > > > >
> > > > > On Wed, Jul 6, 2016 at 10:48 PM, Bhupesh Chawda <
> bhupesh@apache.org>
> > > > > wrote:
> > > > >
> > > > >> Hi David,
> > > > >>
> > > > >> Thanks for your reply.
> > > > >>
> > > > >> If I am to use a windowed operator for the Dedup operator, there
> > > should
> > > > be
> > > > >> some operator (upstream to Deduper) which sends the watermark
> > tuples.
> > > > >> These
> > > > >> tuples (along with allowed lateness), will be the ones deciding
> > which
> > > > >> incoming tuples are too late and will be dropped. I have the
> > following
> > > > >> questions:
> > > > >>
> > > > >> Is a windowed operator (which needs watermarks) dependent upon
> some
> > > > other
> > > > >> operator for these tuples? What happens when there are no
> watermark
> > > > tuples
> > > > >> sent from upstream?
> > > > >>
> > > > >> Can a windowed operator "*assume*" the watermark tuples based on
> > some
> > > > >> notion of time? For example, can the Deduper, use the streaming
> > window
> > > > >> time
> > > > >> as the reference to advance the watermark?
> > > > >>
> > > > >> Thanks.
> > > > >>
> > > > >> ~ Bhupesh
> > > > >>
> > > > >> On Thu, Jul 7, 2016 at 4:07 AM, David Yan <da...@datatorrent.com>
> > > > wrote:
> > > > >>
> > > > >> > Hi Bhupesh,
> > > > >> >
> > > > >> > FYI, there is a JIRA open for a scalable implementation of
> > > > >> WindowedStorage
> > > > >> > and WindowedKeyedStorage:
> > > > >> >
> > > > >> > https://issues.apache.org/jira/browse/APEXMALHAR-2130
> > > > >> >
> > > > >> > We expect either to use ManagedState directly, or Spillable
> > > > structures,
> > > > >> > which in turn uses ManagedState.
> > > > >> >
> > > > >> > I'm not very familiar with the dedup operator. but in order to
> use
> > > the
> > > > >> > WindowedOperator, it sounds to me that we can use SlidingWindows
> > > with
> > > > an
> > > > >> > implementation of WindowedKeyedStorage that uses a Bloom filter
> to
> > > > cover
> > > > >> > most of the false cases.
> > > > >> >
> > > > >> > David
> > > > >> >
> > > > >> > On Mon, Jul 4, 2016 at 4:42 AM, Bhupesh Chawda <
> > bhupesh@apache.org>
> > > > >> wrote:
> > > > >> >
> > > > >> > > Hi All,
> > > > >> > >
> > > > >> > > I have looked into Windowing concepts from Apache Beam and the
> > PR
> > > > >> #319 by
> > > > >> > > David. Looks like there are a lot of advanced concepts which
> > could
> > > > be
> > > > >> > used
> > > > >> > > by operators using event time windowing.
> > > > >> > > Additionally I also looked at the Managed State
> implementation.
> > > > >> > >
> > > > >> > > One of the things I noticed is that there is an overlap of
> > > > >> functionality
> > > > >> > > between Managed State and Windowing Support in terms of the
> > > > following:
> > > > >> > >
> > > > >> > >    - *Discarding / Dropping of tuples* from the system -
> Managed
> > > > State
> > > > >> > uses
> > > > >> > >    the concept of expiry while a Windowed operator uses the
> > > concepts
> > > > >> of
> > > > >> > >    Watermarks and allowed lateness. If I try to reconcile the
> > > above
> > > > >> two,
> > > > >> > it
> > > > >> > >    seems like Managed State (through TimeBucketAssigner) is
> > trying
> > > > to
> > > > >> > >    implement some sort of implicit heuristic Watermarks based
> on
> > > > >> either
> > > > >> > the
> > > > >> > >    user supplied time or the event time.
> > > > >> > >    - *Global Window* support - Once we have an option to
> disable
> > > > >> purging
> > > > >> > in
> > > > >> > >    Managed State, it will have similar semantics to the Global
> > > > Window
> > > > >> > > option
> > > > >> > >    in Windowing support.
> > > > >> > >
> > > > >> > > If I understand correctly, is the suggestion to implement the
> > > Dedup
> > > > >> > > operator as a Windowed operator and to use managed state only
> > as a
> > > > >> > storage
> > > > >> > > medium (through WindowedStorage) ? What could be a better way
> of
> > > > going
> > > > >> > > about this?
> > > > >> > >
> > > > >> > > Thanks.
> > > > >> > >
> > > > >> > > ~ Bhupesh
> > > > >> > >
> > > > >> > > On Wed, Jun 29, 2016 at 10:35 PM, Bhupesh Chawda <
> > > > bhupesh@apache.org>
> > > > >> > > wrote:
> > > > >> > >
> > > > >> > > > Hi Thomas,
> > > > >> > > >
> > > > >> > > > I agree that the case of processing bounded data is a
> special
> > > case
> > > > >> of
> > > > >> > > > unbounded data.
> > > > >> > > > Th difference I was pointing out was in terms of expiry.
> This
> > is
> > > > not
> > > > >> > > > applicable in case of bounded data sets, while unbounded
> data
> > > sets
> > > > >> will
> > > > >> > > > inherently use expiry for limiting the amount of data to be
> > > > stored.
> > > > >> > > >
> > > > >> > > > For idempotency when applying expiry on the streaming data,
> I
> > > need
> > > > >> to
> > > > >> > > > explore more on the using the window timestamp that you
> > proposed
> > > > as
> > > > >> > > opposed
> > > > >> > > > to the system time which I was planning to use.
> > > > >> > > >
> > > > >> > > > Thanks.
> > > > >> > > > ~ Bhupesh
> > > > >> > > >
> > > > >> > > > On Wed, Jun 29, 2016 at 8:39 PM, Thomas Weise <
> > > > >> thomas@datatorrent.com>
> > > > >> > > > wrote:
> > > > >> > > >
> > > > >> > > >> Bhupesh,
> > > > >> > > >>
> > > > >> > > >> Why is there a distinction between bounded and unbounded
> > data?
> > > I
> > > > >> see
> > > > >> > the
> > > > >> > > >> former as a special case of the latter?
> > > > >> > > >>
> > > > >> > > >> When rewinding the stream or reprocessing the stream in
> > another
> > > > run
> > > > >> > the
> > > > >> > > >> operator should produce the same result.
> > > > >> > > >>
> > > > >> > > >> This operator should be idempotent also. That implies that
> > code
> > > > >> does
> > > > >> > not
> > > > >> > > >> rely on current system time but the window timestamp
> instead.
> > > > >> > > >>
> > > > >> > > >> All of this should be accomplished by using the windowing
> > > > support:
> > > > >> > > >> https://github.com/apache/apex-malhar/pull/319
> > > > >> > > >>
> > > > >> > > >> Thanks,
> > > > >> > > >> Thomas
> > > > >> > > >>
> > > > >> > > >>
> > > > >> > > >>
> > > > >> > > >>
> > > > >> > > >>
> > > > >> > > >>
> > > > >> > > >> On Wed, Jun 29, 2016 at 4:32 AM, Bhupesh Chawda <
> > > > >> > > bhupesh@datatorrent.com>
> > > > >> > > >> wrote:
> > > > >> > > >>
> > > > >> > > >> > Hi All,
> > > > >> > > >> >
> > > > >> > > >> > I want to validate the use cases for de-duplication that
> > will
> > > > be
> > > > >> > going
> > > > >> > > >> as
> > > > >> > > >> > part of this implementation.
> > > > >> > > >> >
> > > > >> > > >> >    - *Bounded data set*
> > > > >> > > >> >       - This is de-duplication for bounded data. For
> > example,
> > > > >> data
> > > > >> > > sets
> > > > >> > > >> >       which are old or fixed or which may not have a time
> > > field
> > > > >> at
> > > > >> > > >> > all. Example:
> > > > >> > > >> >       Last year's transaction records or Customer data
> etc.
> > > > >> > > >> >       - Concept of expiry is not needed as this is
> bounded
> > > data
> > > > >> set.
> > > > >> > > >> >       - *Unbounded data set*
> > > > >> > > >> >       - This is de-duplication of online streaming data
> > > > >> > > >> >       - Expiry is needed because here incoming tuples may
> > > > arrive
> > > > >> > later
> > > > >> > > >> than
> > > > >> > > >> >       what they are expected. Expiry is always computed
> by
> > > > taking
> > > > >> > the
> > > > >> > > >> > difference
> > > > >> > > >> >       in System time and the Event time.
> > > > >> > > >> >
> > > > >> > > >> > Any feedback is appreciated.
> > > > >> > > >> >
> > > > >> > > >> > Thanks.
> > > > >> > > >> >
> > > > >> > > >> > ~ Bhupesh
> > > > >> > > >> >
> > > > >> > > >> > On Mon, Jun 27, 2016 at 11:34 AM, Bhupesh Chawda <
> > > > >> > > >> bhupesh@datatorrent.com>
> > > > >> > > >> > wrote:
> > > > >> > > >> >
> > > > >> > > >> > > Hi All,
> > > > >> > > >> > >
> > > > >> > > >> > > I am working on adding a De-duplication operator in
> > Malhar
> > > > >> library
> > > > >> > > >> based
> > > > >> > > >> > > on managed state APIs. I will be working off the
> already
> > > > >> created
> > > > >> > > JIRA
> > > > >> > > >> -
> > > > >> > > >> > > https://issues.apache.org/jira/browse/APEXMALHAR-1701
> > and
> > > > the
> > > > >> > > initial
> > > > >> > > >> > > pull request for an AbstractDeduper here:
> > > > >> > > >> > > https://github.com/apache/apex-malhar/pull/260/files
> > > > >> > > >> > >
> > > > >> > > >> > > I am planning to include the following features in the
> > > first
> > > > >> > > version:
> > > > >> > > >> > > 1. Time based de-duplication. Assumption: Tuple_Key ->
> > > > >> Tuple_Time
> > > > >> > > >> > > correlation holds.
> > > > >> > > >> > > 2. Option to maintain order of incoming tuples.
> > > > >> > > >> > > 3. Duplicate and Expired ports to emit duplicate and
> > > expired
> > > > >> > tuples
> > > > >> > > >> > > respectively.
> > > > >> > > >> > >
> > > > >> > > >> > > Thanks.
> > > > >> > > >> > >
> > > > >> > > >> > > ~ Bhupesh
> > > > >> > > >> > >
> > > > >> > > >> >
> > > > >> > > >>
> > > > >> > > >
> > > > >> > > >
> > > > >> > >
> > > > >> >
> > > > >>
> > > > >
> > > > >
> > > >
> > >
> >
>

Re: APEXMALHAR-1701 Deduper in Malhar

Posted by Yogi Devendra <yo...@apache.org>.
+1 for Windowed Operator if we manage to take care of the problem you
mentioned for this approach in the earlier email.

IMO, "WindowedStorage to suit the dedup use case" would open up ways for
any other similar use-cases for Windowed Operators which needs some custom
handling.

-1 for keeping both the implementations. End user may not have sufficient
depth to understand which implementation to use.

+0 for using Managed state.


~ Yogi

On 15 July 2016 at 15:18, Bhupesh Chawda <bh...@apache.org> wrote:

> Hi,
>
> ​Yes Pramod, the state is not shared among the sliding windows. Probably,
> as David suggested, we can create a more apt implementation for
> WindowedStorage to suit the dedup use case​. And if we are to create a
> custom storage implementation, we might also have to provide functionality
> for asynchronously fetching the data for different event windows (since
> this has to be processed asynchronously due to the time involved in
> fetching data from persistent storage), which is already provided by
> managed state.
>
> ​One of the first decisions we need to make is which implementation to use.
> Once we decide that, we can concentrate completely on that and figure out
> ways to fill up the shortcomings. Here are the options:
>
>    1. Deduper using Managed State - Deduper using Bucketing mechanism and
>    storage provided by Managed State
>    2. Deduper using Windowed Operator - Deduper supporting Beam concepts ​
>    3. Keep both implementations around for users to try out.
>
> ​I have already described the design, the advantages and problems in both
> approaches in the previous emails. Please vote and mention why you think
> so.
>
> Thanks.
>
> ~ Bhupesh
>
>
>
> On Fri, Jul 15, 2016 at 4:43 AM, David Yan <da...@datatorrent.com> wrote:
>
> > In general, windows cannot share state.
> > But you can have a custom WindowedStorage implementation that does the
> > dedup more efficiently than the default behavior.
> >
> > David
> >
> > On Thu, Jul 14, 2016 at 2:40 PM, Pramod Immaneni <pramod@datatorrent.com
> >
> > wrote:
> >
> > > Bhupesh,
> > >
> > > When using "Windows Operator", if you were using sliding time windows
> > like
> > > you were originally thinking then you would have the correct dedup
> > behavior
> > > with the example case you mentioned with the tuples isn't it? Can the
> > > sliding windows share state with each other?
> > >
> > > Thanks
> > >
> > > On Thu, Jul 14, 2016 at 10:55 AM, Bhupesh Chawda <bh...@apache.org>
> > > wrote:
> > >
> > > > Hi All,
> > > >
> > > > I also implemented a De-duplication operator using Windowed Operator.
> > Now
> > > > we have two implementations, one with Managed state and another using
> > > > Windowed operator. Here are their details:
> > > >
> > > >    1. *With Managed State - *
> > > >    - The operator is implemented using managed state as the storage
> for
> > > >       buckets into which the tuples will be stored.
> > > >       - *TimeBucketAssigner* is used to assign an incoming tuple to
> > > >       different buckets based on the event time. It is also used to
> > > > identify
> > > >       whether a particular tuple is expired and should be sent to the
> > > > expired
> > > >       port / dropped.
> > > >       - For managed state, the *ManagedTimeUnifiedStateImpl*
> > > implementation
> > > >       is used which just requires the user to specify the event time
> > > > and a bucket
> > > >       is automatically assigned based on that. The structure of the
> > > bucket
> > > > data
> > > >       on storage is as follows: /operator_id /time_bucket
> > > >       - An advantage of using Managed State approach is that we don't
> > > have
> > > >       to assume the correlation of event time to the de-duplication
> key
> > > of
> > > > the
> > > >       tuple. For example, if we get two tuples like: (K1, T1), and
> (K1,
> > > > T2), we
> > > >       can still use ManagedStateImpl and conclude that these tuples
> are
> > > >       duplicates based on the Key K1.
> > > >       2. *With Windowed Operator - *
> > > >    - The operator uses the WindowedOperatorImpl as the base operator.
> > > >       - Accumulation, for the deduper, basically amounts to storing a
> > > list
> > > >       of tuples in the data storage. Every time we get a unique
> tuple,
> > we
> > > >       *accumulate* it in the list.
> > > >       - Event windows are modeled using the *TimeWindow* option.
> > Although
> > > >       SlidingTimeWIndows seems to be intuitive for data buckets, it
> > seems
> > > > to be
> > > >       the costly option as the accumulation in this case is not just
> > > > an aggregate
> > > >       value but a list of values in that bucket.
> > > >       - Watermarks are not assumed to be sent from an input operator
> > > >       (although it is okay if an upstream operator sends them). The
> > > >       *fixedWatermark* feature is used to assume watermarks which are
> > > >       relative to the window time.
> > > >       - One of the issues I found with using WindowedOperator for
> Dedup
> > > is
> > > >       that event time is tightly coupled with the de-duplication key.
> > In
> > > > the
> > > >       above example, (K1, T1), and (K1, T2) *might* be concluded as
> two
> > > >       unique tuples since T1 and T2 may fall into two different time
> > > > buckets.
> > > >
> > > > Here are the PRs for both of them.
> > > >
> > > >    - Using Managed State:
> > https://github.com/apache/apex-malhar/pull/335
> > > >    - Using Windowed Operator:
> > > > https://github.com/apache/apex-malhar/pull/343
> > > >
> > > > Please review them and suggest on the correct approach for the final
> > > > implementation which should be used to add other features like fault
> > > > tolerance, scalability, optimizations etc.
> > > > Thanks.
> > > >
> > > > ~ Bhupesh
> > > >
> > > > On Fri, Jul 8, 2016 at 11:30 PM, David Yan <da...@datatorrent.com>
> > > wrote:
> > > >
> > > > > No problem.
> > > > >
> > > > > By the way, I changed the method name to setFixedWatermark. And
> also,
> > > if
> > > > > you want to drop any tuples that are considered late, you need to
> set
> > > the
> > > > > allowed lateness to be 0.
> > > > >
> > > > > David
> > > > >
> > > > > On Fri, Jul 8, 2016 at 4:55 AM, Bhupesh Chawda <bhupesh@apache.org
> >
> > > > wrote:
> > > > >
> > > > > > Thanks David.
> > > > > > I'll try to create an implementation for Deduper which uses
> > > > > > WindowedOperator. Will open a PR soon for review.
> > > > > >
> > > > > > ~ Bhupesh
> > > > > >
> > > > > > On Fri, Jul 8, 2016 at 2:23 AM, David Yan <david@datatorrent.com
> >
> > > > wrote:
> > > > > >
> > > > > > > Hi Bhupesh,
> > > > > > >
> > > > > > > I just added the method setFixedLateness(long millis) to
> > > > > > > AbstractWindowedOperator in my PR. This will allow you to
> specify
> > > the
> > > > > > > lateness with respect to the timestamp from the window ID
> without
> > > > > > watermark
> > > > > > > tuples from upstream.
> > > > > > >
> > > > > > > David
> > > > > > >
> > > > > > > On Thu, Jul 7, 2016 at 11:49 AM, David Yan <
> > david@datatorrent.com>
> > > > > > wrote:
> > > > > > >
> > > > > > > > Hi Bhupesh,
> > > > > > > >
> > > > > > > > Yes, the windowed operator currently depends on the watermark
> > > > tuples
> > > > > > > > upstream for any "lateness" related operation. If there is no
> > > > > > watermark,
> > > > > > > > nothing will be considered late. We can add support for
> > lateness
> > > > > > handling
> > > > > > > > without incoming watermark tuples. Let me add that to the
> pull
> > > > > request.
> > > > > > > >
> > > > > > > > David
> > > > > > > >
> > > > > > > >
> > > > > > > > On Wed, Jul 6, 2016 at 10:48 PM, Bhupesh Chawda <
> > > > bhupesh@apache.org>
> > > > > > > > wrote:
> > > > > > > >
> > > > > > > >> Hi David,
> > > > > > > >>
> > > > > > > >> Thanks for your reply.
> > > > > > > >>
> > > > > > > >> If I am to use a windowed operator for the Dedup operator,
> > there
> > > > > > should
> > > > > > > be
> > > > > > > >> some operator (upstream to Deduper) which sends the
> watermark
> > > > > tuples.
> > > > > > > >> These
> > > > > > > >> tuples (along with allowed lateness), will be the ones
> > deciding
> > > > > which
> > > > > > > >> incoming tuples are too late and will be dropped. I have the
> > > > > following
> > > > > > > >> questions:
> > > > > > > >>
> > > > > > > >> Is a windowed operator (which needs watermarks) dependent
> upon
> > > > some
> > > > > > > other
> > > > > > > >> operator for these tuples? What happens when there are no
> > > > watermark
> > > > > > > tuples
> > > > > > > >> sent from upstream?
> > > > > > > >>
> > > > > > > >> Can a windowed operator "*assume*" the watermark tuples
> based
> > on
> > > > > some
> > > > > > > >> notion of time? For example, can the Deduper, use the
> > streaming
> > > > > window
> > > > > > > >> time
> > > > > > > >> as the reference to advance the watermark?
> > > > > > > >>
> > > > > > > >> Thanks.
> > > > > > > >>
> > > > > > > >> ~ Bhupesh
> > > > > > > >>
> > > > > > > >> On Thu, Jul 7, 2016 at 4:07 AM, David Yan <
> > > david@datatorrent.com>
> > > > > > > wrote:
> > > > > > > >>
> > > > > > > >> > Hi Bhupesh,
> > > > > > > >> >
> > > > > > > >> > FYI, there is a JIRA open for a scalable implementation of
> > > > > > > >> WindowedStorage
> > > > > > > >> > and WindowedKeyedStorage:
> > > > > > > >> >
> > > > > > > >> > https://issues.apache.org/jira/browse/APEXMALHAR-2130
> > > > > > > >> >
> > > > > > > >> > We expect either to use ManagedState directly, or
> Spillable
> > > > > > > structures,
> > > > > > > >> > which in turn uses ManagedState.
> > > > > > > >> >
> > > > > > > >> > I'm not very familiar with the dedup operator. but in
> order
> > to
> > > > use
> > > > > > the
> > > > > > > >> > WindowedOperator, it sounds to me that we can use
> > > SlidingWindows
> > > > > > with
> > > > > > > an
> > > > > > > >> > implementation of WindowedKeyedStorage that uses a Bloom
> > > filter
> > > > to
> > > > > > > cover
> > > > > > > >> > most of the false cases.
> > > > > > > >> >
> > > > > > > >> > David
> > > > > > > >> >
> > > > > > > >> > On Mon, Jul 4, 2016 at 4:42 AM, Bhupesh Chawda <
> > > > > bhupesh@apache.org>
> > > > > > > >> wrote:
> > > > > > > >> >
> > > > > > > >> > > Hi All,
> > > > > > > >> > >
> > > > > > > >> > > I have looked into Windowing concepts from Apache Beam
> and
> > > the
> > > > > PR
> > > > > > > >> #319 by
> > > > > > > >> > > David. Looks like there are a lot of advanced concepts
> > which
> > > > > could
> > > > > > > be
> > > > > > > >> > used
> > > > > > > >> > > by operators using event time windowing.
> > > > > > > >> > > Additionally I also looked at the Managed State
> > > > implementation.
> > > > > > > >> > >
> > > > > > > >> > > One of the things I noticed is that there is an overlap
> of
> > > > > > > >> functionality
> > > > > > > >> > > between Managed State and Windowing Support in terms of
> > the
> > > > > > > following:
> > > > > > > >> > >
> > > > > > > >> > >    - *Discarding / Dropping of tuples* from the system -
> > > > Managed
> > > > > > > State
> > > > > > > >> > uses
> > > > > > > >> > >    the concept of expiry while a Windowed operator uses
> > the
> > > > > > concepts
> > > > > > > >> of
> > > > > > > >> > >    Watermarks and allowed lateness. If I try to
> reconcile
> > > the
> > > > > > above
> > > > > > > >> two,
> > > > > > > >> > it
> > > > > > > >> > >    seems like Managed State (through TimeBucketAssigner)
> > is
> > > > > trying
> > > > > > > to
> > > > > > > >> > >    implement some sort of implicit heuristic Watermarks
> > > based
> > > > on
> > > > > > > >> either
> > > > > > > >> > the
> > > > > > > >> > >    user supplied time or the event time.
> > > > > > > >> > >    - *Global Window* support - Once we have an option to
> > > > disable
> > > > > > > >> purging
> > > > > > > >> > in
> > > > > > > >> > >    Managed State, it will have similar semantics to the
> > > Global
> > > > > > > Window
> > > > > > > >> > > option
> > > > > > > >> > >    in Windowing support.
> > > > > > > >> > >
> > > > > > > >> > > If I understand correctly, is the suggestion to
> implement
> > > the
> > > > > > Dedup
> > > > > > > >> > > operator as a Windowed operator and to use managed state
> > > only
> > > > > as a
> > > > > > > >> > storage
> > > > > > > >> > > medium (through WindowedStorage) ? What could be a
> better
> > > way
> > > > of
> > > > > > > going
> > > > > > > >> > > about this?
> > > > > > > >> > >
> > > > > > > >> > > Thanks.
> > > > > > > >> > >
> > > > > > > >> > > ~ Bhupesh
> > > > > > > >> > >
> > > > > > > >> > > On Wed, Jun 29, 2016 at 10:35 PM, Bhupesh Chawda <
> > > > > > > bhupesh@apache.org>
> > > > > > > >> > > wrote:
> > > > > > > >> > >
> > > > > > > >> > > > Hi Thomas,
> > > > > > > >> > > >
> > > > > > > >> > > > I agree that the case of processing bounded data is a
> > > > special
> > > > > > case
> > > > > > > >> of
> > > > > > > >> > > > unbounded data.
> > > > > > > >> > > > Th difference I was pointing out was in terms of
> expiry.
> > > > This
> > > > > is
> > > > > > > not
> > > > > > > >> > > > applicable in case of bounded data sets, while
> unbounded
> > > > data
> > > > > > sets
> > > > > > > >> will
> > > > > > > >> > > > inherently use expiry for limiting the amount of data
> to
> > > be
> > > > > > > stored.
> > > > > > > >> > > >
> > > > > > > >> > > > For idempotency when applying expiry on the streaming
> > > data,
> > > > I
> > > > > > need
> > > > > > > >> to
> > > > > > > >> > > > explore more on the using the window timestamp that
> you
> > > > > proposed
> > > > > > > as
> > > > > > > >> > > opposed
> > > > > > > >> > > > to the system time which I was planning to use.
> > > > > > > >> > > >
> > > > > > > >> > > > Thanks.
> > > > > > > >> > > > ~ Bhupesh
> > > > > > > >> > > >
> > > > > > > >> > > > On Wed, Jun 29, 2016 at 8:39 PM, Thomas Weise <
> > > > > > > >> thomas@datatorrent.com>
> > > > > > > >> > > > wrote:
> > > > > > > >> > > >
> > > > > > > >> > > >> Bhupesh,
> > > > > > > >> > > >>
> > > > > > > >> > > >> Why is there a distinction between bounded and
> > unbounded
> > > > > data?
> > > > > > I
> > > > > > > >> see
> > > > > > > >> > the
> > > > > > > >> > > >> former as a special case of the latter?
> > > > > > > >> > > >>
> > > > > > > >> > > >> When rewinding the stream or reprocessing the stream
> in
> > > > > another
> > > > > > > run
> > > > > > > >> > the
> > > > > > > >> > > >> operator should produce the same result.
> > > > > > > >> > > >>
> > > > > > > >> > > >> This operator should be idempotent also. That implies
> > > that
> > > > > code
> > > > > > > >> does
> > > > > > > >> > not
> > > > > > > >> > > >> rely on current system time but the window timestamp
> > > > instead.
> > > > > > > >> > > >>
> > > > > > > >> > > >> All of this should be accomplished by using the
> > windowing
> > > > > > > support:
> > > > > > > >> > > >> https://github.com/apache/apex-malhar/pull/319
> > > > > > > >> > > >>
> > > > > > > >> > > >> Thanks,
> > > > > > > >> > > >> Thomas
> > > > > > > >> > > >>
> > > > > > > >> > > >>
> > > > > > > >> > > >>
> > > > > > > >> > > >>
> > > > > > > >> > > >>
> > > > > > > >> > > >>
> > > > > > > >> > > >> On Wed, Jun 29, 2016 at 4:32 AM, Bhupesh Chawda <
> > > > > > > >> > > bhupesh@datatorrent.com>
> > > > > > > >> > > >> wrote:
> > > > > > > >> > > >>
> > > > > > > >> > > >> > Hi All,
> > > > > > > >> > > >> >
> > > > > > > >> > > >> > I want to validate the use cases for de-duplication
> > > that
> > > > > will
> > > > > > > be
> > > > > > > >> > going
> > > > > > > >> > > >> as
> > > > > > > >> > > >> > part of this implementation.
> > > > > > > >> > > >> >
> > > > > > > >> > > >> >    - *Bounded data set*
> > > > > > > >> > > >> >       - This is de-duplication for bounded data.
> For
> > > > > example,
> > > > > > > >> data
> > > > > > > >> > > sets
> > > > > > > >> > > >> >       which are old or fixed or which may not have
> a
> > > time
> > > > > > field
> > > > > > > >> at
> > > > > > > >> > > >> > all. Example:
> > > > > > > >> > > >> >       Last year's transaction records or Customer
> > data
> > > > etc.
> > > > > > > >> > > >> >       - Concept of expiry is not needed as this is
> > > > bounded
> > > > > > data
> > > > > > > >> set.
> > > > > > > >> > > >> >       - *Unbounded data set*
> > > > > > > >> > > >> >       - This is de-duplication of online streaming
> > data
> > > > > > > >> > > >> >       - Expiry is needed because here incoming
> tuples
> > > may
> > > > > > > arrive
> > > > > > > >> > later
> > > > > > > >> > > >> than
> > > > > > > >> > > >> >       what they are expected. Expiry is always
> > computed
> > > > by
> > > > > > > taking
> > > > > > > >> > the
> > > > > > > >> > > >> > difference
> > > > > > > >> > > >> >       in System time and the Event time.
> > > > > > > >> > > >> >
> > > > > > > >> > > >> > Any feedback is appreciated.
> > > > > > > >> > > >> >
> > > > > > > >> > > >> > Thanks.
> > > > > > > >> > > >> >
> > > > > > > >> > > >> > ~ Bhupesh
> > > > > > > >> > > >> >
> > > > > > > >> > > >> > On Mon, Jun 27, 2016 at 11:34 AM, Bhupesh Chawda <
> > > > > > > >> > > >> bhupesh@datatorrent.com>
> > > > > > > >> > > >> > wrote:
> > > > > > > >> > > >> >
> > > > > > > >> > > >> > > Hi All,
> > > > > > > >> > > >> > >
> > > > > > > >> > > >> > > I am working on adding a De-duplication operator
> in
> > > > > Malhar
> > > > > > > >> library
> > > > > > > >> > > >> based
> > > > > > > >> > > >> > > on managed state APIs. I will be working off the
> > > > already
> > > > > > > >> created
> > > > > > > >> > > JIRA
> > > > > > > >> > > >> -
> > > > > > > >> > > >> > >
> > > https://issues.apache.org/jira/browse/APEXMALHAR-1701
> > > > > and
> > > > > > > the
> > > > > > > >> > > initial
> > > > > > > >> > > >> > > pull request for an AbstractDeduper here:
> > > > > > > >> > > >> > >
> > https://github.com/apache/apex-malhar/pull/260/files
> > > > > > > >> > > >> > >
> > > > > > > >> > > >> > > I am planning to include the following features
> in
> > > the
> > > > > > first
> > > > > > > >> > > version:
> > > > > > > >> > > >> > > 1. Time based de-duplication. Assumption:
> Tuple_Key
> > > ->
> > > > > > > >> Tuple_Time
> > > > > > > >> > > >> > > correlation holds.
> > > > > > > >> > > >> > > 2. Option to maintain order of incoming tuples.
> > > > > > > >> > > >> > > 3. Duplicate and Expired ports to emit duplicate
> > and
> > > > > > expired
> > > > > > > >> > tuples
> > > > > > > >> > > >> > > respectively.
> > > > > > > >> > > >> > >
> > > > > > > >> > > >> > > Thanks.
> > > > > > > >> > > >> > >
> > > > > > > >> > > >> > > ~ Bhupesh
> > > > > > > >> > > >> > >
> > > > > > > >> > > >> >
> > > > > > > >> > > >>
> > > > > > > >> > > >
> > > > > > > >> > > >
> > > > > > > >> > >
> > > > > > > >> >
> > > > > > > >>
> > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>

Re: APEXMALHAR-1701 Deduper in Malhar

Posted by Bhupesh Chawda <bh...@apache.org>.
Hi,

​Yes Pramod, the state is not shared among the sliding windows. Probably,
as David suggested, we can create a more apt implementation for
WindowedStorage to suit the dedup use case​. And if we are to create a
custom storage implementation, we might also have to provide functionality
for asynchronously fetching the data for different event windows (since
this has to be processed asynchronously due to the time involved in
fetching data from persistent storage), which is already provided by
managed state.

​One of the first decisions we need to make is which implementation to use.
Once we decide that, we can concentrate completely on that and figure out
ways to fill up the shortcomings. Here are the options:

   1. Deduper using Managed State - Deduper using Bucketing mechanism and
   storage provided by Managed State
   2. Deduper using Windowed Operator - Deduper supporting Beam concepts ​
   3. Keep both implementations around for users to try out.

​I have already described the design, the advantages and problems in both
approaches in the previous emails. Please vote and mention why you think so.

Thanks.

~ Bhupesh



On Fri, Jul 15, 2016 at 4:43 AM, David Yan <da...@datatorrent.com> wrote:

> In general, windows cannot share state.
> But you can have a custom WindowedStorage implementation that does the
> dedup more efficiently than the default behavior.
>
> David
>
> On Thu, Jul 14, 2016 at 2:40 PM, Pramod Immaneni <pr...@datatorrent.com>
> wrote:
>
> > Bhupesh,
> >
> > When using "Windows Operator", if you were using sliding time windows
> like
> > you were originally thinking then you would have the correct dedup
> behavior
> > with the example case you mentioned with the tuples isn't it? Can the
> > sliding windows share state with each other?
> >
> > Thanks
> >
> > On Thu, Jul 14, 2016 at 10:55 AM, Bhupesh Chawda <bh...@apache.org>
> > wrote:
> >
> > > Hi All,
> > >
> > > I also implemented a De-duplication operator using Windowed Operator.
> Now
> > > we have two implementations, one with Managed state and another using
> > > Windowed operator. Here are their details:
> > >
> > >    1. *With Managed State - *
> > >    - The operator is implemented using managed state as the storage for
> > >       buckets into which the tuples will be stored.
> > >       - *TimeBucketAssigner* is used to assign an incoming tuple to
> > >       different buckets based on the event time. It is also used to
> > > identify
> > >       whether a particular tuple is expired and should be sent to the
> > > expired
> > >       port / dropped.
> > >       - For managed state, the *ManagedTimeUnifiedStateImpl*
> > implementation
> > >       is used which just requires the user to specify the event time
> > > and a bucket
> > >       is automatically assigned based on that. The structure of the
> > bucket
> > > data
> > >       on storage is as follows: /operator_id /time_bucket
> > >       - An advantage of using Managed State approach is that we don't
> > have
> > >       to assume the correlation of event time to the de-duplication key
> > of
> > > the
> > >       tuple. For example, if we get two tuples like: (K1, T1), and (K1,
> > > T2), we
> > >       can still use ManagedStateImpl and conclude that these tuples are
> > >       duplicates based on the Key K1.
> > >       2. *With Windowed Operator - *
> > >    - The operator uses the WindowedOperatorImpl as the base operator.
> > >       - Accumulation, for the deduper, basically amounts to storing a
> > list
> > >       of tuples in the data storage. Every time we get a unique tuple,
> we
> > >       *accumulate* it in the list.
> > >       - Event windows are modeled using the *TimeWindow* option.
> Although
> > >       SlidingTimeWIndows seems to be intuitive for data buckets, it
> seems
> > > to be
> > >       the costly option as the accumulation in this case is not just
> > > an aggregate
> > >       value but a list of values in that bucket.
> > >       - Watermarks are not assumed to be sent from an input operator
> > >       (although it is okay if an upstream operator sends them). The
> > >       *fixedWatermark* feature is used to assume watermarks which are
> > >       relative to the window time.
> > >       - One of the issues I found with using WindowedOperator for Dedup
> > is
> > >       that event time is tightly coupled with the de-duplication key.
> In
> > > the
> > >       above example, (K1, T1), and (K1, T2) *might* be concluded as two
> > >       unique tuples since T1 and T2 may fall into two different time
> > > buckets.
> > >
> > > Here are the PRs for both of them.
> > >
> > >    - Using Managed State:
> https://github.com/apache/apex-malhar/pull/335
> > >    - Using Windowed Operator:
> > > https://github.com/apache/apex-malhar/pull/343
> > >
> > > Please review them and suggest on the correct approach for the final
> > > implementation which should be used to add other features like fault
> > > tolerance, scalability, optimizations etc.
> > > Thanks.
> > >
> > > ~ Bhupesh
> > >
> > > On Fri, Jul 8, 2016 at 11:30 PM, David Yan <da...@datatorrent.com>
> > wrote:
> > >
> > > > No problem.
> > > >
> > > > By the way, I changed the method name to setFixedWatermark. And also,
> > if
> > > > you want to drop any tuples that are considered late, you need to set
> > the
> > > > allowed lateness to be 0.
> > > >
> > > > David
> > > >
> > > > On Fri, Jul 8, 2016 at 4:55 AM, Bhupesh Chawda <bh...@apache.org>
> > > wrote:
> > > >
> > > > > Thanks David.
> > > > > I'll try to create an implementation for Deduper which uses
> > > > > WindowedOperator. Will open a PR soon for review.
> > > > >
> > > > > ~ Bhupesh
> > > > >
> > > > > On Fri, Jul 8, 2016 at 2:23 AM, David Yan <da...@datatorrent.com>
> > > wrote:
> > > > >
> > > > > > Hi Bhupesh,
> > > > > >
> > > > > > I just added the method setFixedLateness(long millis) to
> > > > > > AbstractWindowedOperator in my PR. This will allow you to specify
> > the
> > > > > > lateness with respect to the timestamp from the window ID without
> > > > > watermark
> > > > > > tuples from upstream.
> > > > > >
> > > > > > David
> > > > > >
> > > > > > On Thu, Jul 7, 2016 at 11:49 AM, David Yan <
> david@datatorrent.com>
> > > > > wrote:
> > > > > >
> > > > > > > Hi Bhupesh,
> > > > > > >
> > > > > > > Yes, the windowed operator currently depends on the watermark
> > > tuples
> > > > > > > upstream for any "lateness" related operation. If there is no
> > > > > watermark,
> > > > > > > nothing will be considered late. We can add support for
> lateness
> > > > > handling
> > > > > > > without incoming watermark tuples. Let me add that to the pull
> > > > request.
> > > > > > >
> > > > > > > David
> > > > > > >
> > > > > > >
> > > > > > > On Wed, Jul 6, 2016 at 10:48 PM, Bhupesh Chawda <
> > > bhupesh@apache.org>
> > > > > > > wrote:
> > > > > > >
> > > > > > >> Hi David,
> > > > > > >>
> > > > > > >> Thanks for your reply.
> > > > > > >>
> > > > > > >> If I am to use a windowed operator for the Dedup operator,
> there
> > > > > should
> > > > > > be
> > > > > > >> some operator (upstream to Deduper) which sends the watermark
> > > > tuples.
> > > > > > >> These
> > > > > > >> tuples (along with allowed lateness), will be the ones
> deciding
> > > > which
> > > > > > >> incoming tuples are too late and will be dropped. I have the
> > > > following
> > > > > > >> questions:
> > > > > > >>
> > > > > > >> Is a windowed operator (which needs watermarks) dependent upon
> > > some
> > > > > > other
> > > > > > >> operator for these tuples? What happens when there are no
> > > watermark
> > > > > > tuples
> > > > > > >> sent from upstream?
> > > > > > >>
> > > > > > >> Can a windowed operator "*assume*" the watermark tuples based
> on
> > > > some
> > > > > > >> notion of time? For example, can the Deduper, use the
> streaming
> > > > window
> > > > > > >> time
> > > > > > >> as the reference to advance the watermark?
> > > > > > >>
> > > > > > >> Thanks.
> > > > > > >>
> > > > > > >> ~ Bhupesh
> > > > > > >>
> > > > > > >> On Thu, Jul 7, 2016 at 4:07 AM, David Yan <
> > david@datatorrent.com>
> > > > > > wrote:
> > > > > > >>
> > > > > > >> > Hi Bhupesh,
> > > > > > >> >
> > > > > > >> > FYI, there is a JIRA open for a scalable implementation of
> > > > > > >> WindowedStorage
> > > > > > >> > and WindowedKeyedStorage:
> > > > > > >> >
> > > > > > >> > https://issues.apache.org/jira/browse/APEXMALHAR-2130
> > > > > > >> >
> > > > > > >> > We expect either to use ManagedState directly, or Spillable
> > > > > > structures,
> > > > > > >> > which in turn uses ManagedState.
> > > > > > >> >
> > > > > > >> > I'm not very familiar with the dedup operator. but in order
> to
> > > use
> > > > > the
> > > > > > >> > WindowedOperator, it sounds to me that we can use
> > SlidingWindows
> > > > > with
> > > > > > an
> > > > > > >> > implementation of WindowedKeyedStorage that uses a Bloom
> > filter
> > > to
> > > > > > cover
> > > > > > >> > most of the false cases.
> > > > > > >> >
> > > > > > >> > David
> > > > > > >> >
> > > > > > >> > On Mon, Jul 4, 2016 at 4:42 AM, Bhupesh Chawda <
> > > > bhupesh@apache.org>
> > > > > > >> wrote:
> > > > > > >> >
> > > > > > >> > > Hi All,
> > > > > > >> > >
> > > > > > >> > > I have looked into Windowing concepts from Apache Beam and
> > the
> > > > PR
> > > > > > >> #319 by
> > > > > > >> > > David. Looks like there are a lot of advanced concepts
> which
> > > > could
> > > > > > be
> > > > > > >> > used
> > > > > > >> > > by operators using event time windowing.
> > > > > > >> > > Additionally I also looked at the Managed State
> > > implementation.
> > > > > > >> > >
> > > > > > >> > > One of the things I noticed is that there is an overlap of
> > > > > > >> functionality
> > > > > > >> > > between Managed State and Windowing Support in terms of
> the
> > > > > > following:
> > > > > > >> > >
> > > > > > >> > >    - *Discarding / Dropping of tuples* from the system -
> > > Managed
> > > > > > State
> > > > > > >> > uses
> > > > > > >> > >    the concept of expiry while a Windowed operator uses
> the
> > > > > concepts
> > > > > > >> of
> > > > > > >> > >    Watermarks and allowed lateness. If I try to reconcile
> > the
> > > > > above
> > > > > > >> two,
> > > > > > >> > it
> > > > > > >> > >    seems like Managed State (through TimeBucketAssigner)
> is
> > > > trying
> > > > > > to
> > > > > > >> > >    implement some sort of implicit heuristic Watermarks
> > based
> > > on
> > > > > > >> either
> > > > > > >> > the
> > > > > > >> > >    user supplied time or the event time.
> > > > > > >> > >    - *Global Window* support - Once we have an option to
> > > disable
> > > > > > >> purging
> > > > > > >> > in
> > > > > > >> > >    Managed State, it will have similar semantics to the
> > Global
> > > > > > Window
> > > > > > >> > > option
> > > > > > >> > >    in Windowing support.
> > > > > > >> > >
> > > > > > >> > > If I understand correctly, is the suggestion to implement
> > the
> > > > > Dedup
> > > > > > >> > > operator as a Windowed operator and to use managed state
> > only
> > > > as a
> > > > > > >> > storage
> > > > > > >> > > medium (through WindowedStorage) ? What could be a better
> > way
> > > of
> > > > > > going
> > > > > > >> > > about this?
> > > > > > >> > >
> > > > > > >> > > Thanks.
> > > > > > >> > >
> > > > > > >> > > ~ Bhupesh
> > > > > > >> > >
> > > > > > >> > > On Wed, Jun 29, 2016 at 10:35 PM, Bhupesh Chawda <
> > > > > > bhupesh@apache.org>
> > > > > > >> > > wrote:
> > > > > > >> > >
> > > > > > >> > > > Hi Thomas,
> > > > > > >> > > >
> > > > > > >> > > > I agree that the case of processing bounded data is a
> > > special
> > > > > case
> > > > > > >> of
> > > > > > >> > > > unbounded data.
> > > > > > >> > > > Th difference I was pointing out was in terms of expiry.
> > > This
> > > > is
> > > > > > not
> > > > > > >> > > > applicable in case of bounded data sets, while unbounded
> > > data
> > > > > sets
> > > > > > >> will
> > > > > > >> > > > inherently use expiry for limiting the amount of data to
> > be
> > > > > > stored.
> > > > > > >> > > >
> > > > > > >> > > > For idempotency when applying expiry on the streaming
> > data,
> > > I
> > > > > need
> > > > > > >> to
> > > > > > >> > > > explore more on the using the window timestamp that you
> > > > proposed
> > > > > > as
> > > > > > >> > > opposed
> > > > > > >> > > > to the system time which I was planning to use.
> > > > > > >> > > >
> > > > > > >> > > > Thanks.
> > > > > > >> > > > ~ Bhupesh
> > > > > > >> > > >
> > > > > > >> > > > On Wed, Jun 29, 2016 at 8:39 PM, Thomas Weise <
> > > > > > >> thomas@datatorrent.com>
> > > > > > >> > > > wrote:
> > > > > > >> > > >
> > > > > > >> > > >> Bhupesh,
> > > > > > >> > > >>
> > > > > > >> > > >> Why is there a distinction between bounded and
> unbounded
> > > > data?
> > > > > I
> > > > > > >> see
> > > > > > >> > the
> > > > > > >> > > >> former as a special case of the latter?
> > > > > > >> > > >>
> > > > > > >> > > >> When rewinding the stream or reprocessing the stream in
> > > > another
> > > > > > run
> > > > > > >> > the
> > > > > > >> > > >> operator should produce the same result.
> > > > > > >> > > >>
> > > > > > >> > > >> This operator should be idempotent also. That implies
> > that
> > > > code
> > > > > > >> does
> > > > > > >> > not
> > > > > > >> > > >> rely on current system time but the window timestamp
> > > instead.
> > > > > > >> > > >>
> > > > > > >> > > >> All of this should be accomplished by using the
> windowing
> > > > > > support:
> > > > > > >> > > >> https://github.com/apache/apex-malhar/pull/319
> > > > > > >> > > >>
> > > > > > >> > > >> Thanks,
> > > > > > >> > > >> Thomas
> > > > > > >> > > >>
> > > > > > >> > > >>
> > > > > > >> > > >>
> > > > > > >> > > >>
> > > > > > >> > > >>
> > > > > > >> > > >>
> > > > > > >> > > >> On Wed, Jun 29, 2016 at 4:32 AM, Bhupesh Chawda <
> > > > > > >> > > bhupesh@datatorrent.com>
> > > > > > >> > > >> wrote:
> > > > > > >> > > >>
> > > > > > >> > > >> > Hi All,
> > > > > > >> > > >> >
> > > > > > >> > > >> > I want to validate the use cases for de-duplication
> > that
> > > > will
> > > > > > be
> > > > > > >> > going
> > > > > > >> > > >> as
> > > > > > >> > > >> > part of this implementation.
> > > > > > >> > > >> >
> > > > > > >> > > >> >    - *Bounded data set*
> > > > > > >> > > >> >       - This is de-duplication for bounded data. For
> > > > example,
> > > > > > >> data
> > > > > > >> > > sets
> > > > > > >> > > >> >       which are old or fixed or which may not have a
> > time
> > > > > field
> > > > > > >> at
> > > > > > >> > > >> > all. Example:
> > > > > > >> > > >> >       Last year's transaction records or Customer
> data
> > > etc.
> > > > > > >> > > >> >       - Concept of expiry is not needed as this is
> > > bounded
> > > > > data
> > > > > > >> set.
> > > > > > >> > > >> >       - *Unbounded data set*
> > > > > > >> > > >> >       - This is de-duplication of online streaming
> data
> > > > > > >> > > >> >       - Expiry is needed because here incoming tuples
> > may
> > > > > > arrive
> > > > > > >> > later
> > > > > > >> > > >> than
> > > > > > >> > > >> >       what they are expected. Expiry is always
> computed
> > > by
> > > > > > taking
> > > > > > >> > the
> > > > > > >> > > >> > difference
> > > > > > >> > > >> >       in System time and the Event time.
> > > > > > >> > > >> >
> > > > > > >> > > >> > Any feedback is appreciated.
> > > > > > >> > > >> >
> > > > > > >> > > >> > Thanks.
> > > > > > >> > > >> >
> > > > > > >> > > >> > ~ Bhupesh
> > > > > > >> > > >> >
> > > > > > >> > > >> > On Mon, Jun 27, 2016 at 11:34 AM, Bhupesh Chawda <
> > > > > > >> > > >> bhupesh@datatorrent.com>
> > > > > > >> > > >> > wrote:
> > > > > > >> > > >> >
> > > > > > >> > > >> > > Hi All,
> > > > > > >> > > >> > >
> > > > > > >> > > >> > > I am working on adding a De-duplication operator in
> > > > Malhar
> > > > > > >> library
> > > > > > >> > > >> based
> > > > > > >> > > >> > > on managed state APIs. I will be working off the
> > > already
> > > > > > >> created
> > > > > > >> > > JIRA
> > > > > > >> > > >> -
> > > > > > >> > > >> > >
> > https://issues.apache.org/jira/browse/APEXMALHAR-1701
> > > > and
> > > > > > the
> > > > > > >> > > initial
> > > > > > >> > > >> > > pull request for an AbstractDeduper here:
> > > > > > >> > > >> > >
> https://github.com/apache/apex-malhar/pull/260/files
> > > > > > >> > > >> > >
> > > > > > >> > > >> > > I am planning to include the following features in
> > the
> > > > > first
> > > > > > >> > > version:
> > > > > > >> > > >> > > 1. Time based de-duplication. Assumption: Tuple_Key
> > ->
> > > > > > >> Tuple_Time
> > > > > > >> > > >> > > correlation holds.
> > > > > > >> > > >> > > 2. Option to maintain order of incoming tuples.
> > > > > > >> > > >> > > 3. Duplicate and Expired ports to emit duplicate
> and
> > > > > expired
> > > > > > >> > tuples
> > > > > > >> > > >> > > respectively.
> > > > > > >> > > >> > >
> > > > > > >> > > >> > > Thanks.
> > > > > > >> > > >> > >
> > > > > > >> > > >> > > ~ Bhupesh
> > > > > > >> > > >> > >
> > > > > > >> > > >> >
> > > > > > >> > > >>
> > > > > > >> > > >
> > > > > > >> > > >
> > > > > > >> > >
> > > > > > >> >
> > > > > > >>
> > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>

Re: APEXMALHAR-1701 Deduper in Malhar

Posted by David Yan <da...@datatorrent.com>.
In general, windows cannot share state.
But you can have a custom WindowedStorage implementation that does the
dedup more efficiently than the default behavior.

David

On Thu, Jul 14, 2016 at 2:40 PM, Pramod Immaneni <pr...@datatorrent.com>
wrote:

> Bhupesh,
>
> When using "Windows Operator", if you were using sliding time windows like
> you were originally thinking then you would have the correct dedup behavior
> with the example case you mentioned with the tuples isn't it? Can the
> sliding windows share state with each other?
>
> Thanks
>
> On Thu, Jul 14, 2016 at 10:55 AM, Bhupesh Chawda <bh...@apache.org>
> wrote:
>
> > Hi All,
> >
> > I also implemented a De-duplication operator using Windowed Operator. Now
> > we have two implementations, one with Managed state and another using
> > Windowed operator. Here are their details:
> >
> >    1. *With Managed State - *
> >    - The operator is implemented using managed state as the storage for
> >       buckets into which the tuples will be stored.
> >       - *TimeBucketAssigner* is used to assign an incoming tuple to
> >       different buckets based on the event time. It is also used to
> > identify
> >       whether a particular tuple is expired and should be sent to the
> > expired
> >       port / dropped.
> >       - For managed state, the *ManagedTimeUnifiedStateImpl*
> implementation
> >       is used which just requires the user to specify the event time
> > and a bucket
> >       is automatically assigned based on that. The structure of the
> bucket
> > data
> >       on storage is as follows: /operator_id /time_bucket
> >       - An advantage of using Managed State approach is that we don't
> have
> >       to assume the correlation of event time to the de-duplication key
> of
> > the
> >       tuple. For example, if we get two tuples like: (K1, T1), and (K1,
> > T2), we
> >       can still use ManagedStateImpl and conclude that these tuples are
> >       duplicates based on the Key K1.
> >       2. *With Windowed Operator - *
> >    - The operator uses the WindowedOperatorImpl as the base operator.
> >       - Accumulation, for the deduper, basically amounts to storing a
> list
> >       of tuples in the data storage. Every time we get a unique tuple, we
> >       *accumulate* it in the list.
> >       - Event windows are modeled using the *TimeWindow* option. Although
> >       SlidingTimeWIndows seems to be intuitive for data buckets, it seems
> > to be
> >       the costly option as the accumulation in this case is not just
> > an aggregate
> >       value but a list of values in that bucket.
> >       - Watermarks are not assumed to be sent from an input operator
> >       (although it is okay if an upstream operator sends them). The
> >       *fixedWatermark* feature is used to assume watermarks which are
> >       relative to the window time.
> >       - One of the issues I found with using WindowedOperator for Dedup
> is
> >       that event time is tightly coupled with the de-duplication key. In
> > the
> >       above example, (K1, T1), and (K1, T2) *might* be concluded as two
> >       unique tuples since T1 and T2 may fall into two different time
> > buckets.
> >
> > Here are the PRs for both of them.
> >
> >    - Using Managed State: https://github.com/apache/apex-malhar/pull/335
> >    - Using Windowed Operator:
> > https://github.com/apache/apex-malhar/pull/343
> >
> > Please review them and suggest on the correct approach for the final
> > implementation which should be used to add other features like fault
> > tolerance, scalability, optimizations etc.
> > Thanks.
> >
> > ~ Bhupesh
> >
> > On Fri, Jul 8, 2016 at 11:30 PM, David Yan <da...@datatorrent.com>
> wrote:
> >
> > > No problem.
> > >
> > > By the way, I changed the method name to setFixedWatermark. And also,
> if
> > > you want to drop any tuples that are considered late, you need to set
> the
> > > allowed lateness to be 0.
> > >
> > > David
> > >
> > > On Fri, Jul 8, 2016 at 4:55 AM, Bhupesh Chawda <bh...@apache.org>
> > wrote:
> > >
> > > > Thanks David.
> > > > I'll try to create an implementation for Deduper which uses
> > > > WindowedOperator. Will open a PR soon for review.
> > > >
> > > > ~ Bhupesh
> > > >
> > > > On Fri, Jul 8, 2016 at 2:23 AM, David Yan <da...@datatorrent.com>
> > wrote:
> > > >
> > > > > Hi Bhupesh,
> > > > >
> > > > > I just added the method setFixedLateness(long millis) to
> > > > > AbstractWindowedOperator in my PR. This will allow you to specify
> the
> > > > > lateness with respect to the timestamp from the window ID without
> > > > watermark
> > > > > tuples from upstream.
> > > > >
> > > > > David
> > > > >
> > > > > On Thu, Jul 7, 2016 at 11:49 AM, David Yan <da...@datatorrent.com>
> > > > wrote:
> > > > >
> > > > > > Hi Bhupesh,
> > > > > >
> > > > > > Yes, the windowed operator currently depends on the watermark
> > tuples
> > > > > > upstream for any "lateness" related operation. If there is no
> > > > watermark,
> > > > > > nothing will be considered late. We can add support for lateness
> > > > handling
> > > > > > without incoming watermark tuples. Let me add that to the pull
> > > request.
> > > > > >
> > > > > > David
> > > > > >
> > > > > >
> > > > > > On Wed, Jul 6, 2016 at 10:48 PM, Bhupesh Chawda <
> > bhupesh@apache.org>
> > > > > > wrote:
> > > > > >
> > > > > >> Hi David,
> > > > > >>
> > > > > >> Thanks for your reply.
> > > > > >>
> > > > > >> If I am to use a windowed operator for the Dedup operator, there
> > > > should
> > > > > be
> > > > > >> some operator (upstream to Deduper) which sends the watermark
> > > tuples.
> > > > > >> These
> > > > > >> tuples (along with allowed lateness), will be the ones deciding
> > > which
> > > > > >> incoming tuples are too late and will be dropped. I have the
> > > following
> > > > > >> questions:
> > > > > >>
> > > > > >> Is a windowed operator (which needs watermarks) dependent upon
> > some
> > > > > other
> > > > > >> operator for these tuples? What happens when there are no
> > watermark
> > > > > tuples
> > > > > >> sent from upstream?
> > > > > >>
> > > > > >> Can a windowed operator "*assume*" the watermark tuples based on
> > > some
> > > > > >> notion of time? For example, can the Deduper, use the streaming
> > > window
> > > > > >> time
> > > > > >> as the reference to advance the watermark?
> > > > > >>
> > > > > >> Thanks.
> > > > > >>
> > > > > >> ~ Bhupesh
> > > > > >>
> > > > > >> On Thu, Jul 7, 2016 at 4:07 AM, David Yan <
> david@datatorrent.com>
> > > > > wrote:
> > > > > >>
> > > > > >> > Hi Bhupesh,
> > > > > >> >
> > > > > >> > FYI, there is a JIRA open for a scalable implementation of
> > > > > >> WindowedStorage
> > > > > >> > and WindowedKeyedStorage:
> > > > > >> >
> > > > > >> > https://issues.apache.org/jira/browse/APEXMALHAR-2130
> > > > > >> >
> > > > > >> > We expect either to use ManagedState directly, or Spillable
> > > > > structures,
> > > > > >> > which in turn uses ManagedState.
> > > > > >> >
> > > > > >> > I'm not very familiar with the dedup operator. but in order to
> > use
> > > > the
> > > > > >> > WindowedOperator, it sounds to me that we can use
> SlidingWindows
> > > > with
> > > > > an
> > > > > >> > implementation of WindowedKeyedStorage that uses a Bloom
> filter
> > to
> > > > > cover
> > > > > >> > most of the false cases.
> > > > > >> >
> > > > > >> > David
> > > > > >> >
> > > > > >> > On Mon, Jul 4, 2016 at 4:42 AM, Bhupesh Chawda <
> > > bhupesh@apache.org>
> > > > > >> wrote:
> > > > > >> >
> > > > > >> > > Hi All,
> > > > > >> > >
> > > > > >> > > I have looked into Windowing concepts from Apache Beam and
> the
> > > PR
> > > > > >> #319 by
> > > > > >> > > David. Looks like there are a lot of advanced concepts which
> > > could
> > > > > be
> > > > > >> > used
> > > > > >> > > by operators using event time windowing.
> > > > > >> > > Additionally I also looked at the Managed State
> > implementation.
> > > > > >> > >
> > > > > >> > > One of the things I noticed is that there is an overlap of
> > > > > >> functionality
> > > > > >> > > between Managed State and Windowing Support in terms of the
> > > > > following:
> > > > > >> > >
> > > > > >> > >    - *Discarding / Dropping of tuples* from the system -
> > Managed
> > > > > State
> > > > > >> > uses
> > > > > >> > >    the concept of expiry while a Windowed operator uses the
> > > > concepts
> > > > > >> of
> > > > > >> > >    Watermarks and allowed lateness. If I try to reconcile
> the
> > > > above
> > > > > >> two,
> > > > > >> > it
> > > > > >> > >    seems like Managed State (through TimeBucketAssigner) is
> > > trying
> > > > > to
> > > > > >> > >    implement some sort of implicit heuristic Watermarks
> based
> > on
> > > > > >> either
> > > > > >> > the
> > > > > >> > >    user supplied time or the event time.
> > > > > >> > >    - *Global Window* support - Once we have an option to
> > disable
> > > > > >> purging
> > > > > >> > in
> > > > > >> > >    Managed State, it will have similar semantics to the
> Global
> > > > > Window
> > > > > >> > > option
> > > > > >> > >    in Windowing support.
> > > > > >> > >
> > > > > >> > > If I understand correctly, is the suggestion to implement
> the
> > > > Dedup
> > > > > >> > > operator as a Windowed operator and to use managed state
> only
> > > as a
> > > > > >> > storage
> > > > > >> > > medium (through WindowedStorage) ? What could be a better
> way
> > of
> > > > > going
> > > > > >> > > about this?
> > > > > >> > >
> > > > > >> > > Thanks.
> > > > > >> > >
> > > > > >> > > ~ Bhupesh
> > > > > >> > >
> > > > > >> > > On Wed, Jun 29, 2016 at 10:35 PM, Bhupesh Chawda <
> > > > > bhupesh@apache.org>
> > > > > >> > > wrote:
> > > > > >> > >
> > > > > >> > > > Hi Thomas,
> > > > > >> > > >
> > > > > >> > > > I agree that the case of processing bounded data is a
> > special
> > > > case
> > > > > >> of
> > > > > >> > > > unbounded data.
> > > > > >> > > > Th difference I was pointing out was in terms of expiry.
> > This
> > > is
> > > > > not
> > > > > >> > > > applicable in case of bounded data sets, while unbounded
> > data
> > > > sets
> > > > > >> will
> > > > > >> > > > inherently use expiry for limiting the amount of data to
> be
> > > > > stored.
> > > > > >> > > >
> > > > > >> > > > For idempotency when applying expiry on the streaming
> data,
> > I
> > > > need
> > > > > >> to
> > > > > >> > > > explore more on the using the window timestamp that you
> > > proposed
> > > > > as
> > > > > >> > > opposed
> > > > > >> > > > to the system time which I was planning to use.
> > > > > >> > > >
> > > > > >> > > > Thanks.
> > > > > >> > > > ~ Bhupesh
> > > > > >> > > >
> > > > > >> > > > On Wed, Jun 29, 2016 at 8:39 PM, Thomas Weise <
> > > > > >> thomas@datatorrent.com>
> > > > > >> > > > wrote:
> > > > > >> > > >
> > > > > >> > > >> Bhupesh,
> > > > > >> > > >>
> > > > > >> > > >> Why is there a distinction between bounded and unbounded
> > > data?
> > > > I
> > > > > >> see
> > > > > >> > the
> > > > > >> > > >> former as a special case of the latter?
> > > > > >> > > >>
> > > > > >> > > >> When rewinding the stream or reprocessing the stream in
> > > another
> > > > > run
> > > > > >> > the
> > > > > >> > > >> operator should produce the same result.
> > > > > >> > > >>
> > > > > >> > > >> This operator should be idempotent also. That implies
> that
> > > code
> > > > > >> does
> > > > > >> > not
> > > > > >> > > >> rely on current system time but the window timestamp
> > instead.
> > > > > >> > > >>
> > > > > >> > > >> All of this should be accomplished by using the windowing
> > > > > support:
> > > > > >> > > >> https://github.com/apache/apex-malhar/pull/319
> > > > > >> > > >>
> > > > > >> > > >> Thanks,
> > > > > >> > > >> Thomas
> > > > > >> > > >>
> > > > > >> > > >>
> > > > > >> > > >>
> > > > > >> > > >>
> > > > > >> > > >>
> > > > > >> > > >>
> > > > > >> > > >> On Wed, Jun 29, 2016 at 4:32 AM, Bhupesh Chawda <
> > > > > >> > > bhupesh@datatorrent.com>
> > > > > >> > > >> wrote:
> > > > > >> > > >>
> > > > > >> > > >> > Hi All,
> > > > > >> > > >> >
> > > > > >> > > >> > I want to validate the use cases for de-duplication
> that
> > > will
> > > > > be
> > > > > >> > going
> > > > > >> > > >> as
> > > > > >> > > >> > part of this implementation.
> > > > > >> > > >> >
> > > > > >> > > >> >    - *Bounded data set*
> > > > > >> > > >> >       - This is de-duplication for bounded data. For
> > > example,
> > > > > >> data
> > > > > >> > > sets
> > > > > >> > > >> >       which are old or fixed or which may not have a
> time
> > > > field
> > > > > >> at
> > > > > >> > > >> > all. Example:
> > > > > >> > > >> >       Last year's transaction records or Customer data
> > etc.
> > > > > >> > > >> >       - Concept of expiry is not needed as this is
> > bounded
> > > > data
> > > > > >> set.
> > > > > >> > > >> >       - *Unbounded data set*
> > > > > >> > > >> >       - This is de-duplication of online streaming data
> > > > > >> > > >> >       - Expiry is needed because here incoming tuples
> may
> > > > > arrive
> > > > > >> > later
> > > > > >> > > >> than
> > > > > >> > > >> >       what they are expected. Expiry is always computed
> > by
> > > > > taking
> > > > > >> > the
> > > > > >> > > >> > difference
> > > > > >> > > >> >       in System time and the Event time.
> > > > > >> > > >> >
> > > > > >> > > >> > Any feedback is appreciated.
> > > > > >> > > >> >
> > > > > >> > > >> > Thanks.
> > > > > >> > > >> >
> > > > > >> > > >> > ~ Bhupesh
> > > > > >> > > >> >
> > > > > >> > > >> > On Mon, Jun 27, 2016 at 11:34 AM, Bhupesh Chawda <
> > > > > >> > > >> bhupesh@datatorrent.com>
> > > > > >> > > >> > wrote:
> > > > > >> > > >> >
> > > > > >> > > >> > > Hi All,
> > > > > >> > > >> > >
> > > > > >> > > >> > > I am working on adding a De-duplication operator in
> > > Malhar
> > > > > >> library
> > > > > >> > > >> based
> > > > > >> > > >> > > on managed state APIs. I will be working off the
> > already
> > > > > >> created
> > > > > >> > > JIRA
> > > > > >> > > >> -
> > > > > >> > > >> > >
> https://issues.apache.org/jira/browse/APEXMALHAR-1701
> > > and
> > > > > the
> > > > > >> > > initial
> > > > > >> > > >> > > pull request for an AbstractDeduper here:
> > > > > >> > > >> > > https://github.com/apache/apex-malhar/pull/260/files
> > > > > >> > > >> > >
> > > > > >> > > >> > > I am planning to include the following features in
> the
> > > > first
> > > > > >> > > version:
> > > > > >> > > >> > > 1. Time based de-duplication. Assumption: Tuple_Key
> ->
> > > > > >> Tuple_Time
> > > > > >> > > >> > > correlation holds.
> > > > > >> > > >> > > 2. Option to maintain order of incoming tuples.
> > > > > >> > > >> > > 3. Duplicate and Expired ports to emit duplicate and
> > > > expired
> > > > > >> > tuples
> > > > > >> > > >> > > respectively.
> > > > > >> > > >> > >
> > > > > >> > > >> > > Thanks.
> > > > > >> > > >> > >
> > > > > >> > > >> > > ~ Bhupesh
> > > > > >> > > >> > >
> > > > > >> > > >> >
> > > > > >> > > >>
> > > > > >> > > >
> > > > > >> > > >
> > > > > >> > >
> > > > > >> >
> > > > > >>
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>

Re: APEXMALHAR-1701 Deduper in Malhar

Posted by Pramod Immaneni <pr...@datatorrent.com>.
Bhupesh,

When using "Windows Operator", if you were using sliding time windows like
you were originally thinking then you would have the correct dedup behavior
with the example case you mentioned with the tuples isn't it? Can the
sliding windows share state with each other?

Thanks

On Thu, Jul 14, 2016 at 10:55 AM, Bhupesh Chawda <bh...@apache.org> wrote:

> Hi All,
>
> I also implemented a De-duplication operator using Windowed Operator. Now
> we have two implementations, one with Managed state and another using
> Windowed operator. Here are their details:
>
>    1. *With Managed State - *
>    - The operator is implemented using managed state as the storage for
>       buckets into which the tuples will be stored.
>       - *TimeBucketAssigner* is used to assign an incoming tuple to
>       different buckets based on the event time. It is also used to
> identify
>       whether a particular tuple is expired and should be sent to the
> expired
>       port / dropped.
>       - For managed state, the *ManagedTimeUnifiedStateImpl* implementation
>       is used which just requires the user to specify the event time
> and a bucket
>       is automatically assigned based on that. The structure of the bucket
> data
>       on storage is as follows: /operator_id /time_bucket
>       - An advantage of using Managed State approach is that we don't have
>       to assume the correlation of event time to the de-duplication key of
> the
>       tuple. For example, if we get two tuples like: (K1, T1), and (K1,
> T2), we
>       can still use ManagedStateImpl and conclude that these tuples are
>       duplicates based on the Key K1.
>       2. *With Windowed Operator - *
>    - The operator uses the WindowedOperatorImpl as the base operator.
>       - Accumulation, for the deduper, basically amounts to storing a list
>       of tuples in the data storage. Every time we get a unique tuple, we
>       *accumulate* it in the list.
>       - Event windows are modeled using the *TimeWindow* option. Although
>       SlidingTimeWIndows seems to be intuitive for data buckets, it seems
> to be
>       the costly option as the accumulation in this case is not just
> an aggregate
>       value but a list of values in that bucket.
>       - Watermarks are not assumed to be sent from an input operator
>       (although it is okay if an upstream operator sends them). The
>       *fixedWatermark* feature is used to assume watermarks which are
>       relative to the window time.
>       - One of the issues I found with using WindowedOperator for Dedup is
>       that event time is tightly coupled with the de-duplication key. In
> the
>       above example, (K1, T1), and (K1, T2) *might* be concluded as two
>       unique tuples since T1 and T2 may fall into two different time
> buckets.
>
> Here are the PRs for both of them.
>
>    - Using Managed State: https://github.com/apache/apex-malhar/pull/335
>    - Using Windowed Operator:
> https://github.com/apache/apex-malhar/pull/343
>
> Please review them and suggest on the correct approach for the final
> implementation which should be used to add other features like fault
> tolerance, scalability, optimizations etc.
> Thanks.
>
> ~ Bhupesh
>
> On Fri, Jul 8, 2016 at 11:30 PM, David Yan <da...@datatorrent.com> wrote:
>
> > No problem.
> >
> > By the way, I changed the method name to setFixedWatermark. And also, if
> > you want to drop any tuples that are considered late, you need to set the
> > allowed lateness to be 0.
> >
> > David
> >
> > On Fri, Jul 8, 2016 at 4:55 AM, Bhupesh Chawda <bh...@apache.org>
> wrote:
> >
> > > Thanks David.
> > > I'll try to create an implementation for Deduper which uses
> > > WindowedOperator. Will open a PR soon for review.
> > >
> > > ~ Bhupesh
> > >
> > > On Fri, Jul 8, 2016 at 2:23 AM, David Yan <da...@datatorrent.com>
> wrote:
> > >
> > > > Hi Bhupesh,
> > > >
> > > > I just added the method setFixedLateness(long millis) to
> > > > AbstractWindowedOperator in my PR. This will allow you to specify the
> > > > lateness with respect to the timestamp from the window ID without
> > > watermark
> > > > tuples from upstream.
> > > >
> > > > David
> > > >
> > > > On Thu, Jul 7, 2016 at 11:49 AM, David Yan <da...@datatorrent.com>
> > > wrote:
> > > >
> > > > > Hi Bhupesh,
> > > > >
> > > > > Yes, the windowed operator currently depends on the watermark
> tuples
> > > > > upstream for any "lateness" related operation. If there is no
> > > watermark,
> > > > > nothing will be considered late. We can add support for lateness
> > > handling
> > > > > without incoming watermark tuples. Let me add that to the pull
> > request.
> > > > >
> > > > > David
> > > > >
> > > > >
> > > > > On Wed, Jul 6, 2016 at 10:48 PM, Bhupesh Chawda <
> bhupesh@apache.org>
> > > > > wrote:
> > > > >
> > > > >> Hi David,
> > > > >>
> > > > >> Thanks for your reply.
> > > > >>
> > > > >> If I am to use a windowed operator for the Dedup operator, there
> > > should
> > > > be
> > > > >> some operator (upstream to Deduper) which sends the watermark
> > tuples.
> > > > >> These
> > > > >> tuples (along with allowed lateness), will be the ones deciding
> > which
> > > > >> incoming tuples are too late and will be dropped. I have the
> > following
> > > > >> questions:
> > > > >>
> > > > >> Is a windowed operator (which needs watermarks) dependent upon
> some
> > > > other
> > > > >> operator for these tuples? What happens when there are no
> watermark
> > > > tuples
> > > > >> sent from upstream?
> > > > >>
> > > > >> Can a windowed operator "*assume*" the watermark tuples based on
> > some
> > > > >> notion of time? For example, can the Deduper, use the streaming
> > window
> > > > >> time
> > > > >> as the reference to advance the watermark?
> > > > >>
> > > > >> Thanks.
> > > > >>
> > > > >> ~ Bhupesh
> > > > >>
> > > > >> On Thu, Jul 7, 2016 at 4:07 AM, David Yan <da...@datatorrent.com>
> > > > wrote:
> > > > >>
> > > > >> > Hi Bhupesh,
> > > > >> >
> > > > >> > FYI, there is a JIRA open for a scalable implementation of
> > > > >> WindowedStorage
> > > > >> > and WindowedKeyedStorage:
> > > > >> >
> > > > >> > https://issues.apache.org/jira/browse/APEXMALHAR-2130
> > > > >> >
> > > > >> > We expect either to use ManagedState directly, or Spillable
> > > > structures,
> > > > >> > which in turn uses ManagedState.
> > > > >> >
> > > > >> > I'm not very familiar with the dedup operator. but in order to
> use
> > > the
> > > > >> > WindowedOperator, it sounds to me that we can use SlidingWindows
> > > with
> > > > an
> > > > >> > implementation of WindowedKeyedStorage that uses a Bloom filter
> to
> > > > cover
> > > > >> > most of the false cases.
> > > > >> >
> > > > >> > David
> > > > >> >
> > > > >> > On Mon, Jul 4, 2016 at 4:42 AM, Bhupesh Chawda <
> > bhupesh@apache.org>
> > > > >> wrote:
> > > > >> >
> > > > >> > > Hi All,
> > > > >> > >
> > > > >> > > I have looked into Windowing concepts from Apache Beam and the
> > PR
> > > > >> #319 by
> > > > >> > > David. Looks like there are a lot of advanced concepts which
> > could
> > > > be
> > > > >> > used
> > > > >> > > by operators using event time windowing.
> > > > >> > > Additionally I also looked at the Managed State
> implementation.
> > > > >> > >
> > > > >> > > One of the things I noticed is that there is an overlap of
> > > > >> functionality
> > > > >> > > between Managed State and Windowing Support in terms of the
> > > > following:
> > > > >> > >
> > > > >> > >    - *Discarding / Dropping of tuples* from the system -
> Managed
> > > > State
> > > > >> > uses
> > > > >> > >    the concept of expiry while a Windowed operator uses the
> > > concepts
> > > > >> of
> > > > >> > >    Watermarks and allowed lateness. If I try to reconcile the
> > > above
> > > > >> two,
> > > > >> > it
> > > > >> > >    seems like Managed State (through TimeBucketAssigner) is
> > trying
> > > > to
> > > > >> > >    implement some sort of implicit heuristic Watermarks based
> on
> > > > >> either
> > > > >> > the
> > > > >> > >    user supplied time or the event time.
> > > > >> > >    - *Global Window* support - Once we have an option to
> disable
> > > > >> purging
> > > > >> > in
> > > > >> > >    Managed State, it will have similar semantics to the Global
> > > > Window
> > > > >> > > option
> > > > >> > >    in Windowing support.
> > > > >> > >
> > > > >> > > If I understand correctly, is the suggestion to implement the
> > > Dedup
> > > > >> > > operator as a Windowed operator and to use managed state only
> > as a
> > > > >> > storage
> > > > >> > > medium (through WindowedStorage) ? What could be a better way
> of
> > > > going
> > > > >> > > about this?
> > > > >> > >
> > > > >> > > Thanks.
> > > > >> > >
> > > > >> > > ~ Bhupesh
> > > > >> > >
> > > > >> > > On Wed, Jun 29, 2016 at 10:35 PM, Bhupesh Chawda <
> > > > bhupesh@apache.org>
> > > > >> > > wrote:
> > > > >> > >
> > > > >> > > > Hi Thomas,
> > > > >> > > >
> > > > >> > > > I agree that the case of processing bounded data is a
> special
> > > case
> > > > >> of
> > > > >> > > > unbounded data.
> > > > >> > > > Th difference I was pointing out was in terms of expiry.
> This
> > is
> > > > not
> > > > >> > > > applicable in case of bounded data sets, while unbounded
> data
> > > sets
> > > > >> will
> > > > >> > > > inherently use expiry for limiting the amount of data to be
> > > > stored.
> > > > >> > > >
> > > > >> > > > For idempotency when applying expiry on the streaming data,
> I
> > > need
> > > > >> to
> > > > >> > > > explore more on the using the window timestamp that you
> > proposed
> > > > as
> > > > >> > > opposed
> > > > >> > > > to the system time which I was planning to use.
> > > > >> > > >
> > > > >> > > > Thanks.
> > > > >> > > > ~ Bhupesh
> > > > >> > > >
> > > > >> > > > On Wed, Jun 29, 2016 at 8:39 PM, Thomas Weise <
> > > > >> thomas@datatorrent.com>
> > > > >> > > > wrote:
> > > > >> > > >
> > > > >> > > >> Bhupesh,
> > > > >> > > >>
> > > > >> > > >> Why is there a distinction between bounded and unbounded
> > data?
> > > I
> > > > >> see
> > > > >> > the
> > > > >> > > >> former as a special case of the latter?
> > > > >> > > >>
> > > > >> > > >> When rewinding the stream or reprocessing the stream in
> > another
> > > > run
> > > > >> > the
> > > > >> > > >> operator should produce the same result.
> > > > >> > > >>
> > > > >> > > >> This operator should be idempotent also. That implies that
> > code
> > > > >> does
> > > > >> > not
> > > > >> > > >> rely on current system time but the window timestamp
> instead.
> > > > >> > > >>
> > > > >> > > >> All of this should be accomplished by using the windowing
> > > > support:
> > > > >> > > >> https://github.com/apache/apex-malhar/pull/319
> > > > >> > > >>
> > > > >> > > >> Thanks,
> > > > >> > > >> Thomas
> > > > >> > > >>
> > > > >> > > >>
> > > > >> > > >>
> > > > >> > > >>
> > > > >> > > >>
> > > > >> > > >>
> > > > >> > > >> On Wed, Jun 29, 2016 at 4:32 AM, Bhupesh Chawda <
> > > > >> > > bhupesh@datatorrent.com>
> > > > >> > > >> wrote:
> > > > >> > > >>
> > > > >> > > >> > Hi All,
> > > > >> > > >> >
> > > > >> > > >> > I want to validate the use cases for de-duplication that
> > will
> > > > be
> > > > >> > going
> > > > >> > > >> as
> > > > >> > > >> > part of this implementation.
> > > > >> > > >> >
> > > > >> > > >> >    - *Bounded data set*
> > > > >> > > >> >       - This is de-duplication for bounded data. For
> > example,
> > > > >> data
> > > > >> > > sets
> > > > >> > > >> >       which are old or fixed or which may not have a time
> > > field
> > > > >> at
> > > > >> > > >> > all. Example:
> > > > >> > > >> >       Last year's transaction records or Customer data
> etc.
> > > > >> > > >> >       - Concept of expiry is not needed as this is
> bounded
> > > data
> > > > >> set.
> > > > >> > > >> >       - *Unbounded data set*
> > > > >> > > >> >       - This is de-duplication of online streaming data
> > > > >> > > >> >       - Expiry is needed because here incoming tuples may
> > > > arrive
> > > > >> > later
> > > > >> > > >> than
> > > > >> > > >> >       what they are expected. Expiry is always computed
> by
> > > > taking
> > > > >> > the
> > > > >> > > >> > difference
> > > > >> > > >> >       in System time and the Event time.
> > > > >> > > >> >
> > > > >> > > >> > Any feedback is appreciated.
> > > > >> > > >> >
> > > > >> > > >> > Thanks.
> > > > >> > > >> >
> > > > >> > > >> > ~ Bhupesh
> > > > >> > > >> >
> > > > >> > > >> > On Mon, Jun 27, 2016 at 11:34 AM, Bhupesh Chawda <
> > > > >> > > >> bhupesh@datatorrent.com>
> > > > >> > > >> > wrote:
> > > > >> > > >> >
> > > > >> > > >> > > Hi All,
> > > > >> > > >> > >
> > > > >> > > >> > > I am working on adding a De-duplication operator in
> > Malhar
> > > > >> library
> > > > >> > > >> based
> > > > >> > > >> > > on managed state APIs. I will be working off the
> already
> > > > >> created
> > > > >> > > JIRA
> > > > >> > > >> -
> > > > >> > > >> > > https://issues.apache.org/jira/browse/APEXMALHAR-1701
> > and
> > > > the
> > > > >> > > initial
> > > > >> > > >> > > pull request for an AbstractDeduper here:
> > > > >> > > >> > > https://github.com/apache/apex-malhar/pull/260/files
> > > > >> > > >> > >
> > > > >> > > >> > > I am planning to include the following features in the
> > > first
> > > > >> > > version:
> > > > >> > > >> > > 1. Time based de-duplication. Assumption: Tuple_Key ->
> > > > >> Tuple_Time
> > > > >> > > >> > > correlation holds.
> > > > >> > > >> > > 2. Option to maintain order of incoming tuples.
> > > > >> > > >> > > 3. Duplicate and Expired ports to emit duplicate and
> > > expired
> > > > >> > tuples
> > > > >> > > >> > > respectively.
> > > > >> > > >> > >
> > > > >> > > >> > > Thanks.
> > > > >> > > >> > >
> > > > >> > > >> > > ~ Bhupesh
> > > > >> > > >> > >
> > > > >> > > >> >
> > > > >> > > >>
> > > > >> > > >
> > > > >> > > >
> > > > >> > >
> > > > >> >
> > > > >>
> > > > >
> > > > >
> > > >
> > >
> >
>

Re: APEXMALHAR-1701 Deduper in Malhar

Posted by Bhupesh Chawda <bh...@apache.org>.
Hi All,

I also implemented a De-duplication operator using Windowed Operator. Now
we have two implementations, one with Managed state and another using
Windowed operator. Here are their details:

   1. *With Managed State - *
   - The operator is implemented using managed state as the storage for
      buckets into which the tuples will be stored.
      - *TimeBucketAssigner* is used to assign an incoming tuple to
      different buckets based on the event time. It is also used to identify
      whether a particular tuple is expired and should be sent to the expired
      port / dropped.
      - For managed state, the *ManagedTimeUnifiedStateImpl* implementation
      is used which just requires the user to specify the event time
and a bucket
      is automatically assigned based on that. The structure of the bucket data
      on storage is as follows: /operator_id /time_bucket
      - An advantage of using Managed State approach is that we don't have
      to assume the correlation of event time to the de-duplication key of the
      tuple. For example, if we get two tuples like: (K1, T1), and (K1, T2), we
      can still use ManagedStateImpl and conclude that these tuples are
      duplicates based on the Key K1.
      2. *With Windowed Operator - *
   - The operator uses the WindowedOperatorImpl as the base operator.
      - Accumulation, for the deduper, basically amounts to storing a list
      of tuples in the data storage. Every time we get a unique tuple, we
      *accumulate* it in the list.
      - Event windows are modeled using the *TimeWindow* option. Although
      SlidingTimeWIndows seems to be intuitive for data buckets, it seems to be
      the costly option as the accumulation in this case is not just
an aggregate
      value but a list of values in that bucket.
      - Watermarks are not assumed to be sent from an input operator
      (although it is okay if an upstream operator sends them). The
      *fixedWatermark* feature is used to assume watermarks which are
      relative to the window time.
      - One of the issues I found with using WindowedOperator for Dedup is
      that event time is tightly coupled with the de-duplication key. In the
      above example, (K1, T1), and (K1, T2) *might* be concluded as two
      unique tuples since T1 and T2 may fall into two different time buckets.

Here are the PRs for both of them.

   - Using Managed State: https://github.com/apache/apex-malhar/pull/335
   - Using Windowed Operator: https://github.com/apache/apex-malhar/pull/343

Please review them and suggest on the correct approach for the final
implementation which should be used to add other features like fault
tolerance, scalability, optimizations etc.
Thanks.

~ Bhupesh

On Fri, Jul 8, 2016 at 11:30 PM, David Yan <da...@datatorrent.com> wrote:

> No problem.
>
> By the way, I changed the method name to setFixedWatermark. And also, if
> you want to drop any tuples that are considered late, you need to set the
> allowed lateness to be 0.
>
> David
>
> On Fri, Jul 8, 2016 at 4:55 AM, Bhupesh Chawda <bh...@apache.org> wrote:
>
> > Thanks David.
> > I'll try to create an implementation for Deduper which uses
> > WindowedOperator. Will open a PR soon for review.
> >
> > ~ Bhupesh
> >
> > On Fri, Jul 8, 2016 at 2:23 AM, David Yan <da...@datatorrent.com> wrote:
> >
> > > Hi Bhupesh,
> > >
> > > I just added the method setFixedLateness(long millis) to
> > > AbstractWindowedOperator in my PR. This will allow you to specify the
> > > lateness with respect to the timestamp from the window ID without
> > watermark
> > > tuples from upstream.
> > >
> > > David
> > >
> > > On Thu, Jul 7, 2016 at 11:49 AM, David Yan <da...@datatorrent.com>
> > wrote:
> > >
> > > > Hi Bhupesh,
> > > >
> > > > Yes, the windowed operator currently depends on the watermark tuples
> > > > upstream for any "lateness" related operation. If there is no
> > watermark,
> > > > nothing will be considered late. We can add support for lateness
> > handling
> > > > without incoming watermark tuples. Let me add that to the pull
> request.
> > > >
> > > > David
> > > >
> > > >
> > > > On Wed, Jul 6, 2016 at 10:48 PM, Bhupesh Chawda <bh...@apache.org>
> > > > wrote:
> > > >
> > > >> Hi David,
> > > >>
> > > >> Thanks for your reply.
> > > >>
> > > >> If I am to use a windowed operator for the Dedup operator, there
> > should
> > > be
> > > >> some operator (upstream to Deduper) which sends the watermark
> tuples.
> > > >> These
> > > >> tuples (along with allowed lateness), will be the ones deciding
> which
> > > >> incoming tuples are too late and will be dropped. I have the
> following
> > > >> questions:
> > > >>
> > > >> Is a windowed operator (which needs watermarks) dependent upon some
> > > other
> > > >> operator for these tuples? What happens when there are no watermark
> > > tuples
> > > >> sent from upstream?
> > > >>
> > > >> Can a windowed operator "*assume*" the watermark tuples based on
> some
> > > >> notion of time? For example, can the Deduper, use the streaming
> window
> > > >> time
> > > >> as the reference to advance the watermark?
> > > >>
> > > >> Thanks.
> > > >>
> > > >> ~ Bhupesh
> > > >>
> > > >> On Thu, Jul 7, 2016 at 4:07 AM, David Yan <da...@datatorrent.com>
> > > wrote:
> > > >>
> > > >> > Hi Bhupesh,
> > > >> >
> > > >> > FYI, there is a JIRA open for a scalable implementation of
> > > >> WindowedStorage
> > > >> > and WindowedKeyedStorage:
> > > >> >
> > > >> > https://issues.apache.org/jira/browse/APEXMALHAR-2130
> > > >> >
> > > >> > We expect either to use ManagedState directly, or Spillable
> > > structures,
> > > >> > which in turn uses ManagedState.
> > > >> >
> > > >> > I'm not very familiar with the dedup operator. but in order to use
> > the
> > > >> > WindowedOperator, it sounds to me that we can use SlidingWindows
> > with
> > > an
> > > >> > implementation of WindowedKeyedStorage that uses a Bloom filter to
> > > cover
> > > >> > most of the false cases.
> > > >> >
> > > >> > David
> > > >> >
> > > >> > On Mon, Jul 4, 2016 at 4:42 AM, Bhupesh Chawda <
> bhupesh@apache.org>
> > > >> wrote:
> > > >> >
> > > >> > > Hi All,
> > > >> > >
> > > >> > > I have looked into Windowing concepts from Apache Beam and the
> PR
> > > >> #319 by
> > > >> > > David. Looks like there are a lot of advanced concepts which
> could
> > > be
> > > >> > used
> > > >> > > by operators using event time windowing.
> > > >> > > Additionally I also looked at the Managed State implementation.
> > > >> > >
> > > >> > > One of the things I noticed is that there is an overlap of
> > > >> functionality
> > > >> > > between Managed State and Windowing Support in terms of the
> > > following:
> > > >> > >
> > > >> > >    - *Discarding / Dropping of tuples* from the system - Managed
> > > State
> > > >> > uses
> > > >> > >    the concept of expiry while a Windowed operator uses the
> > concepts
> > > >> of
> > > >> > >    Watermarks and allowed lateness. If I try to reconcile the
> > above
> > > >> two,
> > > >> > it
> > > >> > >    seems like Managed State (through TimeBucketAssigner) is
> trying
> > > to
> > > >> > >    implement some sort of implicit heuristic Watermarks based on
> > > >> either
> > > >> > the
> > > >> > >    user supplied time or the event time.
> > > >> > >    - *Global Window* support - Once we have an option to disable
> > > >> purging
> > > >> > in
> > > >> > >    Managed State, it will have similar semantics to the Global
> > > Window
> > > >> > > option
> > > >> > >    in Windowing support.
> > > >> > >
> > > >> > > If I understand correctly, is the suggestion to implement the
> > Dedup
> > > >> > > operator as a Windowed operator and to use managed state only
> as a
> > > >> > storage
> > > >> > > medium (through WindowedStorage) ? What could be a better way of
> > > going
> > > >> > > about this?
> > > >> > >
> > > >> > > Thanks.
> > > >> > >
> > > >> > > ~ Bhupesh
> > > >> > >
> > > >> > > On Wed, Jun 29, 2016 at 10:35 PM, Bhupesh Chawda <
> > > bhupesh@apache.org>
> > > >> > > wrote:
> > > >> > >
> > > >> > > > Hi Thomas,
> > > >> > > >
> > > >> > > > I agree that the case of processing bounded data is a special
> > case
> > > >> of
> > > >> > > > unbounded data.
> > > >> > > > Th difference I was pointing out was in terms of expiry. This
> is
> > > not
> > > >> > > > applicable in case of bounded data sets, while unbounded data
> > sets
> > > >> will
> > > >> > > > inherently use expiry for limiting the amount of data to be
> > > stored.
> > > >> > > >
> > > >> > > > For idempotency when applying expiry on the streaming data, I
> > need
> > > >> to
> > > >> > > > explore more on the using the window timestamp that you
> proposed
> > > as
> > > >> > > opposed
> > > >> > > > to the system time which I was planning to use.
> > > >> > > >
> > > >> > > > Thanks.
> > > >> > > > ~ Bhupesh
> > > >> > > >
> > > >> > > > On Wed, Jun 29, 2016 at 8:39 PM, Thomas Weise <
> > > >> thomas@datatorrent.com>
> > > >> > > > wrote:
> > > >> > > >
> > > >> > > >> Bhupesh,
> > > >> > > >>
> > > >> > > >> Why is there a distinction between bounded and unbounded
> data?
> > I
> > > >> see
> > > >> > the
> > > >> > > >> former as a special case of the latter?
> > > >> > > >>
> > > >> > > >> When rewinding the stream or reprocessing the stream in
> another
> > > run
> > > >> > the
> > > >> > > >> operator should produce the same result.
> > > >> > > >>
> > > >> > > >> This operator should be idempotent also. That implies that
> code
> > > >> does
> > > >> > not
> > > >> > > >> rely on current system time but the window timestamp instead.
> > > >> > > >>
> > > >> > > >> All of this should be accomplished by using the windowing
> > > support:
> > > >> > > >> https://github.com/apache/apex-malhar/pull/319
> > > >> > > >>
> > > >> > > >> Thanks,
> > > >> > > >> Thomas
> > > >> > > >>
> > > >> > > >>
> > > >> > > >>
> > > >> > > >>
> > > >> > > >>
> > > >> > > >>
> > > >> > > >> On Wed, Jun 29, 2016 at 4:32 AM, Bhupesh Chawda <
> > > >> > > bhupesh@datatorrent.com>
> > > >> > > >> wrote:
> > > >> > > >>
> > > >> > > >> > Hi All,
> > > >> > > >> >
> > > >> > > >> > I want to validate the use cases for de-duplication that
> will
> > > be
> > > >> > going
> > > >> > > >> as
> > > >> > > >> > part of this implementation.
> > > >> > > >> >
> > > >> > > >> >    - *Bounded data set*
> > > >> > > >> >       - This is de-duplication for bounded data. For
> example,
> > > >> data
> > > >> > > sets
> > > >> > > >> >       which are old or fixed or which may not have a time
> > field
> > > >> at
> > > >> > > >> > all. Example:
> > > >> > > >> >       Last year's transaction records or Customer data etc.
> > > >> > > >> >       - Concept of expiry is not needed as this is bounded
> > data
> > > >> set.
> > > >> > > >> >       - *Unbounded data set*
> > > >> > > >> >       - This is de-duplication of online streaming data
> > > >> > > >> >       - Expiry is needed because here incoming tuples may
> > > arrive
> > > >> > later
> > > >> > > >> than
> > > >> > > >> >       what they are expected. Expiry is always computed by
> > > taking
> > > >> > the
> > > >> > > >> > difference
> > > >> > > >> >       in System time and the Event time.
> > > >> > > >> >
> > > >> > > >> > Any feedback is appreciated.
> > > >> > > >> >
> > > >> > > >> > Thanks.
> > > >> > > >> >
> > > >> > > >> > ~ Bhupesh
> > > >> > > >> >
> > > >> > > >> > On Mon, Jun 27, 2016 at 11:34 AM, Bhupesh Chawda <
> > > >> > > >> bhupesh@datatorrent.com>
> > > >> > > >> > wrote:
> > > >> > > >> >
> > > >> > > >> > > Hi All,
> > > >> > > >> > >
> > > >> > > >> > > I am working on adding a De-duplication operator in
> Malhar
> > > >> library
> > > >> > > >> based
> > > >> > > >> > > on managed state APIs. I will be working off the already
> > > >> created
> > > >> > > JIRA
> > > >> > > >> -
> > > >> > > >> > > https://issues.apache.org/jira/browse/APEXMALHAR-1701
> and
> > > the
> > > >> > > initial
> > > >> > > >> > > pull request for an AbstractDeduper here:
> > > >> > > >> > > https://github.com/apache/apex-malhar/pull/260/files
> > > >> > > >> > >
> > > >> > > >> > > I am planning to include the following features in the
> > first
> > > >> > > version:
> > > >> > > >> > > 1. Time based de-duplication. Assumption: Tuple_Key ->
> > > >> Tuple_Time
> > > >> > > >> > > correlation holds.
> > > >> > > >> > > 2. Option to maintain order of incoming tuples.
> > > >> > > >> > > 3. Duplicate and Expired ports to emit duplicate and
> > expired
> > > >> > tuples
> > > >> > > >> > > respectively.
> > > >> > > >> > >
> > > >> > > >> > > Thanks.
> > > >> > > >> > >
> > > >> > > >> > > ~ Bhupesh
> > > >> > > >> > >
> > > >> > > >> >
> > > >> > > >>
> > > >> > > >
> > > >> > > >
> > > >> > >
> > > >> >
> > > >>
> > > >
> > > >
> > >
> >
>

Re: APEXMALHAR-1701 Deduper in Malhar

Posted by David Yan <da...@datatorrent.com>.
No problem.

By the way, I changed the method name to setFixedWatermark. And also, if
you want to drop any tuples that are considered late, you need to set the
allowed lateness to be 0.

David

On Fri, Jul 8, 2016 at 4:55 AM, Bhupesh Chawda <bh...@apache.org> wrote:

> Thanks David.
> I'll try to create an implementation for Deduper which uses
> WindowedOperator. Will open a PR soon for review.
>
> ~ Bhupesh
>
> On Fri, Jul 8, 2016 at 2:23 AM, David Yan <da...@datatorrent.com> wrote:
>
> > Hi Bhupesh,
> >
> > I just added the method setFixedLateness(long millis) to
> > AbstractWindowedOperator in my PR. This will allow you to specify the
> > lateness with respect to the timestamp from the window ID without
> watermark
> > tuples from upstream.
> >
> > David
> >
> > On Thu, Jul 7, 2016 at 11:49 AM, David Yan <da...@datatorrent.com>
> wrote:
> >
> > > Hi Bhupesh,
> > >
> > > Yes, the windowed operator currently depends on the watermark tuples
> > > upstream for any "lateness" related operation. If there is no
> watermark,
> > > nothing will be considered late. We can add support for lateness
> handling
> > > without incoming watermark tuples. Let me add that to the pull request.
> > >
> > > David
> > >
> > >
> > > On Wed, Jul 6, 2016 at 10:48 PM, Bhupesh Chawda <bh...@apache.org>
> > > wrote:
> > >
> > >> Hi David,
> > >>
> > >> Thanks for your reply.
> > >>
> > >> If I am to use a windowed operator for the Dedup operator, there
> should
> > be
> > >> some operator (upstream to Deduper) which sends the watermark tuples.
> > >> These
> > >> tuples (along with allowed lateness), will be the ones deciding which
> > >> incoming tuples are too late and will be dropped. I have the following
> > >> questions:
> > >>
> > >> Is a windowed operator (which needs watermarks) dependent upon some
> > other
> > >> operator for these tuples? What happens when there are no watermark
> > tuples
> > >> sent from upstream?
> > >>
> > >> Can a windowed operator "*assume*" the watermark tuples based on some
> > >> notion of time? For example, can the Deduper, use the streaming window
> > >> time
> > >> as the reference to advance the watermark?
> > >>
> > >> Thanks.
> > >>
> > >> ~ Bhupesh
> > >>
> > >> On Thu, Jul 7, 2016 at 4:07 AM, David Yan <da...@datatorrent.com>
> > wrote:
> > >>
> > >> > Hi Bhupesh,
> > >> >
> > >> > FYI, there is a JIRA open for a scalable implementation of
> > >> WindowedStorage
> > >> > and WindowedKeyedStorage:
> > >> >
> > >> > https://issues.apache.org/jira/browse/APEXMALHAR-2130
> > >> >
> > >> > We expect either to use ManagedState directly, or Spillable
> > structures,
> > >> > which in turn uses ManagedState.
> > >> >
> > >> > I'm not very familiar with the dedup operator. but in order to use
> the
> > >> > WindowedOperator, it sounds to me that we can use SlidingWindows
> with
> > an
> > >> > implementation of WindowedKeyedStorage that uses a Bloom filter to
> > cover
> > >> > most of the false cases.
> > >> >
> > >> > David
> > >> >
> > >> > On Mon, Jul 4, 2016 at 4:42 AM, Bhupesh Chawda <bh...@apache.org>
> > >> wrote:
> > >> >
> > >> > > Hi All,
> > >> > >
> > >> > > I have looked into Windowing concepts from Apache Beam and the PR
> > >> #319 by
> > >> > > David. Looks like there are a lot of advanced concepts which could
> > be
> > >> > used
> > >> > > by operators using event time windowing.
> > >> > > Additionally I also looked at the Managed State implementation.
> > >> > >
> > >> > > One of the things I noticed is that there is an overlap of
> > >> functionality
> > >> > > between Managed State and Windowing Support in terms of the
> > following:
> > >> > >
> > >> > >    - *Discarding / Dropping of tuples* from the system - Managed
> > State
> > >> > uses
> > >> > >    the concept of expiry while a Windowed operator uses the
> concepts
> > >> of
> > >> > >    Watermarks and allowed lateness. If I try to reconcile the
> above
> > >> two,
> > >> > it
> > >> > >    seems like Managed State (through TimeBucketAssigner) is trying
> > to
> > >> > >    implement some sort of implicit heuristic Watermarks based on
> > >> either
> > >> > the
> > >> > >    user supplied time or the event time.
> > >> > >    - *Global Window* support - Once we have an option to disable
> > >> purging
> > >> > in
> > >> > >    Managed State, it will have similar semantics to the Global
> > Window
> > >> > > option
> > >> > >    in Windowing support.
> > >> > >
> > >> > > If I understand correctly, is the suggestion to implement the
> Dedup
> > >> > > operator as a Windowed operator and to use managed state only as a
> > >> > storage
> > >> > > medium (through WindowedStorage) ? What could be a better way of
> > going
> > >> > > about this?
> > >> > >
> > >> > > Thanks.
> > >> > >
> > >> > > ~ Bhupesh
> > >> > >
> > >> > > On Wed, Jun 29, 2016 at 10:35 PM, Bhupesh Chawda <
> > bhupesh@apache.org>
> > >> > > wrote:
> > >> > >
> > >> > > > Hi Thomas,
> > >> > > >
> > >> > > > I agree that the case of processing bounded data is a special
> case
> > >> of
> > >> > > > unbounded data.
> > >> > > > Th difference I was pointing out was in terms of expiry. This is
> > not
> > >> > > > applicable in case of bounded data sets, while unbounded data
> sets
> > >> will
> > >> > > > inherently use expiry for limiting the amount of data to be
> > stored.
> > >> > > >
> > >> > > > For idempotency when applying expiry on the streaming data, I
> need
> > >> to
> > >> > > > explore more on the using the window timestamp that you proposed
> > as
> > >> > > opposed
> > >> > > > to the system time which I was planning to use.
> > >> > > >
> > >> > > > Thanks.
> > >> > > > ~ Bhupesh
> > >> > > >
> > >> > > > On Wed, Jun 29, 2016 at 8:39 PM, Thomas Weise <
> > >> thomas@datatorrent.com>
> > >> > > > wrote:
> > >> > > >
> > >> > > >> Bhupesh,
> > >> > > >>
> > >> > > >> Why is there a distinction between bounded and unbounded data?
> I
> > >> see
> > >> > the
> > >> > > >> former as a special case of the latter?
> > >> > > >>
> > >> > > >> When rewinding the stream or reprocessing the stream in another
> > run
> > >> > the
> > >> > > >> operator should produce the same result.
> > >> > > >>
> > >> > > >> This operator should be idempotent also. That implies that code
> > >> does
> > >> > not
> > >> > > >> rely on current system time but the window timestamp instead.
> > >> > > >>
> > >> > > >> All of this should be accomplished by using the windowing
> > support:
> > >> > > >> https://github.com/apache/apex-malhar/pull/319
> > >> > > >>
> > >> > > >> Thanks,
> > >> > > >> Thomas
> > >> > > >>
> > >> > > >>
> > >> > > >>
> > >> > > >>
> > >> > > >>
> > >> > > >>
> > >> > > >> On Wed, Jun 29, 2016 at 4:32 AM, Bhupesh Chawda <
> > >> > > bhupesh@datatorrent.com>
> > >> > > >> wrote:
> > >> > > >>
> > >> > > >> > Hi All,
> > >> > > >> >
> > >> > > >> > I want to validate the use cases for de-duplication that will
> > be
> > >> > going
> > >> > > >> as
> > >> > > >> > part of this implementation.
> > >> > > >> >
> > >> > > >> >    - *Bounded data set*
> > >> > > >> >       - This is de-duplication for bounded data. For example,
> > >> data
> > >> > > sets
> > >> > > >> >       which are old or fixed or which may not have a time
> field
> > >> at
> > >> > > >> > all. Example:
> > >> > > >> >       Last year's transaction records or Customer data etc.
> > >> > > >> >       - Concept of expiry is not needed as this is bounded
> data
> > >> set.
> > >> > > >> >       - *Unbounded data set*
> > >> > > >> >       - This is de-duplication of online streaming data
> > >> > > >> >       - Expiry is needed because here incoming tuples may
> > arrive
> > >> > later
> > >> > > >> than
> > >> > > >> >       what they are expected. Expiry is always computed by
> > taking
> > >> > the
> > >> > > >> > difference
> > >> > > >> >       in System time and the Event time.
> > >> > > >> >
> > >> > > >> > Any feedback is appreciated.
> > >> > > >> >
> > >> > > >> > Thanks.
> > >> > > >> >
> > >> > > >> > ~ Bhupesh
> > >> > > >> >
> > >> > > >> > On Mon, Jun 27, 2016 at 11:34 AM, Bhupesh Chawda <
> > >> > > >> bhupesh@datatorrent.com>
> > >> > > >> > wrote:
> > >> > > >> >
> > >> > > >> > > Hi All,
> > >> > > >> > >
> > >> > > >> > > I am working on adding a De-duplication operator in Malhar
> > >> library
> > >> > > >> based
> > >> > > >> > > on managed state APIs. I will be working off the already
> > >> created
> > >> > > JIRA
> > >> > > >> -
> > >> > > >> > > https://issues.apache.org/jira/browse/APEXMALHAR-1701 and
> > the
> > >> > > initial
> > >> > > >> > > pull request for an AbstractDeduper here:
> > >> > > >> > > https://github.com/apache/apex-malhar/pull/260/files
> > >> > > >> > >
> > >> > > >> > > I am planning to include the following features in the
> first
> > >> > > version:
> > >> > > >> > > 1. Time based de-duplication. Assumption: Tuple_Key ->
> > >> Tuple_Time
> > >> > > >> > > correlation holds.
> > >> > > >> > > 2. Option to maintain order of incoming tuples.
> > >> > > >> > > 3. Duplicate and Expired ports to emit duplicate and
> expired
> > >> > tuples
> > >> > > >> > > respectively.
> > >> > > >> > >
> > >> > > >> > > Thanks.
> > >> > > >> > >
> > >> > > >> > > ~ Bhupesh
> > >> > > >> > >
> > >> > > >> >
> > >> > > >>
> > >> > > >
> > >> > > >
> > >> > >
> > >> >
> > >>
> > >
> > >
> >
>

Re: APEXMALHAR-1701 Deduper in Malhar

Posted by Bhupesh Chawda <bh...@apache.org>.
Thanks David.
I'll try to create an implementation for Deduper which uses
WindowedOperator. Will open a PR soon for review.

~ Bhupesh

On Fri, Jul 8, 2016 at 2:23 AM, David Yan <da...@datatorrent.com> wrote:

> Hi Bhupesh,
>
> I just added the method setFixedLateness(long millis) to
> AbstractWindowedOperator in my PR. This will allow you to specify the
> lateness with respect to the timestamp from the window ID without watermark
> tuples from upstream.
>
> David
>
> On Thu, Jul 7, 2016 at 11:49 AM, David Yan <da...@datatorrent.com> wrote:
>
> > Hi Bhupesh,
> >
> > Yes, the windowed operator currently depends on the watermark tuples
> > upstream for any "lateness" related operation. If there is no watermark,
> > nothing will be considered late. We can add support for lateness handling
> > without incoming watermark tuples. Let me add that to the pull request.
> >
> > David
> >
> >
> > On Wed, Jul 6, 2016 at 10:48 PM, Bhupesh Chawda <bh...@apache.org>
> > wrote:
> >
> >> Hi David,
> >>
> >> Thanks for your reply.
> >>
> >> If I am to use a windowed operator for the Dedup operator, there should
> be
> >> some operator (upstream to Deduper) which sends the watermark tuples.
> >> These
> >> tuples (along with allowed lateness), will be the ones deciding which
> >> incoming tuples are too late and will be dropped. I have the following
> >> questions:
> >>
> >> Is a windowed operator (which needs watermarks) dependent upon some
> other
> >> operator for these tuples? What happens when there are no watermark
> tuples
> >> sent from upstream?
> >>
> >> Can a windowed operator "*assume*" the watermark tuples based on some
> >> notion of time? For example, can the Deduper, use the streaming window
> >> time
> >> as the reference to advance the watermark?
> >>
> >> Thanks.
> >>
> >> ~ Bhupesh
> >>
> >> On Thu, Jul 7, 2016 at 4:07 AM, David Yan <da...@datatorrent.com>
> wrote:
> >>
> >> > Hi Bhupesh,
> >> >
> >> > FYI, there is a JIRA open for a scalable implementation of
> >> WindowedStorage
> >> > and WindowedKeyedStorage:
> >> >
> >> > https://issues.apache.org/jira/browse/APEXMALHAR-2130
> >> >
> >> > We expect either to use ManagedState directly, or Spillable
> structures,
> >> > which in turn uses ManagedState.
> >> >
> >> > I'm not very familiar with the dedup operator. but in order to use the
> >> > WindowedOperator, it sounds to me that we can use SlidingWindows with
> an
> >> > implementation of WindowedKeyedStorage that uses a Bloom filter to
> cover
> >> > most of the false cases.
> >> >
> >> > David
> >> >
> >> > On Mon, Jul 4, 2016 at 4:42 AM, Bhupesh Chawda <bh...@apache.org>
> >> wrote:
> >> >
> >> > > Hi All,
> >> > >
> >> > > I have looked into Windowing concepts from Apache Beam and the PR
> >> #319 by
> >> > > David. Looks like there are a lot of advanced concepts which could
> be
> >> > used
> >> > > by operators using event time windowing.
> >> > > Additionally I also looked at the Managed State implementation.
> >> > >
> >> > > One of the things I noticed is that there is an overlap of
> >> functionality
> >> > > between Managed State and Windowing Support in terms of the
> following:
> >> > >
> >> > >    - *Discarding / Dropping of tuples* from the system - Managed
> State
> >> > uses
> >> > >    the concept of expiry while a Windowed operator uses the concepts
> >> of
> >> > >    Watermarks and allowed lateness. If I try to reconcile the above
> >> two,
> >> > it
> >> > >    seems like Managed State (through TimeBucketAssigner) is trying
> to
> >> > >    implement some sort of implicit heuristic Watermarks based on
> >> either
> >> > the
> >> > >    user supplied time or the event time.
> >> > >    - *Global Window* support - Once we have an option to disable
> >> purging
> >> > in
> >> > >    Managed State, it will have similar semantics to the Global
> Window
> >> > > option
> >> > >    in Windowing support.
> >> > >
> >> > > If I understand correctly, is the suggestion to implement the Dedup
> >> > > operator as a Windowed operator and to use managed state only as a
> >> > storage
> >> > > medium (through WindowedStorage) ? What could be a better way of
> going
> >> > > about this?
> >> > >
> >> > > Thanks.
> >> > >
> >> > > ~ Bhupesh
> >> > >
> >> > > On Wed, Jun 29, 2016 at 10:35 PM, Bhupesh Chawda <
> bhupesh@apache.org>
> >> > > wrote:
> >> > >
> >> > > > Hi Thomas,
> >> > > >
> >> > > > I agree that the case of processing bounded data is a special case
> >> of
> >> > > > unbounded data.
> >> > > > Th difference I was pointing out was in terms of expiry. This is
> not
> >> > > > applicable in case of bounded data sets, while unbounded data sets
> >> will
> >> > > > inherently use expiry for limiting the amount of data to be
> stored.
> >> > > >
> >> > > > For idempotency when applying expiry on the streaming data, I need
> >> to
> >> > > > explore more on the using the window timestamp that you proposed
> as
> >> > > opposed
> >> > > > to the system time which I was planning to use.
> >> > > >
> >> > > > Thanks.
> >> > > > ~ Bhupesh
> >> > > >
> >> > > > On Wed, Jun 29, 2016 at 8:39 PM, Thomas Weise <
> >> thomas@datatorrent.com>
> >> > > > wrote:
> >> > > >
> >> > > >> Bhupesh,
> >> > > >>
> >> > > >> Why is there a distinction between bounded and unbounded data? I
> >> see
> >> > the
> >> > > >> former as a special case of the latter?
> >> > > >>
> >> > > >> When rewinding the stream or reprocessing the stream in another
> run
> >> > the
> >> > > >> operator should produce the same result.
> >> > > >>
> >> > > >> This operator should be idempotent also. That implies that code
> >> does
> >> > not
> >> > > >> rely on current system time but the window timestamp instead.
> >> > > >>
> >> > > >> All of this should be accomplished by using the windowing
> support:
> >> > > >> https://github.com/apache/apex-malhar/pull/319
> >> > > >>
> >> > > >> Thanks,
> >> > > >> Thomas
> >> > > >>
> >> > > >>
> >> > > >>
> >> > > >>
> >> > > >>
> >> > > >>
> >> > > >> On Wed, Jun 29, 2016 at 4:32 AM, Bhupesh Chawda <
> >> > > bhupesh@datatorrent.com>
> >> > > >> wrote:
> >> > > >>
> >> > > >> > Hi All,
> >> > > >> >
> >> > > >> > I want to validate the use cases for de-duplication that will
> be
> >> > going
> >> > > >> as
> >> > > >> > part of this implementation.
> >> > > >> >
> >> > > >> >    - *Bounded data set*
> >> > > >> >       - This is de-duplication for bounded data. For example,
> >> data
> >> > > sets
> >> > > >> >       which are old or fixed or which may not have a time field
> >> at
> >> > > >> > all. Example:
> >> > > >> >       Last year's transaction records or Customer data etc.
> >> > > >> >       - Concept of expiry is not needed as this is bounded data
> >> set.
> >> > > >> >       - *Unbounded data set*
> >> > > >> >       - This is de-duplication of online streaming data
> >> > > >> >       - Expiry is needed because here incoming tuples may
> arrive
> >> > later
> >> > > >> than
> >> > > >> >       what they are expected. Expiry is always computed by
> taking
> >> > the
> >> > > >> > difference
> >> > > >> >       in System time and the Event time.
> >> > > >> >
> >> > > >> > Any feedback is appreciated.
> >> > > >> >
> >> > > >> > Thanks.
> >> > > >> >
> >> > > >> > ~ Bhupesh
> >> > > >> >
> >> > > >> > On Mon, Jun 27, 2016 at 11:34 AM, Bhupesh Chawda <
> >> > > >> bhupesh@datatorrent.com>
> >> > > >> > wrote:
> >> > > >> >
> >> > > >> > > Hi All,
> >> > > >> > >
> >> > > >> > > I am working on adding a De-duplication operator in Malhar
> >> library
> >> > > >> based
> >> > > >> > > on managed state APIs. I will be working off the already
> >> created
> >> > > JIRA
> >> > > >> -
> >> > > >> > > https://issues.apache.org/jira/browse/APEXMALHAR-1701 and
> the
> >> > > initial
> >> > > >> > > pull request for an AbstractDeduper here:
> >> > > >> > > https://github.com/apache/apex-malhar/pull/260/files
> >> > > >> > >
> >> > > >> > > I am planning to include the following features in the first
> >> > > version:
> >> > > >> > > 1. Time based de-duplication. Assumption: Tuple_Key ->
> >> Tuple_Time
> >> > > >> > > correlation holds.
> >> > > >> > > 2. Option to maintain order of incoming tuples.
> >> > > >> > > 3. Duplicate and Expired ports to emit duplicate and expired
> >> > tuples
> >> > > >> > > respectively.
> >> > > >> > >
> >> > > >> > > Thanks.
> >> > > >> > >
> >> > > >> > > ~ Bhupesh
> >> > > >> > >
> >> > > >> >
> >> > > >>
> >> > > >
> >> > > >
> >> > >
> >> >
> >>
> >
> >
>

Re: APEXMALHAR-1701 Deduper in Malhar

Posted by David Yan <da...@datatorrent.com>.
Hi Bhupesh,

I just added the method setFixedLateness(long millis) to
AbstractWindowedOperator in my PR. This will allow you to specify the
lateness with respect to the timestamp from the window ID without watermark
tuples from upstream.

David

On Thu, Jul 7, 2016 at 11:49 AM, David Yan <da...@datatorrent.com> wrote:

> Hi Bhupesh,
>
> Yes, the windowed operator currently depends on the watermark tuples
> upstream for any "lateness" related operation. If there is no watermark,
> nothing will be considered late. We can add support for lateness handling
> without incoming watermark tuples. Let me add that to the pull request.
>
> David
>
>
> On Wed, Jul 6, 2016 at 10:48 PM, Bhupesh Chawda <bh...@apache.org>
> wrote:
>
>> Hi David,
>>
>> Thanks for your reply.
>>
>> If I am to use a windowed operator for the Dedup operator, there should be
>> some operator (upstream to Deduper) which sends the watermark tuples.
>> These
>> tuples (along with allowed lateness), will be the ones deciding which
>> incoming tuples are too late and will be dropped. I have the following
>> questions:
>>
>> Is a windowed operator (which needs watermarks) dependent upon some other
>> operator for these tuples? What happens when there are no watermark tuples
>> sent from upstream?
>>
>> Can a windowed operator "*assume*" the watermark tuples based on some
>> notion of time? For example, can the Deduper, use the streaming window
>> time
>> as the reference to advance the watermark?
>>
>> Thanks.
>>
>> ~ Bhupesh
>>
>> On Thu, Jul 7, 2016 at 4:07 AM, David Yan <da...@datatorrent.com> wrote:
>>
>> > Hi Bhupesh,
>> >
>> > FYI, there is a JIRA open for a scalable implementation of
>> WindowedStorage
>> > and WindowedKeyedStorage:
>> >
>> > https://issues.apache.org/jira/browse/APEXMALHAR-2130
>> >
>> > We expect either to use ManagedState directly, or Spillable structures,
>> > which in turn uses ManagedState.
>> >
>> > I'm not very familiar with the dedup operator. but in order to use the
>> > WindowedOperator, it sounds to me that we can use SlidingWindows with an
>> > implementation of WindowedKeyedStorage that uses a Bloom filter to cover
>> > most of the false cases.
>> >
>> > David
>> >
>> > On Mon, Jul 4, 2016 at 4:42 AM, Bhupesh Chawda <bh...@apache.org>
>> wrote:
>> >
>> > > Hi All,
>> > >
>> > > I have looked into Windowing concepts from Apache Beam and the PR
>> #319 by
>> > > David. Looks like there are a lot of advanced concepts which could be
>> > used
>> > > by operators using event time windowing.
>> > > Additionally I also looked at the Managed State implementation.
>> > >
>> > > One of the things I noticed is that there is an overlap of
>> functionality
>> > > between Managed State and Windowing Support in terms of the following:
>> > >
>> > >    - *Discarding / Dropping of tuples* from the system - Managed State
>> > uses
>> > >    the concept of expiry while a Windowed operator uses the concepts
>> of
>> > >    Watermarks and allowed lateness. If I try to reconcile the above
>> two,
>> > it
>> > >    seems like Managed State (through TimeBucketAssigner) is trying to
>> > >    implement some sort of implicit heuristic Watermarks based on
>> either
>> > the
>> > >    user supplied time or the event time.
>> > >    - *Global Window* support - Once we have an option to disable
>> purging
>> > in
>> > >    Managed State, it will have similar semantics to the Global Window
>> > > option
>> > >    in Windowing support.
>> > >
>> > > If I understand correctly, is the suggestion to implement the Dedup
>> > > operator as a Windowed operator and to use managed state only as a
>> > storage
>> > > medium (through WindowedStorage) ? What could be a better way of going
>> > > about this?
>> > >
>> > > Thanks.
>> > >
>> > > ~ Bhupesh
>> > >
>> > > On Wed, Jun 29, 2016 at 10:35 PM, Bhupesh Chawda <bh...@apache.org>
>> > > wrote:
>> > >
>> > > > Hi Thomas,
>> > > >
>> > > > I agree that the case of processing bounded data is a special case
>> of
>> > > > unbounded data.
>> > > > Th difference I was pointing out was in terms of expiry. This is not
>> > > > applicable in case of bounded data sets, while unbounded data sets
>> will
>> > > > inherently use expiry for limiting the amount of data to be stored.
>> > > >
>> > > > For idempotency when applying expiry on the streaming data, I need
>> to
>> > > > explore more on the using the window timestamp that you proposed as
>> > > opposed
>> > > > to the system time which I was planning to use.
>> > > >
>> > > > Thanks.
>> > > > ~ Bhupesh
>> > > >
>> > > > On Wed, Jun 29, 2016 at 8:39 PM, Thomas Weise <
>> thomas@datatorrent.com>
>> > > > wrote:
>> > > >
>> > > >> Bhupesh,
>> > > >>
>> > > >> Why is there a distinction between bounded and unbounded data? I
>> see
>> > the
>> > > >> former as a special case of the latter?
>> > > >>
>> > > >> When rewinding the stream or reprocessing the stream in another run
>> > the
>> > > >> operator should produce the same result.
>> > > >>
>> > > >> This operator should be idempotent also. That implies that code
>> does
>> > not
>> > > >> rely on current system time but the window timestamp instead.
>> > > >>
>> > > >> All of this should be accomplished by using the windowing support:
>> > > >> https://github.com/apache/apex-malhar/pull/319
>> > > >>
>> > > >> Thanks,
>> > > >> Thomas
>> > > >>
>> > > >>
>> > > >>
>> > > >>
>> > > >>
>> > > >>
>> > > >> On Wed, Jun 29, 2016 at 4:32 AM, Bhupesh Chawda <
>> > > bhupesh@datatorrent.com>
>> > > >> wrote:
>> > > >>
>> > > >> > Hi All,
>> > > >> >
>> > > >> > I want to validate the use cases for de-duplication that will be
>> > going
>> > > >> as
>> > > >> > part of this implementation.
>> > > >> >
>> > > >> >    - *Bounded data set*
>> > > >> >       - This is de-duplication for bounded data. For example,
>> data
>> > > sets
>> > > >> >       which are old or fixed or which may not have a time field
>> at
>> > > >> > all. Example:
>> > > >> >       Last year's transaction records or Customer data etc.
>> > > >> >       - Concept of expiry is not needed as this is bounded data
>> set.
>> > > >> >       - *Unbounded data set*
>> > > >> >       - This is de-duplication of online streaming data
>> > > >> >       - Expiry is needed because here incoming tuples may arrive
>> > later
>> > > >> than
>> > > >> >       what they are expected. Expiry is always computed by taking
>> > the
>> > > >> > difference
>> > > >> >       in System time and the Event time.
>> > > >> >
>> > > >> > Any feedback is appreciated.
>> > > >> >
>> > > >> > Thanks.
>> > > >> >
>> > > >> > ~ Bhupesh
>> > > >> >
>> > > >> > On Mon, Jun 27, 2016 at 11:34 AM, Bhupesh Chawda <
>> > > >> bhupesh@datatorrent.com>
>> > > >> > wrote:
>> > > >> >
>> > > >> > > Hi All,
>> > > >> > >
>> > > >> > > I am working on adding a De-duplication operator in Malhar
>> library
>> > > >> based
>> > > >> > > on managed state APIs. I will be working off the already
>> created
>> > > JIRA
>> > > >> -
>> > > >> > > https://issues.apache.org/jira/browse/APEXMALHAR-1701 and the
>> > > initial
>> > > >> > > pull request for an AbstractDeduper here:
>> > > >> > > https://github.com/apache/apex-malhar/pull/260/files
>> > > >> > >
>> > > >> > > I am planning to include the following features in the first
>> > > version:
>> > > >> > > 1. Time based de-duplication. Assumption: Tuple_Key ->
>> Tuple_Time
>> > > >> > > correlation holds.
>> > > >> > > 2. Option to maintain order of incoming tuples.
>> > > >> > > 3. Duplicate and Expired ports to emit duplicate and expired
>> > tuples
>> > > >> > > respectively.
>> > > >> > >
>> > > >> > > Thanks.
>> > > >> > >
>> > > >> > > ~ Bhupesh
>> > > >> > >
>> > > >> >
>> > > >>
>> > > >
>> > > >
>> > >
>> >
>>
>
>

Re: APEXMALHAR-1701 Deduper in Malhar

Posted by David Yan <da...@datatorrent.com>.
Hi Bhupesh,

Yes, the windowed operator currently depends on the watermark tuples
upstream for any "lateness" related operation. If there is no watermark,
nothing will be considered late. We can add support for lateness handling
without incoming watermark tuples. Let me add that to the pull request.

David


On Wed, Jul 6, 2016 at 10:48 PM, Bhupesh Chawda <bh...@apache.org> wrote:

> Hi David,
>
> Thanks for your reply.
>
> If I am to use a windowed operator for the Dedup operator, there should be
> some operator (upstream to Deduper) which sends the watermark tuples. These
> tuples (along with allowed lateness), will be the ones deciding which
> incoming tuples are too late and will be dropped. I have the following
> questions:
>
> Is a windowed operator (which needs watermarks) dependent upon some other
> operator for these tuples? What happens when there are no watermark tuples
> sent from upstream?
>
> Can a windowed operator "*assume*" the watermark tuples based on some
> notion of time? For example, can the Deduper, use the streaming window time
> as the reference to advance the watermark?
>
> Thanks.
>
> ~ Bhupesh
>
> On Thu, Jul 7, 2016 at 4:07 AM, David Yan <da...@datatorrent.com> wrote:
>
> > Hi Bhupesh,
> >
> > FYI, there is a JIRA open for a scalable implementation of
> WindowedStorage
> > and WindowedKeyedStorage:
> >
> > https://issues.apache.org/jira/browse/APEXMALHAR-2130
> >
> > We expect either to use ManagedState directly, or Spillable structures,
> > which in turn uses ManagedState.
> >
> > I'm not very familiar with the dedup operator. but in order to use the
> > WindowedOperator, it sounds to me that we can use SlidingWindows with an
> > implementation of WindowedKeyedStorage that uses a Bloom filter to cover
> > most of the false cases.
> >
> > David
> >
> > On Mon, Jul 4, 2016 at 4:42 AM, Bhupesh Chawda <bh...@apache.org>
> wrote:
> >
> > > Hi All,
> > >
> > > I have looked into Windowing concepts from Apache Beam and the PR #319
> by
> > > David. Looks like there are a lot of advanced concepts which could be
> > used
> > > by operators using event time windowing.
> > > Additionally I also looked at the Managed State implementation.
> > >
> > > One of the things I noticed is that there is an overlap of
> functionality
> > > between Managed State and Windowing Support in terms of the following:
> > >
> > >    - *Discarding / Dropping of tuples* from the system - Managed State
> > uses
> > >    the concept of expiry while a Windowed operator uses the concepts of
> > >    Watermarks and allowed lateness. If I try to reconcile the above
> two,
> > it
> > >    seems like Managed State (through TimeBucketAssigner) is trying to
> > >    implement some sort of implicit heuristic Watermarks based on either
> > the
> > >    user supplied time or the event time.
> > >    - *Global Window* support - Once we have an option to disable
> purging
> > in
> > >    Managed State, it will have similar semantics to the Global Window
> > > option
> > >    in Windowing support.
> > >
> > > If I understand correctly, is the suggestion to implement the Dedup
> > > operator as a Windowed operator and to use managed state only as a
> > storage
> > > medium (through WindowedStorage) ? What could be a better way of going
> > > about this?
> > >
> > > Thanks.
> > >
> > > ~ Bhupesh
> > >
> > > On Wed, Jun 29, 2016 at 10:35 PM, Bhupesh Chawda <bh...@apache.org>
> > > wrote:
> > >
> > > > Hi Thomas,
> > > >
> > > > I agree that the case of processing bounded data is a special case of
> > > > unbounded data.
> > > > Th difference I was pointing out was in terms of expiry. This is not
> > > > applicable in case of bounded data sets, while unbounded data sets
> will
> > > > inherently use expiry for limiting the amount of data to be stored.
> > > >
> > > > For idempotency when applying expiry on the streaming data, I need to
> > > > explore more on the using the window timestamp that you proposed as
> > > opposed
> > > > to the system time which I was planning to use.
> > > >
> > > > Thanks.
> > > > ~ Bhupesh
> > > >
> > > > On Wed, Jun 29, 2016 at 8:39 PM, Thomas Weise <
> thomas@datatorrent.com>
> > > > wrote:
> > > >
> > > >> Bhupesh,
> > > >>
> > > >> Why is there a distinction between bounded and unbounded data? I see
> > the
> > > >> former as a special case of the latter?
> > > >>
> > > >> When rewinding the stream or reprocessing the stream in another run
> > the
> > > >> operator should produce the same result.
> > > >>
> > > >> This operator should be idempotent also. That implies that code does
> > not
> > > >> rely on current system time but the window timestamp instead.
> > > >>
> > > >> All of this should be accomplished by using the windowing support:
> > > >> https://github.com/apache/apex-malhar/pull/319
> > > >>
> > > >> Thanks,
> > > >> Thomas
> > > >>
> > > >>
> > > >>
> > > >>
> > > >>
> > > >>
> > > >> On Wed, Jun 29, 2016 at 4:32 AM, Bhupesh Chawda <
> > > bhupesh@datatorrent.com>
> > > >> wrote:
> > > >>
> > > >> > Hi All,
> > > >> >
> > > >> > I want to validate the use cases for de-duplication that will be
> > going
> > > >> as
> > > >> > part of this implementation.
> > > >> >
> > > >> >    - *Bounded data set*
> > > >> >       - This is de-duplication for bounded data. For example, data
> > > sets
> > > >> >       which are old or fixed or which may not have a time field at
> > > >> > all. Example:
> > > >> >       Last year's transaction records or Customer data etc.
> > > >> >       - Concept of expiry is not needed as this is bounded data
> set.
> > > >> >       - *Unbounded data set*
> > > >> >       - This is de-duplication of online streaming data
> > > >> >       - Expiry is needed because here incoming tuples may arrive
> > later
> > > >> than
> > > >> >       what they are expected. Expiry is always computed by taking
> > the
> > > >> > difference
> > > >> >       in System time and the Event time.
> > > >> >
> > > >> > Any feedback is appreciated.
> > > >> >
> > > >> > Thanks.
> > > >> >
> > > >> > ~ Bhupesh
> > > >> >
> > > >> > On Mon, Jun 27, 2016 at 11:34 AM, Bhupesh Chawda <
> > > >> bhupesh@datatorrent.com>
> > > >> > wrote:
> > > >> >
> > > >> > > Hi All,
> > > >> > >
> > > >> > > I am working on adding a De-duplication operator in Malhar
> library
> > > >> based
> > > >> > > on managed state APIs. I will be working off the already created
> > > JIRA
> > > >> -
> > > >> > > https://issues.apache.org/jira/browse/APEXMALHAR-1701 and the
> > > initial
> > > >> > > pull request for an AbstractDeduper here:
> > > >> > > https://github.com/apache/apex-malhar/pull/260/files
> > > >> > >
> > > >> > > I am planning to include the following features in the first
> > > version:
> > > >> > > 1. Time based de-duplication. Assumption: Tuple_Key ->
> Tuple_Time
> > > >> > > correlation holds.
> > > >> > > 2. Option to maintain order of incoming tuples.
> > > >> > > 3. Duplicate and Expired ports to emit duplicate and expired
> > tuples
> > > >> > > respectively.
> > > >> > >
> > > >> > > Thanks.
> > > >> > >
> > > >> > > ~ Bhupesh
> > > >> > >
> > > >> >
> > > >>
> > > >
> > > >
> > >
> >
>

Re: APEXMALHAR-1701 Deduper in Malhar

Posted by Bhupesh Chawda <bh...@apache.org>.
Hi All,

I have created an initial Deduper implementation based on Managed State and
opened a PR to get feedback from the community. This is based on the
initial PR by @chandnisingh.

Please help review this PR: https://github.com/apache/apex-malhar/pull/335

Note that this is based entirely on the support provided by Managed State.
If this is acceptable to the community, then I can proceed to other
additive support like the partitioning ability and fault tolerance support
etc.

Any feedback is appreciated.

Thanks.
~ Bhupesh

On Thu, Jul 7, 2016 at 11:18 AM, Bhupesh Chawda <bh...@apache.org> wrote:

> Hi David,
>
> Thanks for your reply.
>
> If I am to use a windowed operator for the Dedup operator, there should be
> some operator (upstream to Deduper) which sends the watermark tuples. These
> tuples (along with allowed lateness), will be the ones deciding which
> incoming tuples are too late and will be dropped. I have the following
> questions:
>
> Is a windowed operator (which needs watermarks) dependent upon some other
> operator for these tuples? What happens when there are no watermark tuples
> sent from upstream?
>
> Can a windowed operator "*assume*" the watermark tuples based on some
> notion of time? For example, can the Deduper, use the streaming window time
> as the reference to advance the watermark?
>
> Thanks.
>
> ~ Bhupesh
>
> On Thu, Jul 7, 2016 at 4:07 AM, David Yan <da...@datatorrent.com> wrote:
>
>> Hi Bhupesh,
>>
>> FYI, there is a JIRA open for a scalable implementation of WindowedStorage
>> and WindowedKeyedStorage:
>>
>> https://issues.apache.org/jira/browse/APEXMALHAR-2130
>>
>> We expect either to use ManagedState directly, or Spillable structures,
>> which in turn uses ManagedState.
>>
>> I'm not very familiar with the dedup operator. but in order to use the
>> WindowedOperator, it sounds to me that we can use SlidingWindows with an
>> implementation of WindowedKeyedStorage that uses a Bloom filter to cover
>> most of the false cases.
>>
>> David
>>
>> On Mon, Jul 4, 2016 at 4:42 AM, Bhupesh Chawda <bh...@apache.org>
>> wrote:
>>
>> > Hi All,
>> >
>> > I have looked into Windowing concepts from Apache Beam and the PR #319
>> by
>> > David. Looks like there are a lot of advanced concepts which could be
>> used
>> > by operators using event time windowing.
>> > Additionally I also looked at the Managed State implementation.
>> >
>> > One of the things I noticed is that there is an overlap of functionality
>> > between Managed State and Windowing Support in terms of the following:
>> >
>> >    - *Discarding / Dropping of tuples* from the system - Managed State
>> uses
>> >    the concept of expiry while a Windowed operator uses the concepts of
>> >    Watermarks and allowed lateness. If I try to reconcile the above
>> two, it
>> >    seems like Managed State (through TimeBucketAssigner) is trying to
>> >    implement some sort of implicit heuristic Watermarks based on either
>> the
>> >    user supplied time or the event time.
>> >    - *Global Window* support - Once we have an option to disable
>> purging in
>> >    Managed State, it will have similar semantics to the Global Window
>> > option
>> >    in Windowing support.
>> >
>> > If I understand correctly, is the suggestion to implement the Dedup
>> > operator as a Windowed operator and to use managed state only as a
>> storage
>> > medium (through WindowedStorage) ? What could be a better way of going
>> > about this?
>> >
>> > Thanks.
>> >
>> > ~ Bhupesh
>> >
>> > On Wed, Jun 29, 2016 at 10:35 PM, Bhupesh Chawda <bh...@apache.org>
>> > wrote:
>> >
>> > > Hi Thomas,
>> > >
>> > > I agree that the case of processing bounded data is a special case of
>> > > unbounded data.
>> > > Th difference I was pointing out was in terms of expiry. This is not
>> > > applicable in case of bounded data sets, while unbounded data sets
>> will
>> > > inherently use expiry for limiting the amount of data to be stored.
>> > >
>> > > For idempotency when applying expiry on the streaming data, I need to
>> > > explore more on the using the window timestamp that you proposed as
>> > opposed
>> > > to the system time which I was planning to use.
>> > >
>> > > Thanks.
>> > > ~ Bhupesh
>> > >
>> > > On Wed, Jun 29, 2016 at 8:39 PM, Thomas Weise <thomas@datatorrent.com
>> >
>> > > wrote:
>> > >
>> > >> Bhupesh,
>> > >>
>> > >> Why is there a distinction between bounded and unbounded data? I see
>> the
>> > >> former as a special case of the latter?
>> > >>
>> > >> When rewinding the stream or reprocessing the stream in another run
>> the
>> > >> operator should produce the same result.
>> > >>
>> > >> This operator should be idempotent also. That implies that code does
>> not
>> > >> rely on current system time but the window timestamp instead.
>> > >>
>> > >> All of this should be accomplished by using the windowing support:
>> > >> https://github.com/apache/apex-malhar/pull/319
>> > >>
>> > >> Thanks,
>> > >> Thomas
>> > >>
>> > >>
>> > >>
>> > >>
>> > >>
>> > >>
>> > >> On Wed, Jun 29, 2016 at 4:32 AM, Bhupesh Chawda <
>> > bhupesh@datatorrent.com>
>> > >> wrote:
>> > >>
>> > >> > Hi All,
>> > >> >
>> > >> > I want to validate the use cases for de-duplication that will be
>> going
>> > >> as
>> > >> > part of this implementation.
>> > >> >
>> > >> >    - *Bounded data set*
>> > >> >       - This is de-duplication for bounded data. For example, data
>> > sets
>> > >> >       which are old or fixed or which may not have a time field at
>> > >> > all. Example:
>> > >> >       Last year's transaction records or Customer data etc.
>> > >> >       - Concept of expiry is not needed as this is bounded data
>> set.
>> > >> >       - *Unbounded data set*
>> > >> >       - This is de-duplication of online streaming data
>> > >> >       - Expiry is needed because here incoming tuples may arrive
>> later
>> > >> than
>> > >> >       what they are expected. Expiry is always computed by taking
>> the
>> > >> > difference
>> > >> >       in System time and the Event time.
>> > >> >
>> > >> > Any feedback is appreciated.
>> > >> >
>> > >> > Thanks.
>> > >> >
>> > >> > ~ Bhupesh
>> > >> >
>> > >> > On Mon, Jun 27, 2016 at 11:34 AM, Bhupesh Chawda <
>> > >> bhupesh@datatorrent.com>
>> > >> > wrote:
>> > >> >
>> > >> > > Hi All,
>> > >> > >
>> > >> > > I am working on adding a De-duplication operator in Malhar
>> library
>> > >> based
>> > >> > > on managed state APIs. I will be working off the already created
>> > JIRA
>> > >> -
>> > >> > > https://issues.apache.org/jira/browse/APEXMALHAR-1701 and the
>> > initial
>> > >> > > pull request for an AbstractDeduper here:
>> > >> > > https://github.com/apache/apex-malhar/pull/260/files
>> > >> > >
>> > >> > > I am planning to include the following features in the first
>> > version:
>> > >> > > 1. Time based de-duplication. Assumption: Tuple_Key -> Tuple_Time
>> > >> > > correlation holds.
>> > >> > > 2. Option to maintain order of incoming tuples.
>> > >> > > 3. Duplicate and Expired ports to emit duplicate and expired
>> tuples
>> > >> > > respectively.
>> > >> > >
>> > >> > > Thanks.
>> > >> > >
>> > >> > > ~ Bhupesh
>> > >> > >
>> > >> >
>> > >>
>> > >
>> > >
>> >
>>
>
>

Re: APEXMALHAR-1701 Deduper in Malhar

Posted by Bhupesh Chawda <bh...@apache.org>.
Hi David,

Thanks for your reply.

If I am to use a windowed operator for the Dedup operator, there should be
some operator (upstream to Deduper) which sends the watermark tuples. These
tuples (along with allowed lateness), will be the ones deciding which
incoming tuples are too late and will be dropped. I have the following
questions:

Is a windowed operator (which needs watermarks) dependent upon some other
operator for these tuples? What happens when there are no watermark tuples
sent from upstream?

Can a windowed operator "*assume*" the watermark tuples based on some
notion of time? For example, can the Deduper, use the streaming window time
as the reference to advance the watermark?

Thanks.

~ Bhupesh

On Thu, Jul 7, 2016 at 4:07 AM, David Yan <da...@datatorrent.com> wrote:

> Hi Bhupesh,
>
> FYI, there is a JIRA open for a scalable implementation of WindowedStorage
> and WindowedKeyedStorage:
>
> https://issues.apache.org/jira/browse/APEXMALHAR-2130
>
> We expect either to use ManagedState directly, or Spillable structures,
> which in turn uses ManagedState.
>
> I'm not very familiar with the dedup operator. but in order to use the
> WindowedOperator, it sounds to me that we can use SlidingWindows with an
> implementation of WindowedKeyedStorage that uses a Bloom filter to cover
> most of the false cases.
>
> David
>
> On Mon, Jul 4, 2016 at 4:42 AM, Bhupesh Chawda <bh...@apache.org> wrote:
>
> > Hi All,
> >
> > I have looked into Windowing concepts from Apache Beam and the PR #319 by
> > David. Looks like there are a lot of advanced concepts which could be
> used
> > by operators using event time windowing.
> > Additionally I also looked at the Managed State implementation.
> >
> > One of the things I noticed is that there is an overlap of functionality
> > between Managed State and Windowing Support in terms of the following:
> >
> >    - *Discarding / Dropping of tuples* from the system - Managed State
> uses
> >    the concept of expiry while a Windowed operator uses the concepts of
> >    Watermarks and allowed lateness. If I try to reconcile the above two,
> it
> >    seems like Managed State (through TimeBucketAssigner) is trying to
> >    implement some sort of implicit heuristic Watermarks based on either
> the
> >    user supplied time or the event time.
> >    - *Global Window* support - Once we have an option to disable purging
> in
> >    Managed State, it will have similar semantics to the Global Window
> > option
> >    in Windowing support.
> >
> > If I understand correctly, is the suggestion to implement the Dedup
> > operator as a Windowed operator and to use managed state only as a
> storage
> > medium (through WindowedStorage) ? What could be a better way of going
> > about this?
> >
> > Thanks.
> >
> > ~ Bhupesh
> >
> > On Wed, Jun 29, 2016 at 10:35 PM, Bhupesh Chawda <bh...@apache.org>
> > wrote:
> >
> > > Hi Thomas,
> > >
> > > I agree that the case of processing bounded data is a special case of
> > > unbounded data.
> > > Th difference I was pointing out was in terms of expiry. This is not
> > > applicable in case of bounded data sets, while unbounded data sets will
> > > inherently use expiry for limiting the amount of data to be stored.
> > >
> > > For idempotency when applying expiry on the streaming data, I need to
> > > explore more on the using the window timestamp that you proposed as
> > opposed
> > > to the system time which I was planning to use.
> > >
> > > Thanks.
> > > ~ Bhupesh
> > >
> > > On Wed, Jun 29, 2016 at 8:39 PM, Thomas Weise <th...@datatorrent.com>
> > > wrote:
> > >
> > >> Bhupesh,
> > >>
> > >> Why is there a distinction between bounded and unbounded data? I see
> the
> > >> former as a special case of the latter?
> > >>
> > >> When rewinding the stream or reprocessing the stream in another run
> the
> > >> operator should produce the same result.
> > >>
> > >> This operator should be idempotent also. That implies that code does
> not
> > >> rely on current system time but the window timestamp instead.
> > >>
> > >> All of this should be accomplished by using the windowing support:
> > >> https://github.com/apache/apex-malhar/pull/319
> > >>
> > >> Thanks,
> > >> Thomas
> > >>
> > >>
> > >>
> > >>
> > >>
> > >>
> > >> On Wed, Jun 29, 2016 at 4:32 AM, Bhupesh Chawda <
> > bhupesh@datatorrent.com>
> > >> wrote:
> > >>
> > >> > Hi All,
> > >> >
> > >> > I want to validate the use cases for de-duplication that will be
> going
> > >> as
> > >> > part of this implementation.
> > >> >
> > >> >    - *Bounded data set*
> > >> >       - This is de-duplication for bounded data. For example, data
> > sets
> > >> >       which are old or fixed or which may not have a time field at
> > >> > all. Example:
> > >> >       Last year's transaction records or Customer data etc.
> > >> >       - Concept of expiry is not needed as this is bounded data set.
> > >> >       - *Unbounded data set*
> > >> >       - This is de-duplication of online streaming data
> > >> >       - Expiry is needed because here incoming tuples may arrive
> later
> > >> than
> > >> >       what they are expected. Expiry is always computed by taking
> the
> > >> > difference
> > >> >       in System time and the Event time.
> > >> >
> > >> > Any feedback is appreciated.
> > >> >
> > >> > Thanks.
> > >> >
> > >> > ~ Bhupesh
> > >> >
> > >> > On Mon, Jun 27, 2016 at 11:34 AM, Bhupesh Chawda <
> > >> bhupesh@datatorrent.com>
> > >> > wrote:
> > >> >
> > >> > > Hi All,
> > >> > >
> > >> > > I am working on adding a De-duplication operator in Malhar library
> > >> based
> > >> > > on managed state APIs. I will be working off the already created
> > JIRA
> > >> -
> > >> > > https://issues.apache.org/jira/browse/APEXMALHAR-1701 and the
> > initial
> > >> > > pull request for an AbstractDeduper here:
> > >> > > https://github.com/apache/apex-malhar/pull/260/files
> > >> > >
> > >> > > I am planning to include the following features in the first
> > version:
> > >> > > 1. Time based de-duplication. Assumption: Tuple_Key -> Tuple_Time
> > >> > > correlation holds.
> > >> > > 2. Option to maintain order of incoming tuples.
> > >> > > 3. Duplicate and Expired ports to emit duplicate and expired
> tuples
> > >> > > respectively.
> > >> > >
> > >> > > Thanks.
> > >> > >
> > >> > > ~ Bhupesh
> > >> > >
> > >> >
> > >>
> > >
> > >
> >
>

Re: APEXMALHAR-1701 Deduper in Malhar

Posted by David Yan <da...@datatorrent.com>.
Hi Bhupesh,

FYI, there is a JIRA open for a scalable implementation of WindowedStorage
and WindowedKeyedStorage:

https://issues.apache.org/jira/browse/APEXMALHAR-2130

We expect either to use ManagedState directly, or Spillable structures,
which in turn uses ManagedState.

I'm not very familiar with the dedup operator. but in order to use the
WindowedOperator, it sounds to me that we can use SlidingWindows with an
implementation of WindowedKeyedStorage that uses a Bloom filter to cover
most of the false cases.

David

On Mon, Jul 4, 2016 at 4:42 AM, Bhupesh Chawda <bh...@apache.org> wrote:

> Hi All,
>
> I have looked into Windowing concepts from Apache Beam and the PR #319 by
> David. Looks like there are a lot of advanced concepts which could be used
> by operators using event time windowing.
> Additionally I also looked at the Managed State implementation.
>
> One of the things I noticed is that there is an overlap of functionality
> between Managed State and Windowing Support in terms of the following:
>
>    - *Discarding / Dropping of tuples* from the system - Managed State uses
>    the concept of expiry while a Windowed operator uses the concepts of
>    Watermarks and allowed lateness. If I try to reconcile the above two, it
>    seems like Managed State (through TimeBucketAssigner) is trying to
>    implement some sort of implicit heuristic Watermarks based on either the
>    user supplied time or the event time.
>    - *Global Window* support - Once we have an option to disable purging in
>    Managed State, it will have similar semantics to the Global Window
> option
>    in Windowing support.
>
> If I understand correctly, is the suggestion to implement the Dedup
> operator as a Windowed operator and to use managed state only as a storage
> medium (through WindowedStorage) ? What could be a better way of going
> about this?
>
> Thanks.
>
> ~ Bhupesh
>
> On Wed, Jun 29, 2016 at 10:35 PM, Bhupesh Chawda <bh...@apache.org>
> wrote:
>
> > Hi Thomas,
> >
> > I agree that the case of processing bounded data is a special case of
> > unbounded data.
> > Th difference I was pointing out was in terms of expiry. This is not
> > applicable in case of bounded data sets, while unbounded data sets will
> > inherently use expiry for limiting the amount of data to be stored.
> >
> > For idempotency when applying expiry on the streaming data, I need to
> > explore more on the using the window timestamp that you proposed as
> opposed
> > to the system time which I was planning to use.
> >
> > Thanks.
> > ~ Bhupesh
> >
> > On Wed, Jun 29, 2016 at 8:39 PM, Thomas Weise <th...@datatorrent.com>
> > wrote:
> >
> >> Bhupesh,
> >>
> >> Why is there a distinction between bounded and unbounded data? I see the
> >> former as a special case of the latter?
> >>
> >> When rewinding the stream or reprocessing the stream in another run the
> >> operator should produce the same result.
> >>
> >> This operator should be idempotent also. That implies that code does not
> >> rely on current system time but the window timestamp instead.
> >>
> >> All of this should be accomplished by using the windowing support:
> >> https://github.com/apache/apex-malhar/pull/319
> >>
> >> Thanks,
> >> Thomas
> >>
> >>
> >>
> >>
> >>
> >>
> >> On Wed, Jun 29, 2016 at 4:32 AM, Bhupesh Chawda <
> bhupesh@datatorrent.com>
> >> wrote:
> >>
> >> > Hi All,
> >> >
> >> > I want to validate the use cases for de-duplication that will be going
> >> as
> >> > part of this implementation.
> >> >
> >> >    - *Bounded data set*
> >> >       - This is de-duplication for bounded data. For example, data
> sets
> >> >       which are old or fixed or which may not have a time field at
> >> > all. Example:
> >> >       Last year's transaction records or Customer data etc.
> >> >       - Concept of expiry is not needed as this is bounded data set.
> >> >       - *Unbounded data set*
> >> >       - This is de-duplication of online streaming data
> >> >       - Expiry is needed because here incoming tuples may arrive later
> >> than
> >> >       what they are expected. Expiry is always computed by taking the
> >> > difference
> >> >       in System time and the Event time.
> >> >
> >> > Any feedback is appreciated.
> >> >
> >> > Thanks.
> >> >
> >> > ~ Bhupesh
> >> >
> >> > On Mon, Jun 27, 2016 at 11:34 AM, Bhupesh Chawda <
> >> bhupesh@datatorrent.com>
> >> > wrote:
> >> >
> >> > > Hi All,
> >> > >
> >> > > I am working on adding a De-duplication operator in Malhar library
> >> based
> >> > > on managed state APIs. I will be working off the already created
> JIRA
> >> -
> >> > > https://issues.apache.org/jira/browse/APEXMALHAR-1701 and the
> initial
> >> > > pull request for an AbstractDeduper here:
> >> > > https://github.com/apache/apex-malhar/pull/260/files
> >> > >
> >> > > I am planning to include the following features in the first
> version:
> >> > > 1. Time based de-duplication. Assumption: Tuple_Key -> Tuple_Time
> >> > > correlation holds.
> >> > > 2. Option to maintain order of incoming tuples.
> >> > > 3. Duplicate and Expired ports to emit duplicate and expired tuples
> >> > > respectively.
> >> > >
> >> > > Thanks.
> >> > >
> >> > > ~ Bhupesh
> >> > >
> >> >
> >>
> >
> >
>

Re: APEXMALHAR-1701 Deduper in Malhar

Posted by Bhupesh Chawda <bh...@apache.org>.
Hi All,

I have looked into Windowing concepts from Apache Beam and the PR #319 by
David. Looks like there are a lot of advanced concepts which could be used
by operators using event time windowing.
Additionally I also looked at the Managed State implementation.

One of the things I noticed is that there is an overlap of functionality
between Managed State and Windowing Support in terms of the following:

   - *Discarding / Dropping of tuples* from the system - Managed State uses
   the concept of expiry while a Windowed operator uses the concepts of
   Watermarks and allowed lateness. If I try to reconcile the above two, it
   seems like Managed State (through TimeBucketAssigner) is trying to
   implement some sort of implicit heuristic Watermarks based on either the
   user supplied time or the event time.
   - *Global Window* support - Once we have an option to disable purging in
   Managed State, it will have similar semantics to the Global Window option
   in Windowing support.

If I understand correctly, is the suggestion to implement the Dedup
operator as a Windowed operator and to use managed state only as a storage
medium (through WindowedStorage) ? What could be a better way of going
about this?

Thanks.

~ Bhupesh

On Wed, Jun 29, 2016 at 10:35 PM, Bhupesh Chawda <bh...@apache.org> wrote:

> Hi Thomas,
>
> I agree that the case of processing bounded data is a special case of
> unbounded data.
> Th difference I was pointing out was in terms of expiry. This is not
> applicable in case of bounded data sets, while unbounded data sets will
> inherently use expiry for limiting the amount of data to be stored.
>
> For idempotency when applying expiry on the streaming data, I need to
> explore more on the using the window timestamp that you proposed as opposed
> to the system time which I was planning to use.
>
> Thanks.
> ~ Bhupesh
>
> On Wed, Jun 29, 2016 at 8:39 PM, Thomas Weise <th...@datatorrent.com>
> wrote:
>
>> Bhupesh,
>>
>> Why is there a distinction between bounded and unbounded data? I see the
>> former as a special case of the latter?
>>
>> When rewinding the stream or reprocessing the stream in another run the
>> operator should produce the same result.
>>
>> This operator should be idempotent also. That implies that code does not
>> rely on current system time but the window timestamp instead.
>>
>> All of this should be accomplished by using the windowing support:
>> https://github.com/apache/apex-malhar/pull/319
>>
>> Thanks,
>> Thomas
>>
>>
>>
>>
>>
>>
>> On Wed, Jun 29, 2016 at 4:32 AM, Bhupesh Chawda <bh...@datatorrent.com>
>> wrote:
>>
>> > Hi All,
>> >
>> > I want to validate the use cases for de-duplication that will be going
>> as
>> > part of this implementation.
>> >
>> >    - *Bounded data set*
>> >       - This is de-duplication for bounded data. For example, data sets
>> >       which are old or fixed or which may not have a time field at
>> > all. Example:
>> >       Last year's transaction records or Customer data etc.
>> >       - Concept of expiry is not needed as this is bounded data set.
>> >       - *Unbounded data set*
>> >       - This is de-duplication of online streaming data
>> >       - Expiry is needed because here incoming tuples may arrive later
>> than
>> >       what they are expected. Expiry is always computed by taking the
>> > difference
>> >       in System time and the Event time.
>> >
>> > Any feedback is appreciated.
>> >
>> > Thanks.
>> >
>> > ~ Bhupesh
>> >
>> > On Mon, Jun 27, 2016 at 11:34 AM, Bhupesh Chawda <
>> bhupesh@datatorrent.com>
>> > wrote:
>> >
>> > > Hi All,
>> > >
>> > > I am working on adding a De-duplication operator in Malhar library
>> based
>> > > on managed state APIs. I will be working off the already created JIRA
>> -
>> > > https://issues.apache.org/jira/browse/APEXMALHAR-1701 and the initial
>> > > pull request for an AbstractDeduper here:
>> > > https://github.com/apache/apex-malhar/pull/260/files
>> > >
>> > > I am planning to include the following features in the first version:
>> > > 1. Time based de-duplication. Assumption: Tuple_Key -> Tuple_Time
>> > > correlation holds.
>> > > 2. Option to maintain order of incoming tuples.
>> > > 3. Duplicate and Expired ports to emit duplicate and expired tuples
>> > > respectively.
>> > >
>> > > Thanks.
>> > >
>> > > ~ Bhupesh
>> > >
>> >
>>
>
>

Re: APEXMALHAR-1701 Deduper in Malhar

Posted by Bhupesh Chawda <bh...@apache.org>.
Hi Thomas,

I agree that the case of processing bounded data is a special case of
unbounded data.
Th difference I was pointing out was in terms of expiry. This is not
applicable in case of bounded data sets, while unbounded data sets will
inherently use expiry for limiting the amount of data to be stored.

For idempotency when applying expiry on the streaming data, I need to
explore more on the using the window timestamp that you proposed as opposed
to the system time which I was planning to use.

Thanks.
~ Bhupesh

On Wed, Jun 29, 2016 at 8:39 PM, Thomas Weise <th...@datatorrent.com>
wrote:

> Bhupesh,
>
> Why is there a distinction between bounded and unbounded data? I see the
> former as a special case of the latter?
>
> When rewinding the stream or reprocessing the stream in another run the
> operator should produce the same result.
>
> This operator should be idempotent also. That implies that code does not
> rely on current system time but the window timestamp instead.
>
> All of this should be accomplished by using the windowing support:
> https://github.com/apache/apex-malhar/pull/319
>
> Thanks,
> Thomas
>
>
>
>
>
>
> On Wed, Jun 29, 2016 at 4:32 AM, Bhupesh Chawda <bh...@datatorrent.com>
> wrote:
>
> > Hi All,
> >
> > I want to validate the use cases for de-duplication that will be going as
> > part of this implementation.
> >
> >    - *Bounded data set*
> >       - This is de-duplication for bounded data. For example, data sets
> >       which are old or fixed or which may not have a time field at
> > all. Example:
> >       Last year's transaction records or Customer data etc.
> >       - Concept of expiry is not needed as this is bounded data set.
> >       - *Unbounded data set*
> >       - This is de-duplication of online streaming data
> >       - Expiry is needed because here incoming tuples may arrive later
> than
> >       what they are expected. Expiry is always computed by taking the
> > difference
> >       in System time and the Event time.
> >
> > Any feedback is appreciated.
> >
> > Thanks.
> >
> > ~ Bhupesh
> >
> > On Mon, Jun 27, 2016 at 11:34 AM, Bhupesh Chawda <
> bhupesh@datatorrent.com>
> > wrote:
> >
> > > Hi All,
> > >
> > > I am working on adding a De-duplication operator in Malhar library
> based
> > > on managed state APIs. I will be working off the already created JIRA -
> > > https://issues.apache.org/jira/browse/APEXMALHAR-1701 and the initial
> > > pull request for an AbstractDeduper here:
> > > https://github.com/apache/apex-malhar/pull/260/files
> > >
> > > I am planning to include the following features in the first version:
> > > 1. Time based de-duplication. Assumption: Tuple_Key -> Tuple_Time
> > > correlation holds.
> > > 2. Option to maintain order of incoming tuples.
> > > 3. Duplicate and Expired ports to emit duplicate and expired tuples
> > > respectively.
> > >
> > > Thanks.
> > >
> > > ~ Bhupesh
> > >
> >
>

Re: APEXMALHAR-1701 Deduper in Malhar

Posted by Thomas Weise <th...@datatorrent.com>.
Bhupesh,

Why is there a distinction between bounded and unbounded data? I see the
former as a special case of the latter?

When rewinding the stream or reprocessing the stream in another run the
operator should produce the same result.

This operator should be idempotent also. That implies that code does not
rely on current system time but the window timestamp instead.

All of this should be accomplished by using the windowing support:
https://github.com/apache/apex-malhar/pull/319

Thanks,
Thomas






On Wed, Jun 29, 2016 at 4:32 AM, Bhupesh Chawda <bh...@datatorrent.com>
wrote:

> Hi All,
>
> I want to validate the use cases for de-duplication that will be going as
> part of this implementation.
>
>    - *Bounded data set*
>       - This is de-duplication for bounded data. For example, data sets
>       which are old or fixed or which may not have a time field at
> all. Example:
>       Last year's transaction records or Customer data etc.
>       - Concept of expiry is not needed as this is bounded data set.
>       - *Unbounded data set*
>       - This is de-duplication of online streaming data
>       - Expiry is needed because here incoming tuples may arrive later than
>       what they are expected. Expiry is always computed by taking the
> difference
>       in System time and the Event time.
>
> Any feedback is appreciated.
>
> Thanks.
>
> ~ Bhupesh
>
> On Mon, Jun 27, 2016 at 11:34 AM, Bhupesh Chawda <bh...@datatorrent.com>
> wrote:
>
> > Hi All,
> >
> > I am working on adding a De-duplication operator in Malhar library based
> > on managed state APIs. I will be working off the already created JIRA -
> > https://issues.apache.org/jira/browse/APEXMALHAR-1701 and the initial
> > pull request for an AbstractDeduper here:
> > https://github.com/apache/apex-malhar/pull/260/files
> >
> > I am planning to include the following features in the first version:
> > 1. Time based de-duplication. Assumption: Tuple_Key -> Tuple_Time
> > correlation holds.
> > 2. Option to maintain order of incoming tuples.
> > 3. Duplicate and Expired ports to emit duplicate and expired tuples
> > respectively.
> >
> > Thanks.
> >
> > ~ Bhupesh
> >
>

Re: APEXMALHAR-1701 Deduper in Malhar

Posted by Bhupesh Chawda <bh...@datatorrent.com>.
Hi All,

I want to validate the use cases for de-duplication that will be going as
part of this implementation.

   - *Bounded data set*
      - This is de-duplication for bounded data. For example, data sets
      which are old or fixed or which may not have a time field at
all. Example:
      Last year's transaction records or Customer data etc.
      - Concept of expiry is not needed as this is bounded data set.
      - *Unbounded data set*
      - This is de-duplication of online streaming data
      - Expiry is needed because here incoming tuples may arrive later than
      what they are expected. Expiry is always computed by taking the
difference
      in System time and the Event time.

Any feedback is appreciated.

Thanks.

~ Bhupesh

On Mon, Jun 27, 2016 at 11:34 AM, Bhupesh Chawda <bh...@datatorrent.com>
wrote:

> Hi All,
>
> I am working on adding a De-duplication operator in Malhar library based
> on managed state APIs. I will be working off the already created JIRA -
> https://issues.apache.org/jira/browse/APEXMALHAR-1701 and the initial
> pull request for an AbstractDeduper here:
> https://github.com/apache/apex-malhar/pull/260/files
>
> I am planning to include the following features in the first version:
> 1. Time based de-duplication. Assumption: Tuple_Key -> Tuple_Time
> correlation holds.
> 2. Option to maintain order of incoming tuples.
> 3. Duplicate and Expired ports to emit duplicate and expired tuples
> respectively.
>
> Thanks.
>
> ~ Bhupesh
>