You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2017/04/12 23:08:51 UTC

[1/2] beam git commit: Move Triggers from sdk-core to runners-core-construction

Repository: beam
Updated Branches:
  refs/heads/master 8479094c2 -> dc672f420


Move Triggers from sdk-core to runners-core-construction

Converting to/from the Runner API is not part of the SDK API surface.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/facdc108
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/facdc108
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/facdc108

Branch: refs/heads/master
Commit: facdc108bbd7bd00a4e7d7aacf447e77785529a0
Parents: 8479094
Author: Dan Halperin <dh...@google.com>
Authored: Wed Apr 12 10:00:45 2017 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Wed Apr 12 16:08:32 2017 -0700

----------------------------------------------------------------------
 .../runners/core/construction/Triggers.java     | 336 +++++++++++++++++++
 .../core/construction/WindowingStrategies.java  |   1 -
 .../runners/core/construction/TriggersTest.java | 111 ++++++
 runners/core-java/pom.xml                       |   5 +
 .../GroupAlsoByWindowViaOutputBufferDoFn.java   |   2 +-
 .../core/GroupAlsoByWindowViaWindowSetDoFn.java |   2 +-
 .../GroupAlsoByWindowViaWindowSetNewDoFn.java   |   2 +-
 .../beam/runners/core/ReduceFnTester.java       |   2 +-
 .../GroupAlsoByWindowEvaluatorFactory.java      |   2 +-
 .../SparkGroupAlsoByWindowViaWindowSet.java     |   3 +-
 ...SparkGroupAlsoByWindowViaOutputBufferFn.java |   2 +-
 .../beam/sdk/transforms/windowing/Triggers.java |   1 +
 .../sdk/transforms/windowing/TriggersTest.java  | 100 ------
 13 files changed, 460 insertions(+), 109 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/facdc108/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Triggers.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Triggers.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Triggers.java
