You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/11/01 03:40:17 UTC

[1/3] incubator-beam git commit: Delete the obsolete ExecutableTrigger

Repository: incubator-beam
Updated Branches:
  refs/heads/master af1764785 -> 978c99e9d


Delete the obsolete ExecutableTrigger


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/1abe47bb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/1abe47bb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/1abe47bb

Branch: refs/heads/master
Commit: 1abe47bb566cf2bda700b602161c2139e38121e4
Parents: 97cd3e5
Author: Kenneth Knowles <kl...@google.com>
Authored: Tue Oct 25 13:42:53 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Mon Oct 31 19:22:49 2016 -0700

----------------------------------------------------------------------
 .../core/GroupAlsoByWindowViaWindowSetDoFn.java |   3 +-
 .../GroupAlsoByWindowsViaOutputBufferDoFn.java  |   3 +-
 .../beam/runners/core/ReduceFnTester.java       |   6 +-
 .../direct/WatermarkCallbackExecutor.java       |   3 +-
 .../dataflow/DataflowPipelineTranslator.java    |   2 +-
 .../org/apache/beam/sdk/transforms/Flatten.java |   3 +-
 .../apache/beam/sdk/transforms/GroupByKey.java  |   4 +-
 .../beam/sdk/transforms/windowing/Window.java   |   2 +-
 .../apache/beam/sdk/util/ExecutableTrigger.java | 131 -------------------
 .../apache/beam/sdk/util/WindowingStrategy.java |  10 +-
 .../sdk/transforms/windowing/WindowTest.java    |   6 +-
 .../beam/sdk/util/ExecutableTriggerTest.java    | 109 ---------------
 12 files changed, 19 insertions(+), 263 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1abe47bb/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
