You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2016/01/29 20:13:06 UTC

[49/50] [abbrv] incubator-apex-malhar git commit: APEXMALHAR-1984 #resolve #comment Create util function to clone operator object by kryo

APEXMALHAR-1984 #resolve #comment Create util function to clone operator object by kryo


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/48418737
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/48418737
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/48418737

Branch: refs/heads/master
Commit: 4841873715a9d84f16ee3165b42dd1f43aab61dc
Parents: 9e77ef7
Author: Siyuan Hua <hs...@apache.org>
Authored: Thu Jan 28 15:30:04 2016 -0800
Committer: Siyuan Hua <hs...@apache.org>
Committed: Thu Jan 28 15:30:04 2016 -0800

----------------------------------------------------------------------
 .../AbstractCouchBaseInputOperator.java         |  18 +-
 .../kafka/AbstractKafkaInputOperator.java       |  16 +-
 .../kinesis/AbstractKinesisInputOperator.java   |  16 +-
 .../contrib/kafka/SimpleKakfaConsumerTest.java  |   5 +-
 .../util/FieldValueSerializableGenerator.java   |   1 +
 .../malhar/kafka/AbstractKafkaPartitioner.java  |  14 +-
 .../lib/io/block/AbstractBlockReader.java       |  20 +--
 .../lib/io/fs/AbstractFileInputOperator.java    |  15 +-
 .../datatorrent/lib/util/KryoCloneUtils.java    | 171 +++++++++++++++++++
 .../datastructs/DimensionalTableTest.java       |   7 +-
 .../appdata/schemas/ResultFormatterTest.java    |   6 +-
 .../schemas/SchemaRegistryMultipleTest.java     |   5 +-
 .../snapshot/AppDataSnapshotServerMapTest.java  |   5 +-
 .../lib/util/KryoCloneUtilsTest.java            | 131 ++++++++++++++
 .../com/datatorrent/lib/util/PojoUtilsTest.java |   3 +-
 .../com/datatorrent/lib/util/TestUtils.java     |  20 ---
 16 files changed, 335 insertions(+), 118 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/48418737/contrib/src/main/java/com/datatorrent/contrib/couchbase/AbstractCouchBaseInputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/couchbase/AbstractCouchBaseInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/couchbase/AbstractCouchBaseInputOperator.java
index 60938ab..1cd4eb5 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/couchbase/AbstractCouchBaseInputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/couchbase/AbstractCouchBaseInputOperator.java
@@ -26,23 +26,19 @@ import java.util.concurrent.TimeUnit;
 
 import com.couchbase.client.CouchbaseClient;
 import com.couchbase.client.vbucket.config.Config;
-import com.esotericsoftware.kryo.Kryo;
-import com.esotericsoftware.kryo.io.Input;
-import com.esotericsoftware.kryo.io.Output;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.commons.io.output.ByteArrayOutputStream;
-
 import com.datatorrent.lib.db.AbstractStoreInputOperator;
 
 import com.datatorrent.api.Context;
 import com.datatorrent.api.DefaultPartition;
 import com.datatorrent.api.Partitioner;
 
