You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/05/24 20:14:11 UTC
[2/9] beam git commit: Rename WindowingStrategies to
WindowingStrategyTranslation
Rename WindowingStrategies to WindowingStrategyTranslation
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c8b2119a
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c8b2119a
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c8b2119a
Branch: refs/heads/master
Commit: c8b2119ab9a75c7f781ce73ea9352734640a6f46
Parents: 7e37b70
Author: Kenneth Knowles <kl...@google.com>
Authored: Tue May 23 15:32:47 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Tue May 23 15:53:41 2017 -0700
----------------------------------------------------------------------
.../construction/PCollectionTranslation.java | 2 +-
.../core/construction/ParDoTranslation.java | 2 +-
.../core/construction/SdkComponents.java | 2 +-
.../construction/WindowIntoTranslation.java | 4 +-
.../core/construction/WindowingStrategies.java | 278 -------------------
.../WindowingStrategyTranslation.java | 278 +++++++++++++++++++
.../construction/WindowingStrategiesTest.java | 110 --------
.../WindowingStrategyTranslationTest.java | 111 ++++++++
.../dataflow/DataflowPipelineTranslator.java | 4 +-
9 files changed, 396 insertions(+), 395 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/c8b2119a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PCollectionTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PCollectionTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PCollectionTranslation.java
index 46f714e..303c02d 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PCollectionTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PCollectionTranslation.java
@@ -60,7 +60,7 @@ public class PCollectionTranslation {
public static WindowingStrategy<?, ?> getWindowingStrategy(
RunnerApi.PCollection pCollection, RunnerApi.Components components)
throws InvalidProtocolBufferException {
- return WindowingStrategies.fromProto(
+ return WindowingStrategyTranslation.fromProto(
components.getWindowingStrategiesOrThrow(pCollection.getWindowingStrategyId()), components);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/c8b2119a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
index bc5bb0e..28d577f 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
@@ -265,7 +265,7 @@ public class ParDoTranslation {
RunnerApi.PCollection inputCollection =
components.getPcollectionsOrThrow(parDoTransform.getInputsOrThrow(id));
WindowingStrategy<?, ?> windowingStrategy =
- WindowingStrategies.fromProto(
+ WindowingStrategyTranslation.fromProto(
components.getWindowingStrategiesOrThrow(inputCollection.getWindowingStrategyId()),
components);
Coder<?> elemCoder =
http://git-wip-us.apache.org/repos/asf/beam/blob/c8b2119a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java
index 5c81875..b0f164f 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java
@@ -200,7 +200,7 @@ class SdkComponents {
String name = uniqify(baseName, windowingStrategyIds.values());
windowingStrategyIds.put(windowingStrategy, name);
RunnerApi.WindowingStrategy windowingStrategyProto =
- WindowingStrategies.toProto(windowingStrategy, this);
+ WindowingStrategyTranslation.toProto(windowingStrategy, this);
componentsBuilder.putWindowingStrategies(name, windowingStrategyProto);
return name;
}
http://git-wip-us.apache.org/repos/asf/beam/blob/c8b2119a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowIntoTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowIntoTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowIntoTranslation.java
index 69793b5..215beba 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowIntoTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowIntoTranslation.java
@@ -49,13 +49,13 @@ public class WindowIntoTranslation {
public static WindowIntoPayload toProto(Window.Assign<?> transform, SdkComponents components) {
return WindowIntoPayload.newBuilder()
- .setWindowFn(WindowingStrategies.toProto(transform.getWindowFn(), components))
+ .setWindowFn(WindowingStrategyTranslation.toProto(transform.getWindowFn(), components))
.build();
}
public static WindowFn<?, ?> getWindowFn(WindowIntoPayload payload)
throws InvalidProtocolBufferException {
SdkFunctionSpec spec = payload.getWindowFn();
- return WindowingStrategies.windowFnFromProto(spec);
+ return WindowingStrategyTranslation.windowFnFromProto(spec);
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/c8b2119a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategies.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategies.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategies.java
deleted file mode 100644
index 8dceebb..0000000
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategies.java
+++ /dev/null
@@ -1,278 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.core.construction;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-import com.google.protobuf.Any;
-import com.google.protobuf.ByteString;
-import com.google.protobuf.BytesValue;
-import com.google.protobuf.InvalidProtocolBufferException;
-import java.io.IOException;
-import java.io.Serializable;
-import org.apache.beam.sdk.common.runner.v1.RunnerApi;
-import org.apache.beam.sdk.common.runner.v1.RunnerApi.Components;
-import org.apache.beam.sdk.common.runner.v1.RunnerApi.FunctionSpec;
-import org.apache.beam.sdk.common.runner.v1.RunnerApi.OutputTime;
-import org.apache.beam.sdk.common.runner.v1.RunnerApi.SdkFunctionSpec;
-import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
-import org.apache.beam.sdk.transforms.windowing.Trigger;
-import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior;
-import org.apache.beam.sdk.transforms.windowing.WindowFn;
-import org.apache.beam.sdk.util.SerializableUtils;
-import org.apache.beam.sdk.values.WindowingStrategy;
-import org.apache.beam.sdk.values.WindowingStrategy.AccumulationMode;
-import org.joda.time.Duration;
-
-/** Utilities for working with {@link WindowingStrategy WindowingStrategies}. */
-public class WindowingStrategies implements Serializable {
-
- public static AccumulationMode fromProto(RunnerApi.AccumulationMode proto) {
- switch (proto) {
- case DISCARDING:
- return AccumulationMode.DISCARDING_FIRED_PANES;
- case ACCUMULATING:
- return AccumulationMode.ACCUMULATING_FIRED_PANES;
- case UNRECOGNIZED:
- default:
- // Whether or not it is proto that cannot recognize it (due to the version of the
- // generated code we link to) or the switch hasn't been updated to handle it,
- // the situation is the same: we don't know what this OutputTime means
- throw new IllegalArgumentException(
- String.format(
- "Cannot convert unknown %s to %s: %s",
- RunnerApi.AccumulationMode.class.getCanonicalName(),
- AccumulationMode.class.getCanonicalName(),
- proto));
- }
- }
-
- public static RunnerApi.AccumulationMode toProto(AccumulationMode accumulationMode) {
- switch (accumulationMode) {
- case DISCARDING_FIRED_PANES:
- return RunnerApi.AccumulationMode.DISCARDING;
- case ACCUMULATING_FIRED_PANES:
- return RunnerApi.AccumulationMode.ACCUMULATING;
- default:
- throw new IllegalArgumentException(
- String.format(
- "Cannot convert unknown %s to %s: %s",
- AccumulationMode.class.getCanonicalName(),
- RunnerApi.AccumulationMode.class.getCanonicalName(),
- accumulationMode));
- }
- }
-
- public static RunnerApi.ClosingBehavior toProto(ClosingBehavior closingBehavior) {
- switch (closingBehavior) {
- case FIRE_ALWAYS:
- return RunnerApi.ClosingBehavior.EMIT_ALWAYS;
- case FIRE_IF_NON_EMPTY:
- return RunnerApi.ClosingBehavior.EMIT_IF_NONEMPTY;
- default:
- throw new IllegalArgumentException(
- String.format(
- "Cannot convert unknown %s to %s: %s",
- ClosingBehavior.class.getCanonicalName(),
- RunnerApi.ClosingBehavior.class.getCanonicalName(),
- closingBehavior));
- }
- }
-
- public static ClosingBehavior fromProto(RunnerApi.ClosingBehavior proto) {
- switch (proto) {
- case EMIT_ALWAYS:
- return ClosingBehavior.FIRE_ALWAYS;
- case EMIT_IF_NONEMPTY:
- return ClosingBehavior.FIRE_IF_NON_EMPTY;
- case UNRECOGNIZED:
- default:
- // Whether or not it is proto that cannot recognize it (due to the version of the
- // generated code we link to) or the switch hasn't been updated to handle it,
- // the situation is the same: we don't know what this OutputTime means
- throw new IllegalArgumentException(
- String.format(
- "Cannot convert unknown %s to %s: %s",
- RunnerApi.ClosingBehavior.class.getCanonicalName(),
- ClosingBehavior.class.getCanonicalName(),
- proto));
- }
- }
-
- public static RunnerApi.OutputTime toProto(TimestampCombiner timestampCombiner) {
- switch(timestampCombiner) {
- case EARLIEST:
- return OutputTime.EARLIEST_IN_PANE;
- case END_OF_WINDOW:
- return OutputTime.END_OF_WINDOW;
- case LATEST:
- return OutputTime.LATEST_IN_PANE;
- default:
- throw new IllegalArgumentException(
- String.format(
- "Unknown %s: %s",
- TimestampCombiner.class.getSimpleName(),
- timestampCombiner));
- }
- }
-
- public static TimestampCombiner timestampCombinerFromProto(RunnerApi.OutputTime proto) {
- switch (proto) {
- case EARLIEST_IN_PANE:
- return TimestampCombiner.EARLIEST;
- case END_OF_WINDOW:
- return TimestampCombiner.END_OF_WINDOW;
- case LATEST_IN_PANE:
- return TimestampCombiner.LATEST;
- case UNRECOGNIZED:
- default:
- // Whether or not it is proto that cannot recognize it (due to the version of the
- // generated code we link to) or the switch hasn't been updated to handle it,
- // the situation is the same: we don't know what this OutputTime means
- throw new IllegalArgumentException(
- String.format(
- "Cannot convert unknown %s to %s: %s",
- RunnerApi.OutputTime.class.getCanonicalName(),
- OutputTime.class.getCanonicalName(),
- proto));
- }
- }
-
- // This URN says that the WindowFn is just a UDF blob the indicated SDK understands
- // TODO: standardize such things
- public static final String CUSTOM_WINDOWFN_URN = "urn:beam:windowfn:javasdk:0.1";
-
- /**
- * Converts a {@link WindowFn} into a {@link RunnerApi.MessageWithComponents} where {@link
- * RunnerApi.MessageWithComponents#getFunctionSpec()} is a {@link RunnerApi.FunctionSpec} for the
- * input {@link WindowFn}.
- */
- public static SdkFunctionSpec toProto(
- WindowFn<?, ?> windowFn, @SuppressWarnings("unused") SdkComponents components) {
- return SdkFunctionSpec.newBuilder()
- // TODO: Set environment ID
- .setSpec(
- FunctionSpec.newBuilder()
- .setUrn(CUSTOM_WINDOWFN_URN)
- .setParameter(
- Any.pack(
- BytesValue.newBuilder()
- .setValue(
- ByteString.copyFrom(
- SerializableUtils.serializeToByteArray(windowFn)))
- .build())))
- .build();
- }
-
- /**
- * Converts a {@link WindowingStrategy} into a {@link RunnerApi.MessageWithComponents} where
- * {@link RunnerApi.MessageWithComponents#getWindowingStrategy()} ()} is a {@link
- * RunnerApi.WindowingStrategy RunnerApi.WindowingStrategy (proto)} for the input {@link
- * WindowingStrategy}.
- */
- public static RunnerApi.MessageWithComponents toProto(WindowingStrategy<?, ?> windowingStrategy)
- throws IOException {
- SdkComponents components = SdkComponents.create();
- RunnerApi.WindowingStrategy windowingStrategyProto = toProto(windowingStrategy, components);
-
- return RunnerApi.MessageWithComponents.newBuilder()
- .setWindowingStrategy(windowingStrategyProto)
- .setComponents(components.toComponents())
- .build();
- }
-
- /**
- * Converts a {@link WindowingStrategy} into a {@link RunnerApi.WindowingStrategy}, registering
- * any components in the provided {@link SdkComponents}.
- */
- public static RunnerApi.WindowingStrategy toProto(
- WindowingStrategy<?, ?> windowingStrategy, SdkComponents components) throws IOException {
- SdkFunctionSpec windowFnSpec = toProto(windowingStrategy.getWindowFn(), components);
-
- RunnerApi.WindowingStrategy.Builder windowingStrategyProto =
- RunnerApi.WindowingStrategy.newBuilder()
- .setOutputTime(toProto(windowingStrategy.getTimestampCombiner()))
- .setAccumulationMode(toProto(windowingStrategy.getMode()))
- .setClosingBehavior(toProto(windowingStrategy.getClosingBehavior()))
- .setAllowedLateness(windowingStrategy.getAllowedLateness().getMillis())
- .setTrigger(Triggers.toProto(windowingStrategy.getTrigger()))
- .setWindowFn(windowFnSpec)
- .setWindowCoderId(
- components.registerCoder(windowingStrategy.getWindowFn().windowCoder()));
-
- return windowingStrategyProto.build();
- }
-
- /**
- * Converts from a {@link RunnerApi.WindowingStrategy} accompanied by {@link Components}
- * to the SDK's {@link WindowingStrategy}.
- */
- public static WindowingStrategy<?, ?> fromProto(RunnerApi.MessageWithComponents proto)
- throws InvalidProtocolBufferException {
- switch (proto.getRootCase()) {
- case WINDOWING_STRATEGY:
- return fromProto(proto.getWindowingStrategy(), proto.getComponents());
- default:
- throw new IllegalArgumentException(
- String.format(
- "Expected a %s with components but received %s",
- RunnerApi.WindowingStrategy.class.getCanonicalName(), proto));
- }
- }
-
- /**
- * Converts from {@link RunnerApi.WindowingStrategy} to the SDK's {@link WindowingStrategy} using
- * the provided components to dereferences identifiers found in the proto.
- */
- public static WindowingStrategy<?, ?> fromProto(
- RunnerApi.WindowingStrategy proto, Components components)
- throws InvalidProtocolBufferException {
-
- SdkFunctionSpec windowFnSpec = proto.getWindowFn();
- WindowFn<?, ?> windowFn = windowFnFromProto(windowFnSpec);
- TimestampCombiner timestampCombiner = timestampCombinerFromProto(proto.getOutputTime());
- AccumulationMode accumulationMode = fromProto(proto.getAccumulationMode());
- Trigger trigger = Triggers.fromProto(proto.getTrigger());
- ClosingBehavior closingBehavior = fromProto(proto.getClosingBehavior());
- Duration allowedLateness = Duration.millis(proto.getAllowedLateness());
-
- return WindowingStrategy.of(windowFn)
- .withAllowedLateness(allowedLateness)
- .withMode(accumulationMode)
- .withTrigger(trigger)
- .withTimestampCombiner(timestampCombiner)
- .withClosingBehavior(closingBehavior);
- }
-
- public static WindowFn<?, ?> windowFnFromProto(SdkFunctionSpec windowFnSpec)
- throws InvalidProtocolBufferException {
- checkArgument(
- windowFnSpec.getSpec().getUrn().equals(CUSTOM_WINDOWFN_URN),
- "Only Java-serialized %s instances are supported, with URN %s. But found URN %s",
- WindowFn.class.getSimpleName(),
- CUSTOM_WINDOWFN_URN,
- windowFnSpec.getSpec().getUrn());
-
- Object deserializedWindowFn =
- SerializableUtils.deserializeFromByteArray(
- windowFnSpec.getSpec().getParameter().unpack(BytesValue.class).getValue().toByteArray(),
- "WindowFn");
-
- return (WindowFn<?, ?>) deserializedWindowFn;
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/c8b2119a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java
new file mode 100644
index 0000000..061f309
--- /dev/null
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core.construction;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.protobuf.Any;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.BytesValue;
+import com.google.protobuf.InvalidProtocolBufferException;
+import java.io.IOException;
+import java.io.Serializable;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi.Components;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi.FunctionSpec;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi.OutputTime;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi.SdkFunctionSpec;
+import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
+import org.apache.beam.sdk.transforms.windowing.Trigger;
+import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.util.SerializableUtils;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.beam.sdk.values.WindowingStrategy.AccumulationMode;
+import org.joda.time.Duration;
+
+/** Utilities for working with {@link WindowingStrategy WindowingStrategies}. */
+public class WindowingStrategyTranslation implements Serializable {
+
+ public static AccumulationMode fromProto(RunnerApi.AccumulationMode proto) {
+ switch (proto) {
+ case DISCARDING:
+ return AccumulationMode.DISCARDING_FIRED_PANES;
+ case ACCUMULATING:
+ return AccumulationMode.ACCUMULATING_FIRED_PANES;
+ case UNRECOGNIZED:
+ default:
+ // Whether or not it is proto that cannot recognize it (due to the version of the
+ // generated code we link to) or the switch hasn't been updated to handle it,
+ // the situation is the same: we don't know what this OutputTime means
+ throw new IllegalArgumentException(
+ String.format(
+ "Cannot convert unknown %s to %s: %s",
+ RunnerApi.AccumulationMode.class.getCanonicalName(),
+ AccumulationMode.class.getCanonicalName(),
+ proto));
+ }
+ }
+
+ public static RunnerApi.AccumulationMode toProto(AccumulationMode accumulationMode) {
+ switch (accumulationMode) {
+ case DISCARDING_FIRED_PANES:
+ return RunnerApi.AccumulationMode.DISCARDING;
+ case ACCUMULATING_FIRED_PANES:
+ return RunnerApi.AccumulationMode.ACCUMULATING;
+ default:
+ throw new IllegalArgumentException(
+ String.format(
+ "Cannot convert unknown %s to %s: %s",
+ AccumulationMode.class.getCanonicalName(),
+ RunnerApi.AccumulationMode.class.getCanonicalName(),
+ accumulationMode));
+ }
+ }
+
+ public static RunnerApi.ClosingBehavior toProto(ClosingBehavior closingBehavior) {
+ switch (closingBehavior) {
+ case FIRE_ALWAYS:
+ return RunnerApi.ClosingBehavior.EMIT_ALWAYS;
+ case FIRE_IF_NON_EMPTY:
+ return RunnerApi.ClosingBehavior.EMIT_IF_NONEMPTY;
+ default:
+ throw new IllegalArgumentException(
+ String.format(
+ "Cannot convert unknown %s to %s: %s",
+ ClosingBehavior.class.getCanonicalName(),
+ RunnerApi.ClosingBehavior.class.getCanonicalName(),
+ closingBehavior));
+ }
+ }
+
+ public static ClosingBehavior fromProto(RunnerApi.ClosingBehavior proto) {
+ switch (proto) {
+ case EMIT_ALWAYS:
+ return ClosingBehavior.FIRE_ALWAYS;
+ case EMIT_IF_NONEMPTY:
+ return ClosingBehavior.FIRE_IF_NON_EMPTY;
+ case UNRECOGNIZED:
+ default:
+ // Whether or not it is proto that cannot recognize it (due to the version of the
+ // generated code we link to) or the switch hasn't been updated to handle it,
+ // the situation is the same: we don't know what this OutputTime means
+ throw new IllegalArgumentException(
+ String.format(
+ "Cannot convert unknown %s to %s: %s",
+ RunnerApi.ClosingBehavior.class.getCanonicalName(),
+ ClosingBehavior.class.getCanonicalName(),
+ proto));
+ }
+ }
+
+ public static RunnerApi.OutputTime toProto(TimestampCombiner timestampCombiner) {
+ switch(timestampCombiner) {
+ case EARLIEST:
+ return OutputTime.EARLIEST_IN_PANE;
+ case END_OF_WINDOW:
+ return OutputTime.END_OF_WINDOW;
+ case LATEST:
+ return OutputTime.LATEST_IN_PANE;
+ default:
+ throw new IllegalArgumentException(
+ String.format(
+ "Unknown %s: %s",
+ TimestampCombiner.class.getSimpleName(),
+ timestampCombiner));
+ }
+ }
+
+ public static TimestampCombiner timestampCombinerFromProto(RunnerApi.OutputTime proto) {
+ switch (proto) {
+ case EARLIEST_IN_PANE:
+ return TimestampCombiner.EARLIEST;
+ case END_OF_WINDOW:
+ return TimestampCombiner.END_OF_WINDOW;
+ case LATEST_IN_PANE:
+ return TimestampCombiner.LATEST;
+ case UNRECOGNIZED:
+ default:
+ // Whether or not it is proto that cannot recognize it (due to the version of the
+ // generated code we link to) or the switch hasn't been updated to handle it,
+ // the situation is the same: we don't know what this OutputTime means
+ throw new IllegalArgumentException(
+ String.format(
+ "Cannot convert unknown %s to %s: %s",
+ RunnerApi.OutputTime.class.getCanonicalName(),
+ OutputTime.class.getCanonicalName(),
+ proto));
+ }
+ }
+
+ // This URN says that the WindowFn is just a UDF blob the indicated SDK understands
+ // TODO: standardize such things
+ public static final String CUSTOM_WINDOWFN_URN = "urn:beam:windowfn:javasdk:0.1";
+
+ /**
+ * Converts a {@link WindowFn} into a {@link RunnerApi.MessageWithComponents} where {@link
+ * RunnerApi.MessageWithComponents#getFunctionSpec()} is a {@link RunnerApi.FunctionSpec} for the
+ * input {@link WindowFn}.
+ */
+ public static SdkFunctionSpec toProto(
+ WindowFn<?, ?> windowFn, @SuppressWarnings("unused") SdkComponents components) {
+ return SdkFunctionSpec.newBuilder()
+ // TODO: Set environment ID
+ .setSpec(
+ FunctionSpec.newBuilder()
+ .setUrn(CUSTOM_WINDOWFN_URN)
+ .setParameter(
+ Any.pack(
+ BytesValue.newBuilder()
+ .setValue(
+ ByteString.copyFrom(
+ SerializableUtils.serializeToByteArray(windowFn)))
+ .build())))
+ .build();
+ }
+
+ /**
+ * Converts a {@link WindowingStrategy} into a {@link RunnerApi.MessageWithComponents} where
+ * {@link RunnerApi.MessageWithComponents#getWindowingStrategy()} ()} is a {@link
+ * RunnerApi.WindowingStrategy RunnerApi.WindowingStrategy (proto)} for the input {@link
+ * WindowingStrategy}.
+ */
+ public static RunnerApi.MessageWithComponents toProto(WindowingStrategy<?, ?> windowingStrategy)
+ throws IOException {
+ SdkComponents components = SdkComponents.create();
+ RunnerApi.WindowingStrategy windowingStrategyProto = toProto(windowingStrategy, components);
+
+ return RunnerApi.MessageWithComponents.newBuilder()
+ .setWindowingStrategy(windowingStrategyProto)
+ .setComponents(components.toComponents())
+ .build();
+ }
+
+ /**
+ * Converts a {@link WindowingStrategy} into a {@link RunnerApi.WindowingStrategy}, registering
+ * any components in the provided {@link SdkComponents}.
+ */
+ public static RunnerApi.WindowingStrategy toProto(
+ WindowingStrategy<?, ?> windowingStrategy, SdkComponents components) throws IOException {
+ SdkFunctionSpec windowFnSpec = toProto(windowingStrategy.getWindowFn(), components);
+
+ RunnerApi.WindowingStrategy.Builder windowingStrategyProto =
+ RunnerApi.WindowingStrategy.newBuilder()
+ .setOutputTime(toProto(windowingStrategy.getTimestampCombiner()))
+ .setAccumulationMode(toProto(windowingStrategy.getMode()))
+ .setClosingBehavior(toProto(windowingStrategy.getClosingBehavior()))
+ .setAllowedLateness(windowingStrategy.getAllowedLateness().getMillis())
+ .setTrigger(Triggers.toProto(windowingStrategy.getTrigger()))
+ .setWindowFn(windowFnSpec)
+ .setWindowCoderId(
+ components.registerCoder(windowingStrategy.getWindowFn().windowCoder()));
+
+ return windowingStrategyProto.build();
+ }
+
+ /**
+ * Converts from a {@link RunnerApi.WindowingStrategy} accompanied by {@link Components}
+ * to the SDK's {@link WindowingStrategy}.
+ */
+ public static WindowingStrategy<?, ?> fromProto(RunnerApi.MessageWithComponents proto)
+ throws InvalidProtocolBufferException {
+ switch (proto.getRootCase()) {
+ case WINDOWING_STRATEGY:
+ return fromProto(proto.getWindowingStrategy(), proto.getComponents());
+ default:
+ throw new IllegalArgumentException(
+ String.format(
+ "Expected a %s with components but received %s",
+ RunnerApi.WindowingStrategy.class.getCanonicalName(), proto));
+ }
+ }
+
+ /**
+ * Converts from {@link RunnerApi.WindowingStrategy} to the SDK's {@link WindowingStrategy} using
+ * the provided components to dereferences identifiers found in the proto.
+ */
+ public static WindowingStrategy<?, ?> fromProto(
+ RunnerApi.WindowingStrategy proto, Components components)
+ throws InvalidProtocolBufferException {
+
+ SdkFunctionSpec windowFnSpec = proto.getWindowFn();
+ WindowFn<?, ?> windowFn = windowFnFromProto(windowFnSpec);
+ TimestampCombiner timestampCombiner = timestampCombinerFromProto(proto.getOutputTime());
+ AccumulationMode accumulationMode = fromProto(proto.getAccumulationMode());
+ Trigger trigger = Triggers.fromProto(proto.getTrigger());
+ ClosingBehavior closingBehavior = fromProto(proto.getClosingBehavior());
+ Duration allowedLateness = Duration.millis(proto.getAllowedLateness());
+
+ return WindowingStrategy.of(windowFn)
+ .withAllowedLateness(allowedLateness)
+ .withMode(accumulationMode)
+ .withTrigger(trigger)
+ .withTimestampCombiner(timestampCombiner)
+ .withClosingBehavior(closingBehavior);
+ }
+
+ public static WindowFn<?, ?> windowFnFromProto(SdkFunctionSpec windowFnSpec)
+ throws InvalidProtocolBufferException {
+ checkArgument(
+ windowFnSpec.getSpec().getUrn().equals(CUSTOM_WINDOWFN_URN),
+ "Only Java-serialized %s instances are supported, with URN %s. But found URN %s",
+ WindowFn.class.getSimpleName(),
+ CUSTOM_WINDOWFN_URN,
+ windowFnSpec.getSpec().getUrn());
+
+ Object deserializedWindowFn =
+ SerializableUtils.deserializeFromByteArray(
+ windowFnSpec.getSpec().getParameter().unpack(BytesValue.class).getValue().toByteArray(),
+ "WindowFn");
+
+ return (WindowFn<?, ?>) deserializedWindowFn;
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/c8b2119a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WindowingStrategiesTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WindowingStrategiesTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WindowingStrategiesTest.java
deleted file mode 100644
index 7296a77..0000000
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WindowingStrategiesTest.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.core.construction;
-
-import static org.hamcrest.Matchers.equalTo;
-import static org.junit.Assert.assertThat;
-
-import com.google.auto.value.AutoValue;
-import com.google.common.collect.ImmutableList;
-import org.apache.beam.sdk.common.runner.v1.RunnerApi;
-import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
-import org.apache.beam.sdk.transforms.windowing.FixedWindows;
-import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
-import org.apache.beam.sdk.transforms.windowing.Trigger;
-import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior;
-import org.apache.beam.sdk.transforms.windowing.WindowFn;
-import org.apache.beam.sdk.values.WindowingStrategy;
-import org.apache.beam.sdk.values.WindowingStrategy.AccumulationMode;
-import org.hamcrest.Matchers;
-import org.joda.time.Duration;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameter;
-import org.junit.runners.Parameterized.Parameters;
-
-/** Unit tests for {@link WindowingStrategy}. */
-@RunWith(Parameterized.class)
-public class WindowingStrategiesTest {
-
- // Each spec activates tests of all subsets of its fields
- @AutoValue
- abstract static class ToProtoAndBackSpec {
- abstract WindowingStrategy getWindowingStrategy();
- }
-
- private static ToProtoAndBackSpec toProtoAndBackSpec(WindowingStrategy windowingStrategy) {
- return new AutoValue_WindowingStrategiesTest_ToProtoAndBackSpec(windowingStrategy);
- }
-
- private static final WindowFn<?, ?> REPRESENTATIVE_WINDOW_FN =
- FixedWindows.of(Duration.millis(12));
-
- private static final Trigger REPRESENTATIVE_TRIGGER = AfterWatermark.pastEndOfWindow();
-
- @Parameters(name = "{index}: {0}")
- public static Iterable<ToProtoAndBackSpec> data() {
- return ImmutableList.of(
- toProtoAndBackSpec(WindowingStrategy.globalDefault()),
- toProtoAndBackSpec(
- WindowingStrategy.of(REPRESENTATIVE_WINDOW_FN)
- .withClosingBehavior(ClosingBehavior.FIRE_ALWAYS)
- .withMode(AccumulationMode.ACCUMULATING_FIRED_PANES)
- .withTrigger(REPRESENTATIVE_TRIGGER)
- .withAllowedLateness(Duration.millis(71))
- .withTimestampCombiner(TimestampCombiner.EARLIEST)),
- toProtoAndBackSpec(
- WindowingStrategy.of(REPRESENTATIVE_WINDOW_FN)
- .withClosingBehavior(ClosingBehavior.FIRE_IF_NON_EMPTY)
- .withMode(AccumulationMode.DISCARDING_FIRED_PANES)
- .withTrigger(REPRESENTATIVE_TRIGGER)
- .withAllowedLateness(Duration.millis(93))
- .withTimestampCombiner(TimestampCombiner.LATEST)));
- }
-
- @Parameter(0)
- public ToProtoAndBackSpec toProtoAndBackSpec;
-
- @Test
- public void testToProtoAndBack() throws Exception {
- WindowingStrategy<?, ?> windowingStrategy = toProtoAndBackSpec.getWindowingStrategy();
- WindowingStrategy<?, ?> toProtoAndBackWindowingStrategy =
- WindowingStrategies.fromProto(WindowingStrategies.toProto(windowingStrategy));
-
- assertThat(
- toProtoAndBackWindowingStrategy,
- equalTo((WindowingStrategy) windowingStrategy.fixDefaults()));
- }
-
- @Test
- public void testToProtoAndBackWithComponents() throws Exception {
- WindowingStrategy<?, ?> windowingStrategy = toProtoAndBackSpec.getWindowingStrategy();
- SdkComponents components = SdkComponents.create();
- RunnerApi.WindowingStrategy proto =
- WindowingStrategies.toProto(windowingStrategy, components);
- RunnerApi.Components protoComponents = components.toComponents();
-
- assertThat(
- WindowingStrategies.fromProto(proto, protoComponents).fixDefaults(),
- Matchers.<WindowingStrategy<?, ?>>equalTo(windowingStrategy.fixDefaults()));
-
- protoComponents.getCodersOrThrow(
- components.registerCoder(windowingStrategy.getWindowFn().windowCoder()));
- }
-}
http://git-wip-us.apache.org/repos/asf/beam/blob/c8b2119a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslationTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslationTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslationTest.java
new file mode 100644
index 0000000..1e52803
--- /dev/null
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslationTest.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core.construction;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+import com.google.auto.value.AutoValue;
+import com.google.common.collect.ImmutableList;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi;
+import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
+import org.apache.beam.sdk.transforms.windowing.Trigger;
+import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.beam.sdk.values.WindowingStrategy.AccumulationMode;
+import org.hamcrest.Matchers;
+import org.joda.time.Duration;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
+
+/** Unit tests for {@link WindowingStrategy}. */
+@RunWith(Parameterized.class)
+public class WindowingStrategyTranslationTest {
+
+ // Each spec activates tests of all subsets of its fields
+ @AutoValue
+ abstract static class ToProtoAndBackSpec {
+ abstract WindowingStrategy getWindowingStrategy();
+ }
+
+ private static ToProtoAndBackSpec toProtoAndBackSpec(WindowingStrategy windowingStrategy) {
+ return new AutoValue_WindowingStrategyTranslationTest_ToProtoAndBackSpec(windowingStrategy);
+ }
+
+ private static final WindowFn<?, ?> REPRESENTATIVE_WINDOW_FN =
+ FixedWindows.of(Duration.millis(12));
+
+ private static final Trigger REPRESENTATIVE_TRIGGER = AfterWatermark.pastEndOfWindow();
+
+ @Parameters(name = "{index}: {0}")
+ public static Iterable<ToProtoAndBackSpec> data() {
+ return ImmutableList.of(
+ toProtoAndBackSpec(WindowingStrategy.globalDefault()),
+ toProtoAndBackSpec(
+ WindowingStrategy.of(REPRESENTATIVE_WINDOW_FN)
+ .withClosingBehavior(ClosingBehavior.FIRE_ALWAYS)
+ .withMode(AccumulationMode.ACCUMULATING_FIRED_PANES)
+ .withTrigger(REPRESENTATIVE_TRIGGER)
+ .withAllowedLateness(Duration.millis(71))
+ .withTimestampCombiner(TimestampCombiner.EARLIEST)),
+ toProtoAndBackSpec(
+ WindowingStrategy.of(REPRESENTATIVE_WINDOW_FN)
+ .withClosingBehavior(ClosingBehavior.FIRE_IF_NON_EMPTY)
+ .withMode(AccumulationMode.DISCARDING_FIRED_PANES)
+ .withTrigger(REPRESENTATIVE_TRIGGER)
+ .withAllowedLateness(Duration.millis(93))
+ .withTimestampCombiner(TimestampCombiner.LATEST)));
+ }
+
+ @Parameter(0)
+ public ToProtoAndBackSpec toProtoAndBackSpec;
+
+ @Test
+ public void testToProtoAndBack() throws Exception {
+ WindowingStrategy<?, ?> windowingStrategy = toProtoAndBackSpec.getWindowingStrategy();
+ WindowingStrategy<?, ?> toProtoAndBackWindowingStrategy =
+ WindowingStrategyTranslation.fromProto(
+ WindowingStrategyTranslation.toProto(windowingStrategy));
+
+ assertThat(
+ toProtoAndBackWindowingStrategy,
+ equalTo((WindowingStrategy) windowingStrategy.fixDefaults()));
+ }
+
+ @Test
+ public void testToProtoAndBackWithComponents() throws Exception {
+ WindowingStrategy<?, ?> windowingStrategy = toProtoAndBackSpec.getWindowingStrategy();
+ SdkComponents components = SdkComponents.create();
+ RunnerApi.WindowingStrategy proto =
+ WindowingStrategyTranslation.toProto(windowingStrategy, components);
+ RunnerApi.Components protoComponents = components.toComponents();
+
+ assertThat(
+ WindowingStrategyTranslation.fromProto(proto, protoComponents).fixDefaults(),
+ Matchers.<WindowingStrategy<?, ?>>equalTo(windowingStrategy.fixDefaults()));
+
+ protoComponents.getCodersOrThrow(
+ components.registerCoder(windowingStrategy.getWindowFn().windowCoder()));
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/c8b2119a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
index 6d7a0f8..af93ef5 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
@@ -56,7 +56,7 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.Nullable;
-import org.apache.beam.runners.core.construction.WindowingStrategies;
+import org.apache.beam.runners.core.construction.WindowingStrategyTranslation;
import org.apache.beam.runners.dataflow.BatchViewOverrides.GroupByKeyAndSortValuesOnly;
import org.apache.beam.runners.dataflow.DataflowRunner.CombineGroupedValues;
import org.apache.beam.runners.dataflow.PrimitiveParDoSingleFactory.ParDoSingle;
@@ -124,7 +124,7 @@ public class DataflowPipelineTranslator {
private static byte[] serializeWindowingStrategy(WindowingStrategy<?, ?> windowingStrategy) {
try {
- return WindowingStrategies.toProto(windowingStrategy).toByteArray();
+ return WindowingStrategyTranslation.toProto(windowingStrategy).toByteArray();
} catch (Exception e) {
throw new RuntimeException(
String.format("Unable to format windowing strategy %s as bytes", windowingStrategy), e);