You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/10/18 03:38:48 UTC
[17/18] incubator-beam git commit: Add TriggerStateMachines with
conversion from Trigger
Add TriggerStateMachines with conversion from Trigger
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/00672961
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/00672961
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/00672961
Branch: refs/heads/master
Commit: 00672961b5a3115c298c457dfe43f543947298a0
Parents: 2107f79
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Oct 13 20:02:52 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Mon Oct 17 19:56:49 2016 -0700
----------------------------------------------------------------------
.../core/triggers/TriggerStateMachines.java | 210 +++++++++++++++++++
.../core/triggers/TriggerStateMachinesTest.java | 199 ++++++++++++++++++
2 files changed, 409 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/00672961/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachines.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachines.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachines.java
new file mode 100644
index 0000000..317e3b9
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachines.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core.triggers;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.List;
+import org.apache.beam.runners.core.triggers.TriggerStateMachine.OnceTriggerStateMachine;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.windowing.AfterAll;
+import org.apache.beam.sdk.transforms.windowing.AfterDelayFromFirstElement;
+import org.apache.beam.sdk.transforms.windowing.AfterEach;
+import org.apache.beam.sdk.transforms.windowing.AfterFirst;
+import org.apache.beam.sdk.transforms.windowing.AfterPane;
+import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
+import org.apache.beam.sdk.transforms.windowing.AfterSynchronizedProcessingTime;
+import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
+import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
+import org.apache.beam.sdk.transforms.windowing.Never.NeverTrigger;
+import org.apache.beam.sdk.transforms.windowing.OrFinallyTrigger;
+import org.apache.beam.sdk.transforms.windowing.Repeatedly;
+import org.apache.beam.sdk.transforms.windowing.Trigger;
+import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger;
+import org.apache.beam.sdk.util.TimeDomain;
+import org.joda.time.Instant;
+
+/** Translates a {@link Trigger} to a {@link TriggerStateMachine}. */
+public class TriggerStateMachines {
+
+ private TriggerStateMachines() {}
+
+ @VisibleForTesting static final StateMachineConverter CONVERTER = new StateMachineConverter();
+
+ public static TriggerStateMachine stateMachineForTrigger(Trigger trigger) {
+ return CONVERTER.evaluateTrigger(trigger);
+ }
+
+ public static OnceTriggerStateMachine stateMachineForOnceTrigger(OnceTrigger trigger) {
+ return CONVERTER.evaluateOnceTrigger(trigger);
+ }
+
+ @VisibleForTesting
+ static class StateMachineConverter {
+
+ public TriggerStateMachine evaluateTrigger(Trigger trigger) {
+ Method evaluationMethod = getEvaluationMethod(trigger.getClass());
+ return tryEvaluate(evaluationMethod, trigger);
+ }
+
+ public OnceTriggerStateMachine evaluateOnceTrigger(OnceTrigger trigger) {
+ Method evaluationMethod = getEvaluationMethod(trigger.getClass());
+ return (OnceTriggerStateMachine) tryEvaluate(evaluationMethod, trigger);
+ }
+
+ private TriggerStateMachine tryEvaluate(Method evaluationMethod, Trigger trigger) {
+ try {
+ return (TriggerStateMachine) evaluationMethod.invoke(this, trigger);
+ } catch (InvocationTargetException exc) {
+ if (exc.getCause() instanceof RuntimeException) {
+ throw (RuntimeException) exc.getCause();
+ } else {
+ throw new RuntimeException(exc.getCause());
+ }
+ } catch (IllegalAccessException exc) {
+ throw new IllegalStateException(
+ String.format("Internal error: could not invoke %s", evaluationMethod));
+ }
+ }
+
+ private Method getEvaluationMethod(Class<?> clazz) {
+ Method evaluationMethod;
+ try {
+ return getClass().getDeclaredMethod("evaluateSpecific", clazz);
+ } catch (NoSuchMethodException exc) {
+ throw new UnsupportedOperationException(
+ String.format(
+ "Cannot translate trigger class %s to a state machine.", clazz.getCanonicalName()),
+ exc);
+ }
+ }
+
+ private TriggerStateMachine evaluateSpecific(DefaultTrigger v) {
+ return DefaultTriggerStateMachine.of();
+ }
+
+ private OnceTriggerStateMachine evaluateSpecific(AfterWatermark.FromEndOfWindow v) {
+ return AfterWatermarkStateMachine.pastEndOfWindow();
+ }
+
+ private OnceTriggerStateMachine evaluateSpecific(NeverTrigger v) {
+ return NeverStateMachine.ever();
+ }
+
+ private OnceTriggerStateMachine evaluateSpecific(AfterSynchronizedProcessingTime v) {
+ return new AfterSynchronizedProcessingTimeStateMachine();
+ }
+
+ private OnceTriggerStateMachine evaluateSpecific(AfterFirst v) {
+ List<OnceTriggerStateMachine> subStateMachines =
+ Lists.newArrayListWithCapacity(v.subTriggers().size());
+ for (Trigger subtrigger : v.subTriggers()) {
+ subStateMachines.add(stateMachineForOnceTrigger((OnceTrigger) subtrigger));
+ }
+ return AfterFirstStateMachine.of(subStateMachines);
+ }
+
+ private OnceTriggerStateMachine evaluateSpecific(AfterAll v) {
+ List<OnceTriggerStateMachine> subStateMachines =
+ Lists.newArrayListWithCapacity(v.subTriggers().size());
+ for (Trigger subtrigger : v.subTriggers()) {
+ subStateMachines.add(stateMachineForOnceTrigger((OnceTrigger) subtrigger));
+ }
+ return AfterAllStateMachine.of(subStateMachines);
+ }
+
+ private OnceTriggerStateMachine evaluateSpecific(AfterPane v) {
+ return AfterPaneStateMachine.elementCountAtLeast(v.getElementCount());
+ }
+
+ private TriggerStateMachine evaluateSpecific(AfterWatermark.AfterWatermarkEarlyAndLate v) {
+ AfterWatermarkStateMachine.AfterWatermarkEarlyAndLate machine =
+ AfterWatermarkStateMachine.pastEndOfWindow()
+ .withEarlyFirings(stateMachineForOnceTrigger(v.getEarlyTrigger()));
+
+ if (v.getLateTrigger() != null) {
+ machine = machine.withLateFirings(stateMachineForOnceTrigger(v.getLateTrigger()));
+ }
+ return machine;
+ }
+
+ private TriggerStateMachine evaluateSpecific(AfterEach v) {
+ List<TriggerStateMachine> subStateMachines =
+ Lists.newArrayListWithCapacity(v.subTriggers().size());
+
+ for (Trigger subtrigger : v.subTriggers()) {
+ subStateMachines.add(stateMachineForTrigger(subtrigger));
+ }
+
+ return AfterEachStateMachine.inOrder(subStateMachines);
+ }
+
+ private TriggerStateMachine evaluateSpecific(Repeatedly v) {
+ return RepeatedlyStateMachine.forever(stateMachineForTrigger(v.getRepeatedTrigger()));
+ }
+
+ private TriggerStateMachine evaluateSpecific(OrFinallyTrigger v) {
+ return new OrFinallyStateMachine(
+ stateMachineForTrigger(v.getMainTrigger()),
+ stateMachineForOnceTrigger(v.getUntilTrigger()));
+ }
+
+ private OnceTriggerStateMachine evaluateSpecific(AfterProcessingTime v) {
+ return evaluateSpecific((AfterDelayFromFirstElement) v);
+ }
+
+ private OnceTriggerStateMachine evaluateSpecific(final AfterDelayFromFirstElement v) {
+ return new AfterDelayFromFirstElementStateMachineAdapter(v);
+ }
+
+ private static class AfterDelayFromFirstElementStateMachineAdapter
+ extends AfterDelayFromFirstElementStateMachine {
+
+ public AfterDelayFromFirstElementStateMachineAdapter(AfterDelayFromFirstElement v) {
+ this(v.getTimeDomain(), v.getTimestampMappers());
+ }
+
+ private AfterDelayFromFirstElementStateMachineAdapter(
+ TimeDomain timeDomain, List<SerializableFunction<Instant, Instant>> timestampMappers) {
+ super(timeDomain, timestampMappers);
+ }
+
+ @Override
+ public Instant getCurrentTime(TriggerContext context) {
+ switch (timeDomain) {
+ case PROCESSING_TIME:
+ return context.currentProcessingTime();
+ case SYNCHRONIZED_PROCESSING_TIME:
+ return context.currentSynchronizedProcessingTime();
+ case EVENT_TIME:
+ return context.currentEventTime();
+ default:
+ throw new IllegalArgumentException("A time domain that doesn't exist was received!");
+ }
+ }
+
+ @Override
+ protected AfterDelayFromFirstElementStateMachine newWith(
+ List<SerializableFunction<Instant, Instant>> transform) {
+ return new AfterDelayFromFirstElementStateMachineAdapter(timeDomain, transform);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/00672961/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachinesTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachinesTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachinesTest.java
new file mode 100644
index 0000000..37f8f10
--- /dev/null
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachinesTest.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core.triggers;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+import org.apache.beam.runners.core.triggers.TriggerStateMachine.OnceTriggerStateMachine;
+import org.apache.beam.sdk.transforms.windowing.AfterAll;
+import org.apache.beam.sdk.transforms.windowing.AfterDelayFromFirstElement;
+import org.apache.beam.sdk.transforms.windowing.AfterEach;
+import org.apache.beam.sdk.transforms.windowing.AfterFirst;
+import org.apache.beam.sdk.transforms.windowing.AfterPane;
+import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
+import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
+import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
+import org.apache.beam.sdk.transforms.windowing.Never;
+import org.apache.beam.sdk.transforms.windowing.Never.NeverTrigger;
+import org.apache.beam.sdk.transforms.windowing.OrFinallyTrigger;
+import org.apache.beam.sdk.transforms.windowing.Repeatedly;
+import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger;
+import org.apache.beam.sdk.util.TimeDomain;
+import org.joda.time.Duration;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests the {@link TriggerStateMachines} static utility methods. */
+@RunWith(JUnit4.class)
+public class TriggerStateMachinesTest {
+
+ //
+ // Tests for leaf trigger translation
+ //
+
+ @Test
+ public void testStateMachineForAfterPane() {
+ int count = 37;
+ AfterPane trigger = AfterPane.elementCountAtLeast(count);
+ AfterPaneStateMachine machine =
+ (AfterPaneStateMachine) TriggerStateMachines.stateMachineForOnceTrigger(trigger);
+
+ assertThat(machine.getElementCount(), equalTo(trigger.getElementCount()));
+ }
+
+ @Test
+ public void testStateMachineForAfterProcessingTime() {
+ Duration minutes = Duration.standardMinutes(94);
+ Duration hours = Duration.standardHours(13);
+
+ AfterDelayFromFirstElement trigger =
+ AfterProcessingTime.pastFirstElementInPane().plusDelayOf(minutes).alignedTo(hours);
+
+ AfterDelayFromFirstElementStateMachine machine =
+ (AfterDelayFromFirstElementStateMachine)
+ TriggerStateMachines.stateMachineForOnceTrigger(trigger);
+
+ assertThat(machine.getTimeDomain(), equalTo(TimeDomain.PROCESSING_TIME));
+
+ // This equality is function equality, but due to the structure of the code (no serialization)
+ // it is OK to check
+ assertThat(machine.getTimestampMappers(), equalTo(trigger.getTimestampMappers()));
+ }
+
+ @Test
+ public void testStateMachineForAfterWatermark() {
+ AfterWatermark.FromEndOfWindow trigger = AfterWatermark.pastEndOfWindow();
+ AfterWatermarkStateMachine.FromEndOfWindow machine =
+ (AfterWatermarkStateMachine.FromEndOfWindow)
+ TriggerStateMachines.stateMachineForOnceTrigger(trigger);
+ // No parameters, so if it doesn't crash, we win!
+ }
+
+ @Test
+ public void testDefaultTriggerTranslation() {
+ DefaultTrigger trigger = DefaultTrigger.of();
+ DefaultTriggerStateMachine machine =
+ (DefaultTriggerStateMachine)
+ checkNotNull(TriggerStateMachines.stateMachineForTrigger(trigger));
+ // No parameters, so if it doesn't crash, we win!
+ }
+
+ @Test
+ public void testNeverTranslation() {
+ NeverTrigger trigger = Never.ever();
+ NeverStateMachine machine =
+ (NeverStateMachine) checkNotNull(TriggerStateMachines.stateMachineForTrigger(trigger));
+ // No parameters, so if it doesn't crash, we win!
+ }
+
+ //
+ // Tests for composite trigger translation
+ //
+ // These check just that translation was invoked recursively using somewhat random
+ // leaf subtriggers; by induction it all holds together. Beyond this, explicit tests
+ // of particular triggers will suffice.
+
+ private static final int ELEM_COUNT = 472;
+ private static final Duration DELAY = Duration.standardSeconds(95673);
+
+ private final OnceTrigger subtrigger1 = AfterPane.elementCountAtLeast(ELEM_COUNT);
+ private final OnceTrigger subtrigger2 =
+ AfterProcessingTime.pastFirstElementInPane().plusDelayOf(DELAY);
+
+ private final OnceTriggerStateMachine submachine1 =
+ TriggerStateMachines.stateMachineForOnceTrigger(subtrigger1);
+ private final OnceTriggerStateMachine submachine2 =
+ TriggerStateMachines.stateMachineForOnceTrigger(subtrigger2);
+
+ @Test
+ public void testAfterEachTranslation() {
+ AfterEach trigger = AfterEach.inOrder(subtrigger1, subtrigger2);
+ AfterEachStateMachine machine =
+ (AfterEachStateMachine) TriggerStateMachines.stateMachineForTrigger(trigger);
+
+ assertThat(machine, equalTo(AfterEachStateMachine.inOrder(submachine1, submachine2)));
+ }
+
+ @Test
+ public void testAfterFirstTranslation() {
+ AfterFirst trigger = AfterFirst.of(subtrigger1, subtrigger2);
+ AfterFirstStateMachine machine =
+ (AfterFirstStateMachine) TriggerStateMachines.stateMachineForTrigger(trigger);
+
+ assertThat(machine, equalTo(AfterFirstStateMachine.of(submachine1, submachine2)));
+ }
+
+ @Test
+ public void testAfterAllTranslation() {
+ AfterAll trigger = AfterAll.of(subtrigger1, subtrigger2);
+ AfterAllStateMachine machine =
+ (AfterAllStateMachine) TriggerStateMachines.stateMachineForTrigger(trigger);
+
+ assertThat(machine, equalTo(AfterAllStateMachine.of(submachine1, submachine2)));
+ }
+
+ @Test
+ public void testAfterWatermarkEarlyTranslation() {
+ AfterWatermark.AfterWatermarkEarlyAndLate trigger =
+ AfterWatermark.pastEndOfWindow().withEarlyFirings(subtrigger1);
+ AfterWatermarkStateMachine.AfterWatermarkEarlyAndLate machine =
+ (AfterWatermarkStateMachine.AfterWatermarkEarlyAndLate)
+ TriggerStateMachines.stateMachineForTrigger(trigger);
+
+ assertThat(
+ machine,
+ equalTo(AfterWatermarkStateMachine.pastEndOfWindow().withEarlyFirings(submachine1)));
+ }
+
+ @Test
+ public void testAfterWatermarkEarlyLateTranslation() {
+ AfterWatermark.AfterWatermarkEarlyAndLate trigger =
+ AfterWatermark.pastEndOfWindow().withEarlyFirings(subtrigger1).withLateFirings(subtrigger2);
+ AfterWatermarkStateMachine.AfterWatermarkEarlyAndLate machine =
+ (AfterWatermarkStateMachine.AfterWatermarkEarlyAndLate)
+ TriggerStateMachines.stateMachineForTrigger(trigger);
+
+ assertThat(
+ machine,
+ equalTo(
+ AfterWatermarkStateMachine.pastEndOfWindow()
+ .withEarlyFirings(submachine1)
+ .withLateFirings(submachine2)));
+ }
+
+ @Test
+ public void testOrFinallyTranslation() {
+ OrFinallyTrigger trigger = subtrigger1.orFinally(subtrigger2);
+ OrFinallyStateMachine machine =
+ (OrFinallyStateMachine) TriggerStateMachines.stateMachineForTrigger(trigger);
+
+ assertThat(machine, equalTo(submachine1.orFinally(submachine2)));
+ }
+
+ @Test
+ public void testRepeatedlyTranslation() {
+ Repeatedly trigger = Repeatedly.forever(subtrigger1);
+ RepeatedlyStateMachine machine =
+ (RepeatedlyStateMachine) TriggerStateMachines.stateMachineForTrigger(trigger);
+
+ assertThat(machine, equalTo(RepeatedlyStateMachine.forever(submachine1)));
+ }
+}