You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by da...@apache.org on 2017/01/11 21:31:41 UTC

[1/2] apex-malhar git commit: APEXMALHAR-2358 #resolve #comment Optimise GenericSerde to use specific serde to improve the performance

Repository: apex-malhar
Updated Branches:
  refs/heads/master 0852c6594 -> ca6995ca4


APEXMALHAR-2358 #resolve #comment Optimise GenericSerde to use specific serde to improve the performance


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/b2b135df
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/b2b135df
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/b2b135df

Branch: refs/heads/master
Commit: b2b135df218422d5966a5db7fb0add749564bb1f
Parents: 91767c5
Author: brightchen <br...@datatorrent.com>
Authored: Mon Nov 28 11:17:14 2016 -0800
Committer: brightchen <br...@datatorrent.com>
Committed: Tue Jan 10 17:09:10 2017 -0800

----------------------------------------------------------------------
 .../util/serde/GenericSerdePerformanceTest.java | 118 +++++++++++++++++++
 .../malhar/lib/utils/serde/GenericSerde.java    |  62 ++++++++--
 .../lib/utils/serde/ImmutablePairSerde.java     |  60 ++++++++++
 .../malhar/lib/utils/serde/TimeWindowSerde.java |  42 +++++++
 .../lib/utils/serde/GenericSerdeTest.java       |  17 +++
 5 files changed, 288 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/b2b135df/benchmark/src/test/java/com/datatorrent/benchmark/util/serde/GenericSerdePerformanceTest.java
