You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/10/20 19:51:57 UTC

[2/4] incubator-beam git commit: Delete TriggerRunner

Delete TriggerRunner


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/33d9baaf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/33d9baaf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/33d9baaf

Branch: refs/heads/master
Commit: 33d9baaf5778c565632f6fe98344b8f1bd8a1d75
Parents: 3237440
Author: Kenneth Knowles <kl...@google.com>
Authored: Wed Oct 19 17:41:39 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu Oct 20 12:41:04 2016 -0700

----------------------------------------------------------------------
 .../apache/beam/runners/core/TriggerRunner.java | 247 -------------------
 1 file changed, 247 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/33d9baaf/runners/core-java/src/main/java/org/apache/beam/runners/core/TriggerRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/TriggerRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/TriggerRunner.java
deleted file mode 100644
index 8d0f322..0000000
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/TriggerRunner.java
+++ /dev/null
@@ -1,247 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.core;
-
-import static com.google.common.base.Preconditions.checkState;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.ImmutableMap;
-import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
-import java.util.BitSet;
-import java.util.Collection;
-import java.util.Map;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
-import org.apache.beam.sdk.transforms.windowing.Trigger;
-import org.apache.beam.sdk.util.BitSetCoder;
-import org.apache.beam.sdk.util.ExecutableTrigger;
-import org.apache.beam.sdk.util.FinishedTriggers;
-import org.apache.beam.sdk.util.FinishedTriggersBitSet;
-import org.apache.beam.sdk.util.Timers;
-import org.apache.beam.sdk.util.TriggerContextFactory;
-import org.apache.beam.sdk.util.state.MergingStateAccessor;
-import org.apache.beam.sdk.util.state.StateAccessor;
-import org.apache.beam.sdk.util.state.StateTag;
-import org.apache.beam.sdk.util.state.StateTags;
-import org.apache.beam.sdk.util.state.ValueState;
-import org.joda.time.Instant;
-
-/**
- * Executes a trigger while managing persistence of information about which subtriggers are
- * finished. Subtriggers include all recursive trigger expressions as well as the entire trigger.
- *
- * <p>Specifically, the responsibilities are:
- *
- * <ul>
- *   <li>Invoking the trigger's methods via its {@link ExecutableTrigger} wrapper by
- *       constructing the appropriate trigger contexts.</li>
- *   <li>Committing a record of which subtriggers are finished to persistent state.</li>
- *   <li>Restoring the record of which subtriggers are finished from persistent state.</li>
- *   <li>Clearing out the persisted finished set when a caller indicates
- *       (via {#link #clearFinished}) that it is no longer needed.</li>
- * </ul>
- *
- * <p>These responsibilities are intertwined: trigger contexts include mutable information about
- * which subtriggers are finished. This class provides the information when building the contexts
- * and commits the information when the method of the {@link ExecutableTrigger} returns.
- *
- * @param <W> The kind of windows being processed.
- */
-public class TriggerRunner<W extends BoundedWindow> {
-  @VisibleForTesting
-  static final StateTag<Object, ValueState<BitSet>> FINISHED_BITS_TAG =
-      StateTags.makeSystemTagInternal(StateTags.value("closed", BitSetCoder.of()));
-
-  private final ExecutableTrigger rootTrigger;
-  private final TriggerContextFactory<W> contextFactory;
-
-  public TriggerRunner(ExecutableTrigger rootTrigger, TriggerContextFactory<W> contextFactory) {
-    checkState(rootTrigger.getTriggerIndex() == 0);
-    this.rootTrigger = rootTrigger;
-    this.contextFactory = contextFactory;
-  }
-
-  private FinishedTriggersBitSet readFinishedBits(ValueState<BitSet> state) {
-    if (!isFinishedSetNeeded()) {
-      // If no trigger in the tree will ever have finished bits, then we don't need to read them.
-      // So that the code can be agnostic to that fact, we create a BitSet that is all 0 (not
-      // finished) for each trigger in the tree.
-      return FinishedTriggersBitSet.emptyWithCapacity(rootTrigger.getFirstIndexAfterSubtree());
-    }
-
-    BitSet bitSet = state.read();
-    return bitSet == null
-        ? FinishedTriggersBitSet.emptyWithCapacity(rootTrigger.getFirstIndexAfterSubtree())
-            : FinishedTriggersBitSet.fromBitSet(bitSet);
-  }
-
-
-  private void clearFinishedBits(ValueState<BitSet> state) {
-    if (!isFinishedSetNeeded()) {
-      // Nothing to clear.
-      return;
-    }
-    state.clear();
-  }
-
-  /** Return true if the trigger is closed in the window corresponding to the specified state. */
-  public boolean isClosed(StateAccessor<?> state) {
-    return readFinishedBits(state.access(FINISHED_BITS_TAG)).isFinished(rootTrigger);
-  }
-
-  @SuppressFBWarnings(value = "RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT",
-      justification = "prefetch side effect")
-  public void prefetchForValue(W window, StateAccessor<?> state) {
-    if (isFinishedSetNeeded()) {
-      state.access(FINISHED_BITS_TAG).readLater();
-    }
-    rootTrigger.getSpec().prefetchOnElement(
-        contextFactory.createStateAccessor(window, rootTrigger));
-  }
-
-  @SuppressFBWarnings(value = "RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT",
-      justification = "prefetch side effect")
-  public void prefetchOnFire(W window, StateAccessor<?> state) {
-    if (isFinishedSetNeeded()) {
-      state.access(FINISHED_BITS_TAG).readLater();
-    }
-    rootTrigger.getSpec().prefetchOnFire(contextFactory.createStateAccessor(window, rootTrigger));
-  }
-
-  @SuppressFBWarnings(value = "RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT",
-      justification = "prefetch side effect")
-  public void prefetchShouldFire(W window, StateAccessor<?> state) {
-    if (isFinishedSetNeeded()) {
-      state.access(FINISHED_BITS_TAG).readLater();
-    }
-    rootTrigger.getSpec().prefetchShouldFire(
-        contextFactory.createStateAccessor(window, rootTrigger));
-  }
-
-  /**
-   * Run the trigger logic to deal with a new value.
-   */
-  public void processValue(W window, Instant timestamp, Timers timers, StateAccessor<?> state)
-      throws Exception {
-    // Clone so that we can detect changes and so that changes here don't pollute merging.
-    FinishedTriggersBitSet finishedSet =
-        readFinishedBits(state.access(FINISHED_BITS_TAG)).copy();
-    Trigger.OnElementContext triggerContext = contextFactory.createOnElementContext(
-        window, timers, timestamp, rootTrigger, finishedSet);
-    rootTrigger.invokeOnElement(triggerContext);
-    persistFinishedSet(state, finishedSet);
-  }
-
-  @SuppressFBWarnings(value = "RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT",
-      justification = "prefetch side effect")
-  public void prefetchForMerge(
-      W window, Collection<W> mergingWindows, MergingStateAccessor<?, W> state) {
-    if (isFinishedSetNeeded()) {
-      for (ValueState<?> value : state.accessInEachMergingWindow(FINISHED_BITS_TAG).values()) {
-        value.readLater();
-      }
-    }
-    rootTrigger.getSpec().prefetchOnMerge(contextFactory.createMergingStateAccessor(
-        window, mergingWindows, rootTrigger));
-  }
-
-  /**
-   * Run the trigger merging logic as part of executing the specified merge.
-   */
-  public void onMerge(W window, Timers timers, MergingStateAccessor<?, W> state) throws Exception {
-    // Clone so that we can detect changes and so that changes here don't pollute merging.
-    FinishedTriggersBitSet finishedSet =
-        readFinishedBits(state.access(FINISHED_BITS_TAG)).copy();
-
-    // And read the finished bits in each merging window.
-    ImmutableMap.Builder<W, FinishedTriggers> builder = ImmutableMap.builder();
-    for (Map.Entry<W, ValueState<BitSet>> entry :
-        state.accessInEachMergingWindow(FINISHED_BITS_TAG).entrySet()) {
-      // Don't need to clone these, since the trigger context doesn't allow modification
-      builder.put(entry.getKey(), readFinishedBits(entry.getValue()));
-      // Clear the underlying finished bits.
-      clearFinishedBits(entry.getValue());
-    }
-    ImmutableMap<W, FinishedTriggers> mergingFinishedSets = builder.build();
-
-    Trigger.OnMergeContext mergeContext = contextFactory.createOnMergeContext(
-        window, timers, rootTrigger, finishedSet, mergingFinishedSets);
-
-    // Run the merge from the trigger
-    rootTrigger.invokeOnMerge(mergeContext);
-
-    persistFinishedSet(state, finishedSet);
-  }
-
-  public boolean shouldFire(W window, Timers timers, StateAccessor<?> state) throws Exception {
-    FinishedTriggers finishedSet = readFinishedBits(state.access(FINISHED_BITS_TAG)).copy();
-    Trigger.TriggerContext context = contextFactory.base(window, timers,
-        rootTrigger, finishedSet);
-    return rootTrigger.invokeShouldFire(context);
-  }
-
-  public void onFire(W window, Timers timers, StateAccessor<?> state) throws Exception {
-    // shouldFire should be false.
-    // However it is too expensive to assert.
-    FinishedTriggersBitSet finishedSet =
-        readFinishedBits(state.access(FINISHED_BITS_TAG)).copy();
-    Trigger.TriggerContext context = contextFactory.base(window, timers,
-        rootTrigger, finishedSet);
-    rootTrigger.invokeOnFire(context);
-    persistFinishedSet(state, finishedSet);
-  }
-
-  private void persistFinishedSet(
-      StateAccessor<?> state, FinishedTriggersBitSet modifiedFinishedSet) {
-    if (!isFinishedSetNeeded()) {
-      return;
-    }
-
-    ValueState<BitSet> finishedSetState = state.access(FINISHED_BITS_TAG);
-    if (!readFinishedBits(finishedSetState).equals(modifiedFinishedSet)) {
-      if (modifiedFinishedSet.getBitSet().isEmpty()) {
-        finishedSetState.clear();
-      } else {
-        finishedSetState.write(modifiedFinishedSet.getBitSet());
-      }
-    }
-  }
-
-  /**
-   * Clear the finished bits.
-   */
-  public void clearFinished(StateAccessor<?> state) {
-    clearFinishedBits(state.access(FINISHED_BITS_TAG));
-  }
-
-  /**
-   * Clear the state used for executing triggers, but leave the finished set to indicate
-   * the window is closed.
-   */
-  public void clearState(W window, Timers timers, StateAccessor<?> state) throws Exception {
-    // Don't need to clone, because we'll be clearing the finished bits anyways.
-    FinishedTriggers finishedSet = readFinishedBits(state.access(FINISHED_BITS_TAG));
-    rootTrigger.invokeClear(contextFactory.base(window, timers, rootTrigger, finishedSet));
-  }
-
-  private boolean isFinishedSetNeeded() {
-    // TODO: If we know that no trigger in the tree will ever finish, we don't need to do the
-    // lookup. Right now, we special case this for the DefaultTrigger.
-    return !(rootTrigger.getSpec() instanceof DefaultTrigger);
-  }
-}