You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jb...@apache.org on 2017/01/02 11:06:16 UTC

[1/2] beam git commit: [BEAM-1144] Spark runner fails to deserialize MicrobatchSource in cluster mode

Repository: beam
Updated Branches:
  refs/heads/master e8865843b -> e136f12c3


[BEAM-1144] Spark runner fails to deserialize MicrobatchSource in cluster mode


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1ffd6ff8
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1ffd6ff8
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1ffd6ff8

Branch: refs/heads/master
Commit: 1ffd6ff8f25806ea4421662f0bef570fc8e1eeb6
Parents: e886584
Author: Aviem Zur <av...@gmail.com>
Authored: Wed Dec 14 15:19:39 2016 +0200
Committer: Jean-Baptiste Onofré <jb...@apache.org>
Committed: Mon Jan 2 11:49:15 2017 +0100

----------------------------------------------------------------------
 runners/spark/pom.xml                           | 11 +--
 .../coders/BeamSparkRunnerRegistrator.java      |  2 +-
 .../spark/coders/StatelessJavaSerializer.java   | 97 ++++++++++++++++++++
 .../coders/BeamSparkRunnerRegistratorTest.java  |  6 +-
 4 files changed, 103 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/1ffd6ff8/runners/spark/pom.xml
----------------------------------------------------------------------
diff --git a/runners/spark/pom.xml b/runners/spark/pom.xml
index 95b1d2e..dad5718 100644
--- a/runners/spark/pom.xml
+++ b/runners/spark/pom.xml
@@ -135,14 +135,11 @@
       <version>${hadoop.version}</version>
       <scope>provided</scope>
     </dependency>
-    <!-- Kryo bugfix version needed due to a state re-use issue in Kryo version 2.21 used in Spark 1.x
-    See: https://issues.apache.org/jira/browse/SPARK-7708
-    See: https://github.com/EsotericSoftware/kryo/issues/312
-    -->
     <dependency>
       <groupId>com.esotericsoftware.kryo</groupId>
       <artifactId>kryo</artifactId>
-      <version>2.21.1</version>
+      <version>2.21</version>
+      <scope>provided</scope>
     </dependency>
     <dependency>
       <groupId>com.google.code.findbugs</groupId>
@@ -387,10 +384,6 @@
                     <pattern>com.google.thirdparty</pattern>
                     <shadedPattern>org.apache.beam.spark.relocated.com.google.thirdparty</shadedPattern>
                   </relocation>
-                  <relocation>
-                    <pattern>com.esotericsoftware.kryo</pattern>
-                    <shadedPattern>org.apache.beam.spark.relocated.com.esotericsoftware.kryo</shadedPattern>
-                  </relocation>
                 </relocations>
                 <shadedArtifactAttached>true</shadedArtifactAttached>
                 <shadedClassifierName>spark-app</shadedClassifierName>

