You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by hs...@apache.org on 2016/09/27 23:08:27 UTC

[2/2] apex-malhar git commit: APEXMALHAR-2267 #resolve renamed spillable data structures to remove the word "Byte"

APEXMALHAR-2267 #resolve renamed spillable data structures to remove the word "Byte"


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/f5f1943d
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/f5f1943d
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/f5f1943d

Branch: refs/heads/master
Commit: f5f1943d2bcc2b61240fab649828c1aaf520d22b
Parents: c19c80d
Author: David Yan <da...@datatorrent.com>
Authored: Fri Sep 23 12:46:20 2016 -0700
Committer: David Yan <da...@datatorrent.com>
Committed: Tue Sep 27 14:26:46 2016 -0700

----------------------------------------------------------------------
 .../lib/join/AbstractInnerJoinOperator.java     |  12 +-
 .../AbstractManagedStateInnerJoinOperator.java  |   4 +-
 .../managed/ManagedTimeStateMultiValue.java     |   4 +-
 .../malhar/lib/state/spillable/Spillable.java   |  21 +-
 .../state/spillable/SpillableArrayListImpl.java |   6 +-
 .../SpillableArrayListMultimapImpl.java         | 310 ++++++++++++
 .../SpillableByteArrayListMultimapImpl.java     | 310 ------------
 .../state/spillable/SpillableByteMapImpl.java   | 237 ---------
 .../spillable/SpillableComplexComponent.java    |  86 ++--
 .../SpillableComplexComponentImpl.java          |  24 +-
 .../lib/state/spillable/SpillableMapImpl.java   | 237 +++++++++
 .../lib/state/spillable/SpillableSetImpl.java   |   4 +-
 .../spillable/SpillableSetMultimapImpl.java     |   4 +-
 .../impl/SpillableWindowedKeyedStorage.java     |   4 +-
 .../impl/SpillableWindowedPlainStorage.java     |   4 +-
 .../SpillableArrayListMultimapImplTest.java     | 370 ++++++++++++++
 .../SpillableByteArrayListMultimapImplTest.java | 371 --------------
 .../spillable/SpillableByteMapImplTest.java     | 484 -------------------
 .../SpillableComplexComponentImplTest.java      |   2 +-
 .../state/spillable/SpillableMapImplTest.java   | 484 +++++++++++++++++++
 .../spillable/SpillableSetMultimapImplTest.java |   3 +-
 .../malhar/lib/window/WindowedOperatorTest.java |   4 +-
 22 files changed, 1495 insertions(+), 1490 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/f5f1943d/library/src/main/java/org/apache/apex/malhar/lib/join/AbstractInnerJoinOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/join/AbstractInnerJoinOperator.java b/library/src/main/java/org/apache/apex/malhar/lib/join/AbstractInnerJoinOperator.java
index 816ca58..c1ebdd5 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/join/AbstractInnerJoinOperator.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/join/AbstractInnerJoinOperator.java
@@ -88,8 +88,8 @@ public abstract class AbstractInnerJoinOperator<K,T> extends BaseOperator
   private boolean isLeftKeyPrimary = false;
   private boolean isRightKeyPrimary = false;
   protected SpillableComplexComponent component;
-  protected Spillable.SpillableByteArrayListMultimap<K,T> stream1Data;
-  protected Spillable.SpillableByteArrayListMultimap<K,T> stream2Data;
+  protected Spillable.SpillableListMultimap<K,T> stream1Data;
+  protected Spillable.SpillableListMultimap<K,T> stream2Data;
 
   /**
    * Process the tuple which are received from input ports with the following steps:
@@ -103,12 +103,12 @@ public abstract class AbstractInnerJoinOperator<K,T> extends BaseOperator
    */
   protected void processTuple(T tuple, boolean isStream1Data)
   {
-    Spillable.SpillableByteArrayListMultimap<K,T> store = isStream1Data ? stream1Data : stream2Data;
+    Spillable.SpillableListMultimap<K,T> store = isStream1Data ? stream1Data : stream2Data;
     K key = extractKey(tuple,isStream1Data);
     if (!store.put(key, tuple)) {
       return;
     }
-    Spillable.SpillableByteArrayListMultimap<K, T> valuestore = isStream1Data ? stream2Data : stream1Data;
+    Spillable.SpillableListMultimap<K, T> valuestore = isStream1Data ? stream2Data : stream1Data;
     joinStream(tuple,isStream1Data, valuestore.get(key));
   }
 
