Posted to dev@apex.apache.org by "Chandni Singh (JIRA)" <ji...@apache.org> on 2016/08/30 03:04:20 UTC

[jira] [Comment Edited] (APEXMALHAR-2130) implement scalable windowed storage

    [ https://issues.apache.org/jira/browse/APEXMALHAR-2130?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15447840#comment-15447840 ] 

Chandni Singh edited comment on APEXMALHAR-2130 at 8/30/16 3:04 AM:
--------------------------------------------------------------------

Note: The main change required in ManagedState here is that timeBuckets (the Window time in your example) are now computed outside ManagedState. Previously, TimeBuckets were computed by TimeBucketAssigner within ManagedState; now they will be provided to it.

>>>>
Since event time is arbitrary, unlike processing time, the actual key representing the timebucket cannot be assumed to form a natural sequence. However, TimeBucketAssigner.getTimeBucketAndAdjustBoundaries seems to return a long that is sequential, starting from 0. We want the actual timebucket key to be based on the actual event window timestamp. Chandni Singh, will this break anything?

Answer: No, it will not break anything. The time here is event time, and this method does NOT assume that events are received in order; it creates the timebucket based on the event time. In your use case, the time bucket is computed outside ManagedState, so there are 2 ways to approach it:
 - create a special TimeBucketAssigner which just returns the input Window time for the event as the timebucket, without computing a timebucket of its own.
 - make TimeBucketAssigner an optional property in AbstractManagedStateImpl. If it is null, the time argument is used directly as the timebucket saved in the Bucket.
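Both options can be illustrated with a small, self-contained model. This is a hypothetical sketch, not the Malhar API: BucketAssigner, PassThroughAssigner and SimpleManagedState are illustrative names, and the real TimeBucketAssigner and AbstractManagedStateImpl have richer contracts (boundary adjustment, expiry, spilling to disk).

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-in for TimeBucketAssigner: maps an event time to a time bucket key.
interface BucketAssigner {
  long assign(long time);
}

// Option 1: a special assigner that returns the input window time unchanged.
class PassThroughAssigner implements BucketAssigner {
  @Override
  public long assign(long time) {
    return time;
  }
}

// Hypothetical, heavily simplified stand-in for AbstractManagedStateImpl.
class SimpleManagedState {
  // Option 2: optional assigner; when null, the time argument is used as the time bucket.
  private BucketAssigner assigner;
  // bucketId -> (timeBucket -> (key -> value)); the TreeMap keeps time buckets ordered
  // even though they are event timestamps rather than a 0-based sequence.
  private final Map<Long, TreeMap<Long, Map<String, String>>> buckets = new HashMap<>();

  void setAssigner(BucketAssigner assigner) {
    this.assigner = assigner;
  }

  long timeBucketFor(long time) {
    return assigner == null ? time : assigner.assign(time);
  }

  void put(long bucketId, long time, String key, String value) {
    buckets.computeIfAbsent(bucketId, id -> new TreeMap<>())
        .computeIfAbsent(timeBucketFor(time), tb -> new HashMap<>())
        .put(key, value);
  }

  String get(long bucketId, long time, String key) {
    TreeMap<Long, Map<String, String>> byTime = buckets.get(bucketId);
    if (byTime == null) {
      return null;
    }
    Map<String, String> values = byTime.get(timeBucketFor(time));
    return values == null ? null : values.get(key);
  }
}
```

Either way, the stored timebucket keys are event-window timestamps rather than a 0-based sequence, which is why the sketch keeps them in a sorted map: purging "all timebuckets <= X" remains a simple range operation.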

>>>
Expiring and purging are done very differently and should be based on point 3. Managed State should decide whether to purge a timebucket based on whether an Apex window is committed and whether all event windows belonging to that timebucket are marked "deleted" for that Apex window.

Answer: This is again handled by TimeBucketAssigner, so I don't think much change is needed here. TimeBucketAssigner computes a timeBucket (in your case, this corresponds to the Window time) and checks whether the oldest buckets need to be purged (lines 132-133). It figures out the lowest purgeable timebucket. In endWindow, it informs IncrementalCheckpointManager that it can delete all timebuckets <= lowestPurgeableTimeBucket. However, IncrementalCheckpointManager deletes the data up to that timebucket only when the window in which the purge was requested gets committed. So this will remain the same for you as well.
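The request-then-commit behaviour described in the answer can be sketched as follows. DeferredPurger, requestPurge and committed are hypothetical names standing in for the roles of TimeBucketAssigner and IncrementalCheckpointManager; the real classes also persist data to files and deal with recovery, which this in-memory sketch ignores.

```java
import java.util.NavigableMap;
import java.util.TreeMap;

class DeferredPurger {
  // timeBucket -> data held for that bucket (stand-in for the persisted state).
  final TreeMap<Long, String> timeBuckets = new TreeMap<>();
  // Apex windowId -> lowest purgeable time bucket requested in that window.
  private final TreeMap<Long, Long> purgeRequests = new TreeMap<>();

  // Called from endWindow(): record that buckets <= lowestPurgeableTimeBucket may be
  // purged, but do not delete anything yet.
  void requestPurge(long windowId, long lowestPurgeableTimeBucket) {
    purgeRequests.merge(windowId, lowestPurgeableTimeBucket, Long::max);
  }

  // Called when an Apex window is committed: apply every request made in a window
  // <= committedWindowId, then forget those requests.
  void committed(long committedWindowId) {
    NavigableMap<Long, Long> ready = purgeRequests.headMap(committedWindowId, true);
    if (ready.isEmpty()) {
      return;
    }
    long purgeUpTo = Long.MIN_VALUE;
    for (long bound : ready.values()) {
      purgeUpTo = Math.max(purgeUpTo, bound);
    }
    // headMap returns a view, so clearing it removes the entries from the backing maps.
    timeBuckets.headMap(purgeUpTo, true).clear();
    ready.clear();
  }
}
```

Deferring the deletion this way means that recovering to any checkpoint that is not yet committed still finds the data it needs.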

I think this can also be achieved by creating a special TimeBucketAssigner and overriding a few methods.
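As an illustration of the purge rule from the question (a timebucket is purgeable only once all of its event windows are marked "deleted"), here is a hypothetical sketch of the bookkeeping such a special assigner might do. EventWindowBuckets, addWindow, markDeleted and the fixed bucket span are assumptions for illustration, not methods of the real TimeBucketAssigner. Because everything <= lowestPurgeableTimeBucket is deleted at once, the sketch only advances the bound across a contiguous run of fully deleted buckets.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

class EventWindowBuckets {
  // timeBucket -> start timestamps of event windows in that bucket that are still live.
  private final TreeMap<Long, Set<Long>> liveWindows = new TreeMap<>();
  // Assumed fixed span that groups several event windows into one time bucket.
  private final long bucketSpanMillis;

  EventWindowBuckets(long bucketSpanMillis) {
    this.bucketSpanMillis = bucketSpanMillis;
  }

  long timeBucketOf(long windowStart) {
    return Math.floorDiv(windowStart, bucketSpanMillis) * bucketSpanMillis;
  }

  void addWindow(long windowStart) {
    liveWindows.computeIfAbsent(timeBucketOf(windowStart), tb -> new HashSet<>()).add(windowStart);
  }

  void markDeleted(long windowStart) {
    Set<Long> live = liveWindows.get(timeBucketOf(windowStart));
    if (live != null) {
      live.remove(windowStart);
    }
  }

  // Largest bucket B such that every tracked bucket <= B has no live windows; null if none.
  Long lowestPurgeableTimeBucket() {
    Long result = null;
    for (Map.Entry<Long, Set<Long>> e : liveWindows.entrySet()) {
      if (!e.getValue().isEmpty()) {
        break;
      }
      result = e.getKey();
    }
    return result;
  }
}
```

The value returned here would then go through the same deferred path as before: requested in endWindow and only applied once that Apex window is committed.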




> implement scalable windowed storage
> -----------------------------------
>
>                 Key: APEXMALHAR-2130
>                 URL: https://issues.apache.org/jira/browse/APEXMALHAR-2130
>             Project: Apache Apex Malhar
>          Issue Type: Task
>            Reporter: bright chen
>            Assignee: David Yan
>
> This feature is used for supporting windowing.
> The storage needs to have the following features:
> 1. Spillable key value storage (integrate with APEXMALHAR-2026)
> 2. Upon checkpoint, it saves a snapshot for the entire data set with the checkpointing window id. This should be done incrementally (ManagedState) to avoid wasting space with unchanged data
> 3. When recovering, it takes the recovery window id and restores to that snapshot
> 4. When a window is committed, all windows with a lower ID should be purged from the store.
> 5. It should implement the WindowedStorage and WindowedKeyedStorage interfaces, and because of 2 and 3, we may want to add methods to the WindowedStorage interface so that the implementation of WindowedOperator can notify the storage of checkpointing, recovering and committing of a window.
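Requirements 2 to 4 can be illustrated with a small in-memory model. This is a hypothetical sketch, not the WindowedStorage or WindowedKeyedStorage interface: IncrementalSnapshotStore and the checkpointed, recovered and committed method names are assumptions about what the notification methods in item 5 might look like, and a real implementation would spill to disk through ManagedState instead of holding maps in memory.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

class IncrementalSnapshotStore<K, V> {
  // State as of the last committed window.
  private final Map<K, V> committedBase = new HashMap<>();
  // Checkpointed windowId -> only the entries changed since the previous checkpoint.
  private final TreeMap<Long, Map<K, V>> deltas = new TreeMap<>();
  // Changes made since the last checkpoint.
  private Map<K, V> pending = new HashMap<>();

  void put(K key, V value) {
    pending.put(key, value);
  }

  // Newest change wins: pending, then deltas from newest to oldest, then the committed base.
  V get(K key) {
    if (pending.containsKey(key)) {
      return pending.get(key);
    }
    for (Map<K, V> delta : deltas.descendingMap().values()) {
      if (delta.containsKey(key)) {
        return delta.get(key);
      }
    }
    return committedBase.get(key);
  }

  // Requirement 2: save only what changed since the last checkpoint.
  void checkpointed(long windowId) {
    deltas.put(windowId, pending);
    pending = new HashMap<>();
  }

  // Requirement 3: restore to the snapshot taken at recoveryWindowId.
  void recovered(long recoveryWindowId) {
    deltas.tailMap(recoveryWindowId, false).clear();
    pending = new HashMap<>();
  }

  // Requirement 4: fold deltas up to the committed window into the base (oldest first),
  // then purge them from the store.
  void committed(long committedWindowId) {
    NavigableMap<Long, Map<K, V>> done = deltas.headMap(committedWindowId, true);
    for (Map<K, V> delta : done.values()) {
      committedBase.putAll(delta);
    }
    done.clear();
  }
}
```

Keeping one delta per checkpoint is what makes the snapshot incremental: an unchanged key is stored once in the base, not once per checkpoint.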



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)