You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/10/18 03:38:48 UTC

[17/18] incubator-beam git commit: Add TriggerStateMachines with conversion from Trigger

Add TriggerStateMachines with conversion from Trigger


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/00672961
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/00672961
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/00672961

Branch: refs/heads/master
Commit: 00672961b5a3115c298c457dfe43f543947298a0
Parents: 2107f79
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Oct 13 20:02:52 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Mon Oct 17 19:56:49 2016 -0700

----------------------------------------------------------------------
 .../core/triggers/TriggerStateMachines.java     | 210 +++++++++++++++++++
 .../core/triggers/TriggerStateMachinesTest.java | 199 ++++++++++++++++++
 2 files changed, 409 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/00672961/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachines.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachines.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachines.java
new file mode 100644
index 0000000..317e3b9
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachines.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core.triggers;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.List;
+import org.apache.beam.runners.core.triggers.TriggerStateMachine.OnceTriggerStateMachine;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.windowing.AfterAll;
+import org.apache.beam.sdk.transforms.windowing.AfterDelayFromFirstElement;
+import org.apache.beam.sdk.transforms.windowing.AfterEach;
+import org.apache.beam.sdk.transforms.windowing.AfterFirst;
+import org.apache.beam.sdk.transforms.windowing.AfterPane;
+import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
+import org.apache.beam.sdk.transforms.windowing.AfterSynchronizedProcessingTime;
+import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
+import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
+import org.apache.beam.sdk.transforms.windowing.Never.NeverTrigger;
+import org.apache.beam.sdk.transforms.windowing.OrFinallyTrigger;
+import org.apache.beam.sdk.transforms.windowing.Repeatedly;
+import org.apache.beam.sdk.transforms.windowing.Trigger;
+import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger;
+import org.apache.beam.sdk.util.TimeDomain;
+import org.joda.time.Instant;
+
+/** Translates a {@link Trigger} to a {@link TriggerStateMachine}. */
+public class TriggerStateMachines {
+
+  private TriggerStateMachines() {}
+
+  @VisibleForTesting static final StateMachineConverter CONVERTER = new StateMachineConverter();
+
+  public static TriggerStateMachine stateMachineForTrigger(Trigger trigger) {
+    return CONVERTER.evaluateTrigger(trigger);
+  }
+
+  public static OnceTriggerStateMachine stateMachineForOnceTrigger(OnceTrigger trigger) {
+    return CONVERTER.evaluateOnceTrigger(trigger);
+  }
+
+  @VisibleForTesting
+  static class StateMachineConverter {
+
+    public TriggerStateMachine evaluateTrigger(Trigger trigger) {
+      Method evaluationMethod = getEvaluationMethod(trigger.getClass());
+      return tryEvaluate(evaluationMethod, trigger);
+    }
+
+    public OnceTriggerStateMachine evaluateOnceTrigger(OnceTrigger trigger) {
+      Method evaluationMethod = getEvaluationMethod(trigger.getClass());
+      return (OnceTriggerStateMachine) tryEvaluate(evaluationMethod, trigger);
+    }
+
+    private TriggerStateMachine tryEvaluate(Method evaluationMethod, Trigger trigger) {
+      try {
+        return (TriggerStateMachine) evaluationMethod.invoke(this, trigger);
+      } catch (InvocationTargetException exc) {
+        if (exc.getCause() instanceof RuntimeException) {
+          throw (RuntimeException) exc.getCause();
+        } else {
+          throw new RuntimeException(exc.getCause());
+        }
+      } catch (IllegalAccessException exc) {
+        throw new IllegalStateException(
+            String.format("Internal error: could not invoke %s", evaluationMethod));
+      }
+    }
+
+    private Method getEvaluationMethod(Class<?> clazz) {
+      Method evaluationMethod;
+      try {
+        return getClass().getDeclaredMethod("evaluateSpecific", clazz);
+      } catch (NoSuchMethodException exc) {
+        throw new UnsupportedOperationException(
+            String.format(
+                "Cannot translate trigger class %s to a state machine.", clazz.getCanonicalName()),
+            exc);
+      }
+    }
+
+    private TriggerStateMachine evaluateSpecific(DefaultTrigger v) {
+      return DefaultTriggerStateMachine.of();
+    }
+
+    private OnceTriggerStateMachine evaluateSpecific(AfterWatermark.FromEndOfWindow v) {
+      return AfterWatermarkStateMachine.pastEndOfWindow();
+    }
+
+    private OnceTriggerStateMachine evaluateSpecific(NeverTrigger v) {
+      return NeverStateMachine.ever();
+    }
+
+    private OnceTriggerStateMachine evaluateSpecific(AfterSynchronizedProcessingTime v) {
+      return new AfterSynchronizedProcessingTimeStateMachine();
+    }
+
+    private OnceTriggerStateMachine evaluateSpecific(AfterFirst v) {
+      List<OnceTriggerStateMachine> subStateMachines =
+          Lists.newArrayListWithCapacity(v.subTriggers().size());
+      for (Trigger subtrigger : v.subTriggers()) {
+        subStateMachines.add(stateMachineForOnceTrigger((OnceTrigger) subtrigger));
+      }
+      return AfterFirstStateMachine.of(subStateMachines);
+    }
+
+    private OnceTriggerStateMachine evaluateSpecific(AfterAll v) {
+      List<OnceTriggerStateMachine> subStateMachines =
+          Lists.newArrayListWithCapacity(v.subTriggers().size());
+      for (Trigger subtrigger : v.subTriggers()) {
+        subStateMachines.add(stateMachineForOnceTrigger((OnceTrigger) subtrigger));
+      }
+      return AfterAllStateMachine.of(subStateMachines);
+    }
+
+    private OnceTriggerStateMachine evaluateSpecific(AfterPane v) {
+      return AfterPaneStateMachine.elementCountAtLeast(v.getElementCount());
+    }
+
+    private TriggerStateMachine evaluateSpecific(AfterWatermark.AfterWatermarkEarlyAndLate v) {
+      AfterWatermarkStateMachine.AfterWatermarkEarlyAndLate machine =
+          AfterWatermarkStateMachine.pastEndOfWindow()
+              .withEarlyFirings(stateMachineForOnceTrigger(v.getEarlyTrigger()));
+
+      if (v.getLateTrigger() != null) {
+        machine = machine.withLateFirings(stateMachineForOnceTrigger(v.getLateTrigger()));
+      }
+      return machine;
+    }
+
+    private TriggerStateMachine evaluateSpecific(AfterEach v) {
+      List<TriggerStateMachine> subStateMachines =
+          Lists.newArrayListWithCapacity(v.subTriggers().size());
+
+      for (Trigger subtrigger : v.subTriggers()) {
+        subStateMachines.add(stateMachineForTrigger(subtrigger));
+      }
+
+      return AfterEachStateMachine.inOrder(subStateMachines);
+    }
+
+    private TriggerStateMachine evaluateSpecific(Repeatedly v) {
+      return RepeatedlyStateMachine.forever(stateMachineForTrigger(v.getRepeatedTrigger()));
+    }
+
+    private TriggerStateMachine evaluateSpecific(OrFinallyTrigger v) {
+      return new OrFinallyStateMachine(
+          stateMachineForTrigger(v.getMainTrigger()),
+          stateMachineForOnceTrigger(v.getUntilTrigger()));
+    }
+
+    private OnceTriggerStateMachine evaluateSpecific(AfterProcessingTime v) {
+      return evaluateSpecific((AfterDelayFromFirstElement) v);
+    }
+
+    private OnceTriggerStateMachine evaluateSpecific(final AfterDelayFromFirstElement v) {
+      return new AfterDelayFromFirstElementStateMachineAdapter(v);
+    }
+
+    private static class AfterDelayFromFirstElementStateMachineAdapter
+        extends AfterDelayFromFirstElementStateMachine {
+
+      public AfterDelayFromFirstElementStateMachineAdapter(AfterDelayFromFirstElement v) {
+        this(v.getTimeDomain(), v.getTimestampMappers());
+      }
+
+      private AfterDelayFromFirstElementStateMachineAdapter(
+          TimeDomain timeDomain, List<SerializableFunction<Instant, Instant>> timestampMappers) {
+        super(timeDomain, timestampMappers);
+      }
+
+      @Override
+      public Instant getCurrentTime(TriggerContext context) {
+        switch (timeDomain) {
+          case PROCESSING_TIME:
+            return context.currentProcessingTime();
+          case SYNCHRONIZED_PROCESSING_TIME:
+            return context.currentSynchronizedProcessingTime();
+          case EVENT_TIME:
+            return context.currentEventTime();
+          default:
+            throw new IllegalArgumentException("A time domain that doesn't exist was received!");
+        }
+      }
+
+      @Override
+      protected AfterDelayFromFirstElementStateMachine newWith(
+          List<SerializableFunction<Instant, Instant>> transform) {
+        return new AfterDelayFromFirstElementStateMachineAdapter(timeDomain, transform);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/00672961/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachinesTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachinesTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachinesTest.java
new file mode 100644
index 0000000..37f8f10
--- /dev/null
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachinesTest.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core.triggers;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+import org.apache.beam.runners.core.triggers.TriggerStateMachine.OnceTriggerStateMachine;
+import org.apache.beam.sdk.transforms.windowing.AfterAll;
+import org.apache.beam.sdk.transforms.windowing.AfterDelayFromFirstElement;
+import org.apache.beam.sdk.transforms.windowing.AfterEach;
+import org.apache.beam.sdk.transforms.windowing.AfterFirst;
+import org.apache.beam.sdk.transforms.windowing.AfterPane;
+import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
+import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
+import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
+import org.apache.beam.sdk.transforms.windowing.Never;
+import org.apache.beam.sdk.transforms.windowing.Never.NeverTrigger;
+import org.apache.beam.sdk.transforms.windowing.OrFinallyTrigger;
+import org.apache.beam.sdk.transforms.windowing.Repeatedly;
+import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger;
+import org.apache.beam.sdk.util.TimeDomain;
+import org.joda.time.Duration;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests the {@link TriggerStateMachines} static utility methods. */
+@RunWith(JUnit4.class)
+public class TriggerStateMachinesTest {
+
+  //
+  // Tests for leaf trigger translation
+  //
+
+  @Test
+  public void testStateMachineForAfterPane() {
+    int count = 37;
+    AfterPane trigger = AfterPane.elementCountAtLeast(count);
+    AfterPaneStateMachine machine =
+        (AfterPaneStateMachine) TriggerStateMachines.stateMachineForOnceTrigger(trigger);
+
+    assertThat(machine.getElementCount(), equalTo(trigger.getElementCount()));
+  }
+
+  @Test
+  public void testStateMachineForAfterProcessingTime() {
+    Duration minutes = Duration.standardMinutes(94);
+    Duration hours = Duration.standardHours(13);
+
+    AfterDelayFromFirstElement trigger =
+        AfterProcessingTime.pastFirstElementInPane().plusDelayOf(minutes).alignedTo(hours);
+
+    AfterDelayFromFirstElementStateMachine machine =
+        (AfterDelayFromFirstElementStateMachine)
+            TriggerStateMachines.stateMachineForOnceTrigger(trigger);
+
+    assertThat(machine.getTimeDomain(), equalTo(TimeDomain.PROCESSING_TIME));
+
+    // This equality is function equality, but due to the structure of the code (no serialization)
+    // it is OK to check
+    assertThat(machine.getTimestampMappers(), equalTo(trigger.getTimestampMappers()));
+  }
+
+  @Test
+  public void testStateMachineForAfterWatermark() {
+    AfterWatermark.FromEndOfWindow trigger = AfterWatermark.pastEndOfWindow();
+    AfterWatermarkStateMachine.FromEndOfWindow machine =
+        (AfterWatermarkStateMachine.FromEndOfWindow)
+            TriggerStateMachines.stateMachineForOnceTrigger(trigger);
+    // No parameters, so if it doesn't crash, we win!
+  }
+
+  @Test
+  public void testDefaultTriggerTranslation() {
+    DefaultTrigger trigger = DefaultTrigger.of();
+    DefaultTriggerStateMachine machine =
+        (DefaultTriggerStateMachine)
+            checkNotNull(TriggerStateMachines.stateMachineForTrigger(trigger));
+    // No parameters, so if it doesn't crash, we win!
+  }
+
+  @Test
+  public void testNeverTranslation() {
+    NeverTrigger trigger = Never.ever();
+    NeverStateMachine machine =
+        (NeverStateMachine) checkNotNull(TriggerStateMachines.stateMachineForTrigger(trigger));
+    // No parameters, so if it doesn't crash, we win!
+  }
+
+  //
+  // Tests for composite trigger translation
+  //
+  // These check just that translation was invoked recursively using somewhat random
+  // leaf subtriggers; by induction it all holds together. Beyond this, explicit tests
+  // of particular triggers will suffice.
+
+  private static final int ELEM_COUNT = 472;
+  private static final Duration DELAY = Duration.standardSeconds(95673);
+
+  private final OnceTrigger subtrigger1 = AfterPane.elementCountAtLeast(ELEM_COUNT);
+  private final OnceTrigger subtrigger2 =
+      AfterProcessingTime.pastFirstElementInPane().plusDelayOf(DELAY);
+
+  private final OnceTriggerStateMachine submachine1 =
+      TriggerStateMachines.stateMachineForOnceTrigger(subtrigger1);
+  private final OnceTriggerStateMachine submachine2 =
+      TriggerStateMachines.stateMachineForOnceTrigger(subtrigger2);
+
+  @Test
+  public void testAfterEachTranslation() {
+    AfterEach trigger = AfterEach.inOrder(subtrigger1, subtrigger2);
+    AfterEachStateMachine machine =
+        (AfterEachStateMachine) TriggerStateMachines.stateMachineForTrigger(trigger);
+
+    assertThat(machine, equalTo(AfterEachStateMachine.inOrder(submachine1, submachine2)));
+  }
+
+  @Test
+  public void testAfterFirstTranslation() {
+    AfterFirst trigger = AfterFirst.of(subtrigger1, subtrigger2);
+    AfterFirstStateMachine machine =
+        (AfterFirstStateMachine) TriggerStateMachines.stateMachineForTrigger(trigger);
+
+    assertThat(machine, equalTo(AfterFirstStateMachine.of(submachine1, submachine2)));
+  }
+
+  @Test
+  public void testAfterAllTranslation() {
+    AfterAll trigger = AfterAll.of(subtrigger1, subtrigger2);
+    AfterAllStateMachine machine =
+        (AfterAllStateMachine) TriggerStateMachines.stateMachineForTrigger(trigger);
+
+    assertThat(machine, equalTo(AfterAllStateMachine.of(submachine1, submachine2)));
+  }
+
+  @Test
+  public void testAfterWatermarkEarlyTranslation() {
+    AfterWatermark.AfterWatermarkEarlyAndLate trigger =
+        AfterWatermark.pastEndOfWindow().withEarlyFirings(subtrigger1);
+    AfterWatermarkStateMachine.AfterWatermarkEarlyAndLate machine =
+        (AfterWatermarkStateMachine.AfterWatermarkEarlyAndLate)
+            TriggerStateMachines.stateMachineForTrigger(trigger);
+
+    assertThat(
+        machine,
+        equalTo(AfterWatermarkStateMachine.pastEndOfWindow().withEarlyFirings(submachine1)));
+  }
+
+  @Test
+  public void testAfterWatermarkEarlyLateTranslation() {
+    AfterWatermark.AfterWatermarkEarlyAndLate trigger =
+        AfterWatermark.pastEndOfWindow().withEarlyFirings(subtrigger1).withLateFirings(subtrigger2);
+    AfterWatermarkStateMachine.AfterWatermarkEarlyAndLate machine =
+        (AfterWatermarkStateMachine.AfterWatermarkEarlyAndLate)
+            TriggerStateMachines.stateMachineForTrigger(trigger);
+
+    assertThat(
+        machine,
+        equalTo(
+            AfterWatermarkStateMachine.pastEndOfWindow()
+                .withEarlyFirings(submachine1)
+                .withLateFirings(submachine2)));
+  }
+
+  @Test
+  public void testOrFinallyTranslation() {
+    OrFinallyTrigger trigger = subtrigger1.orFinally(subtrigger2);
+    OrFinallyStateMachine machine =
+        (OrFinallyStateMachine) TriggerStateMachines.stateMachineForTrigger(trigger);
+
+    assertThat(machine, equalTo(submachine1.orFinally(submachine2)));
+  }
+
+  @Test
+  public void testRepeatedlyTranslation() {
+    Repeatedly trigger = Repeatedly.forever(subtrigger1);
+    RepeatedlyStateMachine machine =
+        (RepeatedlyStateMachine) TriggerStateMachines.stateMachineForTrigger(trigger);
+
+    assertThat(machine, equalTo(RepeatedlyStateMachine.forever(submachine1)));
+  }
+}