Posted to dev@apex.apache.org by ilooner <gi...@git.apache.org> on 2016/06/21 06:59:50 UTC

[GitHub] apex-malhar pull request #324: Spillable Datastructures PR for review only

GitHub user ilooner opened a pull request:

    https://github.com/apache/apex-malhar/pull/324

    Spillable Datastructures PR for review only

    

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/ilooner/incubator-apex-malhar APEXMALHAR-2048_pull

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/apex-malhar/pull/324.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #324
    
----
commit c2c3f0acfcdf033a0e3044967ab3f8048f719259
Author: Timothy Farkas <ti...@datatorrent.com>
Date:   2016-06-05T00:11:20Z

    - Intermediate commit.

commit 1bee1ed0308470ff35a739ab6f9e94d53debddb8
Author: Timothy Farkas <ti...@datatorrent.com>
Date:   2016-06-13T06:03:21Z

    Intermediate commit

commit 60acf68b96f2145af3d90b410c6b20613347f881
Author: Timothy Farkas <ti...@datatorrent.com>
Date:   2016-06-21T06:58:09Z

    Intermediate commit

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] apex-malhar pull request #324: Spillable Datastructures PR for review only

Posted by davidyan74 <gi...@git.apache.org>.
Github user davidyan74 commented on a diff in the pull request:

    https://github.com/apache/apex-malhar/pull/324#discussion_r72126631
  
    --- Diff: library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteArrayListMultimapImpl.java ---
    @@ -0,0 +1,262 @@
    +package org.apache.apex.malhar.lib.state.spillable;
    +
    +import java.util.Collection;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +
    +import javax.annotation.Nullable;
    +import javax.validation.constraints.NotNull;
    +
    +import org.apache.apex.malhar.lib.utils.serde.PassThruByteArraySliceSerde;
    +import org.apache.apex.malhar.lib.utils.serde.Serde;
    +import org.apache.apex.malhar.lib.utils.serde.SerdeIntSlice;
    +import org.apache.apex.malhar.lib.utils.serde.SliceUtils;
    +
    +import com.google.common.base.Preconditions;
    +import com.google.common.collect.Multimap;
    +import com.google.common.collect.Multiset;
    +
    +import com.datatorrent.api.Context;
    +import com.datatorrent.netlet.util.Slice;
    +
    +/**
    + * Created by tfarkas on 6/12/16.
    + */
    +public class SpillableByteArrayListMultimapImpl<K, V> implements Spillable.SpillableByteArrayListMultimap<K, V>,
    +    Spillable.SpillableComponent
    +{
    +  public static final int DEFAULT_BATCH_SIZE = 1000;
    +  public static final byte[] SIZE_KEY_SUFFIX = new byte[]{(byte)0, (byte)0, (byte)0};
    +
    +  private int batchSize = DEFAULT_BATCH_SIZE;
    +
    +  private WindowBoundedMapCache<K, SpillableArrayListImpl<V>> cache = new WindowBoundedMapCache<>();
    +
    +  @NotNull
    +  private SpillableByteMapImpl<byte[], Integer> map;
    +
    +  private SpillableStateStore store;
    +  private byte[] identifier;
    +  private long bucket;
    +  private Serde<K, Slice> serdeKey;
    +  private Serde<V, Slice> serdeValue;
    +
    +  private boolean isRunning = false;
    +  private boolean isInWindow = false;
    +
    +  public SpillableByteArrayListMultimapImpl(SpillableStateStore store, byte[] identifier, long bucket,
    +      Serde<K, Slice> serdeKey,
    +      Serde<V, Slice> serdeValue)
    +  {
    +    this.store = Preconditions.checkNotNull(store);
    +    this.identifier = Preconditions.checkNotNull(identifier);
    +    this.bucket = bucket;
    +    this.serdeKey = Preconditions.checkNotNull(serdeKey);
    +    this.serdeValue = Preconditions.checkNotNull(serdeValue);
    +
    +    map = new SpillableByteMapImpl(store, identifier, bucket, new PassThruByteArraySliceSerde(), new SerdeIntSlice());
    --- End diff --
    
    Yes, I have the latest. Thanks for looking into this.



[GitHub] apex-malhar pull request #324: Spillable Datastructures PR for review only

Posted by chandnisingh <gi...@git.apache.org>.
Github user chandnisingh commented on a diff in the pull request:

    https://github.com/apache/apex-malhar/pull/324#discussion_r72828686
  
    --- Diff: library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteMapImplTest.java ---
    @@ -411,45 +441,48 @@ public void recoveryTest()
         testMeta.store.setup(testMeta.operatorContext);
         map1.setup(testMeta.operatorContext);
     
    -    map1.beginWindow(1000);
    +    System.out.println("0");
    +    testMeta.store.beginWindow(0);
    +    map1.beginWindow(0);
         map1.put("x", "1");
         map1.put("y", "2");
         map1.put("z", "3");
         map1.endWindow();
    -    map1.beginWindow(1001);
    +    testMeta.store.endWindow();
    +
    +    System.out.println("1");
    +    testMeta.store.beginWindow(1);
    +    map1.beginWindow(1);
         map1.put("x", "4");
         map1.put("y", "5");
         map1.endWindow();
    -
    -    testMeta.store.beforeCheckpoint(1001);
    -    testMeta.store.checkpointed(1001);
    +    testMeta.store.endWindow();
    +    testMeta.store.beforeCheckpoint(1);
    +    testMeta.store.checkpointed(1);
     
         SpillableByteMapImpl<String, String> clonedMap1 = KryoCloneUtils.cloneObject(map1);
     
    -    map1.beginWindow(1002);
    -    map1.put("x", "6");
    -    map1.put("y", "7");
    -    map1.endWindow();
    -
    -    Assert.assertEquals("6", map1.get("x"));
    -    Assert.assertEquals("7", map1.get("y"));
    -    Assert.assertEquals("3", map1.get("z"));
    -
    -    map1.beginWindow(1003);
    -    map1.put("x", "8");
    -    map1.put("y", "9");
    +    System.out.println("2");
    +    testMeta.store.beginWindow(2);
    +    map1.beginWindow(2);
    +    map1.put("x1", "6");
    +    map1.put("y1", "7");
         map1.endWindow();
    +    testMeta.store.endWindow();
     
         // simulating crash here
         map1.teardown();
         testMeta.store.teardown();
     
    +    System.out.println("Recovering");
    +
         map1 = clonedMap1;
         map1.getStore().setup(testMeta.operatorContext);
         map1.setup(testMeta.operatorContext);
    --- End diff --
    
    What is the activation window?



[GitHub] apex-malhar pull request #324: Spillable Datastructures PR for review only

Posted by davidyan74 <gi...@git.apache.org>.
Github user davidyan74 commented on a diff in the pull request:

    https://github.com/apache/apex-malhar/pull/324#discussion_r73032183
  
    --- Diff: library/src/main/java/org/apache/apex/malhar/lib/utils/serde/PassThruSliceSerde.java ---
    @@ -16,30 +16,32 @@
      * specific language governing permissions and limitations
      * under the License.
      */
    -package org.apache.apex.malhar.lib.state.managed.spillable.inmem;
    +package org.apache.apex.malhar.lib.utils.serde;
     
    -import org.junit.Assert;
    -import org.junit.Test;
    +import org.apache.commons.lang3.mutable.MutableInt;
     
    -import org.apache.apex.malhar.lib.state.spillable.inmem.InMemSpillableByteArrayListMultimap;
    +import com.datatorrent.netlet.util.Slice;
     
    -import com.esotericsoftware.kryo.Kryo;
    -
    -import com.datatorrent.lib.util.KryoCloneUtils;
    -
    -public class InMemSpillableByteArrayListMultimapTest
    +/**
    + * Created by tfarkas on 7/22/16.
    --- End diff --
    
    javadoc



[GitHub] apex-malhar pull request #324: Spillable Datastructures PR for review only

Posted by davidyan74 <gi...@git.apache.org>.
Github user davidyan74 commented on a diff in the pull request:

    https://github.com/apache/apex-malhar/pull/324#discussion_r73032473
  
    --- Diff: library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListImplTest.java ---
    @@ -0,0 +1,597 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.apex.malhar.lib.state.spillable;
    +
    +import org.junit.Assert;
    +import org.junit.Rule;
    +import org.junit.Test;
    +
    +import org.apache.apex.malhar.lib.state.spillable.inmem.InMemSpillableStateStore;
    +import org.apache.apex.malhar.lib.utils.serde.SerdeStringSlice;
    +
    +import com.google.common.collect.Lists;
    +
    +import com.datatorrent.api.Attribute;
    +import com.datatorrent.api.Context;
    +import com.datatorrent.api.DAG;
    +import com.datatorrent.lib.helper.OperatorContextTestHelper;
    +import com.datatorrent.lib.util.KryoCloneUtils;
    +
    +/**
    + * Created by tfarkas on 6/19/16.
    --- End diff --
    
    javadoc



[GitHub] apex-malhar pull request #324: Spillable Datastructures PR for review only

Posted by davidyan74 <gi...@git.apache.org>.
Github user davidyan74 commented on a diff in the pull request:

    https://github.com/apache/apex-malhar/pull/324#discussion_r73032214
  
    --- Diff: library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeIntSlice.java ---
    @@ -0,0 +1,50 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.apex.malhar.lib.utils.serde;
    +
    +import org.apache.commons.lang3.mutable.MutableInt;
    +
    +import com.datatorrent.lib.appdata.gpo.GPOUtils;
    +import com.datatorrent.netlet.util.Slice;
    +
    +/**
    + * Created by tfarkas on 6/12/16.
    --- End diff --
    
    javadoc
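The "javadoc" requests ask for a real class comment in place of the IDE-generated "Created by" line. As a hedged illustration of what such a comment might document for an int serde, here is a minimal sketch. `IntSerdeSketch` is a hypothetical name; it uses `java.nio.ByteBuffer` and a one-element `int[]` offset in place of the `GPOUtils` and `MutableInt` helpers that the real `SerdeIntSlice` imports, so its byte layout is an assumption and may differ from the actual class.

```java
import java.nio.ByteBuffer;

/**
 * Hypothetical stand-in for a SerdeIntSlice-style class: serializes an int into
 * exactly four big-endian bytes and reads it back from a buffer at a movable offset.
 */
final class IntSerdeSketch
{
  private IntSerdeSketch()
  {
  }

  /**
   * Serializes the given value into a new 4-byte array.
   *
   * @param value the int to serialize
   * @return a 4-byte big-endian encoding of {@code value}
   */
  static byte[] serialize(int value)
  {
    return ByteBuffer.allocate(Integer.BYTES).putInt(value).array();
  }

  /**
   * Deserializes an int starting at {@code offset[0]} and advances the offset past
   * the bytes consumed, so several values can be read back-to-back from one buffer.
   *
   * @param buffer the bytes to read from
   * @param offset a one-element array holding the current read position
   * @return the decoded int
   */
  static int deserialize(byte[] buffer, int[] offset)
  {
    int value = ByteBuffer.wrap(buffer, offset[0], Integer.BYTES).getInt();
    offset[0] += Integer.BYTES;
    return value;
  }
}
```

The advancing offset is what lets a caller pack several serialized values into one buffer and decode them in order.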



[GitHub] apex-malhar pull request #324: Spillable Datastructures PR for review only

Posted by chandnisingh <gi...@git.apache.org>.
Github user chandnisingh commented on a diff in the pull request:

    https://github.com/apache/apex-malhar/pull/324#discussion_r73075053
  
    --- Diff: library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableStateStore.java ---
    @@ -0,0 +1,33 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.apex.malhar.lib.state.spillable;
    +
    +import org.apache.apex.malhar.lib.state.BucketedState;
    +
    +import com.datatorrent.api.Component;
    +import com.datatorrent.api.Context;
    +import com.datatorrent.api.Operator;
    +
    +public interface SpillableStateStore extends BucketedState, Component<Context.OperatorContext>,
    --- End diff --
    
    I am not in favor of adding a WindowListener interface. I can't see any practical use case where an operator would hold a collection of plain components/window listeners (as shown in the example) and simply invoke the begin and end window callbacks on all of them collectively.
    
    From the examples I have seen, an operator needs these components for specific operations and keeps a dedicated reference to each of them, so it accesses them directly.
    
    To me, an interface makes sense when you provide an API that can have different implementations. Here we are trying to provide one API for functionally unrelated components.
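For readers weighing this argument, the pattern being questioned looks roughly like the sketch below. The interface shape (`beginWindow(long)` and `endWindow()`) is an assumption modeled on the window callbacks used elsewhere in this thread, and `RecordingComponent` and `ForwardingOperator` are hypothetical names; the sketch only shows the collective invocation that the comment argues operators rarely need.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical minimal shape of the proposed WindowListener interface. */
interface WindowListener
{
  void beginWindow(long windowId);

  void endWindow();
}

/** Hypothetical component that records the window callbacks it receives. */
final class RecordingComponent implements WindowListener
{
  final List<String> events = new ArrayList<>();

  @Override
  public void beginWindow(long windowId)
  {
    events.add("begin " + windowId);
  }

  @Override
  public void endWindow()
  {
    events.add("end");
  }
}

/** The pattern under debate: an operator forwarding window callbacks to every listener it holds. */
final class ForwardingOperator
{
  private final List<WindowListener> listeners = new ArrayList<>();

  void addListener(WindowListener listener)
  {
    listeners.add(listener);
  }

  void beginWindow(long windowId)
  {
    for (WindowListener listener : listeners) {
      listener.beginWindow(windowId);
    }
  }

  void endWindow()
  {
    for (WindowListener listener : listeners) {
      listener.endWindow();
    }
  }
}
```

The alternative the comment favors is for the operator to keep a dedicated, typed field for each component and call that component's callbacks directly.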




[GitHub] apex-malhar pull request #324: Spillable Datastructures PR for review only

Posted by davidyan74 <gi...@git.apache.org>.
Github user davidyan74 commented on a diff in the pull request:

    https://github.com/apache/apex-malhar/pull/324#discussion_r73941677
  
    --- Diff: library/src/main/java/org/apache/apex/malhar/lib/state/spillable/WindowListener.java ---
    @@ -0,0 +1,39 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.apex.malhar.lib.state.spillable;
    --- End diff --
    
    I think this should be moved to a more generic package, possibly in Apex Core, but it's okay to keep it here for now. Just annotate it with @InterfaceStability.Unstable.



[GitHub] apex-malhar pull request #324: Spillable Datastructures PR for review only

Posted by davidyan74 <gi...@git.apache.org>.
Github user davidyan74 commented on a diff in the pull request:

    https://github.com/apache/apex-malhar/pull/324#discussion_r72308835
  
    --- Diff: library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteArrayListMultimapImpl.java ---
    @@ -0,0 +1,262 @@
    ...
    +    map = new SpillableByteMapImpl(store, identifier, bucket, new PassThruByteArraySliceSerde(), new SerdeIntSlice());
    --- End diff --
    
    @ilooner I added the toByteArray call in my PR and it works.
    However, the crash simulation did not work: the cloned store has 0 elements. Please take a look at my PR and see what I am doing wrong. Thanks!
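The thread does not show the fixed `containsKey`, but a common reason a `toByteArray` call matters is that a slice is a window (`buffer`, `offset`, `length`) onto a possibly larger shared array, so comparing or storing the raw backing array picks up bytes outside the slice. The sketch below illustrates that failure mode under this assumption; `ByteSlice` and `SliceKeys` are hypothetical stand-ins, not the netlet `Slice` class or the Malhar code.

```java
import java.util.Arrays;

/** Hypothetical stand-in for a byte slice: a window onto a possibly larger shared buffer. */
final class ByteSlice
{
  final byte[] buffer;
  final int offset;
  final int length;

  ByteSlice(byte[] buffer, int offset, int length)
  {
    this.buffer = buffer;
    this.offset = offset;
    this.length = length;
  }

  /** Copies exactly the bytes covered by this slice into a new array. */
  byte[] toByteArray()
  {
    return Arrays.copyOfRange(buffer, offset, offset + length);
  }
}

final class SliceKeys
{
  private SliceKeys()
  {
  }

  /** Buggy lookup: compares the whole backing buffer, including bytes outside the slice. */
  static boolean matchesBuffer(ByteSlice slice, byte[] key)
  {
    return Arrays.equals(slice.buffer, key);
  }

  /** Fixed lookup: compares only the bytes the slice actually covers. */
  static boolean matchesSlice(ByteSlice slice, byte[] key)
  {
    return Arrays.equals(slice.toByteArray(), key);
  }
}
```

With a shared buffer such as `{9, 9, 1, 2, 3, 9}` and a slice at offset 2 with length 3, only the `toByteArray`-based comparison matches the key `{1, 2, 3}`.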



[GitHub] apex-malhar pull request #324: Spillable Datastructures PR for review only

Posted by davidyan74 <gi...@git.apache.org>.
Github user davidyan74 commented on a diff in the pull request:

    https://github.com/apache/apex-malhar/pull/324#discussion_r73032285
  
    --- Diff: library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeStringSlice.java ---
    @@ -0,0 +1,51 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.apex.malhar.lib.utils.serde;
    +
    +import org.apache.commons.lang3.mutable.MutableInt;
    +
    +import com.datatorrent.lib.appdata.gpo.GPOUtils;
    +import com.datatorrent.netlet.util.Slice;
    +
    +/**
    + * Created by tfarkas on 6/14/16.
    --- End diff --
    
    javadoc
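A hedged sketch of the behavior a javadoc for a string serde might describe, assuming (this is not confirmed by the diff) that `SerdeStringSlice` writes a length prefix followed by the string's bytes. `StringSerdeSketch` is hypothetical and uses `java.nio` and a one-element `int[]` offset in place of the `GPOUtils` and `MutableInt` helpers the real class imports.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/**
 * Hypothetical stand-in for a SerdeStringSlice-style class: encodes a string as a
 * 4-byte big-endian length prefix followed by its UTF-8 bytes.
 */
final class StringSerdeSketch
{
  private StringSerdeSketch()
  {
  }

  /**
   * @param value the string to serialize
   * @return the length-prefixed UTF-8 encoding of {@code value}
   */
  static byte[] serialize(String value)
  {
    byte[] utf8 = value.getBytes(StandardCharsets.UTF_8);
    return ByteBuffer.allocate(Integer.BYTES + utf8.length).putInt(utf8.length).put(utf8).array();
  }

  /**
   * Reads one string starting at {@code offset[0]} and advances the offset past it.
   *
   * @param buffer the bytes to read from
   * @param offset a one-element array holding the current read position
   * @return the decoded string
   */
  static String deserialize(byte[] buffer, int[] offset)
  {
    int length = ByteBuffer.wrap(buffer, offset[0], Integer.BYTES).getInt();
    String value = new String(buffer, offset[0] + Integer.BYTES, length, StandardCharsets.UTF_8);
    offset[0] += Integer.BYTES + length;
    return value;
  }
}
```

The length prefix is counted in encoded bytes rather than characters, so multi-byte UTF-8 characters round-trip correctly.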



[GitHub] apex-malhar pull request #324: Spillable Datastructures PR for review only

Posted by ilooner <gi...@git.apache.org>.
Github user ilooner closed the pull request at:

    https://github.com/apache/apex-malhar/pull/324



[GitHub] apex-malhar pull request #324: Spillable Datastructures PR for review only

Posted by ilooner <gi...@git.apache.org>.
Github user ilooner commented on a diff in the pull request:

    https://github.com/apache/apex-malhar/pull/324#discussion_r72203089
  
    --- Diff: library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteArrayListMultimapImpl.java ---
    @@ -0,0 +1,262 @@
    ...
    +    map = new SpillableByteMapImpl(store, identifier, bucket, new PassThruByteArraySliceSerde(), new SerdeIntSlice());
    --- End diff --
    
    @davidyan74 The issue is that containsKey should call toByteArray on the key slice, but it does not. I don't have access to my laptop to fix it now; I will fix it tomorrow.
    
    The general procedure for simulating a crash is: call the checkpoint callbacks, clone the store with TestUtils.clone, perform operations for a few more windows, and call teardown. Then take the clone you created with TestUtils.clone, call setup on it, and resume at the window after the checkpointed window.
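The crash-simulation steps above can be sketched as follows. This is a hypothetical, self-contained illustration: `WindowedStoreSketch` stands in for the spillable state store, its `cloneState()` method stands in for `TestUtils.clone` or `KryoCloneUtils.cloneObject`, and the checkpoint callbacks (`beforeCheckpoint`/`checkpointed`) are only marked by a comment rather than modeled.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical store: writes made during a window become committed state when the window ends. */
final class WindowedStoreSketch
{
  private final Map<String, String> committed = new HashMap<>();
  private final Map<String, String> windowWrites = new HashMap<>();
  private boolean running;

  void setup()
  {
    running = true;
  }

  void teardown()
  {
    running = false;
    windowWrites.clear();
  }

  void beginWindow(long windowId)
  {
    if (!running) {
      throw new IllegalStateException("setup() must be called first");
    }
    windowWrites.clear();
  }

  void put(String key, String value)
  {
    windowWrites.put(key, value);
  }

  void endWindow()
  {
    committed.putAll(windowWrites);
    windowWrites.clear();
  }

  String get(String key)
  {
    String value = windowWrites.get(key);
    return value != null ? value : committed.get(key);
  }

  /** Copy of the committed state, standing in for cloning the store after the checkpoint callbacks. */
  WindowedStoreSketch cloneState()
  {
    WindowedStoreSketch copy = new WindowedStoreSketch();
    copy.committed.putAll(committed);
    return copy;
  }
}

final class CrashSimulationSketch
{
  /** Runs the crash-recovery sequence and returns the recovered store, ready to resume at window 2. */
  static WindowedStoreSketch simulate()
  {
    WindowedStoreSketch store = new WindowedStoreSketch();
    store.setup();
    store.beginWindow(0);
    store.put("x", "1");
    store.put("z", "3");
    store.endWindow();
    store.beginWindow(1);
    store.put("x", "4");
    store.endWindow();

    // 1. The checkpoint callbacks for window 1 would run here; then clone the state.
    WindowedStoreSketch checkpointed = store.cloneState();

    // 2. Keep processing past the checkpoint; these writes must not survive recovery.
    store.beginWindow(2);
    store.put("x1", "6");
    store.endWindow();

    // 3. Simulate the crash.
    store.teardown();

    // 4. Set up the clone; the caller resumes at window 2, the window after the checkpoint.
    checkpointed.setup();
    return checkpointed;
  }
}
```

The recovered store should see the window 1 values (`x` is `"4"`, `z` is `"3"`) and nothing written in window 2. If a clone comes back empty, one thing worth checking is whether it was taken before the writes were committed.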




[GitHub] apex-malhar pull request #324: Spillable Datastructures PR for review only

Posted by davidyan74 <gi...@git.apache.org>.
Github user davidyan74 commented on a diff in the pull request:

    https://github.com/apache/apex-malhar/pull/324#discussion_r73032134
  
    --- Diff: library/src/main/java/org/apache/apex/malhar/lib/utils/serde/PassThruByteArraySliceSerde.java ---
    @@ -0,0 +1,55 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.apex.malhar.lib.utils.serde;
    +
    +import org.apache.commons.lang3.mutable.MutableInt;
    +
    +import com.datatorrent.netlet.util.Slice;
    +
    +/**
    + * Created by tfarkas on 6/12/16.
    --- End diff --
    
    javadoc



[GitHub] apex-malhar pull request #324: Spillable Datastructures PR for review only

Posted by davidyan74 <gi...@git.apache.org>.
Github user davidyan74 commented on a diff in the pull request:

    https://github.com/apache/apex-malhar/pull/324#discussion_r73032779
  
    --- Diff: library/src/test/java/org/apache/apex/malhar/lib/state/spillable/inmem/InMemorySpillableStateStoreTest.java ---
    @@ -0,0 +1,63 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.apex.malhar.lib.state.spillable.inmem;
    +
    +import org.junit.Assert;
    +import org.junit.Test;
    +
    +import com.datatorrent.lib.util.TestUtils;
    +
    +/**
    + * Created by tfarkas on 6/6/16.
    --- End diff --
    
    javadoc



[GitHub] apex-malhar pull request #324: Spillable Datastructures PR for review only

Posted by chandnisingh <gi...@git.apache.org>.
Github user chandnisingh commented on a diff in the pull request:

    https://github.com/apache/apex-malhar/pull/324#discussion_r73046598
  
    --- Diff: library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableIdentifierGenerator.java ---
    @@ -0,0 +1,37 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.apex.malhar.lib.state.spillable;
    +
    +/**
    + * Classes implementing this interface can be used as generators for identifiers for Spillable data structures.
    + */
    --- End diff --
    
    Tim,
    Where is this used: in the application or in the operator? Can you add that information here so it is clear how to use it?



[GitHub] apex-malhar pull request #324: Spillable Datastructures PR for review only

Posted by brightchen <gi...@git.apache.org>.
Github user brightchen commented on a diff in the pull request:

    https://github.com/apache/apex-malhar/pull/324#discussion_r72536975
  
    --- Diff: library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteMapImpl.java ---
    @@ -0,0 +1,210 @@
    +package org.apache.apex.malhar.lib.state.spillable;
    +
    +import java.util.Collection;
    +import java.util.Map;
    +import java.util.Set;
    +
    +import javax.validation.constraints.NotNull;
    +
    +import org.apache.apex.malhar.lib.state.BucketedState;
    +import org.apache.apex.malhar.lib.utils.serde.Serde;
    +import org.apache.apex.malhar.lib.utils.serde.SliceUtils;
    +import org.apache.commons.lang3.ArrayUtils;
    +import org.apache.commons.lang3.mutable.MutableInt;
    +
    +import com.google.common.base.Preconditions;
    +
    +import com.datatorrent.api.Context;
    +import com.datatorrent.netlet.util.Slice;
    +
    +public class SpillableByteMapImpl<K, V> implements Spillable.SpillableByteMap<K, V>, Spillable.SpillableComponent
    +{
    +  @NotNull
    +  private SpillableStateStore store;
    +  @NotNull
    +  private byte[] identifier;
    +  private long bucket;
    +  @NotNull
    +  private Serde<K, Slice> serdeKey;
    +  @NotNull
    +  private Serde<V, Slice> serdeValue;
    +
    +  private int size = 0;
    +
    +  private transient WindowBoundedMapCache<K, V> cache = new WindowBoundedMapCache<>();
    --- End diff --
    
    Why is this transient while SpillableByteArrayListMultimapImpl.cache is non-transient? Are you trying to let the platform checkpoint and reload the cache data while using the store to manage already persisted data?
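For context on the question: Apex checkpoints operator state with Kryo, and Kryo's default `FieldSerializer` skips `transient` fields, so a transient cache is simply left out of the checkpoint. The sketch below shows the same effect with plain `java.io` serialization instead of Kryo (an assumption made so the example is self-contained); `CachedComponent` and `CheckpointDemo` are hypothetical names.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

// Hypothetical component: "size" is part of the checkpoint, the cache is not.
class CachedComponent implements Serializable
{
  private static final long serialVersionUID = 1L;

  int size = 0;
  transient Map<String, String> cache = new HashMap<>();

  void put(String key, String value)
  {
    cache.put(key, value);
    size++;
  }
}

class CheckpointDemo
{
  // Serializes and deserializes the object, roughly what a checkpoint followed by a restore does.
  @SuppressWarnings("unchecked")
  static <T extends Serializable> T roundTrip(T object)
  {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
        out.writeObject(object);
      }
      try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
        return (T)in.readObject();
      }
    } catch (IOException | ClassNotFoundException e) {
      throw new IllegalStateException(e);
    }
  }

  public static void main(String[] args)
  {
    CachedComponent component = new CachedComponent();
    component.put("x", "1");
    CachedComponent restored = roundTrip(component);
    System.out.println(restored.size + " " + restored.cache); // prints "1 null"
  }
}
```

With `java.io`, the restored transient field is `null` because field initializers are not re-run on deserialization. Kryo typically instantiates objects through their no-arg constructor (which is why a private constructor "for kryo" appears elsewhere in this PR), so a transient field with an initializer would likely come back as a fresh, empty cache instead.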



[GitHub] apex-malhar pull request #324: Spillable Datastructures PR for review only

Posted by davidyan74 <gi...@git.apache.org>.
Github user davidyan74 commented on a diff in the pull request:

    https://github.com/apache/apex-malhar/pull/324#discussion_r73032245
  
    --- Diff: library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeListSlice.java ---
    @@ -0,0 +1,103 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.apex.malhar.lib.utils.serde;
    +
    +import java.util.List;
    +
    +import javax.validation.constraints.NotNull;
    +
    +import org.apache.commons.lang3.mutable.MutableInt;
    +
    +import com.google.common.base.Preconditions;
    +import com.google.common.collect.Lists;
    +
    +import com.datatorrent.lib.appdata.gpo.GPOUtils;
    +import com.datatorrent.netlet.util.Slice;
    +
    +/**
    + * Created by tfarkas on 6/11/16.
    --- End diff --
    
    Please replace the "Created by" comment with a proper javadoc.
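As a hint at what such a javadoc might cover (the wire format, and where the read cursor ends up after decoding), here is a hypothetical length-prefixed list serde. It uses `ByteBuffer` in place of `Slice` and `MutableInt`, is fixed to `Integer` elements, and `IntListSerdeSketch` does not reflect `SerdeListSlice`'s actual format.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 * Hypothetical list serde: writes a 4-byte element count followed by 4 bytes per element,
 * so a reader can decode one list from the middle of a larger buffer and stop right after it.
 */
class IntListSerdeSketch
{
  /** Serializes the list as a 4-byte count followed by 4 bytes per element. */
  static byte[] serialize(List<Integer> list)
  {
    ByteBuffer buffer = ByteBuffer.allocate(4 + 4 * list.size());
    buffer.putInt(list.size());
    for (int value : list) {
      buffer.putInt(value);
    }
    return buffer.array();
  }

  /** Reads one list starting at the buffer's current position and advances the position past it. */
  static List<Integer> deserialize(ByteBuffer buffer)
  {
    int size = buffer.getInt();
    List<Integer> list = new ArrayList<>(size);
    for (int i = 0; i < size; i++) {
      list.add(buffer.getInt());
    }
    return list;
  }

  public static void main(String[] args)
  {
    byte[] bytes = serialize(Arrays.asList(3, 1, 4));
    ByteBuffer in = ByteBuffer.wrap(bytes);
    System.out.println(bytes.length + " " + deserialize(in)); // prints "16 [3, 1, 4]"
  }
}
```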


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] apex-malhar pull request #324: Spillable Datastructures PR for review only

Posted by chandnisingh <gi...@git.apache.org>.
Github user chandnisingh commented on a diff in the pull request:

    https://github.com/apache/apex-malhar/pull/324#discussion_r72829248
  
    --- Diff: library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteMapImplTest.java ---
    @@ -411,45 +441,48 @@ public void recoveryTest()
         testMeta.store.setup(testMeta.operatorContext);
         map1.setup(testMeta.operatorContext);
     
    -    map1.beginWindow(1000);
    +    System.out.println("0");
    +    testMeta.store.beginWindow(0);
    +    map1.beginWindow(0);
         map1.put("x", "1");
         map1.put("y", "2");
         map1.put("z", "3");
         map1.endWindow();
    -    map1.beginWindow(1001);
    +    testMeta.store.endWindow();
    +
    +    System.out.println("1");
    +    testMeta.store.beginWindow(1);
    +    map1.beginWindow(1);
         map1.put("x", "4");
         map1.put("y", "5");
         map1.endWindow();
    -
    -    testMeta.store.beforeCheckpoint(1001);
    -    testMeta.store.checkpointed(1001);
    +    testMeta.store.endWindow();
    +    testMeta.store.beforeCheckpoint(1);
    +    testMeta.store.checkpointed(1);
     
         SpillableByteMapImpl<String, String> clonedMap1 = KryoCloneUtils.cloneObject(map1);
     
    -    map1.beginWindow(1002);
    -    map1.put("x", "6");
    -    map1.put("y", "7");
    -    map1.endWindow();
    -
    -    Assert.assertEquals("6", map1.get("x"));
    -    Assert.assertEquals("7", map1.get("y"));
    -    Assert.assertEquals("3", map1.get("z"));
    -
    -    map1.beginWindow(1003);
    -    map1.put("x", "8");
    -    map1.put("y", "9");
    +    System.out.println("2");
    --- End diff --
    
    nit: please remove the System.out.println debug statements.



[GitHub] apex-malhar pull request #324: Spillable Datastructures PR for review only

Posted by davidyan74 <gi...@git.apache.org>.
Github user davidyan74 commented on a diff in the pull request:

    https://github.com/apache/apex-malhar/pull/324#discussion_r73075245
  
    --- Diff: library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableStateStore.java ---
    @@ -0,0 +1,33 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.apex.malhar.lib.state.spillable;
    +
    +import org.apache.apex.malhar.lib.state.BucketedState;
    +
    +import com.datatorrent.api.Component;
    +import com.datatorrent.api.Context;
    +import com.datatorrent.api.Operator;
    +
    +public interface SpillableStateStore extends BucketedState, Component<Context.OperatorContext>,
    --- End diff --
    
    The reason I brought this up is that the WindowedOperator should have no knowledge of any Spillable components, yet I need a way for the WindowedOperator to call SpillableStateStore.beginWindow() and endWindow().
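One possible approach, sketched under assumptions: the operator depends only on a storage interface and forwards its window boundaries to the storage only when the storage opts in through a small callback interface. `WindowCallbacks`, `Storage`, `RecordingStorage`, and `OperatorSketch` are hypothetical names, not Apex or Malhar API.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical opt-in interface for components that need window boundaries.
interface WindowCallbacks
{
  void beginWindow(long windowId);

  void endWindow();
}

// Hypothetical storage interface the operator depends on; it says nothing about spilling.
interface Storage
{
  void put(String key, long value);
}

// A storage implementation that also wants window callbacks (as a spillable one would).
class RecordingStorage implements Storage, WindowCallbacks
{
  final List<String> events = new ArrayList<>();

  @Override
  public void put(String key, long value)
  {
    events.add("put " + key);
  }

  @Override
  public void beginWindow(long windowId)
  {
    events.add("begin " + windowId);
  }

  @Override
  public void endWindow()
  {
    events.add("end");
  }
}

// The operator only sees Storage and forwards window boundaries when the storage opts in.
class OperatorSketch
{
  private final Storage storage;

  OperatorSketch(Storage storage)
  {
    this.storage = storage;
  }

  void beginWindow(long windowId)
  {
    if (storage instanceof WindowCallbacks) {
      ((WindowCallbacks)storage).beginWindow(windowId);
    }
  }

  void process(String key, long value)
  {
    storage.put(key, value);
  }

  void endWindow()
  {
    if (storage instanceof WindowCallbacks) {
      ((WindowCallbacks)storage).endWindow();
    }
  }

  public static void main(String[] args)
  {
    RecordingStorage storage = new RecordingStorage();
    OperatorSketch operator = new OperatorSketch(storage);
    operator.beginWindow(1);
    operator.process("a", 1L);
    operator.endWindow();
    System.out.println(storage.events); // prints [begin 1, put a, end]
  }
}
```

The trade-off is a runtime `instanceof` check instead of a compile-time contract: the storage implementation, not the operator, decides whether it needs window boundaries.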



[GitHub] apex-malhar pull request #324: Spillable Datastructures PR for review only

Posted by davidyan74 <gi...@git.apache.org>.
Github user davidyan74 commented on a diff in the pull request:

    https://github.com/apache/apex-malhar/pull/324#discussion_r73031644
  
    --- Diff: library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SequentialSpillableIdentifierGenerator.java ---
    @@ -0,0 +1,78 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.apex.malhar.lib.state.spillable;
    +
    +import java.util.Set;
    +
    +import com.google.common.base.Preconditions;
    +import com.google.common.collect.Sets;
    +
    +public class SequentialSpillableIdentifierGenerator implements SpillableIdentifierGenerator
    --- End diff --
    
    Please add a javadoc for this class.
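As a starting point for that javadoc, here is a hypothetical sketch of what a sequential identifier generator might look like and how its contract could be documented. It is not the PR's implementation: it assumes single-byte identifiers (so at most 256) and tracks them in a `HashSet` of `ByteBuffer`s, because `byte[]` does not implement value equality.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

/**
 * Hypothetical sequential identifier generator for Spillable data structures.
 * Identifiers are handed out as 0, 1, 2, ... (one byte each), skipping any identifier
 * registered explicitly, so two data structures sharing a store never share a key prefix.
 */
class SequentialIdGeneratorSketch
{
  // byte[] uses identity equality, so identifiers are tracked as ByteBuffers (value equality).
  private final Set<ByteBuffer> used = new HashSet<>();
  private int counter = 0;

  /**
   * Reserves an identifier chosen by the caller.
   *
   * @throws IllegalArgumentException if the identifier is already in use
   */
  void register(byte[] identifier)
  {
    if (!used.add(ByteBuffer.wrap(identifier.clone()))) {
      throw new IllegalArgumentException("identifier already registered: " + Arrays.toString(identifier));
    }
  }

  /**
   * Returns the next identifier that has not been handed out or registered.
   *
   * @throws IllegalStateException once all 256 single-byte identifiers are used
   */
  byte[] next()
  {
    while (counter <= 255) {
      byte[] candidate = new byte[]{(byte)counter++};
      if (used.add(ByteBuffer.wrap(candidate.clone()))) {
        return candidate;
      }
    }
    throw new IllegalStateException("no identifiers left");
  }

  public static void main(String[] args)
  {
    SequentialIdGeneratorSketch generator = new SequentialIdGeneratorSketch();
    generator.register(new byte[]{1});
    System.out.println(Arrays.toString(generator.next())); // prints [0]
    System.out.println(Arrays.toString(generator.next())); // prints [2], since 1 was registered
  }
}
```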



[GitHub] apex-malhar pull request #324: Spillable Datastructures PR for review only

Posted by chandnisingh <gi...@git.apache.org>.
Github user chandnisingh commented on a diff in the pull request:

    https://github.com/apache/apex-malhar/pull/324#discussion_r73079678
  
    --- Diff: library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableStateStore.java ---
    @@ -0,0 +1,33 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.apex.malhar.lib.state.spillable;
    +
    +import org.apache.apex.malhar.lib.state.BucketedState;
    +
    +import com.datatorrent.api.Component;
    +import com.datatorrent.api.Context;
    +import com.datatorrent.api.Operator;
    +
    +public interface SpillableStateStore extends BucketedState, Component<Context.OperatorContext>,
    --- End diff --
    
    @davidyan74 
    The components that we write need to work in the context of an operator, which is why they have ```beginWindow``` and ```endWindow``` in their contract. However, these components provide different functionality, and each one is therefore an API with multiple implementations.
    
    They are not interchangeable. For example, there is a ```WindowBoundedService``` which has ```beginWindow``` and ```endWindow``` in its API, but there is no practical use case where someone would use a ```WindowListener``` and set ```WindowBoundedService``` as the implementation.



[GitHub] apex-malhar pull request #324: Spillable Datastructures PR for review only

Posted by davidyan74 <gi...@git.apache.org>.
Github user davidyan74 commented on a diff in the pull request:

    https://github.com/apache/apex-malhar/pull/324#discussion_r73079738
  
    --- Diff: library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableStateStore.java ---
    @@ -0,0 +1,33 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.apex.malhar.lib.state.spillable;
    +
    +import org.apache.apex.malhar.lib.state.BucketedState;
    +
    +import com.datatorrent.api.Component;
    +import com.datatorrent.api.Context;
    +import com.datatorrent.api.Operator;
    +
    +public interface SpillableStateStore extends BucketedState, Component<Context.OperatorContext>,
    --- End diff --
    
    The WindowedOperator only needs storage that it can read data from, and the interface for that storage is defined by the WindowedStorage interfaces. The user of the WindowedOperator can set the storage implementation before setup(). That's why the operator has no knowledge of Spillable components. Take a look at the WindowedStorage interface here: https://github.com/davidyan74/apex-malhar/blob/66da57e08e31edb15e83a050f3940dd276851c23/library/src/main/java/org/apache/apex/malhar/lib/window/WindowedStorage.java
    
    If a user of the WindowedOperator wants a custom storage implementation, they can simply implement those interfaces without having to deal with Spillable components.
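A minimal sketch of that pattern, assuming the storage contract is a simple per-window get/put: the operator depends only on an interface, the user sets an implementation before `setup()`, and no Spillable type appears anywhere. `PerWindowStorage`, `HashMapWindowStorage`, and `WindowedOperatorSketch` are hypothetical; the actual `WindowedStorage` interfaces at the link above define their own methods.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical, simplified per-window storage contract.
interface PerWindowStorage<T>
{
  void put(long windowStart, T value);

  T get(long windowStart);
}

// A custom implementation a user could plug in without touching any Spillable class.
class HashMapWindowStorage<T> implements PerWindowStorage<T>
{
  private final Map<Long, T> data = new HashMap<>();

  @Override
  public void put(long windowStart, T value)
  {
    data.put(windowStart, value);
  }

  @Override
  public T get(long windowStart)
  {
    return data.get(windowStart);
  }
}

// The operator depends only on the interface; the storage must be set before setup().
class WindowedOperatorSketch
{
  private PerWindowStorage<Long> storage;

  void setStorage(PerWindowStorage<Long> storage)
  {
    this.storage = storage;
  }

  void setup()
  {
    Objects.requireNonNull(storage, "storage must be set before setup()");
  }

  // Adds value to the running sum kept for the given window.
  void accumulate(long windowStart, long value)
  {
    Long current = storage.get(windowStart);
    storage.put(windowStart, current == null ? value : current + value);
  }

  long sum(long windowStart)
  {
    Long current = storage.get(windowStart);
    return current == null ? 0L : current;
  }

  public static void main(String[] args)
  {
    WindowedOperatorSketch operator = new WindowedOperatorSketch();
    operator.setStorage(new HashMapWindowStorage<>());
    operator.setup();
    operator.accumulate(1000L, 4L);
    operator.accumulate(1000L, 6L);
    System.out.println(operator.sum(1000L)); // prints 10
  }
}
```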




[GitHub] apex-malhar pull request #324: Spillable Datastructures PR for review only

Posted by davidyan74 <gi...@git.apache.org>.
Github user davidyan74 commented on a diff in the pull request:

    https://github.com/apache/apex-malhar/pull/324#discussion_r72129259
  
    --- Diff: library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteArrayListMultimapImpl.java ---
    @@ -0,0 +1,262 @@
    +package org.apache.apex.malhar.lib.state.spillable;
    +
    +import java.util.Collection;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +
    +import javax.annotation.Nullable;
    +import javax.validation.constraints.NotNull;
    +
    +import org.apache.apex.malhar.lib.utils.serde.PassThruByteArraySliceSerde;
    +import org.apache.apex.malhar.lib.utils.serde.Serde;
    +import org.apache.apex.malhar.lib.utils.serde.SerdeIntSlice;
    +import org.apache.apex.malhar.lib.utils.serde.SliceUtils;
    +
    +import com.google.common.base.Preconditions;
    +import com.google.common.collect.Multimap;
    +import com.google.common.collect.Multiset;
    +
    +import com.datatorrent.api.Context;
    +import com.datatorrent.netlet.util.Slice;
    +
    +/**
    + * Created by tfarkas on 6/12/16.
    + */
    +public class SpillableByteArrayListMultimapImpl<K, V> implements Spillable.SpillableByteArrayListMultimap<K, V>,
    +    Spillable.SpillableComponent
    +{
    +  public static final int DEFAULT_BATCH_SIZE = 1000;
    +  public static final byte[] SIZE_KEY_SUFFIX = new byte[]{(byte)0, (byte)0, (byte)0};
    +
    +  private int batchSize = DEFAULT_BATCH_SIZE;
    +
    +  private WindowBoundedMapCache<K, SpillableArrayListImpl<V>> cache = new WindowBoundedMapCache<>();
    +
    +  @NotNull
    +  private SpillableByteMapImpl<byte[], Integer> map;
    +
    +  private SpillableStateStore store;
    +  private byte[] identifier;
    +  private long bucket;
    +  private Serde<K, Slice> serdeKey;
    +  private Serde<V, Slice> serdeValue;
    +
    +  private boolean isRunning = false;
    +  private boolean isInWindow = false;
    +
    +  public SpillableByteArrayListMultimapImpl(SpillableStateStore store, byte[] identifier, long bucket,
    +      Serde<K, Slice> serdeKey,
    +      Serde<V, Slice> serdeValue)
    +  {
    +    this.store = Preconditions.checkNotNull(store);
    +    this.identifier = Preconditions.checkNotNull(identifier);
    +    this.bucket = bucket;
    +    this.serdeKey = Preconditions.checkNotNull(serdeKey);
    +    this.serdeValue = Preconditions.checkNotNull(serdeValue);
    +
    +    map = new SpillableByteMapImpl(store, identifier, bucket, new PassThruByteArraySliceSerde(), new SerdeIntSlice());
    --- End diff --
    
    By the way, I tried to simulate a recovery by adding this in SpillableTestUtils.TestMeta:
    ```java
        public void simulateCrash()
        {
          store = new ManagedStateSpillableStateStore();
          applicationPath = "target/" + description.getClassName() + "/" + description.getMethodName();
          ((FileAccessFSImpl)store.getFileAccess()).setBasePath(applicationPath + "/" + "bucket_data");
        }
    ```
    and doing this:
    ```java
        SpillableWindowedPlainStorage<Integer> storage = new SpillableWindowedPlainStorage<>();
        Window window1 = new Window.TimeWindow<>(1000, 10);
        Window window2 = new Window.TimeWindow<>(1010, 10);
        Window window3 = new Window.TimeWindow<>(1020, 10);
        storage.setStore(testMeta.store);
        storage.setup(testMeta.operatorContext);
        storage.beginApexWindow(1001);
        storage.put(window1, 4);
        storage.put(window2, 5);
        storage.endApexWindow();
        storage.beforeCheckpoint(1001);
        storage.checkpointed(1001);
        storage.beginApexWindow(1002);
        storage.put(window1, 6);
        storage.put(window2, 7);
        storage.endApexWindow();
    
        Assert.assertEquals(6L, storage.get(window1).longValue());
        Assert.assertEquals(7L, storage.get(window2).longValue());
        Assert.assertEquals(3L, storage.get(window3).longValue());
    
        testMeta.simulateCrash();
    
        // simulate recovery
        storage = new SpillableWindowedPlainStorage<>();
        storage.setStore(testMeta.store);
        storage.setup(testMeta.operatorContext);
        storage.beginApexWindow(1002);
        Assert.assertEquals(4L, storage.get(window1).longValue());
        Assert.assertEquals(5L, storage.get(window2).longValue());
        Assert.assertEquals(3L, storage.get(window3).longValue());
    ```
    in my unit test, but it didn't work. Any pointers?
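Two observations, offered tentatively. First, the snippet never puts a value into `window3`, so the `window3` assertion appears to fail before the simulated crash is even reached. Second, on recovery Apex restores an operator from its checkpointed (serialized) state rather than constructing it afresh, which is what `KryoCloneUtils.cloneObject(map1)` mimics in `SpillableByteMapImplTest.recoveryTest`; creating a brand-new `SpillableWindowedPlainStorage` after the crash may discard the checkpointed fields. The toy sketch below illustrates only the second point, using `java.io` serialization in place of Kryo; `ToyWindowStorage` and `RecoverySketch` are hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

// Hypothetical toy storage whose whole state lives in fields, so a checkpoint is a serialized copy.
class ToyWindowStorage implements Serializable
{
  private static final long serialVersionUID = 1L;
  private final Map<Long, Long> values = new HashMap<>();

  void put(long window, long value)
  {
    values.put(window, value);
  }

  Long get(long window)
  {
    return values.get(window);
  }
}

class RecoverySketch
{
  // Deep copy via serialization, standing in for "checkpoint now, restore this copy after a crash".
  static ToyWindowStorage checkpoint(ToyWindowStorage storage)
  {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
        out.writeObject(storage);
      }
      try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
        return (ToyWindowStorage)in.readObject();
      }
    } catch (IOException | ClassNotFoundException e) {
      throw new IllegalStateException(e);
    }
  }

  public static void main(String[] args)
  {
    ToyWindowStorage storage = new ToyWindowStorage();
    storage.put(1000L, 4L);
    storage.put(1010L, 5L);
    ToyWindowStorage checkpointed = checkpoint(storage); // analogous to checkpointed(1001)
    storage.put(1000L, 6L);
    storage.put(1010L, 7L);
    // After a crash, processing resumes from the checkpointed copy, not from a new, empty instance.
    ToyWindowStorage recovered = checkpointed;
    System.out.println(recovered.get(1000L) + " " + recovered.get(1010L)); // prints "4 5"
  }
}
```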




[GitHub] apex-malhar pull request #324: Spillable Datastructures PR for review only

Posted by ilooner <gi...@git.apache.org>.
Github user ilooner closed the pull request at:

    https://github.com/apache/apex-malhar/pull/324



[GitHub] apex-malhar pull request #324: Spillable Datastructures PR for review only

Posted by sandeshh <gi...@git.apache.org>.
Github user sandeshh commented on a diff in the pull request:

    https://github.com/apache/apex-malhar/pull/324#discussion_r69600767
  
    --- Diff: library/src/main/java/org/apache/apex/malhar/lib/state/spillable/TimeBasedPriorityQueue.java ---
    @@ -0,0 +1,128 @@
    +package org.apache.apex.malhar.lib.state.spillable;
    +
    +import java.util.Iterator;
    +import java.util.Map;
    +import java.util.Set;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import com.google.common.base.Preconditions;
    +import com.google.common.collect.Maps;
    +import com.google.common.collect.Sets;
    +
    +public class TimeBasedPriorityQueue<T>
    --- End diff --
    
    Have you looked at https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/DelayQueue.html ?
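For comparison, a minimal sketch of `java.util.concurrent.DelayQueue`: elements implement `Delayed`, and `poll()` returns the head only once its delay has expired, otherwise `null`. Note that `DelayQueue` is thread-safe and measures delays against wall-clock time (`System.nanoTime()` here), which may not fit a queue driven by the operator's own event or window timestamps. `Expiring` and `DelayQueueDemo` are hypothetical names.

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

// Hypothetical element that becomes available at a fixed System.nanoTime() deadline.
class Expiring implements Delayed
{
  final String name;
  private final long deadlineNanos;

  Expiring(String name, long delayMillis)
  {
    this.name = name;
    this.deadlineNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMillis);
  }

  @Override
  public long getDelay(TimeUnit unit)
  {
    return unit.convert(deadlineNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
  }

  @Override
  public int compareTo(Delayed other)
  {
    return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
  }
}

class DelayQueueDemo
{
  public static void main(String[] args)
  {
    DelayQueue<Expiring> queue = new DelayQueue<>();
    queue.add(new Expiring("later", 60_000));
    queue.add(new Expiring("now", 0));
    System.out.println(queue.poll().name); // prints "now"
    System.out.println(queue.poll());      // prints "null": "later" has not expired yet
  }
}
```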



[GitHub] apex-malhar pull request #324: Spillable Datastructures PR for review only

Posted by davidyan74 <gi...@git.apache.org>.
Github user davidyan74 commented on a diff in the pull request:

    https://github.com/apache/apex-malhar/pull/324#discussion_r73032033
  
    --- Diff: library/src/main/java/org/apache/apex/malhar/lib/state/spillable/inmem/InMemSpillableStateStore.java ---
    @@ -0,0 +1,116 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.apex.malhar.lib.state.spillable.inmem;
    +
    +import java.util.Map;
    +import java.util.concurrent.Future;
    +
    +import javax.validation.constraints.NotNull;
    +
    +import org.apache.apex.malhar.lib.state.spillable.SpillableStateStore;
    +
    +import com.google.common.collect.Maps;
    +
    +import com.datatorrent.api.Context;
    +import com.datatorrent.netlet.util.Slice;
    +
    +/**
    + * Created by tfarkas on 6/6/16.
    --- End diff --
    
    Please replace the "Created by" comment with a proper javadoc.
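A possible outline for that javadoc, shown on a hypothetical stand-in: an in-memory store keyed by bucket, where nothing is spilled to disk. `InMemBucketedStoreSketch` and its `put`/`get` methods are illustrative only; they use `byte[]` and `ByteBuffer` in place of `Slice` so the example is self-contained, and they do not reproduce the `BucketedState` signatures.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical in-memory bucketed key-value store. Every bucket has its own map and
 * nothing is spilled to disk, so the contents survive a failure only if this object
 * itself is checkpointed along with the operator. Intended for tests, not production.
 */
class InMemBucketedStoreSketch
{
  private final Map<Long, Map<ByteBuffer, byte[]>> buckets = new HashMap<>();

  /** Stores a copy of value under key in the given bucket, replacing any previous value. */
  void put(long bucketId, byte[] key, byte[] value)
  {
    buckets.computeIfAbsent(bucketId, id -> new HashMap<>())
        .put(ByteBuffer.wrap(key.clone()), value.clone());
  }

  /** Returns the value for key in the given bucket, or null if the bucket or key is absent. */
  byte[] get(long bucketId, byte[] key)
  {
    Map<ByteBuffer, byte[]> bucket = buckets.get(bucketId);
    return bucket == null ? null : bucket.get(ByteBuffer.wrap(key));
  }

  public static void main(String[] args)
  {
    InMemBucketedStoreSketch store = new InMemBucketedStoreSketch();
    store.put(0L, new byte[]{1, 2}, new byte[]{9});
    System.out.println(Arrays.toString(store.get(0L, new byte[]{1, 2}))); // prints [9]
    System.out.println(store.get(1L, new byte[]{1, 2}));                  // prints null
  }
}
```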



[GitHub] apex-malhar pull request #324: Spillable Datastructures PR for review only

Posted by ilooner <gi...@git.apache.org>.
Github user ilooner commented on a diff in the pull request:

    https://github.com/apache/apex-malhar/pull/324#discussion_r73809225
  
    --- Diff: library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteArrayListMultimapImpl.java ---
    @@ -0,0 +1,292 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.apex.malhar.lib.state.spillable;
    +
    +import java.util.Collection;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +
    +import javax.annotation.Nullable;
    +import javax.validation.constraints.NotNull;
    +
    +import org.apache.apex.malhar.lib.utils.serde.PassThruByteArraySliceSerde;
    +import org.apache.apex.malhar.lib.utils.serde.Serde;
    +import org.apache.apex.malhar.lib.utils.serde.SerdeIntSlice;
    +import org.apache.apex.malhar.lib.utils.serde.SliceUtils;
    +
    +import com.esotericsoftware.kryo.DefaultSerializer;
    +import com.esotericsoftware.kryo.serializers.FieldSerializer;
    +import com.google.common.base.Preconditions;
    +import com.google.common.collect.Multimap;
    +import com.google.common.collect.Multiset;
    +
    +import com.datatorrent.api.Context;
    +import com.datatorrent.netlet.util.Slice;
    +
    +/**
    + * Created by tfarkas on 6/12/16.
    + */
    +@DefaultSerializer(FieldSerializer.class)
    +public class SpillableByteArrayListMultimapImpl<K, V> implements Spillable.SpillableByteArrayListMultimap<K, V>,
    +    Spillable.SpillableComponent
    +{
    +  public static final int DEFAULT_BATCH_SIZE = 1000;
    +  public static final byte[] SIZE_KEY_SUFFIX = new byte[]{(byte)0, (byte)0, (byte)0};
    +
    +  private transient WindowBoundedMapCache<K, SpillableArrayListImpl<V>> cache = new WindowBoundedMapCache<>();
    +  private transient boolean isRunning = false;
    +  private transient boolean isInWindow = false;
    +
    +  private int batchSize = DEFAULT_BATCH_SIZE;
    +  @NotNull
    +  private SpillableByteMapImpl<byte[], Integer> map;
    +  private SpillableStateStore store;
    +  private byte[] identifier;
    +  private long bucket;
    +  private Serde<K, Slice> serdeKey;
    +  private Serde<V, Slice> serdeValue;
    +
    +  private SpillableByteArrayListMultimapImpl()
    +  {
    +    // for kryo
    +  }
    +
    +  public SpillableByteArrayListMultimapImpl(SpillableStateStore store, byte[] identifier, long bucket,
    +      Serde<K, Slice> serdeKey,
    +      Serde<V, Slice> serdeValue)
    +  {
    +    this.store = Preconditions.checkNotNull(store);
    +    this.identifier = Preconditions.checkNotNull(identifier);
    +    this.bucket = bucket;
    +    this.serdeKey = Preconditions.checkNotNull(serdeKey);
    +    this.serdeValue = Preconditions.checkNotNull(serdeValue);
    +
    +    map = new SpillableByteMapImpl(store, identifier, bucket, new PassThruByteArraySliceSerde(), new SerdeIntSlice());
    +  }
    +
    +  public SpillableStateStore getStore()
    +  {
    +    return store;
    +  }
    +
    +  @Override
    +  public List<V> get(@Nullable K key)
    +  {
    +    return getHelper(key);
    +  }
    +
    +  private SpillableArrayListImpl<V> getHelper(@Nullable K key)
    +  {
    +    SpillableArrayListImpl<V> spillableArrayList = cache.get(key);
    +
    +    if (spillableArrayList == null) {
    +      Slice keySlice = serdeKey.serialize(key);
    +      Integer size = map.get(SliceUtils.concatenate(keySlice, SIZE_KEY_SUFFIX).toByteArray());
    --- End diff --
    
    @davidyan74 No, the Spillable map prefixes each key with its identifier internally, so you do not need to prefix the keys yourself.
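To illustrate the idea: if every map view writes through a shared store and internally prepends its own identifier to each key, two maps can use the same logical key without colliding. `PrefixedMapSketch` is a hypothetical in-memory illustration, not the PR's `SpillableByteMapImpl`; it also assumes identifiers are prefix-free (for example, all the same length), since otherwise two identifier-plus-key concatenations could still be equal.

```java
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;

// Hypothetical map view over a shared store; keys are prefixed with the identifier internally.
class PrefixedMapSketch
{
  private final Map<ByteBuffer, byte[]> sharedStore;
  private final byte[] identifier;

  PrefixedMapSketch(Map<ByteBuffer, byte[]> sharedStore, byte[] identifier)
  {
    this.sharedStore = sharedStore;
    this.identifier = identifier.clone();
  }

  // Store key = identifier bytes followed by the caller's key bytes.
  private ByteBuffer storeKey(byte[] key)
  {
    ByteBuffer buffer = ByteBuffer.allocate(identifier.length + key.length);
    buffer.put(identifier).put(key);
    buffer.flip();
    return buffer;
  }

  void put(byte[] key, byte[] value)
  {
    sharedStore.put(storeKey(key), value);
  }

  byte[] get(byte[] key)
  {
    return sharedStore.get(storeKey(key));
  }

  public static void main(String[] args)
  {
    Map<ByteBuffer, byte[]> store = new HashMap<>();
    PrefixedMapSketch first = new PrefixedMapSketch(store, new byte[]{0});
    PrefixedMapSketch second = new PrefixedMapSketch(store, new byte[]{1});
    first.put(new byte[]{7}, new byte[]{10});
    second.put(new byte[]{7}, new byte[]{20});
    // Same logical key {7}, two distinct entries in the shared store.
    System.out.println(first.get(new byte[]{7})[0] + " " + second.get(new byte[]{7})[0] + " " + store.size());
    // prints "10 20 2"
  }
}
```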



[GitHub] apex-malhar pull request #324: Spillable Datastructures PR for review only

Posted by davidyan74 <gi...@git.apache.org>.
Github user davidyan74 commented on a diff in the pull request:

    https://github.com/apache/apex-malhar/pull/324#discussion_r73032354
  
    --- Diff: library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SliceUtils.java ---
    @@ -0,0 +1,70 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.apex.malhar.lib.utils.serde;
    +
    +import com.datatorrent.netlet.util.Slice;
    +
    +public class SliceUtils
    --- End diff --
    
    Please add javadoc for each of the utility methods.
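For illustration, here is the level of javadoc being requested, applied to a hypothetical byte-array helper. `ByteArrayUtils` is not part of this PR. It uses `byte[]` rather than netlet `Slice` so that it compiles on its own.

```java
import java.util.Arrays;

/**
 * Hypothetical byte-array helpers, shown only to illustrate per-method javadoc.
 */
final class ByteArrayUtils
{
  private ByteArrayUtils()
  {
    // utility class, not instantiable
  }

  /**
   * Concatenates two byte arrays into a newly allocated array.
   *
   * @param first the bytes that appear first in the result; must not be null
   * @param second the bytes that appear after {@code first}; must not be null
   * @return a new array of length {@code first.length + second.length}; the inputs are not modified
   */
  static byte[] concatenate(byte[] first, byte[] second)
  {
    byte[] result = Arrays.copyOf(first, first.length + second.length);
    System.arraycopy(second, 0, result, first.length, second.length);
    return result;
  }
}
```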



[GitHub] apex-malhar pull request #324: Spillable Datastructures PR for review only

Posted by chaithu14 <gi...@git.apache.org>.
Github user chaithu14 commented on a diff in the pull request:

    https://github.com/apache/apex-malhar/pull/324#discussion_r68559232
  
    --- Diff: library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteArrayListMultimapImpl.java ---
    @@ -0,0 +1,241 @@
    +package org.apache.apex.malhar.lib.state.spillable;
    +
    +import java.util.Collection;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +
    +import javax.annotation.Nullable;
    +import javax.validation.constraints.NotNull;
    +
    +import org.apache.apex.malhar.lib.utils.serde.PassThruByteArraySliceSerde;
    +import org.apache.apex.malhar.lib.utils.serde.Serde;
    +import org.apache.apex.malhar.lib.utils.serde.SerdeIntSlice;
    +import org.apache.apex.malhar.lib.utils.serde.SliceUtils;
    +
    +import com.google.common.base.Preconditions;
    +import com.google.common.collect.Multimap;
    +import com.google.common.collect.Multiset;
    +
    +import com.datatorrent.api.Context;
    +import com.datatorrent.netlet.util.Slice;
    +
    +/**
    + * Created by tfarkas on 6/12/16.
    + */
    +public class SpillableByteArrayListMultimapImpl<K, V> implements Spillable.SpillableByteArrayListMultimap<K, V>,
    +    Spillable.SpillableComponent
    +{
    +  public static final int DEFAULT_BATCH_SIZE = 1000;
    +  public static final byte[] SIZE_KEY_SUFFIX = new byte[]{(byte)0, (byte)0, (byte)0};
    +
    +  private int batchSize = DEFAULT_BATCH_SIZE;
    +
    +  private WindowBoundedMapCache<K, SpillableArrayListImpl<V>> cache = new WindowBoundedMapCache<>();
    +
    +  @NotNull
    +  private SpillableByteMapImpl<byte[], Integer> map;
    +
    +  private SpillableStateStore store;
    +  private byte[] identifier;
    +  private long bucket;
    +  private Serde<K, Slice> serdeKey;
    +  private Serde<V, Slice> serdeValue;
    +
    +  public SpillableByteArrayListMultimapImpl(SpillableStateStore store, byte[] identifier, long bucket,
    +      Serde<K, Slice> serdeKey,
    +      Serde<V, Slice> serdeValue)
    +  {
    +    this.store = Preconditions.checkNotNull(store);
    +    this.identifier = Preconditions.checkNotNull(identifier);
    +    this.bucket = bucket;
    +    this.serdeKey = Preconditions.checkNotNull(serdeKey);
    +    this.serdeValue = Preconditions.checkNotNull(serdeValue);
    +
    +    map = new SpillableByteMapImpl(store, identifier, bucket, new PassThruByteArraySliceSerde(), new SerdeIntSlice());
    +  }
    +
    +  @Override
    +  public List<V> get(@Nullable K key)
    +  {
    +    return getHelper(key);
    +  }
    +
    +  private SpillableArrayListImpl<V> getHelper(@Nullable K key)
    +  {
    +    SpillableArrayListImpl<V> spillableArrayList = cache.get(key);
    +
    --- End diff --
    
    How do we get the values for keys that have expired from the cache?
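One common answer is a read-through lookup. The window-bounded cache holds only recent entries, and a miss falls back to the store and re-caches the result. `getHelper()` in this diff appears to follow that pattern for the multimap: on a miss, it reads the size entry from `map` and rebuilds the list. The sketch below is a simplified model using `HashMap`s. `ReadThroughMap` and its `endWindow()` eviction policy are assumptions, not the behavior of `WindowBoundedMapCache`.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical read-through map: the cache holds recent entries, the store holds everything written back. */
class ReadThroughMap<K, V>
{
  private final Map<K, V> cache = new HashMap<>();
  private final Map<K, V> store; // stands in for the spilled, persistent state

  ReadThroughMap(Map<K, V> store)
  {
    this.store = store;
  }

  void put(K key, V value)
  {
    cache.put(key, value);
  }

  V get(K key)
  {
    V value = cache.get(key);
    if (value == null) {
      // Cache miss (never loaded, or evicted): fall back to the store and re-cache the result.
      value = store.get(key);
      if (value != null) {
        cache.put(key, value);
      }
    }
    return value;
  }

  /** Writes every cached entry through to the store, then evicts all of them from the cache. */
  void endWindow()
  {
    store.putAll(cache);
    cache.clear();
  }

  int cachedSize()
  {
    return cache.size();
  }
}
```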



[GitHub] apex-malhar pull request #324: Spillable Datastructures PR for review only

Posted by davidyan74 <gi...@git.apache.org>.
Github user davidyan74 commented on a diff in the pull request:

    https://github.com/apache/apex-malhar/pull/324#discussion_r73073791
  
    --- Diff: library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteArrayListMultimapImpl.java ---
    @@ -0,0 +1,292 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.apex.malhar.lib.state.spillable;
    +
    +import java.util.Collection;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +
    +import javax.annotation.Nullable;
    +import javax.validation.constraints.NotNull;
    +
    +import org.apache.apex.malhar.lib.utils.serde.PassThruByteArraySliceSerde;
    +import org.apache.apex.malhar.lib.utils.serde.Serde;
    +import org.apache.apex.malhar.lib.utils.serde.SerdeIntSlice;
    +import org.apache.apex.malhar.lib.utils.serde.SliceUtils;
    +
    +import com.esotericsoftware.kryo.DefaultSerializer;
    +import com.esotericsoftware.kryo.serializers.FieldSerializer;
    +import com.google.common.base.Preconditions;
    +import com.google.common.collect.Multimap;
    +import com.google.common.collect.Multiset;
    +
    +import com.datatorrent.api.Context;
    +import com.datatorrent.netlet.util.Slice;
    +
    +/**
    + * Created by tfarkas on 6/12/16.
    + */
    +@DefaultSerializer(FieldSerializer.class)
    +public class SpillableByteArrayListMultimapImpl<K, V> implements Spillable.SpillableByteArrayListMultimap<K, V>,
    +    Spillable.SpillableComponent
    +{
    +  public static final int DEFAULT_BATCH_SIZE = 1000;
    +  public static final byte[] SIZE_KEY_SUFFIX = new byte[]{(byte)0, (byte)0, (byte)0};
    +
    +  private transient WindowBoundedMapCache<K, SpillableArrayListImpl<V>> cache = new WindowBoundedMapCache<>();
    +  private transient boolean isRunning = false;
    +  private transient boolean isInWindow = false;
    +
    +  private int batchSize = DEFAULT_BATCH_SIZE;
    +  @NotNull
    +  private SpillableByteMapImpl<byte[], Integer> map;
    +  private SpillableStateStore store;
    +  private byte[] identifier;
    +  private long bucket;
    +  private Serde<K, Slice> serdeKey;
    +  private Serde<V, Slice> serdeValue;
    +
    +  private SpillableByteArrayListMultimapImpl()
    +  {
    +    // for kryo
    +  }
    +
    +  public SpillableByteArrayListMultimapImpl(SpillableStateStore store, byte[] identifier, long bucket,
    +      Serde<K, Slice> serdeKey,
    +      Serde<V, Slice> serdeValue)
    +  {
    +    this.store = Preconditions.checkNotNull(store);
    +    this.identifier = Preconditions.checkNotNull(identifier);
    +    this.bucket = bucket;
    +    this.serdeKey = Preconditions.checkNotNull(serdeKey);
    +    this.serdeValue = Preconditions.checkNotNull(serdeValue);
    +
    +    map = new SpillableByteMapImpl(store, identifier, bucket, new PassThruByteArraySliceSerde(), new SerdeIntSlice());
    +  }
    +
    +  public SpillableStateStore getStore()
    +  {
    +    return store;
    +  }
    +
    +  @Override
    +  public List<V> get(@Nullable K key)
    +  {
    +    return getHelper(key);
    +  }
    +
    +  private SpillableArrayListImpl<V> getHelper(@Nullable K key)
    +  {
    +    SpillableArrayListImpl<V> spillableArrayList = cache.get(key);
    +
    +    if (spillableArrayList == null) {
    +      Slice keySlice = serdeKey.serialize(key);
    +      Integer size = map.get(SliceUtils.concatenate(keySlice, SIZE_KEY_SUFFIX).toByteArray());
    --- End diff --
    
    I see that you removed the identifier from the key that stores the size. Could this cause collisions between data structures with different identifiers?



[GitHub] apex-malhar pull request #324: Spillable Datastructures PR for review only

Posted by chaithu14 <gi...@git.apache.org>.
Github user chaithu14 commented on a diff in the pull request:

    https://github.com/apache/apex-malhar/pull/324#discussion_r68559266
  
    --- Diff: library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteMapImpl.java ---
    @@ -0,0 +1,191 @@
    +package org.apache.apex.malhar.lib.state.spillable;
    +
    +import java.util.Collection;
    +import java.util.Map;
    +import java.util.Set;
    +
    +import javax.validation.constraints.NotNull;
    +
    +import org.apache.apex.malhar.lib.state.BucketedState;
    +import org.apache.apex.malhar.lib.utils.serde.Serde;
    +import org.apache.apex.malhar.lib.utils.serde.SliceUtils;
    +import org.apache.commons.lang3.ArrayUtils;
    +import org.apache.commons.lang3.mutable.MutableInt;
    +
    +import com.google.common.base.Preconditions;
    +
    +import com.datatorrent.api.Context;
    +import com.datatorrent.netlet.util.Slice;
    +
    +public class SpillableByteMapImpl<K, V> implements Spillable.SpillableByteMap<K, V>, Spillable.SpillableComponent
    +{
    +  @NotNull
    +  private SpillableStateStore store;
    +  @NotNull
    +  private byte[] identifier;
    +  private long bucket;
    +  @NotNull
    +  private Serde<K, Slice> serdeKey;
    +  @NotNull
    +  private Serde<V, Slice> serdeValue;
    +
    +  private int size = 0;
    +
    +  private transient WindowBoundedMapCache<K, V> cache = new WindowBoundedMapCache<>();
    +  private transient MutableInt tempOffset = new MutableInt();
    +
    +  private SpillableByteMapImpl()
    +  {
    +    //for kryo
    +  }
    +
    +  public SpillableByteMapImpl(SpillableStateStore store, byte[] identifier, long bucket, Serde<K, Slice> serdeKey,
    +      Serde<V, Slice> serdeValue)
    +  {
    +    this.store = Preconditions.checkNotNull(store);
    +    this.identifier = Preconditions.checkNotNull(identifier);
    +    this.bucket = bucket;
    +    this.serdeKey = Preconditions.checkNotNull(serdeKey);
    +    this.serdeValue = Preconditions.checkNotNull(serdeValue);
    +  }
    +
    +  @Override
    +  public int size()
    +  {
    +    return size;
    +  }
    +
    +  @Override
    +  public boolean isEmpty()
    +  {
    +    return size == 0;
    +  }
    +
    +  @Override
    +  public boolean containsKey(Object o)
    +  {
    +    return get(o) != null;
    +  }
    +
    +  @Override
    +  public boolean containsValue(Object o)
    +  {
    +    throw new UnsupportedOperationException();
    +  }
    +
    +  @Override
    +  public V get(Object o)
    +  {
    +    K key = (K)o;
    +
    +    if (cache.getRemovedKeys().contains(key)) {
    +      return null;
    +    }
    +
    +    V val = cache.get(key);
    +
    +    if (val != null) {
    +      return val;
    +    }
    +
    +    Slice valSlice = store.getSync(bucket, SliceUtils.concatenate(identifier, serdeKey.serialize(key)));
    +
    +    if (valSlice == null || valSlice == BucketedState.EXPIRED || valSlice.length == 0) {
    +      return null;
    +    }
    +
    +    tempOffset.setValue(valSlice.offset + identifier.length);
    +    return serdeValue.deserialize(valSlice, tempOffset);
    +  }
    +
    +  @Override
    +  public V put(K k, V v)
    +  {
    +    V value = get(k);
    +
    +    if (value == null) {
    +      size++;
    +    }
    +
    +    cache.put(k, v);
    +
    +    return value;
    +  }
    +
    +  @Override
    +  public V remove(Object o)
    +  {
    +    V value = get(o);
    +
    +    if (value != null) {
    +      size--;
    +    }
    +
    +    cache.remove((K)o);
    +
    +    return value;
    +  }
    +
    +  @Override
    +  public void putAll(Map<? extends K, ? extends V> map)
    +  {
    +    for (Map.Entry<? extends K, ? extends V> entry : map.entrySet()) {
    +      put(entry.getKey(), entry.getValue());
    +    }
    +  }
    +
    +  @Override
    +  public void clear()
    +  {
    +    throw new UnsupportedOperationException();
    +  }
    +
    +  @Override
    +  public Set<K> keySet()
    +  {
    +    throw new UnsupportedOperationException();
    +  }
    +
    +  @Override
    +  public Collection<V> values()
    +  {
    +    throw new UnsupportedOperationException();
    +  }
    +
    +  @Override
    +  public Set<Entry<K, V>> entrySet()
    +  {
    +    throw new UnsupportedOperationException();
    +  }
    +
    +  @Override
    +  public void setup(Context.OperatorContext context)
    +  {
    +  }
    --- End diff --
    
    I think store.setup() has to be called in the setup() method, because SpillableStateStore extends the Component interface. The same applies to the other lifecycle methods: beginWindow(), endWindow(), and teardown().
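Here is a minimal sketch of the suggested delegation, assuming the data structure owns its store. `Lifecycle` is a hypothetical, simplified interface. The real Apex `Component.setup()` takes an `OperatorContext`, and the window callbacks come from separate interfaces such as the `WindowListener` referenced in this PR. Treat the sketch as illustrative only.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified lifecycle; not the Apex Component or Operator API. */
interface Lifecycle
{
  void setup();

  void beginWindow(long windowId);

  void endWindow();

  void teardown();
}

/** Store stand-in that records each callback it receives, in order. */
class RecordingStore implements Lifecycle
{
  final List<String> calls = new ArrayList<>();

  @Override
  public void setup()
  {
    calls.add("setup");
  }

  @Override
  public void beginWindow(long windowId)
  {
    calls.add("beginWindow " + windowId);
  }

  @Override
  public void endWindow()
  {
    calls.add("endWindow");
  }

  @Override
  public void teardown()
  {
    calls.add("teardown");
  }
}

/** A data structure that owns its store and forwards every lifecycle callback to it. */
class StoreOwningComponent implements Lifecycle
{
  private final Lifecycle store;

  StoreOwningComponent(Lifecycle store)
  {
    this.store = store;
  }

  @Override
  public void setup()
  {
    store.setup(); // the store must be ready before this component reads from it
  }

  @Override
  public void beginWindow(long windowId)
  {
    store.beginWindow(windowId);
  }

  @Override
  public void endWindow()
  {
    // A real implementation would flush its own cached writes here, before the store's endWindow().
    store.endWindow();
  }

  @Override
  public void teardown()
  {
    store.teardown();
  }
}
```

The `SpillableArrayListImplTest` diff quoted in this thread calls `store.setup()` and `list.setup()` separately. That suggests the operator, rather than each structure, drives the store in that revision. Whichever approach is chosen, the store should be set up exactly once.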



[GitHub] apex-malhar pull request #324: Spillable Datastructures PR for review only

Posted by chaithu14 <gi...@git.apache.org>.
Github user chaithu14 commented on a diff in the pull request:

    https://github.com/apache/apex-malhar/pull/324#discussion_r68559252
  
    --- Diff: library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteArrayListMultimapImpl.java ---
    @@ -0,0 +1,241 @@
    +package org.apache.apex.malhar.lib.state.spillable;
    +
    +import java.util.Collection;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +
    +import javax.annotation.Nullable;
    +import javax.validation.constraints.NotNull;
    +
    +import org.apache.apex.malhar.lib.utils.serde.PassThruByteArraySliceSerde;
    +import org.apache.apex.malhar.lib.utils.serde.Serde;
    +import org.apache.apex.malhar.lib.utils.serde.SerdeIntSlice;
    +import org.apache.apex.malhar.lib.utils.serde.SliceUtils;
    +
    +import com.google.common.base.Preconditions;
    +import com.google.common.collect.Multimap;
    +import com.google.common.collect.Multiset;
    +
    +import com.datatorrent.api.Context;
    +import com.datatorrent.netlet.util.Slice;
    +
    +/**
    + * Created by tfarkas on 6/12/16.
    + */
    +public class SpillableByteArrayListMultimapImpl<K, V> implements Spillable.SpillableByteArrayListMultimap<K, V>,
    +    Spillable.SpillableComponent
    +{
    +  public static final int DEFAULT_BATCH_SIZE = 1000;
    +  public static final byte[] SIZE_KEY_SUFFIX = new byte[]{(byte)0, (byte)0, (byte)0};
    +
    +  private int batchSize = DEFAULT_BATCH_SIZE;
    +
    +  private WindowBoundedMapCache<K, SpillableArrayListImpl<V>> cache = new WindowBoundedMapCache<>();
    +
    +  @NotNull
    +  private SpillableByteMapImpl<byte[], Integer> map;
    +
    +  private SpillableStateStore store;
    +  private byte[] identifier;
    +  private long bucket;
    +  private Serde<K, Slice> serdeKey;
    +  private Serde<V, Slice> serdeValue;
    +
    +  public SpillableByteArrayListMultimapImpl(SpillableStateStore store, byte[] identifier, long bucket,
    +      Serde<K, Slice> serdeKey,
    +      Serde<V, Slice> serdeValue)
    +  {
    +    this.store = Preconditions.checkNotNull(store);
    +    this.identifier = Preconditions.checkNotNull(identifier);
    +    this.bucket = bucket;
    +    this.serdeKey = Preconditions.checkNotNull(serdeKey);
    +    this.serdeValue = Preconditions.checkNotNull(serdeValue);
    +
    +    map = new SpillableByteMapImpl(store, identifier, bucket, new PassThruByteArraySliceSerde(), new SerdeIntSlice());
    +  }
    +
    +  @Override
    +  public List<V> get(@Nullable K key)
    +  {
    +    return getHelper(key);
    +  }
    +
    +  private SpillableArrayListImpl<V> getHelper(@Nullable K key)
    +  {
    +    SpillableArrayListImpl<V> spillableArrayList = cache.get(key);
    +
    +    if (spillableArrayList == null) {
    +      Slice keyPrefix = serdeKey.serialize(key);
    +      Integer size = map.get(SliceUtils.concatenate(keyPrefix, SIZE_KEY_SUFFIX));
    +
    +      if (size == null) {
    +        return null;
    +      }
    +
    +      spillableArrayList = new SpillableArrayListImpl<V>(bucket, keyPrefix.buffer, store, serdeValue);
    +      spillableArrayList.setSize(size);
    +
    +      cache.put(key, spillableArrayList);
    +    }
    +
    +    return spillableArrayList;
    +  }
    +
    +  @Override
    +  public Set<K> keySet()
    +  {
    +    throw new UnsupportedOperationException();
    +  }
    +
    +  @Override
    +  public Multiset<K> keys()
    +  {
    +    throw new UnsupportedOperationException();
    +  }
    +
    +  @Override
    +  public Collection<V> values()
    +  {
    +    throw new UnsupportedOperationException();
    +  }
    +
    +  @Override
    +  public Collection<Map.Entry<K, V>> entries()
    +  {
    +    throw new UnsupportedOperationException();
    +  }
    +
    +  @Override
    +  public List<V> removeAll(@Nullable Object key)
    +  {
    +    throw new UnsupportedOperationException();
    +  }
    +
    +  @Override
    +  public void clear()
    +  {
    +    throw new UnsupportedOperationException();
    +  }
    +
    +  @Override
    +  public int size()
    +  {
    +    return map.size();
    +  }
    +
    +  @Override
    +  public boolean isEmpty()
    +  {
    +    return map.isEmpty();
    +  }
    +
    +  @Override
    +  public boolean containsKey(@Nullable Object key)
    +  {
    +    return map.containsKey(SliceUtils.concatenate(serdeKey.serialize((K)key), SIZE_KEY_SUFFIX));
    +  }
    +
    +  @Override
    +  public boolean containsValue(@Nullable Object value)
    +  {
    +    throw new UnsupportedOperationException();
    +  }
    +
    +  @Override
    +  public boolean containsEntry(@Nullable Object key, @Nullable Object value)
    +  {
    +    throw new UnsupportedOperationException();
    +  }
    +
    +  @Override
    +  public boolean put(@Nullable K key, @Nullable V value)
    +  {
    +    SpillableArrayListImpl<V> spillableArrayList = getHelper(key);
    +
    +    if (spillableArrayList == null) {
    +      spillableArrayList = new SpillableArrayListImpl<V>(bucket, serdeKey.serialize(key).buffer, store, serdeValue);
    +
    +      cache.put(key, spillableArrayList);
    +    }
    +
    +    spillableArrayList.add(value);
    --- End diff --
    
    I think the map should also be updated in put(); otherwise, the size will only change at endWindow().
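The sketch below shows the suggested change, assuming the per-key size entry lives in a separate map as it does in this diff. `put()` updates the size entry immediately, so `size()` and `containsKey()` see the new key within the same window. `EagerSizeMultimap` is hypothetical and uses in-memory `HashMap`s.

Guava's `Multimap.size()` is documented to return the number of key-value pairs, not the number of distinct keys. In this diff, `size()` returns `map.size()`, which appears to count keys, so the sketch tracks both counts.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical multimap that keeps a per-key size entry next to the value lists. */
class EagerSizeMultimap<K, V>
{
  private final Map<K, List<V>> lists = new HashMap<>();
  // Stands in for the size map: key -> number of values stored under that key.
  private final Map<K, Integer> sizes = new HashMap<>();
  private int totalPairs = 0;

  boolean put(K key, V value)
  {
    lists.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
    // Update the size entry now rather than at endWindow(), so that
    // size() and containsKey() are already correct later in the same window.
    sizes.merge(key, 1, Integer::sum);
    totalPairs++;
    return true;
  }

  List<V> get(K key)
  {
    return lists.get(key);
  }

  boolean containsKey(K key)
  {
    return sizes.containsKey(key);
  }

  /** Number of key-value pairs, matching the Guava Multimap.size() contract. */
  int size()
  {
    return totalPairs;
  }

  /** Number of distinct keys, which is what map.size() appears to count in the diff. */
  int distinctKeyCount()
  {
    return sizes.size();
  }
}
```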



[GitHub] apex-malhar pull request #324: Spillable Datastructures PR for review only

Posted by chandnisingh <gi...@git.apache.org>.
Github user chandnisingh commented on a diff in the pull request:

    https://github.com/apache/apex-malhar/pull/324#discussion_r73046023
  
    --- Diff: library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableIdentifierGenerator.java ---
    @@ -0,0 +1,37 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.apex.malhar.lib.state.spillable;
    +
    +/**
    + * Classes implementing this interface can be used as generators for identifiers for Spillable data structures.
    + */
    +public interface SpillableIdentifierGenerator
    +{
    +  /**
    +   * Generates the next valid identifier for a Spillable data structure.
    +   * @return A byte array which represents the next valid identifier for a Spillable data structure.
    +   */
    +  public byte[] next();
    --- End diff --
    
    nit: public is redundant on interface methods.
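Interface methods are implicitly `public` and abstract, so the modifier can simply be dropped. Implementing classes still have to declare the method `public`. The generator below is only a hypothetical example, since the PR's own implementation is not shown in this diff. It emits an increasing counter as a fixed-width 8-byte big-endian array, which keeps all identifiers the same length.

```java
import java.nio.ByteBuffer;

/** Interface methods are implicitly public, so no modifier is needed. */
interface IdentifierGenerator
{
  /**
   * Generates the next valid identifier.
   *
   * @return a byte array representing the next identifier
   */
  byte[] next();
}

/** Hypothetical generator: an increasing counter encoded as 8 big-endian bytes. */
class CounterIdentifierGenerator implements IdentifierGenerator
{
  private long counter = 0L;

  @Override
  public byte[] next() // must be public here, or it would narrow the interface's visibility
  {
    return ByteBuffer.allocate(Long.BYTES).putLong(counter++).array();
  }
}
```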



[GitHub] apex-malhar pull request #324: Spillable Datastructures PR for review only

Posted by davidyan74 <gi...@git.apache.org>.
Github user davidyan74 commented on a diff in the pull request:

    https://github.com/apache/apex-malhar/pull/324#discussion_r72001616
  
    --- Diff: library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListImplTest.java ---
    @@ -0,0 +1,458 @@
    +package org.apache.apex.malhar.lib.state.spillable;
    +
    +import org.junit.Assert;
    +import org.junit.Rule;
    +import org.junit.Test;
    +
    +import org.apache.apex.malhar.lib.state.spillable.inmem.InMemSpillableStateStore;
    +import org.apache.apex.malhar.lib.utils.serde.SerdeStringSlice;
    +
    +import com.google.common.collect.Lists;
    +
    +/**
    + * Created by tfarkas on 6/19/16.
    + */
    +public class SpillableArrayListImplTest
    +{
    +  public static final byte[] ID1 = new byte[]{(byte)0};
    +  public static final byte[] ID2 = new byte[]{(byte)1};
    +
    +  @Rule
    +  public SpillableTestUtils.TestMeta testMeta = new SpillableTestUtils.TestMeta();
    +
    +  @Test
    +  public void simpleAddGetAndSetTest1()
    +  {
    +    InMemSpillableStateStore store = new InMemSpillableStateStore();
    +
    +    simpleAddGetAndSetTest1Helper(store);
    +  }
    +
    +  @Test
    +  public void simpleAddGetAndSetManagedStateTest1()
    +  {
    +    simpleAddGetAndSetTest1Helper(testMeta.store);
    +  }
    +
    +  public void simpleAddGetAndSetTest1Helper(SpillableStateStore store)
    +  {
    +    SpillableArrayListImpl<String> list = new SpillableArrayListImpl<>(0L, ID1, store,
    +        new SerdeStringSlice(), 1);
    +
    +    store.setup(testMeta.operatorContext);
    +    list.setup(testMeta.operatorContext);
    +
    +    long windowId = 0L;
    +    store.beginWindow(windowId);
    +    list.beginWindow(windowId);
    +    windowId++;
    --- End diff --
    
    Should this increment be done before the next beginWindow()? It looks like the incremented windowId is used for the checkpoint callbacks later on, but shouldn't the callbacks get the windowId from before the increment?
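The sketch below shows the ordering the question implies, assuming the test is meant to simulate one window followed by a checkpoint of that same window. Every callback for a window receives that window's id, and the id is incremented only afterwards. `WindowAndCheckpointListener`, `CallLog`, and `WindowDriver` are hypothetical. Their callback names mirror the window and checkpoint callbacks that `SpillableStateStore` inherits. In a real deployment, however, `committed()` can arrive several windows later.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical listener; method names mirror Apex window and checkpoint callbacks. */
interface WindowAndCheckpointListener
{
  void beginWindow(long windowId);

  void endWindow();

  void beforeCheckpoint(long windowId);

  void checkpointed(long windowId);

  void committed(long windowId);
}

/** Records every callback together with the window id it received. */
class CallLog implements WindowAndCheckpointListener
{
  final List<String> calls = new ArrayList<>();

  @Override
  public void beginWindow(long windowId)
  {
    calls.add("beginWindow " + windowId);
  }

  @Override
  public void endWindow()
  {
    calls.add("endWindow");
  }

  @Override
  public void beforeCheckpoint(long windowId)
  {
    calls.add("beforeCheckpoint " + windowId);
  }

  @Override
  public void checkpointed(long windowId)
  {
    calls.add("checkpointed " + windowId);
  }

  @Override
  public void committed(long windowId)
  {
    calls.add("committed " + windowId);
  }
}

final class WindowDriver
{
  private WindowDriver()
  {
  }

  /** Runs one window and checkpoints it; every callback uses the same id, and the next id is returned. */
  static long runWindowAndCheckpoint(WindowAndCheckpointListener listener, long windowId)
  {
    listener.beginWindow(windowId);
    listener.endWindow();
    listener.beforeCheckpoint(windowId);
    listener.checkpointed(windowId);
    listener.committed(windowId);
    return windowId + 1; // increment only after all callbacks for this window
  }
}
```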



[GitHub] apex-malhar pull request #324: Spillable Datastructures PR for review only

Posted by davidyan74 <gi...@git.apache.org>.
Github user davidyan74 commented on a diff in the pull request:

    https://github.com/apache/apex-malhar/pull/324#discussion_r73032752
  
    --- Diff: library/src/test/java/org/apache/apex/malhar/lib/state/spillable/WindowBoundedMapCacheTest.java ---
    @@ -0,0 +1,119 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.apex.malhar.lib.state.spillable;
    +
    +import org.junit.Assert;
    +import org.junit.Test;
    +
    +import com.google.common.collect.Sets;
    +
    +/**
    + * Created by tfarkas on 6/4/16.
    --- End diff --
    
    Please add a javadoc here.



[GitHub] apex-malhar pull request #324: Spillable Datastructures PR for review only

Posted by chandnisingh <gi...@git.apache.org>.
Github user chandnisingh commented on a diff in the pull request:

    https://github.com/apache/apex-malhar/pull/324#discussion_r73969296
  
    --- Diff: library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableStateStore.java ---
    @@ -0,0 +1,33 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.apex.malhar.lib.state.spillable;
    +
    +import org.apache.apex.malhar.lib.state.BucketedState;
    +
    +import com.datatorrent.api.Component;
    +import com.datatorrent.api.Context;
    +import com.datatorrent.api.Operator;
    +
    +/**
    + * Implementations of this interface are used by Spillable datastructures to spill data to disk.
    + */
    +public interface SpillableStateStore extends BucketedState, Component<Context.OperatorContext>,
    +    Operator.CheckpointNotificationListener, WindowListener
    +{
    --- End diff --
    
    Can we add `hasBeenSetup()` to this?
    I see in the tests that the `setup()`
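Here is a minimal sketch of what a `hasBeenSetup()` method could enable, assuming several structures share one store and each may try to set it up. The flag makes setup idempotent from the callers' side. `SetupTrackingStore` and `SharingComponent` are hypothetical and are not part of `SpillableStateStore`.

```java
/** Hypothetical store that records whether setup() has already run. */
class SetupTrackingStore
{
  private boolean setupDone = false;
  private int setupCount = 0; // only for checking how often setup() really ran

  void setup()
  {
    setupDone = true;
    setupCount++;
  }

  boolean hasBeenSetup()
  {
    return setupDone;
  }

  int setupCount()
  {
    return setupCount;
  }

  void teardown()
  {
    setupDone = false;
  }
}

/** Hypothetical owner that sets the shared store up only if nobody else has done so yet. */
class SharingComponent
{
  private final SetupTrackingStore store;

  SharingComponent(SetupTrackingStore store)
  {
    this.store = store;
  }

  void setup()
  {
    if (!store.hasBeenSetup()) {
      store.setup();
    }
  }
}
```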



[GitHub] apex-malhar pull request #324: Spillable Datastructures PR for review only

Posted by davidyan74 <gi...@git.apache.org>.
Github user davidyan74 commented on a diff in the pull request:

    https://github.com/apache/apex-malhar/pull/324#discussion_r73032414
  
    --- Diff: library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SequentialSpillableIdentifierGeneratorTest.java ---
    @@ -0,0 +1,128 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.apex.malhar.lib.state.spillable;
    +
    +import org.junit.Assert;
    +import org.junit.Test;
    +
    +import com.datatorrent.lib.util.TestUtils;
    +
    +/**
    + * Created by tfarkas on 6/5/16.
    --- End diff --
    
    Please add a javadoc here.



[GitHub] apex-malhar pull request #324: Spillable Datastructures PR for review only

Posted by chaithu14 <gi...@git.apache.org>.
Github user chaithu14 commented on a diff in the pull request:

    https://github.com/apache/apex-malhar/pull/324#discussion_r68560022
  
    --- Diff: library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteArrayListMultimapImpl.java ---
    @@ -0,0 +1,241 @@
    +package org.apache.apex.malhar.lib.state.spillable;
    +
    +import java.util.Collection;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +
    +import javax.annotation.Nullable;
    +import javax.validation.constraints.NotNull;
    +
    +import org.apache.apex.malhar.lib.utils.serde.PassThruByteArraySliceSerde;
    +import org.apache.apex.malhar.lib.utils.serde.Serde;
    +import org.apache.apex.malhar.lib.utils.serde.SerdeIntSlice;
    +import org.apache.apex.malhar.lib.utils.serde.SliceUtils;
    +
    +import com.google.common.base.Preconditions;
    +import com.google.common.collect.Multimap;
    +import com.google.common.collect.Multiset;
    +
    +import com.datatorrent.api.Context;
    +import com.datatorrent.netlet.util.Slice;
    +
    +/**
    + * Created by tfarkas on 6/12/16.
    + */
    +public class SpillableByteArrayListMultimapImpl<K, V> implements Spillable.SpillableByteArrayListMultimap<K, V>,
    +    Spillable.SpillableComponent
    +{
    +  public static final int DEFAULT_BATCH_SIZE = 1000;
    +  public static final byte[] SIZE_KEY_SUFFIX = new byte[]{(byte)0, (byte)0, (byte)0};
    +
    +  private int batchSize = DEFAULT_BATCH_SIZE;
    +
    +  private WindowBoundedMapCache<K, SpillableArrayListImpl<V>> cache = new WindowBoundedMapCache<>();
    +
    +  @NotNull
    +  private SpillableByteMapImpl<byte[], Integer> map;
    +
    +  private SpillableStateStore store;
    +  private byte[] identifier;
    +  private long bucket;
    +  private Serde<K, Slice> serdeKey;
    +  private Serde<V, Slice> serdeValue;
    +
    +  public SpillableByteArrayListMultimapImpl(SpillableStateStore store, byte[] identifier, long bucket,
    +      Serde<K, Slice> serdeKey,
    +      Serde<V, Slice> serdeValue)
    +  {
    +    this.store = Preconditions.checkNotNull(store);
    +    this.identifier = Preconditions.checkNotNull(identifier);
    +    this.bucket = bucket;
    +    this.serdeKey = Preconditions.checkNotNull(serdeKey);
    +    this.serdeValue = Preconditions.checkNotNull(serdeValue);
    +
    +    map = new SpillableByteMapImpl(store, identifier, bucket, new PassThruByteArraySliceSerde(), new SerdeIntSlice());
    +  }
    +
    +  @Override
    +  public List<V> get(@Nullable K key)
    +  {
    +    return getHelper(key);
    +  }
    +
    +  private SpillableArrayListImpl<V> getHelper(@Nullable K key)
    +  {
    +    SpillableArrayListImpl<V> spillableArrayList = cache.get(key);
    +
    +    if (spillableArrayList == null) {
    +      Slice keyPrefix = serdeKey.serialize(key);
    +      Integer size = map.get(SliceUtils.concatenate(keyPrefix, SIZE_KEY_SUFFIX));
    +
    --- End diff --
    
    Accessing the value from the map should be done as follows, since the map is keyed by byte[]:
    map.get(SliceUtils.concatenate().buffer)
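The sketch below shows why `.buffer` is needed and what it returns. It assumes, as the suggested fix implies, that `SliceUtils.concatenate` returns a `Slice` while `map` is declared as `SpillableByteMapImpl<byte[], Integer>`. The `Slice` class here is a hypothetical stand-in modeled on the `buffer`, `offset` and `length` fields of `com.datatorrent.netlet.util.Slice`. The `concatenate`, `sizeKey` and `toByteArray` helpers are also hypothetical, not the PR's code. One caveat applies: `.buffer` equals the logical key only when the slice starts at offset 0 and spans the whole array. That holds for a freshly concatenated slice but not for an arbitrary one.

```java
import java.util.Arrays;

public class SizeKeyExample
{
  // Hypothetical stand-in for com.datatorrent.netlet.util.Slice.
  static final class Slice
  {
    final byte[] buffer;
    final int offset;
    final int length;

    Slice(byte[] buffer, int offset, int length)
    {
      this.buffer = buffer;
      this.offset = offset;
      this.length = length;
    }
  }

  static final byte[] SIZE_KEY_SUFFIX = new byte[]{(byte)0, (byte)0, (byte)0};

  // Hypothetical helper: copies the logical bytes of 'prefix' followed by 'suffix'
  // into a fresh array, so the result has offset 0 and length == buffer.length.
  static Slice concatenate(Slice prefix, byte[] suffix)
  {
    byte[] out = new byte[prefix.length + suffix.length];
    System.arraycopy(prefix.buffer, prefix.offset, out, 0, prefix.length);
    System.arraycopy(suffix, 0, out, prefix.length, suffix.length);
    return new Slice(out, 0, out.length);
  }

  // byte[] key under which the list size for a serialized key would be stored.
  static byte[] sizeKey(Slice keyPrefix)
  {
    // Safe only because concatenate() returns a slice covering its whole buffer.
    return concatenate(keyPrefix, SIZE_KEY_SUFFIX).buffer;
  }

  // Safe conversion for an arbitrary Slice: copy only [offset, offset + length).
  static byte[] toByteArray(Slice slice)
  {
    return Arrays.copyOfRange(slice.buffer, slice.offset, slice.offset + slice.length);
  }

  public static void main(String[] args)
  {
    // A serialized key "ab" embedded at offset 1 of a larger backing array.
    Slice keyPrefix = new Slice(new byte[]{9, 'a', 'b', 9}, 1, 2);
    System.out.println(Arrays.toString(sizeKey(keyPrefix)));     // [97, 98, 0, 0, 0]
    System.out.println(keyPrefix.buffer.length);                 // 4: .buffer includes the 9s
    System.out.println(Arrays.toString(toByteArray(keyPrefix))); // [97, 98]
  }
}
```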


[GitHub] apex-malhar pull request #324: Spillable Datastructures PR for review only

Posted by davidyan74 <gi...@git.apache.org>.
Github user davidyan74 commented on a diff in the pull request:

    https://github.com/apache/apex-malhar/pull/324#discussion_r73075544
  
    --- Diff: library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableStateStore.java ---
    @@ -0,0 +1,33 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.apex.malhar.lib.state.spillable;
    +
    +import org.apache.apex.malhar.lib.state.BucketedState;
    +
    +import com.datatorrent.api.Component;
    +import com.datatorrent.api.Context;
    +import com.datatorrent.api.Operator;
    +
    +public interface SpillableStateStore extends BucketedState, Component<Context.OperatorContext>,
    --- End diff --
    
    Also, as you can see, we are duplicating the beginWindow and endWindow method declarations in multiple interfaces (see SpillableComponent and SpillableStateStore). This can be avoided with a single generic WindowListener interface.
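Here is a minimal sketch of the de-duplication, assuming a hypothetical `WindowListener` interface. Both interfaces inherit `beginWindow`/`endWindow` from one parent instead of declaring them separately. Callers can then drive either kind through a single type. The interface bodies are reduced stand-ins, not the PR's actual `SpillableComponent` and `SpillableStateStore` declarations, which extend further interfaces. `Store`, `ListComponent` and `runWindow` are hypothetical names for illustration.

```java
import java.util.ArrayList;
import java.util.List;

public class WindowListenerSketch
{
  // Hypothetical shared parent: the window callbacks are declared exactly once.
  interface WindowListener
  {
    void beginWindow(long windowId);

    void endWindow();
  }

  // Reduced stand-ins for the PR's interfaces; neither re-declares the callbacks.
  interface SpillableComponent extends WindowListener
  {
  }

  interface SpillableStateStore extends WindowListener
  {
  }

  // Records each callback so the dispatch order can be inspected.
  static final List<String> events = new ArrayList<>();

  static class Store implements SpillableStateStore
  {
    @Override
    public void beginWindow(long windowId)
    {
      events.add("store.begin " + windowId);
    }

    @Override
    public void endWindow()
    {
      events.add("store.end");
    }
  }

  static class ListComponent implements SpillableComponent
  {
    @Override
    public void beginWindow(long windowId)
    {
      events.add("list.begin " + windowId);
    }

    @Override
    public void endWindow()
    {
      events.add("list.end");
    }
  }

  // A single loop drives every listener, whichever sub-interface it implements.
  static void runWindow(List<? extends WindowListener> listeners, long windowId)
  {
    for (WindowListener listener : listeners) {
      listener.beginWindow(windowId);
    }
    for (WindowListener listener : listeners) {
      listener.endWindow();
    }
  }

  public static void main(String[] args)
  {
    List<WindowListener> listeners = new ArrayList<>();
    listeners.add(new Store());
    listeners.add(new ListComponent());
    runWindow(listeners, 7L);
    System.out.println(events); // [store.begin 7, list.begin 7, store.end, list.end]
  }
}
```

With this shape, adding another window-aware interface only requires extending `WindowListener`, and the operator-side loop stays unchanged.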


[GitHub] apex-malhar pull request #324: Spillable Datastructures PR for review only

Posted by davidyan74 <gi...@git.apache.org>.
Github user davidyan74 commented on a diff in the pull request:

    https://github.com/apache/apex-malhar/pull/324#discussion_r73032673
  
    --- Diff: library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableTestUtils.java ---
    @@ -0,0 +1,134 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.apex.malhar.lib.state.spillable;
    +
    +import java.util.List;
    +
    +import org.junit.Assert;
    +import org.junit.rules.TestWatcher;
    +import org.junit.runner.Description;
    +
    +import org.apache.apex.malhar.lib.state.managed.ManagedStateTestUtils;
    +import org.apache.apex.malhar.lib.state.spillable.managed.ManagedStateSpillableStateStore;
    +import org.apache.apex.malhar.lib.utils.serde.Serde;
    +import org.apache.apex.malhar.lib.utils.serde.SerdeListSlice;
    +import org.apache.apex.malhar.lib.utils.serde.SerdeStringSlice;
    +import org.apache.apex.malhar.lib.utils.serde.SliceUtils;
    +import org.apache.commons.lang3.mutable.MutableInt;
    +
    +import com.datatorrent.api.Context;
    +import com.datatorrent.lib.appdata.gpo.GPOUtils;
    +import com.datatorrent.lib.fileaccess.FileAccessFSImpl;
    +import com.datatorrent.lib.util.TestUtils;
    +import com.datatorrent.netlet.util.Slice;
    +
    +/**
    + * Created by tfarkas on 6/18/16.
    --- End diff --
    
    javadoc


[GitHub] apex-malhar pull request #324: Spillable Datastructures PR for review only

Posted by davidyan74 <gi...@git.apache.org>.
Github user davidyan74 commented on a diff in the pull request:

    https://github.com/apache/apex-malhar/pull/324#discussion_r73031608
  
    --- Diff: library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteMapImpl.java ---
    @@ -0,0 +1,237 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.apex.malhar.lib.state.spillable;
    +
    +import java.io.Serializable;
    +import java.util.Collection;
    +import java.util.Map;
    +import java.util.Set;
    +
    +import javax.validation.constraints.NotNull;
    +
    +import org.apache.apex.malhar.lib.state.BucketedState;
    +import org.apache.apex.malhar.lib.utils.serde.Serde;
    +import org.apache.apex.malhar.lib.utils.serde.SliceUtils;
    +import org.apache.commons.lang3.ArrayUtils;
    +import org.apache.commons.lang3.mutable.MutableInt;
    +
    +import com.esotericsoftware.kryo.DefaultSerializer;
    +import com.esotericsoftware.kryo.serializers.FieldSerializer;
    +import com.google.common.base.Preconditions;
    +
    +import com.datatorrent.api.Context;
    +import com.datatorrent.netlet.util.Slice;
    +
    +@DefaultSerializer(FieldSerializer.class)
    +public class SpillableByteMapImpl<K, V> implements Spillable.SpillableByteMap<K, V>, Spillable.SpillableComponent,
    --- End diff --
    
    javadoc


[GitHub] apex-malhar pull request #324: Spillable Datastructures PR for review only

Posted by chandnisingh <gi...@git.apache.org>.
Github user chandnisingh commented on a diff in the pull request:

    https://github.com/apache/apex-malhar/pull/324#discussion_r72828429
  
    --- Diff: library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteMapImplTest.java ---
    @@ -411,45 +441,48 @@ public void recoveryTest()
         testMeta.store.setup(testMeta.operatorContext);
         map1.setup(testMeta.operatorContext);
     
    -    map1.beginWindow(1000);
    +    System.out.println("0");
    --- End diff --
    
    Please remove the System.out call.


[GitHub] apex-malhar pull request #324: Spillable Datastructures PR for review only

Posted by davidyan74 <gi...@git.apache.org>.
Github user davidyan74 commented on a diff in the pull request:

    https://github.com/apache/apex-malhar/pull/324#discussion_r73032099
  
    --- Diff: library/src/main/java/org/apache/apex/malhar/lib/state/spillable/managed/ManagedStateSpillableStateStore.java ---
    @@ -16,29 +16,19 @@
      * specific language governing permissions and limitations
      * under the License.
      */
    -package org.apache.apex.malhar.lib.state.managed.spillable.inmem;
    +package org.apache.apex.malhar.lib.state.spillable.managed;
     
    -import org.junit.Assert;
    -import org.junit.Test;
    +import org.apache.apex.malhar.lib.state.managed.ManagedStateImpl;
    +import org.apache.apex.malhar.lib.state.spillable.SpillableStateStore;
     
    -import org.apache.apex.malhar.lib.state.spillable.inmem.InMemSpillableArrayList;
    +import com.esotericsoftware.kryo.DefaultSerializer;
    +import com.esotericsoftware.kryo.serializers.FieldSerializer;
     
    -import com.esotericsoftware.kryo.Kryo;
    -
    -import com.datatorrent.lib.util.KryoCloneUtils;
    -
    -public class InMemSpillableArrayListTest
    +@DefaultSerializer(FieldSerializer.class)
    +public class ManagedStateSpillableStateStore extends ManagedStateImpl implements SpillableStateStore
    --- End diff --
    
    javadoc


[GitHub] apex-malhar pull request #324: Spillable Datastructures PR for review only

Posted by chandnisingh <gi...@git.apache.org>.
Github user chandnisingh commented on a diff in the pull request:

    https://github.com/apache/apex-malhar/pull/324#discussion_r73079717
  
    --- Diff: library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableStateStore.java ---
    @@ -0,0 +1,33 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.apex.malhar.lib.state.spillable;
    +
    +import org.apache.apex.malhar.lib.state.BucketedState;
    +
    +import com.datatorrent.api.Component;
    +import com.datatorrent.api.Context;
    +import com.datatorrent.api.Operator;
    +
    +public interface SpillableStateStore extends BucketedState, Component<Context.OperatorContext>,
    --- End diff --
    
    @ilooner 
    Why does this not extend SpillableComponent?


[GitHub] apex-malhar pull request #324: Spillable Datastructures PR for review only

Posted by davidyan74 <gi...@git.apache.org>.
Github user davidyan74 commented on a diff in the pull request:

    https://github.com/apache/apex-malhar/pull/324#discussion_r73073399
  
    --- Diff: library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableStateStore.java ---
    @@ -0,0 +1,33 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.apex.malhar.lib.state.spillable;
    +
    +import org.apache.apex.malhar.lib.state.BucketedState;
    +
    +import com.datatorrent.api.Component;
    +import com.datatorrent.api.Context;
    +import com.datatorrent.api.Operator;
    +
    +public interface SpillableStateStore extends BucketedState, Component<Context.OperatorContext>,
    --- End diff --
    
    Should we make this more generic and have a separate interface called WindowListener that is basically:
    ```java
    interface WindowListener
    {
      void beginWindow(long windowId);
      void endWindow();
    }
    ```
    and have SpillableStateStore extend WindowListener, BucketedState, and Component<Context.OperatorContext>?
    The reason is that the operator code will have to call these beginWindow and endWindow callbacks, so it's better to make this as generic as possible.
    
    For example, I would have something like this in the operator code:
    ```java
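    import java.util.ArrayList;
    import java.util.List;
    
    import com.datatorrent.api.Component;
    import com.datatorrent.api.Context.OperatorContext;
    import com.datatorrent.common.util.BaseOperator;
    
    // WindowListener is the interface proposed above (assumed to be in the same package).
    class SomeOperator extends BaseOperator
    {
      List<Component<OperatorContext>> components = new ArrayList<>();
    
      @Override
      public void setup(OperatorContext context)
      {
        for (Component<OperatorContext> c : components) {
          c.setup(context);
        }
      }
    
      @Override
      public void beginWindow(long windowId)
      {
        for (Component<OperatorContext> c : components) {
          if (c instanceof WindowListener) {
            ((WindowListener)c).beginWindow(windowId);
          }
        }
      }
      // similar for endWindow and CheckpointListener callbacks
    }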
    ```


[GitHub] apex-malhar pull request #324: Spillable Datastructures PR for review only

Posted by davidyan74 <gi...@git.apache.org>.
Github user davidyan74 commented on a diff in the pull request:

    https://github.com/apache/apex-malhar/pull/324#discussion_r73031870
  
    --- Diff: library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableStateStore.java ---
    @@ -0,0 +1,33 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.apex.malhar.lib.state.spillable;
    +
    +import org.apache.apex.malhar.lib.state.BucketedState;
    +
    +import com.datatorrent.api.Component;
    +import com.datatorrent.api.Context;
    +import com.datatorrent.api.Operator;
    +
    +public interface SpillableStateStore extends BucketedState, Component<Context.OperatorContext>,
    --- End diff --
    
    javadoc


[GitHub] apex-malhar pull request #324: Spillable Datastructures PR for review only

Posted by chandnisingh <gi...@git.apache.org>.
Github user chandnisingh commented on a diff in the pull request:

    https://github.com/apache/apex-malhar/pull/324#discussion_r73047558
  
    --- Diff: library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponentImpl.java ---
    @@ -0,0 +1,202 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.apex.malhar.lib.state.spillable;
    +
    +import java.util.List;
    +
    +import javax.validation.constraints.NotNull;
    +
    +import org.apache.apex.malhar.lib.utils.serde.Serde;
    +
    +import com.google.common.base.Preconditions;
    +import com.google.common.collect.Lists;
    +
    +import com.datatorrent.api.Context;
    +import com.datatorrent.netlet.util.Slice;
    +
    +/**
    + * Created by tfarkas on 6/12/16.
    --- End diff --
    
    Tim,
    It would be helpful if you provided a small example in the javadoc showing where (in either the operator or the application) new components are added to complex components.
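To illustrate the kind of example being requested, here is a sketch of the general pattern. It is not the PR's API. A hypothetical `ComplexComponent` creates child components through a factory method, keeps them in a list, and forwards each window callback to every child. The operator therefore only has to drive the complex component. All names (`ComplexComponent`, `newChild`, `Child`) are assumptions for illustration. Whether children are created in the operator constructor or in `setup()` is left open here.

```java
import java.util.ArrayList;
import java.util.List;

public class ComplexComponentSketch
{
  // Hypothetical child component (e.g. a spillable list or map) with window callbacks.
  static class Child
  {
    final String name;
    long currentWindow = -1;
    int windowsCompleted = 0;

    Child(String name)
    {
      this.name = name;
    }

    void beginWindow(long windowId)
    {
      currentWindow = windowId;
    }

    void endWindow()
    {
      windowsCompleted++;
    }
  }

  // Hypothetical complex component: owns its children and forwards callbacks to them.
  static class ComplexComponent
  {
    private final List<Child> children = new ArrayList<>();

    // Factory method: creating a child through the complex component registers it.
    Child newChild(String name)
    {
      Child child = new Child(name);
      children.add(child);
      return child;
    }

    void beginWindow(long windowId)
    {
      for (Child child : children) {
        child.beginWindow(windowId);
      }
    }

    void endWindow()
    {
      for (Child child : children) {
        child.endWindow();
      }
    }

    int size()
    {
      return children.size();
    }
  }

  public static void main(String[] args)
  {
    // In an operator, children would be created once (constructor or setup()).
    ComplexComponent complex = new ComplexComponent();
    Child counts = complex.newChild("counts");
    Child events = complex.newChild("events");

    // The operator forwards callbacks only to the complex component.
    complex.beginWindow(5L);
    complex.endWindow();

    System.out.println(complex.size() + " " + counts.currentWindow + " " + events.windowsCompleted); // 2 5 1
  }
}
```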


[GitHub] apex-malhar pull request #324: Spillable Datastructures PR for review only

Posted by davidyan74 <gi...@git.apache.org>.
Github user davidyan74 commented on a diff in the pull request:

    https://github.com/apache/apex-malhar/pull/324#discussion_r73031476
  
    --- Diff: library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteArrayListMultimapImpl.java ---
    @@ -0,0 +1,292 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.apex.malhar.lib.state.spillable;
    +
    +import java.util.Collection;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +
    +import javax.annotation.Nullable;
    +import javax.validation.constraints.NotNull;
    +
    +import org.apache.apex.malhar.lib.utils.serde.PassThruByteArraySliceSerde;
    +import org.apache.apex.malhar.lib.utils.serde.Serde;
    +import org.apache.apex.malhar.lib.utils.serde.SerdeIntSlice;
    +import org.apache.apex.malhar.lib.utils.serde.SliceUtils;
    +
    +import com.esotericsoftware.kryo.DefaultSerializer;
    +import com.esotericsoftware.kryo.serializers.FieldSerializer;
    +import com.google.common.base.Preconditions;
    +import com.google.common.collect.Multimap;
    +import com.google.common.collect.Multiset;
    +
    +import com.datatorrent.api.Context;
    +import com.datatorrent.netlet.util.Slice;
    +
    +/**
    + * Created by tfarkas on 6/12/16.
    --- End diff --
    
    Please remove the author and replace this by a proper javadoc.


[GitHub] apex-malhar pull request #324: Spillable Datastructures PR for review only

Posted by davidyan74 <gi...@git.apache.org>.
Github user davidyan74 commented on a diff in the pull request:

    https://github.com/apache/apex-malhar/pull/324#discussion_r73032713
  
    --- Diff: library/src/test/java/org/apache/apex/malhar/lib/state/spillable/TimeBasedPriorityQueueTest.java ---
    @@ -0,0 +1,134 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.apex.malhar.lib.state.spillable;
    +
    +import java.util.Set;
    +
    +import org.junit.Assert;
    +import org.junit.Test;
    +
    +import com.google.common.collect.Sets;
    +
    +/**
    + * Created by tfarkas on 6/4/16.
    --- End diff --
    
    javadoc


[GitHub] apex-malhar pull request #324: Spillable Datastructures PR for review only

Posted by chaithu14 <gi...@git.apache.org>.
Github user chaithu14 commented on a diff in the pull request:

    https://github.com/apache/apex-malhar/pull/324#discussion_r68559236
  
    --- Diff: library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteArrayListMultimapImpl.java ---
    @@ -0,0 +1,241 @@
    +package org.apache.apex.malhar.lib.state.spillable;
    +
    +import java.util.Collection;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +
    +import javax.annotation.Nullable;
    +import javax.validation.constraints.NotNull;
    +
    +import org.apache.apex.malhar.lib.utils.serde.PassThruByteArraySliceSerde;
    +import org.apache.apex.malhar.lib.utils.serde.Serde;
    +import org.apache.apex.malhar.lib.utils.serde.SerdeIntSlice;
    +import org.apache.apex.malhar.lib.utils.serde.SliceUtils;
    +
    +import com.google.common.base.Preconditions;
    +import com.google.common.collect.Multimap;
    +import com.google.common.collect.Multiset;
    +
    +import com.datatorrent.api.Context;
    +import com.datatorrent.netlet.util.Slice;
    +
    +/**
    + * Created by tfarkas on 6/12/16.
    + */
    +public class SpillableByteArrayListMultimapImpl<K, V> implements Spillable.SpillableByteArrayListMultimap<K, V>,
    +    Spillable.SpillableComponent
    +{
    +  public static final int DEFAULT_BATCH_SIZE = 1000;
    +  public static final byte[] SIZE_KEY_SUFFIX = new byte[]{(byte)0, (byte)0, (byte)0};
    +
    +  private int batchSize = DEFAULT_BATCH_SIZE;
    +
    +  private WindowBoundedMapCache<K, SpillableArrayListImpl<V>> cache = new WindowBoundedMapCache<>();
    +
    +  @NotNull
    +  private SpillableByteMapImpl<byte[], Integer> map;
    +
    +  private SpillableStateStore store;
    +  private byte[] identifier;
    +  private long bucket;
    +  private Serde<K, Slice> serdeKey;
    +  private Serde<V, Slice> serdeValue;
    +
    +  public SpillableByteArrayListMultimapImpl(SpillableStateStore store, byte[] identifier, long bucket,
    +      Serde<K, Slice> serdeKey,
    +      Serde<V, Slice> serdeValue)
    +  {
    +    this.store = Preconditions.checkNotNull(store);
    +    this.identifier = Preconditions.checkNotNull(identifier);
    +    this.bucket = bucket;
    +    this.serdeKey = Preconditions.checkNotNull(serdeKey);
    +    this.serdeValue = Preconditions.checkNotNull(serdeValue);
    +
    +    map = new SpillableByteMapImpl(store, identifier, bucket, new PassThruByteArraySliceSerde(), new SerdeIntSlice());
    +  }
    +
    +  @Override
    +  public List<V> get(@Nullable K key)
    +  {
    +    return getHelper(key);
    +  }
    +
    +  private SpillableArrayListImpl<V> getHelper(@Nullable K key)
    +  {
    +    SpillableArrayListImpl<V> spillableArrayList = cache.get(key);
    +
    +    if (spillableArrayList == null) {
    +      Slice keyPrefix = serdeKey.serialize(key);
    +      Integer size = map.get(SliceUtils.concatenate(keyPrefix, SIZE_KEY_SUFFIX));
    +
    --- End diff --
    
    What is "SIZE_KEY_SUFFIX" used for?



[GitHub] apex-malhar pull request #324: Spillable Datastructures PR for review only

Posted by davidyan74 <gi...@git.apache.org>.
Github user davidyan74 commented on a diff in the pull request:

    https://github.com/apache/apex-malhar/pull/324#discussion_r73032644
  
    --- Diff: library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponentImplTest.java ---
    @@ -0,0 +1,88 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.apex.malhar.lib.state.spillable;
    +
    +import org.junit.Assert;
    +import org.junit.Rule;
    +import org.junit.Test;
    +
    +import org.apache.apex.malhar.lib.state.spillable.inmem.InMemSpillableStateStore;
    +import org.apache.apex.malhar.lib.utils.serde.SerdeStringSlice;
    +
    +/**
    + * Created by tfarkas on 7/17/16.
    --- End diff --
    
    javadoc


[GitHub] apex-malhar pull request #324: Spillable Datastructures PR for review only

Posted by davidyan74 <gi...@git.apache.org>.
Github user davidyan74 commented on a diff in the pull request:

    https://github.com/apache/apex-malhar/pull/324#discussion_r73032590
  
    --- Diff: library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteMapImplTest.java ---
    @@ -0,0 +1,487 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.apex.malhar.lib.state.spillable;
    +
    +import org.junit.Assert;
    +import org.junit.Rule;
    +import org.junit.Test;
    +
    +import org.apache.apex.malhar.lib.state.spillable.inmem.InMemSpillableStateStore;
    +import org.apache.apex.malhar.lib.utils.serde.SerdeStringSlice;
    +
    +import com.datatorrent.api.Attribute;
    +import com.datatorrent.api.Context;
    +import com.datatorrent.api.DAG;
    +import com.datatorrent.lib.helper.OperatorContextTestHelper;
    +import com.datatorrent.lib.util.KryoCloneUtils;
    +
    +/**
    + * Created by tfarkas on 6/6/16.
    --- End diff --
    
    javadoc: please replace the "Created by" comment with a javadoc describing the class.



[GitHub] apex-malhar pull request #324: Spillable Datastructures PR for review only

Posted by chandnisingh <gi...@git.apache.org>.
Github user chandnisingh commented on a diff in the pull request:

    https://github.com/apache/apex-malhar/pull/324#discussion_r73046073
  
    --- Diff: library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableIdentifierGenerator.java ---
    @@ -0,0 +1,37 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.apex.malhar.lib.state.spillable;
    +
    +/**
    + * Classes implementing this interface can be used as generators for identifiers for Spillable data structures.
    + */
    +public interface SpillableIdentifierGenerator
    +{
    +  /**
    +   * Generates the next valid identifier for a Spillable data structure.
    +   * @return A byte array which represents the next valid identifier for a Spillable data structure.
    +   */
    +  public byte[] next();
    +
    +  /**
    +   * Registers the given identifier with this {@link SpillableIdentifierGenerator}.
    +   * @param identifier The identifier to register with this {@link SpillableIdentifierGenerator}.
    +   */
    +  public void register(byte[] identifier);
    --- End diff --
    
    nit: `public` is redundant here, since interface methods are implicitly public.
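    For illustration, here is a standalone copy of the interface without the redundant modifiers, plus a hypothetical implementation (`SequentialIdentifierGenerator`, not part of this PR) that hands out 4-byte big-endian counters and skips identifiers that have been registered. The semantics of `register` are an assumption in this sketch: it marks an identifier as taken and rejects duplicates.
    
    ```java
    import java.nio.ByteBuffer;
    import java.util.HashSet;
    import java.util.Set;
    
    interface SpillableIdentifierGenerator
    {
      // Interface methods are implicitly public (and abstract), so no modifier is needed.
      byte[] next();
    
      void register(byte[] identifier);
    }
    
    // Hypothetical generator: sequential 4-byte counters, skipping registered identifiers.
    class SequentialIdentifierGenerator implements SpillableIdentifierGenerator
    {
      // ByteBuffer.wrap gives content-based equals/hashCode, unlike a raw byte[].
      private final Set<ByteBuffer> used = new HashSet<>();
      private int counter = 0;
    
      @Override
      public byte[] next()
      {
        byte[] id;
        do {
          id = ByteBuffer.allocate(4).putInt(counter++).array();
        } while (used.contains(ByteBuffer.wrap(id)));
        // Store a copy so a caller mutating the returned array cannot corrupt the set.
        used.add(ByteBuffer.wrap(id.clone()));
        return id;
      }
    
      @Override
      public void register(byte[] identifier)
      {
        // Assumed behavior: registering an identifier twice is an error.
        if (!used.add(ByteBuffer.wrap(identifier.clone()))) {
          throw new IllegalArgumentException("identifier already registered");
        }
      }
    }
    ```
    
    Implementing classes still have to declare these methods `public`, because an override cannot reduce the visibility of an interface method.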



[GitHub] apex-malhar pull request #324: Spillable Datastructures PR for review only

Posted by ilooner <gi...@git.apache.org>.
Github user ilooner commented on a diff in the pull request:

    https://github.com/apache/apex-malhar/pull/324#discussion_r72913406
  
    --- Diff: library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListImplTest.java ---
    @@ -0,0 +1,458 @@
    +package org.apache.apex.malhar.lib.state.spillable;
    +
    +import org.junit.Assert;
    +import org.junit.Rule;
    +import org.junit.Test;
    +
    +import org.apache.apex.malhar.lib.state.spillable.inmem.InMemSpillableStateStore;
    +import org.apache.apex.malhar.lib.utils.serde.SerdeStringSlice;
    +
    +import com.google.common.collect.Lists;
    +
    +/**
    + * Created by tfarkas on 6/19/16.
    + */
    +public class SpillableArrayListImplTest
    +{
    +  public static final byte[] ID1 = new byte[]{(byte)0};
    +  public static final byte[] ID2 = new byte[]{(byte)1};
    +
    +  @Rule
    +  public SpillableTestUtils.TestMeta testMeta = new SpillableTestUtils.TestMeta();
    +
    +  @Test
    +  public void simpleAddGetAndSetTest1()
    +  {
    +    InMemSpillableStateStore store = new InMemSpillableStateStore();
    +
    +    simpleAddGetAndSetTest1Helper(store);
    +  }
    +
    +  @Test
    +  public void simpleAddGetAndSetManagedStateTest1()
    +  {
    +    simpleAddGetAndSetTest1Helper(testMeta.store);
    +  }
    +
    +  public void simpleAddGetAndSetTest1Helper(SpillableStateStore store)
    +  {
    +    SpillableArrayListImpl<String> list = new SpillableArrayListImpl<>(0L, ID1, store,
    +        new SerdeStringSlice(), 1);
    +
    +    store.setup(testMeta.operatorContext);
    +    list.setup(testMeta.operatorContext);
    +
    +    long windowId = 0L;
    +    store.beginWindow(windowId);
    +    list.beginWindow(windowId);
    +    windowId++;
    --- End diff --
    
    Yes, will make the change.



[GitHub] apex-malhar pull request #324: Spillable Datastructures PR for review only

Posted by chandnisingh <gi...@git.apache.org>.
Github user chandnisingh commented on a diff in the pull request:

    https://github.com/apache/apex-malhar/pull/324#discussion_r72829218
  
    --- Diff: library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteMapImplTest.java ---
    @@ -411,45 +441,48 @@ public void recoveryTest()
         testMeta.store.setup(testMeta.operatorContext);
         map1.setup(testMeta.operatorContext);
     
    -    map1.beginWindow(1000);
    +    System.out.println("0");
    +    testMeta.store.beginWindow(0);
    +    map1.beginWindow(0);
         map1.put("x", "1");
         map1.put("y", "2");
         map1.put("z", "3");
         map1.endWindow();
    -    map1.beginWindow(1001);
    +    testMeta.store.endWindow();
    +
    +    System.out.println("1");
    --- End diff --
    
    nit: please remove the System.out debug statements.



[GitHub] apex-malhar pull request #324: Spillable Datastructures PR for review only

Posted by davidyan74 <gi...@git.apache.org>.
Github user davidyan74 commented on a diff in the pull request:

    https://github.com/apache/apex-malhar/pull/324#discussion_r72123238
  
    --- Diff: library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteArrayListMultimapImpl.java ---
    @@ -0,0 +1,262 @@
    +package org.apache.apex.malhar.lib.state.spillable;
    +
    +import java.util.Collection;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +
    +import javax.annotation.Nullable;
    +import javax.validation.constraints.NotNull;
    +
    +import org.apache.apex.malhar.lib.utils.serde.PassThruByteArraySliceSerde;
    +import org.apache.apex.malhar.lib.utils.serde.Serde;
    +import org.apache.apex.malhar.lib.utils.serde.SerdeIntSlice;
    +import org.apache.apex.malhar.lib.utils.serde.SliceUtils;
    +
    +import com.google.common.base.Preconditions;
    +import com.google.common.collect.Multimap;
    +import com.google.common.collect.Multiset;
    +
    +import com.datatorrent.api.Context;
    +import com.datatorrent.netlet.util.Slice;
    +
    +/**
    + * Created by tfarkas on 6/12/16.
    + */
    +public class SpillableByteArrayListMultimapImpl<K, V> implements Spillable.SpillableByteArrayListMultimap<K, V>,
    +    Spillable.SpillableComponent
    +{
    +  public static final int DEFAULT_BATCH_SIZE = 1000;
    +  public static final byte[] SIZE_KEY_SUFFIX = new byte[]{(byte)0, (byte)0, (byte)0};
    +
    +  private int batchSize = DEFAULT_BATCH_SIZE;
    +
    +  private WindowBoundedMapCache<K, SpillableArrayListImpl<V>> cache = new WindowBoundedMapCache<>();
    +
    +  @NotNull
    +  private SpillableByteMapImpl<byte[], Integer> map;
    +
    +  private SpillableStateStore store;
    +  private byte[] identifier;
    +  private long bucket;
    +  private Serde<K, Slice> serdeKey;
    +  private Serde<V, Slice> serdeValue;
    +
    +  private boolean isRunning = false;
    +  private boolean isInWindow = false;
    +
    +  public SpillableByteArrayListMultimapImpl(SpillableStateStore store, byte[] identifier, long bucket,
    +      Serde<K, Slice> serdeKey,
    +      Serde<V, Slice> serdeValue)
    +  {
    +    this.store = Preconditions.checkNotNull(store);
    +    this.identifier = Preconditions.checkNotNull(identifier);
    +    this.bucket = bucket;
    +    this.serdeKey = Preconditions.checkNotNull(serdeKey);
    +    this.serdeValue = Preconditions.checkNotNull(serdeValue);
    +
    +    map = new SpillableByteMapImpl(store, identifier, bucket, new PassThruByteArraySliceSerde(), new SerdeIntSlice());
    --- End diff --
    
    I got a ClassCastException due to this line in my unit tests:
    ```
    java.lang.ClassCastException: com.datatorrent.netlet.util.Slice cannot be cast to [B
    	at org.apache.apex.malhar.lib.utils.serde.PassThruByteArraySliceSerde.serialize(PassThruByteArraySliceSerde.java:10)
    	at org.apache.apex.malhar.lib.state.spillable.SpillableByteMapImpl.get(SpillableByteMapImpl.java:94)
    	at org.apache.apex.malhar.lib.state.spillable.SpillableByteMapImpl.containsKey(SpillableByteMapImpl.java:70)
    	at org.apache.apex.malhar.lib.state.spillable.SpillableByteArrayListMultimapImpl.containsKey(SpillableByteArrayListMultimapImpl.java:139)
    ```
    Shouldn't the 4th parameter be just serdeKey instead of the PassThruByteArraySliceSerde?
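    A minimal sketch of how a Slice can reach a serde typed for byte[] without any compile error. It uses simplified, hypothetical stand-ins (`Serde` with only `serialize`, `Slice`, `PassThruByteArraySerde`, `SketchByteMap`) rather than the real Malhar and netlet classes. Because `containsKey` accepts `Object` and the `(K)` cast is erased, nothing is checked until the serde's compiler-generated bridge method casts its argument to byte[], which is consistent with the `Slice cannot be cast to [B` in the trace above.
    
    ```java
    import java.nio.ByteBuffer;
    import java.util.HashSet;
    import java.util.Set;
    
    // Simplified, hypothetical stand-ins for the Malhar Serde and netlet Slice types.
    interface Serde<T, S>
    {
      S serialize(T object);
    }
    
    final class Slice
    {
      final byte[] buffer;
    
      Slice(byte[] buffer)
      {
        this.buffer = buffer;
      }
    }
    
    // Passes byte[] keys through unchanged, in the spirit of PassThruByteArraySliceSerde.
    class PassThruByteArraySerde implements Serde<byte[], Slice>
    {
      @Override
      public Slice serialize(byte[] object)
      {
        return new Slice(object);
      }
    }
    
    // Hypothetical map that, like java.util.Map, accepts Object in containsKey.
    class SketchByteMap<K>
    {
      private final Serde<K, Slice> serdeKey;
      private final Set<ByteBuffer> keys = new HashSet<>();
    
      SketchByteMap(Serde<K, Slice> serdeKey)
      {
        this.serdeKey = serdeKey;
      }
    
      void put(K key)
      {
        keys.add(ByteBuffer.wrap(serdeKey.serialize(key).buffer));
      }
    
      @SuppressWarnings("unchecked")
      boolean containsKey(Object key)
      {
        // The (K) cast is erased, so nothing is checked here; the ClassCastException
        // is thrown inside the serde's bridge method instead.
        return keys.contains(ByteBuffer.wrap(serdeKey.serialize((K)key).buffer));
      }
    }
    ```
    
    Whichever fix is chosen, the key serde handed to the inner map has to match the type the multimap actually passes to it.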



[GitHub] apex-malhar pull request #324: Spillable Datastructures PR for review only

Posted by davidyan74 <gi...@git.apache.org>.
Github user davidyan74 commented on a diff in the pull request:

    https://github.com/apache/apex-malhar/pull/324#discussion_r73031909
  
    --- Diff: library/src/main/java/org/apache/apex/malhar/lib/state/spillable/TimeBasedPriorityQueue.java ---
    @@ -0,0 +1,128 @@
    +package org.apache.apex.malhar.lib.state.spillable;
    +
    +import java.util.Iterator;
    +import java.util.Map;
    +import java.util.Set;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import com.google.common.base.Preconditions;
    +import com.google.common.collect.Maps;
    +import com.google.common.collect.Sets;
    +
    +public class TimeBasedPriorityQueue<T>
    --- End diff --
    
    javadoc: please add a class-level javadoc.



[GitHub] apex-malhar pull request #324: Spillable Datastructures PR for review only

Posted by davidyan74 <gi...@git.apache.org>.
Github user davidyan74 commented on a diff in the pull request:

    https://github.com/apache/apex-malhar/pull/324#discussion_r73079896
  
    --- Diff: library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableStateStore.java ---
    @@ -0,0 +1,33 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.apex.malhar.lib.state.spillable;
    +
    +import org.apache.apex.malhar.lib.state.BucketedState;
    +
    +import com.datatorrent.api.Component;
    +import com.datatorrent.api.Context;
    +import com.datatorrent.api.Operator;
    +
    +public interface SpillableStateStore extends BucketedState, Component<Context.OperatorContext>,
    --- End diff --
    
    Also, please take a look at https://github.com/davidyan74/apex-malhar/blob/66da57e08e31edb15e83a050f3940dd276851c23/library/src/main/java/org/apache/apex/malhar/lib/window/impl/AbstractWindowedOperator.java, in particular its beginWindow, endWindow, and checkpoint callback implementations.
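    As a rough illustration of the callback ordering used in this PR's tests (store.beginWindow before the data structure's beginWindow, and the data structure's endWindow before store.endWindow), here is a hypothetical composite that propagates window callbacks in that order. `WindowedComponent` and `CallbackPropagatingComponent` are illustrative names, not the Apex API; the sketch does not reproduce AbstractWindowedOperator and leaves out setup and checkpoint callbacks.
    
    ```java
    import java.util.ArrayList;
    import java.util.List;
    
    // Hypothetical minimal callback interface; not the Apex Operator API.
    interface WindowedComponent
    {
      void beginWindow(long windowId);
    
      void endWindow();
    }
    
    // Hypothetical composite: opens the store's window first and closes it last.
    class CallbackPropagatingComponent implements WindowedComponent
    {
      private final WindowedComponent store;
      private final List<WindowedComponent> components = new ArrayList<>();
      private boolean isInWindow = false;
    
      CallbackPropagatingComponent(WindowedComponent store)
      {
        this.store = store;
      }
    
      void addComponent(WindowedComponent component)
      {
        components.add(component);
      }
    
      @Override
      public void beginWindow(long windowId)
      {
        if (isInWindow) {
          throw new IllegalStateException("beginWindow called twice without endWindow");
        }
        store.beginWindow(windowId);
        for (WindowedComponent component : components) {
          component.beginWindow(windowId);
        }
        isInWindow = true;
      }
    
      @Override
      public void endWindow()
      {
        if (!isInWindow) {
          throw new IllegalStateException("endWindow called outside a window");
        }
        for (WindowedComponent component : components) {
          component.endWindow();
        }
        // The store's window is closed only after every data structure has ended its window.
        store.endWindow();
        isInWindow = false;
      }
    }
    ```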



[GitHub] apex-malhar pull request #324: Spillable Datastructures PR for review only

Posted by davidyan74 <gi...@git.apache.org>.
Github user davidyan74 commented on a diff in the pull request:

    https://github.com/apache/apex-malhar/pull/324#discussion_r73031950
  
    --- Diff: library/src/main/java/org/apache/apex/malhar/lib/state/spillable/WindowBoundedMapCache.java ---
    @@ -0,0 +1,119 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.apex.malhar.lib.state.spillable;
    +
    +import java.util.Map;
    +import java.util.Set;
    +
    +import com.google.common.base.Preconditions;
    +import com.google.common.collect.Maps;
    +import com.google.common.collect.Sets;
    +
    +public class WindowBoundedMapCache<K, V>
    --- End diff --
    
    javadoc: please add a class-level javadoc.



[GitHub] apex-malhar pull request #324: Spillable Datastructures PR for review only

Posted by ilooner <gi...@git.apache.org>.
GitHub user ilooner reopened a pull request:

    https://github.com/apache/apex-malhar/pull/324

    Spillable Datastructures PR for review only

    

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/ilooner/incubator-apex-malhar APEXMALHAR-2048_pull

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/apex-malhar/pull/324.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #324
    
----
commit b72ee18e6f7cf0a9d30b195439912d96acafb3b4
Author: Timothy Farkas <ti...@datatorrent.com>
Date:   2016-07-17T21:32:34Z

    Added implementations of SpillableList, SpillableMap, and SpillableArrayListMultimap

commit 184653a23f662f78e7e6a7e1d53ffb0efbdb7127
Author: Timothy Farkas <ti...@datatorrent.com>
Date:   2016-07-18T01:56:41Z

    Added SpillableComplexComponentImpl

commit 9f17b4ba9233e3f46746505941a378236193f719
Author: Timothy Farkas <ti...@datatorrent.com>
Date:   2016-07-18T03:29:07Z

    Added propagating callbacks to store

commit fe41f0c20235aac3ca57facc2491ecfba11d20a7
Author: Timothy Farkas <ti...@datatorrent.com>
Date:   2016-07-21T06:38:48Z

    Added checkpoint callbacks to spillable complex components
    Added some half completed tests

commit e5cacbbc9e1ae18f5cf0938f13377f75a3a99cd2
Author: Timothy Farkas <ti...@datatorrent.com>
Date:   2016-07-24T04:09:54Z

    Finished unit test for SpillableArrayListMultimap

commit dc258b8900688264f349307737b59b096dbc3d2b
Author: Timothy Farkas <ti...@datatorrent.com>
Date:   2016-07-24T05:19:37Z

    Added unit test which uses managed state

commit ed9924b810a76c39404701da81f4753ab68af5a5
Author: Timothy Farkas <ti...@datatorrent.com>
Date:   2016-07-24T06:55:18Z

    Finished adding managed state tests for SpillableByteMap

commit 43da17d9633dc2405b596b25697b6b4b0baef69f
Author: Timothy Farkas <ti...@datatorrent.com>
Date:   2016-07-24T16:31:53Z

    Added ManagedStateTests For SpillableArrayList

commit 1343ccf4ccc5099d7abcf7f99ab2ed648baeef08
Author: Timothy Farkas <ti...@datatorrent.com>
Date:   2016-07-24T16:49:00Z

    Added managed state tests for SpillableArrayListMultimap

commit 57c4e5e3c3e019613f31753be5052c32ba762e53
Author: Timothy Farkas <ti...@datatorrent.com>
Date:   2016-07-24T16:57:35Z

    Added ManagedStateTest for SpillableComplexComponent

commit ebc822c4ba87111152f464395cf01221addc6d34
Author: Timothy Farkas <ti...@datatorrent.com>
Date:   2016-07-27T06:38:51Z

    Fixed broken containsKey method and added test

commit 983924215af2af7b683a3b654c320b7026b3165d
Author: David Yan <da...@datatorrent.com>
Date:   2016-07-28T17:27:01Z

    isolated unit test for spillablebytemapimpl recovery

commit 68ca96332dce24aa739e547ba87058dc35bb56df
Author: Timothy Farkas <ti...@datatorrent.com>
Date:   2016-07-29T08:10:28Z

    Testing

commit a7796d61d80111faa24543ddc05b17c2944138be
Author: Timothy Farkas <ti...@datatorrent.com>
Date:   2016-07-31T18:44:23Z

    Added recovery test for SpillableArrayListMultimap and added license headers

commit b3b4aacf44127c79553fc289f1f9e1f7c17b2ec6
Author: Timothy Farkas <ti...@datatorrent.com>
Date:   2016-08-01T03:34:18Z

     - Fixed bug
     - Added more recovery tests

----



[GitHub] apex-malhar pull request #324: Spillable Datastructures PR for review only

Posted by chaithu14 <gi...@git.apache.org>.
Github user chaithu14 commented on a diff in the pull request:

    https://github.com/apache/apex-malhar/pull/324#discussion_r68926769
  
    --- Diff: library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteArrayListMultimapImpl.java ---
    @@ -0,0 +1,241 @@
    +package org.apache.apex.malhar.lib.state.spillable;
    +
    +import java.util.Collection;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +
    +import javax.annotation.Nullable;
    +import javax.validation.constraints.NotNull;
    +
    +import org.apache.apex.malhar.lib.utils.serde.PassThruByteArraySliceSerde;
    +import org.apache.apex.malhar.lib.utils.serde.Serde;
    +import org.apache.apex.malhar.lib.utils.serde.SerdeIntSlice;
    +import org.apache.apex.malhar.lib.utils.serde.SliceUtils;
    +
    +import com.google.common.base.Preconditions;
    +import com.google.common.collect.Multimap;
    +import com.google.common.collect.Multiset;
    +
    +import com.datatorrent.api.Context;
    +import com.datatorrent.netlet.util.Slice;
    +
    +/**
    + * Created by tfarkas on 6/12/16.
    + */
    +public class SpillableByteArrayListMultimapImpl<K, V> implements Spillable.SpillableByteArrayListMultimap<K, V>,
    +    Spillable.SpillableComponent
    +{
    +  public static final int DEFAULT_BATCH_SIZE = 1000;
    +  public static final byte[] SIZE_KEY_SUFFIX = new byte[]{(byte)0, (byte)0, (byte)0};
    +
    +  private int batchSize = DEFAULT_BATCH_SIZE;
    +
    +  private WindowBoundedMapCache<K, SpillableArrayListImpl<V>> cache = new WindowBoundedMapCache<>();
    +
    +  @NotNull
    +  private SpillableByteMapImpl<byte[], Integer> map;
    +
    +  private SpillableStateStore store;
    +  private byte[] identifier;
    +  private long bucket;
    +  private Serde<K, Slice> serdeKey;
    --- End diff --
    
    Why do we need to configure the bucket?



[GitHub] apex-malhar pull request #324: Spillable Datastructures PR for review only

Posted by davidyan74 <gi...@git.apache.org>.
Github user davidyan74 commented on a diff in the pull request:

    https://github.com/apache/apex-malhar/pull/324#discussion_r73032527
  
    --- Diff: library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteArrayListMultimapImplTest.java ---
    @@ -0,0 +1,344 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.apex.malhar.lib.state.spillable;
    +
    +import java.util.List;
    +
    +import org.junit.Assert;
    +import org.junit.Rule;
    +import org.junit.Test;
    +
    +import org.apache.apex.malhar.lib.state.spillable.inmem.InMemSpillableStateStore;
    +import org.apache.apex.malhar.lib.utils.serde.SerdeIntSlice;
    +import org.apache.apex.malhar.lib.utils.serde.SerdeStringSlice;
    +import org.apache.apex.malhar.lib.utils.serde.SliceUtils;
    +
    +import com.google.common.collect.Lists;
    +
    +import com.datatorrent.api.Attribute;
    +import com.datatorrent.api.Context;
    +import com.datatorrent.api.DAG;
    +import com.datatorrent.lib.helper.OperatorContextTestHelper;
    +import com.datatorrent.lib.util.KryoCloneUtils;
    +import com.datatorrent.netlet.util.Slice;
    +
    +/**
    + * Created by tfarkas on 7/17/16.
    --- End diff --
    
    javadoc: please replace the "Created by" comment with a javadoc describing the class.



[GitHub] apex-malhar pull request #324: Spillable Datastructures PR for review only

Posted by davidyan74 <gi...@git.apache.org>.
Github user davidyan74 commented on a diff in the pull request:

    https://github.com/apache/apex-malhar/pull/324#discussion_r73031805
  
    --- Diff: library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponentImpl.java ---
    @@ -0,0 +1,202 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.apex.malhar.lib.state.spillable;
    +
    +import java.util.List;
    +
    +import javax.validation.constraints.NotNull;
    +
    +import org.apache.apex.malhar.lib.utils.serde.Serde;
    +
    +import com.google.common.base.Preconditions;
    +import com.google.common.collect.Lists;
    +
    +import com.datatorrent.api.Context;
    +import com.datatorrent.netlet.util.Slice;
    +
    +/**
    + * Created by tfarkas on 6/12/16.
    --- End diff --
    
    javadoc: please replace the "Created by" comment with a javadoc describing the class.



[GitHub] apex-malhar pull request #324: Spillable Datastructures PR for review only

Posted by chandnisingh <gi...@git.apache.org>.
Github user chandnisingh commented on a diff in the pull request:

    https://github.com/apache/apex-malhar/pull/324#discussion_r73966670
  
    --- Diff: library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListImpl.java ---
    @@ -0,0 +1,324 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.apex.malhar.lib.state.spillable;
    +
    +import java.util.Collection;
    +import java.util.Iterator;
    +import java.util.List;
    +import java.util.ListIterator;
    +
    +import javax.validation.constraints.NotNull;
    +
    +import org.apache.apex.malhar.lib.utils.serde.Serde;
    +import org.apache.apex.malhar.lib.utils.serde.SerdeIntSlice;
    +import org.apache.apex.malhar.lib.utils.serde.SerdeListSlice;
    +
    +import com.esotericsoftware.kryo.DefaultSerializer;
    +import com.esotericsoftware.kryo.serializers.FieldSerializer;
    +import com.google.common.base.Preconditions;
    +import com.google.common.collect.Lists;
    +
    +import com.datatorrent.api.Context;
    +import com.datatorrent.netlet.util.Slice;
    +
    +/**
    + * A Spillable implementation of {@link List} backed by a {@link SpillableStateStore}.
    + * @param <T> The type of object stored in the {@link SpillableArrayListImpl}.
    + */
    +@DefaultSerializer(FieldSerializer.class)
    +public class SpillableArrayListImpl<T> implements Spillable.SpillableArrayList<T>, Spillable.SpillableComponent
    +{
    +  public static final int DEFAULT_BATCH_SIZE = 1000;
    +
    +  private int batchSize = DEFAULT_BATCH_SIZE;
    +  private long bucketId;
    --- End diff --
    
    It seems ```bucketId``` and ```prefix``` are unused. Also, please add a getter/setter for ```batchSize```.
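    A sketch of the requested accessors, shown on a hypothetical stripped-down class (`BatchSizeConfig`) rather than the full SpillableArrayListImpl. Rejecting non-positive values is an assumption of this sketch, not something the PR specifies.
    
    ```java
    // Hypothetical excerpt: only the batchSize field and its JavaBean-style accessors.
    class BatchSizeConfig
    {
      static final int DEFAULT_BATCH_SIZE = 1000;
    
      private int batchSize = DEFAULT_BATCH_SIZE;
    
      public int getBatchSize()
      {
        return batchSize;
      }
    
      public void setBatchSize(int batchSize)
      {
        // Assumed constraint: a batch must hold at least one element.
        if (batchSize <= 0) {
          throw new IllegalArgumentException("batchSize must be positive: " + batchSize);
        }
        this.batchSize = batchSize;
      }
    }
    ```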



[GitHub] apex-malhar pull request #324: Spillable Datastructures PR for review only

Posted by chandnisingh <gi...@git.apache.org>.
Github user chandnisingh commented on a diff in the pull request:

    https://github.com/apache/apex-malhar/pull/324#discussion_r73047976
  
    --- Diff: library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponentImpl.java ---
    @@ -0,0 +1,202 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.apex.malhar.lib.state.spillable;
    +
    +import java.util.List;
    +
    +import javax.validation.constraints.NotNull;
    +
    +import org.apache.apex.malhar.lib.utils.serde.Serde;
    +
    +import com.google.common.base.Preconditions;
    +import com.google.common.collect.Lists;
    +
    +import com.datatorrent.api.Context;
    +import com.datatorrent.netlet.util.Slice;
    +
    +/**
    + * Created by tfarkas on 6/12/16.
    + */
    +public class SpillableComplexComponentImpl implements SpillableComplexComponent
    +{
    +  private List<SpillableComponent> componentList = Lists.newArrayList();
    +
    +  private boolean isRunning = false;
    +  private boolean isInWindow = false;
    +
    +  @NotNull
    +  private SpillableStateStore store;
    +
    +  @NotNull
    +  private SpillableIdentifierGenerator identifierGenerator;
    +
    +  public SpillableComplexComponentImpl(SpillableStateStore store)
    --- End diff --
    
    Isn't this missing a no-arg constructor? Without one, Kryo serialization may fail.
    I usually add the following test to each non-transient component to make sure it is Kryo-serializable:
    ```
      @Test
      public void testSerde() throws IOException
      {
        ManagedStateImpl deserialized = KryoCloneUtils.cloneObject(testMeta.managedState);
        Assert.assertEquals("num buckets", deserialized.getNumBuckets(), testMeta.managedState.getNumBuckets());
      }
    ```
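
To illustrate why the no-arg constructor matters: Kryo's default instantiation creates the object through a zero-argument constructor before filling in its fields, and as far as I know a private one is typically enough. The sketch below uses plain Java reflection as a stand-in for that step; the classes are hypothetical and only mimic a component that takes its store in the constructor.

```java
import java.lang.reflect.Constructor;

// Hypothetical stand-in for the store dependency; not the PR's SpillableStateStore.
interface HypotheticalStore
{
}

// Only a constructor with arguments: reflective no-arg instantiation fails.
class ComponentWithoutNoArg
{
  private HypotheticalStore store;

  ComponentWithoutNoArg(HypotheticalStore store)
  {
    this.store = store;
  }
}

// Adds a private no-arg constructor reserved for serialization frameworks.
class ComponentWithNoArg
{
  private HypotheticalStore store;

  private ComponentWithNoArg()
  {
  }

  ComponentWithNoArg(HypotheticalStore store)
  {
    this.store = store;
  }
}

class NoArgCheck
{
  /** Returns true if the class can be instantiated reflectively with no arguments. */
  static boolean canInstantiate(Class<?> type)
  {
    try {
      Constructor<?> ctor = type.getDeclaredConstructor();
      ctor.setAccessible(true);
      ctor.newInstance();
      return true;
    } catch (ReflectiveOperationException e) {
      // NoSuchMethodException when there is no zero-argument constructor.
      return false;
    }
  }
}
```

Keeping the no-arg constructor private leaves the public API unchanged while still giving the serializer a way to create the object.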



[GitHub] apex-malhar pull request #324: Spillable Datastructures PR for review only

Posted by chaithu14 <gi...@git.apache.org>.
Github user chaithu14 commented on a diff in the pull request:

    https://github.com/apache/apex-malhar/pull/324#discussion_r68559313
  
    --- Diff: library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SliceUtils.java ---
    @@ -0,0 +1,52 @@
    +package org.apache.apex.malhar.lib.utils.serde;
    +
    +import com.datatorrent.netlet.util.Slice;
    +
    +public class SliceUtils
    +{
    +  private SliceUtils()
    +  {
    +  }
    +
    +  public static byte[] concatenate(byte[] a, byte[] b)
    +  {
    +    byte[] output = new byte[a.length + b.length];
    +
    +    System.arraycopy(a, 0, output, 0, a.length);
    +    System.arraycopy(b, 0, output, a.length, b.length);
    +    return output;
    +  }
    +
    +  public static Slice concatenate(Slice a, Slice b)
    +  {
    +    int size = a.length + b.length;
    +    byte[] bytes = new byte[size];
    +
    +    System.arraycopy(a.buffer, a.offset, bytes, 0, a.length);
    +    System.arraycopy(b.buffer, b.offset, bytes, a.length, b.length);
    +
    +    return new Slice(bytes);
    +  }
    +
    +  public static Slice concatenate(byte[] a, Slice b)
    +  {
    +    int size = a.length + b.length;
    +    byte[] bytes = new byte[size];
    +
    +    System.arraycopy(a, 0, bytes, 0, a.length);
    +    System.arraycopy(b.buffer, b.offset, bytes, a.length, b.length);
    +
    +    return new Slice(bytes);
    +  }
    +
    +  public static Slice concatenate(Slice a, byte[] b)
    +  {
    +    int size = a.length + b.length;
    +    byte[] bytes = new byte[size];
    +
    +    System.arraycopy(a, a.offset, bytes, 0, a.length);
    --- End diff --
    
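    Use `a.buffer` instead of `a` here: `System.arraycopy` needs the backing byte array, and passing the `Slice` object itself throws an `ArrayStoreException` at runtime.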
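
A sketch of the corrected overload. To keep it self-contained, `Slice` below is a minimal stand-in for `com.datatorrent.netlet.util.Slice` that carries only the three public fields the method reads (`buffer`, `offset`, `length`); it is not the netlet class.

```java
// Minimal stand-in for com.datatorrent.netlet.util.Slice (only the fields used here).
class Slice
{
  public byte[] buffer;
  public int offset;
  public int length;

  Slice(byte[] buffer, int offset, int length)
  {
    this.buffer = buffer;
    this.offset = offset;
    this.length = length;
  }

  Slice(byte[] buffer)
  {
    this(buffer, 0, buffer.length);
  }
}

class SliceUtils
{
  /** Returns a new Slice holding a's bytes followed by b. */
  static Slice concatenate(Slice a, byte[] b)
  {
    byte[] bytes = new byte[a.length + b.length];
    // Copy from the backing array starting at a.offset; passing the Slice
    // object itself would make System.arraycopy throw ArrayStoreException.
    System.arraycopy(a.buffer, a.offset, bytes, 0, a.length);
    System.arraycopy(b, 0, bytes, a.length, b.length);
    return new Slice(bytes);
  }
}
```

Note that `a.offset` matters as much as `a.buffer`: a slice may view only part of a larger array, so copying from index 0 would pick up the wrong bytes.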



[GitHub] apex-malhar pull request #324: Spillable Datastructures PR for review only

Posted by chandnisingh <gi...@git.apache.org>.
Github user chandnisingh commented on a diff in the pull request:

    https://github.com/apache/apex-malhar/pull/324#discussion_r72829289
  
    --- Diff: library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteMapImplTest.java ---
    @@ -411,45 +441,48 @@ public void recoveryTest()
         testMeta.store.setup(testMeta.operatorContext);
         map1.setup(testMeta.operatorContext);
     
    -    map1.beginWindow(1000);
    +    System.out.println("0");
    +    testMeta.store.beginWindow(0);
    +    map1.beginWindow(0);
         map1.put("x", "1");
         map1.put("y", "2");
         map1.put("z", "3");
         map1.endWindow();
    -    map1.beginWindow(1001);
    +    testMeta.store.endWindow();
    +
    +    System.out.println("1");
    +    testMeta.store.beginWindow(1);
    +    map1.beginWindow(1);
         map1.put("x", "4");
         map1.put("y", "5");
         map1.endWindow();
    -
    -    testMeta.store.beforeCheckpoint(1001);
    -    testMeta.store.checkpointed(1001);
    +    testMeta.store.endWindow();
    +    testMeta.store.beforeCheckpoint(1);
    +    testMeta.store.checkpointed(1);
     
         SpillableByteMapImpl<String, String> clonedMap1 = KryoCloneUtils.cloneObject(map1);
     
    -    map1.beginWindow(1002);
    -    map1.put("x", "6");
    -    map1.put("y", "7");
    -    map1.endWindow();
    -
    -    Assert.assertEquals("6", map1.get("x"));
    -    Assert.assertEquals("7", map1.get("y"));
    -    Assert.assertEquals("3", map1.get("z"));
    -
    -    map1.beginWindow(1003);
    -    map1.put("x", "8");
    -    map1.put("y", "9");
    +    System.out.println("2");
    +    testMeta.store.beginWindow(2);
    +    map1.beginWindow(2);
    +    map1.put("x1", "6");
    +    map1.put("y1", "7");
         map1.endWindow();
    +    testMeta.store.endWindow();
     
         // simulating crash here
         map1.teardown();
         testMeta.store.teardown();
     
    +    System.out.println("Recovering");
    --- End diff --
    
    nit: please remove the `System.out.println` debug statements.



[GitHub] apex-malhar pull request #324: Spillable Datastructures PR for review only

Posted by ilooner <gi...@git.apache.org>.
Github user ilooner commented on a diff in the pull request:

    https://github.com/apache/apex-malhar/pull/324#discussion_r72655301
  
    --- Diff: library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteMapImpl.java ---
    @@ -0,0 +1,210 @@
    +package org.apache.apex.malhar.lib.state.spillable;
    +
    +import java.util.Collection;
    +import java.util.Map;
    +import java.util.Set;
    +
    +import javax.validation.constraints.NotNull;
    +
    +import org.apache.apex.malhar.lib.state.BucketedState;
    +import org.apache.apex.malhar.lib.utils.serde.Serde;
    +import org.apache.apex.malhar.lib.utils.serde.SliceUtils;
    +import org.apache.commons.lang3.ArrayUtils;
    +import org.apache.commons.lang3.mutable.MutableInt;
    +
    +import com.google.common.base.Preconditions;
    +
    +import com.datatorrent.api.Context;
    +import com.datatorrent.netlet.util.Slice;
    +
    +public class SpillableByteMapImpl<K, V> implements Spillable.SpillableByteMap<K, V>, Spillable.SpillableComponent
    +{
    +  @NotNull
    +  private SpillableStateStore store;
    +  @NotNull
    +  private byte[] identifier;
    +  private long bucket;
    +  @NotNull
    +  private Serde<K, Slice> serdeKey;
    +  @NotNull
    +  private Serde<V, Slice> serdeValue;
    +
    +  private int size = 0;
    +
    +  private transient WindowBoundedMapCache<K, V> cache = new WindowBoundedMapCache<>();
    --- End diff --
    
    Good catch @brightchen, `SpillableByteArrayListMultimapImpl.cache` should be transient.
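
A sketch of what marking the cache `transient` implies. Kryo's `FieldSerializer` skips transient fields by default, so the cache is not checkpointed; depending on how the serializer instantiates the object, the field may come back `null`. Re-creating it in `setup()` avoids relying on the field initializer. Java serialization is used here as a stdlib stand-in for Kryo (it never runs field initializers on deserialization), and `CachedComponent` is a hypothetical class, not the PR's.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

// Hypothetical component with checkpointed state and a window-scoped cache.
class CachedComponent implements Serializable
{
  private static final long serialVersionUID = 1L;

  // Checkpointed state.
  private final Map<String, String> persisted = new HashMap<>();
  // Not checkpointed: skipped by the serializer, rebuilt in setup().
  private transient Map<String, String> cache = new HashMap<>();

  void setup()
  {
    cache = new HashMap<>();
  }

  void put(String key, String value)
  {
    cache.put(key, value);
    persisted.put(key, value);
  }

  Map<String, String> getCache()
  {
    return cache;
  }

  Map<String, String> getPersisted()
  {
    return persisted;
  }

  /** Serializes and deserializes obj, similar in spirit to KryoCloneUtils.cloneObject. */
  @SuppressWarnings("unchecked")
  static <T extends Serializable> T roundTrip(T obj)
  {
    try {
      ByteArrayOutputStream bos = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(bos)) {
        out.writeObject(obj);
      }
      try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()))) {
        return (T)in.readObject();
      }
    } catch (IOException | ClassNotFoundException e) {
      throw new IllegalStateException(e);
    }
  }
}
```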



[GitHub] apex-malhar pull request #324: Spillable Datastructures PR for review only

Posted by davidyan74 <gi...@git.apache.org>.
Github user davidyan74 commented on a diff in the pull request:

    https://github.com/apache/apex-malhar/pull/324#discussion_r73032806
  
    --- Diff: library/src/test/java/org/apache/apex/malhar/lib/utils/serde/SerdeListSliceTest.java ---
    @@ -16,29 +16,33 @@
      * specific language governing permissions and limitations
      * under the License.
      */
    -package org.apache.apex.malhar.lib.state.managed.spillable.inmem;
    +package org.apache.apex.malhar.lib.utils.serde;
    +
    +import java.util.List;
     
     import org.junit.Assert;
     import org.junit.Test;
     
    -import org.apache.apex.malhar.lib.state.spillable.inmem.InMemMultiset;
    -
    -import com.esotericsoftware.kryo.Kryo;
    +import com.google.common.collect.Lists;
     
    -import com.datatorrent.lib.util.KryoCloneUtils;
    +import com.datatorrent.netlet.util.Slice;
     
    -public class InMemMultisetTest
    +/**
    + * Created by tfarkas on 6/19/16.
    --- End diff --
    
    javadoc: please replace the auto-generated "Created by" comment with a javadoc describing the test class.



[GitHub] apex-malhar pull request #324: Spillable Datastructures PR for review only

Posted by chandnisingh <gi...@git.apache.org>.
Github user chandnisingh commented on a diff in the pull request:

    https://github.com/apache/apex-malhar/pull/324#discussion_r72829166
  
    --- Diff: library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteMapImplTest.java ---
    @@ -411,45 +441,48 @@ public void recoveryTest()
         testMeta.store.setup(testMeta.operatorContext);
         map1.setup(testMeta.operatorContext);
     
    -    map1.beginWindow(1000);
    +    System.out.println("0");
    +    testMeta.store.beginWindow(0);
    +    map1.beginWindow(0);
         map1.put("x", "1");
         map1.put("y", "2");
         map1.put("z", "3");
         map1.endWindow();
    -    map1.beginWindow(1001);
    +    testMeta.store.endWindow();
    +
    +    System.out.println("1");
    +    testMeta.store.beginWindow(1);
    +    map1.beginWindow(1);
         map1.put("x", "4");
         map1.put("y", "5");
         map1.endWindow();
    -
    -    testMeta.store.beforeCheckpoint(1001);
    -    testMeta.store.checkpointed(1001);
    +    testMeta.store.endWindow();
    +    testMeta.store.beforeCheckpoint(1);
    +    testMeta.store.checkpointed(1);
     
         SpillableByteMapImpl<String, String> clonedMap1 = KryoCloneUtils.cloneObject(map1);
     
    -    map1.beginWindow(1002);
    -    map1.put("x", "6");
    -    map1.put("y", "7");
    -    map1.endWindow();
    -
    -    Assert.assertEquals("6", map1.get("x"));
    -    Assert.assertEquals("7", map1.get("y"));
    -    Assert.assertEquals("3", map1.get("z"));
    -
    -    map1.beginWindow(1003);
    -    map1.put("x", "8");
    -    map1.put("y", "9");
    +    System.out.println("2");
    +    testMeta.store.beginWindow(2);
    +    map1.beginWindow(2);
    +    map1.put("x1", "6");
    +    map1.put("y1", "7");
         map1.endWindow();
    +    testMeta.store.endWindow();
     
         // simulating crash here
         map1.teardown();
         testMeta.store.teardown();
     
    +    System.out.println("Recovering");
    +
         map1 = clonedMap1;
         map1.getStore().setup(testMeta.operatorContext);
         map1.setup(testMeta.operatorContext);
    --- End diff --
    
    If the activation window is -1, ManagedState will not care about the data saved after the activation window.
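
One reading of this comment is that the test's operator context leaves the activation window at -1, so the recovered store never discards the writes from window 2 and the test does not really exercise recovery. The toy model below illustrates that reading only. `ToyWindowedStore` and its rule (-1 means "skip discarding", otherwise drop every window after the activation window) are assumptions, not ManagedState's implementation.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Toy model, not ManagedState: writes are recorded per window id.
class ToyWindowedStore
{
  /** Assumed meaning of -1: a fresh start rather than a recovery. */
  static final long NO_ACTIVATION_WINDOW = -1;

  private final TreeMap<Long, Map<String, String>> windows = new TreeMap<>();

  void put(long windowId, String key, String value)
  {
    windows.computeIfAbsent(windowId, w -> new HashMap<>()).put(key, value);
  }

  /** Discards all windows after activationWindow, unless activationWindow is -1. */
  void recover(long activationWindow)
  {
    if (activationWindow != NO_ACTIVATION_WINDOW) {
      windows.tailMap(activationWindow, false).clear();
    }
  }

  /** Returns the latest value for key across the remaining windows, or null. */
  String get(String key)
  {
    for (Map<String, String> window : windows.descendingMap().values()) {
      if (window.containsKey(key)) {
        return window.get(key);
      }
    }
    return null;
  }
}
```

Under this model, recovering to window 1 drops `x1`/`y1` from window 2, while recovering with -1 keeps them, which is why the activation window set in the test context matters.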



[GitHub] apex-malhar pull request #324: Spillable Datastructures PR for review only

Posted by ilooner <gi...@git.apache.org>.
Github user ilooner commented on a diff in the pull request:

    https://github.com/apache/apex-malhar/pull/324#discussion_r72124180
  
    --- Diff: library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteArrayListMultimapImpl.java ---
    @@ -0,0 +1,262 @@
    +package org.apache.apex.malhar.lib.state.spillable;
    +
    +import java.util.Collection;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +
    +import javax.annotation.Nullable;
    +import javax.validation.constraints.NotNull;
    +
    +import org.apache.apex.malhar.lib.utils.serde.PassThruByteArraySliceSerde;
    +import org.apache.apex.malhar.lib.utils.serde.Serde;
    +import org.apache.apex.malhar.lib.utils.serde.SerdeIntSlice;
    +import org.apache.apex.malhar.lib.utils.serde.SliceUtils;
    +
    +import com.google.common.base.Preconditions;
    +import com.google.common.collect.Multimap;
    +import com.google.common.collect.Multiset;
    +
    +import com.datatorrent.api.Context;
    +import com.datatorrent.netlet.util.Slice;
    +
    +/**
    + * Created by tfarkas on 6/12/16.
    + */
    +public class SpillableByteArrayListMultimapImpl<K, V> implements Spillable.SpillableByteArrayListMultimap<K, V>,
    +    Spillable.SpillableComponent
    +{
    +  public static final int DEFAULT_BATCH_SIZE = 1000;
    +  public static final byte[] SIZE_KEY_SUFFIX = new byte[]{(byte)0, (byte)0, (byte)0};
    +
    +  private int batchSize = DEFAULT_BATCH_SIZE;
    +
    +  private WindowBoundedMapCache<K, SpillableArrayListImpl<V>> cache = new WindowBoundedMapCache<>();
    +
    +  @NotNull
    +  private SpillableByteMapImpl<byte[], Integer> map;
    +
    +  private SpillableStateStore store;
    +  private byte[] identifier;
    +  private long bucket;
    +  private Serde<K, Slice> serdeKey;
    +  private Serde<V, Slice> serdeValue;
    +
    +  private boolean isRunning = false;
    +  private boolean isInWindow = false;
    +
    +  public SpillableByteArrayListMultimapImpl(SpillableStateStore store, byte[] identifier, long bucket,
    +      Serde<K, Slice> serdeKey,
    +      Serde<V, Slice> serdeValue)
    +  {
    +    this.store = Preconditions.checkNotNull(store);
    +    this.identifier = Preconditions.checkNotNull(identifier);
    +    this.bucket = bucket;
    +    this.serdeKey = Preconditions.checkNotNull(serdeKey);
    +    this.serdeValue = Preconditions.checkNotNull(serdeValue);
    +
    +    map = new SpillableByteMapImpl(store, identifier, bucket, new PassThruByteArraySliceSerde(), new SerdeIntSlice());
    --- End diff --
    
    No, it shouldn't. That error existed before, but I fixed it in my latest commits. Do you have the latest? If so, I'll check tonight whether I can reproduce it in my tests.



[GitHub] apex-malhar pull request #324: Spillable Datastructures PR for review only

Posted by chandnisingh <gi...@git.apache.org>.
Github user chandnisingh commented on a diff in the pull request:

    https://github.com/apache/apex-malhar/pull/324#discussion_r73076627
  
    --- Diff: library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableStateStore.java ---
    @@ -0,0 +1,33 @@
    +/**
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.apex.malhar.lib.state.spillable;
    +
    +import org.apache.apex.malhar.lib.state.BucketedState;
    +
    +import com.datatorrent.api.Component;
    +import com.datatorrent.api.Context;
    +import com.datatorrent.api.Operator;
    +
    +public interface SpillableStateStore extends BucketedState, Component<Context.OperatorContext>,
    --- End diff --
    
    Maybe I am missing something, but please bear with my questions:
    1. Why should a WindowedOperator not have any knowledge of SpillableComponents?
    2. If that is the case, what will the API of WindowedOperator be with respect to storage? If WindowedOperator specifies the contract for pluggable storage, then an implementation of SpillableStateStore can implement that contract and therefore be set on WindowedOperator. However, I think SpillableStateStore itself can be that contract.
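
To make question 2 concrete, here is a sketch of the "pluggable storage contract" option: the operator depends only on an interface, and any store, including one backed by a SpillableStateStore, could be plugged in through a setter. All names here (`WindowedStorage`, `InMemoryWindowedStorage`, `SketchWindowedOperator`) are hypothetical and do not come from the PR.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical storage contract owned by the operator.
interface WindowedStorage<K, V>
{
  void put(K key, V value);

  V get(K key);
}

// One possible implementation; a spillable store could implement the same interface.
class InMemoryWindowedStorage<K, V> implements WindowedStorage<K, V>
{
  private final Map<K, V> map = new HashMap<>();

  @Override
  public void put(K key, V value)
  {
    map.put(key, value);
  }

  @Override
  public V get(K key)
  {
    return map.get(key);
  }
}

// The operator only sees the interface, so the backing store is pluggable.
class SketchWindowedOperator<K, V>
{
  private WindowedStorage<K, V> storage;

  public void setStorage(WindowedStorage<K, V> storage)
  {
    this.storage = storage;
  }

  void process(K key, V value)
  {
    storage.put(key, value);
  }

  V lookup(K key)
  {
    return storage.get(key);
  }
}
```

The alternative raised in the comment is to skip the extra interface and let SpillableStateStore itself play the role of `WindowedStorage`.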




[GitHub] apex-malhar pull request #324: Spillable Datastructures PR for review only

Posted by chaithu14 <gi...@git.apache.org>.
Github user chaithu14 commented on a diff in the pull request:

    https://github.com/apache/apex-malhar/pull/324#discussion_r68926798
  
    --- Diff: library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteArrayListMultimapImpl.java ---
    @@ -0,0 +1,241 @@
    +package org.apache.apex.malhar.lib.state.spillable;
    +
    +import java.util.Collection;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +
    +import javax.annotation.Nullable;
    +import javax.validation.constraints.NotNull;
    +
    +import org.apache.apex.malhar.lib.utils.serde.PassThruByteArraySliceSerde;
    +import org.apache.apex.malhar.lib.utils.serde.Serde;
    +import org.apache.apex.malhar.lib.utils.serde.SerdeIntSlice;
    +import org.apache.apex.malhar.lib.utils.serde.SliceUtils;
    +
    +import com.google.common.base.Preconditions;
    +import com.google.common.collect.Multimap;
    +import com.google.common.collect.Multiset;
    +
    +import com.datatorrent.api.Context;
    +import com.datatorrent.netlet.util.Slice;
    +
    +/**
    + * Created by tfarkas on 6/12/16.
    + */
    +public class SpillableByteArrayListMultimapImpl<K, V> implements Spillable.SpillableByteArrayListMultimap<K, V>,
    +    Spillable.SpillableComponent
    +{
    +  public static final int DEFAULT_BATCH_SIZE = 1000;
    +  public static final byte[] SIZE_KEY_SUFFIX = new byte[]{(byte)0, (byte)0, (byte)0};
    +
    +  private int batchSize = DEFAULT_BATCH_SIZE;
    +
    +  private WindowBoundedMapCache<K, SpillableArrayListImpl<V>> cache = new WindowBoundedMapCache<>();
    +
    +  @NotNull
    +  private SpillableByteMapImpl<byte[], Integer> map;
    +
    +  private SpillableStateStore store;
    +  private byte[] identifier;
    +  private long bucket;
    +  private Serde<K, Slice> serdeKey;
    +  private Serde<V, Slice> serdeValue;
    +
    +  public SpillableByteArrayListMultimapImpl(SpillableStateStore store, byte[] identifier, long bucket,
    +      Serde<K, Slice> serdeKey,
    +      Serde<V, Slice> serdeValue)
    +  {
    +    this.store = Preconditions.checkNotNull(store);
    +    this.identifier = Preconditions.checkNotNull(identifier);
    +    this.bucket = bucket;
    +    this.serdeKey = Preconditions.checkNotNull(serdeKey);
    +    this.serdeValue = Preconditions.checkNotNull(serdeValue);
    +
    +    map = new SpillableByteMapImpl(store, identifier, bucket, new PassThruByteArraySliceSerde(), new SerdeIntSlice());
    +  }
    +
    +  @Override
    +  public List<V> get(@Nullable K key)
    +  {
    +    return getHelper(key);
    +  }
    +
    --- End diff --
    
    I think we also need to provide an asynchronous get method.
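
A sketch of what an asynchronous counterpart to `get` could look like, in the spirit of the sync/async split in the BucketedState interface that SpillableStateStore extends. The class, the `getAsync` name, and the executor-based design are assumptions for illustration; a real implementation would more likely complete the future when the underlying store's read finishes instead of blocking an executor thread.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;

// Hypothetical multimap offering both a blocking get and a Future-returning getAsync.
class AsyncMultimapSketch<K, V>
{
  private final Map<K, List<V>> backing = new ConcurrentHashMap<>();
  private final Executor executor;

  AsyncMultimapSketch(Executor executor)
  {
    this.executor = executor;
  }

  void putAll(K key, List<V> values)
  {
    backing.put(key, values);
  }

  /** Blocking lookup; stands in for a read that may have to go to disk. */
  List<V> get(K key)
  {
    List<V> values = backing.get(key);
    return values == null ? Collections.<V>emptyList() : values;
  }

  /** Non-blocking variant: the caller can poll or wait on the returned future. */
  CompletableFuture<List<V>> getAsync(K key)
  {
    return CompletableFuture.supplyAsync(() -> get(key), executor);
  }
}
```

An operator could issue `getAsync` for several keys in one window and only wait on the futures once it actually needs the values.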