http://git-wip-us.apache.org/repos/asf/beam/blob/1ffd6ff8/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/BeamSparkRunnerRegistrator.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/BeamSparkRunnerRegistrator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/BeamSparkRunnerRegistrator.java
index 41b0a01..93217b7 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/BeamSparkRunnerRegistrator.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/BeamSparkRunnerRegistrator.java
@@ -40,7 +40,7 @@ public class BeamSparkRunnerRegistrator implements KryoRegistrator {
   @Override
   public void registerClasses(Kryo kryo) {
     for (Class<?> clazz : ClassesForJavaSerialization.getClasses()) {
-      kryo.register(clazz, new JavaSerializer());
+      kryo.register(clazz, new StatelessJavaSerializer());
     }
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/1ffd6ff8/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/StatelessJavaSerializer.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/StatelessJavaSerializer.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/StatelessJavaSerializer.java
new file mode 100644
index 0000000..b29cf0c
--- /dev/null
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/coders/StatelessJavaSerializer.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.spark.coders;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.KryoException;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.ObjectStreamClass;
+
+
+/**
+ * Stateless Java Serializer.
+ *
+ * <p>
+ * Solves state re-use issue in Kryo version 2.21 used in Spark 1.x
+ * See:
+ * https://issues.apache.org/jira/browse/SPARK-7708
+ * https://github.com/EsotericSoftware/kryo/issues/312
+ * </p>
+ *
+ * <p>
+ * Also, solves class loading issue in cluster caused by ${@link ObjectInputStream}
+ * by using ${@link ObjectInputStreamWithClassLoader}
+ * ${@link ObjectInputStream} uses the last user-defined class loader in the stack which can be the
+ * wrong class loader.
+ * This is a known Java issue and a similar solution is often used.
+ * See:
+ * https://github.com/apache/spark/blob/v1.6.3/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala#L154
+ * https://issues.apache.org/jira/browse/GROOVY-1627
+ * https://github.com/spring-projects/spring-loaded/issues/107
+ * </p>
+ */
+class StatelessJavaSerializer extends Serializer {
+  @SuppressWarnings("unchecked")
+  public void write(Kryo kryo, Output output, Object object) {
+    try {
+      ObjectOutputStream objectStream = new ObjectOutputStream(output);
+      objectStream.writeObject(object);
+      objectStream.flush();
+    } catch (Exception e) {
+      throw new KryoException("Error during Java serialization.", e);
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  public Object read (Kryo kryo, Input input, Class type) {
+    try {
+      return new ObjectInputStreamWithClassLoader(input, kryo.getClassLoader()).readObject();
+    } catch (Exception e) {
+      throw new KryoException("Error during Java deserialization.", e);
+    }
+  }
+
+  /**
+   * ObjectInputStream with specific ClassLoader.
+   */
+  private static class ObjectInputStreamWithClassLoader extends ObjectInputStream {
+    private final ClassLoader classLoader;
+
+    ObjectInputStreamWithClassLoader(InputStream in, ClassLoader classLoader) throws IOException {
+      super(in);
+      this.classLoader = classLoader;
+    }
+
+    @Override
+    protected Class<?> resolveClass(ObjectStreamClass desc) {
+      try {
+        return Class.forName(desc.getName(), false, classLoader);
+      } catch (ClassNotFoundException e) {
+        throw new RuntimeException("Could not find class: " + desc.getName(), e);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/1ffd6ff8/runners/spark/src/test/java/org/apache/beam/runners/spark/coders/BeamSparkRunnerRegistratorTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/coders/BeamSparkRunnerRegistratorTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/coders/BeamSparkRunnerRegistratorTest.java
index e353017..0468cd0 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/coders/BeamSparkRunnerRegistratorTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/coders/BeamSparkRunnerRegistratorTest.java
@@ -19,7 +19,7 @@
 package org.apache.beam.runners.spark.coders;
 
 import com.esotericsoftware.kryo.Kryo;
-import com.esotericsoftware.kryo.serializers.JavaSerializer;
+
 import com.google.common.collect.Iterables;
 import java.io.Serializable;
 import org.apache.beam.sdk.coders.Coder;
@@ -49,9 +49,9 @@ public class BeamSparkRunnerRegistratorTest {
 
     for (Class<?> clazz : classesForJavaSerialization) {
       Assert.assertThat("Registered serializer for class " + clazz.getName()
-              + " was not an instance of " + JavaSerializer.class.getName(),
+          + " was not an instance of " + StatelessJavaSerializer.class.getName(),
           kryo.getSerializer(clazz),
-          Matchers.instanceOf(JavaSerializer.class));
+          Matchers.instanceOf(StatelessJavaSerializer.class));
     }
   }
 }


[2/2] beam git commit: [BEAM-1144] This closes #1613

Posted by jb...@apache.org.
[BEAM-1144] This closes #1613


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e136f12c
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e136f12c
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e136f12c

Branch: refs/heads/master
Commit: e136f12c37d5e044c03a3bfe34586a65a7ff6f85
Parents: e886584 1ffd6ff
Author: Jean-Baptiste Onofré <jb...@apache.org>
Authored: Mon Jan 2 12:05:14 2017 +0100
Committer: Jean-Baptiste Onofré <jb...@apache.org>
Committed: Mon Jan 2 12:05:14 2017 +0100

----------------------------------------------------------------------
 runners/spark/pom.xml                           | 11 +--
 .../coders/BeamSparkRunnerRegistrator.java      |  2 +-
 .../spark/coders/StatelessJavaSerializer.java   | 97 ++++++++++++++++++++
 .../coders/BeamSparkRunnerRegistratorTest.java  |  6 +-
 4 files changed, 103 insertions(+), 13 deletions(-)
----------------------------------------------------------------------