You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/06/30 21:31:28 UTC
[26/50] beam git commit: Removed OnceTriggerStateMachine
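Removed OnceTriggerStateMachine
Former subclasses now extend TriggerStateMachine directly and override onFire
(previously onOnlyFiring); those that finish after firing now call
context.trigger().setFinished(true) themselves.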
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/4f9820b1
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/4f9820b1
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/4f9820b1
Branch: refs/heads/gearpump-runner
Commit: 4f9820b1f24103831f3b0a4f5783f9ca726f8cd7
Parents: 0d4c85d
Author: = <=>
Authored: Wed Jun 14 20:11:49 2017 -0400
Committer: Kenneth Knowles <kl...@google.com>
Committed: Wed Jun 28 20:14:24 2017 -0700
----------------------------------------------------------------------
.../core/triggers/AfterAllStateMachine.java | 25 ++++++++----------
.../AfterDelayFromFirstElementStateMachine.java | 6 ++---
.../core/triggers/AfterFirstStateMachine.java | 20 +++++++--------
.../core/triggers/AfterPaneStateMachine.java | 6 ++---
.../triggers/AfterWatermarkStateMachine.java | 7 ++---
.../triggers/ExecutableTriggerStateMachine.java | 23 +++--------------
.../core/triggers/NeverStateMachine.java | 5 ++--
.../core/triggers/TriggerStateMachine.java | 27 --------------------
.../triggers/AfterFirstStateMachineTest.java | 5 ++--
.../AfterWatermarkStateMachineTest.java | 7 +++--
.../core/triggers/StubTriggerStateMachine.java | 7 +++--
11 files changed, 44 insertions(+), 94 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/4f9820b1/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterAllStateMachine.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterAllStateMachine.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterAllStateMachine.java
index 0f0c17c..3530ed1 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterAllStateMachine.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterAllStateMachine.java
@@ -23,7 +23,6 @@ import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableList;
import java.util.Arrays;
import java.util.List;
-import org.apache.beam.runners.core.triggers.TriggerStateMachine.OnceTriggerStateMachine;
import org.apache.beam.sdk.annotations.Experimental;
/**
@@ -31,7 +30,7 @@ import org.apache.beam.sdk.annotations.Experimental;
* have fired.
*/
@Experimental(Experimental.Kind.TRIGGER)
-public class AfterAllStateMachine extends OnceTriggerStateMachine {
+public class AfterAllStateMachine extends TriggerStateMachine {
private AfterAllStateMachine(List<TriggerStateMachine> subTriggers) {
super(subTriggers);
@@ -42,11 +41,11 @@ public class AfterAllStateMachine extends OnceTriggerStateMachine {
* Returns an {@code AfterAll} {@code Trigger} with the given subtriggers.
*/
@SafeVarargs
- public static OnceTriggerStateMachine of(TriggerStateMachine... triggers) {
+ public static TriggerStateMachine of(TriggerStateMachine... triggers) {
return new AfterAllStateMachine(Arrays.<TriggerStateMachine>asList(triggers));
}
- public static OnceTriggerStateMachine of(Iterable<? extends TriggerStateMachine> triggers) {
+ public static TriggerStateMachine of(Iterable<? extends TriggerStateMachine> triggers) {
return new AfterAllStateMachine(ImmutableList.copyOf(triggers));
}
@@ -78,24 +77,21 @@ public class AfterAllStateMachine extends OnceTriggerStateMachine {
*/
@Override
public boolean shouldFire(TriggerContext context) throws Exception {
- for (ExecutableTriggerStateMachine subtrigger : context.trigger().subTriggers()) {
- if (!context.forTrigger(subtrigger).trigger().isFinished()
- && !subtrigger.invokeShouldFire(context)) {
+ for (ExecutableTriggerStateMachine subTrigger : context.trigger().subTriggers()) {
+ if (!context.forTrigger(subTrigger).trigger().isFinished()
+ && !subTrigger.invokeShouldFire(context)) {
return false;
}
}
return true;
}
- /**
- * Invokes {@link #onFire} for all subtriggers, eliding redundant calls to {@link #shouldFire}
- * because they all must be ready to fire.
- */
@Override
- public void onOnlyFiring(TriggerContext context) throws Exception {
- for (ExecutableTriggerStateMachine subtrigger : context.trigger().subTriggers()) {
- subtrigger.invokeOnFire(context);
+ public void onFire(TriggerContext context) throws Exception {
+ for (ExecutableTriggerStateMachine subTrigger : context.trigger().subTriggers()) {
+ subTrigger.invokeOnFire(context);
}
+ context.trigger().setFinished(true);
}
@Override
@@ -103,7 +99,6 @@ public class AfterAllStateMachine extends OnceTriggerStateMachine {
StringBuilder builder = new StringBuilder("AfterAll.of(");
Joiner.on(", ").appendTo(builder, subTriggers);
builder.append(")");
-
return builder.toString();
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/4f9820b1/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java
index 8d8d0de..06c2066 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java
@@ -27,7 +27,6 @@ import org.apache.beam.runners.core.StateAccessor;
import org.apache.beam.runners.core.StateMerging;
import org.apache.beam.runners.core.StateTag;
import org.apache.beam.runners.core.StateTags;
-import org.apache.beam.runners.core.triggers.TriggerStateMachine.OnceTriggerStateMachine;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.coders.InstantCoder;
import org.apache.beam.sdk.state.CombiningState;
@@ -50,7 +49,7 @@ import org.joda.time.format.PeriodFormatter;
// This class should be inlined to subclasses and deleted, simplifying them too
// https://issues.apache.org/jira/browse/BEAM-1486
@Experimental(Experimental.Kind.TRIGGER)
-public abstract class AfterDelayFromFirstElementStateMachine extends OnceTriggerStateMachine {
+public abstract class AfterDelayFromFirstElementStateMachine extends TriggerStateMachine {
protected static final List<SerializableFunction<Instant, Instant>> IDENTITY =
ImmutableList.<SerializableFunction<Instant, Instant>>of();
@@ -237,8 +236,9 @@ public abstract class AfterDelayFromFirstElementStateMachine extends OnceTrigger
}
@Override
- protected void onOnlyFiring(TriggerStateMachine.TriggerContext context) throws Exception {
+ public final void onFire(TriggerContext context) throws Exception {
clear(context);
+ context.trigger().setFinished(true);
}
protected Instant computeTargetTimestamp(Instant time) {
http://git-wip-us.apache.org/repos/asf/beam/blob/4f9820b1/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterFirstStateMachine.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterFirstStateMachine.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterFirstStateMachine.java
index 840a65c..58c24c5 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterFirstStateMachine.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterFirstStateMachine.java
@@ -23,7 +23,6 @@ import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableList;
import java.util.Arrays;
import java.util.List;
-import org.apache.beam.runners.core.triggers.TriggerStateMachine.OnceTriggerStateMachine;
import org.apache.beam.sdk.annotations.Experimental;
/**
@@ -31,7 +30,7 @@ import org.apache.beam.sdk.annotations.Experimental;
* sub-triggers have fired.
*/
@Experimental(Experimental.Kind.TRIGGER)
-public class AfterFirstStateMachine extends OnceTriggerStateMachine {
+public class AfterFirstStateMachine extends TriggerStateMachine {
AfterFirstStateMachine(List<TriggerStateMachine> subTriggers) {
super(subTriggers);
@@ -42,12 +41,12 @@ public class AfterFirstStateMachine extends OnceTriggerStateMachine {
* Returns an {@code AfterFirst} {@code Trigger} with the given subtriggers.
*/
@SafeVarargs
- public static OnceTriggerStateMachine of(
+ public static TriggerStateMachine of(
TriggerStateMachine... triggers) {
return new AfterFirstStateMachine(Arrays.<TriggerStateMachine>asList(triggers));
}
- public static OnceTriggerStateMachine of(
+ public static TriggerStateMachine of(
Iterable<? extends TriggerStateMachine> triggers) {
return new AfterFirstStateMachine(ImmutableList.copyOf(triggers));
}
@@ -79,18 +78,19 @@ public class AfterFirstStateMachine extends OnceTriggerStateMachine {
}
@Override
- protected void onOnlyFiring(TriggerContext context) throws Exception {
- for (ExecutableTriggerStateMachine subtrigger : context.trigger().subTriggers()) {
- TriggerContext subContext = context.forTrigger(subtrigger);
- if (subtrigger.invokeShouldFire(subContext)) {
+ public void onFire(TriggerContext context) throws Exception {
+ for (ExecutableTriggerStateMachine subTrigger : context.trigger().subTriggers()) {
+ TriggerContext subContext = context.forTrigger(subTrigger);
+ if (subTrigger.invokeShouldFire(subContext)) {
// If the trigger is ready to fire, then do whatever it needs to do.
- subtrigger.invokeOnFire(subContext);
+ subTrigger.invokeOnFire(subContext);
} else {
// If the trigger is not ready to fire, it is nonetheless true that whatever
// pending pane it was tracking is now gone.
- subtrigger.invokeClear(subContext);
+ subTrigger.invokeClear(subContext);
}
}
+ context.trigger().setFinished(true);
}
@Override
http://git-wip-us.apache.org/repos/asf/beam/blob/4f9820b1/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterPaneStateMachine.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterPaneStateMachine.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterPaneStateMachine.java
index b9fbac3..1ce035a 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterPaneStateMachine.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterPaneStateMachine.java
@@ -23,7 +23,6 @@ import org.apache.beam.runners.core.StateAccessor;
import org.apache.beam.runners.core.StateMerging;
import org.apache.beam.runners.core.StateTag;
import org.apache.beam.runners.core.StateTags;
-import org.apache.beam.runners.core.triggers.TriggerStateMachine.OnceTriggerStateMachine;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.state.CombiningState;
@@ -33,7 +32,7 @@ import org.apache.beam.sdk.transforms.Sum;
* {@link TriggerStateMachine}s that fire based on properties of the elements in the current pane.
*/
@Experimental(Experimental.Kind.TRIGGER)
-public class AfterPaneStateMachine extends OnceTriggerStateMachine {
+public class AfterPaneStateMachine extends TriggerStateMachine {
private static final StateTag<CombiningState<Long, long[], Long>>
ELEMENTS_IN_PANE_TAG =
@@ -130,7 +129,8 @@ private static final StateTag<CombiningState<Long, long[], Long>>
}
@Override
- protected void onOnlyFiring(TriggerStateMachine.TriggerContext context) throws Exception {
+ public void onFire(TriggerStateMachine.TriggerContext context) throws Exception {
clear(context);
+ context.trigger().setFinished(true);
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/4f9820b1/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterWatermarkStateMachine.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterWatermarkStateMachine.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterWatermarkStateMachine.java
index c9eee15..509c96b 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterWatermarkStateMachine.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterWatermarkStateMachine.java
@@ -22,7 +22,6 @@ import static com.google.common.base.Preconditions.checkNotNull;
import com.google.common.collect.ImmutableList;
import java.util.Objects;
import javax.annotation.Nullable;
-import org.apache.beam.runners.core.triggers.TriggerStateMachine.OnceTriggerStateMachine;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.state.TimeDomain;
@@ -242,7 +241,7 @@ public class AfterWatermarkStateMachine {
/**
* A watermark trigger targeted relative to the end of the window.
*/
- public static class FromEndOfWindow extends OnceTriggerStateMachine {
+ public static class FromEndOfWindow extends TriggerStateMachine {
private FromEndOfWindow() {
super(null);
@@ -319,6 +318,8 @@ public class AfterWatermarkStateMachine {
}
@Override
- protected void onOnlyFiring(TriggerStateMachine.TriggerContext context) throws Exception { }
+ public void onFire(TriggerStateMachine.TriggerContext context) throws Exception {
+ context.trigger().setFinished(true);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/4f9820b1/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/ExecutableTriggerStateMachine.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/ExecutableTriggerStateMachine.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/ExecutableTriggerStateMachine.java
index c4d89c2..cdcff64 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/ExecutableTriggerStateMachine.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/ExecutableTriggerStateMachine.java
@@ -23,7 +23,6 @@ import static com.google.common.base.Preconditions.checkState;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
-import org.apache.beam.runners.core.triggers.TriggerStateMachine.OnceTriggerStateMachine;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
/**
@@ -46,17 +45,14 @@ public class ExecutableTriggerStateMachine implements Serializable {
private static <W extends BoundedWindow> ExecutableTriggerStateMachine create(
TriggerStateMachine trigger, int nextUnusedIndex) {
- if (trigger instanceof OnceTriggerStateMachine) {
- return new ExecutableOnceTriggerStateMachine(
- (OnceTriggerStateMachine) trigger, nextUnusedIndex);
- } else {
+
return new ExecutableTriggerStateMachine(trigger, nextUnusedIndex);
- }
+
}
public static <W extends BoundedWindow> ExecutableTriggerStateMachine createForOnceTrigger(
- OnceTriggerStateMachine trigger, int nextUnusedIndex) {
- return new ExecutableOnceTriggerStateMachine(trigger, nextUnusedIndex);
+ TriggerStateMachine trigger, int nextUnusedIndex) {
+ return new ExecutableTriggerStateMachine(trigger, nextUnusedIndex);
}
private ExecutableTriggerStateMachine(TriggerStateMachine trigger, int nextUnusedIndex) {
@@ -146,15 +142,4 @@ public class ExecutableTriggerStateMachine implements Serializable {
public void invokeClear(TriggerStateMachine.TriggerContext c) throws Exception {
trigger.clear(c.forTrigger(this));
}
-
- /**
- * {@link ExecutableTriggerStateMachine} that enforces the fact that the trigger should always
- * FIRE_AND_FINISH and never just FIRE.
- */
- private static class ExecutableOnceTriggerStateMachine extends ExecutableTriggerStateMachine {
-
- public ExecutableOnceTriggerStateMachine(OnceTriggerStateMachine trigger, int nextUnusedIndex) {
- super(trigger, nextUnusedIndex);
- }
- }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/4f9820b1/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/NeverStateMachine.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/NeverStateMachine.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/NeverStateMachine.java
index f32c7a8..f8c5e8b 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/NeverStateMachine.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/NeverStateMachine.java
@@ -17,7 +17,6 @@
*/
package org.apache.beam.runners.core.triggers;
-import org.apache.beam.runners.core.triggers.TriggerStateMachine.OnceTriggerStateMachine;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -27,7 +26,7 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
* <p>Using this trigger will only produce output when the watermark passes the end of the
* {@link BoundedWindow window} plus the allowed lateness.
*/
-public final class NeverStateMachine extends OnceTriggerStateMachine {
+public final class NeverStateMachine extends TriggerStateMachine {
/**
* Returns a trigger which never fires. Output will be produced from the using {@link GroupByKey}
* when the {@link BoundedWindow} closes.
@@ -53,7 +52,7 @@ public final class NeverStateMachine extends OnceTriggerStateMachine {
}
@Override
- protected void onOnlyFiring(TriggerStateMachine.TriggerContext context) {
+ public void onFire(TriggerStateMachine.TriggerContext context) {
throw new UnsupportedOperationException(
String.format("%s should never fire", getClass().getSimpleName()));
}
http://git-wip-us.apache.org/repos/asf/beam/blob/4f9820b1/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachine.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachine.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachine.java
index 6a2cf0c..880aa48 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachine.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachine.java
@@ -453,35 +453,8 @@ public abstract class TriggerStateMachine implements Serializable {
* }
* </pre>
*
- * <p>Note that if {@code t1} is {@link OnceTriggerStateMachine}, then {@code t1.orFinally(t2)} is
- * the same as {@code AfterFirst.of(t1, t2)}.
*/
public TriggerStateMachine orFinally(TriggerStateMachine until) {
return new OrFinallyStateMachine(this, until);
}
-
- /**
- * {@link TriggerStateMachine}s that are guaranteed to fire at most once should extend from this,
- * rather than the general {@link TriggerStateMachine} class to indicate that behavior.
- */
- public abstract static class OnceTriggerStateMachine extends TriggerStateMachine {
- protected OnceTriggerStateMachine(List<TriggerStateMachine> subTriggers) {
- super(subTriggers);
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public final void onFire(TriggerContext context) throws Exception {
- onOnlyFiring(context);
- context.trigger().setFinished(true);
- }
-
- /**
- * Called exactly once by {@link #onFire} when the trigger is fired. By default,
- * invokes {@link #onFire} on all subtriggers for which {@link #shouldFire} is {@code true}.
- */
- protected abstract void onOnlyFiring(TriggerContext context) throws Exception;
- }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/4f9820b1/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/AfterFirstStateMachineTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/AfterFirstStateMachineTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/AfterFirstStateMachineTest.java
index 453c8ff..2be90de 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/AfterFirstStateMachineTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/AfterFirstStateMachineTest.java
@@ -21,7 +21,6 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.when;
-import org.apache.beam.runners.core.triggers.TriggerStateMachine.OnceTriggerStateMachine;
import org.apache.beam.runners.core.triggers.TriggerStateMachineTester.SimpleTriggerStateMachineTester;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
@@ -42,8 +41,8 @@ import org.mockito.MockitoAnnotations;
@RunWith(JUnit4.class)
public class AfterFirstStateMachineTest {
- @Mock private OnceTriggerStateMachine mockTrigger1;
- @Mock private OnceTriggerStateMachine mockTrigger2;
+ @Mock private TriggerStateMachine mockTrigger1;
+ @Mock private TriggerStateMachine mockTrigger2;
private SimpleTriggerStateMachineTester<IntervalWindow> tester;
private static TriggerStateMachine.TriggerContext anyTriggerContext() {
return Mockito.<TriggerStateMachine.TriggerContext>any();
http://git-wip-us.apache.org/repos/asf/beam/blob/4f9820b1/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/AfterWatermarkStateMachineTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/AfterWatermarkStateMachineTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/AfterWatermarkStateMachineTest.java
index e4d10a0..45a5cfb 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/AfterWatermarkStateMachineTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/AfterWatermarkStateMachineTest.java
@@ -25,7 +25,6 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import org.apache.beam.runners.core.triggers.TriggerStateMachine.OnMergeContext;
-import org.apache.beam.runners.core.triggers.TriggerStateMachine.OnceTriggerStateMachine;
import org.apache.beam.runners.core.triggers.TriggerStateMachineTester.SimpleTriggerStateMachineTester;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
@@ -46,8 +45,8 @@ import org.mockito.MockitoAnnotations;
@RunWith(JUnit4.class)
public class AfterWatermarkStateMachineTest {
- @Mock private OnceTriggerStateMachine mockEarly;
- @Mock private OnceTriggerStateMachine mockLate;
+ @Mock private TriggerStateMachine mockEarly;
+ @Mock private TriggerStateMachine mockLate;
private SimpleTriggerStateMachineTester<IntervalWindow> tester;
private static TriggerStateMachine.TriggerContext anyTriggerContext() {
@@ -70,7 +69,7 @@ public class AfterWatermarkStateMachineTest {
MockitoAnnotations.initMocks(this);
}
- public void testRunningAsTrigger(OnceTriggerStateMachine mockTrigger, IntervalWindow window)
+ public void testRunningAsTrigger(TriggerStateMachine mockTrigger, IntervalWindow window)
throws Exception {
// Don't fire due to mock saying no
http://git-wip-us.apache.org/repos/asf/beam/blob/4f9820b1/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/StubTriggerStateMachine.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/StubTriggerStateMachine.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/StubTriggerStateMachine.java
index 4512848..1bc757e 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/StubTriggerStateMachine.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/StubTriggerStateMachine.java
@@ -18,12 +18,11 @@
package org.apache.beam.runners.core.triggers;
import com.google.common.collect.Lists;
-import org.apache.beam.runners.core.triggers.TriggerStateMachine.OnceTriggerStateMachine;
/**
- * No-op {@link OnceTriggerStateMachine} implementation for testing.
+ * No-op {@link TriggerStateMachine} implementation for testing.
*/
-abstract class StubTriggerStateMachine extends OnceTriggerStateMachine {
+abstract class StubTriggerStateMachine extends TriggerStateMachine {
/**
* Create a stub {@link TriggerStateMachine} instance which returns the specified name on {@link
* #toString()}.
@@ -42,7 +41,7 @@ abstract class StubTriggerStateMachine extends OnceTriggerStateMachine {
}
@Override
- protected void onOnlyFiring(TriggerContext context) throws Exception {
+ public void onFire(TriggerContext context) throws Exception {
}
@Override