+import com.datatorrent.lib.util.KryoCloneUtils;
 import com.datatorrent.netlet.util.DTThrowable;
 
 /**
@@ -149,19 +145,13 @@ public abstract class AbstractCouchBaseInputOperator<T> extends AbstractStoreInp
     int numPartitions = conf.getServers().size();
     List<String> list = conf.getServers();
     Collection<Partition<AbstractCouchBaseInputOperator<T>>> newPartitions = Lists.newArrayListWithExpectedSize(numPartitions);
-    Kryo kryo = new Kryo();
+    KryoCloneUtils<AbstractCouchBaseInputOperator<T>> cloneUtils = KryoCloneUtils.createCloneUtils(this);
     for (int i = 0; i < numPartitions; i++) {
-      ByteArrayOutputStream bos = new ByteArrayOutputStream();
-      Output output = new Output(bos);
-      kryo.writeObject(output, this);
-      output.close();
-      Input lInput = new Input(bos.toByteArray());
-      @SuppressWarnings("unchecked")
-      AbstractCouchBaseInputOperator<T> oper = kryo.readObject(lInput, this.getClass());
+      AbstractCouchBaseInputOperator<T> oper = cloneUtils.getClone();
       oper.setServerIndex(i);
       oper.setServerURIString(list.get(i));
       logger.debug("oper {} urlstring is {}", i, oper.getServerURIString());
-      newPartitions.add(new DefaultPartition<AbstractCouchBaseInputOperator<T>>(oper));
+      newPartitions.add(new DefaultPartition<>(oper));
     }
 
     return newPartitions;

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/48418737/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java
index c7bac18..4b22e5e 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java
@@ -29,9 +29,8 @@ import com.datatorrent.api.StatsListener;
 import com.datatorrent.api.annotation.OperatorAnnotation;
 import com.datatorrent.api.annotation.Stateless;
 import com.datatorrent.lib.io.IdempotentStorageManager;
-import com.esotericsoftware.kryo.Kryo;
-import com.esotericsoftware.kryo.io.Input;
-import com.esotericsoftware.kryo.io.Output;
+import com.datatorrent.lib.util.KryoCloneUtils;
+
 import com.google.common.base.Joiner;
 import com.google.common.base.Predicate;
 import com.google.common.collect.Iterators;
@@ -54,7 +53,6 @@ import org.slf4j.LoggerFactory;
 import javax.validation.Valid;
 import javax.validation.constraints.Min;
 import javax.validation.constraints.NotNull;
-import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.lang.reflect.Array;
 import java.util.ArrayList;
@@ -640,14 +638,8 @@ public abstract class AbstractKafkaInputOperator<K extends KafkaConsumer> implem
   // Create a new partition with the partition Ids and initial offset positions
   protected Partitioner.Partition<AbstractKafkaInputOperator<K>> createPartition(Set<KafkaPartition> pIds, Map<KafkaPartition, Long> initOffsets, Collection<IdempotentStorageManager> newManagers)
   {
-    Kryo kryo = new Kryo();
-    ByteArrayOutputStream bos = new ByteArrayOutputStream();
-    Output output = new Output(bos);
-    kryo.writeObject(output, this);
-    output.close();
-    Input lInput = new Input(bos.toByteArray());
-    @SuppressWarnings("unchecked")
-    Partitioner.Partition<AbstractKafkaInputOperator<K>> p = new DefaultPartition<AbstractKafkaInputOperator<K>>(kryo.readObject(lInput, this.getClass()));
+
+    Partitioner.Partition<AbstractKafkaInputOperator<K>> p = new DefaultPartition<>(KryoCloneUtils.cloneObject(this));
     if (p.getPartitionedInstance().getConsumer() instanceof SimpleKafkaConsumer) {
       p.getPartitionedInstance().getConsumer().resetPartitionsAndOffset(pIds, initOffsets);
       if (initOffsets != null) {

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/48418737/contrib/src/main/java/com/datatorrent/contrib/kinesis/AbstractKinesisInputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/kinesis/AbstractKinesisInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/kinesis/AbstractKinesisInputOperator.java
index e7186b5..6306b04 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/kinesis/AbstractKinesisInputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/kinesis/AbstractKinesisInputOperator.java
@@ -26,11 +26,9 @@ import com.datatorrent.api.*;
 import com.datatorrent.api.Operator.ActivationListener;
 import com.datatorrent.common.util.Pair;
 import com.datatorrent.lib.io.IdempotentStorageManager;
+import com.datatorrent.lib.util.KryoCloneUtils;
+
 import com.esotericsoftware.kryo.DefaultSerializer;
-import com.esotericsoftware.kryo.Kryo;
-import com.esotericsoftware.kryo.NotNull;
-import com.esotericsoftware.kryo.io.Input;
-import com.esotericsoftware.kryo.io.Output;
 import com.esotericsoftware.kryo.serializers.JavaSerializer;
 import com.google.common.collect.Sets;
 
@@ -40,8 +38,8 @@ import org.slf4j.LoggerFactory;
 
 import javax.validation.Valid;
 import javax.validation.constraints.Min;
+import javax.validation.constraints.NotNull;
 
-import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.lang.reflect.Array;
 import java.util.*;
@@ -374,13 +372,7 @@ public abstract class AbstractKinesisInputOperator <T> implements InputOperator,
   private
   Partition<AbstractKinesisInputOperator> createPartition(Set<String> shardIds, Map<String, String> initShardPos, Collection<IdempotentStorageManager> newManagers)
   {
-    Kryo kryo = new Kryo();
-    ByteArrayOutputStream bos = new ByteArrayOutputStream();
-    Output output = new Output(bos);
-    kryo.writeObject(output, this);
-    output.close();
-    Input lInput = new Input(bos.toByteArray());
-    Partition<AbstractKinesisInputOperator> p = new DefaultPartition<AbstractKinesisInputOperator>(kryo.readObject(lInput, this.getClass()));
+    Partition<AbstractKinesisInputOperator> p = new DefaultPartition<AbstractKinesisInputOperator>(KryoCloneUtils.cloneObject(this));
     newManagers.add(p.getPartitionedInstance().idempotentStorageManager);
     p.getPartitionedInstance().getConsumer().setShardIds(shardIds);
     p.getPartitionedInstance().getConsumer().resetShardPositions(initShardPos);

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/48418737/contrib/src/test/java/com/datatorrent/contrib/kafka/SimpleKakfaConsumerTest.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/kafka/SimpleKakfaConsumerTest.java b/contrib/src/test/java/com/datatorrent/contrib/kafka/SimpleKakfaConsumerTest.java
index d2491e1..1368cc0 100644
--- a/contrib/src/test/java/com/datatorrent/contrib/kafka/SimpleKakfaConsumerTest.java
+++ b/contrib/src/test/java/com/datatorrent/contrib/kafka/SimpleKakfaConsumerTest.java
@@ -18,8 +18,7 @@
  */
 package com.datatorrent.contrib.kafka;
 
