You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by tg...@apache.org on 2017/04/11 02:25:04 UTC
[1/2] beam git commit: Move WindowingStrategies to
runners-core-construction
Repository: beam
Updated Branches:
refs/heads/master 7fd9c6516 -> 0a0b1c80c
Move WindowingStrategies to runners-core-construction
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/40b36686
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/40b36686
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/40b36686
Branch: refs/heads/master
Commit: 40b36686bac65995d606fe5cea7aa957d0ef3f9d
Parents: 7fd9c65
Author: Thomas Groh <tg...@google.com>
Authored: Mon Apr 10 14:35:59 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Mon Apr 10 16:44:23 2017 -0700
----------------------------------------------------------------------
runners/core-construction-java/pom.xml | 17 ++
.../core/construction/WindowingStrategies.java | 268 +++++++++++++++++++
.../construction/WindowingStrategiesTest.java | 92 +++++++
.../beam/sdk/util/WindowingStrategies.java | 267 ------------------
.../beam/sdk/util/WindowingStrategiesTest.java | 91 -------
5 files changed, 377 insertions(+), 358 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/40b36686/runners/core-construction-java/pom.xml
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/pom.xml b/runners/core-construction-java/pom.xml
index ee64f91..3f323dd 100644
--- a/runners/core-construction-java/pom.xml
+++ b/runners/core-construction-java/pom.xml
@@ -58,17 +58,28 @@
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-common-runner-api</artifactId>
</dependency>
+
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-core</artifactId>
</dependency>
<dependency>
+ <groupId>com.google.protobuf</groupId>
+ <artifactId>protobuf-java</artifactId>
+ </dependency>
+
+ <dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
</dependency>
<dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ </dependency>
+
+ <dependency>
<groupId>com.google.code.findbugs</groupId>
<artifactId>jsr305</artifactId>
</dependency>
@@ -88,6 +99,12 @@
<artifactId>slf4j-api</artifactId>
</dependency>
+ <dependency>
+ <groupId>com.google.auto.value</groupId>
+ <artifactId>auto-value</artifactId>
+ <scope>provided</scope>
+ </dependency>
+
<!-- test dependencies -->
<dependency>
http://git-wip-us.apache.org/repos/asf/beam/blob/40b36686/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategies.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategies.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategies.java
new file mode 100644
index 0000000..353be05
--- /dev/null
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategies.java
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core.construction;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.protobuf.Any;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.BytesValue;
+import com.google.protobuf.InvalidProtocolBufferException;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.UUID;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi.Components;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi.FunctionSpec;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi.SdkFunctionSpec;
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
+import org.apache.beam.sdk.transforms.windowing.Trigger;
+import org.apache.beam.sdk.transforms.windowing.Triggers;
+import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.util.SerializableUtils;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.util.WindowingStrategy.AccumulationMode;
+import org.apache.beam.sdk.util.WindowingStrategy.CombineWindowFnOutputTimes;
+import org.joda.time.Duration;
+
+/** Utilities for working with {@link WindowingStrategy WindowingStrategies}. */
+public class WindowingStrategies implements Serializable {
+
+ public static AccumulationMode fromProto(RunnerApi.AccumulationMode proto) {
+ switch (proto) {
+ case DISCARDING:
+ return AccumulationMode.DISCARDING_FIRED_PANES;
+ case ACCUMULATING:
+ return AccumulationMode.ACCUMULATING_FIRED_PANES;
+ case UNRECOGNIZED:
+ default:
+ // Whether or not it is proto that cannot recognize it (due to the version of the
+ // generated code we link to) or the switch hasn't been updated to handle it,
+ // the situation is the same: we don't know what this OutputTime means
+ throw new IllegalArgumentException(
+ String.format(
+ "Cannot convert unknown %s to %s: %s",
+ RunnerApi.AccumulationMode.class.getCanonicalName(),
+ AccumulationMode.class.getCanonicalName(),
+ proto));
+ }
+ }
+
+ public static RunnerApi.AccumulationMode toProto(AccumulationMode accumulationMode) {
+ switch (accumulationMode) {
+ case DISCARDING_FIRED_PANES:
+ return RunnerApi.AccumulationMode.DISCARDING;
+ case ACCUMULATING_FIRED_PANES:
+ return RunnerApi.AccumulationMode.ACCUMULATING;
+ default:
+ throw new IllegalArgumentException(
+ String.format(
+ "Cannot convert unknown %s to %s: %s",
+ AccumulationMode.class.getCanonicalName(),
+ RunnerApi.AccumulationMode.class.getCanonicalName(),
+ accumulationMode));
+ }
+ }
+
+ public static RunnerApi.ClosingBehavior toProto(ClosingBehavior closingBehavior) {
+ switch (closingBehavior) {
+ case FIRE_ALWAYS:
+ return RunnerApi.ClosingBehavior.EMIT_ALWAYS;
+ case FIRE_IF_NON_EMPTY:
+ return RunnerApi.ClosingBehavior.EMIT_IF_NONEMPTY;
+ default:
+ throw new IllegalArgumentException(
+ String.format(
+ "Cannot convert unknown %s to %s: %s",
+ ClosingBehavior.class.getCanonicalName(),
+ RunnerApi.ClosingBehavior.class.getCanonicalName(),
+ closingBehavior));
+ }
+ }
+
+ public static ClosingBehavior fromProto(RunnerApi.ClosingBehavior proto) {
+ switch (proto) {
+ case EMIT_ALWAYS:
+ return ClosingBehavior.FIRE_ALWAYS;
+ case EMIT_IF_NONEMPTY:
+ return ClosingBehavior.FIRE_IF_NON_EMPTY;
+ case UNRECOGNIZED:
+ default:
+ // Whether or not it is proto that cannot recognize it (due to the version of the
+ // generated code we link to) or the switch hasn't been updated to handle it,
+ // the situation is the same: we don't know what this OutputTime means
+ throw new IllegalArgumentException(
+ String.format(
+ "Cannot convert unknown %s to %s: %s",
+ RunnerApi.ClosingBehavior.class.getCanonicalName(),
+ ClosingBehavior.class.getCanonicalName(),
+ proto));
+ }
+ }
+
+ public static RunnerApi.OutputTime toProto(OutputTimeFn<?> outputTimeFn) {
+ if (outputTimeFn instanceof WindowingStrategy.CombineWindowFnOutputTimes) {
+ return toProto(((CombineWindowFnOutputTimes<?>) outputTimeFn).getOutputTimeFn());
+ } else {
+ return OutputTimeFns.toProto(outputTimeFn);
+ }
+ }
+
+ // This URN says that the coder is just a UDF blob the indicated SDK understands
+ // TODO: standardize such things
+ public static final String CUSTOM_CODER_URN = "urn:beam:coders:javasdk:0.1";
+
+ // This URN says that the WindowFn is just a UDF blob the indicated SDK understands
+ // TODO: standardize such things
+ public static final String CUSTOM_WINDOWFN_URN = "urn:beam:windowfn:javasdk:0.1";
+
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+ /**
+ * Converts a {@link WindowFn} into a {@link RunnerApi.MessageWithComponents} where
+ * {@link RunnerApi.MessageWithComponents#getFunctionSpec()} is a {@link FunctionSpec}
+ * for the input {@link WindowFn}.
+ */
+ public static RunnerApi.MessageWithComponents toProto(WindowFn<?, ?> windowFn)
+ throws IOException {
+ Coder<?> windowCoder = windowFn.windowCoder();
+
+ // TODO: re-use components
+ String windowCoderId = UUID.randomUUID().toString();
+
+ SdkFunctionSpec windowFnSpec =
+ SdkFunctionSpec.newBuilder()
+ .setSpec(
+ FunctionSpec.newBuilder()
+ .setUrn(CUSTOM_WINDOWFN_URN)
+ .setParameter(
+ Any.pack(
+ BytesValue.newBuilder()
+ .setValue(
+ ByteString.copyFrom(
+ SerializableUtils.serializeToByteArray(windowFn)))
+ .build())))
+ .build();
+
+ RunnerApi.Coder windowCoderProto =
+ RunnerApi.Coder.newBuilder()
+ .setSpec(
+ SdkFunctionSpec.newBuilder()
+ .setSpec(
+ FunctionSpec.newBuilder()
+ .setUrn(CUSTOM_CODER_URN)
+ .setParameter(
+ Any.pack(
+ BytesValue.newBuilder()
+ .setValue(
+ ByteString.copyFrom(
+ OBJECT_MAPPER.writeValueAsBytes(
+ windowCoder.asCloudObject())))
+ .build()))))
+ .build();
+
+ return RunnerApi.MessageWithComponents.newBuilder()
+ .setSdkFunctionSpec(windowFnSpec)
+ .setComponents(Components.newBuilder().putCoders(windowCoderId, windowCoderProto))
+ .build();
+ }
+
+ /**
+ * Converts a {@link WindowingStrategy} into a {@link RunnerApi.MessageWithComponents} where
+ * {@link RunnerApi.MessageWithComponents#getWindowingStrategy()} ()} is a {@link
+ * RunnerApi.WindowingStrategy RunnerApi.WindowingStrategy (proto)} for the input {@link
+ * WindowingStrategy}.
+ */
+ public static RunnerApi.MessageWithComponents toProto(WindowingStrategy<?, ?> windowingStrategy)
+ throws IOException {
+
+ RunnerApi.MessageWithComponents windowFnWithComponents =
+ toProto(windowingStrategy.getWindowFn());
+
+ RunnerApi.WindowingStrategy.Builder windowingStrategyProto =
+ RunnerApi.WindowingStrategy.newBuilder()
+ .setOutputTime(toProto(windowingStrategy.getOutputTimeFn()))
+ .setAccumulationMode(toProto(windowingStrategy.getMode()))
+ .setClosingBehavior(toProto(windowingStrategy.getClosingBehavior()))
+ .setAllowedLateness(windowingStrategy.getAllowedLateness().getMillis())
+ .setTrigger(Triggers.toProto(windowingStrategy.getTrigger()))
+ .setWindowFn(windowFnWithComponents.getSdkFunctionSpec());
+
+ return RunnerApi.MessageWithComponents.newBuilder()
+ .setWindowingStrategy(windowingStrategyProto)
+ .setComponents(windowFnWithComponents.getComponents()).build();
+ }
+
+ /**
+ * Converts from a {@link RunnerApi.WindowingStrategy} accompanied by {@link Components}
+ * to the SDK's {@link WindowingStrategy}.
+ */
+ public static WindowingStrategy<?, ?> fromProto(RunnerApi.MessageWithComponents proto)
+ throws InvalidProtocolBufferException {
+ switch (proto.getRootCase()) {
+ case WINDOWING_STRATEGY:
+ return fromProto(proto.getWindowingStrategy(), proto.getComponents());
+ default:
+ throw new IllegalArgumentException(
+ String.format(
+ "Expected a %s with components but received %s",
+ RunnerApi.WindowingStrategy.class.getCanonicalName(), proto));
+ }
+ }
+
+ /**
+ * Converts from {@link RunnerApi.WindowingStrategy} to the SDK's {@link WindowingStrategy} using
+ * the provided components to dereferences identifiers found in the proto.
+ */
+ public static WindowingStrategy<?, ?> fromProto(
+ RunnerApi.WindowingStrategy proto, Components components)
+ throws InvalidProtocolBufferException {
+
+ SdkFunctionSpec windowFnSpec = proto.getWindowFn();
+
+ checkArgument(
+ windowFnSpec.getSpec().getUrn().equals(CUSTOM_WINDOWFN_URN),
+ "Only Java-serialized %s instances are supported, with URN %s. But found URN %s",
+ WindowFn.class.getSimpleName(),
+ CUSTOM_WINDOWFN_URN,
+ windowFnSpec.getSpec().getUrn());
+
+ Object deserializedWindowFn =
+ SerializableUtils.deserializeFromByteArray(
+ windowFnSpec.getSpec().getParameter().unpack(BytesValue.class).getValue().toByteArray(),
+ "WindowFn");
+
+ WindowFn<?, ?> windowFn = (WindowFn<?, ?>) deserializedWindowFn;
+ OutputTimeFn<?> outputTimeFn = OutputTimeFns.fromProto(proto.getOutputTime());
+ AccumulationMode accumulationMode = fromProto(proto.getAccumulationMode());
+ Trigger trigger = Triggers.fromProto(proto.getTrigger());
+ ClosingBehavior closingBehavior = fromProto(proto.getClosingBehavior());
+ Duration allowedLateness = Duration.millis(proto.getAllowedLateness());
+
+ return WindowingStrategy.of(windowFn)
+ .withAllowedLateness(allowedLateness)
+ .withMode(accumulationMode)
+ .withTrigger(trigger)
+ .withOutputTimeFn(outputTimeFn)
+ .withClosingBehavior(closingBehavior);
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/40b36686/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WindowingStrategiesTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WindowingStrategiesTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WindowingStrategiesTest.java
new file mode 100644
index 0000000..b603d65
--- /dev/null
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WindowingStrategiesTest.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core.construction;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+import com.google.auto.value.AutoValue;
+import com.google.common.collect.ImmutableList;
+import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
+import org.apache.beam.sdk.transforms.windowing.Trigger;
+import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.util.WindowingStrategy.AccumulationMode;
+import org.joda.time.Duration;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
+
+/** Unit tests for {@link WindowingStrategy}. */
+@RunWith(Parameterized.class)
+public class WindowingStrategiesTest {
+
+ // Each spec activates tests of all subsets of its fields
+ @AutoValue
+ abstract static class ToProtoAndBackSpec {
+ abstract WindowingStrategy getWindowingStrategy();
+ }
+
+ private static ToProtoAndBackSpec toProtoAndBackSpec(WindowingStrategy windowingStrategy) {
+ return new AutoValue_WindowingStrategiesTest_ToProtoAndBackSpec(windowingStrategy);
+ }
+
+ private static final WindowFn<?, ?> REPRESENTATIVE_WINDOW_FN =
+ FixedWindows.of(Duration.millis(12));
+
+ private static final Trigger REPRESENTATIVE_TRIGGER = AfterWatermark.pastEndOfWindow();
+
+ @Parameters(name = "{index}: {0}")
+ public static Iterable<ToProtoAndBackSpec> data() {
+ return ImmutableList.of(
+ toProtoAndBackSpec(WindowingStrategy.globalDefault()),
+ toProtoAndBackSpec(
+ WindowingStrategy.of(REPRESENTATIVE_WINDOW_FN)
+ .withClosingBehavior(ClosingBehavior.FIRE_ALWAYS)
+ .withMode(AccumulationMode.ACCUMULATING_FIRED_PANES)
+ .withTrigger(REPRESENTATIVE_TRIGGER)
+ .withAllowedLateness(Duration.millis(71))
+ .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp())),
+ toProtoAndBackSpec(
+ WindowingStrategy.of(REPRESENTATIVE_WINDOW_FN)
+ .withClosingBehavior(ClosingBehavior.FIRE_IF_NON_EMPTY)
+ .withMode(AccumulationMode.DISCARDING_FIRED_PANES)
+ .withTrigger(REPRESENTATIVE_TRIGGER)
+ .withAllowedLateness(Duration.millis(93))
+ .withOutputTimeFn(OutputTimeFns.outputAtLatestInputTimestamp())));
+ }
+
+ @Parameter(0)
+ public ToProtoAndBackSpec toProtoAndBackSpec;
+
+ @Test
+ public void testToProtoAndBack() throws Exception {
+ WindowingStrategy<?, ?> windowingStrategy = toProtoAndBackSpec.getWindowingStrategy();
+ WindowingStrategy<?, ?> toProtoAndBackWindowingStrategy =
+ WindowingStrategies.fromProto(WindowingStrategies.toProto(windowingStrategy));
+
+ assertThat(
+ toProtoAndBackWindowingStrategy,
+ equalTo((WindowingStrategy) windowingStrategy.fixDefaults()));
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/40b36686/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingStrategies.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingStrategies.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingStrategies.java
deleted file mode 100644
index 9595362..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingStrategies.java
+++ /dev/null
@@ -1,267 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.protobuf.Any;
-import com.google.protobuf.ByteString;
-import com.google.protobuf.BytesValue;
-import com.google.protobuf.InvalidProtocolBufferException;
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.UUID;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.common.runner.v1.RunnerApi;
-import org.apache.beam.sdk.common.runner.v1.RunnerApi.Components;
-import org.apache.beam.sdk.common.runner.v1.RunnerApi.FunctionSpec;
-import org.apache.beam.sdk.common.runner.v1.RunnerApi.SdkFunctionSpec;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
-import org.apache.beam.sdk.transforms.windowing.Trigger;
-import org.apache.beam.sdk.transforms.windowing.Triggers;
-import org.apache.beam.sdk.transforms.windowing.Window;
-import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior;
-import org.apache.beam.sdk.transforms.windowing.WindowFn;
-import org.apache.beam.sdk.util.WindowingStrategy.AccumulationMode;
-import org.apache.beam.sdk.util.WindowingStrategy.CombineWindowFnOutputTimes;
-import org.joda.time.Duration;
-
-/** Utilities for working with {@link WindowingStrategy WindowingStrategies}. */
-public class WindowingStrategies implements Serializable {
-
- public static AccumulationMode fromProto(RunnerApi.AccumulationMode proto) {
- switch (proto) {
- case DISCARDING:
- return AccumulationMode.DISCARDING_FIRED_PANES;
- case ACCUMULATING:
- return AccumulationMode.ACCUMULATING_FIRED_PANES;
- case UNRECOGNIZED:
- default:
- // Whether or not it is proto that cannot recognize it (due to the version of the
- // generated code we link to) or the switch hasn't been updated to handle it,
- // the situation is the same: we don't know what this OutputTime means
- throw new IllegalArgumentException(
- String.format(
- "Cannot convert unknown %s to %s: %s",
- RunnerApi.AccumulationMode.class.getCanonicalName(),
- AccumulationMode.class.getCanonicalName(),
- proto));
- }
- }
-
- public static RunnerApi.AccumulationMode toProto(AccumulationMode accumulationMode) {
- switch (accumulationMode) {
- case DISCARDING_FIRED_PANES:
- return RunnerApi.AccumulationMode.DISCARDING;
- case ACCUMULATING_FIRED_PANES:
- return RunnerApi.AccumulationMode.ACCUMULATING;
- default:
- throw new IllegalArgumentException(
- String.format(
- "Cannot convert unknown %s to %s: %s",
- AccumulationMode.class.getCanonicalName(),
- RunnerApi.AccumulationMode.class.getCanonicalName(),
- accumulationMode));
- }
- }
-
- public static RunnerApi.ClosingBehavior toProto(Window.ClosingBehavior closingBehavior) {
- switch (closingBehavior) {
- case FIRE_ALWAYS:
- return RunnerApi.ClosingBehavior.EMIT_ALWAYS;
- case FIRE_IF_NON_EMPTY:
- return RunnerApi.ClosingBehavior.EMIT_IF_NONEMPTY;
- default:
- throw new IllegalArgumentException(
- String.format(
- "Cannot convert unknown %s to %s: %s",
- ClosingBehavior.class.getCanonicalName(),
- RunnerApi.ClosingBehavior.class.getCanonicalName(),
- closingBehavior));
- }
- }
-
- public static ClosingBehavior fromProto(RunnerApi.ClosingBehavior proto) {
- switch (proto) {
- case EMIT_ALWAYS:
- return ClosingBehavior.FIRE_ALWAYS;
- case EMIT_IF_NONEMPTY:
- return ClosingBehavior.FIRE_IF_NON_EMPTY;
- case UNRECOGNIZED:
- default:
- // Whether or not it is proto that cannot recognize it (due to the version of the
- // generated code we link to) or the switch hasn't been updated to handle it,
- // the situation is the same: we don't know what this OutputTime means
- throw new IllegalArgumentException(
- String.format(
- "Cannot convert unknown %s to %s: %s",
- RunnerApi.ClosingBehavior.class.getCanonicalName(),
- ClosingBehavior.class.getCanonicalName(),
- proto));
- }
- }
-
- public static RunnerApi.OutputTime toProto(OutputTimeFn<?> outputTimeFn) {
- if (outputTimeFn instanceof WindowingStrategy.CombineWindowFnOutputTimes) {
- return toProto(((CombineWindowFnOutputTimes<?>) outputTimeFn).getOutputTimeFn());
- } else {
- return OutputTimeFns.toProto(outputTimeFn);
- }
- }
-
- // This URN says that the coder is just a UDF blob the indicated SDK understands
- // TODO: standardize such things
- public static final String CUSTOM_CODER_URN = "urn:beam:coders:javasdk:0.1";
-
- // This URN says that the WindowFn is just a UDF blob the indicated SDK understands
- // TODO: standardize such things
- public static final String CUSTOM_WINDOWFN_URN = "urn:beam:windowfn:javasdk:0.1";
-
- private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
-
- /**
- * Converts a {@link WindowFn} into a {@link RunnerApi.MessageWithComponents} where
- * {@link RunnerApi.MessageWithComponents#getFunctionSpec()} is a {@link RunnerApi.FunctionSpec}
- * for the input {@link WindowFn}.
- */
- public static RunnerApi.MessageWithComponents toProto(WindowFn<?, ?> windowFn)
- throws IOException {
- Coder<?> windowCoder = windowFn.windowCoder();
-
- // TODO: re-use components
- String windowCoderId = UUID.randomUUID().toString();
-
- RunnerApi.SdkFunctionSpec windowFnSpec =
- RunnerApi.SdkFunctionSpec.newBuilder()
- .setSpec(
- FunctionSpec.newBuilder()
- .setUrn(CUSTOM_WINDOWFN_URN)
- .setParameter(
- Any.pack(
- BytesValue.newBuilder()
- .setValue(
- ByteString.copyFrom(
- SerializableUtils.serializeToByteArray(windowFn)))
- .build())))
- .build();
-
- RunnerApi.Coder windowCoderProto =
- RunnerApi.Coder.newBuilder()
- .setSpec(
- SdkFunctionSpec.newBuilder()
- .setSpec(
- FunctionSpec.newBuilder()
- .setUrn(CUSTOM_CODER_URN)
- .setParameter(
- Any.pack(
- BytesValue.newBuilder()
- .setValue(
- ByteString.copyFrom(
- OBJECT_MAPPER.writeValueAsBytes(
- windowCoder.asCloudObject())))
- .build()))))
- .build();
-
- return RunnerApi.MessageWithComponents.newBuilder()
- .setSdkFunctionSpec(windowFnSpec)
- .setComponents(Components.newBuilder().putCoders(windowCoderId, windowCoderProto))
- .build();
- }
-
- /**
- * Converts a {@link WindowingStrategy} into a {@link RunnerApi.MessageWithComponents} where
- * {@link RunnerApi.MessageWithComponents#getWindowingStrategy()} ()} is a {@link
- * RunnerApi.WindowingStrategy RunnerApi.WindowingStrategy (proto)} for the input {@link
- * WindowingStrategy}.
- */
- public static RunnerApi.MessageWithComponents toProto(WindowingStrategy<?, ?> windowingStrategy)
- throws IOException {
-
- RunnerApi.MessageWithComponents windowFnWithComponents =
- toProto(windowingStrategy.getWindowFn());
-
- RunnerApi.WindowingStrategy.Builder windowingStrategyProto =
- RunnerApi.WindowingStrategy.newBuilder()
- .setOutputTime(toProto(windowingStrategy.getOutputTimeFn()))
- .setAccumulationMode(toProto(windowingStrategy.getMode()))
- .setClosingBehavior(toProto(windowingStrategy.getClosingBehavior()))
- .setAllowedLateness(windowingStrategy.getAllowedLateness().getMillis())
- .setTrigger(Triggers.toProto(windowingStrategy.getTrigger()))
- .setWindowFn(windowFnWithComponents.getSdkFunctionSpec());
-
- return RunnerApi.MessageWithComponents.newBuilder()
- .setWindowingStrategy(windowingStrategyProto)
- .setComponents(windowFnWithComponents.getComponents()).build();
- }
-
- /**
- * Converts from a {@link RunnerApi.WindowingStrategy} accompanied by {@link RunnerApi.Components}
- * to the SDK's {@link WindowingStrategy}.
- */
- public static WindowingStrategy<?, ?> fromProto(RunnerApi.MessageWithComponents proto)
- throws InvalidProtocolBufferException {
- switch (proto.getRootCase()) {
- case WINDOWING_STRATEGY:
- return fromProto(proto.getWindowingStrategy(), proto.getComponents());
- default:
- throw new IllegalArgumentException(
- String.format(
- "Expected a %s with components but received %s",
- RunnerApi.WindowingStrategy.class.getCanonicalName(), proto));
- }
- }
-
- /**
- * Converts from {@link RunnerApi.WindowingStrategy} to the SDK's {@link WindowingStrategy} using
- * the provided components to dereferences identifiers found in the proto.
- */
- public static WindowingStrategy<?, ?> fromProto(
- RunnerApi.WindowingStrategy proto, RunnerApi.Components components)
- throws InvalidProtocolBufferException {
-
- SdkFunctionSpec windowFnSpec = proto.getWindowFn();
-
- checkArgument(
- windowFnSpec.getSpec().getUrn().equals(CUSTOM_WINDOWFN_URN),
- "Only Java-serialized %s instances are supported, with URN %s. But found URN %s",
- WindowFn.class.getSimpleName(),
- CUSTOM_WINDOWFN_URN,
- windowFnSpec.getSpec().getUrn());
-
- Object deserializedWindowFn =
- SerializableUtils.deserializeFromByteArray(
- windowFnSpec.getSpec().getParameter().unpack(BytesValue.class).getValue().toByteArray(),
- "WindowFn");
-
- WindowFn<?, ?> windowFn = (WindowFn<?, ?>) deserializedWindowFn;
- OutputTimeFn<?> outputTimeFn = OutputTimeFns.fromProto(proto.getOutputTime());
- AccumulationMode accumulationMode = fromProto(proto.getAccumulationMode());
- Trigger trigger = Triggers.fromProto(proto.getTrigger());
- ClosingBehavior closingBehavior = fromProto(proto.getClosingBehavior());
- Duration allowedLateness = Duration.millis(proto.getAllowedLateness());
-
- return WindowingStrategy.of(windowFn)
- .withAllowedLateness(allowedLateness)
- .withMode(accumulationMode)
- .withTrigger(trigger)
- .withOutputTimeFn(outputTimeFn)
- .withClosingBehavior(closingBehavior);
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/40b36686/sdks/java/core/src/test/java/org/apache/beam/sdk/util/WindowingStrategiesTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/WindowingStrategiesTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/WindowingStrategiesTest.java
deleted file mode 100644
index 5d3de51..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/WindowingStrategiesTest.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import static org.hamcrest.Matchers.equalTo;
-import static org.junit.Assert.assertThat;
-
-import com.google.auto.value.AutoValue;
-import com.google.common.collect.ImmutableList;
-import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
-import org.apache.beam.sdk.transforms.windowing.FixedWindows;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
-import org.apache.beam.sdk.transforms.windowing.Trigger;
-import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior;
-import org.apache.beam.sdk.transforms.windowing.WindowFn;
-import org.apache.beam.sdk.util.WindowingStrategy.AccumulationMode;
-import org.joda.time.Duration;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameter;
-import org.junit.runners.Parameterized.Parameters;
-
-/** Unit tests for {@link WindowingStrategy}. */
-@RunWith(Parameterized.class)
-public class WindowingStrategiesTest {
-
- // Each spec activates tests of all subsets of its fields
- @AutoValue
- abstract static class ToProtoAndBackSpec {
- abstract WindowingStrategy getWindowingStrategy();
- }
-
- private static ToProtoAndBackSpec toProtoAndBackSpec(WindowingStrategy windowingStrategy) {
- return new AutoValue_WindowingStrategiesTest_ToProtoAndBackSpec(windowingStrategy);
- }
-
- private static final WindowFn<?, ?> REPRESENTATIVE_WINDOW_FN =
- FixedWindows.of(Duration.millis(12));
-
- private static final Trigger REPRESENTATIVE_TRIGGER = AfterWatermark.pastEndOfWindow();
-
- @Parameters(name = "{index}: {0}")
- public static Iterable<ToProtoAndBackSpec> data() {
- return ImmutableList.of(
- toProtoAndBackSpec(WindowingStrategy.globalDefault()),
- toProtoAndBackSpec(
- WindowingStrategy.of(REPRESENTATIVE_WINDOW_FN)
- .withClosingBehavior(ClosingBehavior.FIRE_ALWAYS)
- .withMode(AccumulationMode.ACCUMULATING_FIRED_PANES)
- .withTrigger(REPRESENTATIVE_TRIGGER)
- .withAllowedLateness(Duration.millis(71))
- .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp())),
- toProtoAndBackSpec(
- WindowingStrategy.of(REPRESENTATIVE_WINDOW_FN)
- .withClosingBehavior(ClosingBehavior.FIRE_IF_NON_EMPTY)
- .withMode(AccumulationMode.DISCARDING_FIRED_PANES)
- .withTrigger(REPRESENTATIVE_TRIGGER)
- .withAllowedLateness(Duration.millis(93))
- .withOutputTimeFn(OutputTimeFns.outputAtLatestInputTimestamp())));
- }
-
- @Parameter(0)
- public ToProtoAndBackSpec toProtoAndBackSpec;
-
- @Test
- public void testToProtoAndBack() throws Exception {
- WindowingStrategy<?, ?> windowingStrategy = toProtoAndBackSpec.getWindowingStrategy();
- WindowingStrategy<?, ?> toProtoAndBackWindowingStrategy =
- WindowingStrategies.fromProto(WindowingStrategies.toProto(windowingStrategy));
-
- assertThat(
- toProtoAndBackWindowingStrategy,
- equalTo((WindowingStrategy) windowingStrategy.fixDefaults()));
- }
-}
[2/2] beam git commit: This closes #2486
Posted by tg...@apache.org.
This closes #2486
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/0a0b1c80
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/0a0b1c80
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/0a0b1c80
Branch: refs/heads/master
Commit: 0a0b1c80c97949954add4b4357fe29409a08197d
Parents: 7fd9c65 40b3668
Author: Thomas Groh <tg...@google.com>
Authored: Mon Apr 10 19:24:52 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Mon Apr 10 19:24:52 2017 -0700
----------------------------------------------------------------------
runners/core-construction-java/pom.xml | 17 ++
.../core/construction/WindowingStrategies.java | 268 +++++++++++++++++++
.../construction/WindowingStrategiesTest.java | 92 +++++++
.../beam/sdk/util/WindowingStrategies.java | 267 ------------------
.../beam/sdk/util/WindowingStrategiesTest.java | 91 -------
5 files changed, 377 insertions(+), 358 deletions(-)
----------------------------------------------------------------------