You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2016/10/18 18:08:04 UTC
[1/2] incubator-beam git commit: Restore trigger-related tests missed
in #1083
Repository: incubator-beam
Updated Branches:
refs/heads/master 71c69b31b -> 6d686288e
Restore trigger-related tests missed in #1083
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/8d43e8aa
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/8d43e8aa
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/8d43e8aa
Branch: refs/heads/master
Commit: 8d43e8aa7ccb154e17d6840c25c7a72684c615aa
Parents: 71c69b3
Author: Kenneth Knowles <kl...@google.com>
Authored: Tue Oct 18 10:11:37 2016 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Tue Oct 18 11:00:47 2016 -0700
----------------------------------------------------------------------
.../beam/sdk/util/ExecutableTriggerTest.java | 127 +++++++++++++++++++
.../sdk/util/FinishedTriggersBitSetTest.java | 55 ++++++++
.../sdk/util/FinishedTriggersProperties.java | 110 ++++++++++++++++
.../beam/sdk/util/FinishedTriggersSetTest.java | 60 +++++++++
4 files changed, 352 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8d43e8aa/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ExecutableTriggerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ExecutableTriggerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ExecutableTriggerTest.java
new file mode 100644
index 0000000..1e3a1ff
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ExecutableTriggerTest.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+
+import java.util.Arrays;
+import java.util.List;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.Trigger;
+import org.joda.time.Instant;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link ExecutableTrigger}.
+ */
+@RunWith(JUnit4.class)
+public class ExecutableTriggerTest {
+
+ @Test
+ public void testIndexAssignmentLeaf() throws Exception {
+ StubTrigger t1 = new StubTrigger();
+ ExecutableTrigger executable = ExecutableTrigger.create(t1);
+ assertEquals(0, executable.getTriggerIndex());
+ }
+
+ @Test
+ public void testIndexAssignmentOneLevel() throws Exception {
+ StubTrigger t1 = new StubTrigger();
+ StubTrigger t2 = new StubTrigger();
+ StubTrigger t = new StubTrigger(t1, t2);
+
+ ExecutableTrigger executable = ExecutableTrigger.create(t);
+
+ assertEquals(0, executable.getTriggerIndex());
+ assertEquals(1, executable.subTriggers().get(0).getTriggerIndex());
+ assertSame(t1, executable.subTriggers().get(0).getSpec());
+ assertEquals(2, executable.subTriggers().get(1).getTriggerIndex());
+ assertSame(t2, executable.subTriggers().get(1).getSpec());
+ }
+
+ @Test
+ public void testIndexAssignmentTwoLevel() throws Exception {
+ StubTrigger t11 = new StubTrigger();
+ StubTrigger t12 = new StubTrigger();
+ StubTrigger t13 = new StubTrigger();
+ StubTrigger t14 = new StubTrigger();
+ StubTrigger t21 = new StubTrigger();
+ StubTrigger t22 = new StubTrigger();
+ StubTrigger t1 = new StubTrigger(t11, t12, t13, t14);
+ StubTrigger t2 = new StubTrigger(t21, t22);
+ StubTrigger t = new StubTrigger(t1, t2);
+
+ ExecutableTrigger executable = ExecutableTrigger.create(t);
+
+ assertEquals(0, executable.getTriggerIndex());
+ assertEquals(1, executable.subTriggers().get(0).getTriggerIndex());
+ assertEquals(6, executable.subTriggers().get(0).getFirstIndexAfterSubtree());
+ assertEquals(6, executable.subTriggers().get(1).getTriggerIndex());
+
+ assertSame(t1, executable.getSubTriggerContaining(1).getSpec());
+ assertSame(t2, executable.getSubTriggerContaining(6).getSpec());
+ assertSame(t1, executable.getSubTriggerContaining(2).getSpec());
+ assertSame(t1, executable.getSubTriggerContaining(3).getSpec());
+ assertSame(t1, executable.getSubTriggerContaining(5).getSpec());
+ assertSame(t2, executable.getSubTriggerContaining(7).getSpec());
+ }
+
+ private static class StubTrigger extends Trigger {
+
+ @SafeVarargs
+ protected StubTrigger(Trigger... subTriggers) {
+ super(Arrays.asList(subTriggers));
+ }
+
+ @Override
+ public void onElement(OnElementContext c) throws Exception { }
+
+ @Override
+ public void onMerge(OnMergeContext c) throws Exception { }
+
+ @Override
+ public void clear(TriggerContext c) throws Exception {
+ }
+
+ @Override
+ public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) {
+ return BoundedWindow.TIMESTAMP_MAX_VALUE;
+ }
+
+ @Override
+ public boolean isCompatible(Trigger other) {
+ return false;
+ }
+
+ @Override
+ public Trigger getContinuationTrigger(List<Trigger> continuationTriggers) {
+ return this;
+ }
+
+ @Override
+ public boolean shouldFire(TriggerContext c) {
+ return false;
+ }
+
+ @Override
+ public void onFire(TriggerContext c) { }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8d43e8aa/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FinishedTriggersBitSetTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FinishedTriggersBitSetTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FinishedTriggersBitSetTest.java
new file mode 100644
index 0000000..7f74620
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FinishedTriggersBitSetTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import static org.hamcrest.Matchers.not;
+import static org.hamcrest.Matchers.theInstance;
+import static org.junit.Assert.assertThat;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link FinishedTriggersBitSet}.
+ */
+@RunWith(JUnit4.class)
+public class FinishedTriggersBitSetTest {
+ /**
+ * Tests that after a trigger is set to finished, it reads back as finished.
+ */
+ @Test
+ public void testSetGet() {
+ FinishedTriggersProperties.verifyGetAfterSet(FinishedTriggersBitSet.emptyWithCapacity(1));
+ }
+
+ /**
+ * Tests that clearing a trigger recursively clears all of that triggers subTriggers, but no
+ * others.
+ */
+ @Test
+ public void testClearRecursively() {
+ FinishedTriggersProperties.verifyClearRecursively(FinishedTriggersBitSet.emptyWithCapacity(1));
+ }
+
+ @Test
+ public void testCopy() throws Exception {
+ FinishedTriggersBitSet finishedSet = FinishedTriggersBitSet.emptyWithCapacity(10);
+ assertThat(finishedSet.copy().getBitSet(), not(theInstance(finishedSet.getBitSet())));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8d43e8aa/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FinishedTriggersProperties.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FinishedTriggersProperties.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FinishedTriggersProperties.java
new file mode 100644
index 0000000..a66f74f
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FinishedTriggersProperties.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.beam.sdk.transforms.windowing.AfterAll;
+import org.apache.beam.sdk.transforms.windowing.AfterFirst;
+import org.apache.beam.sdk.transforms.windowing.AfterPane;
+import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
+import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
+
+/**
+ * Generalized tests for {@link FinishedTriggers} implementations.
+ */
+public class FinishedTriggersProperties {
+ /**
+ * Tests that for the provided trigger and {@link FinishedTriggers}, when the trigger is set
+ * finished, it is correctly reported as finished.
+ */
+ public static void verifyGetAfterSet(FinishedTriggers finishedSet, ExecutableTrigger trigger) {
+ assertFalse(finishedSet.isFinished(trigger));
+ finishedSet.setFinished(trigger, true);
+ assertTrue(finishedSet.isFinished(trigger));
+ }
+
+ /**
+ * For a few arbitrary triggers, tests that when the trigger is set finished it is correctly
+ * reported as finished.
+ */
+ public static void verifyGetAfterSet(FinishedTriggers finishedSet) {
+ ExecutableTrigger trigger = ExecutableTrigger.create(AfterAll.of(
+ AfterFirst.of(AfterPane.elementCountAtLeast(3), AfterWatermark.pastEndOfWindow()),
+ AfterAll.of(
+ AfterPane.elementCountAtLeast(10), AfterProcessingTime.pastFirstElementInPane())));
+
+ verifyGetAfterSet(finishedSet, trigger);
+ verifyGetAfterSet(finishedSet, trigger.subTriggers().get(0).subTriggers().get(1));
+ verifyGetAfterSet(finishedSet, trigger.subTriggers().get(0));
+ verifyGetAfterSet(finishedSet, trigger.subTriggers().get(1));
+ verifyGetAfterSet(finishedSet, trigger.subTriggers().get(1).subTriggers().get(1));
+ verifyGetAfterSet(finishedSet, trigger.subTriggers().get(1).subTriggers().get(0));
+ }
+
+ /**
+ * Tests that clearing a trigger recursively clears all of that triggers subTriggers, but no
+ * others.
+ */
+ public static void verifyClearRecursively(FinishedTriggers finishedSet) {
+ ExecutableTrigger trigger = ExecutableTrigger.create(AfterAll.of(
+ AfterFirst.of(AfterPane.elementCountAtLeast(3), AfterWatermark.pastEndOfWindow()),
+ AfterAll.of(
+ AfterPane.elementCountAtLeast(10), AfterProcessingTime.pastFirstElementInPane())));
+
+ // Set them all finished. This method is not on a trigger as it makes no sense outside tests.
+ setFinishedRecursively(finishedSet, trigger);
+ assertTrue(finishedSet.isFinished(trigger));
+ assertTrue(finishedSet.isFinished(trigger.subTriggers().get(0)));
+ assertTrue(finishedSet.isFinished(trigger.subTriggers().get(0).subTriggers().get(0)));
+ assertTrue(finishedSet.isFinished(trigger.subTriggers().get(0).subTriggers().get(1)));
+
+ // Clear just the second AfterAll
+ finishedSet.clearRecursively(trigger.subTriggers().get(1));
+
+ // Check that the first and all that are still finished
+ assertTrue(finishedSet.isFinished(trigger));
+ verifyFinishedRecursively(finishedSet, trigger.subTriggers().get(0));
+ verifyUnfinishedRecursively(finishedSet, trigger.subTriggers().get(1));
+ }
+
+ private static void setFinishedRecursively(
+ FinishedTriggers finishedSet, ExecutableTrigger trigger) {
+ finishedSet.setFinished(trigger, true);
+ for (ExecutableTrigger subTrigger : trigger.subTriggers()) {
+ setFinishedRecursively(finishedSet, subTrigger);
+ }
+ }
+
+ private static void verifyFinishedRecursively(
+ FinishedTriggers finishedSet, ExecutableTrigger trigger) {
+ assertTrue(finishedSet.isFinished(trigger));
+ for (ExecutableTrigger subTrigger : trigger.subTriggers()) {
+ verifyFinishedRecursively(finishedSet, subTrigger);
+ }
+ }
+
+ private static void verifyUnfinishedRecursively(
+ FinishedTriggers finishedSet, ExecutableTrigger trigger) {
+ assertFalse(finishedSet.isFinished(trigger));
+ for (ExecutableTrigger subTrigger : trigger.subTriggers()) {
+ verifyUnfinishedRecursively(finishedSet, subTrigger);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8d43e8aa/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FinishedTriggersSetTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FinishedTriggersSetTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FinishedTriggersSetTest.java
new file mode 100644
index 0000000..072d264
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/FinishedTriggersSetTest.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import static org.hamcrest.Matchers.not;
+import static org.hamcrest.Matchers.theInstance;
+import static org.junit.Assert.assertThat;
+
+import java.util.HashSet;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link FinishedTriggersSet}.
+ */
+@RunWith(JUnit4.class)
+public class FinishedTriggersSetTest {
+ /**
+ * Tests that after a trigger is set to finished, it reads back as finished.
+ */
+ @Test
+ public void testSetGet() {
+ FinishedTriggersProperties.verifyGetAfterSet(
+ FinishedTriggersSet.fromSet(new HashSet<ExecutableTrigger>()));
+ }
+
+ /**
+ * Tests that clearing a trigger recursively clears all of that triggers subTriggers, but no
+ * others.
+ */
+ @Test
+ public void testClearRecursively() {
+ FinishedTriggersProperties.verifyClearRecursively(
+ FinishedTriggersSet.fromSet(new HashSet<ExecutableTrigger>()));
+ }
+
+ @Test
+ public void testCopy() throws Exception {
+ FinishedTriggersSet finishedSet =
+ FinishedTriggersSet.fromSet(new HashSet<ExecutableTrigger>());
+ assertThat(finishedSet.copy().getFinishedTriggers(),
+ not(theInstance(finishedSet.getFinishedTriggers())));
+ }
+}
[2/2] incubator-beam git commit: Restore trigger-related tests missed
in #1083
Posted by lc...@apache.org.
Restore trigger-related tests missed in #1083
This closes #1127
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/6d686288
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/6d686288
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/6d686288
Branch: refs/heads/master
Commit: 6d686288efd5fd64d43ba9802314f2cbbc8df72e
Parents: 71c69b3 8d43e8a
Author: Luke Cwik <lc...@google.com>
Authored: Tue Oct 18 11:01:14 2016 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Tue Oct 18 11:01:14 2016 -0700
----------------------------------------------------------------------
.../beam/sdk/util/ExecutableTriggerTest.java | 127 +++++++++++++++++++
.../sdk/util/FinishedTriggersBitSetTest.java | 55 ++++++++
.../sdk/util/FinishedTriggersProperties.java | 110 ++++++++++++++++
.../beam/sdk/util/FinishedTriggersSetTest.java | 60 +++++++++
4 files changed, 352 insertions(+)
----------------------------------------------------------------------