You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2017/04/12 16:20:53 UTC
[1/2] beam git commit: Use SdkComponents in WindowingStrategy.toProto
Repository: beam
Updated Branches:
refs/heads/master 7e603d5c7 -> 571631a5e
Use SdkComponents in WindowingStrategy.toProto
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e0fd0a22
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e0fd0a22
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e0fd0a22
Branch: refs/heads/master
Commit: e0fd0a222510b733b98d6c33c694431139bbc40d
Parents: 7e603d5
Author: Thomas Groh <tg...@google.com>
Authored: Fri Apr 7 13:41:29 2017 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Wed Apr 12 09:20:43 2017 -0700
----------------------------------------------------------------------
.../beam/runners/core/construction/Coders.java | 6 +-
.../core/construction/SdkComponents.java | 5 +-
.../core/construction/WindowingStrategies.java | 94 ++++++++------------
.../core/construction/SdkComponentsTest.java | 6 +-
.../construction/WindowingStrategiesTest.java | 18 ++++
5 files changed, 66 insertions(+), 63 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/e0fd0a22/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Coders.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Coders.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Coders.java
index d890de7..7b96240 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Coders.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Coders.java
@@ -76,8 +76,10 @@ public class Coders {
private static RunnerApi.Coder toKnownCoder(Coder<?> coder, SdkComponents components)
throws IOException {
List<String> componentIds = new ArrayList<>();
- for (Coder<?> componentCoder : coder.getCoderArguments()) {
- componentIds.add(components.registerCoder(componentCoder));
+ if (coder.getCoderArguments() != null) {
+ for (Coder<?> componentCoder : coder.getCoderArguments()) {
+ componentIds.add(components.registerCoder(componentCoder));
+ }
}
return RunnerApi.Coder.newBuilder()
.addAllComponentCoderIds(componentIds)
http://git-wip-us.apache.org/repos/asf/beam/blob/e0fd0a22/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java
index 5cb0a00..03f3a03 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java
@@ -96,7 +96,7 @@ class SdkComponents {
* unique ID for the {@link WindowingStrategy}. Multiple registrations of the same {@link
* WindowingStrategy} will return the same unique ID.
*/
- String registerWindowingStrategy(WindowingStrategy<?, ?> windowingStrategy) {
+ String registerWindowingStrategy(WindowingStrategy<?, ?> windowingStrategy) throws IOException {
String existing = windowingStrategyIds.get(windowingStrategy);
if (existing != null) {
return existing;
@@ -108,6 +108,9 @@ class SdkComponents {
NameUtils.approximateSimpleName(windowingStrategy.getWindowFn()));
String name = uniqify(baseName, windowingStrategyIds.values());
windowingStrategyIds.put(windowingStrategy, name);
+ RunnerApi.WindowingStrategy windowingStrategyProto =
+ WindowingStrategies.toProto(windowingStrategy, this);
+ componentsBuilder.putWindowingStrategies(name, windowingStrategyProto);
return name;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/e0fd0a22/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategies.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategies.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategies.java
index 353be05..6d721b0 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategies.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategies.java
@@ -19,15 +19,12 @@ package org.apache.beam.runners.core.construction;
import static com.google.common.base.Preconditions.checkArgument;
-import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.protobuf.Any;
import com.google.protobuf.ByteString;
import com.google.protobuf.BytesValue;
import com.google.protobuf.InvalidProtocolBufferException;
import java.io.IOException;
import java.io.Serializable;
-import java.util.UUID;
-import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.common.runner.v1.RunnerApi;
import org.apache.beam.sdk.common.runner.v1.RunnerApi.Components;
import org.apache.beam.sdk.common.runner.v1.RunnerApi.FunctionSpec;
@@ -127,62 +124,30 @@ public class WindowingStrategies implements Serializable {
}
}
- // This URN says that the coder is just a UDF blob the indicated SDK understands
- // TODO: standardize such things
- public static final String CUSTOM_CODER_URN = "urn:beam:coders:javasdk:0.1";
-
// This URN says that the WindowFn is just a UDF blob the indicated SDK understands
// TODO: standardize such things
public static final String CUSTOM_WINDOWFN_URN = "urn:beam:windowfn:javasdk:0.1";
- private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
-
/**
- * Converts a {@link WindowFn} into a {@link RunnerApi.MessageWithComponents} where
- * {@link RunnerApi.MessageWithComponents#getFunctionSpec()} is a {@link FunctionSpec}
- * for the input {@link WindowFn}.
+ * Converts a {@link WindowFn} into a {@link RunnerApi.MessageWithComponents} where {@link
+ * RunnerApi.MessageWithComponents#getFunctionSpec()} is a {@link RunnerApi.FunctionSpec} for the
+ * input {@link WindowFn}.
*/
- public static RunnerApi.MessageWithComponents toProto(WindowFn<?, ?> windowFn)
+ public static SdkFunctionSpec toProto(
+ WindowFn<?, ?> windowFn, @SuppressWarnings("unused") SdkComponents components)
throws IOException {
- Coder<?> windowCoder = windowFn.windowCoder();
-
- // TODO: re-use components
- String windowCoderId = UUID.randomUUID().toString();
-
- SdkFunctionSpec windowFnSpec =
- SdkFunctionSpec.newBuilder()
- .setSpec(
- FunctionSpec.newBuilder()
- .setUrn(CUSTOM_WINDOWFN_URN)
- .setParameter(
- Any.pack(
- BytesValue.newBuilder()
- .setValue(
- ByteString.copyFrom(
- SerializableUtils.serializeToByteArray(windowFn)))
- .build())))
- .build();
-
- RunnerApi.Coder windowCoderProto =
- RunnerApi.Coder.newBuilder()
- .setSpec(
- SdkFunctionSpec.newBuilder()
- .setSpec(
- FunctionSpec.newBuilder()
- .setUrn(CUSTOM_CODER_URN)
- .setParameter(
- Any.pack(
- BytesValue.newBuilder()
- .setValue(
- ByteString.copyFrom(
- OBJECT_MAPPER.writeValueAsBytes(
- windowCoder.asCloudObject())))
- .build()))))
- .build();
-
- return RunnerApi.MessageWithComponents.newBuilder()
- .setSdkFunctionSpec(windowFnSpec)
- .setComponents(Components.newBuilder().putCoders(windowCoderId, windowCoderProto))
+ return SdkFunctionSpec.newBuilder()
+ // TODO: Set environment ID
+ .setSpec(
+ FunctionSpec.newBuilder()
+ .setUrn(CUSTOM_WINDOWFN_URN)
+ .setParameter(
+ Any.pack(
+ BytesValue.newBuilder()
+ .setValue(
+ ByteString.copyFrom(
+ SerializableUtils.serializeToByteArray(windowFn)))
+ .build())))
.build();
}
@@ -194,9 +159,22 @@ public class WindowingStrategies implements Serializable {
*/
public static RunnerApi.MessageWithComponents toProto(WindowingStrategy<?, ?> windowingStrategy)
throws IOException {
+ SdkComponents components = SdkComponents.create();
+ RunnerApi.WindowingStrategy windowingStrategyProto = toProto(windowingStrategy, components);
- RunnerApi.MessageWithComponents windowFnWithComponents =
- toProto(windowingStrategy.getWindowFn());
+ return RunnerApi.MessageWithComponents.newBuilder()
+ .setWindowingStrategy(windowingStrategyProto)
+ .setComponents(components.toComponents())
+ .build();
+ }
+
+ /**
+ * Converts a {@link WindowingStrategy} into a {@link RunnerApi.WindowingStrategy}, registering
+ * any components in the provided {@link SdkComponents}.
+ */
+ public static RunnerApi.WindowingStrategy toProto(
+ WindowingStrategy<?, ?> windowingStrategy, SdkComponents components) throws IOException {
+ SdkFunctionSpec windowFnSpec = toProto(windowingStrategy.getWindowFn(), components);
RunnerApi.WindowingStrategy.Builder windowingStrategyProto =
RunnerApi.WindowingStrategy.newBuilder()
@@ -205,11 +183,11 @@ public class WindowingStrategies implements Serializable {
.setClosingBehavior(toProto(windowingStrategy.getClosingBehavior()))
.setAllowedLateness(windowingStrategy.getAllowedLateness().getMillis())
.setTrigger(Triggers.toProto(windowingStrategy.getTrigger()))
- .setWindowFn(windowFnWithComponents.getSdkFunctionSpec());
+ .setWindowFn(windowFnSpec)
+ .setWindowCoderId(
+ components.registerCoder(windowingStrategy.getWindowFn().windowCoder()));
- return RunnerApi.MessageWithComponents.newBuilder()
- .setWindowingStrategy(windowingStrategyProto)
- .setComponents(windowFnWithComponents.getComponents()).build();
+ return windowingStrategyProto.build();
}
/**
http://git-wip-us.apache.org/repos/asf/beam/blob/e0fd0a22/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SdkComponentsTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SdkComponentsTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SdkComponentsTest.java
index 28b4911..ef4b16b 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SdkComponentsTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SdkComponentsTest.java
@@ -132,15 +132,17 @@ public class SdkComponentsTest {
}
@Test
- public void registerWindowingStrategy() {
+ public void registerWindowingStrategy() throws IOException {
WindowingStrategy<?, ?> strategy =
WindowingStrategy.globalDefault().withMode(AccumulationMode.ACCUMULATING_FIRED_PANES);
String name = components.registerWindowingStrategy(strategy);
assertThat(name, not(isEmptyOrNullString()));
+
+ components.toComponents().getWindowingStrategiesOrThrow(name);
}
@Test
- public void registerWindowingStrategyIdEqualStrategies() {
+ public void registerWindowingStrategyIdEqualStrategies() throws IOException {
WindowingStrategy<?, ?> strategy =
WindowingStrategy.globalDefault().withMode(AccumulationMode.ACCUMULATING_FIRED_PANES);
String name = components.registerWindowingStrategy(strategy);
http://git-wip-us.apache.org/repos/asf/beam/blob/e0fd0a22/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WindowingStrategiesTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WindowingStrategiesTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WindowingStrategiesTest.java
index b603d65..62bba8e 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WindowingStrategiesTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WindowingStrategiesTest.java
@@ -22,6 +22,7 @@ import static org.junit.Assert.assertThat;
import com.google.auto.value.AutoValue;
import com.google.common.collect.ImmutableList;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi;
import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
@@ -30,6 +31,7 @@ import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.util.WindowingStrategy.AccumulationMode;
+import org.hamcrest.Matchers;
import org.joda.time.Duration;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -89,4 +91,20 @@ public class WindowingStrategiesTest {
toProtoAndBackWindowingStrategy,
equalTo((WindowingStrategy) windowingStrategy.fixDefaults()));
}
+
+ @Test
+ public void testToProtoAndBackWithComponents() throws Exception {
+ WindowingStrategy<?, ?> windowingStrategy = toProtoAndBackSpec.getWindowingStrategy();
+ SdkComponents components = SdkComponents.create();
+ RunnerApi.WindowingStrategy proto =
+ WindowingStrategies.toProto(windowingStrategy, components);
+ RunnerApi.Components protoComponents = components.toComponents();
+
+ assertThat(
+ WindowingStrategies.fromProto(proto, protoComponents).fixDefaults(),
+ Matchers.<WindowingStrategy<?, ?>>equalTo(windowingStrategy.fixDefaults()));
+
+ protoComponents.getCodersOrThrow(
+ components.registerCoder(windowingStrategy.getWindowFn().windowCoder()));
+ }
}
[2/2] beam git commit: This closes #2490
Posted by dh...@apache.org.
This closes #2490
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/571631a5
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/571631a5
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/571631a5
Branch: refs/heads/master
Commit: 571631a5ef83ef31ea9f652c5698cea3cc95ad43
Parents: 7e603d5 e0fd0a2
Author: Dan Halperin <dh...@google.com>
Authored: Wed Apr 12 09:20:46 2017 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Wed Apr 12 09:20:46 2017 -0700
----------------------------------------------------------------------
.../beam/runners/core/construction/Coders.java | 6 +-
.../core/construction/SdkComponents.java | 5 +-
.../core/construction/WindowingStrategies.java | 94 ++++++++------------
.../core/construction/SdkComponentsTest.java | 6 +-
.../construction/WindowingStrategiesTest.java | 18 ++++
5 files changed, 66 insertions(+), 63 deletions(-)
----------------------------------------------------------------------