-import com.datatorrent.lib.util.TestUtils;
-import com.esotericsoftware.kryo.Kryo;
+import com.datatorrent.lib.util.KryoCloneUtils;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -39,7 +38,7 @@ public class SimpleKakfaConsumerTest
     kc.setTopic("test_topic");
     kc.setClientId("test_clientid");
 
-    SimpleKafkaConsumer kcClone = TestUtils.clone(new Kryo(), kc);
+    SimpleKafkaConsumer kcClone = KryoCloneUtils.cloneObject(kc);
     Assert.assertEquals("Buffer size is " + bufferSize, bufferSize, kcClone.getBufferSize());
     Assert.assertEquals("Cache size is " + cacheSize, cacheSize, kcClone.getCacheSize());
     Assert.assertEquals("Clint id is same", kc.getClientId(), kcClone.getClientId());

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/48418737/contrib/src/test/java/com/datatorrent/contrib/util/FieldValueSerializableGenerator.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/util/FieldValueSerializableGenerator.java b/contrib/src/test/java/com/datatorrent/contrib/util/FieldValueSerializableGenerator.java
index 5340b0e..2975c9c 100644
--- a/contrib/src/test/java/com/datatorrent/contrib/util/FieldValueSerializableGenerator.java
+++ b/contrib/src/test/java/com/datatorrent/contrib/util/FieldValueSerializableGenerator.java
@@ -123,6 +123,7 @@ public class FieldValueSerializableGenerator< T extends FieldInfo> extends Field
       {
         if( _kryo == null )
           _kryo = new Kryo();
+        _kryo.setClassLoader(clazz.getClassLoader());
       }
     }
     return _kryo;

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/48418737/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java
----------------------------------------------------------------------
diff --git a/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java b/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java
index 53bbd2a..bf36064 100644
--- a/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java
+++ b/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java
@@ -18,7 +18,6 @@
  */
 package org.apache.apex.malhar.kafka;
 
-import java.io.ByteArrayOutputStream;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
@@ -38,14 +37,12 @@ import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 
-import com.esotericsoftware.kryo.Kryo;
-import com.esotericsoftware.kryo.io.Input;
-import com.esotericsoftware.kryo.io.Output;
 import com.google.common.base.Joiner;
 
 import com.datatorrent.api.DefaultPartition;
 import com.datatorrent.api.Partitioner;
 import com.datatorrent.api.StatsListener;
