You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jb...@apache.org on 2017/01/02 11:06:16 UTC
[1/2] beam git commit: [BEAM-1144] Spark runner fails to deserialize
MicrobatchSource in cluster mode
Repository: beam
Updated Branches:
refs/heads/master e8865843b -> e136f12c3
[BEAM-1144] Spark runner fails to deserialize MicrobatchSource in cluster mode
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1ffd6ff8
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1ffd6ff8
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1ffd6ff8
Branch: refs/heads/master
Commit: 1ffd6ff8f25806ea4421662f0bef570fc8e1eeb6
Parents: e886584
Author: Aviem Zur <av...@gmail.com>
Authored: Wed Dec 14 15:19:39 2016 +0200
Committer: Jean-Baptiste Onofré <jb...@apache.org>
Committed: Mon Jan 2 11:49:15 2017 +0100
----------------------------------------------------------------------
runners/spark/pom.xml | 11 +--
.../coders/BeamSparkRunnerRegistrator.java | 2 +-
.../spark/coders/StatelessJavaSerializer.java | 97 ++++++++++++++++++++
.../coders/BeamSparkRunnerRegistratorTest.java | 6 +-
4 files changed, 103 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/1ffd6ff8/runners/spark/pom.xml
----------------------------------------------------------------------
diff --git a/runners/spark/pom.xml b/runners/spark/pom.xml
index 95b1d2e..dad5718 100644
--- a/runners/spark/pom.xml
+++ b/runners/spark/pom.xml
@@ -135,14 +135,11 @@
<version>${hadoop.version}</version>
<scope>provided</scope>
</dependency>
- <!-- Kryo bugfix version needed due to a state re-use issue in Kryo version 2.21 used in Spark 1.x
- See: https://issues.apache.org/jira/browse/SPARK-7708
- See: https://github.com/EsotericSoftware/kryo/issues/312
- -->
<dependency>
<groupId>com.esotericsoftware.kryo</groupId>
<artifactId>kryo</artifactId>
- <version>2.21.1</version>
+ <version>2.21</version>
+ <scope>provided</scope>
</dependency>
<dependency>
<groupId>com.google.code.findbugs</groupId>
@@ -387,10 +384,6 @@
<pattern>com.google.thirdparty</pattern>
<shadedPattern>org.apache.beam.spark.relocated.com.google.thirdparty</shadedPattern>
</relocation>
- <relocation>
- <pattern>com.esotericsoftware.kryo</pattern>
- <shadedPattern>org.apache.beam.spark.relocated.com.esotericsoftware.kryo</shadedPattern>
- </relocation>
</relocations>
<shadedArtifactAttached>true</shadedArtifactAttached>
<shadedClassifierName>spark-app</shadedClassifierName>
http://git-wip-us.apache.org/repos/asf/beam/blob/1ffd6ff8/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/BeamSparkRunnerRegistrator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/BeamSparkRunnerRegistrator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/BeamSparkRunnerRegistrator.java
index 41b0a01..93217b7 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/BeamSparkRunnerRegistrator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/BeamSparkRunnerRegistrator.java
@@ -40,7 +40,7 @@ public class BeamSparkRunnerRegistrator implements KryoRegistrator {
@Override
public void registerClasses(Kryo kryo) {
for (Class<?> clazz : ClassesForJavaSerialization.getClasses()) {
- kryo.register(clazz, new JavaSerializer());
+ kryo.register(clazz, new StatelessJavaSerializer());
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/1ffd6ff8/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/StatelessJavaSerializer.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/StatelessJavaSerializer.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/StatelessJavaSerializer.java
new file mode 100644
index 0000000..b29cf0c
--- /dev/null
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/StatelessJavaSerializer.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.spark.coders;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.KryoException;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.ObjectStreamClass;
+
+
+/**
+ * Stateless Java Serializer.
+ *
+ * <p>Solves the state re-use issue in Kryo version 2.21, used in Spark 1.x, by creating a new
+ * object stream for every call. See:
+ * https://issues.apache.org/jira/browse/SPARK-7708
+ * https://github.com/EsotericSoftware/kryo/issues/312
+ *
+ * <p>Also solves a class loading issue in cluster mode: a plain {@link ObjectInputStream}
+ * resolves classes with the last user-defined class loader on the stack, which can be the
+ * wrong one, so classes are resolved with {@link Kryo#getClassLoader()} instead.
+ * This is a known Java issue and a similar solution is often used. See:
+ * https://github.com/apache/spark/blob/v1.6.3/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala#L154
+ * https://issues.apache.org/jira/browse/GROOVY-1627
+ * https://github.com/spring-projects/spring-loaded/issues/107
+ */
+class StatelessJavaSerializer extends Serializer {
+  /** Writes {@code object} to {@code output} through a new {@link ObjectOutputStream}. */
+  @Override
+  @SuppressWarnings("unchecked")
+  public void write(Kryo kryo, Output output, Object object) {
+    try {
+      ObjectOutputStream objectStream = new ObjectOutputStream(output);
+      objectStream.writeObject(object);
+      // Flush rather than close: closing the object stream would also close output.
+      objectStream.flush();
+    } catch (Exception e) {
+      throw new KryoException("Error during Java serialization.", e);
+    }
+  }
+
+  /** Reads one object from {@code input}, resolving classes with Kryo's class loader. */
+  @Override
+  @SuppressWarnings("unchecked")
+  public Object read(Kryo kryo, Input input, Class type) {
+    try {
+      return new ObjectInputStreamWithClassLoader(input, kryo.getClassLoader()).readObject();
+    } catch (Exception e) {
+      throw new KryoException("Error during Java deserialization.", e);
+    }
+  }
+
+  /** An {@link ObjectInputStream} that resolves classes with a specific {@link ClassLoader}. */
+  private static class ObjectInputStreamWithClassLoader extends ObjectInputStream {
+    private final ClassLoader classLoader;
+
+    ObjectInputStreamWithClassLoader(InputStream in, ClassLoader classLoader) throws IOException {
+      super(in);
+      this.classLoader = classLoader;
+    }
+
+    @Override
+    protected Class<?> resolveClass(ObjectStreamClass desc)
+        throws IOException, ClassNotFoundException {
+      try {
+        return Class.forName(desc.getName(), false, classLoader);
+      } catch (ClassNotFoundException e) {
+        // Class.forName cannot load primitive types or void by name ("int", "void", ...);
+        // fall back to the default resolution, which knows those names.
+        return super.resolveClass(desc);
+      }
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/1ffd6ff8/runners/spark/src/test/java/org/apache/beam/runners/spark/coders/BeamSparkRunnerRegistratorTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/coders/BeamSparkRunnerRegistratorTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/coders/BeamSparkRunnerRegistratorTest.java
index e353017..0468cd0 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/coders/BeamSparkRunnerRegistratorTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/coders/BeamSparkRunnerRegistratorTest.java
@@ -19,7 +19,7 @@
package org.apache.beam.runners.spark.coders;
import com.esotericsoftware.kryo.Kryo;
-import com.esotericsoftware.kryo.serializers.JavaSerializer;
+
import com.google.common.collect.Iterables;
import java.io.Serializable;
import org.apache.beam.sdk.coders.Coder;
@@ -49,9 +49,9 @@ public class BeamSparkRunnerRegistratorTest {
for (Class<?> clazz : classesForJavaSerialization) {
Assert.assertThat("Registered serializer for class " + clazz.getName()
- + " was not an instance of " + JavaSerializer.class.getName(),
+ + " was not an instance of " + StatelessJavaSerializer.class.getName(),
kryo.getSerializer(clazz),
- Matchers.instanceOf(JavaSerializer.class));
+ Matchers.instanceOf(StatelessJavaSerializer.class));
}
}
}
[2/2] beam git commit: [BEAM-1144] This closes #1613
Posted by jb...@apache.org.
[BEAM-1144] This closes #1613
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e136f12c
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e136f12c
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e136f12c
Branch: refs/heads/master
Commit: e136f12c37d5e044c03a3bfe34586a65a7ff6f85
Parents: e886584 1ffd6ff
Author: Jean-Baptiste Onofré <jb...@apache.org>
Authored: Mon Jan 2 12:05:14 2017 +0100
Committer: Jean-Baptiste Onofré <jb...@apache.org>
Committed: Mon Jan 2 12:05:14 2017 +0100
----------------------------------------------------------------------
runners/spark/pom.xml | 11 +--
.../coders/BeamSparkRunnerRegistrator.java | 2 +-
.../spark/coders/StatelessJavaSerializer.java | 97 ++++++++++++++++++++
.../coders/BeamSparkRunnerRegistratorTest.java | 6 +-
4 files changed, 103 insertions(+), 13 deletions(-)
----------------------------------------------------------------------