You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2016/10/24 20:38:56 UTC

[1/3] apex-malhar git commit: APEXMALHAR-2190 #resolve #comment Use reusable buffer for serialization in spillable data structures

Repository: apex-malhar
Updated Branches:
  refs/heads/master 37991576d -> 2fa1e6b16


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2fa1e6b1/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/WindowedBlockStream.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/WindowedBlockStream.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/WindowedBlockStream.java
new file mode 100644
index 0000000..fa4cd73
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/WindowedBlockStream.java
@@ -0,0 +1,249 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.utils.serde;
+
+import java.util.Iterator;
+import java.util.Set;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.lib.state.spillable.WindowListener;
+
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.SetMultimap;
+import com.google.common.collect.Sets;
+
+/**
+ * This is a stream which manages blocks and supports window related operations.
+ *
+ */
+public class WindowedBlockStream extends BlockStream implements WindowListener, WindowCompleteListener
+{
+  private static final Logger logger = LoggerFactory.getLogger(WindowedBlockStream.class);
+  /**
+   * Map from windowId to blockIds
+   */
+  protected SetMultimap<Long, Integer> windowToBlockIds = HashMultimap.create();
+
+  /**
+   * set of all free blockIds.
+   */
+  protected Set<Integer> freeBlockIds = Sets.newHashSet();
+
+  // max block index; must be >= 0
+  protected int maxBlockIndex = 0;
+
+  protected long currentWindowId;
+
+  /**
+   * This lock is used for adding/removing block(s)
+   */
+  protected transient ReadWriteLock lock = new ReentrantReadWriteLock();
+
+  protected BlockReleaseStrategy releaseStrategy = new DefaultBlockReleaseStrategy();
+
+  public WindowedBlockStream()
+  {
+    super(); //delegates to the no-argument BlockStream constructor
+  }
+
+  public WindowedBlockStream(int blockCapacity)
+  {
+    super(blockCapacity); //blockCapacity is passed through to BlockStream unchanged
+  }
+
+  @Override
+  public void beginWindow(long windowId)
+  {
+    currentWindowId = windowId; //must be set first: moveToNextWindow() registers blocks under this id
+    moveToNextWindow(); //throws a RuntimeException if the current block is not clear
+  }
+
+  /**
+   * Makes sure different windows do not share any blocks: moves to the next block if the
+   * current block already holds data. Throws a RuntimeException if the current block is not clear.
+   */
+  protected void moveToNextWindow()
+  {
+    //use the current block if it hasn't been used; otherwise move to the next block
+    Block block = getOrCreateCurrentBlock();
+    if (!block.isClear()) {
+      throw new RuntimeException("Current block not clear, should NOT move to next window. Please call toSlice() to output data first");
+    }
+    if (block.size() > 0) {
+      moveToNextBlock();
+    }
+    windowToBlockIds.put(currentWindowId, currentBlockIndex); //registers the (possibly new) current block for this window
+  }
+
+  /**
+   * Switches to another block: reuses a free block if there is one, otherwise
+   * moves on to a new block index. The new current block is recorded for the current window.
+   *
+   * @return the block that was current before the switch
+   */
+  @Override
+  protected Block moveToNextBlock()
+  {
+    lock.writeLock().lock();
+    try {
+      Block previousBlock = currentBlock;
+      if (!freeBlockIds.isEmpty()) {
+        currentBlockIndex = freeBlockIds.iterator().next(); //any free id will do
+        freeBlockIds.remove(currentBlockIndex); //the block is in use again, so it is no longer free
+        currentBlock = this.blocks.get(currentBlockIndex);
+      } else {
+        currentBlockIndex = ++maxBlockIndex; //no free block: continue with a never-used index
+        currentBlock = getOrCreateCurrentBlock();
+      }
+      windowToBlockIds.put(currentWindowId, currentBlockIndex);
+      return previousBlock;
+    } finally {
+      lock.writeLock().unlock();
+    }
+  }
+
+  @Override
+  public void endWindow()
+  {
+    releaseMemory(); //lets the release strategy drop free blocks at the end of every window
+  }
+
+  @Override
+  public void completeWindow(long windowId)
+  {
+    lock.writeLock().lock();
+    try {
+      //work on a snapshot of the keys because resetWindow() modifies windowToBlockIds
+      for (long completedId : Sets.newHashSet(windowToBlockIds.keySet())) {
+        if (completedId <= windowId) {
+          resetWindow(completedId);
+        }
+      }
+    } finally {
+      lock.writeLock().unlock();
+    }
+  }
+
+  protected void resetWindow(long windowId)
+  {
+    lock.writeLock().lock();
+    try {
+      Set<Integer> removedBlockIds = windowToBlockIds.removeAll(windowId);
+      //accumulate in a long, like dataSizeOfWindow(), so a large window cannot overflow an int
+      long removedSize = 0;
+      for (int blockId : removedBlockIds) {
+        Block theBlock = blocks.get(blockId);
+        removedSize += theBlock.size(); //must be read before reset() below
+        theBlock.reset();
+        if (theBlock == currentBlock) {
+          //the client code could ask reset up to current window
+          //but the reset block should not be current block. current block should be reassigned.
+          moveToNextBlock();
+        }
+        logger.debug("reset block: {}, currentBlock: {}", blockId, theBlock);
+      }
+
+      freeBlockIds.addAll(removedBlockIds);
+      size -= removedSize;
+    } finally {
+      lock.writeLock().unlock();
+    }
+  }
+
+  @Override
+  public void reset()
+  {
+    lock.writeLock().lock();
+    try {
+      super.reset();
+      //NOTE(review): windowToBlockIds is not cleared here - confirm stale window mappings are harmless
+      //all blocks are free now except the current one
+      freeBlockIds.addAll(blocks.keySet());
+      freeBlockIds.remove(currentBlockIndex);
+    } finally {
+      lock.writeLock().unlock();
+    }
+  }
+
+  /**
+   * Computes the size of the data of all windows with id less than or equal to windowId.
+   * @param windowId the inclusive upper bound of the window ids to account for
+   * @return the summed size of the blocks registered for those windows
+   */
+  public long dataSizeUpToWindow(long windowId)
+  {
+    lock.readLock().lock();
+    try {
+      long totalSize = 0;
+      for (long winId : windowToBlockIds.keySet()) {
+        totalSize += (winId <= windowId) ? dataSizeOfWindow(winId) : 0; //skip windows newer than windowId
+      }
+      return totalSize;
+    } finally {
+      lock.readLock().unlock();
+    }
+  }
+
+  protected long dataSizeOfWindow(long windowId) //sums the sizes of the blocks registered for windowId
+  {
+    lock.readLock().lock();
+    try {
+      long sizeOfWindow = 0;
+      Set<Integer> blockIds = windowToBlockIds.get(windowId);
+      if (blockIds != null) { //defensive only: Guava multimaps document an empty collection, not null, for absent keys
+        for (int blockId : blockIds) {
+          sizeOfWindow += blocks.get(blockId).size();
+        }
+      }
+      return sizeOfWindow;
+    } finally {
+      lock.readLock().unlock();
+    }
+  }
+
+  public void releaseMemory()
+  {
+    /*
+     * tell the strategy how many blocks are free, then release at most as many as it asks for
+     */
+    releaseStrategy.currentFreeBlocks(freeBlockIds.size());
+    int releasingBlocks = Math.min(releaseStrategy.getNumBlocksToRelease(), freeBlockIds.size());
+    int releasedBlocks = 0;
+    Iterator<Integer> iter = freeBlockIds.iterator(); //NOTE(review): no lock is taken here, unlike the other mutators - confirm
+    while (releasedBlocks < releasingBlocks) {
+      //forget the free id and drop the block itself from blocks
+      int blockId = iter.next();
+      iter.remove();
+      blocks.remove(blockId);
+      releasedBlocks++;
+    }
+
+    /*
+     * report the number of released blocks, but only if any were released
+     */
+    if (releasedBlocks > 0) {
+      releaseStrategy.releasedBlocks(releasedBlocks);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2fa1e6b1/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableSessionWindowedStorage.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableSessionWindowedStorage.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableSessionWindowedStorage.java
index da44fb1..b88501e 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableSessionWindowedStorage.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableSessionWindowedStorage.java
@@ -34,7 +34,6 @@ import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.apache.hadoop.classification.InterfaceStability;
 
 import com.datatorrent.api.Context;
-import com.datatorrent.netlet.util.Slice;
 
 /**
  * Spillable session windowed storage.
@@ -53,7 +52,7 @@ public class SpillableSessionWindowedStorage<K, V> extends SpillableWindowedKeye
     if (keyToWindowsMap == null) {
       // NOTE: this will pose difficulties when we try to assign the entries to a time bucket later on.
       // This is logged in APEXMALHAR-2271
-      keyToWindowsMap = scc.newSpillableSetMultimap(bucket, keySerde, (Serde<Window.SessionWindow<K>, Slice>)(Serde)windowSerde);
+      keyToWindowsMap = scc.newSpillableSetMultimap(bucket, keySerde, (Serde<Window.SessionWindow<K>>)(Serde)windowSerde);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2fa1e6b1/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedKeyedStorage.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedKeyedStorage.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedKeyedStorage.java
index ac386ab..ef111b3 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedKeyedStorage.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedKeyedStorage.java
@@ -27,15 +27,14 @@ import javax.validation.constraints.NotNull;
 
 import org.apache.apex.malhar.lib.state.spillable.Spillable;
 import org.apache.apex.malhar.lib.state.spillable.SpillableComplexComponent;
+import org.apache.apex.malhar.lib.utils.serde.GenericSerde;
 import org.apache.apex.malhar.lib.utils.serde.Serde;
-import org.apache.apex.malhar.lib.utils.serde.SerdeKryoSlice;
 import org.apache.apex.malhar.lib.window.Window;
 import org.apache.apex.malhar.lib.window.WindowedStorage;
 import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.apache.commons.lang3.tuple.Pair;
 
 import com.datatorrent.api.Context;
-import com.datatorrent.netlet.util.Slice;
 
 /**
  * Implementation of WindowedKeyedStorage using {@link Spillable} data structures
@@ -48,10 +47,10 @@ public class SpillableWindowedKeyedStorage<K, V> implements WindowedStorage.Wind
   @NotNull
   protected SpillableComplexComponent scc;
   protected long bucket;
-  protected Serde<Window, Slice> windowSerde;
-  protected Serde<Pair<Window, K>, Slice> windowKeyPairSerde;
-  protected Serde<K, Slice> keySerde;
-  protected Serde<V, Slice> valueSerde;
+  protected Serde<Window> windowSerde;
+  protected Serde<Pair<Window, K>> windowKeyPairSerde;
+  protected Serde<K> keySerde;
+  protected Serde<V> valueSerde;
 
   protected Spillable.SpillableMap<Pair<Window, K>, V> windowKeyToValueMap;
   protected Spillable.SpillableSetMultimap<Window, K> windowToKeysMap;
@@ -96,7 +95,7 @@ public class SpillableWindowedKeyedStorage<K, V> implements WindowedStorage.Wind
   }
 
   public SpillableWindowedKeyedStorage(long bucket,
-      Serde<Window, Slice> windowSerde, Serde<Pair<Window, K>, Slice> windowKeyPairSerde, Serde<K, Slice> keySerde, Serde<V, Slice> valueSerde)
+      Serde<Window> windowSerde, Serde<Pair<Window, K>> windowKeyPairSerde, Serde<K> keySerde, Serde<V> valueSerde)
   {
     this.bucket = bucket;
     this.windowSerde = windowSerde;
@@ -120,17 +119,17 @@ public class SpillableWindowedKeyedStorage<K, V> implements WindowedStorage.Wind
     this.bucket = bucket;
   }
 
-  public void setWindowSerde(Serde<Window, Slice> windowSerde)
+  public void setWindowSerde(Serde<Window> windowSerde)
   {
     this.windowSerde = windowSerde;
   }
 
-  public void setWindowKeyPairSerde(Serde<Pair<Window, K>, Slice> windowKeyPairSerde)
+  public void setWindowKeyPairSerde(Serde<Pair<Window, K>> windowKeyPairSerde)
   {
     this.windowKeyPairSerde = windowKeyPairSerde;
   }
 
-  public void setValueSerde(Serde<V, Slice> valueSerde)
+  public void setValueSerde(Serde<V> valueSerde)
   {
     this.valueSerde = valueSerde;
   }
@@ -168,16 +167,16 @@ public class SpillableWindowedKeyedStorage<K, V> implements WindowedStorage.Wind
     }
     // set default serdes
     if (windowSerde == null) {
-      windowSerde = new SerdeKryoSlice<>();
+      windowSerde = new GenericSerde<>();
     }
     if (windowKeyPairSerde == null) {
-      windowKeyPairSerde = new SerdeKryoSlice<>();
+      windowKeyPairSerde = new GenericSerde<>();
     }
     if (keySerde == null) {
-      keySerde = new SerdeKryoSlice<>();
+      keySerde = new GenericSerde<>();
     }
     if (valueSerde == null) {
-      valueSerde = new SerdeKryoSlice<>();
+      valueSerde = new GenericSerde<>();
     }
 
     if (windowKeyToValueMap == null) {
@@ -220,5 +219,4 @@ public class SpillableWindowedKeyedStorage<K, V> implements WindowedStorage.Wind
   {
     return windowKeyToValueMap.get(new ImmutablePair<>(window, key));
   }
-
 }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2fa1e6b1/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedPlainStorage.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedPlainStorage.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedPlainStorage.java
index 6666381..9a8a291 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedPlainStorage.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedPlainStorage.java
@@ -24,13 +24,12 @@ import javax.validation.constraints.NotNull;
 
 import org.apache.apex.malhar.lib.state.spillable.Spillable;
 import org.apache.apex.malhar.lib.state.spillable.SpillableComplexComponent;
+import org.apache.apex.malhar.lib.utils.serde.GenericSerde;
 import org.apache.apex.malhar.lib.utils.serde.Serde;
-import org.apache.apex.malhar.lib.utils.serde.SerdeKryoSlice;
 import org.apache.apex.malhar.lib.window.Window;
 import org.apache.apex.malhar.lib.window.WindowedStorage;
 
 import com.datatorrent.api.Context;
-import com.datatorrent.netlet.util.Slice;
 
 /**
  * This is an implementation of WindowedPlainStorage that makes use of {@link Spillable} data structures
@@ -42,8 +41,8 @@ public class SpillableWindowedPlainStorage<T> implements WindowedStorage.Windowe
   @NotNull
   private SpillableComplexComponent scc;
   private long bucket;
-  private Serde<Window, Slice> windowSerde;
-  private Serde<T, Slice> valueSerde;
+  private Serde<Window> windowSerde;
+  private Serde<T> valueSerde;
 
   protected Spillable.SpillableMap<Window, T> windowToDataMap;
 
@@ -51,7 +50,7 @@ public class SpillableWindowedPlainStorage<T> implements WindowedStorage.Windowe
   {
   }
 
-  public SpillableWindowedPlainStorage(long bucket, Serde<Window, Slice> windowSerde, Serde<T, Slice> valueSerde)
+  public SpillableWindowedPlainStorage(long bucket, Serde<Window> windowSerde, Serde<T> valueSerde)
   {
     this.bucket = bucket;
     this.windowSerde = windowSerde;
@@ -73,12 +72,12 @@ public class SpillableWindowedPlainStorage<T> implements WindowedStorage.Windowe
     this.bucket = bucket;
   }
 
-  public void setWindowSerde(Serde<Window, Slice> windowSerde)
+  public void setWindowSerde(Serde<Window> windowSerde)
   {
     this.windowSerde = windowSerde;
   }
 
-  public void setValueSerde(Serde<T, Slice> valueSerde)
+  public void setValueSerde(Serde<T> valueSerde)
   {
     this.valueSerde = valueSerde;
   }
@@ -128,10 +127,10 @@ public class SpillableWindowedPlainStorage<T> implements WindowedStorage.Windowe
     }
     // set default serdes
     if (windowSerde == null) {
-      windowSerde = new SerdeKryoSlice<>();
+      windowSerde = new GenericSerde<>();
     }
     if (valueSerde == null) {
-      valueSerde = new SerdeKryoSlice<>();
+      valueSerde = new GenericSerde<>();
     }
     if (windowToDataMap == null) {
       windowToDataMap = scc.newSpillableMap(bucket, windowSerde, valueSerde);
@@ -142,5 +141,4 @@ public class SpillableWindowedPlainStorage<T> implements WindowedStorage.Windowe
   public void teardown()
   {
   }
-
 }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2fa1e6b1/library/src/test/java/com/datatorrent/lib/util/TestUtils.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/util/TestUtils.java b/library/src/test/java/com/datatorrent/lib/util/TestUtils.java
index 403072d..92937a9 100644
--- a/library/src/test/java/com/datatorrent/lib/util/TestUtils.java
+++ b/library/src/test/java/com/datatorrent/lib/util/TestUtils.java
@@ -24,6 +24,7 @@ import java.util.List;
 import org.junit.rules.TestWatcher;
 import org.junit.runner.Description;
 
+import org.apache.apex.malhar.lib.utils.serde.BufferSlice;
 import org.apache.commons.io.FileUtils;
 
 import com.google.common.base.Preconditions;
@@ -57,7 +58,7 @@ public class TestUtils
 
   public static Slice getSlice(int val)
   {
-    return new Slice(getBytes(val));
+    return new BufferSlice(getBytes(val));
   }
 
   public static class TestInfo extends TestWatcher

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2fa1e6b1/library/src/test/java/org/apache/apex/malhar/lib/state/managed/DefaultBucketTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/DefaultBucketTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/DefaultBucketTest.java
index 2058b69..6645a98 100644
--- a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/DefaultBucketTest.java
+++ b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/DefaultBucketTest.java
@@ -28,6 +28,12 @@ import org.junit.Test;
 import org.junit.rules.TestWatcher;
 import org.junit.runner.Description;
 
+import org.apache.apex.malhar.lib.state.managed.Bucket.ReadSource;
+import org.apache.apex.malhar.lib.utils.serde.AffixSerde;
+import org.apache.apex.malhar.lib.utils.serde.SerializationBuffer;
+import org.apache.apex.malhar.lib.utils.serde.StringSerde;
+import org.apache.apex.malhar.lib.utils.serde.WindowedBlockStream;
+
 import com.google.common.primitives.Longs;
 
 import com.datatorrent.lib.fileaccess.FileAccess;
@@ -82,6 +88,7 @@ public class DefaultBucketTest
     Assert.assertNull("value not present", value);
 
     Assert.assertEquals("size of bucket", one.length * 2 + Longs.BYTES, testMeta.defaultBucket.getSizeInBytes());
+
     testMeta.defaultBucket.teardown();
   }
 
@@ -126,7 +133,6 @@ public class DefaultBucketTest
     Slice one = ManagedStateTestUtils.getSliceFor("1");
     testPut();
     Map<Slice, Bucket.BucketedValue> unsaved = testMeta.defaultBucket.checkpoint(10);
-    Assert.assertEquals("size", 1, unsaved.size());
 
     Map.Entry<Slice, Bucket.BucketedValue> entry = unsaved.entrySet().iterator().next();
     Assert.assertEquals("key", one, entry.getKey());
@@ -192,15 +198,39 @@ public class DefaultBucketTest
     testGetFromReader();
     long initSize = testMeta.defaultBucket.getSizeInBytes();
 
-    Slice two = ManagedStateTestUtils.getSliceFor("2");
-    testMeta.defaultBucket.put(two, 101, two);
-
-    Assert.assertEquals("size", initSize + (two.length * 2 + Longs.BYTES ), testMeta.defaultBucket.getSizeInBytes());
+    //The temporary memory generated by get was not managed by bucket, only put was managed by bucket
+    SerializationBuffer buffer = new SerializationBuffer(testMeta.defaultBucket.getKeyStream());
+    byte[] keyPrefix = new byte[]{0};
+    String key = "1";
+    String value = "2";
+    AffixSerde<String> keySerde = new AffixSerde<>(keyPrefix, new StringSerde(), null);
+
+    StringSerde valueSerde = new StringSerde();
+
+    testMeta.defaultBucket.getKeyStream().beginWindow(1);
+    testMeta.defaultBucket.getValueStream().beginWindow(1);
+    keySerde.serialize(key, buffer);
+    Slice keySlice = buffer.toSlice();
+    valueSerde.serialize(value, buffer);
+    Slice valueSlice = buffer.toSlice();
+    testMeta.defaultBucket.put(keySlice, 1, valueSlice);
+    testMeta.defaultBucket.getKeyStream().endWindow();
+    testMeta.defaultBucket.getValueStream().endWindow();
+
+    long currentSize = testMeta.defaultBucket.getSizeInBytes();
+    testMeta.defaultBucket.freeMemory(Long.MAX_VALUE);
+    //call get() to trigger the actual release of the memory
+    testMeta.defaultBucket.get(keySlice, -1, ReadSource.MEMORY);
+    long sizeFreed = currentSize - testMeta.defaultBucket.getSizeInBytes();
+
+    SerializationBuffer tmpBuffer = new SerializationBuffer(new WindowedBlockStream());
+    tmpBuffer.writeBytes(keyPrefix);
+    tmpBuffer.writeString(key);
+    tmpBuffer.writeString(value);
+    int expectedFreedSize = tmpBuffer.toSlice().toByteArray().length; //key prefix, key length, key; value length, value
+    Assert.assertEquals("size freed", expectedFreedSize, sizeFreed);
+    Assert.assertEquals("existing size", currentSize - expectedFreedSize, testMeta.defaultBucket.getSizeInBytes());
 
-    long sizeFreed = testMeta.defaultBucket.freeMemory(Long.MAX_VALUE);
-    Assert.assertEquals("size freed", initSize, sizeFreed);
-    Assert.assertEquals("existing size", (two.length * 2 + Longs.BYTES), testMeta.defaultBucket.getSizeInBytes());
     testMeta.defaultBucket.teardown();
   }
-
 }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2fa1e6b1/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedStateTestUtils.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedStateTestUtils.java b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedStateTestUtils.java
index 0d3f87a..86f8430 100644
--- a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedStateTestUtils.java
+++ b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedStateTestUtils.java
@@ -28,6 +28,7 @@ import org.junit.Assert;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.apex.malhar.lib.utils.serde.BufferSlice;
 import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.RemoteIterator;
 
@@ -128,6 +129,6 @@ public class ManagedStateTestUtils
 
   public static Slice getSliceFor(String x)
   {
-    return new Slice(x.getBytes());
+    return new BufferSlice(x.getBytes());
   }
 }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2fa1e6b1/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListImplTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListImplTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListImplTest.java
index af05c88..5dd6404 100644
--- a/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListImplTest.java
+++ b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListImplTest.java
@@ -23,7 +23,7 @@ import org.junit.Rule;
 import org.junit.Test;
 
 import org.apache.apex.malhar.lib.state.spillable.inmem.InMemSpillableStateStore;
-import org.apache.apex.malhar.lib.utils.serde.SerdeStringSlice;
+import org.apache.apex.malhar.lib.utils.serde.StringSerde;
 
 import com.google.common.collect.Lists;
 
@@ -58,7 +58,7 @@ public class SpillableArrayListImplTest
   public void simpleAddGetAndSetTest1Helper(SpillableStateStore store)
   {
     SpillableArrayListImpl<String> list = new SpillableArrayListImpl<>(0L, ID1, store,
-        new SerdeStringSlice(), 1);
+        new StringSerde(), 1);
 
     store.setup(testMeta.operatorContext);
     list.setup(testMeta.operatorContext);
@@ -177,7 +177,7 @@ public class SpillableArrayListImplTest
   private void simpleAddGetAndSetTest3Helper(SpillableStateStore store)
   {
     SpillableArrayListImpl<String> list = new SpillableArrayListImpl<>(0L, ID1, store,
-        new SerdeStringSlice(), 3);
+        new StringSerde(), 3);
 
     store.setup(testMeta.operatorContext);
     list.setup(testMeta.operatorContext);
@@ -321,10 +321,10 @@ public class SpillableArrayListImplTest
   public void simpleMultiListTestHelper(SpillableStateStore store)
   {
     SpillableArrayListImpl<String> list1 = new SpillableArrayListImpl<>(0L, ID1, store,
-        new SerdeStringSlice(), 1);
+        new StringSerde(), 1);
 
     SpillableArrayListImpl<String> list2 = new SpillableArrayListImpl<>(0L, ID2, store,
-        new SerdeStringSlice(), 1);
+        new StringSerde(), 1);
 
     store.setup(testMeta.operatorContext);
     list1.setup(testMeta.operatorContext);
@@ -483,7 +483,7 @@ public class SpillableArrayListImplTest
     SpillableStateStore store = testMeta.store;
 
     SpillableArrayListImpl<String> list = new SpillableArrayListImpl<>(0L, ID1, store,
-        new SerdeStringSlice(), 3);
+        new StringSerde(), 3);
 
     store.setup(testMeta.operatorContext);
     list.setup(testMeta.operatorContext);

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2fa1e6b1/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListMultimapImplTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListMultimapImplTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListMultimapImplTest.java
index 82fb340..d21bf50 100644
--- a/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListMultimapImplTest.java
+++ b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListMultimapImplTest.java
@@ -26,9 +26,11 @@ import org.junit.Rule;
 import org.junit.Test;
 
 import org.apache.apex.malhar.lib.state.spillable.inmem.InMemSpillableStateStore;
-import org.apache.apex.malhar.lib.utils.serde.SerdeIntSlice;
-import org.apache.apex.malhar.lib.utils.serde.SerdeStringSlice;
+import org.apache.apex.malhar.lib.utils.serde.IntSerde;
+import org.apache.apex.malhar.lib.utils.serde.SerializationBuffer;
 import org.apache.apex.malhar.lib.utils.serde.SliceUtils;
+import org.apache.apex.malhar.lib.utils.serde.StringSerde;
+import org.apache.apex.malhar.lib.utils.serde.WindowedBlockStream;
 
 import com.google.common.collect.Lists;
 
@@ -63,8 +65,8 @@ public class SpillableArrayListMultimapImplTest
   public void simpleMultiKeyTestHelper(SpillableStateStore store)
   {
     SpillableArrayListMultimapImpl<String, String> map =
-        new SpillableArrayListMultimapImpl<String, String>(store, ID1, 0L, new SerdeStringSlice(),
-        new SerdeStringSlice());
+        new SpillableArrayListMultimapImpl<String, String>(store, ID1, 0L, new StringSerde(),
+        new StringSerde());
 
     store.setup(testMeta.operatorContext);
     map.setup(testMeta.operatorContext);
@@ -112,11 +114,11 @@ public class SpillableArrayListMultimapImplTest
   public long simpleMultiKeyTestHelper(SpillableStateStore store,
       SpillableArrayListMultimapImpl<String, String> map, String key, long nextWindowId)
   {
-    SerdeStringSlice serdeString = new SerdeStringSlice();
-    SerdeIntSlice serdeInt = new SerdeIntSlice();
-
-    Slice keySlice = serdeString.serialize(key);
-
+    StringSerde serdeString = new StringSerde();
+    IntSerde serdeInt = new IntSerde();
+    SerializationBuffer buffer = new SerializationBuffer(new WindowedBlockStream());
+    serdeString.serialize(key, buffer);
+    Slice keySlice = buffer.toSlice();
     byte[] keyBytes = SliceUtils.concatenate(ID1, keySlice.toByteArray());
 
     nextWindowId++;
@@ -249,7 +251,7 @@ public class SpillableArrayListMultimapImplTest
     SpillableStateStore store = testMeta.store;
 
     SpillableArrayListMultimapImpl<String, String> map =
-        new SpillableArrayListMultimapImpl<>(store, ID1, 0L, new SerdeStringSlice(), new SerdeStringSlice());
+        new SpillableArrayListMultimapImpl<>(store, ID1, 0L, new StringSerde(), new StringSerde());
 
     store.setup(testMeta.operatorContext);
     map.setup(testMeta.operatorContext);
@@ -323,8 +325,10 @@ public class SpillableArrayListMultimapImplTest
     store.beginWindow(nextWindowId);
     map.beginWindow(nextWindowId);
 
-    SerdeStringSlice serdeString = new SerdeStringSlice();
-    Slice keySlice = serdeString.serialize("a");
+    StringSerde serdeString = new StringSerde();
+    SerializationBuffer buffer = new SerializationBuffer(new WindowedBlockStream());
+    serdeString.serialize("a", buffer);
+    Slice keySlice = buffer.toSlice();
     byte[] keyBytes = SliceUtils.concatenate(ID1, keySlice.toByteArray());
 
     SpillableTestUtils.checkValue(store, 0L, keyBytes, 0, Lists.<String>newArrayList("a", "111", "b", "222", "d",
@@ -350,7 +354,7 @@ public class SpillableArrayListMultimapImplTest
 
     SpillableStateStore store = testMeta.store;
     SpillableArrayListMultimapImpl<String, String> multimap = new SpillableArrayListMultimapImpl<>(
-        this.testMeta.store, ID1, 0L, new SerdeStringSlice(), new SerdeStringSlice());
+        this.testMeta.store, ID1, 0L, new StringSerde(), new StringSerde());
 
     Attribute.AttributeMap.DefaultAttributeMap attributes = new Attribute.AttributeMap.DefaultAttributeMap();
     attributes.put(DAG.APPLICATION_PATH, testMeta.applicationPath);

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2fa1e6b1/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponentImplTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponentImplTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponentImplTest.java
index 5c477b1..29c2090 100644
--- a/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponentImplTest.java
+++ b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponentImplTest.java
@@ -22,7 +22,7 @@ import org.junit.Rule;
 import org.junit.Test;
 
 import org.apache.apex.malhar.lib.state.spillable.inmem.InMemSpillableStateStore;
-import org.apache.apex.malhar.lib.utils.serde.SerdeStringSlice;
+import org.apache.apex.malhar.lib.utils.serde.StringSerde;
 
 public class SpillableComplexComponentImplTest
 {
@@ -48,9 +48,9 @@ public class SpillableComplexComponentImplTest
     SpillableComplexComponentImpl sccImpl = new SpillableComplexComponentImpl(store);
 
     Spillable.SpillableComponent scList =
-        (Spillable.SpillableComponent)sccImpl.newSpillableArrayList(0L, new SerdeStringSlice());
+        (Spillable.SpillableComponent)sccImpl.newSpillableArrayList(0L, new StringSerde());
     Spillable.SpillableComponent scMap =
-        (Spillable.SpillableComponent)sccImpl.newSpillableMap(0L, new SerdeStringSlice(), new SerdeStringSlice());
+        (Spillable.SpillableComponent)sccImpl.newSpillableMap(0L, new StringSerde(), new StringSerde());
 
     sccImpl.setup(testMeta.operatorContext);
 

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2fa1e6b1/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableMapImplTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableMapImplTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableMapImplTest.java
index e8aea46..a96a8fd 100644
--- a/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableMapImplTest.java
+++ b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableMapImplTest.java
@@ -23,7 +23,7 @@ import org.junit.Rule;
 import org.junit.Test;
 
 import org.apache.apex.malhar.lib.state.spillable.inmem.InMemSpillableStateStore;
-import org.apache.apex.malhar.lib.utils.serde.SerdeStringSlice;
+import org.apache.apex.malhar.lib.utils.serde.StringSerde;
 
 import com.datatorrent.api.Attribute;
 import com.datatorrent.api.Context;
@@ -47,6 +47,7 @@ public class SpillableMapImplTest
     simpleGetAndPutTestHelper(store);
   }
 
+
   @Test
   public void simpleGetAndPutManagedStateTest()
   {
@@ -55,11 +56,7 @@ public class SpillableMapImplTest
 
   private void simpleGetAndPutTestHelper(SpillableStateStore store)
   {
-    SerdeStringSlice sss = new SerdeStringSlice();
-
-    SpillableMapImpl<String, String> map = new SpillableMapImpl<>(store, ID1, 0L,
-        new SerdeStringSlice(),
-        new SerdeStringSlice());
+    SpillableMapImpl<String, String> map = createSpillableMap(store);
 
     store.setup(testMeta.operatorContext);
     map.setup(testMeta.operatorContext);
@@ -157,23 +154,25 @@ public class SpillableMapImplTest
   public void simpleRemoveTest()
   {
     InMemSpillableStateStore store = new InMemSpillableStateStore();
-
     simpleRemoveTestHelper(store);
   }
 
+
   @Test
   public void simpleRemoveManagedStateTest()
   {
     simpleRemoveTestHelper(testMeta.store);
   }
 
-  private void simpleRemoveTestHelper(SpillableStateStore store)
+  protected SpillableMapImpl<String, String> createSpillableMap(SpillableStateStore store)
   {
-    SerdeStringSlice sss = new SerdeStringSlice();
+    return new SpillableMapImpl<String, String>(store, ID1, 0L, new StringSerde(),
+        new StringSerde());
+  }
 
-    SpillableMapImpl<String, String> map = new SpillableMapImpl<>(store, ID1, 0L,
-        new SerdeStringSlice(),
-        new SerdeStringSlice());
+  private void simpleRemoveTestHelper(SpillableStateStore store)
+  {
+    SpillableMapImpl<String, String> map = createSpillableMap(store);
 
     store.setup(testMeta.operatorContext);
     map.setup(testMeta.operatorContext);
@@ -324,14 +323,14 @@ public class SpillableMapImplTest
 
   public void multiMapPerBucketTestHelper(SpillableStateStore store)
   {
-    SerdeStringSlice sss = new SerdeStringSlice();
+    StringSerde sss = new StringSerde();
 
     SpillableMapImpl<String, String> map1 = new SpillableMapImpl<>(store, ID1, 0L,
-        new SerdeStringSlice(),
-        new SerdeStringSlice());
+        new StringSerde(),
+        new StringSerde());
     SpillableMapImpl<String, String> map2 = new SpillableMapImpl<>(store, ID2, 0L,
-        new SerdeStringSlice(),
-        new SerdeStringSlice());
+        new StringSerde(),
+        new StringSerde());
 
     store.setup(testMeta.operatorContext);
     map1.setup(testMeta.operatorContext);
@@ -413,11 +412,11 @@ public class SpillableMapImplTest
   @Test
   public void recoveryWithManagedStateTest() throws Exception
   {
-    SerdeStringSlice sss = new SerdeStringSlice();
+    StringSerde sss = new StringSerde();
 
     SpillableMapImpl<String, String> map1 = new SpillableMapImpl<>(testMeta.store, ID1, 0L,
-        new SerdeStringSlice(),
-        new SerdeStringSlice());
+        new StringSerde(),
+        new StringSerde());
 
     testMeta.store.setup(testMeta.operatorContext);
     map1.setup(testMeta.operatorContext);

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2fa1e6b1/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetImplTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetImplTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetImplTest.java
index 3883191..d0343e1 100644
--- a/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetImplTest.java
+++ b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetImplTest.java
@@ -26,7 +26,7 @@ import org.junit.Rule;
 import org.junit.Test;
 
 import org.apache.apex.malhar.lib.state.spillable.inmem.InMemSpillableStateStore;
-import org.apache.apex.malhar.lib.utils.serde.SerdeStringSlice;
+import org.apache.apex.malhar.lib.utils.serde.StringSerde;
 
 import com.google.common.collect.Lists;
 
@@ -53,7 +53,7 @@ public class SpillableSetImplTest
 
   public void simpleAddGetAndSetTest1Helper(SpillableStateStore store)
   {
-    SpillableSetImpl<String> set = new SpillableSetImpl<>(0L, ID1, store, new SerdeStringSlice());
+    SpillableSetImpl<String> set = new SpillableSetImpl<>(0L, ID1, store, new StringSerde());
 
     store.setup(testMeta.operatorContext);
     set.setup(testMeta.operatorContext);

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2fa1e6b1/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetMultimapImplTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetMultimapImplTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetMultimapImplTest.java
index 15970af..2f80628 100644
--- a/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetMultimapImplTest.java
+++ b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetMultimapImplTest.java
@@ -27,7 +27,8 @@ import org.junit.Rule;
 import org.junit.Test;
 
 import org.apache.apex.malhar.lib.state.spillable.inmem.InMemSpillableStateStore;
-import org.apache.apex.malhar.lib.utils.serde.SerdeStringSlice;
+import org.apache.apex.malhar.lib.utils.serde.Serde;
+import org.apache.apex.malhar.lib.utils.serde.StringSerde;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
@@ -62,8 +63,7 @@ public class SpillableSetMultimapImplTest
   public void simpleMultiKeyTestHelper(SpillableStateStore store)
   {
     SpillableSetMultimapImpl<String, String> map =
-        new SpillableSetMultimapImpl<>(store, ID1, 0L, new SerdeStringSlice(),
-        new SerdeStringSlice());
+        new SpillableSetMultimapImpl<>(store, ID1, 0L, createStringSerde(), createStringSerde());
 
     store.setup(testMeta.operatorContext);
     map.setup(testMeta.operatorContext);
@@ -201,7 +201,7 @@ public class SpillableSetMultimapImplTest
     SpillableStateStore store = testMeta.store;
 
     SpillableSetMultimapImpl<String, String> map =
-        new SpillableSetMultimapImpl<>(store, ID1, 0L, new SerdeStringSlice(), new SerdeStringSlice());
+        new SpillableSetMultimapImpl<>(store, ID1, 0L, createStringSerde(), createStringSerde());
 
     store.setup(testMeta.operatorContext);
     map.setup(testMeta.operatorContext);
@@ -276,8 +276,9 @@ public class SpillableSetMultimapImplTest
     final int numOfEntry = 100000;
 
     SpillableStateStore store = testMeta.store;
-    SpillableSetMultimapImpl<String, String> multimap = new SpillableSetMultimapImpl<>(
-        this.testMeta.store, ID1, 0L, new SerdeStringSlice(), new SerdeStringSlice());
+
+    SpillableSetMultimapImpl<String, String> multimap = new SpillableSetMultimapImpl<>(testMeta.store, ID1, 0L,
+        createStringSerde(), createStringSerde());
 
     Attribute.AttributeMap.DefaultAttributeMap attributes = new Attribute.AttributeMap.DefaultAttributeMap();
     attributes.put(DAG.APPLICATION_PATH, testMeta.applicationPath);
@@ -294,4 +295,9 @@ public class SpillableSetMultimapImplTest
     multimap.endWindow();
     store.endWindow();
   }
+
+  protected Serde<String> createStringSerde()
+  {
+    return new StringSerde();
+  }
 }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2fa1e6b1/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableTestUtils.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableTestUtils.java b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableTestUtils.java
index 36e3557..d72b1f9 100644
--- a/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableTestUtils.java
+++ b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableTestUtils.java
@@ -27,14 +27,15 @@ import org.junit.runner.Description;
 
 import org.apache.apex.malhar.lib.state.managed.ManagedStateTestUtils;
 import org.apache.apex.malhar.lib.state.spillable.managed.ManagedStateSpillableStateStore;
+import org.apache.apex.malhar.lib.utils.serde.CollectionSerde;
 import org.apache.apex.malhar.lib.utils.serde.Serde;
-import org.apache.apex.malhar.lib.utils.serde.SerdeCollectionSlice;
-import org.apache.apex.malhar.lib.utils.serde.SerdeStringSlice;
-import org.apache.apex.malhar.lib.utils.serde.SliceUtils;
-import org.apache.commons.lang3.mutable.MutableInt;
+import org.apache.apex.malhar.lib.utils.serde.SerializationBuffer;
+import org.apache.apex.malhar.lib.utils.serde.StringSerde;
+import org.apache.apex.malhar.lib.utils.serde.WindowedBlockStream;
+
+import com.esotericsoftware.kryo.io.Input;
 
 import com.datatorrent.api.Context;
-import com.datatorrent.lib.appdata.gpo.GPOUtils;
 import com.datatorrent.lib.fileaccess.FileAccessFSImpl;
 import com.datatorrent.lib.util.TestUtils;
 import com.datatorrent.netlet.util.Slice;
@@ -44,9 +45,9 @@ import com.datatorrent.netlet.util.Slice;
  */
 public class SpillableTestUtils
 {
-  public static SerdeStringSlice SERDE_STRING_SLICE = new SerdeStringSlice();
-  public static SerdeCollectionSlice<String, List<String>> SERDE_STRING_LIST_SLICE = new SerdeCollectionSlice<>(new SerdeStringSlice(),
-      (Class<List<String>>)(Class)ArrayList.class);
+  public static StringSerde STRING_SERDE = new StringSerde();
+  public static CollectionSerde<String, List<String>> STRING_LIST_SERDE = new CollectionSerde<>(new StringSerde(),
+      (Class)ArrayList.class);
 
   private SpillableTestUtils()
   {
@@ -77,34 +78,41 @@ public class SpillableTestUtils
     }
   }
 
+  protected static SerializationBuffer buffer = new SerializationBuffer(new WindowedBlockStream());
+
   public static Slice getKeySlice(byte[] id, String key)
   {
-    return SliceUtils.concatenate(id, SERDE_STRING_SLICE.serialize(key));
+    buffer.writeBytes(id);
+    STRING_SERDE.serialize(key, buffer);
+    return buffer.toSlice();
   }
 
   public static Slice getKeySlice(byte[] id, int index, String key)
   {
-    return SliceUtils.concatenate(id,
-        SliceUtils.concatenate(GPOUtils.serializeInt(index),
-            SERDE_STRING_SLICE.serialize(key)));
+    buffer.writeBytes(id);
+    buffer.writeInt(index);
+    STRING_SERDE.serialize(key, buffer);
+    return buffer.toSlice();
   }
 
   public static void checkValue(SpillableStateStore store, long bucketId, String key,
       byte[] prefix, String expectedValue)
   {
-    checkValue(store, bucketId, SliceUtils.concatenate(prefix, SERDE_STRING_SLICE.serialize(key)).buffer,
-        expectedValue, 0, SERDE_STRING_SLICE);
+    buffer.writeBytes(prefix);
+    STRING_SERDE.serialize(key, buffer);
+    checkValue(store, bucketId, buffer.toSlice().toByteArray(), expectedValue, 0, STRING_SERDE);
   }
 
   public static void checkValue(SpillableStateStore store, long bucketId,
       byte[] prefix, int index, List<String> expectedValue)
   {
-    checkValue(store, bucketId, SliceUtils.concatenate(prefix, GPOUtils.serializeInt(index)), expectedValue, 0,
-        SERDE_STRING_LIST_SLICE);
+    buffer.writeBytes(prefix);
+    buffer.writeInt(index);
+    checkValue(store, bucketId, buffer.toSlice().toByteArray(), expectedValue, 0, STRING_LIST_SERDE);
   }
 
-  public static <T> void  checkValue(SpillableStateStore store, long bucketId, byte[] bytes,
-      T expectedValue, int offset, Serde<T, Slice> serde)
+  public static <T> void checkValue(SpillableStateStore store, long bucketId, byte[] bytes,
+      T expectedValue, int offset, Serde<T> serde)
   {
     Slice slice = store.getSync(bucketId, new Slice(bytes));
 
@@ -116,7 +124,8 @@ public class SpillableTestUtils
       }
     }
 
-    T string = serde.deserialize(slice, new MutableInt(offset));
+    // Skipping 'offset' leading bytes leaves only (slice.length - offset) readable bytes in the slice.
+    T string = serde.deserialize(new Input(slice.buffer, slice.offset + offset, slice.length - offset));
 
     Assert.assertEquals(expectedValue, string);
   }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2fa1e6b1/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/TimeBasedPriorityQueueTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/TimeBasedPriorityQueueTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/TimeBasedPriorityQueueTest.java
index 8033a7d..a2cbb54 100644
--- a/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/TimeBasedPriorityQueueTest.java
+++ b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/TimeBasedPriorityQueueTest.java
@@ -25,9 +25,6 @@ import org.junit.Test;
 
 import com.google.common.collect.Sets;
 
-/**
- * Created by tfarkas on 6/4/16.
- */
 public class TimeBasedPriorityQueueTest
 {
   @Test

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2fa1e6b1/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/AffixSerdeTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/AffixSerdeTest.java b/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/AffixSerdeTest.java
new file mode 100644
index 0000000..007fab9
--- /dev/null
+++ b/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/AffixSerdeTest.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.utils.serde;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.esotericsoftware.kryo.io.Input;
+
+import com.datatorrent.netlet.util.Slice;
+
+public class AffixSerdeTest
+{
+  @Test
+  public void simpleTest()
+  {
+    SerializationBuffer buffer = new SerializationBuffer(new WindowedBlockStream());
+    AffixSerde<String> serde = new AffixSerde<>(new byte[]{1, 2, 3}, new StringSerde(), new byte[]{9});
+
+    final String orgValue = "abc";
+    serde.serialize(orgValue, buffer);
+    Slice slice = buffer.toSlice();
+
+    String value = serde.deserialize(new Input(slice.buffer, slice.offset, slice.length));
+    Assert.assertEquals(orgValue, value);
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2fa1e6b1/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/BlockStreamTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/BlockStreamTest.java b/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/BlockStreamTest.java
new file mode 100644
index 0000000..3b39d6c
--- /dev/null
+++ b/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/BlockStreamTest.java
@@ -0,0 +1,179 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.utils.serde;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+import com.datatorrent.netlet.util.Slice;
+
+public class BlockStreamTest
+{
+  protected Random random = new Random();
+
+  @Test
+  public void testWindowedBlockStream()
+  {
+    WindowedBlockStream bs = new WindowedBlockStream();
+    List<byte[]> totalList = Lists.newArrayList();
+    List<Slice> slices = Lists.newArrayList();
+
+    for (int windowId = 0; windowId < 10; ++windowId) {
+      List<byte[]> list = generateList();
+      totalList.addAll(list);
+
+      bs.beginWindow(windowId);
+      writeToBlockStream(bs, list, slices);
+      bs.endWindow();
+
+      if (windowId % 2 != 0) {
+        verify(totalList, slices);
+
+        bs.completeWindow(windowId);
+        totalList.clear();
+        slices.clear();
+      }
+    }
+  }
+
+  @Test
+  public void testBlockStream()
+  {
+    BlockStream bs = new BlockStream();
+    List<byte[]> totalList = Lists.newArrayList();
+    List<Slice> slices = Lists.newArrayList();
+
+    for (int tryTime = 0; tryTime < 10; ++tryTime) {
+      List<byte[]> list = generateList();
+      totalList.addAll(list);
+
+      writeToBlockStream(bs, list, slices);
+
+      if (tryTime % 2 != 0) {
+        verify(totalList, slices);
+
+        bs.reset();
+        totalList.clear();
+        slices.clear();
+      }
+
+    }
+  }
+
+  private void writeToBlockStream(BlockStream bs, List<byte[]> list, List<Slice> slices)
+  {
+    for (byte[] bytes : list) {
+      int times = random.nextInt(100) + 1;
+      int remainLen = bytes.length;
+      int offset = 0;
+      while (times > 0 && remainLen > 0) {
+        int avgSubLen = remainLen / times;
+        times--;
+        if (avgSubLen == 0) {
+          bs.write(bytes, offset, remainLen);
+          break;
+        }
+
+        int writeLen = remainLen;
+        if (times != 0) {
+          int subLen = random.nextInt(avgSubLen * 2);
+          writeLen = Math.min(subLen, remainLen);
+        }
+        bs.write(bytes, offset, writeLen);
+
+        offset += writeLen;
+        remainLen -= writeLen;
+      }
+      slices.add(bs.toSlice());
+    }
+  }
+
+  private void verify(List<byte[]> list, List<Slice> slices)
+  {
+    //verify
+    Assert.assertTrue("size not equal.", list.size() == slices.size());
+
+    for (int i = 0; i < list.size(); ++i) {
+      byte[] bytes = list.get(i);
+      byte[] newBytes = slices.get(i).toByteArray();
+      if (!Arrays.equals(bytes, newBytes)) {
+        Assert.assertArrayEquals(bytes, newBytes);
+      }
+    }
+  }
+
+  private List<byte[]> generateList()
+  {
+    List<byte[]> list = Lists.newArrayList();
+    int size = random.nextInt(10000) + 1;
+    for (int i = 0; i < size; i++) {
+      list.add(generateByteArray());
+    }
+    return list;
+  }
+
+  protected byte[] generateByteArray()
+  {
+    int len = random.nextInt(10000) + 1;
+    byte[] bytes = new byte[len];
+    random.nextBytes(bytes);
+    return bytes;
+  }
+
+
+  @Test
+  public void testReleaseMemory()
+  {
+    WindowedBlockStream stream = new WindowedBlockStream();
+
+    byte[] data = new byte[2048];
+    final int loopPerWindow = 100;
+    long windowId = 0;
+
+    //fill data;
+    for (; windowId < 100; ++windowId) {
+      stream.beginWindow(windowId);
+      for (int i = 0; i < loopPerWindow; ++i) {
+        stream.write(data);
+        stream.toSlice();
+      }
+      stream.endWindow();
+    }
+
+    long capacity = stream.capacity();
+    stream.completeWindow(windowId);
+    Assert.assertTrue(capacity == stream.capacity());
+    Assert.assertTrue(0 == stream.size());
+
+    //release memory;
+    for (; windowId < 200; ++windowId) {
+      stream.beginWindow(windowId);
+      stream.endWindow();
+    }
+
+    //at least keep one block as current block
+    Assert.assertTrue(stream.capacity() == Block.DEFAULT_BLOCK_SIZE);
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2fa1e6b1/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/CollectionSerdeTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/CollectionSerdeTest.java b/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/CollectionSerdeTest.java
new file mode 100644
index 0000000..255d9c0
--- /dev/null
+++ b/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/CollectionSerdeTest.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.utils.serde;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.esotericsoftware.kryo.io.Input;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+import com.datatorrent.netlet.util.Slice;
+
+public class CollectionSerdeTest
+{
+  @Test
+  public void testSerdeList()
+  {
+    CollectionSerde<String, List<String>> serdeList =
+        new CollectionSerde<>(new StringSerde(), (Class)ArrayList.class);
+
+    List<String> stringList = Lists.newArrayList("a", "b", "c");
+    SerializationBuffer buffer = new SerializationBuffer(new WindowedBlockStream());
+    serdeList.serialize(stringList, buffer);
+
+    Slice slice = buffer.toSlice();
+    List<String> deserializedList = serdeList.deserialize(new Input(slice.buffer, slice.offset, slice.length));
+
+    Assert.assertEquals(stringList, deserializedList);
+  }
+
+  @Test
+  public void testSerdeSet()
+  {
+    CollectionSerde<String, Set<String>> serdeSet =
+        new CollectionSerde<>(new StringSerde(), (Class)HashSet.class);
+
+    Set<String> stringList = Sets.newHashSet("a", "b", "c");
+    SerializationBuffer buffer = new SerializationBuffer(new WindowedBlockStream());
+    serdeSet.serialize(stringList, buffer);
+
+    Slice slice = buffer.toSlice();
+    Set<String> deserializedSet = serdeSet.deserialize(new Input(slice.buffer, slice.offset, slice.length));
+
+    Assert.assertEquals(stringList, deserializedSet);
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2fa1e6b1/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/GenericSerdeTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/GenericSerdeTest.java b/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/GenericSerdeTest.java
new file mode 100644
index 0000000..34b7088
--- /dev/null
+++ b/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/GenericSerdeTest.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.utils.serde;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.esotericsoftware.kryo.io.Input;
+import com.google.common.collect.Lists;
+
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * GenericSerde unit tests
+ */
+public class GenericSerdeTest
+{
+  /**
+   * Simple value object used to round-trip a user-defined type through {@link GenericSerde}.
+   */
+  public static class TestPojo
+  {
+    private TestPojo()
+    {
+    }
+
+    public TestPojo(int intValue, String stringValue)
+    {
+      this.intValue = intValue;
+      this.stringValue = stringValue;
+    }
+
+    /**
+     * Value equality on both fields; tolerates null and foreign-type arguments.
+     */
+    @Override
+    public boolean equals(Object other)
+    {
+      if (this == other) {
+        return true;
+      }
+      if (!(other instanceof TestPojo)) {
+        return false;
+      }
+      TestPojo o = (TestPojo)other;
+      return intValue == o.intValue
+          && (stringValue == null ? o.stringValue == null : stringValue.equals(o.stringValue));
+    }
+
+    /**
+     * Consistent with {@link #equals(Object)}: equal pojos produce equal hash codes.
+     */
+    @Override
+    public int hashCode()
+    {
+      return 31 * intValue + (stringValue == null ? 0 : stringValue.hashCode());
+    }
+
+    int intValue;
+    String stringValue;
+  }
+
+  /**
+   * Round-trips a list of strings through a GenericSerde constructed with ArrayList.class.
+   */
+  @Test
+  public void stringListTest()
+  {
+    GenericSerde<ArrayList> serdeList = new GenericSerde<>(ArrayList.class);
+
+    ArrayList<String> stringList = Lists.newArrayList("a", "b", "c");
+    SerializationBuffer buffer = new SerializationBuffer(new WindowedBlockStream());
+    serdeList.serialize(stringList, buffer);
+    Slice slice = buffer.toSlice();
+    List<String> deserializedList = serdeList.deserialize(new Input(slice.buffer, slice.offset, slice.length));
+    Assert.assertEquals(stringList, deserializedList);
+  }
+
+  /**
+   * Round-trips a TestPojo through a GenericSerde constructed without an explicit class.
+   */
+  @Test
+  public void pojoTest()
+  {
+    GenericSerde<TestPojo> serdePojo = new GenericSerde<>();
+    TestPojo pojo = new TestPojo(345, "xyz");
+    SerializationBuffer buffer = new SerializationBuffer(new WindowedBlockStream());
+    serdePojo.serialize(pojo, buffer);
+    Slice slice = buffer.toSlice();
+    TestPojo deserializedPojo = serdePojo.deserialize(new Input(slice.buffer, slice.offset, slice.length));
+    Assert.assertEquals(pojo, deserializedPojo);
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2fa1e6b1/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/PairSerdeTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/PairSerdeTest.java b/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/PairSerdeTest.java
new file mode 100644
index 0000000..104ff04
--- /dev/null
+++ b/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/PairSerdeTest.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.utils.serde;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+
+import com.esotericsoftware.kryo.io.Input;
+
+import com.datatorrent.netlet.util.Slice;
+
+public class PairSerdeTest
+{
+  @Test
+  public void simpleSerdeTest()
+  {
+    PairSerde<String, Integer> serdePair = new PairSerde<>(new StringSerde(), new IntSerde());
+
+    Pair<String, Integer> pair = new ImmutablePair<>("abc", 123);
+
+    SerializationBuffer buffer = new SerializationBuffer(new WindowedBlockStream());
+    serdePair.serialize(pair, buffer);
+    Slice slice = buffer.toSlice();
+
+    Pair<String, Integer> deserializedPair = serdePair.deserialize(new Input(slice.buffer, slice.offset, slice.length));
+
+    Assert.assertEquals(pair, deserializedPair);
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2fa1e6b1/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/PassThruByteArraySerdeTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/PassThruByteArraySerdeTest.java b/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/PassThruByteArraySerdeTest.java
deleted file mode 100644
index 3cb5b65..0000000
--- a/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/PassThruByteArraySerdeTest.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.lib.utils.serde;
-
-import org.junit.Assert;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TestWatcher;
-import org.junit.runner.Description;
-
-import org.apache.commons.lang3.mutable.MutableInt;
-
-public class PassThruByteArraySerdeTest
-{
-  @Rule
-  public SerdeByteArrayToByteArrayTestWatcher testMeta = new SerdeByteArrayToByteArrayTestWatcher();
-
-  public static class SerdeByteArrayToByteArrayTestWatcher extends TestWatcher
-  {
-    public PassThruByteArraySerde serde;
-
-    @Override
-    protected void starting(Description description)
-    {
-      this.serde = new PassThruByteArraySerde();
-      super.starting(description);
-    }
-  }
-
-  @Test
-  public void simpleSerializeTest()
-  {
-    byte[] byteArray = new byte[]{1, 2, 3};
-    byte[] serialized = testMeta.serde.serialize(byteArray);
-
-    Assert.assertArrayEquals(byteArray, serialized);
-  }
-
-  @Test
-  public void simpleDeserializeTest()
-  {
-    byte[] byteArray = new byte[]{1, 2, 3};
-    byte[] serialized = testMeta.serde.deserialize(byteArray);
-
-    Assert.assertArrayEquals(byteArray, serialized);
-  }
-
-  @Test
-  public void simpleDeserializeOffsetTest()
-  {
-    byte[] byteArray = new byte[]{1, 2, 3};
-    byte[] serialized = testMeta.serde.deserialize(byteArray, new MutableInt(0));
-
-    Assert.assertArrayEquals(byteArray, serialized);
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2fa1e6b1/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/SerdeCollectionSliceTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/SerdeCollectionSliceTest.java b/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/SerdeCollectionSliceTest.java
deleted file mode 100644
index f6085f6..0000000
--- a/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/SerdeCollectionSliceTest.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.lib.utils.serde;
-
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-
-import com.datatorrent.netlet.util.Slice;
-
-public class SerdeCollectionSliceTest
-{
-  @Test
-  public void testSerdeList()
-  {
-    SerdeCollectionSlice<String, List<String>> serdeList =
-        new SerdeCollectionSlice<>(new SerdeStringSlice(), (Class<List<String>>)(Class)ArrayList.class);
-
-    List<String> stringList = Lists.newArrayList("a", "b", "c");
-
-    Slice slice = serdeList.serialize(stringList);
-
-    List<String> deserializedList = serdeList.deserialize(slice);
-
-    Assert.assertEquals(stringList, deserializedList);
-  }
-
-  @Test
-  public void testSerdeSet()
-  {
-    SerdeCollectionSlice<String, Set<String>> serdeSet =
-        new SerdeCollectionSlice<>(new SerdeStringSlice(), (Class<Set<String>>)(Class)HashSet.class);
-
-    Set<String> stringList = Sets.newHashSet("a", "b", "c");
-
-    Slice slice = serdeSet.serialize(stringList);
-
-    Set<String> deserializedSet = serdeSet.deserialize(slice);
-
-    Assert.assertEquals(stringList, deserializedSet);
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2fa1e6b1/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/SerdeGeneralTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/SerdeGeneralTest.java b/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/SerdeGeneralTest.java
new file mode 100644
index 0000000..ee24557
--- /dev/null
+++ b/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/SerdeGeneralTest.java
@@ -0,0 +1,169 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.utils.serde;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.esotericsoftware.kryo.io.Input;
+
+import com.datatorrent.netlet.util.Slice;
+
+public class SerdeGeneralTest
+{
+  private final int charNum = 62; // alphabet size: 10 digits + 26 upper-case + 26 lower-case letters
+  private String[] testData = null; // regenerated before every test by generateTestData()
+  private final Random random = new Random();
+
+  @Before
+  public void generateTestData()
+  {
+    int size = random.nextInt(10000) + 1; // 1..10000 strings
+    testData = new String[size];
+    for (int i = 0; i < size; ++i) {
+      char[] chars = new char[random.nextInt(10000) + 1]; // 1..10000 characters each
+      for (int j = 0; j < chars.length; ++j) {
+        chars[j] = getRandomChar();
+      }
+      testData[i] = new String(chars);
+    }
+  }
+
+  /** Returns a random character from [0-9A-Za-z]. */
+  private char getRandomChar()
+  {
+    int value = random.nextInt(charNum);
+    if (value < 10) {
+      return (char)(value + '0'); // 0..9 -> '0'..'9'
+    } else if (value < 36) {
+      return (char)(value - 10 + 'A'); // 10..35 -> 'A'..'Z'
+    }
+    return (char)(value - 36 + 'a'); // 36..61 -> 'a'..'z'
+  }
+
+  @Test
+  public void testSerdeInt()
+  {
+    IntSerde intSerde = new IntSerde();
+
+    SerializationBuffer buffer = new SerializationBuffer(new WindowedBlockStream());
+    int value = 123;
+    intSerde.serialize(value, buffer);
+
+    Slice slice = buffer.toSlice();
+
+    int deserializedValue = intSerde.deserialize(new Input(slice.buffer, slice.offset, slice.length));
+
+    Assert.assertEquals(value, deserializedValue);
+  }
+
+  @Test
+  public void testSerdeString()
+  {
+    testSerde(testData, new StringSerde(), new StringSerdeVerifier());
+  }
+
+  @Test
+  public void testSerdeArray()
+  {
+    testSerde(testData, ArraySerde.newSerde(new StringSerde(), String.class), new StringArraySerdeVerifier());
+  }
+
+  // the raw (Class) cast is required: Java has no class literal of type Class<List<String>>
+  @SuppressWarnings({ "unchecked", "rawtypes" })
+  @Test
+  public void testSerdeCollection()
+  {
+    CollectionSerde<String, List<String>> listSerde = new CollectionSerde<>(new StringSerde(), (Class)ArrayList.class);
+    testSerde(testData, listSerde, new StringListSerdeVerifier());
+  }
+
+  /** Runs the verifier in windows 0..9; calls completeWindow when i % 3 == 0 and reset when i % 4 == 0. */
+  public <T> void testSerde(String[] strs, Serde<T> serde, SerdeVerifier<T> verifier)
+  {
+    SerializationBuffer buffer = new SerializationBuffer(new WindowedBlockStream());
+
+    for (int i = 0; i < 10; ++i) {
+      buffer.beginWindow(i);
+      verifier.verifySerde(strs, serde, buffer);
+      buffer.endWindow();
+      if (i % 3 == 0) {
+        buffer.completeWindow(i);
+      }
+      if (i % 4 == 0) {
+        buffer.reset();
+      }
+    }
+    buffer.release();
+  }
+
+  public interface SerdeVerifier<T>
+  {
+    void verifySerde(String[] datas, Serde<T> serde, SerializationBuffer buffer);
+  }
+
+  public static class StringSerdeVerifier implements SerdeVerifier<String>
+  {
+    @Override
+    public void verifySerde(String[] datas, Serde<String> serde, SerializationBuffer buffer)
+    {
+      for (String str : datas) {
+        serde.serialize(str, buffer);
+        Slice slice = buffer.toSlice();
+        Assert.assertEquals("serialize failed, String: " + str, str, serde.deserialize(new Input(slice.buffer, slice.offset, slice.length)));
+      }
+    }
+  }
+
+  /** Round-trips the whole string array through a single serialize/deserialize call. */
+  public static class StringArraySerdeVerifier implements SerdeVerifier<String[]>
+  {
+    @Override
+    public void verifySerde(String[] datas, Serde<String[]> serde, SerializationBuffer buffer)
+    {
+      serde.serialize(datas, buffer);
+      Slice slice = buffer.toSlice();
+      String[] newStrs = serde.deserialize(new Input(slice.buffer, slice.offset, slice.length));
+      Assert.assertArrayEquals("serialize array failed.", datas, newStrs);
+    }
+  }
+
+  public static class StringListSerdeVerifier implements SerdeVerifier<List<String>>
+  {
+    @Override
+    public void verifySerde(String[] datas, Serde<List<String>> serdeList, SerializationBuffer buffer)
+    {
+      List<String> list = Arrays.asList(datas);
+
+      serdeList.serialize(list, buffer);
+      Slice slice = buffer.toSlice();
+      List<String> newStrs = serdeList.deserialize(new Input(slice.buffer, slice.offset, slice.length));
+      Assert.assertArrayEquals("serialize list failed.", datas, newStrs.toArray(new String[0]));
+
+      buffer.reset();
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2fa1e6b1/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/SerdeKryoSliceTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/SerdeKryoSliceTest.java b/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/SerdeKryoSliceTest.java
deleted file mode 100644
index b780f66..0000000
--- a/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/SerdeKryoSliceTest.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.lib.utils.serde;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import com.google.common.collect.Lists;
-
-import com.datatorrent.netlet.util.Slice;
-
-/**
- * SerdeKryoSlice unit tests
- */
-public class SerdeKryoSliceTest
-{
-  public static class TestPojo
-  {
-    private TestPojo()
-    {
-    }
-
-    public TestPojo(int intValue, String stringValue)
-    {
-      this.intValue = intValue;
-      this.stringValue = stringValue;
-    }
-
-    @Override
-    public boolean equals(Object other)
-    {
-      TestPojo o = (TestPojo)other;
-      return intValue == o.intValue && stringValue.equals(o.stringValue);
-    }
-
-    int intValue;
-    String stringValue;
-  }
-
-  @Test
-  public void stringListTest()
-  {
-    SerdeKryoSlice<ArrayList> serdeList = new SerdeKryoSlice<>(ArrayList.class);
-
-    ArrayList<String> stringList = Lists.newArrayList("a", "b", "c");
-    Slice slice = serdeList.serialize(stringList);
-    List<String> deserializedList = serdeList.deserialize(slice);
-    Assert.assertEquals(stringList, deserializedList);
-  }
-
-  @Test
-  public void pojoTest()
-  {
-    SerdeKryoSlice<TestPojo> serdePojo = new SerdeKryoSlice<>();
-    TestPojo pojo = new TestPojo(345, "xyz");
-    Slice slice = serdePojo.serialize(pojo);
-    TestPojo deserializedPojo = serdePojo.deserialize(slice);
-    Assert.assertEquals(pojo, deserializedPojo);
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2fa1e6b1/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/SerdePairSliceTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/SerdePairSliceTest.java b/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/SerdePairSliceTest.java
deleted file mode 100644
index 6684a9f..0000000
--- a/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/SerdePairSliceTest.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.lib.utils.serde;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import org.apache.commons.lang3.tuple.ImmutablePair;
-import org.apache.commons.lang3.tuple.Pair;
-
-import com.datatorrent.netlet.util.Slice;
-
-public class SerdePairSliceTest
-{
-  @Test
-  public void simpleSerdeTest()
-  {
-    SerdePairSlice<String, Integer> serdePair = new SerdePairSlice<>(new SerdeStringSlice(), new SerdeIntSlice());
-
-    Pair<String, Integer> pair = new ImmutablePair<>("abc", 123);
-
-    Slice slice = serdePair.serialize(pair);
-
-    Pair<String, Integer> deserializedPair = serdePair.deserialize(slice);
-
-    Assert.assertEquals(pair, deserializedPair);
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2fa1e6b1/library/src/test/java/org/apache/apex/malhar/lib/window/SpillableWindowedStorageTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/window/SpillableWindowedStorageTest.java b/library/src/test/java/org/apache/apex/malhar/lib/window/SpillableWindowedStorageTest.java
index 3b7789c..a44e454 100644
--- a/library/src/test/java/org/apache/apex/malhar/lib/window/SpillableWindowedStorageTest.java
+++ b/library/src/test/java/org/apache/apex/malhar/lib/window/SpillableWindowedStorageTest.java
@@ -47,8 +47,14 @@ public class SpillableWindowedStorageTest
     Window window2 = new Window.TimeWindow<>(1010, 10);
     Window window3 = new Window.TimeWindow<>(1020, 10);
     storage.setSpillableComplexComponent(sccImpl);
-    storage.getSpillableComplexComponent().setup(testMeta.operatorContext);
+
+    /*
+     * storage.setup() will create Spillable Data Structures
+     * storage.getSpillableComplexComponent().setup() will set up these Data Structures.
+     * So storage.setup() should be called before storage.getSpillableComplexComponent().setup()
+     */
     storage.setup(testMeta.operatorContext);
+    storage.getSpillableComplexComponent().setup(testMeta.operatorContext);
 
     sccImpl.beginWindow(1000);
     storage.put(window1, 1);
@@ -103,8 +109,15 @@ public class SpillableWindowedStorageTest
     Window window2 = new Window.TimeWindow<>(1010, 10);
     Window window3 = new Window.TimeWindow<>(1020, 10);
     storage.setSpillableComplexComponent(sccImpl);
-    storage.getSpillableComplexComponent().setup(testMeta.operatorContext);
+
+    /*
+     * storage.setup() will create Spillable Data Structures
+     * storage.getSpillableComplexComponent().setup() will set up these Data Structures.
+     * So storage.setup() should be called before storage.getSpillableComplexComponent().setup()
+     */
     storage.setup(testMeta.operatorContext);
+    storage.getSpillableComplexComponent().setup(testMeta.operatorContext);
+
 
     sccImpl.beginWindow(1000);
     storage.put(window1, "x", 1);


[3/3] apex-malhar git commit: APEXMALHAR-2190 #resolve #comment Use reusable buffer for serialization in spillable data structures

Posted by th...@apache.org.
APEXMALHAR-2190 #resolve #comment Use reusable buffer for serialization in spillable data structures


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/2fa1e6b1
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/2fa1e6b1
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/2fa1e6b1

Branch: refs/heads/master
Commit: 2fa1e6b16312eecdd074520d431902f11d555221
Parents: 3799157
Author: brightchen <br...@datatorrent.com>
Authored: Mon Aug 15 17:46:27 2016 -0700
Committer: David Yan <da...@datatorrent.com>
Committed: Mon Oct 24 12:52:08 2016 -0700

----------------------------------------------------------------------
 .../spillable/SpillableBenchmarkApp.java        |  69 +++++
 .../spillable/SpillableTestInputOperator.java   |  46 ++++
 .../spillable/SpillableTestOperator.java        | 189 ++++++++++++++
 .../spillable/SpillableBenchmarkAppTester.java  |  73 ++++++
 .../spillable/SpillableDSBenchmarkTest.java     | 171 +++++++++++++
 .../state/ManagedStateBenchmarkAppTest.java     | 101 ++++++++
 .../state/ManagedStateBenchmarkAppTester.java   | 101 --------
 benchmark/src/test/resources/log4j.properties   |   2 +
 .../state/managed/AbstractManagedStateImpl.java |  34 ++-
 .../apex/malhar/lib/state/managed/Bucket.java   |  85 +++++--
 .../lib/state/managed/BucketProvider.java       |  40 +++
 .../state/spillable/SpillableArrayListImpl.java |  17 +-
 .../SpillableArrayListMultimapImpl.java         |  53 ++--
 .../spillable/SpillableComplexComponent.java    |  29 +--
 .../SpillableComplexComponentImpl.java          |  64 +++--
 .../lib/state/spillable/SpillableMapImpl.java   |  44 ++--
 .../lib/state/spillable/SpillableSetImpl.java   |  45 +---
 .../spillable/SpillableSetMultimapImpl.java     |  45 ++--
 .../state/spillable/SpillableStateStore.java    |   3 +-
 .../state/spillable/WindowBoundedMapCache.java  |   5 +-
 .../inmem/InMemSpillableStateStore.java         |  26 ++
 .../utils/serde/AffixKeyValueSerdeManager.java  |  76 ++++++
 .../apex/malhar/lib/utils/serde/AffixSerde.java |  68 +++++
 .../apex/malhar/lib/utils/serde/ArraySerde.java |  97 ++++++++
 .../apex/malhar/lib/utils/serde/Block.java      | 217 ++++++++++++++++
 .../lib/utils/serde/BlockReleaseStrategy.java   |  47 ++++
 .../malhar/lib/utils/serde/BlockStream.java     | 179 +++++++++++++
 .../malhar/lib/utils/serde/BufferSlice.java     | 100 ++++++++
 .../malhar/lib/utils/serde/CollectionSerde.java |  97 ++++++++
 .../serde/DefaultBlockReleaseStrategy.java      |  96 +++++++
 .../malhar/lib/utils/serde/GenericSerde.java    |  81 ++++++
 .../apex/malhar/lib/utils/serde/IntSerde.java   |  45 ++++
 .../utils/serde/KeyValueByteStreamProvider.java |  37 +++
 .../lib/utils/serde/KeyValueSerdeManager.java   |  86 +++++++
 .../apex/malhar/lib/utils/serde/LongSerde.java  |  45 ++++
 .../apex/malhar/lib/utils/serde/PairSerde.java  |  73 ++++++
 .../lib/utils/serde/PassThruByteArraySerde.java |  51 ----
 .../serde/PassThruByteArraySliceSerde.java      |  61 -----
 .../lib/utils/serde/PassThruSliceSerde.java     |  32 ++-
 .../apex/malhar/lib/utils/serde/Serde.java      |  41 +--
 .../lib/utils/serde/SerdeCollectionSlice.java   | 120 ---------
 .../malhar/lib/utils/serde/SerdeIntSlice.java   |  54 ----
 .../malhar/lib/utils/serde/SerdeKryoSlice.java  | 100 --------
 .../malhar/lib/utils/serde/SerdeLongSlice.java  |  54 ----
 .../malhar/lib/utils/serde/SerdePairSlice.java  |  89 -------
 .../lib/utils/serde/SerdeStringSlice.java       |  55 ----
 .../lib/utils/serde/SerializationBuffer.java    | 130 ++++++++++
 .../apex/malhar/lib/utils/serde/SliceUtils.java |  10 +
 .../malhar/lib/utils/serde/StringSerde.java     |  45 ++++
 .../lib/utils/serde/WindowCompleteListener.java |  29 +++
 .../lib/utils/serde/WindowedBlockStream.java    | 249 +++++++++++++++++++
 .../impl/SpillableSessionWindowedStorage.java   |   3 +-
 .../impl/SpillableWindowedKeyedStorage.java     |  28 +--
 .../impl/SpillableWindowedPlainStorage.java     |  18 +-
 .../com/datatorrent/lib/util/TestUtils.java     |   3 +-
 .../lib/state/managed/DefaultBucketTest.java    |  48 +++-
 .../state/managed/ManagedStateTestUtils.java    |   3 +-
 .../spillable/SpillableArrayListImplTest.java   |  12 +-
 .../SpillableArrayListMultimapImplTest.java     |  30 ++-
 .../SpillableComplexComponentImplTest.java      |   6 +-
 .../state/spillable/SpillableMapImplTest.java   |  39 ++-
 .../state/spillable/SpillableSetImplTest.java   |   4 +-
 .../spillable/SpillableSetMultimapImplTest.java |  18 +-
 .../lib/state/spillable/SpillableTestUtils.java |  46 ++--
 .../spillable/TimeBasedPriorityQueueTest.java   |   3 -
 .../malhar/lib/utils/serde/AffixSerdeTest.java  |  43 ++++
 .../malhar/lib/utils/serde/BlockStreamTest.java | 179 +++++++++++++
 .../lib/utils/serde/CollectionSerdeTest.java    |  68 +++++
 .../lib/utils/serde/GenericSerdeTest.java       |  84 +++++++
 .../malhar/lib/utils/serde/PairSerdeTest.java   |  48 ++++
 .../utils/serde/PassThruByteArraySerdeTest.java |  72 ------
 .../utils/serde/SerdeCollectionSliceTest.java   |  65 -----
 .../lib/utils/serde/SerdeGeneralTest.java       | 169 +++++++++++++
 .../lib/utils/serde/SerdeKryoSliceTest.java     |  79 ------
 .../lib/utils/serde/SerdePairSliceTest.java     |  44 ----
 .../window/SpillableWindowedStorageTest.java    |  17 +-
 76 files changed, 3570 insertions(+), 1265 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2fa1e6b1/benchmark/src/main/java/com/datatorrent/benchmark/spillable/SpillableBenchmarkApp.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/spillable/SpillableBenchmarkApp.java b/benchmark/src/main/java/com/datatorrent/benchmark/spillable/SpillableBenchmarkApp.java
new file mode 100644
index 0000000..e2fe8bb
--- /dev/null
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/spillable/SpillableBenchmarkApp.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.benchmark.spillable;
+
+import org.apache.apex.malhar.lib.state.spillable.managed.ManagedStateSpillableStateStore;
+import org.apache.hadoop.conf.Configuration;
+
+import com.google.common.base.Preconditions;
+
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+import com.datatorrent.lib.fileaccess.TFileImpl;
+
+/** Benchmark application: a SpillableTestInputOperator feeding a SpillableTestOperator. */
+@ApplicationAnnotation(name = "SpillableBenchmarkApp")
+public class SpillableBenchmarkApp implements StreamingApplication
+{
+  protected final String PROP_STORE_PATH = "dt.application.SpillableBenchmarkApp.storeBasePath";
+
+  @Override
+  public void populateDAG(DAG dag, Configuration conf)
+  {
+    // Create the input operator that generates the test tuples
+    SpillableTestInputOperator input = new SpillableTestInputOperator();
+    input.batchSize = 100;
+    input.sleepBetweenBatch = 0; // no pause between batches
+    input = dag.addOperator("input", input);
+
+    SpillableTestOperator testOperator = new SpillableTestOperator();
+    testOperator.store = createStore(conf);
+    testOperator.shutdownCount = -1; // never equals totalCount, so the simulated failure stays disabled
+    testOperator = dag.addOperator("test", testOperator );
+
+    // Connect ports
+    dag.addStream("stream", input.output, testOperator.input).setLocality(DAG.Locality.CONTAINER_LOCAL);
+  }
+
+  /** Creates the store and sets the configured base path on its file access (cast to DTFileImpl). */
+  public ManagedStateSpillableStateStore createStore(Configuration conf)
+  {
+    String basePath = getStoreBasePath(conf);
+    ManagedStateSpillableStateStore store = new ManagedStateSpillableStateStore();
+    ((TFileImpl.DTFileImpl)store.getFileAccess()).setBasePath(basePath);
+    return store;
+  }
+
+  public String getStoreBasePath(Configuration conf)
+  {
+    return Preconditions.checkNotNull(conf.get(PROP_STORE_PATH), // throws NullPointerException if unset
+        "base path should be specified in the properties.xml");
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2fa1e6b1/benchmark/src/main/java/com/datatorrent/benchmark/spillable/SpillableTestInputOperator.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/spillable/SpillableTestInputOperator.java b/benchmark/src/main/java/com/datatorrent/benchmark/spillable/SpillableTestInputOperator.java
new file mode 100644
index 0000000..2e33721
--- /dev/null
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/spillable/SpillableTestInputOperator.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.benchmark.spillable;
+
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.common.util.BaseOperator;
+
+public class SpillableTestInputOperator extends BaseOperator implements InputOperator
+{
+  public final transient DefaultOutputPort<String> output = new DefaultOutputPort<String>();
+  public long count = 0; // running counter; each tuple is the incremented value rendered as a string
+  public int batchSize = 100; // tuples emitted per emitTuples() call
+  public int sleepBetweenBatch = 1; // pause after each batch in milliseconds; <= 0 disables the pause
+
+  @Override
+  public void emitTuples()
+  {
+    for (int i = 0; i < batchSize; ++i) {
+      output.emit("" + ++count);
+    }
+    if (sleepBetweenBatch > 0) {
+      try {
+        Thread.sleep(sleepBetweenBatch);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt(); // restore the flag so the caller can observe the interrupt
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2fa1e6b1/benchmark/src/main/java/com/datatorrent/benchmark/spillable/SpillableTestOperator.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/spillable/SpillableTestOperator.java b/benchmark/src/main/java/com/datatorrent/benchmark/spillable/SpillableTestOperator.java
new file mode 100644
index 0000000..3c5bf71
--- /dev/null
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/spillable/SpillableTestOperator.java
@@ -0,0 +1,189 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.benchmark.spillable;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.lib.state.spillable.SpillableArrayListImpl;
+import org.apache.apex.malhar.lib.state.spillable.SpillableArrayListMultimapImpl;
+import org.apache.apex.malhar.lib.state.spillable.SpillableMapImpl;
+import org.apache.apex.malhar.lib.state.spillable.SpillableStateStore;
+import org.apache.apex.malhar.lib.state.spillable.managed.ManagedStateSpillableStateStore;
+import org.apache.apex.malhar.lib.utils.serde.LongSerde;
+import org.apache.apex.malhar.lib.utils.serde.StringSerde;
+
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.Operator.ShutdownException;
+import com.datatorrent.common.util.BaseOperator;
+
+/** Benchmark operator that stores each tuple under its window id and cross-checks the per-window counts. */
+public class SpillableTestOperator extends BaseOperator implements Operator.CheckpointNotificationListener
+{
+  private static final Logger logger = LoggerFactory.getLogger(SpillableTestOperator.class);
+
+  public static final byte[] ID1 = new byte[] {(byte)1};
+  public static final byte[] ID2 = new byte[] {(byte)2};
+  public static final byte[] ID3 = new byte[] {(byte)3};
+
+  public SpillableArrayListMultimapImpl<String, String> multiMap;
+
+  public ManagedStateSpillableStateStore store;
+
+  public long totalCount = 0; // tuples processed so far
+  public transient long countInWindow; // tuples processed in the current window
+  public long minWinId = -1; // first window id seen; stays negative until beginWindow() has run
+  public long committedWinId = -1; // window id passed to the latest committed() call
+  public long windowId;
+
+  public SpillableMapImpl<Long, Long> windowToCount;
+
+  public long shutdownCount = -1; // processTuple() throws when totalCount reaches this value
+
+  public static Throwable errorTrace; // set by checkData() when a consistency check fails
+
+  public final transient DefaultInputPort<String> input = new DefaultInputPort<String>()
+  {
+    @Override
+    public void process(String tuple)
+    {
+      processTuple(tuple);
+    }
+  };
+
+  public void processTuple(String tuple)
+  {
+    if (++totalCount == shutdownCount) {
+      throw new RuntimeException("Test recovery. count = " + totalCount);
+    }
+    countInWindow++;
+    multiMap.put("" + windowId, tuple);
+  }
+
+  @Override
+  public void setup(OperatorContext context)
+  {
+    super.setup(context);
+    if (windowToCount == null) {
+      windowToCount = createWindowToCountMap(store);
+    }
+    if (multiMap == null) {
+      multiMap = createMultimap(store);
+    }
+
+    store.setup(context);
+    multiMap.setup(context);
+    // NOTE(review): windowToCount gets endWindow() but neither setup() nor beginWindow() - confirm intended
+
+    checkData();
+  }
+
+  /** Cross-checks count vs. stored tuples of past uncommitted windows; on mismatch sets errorTrace and shuts down. */
+  public void checkData()
+  {
+    long startTime = System.currentTimeMillis();
+    logger.debug("check data: totalCount: {}; minWinId: {}; committedWinId: {}; curWinId: {}", totalCount,
+        this.minWinId, committedWinId, this.windowId);
+    for (long winId = Math.max(committedWinId + 1, minWinId); winId < this.windowId; ++winId) {
+      Long count = this.windowToCount.get(winId);
+      SpillableArrayListImpl<String> datas = (SpillableArrayListImpl<String>)multiMap.get("" + winId);
+      String msg;
+      if (datas == null || count == null) {
+        msg = "Invalid data/count. datas: " + datas + "; count: " + count;
+        logger.error(msg);
+        errorTrace = new RuntimeException(msg);
+        throw new ShutdownException();
+      } else {
+        int dataSize = datas.size();
+        if ((long)count != (long)dataSize) {
+          msg = String.format("data size not equal: window Id: %d; datas size: %d; count: %d", winId, dataSize, count);
+          logger.error(msg);
+          errorTrace = new RuntimeException(msg);
+          throw new ShutdownException();
+        }
+      }
+    }
+    logger.info("check data took {} millis.", System.currentTimeMillis() - startTime);
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public void beginWindow(long windowId)
+  {
+    store.beginWindow(windowId);
+    multiMap.beginWindow(windowId);
+    if (minWinId < 0) {
+      minWinId = windowId;
+    }
+
+    this.windowId = windowId;
+    countInWindow = 0;
+  }
+
+  @Override
+  public void endWindow()
+  {
+    multiMap.endWindow();
+    windowToCount.put(windowId, countInWindow);
+    windowToCount.endWindow();
+    store.endWindow();
+
+    if (windowId % 10 == 0) {
+      checkData();
+    }
+  }
+
+  @Override
+  public void beforeCheckpoint(long windowId)
+  {
+    store.beforeCheckpoint(windowId);
+  }
+
+  @Override
+  public void checkpointed(long windowId)
+  {
+  }
+
+  @Override
+  public void committed(long windowId)
+  {
+    this.committedWinId = windowId;
+    store.committed(windowId);
+  }
+
+  public static SpillableArrayListMultimapImpl<String, String> createMultimap(SpillableStateStore store)
+  {
+    return new SpillableArrayListMultimapImpl<String, String>(store, ID1, 0L, new StringSerde(),
+        new StringSerde());
+  }
+
+  public static SpillableMapImpl<String, String> createMap(SpillableStateStore store)
+  {
+    return new SpillableMapImpl<String, String>(store, ID2, 0L, new StringSerde(),
+        new StringSerde());
+  }
+
+  public static SpillableMapImpl<Long, Long> createWindowToCountMap(SpillableStateStore store)
+  {
+    return new SpillableMapImpl<Long, Long>(store, ID3, 0L, new LongSerde(),
+        new LongSerde());
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2fa1e6b1/benchmark/src/test/java/com/datatorrent/benchmark/spillable/SpillableBenchmarkAppTester.java
----------------------------------------------------------------------
diff --git a/benchmark/src/test/java/com/datatorrent/benchmark/spillable/SpillableBenchmarkAppTester.java b/benchmark/src/test/java/com/datatorrent/benchmark/spillable/SpillableBenchmarkAppTester.java
new file mode 100644
index 0000000..7f94079
--- /dev/null
+++ b/benchmark/src/test/java/com/datatorrent/benchmark/spillable/SpillableBenchmarkAppTester.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.benchmark.spillable;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.api.StreamingApplication;
+
+public class SpillableBenchmarkAppTester extends SpillableBenchmarkApp
+{
+  private static final Logger logger = LoggerFactory.getLogger(SpillableBenchmarkAppTester.class);
+  public static final String basePath = "target/temp";
+  @Test
+  public void test() throws Exception
+  {
+    Configuration conf = new Configuration(false);
+
+    LocalMode lma = LocalMode.newInstance();
+    DAG dag = lma.getDAG();
+
+    super.populateDAG(dag, conf);
+
+    StreamingApplication app = new StreamingApplication()
+    {
+      @Override
+      public void populateDAG(DAG dag, Configuration conf)
+      {
+      }
+    };
+
+    lma.prepareDAG(app, conf);
+
+    // Create local cluster
+    final LocalMode.Controller lc = lma.getController();
+    lc.run(60000);
+
+    lc.shutdown();
+
+    if (SpillableTestOperator.errorTrace != null) {
+      logger.error("Error.", SpillableTestOperator.errorTrace);
+      Assert.assertNull(SpillableTestOperator.errorTrace.getMessage(), SpillableTestOperator.errorTrace);
+    }
+  }
+
+  @Override
+  public String getStoreBasePath(Configuration conf)
+  {
+    return basePath;
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2fa1e6b1/benchmark/src/test/java/com/datatorrent/benchmark/spillable/SpillableDSBenchmarkTest.java
----------------------------------------------------------------------
diff --git a/benchmark/src/test/java/com/datatorrent/benchmark/spillable/SpillableDSBenchmarkTest.java b/benchmark/src/test/java/com/datatorrent/benchmark/spillable/SpillableDSBenchmarkTest.java
new file mode 100644
index 0000000..7e64c5f
--- /dev/null
+++ b/benchmark/src/test/java/com/datatorrent/benchmark/spillable/SpillableDSBenchmarkTest.java
@@ -0,0 +1,171 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.benchmark.spillable;
+
+import java.util.Random;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.lib.state.spillable.SpillableMapImpl;
+import org.apache.apex.malhar.lib.state.spillable.SpillableTestUtils;
+import org.apache.apex.malhar.lib.state.spillable.managed.ManagedStateSpillableStateStore;
+import org.apache.apex.malhar.lib.utils.serde.Serde;
+import org.apache.apex.malhar.lib.utils.serde.StringSerde;
+
+import com.datatorrent.lib.fileaccess.TFileImpl;
+
+
+public class SpillableDSBenchmarkTest
+{
+  private static final Logger logger = LoggerFactory.getLogger(SpillableDSBenchmarkTest.class);
+  protected static final int loopCount = 100000000;
+  protected static final long oneMB = 1024 * 1024;
+  protected static final int keySize = 500000;
+  protected static final int valueSize = 100000;
+  protected static final int maxKeyLength = 100;
+  protected static final int maxValueLength = 1000;
+
+  protected static final int tuplesPerWindow = 10000;
+  protected static final int checkPointWindows = 10;
+  protected static final int commitDelays = 100;
+
+  protected final transient Random random = new Random();
+  protected String[] keys;
+  protected String[] values;
+
+  @Rule
+  public SpillableTestUtils.TestMeta testMeta = new SpillableTestUtils.TestMeta();
+
+
+  @Before
+  public void setup()
+  {
+    keys = new String[keySize];
+    for (int i = 0; i < keys.length; ++i) {
+      keys[i] = this.randomString(maxKeyLength);
+    }
+
+    values = new String[valueSize];
+    for (int i = 0; i < values.length; ++i) {
+      values[i] = this.randomString(maxValueLength);
+    }
+  }
+
+  /** Benchmarks random puts into a SpillableMapImpl while cycling windows, checkpoints and commits on its store. */
+  @Test
+  public void testSpillableMap()
+  {
+    byte[] ID1 = new byte[]{(byte)1};
+    ManagedStateSpillableStateStore store = new ManagedStateSpillableStateStore();
+    ((TFileImpl.DTFileImpl)store.getFileAccess()).setBasePath("target/temp");
+
+    StringSerde keySerde = createKeySerde();
+    Serde<String> valueSerde = createValueSerde();
+
+    SpillableMapImpl<String, String> map = new SpillableMapImpl<String, String>(store, ID1, 0L, keySerde, valueSerde);
+    store.setup(testMeta.operatorContext);
+    map.setup(testMeta.operatorContext);
+
+    final long startTime = System.currentTimeMillis();
+
+    long windowId = 0;
+    store.beginWindow(++windowId);
+    map.beginWindow(windowId);
+
+    int outputTimes = 0;
+    for (int i = 0; i < loopCount; ++i) {
+      putEntry(map);
+      // close the current window every tuplesPerWindow puts (this also fires on the very first put, i == 0)
+      if (i % tuplesPerWindow == 0) {
+        map.endWindow();
+        store.endWindow();
+        // every checkPointWindows windows: checkpoint, then commit the window that is commitDelays behind
+        if (i % (tuplesPerWindow * checkPointWindows) == 0) {
+          store.beforeCheckpoint(windowId);
+          if (windowId > commitDelays) {
+            store.committed(windowId - commitDelays);
+          }
+        }
+
+        //next window
+        store.beginWindow(++windowId);
+        map.beginWindow(windowId);
+      }
+
+      // report throughput and memory about every 5 seconds; spentTime is > 0 whenever the branch is taken
+      long spentTime = System.currentTimeMillis() - startTime;
+      if (spentTime > outputTimes * 5000) {
+        ++outputTimes;
+        // 1000L forces long arithmetic: i * 1000 overflows int once i exceeds roughly 2.1 million
+        logger.info("Total Statistics: Spent {} mills for {} operation. average/second: {}", spentTime, i, i * 1000L / spentTime);
+        checkEnvironment();
+      }
+    }
+    long spentTime = System.currentTimeMillis() - startTime;
+    logger.info("Spent {} mills for {} operation. average: {}", spentTime, loopCount, loopCount / spentTime);
+  }
+
+
+  public void putEntry(SpillableMapImpl<String, String> map)
+  {
+    map.put(keys[random.nextInt(keys.length)], values[random.nextInt(values.length)]);
+  }
+
+  // Alphabet sampled by randomString: the ten digits plus the upper- and lower-case ASCII letters.
+  public static final String characters = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz";
+  protected static final char[] text = new char[Math.max(maxKeyLength, maxValueLength)];
+
+  public String randomString(int length)
+  {
+    for (int i = 0; i < length; i++) {
+      text[i] = characters.charAt(random.nextInt(characters.length()));
+    }
+    return new String(text, 0, length);
+  }
+
+  public void checkEnvironment()
+  {
+    Runtime runtime = Runtime.getRuntime();
+
+    long maxMemory = runtime.maxMemory() / oneMB;
+    long allocatedMemory = runtime.totalMemory() / oneMB;
+    long freeMemory = runtime.freeMemory() / oneMB;
+
+    logger.info("freeMemory: {}M; allocatedMemory: {}M; maxMemory: {}M", freeMemory,
+        allocatedMemory, maxMemory);
+
+    Assert.assertFalse("Run out of memory.", allocatedMemory == maxMemory && freeMemory < 10);
+  }
+
+  protected StringSerde createKeySerde()
+  {
+    return new StringSerde();
+  }
+
+  protected Serde<String> createValueSerde()
+  {
+    return new StringSerde();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2fa1e6b1/benchmark/src/test/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkAppTest.java
----------------------------------------------------------------------
diff --git a/benchmark/src/test/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkAppTest.java b/benchmark/src/test/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkAppTest.java
new file mode 100644
index 0000000..4792843
--- /dev/null
+++ b/benchmark/src/test/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkAppTest.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.benchmark.state;
+
+import java.io.File;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileUtil;
+
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.benchmark.state.StoreOperator.ExecMode;
+
+/**
+ * This is not really a unit test; it is a benchmark runner.
+ * It is provided so that developers can conveniently run the benchmark in a local IDE environment.
+ *
+ */
+public class ManagedStateBenchmarkAppTest extends ManagedStateBenchmarkApp
+{
+  public static final String basePath = "target/temp";
+
+  @Before
+  public void before()
+  {
+    FileUtil.fullyDelete(new File(basePath));
+  }
+
+  @Test
+  public void testUpdateSync() throws Exception
+  {
+    test(ExecMode.UPDATESYNC);
+  }
+
+  @Test
+  public void testUpdateAsync() throws Exception
+  {
+    test(ExecMode.UPDATEASYNC);
+  }
+
+  @Test
+  public void testInsert() throws Exception
+  {
+    test(ExecMode.INSERT);
+  }
+
+  public void test(ExecMode exeMode) throws Exception
+  {
+    Configuration conf = new Configuration(false);
+
+    LocalMode lma = LocalMode.newInstance();
+    DAG dag = lma.getDAG();
+
+    super.populateDAG(dag, conf);
+    storeOperator.execMode = exeMode;
+
+    StreamingApplication app = new StreamingApplication()
+    {
+      @Override
+      public void populateDAG(DAG dag, Configuration conf)
+      {
+      }
+    };
+
+    lma.prepareDAG(app, conf);
+
+    // Create local cluster
+    final LocalMode.Controller lc = lma.getController();
+    lc.run(300000);
+
+    lc.shutdown();
+  }
+
+
+
+  @Override
+  public String getStoreBasePath(Configuration conf)
+  {
+    return basePath;
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2fa1e6b1/benchmark/src/test/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkAppTester.java
----------------------------------------------------------------------
diff --git a/benchmark/src/test/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkAppTester.java b/benchmark/src/test/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkAppTester.java
deleted file mode 100644
index 4435aad..0000000
--- a/benchmark/src/test/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkAppTester.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.benchmark.state;
-
-import java.io.File;
-
-import org.junit.Before;
-import org.junit.Test;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileUtil;
-
-import com.datatorrent.api.DAG;
-import com.datatorrent.api.LocalMode;
-import com.datatorrent.api.StreamingApplication;
-import com.datatorrent.benchmark.state.StoreOperator.ExecMode;
-
-/**
- * This is not a really unit test, but in fact a benchmark runner.
- * Provides this class to give developers the convenience to run in local IDE environment.
- *
- */
-public class ManagedStateBenchmarkAppTester extends ManagedStateBenchmarkApp
-{
-  public static final String basePath = "target/temp";
-
-  @Before
-  public void before()
-  {
-    FileUtil.fullyDelete(new File(basePath));
-  }
-
-  @Test
-  public void testUpdateSync() throws Exception
-  {
-    test(ExecMode.UPDATESYNC);
-  }
-
-  @Test
-  public void testUpdateAsync() throws Exception
-  {
-    test(ExecMode.UPDATEASYNC);
-  }
-
-  @Test
-  public void testInsert() throws Exception
-  {
-    test(ExecMode.INSERT);
-  }
-
-  public void test(ExecMode exeMode) throws Exception
-  {
-    Configuration conf = new Configuration(false);
-
-    LocalMode lma = LocalMode.newInstance();
-    DAG dag = lma.getDAG();
-
-    super.populateDAG(dag, conf);
-    storeOperator.execMode = exeMode;
-
-    StreamingApplication app = new StreamingApplication()
-    {
-      @Override
-      public void populateDAG(DAG dag, Configuration conf)
-      {
-      }
-    };
-
-    lma.prepareDAG(app, conf);
-
-    // Create local cluster
-    final LocalMode.Controller lc = lma.getController();
-    lc.run(300000);
-
-    lc.shutdown();
-  }
-
-
-
-  @Override
-  public String getStoreBasePath(Configuration conf)
-  {
-    return basePath;
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2fa1e6b1/benchmark/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/benchmark/src/test/resources/log4j.properties b/benchmark/src/test/resources/log4j.properties
index cf0d19e..3fc0120 100644
--- a/benchmark/src/test/resources/log4j.properties
+++ b/benchmark/src/test/resources/log4j.properties
@@ -41,3 +41,5 @@ log4j.logger.org=info
 #log4j.logger.org.apache.commons.beanutils=warn
 log4j.logger.com.datatorrent=debug
 log4j.logger.org.apache.apex=debug
+log4j.logger.org.apache.apex.malhar.lib.state.managed=info
+log4j.logger.com.datatorrent.common.util.FSStorageAgent=info

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2fa1e6b1/library/src/main/java/org/apache/apex/malhar/lib/state/managed/AbstractManagedStateImpl.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/AbstractManagedStateImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/AbstractManagedStateImpl.java
index dd2bbab..20271b0 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/AbstractManagedStateImpl.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/AbstractManagedStateImpl.java
@@ -125,7 +125,7 @@ import com.datatorrent.netlet.util.Slice;
  */
 public abstract class AbstractManagedStateImpl
     implements ManagedState, Component<OperatorContext>, Operator.CheckpointNotificationListener, ManagedStateContext,
-    TimeBucketAssigner.PurgeListener
+    TimeBucketAssigner.PurgeListener, BucketProvider
 {
   private long maxMemorySize;
 
@@ -319,11 +319,24 @@ public abstract class AbstractManagedStateImpl
     return (int)(bucketId % numBuckets);
   }
 
-  Bucket getBucket(long bucketId)
+  @Override
+  public Bucket getBucket(long bucketId)
   {
     return buckets[getBucketIdx(bucketId)];
   }
 
+  @Override
+  public Bucket ensureBucket(long bucketId)
+  {
+    Bucket b = getBucket(bucketId);
+    if (b == null) {
+      b = newBucket(bucketId);
+      b.setup(this);
+      buckets[getBucketIdx(bucketId)] = b;
+    }
+    return b;
+  }
+
   protected Bucket newBucket(long bucketId)
   {
     return new Bucket.DefaultBucket(bucketId);
@@ -384,6 +397,22 @@ public abstract class AbstractManagedStateImpl
     }
   }
 
+  /**
+   * get the memory usage for each bucket
+   * @return The map of bucket id to memory size used by the bucket
+   */
+  public Map<Long, Long> getBucketMemoryUsage()
+  {
+    Map<Long, Long> bucketToSize = Maps.newHashMap();
+    for (Bucket bucket : buckets) {
+      if (bucket == null) {
+        continue;
+      }
+      bucketToSize.put(bucket.getBucketId(), bucket.getKeyStream().size() + bucket.getValueStream().size());
+    }
+    return bucketToSize;
+  }
+
   @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
   @Override
   public void teardown()
@@ -476,6 +505,7 @@ public abstract class AbstractManagedStateImpl
     this.keyComparator = Preconditions.checkNotNull(keyComparator);
   }
 
+  @Override
   public BucketsFileSystem getBucketsFileSystem()
   {
     return bucketsFileSystem;

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2fa1e6b1/library/src/main/java/org/apache/apex/malhar/lib/state/managed/Bucket.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/Bucket.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/Bucket.java
index 4fc2327..cbc4e03 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/Bucket.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/Bucket.java
@@ -24,6 +24,7 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
 import java.util.TreeMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -32,6 +33,10 @@ import javax.validation.constraints.NotNull;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.apex.malhar.lib.utils.serde.KeyValueByteStreamProvider;
+import org.apache.apex.malhar.lib.utils.serde.SliceUtils;
+import org.apache.apex.malhar.lib.utils.serde.WindowedBlockStream;
+
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Maps;
@@ -46,7 +51,7 @@ import com.datatorrent.netlet.util.Slice;
  *
  * @since 3.4.0
  */
-public interface Bucket extends ManagedStateComponent
+public interface Bucket extends ManagedStateComponent, KeyValueByteStreamProvider
 {
   /**
    * @return bucket id
@@ -218,13 +223,22 @@ public interface Bucket extends ManagedStateComponent
 
     private transient TreeMap<Long, BucketsFileSystem.TimeBucketMeta> cachedBucketMetas;
 
+    /**
+     * By default, separate keys and values into two different streams.
+     * key stream and value stream should be created during construction instead of setup, as the reference of the streams will be passed to the serialize method
+     */
+    protected WindowedBlockStream keyStream = new WindowedBlockStream();
+    protected WindowedBlockStream valueStream = new WindowedBlockStream();
+
+    protected ConcurrentLinkedQueue<Long> windowsForFreeMemory = new ConcurrentLinkedQueue<>();
+
     private DefaultBucket()
     {
       //for kryo
       bucketId = -1;
     }
 
-    protected DefaultBucket(long bucketId)
+    public DefaultBucket(long bucketId)
     {
       this.bucketId = bucketId;
     }
@@ -321,6 +335,9 @@ public interface Bucket extends ManagedStateComponent
     @Override
     public Slice get(Slice key, long timeBucket, ReadSource readSource)
     {
+      // This call is lightweight
+      releaseMemory();
+      key = SliceUtils.toBufferSlice(key);
       switch (readSource) {
         case MEMORY:
           return getFromMemory(key);
@@ -392,6 +409,11 @@ public interface Bucket extends ManagedStateComponent
     @Override
     public void put(Slice key, long timeBucket, Slice value)
     {
+      // This call is lightweight
+      releaseMemory();
+      key = SliceUtils.toBufferSlice(key);
+      value = SliceUtils.toBufferSlice(value);
+
       BucketedValue bucketedValue = flash.get(key);
       if (bucketedValue == null) {
         bucketedValue = new BucketedValue(timeBucket, value);
@@ -409,39 +431,45 @@ public interface Bucket extends ManagedStateComponent
       }
     }
 
+    /**
+     * Free memory up to the given windowId
+     * This method will be called by another thread. Adding concurrency control to Stream would impact the performance.
+     * This method only calculates the size of the memory that could be released and then sends free memory request to the operator thread
+     */
     @Override
     public long freeMemory(long windowId) throws IOException
     {
-      long memoryFreed = 0;
-      Long clearWindowId;
-
-      while ((clearWindowId = committedData.floorKey(windowId)) != null) {
-        Map<Slice, BucketedValue> windowData = committedData.remove(clearWindowId);
+      // calculate the size first and then send the release memory request. It could reduce the chance of conflict and increase the performance.
+      long size = keyStream.dataSizeUpToWindow(windowId) + valueStream.dataSizeUpToWindow(windowId);
+      windowsForFreeMemory.add(windowId);
+      return size;
+    }
 
-        for (Map.Entry<Slice, BucketedValue> entry: windowData.entrySet()) {
-          memoryFreed += entry.getKey().length + entry.getValue().getSize();
-        }
+    /**
+     * This operation must be called from operator thread. It won't do anything if no memory to be freed
+     */
+    protected long releaseMemory()
+    {
+      long memoryFreed = 0;
+      while (!windowsForFreeMemory.isEmpty()) {
+        long windowId = windowsForFreeMemory.poll();
+        long originSize = keyStream.size() + valueStream.size();
+        keyStream.completeWindow(windowId);
+        valueStream.completeWindow(windowId);
+        memoryFreed += originSize - (keyStream.size() + valueStream.size());
       }
-      fileCache.clear();
-      if (cachedBucketMetas != null) {
-
-        for (BucketsFileSystem.TimeBucketMeta tbm : cachedBucketMetas.values()) {
-          FileAccess.FileReader reader = readers.remove(tbm.getTimeBucketId());
-          if (reader != null) {
-            memoryFreed += tbm.getSizeInBytes();
-            reader.close();
-          }
-        }
 
+      if (memoryFreed > 0) {
+        LOG.debug("Total freed memory size: {}", memoryFreed);
+        sizeInBytes.getAndAdd(-memoryFreed);
       }
-      sizeInBytes.getAndAdd(-memoryFreed);
-      LOG.debug("space freed {} {}", bucketId, memoryFreed);
       return memoryFreed;
     }
 
     @Override
     public Map<Slice, BucketedValue> checkpoint(long windowId)
     {
+      releaseMemory();
       try {
         //transferring the data from flash to check-pointed state in finally block and re-initializing the flash.
         return flash;
@@ -548,6 +576,19 @@ public interface Bucket extends ManagedStateComponent
       return checkpointedData;
     }
 
+
+    @Override
+    public WindowedBlockStream getKeyStream()
+    {
+      return keyStream;
+    }
+
+    @Override
+    public WindowedBlockStream getValueStream()
+    {
+      return valueStream;
+    }
+
     private static final Logger LOG = LoggerFactory.getLogger(DefaultBucket.class);
   }
 }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2fa1e6b1/library/src/main/java/org/apache/apex/malhar/lib/state/managed/BucketProvider.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/BucketProvider.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/BucketProvider.java
new file mode 100644
index 0000000..bbd18ac
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/BucketProvider.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.state.managed;
+
+/**
+ * This interface declares methods to get bucket by bucket id
+ *
+ */
+public interface BucketProvider
+{
+  /**
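+   * Gets the bucket with the given bucket id.
+   * @param bucketId id of the bucket to look up
+   * @return the bucket; may be null if it has not been created yet (see ensureBucket)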
+   */
+  public Bucket getBucket(long bucketId);
+
+  /**
+   * Creates the bucket if it does not exist yet, then returns it.
+   * @param bucketId id of the bucket to get or create
+   * @return the existing or newly created bucket
+   */
+  public Bucket ensureBucket(long bucketId);
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2fa1e6b1/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListImpl.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListImpl.java
index a59872c..d0ca9ff 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListImpl.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListImpl.java
@@ -26,9 +26,9 @@ import java.util.ListIterator;
 
 import javax.validation.constraints.NotNull;
 
+import org.apache.apex.malhar.lib.utils.serde.CollectionSerde;
+import org.apache.apex.malhar.lib.utils.serde.IntSerde;
 import org.apache.apex.malhar.lib.utils.serde.Serde;
-import org.apache.apex.malhar.lib.utils.serde.SerdeCollectionSlice;
-import org.apache.apex.malhar.lib.utils.serde.SerdeIntSlice;
 import org.apache.hadoop.classification.InterfaceStability;
 
 import com.esotericsoftware.kryo.DefaultSerializer;
@@ -37,7 +37,6 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 
 import com.datatorrent.api.Context;
-import com.datatorrent.netlet.util.Slice;
 
 /**
  * A Spillable implementation of {@link List} backed by a {@link SpillableStateStore}.
@@ -58,11 +57,10 @@ public class SpillableArrayListImpl<T> implements Spillable.SpillableList<T>, Sp
   @NotNull
   private SpillableStateStore store;
   @NotNull
-  private Serde<T, Slice> serde;
+  private Serde<T> serde;
   @NotNull
   private SpillableMapImpl<Integer, List<T>> map;
 
-  private boolean sizeCached = false;
   private int size;
   private int numBatches;
 
@@ -86,15 +84,15 @@ public class SpillableArrayListImpl<T> implements Spillable.SpillableList<T>, Sp
    */
   public SpillableArrayListImpl(long bucketId, @NotNull byte[] prefix,
       @NotNull SpillableStateStore store,
-      @NotNull Serde<T, Slice> serde)
+      @NotNull Serde<T> serde)
   {
     this.bucketId = bucketId;
     this.prefix = Preconditions.checkNotNull(prefix);
     this.store = Preconditions.checkNotNull(store);
     this.serde = Preconditions.checkNotNull(serde);
 
-    map = new SpillableMapImpl<>(store, prefix, bucketId, new SerdeIntSlice(),
-        new SerdeCollectionSlice<>(serde, (Class<List<T>>)(Class)ArrayList.class));
+    map = new SpillableMapImpl<>(store, prefix, bucketId, new IntSerde(),
+        new CollectionSerde<T, List<T>>(serde, (Class)ArrayList.class));
   }
 
   /**
@@ -111,7 +109,7 @@ public class SpillableArrayListImpl<T> implements Spillable.SpillableList<T>, Sp
    */
   public SpillableArrayListImpl(long bucketId, @NotNull byte[] prefix,
       @NotNull SpillableStateStore store,
-      @NotNull Serde<T, Slice> serde,
+      @NotNull Serde<T> serde,
       int batchSize)
   {
     this(bucketId, prefix, store, serde);
@@ -328,6 +326,7 @@ public class SpillableArrayListImpl<T> implements Spillable.SpillableList<T>, Sp
   @Override
   public void setup(Context.OperatorContext context)
   {
+    store.ensureBucket(bucketId);
     map.setup(context);
   }
 

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2fa1e6b1/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListMultimapImpl.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListMultimapImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListMultimapImpl.java
index 0944583..d3340ce 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListMultimapImpl.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListMultimapImpl.java
@@ -26,10 +26,10 @@ import java.util.Set;
 import javax.annotation.Nullable;
 import javax.validation.constraints.NotNull;
 
+import org.apache.apex.malhar.lib.utils.serde.AffixKeyValueSerdeManager;
+import org.apache.apex.malhar.lib.utils.serde.IntSerde;
 import org.apache.apex.malhar.lib.utils.serde.PassThruSliceSerde;
 import org.apache.apex.malhar.lib.utils.serde.Serde;
-import org.apache.apex.malhar.lib.utils.serde.SerdeIntSlice;
-import org.apache.apex.malhar.lib.utils.serde.SliceUtils;
 import org.apache.hadoop.classification.InterfaceStability;
 
 import com.esotericsoftware.kryo.DefaultSerializer;
@@ -62,10 +62,11 @@ public class SpillableArrayListMultimapImpl<K, V> implements Spillable.Spillable
   @NotNull
   private SpillableMapImpl<Slice, Integer> map;
   private SpillableStateStore store;
-  private byte[] identifier;
   private long bucket;
-  private Serde<K, Slice> serdeKey;
-  private Serde<V, Slice> serdeValue;
+  private Serde<V> valueSerde;
+
+  protected transient Context.OperatorContext context;
+  protected AffixKeyValueSerdeManager<K, V> keyValueSerdeManager;
 
   private SpillableArrayListMultimapImpl()
   {
@@ -78,20 +79,20 @@ public class SpillableArrayListMultimapImpl<K, V> implements Spillable.Spillable
    * @param identifier The Id of this {@link SpillableArrayListMultimapImpl}.
    * @param bucket The Id of the bucket used to store this
    * {@link SpillableArrayListMultimapImpl} in the provided {@link SpillableStateStore}.
-   * @param serdeKey The {@link Serde} to use when serializing and deserializing keys.
-   * @param serdeKey The {@link Serde} to use when serializing and deserializing values.
+   * @param keySerde The {@link Serde} to use when serializing and deserializing keys.
+   * @param valueSerde The {@link Serde} to use when serializing and deserializing values.
    */
   public SpillableArrayListMultimapImpl(SpillableStateStore store, byte[] identifier, long bucket,
-      Serde<K, Slice> serdeKey,
-      Serde<V, Slice> serdeValue)
+      Serde<K> keySerde,
+      Serde<V> valueSerde)
   {
     this.store = Preconditions.checkNotNull(store);
-    this.identifier = Preconditions.checkNotNull(identifier);
     this.bucket = bucket;
-    this.serdeKey = Preconditions.checkNotNull(serdeKey);
-    this.serdeValue = Preconditions.checkNotNull(serdeValue);
+    this.valueSerde = Preconditions.checkNotNull(valueSerde);
+
+    keyValueSerdeManager = new AffixKeyValueSerdeManager<K, V>(SIZE_KEY_SUFFIX, identifier, Preconditions.checkNotNull(keySerde), valueSerde);
 
-    map = new SpillableMapImpl(store, identifier, bucket, new PassThruSliceSerde(), new SerdeIntSlice());
+    map = new SpillableMapImpl(store, identifier, bucket, new PassThruSliceSerde(), new IntSerde());
   }
 
   public SpillableStateStore getStore()
@@ -110,15 +111,12 @@ public class SpillableArrayListMultimapImpl<K, V> implements Spillable.Spillable
     SpillableArrayListImpl<V> spillableArrayList = cache.get(key);
 
     if (spillableArrayList == null) {
-      Slice keySlice = serdeKey.serialize(key);
-      Integer size = map.get(SliceUtils.concatenate(keySlice, SIZE_KEY_SUFFIX));
-
+      Integer size = map.get(keyValueSerdeManager.serializeMetaKey(key, false));
       if (size == null) {
         return null;
       }
 
-      Slice keyPrefix = SliceUtils.concatenate(identifier, keySlice);
-      spillableArrayList = new SpillableArrayListImpl<V>(bucket, keyPrefix.toByteArray(), store, serdeValue);
+      spillableArrayList = new SpillableArrayListImpl<V>(bucket, keyValueSerdeManager.serializeDataKey(key, false).toByteArray(), store, valueSerde);
       spillableArrayList.setSize(size);
     }
 
@@ -179,8 +177,7 @@ public class SpillableArrayListMultimapImpl<K, V> implements Spillable.Spillable
   @Override
   public boolean containsKey(@Nullable Object key)
   {
-    return cache.contains((K)key) || map.containsKey(SliceUtils.concatenate(serdeKey.serialize((K)key),
-        SIZE_KEY_SUFFIX));
+    return cache.contains((K)key) || map.containsKey(keyValueSerdeManager.serializeMetaKey((K)key, false));
   }
 
   @Override
@@ -217,9 +214,9 @@ public class SpillableArrayListMultimapImpl<K, V> implements Spillable.Spillable
     SpillableArrayListImpl<V> spillableArrayList = getHelper(key);
 
     if (spillableArrayList == null) {
-      Slice keyPrefix = SliceUtils.concatenate(identifier, serdeKey.serialize(key));
-      spillableArrayList = new SpillableArrayListImpl<V>(bucket, keyPrefix.toByteArray(), store, serdeValue);
-
+      Slice keyPrefix = keyValueSerdeManager.serializeDataKey(key, true);
+      spillableArrayList = new SpillableArrayListImpl<V>(bucket, keyPrefix.toByteArray(), store, valueSerde);
+      spillableArrayList.setup(context);
       cache.put(key, spillableArrayList);
     }
 
@@ -272,14 +269,19 @@ public class SpillableArrayListMultimapImpl<K, V> implements Spillable.Spillable
   @Override
   public void setup(Context.OperatorContext context)
   {
+    this.context = context;
+
     map.setup(context);
     isRunning = true;
+
+    keyValueSerdeManager.setup(store, bucket);
   }
 
   @Override
   public void beginWindow(long windowId)
   {
     map.beginWindow(windowId);
+    keyValueSerdeManager.beginWindow(windowId);
     isInWindow = true;
   }
 
@@ -292,13 +294,14 @@ public class SpillableArrayListMultimapImpl<K, V> implements Spillable.Spillable
       SpillableArrayListImpl<V> spillableArrayList = cache.get(key);
       spillableArrayList.endWindow();
 
-      Integer size = map.put(SliceUtils.concatenate(serdeKey.serialize(key), SIZE_KEY_SUFFIX),
-          spillableArrayList.size());
+      map.put(keyValueSerdeManager.serializeMetaKey(key, true), spillableArrayList.size());
     }
 
     Preconditions.checkState(cache.getRemovedKeys().isEmpty());
     cache.endWindow();
     map.endWindow();
+
+    keyValueSerdeManager.resetReadBuffer();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2fa1e6b1/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponent.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponent.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponent.java
index c4462d5..542a914 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponent.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponent.java
@@ -24,7 +24,6 @@ import org.apache.apex.malhar.lib.utils.serde.Serde;
 import com.datatorrent.api.Component;
 import com.datatorrent.api.Context.OperatorContext;
 import com.datatorrent.api.Operator;
-import com.datatorrent.netlet.util.Slice;
 
 /**
  * This is a composite component containing spillable data structures. This should be used as
@@ -43,7 +42,7 @@ public interface SpillableComplexComponent extends Component<OperatorContext>, S
    * @param serde The Serializer/Deserializer to use for data stored in the {@link SpillableList}.
    * @return A {@link SpillableList}.
    */
-  <T> SpillableList<T> newSpillableArrayList(long bucket, Serde<T, Slice> serde);
+  <T> SpillableList<T> newSpillableArrayList(long bucket, Serde<T> serde);
 
   /**
    * This is a method for creating a {@link SpillableList}.
@@ -53,7 +52,7 @@ public interface SpillableComplexComponent extends Component<OperatorContext>, S
    * @param serde The Serializer/Deserializer to use for data stored in the {@link SpillableList}.
    * @return A {@link SpillableList}.
    */
-  <T> SpillableList<T> newSpillableArrayList(byte[] identifier, long bucket, Serde<T, Slice> serde);
+  <T> SpillableList<T> newSpillableArrayList(byte[] identifier, long bucket, Serde<T> serde);
 
   /**
    * This is a method for creating a {@link SpillableMap}. This method
@@ -65,8 +64,8 @@ public interface SpillableComplexComponent extends Component<OperatorContext>, S
    * @param serdeValue The Serializer/Deserializer to use for the map's values.
    * @return A {@link SpillableMap}.
    */
-  <K, V> SpillableMap<K, V> newSpillableMap(long bucket, Serde<K, Slice> serdeKey,
-      Serde<V, Slice> serdeValue);
+  <K, V> SpillableMap<K, V> newSpillableMap(long bucket, Serde<K> serdeKey,
+      Serde<V> serdeValue);
 
   /**
    * This is a method for creating a {@link SpillableMap}.
@@ -79,7 +78,7 @@ public interface SpillableComplexComponent extends Component<OperatorContext>, S
    * @return A {@link SpillableMap}.
    */
   <K, V> SpillableMap<K, V> newSpillableMap(byte[] identifier, long bucket,
-      Serde<K, Slice> serdeKey, Serde<V, Slice> serdeValue);
+      Serde<K> serdeKey, Serde<V> serdeValue);
 
   /**
    * This is a method for creating a {@link SpillableListMultimap}. This method
@@ -91,8 +90,7 @@ public interface SpillableComplexComponent extends Component<OperatorContext>, S
    * @param serdeValue The Serializer/Deserializer to use for the values in the map's lists.
    * @return A {@link SpillableListMultimap}.
    */
-  <K, V> SpillableListMultimap<K, V> newSpillableArrayListMultimap(long bucket, Serde<K,
-      Slice> serdeKey, Serde<V, Slice> serdeValue);
+  <K, V> SpillableListMultimap<K, V> newSpillableArrayListMultimap(long bucket, Serde<K> serdeKey, Serde<V> serdeValue);
 
   /**
    * This is a method for creating a {@link SpillableListMultimap}.
@@ -105,8 +103,8 @@ public interface SpillableComplexComponent extends Component<OperatorContext>, S
    * @return A {@link SpillableListMultimap}.
    */
   <K, V> SpillableListMultimap<K, V> newSpillableArrayListMultimap(byte[] identifier, long bucket,
-      Serde<K, Slice> serdeKey,
-      Serde<V, Slice> serdeValue);
+      Serde<K> serdeKey,
+      Serde<V> serdeValue);
 
   /**
    * This is a method for creating a {@link SpillableSetMultimap}.
@@ -117,8 +115,7 @@ public interface SpillableComplexComponent extends Component<OperatorContext>, S
    * @param serdeValue The Serializer/Deserializer to use for the values in the map's lists.
    * @return A {@link SpillableSetMultimap}.
    */
-  <K, V> SpillableSetMultimap<K, V> newSpillableSetMultimap(long bucket, Serde<K,
-      Slice> serdeKey, Serde<V, Slice> serdeValue);
+  <K, V> SpillableSetMultimap<K, V> newSpillableSetMultimap(long bucket, Serde<K> serdeKey, Serde<V> serdeValue);
 
   /**
    * This is a method for creating a {@link SpillableMultiset}. This method
@@ -128,7 +125,7 @@ public interface SpillableComplexComponent extends Component<OperatorContext>, S
    * @param serde The Serializer/Deserializer to use for data stored in the {@link SpillableMultiset}.
    * @return A {@link SpillableMultiset}.
    */
-  <T> SpillableMultiset<T> newSpillableMultiset(long bucket, Serde<T, Slice> serde);
+  <T> SpillableMultiset<T> newSpillableMultiset(long bucket, Serde<T> serde);
 
   /**
    * This is a method for creating a {@link SpillableMultiset}.
@@ -138,7 +135,7 @@ public interface SpillableComplexComponent extends Component<OperatorContext>, S
    * @param serde The Serializer/Deserializer to use for data stored in the {@link SpillableMultiset}.
    * @return A {@link SpillableMultiset}.
    */
-  <T> SpillableMultiset<T> newSpillableMultiset(byte[] identifier, long bucket, Serde<T, Slice> serde);
+  <T> SpillableMultiset<T> newSpillableMultiset(byte[] identifier, long bucket, Serde<T> serde);
 
   /**
    * This is a method for creating a {@link SpillableQueue}. This method
@@ -148,7 +145,7 @@ public interface SpillableComplexComponent extends Component<OperatorContext>, S
    * @param serde The Serializer/Deserializer to use for data stored in the {@link SpillableQueue}.
    * @return A {@link SpillableQueue}.
    */
-  <T> SpillableQueue<T> newSpillableQueue(long bucket, Serde<T, Slice> serde);
+  <T> SpillableQueue<T> newSpillableQueue(long bucket, Serde<T> serde);
 
   /**
    * This is a method for creating a {@link SpillableQueue}.
@@ -158,5 +155,5 @@ public interface SpillableComplexComponent extends Component<OperatorContext>, S
    * @param serde The Serializer/Deserializer to use for data stored in the {@link SpillableQueue}.
    * @return A {@link SpillableQueue}.
    */
-  <T> SpillableQueue<T> newSpillableQueue(byte[] identifier, long bucket, Serde<T, Slice> serde);
+  <T> SpillableQueue<T> newSpillableQueue(byte[] identifier, long bucket, Serde<T> serde);
 }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2fa1e6b1/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponentImpl.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponentImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponentImpl.java
index aad219d..1a3f550 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponentImpl.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponentImpl.java
@@ -19,6 +19,7 @@
 package org.apache.apex.malhar.lib.state.spillable;
 
 import java.util.List;
+import java.util.Set;
 
 import javax.validation.constraints.NotNull;
 
@@ -27,9 +28,9 @@ import org.apache.hadoop.classification.InterfaceStability;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
 
 import com.datatorrent.api.Context;
-import com.datatorrent.netlet.util.Slice;
 
 /**
  * This is a factory that is used for Spillable datastructures. This component is used by nesting it inside of an
@@ -50,6 +51,11 @@ public class SpillableComplexComponentImpl implements SpillableComplexComponent
   @NotNull
   private SpillableIdentifierGenerator identifierGenerator;
 
+  /**
+   * Ids of the buckets that need to be created in the store during setup.
+   */
+  protected transient Set<Long> bucketIds = Sets.newHashSet();
+
   private SpillableComplexComponentImpl()
   {
     // for kryo
@@ -66,84 +72,99 @@ public class SpillableComplexComponentImpl implements SpillableComplexComponent
     this.identifierGenerator = Preconditions.checkNotNull(identifierGenerator);
   }
 
-  public <T> SpillableList<T> newSpillableArrayList(long bucket, Serde<T, Slice> serde)
+  @Override
+  public <T> SpillableList<T> newSpillableArrayList(long bucket, Serde<T> serde)
   {
     SpillableArrayListImpl<T> list = new SpillableArrayListImpl<T>(bucket, identifierGenerator.next(), store, serde);
     componentList.add(list);
     return list;
   }
 
-  public <T> SpillableList<T> newSpillableArrayList(byte[] identifier, long bucket, Serde<T, Slice> serde)
+  @Override
+  public <T> SpillableList<T> newSpillableArrayList(byte[] identifier, long bucket, Serde<T> serde)
   {
     identifierGenerator.register(identifier);
     SpillableArrayListImpl<T> list = new SpillableArrayListImpl<T>(bucket, identifier, store, serde);
+    bucketIds.add(bucket);
     componentList.add(list);
     return list;
   }
 
-  public <K, V> SpillableMap<K, V> newSpillableMap(long bucket, Serde<K, Slice> serdeKey,
-      Serde<V, Slice> serdeValue)
+  @Override
+  public <K, V> SpillableMap<K, V> newSpillableMap(long bucket, Serde<K> serdeKey,
+      Serde<V> serdeValue)
   {
     SpillableMapImpl<K, V> map = new SpillableMapImpl<K, V>(store, identifierGenerator.next(),
         bucket, serdeKey, serdeValue);
+    bucketIds.add(bucket);
     componentList.add(map);
     return map;
   }
 
-  public <K, V> SpillableMap<K, V> newSpillableMap(byte[] identifier, long bucket, Serde<K, Slice> serdeKey,
-      Serde<V, Slice> serdeValue)
+  @Override
+  public <K, V> SpillableMap<K, V> newSpillableMap(byte[] identifier, long bucket, Serde<K> serdeKey,
+      Serde<V> serdeValue)
   {
     identifierGenerator.register(identifier);
     SpillableMapImpl<K, V> map = new SpillableMapImpl<K, V>(store, identifier, bucket, serdeKey, serdeValue);
+    bucketIds.add(bucket);
     componentList.add(map);
     return map;
   }
 
-  public <K, V> SpillableListMultimap<K, V> newSpillableArrayListMultimap(long bucket, Serde<K,
-      Slice> serdeKey, Serde<V, Slice> serdeValue)
+  @Override
+  public <K, V> SpillableListMultimap<K, V> newSpillableArrayListMultimap(long bucket, Serde<K> serdeKey, Serde<V> serdeValue)
   {
     SpillableArrayListMultimapImpl<K, V> map = new SpillableArrayListMultimapImpl<K, V>(store,
         identifierGenerator.next(), bucket, serdeKey, serdeValue);
+    bucketIds.add(bucket);
     componentList.add(map);
     return map;
   }
 
+  @Override
   public <K, V> SpillableListMultimap<K, V> newSpillableArrayListMultimap(byte[] identifier, long bucket,
-      Serde<K, Slice> serdeKey,
-      Serde<V, Slice> serdeValue)
+      Serde<K> serdeKey,
+      Serde<V> serdeValue)
   {
     identifierGenerator.register(identifier);
     SpillableArrayListMultimapImpl<K, V> map = new SpillableArrayListMultimapImpl<K, V>(store,
         identifier, bucket, serdeKey, serdeValue);
+    bucketIds.add(bucket);
     componentList.add(map);
     return map;
   }
 
-  public <K, V> SpillableSetMultimap<K, V> newSpillableSetMultimap(long bucket, Serde<K,
-      Slice> serdeKey, Serde<V, Slice> serdeValue)
+  @Override
+  public <K, V> SpillableSetMultimap<K, V> newSpillableSetMultimap(long bucket, Serde<K> serdeKey, Serde<V> serdeValue)
   {
     SpillableSetMultimapImpl<K, V> map = new SpillableSetMultimapImpl<K, V>(store,
         identifierGenerator.next(), bucket, serdeKey, serdeValue);
+    bucketIds.add(bucket);
     componentList.add(map);
     return map;
   }
 
-  public <T> SpillableMultiset<T> newSpillableMultiset(long bucket, Serde<T, Slice> serde)
+  @Override
+  public <T> SpillableMultiset<T> newSpillableMultiset(long bucket, Serde<T> serde)
   {
     throw new UnsupportedOperationException("Unsupported Operation");
   }
 
-  public <T> SpillableMultiset<T> newSpillableMultiset(byte[] identifier, long bucket, Serde<T, Slice> serde)
+  @Override
+  public <T> SpillableMultiset<T> newSpillableMultiset(byte[] identifier, long bucket, Serde<T> serde)
   {
     throw new UnsupportedOperationException("Unsupported Operation");
   }
 
-  public <T> SpillableQueue<T> newSpillableQueue(long bucket, Serde<T, Slice> serde)
+  @Override
+  public <T> SpillableQueue<T> newSpillableQueue(long bucket, Serde<T> serde)
   {
     throw new UnsupportedOperationException("Unsupported Operation");
   }
 
-  public <T> SpillableQueue<T> newSpillableQueue(byte[] identifier, long bucket, Serde<T, Slice> serde)
+  @Override
+  public <T> SpillableQueue<T> newSpillableQueue(byte[] identifier, long bucket, Serde<T> serde)
   {
     throw new UnsupportedOperationException("Unsupported Operation");
   }
@@ -152,6 +173,15 @@ public class SpillableComplexComponentImpl implements SpillableComplexComponent
   public void setup(Context.OperatorContext context)
   {
     store.setup(context);
+
+    //ensure buckets created.
+    for (long bucketId : bucketIds) {
+      store.ensureBucket(bucketId);
+    }
+
+    // The bucket ids are only needed for setup; they are not used at run time.
+    bucketIds.clear();
+
     for (SpillableComponent spillableComponent: componentList) {
       spillableComponent.setup(context);
     }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2fa1e6b1/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableMapImpl.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableMapImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableMapImpl.java
index 016aeec..5fa39d7 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableMapImpl.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableMapImpl.java
@@ -26,13 +26,13 @@ import java.util.Set;
 import javax.validation.constraints.NotNull;
 
 import org.apache.apex.malhar.lib.state.BucketedState;
+import org.apache.apex.malhar.lib.utils.serde.AffixKeyValueSerdeManager;
+import org.apache.apex.malhar.lib.utils.serde.BufferSlice;
 import org.apache.apex.malhar.lib.utils.serde.Serde;
-import org.apache.apex.malhar.lib.utils.serde.SliceUtils;
-import org.apache.commons.lang3.ArrayUtils;
-import org.apache.commons.lang3.mutable.MutableInt;
 import org.apache.hadoop.classification.InterfaceStability;
 
 import com.esotericsoftware.kryo.DefaultSerializer;
+import com.esotericsoftware.kryo.io.Input;
 import com.esotericsoftware.kryo.serializers.FieldSerializer;
 import com.google.common.base.Preconditions;
 
@@ -51,21 +51,20 @@ import com.datatorrent.netlet.util.Slice;
 public class SpillableMapImpl<K, V> implements Spillable.SpillableMap<K, V>, Spillable.SpillableComponent,
     Serializable
 {
+  private static final long serialVersionUID = 4552547110215784584L;
   private transient WindowBoundedMapCache<K, V> cache = new WindowBoundedMapCache<>();
-  private transient MutableInt tempOffset = new MutableInt();
+  private transient Input tmpInput = new Input();
 
   @NotNull
   private SpillableStateStore store;
   @NotNull
   private byte[] identifier;
   private long bucket;
-  @NotNull
-  private Serde<K, Slice> serdeKey;
-  @NotNull
-  private Serde<V, Slice> serdeValue;
 
   private int size = 0;
 
+  protected AffixKeyValueSerdeManager<K, V> keyValueSerdeManager;
+
   private SpillableMapImpl()
   {
     //for kryo
@@ -77,17 +76,16 @@ public class SpillableMapImpl<K, V> implements Spillable.SpillableMap<K, V>, Spi
    * @param identifier The Id of this {@link SpillableMapImpl}.
    * @param bucket The Id of the bucket used to store this
    * {@link SpillableMapImpl} in the provided {@link SpillableStateStore}.
-   * @param serdeKey The {@link Serde} to use when serializing and deserializing keys.
-   * @param serdeKey The {@link Serde} to use when serializing and deserializing values.
+   * @param keySerde The {@link Serde} to use when serializing and deserializing keys.
+   * @param valueSerde The {@link Serde} to use when serializing and deserializing values.
    */
-  public SpillableMapImpl(SpillableStateStore store, byte[] identifier, long bucket, Serde<K, Slice> serdeKey,
-      Serde<V, Slice> serdeValue)
+  public SpillableMapImpl(SpillableStateStore store, byte[] identifier, long bucket, Serde<K> keySerde,
+      Serde<V> valueSerde)
   {
     this.store = Preconditions.checkNotNull(store);
     this.identifier = Preconditions.checkNotNull(identifier);
     this.bucket = bucket;
-    this.serdeKey = Preconditions.checkNotNull(serdeKey);
-    this.serdeValue = Preconditions.checkNotNull(serdeValue);
+    keyValueSerdeManager = new AffixKeyValueSerdeManager<K, V>(null, identifier, Preconditions.checkNotNull(keySerde), Preconditions.checkNotNull(valueSerde));
   }
 
   public SpillableStateStore getStore()
@@ -134,16 +132,17 @@ public class SpillableMapImpl<K, V> implements Spillable.SpillableMap<K, V>, Spi
       return val;
     }
 
-    Slice valSlice = store.getSync(bucket, SliceUtils.concatenate(identifier, serdeKey.serialize(key)));
+    Slice valSlice = store.getSync(bucket, keyValueSerdeManager.serializeDataKey(key, false));
 
     if (valSlice == null || valSlice == BucketedState.EXPIRED || valSlice.length == 0) {
       return null;
     }
 
-    tempOffset.setValue(0);
-    return serdeValue.deserialize(valSlice, tempOffset);
+    tmpInput.setBuffer(valSlice.buffer, valSlice.offset, valSlice.length);
+    return keyValueSerdeManager.deserializeValue(tmpInput);
   }
 
+
   @Override
   public V put(K k, V v)
   {
@@ -207,6 +206,8 @@ public class SpillableMapImpl<K, V> implements Spillable.SpillableMap<K, V>, Spi
   @Override
   public void setup(Context.OperatorContext context)
   {
+    store.ensureBucket(bucket);
+    keyValueSerdeManager.setup(store, bucket);
   }
 
   @Override
@@ -218,16 +219,15 @@ public class SpillableMapImpl<K, V> implements Spillable.SpillableMap<K, V>, Spi
   public void endWindow()
   {
     for (K key: cache.getChangedKeys()) {
-      store.put(this.bucket, SliceUtils.concatenate(identifier, serdeKey.serialize(key)),
-          serdeValue.serialize(cache.get(key)));
+      store.put(bucket, keyValueSerdeManager.serializeDataKey(key, true),
+          keyValueSerdeManager.serializeValue(cache.get(key)));
     }
 
     for (K key: cache.getRemovedKeys()) {
-      store.put(this.bucket, SliceUtils.concatenate(identifier, serdeKey.serialize(key)),
-          new Slice(ArrayUtils.EMPTY_BYTE_ARRAY));
+      store.put(this.bucket, keyValueSerdeManager.serializeDataKey(key, true), BufferSlice.EMPTY_SLICE);
     }
-
     cache.endWindow();
+    keyValueSerdeManager.resetReadBuffer();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2fa1e6b1/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetImpl.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetImpl.java
index c2741b0..0dfc411 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetImpl.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetImpl.java
@@ -26,15 +26,15 @@ import java.util.NoSuchElementException;
 import javax.validation.constraints.NotNull;
 
 import org.apache.apex.malhar.lib.utils.serde.Serde;
-import org.apache.commons.lang3.mutable.MutableInt;
 import org.apache.hadoop.classification.InterfaceStability;
 
 import com.esotericsoftware.kryo.DefaultSerializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
 import com.esotericsoftware.kryo.serializers.FieldSerializer;
 import com.google.common.base.Preconditions;
 
 import com.datatorrent.api.Context;
-import com.datatorrent.netlet.util.Slice;
 
 /**
  * A Spillable implementation of {@link List} backed by a {@link SpillableStateStore}.
@@ -62,49 +62,30 @@ public class SpillableSetImpl<T> implements Spillable.SpillableSet<T>, Spillable
     T next;
   }
 
-  public static class SerdeListNodeSlice<T> implements Serde<ListNode<T>, Slice>
+  public static class ListNodeSerde<T> implements Serde<ListNode<T>>
   {
-    private Serde<T, Slice> serde;
-    private static Slice falseSlice = new Slice(new byte[]{0});
-    private static Slice trueSlice = new Slice(new byte[]{1});
+    private Serde<T> serde;
 
-    public SerdeListNodeSlice(@NotNull Serde<T, Slice> serde)
+    public ListNodeSerde(@NotNull Serde<T> serde)
     {
       this.serde = Preconditions.checkNotNull(serde);
     }
 
     @Override
-    public Slice serialize(ListNode<T> object)
+    public void serialize(ListNode<T> object, Output output)
     {
-      int size = 0;
-
-      Slice slice1 = object.valid ? trueSlice : falseSlice;
-      size += 1;
-      Slice slice2 = serde.serialize(object.next);
-      size += slice2.length;
-
-      byte[] bytes = new byte[size];
-      System.arraycopy(slice1.buffer, slice1.offset, bytes, 0, slice1.length);
-      System.arraycopy(slice2.buffer, slice2.offset, bytes, slice1.length, slice2.length);
-
-      return new Slice(bytes);
+      output.writeBoolean(object.valid);
+      serde.serialize(object.next, output);
     }
 
     @Override
-    public ListNode<T> deserialize(Slice slice, MutableInt offset)
+    public ListNode<T> deserialize(Input input)
     {
       ListNode<T> result = new ListNode<>();
-      result.valid = slice.buffer[offset.intValue()] != 0;
-      offset.add(1);
-      result.next = serde.deserialize(slice, offset);
+      result.valid = input.readBoolean();
+      result.next = serde.deserialize(input);
       return result;
     }
-
-    @Override
-    public ListNode<T> deserialize(Slice object)
-    {
-      return deserialize(object, new MutableInt(0));
-    }
   }
 
   @NotNull
@@ -135,11 +116,11 @@ public class SpillableSetImpl<T> implements Spillable.SpillableSet<T>, Spillable
    */
   public SpillableSetImpl(long bucketId, @NotNull byte[] prefix,
       @NotNull SpillableStateStore store,
-      @NotNull Serde<T, Slice> serde)
+      @NotNull Serde<T> serde)
   {
     this.store = Preconditions.checkNotNull(store);
 
-    map = new SpillableMapImpl<>(store, prefix, bucketId, serde, new SerdeListNodeSlice(serde));
+    map = new SpillableMapImpl<>(store, prefix, bucketId, serde, new ListNodeSerde(serde));
   }
 
   public void setSize(int size)

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2fa1e6b1/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetMultimapImpl.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetMultimapImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetMultimapImpl.java
index 98f60d2..76e47f2 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetMultimapImpl.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetMultimapImpl.java
@@ -27,11 +27,11 @@ import java.util.Set;
 import javax.annotation.Nullable;
 import javax.validation.constraints.NotNull;
 
+import org.apache.apex.malhar.lib.utils.serde.AffixKeyValueSerdeManager;
+import org.apache.apex.malhar.lib.utils.serde.IntSerde;
+import org.apache.apex.malhar.lib.utils.serde.PairSerde;
 import org.apache.apex.malhar.lib.utils.serde.PassThruSliceSerde;
 import org.apache.apex.malhar.lib.utils.serde.Serde;
-import org.apache.apex.malhar.lib.utils.serde.SerdeIntSlice;
-import org.apache.apex.malhar.lib.utils.serde.SerdePairSlice;
-import org.apache.apex.malhar.lib.utils.serde.SliceUtils;
 import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.classification.InterfaceStability;
@@ -65,10 +65,11 @@ public class SpillableSetMultimapImpl<K, V> implements Spillable.SpillableSetMul
   private SpillableStateStore store;
   private byte[] identifier;
   private long bucket;
-  private Serde<K, Slice> serdeKey;
-  private Serde<V, Slice> serdeValue;
+  private Serde<V> valueSerde;
   private transient List<SpillableSetImpl<V>> removedSets = new ArrayList<>();
 
+  protected AffixKeyValueSerdeManager<K, V> keyValueSerdeManager;
+  protected transient Context.OperatorContext context;
   private SpillableSetMultimapImpl()
   {
     // for kryo
@@ -84,16 +85,15 @@ public class SpillableSetMultimapImpl<K, V> implements Spillable.SpillableSetMul
    * @param serdeKey The {@link Serde} to use when serializing and deserializing values.
    */
   public SpillableSetMultimapImpl(SpillableStateStore store, byte[] identifier, long bucket,
-      Serde<K, Slice> serdeKey,
-      Serde<V, Slice> serdeValue)
+      Serde<K> keySerde,
+      Serde<V> valueSerde)
   {
     this.store = Preconditions.checkNotNull(store);
-    this.identifier = Preconditions.checkNotNull(identifier);
     this.bucket = bucket;
-    this.serdeKey = Preconditions.checkNotNull(serdeKey);
-    this.serdeValue = Preconditions.checkNotNull(serdeValue);
+    this.valueSerde = Preconditions.checkNotNull(valueSerde);
+    keyValueSerdeManager = new AffixKeyValueSerdeManager<K, V>(META_KEY_SUFFIX, identifier, Preconditions.checkNotNull(keySerde), valueSerde);
 
-    map = new SpillableMapImpl(store, identifier, bucket, new PassThruSliceSerde(), new SerdePairSlice<>(new SerdeIntSlice(), serdeValue));
+    map = new SpillableMapImpl(store, identifier, bucket, new PassThruSliceSerde(), new PairSerde<>(new IntSerde(), valueSerde));
   }
 
   public SpillableStateStore getStore()
@@ -112,17 +112,17 @@ public class SpillableSetMultimapImpl<K, V> implements Spillable.SpillableSetMul
     SpillableSetImpl<V> spillableSet = cache.get(key);
 
     if (spillableSet == null) {
-      Slice keySlice = serdeKey.serialize(key);
-      Pair<Integer, V> meta = map.get(SliceUtils.concatenate(keySlice, META_KEY_SUFFIX));
+      Pair<Integer, V> meta = map.get(keyValueSerdeManager.serializeMetaKey(key, false));
 
       if (meta == null) {
         return null;
       }
 
-      Slice keyPrefix = SliceUtils.concatenate(identifier, keySlice);
-      spillableSet = new SpillableSetImpl<>(bucket, keyPrefix.toByteArray(), store, serdeValue);
+      Slice keyPrefix = keyValueSerdeManager.serializeDataKey(key, false);
+      spillableSet = new SpillableSetImpl<>(bucket, keyPrefix.toByteArray(), store, valueSerde);
       spillableSet.setSize(meta.getLeft());
       spillableSet.setHead(meta.getRight());
+      spillableSet.setup(context);
     }
 
     cache.put(key, spillableSet);
@@ -166,7 +166,7 @@ public class SpillableSetMultimapImpl<K, V> implements Spillable.SpillableSetMul
     SpillableSetImpl<V> spillableSet = getHelper((K)key);
     if (spillableSet != null) {
       cache.remove((K)key);
-      Slice keySlice = SliceUtils.concatenate(serdeKey.serialize((K)key), META_KEY_SUFFIX);
+      Slice keySlice = keyValueSerdeManager.serializeMetaKey((K)key, false);
       map.put(keySlice, new ImmutablePair<>(0, spillableSet.getHead()));
       spillableSet.clear();
       removedSets.add(spillableSet);
@@ -199,7 +199,7 @@ public class SpillableSetMultimapImpl<K, V> implements Spillable.SpillableSetMul
     if (cache.contains((K)key)) {
       return true;
     }
-    Slice keySlice = SliceUtils.concatenate(serdeKey.serialize((K)key), META_KEY_SUFFIX);
+    Slice keySlice = keyValueSerdeManager.serializeMetaKey((K)key, false);
     Pair<Integer, V> meta = map.get(keySlice);
     return meta != null && meta.getLeft() > 0;
   }
@@ -227,8 +227,8 @@ public class SpillableSetMultimapImpl<K, V> implements Spillable.SpillableSetMul
     SpillableSetImpl<V> spillableSet = getHelper(key);
 
     if (spillableSet == null) {
-      Slice keyPrefix = SliceUtils.concatenate(identifier, serdeKey.serialize(key));
-      spillableSet = new SpillableSetImpl<>(bucket, keyPrefix.toByteArray(), store, serdeValue);
+      spillableSet = new SpillableSetImpl<V>(bucket, keyValueSerdeManager.serializeDataKey(key, true).toByteArray(), store, valueSerde);
+      spillableSet.setup(context);
       cache.put(key, spillableSet);
     }
     return spillableSet.add(value);
@@ -284,13 +284,16 @@ public class SpillableSetMultimapImpl<K, V> implements Spillable.SpillableSetMul
   @Override
   public void setup(Context.OperatorContext context)
   {
+    this.context = context;
     map.setup(context);
+    keyValueSerdeManager.setup(store, bucket);
   }
 
   @Override
   public void beginWindow(long windowId)
   {
     map.beginWindow(windowId);
+    keyValueSerdeManager.beginWindow(windowId);
   }
 
   @Override
@@ -301,7 +304,7 @@ public class SpillableSetMultimapImpl<K, V> implements Spillable.SpillableSetMul
       SpillableSetImpl<V> spillableSet = cache.get(key);
       spillableSet.endWindow();
 
-      map.put(SliceUtils.concatenate(serdeKey.serialize(key), META_KEY_SUFFIX),
+      map.put(keyValueSerdeManager.serializeMetaKey(key, true),
           new ImmutablePair<>(spillableSet.size(), spillableSet.getHead()));
     }
 
@@ -311,6 +314,8 @@ public class SpillableSetMultimapImpl<K, V> implements Spillable.SpillableSetMul
 
     cache.endWindow();
     map.endWindow();
+
+    keyValueSerdeManager.resetReadBuffer();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2fa1e6b1/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableStateStore.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableStateStore.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableStateStore.java
index b6ee3c0..44f003b 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableStateStore.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableStateStore.java
@@ -19,6 +19,7 @@
 package org.apache.apex.malhar.lib.state.spillable;
 
 import org.apache.apex.malhar.lib.state.BucketedState;
+import org.apache.apex.malhar.lib.state.managed.BucketProvider;
 import org.apache.hadoop.classification.InterfaceStability;
 
 import com.datatorrent.api.Component;
@@ -32,6 +33,6 @@ import com.datatorrent.api.Operator;
  */
 @InterfaceStability.Evolving
 public interface SpillableStateStore extends BucketedState, Component<Context.OperatorContext>,
-    Operator.CheckpointNotificationListener, WindowListener
+    Operator.CheckpointNotificationListener, WindowListener, BucketProvider
 {
 }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2fa1e6b1/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/WindowBoundedMapCache.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/WindowBoundedMapCache.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/WindowBoundedMapCache.java
index 0e1d55e..e80d38d 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/WindowBoundedMapCache.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/WindowBoundedMapCache.java
@@ -21,6 +21,9 @@ package org.apache.apex.malhar.lib.state.spillable;
 import java.util.Map;
 import java.util.Set;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.hadoop.classification.InterfaceStability;
 
 import com.google.common.base.Preconditions;
@@ -39,6 +42,7 @@ import com.google.common.collect.Sets;
 @InterfaceStability.Evolving
 public class WindowBoundedMapCache<K, V>
 {
+  private static final transient Logger logger = LoggerFactory.getLogger(WindowBoundedMapCache.class);
   public static final int DEFAULT_MAX_SIZE = 50000;
 
   private int maxSize = DEFAULT_MAX_SIZE;
@@ -109,7 +113,6 @@ public class WindowBoundedMapCache<K, V>
     Note: beginWindow is intentionally not implemented because many users need a cache that does not require
     beginWindow to be called.
    */
-
   public void endWindow()
   {
     int count = cache.size() - maxSize;

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2fa1e6b1/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/inmem/InMemSpillableStateStore.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/inmem/InMemSpillableStateStore.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/inmem/InMemSpillableStateStore.java
index 61ab8a8..8acb044 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/inmem/InMemSpillableStateStore.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/inmem/InMemSpillableStateStore.java
@@ -23,7 +23,10 @@ import java.util.concurrent.Future;
 
 import javax.validation.constraints.NotNull;
 
+import org.apache.apex.malhar.lib.state.managed.Bucket;
 import org.apache.apex.malhar.lib.state.spillable.SpillableStateStore;
+import org.apache.apex.malhar.lib.utils.serde.BufferSlice;
+import org.apache.apex.malhar.lib.utils.serde.SliceUtils;
 import org.apache.hadoop.classification.InterfaceStability;
 
 import com.google.common.collect.Maps;
@@ -74,6 +77,8 @@ public class InMemSpillableStateStore implements SpillableStateStore
       bucket = Maps.newHashMap();
       store.put(bucketId, bucket);
     }
+    key = SliceUtils.toBufferSlice(key);
+    value = SliceUtils.toBufferSlice(value);
 
     bucket.put(key, value);
   }
@@ -88,6 +93,10 @@ public class InMemSpillableStateStore implements SpillableStateStore
       store.put(bucketId, bucket);
     }
 
+    if (key.getClass() == Slice.class) {
+      //The hashCode of Slice was not correct, so correct it
+      key = new BufferSlice(key);
+    }
     return bucket.get(key);
   }
 
@@ -117,4 +126,21 @@ public class InMemSpillableStateStore implements SpillableStateStore
   {
     return store.toString();
   }
+
+  protected Bucket.DefaultBucket bucket;
+
+  @Override
+  public Bucket getBucket(long bucketId)
+  {
+    return bucket;
+  }
+
+  @Override
+  public Bucket ensureBucket(long bucketId)
+  {
+    if (bucket == null) {
+      bucket = new Bucket.DefaultBucket(1);
+    }
+    return bucket;
+  }
 }


[2/3] apex-malhar git commit: APEXMALHAR-2190 #resolve #comment Use reusable buffer for serialization in spillable data structures

Posted by th...@apache.org.
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2fa1e6b1/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/AffixKeyValueSerdeManager.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/AffixKeyValueSerdeManager.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/AffixKeyValueSerdeManager.java
new file mode 100644
index 0000000..57638d8
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/AffixKeyValueSerdeManager.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.utils.serde;
+
+import org.apache.apex.malhar.lib.state.managed.Bucket;
+
+import com.esotericsoftware.kryo.io.Input;
+
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * All spillable data structures use this class to manage the buffers for serialization.
+ * This class contains serialization logic that is common for all spillable data structures
+ *
+ * @param <K>
+ * @param <V>
+ */
+public class AffixKeyValueSerdeManager<K, V> extends KeyValueSerdeManager<K, V>
+{
+  /**
+   * The read buffer will be released when read is done, while write buffer should be held until the data has been persisted.
+   * The write buffer should be non-transient. The data which has been already saved to files will be removed by {@link Bucket}
+   * while the data which haven't been saved need to be recovered by the platform from checkpoint.
+   */
+  private AffixSerde<K> metaKeySerde;
+  private AffixSerde<K> dataKeySerde;
+
+
+  private AffixKeyValueSerdeManager()
+  {
+    //for kryo
+  }
+
+  public AffixKeyValueSerdeManager(byte[] metaKeySuffix, byte[] dataKeyIdentifier, Serde<K> keySerde, Serde<V> valueSerde)
+  {
+    this.valueSerde = valueSerde;
+    metaKeySerde = new AffixSerde<K>(null, keySerde, metaKeySuffix);
+    dataKeySerde = new AffixSerde<K>(dataKeyIdentifier, keySerde, null);
+  }
+
+  public Slice serializeMetaKey(K key, boolean write)
+  {
+    SerializationBuffer buffer = write ? keyBufferForWrite : keyBufferForRead;
+    metaKeySerde.serialize(key, buffer);
+    return buffer.toSlice();
+  }
+
+  public Slice serializeDataKey(K key, boolean write)
+  {
+    SerializationBuffer buffer = write ? keyBufferForWrite : keyBufferForRead;
+    dataKeySerde.serialize(key, buffer);
+    return buffer.toSlice();
+  }
+
+  public V deserializeValue(Input input)
+  {
+    V value = valueSerde.deserialize(input);
+    return value;
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2fa1e6b1/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/AffixSerde.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/AffixSerde.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/AffixSerde.java
new file mode 100644
index 0000000..7504633
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/AffixSerde.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.utils.serde;
+
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+
+/**
+ * AffixSerde provides serde for adding prefix or suffix
+ *
+ * @param <T>
+ */
+public class AffixSerde<T> implements Serde<T>
+{
+  private Serde<T> serde;
+  private byte[] prefix;
+  private byte[] suffix;
+
+  private AffixSerde()
+  {
+    //for kryo
+  }
+
+  public AffixSerde(byte[] prefix, Serde<T> serde, byte[] suffix)
+  {
+    this.prefix = prefix;
+    this.suffix = suffix;
+    this.serde = serde;
+  }
+
+  @Override
+  public void serialize(T object, Output output)
+  {
+    if (prefix != null && prefix.length > 0) {
+      output.write(prefix);
+    }
+    serde.serialize(object, output);
+    if (suffix != null && suffix.length > 0) {
+      output.write(suffix);
+    }
+  }
+
+  @Override
+  public T deserialize(Input input)
+  {
+    if (prefix != null && prefix.length > 0) {
+      input.skip(prefix.length);
+    }
+    return serde.deserialize(input);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2fa1e6b1/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/ArraySerde.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/ArraySerde.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/ArraySerde.java
new file mode 100644
index 0000000..4b2a45b
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/ArraySerde.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.utils.serde;
+
+import java.lang.reflect.Array;
+
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import com.google.common.base.Preconditions;
+
+public class ArraySerde<T> implements Serde<T[]>
+{
+  private Serde<T> itemSerde;
+  private Class<T> itemType;
+
+  private ArraySerde()
+  {
+  }
+
+  /**
+   * Serialization and deserialization need different constructors, so static factory methods wrap them.
+   * The ArraySerde returned by newSerializer can only be used for serialization.
+   */
+  public static <T> ArraySerde<T> newSerializer(Serde<T> itemSerde)
+  {
+    return new ArraySerde<T>(Preconditions.checkNotNull(itemSerde));
+  }
+
+  public static <T> ArraySerde<T> newSerde(Serde<T> itemSerde, Class<T> itemType)
+  {
+    return new ArraySerde<T>(Preconditions.checkNotNull(itemSerde), Preconditions.checkNotNull(itemType));
+  }
+
+  private ArraySerde(Serde<T> itemSerde)
+  {
+    this.itemSerde = itemSerde;
+  }
+
+  private ArraySerde(Serde<T> itemSerde, Class<T> itemType)
+  {
+    this.itemSerde = itemSerde;
+    this.itemType = itemType;
+  }
+
+  @Override
+  public void serialize(T[] objects, Output output)
+  {
+    // Always write the element count, even for an empty array: deserialize()
+    // unconditionally reads the count first, so omitting it would break the
+    // round trip for empty arrays.
+    output.writeInt(objects.length, true);
+    Serde<T> serializer = getItemSerde();
+    for (T object : objects) {
+      serializer.serialize(object, output);
+    }
+  }
+
+  protected Serde<T> getItemSerde()
+  {
+    return itemSerde;
+  }
+
+  @Override
+  public T[] deserialize(Input input)
+  {
+    int numOfElements = input.readInt(true);
+
+    T[] array = createObjectArray(numOfElements);
+
+    for (int index = 0; index < numOfElements; ++index) {
+      array[index] = getItemSerde().deserialize(input);
+    }
+    return array;
+  }
+
+  @SuppressWarnings("unchecked")
+  protected T[] createObjectArray(int length)
+  {
+    return (T[])Array.newInstance(itemType, length);
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2fa1e6b1/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/Block.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/Block.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/Block.java
new file mode 100644
index 0000000..c140962
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/Block.java
@@ -0,0 +1,217 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.utils.serde;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ *
+ * keep the information of one block
+ *
+ */
+public class Block
+{
+  public static class OutOfBlockBufferMemoryException extends RuntimeException
+  {
+    private static final long serialVersionUID = 3813792889200989131L;
+  }
+
+  private static final Logger logger = LoggerFactory.getLogger(Block.class);
+
+  public static final int DEFAULT_BLOCK_SIZE = 100000;
+
+  //the capacity of the block
+  private int capacity;
+
+  /*
+   * the size of the data.
+   */
+  private volatile int size;
+
+  private int objectBeginOffset = 0;
+  private byte[] buffer;
+
+  /**
+   * whether any slices have been exposed to the caller.
+   */
+  private boolean exposedSlices;
+
+  private Block()
+  {
+    this(DEFAULT_BLOCK_SIZE);
+  }
+
+  public Block(int capacity)
+  {
+    if (capacity <= 0) {
+      throw new IllegalArgumentException("Invalid capacity: " + capacity);
+    }
+    buffer = new byte[capacity];
+    this.capacity = capacity;
+  }
+
+  public void write(byte data)
+  {
+    checkOrReallocateBuffer(1);
+    buffer[size++] = data;
+  }
+
+  public void write(byte[] data)
+  {
+    write(data, 0, data.length);
+  }
+
+  public void write(byte[] data, final int offset, final int length)
+  {
+    checkOrReallocateBuffer(length);
+
+    System.arraycopy(data, offset, buffer, size, length);
+    size += length;
+  }
+
+
+
+  /**
+   * check the buffer size and reallocate if buffer is not enough
+   *
+   * @param length
+   */
+  private void checkOrReallocateBuffer(int length) throws OutOfBlockBufferMemoryException
+  {
+    if (size + length <= capacity) {
+      return;
+    }
+
+    if (exposedSlices) {
+      throw new OutOfBlockBufferMemoryException();
+    }
+
+    //calculate the new capacity
+    capacity = (size + length) * 2;
+
+    byte[] oldBuffer = buffer;
+    buffer = new byte[capacity];
+
+    /**
+     * no slices are exposed in this block yet (this is the first object in this block).
+     * so we can reallocate and move the memory
+     */
+    if (size > 0) {
+      System.arraycopy(oldBuffer, 0, buffer, 0, size);
+    }
+  }
+
+  /**
+   * Similar to toSlice(), this method returns a slice over the data of the current
+   * object that has already been written to the buffer. But unlike toSlice(), which
+   * indicates that all the writes of this object are done, this method can be called at
+   * any time.
+   */
+  public Slice getLastObjectSlice()
+  {
+    return new Slice(buffer, objectBeginOffset, size - objectBeginOffset);
+  }
+
+  public void discardLastObjectData()
+  {
+    // Roll the write position back to where the in-progress object began.
+    // This must also happen when that object starts at offset 0; otherwise
+    // its partially written bytes would be kept in this block.
+    size = objectBeginOffset;
+  }
+
+  public void moveLastObjectDataTo(Block newBlock)
+  {
+    if (size > objectBeginOffset) {
+      newBlock.write(buffer, objectBeginOffset, size - objectBeginOffset);
+      discardLastObjectData();
+    }
+  }
+
+  /**
+   * This method returns the slice that represents the serialized form.
+   * The process of serializing an object should be one or multiple calls of write() followed by a toSlice() call.
+   * A call to toSlice indicates the writes are done for this object
+   *
+   * @return
+   */
+  public BufferSlice toSlice()
+  {
+    if (size == objectBeginOffset) {
+      throw new RuntimeException("data size is zero.");
+    }
+    BufferSlice slice = new BufferSlice(buffer, objectBeginOffset, size - objectBeginOffset);
+    //prepare for next object
+    objectBeginOffset = size;
+    exposedSlices = true;
+    return slice;
+  }
+
+  public void reset()
+  {
+    size = 0;
+    objectBeginOffset = 0;
+    exposedSlices = false;
+  }
+
+  /**
+   * Checks whether {@code length} more bytes fit without growing the buffer.
+   *
+   * @param length the number of bytes about to be written
+   * @return true if size + length does not exceed the capacity (an exact fit counts)
+   */
+  public boolean hasEnoughSpace(int length)
+  {
+    return size + length <= capacity;
+  }
+
+  public long size()
+  {
+    return size;
+  }
+
+  public long capacity()
+  {
+    return capacity;
+  }
+
+  public boolean isFresh()
+  {
+    return (size == 0 && objectBeginOffset == 0 && exposedSlices == false);
+  }
+
+  /**
+   * Returns whether the block is clear. The block is clear when there has not been any write calls since the last toSlice() call.
+   *
+   * @return
+   */
+  public boolean isClear()
+  {
+    return objectBeginOffset == size;
+  }
+
+  public void release()
+  {
+    reset();
+    buffer = null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2fa1e6b1/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/BlockReleaseStrategy.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/BlockReleaseStrategy.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/BlockReleaseStrategy.java
new file mode 100644
index 0000000..f8a097e
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/BlockReleaseStrategy.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.utils.serde;
+
+/**
+ * The expected interaction between a stream and this strategy is:
+ * - The stream periodically reports how many free blocks it has, usually at the end of each window.
+ * - The stream asks how many blocks should be released. It usually releases that many, but may decide otherwise.
+ * - The stream reports how many blocks were actually released.
+ */
+public interface BlockReleaseStrategy
+{
+  /**
+   * The stream should call this method to report to the strategy how many blocks are free currently.
+   * @param freeBlockNum
+   */
+  void currentFreeBlocks(int freeBlockNum);
+
+  /**
+   * Get how many blocks can be released
+   * @return
+   */
+  int getNumBlocksToRelease();
+
+  /**
+   * The stream should call this method to report how many block are released.
+   * @param numReleasedBlocks
+   */
+  void releasedBlocks(int numReleasedBlocks);
+
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2fa1e6b1/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/BlockStream.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/BlockStream.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/BlockStream.java
new file mode 100644
index 0000000..ee50f7d
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/BlockStream.java
@@ -0,0 +1,179 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.utils.serde;
+
+import java.io.OutputStream;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Maps;
+
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * A stream is a collection of blocks
+ * BlockStream avoids copying the data that are already exposed to the caller
+ *
+ */
+public class BlockStream extends OutputStream
+{
+  private static final Logger logger = LoggerFactory.getLogger(BlockStream.class);
+
+  //the initial capacity of each block
+  protected final int blockCapacity;
+
+  protected Map<Integer, Block> blocks = Maps.newHashMap();
+  //the index of current block, valid block index should >= 0
+  protected int currentBlockIndex = 0;
+  protected long size = 0;
+
+  protected Block currentBlock;
+
+  public BlockStream()
+  {
+    this(Block.DEFAULT_BLOCK_SIZE);
+  }
+
+  public BlockStream(int blockCapacity)
+  {
+    this.blockCapacity = blockCapacity;
+  }
+
+  @Override
+  public void write(byte[] data)
+  {
+    write(data, 0, data.length);
+  }
+
+  @Override
+  public void write(int b)
+  {
+    currentBlock = getOrCreateCurrentBlock();
+    try {
+      currentBlock.write((byte)b);
+    } catch (Block.OutOfBlockBufferMemoryException e) {
+      reallocateBlock();
+      currentBlock.write((byte)b);
+    }
+    size++;
+  }
+
+  /**
+   * This write could be called multiple times for an object.
+   * The write method makes sure the same object only write to one block
+   *
+   * @param data
+   * @param offset
+   * @param length
+   */
+  @Override
+  public void write(byte[] data, final int offset, final int length)
+  {
+    //start with a block which at least can hold this data
+    currentBlock = getOrCreateCurrentBlock();
+    try {
+      currentBlock.write(data, offset, length);
+    } catch (Block.OutOfBlockBufferMemoryException e) {
+      reallocateBlock();
+      currentBlock.write(data, offset, length);
+    }
+    size += length;
+  }
+
+  private void reallocateBlock()
+  {
+    //use next block
+    Block previousBlock = moveToNextBlock();
+    if (!currentBlock.isFresh()) {
+      throw new RuntimeException("New block is not fresh.");
+    }
+    if (!previousBlock.isClear()) {
+      previousBlock.moveLastObjectDataTo(currentBlock);
+    }
+  }
+
+  /**
+   *
+   * @return The previous block
+   */
+  protected Block moveToNextBlock()
+  {
+    Block previousBlock = currentBlock;
+
+    ++currentBlockIndex;
+    currentBlock = getOrCreateCurrentBlock();
+    if (!currentBlock.isFresh()) {
+      throw new RuntimeException("Assigned non fresh block.");
+    }
+    return previousBlock;
+  }
+
+  protected Block getOrCreateCurrentBlock()
+  {
+    Block block = blocks.get(currentBlockIndex);
+    if (block == null) {
+      block = new Block(blockCapacity);
+      blocks.put(currentBlockIndex, block);
+    }
+    return block;
+  }
+
+  public long size()
+  {
+    return size;
+  }
+
+  public long capacity()
+  {
+    long capacity = 0;
+    for (Block block : blocks.values()) {
+      capacity += block.capacity();
+    }
+    return capacity;
+  }
+
+  /**
+   *
+   * this is the call that represents the end of an object
+   */
+  public Slice toSlice()
+  {
+    return blocks.get(currentBlockIndex).toSlice();
+  }
+
+  /**
+   * resets all blocks
+   */
+  public void reset()
+  {
+    currentBlockIndex = 0;
+    size = 0;
+    for (Block block : blocks.values()) {
+      block.reset();
+    }
+  }
+
+  public void release()
+  {
+    reset();
+    blocks.clear();
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2fa1e6b1/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/BufferSlice.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/BufferSlice.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/BufferSlice.java
new file mode 100644
index 0000000..5d830fe
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/BufferSlice.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.utils.serde;
+
+import org.getopt.util.hash.MurmurHash;
+
+import org.apache.commons.lang3.ArrayUtils;
+
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * com.datatorrent.netlet.util.Slice has a problem with its hashCode(), so
+ * it is overridden here.
+ *
+ */
+public class BufferSlice extends Slice
+{
+  private static final long serialVersionUID = -471209532589983329L;
+  public static final BufferSlice EMPTY_SLICE = new BufferSlice(ArrayUtils.EMPTY_BYTE_ARRAY);
+
+  // for Kryo
+  private BufferSlice()
+  {
+    // the super class's default constructor is private and can't be called.
+    super(null, 0, 0);
+  }
+
+  public BufferSlice(byte[] array, int offset, int length)
+  {
+    super(array, offset, length);
+  }
+
+  public BufferSlice(byte[] array)
+  {
+    super(array);
+  }
+
+  public BufferSlice(Slice netletSlice)
+  {
+    this(netletSlice.buffer, netletSlice.offset, netletSlice.length);
+  }
+
+  @Override
+  public int hashCode()
+  {
+    int hash = 5;
+    hash = 59 * hash + MurmurHash.hash(buffer, hash, offset, length);
+    hash = 59 * hash + this.length;
+    return hash;
+  }
+
+  /**
+   * Lets this class compare equal to a com.datatorrent.netlet.util.Slice with the same content.
+   */
+  @Override
+  public boolean equals(Object obj)
+  {
+    if (obj == null) {
+      return false;
+    }
+    if (!Slice.class.isAssignableFrom(obj.getClass())) {
+      return false;
+    }
+    final Slice other = (Slice)obj;
+    if (this.length != other.length) {
+      return false;
+    }
+
+    final int offset1 = this.offset;
+    final byte[] buffer1 = this.buffer;
+    int i = offset1 + this.length;
+
+    final byte[] buffer2 = other.buffer;
+    int j = other.offset + other.length;
+
+    while (i-- > offset1) {
+      if (buffer1[i] != buffer2[--j]) {
+        return false;
+      }
+    }
+
+    return true;
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2fa1e6b1/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/CollectionSerde.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/CollectionSerde.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/CollectionSerde.java
new file mode 100644
index 0000000..bcd0b74
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/CollectionSerde.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.utils.serde;
+
+import java.util.Collection;
+
+import javax.validation.constraints.NotNull;
+
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+
+/**
+ * This is an implementation of {@link Serde} which serializes and deserializes collections.
+ *
+ * @since 3.5.0
+ */
+@InterfaceStability.Evolving
+public class CollectionSerde<T, CollectionT extends Collection<T>> implements Serde<CollectionT>
+{
+  @NotNull
+  private Serde<T> serde;
+
+  @NotNull
+  private Class<? extends CollectionT> collectionClass;
+
+  private CollectionSerde()
+  {
+    // for Kryo
+  }
+
+  /**
+   * Creates a {@link CollectionSerde}.
+   * @param serde The {@link Serde} that is used to serialize and deserialize each element of a collection.
+   */
+  public CollectionSerde(@NotNull Serde<T> serde, @NotNull Class<? extends CollectionT> collectionClass /*Class<? extends C1> collectionClass*/ )
+  {
+    this.serde = Preconditions.checkNotNull(serde);
+    this.collectionClass = Preconditions.checkNotNull(collectionClass);
+  }
+
+  @Override
+  public void serialize(CollectionT objects, Output output)
+  {
+    // Always write the element count, even for an empty collection: deserialize()
+    // unconditionally reads it, so skipping it would underflow the input or
+    // misalign the stream when this serde is nested inside another one.
+    output.writeInt(objects.size(), true);
+    Serde<T> serializer = getItemSerde();
+    for (T object : objects) {
+      serializer.serialize(object, output);
+    }
+  }
+
+  @Override
+  public CollectionT deserialize(Input input)
+  {
+    int numElements = input.readInt(true);
+
+    try {
+      CollectionT collection = collectionClass.newInstance();
+      // Use the same accessor as serialize() so an overridden item serde is honored both ways.
+      Serde<T> deserializer = getItemSerde();
+      for (int index = 0; index < numElements; index++) {
+        collection.add(deserializer.deserialize(input));
+      }
+
+      return collection;
+    } catch (Exception ex) {
+      throw Throwables.propagate(ex);
+    }
+  }
+
+  protected Serde<T> getItemSerde()
+  {
+    return serde;
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2fa1e6b1/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/DefaultBlockReleaseStrategy.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/DefaultBlockReleaseStrategy.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/DefaultBlockReleaseStrategy.java
new file mode 100644
index 0000000..93929e4
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/DefaultBlockReleaseStrategy.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.utils.serde;
+
+import java.util.Arrays;
+
+import org.apache.commons.collections.buffer.CircularFifoBuffer;
+
+/**
+ * This implementation releases the minimum number of free blocks reported within the period.
+ *
+ */
+public class DefaultBlockReleaseStrategy implements BlockReleaseStrategy
+{
+  public static final int DEFAULT_PERIOD = 60; // 60 reports
+  private CircularFifoBuffer freeBlockNumQueue;
+  private Integer[] tmpArray;
+
+  public DefaultBlockReleaseStrategy()
+  {
+    this(DEFAULT_PERIOD);
+  }
+
+  public DefaultBlockReleaseStrategy(int period)
+  {
+    freeBlockNumQueue = new CircularFifoBuffer(period);
+    tmpArray = new Integer[period];
+    Arrays.fill(tmpArray, 0);
+  }
+
+  /**
+   * The stream calls this to report to the strategy how many blocks are free currently.
+   * @param freeBlockNum
+   */
+  @Override
+  public void currentFreeBlocks(int freeBlockNum)
+  {
+    if (freeBlockNum < 0) {
+      throw new IllegalArgumentException("The number of free blocks could not less than zero.");
+    }
+    freeBlockNumQueue.add(freeBlockNum);
+  }
+
+  /**
+   * Get how many blocks can be released: the minimum of the reported free block counts.
+   * @return the minimum reported count, or 0 when nothing has been reported yet
+   */
+  @Override
+  public int getNumBlocksToRelease()
+  {
+    int minNum = freeBlockNumQueue.isEmpty() ? 0 : Integer.MAX_VALUE;
+    for (Object num : freeBlockNumQueue) {
+      minNum = Math.min((Integer)num, minNum);
+    }
+    return minNum;
+  }
+
+
+  /**
+   * Report how many blocks have been released; every recorded count is reduced by it, floored at 0.
+   * @param numReleasedBlocks number of released blocks, must not be negative
+   */
+  @Override
+  public void releasedBlocks(int numReleasedBlocks)
+  {
+    if (numReleasedBlocks == 0) {
+      return;
+    }
+    if (numReleasedBlocks < 0) {
+      throw new IllegalArgumentException("Num of released blocks should not be negative");
+    }
+    // Snapshot first: adding to the buffer while iterating it corrupts the traversal.
+    Object[] recorded = freeBlockNumQueue.toArray();
+    freeBlockNumQueue.clear();
+    for (Object num : recorded) {
+      freeBlockNumQueue.add(Math.max((Integer)num - numReleasedBlocks, 0));
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2fa1e6b1/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/GenericSerde.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/GenericSerde.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/GenericSerde.java
new file mode 100644
index 0000000..0fbb2ab
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/GenericSerde.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.utils.serde;
+
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+
+/**
+ * Generic serde using Kryo serialization. Note that while this is convenient, it may not be desirable because
+ * using Kryo makes the object being serialized rigid, meaning you won't be able to make backward compatible or
+ * incompatible changes to the class being serialized.
+ *
+ * @param <T> The type being serialized
+ */
+@InterfaceStability.Evolving
+public class GenericSerde<T> implements Serde<T>
+{
+  private transient ThreadLocal<Kryo> kryos = new ThreadLocal<Kryo>()
+  {
+    @Override
+    protected Kryo initialValue() // not get(): that would build a new Kryo on every call
+    {
+      return new Kryo();
+    }
+  };
+
+  private final Class<? extends T> clazz;
+
+  public GenericSerde()
+  {
+    this.clazz = null;
+  }
+
+  public GenericSerde(Class<? extends T> clazz)
+  {
+    this.clazz = clazz;
+  }
+
+  @Override
+  public void serialize(T object, Output output)
+  {
+    Kryo kryo = kryos.get();
+    if (clazz == null) {
+      kryo.writeClassAndObject(output, object);
+    } else {
+      kryo.writeObject(output, object);
+    }
+  }
+
+  @Override
+  public T deserialize(Input input)
+  {
+    T object;
+    Kryo kryo = kryos.get();
+    if (clazz == null) {
+      object = (T)kryo.readClassAndObject(input);
+    } else {
+      object = kryo.readObject(input, clazz);
+    }
+    return object;
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2fa1e6b1/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/IntSerde.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/IntSerde.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/IntSerde.java
new file mode 100644
index 0000000..032b5e0
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/IntSerde.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.utils.serde;
+
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+
+/**
+ * This is an implementation of {@link Serde} which deserializes and serializes integers.
+ *
+ * @since 3.5.0
+ */
+@InterfaceStability.Evolving
+public class IntSerde implements Serde<Integer>
+{
+  @Override
+  public void serialize(Integer value, Output output)
+  {
+    output.writeInt(value);
+  }
+
+  @Override
+  public Integer deserialize(Input input)
+  {
+    return input.readInt();
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2fa1e6b1/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/KeyValueByteStreamProvider.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/KeyValueByteStreamProvider.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/KeyValueByteStreamProvider.java
new file mode 100644
index 0000000..a7dfa7f
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/KeyValueByteStreamProvider.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.utils.serde;
+
+/**
+ * This interface provides methods for stream for key/value.
+ * The implementation can separate the stream for key and value or share the same one.
+ *
+ */
+public interface KeyValueByteStreamProvider
+{
+  /**
+   * @return The stream for keeping key
+   */
+  WindowedBlockStream getKeyStream();
+
+  /**
+   * @return The stream for keeping value
+   */
+  WindowedBlockStream getValueStream();
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2fa1e6b1/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/KeyValueSerdeManager.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/KeyValueSerdeManager.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/KeyValueSerdeManager.java
new file mode 100644
index 0000000..6fbe9fe
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/KeyValueSerdeManager.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.utils.serde;
+
+import org.apache.apex.malhar.lib.state.managed.Bucket;
+import org.apache.apex.malhar.lib.state.managed.BucketProvider;
+
+import com.datatorrent.netlet.util.Slice;
+
+public class KeyValueSerdeManager<K, V>
+{
+  protected Serde<K> keySerde;
+  protected Serde<V> valueSerde;
+
+  protected SerializationBuffer keyBufferForWrite;
+  protected transient SerializationBuffer keyBufferForRead = SerializationBuffer.READ_BUFFER;
+
+  protected SerializationBuffer valueBuffer;
+
+
+  protected KeyValueSerdeManager()
+  {
+    // for Kryo
+  }
+
+  public KeyValueSerdeManager(Serde<K> keySerde, Serde<V> valueSerde)
+  {
+    this.keySerde = keySerde;
+    this.valueSerde = valueSerde;
+  }
+
+  public void setup(BucketProvider bp, long bucketId)
+  {
+    //the bucket will not change for this class. so get streams from setup, else, need to set stream before serialize
+    Bucket bucketInst = bp.ensureBucket(bucketId);
+    this.valueBuffer = new SerializationBuffer(bucketInst.getValueStream());
+
+    keyBufferForWrite = new SerializationBuffer(bucketInst.getKeyStream());
+  }
+
+  public Slice serializeKey(K key, boolean write)
+  {
+    SerializationBuffer buffer = write ? keyBufferForWrite : keyBufferForRead;
+    keySerde.serialize(key, buffer);
+    return buffer.toSlice();
+  }
+
+
+  /**
+   * Value only serialize for writing
+   * @param value
+   * @return
+   */
+  public Slice serializeValue(V value)
+  {
+    valueSerde.serialize(value, valueBuffer);
+    return valueBuffer.toSlice();
+  }
+
+  public void beginWindow(long windowId)
+  {
+    keyBufferForWrite.beginWindow(windowId);
+    valueBuffer.beginWindow(windowId);
+  }
+
+  public void resetReadBuffer()
+  {
+    keyBufferForRead.release();
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2fa1e6b1/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/LongSerde.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/LongSerde.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/LongSerde.java
new file mode 100644
index 0000000..0b63737
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/LongSerde.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.utils.serde;
+
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+
+/**
+ * This is an implementation of {@link Serde} which deserializes and serializes longs.
+ *
+ * @since 3.5.0
+ */
+@InterfaceStability.Evolving
+public class LongSerde implements Serde<Long>
+{
+  @Override
+  public void serialize(Long value, Output output)
+  {
+    output.writeLong(value);
+  }
+
+  @Override
+  public Long deserialize(Input input)
+  {
+    return input.readLong();
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2fa1e6b1/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/PairSerde.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/PairSerde.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/PairSerde.java
new file mode 100644
index 0000000..3190880
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/PairSerde.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.utils.serde;
+
+import javax.validation.constraints.NotNull;
+
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import com.google.common.base.Preconditions;
+
+/**
+ * This is an implementation of {@link Serde} which serializes and deserializes pairs.
+ */
+@InterfaceStability.Evolving
+public class PairSerde<T1, T2> implements Serde<Pair<T1, T2>>
+{
+  @NotNull
+  private Serde<T1> serde1;
+  @NotNull
+  private Serde<T2> serde2;
+
+  private PairSerde()
+  {
+    // for Kryo
+  }
+
+  /**
+   * Creates a {@link PairSerde}.
+   * @param serde1 The {@link Serde} that is used to serialize and deserialize first element of a pair
+   * @param serde2 The {@link Serde} that is used to serialize and deserialize second element of a pair
+   */
+  public PairSerde(@NotNull Serde<T1> serde1, @NotNull Serde<T2> serde2)
+  {
+    this.serde1 = Preconditions.checkNotNull(serde1);
+    this.serde2 = Preconditions.checkNotNull(serde2);
+  }
+
+  @Override
+  public void serialize(Pair<T1, T2> pair, Output output)
+  {
+    serde1.serialize(pair.getLeft(), output);
+    serde2.serialize(pair.getRight(), output);
+  }
+
+  @Override
+  public Pair<T1, T2> deserialize(Input input)
+  {
+    T1 first = serde1.deserialize(input);
+    T2 second = serde2.deserialize(input);
+    return new ImmutablePair<>(first, second);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2fa1e6b1/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/PassThruByteArraySerde.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/PassThruByteArraySerde.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/PassThruByteArraySerde.java
deleted file mode 100644
index 9669981..0000000
--- a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/PassThruByteArraySerde.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.lib.utils.serde;
-
-import org.apache.commons.lang3.mutable.MutableInt;
-import org.apache.hadoop.classification.InterfaceStability;
-
-/**
- * This is a simple pass through {@link Serde}. When serialization is performed the input byte array is returned.
- * Similarly when deserialization is performed the input byte array is returned.
- *
- * @since 3.4.0
- */
-@InterfaceStability.Evolving
-public class PassThruByteArraySerde implements Serde<byte[], byte[]>
-{
-  @Override
-  public byte[] serialize(byte[] object)
-  {
-    return object;
-  }
-
-  @Override
-  public byte[] deserialize(byte[] object, MutableInt offset)
-  {
-    offset.add(object.length);
-    return object;
-  }
-
-  @Override
-  public byte[] deserialize(byte[] object)
-  {
-    return object;
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2fa1e6b1/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/PassThruByteArraySliceSerde.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/PassThruByteArraySliceSerde.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/PassThruByteArraySliceSerde.java
deleted file mode 100644
index b22bf6f..0000000
--- a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/PassThruByteArraySliceSerde.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.lib.utils.serde;
-
-import org.apache.commons.lang3.mutable.MutableInt;
-
-import com.datatorrent.netlet.util.Slice;
-
-/**
- * This is a simple {@link Serde} which serializes and deserializes byte arrays to {@link Slice}s. A byte array is
- * serialized by simply wrapping it in a {@link Slice} object and deserialized by simply reading the byte array
- * out of the {@link Slice} object.
- *
- * <b>Note:</b> The deserialized method doesn't use the offset argument in this implementation.
- *
- * @since 3.5.0
- */
-public class PassThruByteArraySliceSerde implements Serde<byte[], Slice>
-{
-  @Override
-  public Slice serialize(byte[] object)
-  {
-    return new Slice(object);
-  }
-
-  @Override
-  public byte[] deserialize(Slice object, MutableInt offset)
-  {
-    offset.add(object.length);
-
-    if (object.offset == 0) {
-      return object.buffer;
-    }
-
-    byte[] bytes = new byte[object.length];
-    System.arraycopy(object.buffer, object.offset, bytes, 0, object.length);
-    return bytes;
-  }
-
-  @Override
-  public byte[] deserialize(Slice object)
-  {
-    return deserialize(object, new MutableInt(0));
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2fa1e6b1/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/PassThruSliceSerde.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/PassThruSliceSerde.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/PassThruSliceSerde.java
index 2646c0e..679e116 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/PassThruSliceSerde.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/PassThruSliceSerde.java
@@ -18,9 +18,14 @@
  */
 package org.apache.apex.malhar.lib.utils.serde;
 
-import org.apache.commons.lang3.mutable.MutableInt;
+import java.io.IOException;
+
 import org.apache.hadoop.classification.InterfaceStability;
 
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import com.google.common.base.Throwables;
+
 import com.datatorrent.netlet.util.Slice;
 
 /**
@@ -30,23 +35,26 @@ import com.datatorrent.netlet.util.Slice;
  * @since 3.5.0
  */
 @InterfaceStability.Evolving
-public class PassThruSliceSerde implements Serde<Slice, Slice>
+public class PassThruSliceSerde implements Serde<Slice>
 {
   @Override
-  public Slice serialize(Slice object)
-  {
-    return object;
-  }
-
-  @Override
-  public Slice deserialize(Slice object, MutableInt offset)
+  public void serialize(Slice slice, Output output)
   {
-    return object;
+    output.write(slice.buffer, slice.offset, slice.length);
   }
 
   @Override
-  public Slice deserialize(Slice object)
+  public Slice deserialize(Input input)
   {
-    return object;
+    if (input.getInputStream() != null) {
+      // The input is backed by a stream, cannot directly use its internal buffer
+      try {
+        return new Slice(input.readBytes(input.available()));
+      } catch (IOException ex) {
+        throw Throwables.propagate(ex);
+      }
+    } else {
+      return new Slice(input.getBuffer(), input.position(), input.limit() - input.position());
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2fa1e6b1/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/Serde.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/Serde.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/Serde.java
index 6e02aee..d09612d 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/Serde.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/Serde.java
@@ -18,46 +18,29 @@
  */
 package org.apache.apex.malhar.lib.utils.serde;
 
-import org.apache.commons.lang3.mutable.MutableInt;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
 
 /**
  * This is an interface for a Serializer/Deserializer class.
- * @param <OBJ> The type of the object to Serialize and Deserialize.
- * @param <SER> The type to Serialize an Object to.
+ * @param <T> The type of the object to Serialize and Deserialize.
  *
  * @since 3.4.0
  */
-public interface Serde<OBJ, SER>
+public interface Serde<T>
 {
   /**
-   * Serialized the given object.
-   * @param object The object to serialize.
-   * @return The serialized representation of the object.
+   * Serialize the object to the given output.
+   * @param object The object to serialize.
+   * @param output The output that the serialized form of the object is written to.
    */
-  SER serialize(OBJ object);
+  void serialize(T object, Output output);
 
   /**
-   * Deserializes the given serialized representation of an object.
-   * @param object The serialized representation of an object.
-   * @param offset An offset in the serialized representation of the object. After the
-   * deserialize method completes the offset is updated, so that the offset points to
-   * the remaining unprocessed portion of the serialized object. For example:<br/>
-   * {@code
-   * Object obj;
-   * MutableInt mi;
-   * someObj1 = deserialize(obj, mi);
-   * someObj2 = deserialize(obj, mi);
-   * }
+   * Deserialize from the input and return a new object.
    *
-   * @return The deserialized object.
+   * @param input The input that the serialized form of the object is read from.
+   * @return The deserialized object.
    */
-  OBJ deserialize(SER object, MutableInt offset);
-
-  /**
-   * Deserializes the given serialized representation of an object.
-   * @param object The serialized representation of an object.
-   *
-   * @return The deserialized object.
-   */
-  OBJ deserialize(SER object);
+  T deserialize(Input input);
 }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2fa1e6b1/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeCollectionSlice.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeCollectionSlice.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeCollectionSlice.java
deleted file mode 100644
index eca1d5f..0000000
--- a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeCollectionSlice.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.lib.utils.serde;
-
-import java.util.Collection;
-
-import javax.validation.constraints.NotNull;
-
-import org.apache.commons.lang3.mutable.MutableInt;
-import org.apache.hadoop.classification.InterfaceStability;
-
-import com.google.common.base.Preconditions;
-import com.google.common.base.Throwables;
-
-import com.datatorrent.lib.appdata.gpo.GPOUtils;
-import com.datatorrent.netlet.util.Slice;
-
-/**
- * This is an implementation of {@link Serde} which serializes and deserializes lists.
- *
- * @since 3.5.0
- */
-@InterfaceStability.Evolving
-public class SerdeCollectionSlice<T, CollectionT extends Collection<T>> implements Serde<CollectionT, Slice>
-{
-  @NotNull
-  private Serde<T, Slice> serde;
-
-  @NotNull
-  private Class<? extends CollectionT> collectionClass;
-
-  private SerdeCollectionSlice()
-  {
-    // for Kryo
-  }
-
-  /**
-   * Creates a {@link SerdeCollectionSlice}.
-   * @param serde The {@link Serde} that is used to serialize and deserialize each element of a list.
-   */
-  public SerdeCollectionSlice(@NotNull Serde<T, Slice> serde, @NotNull Class<? extends CollectionT> collectionClass)
-  {
-    this.serde = Preconditions.checkNotNull(serde);
-    this.collectionClass = Preconditions.checkNotNull(collectionClass);
-  }
-
-  @Override
-  public Slice serialize(CollectionT objects)
-  {
-    Slice[] slices = new Slice[objects.size()];
-
-    int size = 4;
-
-    int index = 0;
-    for (T object : objects) {
-      Slice slice = serde.serialize(object);
-      slices[index++] = slice;
-      size += slice.length;
-    }
-
-    byte[] bytes = new byte[size];
-    int offset = 0;
-
-    byte[] sizeBytes = GPOUtils.serializeInt(objects.size());
-    System.arraycopy(sizeBytes, 0, bytes, offset, 4);
-    offset += 4;
-
-    for (index = 0; index < slices.length; index++) {
-      Slice slice = slices[index];
-      System.arraycopy(slice.buffer, slice.offset, bytes, offset, slice.length);
-      offset += slice.length;
-    }
-
-    return new Slice(bytes);
-  }
-
-  @Override
-  public CollectionT deserialize(Slice slice, MutableInt offset)
-  {
-    MutableInt sliceOffset = new MutableInt(slice.offset + offset.intValue());
-
-    int numElements = GPOUtils.deserializeInt(slice.buffer, sliceOffset);
-    sliceOffset.subtract(slice.offset);
-    try {
-      CollectionT collection = collectionClass.newInstance();
-
-      for (int index = 0; index < numElements; index++) {
-        T object = serde.deserialize(slice, sliceOffset);
-        collection.add(object);
-      }
-
-      offset.setValue(sliceOffset.intValue());
-      return collection;
-    } catch (Exception ex) {
-      throw Throwables.propagate(ex);
-    }
-  }
-
-  @Override
-  public CollectionT deserialize(Slice slice)
-  {
-    return deserialize(slice, new MutableInt(0));
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2fa1e6b1/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeIntSlice.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeIntSlice.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeIntSlice.java
deleted file mode 100644
index 3275a93..0000000
--- a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeIntSlice.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.lib.utils.serde;
-
-import org.apache.commons.lang3.mutable.MutableInt;
-import org.apache.hadoop.classification.InterfaceStability;
-
-import com.datatorrent.lib.appdata.gpo.GPOUtils;
-import com.datatorrent.netlet.util.Slice;
-
-/**
- * This is an implementation of {@link Serde} which deserializes and serializes integers.
- *
- * @since 3.5.0
- */
-@InterfaceStability.Evolving
-public class SerdeIntSlice implements Serde<Integer, Slice>
-{
-  @Override
-  public Slice serialize(Integer object)
-  {
-    return new Slice(GPOUtils.serializeInt(object));
-  }
-
-  @Override
-  public Integer deserialize(Slice slice, MutableInt offset)
-  {
-    int val = GPOUtils.deserializeInt(slice.buffer, new MutableInt(slice.offset + offset.intValue()));
-    offset.add(4);
-    return val;
-  }
-
-  @Override
-  public Integer deserialize(Slice object)
-  {
-    return deserialize(object, new MutableInt(0));
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2fa1e6b1/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeKryoSlice.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeKryoSlice.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeKryoSlice.java
deleted file mode 100644
index d4b9488..0000000
--- a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeKryoSlice.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.lib.utils.serde;
-
-import java.io.ByteArrayOutputStream;
-
-import org.apache.commons.lang3.mutable.MutableInt;
-import org.apache.hadoop.classification.InterfaceStability;
-
-import com.esotericsoftware.kryo.Kryo;
-import com.esotericsoftware.kryo.io.Input;
-import com.esotericsoftware.kryo.io.Output;
-
-import com.datatorrent.netlet.util.Slice;
-
-/**
- * Generic serde using Kryo serialization. Note that while this is convenient, it may not be desirable because
- * using Kryo makes the object being serialized rigid, meaning you won't be able to make backward compatible or
- * incompatible changes to the class being serialized.
- *
- * @param <T> The type being serialized
- */
-@InterfaceStability.Evolving
-public class SerdeKryoSlice<T> implements Serde<T, Slice>
-{
-  // Setup ThreadLocal of Kryo instances
-  private static final ThreadLocal<Kryo> kryos = new ThreadLocal<Kryo>()
-  {
-    protected Kryo initialValue()
-    {
-      Kryo kryo = new Kryo();
-      // configure kryo instance, customize settings
-      return kryo;
-    }
-  };
-
-  private final Class<? extends T> clazz;
-
-  public SerdeKryoSlice()
-  {
-    this.clazz = null;
-  }
-
-  public SerdeKryoSlice(Class<? extends T> clazz)
-  {
-    this.clazz = clazz;
-  }
-
-  @Override
-  public Slice serialize(T object)
-  {
-    Kryo kryo = kryos.get();
-    ByteArrayOutputStream stream = new ByteArrayOutputStream();
-    Output output = new Output(stream);
-    if (clazz == null) {
-      kryo.writeClassAndObject(output, object);
-    } else {
-      kryo.writeObject(output, object);
-    }
-    return new Slice(output.toBytes());
-  }
-
-  @Override
-  public T deserialize(Slice slice, MutableInt offset)
-  {
-    byte[] bytes = slice.toByteArray();
-    Kryo kryo = kryos.get();
-    Input input = new Input(bytes, offset.intValue(), bytes.length - offset.intValue());
-    T object;
-    if (clazz == null) {
-      object = (T)kryo.readClassAndObject(input);
-    } else {
-      object = kryo.readObject(input, clazz);
-    }
-    offset.setValue(bytes.length - input.position());
-    return object;
-  }
-
-  @Override
-  public T deserialize(Slice slice)
-  {
-    return deserialize(slice, new MutableInt(0));
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2fa1e6b1/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeLongSlice.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeLongSlice.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeLongSlice.java
deleted file mode 100644
index 6fe07d9..0000000
--- a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeLongSlice.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.lib.utils.serde;
-
-import org.apache.commons.lang3.mutable.MutableInt;
-import org.apache.hadoop.classification.InterfaceStability;
-
-import com.datatorrent.lib.appdata.gpo.GPOUtils;
-import com.datatorrent.netlet.util.Slice;
-
-/**
- * This is an implementation of {@link Serde} which deserializes and serializes integers.
- *
- * @since 3.5.0
- */
-@InterfaceStability.Evolving
-public class SerdeLongSlice implements Serde<Long, Slice>
-{
-  @Override
-  public Slice serialize(Long object)
-  {
-    return new Slice(GPOUtils.serializeLong(object));
-  }
-
-  @Override
-  public Long deserialize(Slice slice, MutableInt offset)
-  {
-    long val = GPOUtils.deserializeLong(slice.buffer, new MutableInt(slice.offset + offset.intValue()));
-    offset.add(8);
-    return val;
-  }
-
-  @Override
-  public Long deserialize(Slice object)
-  {
-    return deserialize(object, new MutableInt(0));
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2fa1e6b1/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdePairSlice.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdePairSlice.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdePairSlice.java
deleted file mode 100644
index 59cf282..0000000
--- a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdePairSlice.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.lib.utils.serde;
-
-import javax.validation.constraints.NotNull;
-
-import org.apache.commons.lang3.mutable.MutableInt;
-import org.apache.commons.lang3.tuple.ImmutablePair;
-import org.apache.commons.lang3.tuple.Pair;
-import org.apache.hadoop.classification.InterfaceStability;
-
-import com.google.common.base.Preconditions;
-
-import com.datatorrent.netlet.util.Slice;
-
-/**
- * This is an implementation of {@link Serde} which serializes and deserializes pairs.
- */
-@InterfaceStability.Evolving
-public class SerdePairSlice<T1, T2> implements Serde<Pair<T1, T2>, Slice>
-{
-  @NotNull
-  private Serde<T1, Slice> serde1;
-  @NotNull
-  private Serde<T2, Slice> serde2;
-
-  private SerdePairSlice()
-  {
-    // for Kryo
-  }
-
-  /**
-   * Creates a {@link SerdePairSlice}.
-   * @param serde1 The {@link Serde} that is used to serialize and deserialize first element of a pair
-   * @param serde2 The {@link Serde} that is used to serialize and deserialize second element of a pair
-   */
-  public SerdePairSlice(@NotNull Serde<T1, Slice> serde1, @NotNull Serde<T2, Slice> serde2)
-  {
-    this.serde1 = Preconditions.checkNotNull(serde1);
-    this.serde2 = Preconditions.checkNotNull(serde2);
-  }
-
-  @Override
-  public Slice serialize(Pair<T1, T2> pair)
-  {
-    int size = 0;
-
-    Slice slice1 = serde1.serialize(pair.getLeft());
-    size += slice1.length;
-    Slice slice2 = serde2.serialize(pair.getRight());
-    size += slice2.length;
-
-    byte[] bytes = new byte[size];
-    System.arraycopy(slice1.buffer, slice1.offset, bytes, 0, slice1.length);
-    System.arraycopy(slice2.buffer, slice2.offset, bytes, slice1.length, slice2.length);
-
-    return new Slice(bytes);
-  }
-
-  @Override
-  public Pair<T1, T2> deserialize(Slice slice, MutableInt offset)
-  {
-    T1 first = serde1.deserialize(slice, offset);
-    T2 second = serde2.deserialize(slice, offset);
-    return new ImmutablePair<>(first, second);
-  }
-
-  @Override
-  public Pair<T1, T2> deserialize(Slice slice)
-  {
-    return deserialize(slice, new MutableInt(0));
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2fa1e6b1/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeStringSlice.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeStringSlice.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeStringSlice.java
deleted file mode 100644
index aaf0d61..0000000
--- a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeStringSlice.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.lib.utils.serde;
-
-import org.apache.commons.lang3.mutable.MutableInt;
-import org.apache.hadoop.classification.InterfaceStability;
-
-import com.datatorrent.lib.appdata.gpo.GPOUtils;
-import com.datatorrent.netlet.util.Slice;
-
-/**
- * An implementation of {@link Serde} which serializes and deserializes {@link String}s.
- *
- * @since 3.5.0
- */
-@InterfaceStability.Evolving
-public class SerdeStringSlice implements Serde<String, Slice>
-{
-  @Override
-  public Slice serialize(String object)
-  {
-    return new Slice(GPOUtils.serializeString(object));
-  }
-
-  @Override
-  public String deserialize(Slice object, MutableInt offset)
-  {
-    offset.add(object.offset);
-    String string = GPOUtils.deserializeString(object.buffer, offset);
-    offset.subtract(object.offset);
-    return string;
-  }
-
-  @Override
-  public String deserialize(Slice object)
-  {
-    return deserialize(object, new MutableInt(0));
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2fa1e6b1/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerializationBuffer.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerializationBuffer.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerializationBuffer.java
new file mode 100644
index 0000000..f33f1e0
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerializationBuffer.java
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.utils.serde;
+
+import org.apache.apex.malhar.lib.state.spillable.WindowListener;
+
+import com.esotericsoftware.kryo.io.Output;
+
+import com.datatorrent.netlet.util.Slice;
+
+public class SerializationBuffer extends Output implements WindowCompleteListener, WindowListener
+{
+  /*
+   * Singleton read buffer for serialization
+   */
+  public static final SerializationBuffer READ_BUFFER = new SerializationBuffer(new WindowedBlockStream());
+
+  private WindowedBlockStream windowedBlockStream;
+
+  @SuppressWarnings("unused")
+  private SerializationBuffer()
+  {
+    this(new WindowedBlockStream());
+  }
+
+  public SerializationBuffer(WindowedBlockStream windowedBlockStream)
+  {
+    super(windowedBlockStream);
+    this.windowedBlockStream = windowedBlockStream;
+  }
+
+  public long size()
+  {
+    return windowedBlockStream.size();
+  }
+
+  public long capacity()
+  {
+    return windowedBlockStream.capacity();
+  }
+
+  /**
+   * This method should be called only after the whole object has been written
+   * @return The slice which represents the object
+   */
+  public Slice toSlice()
+  {
+    this.flush();
+    return windowedBlockStream.toSlice();
+  }
+
+  /**
+   * Reset the environment so that the resource can be reused.
+   */
+  public void reset()
+  {
+    windowedBlockStream.reset();
+  }
+
+
+  @Override
+  public void beginWindow(long windowId)
+  {
+    windowedBlockStream.beginWindow(windowId);
+  }
+
+  @Override
+  public void endWindow()
+  {
+    windowedBlockStream.endWindow();
+  }
+
+  public void release()
+  {
+    reset();
+    windowedBlockStream.reset();
+  }
+
+  public WindowedBlockStream createWindowedBlockStream()
+  {
+    return new WindowedBlockStream();
+  }
+
+  public WindowedBlockStream createWindowedBlockStream(int capacity)
+  {
+    return new WindowedBlockStream(capacity);
+  }
+
+  public WindowedBlockStream getWindowedBlockStream()
+  {
+    return windowedBlockStream;
+  }
+
+  public void setWindowableByteStream(WindowedBlockStream windowableByteStream)
+  {
+    this.windowedBlockStream = windowableByteStream;
+  }
+
+  /**
+   * Reset all windows whose window id is less than or equal to the input windowId.
+   * This method does not reset each window individually; several windows can be reset at the same time.
+   * @param windowId The window id up to which (inclusive) windows are reset.
+   */
+  @Override
+  public void completeWindow(long windowId)
+  {
+    windowedBlockStream.completeWindow(windowId);
+  }
+
+  public byte[] toByteArray()
+  {
+    return toSlice().toByteArray();
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2fa1e6b1/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SliceUtils.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SliceUtils.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SliceUtils.java
index 2671d5e..b504581 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SliceUtils.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SliceUtils.java
@@ -100,4 +100,14 @@ public class SliceUtils
 
     return new Slice(bytes);
   }
+
+  public static BufferSlice toBufferSlice(Slice slice)
+  {
+    if (slice instanceof BufferSlice) {
+      return (BufferSlice)slice;
+    }
+
+    // The hashCode of Slice is not correct, so wrap it in a BufferSlice to correct it
+    return new BufferSlice(slice);
+  }
 }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2fa1e6b1/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/StringSerde.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/StringSerde.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/StringSerde.java
new file mode 100644
index 0000000..cb45e2a
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/StringSerde.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.utils.serde;
+
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+
+/**
+ * An implementation of {@link Serde} which serializes and deserializes {@link String}s.
+ *
+ * @since 3.5.0
+ */
+@InterfaceStability.Evolving
+public class StringSerde implements Serde<String>
+{
+  @Override
+  public void serialize(String string, Output output)
+  {
+    output.writeString(string);
+  }
+
+  @Override
+  public String deserialize(Input input)
+  {
+    return input.readString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2fa1e6b1/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/WindowCompleteListener.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/WindowCompleteListener.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/WindowCompleteListener.java
new file mode 100644
index 0000000..d2d38a7
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/WindowCompleteListener.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.utils.serde;
+
+public interface WindowCompleteListener
+{
+  /**
+   * Notification that all windows whose window id is less than or equal to the input windowId are complete.
+   *
+   * @param windowId The window id up to which (inclusive) windows are complete.
+   */
+  void completeWindow(long windowId);
+}