You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by da...@apache.org on 2017/01/11 21:31:41 UTC
[1/2] apex-malhar git commit: APEXMALHAR-2358 #resolve #comment
Optimise GenericSerde to use specific serde to improve the performance
Repository: apex-malhar
Updated Branches:
refs/heads/master 0852c6594 -> ca6995ca4
APEXMALHAR-2358 #resolve #comment Optimise GenericSerde to use specific serde to improve the performance
Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/b2b135df
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/b2b135df
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/b2b135df
Branch: refs/heads/master
Commit: b2b135df218422d5966a5db7fb0add749564bb1f
Parents: 91767c5
Author: brightchen <br...@datatorrent.com>
Authored: Mon Nov 28 11:17:14 2016 -0800
Committer: brightchen <br...@datatorrent.com>
Committed: Tue Jan 10 17:09:10 2017 -0800
----------------------------------------------------------------------
.../util/serde/GenericSerdePerformanceTest.java | 118 +++++++++++++++++++
.../malhar/lib/utils/serde/GenericSerde.java | 62 ++++++++--
.../lib/utils/serde/ImmutablePairSerde.java | 60 ++++++++++
.../malhar/lib/utils/serde/TimeWindowSerde.java | 42 +++++++
.../lib/utils/serde/GenericSerdeTest.java | 17 +++
5 files changed, 288 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/b2b135df/benchmark/src/test/java/com/datatorrent/benchmark/util/serde/GenericSerdePerformanceTest.java
----------------------------------------------------------------------
diff --git a/benchmark/src/test/java/com/datatorrent/benchmark/util/serde/GenericSerdePerformanceTest.java b/benchmark/src/test/java/com/datatorrent/benchmark/util/serde/GenericSerdePerformanceTest.java
new file mode 100644
index 0000000..98ecf67
--- /dev/null
+++ b/benchmark/src/test/java/com/datatorrent/benchmark/util/serde/GenericSerdePerformanceTest.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.benchmark.util.serde;
+
+import java.util.Random;
+
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.lib.utils.serde.GenericSerde;
+import org.apache.apex.malhar.lib.utils.serde.Serde;
+import org.apache.apex.malhar.lib.utils.serde.SerializationBuffer;
+import org.apache.apex.malhar.lib.utils.serde.StringSerde;
+import org.apache.apex.malhar.lib.window.Window;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+
+import com.esotericsoftware.kryo.Kryo;
+
+public class GenericSerdePerformanceTest
+{
+ private static final transient Logger logger = LoggerFactory.getLogger(GenericSerdePerformanceTest.class);
+ private SerializationBuffer buffer = SerializationBuffer.READ_BUFFER;
+ private Random random = new Random();
+ private int serdeDataSize = 1000000;
+
+
+ @Test
+ public void testCompareSerdeForString()
+ {
+ long beginTime = System.currentTimeMillis();
+ testSerdeForString(new GenericSerde<String>(String.class));
+ long genericSerdeCost = System.currentTimeMillis() - beginTime;
+ logger.info("Generic Serde cost for String: {}", genericSerdeCost);
+
+ beginTime = System.currentTimeMillis();
+ testSerdeForString(new StringSerde());
+ long stringSerdeCost = System.currentTimeMillis() - beginTime;
+ logger.info("String Serde cost for String: {}", stringSerdeCost);
+
+ beginTime = System.currentTimeMillis();
+ Kryo kryo = new Kryo();
+ for (int i = 0; i < serdeDataSize; ++i) {
+ kryo.writeObject(buffer, "" + random.nextInt(1000));
+ buffer.toSlice();
+ }
+ buffer.release();
+ long kryoSerdeCost = System.currentTimeMillis() - beginTime;
+ logger.info("Kryo Serde cost for String: {}", kryoSerdeCost);
+ }
+
+ protected void testSerdeForString(Serde<String> serde)
+ {
+ for (int i = 0; i < serdeDataSize; ++i) {
+ serde.serialize("" + random.nextInt(1000), buffer);
+ buffer.toSlice();
+ }
+ buffer.release();
+ }
+
+
+ @Test
+ public void testCompareSerdeForRealCase()
+ {
+ long beginTime = System.currentTimeMillis();
+ GenericSerde<ImmutablePair> serde = new GenericSerde<ImmutablePair>();
+ for (int i = 0; i < serdeDataSize; ++i) {
+ serde.serialize(generatePair(beginTime), buffer);
+ buffer.toSlice();
+ }
+ buffer.release();
+ long genericSerdeCost = System.currentTimeMillis() - beginTime;
+ logger.info("Generic Serde cost for ImmutablePair: {}", genericSerdeCost);
+
+
+ beginTime = System.currentTimeMillis();
+ Kryo kryo = new Kryo();
+ for (int i = 0; i < serdeDataSize; ++i) {
+ kryo.writeObject(buffer, generatePair(beginTime));
+ buffer.toSlice();
+ }
+ buffer.release();
+ long kryoSerdeCost = System.currentTimeMillis() - beginTime;
+ logger.info("Kryo Serde cost for ImmutablePair without class info: {}", kryoSerdeCost);
+
+
+ beginTime = System.currentTimeMillis();
+ Kryo kryo1 = new Kryo();
+ for (int i = 0; i < serdeDataSize; ++i) {
+ kryo1.writeClassAndObject(buffer, generatePair(beginTime));
+ buffer.toSlice();
+ }
+ buffer.release();
+ long kryoSerdeCost2 = System.currentTimeMillis() - beginTime;
+ logger.info("Kryo Serde cost for ImmutablePair with class info: {}", kryoSerdeCost2);
+ }
+
+ protected ImmutablePair generatePair(long now)
+ {
+ return new ImmutablePair(new Window.TimeWindow(now + random.nextInt(100), random.nextInt(100)), "" + random.nextInt(1000));
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/b2b135df/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/GenericSerde.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/GenericSerde.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/GenericSerde.java
index 8501614..4b28a00 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/GenericSerde.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/GenericSerde.java
@@ -18,11 +18,16 @@
*/
package org.apache.apex.malhar.lib.utils.serde;
+import java.util.Map;
+
+import org.apache.apex.malhar.lib.window.Window;
+import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.hadoop.classification.InterfaceStability;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
+import com.google.common.collect.Maps;
/**
* Generic serde using Kryo serialization. Note that while this is convenient, it may not be desirable because
@@ -36,31 +41,62 @@ import com.esotericsoftware.kryo.io.Output;
@InterfaceStability.Evolving
public class GenericSerde<T> implements Serde<T>
{
- private transient ThreadLocal<Kryo> kryos = new ThreadLocal<Kryo>()
- {
- @Override
- public Kryo get()
- {
- return new Kryo();
- }
- };
+ /**
+ * The default GenericSerde instance, which uses the default class-to-serde map.
+ */
+ public static final GenericSerde DEFAULT = new GenericSerde();
+ private transient Kryo kryo = new Kryo();
private final Class<? extends T> clazz;
+ @SuppressWarnings("rawtypes")
+ private Map<Class, Serde> typeToSerde = Maps.newHashMap();
+
+ public <C> void registerSerde(Class<C> type, Serde<C> serde)
+ {
+ typeToSerde.put(type, serde);
+ }
+
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ public void registerDefaultSerdes()
+ {
+ registerSerde(String.class, new StringSerde());
+ registerSerde(Long.class, new LongSerde());
+ registerSerde(Integer.class, new IntSerde());
+ registerSerde(ImmutablePair.class, new ImmutablePairSerde());
+ registerSerde(Window.TimeWindow.class, new TimeWindowSerde());
+ }
+
public GenericSerde()
{
- this.clazz = null;
+ this(null);
}
public GenericSerde(Class<? extends T> clazz)
{
this.clazz = clazz;
+ registerDefaultSerdes();
+ }
+
+ public Serde getDefaultSerde(Class type)
+ {
+ return typeToSerde.get(type);
}
@Override
public void serialize(T object, Output output)
{
- Kryo kryo = kryos.get();
+ Class type = object.getClass();
+ Serde serde = null;
+ if (clazz == type) {
+ serde = getDefaultSerde(type);
+ }
+ if (serde != null) {
+ serde.serialize(object, output);
+ return;
+ }
+
+ //delegate to kryo
if (clazz == null) {
kryo.writeClassAndObject(output, object);
} else {
@@ -71,8 +107,12 @@ public class GenericSerde<T> implements Serde<T>
@Override
public T deserialize(Input input)
{
+ Serde serde = clazz == null ? null : getDefaultSerde(clazz);
+ if (serde != null) {
+ return (T)serde.deserialize(input);
+ }
+
T object;
- Kryo kryo = kryos.get();
if (clazz == null) {
object = (T)kryo.readClassAndObject(input);
} else {
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/b2b135df/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/ImmutablePairSerde.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/ImmutablePairSerde.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/ImmutablePairSerde.java
new file mode 100644
index 0000000..98ced16
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/ImmutablePairSerde.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.utils.serde;
+
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+
+/**
+ * An implementation of {@link Serde} which serializes and deserializes {@link ImmutablePair}s.
+ *
+ */
+@InterfaceStability.Evolving
+public class ImmutablePairSerde<L, R> implements Serde<ImmutablePair<L, R>>
+{
+ private Serde<L> leftSerde;
+ private Serde<R> rightSerde;
+
+ public ImmutablePairSerde()
+ {
+ this(GenericSerde.DEFAULT, GenericSerde.DEFAULT);
+ }
+
+ public ImmutablePairSerde(Serde<L> leftSerde, Serde<R> rightSerde)
+ {
+ this.leftSerde = leftSerde;
+ this.rightSerde = rightSerde;
+ }
+
+ @Override
+ public void serialize(ImmutablePair<L, R> pair, Output output)
+ {
+ leftSerde.serialize(pair.left, output);
+ rightSerde.serialize(pair.right, output);
+ }
+
+ @Override
+ public ImmutablePair<L, R> deserialize(Input input)
+ {
+ throw new RuntimeException("Not Supported.");
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/b2b135df/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/TimeWindowSerde.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/TimeWindowSerde.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/TimeWindowSerde.java
new file mode 100644
index 0000000..268a7ce
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/TimeWindowSerde.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.utils.serde;
+
+import org.apache.apex.malhar.lib.window.Window;
+import org.apache.apex.malhar.lib.window.Window.TimeWindow;
+
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+
+public class TimeWindowSerde implements Serde<Window.TimeWindow>
+{
+ @Override
+ public void serialize(TimeWindow timeWindow, Output output)
+ {
+ output.writeLong(timeWindow.getBeginTimestamp());
+ output.writeLong(timeWindow.getDurationMillis());
+ }
+
+ @Override
+ public TimeWindow deserialize(Input input)
+ {
+ return new TimeWindow(input.readLong(), input.readLong());
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/b2b135df/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/GenericSerdeTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/GenericSerdeTest.java b/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/GenericSerdeTest.java
index 34b7088..5ac043e 100644
--- a/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/GenericSerdeTest.java
+++ b/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/GenericSerdeTest.java
@@ -24,6 +24,8 @@ import java.util.List;
import org.junit.Assert;
import org.junit.Test;
+import org.apache.apex.malhar.lib.window.Window;
+
import com.esotericsoftware.kryo.io.Input;
import com.google.common.collect.Lists;
@@ -70,6 +72,7 @@ public class GenericSerdeTest
Assert.assertEquals(stringList, deserializedList);
}
+
@Test
public void pojoTest()
{
@@ -81,4 +84,18 @@ public class GenericSerdeTest
TestPojo deserializedPojo = serdePojo.deserialize(new Input(slice.buffer, slice.offset, slice.length));
Assert.assertEquals(pojo, deserializedPojo);
}
+
+ @Test
+ public void timeWindowSerdeTest()
+ {
+ GenericSerde<Window.TimeWindow>[] serdes = new GenericSerde[] {new GenericSerde<>(Window.TimeWindow.class), GenericSerde.DEFAULT};
+ for (GenericSerde<Window.TimeWindow> serde : serdes) {
+ Window.TimeWindow pojo = new Window.TimeWindow(System.currentTimeMillis(), 1000);
+ SerializationBuffer buffer = new SerializationBuffer(new WindowedBlockStream());
+ serde.serialize(pojo, buffer);
+ Slice slice = buffer.toSlice();
+ Window.TimeWindow deserializedPojo = serde.deserialize(new Input(slice.buffer, slice.offset, slice.length));
+ Assert.assertEquals(pojo, deserializedPojo);
+ }
+ }
}
[2/2] apex-malhar git commit: Merge commit 'refs/pull/514/head' of
github.com:apache/apex-malhar
Posted by da...@apache.org.
Merge commit 'refs/pull/514/head' of github.com:apache/apex-malhar
Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/ca6995ca
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/ca6995ca
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/ca6995ca
Branch: refs/heads/master
Commit: ca6995ca4b5c97bf5c400ac28e65cde3f9d5b772
Parents: 0852c65 b2b135d
Author: David Yan <da...@apache.org>
Authored: Wed Jan 11 13:30:54 2017 -0800
Committer: David Yan <da...@apache.org>
Committed: Wed Jan 11 13:30:54 2017 -0800
----------------------------------------------------------------------
.../util/serde/GenericSerdePerformanceTest.java | 118 +++++++++++++++++++
.../malhar/lib/utils/serde/GenericSerde.java | 62 ++++++++--
.../lib/utils/serde/ImmutablePairSerde.java | 60 ++++++++++
.../malhar/lib/utils/serde/TimeWindowSerde.java | 42 +++++++
.../lib/utils/serde/GenericSerdeTest.java | 17 +++
5 files changed, 288 insertions(+), 11 deletions(-)
----------------------------------------------------------------------