You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by tg...@apache.org on 2017/04/11 02:25:04 UTC

[1/2] beam git commit: Move WindowingStrategies to runners-core-construction

Repository: beam
Updated Branches:
  refs/heads/master 7fd9c6516 -> 0a0b1c80c


Move WindowingStrategies to runners-core-construction


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/40b36686
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/40b36686
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/40b36686

Branch: refs/heads/master
Commit: 40b36686bac65995d606fe5cea7aa957d0ef3f9d
Parents: 7fd9c65
Author: Thomas Groh <tg...@google.com>
Authored: Mon Apr 10 14:35:59 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Mon Apr 10 16:44:23 2017 -0700

----------------------------------------------------------------------
 runners/core-construction-java/pom.xml          |  17 ++
 .../core/construction/WindowingStrategies.java  | 268 +++++++++++++++++++
 .../construction/WindowingStrategiesTest.java   |  92 +++++++
 .../beam/sdk/util/WindowingStrategies.java      | 267 ------------------
 .../beam/sdk/util/WindowingStrategiesTest.java  |  91 -------
 5 files changed, 377 insertions(+), 358 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/40b36686/runners/core-construction-java/pom.xml
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/pom.xml b/runners/core-construction-java/pom.xml
index ee64f91..3f323dd 100644
--- a/runners/core-construction-java/pom.xml
+++ b/runners/core-construction-java/pom.xml
@@ -58,17 +58,28 @@
       <groupId>org.apache.beam</groupId>
       <artifactId>beam-sdks-common-runner-api</artifactId>
     </dependency>
+
     <dependency>
       <groupId>org.apache.beam</groupId>
       <artifactId>beam-sdks-java-core</artifactId>
     </dependency>
 
     <dependency>
+      <groupId>com.google.protobuf</groupId>
+      <artifactId>protobuf-java</artifactId>
+    </dependency>
+
+    <dependency>
       <groupId>com.fasterxml.jackson.core</groupId>
       <artifactId>jackson-annotations</artifactId>
     </dependency>
 
     <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-databind</artifactId>
+    </dependency>
+
+    <dependency>
       <groupId>com.google.code.findbugs</groupId>
       <artifactId>jsr305</artifactId>
     </dependency>
@@ -88,6 +99,12 @@
       <artifactId>slf4j-api</artifactId>
     </dependency>
 
+    <dependency>
+      <groupId>com.google.auto.value</groupId>
+      <artifactId>auto-value</artifactId>
+      <scope>provided</scope>
+    </dependency>
+
     <!-- test dependencies -->
 
     <dependency>

