Posted to commits@beam.apache.org by ke...@apache.org on 2016/10/18 03:38:32 UTC

[01/18] incubator-beam git commit: Add access to values from AfterDelay triggers

Repository: incubator-beam
Updated Branches:
  refs/heads/master 7ac725583 -> c5329f9b4


Add access to values from AfterDelay triggers


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/b4445ac4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/b4445ac4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/b4445ac4

Branch: refs/heads/master
Commit: b4445ac43530441463ee11c395bcf631b66ef2e9
Parents: 0df929f
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Oct 13 22:13:43 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Mon Oct 17 19:56:35 2016 -0700

----------------------------------------------------------------------
 .../AfterDelayFromFirstElementStateMachine.java    | 17 ++++++++++++++++-
 1 file changed, 16 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b4445ac4/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java
index a6616fa..02b156b 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java
@@ -82,7 +82,7 @@ public abstract class AfterDelayFromFirstElementStateMachine extends OnceTrigger
    */
   protected final List<SerializableFunction<Instant, Instant>> timestampMappers;
 
-  private final TimeDomain timeDomain;
+  protected final TimeDomain timeDomain;
 
   public AfterDelayFromFirstElementStateMachine(
       TimeDomain timeDomain,
@@ -97,6 +97,21 @@ public abstract class AfterDelayFromFirstElementStateMachine extends OnceTrigger
   }
 
   /**
+   * The time domain according to which this trigger sets timers.
+   */
+  public TimeDomain getTimeDomain() {
+    return timeDomain;
+  }
+
+  /**
+   * The mapping functions applied to the arrival time of an element to determine when to
+   * set a wake-up timer for triggering.
+   */
+  public List<SerializableFunction<Instant, Instant>> getTimestampMappers() {
+    return timestampMappers;
+  }
+
+  /**
    * Aligns timestamps to the smallest multiple of {@code size} since the {@code offset} greater
    * than the timestamp.
    *


[03/18] incubator-beam git commit: Add direct accessors for the components of OrFinallyTrigger

Posted by ke...@apache.org.
Add direct accessors for the components of OrFinallyTrigger


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/8afb80e1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/8afb80e1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/8afb80e1

Branch: refs/heads/master
Commit: 8afb80e18f80a9d5a4ed18623a770dbf15ff5e65
Parents: b19918d
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Oct 13 20:44:35 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Mon Oct 17 19:56:35 2016 -0700

----------------------------------------------------------------------
 .../sdk/transforms/windowing/OrFinallyTrigger.java   | 15 +++++++++++++++
 1 file changed, 15 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8afb80e1/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/OrFinallyTrigger.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/OrFinallyTrigger.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/OrFinallyTrigger.java
index 25b7b34..1a03450 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/OrFinallyTrigger.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/OrFinallyTrigger.java
@@ -35,6 +35,21 @@ class OrFinallyTrigger extends Trigger {
     super(Arrays.asList(actual, until));
   }
 
+  /**
+   * The main trigger, which will continue firing until the "until" trigger fires. See
+   * {@link #getUntilTrigger()}
+   */
+  public Trigger getMainTrigger() {
+    return subTriggers().get(ACTUAL);
+  }
+
+  /**
+   * The trigger that signals termination of this trigger.
+   */
+  public OnceTrigger getUntilTrigger() {
+    return (OnceTrigger) subTriggers().get(UNTIL);
+  }
+
   @Override
   public void onElement(OnElementContext c) throws Exception {
     c.trigger().subTrigger(ACTUAL).invokeOnElement(c);


[10/18] incubator-beam git commit: Construct AfterAllStateMachine with a list of subtriggers

Posted by ke...@apache.org.
Construct AfterAllStateMachine with a list of subtriggers


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/b2bb7c04
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/b2bb7c04
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/b2bb7c04

Branch: refs/heads/master
Commit: b2bb7c048086a3e5eee7d2652d4bb971bc0694e7
Parents: 87c7811
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Oct 13 20:42:38 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Mon Oct 17 19:56:35 2016 -0700

----------------------------------------------------------------------
 .../apache/beam/runners/core/triggers/AfterAllStateMachine.java | 5 +++++
 1 file changed, 5 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b2bb7c04/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterAllStateMachine.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterAllStateMachine.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterAllStateMachine.java
index 2f4ad63..12cbc3d 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterAllStateMachine.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterAllStateMachine.java
@@ -20,6 +20,7 @@ package org.apache.beam.runners.core.triggers;
 import static com.google.common.base.Preconditions.checkArgument;
 
 import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableList;
 import java.util.Arrays;
 import java.util.List;
 import org.apache.beam.runners.core.triggers.TriggerStateMachine.OnceTriggerStateMachine;
@@ -45,6 +46,10 @@ public class AfterAllStateMachine extends OnceTriggerStateMachine {
     return new AfterAllStateMachine(Arrays.<TriggerStateMachine>asList(triggers));
   }
 
+  public static OnceTriggerStateMachine of(Iterable<? extends TriggerStateMachine> triggers) {
+    return new AfterAllStateMachine(ImmutableList.copyOf(triggers));
+  }
+
   @Override
   public void onElement(OnElementContext c) throws Exception {
     for (ExecutableTriggerStateMachine subTrigger : c.trigger().unfinishedSubTriggers()) {


[13/18] incubator-beam git commit: Accessors for AfterDelay

Posted by ke...@apache.org.
Accessors for AfterDelay


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/fa9b3812
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/fa9b3812
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/fa9b3812

Branch: refs/heads/master
Commit: fa9b3812e7262b1e0368f613d3f667b71f5de59e
Parents: e1c5bfb
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Oct 13 22:17:14 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Mon Oct 17 19:56:36 2016 -0700

----------------------------------------------------------------------
 .../windowing/AfterDelayFromFirstElement.java        | 15 +++++++++++++++
 1 file changed, 15 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fa9b3812/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterDelayFromFirstElement.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterDelayFromFirstElement.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterDelayFromFirstElement.java
index c4bc946..6078b34 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterDelayFromFirstElement.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterDelayFromFirstElement.java
@@ -97,6 +97,21 @@ public abstract class AfterDelayFromFirstElement extends OnceTrigger {
   }
 
   /**
+   * The time domain according to which this trigger sets timers.
+   */
+  public TimeDomain getTimeDomain() {
+    return timeDomain;
+  }
+
+  /**
+   * The mapping functions applied to the arrival time of an element to determine when to
+   * set a wake-up timer for triggering.
+   */
+  public List<SerializableFunction<Instant, Instant>> getTimestampMappers() {
+    return timestampMappers;
+  }
+
+  /**
    * Aligns timestamps to the smallest multiple of {@code size} since the {@code offset} greater
    * than the timestamp.
    *


[12/18] incubator-beam git commit: Make return types of trigger static factory methods precise

Posted by ke...@apache.org.
Make return types of trigger static factory methods precise

This is helpful in testing and loses no abstraction - the code
locations calling these methods already know the more-specific
type that will be returned.
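
As a rough illustration of the point above, the following sketch shows how a static factory that declares its precise return type lets a caller (for example, a test) reach type-specific accessors without a cast. The names BaseTrigger, CompositeTrigger and PreciseReturnTypeSketch are hypothetical stand-ins invented for this example; they are not Beam classes.

```java
import java.util.Arrays;
import java.util.List;

public class PreciseReturnTypeSketch {

  /** Hypothetical stand-in for a general trigger base type. */
  static class BaseTrigger {}

  /** Hypothetical composite whose accessor exists only on the specific type. */
  static class CompositeTrigger extends BaseTrigger {
    private final List<BaseTrigger> subTriggers;

    private CompositeTrigger(List<BaseTrigger> subTriggers) {
      this.subTriggers = subTriggers;
    }

    // Declaring CompositeTrigger (rather than BaseTrigger) as the return type
    // exposes subTriggers() to callers without requiring a cast.
    static CompositeTrigger of(BaseTrigger... triggers) {
      return new CompositeTrigger(Arrays.asList(triggers));
    }

    List<BaseTrigger> subTriggers() {
      return subTriggers;
    }
  }

  public static void main(String[] args) {
    // The precise type is available directly from the factory.
    CompositeTrigger composite = CompositeTrigger.of(new BaseTrigger(), new BaseTrigger());
    System.out.println(composite.subTriggers().size());

    // Callers that only need the general type are unaffected.
    BaseTrigger general = CompositeTrigger.of(new BaseTrigger());
    System.out.println(general instanceof CompositeTrigger);
  }
}
```

Existing callers that assign the result to the general type keep compiling, which is presumably why the commit message says no abstraction is lost.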


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/e1c5bfbc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/e1c5bfbc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/e1c5bfbc

Branch: refs/heads/master
Commit: e1c5bfbc76c0ef3766d5b1bf2dbd47e13f0ed97c
Parents: e0c5766
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Oct 13 22:15:32 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Mon Oct 17 19:56:36 2016 -0700

----------------------------------------------------------------------
 .../java/org/apache/beam/sdk/transforms/windowing/AfterAll.java  | 2 +-
 .../java/org/apache/beam/sdk/transforms/windowing/AfterEach.java | 2 +-
 .../org/apache/beam/sdk/transforms/windowing/AfterFirst.java     | 2 +-
 .../org/apache/beam/sdk/transforms/windowing/AfterWatermark.java | 4 ++--
 .../java/org/apache/beam/sdk/transforms/windowing/Never.java     | 2 +-
 .../java/org/apache/beam/sdk/transforms/windowing/Trigger.java   | 2 +-
 6 files changed, 7 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e1c5bfbc/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterAll.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterAll.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterAll.java
index cc8c97f..0e37d33 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterAll.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterAll.java
@@ -41,7 +41,7 @@ public class AfterAll extends OnceTrigger {
   /**
    * Returns an {@code AfterAll} {@code Trigger} with the given subtriggers.
    */
-  public static OnceTrigger of(OnceTrigger... triggers) {
+  public static AfterAll of(OnceTrigger... triggers) {
     return new AfterAll(Arrays.<Trigger>asList(triggers));
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e1c5bfbc/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterEach.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterEach.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterEach.java
index 629c640..961d97f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterEach.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterEach.java
@@ -54,7 +54,7 @@ public class AfterEach extends Trigger {
    * Returns an {@code AfterEach} {@code Trigger} with the given subtriggers.
    */
   @SafeVarargs
-  public static Trigger inOrder(Trigger... triggers) {
+  public static AfterEach inOrder(Trigger... triggers) {
     return new AfterEach(Arrays.<Trigger>asList(triggers));
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e1c5bfbc/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterFirst.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterFirst.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterFirst.java
index 6b06cfa..7840fc4 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterFirst.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterFirst.java
@@ -42,7 +42,7 @@ public class AfterFirst extends OnceTrigger {
   /**
    * Returns an {@code AfterFirst} {@code Trigger} with the given subtriggers.
    */
-  public static OnceTrigger of(OnceTrigger... triggers) {
+  public static AfterFirst of(OnceTrigger... triggers) {
     return new AfterFirst(Arrays.<Trigger>asList(triggers));
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e1c5bfbc/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterWatermark.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterWatermark.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterWatermark.java
index da96de3..89c1ba9 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterWatermark.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterWatermark.java
@@ -102,11 +102,11 @@ public class AfterWatermark {
       this.lateTrigger = lateTrigger;
     }
 
-    public Trigger withEarlyFirings(OnceTrigger earlyTrigger) {
+    public AfterWatermarkEarlyAndLate withEarlyFirings(OnceTrigger earlyTrigger) {
       return new AfterWatermarkEarlyAndLate(earlyTrigger, lateTrigger);
     }
 
-    public Trigger withLateFirings(OnceTrigger lateTrigger) {
+    public AfterWatermarkEarlyAndLate withLateFirings(OnceTrigger lateTrigger) {
       return new AfterWatermarkEarlyAndLate(earlyTrigger, lateTrigger);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e1c5bfbc/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Never.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Never.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Never.java
index 5f20465..353258b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Never.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Never.java
@@ -34,7 +34,7 @@ public final class Never {
    * Returns a trigger which never fires. Output will be produced from the using {@link GroupByKey}
    * when the {@link BoundedWindow} closes.
    */
-  public static OnceTrigger ever() {
+  public static NeverTrigger ever() {
     // NeverTrigger ignores all inputs and is Window-type independent.
     return new NeverTrigger();
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e1c5bfbc/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Trigger.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Trigger.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Trigger.java
index cfabb8b..90e9386 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Trigger.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Trigger.java
@@ -490,7 +490,7 @@ public abstract class Trigger implements Serializable {
    * <p>Note that if {@code t1} is {@link OnceTrigger}, then {@code t1.orFinally(t2)} is the same
    * as {@code AfterFirst.of(t1, t2)}.
    */
-  public Trigger orFinally(OnceTrigger until) {
+  public OrFinallyTrigger orFinally(OnceTrigger until) {
     return new OrFinallyTrigger(this, until);
   }
 


[18/18] incubator-beam git commit: This closes #1101

Posted by ke...@apache.org.
This closes #1101


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/c5329f9b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/c5329f9b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/c5329f9b

Branch: refs/heads/master
Commit: c5329f9b43c51cf1933d28f88ad44adce96ec7b6
Parents: 7ac7255 0067296
Author: Kenneth Knowles <kl...@google.com>
Authored: Mon Oct 17 20:37:24 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Mon Oct 17 20:37:24 2016 -0700

----------------------------------------------------------------------
 .../core/triggers/AfterAllStateMachine.java     |   5 +
 .../AfterDelayFromFirstElementStateMachine.java |  17 +-
 .../core/triggers/AfterEachStateMachine.java    |   5 +
 .../core/triggers/AfterFirstStateMachine.java   |   6 +
 .../core/triggers/AfterPaneStateMachine.java    |   7 +
 .../triggers/AfterWatermarkStateMachine.java    |   4 +-
 .../core/triggers/TriggerStateMachines.java     | 210 +++++++++++++++++++
 .../core/triggers/TriggerStateMachinesTest.java | 199 ++++++++++++++++++
 .../beam/sdk/transforms/windowing/AfterAll.java |   2 +-
 .../windowing/AfterDelayFromFirstElement.java   |  15 ++
 .../sdk/transforms/windowing/AfterEach.java     |   2 +-
 .../sdk/transforms/windowing/AfterFirst.java    |   2 +-
 .../sdk/transforms/windowing/AfterPane.java     |   7 +
 .../AfterSynchronizedProcessingTime.java        |   6 +-
 .../transforms/windowing/AfterWatermark.java    |  12 +-
 .../transforms/windowing/DefaultTrigger.java    |   2 +-
 .../beam/sdk/transforms/windowing/Never.java    |   8 +-
 .../transforms/windowing/OrFinallyTrigger.java  |  17 +-
 .../sdk/transforms/windowing/Repeatedly.java    |  10 +-
 .../beam/sdk/transforms/windowing/Trigger.java  |  11 +-
 .../apache/beam/sdk/util/ReshuffleTrigger.java  |   2 +-
 21 files changed, 528 insertions(+), 21 deletions(-)
----------------------------------------------------------------------



[17/18] incubator-beam git commit: Add TriggerStateMachines with conversion from Trigger

Posted by ke...@apache.org.
Add TriggerStateMachines with conversion from Trigger


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/00672961
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/00672961
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/00672961

Branch: refs/heads/master
Commit: 00672961b5a3115c298c457dfe43f543947298a0
Parents: 2107f79
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Oct 13 20:02:52 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Mon Oct 17 19:56:49 2016 -0700

----------------------------------------------------------------------
 .../core/triggers/TriggerStateMachines.java     | 210 +++++++++++++++++++
 .../core/triggers/TriggerStateMachinesTest.java | 199 ++++++++++++++++++
 2 files changed, 409 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/00672961/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachines.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachines.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachines.java
new file mode 100644
index 0000000..317e3b9
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachines.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core.triggers;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.List;
+import org.apache.beam.runners.core.triggers.TriggerStateMachine.OnceTriggerStateMachine;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.windowing.AfterAll;
+import org.apache.beam.sdk.transforms.windowing.AfterDelayFromFirstElement;
+import org.apache.beam.sdk.transforms.windowing.AfterEach;
+import org.apache.beam.sdk.transforms.windowing.AfterFirst;
+import org.apache.beam.sdk.transforms.windowing.AfterPane;
+import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
+import org.apache.beam.sdk.transforms.windowing.AfterSynchronizedProcessingTime;
+import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
+import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
+import org.apache.beam.sdk.transforms.windowing.Never.NeverTrigger;
+import org.apache.beam.sdk.transforms.windowing.OrFinallyTrigger;
+import org.apache.beam.sdk.transforms.windowing.Repeatedly;
+import org.apache.beam.sdk.transforms.windowing.Trigger;
+import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger;
+import org.apache.beam.sdk.util.TimeDomain;
+import org.joda.time.Instant;
+
+/** Translates a {@link Trigger} to a {@link TriggerStateMachine}. */
+public class TriggerStateMachines {
+
+  private TriggerStateMachines() {}
+
+  @VisibleForTesting static final StateMachineConverter CONVERTER = new StateMachineConverter();
+
+  public static TriggerStateMachine stateMachineForTrigger(Trigger trigger) {
+    return CONVERTER.evaluateTrigger(trigger);
+  }
+
+  public static OnceTriggerStateMachine stateMachineForOnceTrigger(OnceTrigger trigger) {
+    return CONVERTER.evaluateOnceTrigger(trigger);
+  }
+
+  @VisibleForTesting
+  static class StateMachineConverter {
+
+    public TriggerStateMachine evaluateTrigger(Trigger trigger) {
+      Method evaluationMethod = getEvaluationMethod(trigger.getClass());
+      return tryEvaluate(evaluationMethod, trigger);
+    }
+
+    public OnceTriggerStateMachine evaluateOnceTrigger(OnceTrigger trigger) {
+      Method evaluationMethod = getEvaluationMethod(trigger.getClass());
+      return (OnceTriggerStateMachine) tryEvaluate(evaluationMethod, trigger);
+    }
+
+    private TriggerStateMachine tryEvaluate(Method evaluationMethod, Trigger trigger) {
+      try {
+        return (TriggerStateMachine) evaluationMethod.invoke(this, trigger);
+      } catch (InvocationTargetException exc) {
+        if (exc.getCause() instanceof RuntimeException) {
+          throw (RuntimeException) exc.getCause();
+        } else {
+          throw new RuntimeException(exc.getCause());
+        }
+      } catch (IllegalAccessException exc) {
+        throw new IllegalStateException(
+            String.format("Internal error: could not invoke %s", evaluationMethod), exc);
+      }
+    }
+
+    private Method getEvaluationMethod(Class<?> clazz) {
+      Method evaluationMethod;
+      try {
+        return getClass().getDeclaredMethod("evaluateSpecific", clazz);
+      } catch (NoSuchMethodException exc) {
+        throw new UnsupportedOperationException(
+            String.format(
+                "Cannot translate trigger class %s to a state machine.", clazz.getCanonicalName()),
+            exc);
+      }
+    }
+
+    private TriggerStateMachine evaluateSpecific(DefaultTrigger v) {
+      return DefaultTriggerStateMachine.of();
+    }
+
+    private OnceTriggerStateMachine evaluateSpecific(AfterWatermark.FromEndOfWindow v) {
+      return AfterWatermarkStateMachine.pastEndOfWindow();
+    }
+
+    private OnceTriggerStateMachine evaluateSpecific(NeverTrigger v) {
+      return NeverStateMachine.ever();
+    }
+
+    private OnceTriggerStateMachine evaluateSpecific(AfterSynchronizedProcessingTime v) {
+      return new AfterSynchronizedProcessingTimeStateMachine();
+    }
+
+    private OnceTriggerStateMachine evaluateSpecific(AfterFirst v) {
+      List<OnceTriggerStateMachine> subStateMachines =
+          Lists.newArrayListWithCapacity(v.subTriggers().size());
+      for (Trigger subtrigger : v.subTriggers()) {
+        subStateMachines.add(stateMachineForOnceTrigger((OnceTrigger) subtrigger));
+      }
+      return AfterFirstStateMachine.of(subStateMachines);
+    }
+
+    private OnceTriggerStateMachine evaluateSpecific(AfterAll v) {
+      List<OnceTriggerStateMachine> subStateMachines =
+          Lists.newArrayListWithCapacity(v.subTriggers().size());
+      for (Trigger subtrigger : v.subTriggers()) {
+        subStateMachines.add(stateMachineForOnceTrigger((OnceTrigger) subtrigger));
+      }
+      return AfterAllStateMachine.of(subStateMachines);
+    }
+
+    private OnceTriggerStateMachine evaluateSpecific(AfterPane v) {
+      return AfterPaneStateMachine.elementCountAtLeast(v.getElementCount());
+    }
+
+    private TriggerStateMachine evaluateSpecific(AfterWatermark.AfterWatermarkEarlyAndLate v) {
+      AfterWatermarkStateMachine.AfterWatermarkEarlyAndLate machine =
+          AfterWatermarkStateMachine.pastEndOfWindow()
+              .withEarlyFirings(stateMachineForOnceTrigger(v.getEarlyTrigger()));
+
+      if (v.getLateTrigger() != null) {
+        machine = machine.withLateFirings(stateMachineForOnceTrigger(v.getLateTrigger()));
+      }
+      return machine;
+    }
+
+    private TriggerStateMachine evaluateSpecific(AfterEach v) {
+      List<TriggerStateMachine> subStateMachines =
+          Lists.newArrayListWithCapacity(v.subTriggers().size());
+
+      for (Trigger subtrigger : v.subTriggers()) {
+        subStateMachines.add(stateMachineForTrigger(subtrigger));
+      }
+
+      return AfterEachStateMachine.inOrder(subStateMachines);
+    }
+
+    private TriggerStateMachine evaluateSpecific(Repeatedly v) {
+      return RepeatedlyStateMachine.forever(stateMachineForTrigger(v.getRepeatedTrigger()));
+    }
+
+    private TriggerStateMachine evaluateSpecific(OrFinallyTrigger v) {
+      return new OrFinallyStateMachine(
+          stateMachineForTrigger(v.getMainTrigger()),
+          stateMachineForOnceTrigger(v.getUntilTrigger()));
+    }
+
+    private OnceTriggerStateMachine evaluateSpecific(AfterProcessingTime v) {
+      return evaluateSpecific((AfterDelayFromFirstElement) v);
+    }
+
+    private OnceTriggerStateMachine evaluateSpecific(final AfterDelayFromFirstElement v) {
+      return new AfterDelayFromFirstElementStateMachineAdapter(v);
+    }
+
+    private static class AfterDelayFromFirstElementStateMachineAdapter
+        extends AfterDelayFromFirstElementStateMachine {
+
+      public AfterDelayFromFirstElementStateMachineAdapter(AfterDelayFromFirstElement v) {
+        this(v.getTimeDomain(), v.getTimestampMappers());
+      }
+
+      private AfterDelayFromFirstElementStateMachineAdapter(
+          TimeDomain timeDomain, List<SerializableFunction<Instant, Instant>> timestampMappers) {
+        super(timeDomain, timestampMappers);
+      }
+
+      @Override
+      public Instant getCurrentTime(TriggerContext context) {
+        switch (timeDomain) {
+          case PROCESSING_TIME:
+            return context.currentProcessingTime();
+          case SYNCHRONIZED_PROCESSING_TIME:
+            return context.currentSynchronizedProcessingTime();
+          case EVENT_TIME:
+            return context.currentEventTime();
+          default:
+            throw new IllegalArgumentException("A time domain that doesn't exist was received!");
+        }
+      }
+
+      @Override
+      protected AfterDelayFromFirstElementStateMachine newWith(
+          List<SerializableFunction<Instant, Instant>> transform) {
+        return new AfterDelayFromFirstElementStateMachineAdapter(timeDomain, transform);
+      }
+    }
+  }
+}
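
The converter in the file above selects an evaluateSpecific overload by looking it up reflectively with the runtime class of the trigger. The sketch below isolates that dispatch pattern under simplified assumptions: Shape, Circle, Square, Triangle, Converter and convertSpecific are hypothetical names made up for this example, and the error handling only approximates what the commit does.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;

public class ReflectiveDispatchSketch {

  // Hypothetical input hierarchy; Triangle deliberately has no overload.
  static class Shape {}
  static class Circle extends Shape {}
  static class Square extends Shape {}
  static class Triangle extends Shape {}

  static class Converter {

    /** Picks the convertSpecific overload whose parameter type is the runtime class. */
    String convert(Shape shape) {
      Method method;
      try {
        // getDeclaredMethod matches parameter types exactly, so a subclass of
        // Circle would need its own overload (or an overload that delegates).
        method = getClass().getDeclaredMethod("convertSpecific", shape.getClass());
      } catch (NoSuchMethodException exc) {
        throw new UnsupportedOperationException(
            "Cannot convert " + shape.getClass().getName(), exc);
      }
      try {
        return (String) method.invoke(this, shape);
      } catch (InvocationTargetException exc) {
        // Unwrap so callers see the exception thrown by the overload itself.
        Throwable cause = exc.getCause();
        if (cause instanceof RuntimeException) {
          throw (RuntimeException) cause;
        }
        throw new RuntimeException(cause);
      } catch (IllegalAccessException exc) {
        throw new IllegalStateException("Could not invoke " + method, exc);
      }
    }

    private String convertSpecific(Circle circle) {
      return "circle";
    }

    private String convertSpecific(Square square) {
      return "square";
    }
  }

  public static void main(String[] args) {
    Converter converter = new Converter();
    System.out.println(converter.convert(new Circle())); // prints "circle"
    System.out.println(converter.convert(new Square())); // prints "square"
  }
}
```

Because the lookup requires an exact parameter-type match, a class with no overload of its own fails with UnsupportedOperationException even if a superclass overload exists. That may be why the converter above declares a separate evaluateSpecific(AfterProcessingTime) that simply delegates to the AfterDelayFromFirstElement overload.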

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/00672961/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachinesTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachinesTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachinesTest.java
new file mode 100644
index 0000000..37f8f10
--- /dev/null
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachinesTest.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core.triggers;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+import org.apache.beam.runners.core.triggers.TriggerStateMachine.OnceTriggerStateMachine;
+import org.apache.beam.sdk.transforms.windowing.AfterAll;
+import org.apache.beam.sdk.transforms.windowing.AfterDelayFromFirstElement;
+import org.apache.beam.sdk.transforms.windowing.AfterEach;
+import org.apache.beam.sdk.transforms.windowing.AfterFirst;
+import org.apache.beam.sdk.transforms.windowing.AfterPane;
+import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
+import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
+import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
+import org.apache.beam.sdk.transforms.windowing.Never;
+import org.apache.beam.sdk.transforms.windowing.Never.NeverTrigger;
+import org.apache.beam.sdk.transforms.windowing.OrFinallyTrigger;
+import org.apache.beam.sdk.transforms.windowing.Repeatedly;
+import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger;
+import org.apache.beam.sdk.util.TimeDomain;
+import org.joda.time.Duration;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests the {@link TriggerStateMachines} static utility methods. */
+@RunWith(JUnit4.class)
+public class TriggerStateMachinesTest {
+
+  //
+  // Tests for leaf trigger translation
+  //
+
+  @Test
+  public void testStateMachineForAfterPane() {
+    int count = 37;
+    AfterPane trigger = AfterPane.elementCountAtLeast(count);
+    AfterPaneStateMachine machine =
+        (AfterPaneStateMachine) TriggerStateMachines.stateMachineForOnceTrigger(trigger);
+
+    assertThat(machine.getElementCount(), equalTo(trigger.getElementCount()));
+  }
+
+  @Test
+  public void testStateMachineForAfterProcessingTime() {
+    Duration minutes = Duration.standardMinutes(94);
+    Duration hours = Duration.standardHours(13);
+
+    AfterDelayFromFirstElement trigger =
+        AfterProcessingTime.pastFirstElementInPane().plusDelayOf(minutes).alignedTo(hours);
+
+    AfterDelayFromFirstElementStateMachine machine =
+        (AfterDelayFromFirstElementStateMachine)
+            TriggerStateMachines.stateMachineForOnceTrigger(trigger);
+
+    assertThat(machine.getTimeDomain(), equalTo(TimeDomain.PROCESSING_TIME));
+
+    // This equality is function equality, but due to the structure of the code (no serialization)
+    // it is OK to check
+    assertThat(machine.getTimestampMappers(), equalTo(trigger.getTimestampMappers()));
+  }
+
+  @Test
+  public void testStateMachineForAfterWatermark() {
+    AfterWatermark.FromEndOfWindow trigger = AfterWatermark.pastEndOfWindow();
+    AfterWatermarkStateMachine.FromEndOfWindow machine =
+        (AfterWatermarkStateMachine.FromEndOfWindow)
+            TriggerStateMachines.stateMachineForOnceTrigger(trigger);
+    // No parameters, so if it doesn't crash, we win!
+  }
+
+  @Test
+  public void testDefaultTriggerTranslation() {
+    DefaultTrigger trigger = DefaultTrigger.of();
+    DefaultTriggerStateMachine machine =
+        (DefaultTriggerStateMachine)
+            checkNotNull(TriggerStateMachines.stateMachineForTrigger(trigger));
+    // No parameters, so if it doesn't crash, we win!
+  }
+
+  @Test
+  public void testNeverTranslation() {
+    NeverTrigger trigger = Never.ever();
+    NeverStateMachine machine =
+        (NeverStateMachine) checkNotNull(TriggerStateMachines.stateMachineForTrigger(trigger));
+    // No parameters, so if it doesn't crash, we win!
+  }
+
+  //
+  // Tests for composite trigger translation
+  //
+  // These check just that translation was invoked recursively using somewhat random
+  // leaf subtriggers; by induction it all holds together. Beyond this, explicit tests
+  // of particular triggers will suffice.
+
+  private static final int ELEM_COUNT = 472;
+  private static final Duration DELAY = Duration.standardSeconds(95673);
+
+  private final OnceTrigger subtrigger1 = AfterPane.elementCountAtLeast(ELEM_COUNT);
+  private final OnceTrigger subtrigger2 =
+      AfterProcessingTime.pastFirstElementInPane().plusDelayOf(DELAY);
+
+  private final OnceTriggerStateMachine submachine1 =
+      TriggerStateMachines.stateMachineForOnceTrigger(subtrigger1);
+  private final OnceTriggerStateMachine submachine2 =
+      TriggerStateMachines.stateMachineForOnceTrigger(subtrigger2);
+
+  @Test
+  public void testAfterEachTranslation() {
+    AfterEach trigger = AfterEach.inOrder(subtrigger1, subtrigger2);
+    AfterEachStateMachine machine =
+        (AfterEachStateMachine) TriggerStateMachines.stateMachineForTrigger(trigger);
+
+    assertThat(machine, equalTo(AfterEachStateMachine.inOrder(submachine1, submachine2)));
+  }
+
+  @Test
+  public void testAfterFirstTranslation() {
+    AfterFirst trigger = AfterFirst.of(subtrigger1, subtrigger2);
+    AfterFirstStateMachine machine =
+        (AfterFirstStateMachine) TriggerStateMachines.stateMachineForTrigger(trigger);
+
+    assertThat(machine, equalTo(AfterFirstStateMachine.of(submachine1, submachine2)));
+  }
+
+  @Test
+  public void testAfterAllTranslation() {
+    AfterAll trigger = AfterAll.of(subtrigger1, subtrigger2);
+    AfterAllStateMachine machine =
+        (AfterAllStateMachine) TriggerStateMachines.stateMachineForTrigger(trigger);
+
+    assertThat(machine, equalTo(AfterAllStateMachine.of(submachine1, submachine2)));
+  }
+
+  @Test
+  public void testAfterWatermarkEarlyTranslation() {
+    AfterWatermark.AfterWatermarkEarlyAndLate trigger =
+        AfterWatermark.pastEndOfWindow().withEarlyFirings(subtrigger1);
+    AfterWatermarkStateMachine.AfterWatermarkEarlyAndLate machine =
+        (AfterWatermarkStateMachine.AfterWatermarkEarlyAndLate)
+            TriggerStateMachines.stateMachineForTrigger(trigger);
+
+    assertThat(
+        machine,
+        equalTo(AfterWatermarkStateMachine.pastEndOfWindow().withEarlyFirings(submachine1)));
+  }
+
+  @Test
+  public void testAfterWatermarkEarlyLateTranslation() {
+    AfterWatermark.AfterWatermarkEarlyAndLate trigger =
+        AfterWatermark.pastEndOfWindow().withEarlyFirings(subtrigger1).withLateFirings(subtrigger2);
+    AfterWatermarkStateMachine.AfterWatermarkEarlyAndLate machine =
+        (AfterWatermarkStateMachine.AfterWatermarkEarlyAndLate)
+            TriggerStateMachines.stateMachineForTrigger(trigger);
+
+    assertThat(
+        machine,
+        equalTo(
+            AfterWatermarkStateMachine.pastEndOfWindow()
+                .withEarlyFirings(submachine1)
+                .withLateFirings(submachine2)));
+  }
+
+  @Test
+  public void testOrFinallyTranslation() {
+    OrFinallyTrigger trigger = subtrigger1.orFinally(subtrigger2);
+    OrFinallyStateMachine machine =
+        (OrFinallyStateMachine) TriggerStateMachines.stateMachineForTrigger(trigger);
+
+    assertThat(machine, equalTo(submachine1.orFinally(submachine2)));
+  }
+
+  @Test
+  public void testRepeatedlyTranslation() {
+    Repeatedly trigger = Repeatedly.forever(subtrigger1);
+    RepeatedlyStateMachine machine =
+        (RepeatedlyStateMachine) TriggerStateMachines.stateMachineForTrigger(trigger);
+
+    assertThat(machine, equalTo(RepeatedlyStateMachine.forever(submachine1)));
+  }
+}


[11/18] incubator-beam git commit: Add accessors for AfterPane(StateMachine) parameters

Posted by ke...@apache.org.
Add accessors for AfterPane(StateMachine) parameters


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/e0c57664
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/e0c57664
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/e0c57664

Branch: refs/heads/master
Commit: e0c576649382ea4d1d70ee9e54ff25018210dcfb
Parents: b4445ac
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Oct 13 22:14:32 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Mon Oct 17 19:56:36 2016 -0700

----------------------------------------------------------------------
 .../beam/runners/core/triggers/AfterPaneStateMachine.java     | 7 +++++++
 .../org/apache/beam/sdk/transforms/windowing/AfterPane.java   | 7 +++++++
 2 files changed, 14 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e0c57664/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterPaneStateMachine.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterPaneStateMachine.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterPaneStateMachine.java
index 723aba6..288643d 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterPaneStateMachine.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterPaneStateMachine.java
@@ -49,6 +49,13 @@ private static final StateTag<Object, AccumulatorCombiningState<Long, long[], Lo
   }
 
   /**
+   * The number of elements after which this trigger may fire.
+   */
+  public int getElementCount() {
+    return countElems;
+  }
+
+  /**
    * Creates a trigger that fires when the pane contains at least {@code countElems} elements.
    */
   public static AfterPaneStateMachine elementCountAtLeast(int countElems) {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e0c57664/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterPane.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterPane.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterPane.java
index 8c128dd..4d59d58 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterPane.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterPane.java
@@ -51,6 +51,13 @@ private static final StateTag<Object, AccumulatorCombiningState<Long, long[], Lo
   }
 
   /**
+   * The number of elements after which this trigger may fire.
+   */
+  public int getElementCount() {
+    return countElems;
+  }
+
+  /**
    * Creates a trigger that fires when the pane contains at least {@code countElems} elements.
    */
   public static AfterPane elementCountAtLeast(int countElems) {


[08/18] incubator-beam git commit: Construct AfterFirstStateMachine with a list of subtriggers

Posted by ke...@apache.org.
Construct AfterFirstStateMachine with a list of subtriggers


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/77332f1e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/77332f1e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/77332f1e

Branch: refs/heads/master
Commit: 77332f1e612caf9090e148e1493c11ca8e753076
Parents: b2bb7c0
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Oct 13 20:43:08 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Mon Oct 17 19:56:35 2016 -0700

----------------------------------------------------------------------
 .../beam/runners/core/triggers/AfterFirstStateMachine.java     | 6 ++++++
 1 file changed, 6 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/77332f1e/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterFirstStateMachine.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterFirstStateMachine.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterFirstStateMachine.java
index 272c278..f4b305e 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterFirstStateMachine.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterFirstStateMachine.java
@@ -20,6 +20,7 @@ package org.apache.beam.runners.core.triggers;
 import static com.google.common.base.Preconditions.checkArgument;
 
 import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableList;
 import java.util.Arrays;
 import java.util.List;
 import org.apache.beam.runners.core.triggers.TriggerStateMachine.OnceTriggerStateMachine;
@@ -46,6 +47,11 @@ public class AfterFirstStateMachine extends OnceTriggerStateMachine {
     return new AfterFirstStateMachine(Arrays.<TriggerStateMachine>asList(triggers));
   }
 
+  public static OnceTriggerStateMachine of(
+      Iterable<? extends TriggerStateMachine> triggers) {
+    return new AfterFirstStateMachine(ImmutableList.copyOf(triggers));
+  }
+
   @Override
   public void onElement(OnElementContext c) throws Exception {
     for (ExecutableTriggerStateMachine subTrigger : c.trigger().subTriggers()) {


[05/18] incubator-beam git commit: Construct AfterEachStateMachine from list of subtriggers

Posted by ke...@apache.org.
Construct AfterEachStateMachine from list of subtriggers


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/f46ce0db
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/f46ce0db
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/f46ce0db

Branch: refs/heads/master
Commit: f46ce0db372ffb66915eda22092bd760e474b9c0
Parents: 8afb80e
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Oct 13 20:56:54 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Mon Oct 17 19:56:35 2016 -0700

----------------------------------------------------------------------
 .../beam/runners/core/triggers/AfterEachStateMachine.java       | 5 +++++
 1 file changed, 5 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f46ce0db/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterEachStateMachine.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterEachStateMachine.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterEachStateMachine.java
index 140ac75..38357d4 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterEachStateMachine.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterEachStateMachine.java
@@ -20,6 +20,7 @@ package org.apache.beam.runners.core.triggers;
 import static com.google.common.base.Preconditions.checkArgument;
 
 import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableList;
 import java.util.Arrays;
 import java.util.List;
 
@@ -54,6 +55,10 @@ public class AfterEachStateMachine extends TriggerStateMachine {
     return new AfterEachStateMachine(Arrays.<TriggerStateMachine>asList(triggers));
   }
 
+  public static TriggerStateMachine inOrder(Iterable<? extends TriggerStateMachine> triggers) {
+    return new AfterEachStateMachine(ImmutableList.copyOf(triggers));
+  }
+
   @Override
   public void onElement(OnElementContext c) throws Exception {
     if (!c.trigger().isMerging()) {


[15/18] incubator-beam git commit: Make NeverTrigger public for translation

Posted by ke...@apache.org.
Make NeverTrigger public for translation


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/703c84ef
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/703c84ef
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/703c84ef

Branch: refs/heads/master
Commit: 703c84efd9c8219b2a8a205f0b6f1f9a999e927c
Parents: fa9b381
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Oct 13 22:17:26 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Mon Oct 17 19:56:36 2016 -0700

----------------------------------------------------------------------
 .../java/org/apache/beam/sdk/transforms/windowing/Never.java   | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/703c84ef/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Never.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Never.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Never.java
index 353258b..07b70f4 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Never.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Never.java
@@ -39,8 +39,10 @@ public final class Never {
     return new NeverTrigger();
   }
 
-  // package-private in order to check identity for string formatting.
-  static class NeverTrigger extends OnceTrigger {
+  /**
+   * The actual trigger class for {@link Never} triggers.
+   */
+  public static class NeverTrigger extends OnceTrigger {
     protected NeverTrigger() {
       super(null);
     }


[04/18] incubator-beam git commit: Make AfterSynchronizedProcessingTime public

Posted by ke...@apache.org.
Make AfterSynchronizedProcessingTime public

We need to be able to access this class to reason about it when
converting a trigger to a state machine.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/b19918df
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/b19918df
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/b19918df

Branch: refs/heads/master
Commit: b19918df1992d445ad8c13a63722c690ddca3899
Parents: 77332f1
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Oct 13 20:43:33 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Mon Oct 17 19:56:35 2016 -0700

----------------------------------------------------------------------
 .../transforms/windowing/AfterSynchronizedProcessingTime.java  | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b19918df/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterSynchronizedProcessingTime.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterSynchronizedProcessingTime.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterSynchronizedProcessingTime.java
index 59ece10..b96b293 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterSynchronizedProcessingTime.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterSynchronizedProcessingTime.java
@@ -25,7 +25,11 @@ import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.util.TimeDomain;
 import org.joda.time.Instant;
 
-class AfterSynchronizedProcessingTime extends AfterDelayFromFirstElement {
+/**
+ * A trigger that fires after synchronized processing time has reached a shared
+ * threshold between upstream workers.
+ */
+public class AfterSynchronizedProcessingTime extends AfterDelayFromFirstElement {
 
   @Override
   @Nullable


[02/18] incubator-beam git commit: Add accessors to Repeatedly trigger

Posted by ke...@apache.org.
Add accessors to Repeatedly trigger


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/303a42ab
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/303a42ab
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/303a42ab

Branch: refs/heads/master
Commit: 303a42ab993aee70527272dd908c3568a29f1e27
Parents: f46ce0d
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Oct 13 20:57:48 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Mon Oct 17 19:56:35 2016 -0700

----------------------------------------------------------------------
 .../apache/beam/sdk/transforms/windowing/Repeatedly.java  | 10 ++++++++--
 1 file changed, 8 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/303a42ab/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Repeatedly.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Repeatedly.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Repeatedly.java
index 8858798..45bc6c1 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Repeatedly.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Repeatedly.java
@@ -49,10 +49,16 @@ public class Repeatedly extends Trigger {
     return new Repeatedly(repeated);
   }
 
-  private Repeatedly(Trigger repeated) {
-    super(Arrays.asList(repeated));
+  private Trigger repeatedTrigger;
+
+  private Repeatedly(Trigger repeatedTrigger) {
+    super(Arrays.asList(repeatedTrigger));
+    this.repeatedTrigger = repeatedTrigger;
   }
 
+  public Trigger getRepeatedTrigger() {
+    return repeatedTrigger;
+  }
 
   @Override
   public void onElement(OnElementContext c) throws Exception {


[14/18] incubator-beam git commit: Touch up javadoc for AfterDelayFromFirstElementStateMachine

Posted by ke...@apache.org.
Touch up javadoc for AfterDelayFromFirstElementStateMachine


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/2107f796
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/2107f796
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/2107f796

Branch: refs/heads/master
Commit: 2107f7961e863a34c71a3fcaa4dd900d6394ed05
Parents: 476dcd7
Author: Kenneth Knowles <kl...@google.com>
Authored: Mon Oct 17 19:54:12 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Mon Oct 17 19:56:36 2016 -0700

----------------------------------------------------------------------
 .../core/triggers/AfterDelayFromFirstElementStateMachine.java      | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2107f796/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java
index 02b156b..d9d2c42 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java
@@ -97,7 +97,7 @@ public abstract class AfterDelayFromFirstElementStateMachine extends OnceTrigger
   }
 
   /**
-   * The time domain according for which this trigger sets timers.
+   * The time domain according to which this trigger sets timers.
    */
   public TimeDomain getTimeDomain() {
     return timeDomain;


[16/18] incubator-beam git commit: Make return types more precise for AfterWatermarkTriggerStateMachine

Posted by ke...@apache.org.
Make return types more precise for AfterWatermarkTriggerStateMachine


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/476dcd74
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/476dcd74
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/476dcd74

Branch: refs/heads/master
Commit: 476dcd740c97831a5d55953014fc5d99135addce
Parents: 703c84e
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Oct 13 22:46:08 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Mon Oct 17 19:56:36 2016 -0700

----------------------------------------------------------------------
 .../beam/runners/core/triggers/AfterWatermarkStateMachine.java   | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/476dcd74/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterWatermarkStateMachine.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterWatermarkStateMachine.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterWatermarkStateMachine.java
index 5ad6214..524c057 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterWatermarkStateMachine.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterWatermarkStateMachine.java
@@ -93,11 +93,11 @@ public class AfterWatermarkStateMachine {
       this.lateTrigger = lateTrigger;
     }
 
-    public TriggerStateMachine withEarlyFirings(OnceTriggerStateMachine earlyTrigger) {
+    public AfterWatermarkEarlyAndLate withEarlyFirings(OnceTriggerStateMachine earlyTrigger) {
       return new AfterWatermarkEarlyAndLate(earlyTrigger, lateTrigger);
     }
 
-    public TriggerStateMachine withLateFirings(OnceTriggerStateMachine lateTrigger) {
+    public AfterWatermarkEarlyAndLate withLateFirings(OnceTriggerStateMachine lateTrigger) {
       return new AfterWatermarkEarlyAndLate(earlyTrigger, lateTrigger);
     }
 


[09/18] incubator-beam git commit: Make Trigger#subTriggers public and non-null

Posted by ke...@apache.org.
Make Trigger#subTriggers public and non-null


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/87c7811a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/87c7811a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/87c7811a

Branch: refs/heads/master
Commit: 87c7811a3bab6bf4bc0b8b9181127fe074579898
Parents: 3c73170
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Oct 13 20:08:53 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Mon Oct 17 19:56:35 2016 -0700

----------------------------------------------------------------------
 .../beam/sdk/transforms/windowing/DefaultTrigger.java       | 2 +-
 .../org/apache/beam/sdk/transforms/windowing/Trigger.java   | 9 ++++++---
 .../java/org/apache/beam/sdk/util/ReshuffleTrigger.java     | 2 +-
 3 files changed, 8 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/87c7811a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/DefaultTrigger.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/DefaultTrigger.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/DefaultTrigger.java
index d6b72ef..fee7cdf 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/DefaultTrigger.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/DefaultTrigger.java
@@ -30,7 +30,7 @@ import org.joda.time.Instant;
 public class DefaultTrigger extends Trigger{
 
   private DefaultTrigger() {
-    super(null);
+    super();
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/87c7811a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Trigger.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Trigger.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Trigger.java
index a960aa4..cfabb8b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Trigger.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Trigger.java
@@ -20,6 +20,7 @@ package org.apache.beam.sdk.transforms.windowing;
 import com.google.common.base.Joiner;
 import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
 import javax.annotation.Nullable;
@@ -263,13 +264,15 @@ public abstract class Trigger implements Serializable {
     public abstract MergingTriggerInfo trigger();
   }
 
-  @Nullable
   protected final List<Trigger> subTriggers;
 
-  protected Trigger(@Nullable List<Trigger> subTriggers) {
+  protected Trigger(List<Trigger> subTriggers) {
     this.subTriggers = subTriggers;
   }
 
+  protected Trigger() {
+    this(Collections.EMPTY_LIST);
+  }
 
   /**
    * Called every time an element is incorporated into a window.
@@ -370,7 +373,7 @@ public abstract class Trigger implements Serializable {
     }
   }
 
-  public Iterable<Trigger> subTriggers() {
+  public List<Trigger> subTriggers() {
     return subTriggers;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/87c7811a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReshuffleTrigger.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReshuffleTrigger.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReshuffleTrigger.java
index 9e2c27d..437f14a 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReshuffleTrigger.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReshuffleTrigger.java
@@ -31,7 +31,7 @@ import org.joda.time.Instant;
 public class ReshuffleTrigger<W extends BoundedWindow> extends Trigger {
 
   public ReshuffleTrigger() {
-    super(null);
+    super();
   }
 
   @Override


[07/18] incubator-beam git commit: Make OrFinallyTrigger public so it can be examined

Posted by ke...@apache.org.
Make OrFinallyTrigger public so it can be examined


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/2e565172
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/2e565172
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/2e565172

Branch: refs/heads/master
Commit: 2e565172d74bc906bdbdb1c4ee986c9e3d65089f
Parents: 303a42a
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Oct 13 20:58:12 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Mon Oct 17 19:56:35 2016 -0700

----------------------------------------------------------------------
 .../org/apache/beam/sdk/transforms/windowing/OrFinallyTrigger.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2e565172/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/OrFinallyTrigger.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/OrFinallyTrigger.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/OrFinallyTrigger.java
index 1a03450..9bef45a 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/OrFinallyTrigger.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/OrFinallyTrigger.java
@@ -26,7 +26,7 @@ import org.joda.time.Instant;
 /**
  * Executes the {@code actual} trigger until it finishes or until the {@code until} trigger fires.
  */
-class OrFinallyTrigger extends Trigger {
+public class OrFinallyTrigger extends Trigger {
 
   private static final int ACTUAL = 0;
   private static final int UNTIL = 1;


[06/18] incubator-beam git commit: Add accessors to AfterWatermark trigger

Posted by ke...@apache.org.
Add accessors to AfterWatermark trigger


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/0df929f9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/0df929f9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/0df929f9

Branch: refs/heads/master
Commit: 0df929f984c2f5af3d19a4c54635c345ccd5b410
Parents: 2e56517
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu Oct 13 21:13:27 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Mon Oct 17 19:56:35 2016 -0700

----------------------------------------------------------------------
 .../apache/beam/sdk/transforms/windowing/AfterWatermark.java | 8 ++++++++
 1 file changed, 8 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0df929f9/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterWatermark.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterWatermark.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterWatermark.java
index e2463d8..da96de3 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterWatermark.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterWatermark.java
@@ -85,6 +85,14 @@ public class AfterWatermark {
     @Nullable
     private final OnceTrigger lateTrigger;
 
+    public OnceTrigger getEarlyTrigger() {
+      return earlyTrigger;
+    }
+
+    public OnceTrigger getLateTrigger() {
+      return lateTrigger;
+    }
+
     @SuppressWarnings("unchecked")
     public AfterWatermarkEarlyAndLate(OnceTrigger earlyTrigger, OnceTrigger lateTrigger) {
       super(lateTrigger == null