new file mode 100644
index 0000000..81f738d
--- /dev/null
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Triggers.java
@@ -0,0 +1,336 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core.construction;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
+import java.io.Serializable;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.List;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi;
+import org.apache.beam.sdk.transforms.windowing.AfterAll;
+import org.apache.beam.sdk.transforms.windowing.AfterEach;
+import org.apache.beam.sdk.transforms.windowing.AfterFirst;
+import org.apache.beam.sdk.transforms.windowing.AfterPane;
+import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
+import org.apache.beam.sdk.transforms.windowing.AfterSynchronizedProcessingTime;
+import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
+import org.apache.beam.sdk.transforms.windowing.AfterWatermark.AfterWatermarkEarlyAndLate;
+import org.apache.beam.sdk.transforms.windowing.AfterWatermark.FromEndOfWindow;
+import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
+import org.apache.beam.sdk.transforms.windowing.Never;
+import org.apache.beam.sdk.transforms.windowing.Never.NeverTrigger;
+import org.apache.beam.sdk.transforms.windowing.OrFinallyTrigger;
+import org.apache.beam.sdk.transforms.windowing.Repeatedly;
+import org.apache.beam.sdk.transforms.windowing.TimestampTransform;
+import org.apache.beam.sdk.transforms.windowing.Trigger;
+import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger;
+import org.apache.beam.sdk.util.ReshuffleTrigger;
+import org.apache.beam.sdk.util.TimeDomain;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+/** Utilities for working with {@link Trigger Triggers}. */
+@Experimental(Experimental.Kind.TRIGGER)
+public class Triggers implements Serializable {
+
+  @VisibleForTesting static final ProtoConverter CONVERTER = new ProtoConverter();
+
+  public static RunnerApi.Trigger toProto(Trigger trigger) {
+    return CONVERTER.convertTrigger(trigger);
+  }
+
+  @VisibleForTesting
+  static class ProtoConverter {
+
+    public RunnerApi.Trigger convertTrigger(Trigger trigger) {
+      Method evaluationMethod = getEvaluationMethod(trigger.getClass());
+      return tryConvert(evaluationMethod, trigger);
+    }
+
+    /** Invokes the reflectively selected converter, unwrapping any exception it throws. */
+    private RunnerApi.Trigger tryConvert(Method evaluationMethod, Trigger trigger) {
+      try {
+        return (RunnerApi.Trigger) evaluationMethod.invoke(this, trigger);
+      } catch (InvocationTargetException exc) {
+        // Rethrow the converter's own failure rather than the reflection wrapper.
+        Throwable cause = exc.getCause();
+        throw cause instanceof RuntimeException
+            ? (RuntimeException) cause : new RuntimeException(cause);
+      } catch (IllegalAccessException exc) {
+        throw new IllegalStateException(
+            String.format("Internal error: could not invoke %s", evaluationMethod), exc);
+      }
+    }
+
+    private Method getEvaluationMethod(Class<?> clazz) {
+      try {
+        return getClass().getDeclaredMethod("convertSpecific", clazz);
+      } catch (NoSuchMethodException exc) {
+        throw new IllegalArgumentException(
+            String.format(
+                "Cannot translate trigger class %s to a runner-API proto.",
+                clazz.getCanonicalName()),
+            exc);
+      }
+    }
+
+    private RunnerApi.Trigger convertSpecific(DefaultTrigger v) {
+      return RunnerApi.Trigger.newBuilder()
+          .setDefault(RunnerApi.Trigger.Default.getDefaultInstance())
+          .build();
+    }
+
+    private RunnerApi.Trigger convertSpecific(FromEndOfWindow v) {
+      return RunnerApi.Trigger.newBuilder()
+          .setAfterEndOfWindow(RunnerApi.Trigger.AfterEndOfWindow.newBuilder())
+          .build();
+    }
+
+    private RunnerApi.Trigger convertSpecific(NeverTrigger v) {
+      return RunnerApi.Trigger.newBuilder()
+          .setNever(RunnerApi.Trigger.Never.getDefaultInstance())
+          .build();
+    }
+
+    private RunnerApi.Trigger convertSpecific(ReshuffleTrigger v) {
+      return RunnerApi.Trigger.newBuilder()
+          .setAlways(RunnerApi.Trigger.Always.getDefaultInstance())
+          .build();
+    }
+
+    private RunnerApi.Trigger convertSpecific(AfterSynchronizedProcessingTime v) {
+      return RunnerApi.Trigger.newBuilder()
+          .setAfterSynchronizedProcessingTime(
+              RunnerApi.Trigger.AfterSynchronizedProcessingTime.getDefaultInstance())
+          .build();
+    }
+
+    private RunnerApi.TimeDomain convertTimeDomain(TimeDomain timeDomain) {
+      switch (timeDomain) {
+        case EVENT_TIME:
+          return RunnerApi.TimeDomain.EVENT_TIME;
+        case PROCESSING_TIME:
+          return RunnerApi.TimeDomain.PROCESSING_TIME;
+        case SYNCHRONIZED_PROCESSING_TIME:
+          return RunnerApi.TimeDomain.SYNCHRONIZED_PROCESSING_TIME;
+        default:
+          throw new IllegalArgumentException(String.format("Unknown time domain: %s", timeDomain));
+      }
+    }
+
+    private RunnerApi.Trigger convertSpecific(AfterFirst v) {
+      RunnerApi.Trigger.AfterAny.Builder builder = RunnerApi.Trigger.AfterAny.newBuilder();
+
+      for (Trigger subtrigger : v.subTriggers()) {
+        builder.addSubtriggers(toProto(subtrigger));
+      }
+
+      return RunnerApi.Trigger.newBuilder().setAfterAny(builder).build();
+    }
+
+    private RunnerApi.Trigger convertSpecific(AfterAll v) {
+      RunnerApi.Trigger.AfterAll.Builder builder = RunnerApi.Trigger.AfterAll.newBuilder();
+
+      for (Trigger subtrigger : v.subTriggers()) {
+        builder.addSubtriggers(toProto(subtrigger));
+      }
+
+      return RunnerApi.Trigger.newBuilder().setAfterAll(builder).build();
+    }
+
+    private RunnerApi.Trigger convertSpecific(AfterPane v) {
+      return RunnerApi.Trigger.newBuilder()
+          .setElementCount(
+              RunnerApi.Trigger.ElementCount.newBuilder().setElementCount(v.getElementCount()))
+          .build();
+    }
+
+    private RunnerApi.Trigger convertSpecific(AfterWatermarkEarlyAndLate v) {
+      RunnerApi.Trigger.AfterEndOfWindow.Builder builder =
+          RunnerApi.Trigger.AfterEndOfWindow.newBuilder();
+
+      builder.setEarlyFirings(toProto(v.getEarlyTrigger()));
+      if (v.getLateTrigger() != null) {
+        builder.setLateFirings(toProto(v.getLateTrigger()));
+      }
+
+      return RunnerApi.Trigger.newBuilder().setAfterEndOfWindow(builder).build();
+    }
+
+    private RunnerApi.Trigger convertSpecific(AfterEach v) {
+      RunnerApi.Trigger.AfterEach.Builder builder = RunnerApi.Trigger.AfterEach.newBuilder();
+
+      for (Trigger subtrigger : v.subTriggers()) {
+        builder.addSubtriggers(toProto(subtrigger));
+      }
+
+      return RunnerApi.Trigger.newBuilder().setAfterEach(builder).build();
+    }
+
+    private RunnerApi.Trigger convertSpecific(Repeatedly v) {
+      return RunnerApi.Trigger.newBuilder()
+          .setRepeat(
+              RunnerApi.Trigger.Repeat.newBuilder()
+                  .setSubtrigger(toProto(v.getRepeatedTrigger())))
+          .build();
+    }
+
+    private RunnerApi.Trigger convertSpecific(OrFinallyTrigger v) {
+      return RunnerApi.Trigger.newBuilder()
+          .setOrFinally(
+              RunnerApi.Trigger.OrFinally.newBuilder()
+                  .setMain(toProto(v.getMainTrigger()))
+                  .setFinally(toProto(v.getUntilTrigger())))
+          .build();
+    }
+
+    private RunnerApi.Trigger convertSpecific(AfterProcessingTime v) {
+      RunnerApi.Trigger.AfterProcessingTime.Builder builder =
+          RunnerApi.Trigger.AfterProcessingTime.newBuilder();
+
+      for (TimestampTransform transform : v.getTimestampTransforms()) {
+        builder.addTimestampTransforms(convertTimestampTransform(transform));
+      }
+
+      return RunnerApi.Trigger.newBuilder().setAfterProcessingTime(builder).build();
+    }
+
+    private RunnerApi.TimestampTransform convertTimestampTransform(TimestampTransform transform) {
+      if (transform instanceof TimestampTransform.Delay) {
+        return RunnerApi.TimestampTransform.newBuilder()
+            .setDelay(
+                RunnerApi.TimestampTransform.Delay.newBuilder()
+                    .setDelayMillis(((TimestampTransform.Delay) transform).getDelay().getMillis()))
+            .build();
+      } else if (transform instanceof TimestampTransform.AlignTo) {
+        TimestampTransform.AlignTo alignTo = (TimestampTransform.AlignTo) transform;
+        return RunnerApi.TimestampTransform.newBuilder()
+            .setAlignTo(
+                RunnerApi.TimestampTransform.AlignTo.newBuilder()
+                    .setPeriod(alignTo.getPeriod().getMillis())
+                    .setOffset(alignTo.getOffset().getMillis()))
+            .build();
+
+      } else {
+        throw new IllegalArgumentException(
+            String.format("Unknown %s: %s", TimestampTransform.class.getSimpleName(), transform));
+      }
+    }
+  }
+
+  public static Trigger fromProto(RunnerApi.Trigger triggerProto) {
+    switch (triggerProto.getTriggerCase()) {
+      case AFTER_ALL:
+        return AfterAll.of(protosToTriggers(triggerProto.getAfterAll().getSubtriggersList()));
+      case AFTER_ANY:
+        return AfterFirst.of(protosToTriggers(triggerProto.getAfterAny().getSubtriggersList()));
+      case AFTER_EACH:
+        return AfterEach.inOrder(
+            protosToTriggers(triggerProto.getAfterEach().getSubtriggersList()));
+      case AFTER_END_OF_WINDOW:
+        RunnerApi.Trigger.AfterEndOfWindow eowProto = triggerProto.getAfterEndOfWindow();
+
+        if (!eowProto.hasEarlyFirings() && !eowProto.hasLateFirings()) {
+          return AfterWatermark.pastEndOfWindow();
+        }
+
+        // It either has early or late firings or both; our typing in Java makes this a smidge
+        // annoying
+        if (triggerProto.getAfterEndOfWindow().hasEarlyFirings()) {
+          AfterWatermarkEarlyAndLate trigger =
+              AfterWatermark.pastEndOfWindow()
+                  .withEarlyFirings(
+                      (OnceTrigger)
+                          fromProto(triggerProto.getAfterEndOfWindow().getEarlyFirings()));
+
+          if (triggerProto.getAfterEndOfWindow().hasLateFirings()) {
+            trigger =
+                trigger.withLateFirings(
+                    (OnceTrigger)
+                        fromProto(triggerProto.getAfterEndOfWindow().getLateFirings()));
+          }
+          return trigger;
+        } else {
+          // only late firings, so return directly
+          return AfterWatermark.pastEndOfWindow()
+              .withLateFirings((OnceTrigger) fromProto(eowProto.getLateFirings()));
+        }
+      case AFTER_PROCESSING_TIME:
+        AfterProcessingTime trigger = AfterProcessingTime.pastFirstElementInPane();
+        for (RunnerApi.TimestampTransform transform :
+            triggerProto.getAfterProcessingTime().getTimestampTransformsList()) {
+          switch (transform.getTimestampTransformCase()) {
+            case ALIGN_TO:
+              trigger =
+                  trigger.alignedTo(
+                      Duration.millis(transform.getAlignTo().getPeriod()),
+                      new Instant(transform.getAlignTo().getOffset()));
+              break;
+            case DELAY:
+              trigger = trigger.plusDelayOf(Duration.millis(transform.getDelay().getDelayMillis()));
+              break;
+            case TIMESTAMPTRANSFORM_NOT_SET:
+              throw new IllegalArgumentException(
+                  String.format(
+                      "Required field 'timestamp_transform' not set in %s", transform));
+            default:
+              throw new IllegalArgumentException(
+                  String.format(
+                      "Unknown timestamp transform case: %s",
+                      transform.getTimestampTransformCase()));
+          }
+        }
+        return trigger;
+      case AFTER_SYNCHRONIZED_PROCESSING_TIME:
+        return AfterSynchronizedProcessingTime.ofFirstElement();
+      case ALWAYS:
+        return new ReshuffleTrigger();
+      case ELEMENT_COUNT:
+        return AfterPane.elementCountAtLeast(triggerProto.getElementCount().getElementCount());
+      case NEVER:
+        return Never.ever();
+      case OR_FINALLY:
+        return fromProto(triggerProto.getOrFinally().getMain())
+            .orFinally((OnceTrigger) fromProto(triggerProto.getOrFinally().getFinally()));
+      case REPEAT:
+        return Repeatedly.forever(fromProto(triggerProto.getRepeat().getSubtrigger()));
+      case DEFAULT:
+        return DefaultTrigger.of();
+      case TRIGGER_NOT_SET:
+        throw new IllegalArgumentException(
+            String.format("Required field 'trigger' not set in %s", triggerProto));
+      default:
+        throw new IllegalArgumentException(
+            String.format("Unknown trigger case: %s", triggerProto.getTriggerCase()));
+    }
+  }
+
+  private static List<Trigger> protosToTriggers(List<RunnerApi.Trigger> triggers) {
+    List<Trigger> result = Lists.newArrayList();
+    for (RunnerApi.Trigger trigger : triggers) {
+      result.add(fromProto(trigger));
+    }
+    return result;
+  }
+
+  // Do not instantiate
+  private Triggers() {}
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/facdc108/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategies.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategies.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategies.java
index 6d721b0..3d7deef 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategies.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategies.java
@@ -32,7 +32,6 @@ import org.apache.beam.sdk.common.runner.v1.RunnerApi.SdkFunctionSpec;
 import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
 import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
 import org.apache.beam.sdk.transforms.windowing.Trigger;
-import org.apache.beam.sdk.transforms.windowing.Triggers;
 import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.apache.beam.sdk.util.SerializableUtils;

http://git-wip-us.apache.org/repos/asf/beam/blob/facdc108/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/TriggersTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/TriggersTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/TriggersTest.java
new file mode 100644
index 0000000..cf9d40c
--- /dev/null
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/TriggersTest.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core.construction;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+import com.google.auto.value.AutoValue;
+import com.google.common.collect.ImmutableList;
+import org.apache.beam.sdk.transforms.windowing.AfterAll;
+import org.apache.beam.sdk.transforms.windowing.AfterEach;
+import org.apache.beam.sdk.transforms.windowing.AfterFirst;
+import org.apache.beam.sdk.transforms.windowing.AfterPane;
+import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
+import org.apache.beam.sdk.transforms.windowing.AfterSynchronizedProcessingTime;
+import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
+import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
+import org.apache.beam.sdk.transforms.windowing.Never;
+import org.apache.beam.sdk.transforms.windowing.Repeatedly;
+import org.apache.beam.sdk.transforms.windowing.Trigger;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
+
+/** Tests for utilities in {@link Triggers}. */
+@RunWith(Parameterized.class)
+public class TriggersTest {
+
+  @AutoValue
+  abstract static class ToProtoAndBackSpec {
+    abstract Trigger getTrigger();
+  }
+
+  private static ToProtoAndBackSpec toProtoAndBackSpec(Trigger trigger) {
+    return new AutoValue_TriggersTest_ToProtoAndBackSpec(trigger);
+  }
+
+  @Parameters(name = "{index}: {0}")
+  public static Iterable<ToProtoAndBackSpec> data() {
+    return ImmutableList.of(
+        // Atomic triggers
+        toProtoAndBackSpec(AfterWatermark.pastEndOfWindow()),
+        toProtoAndBackSpec(AfterPane.elementCountAtLeast(73)),
+        toProtoAndBackSpec(AfterSynchronizedProcessingTime.ofFirstElement()),
+        toProtoAndBackSpec(Never.ever()),
+        toProtoAndBackSpec(DefaultTrigger.of()),
+        toProtoAndBackSpec(AfterProcessingTime.pastFirstElementInPane()),
+        toProtoAndBackSpec(
+            AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.millis(23))),
+        toProtoAndBackSpec(
+            AfterProcessingTime.pastFirstElementInPane()
+                .alignedTo(Duration.millis(5), new Instant(27))),
+        toProtoAndBackSpec(
+            AfterProcessingTime.pastFirstElementInPane()
+                .plusDelayOf(Duration.standardSeconds(3))
+                .alignedTo(Duration.millis(5), new Instant(27))
+                .plusDelayOf(Duration.millis(13))),
+
+        // Composite triggers
+
+        toProtoAndBackSpec(
+            AfterAll.of(AfterPane.elementCountAtLeast(79), AfterWatermark.pastEndOfWindow())),
+        toProtoAndBackSpec(
+            AfterEach.inOrder(AfterPane.elementCountAtLeast(79), AfterPane.elementCountAtLeast(3))),
+        toProtoAndBackSpec(
+            AfterFirst.of(AfterWatermark.pastEndOfWindow(), AfterPane.elementCountAtLeast(3))),
+        toProtoAndBackSpec(
+            AfterWatermark.pastEndOfWindow().withEarlyFirings(AfterPane.elementCountAtLeast(3))),
+        toProtoAndBackSpec(
+            AfterWatermark.pastEndOfWindow().withLateFirings(AfterPane.elementCountAtLeast(3))),
+        toProtoAndBackSpec(
+            AfterWatermark.pastEndOfWindow()
+                .withEarlyFirings(
+                    AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.millis(42)))
+                .withLateFirings(AfterPane.elementCountAtLeast(3))),
+        toProtoAndBackSpec(Repeatedly.forever(AfterWatermark.pastEndOfWindow())),
+        toProtoAndBackSpec(
+            Repeatedly.forever(AfterPane.elementCountAtLeast(1))
+                .orFinally(AfterWatermark.pastEndOfWindow())));
+  }
+
+  @Parameter(0)
+  public ToProtoAndBackSpec toProtoAndBackSpec;
+
+  @Test
+  public void testToProtoAndBack() throws Exception {
+    Trigger trigger = toProtoAndBackSpec.getTrigger();
+    Trigger toProtoAndBackTrigger = Triggers.fromProto(Triggers.toProto(trigger));
+
+    assertThat(toProtoAndBackTrigger, equalTo(trigger));
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/facdc108/runners/core-java/pom.xml
----------------------------------------------------------------------
diff --git a/runners/core-java/pom.xml b/runners/core-java/pom.xml
index affd1a9..f066abf 100644
--- a/runners/core-java/pom.xml
+++ b/runners/core-java/pom.xml
@@ -77,6 +77,11 @@
       <artifactId>beam-sdks-common-runner-api</artifactId>
     </dependency>
 
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-runners-core-construction-java</artifactId>
+    </dependency>
+
     <!-- build dependencies -->
 
     <dependency>

http://git-wip-us.apache.org/repos/asf/beam/blob/facdc108/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaOutputBufferDoFn.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaOutputBufferDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaOutputBufferDoFn.java
index e3ce1ef..5508b2e 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaOutputBufferDoFn.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaOutputBufferDoFn.java
@@ -19,10 +19,10 @@ package org.apache.beam.runners.core;
 
 import java.util.ArrayList;
 import java.util.List;
+import org.apache.beam.runners.core.construction.Triggers;
 import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine;
 import org.apache.beam.runners.core.triggers.TriggerStateMachines;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.Triggers;
 import org.apache.beam.sdk.util.SystemDoFnInternal;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.joda.time.Instant;

http://git-wip-us.apache.org/repos/asf/beam/blob/facdc108/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
index 8dc1502..bf48df1 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
@@ -17,12 +17,12 @@
  */
 package org.apache.beam.runners.core;
 
+import org.apache.beam.runners.core.construction.Triggers;
 import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine;
 import org.apache.beam.runners.core.triggers.TriggerStateMachines;
 import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.Triggers;
 import org.apache.beam.sdk.util.SystemDoFnInternal;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.values.KV;

http://git-wip-us.apache.org/repos/asf/beam/blob/facdc108/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java
index 444f8fe..8fff0e4 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java
@@ -18,6 +18,7 @@
 package org.apache.beam.runners.core;
 
 import java.util.Collection;
+import org.apache.beam.runners.core.construction.Triggers;
 import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine;
 import org.apache.beam.runners.core.triggers.TriggerStateMachines;
 import org.apache.beam.sdk.transforms.Aggregator;
@@ -25,7 +26,6 @@ import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.transforms.windowing.Triggers;
 import org.apache.beam.sdk.util.SideInputReader;
 import org.apache.beam.sdk.util.SystemDoFnInternal;
 import org.apache.beam.sdk.util.WindowedValue;

http://git-wip-us.apache.org/repos/asf/beam/blob/facdc108/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
index d18a1c3..512420f 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
@@ -38,6 +38,7 @@ import java.util.Map;
 import java.util.Set;
 import javax.annotation.Nullable;
 import org.apache.beam.runners.core.TimerInternals.TimerData;
+import org.apache.beam.runners.core.construction.Triggers;
 import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine;
 import org.apache.beam.runners.core.triggers.TriggerStateMachine;
 import org.apache.beam.runners.core.triggers.TriggerStateMachineRunner;
@@ -60,7 +61,6 @@ import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.transforms.windowing.Trigger;
-import org.apache.beam.sdk.transforms.windowing.Triggers;
 import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.apache.beam.sdk.util.AppliedCombineFn;

http://git-wip-us.apache.org/repos/asf/beam/blob/facdc108/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
index f7fd4cf..b4ca998 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
@@ -34,6 +34,7 @@ import org.apache.beam.runners.core.ReduceFnRunner;
 import org.apache.beam.runners.core.SystemReduceFn;
 import org.apache.beam.runners.core.TimerInternals;
 import org.apache.beam.runners.core.UnsupportedSideInputReader;
+import org.apache.beam.runners.core.construction.Triggers;
 import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine;
 import org.apache.beam.runners.core.triggers.TriggerStateMachines;
 import org.apache.beam.runners.direct.DirectExecutionContext.DirectStepContext;
@@ -47,7 +48,6 @@ import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.transforms.windowing.Triggers;
 import org.apache.beam.sdk.util.WindowTracing;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy;

http://git-wip-us.apache.org/repos/asf/beam/blob/facdc108/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java
index 1f2fcb6..0e74fa2 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java
@@ -30,6 +30,7 @@ import org.apache.beam.runners.core.ReduceFnRunner;
 import org.apache.beam.runners.core.SystemReduceFn;
 import org.apache.beam.runners.core.TimerInternals;
 import org.apache.beam.runners.core.UnsupportedSideInputReader;
+import org.apache.beam.runners.core.construction.Triggers;
 import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine;
 import org.apache.beam.runners.core.triggers.TriggerStateMachines;
 import org.apache.beam.runners.spark.SparkPipelineOptions;
@@ -47,7 +48,6 @@ import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.transforms.windowing.Triggers;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
 import org.apache.beam.sdk.util.WindowingStrategy;
@@ -67,7 +67,6 @@ import org.apache.spark.streaming.dstream.PairDStreamFunctions;
 import org.joda.time.Instant;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import scala.Function1;
 import scala.Option;
 import scala.Tuple2;

http://git-wip-us.apache.org/repos/asf/beam/blob/facdc108/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowViaOutputBufferFn.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowViaOutputBufferFn.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowViaOutputBufferFn.java
index 2b16c60..d19c4a9 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowViaOutputBufferFn.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowViaOutputBufferFn.java
@@ -31,6 +31,7 @@ import org.apache.beam.runners.core.StateInternalsFactory;
 import org.apache.beam.runners.core.SystemReduceFn;
 import org.apache.beam.runners.core.TimerInternals;
 import org.apache.beam.runners.core.UnsupportedSideInputReader;
+import org.apache.beam.runners.core.construction.Triggers;
 import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine;
 import org.apache.beam.runners.core.triggers.TriggerStateMachines;
 import org.apache.beam.runners.spark.aggregators.NamedAggregators;
@@ -38,7 +39,6 @@ import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.transforms.windowing.Triggers;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.values.KV;

http://git-wip-us.apache.org/repos/asf/beam/blob/facdc108/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Triggers.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Triggers.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Triggers.java
index 591af37..47f05e4 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Triggers.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Triggers.java
@@ -35,6 +35,7 @@ import org.joda.time.Instant;
 
 /** Utilities for working with {@link Triggers Triggers}. */
 @Experimental(Experimental.Kind.TRIGGER)
+@Deprecated
 public class Triggers implements Serializable {
 
   @VisibleForTesting static final ProtoConverter CONVERTER = new ProtoConverter();

http://git-wip-us.apache.org/repos/asf/beam/blob/facdc108/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/TriggersTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/TriggersTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/TriggersTest.java
deleted file mode 100644
index 0ac5966..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/TriggersTest.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.transforms.windowing;
-
-import static org.hamcrest.Matchers.equalTo;
-import static org.junit.Assert.assertThat;
-
-import com.google.auto.value.AutoValue;
-import com.google.common.collect.ImmutableList;
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameter;
-import org.junit.runners.Parameterized.Parameters;
-
-/** Tests for utilities in {@link Triggers}. */
-@RunWith(Parameterized.class)
-public class TriggersTest {
-
-  @AutoValue
-  abstract static class ToProtoAndBackSpec {
-    abstract Trigger getTrigger();
-  }
-
-  private static ToProtoAndBackSpec toProtoAndBackSpec(Trigger trigger) {
-    return new AutoValue_TriggersTest_ToProtoAndBackSpec(trigger);
-  }
-
-  @Parameters(name = "{index}: {0}")
-  public static Iterable<ToProtoAndBackSpec> data() {
-    return ImmutableList.of(
-        // Atomic triggers
-        toProtoAndBackSpec(AfterWatermark.pastEndOfWindow()),
-        toProtoAndBackSpec(AfterPane.elementCountAtLeast(73)),
-        toProtoAndBackSpec(AfterSynchronizedProcessingTime.ofFirstElement()),
-        toProtoAndBackSpec(Never.ever()),
-        toProtoAndBackSpec(DefaultTrigger.of()),
-        toProtoAndBackSpec(AfterProcessingTime.pastFirstElementInPane()),
-        toProtoAndBackSpec(
-            AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.millis(23))),
-        toProtoAndBackSpec(
-            AfterProcessingTime.pastFirstElementInPane()
-                .alignedTo(Duration.millis(5), new Instant(27))),
-        toProtoAndBackSpec(
-            AfterProcessingTime.pastFirstElementInPane()
-                .plusDelayOf(Duration.standardSeconds(3))
-                .alignedTo(Duration.millis(5), new Instant(27))
-                .plusDelayOf(Duration.millis(13))),
-
-        // Composite triggers
-
-        toProtoAndBackSpec(
-            AfterAll.of(AfterPane.elementCountAtLeast(79), AfterWatermark.pastEndOfWindow())),
-        toProtoAndBackSpec(
-            AfterEach.inOrder(AfterPane.elementCountAtLeast(79), AfterPane.elementCountAtLeast(3))),
-        toProtoAndBackSpec(
-            AfterFirst.of(AfterWatermark.pastEndOfWindow(), AfterPane.elementCountAtLeast(3))),
-        toProtoAndBackSpec(
-            AfterWatermark.pastEndOfWindow().withEarlyFirings(AfterPane.elementCountAtLeast(3))),
-        toProtoAndBackSpec(
-            AfterWatermark.pastEndOfWindow().withLateFirings(AfterPane.elementCountAtLeast(3))),
-        toProtoAndBackSpec(
-            AfterWatermark.pastEndOfWindow()
-                .withEarlyFirings(
-                    AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.millis(42)))
-                .withLateFirings(AfterPane.elementCountAtLeast(3))),
-        toProtoAndBackSpec(Repeatedly.forever(AfterWatermark.pastEndOfWindow())),
-        toProtoAndBackSpec(
-            Repeatedly.forever(AfterPane.elementCountAtLeast(1))
-                .orFinally(AfterWatermark.pastEndOfWindow())));
-  }
-
-  @Parameter(0)
-  public ToProtoAndBackSpec toProtoAndBackSpec;
-
-  @Test
-  public void testToProtoAndBack() throws Exception {
-    Trigger trigger = toProtoAndBackSpec.getTrigger();
-    Trigger toProtoAndBackTrigger = Triggers.fromProto(Triggers.toProto(trigger));
-
-    assertThat(toProtoAndBackTrigger, equalTo(trigger));
-  }
-}


