You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by am...@apache.org on 2016/12/11 13:42:47 UTC

[1/2] incubator-beam git commit: [BEAM-921] spark-runner: register sources and coders to serialize with java serializer

Repository: incubator-beam
Updated Branches:
  refs/heads/master e841b1a21 -> bf8a3cb3a


[BEAM-921] spark-runner: register sources and coders to serialize with java serializer


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/aba40e2d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/aba40e2d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/aba40e2d

Branch: refs/heads/master
Commit: aba40e2de9ba058f33086eb6a913fa583a82b058
Parents: e841b1a
Author: Aviem Zur <av...@gmail.com>
Authored: Thu Dec 8 15:07:06 2016 +0200
Committer: Sela <an...@paypal.com>
Committed: Sun Dec 11 15:18:51 2016 +0200

----------------------------------------------------------------------
 runners/spark/pom.xml                           | 35 +++++-------
 .../coders/BeamSparkRunnerRegistrator.java      | 60 +++++++++++++++-----
 .../coders/BeamSparkRunnerRegistratorTest.java  | 57 +++++++++++++++++++
 3 files changed, 118 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/aba40e2d/runners/spark/pom.xml
----------------------------------------------------------------------
diff --git a/runners/spark/pom.xml b/runners/spark/pom.xml
index d1ef225..86e9039 100644
--- a/runners/spark/pom.xml
+++ b/runners/spark/pom.xml
@@ -54,7 +54,7 @@
     </profile>
 
     <profile>
-      <!-- This profile adds execution of RunnableOnService integration tests 
+      <!-- This profile adds execution of RunnableOnService integration tests
            against a local Spark endpoint. -->
       <id>local-runnable-on-service-tests</id>
       <activation><activeByDefault>false</activeByDefault></activation>
@@ -134,28 +134,14 @@
       <version>${hadoop.version}</version>
       <scope>provided</scope>
     </dependency>
+    <!-- Kryo bugfix version needed due to a state re-use issue in Kryo version 2.21 used in Spark 1.x
+    See: https://issues.apache.org/jira/browse/SPARK-7708
+    See: https://github.com/EsotericSoftware/kryo/issues/312
+    -->
     <dependency>
       <groupId>com.esotericsoftware.kryo</groupId>
       <artifactId>kryo</artifactId>
-      <version>2.21</version>
-      <scope>provided</scope>
-    </dependency>
-    <dependency>
-      <groupId>de.javakaffee</groupId>
-      <artifactId>kryo-serializers</artifactId>
-      <version>0.39</version>
-      <exclusions>
-        <!-- Use Spark's Kryo -->
-        <exclusion>
-          <groupId>com.esotericsoftware</groupId>
-          <artifactId>kryo</artifactId>
-        </exclusion>
-        <!-- We only really need the serializer implementations -->
-        <exclusion>
-          <groupId>com.google.protobuf</groupId>
-          <artifactId>protobuf-java</artifactId>
-        </exclusion>
-      </exclusions>
+      <version>2.21.1</version>
     </dependency>
     <dependency>
       <groupId>com.google.code.findbugs</groupId>
@@ -264,6 +250,11 @@
       <artifactId>metrics-core</artifactId>
       <version>${dropwizard.metrics.version}</version>
     </dependency>
+    <dependency>
+      <groupId>org.reflections</groupId>
+      <artifactId>reflections</artifactId>
+      <version>0.9.10</version>
+    </dependency>
 
     <!-- KafkaIO -->
     <dependency>
@@ -405,6 +396,10 @@
                     <pattern>com.google.thirdparty</pattern>
                     <shadedPattern>org.apache.beam.spark.relocated.com.google.thirdparty</shadedPattern>
                   </relocation>
+                  <relocation>
+                    <pattern>com.esotericsoftware.kryo</pattern>
+                    <shadedPattern>org.apache.beam.spark.relocated.com.esotericsoftware.kryo</shadedPattern>
+                  </relocation>
                 </relocations>
                 <shadedArtifactAttached>true</shadedArtifactAttached>
                 <shadedClassifierName>spark-app</shadedClassifierName>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/aba40e2d/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/BeamSparkRunnerRegistrator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/BeamSparkRunnerRegistrator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/BeamSparkRunnerRegistrator.java
index 0e62781..41b0a01 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/BeamSparkRunnerRegistrator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/BeamSparkRunnerRegistrator.java
@@ -19,28 +19,60 @@
 package org.apache.beam.runners.spark.coders;
 
 import com.esotericsoftware.kryo.Kryo;
-import de.javakaffee.kryoserializers.UnmodifiableCollectionsSerializer;
-import de.javakaffee.kryoserializers.guava.ImmutableListSerializer;
-import de.javakaffee.kryoserializers.guava.ImmutableMapSerializer;
-import de.javakaffee.kryoserializers.guava.ImmutableMultimapSerializer;
-import de.javakaffee.kryoserializers.guava.ImmutableSetSerializer;
-import de.javakaffee.kryoserializers.guava.ReverseListSerializer;
+import com.esotericsoftware.kryo.serializers.JavaSerializer;
+import com.google.common.base.Function;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import java.util.Arrays;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.Source;
 import org.apache.spark.serializer.KryoRegistrator;
