You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2016/10/24 20:46:36 UTC

[2/3] apex-malhar git commit: APEXMALHAR-2190 Use reusable buffer for serialization in spillable data structures closes #404

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6ddefd02/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/AffixKeyValueSerdeManager.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/AffixKeyValueSerdeManager.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/AffixKeyValueSerdeManager.java
new file mode 100644
index 0000000..57638d8
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/AffixKeyValueSerdeManager.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.utils.serde;
+
+import org.apache.apex.malhar.lib.state.managed.Bucket;
+
+import com.esotericsoftware.kryo.io.Input;
+
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * All spillable data structures use this class to manage the buffers for serialization.
+ * This class contains serialization logic that is common for all spillable data structures
+ *
+ * @param <K>
+ * @param <V>
+ */
+public class AffixKeyValueSerdeManager<K, V> extends KeyValueSerdeManager<K, V>
+{
+  /**
+   * The read buffer is released once a read is done; the write buffer is held until its data has been persisted and should
+   * be non-transient. {@link Bucket} removes data already saved to files; the platform recovers the rest from checkpoint.
+   */
+  private AffixSerde<K> metaKeySerde;
+  private AffixSerde<K> dataKeySerde;
+
+  private AffixKeyValueSerdeManager()
+  {
+    // no-arg constructor required by Kryo
+  }
+
+  public AffixKeyValueSerdeManager(byte[] metaKeySuffix, byte[] dataKeyIdentifier, Serde<K> keySerde, Serde<V> valueSerde)
+  {
+    this.valueSerde = valueSerde;
+    this.metaKeySerde = new AffixSerde<K>(null, keySerde, metaKeySuffix);
+    this.dataKeySerde = new AffixSerde<K>(dataKeyIdentifier, keySerde, null);
+  }
+
+  public Slice serializeMetaKey(K key, boolean write)
+  {
+    return serializeKey(metaKeySerde, key, write);
+  }
+
+  public Slice serializeDataKey(K key, boolean write)
+  {
+    return serializeKey(dataKeySerde, key, write);
+  }
+
+  public V deserializeValue(Input input)
+  {
+    return valueSerde.deserialize(input);
+  }
+
+  private Slice serializeKey(AffixSerde<K> affixSerde, K key, boolean write)
+  {
+    SerializationBuffer target = write ? keyBufferForWrite : keyBufferForRead;
+    affixSerde.serialize(key, target);
+    return target.toSlice();
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6ddefd02/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/AffixSerde.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/AffixSerde.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/AffixSerde.java
new file mode 100644
index 0000000..7504633
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/AffixSerde.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.utils.serde;
+
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+
+/**
+ * AffixSerde wraps another serde and adds an optional prefix and/or suffix to what that serde writes.
+ *
+ * @param <T> the type handled by the wrapped serde
+ */
+public class AffixSerde<T> implements Serde<T>
+{
+  private Serde<T> serde;
+  private byte[] prefix;
+  private byte[] suffix;
+
+  private AffixSerde()
+  {
+    // no-arg constructor for Kryo
+  }
+
+  /**
+   * @param prefix bytes written before the object; null or empty means no prefix
+   * @param serde serde used for the object itself
+   * @param suffix bytes written after the object; null or empty means no suffix
+   */
+  public AffixSerde(byte[] prefix, Serde<T> serde, byte[] suffix)
+  {
+    this.serde = serde;
+    this.prefix = prefix;
+    this.suffix = suffix;
+  }
+
+  /**
+   * Writes the prefix (if any), the object as serialized by the wrapped serde, then the suffix (if any).
+   */
+  @Override
+  public void serialize(T object, Output output)
+  {
+    writeAffix(prefix, output);
+    serde.serialize(object, output);
+    writeAffix(suffix, output);
+  }
+
+  /**
+   * Skips over the prefix (if any) and lets the wrapped serde read the object. The suffix is not consumed here.
+   */
+  @Override
+  public T deserialize(Input input)
+  {
+    final boolean prefixed = prefix != null && prefix.length > 0;
+    if (prefixed) {
+      input.skip(prefix.length);
+    }
+    return serde.deserialize(input);
+  }
+
+  private static void writeAffix(byte[] affix, Output output)
+  {
+    if (affix != null && affix.length > 0) {
+      output.write(affix);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6ddefd02/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/ArraySerde.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/ArraySerde.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/ArraySerde.java
new file mode 100644
index 0000000..4b2a45b
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/ArraySerde.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.utils.serde;
+
+import java.lang.reflect.Array;
+
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import com.google.common.base.Preconditions;
+
+public class ArraySerde<T> implements Serde<T[]>
+{
+  private Serde<T> itemSerde;
+  private Class<T> itemType;
+
+  private ArraySerde()
+  {
+  }
+
+  /**
+   * Serialization and deserialization need different constructors, so static factory methods wrap them.
+   * The ArraySerde returned by newSerializer can only be used for serialization.
+   */
+  public static <T> ArraySerde<T> newSerializer(Serde<T> itemSerde)
+  {
+    return new ArraySerde<T>(Preconditions.checkNotNull(itemSerde));
+  }
+
+  public static <T> ArraySerde<T> newSerde(Serde<T> itemSerde, Class<T> itemType)
+  {
+    return new ArraySerde<T>(Preconditions.checkNotNull(itemSerde), Preconditions.checkNotNull(itemType));
+  }
+
+  private ArraySerde(Serde<T> itemSerde)
+  {
+    this(itemSerde, null);
+  }
+
+  private ArraySerde(Serde<T> itemSerde, Class<T> itemType)
+  {
+    this.itemType = itemType;
+    this.itemSerde = itemSerde;
+  }
+
+  @Override
+  public void serialize(T[] objects, Output output)
+  {
+    if (objects.length == 0) {
+      // nothing at all is written for an empty array, not even a zero count
+      return;
+    }
+    output.writeInt(objects.length, true);
+    final Serde<T> serializer = getItemSerde();
+    for (int i = 0; i < objects.length; i++) {
+      serializer.serialize(objects[i], output);
+    }
+  }
+
+  protected Serde<T> getItemSerde()
+  {
+    return itemSerde;
+  }
+
+  @Override
+  public T[] deserialize(Input input)
+  {
+    // the element count comes first, followed by the elements themselves
+    final int count = input.readInt(true);
+    final T[] result = createObjectArray(count);
+    for (int i = 0; i < count; i++) {
+      result[i] = getItemSerde().deserialize(input);
+    }
+    return result;
+  }
+
+  @SuppressWarnings("unchecked")
+  protected T[] createObjectArray(int length)
+  {
+    return (T[])Array.newInstance(itemType, length);
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6ddefd02/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/Block.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/Block.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/Block.java
new file mode 100644
index 0000000..c140962
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/Block.java
@@ -0,0 +1,217 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.utils.serde;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ *
+ * keep the information of one block
+ *
+ */
+public class Block
+{
+  public static class OutOfBlockBufferMemoryException extends RuntimeException
+  {
+    private static final long serialVersionUID = 3813792889200989131L;
+  }
+
+  private static final Logger logger = LoggerFactory.getLogger(Block.class);
+
+  public static final int DEFAULT_BLOCK_SIZE = 100000;
+
+  //the capacity of the block in bytes; replaced by a larger value whenever the buffer is reallocated
+  private int capacity;
+
+  /*
+   * the number of bytes written so far, which is also the buffer index of the next write.
+   */
+  private volatile int size;
+
+  private int objectBeginOffset = 0;
+  private byte[] buffer;
+
+  /**
+   * whether toSlice() has exposed a slice since the last reset(); while set, the buffer is never reallocated.
+   */
+  private boolean exposedSlices;
+
+  private Block()
+  {
+    this(DEFAULT_BLOCK_SIZE);
+  }
+
+  public Block(int capacity)
+  {
+    if (capacity <= 0) {
+      throw new IllegalArgumentException("Invalid capacity: " + capacity);
+    }
+    buffer = new byte[capacity];
+    this.capacity = capacity;
+  }
+
+  public void write(byte data)
+  {
+    checkOrReallocateBuffer(1);
+    buffer[size++] = data;
+  }
+
+  public void write(byte[] data)
+  {
+    write(data, 0, data.length);
+  }
+
+  public void write(byte[] data, final int offset, final int length)
+  {
+    checkOrReallocateBuffer(length);
+
+    System.arraycopy(data, offset, buffer, size, length);
+    size += length;
+  }
+
+
+
+  /**
+   * make sure length more bytes fit; grows the buffer to twice the needed size, or throws if slices were exposed
+   *
+   * @param length the number of bytes about to be written
+   */
+  private void checkOrReallocateBuffer(int length) throws OutOfBlockBufferMemoryException
+  {
+    if (size + length <= capacity) {
+      return;
+    }
+
+    if (exposedSlices) {
+      throw new OutOfBlockBufferMemoryException();
+    }
+
+    //the new capacity is twice what is needed right now
+    capacity = (size + length) * 2;
+
+    byte[] oldBuffer = buffer;
+    buffer = new byte[capacity];
+
+    /**
+     * no slice of this block has been exposed yet (the first object is still being written),
+     * so the bytes written so far can be moved into the larger buffer.
+     */
+    if (size > 0) {
+      System.arraycopy(oldBuffer, 0, buffer, 0, size);
+    }
+  }
+
+  /**
+   * Similar to toSlice(), this method returns a slice over the data written so far for the
+   * current object. But unlike toSlice(), which marks the writes of this object as done, this
+   * method does not change any state of the block and can therefore be called at
+   * any time.
+   */
+  public Slice getLastObjectSlice()
+  {
+    return new Slice(buffer, objectBeginOffset, size - objectBeginOffset);
+  }
+
+  public void discardLastObjectData()
+  {
+    if (objectBeginOffset == 0) {
+      return;
+    }
+    size = objectBeginOffset;
+  }
+
+  public void moveLastObjectDataTo(Block newBlock)
+  {
+    if (size > objectBeginOffset) {
+      newBlock.write(buffer, objectBeginOffset, size - objectBeginOffset);
+      discardLastObjectData();
+    }
+  }
+
+  /**
+   * This method returns the slice that represents the serialized form.
+   * The process of serializing an object should be one or multiple calls of write() followed by a toSlice() call.
+   * A call to toSlice() indicates the writes are done for this object.
+   * Throws a RuntimeException if no byte has been written for this object.
+   * @return the slice covering the bytes written since the previous toSlice() or reset() call
+   */
+  public BufferSlice toSlice()
+  {
+    if (size == objectBeginOffset) {
+      throw new RuntimeException("data size is zero.");
+    }
+    BufferSlice slice = new BufferSlice(buffer, objectBeginOffset, size - objectBeginOffset);
+    //prepare for next object
+    objectBeginOffset = size;
+    exposedSlices = true;
+    return slice;
+  }
+
+  public void reset()
+  {
+    size = 0;
+    objectBeginOffset = 0;
+    exposedSlices = false;
+  }
+
+  /**
+   * check if the block has enough space for the length without reallocating
+   * NOTE(review): uses '<' whereas checkOrReallocateBuffer() accepts size + length == capacity; confirm this is intended.
+   * @param length the number of bytes the caller wants to write
+   * @return true if size + length is strictly less than the capacity
+   */
+  public boolean hasEnoughSpace(int length)
+  {
+    return size + length < capacity;
+  }
+
+  public long size()
+  {
+    return size;
+  }
+
+  public long capacity()
+  {
+    return capacity;
+  }
+
+  public boolean isFresh()
+  {
+    return (size == 0 && objectBeginOffset == 0 && exposedSlices == false);
+  }
+
+  /**
+   * Returns whether the block is clear. The block is clear when there have not been any write calls since the last toSlice() call.
+   *
+   * @return true if the block holds no bytes of an object that has not been turned into a slice yet
+   */
+  public boolean isClear()
+  {
+    return objectBeginOffset == size;
+  }
+
+  public void release()
+  {
+    reset();
+    buffer = null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6ddefd02/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/BlockReleaseStrategy.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/BlockReleaseStrategy.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/BlockReleaseStrategy.java
new file mode 100644
index 0000000..f8a097e
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/BlockReleaseStrategy.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.utils.serde;
+
+/**
+ * The interface is expected to be used as follows:
+ * - The stream keeps reporting how many free blocks it has at a certain frequency, usually at the end of each window
+ * - The stream checks how many blocks should be released. The stream usually releases them, but it can make its own decision
+ * - The stream reports how many blocks were actually released
+ */
+public interface BlockReleaseStrategy
+{
+  /**
+   * The stream should call this method to report to the strategy how many blocks are currently free.
+   * @param freeBlockNum the number of blocks that are free at the time of the call
+   */
+  void currentFreeBlocks(int freeBlockNum);
+
+  /**
+   * Get how many blocks can be released.
+   * @return the number of blocks the strategy allows the stream to release
+   */
+  int getNumBlocksToRelease();
+
+  /**
+   * The stream should call this method to report how many blocks were actually released.
+   * @param numReleasedBlocks the number of blocks the stream released
+   */
+  void releasedBlocks(int numReleasedBlocks);
+
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6ddefd02/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/BlockStream.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/BlockStream.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/BlockStream.java
new file mode 100644
index 0000000..ee50f7d
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/BlockStream.java
@@ -0,0 +1,179 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.utils.serde;
+
+import java.io.OutputStream;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Maps;
+
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * A stream is a collection of blocks
+ * BlockStream avoids copying the data that are already exposed to the caller
+ *
+ */
+public class BlockStream extends OutputStream
+{
+  private static final Logger logger = LoggerFactory.getLogger(BlockStream.class);
+
+  //the capacity each block of this stream is created with
+  protected final int blockCapacity;
+
+  protected Map<Integer, Block> blocks = Maps.newHashMap();
+  //the index of the current block; a valid block index is >= 0
+  protected int currentBlockIndex = 0;
+  protected long size = 0;
+
+  protected Block currentBlock;
+
+  public BlockStream()
+  {
+    this(Block.DEFAULT_BLOCK_SIZE);
+  }
+
+  public BlockStream(int blockCapacity)
+  {
+    this.blockCapacity = blockCapacity;
+  }
+
+  @Override
+  public void write(byte[] data)
+  {
+    write(data, 0, data.length);
+  }
+
+  @Override
+  public void write(int b)
+  {
+    currentBlock = getOrCreateCurrentBlock();
+    try {
+      currentBlock.write((byte)b);
+    } catch (Block.OutOfBlockBufferMemoryException e) {
+      reallocateBlock();
+      currentBlock.write((byte)b);
+    }
+    size++;
+  }
+
+  /**
+   * This write could be called multiple times for an object.
+   * The write method makes sure all bytes of the same object are written to only one block.
+   *
+   * @param data the array holding the bytes to write
+   * @param offset the index in data of the first byte to write
+   * @param length the number of bytes to write
+   */
+  @Override
+  public void write(byte[] data, final int offset, final int length)
+  {
+    //try the current block first; if it cannot take the data, continue in the next block
+    currentBlock = getOrCreateCurrentBlock();
+    try {
+      currentBlock.write(data, offset, length);
+    } catch (Block.OutOfBlockBufferMemoryException e) {
+      reallocateBlock();
+      currentBlock.write(data, offset, length);
+    }
+    size += length;
+  }
+
+  private void reallocateBlock()
+  {
+    //switch to the next block and carry over the bytes of the object that is still being written
+    Block previousBlock = moveToNextBlock();
+    if (!currentBlock.isFresh()) {
+      throw new RuntimeException("New block is not fresh.");
+    }
+    if (!previousBlock.isClear()) {
+      previousBlock.moveLastObjectDataTo(currentBlock);
+    }
+  }
+
+  /**
+   * Moves on to the block at the next index, creating it if needed; throws if that block is not fresh.
+   * @return The previous block
+   */
+  protected Block moveToNextBlock()
+  {
+    Block previousBlock = currentBlock;
+
+    ++currentBlockIndex;
+    currentBlock = getOrCreateCurrentBlock();
+    if (!currentBlock.isFresh()) {
+      throw new RuntimeException("Assigned non fresh block.");
+    }
+    return previousBlock;
+  }
+
+  protected Block getOrCreateCurrentBlock()
+  {
+    Block block = blocks.get(currentBlockIndex);
+    if (block == null) {
+      block = new Block(blockCapacity);
+      blocks.put(currentBlockIndex, block);
+    }
+    return block;
+  }
+
+  public long size()
+  {
+    return size;
+  }
+
+  public long capacity()
+  {
+    long capacity = 0;
+    for (Block block : blocks.values()) {
+      capacity += block.capacity();
+    }
+    return capacity;
+  }
+
+  /**
+   * Returns the slice of the object written last, taken from the block at the current index.
+   * This is the call that represents the end of an object.
+   */
+  public Slice toSlice()
+  {
+    return blocks.get(currentBlockIndex).toSlice();
+  }
+
+  /**
+   * resets all blocks and starts over at block index 0; the blocks themselves are kept for reuse
+   */
+  public void reset()
+  {
+    currentBlockIndex = 0;
+    size = 0;
+    for (Block block : blocks.values()) {
+      block.reset();
+    }
+  }
+
+  public void release()
+  {
+    reset();
+    blocks.clear();
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6ddefd02/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/BufferSlice.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/BufferSlice.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/BufferSlice.java
new file mode 100644
index 0000000..5d830fe
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/BufferSlice.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.utils.serde;
+
+import org.getopt.util.hash.MurmurHash;
+
+import org.apache.commons.lang3.ArrayUtils;
+
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * com.datatorrent.netlet.util.Slice has problem with the hashCode(), so
+ * override here
+ *
+ */
+public class BufferSlice extends Slice
+{
+  private static final long serialVersionUID = -471209532589983329L;
+  public static final BufferSlice EMPTY_SLICE = new BufferSlice(ArrayUtils.EMPTY_BYTE_ARRAY);
+
+  //for Kryo
+  private BufferSlice()
+  {
+    //the default constructor of the super class is private, so delegate to the three-argument one.
+    super(null, 0, 0);
+  }
+
+  public BufferSlice(byte[] array, int offset, int length)
+  {
+    super(array, offset, length);
+  }
+
+  public BufferSlice(byte[] array)
+  {
+    super(array);
+  }
+
+  /**
+   * Creates a BufferSlice from the buffer, offset and length of the given slice.
+   */
+  public BufferSlice(Slice netletSlice)
+  {
+    this(netletSlice.buffer, netletSlice.offset, netletSlice.length);
+  }
+
+  /**
+   * Combines a Murmur hash computed from the buffer, offset and length of this slice
+   * with the length itself.
+   */
+  @Override
+  public int hashCode()
+  {
+    final int seed = 5;
+    final int contentHash = MurmurHash.hash(buffer, seed, offset, length);
+    return 59 * (59 * seed + contentHash) + length;
+  }
+
+  /**
+   * Equal to any {@link Slice} (including a plain com.datatorrent.netlet.util.Slice) that has the same length
+   * and the same bytes within its range; the backing arrays and offsets themselves may differ.
+   */
+  @Override
+  public boolean equals(Object obj)
+  {
+    if (!(obj instanceof Slice)) {
+      return false;
+    }
+    final Slice that = (Slice)obj;
+    if (length != that.length) {
+      return false;
+    }
+
+    final byte[] mine = buffer;
+    final byte[] theirs = that.buffer;
+    // compare from the last byte of the range back to the first
+    for (int k = length - 1; k >= 0; k--) {
+      if (mine[offset + k] != theirs[that.offset + k]) {
+        return false;
+      }
+    }
+
+    return true;
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6ddefd02/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/CollectionSerde.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/CollectionSerde.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/CollectionSerde.java
new file mode 100644
index 0000000..bcd0b74
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/CollectionSerde.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.utils.serde;
+
+import java.util.Collection;
+
+import javax.validation.constraints.NotNull;
+
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+
+/**
+ * This is an implementation of {@link Serde} which serializes and deserializes collections.
+ *
+ * @since 3.5.0
+ */
+@InterfaceStability.Evolving
+public class CollectionSerde<T, CollectionT extends Collection<T>> implements Serde<CollectionT>
+{
+  @NotNull
+  private Serde<T> serde;
+
+  @NotNull
+  private Class<? extends CollectionT> collectionClass;
+
+  private CollectionSerde()
+  {
+    // for Kryo
+  }
+
+  /**
+   * Creates a {@link CollectionSerde}.
+   * @param serde The {@link Serde} that is used to serialize and deserialize each element of a collection.
+   * @param collectionClass The concrete collection class that {@link #deserialize} instantiates via its no-arg constructor.
+   */
+  public CollectionSerde(@NotNull Serde<T> serde, @NotNull Class<? extends CollectionT> collectionClass)
+  {
+    this.serde = Preconditions.checkNotNull(serde);
+    this.collectionClass = Preconditions.checkNotNull(collectionClass);
+  }
+
+  @Override
+  public void serialize(CollectionT objects, Output output)
+  {
+    if (objects.size() == 0) {
+      // an empty collection writes nothing at all, not even a zero count
+      return;
+    }
+    output.writeInt(objects.size(), true);
+    Serde<T> serializer = getItemSerde();
+    for (T object : objects) {
+      serializer.serialize(object, output);
+    }
+  }
+
+  @Override
+  public CollectionT deserialize(Input input)
+  {
+    int numElements = input.readInt(true);
+    try {
+      CollectionT collection = collectionClass.newInstance();
+      // go through getItemSerde() exactly as serialize() does, so a subclass override applies in both directions
+      Serde<T> deserializer = getItemSerde();
+      for (int index = 0; index < numElements; index++) {
+        collection.add(deserializer.deserialize(input));
+      }
+      return collection;
+    } catch (Exception ex) {
+      throw Throwables.propagate(ex);
+    }
+  }
+
+  protected Serde<T> getItemSerde()
+  {
+    return serde;
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6ddefd02/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/DefaultBlockReleaseStrategy.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/DefaultBlockReleaseStrategy.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/DefaultBlockReleaseStrategy.java
new file mode 100644
index 0000000..93929e4
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/DefaultBlockReleaseStrategy.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.utils.serde;
+
+import java.util.Arrays;
+
+import org.apache.commons.collections.buffer.CircularFifoBuffer;
+
+/**
+ * This implementation releases the minimum number of free blocks reported within the period.
+ *
+ */
+public class DefaultBlockReleaseStrategy implements BlockReleaseStrategy
+{
+  public static final int DEFAULT_PERIOD = 60; // 60 reports
+  private CircularFifoBuffer freeBlockNumQueue;
+  private Integer[] tmpArray;
+
+  public DefaultBlockReleaseStrategy()
+  {
+    this(DEFAULT_PERIOD);
+  }
+
+  public DefaultBlockReleaseStrategy(int period)
+  {
+    freeBlockNumQueue = new CircularFifoBuffer(period);
+    tmpArray = new Integer[period];
+    Arrays.fill(tmpArray, 0);
+  }
+
+  /**
+   * The stream calls this to report to the strategy how many blocks are currently free.
+   * @param freeBlockNum the number of free blocks; must not be negative
+   */
+  @Override
+  public void currentFreeBlocks(int freeBlockNum)
+  {
+    if (freeBlockNum < 0) {
+      throw new IllegalArgumentException("The number of free blocks could not less than zero.");
+    }
+    freeBlockNumQueue.add(freeBlockNum);
+  }
+
+  /**
+   * Get how many blocks can be released: the minimum of the retained reports, or zero when nothing was reported yet.
+   * @return the number of blocks that can be released
+   */
+  @Override
+  public int getNumBlocksToRelease()
+  {
+    // without any report there is no evidence of a free block, so do not fall through with Integer.MAX_VALUE
+    int minNum = freeBlockNumQueue.isEmpty() ? 0 : Integer.MAX_VALUE;
+    for (Object num : freeBlockNumQueue) {
+      minNum = Math.min((Integer)num, minNum);
+    }
+    return minNum;
+  }
+
+  /**
+   * Report how many blocks have been released; every retained report is lowered by that amount, but not below zero.
+   * @param numReleasedBlocks the number of blocks that were released; must not be negative
+   */
+  @Override
+  public void releasedBlocks(int numReleasedBlocks)
+  {
+    if (numReleasedBlocks == 0) {
+      return;
+    }
+    if (numReleasedBlocks < 0) {
+      throw new IllegalArgumentException("Num of released blocks should not be negative");
+    }
+    // rewrite from a snapshot: adding to the buffer while iterating over it does not visit each report exactly once
+    // the snapshot is re-added in iteration order, so the relative order of the reports is kept
+    Object[] reports = freeBlockNumQueue.toArray();
+    freeBlockNumQueue.clear();
+    for (Object num : reports) {
+      freeBlockNumQueue.add(Math.max((Integer)num - numReleasedBlocks, 0));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6ddefd02/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/GenericSerde.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/GenericSerde.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/GenericSerde.java
new file mode 100644
index 0000000..0fbb2ab
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/GenericSerde.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.utils.serde;
+
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+
+/**
+ * Generic serde using Kryo serialization. Note that while this is convenient, it may not be desirable because
+ * using Kryo makes the object being serialized rigid, meaning you won't be able to make backward compatible or
+ * incompatible changes to the class being serialized.
+ *
+ * @param <T> The type being serialized
+ */
+@InterfaceStability.Evolving
+public class GenericSerde<T> implements Serde<T>
+{
+  /**
+   * One lazily created Kryo instance per thread. The instance is supplied through
+   * {@link ThreadLocal#initialValue()} so that it is built once per thread and then reused,
+   * instead of being rebuilt on every serialize/deserialize call.
+   */
+  private transient ThreadLocal<Kryo> kryos = new ThreadLocal<Kryo>()
+  {
+    @Override
+    protected Kryo initialValue()
+    {
+      return new Kryo();
+    }
+  };
+
+  private final Class<? extends T> clazz;
+
+  /**
+   * Creates a serde without a fixed class; the class is written with, and read back from, every
+   * serialized object.
+   */
+  public GenericSerde()
+  {
+    this.clazz = null;
+  }
+
+  /**
+   * Creates a serde for a fixed class; only the object is written, and {@code clazz} is used to read it back.
+   *
+   * @param clazz the class to deserialize as
+   */
+  public GenericSerde(Class<? extends T> clazz)
+  {
+    this.clazz = clazz;
+  }
+
+  /**
+   * Serializes the object to the given output.
+   *
+   * @param object the object to serialize
+   * @param output the output to write to
+   */
+  @Override
+  public void serialize(T object, Output output)
+  {
+    Kryo kryo = kryos.get();
+    if (clazz == null) {
+      kryo.writeClassAndObject(output, object);
+    } else {
+      kryo.writeObject(output, object);
+    }
+  }
+
+  /**
+   * Deserializes an object from the given input.
+   *
+   * @param input the input to read from
+   * @return the deserialized object
+   */
+  @Override
+  @SuppressWarnings("unchecked") // without a fixed class the type read from the stream cannot be checked
+  public T deserialize(Input input)
+  {
+    T object;
+    Kryo kryo = kryos.get();
+    if (clazz == null) {
+      object = (T)kryo.readClassAndObject(input);
+    } else {
+      object = kryo.readObject(input, clazz);
+    }
+    return object;
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6ddefd02/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/IntSerde.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/IntSerde.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/IntSerde.java
new file mode 100644
index 0000000..032b5e0
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/IntSerde.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.utils.serde;
+
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+
+/**
+ * A {@link Serde} for {@link Integer} values that writes and reads them as ints on the given streams.
+ *
+ * @since 3.5.0
+ */
+@InterfaceStability.Evolving
+public class IntSerde implements Serde<Integer>
+{
+  /**
+   * Writes the int value to the output.
+   *
+   * @param value the value to write
+   * @param output the output to write to
+   */
+  @Override
+  public void serialize(Integer value, Output output)
+  {
+    final int primitive = value.intValue();
+    output.writeInt(primitive);
+  }
+
+  /**
+   * Reads one int from the input.
+   *
+   * @param input the input to read from
+   * @return the value read
+   */
+  @Override
+  public Integer deserialize(Input input)
+  {
+    final int primitive = input.readInt();
+    return Integer.valueOf(primitive);
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6ddefd02/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/KeyValueByteStreamProvider.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/KeyValueByteStreamProvider.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/KeyValueByteStreamProvider.java
new file mode 100644
index 0000000..a7dfa7f
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/KeyValueByteStreamProvider.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.utils.serde;
+
+/**
+ * Provides the {@link WindowedBlockStream}s used for keys and for values.
+ * An implementation can keep separate streams for keys and values, or share one stream for both.
+ */
+public interface KeyValueByteStreamProvider
+{
+  /**
+   * @return The stream for keeping keys
+   */
+  WindowedBlockStream getKeyStream();
+
+  /**
+   * @return The stream for keeping values
+   */
+  WindowedBlockStream getValueStream();
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6ddefd02/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/KeyValueSerdeManager.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/KeyValueSerdeManager.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/KeyValueSerdeManager.java
new file mode 100644
index 0000000..6fbe9fe
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/KeyValueSerdeManager.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.utils.serde;
+
+import org.apache.apex.malhar.lib.state.managed.Bucket;
+import org.apache.apex.malhar.lib.state.managed.BucketProvider;
+
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * Serializes keys and values with the configured {@link Serde}s into {@link SerializationBuffer}s and
+ * returns the result as {@link Slice}s. The write-side buffers are created in
+ * {@link #setup(BucketProvider, long)} from the key and value streams of a bucket; keys serialized for
+ * reading go to {@link SerializationBuffer#READ_BUFFER}.
+ *
+ * @param <K> the key type
+ * @param <V> the value type
+ */
+public class KeyValueSerdeManager<K, V>
+{
+  protected Serde<K> keySerde;
+  protected Serde<V> valueSerde;
+
+  protected SerializationBuffer keyBufferForWrite;
+  protected transient SerializationBuffer keyBufferForRead = SerializationBuffer.READ_BUFFER;
+
+  protected SerializationBuffer valueBuffer;
+
+  protected KeyValueSerdeManager()
+  {
+    // for Kryo
+  }
+
+  /**
+   * @param keySerde the serde used for keys
+   * @param valueSerde the serde used for values
+   */
+  public KeyValueSerdeManager(Serde<K> keySerde, Serde<V> valueSerde)
+  {
+    this.keySerde = keySerde;
+    this.valueSerde = valueSerde;
+  }
+
+  /**
+   * Creates the write buffers on top of the streams of the given bucket.
+   *
+   * @param bp the provider from which the bucket is obtained
+   * @param bucketId the id of the bucket whose streams back the write buffers
+   */
+  public void setup(BucketProvider bp, long bucketId)
+  {
+    // The bucket does not change for this class, so the streams are fetched once here;
+    // otherwise the stream would have to be set before each serialization.
+    final Bucket bucket = bp.ensureBucket(bucketId);
+    valueBuffer = new SerializationBuffer(bucket.getValueStream());
+    keyBufferForWrite = new SerializationBuffer(bucket.getKeyStream());
+  }
+
+  /**
+   * Serializes a key.
+   *
+   * @param key the key to serialize
+   * @param write {@code true} to use the write buffer, {@code false} to use the read buffer
+   * @return the slice produced by the chosen buffer
+   */
+  public Slice serializeKey(K key, boolean write)
+  {
+    final SerializationBuffer target;
+    if (write) {
+      target = keyBufferForWrite;
+    } else {
+      target = keyBufferForRead;
+    }
+    keySerde.serialize(key, target);
+    return target.toSlice();
+  }
+
+  /**
+   * Serializes a value. Values are only serialized for writing.
+   *
+   * @param value the value to serialize
+   * @return the slice produced by the value buffer
+   */
+  public Slice serializeValue(V value)
+  {
+    final SerializationBuffer target = valueBuffer;
+    valueSerde.serialize(value, target);
+    return target.toSlice();
+  }
+
+  /**
+   * Forwards the begin-window call to the key write buffer and then to the value buffer.
+   *
+   * @param windowId the id of the window that begins
+   */
+  public void beginWindow(long windowId)
+  {
+    keyBufferForWrite.beginWindow(windowId);
+    valueBuffer.beginWindow(windowId);
+  }
+
+  /**
+   * Releases the buffer used for serializing keys for reading.
+   */
+  public void resetReadBuffer()
+  {
+    keyBufferForRead.release();
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6ddefd02/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/LongSerde.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/LongSerde.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/LongSerde.java
new file mode 100644
index 0000000..0b63737
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/LongSerde.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.utils.serde;
+
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+
+/**
+ * A {@link Serde} for {@link Long} values that writes and reads them as longs on the given streams.
+ *
+ * @since 3.5.0
+ */
+@InterfaceStability.Evolving
+public class LongSerde implements Serde<Long>
+{
+  /**
+   * Writes the long value to the output.
+   *
+   * @param value the value to write
+   * @param output the output to write to
+   */
+  @Override
+  public void serialize(Long value, Output output)
+  {
+    final long primitive = value.longValue();
+    output.writeLong(primitive);
+  }
+
+  /**
+   * Reads one long from the input.
+   *
+   * @param input the input to read from
+   * @return the value read
+   */
+  @Override
+  public Long deserialize(Input input)
+  {
+    final long primitive = input.readLong();
+    return Long.valueOf(primitive);
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6ddefd02/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/PairSerde.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/PairSerde.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/PairSerde.java
new file mode 100644
index 0000000..3190880
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/PairSerde.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.utils.serde;
+
+import javax.validation.constraints.NotNull;
+
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import com.google.common.base.Preconditions;
+
+/**
+ * A {@link Serde} for {@link Pair}s. The left element is handled by the first serde and the right
+ * element by the second, always in that order.
+ */
+@InterfaceStability.Evolving
+public class PairSerde<T1, T2> implements Serde<Pair<T1, T2>>
+{
+  @NotNull
+  private Serde<T1> serde1;
+  @NotNull
+  private Serde<T2> serde2;
+
+  private PairSerde()
+  {
+    // for Kryo
+  }
+
+  /**
+   * Creates a {@link PairSerde}.
+   * @param serde1 The {@link Serde} that serializes and deserializes the first (left) element of a pair
+   * @param serde2 The {@link Serde} that serializes and deserializes the second (right) element of a pair
+   */
+  public PairSerde(@NotNull Serde<T1> serde1, @NotNull Serde<T2> serde2)
+  {
+    this.serde1 = Preconditions.checkNotNull(serde1);
+    this.serde2 = Preconditions.checkNotNull(serde2);
+  }
+
+  /**
+   * Writes the left element and then the right element of the pair to the output.
+   *
+   * @param pair the pair to serialize
+   * @param output the output to write to
+   */
+  @Override
+  public void serialize(Pair<T1, T2> pair, Output output)
+  {
+    final T1 left = pair.getLeft();
+    serde1.serialize(left, output);
+    final T2 right = pair.getRight();
+    serde2.serialize(right, output);
+  }
+
+  /**
+   * Reads the left element and then the right element from the input.
+   *
+   * @param input the input to read from
+   * @return an {@link ImmutablePair} of the two elements read
+   */
+  @Override
+  public Pair<T1, T2> deserialize(Input input)
+  {
+    final T1 left = serde1.deserialize(input);
+    final T2 right = serde2.deserialize(input);
+    final Pair<T1, T2> result = new ImmutablePair<T1, T2>(left, right);
+    return result;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6ddefd02/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/PassThruByteArraySerde.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/PassThruByteArraySerde.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/PassThruByteArraySerde.java
deleted file mode 100644
index 9669981..0000000
--- a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/PassThruByteArraySerde.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.lib.utils.serde;
-
-import org.apache.commons.lang3.mutable.MutableInt;
-import org.apache.hadoop.classification.InterfaceStability;
-
-/**
- * This is a simple pass through {@link Serde}. When serialization is performed the input byte array is returned.
- * Similarly when deserialization is performed the input byte array is returned.
- *
- * @since 3.4.0
- */
-@InterfaceStability.Evolving
-public class PassThruByteArraySerde implements Serde<byte[], byte[]>
-{
-  @Override
-  public byte[] serialize(byte[] object)
-  {
-    return object;
-  }
-
-  @Override
-  public byte[] deserialize(byte[] object, MutableInt offset)
-  {
-    offset.add(object.length);
-    return object;
-  }
-
-  @Override
-  public byte[] deserialize(byte[] object)
-  {
-    return object;
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6ddefd02/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/PassThruByteArraySliceSerde.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/PassThruByteArraySliceSerde.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/PassThruByteArraySliceSerde.java
deleted file mode 100644
index b22bf6f..0000000
--- a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/PassThruByteArraySliceSerde.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.lib.utils.serde;
-
-import org.apache.commons.lang3.mutable.MutableInt;
-
-import com.datatorrent.netlet.util.Slice;
-
-/**
- * This is a simple {@link Serde} which serializes and deserializes byte arrays to {@link Slice}s. A byte array is
- * serialized by simply wrapping it in a {@link Slice} object and deserialized by simply reading the byte array
- * out of the {@link Slice} object.
- *
- * <b>Note:</b> The deserialized method doesn't use the offset argument in this implementation.
- *
- * @since 3.5.0
- */
-public class PassThruByteArraySliceSerde implements Serde<byte[], Slice>
-{
-  @Override
-  public Slice serialize(byte[] object)
-  {
-    return new Slice(object);
-  }
-
-  @Override
-  public byte[] deserialize(Slice object, MutableInt offset)
-  {
-    offset.add(object.length);
-
-    if (object.offset == 0) {
-      return object.buffer;
-    }
-
-    byte[] bytes = new byte[object.length];
-    System.arraycopy(object.buffer, object.offset, bytes, 0, object.length);
-    return bytes;
-  }
-
-  @Override
-  public byte[] deserialize(Slice object)
-  {
-    return deserialize(object, new MutableInt(0));
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6ddefd02/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/PassThruSliceSerde.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/PassThruSliceSerde.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/PassThruSliceSerde.java
index 2646c0e..679e116 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/PassThruSliceSerde.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/PassThruSliceSerde.java
@@ -18,9 +18,14 @@
  */
 package org.apache.apex.malhar.lib.utils.serde;
 
-import org.apache.commons.lang3.mutable.MutableInt;
+import java.io.IOException;
+
 import org.apache.hadoop.classification.InterfaceStability;
 
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import com.google.common.base.Throwables;
+
 import com.datatorrent.netlet.util.Slice;
 
 /**
@@ -30,23 +35,26 @@ import com.datatorrent.netlet.util.Slice;
  * @since 3.5.0
  */
 @InterfaceStability.Evolving
-public class PassThruSliceSerde implements Serde<Slice, Slice>
+public class PassThruSliceSerde implements Serde<Slice>
 {
   @Override
-  public Slice serialize(Slice object)
-  {
-    return object;
-  }
-
-  @Override
-  public Slice deserialize(Slice object, MutableInt offset)
+  public void serialize(Slice slice, Output output)
   {
-    return object;
+    output.write(slice.buffer, slice.offset, slice.length);
   }
 
   @Override
-  public Slice deserialize(Slice object)
+  public Slice deserialize(Input input)
   {
-    return object;
+    if (input.getInputStream() != null) {
+      // The input is backed by a stream, cannot directly use its internal buffer
+      try {
+        return new Slice(input.readBytes(input.available()));
+      } catch (IOException ex) {
+        throw Throwables.propagate(ex);
+      }
+    } else {
+      return new Slice(input.getBuffer(), input.position(), input.limit() - input.position());
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6ddefd02/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/Serde.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/Serde.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/Serde.java
index 6e02aee..d09612d 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/Serde.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/Serde.java
@@ -18,46 +18,29 @@
  */
 package org.apache.apex.malhar.lib.utils.serde;
 
-import org.apache.commons.lang3.mutable.MutableInt;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
 
 /**
  * This is an interface for a Serializer/Deserializer class.
- * @param <OBJ> The type of the object to Serialize and Deserialize.
- * @param <SER> The type to Serialize an Object to.
+ * @param <T> The type of the object to Serialize and Deserialize.
  *
  * @since 3.4.0
  */
-public interface Serde<OBJ, SER>
+public interface Serde<T>
 {
   /**
-   * Serialized the given object.
-   * @param object The object to serialize.
-   * @return The serialized representation of the object.
+   * Serialize the object to the given output.
+   * @param object
+   * @param output
    */
-  SER serialize(OBJ object);
+  void serialize(T object, Output output);
 
   /**
-   * Deserializes the given serialized representation of an object.
-   * @param object The serialized representation of an object.
-   * @param offset An offset in the serialized representation of the object. After the
-   * deserialize method completes the offset is updated, so that the offset points to
-   * the remaining unprocessed portion of the serialized object. For example:<br/>
-   * {@code
-   * Object obj;
-   * MutableInt mi;
-   * someObj1 = deserialize(obj, mi);
-   * someObj2 = deserialize(obj, mi);
-   * }
+   * Deserialize from the input and return a new object.
    *
-   * @return The deserialized object.
+   * @param input
+   * @return
    */
-  OBJ deserialize(SER object, MutableInt offset);
-
-  /**
-   * Deserializes the given serialized representation of an object.
-   * @param object The serialized representation of an object.
-   *
-   * @return The deserialized object.
-   */
-  OBJ deserialize(SER object);
+  T deserialize(Input input);
 }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6ddefd02/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeCollectionSlice.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeCollectionSlice.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeCollectionSlice.java
deleted file mode 100644
index eca1d5f..0000000
--- a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeCollectionSlice.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.lib.utils.serde;
-
-import java.util.Collection;
-
-import javax.validation.constraints.NotNull;
-
-import org.apache.commons.lang3.mutable.MutableInt;
-import org.apache.hadoop.classification.InterfaceStability;
-
-import com.google.common.base.Preconditions;
-import com.google.common.base.Throwables;
-
-import com.datatorrent.lib.appdata.gpo.GPOUtils;
-import com.datatorrent.netlet.util.Slice;
-
-/**
- * This is an implementation of {@link Serde} which serializes and deserializes lists.
- *
- * @since 3.5.0
- */
-@InterfaceStability.Evolving
-public class SerdeCollectionSlice<T, CollectionT extends Collection<T>> implements Serde<CollectionT, Slice>
-{
-  @NotNull
-  private Serde<T, Slice> serde;
-
-  @NotNull
-  private Class<? extends CollectionT> collectionClass;
-
-  private SerdeCollectionSlice()
-  {
-    // for Kryo
-  }
-
-  /**
-   * Creates a {@link SerdeCollectionSlice}.
-   * @param serde The {@link Serde} that is used to serialize and deserialize each element of a list.
-   */
-  public SerdeCollectionSlice(@NotNull Serde<T, Slice> serde, @NotNull Class<? extends CollectionT> collectionClass)
-  {
-    this.serde = Preconditions.checkNotNull(serde);
-    this.collectionClass = Preconditions.checkNotNull(collectionClass);
-  }
-
-  @Override
-  public Slice serialize(CollectionT objects)
-  {
-    Slice[] slices = new Slice[objects.size()];
-
-    int size = 4;
-
-    int index = 0;
-    for (T object : objects) {
-      Slice slice = serde.serialize(object);
-      slices[index++] = slice;
-      size += slice.length;
-    }
-
-    byte[] bytes = new byte[size];
-    int offset = 0;
-
-    byte[] sizeBytes = GPOUtils.serializeInt(objects.size());
-    System.arraycopy(sizeBytes, 0, bytes, offset, 4);
-    offset += 4;
-
-    for (index = 0; index < slices.length; index++) {
-      Slice slice = slices[index];
-      System.arraycopy(slice.buffer, slice.offset, bytes, offset, slice.length);
-      offset += slice.length;
-    }
-
-    return new Slice(bytes);
-  }
-
-  @Override
-  public CollectionT deserialize(Slice slice, MutableInt offset)
-  {
-    MutableInt sliceOffset = new MutableInt(slice.offset + offset.intValue());
-
-    int numElements = GPOUtils.deserializeInt(slice.buffer, sliceOffset);
-    sliceOffset.subtract(slice.offset);
-    try {
-      CollectionT collection = collectionClass.newInstance();
-
-      for (int index = 0; index < numElements; index++) {
-        T object = serde.deserialize(slice, sliceOffset);
-        collection.add(object);
-      }
-
-      offset.setValue(sliceOffset.intValue());
-      return collection;
-    } catch (Exception ex) {
-      throw Throwables.propagate(ex);
-    }
-  }
-
-  @Override
-  public CollectionT deserialize(Slice slice)
-  {
-    return deserialize(slice, new MutableInt(0));
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6ddefd02/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeIntSlice.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeIntSlice.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeIntSlice.java
deleted file mode 100644
index 3275a93..0000000
--- a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeIntSlice.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.lib.utils.serde;
-
-import org.apache.commons.lang3.mutable.MutableInt;
-import org.apache.hadoop.classification.InterfaceStability;
-
-import com.datatorrent.lib.appdata.gpo.GPOUtils;
-import com.datatorrent.netlet.util.Slice;
-
-/**
- * This is an implementation of {@link Serde} which deserializes and serializes integers.
- *
- * @since 3.5.0
- */
-@InterfaceStability.Evolving
-public class SerdeIntSlice implements Serde<Integer, Slice>
-{
-  @Override
-  public Slice serialize(Integer object)
-  {
-    return new Slice(GPOUtils.serializeInt(object));
-  }
-
-  @Override
-  public Integer deserialize(Slice slice, MutableInt offset)
-  {
-    int val = GPOUtils.deserializeInt(slice.buffer, new MutableInt(slice.offset + offset.intValue()));
-    offset.add(4);
-    return val;
-  }
-
-  @Override
-  public Integer deserialize(Slice object)
-  {
-    return deserialize(object, new MutableInt(0));
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6ddefd02/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeKryoSlice.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeKryoSlice.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeKryoSlice.java
deleted file mode 100644
index d4b9488..0000000
--- a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeKryoSlice.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.lib.utils.serde;
-
-import java.io.ByteArrayOutputStream;
-
-import org.apache.commons.lang3.mutable.MutableInt;
-import org.apache.hadoop.classification.InterfaceStability;
-
-import com.esotericsoftware.kryo.Kryo;
-import com.esotericsoftware.kryo.io.Input;
-import com.esotericsoftware.kryo.io.Output;
-
-import com.datatorrent.netlet.util.Slice;
-
-/**
- * Generic serde using Kryo serialization. Note that while this is convenient, it may not be desirable because
- * using Kryo makes the object being serialized rigid, meaning you won't be able to make backward compatible or
- * incompatible changes to the class being serialized.
- *
- * @param <T> The type being serialized
- */
-@InterfaceStability.Evolving
-public class SerdeKryoSlice<T> implements Serde<T, Slice>
-{
-  // Setup ThreadLocal of Kryo instances
-  private static final ThreadLocal<Kryo> kryos = new ThreadLocal<Kryo>()
-  {
-    protected Kryo initialValue()
-    {
-      Kryo kryo = new Kryo();
-      // configure kryo instance, customize settings
-      return kryo;
-    }
-  };
-
-  private final Class<? extends T> clazz;
-
-  public SerdeKryoSlice()
-  {
-    this.clazz = null;
-  }
-
-  public SerdeKryoSlice(Class<? extends T> clazz)
-  {
-    this.clazz = clazz;
-  }
-
-  @Override
-  public Slice serialize(T object)
-  {
-    Kryo kryo = kryos.get();
-    ByteArrayOutputStream stream = new ByteArrayOutputStream();
-    Output output = new Output(stream);
-    if (clazz == null) {
-      kryo.writeClassAndObject(output, object);
-    } else {
-      kryo.writeObject(output, object);
-    }
-    return new Slice(output.toBytes());
-  }
-
-  @Override
-  public T deserialize(Slice slice, MutableInt offset)
-  {
-    byte[] bytes = slice.toByteArray();
-    Kryo kryo = kryos.get();
-    Input input = new Input(bytes, offset.intValue(), bytes.length - offset.intValue());
-    T object;
-    if (clazz == null) {
-      object = (T)kryo.readClassAndObject(input);
-    } else {
-      object = kryo.readObject(input, clazz);
-    }
-    offset.setValue(bytes.length - input.position());
-    return object;
-  }
-
-  @Override
-  public T deserialize(Slice slice)
-  {
-    return deserialize(slice, new MutableInt(0));
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6ddefd02/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeLongSlice.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeLongSlice.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeLongSlice.java
deleted file mode 100644
index 6fe07d9..0000000
--- a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeLongSlice.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.lib.utils.serde;
-
-import org.apache.commons.lang3.mutable.MutableInt;
-import org.apache.hadoop.classification.InterfaceStability;
-
-import com.datatorrent.lib.appdata.gpo.GPOUtils;
-import com.datatorrent.netlet.util.Slice;
-
-/**
- * This is an implementation of {@link Serde} which deserializes and serializes integers.
- *
- * @since 3.5.0
- */
-@InterfaceStability.Evolving
-public class SerdeLongSlice implements Serde<Long, Slice>
-{
-  @Override
-  public Slice serialize(Long object)
-  {
-    return new Slice(GPOUtils.serializeLong(object));
-  }
-
-  @Override
-  public Long deserialize(Slice slice, MutableInt offset)
-  {
-    long val = GPOUtils.deserializeLong(slice.buffer, new MutableInt(slice.offset + offset.intValue()));
-    offset.add(8);
-    return val;
-  }
-
-  @Override
-  public Long deserialize(Slice object)
-  {
-    return deserialize(object, new MutableInt(0));
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6ddefd02/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdePairSlice.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdePairSlice.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdePairSlice.java
deleted file mode 100644
index 59cf282..0000000
--- a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdePairSlice.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.lib.utils.serde;
-
-import javax.validation.constraints.NotNull;
-
-import org.apache.commons.lang3.mutable.MutableInt;
-import org.apache.commons.lang3.tuple.ImmutablePair;
-import org.apache.commons.lang3.tuple.Pair;
-import org.apache.hadoop.classification.InterfaceStability;
-
-import com.google.common.base.Preconditions;
-
-import com.datatorrent.netlet.util.Slice;
-
-/**
- * This is an implementation of {@link Serde} which serializes and deserializes pairs.
- */
-@InterfaceStability.Evolving
-public class SerdePairSlice<T1, T2> implements Serde<Pair<T1, T2>, Slice>
-{
-  @NotNull
-  private Serde<T1, Slice> serde1;
-  @NotNull
-  private Serde<T2, Slice> serde2;
-
-  private SerdePairSlice()
-  {
-    // for Kryo
-  }
-
-  /**
-   * Creates a {@link SerdePairSlice}.
-   * @param serde1 The {@link Serde} that is used to serialize and deserialize first element of a pair
-   * @param serde2 The {@link Serde} that is used to serialize and deserialize second element of a pair
-   */
-  public SerdePairSlice(@NotNull Serde<T1, Slice> serde1, @NotNull Serde<T2, Slice> serde2)
-  {
-    this.serde1 = Preconditions.checkNotNull(serde1);
-    this.serde2 = Preconditions.checkNotNull(serde2);
-  }
-
-  @Override
-  public Slice serialize(Pair<T1, T2> pair)
-  {
-    int size = 0;
-
-    Slice slice1 = serde1.serialize(pair.getLeft());
-    size += slice1.length;
-    Slice slice2 = serde2.serialize(pair.getRight());
-    size += slice2.length;
-
-    byte[] bytes = new byte[size];
-    System.arraycopy(slice1.buffer, slice1.offset, bytes, 0, slice1.length);
-    System.arraycopy(slice2.buffer, slice2.offset, bytes, slice1.length, slice2.length);
-
-    return new Slice(bytes);
-  }
-
-  @Override
-  public Pair<T1, T2> deserialize(Slice slice, MutableInt offset)
-  {
-    T1 first = serde1.deserialize(slice, offset);
-    T2 second = serde2.deserialize(slice, offset);
-    return new ImmutablePair<>(first, second);
-  }
-
-  @Override
-  public Pair<T1, T2> deserialize(Slice slice)
-  {
-    return deserialize(slice, new MutableInt(0));
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6ddefd02/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeStringSlice.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeStringSlice.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeStringSlice.java
deleted file mode 100644
index aaf0d61..0000000
--- a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeStringSlice.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.lib.utils.serde;
-
-import org.apache.commons.lang3.mutable.MutableInt;
-import org.apache.hadoop.classification.InterfaceStability;
-
-import com.datatorrent.lib.appdata.gpo.GPOUtils;
-import com.datatorrent.netlet.util.Slice;
-
-/**
- * An implementation of {@link Serde} which serializes and deserializes {@link String}s.
- *
- * @since 3.5.0
- */
-@InterfaceStability.Evolving
-public class SerdeStringSlice implements Serde<String, Slice>
-{
-  @Override
-  public Slice serialize(String object)
-  {
-    return new Slice(GPOUtils.serializeString(object));
-  }
-
-  @Override
-  public String deserialize(Slice object, MutableInt offset)
-  {
-    offset.add(object.offset);
-    String string = GPOUtils.deserializeString(object.buffer, offset);
-    offset.subtract(object.offset);
-    return string;
-  }
-
-  @Override
-  public String deserialize(Slice object)
-  {
-    return deserialize(object, new MutableInt(0));
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6ddefd02/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerializationBuffer.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerializationBuffer.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerializationBuffer.java
new file mode 100644
index 0000000..f33f1e0
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerializationBuffer.java
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.utils.serde;
+
+import org.apache.apex.malhar.lib.state.spillable.WindowListener;
+
+import com.esotericsoftware.kryo.io.Output;
+
+import com.datatorrent.netlet.util.Slice;
+
+public class SerializationBuffer extends Output implements WindowCompleteListener, WindowListener
+{
+  /*
+   * Shared singleton read buffer for serialization, backed by its own WindowedBlockStream
+   */
+  public static final SerializationBuffer READ_BUFFER = new SerializationBuffer(new WindowedBlockStream());
+
+  private WindowedBlockStream windowedBlockStream;
+
+  @SuppressWarnings("unused")
+  private SerializationBuffer()
+  {
+    this(new WindowedBlockStream());
+  }
+
+  public SerializationBuffer(WindowedBlockStream windowedBlockStream)
+  {
+    super(windowedBlockStream);
+    this.windowedBlockStream = windowedBlockStream;
+  }
+
+  public long size()
+  {
+    return windowedBlockStream.size();
+  }
+
+  public long capacity()
+  {
+    return windowedBlockStream.capacity();
+  }
+
+  /**
+   * Flushes buffered bytes to the underlying stream; call only after the whole object has been written.
+   * @return The slice which represents the object, as reported by the underlying stream
+   */
+  public Slice toSlice()
+  {
+    this.flush();
+    return windowedBlockStream.toSlice();
+  }
+
+  /**
+   * Resets the underlying {@link WindowedBlockStream} so that this buffer can be reused.
+   */
+  public void reset()
+  {
+    windowedBlockStream.reset();
+  }
+
+
+  @Override
+  public void beginWindow(long windowId)
+  {
+    windowedBlockStream.beginWindow(windowId);
+  }
+
+  @Override
+  public void endWindow()
+  {
+    windowedBlockStream.endWindow();
+  }
+
+  public void release()
+  {
+    reset();
+    windowedBlockStream.reset();
+  }
+
+  public WindowedBlockStream createWindowedBlockStream()
+  {
+    return new WindowedBlockStream();
+  }
+
+  public WindowedBlockStream createWindowedBlockStream(int capacity)
+  {
+    return new WindowedBlockStream(capacity);
+  }
+
+  public WindowedBlockStream getWindowedBlockStream()
+  {
+    return windowedBlockStream;
+  }
+
+  public void setWindowableByteStream(WindowedBlockStream windowableByteStream)
+  {
+    this.windowedBlockStream = windowableByteStream;
+  }
+
+  /**
+   * Resets all windows whose window id is less than or equal to the given windowId.
+   * This is not invoked once per window; several windows can be reset by a single call.
+   * @param windowId the inclusive upper bound of the window ids to complete; passed to the underlying stream
+   */
+  @Override
+  public void completeWindow(long windowId)
+  {
+    windowedBlockStream.completeWindow(windowId);
+  }
+
+  public byte[] toByteArray()
+  {
+    return toSlice().toByteArray();
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6ddefd02/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SliceUtils.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SliceUtils.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SliceUtils.java
index 2671d5e..b504581 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SliceUtils.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SliceUtils.java
@@ -100,4 +100,14 @@ public class SliceUtils
 
     return new Slice(bytes);
   }
+
+  public static BufferSlice toBufferSlice(Slice slice)
+  {
+    if (slice instanceof BufferSlice) {
+      return (BufferSlice)slice;
+    }
+
+    // The hashCode of a plain Slice is not correct, so wrap it in a BufferSlice to correct it
+    return new BufferSlice(slice);
+  }
 }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6ddefd02/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/StringSerde.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/StringSerde.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/StringSerde.java
new file mode 100644
index 0000000..cb45e2a
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/StringSerde.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.utils.serde;
+
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+
+/**
+ * An implementation of {@link Serde} which serializes and deserializes {@link String}s
+ * by delegating to {@link Output#writeString(String)} and {@link Input#readString()}.
+ * @since 3.5.0
+ */
+@InterfaceStability.Evolving
+public class StringSerde implements Serde<String>
+{
+  @Override
+  public void serialize(String string, Output output)
+  {
+    output.writeString(string);
+  }
+
+  @Override
+  public String deserialize(Input input)
+  {
+    return input.readString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6ddefd02/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/WindowCompleteListener.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/WindowCompleteListener.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/WindowCompleteListener.java
new file mode 100644
index 0000000..d2d38a7
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/WindowCompleteListener.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.utils.serde;
+
+public interface WindowCompleteListener
+{
+  /**
+   * Notification that all windows whose window id is less than or equal to the input windowId are complete.
+   *
+   * @param windowId the inclusive upper bound of the completed window ids
+   */
+  void completeWindow(long windowId);
+}