----------------------------------------------------------------------
diff --git a/benchmark/src/test/java/com/datatorrent/benchmark/util/serde/GenericSerdePerformanceTest.java b/benchmark/src/test/java/com/datatorrent/benchmark/util/serde/GenericSerdePerformanceTest.java
new file mode 100644
index 0000000..98ecf67
--- /dev/null
+++ b/benchmark/src/test/java/com/datatorrent/benchmark/util/serde/GenericSerdePerformanceTest.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.benchmark.util.serde;
+
+import java.util.Random;
+
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.lib.utils.serde.GenericSerde;
+import org.apache.apex.malhar.lib.utils.serde.Serde;
+import org.apache.apex.malhar.lib.utils.serde.SerializationBuffer;
+import org.apache.apex.malhar.lib.utils.serde.StringSerde;
+import org.apache.apex.malhar.lib.window.Window;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+
+import com.esotericsoftware.kryo.Kryo;
+
+public class GenericSerdePerformanceTest
+{
+  private static final transient Logger logger = LoggerFactory.getLogger(GenericSerdePerformanceTest.class);
+  private SerializationBuffer buffer = SerializationBuffer.READ_BUFFER;
+  private Random random = new Random();
+  private int serdeDataSize = 1000000;
+
+
+  @Test
+  public void testCompareSerdeForString()
+  {
+    long beginTime = System.currentTimeMillis();
+    testSerdeForString(new GenericSerde<String>(String.class));
+    long genericSerdeCost = System.currentTimeMillis() - beginTime;
+    logger.info("Generic Serde cost for String: {}", genericSerdeCost);
+
+    beginTime = System.currentTimeMillis();
+    testSerdeForString(new StringSerde());
+    long stringSerdeCost = System.currentTimeMillis() - beginTime;
+    logger.info("String Serde cost for String: {}", stringSerdeCost);
+
+    beginTime = System.currentTimeMillis();
+    Kryo kryo = new Kryo();
+    for (int i = 0; i < serdeDataSize; ++i) {
+      kryo.writeObject(buffer, "" + random.nextInt(1000));
+      buffer.toSlice();
+    }
+    buffer.release();
+    long kryoSerdeCost = System.currentTimeMillis() - beginTime;
+    logger.info("Kryo Serde cost for String: {}", kryoSerdeCost);
+  }
+
+  protected void testSerdeForString(Serde<String> serde)
+  {
+    for (int i = 0; i < serdeDataSize; ++i) {
+      serde.serialize("" + random.nextInt(1000), buffer);
+      buffer.toSlice();
+    }
+    buffer.release();
+  }
+
+
+  @Test
+  public void testCompareSerdeForRealCase()
+  {
+    long beginTime = System.currentTimeMillis();
+    GenericSerde<ImmutablePair> serde = new GenericSerde<ImmutablePair>();
+    for (int i = 0; i < serdeDataSize; ++i) {
+      serde.serialize(generatePair(beginTime), buffer);
+      buffer.toSlice();
+    }
+    buffer.release();
+    long genericSerdeCost = System.currentTimeMillis() - beginTime;
+    logger.info("Generic Serde cost for ImmutablePair: {}", genericSerdeCost);
+
+
+    beginTime = System.currentTimeMillis();
+    Kryo kryo = new Kryo();
+    for (int i = 0; i < serdeDataSize; ++i) {
+      kryo.writeObject(buffer, generatePair(beginTime));
+      buffer.toSlice();
+    }
+    buffer.release();
+    long kryoSerdeCost = System.currentTimeMillis() - beginTime;
+    logger.info("Kryo Serde cost for ImmutablePair without class info: {}", kryoSerdeCost);
+
+
+    beginTime = System.currentTimeMillis();
+    Kryo kryo1 = new Kryo();
+    for (int i = 0; i < serdeDataSize; ++i) {
+      kryo1.writeClassAndObject(buffer, generatePair(beginTime));
+      buffer.toSlice();
+    }
+    buffer.release();
+    long kryoSerdeCost2 = System.currentTimeMillis() - beginTime;
+    logger.info("Kryo Serde cost for ImmutablePair with class info: {}", kryoSerdeCost2);
+  }
+
+  protected ImmutablePair generatePair(long now)
+  {
+    return new ImmutablePair(new Window.TimeWindow(now + random.nextInt(100), random.nextInt(100)), "" + random.nextInt(1000));
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/b2b135df/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/GenericSerde.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/GenericSerde.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/GenericSerde.java
index 8501614..4b28a00 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/GenericSerde.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/GenericSerde.java
@@ -18,11 +18,16 @@
  */
 package org.apache.apex.malhar.lib.utils.serde;
 
+import java.util.Map;
+
+import org.apache.apex.malhar.lib.window.Window;
+import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.apache.hadoop.classification.InterfaceStability;
 
 import com.esotericsoftware.kryo.Kryo;
 import com.esotericsoftware.kryo.io.Input;
 import com.esotericsoftware.kryo.io.Output;
+import com.google.common.collect.Maps;
 
 /**
  * Generic serde using Kryo serialization. Note that while this is convenient, it may not be desirable because
@@ -36,31 +41,62 @@ import com.esotericsoftware.kryo.io.Output;
 @InterfaceStability.Evolving
 public class GenericSerde<T> implements Serde<T>
 {
-  private transient ThreadLocal<Kryo> kryos = new ThreadLocal<Kryo>()
-  {
-    @Override
-    public Kryo get()
-    {
-      return new Kryo();
-    }
-  };
+  /**
+   * The default GenericSerde instance, which uses the default class-to-serde map.
+   */
+  public static final GenericSerde DEFAULT = new GenericSerde();
+  private transient Kryo kryo = new Kryo();
 
   private final Class<? extends T> clazz;
 
+  @SuppressWarnings("rawtypes")
+  private Map<Class, Serde> typeToSerde = Maps.newHashMap();
+
+  public <C> void registerSerde(Class<C> type, Serde<C> serde)
+  {
+    typeToSerde.put(type, serde);
+  }
+
+  @SuppressWarnings({ "unchecked", "rawtypes" })
+  public void registerDefaultSerdes()
+  {
+    registerSerde(String.class, new StringSerde());
+    registerSerde(Long.class, new LongSerde());
+    registerSerde(Integer.class, new IntSerde());
+    registerSerde(ImmutablePair.class, new ImmutablePairSerde());
+    registerSerde(Window.TimeWindow.class, new TimeWindowSerde());
+  }
+
   public GenericSerde()
   {
-    this.clazz = null;
+    this(null);
   }
 
   public GenericSerde(Class<? extends T> clazz)
   {
     this.clazz = clazz;
+    registerDefaultSerdes();
+  }
+
+  public Serde getDefaultSerde(Class type)
+  {
+    return typeToSerde.get(type);
   }
 
   @Override
   public void serialize(T object, Output output)
   {
-    Kryo kryo = kryos.get();
+    Class type = object.getClass();
+    Serde serde = null;
+    if (clazz == type) {
+      serde = getDefaultSerde(type);
+    }
+    if (serde != null) {
+      serde.serialize(object, output);
+      return;
+    }
+
+    // No specific serde applies (none registered, or the runtime type differs from clazz); delegate to Kryo.
     if (clazz == null) {
       kryo.writeClassAndObject(output, object);
     } else {
@@ -71,8 +107,12 @@ public class GenericSerde<T> implements Serde<T>
   @Override
   public T deserialize(Input input)
   {
+    Serde serde = clazz == null ? null : getDefaultSerde(clazz);
+    if (serde != null) {
+      return (T)serde.deserialize(input);
+    }
+
     T object;
-    Kryo kryo = kryos.get();
     if (clazz == null) {
       object = (T)kryo.readClassAndObject(input);
     } else {

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/b2b135df/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/ImmutablePairSerde.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/ImmutablePairSerde.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/ImmutablePairSerde.java
new file mode 100644
index 0000000..98ced16
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/ImmutablePairSerde.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.utils.serde;
+
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+
+/**
+ * An implementation of {@link Serde} for {@link ImmutablePair}s. Only serialization is supported:
+ * {@link #deserialize(Input)} always throws a {@link RuntimeException}.
+ */
+@InterfaceStability.Evolving
+public class ImmutablePairSerde<L, R> implements Serde<ImmutablePair<L, R>>
+{
+  private Serde<L> leftSerde;
+  private Serde<R> rightSerde;
+
+  public ImmutablePairSerde()
+  {
+    this(GenericSerde.DEFAULT, GenericSerde.DEFAULT);
+  }
+
+  public ImmutablePairSerde(Serde<L> leftSerde, Serde<R> rightSerde)
+  {
+    this.leftSerde = leftSerde;
+    this.rightSerde = rightSerde;
+  }
+
+  @Override
+  public void serialize(ImmutablePair<L, R> pair, Output output)
+  {
+    leftSerde.serialize(pair.left, output);
+    rightSerde.serialize(pair.right, output);
+  }
+
+  @Override
+  public ImmutablePair<L, R> deserialize(Input input)
+  {
+    throw new RuntimeException("Not Supported.");
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/b2b135df/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/TimeWindowSerde.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/TimeWindowSerde.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/TimeWindowSerde.java
new file mode 100644
index 0000000..268a7ce
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/TimeWindowSerde.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.utils.serde;
+
+import org.apache.apex.malhar.lib.window.Window;
+import org.apache.apex.malhar.lib.window.Window.TimeWindow;
+
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+
+public class TimeWindowSerde implements Serde<Window.TimeWindow>
+{
+  @Override
+  public void serialize(TimeWindow timeWindow, Output output)
+  {
+    output.writeLong(timeWindow.getBeginTimestamp());
+    output.writeLong(timeWindow.getDurationMillis());
+  }
+
+  @Override
+  public TimeWindow deserialize(Input input)
+  {
+    return new TimeWindow(input.readLong(), input.readLong());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/b2b135df/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/GenericSerdeTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/GenericSerdeTest.java b/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/GenericSerdeTest.java
index 34b7088..5ac043e 100644
--- a/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/GenericSerdeTest.java
+++ b/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/GenericSerdeTest.java
@@ -24,6 +24,8 @@ import java.util.List;
 import org.junit.Assert;
 import org.junit.Test;
 
+import org.apache.apex.malhar.lib.window.Window;
+
 import com.esotericsoftware.kryo.io.Input;
 import com.google.common.collect.Lists;
 
@@ -70,6 +72,7 @@ public class GenericSerdeTest
     Assert.assertEquals(stringList, deserializedList);
   }
 
+
   @Test
   public void pojoTest()
   {
@@ -81,4 +84,18 @@ public class GenericSerdeTest
     TestPojo deserializedPojo = serdePojo.deserialize(new Input(slice.buffer, slice.offset, slice.length));
     Assert.assertEquals(pojo, deserializedPojo);
   }
+
+  @Test
+  public void timeWindowSerdeTest()
+  {
+    GenericSerde<Window.TimeWindow>[] serdes = new GenericSerde[] {new GenericSerde<>(Window.TimeWindow.class), GenericSerde.DEFAULT};
+    for (GenericSerde<Window.TimeWindow> serde : serdes) {
+      Window.TimeWindow pojo = new Window.TimeWindow(System.currentTimeMillis(), 1000);
+      SerializationBuffer buffer = new SerializationBuffer(new WindowedBlockStream());
+      serde.serialize(pojo, buffer);
+      Slice slice = buffer.toSlice();
+      Window.TimeWindow deserializedPojo = serde.deserialize(new Input(slice.buffer, slice.offset, slice.length));
+      Assert.assertEquals(pojo, deserializedPojo);
+    }
+  }
 }


[2/2] apex-malhar git commit: Merge commit 'refs/pull/514/head' of github.com:apache/apex-malhar

Posted by da...@apache.org.
Merge commit 'refs/pull/514/head' of github.com:apache/apex-malhar


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/ca6995ca
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/ca6995ca
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/ca6995ca

Branch: refs/heads/master
Commit: ca6995ca4b5c97bf5c400ac28e65cde3f9d5b772
Parents: 0852c65 b2b135d
Author: David Yan <da...@apache.org>
Authored: Wed Jan 11 13:30:54 2017 -0800
Committer: David Yan <da...@apache.org>
Committed: Wed Jan 11 13:30:54 2017 -0800

----------------------------------------------------------------------
 .../util/serde/GenericSerdePerformanceTest.java | 118 +++++++++++++++++++
 .../malhar/lib/utils/serde/GenericSerde.java    |  62 ++++++++--
 .../lib/utils/serde/ImmutablePairSerde.java     |  60 ++++++++++
 .../malhar/lib/utils/serde/TimeWindowSerde.java |  42 +++++++
 .../lib/utils/serde/GenericSerdeTest.java       |  17 +++
 5 files changed, 288 insertions(+), 11 deletions(-)
----------------------------------------------------------------------