You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by am...@apache.org on 2016/12/11 13:42:47 UTC
[1/2] incubator-beam git commit: [BEAM-921] spark-runner: register sources and coders to serialize with java serializer
Repository: incubator-beam
Updated Branches:
refs/heads/master e841b1a21 -> bf8a3cb3a
[BEAM-921] spark-runner: register sources and coders to serialize with java serializer
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/aba40e2d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/aba40e2d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/aba40e2d
Branch: refs/heads/master
Commit: aba40e2de9ba058f33086eb6a913fa583a82b058
Parents: e841b1a
Author: Aviem Zur <av...@gmail.com>
Authored: Thu Dec 8 15:07:06 2016 +0200
Committer: Sela <an...@paypal.com>
Committed: Sun Dec 11 15:18:51 2016 +0200
----------------------------------------------------------------------
runners/spark/pom.xml | 35 +++++-------
.../coders/BeamSparkRunnerRegistrator.java | 60 +++++++++++++++-----
.../coders/BeamSparkRunnerRegistratorTest.java | 57 +++++++++++++++++++
3 files changed, 118 insertions(+), 34 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/aba40e2d/runners/spark/pom.xml
----------------------------------------------------------------------
diff --git a/runners/spark/pom.xml b/runners/spark/pom.xml
index d1ef225..86e9039 100644
--- a/runners/spark/pom.xml
+++ b/runners/spark/pom.xml
@@ -54,7 +54,7 @@
</profile>
<profile>
- <!-- This profile adds execution of RunnableOnService integration tests
+ <!-- This profile adds execution of RunnableOnService integration tests
against a local Spark endpoint. -->
<id>local-runnable-on-service-tests</id>
<activation><activeByDefault>false</activeByDefault></activation>
@@ -134,28 +134,14 @@
<version>${hadoop.version}</version>
<scope>provided</scope>
</dependency>
+ <!-- Kryo bugfix version needed due to a state re-use issue in Kryo version 2.21 used in Spark 1.x
+ See: https://issues.apache.org/jira/browse/SPARK-7708
+ See: https://github.com/EsotericSoftware/kryo/issues/312
+ -->
<dependency>
<groupId>com.esotericsoftware.kryo</groupId>
<artifactId>kryo</artifactId>
- <version>2.21</version>
- <scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>de.javakaffee</groupId>
- <artifactId>kryo-serializers</artifactId>
- <version>0.39</version>
- <exclusions>
- <!-- Use Spark's Kryo -->
- <exclusion>
- <groupId>com.esotericsoftware</groupId>
- <artifactId>kryo</artifactId>
- </exclusion>
- <!-- We only really need the serializer implementations -->
- <exclusion>
- <groupId>com.google.protobuf</groupId>
- <artifactId>protobuf-java</artifactId>
- </exclusion>
- </exclusions>
+ <version>2.21.1</version>
</dependency>
<dependency>
<groupId>com.google.code.findbugs</groupId>
@@ -264,6 +250,11 @@
<artifactId>metrics-core</artifactId>
<version>${dropwizard.metrics.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.reflections</groupId>
+ <artifactId>reflections</artifactId>
+ <version>0.9.10</version>
+ </dependency>
<!-- KafkaIO -->
<dependency>
@@ -405,6 +396,10 @@
<pattern>com.google.thirdparty</pattern>
<shadedPattern>org.apache.beam.spark.relocated.com.google.thirdparty</shadedPattern>
</relocation>
+ <relocation>
+ <pattern>com.esotericsoftware.kryo</pattern>
+ <shadedPattern>org.apache.beam.spark.relocated.com.esotericsoftware.kryo</shadedPattern>
+ </relocation>
</relocations>
<shadedArtifactAttached>true</shadedArtifactAttached>
<shadedClassifierName>spark-app</shadedClassifierName>
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/aba40e2d/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/BeamSparkRunnerRegistrator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/BeamSparkRunnerRegistrator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/BeamSparkRunnerRegistrator.java
index 0e62781..41b0a01 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/BeamSparkRunnerRegistrator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/BeamSparkRunnerRegistrator.java
@@ -19,28 +19,60 @@
package org.apache.beam.runners.spark.coders;
import com.esotericsoftware.kryo.Kryo;
-import de.javakaffee.kryoserializers.UnmodifiableCollectionsSerializer;
-import de.javakaffee.kryoserializers.guava.ImmutableListSerializer;
-import de.javakaffee.kryoserializers.guava.ImmutableMapSerializer;
-import de.javakaffee.kryoserializers.guava.ImmutableMultimapSerializer;
-import de.javakaffee.kryoserializers.guava.ImmutableSetSerializer;
-import de.javakaffee.kryoserializers.guava.ReverseListSerializer;
+import com.esotericsoftware.kryo.serializers.JavaSerializer;
+import com.google.common.base.Function;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import java.util.Arrays;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.Source;
import org.apache.spark.serializer.KryoRegistrator;
+import org.reflections.Reflections;
/**
- * Custom {@link com.esotericsoftware.kryo.Serializer}s for Beam's Spark runner needs.
+ * Custom {@link KryoRegistrator} that registers Beam coders and sources for Java serialization.
*/
public class BeamSparkRunnerRegistrator implements KryoRegistrator {
@Override
public void registerClasses(Kryo kryo) {
- UnmodifiableCollectionsSerializer.registerSerializers(kryo);
- // Guava
- ImmutableListSerializer.registerSerializers(kryo);
- ImmutableSetSerializer.registerSerializers(kryo);
- ImmutableMapSerializer.registerSerializers(kryo);
- ImmutableMultimapSerializer.registerSerializers(kryo);
- ReverseListSerializer.registerSerializers(kryo);
+ for (Class<?> clazz : ClassesForJavaSerialization.getClasses()) {
+ kryo.register(clazz, new JavaSerializer());
+ }
+ }
+
+ /**
+ * Register coders and sources with {@link JavaSerializer} since they aren't guaranteed to be
+ * Kryo-serializable.
+ */
+ private static class ClassesForJavaSerialization {
+ private static final Class<?>[] CLASSES_FOR_JAVA_SERIALIZATION = new Class<?>[]{
+ Coder.class, Source.class
+ };
+
+ private static final Iterable<Class<?>> INSTANCE;
+
+ /**
+ * Finds all subtypes of the classes listed in {@link #CLASSES_FOR_JAVA_SERIALIZATION}.
+ */
+ static {
+ final Reflections reflections = new Reflections();
+ INSTANCE = Iterables.concat(Lists.transform(Arrays.asList(CLASSES_FOR_JAVA_SERIALIZATION),
+ new Function<Class, Set<Class<?>>>() {
+ @SuppressWarnings({"unchecked", "ConstantConditions"})
+ @Nullable
+ @Override
+ public Set<Class<?>> apply(@Nullable Class clazz) {
+ return reflections.getSubTypesOf(clazz);
+ }
+ }));
+ }
+
+ static Iterable<Class<?>> getClasses() {
+ return INSTANCE;
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/aba40e2d/runners/spark/src/test/java/org/apache/beam/runners/spark/coders/BeamSparkRunnerRegistratorTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/coders/BeamSparkRunnerRegistratorTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/coders/BeamSparkRunnerRegistratorTest.java
new file mode 100644
index 0000000..e353017
--- /dev/null
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/coders/BeamSparkRunnerRegistratorTest.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.spark.coders;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.serializers.JavaSerializer;
+import com.google.common.collect.Iterables;
+import java.io.Serializable;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.Source;
+import org.hamcrest.Matchers;
+import org.junit.Assert;
+import org.junit.Test;
+import org.reflections.Reflections;
+
+
+/**
+ * Tests that {@link BeamSparkRunnerRegistrator} registers coders and sources with the Java serializer.
+ */
+public class BeamSparkRunnerRegistratorTest {
+ @Test
+ public void testCodersAndSourcesRegistration() {
+ BeamSparkRunnerRegistrator registrator = new BeamSparkRunnerRegistrator();
+
+ Reflections reflections = new Reflections();
+ Iterable<Class<? extends Serializable>> classesForJavaSerialization =
+ Iterables.concat(reflections.getSubTypesOf(Coder.class),
+ reflections.getSubTypesOf(Source.class));
+
+ Kryo kryo = new Kryo();
+
+ registrator.registerClasses(kryo);
+
+ for (Class<?> clazz : classesForJavaSerialization) {
+ Assert.assertThat("Registered serializer for class " + clazz.getName()
+ + " was not an instance of " + JavaSerializer.class.getName(),
+ kryo.getSerializer(clazz),
+ Matchers.instanceOf(JavaSerializer.class));
+ }
+ }
+}
[2/2] incubator-beam git commit: This closes #1552
Posted by am...@apache.org.
This closes #1552
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/bf8a3cb3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/bf8a3cb3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/bf8a3cb3
Branch: refs/heads/master
Commit: bf8a3cb3a5948bd1ef7f7e5cef230ecd4e8f1c84
Parents: e841b1a aba40e2
Author: Sela <an...@paypal.com>
Authored: Sun Dec 11 15:19:24 2016 +0200
Committer: Sela <an...@paypal.com>
Committed: Sun Dec 11 15:19:24 2016 +0200
----------------------------------------------------------------------
runners/spark/pom.xml | 35 +++++-------
.../coders/BeamSparkRunnerRegistrator.java | 60 +++++++++++++++-----
.../coders/BeamSparkRunnerRegistratorTest.java | 57 +++++++++++++++++++
3 files changed, 118 insertions(+), 34 deletions(-)
----------------------------------------------------------------------