You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@beam.apache.org by "ASF GitHub Bot (Jira)" <ji...@apache.org> on 2021/10/27 19:28:00 UTC

[jira] [Work logged] (BEAM-742) Move trigger state machines to runners-core, convert triggers to AST

     [ https://issues.apache.org/jira/browse/BEAM-742?focusedWorklogId=670984&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-670984 ]

ASF GitHub Bot logged work on BEAM-742:
---------------------------------------

                Author: ASF GitHub Bot
            Created on: 27/Oct/21 19:27
            Start Date: 27/Oct/21 19:27
    Worklog Time Spent: 10m 
      Work Description: TheNeuralBit commented on a change in pull request #1083:
URL: https://github.com/apache/beam/pull/1083#discussion_r737779636



##########
File path: runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachineTester.java
##########
@@ -81,66 +91,75 @@ public void injectElements(int... values) throws Exception {
       injectElements(timestampedValues);
     }
 
-    public SimpleTriggerTester<W> withAllowedLateness(Duration allowedLateness) throws Exception {
-      return new SimpleTriggerTester<>(
-          windowingStrategy.withAllowedLateness(allowedLateness));
+    public SimpleTriggerStateMachineTester<W> withAllowedLateness(Duration allowedLateness)
+        throws Exception {
+      return new SimpleTriggerStateMachineTester<>(
+              executableTrigger,
+              windowFn,
+              allowedLateness);
     }
   }
 
-  protected final WindowingStrategy<Object, W> windowingStrategy;
-
   private final TestInMemoryStateInternals<?> stateInternals =
       new TestInMemoryStateInternals<Object>(null /* key */);
   private final InMemoryTimerInternals timerInternals = new InMemoryTimerInternals();
-  private final TriggerContextFactory<W> contextFactory;
-  private final WindowFn<Object, W> windowFn;
+  private final TriggerStateMachineContextFactory<W> contextFactory;
+  protected final WindowFn<Object, W> windowFn;
   private final ActiveWindowSet<W> activeWindows;
   private final Map<W, W> windowToMergeResult;
 
   /**
-   * An {@link ExecutableTrigger} built from the {@link Trigger} or {@link Trigger}
-   * under test.
+   * An {@link ExecutableTriggerStateMachine} under test.
    */
-  private final ExecutableTrigger executableTrigger;
+  protected final ExecutableTriggerStateMachine executableTrigger;
 
   /**
    * A map from a window and trigger to whether that trigger is finished for the window.
    */
   private final Map<W, FinishedTriggers> finishedSets;
 
-  public static <W extends BoundedWindow> SimpleTriggerTester<W> forTrigger(
-      Trigger trigger, WindowFn<Object, W> windowFn)
+  public static <W extends BoundedWindow> SimpleTriggerStateMachineTester<W> forTrigger(
+      TriggerStateMachine stateMachine, WindowFn<Object, W> windowFn)
           throws Exception {
-    WindowingStrategy<Object, W> windowingStrategy =
-        WindowingStrategy.of(windowFn).withTrigger(trigger)
-        // Merging requires accumulation mode or early firings can break up a session.
-        // Not currently an issue with the tester (because we never GC) but we don't want
-        // mystery failures due to violating this need.
-        .withMode(windowFn.isNonMerging()
-            ? AccumulationMode.DISCARDING_FIRED_PANES
-            : AccumulationMode.ACCUMULATING_FIRED_PANES);
 
-    return new SimpleTriggerTester<>(windowingStrategy);
-  }
+    ExecutableTriggerStateMachine executableTriggerStateMachine =
+        ExecutableTriggerStateMachine.create(stateMachine);
 
-  public static <InputT, W extends BoundedWindow> TriggerTester<InputT, W> forAdvancedTrigger(
-      Trigger trigger, WindowFn<Object, W> windowFn) throws Exception {
-    WindowingStrategy<Object, W> strategy =
-        WindowingStrategy.of(windowFn).withTrigger(trigger)
-        // Merging requires accumulation mode or early firings can break up a session.
-        // Not currently an issue with the tester (because we never GC) but we don't want
-        // mystery failures due to violating this need.
-        .withMode(windowFn.isNonMerging()
+    // Merging requires accumulation mode or early firings can break up a session.
+    // Not currently an issue with the tester (because we never GC) but we don't want
+    // mystery failures due to violating this need.
+    AccumulationMode mode =
+        windowFn.isNonMerging()
             ? AccumulationMode.DISCARDING_FIRED_PANES
-            : AccumulationMode.ACCUMULATING_FIRED_PANES);
+            : AccumulationMode.ACCUMULATING_FIRED_PANES;
+
+    return new SimpleTriggerStateMachineTester<>(
+        executableTriggerStateMachine, windowFn, Duration.ZERO);
+  }
 
-    return new TriggerTester<>(strategy);
+  public static <InputT, W extends BoundedWindow>
+  TriggerStateMachineTester<InputT, W> forAdvancedTrigger(
+          TriggerStateMachine stateMachine, WindowFn<Object, W> windowFn) throws Exception {
+    ExecutableTriggerStateMachine executableTriggerStateMachine =
+        ExecutableTriggerStateMachine.create(stateMachine);
+
+    // Merging requires accumulation mode or early firings can break up a session.
+    // Not currently an issue with the tester (because we never GC) but we don't want
+    // mystery failures due to violating this need.
+    AccumulationMode mode =

Review comment:
       With this change the accumulation mode is unused. Was that intentional? Can we just remove it?
   
   (Context: there's a PR out that would enable an errorprone check for unused fields https://github.com/apache/beam/pull/15742, and this is one case it's triggering on)




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Issue Time Tracking
-------------------

            Worklog Id:     (was: 670984)
    Remaining Estimate: 0h
            Time Spent: 10m

> Move trigger state machines to runners-core, convert triggers to AST
> --------------------------------------------------------------------
>
>                 Key: BEAM-742
>                 URL: https://issues.apache.org/jira/browse/BEAM-742
>             Project: Beam
>          Issue Type: Sub-task
>          Components: beam-model, runner-core, sdk-java-core
>            Reporter: Kenneth Knowles
>            Assignee: Kenneth Knowles
>            Priority: P2
>              Labels: Done
>             Fix For: 0.4.0
>
>          Time Spent: 10m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)