You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by tg...@apache.org on 2017/04/29 03:43:47 UTC
[1/3] beam git commit: This closes #2773
Repository: beam
Updated Branches:
refs/heads/master 8cc4d59c2 -> 2b6cb8ca1
This closes #2773
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/2b6cb8ca
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/2b6cb8ca
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/2b6cb8ca
Branch: refs/heads/master
Commit: 2b6cb8ca17a8a87aa9db2e5725f018ef564f27d7
Parents: 8cc4d59 e2b5d6e
Author: Thomas Groh <tg...@google.com>
Authored: Fri Apr 28 20:16:21 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Fri Apr 28 20:16:21 2017 -0700
----------------------------------------------------------------------
runners/core-construction-java/pom.xml | 5 ----
.../beam/runners/core/construction/Coders.java | 30 +++++++++++---------
.../runners/core/construction/CodersTest.java | 3 +-
.../beam/runners/core/BeamFnDataReadRunner.java | 14 ++++-----
.../runners/core/BeamFnDataWriteRunner.java | 15 +++++-----
.../control/ProcessBundleHandlerTest.java | 28 +++++++++++-------
.../runners/core/BeamFnDataReadRunnerTest.java | 12 ++++----
.../runners/core/BeamFnDataWriteRunnerTest.java | 12 ++++----
8 files changed, 62 insertions(+), 57 deletions(-)
----------------------------------------------------------------------
[2/3] beam git commit: Use Runner API Encodings in the Fn API
Posted by tg...@apache.org.
Use Runner API Encodings in the Fn API
Removes uses of Coder.asCloudObject
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e2b5d6ea
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e2b5d6ea
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e2b5d6ea
Branch: refs/heads/master
Commit: e2b5d6ea8a5d41ac27245a4999f59a73dfe24c43
Parents: ed0b63a
Author: Thomas Groh <tg...@google.com>
Authored: Fri Apr 28 18:06:38 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Fri Apr 28 20:16:21 2017 -0700
----------------------------------------------------------------------
.../beam/runners/core/BeamFnDataReadRunner.java | 14 +++++-----
.../runners/core/BeamFnDataWriteRunner.java | 15 +++++------
.../control/ProcessBundleHandlerTest.java | 28 ++++++++++++--------
.../runners/core/BeamFnDataReadRunnerTest.java | 12 ++++-----
.../runners/core/BeamFnDataWriteRunnerTest.java | 12 ++++-----
5 files changed, 43 insertions(+), 38 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/e2b5d6ea/sdks/java/harness/src/main/java/org/apache/beam/runners/core/BeamFnDataReadRunner.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/runners/core/BeamFnDataReadRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/runners/core/BeamFnDataReadRunner.java
index 034ef84..805d480 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/runners/core/BeamFnDataReadRunner.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/runners/core/BeamFnDataReadRunner.java
@@ -21,7 +21,6 @@ package org.apache.beam.runners.core;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.ImmutableList;
-import com.google.protobuf.BytesValue;
import java.io.IOException;
import java.util.Collection;
import java.util.Map;
@@ -30,8 +29,9 @@ import java.util.function.Supplier;
import org.apache.beam.fn.harness.data.BeamFnDataClient;
import org.apache.beam.fn.harness.fn.ThrowingConsumer;
import org.apache.beam.fn.v1.BeamFnApi;
+import org.apache.beam.runners.core.construction.Coders;
import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.util.Serializer;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi.MessageWithComponents;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.slf4j.Logger;
@@ -73,12 +73,12 @@ public class BeamFnDataReadRunner<OutputT> {
this.beamFnDataClientFactory = beamFnDataClientFactory;
this.consumers = ImmutableList.copyOf(FluentIterable.concat(outputMap.values()));
+ MessageWithComponents runnerApiCoder =
+ coderSpec.getFunctionSpec().getData().unpack(MessageWithComponents.class);
@SuppressWarnings("unchecked")
- Coder<WindowedValue<OutputT>> coder = Serializer.deserialize(
- OBJECT_MAPPER.readValue(
- coderSpec.getFunctionSpec().getData().unpack(BytesValue.class).getValue().newInput(),
- Map.class),
- Coder.class);
+ Coder<WindowedValue<OutputT>> coder =
+ (Coder<WindowedValue<OutputT>>)
+ Coders.fromProto(runnerApiCoder.getCoder(), runnerApiCoder.getComponents());
this.coder = coder;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/e2b5d6ea/sdks/java/harness/src/main/java/org/apache/beam/runners/core/BeamFnDataWriteRunner.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/runners/core/BeamFnDataWriteRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/runners/core/BeamFnDataWriteRunner.java
index 54fd626..0ba09e3 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/runners/core/BeamFnDataWriteRunner.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/runners/core/BeamFnDataWriteRunner.java
@@ -19,15 +19,14 @@
package org.apache.beam.runners.core;
import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.protobuf.BytesValue;
import java.io.IOException;
-import java.util.Map;
import java.util.function.Supplier;
import org.apache.beam.fn.harness.data.BeamFnDataClient;
import org.apache.beam.fn.harness.fn.CloseableThrowingConsumer;
import org.apache.beam.fn.v1.BeamFnApi;
+import org.apache.beam.runners.core.construction.Coders;
import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.util.Serializer;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi.MessageWithComponents;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
@@ -61,12 +60,12 @@ public class BeamFnDataWriteRunner<InputT> {
this.processBundleInstructionIdSupplier = processBundleInstructionIdSupplier;
this.outputTarget = outputTarget;
+ MessageWithComponents runnerApiCoder =
+ coderSpec.getFunctionSpec().getData().unpack(MessageWithComponents.class);
@SuppressWarnings("unchecked")
- Coder<WindowedValue<InputT>> coder = Serializer.deserialize(
- OBJECT_MAPPER.readValue(
- coderSpec.getFunctionSpec().getData().unpack(BytesValue.class).getValue().newInput(),
- Map.class),
- Coder.class);
+ Coder<WindowedValue<InputT>> coder =
+ (Coder<WindowedValue<InputT>>)
+ Coders.fromProto(runnerApiCoder.getCoder(), runnerApiCoder.getComponents());
this.coder = coder;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/e2b5d6ea/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java
index bd2fba9..5987267 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java
@@ -62,6 +62,7 @@ import org.apache.beam.fn.harness.fn.CloseableThrowingConsumer;
import org.apache.beam.fn.harness.fn.ThrowingConsumer;
import org.apache.beam.fn.harness.fn.ThrowingRunnable;
import org.apache.beam.fn.v1.BeamFnApi;
+import org.apache.beam.runners.core.construction.Coders;
import org.apache.beam.runners.dataflow.util.DoFnInfo;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
@@ -106,18 +107,23 @@ public class ProcessBundleHandlerTest {
static {
try {
STRING_CODER_SPEC =
- BeamFnApi.Coder.newBuilder().setFunctionSpec(BeamFnApi.FunctionSpec.newBuilder()
- .setId(STRING_CODER_SPEC_ID)
- .setData(Any.pack(BytesValue.newBuilder().setValue(ByteString.copyFrom(
- OBJECT_MAPPER.writeValueAsBytes(STRING_CODER.asCloudObject()))).build())))
- .build();
+ BeamFnApi.Coder.newBuilder()
+ .setFunctionSpec(
+ BeamFnApi.FunctionSpec.newBuilder()
+ .setId(STRING_CODER_SPEC_ID)
+ .setData(Any.pack(Coders.toProto(STRING_CODER))))
+ .build();
LONG_CODER_SPEC =
- BeamFnApi.Coder.newBuilder().setFunctionSpec(BeamFnApi.FunctionSpec.newBuilder()
- .setId(STRING_CODER_SPEC_ID)
- .setData(Any.pack(BytesValue.newBuilder().setValue(ByteString.copyFrom(
- OBJECT_MAPPER.writeValueAsBytes(WindowedValue.getFullCoder(
- VarLongCoder.of(), GlobalWindow.Coder.INSTANCE).asCloudObject()))).build())))
- .build();
+ BeamFnApi.Coder.newBuilder()
+ .setFunctionSpec(
+ BeamFnApi.FunctionSpec.newBuilder()
+ .setId(STRING_CODER_SPEC_ID)
+ .setData(
+ Any.pack(
+ Coders.toProto(
+ WindowedValue.getFullCoder(
+ VarLongCoder.of(), GlobalWindow.Coder.INSTANCE)))))
+ .build();
} catch (IOException e) {
throw new ExceptionInInitializerError(e);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/e2b5d6ea/sdks/java/harness/src/test/java/org/apache/beam/runners/core/BeamFnDataReadRunnerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/runners/core/BeamFnDataReadRunnerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/runners/core/BeamFnDataReadRunnerTest.java
index 0cc5ef9..0d036fe 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/runners/core/BeamFnDataReadRunnerTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/runners/core/BeamFnDataReadRunnerTest.java
@@ -32,8 +32,6 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.Uninterruptibles;
import com.google.protobuf.Any;
-import com.google.protobuf.ByteString;
-import com.google.protobuf.BytesValue;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
@@ -48,6 +46,7 @@ import org.apache.beam.fn.harness.fn.ThrowingConsumer;
import org.apache.beam.fn.harness.test.TestExecutors;
import org.apache.beam.fn.harness.test.TestExecutors.TestExecutorService;
import org.apache.beam.fn.v1.BeamFnApi;
+import org.apache.beam.runners.core.construction.Coders;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
@@ -76,10 +75,11 @@ public class BeamFnDataReadRunnerTest {
private static final BeamFnApi.Coder CODER_SPEC;
static {
try {
- CODER_SPEC = BeamFnApi.Coder.newBuilder().setFunctionSpec(BeamFnApi.FunctionSpec.newBuilder()
- .setData(Any.pack(BytesValue.newBuilder().setValue(ByteString.copyFrom(
- OBJECT_MAPPER.writeValueAsBytes(CODER.asCloudObject()))).build())))
- .build();
+ CODER_SPEC =
+ BeamFnApi.Coder.newBuilder()
+ .setFunctionSpec(
+ BeamFnApi.FunctionSpec.newBuilder().setData(Any.pack(Coders.toProto(CODER))))
+ .build();
} catch (IOException e) {
throw new ExceptionInInitializerError(e);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/e2b5d6ea/sdks/java/harness/src/test/java/org/apache/beam/runners/core/BeamFnDataWriteRunnerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/runners/core/BeamFnDataWriteRunnerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/runners/core/BeamFnDataWriteRunnerTest.java
index 378567a..50fee7a 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/runners/core/BeamFnDataWriteRunnerTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/runners/core/BeamFnDataWriteRunnerTest.java
@@ -30,14 +30,13 @@ import static org.mockito.Mockito.when;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.protobuf.Any;
-import com.google.protobuf.ByteString;
-import com.google.protobuf.BytesValue;
import java.io.IOException;
import java.util.ArrayList;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.beam.fn.harness.data.BeamFnDataClient;
import org.apache.beam.fn.harness.fn.CloseableThrowingConsumer;
import org.apache.beam.fn.v1.BeamFnApi;
+import org.apache.beam.runners.core.construction.Coders;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
@@ -64,10 +63,11 @@ public class BeamFnDataWriteRunnerTest {
private static final BeamFnApi.Coder CODER_SPEC;
static {
try {
- CODER_SPEC = BeamFnApi.Coder.newBuilder().setFunctionSpec(BeamFnApi.FunctionSpec.newBuilder()
- .setData(Any.pack(BytesValue.newBuilder().setValue(ByteString.copyFrom(
- OBJECT_MAPPER.writeValueAsBytes(CODER.asCloudObject()))).build())))
- .build();
+ CODER_SPEC =
+ BeamFnApi.Coder.newBuilder()
+ .setFunctionSpec(
+ BeamFnApi.FunctionSpec.newBuilder().setData(Any.pack(Coders.toProto(CODER))))
+ .build();
} catch (IOException e) {
throw new ExceptionInInitializerError(e);
}
[3/3] beam git commit: Java-Serialize Coders Directly
Posted by tg...@apache.org.
Java-Serialize Coders Directly
Known Coders will be serialized via a known path. Unknown
Coders will be Java-serialized, i.e. serialized as a Java object.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/ed0b63a4
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/ed0b63a4
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/ed0b63a4
Branch: refs/heads/master
Commit: ed0b63a4c1c55d58b94ab7574c14d221486f434a
Parents: 8cc4d59
Author: Thomas Groh <tg...@google.com>
Authored: Fri Apr 28 18:06:13 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Fri Apr 28 20:16:21 2017 -0700
----------------------------------------------------------------------
runners/core-construction-java/pom.xml | 5 ----
.../beam/runners/core/construction/Coders.java | 30 +++++++++++---------
.../runners/core/construction/CodersTest.java | 3 +-
3 files changed, 19 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/ed0b63a4/runners/core-construction-java/pom.xml
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/pom.xml b/runners/core-construction-java/pom.xml
index 854fdc1..b38e602 100644
--- a/runners/core-construction-java/pom.xml
+++ b/runners/core-construction-java/pom.xml
@@ -70,11 +70,6 @@
</dependency>
<dependency>
- <groupId>com.fasterxml.jackson.core</groupId>
- <artifactId>jackson-databind</artifactId>
- </dependency>
-
- <dependency>
<groupId>com.google.code.findbugs</groupId>
<artifactId>jsr305</artifactId>
</dependency>
http://git-wip-us.apache.org/repos/asf/beam/blob/ed0b63a4/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Coders.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Coders.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Coders.java
index 8793df4..094f21f 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Coders.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Coders.java
@@ -20,7 +20,6 @@ package org.apache.beam.runners.core.construction;
import static com.google.common.base.Preconditions.checkArgument;
-import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.BiMap;
import com.google.common.collect.ImmutableBiMap;
@@ -44,18 +43,15 @@ import org.apache.beam.sdk.common.runner.v1.RunnerApi.FunctionSpec;
import org.apache.beam.sdk.common.runner.v1.RunnerApi.SdkFunctionSpec;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow.IntervalWindowCoder;
-import org.apache.beam.sdk.util.CloudObject;
-import org.apache.beam.sdk.util.Serializer;
+import org.apache.beam.sdk.util.SerializableUtils;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
/** Converts to and from Beam Runner API representations of {@link Coder Coders}. */
public class Coders {
- private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
-
// This URN says that the coder is just a UDF blob this SDK understands
// TODO: standardize such things
- public static final String CUSTOM_CODER_URN = "urn:beam:coders:javasdk:0.1";
+ public static final String JAVA_SERIALIZED_CODER_URN = "urn:beam:coders:javasdk:0.1";
// The URNs for coders which are shared across languages
@VisibleForTesting
@@ -71,6 +67,15 @@ public class Coders {
.put(FullWindowedValueCoder.class, "urn:beam:coders:windowed_value:0.1")
.build();
+ public static RunnerApi.MessageWithComponents toProto(Coder<?> coder) throws IOException {
+ SdkComponents components = SdkComponents.create();
+ RunnerApi.Coder coderProto = toProto(coder, components);
+ return RunnerApi.MessageWithComponents.newBuilder()
+ .setCoder(coderProto)
+ .setComponents(components.toComponents())
+ .build();
+ }
+
public static RunnerApi.Coder toProto(
Coder<?> coder, @SuppressWarnings("unused") SdkComponents components) throws IOException {
if (KNOWN_CODER_URNS.containsKey(coder.getClass())) {
@@ -108,13 +113,13 @@ public class Coders {
SdkFunctionSpec.newBuilder()
.setSpec(
FunctionSpec.newBuilder()
- .setUrn(CUSTOM_CODER_URN)
+ .setUrn(JAVA_SERIALIZED_CODER_URN)
.setParameter(
Any.pack(
BytesValue.newBuilder()
.setValue(
ByteString.copyFrom(
- OBJECT_MAPPER.writeValueAsBytes(coder.asCloudObject())))
+ SerializableUtils.serializeToByteArray(coder)))
.build()))))
.build();
}
@@ -122,7 +127,7 @@ public class Coders {
public static Coder<?> fromProto(RunnerApi.Coder protoCoder, Components components)
throws IOException {
String coderSpecUrn = protoCoder.getSpec().getSpec().getUrn();
- if (coderSpecUrn.equals(CUSTOM_CODER_URN)) {
+ if (coderSpecUrn.equals(JAVA_SERIALIZED_CODER_URN)) {
return fromCustomCoder(protoCoder, components);
}
return fromKnownCoder(protoCoder, components);
@@ -165,8 +170,8 @@ public class Coders {
private static Coder<?> fromCustomCoder(
RunnerApi.Coder protoCoder, @SuppressWarnings("unused") Components components)
throws IOException {
- CloudObject coderCloudObject =
- OBJECT_MAPPER.readValue(
+ return (Coder<?>)
+ SerializableUtils.deserializeFromByteArray(
protoCoder
.getSpec()
.getSpec()
@@ -174,7 +179,6 @@ public class Coders {
.unpack(BytesValue.class)
.getValue()
.toByteArray(),
- CloudObject.class);
- return Serializer.deserialize(coderCloudObject, Coder.class);
+ protoCoder.getSpec().getSpec().getUrn());
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/ed0b63a4/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CodersTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CodersTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CodersTest.java
index ca0fdc9..ecd0fa5 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CodersTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CodersTest.java
@@ -133,7 +133,8 @@ public class CodersTest {
if (KNOWN_CODERS.contains(coder)) {
for (RunnerApi.Coder encodedCoder : encodedComponents.getCodersMap().values()) {
assertThat(
- encodedCoder.getSpec().getSpec().getUrn(), not(equalTo(Coders.CUSTOM_CODER_URN)));
+ encodedCoder.getSpec().getSpec().getUrn(),
+ not(equalTo(Coders.JAVA_SERIALIZED_CODER_URN)));
}
}
}