You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by da...@apache.org on 2016/08/15 19:55:02 UTC

[2/3] apex-malhar git commit: APEXMALHAR-2048 Added implementations of SpillableList, SpillableMap, and SpillableArrayListMultimap

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9b6e11d8/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeListSlice.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeListSlice.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeListSlice.java
new file mode 100644
index 0000000..2a7947d
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeListSlice.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.utils.serde;
+
+import java.util.List;
+
+import javax.validation.constraints.NotNull;
+
+import org.apache.commons.lang3.mutable.MutableInt;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+import com.datatorrent.lib.appdata.gpo.GPOUtils;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * This is an implementation of {@link Serde} which serializes and deserializes lists.
+ */
+@InterfaceStability.Evolving
+public class SerdeListSlice<T> implements Serde<List<T>, Slice>
+{
+  @NotNull
+  private Serde<T, Slice> serde;
+
+  private SerdeListSlice()
+  {
+    // for Kryo
+  }
+
+  /**
+   * Creates a {@link SerdeListSlice}.
+   * @param serde The {@link Serde} that is used to serialize and deserialize each element of a list.
+   */
+  public SerdeListSlice(@NotNull Serde<T, Slice> serde)
+  {
+    this.serde = Preconditions.checkNotNull(serde);
+  }
+
+  @Override
+  public Slice serialize(List<T> objects)
+  {
+    Slice[] slices = new Slice[objects.size()];
+
+    int size = 4;
+
+    for (int index = 0; index < objects.size(); index++) {
+      Slice slice = serde.serialize(objects.get(index));
+      slices[index] = slice;
+      size += slice.length;
+    }
+
+    byte[] bytes = new byte[size];
+    int offset = 0;
+
+    byte[] sizeBytes = GPOUtils.serializeInt(objects.size());
+    System.arraycopy(sizeBytes, 0, bytes, offset, 4);
+    offset += 4;
+
+    for (int index = 0; index < slices.length; index++) {
+      Slice slice = slices[index];
+      System.arraycopy(slice.buffer, slice.offset, bytes, offset, slice.length);
+      offset += slice.length;
+    }
+
+    return new Slice(bytes);
+  }
+
+  @Override
+  public List<T> deserialize(Slice slice, MutableInt offset)
+  {
+    MutableInt sliceOffset = new MutableInt(slice.offset + offset.intValue());
+
+    int numElements = GPOUtils.deserializeInt(slice.buffer, sliceOffset);
+    List<T> list = Lists.newArrayListWithCapacity(numElements);
+    sliceOffset.subtract(slice.offset);
+
+    for (int index = 0; index < numElements; index++) {
+      T object = serde.deserialize(slice, sliceOffset);
+      list.add(object);
+    }
+
+    offset.setValue(sliceOffset.intValue());
+    return list;
+  }
+
+  @Override
+  public List<T> deserialize(Slice slice)
+  {
+    return deserialize(slice, new MutableInt(0));
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9b6e11d8/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeStringSlice.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeStringSlice.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeStringSlice.java
new file mode 100644
index 0000000..80ee597
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeStringSlice.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.utils.serde;
+
+import org.apache.commons.lang3.mutable.MutableInt;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.datatorrent.lib.appdata.gpo.GPOUtils;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * An implementation of {@link Serde} which serializes and deserializes {@link String}s.
+ */
+@InterfaceStability.Evolving
+public class SerdeStringSlice implements Serde<String, Slice>
+{
+  @Override
+  public Slice serialize(String object)
+  {
+    return new Slice(GPOUtils.serializeString(object));
+  }
+
+  @Override
+  public String deserialize(Slice object, MutableInt offset)
+  {
+    offset.add(object.offset);
+    String string = GPOUtils.deserializeString(object.buffer, offset);
+    offset.subtract(object.offset);
+    return string;
+  }
+
+  @Override
+  public String deserialize(Slice object)
+  {
+    return deserialize(object, new MutableInt(0));
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9b6e11d8/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SliceUtils.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SliceUtils.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SliceUtils.java
new file mode 100644
index 0000000..b6a61f4
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SliceUtils.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.utils.serde;
+
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * A utility class which contains static methods for manipulating byte arrays and {@link Slice}s
+ */
+@InterfaceStability.Evolving
+public class SliceUtils
+{
+  private SliceUtils()
+  {
+  }
+
+  /**
+   * Concatenates two byte arrays.
+   * @param a The first byte array to concatenate.
+   * @param b The second byte array to concatenate.
+   * @return The concatenated byte arrays.
+   */
+  public static byte[] concatenate(byte[] a, byte[] b)
+  {
+    byte[] output = new byte[a.length + b.length];
+
+    System.arraycopy(a, 0, output, 0, a.length);
+    System.arraycopy(b, 0, output, a.length, b.length);
+    return output;
+  }
+
+  /**
+   * Concatenates two {@link Slice}s
+   * @param a The first {@link Slice} to concatenate.
+   * @param b The second {@link Slice} to concatenate.
+   * @return The concatenated {@link Slice}.
+   */
+  public static Slice concatenate(Slice a, Slice b)
+  {
+    int size = a.length + b.length;
+    byte[] bytes = new byte[size];
+
+    System.arraycopy(a.buffer, a.offset, bytes, 0, a.length);
+    System.arraycopy(b.buffer, b.offset, bytes, a.length, b.length);
+
+    return new Slice(bytes);
+  }
+
+  /**
+   * Concatenates a byte array with the contents of a {@link Slice}.
+   * @param a The byte array to concatenate. The contents of the byte array appear first in the concatenation.
+   * @param b The {@link Slice} to concatenate a byte array with.
+   * @return A {@link Slice} whose contents are the concatenation of the input byte array and {@link Slice}.
+   */
+  public static Slice concatenate(byte[] a, Slice b)
+  {
+    int size = a.length + b.length;
+    byte[] bytes = new byte[size];
+
+    System.arraycopy(a, 0, bytes, 0, a.length);
+    System.arraycopy(b.buffer, b.offset, bytes, a.length, b.length);
+
+    return new Slice(bytes);
+  }
+
+  /**
+   * Concatenates a byte array with the contents of a {@link Slice}.
+   * @param a The byte array to concatenate.
+   * @param b The {@link Slice} to concatenate a byte array with. The contents of the {@link Slice} appear first in the
+   * concatenation.
+   * @return A {@link Slice} whose contents are the concatenation of the input byte array and {@link Slice}.
+   */
+  public static Slice concatenate(Slice a, byte[] b)
+  {
+    int size = a.length + b.length;
+    byte[] bytes = new byte[size];
+
+    System.arraycopy(a.buffer, a.offset, bytes, 0, a.length);
+    System.arraycopy(b, 0, bytes, a.length, b.length);
+
+    return new Slice(bytes);
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9b6e11d8/library/src/test/java/com/datatorrent/lib/util/TestUtils.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/util/TestUtils.java b/library/src/test/java/com/datatorrent/lib/util/TestUtils.java
index f3b2140..673054b 100644
--- a/library/src/test/java/com/datatorrent/lib/util/TestUtils.java
+++ b/library/src/test/java/com/datatorrent/lib/util/TestUtils.java
@@ -26,15 +26,40 @@ import org.junit.runner.Description;
 
 import org.apache.commons.io.FileUtils;
 
+import com.google.common.base.Preconditions;
+
 import com.datatorrent.api.DefaultPartition;
 import com.datatorrent.api.Operator;
 import com.datatorrent.api.Operator.OutputPort;
 import com.datatorrent.api.Sink;
 import com.datatorrent.api.Stats;
 import com.datatorrent.api.StatsListener;
+import com.datatorrent.netlet.util.Slice;
 
 public class TestUtils
 {
+  public static byte[] getByte(int val)
+  {
+    Preconditions.checkArgument(val <= Byte.MAX_VALUE);
+    return new byte[]{(byte)val};
+  }
+
+  public static byte[] getBytes(int val)
+  {
+    byte[] bytes = new byte[4];
+    bytes[0] = (byte)(val & 0xFF);
+    bytes[1] = (byte)((val >> 8) & 0xFF);
+    bytes[2] = (byte)((val >> 16) & 0xFF);
+    bytes[3] = (byte)((val >> 24) & 0xFF);
+
+    return bytes;
+  }
+
+  public static Slice getSlice(int val)
+  {
+    return new Slice(getBytes(val));
+  }
+
   public static class TestInfo extends TestWatcher
   {
     public org.junit.runner.Description desc;

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9b6e11d8/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedStateTestUtils.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedStateTestUtils.java b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedStateTestUtils.java
index d3564ba..c57e0ca 100644
--- a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedStateTestUtils.java
+++ b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedStateTestUtils.java
@@ -44,7 +44,8 @@ import com.datatorrent.netlet.util.Slice;
 
 public class ManagedStateTestUtils
 {
-  static void transferBucketHelper(FileAccess fileAccess, long bucketId, Map<Slice, Bucket.BucketedValue> unsavedBucket,
+  public static void transferBucketHelper(FileAccess fileAccess, long bucketId, Map<Slice, Bucket.BucketedValue>
+      unsavedBucket,
       int keysPerTimeBucket) throws IOException
   {
     RemoteIterator<LocatedFileStatus> iterator = fileAccess.listFiles(bucketId);
@@ -82,7 +83,7 @@ public class ManagedStateTestUtils
     Assert.assertEquals("data of bucket" + bucketId, testBucket, fromDisk);
   }
 
-  static Map<Long, Map<Slice, Bucket.BucketedValue>> getTestData(int startBucket, int endBucket, int keyStart)
+  public static Map<Long, Map<Slice, Bucket.BucketedValue>> getTestData(int startBucket, int endBucket, int keyStart)
   {
     Map<Long, Map<Slice, Bucket.BucketedValue>> data = Maps.newHashMap();
     for (int i = startBucket; i < endBucket; i++) {
@@ -92,7 +93,7 @@ public class ManagedStateTestUtils
     return data;
   }
 
-  static Map<Slice, Bucket.BucketedValue> getTestBucketData(int keyStart, long timeBucketStart)
+  public static Map<Slice, Bucket.BucketedValue> getTestBucketData(int keyStart, long timeBucketStart)
   {
     Map<Slice, Bucket.BucketedValue> bucketData = Maps.newHashMap();
     for (int j = 0; j < 5; j++) {
@@ -103,14 +104,14 @@ public class ManagedStateTestUtils
     return bucketData;
   }
 
-  static Context.OperatorContext getOperatorContext(int operatorId, String applicationPath)
+  public static Context.OperatorContext getOperatorContext(int operatorId, String applicationPath)
   {
     Attribute.AttributeMap.DefaultAttributeMap attributes = new Attribute.AttributeMap.DefaultAttributeMap();
     attributes.put(DAG.APPLICATION_PATH, applicationPath);
     return new OperatorContextTestHelper.TestIdOperatorContext(operatorId, attributes);
   }
 
-  static Context.OperatorContext getOperatorContext(int operatorId)
+  public static Context.OperatorContext getOperatorContext(int operatorId)
   {
     Attribute.AttributeMap.DefaultAttributeMap attributes = new Attribute.AttributeMap.DefaultAttributeMap();
     return new OperatorContextTestHelper.TestIdOperatorContext(operatorId, attributes);
@@ -118,7 +119,7 @@ public class ManagedStateTestUtils
 
   private static final transient Logger LOG = LoggerFactory.getLogger(ManagedStateTestUtils.class);
 
-  static Slice getSliceFor(String x)
+  public static Slice getSliceFor(String x)
   {
     return new Slice(x.getBytes());
   }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9b6e11d8/library/src/test/java/org/apache/apex/malhar/lib/state/managed/spillable/inmem/InMemMultisetTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/spillable/inmem/InMemMultisetTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/spillable/inmem/InMemMultisetTest.java
deleted file mode 100644
index d73bd95..0000000
--- a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/spillable/inmem/InMemMultisetTest.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.lib.state.managed.spillable.inmem;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import org.apache.apex.malhar.lib.state.spillable.inmem.InMemMultiset;
-
-import com.esotericsoftware.kryo.Kryo;
-
-import com.datatorrent.lib.util.KryoCloneUtils;
-
-public class InMemMultisetTest
-{
-  @Test
-  public void serializationTest()
-  {
-    InMemMultiset<String> set = new InMemMultiset<>();
-
-    set.add("a");
-    set.add("a");
-
-    InMemMultiset<String> cloned = KryoCloneUtils.cloneObject(new Kryo(), set);
-
-    Assert.assertEquals(2, cloned.count("a"));
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9b6e11d8/library/src/test/java/org/apache/apex/malhar/lib/state/managed/spillable/inmem/InMemSpillableArrayListTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/spillable/inmem/InMemSpillableArrayListTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/spillable/inmem/InMemSpillableArrayListTest.java
deleted file mode 100644
index 573982a..0000000
--- a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/spillable/inmem/InMemSpillableArrayListTest.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.lib.state.managed.spillable.inmem;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import org.apache.apex.malhar.lib.state.spillable.inmem.InMemSpillableArrayList;
-
-import com.esotericsoftware.kryo.Kryo;
-
-import com.datatorrent.lib.util.KryoCloneUtils;
-
-public class InMemSpillableArrayListTest
-{
-  @Test
-  public void serializationTest()
-  {
-    InMemSpillableArrayList<String> list = new InMemSpillableArrayList<>();
-
-    list.add("a");
-    list.add("a");
-
-    InMemSpillableArrayList<String> cloned = KryoCloneUtils.cloneObject(new Kryo(), list);
-
-    Assert.assertEquals(2, list.size());
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9b6e11d8/library/src/test/java/org/apache/apex/malhar/lib/state/managed/spillable/inmem/InMemSpillableByteArrayListMultimapTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/spillable/inmem/InMemSpillableByteArrayListMultimapTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/spillable/inmem/InMemSpillableByteArrayListMultimapTest.java
deleted file mode 100644
index a6bf811..0000000
--- a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/spillable/inmem/InMemSpillableByteArrayListMultimapTest.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.lib.state.managed.spillable.inmem;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import org.apache.apex.malhar.lib.state.spillable.inmem.InMemSpillableByteArrayListMultimap;
-
-import com.esotericsoftware.kryo.Kryo;
-
-import com.datatorrent.lib.util.KryoCloneUtils;
-
-public class InMemSpillableByteArrayListMultimapTest
-{
-  @Test
-  public void serializationTest()
-  {
-    InMemSpillableByteArrayListMultimap<String, String> multimap =
-        new InMemSpillableByteArrayListMultimap<String, String>();
-
-    multimap.put("a", "b");
-    multimap.put("a", "c");
-
-    InMemSpillableByteArrayListMultimap<String, String> cloned = KryoCloneUtils.cloneObject(new Kryo(), multimap);
-
-    Assert.assertEquals(2, cloned.get("a").size());
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9b6e11d8/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SequentialSpillableIdentifierGeneratorTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SequentialSpillableIdentifierGeneratorTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SequentialSpillableIdentifierGeneratorTest.java
new file mode 100644
index 0000000..f40ae87
--- /dev/null
+++ b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SequentialSpillableIdentifierGeneratorTest.java
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.state.spillable;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.datatorrent.lib.util.TestUtils;
+
+public class SequentialSpillableIdentifierGeneratorTest
+{
+  @Test
+  public void dontAllowRegistrationAfterNextCallTest()
+  {
+    SequentialSpillableIdentifierGenerator gen = new SequentialSpillableIdentifierGenerator();
+
+    gen.next();
+
+    boolean exception = false;
+
+    try {
+      gen.register(TestUtils.getByte(1));
+    } catch (Exception e) {
+      exception = true;
+    }
+
+    Assert.assertTrue(exception);
+  }
+
+  @Test
+  public void simpleSequentialIdGenerationTest()
+  {
+    SequentialSpillableIdentifierGenerator gen = new SequentialSpillableIdentifierGenerator();
+
+    for (int index = 0; index < (((int)Byte.MAX_VALUE) + 1); index++) {
+      byte[] id = gen.next();
+
+      checkId(index, id);
+    }
+
+    boolean threwException = false;
+
+    try {
+      gen.next();
+    } catch (Exception e) {
+      threwException = true;
+    }
+
+    Assert.assertTrue(threwException);
+  }
+
+  @Test
+  public void registerFirst()
+  {
+    SequentialSpillableIdentifierGenerator gen = new SequentialSpillableIdentifierGenerator();
+    gen.register(TestUtils.getByte(0));
+
+    byte[] id = gen.next();
+
+    Assert.assertArrayEquals(TestUtils.getByte(1), id);
+  }
+
+  @Test
+  public void registerLast()
+  {
+    SequentialSpillableIdentifierGenerator gen = new SequentialSpillableIdentifierGenerator();
+    gen.register(TestUtils.getByte(Byte.MAX_VALUE));
+
+    for (int index = 0; index <= (((int)Byte.MAX_VALUE) - 1); index++) {
+      byte[] id = gen.next();
+
+      checkId(index, id);
+    }
+
+    boolean threwException = false;
+
+    try {
+      gen.next();
+    } catch (Exception e) {
+      threwException = true;
+    }
+
+    Assert.assertTrue(threwException);
+  }
+
+  @Test
+  public void intermingledRegistered()
+  {
+    SequentialSpillableIdentifierGenerator gen = new SequentialSpillableIdentifierGenerator();
+
+    gen.register(TestUtils.getByte(1));
+    gen.register(TestUtils.getByte(2));
+    gen.register(TestUtils.getByte(5));
+    gen.register(TestUtils.getByte(7));
+
+    checkId(0, gen.next());
+    checkId(3, gen.next());
+    checkId(4, gen.next());
+    checkId(6, gen.next());
+    checkId(8, gen.next());
+    checkId(9, gen.next());
+  }
+
+  private void checkId(int val, byte[] id)
+  {
+    Assert.assertEquals(1, id.length);
+    Assert.assertEquals(val, id[0]);
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9b6e11d8/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListImplTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListImplTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListImplTest.java
new file mode 100644
index 0000000..af05c88
--- /dev/null
+++ b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListImplTest.java
@@ -0,0 +1,594 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.state.spillable;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+
+import org.apache.apex.malhar.lib.state.spillable.inmem.InMemSpillableStateStore;
+import org.apache.apex.malhar.lib.utils.serde.SerdeStringSlice;
+
+import com.google.common.collect.Lists;
+
+import com.datatorrent.api.Attribute;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.lib.helper.OperatorContextTestHelper;
+import com.datatorrent.lib.util.KryoCloneUtils;
+
+public class SpillableArrayListImplTest
+{
+  public static final byte[] ID1 = new byte[]{(byte)0};
+  public static final byte[] ID2 = new byte[]{(byte)1};
+
+  @Rule
+  public SpillableTestUtils.TestMeta testMeta = new SpillableTestUtils.TestMeta();
+
+  @Test
+  public void simpleAddGetAndSetTest1()
+  {
+    InMemSpillableStateStore store = new InMemSpillableStateStore();
+
+    simpleAddGetAndSetTest1Helper(store);
+  }
+
+  @Test
+  public void simpleAddGetAndSetManagedStateTest1()
+  {
+    simpleAddGetAndSetTest1Helper(testMeta.store);
+  }
+
+  public void simpleAddGetAndSetTest1Helper(SpillableStateStore store)
+  {
+    SpillableArrayListImpl<String> list = new SpillableArrayListImpl<>(0L, ID1, store,
+        new SerdeStringSlice(), 1);
+
+    store.setup(testMeta.operatorContext);
+    list.setup(testMeta.operatorContext);
+
+    long windowId = 0L;
+    store.beginWindow(windowId);
+    list.beginWindow(windowId);
+
+    SpillableTestUtils.checkOutOfBounds(list, 0);
+    Assert.assertEquals(0, list.size());
+
+    list.add("a");
+
+    SpillableTestUtils.checkOutOfBounds(list, 1);
+    Assert.assertEquals(1, list.size());
+
+    Assert.assertEquals("a", list.get(0));
+
+    list.addAll(Lists.newArrayList("a", "b", "c"));
+
+    Assert.assertEquals(4, list.size());
+
+    Assert.assertEquals("a", list.get(0));
+    Assert.assertEquals("a", list.get(1));
+    Assert.assertEquals("b", list.get(2));
+    Assert.assertEquals("c", list.get(3));
+
+    SpillableTestUtils.checkOutOfBounds(list, 4);
+
+    list.endWindow();
+    store.endWindow();
+    store.beforeCheckpoint(windowId);
+    store.checkpointed(windowId);
+    store.committed(windowId);
+
+    windowId++;
+    store.beginWindow(windowId);
+    list.beginWindow(windowId);
+
+    SpillableTestUtils.checkValue(store, 0L, ID1, 0, Lists.newArrayList("a"));
+    SpillableTestUtils.checkValue(store, 0L, ID1, 1, Lists.newArrayList("a"));
+    SpillableTestUtils.checkValue(store, 0L, ID1, 2, Lists.newArrayList("b"));
+    SpillableTestUtils.checkValue(store, 0L, ID1, 3, Lists.newArrayList("c"));
+
+    Assert.assertEquals(4, list.size());
+
+    Assert.assertEquals("a", list.get(0));
+    Assert.assertEquals("a", list.get(1));
+    Assert.assertEquals("b", list.get(2));
+    Assert.assertEquals("c", list.get(3));
+
+    list.add("tt");
+    list.add("ab");
+    list.add("99");
+    list.add("oo");
+
+    Assert.assertEquals("tt", list.get(4));
+    Assert.assertEquals("ab", list.get(5));
+    Assert.assertEquals("99", list.get(6));
+    Assert.assertEquals("oo", list.get(7));
+
+    list.set(1, "111");
+
+    Assert.assertEquals("a", list.get(0));
+    Assert.assertEquals("111", list.get(1));
+    Assert.assertEquals("b", list.get(2));
+    Assert.assertEquals("c", list.get(3));
+    Assert.assertEquals("tt", list.get(4));
+    Assert.assertEquals("ab", list.get(5));
+    Assert.assertEquals("99", list.get(6));
+    Assert.assertEquals("oo", list.get(7));
+
+    list.endWindow();
+    store.endWindow();
+    store.beforeCheckpoint(windowId);
+    store.checkpointed(windowId);
+    store.committed(windowId);
+
+    windowId++;
+    store.beginWindow(windowId);
+    list.beginWindow(windowId);
+
+    SpillableTestUtils.checkValue(store, 0L, ID1, 0, Lists.newArrayList("a"));
+    SpillableTestUtils.checkValue(store, 0L, ID1, 1, Lists.newArrayList("111"));
+    SpillableTestUtils.checkValue(store, 0L, ID1, 2, Lists.newArrayList("b"));
+    SpillableTestUtils.checkValue(store, 0L, ID1, 3, Lists.newArrayList("c"));
+    SpillableTestUtils.checkValue(store, 0L, ID1, 4, Lists.newArrayList("tt"));
+    SpillableTestUtils.checkValue(store, 0L, ID1, 5, Lists.newArrayList("ab"));
+    SpillableTestUtils.checkValue(store, 0L, ID1, 6, Lists.newArrayList("99"));
+    SpillableTestUtils.checkValue(store, 0L, ID1, 7, Lists.newArrayList("oo"));
+
+    list.endWindow();
+    store.endWindow();
+    store.beforeCheckpoint(windowId);
+    store.checkpointed(windowId);
+    store.committed(windowId);
+
+    list.teardown();
+    store.teardown();
+  }
+
+  @Test
+  public void simpleAddGetAndSetTest3()
+  {
+    InMemSpillableStateStore store = new InMemSpillableStateStore();
+
+    simpleAddGetAndSetTest3Helper(store);
+  }
+
+  @Test
+  public void simpleAddGetAndSetManagedStateTest3()
+  {
+    simpleAddGetAndSetTest3Helper(testMeta.store);
+  }
+
+  private void simpleAddGetAndSetTest3Helper(SpillableStateStore store)
+  {
+    SpillableArrayListImpl<String> list = new SpillableArrayListImpl<>(0L, ID1, store,
+        new SerdeStringSlice(), 3);
+
+    store.setup(testMeta.operatorContext);
+    list.setup(testMeta.operatorContext);
+
+    long windowId = 0L;
+    store.beginWindow(windowId);
+    list.beginWindow(windowId);
+
+    SpillableTestUtils.checkOutOfBounds(list, 0);
+    Assert.assertEquals(0, list.size());
+
+    list.add("a");
+
+    SpillableTestUtils.checkOutOfBounds(list, 1);
+    Assert.assertEquals(1, list.size());
+
+    Assert.assertEquals("a", list.get(0));
+
+    list.addAll(Lists.newArrayList("a", "b", "c", "d", "e", "f", "g"));
+
+    Assert.assertEquals(8, list.size());
+
+    Assert.assertEquals("a", list.get(0));
+    Assert.assertEquals("a", list.get(1));
+    Assert.assertEquals("b", list.get(2));
+    Assert.assertEquals("c", list.get(3));
+    Assert.assertEquals("d", list.get(4));
+    Assert.assertEquals("e", list.get(5));
+    Assert.assertEquals("f", list.get(6));
+    Assert.assertEquals("g", list.get(7));
+
+    SpillableTestUtils.checkOutOfBounds(list, 20);
+
+    list.endWindow();
+    store.endWindow();
+    store.beforeCheckpoint(windowId);
+    store.checkpointed(windowId);
+    store.committed(windowId);
+
+    windowId++;
+    store.beginWindow(windowId);
+    list.beginWindow(windowId);
+
+    SpillableTestUtils.checkValue(store, 0L, ID1, 0, Lists.newArrayList("a", "a", "b"));
+    SpillableTestUtils.checkValue(store, 0L, ID1, 1, Lists.newArrayList("c", "d", "e"));
+    SpillableTestUtils.checkValue(store, 0L, ID1, 2, Lists.newArrayList("f", "g"));
+
+    Assert.assertEquals(8, list.size());
+
+    Assert.assertEquals("a", list.get(0));
+    Assert.assertEquals("a", list.get(1));
+    Assert.assertEquals("b", list.get(2));
+    Assert.assertEquals("c", list.get(3));
+    Assert.assertEquals("d", list.get(4));
+    Assert.assertEquals("e", list.get(5));
+    Assert.assertEquals("f", list.get(6));
+    Assert.assertEquals("g", list.get(7));
+
+    list.add("tt");
+    list.add("ab");
+    list.add("99");
+    list.add("oo");
+
+    Assert.assertEquals("tt", list.get(8));
+    Assert.assertEquals("ab", list.get(9));
+    Assert.assertEquals("99", list.get(10));
+    Assert.assertEquals("oo", list.get(11));
+
+    list.endWindow();
+    store.endWindow();
+    store.beforeCheckpoint(windowId);
+    store.checkpointed(windowId);
+    store.committed(windowId);
+
+    windowId++;
+    store.beginWindow(windowId);
+    list.beginWindow(windowId);
+
+    SpillableTestUtils.checkValue(store, 0L, ID1, 0, Lists.newArrayList("a", "a", "b"));
+    SpillableTestUtils.checkValue(store, 0L, ID1, 1, Lists.newArrayList("c", "d", "e"));
+    SpillableTestUtils.checkValue(store, 0L, ID1, 2, Lists.newArrayList("f", "g", "tt"));
+    SpillableTestUtils.checkValue(store, 0L, ID1, 3, Lists.newArrayList("ab", "99", "oo"));
+
+    list.set(1, "111");
+    list.set(3, "222");
+    list.set(5, "333");
+    list.set(11, "444");
+
+    Assert.assertEquals("a", list.get(0));
+    Assert.assertEquals("111", list.get(1));
+    Assert.assertEquals("b", list.get(2));
+    Assert.assertEquals("222", list.get(3));
+    Assert.assertEquals("d", list.get(4));
+    Assert.assertEquals("333", list.get(5));
+    Assert.assertEquals("f", list.get(6));
+    Assert.assertEquals("g", list.get(7));
+    Assert.assertEquals("tt", list.get(8));
+    Assert.assertEquals("ab", list.get(9));
+    Assert.assertEquals("99", list.get(10));
+    Assert.assertEquals("444", list.get(11));
+
+    list.endWindow();
+    store.endWindow();
+    store.beforeCheckpoint(windowId);
+    store.checkpointed(windowId);
+    store.committed(windowId);
+
+    windowId++;
+    store.beginWindow(windowId);
+    list.beginWindow(windowId);
+
+    SpillableTestUtils.checkValue(store, 0L, ID1, 0, Lists.newArrayList("a", "111", "b"));
+    SpillableTestUtils.checkValue(store, 0L, ID1, 1, Lists.newArrayList("222", "d", "333"));
+    SpillableTestUtils.checkValue(store, 0L, ID1, 2, Lists.newArrayList("f", "g", "tt"));
+    SpillableTestUtils.checkValue(store, 0L, ID1, 3, Lists.newArrayList("ab", "99", "444"));
+
+    list.endWindow();
+    store.endWindow();
+    store.beforeCheckpoint(windowId);
+    store.checkpointed(windowId);
+    store.committed(windowId);
+
+    list.teardown();
+    store.teardown();
+  }
+
+  @Test
+  public void simpleMultiListTest()
+  {
+    InMemSpillableStateStore store = new InMemSpillableStateStore();
+
+    simpleMultiListTestHelper(store);
+  }
+
+  @Test
+  public void simpleMultiListManagedStateTest()
+  {
+    simpleMultiListTestHelper(testMeta.store);
+  }
+
+  public void simpleMultiListTestHelper(SpillableStateStore store)
+  {
+    SpillableArrayListImpl<String> list1 = new SpillableArrayListImpl<>(0L, ID1, store,
+        new SerdeStringSlice(), 1);
+
+    SpillableArrayListImpl<String> list2 = new SpillableArrayListImpl<>(0L, ID2, store,
+        new SerdeStringSlice(), 1);
+
+    store.setup(testMeta.operatorContext);
+    list1.setup(testMeta.operatorContext);
+    list2.setup(testMeta.operatorContext);
+
+    long windowId = 0L;
+    store.beginWindow(windowId);
+    list1.beginWindow(windowId);
+    list2.beginWindow(windowId);
+
+    SpillableTestUtils.checkOutOfBounds(list1, 0);
+    Assert.assertEquals(0, list1.size());
+
+    list1.add("a");
+
+    SpillableTestUtils.checkOutOfBounds(list2, 0);
+
+    list2.add("2a");
+
+    SpillableTestUtils.checkOutOfBounds(list1, 1);
+    SpillableTestUtils.checkOutOfBounds(list2, 1);
+
+    Assert.assertEquals(1, list1.size());
+    Assert.assertEquals(1, list2.size());
+
+    Assert.assertEquals("a", list1.get(0));
+    Assert.assertEquals("2a", list2.get(0));
+
+    list1.addAll(Lists.newArrayList("a", "b", "c"));
+    list2.addAll(Lists.newArrayList("2a", "2b"));
+
+    Assert.assertEquals(4, list1.size());
+    Assert.assertEquals(3, list2.size());
+
+    Assert.assertEquals("a", list1.get(0));
+    Assert.assertEquals("a", list1.get(1));
+    Assert.assertEquals("b", list1.get(2));
+    Assert.assertEquals("c", list1.get(3));
+
+    Assert.assertEquals("2a", list2.get(0));
+    Assert.assertEquals("2a", list2.get(1));
+    Assert.assertEquals("2b", list2.get(2));
+
+    SpillableTestUtils.checkOutOfBounds(list1, 4);
+    SpillableTestUtils.checkOutOfBounds(list2, 3);
+
+    list1.endWindow();
+    list2.endWindow();
+    store.endWindow();
+    store.beforeCheckpoint(windowId);
+    store.checkpointed(windowId);
+    store.committed(windowId);
+
+    SpillableTestUtils.checkValue(store, 0L, ID1, 0, Lists.newArrayList("a"));
+    SpillableTestUtils.checkValue(store, 0L, ID1, 1, Lists.newArrayList("a"));
+    SpillableTestUtils.checkValue(store, 0L, ID1, 2, Lists.newArrayList("b"));
+    SpillableTestUtils.checkValue(store, 0L, ID1, 3, Lists.newArrayList("c"));
+
+    SpillableTestUtils.checkValue(store, 0L, ID2, 0, Lists.newArrayList("2a"));
+    SpillableTestUtils.checkValue(store, 0L, ID2, 1, Lists.newArrayList("2a"));
+    SpillableTestUtils.checkValue(store, 0L, ID2, 2, Lists.newArrayList("2b"));
+
+    windowId++;
+    store.beginWindow(windowId);
+    list1.beginWindow(windowId);
+    list2.beginWindow(windowId);
+
+    Assert.assertEquals(4, list1.size());
+    Assert.assertEquals(3, list2.size());
+
+    Assert.assertEquals("a", list1.get(0));
+    Assert.assertEquals("a", list1.get(1));
+    Assert.assertEquals("b", list1.get(2));
+    Assert.assertEquals("c", list1.get(3));
+
+    Assert.assertEquals("2a", list2.get(0));
+    Assert.assertEquals("2a", list2.get(1));
+    Assert.assertEquals("2b", list2.get(2));
+
+    list1.add("tt");
+    list1.add("ab");
+    list1.add("99");
+    list1.add("oo");
+
+    list2.add("2tt");
+    list2.add("2ab");
+
+    Assert.assertEquals("tt", list1.get(4));
+    Assert.assertEquals("ab", list1.get(5));
+    Assert.assertEquals("99", list1.get(6));
+    Assert.assertEquals("oo", list1.get(7));
+
+    Assert.assertEquals("2tt", list2.get(3));
+    Assert.assertEquals("2ab", list2.get(4));
+
+    list1.set(1, "111");
+    list2.set(1, "2111");
+
+    Assert.assertEquals("a", list1.get(0));
+    Assert.assertEquals("111", list1.get(1));
+    Assert.assertEquals("b", list1.get(2));
+    Assert.assertEquals("c", list1.get(3));
+    Assert.assertEquals("tt", list1.get(4));
+    Assert.assertEquals("ab", list1.get(5));
+    Assert.assertEquals("99", list1.get(6));
+    Assert.assertEquals("oo", list1.get(7));
+
+    Assert.assertEquals("2a", list2.get(0));
+    Assert.assertEquals("2111", list2.get(1));
+    Assert.assertEquals("2b", list2.get(2));
+    Assert.assertEquals("2tt", list2.get(3));
+    Assert.assertEquals("2ab", list2.get(4));
+
+    list1.endWindow();
+    list2.endWindow();
+    store.endWindow();
+    store.beforeCheckpoint(windowId);
+    store.checkpointed(windowId);
+    store.committed(windowId);
+
+    windowId++;
+    store.beginWindow(windowId);
+    list1.beginWindow(windowId);
+    list2.beginWindow(windowId);
+
+    SpillableTestUtils.checkValue(store, 0L, ID1, 0, Lists.newArrayList("a"));
+    SpillableTestUtils.checkValue(store, 0L, ID1, 1, Lists.newArrayList("111"));
+    SpillableTestUtils.checkValue(store, 0L, ID1, 2, Lists.newArrayList("b"));
+    SpillableTestUtils.checkValue(store, 0L, ID1, 3, Lists.newArrayList("c"));
+    SpillableTestUtils.checkValue(store, 0L, ID1, 4, Lists.newArrayList("tt"));
+    SpillableTestUtils.checkValue(store, 0L, ID1, 5, Lists.newArrayList("ab"));
+    SpillableTestUtils.checkValue(store, 0L, ID1, 6, Lists.newArrayList("99"));
+    SpillableTestUtils.checkValue(store, 0L, ID1, 7, Lists.newArrayList("oo"));
+
+    SpillableTestUtils.checkValue(store, 0L, ID2, 0, Lists.newArrayList("2a"));
+    SpillableTestUtils.checkValue(store, 0L, ID2, 1, Lists.newArrayList("2111"));
+    SpillableTestUtils.checkValue(store, 0L, ID2, 2, Lists.newArrayList("2b"));
+    SpillableTestUtils.checkValue(store, 0L, ID2, 3, Lists.newArrayList("2tt"));
+    SpillableTestUtils.checkValue(store, 0L, ID2, 4, Lists.newArrayList("2ab"));
+
+    list1.endWindow();
+    list2.endWindow();
+    store.endWindow();
+    store.beforeCheckpoint(windowId);
+    store.checkpointed(windowId);
+    store.committed(windowId);
+
+    list1.teardown();
+    list2.teardown();
+    store.teardown();
+  }
+
+  @Test
+  public void recoveryManagedStateTest()
+  {
+    SpillableStateStore store = testMeta.store;
+
+    SpillableArrayListImpl<String> list = new SpillableArrayListImpl<>(0L, ID1, store,
+        new SerdeStringSlice(), 3);
+
+    store.setup(testMeta.operatorContext);
+    list.setup(testMeta.operatorContext);
+
+    long windowId = 0L;
+    store.beginWindow(windowId);
+    list.beginWindow(windowId);
+
+    SpillableTestUtils.checkOutOfBounds(list, 0);
+
+    list.add("a");
+    list.addAll(Lists.newArrayList("a", "b", "c", "d", "e", "f", "g"));
+
+    Assert.assertEquals(8, list.size());
+
+    list.endWindow();
+    store.endWindow();
+
+    windowId++;
+    store.beginWindow(windowId);
+    list.beginWindow(windowId);
+
+    list.add("tt");
+    list.add("ab");
+    list.add("99");
+    list.add("oo");
+
+    list.endWindow();
+    store.endWindow();
+    store.beforeCheckpoint(windowId);
+    store.checkpointed(windowId);
+    store.committed(windowId);
+
+    windowId++;
+    store.beginWindow(windowId);
+    list.beginWindow(windowId);
+
+    list.set(1, "111");
+    list.set(3, "222");
+    list.set(5, "333");
+    list.set(11, "444");
+
+    list.endWindow();
+    store.endWindow();
+
+    windowId++;
+    store.beginWindow(windowId);
+    list.beginWindow(windowId);
+
+    list.endWindow();
+    store.endWindow();
+    store.beforeCheckpoint(windowId);
+    long activationWindow = windowId;
+    SpillableArrayListImpl<String> clonedList = KryoCloneUtils.cloneObject(list);
+    store.checkpointed(windowId);
+    store.committed(windowId);
+
+    windowId++;
+    store.beginWindow(windowId);
+    list.beginWindow(windowId);
+
+    list.set(1, "111111");
+    list.set(3, "222222");
+    list.add("xyz");
+
+    list.endWindow();
+    store.endWindow();
+
+    list.teardown();
+    store.teardown();
+
+    Attribute.AttributeMap.DefaultAttributeMap attributes = new Attribute.AttributeMap.DefaultAttributeMap();
+    attributes.put(DAG.APPLICATION_PATH, testMeta.applicationPath);
+    attributes.put(Context.OperatorContext.ACTIVATION_WINDOW_ID, activationWindow);
+    Context.OperatorContext context =
+        new OperatorContextTestHelper.TestIdOperatorContext(testMeta.operatorContext.getId(), attributes);
+
+    list = clonedList;
+    store = clonedList.getStore();
+
+    store.setup(context);
+    list.setup(context);
+
+    windowId = activationWindow + 1L;
+    store.beginWindow(windowId);
+    list.beginWindow(windowId);
+
+    Assert.assertEquals("a", list.get(0));
+    Assert.assertEquals("111", list.get(1));
+    Assert.assertEquals("b", list.get(2));
+    Assert.assertEquals("222", list.get(3));
+    Assert.assertEquals("d", list.get(4));
+    Assert.assertEquals("333", list.get(5));
+    Assert.assertEquals("f", list.get(6));
+    Assert.assertEquals("g", list.get(7));
+    Assert.assertEquals("tt", list.get(8));
+    Assert.assertEquals("ab", list.get(9));
+    Assert.assertEquals("99", list.get(10));
+    Assert.assertEquals("444", list.get(11));
+    Assert.assertEquals(12, list.size());
+
+    list.endWindow();
+    store.endWindow();
+
+    list.teardown();
+    store.teardown();
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9b6e11d8/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteArrayListMultimapImplTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteArrayListMultimapImplTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteArrayListMultimapImplTest.java
new file mode 100644
index 0000000..42d7d20
--- /dev/null
+++ b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteArrayListMultimapImplTest.java
@@ -0,0 +1,341 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.state.spillable;
+
+import java.util.List;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+
+import org.apache.apex.malhar.lib.state.spillable.inmem.InMemSpillableStateStore;
+import org.apache.apex.malhar.lib.utils.serde.SerdeIntSlice;
+import org.apache.apex.malhar.lib.utils.serde.SerdeStringSlice;
+import org.apache.apex.malhar.lib.utils.serde.SliceUtils;
+
+import com.google.common.collect.Lists;
+
+import com.datatorrent.api.Attribute;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.lib.helper.OperatorContextTestHelper;
+import com.datatorrent.lib.util.KryoCloneUtils;
+import com.datatorrent.netlet.util.Slice;
+
+public class SpillableByteArrayListMultimapImplTest
+{
+  public static final byte[] ID1 = new byte[]{(byte)0};
+
+  @Rule
+  public SpillableTestUtils.TestMeta testMeta = new SpillableTestUtils.TestMeta();
+
+  @Test
+  public void simpleMultiKeyTest()
+  {
+    InMemSpillableStateStore store = new InMemSpillableStateStore();
+
+    simpleMultiKeyTestHelper(store);
+  }
+
+  @Test
+  public void simpleMultiKeyManagedStateTest()
+  {
+    simpleMultiKeyTestHelper(testMeta.store);
+  }
+
+  public void simpleMultiKeyTestHelper(SpillableStateStore store)
+  {
+    SpillableByteArrayListMultimapImpl<String, String> map =
+        new SpillableByteArrayListMultimapImpl<String, String>(store, ID1, 0L, new SerdeStringSlice(),
+        new SerdeStringSlice());
+
+    store.setup(testMeta.operatorContext);
+    map.setup(testMeta.operatorContext);
+
+    long nextWindowId = 0L;
+    nextWindowId = simpleMultiKeyTestHelper(store, map, "a", nextWindowId);
+    nextWindowId++;
+
+    store.beginWindow(nextWindowId);
+    map.beginWindow(nextWindowId);
+
+    Assert.assertEquals(1, map.size());
+
+    map.endWindow();
+    store.endWindow();
+
+    nextWindowId++;
+    nextWindowId = simpleMultiKeyTestHelper(store, map, "b", nextWindowId);
+    nextWindowId++;
+
+    store.beginWindow(nextWindowId);
+    map.beginWindow(nextWindowId);
+
+    Assert.assertEquals(2, map.size());
+
+    map.endWindow();
+    store.endWindow();
+
+    nextWindowId++;
+    simpleMultiKeyTestHelper(store, map, "c", nextWindowId);
+
+    nextWindowId++;
+    store.beginWindow(nextWindowId);
+    map.beginWindow(nextWindowId);
+
+    Assert.assertEquals(3, map.size());
+
+    map.endWindow();
+    store.endWindow();
+
+    map.teardown();
+    store.teardown();
+  }
+
+  public long simpleMultiKeyTestHelper(SpillableStateStore store,
+      SpillableByteArrayListMultimapImpl<String, String> map, String key, long nextWindowId)
+  {
+    SerdeStringSlice serdeString = new SerdeStringSlice();
+    SerdeIntSlice serdeInt = new SerdeIntSlice();
+
+    Slice keySlice = serdeString.serialize(key);
+
+    byte[] keyBytes = SliceUtils.concatenate(ID1, keySlice.toByteArray());
+
+    nextWindowId++;
+    store.beginWindow(nextWindowId);
+    map.beginWindow(nextWindowId);
+
+    Assert.assertNull(map.get(key));
+
+    Assert.assertFalse(map.containsKey(key));
+
+    map.put(key, "a");
+
+    Assert.assertTrue(map.containsKey(key));
+
+    List<String> list1 = map.get(key);
+    Assert.assertEquals(1, list1.size());
+
+    Assert.assertEquals("a", list1.get(0));
+
+    list1.addAll(Lists.newArrayList("a", "b", "c", "d", "e", "f", "g"));
+
+    Assert.assertEquals(8, list1.size());
+
+    Assert.assertEquals("a", list1.get(0));
+    Assert.assertEquals("a", list1.get(1));
+    Assert.assertEquals("b", list1.get(2));
+    Assert.assertEquals("c", list1.get(3));
+    Assert.assertEquals("d", list1.get(4));
+    Assert.assertEquals("e", list1.get(5));
+    Assert.assertEquals("f", list1.get(6));
+    Assert.assertEquals("g", list1.get(7));
+
+    map.endWindow();
+    store.endWindow();
+
+    nextWindowId++;
+    store.beginWindow(nextWindowId);
+    map.beginWindow(nextWindowId);
+
+    SpillableTestUtils.checkValue(store, 0L,
+        SliceUtils.concatenate(keyBytes, SpillableByteArrayListMultimapImpl.SIZE_KEY_SUFFIX), 8, 0, serdeInt);
+
+    SpillableTestUtils.checkValue(store, 0L, keyBytes, 0, Lists.<String>newArrayList("a", "a", "b", "c", "d", "e",
+        "f", "g"));
+
+    List<String> list2 = map.get(key);
+
+    Assert.assertEquals(8, list2.size());
+
+    Assert.assertEquals("a", list2.get(0));
+    Assert.assertEquals("a", list2.get(1));
+    Assert.assertEquals("b", list2.get(2));
+    Assert.assertEquals("c", list2.get(3));
+    Assert.assertEquals("d", list2.get(4));
+    Assert.assertEquals("e", list2.get(5));
+    Assert.assertEquals("f", list2.get(6));
+    Assert.assertEquals("g", list2.get(7));
+
+    list2.add("tt");
+    list2.add("ab");
+    list2.add("99");
+    list2.add("oo");
+
+    Assert.assertEquals("tt", list2.get(8));
+    Assert.assertEquals("ab", list2.get(9));
+    Assert.assertEquals("99", list2.get(10));
+    Assert.assertEquals("oo", list2.get(11));
+
+    Assert.assertEquals(12, list2.size());
+
+    map.endWindow();
+    store.endWindow();
+
+    nextWindowId++;
+    store.beginWindow(nextWindowId);
+    map.beginWindow(nextWindowId);
+
+    Assert.assertEquals(12, list2.size());
+
+    SpillableTestUtils.checkValue(store, 0L,
+        SliceUtils.concatenate(keyBytes, SpillableByteArrayListMultimapImpl.SIZE_KEY_SUFFIX), 12, 0, serdeInt);
+
+    SpillableTestUtils.checkValue(store, 0L, keyBytes, 0, Lists.<String>newArrayList("a", "a", "b", "c", "d", "e",
+        "f", "g", "tt", "ab", "99", "oo"));
+
+    List<String> list3 = map.get(key);
+
+    list3.set(1, "111");
+    list3.set(3, "222");
+    list3.set(5, "333");
+    list3.set(11, "444");
+
+    Assert.assertEquals("a", list3.get(0));
+    Assert.assertEquals("111", list3.get(1));
+    Assert.assertEquals("b", list3.get(2));
+    Assert.assertEquals("222", list3.get(3));
+    Assert.assertEquals("d", list3.get(4));
+    Assert.assertEquals("333", list3.get(5));
+    Assert.assertEquals("f", list3.get(6));
+    Assert.assertEquals("g", list3.get(7));
+    Assert.assertEquals("tt", list3.get(8));
+    Assert.assertEquals("ab", list3.get(9));
+    Assert.assertEquals("99", list3.get(10));
+    Assert.assertEquals("444", list3.get(11));
+
+    Assert.assertEquals(12, list2.size());
+
+    map.endWindow();
+    store.endWindow();
+
+    nextWindowId++;
+    store.beginWindow(nextWindowId);
+    map.beginWindow(nextWindowId);
+
+    SpillableTestUtils.checkValue(store, 0L,
+        SliceUtils.concatenate(keyBytes, SpillableByteArrayListMultimapImpl.SIZE_KEY_SUFFIX), 12, 0, serdeInt);
+
+    SpillableTestUtils.checkValue(store, 0L, keyBytes, 0, Lists.<String>newArrayList("a", "111", "b", "222", "d", "333",
+        "f", "g", "tt", "ab", "99", "444"));
+
+    map.endWindow();
+    store.endWindow();
+    store.beforeCheckpoint(nextWindowId);
+
+    return nextWindowId;
+  }
+
+  @Test
+  public void recoveryTestWithManagedState()
+  {
+    SpillableStateStore store = testMeta.store;
+
+    SpillableByteArrayListMultimapImpl<String, String> map =
+        new SpillableByteArrayListMultimapImpl<>(store, ID1, 0L, new SerdeStringSlice(), new SerdeStringSlice());
+
+    store.setup(testMeta.operatorContext);
+    map.setup(testMeta.operatorContext);
+
+    long nextWindowId = 0L;
+    nextWindowId = simpleMultiKeyTestHelper(store, map, "a", nextWindowId);
+    long activationWindow = nextWindowId;
+    nextWindowId++;
+
+    SpillableByteArrayListMultimapImpl<String, String> clonedMap = KryoCloneUtils.cloneObject(map);
+    store.checkpointed(nextWindowId);
+    store.committed(nextWindowId);
+
+    store.beginWindow(nextWindowId);
+    map.beginWindow(nextWindowId);
+
+    List<String> list1 = map.get("a");
+
+    Assert.assertEquals(12, list1.size());
+
+    Assert.assertEquals("a", list1.get(0));
+    Assert.assertEquals("111", list1.get(1));
+    Assert.assertEquals("b", list1.get(2));
+    Assert.assertEquals("222", list1.get(3));
+    Assert.assertEquals("d", list1.get(4));
+    Assert.assertEquals("333", list1.get(5));
+    Assert.assertEquals("f", list1.get(6));
+    Assert.assertEquals("g", list1.get(7));
+    Assert.assertEquals("tt", list1.get(8));
+    Assert.assertEquals("ab", list1.get(9));
+    Assert.assertEquals("99", list1.get(10));
+    Assert.assertEquals("444", list1.get(11));
+
+    list1.add("111");
+
+    Assert.assertEquals("a", list1.get(0));
+    Assert.assertEquals("111", list1.get(1));
+    Assert.assertEquals("b", list1.get(2));
+    Assert.assertEquals("222", list1.get(3));
+    Assert.assertEquals("d", list1.get(4));
+    Assert.assertEquals("333", list1.get(5));
+    Assert.assertEquals("f", list1.get(6));
+    Assert.assertEquals("g", list1.get(7));
+    Assert.assertEquals("tt", list1.get(8));
+    Assert.assertEquals("ab", list1.get(9));
+    Assert.assertEquals("99", list1.get(10));
+    Assert.assertEquals("444", list1.get(11));
+    Assert.assertEquals("111", list1.get(12));
+
+    Assert.assertEquals(13, list1.size());
+
+    map.endWindow();
+    store.endWindow();
+
+    map.teardown();
+    store.teardown();
+
+    map = clonedMap;
+    store = map.getStore();
+
+    Attribute.AttributeMap.DefaultAttributeMap attributes = new Attribute.AttributeMap.DefaultAttributeMap();
+    attributes.put(DAG.APPLICATION_PATH, testMeta.applicationPath);
+    attributes.put(Context.OperatorContext.ACTIVATION_WINDOW_ID, activationWindow);
+    Context.OperatorContext context =
+        new OperatorContextTestHelper.TestIdOperatorContext(testMeta.operatorContext.getId(), attributes);
+
+    store.setup(context);
+    map.setup(context);
+
+    store.beginWindow(nextWindowId);
+    map.beginWindow(nextWindowId);
+
+    SerdeStringSlice serdeString = new SerdeStringSlice();
+    Slice keySlice = serdeString.serialize("a");
+    byte[] keyBytes = SliceUtils.concatenate(ID1, keySlice.toByteArray());
+
+    SpillableTestUtils.checkValue(store, 0L, keyBytes, 0, Lists.<String>newArrayList("a", "111", "b", "222", "d",
+        "333", "f", "g", "tt", "ab", "99", "444"));
+
+    Assert.assertEquals(1, map.size());
+    Assert.assertEquals(12, map.get("a").size());
+
+    map.endWindow();
+    store.endWindow();
+
+    map.teardown();
+    store.teardown();
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9b6e11d8/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteMapImplTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteMapImplTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteMapImplTest.java
new file mode 100644
index 0000000..63f7b79
--- /dev/null
+++ b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteMapImplTest.java
@@ -0,0 +1,484 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.state.spillable;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+
+import org.apache.apex.malhar.lib.state.spillable.inmem.InMemSpillableStateStore;
+import org.apache.apex.malhar.lib.utils.serde.SerdeStringSlice;
+
+import com.datatorrent.api.Attribute;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.lib.helper.OperatorContextTestHelper;
+import com.datatorrent.lib.util.KryoCloneUtils;
+
+public class SpillableByteMapImplTest
+{
+  public static final byte[] ID1 = new byte[]{(byte)0};
+  public static final byte[] ID2 = new byte[]{(byte)1};
+
+  @Rule
+  public SpillableTestUtils.TestMeta testMeta = new SpillableTestUtils.TestMeta();
+
+  @Test
+  public void simpleGetAndPutTest()
+  {
+    InMemSpillableStateStore store = new InMemSpillableStateStore();
+
+    simpleGetAndPutTestHelper(store);
+  }
+
+  @Test
+  public void simpleGetAndPutManagedStateTest()
+  {
+    simpleGetAndPutTestHelper(testMeta.store);
+  }
+
+  private void simpleGetAndPutTestHelper(SpillableStateStore store)
+  {
+    SerdeStringSlice sss = new SerdeStringSlice();
+
+    SpillableByteMapImpl<String, String> map = new SpillableByteMapImpl<>(store, ID1, 0L,
+        new SerdeStringSlice(),
+        new SerdeStringSlice());
+
+    store.setup(testMeta.operatorContext);
+    map.setup(testMeta.operatorContext);
+
+    long windowId = 0L;
+    store.beginWindow(windowId);
+    map.beginWindow(windowId);
+
+    Assert.assertEquals(0, map.size());
+
+    map.put("a", "1");
+    map.put("b", "2");
+    map.put("c", "3");
+
+    Assert.assertEquals(3, map.size());
+
+    Assert.assertEquals("1", map.get("a"));
+    Assert.assertEquals("2", map.get("b"));
+    Assert.assertEquals("3", map.get("c"));
+    Assert.assertEquals(null, map.get("d"));
+
+    SpillableTestUtils.checkValue(store, 0L, "a", ID1, null);
+    SpillableTestUtils.checkValue(store, 0L, "b", ID1, null);
+    SpillableTestUtils.checkValue(store, 0L, "c", ID1, null);
+    SpillableTestUtils.checkValue(store, 0L, "d", ID1, null);
+
+    map.endWindow();
+    store.endWindow();
+    store.beforeCheckpoint(windowId);
+    store.checkpointed(windowId);
+    store.committed(windowId);
+
+    windowId++;
+    store.beginWindow(windowId);
+    map.beginWindow(windowId);
+
+    SpillableTestUtils.checkValue(store, 0L, "a", ID1, "1");
+    SpillableTestUtils.checkValue(store, 0L, "b", ID1, "2");
+    SpillableTestUtils.checkValue(store, 0L, "c", ID1, "3");
+    SpillableTestUtils.checkValue(store, 0L, "d", ID1, null);
+
+    Assert.assertEquals(3, map.size());
+
+    Assert.assertEquals("1", map.get("a"));
+    Assert.assertEquals("2", map.get("b"));
+    Assert.assertEquals("3", map.get("c"));
+    Assert.assertEquals(null, map.get("d"));
+
+    map.put("d", "4");
+    map.put("e", "5");
+    map.put("f", "6");
+
+    Assert.assertEquals(6, map.size());
+
+    Assert.assertEquals("4", map.get("d"));
+    Assert.assertEquals("5", map.get("e"));
+    Assert.assertEquals("6", map.get("f"));
+
+    SpillableTestUtils.checkValue(store, 0L, "a", ID1, "1");
+    SpillableTestUtils.checkValue(store, 0L, "b", ID1, "2");
+    SpillableTestUtils.checkValue(store, 0L, "c", ID1, "3");
+    SpillableTestUtils.checkValue(store, 0L, "d", ID1, null);
+    SpillableTestUtils.checkValue(store, 0L, "e", ID1, null);
+    SpillableTestUtils.checkValue(store, 0L, "f", ID1, null);
+
+    map.endWindow();
+    store.endWindow();
+    store.beforeCheckpoint(windowId);
+    store.checkpointed(windowId);
+    store.committed(windowId);
+
+    windowId++;
+    store.beginWindow(windowId);
+    map.beginWindow(windowId);
+
+    SpillableTestUtils.checkValue(store, 0L, "a", ID1, "1");
+    SpillableTestUtils.checkValue(store, 0L, "b", ID1, "2");
+    SpillableTestUtils.checkValue(store, 0L, "c", ID1, "3");
+    SpillableTestUtils.checkValue(store, 0L, "d", ID1, "4");
+    SpillableTestUtils.checkValue(store, 0L, "e", ID1, "5");
+    SpillableTestUtils.checkValue(store, 0L, "f", ID1, "6");
+    SpillableTestUtils.checkValue(store, 0L, "g", ID1, null);
+
+    map.endWindow();
+    store.endWindow();
+    store.beforeCheckpoint(windowId);
+    store.checkpointed(windowId);
+    store.committed(windowId);
+
+    map.teardown();
+    store.teardown();
+  }
+
+  @Test
+  public void simpleRemoveTest()
+  {
+    InMemSpillableStateStore store = new InMemSpillableStateStore();
+
+    simpleRemoveTestHelper(store);
+  }
+
+  @Test
+  public void simpleRemoveManagedStateTest()
+  {
+    simpleRemoveTestHelper(testMeta.store);
+  }
+
+  private void simpleRemoveTestHelper(SpillableStateStore store)
+  {
+    SerdeStringSlice sss = new SerdeStringSlice();
+
+    SpillableByteMapImpl<String, String> map = new SpillableByteMapImpl<>(store, ID1, 0L,
+        new SerdeStringSlice(),
+        new SerdeStringSlice());
+
+    store.setup(testMeta.operatorContext);
+    map.setup(testMeta.operatorContext);
+
+    long windowId = 0L;
+    store.beginWindow(windowId);
+    map.beginWindow(windowId);
+
+    Assert.assertEquals(0, map.size());
+
+    map.put("a", "1");
+    map.put("b", "2");
+    map.put("c", "3");
+
+    Assert.assertEquals(3, map.size());
+
+    map.remove("b");
+    map.remove("c");
+
+    Assert.assertEquals("1", map.get("a"));
+    Assert.assertEquals(null, map.get("b"));
+    Assert.assertEquals(null, map.get("c"));
+    Assert.assertEquals(null, map.get("d"));
+
+    Assert.assertEquals(1, map.size());
+
+    SpillableTestUtils.checkValue(store, 0L, "a", ID1, null);
+    SpillableTestUtils.checkValue(store, 0L, "b", ID1, null);
+    SpillableTestUtils.checkValue(store, 0L, "c", ID1, null);
+    SpillableTestUtils.checkValue(store, 0L, "d", ID1, null);
+
+    map.endWindow();
+    store.endWindow();
+    store.beforeCheckpoint(windowId);
+    store.checkpointed(windowId);
+    store.committed(windowId);
+
+    SpillableTestUtils.checkValue(store, 0L, "a", ID1, "1");
+    SpillableTestUtils.checkValue(store, 0L, "b", ID1, null);
+    SpillableTestUtils.checkValue(store, 0L, "c", ID1, null);
+    SpillableTestUtils.checkValue(store, 0L, "d", ID1, null);
+
+    windowId++;
+    store.beginWindow(windowId);
+    map.beginWindow(windowId);
+
+    Assert.assertEquals(1, map.size());
+
+    Assert.assertEquals("1", map.get("a"));
+    Assert.assertEquals(null, map.get("b"));
+    Assert.assertEquals(null, map.get("c"));
+    Assert.assertEquals(null, map.get("d"));
+
+    map.put("d", "4");
+    map.put("e", "5");
+    map.put("f", "6");
+
+    Assert.assertEquals(4, map.size());
+
+    Assert.assertEquals("4", map.get("d"));
+    Assert.assertEquals("5", map.get("e"));
+    Assert.assertEquals("6", map.get("f"));
+
+    SpillableTestUtils.checkValue(store, 0L, "a", ID1, "1");
+    SpillableTestUtils.checkValue(store, 0L, "b", ID1, null);
+    SpillableTestUtils.checkValue(store, 0L, "c", ID1, null);
+    SpillableTestUtils.checkValue(store, 0L, "d", ID1, null);
+    SpillableTestUtils.checkValue(store, 0L, "e", ID1, null);
+    SpillableTestUtils.checkValue(store, 0L, "f", ID1, null);
+
+    map.endWindow();
+    store.endWindow();
+    store.beforeCheckpoint(windowId);
+    store.checkpointed(windowId);
+    store.committed(windowId);
+
+    windowId++;
+    store.beginWindow(windowId);
+    map.beginWindow(windowId);
+
+    SpillableTestUtils.checkValue(store, 0L, "a", ID1, "1");
+    SpillableTestUtils.checkValue(store, 0L, "b", ID1, null);
+    SpillableTestUtils.checkValue(store, 0L, "c", ID1, null);
+    SpillableTestUtils.checkValue(store, 0L, "d", ID1, "4");
+    SpillableTestUtils.checkValue(store, 0L, "e", ID1, "5");
+    SpillableTestUtils.checkValue(store, 0L, "f", ID1, "6");
+    SpillableTestUtils.checkValue(store, 0L, "g", ID1, null);
+
+    map.remove("a");
+    map.remove("d");
+    Assert.assertEquals(null, map.get("a"));
+    Assert.assertEquals(null, map.get("b"));
+    Assert.assertEquals(null, map.get("c"));
+    Assert.assertEquals(null, map.get("d"));
+    Assert.assertEquals("5", map.get("e"));
+    Assert.assertEquals("6", map.get("f"));
+    Assert.assertEquals(null, map.get("g"));
+
+    SpillableTestUtils.checkValue(store, 0L, "a", ID1, "1");
+    SpillableTestUtils.checkValue(store, 0L, "b", ID1, null);
+    SpillableTestUtils.checkValue(store, 0L, "c", ID1, null);
+    SpillableTestUtils.checkValue(store, 0L, "d", ID1, "4");
+    SpillableTestUtils.checkValue(store, 0L, "e", ID1, "5");
+    SpillableTestUtils.checkValue(store, 0L, "f", ID1, "6");
+    SpillableTestUtils.checkValue(store, 0L, "g", ID1, null);
+
+    map.endWindow();
+    store.endWindow();
+    store.beforeCheckpoint(windowId);
+    store.checkpointed(windowId);
+    store.committed(windowId);
+
+    windowId++;
+    store.beginWindow(windowId);
+    map.beginWindow(windowId);
+
+    SpillableTestUtils.checkValue(store, 0L, "a", ID1, null);
+    SpillableTestUtils.checkValue(store, 0L, "b", ID1, null);
+    SpillableTestUtils.checkValue(store, 0L, "c", ID1, null);
+    SpillableTestUtils.checkValue(store, 0L, "d", ID1, null);
+    SpillableTestUtils.checkValue(store, 0L, "e", ID1, "5");
+    SpillableTestUtils.checkValue(store, 0L, "f", ID1, "6");
+    SpillableTestUtils.checkValue(store, 0L, "g", ID1, null);
+
+    map.endWindow();
+    store.endWindow();
+    store.beforeCheckpoint(windowId);
+    store.checkpointed(windowId);
+    store.committed(windowId);
+
+    map.teardown();
+    store.teardown();
+  }
+
+  @Test
+  public void multiMapPerBucketTest()
+  {
+    InMemSpillableStateStore store = new InMemSpillableStateStore();
+
+    multiMapPerBucketTestHelper(store);
+  }
+
+  @Test
+  public void multiMapPerBucketManagedStateTest()
+  {
+    multiMapPerBucketTestHelper(testMeta.store);
+  }
+
+  public void multiMapPerBucketTestHelper(SpillableStateStore store)
+  {
+    SerdeStringSlice sss = new SerdeStringSlice();
+
+    SpillableByteMapImpl<String, String> map1 = new SpillableByteMapImpl<>(store, ID1, 0L,
+        new SerdeStringSlice(),
+        new SerdeStringSlice());
+    SpillableByteMapImpl<String, String> map2 = new SpillableByteMapImpl<>(store, ID2, 0L,
+        new SerdeStringSlice(),
+        new SerdeStringSlice());
+
+    store.setup(testMeta.operatorContext);
+    map1.setup(testMeta.operatorContext);
+    map2.setup(testMeta.operatorContext);
+
+    long windowId = 0L;
+    store.beginWindow(windowId);
+    map1.beginWindow(windowId);
+    map2.beginWindow(windowId);
+
+    map1.put("a", "1");
+
+    Assert.assertEquals("1", map1.get("a"));
+    Assert.assertEquals(null, map2.get("a"));
+
+    map2.put("a", "a1");
+
+    Assert.assertEquals("1", map1.get("a"));
+    Assert.assertEquals("a1", map2.get("a"));
+
+    map1.put("b", "2");
+    map2.put("c", "3");
+
+    Assert.assertEquals("1", map1.get("a"));
+    Assert.assertEquals("2", map1.get("b"));
+
+    Assert.assertEquals("a1", map2.get("a"));
+    Assert.assertEquals(null, map2.get("b"));
+    Assert.assertEquals("3", map2.get("c"));
+
+    map1.endWindow();
+    map2.endWindow();
+    store.endWindow();
+    store.beforeCheckpoint(windowId);
+    store.checkpointed(windowId);
+
+    windowId++;
+    store.beginWindow(windowId);
+    map1.beginWindow(windowId);
+    map2.beginWindow(windowId);
+
+    SpillableTestUtils.checkValue(store, 0L, "a", ID1, "1");
+    SpillableTestUtils.checkValue(store, 0L, "b", ID1, "2");
+
+    SpillableTestUtils.checkValue(store, 0L, "a", ID2, "a1");
+    SpillableTestUtils.checkValue(store, 0L, "b", ID2, null);
+    SpillableTestUtils.checkValue(store, 0L, "c", ID2, "3");
+
+    map1.remove("a");
+
+    Assert.assertEquals(null, map1.get("a"));
+    Assert.assertEquals("a1", map2.get("a"));
+
+    map1.endWindow();
+    map2.endWindow();
+    store.endWindow();
+    store.beforeCheckpoint(windowId);
+    store.checkpointed(windowId);
+
+    windowId++;
+    store.beginWindow(windowId);
+    map1.beginWindow(windowId);
+    map2.beginWindow(windowId);
+
+    SpillableTestUtils.checkValue(store, 0L, "a", ID1, null);
+    SpillableTestUtils.checkValue(store, 0L, "a", ID2, "a1");
+
+    map1.endWindow();
+    map2.endWindow();
+    store.endWindow();
+    store.beforeCheckpoint(windowId);
+    store.checkpointed(windowId);
+
+    map1.teardown();
+    map2.teardown();
+    store.teardown();
+  }
+
+  @Test
+  public void recoveryWithManagedStateTest() throws Exception
+  {
+    SerdeStringSlice sss = new SerdeStringSlice();
+
+    SpillableByteMapImpl<String, String> map1 = new SpillableByteMapImpl<>(testMeta.store, ID1, 0L,
+        new SerdeStringSlice(),
+        new SerdeStringSlice());
+
+    testMeta.store.setup(testMeta.operatorContext);
+    map1.setup(testMeta.operatorContext);
+
+    testMeta.store.beginWindow(0);
+    map1.beginWindow(0);
+    map1.put("x", "1");
+    map1.put("y", "2");
+    map1.put("z", "3");
+    map1.put("zz", "33");
+    Assert.assertEquals(4, map1.size());
+    map1.endWindow();
+    testMeta.store.endWindow();
+
+    testMeta.store.beginWindow(1);
+    map1.beginWindow(1);
+    Assert.assertEquals(4, map1.size());
+    map1.put("x", "4");
+    map1.put("y", "5");
+    map1.remove("zz");
+    Assert.assertEquals(3, map1.size());
+    map1.endWindow();
+    testMeta.store.endWindow();
+    testMeta.store.beforeCheckpoint(1);
+    testMeta.store.checkpointed(1);
+
+    SpillableByteMapImpl<String, String> clonedMap1 = KryoCloneUtils.cloneObject(map1);
+
+    testMeta.store.beginWindow(2);
+    map1.beginWindow(2);
+    Assert.assertEquals(3, map1.size());
+    map1.put("x", "6");
+    map1.put("y", "7");
+    map1.put("w", "8");
+    Assert.assertEquals(4, map1.size());
+    map1.endWindow();
+    testMeta.store.endWindow();
+
+    // simulating crash here
+    map1.teardown();
+    testMeta.store.teardown();
+
+    Attribute.AttributeMap.DefaultAttributeMap attributes = new Attribute.AttributeMap.DefaultAttributeMap();
+    attributes.put(DAG.APPLICATION_PATH, testMeta.applicationPath);
+    attributes.put(Context.OperatorContext.ACTIVATION_WINDOW_ID, 1L);
+    Context.OperatorContext context =
+        new OperatorContextTestHelper.TestIdOperatorContext(testMeta.operatorContext.getId(), attributes);
+
+    map1 = clonedMap1;
+    map1.getStore().setup(context);
+    map1.setup(testMeta.operatorContext);
+
+    map1.getStore().beginWindow(2);
+    map1.beginWindow(2);
+    Assert.assertEquals(3, map1.size());
+    Assert.assertEquals("4", map1.get("x"));
+    Assert.assertEquals("5", map1.get("y"));
+    Assert.assertEquals("3", map1.get("z"));
+    map1.endWindow();
+    map1.getStore().endWindow();
+
+    map1.teardown();
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9b6e11d8/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponentImplTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponentImplTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponentImplTest.java
new file mode 100644
index 0000000..67db6ba
--- /dev/null
+++ b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponentImplTest.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.state.spillable;
+
+import org.junit.Rule;
+import org.junit.Test;
+
+import org.apache.apex.malhar.lib.state.spillable.inmem.InMemSpillableStateStore;
+import org.apache.apex.malhar.lib.utils.serde.SerdeStringSlice;
+
+public class SpillableComplexComponentImplTest
+{
+  @Rule
+  public SpillableTestUtils.TestMeta testMeta = new SpillableTestUtils.TestMeta();
+
+  @Test
+  public void simpleIntegrationTest()
+  {
+    InMemSpillableStateStore store = new InMemSpillableStateStore();
+
+    simpleIntegrationTestHelper(store);
+  }
+
+  @Test
+  public void simpleIntegrationManagedStateTest()
+  {
+    simpleIntegrationTestHelper(testMeta.store);
+  }
+  
+  public void simpleIntegrationTestHelper(SpillableStateStore store)
+  {
+    SpillableComplexComponentImpl sccImpl = new SpillableComplexComponentImpl(store);
+
+    Spillable.SpillableComponent scList =
+        (Spillable.SpillableComponent)sccImpl.newSpillableArrayList(0L, new SerdeStringSlice());
+    Spillable.SpillableComponent scMap =
+        (Spillable.SpillableComponent)sccImpl.newSpillableByteMap(0L, new SerdeStringSlice(), new SerdeStringSlice());
+
+    sccImpl.setup(testMeta.operatorContext);
+
+    sccImpl.beginWindow(0L);
+
+    sccImpl.endWindow();
+
+    sccImpl.teardown();
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9b6e11d8/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableTestUtils.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableTestUtils.java b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableTestUtils.java
new file mode 100644
index 0000000..00ea58d
--- /dev/null
+++ b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableTestUtils.java
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.state.spillable;
+
+import java.util.List;
+
+import org.junit.Assert;
+import org.junit.rules.TestWatcher;
+import org.junit.runner.Description;
+
+import org.apache.apex.malhar.lib.state.managed.ManagedStateTestUtils;
+import org.apache.apex.malhar.lib.state.spillable.managed.ManagedStateSpillableStateStore;
+import org.apache.apex.malhar.lib.utils.serde.Serde;
+import org.apache.apex.malhar.lib.utils.serde.SerdeListSlice;
+import org.apache.apex.malhar.lib.utils.serde.SerdeStringSlice;
+import org.apache.apex.malhar.lib.utils.serde.SliceUtils;
+import org.apache.commons.lang3.mutable.MutableInt;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.lib.appdata.gpo.GPOUtils;
+import com.datatorrent.lib.fileaccess.FileAccessFSImpl;
+import com.datatorrent.lib.util.TestUtils;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * This class contains utility methods that can be used by Spillable data structure unit tests.
+ */
+public class SpillableTestUtils
+{
+  public static SerdeStringSlice SERDE_STRING_SLICE = new SerdeStringSlice();
+  public static SerdeListSlice<String> SERDE_STRING_LIST_SLICE = new SerdeListSlice(new SerdeStringSlice());
+
+  private SpillableTestUtils()
+  {
+    //Shouldn't instantiate this
+  }
+
+  static class TestMeta extends TestWatcher
+  {
+    ManagedStateSpillableStateStore store;
+    Context.OperatorContext operatorContext;
+    String applicationPath;
+
+    @Override
+    protected void starting(Description description)
+    {
+      TestUtils.deleteTargetTestClassFolder(description);
+      store = new ManagedStateSpillableStateStore();
+      applicationPath = "target/" + description.getClassName() + "/" + description.getMethodName();
+      ((FileAccessFSImpl)store.getFileAccess()).setBasePath(applicationPath + "/" + "bucket_data");
+
+      operatorContext = ManagedStateTestUtils.getOperatorContext(1, applicationPath);
+    }
+
+    @Override
+    protected void finished(Description description)
+    {
+      TestUtils.deleteTargetTestClassFolder(description);
+    }
+  }
+
+  public static Slice getKeySlice(byte[] id, String key)
+  {
+    return SliceUtils.concatenate(id, SERDE_STRING_SLICE.serialize(key));
+  }
+
+  public static Slice getKeySlice(byte[] id, int index, String key)
+  {
+    return SliceUtils.concatenate(id,
+        SliceUtils.concatenate(GPOUtils.serializeInt(index),
+            SERDE_STRING_SLICE.serialize(key)));
+  }
+
+  public static void checkValue(SpillableStateStore store, long bucketId, String key,
+      byte[] prefix, String expectedValue)
+  {
+    checkValue(store, bucketId, SliceUtils.concatenate(prefix, SERDE_STRING_SLICE.serialize(key)).buffer,
+        expectedValue, 0, SERDE_STRING_SLICE);
+  }
+
+  public static void checkValue(SpillableStateStore store, long bucketId,
+      byte[] prefix, int index, List<String> expectedValue)
+  {
+    checkValue(store, bucketId, SliceUtils.concatenate(prefix, GPOUtils.serializeInt(index)), expectedValue, 0,
+        SERDE_STRING_LIST_SLICE);
+  }
+
+  public static <T> void  checkValue(SpillableStateStore store, long bucketId, byte[] bytes,
+      T expectedValue, int offset, Serde<T, Slice> serde)
+  {
+    Slice slice = store.getSync(bucketId, new Slice(bytes));
+
+    if (slice == null || slice.length == 0) {
+      if (expectedValue != null) {
+        Assert.assertEquals(expectedValue, slice);
+      } else {
+        return;
+      }
+    }
+
+    T string = serde.deserialize(slice, new MutableInt(offset));
+
+    Assert.assertEquals(expectedValue, string);
+  }
+
+  public static void checkOutOfBounds(SpillableArrayListImpl<String> list, int index)
+  {
+    boolean exceptionThrown = false;
+
+    try {
+      list.get(index);
+    } catch (IndexOutOfBoundsException ex) {
+      exceptionThrown = true;
+    }
+
+    Assert.assertTrue(exceptionThrown);
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9b6e11d8/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/TimeBasedPriorityQueueTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/TimeBasedPriorityQueueTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/TimeBasedPriorityQueueTest.java
new file mode 100644
index 0000000..8033a7d
--- /dev/null
+++ b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/TimeBasedPriorityQueueTest.java
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.state.spillable;
+
+import java.util.Set;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.google.common.collect.Sets;
+
+/**
+ * Created by tfarkas on 6/4/16.
+ */
+public class TimeBasedPriorityQueueTest
+{
+  @Test
+  public void simpleInsertAndRemoveTest()
+  {
+    TimeBasedPriorityQueue<String> queue = new TimeBasedPriorityQueue<String>();
+    queue.upSert("a");
+    queue.remove("a");
+
+    overRemoveTest(queue, 1);
+  }
+
+  @Test
+  public void simpleInsertAndLRURemoveTest()
+  {
+    TimeBasedPriorityQueue<String> queue = new TimeBasedPriorityQueue<String>();
+    queue.upSert("a");
+
+    Set<String> set = queue.removeLRU(1);
+
+    Assert.assertEquals(Sets.newHashSet("a"), set);
+  }
+
+  @Test
+  public void simpleLRUTest() throws Exception
+  {
+    TimeBasedPriorityQueue<String> queue = new TimeBasedPriorityQueue<String>();
+
+    queue.upSert("a");
+    Thread.sleep(1L);
+
+    queue.upSert("b");
+    Thread.sleep(1L);
+
+    queue.upSert("a");
+
+    Set<String> set = queue.removeLRU(1);
+
+    Assert.assertEquals(Sets.newHashSet("b"), set);
+  }
+
+  @Test
+  public void complexLRUTest() throws Exception
+  {
+    //0, 3, 6, 9
+    //1, 4, 7
+    //2, 5, 8
+
+    TimeBasedPriorityQueue<String> queue = new TimeBasedPriorityQueue<String>();
+
+    for (int counter = 0; counter < 10; counter++) {
+      String val = "" + counter;
+
+      queue.upSert(val);
+      Thread.sleep(1L);
+    }
+
+    for (int counter = 0; counter < 10; counter++) {
+      if (counter % 3 != 1) {
+        continue;
+      }
+
+      String val = "" + counter;
+      queue.remove(val);
+    }
+
+    for (int counter = 0; counter < 10; counter++) {
+      if (counter % 3 != 0) {
+        continue;
+      }
+
+      String val = "" + counter;
+      queue.upSert(val);
+      Thread.sleep(1L);
+    }
+
+    //2, 5, 8, 0, 3, 6, 9
+
+    overRemoveTest(queue, 8);
+
+    Set<String> expiredValues = queue.removeLRU(3);
+
+    Assert.assertEquals(Sets.newHashSet("2", "5", "8"), expiredValues);
+
+    overRemoveTest(queue, 6);
+
+    expiredValues = queue.removeLRU(4);
+
+    Assert.assertEquals(Sets.newHashSet("0", "3", "6", "9"), expiredValues);
+  }
+
+  private void overRemoveTest(TimeBasedPriorityQueue<String> queue, int removeCount)
+  {
+    boolean exceptionThrown = false;
+
+    try {
+      queue.removeLRU(removeCount);
+    } catch (IllegalArgumentException e) {
+      exceptionThrown = true;
+    }
+
+    Assert.assertTrue(exceptionThrown);
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9b6e11d8/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/WindowBoundedMapCacheTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/WindowBoundedMapCacheTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/WindowBoundedMapCacheTest.java
new file mode 100644
index 0000000..b1710dd
--- /dev/null
+++ b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/WindowBoundedMapCacheTest.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.state.spillable;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.google.common.collect.Sets;
+
+public class WindowBoundedMapCacheTest
+{
+  @Test
+  public void simplePutGetTest()
+  {
+    WindowBoundedMapCache<String, String> cache = new WindowBoundedMapCache<>();
+
+    long windowId = 0L;
+
+    windowId++;
+
+    cache.put("1", "a");
+    Assert.assertEquals("a", cache.get("1"));
+
+    cache.endWindow();
+
+    windowId++;
+
+    Assert.assertEquals("a", cache.get("1"));
+
+    cache.endWindow();
+  }
+
+  @Test
+  public void getChangedGetRemovedTest()
+  {
+    WindowBoundedMapCache<String, String> cache = new WindowBoundedMapCache<>();
+
+    long windowId = 0L;
+
+    windowId++;
+
+    cache.put("1", "a");
+    cache.put("2", "b");
+
+    Assert.assertEquals(Sets.newHashSet("1", "2"), cache.getChangedKeys());
+    Assert.assertEquals(Sets.newHashSet(), cache.getRemovedKeys());
+
+    cache.endWindow();
+
+    windowId++;
+
+    cache.remove("1");
+
+    Assert.assertEquals(Sets.newHashSet(), cache.getChangedKeys());
+    Assert.assertEquals(Sets.newHashSet("1"), cache.getRemovedKeys());
+
+    Assert.assertEquals(null, cache.get("1"));
+    Assert.assertEquals("b", cache.get("2"));
+
+    cache.endWindow();
+
+    windowId++;
+
+    Assert.assertEquals(Sets.newHashSet(), cache.getChangedKeys());
+    Assert.assertEquals(Sets.newHashSet(), cache.getRemovedKeys());
+
+    cache.endWindow();
+  }
+
+  @Test
+  public void expirationTest() throws Exception
+  {
+    WindowBoundedMapCache<String, String> cache = new WindowBoundedMapCache<>(2);
+
+    long windowId = 0L;
+
+    windowId++;
+
+    cache.put("1", "a");
+    Thread.sleep(1L);
+    cache.put("2", "b");
+    Thread.sleep(1L);
+    cache.put("3", "c");
+
+    Assert.assertEquals(Sets.newHashSet("1", "2", "3"), cache.getChangedKeys());
+
+    cache.endWindow();
+
+    windowId++;
+
+    Assert.assertEquals(null, cache.get("1"));
+    Assert.assertEquals("b", cache.get("2"));
+    Assert.assertEquals("c", cache.get("3"));
+
+    Assert.assertEquals(Sets.newHashSet(), cache.getChangedKeys());
+    Assert.assertEquals(Sets.newHashSet(), cache.getRemovedKeys());
+
+    cache.endWindow();
+  }
+}