+import org.reflections.Reflections;
 
 
 /**
- * Custom {@link com.esotericsoftware.kryo.Serializer}s for Beam's Spark runner needs.
+ * Custom {@link KryoRegistrator} for Beam's Spark runner needs.
  */
 public class BeamSparkRunnerRegistrator implements KryoRegistrator {
 
   @Override
   public void registerClasses(Kryo kryo) {
-    UnmodifiableCollectionsSerializer.registerSerializers(kryo);
-    // Guava
-    ImmutableListSerializer.registerSerializers(kryo);
-    ImmutableSetSerializer.registerSerializers(kryo);
-    ImmutableMapSerializer.registerSerializers(kryo);
-    ImmutableMultimapSerializer.registerSerializers(kryo);
-    ReverseListSerializer.registerSerializers(kryo);
+    for (Class<?> clazz : ClassesForJavaSerialization.getClasses()) {
+      kryo.register(clazz, new JavaSerializer());
+    }
+  }
+
+  /**
+   * Register coders and sources with {@link JavaSerializer} since they aren't guaranteed to be
+   * Kryo-serializable.
+   */
+  private static class ClassesForJavaSerialization {
+    private static final Class<?>[] CLASSES_FOR_JAVA_SERIALIZATION = new Class<?>[]{
+        Coder.class, Source.class
+    };
+
+    private static final Iterable<Class<?>> INSTANCE;
+
+    /**
+     * Find all subtypes of the classes listed in {@link #CLASSES_FOR_JAVA_SERIALIZATION}.
+     */
+    static {
+      final Reflections reflections = new Reflections();
+      INSTANCE = Iterables.concat(Lists.transform(Arrays.asList(CLASSES_FOR_JAVA_SERIALIZATION),
+          new Function<Class, Set<Class<?>>>() {
+            @SuppressWarnings({"unchecked", "ConstantConditions"})
+            @Nullable
+            @Override
+            public Set<Class<?>> apply(@Nullable Class clazz) {
+              return reflections.getSubTypesOf(clazz);
+            }
+          }));
+    }
+
+    static Iterable<Class<?>> getClasses() {
+      return INSTANCE;
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/aba40e2d/runners/spark/src/test/java/org/apache/beam/runners/spark/coders/BeamSparkRunnerRegistratorTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/coders/BeamSparkRunnerRegistratorTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/coders/BeamSparkRunnerRegistratorTest.java
new file mode 100644
index 0000000..e353017
--- /dev/null
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/coders/BeamSparkRunnerRegistratorTest.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.spark.coders;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.serializers.JavaSerializer;
+import com.google.common.collect.Iterables;
+import java.io.Serializable;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.Source;
+import org.hamcrest.Matchers;
+import org.junit.Assert;
+import org.junit.Test;
+import org.reflections.Reflections;
+
+
+/**
+ * BeamSparkRunnerRegistrator Test.
+ */
+public class BeamSparkRunnerRegistratorTest {
+  @Test
+  public void testCodersAndSourcesRegistration() {
+    BeamSparkRunnerRegistrator registrator = new BeamSparkRunnerRegistrator();
+
+    Reflections reflections = new Reflections();
+    Iterable<Class<? extends Serializable>> classesForJavaSerialization =
+        Iterables.concat(reflections.getSubTypesOf(Coder.class),
+            reflections.getSubTypesOf(Source.class));
+
+    Kryo kryo = new Kryo();
+
+    registrator.registerClasses(kryo);
+
+    for (Class<?> clazz : classesForJavaSerialization) {
+      Assert.assertThat("Registered serializer for class " + clazz.getName()
+              + " was not an instance of " + JavaSerializer.class.getName(),
+          kryo.getSerializer(clazz),
+          Matchers.instanceOf(JavaSerializer.class));
+    }
+  }
+}


[2/2] incubator-beam git commit: This closes #1552

Posted by am...@apache.org.
This closes #1552


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/bf8a3cb3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/bf8a3cb3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/bf8a3cb3

Branch: refs/heads/master
Commit: bf8a3cb3a5948bd1ef7f7e5cef230ecd4e8f1c84
Parents: e841b1a aba40e2
Author: Sela <an...@paypal.com>
Authored: Sun Dec 11 15:19:24 2016 +0200
Committer: Sela <an...@paypal.com>
Committed: Sun Dec 11 15:19:24 2016 +0200

----------------------------------------------------------------------
 runners/spark/pom.xml                           | 35 +++++-------
 .../coders/BeamSparkRunnerRegistrator.java      | 60 +++++++++++++++-----
 .../coders/BeamSparkRunnerRegistratorTest.java  | 57 +++++++++++++++++++
 3 files changed, 118 insertions(+), 34 deletions(-)
----------------------------------------------------------------------