Posted to dev@apex.apache.org by davidyan74 <gi...@git.apache.org> on 2016/07/15 17:50:38 UTC

[GitHub] apex-malhar pull request #345: REVIEW ONLY (WindowedOperator): splitting Win...

GitHub user davidyan74 opened a pull request:

    https://github.com/apache/apex-malhar/pull/345

    REVIEW ONLY (WindowedOperator): splitting WindowedStorage in preparation of incorporating Spillable data structures

    

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/davidyan74/apex-malhar windowedSpillable

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/apex-malhar/pull/345.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #345
    
----
commit c2c3f0acfcdf033a0e3044967ab3f8048f719259
Author: Timothy Farkas <ti...@datatorrent.com>
Date:   2016-06-05T00:11:20Z

    - Intermediate commit.

commit 1bee1ed0308470ff35a739ab6f9e94d53debddb8
Author: Timothy Farkas <ti...@datatorrent.com>
Date:   2016-06-13T06:03:21Z

    Intermediate commit

commit 60acf68b96f2145af3d90b410c6b20613347f881
Author: Timothy Farkas <ti...@datatorrent.com>
Date:   2016-06-21T06:58:09Z

    Intermediate commit

commit 5b9ca5e7e1e8139cc5c8ab6dd96b8a60553e38fc
Author: David Yan <da...@datatorrent.com>
Date:   2016-07-13T18:57:57Z

    Merge branch 'APEXMALHAR-2048_pull' of github.com:ilooner/incubator-apex-malhar

commit a49ca7279536bd236161a4aba9f298e075d08db0
Author: David Yan <da...@datatorrent.com>
Date:   2016-07-14T00:04:20Z

    Split WindowedStorage interface into two interfaces for plain and key data

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] apex-malhar pull request #345: APEXMALHAR-2130 REVIEW ONLY (WindowedOperator...

Posted by davidyan74 <gi...@git.apache.org>.
Github user davidyan74 closed the pull request at:

    https://github.com/apache/apex-malhar/pull/345



[GitHub] apex-malhar pull request #345: REVIEW ONLY (WindowedOperator): splitting Win...

Posted by ilooner <gi...@git.apache.org>.
Github user ilooner commented on a diff in the pull request:

    https://github.com/apache/apex-malhar/pull/345#discussion_r71096391
  
    --- Diff: library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedKeyedStorage.java ---
    @@ -0,0 +1,136 @@
    +package org.apache.apex.malhar.lib.window.impl;
    +
    +import java.util.List;
    +import java.util.Map;
    +
    +import javax.annotation.concurrent.Immutable;
    +import javax.validation.constraints.NotNull;
    +
    +import org.apache.apex.malhar.lib.state.spillable.SpillableByteArrayListMultimapImpl;
    +import org.apache.apex.malhar.lib.state.spillable.SpillableByteMapImpl;
    +import org.apache.apex.malhar.lib.state.spillable.managed.ManagedStateSpillableStateStore;
    +import org.apache.apex.malhar.lib.utils.serde.Serde;
    +import org.apache.apex.malhar.lib.window.Window;
    +import org.apache.apex.malhar.lib.window.WindowedStorage;
    +import org.apache.commons.lang3.tuple.ImmutablePair;
    +
    +import com.datatorrent.netlet.util.Slice;
    +
    +/**
    + * Created by david on 7/15/16.
    + */
    +public class SpillableWindowedKeyedStorage<K, V> implements WindowedStorage.WindowedKeyedStorage<K, V>
    +{
    +  @NotNull
    +  private final ManagedStateSpillableStateStore store;
    +
    +  protected final SpillableByteMapImpl<ImmutablePair<Window, K>, V> internValues;
    +  protected final SpillableByteArrayListMultimapImpl<Window, K> internKeys;
    +
    +  private SpillableWindowedKeyedStorage()
    +  {
    +    // for kryo
    +    store = null;
    +    internValues = null;
    +    internKeys = null;
    +  }
    +
    +  public SpillableWindowedKeyedStorage(long bucket, byte[] identifier, Serde<Window, Slice> serdeWindow, Serde<K, Slice> serdeKey, Serde<ImmutablePair<Window, K>, Slice> serdeWindowKey, Serde<V, Slice> serdeValue)
    +  {
    +    store = new ManagedStateSpillableStateStore();
    +    store.getCheckpointManager().setNeedBucketFile(false);
    +    internValues = new SpillableByteMapImpl<>(store, identifier, bucket, serdeWindowKey, serdeValue);
    +    internKeys = new SpillableByteArrayListMultimapImpl<>(store, identifier, bucket, serdeWindow, serdeKey);
    +  }
    +
    +  @Override
    +  public boolean containsWindow(Window window)
    +  {
    +    return internKeys.containsKey(window);
    +  }
    +
    +  @Override
    +  public long size()
    +  {
    +    return internKeys.size();
    +  }
    +
    +  @Override
    +  public void remove(Window window)
    +  {
    +    List<K> keys = internKeys.get(window);
    +    for (K key : keys) {
    +      internValues.remove(new ImmutablePair<>(window, key));
    +    }
    +    internKeys.removeAll(window);
    +  }
    +
    +  @Override
    +  public void migrateWindow(Window fromWindow, Window toWindow)
    +  {
    +    List<K> keys = internKeys.get(fromWindow);
    +    internValues.remove(toWindow);
    +    for (K key : keys) {
    +      internKeys.put(toWindow, key);
    +      ImmutablePair<Window, K> oldKey = new ImmutablePair<>(fromWindow, key);
    +      ImmutablePair<Window, K> newKey = new ImmutablePair<>(toWindow, key);
    +
    +      V value = internValues.get(oldKey);
    +      internValues.remove(oldKey);
    +      internValues.put(newKey, value);
    +    }
    +    internKeys.removeAll(fromWindow);
    +  }
    +
    --- End diff --
    
    The setup would look like this:
    
    ```
    public void setup(Context.OperatorContext context)
    {
      internValues = spillableComplexComponent.newSpillableByteMap(bucket, serdeWindowKey, serdeValue);
      internKeys = spillableComplexComponent.newSpillableByteArrayListMultimap(bucket, serdeWindow, serdeKey);
      spillableComplexComponent.setup(context);
    }
    ```
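The ordering described above can be sketched as follows. This is a minimal, self-contained illustration, not the Malhar API: `LifecycleAware`, `InMemoryMap`, `ComplexComponent` and `KeyedStorageSketch` are hypothetical stand-ins, and the Apex `OperatorContext` is replaced by a plain `Object`. The storage first asks the component to create its structures and then calls the component's `setup()`, which propagates to every structure it created; `teardown()` delegates the same way.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;

// Hypothetical lifecycle interface standing in for Apex's setup/teardown callbacks.
interface LifecycleAware
{
  void setup(Object context);

  void teardown();
}

// Hypothetical spillable map: an ordinary HashMap that records whether it is active.
class InMemoryMap<K, V> extends HashMap<K, V> implements LifecycleAware
{
  boolean active;

  @Override
  public void setup(Object context)
  {
    active = true;
  }

  @Override
  public void teardown()
  {
    active = false;
  }
}

// Hypothetical stand-in for SpillableComplexComponent: creates structures and propagates callbacks to them.
class ComplexComponent implements LifecycleAware
{
  private final List<LifecycleAware> created = new ArrayList<>();

  <K, V> InMemoryMap<K, V> newMap(long bucket)
  {
    InMemoryMap<K, V> map = new InMemoryMap<>(); // a real component would use the bucket; ignored here
    created.add(map);
    return map;
  }

  @Override
  public void setup(Object context)
  {
    for (LifecycleAware structure : created) {
      structure.setup(context);
    }
  }

  @Override
  public void teardown()
  {
    for (LifecycleAware structure : created) {
      structure.teardown();
    }
  }
}

class KeyedStorageSketch
{
  final ComplexComponent component = new ComplexComponent();
  long bucket;
  InMemoryMap<String, Long> internValues;

  void setup(Object context)
  {
    // Create the structures first so that the component's setup() reaches them.
    internValues = component.newMap(bucket);
    component.setup(context);
  }

  void teardown()
  {
    component.teardown();
  }
}
```

Creating the structures before calling `setup()` on the component is what lets a single call propagate to all of them.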



[GitHub] apex-malhar pull request #345: REVIEW ONLY (WindowedOperator): splitting Win...

Posted by ilooner <gi...@git.apache.org>.
Github user ilooner commented on a diff in the pull request:

    https://github.com/apache/apex-malhar/pull/345#discussion_r71095316
  
    --- Diff: library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedKeyedStorage.java ---
    @@ -0,0 +1,136 @@
    +package org.apache.apex.malhar.lib.window.impl;
    +
    +import java.util.List;
    +import java.util.Map;
    +
    +import javax.annotation.concurrent.Immutable;
    +import javax.validation.constraints.NotNull;
    +
    +import org.apache.apex.malhar.lib.state.spillable.SpillableByteArrayListMultimapImpl;
    +import org.apache.apex.malhar.lib.state.spillable.SpillableByteMapImpl;
    +import org.apache.apex.malhar.lib.state.spillable.managed.ManagedStateSpillableStateStore;
    +import org.apache.apex.malhar.lib.utils.serde.Serde;
    +import org.apache.apex.malhar.lib.window.Window;
    +import org.apache.apex.malhar.lib.window.WindowedStorage;
    +import org.apache.commons.lang3.tuple.ImmutablePair;
    +
    +import com.datatorrent.netlet.util.Slice;
    +
    +/**
    + * Created by david on 7/15/16.
    + */
    +public class SpillableWindowedKeyedStorage<K, V> implements WindowedStorage.WindowedKeyedStorage<K, V>
    +{
    +  @NotNull
    +  private final ManagedStateSpillableStateStore store;
    +
    +  protected final SpillableByteMapImpl<ImmutablePair<Window, K>, V> internValues;
    +  protected final SpillableByteArrayListMultimapImpl<Window, K> internKeys;
    +
    +  private SpillableWindowedKeyedStorage()
    +  {
    +    // for kryo
    +    store = null;
    +    internValues = null;
    +    internKeys = null;
    +  }
    +
    +  public SpillableWindowedKeyedStorage(long bucket, byte[] identifier, Serde<Window, Slice> serdeWindow, Serde<K, Slice> serdeKey, Serde<ImmutablePair<Window, K>, Slice> serdeWindowKey, Serde<V, Slice> serdeValue)
    +  {
    +    store = new ManagedStateSpillableStateStore();
    +    store.getCheckpointManager().setNeedBucketFile(false);
    +    internValues = new SpillableByteMapImpl<>(store, identifier, bucket, serdeWindowKey, serdeValue);
    +    internKeys = new SpillableByteArrayListMultimapImpl<>(store, identifier, bucket, serdeWindow, serdeKey);
    +  }
    +
    +  @Override
    +  public boolean containsWindow(Window window)
    +  {
    +    return internKeys.containsKey(window);
    +  }
    +
    +  @Override
    +  public long size()
    +  {
    +    return internKeys.size();
    +  }
    +
    +  @Override
    +  public void remove(Window window)
    +  {
    +    List<K> keys = internKeys.get(window);
    +    for (K key : keys) {
    +      internValues.remove(new ImmutablePair<>(window, key));
    +    }
    +    internKeys.removeAll(window);
    +  }
    +
    +  @Override
    +  public void migrateWindow(Window fromWindow, Window toWindow)
    +  {
    +    List<K> keys = internKeys.get(fromWindow);
    +    internValues.remove(toWindow);
    +    for (K key : keys) {
    +      internKeys.put(toWindow, key);
    +      ImmutablePair<Window, K> oldKey = new ImmutablePair<>(fromWindow, key);
    +      ImmutablePair<Window, K> newKey = new ImmutablePair<>(toWindow, key);
    +
    +      V value = internValues.get(oldKey);
    +      internValues.remove(oldKey);
    +      internValues.put(newKey, value);
    +    }
    +    internKeys.removeAll(fromWindow);
    +  }
    +
    --- End diff --
    
    I would add setup and teardown callbacks here.
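Independently of the lifecycle callbacks, the `migrateWindow` quoted in the diff above calls `internValues.remove(toWindow)` on a map keyed by `ImmutablePair<Window, K>`. A bare `Window` cannot match any `(window, key)` entry, so this likely does not clear the target window's existing values (depending on how the key is serialized it is either a no-op or a runtime error), and `internKeys.put(toWindow, key)` then appends to any keys already stored under `toWindow`. Below is a sketch of one way to get replace semantics, assuming that is the intent: plain `java.util` maps stand in for the spillable map and multimap, and `AbstractMap.SimpleImmutableEntry` stands in for `ImmutablePair`.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// In-memory stand-in for the two structures in the diff:
// internKeys maps a window to its keys, internValues maps a (window, key) pair to a value.
class MigrateSketch<W, K, V>
{
  final Map<W, List<K>> internKeys = new HashMap<>();
  final Map<SimpleImmutableEntry<W, K>, V> internValues = new HashMap<>();

  void put(W window, K key, V value)
  {
    SimpleImmutableEntry<W, K> pair = new SimpleImmutableEntry<>(window, key);
    if (!internValues.containsKey(pair)) {
      internKeys.computeIfAbsent(window, w -> new ArrayList<>()).add(key);
    }
    internValues.put(pair, value);
  }

  void remove(W window)
  {
    List<K> keys = internKeys.remove(window);
    if (keys != null) {
      for (K key : keys) {
        // Values are removed by (window, key) pair, never by the bare window.
        internValues.remove(new SimpleImmutableEntry<>(window, key));
      }
    }
  }

  void migrateWindow(W fromWindow, W toWindow)
  {
    if (fromWindow.equals(toWindow)) {
      return; // nothing to move; clearing toWindow here would lose the data
    }
    remove(toWindow); // replace semantics: drop whatever toWindow held before
    List<K> keys = internKeys.remove(fromWindow);
    if (keys == null) {
      return;
    }
    for (K key : keys) {
      V value = internValues.remove(new SimpleImmutableEntry<>(fromWindow, key));
      internValues.put(new SimpleImmutableEntry<>(toWindow, key), value);
    }
    internKeys.put(toWindow, keys);
  }
}
```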



[GitHub] apex-malhar pull request #345: APEXMALHAR-2130 REVIEW ONLY (WindowedOperator...

Posted by davidyan74 <gi...@git.apache.org>.
GitHub user davidyan74 reopened a pull request:

    https://github.com/apache/apex-malhar/pull/345

    APEXMALHAR-2130 REVIEW ONLY (WindowedOperator): incorporating Spillable data structures

    

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/davidyan74/apex-malhar windowedSpillable

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/apex-malhar/pull/345.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #345
    
----
commit c2c3f0acfcdf033a0e3044967ab3f8048f719259
Author: Timothy Farkas <ti...@datatorrent.com>
Date:   2016-06-05T00:11:20Z

    - Intermediate commit.

commit 1bee1ed0308470ff35a739ab6f9e94d53debddb8
Author: Timothy Farkas <ti...@datatorrent.com>
Date:   2016-06-13T06:03:21Z

    Intermediate commit

commit 60acf68b96f2145af3d90b410c6b20613347f881
Author: Timothy Farkas <ti...@datatorrent.com>
Date:   2016-06-21T06:58:09Z

    Intermediate commit

commit 5b9ca5e7e1e8139cc5c8ab6dd96b8a60553e38fc
Author: David Yan <da...@datatorrent.com>
Date:   2016-07-13T18:57:57Z

    Merge branch 'APEXMALHAR-2048_pull' of github.com:ilooner/incubator-apex-malhar

commit b72ee18e6f7cf0a9d30b195439912d96acafb3b4
Author: Timothy Farkas <ti...@datatorrent.com>
Date:   2016-07-17T21:32:34Z

    Added implementations of SpillableList, SpillableMap, and SpillableArrayListMultimap

commit 184653a23f662f78e7e6a7e1d53ffb0efbdb7127
Author: Timothy Farkas <ti...@datatorrent.com>
Date:   2016-07-18T01:56:41Z

    Added SpillableComplexComponentImpl

commit 9f17b4ba9233e3f46746505941a378236193f719
Author: Timothy Farkas <ti...@datatorrent.com>
Date:   2016-07-18T03:29:07Z

    Added propagating callbacks to store

commit 9e637a899e6aedb4ca6495a676654bb636616267
Author: devtagare <de...@gmail.com>
Date:   2016-05-18T22:25:56Z

    APEXMALHAR-2066 JdbcPolling,idempotent,partitionable

commit 10fe7a14c7294c54c5d18e4d0e94882778266ac6
Author: sandeshh <sa...@gmail.com>
Date:   2016-05-25T15:56:56Z

    Kafka 0.9.0 output operators and unit tests.
    
    1. Abstract Base class
    2. Kafka Output operator
    3. Exactly Once output operator
         Key in the Kafka message is used by the operator to track the tuples written by it.

commit 8df0de73f58c69ac457e5a28d51a7b56b5549859
Author: Chaitanya <ch...@datatorrent.com>
Date:   2016-07-13T10:16:13Z

    APEXMALHAR-2019 Implemented S3 Input Module

commit 2ed0a102d434ace34978c6e940e04c998133fd1e
Author: Timothy Farkas <ti...@datatorrent.com>
Date:   2016-07-18T01:56:41Z

    Added SpillableComplexComponentImpl

commit c60e76eee8ae5cf034d4d9984b840d3219f52594
Author: Timothy Farkas <ti...@datatorrent.com>
Date:   2016-07-18T03:29:07Z

    Added propagating callbacks to store

commit 452db2feee7cc6e186df3faf384147b41eef40ea
Author: David Yan <da...@datatorrent.com>
Date:   2016-07-14T00:04:20Z

    Split WindowedStorage interface into two interfaces for plain and key data

commit be96f667238cce47957324eac9979ebafc013fc6
Author: David Yan <da...@datatorrent.com>
Date:   2016-07-15T21:40:59Z

    check null for retractionStorage

commit b7471a5df5e883b23b5a6051b974d3521ce46151
Author: David Yan <da...@datatorrent.com>
Date:   2016-07-15T21:48:13Z

    Removed unused imports

commit 361d0e152f4709dd8fda864ce94727161807a98e
Author: David Yan <da...@datatorrent.com>
Date:   2016-07-16T01:00:20Z

    added first draft implementation for spillable data structures

commit ad65865ed9e5ba25d44aacaf76372e2338ef6377
Author: David Yan <da...@datatorrent.com>
Date:   2016-07-18T21:04:24Z

    Implemented some of Tim's suggestion

commit 32c2660409b3c85872002aba6b84c5c5e748f203
Author: David Yan <da...@datatorrent.com>
Date:   2016-07-20T01:17:43Z

    moved storage initialization to setup

commit 955f2d6c66be26f8f205a743eb9c49d4e8d2bdd6
Author: David Yan <da...@datatorrent.com>
Date:   2016-07-20T17:50:05Z

    removed identifier as per tim's suggestion

commit ec4e7508e2fd70030ecab256bba32be4cad8ad3a
Author: David Yan <da...@datatorrent.com>
Date:   2016-07-21T00:05:51Z

    choose a bucket automatically and added entrySet implementation for SpillableWindowedKeyedStorage

commit fe41f0c20235aac3ca57facc2491ecfba11d20a7
Author: Timothy Farkas <ti...@datatorrent.com>
Date:   2016-07-21T06:38:48Z

    Added checkpoint callbacks to spillable complex components
    Added some half completed tests

commit dfc84a7847b4e71ec12ca8e8e1e0127732d880c0
Author: David Yan <da...@datatorrent.com>
Date:   2016-07-21T18:07:47Z

    intermediate commit

commit fe209fb72e169ae9605c826e61adfabda670c74d
Author: David Yan <da...@datatorrent.com>
Date:   2016-07-21T18:08:15Z

    Merge branch 'APEXMALHAR-2048_pull' of github.com:ilooner/incubator-apex-malhar into windowedSpillable

commit 11fb1ddd24705ed215ecbe4b9da72a39a3e8a9e1
Author: David Yan <da...@datatorrent.com>
Date:   2016-07-21T19:59:24Z

    intermediate commit

commit e5cacbbc9e1ae18f5cf0938f13377f75a3a99cd2
Author: Timothy Farkas <ti...@datatorrent.com>
Date:   2016-07-24T04:09:54Z

    Finished unit test for SpillableArrayListMultimap

commit dc258b8900688264f349307737b59b096dbc3d2b
Author: Timothy Farkas <ti...@datatorrent.com>
Date:   2016-07-24T05:19:37Z

    Added unit test which uses managed state

commit ed9924b810a76c39404701da81f4753ab68af5a5
Author: Timothy Farkas <ti...@datatorrent.com>
Date:   2016-07-24T06:55:18Z

    Finished adding managed state tests for SpillableByteMap

commit 43da17d9633dc2405b596b25697b6b4b0baef69f
Author: Timothy Farkas <ti...@datatorrent.com>
Date:   2016-07-24T16:31:53Z

    Added ManagedStateTests For SpillableArrayList

commit 1343ccf4ccc5099d7abcf7f99ab2ed648baeef08
Author: Timothy Farkas <ti...@datatorrent.com>
Date:   2016-07-24T16:49:00Z

    Added managed state tests for SpillableArrayListMultimap

commit 57c4e5e3c3e019613f31753be5052c32ba762e53
Author: Timothy Farkas <ti...@datatorrent.com>
Date:   2016-07-24T16:57:35Z

    Added ManagedStateTest for SpillableComplexComponent

----



[GitHub] apex-malhar pull request #345: REVIEW ONLY (WindowedOperator): splitting Win...

Posted by ilooner <gi...@git.apache.org>.
Github user ilooner commented on a diff in the pull request:

    https://github.com/apache/apex-malhar/pull/345#discussion_r71095282
  
    --- Diff: library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedKeyedStorage.java ---
    @@ -0,0 +1,136 @@
    +package org.apache.apex.malhar.lib.window.impl;
    +
    +import java.util.List;
    +import java.util.Map;
    +
    +import javax.annotation.concurrent.Immutable;
    +import javax.validation.constraints.NotNull;
    +
    +import org.apache.apex.malhar.lib.state.spillable.SpillableByteArrayListMultimapImpl;
    +import org.apache.apex.malhar.lib.state.spillable.SpillableByteMapImpl;
    +import org.apache.apex.malhar.lib.state.spillable.managed.ManagedStateSpillableStateStore;
    +import org.apache.apex.malhar.lib.utils.serde.Serde;
    +import org.apache.apex.malhar.lib.window.Window;
    +import org.apache.apex.malhar.lib.window.WindowedStorage;
    +import org.apache.commons.lang3.tuple.ImmutablePair;
    +
    +import com.datatorrent.netlet.util.Slice;
    +
    +/**
    + * Created by david on 7/15/16.
    + */
    +public class SpillableWindowedKeyedStorage<K, V> implements WindowedStorage.WindowedKeyedStorage<K, V>
    +{
    +  @NotNull
    +  private final ManagedStateSpillableStateStore store;
    --- End diff --
    
    The SpillableComplexComponent should be set as a property so that different stores can be plugged in.
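A minimal sketch of that idea, assuming a hypothetical `StateComponent` interface in place of `SpillableComplexComponent` (its single factory method is illustrative, not the real signature). The storage holds the component in an interface-typed property with a bean-style setter, so an application could swap backends without touching the storage class. `HeapComponent` and `CountingComponent` are likewise hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical stand-in for the component interface; only one factory method is modeled.
interface StateComponent
{
  <K, V> Map<K, V> newMap(long bucket);
}

// Default backend: plain heap memory.
class HeapComponent implements StateComponent
{
  @Override
  public <K, V> Map<K, V> newMap(long bucket)
  {
    return new HashMap<>();
  }
}

// Alternative backend; here it only counts how many maps it created.
class CountingComponent implements StateComponent
{
  int created;

  @Override
  public <K, V> Map<K, V> newMap(long bucket)
  {
    created++;
    return new HashMap<>();
  }
}

class PluggableStorage<K, V>
{
  // Interface-typed property: no particular backend is hardcoded.
  private StateComponent component = new HeapComponent();
  private long bucket;
  Map<K, V> values;

  public StateComponent getComponent()
  {
    return component;
  }

  public void setComponent(StateComponent component)
  {
    this.component = Objects.requireNonNull(component, "component");
  }

  public void setup()
  {
    values = component.newMap(bucket);
  }
}
```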



[GitHub] apex-malhar pull request #345: REVIEW ONLY (WindowedOperator): splitting Win...

Posted by davidyan74 <gi...@git.apache.org>.
Github user davidyan74 commented on a diff in the pull request:

    https://github.com/apache/apex-malhar/pull/345#discussion_r71225716
  
    --- Diff: library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedKeyedStorage.java ---
    @@ -0,0 +1,136 @@
    +package org.apache.apex.malhar.lib.window.impl;
    +
    +import java.util.List;
    +import java.util.Map;
    +
    +import javax.annotation.concurrent.Immutable;
    +import javax.validation.constraints.NotNull;
    +
    +import org.apache.apex.malhar.lib.state.spillable.SpillableByteArrayListMultimapImpl;
    +import org.apache.apex.malhar.lib.state.spillable.SpillableByteMapImpl;
    +import org.apache.apex.malhar.lib.state.spillable.managed.ManagedStateSpillableStateStore;
    +import org.apache.apex.malhar.lib.utils.serde.Serde;
    +import org.apache.apex.malhar.lib.window.Window;
    +import org.apache.apex.malhar.lib.window.WindowedStorage;
    +import org.apache.commons.lang3.tuple.ImmutablePair;
    +
    +import com.datatorrent.netlet.util.Slice;
    +
    +/**
    + * Created by david on 7/15/16.
    + */
    +public class SpillableWindowedKeyedStorage<K, V> implements WindowedStorage.WindowedKeyedStorage<K, V>
    +{
    +  @NotNull
    +  private final ManagedStateSpillableStateStore store;
    +
    +  protected final SpillableByteMapImpl<ImmutablePair<Window, K>, V> internValues;
    +  protected final SpillableByteArrayListMultimapImpl<Window, K> internKeys;
    +
    +  private SpillableWindowedKeyedStorage()
    +  {
    +    // for kryo
    +    store = null;
    +    internValues = null;
    +    internKeys = null;
    +  }
    +
    +  public SpillableWindowedKeyedStorage(long bucket, byte[] identifier, Serde<Window, Slice> serdeWindow, Serde<K, Slice> serdeKey, Serde<ImmutablePair<Window, K>, Slice> serdeWindowKey, Serde<V, Slice> serdeValue)
    +  {
    +    store = new ManagedStateSpillableStateStore();
    +    store.getCheckpointManager().setNeedBucketFile(false);
    +    internValues = new SpillableByteMapImpl<>(store, identifier, bucket, serdeWindowKey, serdeValue);
    +    internKeys = new SpillableByteArrayListMultimapImpl<>(store, identifier, bucket, serdeWindow, serdeKey);
    +  }
    +
    +  @Override
    +  public boolean containsWindow(Window window)
    +  {
    +    return internKeys.containsKey(window);
    +  }
    +
    +  @Override
    +  public long size()
    +  {
    +    return internKeys.size();
    +  }
    +
    +  @Override
    +  public void remove(Window window)
    +  {
    +    List<K> keys = internKeys.get(window);
    +    for (K key : keys) {
    +      internValues.remove(new ImmutablePair<>(window, key));
    +    }
    +    internKeys.removeAll(window);
    +  }
    +
    +  @Override
    +  public void migrateWindow(Window fromWindow, Window toWindow)
    +  {
    +    List<K> keys = internKeys.get(fromWindow);
    +    internValues.remove(toWindow);
    +    for (K key : keys) {
    +      internKeys.put(toWindow, key);
    +      ImmutablePair<Window, K> oldKey = new ImmutablePair<>(fromWindow, key);
    +      ImmutablePair<Window, K> newKey = new ImmutablePair<>(toWindow, key);
    +
    +      V value = internValues.get(oldKey);
    +      internValues.remove(oldKey);
    +      internValues.put(newKey, value);
    +    }
    +    internKeys.removeAll(fromWindow);
    +  }
    +
    +  @Override
    +  public void beginApexWindow(long windowId)
    +  {
    +    store.beginWindow(windowId);
    --- End diff --
    
    Will make this change, but what about the checkpoint listener callbacks? Should they still be called on the store, as I'm doing now?
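The commit list of the reopened pull request includes "Added checkpoint callbacks to spillable complex components" (fe41f0c), which suggests the checkpoint callbacks also moved from the store to the component. A sketch of that delegation follows, with hypothetical names throughout: `CheckpointCallbacks` mirrors the `checkpointed(long)` and `committed(long)` methods of Apex's `Operator.CheckpointListener`, and the component's bookkeeping (forgetting checkpoints once a window at or after them is committed) is illustrative only.

```java
import java.util.TreeMap;

// Hypothetical interface mirroring the callbacks of Apex's Operator.CheckpointListener.
interface CheckpointCallbacks
{
  void checkpointed(long windowId);

  void committed(long windowId);
}

// Hypothetical component that tracks checkpointed windows and drops those covered by a commit.
class TrackingComponent implements CheckpointCallbacks
{
  final TreeMap<Long, String> pendingCheckpoints = new TreeMap<>();

  @Override
  public void checkpointed(long windowId)
  {
    pendingCheckpoints.put(windowId, "checkpoint-" + windowId);
  }

  @Override
  public void committed(long windowId)
  {
    // Illustrative only: checkpoints up to and including the committed window are no longer pending.
    pendingCheckpoints.headMap(windowId, true).clear();
  }
}

// The storage forwards checkpoint callbacks to its component rather than to a concrete store.
class CheckpointingStorage implements CheckpointCallbacks
{
  final TrackingComponent component = new TrackingComponent();

  @Override
  public void checkpointed(long windowId)
  {
    component.checkpointed(windowId);
  }

  @Override
  public void committed(long windowId)
  {
    component.committed(windowId);
  }
}
```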



[GitHub] apex-malhar pull request #345: REVIEW ONLY (WindowedOperator): splitting Win...

Posted by ilooner <gi...@git.apache.org>.
Github user ilooner commented on a diff in the pull request:

    https://github.com/apache/apex-malhar/pull/345#discussion_r71096140
  
    --- Diff: library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedKeyedStorage.java ---
    @@ -0,0 +1,136 @@
    +package org.apache.apex.malhar.lib.window.impl;
    +
    +import java.util.List;
    +import java.util.Map;
    +
    +import javax.annotation.concurrent.Immutable;
    +import javax.validation.constraints.NotNull;
    +
    +import org.apache.apex.malhar.lib.state.spillable.SpillableByteArrayListMultimapImpl;
    +import org.apache.apex.malhar.lib.state.spillable.SpillableByteMapImpl;
    +import org.apache.apex.malhar.lib.state.spillable.managed.ManagedStateSpillableStateStore;
    +import org.apache.apex.malhar.lib.utils.serde.Serde;
    +import org.apache.apex.malhar.lib.window.Window;
    +import org.apache.apex.malhar.lib.window.WindowedStorage;
    +import org.apache.commons.lang3.tuple.ImmutablePair;
    +
    +import com.datatorrent.netlet.util.Slice;
    +
    +/**
    + * Created by david on 7/15/16.
    + */
    +public class SpillableWindowedKeyedStorage<K, V> implements WindowedStorage.WindowedKeyedStorage<K, V>
    +{
    +  @NotNull
    +  private final ManagedStateSpillableStateStore store;
    +
    +  protected final SpillableByteMapImpl<ImmutablePair<Window, K>, V> internValues;
    +  protected final SpillableByteArrayListMultimapImpl<Window, K> internKeys;
    --- End diff --
    
    This field should be declared as the SpillableByteArrayListMultimap interface.
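A sketch of what declaring the field by interface buys. `WindowKeyMultimap` is a hypothetical stand-in for `Spillable.SpillableByteArrayListMultimap` that models only the four calls the quoted storage makes (`get`, `put`, `removeAll`, `containsKey`); its signatures are assumptions, not the Malhar ones. Because `internKeys` is declared with the interface type, the storage code compiles against any implementation, including the `HashMap`-backed one used here.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical interface exposing only the multimap operations the storage uses.
interface WindowKeyMultimap<W, K>
{
  List<K> get(W window);

  boolean put(W window, K key);

  List<K> removeAll(W window);

  boolean containsKey(W window);
}

// One possible implementation, backed by a HashMap of ArrayLists.
class HashWindowKeyMultimap<W, K> implements WindowKeyMultimap<W, K>
{
  private final Map<W, List<K>> map = new HashMap<>();

  @Override
  public List<K> get(W window)
  {
    List<K> keys = map.get(window);
    return keys == null ? Collections.<K>emptyList() : keys;
  }

  @Override
  public boolean put(W window, K key)
  {
    return map.computeIfAbsent(window, w -> new ArrayList<>()).add(key);
  }

  @Override
  public List<K> removeAll(W window)
  {
    List<K> removed = map.remove(window);
    return removed == null ? Collections.<K>emptyList() : removed;
  }

  @Override
  public boolean containsKey(W window)
  {
    return map.containsKey(window);
  }
}

class InterfaceTypedStorage<W, K>
{
  // Declared by interface; the concrete class is chosen by whoever constructs the storage.
  protected final WindowKeyMultimap<W, K> internKeys;

  InterfaceTypedStorage(WindowKeyMultimap<W, K> internKeys)
  {
    this.internKeys = internKeys;
  }

  boolean containsWindow(W window)
  {
    return internKeys.containsKey(window);
  }

  void remove(W window)
  {
    internKeys.removeAll(window);
  }
}
```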



[GitHub] apex-malhar pull request #345: REVIEW ONLY (WindowedOperator): splitting Win...

Posted by ilooner <gi...@git.apache.org>.
Github user ilooner commented on a diff in the pull request:

    https://github.com/apache/apex-malhar/pull/345#discussion_r71095257
  
    --- Diff: library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedKeyedStorage.java ---
    @@ -0,0 +1,136 @@
    +package org.apache.apex.malhar.lib.window.impl;
    +
    +import java.util.List;
    +import java.util.Map;
    +
    +import javax.annotation.concurrent.Immutable;
    +import javax.validation.constraints.NotNull;
    +
    +import org.apache.apex.malhar.lib.state.spillable.SpillableByteArrayListMultimapImpl;
    +import org.apache.apex.malhar.lib.state.spillable.SpillableByteMapImpl;
    +import org.apache.apex.malhar.lib.state.spillable.managed.ManagedStateSpillableStateStore;
    +import org.apache.apex.malhar.lib.utils.serde.Serde;
    +import org.apache.apex.malhar.lib.window.Window;
    +import org.apache.apex.malhar.lib.window.WindowedStorage;
    +import org.apache.commons.lang3.tuple.ImmutablePair;
    +
    +import com.datatorrent.netlet.util.Slice;
    +
    +/**
    + * Created by david on 7/15/16.
    + */
    +public class SpillableWindowedKeyedStorage<K, V> implements WindowedStorage.WindowedKeyedStorage<K, V>
    +{
    +  @NotNull
    +  private final ManagedStateSpillableStateStore store;
    --- End diff --
    
    The field here should be of type SpillableComplexComponent; that way ManagedState isn't hardcoded.



[GitHub] apex-malhar pull request #345: REVIEW ONLY (WindowedOperator): splitting Win...

Posted by ilooner <gi...@git.apache.org>.
Github user ilooner commented on a diff in the pull request:

    https://github.com/apache/apex-malhar/pull/345#discussion_r71095327
  
    --- Diff: library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedKeyedStorage.java ---
    @@ -0,0 +1,136 @@
    +package org.apache.apex.malhar.lib.window.impl;
    +
    +import java.util.List;
    +import java.util.Map;
    +
    +import javax.annotation.concurrent.Immutable;
    +import javax.validation.constraints.NotNull;
    +
    +import org.apache.apex.malhar.lib.state.spillable.SpillableByteArrayListMultimapImpl;
    +import org.apache.apex.malhar.lib.state.spillable.SpillableByteMapImpl;
    +import org.apache.apex.malhar.lib.state.spillable.managed.ManagedStateSpillableStateStore;
    +import org.apache.apex.malhar.lib.utils.serde.Serde;
    +import org.apache.apex.malhar.lib.window.Window;
    +import org.apache.apex.malhar.lib.window.WindowedStorage;
    +import org.apache.commons.lang3.tuple.ImmutablePair;
    +
    +import com.datatorrent.netlet.util.Slice;
    +
    +/**
    + * Created by david on 7/15/16.
    + */
    +public class SpillableWindowedKeyedStorage<K, V> implements WindowedStorage.WindowedKeyedStorage<K, V>
    +{
    +  @NotNull
    +  private final ManagedStateSpillableStateStore store;
    +
    +  protected final SpillableByteMapImpl<ImmutablePair<Window, K>, V> internValues;
    +  protected final SpillableByteArrayListMultimapImpl<Window, K> internKeys;
    +
    +  private SpillableWindowedKeyedStorage()
    +  {
    +    // for kryo
    +    store = null;
    +    internValues = null;
    +    internKeys = null;
    +  }
    +
    +  public SpillableWindowedKeyedStorage(long bucket, byte[] identifier, Serde<Window, Slice> serdeWindow, Serde<K, Slice> serdeKey, Serde<ImmutablePair<Window, K>, Slice> serdeWindowKey, Serde<V, Slice> serdeValue)
    +  {
    +    store = new ManagedStateSpillableStateStore();
    +    store.getCheckpointManager().setNeedBucketFile(false);
    +    internValues = new SpillableByteMapImpl<>(store, identifier, bucket, serdeWindowKey, serdeValue);
    +    internKeys = new SpillableByteArrayListMultimapImpl<>(store, identifier, bucket, serdeWindow, serdeKey);
    +  }
    +
    +  @Override
    +  public boolean containsWindow(Window window)
    +  {
    +    return internKeys.containsKey(window);
    +  }
    +
    +  @Override
    +  public long size()
    +  {
    +    return internKeys.size();
    +  }
    +
    +  @Override
    +  public void remove(Window window)
    +  {
    +    List<K> keys = internKeys.get(window);
    +    for (K key : keys) {
    +      internValues.remove(new ImmutablePair<>(window, key));
    +    }
    +    internKeys.removeAll(window);
    +  }
    +
    +  @Override
    +  public void migrateWindow(Window fromWindow, Window toWindow)
    +  {
    +    List<K> keys = internKeys.get(fromWindow);
    +    internValues.remove(toWindow);
    +    for (K key : keys) {
    +      internKeys.put(toWindow, key);
    +      ImmutablePair<Window, K> oldKey = new ImmutablePair<>(fromWindow, key);
    +      ImmutablePair<Window, K> newKey = new ImmutablePair<>(toWindow, key);
    +
    +      V value = internValues.get(oldKey);
    +      internValues.remove(oldKey);
    +      internValues.put(newKey, value);
    +    }
    +    internKeys.removeAll(fromWindow);
    +  }
    +
    +  @Override
    +  public void beginApexWindow(long windowId)
    +  {
    +    store.beginWindow(windowId);
    --- End diff --
    
    This should call spillableComplexComponent.beginWindow(windowId) instead.
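A sketch of that delegation, using hypothetical stand-ins (`WindowCallbacks`, `WindowTrackingStructure`, `WindowedComponent`, `DelegatingStorage`). The matching `endApexWindow` is an assumption added for symmetry; the comment itself only names `beginWindow`. The storage forwards the Apex window boundaries to the component, which forwards them to every structure it created, so the storage never references a concrete store.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical per-window callbacks shared by the component and its structures.
interface WindowCallbacks
{
  void beginWindow(long windowId);

  void endWindow();
}

// Hypothetical structure that remembers the current Apex window (-1 outside a window).
class WindowTrackingStructure implements WindowCallbacks
{
  long currentWindowId = -1;

  @Override
  public void beginWindow(long windowId)
  {
    currentWindowId = windowId;
  }

  @Override
  public void endWindow()
  {
    currentWindowId = -1;
  }
}

// Hypothetical component: creates structures and forwards window callbacks to all of them.
class WindowedComponent implements WindowCallbacks
{
  private final List<WindowCallbacks> structures = new ArrayList<>();

  WindowTrackingStructure newStructure()
  {
    WindowTrackingStructure structure = new WindowTrackingStructure();
    structures.add(structure);
    return structure;
  }

  @Override
  public void beginWindow(long windowId)
  {
    for (WindowCallbacks structure : structures) {
      structure.beginWindow(windowId);
    }
  }

  @Override
  public void endWindow()
  {
    for (WindowCallbacks structure : structures) {
      structure.endWindow();
    }
  }
}

class DelegatingStorage
{
  final WindowedComponent component = new WindowedComponent();
  final WindowTrackingStructure internValues = component.newStructure();
  final WindowTrackingStructure internKeys = component.newStructure();

  void beginApexWindow(long windowId)
  {
    component.beginWindow(windowId); // instead of store.beginWindow(windowId)
  }

  void endApexWindow()
  {
    component.endWindow();
  }
}
```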



[GitHub] apex-malhar pull request #345: REVIEW ONLY (WindowedOperator): splitting Win...

Posted by ilooner <gi...@git.apache.org>.
Github user ilooner commented on a diff in the pull request:

    https://github.com/apache/apex-malhar/pull/345#discussion_r71473044
  
    --- Diff: library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedKeyedStorage.java ---
    @@ -0,0 +1,198 @@
    +package org.apache.apex.malhar.lib.window.impl;
    +
    +import java.util.List;
    +import java.util.Map;
    +
    +import javax.validation.constraints.NotNull;
    +
    +import org.apache.apex.malhar.lib.state.spillable.Spillable;
    +import org.apache.apex.malhar.lib.state.spillable.SpillableComplexComponentImpl;
    +import org.apache.apex.malhar.lib.state.spillable.SpillableStateStore;
    +import org.apache.apex.malhar.lib.state.spillable.managed.ManagedStateSpillableStateStore;
    +import org.apache.apex.malhar.lib.utils.serde.Serde;
    +import org.apache.apex.malhar.lib.window.Window;
    +import org.apache.apex.malhar.lib.window.WindowedStorage;
    +import org.apache.commons.lang3.tuple.ImmutablePair;
    +
    +import com.datatorrent.api.Context;
    +import com.datatorrent.netlet.util.Slice;
    +
    +/**
    + * Created by david on 7/15/16.
    + */
    +public class SpillableWindowedKeyedStorage<K, V> implements WindowedStorage.WindowedKeyedStorage<K, V>
    +{
    +  @NotNull
    +  private SpillableStateStore store;
    +  private SpillableComplexComponentImpl sccImpl;
    +  private long bucket;
    +  @NotNull
    +  private String identifier;
    +  @NotNull
    +  private Serde<Window, Slice> windowSerde;
    +  @NotNull
    +  private Serde<ImmutablePair<Window, K>, Slice> windowKeyPairSerde;
    +  @NotNull
    +  private Serde<K, Slice> keySerde;
    +  @NotNull
    +  private Serde<V, Slice> valueSerde;
    +
    +  protected Spillable.SpillableByteMap<ImmutablePair<Window, K>, V> internValues;
    +  protected Spillable.SpillableByteArrayListMultimap<Window, K> internKeys;
    +
    +  public SpillableWindowedKeyedStorage()
    +  {
    +  }
    +
    +  public SpillableWindowedKeyedStorage(long bucket, String identifier,
    +      Serde<Window, Slice> windowSerde, Serde<ImmutablePair<Window, K>, Slice> windowKeyPairSerde, Serde<K, Slice> keySerde, Serde<V, Slice> valueSerde)
    +  {
    +    this.bucket = bucket;
    +    this.identifier = identifier;
    +    this.windowSerde = windowSerde;
    +    this.windowKeyPairSerde = windowKeyPairSerde;
    +    this.keySerde = keySerde;
    +    this.valueSerde = valueSerde;
    +  }
    +
    +  public void setStore(SpillableStateStore store)
    +  {
    +    this.store = store;
    +  }
    +
    +  public void setBucket(long bucket)
    +  {
    +    this.bucket = bucket;
    +  }
    +
    +  public void setIdentifier(String identifier)
    +  {
    +    this.identifier = identifier;
    +  }
    +
    +  public void setWindowSerde(Serde<Window, Slice> windowSerde)
    +  {
    +    this.windowSerde = windowSerde;
    +  }
    +
    +  public void setWindowKeyPairSerde(Serde<ImmutablePair<Window, K>, Slice> windowKeyPairSerde)
    +  {
    +    this.windowKeyPairSerde = windowKeyPairSerde;
    +  }
    +
    +  public void setValueSerde(Serde<V, Slice> valueSerde)
    +  {
    +    this.valueSerde = valueSerde;
    +  }
    +
    +  @Override
    +  public boolean containsWindow(Window window)
    +  {
    +    return internKeys.containsKey(window);
    +  }
    +
    +  @Override
    +  public long size()
    +  {
    +    return internKeys.size();
    +  }
    +
    +  @Override
    +  public void remove(Window window)
    +  {
    +    List<K> keys = internKeys.get(window);
    +    for (K key : keys) {
    +      internValues.remove(new ImmutablePair<>(window, key));
    +    }
    +    internKeys.removeAll(window);
    +  }
    +
    +  @Override
    +  public void migrateWindow(Window fromWindow, Window toWindow)
    +  {
    +    List<K> keys = internKeys.get(fromWindow);
    +    internValues.remove(toWindow);
    +    for (K key : keys) {
    +      internKeys.put(toWindow, key);
    +      ImmutablePair<Window, K> oldKey = new ImmutablePair<>(fromWindow, key);
    +      ImmutablePair<Window, K> newKey = new ImmutablePair<>(toWindow, key);
    +
    +      V value = internValues.get(oldKey);
    +      internValues.remove(oldKey);
    +      internValues.put(newKey, value);
    +    }
    +    internKeys.removeAll(fromWindow);
    +  }
    +
    +
    +  @Override
    +  public void setup(Context.OperatorContext context)
    +  {
    +    if (store == null) {
    +      // provide a default managed state store
    +      store = new ManagedStateSpillableStateStore();
    +    }
    +    sccImpl = new SpillableComplexComponentImpl(store);
    +    internValues = sccImpl.newSpillableByteMap((identifier + "#values").getBytes(), bucket, windowKeyPairSerde, valueSerde);
    --- End diff --
    
    There is currently a constraint that all identifiers must have the same length in bytes. You also want identifiers to be small, since every key stored in managed state is prefixed with its structure's identifier. In this case you can omit the identifier: if you don't specify one, a single-byte identifier is generated automatically for each data structure.
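The equal-length rule can be illustrated with plain byte arrays. This sketch assumes a simplification: each structure's managed-state key is just `identifier + serializedKey`. `PrefixedKeys` and `IdentifierAllocator` are hypothetical names, not the Malhar API. With variable-length prefixes such as `(identifier + "#values").getBytes()`, two different structures can produce the same stored key. Equal-length prefixes cannot collide, and one-byte prefixes add only one byte per key:

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

final class PrefixedKeys {
  private PrefixedKeys() { }

  // Hypothetical model of how a structure namespaces its keys in a shared store:
  // the stored key is the identifier bytes followed by the serialized key bytes.
  static byte[] prefixed(byte[] identifier, byte[] key) {
    byte[] out = Arrays.copyOf(identifier, identifier.length + key.length);
    System.arraycopy(key, 0, out, identifier.length, key.length);
    return out;
  }

  static byte[] utf8(String s) {
    return s.getBytes(StandardCharsets.UTF_8);
  }

  // Hypothetical allocator handing out distinct one-byte identifiers (at most 256 of them).
  static final class IdentifierAllocator {
    private int next = 0;

    byte[] nextIdentifier() {
      if (next > 255) {
        throw new IllegalStateException("out of one-byte identifiers");
      }
      return new byte[] {(byte) next++};
    }
  }
}
```

For example, prefix `a` with key `bc` and prefix `ab` with key `c` both become the bytes `abc`. Two one-byte identifiers always differ in the first byte, so their keys stay apart.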


---

[GitHub] apex-malhar pull request #345: REVIEW ONLY (WindowedOperator): splitting Win...

Posted by davidyan74 <gi...@git.apache.org>.
Github user davidyan74 commented on a diff in the pull request:

    https://github.com/apache/apex-malhar/pull/345#discussion_r71225979
  
    --- Diff: library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedKeyedStorage.java ---
    @@ -0,0 +1,136 @@
    +package org.apache.apex.malhar.lib.window.impl;
    +
    +import java.util.List;
    +import java.util.Map;
    +
    +import javax.annotation.concurrent.Immutable;
    +import javax.validation.constraints.NotNull;
    +
    +import org.apache.apex.malhar.lib.state.spillable.SpillableByteArrayListMultimapImpl;
    +import org.apache.apex.malhar.lib.state.spillable.SpillableByteMapImpl;
    +import org.apache.apex.malhar.lib.state.spillable.managed.ManagedStateSpillableStateStore;
    +import org.apache.apex.malhar.lib.utils.serde.Serde;
    +import org.apache.apex.malhar.lib.window.Window;
    +import org.apache.apex.malhar.lib.window.WindowedStorage;
    +import org.apache.commons.lang3.tuple.ImmutablePair;
    +
    +import com.datatorrent.netlet.util.Slice;
    +
    +/**
    + * Created by david on 7/15/16.
    + */
    +public class SpillableWindowedKeyedStorage<K, V> implements WindowedStorage.WindowedKeyedStorage<K, V>
    +{
    +  @NotNull
    +  private final ManagedStateSpillableStateStore store;
    +
    +  protected final SpillableByteMapImpl<ImmutablePair<Window, K>, V> internValues;
    +  protected final SpillableByteArrayListMultimapImpl<Window, K> internKeys;
    +
    +  private SpillableWindowedKeyedStorage()
    +  {
    +    // for kryo
    +    store = null;
    +    internValues = null;
    +    internKeys = null;
    +  }
    +
    +  public SpillableWindowedKeyedStorage(long bucket, byte[] identifier, Serde<Window, Slice> serdeWindow, Serde<K, Slice> serdeKey, Serde<ImmutablePair<Window, K>, Slice> serdeWindowKey, Serde<V, Slice> serdeValue)
    +  {
    +    store = new ManagedStateSpillableStateStore();
    +    store.getCheckpointManager().setNeedBucketFile(false);
    +    internValues = new SpillableByteMapImpl<>(store, identifier, bucket, serdeWindowKey, serdeValue);
    +    internKeys = new SpillableByteArrayListMultimapImpl<>(store, identifier, bucket, serdeWindow, serdeKey);
    +  }
    +
    +  @Override
    +  public boolean containsWindow(Window window)
    +  {
    +    return internKeys.containsKey(window);
    +  }
    +
    +  @Override
    +  public long size()
    +  {
    +    return internKeys.size();
    +  }
    +
    +  @Override
    +  public void remove(Window window)
    +  {
    +    List<K> keys = internKeys.get(window);
    +    for (K key : keys) {
    +      internValues.remove(new ImmutablePair<>(window, key));
    +    }
    +    internKeys.removeAll(window);
    +  }
    +
    +  @Override
    +  public void migrateWindow(Window fromWindow, Window toWindow)
    +  {
    +    List<K> keys = internKeys.get(fromWindow);
    +    internValues.remove(toWindow);
    +    for (K key : keys) {
    +      internKeys.put(toWindow, key);
    +      ImmutablePair<Window, K> oldKey = new ImmutablePair<>(fromWindow, key);
    +      ImmutablePair<Window, K> newKey = new ImmutablePair<>(toWindow, key);
    +
    +      V value = internValues.get(oldKey);
    +      internValues.remove(oldKey);
    +      internValues.put(newKey, value);
    +    }
    +    internKeys.removeAll(fromWindow);
    +  }
    +
    --- End diff --
    
    Will do. Can you explain a bit what bucket and identifier actually do here? Are they eventually used to name directories in HDFS?
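On the `migrateWindow` hunk quoted above: `internValues.remove(toWindow)` passes a `Window` to a map keyed by `ImmutablePair<Window, K>`, so it cannot match any `(window, key)` entry. If the intent is to clear whatever `toWindow` already holds, the destination has to be removed pair by pair, the way `remove(Window)` does it. The following is a minimal in-memory sketch of that logic. `HashMap`s stand in for the spillable map and multimap, and a hypothetical `WindowKey` class replaces `ImmutablePair`. The spillable API itself is not reproduced:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Hypothetical (window, key) pair standing in for ImmutablePair<Window, K>.
final class WindowKey<W, K> {
  final W window;
  final K key;

  WindowKey(W window, K key) { this.window = window; this.key = key; }

  @Override
  public boolean equals(Object o) {
    if (!(o instanceof WindowKey)) {
      return false;
    }
    WindowKey<?, ?> other = (WindowKey<?, ?>) o;
    return Objects.equals(window, other.window) && Objects.equals(key, other.key);
  }

  @Override
  public int hashCode() { return Objects.hash(window, key); }
}

// In-memory model: values keyed by (window, key), plus a window -> keys index.
class InMemoryKeyedStorage<W, K, V> {
  private final Map<WindowKey<W, K>, V> values = new HashMap<>();
  private final Map<W, List<K>> keys = new HashMap<>();

  void put(W window, K key, V value) {
    WindowKey<W, K> wk = new WindowKey<>(window, key);
    if (!values.containsKey(wk)) {
      keys.computeIfAbsent(window, w -> new ArrayList<>()).add(key);  // index each key once
    }
    values.put(wk, value);
  }

  V get(W window, K key) { return values.get(new WindowKey<>(window, key)); }

  boolean containsWindow(W window) { return keys.containsKey(window); }

  int valueCount() { return values.size(); }

  void remove(W window) {
    List<K> windowKeys = keys.remove(window);
    if (windowKeys != null) {
      for (K key : windowKeys) {
        values.remove(new WindowKey<>(window, key));
      }
    }
  }

  void migrateWindow(W fromWindow, W toWindow) {
    if (Objects.equals(fromWindow, toWindow)) {
      return;                                   // nothing to move
    }
    remove(toWindow);                           // clear the destination by (window, key) pairs
    List<K> windowKeys = keys.remove(fromWindow);
    if (windowKeys == null) {
      return;                                   // source window holds no keys
    }
    for (K key : windowKeys) {
      values.put(new WindowKey<>(toWindow, key), values.remove(new WindowKey<>(fromWindow, key)));
    }
    keys.put(toWindow, windowKeys);
  }
}
```

Suppose `w1` holds keys `a` and `b` and `w2` already holds `a`. Migrating `w1` into `w2` discards the old `(w2, a)` value and leaves exactly two entries, both under `w2`.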


---

[GitHub] apex-malhar pull request #345: REVIEW ONLY (WindowedOperator): incorporating...

Posted by davidyan74 <gi...@git.apache.org>.
Github user davidyan74 commented on a diff in the pull request:

    https://github.com/apache/apex-malhar/pull/345#discussion_r72309272
  
    --- Diff: library/src/test/java/org/apache/apex/malhar/lib/window/SpillableWindowedStorageTest.java ---
    @@ -0,0 +1,122 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.apex.malhar.lib.window;
    +
    +import org.junit.Assert;
    +import org.junit.Ignore;
    +import org.junit.Rule;
    +import org.junit.Test;
    +
    +import org.apache.apex.malhar.lib.state.spillable.SpillableTestUtils;
    +import org.apache.apex.malhar.lib.window.impl.SpillableWindowedKeyedStorage;
    +import org.apache.apex.malhar.lib.window.impl.SpillableWindowedPlainStorage;
    +
    +import com.datatorrent.lib.util.KryoCloneUtils;
    +
    +/**
    + * Unit tests for Spillable Windowed Storage
    + */
    +public class SpillableWindowedStorageTest
    +{
    +  @Rule
    +  public SpillableTestUtils.TestMeta testMeta = new SpillableTestUtils.TestMeta();
    +
    +  @Ignore
    +  @Test
    +  public void testWindowedPlainStorage()
    +  {
    +    SpillableWindowedPlainStorage<Integer> storage = new SpillableWindowedPlainStorage<>();
    +    Window window1 = new Window.TimeWindow<>(1000, 10);
    +    Window window2 = new Window.TimeWindow<>(1010, 10);
    +    Window window3 = new Window.TimeWindow<>(1020, 10);
    +    storage.setStore(testMeta.store);
    +    storage.setup(testMeta.operatorContext);
    +    storage.beginApexWindow(1000);
    +    storage.put(window1, 1);
    +    storage.put(window2, 2);
    +    storage.put(window3, 3);
    +    storage.endApexWindow();
    +    storage.beginApexWindow(1001);
    +    storage.put(window1, 4);
    +    storage.put(window2, 5);
    +    storage.endApexWindow();
    +    storage.beforeCheckpoint(1001);
    +    storage.checkpointed(1001);
    +
    +    SpillableWindowedPlainStorage<Integer> clonedStorage = KryoCloneUtils.cloneObject(storage);
    +
    +    storage.beginApexWindow(1002);
    +    storage.put(window1, 6);
    +    storage.put(window2, 7);
    +    storage.endApexWindow();
    +
    +    Assert.assertEquals(6L, storage.get(window1).longValue());
    +    Assert.assertEquals(7L, storage.get(window2).longValue());
    +    Assert.assertEquals(3L, storage.get(window3).longValue());
    +
    +    storage.beginApexWindow(1003);
    +    storage.put(window1, 8);
    +    storage.put(window2, 9);
    +    storage.endApexWindow();
    +
    +    // simulating crash here
    +    storage.teardown();
    +
    +    storage = clonedStorage;
    +    storage.setup(testMeta.operatorContext);
    +
    --- End diff --
    
    @ilooner The teardown and setup calls here are simulating a recovery. The underlying storage here has a size of 0 after setup. Please take a look when you have a chance. Thanks!


---

[GitHub] apex-malhar pull request #345: APEXMALHAR-2130 REVIEW ONLY (WindowedOperator...

Posted by davidyan74 <gi...@git.apache.org>.
Github user davidyan74 commented on a diff in the pull request:

    https://github.com/apache/apex-malhar/pull/345#discussion_r72494611
  
    --- Diff: library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedPlainStorage.java ---
    @@ -0,0 +1,188 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.apex.malhar.lib.window.impl;
    +
    +import java.util.Iterator;
    +import java.util.Map;
    +
    +import org.apache.apex.malhar.lib.state.spillable.Spillable;
    +import org.apache.apex.malhar.lib.state.spillable.SpillableComplexComponentImpl;
    +import org.apache.apex.malhar.lib.state.spillable.SpillableStateStore;
    +import org.apache.apex.malhar.lib.state.spillable.managed.ManagedStateSpillableStateStore;
    +import org.apache.apex.malhar.lib.utils.serde.Serde;
    +import org.apache.apex.malhar.lib.utils.serde.SerdeKryoSlice;
    +import org.apache.apex.malhar.lib.window.Window;
    +import org.apache.apex.malhar.lib.window.WindowedStorage;
    +
    +import com.datatorrent.api.Context;
    +import com.datatorrent.netlet.util.Slice;
    +
    +/**
    + * This is an implementation of WindowedPlainStorage that makes use of {@link Spillable} data structures
    + *
    + * @param <T> Type of the value per window
    + */
    +public class SpillableWindowedPlainStorage<T> implements WindowedStorage.WindowedPlainStorage<T>
    +{
    +  private SpillableStateStore store;
    +  private transient SpillableComplexComponentImpl sccImpl;
    +  private long bucket;
    +  private Serde<Window, Slice> windowSerde;
    +  private Serde<T, Slice> valueSerde;
    +
    +  protected transient Spillable.SpillableByteMap<Window, T> internMap;
    +
    +  public SpillableWindowedPlainStorage()
    +  {
    +  }
    +
    +  public SpillableWindowedPlainStorage(long bucket, Serde<Window, Slice> windowSerde, Serde<T, Slice> valueSerde)
    +  {
    +    this.bucket = bucket;
    +    this.windowSerde = windowSerde;
    +    this.valueSerde = valueSerde;
    +  }
    +
    +  public void setStore(SpillableStateStore store)
    +  {
    +    this.store = store;
    +  }
    +
    +  public void setBucket(long bucket)
    +  {
    +    this.bucket = bucket;
    +  }
    +
    +  public void setWindowSerde(Serde<Window, Slice> windowSerde)
    +  {
    +    this.windowSerde = windowSerde;
    +  }
    +
    +  public void setValueSerde(Serde<T, Slice> valueSerde)
    +  {
    +    this.valueSerde = valueSerde;
    +  }
    +
    +  @Override
    +  public void put(Window window, T value)
    +  {
    +    internMap.put(window, value);
    +  }
    +
    +  @Override
    +  public T get(Window window)
    +  {
    +    return internMap.get(window);
    +  }
    +
    +  @Override
    +  public Iterable<Map.Entry<Window, T>> entrySet()
    +  {
    +    return internMap.entrySet();
    +  }
    +
    +  @Override
    +  public Iterator<Map.Entry<Window, T>> iterator()
    +  {
    +    return internMap.entrySet().iterator();
    +  }
    +
    +  @Override
    +  public boolean containsWindow(Window window)
    +  {
    +    return internMap.containsKey(window);
    +  }
    +
    +  @Override
    +  public long size()
    +  {
    +    return internMap.size();
    +  }
    +
    +  @Override
    +  public void remove(Window window)
    +  {
    +    internMap.remove(window);
    +  }
    +
    +  @Override
    +  public void migrateWindow(Window fromWindow, Window toWindow)
    +  {
    +    internMap.put(toWindow, internMap.remove(fromWindow));
    +  }
    +
    +  @Override
    +  public void setup(Context.OperatorContext context)
    +  {
    +    if (store == null) {
    +      // provide a default store
    +      store = new ManagedStateSpillableStateStore();
    +    }
    +    if (bucket == 0) {
    +      // choose a bucket that is almost guaranteed to be unique
    +      bucket = (context.getValue(Context.DAGContext.APPLICATION_NAME) + "#" + context.getId()).hashCode();
    +    }
    +    // set default serdes
    +    if (windowSerde == null) {
    +      windowSerde = new SerdeKryoSlice<>();
    +    }
    +    if (valueSerde == null) {
    +      valueSerde = new SerdeKryoSlice<>();
    +    }
    +    sccImpl = new SpillableComplexComponentImpl(store);
    +    sccImpl.setup(context);
    +    internMap = sccImpl.newSpillableByteMap(bucket, windowSerde, valueSerde);
    --- End diff --
    
    @ilooner Wouldn't that mean we should make the SpillableByteMapImpl non-transient and serializable by Kryo? When I did that, I got this error:
    
    ```
    com.esotericsoftware.kryo.KryoException: java.lang.UnsupportedOperationException
    Serialization trace:
    internMap (org.apache.apex.malhar.lib.window.impl.SpillableWindowedPlainStorage)
    
    	at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:82)
    	at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
    	at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:505)
    	at com.datatorrent.lib.util.KryoCloneUtils.cloneObject(KryoCloneUtils.java:125)
    	at com.datatorrent.lib.util.KryoCloneUtils.cloneObject(KryoCloneUtils.java:145)
    	at org.apache.apex.malhar.lib.window.SpillableWindowedStorageTest.testWindowedPlainStorage(SpillableWindowedStorageTest.java:62)
    	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    	at java.lang.reflect.Method.invoke(Method.java:498)
    	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
    	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
    	at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
    	at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
    	at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
    	at org.junit.rules.RunRules.evaluate(RunRules.java:20)
    	at com.intellij.junit4.JUnit4TestRunnerUtil$IgnoreIgnoredTestJUnit4ClassRunner.runChild(JUnit4TestRunnerUtil.java:365)
    	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50)
    	at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
    	at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)
    	at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
    	at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
    	at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229)
    	at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
    	at org.junit.runner.JUnitCore.run(JUnitCore.java:160)
    	at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:117)
    	at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:42)
    	at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:253)
    	at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:84)
    	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    	at java.lang.reflect.Method.invoke(Method.java:498)
    	at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
    Caused by: java.lang.UnsupportedOperationException
    	at org.apache.apex.malhar.lib.state.spillable.SpillableByteMapImpl.entrySet(SpillableByteMapImpl.java:161)
    	at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:80)
    	at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:21)
    	at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
    	at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
    	... 33 more
    ```
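The trace suggests that Kryo treats `SpillableByteMapImpl` as a `java.util.Map` and writes it with `MapSerializer`. `MapSerializer` walks `entrySet()`, and `SpillableByteMapImpl.entrySet()` throws `UnsupportedOperationException` (`SpillableByteMapImpl.java:161` in the trace). The toy below reproduces that failure mode without Kryo. `ToyByteMap`, `writeAsMap` and `fieldsToWrite` are hypothetical stand-ins for the spillable map, a map-style serializer and a field-style serializer respectively:

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for SpillableByteMapImpl: a Map view over an external store
// that, like the real class in the trace, does not support entrySet().
class ToyByteMap extends AbstractMap<String, Integer> {
  private long bucket = 7L;                    // configuration that should be checkpointed
  private byte[] identifier = {0};             // configuration that should be checkpointed
  private transient Object readCache = null;   // runtime-only state, skipped by field serialization

  @Override
  public Set<Map.Entry<String, Integer>> entrySet() {
    throw new UnsupportedOperationException();
  }
}

final class ToySerializers {
  private ToySerializers() { }

  // Mimics a map-style serializer: it can only write a Map by walking its entries.
  static List<Object> writeAsMap(Map<?, ?> map) {
    List<Object> out = new ArrayList<>();
    for (Map.Entry<?, ?> entry : map.entrySet()) {
      out.add(entry.getKey());
      out.add(entry.getValue());
    }
    return out;
  }

  // Mimics a field-style serializer: it only needs the object's own non-static, non-transient fields.
  static List<String> fieldsToWrite(Object obj) {
    List<String> names = new ArrayList<>();
    for (Field field : obj.getClass().getDeclaredFields()) {
      int mods = field.getModifiers();
      if (!field.isSynthetic() && !Modifier.isStatic(mods) && !Modifier.isTransient(mods)) {
        names.add(field.getName());
      }
    }
    return names;
  }
}
```

In Kryo itself, one option is to make such a class use field serialization instead of `MapSerializer`. You can annotate it with `@DefaultSerializer(FieldSerializer.class)` or register a `FieldSerializer` for it. This is a suggestion, not necessarily what the PR ended up doing.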


---

[GitHub] apex-malhar pull request #345: REVIEW ONLY (WindowedOperator): splitting Win...

Posted by ilooner <gi...@git.apache.org>.
Github user ilooner commented on a diff in the pull request:

    https://github.com/apache/apex-malhar/pull/345#discussion_r71096090
  
    --- Diff: library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedKeyedStorage.java ---
    @@ -0,0 +1,136 @@
    +package org.apache.apex.malhar.lib.window.impl;
    +
    +import java.util.List;
    +import java.util.Map;
    +
    +import javax.annotation.concurrent.Immutable;
    +import javax.validation.constraints.NotNull;
    +
    +import org.apache.apex.malhar.lib.state.spillable.SpillableByteArrayListMultimapImpl;
    +import org.apache.apex.malhar.lib.state.spillable.SpillableByteMapImpl;
    +import org.apache.apex.malhar.lib.state.spillable.managed.ManagedStateSpillableStateStore;
    +import org.apache.apex.malhar.lib.utils.serde.Serde;
    +import org.apache.apex.malhar.lib.window.Window;
    +import org.apache.apex.malhar.lib.window.WindowedStorage;
    +import org.apache.commons.lang3.tuple.ImmutablePair;
    +
    +import com.datatorrent.netlet.util.Slice;
    +
    +/**
    + * Created by david on 7/15/16.
    + */
    +public class SpillableWindowedKeyedStorage<K, V> implements WindowedStorage.WindowedKeyedStorage<K, V>
    +{
    +  @NotNull
    +  private final ManagedStateSpillableStateStore store;
    +
    +  protected final SpillableByteMapImpl<ImmutablePair<Window, K>, V> internValues;
    --- End diff --
    
    This field should be declared with the SpillableByteMap interface type rather than SpillableByteMapImpl.



---

[GitHub] apex-malhar pull request #345: APEXMALHAR-2130 REVIEW ONLY (WindowedOperator...

Posted by ilooner <gi...@git.apache.org>.
Github user ilooner commented on a diff in the pull request:

    https://github.com/apache/apex-malhar/pull/345#discussion_r72389548
  
    --- Diff: library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedPlainStorage.java ---
    @@ -0,0 +1,188 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.apex.malhar.lib.window.impl;
    +
    +import java.util.Iterator;
    +import java.util.Map;
    +
    +import org.apache.apex.malhar.lib.state.spillable.Spillable;
    +import org.apache.apex.malhar.lib.state.spillable.SpillableComplexComponentImpl;
    +import org.apache.apex.malhar.lib.state.spillable.SpillableStateStore;
    +import org.apache.apex.malhar.lib.state.spillable.managed.ManagedStateSpillableStateStore;
    +import org.apache.apex.malhar.lib.utils.serde.Serde;
    +import org.apache.apex.malhar.lib.utils.serde.SerdeKryoSlice;
    +import org.apache.apex.malhar.lib.window.Window;
    +import org.apache.apex.malhar.lib.window.WindowedStorage;
    +
    +import com.datatorrent.api.Context;
    +import com.datatorrent.netlet.util.Slice;
    +
    +/**
    + * This is an implementation of WindowedPlainStorage that makes use of {@link Spillable} data structures
    + *
    + * @param <T> Type of the value per window
    + */
    +public class SpillableWindowedPlainStorage<T> implements WindowedStorage.WindowedPlainStorage<T>
    +{
    +  private SpillableStateStore store;
    +  private transient SpillableComplexComponentImpl sccImpl;
    +  private long bucket;
    +  private Serde<Window, Slice> windowSerde;
    +  private Serde<T, Slice> valueSerde;
    +
    +  protected transient Spillable.SpillableByteMap<Window, T> internMap;
    +
    +  public SpillableWindowedPlainStorage()
    +  {
    +  }
    +
    +  public SpillableWindowedPlainStorage(long bucket, Serde<Window, Slice> windowSerde, Serde<T, Slice> valueSerde)
    +  {
    +    this.bucket = bucket;
    +    this.windowSerde = windowSerde;
    +    this.valueSerde = valueSerde;
    +  }
    +
    +  public void setStore(SpillableStateStore store)
    +  {
    +    this.store = store;
    +  }
    +
    +  public void setBucket(long bucket)
    +  {
    +    this.bucket = bucket;
    +  }
    +
    +  public void setWindowSerde(Serde<Window, Slice> windowSerde)
    +  {
    +    this.windowSerde = windowSerde;
    +  }
    +
    +  public void setValueSerde(Serde<T, Slice> valueSerde)
    +  {
    +    this.valueSerde = valueSerde;
    +  }
    +
    +  @Override
    +  public void put(Window window, T value)
    +  {
    +    internMap.put(window, value);
    +  }
    +
    +  @Override
    +  public T get(Window window)
    +  {
    +    return internMap.get(window);
    +  }
    +
    +  @Override
    +  public Iterable<Map.Entry<Window, T>> entrySet()
    +  {
    +    return internMap.entrySet();
    +  }
    +
    +  @Override
    +  public Iterator<Map.Entry<Window, T>> iterator()
    +  {
    +    return internMap.entrySet().iterator();
    +  }
    +
    +  @Override
    +  public boolean containsWindow(Window window)
    +  {
    +    return internMap.containsKey(window);
    +  }
    +
    +  @Override
    +  public long size()
    +  {
    +    return internMap.size();
    +  }
    +
    +  @Override
    +  public void remove(Window window)
    +  {
    +    internMap.remove(window);
    +  }
    +
    +  @Override
    +  public void migrateWindow(Window fromWindow, Window toWindow)
    +  {
    +    internMap.put(toWindow, internMap.remove(fromWindow));
    +  }
    +
    +  @Override
    +  public void setup(Context.OperatorContext context)
    +  {
    +    if (store == null) {
    +      // provide a default store
    +      store = new ManagedStateSpillableStateStore();
    +    }
    +    if (bucket == 0) {
    +      // choose a bucket that is almost guaranteed to be unique
    +      bucket = (context.getValue(Context.DAGContext.APPLICATION_NAME) + "#" + context.getId()).hashCode();
    +    }
    +    // set default serdes
    +    if (windowSerde == null) {
    +      windowSerde = new SerdeKryoSlice<>();
    +    }
    +    if (valueSerde == null) {
    +      valueSerde = new SerdeKryoSlice<>();
    +    }
    +    sccImpl = new SpillableComplexComponentImpl(store);
    +    sccImpl.setup(context);
    +    internMap = sccImpl.newSpillableByteMap(bucket, windowSerde, valueSerde);
    --- End diff --
    
    @davidyan74 I think you are getting a size of zero because you allocate a new spillable data structure here. You should check whether sccImpl is null, and only if it is, initialize it along with all the Spillable data structures.
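The suggested fix can be sketched as follows. The sketch assumes the component is checkpointed with the operator (non-transient), so a recovered instance already has it. `setup()` then builds state only when it is missing. Java serialization stands in for `KryoCloneUtils.cloneObject`, and the toy keeps its data in an in-memory map rather than in a spillable store. `ToyComponent` and `ToyPlainStorage` are hypothetical. If `setup()` always allocated a new component, the recovered instance would report a size of 0, as in the recovery test earlier in this thread:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for SpillableComplexComponentImpl plus its structures.
// The real component spills to a store; this toy just keeps an in-memory map.
class ToyComponent implements Serializable {
  final Map<String, Integer> data = new HashMap<>();
}

class ToyPlainStorage implements Serializable {
  private ToyComponent component;   // not transient: checkpointed with the operator

  void setup() {
    // First deployment: nothing was restored, so create the component.
    // Recovery: the component came back from the checkpoint; reusing it keeps the data.
    if (component == null) {
      component = new ToyComponent();
    }
  }

  void put(String window, int value) { component.data.put(window, value); }

  Integer get(String window) { return component.data.get(window); }

  long size() { return component.data.size(); }

  // Deep copy via Java serialization, standing in for KryoCloneUtils.cloneObject.
  @SuppressWarnings("unchecked")
  static <T extends Serializable> T deepCopy(T obj) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
        out.writeObject(obj);
      }
      try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
        return (T) in.readObject();
      }
    } catch (IOException | ClassNotFoundException e) {
      throw new IllegalStateException(e);
    }
  }
}
```

After a checkpoint copy, later writes to the original are lost in the simulated crash. Calling `setup()` on the copy keeps the checkpointed values instead of starting empty.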


---

[GitHub] apex-malhar pull request #345: REVIEW ONLY (WindowedOperator): splitting Win...

Posted by ilooner <gi...@git.apache.org>.
Github user ilooner commented on a diff in the pull request:

    https://github.com/apache/apex-malhar/pull/345#discussion_r71095303
  
    --- Diff: library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedKeyedStorage.java ---
    @@ -0,0 +1,136 @@
    +package org.apache.apex.malhar.lib.window.impl;
    +
    +import java.util.List;
    +import java.util.Map;
    +
    +import javax.annotation.concurrent.Immutable;
    +import javax.validation.constraints.NotNull;
    +
    +import org.apache.apex.malhar.lib.state.spillable.SpillableByteArrayListMultimapImpl;
    +import org.apache.apex.malhar.lib.state.spillable.SpillableByteMapImpl;
    +import org.apache.apex.malhar.lib.state.spillable.managed.ManagedStateSpillableStateStore;
    +import org.apache.apex.malhar.lib.utils.serde.Serde;
    +import org.apache.apex.malhar.lib.window.Window;
    +import org.apache.apex.malhar.lib.window.WindowedStorage;
    +import org.apache.commons.lang3.tuple.ImmutablePair;
    +
    +import com.datatorrent.netlet.util.Slice;
    +
    +/**
    + * Created by david on 7/15/16.
    + */
    +public class SpillableWindowedKeyedStorage<K, V> implements WindowedStorage.WindowedKeyedStorage<K, V>
    +{
    +  @NotNull
    +  private final ManagedStateSpillableStateStore store;
    +
    +  protected final SpillableByteMapImpl<ImmutablePair<Window, K>, V> internValues;
    +  protected final SpillableByteArrayListMultimapImpl<Window, K> internKeys;
    +
    +  private SpillableWindowedKeyedStorage()
    +  {
    +    // for kryo
    +    store = null;
    +    internValues = null;
    +    internKeys = null;
    +  }
    +
    +  public SpillableWindowedKeyedStorage(long bucket, byte[] identifier, Serde<Window, Slice> serdeWindow, Serde<K, Slice> serdeKey, Serde<ImmutablePair<Window, K>, Slice> serdeWindowKey, Serde<V, Slice> serdeValue)
    +  {
    +    store = new ManagedStateSpillableStateStore();
    +    store.getCheckpointManager().setNeedBucketFile(false);
    +    internValues = new SpillableByteMapImpl<>(store, identifier, bucket, serdeWindowKey, serdeValue);
    --- End diff --
    
    Spillable data structures should be created in setup(), not in the constructor.
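One way to read this suggestion, sketched under assumptions: the constructor keeps only configuration (bucket, identifier and, in the real class, the serdes), and the data structures are built in setup(). Because the fields are no longer assigned in the constructor, they can no longer be final. `KeyedStorageSketch` is hypothetical, and a HashMap stands in for SpillableByteMapImpl.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: the constructor records configuration only, and the
// data structures are created in setup(). Not a Malhar class.
class KeyedStorageSketch<K, V>
{
  // Configuration: set once in the constructor and serialized with the operator.
  private long bucket;
  private byte[] identifier;

  // Not final, because it is assigned in setup() rather than in the constructor.
  private Map<K, V> internValues;

  private KeyedStorageSketch()
  {
    // for kryo
  }

  KeyedStorageSketch(long bucket, byte[] identifier)
  {
    this.bucket = bucket;
    this.identifier = identifier;
  }

  void setup()
  {
    // Build the structures here; skip this if they were restored from a checkpoint.
    if (internValues == null) {
      internValues = new HashMap<>();
    }
  }

  void put(K key, V value)
  {
    internValues.put(key, value);
  }

  V get(K key)
  {
    return internValues.get(key);
  }
}
```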



[GitHub] apex-malhar pull request #345: APEXMALHAR-2130 REVIEW ONLY (WindowedOperator...

Posted by davidyan74 <gi...@git.apache.org>.
Github user davidyan74 commented on a diff in the pull request:

    https://github.com/apache/apex-malhar/pull/345#discussion_r72521168
  
    --- Diff: library/src/test/java/org/apache/apex/malhar/lib/window/SpillableWindowedStorageTest.java ---
    @@ -0,0 +1,122 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.apex.malhar.lib.window;
    +
    +import org.junit.Assert;
    +import org.junit.Ignore;
    +import org.junit.Rule;
    +import org.junit.Test;
    +
    +import org.apache.apex.malhar.lib.state.spillable.SpillableTestUtils;
    +import org.apache.apex.malhar.lib.window.impl.SpillableWindowedKeyedStorage;
    +import org.apache.apex.malhar.lib.window.impl.SpillableWindowedPlainStorage;
    +
    +import com.datatorrent.lib.util.KryoCloneUtils;
    +
    +/**
    + * Unit tests for Spillable Windowed Storage
    + */
    +public class SpillableWindowedStorageTest
    +{
    +  @Rule
    +  public SpillableTestUtils.TestMeta testMeta = new SpillableTestUtils.TestMeta();
    +
    +  @Ignore
    +  @Test
    +  public void testWindowedPlainStorage()
    +  {
    +    SpillableWindowedPlainStorage<Integer> storage = new SpillableWindowedPlainStorage<>();
    +    Window window1 = new Window.TimeWindow<>(1000, 10);
    +    Window window2 = new Window.TimeWindow<>(1010, 10);
    +    Window window3 = new Window.TimeWindow<>(1020, 10);
    +    storage.setStore(testMeta.store);
    +    storage.setup(testMeta.operatorContext);
    +    storage.beginApexWindow(1000);
    +    storage.put(window1, 1);
    +    storage.put(window2, 2);
    +    storage.put(window3, 3);
    +    storage.endApexWindow();
    +    storage.beginApexWindow(1001);
    +    storage.put(window1, 4);
    +    storage.put(window2, 5);
    +    storage.endApexWindow();
    +    storage.beforeCheckpoint(1001);
    +    storage.checkpointed(1001);
    +
    +    SpillableWindowedPlainStorage<Integer> clonedStorage = KryoCloneUtils.cloneObject(storage);
    +
    +    storage.beginApexWindow(1002);
    +    storage.put(window1, 6);
    +    storage.put(window2, 7);
    +    storage.endApexWindow();
    +
    +    Assert.assertEquals(6L, storage.get(window1).longValue());
    +    Assert.assertEquals(7L, storage.get(window2).longValue());
    +    Assert.assertEquals(3L, storage.get(window3).longValue());
    +
    +    storage.beginApexWindow(1003);
    +    storage.put(window1, 8);
    +    storage.put(window2, 9);
    +    storage.endApexWindow();
    +
    +    // simulating crash here
    +    storage.teardown();
    +
    +    storage = clonedStorage;
    +    storage.setup(testMeta.operatorContext);
    +
    --- End diff --
    
    @ilooner I made both SpillableComplexComponentImpl and SpillableByteMap non-transient and made the necessary changes to make them serializable by Kryo. Take a look at commit a71d2cc. However, it looks like the data is still not fetched from disk after recovery.
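For reference, here is a hypothetical in-memory model of what the recovered storage should presumably return. It assumes recovery restores exactly the state committed at checkpoint 1001, which is where the test clones the storage. `PlainStorageModel` is not SpillableWindowedPlainStorage. It only replays the test's puts, keyed by the test's variable names, with a snapshot standing in for beforeCheckpoint/checkpointed.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory model of windowed plain storage with checkpointing;
// window keys reuse the variable names from testWindowedPlainStorage.
class PlainStorageModel
{
  private Map<String, Integer> data = new HashMap<>();
  private Map<String, Integer> checkpointed = new HashMap<>();

  void put(String window, int value)
  {
    data.put(window, value);
  }

  Integer get(String window)
  {
    return data.get(window);
  }

  // Snapshot of the current state, standing in for beforeCheckpoint/checkpointed.
  void checkpoint()
  {
    checkpointed = new HashMap<>(data);
  }

  // Simulate a crash followed by recovery from the last checkpoint.
  void recover()
  {
    data = new HashMap<>(checkpointed);
  }

  // Replays the puts from testWindowedPlainStorage, then recovers.
  static PlainStorageModel replayTestSequence()
  {
    PlainStorageModel s = new PlainStorageModel();
    // Apex window 1000
    s.put("window1", 1);
    s.put("window2", 2);
    s.put("window3", 3);
    // Apex window 1001, followed by checkpoint 1001
    s.put("window1", 4);
    s.put("window2", 5);
    s.checkpoint();
    // Apex windows 1002 and 1003 (not checkpointed)
    s.put("window1", 6);
    s.put("window2", 7);
    s.put("window1", 8);
    s.put("window2", 9);
    // simulated crash and recovery
    s.recover();
    return s;
  }
}
```

If recovery worked as intended, get(window1), get(window2) and get(window3) on the restored storage would presumably be 4, 5 and 3. The values written in Apex windows 1002 and 1003 would be discarded.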



[GitHub] apex-malhar pull request #345: REVIEW ONLY (WindowedOperator): splitting Win...

Posted by ilooner <gi...@git.apache.org>.
Github user ilooner commented on a diff in the pull request:

    https://github.com/apache/apex-malhar/pull/345#discussion_r71095332
  
    --- Diff: library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedKeyedStorage.java ---
    @@ -0,0 +1,136 @@
    +package org.apache.apex.malhar.lib.window.impl;
    +
    +import java.util.List;
    +import java.util.Map;
    +
    +import javax.annotation.concurrent.Immutable;
    +import javax.validation.constraints.NotNull;
    +
    +import org.apache.apex.malhar.lib.state.spillable.SpillableByteArrayListMultimapImpl;
    +import org.apache.apex.malhar.lib.state.spillable.SpillableByteMapImpl;
    +import org.apache.apex.malhar.lib.state.spillable.managed.ManagedStateSpillableStateStore;
    +import org.apache.apex.malhar.lib.utils.serde.Serde;
    +import org.apache.apex.malhar.lib.window.Window;
    +import org.apache.apex.malhar.lib.window.WindowedStorage;
    +import org.apache.commons.lang3.tuple.ImmutablePair;
    +
    +import com.datatorrent.netlet.util.Slice;
    +
    +/**
    + * Created by david on 7/15/16.
    + */
    +public class SpillableWindowedKeyedStorage<K, V> implements WindowedStorage.WindowedKeyedStorage<K, V>
    +{
    +  @NotNull
    +  private final ManagedStateSpillableStateStore store;
    +
    +  protected final SpillableByteMapImpl<ImmutablePair<Window, K>, V> internValues;
    +  protected final SpillableByteArrayListMultimapImpl<Window, K> internKeys;
    +
    +  private SpillableWindowedKeyedStorage()
    +  {
    +    // for kryo
    +    store = null;
    +    internValues = null;
    +    internKeys = null;
    +  }
    +
    +  public SpillableWindowedKeyedStorage(long bucket, byte[] identifier, Serde<Window, Slice> serdeWindow, Serde<K, Slice> serdeKey, Serde<ImmutablePair<Window, K>, Slice> serdeWindowKey, Serde<V, Slice> serdeValue)
    +  {
    +    store = new ManagedStateSpillableStateStore();
    +    store.getCheckpointManager().setNeedBucketFile(false);
    +    internValues = new SpillableByteMapImpl<>(store, identifier, bucket, serdeWindowKey, serdeValue);
    +    internKeys = new SpillableByteArrayListMultimapImpl<>(store, identifier, bucket, serdeWindow, serdeKey);
    +  }
    +
    +  @Override
    +  public boolean containsWindow(Window window)
    +  {
    +    return internKeys.containsKey(window);
    +  }
    +
    +  @Override
    +  public long size()
    +  {
    +    return internKeys.size();
    +  }
    +
    +  @Override
    +  public void remove(Window window)
    +  {
    +    List<K> keys = internKeys.get(window);
    +    for (K key : keys) {
    +      internValues.remove(new ImmutablePair<>(window, key));
    +    }
    +    internKeys.removeAll(window);
    +  }
    +
    +  @Override
    +  public void migrateWindow(Window fromWindow, Window toWindow)
    +  {
    +    List<K> keys = internKeys.get(fromWindow);
    +    internValues.remove(toWindow);
    +    for (K key : keys) {
    +      internKeys.put(toWindow, key);
    +      ImmutablePair<Window, K> oldKey = new ImmutablePair<>(fromWindow, key);
    +      ImmutablePair<Window, K> newKey = new ImmutablePair<>(toWindow, key);
    +
    +      V value = internValues.get(oldKey);
    +      internValues.remove(oldKey);
    +      internValues.put(newKey, value);
    +    }
    +    internKeys.removeAll(fromWindow);
    +  }
    +
    +  @Override
    +  public void beginApexWindow(long windowId)
    +  {
    +    store.beginWindow(windowId);
    +  }
    +
    +  @Override
    +  public void endApexWindow()
    +  {
    +    store.endWindow();
    --- End diff --
    
    This should call spillableComplexComponent.endWindow() instead of store.endWindow().
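As I read it, the storage should end the window through the component that owns the spillable structures, so each structure gets endWindow() before the store does. A minimal sketch, assuming a component that forwards lifecycle calls to every registered structure as well as to the store. `WindowListener`, `RecordingListener` and `ComponentSketch` are hypothetical names, not the Malhar classes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical lifecycle interface shared by the store and the data structures.
interface WindowListener
{
  void beginWindow(long windowId);

  void endWindow();
}

// Records lifecycle calls so the ordering can be inspected.
class RecordingListener implements WindowListener
{
  private final String name;
  private final List<String> log;

  RecordingListener(String name, List<String> log)
  {
    this.name = name;
    this.log = log;
  }

  @Override
  public void beginWindow(long windowId)
  {
    log.add(name + ".beginWindow(" + windowId + ")");
  }

  @Override
  public void endWindow()
  {
    log.add(name + ".endWindow");
  }
}

// Hypothetical stand-in for SpillableComplexComponentImpl: it owns the store
// and every structure registered with it, and fans lifecycle calls out to all.
class ComponentSketch implements WindowListener
{
  private final WindowListener store;
  private final List<WindowListener> structures = new ArrayList<>();

  ComponentSketch(WindowListener store)
  {
    this.store = store;
  }

  void register(WindowListener structure)
  {
    structures.add(structure);
  }

  @Override
  public void beginWindow(long windowId)
  {
    store.beginWindow(windowId);
    for (WindowListener s : structures) {
      s.beginWindow(windowId);
    }
  }

  @Override
  public void endWindow()
  {
    // Let each structure finish its window before the store ends it.
    for (WindowListener s : structures) {
      s.endWindow();
    }
    store.endWindow();
  }
}
```

If store.endWindow() is called directly, as in the diff, the structures would never see endWindow(). Anything they buffer for the window might then never reach the store.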



[GitHub] apex-malhar pull request #345: APEXMALHAR-2130 REVIEW ONLY (WindowedOperator...

Posted by davidyan74 <gi...@git.apache.org>.
Github user davidyan74 closed the pull request at:

    https://github.com/apache/apex-malhar/pull/345



[GitHub] apex-malhar pull request #345: REVIEW ONLY (WindowedOperator): splitting Win...

Posted by ilooner <gi...@git.apache.org>.
Github user ilooner commented on a diff in the pull request:

    https://github.com/apache/apex-malhar/pull/345#discussion_r71096442
  
    --- Diff: library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedPlainStorage.java ---
    @@ -0,0 +1,135 @@
    +package org.apache.apex.malhar.lib.window.impl;
    --- End diff --
    
    The same comments as above apply here as well.

