You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by hs...@apache.org on 2016/09/22 16:46:49 UTC

apex-malhar git commit: APEXMALHAR-2248 #resolve Added interfaces and implementations of SpillableSet and SpillableSetMultimap

Repository: apex-malhar
Updated Branches:
  refs/heads/master d713e521e -> 7ac4a0ed7


APEXMALHAR-2248 #resolve Added interfaces and implementations of SpillableSet and SpillableSetMultimap


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/7ac4a0ed
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/7ac4a0ed
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/7ac4a0ed

Branch: refs/heads/master
Commit: 7ac4a0ed759b06c382fa54fc148114709c1452f2
Parents: d713e52
Author: David Yan <da...@datatorrent.com>
Authored: Mon Sep 19 17:58:12 2016 -0700
Committer: Siyuan Hua <hs...@apache.org>
Committed: Thu Sep 22 08:56:43 2016 -0700

----------------------------------------------------------------------
 .../malhar/lib/state/spillable/Spillable.java   |  35 +-
 .../spillable/SpillableComplexComponent.java    |  32 +-
 .../SpillableComplexComponentImpl.java          |   9 +
 .../lib/state/spillable/SpillableSetImpl.java   | 352 +++++++++++++++++++
 .../spillable/SpillableSetMultimapImpl.java     | 321 +++++++++++++++++
 .../malhar/lib/utils/serde/SerdeKryoSlice.java  | 100 ++++++
 .../malhar/lib/utils/serde/SerdeLongSlice.java  |  54 +++
 .../malhar/lib/utils/serde/SerdePairSlice.java  |  89 +++++
 .../state/spillable/SpillableSetImplTest.java   | 148 ++++++++
 .../spillable/SpillableSetMultimapImplTest.java | 298 ++++++++++++++++
 .../lib/utils/serde/SerdeKryoSliceTest.java     |  79 +++++
 .../lib/utils/serde/SerdePairSliceTest.java     |  44 +++
 12 files changed, 1546 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7ac4a0ed/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/Spillable.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/Spillable.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/Spillable.java
index 4c9b997..849389b 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/Spillable.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/Spillable.java
@@ -21,9 +21,11 @@ package org.apache.apex.malhar.lib.state.spillable;
 import java.util.List;
 import java.util.Map;
 import java.util.Queue;
+import java.util.Set;
 
 import com.google.common.collect.ListMultimap;
 import com.google.common.collect.Multiset;
+import com.google.common.collect.SetMultimap;
 
 import com.datatorrent.api.Component;
 import com.datatorrent.api.Context.OperatorContext;
@@ -37,7 +39,7 @@ public interface Spillable
 {
   /**
    * This represents a spillable {@link java.util.List}. The underlying implementation
-   * of this list is similar to that of an {@link java.util.ArrayList}. User's that receive an
+   * of this list is similar to that of an {@link java.util.ArrayList}. Users that receive an
    * implementation of this interface don't need to worry about propagating operator call-backs
    * to the data structure.
    * @param <T> The type of the data stored in the {@link SpillableArrayList}.
@@ -47,9 +49,19 @@ public interface Spillable
   }
 
   /**
+   * This represents a spillable {@link java.util.Set}. Implementations make some assumptions about
+   * serialization and equality. Consider two elements T1 and T2. The assumption is that T1.equals(T2)
+   * should be consistent with T1.toByteArray().equals(T2.toByteArray()), because elements are stored
+   * as keys of a spillable byte map. Users that receive an
+   * implementation of this interface don't need to worry about propagating operator call-backs
+   * to the data structure.
+   * @param <T> The type of the data stored in the {@link SpillableSet}.
+   */
+  interface SpillableSet<T> extends Set<T>
+  {
+  }
+
+  /**
    * This represents a spillable {@link java.util.Map}. Implementations make
    * some assumptions about serialization and equality. Consider two keys K1 and K2. The assumption is
-   * that K1.equals(K2) should be consistent with K1.toByteArray().equals(K2.toByteArray()). User's that receive an
+   * that K1.equals(K2) should be consistent with K1.toByteArray().equals(K2.toByteArray()). Users that receive an
    * implementation of this interface don't need to worry about propagating operator call-backs
    * to the data structure.
    * @param <K> The type of the keys.
@@ -62,7 +74,7 @@ public interface Spillable
   /**
    * This represents a spillable {@link com.google.common.collect.ListMultimap} implementation. Implementations make
    * some assumptions about serialization and equality. Consider two keys K1 and K2. The assumption is
-   * that K1.equals(K2) should be consistent with K1.toByteArray().equals(K2.toByteArray()). User's that receive an
+   * that K1.equals(K2) should be consistent with K1.toByteArray().equals(K2.toByteArray()). Users that receive an
    * implementation of this interface don't need to worry about propagating operator call-backs
    * to the data structure.
    * @param <K> The type of the keys.
@@ -73,9 +85,22 @@ public interface Spillable
   }
 
   /**
+   * This represents a spillable {@link com.google.common.collect.SetMultimap} implementation. Implementations make
+   * some assumptions about serialization and equality. Consider two keys K1 and K2. The assumption is
+   * that K1.equals(K2) should be consistent with K1.toByteArray().equals(K2.toByteArray()). The values
+   * stored under a key form a set, so the same assumption applies to values V1 and V2. Users that receive an
+   * implementation of this interface don't need to worry about propagating operator call-backs
+   * to the data structure.
+   * @param <K> The type of the keys.
+   * @param <V> The type of the values.
+   */
+  interface SpillableSetMultimap<K, V> extends SetMultimap<K, V>
+  {
+  }
+
+  /**
    * This represents a spillable {@link com.google.common.collect.Multiset} implementation. Implementations make
    * some assumptions about serialization and equality. Consider two elements T1 and T2. The assumption is
-   * that T1.equals(T2) should be consistent with T1.toByteArray().equals(T2.toByteArray()). User's that receive an
+   * that T1.equals(T2) should be consistent with T1.toByteArray().equals(T2.toByteArray()). Users that receive an
    * implementation of this interface don't need to worry about propagating operator call-backs to the data structure.
    */
   interface SpillableByteMultiset<T> extends Multiset<T>
@@ -83,7 +108,7 @@ public interface Spillable
   }
 
   /**
-   * This represents a spillable {@link java.util.Queue} implementation. User's that receive an
+   * This represents a spillable {@link java.util.Queue} implementation. Users that receive an
    * implementation of this interface don't need to worry about propagating operator call-backs
    * to the data structure.
    * @param <T> The type of the data stored in the queue.

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7ac4a0ed/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponent.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponent.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponent.java
index c63c7ef..e4836c4 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponent.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponent.java
@@ -39,7 +39,7 @@ public interface SpillableComplexComponent extends Component<OperatorContext>, S
    * This is a method for creating a {@link SpillableArrayList}. This method
    * auto-generates an identifier for the data structure.
    * @param <T> The type of data stored in the {@link SpillableArrayList}.
-   * @param bucket The bucket that this {@link SpillableArrayList} will be spilled too.
+   * @param bucket The bucket that this {@link SpillableArrayList} will be spilled to.
    * @param serde The Serializer/Deserializer to use for data stored in the {@link SpillableArrayList}.
    * @return A {@link SpillableArrayList}.
    */
@@ -49,7 +49,7 @@ public interface SpillableComplexComponent extends Component<OperatorContext>, S
    * This is a method for creating a {@link SpillableArrayList}.
    * @param <T> The type of data stored in the {@link SpillableArrayList}.
    * @param identifier The identifier for this {@link SpillableArrayList}.
