You are viewing a plain-text version of this content; the hyperlink to its canonical version ("here") was not preserved in this rendering.
Posted to commits@beam.apache.org by am...@apache.org on 2017/01/02 13:52:33 UTC
[1/2] beam git commit: [BEAM-1146] Decrease spark runner startup overhead
Repository: beam
Updated Branches:
refs/heads/master e136f12c3 -> ee69825ea
[BEAM-1146] Decrease spark runner startup overhead
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d0fe004d
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d0fe004d
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d0fe004d
Branch: refs/heads/master
Commit: d0fe004db98b4f1743939f357da4193bc3759f77
Parents: e136f12
Author: Aviem Zur <av...@gmail.com>
Authored: Mon Jan 2 14:42:47 2017 +0200
Committer: Aviem Zur <av...@gmail.com>
Committed: Mon Jan 2 14:52:37 2017 +0200
----------------------------------------------------------------------
runners/spark/pom.xml | 5 --
.../coders/BeamSparkRunnerRegistrator.java | 48 ++---------------
.../coders/BeamSparkRunnerRegistratorTest.java | 57 --------------------
.../streaming/KafkaStreamingTest.java | 57 +++++++++++++++++++-
4 files changed, 59 insertions(+), 108 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/d0fe004d/runners/spark/pom.xml
----------------------------------------------------------------------
diff --git a/runners/spark/pom.xml b/runners/spark/pom.xml
index dad5718..d9d45dd 100644
--- a/runners/spark/pom.xml
+++ b/runners/spark/pom.xml
@@ -247,11 +247,6 @@
<artifactId>metrics-core</artifactId>
<version>${dropwizard.metrics.version}</version>
</dependency>
- <dependency>
- <groupId>org.reflections</groupId>
- <artifactId>reflections</artifactId>
- <version>0.9.10</version>
- </dependency>
<!-- KafkaIO -->
<dependency>
http://git-wip-us.apache.org/repos/asf/beam/blob/d0fe004d/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/BeamSparkRunnerRegistrator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/BeamSparkRunnerRegistrator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/BeamSparkRunnerRegistrator.java
index 93217b7..9d63ab0 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/BeamSparkRunnerRegistrator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/BeamSparkRunnerRegistrator.java
@@ -19,17 +19,8 @@
package org.apache.beam.runners.spark.coders;
import com.esotericsoftware.kryo.Kryo;
-import com.esotericsoftware.kryo.serializers.JavaSerializer;
-import com.google.common.base.Function;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-import java.util.Arrays;
-import java.util.Set;
-import javax.annotation.Nullable;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.io.Source;
+import org.apache.beam.runners.spark.io.MicrobatchSource;
import org.apache.spark.serializer.KryoRegistrator;
-import org.reflections.Reflections;
/**
@@ -39,40 +30,7 @@ public class BeamSparkRunnerRegistrator implements KryoRegistrator {
@Override
public void registerClasses(Kryo kryo) {
- for (Class<?> clazz : ClassesForJavaSerialization.getClasses()) {
- kryo.register(clazz, new StatelessJavaSerializer());
- }
- }
-
- /**
- * Register coders and sources with {@link JavaSerializer} since they aren't guaranteed to be
- * Kryo-serializable.
- */
- private static class ClassesForJavaSerialization {
- private static final Class<?>[] CLASSES_FOR_JAVA_SERIALIZATION = new Class<?>[]{
- Coder.class, Source.class
- };
-
- private static final Iterable<Class<?>> INSTANCE;
-
- /**
- * Find all subclasses of ${@link CLASSES_FOR_JAVA_SERIALIZATION}
- */
- static {
- final Reflections reflections = new Reflections();
- INSTANCE = Iterables.concat(Lists.transform(Arrays.asList(CLASSES_FOR_JAVA_SERIALIZATION),
- new Function<Class, Set<Class<?>>>() {
- @SuppressWarnings({"unchecked", "ConstantConditions"})
- @Nullable
- @Override
- public Set<Class<?>> apply(@Nullable Class clazz) {
- return reflections.getSubTypesOf(clazz);
- }
- }));
- }
-
- static Iterable<Class<?>> getClasses() {
- return INSTANCE;
- }
+ // MicrobatchSource is serialized as data and may not be Kryo-serializable.
+ kryo.register(MicrobatchSource.class, new StatelessJavaSerializer());
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/d0fe004d/runners/spark/src/test/java/org/apache/beam/runners/spark/coders/BeamSparkRunnerRegistratorTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/coders/BeamSparkRunnerRegistratorTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/coders/BeamSparkRunnerRegistratorTest.java
deleted file mode 100644
index 0468cd0..0000000
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/coders/BeamSparkRunnerRegistratorTest.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.spark.coders;
-
-import com.esotericsoftware.kryo.Kryo;
-
-import com.google.common.collect.Iterables;
-import java.io.Serializable;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.io.Source;
-import org.hamcrest.Matchers;
-import org.junit.Assert;
-import org.junit.Test;
-import org.reflections.Reflections;
-
-
-/**
- * BeamSparkRunnerRegistrator Test.
- */
-public class BeamSparkRunnerRegistratorTest {
- @Test
- public void testCodersAndSourcesRegistration() {
- BeamSparkRunnerRegistrator registrator = new BeamSparkRunnerRegistrator();
-
- Reflections reflections = new Reflections();
- Iterable<Class<? extends Serializable>> classesForJavaSerialization =
- Iterables.concat(reflections.getSubTypesOf(Coder.class),
- reflections.getSubTypesOf(Source.class));
-
- Kryo kryo = new Kryo();
-
- registrator.registerClasses(kryo);
-
- for (Class<?> clazz : classesForJavaSerialization) {
- Assert.assertThat("Registered serializer for class " + clazz.getName()
- + " was not an instance of " + StatelessJavaSerializer.class.getName(),
- kryo.getSerializer(clazz),
- Matchers.instanceOf(StatelessJavaSerializer.class));
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/d0fe004d/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java
index 6be92d0..0853e9f 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java
@@ -17,8 +17,13 @@
*/
package org.apache.beam.runners.spark.translation.streaming;
+import com.fasterxml.jackson.annotation.JsonCreator;
import com.google.common.collect.ImmutableMap;
import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectStreamException;
+import java.io.OutputStream;
+import java.io.Serializable;
import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
@@ -30,6 +35,9 @@ import org.apache.beam.runners.spark.translation.streaming.utils.KafkaWriteOnBat
import org.apache.beam.runners.spark.translation.streaming.utils.PAssertStreaming;
import org.apache.beam.runners.spark.translation.streaming.utils.SparkTestPipelineOptionsForStreaming;
import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.CustomCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.kafka.KafkaIO;
@@ -52,6 +60,7 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
+
/**
* Test Kafka as input.
*/
@@ -163,7 +172,7 @@ public class KafkaStreamingTest {
.withBootstrapServers(EMBEDDED_KAFKA_CLUSTER.getBrokerList())
.withTopics(Collections.singletonList(topic))
.withKeyCoder(StringUtf8Coder.of())
- .withValueCoder(StringUtf8Coder.of())
+ .withValueCoder(NonKryoSerializableStringCoder.of())
.updateConsumerProperties(consumerProps);
PCollection<String> formatted =
@@ -212,4 +221,50 @@ public class KafkaStreamingTest {
}
}
+ /**
+ * This coder is not Kryo serializable, used to make sure
+ * {@link org.apache.beam.runners.spark.coders.BeamSparkRunnerRegistrator} registers needed
+ * classes to ensure Java serialization is used instead.
+ */
+ private static class NonKryoSerializableStringCoder extends CustomCoder<String>
+ implements Serializable {
+ private Coder<String> stringCoder;
+ private Boolean isSerialized = false;
+
+ private NonKryoSerializableStringCoder() {
+ }
+
+ @JsonCreator
+ public static NonKryoSerializableStringCoder of() {
+ return new NonKryoSerializableStringCoder();
+ }
+
+ private Object readResolve() throws ObjectStreamException {
+ NonKryoSerializableStringCoder deserialized = new NonKryoSerializableStringCoder();
+ deserialized.stringCoder = StringUtf8Coder.of();
+ deserialized.isSerialized = true;
+ return deserialized;
+ }
+
+ private Object writeReplace() throws ObjectStreamException {
+ return new NonKryoSerializableStringCoder();
+ }
+
+ @Override
+ public void encode(String value, OutputStream outStream, Context context)
+ throws CoderException, IOException {
+ if (!isSerialized) {
+ this.stringCoder = StringUtf8Coder.of();
+ }
+ stringCoder.encode(value, outStream, context);
+ }
+
+ @Override
+ public String decode(InputStream inStream, Context context) throws CoderException, IOException {
+ if (!isSerialized) {
+ this.stringCoder = StringUtf8Coder.of();
+ }
+ return stringCoder.decode(inStream, context);
+ }
+ }
}
[2/2] beam git commit: This closes #1674
Posted by am...@apache.org.
This closes #1674
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/ee69825e
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/ee69825e
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/ee69825e
Branch: refs/heads/master
Commit: ee69825ea7984deeb05dba05e0f98b4ae9866d35
Parents: e136f12 d0fe004
Author: Sela <an...@paypal.com>
Authored: Mon Jan 2 15:30:38 2017 +0200
Committer: Sela <an...@paypal.com>
Committed: Mon Jan 2 15:30:38 2017 +0200
----------------------------------------------------------------------
runners/spark/pom.xml | 5 --
.../coders/BeamSparkRunnerRegistrator.java | 48 ++---------------
.../coders/BeamSparkRunnerRegistratorTest.java | 57 --------------------
.../streaming/KafkaStreamingTest.java | 57 +++++++++++++++++++-
4 files changed, 59 insertions(+), 108 deletions(-)
----------------------------------------------------------------------