You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/05/24 20:14:11 UTC

[2/9] beam git commit: Rename WindowingStrategies to WindowingStrategyTranslation

Rename WindowingStrategies to WindowingStrategyTranslation


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c8b2119a
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c8b2119a
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c8b2119a

Branch: refs/heads/master
Commit: c8b2119ab9a75c7f781ce73ea9352734640a6f46
Parents: 7e37b70
Author: Kenneth Knowles <kl...@google.com>
Authored: Tue May 23 15:32:47 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Tue May 23 15:53:41 2017 -0700

----------------------------------------------------------------------
 .../construction/PCollectionTranslation.java    |   2 +-
 .../core/construction/ParDoTranslation.java     |   2 +-
 .../core/construction/SdkComponents.java        |   2 +-
 .../construction/WindowIntoTranslation.java     |   4 +-
 .../core/construction/WindowingStrategies.java  | 278 -------------------
 .../WindowingStrategyTranslation.java           | 278 +++++++++++++++++++
 .../construction/WindowingStrategiesTest.java   | 110 --------
 .../WindowingStrategyTranslationTest.java       | 111 ++++++++
 .../dataflow/DataflowPipelineTranslator.java    |   4 +-
 9 files changed, 396 insertions(+), 395 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/c8b2119a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PCollectionTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PCollectionTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PCollectionTranslation.java