[2/2] beam git commit: This closes #2513

Posted by dh...@apache.org.
This closes #2513


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/dc672f42
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/dc672f42
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/dc672f42

Branch: refs/heads/master
Commit: dc672f42083c375872d87030523545866cea4f55
Parents: 8479094 facdc10
Author: Dan Halperin <dh...@google.com>
Authored: Wed Apr 12 16:08:35 2017 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Wed Apr 12 16:08:35 2017 -0700

----------------------------------------------------------------------
 .../runners/core/construction/Triggers.java     | 336 +++++++++++++++++++
 .../core/construction/WindowingStrategies.java  |   1 -
 .../runners/core/construction/TriggersTest.java | 111 ++++++
 runners/core-java/pom.xml                       |   5 +
 .../GroupAlsoByWindowViaOutputBufferDoFn.java   |   2 +-
 .../core/GroupAlsoByWindowViaWindowSetDoFn.java |   2 +-
 .../GroupAlsoByWindowViaWindowSetNewDoFn.java   |   2 +-
 .../beam/runners/core/ReduceFnTester.java       |   2 +-
 .../GroupAlsoByWindowEvaluatorFactory.java      |   2 +-
 .../SparkGroupAlsoByWindowViaWindowSet.java     |   3 +-
 ...SparkGroupAlsoByWindowViaOutputBufferFn.java |   2 +-
 .../beam/sdk/transforms/windowing/Triggers.java |   1 +
 .../sdk/transforms/windowing/TriggersTest.java  | 100 ------
 13 files changed, 460 insertions(+), 109 deletions(-)
----------------------------------------------------------------------