You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2016/10/18 18:08:04 UTC

[1/2] incubator-beam git commit: Restore trigger-related tests missed in #1083

Repository: incubator-beam
Updated Branches:
  refs/heads/master 71c69b31b -> 6d686288e


Restore trigger-related tests missed in #1083


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/8d43e8aa
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/8d43e8aa
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/8d43e8aa

Branch: refs/heads/master
Commit: 8d43e8aa7ccb154e17d6840c25c7a72684c615aa
Parents: 71c69b3
Author: Kenneth Knowles <kl...@google.com>
Authored: Tue Oct 18 10:11:37 2016 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Tue Oct 18 11:00:47 2016 -0700

----------------------------------------------------------------------
 .../beam/sdk/util/ExecutableTriggerTest.java    | 127 +++++++++++++++++++
 .../sdk/util/FinishedTriggersBitSetTest.java    |  55 ++++++++
 .../sdk/util/FinishedTriggersProperties.java    | 110 ++++++++++++++++
 .../beam/sdk/util/FinishedTriggersSetTest.java  |  60 +++++++++
 4 files changed, 352 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8d43e8aa/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ExecutableTriggerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ExecutableTriggerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ExecutableTriggerTest.java
new file mode 100644
index 0000000..1e3a1ff
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ExecutableTriggerTest.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+
+import java.util.Arrays;
+import java.util.List;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.Trigger;
+import org.joda.time.Instant;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link ExecutableTrigger}.
+ */
+@RunWith(JUnit4.class)
+public class ExecutableTriggerTest {
+
+  @Test
+  public void testIndexAssignmentLeaf() throws Exception {
+    StubTrigger t1 = new StubTrigger();
+    ExecutableTrigger executable = ExecutableTrigger.create(t1);
+    assertEquals(0, executable.getTriggerIndex());
+  }
+
+  @Test
+  public void testIndexAssignmentOneLevel() throws Exception {
+    StubTrigger t1 = new StubTrigger();
+    StubTrigger t2 = new StubTrigger();
+    StubTrigger t = new StubTrigger(t1, t2);
+
+    ExecutableTrigger executable = ExecutableTrigger.create(t);
+
+    assertEquals(0, executable.getTriggerIndex());
+    assertEquals(1, executable.subTriggers().get(0).getTriggerIndex());
+    assertSame(t1, executable.subTriggers().get(0).getSpec());
+    assertEquals(2, executable.subTriggers().get(1).getTriggerIndex());
+    assertSame(t2, executable.subTriggers().get(1).getSpec());
+  }
+
+  @Test
+  public void testIndexAssignmentTwoLevel() throws Exception {
+    StubTrigger t11 = new StubTrigger();
+    StubTrigger t12 = new StubTrigger();
+    StubTrigger t13 = new StubTrigger();
+    StubTrigger t14 = new StubTrigger();
+    StubTrigger t21 = new StubTrigger();
+    StubTrigger t22 = new StubTrigger();
+    StubTrigger t1 = new StubTrigger(t11, t12, t13, t14);
+    StubTrigger t2 = new StubTrigger(t21, t22);
+    StubTrigger t = new StubTrigger(t1, t2);
+
+    ExecutableTrigger executable = ExecutableTrigger.create(t);
+
+    assertEquals(0, executable.getTriggerIndex());
+    assertEquals(1, executable.subTriggers().get(0).getTriggerIndex());
+    assertEquals(6, executable.subTriggers().get(0).getFirstIndexAfterSubtree());
+    assertEquals(6, executable.subTriggers().get(1).getTriggerIndex());
+
+    assertSame(t1, executable.getSubTriggerContaining(1).getSpec());
+    assertSame(t2, executable.getSubTriggerContaining(6).getSpec());
+    assertSame(t1, executable.getSubTriggerContaining(2).getSpec());
+    assertSame(t1, executable.getSubTriggerContaining(3).getSpec());
+    assertSame(t1, executable.getSubTriggerContaining(5).getSpec());
+    assertSame(t2, executable.getSubTriggerContaining(7).getSpec());
+  }
+
+  private static class StubTrigger extends Trigger {
+
+    @SafeVarargs
+    protected StubTrigger(Trigger... subTriggers) {
+      super(Arrays.asList(subTriggers));
+    }
+
+    @Override
+    public void onElement(OnElementContext c) throws Exception { }
+
+    @Override
+    public void onMerge(OnMergeContext c) throws Exception { }
+
+    @Override
+    public void clear(TriggerContext c) throws Exception {
+    }
+
+    @Override
+    public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) {
+      return BoundedWindow.TIMESTAMP_MAX_VALUE;
+    }
+
+    @Override
+    public boolean isCompatible(Trigger other) {
+      return false;
+    }
+
+    @Override
+    public Trigger getContinuationTrigger(List<Trigger> continuationTriggers) {
+      return this;
+    }
+
+    @Override
+    public boolean shouldFire(TriggerContext c) {
+      return false;
+    }
+
+    @Override
+    public void onFire(TriggerContext c) { }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8d43e8aa/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FinishedTriggersBitSetTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FinishedTriggersBitSetTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FinishedTriggersBitSetTest.java
new file mode 100644
index 0000000..7f74620
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FinishedTriggersBitSetTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import static org.hamcrest.Matchers.not;
+import static org.hamcrest.Matchers.theInstance;
+import static org.junit.Assert.assertThat;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Unit tests for {@link FinishedTriggersBitSet}.
+ */
+@RunWith(JUnit4.class)
+public class FinishedTriggersBitSetTest {
+  /** Tests that a trigger marked finished is subsequently reported as finished. */
+  @Test
+  public void testSetGet() {
+    FinishedTriggersBitSet finishedSet = FinishedTriggersBitSet.emptyWithCapacity(1);
+    FinishedTriggersProperties.verifyGetAfterSet(finishedSet);
+  }
+
+  /**
+   * Tests that recursively clearing a trigger resets it and its subTriggers, and nothing else.
+   */
+  @Test
+  public void testClearRecursively() {
+    FinishedTriggersBitSet finishedSet = FinishedTriggersBitSet.emptyWithCapacity(1);
+    FinishedTriggersProperties.verifyClearRecursively(finishedSet);
+  }
+
+  /** Tests that a copy holds its own bit set instance rather than sharing the original's. */
+  @Test
+  public void testCopy() throws Exception {
+    FinishedTriggersBitSet original = FinishedTriggersBitSet.emptyWithCapacity(10);
+    assertThat(original.copy().getBitSet(), not(theInstance(original.getBitSet())));
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8d43e8aa/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FinishedTriggersProperties.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FinishedTriggersProperties.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FinishedTriggersProperties.java
new file mode 100644
index 0000000..a66f74f
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FinishedTriggersProperties.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.beam.sdk.transforms.windowing.AfterAll;
+import org.apache.beam.sdk.transforms.windowing.AfterFirst;
+import org.apache.beam.sdk.transforms.windowing.AfterPane;
+import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
+import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
+
+/** Generalized tests for {@link FinishedTriggers} implementations. */
+public class FinishedTriggersProperties {
+  /**
+   * Tests that for the provided trigger and {@link FinishedTriggers}, when the trigger is set
+   * finished, it is correctly reported as finished. The trigger must start out unfinished.
+   */
+  public static void verifyGetAfterSet(FinishedTriggers finishedSet, ExecutableTrigger trigger) {
+    assertFalse(finishedSet.isFinished(trigger));
+    finishedSet.setFinished(trigger, true);
+    assertTrue(finishedSet.isFinished(trigger));
+  }
+
+  /**
+   * For a few arbitrary triggers, tests that when the trigger is set finished it is correctly
+   * reported as finished.
+   */
+  public static void verifyGetAfterSet(FinishedTriggers finishedSet) {
+    ExecutableTrigger trigger = createTwoLevelTrigger();
+
+    verifyGetAfterSet(finishedSet, trigger);
+    verifyGetAfterSet(finishedSet, trigger.subTriggers().get(0).subTriggers().get(1));
+    verifyGetAfterSet(finishedSet, trigger.subTriggers().get(0));
+    verifyGetAfterSet(finishedSet, trigger.subTriggers().get(1));
+    verifyGetAfterSet(finishedSet, trigger.subTriggers().get(1).subTriggers().get(1));
+    verifyGetAfterSet(finishedSet, trigger.subTriggers().get(1).subTriggers().get(0));
+  }
+
+  /**
+   * Tests that clearing a trigger recursively clears all of that trigger's subTriggers, but no
+   * others.
+   */
+  public static void verifyClearRecursively(FinishedTriggers finishedSet) {
+    ExecutableTrigger trigger = createTwoLevelTrigger();
+
+    // Set them all finished and confirm it for the whole tree, including the part cleared below.
+    setFinishedRecursively(finishedSet, trigger);
+    verifyFinishedRecursively(finishedSet, trigger);
+
+    // Clear just the second AfterAll
+    finishedSet.clearRecursively(trigger.subTriggers().get(1));
+
+    // Check that the root and the first subtree are still finished, and the cleared one is not
+    assertTrue(finishedSet.isFinished(trigger));
+    verifyFinishedRecursively(finishedSet, trigger.subTriggers().get(0));
+    verifyUnfinishedRecursively(finishedSet, trigger.subTriggers().get(1));
+  }
+
+  /** Builds the two-level trigger tree that the property checks in this class run against. */
+  private static ExecutableTrigger createTwoLevelTrigger() {
+    return ExecutableTrigger.create(AfterAll.of(
+        AfterFirst.of(AfterPane.elementCountAtLeast(3), AfterWatermark.pastEndOfWindow()),
+        AfterAll.of(
+            AfterPane.elementCountAtLeast(10), AfterProcessingTime.pastFirstElementInPane())));
+  }
+
+  /** Marks {@code trigger} and every trigger beneath it finished; only meaningful in tests. */
+  private static void setFinishedRecursively(
+      FinishedTriggers finishedSet, ExecutableTrigger trigger) {
+    finishedSet.setFinished(trigger, true);
+    for (ExecutableTrigger subTrigger : trigger.subTriggers()) {
+      setFinishedRecursively(finishedSet, subTrigger);
+    }
+  }
+
+  /** Asserts that {@code trigger} and every trigger beneath it is reported finished. */
+  private static void verifyFinishedRecursively(
+      FinishedTriggers finishedSet, ExecutableTrigger trigger) {
+    assertTrue(finishedSet.isFinished(trigger));
+    for (ExecutableTrigger subTrigger : trigger.subTriggers()) {
+      verifyFinishedRecursively(finishedSet, subTrigger);
+    }
+  }
+
+  /** Asserts that {@code trigger} and every trigger beneath it is reported unfinished. */
+  private static void verifyUnfinishedRecursively(
+      FinishedTriggers finishedSet, ExecutableTrigger trigger) {
+    assertFalse(finishedSet.isFinished(trigger));
+    for (ExecutableTrigger subTrigger : trigger.subTriggers()) {
+      verifyUnfinishedRecursively(finishedSet, subTrigger);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8d43e8aa/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FinishedTriggersSetTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FinishedTriggersSetTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FinishedTriggersSetTest.java
new file mode 100644
index 0000000..072d264
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FinishedTriggersSetTest.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import static org.hamcrest.Matchers.not;
+import static org.hamcrest.Matchers.theInstance;
+import static org.junit.Assert.assertThat;
+
+import java.util.HashSet;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Unit tests for {@link FinishedTriggersSet}.
+ */
+@RunWith(JUnit4.class)
+public class FinishedTriggersSetTest {
+  /** Tests that a trigger marked finished is subsequently reported as finished. */
+  @Test
+  public void testSetGet() {
+    FinishedTriggersSet finishedSet =
+        FinishedTriggersSet.fromSet(new HashSet<ExecutableTrigger>());
+    FinishedTriggersProperties.verifyGetAfterSet(finishedSet);
+  }
+
+  /**
+   * Tests that recursively clearing a trigger resets it and its subTriggers, and nothing else.
+   */
+  @Test
+  public void testClearRecursively() {
+    FinishedTriggersSet finishedSet =
+        FinishedTriggersSet.fromSet(new HashSet<ExecutableTrigger>());
+    FinishedTriggersProperties.verifyClearRecursively(finishedSet);
+  }
+
+  /** Tests that a copy holds its own set instance rather than sharing the original's. */
+  @Test
+  public void testCopy() throws Exception {
+    FinishedTriggersSet original =
+        FinishedTriggersSet.fromSet(new HashSet<ExecutableTrigger>());
+    assertThat(
+        original.copy().getFinishedTriggers(), not(theInstance(original.getFinishedTriggers())));
+  }
+}


[2/2] incubator-beam git commit: Restore trigger-related tests missed in #1083

Posted by lc...@apache.org.
Restore trigger-related tests missed in #1083

This closes #1127


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/6d686288
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/6d686288
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/6d686288

Branch: refs/heads/master
Commit: 6d686288efd5fd64d43ba9802314f2cbbc8df72e
Parents: 71c69b3 8d43e8a
Author: Luke Cwik <lc...@google.com>
Authored: Tue Oct 18 11:01:14 2016 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Tue Oct 18 11:01:14 2016 -0700

----------------------------------------------------------------------
 .../beam/sdk/util/ExecutableTriggerTest.java    | 127 +++++++++++++++++++
 .../sdk/util/FinishedTriggersBitSetTest.java    |  55 ++++++++
 .../sdk/util/FinishedTriggersProperties.java    | 110 ++++++++++++++++
 .../beam/sdk/util/FinishedTriggersSetTest.java  |  60 +++++++++
 4 files changed, 352 insertions(+)
----------------------------------------------------------------------