You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by da...@apache.org on 2016/08/15 19:55:03 UTC

[3/3] apex-malhar git commit: APEXMALHAR-2048 Added implementations of SpillableList, SpillableMap, and SpillableArrayListMultimap

APEXMALHAR-2048 Added implementations of SpillableList, SpillableMap, and SpillableArrayListMultimap


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/9b6e11d8
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/9b6e11d8
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/9b6e11d8

Branch: refs/heads/master
Commit: 9b6e11d85accc88faa08d7b4a8daeb9b069fc878
Parents: d1fb2b6
Author: Timothy Farkas <ti...@datatorrent.com>
Authored: Sun Jul 17 14:32:34 2016 -0700
Committer: David Yan <da...@datatorrent.com>
Committed: Mon Aug 15 12:15:21 2016 -0700

----------------------------------------------------------------------
 .../SequentialSpillableIdentifierGenerator.java |  84 +++
 .../malhar/lib/state/spillable/Spillable.java   |  14 +-
 .../state/spillable/SpillableArrayListImpl.java | 326 ++++++++++
 .../SpillableByteArrayListMultimapImpl.java     | 291 +++++++++
 .../state/spillable/SpillableByteMapImpl.java   | 235 ++++++++
 .../spillable/SpillableComplexComponent.java    |   4 +-
 .../SpillableComplexComponentImpl.java          | 193 ++++++
 .../spillable/SpillableIdentifierGenerator.java |  41 ++
 .../state/spillable/SpillableStateStore.java    |  35 ++
 .../state/spillable/TimeBasedPriorityQueue.java | 154 +++++
 .../state/spillable/WindowBoundedMapCache.java  | 129 ++++
 .../lib/state/spillable/WindowListener.java     |  42 ++
 .../state/spillable/inmem/InMemMultiset.java    | 161 -----
 .../inmem/InMemSpillableArrayList.java          | 175 ------
 .../InMemSpillableByteArrayListMultimap.java    | 154 -----
 .../inmem/InMemSpillableComplexComponent.java   | 117 ----
 .../inmem/InMemSpillableStateStore.java         | 118 ++++
 .../ManagedStateSpillableStateStore.java        |  34 ++
 .../lib/utils/serde/PassThruByteArraySerde.java |   2 +
 .../serde/PassThruByteArraySliceSerde.java      |  59 ++
 .../lib/utils/serde/PassThruSliceSerde.java     |  50 ++
 .../malhar/lib/utils/serde/SerdeIntSlice.java   |  52 ++
 .../malhar/lib/utils/serde/SerdeListSlice.java  | 109 ++++
 .../lib/utils/serde/SerdeStringSlice.java       |  53 ++
 .../apex/malhar/lib/utils/serde/SliceUtils.java | 101 ++++
 .../com/datatorrent/lib/util/TestUtils.java     |  25 +
 .../state/managed/ManagedStateTestUtils.java    |  13 +-
 .../spillable/inmem/InMemMultisetTest.java      |  44 --
 .../inmem/InMemSpillableArrayListTest.java      |  44 --
 ...InMemSpillableByteArrayListMultimapTest.java |  45 --
 ...uentialSpillableIdentifierGeneratorTest.java | 125 ++++
 .../spillable/SpillableArrayListImplTest.java   | 594 +++++++++++++++++++
 .../SpillableByteArrayListMultimapImplTest.java | 341 +++++++++++
 .../spillable/SpillableByteMapImplTest.java     | 484 +++++++++++++++
 .../SpillableComplexComponentImplTest.java      |  63 ++
 .../lib/state/spillable/SpillableTestUtils.java | 134 +++++
 .../spillable/TimeBasedPriorityQueueTest.java   | 134 +++++
 .../spillable/WindowBoundedMapCacheTest.java    | 116 ++++
 .../inmem/InMemorySpillableStateStoreTest.java  |  60 ++
 .../lib/utils/serde/SerdeListSliceTest.java     |  45 ++
 40 files changed, 4240 insertions(+), 760 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9b6e11d8/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SequentialSpillableIdentifierGenerator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SequentialSpillableIdentifierGenerator.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SequentialSpillableIdentifierGenerator.java