http://git-wip-us.apache.org/repos/asf/beam/blob/40b36686/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategies.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategies.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategies.java
new file mode 100644
index 0000000..353be05
--- /dev/null
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategies.java
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core.construction;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.protobuf.Any;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.BytesValue;
+import com.google.protobuf.InvalidProtocolBufferException;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.UUID;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi.Components;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi.FunctionSpec;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi.SdkFunctionSpec;
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
+import org.apache.beam.sdk.transforms.windowing.Trigger;
+import org.apache.beam.sdk.transforms.windowing.Triggers;
+import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.util.SerializableUtils;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.util.WindowingStrategy.AccumulationMode;
+import org.apache.beam.sdk.util.WindowingStrategy.CombineWindowFnOutputTimes;
+import org.joda.time.Duration;
+
+/** Utilities for working with {@link WindowingStrategy WindowingStrategies}. */
+public class WindowingStrategies implements Serializable {
+
+  public static AccumulationMode fromProto(RunnerApi.AccumulationMode proto) {
+    switch (proto) {
+      case DISCARDING:
+        return AccumulationMode.DISCARDING_FIRED_PANES;
+      case ACCUMULATING:
+        return AccumulationMode.ACCUMULATING_FIRED_PANES;
+      case UNRECOGNIZED:
+      default:
+        // Whether or not it is proto that cannot recognize it (due to the version of the
+        // generated code we link to) or the switch hasn't been updated to handle it,
+        // the situation is the same: we don't know what this AccumulationMode means
+        throw new IllegalArgumentException(
+            String.format(
+                "Cannot convert unknown %s to %s: %s",
+                RunnerApi.AccumulationMode.class.getCanonicalName(),
+                AccumulationMode.class.getCanonicalName(),
+                proto));
+    }
+  }
+
+  public static RunnerApi.AccumulationMode toProto(AccumulationMode accumulationMode) {
+    switch (accumulationMode) {
+      case DISCARDING_FIRED_PANES:
+        return RunnerApi.AccumulationMode.DISCARDING;
+      case ACCUMULATING_FIRED_PANES:
+        return RunnerApi.AccumulationMode.ACCUMULATING;
+      default:
+        throw new IllegalArgumentException(
+            String.format(
+                "Cannot convert unknown %s to %s: %s",
+                AccumulationMode.class.getCanonicalName(),
+                RunnerApi.AccumulationMode.class.getCanonicalName(),
+                accumulationMode));
+    }
+  }
+
+  public static RunnerApi.ClosingBehavior toProto(ClosingBehavior closingBehavior) {
+    switch (closingBehavior) {
+      case FIRE_ALWAYS:
+        return RunnerApi.ClosingBehavior.EMIT_ALWAYS;
+      case FIRE_IF_NON_EMPTY:
+        return RunnerApi.ClosingBehavior.EMIT_IF_NONEMPTY;
+      default:
+        throw new IllegalArgumentException(
+            String.format(
+                "Cannot convert unknown %s to %s: %s",
+                ClosingBehavior.class.getCanonicalName(),
+                RunnerApi.ClosingBehavior.class.getCanonicalName(),
+                closingBehavior));
+    }
+  }
+
+  public static ClosingBehavior fromProto(RunnerApi.ClosingBehavior proto) {
+    switch (proto) {
+      case EMIT_ALWAYS:
+        return ClosingBehavior.FIRE_ALWAYS;
+      case EMIT_IF_NONEMPTY:
+        return ClosingBehavior.FIRE_IF_NON_EMPTY;
+      case UNRECOGNIZED:
+      default:
+        // Whether or not it is proto that cannot recognize it (due to the version of the
+        // generated code we link to) or the switch hasn't been updated to handle it,
+        // the situation is the same: we don't know what this ClosingBehavior means
+        throw new IllegalArgumentException(
+            String.format(
+                "Cannot convert unknown %s to %s: %s",
+                RunnerApi.ClosingBehavior.class.getCanonicalName(),
+                ClosingBehavior.class.getCanonicalName(),
+                proto));
+    }
+  }
+
+  public static RunnerApi.OutputTime toProto(OutputTimeFn<?> outputTimeFn) {
+    if (outputTimeFn instanceof WindowingStrategy.CombineWindowFnOutputTimes) {
+      return toProto(((CombineWindowFnOutputTimes<?>) outputTimeFn).getOutputTimeFn());
+    } else {
+      return OutputTimeFns.toProto(outputTimeFn);
+    }
+  }
+
+  // This URN says that the coder is just a UDF blob the indicated SDK understands
+  // TODO: standardize such things
+  public static final String CUSTOM_CODER_URN = "urn:beam:coders:javasdk:0.1";
+
+  // This URN says that the WindowFn is just a UDF blob the indicated SDK understands
+  // TODO: standardize such things
+  public static final String CUSTOM_WINDOWFN_URN = "urn:beam:windowfn:javasdk:0.1";
+
+  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+  /**
+   * Converts a {@link WindowFn} into a {@link RunnerApi.MessageWithComponents} where
+   * {@link RunnerApi.MessageWithComponents#getSdkFunctionSpec()} is a {@link SdkFunctionSpec}
+   * for the input {@link WindowFn}.
+   */
+  public static RunnerApi.MessageWithComponents toProto(WindowFn<?, ?> windowFn)
+      throws IOException {
+    Coder<?> windowCoder = windowFn.windowCoder();
+
+    // TODO: re-use components
+    String windowCoderId = UUID.randomUUID().toString();
+
+    SdkFunctionSpec windowFnSpec =
+        SdkFunctionSpec.newBuilder()
+            .setSpec(
+                FunctionSpec.newBuilder()
+                    .setUrn(CUSTOM_WINDOWFN_URN)
+                    .setParameter(
+                        Any.pack(
+                            BytesValue.newBuilder()
+                                .setValue(
+                                    ByteString.copyFrom(
+                                        SerializableUtils.serializeToByteArray(windowFn)))
+                                .build())))
+            .build();
+
+    RunnerApi.Coder windowCoderProto =
+        RunnerApi.Coder.newBuilder()
+            .setSpec(
+                SdkFunctionSpec.newBuilder()
+                    .setSpec(
+                        FunctionSpec.newBuilder()
+                            .setUrn(CUSTOM_CODER_URN)
+                            .setParameter(
+                                Any.pack(
+                                    BytesValue.newBuilder()
+                                        .setValue(
+                                            ByteString.copyFrom(
+                                                OBJECT_MAPPER.writeValueAsBytes(
+                                                    windowCoder.asCloudObject())))
+                                        .build()))))
+            .build();
+
+    return RunnerApi.MessageWithComponents.newBuilder()
+        .setSdkFunctionSpec(windowFnSpec)
+        .setComponents(Components.newBuilder().putCoders(windowCoderId, windowCoderProto))
+        .build();
+  }
+
+  /**
+   * Converts a {@link WindowingStrategy} into a {@link RunnerApi.MessageWithComponents} where
+   * {@link RunnerApi.MessageWithComponents#getWindowingStrategy()} is a {@link
+   * RunnerApi.WindowingStrategy RunnerApi.WindowingStrategy (proto)} for the input {@link
+   * WindowingStrategy}.
+   */
+  public static RunnerApi.MessageWithComponents toProto(WindowingStrategy<?, ?> windowingStrategy)
+      throws IOException {
+
+    RunnerApi.MessageWithComponents windowFnWithComponents =
+        toProto(windowingStrategy.getWindowFn());
+
+    RunnerApi.WindowingStrategy.Builder windowingStrategyProto =
+        RunnerApi.WindowingStrategy.newBuilder()
+            .setOutputTime(toProto(windowingStrategy.getOutputTimeFn()))
+            .setAccumulationMode(toProto(windowingStrategy.getMode()))
+            .setClosingBehavior(toProto(windowingStrategy.getClosingBehavior()))
+            .setAllowedLateness(windowingStrategy.getAllowedLateness().getMillis())
+            .setTrigger(Triggers.toProto(windowingStrategy.getTrigger()))
+            .setWindowFn(windowFnWithComponents.getSdkFunctionSpec());
+
+    return RunnerApi.MessageWithComponents.newBuilder()
+        .setWindowingStrategy(windowingStrategyProto)
+        .setComponents(windowFnWithComponents.getComponents()).build();
+  }
+
+  /**
+   * Converts from a {@link RunnerApi.WindowingStrategy} accompanied by {@link Components}
+   * to the SDK's {@link WindowingStrategy}.
+   */
+  public static WindowingStrategy<?, ?> fromProto(RunnerApi.MessageWithComponents proto)
+      throws InvalidProtocolBufferException {
+    switch (proto.getRootCase()) {
+      case WINDOWING_STRATEGY:
+        return fromProto(proto.getWindowingStrategy(), proto.getComponents());
+      default:
+        throw new IllegalArgumentException(
+            String.format(
+                "Expected a %s with components but received %s",
+                RunnerApi.WindowingStrategy.class.getCanonicalName(), proto));
+    }
+  }
+
+  /**
+   * Converts from {@link RunnerApi.WindowingStrategy} to the SDK's {@link WindowingStrategy} using
+   * the provided components to dereference identifiers found in the proto.
+   */
+  public static WindowingStrategy<?, ?> fromProto(
+      RunnerApi.WindowingStrategy proto, Components components)
+      throws InvalidProtocolBufferException {
+
+    SdkFunctionSpec windowFnSpec = proto.getWindowFn();
+
+    checkArgument(
+        windowFnSpec.getSpec().getUrn().equals(CUSTOM_WINDOWFN_URN),
+        "Only Java-serialized %s instances are supported, with URN %s. But found URN %s",
+        WindowFn.class.getSimpleName(),
+        CUSTOM_WINDOWFN_URN,
+        windowFnSpec.getSpec().getUrn());
+
+    Object deserializedWindowFn =
+        SerializableUtils.deserializeFromByteArray(
+            windowFnSpec.getSpec().getParameter().unpack(BytesValue.class).getValue().toByteArray(),
+            "WindowFn");
+
+    WindowFn<?, ?> windowFn = (WindowFn<?, ?>) deserializedWindowFn;
+    OutputTimeFn<?> outputTimeFn = OutputTimeFns.fromProto(proto.getOutputTime());
+    AccumulationMode accumulationMode = fromProto(proto.getAccumulationMode());
+    Trigger trigger = Triggers.fromProto(proto.getTrigger());
+    ClosingBehavior closingBehavior = fromProto(proto.getClosingBehavior());
+    Duration allowedLateness = Duration.millis(proto.getAllowedLateness());
+
+    return WindowingStrategy.of(windowFn)
+        .withAllowedLateness(allowedLateness)
+        .withMode(accumulationMode)
+        .withTrigger(trigger)
+        .withOutputTimeFn(outputTimeFn)
+        .withClosingBehavior(closingBehavior);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/40b36686/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WindowingStrategiesTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WindowingStrategiesTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WindowingStrategiesTest.java
new file mode 100644
index 0000000..b603d65
--- /dev/null
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/WindowingStrategiesTest.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core.construction;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+import com.google.auto.value.AutoValue;
+import com.google.common.collect.ImmutableList;
+import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
+import org.apache.beam.sdk.transforms.windowing.Trigger;
+import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.util.WindowingStrategy.AccumulationMode;
+import org.joda.time.Duration;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
+
+/** Unit tests for {@link WindowingStrategies}. */
+@RunWith(Parameterized.class)
+public class WindowingStrategiesTest {
+
+  // Each spec wraps one WindowingStrategy that is round-tripped through its proto form
+  @AutoValue
+  abstract static class ToProtoAndBackSpec {
+    abstract WindowingStrategy getWindowingStrategy();
+  }
+
+  private static ToProtoAndBackSpec toProtoAndBackSpec(WindowingStrategy windowingStrategy) {
+    return new AutoValue_WindowingStrategiesTest_ToProtoAndBackSpec(windowingStrategy);
+  }
+
+  private static final WindowFn<?, ?> REPRESENTATIVE_WINDOW_FN =
+      FixedWindows.of(Duration.millis(12));
+
+  private static final Trigger REPRESENTATIVE_TRIGGER = AfterWatermark.pastEndOfWindow();
+
+  @Parameters(name = "{index}: {0}")
+  public static Iterable<ToProtoAndBackSpec> data() {
+    return ImmutableList.of(
+        toProtoAndBackSpec(WindowingStrategy.globalDefault()),
+        toProtoAndBackSpec(
+            WindowingStrategy.of(REPRESENTATIVE_WINDOW_FN)
+                .withClosingBehavior(ClosingBehavior.FIRE_ALWAYS)
+                .withMode(AccumulationMode.ACCUMULATING_FIRED_PANES)
+                .withTrigger(REPRESENTATIVE_TRIGGER)
+                .withAllowedLateness(Duration.millis(71))
+                .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp())),
+        toProtoAndBackSpec(
+            WindowingStrategy.of(REPRESENTATIVE_WINDOW_FN)
+                .withClosingBehavior(ClosingBehavior.FIRE_IF_NON_EMPTY)
+                .withMode(AccumulationMode.DISCARDING_FIRED_PANES)
+                .withTrigger(REPRESENTATIVE_TRIGGER)
+                .withAllowedLateness(Duration.millis(93))
+                .withOutputTimeFn(OutputTimeFns.outputAtLatestInputTimestamp())));
+  }
+
+  @Parameter(0)
+  public ToProtoAndBackSpec toProtoAndBackSpec;
+
+  @Test
+  public void testToProtoAndBack() throws Exception {
+    WindowingStrategy<?, ?> windowingStrategy = toProtoAndBackSpec.getWindowingStrategy();
+    WindowingStrategy<?, ?> toProtoAndBackWindowingStrategy =
+        WindowingStrategies.fromProto(WindowingStrategies.toProto(windowingStrategy));
+
+    assertThat(
+        toProtoAndBackWindowingStrategy,
+        equalTo((WindowingStrategy) windowingStrategy.fixDefaults()));
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/40b36686/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingStrategies.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingStrategies.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingStrategies.java
deleted file mode 100644
index 9595362..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingStrategies.java
+++ /dev/null
@@ -1,267 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.protobuf.Any;
-import com.google.protobuf.ByteString;
-import com.google.protobuf.BytesValue;
-import com.google.protobuf.InvalidProtocolBufferException;
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.UUID;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.common.runner.v1.RunnerApi;
-import org.apache.beam.sdk.common.runner.v1.RunnerApi.Components;
-import org.apache.beam.sdk.common.runner.v1.RunnerApi.FunctionSpec;
-import org.apache.beam.sdk.common.runner.v1.RunnerApi.SdkFunctionSpec;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
-import org.apache.beam.sdk.transforms.windowing.Trigger;
-import org.apache.beam.sdk.transforms.windowing.Triggers;
-import org.apache.beam.sdk.transforms.windowing.Window;
-import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior;
-import org.apache.beam.sdk.transforms.windowing.WindowFn;
-import org.apache.beam.sdk.util.WindowingStrategy.AccumulationMode;
-import org.apache.beam.sdk.util.WindowingStrategy.CombineWindowFnOutputTimes;
-import org.joda.time.Duration;
-
-/** Utilities for working with {@link WindowingStrategy WindowingStrategies}. */
-public class WindowingStrategies implements Serializable {
-
-  public static AccumulationMode fromProto(RunnerApi.AccumulationMode proto) {
-    switch (proto) {
-      case DISCARDING:
-        return AccumulationMode.DISCARDING_FIRED_PANES;
-      case ACCUMULATING:
-        return AccumulationMode.ACCUMULATING_FIRED_PANES;
-      case UNRECOGNIZED:
-      default:
-        // Whether or not it is proto that cannot recognize it (due to the version of the
-        // generated code we link to) or the switch hasn't been updated to handle it,
-        // the situation is the same: we don't know what this OutputTime means
-        throw new IllegalArgumentException(
-            String.format(
-                "Cannot convert unknown %s to %s: %s",
-                RunnerApi.AccumulationMode.class.getCanonicalName(),
-                AccumulationMode.class.getCanonicalName(),
-                proto));
-    }
-  }
-
-  public static RunnerApi.AccumulationMode toProto(AccumulationMode accumulationMode) {
-    switch (accumulationMode) {
-      case DISCARDING_FIRED_PANES:
-        return RunnerApi.AccumulationMode.DISCARDING;
-      case ACCUMULATING_FIRED_PANES:
-        return RunnerApi.AccumulationMode.ACCUMULATING;
-      default:
-        throw new IllegalArgumentException(
-            String.format(
-                "Cannot convert unknown %s to %s: %s",
-                AccumulationMode.class.getCanonicalName(),
-                RunnerApi.AccumulationMode.class.getCanonicalName(),
-                accumulationMode));
-    }
-  }
-
-  public static RunnerApi.ClosingBehavior toProto(Window.ClosingBehavior closingBehavior) {
-    switch (closingBehavior) {
-      case FIRE_ALWAYS:
-        return RunnerApi.ClosingBehavior.EMIT_ALWAYS;
-      case FIRE_IF_NON_EMPTY:
-        return RunnerApi.ClosingBehavior.EMIT_IF_NONEMPTY;
-      default:
-        throw new IllegalArgumentException(
-            String.format(
-                "Cannot convert unknown %s to %s: %s",
-                ClosingBehavior.class.getCanonicalName(),
-                RunnerApi.ClosingBehavior.class.getCanonicalName(),
-                closingBehavior));
-    }
-  }
-
-  public static ClosingBehavior fromProto(RunnerApi.ClosingBehavior proto) {
-    switch (proto) {
-      case EMIT_ALWAYS:
-        return ClosingBehavior.FIRE_ALWAYS;
-      case EMIT_IF_NONEMPTY:
-        return ClosingBehavior.FIRE_IF_NON_EMPTY;
-      case UNRECOGNIZED:
-      default:
-        // Whether or not it is proto that cannot recognize it (due to the version of the
-        // generated code we link to) or the switch hasn't been updated to handle it,
-        // the situation is the same: we don't know what this OutputTime means
-        throw new IllegalArgumentException(
-            String.format(
-                "Cannot convert unknown %s to %s: %s",
-                RunnerApi.ClosingBehavior.class.getCanonicalName(),
-                ClosingBehavior.class.getCanonicalName(),
-                proto));
-    }
-  }
-
-  public static RunnerApi.OutputTime toProto(OutputTimeFn<?> outputTimeFn) {
-    if (outputTimeFn instanceof WindowingStrategy.CombineWindowFnOutputTimes) {
-      return toProto(((CombineWindowFnOutputTimes<?>) outputTimeFn).getOutputTimeFn());
-    } else {
-      return OutputTimeFns.toProto(outputTimeFn);
-    }
-  }
-
-  // This URN says that the coder is just a UDF blob the indicated SDK understands
-  // TODO: standardize such things
-  public static final String CUSTOM_CODER_URN = "urn:beam:coders:javasdk:0.1";
-
-  // This URN says that the WindowFn is just a UDF blob the indicated SDK understands
-  // TODO: standardize such things
-  public static final String CUSTOM_WINDOWFN_URN = "urn:beam:windowfn:javasdk:0.1";
-
-  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
-
-  /**
-   * Converts a {@link WindowFn} into a {@link RunnerApi.MessageWithComponents} where
-   * {@link RunnerApi.MessageWithComponents#getFunctionSpec()} is a {@link RunnerApi.FunctionSpec}
-   * for the input {@link WindowFn}.
-   */
-  public static RunnerApi.MessageWithComponents toProto(WindowFn<?, ?> windowFn)
-      throws IOException {
-    Coder<?> windowCoder = windowFn.windowCoder();
-
-    // TODO: re-use components
-    String windowCoderId = UUID.randomUUID().toString();
-
-    RunnerApi.SdkFunctionSpec windowFnSpec =
-        RunnerApi.SdkFunctionSpec.newBuilder()
-            .setSpec(
-                FunctionSpec.newBuilder()
-                    .setUrn(CUSTOM_WINDOWFN_URN)
-                    .setParameter(
-                        Any.pack(
-                            BytesValue.newBuilder()
-                                .setValue(
-                                    ByteString.copyFrom(
-                                        SerializableUtils.serializeToByteArray(windowFn)))
-                                .build())))
-            .build();
-
-    RunnerApi.Coder windowCoderProto =
-        RunnerApi.Coder.newBuilder()
-            .setSpec(
-                SdkFunctionSpec.newBuilder()
-                    .setSpec(
-                        FunctionSpec.newBuilder()
-                            .setUrn(CUSTOM_CODER_URN)
-                            .setParameter(
-                                Any.pack(
-                                    BytesValue.newBuilder()
-                                        .setValue(
-                                            ByteString.copyFrom(
-                                                OBJECT_MAPPER.writeValueAsBytes(
-                                                    windowCoder.asCloudObject())))
-                                        .build()))))
-            .build();
-
-    return RunnerApi.MessageWithComponents.newBuilder()
-        .setSdkFunctionSpec(windowFnSpec)
-        .setComponents(Components.newBuilder().putCoders(windowCoderId, windowCoderProto))
-        .build();
-  }
-
-  /**
-   * Converts a {@link WindowingStrategy} into a {@link RunnerApi.MessageWithComponents} where
-   * {@link RunnerApi.MessageWithComponents#getWindowingStrategy()} ()} is a {@link
-   * RunnerApi.WindowingStrategy RunnerApi.WindowingStrategy (proto)} for the input {@link
-   * WindowingStrategy}.
-   */
-  public static RunnerApi.MessageWithComponents toProto(WindowingStrategy<?, ?> windowingStrategy)
-      throws IOException {
-
-    RunnerApi.MessageWithComponents windowFnWithComponents =
-        toProto(windowingStrategy.getWindowFn());
-
-    RunnerApi.WindowingStrategy.Builder windowingStrategyProto =
-        RunnerApi.WindowingStrategy.newBuilder()
-            .setOutputTime(toProto(windowingStrategy.getOutputTimeFn()))
-            .setAccumulationMode(toProto(windowingStrategy.getMode()))
-            .setClosingBehavior(toProto(windowingStrategy.getClosingBehavior()))
-            .setAllowedLateness(windowingStrategy.getAllowedLateness().getMillis())
-            .setTrigger(Triggers.toProto(windowingStrategy.getTrigger()))
-            .setWindowFn(windowFnWithComponents.getSdkFunctionSpec());
-
-    return RunnerApi.MessageWithComponents.newBuilder()
-        .setWindowingStrategy(windowingStrategyProto)
-        .setComponents(windowFnWithComponents.getComponents()).build();
-  }
-
-  /**
-   * Converts from a {@link RunnerApi.WindowingStrategy} accompanied by {@link RunnerApi.Components}
-   * to the SDK's {@link WindowingStrategy}.
-   */
-  public static WindowingStrategy<?, ?> fromProto(RunnerApi.MessageWithComponents proto)
-      throws InvalidProtocolBufferException {
-    switch (proto.getRootCase()) {
-      case WINDOWING_STRATEGY:
-        return fromProto(proto.getWindowingStrategy(), proto.getComponents());
-      default:
-        throw new IllegalArgumentException(
-            String.format(
-                "Expected a %s with components but received %s",
-                RunnerApi.WindowingStrategy.class.getCanonicalName(), proto));
-    }
-  }
-
-  /**
-   * Converts from {@link RunnerApi.WindowingStrategy} to the SDK's {@link WindowingStrategy} using
-   * the provided components to dereferences identifiers found in the proto.
-   */
-  public static WindowingStrategy<?, ?> fromProto(
-      RunnerApi.WindowingStrategy proto, RunnerApi.Components components)
-      throws InvalidProtocolBufferException {
-
-    SdkFunctionSpec windowFnSpec = proto.getWindowFn();
-
-    checkArgument(
-        windowFnSpec.getSpec().getUrn().equals(CUSTOM_WINDOWFN_URN),
-        "Only Java-serialized %s instances are supported, with URN %s. But found URN %s",
-        WindowFn.class.getSimpleName(),
-        CUSTOM_WINDOWFN_URN,
-        windowFnSpec.getSpec().getUrn());
-
-    Object deserializedWindowFn =
-        SerializableUtils.deserializeFromByteArray(
-            windowFnSpec.getSpec().getParameter().unpack(BytesValue.class).getValue().toByteArray(),
-            "WindowFn");
-
-    WindowFn<?, ?> windowFn = (WindowFn<?, ?>) deserializedWindowFn;
-    OutputTimeFn<?> outputTimeFn = OutputTimeFns.fromProto(proto.getOutputTime());
-    AccumulationMode accumulationMode = fromProto(proto.getAccumulationMode());
-    Trigger trigger = Triggers.fromProto(proto.getTrigger());
-    ClosingBehavior closingBehavior = fromProto(proto.getClosingBehavior());
-    Duration allowedLateness = Duration.millis(proto.getAllowedLateness());
-
-    return WindowingStrategy.of(windowFn)
-        .withAllowedLateness(allowedLateness)
-        .withMode(accumulationMode)
-        .withTrigger(trigger)
-        .withOutputTimeFn(outputTimeFn)
-        .withClosingBehavior(closingBehavior);
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/40b36686/sdks/java/core/src/test/java/org/apache/beam/sdk/util/WindowingStrategiesTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/WindowingStrategiesTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/WindowingStrategiesTest.java
deleted file mode 100644
index 5d3de51..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/WindowingStrategiesTest.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import static org.hamcrest.Matchers.equalTo;
-import static org.junit.Assert.assertThat;
-
-import com.google.auto.value.AutoValue;
-import com.google.common.collect.ImmutableList;
-import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
-import org.apache.beam.sdk.transforms.windowing.FixedWindows;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
-import org.apache.beam.sdk.transforms.windowing.Trigger;
-import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior;
-import org.apache.beam.sdk.transforms.windowing.WindowFn;
-import org.apache.beam.sdk.util.WindowingStrategy.AccumulationMode;
-import org.joda.time.Duration;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameter;
-import org.junit.runners.Parameterized.Parameters;
-
-/** Unit tests for {@link WindowingStrategy}. */
-@RunWith(Parameterized.class)
-public class WindowingStrategiesTest {
-
-  // Each spec activates tests of all subsets of its fields
-  @AutoValue
-  abstract static class ToProtoAndBackSpec {
-    abstract WindowingStrategy getWindowingStrategy();
-  }
-
-  private static ToProtoAndBackSpec toProtoAndBackSpec(WindowingStrategy windowingStrategy) {
-    return new AutoValue_WindowingStrategiesTest_ToProtoAndBackSpec(windowingStrategy);
-  }
-
-  private static final WindowFn<?, ?> REPRESENTATIVE_WINDOW_FN =
-      FixedWindows.of(Duration.millis(12));
-
-  private static final Trigger REPRESENTATIVE_TRIGGER = AfterWatermark.pastEndOfWindow();
-
-  @Parameters(name = "{index}: {0}")
-  public static Iterable<ToProtoAndBackSpec> data() {
-    return ImmutableList.of(
-        toProtoAndBackSpec(WindowingStrategy.globalDefault()),
-        toProtoAndBackSpec(
-            WindowingStrategy.of(REPRESENTATIVE_WINDOW_FN)
-                .withClosingBehavior(ClosingBehavior.FIRE_ALWAYS)
-                .withMode(AccumulationMode.ACCUMULATING_FIRED_PANES)
-                .withTrigger(REPRESENTATIVE_TRIGGER)
-                .withAllowedLateness(Duration.millis(71))
-                .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp())),
-        toProtoAndBackSpec(
-            WindowingStrategy.of(REPRESENTATIVE_WINDOW_FN)
-                .withClosingBehavior(ClosingBehavior.FIRE_IF_NON_EMPTY)
-                .withMode(AccumulationMode.DISCARDING_FIRED_PANES)
-                .withTrigger(REPRESENTATIVE_TRIGGER)
-                .withAllowedLateness(Duration.millis(93))
-                .withOutputTimeFn(OutputTimeFns.outputAtLatestInputTimestamp())));
-  }
-
-  @Parameter(0)
-  public ToProtoAndBackSpec toProtoAndBackSpec;
-
-  @Test
-  public void testToProtoAndBack() throws Exception {
-    WindowingStrategy<?, ?> windowingStrategy = toProtoAndBackSpec.getWindowingStrategy();
-    WindowingStrategy<?, ?> toProtoAndBackWindowingStrategy =
-        WindowingStrategies.fromProto(WindowingStrategies.toProto(windowingStrategy));
-
-    assertThat(
-        toProtoAndBackWindowingStrategy,
-        equalTo((WindowingStrategy) windowingStrategy.fixDefaults()));
-  }
-}


[2/2] beam git commit: This closes #2486

Posted by tg...@apache.org.
This closes #2486


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/0a0b1c80
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/0a0b1c80
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/0a0b1c80

Branch: refs/heads/master
Commit: 0a0b1c80c97949954add4b4357fe29409a08197d
Parents: 7fd9c65 40b3668
Author: Thomas Groh <tg...@google.com>
Authored: Mon Apr 10 19:24:52 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Mon Apr 10 19:24:52 2017 -0700

----------------------------------------------------------------------
 runners/core-construction-java/pom.xml          |  17 ++
 .../core/construction/WindowingStrategies.java  | 268 +++++++++++++++++++
 .../construction/WindowingStrategiesTest.java   |  92 +++++++
 .../beam/sdk/util/WindowingStrategies.java      | 267 ------------------
 .../beam/sdk/util/WindowingStrategiesTest.java  |  91 -------
 5 files changed, 377 insertions(+), 358 deletions(-)
----------------------------------------------------------------------