You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by am...@apache.org on 2016/10/26 16:09:59 UTC
[1/2] incubator-beam git commit: [BEAM-809] Create a KryoRegistrator for the SparkRunner.
Repository: incubator-beam
Updated Branches:
refs/heads/master 53fe3ee42 -> 78e2c0387
[BEAM-809] Create a KryoRegistrator for the SparkRunner.
Use Class#getName() instead of Class#getCanonicalName().
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/13b83858
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/13b83858
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/13b83858
Branch: refs/heads/master
Commit: 13b83858746356068a6d618e04da6839e837d28c
Parents: 53fe3ee
Author: Sela <an...@paypal.com>
Authored: Mon Oct 24 22:35:39 2016 +0300
Committer: Sela <an...@paypal.com>
Committed: Wed Oct 26 18:53:28 2016 +0300
----------------------------------------------------------------------
runners/spark/pom.xml | 23 ++++++++++
.../coders/BeamSparkRunnerRegistrator.java | 46 ++++++++++++++++++++
.../spark/translation/SparkContextFactory.java | 5 ++-
3 files changed, 73 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/13b83858/runners/spark/pom.xml
----------------------------------------------------------------------
diff --git a/runners/spark/pom.xml b/runners/spark/pom.xml
index ccec3c6..458205a 100644
--- a/runners/spark/pom.xml
+++ b/runners/spark/pom.xml
@@ -147,6 +147,29 @@
<scope>provided</scope>
</dependency>
<dependency>
+ <groupId>com.esotericsoftware.kryo</groupId>
+ <artifactId>kryo</artifactId>
+ <version>2.21</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>de.javakaffee</groupId>
+ <artifactId>kryo-serializers</artifactId>
+ <version>0.39</version>
+ <exclusions>
+ <!-- Use Spark's Kryo -->
+ <exclusion>
+ <groupId>com.esotericsoftware</groupId>
+ <artifactId>kryo</artifactId>
+ </exclusion>
+ <!-- We only really need the serializer implementations -->
+ <exclusion>
+ <groupId>com.google.protobuf</groupId>
+ <artifactId>protobuf-java</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
<groupId>com.google.code.findbugs</groupId>
<artifactId>jsr305</artifactId>
<version>1.3.9</version>
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/13b83858/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/BeamSparkRunnerRegistrator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/BeamSparkRunnerRegistrator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/BeamSparkRunnerRegistrator.java
new file mode 100644
index 0000000..0e62781
--- /dev/null
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/BeamSparkRunnerRegistrator.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.spark.coders;
+
+import com.esotericsoftware.kryo.Kryo;
+import de.javakaffee.kryoserializers.UnmodifiableCollectionsSerializer;
+import de.javakaffee.kryoserializers.guava.ImmutableListSerializer;
+import de.javakaffee.kryoserializers.guava.ImmutableMapSerializer;
+import de.javakaffee.kryoserializers.guava.ImmutableMultimapSerializer;
+import de.javakaffee.kryoserializers.guava.ImmutableSetSerializer;
+import de.javakaffee.kryoserializers.guava.ReverseListSerializer;
+import org.apache.spark.serializer.KryoRegistrator;
+
+
+/**
+ * Custom {@link com.esotericsoftware.kryo.Serializer}s for Beam's Spark runner needs.
+ */
+public class BeamSparkRunnerRegistrator implements KryoRegistrator {
+
+ @Override
+ public void registerClasses(Kryo kryo) {
+ UnmodifiableCollectionsSerializer.registerSerializers(kryo);
+ // Guava
+ ImmutableListSerializer.registerSerializers(kryo);
+ ImmutableSetSerializer.registerSerializers(kryo);
+ ImmutableMapSerializer.registerSerializers(kryo);
+ ImmutableMultimapSerializer.registerSerializers(kryo);
+ ReverseListSerializer.registerSerializers(kryo);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/13b83858/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java
index 4877f6e..ee2104a 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java
@@ -19,6 +19,7 @@
package org.apache.beam.runners.spark.translation;
import org.apache.beam.runners.spark.SparkPipelineOptions;
+import org.apache.beam.runners.spark.coders.BeamSparkRunnerRegistrator;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.serializer.KryoSerializer;
@@ -85,7 +86,9 @@ public final class SparkContextFactory {
conf.setMaster(options.getSparkMaster());
}
conf.setAppName(options.getAppName());
- conf.set("spark.serializer", KryoSerializer.class.getCanonicalName());
+ // register immutable collections serializers because the SDK uses them.
+ conf.set("spark.kryo.registrator", BeamSparkRunnerRegistrator.class.getName());
+ conf.set("spark.serializer", KryoSerializer.class.getName());
return new JavaSparkContext(conf);
}
}
[2/2] incubator-beam git commit: This closes #1171
Posted by am...@apache.org.
This closes #1171
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/78e2c038
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/78e2c038
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/78e2c038
Branch: refs/heads/master
Commit: 78e2c038766063081a8c045607c74ad158afdf68
Parents: 53fe3ee 13b8385
Author: Sela <an...@paypal.com>
Authored: Wed Oct 26 18:54:09 2016 +0300
Committer: Sela <an...@paypal.com>
Committed: Wed Oct 26 18:54:09 2016 +0300
----------------------------------------------------------------------
runners/spark/pom.xml | 23 ++++++++++
.../coders/BeamSparkRunnerRegistrator.java | 46 ++++++++++++++++++++
.../spark/translation/SparkContextFactory.java | 5 ++-
3 files changed, 73 insertions(+), 1 deletion(-)
----------------------------------------------------------------------