You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by na...@apache.org on 2019/05/07 03:32:06 UTC

[incubator-hudi] branch master updated: migrating kryo's dependency from twitter chill to plain kryo library

This is an automated email from the ASF dual-hosted git repository.

nagarwal pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 7386353  migrating kryo's dependency from twitter chill to plain kryo library
7386353 is described below

commit 738635306bef3d5e7c9395d71409bc4dc8feb2cc
Author: Omkar Joshi <om...@uber.com>
AuthorDate: Tue Apr 30 19:00:30 2019 -0700

    migrating kryo's dependency from twitter chill to plain kryo library
---
 hoodie-common/pom.xml                              |   5 +
 .../common/table/log/block/HoodieDeleteBlock.java  |   2 +-
 .../hoodie/common/util/SerializationUtils.java     | 224 ++++++++++-----------
 .../common/util/collection/DiskBasedMap.java       |   4 +-
 .../common/util/collection/LazyFileIterable.java   |   5 +-
 .../hoodie/common/util/TestSerializationUtils.java |  79 ++++++++
 hoodie-utilities/pom.xml                           |  15 ++
 packaging/hoodie-hadoop-mr-bundle/pom.xml          |  15 ++
 packaging/hoodie-presto-bundle/pom.xml             |  12 ++
 packaging/hoodie-spark-bundle/pom.xml              |  12 ++
 10 files changed, 252 insertions(+), 121 deletions(-)

diff --git a/hoodie-common/pom.xml b/hoodie-common/pom.xml
index 912f68d..674ba50 100644
--- a/hoodie-common/pom.xml
+++ b/hoodie-common/pom.xml
@@ -146,5 +146,10 @@
         <artifactId>objectsize</artifactId>
         <version>0.0.12</version>
     </dependency>
+    <dependency>
+      <groupId>com.esotericsoftware</groupId>
+      <artifactId>kryo-shaded</artifactId>
+      <version>4.0.2</version>
+    </dependency>
   </dependencies>
 </project>
diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/block/HoodieDeleteBlock.java b/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/block/HoodieDeleteBlock.java
index 9ee4f29..a31b512 100644
--- a/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/block/HoodieDeleteBlock.java
+++ b/hoodie-common/src/main/java/com/uber/hoodie/common/table/log/block/HoodieDeleteBlock.java
@@ -85,7 +85,7 @@ public class HoodieDeleteBlock extends HoodieLogBlock {
         int dataLength = dis.readInt();
         byte[] data = new byte[dataLength];
         dis.readFully(data);
-        this.keysToDelete = SerializationUtils.deserialize(data);
+        this.keysToDelete = SerializationUtils.<HoodieKey[]>deserialize(data);
         deflate();
       }
       return keysToDelete;
diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/util/SerializationUtils.java b/hoodie-common/src/main/java/com/uber/hoodie/common/util/SerializationUtils.java
index c52a666..84c05b6 100644
--- a/hoodie-common/src/main/java/com/uber/hoodie/common/util/SerializationUtils.java
+++ b/hoodie-common/src/main/java/com/uber/hoodie/common/util/SerializationUtils.java
@@ -16,156 +16,148 @@
 
 package com.uber.hoodie.common.util;
 
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import com.esotericsoftware.kryo.serializers.FieldSerializer;
+import com.esotericsoftware.reflectasm.ConstructorAccess;
 import com.uber.hoodie.exception.HoodieSerializationException;
-import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
-import java.io.InputStream;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.io.OutputStream;
 import java.io.Serializable;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import org.objenesis.instantiator.ObjectInstantiator;
