You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by tg...@apache.org on 2017/04/29 03:43:49 UTC
[3/3] beam git commit: Java-Serialize Coders Directly
Java-Serialize Coders Directly
Known Coders will be serialized via a known path. Unknown
Coders will be serialized as a java object.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/ed0b63a4
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/ed0b63a4
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/ed0b63a4
Branch: refs/heads/master
Commit: ed0b63a4c1c55d58b94ab7574c14d221486f434a
Parents: 8cc4d59
Author: Thomas Groh <tg...@google.com>
Authored: Fri Apr 28 18:06:13 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Fri Apr 28 20:16:21 2017 -0700
----------------------------------------------------------------------
runners/core-construction-java/pom.xml | 5 ----
.../beam/runners/core/construction/Coders.java | 30 +++++++++++---------
.../runners/core/construction/CodersTest.java | 3 +-
3 files changed, 19 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/ed0b63a4/runners/core-construction-java/pom.xml
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/pom.xml b/runners/core-construction-java/pom.xml
index 854fdc1..b38e602 100644
--- a/runners/core-construction-java/pom.xml
+++ b/runners/core-construction-java/pom.xml
@@ -70,11 +70,6 @@
</dependency>
<dependency>
- <groupId>com.fasterxml.jackson.core</groupId>
- <artifactId>jackson-databind</artifactId>
- </dependency>
-
- <dependency>
<groupId>com.google.code.findbugs</groupId>
<artifactId>jsr305</artifactId>
</dependency>
http://git-wip-us.apache.org/repos/asf/beam/blob/ed0b63a4/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Coders.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Coders.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Coders.java
index 8793df4..094f21f 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Coders.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Coders.java
@@ -20,7 +20,6 @@ package org.apache.beam.runners.core.construction;
import static com.google.common.base.Preconditions.checkArgument;
-import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.BiMap;
import com.google.common.collect.ImmutableBiMap;
@@ -44,18 +43,15 @@ import org.apache.beam.sdk.common.runner.v1.RunnerApi.FunctionSpec;
import org.apache.beam.sdk.common.runner.v1.RunnerApi.SdkFunctionSpec;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow.IntervalWindowCoder;
-import org.apache.beam.sdk.util.CloudObject;
-import org.apache.beam.sdk.util.Serializer;
+import org.apache.beam.sdk.util.SerializableUtils;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
/** Converts to and from Beam Runner API representations of {@link Coder Coders}. */
public class Coders {
- private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
-
// This URN says that the coder is just a UDF blob this SDK understands
// TODO: standardize such things
- public static final String CUSTOM_CODER_URN = "urn:beam:coders:javasdk:0.1";
+ public static final String JAVA_SERIALIZED_CODER_URN = "urn:beam:coders:javasdk:0.1";
// The URNs for coders which are shared across languages
@VisibleForTesting
@@ -71,6 +67,15 @@ public class Coders {
.put(FullWindowedValueCoder.class, "urn:beam:coders:windowed_value:0.1")
.build();
+ public static RunnerApi.MessageWithComponents toProto(Coder<?> coder) throws IOException {
+ SdkComponents components = SdkComponents.create();
+ RunnerApi.Coder coderProto = toProto(coder, components);
+ return RunnerApi.MessageWithComponents.newBuilder()
+ .setCoder(coderProto)
+ .setComponents(components.toComponents())
+ .build();
+ }
+
public static RunnerApi.Coder toProto(
Coder<?> coder, @SuppressWarnings("unused") SdkComponents components) throws IOException {
if (KNOWN_CODER_URNS.containsKey(coder.getClass())) {
@@ -108,13 +113,13 @@ public class Coders {
SdkFunctionSpec.newBuilder()
.setSpec(
FunctionSpec.newBuilder()
- .setUrn(CUSTOM_CODER_URN)
+ .setUrn(JAVA_SERIALIZED_CODER_URN)
.setParameter(
Any.pack(
BytesValue.newBuilder()
.setValue(
ByteString.copyFrom(
- OBJECT_MAPPER.writeValueAsBytes(coder.asCloudObject())))
+ SerializableUtils.serializeToByteArray(coder)))
.build()))))
.build();
}
@@ -122,7 +127,7 @@ public class Coders {
public static Coder<?> fromProto(RunnerApi.Coder protoCoder, Components components)
throws IOException {
String coderSpecUrn = protoCoder.getSpec().getSpec().getUrn();
- if (coderSpecUrn.equals(CUSTOM_CODER_URN)) {
+ if (coderSpecUrn.equals(JAVA_SERIALIZED_CODER_URN)) {
return fromCustomCoder(protoCoder, components);
}
return fromKnownCoder(protoCoder, components);
@@ -165,8 +170,8 @@ public class Coders {
private static Coder<?> fromCustomCoder(
RunnerApi.Coder protoCoder, @SuppressWarnings("unused") Components components)
throws IOException {
- CloudObject coderCloudObject =
- OBJECT_MAPPER.readValue(
+ return (Coder<?>)
+ SerializableUtils.deserializeFromByteArray(
protoCoder
.getSpec()
.getSpec()
@@ -174,7 +179,6 @@ public class Coders {
.unpack(BytesValue.class)
.getValue()
.toByteArray(),
- CloudObject.class);
- return Serializer.deserialize(coderCloudObject, Coder.class);
+ protoCoder.getSpec().getSpec().getUrn());
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/ed0b63a4/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CodersTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CodersTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CodersTest.java
index ca0fdc9..ecd0fa5 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CodersTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CodersTest.java
@@ -133,7 +133,8 @@ public class CodersTest {
if (KNOWN_CODERS.contains(coder)) {
for (RunnerApi.Coder encodedCoder : encodedComponents.getCodersMap().values()) {
assertThat(
- encodedCoder.getSpec().getSpec().getUrn(), not(equalTo(Coders.CUSTOM_CODER_URN)));
+ encodedCoder.getSpec().getSpec().getUrn(),
+ not(equalTo(Coders.JAVA_SERIALIZED_CODER_URN)));
}
}
}