You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2016/01/29 20:13:06 UTC
[49/50] [abbrv] incubator-apex-malhar git commit: APEXMALHAR-1984
#resolve #comment Create util function to clone operator object by kryo
APEXMALHAR-1984 #resolve #comment Create util function to clone operator object by kryo
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/48418737
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/48418737
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/48418737
Branch: refs/heads/master
Commit: 4841873715a9d84f16ee3165b42dd1f43aab61dc
Parents: 9e77ef7
Author: Siyuan Hua <hs...@apache.org>
Authored: Thu Jan 28 15:30:04 2016 -0800
Committer: Siyuan Hua <hs...@apache.org>
Committed: Thu Jan 28 15:30:04 2016 -0800
----------------------------------------------------------------------
.../AbstractCouchBaseInputOperator.java | 18 +-
.../kafka/AbstractKafkaInputOperator.java | 16 +-
.../kinesis/AbstractKinesisInputOperator.java | 16 +-
.../contrib/kafka/SimpleKakfaConsumerTest.java | 5 +-
.../util/FieldValueSerializableGenerator.java | 1 +
.../malhar/kafka/AbstractKafkaPartitioner.java | 14 +-
.../lib/io/block/AbstractBlockReader.java | 20 +--
.../lib/io/fs/AbstractFileInputOperator.java | 15 +-
.../datatorrent/lib/util/KryoCloneUtils.java | 171 +++++++++++++++++++
.../datastructs/DimensionalTableTest.java | 7 +-
.../appdata/schemas/ResultFormatterTest.java | 6 +-
.../schemas/SchemaRegistryMultipleTest.java | 5 +-
.../snapshot/AppDataSnapshotServerMapTest.java | 5 +-
.../lib/util/KryoCloneUtilsTest.java | 131 ++++++++++++++
.../com/datatorrent/lib/util/PojoUtilsTest.java | 3 +-
.../com/datatorrent/lib/util/TestUtils.java | 20 ---
16 files changed, 335 insertions(+), 118 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/48418737/contrib/src/main/java/com/datatorrent/contrib/couchbase/AbstractCouchBaseInputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/couchbase/AbstractCouchBaseInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/couchbase/AbstractCouchBaseInputOperator.java
index 60938ab..1cd4eb5 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/couchbase/AbstractCouchBaseInputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/couchbase/AbstractCouchBaseInputOperator.java
@@ -26,23 +26,19 @@ import java.util.concurrent.TimeUnit;
import com.couchbase.client.CouchbaseClient;
import com.couchbase.client.vbucket.config.Config;
-import com.esotericsoftware.kryo.Kryo;
-import com.esotericsoftware.kryo.io.Input;
-import com.esotericsoftware.kryo.io.Output;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.commons.io.output.ByteArrayOutputStream;
-
import com.datatorrent.lib.db.AbstractStoreInputOperator;
import com.datatorrent.api.Context;
import com.datatorrent.api.DefaultPartition;
import com.datatorrent.api.Partitioner;
+import com.datatorrent.lib.util.KryoCloneUtils;
import com.datatorrent.netlet.util.DTThrowable;
/**
@@ -149,19 +145,13 @@ public abstract class AbstractCouchBaseInputOperator<T> extends AbstractStoreInp
int numPartitions = conf.getServers().size();
List<String> list = conf.getServers();
Collection<Partition<AbstractCouchBaseInputOperator<T>>> newPartitions = Lists.newArrayListWithExpectedSize(numPartitions);
- Kryo kryo = new Kryo();
+ KryoCloneUtils<AbstractCouchBaseInputOperator<T>> cloneUtils = KryoCloneUtils.createCloneUtils(this);
for (int i = 0; i < numPartitions; i++) {
- ByteArrayOutputStream bos = new ByteArrayOutputStream();
- Output output = new Output(bos);
- kryo.writeObject(output, this);
- output.close();
- Input lInput = new Input(bos.toByteArray());
- @SuppressWarnings("unchecked")
- AbstractCouchBaseInputOperator<T> oper = kryo.readObject(lInput, this.getClass());
+ AbstractCouchBaseInputOperator<T> oper = cloneUtils.getClone();
oper.setServerIndex(i);
oper.setServerURIString(list.get(i));
logger.debug("oper {} urlstring is {}", i, oper.getServerURIString());
- newPartitions.add(new DefaultPartition<AbstractCouchBaseInputOperator<T>>(oper));
+ newPartitions.add(new DefaultPartition<>(oper));
}
return newPartitions;
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/48418737/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java
index c7bac18..4b22e5e 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java
@@ -29,9 +29,8 @@ import com.datatorrent.api.StatsListener;
import com.datatorrent.api.annotation.OperatorAnnotation;
import com.datatorrent.api.annotation.Stateless;
import com.datatorrent.lib.io.IdempotentStorageManager;
-import com.esotericsoftware.kryo.Kryo;
-import com.esotericsoftware.kryo.io.Input;
-import com.esotericsoftware.kryo.io.Output;
+import com.datatorrent.lib.util.KryoCloneUtils;
+
import com.google.common.base.Joiner;
import com.google.common.base.Predicate;
import com.google.common.collect.Iterators;
@@ -54,7 +53,6 @@ import org.slf4j.LoggerFactory;
import javax.validation.Valid;
import javax.validation.constraints.Min;
import javax.validation.constraints.NotNull;
-import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.lang.reflect.Array;
import java.util.ArrayList;
@@ -640,14 +638,8 @@ public abstract class AbstractKafkaInputOperator<K extends KafkaConsumer> implem
// Create a new partition with the partition Ids and initial offset positions
protected Partitioner.Partition<AbstractKafkaInputOperator<K>> createPartition(Set<KafkaPartition> pIds, Map<KafkaPartition, Long> initOffsets, Collection<IdempotentStorageManager> newManagers)
{
- Kryo kryo = new Kryo();
- ByteArrayOutputStream bos = new ByteArrayOutputStream();
- Output output = new Output(bos);
- kryo.writeObject(output, this);
- output.close();
- Input lInput = new Input(bos.toByteArray());
- @SuppressWarnings("unchecked")
- Partitioner.Partition<AbstractKafkaInputOperator<K>> p = new DefaultPartition<AbstractKafkaInputOperator<K>>(kryo.readObject(lInput, this.getClass()));
+
+ Partitioner.Partition<AbstractKafkaInputOperator<K>> p = new DefaultPartition<>(KryoCloneUtils.cloneObject(this));
if (p.getPartitionedInstance().getConsumer() instanceof SimpleKafkaConsumer) {
p.getPartitionedInstance().getConsumer().resetPartitionsAndOffset(pIds, initOffsets);
if (initOffsets != null) {
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/48418737/contrib/src/main/java/com/datatorrent/contrib/kinesis/AbstractKinesisInputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/kinesis/AbstractKinesisInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/kinesis/AbstractKinesisInputOperator.java
index e7186b5..6306b04 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/kinesis/AbstractKinesisInputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/kinesis/AbstractKinesisInputOperator.java
@@ -26,11 +26,9 @@ import com.datatorrent.api.*;
import com.datatorrent.api.Operator.ActivationListener;
import com.datatorrent.common.util.Pair;
import com.datatorrent.lib.io.IdempotentStorageManager;
+import com.datatorrent.lib.util.KryoCloneUtils;
+
import com.esotericsoftware.kryo.DefaultSerializer;
-import com.esotericsoftware.kryo.Kryo;
-import com.esotericsoftware.kryo.NotNull;
-import com.esotericsoftware.kryo.io.Input;
-import com.esotericsoftware.kryo.io.Output;
import com.esotericsoftware.kryo.serializers.JavaSerializer;
import com.google.common.collect.Sets;
@@ -40,8 +38,8 @@ import org.slf4j.LoggerFactory;
import javax.validation.Valid;
import javax.validation.constraints.Min;
+import javax.validation.constraints.NotNull;
-import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.lang.reflect.Array;
import java.util.*;
@@ -374,13 +372,7 @@ public abstract class AbstractKinesisInputOperator <T> implements InputOperator,
private
Partition<AbstractKinesisInputOperator> createPartition(Set<String> shardIds, Map<String, String> initShardPos, Collection<IdempotentStorageManager> newManagers)
{
- Kryo kryo = new Kryo();
- ByteArrayOutputStream bos = new ByteArrayOutputStream();
- Output output = new Output(bos);
- kryo.writeObject(output, this);
- output.close();
- Input lInput = new Input(bos.toByteArray());
- Partition<AbstractKinesisInputOperator> p = new DefaultPartition<AbstractKinesisInputOperator>(kryo.readObject(lInput, this.getClass()));
+ Partition<AbstractKinesisInputOperator> p = new DefaultPartition<AbstractKinesisInputOperator>(KryoCloneUtils.cloneObject(this));
newManagers.add(p.getPartitionedInstance().idempotentStorageManager);
p.getPartitionedInstance().getConsumer().setShardIds(shardIds);
p.getPartitionedInstance().getConsumer().resetShardPositions(initShardPos);
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/48418737/contrib/src/test/java/com/datatorrent/contrib/kafka/SimpleKakfaConsumerTest.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/kafka/SimpleKakfaConsumerTest.java b/contrib/src/test/java/com/datatorrent/contrib/kafka/SimpleKakfaConsumerTest.java
index d2491e1..1368cc0 100644
--- a/contrib/src/test/java/com/datatorrent/contrib/kafka/SimpleKakfaConsumerTest.java
+++ b/contrib/src/test/java/com/datatorrent/contrib/kafka/SimpleKakfaConsumerTest.java
@@ -18,8 +18,7 @@
*/
package com.datatorrent.contrib.kafka;
-import com.datatorrent.lib.util.TestUtils;
-import com.esotericsoftware.kryo.Kryo;
+import com.datatorrent.lib.util.KryoCloneUtils;
import org.junit.Assert;
import org.junit.Test;
@@ -39,7 +38,7 @@ public class SimpleKakfaConsumerTest
kc.setTopic("test_topic");
kc.setClientId("test_clientid");
- SimpleKafkaConsumer kcClone = TestUtils.clone(new Kryo(), kc);
+ SimpleKafkaConsumer kcClone = KryoCloneUtils.cloneObject(kc);
Assert.assertEquals("Buffer size is " + bufferSize, bufferSize, kcClone.getBufferSize());
Assert.assertEquals("Cache size is " + cacheSize, cacheSize, kcClone.getCacheSize());
Assert.assertEquals("Clint id is same", kc.getClientId(), kcClone.getClientId());
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/48418737/contrib/src/test/java/com/datatorrent/contrib/util/FieldValueSerializableGenerator.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/util/FieldValueSerializableGenerator.java b/contrib/src/test/java/com/datatorrent/contrib/util/FieldValueSerializableGenerator.java
index 5340b0e..2975c9c 100644
--- a/contrib/src/test/java/com/datatorrent/contrib/util/FieldValueSerializableGenerator.java
+++ b/contrib/src/test/java/com/datatorrent/contrib/util/FieldValueSerializableGenerator.java
@@ -123,6 +123,7 @@ public class FieldValueSerializableGenerator< T extends FieldInfo> extends Field
{
if( _kryo == null )
_kryo = new Kryo();
+ _kryo.setClassLoader(clazz.getClassLoader());
}
}
return _kryo;
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/48418737/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java
----------------------------------------------------------------------
diff --git a/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java b/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java
index 53bbd2a..bf36064 100644
--- a/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java
+++ b/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java
@@ -18,7 +18,6 @@
*/
package org.apache.apex.malhar.kafka;
-import java.io.ByteArrayOutputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
@@ -38,14 +37,12 @@ import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
-import com.esotericsoftware.kryo.Kryo;
-import com.esotericsoftware.kryo.io.Input;
-import com.esotericsoftware.kryo.io.Output;
import com.google.common.base.Joiner;
import com.datatorrent.api.DefaultPartition;
import com.datatorrent.api.Partitioner;
import com.datatorrent.api.StatsListener;
+import com.datatorrent.lib.util.KryoCloneUtils;
/**
* Abstract partitioner used to manage the partitions of kafka input operator.
@@ -161,14 +158,7 @@ public abstract class AbstractKafkaPartitioner implements Partitioner<AbstractKa
protected Partitioner.Partition<AbstractKafkaInputOperator> createPartition(Set<AbstractKafkaPartitioner.PartitionMeta> partitionAssignment)
{
- Kryo kryo = new Kryo();
- ByteArrayOutputStream bos = new ByteArrayOutputStream();
- Output output = new Output(bos);
- kryo.writeObject(output, prototypeOperator);
- output.close();
- Input lInput = new Input(bos.toByteArray());
- Partitioner.Partition<AbstractKafkaInputOperator> p = (Partitioner.Partition<AbstractKafkaInputOperator>)
- new DefaultPartition<>(kryo.readObject(lInput, prototypeOperator.getClass()));
+ Partitioner.Partition<AbstractKafkaInputOperator> p = new DefaultPartition<AbstractKafkaInputOperator>(KryoCloneUtils.cloneObject(prototypeOperator));
p.getPartitionedInstance().assign(partitionAssignment);
return p;
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/48418737/library/src/main/java/com/datatorrent/lib/io/block/AbstractBlockReader.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/block/AbstractBlockReader.java b/library/src/main/java/com/datatorrent/lib/io/block/AbstractBlockReader.java
index afde623..a6d1bd9 100644
--- a/library/src/main/java/com/datatorrent/lib/io/block/AbstractBlockReader.java
+++ b/library/src/main/java/com/datatorrent/lib/io/block/AbstractBlockReader.java
@@ -18,7 +18,6 @@
*/
package com.datatorrent.lib.io.block;
-import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Collection;
@@ -36,9 +35,6 @@ import org.slf4j.LoggerFactory;
import org.apache.commons.lang.mutable.MutableLong;
import org.apache.hadoop.fs.PositionedReadable;
-import com.esotericsoftware.kryo.Kryo;
-import com.esotericsoftware.kryo.io.Input;
-import com.esotericsoftware.kryo.io.Output;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@@ -52,6 +48,7 @@ import com.datatorrent.api.Stats;
import com.datatorrent.api.StatsListener;
import com.datatorrent.common.util.BaseOperator;
import com.datatorrent.lib.counters.BasicCounters;
+import com.datatorrent.lib.util.KryoCloneUtils;
/**
* AbstractBlockReader processes a block of data from a stream.<br/>
@@ -295,20 +292,9 @@ public abstract class AbstractBlockReader<R, B extends BlockMetadata, STREAM ext
partitionIterator.remove();
}
} else {
- //Add more partitions
- Kryo kryo = new Kryo();
+ KryoCloneUtils<AbstractBlockReader<R, B, STREAM>> cloneUtils = KryoCloneUtils.createCloneUtils(this);
while (morePartitionsToCreate-- > 0) {
- ByteArrayOutputStream bos = new ByteArrayOutputStream();
- Output loutput = new Output(bos);
- kryo.writeObject(loutput, this);
- loutput.close();
- Input lInput = new Input(bos.toByteArray());
-
- @SuppressWarnings("unchecked")
- AbstractBlockReader<R, B, STREAM> blockReader = kryo.readObject(lInput, this.getClass());
-
- DefaultPartition<AbstractBlockReader<R, B, STREAM>> partition = new DefaultPartition<>(
- blockReader);
+ DefaultPartition<AbstractBlockReader<R, B, STREAM>> partition = new DefaultPartition<>(cloneUtils.getClone());
newPartitions.add(partition);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/48418737/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java
index 5067b07..0bcf956 100644
--- a/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/io/fs/AbstractFileInputOperator.java
@@ -25,9 +25,6 @@ import java.util.regex.Pattern;
import javax.validation.constraints.NotNull;
-import com.esotericsoftware.kryo.Kryo;
-import com.esotericsoftware.kryo.io.Input;
-import com.esotericsoftware.kryo.io.Output;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
@@ -45,6 +42,7 @@ import com.datatorrent.lib.io.IdempotentStorageManager;
import com.datatorrent.api.*;
import com.datatorrent.api.Context.CountersAggregator;
import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.lib.util.KryoCloneUtils;
/**
* This is the base implementation of a directory input operator, which scans a directory for files.
@@ -829,21 +827,14 @@ public abstract class AbstractFileInputOperator<T> implements InputOperator, Par
*/
List<DirectoryScanner> scanners = scanner.partition(totalCount, oldscanners);
- Kryo kryo = new Kryo();
Collection<Partition<AbstractFileInputOperator<T>>> newPartitions = Lists.newArrayListWithExpectedSize(totalCount);
Collection<IdempotentStorageManager> newManagers = Lists.newArrayListWithExpectedSize(totalCount);
+ KryoCloneUtils<AbstractFileInputOperator<T>> cloneUtils = KryoCloneUtils.createCloneUtils(this);
for (int i=0; i<scanners.size(); i++) {
- // Kryo.copy fails as it attempts to clone transient fields
- ByteArrayOutputStream bos = new ByteArrayOutputStream();
- Output loutput = new Output(bos);
- kryo.writeObject(loutput, this);
- loutput.close();
- Input lInput = new Input(bos.toByteArray());
@SuppressWarnings("unchecked")
- AbstractFileInputOperator<T> oper = kryo.readObject(lInput, this.getClass());
- lInput.close();
+ AbstractFileInputOperator<T> oper = cloneUtils.getClone();
DirectoryScanner scn = scanners.get(i);
oper.setScanner(scn);
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/48418737/library/src/main/java/com/datatorrent/lib/util/KryoCloneUtils.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/util/KryoCloneUtils.java b/library/src/main/java/com/datatorrent/lib/util/KryoCloneUtils.java
new file mode 100644
index 0000000..fbe6ce4
--- /dev/null
+++ b/library/src/main/java/com/datatorrent/lib/util/KryoCloneUtils.java
@@ -0,0 +1,171 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.util;
+
+import java.io.ByteArrayOutputStream;
+import java.lang.reflect.Array;
+
+import org.apache.commons.io.IOUtils;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+
+/**
+ *
+ * A Kryo clone utility that clones an object by serializing it with Kryo and deserializing the result.
+ * The static methods can be used directly to clone a single object;
+ * alternatively, an instance can be created from one source object to produce as many clones of it as needed.
+ *
+ * @since 3.4.0
+ */
+public class KryoCloneUtils<T>
+{
+
+ /**
+ * Reusable Kryo instance, used to serialize the source object once and to deserialize every clone
+ */
+ private final Kryo kryo;
+
+ /**
+ * Serialized form of the source object; every clone is deserialized from these bytes
+ */
+ private final byte[] bin;
+
+ /**
+ * Runtime class of the source object, used as the target type when deserializing
+ */
+ private final Class<T> clazz;
+
+ @SuppressWarnings("unchecked") // covers the (Class<T>) cast of t.getClass() below
+ private KryoCloneUtils(Kryo kryo, T t)
+ {
+ this.kryo = kryo;
+ ByteArrayOutputStream bos = null;
+ Output output = null;
+ try {
+ bos = new ByteArrayOutputStream();
+ output = new Output(bos);
+ kryo.writeObject(output, t); // the source object is serialized exactly once, here
+ output.close(); // closed before toByteArray() - presumably so Output's buffered bytes reach bos
+ bin = bos.toByteArray();
+ } finally {
+ IOUtils.closeQuietly(output);
+ IOUtils.closeQuietly(bos);
+ }
+ clazz = (Class<T>)t.getClass();
+ kryo.setClassLoader(clazz.getClassLoader()); // NOTE(review): getClassLoader() may be null for bootstrap classes - confirm Kryo accepts that
+ }
+
+ /**
+ * Creates one clone by deserializing the bytes captured from the source object at construction time
+ * @return a new object of the source object's runtime class
+ */
+ public T getClone()
+ {
+ try (Input input = new Input(bin)) {
+ return kryo.readObject(input, clazz);
+ }
+ }
+
+ /**
+ * Creates an array of clones, each deserialized from the bytes captured from the source object
+ * @param num size of the returned array
+ * @return array of num clones whose component type is the source object's runtime class
+ */
+ @SuppressWarnings("unchecked")
+ public T[] getClones(int num)
+ {
+ T[] ts = (T[])Array.newInstance(clazz, num);
+ try (Input input = new Input(bin)) {
+ for (int i = 0; i < ts.length; i++) {
+ input.rewind(); // start again from the beginning of bin for every clone
+ ts[i] = kryo.readObject(input, clazz);
+ }
+ }
+ return ts;
+ }
+
+
+
+ /**
+ * Clones an object by serializing it with the given Kryo and deserializing the result.
+ * Note this is different from using {@link Kryo#copy(Object)}, which will attempt to also clone transient fields.
+ *
+ * @param kryo Kryo instance used for the clone; its class loader is set to the loader of src's class
+ * @param src source object to copy from
+ * @return a new object deserialized as src's runtime class
+ */
+ @SuppressWarnings("unchecked")
+ public static <SRC> SRC cloneObject(Kryo kryo, SRC src)
+ {
+ kryo.setClassLoader(src.getClass().getClassLoader()); // NOTE(review): loader may be null for bootstrap classes - confirm Kryo accepts that
+ ByteArrayOutputStream bos = null;
+ Output output;
+ Input input = null;
+ try {
+ bos = new ByteArrayOutputStream();
+ output = new Output(bos);
+ kryo.writeObject(output, src);
+ output.close();
+ input = new Input(bos.toByteArray());
+ return (SRC)kryo.readObject(input, src.getClass());
+ } finally {
+ IOUtils.closeQuietly(input);
+ IOUtils.closeQuietly(bos);
+ }
+ }
+
+
+ /**
+ * Clones an object by serializing and deserializing it with a newly created default Kryo.
+ * Note this is different from using {@link Kryo#copy(Object)}, which will attempt to also clone transient fields.
+ *
+ * @param src source object to copy from
+ * @return a new object deserialized as src's runtime class
+ */
+ public static <SRC> SRC cloneObject(SRC src)
+ {
+ return cloneObject(new Kryo(), src);
+ }
+
+ /**
+ * Factory function that returns a KryoCloneUtils backed by a newly created default Kryo
+ * @param template source object to clone from; it is serialized once, when the utility is created
+ * @param <SRC> type of the template object
+ * @return a KryoCloneUtils instance that produces clones of template
+ */
+ public static <SRC> KryoCloneUtils<SRC> createCloneUtils(SRC template)
+ {
+ return createCloneUtils(new Kryo(), template);
+ }
+
+ /**
+ * Factory function that returns a KryoCloneUtils backed by a customized Kryo
+ * @param kryo Kryo instance used to serialize the template and to deserialize the clones
+ * @param template source object to clone from; it is serialized once, when the utility is created
+ * @param <SRC> type of the template object
+ * @return a KryoCloneUtils instance that produces clones of template
+ */
+ public static <SRC> KryoCloneUtils<SRC> createCloneUtils(Kryo kryo, SRC template)
+ {
+ return new KryoCloneUtils<>(kryo, template);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/48418737/library/src/test/java/com/datatorrent/lib/appdata/datastructs/DimensionalTableTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/appdata/datastructs/DimensionalTableTest.java b/library/src/test/java/com/datatorrent/lib/appdata/datastructs/DimensionalTableTest.java
index bd2c660..db74007 100644
--- a/library/src/test/java/com/datatorrent/lib/appdata/datastructs/DimensionalTableTest.java
+++ b/library/src/test/java/com/datatorrent/lib/appdata/datastructs/DimensionalTableTest.java
@@ -21,7 +21,6 @@ package com.datatorrent.lib.appdata.datastructs;
import java.util.Map;
import java.util.Set;
-import com.esotericsoftware.kryo.Kryo;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
@@ -29,7 +28,7 @@ import com.google.common.collect.Sets;
import org.junit.Assert;
import org.junit.Test;
-import com.datatorrent.lib.util.TestUtils;
+import com.datatorrent.lib.util.KryoCloneUtils;
public class DimensionalTableTest
{
@@ -38,7 +37,7 @@ public class DimensionalTableTest
{
DimensionalTable<Integer> table = createTestTable();
- TestUtils.clone(new Kryo(), table);
+ KryoCloneUtils.cloneObject(table);
}
@Test
@@ -47,7 +46,7 @@ public class DimensionalTableTest
DimensionalTable<Integer> table = createTestTable();
int size = table.size();
- table = TestUtils.clone(new Kryo(), table);
+ table = KryoCloneUtils.cloneObject(table);
Assert.assertEquals(size, table.size());
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/48418737/library/src/test/java/com/datatorrent/lib/appdata/schemas/ResultFormatterTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/appdata/schemas/ResultFormatterTest.java b/library/src/test/java/com/datatorrent/lib/appdata/schemas/ResultFormatterTest.java
index 1f1895b..5f2d924 100644
--- a/library/src/test/java/com/datatorrent/lib/appdata/schemas/ResultFormatterTest.java
+++ b/library/src/test/java/com/datatorrent/lib/appdata/schemas/ResultFormatterTest.java
@@ -18,19 +18,17 @@
*/
package com.datatorrent.lib.appdata.schemas;
-import com.esotericsoftware.kryo.Kryo;
-
import org.junit.Assert;
import org.junit.Test;
-import com.datatorrent.lib.util.TestUtils;
+import com.datatorrent.lib.util.KryoCloneUtils;
public class ResultFormatterTest
{
@Test
public void serializationTest() throws Exception
{
- TestUtils.clone(new Kryo(), new ResultFormatter());
+ KryoCloneUtils.cloneObject(new ResultFormatter());
}
@Test
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/48418737/library/src/test/java/com/datatorrent/lib/appdata/schemas/SchemaRegistryMultipleTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/appdata/schemas/SchemaRegistryMultipleTest.java b/library/src/test/java/com/datatorrent/lib/appdata/schemas/SchemaRegistryMultipleTest.java
index 8d997d8..0513079 100644
--- a/library/src/test/java/com/datatorrent/lib/appdata/schemas/SchemaRegistryMultipleTest.java
+++ b/library/src/test/java/com/datatorrent/lib/appdata/schemas/SchemaRegistryMultipleTest.java
@@ -21,14 +21,13 @@ package com.datatorrent.lib.appdata.schemas;
import java.util.Collections;
import java.util.Map;
-import com.esotericsoftware.kryo.Kryo;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.junit.Assert;
import org.junit.Test;
-import com.datatorrent.lib.util.TestUtils;
+import com.datatorrent.lib.util.KryoCloneUtils;
public class SchemaRegistryMultipleTest
{
@@ -50,7 +49,7 @@ public class SchemaRegistryMultipleTest
{
SchemaRegistryMultiple schemaRegistryMultiple = createSchemaRegistry();
- schemaRegistryMultiple = TestUtils.clone(new Kryo(), schemaRegistryMultiple);
+ schemaRegistryMultiple = KryoCloneUtils.cloneObject(schemaRegistryMultiple);
Assert.assertEquals(2, schemaRegistryMultiple.size());
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/48418737/library/src/test/java/com/datatorrent/lib/appdata/snapshot/AppDataSnapshotServerMapTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/appdata/snapshot/AppDataSnapshotServerMapTest.java b/library/src/test/java/com/datatorrent/lib/appdata/snapshot/AppDataSnapshotServerMapTest.java
index e02e2c0..96b348c 100644
--- a/library/src/test/java/com/datatorrent/lib/appdata/snapshot/AppDataSnapshotServerMapTest.java
+++ b/library/src/test/java/com/datatorrent/lib/appdata/snapshot/AppDataSnapshotServerMapTest.java
@@ -21,7 +21,6 @@ package com.datatorrent.lib.appdata.snapshot;
import java.util.List;
import java.util.Map;
-import com.esotericsoftware.kryo.Kryo;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@@ -32,7 +31,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.datatorrent.lib.testbench.CollectorTestSink;
-import com.datatorrent.lib.util.TestUtils;
+import com.datatorrent.lib.util.KryoCloneUtils;
public class AppDataSnapshotServerMapTest
{
@@ -103,7 +102,7 @@ public class AppDataSnapshotServerMapTest
Assert.assertEquals("b", secondRow.getString("word"));
//Test serialization
- TestUtils.clone(new Kryo(), snapshotServer);
+ KryoCloneUtils.cloneObject(snapshotServer);
}
private static final Logger LOG = LoggerFactory.getLogger(AppDataSnapshotServerMapTest.class);
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/48418737/library/src/test/java/com/datatorrent/lib/util/KryoCloneUtilsTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/util/KryoCloneUtilsTest.java b/library/src/test/java/com/datatorrent/lib/util/KryoCloneUtilsTest.java
new file mode 100644
index 0000000..5eaea2e
--- /dev/null
+++ b/library/src/test/java/com/datatorrent/lib/util/KryoCloneUtilsTest.java
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.util;
+
+import java.util.Objects;
+
+import org.junit.Test;
+
+import org.apache.commons.lang.math.RandomUtils;
+import org.apache.commons.lang3.RandomStringUtils;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+/**
+ * Unit tests for KryoCloneUtils: a clone must be a distinct but equal object whose transient field is not copied
+ */
+public class KryoCloneUtilsTest
+{
+
+ @Test
+ public void testGetClone() throws Exception
+ {
+ TestEntity from = getTestEntity(5);
+ KryoCloneUtils<TestEntity> cloneUtils = KryoCloneUtils.createCloneUtils(from);
+ TestEntity to = cloneUtils.getClone();
+ assertFalse(from == to); // the clone must be a different instance
+ assertEquals(from, to); // TestEntity.equals compares strProp, intProp and nestedProp only
+ assertFalse(from.transientProp.equals(to.transientProp)); // the transient value must not carry over
+ }
+
+ @Test
+ public void testGetClones() throws Exception
+ {
+ TestEntity from = getTestEntity(5);
+ KryoCloneUtils<TestEntity> cloneUtils = KryoCloneUtils.createCloneUtils(from);
+ TestEntity[] to = cloneUtils.getClones(10);
+ for (TestEntity te : to) { // every element gets the same checks as a single clone
+ assertFalse(te == from);
+ assertEquals(from, te);
+ assertFalse(from.transientProp.equals(te.transientProp));
+ }
+ }
+
+ @Test
+ public void testCloneObject() throws Exception
+ {
+ TestEntity from = getTestEntity(5);
+ TestEntity to = KryoCloneUtils.cloneObject(from); // static one-shot variant
+ assertFalse(from == to);
+ assertEquals(from, to);
+ assertFalse(from.transientProp.equals(to.transientProp));
+ }
+
+ private TestEntity getTestEntity(int depth) { // builds a chain of depth entities linked via nestedProp; null if depth <= 0
+ TestEntity returnVal = null;
+ TestEntity curr = null;
+ while (depth-- > 0) {
+ if (curr == null) {
+ curr = returnVal = new TestEntity();
+ } else {
+ curr.nestedProp = new TestEntity();
+ curr = curr.nestedProp;
+ }
+ }
+ return returnVal;
+ }
+
+
+ static class TestEntity {
+
+ String strProp = RandomStringUtils.random(10);
+
+ int intProp = RandomUtils.nextInt(1000);
+
+ // transient: expected to be skipped when cloning via serialization
+ transient String transientProp = RandomStringUtils.random(20);
+
+ // nested reference: cloning is expected to deep-copy the whole chain
+ TestEntity nestedProp = null;
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ TestEntity that = (TestEntity)o;
+ return Objects.equals(intProp, that.intProp) && // transientProp is deliberately left out of equality
+ Objects.equals(strProp, that.strProp) &&
+ Objects.equals(nestedProp, that.nestedProp);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(strProp, intProp, nestedProp); // same fields as equals()
+ }
+
+ @Override
+ public String toString()
+ {
+ return "TestEntity{" +
+ "strProp='" + strProp + '\'' +
+ ", intProp=" + intProp +
+ ", transientProp='" + transientProp + '\'' +
+ ", nestedProp=" + nestedProp +
+ '}';
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/48418737/library/src/test/java/com/datatorrent/lib/util/PojoUtilsTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/util/PojoUtilsTest.java b/library/src/test/java/com/datatorrent/lib/util/PojoUtilsTest.java
index 3114800..5b1b3f1 100644
--- a/library/src/test/java/com/datatorrent/lib/util/PojoUtilsTest.java
+++ b/library/src/test/java/com/datatorrent/lib/util/PojoUtilsTest.java
@@ -61,7 +61,6 @@ import com.datatorrent.lib.util.PojoUtils.SetterInt;
import com.datatorrent.lib.util.PojoUtils.SetterLong;
import com.datatorrent.lib.util.PojoUtils.SetterShort;
-import com.esotericsoftware.kryo.Kryo;
public class PojoUtilsTest
@@ -113,7 +112,7 @@ public class PojoUtilsTest
public void testSerialization() throws Exception
{
GetterBoolean<Object> getBoolean = createGetterBoolean(fqcn, "innerObj.boolVal");
- TestUtils.clone(new Kryo(), getBoolean);
+ KryoCloneUtils.cloneObject(getBoolean);
}
@Test
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/48418737/library/src/test/java/com/datatorrent/lib/util/TestUtils.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/util/TestUtils.java b/library/src/test/java/com/datatorrent/lib/util/TestUtils.java
index 2932089..37d55e7 100644
--- a/library/src/test/java/com/datatorrent/lib/util/TestUtils.java
+++ b/library/src/test/java/com/datatorrent/lib/util/TestUtils.java
@@ -51,26 +51,6 @@ public class TestUtils
}
}
- /**
- * Clone object by serializing and deserializing using Kryo.
- * Note this is different from using {@link Kryo#copy(Object)}, which will attempt to also clone transient fields.
- * @param kryo
- * @param src
- * @return
- * @throws IOException
- */
- public static <T> T clone(Kryo kryo, T src) throws IOException
- {
- ByteArrayOutputStream bos = new ByteArrayOutputStream();
- Output output = new Output(bos);
- kryo.writeObject(output, src);
- output.close();
- Input input = new Input(bos.toByteArray());
- @SuppressWarnings("unchecked")
- Class<T> clazz = (Class<T>)src.getClass();
- return kryo.readObject(input, clazz);
- }
-
@SuppressWarnings({ "unchecked", "rawtypes" })
public static <S extends Sink, T> S setSink(OutputPort<T> port, S sink)
{