@@ -210,8 +210,8 @@ public abstract class AbstractInnerJoinOperator<K,T> extends BaseOperator
    */
   public void createStores()
   {
-    stream1Data = component.newSpillableByteArrayListMultimap(0,null,null);
-    stream2Data = component.newSpillableByteArrayListMultimap(0,null,null);
+    stream1Data = component.newSpillableArrayListMultimap(0,null,null);
+    stream2Data = component.newSpillableArrayListMultimap(0,null,null);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/f5f1943d/library/src/main/java/org/apache/apex/malhar/lib/join/AbstractManagedStateInnerJoinOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/join/AbstractManagedStateInnerJoinOperator.java b/library/src/main/java/org/apache/apex/malhar/lib/join/AbstractManagedStateInnerJoinOperator.java
index 8b19ebc..c82c3e3 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/join/AbstractManagedStateInnerJoinOperator.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/join/AbstractManagedStateInnerJoinOperator.java
@@ -93,13 +93,13 @@ public abstract class AbstractManagedStateInnerJoinOperator<K,T> extends Abstrac
   @Override
   protected void processTuple(T tuple, boolean isStream1Data)
   {
-    Spillable.SpillableByteArrayListMultimap<K,T> store = isStream1Data ? stream1Data : stream2Data;
+    Spillable.SpillableListMultimap<K,T> store = isStream1Data ? stream1Data : stream2Data;
     K key = extractKey(tuple,isStream1Data);
     long timeBucket = extractTime(tuple,isStream1Data);
     if (!((ManagedTimeStateMultiValue)store).put(key, tuple,timeBucket)) {
       return;
     }
-    Spillable.SpillableByteArrayListMultimap<K, T> valuestore = isStream1Data ? stream2Data : stream1Data;
+    Spillable.SpillableListMultimap<K, T> valuestore = isStream1Data ? stream2Data : stream1Data;
     Future<List> future = ((ManagedTimeStateMultiValue)valuestore).getAsync(key);
     if (future.isDone()) {
       try {

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/f5f1943d/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeStateMultiValue.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeStateMultiValue.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeStateMultiValue.java
index 3ca43a4..beeeb4e 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeStateMultiValue.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeStateMultiValue.java
@@ -42,7 +42,7 @@ import com.datatorrent.lib.codec.KryoSerializableStreamCodec;
 import com.datatorrent.netlet.util.Slice;
 
 /**
- * Concrete implementation of SpillableByteArrayListMultimap which is needed for join operator.
+ * Concrete implementation of SpillableListMultimap which is needed for join operator.
  *
  * <b>Properties:</b><br>
  * <b>isKeyContainsMultiValue</b>: Specifies whether the key has multiple value or not. <br>
@@ -52,7 +52,7 @@ import com.datatorrent.netlet.util.Slice;
  * @since 3.5.0
  */
 @org.apache.hadoop.classification.InterfaceStability.Evolving
-public class ManagedTimeStateMultiValue<K,V> implements Spillable.SpillableByteArrayListMultimap<K,V>
+public class ManagedTimeStateMultiValue<K,V> implements Spillable.SpillableListMultimap<K,V>
 {
   private transient StreamCodec streamCodec = null;
   private boolean isKeyContainsMultiValue = false;

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/f5f1943d/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/Spillable.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/Spillable.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/Spillable.java
index 849389b..6b765a8 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/Spillable.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/Spillable.java
@@ -38,13 +38,13 @@ import com.datatorrent.api.Context.OperatorContext;
 public interface Spillable
 {
   /**
-   * This represents a spillable {@link java.util.List}. The underlying implementation
-   * of this list is similar to that of an {@link java.util.ArrayList}. Users that receive an
+   * This represents a spillable {@link java.util.List}. Users that receive an
    * implementation of this interface don't need to worry about propagating operator call-backs
    * to the data structure.
-   * @param <T> The type of the data stored in the {@link SpillableArrayList}.
+   *
+   * @param <T> The type of the data stored in the {@link SpillableList}.
    */
-  interface SpillableArrayList<T> extends List<T>
+  interface SpillableList<T> extends List<T>
   {
   }
 
@@ -52,6 +52,7 @@ public interface Spillable
    * This represents a spillable {@link java.util.Set}. Users that receive an
    * implementation of this interface don't need to worry about propagating operator call-backs
    * to the data structure.
+   *
    * @param <T> The type of the data stored in the {@link SpillableSet}.
    */
   interface SpillableSet<T> extends Set<T>
@@ -64,10 +65,11 @@ public interface Spillable
    * that K1.equals(K2) should be consistent with K1.toByteArray().equals(K2.toByteArray()). Users that receive an
    * implementation of this interface don't need to worry about propagating operator call-backs
    * to the data structure.
+   *
    * @param <K> The type of the keys.
    * @param <V> The type of the values.
    */
-  interface SpillableByteMap<K, V> extends Map<K, V>
+  interface SpillableMap<K, V> extends Map<K, V>
   {
   }
 
@@ -77,10 +79,11 @@ public interface Spillable
    * that K1.equals(K2) should be consistent with K1.toByteArray().equals(K2.toByteArray()). Users that receive an
    * implementation of this interface don't need to worry about propagating operator call-backs
    * to the data structure.
+   *
    * @param <K> The type of the keys.
    * @param <V> The type of the values.
    */
-  interface SpillableByteArrayListMultimap<K, V> extends ListMultimap<K, V>
+  interface SpillableListMultimap<K, V> extends ListMultimap<K, V>
   {
   }
 
@@ -90,6 +93,7 @@ public interface Spillable
    * that K1.equals(K2) should be consistent with K1.toByteArray().equals(K2.toByteArray()). Users that receive an
    * implementation of this interface don't need to worry about propagating operator call-backs
    * to the data structure.
+   *
    * @param <K> The type of the keys.
    * @param <V> The type of the values.
    */
@@ -102,8 +106,10 @@ public interface Spillable
    * some assumptions about serialization and equality. Consider two elements T1 and T2. The assumption is
    * that T1.equals(T2) should be consistent with T1.toByteArray().equals(T2.toByteArray()). Users that receive an
    * implementation of this interface don't need to worry about propagating operator call-backs to the data structure.
+   *
+   * @param <T> The type of the data stored in the multiset.
    */
-  interface SpillableByteMultiset<T> extends Multiset<T>
+  interface SpillableMultiset<T> extends Multiset<T>
   {
   }
 
@@ -111,6 +117,7 @@ public interface Spillable
    * This represents a spillable {@link java.util.Queue} implementation. Users that receive an
    * implementation of this interface don't need to worry about propagating operator call-backs
    * to the data structure.
+   *
    * @param <T> The type of the data stored in the queue.
    */
   interface SpillableQueue<T> extends Queue<T>

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/f5f1943d/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListImpl.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListImpl.java
index 4ea1923..a59872c 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListImpl.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListImpl.java
@@ -47,7 +47,7 @@ import com.datatorrent.netlet.util.Slice;
  */
 @DefaultSerializer(FieldSerializer.class)
 @InterfaceStability.Evolving
-public class SpillableArrayListImpl<T> implements Spillable.SpillableArrayList<T>, Spillable.SpillableComponent
+public class SpillableArrayListImpl<T> implements Spillable.SpillableList<T>, Spillable.SpillableComponent
 {
   public static final int DEFAULT_BATCH_SIZE = 1000;
 
@@ -60,7 +60,7 @@ public class SpillableArrayListImpl<T> implements Spillable.SpillableArrayList<T
   @NotNull
   private Serde<T, Slice> serde;
   @NotNull
-  private SpillableByteMapImpl<Integer, List<T>> map;
+  private SpillableMapImpl<Integer, List<T>> map;
 
   private boolean sizeCached = false;
   private int size;
@@ -93,7 +93,7 @@ public class SpillableArrayListImpl<T> implements Spillable.SpillableArrayList<T
     this.store = Preconditions.checkNotNull(store);
     this.serde = Preconditions.checkNotNull(serde);
 
-    map = new SpillableByteMapImpl<>(store, prefix, bucketId, new SerdeIntSlice(),
+    map = new SpillableMapImpl<>(store, prefix, bucketId, new SerdeIntSlice(),
         new SerdeCollectionSlice<>(serde, (Class<List<T>>)(Class)ArrayList.class));
   }
 

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/f5f1943d/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListMultimapImpl.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListMultimapImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListMultimapImpl.java
new file mode 100644
index 0000000..0944583
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListMultimapImpl.java
@@ -0,0 +1,310 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.state.spillable;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import javax.annotation.Nullable;
+import javax.validation.constraints.NotNull;
+
+import org.apache.apex.malhar.lib.utils.serde.PassThruSliceSerde;
+import org.apache.apex.malhar.lib.utils.serde.Serde;
+import org.apache.apex.malhar.lib.utils.serde.SerdeIntSlice;
+import org.apache.apex.malhar.lib.utils.serde.SliceUtils;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.esotericsoftware.kryo.DefaultSerializer;
+import com.esotericsoftware.kryo.serializers.FieldSerializer;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Multiset;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * This is an implementation of Guava's ListMultimap which spills data to a {@link SpillableStateStore}.
+ *
+ * @since 3.5.0
+ */
+@DefaultSerializer(FieldSerializer.class)
+@InterfaceStability.Evolving
+public class SpillableArrayListMultimapImpl<K, V> implements Spillable.SpillableListMultimap<K, V>,
+    Spillable.SpillableComponent
+{
+  public static final int DEFAULT_BATCH_SIZE = 1000;
+  public static final byte[] SIZE_KEY_SUFFIX = new byte[]{(byte)0, (byte)0, (byte)0};
+
+  private transient WindowBoundedMapCache<K, SpillableArrayListImpl<V>> cache = new WindowBoundedMapCache<>();
+  private transient boolean isRunning = false;
+  private transient boolean isInWindow = false;
+
+  private int batchSize = DEFAULT_BATCH_SIZE;
+  @NotNull
+  private SpillableMapImpl<Slice, Integer> map;
+  private SpillableStateStore store;
+  private byte[] identifier;
+  private long bucket;
+  private Serde<K, Slice> serdeKey;
+  private Serde<V, Slice> serdeValue;
+
+  private SpillableArrayListMultimapImpl()
+  {
+    // for kryo
+  }
+
+  /**
+   * Creates a {@link SpillableArrayListMultimapImpl}.
+   * @param store The {@link SpillableStateStore} to spill to.
+   * @param identifier The Id of this {@link SpillableArrayListMultimapImpl}.
+   * @param bucket The Id of the bucket used to store this
+   * {@link SpillableArrayListMultimapImpl} in the provided {@link SpillableStateStore}.
+   * @param serdeKey The {@link Serde} to use when serializing and deserializing keys.
+   * @param serdeValue The {@link Serde} to use when serializing and deserializing values.
+   */
+  public SpillableArrayListMultimapImpl(SpillableStateStore store, byte[] identifier, long bucket,
+      Serde<K, Slice> serdeKey,
+      Serde<V, Slice> serdeValue)
+  {
+    this.store = Preconditions.checkNotNull(store);
+    this.identifier = Preconditions.checkNotNull(identifier);
+    this.bucket = bucket;
+    this.serdeKey = Preconditions.checkNotNull(serdeKey);
+    this.serdeValue = Preconditions.checkNotNull(serdeValue);
+
+    map = new SpillableMapImpl(store, identifier, bucket, new PassThruSliceSerde(), new SerdeIntSlice());
+  }
+
+  public SpillableStateStore getStore()
+  {
+    return store;
+  }
+
+  @Override
+  public List<V> get(@Nullable K key)
+  {
+    return getHelper(key);
+  }
+
+  private SpillableArrayListImpl<V> getHelper(@Nullable K key)
+  {
+    SpillableArrayListImpl<V> spillableArrayList = cache.get(key);
+
+    if (spillableArrayList == null) {
+      Slice keySlice = serdeKey.serialize(key);
+      Integer size = map.get(SliceUtils.concatenate(keySlice, SIZE_KEY_SUFFIX));
+
+      if (size == null) {
+        return null;
+      }
+
+      Slice keyPrefix = SliceUtils.concatenate(identifier, keySlice);
+      spillableArrayList = new SpillableArrayListImpl<V>(bucket, keyPrefix.toByteArray(), store, serdeValue);
+      spillableArrayList.setSize(size);
+    }
+
+    cache.put(key, spillableArrayList);
+
+    return spillableArrayList;
+  }
+
+  @Override
+  public Set<K> keySet()
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Multiset<K> keys()
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Collection<V> values()
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Collection<Map.Entry<K, V>> entries()
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public List<V> removeAll(@Nullable Object key)
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void clear()
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public int size()
+  {
+    // TODO: This is actually wrong since in a Multimap, size() should return the number of entries, not the number of distinct keys
+    return map.size();
+  }
+
+  @Override
+  public boolean isEmpty()
+  {
+    return map.isEmpty();
+  }
+
+  @Override
+  public boolean containsKey(@Nullable Object key)
+  {
+    return cache.contains((K)key) || map.containsKey(SliceUtils.concatenate(serdeKey.serialize((K)key),
+        SIZE_KEY_SUFFIX));
+  }
+
+  @Override
+  public boolean containsValue(@Nullable Object value)
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public boolean containsEntry(@Nullable Object key, @Nullable Object value)
+  {
+    SpillableArrayListImpl<V> spillableArrayList = getHelper((K)key);
+    if (spillableArrayList == null) {
+      return false;
+    }
+    for (int i = 0; i < spillableArrayList.size(); i++) {
+      V v = spillableArrayList.get(i);
+      if (v == null) {
+        if (value == null) {
+          return true;
+        }
+      } else {
+        if (v.equals(value)) {
+          return true;
+        }
+      }
+    }
+    return false;
+  }
+
+  @Override
+  public boolean put(@Nullable K key, @Nullable V value)
+  {
+    SpillableArrayListImpl<V> spillableArrayList = getHelper(key);
+
+    if (spillableArrayList == null) {
+      Slice keyPrefix = SliceUtils.concatenate(identifier, serdeKey.serialize(key));
+      spillableArrayList = new SpillableArrayListImpl<V>(bucket, keyPrefix.toByteArray(), store, serdeValue);
+
+      cache.put(key, spillableArrayList);
+    }
+
+    spillableArrayList.add(value);
+    return true;
+  }
+
+  @Override
+  public boolean remove(@Nullable Object key, @Nullable Object value)
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public boolean putAll(@Nullable K key, Iterable<? extends V> values)
+  {
+    boolean changed = false;
+
+    for (V value: values) {
+      changed |= put(key, value);
+    }
+
+    return changed;
+  }
+
+  @Override
+  public boolean putAll(Multimap<? extends K, ? extends V> multimap)
+  {
+    boolean changed = false;
+
+    for (Map.Entry<? extends K, ? extends V> entry: multimap.entries()) {
+      changed |= put(entry.getKey(), entry.getValue());
+    }
+
+    return changed;
+  }
+
+  @Override
+  public List<V> replaceValues(K key, Iterable<? extends V> values)
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Map<K, Collection<V>> asMap()
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void setup(Context.OperatorContext context)
+  {
+    map.setup(context);
+    isRunning = true;
+  }
+
+  @Override
+  public void beginWindow(long windowId)
+  {
+    map.beginWindow(windowId);
+    isInWindow = true;
+  }
+
+  @Override
+  public void endWindow()
+  {
+    isInWindow = false;
+    for (K key: cache.getChangedKeys()) {
+
+      SpillableArrayListImpl<V> spillableArrayList = cache.get(key);
+      spillableArrayList.endWindow();
+
+      Integer size = map.put(SliceUtils.concatenate(serdeKey.serialize(key), SIZE_KEY_SUFFIX),
+          spillableArrayList.size());
+    }
+
+    Preconditions.checkState(cache.getRemovedKeys().isEmpty());
+    cache.endWindow();
+    map.endWindow();
+  }
+
+  @Override
+  public void teardown()
+  {
+    isRunning = false;
+    map.teardown();
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/f5f1943d/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteArrayListMultimapImpl.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteArrayListMultimapImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteArrayListMultimapImpl.java
deleted file mode 100644
index c0466bd..0000000
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteArrayListMultimapImpl.java
+++ /dev/null
@@ -1,310 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.lib.state.spillable;
-
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import javax.annotation.Nullable;
-import javax.validation.constraints.NotNull;
-
-import org.apache.apex.malhar.lib.utils.serde.PassThruSliceSerde;
-import org.apache.apex.malhar.lib.utils.serde.Serde;
-import org.apache.apex.malhar.lib.utils.serde.SerdeIntSlice;
-import org.apache.apex.malhar.lib.utils.serde.SliceUtils;
-import org.apache.hadoop.classification.InterfaceStability;
-
-import com.esotericsoftware.kryo.DefaultSerializer;
-import com.esotericsoftware.kryo.serializers.FieldSerializer;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Multimap;
-import com.google.common.collect.Multiset;
-
-import com.datatorrent.api.Context;
-import com.datatorrent.netlet.util.Slice;
-
-/**
- * This is an implementation of Guava's ListMultimap which spills data to a {@link SpillableStateStore}.
- *
- * @since 3.5.0
- */
-@DefaultSerializer(FieldSerializer.class)
-@InterfaceStability.Evolving
-public class SpillableByteArrayListMultimapImpl<K, V> implements Spillable.SpillableByteArrayListMultimap<K, V>,
-    Spillable.SpillableComponent
-{
-  public static final int DEFAULT_BATCH_SIZE = 1000;
-  public static final byte[] SIZE_KEY_SUFFIX = new byte[]{(byte)0, (byte)0, (byte)0};
-
-  private transient WindowBoundedMapCache<K, SpillableArrayListImpl<V>> cache = new WindowBoundedMapCache<>();
-  private transient boolean isRunning = false;
-  private transient boolean isInWindow = false;
-
-  private int batchSize = DEFAULT_BATCH_SIZE;
-  @NotNull
-  private SpillableByteMapImpl<Slice, Integer> map;
-  private SpillableStateStore store;
-  private byte[] identifier;
-  private long bucket;
-  private Serde<K, Slice> serdeKey;
-  private Serde<V, Slice> serdeValue;
-
-  private SpillableByteArrayListMultimapImpl()
-  {
-    // for kryo
-  }
-
-  /**
-   * Creates a {@link SpillableByteArrayListMultimapImpl}.
-   * @param store The {@link SpillableStateStore} in which to spill to.
-   * @param identifier The Id of this {@link SpillableByteArrayListMultimapImpl}.
-   * @param bucket The Id of the bucket used to store this
-   * {@link SpillableByteArrayListMultimapImpl} in the provided {@link SpillableStateStore}.
-   * @param serdeKey The {@link Serde} to use when serializing and deserializing keys.
-   * @param serdeKey The {@link Serde} to use when serializing and deserializing values.
-   */
-  public SpillableByteArrayListMultimapImpl(SpillableStateStore store, byte[] identifier, long bucket,
-      Serde<K, Slice> serdeKey,
-      Serde<V, Slice> serdeValue)
-  {
-    this.store = Preconditions.checkNotNull(store);
-    this.identifier = Preconditions.checkNotNull(identifier);
-    this.bucket = bucket;
-    this.serdeKey = Preconditions.checkNotNull(serdeKey);
-    this.serdeValue = Preconditions.checkNotNull(serdeValue);
-
-    map = new SpillableByteMapImpl(store, identifier, bucket, new PassThruSliceSerde(), new SerdeIntSlice());
-  }
-
-  public SpillableStateStore getStore()
-  {
-    return store;
-  }
-
-  @Override
-  public List<V> get(@Nullable K key)
-  {
-    return getHelper(key);
-  }
-
-  private SpillableArrayListImpl<V> getHelper(@Nullable K key)
-  {
-    SpillableArrayListImpl<V> spillableArrayList = cache.get(key);
-
-    if (spillableArrayList == null) {
-      Slice keySlice = serdeKey.serialize(key);
-      Integer size = map.get(SliceUtils.concatenate(keySlice, SIZE_KEY_SUFFIX));
-
-      if (size == null) {
-        return null;
-      }
-
-      Slice keyPrefix = SliceUtils.concatenate(identifier, keySlice);
-      spillableArrayList = new SpillableArrayListImpl<V>(bucket, keyPrefix.toByteArray(), store, serdeValue);
-      spillableArrayList.setSize(size);
-    }
-
-    cache.put(key, spillableArrayList);
-
-    return spillableArrayList;
-  }
-
-  @Override
-  public Set<K> keySet()
-  {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public Multiset<K> keys()
-  {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public Collection<V> values()
-  {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public Collection<Map.Entry<K, V>> entries()
-  {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public List<V> removeAll(@Nullable Object key)
-  {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public void clear()
-  {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public int size()
-  {
-    // TODO: This is actually wrong since in a Multimap, size() should return the number of entries, not the number of distinct keys
-    return map.size();
-  }
-
-  @Override
-  public boolean isEmpty()
-  {
-    return map.isEmpty();
-  }
-
-  @Override
-  public boolean containsKey(@Nullable Object key)
-  {
-    return cache.contains((K)key) || map.containsKey(SliceUtils.concatenate(serdeKey.serialize((K)key),
-        SIZE_KEY_SUFFIX));
-  }
-
-  @Override
-  public boolean containsValue(@Nullable Object value)
-  {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public boolean containsEntry(@Nullable Object key, @Nullable Object value)
-  {
-    SpillableArrayListImpl<V> spillableArrayList = getHelper((K)key);
-    if (spillableArrayList == null) {
-      return false;
-    }
-    for (int i = 0; i < spillableArrayList.size(); i++) {
-      V v = spillableArrayList.get(i);
-      if (v == null) {
-        if (value == null) {
-          return true;
-        }
-      } else {
-        if (v.equals(value)) {
-          return true;
-        }
-      }
-    }
-    return false;
-  }
-
-  @Override
-  public boolean put(@Nullable K key, @Nullable V value)
-  {
-    SpillableArrayListImpl<V> spillableArrayList = getHelper(key);
-
-    if (spillableArrayList == null) {
-      Slice keyPrefix = SliceUtils.concatenate(identifier, serdeKey.serialize(key));
-      spillableArrayList = new SpillableArrayListImpl<V>(bucket, keyPrefix.toByteArray(), store, serdeValue);
-
-      cache.put(key, spillableArrayList);
-    }
-
-    spillableArrayList.add(value);
-    return true;
-  }
-
-  @Override
-  public boolean remove(@Nullable Object key, @Nullable Object value)
-  {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public boolean putAll(@Nullable K key, Iterable<? extends V> values)
-  {
-    boolean changed = false;
-
-    for (V value: values) {
-      changed |= put(key, value);
-    }
-
-    return changed;
-  }
-
-  @Override
-  public boolean putAll(Multimap<? extends K, ? extends V> multimap)
-  {
-    boolean changed = false;
-
-    for (Map.Entry<? extends K, ? extends V> entry: multimap.entries()) {
-      changed |= put(entry.getKey(), entry.getValue());
-    }
-
-    return changed;
-  }
-
-  @Override
-  public List<V> replaceValues(K key, Iterable<? extends V> values)
-  {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public Map<K, Collection<V>> asMap()
-  {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public void setup(Context.OperatorContext context)
-  {
-    map.setup(context);
-    isRunning = true;
-  }
-
-  @Override
-  public void beginWindow(long windowId)
-  {
-    map.beginWindow(windowId);
-    isInWindow = true;
-  }
-
-  @Override
-  public void endWindow()
-  {
-    isInWindow = false;
-    for (K key: cache.getChangedKeys()) {
-
-      SpillableArrayListImpl<V> spillableArrayList = cache.get(key);
-      spillableArrayList.endWindow();
-
-      Integer size = map.put(SliceUtils.concatenate(serdeKey.serialize(key), SIZE_KEY_SUFFIX),
-          spillableArrayList.size());
-    }
-
-    Preconditions.checkState(cache.getRemovedKeys().isEmpty());
-    cache.endWindow();
-    map.endWindow();
-  }
-
-  @Override
-  public void teardown()
-  {
-    isRunning = false;
-    map.teardown();
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/f5f1943d/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteMapImpl.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteMapImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteMapImpl.java
deleted file mode 100644
index f36f2dc..0000000
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteMapImpl.java
+++ /dev/null
@@ -1,237 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.lib.state.spillable;
-
-import java.io.Serializable;
-import java.util.Collection;
-import java.util.Map;
-import java.util.Set;
-
-import javax.validation.constraints.NotNull;
-
-import org.apache.apex.malhar.lib.state.BucketedState;
-import org.apache.apex.malhar.lib.utils.serde.Serde;
-import org.apache.apex.malhar.lib.utils.serde.SliceUtils;
-import org.apache.commons.lang3.ArrayUtils;
-import org.apache.commons.lang3.mutable.MutableInt;
-import org.apache.hadoop.classification.InterfaceStability;
-
-import com.esotericsoftware.kryo.DefaultSerializer;
-import com.esotericsoftware.kryo.serializers.FieldSerializer;
-import com.google.common.base.Preconditions;
-
-import com.datatorrent.api.Context;
-import com.datatorrent.netlet.util.Slice;
-
-/**
- * A Spillable implementation of {@link Map}
- * @param <K> The types of keys.
- * @param <V> The types of values.
- *
- * @since 3.5.0
- */
-@DefaultSerializer(FieldSerializer.class)
-@InterfaceStability.Evolving
-public class SpillableByteMapImpl<K, V> implements Spillable.SpillableByteMap<K, V>, Spillable.SpillableComponent,
-    Serializable
-{
-  private transient WindowBoundedMapCache<K, V> cache = new WindowBoundedMapCache<>();
-  private transient MutableInt tempOffset = new MutableInt();
-
-  @NotNull
-  private SpillableStateStore store;
-  @NotNull
-  private byte[] identifier;
-  private long bucket;
-  @NotNull
-  private Serde<K, Slice> serdeKey;
-  @NotNull
-  private Serde<V, Slice> serdeValue;
-
-  private int size = 0;
-
-  private SpillableByteMapImpl()
-  {
-    //for kryo
-  }
-
-  /**
-   * Creats a {@link SpillableByteMapImpl}.
-   * @param store The {@link SpillableStateStore} in which to spill to.
-   * @param identifier The Id of this {@link SpillableByteMapImpl}.
-   * @param bucket The Id of the bucket used to store this
-   * {@link SpillableByteMapImpl} in the provided {@link SpillableStateStore}.
-   * @param serdeKey The {@link Serde} to use when serializing and deserializing keys.
-   * @param serdeKey The {@link Serde} to use when serializing and deserializing values.
-   */
-  public SpillableByteMapImpl(SpillableStateStore store, byte[] identifier, long bucket, Serde<K, Slice> serdeKey,
-      Serde<V, Slice> serdeValue)
-  {
-    this.store = Preconditions.checkNotNull(store);
-    this.identifier = Preconditions.checkNotNull(identifier);
-    this.bucket = bucket;
-    this.serdeKey = Preconditions.checkNotNull(serdeKey);
-    this.serdeValue = Preconditions.checkNotNull(serdeValue);
-  }
-
-  public SpillableStateStore getStore()
-  {
-    return this.store;
-  }
-
-  @Override
-  public int size()
-  {
-    return size;
-  }
-
-  @Override
-  public boolean isEmpty()
-  {
-    return size == 0;
-  }
-
-  @Override
-  public boolean containsKey(Object o)
-  {
-    return get(o) != null;
-  }
-
-  @Override
-  public boolean containsValue(Object o)
-  {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public V get(Object o)
-  {
-    K key = (K)o;
-
-    if (cache.getRemovedKeys().contains(key)) {
-      return null;
-    }
-
-    V val = cache.get(key);
-
-    if (val != null) {
-      return val;
-    }
-
-    Slice valSlice = store.getSync(bucket, SliceUtils.concatenate(identifier, serdeKey.serialize(key)));
-
-    if (valSlice == null || valSlice == BucketedState.EXPIRED || valSlice.length == 0) {
-      return null;
-    }
-
-    tempOffset.setValue(0);
-    return serdeValue.deserialize(valSlice, tempOffset);
-  }
-
-  @Override
-  public V put(K k, V v)
-  {
-    V value = get(k);
-
-    if (value == null) {
-      size++;
-    }
-
-    cache.put(k, v);
-
-    return value;
-  }
-
-  @Override
-  public V remove(Object o)
-  {
-    V value = get(o);
-
-    if (value != null) {
-      size--;
-    }
-
-    cache.remove((K)o);
-
-    return value;
-  }
-
-  @Override
-  public void putAll(Map<? extends K, ? extends V> map)
-  {
-    for (Map.Entry<? extends K, ? extends V> entry : map.entrySet()) {
-      put(entry.getKey(), entry.getValue());
-    }
-  }
-
-  @Override
-  public void clear()
-  {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public Set<K> keySet()
-  {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public Collection<V> values()
-  {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public Set<Entry<K, V>> entrySet()
-  {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public void setup(Context.OperatorContext context)
-  {
-  }
-
-  @Override
-  public void beginWindow(long windowId)
-  {
-  }
-
-  @Override
-  public void endWindow()
-  {
-    for (K key: cache.getChangedKeys()) {
-      store.put(this.bucket, SliceUtils.concatenate(identifier, serdeKey.serialize(key)),
-          serdeValue.serialize(cache.get(key)));
-    }
-
-    for (K key: cache.getRemovedKeys()) {
-      store.put(this.bucket, SliceUtils.concatenate(identifier, serdeKey.serialize(key)),
-          new Slice(ArrayUtils.EMPTY_BYTE_ARRAY));
-    }
-
-    cache.endWindow();
-  }
-
-  @Override
-  public void teardown()
-  {
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/f5f1943d/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponent.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponent.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponent.java
index e4836c4..c4462d5 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponent.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponent.java
@@ -36,75 +36,75 @@ public interface SpillableComplexComponent extends Component<OperatorContext>, S
     Operator.CheckpointNotificationListener
 {
   /**
-   * This is a method for creating a {@link SpillableArrayList}. This method
+   * This is a method for creating a {@link SpillableList}. This method
    * auto-generates an identifier for the data structure.
-   * @param <T> The type of data stored in the {@link SpillableArrayList}.
-   * @param bucket The bucket that this {@link SpillableArrayList} will be spilled to.
-   * @param serde The Serializer/Deserializer to use for data stored in the {@link SpillableArrayList}.
-   * @return A {@link SpillableArrayList}.
+   * @param <T> The type of data stored in the {@link SpillableList}.
+   * @param bucket The bucket that this {@link SpillableList} will be spilled to.
+   * @param serde The Serializer/Deserializer to use for data stored in the {@link SpillableList}.
+   * @return A {@link SpillableList}.
    */
-  <T> SpillableArrayList<T> newSpillableArrayList(long bucket, Serde<T, Slice> serde);
+  <T> SpillableList<T> newSpillableArrayList(long bucket, Serde<T, Slice> serde);
 
   /**
-   * This is a method for creating a {@link SpillableArrayList}.
-   * @param <T> The type of data stored in the {@link SpillableArrayList}.
-   * @param identifier The identifier for this {@link SpillableArrayList}.
-   * @param bucket The bucket that this {@link SpillableArrayList} will be spilled to.
-   * @param serde The Serializer/Deserializer to use for data stored in the {@link SpillableArrayList}.
-   * @return A {@link SpillableArrayList}.
+   * This is a method for creating a {@link SpillableList}.
+   * @param <T> The type of data stored in the {@link SpillableList}.
+   * @param identifier The identifier for this {@link SpillableList}.
+   * @param bucket The bucket that this {@link SpillableList} will be spilled to.
+   * @param serde The Serializer/Deserializer to use for data stored in the {@link SpillableList}.
+   * @return A {@link SpillableList}.
    */
-  <T> SpillableArrayList<T> newSpillableArrayList(byte[] identifier, long bucket, Serde<T, Slice> serde);
+  <T> SpillableList<T> newSpillableArrayList(byte[] identifier, long bucket, Serde<T, Slice> serde);
 
   /**
-   * This is a method for creating a {@link SpillableByteMap}. This method
+   * This is a method for creating a {@link SpillableMap}. This method
    * auto-generates an identifier for the data structure.
    * @param <K> The type of the keys.
    * @param <V> The type of the values.
-   * @param bucket The bucket that this {@link SpillableByteMap} will be spilled to.
+   * @param bucket The bucket that this {@link SpillableMap} will be spilled to.
    * @param serdeKey The Serializer/Deserializer to use for the map's keys.
    * @param serdeValue The Serializer/Deserializer to use for the map's values.
-   * @return A {@link SpillableByteMap}.
+   * @return A {@link SpillableMap}.
    */
-  <K, V> SpillableByteMap<K, V> newSpillableByteMap(long bucket, Serde<K, Slice> serdeKey,
+  <K, V> SpillableMap<K, V> newSpillableMap(long bucket, Serde<K, Slice> serdeKey,
       Serde<V, Slice> serdeValue);
 
   /**
-   * This is a method for creating a {@link SpillableByteMap}.
+   * This is a method for creating a {@link SpillableMap}.
    * @param <K> The type of the keys.
    * @param <V> The type of the values.
-   * @param identifier The identifier for this {@link SpillableByteMap}.
-   * @param bucket The bucket that this {@link SpillableByteMap} will be spilled to.
+   * @param identifier The identifier for this {@link SpillableMap}.
+   * @param bucket The bucket that this {@link SpillableMap} will be spilled to.
    * @param serdeKey The Serializer/Deserializer to use for the map's keys.
    * @param serdeValue The Serializer/Deserializer to use for the map's values.
-   * @return A {@link SpillableByteMap}.
+   * @return A {@link SpillableMap}.
    */
-  <K, V> SpillableByteMap<K, V> newSpillableByteMap(byte[] identifier, long bucket,
+  <K, V> SpillableMap<K, V> newSpillableMap(byte[] identifier, long bucket,
       Serde<K, Slice> serdeKey, Serde<V, Slice> serdeValue);
 
   /**
-   * This is a method for creating a {@link SpillableByteArrayListMultimap}. This method
+   * This is a method for creating a {@link SpillableListMultimap}. This method
    * auto-generates an identifier for the data structure.
    * @param <K> The type of the keys.
    * @param <V> The type of the values in the map's lists.
-   * @param bucket The bucket that this {@link SpillableByteArrayListMultimap} will be spilled to.
+   * @param bucket The bucket that this {@link SpillableListMultimap} will be spilled to.
    * @param serdeKey The Serializer/Deserializer to use for the map's keys.
    * @param serdeValue The Serializer/Deserializer to use for the values in the map's lists.
-   * @return A {@link SpillableByteArrayListMultimap}.
+   * @return A {@link SpillableListMultimap}.
    */
-  <K, V> SpillableByteArrayListMultimap<K, V> newSpillableByteArrayListMultimap(long bucket, Serde<K,
+  <K, V> SpillableListMultimap<K, V> newSpillableArrayListMultimap(long bucket, Serde<K,
       Slice> serdeKey, Serde<V, Slice> serdeValue);
 
   /**
-   * This is a method for creating a {@link SpillableByteArrayListMultimap}.
+   * This is a method for creating a {@link SpillableListMultimap}.
    * @param <K> The type of the keys.
    * @param <V> The type of the values in the map's lists.
-   * @param identifier The identifier for this {@link SpillableByteArrayListMultimap}.
-   * @param bucket The bucket that this {@link SpillableByteArrayListMultimap} will be spilled to.
+   * @param identifier The identifier for this {@link SpillableListMultimap}.
+   * @param bucket The bucket that this {@link SpillableListMultimap} will be spilled to.
    * @param serdeKey The Serializer/Deserializer to use for the map's keys.
    * @param serdeValue The Serializer/Deserializer to use for the values in the map's lists.
-   * @return A {@link SpillableByteArrayListMultimap}.
+   * @return A {@link SpillableListMultimap}.
    */
-  <K, V> SpillableByteArrayListMultimap<K, V> newSpillableByteArrayListMultimap(byte[] identifier, long bucket,
+  <K, V> SpillableListMultimap<K, V> newSpillableArrayListMultimap(byte[] identifier, long bucket,
       Serde<K, Slice> serdeKey,
       Serde<V, Slice> serdeValue);
 
@@ -121,24 +121,24 @@ public interface SpillableComplexComponent extends Component<OperatorContext>, S
       Slice> serdeKey, Serde<V, Slice> serdeValue);
 
   /**
-   * This is a method for creating a {@link SpillableByteMultiset}. This method
+   * This is a method for creating a {@link SpillableMultiset}. This method
    * auto-generates an identifier for the data structure.
    * @param <T> The type of the elements.
-   * @param bucket The bucket that this {@link SpillableByteMultiset} will be spilled to.
-   * @param serde The Serializer/Deserializer to use for data stored in the {@link SpillableByteMultiset}.
-   * @return A {@link SpillableByteMultiset}.
+   * @param bucket The bucket that this {@link SpillableMultiset} will be spilled to.
+   * @param serde The Serializer/Deserializer to use for data stored in the {@link SpillableMultiset}.
+   * @return A {@link SpillableMultiset}.
    */
-  <T> SpillableByteMultiset<T> newSpillableByteMultiset(long bucket, Serde<T, Slice> serde);
+  <T> SpillableMultiset<T> newSpillableMultiset(long bucket, Serde<T, Slice> serde);
 
   /**
-   * This is a method for creating a {@link SpillableByteMultiset}.
+   * This is a method for creating a {@link SpillableMultiset}.
    * @param <T> The type of the elements.
-   * @param identifier The identifier for this {@link SpillableByteMultiset}.
-   * @param bucket The bucket that this {@link SpillableByteMultiset} will be spilled to.
-   * @param serde The Serializer/Deserializer to use for data stored in the {@link SpillableByteMultiset}.
-   * @return A {@link SpillableByteMultiset}.
+   * @param identifier The identifier for this {@link SpillableMultiset}.
+   * @param bucket The bucket that this {@link SpillableMultiset} will be spilled to.
+   * @param serde The Serializer/Deserializer to use for data stored in the {@link SpillableMultiset}.
+   * @return A {@link SpillableMultiset}.
    */
-  <T> SpillableByteMultiset<T> newSpillableByteMultiset(byte[] identifier, long bucket, Serde<T, Slice> serde);
+  <T> SpillableMultiset<T> newSpillableMultiset(byte[] identifier, long bucket, Serde<T, Slice> serde);
 
   /**
    * This is a method for creating a {@link SpillableQueue}. This method
@@ -153,7 +153,7 @@ public interface SpillableComplexComponent extends Component<OperatorContext>, S
   /**
    * This is a method for creating a {@link SpillableQueue}.
    * @param <T> The type of the data stored in the {@link SpillableQueue}.
-   * @param identifier The identifier for this {@link SpillableByteArrayListMultimap}.
+   * @param identifier The identifier for this {@link SpillableQueue}.
    * @param bucket The bucket that this {@link SpillableQueue} will be spilled to.
    * @param serde The Serializer/Deserializer to use for data stored in the {@link SpillableQueue}.
    * @return A {@link SpillableQueue}.

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/f5f1943d/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponentImpl.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponentImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponentImpl.java
index 9c3defc..aad219d 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponentImpl.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponentImpl.java
@@ -66,14 +66,14 @@ public class SpillableComplexComponentImpl implements SpillableComplexComponent
     this.identifierGenerator = Preconditions.checkNotNull(identifierGenerator);
   }
 
-  public <T> SpillableArrayList<T> newSpillableArrayList(long bucket, Serde<T, Slice> serde)
+  public <T> SpillableList<T> newSpillableArrayList(long bucket, Serde<T, Slice> serde)
   {
     SpillableArrayListImpl<T> list = new SpillableArrayListImpl<T>(bucket, identifierGenerator.next(), store, serde);
     componentList.add(list);
     return list;
   }
 
-  public <T> SpillableArrayList<T> newSpillableArrayList(byte[] identifier, long bucket, Serde<T, Slice> serde)
+  public <T> SpillableList<T> newSpillableArrayList(byte[] identifier, long bucket, Serde<T, Slice> serde)
   {
     identifierGenerator.register(identifier);
     SpillableArrayListImpl<T> list = new SpillableArrayListImpl<T>(bucket, identifier, store, serde);
@@ -81,39 +81,39 @@ public class SpillableComplexComponentImpl implements SpillableComplexComponent
     return list;
   }
 
-  public <K, V> SpillableByteMap<K, V> newSpillableByteMap(long bucket, Serde<K, Slice> serdeKey,
+  public <K, V> SpillableMap<K, V> newSpillableMap(long bucket, Serde<K, Slice> serdeKey,
       Serde<V, Slice> serdeValue)
   {
-    SpillableByteMapImpl<K, V> map = new SpillableByteMapImpl<K, V>(store, identifierGenerator.next(),
+    SpillableMapImpl<K, V> map = new SpillableMapImpl<K, V>(store, identifierGenerator.next(),
         bucket, serdeKey, serdeValue);
     componentList.add(map);
     return map;
   }
 
-  public <K, V> SpillableByteMap<K, V> newSpillableByteMap(byte[] identifier, long bucket, Serde<K, Slice> serdeKey,
+  public <K, V> SpillableMap<K, V> newSpillableMap(byte[] identifier, long bucket, Serde<K, Slice> serdeKey,
       Serde<V, Slice> serdeValue)
   {
     identifierGenerator.register(identifier);
-    SpillableByteMapImpl<K, V> map = new SpillableByteMapImpl<K, V>(store, identifier, bucket, serdeKey, serdeValue);
+    SpillableMapImpl<K, V> map = new SpillableMapImpl<K, V>(store, identifier, bucket, serdeKey, serdeValue);
     componentList.add(map);
     return map;
   }
 
-  public <K, V> SpillableByteArrayListMultimap<K, V> newSpillableByteArrayListMultimap(long bucket, Serde<K,
+  public <K, V> SpillableListMultimap<K, V> newSpillableArrayListMultimap(long bucket, Serde<K,
       Slice> serdeKey, Serde<V, Slice> serdeValue)
   {
-    SpillableByteArrayListMultimapImpl<K, V> map = new SpillableByteArrayListMultimapImpl<K, V>(store,
+    SpillableArrayListMultimapImpl<K, V> map = new SpillableArrayListMultimapImpl<K, V>(store,
         identifierGenerator.next(), bucket, serdeKey, serdeValue);
     componentList.add(map);
     return map;
   }
 
-  public <K, V> SpillableByteArrayListMultimap<K, V> newSpillableByteArrayListMultimap(byte[] identifier, long bucket,
+  public <K, V> SpillableListMultimap<K, V> newSpillableArrayListMultimap(byte[] identifier, long bucket,
       Serde<K, Slice> serdeKey,
       Serde<V, Slice> serdeValue)
   {
     identifierGenerator.register(identifier);
-    SpillableByteArrayListMultimapImpl<K, V> map = new SpillableByteArrayListMultimapImpl<K, V>(store,
+    SpillableArrayListMultimapImpl<K, V> map = new SpillableArrayListMultimapImpl<K, V>(store,
         identifier, bucket, serdeKey, serdeValue);
     componentList.add(map);
     return map;
@@ -128,12 +128,12 @@ public class SpillableComplexComponentImpl implements SpillableComplexComponent
     return map;
   }
 
-  public <T> SpillableByteMultiset<T> newSpillableByteMultiset(long bucket, Serde<T, Slice> serde)
+  public <T> SpillableMultiset<T> newSpillableMultiset(long bucket, Serde<T, Slice> serde)
   {
     throw new UnsupportedOperationException("Unsupported Operation");
   }
 
-  public <T> SpillableByteMultiset<T> newSpillableByteMultiset(byte[] identifier, long bucket, Serde<T, Slice> serde)
+  public <T> SpillableMultiset<T> newSpillableMultiset(byte[] identifier, long bucket, Serde<T, Slice> serde)
   {
     throw new UnsupportedOperationException("Unsupported Operation");
   }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/f5f1943d/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableMapImpl.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableMapImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableMapImpl.java
new file mode 100644
index 0000000..016aeec
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableMapImpl.java
@@ -0,0 +1,237 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.state.spillable;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+
+import javax.validation.constraints.NotNull;
+
+import org.apache.apex.malhar.lib.state.BucketedState;
+import org.apache.apex.malhar.lib.utils.serde.Serde;
+import org.apache.apex.malhar.lib.utils.serde.SliceUtils;
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.commons.lang3.mutable.MutableInt;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.esotericsoftware.kryo.DefaultSerializer;
+import com.esotericsoftware.kryo.serializers.FieldSerializer;
+import com.google.common.base.Preconditions;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * A Spillable implementation of {@link Map}
+ * @param <K> The types of keys.
+ * @param <V> The types of values.
+ *
+ * @since 3.5.0
+ */
+@DefaultSerializer(FieldSerializer.class)
+@InterfaceStability.Evolving
+public class SpillableMapImpl<K, V> implements Spillable.SpillableMap<K, V>, Spillable.SpillableComponent,
+    Serializable
+{
+  private transient WindowBoundedMapCache<K, V> cache = new WindowBoundedMapCache<>();
+  private transient MutableInt tempOffset = new MutableInt();
+
+  @NotNull
+  private SpillableStateStore store;
+  @NotNull
+  private byte[] identifier;
+  private long bucket;
+  @NotNull
+  private Serde<K, Slice> serdeKey;
+  @NotNull
+  private Serde<V, Slice> serdeValue;
+
+  private int size = 0;
+
+  private SpillableMapImpl()
+  {
+    //for kryo
+  }
+
+  /**
+   * Creates a {@link SpillableMapImpl}.
+   * @param store The {@link SpillableStateStore} to spill to.
+   * @param identifier The Id of this {@link SpillableMapImpl}.
+   * @param bucket The Id of the bucket used to store this
+   * {@link SpillableMapImpl} in the provided {@link SpillableStateStore}.
+   * @param serdeKey The {@link Serde} to use when serializing and deserializing keys.
+   * @param serdeValue The {@link Serde} to use when serializing and deserializing values.
+   */
+  public SpillableMapImpl(SpillableStateStore store, byte[] identifier, long bucket, Serde<K, Slice> serdeKey,
+      Serde<V, Slice> serdeValue)
+  {
+    this.store = Preconditions.checkNotNull(store);
+    this.identifier = Preconditions.checkNotNull(identifier);
+    this.bucket = bucket;
+    this.serdeKey = Preconditions.checkNotNull(serdeKey);
+    this.serdeValue = Preconditions.checkNotNull(serdeValue);
+  }
+
+  public SpillableStateStore getStore()
+  {
+    return this.store;
+  }
+
+  @Override
+  public int size()
+  {
+    return size;
+  }
+
+  @Override
+  public boolean isEmpty()
+  {
+    return size == 0;
+  }
+
+  @Override
+  public boolean containsKey(Object o)
+  {
+    return get(o) != null;
+  }
+
+  @Override
+  public boolean containsValue(Object o)
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public V get(Object o)
+  {
+    K key = (K)o;
+
+    if (cache.getRemovedKeys().contains(key)) {
+      return null;
+    }
+
+    V val = cache.get(key);
+
+    if (val != null) {
+      return val;
+    }
+
+    Slice valSlice = store.getSync(bucket, SliceUtils.concatenate(identifier, serdeKey.serialize(key)));
+
+    if (valSlice == null || valSlice == BucketedState.EXPIRED || valSlice.length == 0) {
+      return null;
+    }
+
+    tempOffset.setValue(0);
+    return serdeValue.deserialize(valSlice, tempOffset);
+  }
+
+  @Override
+  public V put(K k, V v)
+  {
+    V value = get(k);
+
+    if (value == null) {
+      size++;
+    }
+
+    cache.put(k, v);
+
+    return value;
+  }
+
+  @Override
+  public V remove(Object o)
+  {
+    V value = get(o);
+
+    if (value != null) {
+      size--;
+    }
+
+    cache.remove((K)o);
+
+    return value;
+  }
+
+  @Override
+  public void putAll(Map<? extends K, ? extends V> map)
+  {
+    for (Map.Entry<? extends K, ? extends V> entry : map.entrySet()) {
+      put(entry.getKey(), entry.getValue());
+    }
+  }
+
+  @Override
+  public void clear()
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Set<K> keySet()
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Collection<V> values()
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Set<Entry<K, V>> entrySet()
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void setup(Context.OperatorContext context)
+  {
+  }
+
+  @Override
+  public void beginWindow(long windowId)
+  {
+  }
+
+  @Override
+  public void endWindow()
+  {
+    for (K key: cache.getChangedKeys()) {
+      store.put(this.bucket, SliceUtils.concatenate(identifier, serdeKey.serialize(key)),
+          serdeValue.serialize(cache.get(key)));
+    }
+
+    for (K key: cache.getRemovedKeys()) {
+      store.put(this.bucket, SliceUtils.concatenate(identifier, serdeKey.serialize(key)),
+          new Slice(ArrayUtils.EMPTY_BYTE_ARRAY));
+    }
+
+    cache.endWindow();
+  }
+
+  @Override
+  public void teardown()
+  {
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/f5f1943d/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetImpl.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetImpl.java
index 122cd2d..c2741b0 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetImpl.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetImpl.java
@@ -110,7 +110,7 @@ public class SpillableSetImpl<T> implements Spillable.SpillableSet<T>, Spillable
   @NotNull
   private SpillableStateStore store;
   @NotNull
-  private SpillableByteMapImpl<T, ListNode<T>> map;
+  private SpillableMapImpl<T, ListNode<T>> map;
 
   private T head;
   private int size;
@@ -139,7 +139,7 @@ public class SpillableSetImpl<T> implements Spillable.SpillableSet<T>, Spillable
   {
     this.store = Preconditions.checkNotNull(store);
 
-    map = new SpillableByteMapImpl<>(store, prefix, bucketId, serde, new SerdeListNodeSlice(serde));
+    map = new SpillableMapImpl<>(store, prefix, bucketId, serde, new SerdeListNodeSlice(serde));
   }
 
   public void setSize(int size)

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/f5f1943d/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetMultimapImpl.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetMultimapImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetMultimapImpl.java
index c227ed7..98f60d2 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetMultimapImpl.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetMultimapImpl.java
@@ -61,7 +61,7 @@ public class SpillableSetMultimapImpl<K, V> implements Spillable.SpillableSetMul
   private transient WindowBoundedMapCache<K, SpillableSetImpl<V>> cache = new WindowBoundedMapCache<>();
 
   @NotNull
-  private SpillableByteMapImpl<Slice, Pair<Integer, V>> map;
+  private SpillableMapImpl<Slice, Pair<Integer, V>> map;
   private SpillableStateStore store;
   private byte[] identifier;
   private long bucket;
@@ -93,7 +93,7 @@ public class SpillableSetMultimapImpl<K, V> implements Spillable.SpillableSetMul
     this.serdeKey = Preconditions.checkNotNull(serdeKey);
     this.serdeValue = Preconditions.checkNotNull(serdeValue);
 
-    map = new SpillableByteMapImpl(store, identifier, bucket, new PassThruSliceSerde(), new SerdePairSlice<>(new SerdeIntSlice(), serdeValue));
+    map = new SpillableMapImpl(store, identifier, bucket, new PassThruSliceSerde(), new SerdePairSlice<>(new SerdeIntSlice(), serdeValue));
   }
 
   public SpillableStateStore getStore()

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/f5f1943d/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedKeyedStorage.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedKeyedStorage.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedKeyedStorage.java
index ac77d1b..ac386ab 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedKeyedStorage.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedKeyedStorage.java
@@ -53,7 +53,7 @@ public class SpillableWindowedKeyedStorage<K, V> implements WindowedStorage.Wind
   protected Serde<K, Slice> keySerde;
   protected Serde<V, Slice> valueSerde;
 
-  protected Spillable.SpillableByteMap<Pair<Window, K>, V> windowKeyToValueMap;
+  protected Spillable.SpillableMap<Pair<Window, K>, V> windowKeyToValueMap;
   protected Spillable.SpillableSetMultimap<Window, K> windowToKeysMap;
 
   private class KVIterator implements Iterator<Map.Entry<K, V>>
@@ -181,7 +181,7 @@ public class SpillableWindowedKeyedStorage<K, V> implements WindowedStorage.Wind
     }
 
     if (windowKeyToValueMap == null) {
-      windowKeyToValueMap = scc.newSpillableByteMap(bucket, windowKeyPairSerde, valueSerde);
+      windowKeyToValueMap = scc.newSpillableMap(bucket, windowKeyPairSerde, valueSerde);
     }
     if (windowToKeysMap == null) {
       windowToKeysMap = scc.newSpillableSetMultimap(bucket, windowSerde, keySerde);

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/f5f1943d/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedPlainStorage.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedPlainStorage.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedPlainStorage.java
index 81f5dbb..6666381 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedPlainStorage.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedPlainStorage.java
@@ -45,7 +45,7 @@ public class SpillableWindowedPlainStorage<T> implements WindowedStorage.Windowe
   private Serde<Window, Slice> windowSerde;
   private Serde<T, Slice> valueSerde;
 
-  protected Spillable.SpillableByteMap<Window, T> windowToDataMap;
+  protected Spillable.SpillableMap<Window, T> windowToDataMap;
 
   public SpillableWindowedPlainStorage()
   {
@@ -134,7 +134,7 @@ public class SpillableWindowedPlainStorage<T> implements WindowedStorage.Windowe
       valueSerde = new SerdeKryoSlice<>();
     }
     if (windowToDataMap == null) {
-      windowToDataMap = scc.newSpillableByteMap(bucket, windowSerde, valueSerde);
+      windowToDataMap = scc.newSpillableMap(bucket, windowSerde, valueSerde);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/f5f1943d/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListMultimapImplTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListMultimapImplTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListMultimapImplTest.java
new file mode 100644
index 0000000..82fb340
--- /dev/null
+++ b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListMultimapImplTest.java
@@ -0,0 +1,370 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.state.spillable;
+
+import java.util.List;
+import java.util.Random;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+
+import org.apache.apex.malhar.lib.state.spillable.inmem.InMemSpillableStateStore;
+import org.apache.apex.malhar.lib.utils.serde.SerdeIntSlice;
+import org.apache.apex.malhar.lib.utils.serde.SerdeStringSlice;
+import org.apache.apex.malhar.lib.utils.serde.SliceUtils;
+
+import com.google.common.collect.Lists;
+
+import com.datatorrent.api.Attribute;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.lib.helper.OperatorContextTestHelper;
+import com.datatorrent.lib.util.KryoCloneUtils;
+import com.datatorrent.netlet.util.Slice;
+
+public class SpillableArrayListMultimapImplTest
+{
+  public static final byte[] ID1 = new byte[]{(byte)0}; // identifier handed to every multimap built in this test
+
+  @Rule // provides the store, operatorContext and applicationPath used by the tests below
+  public SpillableTestUtils.TestMeta testMeta = new SpillableTestUtils.TestMeta();
+
+  @Test
+  public void simpleMultiKeyTest() // runs the shared multi-key scenario against an in-memory store
+  {
+    final SpillableStateStore inMemoryStore = new InMemSpillableStateStore();
+
+    simpleMultiKeyTestHelper(inMemoryStore);
+  }
+
+  @Test
+  public void simpleMultiKeyManagedStateTest() // same scenario, on the store supplied by the TestMeta rule
+  {
+    simpleMultiKeyTestHelper(this.testMeta.store);
+  }
+
+  public void simpleMultiKeyTestHelper(SpillableStateStore store) // fills keys "a", "b", "c"; checks map.size() after each
+  {
+    SpillableArrayListMultimapImpl<String, String> map =
+        new SpillableArrayListMultimapImpl<>(store, ID1, 0L, new SerdeStringSlice(),
+        new SerdeStringSlice());
+
+    store.setup(testMeta.operatorContext);
+    map.setup(testMeta.operatorContext);
+
+    long nextWindowId = 0L;
+    nextWindowId = simpleMultiKeyTestHelper(store, map, "a", nextWindowId); // returns the last window id it used
+    nextWindowId++;
+
+    store.beginWindow(nextWindowId);
+    map.beginWindow(nextWindowId);
+
+    Assert.assertEquals(1, map.size()); // one key so far, although that key holds 12 values
+
+    map.endWindow();
+    store.endWindow();
+
+    nextWindowId++;
+    nextWindowId = simpleMultiKeyTestHelper(store, map, "b", nextWindowId);
+    nextWindowId++;
+
+    store.beginWindow(nextWindowId);
+    map.beginWindow(nextWindowId);
+
+    Assert.assertEquals(2, map.size());
+
+    map.endWindow();
+    store.endWindow();
+
+    nextWindowId++;
+    nextWindowId = simpleMultiKeyTestHelper(store, map, "c", nextWindowId); // keep ids increasing past the helper's windows
+
+    nextWindowId++;
+    store.beginWindow(nextWindowId);
+    map.beginWindow(nextWindowId);
+
+    Assert.assertEquals(3, map.size());
+
+    map.endWindow();
+    store.endWindow();
+
+    map.teardown(); // map first, then the store it was built on
+    store.teardown();
+  }
+
+  public long simpleMultiKeyTestHelper(SpillableStateStore store, // four windows on one key; returns last window id
+      SpillableArrayListMultimapImpl<String, String> map, String key, long nextWindowId)
+  {
+    SerdeStringSlice serdeString = new SerdeStringSlice();
+    SerdeIntSlice serdeInt = new SerdeIntSlice();
+
+    Slice keySlice = serdeString.serialize(key);
+
+    byte[] keyBytes = SliceUtils.concatenate(ID1, keySlice.toByteArray()); // ID1 prefix + serialized key, for the store checks
+
+    nextWindowId++; // window 1: key starts absent, then put + addAll bring it to 8 values
+    store.beginWindow(nextWindowId);
+    map.beginWindow(nextWindowId);
+
+    Assert.assertNull(map.get(key)); // a missing key yields null, not an empty list
+
+    Assert.assertFalse(map.containsKey(key));
+
+    map.put(key, "a");
+
+    Assert.assertTrue(map.containsKey(key));
+
+    List<String> list1 = map.get(key);
+    Assert.assertEquals(1, list1.size());
+
+    Assert.assertEquals("a", list1.get(0));
+
+    list1.addAll(Lists.newArrayList("a", "b", "c", "d", "e", "f", "g")); // appended through the list returned by get()
+
+    Assert.assertEquals(8, list1.size());
+
+    Assert.assertEquals("a", list1.get(0));
+    Assert.assertEquals("a", list1.get(1));
+    Assert.assertEquals("b", list1.get(2));
+    Assert.assertEquals("c", list1.get(3));
+    Assert.assertEquals("d", list1.get(4));
+    Assert.assertEquals("e", list1.get(5));
+    Assert.assertEquals("f", list1.get(6));
+    Assert.assertEquals("g", list1.get(7));
+
+    map.endWindow();
+    store.endWindow();
+
+    nextWindowId++; // window 2: check the store after window 1, then append 4 more values
+    store.beginWindow(nextWindowId);
+    map.beginWindow(nextWindowId);
+
+    SpillableTestUtils.checkValue(store, 0L, // entry under key + SIZE_KEY_SUFFIX is expected to hold 8
+        SliceUtils.concatenate(keyBytes, SpillableArrayListMultimapImpl.SIZE_KEY_SUFFIX), 8, 0, serdeInt);
+
+    SpillableTestUtils.checkValue(store, 0L, keyBytes, 0, Lists.<String>newArrayList("a", "a", "b", "c", "d", "e",
+        "f", "g"));
+
+    List<String> list2 = map.get(key);
+
+    Assert.assertEquals(8, list2.size());
+
+    Assert.assertEquals("a", list2.get(0));
+    Assert.assertEquals("a", list2.get(1));
+    Assert.assertEquals("b", list2.get(2));
+    Assert.assertEquals("c", list2.get(3));
+    Assert.assertEquals("d", list2.get(4));
+    Assert.assertEquals("e", list2.get(5));
+    Assert.assertEquals("f", list2.get(6));
+    Assert.assertEquals("g", list2.get(7));
+
+    list2.add("tt");
+    list2.add("ab");
+    list2.add("99");
+    list2.add("oo");
+
+    Assert.assertEquals("tt", list2.get(8));
+    Assert.assertEquals("ab", list2.get(9));
+    Assert.assertEquals("99", list2.get(10));
+    Assert.assertEquals("oo", list2.get(11));
+
+    Assert.assertEquals(12, list2.size());
+
+    map.endWindow();
+    store.endWindow();
+
+    nextWindowId++; // window 3: check the 12 values, then overwrite four positions via set()
+    store.beginWindow(nextWindowId);
+    map.beginWindow(nextWindowId);
+
+    Assert.assertEquals(12, list2.size()); // list obtained in the previous window is still queried here
+
+    SpillableTestUtils.checkValue(store, 0L,
+        SliceUtils.concatenate(keyBytes, SpillableArrayListMultimapImpl.SIZE_KEY_SUFFIX), 12, 0, serdeInt);
+
+    SpillableTestUtils.checkValue(store, 0L, keyBytes, 0, Lists.<String>newArrayList("a", "a", "b", "c", "d", "e",
+        "f", "g", "tt", "ab", "99", "oo"));
+
+    List<String> list3 = map.get(key);
+
+    list3.set(1, "111");
+    list3.set(3, "222");
+    list3.set(5, "333");
+    list3.set(11, "444");
+
+    Assert.assertEquals("a", list3.get(0));
+    Assert.assertEquals("111", list3.get(1));
+    Assert.assertEquals("b", list3.get(2));
+    Assert.assertEquals("222", list3.get(3));
+    Assert.assertEquals("d", list3.get(4));
+    Assert.assertEquals("333", list3.get(5));
+    Assert.assertEquals("f", list3.get(6));
+    Assert.assertEquals("g", list3.get(7));
+    Assert.assertEquals("tt", list3.get(8));
+    Assert.assertEquals("ab", list3.get(9));
+    Assert.assertEquals("99", list3.get(10));
+    Assert.assertEquals("444", list3.get(11));
+
+    Assert.assertEquals(12, list2.size()); // NOTE(review): checks list2 right after mutating list3 - confirm intended
+
+    map.endWindow();
+    store.endWindow();
+
+    nextWindowId++; // window 4: check only, the set() calls must not have changed the size
+    store.beginWindow(nextWindowId);
+    map.beginWindow(nextWindowId);
+
+    SpillableTestUtils.checkValue(store, 0L,
+        SliceUtils.concatenate(keyBytes, SpillableArrayListMultimapImpl.SIZE_KEY_SUFFIX), 12, 0, serdeInt);
+
+    SpillableTestUtils.checkValue(store, 0L, keyBytes, 0, Lists.<String>newArrayList("a", "111", "b", "222", "d", "333",
+        "f", "g", "tt", "ab", "99", "444"));
+
+    map.endWindow();
+    store.endWindow();
+
+    return nextWindowId; // id of the last window begun above (input + 4)
+  }
+
+  @Test
+  public void recoveryTestWithManagedState() // clone the map at a checkpoint, keep mutating, then resume from the clone
+  {
+    SpillableStateStore store = testMeta.store;
+
+    SpillableArrayListMultimapImpl<String, String> map =
+        new SpillableArrayListMultimapImpl<>(store, ID1, 0L, new SerdeStringSlice(), new SerdeStringSlice());
+
+    store.setup(testMeta.operatorContext);
+    map.setup(testMeta.operatorContext);
+
+    long nextWindowId = 0L;
+    nextWindowId = simpleMultiKeyTestHelper(store, map, "a", nextWindowId); // leaves 12 values under "a"
+    long activationWindow = nextWindowId; // last window completed before the checkpoint calls below
+    store.beforeCheckpoint(nextWindowId);
+    SpillableArrayListMultimapImpl<String, String> clonedMap = KryoCloneUtils.cloneObject(map); // copy taken mid-checkpoint
+    store.checkpointed(nextWindowId);
+    store.committed(nextWindowId);
+
+    nextWindowId++;
+
+    store.beginWindow(nextWindowId);
+    map.beginWindow(nextWindowId);
+
+    List<String> list1 = map.get("a");
+
+    Assert.assertEquals(12, list1.size());
+
+    Assert.assertEquals("a", list1.get(0));
+    Assert.assertEquals("111", list1.get(1));
+    Assert.assertEquals("b", list1.get(2));
+    Assert.assertEquals("222", list1.get(3));
+    Assert.assertEquals("d", list1.get(4));
+    Assert.assertEquals("333", list1.get(5));
+    Assert.assertEquals("f", list1.get(6));
+    Assert.assertEquals("g", list1.get(7));
+    Assert.assertEquals("tt", list1.get(8));
+    Assert.assertEquals("ab", list1.get(9));
+    Assert.assertEquals("99", list1.get(10));
+    Assert.assertEquals("444", list1.get(11));
+
+    list1.add("111"); // 13th value, added after the clone was taken
+
+    Assert.assertEquals("a", list1.get(0));
+    Assert.assertEquals("111", list1.get(1));
+    Assert.assertEquals("b", list1.get(2));
+    Assert.assertEquals("222", list1.get(3));
+    Assert.assertEquals("d", list1.get(4));
+    Assert.assertEquals("333", list1.get(5));
+    Assert.assertEquals("f", list1.get(6));
+    Assert.assertEquals("g", list1.get(7));
+    Assert.assertEquals("tt", list1.get(8));
+    Assert.assertEquals("ab", list1.get(9));
+    Assert.assertEquals("99", list1.get(10));
+    Assert.assertEquals("444", list1.get(11));
+    Assert.assertEquals("111", list1.get(12));
+
+    Assert.assertEquals(13, list1.size());
+
+    map.endWindow();
+    store.endWindow();
+
+    map.teardown();
+    store.teardown();
+
+    map = clonedMap; // from here on the test works on the clone
+    store = map.getStore(); // and on the store instance the clone refers to
+
+    Attribute.AttributeMap.DefaultAttributeMap attributes = new Attribute.AttributeMap.DefaultAttributeMap();
+    attributes.put(DAG.APPLICATION_PATH, testMeta.applicationPath);
+    attributes.put(Context.OperatorContext.ACTIVATION_WINDOW_ID, activationWindow); // resume point for the new context
+    Context.OperatorContext context =
+        new OperatorContextTestHelper.TestIdOperatorContext(testMeta.operatorContext.getId(), attributes);
+
+    store.setup(context);
+    map.setup(context);
+    nextWindowId = activationWindow + 1; // first window after the checkpointed one
+    store.beginWindow(nextWindowId);
+    map.beginWindow(nextWindowId);
+
+    SerdeStringSlice serdeString = new SerdeStringSlice();
+    Slice keySlice = serdeString.serialize("a");
+    byte[] keyBytes = SliceUtils.concatenate(ID1, keySlice.toByteArray()); // same key layout as in the helper
+
+    SpillableTestUtils.checkValue(store, 0L, keyBytes, 0, Lists.<String>newArrayList("a", "111", "b", "222", "d",
+        "333", "f", "g", "tt", "ab", "99", "444"));
+
+    Assert.assertEquals(1, map.size());
+    Assert.assertEquals(12, map.get("a").size()); // 12, not 13: the add("111") made after the clone is not expected here
+
+    map.endWindow();
+    store.endWindow();
+
+    map.teardown();
+    store.teardown();
+  }
+
+  @Test
+  public void testLoad() // smoke test: 100000 random puts in one window; asserts nothing, must only not throw
+  {
+    Random random = new Random(0L); // fixed seed so a failing run can be reproduced
+    final int keySize = 1000000;
+    final int valueSize = 100000000;
+    final int numOfEntry = 100000;
+    SpillableStateStore store = testMeta.store;
+    SpillableArrayListMultimapImpl<String, String> multimap = new SpillableArrayListMultimapImpl<>(
+        store, ID1, 0L, new SerdeStringSlice(), new SerdeStringSlice());
+    Attribute.AttributeMap.DefaultAttributeMap attributes = new Attribute.AttributeMap.DefaultAttributeMap();
+    attributes.put(DAG.APPLICATION_PATH, testMeta.applicationPath);
+    Context.OperatorContext context =
+        new OperatorContextTestHelper.TestIdOperatorContext(testMeta.operatorContext.getId(), attributes);
+    store.setup(context);
+    multimap.setup(context);
+
+    store.beginWindow(1);
+    multimap.beginWindow(1);
+    for (int i = 0; i < numOfEntry; ++i) {
+      multimap.put(String.valueOf(random.nextInt(keySize)), String.valueOf(random.nextInt(valueSize)));
+    }
+    multimap.endWindow();
+    store.endWindow();
+    multimap.teardown(); // tear down like the sibling tests, map before store
+    store.teardown();
+  }
+}