You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by da...@apache.org on 2017/06/08 01:35:30 UTC

[41/50] beam git commit: [BEAM-2407] Fix Flink CoderTypeSerializer ConfigSnapshot

[BEAM-2407] Fix Flink CoderTypeSerializer ConfigSnapshot

Before, the config snapshot was not deserializable because there was no
default constructor and read()/write() were not implemented.

This also changes the compatibility-check logic to compare the class
name of the Coder to avoid serializing the coder using Java
Serialization.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/62b942a0
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/62b942a0
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/62b942a0

Branch: refs/heads/DSL_SQL
Commit: 62b942a02ec633c172d543946be9cfe0648825ea
Parents: b2de3db
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Mon Jun 5 09:10:59 2017 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Wed Jun 7 19:43:11 2017 +0200

----------------------------------------------------------------------
 .../translation/types/CoderTypeSerializer.java  | 41 +++++++++++++++-----
 1 file changed, 32 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/62b942a0/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java
index bea562e..ecfd3fb 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java
@@ -19,6 +19,7 @@ package org.apache.beam.runners.flink.translation.types;
 
 import java.io.EOFException;
 import java.io.IOException;
+import java.util.Objects;
 import org.apache.beam.runners.flink.translation.wrappers.DataInputViewWrapper;
 import org.apache.beam.runners.flink.translation.wrappers.DataOutputViewWrapper;
 import org.apache.beam.sdk.coders.Coder;
@@ -139,24 +140,28 @@ public class CoderTypeSerializer<T> extends TypeSerializer<T> {
 
   @Override
   public CompatibilityResult<T> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
-    if (configSnapshot instanceof CoderTypeSerializerConfigSnapshot) {
-      if (coder.equals(((CoderTypeSerializerConfigSnapshot<?>) configSnapshot).coder)) {
-        return CompatibilityResult.compatible();
-      }
+    if (snapshotConfiguration().equals(configSnapshot)) {
+      return CompatibilityResult.compatible();
     }
     return CompatibilityResult.requiresMigration();
   }
 
   /**
-   *  TypeSerializerConfigSnapshot of CoderTypeSerializer.
+   *  TypeSerializerConfigSnapshot of CoderTypeSerializer. This uses the class name of the
+   *  {@link Coder} to determine compatibility. This is a bit crude but better than using
+   *  Java Serialization to (de)serialize the {@link Coder}.
    */
   public static class CoderTypeSerializerConfigSnapshot<T> extends TypeSerializerConfigSnapshot {
 
     private static final int VERSION = 1;
-    private Coder<T> coder;
+    private String coderName;
+
+    public CoderTypeSerializerConfigSnapshot() {
+      // empty constructor for satisfying IOReadableWritable which is used for deserialization
+    }
 
     public CoderTypeSerializerConfigSnapshot(Coder<T> coder) {
-      this.coder = coder;
+      this.coderName = coder.getClass().getCanonicalName();
     }
 
     @Override
@@ -175,13 +180,31 @@ public class CoderTypeSerializer<T> extends TypeSerializer<T> {
 
       CoderTypeSerializerConfigSnapshot<?> that = (CoderTypeSerializerConfigSnapshot<?>) o;
 
-      return coder != null ? coder.equals(that.coder) : that.coder == null;
+      return coderName != null ? coderName.equals(that.coderName) : that.coderName == null;
+    }
+
+    @Override
+    public void write(DataOutputView out) throws IOException {
+      super.write(out);
+      out.writeUTF(coderName);
+    }
+
+    @Override
+    public void read(DataInputView in) throws IOException {
+      super.read(in);
+      this.coderName = in.readUTF();
     }
 
     @Override
     public int hashCode() {
-      return coder.hashCode();
+      return Objects.hash(coderName);
     }
   }
 
+  @Override
+  public String toString() {
+    return "CoderTypeSerializer{"
+        + "coder=" + coder
+        + '}';
+  }
 }