You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2017/04/12 23:08:51 UTC
[1/2] beam git commit: Move Triggers from sdk-core to
runners-core-construction
Repository: beam
Updated Branches:
refs/heads/master 8479094c2 -> dc672f420
Move Triggers from sdk-core to runners-core-construction
Converting to/from Runner API is not the SDK API surface.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/facdc108
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/facdc108
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/facdc108
Branch: refs/heads/master
Commit: facdc108bbd7bd00a4e7d7aacf447e77785529a0
Parents: 8479094
Author: Dan Halperin <dh...@google.com>
Authored: Wed Apr 12 10:00:45 2017 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Wed Apr 12 16:08:32 2017 -0700
----------------------------------------------------------------------
.../runners/core/construction/Triggers.java | 336 +++++++++++++++++++
.../core/construction/WindowingStrategies.java | 1 -
.../runners/core/construction/TriggersTest.java | 111 ++++++
runners/core-java/pom.xml | 5 +
.../GroupAlsoByWindowViaOutputBufferDoFn.java | 2 +-
.../core/GroupAlsoByWindowViaWindowSetDoFn.java | 2 +-
.../GroupAlsoByWindowViaWindowSetNewDoFn.java | 2 +-
.../beam/runners/core/ReduceFnTester.java | 2 +-
.../GroupAlsoByWindowEvaluatorFactory.java | 2 +-
.../SparkGroupAlsoByWindowViaWindowSet.java | 3 +-
...SparkGroupAlsoByWindowViaOutputBufferFn.java | 2 +-
.../beam/sdk/transforms/windowing/Triggers.java | 1 +
.../sdk/transforms/windowing/TriggersTest.java | 100 ------
13 files changed, 460 insertions(+), 109 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/facdc108/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Triggers.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Triggers.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Triggers.java
new file mode 100644
index 0000000..81f738d
--- /dev/null
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/Triggers.java
@@ -0,0 +1,336 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core.construction;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
+import java.io.Serializable;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.List;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi;
+import org.apache.beam.sdk.transforms.windowing.AfterAll;
+import org.apache.beam.sdk.transforms.windowing.AfterEach;
+import org.apache.beam.sdk.transforms.windowing.AfterFirst;
+import org.apache.beam.sdk.transforms.windowing.AfterPane;
+import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
+import org.apache.beam.sdk.transforms.windowing.AfterSynchronizedProcessingTime;
+import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
+import org.apache.beam.sdk.transforms.windowing.AfterWatermark.AfterWatermarkEarlyAndLate;
+import org.apache.beam.sdk.transforms.windowing.AfterWatermark.FromEndOfWindow;
+import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
+import org.apache.beam.sdk.transforms.windowing.Never;
+import org.apache.beam.sdk.transforms.windowing.Never.NeverTrigger;
+import org.apache.beam.sdk.transforms.windowing.OrFinallyTrigger;
+import org.apache.beam.sdk.transforms.windowing.Repeatedly;
+import org.apache.beam.sdk.transforms.windowing.TimestampTransform;
+import org.apache.beam.sdk.transforms.windowing.Trigger;
+import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger;
+import org.apache.beam.sdk.util.ReshuffleTrigger;
+import org.apache.beam.sdk.util.TimeDomain;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+/** Utilities for working with {@link Trigger Triggers}. */
+@Experimental(Experimental.Kind.TRIGGER)
+public class Triggers implements Serializable {
+
+ @VisibleForTesting static final ProtoConverter CONVERTER = new ProtoConverter();
+
+ public static RunnerApi.Trigger toProto(Trigger trigger) {
+ return CONVERTER.convertTrigger(trigger);
+ }
+
+ @VisibleForTesting
+ static class ProtoConverter {
+
+ /**
+  * Converts {@code trigger} by reflectively dispatching to the {@code convertSpecific} overload
+  * declared for the trigger's runtime class.
+  */
+ public RunnerApi.Trigger convertTrigger(Trigger trigger) {
+   return tryConvert(getEvaluationMethod(trigger.getClass()), trigger);
+ }
+
+ /**
+  * Invokes {@code evaluationMethod} on this converter with {@code trigger} as its sole argument.
+  *
+  * <p>An exception thrown by the invoked method is unwrapped: a {@link RuntimeException} is
+  * rethrown as-is, anything else is wrapped in a new {@link RuntimeException}.
+  *
+  * @throws IllegalStateException if the method cannot be accessed; the underlying
+  *     {@link IllegalAccessException} is attached as the cause
+  */
+ private RunnerApi.Trigger tryConvert(Method evaluationMethod, Trigger trigger) {
+   try {
+     return (RunnerApi.Trigger) evaluationMethod.invoke(this, trigger);
+   } catch (InvocationTargetException exc) {
+     if (exc.getCause() instanceof RuntimeException) {
+       throw (RuntimeException) exc.getCause();
+     } else {
+       throw new RuntimeException(exc.getCause());
+     }
+   } catch (IllegalAccessException exc) {
+     // Keep the original exception as the cause so the failure remains diagnosable.
+     throw new IllegalStateException(
+         String.format("Internal error: could not invoke %s", evaluationMethod), exc);
+   }
+ }
+
+ /**
+  * Finds the {@code convertSpecific} method declared on this converter's class whose single
+  * parameter type is exactly {@code clazz}.
+  *
+  * @throws IllegalArgumentException if no such overload is declared
+  */
+ private Method getEvaluationMethod(Class<?> clazz) {
+   Method method;
+   try {
+     method = getClass().getDeclaredMethod("convertSpecific", clazz);
+   } catch (NoSuchMethodException exc) {
+     String message =
+         String.format(
+             "Cannot translate trigger class %s to a runner-API proto.",
+             clazz.getCanonicalName());
+     throw new IllegalArgumentException(message, exc);
+   }
+   return method;
+ }
+
+ /** Encodes a {@link DefaultTrigger} by setting the proto's {@code Default} field. */
+ private RunnerApi.Trigger convertSpecific(DefaultTrigger v) {
+   RunnerApi.Trigger.Builder proto = RunnerApi.Trigger.newBuilder();
+   proto.setDefault(RunnerApi.Trigger.Default.getDefaultInstance());
+   return proto.build();
+ }
+
+ /**
+  * Encodes a {@link FromEndOfWindow} trigger as an {@code AfterEndOfWindow} proto with neither
+  * early nor late firings set.
+  */
+ private RunnerApi.Trigger convertSpecific(FromEndOfWindow v) {
+   RunnerApi.Trigger.AfterEndOfWindow.Builder endOfWindow =
+       RunnerApi.Trigger.AfterEndOfWindow.newBuilder();
+   return RunnerApi.Trigger.newBuilder().setAfterEndOfWindow(endOfWindow).build();
+ }
+
+ /** Encodes a {@link NeverTrigger} by setting the proto's {@code Never} field. */
+ private RunnerApi.Trigger convertSpecific(NeverTrigger v) {
+   RunnerApi.Trigger.Builder proto = RunnerApi.Trigger.newBuilder();
+   proto.setNever(RunnerApi.Trigger.Never.getDefaultInstance());
+   return proto.build();
+ }
+
+ /** Encodes a {@link ReshuffleTrigger} by setting the proto's {@code Always} field. */
+ private RunnerApi.Trigger convertSpecific(ReshuffleTrigger v) {
+   RunnerApi.Trigger.Builder proto = RunnerApi.Trigger.newBuilder();
+   proto.setAlways(RunnerApi.Trigger.Always.getDefaultInstance());
+   return proto.build();
+ }
+
+ /**
+  * Encodes an {@link AfterSynchronizedProcessingTime} trigger by setting the proto's
+  * {@code AfterSynchronizedProcessingTime} field to its default instance.
+  */
+ private RunnerApi.Trigger convertSpecific(AfterSynchronizedProcessingTime v) {
+   RunnerApi.Trigger.AfterSynchronizedProcessingTime payload =
+       RunnerApi.Trigger.AfterSynchronizedProcessingTime.getDefaultInstance();
+   return RunnerApi.Trigger.newBuilder().setAfterSynchronizedProcessingTime(payload).build();
+ }
+
+ private RunnerApi.TimeDomain convertTimeDomain(TimeDomain timeDomain) {
+ switch (timeDomain) {
+ case EVENT_TIME:
+ return RunnerApi.TimeDomain.EVENT_TIME;
+ case PROCESSING_TIME:
+ return RunnerApi.TimeDomain.PROCESSING_TIME;
+ case SYNCHRONIZED_PROCESSING_TIME:
+ return RunnerApi.TimeDomain.SYNCHRONIZED_PROCESSING_TIME;
+ default:
+ throw new IllegalArgumentException(String.format("Unknown time domain: %s", timeDomain));
+ }
+ }
+
+ /**
+  * Encodes an {@link AfterFirst} trigger as an {@code AfterAny} proto, converting each
+  * sub-trigger in iteration order.
+  */
+ private RunnerApi.Trigger convertSpecific(AfterFirst v) {
+   RunnerApi.Trigger.AfterAny.Builder anyOf = RunnerApi.Trigger.AfterAny.newBuilder();
+   for (Trigger child : v.subTriggers()) {
+     RunnerApi.Trigger childProto = toProto(child);
+     anyOf.addSubtriggers(childProto);
+   }
+   RunnerApi.Trigger.Builder proto = RunnerApi.Trigger.newBuilder();
+   return proto.setAfterAny(anyOf).build();
+ }
+
+ /**
+  * Encodes an {@link AfterAll} trigger as an {@code AfterAll} proto, converting each
+  * sub-trigger in iteration order.
+  */
+ private RunnerApi.Trigger convertSpecific(AfterAll v) {
+   RunnerApi.Trigger.AfterAll.Builder allOf = RunnerApi.Trigger.AfterAll.newBuilder();
+   for (Trigger child : v.subTriggers()) {
+     RunnerApi.Trigger childProto = toProto(child);
+     allOf.addSubtriggers(childProto);
+   }
+   RunnerApi.Trigger.Builder proto = RunnerApi.Trigger.newBuilder();
+   return proto.setAfterAll(allOf).build();
+ }
+
+ /** Encodes an {@link AfterPane} trigger as an {@code ElementCount} proto carrying its count. */
+ private RunnerApi.Trigger convertSpecific(AfterPane v) {
+   RunnerApi.Trigger.ElementCount.Builder count =
+       RunnerApi.Trigger.ElementCount.newBuilder().setElementCount(v.getElementCount());
+   return RunnerApi.Trigger.newBuilder().setElementCount(count).build();
+ }
+
+ /**
+  * Encodes an {@link AfterWatermarkEarlyAndLate} trigger as an {@code AfterEndOfWindow} proto.
+  * Early firings are always set; late firings are set only when the late trigger is non-null.
+  */
+ private RunnerApi.Trigger convertSpecific(AfterWatermarkEarlyAndLate v) {
+   RunnerApi.Trigger.AfterEndOfWindow.Builder endOfWindow =
+       RunnerApi.Trigger.AfterEndOfWindow.newBuilder();
+   endOfWindow.setEarlyFirings(toProto(v.getEarlyTrigger()));
+
+   Trigger late = v.getLateTrigger();
+   if (late != null) {
+     endOfWindow.setLateFirings(toProto(late));
+   }
+
+   RunnerApi.Trigger.Builder proto = RunnerApi.Trigger.newBuilder();
+   return proto.setAfterEndOfWindow(endOfWindow).build();
+ }
+
+ /**
+  * Encodes an {@link AfterEach} trigger as an {@code AfterEach} proto, converting each
+  * sub-trigger in iteration order.
+  */
+ private RunnerApi.Trigger convertSpecific(AfterEach v) {
+   RunnerApi.Trigger.AfterEach.Builder inOrder = RunnerApi.Trigger.AfterEach.newBuilder();
+   for (Trigger child : v.subTriggers()) {
+     RunnerApi.Trigger childProto = toProto(child);
+     inOrder.addSubtriggers(childProto);
+   }
+   RunnerApi.Trigger.Builder proto = RunnerApi.Trigger.newBuilder();
+   return proto.setAfterEach(inOrder).build();
+ }
+
+ /** Encodes a {@link Repeatedly} trigger as a {@code Repeat} proto wrapping its sub-trigger. */
+ private RunnerApi.Trigger convertSpecific(Repeatedly v) {
+   RunnerApi.Trigger.Repeat.Builder repeat = RunnerApi.Trigger.Repeat.newBuilder();
+   repeat.setSubtrigger(toProto(v.getRepeatedTrigger()));
+   return RunnerApi.Trigger.newBuilder().setRepeat(repeat).build();
+ }
+
+ /**
+  * Encodes an {@link OrFinallyTrigger} as an {@code OrFinally} proto: the main trigger goes to
+  * {@code main} and the until trigger to {@code finally}.
+  */
+ private RunnerApi.Trigger convertSpecific(OrFinallyTrigger v) {
+   RunnerApi.Trigger.OrFinally.Builder orFinally = RunnerApi.Trigger.OrFinally.newBuilder();
+   orFinally.setMain(toProto(v.getMainTrigger()));
+   orFinally.setFinally(toProto(v.getUntilTrigger()));
+   return RunnerApi.Trigger.newBuilder().setOrFinally(orFinally).build();
+ }
+
+ /**
+  * Encodes an {@link AfterProcessingTime} trigger as an {@code AfterProcessingTime} proto,
+  * appending each of its timestamp transforms in iteration order.
+  */
+ private RunnerApi.Trigger convertSpecific(AfterProcessingTime v) {
+   RunnerApi.Trigger.AfterProcessingTime.Builder processingTime =
+       RunnerApi.Trigger.AfterProcessingTime.newBuilder();
+   for (TimestampTransform step : v.getTimestampTransforms()) {
+     RunnerApi.TimestampTransform stepProto = convertTimestampTransform(step);
+     processingTime.addTimestampTransforms(stepProto);
+   }
+   RunnerApi.Trigger.Builder proto = RunnerApi.Trigger.newBuilder();
+   return proto.setAfterProcessingTime(processingTime).build();
+ }
+
+ /**
+  * Encodes a single {@link TimestampTransform} as its proto counterpart. A
+  * {@link TimestampTransform.Delay} contributes its delay in milliseconds; a
+  * {@link TimestampTransform.AlignTo} contributes its period and offset in milliseconds.
+  *
+  * @throws IllegalArgumentException if the transform is neither a delay nor an alignment
+  */
+ private RunnerApi.TimestampTransform convertTimestampTransform(TimestampTransform transform) {
+   if (transform instanceof TimestampTransform.Delay) {
+     TimestampTransform.Delay delay = (TimestampTransform.Delay) transform;
+     RunnerApi.TimestampTransform.Delay.Builder delayProto =
+         RunnerApi.TimestampTransform.Delay.newBuilder()
+             .setDelayMillis(delay.getDelay().getMillis());
+     return RunnerApi.TimestampTransform.newBuilder().setDelay(delayProto).build();
+   }
+
+   if (transform instanceof TimestampTransform.AlignTo) {
+     TimestampTransform.AlignTo alignTo = (TimestampTransform.AlignTo) transform;
+     RunnerApi.TimestampTransform.AlignTo.Builder alignProto =
+         RunnerApi.TimestampTransform.AlignTo.newBuilder()
+             .setPeriod(alignTo.getPeriod().getMillis())
+             .setOffset(alignTo.getOffset().getMillis());
+     return RunnerApi.TimestampTransform.newBuilder().setAlignTo(alignProto).build();
+   }
+
+   throw new IllegalArgumentException(
+       String.format("Unknown %s: %s", TimestampTransform.class.getSimpleName(), transform));
+ }
+ }
+
+ /**
+  * Reconstructs an SDK {@link Trigger} from its {@link RunnerApi.Trigger} proto.
+  *
+  * <p>Composite cases recurse into their sub-trigger protos. The {@code ALWAYS} case yields a
+  * new {@link ReshuffleTrigger}.
+  *
+  * @throws IllegalArgumentException if no trigger case is set, the case is not recognized, or an
+  *     {@code AfterProcessingTime} proto holds an unset or unrecognized timestamp transform
+  */
+ public static Trigger fromProto(RunnerApi.Trigger triggerProto) {
+   switch (triggerProto.getTriggerCase()) {
+     case AFTER_ALL:
+       return AfterAll.of(protosToTriggers(triggerProto.getAfterAll().getSubtriggersList()));
+     case AFTER_ANY:
+       return AfterFirst.of(protosToTriggers(triggerProto.getAfterAny().getSubtriggersList()));
+     case AFTER_EACH:
+       return AfterEach.inOrder(
+           protosToTriggers(triggerProto.getAfterEach().getSubtriggersList()));
+     case AFTER_END_OF_WINDOW:
+       return afterEndOfWindowFromProto(triggerProto.getAfterEndOfWindow());
+     case AFTER_PROCESSING_TIME:
+       return afterProcessingTimeFromProto(triggerProto.getAfterProcessingTime());
+     case AFTER_SYNCHRONIZED_PROCESSING_TIME:
+       return AfterSynchronizedProcessingTime.ofFirstElement();
+     case ALWAYS:
+       return new ReshuffleTrigger();
+     case ELEMENT_COUNT:
+       return AfterPane.elementCountAtLeast(triggerProto.getElementCount().getElementCount());
+     case NEVER:
+       return Never.ever();
+     case OR_FINALLY:
+       return fromProto(triggerProto.getOrFinally().getMain())
+           .orFinally((OnceTrigger) fromProto(triggerProto.getOrFinally().getFinally()));
+     case REPEAT:
+       return Repeatedly.forever(fromProto(triggerProto.getRepeat().getSubtrigger()));
+     case DEFAULT:
+       return DefaultTrigger.of();
+     case TRIGGER_NOT_SET:
+       throw new IllegalArgumentException(
+           String.format("Required field 'trigger' not set in %s", triggerProto));
+     default:
+       throw new IllegalArgumentException(
+           String.format("Unknown trigger case: %s", triggerProto.getTriggerCase()));
+   }
+ }
+
+ /** Rebuilds an end-of-window trigger, attaching whichever early/late firings are present. */
+ private static Trigger afterEndOfWindowFromProto(RunnerApi.Trigger.AfterEndOfWindow eowProto) {
+   if (!eowProto.hasEarlyFirings() && !eowProto.hasLateFirings()) {
+     return AfterWatermark.pastEndOfWindow();
+   }
+
+   if (!eowProto.hasEarlyFirings()) {
+     // Only late firings are present, so they are attached to the bare trigger directly.
+     return AfterWatermark.pastEndOfWindow()
+         .withLateFirings((OnceTrigger) fromProto(eowProto.getLateFirings()));
+   }
+
+   // Early firings are present; late firings are layered on top when they are also set.
+   AfterWatermarkEarlyAndLate trigger =
+       AfterWatermark.pastEndOfWindow()
+           .withEarlyFirings((OnceTrigger) fromProto(eowProto.getEarlyFirings()));
+   if (eowProto.hasLateFirings()) {
+     trigger = trigger.withLateFirings((OnceTrigger) fromProto(eowProto.getLateFirings()));
+   }
+   return trigger;
+ }
+
+ /** Rebuilds a processing-time trigger by replaying its timestamp transforms in list order. */
+ private static AfterProcessingTime afterProcessingTimeFromProto(
+     RunnerApi.Trigger.AfterProcessingTime processingTimeProto) {
+   AfterProcessingTime trigger = AfterProcessingTime.pastFirstElementInPane();
+   for (RunnerApi.TimestampTransform transform :
+       processingTimeProto.getTimestampTransformsList()) {
+     switch (transform.getTimestampTransformCase()) {
+       case ALIGN_TO:
+         trigger =
+             trigger.alignedTo(
+                 Duration.millis(transform.getAlignTo().getPeriod()),
+                 new Instant(transform.getAlignTo().getOffset()));
+         break;
+       case DELAY:
+         trigger = trigger.plusDelayOf(Duration.millis(transform.getDelay().getDelayMillis()));
+         break;
+       case TIMESTAMPTRANSFORM_NOT_SET:
+         throw new IllegalArgumentException(
+             String.format("Required field 'timestamp_transform' not set in %s", transform));
+       default:
+         throw new IllegalArgumentException(
+             String.format(
+                 "Unknown timestamp transform case: %s",
+                 transform.getTimestampTransformCase()));
+     }
+   }
+   return trigger;
+ }
+
+ /** Converts each proto in {@code triggers} with {@code fromProto}, preserving list order. */
+ private static List<Trigger> protosToTriggers(List<RunnerApi.Trigger> triggers) {
+   List<Trigger> converted = Lists.newArrayList();
+   for (RunnerApi.Trigger proto : triggers) {
+     Trigger sdkTrigger = fromProto(proto);
+     converted.add(sdkTrigger);
+   }
+   return converted;
+ }
+
+ // Do not instantiate: this class only exposes static conversion helpers.
+ private Triggers() {}
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/facdc108/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategies.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategies.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategies.java
index 6d721b0..3d7deef 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategies.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategies.java
@@ -32,7 +32,6 @@ import org.apache.beam.sdk.common.runner.v1.RunnerApi.SdkFunctionSpec;
import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
import org.apache.beam.sdk.transforms.windowing.Trigger;
-import org.apache.beam.sdk.transforms.windowing.Triggers;
import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.util.SerializableUtils;
http://git-wip-us.apache.org/repos/asf/beam/blob/facdc108/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/TriggersTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/TriggersTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/TriggersTest.java
new file mode 100644
index 0000000..cf9d40c
--- /dev/null
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/TriggersTest.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core.construction;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+import com.google.auto.value.AutoValue;
+import com.google.common.collect.ImmutableList;
+import org.apache.beam.sdk.transforms.windowing.AfterAll;
+import org.apache.beam.sdk.transforms.windowing.AfterEach;
+import org.apache.beam.sdk.transforms.windowing.AfterFirst;
+import org.apache.beam.sdk.transforms.windowing.AfterPane;
+import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
+import org.apache.beam.sdk.transforms.windowing.AfterSynchronizedProcessingTime;
+import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
+import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
+import org.apache.beam.sdk.transforms.windowing.Never;
+import org.apache.beam.sdk.transforms.windowing.Repeatedly;
+import org.apache.beam.sdk.transforms.windowing.Trigger;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
+
+/** Tests for utilities in {@link Triggers}. */
+@RunWith(Parameterized.class)
+public class TriggersTest {
+
+ /** Test parameter wrapping the {@link Trigger} to send through a proto round trip. */
+ @AutoValue
+ abstract static class ToProtoAndBackSpec {
+ /** Returns the trigger under test. */
+ abstract Trigger getTrigger();
+ }
+
+ /** Wraps {@code trigger} in a {@link ToProtoAndBackSpec} test parameter. */
+ private static ToProtoAndBackSpec toProtoAndBackSpec(Trigger trigger) {
+   ToProtoAndBackSpec spec = new AutoValue_TriggersTest_ToProtoAndBackSpec(trigger);
+   return spec;
+ }
+
+ /**
+ * Supplies the triggers exercised by the round-trip test: first atomic triggers, then
+ * composite triggers built from them. The list position feeds {@code {index}} in the
+ * generated test names.
+ */
+ @Parameters(name = "{index}: {0}")
+ public static Iterable<ToProtoAndBackSpec> data() {
+ return ImmutableList.of(
+ // Atomic triggers
+ toProtoAndBackSpec(AfterWatermark.pastEndOfWindow()),
+ toProtoAndBackSpec(AfterPane.elementCountAtLeast(73)),
+ toProtoAndBackSpec(AfterSynchronizedProcessingTime.ofFirstElement()),
+ toProtoAndBackSpec(Never.ever()),
+ toProtoAndBackSpec(DefaultTrigger.of()),
+ toProtoAndBackSpec(AfterProcessingTime.pastFirstElementInPane()),
+ toProtoAndBackSpec(
+ AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.millis(23))),
+ toProtoAndBackSpec(
+ AfterProcessingTime.pastFirstElementInPane()
+ .alignedTo(Duration.millis(5), new Instant(27))),
+ // Mixed delay/align/delay chain: checks that several transforms survive in order.
+ toProtoAndBackSpec(
+ AfterProcessingTime.pastFirstElementInPane()
+ .plusDelayOf(Duration.standardSeconds(3))
+ .alignedTo(Duration.millis(5), new Instant(27))
+ .plusDelayOf(Duration.millis(13))),
+
+ // Composite triggers
+
+ toProtoAndBackSpec(
+ AfterAll.of(AfterPane.elementCountAtLeast(79), AfterWatermark.pastEndOfWindow())),
+ toProtoAndBackSpec(
+ AfterEach.inOrder(AfterPane.elementCountAtLeast(79), AfterPane.elementCountAtLeast(3))),
+ toProtoAndBackSpec(
+ AfterFirst.of(AfterWatermark.pastEndOfWindow(), AfterPane.elementCountAtLeast(3))),
+ // End-of-window trigger with early-only, late-only, and both kinds of firings.
+ toProtoAndBackSpec(
+ AfterWatermark.pastEndOfWindow().withEarlyFirings(AfterPane.elementCountAtLeast(3))),
+ toProtoAndBackSpec(
+ AfterWatermark.pastEndOfWindow().withLateFirings(AfterPane.elementCountAtLeast(3))),
+ toProtoAndBackSpec(
+ AfterWatermark.pastEndOfWindow()
+ .withEarlyFirings(
+ AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.millis(42)))
+ .withLateFirings(AfterPane.elementCountAtLeast(3))),
+ toProtoAndBackSpec(Repeatedly.forever(AfterWatermark.pastEndOfWindow())),
+ toProtoAndBackSpec(
+ Repeatedly.forever(AfterPane.elementCountAtLeast(1))
+ .orFinally(AfterWatermark.pastEndOfWindow())));
+ }
+
+ @Parameter(0)
+ public ToProtoAndBackSpec toProtoAndBackSpec;
+
+ /** Checks that converting a trigger to proto and back yields a trigger equal to the input. */
+ @Test
+ public void testToProtoAndBack() throws Exception {
+   Trigger original = toProtoAndBackSpec.getTrigger();
+
+   Trigger roundTripped = Triggers.fromProto(Triggers.toProto(original));
+
+   assertThat(roundTripped, equalTo(original));
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/facdc108/runners/core-java/pom.xml
----------------------------------------------------------------------
diff --git a/runners/core-java/pom.xml b/runners/core-java/pom.xml
index affd1a9..f066abf 100644
--- a/runners/core-java/pom.xml
+++ b/runners/core-java/pom.xml
@@ -77,6 +77,11 @@
<artifactId>beam-sdks-common-runner-api</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>beam-runners-core-construction-java</artifactId>
+ </dependency>
+
<!-- build dependencies -->
<dependency>
http://git-wip-us.apache.org/repos/asf/beam/blob/facdc108/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaOutputBufferDoFn.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaOutputBufferDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaOutputBufferDoFn.java
index e3ce1ef..5508b2e 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaOutputBufferDoFn.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaOutputBufferDoFn.java
@@ -19,10 +19,10 @@ package org.apache.beam.runners.core;
import java.util.ArrayList;
import java.util.List;
+import org.apache.beam.runners.core.construction.Triggers;
import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine;
import org.apache.beam.runners.core.triggers.TriggerStateMachines;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.Triggers;
import org.apache.beam.sdk.util.SystemDoFnInternal;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.joda.time.Instant;
http://git-wip-us.apache.org/repos/asf/beam/blob/facdc108/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
index 8dc1502..bf48df1 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
@@ -17,12 +17,12 @@
*/
package org.apache.beam.runners.core;
+import org.apache.beam.runners.core.construction.Triggers;
import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine;
import org.apache.beam.runners.core.triggers.TriggerStateMachines;
import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.Triggers;
import org.apache.beam.sdk.util.SystemDoFnInternal;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.values.KV;
http://git-wip-us.apache.org/repos/asf/beam/blob/facdc108/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java
index 444f8fe..8fff0e4 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetNewDoFn.java
@@ -18,6 +18,7 @@
package org.apache.beam.runners.core;
import java.util.Collection;
+import org.apache.beam.runners.core.construction.Triggers;
import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine;
import org.apache.beam.runners.core.triggers.TriggerStateMachines;
import org.apache.beam.sdk.transforms.Aggregator;
@@ -25,7 +26,6 @@ import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.transforms.windowing.Triggers;
import org.apache.beam.sdk.util.SideInputReader;
import org.apache.beam.sdk.util.SystemDoFnInternal;
import org.apache.beam.sdk.util.WindowedValue;
http://git-wip-us.apache.org/repos/asf/beam/blob/facdc108/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
index d18a1c3..512420f 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
@@ -38,6 +38,7 @@ import java.util.Map;
import java.util.Set;
import javax.annotation.Nullable;
import org.apache.beam.runners.core.TimerInternals.TimerData;
+import org.apache.beam.runners.core.construction.Triggers;
import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine;
import org.apache.beam.runners.core.triggers.TriggerStateMachine;
import org.apache.beam.runners.core.triggers.TriggerStateMachineRunner;
@@ -60,7 +61,6 @@ import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.transforms.windowing.Trigger;
-import org.apache.beam.sdk.transforms.windowing.Triggers;
import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.util.AppliedCombineFn;
http://git-wip-us.apache.org/repos/asf/beam/blob/facdc108/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
index f7fd4cf..b4ca998 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
@@ -34,6 +34,7 @@ import org.apache.beam.runners.core.ReduceFnRunner;
import org.apache.beam.runners.core.SystemReduceFn;
import org.apache.beam.runners.core.TimerInternals;
import org.apache.beam.runners.core.UnsupportedSideInputReader;
+import org.apache.beam.runners.core.construction.Triggers;
import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine;
import org.apache.beam.runners.core.triggers.TriggerStateMachines;
import org.apache.beam.runners.direct.DirectExecutionContext.DirectStepContext;
@@ -47,7 +48,6 @@ import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.transforms.windowing.Triggers;
import org.apache.beam.sdk.util.WindowTracing;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowingStrategy;
http://git-wip-us.apache.org/repos/asf/beam/blob/facdc108/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java
index 1f2fcb6..0e74fa2 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java
@@ -30,6 +30,7 @@ import org.apache.beam.runners.core.ReduceFnRunner;
import org.apache.beam.runners.core.SystemReduceFn;
import org.apache.beam.runners.core.TimerInternals;
import org.apache.beam.runners.core.UnsupportedSideInputReader;
+import org.apache.beam.runners.core.construction.Triggers;
import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine;
import org.apache.beam.runners.core.triggers.TriggerStateMachines;
import org.apache.beam.runners.spark.SparkPipelineOptions;
@@ -47,7 +48,6 @@ import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.transforms.windowing.Triggers;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
import org.apache.beam.sdk.util.WindowingStrategy;
@@ -67,7 +67,6 @@ import org.apache.spark.streaming.dstream.PairDStreamFunctions;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
import scala.Function1;
import scala.Option;
import scala.Tuple2;
http://git-wip-us.apache.org/repos/asf/beam/blob/facdc108/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowViaOutputBufferFn.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowViaOutputBufferFn.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowViaOutputBufferFn.java
index 2b16c60..d19c4a9 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowViaOutputBufferFn.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowViaOutputBufferFn.java
@@ -31,6 +31,7 @@ import org.apache.beam.runners.core.StateInternalsFactory;
import org.apache.beam.runners.core.SystemReduceFn;
import org.apache.beam.runners.core.TimerInternals;
import org.apache.beam.runners.core.UnsupportedSideInputReader;
+import org.apache.beam.runners.core.construction.Triggers;
import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine;
import org.apache.beam.runners.core.triggers.TriggerStateMachines;
import org.apache.beam.runners.spark.aggregators.NamedAggregators;
@@ -38,7 +39,6 @@ import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.transforms.windowing.Triggers;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.values.KV;
http://git-wip-us.apache.org/repos/asf/beam/blob/facdc108/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Triggers.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Triggers.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Triggers.java
index 591af37..47f05e4 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Triggers.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Triggers.java
@@ -35,6 +35,7 @@ import org.joda.time.Instant;
/** Utilities for working with {@link Triggers Triggers}. */
@Experimental(Experimental.Kind.TRIGGER)
+@Deprecated
public class Triggers implements Serializable {
@VisibleForTesting static final ProtoConverter CONVERTER = new ProtoConverter();
http://git-wip-us.apache.org/repos/asf/beam/blob/facdc108/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/TriggersTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/TriggersTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/TriggersTest.java
deleted file mode 100644
index 0ac5966..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/TriggersTest.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.transforms.windowing;
-
-import static org.hamcrest.Matchers.equalTo;
-import static org.junit.Assert.assertThat;
-
-import com.google.auto.value.AutoValue;
-import com.google.common.collect.ImmutableList;
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameter;
-import org.junit.runners.Parameterized.Parameters;
-
-/** Tests for utilities in {@link Triggers}. */
-@RunWith(Parameterized.class)
-public class TriggersTest {
-
- @AutoValue
- abstract static class ToProtoAndBackSpec {
- abstract Trigger getTrigger();
- }
-
- private static ToProtoAndBackSpec toProtoAndBackSpec(Trigger trigger) {
- return new AutoValue_TriggersTest_ToProtoAndBackSpec(trigger);
- }
-
- @Parameters(name = "{index}: {0}")
- public static Iterable<ToProtoAndBackSpec> data() {
- return ImmutableList.of(
- // Atomic triggers
- toProtoAndBackSpec(AfterWatermark.pastEndOfWindow()),
- toProtoAndBackSpec(AfterPane.elementCountAtLeast(73)),
- toProtoAndBackSpec(AfterSynchronizedProcessingTime.ofFirstElement()),
- toProtoAndBackSpec(Never.ever()),
- toProtoAndBackSpec(DefaultTrigger.of()),
- toProtoAndBackSpec(AfterProcessingTime.pastFirstElementInPane()),
- toProtoAndBackSpec(
- AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.millis(23))),
- toProtoAndBackSpec(
- AfterProcessingTime.pastFirstElementInPane()
- .alignedTo(Duration.millis(5), new Instant(27))),
- toProtoAndBackSpec(
- AfterProcessingTime.pastFirstElementInPane()
- .plusDelayOf(Duration.standardSeconds(3))
- .alignedTo(Duration.millis(5), new Instant(27))
- .plusDelayOf(Duration.millis(13))),
-
- // Composite triggers
-
- toProtoAndBackSpec(
- AfterAll.of(AfterPane.elementCountAtLeast(79), AfterWatermark.pastEndOfWindow())),
- toProtoAndBackSpec(
- AfterEach.inOrder(AfterPane.elementCountAtLeast(79), AfterPane.elementCountAtLeast(3))),
- toProtoAndBackSpec(
- AfterFirst.of(AfterWatermark.pastEndOfWindow(), AfterPane.elementCountAtLeast(3))),
- toProtoAndBackSpec(
- AfterWatermark.pastEndOfWindow().withEarlyFirings(AfterPane.elementCountAtLeast(3))),
- toProtoAndBackSpec(
- AfterWatermark.pastEndOfWindow().withLateFirings(AfterPane.elementCountAtLeast(3))),
- toProtoAndBackSpec(
- AfterWatermark.pastEndOfWindow()
- .withEarlyFirings(
- AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.millis(42)))
- .withLateFirings(AfterPane.elementCountAtLeast(3))),
- toProtoAndBackSpec(Repeatedly.forever(AfterWatermark.pastEndOfWindow())),
- toProtoAndBackSpec(
- Repeatedly.forever(AfterPane.elementCountAtLeast(1))
- .orFinally(AfterWatermark.pastEndOfWindow())));
- }
-
- @Parameter(0)
- public ToProtoAndBackSpec toProtoAndBackSpec;
-
- @Test
- public void testToProtoAndBack() throws Exception {
- Trigger trigger = toProtoAndBackSpec.getTrigger();
- Trigger toProtoAndBackTrigger = Triggers.fromProto(Triggers.toProto(trigger));
-
- assertThat(toProtoAndBackTrigger, equalTo(trigger));
- }
-}
[2/2] beam git commit: This closes #2513
Posted by dh...@apache.org.
This closes #2513
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/dc672f42
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/dc672f42
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/dc672f42
Branch: refs/heads/master
Commit: dc672f42083c375872d87030523545866cea4f55
Parents: 8479094 facdc10
Author: Dan Halperin <dh...@google.com>
Authored: Wed Apr 12 16:08:35 2017 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Wed Apr 12 16:08:35 2017 -0700
----------------------------------------------------------------------
.../runners/core/construction/Triggers.java | 336 +++++++++++++++++++
.../core/construction/WindowingStrategies.java | 1 -
.../runners/core/construction/TriggersTest.java | 111 ++++++
runners/core-java/pom.xml | 5 +
.../GroupAlsoByWindowViaOutputBufferDoFn.java | 2 +-
.../core/GroupAlsoByWindowViaWindowSetDoFn.java | 2 +-
.../GroupAlsoByWindowViaWindowSetNewDoFn.java | 2 +-
.../beam/runners/core/ReduceFnTester.java | 2 +-
.../GroupAlsoByWindowEvaluatorFactory.java | 2 +-
.../SparkGroupAlsoByWindowViaWindowSet.java | 3 +-
...SparkGroupAlsoByWindowViaOutputBufferFn.java | 2 +-
.../beam/sdk/transforms/windowing/Triggers.java | 1 +
.../sdk/transforms/windowing/TriggersTest.java | 100 ------
13 files changed, 460 insertions(+), 109 deletions(-)
----------------------------------------------------------------------