+import com.datatorrent.lib.util.KryoCloneUtils;
 
 /**
  * Abstract partitioner used to manage the partitions of kafka input operator.
@@ -161,14 +158,7 @@ public abstract class AbstractKafkaPartitioner implements Partitioner<AbstractKa
 
   protected Partitioner.Partition<AbstractKafkaInputOperator> createPartition(Set<AbstractKafkaPartitioner.PartitionMeta> partitionAssignment)
   {
-    Kryo kryo = new Kryo();
-    ByteArrayOutputStream bos = new ByteArrayOutputStream();
-    Output output = new Output(bos);
-    kryo.writeObject(output, prototypeOperator);
-    output.close();
-    Input lInput = new Input(bos.toByteArray());
-    Partitioner.Partition<AbstractKafkaInputOperator> p = (Partitioner.Partition<AbstractKafkaInputOperator>)
-        new DefaultPartition<>(kryo.readObject(lInput, prototypeOperator.getClass()));
+    Partitioner.Partition<AbstractKafkaInputOperator> p = new DefaultPartition<AbstractKafkaInputOperator>(KryoCloneUtils.cloneObject(prototypeOperator));
     p.getPartitionedInstance().assign(partitionAssignment);
     return p;
   }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/48418737/library/src/main/java/com/datatorrent/lib/io/block/AbstractBlockReader.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/block/AbstractBlockReader.java b/library/src/main/java/com/datatorrent/lib/io/block/AbstractBlockReader.java
index afde623..a6d1bd9 100644
--- a/library/src/main/java/com/datatorrent/lib/io/block/AbstractBlockReader.java
+++ b/library/src/main/java/com/datatorrent/lib/io/block/AbstractBlockReader.java
@@ -18,7 +18,6 @@
  */
 package com.datatorrent.lib.io.block;
 
-import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.Collection;
@@ -36,9 +35,6 @@ import org.slf4j.LoggerFactory;
 import org.apache.commons.lang.mutable.MutableLong;
 import org.apache.hadoop.fs.PositionedReadable;
 
-import com.esotericsoftware.kryo.Kryo;
-import com.esotericsoftware.kryo.io.Input;
-import com.esotericsoftware.kryo.io.Output;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
@@ -52,6 +48,7 @@ import com.datatorrent.api.Stats;
 import com.datatorrent.api.StatsListener;
 import com.datatorrent.common.util.BaseOperator;
 import com.datatorrent.lib.counters.BasicCounters;
+import com.datatorrent.lib.util.KryoCloneUtils;
 
 /**
  * AbstractBlockReader processes a block of data from a stream.<br/>
@@ -295,20 +292,9 @@ public abstract class AbstractBlockReader<R, B extends BlockMetadata, STREAM ext
         partitionIterator.remove();
       }
     } else {
-      //Add more partitions
-      Kryo kryo = new Kryo();
+      KryoCloneUtils<AbstractBlockReader<R, B, STREAM>> cloneUtils = KryoCloneUtils.createCloneUtils(this);
       while (morePartitionsToCreate-- > 0) {
-        ByteArrayOutputStream bos = new ByteArrayOutputStream();
-        Output loutput = new Output(bos);
-        kryo.writeObject(loutput, this);
-        loutput.close();
-        Input lInput = new Input(bos.toByteArray());
-
-        @SuppressWarnings("unchecked")
-        AbstractBlockReader<R, B, STREAM> blockReader = kryo.readObject(lInput, this.getClass());
-
-        DefaultPartition<AbstractBlockReader<R, B, STREAM>> partition = new DefaultPartition<>(
-            blockReader);
+        DefaultPartition<AbstractBlockReader<R, B, STREAM>> partition = new DefaultPartition<>(cloneUtils.getClone());
         newPartitions.add(partition);
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/48418737/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java
index 5067b07..0bcf956 100644
--- a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java
@@ -25,9 +25,6 @@ import java.util.regex.Pattern;
 
 import javax.validation.constraints.NotNull;
 
-import com.esotericsoftware.kryo.Kryo;
-import com.esotericsoftware.kryo.io.Input;
-import com.esotericsoftware.kryo.io.Output;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
@@ -45,6 +42,7 @@ import com.datatorrent.lib.io.IdempotentStorageManager;
 import com.datatorrent.api.*;
 import com.datatorrent.api.Context.CountersAggregator;
 import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.lib.util.KryoCloneUtils;
 
 /**
  * This is the base implementation of a directory input operator, which scans a directory for files.&nbsp;
@@ -829,21 +827,14 @@ public abstract class AbstractFileInputOperator<T> implements InputOperator, Par
      */
     List<DirectoryScanner> scanners = scanner.partition(totalCount, oldscanners);
 