-   * @param bucket The bucket that this {@link SpillableArrayList} will be spilled too.
+   * @param bucket The bucket that this {@link SpillableArrayList} will be spilled to.
    * @param serde The Serializer/Deserializer to use for data stored in the {@link SpillableArrayList}.
    * @return A {@link SpillableArrayList}.
    */
@@ -60,7 +60,7 @@ public interface SpillableComplexComponent extends Component<OperatorContext>, S
    * auto-generates an identifier for the data structure.
    * @param <K> The type of the keys.
    * @param <V> The type of the values.
-   * @param bucket The bucket that this {@link SpillableByteMap} will be spilled too.
+   * @param bucket The bucket that this {@link SpillableByteMap} will be spilled to.
    * @param serdeKey The Serializer/Deserializer to use for the map's keys.
    * @param serdeValue The Serializer/Deserializer to use for the map's values.
    * @return A {@link SpillableByteMap}.
@@ -73,7 +73,7 @@ public interface SpillableComplexComponent extends Component<OperatorContext>, S
    * @param <K> The type of the keys.
    * @param <V> The type of the values.
    * @param identifier The identifier for this {@link SpillableByteMap}.
-   * @param bucket The bucket that this {@link SpillableByteMap} will be spilled too.
+   * @param bucket The bucket that this {@link SpillableByteMap} will be spilled to.
    * @param serdeKey The Serializer/Deserializer to use for the map's keys.
    * @param serdeValue The Serializer/Deserializer to use for the map's values.
    * @return A {@link SpillableByteMap}.
@@ -86,7 +86,7 @@ public interface SpillableComplexComponent extends Component<OperatorContext>, S
    * auto-generates an identifier for the data structure.
    * @param <K> The type of the keys.
    * @param <V> The type of the values in the map's lists.
-   * @param bucket The bucket that this {@link SpillableByteArrayListMultimap} will be spilled too.
+   * @param bucket The bucket that this {@link SpillableByteArrayListMultimap} will be spilled to.
    * @param serdeKey The Serializer/Deserializer to use for the map's keys.
    * @param serdeValue The Serializer/Deserializer to use for the values in the map's lists.
    * @return A {@link SpillableByteArrayListMultimap}.
@@ -99,7 +99,7 @@ public interface SpillableComplexComponent extends Component<OperatorContext>, S
    * @param <K> The type of the keys.
    * @param <V> The type of the values in the map's lists.
    * @param identifier The identifier for this {@link SpillableByteArrayListMultimap}.
-   * @param bucket The bucket that this {@link SpillableByteArrayListMultimap} will be spilled too.
+   * @param bucket The bucket that this {@link SpillableByteArrayListMultimap} will be spilled to.
    * @param serdeKey The Serializer/Deserializer to use for the map's keys.
    * @param serdeValue The Serializer/Deserializer to use for the values in the map's lists.
    * @return A {@link SpillableByteArrayListMultimap}.
@@ -109,10 +109,22 @@ public interface SpillableComplexComponent extends Component<OperatorContext>, S
       Serde<V, Slice> serdeValue);
 
   /**
+   * This is a method for creating a {@link SpillableSetMultimap}. This method
+   * auto-generates an identifier for the data structure.
+   * @param <K> The type of the keys.
+   * @param <V> The type of the values in the map's sets.
+   * @param bucket The bucket that this {@link SpillableSetMultimap} will be spilled to.
+   * @param serdeKey The Serializer/Deserializer to use for the map's keys.
+   * @param serdeValue The Serializer/Deserializer to use for the values in the map's sets.
+   * @return A {@link SpillableSetMultimap}.
+   */
+  <K, V> SpillableSetMultimap<K, V> newSpillableSetMultimap(long bucket, Serde<K, Slice> serdeKey,
+      Serde<V, Slice> serdeValue);
+
+  /**
    * This is a method for creating a {@link SpillableByteMultiset}. This method
    * auto-generates an identifier for the data structure.
    * @param <T> The type of the elements.
-   * @param bucket The bucket that this {@link SpillableByteMultiset} will be spilled too.
+   * @param bucket The bucket that this {@link SpillableByteMultiset} will be spilled to.
    * @param serde The Serializer/Deserializer to use for data stored in the {@link SpillableByteMultiset}.
    * @return A {@link SpillableByteMultiset}.
    */
@@ -122,7 +134,7 @@ public interface SpillableComplexComponent extends Component<OperatorContext>, S
    * This is a method for creating a {@link SpillableByteMultiset}.
    * @param <T> The type of the elements.
    * @param identifier The identifier for this {@link SpillableByteMultiset}.
-   * @param bucket The bucket that this {@link SpillableByteMultiset} will be spilled too.
+   * @param bucket The bucket that this {@link SpillableByteMultiset} will be spilled to.
    * @param serde The Serializer/Deserializer to use for data stored in the {@link SpillableByteMultiset}.
    * @return A {@link SpillableByteMultiset}.
    */
@@ -132,7 +144,7 @@ public interface SpillableComplexComponent extends Component<OperatorContext>, S
    * This is a method for creating a {@link SpillableQueue}. This method
    * auto-generates an identifier for the data structure.
    * @param <T> The type of the data stored in the {@link SpillableQueue}.
-   * @param bucket The bucket that this {@link SpillableQueue} will be spilled too.
+   * @param bucket The bucket that this {@link SpillableQueue} will be spilled to.
    * @param serde The Serializer/Deserializer to use for data stored in the {@link SpillableQueue}.
    * @return A {@link SpillableQueue}.
    */
@@ -142,7 +154,7 @@ public interface SpillableComplexComponent extends Component<OperatorContext>, S
    * This is a method for creating a {@link SpillableQueue}.
    * @param <T> The type of the data stored in the {@link SpillableQueue}.
    * @param identifier The identifier for this {@link SpillableByteArrayListMultimap}.
-   * @param bucket The bucket that this {@link SpillableQueue} will be spilled too.
+   * @param bucket The bucket that this {@link SpillableQueue} will be spilled to.
    * @param serde The Serializer/Deserializer to use for data stored in the {@link SpillableQueue}.
    * @return A {@link SpillableQueue}.
    */

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7ac4a0ed/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponentImpl.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponentImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponentImpl.java
index aeb02c5..9c3defc 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponentImpl.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponentImpl.java
@@ -119,6 +119,15 @@ public class SpillableComplexComponentImpl implements SpillableComplexComponent
     return map;
   }
 