new file mode 100644
index 0000000..600fa98
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SequentialSpillableIdentifierGenerator.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.state.spillable;
+
+import java.util.Set;
+
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Sets;
+
+/**
+ * An id generator that hands out single byte identifiers for Spillable data structures.
+ * Identifiers are issued in increasing order, starting at 0 and ending at {@link Byte#MAX_VALUE}.
+ * Identifiers claimed up front through {@link #register(byte[])} are skipped by {@link #next()}.
+ */
+@InterfaceStability.Evolving
+public class SequentialSpillableIdentifierGenerator implements SpillableIdentifierGenerator
+{
+  private boolean nextCalled = false;
+  private boolean done = false;
+  private byte currentIdentifier = 0;
+
+  private Set<Byte> registeredIdentifier = Sets.newHashSet();
+
+  /**
+   * Returns the current free identifier as a one element array and advances to the next free one.
+   * @return A one element array holding the issued identifier.
+   * @throws IllegalStateException if all identifiers have been issued or registered.
+   */
+  @Override
+  public byte[] next()
+  {
+    Preconditions.checkState(!done);
+    nextCalled = true;
+
+    // Capture the identifier before seek() moves currentIdentifier forward.
+    final byte[] issued = new byte[]{currentIdentifier};
+    seek();
+
+    return issued;
+  }
+
+  /**
+   * Reserves an identifier so that {@link #next()} never returns it.
+   * @param identifierArray A one element array holding the identifier to reserve.
+   * @throws IllegalStateException if {@link #next()} was already called, if the generator is exhausted,
+   * if the identifier is smaller than the current identifier, or if it was registered before.
+   * @throws IllegalArgumentException if the array does not contain exactly one byte.
+   */
+  @Override
+  public void register(byte[] identifierArray)
+  {
+    Preconditions.checkState(!nextCalled);
+    Preconditions.checkState(!done);
+    Preconditions.checkArgument(identifierArray.length == 1);
+
+    final byte requested = identifierArray[0];
+
+    Preconditions.checkState(requested >= currentIdentifier &&
+        !registeredIdentifier.contains(requested));
+
+    registeredIdentifier.add(requested);
+
+    // The slot we were about to hand out has just been taken, so move on.
+    if (requested == currentIdentifier) {
+      seek();
+    }
+  }
+
+  /**
+   * Advances currentIdentifier to the next unregistered identifier, or marks the generator
+   * as done when no identifier is left.
+   */
+  private void seek()
+  {
+    if (currentIdentifier == Byte.MAX_VALUE) {
+      done = true;
+      return;
+    }
+
+    // Always step at least once, then keep stepping over registered identifiers.
+    currentIdentifier++;
+    while (registeredIdentifier.contains(currentIdentifier) && currentIdentifier < Byte.MAX_VALUE) {
+      currentIdentifier++;
+    }
+
+    // Landing on a registered MAX_VALUE means there is nothing left to issue.
+    done = currentIdentifier == Byte.MAX_VALUE && registeredIdentifier.contains(currentIdentifier);
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9b6e11d8/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/Spillable.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/Spillable.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/Spillable.java
index 41a0efc..4c9b997 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/Spillable.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/Spillable.java
@@ -98,19 +98,7 @@ public interface Spillable
    * should implement this interface. A user working with an implementation of this interface needs
    * to make sure that the {@link com.datatorrent.api.Operator} call-backs are propagated to it.
    */
-  interface SpillableComponent extends Component<OperatorContext>, Spillable
+  interface SpillableComponent extends Component<OperatorContext>, Spillable, WindowListener
   {
-    /**
-     * This signals that the parent {@link com.datatorrent.api.Operator}'s
-     * {@link com.datatorrent.api.Operator#beginWindow(long)} method has been called.
-     * @param windowId The next windowId of the parent operator.
-     */
-    void beginWindow(long windowId);
-
-    /**
-     * This signals that the parent {@link com.datatorrent.api.Operator}'s
-     * {@link com.datatorrent.api.Operator#endWindow()} method has been called.
-     */
-    void endWindow();
   }
 }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9b6e11d8/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListImpl.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListImpl.java
new file mode 100644
index 0000000..5d46906
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListImpl.java
@@ -0,0 +1,326 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.state.spillable;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.ListIterator;
+
+import javax.validation.constraints.NotNull;
+
+import org.apache.apex.malhar.lib.utils.serde.Serde;
+import org.apache.apex.malhar.lib.utils.serde.SerdeIntSlice;
+import org.apache.apex.malhar.lib.utils.serde.SerdeListSlice;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.esotericsoftware.kryo.DefaultSerializer;
+import com.esotericsoftware.kryo.serializers.FieldSerializer;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * A Spillable implementation of {@link List} backed by a {@link SpillableStateStore}.
+ * @param <T> The type of object stored in the {@link SpillableArrayListImpl}.
+ */
+@DefaultSerializer(FieldSerializer.class)
+@InterfaceStability.Evolving
+public class SpillableArrayListImpl<T> implements Spillable.SpillableArrayList<T>, Spillable.SpillableComponent
+{
+  public static final int DEFAULT_BATCH_SIZE = 1000;
+
+  private int batchSize = DEFAULT_BATCH_SIZE;
+  private long bucketId;
+  private byte[] prefix;
+
+  @NotNull
+  private SpillableStateStore store;
+  @NotNull
+  private Serde<T, Slice> serde;
+  @NotNull
+  private SpillableByteMapImpl<Integer, List<T>> map;
+
+  private boolean sizeCached = false;
+  private int size;
+  private int numBatches;
+
+  private SpillableArrayListImpl()
+  {
+    //for kryo
+  }
+
+  public SpillableStateStore getStore()
+  {
+    return store;
+  }
+
+  /**
+   * Creates a {@link SpillableArrayListImpl}.
+   * @param bucketId The Id of the bucket used to store this
+   * {@link SpillableArrayListImpl} in the provided {@link SpillableStateStore}.
+   * @param prefix The Id of this {@link SpillableArrayListImpl}.
+   * @param store The {@link SpillableStateStore} in which to spill to.
+   * @param serde The {@link Serde} to use when serializing and deserializing data.
+   */
+  public SpillableArrayListImpl(long bucketId, @NotNull byte[] prefix,
+      @NotNull SpillableStateStore store,
+      @NotNull Serde<T, Slice> serde)
+  {
+    this.bucketId = bucketId;
+    this.prefix = Preconditions.checkNotNull(prefix);
+    this.store = Preconditions.checkNotNull(store);
+    this.serde = Preconditions.checkNotNull(serde);
+
+    map = new SpillableByteMapImpl<>(store, prefix, bucketId, new SerdeIntSlice(), new SerdeListSlice(serde));
+  }
+
+  /**
+   * Creates a {@link SpillableArrayListImpl} with an explicit batch size.
+   * @param bucketId The Id of the bucket used to store this
+   * {@link SpillableArrayListImpl} in the provided {@link SpillableStateStore}.
+   * @param prefix The Id of this {@link SpillableArrayListImpl}.
+   * @param store The {@link SpillableStateStore} in which to spill to.
+   * @param serde The {@link Serde} to use when serializing and deserializing data.
+   * @param batchSize When spilled to a {@link SpillableStateStore} data is stored in a batch. This determines the
+   *                  number of elements a batch will contain when it's spilled. Having small batches will increase
+   *                  the number of keys stored by your {@link SpillableStateStore} but will improve random reads and
+   *                  writes. Increasing the batch size will improve sequential read and write speed.
+   *                  Must be greater than zero.
+   * @throws IllegalArgumentException if batchSize is not greater than zero.
+   */
+  public SpillableArrayListImpl(long bucketId, @NotNull byte[] prefix,
+      @NotNull SpillableStateStore store,
+      @NotNull Serde<T, Slice> serde,
+      int batchSize)
+  {
+    this(bucketId, prefix, store, serde);
+
+    // Validate the argument, not the field: this.batchSize still holds DEFAULT_BATCH_SIZE here,
+    // so checking it would let zero or negative batch sizes through.
+    Preconditions.checkArgument(batchSize > 0, "batchSize must be greater than zero: %s", batchSize);
+    this.batchSize = batchSize;
+  }
+
+  /**
+   * Sets the number of elements this list reports, for example when a list is re-created
+   * on top of batches that were spilled earlier.
+   * @param size The number of elements in the list. Must not be negative.
+   * @throws IllegalArgumentException if size is negative.
+   */
+  public void setSize(int size)
+  {
+    Preconditions.checkArgument(size >= 0);
+    this.size = size;
+    // Keep the batch count in step with the size. add() allocates a fresh batch only when
+    // size / batchSize == numBatches, so a stale count of 0 would make it replace a stored,
+    // partially filled batch instead of appending to it.
+    this.numBatches = (size / batchSize) + ((size % batchSize == 0) ? 0 : 1);
+  }
+
+  @Override
+  public int size()
+  {
+    return size;
+  }
+
+  @Override
+  public boolean isEmpty()
+  {
+    return size == 0;
+  }
+
+  @Override
+  public boolean contains(Object o)
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Iterator<T> iterator()
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Object[] toArray()
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public <T1> T1[] toArray(T1[] t1s)
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  /**
+   * Appends an element to the end of the list. The element goes into the last batch, or into a
+   * newly allocated batch when the batch index equals the number of batches created so far.
+   * @param t The element to append.
+   * @return Always true.
+   * @throws IllegalArgumentException if adding one more element would overflow the int size.
+   */
+  @Override
+  public boolean add(T t)
+  {
+    Preconditions.checkArgument((size() + 1) > 0);
+
+    final int batchIndex = size / batchSize;
+    final List<T> batch;
+
+    if (batchIndex != numBatches) {
+      // The target batch already exists, so fetch it from the backing map.
+      batch = map.get(batchIndex);
+    } else {
+      batch = Lists.newArrayListWithCapacity(batchSize);
+      numBatches++;
+    }
+
+    batch.add(t);
+    size++;
+
+    // Write the batch back so the backing map records it as changed.
+    map.put(batchIndex, batch);
+    return true;
+  }
+
+  @Override
+  public boolean remove(Object o)
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public boolean containsAll(Collection<?> collection)
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public boolean addAll(Collection<? extends T> collection)
+  {
+    for (T element: collection) {
+      add(element);
+    }
+
+    return true;
+  }
+
+  @Override
+  public boolean addAll(int i, Collection<? extends T> collection)
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public boolean removeAll(Collection<?> collection)
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public boolean retainAll(Collection<?> collection)
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void clear()
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  /**
+   * Returns the element at the given position by loading the batch that contains it.
+   * @param i The index of the element to return.
+   * @return The element stored at index i.
+   * @throws IndexOutOfBoundsException if i is negative or not smaller than {@link #size()}.
+   */
+  @Override
+  public T get(int i)
+  {
+    // Reject negative indices too: they would otherwise map to a missing batch or a negative offset.
+    if (i < 0 || i >= size) {
+      throw new IndexOutOfBoundsException("Index: " + i + ", Size: " + size);
+    }
+
+    int batchIndex = i / batchSize;
+    int batchOffset = i % batchSize;
+
+    List<T> batch = map.get(batchIndex);
+    return batch.get(batchOffset);
+  }
+
+  /**
+   * Replaces the element at the given position and writes the modified batch back to the backing map.
+   * @param i The index of the element to replace.
+   * @param t The new element.
+   * @return The element previously stored at index i.
+   * @throws IndexOutOfBoundsException if i is negative or not smaller than {@link #size()}.
+   */
+  @Override
+  public T set(int i, T t)
+  {
+    // Reject negative indices too: they would otherwise map to a missing batch or a negative offset.
+    if (i < 0 || i >= size) {
+      throw new IndexOutOfBoundsException("Index: " + i + ", Size: " + size);
+    }
+
+    int batchIndex = i / batchSize;
+    int batchOffset = i % batchSize;
+
+    List<T> batch = map.get(batchIndex);
+    T old = batch.get(batchOffset);
+    batch.set(batchOffset, t);
+    // Put the batch back so the change is recorded by the backing map.
+    map.put(batchIndex, batch);
+    return old;
+  }
+
+  @Override
+  public void add(int i, T t)
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public T remove(int i)
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public int indexOf(Object o)
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public int lastIndexOf(Object o)
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public ListIterator<T> listIterator()
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public ListIterator<T> listIterator(int i)
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public List<T> subList(int i, int i1)
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void setup(Context.OperatorContext context)
+  {
+    map.setup(context);
+  }
+
+  @Override
+  public void beginWindow(long windowId)
+  {
+    map.beginWindow(windowId);
+  }
+
+  @Override
+  public void endWindow()
+  {
+    map.endWindow();
+  }
+
+  @Override
+  public void teardown()
+  {
+    map.teardown();
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9b6e11d8/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteArrayListMultimapImpl.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteArrayListMultimapImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteArrayListMultimapImpl.java
new file mode 100644
index 0000000..ba0bb77
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteArrayListMultimapImpl.java
@@ -0,0 +1,291 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.state.spillable;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import javax.annotation.Nullable;
+import javax.validation.constraints.NotNull;
+
+import org.apache.apex.malhar.lib.utils.serde.PassThruByteArraySliceSerde;
+import org.apache.apex.malhar.lib.utils.serde.Serde;
+import org.apache.apex.malhar.lib.utils.serde.SerdeIntSlice;
+import org.apache.apex.malhar.lib.utils.serde.SliceUtils;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.esotericsoftware.kryo.DefaultSerializer;
+import com.esotericsoftware.kryo.serializers.FieldSerializer;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Multiset;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * This is an implementation of Guava's ListMultimap which spills data to a {@link SpillableStateStore}.
+ */
+@DefaultSerializer(FieldSerializer.class)
+@InterfaceStability.Evolving
+public class SpillableByteArrayListMultimapImpl<K, V> implements Spillable.SpillableByteArrayListMultimap<K, V>,
+    Spillable.SpillableComponent
+{
+  public static final int DEFAULT_BATCH_SIZE = 1000;
+  public static final byte[] SIZE_KEY_SUFFIX = new byte[]{(byte)0, (byte)0, (byte)0};
+
+  private transient WindowBoundedMapCache<K, SpillableArrayListImpl<V>> cache = new WindowBoundedMapCache<>();
+  private transient boolean isRunning = false;
+  private transient boolean isInWindow = false;
+
+  private int batchSize = DEFAULT_BATCH_SIZE;
+  @NotNull
+  private SpillableByteMapImpl<byte[], Integer> map;
+  private SpillableStateStore store;
+  private byte[] identifier;
+  private long bucket;
+  private Serde<K, Slice> serdeKey;
+  private Serde<V, Slice> serdeValue;
+
+  private SpillableByteArrayListMultimapImpl()
+  {
+    // for kryo
+  }
+
+  /**
+   * Creates a {@link SpillableByteArrayListMultimapImpl}.
+   * @param store The {@link SpillableStateStore} in which to spill to.
+   * @param identifier The Id of this {@link SpillableByteArrayListMultimapImpl}.
+   * @param bucket The Id of the bucket used to store this
+   * {@link SpillableByteArrayListMultimapImpl} in the provided {@link SpillableStateStore}.
+   * @param serdeKey The {@link Serde} to use when serializing and deserializing keys.
+   * @param serdeValue The {@link Serde} to use when serializing and deserializing values.
+   */
+  public SpillableByteArrayListMultimapImpl(SpillableStateStore store, byte[] identifier, long bucket,
+      Serde<K, Slice> serdeKey,
+      Serde<V, Slice> serdeValue)
+  {
+    this.store = Preconditions.checkNotNull(store);
+    this.identifier = Preconditions.checkNotNull(identifier);
+    this.bucket = bucket;
+    this.serdeKey = Preconditions.checkNotNull(serdeKey);
+    this.serdeValue = Preconditions.checkNotNull(serdeValue);
+
+    // Backing map that holds, per key, the number of values stored for that key.
+    map = new SpillableByteMapImpl(store, identifier, bucket, new PassThruByteArraySliceSerde(), new SerdeIntSlice());
+  }
+
+  public SpillableStateStore getStore()
+  {
+    return store;
+  }
+
+  @Override
+  public List<V> get(@Nullable K key)
+  {
+    return getHelper(key);
+  }
+
+  /**
+   * Looks up the list of values for a key, first in the window cache and then in the spilled state.
+   * @param key The key to look up.
+   * @return The list for the key, or null when the key is not cached and no size entry is stored for it.
+   */
+  private SpillableArrayListImpl<V> getHelper(@Nullable K key)
+  {
+    SpillableArrayListImpl<V> spillableArrayList = cache.get(key);
+
+    if (spillableArrayList == null) {
+      Slice keySlice = serdeKey.serialize(key);
+      // The size of a key's list is stored under the serialized key followed by SIZE_KEY_SUFFIX.
+      Integer size = map.get(SliceUtils.concatenate(keySlice, SIZE_KEY_SUFFIX).toByteArray());
+
+      if (size == null) {
+        return null;
+      }
+
+      // Rebuild the list on top of its stored data: its prefix is this multimap's identifier
+      // followed by the serialized key, and its size is the stored size.
+      Slice keyPrefix = SliceUtils.concatenate(identifier, keySlice);
+      spillableArrayList = new SpillableArrayListImpl<V>(bucket, keyPrefix.toByteArray(), store, serdeValue);
+      spillableArrayList.setSize(size);
+    }
+
+    // Note: this runs for cache hits as well as for freshly loaded lists.
+    cache.put(key, spillableArrayList);
+
+    return spillableArrayList;
+  }
+
+  @Override
+  public Set<K> keySet()
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Multiset<K> keys()
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Collection<V> values()
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Collection<Map.Entry<K, V>> entries()
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public List<V> removeAll(@Nullable Object key)
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void clear()
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public int size()
+  {
+    return map.size();
+  }
+
+  @Override
+  public boolean isEmpty()
+  {
+    return map.isEmpty();
+  }
+
+  @Override
+  public boolean containsKey(@Nullable Object key)
+  {
+    return cache.contains((K)key) || map.containsKey(SliceUtils.concatenate(serdeKey.serialize((K)key),
+        SIZE_KEY_SUFFIX).toByteArray());
+  }
+
+  @Override
+  public boolean containsValue(@Nullable Object value)
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public boolean containsEntry(@Nullable Object key, @Nullable Object value)
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public boolean put(@Nullable K key, @Nullable V value)
+  {
+    SpillableArrayListImpl<V> spillableArrayList = getHelper(key);
+
+    if (spillableArrayList == null) {
+      Slice keyPrefix = SliceUtils.concatenate(identifier, serdeKey.serialize(key));
+      spillableArrayList = new SpillableArrayListImpl<V>(bucket, keyPrefix.toByteArray(), store, serdeValue);
+
+      cache.put(key, spillableArrayList);
+    }
+
+    spillableArrayList.add(value);
+    return true;
+  }
+
+  @Override
+  public boolean remove(@Nullable Object key, @Nullable Object value)
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public boolean putAll(@Nullable K key, Iterable<? extends V> values)
+  {
+    boolean changed = false;
+
+    for (V value: values) {
+      changed |= put(key, value);
+    }
+
+    return changed;
+  }
+
+  @Override
+  public boolean putAll(Multimap<? extends K, ? extends V> multimap)
+  {
+    boolean changed = false;
+
+    for (Map.Entry<? extends K, ? extends V> entry: multimap.entries()) {
+      changed |= put(entry.getKey(), entry.getValue());
+    }
+
+    return changed;
+  }
+
+  @Override
+  public List<V> replaceValues(K key, Iterable<? extends V> values)
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Map<K, Collection<V>> asMap()
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void setup(Context.OperatorContext context)
+  {
+    map.setup(context);
+    isRunning = true;
+  }
+
+  @Override
+  public void beginWindow(long windowId)
+  {
+    map.beginWindow(windowId);
+    isInWindow = true;
+  }
+
+  /**
+   * Ends the current window: closes the window on every list changed during it, stores each
+   * list's size in the backing map, and then ends the window on the cache and the backing map.
+   * @throws IllegalStateException if the cache reports removed keys.
+   */
+  @Override
+  public void endWindow()
+  {
+    isInWindow = false;
+    for (K key: cache.getChangedKeys()) {
+      SpillableArrayListImpl<V> spillableArrayList = cache.get(key);
+      spillableArrayList.endWindow();
+
+      // Store the list size under serialized key + SIZE_KEY_SUFFIX; the previous size is not needed.
+      map.put(SliceUtils.concatenate(serdeKey.serialize(key), SIZE_KEY_SUFFIX).toByteArray(),
+          spillableArrayList.size());
+    }
+
+    Preconditions.checkState(cache.getRemovedKeys().isEmpty());
+    cache.endWindow();
+    map.endWindow();
+  }
+
+  @Override
+  public void teardown()
+  {
+    isRunning = false;
+    map.teardown();
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9b6e11d8/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteMapImpl.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteMapImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteMapImpl.java
new file mode 100644
index 0000000..da313ee
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteMapImpl.java
@@ -0,0 +1,235 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.state.spillable;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+
+import javax.validation.constraints.NotNull;
+
+import org.apache.apex.malhar.lib.state.BucketedState;
+import org.apache.apex.malhar.lib.utils.serde.Serde;
+import org.apache.apex.malhar.lib.utils.serde.SliceUtils;
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.commons.lang3.mutable.MutableInt;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.esotericsoftware.kryo.DefaultSerializer;
+import com.esotericsoftware.kryo.serializers.FieldSerializer;
+import com.google.common.base.Preconditions;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * A Spillable implementation of {@link Map}
+ * @param <K> The types of keys.
+ * @param <V> The types of values.
+ */
+@DefaultSerializer(FieldSerializer.class)
+@InterfaceStability.Evolving
+public class SpillableByteMapImpl<K, V> implements Spillable.SpillableByteMap<K, V>, Spillable.SpillableComponent,
+    Serializable
+{
+  private transient WindowBoundedMapCache<K, V> cache = new WindowBoundedMapCache<>();
+  private transient MutableInt tempOffset = new MutableInt();
+
+  @NotNull
+  private SpillableStateStore store;
+  @NotNull
+  private byte[] identifier;
+  private long bucket;
+  @NotNull
+  private Serde<K, Slice> serdeKey;
+  @NotNull
+  private Serde<V, Slice> serdeValue;
+
+  private int size = 0;
+
+  private SpillableByteMapImpl()
+  {
+    //for kryo
+  }
+
+  /**
+   * Creates a {@link SpillableByteMapImpl}.
+   * @param store The {@link SpillableStateStore} in which to spill to.
+   * @param identifier The Id of this {@link SpillableByteMapImpl}.
+   * @param bucket The Id of the bucket used to store this
+   * {@link SpillableByteMapImpl} in the provided {@link SpillableStateStore}.
+   * @param serdeKey The {@link Serde} to use when serializing and deserializing keys.
+   * @param serdeValue The {@link Serde} to use when serializing and deserializing values.
+   */
+  public SpillableByteMapImpl(SpillableStateStore store, byte[] identifier, long bucket, Serde<K, Slice> serdeKey,
+      Serde<V, Slice> serdeValue)
+  {
+    this.store = Preconditions.checkNotNull(store);
+    this.identifier = Preconditions.checkNotNull(identifier);
+    this.bucket = bucket;
+    this.serdeKey = Preconditions.checkNotNull(serdeKey);
+    this.serdeValue = Preconditions.checkNotNull(serdeValue);
+  }
+
+  public SpillableStateStore getStore()
+  {
+    return this.store;
+  }
+
+  @Override
+  public int size()
+  {
+    return size;
+  }
+
+  @Override
+  public boolean isEmpty()
+  {
+    return size == 0;
+  }
+
+  @Override
+  public boolean containsKey(Object o)
+  {
+    return get(o) != null;
+  }
+
+  @Override
+  public boolean containsValue(Object o)
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  /**
+   * Returns the value for a key, consulting the window cache before the backing store.
+   * @param o The key to look up; it is cast to the key type without a check.
+   * @return The value for the key, or null when the key was removed in the current window or
+   * the store has no live value for it.
+   */
+  @Override
+  public V get(Object o)
+  {
+    K key = (K)o;
+
+    // A key removed in this window is gone even if the store still holds its old value.
+    if (cache.getRemovedKeys().contains(key)) {
+      return null;
+    }
+
+    V val = cache.get(key);
+
+    if (val != null) {
+      return val;
+    }
+
+    // Cache miss: read from the store under this map's identifier followed by the serialized key.
+    Slice valSlice = store.getSync(bucket, SliceUtils.concatenate(identifier, serdeKey.serialize(key)));
+
+    // An empty slice is what endWindow() writes for removed keys, so it counts as absent,
+    // as do a missing entry and the BucketedState.EXPIRED marker.
+    if (valSlice == null || valSlice == BucketedState.EXPIRED || valSlice.length == 0) {
+      return null;
+    }
+
+    // Reset the shared offset holder before handing it to the deserializer.
+    tempOffset.setValue(0);
+    return serdeValue.deserialize(valSlice, tempOffset);
+  }
+
+  /**
+   * Associates a value with a key in the window cache.
+   * @param k The key.
+   * @param v The value to store.
+   * @return The value previously visible for the key, or null if there was none.
+   */
+  @Override
+  public V put(K k, V v)
+  {
+    final V previous = get(k);
+
+    // Only a key with no visible value grows the map.
+    if (previous == null) {
+      size++;
+    }
+
+    cache.put(k, v);
+    return previous;
+  }
+
+  /**
+   * Removes a key from the map by marking it as removed in the window cache.
+   * @param o The key to remove; it is cast to the key type without a check.
+   * @return The value previously visible for the key, or null if there was none.
+   */
+  @Override
+  public V remove(Object o)
+  {
+    final V previous = get(o);
+
+    // Only a key with a visible value shrinks the map.
+    if (previous != null) {
+      size--;
+    }
+
+    cache.remove((K)o);
+    return previous;
+  }
+
+  @Override
+  public void putAll(Map<? extends K, ? extends V> map)
+  {
+    for (Map.Entry<? extends K, ? extends V> entry : map.entrySet()) {
+      put(entry.getKey(), entry.getValue());
+    }
+  }
+
+  @Override
+  public void clear()
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Set<K> keySet()
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Collection<V> values()
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Set<Entry<K, V>> entrySet()
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void setup(Context.OperatorContext context)
+  {
+  }
+
+  @Override
+  public void beginWindow(long windowId)
+  {
+  }
+
+  /**
+   * Ends the current window by writing every key changed or removed during it to the store
+   * and then ending the window on the cache.
+   */
+  @Override
+  public void endWindow()
+  {
+    for (K changedKey : cache.getChangedKeys()) {
+      Slice storeKey = SliceUtils.concatenate(identifier, serdeKey.serialize(changedKey));
+      Slice storeValue = serdeValue.serialize(cache.get(changedKey));
+      store.put(this.bucket, storeKey, storeValue);
+    }
+
+    for (K removedKey : cache.getRemovedKeys()) {
+      Slice storeKey = SliceUtils.concatenate(identifier, serdeKey.serialize(removedKey));
+      // A removed key is written as an empty slice, which get() treats as absent.
+      store.put(this.bucket, storeKey, new Slice(ArrayUtils.EMPTY_BYTE_ARRAY));
+    }
+
+    cache.endWindow();
+  }
+
+  @Override
+  public void teardown()
+  {
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9b6e11d8/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponent.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponent.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponent.java
index 29da3f5..c63c7ef 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponent.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponent.java
@@ -23,6 +23,7 @@ import org.apache.apex.malhar.lib.utils.serde.Serde;
 
 import com.datatorrent.api.Component;
 import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.Operator;
 import com.datatorrent.netlet.util.Slice;
 
 /**
@@ -31,7 +32,8 @@ import com.datatorrent.netlet.util.Slice;
  *
  * @since 3.4.0
  */
-public interface SpillableComplexComponent extends Component<OperatorContext>, SpillableComponent
+public interface SpillableComplexComponent extends Component<OperatorContext>, SpillableComponent,
+    Operator.CheckpointNotificationListener
 {
   /**
    * This is a method for creating a {@link SpillableArrayList}. This method

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9b6e11d8/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponentImpl.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponentImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponentImpl.java
new file mode 100644
index 0000000..b31adfd
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponentImpl.java
@@ -0,0 +1,193 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.state.spillable;
+
+import java.util.List;
+
+import javax.validation.constraints.NotNull;
+
+import org.apache.apex.malhar.lib.utils.serde.Serde;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * This is a factory that is used for Spillable datastructures. This component is used by nesting it inside of an
+ * operator and forwarding the appropriate operator callbacks to the {@link SpillableComplexComponentImpl}.
+ * Spillable datastructures are created by calling the appropriate factory methods on the
+ * {@link SpillableComplexComponentImpl} in the setup method of an operator.
+ */
+@InterfaceStability.Evolving
+public class SpillableComplexComponentImpl implements SpillableComplexComponent
+{
+  private List<SpillableComponent> componentList = Lists.newArrayList();
+
+  @NotNull
+  private SpillableStateStore store;
+
+  @NotNull
+  private SpillableIdentifierGenerator identifierGenerator;
+
+  private SpillableComplexComponentImpl()
+  {
+    // for kryo
+  }
+
+  public SpillableComplexComponentImpl(SpillableStateStore store)
+  {
+    this(store, new SequentialSpillableIdentifierGenerator());
+  }
+
+  public SpillableComplexComponentImpl(SpillableStateStore store, SpillableIdentifierGenerator identifierGenerator)
+  {
+    this.store = Preconditions.checkNotNull(store);
+    this.identifierGenerator = Preconditions.checkNotNull(identifierGenerator);
+  }
+
+  public <T> SpillableArrayList<T> newSpillableArrayList(long bucket, Serde<T, Slice> serde)
+  {
+    SpillableArrayListImpl<T> list = new SpillableArrayListImpl<T>(bucket, identifierGenerator.next(), store, serde);
+    componentList.add(list);
+    return list;
+  }
+
+  public <T> SpillableArrayList<T> newSpillableArrayList(byte[] identifier, long bucket, Serde<T, Slice> serde)
+  {
+    identifierGenerator.register(identifier);
+    SpillableArrayListImpl<T> list = new SpillableArrayListImpl<T>(bucket, identifier, store, serde);
+    componentList.add(list);
+    return list;
+  }
+
+  public <K, V> SpillableByteMap<K, V> newSpillableByteMap(long bucket, Serde<K, Slice> serdeKey,
+      Serde<V, Slice> serdeValue)
+  {
+    SpillableByteMapImpl<K, V> map = new SpillableByteMapImpl<K, V>(store, identifierGenerator.next(),
+        bucket, serdeKey, serdeValue);
+    componentList.add(map);
+    return map;
+  }
+
+  public <K, V> SpillableByteMap<K, V> newSpillableByteMap(byte[] identifier, long bucket, Serde<K, Slice> serdeKey,
+      Serde<V, Slice> serdeValue)
+  {
+    identifierGenerator.register(identifier);
+    SpillableByteMapImpl<K, V> map = new SpillableByteMapImpl<K, V>(store, identifier, bucket, serdeKey, serdeValue);
+    componentList.add(map);
+    return map;
+  }
+
+  public <K, V> SpillableByteArrayListMultimap<K, V> newSpillableByteArrayListMultimap(long bucket, Serde<K,
+      Slice> serdeKey, Serde<V, Slice> serdeValue)
+  {
+    SpillableByteArrayListMultimapImpl<K, V> map = new SpillableByteArrayListMultimapImpl<K, V>(store,
+        identifierGenerator.next(), bucket, serdeKey, serdeValue);
+    componentList.add(map);
+    return map;
+  }
+
+  public <K, V> SpillableByteArrayListMultimap<K, V> newSpillableByteArrayListMultimap(byte[] identifier, long bucket,
+      Serde<K, Slice> serdeKey,
+      Serde<V, Slice> serdeValue)
+  {
+    identifierGenerator.register(identifier);
+    SpillableByteArrayListMultimapImpl<K, V> map = new SpillableByteArrayListMultimapImpl<K, V>(store,
+        identifier, bucket, serdeKey, serdeValue);
+    componentList.add(map);
+    return map;
+  }
+
+  public <T> SpillableByteMultiset<T> newSpillableByteMultiset(long bucket, Serde<T, Slice> serde)
+  {
+    throw new UnsupportedOperationException("Unsupported Operation");
+  }
+
+  public <T> SpillableByteMultiset<T> newSpillableByteMultiset(byte[] identifier, long bucket, Serde<T, Slice> serde)
+  {
+    throw new UnsupportedOperationException("Unsupported Operation");
+  }
+
+  public <T> SpillableQueue<T> newSpillableQueue(long bucket, Serde<T, Slice> serde)
+  {
+    throw new UnsupportedOperationException("Unsupported Operation");
+  }
+
+  public <T> SpillableQueue<T> newSpillableQueue(byte[] identifier, long bucket, Serde<T, Slice> serde)
+  {
+    throw new UnsupportedOperationException("Unsupported Operation");
+  }
+
+  @Override
+  public void setup(Context.OperatorContext context)
+  {
+    store.setup(context);
+    for (SpillableComponent spillableComponent: componentList) {
+      spillableComponent.setup(context);
+    }
+  }
+
+  @Override
+  public void beginWindow(long windowId)
+  {
+    store.beginWindow(windowId);
+    for (SpillableComponent spillableComponent: componentList) {
+      spillableComponent.beginWindow(windowId);
+    }
+  }
+
+  @Override
+  public void endWindow()
+  {
+    for (SpillableComponent spillableComponent: componentList) {
+      spillableComponent.endWindow();
+    }
+    store.endWindow();
+  }
+
+  @Override
+  public void teardown()
+  {
+    for (SpillableComponent spillableComponent: componentList) {
+      spillableComponent.teardown();
+    }
+    store.teardown();
+  }
+
+  @Override
+  public void beforeCheckpoint(long l)
+  {
+    store.beforeCheckpoint(l);
+  }
+
+  @Override
+  public void checkpointed(long l)
+  {
+    store.checkpointed(l);
+  }
+
+  @Override
+  public void committed(long l)
+  {
+    store.committed(l);
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9b6e11d8/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableIdentifierGenerator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableIdentifierGenerator.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableIdentifierGenerator.java
new file mode 100644
index 0000000..17a52f0
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableIdentifierGenerator.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.state.spillable;
+
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Classes implementing this interface can be used as generators for identifiers for Spillable data structures. This is
+ * mainly used in implementations of {@link SpillableComplexComponent}.
+ */
+@InterfaceStability.Evolving
+public interface SpillableIdentifierGenerator
+{
+  /**
+   * Generates the next valid identifier for a Spillable data structure.
+   * @return A byte array which represents the next valid identifier for a Spillable data structure.
+   */
+  byte[] next();
+
+  /**
+   * Registers the given identifier with this {@link SpillableIdentifierGenerator}.
+   * @param identifier The identifier to register with this {@link SpillableIdentifierGenerator}.
+   */
+  void register(byte[] identifier);
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9b6e11d8/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableStateStore.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableStateStore.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableStateStore.java
new file mode 100644
index 0000000..1db0eeb
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableStateStore.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.state.spillable;
+
+import org.apache.apex.malhar.lib.state.BucketedState;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.datatorrent.api.Component;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.Operator;
+
+/**
+ * Implementations of this interface are used by Spillable datastructures to spill data to disk.
+ */
+@InterfaceStability.Evolving
+public interface SpillableStateStore extends BucketedState, Component<Context.OperatorContext>,
+    Operator.CheckpointNotificationListener, WindowListener
+{
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9b6e11d8/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/TimeBasedPriorityQueue.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/TimeBasedPriorityQueue.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/TimeBasedPriorityQueue.java
new file mode 100644
index 0000000..025c501
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/TimeBasedPriorityQueue.java
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.state.spillable;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+/**
+ * A simple priority queue where the priority of an object is determined by the time at which it is inserted into the
+ * queue. The object in the queue with the smallest time stamp is the first to be dequeued.
+ * @param <T> The type of the objects inserted into the queue.
+ */
+@InterfaceStability.Evolving
+public class TimeBasedPriorityQueue<T>
+{
+  private Map<T, TimeWrapper<T>> timeWrappperMap = Maps.newHashMap();
+  private Set<TimeWrapper<T>> sortedTimestamp = Sets.newTreeSet();
+
+  public void upSert(T value)
+  {
+    TimeWrapper<T> timeWrapper = timeWrappperMap.get(value);
+
+    if (timeWrapper != null) {
+      sortedTimestamp.remove(timeWrapper);
+      timeWrapper.setTimestamp(System.currentTimeMillis());
+    } else {
+      timeWrapper = new TimeWrapper<>(value, System.currentTimeMillis());
+      timeWrappperMap.put(value, timeWrapper);
+    }
+
+    sortedTimestamp.add(timeWrapper);
+  }
+
+  public void remove(T value)
+  {
+    TimeWrapper<T> timeWrapper = timeWrappperMap.get(value);
+    sortedTimestamp.remove(timeWrapper);
+    timeWrappperMap.remove(value);
+  }
+
+  public Set<T> removeLRU(int count)
+  {
+    Preconditions.checkArgument(count > 0 && count <= timeWrappperMap.size());
+
+    Iterator<TimeWrapper<T>> iterator = sortedTimestamp.iterator();
+    Set<T> valueSet = Sets.newHashSet();
+
+    for (int counter = 0; counter < count; counter++) {
+      T value = iterator.next().getKey();
+      valueSet.add(value);
+      timeWrappperMap.remove(value);
+      iterator.remove();
+    }
+
+    return valueSet;
+  }
+
+  protected static class TimeWrapper<T> implements Comparable<TimeWrapper<T>>
+  {
+    private T key;
+    private long timestamp;
+
+    public TimeWrapper(T key, long timestamp)
+    {
+      this.key = Preconditions.checkNotNull(key);
+      this.timestamp = timestamp;
+    }
+
+    public T getKey()
+    {
+      return key;
+    }
+
+    public long getTimestamp()
+    {
+      return timestamp;
+    }
+
+    public void setTimestamp(long timestamp)
+    {
+      this.timestamp = timestamp;
+    }
+
+    @Override
+    public int compareTo(TimeWrapper<T> timeWrapper)
+    {
+      if (this.timestamp < timeWrapper.getTimestamp()) {
+        return -1;
+      } else if (this.timestamp > timeWrapper.getTimestamp()) {
+        return 1;
+      }
+
+      return 0;
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+
+      TimeWrapper<?> that = (TimeWrapper<?>)o;
+
+      return key.equals(that.key);
+    }
+
+    @Override
+    public int hashCode()
+    {
+      return key.hashCode();
+    }
+
+    @Override
+    public String toString()
+    {
+      return "TimeWrapper{" +
+          "key=" + key +
+          ", timestamp=" + timestamp +
+          '}';
+    }
+  }
+
+  private static final Logger LOG = LoggerFactory.getLogger(TimeBasedPriorityQueue.class);
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9b6e11d8/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/WindowBoundedMapCache.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/WindowBoundedMapCache.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/WindowBoundedMapCache.java
new file mode 100644
index 0000000..fcf219d
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/WindowBoundedMapCache.java
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.state.spillable;
+
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+/**
+ * This is an LRU cache with a maximum size. When the cache size is exceeded, the excess elements are kept in the
+ * cache until the end of the window. When the end of the window is reached, the least recently put entries are
+ * evicted from the cache (recency is refreshed by {@code put} only; {@code get} does not refresh it).
+ * @param <K> The type of the keys.
+ * @param <V> The type of the values.
+ */
+@InterfaceStability.Evolving
+public class WindowBoundedMapCache<K, V>
+{
+  public static final int DEFAULT_MAX_SIZE = 50000;
+
+  private int maxSize = DEFAULT_MAX_SIZE;
+
+  private Map<K, V> cache = Maps.newHashMap();
+
+  private Set<K> changedKeys = Sets.newHashSet();
+  private Set<K> removedKeys = Sets.newHashSet();
+  private TimeBasedPriorityQueue<K> priorityQueue = new TimeBasedPriorityQueue<>();
+
+  public WindowBoundedMapCache()
+  {
+  }
+
+  public WindowBoundedMapCache(int maxSize)
+  {
+    Preconditions.checkArgument(maxSize > 0);
+
+    this.maxSize = maxSize;
+  }
+
+  public void put(K key, V value)
+  {
+    Preconditions.checkNotNull(key);
+    Preconditions.checkNotNull(value);
+
+    removedKeys.remove(key);
+    changedKeys.add(key);
+    priorityQueue.upSert(key);
+
+    cache.put(key, value);
+  }
+
+  public V get(K key)
+  {
+    Preconditions.checkNotNull(key);
+
+    return cache.get(key);
+  }
+
+  public boolean contains(K key)
+  {
+    return cache.containsKey(key);
+  }
+
+  public void remove(K key)
+  {
+    Preconditions.checkNotNull(key);
+
+    if (!cache.containsKey(key)) {
+      return;
+    }
+
+    cache.remove(key);
+    changedKeys.remove(key);
+    removedKeys.add(key);
+    priorityQueue.remove(key);
+  }
+
+  public Set<K> getChangedKeys()
+  {
+    return changedKeys;
+  }
+
+  public Set<K> getRemovedKeys()
+  {
+    return removedKeys;
+  }
+
+  /*
+    Note: beginWindow is intentionally not implemented because many users need a cache that does not require
+    beginWindow to be called.
+   */
+
+  public void endWindow()
+  {
+    int count = cache.size() - maxSize;
+
+    if (count > 0) {
+      Set<K> expiredKeys = priorityQueue.removeLRU(count);
+
+      for (K expiredKey: expiredKeys) {
+        cache.remove(expiredKey);
+      }
+    }
+
+    changedKeys = Sets.newHashSet();
+    removedKeys = Sets.newHashSet();
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9b6e11d8/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/WindowListener.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/WindowListener.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/WindowListener.java
new file mode 100644
index 0000000..fa8cd9f
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/WindowListener.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.state.spillable;
+
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.datatorrent.api.Operator;
+
+/**
+ * This interface represents components which need to listen to the operator {@link Operator#beginWindow(long)} and
+ * {@link Operator#endWindow()} callbacks.
+ */
+@InterfaceStability.Evolving
+public interface WindowListener
+{
+  /**
+   * This is called when the parent {@link Operator}'s {@link Operator#beginWindow(long)} callback is called.
+   * @param windowId The id of the current application window.
+   */
+  void beginWindow(long windowId);
+
+  /**
+   * This is called when the parent {@link Operator}'s {@link Operator#endWindow()} callback is called.
+   */
+  void endWindow();
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9b6e11d8/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/inmem/InMemMultiset.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/inmem/InMemMultiset.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/inmem/InMemMultiset.java
deleted file mode 100644
index fa7bf08..0000000
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/inmem/InMemMultiset.java
+++ /dev/null
@@ -1,161 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.lib.state.spillable.inmem;
-
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.Set;
-
-import javax.annotation.Nullable;
-
-import org.apache.apex.malhar.lib.state.spillable.Spillable;
-
-import com.esotericsoftware.kryo.serializers.FieldSerializer;
-import com.esotericsoftware.kryo.serializers.JavaSerializer;
-import com.google.common.collect.HashMultiset;
-
-/**
- * An in memory implementation of the {@link Spillable.SpillableByteMultiset} interface.
- * @param <T> The type of the data stored in the {@link InMemMultiset}
- */
-public class InMemMultiset<T> implements Spillable.SpillableByteMultiset<T>
-{
-  @FieldSerializer.Bind(JavaSerializer.class)
-  private HashMultiset<T> multiset = HashMultiset.create();
-
-  @Override
-  public int count(@Nullable Object element)
-  {
-    return multiset.count(element);
-  }
-
-  @Override
-  public int add(@Nullable T element, int occurrences)
-  {
-    return multiset.add(element, occurrences);
-  }
-
-  @Override
-  public int remove(@Nullable Object element, int occurrences)
-  {
-    return multiset.remove(element, occurrences);
-  }
-
-  @Override
-  public int setCount(T element, int count)
-  {
-    return multiset.setCount(element, count);
-  }
-
-  @Override
-  public boolean setCount(T element, int oldCount, int newCount)
-  {
-    return multiset.setCount(element, oldCount, newCount);
-  }
-
-  @Override
-  public Set<T> elementSet()
-  {
-    return multiset.elementSet();
-  }
-
-  @Override
-  public Set<Entry<T>> entrySet()
-  {
-    return multiset.entrySet();
-  }
-
-  @Override
-  public Iterator<T> iterator()
-  {
-    return multiset.iterator();
-  }
-
-  @Override
-  public Object[] toArray()
-  {
-    return multiset.toArray();
-  }
-
-  @Override
-  public <T1> T1[] toArray(T1[] t1s)
-  {
-    return multiset.toArray(t1s);
-  }
-
-  @Override
-  public int size()
-  {
-    return multiset.size();
-  }
-
-  @Override
-  public boolean isEmpty()
-  {
-    return multiset.isEmpty();
-  }
-
-  @Override
-  public boolean contains(@Nullable Object element)
-  {
-    return multiset.contains(element);
-  }
-
-  @Override
-  public boolean containsAll(Collection<?> es)
-  {
-    return multiset.containsAll(es);
-  }
-
-  @Override
-  public boolean addAll(Collection<? extends T> collection)
-  {
-    return multiset.addAll(collection);
-  }
-
-  @Override
-  public boolean add(T element)
-  {
-    return multiset.add(element);
-  }
-
-  @Override
-  public boolean remove(@Nullable Object element)
-  {
-    return multiset.remove(element);
-  }
-
-  @Override
-  public boolean removeAll(Collection<?> c)
-  {
-    return multiset.removeAll(c);
-  }
-
-  @Override
-  public boolean retainAll(Collection<?> c)
-  {
-    return multiset.retainAll(c);
-  }
-
-  @Override
-  public void clear()
-  {
-    multiset.clear();
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9b6e11d8/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/inmem/InMemSpillableArrayList.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/inmem/InMemSpillableArrayList.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/inmem/InMemSpillableArrayList.java
deleted file mode 100644
index 9742537..0000000
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/inmem/InMemSpillableArrayList.java
+++ /dev/null
@@ -1,175 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.lib.state.spillable.inmem;
-
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.List;
-import java.util.ListIterator;
-
-import org.apache.apex.malhar.lib.state.spillable.Spillable;
-
-import com.google.common.collect.Lists;
-
-/**
- * An in memory implementation of the {@link Spillable.SpillableArrayList} interface.
- * @param <T> The type of the data stored in the {@link InMemSpillableArrayList}
- */
-public class InMemSpillableArrayList<T> implements Spillable.SpillableArrayList<T>
-{
-  private List<T> list = Lists.newArrayList();
-
-  @Override
-  public int size()
-  {
-    return list.size();
-  }
-
-  @Override
-  public boolean isEmpty()
-  {
-    return list.isEmpty();
-  }
-
-  @Override
-  public boolean contains(Object o)
-  {
-    return list.contains(o);
-  }
-
-  @Override
-  public Iterator<T> iterator()
-  {
-    return list.iterator();
-  }
-
-  @Override
-  public Object[] toArray()
-  {
-    return list.toArray();
-  }
-
-  @Override
-  public <T1> T1[] toArray(T1[] t1s)
-  {
-    return list.toArray(t1s);
-  }
-
-  @Override
-  public boolean add(T t)
-  {
-    return list.add(t);
-  }
-
-  @Override
-  public boolean remove(Object o)
-  {
-    return list.remove(o);
-  }
-
-  @Override
-  public boolean containsAll(Collection<?> collection)
-  {
-    return list.containsAll(collection);
-  }
-
-  @Override
-  public boolean addAll(Collection<? extends T> collection)
-  {
-    return list.addAll(collection);
-  }
-
-  @Override
-  public boolean addAll(int i, Collection<? extends T> collection)
-  {
-    return list.addAll(i, collection);
-  }
-
-  @Override
-  public boolean removeAll(Collection<?> collection)
-  {
-    return list.removeAll(collection);
-  }
-
-  @Override
-  public boolean retainAll(Collection<?> collection)
-  {
-    return list.retainAll(collection);
-  }
-
-  @Override
-  public void clear()
-  {
-    list.clear();
-  }
-
-  @Override
-  public T get(int i)
-  {
-    return list.get(i);
-  }
-
-  @Override
-  public T set(int i, T t)
-  {
-    return list.set(i, t);
-  }
-
-  @Override
-  public void add(int i, T t)
-  {
-    list.add(i, t);
-  }
-
-  @Override
-  public T remove(int i)
-  {
-    return list.remove(i);
-  }
-
-  @Override
-  public int indexOf(Object o)
-  {
-    return list.indexOf(o);
-  }
-
-  @Override
-  public int lastIndexOf(Object o)
-  {
-    return list.lastIndexOf(o);
-  }
-
-  @Override
-  public ListIterator<T> listIterator()
-  {
-    return list.listIterator();
-  }
-
-  @Override
-  public ListIterator<T> listIterator(int i)
-  {
-    return list.listIterator(i);
-  }
-
-  @Override
-  public List<T> subList(int i, int i1)
-  {
-    return list.subList(i, i1);
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9b6e11d8/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/inmem/InMemSpillableByteArrayListMultimap.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/inmem/InMemSpillableByteArrayListMultimap.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/inmem/InMemSpillableByteArrayListMultimap.java
deleted file mode 100644
index 8376bd5..0000000
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/inmem/InMemSpillableByteArrayListMultimap.java
+++ /dev/null
@@ -1,154 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.lib.state.spillable.inmem;
-
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import javax.annotation.Nullable;
-
-import org.apache.apex.malhar.lib.state.spillable.Spillable;
-
-import com.esotericsoftware.kryo.serializers.FieldSerializer;
-import com.esotericsoftware.kryo.serializers.JavaSerializer;
-import com.google.common.collect.ArrayListMultimap;
-import com.google.common.collect.ListMultimap;
-import com.google.common.collect.Multimap;
-import com.google.common.collect.Multiset;
-
-/**
- * An in memory implementation of the {@link Spillable.SpillableByteArrayListMultimap} interface.
- * @param <K> The type of the keys stored in the {@link InMemSpillableByteArrayListMultimap}
- * @param <V> The type of the values stored in the {@link InMemSpillableByteArrayListMultimap}
- */
-public class InMemSpillableByteArrayListMultimap<K, V> implements Spillable.SpillableByteArrayListMultimap<K, V>
-{
-  @FieldSerializer.Bind(JavaSerializer.class)
-  private ListMultimap<K, V> multimap = ArrayListMultimap.create();
-
-  @Override
-  public List<V> get(@Nullable K key)
-  {
-    return multimap.get(key);
-  }
-
-  @Override
-  public Set<K> keySet()
-  {
-    return multimap.keySet();
-  }
-
-  @Override
-  public Multiset<K> keys()
-  {
-    return multimap.keys();
-  }
-
-  @Override
-  public Collection<V> values()
-  {
-    return multimap.values();
-  }
-
-  @Override
-  public Collection<Map.Entry<K, V>> entries()
-  {
-    return multimap.entries();
-  }
-
-  @Override
-  public List<V> removeAll(@Nullable Object key)
-  {
-    return multimap.removeAll(key);
-  }
-
-  @Override
-  public void clear()
-  {
-    multimap.clear();
-  }
-
-  @Override
-  public int size()
-  {
-    return multimap.size();
-  }
-
-  @Override
-  public boolean isEmpty()
-  {
-    return multimap.isEmpty();
-  }
-
-  @Override
-  public boolean containsKey(@Nullable Object key)
-  {
-    return multimap.containsKey(key);
-  }
-
-  @Override
-  public boolean containsValue(@Nullable Object value)
-  {
-    return multimap.containsValue(value);
-  }
-
-  @Override
-  public boolean containsEntry(@Nullable Object key, @Nullable Object value)
-  {
-    return multimap.containsEntry(key, value);
-  }
-
-  @Override
-  public boolean put(@Nullable K key, @Nullable V value)
-  {
-    return multimap.put(key, value);
-  }
-
-  @Override
-  public boolean remove(@Nullable Object key, @Nullable Object value)
-  {
-    return multimap.remove(key, value);
-  }
-
-  @Override
-  public boolean putAll(@Nullable K key, Iterable<? extends V> values)
-  {
-    return multimap.putAll(key, values);
-  }
-
-  @Override
-  public boolean putAll(Multimap<? extends K, ? extends V> m)
-  {
-    return multimap.putAll(m);
-  }
-
-  @Override
-  public List<V> replaceValues(K key, Iterable<? extends V> values)
-  {
-    return multimap.replaceValues(key, values);
-  }
-
-  @Override
-  public Map<K, Collection<V>> asMap()
-  {
-    return multimap.asMap();
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9b6e11d8/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/inmem/InMemSpillableComplexComponent.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/inmem/InMemSpillableComplexComponent.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/inmem/InMemSpillableComplexComponent.java
deleted file mode 100644
index 25e8b2c..0000000
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/inmem/InMemSpillableComplexComponent.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.lib.state.spillable.inmem;
-
-import org.apache.apex.malhar.lib.state.spillable.SpillableComplexComponent;
-import org.apache.apex.malhar.lib.utils.serde.Serde;
-
-import com.datatorrent.api.Context;
-import com.datatorrent.netlet.util.Slice;
-
-/**
- * An in memory implementation {@link SpillableComplexComponent}
- */
-public class InMemSpillableComplexComponent implements SpillableComplexComponent
-{
-  @Override
-  public <T> SpillableArrayList<T> newSpillableArrayList(long bucket, Serde<T, Slice> serde)
-  {
-    return new InMemSpillableArrayList<>();
-  }
-
-  @Override
-  public <T> SpillableArrayList<T> newSpillableArrayList(byte[] identifier, long bucket,
-      Serde<T, Slice> serde)
-  {
-    return new InMemSpillableArrayList<>();
-  }
-
-  @Override
-  public <K, V> SpillableByteMap<K, V> newSpillableByteMap(long bucket, Serde<K, Slice> serdeKey,
-      Serde<V, Slice> serdeValue)
-  {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public <K, V> SpillableByteMap<K, V> newSpillableByteMap(byte[] identifier, long bucket,
-      Serde<K, Slice> serdeKey, Serde<V, Slice> serdeValue)
-  {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public <K, V> SpillableByteArrayListMultimap<K, V> newSpillableByteArrayListMultimap(long bucket,
-      Serde<K, Slice> serdeKey, Serde<V, Slice> serdeValue)
-  {
-    return new InMemSpillableByteArrayListMultimap<>();
-  }
-
-  @Override
-  public <K, V> SpillableByteArrayListMultimap<K, V> newSpillableByteArrayListMultimap(byte[] identifier,
-      long bucket, Serde<K, Slice> serdeKey, Serde<V, Slice> serdeValue)
-  {
-    return  new InMemSpillableByteArrayListMultimap<>();
-  }
-
-  @Override
-  public <T> SpillableByteMultiset<T> newSpillableByteMultiset(long bucket, Serde<T, Slice> serde)
-  {
-    return new InMemMultiset<>();
-  }
-
-  @Override
-  public <T> SpillableByteMultiset<T> newSpillableByteMultiset(byte[] identifier, long bucket,
-      Serde<T, Slice> serde)
-  {
-    return new InMemMultiset<>();
-  }
-
-  @Override
-  public <T> SpillableQueue<T> newSpillableQueue(long bucket, Serde<T, Slice> serde)
-  {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public <T> SpillableQueue<T> newSpillableQueue(byte[] identifier, long bucket, Serde<T, Slice> serde)
-  {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public void setup(Context.OperatorContext context)
-  {
-  }
-
-  @Override
-  public void beginWindow(long windowId)
-  {
-  }
-
-  @Override
-  public void endWindow()
-  {
-  }
-
-  @Override
-  public void teardown()
-  {
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9b6e11d8/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/inmem/InMemSpillableStateStore.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/inmem/InMemSpillableStateStore.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/inmem/InMemSpillableStateStore.java
new file mode 100644
index 0000000..0e65344
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/inmem/InMemSpillableStateStore.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.state.spillable.inmem;
+
+import java.util.Map;
+import java.util.concurrent.Future;
+
+import javax.validation.constraints.NotNull;
+
+import org.apache.apex.malhar.lib.state.spillable.SpillableStateStore;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.google.common.collect.Maps;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * A simple in memory implementation of a {@link SpillableStateStore} backed by a {@link Map} of per-bucket maps.
+ */
+@InterfaceStability.Evolving
+public class InMemSpillableStateStore implements SpillableStateStore
+{
+  private Map<Long, Map<Slice, Slice>> store = Maps.newHashMap(); // bucket id -> (key -> value)
+
+  @Override
+  public void setup(Context.OperatorContext context)
+  {
+    // No-op: the backing map is created together with the instance.
+  }
+
+  @Override
+  public void beginWindow(long windowId)
+  {
+    // No-op: this store keeps no per-window bookkeeping.
+  }
+
+  @Override
+  public void endWindow()
+  {
+    // No-op: this store keeps no per-window bookkeeping.
+  }
+
+  @Override
+  public void teardown()
+  {
+    // No-op: nothing is released here.
+  }
+
+  @Override
+  public void put(long bucketId, @NotNull Slice key, @NotNull Slice value)
+  {
+    Map<Slice, Slice> bucket = store.get(bucketId);
+
+    if (bucket == null) {
+      bucket = Maps.newHashMap();
+      store.put(bucketId, bucket);
+    }
+    // Replaces any value previously stored under the same key in this bucket.
+    bucket.put(key, value);
+  }
+
+  @Override
+  public Slice getSync(long bucketId, @NotNull Slice key)
+  {
+    Map<Slice, Slice> bucket = store.get(bucketId);
+
+    if (bucket == null) {
+      // A lookup must not create state: an unknown bucket simply holds no value for the key.
+      return null;
+    }
+
+    return bucket.get(key);
+  }
+
+  @Override
+  public Future<Slice> getAsync(long bucketId, @NotNull Slice key)
+  {
+    throw new UnsupportedOperationException(); // only synchronous reads are implemented
+  }
+
+  @Override
+  public void beforeCheckpoint(long l)
+  {
+  }
+
+  @Override
+  public void checkpointed(long l)
+  {
+  }
+
+  @Override
+  public void committed(long l)
+  {
+  }
+
+  @Override
+  public String toString()
+  {
+    return store.toString(); // renders every bucket together with its entries
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9b6e11d8/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/managed/ManagedStateSpillableStateStore.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/managed/ManagedStateSpillableStateStore.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/managed/ManagedStateSpillableStateStore.java
new file mode 100644
index 0000000..6d68acc
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/managed/ManagedStateSpillableStateStore.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.state.spillable.managed;
+
+import org.apache.apex.malhar.lib.state.managed.ManagedStateImpl;
+import org.apache.apex.malhar.lib.state.spillable.SpillableStateStore;
+
+import com.esotericsoftware.kryo.DefaultSerializer;
+import com.esotericsoftware.kryo.serializers.FieldSerializer;
+
+/**
+ * A {@link SpillableStateStore} that inherits all of its behavior from {@link ManagedStateImpl}. The class declares
+ * no members of its own; the compiler-generated public no-arg constructor just invokes the superclass one.
+ */
+@DefaultSerializer(FieldSerializer.class)
+public class ManagedStateSpillableStateStore extends ManagedStateImpl implements SpillableStateStore
+{
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9b6e11d8/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/PassThruByteArraySerde.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/PassThruByteArraySerde.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/PassThruByteArraySerde.java
index 85c34d9..9669981 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/PassThruByteArraySerde.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/PassThruByteArraySerde.java
@@ -19,6 +19,7 @@
 package org.apache.apex.malhar.lib.utils.serde;
 
 import org.apache.commons.lang3.mutable.MutableInt;
+import org.apache.hadoop.classification.InterfaceStability;
 
 /**
  * This is a simple pass through {@link Serde}. When serialization is performed the input byte array is returned.
@@ -26,6 +27,7 @@ import org.apache.commons.lang3.mutable.MutableInt;
  *
  * @since 3.4.0
  */
+@InterfaceStability.Evolving
 public class PassThruByteArraySerde implements Serde<byte[], byte[]>
 {
   @Override

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9b6e11d8/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/PassThruByteArraySliceSerde.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/PassThruByteArraySliceSerde.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/PassThruByteArraySliceSerde.java
new file mode 100644
index 0000000..436e7f8
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/PassThruByteArraySliceSerde.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.utils.serde;
+
+import org.apache.commons.lang3.mutable.MutableInt;
+
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * This is a simple {@link Serde} which serializes and deserializes byte arrays to {@link Slice}s. A byte array is
+ * serialized by simply wrapping it in a {@link Slice} object and deserialized by simply reading the byte array
+ * out of the {@link Slice} object.
+ *
+ * <b>Note:</b> Deserialization always reads the whole {@link Slice}; the offset argument is only advanced.
+ */
+public class PassThruByteArraySliceSerde implements Serde<byte[], Slice>
+{
+  @Override
+  public Slice serialize(byte[] object)
+  {
+    return new Slice(object);
+  }
+
+  @Override
+  public byte[] deserialize(Slice object, MutableInt offset)
+  {
+    offset.add(object.length);
+    // Return the backing array only if the slice spans all of it; otherwise copy just the slice's bytes.
+    if (object.offset == 0 && object.length == object.buffer.length) {
+      return object.buffer; // shared with the Slice, not a copy
+    }
+
+    byte[] bytes = new byte[object.length];
+    System.arraycopy(object.buffer, object.offset, bytes, 0, object.length);
+    return bytes;
+  }
+
+  @Override
+  public byte[] deserialize(Slice object)
+  {
+    return deserialize(object, new MutableInt(0));
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9b6e11d8/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/PassThruSliceSerde.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/PassThruSliceSerde.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/PassThruSliceSerde.java
new file mode 100644
index 0000000..f9d93b3
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/PassThruSliceSerde.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.utils.serde;
+
+import org.apache.commons.lang3.mutable.MutableInt;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * This is a {@link Serde} implementation which simply allows an input slice to pass through. No serialization or
+ * deserialization transformation is performed on the input {@link Slice}s; each method returns its argument.
+ */
+@InterfaceStability.Evolving
+public class PassThruSliceSerde implements Serde<Slice, Slice>
+{
+  @Override
+  public Slice serialize(Slice object)
+  {
+    return object; // the very same instance, not a copy
+  }
+
+  @Override
+  public Slice deserialize(Slice object, MutableInt offset)
+  {
+    return object; // NOTE(review): offset is neither read nor advanced here - confirm callers do not rely on it
+  }
+
+  @Override
+  public Slice deserialize(Slice object)
+  {
+    return object; // the very same instance, not a copy
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9b6e11d8/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeIntSlice.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeIntSlice.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeIntSlice.java
new file mode 100644
index 0000000..c18af33
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeIntSlice.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.utils.serde;
+
+import org.apache.commons.lang3.mutable.MutableInt;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.datatorrent.lib.appdata.gpo.GPOUtils;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * A {@link Serde} for {@link Integer}s; offsets passed to deserialize are relative to the start of the slice.
+ */
+@InterfaceStability.Evolving
+public class SerdeIntSlice implements Serde<Integer, Slice>
+{
+  @Override
+  public Slice serialize(Integer object)
+  {
+    return new Slice(GPOUtils.serializeInt(object));
+  }
+
+  @Override
+  public Integer deserialize(Slice slice, MutableInt offset)
+  {
+    int value = GPOUtils.deserializeInt(slice.buffer, new MutableInt(slice.offset + offset.intValue()));
+    offset.add(Integer.SIZE / Byte.SIZE); // an int occupies four bytes
+    return value;
+  }
+
+  @Override
+  public Integer deserialize(Slice object)
+  {
+    return deserialize(object, new MutableInt(0));
+  }
+}