You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by da...@apache.org on 2017/06/08 01:35:30 UTC
[41/50] beam git commit: [BEAM-2407] Fix Flink CoderTyperSerializer
ConfigSnapshot
[BEAM-2407] Fix Flink CoderTyperSerializer ConfigSnapshot
Before, the config snapshot was not deserializable because there was no
default constructor and read()/write() where not implemented.
This also changes the compatibility-check logic to compare the class
name of the Coder to avoid serializing the coder using Java
Serialization.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/62b942a0
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/62b942a0
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/62b942a0
Branch: refs/heads/DSL_SQL
Commit: 62b942a02ec633c172d543946be9cfe0648825ea
Parents: b2de3db
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Mon Jun 5 09:10:59 2017 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Wed Jun 7 19:43:11 2017 +0200
----------------------------------------------------------------------
.../translation/types/CoderTypeSerializer.java | 41 +++++++++++++++-----
1 file changed, 32 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/62b942a0/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java
index bea562e..ecfd3fb 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java
@@ -19,6 +19,7 @@ package org.apache.beam.runners.flink.translation.types;
import java.io.EOFException;
import java.io.IOException;
+import java.util.Objects;
import org.apache.beam.runners.flink.translation.wrappers.DataInputViewWrapper;
import org.apache.beam.runners.flink.translation.wrappers.DataOutputViewWrapper;
import org.apache.beam.sdk.coders.Coder;
@@ -139,24 +140,28 @@ public class CoderTypeSerializer<T> extends TypeSerializer<T> {
@Override
public CompatibilityResult<T> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
- if (configSnapshot instanceof CoderTypeSerializerConfigSnapshot) {
- if (coder.equals(((CoderTypeSerializerConfigSnapshot<?>) configSnapshot).coder)) {
- return CompatibilityResult.compatible();
- }
+ if (snapshotConfiguration().equals(configSnapshot)) {
+ return CompatibilityResult.compatible();
}
return CompatibilityResult.requiresMigration();
}
/**
- * TypeSerializerConfigSnapshot of CoderTypeSerializer.
+ * TypeSerializerConfigSnapshot of CoderTypeSerializer. This uses the class name of the
+ * {@link Coder} to determine compatibility. This is a bit crude but better than using
+ * Java Serialization to (de)serialize the {@link Coder}.
*/
public static class CoderTypeSerializerConfigSnapshot<T> extends TypeSerializerConfigSnapshot {
private static final int VERSION = 1;
- private Coder<T> coder;
+ private String coderName;
+
+ public CoderTypeSerializerConfigSnapshot() {
+ // empty constructor for satisfying IOReadableWritable which is used for deserialization
+ }
public CoderTypeSerializerConfigSnapshot(Coder<T> coder) {
- this.coder = coder;
+ this.coderName = coder.getClass().getCanonicalName();
}
@Override
@@ -175,13 +180,31 @@ public class CoderTypeSerializer<T> extends TypeSerializer<T> {
CoderTypeSerializerConfigSnapshot<?> that = (CoderTypeSerializerConfigSnapshot<?>) o;
- return coder != null ? coder.equals(that.coder) : that.coder == null;
+ return coderName != null ? coderName.equals(that.coderName) : that.coderName == null;
+ }
+
+ @Override
+ public void write(DataOutputView out) throws IOException {
+ super.write(out);
+ out.writeUTF(coderName);
+ }
+
+ @Override
+ public void read(DataInputView in) throws IOException {
+ super.read(in);
+ this.coderName = in.readUTF();
}
@Override
public int hashCode() {
- return coder.hashCode();
+ return Objects.hash(coderName);
}
}
+ @Override
+ public String toString() {
+ return "CoderTypeSerializer{"
+ + "coder=" + coder
+ + '}';
+ }
}