index 46f714e..303c02d 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PCollectionTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PCollectionTranslation.java
@@ -60,7 +60,7 @@ public class PCollectionTranslation {
   public static WindowingStrategy<?, ?> getWindowingStrategy(
       RunnerApi.PCollection pCollection, RunnerApi.Components components)
       throws InvalidProtocolBufferException {
-    return WindowingStrategies.fromProto(
+    return WindowingStrategyTranslation.fromProto(
         components.getWindowingStrategiesOrThrow(pCollection.getWindowingStrategyId()), components);
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/c8b2119a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
index bc5bb0e..28d577f 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java
@@ -265,7 +265,7 @@ public class ParDoTranslation {
     RunnerApi.PCollection inputCollection =
         components.getPcollectionsOrThrow(parDoTransform.getInputsOrThrow(id));
     WindowingStrategy<?, ?> windowingStrategy =
-        WindowingStrategies.fromProto(
+        WindowingStrategyTranslation.fromProto(
             components.getWindowingStrategiesOrThrow(inputCollection.getWindowingStrategyId()),
             components);
     Coder<?> elemCoder =

http://git-wip-us.apache.org/repos/asf/beam/blob/c8b2119a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java
index 5c81875..b0f164f 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java
@@ -200,7 +200,7 @@ class SdkComponents {
     String name = uniqify(baseName, windowingStrategyIds.values());
     windowingStrategyIds.put(windowingStrategy, name);
     RunnerApi.WindowingStrategy windowingStrategyProto =
-        WindowingStrategies.toProto(windowingStrategy, this);
+        WindowingStrategyTranslation.toProto(windowingStrategy, this);
     componentsBuilder.putWindowingStrategies(name, windowingStrategyProto);
     return name;
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/c8b2119a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowIntoTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowIntoTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowIntoTranslation.java
index 69793b5..215beba 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowIntoTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowIntoTranslation.java
@@ -49,13 +49,13 @@ public class WindowIntoTranslation {
 
   public static WindowIntoPayload toProto(Window.Assign<?> transform, SdkComponents components) {
     return WindowIntoPayload.newBuilder()
-        .setWindowFn(WindowingStrategies.toProto(transform.getWindowFn(), components))
+        .setWindowFn(WindowingStrategyTranslation.toProto(transform.getWindowFn(), components))
         .build();
   }
 
   public static WindowFn<?, ?> getWindowFn(WindowIntoPayload payload)
       throws InvalidProtocolBufferException {
     SdkFunctionSpec spec = payload.getWindowFn();
-    return WindowingStrategies.windowFnFromProto(spec);
+    return WindowingStrategyTranslation.windowFnFromProto(spec);
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/c8b2119a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategies.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategies.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategies.java
deleted file mode 100644
index 8dceebb..0000000
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategies.java
+++ /dev/null
@@ -1,278 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.core.construction;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-import com.google.protobuf.Any;
-import com.google.protobuf.ByteString;
-import com.google.protobuf.BytesValue;
-import com.google.protobuf.InvalidProtocolBufferException;
-import java.io.IOException;
-import java.io.Serializable;
-import org.apache.beam.sdk.common.runner.v1.RunnerApi;
-import org.apache.beam.sdk.common.runner.v1.RunnerApi.Components;
-import org.apache.beam.sdk.common.runner.v1.RunnerApi.FunctionSpec;
-import org.apache.beam.sdk.common.runner.v1.RunnerApi.OutputTime;
-import org.apache.beam.sdk.common.runner.v1.RunnerApi.SdkFunctionSpec;
-import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
-import org.apache.beam.sdk.transforms.windowing.Trigger;
-import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior;
-import org.apache.beam.sdk.transforms.windowing.WindowFn;
-import org.apache.beam.sdk.util.SerializableUtils;
-import org.apache.beam.sdk.values.WindowingStrategy;
-import org.apache.beam.sdk.values.WindowingStrategy.AccumulationMode;
-import org.joda.time.Duration;
-
-/** Utilities for working with {@link WindowingStrategy WindowingStrategies}. */
-public class WindowingStrategies implements Serializable {
-
-  public static AccumulationMode fromProto(RunnerApi.AccumulationMode proto) {
-    switch (proto) {
-      case DISCARDING:
-        return AccumulationMode.DISCARDING_FIRED_PANES;
-      case ACCUMULATING:
-        return AccumulationMode.ACCUMULATING_FIRED_PANES;
-      case UNRECOGNIZED:
-      default:
-        // Whether or not it is proto that cannot recognize it (due to the version of the
-        // generated code we link to) or the switch hasn't been updated to handle it,
-        // the situation is the same: we don't know what this OutputTime means
-        throw new IllegalArgumentException(
-            String.format(
-                "Cannot convert unknown %s to %s: %s",
-                RunnerApi.AccumulationMode.class.getCanonicalName(),
-                AccumulationMode.class.getCanonicalName(),
-                proto));
-    }
-  }
-
-  public static RunnerApi.AccumulationMode toProto(AccumulationMode accumulationMode) {
-    switch (accumulationMode) {
-      case DISCARDING_FIRED_PANES:
-        return RunnerApi.AccumulationMode.DISCARDING;
-      case ACCUMULATING_FIRED_PANES:
-        return RunnerApi.AccumulationMode.ACCUMULATING;
-      default:
-        throw new IllegalArgumentException(
-            String.format(
-                "Cannot convert unknown %s to %s: %s",
-                AccumulationMode.class.getCanonicalName(),
-                RunnerApi.AccumulationMode.class.getCanonicalName(),
-                accumulationMode));
-    }
-  }
-
-  public static RunnerApi.ClosingBehavior toProto(ClosingBehavior closingBehavior) {
-    switch (closingBehavior) {
-      case FIRE_ALWAYS:
-        return RunnerApi.ClosingBehavior.EMIT_ALWAYS;
-      case FIRE_IF_NON_EMPTY:
-        return RunnerApi.ClosingBehavior.EMIT_IF_NONEMPTY;
-      default:
-        throw new IllegalArgumentException(
-            String.format(
-                "Cannot convert unknown %s to %s: %s",
-                ClosingBehavior.class.getCanonicalName(),
-                RunnerApi.ClosingBehavior.class.getCanonicalName(),
-                closingBehavior));
-    }
-  }
-
-  public static ClosingBehavior fromProto(RunnerApi.ClosingBehavior proto) {
-    switch (proto) {
-      case EMIT_ALWAYS:
-        return ClosingBehavior.FIRE_ALWAYS;
-      case EMIT_IF_NONEMPTY:
-        return ClosingBehavior.FIRE_IF_NON_EMPTY;
-      case UNRECOGNIZED:
-      default:
-        // Whether or not it is proto that cannot recognize it (due to the version of the
-        // generated code we link to) or the switch hasn't been updated to handle it,
-        // the situation is the same: we don't know what this OutputTime means
-        throw new IllegalArgumentException(
-            String.format(
-                "Cannot convert unknown %s to %s: %s",
-                RunnerApi.ClosingBehavior.class.getCanonicalName(),
-                ClosingBehavior.class.getCanonicalName(),
-                proto));
-    }
-  }
-
-  public static RunnerApi.OutputTime toProto(TimestampCombiner timestampCombiner) {
-    switch(timestampCombiner) {
-      case EARLIEST:
-        return OutputTime.EARLIEST_IN_PANE;
-      case END_OF_WINDOW:
-        return OutputTime.END_OF_WINDOW;
-      case LATEST:
-        return OutputTime.LATEST_IN_PANE;
-      default:
-        throw new IllegalArgumentException(
-            String.format(
-                "Unknown %s: %s",
-                TimestampCombiner.class.getSimpleName(),
-                timestampCombiner));
-    }
-  }
-
-  public static TimestampCombiner timestampCombinerFromProto(RunnerApi.OutputTime proto) {
-    switch (proto) {
-      case EARLIEST_IN_PANE:
-        return TimestampCombiner.EARLIEST;
-      case END_OF_WINDOW:
-        return TimestampCombiner.END_OF_WINDOW;
-      case LATEST_IN_PANE:
-        return TimestampCombiner.LATEST;
-      case UNRECOGNIZED:
-      default:
-        // Whether or not it is proto that cannot recognize it (due to the version of the
-        // generated code we link to) or the switch hasn't been updated to handle it,
-        // the situation is the same: we don't know what this OutputTime means
-        throw new IllegalArgumentException(
-            String.format(
-                "Cannot convert unknown %s to %s: %s",
-                RunnerApi.OutputTime.class.getCanonicalName(),
-                OutputTime.class.getCanonicalName(),
-                proto));
-    }
-  }
-
-  // This URN says that the WindowFn is just a UDF blob the indicated SDK understands
-  // TODO: standardize such things
-  public static final String CUSTOM_WINDOWFN_URN = "urn:beam:windowfn:javasdk:0.1";
-
-  /**
-   * Converts a {@link WindowFn} into a {@link RunnerApi.MessageWithComponents} where {@link
-   * RunnerApi.MessageWithComponents#getFunctionSpec()} is a {@link RunnerApi.FunctionSpec} for the
-   * input {@link WindowFn}.
-   */
-  public static SdkFunctionSpec toProto(
-      WindowFn<?, ?> windowFn, @SuppressWarnings("unused") SdkComponents components) {
-    return SdkFunctionSpec.newBuilder()
-        // TODO: Set environment ID
-        .setSpec(
-            FunctionSpec.newBuilder()
-                .setUrn(CUSTOM_WINDOWFN_URN)
-                .setParameter(
-                    Any.pack(
-                        BytesValue.newBuilder()
-                            .setValue(
-                                ByteString.copyFrom(
-                                    SerializableUtils.serializeToByteArray(windowFn)))
-                            .build())))
-        .build();
-  }
-
-  /**
-   * Converts a {@link WindowingStrategy} into a {@link RunnerApi.MessageWithComponents} where
-   * {@link RunnerApi.MessageWithComponents#getWindowingStrategy()} ()} is a {@link
-   * RunnerApi.WindowingStrategy RunnerApi.WindowingStrategy (proto)} for the input {@link
-   * WindowingStrategy}.
-   */
-  public static RunnerApi.MessageWithComponents toProto(WindowingStrategy<?, ?> windowingStrategy)
-      throws IOException {
-    SdkComponents components = SdkComponents.create();
-    RunnerApi.WindowingStrategy windowingStrategyProto = toProto(windowingStrategy, components);
-
-    return RunnerApi.MessageWithComponents.newBuilder()
-        .setWindowingStrategy(windowingStrategyProto)
-        .setComponents(components.toComponents())
-        .build();
-  }
-
-  /**
-   * Converts a {@link WindowingStrategy} into a {@link RunnerApi.WindowingStrategy}, registering
-   * any components in the provided {@link SdkComponents}.
-   */
-  public static RunnerApi.WindowingStrategy toProto(
-      WindowingStrategy<?, ?> windowingStrategy, SdkComponents components) throws IOException {
-    SdkFunctionSpec windowFnSpec = toProto(windowingStrategy.getWindowFn(), components);
-
-    RunnerApi.WindowingStrategy.Builder windowingStrategyProto =
-        RunnerApi.WindowingStrategy.newBuilder()
-            .setOutputTime(toProto(windowingStrategy.getTimestampCombiner()))
-            .setAccumulationMode(toProto(windowingStrategy.getMode()))
-            .setClosingBehavior(toProto(windowingStrategy.getClosingBehavior()))
-            .setAllowedLateness(windowingStrategy.getAllowedLateness().getMillis())
-            .setTrigger(Triggers.toProto(windowingStrategy.getTrigger()))
-            .setWindowFn(windowFnSpec)
-            .setWindowCoderId(
-                components.registerCoder(windowingStrategy.getWindowFn().windowCoder()));
-
-    return windowingStrategyProto.build();
-  }
-
-  /**
-   * Converts from a {@link RunnerApi.WindowingStrategy} accompanied by {@link Components}
-   * to the SDK's {@link WindowingStrategy}.
-   */
-  public static WindowingStrategy<?, ?> fromProto(RunnerApi.MessageWithComponents proto)
-      throws InvalidProtocolBufferException {
-    switch (proto.getRootCase()) {
-      case WINDOWING_STRATEGY:
-        return fromProto(proto.getWindowingStrategy(), proto.getComponents());
-      default:
-        throw new IllegalArgumentException(
-            String.format(
-                "Expected a %s with components but received %s",
-                RunnerApi.WindowingStrategy.class.getCanonicalName(), proto));
-    }
-  }
-
-  /**
-   * Converts from {@link RunnerApi.WindowingStrategy} to the SDK's {@link WindowingStrategy} using
-   * the provided components to dereferences identifiers found in the proto.
-   */
-  public static WindowingStrategy<?, ?> fromProto(
-      RunnerApi.WindowingStrategy proto, Components components)
-      throws InvalidProtocolBufferException {
-
-    SdkFunctionSpec windowFnSpec = proto.getWindowFn();
-    WindowFn<?, ?> windowFn = windowFnFromProto(windowFnSpec);
-    TimestampCombiner timestampCombiner = timestampCombinerFromProto(proto.getOutputTime());
-    AccumulationMode accumulationMode = fromProto(proto.getAccumulationMode());
-    Trigger trigger = Triggers.fromProto(proto.getTrigger());
-    ClosingBehavior closingBehavior = fromProto(proto.getClosingBehavior());
-    Duration allowedLateness = Duration.millis(proto.getAllowedLateness());
-
-    return WindowingStrategy.of(windowFn)
-        .withAllowedLateness(allowedLateness)
-        .withMode(accumulationMode)
-        .withTrigger(trigger)
-        .withTimestampCombiner(timestampCombiner)
-        .withClosingBehavior(closingBehavior);
-  }
-
-  public static WindowFn<?, ?> windowFnFromProto(SdkFunctionSpec windowFnSpec)
-      throws InvalidProtocolBufferException {
-    checkArgument(
-        windowFnSpec.getSpec().getUrn().equals(CUSTOM_WINDOWFN_URN),
-        "Only Java-serialized %s instances are supported, with URN %s. But found URN %s",
-        WindowFn.class.getSimpleName(),
-        CUSTOM_WINDOWFN_URN,
-        windowFnSpec.getSpec().getUrn());
-
-    Object deserializedWindowFn =
-        SerializableUtils.deserializeFromByteArray(
-            windowFnSpec.getSpec().getParameter().unpack(BytesValue.class).getValue().toByteArray(),
-            "WindowFn");
-
-    return (WindowFn<?, ?>) deserializedWindowFn;
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/c8b2119a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java
new file mode 100644
index 0000000..061f309
--- /dev/null
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core.construction;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.protobuf.Any;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.BytesValue;
+import com.google.protobuf.InvalidProtocolBufferException;
+import java.io.IOException;
+import java.io.Serializable;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi.Components;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi.FunctionSpec;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi.OutputTime;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi.SdkFunctionSpec;
+import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
+import org.apache.beam.sdk.transforms.windowing.Trigger;
+import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.util.SerializableUtils;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.beam.sdk.values.WindowingStrategy.AccumulationMode;
+import org.joda.time.Duration;
+
+/** Utilities for working with {@link WindowingStrategy WindowingStrategies}. */
+public class WindowingStrategyTranslation implements Serializable {
+
+  public static AccumulationMode fromProto(RunnerApi.AccumulationMode proto) {
+    switch (proto) {
+      case DISCARDING:
+        return AccumulationMode.DISCARDING_FIRED_PANES;
+      case ACCUMULATING:
+        return AccumulationMode.ACCUMULATING_FIRED_PANES;
+      case UNRECOGNIZED:
+      default:
+        // Whether or not it is proto that cannot recognize it (due to the version of the
+        // generated code we link to) or the switch hasn't been updated to handle it,
+        // the situation is the same: we don't know what this AccumulationMode means
+        throw new IllegalArgumentException(
+            String.format(
+                "Cannot convert unknown %s to %s: %s",
+                RunnerApi.AccumulationMode.class.getCanonicalName(),
+                AccumulationMode.class.getCanonicalName(),
+                proto));
+    }
+  }
+
+  public static RunnerApi.AccumulationMode toProto(AccumulationMode accumulationMode) {
+    switch (accumulationMode) {
+      case DISCARDING_FIRED_PANES:
+        return RunnerApi.AccumulationMode.DISCARDING;
+      case ACCUMULATING_FIRED_PANES:
+        return RunnerApi.AccumulationMode.ACCUMULATING;
+      default:
+        throw new IllegalArgumentException(
+            String.format(
+                "Cannot convert unknown %s to %s: %s",
+                AccumulationMode.class.getCanonicalName(),
+                RunnerApi.AccumulationMode.class.getCanonicalName(),
+                accumulationMode));
+    }
+  }
+
+  public static RunnerApi.ClosingBehavior toProto(ClosingBehavior closingBehavior) {
+    switch (closingBehavior) {
+      case FIRE_ALWAYS:
+        return RunnerApi.ClosingBehavior.EMIT_ALWAYS;
+      case FIRE_IF_NON_EMPTY:
+        return RunnerApi.ClosingBehavior.EMIT_IF_NONEMPTY;
+      default:
+        throw new IllegalArgumentException(
+            String.format(
+                "Cannot convert unknown %s to %s: %s",
+                ClosingBehavior.class.getCanonicalName(),
+                RunnerApi.ClosingBehavior.class.getCanonicalName(),
+                closingBehavior));
+    }
+  }
+
+  public static ClosingBehavior fromProto(RunnerApi.ClosingBehavior proto) {
+    switch (proto) {
+      case EMIT_ALWAYS:
+        return ClosingBehavior.FIRE_ALWAYS;
+      case EMIT_IF_NONEMPTY:
+        return ClosingBehavior.FIRE_IF_NON_EMPTY;
+      case UNRECOGNIZED:
+      default:
+        // Whether or not it is proto that cannot recognize it (due to the version of the
+        // generated code we link to) or the switch hasn't been updated to handle it,
+        // the situation is the same: we don't know what this ClosingBehavior means
+        throw new IllegalArgumentException(
+            String.format(
+                "Cannot convert unknown %s to %s: %s",
+                RunnerApi.ClosingBehavior.class.getCanonicalName(),
+                ClosingBehavior.class.getCanonicalName(),
+                proto));
+    }
+  }
+
+  public static RunnerApi.OutputTime toProto(TimestampCombiner timestampCombiner) {
+    switch(timestampCombiner) {
+      case EARLIEST:
+        return OutputTime.EARLIEST_IN_PANE;
+      case END_OF_WINDOW:
+        return OutputTime.END_OF_WINDOW;
+      case LATEST:
+        return OutputTime.LATEST_IN_PANE;
+      default:
+        throw new IllegalArgumentException(
+            String.format(
+                "Unknown %s: %s",
+                TimestampCombiner.class.getSimpleName(),
+                timestampCombiner));
+    }
+  }
+
+  public static TimestampCombiner timestampCombinerFromProto(RunnerApi.OutputTime proto) {
+    switch (proto) {
+      case EARLIEST_IN_PANE:
+        return TimestampCombiner.EARLIEST;
+      case END_OF_WINDOW:
+        return TimestampCombiner.END_OF_WINDOW;
+      case LATEST_IN_PANE:
+        return TimestampCombiner.LATEST;
+      case UNRECOGNIZED:
+      default:
+        // Whether or not it is proto that cannot recognize it (due to the version of the
+        // generated code we link to) or the switch hasn't been updated to handle it,
+        // the situation is the same: we don't know what this OutputTime means
+        throw new IllegalArgumentException(
+            String.format(
+                "Cannot convert unknown %s to %s: %s",
+                RunnerApi.OutputTime.class.getCanonicalName(),
+                OutputTime.class.getCanonicalName(),
+                proto));
+    }
+  }
+
+  // This URN says that the WindowFn is just a UDF blob the indicated SDK understands
+  // TODO: standardize such things
+  public static final String CUSTOM_WINDOWFN_URN = "urn:beam:windowfn:javasdk:0.1";
+
+  /**
+   * Converts a {@link WindowFn} into an {@link SdkFunctionSpec} whose {@link
+   * SdkFunctionSpec#getSpec()} is a {@link RunnerApi.FunctionSpec} that carries the
+   * Java-serialized input {@link WindowFn} under the URN {@link #CUSTOM_WINDOWFN_URN}.
+   */
+  public static SdkFunctionSpec toProto(
+      WindowFn<?, ?> windowFn, @SuppressWarnings("unused") SdkComponents components) {
+    return SdkFunctionSpec.newBuilder()
+        // TODO: Set environment ID
+        .setSpec(
+            FunctionSpec.newBuilder()
+                .setUrn(CUSTOM_WINDOWFN_URN)
+                .setParameter(
+                    Any.pack(
+                        BytesValue.newBuilder()
+                            .setValue(
+                                ByteString.copyFrom(
+                                    SerializableUtils.serializeToByteArray(windowFn)))
+                            .build())))
+        .build();
+  }
+
+  /**
+   * Converts a {@link WindowingStrategy} into a {@link RunnerApi.MessageWithComponents} where
+   * {@link RunnerApi.MessageWithComponents#getWindowingStrategy()} is a {@link
+   * RunnerApi.WindowingStrategy RunnerApi.WindowingStrategy (proto)} for the input {@link
+   * WindowingStrategy}.
+   */
+  public static RunnerApi.MessageWithComponents toProto(WindowingStrategy<?, ?> windowingStrategy)
+      throws IOException {
+    SdkComponents components = SdkComponents.create();
+    RunnerApi.WindowingStrategy windowingStrategyProto = toProto(windowingStrategy, components);
+
+    return RunnerApi.MessageWithComponents.newBuilder()
+        .setWindowingStrategy(windowingStrategyProto)
+        .setComponents(components.toComponents())
+        .build();
+  }
+
+  /**
+   * Converts a {@link WindowingStrategy} into a {@link RunnerApi.WindowingStrategy}, registering
+   * any components in the provided {@link SdkComponents}.
+   */
+  public static RunnerApi.WindowingStrategy toProto(
+      WindowingStrategy<?, ?> windowingStrategy, SdkComponents components) throws IOException {
+    SdkFunctionSpec windowFnSpec = toProto(windowingStrategy.getWindowFn(), components);
+
+    RunnerApi.WindowingStrategy.Builder windowingStrategyProto =
+        RunnerApi.WindowingStrategy.newBuilder()
+            .setOutputTime(toProto(windowingStrategy.getTimestampCombiner()))
+            .setAccumulationMode(toProto(windowingStrategy.getMode()))
+            .setClosingBehavior(toProto(windowingStrategy.getClosingBehavior()))
+            .setAllowedLateness(windowingStrategy.getAllowedLateness().getMillis())
+            .setTrigger(Triggers.toProto(windowingStrategy.getTrigger()))
+            .setWindowFn(windowFnSpec)
+            .setWindowCoderId(
+                components.registerCoder(windowingStrategy.getWindowFn().windowCoder()));
+
+    return windowingStrategyProto.build();
+  }
+
+  /**
+   * Converts from a {@link RunnerApi.WindowingStrategy} accompanied by {@link Components}
+   * to the SDK's {@link WindowingStrategy}.
+   */
+  public static WindowingStrategy<?, ?> fromProto(RunnerApi.MessageWithComponents proto)
+      throws InvalidProtocolBufferException {
+    switch (proto.getRootCase()) {
+      case WINDOWING_STRATEGY:
+        return fromProto(proto.getWindowingStrategy(), proto.getComponents());
+      default:
+        throw new IllegalArgumentException(
+            String.format(
+                "Expected a %s with components but received %s",
+                RunnerApi.WindowingStrategy.class.getCanonicalName(), proto));
+    }
+  }
+
+  /**
+   * Converts from {@link RunnerApi.WindowingStrategy} to the SDK's {@link WindowingStrategy} using
+   * the provided components to dereference identifiers found in the proto.
+   */
+  public static WindowingStrategy<?, ?> fromProto(
+      RunnerApi.WindowingStrategy proto, Components components)
+      throws InvalidProtocolBufferException {
+
+    SdkFunctionSpec windowFnSpec = proto.getWindowFn();
+    WindowFn<?, ?> windowFn = windowFnFromProto(windowFnSpec);
+    TimestampCombiner timestampCombiner = timestampCombinerFromProto(proto.getOutputTime());
+    AccumulationMode accumulationMode = fromProto(proto.getAccumulationMode());
+    Trigger trigger = Triggers.fromProto(proto.getTrigger());
+    ClosingBehavior closingBehavior = fromProto(proto.getClosingBehavior());
+    Duration allowedLateness = Duration.millis(proto.getAllowedLateness());
+
+    return WindowingStrategy.of(windowFn)
+        .withAllowedLateness(allowedLateness)
+        .withMode(accumulationMode)
+        .withTrigger(trigger)
+        .withTimestampCombiner(timestampCombiner)
+        .withClosingBehavior(closingBehavior);
+  }
+
+  public static WindowFn<?, ?> windowFnFromProto(SdkFunctionSpec windowFnSpec)
+      throws InvalidProtocolBufferException {
+    checkArgument(
+        windowFnSpec.getSpec().getUrn().equals(CUSTOM_WINDOWFN_URN),
+        "Only Java-serialized %s instances are supported, with URN %s. But found URN %s",
+        WindowFn.class.getSimpleName(),
+        CUSTOM_WINDOWFN_URN,
+        windowFnSpec.getSpec().getUrn());
+
+    Object deserializedWindowFn =
+        SerializableUtils.deserializeFromByteArray(
+            windowFnSpec.getSpec().getParameter().unpack(BytesValue.class).getValue().toByteArray(),
+            "WindowFn");
+
+    return (WindowFn<?, ?>) deserializedWindowFn;
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/c8b2119a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WindowingStrategiesTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WindowingStrategiesTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WindowingStrategiesTest.java
deleted file mode 100644
index 7296a77..0000000
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WindowingStrategiesTest.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.core.construction;
-
-import static org.hamcrest.Matchers.equalTo;
-import static org.junit.Assert.assertThat;
-
-import com.google.auto.value.AutoValue;
-import com.google.common.collect.ImmutableList;
-import org.apache.beam.sdk.common.runner.v1.RunnerApi;
-import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
-import org.apache.beam.sdk.transforms.windowing.FixedWindows;
-import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
-import org.apache.beam.sdk.transforms.windowing.Trigger;
-import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior;
-import org.apache.beam.sdk.transforms.windowing.WindowFn;
-import org.apache.beam.sdk.values.WindowingStrategy;
-import org.apache.beam.sdk.values.WindowingStrategy.AccumulationMode;
-import org.hamcrest.Matchers;
-import org.joda.time.Duration;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameter;
-import org.junit.runners.Parameterized.Parameters;
-
-/** Unit tests for {@link WindowingStrategy}. */
-@RunWith(Parameterized.class)
-public class WindowingStrategiesTest {
-
-  // Each spec activates tests of all subsets of its fields
-  @AutoValue
-  abstract static class ToProtoAndBackSpec {
-    abstract WindowingStrategy getWindowingStrategy();
-  }
-
-  private static ToProtoAndBackSpec toProtoAndBackSpec(WindowingStrategy windowingStrategy) {
-    return new AutoValue_WindowingStrategiesTest_ToProtoAndBackSpec(windowingStrategy);
-  }
-
-  private static final WindowFn<?, ?> REPRESENTATIVE_WINDOW_FN =
-      FixedWindows.of(Duration.millis(12));
-
-  private static final Trigger REPRESENTATIVE_TRIGGER = AfterWatermark.pastEndOfWindow();
-
-  @Parameters(name = "{index}: {0}")
-  public static Iterable<ToProtoAndBackSpec> data() {
-    return ImmutableList.of(
-        toProtoAndBackSpec(WindowingStrategy.globalDefault()),
-        toProtoAndBackSpec(
-            WindowingStrategy.of(REPRESENTATIVE_WINDOW_FN)
-                .withClosingBehavior(ClosingBehavior.FIRE_ALWAYS)
-                .withMode(AccumulationMode.ACCUMULATING_FIRED_PANES)
-                .withTrigger(REPRESENTATIVE_TRIGGER)
-                .withAllowedLateness(Duration.millis(71))
-                .withTimestampCombiner(TimestampCombiner.EARLIEST)),
-        toProtoAndBackSpec(
-            WindowingStrategy.of(REPRESENTATIVE_WINDOW_FN)
-                .withClosingBehavior(ClosingBehavior.FIRE_IF_NON_EMPTY)
-                .withMode(AccumulationMode.DISCARDING_FIRED_PANES)
-                .withTrigger(REPRESENTATIVE_TRIGGER)
-                .withAllowedLateness(Duration.millis(93))
-                .withTimestampCombiner(TimestampCombiner.LATEST)));
-  }
-
-  @Parameter(0)
-  public ToProtoAndBackSpec toProtoAndBackSpec;
-
-  @Test
-  public void testToProtoAndBack() throws Exception {
-    WindowingStrategy<?, ?> windowingStrategy = toProtoAndBackSpec.getWindowingStrategy();
-    WindowingStrategy<?, ?> toProtoAndBackWindowingStrategy =
-        WindowingStrategies.fromProto(WindowingStrategies.toProto(windowingStrategy));
-
-    assertThat(
-        toProtoAndBackWindowingStrategy,
-        equalTo((WindowingStrategy) windowingStrategy.fixDefaults()));
-  }
-
-  @Test
-  public void testToProtoAndBackWithComponents() throws Exception {
-    WindowingStrategy<?, ?> windowingStrategy = toProtoAndBackSpec.getWindowingStrategy();
-    SdkComponents components = SdkComponents.create();
-    RunnerApi.WindowingStrategy proto =
-        WindowingStrategies.toProto(windowingStrategy, components);
-    RunnerApi.Components protoComponents = components.toComponents();
-
-    assertThat(
-        WindowingStrategies.fromProto(proto, protoComponents).fixDefaults(),
-        Matchers.<WindowingStrategy<?, ?>>equalTo(windowingStrategy.fixDefaults()));
-
-    protoComponents.getCodersOrThrow(
-        components.registerCoder(windowingStrategy.getWindowFn().windowCoder()));
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/c8b2119a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslationTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslationTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslationTest.java
new file mode 100644
index 0000000..1e52803
--- /dev/null
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslationTest.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core.construction;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+import com.google.auto.value.AutoValue;
+import com.google.common.collect.ImmutableList;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi;
+import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
+import org.apache.beam.sdk.transforms.windowing.Trigger;
+import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.beam.sdk.values.WindowingStrategy.AccumulationMode;
+import org.hamcrest.Matchers;
+import org.joda.time.Duration;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
+
+/** Unit tests for {@link WindowingStrategyTranslation}. */
+@RunWith(Parameterized.class)
+public class WindowingStrategyTranslationTest {
+
+  // Each spec activates tests of all subsets of its fields
+  @AutoValue
+  abstract static class ToProtoAndBackSpec {
+    abstract WindowingStrategy getWindowingStrategy();
+  }
+
+  private static ToProtoAndBackSpec toProtoAndBackSpec(WindowingStrategy windowingStrategy) {
+    return new AutoValue_WindowingStrategyTranslationTest_ToProtoAndBackSpec(windowingStrategy);
+  }
+
+  private static final WindowFn<?, ?> REPRESENTATIVE_WINDOW_FN =
+      FixedWindows.of(Duration.millis(12));
+
+  private static final Trigger REPRESENTATIVE_TRIGGER = AfterWatermark.pastEndOfWindow();
+
+  @Parameters(name = "{index}: {0}")
+  public static Iterable<ToProtoAndBackSpec> data() {
+    return ImmutableList.of(
+        toProtoAndBackSpec(WindowingStrategy.globalDefault()),
+        toProtoAndBackSpec(
+            WindowingStrategy.of(REPRESENTATIVE_WINDOW_FN)
+                .withClosingBehavior(ClosingBehavior.FIRE_ALWAYS)
+                .withMode(AccumulationMode.ACCUMULATING_FIRED_PANES)
+                .withTrigger(REPRESENTATIVE_TRIGGER)
+                .withAllowedLateness(Duration.millis(71))
+                .withTimestampCombiner(TimestampCombiner.EARLIEST)),
+        toProtoAndBackSpec(
+            WindowingStrategy.of(REPRESENTATIVE_WINDOW_FN)
+                .withClosingBehavior(ClosingBehavior.FIRE_IF_NON_EMPTY)
+                .withMode(AccumulationMode.DISCARDING_FIRED_PANES)
+                .withTrigger(REPRESENTATIVE_TRIGGER)
+                .withAllowedLateness(Duration.millis(93))
+                .withTimestampCombiner(TimestampCombiner.LATEST)));
+  }
+
+  @Parameter(0)
+  public ToProtoAndBackSpec toProtoAndBackSpec;
+
+  @Test
+  public void testToProtoAndBack() throws Exception {
+    WindowingStrategy<?, ?> windowingStrategy = toProtoAndBackSpec.getWindowingStrategy();
+    WindowingStrategy<?, ?> toProtoAndBackWindowingStrategy =
+        WindowingStrategyTranslation.fromProto(
+            WindowingStrategyTranslation.toProto(windowingStrategy));
+
+    assertThat(
+        toProtoAndBackWindowingStrategy,
+        equalTo((WindowingStrategy) windowingStrategy.fixDefaults()));
+  }
+
+  @Test
+  public void testToProtoAndBackWithComponents() throws Exception {
+    WindowingStrategy<?, ?> windowingStrategy = toProtoAndBackSpec.getWindowingStrategy();
+    SdkComponents components = SdkComponents.create();
+    RunnerApi.WindowingStrategy proto =
+        WindowingStrategyTranslation.toProto(windowingStrategy, components);
+    RunnerApi.Components protoComponents = components.toComponents();
+
+    assertThat(
+        WindowingStrategyTranslation.fromProto(proto, protoComponents).fixDefaults(),
+        Matchers.<WindowingStrategy<?, ?>>equalTo(windowingStrategy.fixDefaults()));
+
+    protoComponents.getCodersOrThrow(
+        components.registerCoder(windowingStrategy.getWindowFn().windowCoder()));
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/c8b2119a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
index 6d7a0f8..af93ef5 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
@@ -56,7 +56,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicLong;
 import javax.annotation.Nullable;
-import org.apache.beam.runners.core.construction.WindowingStrategies;
+import org.apache.beam.runners.core.construction.WindowingStrategyTranslation;
 import org.apache.beam.runners.dataflow.BatchViewOverrides.GroupByKeyAndSortValuesOnly;
 import org.apache.beam.runners.dataflow.DataflowRunner.CombineGroupedValues;
 import org.apache.beam.runners.dataflow.PrimitiveParDoSingleFactory.ParDoSingle;
@@ -124,7 +124,7 @@ public class DataflowPipelineTranslator {
 
   private static byte[] serializeWindowingStrategy(WindowingStrategy<?, ?> windowingStrategy) {
     try {
-      return WindowingStrategies.toProto(windowingStrategy).toByteArray();
+      return WindowingStrategyTranslation.toProto(windowingStrategy).toByteArray();
     } catch (Exception e) {
       throw new RuntimeException(
           String.format("Unable to format windowing strategy %s as bytes", windowingStrategy), e);