+
 
 /**
- * (NOTE: Adapted from Apache commons-lang3)
- * This class defines API's to serde an object.
+ * {@link SerializationUtils} class internally uses {@link Kryo} serializer for serializing /
+ * deserializing objects.
  */
 public class SerializationUtils {
+
+  // Caching kryo serializer to avoid creating kryo instance for every serde operation
+  private static final ThreadLocal<KryoSerializerInstance> serializerRef =
+      ThreadLocal.withInitial(() -> new KryoSerializerInstance());
+
   // Serialize
   //-----------------------------------------------------------------------
 
   /**
-   * <p>Serializes an {@code Object} to the specified stream.</p>
-   *
-   * <p>The stream will be closed once the object is written.
-   * This avoids the need for a finally clause, and maybe also exception
-   * handling, in the application code.</p>
-   *
-   * <p>The stream passed in is not buffered internally within this method.
-   * This is the responsibility of your application if desired.</p>
-   *
-   * @param obj the object to serialize to bytes, may be null
-   * @param outputStream the stream to write to, must not be null
-   * @throws IllegalArgumentException if {@code outputStream} is {@code null}
-   * @throws HoodieSerializationException (runtime) if the serialization fails
-   */
-  public static void serialize(final Serializable obj, final OutputStream outputStream) {
-    if (outputStream == null) {
-      throw new IllegalArgumentException("The OutputStream must not be null");
-    }
-    ObjectOutputStream out = null;
-    try {
-      // stream closed in the finally
-      out = new ObjectOutputStream(outputStream);
-      out.writeObject(obj);
-
-    } catch (final IOException ex) {
-      throw new HoodieSerializationException("unable to serialize object", ex);
-    } finally {
-      try {
-        if (out != null) {
-          out.close();
-        }
-      } catch (final IOException ex) { // NOPMD
-        // ignore close exception
-      }
-    }
-  }
-
-  /**
-   * <p>Serializes an {@code Object} to a byte array for
-   * storage/serialization.</p>
+   * <p>Serializes an {@code Object} to a byte array for storage/serialization.</p>
    *
    * @param obj the object to serialize to bytes
    * @return a byte[] with the converted Serializable
-   * @throws HoodieSerializationException (runtime) if the serialization fails
+   * @throws IOException if the serialization fails
    */
-  public static byte[] serialize(final Serializable obj) {
-    final ByteArrayOutputStream baos = new ByteArrayOutputStream(512);
-    serialize(obj, baos);
-    return baos.toByteArray();
+  public static byte[] serialize(final Object obj) throws IOException {
+    return serializerRef.get().serialize(obj);
   }
 
   // Deserialize
   //-----------------------------------------------------------------------
 
   /**
-   * <p>
-   * Deserializes an {@code Object} from the specified stream.
-   * </p>
+   * <p> Deserializes a single {@code Object} from an array of bytes. </p>
    *
-   * <p>
-   * The stream will be closed once the object is written. This avoids the need for a finally clause, and maybe also
-   * exception handling, in the application code.
-   * </p>
-   *
-   * <p>
-   * The stream passed in is not buffered internally within this method. This is the responsibility of your
-   * application if desired.
-   * </p>
-   *
-   * <p>
-   * If the call site incorrectly types the return value, a {@link ClassCastException} is thrown from the call site.
-   * Without Generics in this declaration, the call site must type cast and can cause the same ClassCastException.
-   * Note that in both cases, the ClassCastException is in the call site, not in this method.
-   * </p>
+   * <p> If the call site incorrectly types the return value, a {@link ClassCastException} is thrown
+   * from the call site. Without Generics in this declaration, the call site must type cast and can
+   * cause the same ClassCastException. Note that in both cases, the ClassCastException is in the
+   * call site, not in this method. </p>
    *
    * @param <T> the object type to be deserialized
-   * @param inputStream the serialized object input stream, must not be null
+   * @param objectData the serialized object, must not be null
    * @return the deserialized object
-   * @throws IllegalArgumentException if {@code inputStream} is {@code null}
+   * @throws IllegalArgumentException if {@code objectData} is {@code null}
    * @throws HoodieSerializationException (runtime) if the serialization fails
    */
-  public static <T> T deserialize(final InputStream inputStream) {
-    if (inputStream == null) {
-      throw new IllegalArgumentException("The InputStream must not be null");
+  public static <T> T deserialize(final byte[] objectData) {
+    if (objectData == null) {
+      throw new IllegalArgumentException("The byte[] must not be null");
     }
-    ObjectInputStream in = null;
-    try {
-      // stream closed in the finally
-      in = new ObjectInputStream(inputStream);
-      @SuppressWarnings("unchecked") // may fail with CCE if serialised form is incorrect
-      final T obj = (T) in.readObject();
-      return obj;
-
-    } catch (final ClassCastException ex) {
-      throw new HoodieSerializationException("cannot cast class", ex);
-    } catch (final ClassNotFoundException ex) {
-      throw new HoodieSerializationException("class not found", ex);
-    } catch (final IOException ex) {
-      throw new HoodieSerializationException("unable to deserialize to object", ex);
-    } finally {
-      try {
-        if (in != null) {
-          in.close();
-        }
-      } catch (final IOException ex) { // NOPMD
-        // ignore close exception
-      }
+    return (T) serializerRef.get().deserialize(objectData);
+  }
+
+  private static class KryoSerializerInstance implements Serializable {
+    public static final int KRYO_SERIALIZER_INITIAL_BUFFER_SIZE = 1048576;
+    private final Kryo kryo;
+    // Caching ByteArrayOutputStream to avoid recreating it for every operation
+    private final ByteArrayOutputStream baos;
+
+    KryoSerializerInstance() {
+      KryoInstantiator kryoInstantiator = new KryoInstantiator();
+      kryo = kryoInstantiator.newKryo();
+      baos = new ByteArrayOutputStream(KRYO_SERIALIZER_INITIAL_BUFFER_SIZE);
+      kryo.setRegistrationRequired(false);
+    }
+
+    byte[] serialize(Object obj) throws IOException {
+      kryo.reset();
+      baos.reset();
+      Output output = new Output(baos);
+      this.kryo.writeClassAndObject(output, obj);
+      output.close();
+      return baos.toByteArray();
+    }
+
+    Object deserialize(byte[] objectData) {
+      return this.kryo.readClassAndObject(new Input(objectData));
     }
   }
 
   /**
-   * <p>
-   * Deserializes a single {@code Object} from an array of bytes.
-   * </p>
-   *
-   * <p>
-   * If the call site incorrectly types the return value, a {@link ClassCastException} is thrown from the call site.
-   * Without Generics in this declaration, the call site must type cast and can cause the same ClassCastException.
-   * Note that in both cases, the ClassCastException is in the call site, not in this method.
-   * </p>
-   *
-   * @param <T> the object type to be deserialized
-   * @param objectData the serialized object, must not be null
-   * @return the deserialized object
-   * @throws IllegalArgumentException if {@code objectData} is {@code null}
-   * @throws HoodieSerializationException (runtime) if the serialization fails
+   * This class has a no-arg constructor, suitable for use with reflection instantiation.
+   * For details, check out com.twitter.chill.KryoBase.
    */
-  public static <T> T deserialize(final byte[] objectData) {
-    if (objectData == null) {
-      throw new IllegalArgumentException("The byte[] must not be null");
+  private static class KryoInstantiator implements Serializable {
+
+    public Kryo newKryo() {
+
+      Kryo kryo = new KryoBase();
+      // ensure that kryo doesn't fail if classes are not registered with kryo.
+      kryo.setRegistrationRequired(false);
+      // This would be used for object initialization if nothing else works out.
+      kryo.setInstantiatorStrategy(new org.objenesis.strategy.StdInstantiatorStrategy());
+      // Handle cases where we may have an odd classloader setup like with libjars
+      // for hadoop
+      kryo.setClassLoader(Thread.currentThread().getContextClassLoader());
+      return kryo;
+    }
+
+    private static class KryoBase extends Kryo {
+      @Override
+      protected Serializer newDefaultSerializer(Class type) {
+        final Serializer serializer = super.newDefaultSerializer(type);
+        if (serializer instanceof FieldSerializer) {
+          final FieldSerializer fieldSerializer = (FieldSerializer) serializer;
+          fieldSerializer.setIgnoreSyntheticFields(true);
+        }
+        return serializer;
+      }
+
+      @Override
+      protected ObjectInstantiator newInstantiator(Class type) {
+        return () -> {
+          // First try reflectasm - it is the fastest way to instantiate an object.
+          try {
+            final ConstructorAccess access = ConstructorAccess.get(type);
+            return access.newInstance();
+          } catch (Throwable t) {
+            // ignore this exception. We may want to try another way.
+          }
+          // fall back to java based instantiation.
+          try {
+            final Constructor constructor = type.getConstructor();
+            constructor.setAccessible(true);
+            return constructor.newInstance();
+          } catch (NoSuchMethodException | IllegalAccessException
+              | InstantiationException | InvocationTargetException e) {
+            // ignore this exception. we will fall back to default instantiation strategy.
+          }
+          return super.getInstantiatorStrategy().newInstantiatorOf(type).newInstance();
+        };
+      }
     }
-    return deserialize(new ByteArrayInputStream(objectData));
   }
 }
diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/util/collection/DiskBasedMap.java b/hoodie-common/src/main/java/com/uber/hoodie/common/util/collection/DiskBasedMap.java
index 047b997..9fd0091 100644
--- a/hoodie-common/src/main/java/com/uber/hoodie/common/util/collection/DiskBasedMap.java
+++ b/hoodie-common/src/main/java/com/uber/hoodie/common/util/collection/DiskBasedMap.java
@@ -156,8 +156,8 @@ public final class DiskBasedMap<T extends Serializable, R extends Serializable>
       return null;
     }
     try {
-      return SerializationUtils.deserialize(SpillableMapUtils.readBytesFromDisk(readOnlyFileHandle,
-          entry.getOffsetOfValue(), entry.getSizeOfValue()));
+      return SerializationUtils.<R>deserialize(SpillableMapUtils
+          .readBytesFromDisk(readOnlyFileHandle, entry.getOffsetOfValue(), entry.getSizeOfValue()));
     } catch (IOException e) {
       throw new HoodieIOException("Unable to readFromDisk Hoodie Record from disk", e);
     }
diff --git a/hoodie-common/src/main/java/com/uber/hoodie/common/util/collection/LazyFileIterable.java b/hoodie-common/src/main/java/com/uber/hoodie/common/util/collection/LazyFileIterable.java
index c2d74e0..08aa784 100644
--- a/hoodie-common/src/main/java/com/uber/hoodie/common/util/collection/LazyFileIterable.java
+++ b/hoodie-common/src/main/java/com/uber/hoodie/common/util/collection/LazyFileIterable.java
@@ -81,8 +81,9 @@ public class LazyFileIterable<T, R> implements Iterable<R> {
     public R next() {
       Map.Entry<T, DiskBasedMap.ValueMetadata> entry = this.metadataIterator.next();
       try {
-        return SerializationUtils.deserialize(SpillableMapUtils.readBytesFromDisk(readOnlyFileHandle,
-            entry.getValue().getOffsetOfValue(), entry.getValue().getSizeOfValue()));
+        return SerializationUtils.<R>deserialize(SpillableMapUtils
+            .readBytesFromDisk(readOnlyFileHandle, entry.getValue().getOffsetOfValue(),
+                entry.getValue().getSizeOfValue()));
       } catch (IOException e) {
         throw new HoodieIOException("Unable to read hoodie record from value spilled to disk", e);
       }
diff --git a/hoodie-common/src/test/java/com/uber/hoodie/common/util/TestSerializationUtils.java b/hoodie-common/src/test/java/com/uber/hoodie/common/util/TestSerializationUtils.java
new file mode 100644
index 0000000..f4a45b0
--- /dev/null
+++ b/hoodie-common/src/test/java/com/uber/hoodie/common/util/TestSerializationUtils.java
@@ -0,0 +1,79 @@
+/*
+ *  Copyright (c) 2019 Uber Technologies, Inc. (hoodie-dev-group@uber.com)
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *           http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package com.uber.hoodie.common.util;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.Objects;
+import org.apache.avro.util.Utf8;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestSerializationUtils {
+
+  @Test
+  public void testSerDeser() throws IOException {
+    // It should handle null object references.
+    verifyObject(null);
+    // Object with nulls.
+    verifyObject(new NonSerializableClass(null));
+    // Object with valid values & no default constructor.
+    verifyObject(new NonSerializableClass("testValue"));
+    // Object with multiple constructors
+    verifyObject(new NonSerializableClass("testValue1", "testValue2"));
+    // Object which is of non-serializable class.
+    verifyObject(new Utf8("test-key"));
+    // Verify serialization of list.
+    verifyObject(new LinkedList<>(Arrays.asList(2, 3, 5)));
+  }
+
+  private <T> void verifyObject(T expectedValue) throws IOException {
+    byte[] serializedObject = SerializationUtils.serialize(expectedValue);
+    Assert.assertTrue(serializedObject != null && serializedObject.length > 0);
+
+    final T deserializedValue = SerializationUtils.<T>deserialize(serializedObject);
+    if (expectedValue == null) {
+      Assert.assertNull(deserializedValue);
+    } else {
+      Assert.assertTrue(expectedValue.equals(deserializedValue));
+    }
+  }
+
+  private static class NonSerializableClass {
+    private String id;
+    private String name;
+
+    NonSerializableClass(String id) {
+      this(id, "");
+    }
+
+    NonSerializableClass(String id, String name) {
+      this.id = id;
+      this.name = name;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (!(obj instanceof  NonSerializableClass)) {
+        return false;
+      }
+      final NonSerializableClass other = (NonSerializableClass) obj;
+      return Objects.equals(this.id, other.id) && Objects.equals(this.name, other.name);
+    }
+  }
+}
diff --git a/hoodie-utilities/pom.xml b/hoodie-utilities/pom.xml
index c95950a..00b853d 100644
--- a/hoodie-utilities/pom.xml
+++ b/hoodie-utilities/pom.xml
@@ -81,6 +81,9 @@
                   <include>org.apache.hive:hive-service</include>
                   <include>org.apache.hive:hive-metastore</include>
                   <include>org.apache.hive:hive-jdbc</include>
+                  <include>com.esotericsoftware:kryo-shaded</include>
+                  <include>org.objenesis:objenesis</include>
+                  <include>com.esotericsoftware:minlog</include>
                 </includes>
               </artifactSet>
               <relocations>
@@ -120,6 +123,18 @@
                   <pattern>org.apache.hadoop.hive.service.</pattern>
                   <shadedPattern>com.uber.hoodie.org.apache.hadoop_hive.service.</shadedPattern>
                 </relocation>
+                <relocation>
+                  <pattern>com.esotericsoftware.kryo.</pattern>
+                  <shadedPattern>com.uber.hoodie.com.esotericsoftware.kryo.</shadedPattern>
+                </relocation>
+                <relocation>
+                  <pattern>org.objenesis.</pattern>
+                  <shadedPattern>com.uber.hoodie.org.objenesis.</shadedPattern>
+                </relocation>
+                <relocation>
+                  <pattern>com.esotericsoftware.minlog.</pattern>
+                  <shadedPattern>com.uber.hoodie.com.esotericsoftware.minlog.</shadedPattern>
+                </relocation>
               </relocations>
             </configuration>
           </execution>
diff --git a/packaging/hoodie-hadoop-mr-bundle/pom.xml b/packaging/hoodie-hadoop-mr-bundle/pom.xml
index b25cc29..f25d0f7 100644
--- a/packaging/hoodie-hadoop-mr-bundle/pom.xml
+++ b/packaging/hoodie-hadoop-mr-bundle/pom.xml
@@ -200,6 +200,18 @@
                    <pattern>org.apache.commons</pattern>
                    <shadedPattern>com.uber.hoodie.org.apache.commons</shadedPattern>
                  </relocation>
+                 <relocation>
+                   <pattern>com.esotericsoftware.kryo.</pattern>
+                   <shadedPattern>com.uber.hoodie.com.esotericsoftware.kryo.</shadedPattern>
+                 </relocation>
+                 <relocation>
+                   <pattern>org.objenesis.</pattern>
+                   <shadedPattern>com.uber.hoodie.org.objenesis.</shadedPattern>
+                 </relocation>
+                 <relocation>
+                   <pattern>com.esotericsoftware.minlog.</pattern>
+                   <shadedPattern>com.uber.hoodie.com.esotericsoftware.minlog.</shadedPattern>
+                 </relocation>
               </relocations>
               <createDependencyReducedPom>false</createDependencyReducedPom>
               <artifactSet>
@@ -211,6 +223,9 @@
                   <include>com.twitter.common:objectsize</include>
                   <include>commons-logging:commons-logging</include>
                   <include>commons-io:commons-io</include>
+                  <include>com.esotericsoftware:kryo-shaded</include>
+                  <include>org.objenesis:objenesis</include>
+                  <include>com.esotericsoftware:minlog</include>
                 </includes>
               </artifactSet>
               <finalName>${project.artifactId}-${project.version}</finalName>
diff --git a/packaging/hoodie-presto-bundle/pom.xml b/packaging/hoodie-presto-bundle/pom.xml
index 750fba9..8405b90 100644
--- a/packaging/hoodie-presto-bundle/pom.xml
+++ b/packaging/hoodie-presto-bundle/pom.xml
@@ -161,6 +161,18 @@
                    <pattern>parquet.schema.</pattern>
                    <shadedPattern>com.uber.hoodie.parquet.schema.</shadedPattern>
                  </relocation>
+                 <relocation>
+                   <pattern>com.esotericsoftware.kryo.</pattern>
+                   <shadedPattern>com.uber.hoodie.com.esotericsoftware.kryo.</shadedPattern>
+                 </relocation>
+                 <relocation>
+                   <pattern>org.objenesis.</pattern>
+                   <shadedPattern>com.uber.hoodie.org.objenesis.</shadedPattern>
+                 </relocation>
+                 <relocation>
+                   <pattern>com.esotericsoftware.minlog.</pattern>
+                   <shadedPattern>com.uber.hoodie.com.esotericsoftware.minlog.</shadedPattern>
+                 </relocation>
               </relocations>
               <createDependencyReducedPom>false</createDependencyReducedPom>
               <artifactSet>
diff --git a/packaging/hoodie-spark-bundle/pom.xml b/packaging/hoodie-spark-bundle/pom.xml
index f3b45f4..f1e3cee 100644
--- a/packaging/hoodie-spark-bundle/pom.xml
+++ b/packaging/hoodie-spark-bundle/pom.xml
@@ -148,6 +148,18 @@
                   <pattern>org.apache.hadoop.hive.service.</pattern>
                   <shadedPattern>com.uber.hoodie.org.apache.hadoop_hive.service.</shadedPattern>
                 </relocation>
+                <relocation>
+                  <pattern>com.esotericsoftware.kryo.</pattern>
+                  <shadedPattern>com.uber.hoodie.com.esotericsoftware.kryo.</shadedPattern>
+                </relocation>
+                <relocation>
+                  <pattern>org.objenesis.</pattern>
+                  <shadedPattern>com.uber.hoodie.org.objenesis.</shadedPattern>
+                </relocation>
+                <relocation>
+                  <pattern>com.esotericsoftware.minlog.</pattern>
+                  <shadedPattern>com.uber.hoodie.com.esotericsoftware.minlog.</shadedPattern>
+                </relocation>
               </relocations>
               <createDependencyReducedPom>false</createDependencyReducedPom>
               <artifactSet>