-    Kryo kryo = new Kryo();
     Collection<Partition<AbstractFileInputOperator<T>>> newPartitions = Lists.newArrayListWithExpectedSize(totalCount);
     Collection<IdempotentStorageManager> newManagers = Lists.newArrayListWithExpectedSize(totalCount);
 
+    KryoCloneUtils<AbstractFileInputOperator<T>> cloneUtils = KryoCloneUtils.createCloneUtils(this);
     for (int i=0; i<scanners.size(); i++) {
 
-      // Kryo.copy fails as it attempts to clone transient fields
-      ByteArrayOutputStream bos = new ByteArrayOutputStream();
-      Output loutput = new Output(bos);
-      kryo.writeObject(loutput, this);
-      loutput.close();
-      Input lInput = new Input(bos.toByteArray());
       @SuppressWarnings("unchecked")
-      AbstractFileInputOperator<T> oper = kryo.readObject(lInput, this.getClass());
-      lInput.close();
+      AbstractFileInputOperator<T> oper = cloneUtils.getClone();
 
       DirectoryScanner scn = scanners.get(i);
       oper.setScanner(scn);

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/48418737/library/src/main/java/com/datatorrent/lib/util/KryoCloneUtils.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/util/KryoCloneUtils.java b/library/src/main/java/com/datatorrent/lib/util/KryoCloneUtils.java
new file mode 100644
index 0000000..fbe6ce4
--- /dev/null
+++ b/library/src/main/java/com/datatorrent/lib/util/KryoCloneUtils.java
@@ -0,0 +1,171 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.util;
+
+import java.io.ByteArrayOutputStream;
+import java.lang.reflect.Array;
+
+import org.apache.commons.io.IOUtils;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+
+/**
+ *
+ * A Kryo Clone Util class that clone object by using Kryo serializer and deserializer
+ * The class has static method that are can be used directly to clone one object
+ * Or it can be used as util instance to clone as many objects as you need from the one source object
+ *
+ * @since 3.4.0
+ */
+public class KryoCloneUtils<T>
+{
+
+  /**
+   * Reusable Kryo object as deserializer
+   */
+  private final Kryo kryo;
+
+  /**
+   * Reusable binary data for object that would be deserialized from
+   */
+  private final byte[] bin;
+
+  /**
+   * The class of the object
+   */
+  private final Class<T> clazz;
+
+  @SuppressWarnings("unchecked")
+  private KryoCloneUtils(Kryo kryo, T t)
+  {
+    this.kryo = kryo;
+    ByteArrayOutputStream bos = null;
+    Output output = null;
+    try {
+      bos = new ByteArrayOutputStream();
+      output = new Output(bos);
+      kryo.writeObject(output, t);
+      output.close();
+      bin = bos.toByteArray();
+    } finally {
+      IOUtils.closeQuietly(output);
+      IOUtils.closeQuietly(bos);
+    }
+    clazz = (Class<T>)t.getClass();
+    kryo.setClassLoader(clazz.getClassLoader());
+  }
+
+  /**
+   * Clone from the binary data of the source object
+   * @return T
+   */
+  public T getClone()
+  {
+    try (Input input = new Input(bin)) {
+      return kryo.readObject(input, clazz);
+    }
+  }
+
+  /**
+   * Clone array of objects from source object
+   * @param num size of the return array
+   * @return array of T
+   */
+  @SuppressWarnings("unchecked")
+  public T[] getClones(int num)
+  {
+    T[] ts = (T[])Array.newInstance(clazz, num);
+    try (Input input = new Input(bin)) {
+      for (int i = 0; i < ts.length; i++) {
+        input.rewind();
+        ts[i] = kryo.readObject(input, clazz);
+      }
+    }
+    return ts;
+  }
+
+
+
+  /**
+   * Clone object by serializing and deserializing using Kryo.
+   * Note this is different from using {@link Kryo#copy(Object)}, which will attempt to also clone transient fields.
+   *
+   * @param kryo kryo object used to clone objects
+   * @param src src object that copy from
+   * @return
+   */
+  @SuppressWarnings("unchecked")
+  public static <SRC> SRC cloneObject(Kryo kryo, SRC src)
+  {
+    kryo.setClassLoader(src.getClass().getClassLoader());
+    ByteArrayOutputStream bos = null;
+    Output output;
+    Input input = null;
+    try {
+      bos = new ByteArrayOutputStream();
+      output = new Output(bos);
+      kryo.writeObject(output, src);
+      output.close();
+      input = new Input(bos.toByteArray());
+      return (SRC)kryo.readObject(input, src.getClass());
+    } finally {
+      IOUtils.closeQuietly(input);
+      IOUtils.closeQuietly(bos);
+    }
+  }
+
+
+  /**
+   * Clone object by serializing and deserializing using default Kryo.
+   * Note this is different from using {@link Kryo#copy(Object)}, which will attempt to also clone transient fields.
+   *
+   * @param src src object that copy from
+   * @return
+   */
+  public static <SRC> SRC cloneObject(SRC src)
+  {
+    return cloneObject(new Kryo(), src);
+  }
+
+  /**
+   * Factory function to return CloneUtils object
+   * @param template
+   * @param <SRC>
+   * @return
+   */
+  public static <SRC> KryoCloneUtils<SRC> createCloneUtils(SRC template)
+  {
+    return createCloneUtils(new Kryo(), template);
+  }
+
+  /**
+   * Factory function to return CloneUtils object with customized Kryo
+   * @param kryo
+   * @param template
+   * @param <SRC>
+   * @return
+   */
+  public static <SRC> KryoCloneUtils<SRC> createCloneUtils(Kryo kryo, SRC template)
+  {
+    return new KryoCloneUtils<>(kryo, template);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/48418737/library/src/test/java/com/datatorrent/lib/appdata/datastructs/DimensionalTableTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/appdata/datastructs/DimensionalTableTest.java b/library/src/test/java/com/datatorrent/lib/appdata/datastructs/DimensionalTableTest.java
index bd2c660..db74007 100644
--- a/library/src/test/java/com/datatorrent/lib/appdata/datastructs/DimensionalTableTest.java
+++ b/library/src/test/java/com/datatorrent/lib/appdata/datastructs/DimensionalTableTest.java
@@ -21,7 +21,6 @@ package com.datatorrent.lib.appdata.datastructs;
 import java.util.Map;
 import java.util.Set;
 
-import com.esotericsoftware.kryo.Kryo;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
@@ -29,7 +28,7 @@ import com.google.common.collect.Sets;
 import org.junit.Assert;
 import org.junit.Test;
 
-import com.datatorrent.lib.util.TestUtils;
+import com.datatorrent.lib.util.KryoCloneUtils;
 
 public class DimensionalTableTest
 {
@@ -38,7 +37,7 @@ public class DimensionalTableTest
   {
     DimensionalTable<Integer> table = createTestTable();
 
-    TestUtils.clone(new Kryo(), table);
+    KryoCloneUtils.cloneObject(table);
   }
 
   @Test
@@ -47,7 +46,7 @@ public class DimensionalTableTest
     DimensionalTable<Integer> table = createTestTable();
     int size = table.size();
 
-    table = TestUtils.clone(new Kryo(), table);
+    table = KryoCloneUtils.cloneObject(table);
     Assert.assertEquals(size, table.size());
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/48418737/library/src/test/java/com/datatorrent/lib/appdata/schemas/ResultFormatterTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/appdata/schemas/ResultFormatterTest.java b/library/src/test/java/com/datatorrent/lib/appdata/schemas/ResultFormatterTest.java
index 1f1895b..5f2d924 100644
--- a/library/src/test/java/com/datatorrent/lib/appdata/schemas/ResultFormatterTest.java
+++ b/library/src/test/java/com/datatorrent/lib/appdata/schemas/ResultFormatterTest.java
@@ -18,19 +18,17 @@
  */
 package com.datatorrent.lib.appdata.schemas;
 
-import com.esotericsoftware.kryo.Kryo;
-
 import org.junit.Assert;
 import org.junit.Test;
 
-import com.datatorrent.lib.util.TestUtils;
+import com.datatorrent.lib.util.KryoCloneUtils;
 
 public class ResultFormatterTest
 {
   @Test
   public void serializationTest() throws Exception
   {
-    TestUtils.clone(new Kryo(), new ResultFormatter());
+    KryoCloneUtils.cloneObject(new ResultFormatter());
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/48418737/library/src/test/java/com/datatorrent/lib/appdata/schemas/SchemaRegistryMultipleTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/appdata/schemas/SchemaRegistryMultipleTest.java b/library/src/test/java/com/datatorrent/lib/appdata/schemas/SchemaRegistryMultipleTest.java
index 8d997d8..0513079 100644
--- a/library/src/test/java/com/datatorrent/lib/appdata/schemas/SchemaRegistryMultipleTest.java
+++ b/library/src/test/java/com/datatorrent/lib/appdata/schemas/SchemaRegistryMultipleTest.java
@@ -21,14 +21,13 @@ package com.datatorrent.lib.appdata.schemas;
 import java.util.Collections;
 import java.util.Map;
 
-import com.esotericsoftware.kryo.Kryo;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
 import org.junit.Assert;
 import org.junit.Test;
 
-import com.datatorrent.lib.util.TestUtils;
+import com.datatorrent.lib.util.KryoCloneUtils;
 
 public class SchemaRegistryMultipleTest
 {
@@ -50,7 +49,7 @@ public class SchemaRegistryMultipleTest
   {
     SchemaRegistryMultiple schemaRegistryMultiple = createSchemaRegistry();
 
-    schemaRegistryMultiple = TestUtils.clone(new Kryo(), schemaRegistryMultiple);
+    schemaRegistryMultiple = KryoCloneUtils.cloneObject(schemaRegistryMultiple);
 
     Assert.assertEquals(2, schemaRegistryMultiple.size());
   }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/48418737/library/src/test/java/com/datatorrent/lib/appdata/snapshot/AppDataSnapshotServerMapTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/appdata/snapshot/AppDataSnapshotServerMapTest.java b/library/src/test/java/com/datatorrent/lib/appdata/snapshot/AppDataSnapshotServerMapTest.java
index e02e2c0..96b348c 100644
--- a/library/src/test/java/com/datatorrent/lib/appdata/snapshot/AppDataSnapshotServerMapTest.java
+++ b/library/src/test/java/com/datatorrent/lib/appdata/snapshot/AppDataSnapshotServerMapTest.java
@@ -21,7 +21,6 @@ package com.datatorrent.lib.appdata.snapshot;
 import java.util.List;
 import java.util.Map;
 
-import com.esotericsoftware.kryo.Kryo;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
@@ -32,7 +31,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.datatorrent.lib.testbench.CollectorTestSink;
-import com.datatorrent.lib.util.TestUtils;
+import com.datatorrent.lib.util.KryoCloneUtils;
 
 public class AppDataSnapshotServerMapTest
 {
@@ -103,7 +102,7 @@ public class AppDataSnapshotServerMapTest
     Assert.assertEquals("b", secondRow.getString("word"));
 
     //Test serialization
-    TestUtils.clone(new Kryo(), snapshotServer);
+    KryoCloneUtils.cloneObject(snapshotServer);
   }
 
   private static final Logger LOG = LoggerFactory.getLogger(AppDataSnapshotServerMapTest.class);

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/48418737/library/src/test/java/com/datatorrent/lib/util/KryoCloneUtilsTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/util/KryoCloneUtilsTest.java b/library/src/test/java/com/datatorrent/lib/util/KryoCloneUtilsTest.java
new file mode 100644
index 0000000..5eaea2e
--- /dev/null
+++ b/library/src/test/java/com/datatorrent/lib/util/KryoCloneUtilsTest.java
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.util;
+
+import java.util.Objects;
+
+import org.junit.Test;
+
+import org.apache.commons.lang.math.RandomUtils;
+import org.apache.commons.lang3.RandomStringUtils;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+/**
+ * Unit test for KryoCloneUtils
+ */
+public class KryoCloneUtilsTest
+{
+
+  @Test
+  public void testGetClone() throws Exception
+  {
+    TestEntity from = getTestEntity(5);
+    KryoCloneUtils<TestEntity> cloneUtils = KryoCloneUtils.createCloneUtils(from);
+    TestEntity to = cloneUtils.getClone();
+    assertFalse(from == to);
+    assertEquals(from, to);
+    assertFalse(from.transientProp.equals(to.transientProp));
+  }
+
+  @Test
+  public void testGetClones() throws Exception
+  {
+    TestEntity from = getTestEntity(5);
+    KryoCloneUtils<TestEntity> cloneUtils = KryoCloneUtils.createCloneUtils(from);
+    TestEntity[] to = cloneUtils.getClones(10);
+    for (TestEntity te : to) {
+      assertFalse(te == from);
+      assertEquals(from, te);
+      assertFalse(from.transientProp.equals(te.transientProp));
+    }
+  }
+
+  @Test
+  public void testCloneObject() throws Exception
+  {
+    TestEntity from = getTestEntity(5);
+    TestEntity to = KryoCloneUtils.cloneObject(from);
+    assertFalse(from == to);
+    assertEquals(from, to);
+    assertFalse(from.transientProp.equals(to.transientProp));
+  }
+
+  private TestEntity getTestEntity(int depth) {
+    TestEntity returnVal = null;
+    TestEntity curr = null;
+    while (depth-- > 0) {
+      if (curr == null) {
+        curr = returnVal = new TestEntity();
+      } else {
+        curr.nestedProp = new TestEntity();
+        curr = curr.nestedProp;
+      }
+    }
+    return returnVal;
+  }
+
+
+  static class TestEntity {
+
+    String strProp = RandomStringUtils.random(10);
+
+    int intProp = RandomUtils.nextInt(1000);
+
+    // transient should be skipped
+    transient String transientProp = RandomStringUtils.random(20);
+
+    // deep clone should be supported
+    TestEntity nestedProp = null;
+
+    @Override
+    public boolean equals(Object o)
+    {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      TestEntity that = (TestEntity)o;
+      return Objects.equals(intProp, that.intProp) &&
+        Objects.equals(strProp, that.strProp) &&
+        Objects.equals(nestedProp, that.nestedProp);
+    }
+
+    @Override
+    public int hashCode()
+    {
+      return Objects.hash(strProp, intProp, nestedProp);
+    }
+
+    @Override
+    public String toString()
+    {
+      return "TestEntity{" +
+        "strProp='" + strProp + '\'' +
+        ", intProp=" + intProp +
+        ", transientProp='" + transientProp + '\'' +
+        ", nestedProp=" + nestedProp +
+        '}';
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/48418737/library/src/test/java/com/datatorrent/lib/util/PojoUtilsTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/util/PojoUtilsTest.java b/library/src/test/java/com/datatorrent/lib/util/PojoUtilsTest.java
index 3114800..5b1b3f1 100644
--- a/library/src/test/java/com/datatorrent/lib/util/PojoUtilsTest.java
+++ b/library/src/test/java/com/datatorrent/lib/util/PojoUtilsTest.java
@@ -61,7 +61,6 @@ import com.datatorrent.lib.util.PojoUtils.SetterInt;
 import com.datatorrent.lib.util.PojoUtils.SetterLong;
 import com.datatorrent.lib.util.PojoUtils.SetterShort;
 
-import com.esotericsoftware.kryo.Kryo;
 
 
 public class PojoUtilsTest
@@ -113,7 +112,7 @@ public class PojoUtilsTest
   public void testSerialization() throws Exception
   {
     GetterBoolean<Object> getBoolean = createGetterBoolean(fqcn, "innerObj.boolVal");
-    TestUtils.clone(new Kryo(), getBoolean);
+    KryoCloneUtils.cloneObject(getBoolean);
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/48418737/library/src/test/java/com/datatorrent/lib/util/TestUtils.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/util/TestUtils.java b/library/src/test/java/com/datatorrent/lib/util/TestUtils.java
index 2932089..37d55e7 100644
--- a/library/src/test/java/com/datatorrent/lib/util/TestUtils.java
+++ b/library/src/test/java/com/datatorrent/lib/util/TestUtils.java
@@ -51,26 +51,6 @@ public class TestUtils
     }
   }
 
-  /**
-   * Clone object by serializing and deserializing using Kryo.
-   * Note this is different from using {@link Kryo#copy(Object)}, which will attempt to also clone transient fields.
-   * @param kryo
-   * @param src
-   * @return
-   * @throws IOException
-   */
-  public static <T> T clone(Kryo kryo, T src) throws IOException
-  {
-    ByteArrayOutputStream bos = new ByteArrayOutputStream();
-    Output output = new Output(bos);
-    kryo.writeObject(output, src);
-    output.close();
-    Input input = new Input(bos.toByteArray());
-    @SuppressWarnings("unchecked")
-    Class<T> clazz = (Class<T>)src.getClass();
-    return kryo.readObject(input, clazz);
-  }
-
   @SuppressWarnings({ "unchecked", "rawtypes" })
   public static <S extends Sink, T> S setSink(OutputPort<T> port, S sink)
   {