index 75a5aa7..dde883c 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java
@@ -84,8 +84,7 @@ public class GroupAlsoByWindowViaWindowSetDoFn<
             key,
             windowingStrategy,
             ExecutableTriggerStateMachine.create(
-                TriggerStateMachines.stateMachineForTrigger(
-                    windowingStrategy.getTrigger().getSpec())),
+                TriggerStateMachines.stateMachineForTrigger(windowingStrategy.getTrigger())),
             stateInternals,
             timerInternals,
             c.windowingInternals(),

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1abe47bb/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java
index 4dea775..f1a6ded 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java
@@ -73,8 +73,7 @@ public class GroupAlsoByWindowsViaOutputBufferDoFn<K, InputT, OutputT, W extends
             key,
             strategy,
             ExecutableTriggerStateMachine.create(
-                TriggerStateMachines.stateMachineForTrigger(
-                    strategy.getTrigger().getSpec())),
+                TriggerStateMachines.stateMachineForTrigger(strategy.getTrigger())),
             stateInternals,
             timerInternals,
             c.windowingInternals(),

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1abe47bb/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
index f707349..f5ab8ea 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
@@ -132,7 +132,7 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> {
       nonCombining(WindowingStrategy<?, W> windowingStrategy) throws Exception {
     return new ReduceFnTester<Integer, Iterable<Integer>, W>(
         windowingStrategy,
-        TriggerStateMachines.stateMachineForTrigger(windowingStrategy.getTrigger().getSpec()),
+        TriggerStateMachines.stateMachineForTrigger(windowingStrategy.getTrigger()),
         SystemReduceFn.<String, Integer, W>buffering(VarIntCoder.of()),
         IterableCoder.of(VarIntCoder.of()),
         PipelineOptionsFactory.create(),
@@ -196,7 +196,7 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> {
 
     return combining(
         strategy,
-        TriggerStateMachines.stateMachineForTrigger(strategy.getTrigger().getSpec()),
+        TriggerStateMachines.stateMachineForTrigger(strategy.getTrigger()),
         combineFn,
         outputCoder);
   }
@@ -246,7 +246,7 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> {
 
     return combining(
         strategy,
-        TriggerStateMachines.stateMachineForTrigger(strategy.getTrigger().getSpec()),
+        TriggerStateMachines.stateMachineForTrigger(strategy.getTrigger()),
         combineFn,
         outputCoder,
         options,

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1abe47bb/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkCallbackExecutor.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkCallbackExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkCallbackExecutor.java
index 7961f24..c8bf912 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkCallbackExecutor.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkCallbackExecutor.java
@@ -107,8 +107,7 @@ class WatermarkCallbackExecutor {
     public static <W extends BoundedWindow> WatermarkCallback onGuaranteedFiring(
         BoundedWindow window, WindowingStrategy<?, W> strategy, Runnable callback) {
       @SuppressWarnings("unchecked")
-      Instant firingAfter =
-          strategy.getTrigger().getSpec().getWatermarkThatGuaranteesFiring((W) window);
+      Instant firingAfter = strategy.getTrigger().getWatermarkThatGuaranteesFiring((W) window);
       return new WatermarkCallback(firingAfter, callback);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1abe47bb/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
index 12aa696..ce5af6b 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
@@ -934,7 +934,7 @@ public class DataflowPipelineTranslator {
                 !windowingStrategy.getWindowFn().isNonMerging()
                 || (isStreaming && !transform.fewKeys())
                 // TODO: Allow combiner lifting on the non-default trigger, as appropriate.
-                || !(windowingStrategy.getTrigger().getSpec() instanceof DefaultTrigger);
+                || !(windowingStrategy.getTrigger() instanceof DefaultTrigger);
             context.addInput(
                 PropertyNames.DISALLOW_COMBINER_LIFTING, disallowCombinerLifting);
             context.addInput(

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1abe47bb/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Flatten.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Flatten.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Flatten.java
index f3f4f88..e612836 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Flatten.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Flatten.java
@@ -118,8 +118,7 @@ public class Flatten {
                 + windowingStrategy.getWindowFn() + ", " + other.getWindowFn());
           }
 
-          if (!windowingStrategy.getTrigger().getSpec()
-              .isCompatible(other.getTrigger().getSpec())) {
+          if (!windowingStrategy.getTrigger().isCompatible(other.getTrigger())) {
             throw new IllegalStateException(
                 "Inputs to Flatten had incompatible triggers: "
                 + windowingStrategy.getTrigger() + ", " + other.getTrigger());

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1abe47bb/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java
index eaf68b2..1faac59 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java
@@ -169,7 +169,7 @@ public class GroupByKey<K, V>
     // Verify that the input PCollection is bounded, or that there is windowing/triggering being
     // used. Without this, the watermark (at end of global window) will never be reached.
     if (windowingStrategy.getWindowFn() instanceof GlobalWindows
-        && windowingStrategy.getTrigger().getSpec() instanceof DefaultTrigger
+        && windowingStrategy.getTrigger() instanceof DefaultTrigger
         && input.isBounded() != IsBounded.BOUNDED) {
       throw new IllegalStateException("GroupByKey cannot be applied to non-bounded PCollection in "
           + "the GlobalWindow without a trigger. Use a Window.into or Window.triggering transform "
@@ -212,7 +212,7 @@ public class GroupByKey<K, V>
     // We also switch to the continuation trigger associated with the current trigger.
     return inputStrategy
         .withWindowFn(inputWindowFn)
-        .withTrigger(inputStrategy.getTrigger().getSpec().getContinuationTrigger());
+        .withTrigger(inputStrategy.getTrigger().getContinuationTrigger());
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1abe47bb/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
index 684a776..90e6a3a 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
@@ -544,7 +544,7 @@ public class Window {
 
       // Make sure that the windowing strategy is complete & valid.
       if (outputStrategy.isTriggerSpecified()
-          && !(outputStrategy.getTrigger().getSpec() instanceof DefaultTrigger)) {
+          && !(outputStrategy.getTrigger() instanceof DefaultTrigger)) {
         if (!(outputStrategy.getWindowFn() instanceof GlobalWindows)
             && !outputStrategy.isAllowedLatenessSpecified()) {
           throw new IllegalArgumentException("Except when using GlobalWindows,"

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1abe47bb/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ExecutableTrigger.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ExecutableTrigger.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ExecutableTrigger.java
deleted file mode 100644
index 48a49aa..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ExecutableTrigger.java
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.base.Preconditions.checkState;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.List;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.Trigger;
-import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger;
-
-/**
- * A wrapper around a trigger used during execution. While an actual trigger may appear multiple
- * times (both in the same trigger expression and in other trigger expressions), the {@code
- * ExecutableTrigger} wrapped around them forms a tree (only one occurrence).
- *
- * @deprecated uses of {@link ExecutableTrigger} should be ported to
- *     org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine.
- */
-@Deprecated
-public class ExecutableTrigger implements Serializable {
-
-  /** Store the index assigned to this trigger. */
-  private final int triggerIndex;
-  private final int firstIndexAfterSubtree;
-  private final List<ExecutableTrigger> subTriggers = new ArrayList<>();
-  private final Trigger trigger;
-
-  public static <W extends BoundedWindow> ExecutableTrigger create(Trigger trigger) {
-    return create(trigger, 0);
-  }
-
-  private static <W extends BoundedWindow> ExecutableTrigger create(
-      Trigger trigger, int nextUnusedIndex) {
-    if (trigger instanceof OnceTrigger) {
-      return new ExecutableOnceTrigger((OnceTrigger) trigger, nextUnusedIndex);
-    } else {
-      return new ExecutableTrigger(trigger, nextUnusedIndex);
-    }
-  }
-
-  public static <W extends BoundedWindow> ExecutableTrigger createForOnceTrigger(
-      OnceTrigger trigger, int nextUnusedIndex) {
-    return new ExecutableOnceTrigger(trigger, nextUnusedIndex);
-  }
-
-  private ExecutableTrigger(Trigger trigger, int nextUnusedIndex) {
-    this.trigger = checkNotNull(trigger, "trigger must not be null");
-    this.triggerIndex = nextUnusedIndex++;
-
-    if (trigger.subTriggers() != null) {
-      for (Trigger subTrigger : trigger.subTriggers()) {
-        ExecutableTrigger subExecutable = create(subTrigger, nextUnusedIndex);
-        subTriggers.add(subExecutable);
-        nextUnusedIndex = subExecutable.firstIndexAfterSubtree;
-      }
-    }
-    firstIndexAfterSubtree = nextUnusedIndex;
-  }
-
-  public List<ExecutableTrigger> subTriggers() {
-    return subTriggers;
-  }
-
-  @Override
-  public String toString() {
-    return trigger.toString();
-  }
-
-  /**
-   * Return the underlying trigger specification corresponding to this {@code ExecutableTrigger}.
-   */
-  public Trigger getSpec() {
-    return trigger;
-  }
-
-  public int getTriggerIndex() {
-    return triggerIndex;
-  }
-
-  public final int getFirstIndexAfterSubtree() {
-    return firstIndexAfterSubtree;
-  }
-
-  public boolean isCompatible(ExecutableTrigger other) {
-    return trigger.isCompatible(other.trigger);
-  }
-
-  public ExecutableTrigger getSubTriggerContaining(int index) {
-    checkNotNull(subTriggers);
-    checkState(index > triggerIndex && index < firstIndexAfterSubtree,
-        "Cannot find sub-trigger containing index not in this tree.");
-    ExecutableTrigger previous = null;
-    for (ExecutableTrigger subTrigger : subTriggers) {
-      if (index < subTrigger.triggerIndex) {
-        return previous;
-      }
-      previous = subTrigger;
-    }
-    return previous;
-  }
-
-  /**
-   * {@link ExecutableTrigger} that enforces the fact that the trigger should always FIRE_AND_FINISH
-   * and never just FIRE.
-   */
-  private static class ExecutableOnceTrigger extends ExecutableTrigger {
-
-    public ExecutableOnceTrigger(OnceTrigger trigger, int nextUnusedIndex) {
-      super(trigger, nextUnusedIndex);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1abe47bb/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingStrategy.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingStrategy.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingStrategy.java
index b332ed7..137f108 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingStrategy.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingStrategy.java
@@ -58,7 +58,7 @@ public class WindowingStrategy<T, W extends BoundedWindow> implements Serializab
 
   private final WindowFn<T, W> windowFn;
   private final OutputTimeFn<? super W> outputTimeFn;
-  private final ExecutableTrigger trigger;
+  private final Trigger trigger;
   private final AccumulationMode mode;
   private final Duration allowedLateness;
   private final ClosingBehavior closingBehavior;
@@ -69,7 +69,7 @@ public class WindowingStrategy<T, W extends BoundedWindow> implements Serializab
 
   private WindowingStrategy(
       WindowFn<T, W> windowFn,
-      ExecutableTrigger trigger, boolean triggerSpecified,
+      Trigger trigger, boolean triggerSpecified,
       AccumulationMode mode, boolean modeSpecified,
       Duration allowedLateness, boolean allowedLatenessSpecified,
       OutputTimeFn<? super W> outputTimeFn, boolean outputTimeFnSpecified,
@@ -96,7 +96,7 @@ public class WindowingStrategy<T, W extends BoundedWindow> implements Serializab
   public static <T, W extends BoundedWindow> WindowingStrategy<T, W> of(
       WindowFn<T, W> windowFn) {
     return new WindowingStrategy<>(windowFn,
-        ExecutableTrigger.create(DefaultTrigger.<W>of()), false,
+        DefaultTrigger.of(), false,
         AccumulationMode.DISCARDING_FIRED_PANES, false,
         DEFAULT_ALLOWED_LATENESS, false,
         OutputTimeFns.outputAtEndOfWindow(), false,
@@ -107,7 +107,7 @@ public class WindowingStrategy<T, W extends BoundedWindow> implements Serializab
     return windowFn;
   }
 
-  public ExecutableTrigger getTrigger() {
+  public Trigger getTrigger() {
     return trigger;
   }
 
@@ -150,7 +150,7 @@ public class WindowingStrategy<T, W extends BoundedWindow> implements Serializab
   public WindowingStrategy<T, W> withTrigger(Trigger trigger) {
     return new WindowingStrategy<T, W>(
         windowFn,
-        ExecutableTrigger.create(trigger), true,
+        trigger, true,
         mode, modeSpecified,
         allowedLateness, allowedLatenessSpecified,
         outputTimeFn, outputTimeFnSpecified,

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1abe47bb/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java
index 30228fe..3125ae8 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java
@@ -71,7 +71,7 @@ public class WindowTest implements Serializable {
       .apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(10))))
       .getWindowingStrategy();
     assertTrue(strategy.getWindowFn() instanceof FixedWindows);
-    assertTrue(strategy.getTrigger().getSpec() instanceof DefaultTrigger);
+    assertTrue(strategy.getTrigger() instanceof DefaultTrigger);
     assertEquals(AccumulationMode.DISCARDING_FIRED_PANES, strategy.getMode());
   }
 
@@ -88,7 +88,7 @@ public class WindowTest implements Serializable {
       .getWindowingStrategy();
 
     assertEquals(fixed10, strategy.getWindowFn());
-    assertEquals(trigger, strategy.getTrigger().getSpec());
+    assertEquals(trigger, strategy.getTrigger());
     assertEquals(AccumulationMode.ACCUMULATING_FIRED_PANES, strategy.getMode());
   }
 
@@ -105,7 +105,7 @@ public class WindowTest implements Serializable {
       .getWindowingStrategy();
 
     assertEquals(fixed10, strategy.getWindowFn());
-    assertEquals(trigger, strategy.getTrigger().getSpec());
+    assertEquals(trigger, strategy.getTrigger());
     assertEquals(AccumulationMode.ACCUMULATING_FIRED_PANES, strategy.getMode());
     assertEquals(Duration.standardDays(1), strategy.getAllowedLateness());
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1abe47bb/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ExecutableTriggerTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ExecutableTriggerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ExecutableTriggerTest.java
deleted file mode 100644
index befc07e..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ExecutableTriggerTest.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertSame;
-
-import java.util.Arrays;
-import java.util.List;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.Trigger;
-import org.joda.time.Instant;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * Tests for {@link ExecutableTrigger}.
- */
-@RunWith(JUnit4.class)
-public class ExecutableTriggerTest {
-
-  @Test
-  public void testIndexAssignmentLeaf() throws Exception {
-    StubTrigger t1 = new StubTrigger();
-    ExecutableTrigger executable = ExecutableTrigger.create(t1);
-    assertEquals(0, executable.getTriggerIndex());
-  }
-
-  @Test
-  public void testIndexAssignmentOneLevel() throws Exception {
-    StubTrigger t1 = new StubTrigger();
-    StubTrigger t2 = new StubTrigger();
-    StubTrigger t = new StubTrigger(t1, t2);
-
-    ExecutableTrigger executable = ExecutableTrigger.create(t);
-
-    assertEquals(0, executable.getTriggerIndex());
-    assertEquals(1, executable.subTriggers().get(0).getTriggerIndex());
-    assertSame(t1, executable.subTriggers().get(0).getSpec());
-    assertEquals(2, executable.subTriggers().get(1).getTriggerIndex());
-    assertSame(t2, executable.subTriggers().get(1).getSpec());
-  }
-
-  @Test
-  public void testIndexAssignmentTwoLevel() throws Exception {
-    StubTrigger t11 = new StubTrigger();
-    StubTrigger t12 = new StubTrigger();
-    StubTrigger t13 = new StubTrigger();
-    StubTrigger t14 = new StubTrigger();
-    StubTrigger t21 = new StubTrigger();
-    StubTrigger t22 = new StubTrigger();
-    StubTrigger t1 = new StubTrigger(t11, t12, t13, t14);
-    StubTrigger t2 = new StubTrigger(t21, t22);
-    StubTrigger t = new StubTrigger(t1, t2);
-
-    ExecutableTrigger executable = ExecutableTrigger.create(t);
-
-    assertEquals(0, executable.getTriggerIndex());
-    assertEquals(1, executable.subTriggers().get(0).getTriggerIndex());
-    assertEquals(6, executable.subTriggers().get(0).getFirstIndexAfterSubtree());
-    assertEquals(6, executable.subTriggers().get(1).getTriggerIndex());
-
-    assertSame(t1, executable.getSubTriggerContaining(1).getSpec());
-    assertSame(t2, executable.getSubTriggerContaining(6).getSpec());
-    assertSame(t1, executable.getSubTriggerContaining(2).getSpec());
-    assertSame(t1, executable.getSubTriggerContaining(3).getSpec());
-    assertSame(t1, executable.getSubTriggerContaining(5).getSpec());
-    assertSame(t2, executable.getSubTriggerContaining(7).getSpec());
-  }
-
-  private static class StubTrigger extends Trigger {
-
-    @SafeVarargs
-    protected StubTrigger(Trigger... subTriggers) {
-      super(Arrays.asList(subTriggers));
-    }
-
-    @Override
-    public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) {
-      return BoundedWindow.TIMESTAMP_MAX_VALUE;
-    }
-
-    @Override
-    public boolean isCompatible(Trigger other) {
-      return false;
-    }
-
-    @Override
-    public Trigger getContinuationTrigger(List<Trigger> continuationTriggers) {
-      return this;
-    }
-  }
-}


[3/3] incubator-beam git commit: This closes #1187

Posted by ke...@apache.org.
This closes #1187


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/978c99e9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/978c99e9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/978c99e9

Branch: refs/heads/master
Commit: 978c99e9d699f79d25662c98c091114b8f608ee7
Parents: af17647 1abe47b
Author: Kenneth Knowles <kl...@google.com>
Authored: Mon Oct 31 20:40:02 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Mon Oct 31 20:40:02 2016 -0700

----------------------------------------------------------------------
 .../core/GroupAlsoByWindowViaWindowSetDoFn.java |   3 +-
 .../GroupAlsoByWindowsViaOutputBufferDoFn.java  |   3 +-
 .../beam/runners/core/ReduceFnTester.java       |   6 +-
 .../direct/WatermarkCallbackExecutor.java       |   3 +-
 .../dataflow/DataflowPipelineTranslator.java    |   2 +-
 .../beam/runners/dataflow/DataflowRunner.java   |   4 +-
 .../org/apache/beam/sdk/transforms/Flatten.java |   3 +-
 .../apache/beam/sdk/transforms/GroupByKey.java  |   4 +-
 .../beam/sdk/transforms/windowing/Window.java   |   2 +-
 .../apache/beam/sdk/util/ExecutableTrigger.java | 131 -------------------
 .../apache/beam/sdk/util/WindowingStrategy.java |  10 +-
 .../sdk/transforms/windowing/WindowTest.java    |   6 +-
 .../beam/sdk/util/ExecutableTriggerTest.java    | 109 ---------------
 13 files changed, 21 insertions(+), 265 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/978c99e9/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java
----------------------------------------------------------------------


[2/3] incubator-beam git commit: Update Dataflow worker image

Posted by ke...@apache.org.
Update Dataflow worker image


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/97cd3e5c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/97cd3e5c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/97cd3e5c

Branch: refs/heads/master
Commit: 97cd3e5c02bf8c048029b51d7588a110eb5ce62d
Parents: 7e45830
Author: Kenneth Knowles <kl...@google.com>
Authored: Mon Oct 31 11:27:36 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Mon Oct 31 19:22:49 2016 -0700

----------------------------------------------------------------------
 .../java/org/apache/beam/runners/dataflow/DataflowRunner.java    | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/97cd3e5c/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index 2943ab9..ce126db 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -208,9 +208,9 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
   // Default Docker container images that execute Dataflow worker harness, residing in Google
   // Container Registry, separately for Batch and Streaming.
   public static final String BATCH_WORKER_HARNESS_CONTAINER_IMAGE =
-      "dataflow.gcr.io/v1beta3/beam-java-batch:beam-master-20161026";
+      "dataflow.gcr.io/v1beta3/beam-java-batch:beam-master-20161031";
   public static final String STREAMING_WORKER_HARNESS_CONTAINER_IMAGE =
-      "dataflow.gcr.io/v1beta3/beam-java-streaming:beam-master-20161026";
+      "dataflow.gcr.io/v1beta3/beam-java-streaming:beam-master-20161031";
 
   // The limit of CreateJob request size.
   private static final int CREATE_JOB_REQUEST_LIMIT_BYTES = 10 * 1024 * 1024;