+  /**
+   * Creates a {@link SpillableSetMultimapImpl} that spills to this component's store under an
+   * auto-generated identifier, and adds it to {@code componentList}.
+   * @param <K> The type of the keys.
+   * @param <V> The type of the values in the map's sets.
+   * @param bucket The bucket that the multimap will be spilled to.
+   * @param serdeKey The Serializer/Deserializer to use for the map's keys.
+   * @param serdeValue The Serializer/Deserializer to use for the values in the map's sets.
+   * @return The newly created {@link SpillableSetMultimap}.
+   */
+  @Override
+  public <K, V> SpillableSetMultimap<K, V> newSpillableSetMultimap(long bucket, Serde<K, Slice> serdeKey,
+      Serde<V, Slice> serdeValue)
+  {
+    SpillableSetMultimapImpl<K, V> map = new SpillableSetMultimapImpl<K, V>(store, identifierGenerator.next(),
+        bucket, serdeKey, serdeValue);
+    componentList.add(map);
+    return map;
+  }
+
   public <T> SpillableByteMultiset<T> newSpillableByteMultiset(long bucket, Serde<T, Slice> serde)
   {
     throw new UnsupportedOperationException("Unsupported Operation");

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7ac4a0ed/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetImpl.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetImpl.java
new file mode 100644
index 0000000..122cd2d
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetImpl.java
@@ -0,0 +1,352 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.state.spillable;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+import javax.validation.constraints.NotNull;
+
+import org.apache.apex.malhar.lib.utils.serde.Serde;
+import org.apache.commons.lang3.mutable.MutableInt;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.esotericsoftware.kryo.DefaultSerializer;
+import com.esotericsoftware.kryo.serializers.FieldSerializer;
+import com.google.common.base.Preconditions;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * A Spillable implementation of {@link java.util.Set} backed by a {@link SpillableStateStore}.
+ * <p>
+ * Each element is a key of a {@link SpillableByteMapImpl} whose value is a {@code ListNode}: a validity
+ * flag plus the element that follows it in a singly linked chain starting at {@link #getHead()}. New
+ * elements are prepended to the chain, and the last node's {@code next} points back to itself. Removing
+ * an element does not unlink its node; the node is only marked invalid, and re-adding the element marks
+ * it valid again.
+ * </p>
+ * @param <T> The type of object stored in the {@link SpillableSetImpl}.
+ *
+ * @since 3.5.0
+ */
+@DefaultSerializer(FieldSerializer.class)
+@InterfaceStability.Evolving
+public class SpillableSetImpl<T> implements Spillable.SpillableSet<T>, Spillable.SpillableComponent
+{
+  /**
+   * One link of the element chain.
+   */
+  private static class ListNode<T>
+  {
+    ListNode()
+    {
+    }
+
+    ListNode(boolean valid, T next)
+    {
+      this.valid = valid;
+      this.next = next;
+    }
+
+    // Whether the element owning this node is currently a member of the set.
+    boolean valid;
+    // The following element in the chain; equal to the owning element for the last node.
+    T next;
+  }
+
+  /**
+   * {@link Serde} for {@code ListNode}: one flag byte ({@code 1} = valid, {@code 0} = removed) followed by
+   * the serialized {@code next} element. Offsets passed to {@link #deserialize(Slice, MutableInt)} are
+   * relative to the slice's own offset.
+   * @param <T> The element type.
+   */
+  public static class SerdeListNodeSlice<T> implements Serde<ListNode<T>, Slice>
+  {
+    private static final Slice FALSE_SLICE = new Slice(new byte[]{0});
+    private static final Slice TRUE_SLICE = new Slice(new byte[]{1});
+
+    private Serde<T, Slice> serde;
+
+    public SerdeListNodeSlice(@NotNull Serde<T, Slice> serde)
+    {
+      this.serde = Preconditions.checkNotNull(serde);
+    }
+
+    @Override
+    public Slice serialize(ListNode<T> object)
+    {
+      Slice flag = object.valid ? TRUE_SLICE : FALSE_SLICE;
+      Slice next = serde.serialize(object.next);
+
+      byte[] bytes = new byte[flag.length + next.length];
+      System.arraycopy(flag.buffer, flag.offset, bytes, 0, flag.length);
+      System.arraycopy(next.buffer, next.offset, bytes, flag.length, next.length);
+      return new Slice(bytes);
+    }
+
+    @Override
+    public ListNode<T> deserialize(Slice slice, MutableInt offset)
+    {
+      ListNode<T> result = new ListNode<>();
+      // offset is relative to slice.offset (deserialize(Slice) starts at 0), so the slice's own
+      // starting position must be added when indexing into the shared buffer.
+      result.valid = slice.buffer[slice.offset + offset.intValue()] != 0;
+      offset.add(1);
+      result.next = serde.deserialize(slice, offset);
+      return result;
+    }
+
+    @Override
+    public ListNode<T> deserialize(Slice object)
+    {
+      return deserialize(object, new MutableInt(0));
+    }
+  }
+
+  @NotNull
+  private SpillableStateStore store;
+  @NotNull
+  private SpillableByteMapImpl<T, ListNode<T>> map;
+
+  // First element of the chain; stays null until the first add() or setHead().
+  private T head;
+  // Number of elements currently marked valid.
+  private int size;
+
+  private SpillableSetImpl()
+  {
+    //for kryo
+  }
+
+  public SpillableStateStore getStore()
+  {
+    return store;
+  }
+
+  /**
+   * Creates a {@link SpillableSetImpl}.
+   * @param bucketId The Id of the bucket used to store this
+   * {@link SpillableSetImpl} in the provided {@link SpillableStateStore}.
+   * @param prefix The Id of this {@link SpillableSetImpl}.
+   * @param store The {@link SpillableStateStore} in which to spill to.
+   * @param serde The {@link Serde} to use when serializing and deserializing data.
+   */
+  public SpillableSetImpl(long bucketId, @NotNull byte[] prefix,
+      @NotNull SpillableStateStore store,
+      @NotNull Serde<T, Slice> serde)
+  {
+    this.store = Preconditions.checkNotNull(store);
+
+    map = new SpillableByteMapImpl<>(store, prefix, bucketId, serde, new SerdeListNodeSlice<>(serde));
+  }
+
+  /**
+   * Overrides the element count, e.g. when restoring a set from previously persisted metadata.
+   * @param size The number of valid elements; must be non-negative.
+   */
+  public void setSize(int size)
+  {
+    Preconditions.checkArgument(size >= 0);
+    this.size = size;
+  }
+
+  /**
+   * Overrides the first element of the chain, e.g. when restoring a set from previously persisted metadata.
+   * @param head The head element; must not be null.
+   */
+  public void setHead(T head)
+  {
+    Preconditions.checkNotNull(head);
+    this.head = head;
+  }
+
+  /**
+   * @return The first element of the chain (which may have been removed), or null if nothing was ever added.
+   */
+  public T getHead()
+  {
+    return head;
+  }
+
+  @Override
+  public int size()
+  {
+    return size;
+  }
+
+  @Override
+  public boolean isEmpty()
+  {
+    return size == 0;
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public boolean contains(Object o)
+  {
+    T t = (T)o;
+    ListNode<T> node = map.get(t);
+    return node != null && node.valid;
+  }
+
+  /**
+   * Returns an iterator over the valid elements, walking the chain from the head and skipping removed
+   * nodes. Its {@code remove()} marks the last returned element invalid.
+   */
+  @Override
+  public Iterator<T> iterator()
+  {
+    return new Iterator<T>()
+    {
+      // Next chain position to examine (may be a removed node); null once next() has moved past the last node.
+      T cur = head;
+      // Element returned by the last next(); null if there is none or it has already been removed.
+      T prev = null;
+
+      @Override
+      public boolean hasNext()
+      {
+        // Skip over removed nodes until a valid one (or the self-linked last node) is reached.
+        while (cur != null) {
+          ListNode<T> node = map.get(cur);
+          if (node.valid) {
+            return true;
+          }
+          if (cur.equals(node.next)) {
+            break;
+          } else {
+            cur = node.next;
+          }
+        }
+        return false;
+      }
+
+      @Override
+      public T next()
+      {
+        while (cur != null) {
+          ListNode<T> node = map.get(cur);
+          try {
+            if (node.valid) {
+              prev = cur;
+              return prev;
+            }
+          } finally {
+            // Advance past the current node whether or not it is returned; the last node links to itself.
+            if (cur.equals(node.next)) {
+              cur = null;
+            } else {
+              cur = node.next;
+            }
+          }
+        }
+        throw new NoSuchElementException();
+      }
+
+      @Override
+      public void remove()
+      {
+        // Iterator contract: remove() is legal at most once per successful next(). Without this guard a
+        // second call would decrement size again for the same element.
+        if (prev == null) {
+          throw new IllegalStateException();
+        }
+        ListNode<T> node = map.get(prev);
+        node.valid = false;
+        map.put(prev, node);
+        size--;
+        prev = null;
+      }
+    };
+  }
+
+  @Override
+  public Object[] toArray()
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public <T1> T1[] toArray(T1[] t1s)
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public boolean add(T t)
+  {
+    // Guard against int overflow of size.
+    Preconditions.checkArgument((size() + 1) > 0);
+    ListNode<T> node = map.get(t);
+    if (node == null) {
+      // Prepend a new node; the very first node links to itself to mark the end of the chain.
+      map.put(t, new ListNode<>(true, head == null ? t : head));
+      head = t;
+      size++;
+      return true;
+    } else if (!node.valid) {
+      // NOTE(review): the revalidated node keeps its existing next pointer and is not relinked from head;
+      // it is only reachable by iteration if it is still part of the current chain.
+      node.valid = true;
+      map.put(t, node);
+      size++;
+      return true;
+    } else {
+      return false;
+    }
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public boolean remove(Object o)
+  {
+    T t = (T)o;
+    ListNode<T> node = map.get(t);
+    if (node == null || !node.valid) {
+      return false;
+    } else {
+      node.valid = false;
+      map.put(t, node);
+      size--;
+      return true;
+    }
+  }
+
+  @Override
+  public boolean containsAll(Collection<?> collection)
+  {
+    for (Object item : collection) {
+      if (!contains(item)) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Adds every element of the collection.
+   * @return true if at least one element was not already present (per the {@link Collection} contract).
+   */
+  @Override
+  public boolean addAll(Collection<? extends T> collection)
+  {
+    boolean changed = false;
+    for (T element : collection) {
+      changed |= add(element);
+    }
+    return changed;
+  }
+
+  /**
+   * Removes every element of the collection that is present in this set.
+   * @return true if at least one element was removed.
+   */
+  @Override
+  public boolean removeAll(Collection<?> collection)
+  {
+    boolean changed = false;
+    for (Object item : collection) {
+      changed |= remove(item);
+    }
+    return changed;
+  }
+
+  /**
+   * Removes every element of this set that is not contained in the collection.
+   * @return true if at least one element was removed.
+   */
+  @Override
+  public boolean retainAll(Collection<?> collection)
+  {
+    boolean changed = false;
+    Iterator<T> it = iterator();
+    while (it.hasNext()) {
+      if (!collection.contains(it.next())) {
+        it.remove();
+        changed = true;
+      }
+    }
+    return changed;
+  }
+
+  /**
+   * Marks every element invalid. Nodes stay in the underlying map and the head is left unchanged.
+   */
+  @Override
+  public void clear()
+  {
+    Iterator<T> it = iterator();
+    while (it.hasNext()) {
+      it.next();
+      it.remove();
+    }
+  }
+
+  @Override
+  public void setup(Context.OperatorContext context)
+  {
+    map.setup(context);
+  }
+
+  @Override
+  public void beginWindow(long windowId)
+  {
+    map.beginWindow(windowId);
+  }
+
+  @Override
+  public void endWindow()
+  {
+    map.endWindow();
+  }
+
+  @Override
+  public void teardown()
+  {
+    map.teardown();
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7ac4a0ed/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetMultimapImpl.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetMultimapImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetMultimapImpl.java
new file mode 100644
index 0000000..951ef76
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetMultimapImpl.java
@@ -0,0 +1,321 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.state.spillable;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import javax.annotation.Nullable;
+import javax.validation.constraints.NotNull;
+
+import org.apache.apex.malhar.lib.utils.serde.PassThruSliceSerde;
+import org.apache.apex.malhar.lib.utils.serde.Serde;
+import org.apache.apex.malhar.lib.utils.serde.SerdeIntSlice;
+import org.apache.apex.malhar.lib.utils.serde.SerdePairSlice;
+import org.apache.apex.malhar.lib.utils.serde.SliceUtils;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.esotericsoftware.kryo.DefaultSerializer;
+import com.esotericsoftware.kryo.serializers.FieldSerializer;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Multiset;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * This is an implementation of Guava's {@link com.google.common.collect.SetMultimap} which spills data to a
+ * {@link SpillableStateStore}.
+ * <p>
+ * The values of each key are kept in a {@link SpillableSetImpl} whose map prefix is this multimap's identifier
+ * followed by the serialized key. Per-key metadata (the set's size and head element) is stored in {@code map}
+ * under the serialized key followed by {@link #META_KEY_SUFFIX}, and is written in {@link #endWindow()} for the
+ * keys the window cache reports as changed.
+ * </p>
+ *
+ * @since 3.5.0
+ */
+@DefaultSerializer(FieldSerializer.class)
+@InterfaceStability.Evolving
+public class SpillableSetMultimapImpl<K, V> implements Spillable.SpillableSetMultimap<K, V>,
+    Spillable.SpillableComponent
+{
+  public static final int DEFAULT_BATCH_SIZE = 1000;
+  // Appended to a serialized key to form the key of that key's metadata entry in map.
+  public static final byte[] META_KEY_SUFFIX = new byte[]{(byte)0, (byte)0, (byte)0};
+
+  private transient WindowBoundedMapCache<K, SpillableSetImpl<V>> cache = new WindowBoundedMapCache<>();
+
+  // Metadata: serialized key + META_KEY_SUFFIX -> (set size, set head).
+  @NotNull
+  private SpillableByteMapImpl<Slice, Pair<Integer, V>> map;
+  private SpillableStateStore store;
+  private byte[] identifier;
+  private long bucket;
+  private Serde<K, Slice> serdeKey;
+  private Serde<V, Slice> serdeValue;
+  // Sets emptied by removeAll() during the current window; flushed once and dropped in endWindow().
+  private transient List<SpillableSetImpl<V>> removedSets = new ArrayList<>();
+
+  private SpillableSetMultimapImpl()
+  {
+    // for kryo
+  }
+
+  /**
+   * Creates a {@link SpillableSetMultimapImpl}.
+   * @param store The {@link SpillableStateStore} in which to spill to.
+   * @param identifier The Id of this {@link SpillableSetMultimapImpl}.
+   * @param bucket The Id of the bucket used to store this
+   * {@link SpillableSetMultimapImpl} in the provided {@link SpillableStateStore}.
+   * @param serdeKey The {@link Serde} to use when serializing and deserializing keys.
+   * @param serdeValue The {@link Serde} to use when serializing and deserializing values.
+   */
+  public SpillableSetMultimapImpl(SpillableStateStore store, byte[] identifier, long bucket,
+      Serde<K, Slice> serdeKey,
+      Serde<V, Slice> serdeValue)
+  {
+    this.store = Preconditions.checkNotNull(store);
+    this.identifier = Preconditions.checkNotNull(identifier);
+    this.bucket = bucket;
+    this.serdeKey = Preconditions.checkNotNull(serdeKey);
+    this.serdeValue = Preconditions.checkNotNull(serdeValue);
+
+    map = new SpillableByteMapImpl(store, identifier, bucket, new PassThruSliceSerde(),
+        new SerdePairSlice<>(new SerdeIntSlice(), serdeValue));
+  }
+
+  public SpillableStateStore getStore()
+  {
+    return store;
+  }
+
+  /**
+   * @return The live set of values for the key, or null if the key has no entry (unlike Guava, which
+   * returns an empty collection).
+   */
+  @Override
+  public Set<V> get(@NotNull K key)
+  {
+    return getHelper(key);
+  }
+
+  /**
+   * Looks the key's set up in the window cache, falling back to its persisted metadata, and (re)caches it.
+   * @return The key's set, or null if neither the cache nor the metadata has an entry for it.
+   */
+  private SpillableSetImpl<V> getHelper(@NotNull K key)
+  {
+    SpillableSetImpl<V> spillableSet = cache.get(key);
+
+    if (spillableSet == null) {
+      Slice keySlice = serdeKey.serialize(key);
+      Pair<Integer, V> meta = map.get(SliceUtils.concatenate(keySlice, META_KEY_SUFFIX));
+
+      if (meta == null) {
+        return null;
+      }
+
+      Slice keyPrefix = SliceUtils.concatenate(identifier, keySlice);
+      spillableSet = new SpillableSetImpl<>(bucket, keyPrefix.toByteArray(), store, serdeValue);
+      spillableSet.setSize(meta.getLeft());
+      spillableSet.setHead(meta.getRight());
+    }
+
+    cache.put(key, spillableSet);
+
+    return spillableSet;
+  }
+
+  @Override
+  public Set<K> keySet()
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Multiset<K> keys()
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Collection<V> values()
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Set<Map.Entry<K, V>> entries()
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  /**
+   * Note that this always returns null because the set is no longer valid after this call
+   * <p>
+   * NOTE(review): {@link SpillableSetImpl#clear()} only marks nodes invalid, so the removed set's nodes remain
+   * stored under the same key prefix. A later {@link #put} for the same key creates a new set with that prefix
+   * and no head, and re-adding a value may find its stale node instead of linking a new one from the head.
+   * Verify before relying on remove-then-re-add of a key.
+   * </p>
+   *
+   * @param key
+   * @return null
+   */
+  @Override
+  @SuppressWarnings("unchecked")
+  public Set<V> removeAll(@NotNull Object key)
+  {
+    SpillableSetImpl<V> spillableSet = getHelper((K)key);
+    if (spillableSet != null) {
+      cache.remove((K)key);
+      Slice keySlice = SliceUtils.concatenate(serdeKey.serialize((K)key), META_KEY_SUFFIX);
+      map.remove(keySlice);
+      spillableSet.clear();
+      removedSets.add(spillableSet);
+    }
+    return null;
+  }
+
+  @Override
+  public void clear()
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public int size()
+  {
+    // TODO: This is actually wrong since in a Multimap, size() should return the number of entries, not the number of distinct keys
+    return map.size();
+  }
+
+  @Override
+  public boolean isEmpty()
+  {
+    return map.isEmpty();
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public boolean containsKey(Object key)
+  {
+    if (cache.contains((K)key)) {
+      return true;
+    }
+    Slice keySlice = SliceUtils.concatenate(serdeKey.serialize((K)key), META_KEY_SUFFIX);
+    return map.containsKey(keySlice);
+  }
+
+  @Override
+  public boolean containsValue(@NotNull Object value)
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public boolean containsEntry(Object key, Object value)
+  {
+    Set<V> set = get((K)key);
+    if (set == null) {
+      return false;
+    } else {
+      return set.contains(value);
+    }
+  }
+
+  /**
+   * Adds the value to the key's set, creating the set if the key has no entry yet.
+   * @return true if the multimap changed, i.e. the value was not already stored for the key
+   * (per the {@link com.google.common.collect.SetMultimap} contract).
+   */
+  @Override
+  public boolean put(K key, V value)
+  {
+    SpillableSetImpl<V> spillableSet = getHelper(key);
+
+    if (spillableSet == null) {
+      Slice keyPrefix = SliceUtils.concatenate(identifier, serdeKey.serialize(key));
+      spillableSet = new SpillableSetImpl<>(bucket, keyPrefix.toByteArray(), store, serdeValue);
+      cache.put(key, spillableSet);
+    }
+    return spillableSet.add(value);
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public boolean remove(@NotNull Object key, @NotNull Object value)
+  {
+    Set<V> set = get((K)key);
+    if (set == null) {
+      return false;
+    } else {
+      return set.remove(value);
+    }
+  }
+
+  @Override
+  public boolean putAll(@Nullable K key, Iterable<? extends V> values)
+  {
+    boolean changed = false;
+
+    for (V value: values) {
+      changed |= put(key, value);
+    }
+
+    return changed;
+  }
+
+  @Override
+  public boolean putAll(Multimap<? extends K, ? extends V> multimap)
+  {
+    boolean changed = false;
+
+    for (Map.Entry<? extends K, ? extends V> entry: multimap.entries()) {
+      changed |= put(entry.getKey(), entry.getValue());
+    }
+
+    return changed;
+  }
+
+  @Override
+  public Set<V> replaceValues(K key, Iterable<? extends V> values)
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Map<K, Collection<V>> asMap()
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void setup(Context.OperatorContext context)
+  {
+    map.setup(context);
+  }
+
+  @Override
+  public void beginWindow(long windowId)
+  {
+    map.beginWindow(windowId);
+  }
+
+  @Override
+  public void endWindow()
+  {
+    // Flush every changed set and persist its (size, head) metadata.
+    for (K key: cache.getChangedKeys()) {
+      SpillableSetImpl<V> spillableSet = cache.get(key);
+      spillableSet.endWindow();
+
+      map.put(SliceUtils.concatenate(serdeKey.serialize(key), META_KEY_SUFFIX),
+          new ImmutablePair<>(spillableSet.size(), spillableSet.getHead()));
+    }
+
+    // Flush the invalidations written by removeAll(). Each removed set only needs flushing once; keeping
+    // them would re-flush them every window and grow the list without bound.
+    for (SpillableSetImpl<V> removedSet : removedSets) {
+      removedSet.endWindow();
+    }
+    removedSets.clear();
+
+    cache.endWindow();
+    map.endWindow();
+  }
+
+  @Override
+  public void teardown()
+  {
+    map.teardown();
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7ac4a0ed/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeKryoSlice.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeKryoSlice.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeKryoSlice.java
new file mode 100644
index 0000000..d4b9488
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeKryoSlice.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.utils.serde;
+
+import java.io.ByteArrayOutputStream;
+
+import org.apache.commons.lang3.mutable.MutableInt;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * Generic serde using Kryo serialization. Note that while this is convenient, it may not be desirable because
+ * the serialized form is tightly coupled to the definition of the class being serialized: changing that class,
+ * even in a way that would normally be backward compatible, may make previously serialized data unreadable.
+ *
+ * @param <T> The type being serialized
+ */
+@InterfaceStability.Evolving
+public class SerdeKryoSlice<T> implements Serde<T, Slice>
+{
+  // One Kryo instance per thread, so instances are never shared across threads.
+  private static final ThreadLocal<Kryo> kryos = new ThreadLocal<Kryo>()
+  {
+    @Override
+    protected Kryo initialValue()
+    {
+      Kryo kryo = new Kryo();
+      // configure kryo instance, customize settings
+      return kryo;
+    }
+  };
+
+  /**
+   * When non-null, objects are written without class information and read back as this class.
+   * When null, class information is written alongside each object and used when reading it back.
+   */
+  private final Class<? extends T> clazz;
+
+  public SerdeKryoSlice()
+  {
+    this.clazz = null;
+  }
+
+  public SerdeKryoSlice(Class<? extends T> clazz)
+  {
+    this.clazz = clazz;
+  }
+
+  @Override
+  public Slice serialize(T object)
+  {
+    Kryo kryo = kryos.get();
+    ByteArrayOutputStream stream = new ByteArrayOutputStream();
+    Output output = new Output(stream);
+    if (clazz == null) {
+      kryo.writeClassAndObject(output, object);
+    } else {
+      kryo.writeObject(output, object);
+    }
+    // A stream-backed Output spills its internal buffer to the stream whenever the buffer fills up, so
+    // output.toBytes() would only return the unflushed tail for large objects. Flush everything to the
+    // stream and take the complete byte sequence from there.
+    output.flush();
+    return new Slice(stream.toByteArray());
+  }
+
+  /**
+   * Deserializes one object starting at {@code offset} (relative to the start of {@code slice}) and advances
+   * {@code offset} past the bytes that were consumed, so that further values can be read from the same slice.
+   */
+  @Override
+  @SuppressWarnings("unchecked")
+  public T deserialize(Slice slice, MutableInt offset)
+  {
+    Kryo kryo = kryos.get();
+    // Read straight from the backing buffer instead of copying the slice.
+    int start = slice.offset + offset.intValue();
+    Input input = new Input(slice.buffer, start, slice.length - offset.intValue());
+    T object;
+    if (clazz == null) {
+      object = (T)kryo.readClassAndObject(input);
+    } else {
+      object = kryo.readObject(input, clazz);
+    }
+    // input.position() is an absolute index into slice.buffer; advance by the number of bytes consumed.
+    offset.add(input.position() - start);
+    return object;
+  }
+
+  @Override
+  public T deserialize(Slice slice)
+  {
+    return deserialize(slice, new MutableInt(0));
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7ac4a0ed/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeLongSlice.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeLongSlice.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeLongSlice.java
new file mode 100644
index 0000000..6fe07d9
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeLongSlice.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.utils.serde;
+
+import org.apache.commons.lang3.mutable.MutableInt;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.datatorrent.lib.appdata.gpo.GPOUtils;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * This is an implementation of {@link Serde} which deserializes and serializes longs.
+ * Each value occupies 8 bytes in the serialized form.
+ *
+ * @since 3.5.0
+ */
+@InterfaceStability.Evolving
+public class SerdeLongSlice implements Serde<Long, Slice>
+{
+  @Override
+  public Slice serialize(Long object)
+  {
+    return new Slice(GPOUtils.serializeLong(object));
+  }
+
+  /**
+   * Reads a long at {@code offset} (relative to the start of {@code slice}) and advances {@code offset} by 8.
+   */
+  @Override
+  public Long deserialize(Slice slice, MutableInt offset)
+  {
+    // GPOUtils works with absolute positions in the backing buffer, so translate the slice-relative offset.
+    long val = GPOUtils.deserializeLong(slice.buffer, new MutableInt(slice.offset + offset.intValue()));
+    offset.add(8);
+    return val;
+  }
+
+  @Override
+  public Long deserialize(Slice object)
+  {
+    return deserialize(object, new MutableInt(0));
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7ac4a0ed/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdePairSlice.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdePairSlice.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdePairSlice.java
new file mode 100644
index 0000000..59cf282
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdePairSlice.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.utils.serde;
+
+import javax.validation.constraints.NotNull;
+
+import org.apache.commons.lang3.mutable.MutableInt;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.google.common.base.Preconditions;
+
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * A {@link Serde} for pairs. The serialized form is the bytes of the left element immediately followed by the
+ * bytes of the right element, each produced by its own component serde.
+ */
+@InterfaceStability.Evolving
+public class SerdePairSlice<T1, T2> implements Serde<Pair<T1, T2>, Slice>
+{
+  @NotNull
+  private Serde<T1, Slice> serde1;
+  @NotNull
+  private Serde<T2, Slice> serde2;
+
+  private SerdePairSlice()
+  {
+    // for Kryo
+  }
+
+  /**
+   * Creates a {@link SerdePairSlice}.
+   * @param serde1 The {@link Serde} used for the first (left) element of a pair
+   * @param serde2 The {@link Serde} used for the second (right) element of a pair
+   */
+  public SerdePairSlice(@NotNull Serde<T1, Slice> serde1, @NotNull Serde<T2, Slice> serde2)
+  {
+    this.serde1 = Preconditions.checkNotNull(serde1);
+    this.serde2 = Preconditions.checkNotNull(serde2);
+  }
+
+  @Override
+  public Slice serialize(Pair<T1, T2> pair)
+  {
+    Slice left = serde1.serialize(pair.getLeft());
+    Slice right = serde2.serialize(pair.getRight());
+
+    byte[] combined = new byte[left.length + right.length];
+    System.arraycopy(left.buffer, left.offset, combined, 0, left.length);
+    System.arraycopy(right.buffer, right.offset, combined, left.length, right.length);
+    return new Slice(combined);
+  }
+
+  /**
+   * Reads the left element and then the right element; the component serdes are responsible for advancing
+   * {@code offset} past what they consume.
+   */
+  @Override
+  public Pair<T1, T2> deserialize(Slice slice, MutableInt offset)
+  {
+    T1 left = serde1.deserialize(slice, offset);
+    T2 right = serde2.deserialize(slice, offset);
+    return new ImmutablePair<>(left, right);
+  }
+
+  @Override
+  public Pair<T1, T2> deserialize(Slice slice)
+  {
+    MutableInt start = new MutableInt(0);
+    return deserialize(slice, start);
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7ac4a0ed/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetImplTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetImplTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetImplTest.java
new file mode 100644
index 0000000..3883191
--- /dev/null
+++ b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetImplTest.java
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.state.spillable;
+
+import java.util.HashSet;
+import java.util.Iterator;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+
+import org.apache.apex.malhar.lib.state.spillable.inmem.InMemSpillableStateStore;
+import org.apache.apex.malhar.lib.utils.serde.SerdeStringSlice;
+
+import com.google.common.collect.Lists;
+
+public class SpillableSetImplTest
+{
+  public static final byte[] ID1 = new byte[]{(byte)0};
+
+  @Rule
+  public SpillableTestUtils.TestMeta testMeta = new SpillableTestUtils.TestMeta();
+
+  /** Runs the shared scenario against an in-memory store. */
+  @Test
+  public void simpleAddGetAndSetTest1()
+  {
+    simpleAddGetAndSetTest1Helper(new InMemSpillableStateStore());
+  }
+
+  /** Runs the shared scenario against the managed-state store supplied by the test rule. */
+  @Test
+  public void simpleAddGetAndSetManagedStateTest1()
+  {
+    simpleAddGetAndSetTest1Helper(testMeta.store);
+  }
+
+  /**
+   * Exercises add/addAll/contains/remove/iterator on a {@link SpillableSetImpl} across three windows,
+   * checkpointing and committing the store at the end of each window.
+   */
+  public void simpleAddGetAndSetTest1Helper(SpillableStateStore store)
+  {
+    SpillableSetImpl<String> set = new SpillableSetImpl<>(0L, ID1, store, new SerdeStringSlice());
+
+    store.setup(testMeta.operatorContext);
+    set.setup(testMeta.operatorContext);
+
+    long windowId = 0L;
+
+    // Window 0: add/contains, duplicate suppression, iteration and removal through the iterator.
+    startWindow(store, set, windowId);
+
+    Assert.assertEquals(0, set.size());
+
+    set.add("a");
+    Assert.assertEquals(1, set.size());
+    Assert.assertTrue(set.contains("a"));
+
+    set.addAll(Lists.newArrayList("a", "b", "c"));
+    Assert.assertEquals(3, set.size());
+    for (String expected : new String[]{"a", "b", "c"}) {
+      Assert.assertTrue(set.contains(expected));
+    }
+
+    HashSet<String> seen = new HashSet<>();
+    int visited = 0;
+    for (Iterator<String> iter = set.iterator(); iter.hasNext(); visited++) {
+      seen.add(iter.next());
+    }
+    Assert.assertTrue(seen.containsAll(Lists.newArrayList("a", "b", "c")));
+    Assert.assertEquals(3, visited);
+
+    Iterator<String> remover = set.iterator();
+    while (remover.hasNext()) {
+      if ("b".equals(remover.next())) {
+        remover.remove();
+      }
+    }
+    Assert.assertEquals(2, set.size());
+    Assert.assertTrue(set.contains("a"));
+    Assert.assertFalse(set.contains("b"));
+    Assert.assertTrue(set.contains("c"));
+
+    endWindowAndCommit(store, set, windowId);
+
+    // Window 1: add several values and remove one of them directly.
+    windowId++;
+    startWindow(store, set, windowId);
+
+    set.add("tt");
+    set.add("ab");
+    set.add("99");
+    set.add("oo");
+
+    Assert.assertTrue(set.contains("tt"));
+    Assert.assertTrue(set.contains("ab"));
+    Assert.assertTrue(set.contains("99"));
+    Assert.assertTrue(set.contains("oo"));
+
+    set.remove("ab");
+
+    Assert.assertTrue(set.contains("tt"));
+    Assert.assertFalse(set.contains("ab"));
+    Assert.assertTrue(set.contains("99"));
+    Assert.assertTrue(set.contains("oo"));
+
+    endWindowAndCommit(store, set, windowId);
+
+    // Window 2: an empty window.
+    windowId++;
+    startWindow(store, set, windowId);
+    endWindowAndCommit(store, set, windowId);
+
+    set.teardown();
+    store.teardown();
+  }
+
+  /** Begins the window on the store first, then on the set. */
+  private static void startWindow(SpillableStateStore store, SpillableSetImpl<String> set, long windowId)
+  {
+    store.beginWindow(windowId);
+    set.beginWindow(windowId);
+  }
+
+  /** Ends the window on the set, then the store, and runs the store through checkpoint and commit. */
+  private static void endWindowAndCommit(SpillableStateStore store, SpillableSetImpl<String> set, long windowId)
+  {
+    set.endWindow();
+    store.endWindow();
+    store.beforeCheckpoint(windowId);
+    store.checkpointed(windowId);
+    store.committed(windowId);
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7ac4a0ed/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetMultimapImplTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetMultimapImplTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetMultimapImplTest.java
new file mode 100644
index 0000000..e9903ec
--- /dev/null
+++ b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetMultimapImplTest.java
@@ -0,0 +1,298 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.state.spillable;
+
+import java.util.Iterator;
+import java.util.Random;
+import java.util.Set;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+
+import org.apache.apex.malhar.lib.state.spillable.inmem.InMemSpillableStateStore;
+import org.apache.apex.malhar.lib.utils.serde.SerdeStringSlice;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+import com.datatorrent.api.Attribute;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.lib.helper.OperatorContextTestHelper;
+import com.datatorrent.lib.util.KryoCloneUtils;
+
+public class SpillableSetMultimapImplTest
+{
+  public static final byte[] ID1 = new byte[]{(byte)0};
+
+  @Rule
+  public SpillableTestUtils.TestMeta testMeta = new SpillableTestUtils.TestMeta();
+
+  @Test
+  public void simpleMultiKeyTest()
+  {
+    InMemSpillableStateStore store = new InMemSpillableStateStore();
+
+    simpleMultiKeyTestHelper(store);
+  }
+
+  @Test
+  public void simpleMultiKeyManagedStateTest()
+  {
+    simpleMultiKeyTestHelper(testMeta.store);
+  }
+
+  /**
+   * Runs the single-key scenario for keys "a", "b" and "c" against one multimap, checking between runs that the
+   * number of keys grows by one each time. The window id returned by every scenario run is fed into the next
+   * window so that the window ids handed to the store are strictly increasing.
+   */
+  public void simpleMultiKeyTestHelper(SpillableStateStore store)
+  {
+    SpillableSetMultimapImpl<String, String> map =
+        new SpillableSetMultimapImpl<>(store, ID1, 0L, new SerdeStringSlice(),
+        new SerdeStringSlice());
+
+    store.setup(testMeta.operatorContext);
+    map.setup(testMeta.operatorContext);
+
+    long nextWindowId = 0L;
+    nextWindowId = simpleMultiKeyTestHelper(store, map, "a", nextWindowId);
+    nextWindowId++;
+
+    store.beginWindow(nextWindowId);
+    map.beginWindow(nextWindowId);
+
+    Assert.assertEquals(1, map.size());
+
+    map.endWindow();
+    store.endWindow();
+
+    nextWindowId++;
+    nextWindowId = simpleMultiKeyTestHelper(store, map, "b", nextWindowId);
+    nextWindowId++;
+
+    store.beginWindow(nextWindowId);
+    map.beginWindow(nextWindowId);
+
+    Assert.assertEquals(2, map.size());
+
+    map.endWindow();
+    store.endWindow();
+
+    nextWindowId++;
+    // Keep the returned window id: discarding it would make the next window reuse an id the helper already used.
+    nextWindowId = simpleMultiKeyTestHelper(store, map, "c", nextWindowId);
+
+    nextWindowId++;
+    store.beginWindow(nextWindowId);
+    map.beginWindow(nextWindowId);
+
+    Assert.assertEquals(3, map.size());
+
+    map.endWindow();
+    store.endWindow();
+
+    map.teardown();
+    store.teardown();
+  }
+
+  /**
+   * Exercises a single key over five consecutive windows (nextWindowId + 1 through nextWindowId + 5): put/get,
+   * removeAll, re-adding values with addAll on the returned set, reading the set back in a later window and
+   * adding more values to it.
+   *
+   * @param store the store backing {@code map}
+   * @param map the multimap under test; {@code key} must not be present on entry
+   * @param key the key to exercise
+   * @param nextWindowId the last window id used so far
+   * @return the last window id used by this method
+   */
+  public long simpleMultiKeyTestHelper(SpillableStateStore store,
+      SpillableSetMultimapImpl<String, String> map, String key, long nextWindowId)
+  {
+    nextWindowId++;
+    store.beginWindow(nextWindowId);
+    map.beginWindow(nextWindowId);
+
+    Assert.assertNull(map.get(key));
+
+    Assert.assertFalse(map.containsKey(key));
+
+    map.put(key, "a");
+
+    Assert.assertTrue(map.containsKey(key));
+
+    Set<String> set1 = map.get(key);
+    Assert.assertEquals(1, set1.size());
+    Iterator<String> it = set1.iterator();
+
+    Assert.assertEquals("a", it.next());
+
+    map.endWindow();
+    store.endWindow();
+
+    nextWindowId++;
+    store.beginWindow(nextWindowId);
+    map.beginWindow(nextWindowId);
+
+    map.removeAll(key);
+    Assert.assertFalse(map.containsKey(key));
+
+    map.endWindow();
+    store.endWindow();
+
+    nextWindowId++;
+    store.beginWindow(nextWindowId);
+    map.beginWindow(nextWindowId);
+
+    Assert.assertFalse(map.containsKey(key));
+    map.put(key, "a");
+    set1 = map.get(key);
+    Assert.assertEquals(1, set1.size());
+    set1.addAll(Lists.newArrayList("a", "b", "c", "d", "e", "f", "g"));
+    Assert.assertEquals(7, set1.size());
+
+    Set<String> referenceSet = Sets.newHashSet("a", "b", "c", "d", "e", "f", "g");
+    Assert.assertTrue(referenceSet.containsAll(set1));
+    Assert.assertTrue(set1.containsAll(referenceSet));
+
+    map.endWindow();
+    store.endWindow();
+
+    nextWindowId++;
+    store.beginWindow(nextWindowId);
+    map.beginWindow(nextWindowId);
+
+    Set<String> set2 = map.get(key);
+
+    Assert.assertEquals(7, set2.size());
+    Assert.assertTrue(referenceSet.containsAll(set2));
+    Assert.assertTrue(set2.containsAll(referenceSet));
+
+    set2.add("tt");
+    set2.add("ab");
+    set2.add("99");
+    set2.add("oo");
+    referenceSet = Sets.newHashSet("a", "b", "c", "d", "e", "f", "g", "tt", "ab", "99", "oo");
+    Assert.assertTrue(referenceSet.containsAll(set2));
+    Assert.assertTrue(set2.containsAll(referenceSet));
+
+    Assert.assertEquals(11, set2.size());
+
+    map.endWindow();
+    store.endWindow();
+
+    nextWindowId++;
+    store.beginWindow(nextWindowId);
+    map.beginWindow(nextWindowId);
+
+    Assert.assertEquals(11, set2.size());
+
+    map.endWindow();
+    store.endWindow();
+
+    return nextWindowId;
+  }
+
+  @Test
+  public void recoveryTestWithManagedState()
+  {
+    SpillableStateStore store = testMeta.store;
+
+    SpillableSetMultimapImpl<String, String> map =
+        new SpillableSetMultimapImpl<>(store, ID1, 0L, new SerdeStringSlice(), new SerdeStringSlice());
+
+    store.setup(testMeta.operatorContext);
+    map.setup(testMeta.operatorContext);
+
+    long nextWindowId = 0L;
+    nextWindowId = simpleMultiKeyTestHelper(store, map, "a", nextWindowId);
+    long activationWindow = nextWindowId;
+    store.beforeCheckpoint(nextWindowId);
+    SpillableSetMultimapImpl<String, String> clonedMap = KryoCloneUtils.cloneObject(map);
+    store.checkpointed(nextWindowId);
+    store.committed(nextWindowId);
+
+    nextWindowId++;
+
+    store.beginWindow(nextWindowId);
+    map.beginWindow(nextWindowId);
+
+    Set<String> set1 = map.get("a");
+
+    Assert.assertEquals(11, set1.size());
+
+    Set<String> referenceSet = Sets.newHashSet("a", "b", "c", "d", "e", "f", "g", "tt", "ab", "99", "oo");
+    Assert.assertTrue(referenceSet.containsAll(set1));
+    Assert.assertTrue(set1.containsAll(referenceSet));
+
+    set1.add("111");
+
+    Assert.assertTrue(set1.contains("111"));
+
+    Assert.assertEquals(12, set1.size());
+
+    map.endWindow();
+    store.endWindow();
+
+    map.teardown();
+    store.teardown();
+
+    // Restore from the checkpointed clone; the change made after the checkpoint ("111") must not be visible.
+    map = clonedMap;
+    store = map.getStore();
+
+    Attribute.AttributeMap.DefaultAttributeMap attributes = new Attribute.AttributeMap.DefaultAttributeMap();
+    attributes.put(DAG.APPLICATION_PATH, testMeta.applicationPath);
+    attributes.put(Context.OperatorContext.ACTIVATION_WINDOW_ID, activationWindow);
+    Context.OperatorContext context =
+        new OperatorContextTestHelper.TestIdOperatorContext(testMeta.operatorContext.getId(), attributes);
+
+    store.setup(context);
+    map.setup(context);
+
+    nextWindowId = activationWindow + 1;
+
+    store.beginWindow(nextWindowId);
+    map.beginWindow(nextWindowId);
+
+    Assert.assertEquals(1, map.size());
+    Assert.assertTrue(map.containsKey("a"));
+    Assert.assertEquals(11, map.get("a").size());
+
+    map.endWindow();
+    store.endWindow();
+
+    map.teardown();
+    store.teardown();
+  }
+
+  @Test
+  public void testLoad()
+  {
+    Random random = new Random();
+    final int keySize = 1000000;
+    final int valueSize = 100000000;
+    final int numOfEntry = 100000;
+
+    SpillableStateStore store = testMeta.store;
+
+    // NOTE(review): this load test exercises SpillableByteArrayListMultimapImpl even though it lives in the
+    // SpillableSetMultimapImpl test class -- confirm which implementation is intended.
+    SpillableByteArrayListMultimapImpl<String, String> multimap = new SpillableByteArrayListMultimapImpl<>(
+        store, ID1, 0L, new SerdeStringSlice(), new SerdeStringSlice());
+
+    Attribute.AttributeMap.DefaultAttributeMap attributes = new Attribute.AttributeMap.DefaultAttributeMap();
+    attributes.put(DAG.APPLICATION_PATH, testMeta.applicationPath);
+    Context.OperatorContext context =
+        new OperatorContextTestHelper.TestIdOperatorContext(testMeta.operatorContext.getId(), attributes);
+    store.setup(context);
+    multimap.setup(context);
+
+    store.beginWindow(1);
+    multimap.beginWindow(1);
+    for (int i = 0; i < numOfEntry; ++i) {
+      multimap.put(String.valueOf(random.nextInt(keySize)), String.valueOf(random.nextInt(valueSize)));
+    }
+    multimap.endWindow();
+    store.endWindow();
+
+    // Release the components like the other tests in this class do.
+    multimap.teardown();
+    store.teardown();
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7ac4a0ed/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/SerdeKryoSliceTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/SerdeKryoSliceTest.java b/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/SerdeKryoSliceTest.java
new file mode 100644
index 0000000..b780f66
--- /dev/null
+++ b/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/SerdeKryoSliceTest.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.utils.serde;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * SerdeKryoSlice unit tests
+ */
+public class SerdeKryoSliceTest
+{
+  public static class TestPojo
+  {
+    // No-arg constructor, presumably for Kryo to instantiate the class during deserialization.
+    private TestPojo()
+    {
+    }
+
+    public TestPojo(int intValue, String stringValue)
+    {
+      this.intValue = intValue;
+      this.stringValue = stringValue;
+    }
+
+    /**
+     * Two pojos are equal when both fields are equal; safe for {@code null} and foreign argument types.
+     */
+    @Override
+    public boolean equals(Object other)
+    {
+      if (this == other) {
+        return true;
+      }
+      if (!(other instanceof TestPojo)) {
+        return false;
+      }
+      TestPojo o = (TestPojo)other;
+      return intValue == o.intValue
+          && (stringValue == null ? o.stringValue == null : stringValue.equals(o.stringValue));
+    }
+
+    /** Consistent with {@link #equals(Object)}, as required by the {@link Object#hashCode()} contract. */
+    @Override
+    public int hashCode()
+    {
+      return 31 * intValue + (stringValue == null ? 0 : stringValue.hashCode());
+    }
+
+    int intValue;
+    String stringValue;
+  }
+
+  @Test
+  public void stringListTest()
+  {
+    SerdeKryoSlice<ArrayList> serdeList = new SerdeKryoSlice<>(ArrayList.class);
+
+    ArrayList<String> stringList = Lists.newArrayList("a", "b", "c");
+    Slice slice = serdeList.serialize(stringList);
+    List<String> deserializedList = serdeList.deserialize(slice);
+    Assert.assertEquals(stringList, deserializedList);
+  }
+
+  @Test
+  public void pojoTest()
+  {
+    SerdeKryoSlice<TestPojo> serdePojo = new SerdeKryoSlice<>();
+    TestPojo pojo = new TestPojo(345, "xyz");
+    Slice slice = serdePojo.serialize(pojo);
+    TestPojo deserializedPojo = serdePojo.deserialize(slice);
+    Assert.assertEquals(pojo, deserializedPojo);
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/7ac4a0ed/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/SerdePairSliceTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/SerdePairSliceTest.java b/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/SerdePairSliceTest.java
new file mode 100644
index 0000000..6684a9f
--- /dev/null
+++ b/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/SerdePairSliceTest.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.utils.serde;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+
+import com.datatorrent.netlet.util.Slice;
+
+public class SerdePairSliceTest
+{
+  /** A pair serialized and then deserialized with the same serde must equal the original pair. */
+  @Test
+  public void simpleSerdeTest()
+  {
+    Pair<String, Integer> expected = new ImmutablePair<>("abc", 123);
+    SerdePairSlice<String, Integer> serde = new SerdePairSlice<>(new SerdeStringSlice(), new SerdeIntSlice());
+
+    Slice encoded = serde.serialize(expected);
+    Pair<String, Integer> roundTripped = serde.deserialize(encoded);
+
+    Assert.assertEquals(expected, roundTripped);
+  }
+}