You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/10/18 03:38:32 UTC
[01/18] incubator-beam git commit: Add access to values from
AfterDelay triggers
Repository: incubator-beam
Updated Branches:
refs/heads/master 7ac725583 -> c5329f9b4
Add access to values from AfterDelay triggers
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/b4445ac4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/b4445ac4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/b4445ac4
Branch: refs/heads/master
Commit: b4445ac43530441463ee11c395bcf631b66ef2e9
Parents: 0df929f
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Oct 13 22:13:43 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Mon Oct 17 19:56:35 2016 -0700
----------------------------------------------------------------------
.../AfterDelayFromFirstElementStateMachine.java | 17 ++++++++++++++++-
1 file changed, 16 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b4445ac4/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java
index a6616fa..02b156b 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java
@@ -82,7 +82,7 @@ public abstract class AfterDelayFromFirstElementStateMachine extends OnceTrigger
*/
protected final List<SerializableFunction<Instant, Instant>> timestampMappers;
- private final TimeDomain timeDomain;
+ protected final TimeDomain timeDomain;
public AfterDelayFromFirstElementStateMachine(
TimeDomain timeDomain,
@@ -97,6 +97,21 @@ public abstract class AfterDelayFromFirstElementStateMachine extends OnceTrigger
}
/**
+ * The time domain according to which this trigger sets timers.
+ */
+ public TimeDomain getTimeDomain() {
+ return timeDomain;
+ }
+
+ /**
+ * The mapping functions applied to the arrival time of an element to determine when to
+ * set a wake-up timer for triggering.
+ */
+ public List<SerializableFunction<Instant, Instant>> getTimestampMappers() {
+ return timestampMappers;
+ }
+
+ /**
* Aligns timestamps to the smallest multiple of {@code size} since the {@code offset} greater
* than the timestamp.
*
[03/18] incubator-beam git commit: Add direct accessors for the
components of OrFinallyTrigger
Posted by ke...@apache.org.
Add direct accessors for the components of OrFinallyTrigger
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/8afb80e1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/8afb80e1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/8afb80e1
Branch: refs/heads/master
Commit: 8afb80e18f80a9d5a4ed18623a770dbf15ff5e65
Parents: b19918d
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Oct 13 20:44:35 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Mon Oct 17 19:56:35 2016 -0700
----------------------------------------------------------------------
.../sdk/transforms/windowing/OrFinallyTrigger.java | 15 +++++++++++++++
1 file changed, 15 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8afb80e1/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/OrFinallyTrigger.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/OrFinallyTrigger.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/OrFinallyTrigger.java
index 25b7b34..1a03450 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/OrFinallyTrigger.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/OrFinallyTrigger.java
@@ -35,6 +35,21 @@ class OrFinallyTrigger extends Trigger {
super(Arrays.asList(actual, until));
}
+ /**
+ * The main trigger, which will continue firing until the "until" trigger fires. See
+ * {@link #getUntilTrigger()}.
+ */
+ public Trigger getMainTrigger() {
+ return subTriggers().get(ACTUAL);
+ }
+
+ /**
+ * The trigger that signals termination of this trigger.
+ */
+ public OnceTrigger getUntilTrigger() {
+ return (OnceTrigger) subTriggers().get(UNTIL);
+ }
+
@Override
public void onElement(OnElementContext c) throws Exception {
c.trigger().subTrigger(ACTUAL).invokeOnElement(c);
[10/18] incubator-beam git commit: Construct AfterAllStateMachine
with a list of subtriggers
Posted by ke...@apache.org.
Construct AfterAllStateMachine with a list of subtriggers
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/b2bb7c04
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/b2bb7c04
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/b2bb7c04
Branch: refs/heads/master
Commit: b2bb7c048086a3e5eee7d2652d4bb971bc0694e7
Parents: 87c7811
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Oct 13 20:42:38 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Mon Oct 17 19:56:35 2016 -0700
----------------------------------------------------------------------
.../apache/beam/runners/core/triggers/AfterAllStateMachine.java | 5 +++++
1 file changed, 5 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b2bb7c04/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterAllStateMachine.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterAllStateMachine.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterAllStateMachine.java
index 2f4ad63..12cbc3d 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterAllStateMachine.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterAllStateMachine.java
@@ -20,6 +20,7 @@ package org.apache.beam.runners.core.triggers;
import static com.google.common.base.Preconditions.checkArgument;
import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableList;
import java.util.Arrays;
import java.util.List;
import org.apache.beam.runners.core.triggers.TriggerStateMachine.OnceTriggerStateMachine;
@@ -45,6 +46,10 @@ public class AfterAllStateMachine extends OnceTriggerStateMachine {
return new AfterAllStateMachine(Arrays.<TriggerStateMachine>asList(triggers));
}
+ public static OnceTriggerStateMachine of(Iterable<? extends TriggerStateMachine> triggers) {
+ return new AfterAllStateMachine(ImmutableList.copyOf(triggers));
+ }
+
@Override
public void onElement(OnElementContext c) throws Exception {
for (ExecutableTriggerStateMachine subTrigger : c.trigger().unfinishedSubTriggers()) {
[13/18] incubator-beam git commit: Accessors for AfterDelay
Posted by ke...@apache.org.
Accessors for AfterDelay
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/fa9b3812
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/fa9b3812
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/fa9b3812
Branch: refs/heads/master
Commit: fa9b3812e7262b1e0368f613d3f667b71f5de59e
Parents: e1c5bfb
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Oct 13 22:17:14 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Mon Oct 17 19:56:36 2016 -0700
----------------------------------------------------------------------
.../windowing/AfterDelayFromFirstElement.java | 15 +++++++++++++++
1 file changed, 15 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fa9b3812/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterDelayFromFirstElement.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterDelayFromFirstElement.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterDelayFromFirstElement.java
index c4bc946..6078b34 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterDelayFromFirstElement.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterDelayFromFirstElement.java
@@ -97,6 +97,21 @@ public abstract class AfterDelayFromFirstElement extends OnceTrigger {
}
/**
+ * The time domain according to which this trigger sets timers.
+ */
+ public TimeDomain getTimeDomain() {
+ return timeDomain;
+ }
+
+ /**
+ * The mapping functions applied to the arrival time of an element to determine when to
+ * set a wake-up timer for triggering.
+ */
+ public List<SerializableFunction<Instant, Instant>> getTimestampMappers() {
+ return timestampMappers;
+ }
+
+ /**
* Aligns timestamps to the smallest multiple of {@code size} since the {@code offset} greater
* than the timestamp.
*
[12/18] incubator-beam git commit: Make return types of trigger
static factory methods precise
Posted by ke...@apache.org.
Make return types of trigger static factory methods precise
This is helpful in testing and loses no abstraction - the code
locations calling these methods already know the more-specific
type that will be returned.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/e1c5bfbc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/e1c5bfbc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/e1c5bfbc
Branch: refs/heads/master
Commit: e1c5bfbc76c0ef3766d5b1bf2dbd47e13f0ed97c
Parents: e0c5766
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Oct 13 22:15:32 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Mon Oct 17 19:56:36 2016 -0700
----------------------------------------------------------------------
.../java/org/apache/beam/sdk/transforms/windowing/AfterAll.java | 2 +-
.../java/org/apache/beam/sdk/transforms/windowing/AfterEach.java | 2 +-
.../org/apache/beam/sdk/transforms/windowing/AfterFirst.java | 2 +-
.../org/apache/beam/sdk/transforms/windowing/AfterWatermark.java | 4 ++--
.../java/org/apache/beam/sdk/transforms/windowing/Never.java | 2 +-
.../java/org/apache/beam/sdk/transforms/windowing/Trigger.java | 2 +-
6 files changed, 7 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e1c5bfbc/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterAll.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterAll.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterAll.java
index cc8c97f..0e37d33 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterAll.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterAll.java
@@ -41,7 +41,7 @@ public class AfterAll extends OnceTrigger {
/**
* Returns an {@code AfterAll} {@code Trigger} with the given subtriggers.
*/
- public static OnceTrigger of(OnceTrigger... triggers) {
+ public static AfterAll of(OnceTrigger... triggers) {
return new AfterAll(Arrays.<Trigger>asList(triggers));
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e1c5bfbc/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterEach.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterEach.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterEach.java
index 629c640..961d97f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterEach.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterEach.java
@@ -54,7 +54,7 @@ public class AfterEach extends Trigger {
* Returns an {@code AfterEach} {@code Trigger} with the given subtriggers.
*/
@SafeVarargs
- public static Trigger inOrder(Trigger... triggers) {
+ public static AfterEach inOrder(Trigger... triggers) {
return new AfterEach(Arrays.<Trigger>asList(triggers));
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e1c5bfbc/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterFirst.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterFirst.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterFirst.java
index 6b06cfa..7840fc4 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterFirst.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterFirst.java
@@ -42,7 +42,7 @@ public class AfterFirst extends OnceTrigger {
/**
* Returns an {@code AfterFirst} {@code Trigger} with the given subtriggers.
*/
- public static OnceTrigger of(OnceTrigger... triggers) {
+ public static AfterFirst of(OnceTrigger... triggers) {
return new AfterFirst(Arrays.<Trigger>asList(triggers));
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e1c5bfbc/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterWatermark.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterWatermark.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterWatermark.java
index da96de3..89c1ba9 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterWatermark.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterWatermark.java
@@ -102,11 +102,11 @@ public class AfterWatermark {
this.lateTrigger = lateTrigger;
}
- public Trigger withEarlyFirings(OnceTrigger earlyTrigger) {
+ public AfterWatermarkEarlyAndLate withEarlyFirings(OnceTrigger earlyTrigger) {
return new AfterWatermarkEarlyAndLate(earlyTrigger, lateTrigger);
}
- public Trigger withLateFirings(OnceTrigger lateTrigger) {
+ public AfterWatermarkEarlyAndLate withLateFirings(OnceTrigger lateTrigger) {
return new AfterWatermarkEarlyAndLate(earlyTrigger, lateTrigger);
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e1c5bfbc/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Never.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Never.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Never.java
index 5f20465..353258b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Never.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Never.java
@@ -34,7 +34,7 @@ public final class Never {
* Returns a trigger which never fires. Output will be produced from the using {@link GroupByKey}
* when the {@link BoundedWindow} closes.
*/
- public static OnceTrigger ever() {
+ public static NeverTrigger ever() {
// NeverTrigger ignores all inputs and is Window-type independent.
return new NeverTrigger();
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e1c5bfbc/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Trigger.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Trigger.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Trigger.java
index cfabb8b..90e9386 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Trigger.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Trigger.java
@@ -490,7 +490,7 @@ public abstract class Trigger implements Serializable {
* <p>Note that if {@code t1} is {@link OnceTrigger}, then {@code t1.orFinally(t2)} is the same
* as {@code AfterFirst.of(t1, t2)}.
*/
- public Trigger orFinally(OnceTrigger until) {
+ public OrFinallyTrigger orFinally(OnceTrigger until) {
return new OrFinallyTrigger(this, until);
}
[18/18] incubator-beam git commit: This closes #1101
Posted by ke...@apache.org.
This closes #1101
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/c5329f9b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/c5329f9b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/c5329f9b
Branch: refs/heads/master
Commit: c5329f9b43c51cf1933d28f88ad44adce96ec7b6
Parents: 7ac7255 0067296
Author: Kenneth Knowles <kl...@google.com>
Authored: Mon Oct 17 20:37:24 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Mon Oct 17 20:37:24 2016 -0700
----------------------------------------------------------------------
.../core/triggers/AfterAllStateMachine.java | 5 +
.../AfterDelayFromFirstElementStateMachine.java | 17 +-
.../core/triggers/AfterEachStateMachine.java | 5 +
.../core/triggers/AfterFirstStateMachine.java | 6 +
.../core/triggers/AfterPaneStateMachine.java | 7 +
.../triggers/AfterWatermarkStateMachine.java | 4 +-
.../core/triggers/TriggerStateMachines.java | 210 +++++++++++++++++++
.../core/triggers/TriggerStateMachinesTest.java | 199 ++++++++++++++++++
.../beam/sdk/transforms/windowing/AfterAll.java | 2 +-
.../windowing/AfterDelayFromFirstElement.java | 15 ++
.../sdk/transforms/windowing/AfterEach.java | 2 +-
.../sdk/transforms/windowing/AfterFirst.java | 2 +-
.../sdk/transforms/windowing/AfterPane.java | 7 +
.../AfterSynchronizedProcessingTime.java | 6 +-
.../transforms/windowing/AfterWatermark.java | 12 +-
.../transforms/windowing/DefaultTrigger.java | 2 +-
.../beam/sdk/transforms/windowing/Never.java | 8 +-
.../transforms/windowing/OrFinallyTrigger.java | 17 +-
.../sdk/transforms/windowing/Repeatedly.java | 10 +-
.../beam/sdk/transforms/windowing/Trigger.java | 11 +-
.../apache/beam/sdk/util/ReshuffleTrigger.java | 2 +-
21 files changed, 528 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
[17/18] incubator-beam git commit: Add TriggerStateMachines with
conversion from Trigger
Posted by ke...@apache.org.
Add TriggerStateMachines with conversion from Trigger
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/00672961
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/00672961
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/00672961
Branch: refs/heads/master
Commit: 00672961b5a3115c298c457dfe43f543947298a0
Parents: 2107f79
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Oct 13 20:02:52 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Mon Oct 17 19:56:49 2016 -0700
----------------------------------------------------------------------
.../core/triggers/TriggerStateMachines.java | 210 +++++++++++++++++++
.../core/triggers/TriggerStateMachinesTest.java | 199 ++++++++++++++++++
2 files changed, 409 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/00672961/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachines.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachines.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachines.java
new file mode 100644
index 0000000..317e3b9
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachines.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core.triggers;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.List;
+import org.apache.beam.runners.core.triggers.TriggerStateMachine.OnceTriggerStateMachine;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.windowing.AfterAll;
+import org.apache.beam.sdk.transforms.windowing.AfterDelayFromFirstElement;
+import org.apache.beam.sdk.transforms.windowing.AfterEach;
+import org.apache.beam.sdk.transforms.windowing.AfterFirst;
+import org.apache.beam.sdk.transforms.windowing.AfterPane;
+import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
+import org.apache.beam.sdk.transforms.windowing.AfterSynchronizedProcessingTime;
+import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
+import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
+import org.apache.beam.sdk.transforms.windowing.Never.NeverTrigger;
+import org.apache.beam.sdk.transforms.windowing.OrFinallyTrigger;
+import org.apache.beam.sdk.transforms.windowing.Repeatedly;
+import org.apache.beam.sdk.transforms.windowing.Trigger;
+import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger;
+import org.apache.beam.sdk.util.TimeDomain;
+import org.joda.time.Instant;
+
+/** Translates a {@link Trigger} to a {@link TriggerStateMachine}. */
+public class TriggerStateMachines {
+
+ private TriggerStateMachines() {}
+
+ @VisibleForTesting static final StateMachineConverter CONVERTER = new StateMachineConverter();
+
+ /**
+ * Translates {@code trigger} into a {@link TriggerStateMachine} by delegating to the shared
+ * {@link #CONVERTER}.
+ */
+ public static TriggerStateMachine stateMachineForTrigger(Trigger trigger) {
+ return CONVERTER.evaluateTrigger(trigger);
+ }
+
+ /**
+ * Translates {@code trigger} into an {@link OnceTriggerStateMachine} by delegating to the shared
+ * {@link #CONVERTER}.
+ */
+ public static OnceTriggerStateMachine stateMachineForOnceTrigger(OnceTrigger trigger) {
+ return CONVERTER.evaluateOnceTrigger(trigger);
+ }
+
+ @VisibleForTesting
+ static class StateMachineConverter {
+
+ public TriggerStateMachine evaluateTrigger(Trigger trigger) {
+ Method evaluationMethod = getEvaluationMethod(trigger.getClass());
+ return tryEvaluate(evaluationMethod, trigger);
+ }
+
+ public OnceTriggerStateMachine evaluateOnceTrigger(OnceTrigger trigger) {
+ Method evaluationMethod = getEvaluationMethod(trigger.getClass());
+ return (OnceTriggerStateMachine) tryEvaluate(evaluationMethod, trigger);
+ }
+
+ /**
+ * Reflectively invokes {@code evaluationMethod} on this converter with {@code trigger} as its
+ * sole argument and returns the result as a {@link TriggerStateMachine}.
+ *
+ * <p>A {@link RuntimeException} thrown by the invoked method is rethrown unchanged; any other
+ * throwable it raises is wrapped in a new {@link RuntimeException}.
+ *
+ * @throws IllegalStateException if the method cannot be accessed; the underlying
+ * {@link IllegalAccessException} is attached as the cause
+ */
+ private TriggerStateMachine tryEvaluate(Method evaluationMethod, Trigger trigger) {
+ try {
+ return (TriggerStateMachine) evaluationMethod.invoke(this, trigger);
+ } catch (InvocationTargetException exc) {
+ if (exc.getCause() instanceof RuntimeException) {
+ throw (RuntimeException) exc.getCause();
+ } else {
+ throw new RuntimeException(exc.getCause());
+ }
+ } catch (IllegalAccessException exc) {
+ // Attach the original exception so the access failure remains diagnosable.
+ throw new IllegalStateException(
+ String.format("Internal error: could not invoke %s", evaluationMethod), exc);
+ }
+ }
+
+ /**
+ * Looks up the {@code evaluateSpecific} overload declared by this converter's class whose
+ * single parameter type is exactly {@code clazz}.
+ *
+ * @throws UnsupportedOperationException if no such overload is declared; the
+ * {@link NoSuchMethodException} is attached as the cause
+ */
+ private Method getEvaluationMethod(Class<?> clazz) {
+ try {
+ return getClass().getDeclaredMethod("evaluateSpecific", clazz);
+ } catch (NoSuchMethodException exc) {
+ throw new UnsupportedOperationException(
+ String.format(
+ "Cannot translate trigger class %s to a state machine.", clazz.getCanonicalName()),
+ exc);
+ }
+ }
+
+ private TriggerStateMachine evaluateSpecific(DefaultTrigger v) {
+ return DefaultTriggerStateMachine.of();
+ }
+
+ private OnceTriggerStateMachine evaluateSpecific(AfterWatermark.FromEndOfWindow v) {
+ return AfterWatermarkStateMachine.pastEndOfWindow();
+ }
+
+ private OnceTriggerStateMachine evaluateSpecific(NeverTrigger v) {
+ return NeverStateMachine.ever();
+ }
+
+ private OnceTriggerStateMachine evaluateSpecific(AfterSynchronizedProcessingTime v) {
+ return new AfterSynchronizedProcessingTimeStateMachine();
+ }
+
+ private OnceTriggerStateMachine evaluateSpecific(AfterFirst v) {
+ List<OnceTriggerStateMachine> subStateMachines =
+ Lists.newArrayListWithCapacity(v.subTriggers().size());
+ for (Trigger subtrigger : v.subTriggers()) {
+ subStateMachines.add(stateMachineForOnceTrigger((OnceTrigger) subtrigger));
+ }
+ return AfterFirstStateMachine.of(subStateMachines);
+ }
+
+ private OnceTriggerStateMachine evaluateSpecific(AfterAll v) {
+ List<OnceTriggerStateMachine> subStateMachines =
+ Lists.newArrayListWithCapacity(v.subTriggers().size());
+ for (Trigger subtrigger : v.subTriggers()) {
+ subStateMachines.add(stateMachineForOnceTrigger((OnceTrigger) subtrigger));
+ }
+ return AfterAllStateMachine.of(subStateMachines);
+ }
+
+ private OnceTriggerStateMachine evaluateSpecific(AfterPane v) {
+ return AfterPaneStateMachine.elementCountAtLeast(v.getElementCount());
+ }
+
+ private TriggerStateMachine evaluateSpecific(AfterWatermark.AfterWatermarkEarlyAndLate v) {
+ AfterWatermarkStateMachine.AfterWatermarkEarlyAndLate machine =
+ AfterWatermarkStateMachine.pastEndOfWindow()
+ .withEarlyFirings(stateMachineForOnceTrigger(v.getEarlyTrigger()));
+
+ if (v.getLateTrigger() != null) {
+ machine = machine.withLateFirings(stateMachineForOnceTrigger(v.getLateTrigger()));
+ }
+ return machine;
+ }
+
+ private TriggerStateMachine evaluateSpecific(AfterEach v) {
+ List<TriggerStateMachine> subStateMachines =
+ Lists.newArrayListWithCapacity(v.subTriggers().size());
+
+ for (Trigger subtrigger : v.subTriggers()) {
+ subStateMachines.add(stateMachineForTrigger(subtrigger));
+ }
+
+ return AfterEachStateMachine.inOrder(subStateMachines);
+ }
+
+ private TriggerStateMachine evaluateSpecific(Repeatedly v) {
+ return RepeatedlyStateMachine.forever(stateMachineForTrigger(v.getRepeatedTrigger()));
+ }
+
+ private TriggerStateMachine evaluateSpecific(OrFinallyTrigger v) {
+ return new OrFinallyStateMachine(
+ stateMachineForTrigger(v.getMainTrigger()),
+ stateMachineForOnceTrigger(v.getUntilTrigger()));
+ }
+
+ private OnceTriggerStateMachine evaluateSpecific(AfterProcessingTime v) {
+ return evaluateSpecific((AfterDelayFromFirstElement) v);
+ }
+
+ private OnceTriggerStateMachine evaluateSpecific(final AfterDelayFromFirstElement v) {
+ return new AfterDelayFromFirstElementStateMachineAdapter(v);
+ }
+
+ private static class AfterDelayFromFirstElementStateMachineAdapter
+ extends AfterDelayFromFirstElementStateMachine {
+
+ public AfterDelayFromFirstElementStateMachineAdapter(AfterDelayFromFirstElement v) {
+ this(v.getTimeDomain(), v.getTimestampMappers());
+ }
+
+ private AfterDelayFromFirstElementStateMachineAdapter(
+ TimeDomain timeDomain, List<SerializableFunction<Instant, Instant>> timestampMappers) {
+ super(timeDomain, timestampMappers);
+ }
+
+ @Override
+ public Instant getCurrentTime(TriggerContext context) {
+ switch (timeDomain) {
+ case PROCESSING_TIME:
+ return context.currentProcessingTime();
+ case SYNCHRONIZED_PROCESSING_TIME:
+ return context.currentSynchronizedProcessingTime();
+ case EVENT_TIME:
+ return context.currentEventTime();
+ default:
+ throw new IllegalArgumentException("A time domain that doesn't exist was received!");
+ }
+ }
+
+ @Override
+ protected AfterDelayFromFirstElementStateMachine newWith(
+ List<SerializableFunction<Instant, Instant>> transform) {
+ return new AfterDelayFromFirstElementStateMachineAdapter(timeDomain, transform);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/00672961/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachinesTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachinesTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachinesTest.java
new file mode 100644
index 0000000..37f8f10
--- /dev/null
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachinesTest.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core.triggers;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+import org.apache.beam.runners.core.triggers.TriggerStateMachine.OnceTriggerStateMachine;
+import org.apache.beam.sdk.transforms.windowing.AfterAll;
+import org.apache.beam.sdk.transforms.windowing.AfterDelayFromFirstElement;
+import org.apache.beam.sdk.transforms.windowing.AfterEach;
+import org.apache.beam.sdk.transforms.windowing.AfterFirst;
+import org.apache.beam.sdk.transforms.windowing.AfterPane;
+import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
+import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
+import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
+import org.apache.beam.sdk.transforms.windowing.Never;
+import org.apache.beam.sdk.transforms.windowing.Never.NeverTrigger;
+import org.apache.beam.sdk.transforms.windowing.OrFinallyTrigger;
+import org.apache.beam.sdk.transforms.windowing.Repeatedly;
+import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger;
+import org.apache.beam.sdk.util.TimeDomain;
+import org.joda.time.Duration;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests the {@link TriggerStateMachines} static utility methods. */
+@RunWith(JUnit4.class)
+public class TriggerStateMachinesTest {
+
+ //
+ // Tests for leaf trigger translation
+ //
+
+ @Test
+ public void testStateMachineForAfterPane() {
+ int count = 37;
+ AfterPane trigger = AfterPane.elementCountAtLeast(count);
+ AfterPaneStateMachine machine =
+ (AfterPaneStateMachine) TriggerStateMachines.stateMachineForOnceTrigger(trigger);
+
+ assertThat(machine.getElementCount(), equalTo(trigger.getElementCount()));
+ }
+
+ @Test
+ public void testStateMachineForAfterProcessingTime() {
+ Duration minutes = Duration.standardMinutes(94);
+ Duration hours = Duration.standardHours(13);
+
+ AfterDelayFromFirstElement trigger =
+ AfterProcessingTime.pastFirstElementInPane().plusDelayOf(minutes).alignedTo(hours);
+
+ AfterDelayFromFirstElementStateMachine machine =
+ (AfterDelayFromFirstElementStateMachine)
+ TriggerStateMachines.stateMachineForOnceTrigger(trigger);
+
+ assertThat(machine.getTimeDomain(), equalTo(TimeDomain.PROCESSING_TIME));
+
+ // This compares the mapper functions themselves for equality. That is OK here because the
+ // translation passes the trigger's own mapper list to the state machine with no serialization.
+ assertThat(machine.getTimestampMappers(), equalTo(trigger.getTimestampMappers()));
+ }
+
+ @Test
+ public void testStateMachineForAfterWatermark() {
+ AfterWatermark.FromEndOfWindow trigger = AfterWatermark.pastEndOfWindow();
+ AfterWatermarkStateMachine.FromEndOfWindow machine =
+ (AfterWatermarkStateMachine.FromEndOfWindow)
+ TriggerStateMachines.stateMachineForOnceTrigger(trigger);
+ // There are no parameters to compare, so translating and casting without an exception is the test.
+ }
+
+ @Test
+ public void testDefaultTriggerTranslation() {
+ DefaultTrigger trigger = DefaultTrigger.of();
+ DefaultTriggerStateMachine machine =
+ (DefaultTriggerStateMachine)
+ checkNotNull(TriggerStateMachines.stateMachineForTrigger(trigger));
+ // There are no parameters to compare, so a non-null result that casts without an exception is the test.
+ }
+
+ @Test
+ public void testNeverTranslation() {
+ NeverTrigger trigger = Never.ever();
+ NeverStateMachine machine =
+ (NeverStateMachine) checkNotNull(TriggerStateMachines.stateMachineForTrigger(trigger));
+ // There are no parameters to compare, so a non-null result that casts without an exception is the test.
+ }
+
+ //
+ // Tests for composite trigger translation
+ //
+ // These check just that translation was invoked recursively using somewhat random
+ // leaf subtriggers; by induction it all holds together. Beyond this, explicit tests
+ // of particular triggers will suffice.
+
+ private static final int ELEM_COUNT = 472;
+ private static final Duration DELAY = Duration.standardSeconds(95673);
+
+ private final OnceTrigger subtrigger1 = AfterPane.elementCountAtLeast(ELEM_COUNT);
+ private final OnceTrigger subtrigger2 =
+ AfterProcessingTime.pastFirstElementInPane().plusDelayOf(DELAY);
+
+ private final OnceTriggerStateMachine submachine1 =
+ TriggerStateMachines.stateMachineForOnceTrigger(subtrigger1);
+ private final OnceTriggerStateMachine submachine2 =
+ TriggerStateMachines.stateMachineForOnceTrigger(subtrigger2);
+
+ @Test
+ public void testAfterEachTranslation() {
+ AfterEach trigger = AfterEach.inOrder(subtrigger1, subtrigger2);
+ AfterEachStateMachine machine =
+ (AfterEachStateMachine) TriggerStateMachines.stateMachineForTrigger(trigger);
+
+ assertThat(machine, equalTo(AfterEachStateMachine.inOrder(submachine1, submachine2)));
+ }
+
+ @Test
+ public void testAfterFirstTranslation() {
+ AfterFirst trigger = AfterFirst.of(subtrigger1, subtrigger2);
+ AfterFirstStateMachine machine =
+ (AfterFirstStateMachine) TriggerStateMachines.stateMachineForTrigger(trigger);
+
+ assertThat(machine, equalTo(AfterFirstStateMachine.of(submachine1, submachine2)));
+ }
+
+ @Test
+ public void testAfterAllTranslation() {
+ AfterAll trigger = AfterAll.of(subtrigger1, subtrigger2);
+ AfterAllStateMachine machine =
+ (AfterAllStateMachine) TriggerStateMachines.stateMachineForTrigger(trigger);
+
+ assertThat(machine, equalTo(AfterAllStateMachine.of(submachine1, submachine2)));
+ }
+
+ @Test
+ public void testAfterWatermarkEarlyTranslation() {
+ AfterWatermark.AfterWatermarkEarlyAndLate trigger =
+ AfterWatermark.pastEndOfWindow().withEarlyFirings(subtrigger1);
+ AfterWatermarkStateMachine.AfterWatermarkEarlyAndLate machine =
+ (AfterWatermarkStateMachine.AfterWatermarkEarlyAndLate)
+ TriggerStateMachines.stateMachineForTrigger(trigger);
+
+ assertThat(
+ machine,
+ equalTo(AfterWatermarkStateMachine.pastEndOfWindow().withEarlyFirings(submachine1)));
+ }
+
+ @Test
+ public void testAfterWatermarkEarlyLateTranslation() {
+ AfterWatermark.AfterWatermarkEarlyAndLate trigger =
+ AfterWatermark.pastEndOfWindow().withEarlyFirings(subtrigger1).withLateFirings(subtrigger2);
+ AfterWatermarkStateMachine.AfterWatermarkEarlyAndLate machine =
+ (AfterWatermarkStateMachine.AfterWatermarkEarlyAndLate)
+ TriggerStateMachines.stateMachineForTrigger(trigger);
+
+ assertThat(
+ machine,
+ equalTo(
+ AfterWatermarkStateMachine.pastEndOfWindow()
+ .withEarlyFirings(submachine1)
+ .withLateFirings(submachine2)));
+ }
+
+ @Test
+ public void testOrFinallyTranslation() {
+ OrFinallyTrigger trigger = subtrigger1.orFinally(subtrigger2);
+ OrFinallyStateMachine machine =
+ (OrFinallyStateMachine) TriggerStateMachines.stateMachineForTrigger(trigger);
+
+ assertThat(machine, equalTo(submachine1.orFinally(submachine2)));
+ }
+
+ @Test
+ public void testRepeatedlyTranslation() {
+ Repeatedly trigger = Repeatedly.forever(subtrigger1);
+ RepeatedlyStateMachine machine =
+ (RepeatedlyStateMachine) TriggerStateMachines.stateMachineForTrigger(trigger);
+
+ assertThat(machine, equalTo(RepeatedlyStateMachine.forever(submachine1)));
+ }
+}
[11/18] incubator-beam git commit: Add accessors for
AfterPane(StateMachine) parameters
Posted by ke...@apache.org.
Add accessors for AfterPane(StateMachine) parameters
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/e0c57664
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/e0c57664
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/e0c57664
Branch: refs/heads/master
Commit: e0c576649382ea4d1d70ee9e54ff25018210dcfb
Parents: b4445ac
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Oct 13 22:14:32 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Mon Oct 17 19:56:36 2016 -0700
----------------------------------------------------------------------
.../beam/runners/core/triggers/AfterPaneStateMachine.java | 7 +++++++
.../org/apache/beam/sdk/transforms/windowing/AfterPane.java | 7 +++++++
2 files changed, 14 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e0c57664/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterPaneStateMachine.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterPaneStateMachine.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterPaneStateMachine.java
index 723aba6..288643d 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterPaneStateMachine.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterPaneStateMachine.java
@@ -49,6 +49,13 @@ private static final StateTag<Object, AccumulatorCombiningState<Long, long[], Lo
}
/**
+ * The number of elements after which this trigger may fire.
+ */
+ public int getElementCount() {
+ return countElems;
+ }
+
+ /**
* Creates a trigger that fires when the pane contains at least {@code countElems} elements.
*/
public static AfterPaneStateMachine elementCountAtLeast(int countElems) {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e0c57664/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterPane.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterPane.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterPane.java
index 8c128dd..4d59d58 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterPane.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterPane.java
@@ -51,6 +51,13 @@ private static final StateTag<Object, AccumulatorCombiningState<Long, long[], Lo
}
/**
+ * The number of elements after which this trigger may fire.
+ */
+ public int getElementCount() {
+ return countElems;
+ }
+
+ /**
* Creates a trigger that fires when the pane contains at least {@code countElems} elements.
*/
public static AfterPane elementCountAtLeast(int countElems) {
[08/18] incubator-beam git commit: Construct AfterFirstStateMachine
with a list of subtriggers
Posted by ke...@apache.org.
Construct AfterFirstStateMachine with a list of subtriggers
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/77332f1e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/77332f1e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/77332f1e
Branch: refs/heads/master
Commit: 77332f1e612caf9090e148e1493c11ca8e753076
Parents: b2bb7c0
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Oct 13 20:43:08 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Mon Oct 17 19:56:35 2016 -0700
----------------------------------------------------------------------
.../beam/runners/core/triggers/AfterFirstStateMachine.java | 6 ++++++
1 file changed, 6 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/77332f1e/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterFirstStateMachine.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterFirstStateMachine.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterFirstStateMachine.java
index 272c278..f4b305e 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterFirstStateMachine.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterFirstStateMachine.java
@@ -20,6 +20,7 @@ package org.apache.beam.runners.core.triggers;
import static com.google.common.base.Preconditions.checkArgument;
import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableList;
import java.util.Arrays;
import java.util.List;
import org.apache.beam.runners.core.triggers.TriggerStateMachine.OnceTriggerStateMachine;
@@ -46,6 +47,11 @@ public class AfterFirstStateMachine extends OnceTriggerStateMachine {
return new AfterFirstStateMachine(Arrays.<TriggerStateMachine>asList(triggers));
}
+ public static OnceTriggerStateMachine of(
+ Iterable<? extends TriggerStateMachine> triggers) {
+ return new AfterFirstStateMachine(ImmutableList.copyOf(triggers));
+ }
+
@Override
public void onElement(OnElementContext c) throws Exception {
for (ExecutableTriggerStateMachine subTrigger : c.trigger().subTriggers()) {
[05/18] incubator-beam git commit: Construct AfterEachStateMachine
from list of subtriggers
Posted by ke...@apache.org.
Construct AfterEachStateMachine from list of subtriggers
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/f46ce0db
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/f46ce0db
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/f46ce0db
Branch: refs/heads/master
Commit: f46ce0db372ffb66915eda22092bd760e474b9c0
Parents: 8afb80e
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Oct 13 20:56:54 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Mon Oct 17 19:56:35 2016 -0700
----------------------------------------------------------------------
.../beam/runners/core/triggers/AfterEachStateMachine.java | 5 +++++
1 file changed, 5 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f46ce0db/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterEachStateMachine.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterEachStateMachine.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterEachStateMachine.java
index 140ac75..38357d4 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterEachStateMachine.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterEachStateMachine.java
@@ -20,6 +20,7 @@ package org.apache.beam.runners.core.triggers;
import static com.google.common.base.Preconditions.checkArgument;
import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableList;
import java.util.Arrays;
import java.util.List;
@@ -54,6 +55,10 @@ public class AfterEachStateMachine extends TriggerStateMachine {
return new AfterEachStateMachine(Arrays.<TriggerStateMachine>asList(triggers));
}
+ public static TriggerStateMachine inOrder(Iterable<? extends TriggerStateMachine> triggers) {
+ return new AfterEachStateMachine(ImmutableList.copyOf(triggers));
+ }
+
@Override
public void onElement(OnElementContext c) throws Exception {
if (!c.trigger().isMerging()) {
[15/18] incubator-beam git commit: Make NeverTrigger public for
translation
Posted by ke...@apache.org.
Make NeverTrigger public for translation
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/703c84ef
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/703c84ef
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/703c84ef
Branch: refs/heads/master
Commit: 703c84efd9c8219b2a8a205f0b6f1f9a999e927c
Parents: fa9b381
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Oct 13 22:17:26 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Mon Oct 17 19:56:36 2016 -0700
----------------------------------------------------------------------
.../java/org/apache/beam/sdk/transforms/windowing/Never.java | 6 ++++--
1 file changed, 4 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/703c84ef/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Never.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Never.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Never.java
index 353258b..07b70f4 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Never.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Never.java
@@ -39,8 +39,10 @@ public final class Never {
return new NeverTrigger();
}
- // package-private in order to check identity for string formatting.
- static class NeverTrigger extends OnceTrigger {
+ /**
+ * The actual trigger class for {@link Never} triggers.
+ */
+ public static class NeverTrigger extends OnceTrigger {
protected NeverTrigger() {
super(null);
}
[04/18] incubator-beam git commit: Make
AfterSynchronizedProcessingTime public
Posted by ke...@apache.org.
Make AfterSynchronizedProcessingTime public
We need to be able to access this class to reason about it when
converting a trigger to a state machine.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/b19918df
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/b19918df
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/b19918df
Branch: refs/heads/master
Commit: b19918df1992d445ad8c13a63722c690ddca3899
Parents: 77332f1
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Oct 13 20:43:33 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Mon Oct 17 19:56:35 2016 -0700
----------------------------------------------------------------------
.../transforms/windowing/AfterSynchronizedProcessingTime.java | 6 +++++-
1 file changed, 5 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b19918df/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterSynchronizedProcessingTime.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterSynchronizedProcessingTime.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterSynchronizedProcessingTime.java
index 59ece10..b96b293 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterSynchronizedProcessingTime.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterSynchronizedProcessingTime.java
@@ -25,7 +25,11 @@ import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.util.TimeDomain;
import org.joda.time.Instant;
-class AfterSynchronizedProcessingTime extends AfterDelayFromFirstElement {
+/**
+ * A trigger that fires after synchronized processing time has reached a shared
+ * threshold between upstream workers.
+ */
+public class AfterSynchronizedProcessingTime extends AfterDelayFromFirstElement {
@Override
@Nullable
[02/18] incubator-beam git commit: Add accessors to Repeatedly trigger
Posted by ke...@apache.org.
Add accessors to Repeatedly trigger
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/303a42ab
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/303a42ab
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/303a42ab
Branch: refs/heads/master
Commit: 303a42ab993aee70527272dd908c3568a29f1e27
Parents: f46ce0d
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Oct 13 20:57:48 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Mon Oct 17 19:56:35 2016 -0700
----------------------------------------------------------------------
.../apache/beam/sdk/transforms/windowing/Repeatedly.java | 10 ++++++++--
1 file changed, 8 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/303a42ab/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Repeatedly.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Repeatedly.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Repeatedly.java
index 8858798..45bc6c1 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Repeatedly.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Repeatedly.java
@@ -49,10 +49,16 @@ public class Repeatedly extends Trigger {
return new Repeatedly(repeated);
}
- private Repeatedly(Trigger repeated) {
- super(Arrays.asList(repeated));
+ private Trigger repeatedTrigger;
+
+ private Repeatedly(Trigger repeatedTrigger) {
+ super(Arrays.asList(repeatedTrigger));
+ this.repeatedTrigger = repeatedTrigger;
}
+ public Trigger getRepeatedTrigger() {
+ return repeatedTrigger;
+ }
@Override
public void onElement(OnElementContext c) throws Exception {
[14/18] incubator-beam git commit: Touch up javadoc for
AfterDelayFromFirstElementStateMachine
Posted by ke...@apache.org.
Touch up javadoc for AfterDelayFromFirstElementStateMachine
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/2107f796
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/2107f796
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/2107f796
Branch: refs/heads/master
Commit: 2107f7961e863a34c71a3fcaa4dd900d6394ed05
Parents: 476dcd7
Author: Kenneth Knowles <kl...@google.com>
Authored: Mon Oct 17 19:54:12 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Mon Oct 17 19:56:36 2016 -0700
----------------------------------------------------------------------
.../core/triggers/AfterDelayFromFirstElementStateMachine.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2107f796/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java
index 02b156b..d9d2c42 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java
@@ -97,7 +97,7 @@ public abstract class AfterDelayFromFirstElementStateMachine extends OnceTrigger
}
/**
- * The time domain according for which this trigger sets timers.
+ * The time domain according to which this trigger sets timers.
*/
public TimeDomain getTimeDomain() {
return timeDomain;
[16/18] incubator-beam git commit: Make return types more precise for
AfterWatermarkTriggerStateMachine
Posted by ke...@apache.org.
Make return types more precise for AfterWatermarkTriggerStateMachine
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/476dcd74
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/476dcd74
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/476dcd74
Branch: refs/heads/master
Commit: 476dcd740c97831a5d55953014fc5d99135addce
Parents: 703c84e
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Oct 13 22:46:08 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Mon Oct 17 19:56:36 2016 -0700
----------------------------------------------------------------------
.../beam/runners/core/triggers/AfterWatermarkStateMachine.java | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/476dcd74/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterWatermarkStateMachine.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterWatermarkStateMachine.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterWatermarkStateMachine.java
index 5ad6214..524c057 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterWatermarkStateMachine.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterWatermarkStateMachine.java
@@ -93,11 +93,11 @@ public class AfterWatermarkStateMachine {
this.lateTrigger = lateTrigger;
}
- public TriggerStateMachine withEarlyFirings(OnceTriggerStateMachine earlyTrigger) {
+ public AfterWatermarkEarlyAndLate withEarlyFirings(OnceTriggerStateMachine earlyTrigger) {
return new AfterWatermarkEarlyAndLate(earlyTrigger, lateTrigger);
}
- public TriggerStateMachine withLateFirings(OnceTriggerStateMachine lateTrigger) {
+ public AfterWatermarkEarlyAndLate withLateFirings(OnceTriggerStateMachine lateTrigger) {
return new AfterWatermarkEarlyAndLate(earlyTrigger, lateTrigger);
}
[09/18] incubator-beam git commit: Make Trigger#subTriggers public
and non-null
Posted by ke...@apache.org.
Make Trigger#subTriggers public and non-null
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/87c7811a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/87c7811a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/87c7811a
Branch: refs/heads/master
Commit: 87c7811a3bab6bf4bc0b8b9181127fe074579898
Parents: 3c73170
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Oct 13 20:08:53 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Mon Oct 17 19:56:35 2016 -0700
----------------------------------------------------------------------
.../beam/sdk/transforms/windowing/DefaultTrigger.java | 2 +-
.../org/apache/beam/sdk/transforms/windowing/Trigger.java | 9 ++++++---
.../java/org/apache/beam/sdk/util/ReshuffleTrigger.java | 2 +-
3 files changed, 8 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/87c7811a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/DefaultTrigger.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/DefaultTrigger.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/DefaultTrigger.java
index d6b72ef..fee7cdf 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/DefaultTrigger.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/DefaultTrigger.java
@@ -30,7 +30,7 @@ import org.joda.time.Instant;
public class DefaultTrigger extends Trigger{
private DefaultTrigger() {
- super(null);
+ super();
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/87c7811a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Trigger.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Trigger.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Trigger.java
index a960aa4..cfabb8b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Trigger.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Trigger.java
@@ -20,6 +20,7 @@ package org.apache.beam.sdk.transforms.windowing;
import com.google.common.base.Joiner;
import java.io.Serializable;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.Objects;
import javax.annotation.Nullable;
@@ -263,13 +264,15 @@ public abstract class Trigger implements Serializable {
public abstract MergingTriggerInfo trigger();
}
- @Nullable
protected final List<Trigger> subTriggers;
- protected Trigger(@Nullable List<Trigger> subTriggers) {
+ protected Trigger(List<Trigger> subTriggers) {
this.subTriggers = subTriggers;
}
+ protected Trigger() {
+ this(Collections.EMPTY_LIST);
+ }
/**
* Called every time an element is incorporated into a window.
@@ -370,7 +373,7 @@ public abstract class Trigger implements Serializable {
}
}
- public Iterable<Trigger> subTriggers() {
+ public List<Trigger> subTriggers() {
return subTriggers;
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/87c7811a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReshuffleTrigger.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReshuffleTrigger.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReshuffleTrigger.java
index 9e2c27d..437f14a 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReshuffleTrigger.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReshuffleTrigger.java
@@ -31,7 +31,7 @@ import org.joda.time.Instant;
public class ReshuffleTrigger<W extends BoundedWindow> extends Trigger {
public ReshuffleTrigger() {
- super(null);
+ super();
}
@Override
[07/18] incubator-beam git commit: Make OrFinallyTrigger public so it
can be examined
Posted by ke...@apache.org.
Make OrFinallyTrigger public so it can be examined
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/2e565172
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/2e565172
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/2e565172
Branch: refs/heads/master
Commit: 2e565172d74bc906bdbdb1c4ee986c9e3d65089f
Parents: 303a42a
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Oct 13 20:58:12 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Mon Oct 17 19:56:35 2016 -0700
----------------------------------------------------------------------
.../org/apache/beam/sdk/transforms/windowing/OrFinallyTrigger.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2e565172/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/OrFinallyTrigger.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/OrFinallyTrigger.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/OrFinallyTrigger.java
index 1a03450..9bef45a 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/OrFinallyTrigger.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/OrFinallyTrigger.java
@@ -26,7 +26,7 @@ import org.joda.time.Instant;
/**
* Executes the {@code actual} trigger until it finishes or until the {@code until} trigger fires.
*/
-class OrFinallyTrigger extends Trigger {
+public class OrFinallyTrigger extends Trigger {
private static final int ACTUAL = 0;
private static final int UNTIL = 1;
[06/18] incubator-beam git commit: Add accessors to AfterWatermark trigger
Posted by ke...@apache.org.
Add accessors to AfterWatermark trigger
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/0df929f9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/0df929f9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/0df929f9
Branch: refs/heads/master
Commit: 0df929f984c2f5af3d19a4c54635c345ccd5b410
Parents: 2e56517
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Oct 13 21:13:27 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Mon Oct 17 19:56:35 2016 -0700
----------------------------------------------------------------------
.../apache/beam/sdk/transforms/windowing/AfterWatermark.java | 8 ++++++++
1 file changed, 8 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0df929f9/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterWatermark.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterWatermark.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterWatermark.java
index e2463d8..da96de3 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterWatermark.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterWatermark.java
@@ -85,6 +85,14 @@ public class AfterWatermark {
@Nullable
private final OnceTrigger lateTrigger;
+ public OnceTrigger getEarlyTrigger() {
+ return earlyTrigger;
+ }
+
+ public OnceTrigger getLateTrigger() {
+ return lateTrigger;
+ }
+
@SuppressWarnings("unchecked")
public AfterWatermarkEarlyAndLate(OnceTrigger earlyTrigger, OnceTrigger lateTrigger) {
super(lateTrigger == null