You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/11/01 03:40:17 UTC
[1/3] incubator-beam git commit: Delete the obsolete ExecutableTrigger
Repository: incubator-beam
Updated Branches:
refs/heads/master af1764785 -> 978c99e9d
Delete the obsolete ExecutableTrigger
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/1abe47bb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/1abe47bb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/1abe47bb
Branch: refs/heads/master
Commit: 1abe47bb566cf2bda700b602161c2139e38121e4
Parents: 97cd3e5
Author: Kenneth Knowles <kl...@google.com>
Authored: Tue Oct 25 13:42:53 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Mon Oct 31 19:22:49 2016 -0700
----------------------------------------------------------------------
.../core/GroupAlsoByWindowViaWindowSetDoFn.java | 3 +-
.../GroupAlsoByWindowsViaOutputBufferDoFn.java | 3 +-
.../beam/runners/core/ReduceFnTester.java | 6 +-
.../direct/WatermarkCallbackExecutor.java | 3 +-
.../dataflow/DataflowPipelineTranslator.java | 2 +-
.../org/apache/beam/sdk/transforms/Flatten.java | 3 +-
.../apache/beam/sdk/transforms/GroupByKey.java | 4 +-
.../beam/sdk/transforms/windowing/Window.java | 2 +-
.../apache/beam/sdk/util/ExecutableTrigger.java | 131 -------------------
.../apache/beam/sdk/util/WindowingStrategy.java | 10 +-
.../sdk/transforms/windowing/WindowTest.java | 6 +-
.../beam/sdk/util/ExecutableTriggerTest.java | 109 ---------------
12 files changed, 19 insertions(+), 263 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1abe47bb/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
index 75a5aa7..dde883c 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
@@ -84,8 +84,7 @@ public class GroupAlsoByWindowViaWindowSetDoFn<
key,
windowingStrategy,
ExecutableTriggerStateMachine.create(
- TriggerStateMachines.stateMachineForTrigger(
- windowingStrategy.getTrigger().getSpec())),
+ TriggerStateMachines.stateMachineForTrigger(windowingStrategy.getTrigger())),
stateInternals,
timerInternals,
c.windowingInternals(),
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1abe47bb/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java
index 4dea775..f1a6ded 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java
@@ -73,8 +73,7 @@ public class GroupAlsoByWindowsViaOutputBufferDoFn<K, InputT, OutputT, W extends
key,
strategy,
ExecutableTriggerStateMachine.create(
- TriggerStateMachines.stateMachineForTrigger(
- strategy.getTrigger().getSpec())),
+ TriggerStateMachines.stateMachineForTrigger(strategy.getTrigger())),
stateInternals,
timerInternals,
c.windowingInternals(),
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1abe47bb/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
index f707349..f5ab8ea 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
@@ -132,7 +132,7 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> {
nonCombining(WindowingStrategy<?, W> windowingStrategy) throws Exception {
return new ReduceFnTester<Integer, Iterable<Integer>, W>(
windowingStrategy,
- TriggerStateMachines.stateMachineForTrigger(windowingStrategy.getTrigger().getSpec()),
+ TriggerStateMachines.stateMachineForTrigger(windowingStrategy.getTrigger()),
SystemReduceFn.<String, Integer, W>buffering(VarIntCoder.of()),
IterableCoder.of(VarIntCoder.of()),
PipelineOptionsFactory.create(),
@@ -196,7 +196,7 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> {
return combining(
strategy,
- TriggerStateMachines.stateMachineForTrigger(strategy.getTrigger().getSpec()),
+ TriggerStateMachines.stateMachineForTrigger(strategy.getTrigger()),
combineFn,
outputCoder);
}
@@ -246,7 +246,7 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> {
return combining(
strategy,
- TriggerStateMachines.stateMachineForTrigger(strategy.getTrigger().getSpec()),
+ TriggerStateMachines.stateMachineForTrigger(strategy.getTrigger()),
combineFn,
outputCoder,
options,
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1abe47bb/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkCallbackExecutor.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkCallbackExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkCallbackExecutor.java
index 7961f24..c8bf912 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkCallbackExecutor.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkCallbackExecutor.java
@@ -107,8 +107,7 @@ class WatermarkCallbackExecutor {
public static <W extends BoundedWindow> WatermarkCallback onGuaranteedFiring(
BoundedWindow window, WindowingStrategy<?, W> strategy, Runnable callback) {
@SuppressWarnings("unchecked")
- Instant firingAfter =
- strategy.getTrigger().getSpec().getWatermarkThatGuaranteesFiring((W) window);
+ Instant firingAfter = strategy.getTrigger().getWatermarkThatGuaranteesFiring((W) window);
return new WatermarkCallback(firingAfter, callback);
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1abe47bb/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
index 12aa696..ce5af6b 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
@@ -934,7 +934,7 @@ public class DataflowPipelineTranslator {
!windowingStrategy.getWindowFn().isNonMerging()
|| (isStreaming && !transform.fewKeys())
// TODO: Allow combiner lifting on the non-default trigger, as appropriate.
- || !(windowingStrategy.getTrigger().getSpec() instanceof DefaultTrigger);
+ || !(windowingStrategy.getTrigger() instanceof DefaultTrigger);
context.addInput(
PropertyNames.DISALLOW_COMBINER_LIFTING, disallowCombinerLifting);
context.addInput(
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1abe47bb/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Flatten.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Flatten.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Flatten.java
index f3f4f88..e612836 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Flatten.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Flatten.java
@@ -118,8 +118,7 @@ public class Flatten {
+ windowingStrategy.getWindowFn() + ", " + other.getWindowFn());
}
- if (!windowingStrategy.getTrigger().getSpec()
- .isCompatible(other.getTrigger().getSpec())) {
+ if (!windowingStrategy.getTrigger().isCompatible(other.getTrigger())) {
throw new IllegalStateException(
"Inputs to Flatten had incompatible triggers: "
+ windowingStrategy.getTrigger() + ", " + other.getTrigger());
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1abe47bb/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java
index eaf68b2..1faac59 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java
@@ -169,7 +169,7 @@ public class GroupByKey<K, V>
// Verify that the input PCollection is bounded, or that there is windowing/triggering being
// used. Without this, the watermark (at end of global window) will never be reached.
if (windowingStrategy.getWindowFn() instanceof GlobalWindows
- && windowingStrategy.getTrigger().getSpec() instanceof DefaultTrigger
+ && windowingStrategy.getTrigger() instanceof DefaultTrigger
&& input.isBounded() != IsBounded.BOUNDED) {
throw new IllegalStateException("GroupByKey cannot be applied to non-bounded PCollection in "
+ "the GlobalWindow without a trigger. Use a Window.into or Window.triggering transform "
@@ -212,7 +212,7 @@ public class GroupByKey<K, V>
// We also switch to the continuation trigger associated with the current trigger.
return inputStrategy
.withWindowFn(inputWindowFn)
- .withTrigger(inputStrategy.getTrigger().getSpec().getContinuationTrigger());
+ .withTrigger(inputStrategy.getTrigger().getContinuationTrigger());
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1abe47bb/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
index 684a776..90e6a3a 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
@@ -544,7 +544,7 @@ public class Window {
// Make sure that the windowing strategy is complete & valid.
if (outputStrategy.isTriggerSpecified()
- && !(outputStrategy.getTrigger().getSpec() instanceof DefaultTrigger)) {
+ && !(outputStrategy.getTrigger() instanceof DefaultTrigger)) {
if (!(outputStrategy.getWindowFn() instanceof GlobalWindows)
&& !outputStrategy.isAllowedLatenessSpecified()) {
throw new IllegalArgumentException("Except when using GlobalWindows,"
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1abe47bb/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ExecutableTrigger.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ExecutableTrigger.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ExecutableTrigger.java
deleted file mode 100644
index 48a49aa..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ExecutableTrigger.java
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.base.Preconditions.checkState;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.List;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.Trigger;
-import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger;
-
-/**
- * A wrapper around a trigger used during execution. While an actual trigger may appear multiple
- * times (both in the same trigger expression and in other trigger expressions), the {@code
- * ExecutableTrigger} wrapped around them forms a tree (only one occurrence).
- *
- * @deprecated uses of {@link ExecutableTrigger} should be ported to
- * org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine.
- */
-@Deprecated
-public class ExecutableTrigger implements Serializable {
-
- /** Store the index assigned to this trigger. */
- private final int triggerIndex;
- private final int firstIndexAfterSubtree;
- private final List<ExecutableTrigger> subTriggers = new ArrayList<>();
- private final Trigger trigger;
-
- public static <W extends BoundedWindow> ExecutableTrigger create(Trigger trigger) {
- return create(trigger, 0);
- }
-
- private static <W extends BoundedWindow> ExecutableTrigger create(
- Trigger trigger, int nextUnusedIndex) {
- if (trigger instanceof OnceTrigger) {
- return new ExecutableOnceTrigger((OnceTrigger) trigger, nextUnusedIndex);
- } else {
- return new ExecutableTrigger(trigger, nextUnusedIndex);
- }
- }
-
- public static <W extends BoundedWindow> ExecutableTrigger createForOnceTrigger(
- OnceTrigger trigger, int nextUnusedIndex) {
- return new ExecutableOnceTrigger(trigger, nextUnusedIndex);
- }
-
- private ExecutableTrigger(Trigger trigger, int nextUnusedIndex) {
- this.trigger = checkNotNull(trigger, "trigger must not be null");
- this.triggerIndex = nextUnusedIndex++;
-
- if (trigger.subTriggers() != null) {
- for (Trigger subTrigger : trigger.subTriggers()) {
- ExecutableTrigger subExecutable = create(subTrigger, nextUnusedIndex);
- subTriggers.add(subExecutable);
- nextUnusedIndex = subExecutable.firstIndexAfterSubtree;
- }
- }
- firstIndexAfterSubtree = nextUnusedIndex;
- }
-
- public List<ExecutableTrigger> subTriggers() {
- return subTriggers;
- }
-
- @Override
- public String toString() {
- return trigger.toString();
- }
-
- /**
- * Return the underlying trigger specification corresponding to this {@code ExecutableTrigger}.
- */
- public Trigger getSpec() {
- return trigger;
- }
-
- public int getTriggerIndex() {
- return triggerIndex;
- }
-
- public final int getFirstIndexAfterSubtree() {
- return firstIndexAfterSubtree;
- }
-
- public boolean isCompatible(ExecutableTrigger other) {
- return trigger.isCompatible(other.trigger);
- }
-
- public ExecutableTrigger getSubTriggerContaining(int index) {
- checkNotNull(subTriggers);
- checkState(index > triggerIndex && index < firstIndexAfterSubtree,
- "Cannot find sub-trigger containing index not in this tree.");
- ExecutableTrigger previous = null;
- for (ExecutableTrigger subTrigger : subTriggers) {
- if (index < subTrigger.triggerIndex) {
- return previous;
- }
- previous = subTrigger;
- }
- return previous;
- }
-
- /**
- * {@link ExecutableTrigger} that enforces the fact that the trigger should always FIRE_AND_FINISH
- * and never just FIRE.
- */
- private static class ExecutableOnceTrigger extends ExecutableTrigger {
-
- public ExecutableOnceTrigger(OnceTrigger trigger, int nextUnusedIndex) {
- super(trigger, nextUnusedIndex);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1abe47bb/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingStrategy.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingStrategy.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingStrategy.java
index b332ed7..137f108 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingStrategy.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingStrategy.java
@@ -58,7 +58,7 @@ public class WindowingStrategy<T, W extends BoundedWindow> implements Serializab
private final WindowFn<T, W> windowFn;
private final OutputTimeFn<? super W> outputTimeFn;
- private final ExecutableTrigger trigger;
+ private final Trigger trigger;
private final AccumulationMode mode;
private final Duration allowedLateness;
private final ClosingBehavior closingBehavior;
@@ -69,7 +69,7 @@ public class WindowingStrategy<T, W extends BoundedWindow> implements Serializab
private WindowingStrategy(
WindowFn<T, W> windowFn,
- ExecutableTrigger trigger, boolean triggerSpecified,
+ Trigger trigger, boolean triggerSpecified,
AccumulationMode mode, boolean modeSpecified,
Duration allowedLateness, boolean allowedLatenessSpecified,
OutputTimeFn<? super W> outputTimeFn, boolean outputTimeFnSpecified,
@@ -96,7 +96,7 @@ public class WindowingStrategy<T, W extends BoundedWindow> implements Serializab
public static <T, W extends BoundedWindow> WindowingStrategy<T, W> of(
WindowFn<T, W> windowFn) {
return new WindowingStrategy<>(windowFn,
- ExecutableTrigger.create(DefaultTrigger.<W>of()), false,
+ DefaultTrigger.of(), false,
AccumulationMode.DISCARDING_FIRED_PANES, false,
DEFAULT_ALLOWED_LATENESS, false,
OutputTimeFns.outputAtEndOfWindow(), false,
@@ -107,7 +107,7 @@ public class WindowingStrategy<T, W extends BoundedWindow> implements Serializab
return windowFn;
}
- public ExecutableTrigger getTrigger() {
+ public Trigger getTrigger() {
return trigger;
}
@@ -150,7 +150,7 @@ public class WindowingStrategy<T, W extends BoundedWindow> implements Serializab
public WindowingStrategy<T, W> withTrigger(Trigger trigger) {
return new WindowingStrategy<T, W>(
windowFn,
- ExecutableTrigger.create(trigger), true,
+ trigger, true,
mode, modeSpecified,
allowedLateness, allowedLatenessSpecified,
outputTimeFn, outputTimeFnSpecified,
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1abe47bb/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java
index 30228fe..3125ae8 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java
@@ -71,7 +71,7 @@ public class WindowTest implements Serializable {
.apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(10))))
.getWindowingStrategy();
assertTrue(strategy.getWindowFn() instanceof FixedWindows);
- assertTrue(strategy.getTrigger().getSpec() instanceof DefaultTrigger);
+ assertTrue(strategy.getTrigger() instanceof DefaultTrigger);
assertEquals(AccumulationMode.DISCARDING_FIRED_PANES, strategy.getMode());
}
@@ -88,7 +88,7 @@ public class WindowTest implements Serializable {
.getWindowingStrategy();
assertEquals(fixed10, strategy.getWindowFn());
- assertEquals(trigger, strategy.getTrigger().getSpec());
+ assertEquals(trigger, strategy.getTrigger());
assertEquals(AccumulationMode.ACCUMULATING_FIRED_PANES, strategy.getMode());
}
@@ -105,7 +105,7 @@ public class WindowTest implements Serializable {
.getWindowingStrategy();
assertEquals(fixed10, strategy.getWindowFn());
- assertEquals(trigger, strategy.getTrigger().getSpec());
+ assertEquals(trigger, strategy.getTrigger());
assertEquals(AccumulationMode.ACCUMULATING_FIRED_PANES, strategy.getMode());
assertEquals(Duration.standardDays(1), strategy.getAllowedLateness());
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1abe47bb/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ExecutableTriggerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ExecutableTriggerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ExecutableTriggerTest.java
deleted file mode 100644
index befc07e..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ExecutableTriggerTest.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertSame;
-
-import java.util.Arrays;
-import java.util.List;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.Trigger;
-import org.joda.time.Instant;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * Tests for {@link ExecutableTrigger}.
- */
-@RunWith(JUnit4.class)
-public class ExecutableTriggerTest {
-
- @Test
- public void testIndexAssignmentLeaf() throws Exception {
- StubTrigger t1 = new StubTrigger();
- ExecutableTrigger executable = ExecutableTrigger.create(t1);
- assertEquals(0, executable.getTriggerIndex());
- }
-
- @Test
- public void testIndexAssignmentOneLevel() throws Exception {
- StubTrigger t1 = new StubTrigger();
- StubTrigger t2 = new StubTrigger();
- StubTrigger t = new StubTrigger(t1, t2);
-
- ExecutableTrigger executable = ExecutableTrigger.create(t);
-
- assertEquals(0, executable.getTriggerIndex());
- assertEquals(1, executable.subTriggers().get(0).getTriggerIndex());
- assertSame(t1, executable.subTriggers().get(0).getSpec());
- assertEquals(2, executable.subTriggers().get(1).getTriggerIndex());
- assertSame(t2, executable.subTriggers().get(1).getSpec());
- }
-
- @Test
- public void testIndexAssignmentTwoLevel() throws Exception {
- StubTrigger t11 = new StubTrigger();
- StubTrigger t12 = new StubTrigger();
- StubTrigger t13 = new StubTrigger();
- StubTrigger t14 = new StubTrigger();
- StubTrigger t21 = new StubTrigger();
- StubTrigger t22 = new StubTrigger();
- StubTrigger t1 = new StubTrigger(t11, t12, t13, t14);
- StubTrigger t2 = new StubTrigger(t21, t22);
- StubTrigger t = new StubTrigger(t1, t2);
-
- ExecutableTrigger executable = ExecutableTrigger.create(t);
-
- assertEquals(0, executable.getTriggerIndex());
- assertEquals(1, executable.subTriggers().get(0).getTriggerIndex());
- assertEquals(6, executable.subTriggers().get(0).getFirstIndexAfterSubtree());
- assertEquals(6, executable.subTriggers().get(1).getTriggerIndex());
-
- assertSame(t1, executable.getSubTriggerContaining(1).getSpec());
- assertSame(t2, executable.getSubTriggerContaining(6).getSpec());
- assertSame(t1, executable.getSubTriggerContaining(2).getSpec());
- assertSame(t1, executable.getSubTriggerContaining(3).getSpec());
- assertSame(t1, executable.getSubTriggerContaining(5).getSpec());
- assertSame(t2, executable.getSubTriggerContaining(7).getSpec());
- }
-
- private static class StubTrigger extends Trigger {
-
- @SafeVarargs
- protected StubTrigger(Trigger... subTriggers) {
- super(Arrays.asList(subTriggers));
- }
-
- @Override
- public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) {
- return BoundedWindow.TIMESTAMP_MAX_VALUE;
- }
-
- @Override
- public boolean isCompatible(Trigger other) {
- return false;
- }
-
- @Override
- public Trigger getContinuationTrigger(List<Trigger> continuationTriggers) {
- return this;
- }
- }
-}
[3/3] incubator-beam git commit: This closes #1187
Posted by ke...@apache.org.
This closes #1187
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/978c99e9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/978c99e9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/978c99e9
Branch: refs/heads/master
Commit: 978c99e9d699f79d25662c98c091114b8f608ee7
Parents: af17647 1abe47b
Author: Kenneth Knowles <kl...@google.com>
Authored: Mon Oct 31 20:40:02 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Mon Oct 31 20:40:02 2016 -0700
----------------------------------------------------------------------
.../core/GroupAlsoByWindowViaWindowSetDoFn.java | 3 +-
.../GroupAlsoByWindowsViaOutputBufferDoFn.java | 3 +-
.../beam/runners/core/ReduceFnTester.java | 6 +-
.../direct/WatermarkCallbackExecutor.java | 3 +-
.../dataflow/DataflowPipelineTranslator.java | 2 +-
.../beam/runners/dataflow/DataflowRunner.java | 4 +-
.../org/apache/beam/sdk/transforms/Flatten.java | 3 +-
.../apache/beam/sdk/transforms/GroupByKey.java | 4 +-
.../beam/sdk/transforms/windowing/Window.java | 2 +-
.../apache/beam/sdk/util/ExecutableTrigger.java | 131 -------------------
.../apache/beam/sdk/util/WindowingStrategy.java | 10 +-
.../sdk/transforms/windowing/WindowTest.java | 6 +-
.../beam/sdk/util/ExecutableTriggerTest.java | 109 ---------------
13 files changed, 21 insertions(+), 265 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/978c99e9/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
----------------------------------------------------------------------
[2/3] incubator-beam git commit: Update Dataflow worker image
Posted by ke...@apache.org.
Update Dataflow worker image
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/97cd3e5c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/97cd3e5c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/97cd3e5c
Branch: refs/heads/master
Commit: 97cd3e5c02bf8c048029b51d7588a110eb5ce62d
Parents: 7e45830
Author: Kenneth Knowles <kl...@google.com>
Authored: Mon Oct 31 11:27:36 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Mon Oct 31 19:22:49 2016 -0700
----------------------------------------------------------------------
.../java/org/apache/beam/runners/dataflow/DataflowRunner.java | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/97cd3e5c/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index 2943ab9..ce126db 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -208,9 +208,9 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
// Default Docker container images that execute Dataflow worker harness, residing in Google
// Container Registry, separately for Batch and Streaming.
public static final String BATCH_WORKER_HARNESS_CONTAINER_IMAGE =
- "dataflow.gcr.io/v1beta3/beam-java-batch:beam-master-20161026";
+ "dataflow.gcr.io/v1beta3/beam-java-batch:beam-master-20161031";
public static final String STREAMING_WORKER_HARNESS_CONTAINER_IMAGE =
- "dataflow.gcr.io/v1beta3/beam-java-streaming:beam-master-20161026";
+ "dataflow.gcr.io/v1beta3/beam-java-streaming:beam-master-20161031";
// The limit of CreateJob request size.
private static final int CREATE_JOB_REQUEST_LIMIT_BYTES = 10 * 1024 * 1024;