You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/09/13 00:41:05 UTC
[34/50] [abbrv] incubator-beam git commit: Put classes in
runners-core package into runners.core namespace
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4bf3a3b3/runners/core-java/src/main/java/org/apache/beam/runners/core/PaneInfoTracker.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/PaneInfoTracker.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/PaneInfoTracker.java
new file mode 100644
index 0000000..114f5e6
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/PaneInfoTracker.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.common.annotations.VisibleForTesting;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo.PaneInfoCoder;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing;
+import org.apache.beam.sdk.util.TimerInternals;
+import org.apache.beam.sdk.util.WindowTracing;
+import org.apache.beam.sdk.util.state.ReadableState;
+import org.apache.beam.sdk.util.state.StateAccessor;
+import org.apache.beam.sdk.util.state.StateTag;
+import org.apache.beam.sdk.util.state.StateTags;
+import org.apache.beam.sdk.util.state.ValueState;
+import org.joda.time.Instant;
+
+/**
+ * Determine the timing and other properties of a new pane for a given computation, key and window.
+ * Incorporates any previous pane, whether the pane has been produced because an
+ * on-time {@link AfterWatermark} trigger firing, and the relation between the element's timestamp
+ * and the current output watermark.
+ */
+public class PaneInfoTracker {
+ private TimerInternals timerInternals;
+
+ public PaneInfoTracker(TimerInternals timerInternals) {
+ this.timerInternals = timerInternals;
+ }
+
+ @VisibleForTesting
+ static final StateTag<Object, ValueState<PaneInfo>> PANE_INFO_TAG =
+ StateTags.makeSystemTagInternal(StateTags.value("pane", PaneInfoCoder.INSTANCE));
+
+ public void clear(StateAccessor<?> state) {
+ state.access(PANE_INFO_TAG).clear();
+ }
+
+ /**
+ * Return a ({@link ReadableState} for) the pane info appropriate for {@code context}. The pane
+ * info includes the timing for the pane, who's calculation is quite subtle.
+ *
+ * @param isFinal should be {@code true} only if the triggering machinery can guarantee
+ * no further firings for the
+ */
+ public ReadableState<PaneInfo> getNextPaneInfo(
+ ReduceFn<?, ?, ?, ?>.Context context, final boolean isFinal) {
+ final Object key = context.key();
+ final ReadableState<PaneInfo> previousPaneFuture =
+ context.state().access(PaneInfoTracker.PANE_INFO_TAG);
+ final Instant windowMaxTimestamp = context.window().maxTimestamp();
+
+ return new ReadableState<PaneInfo>() {
+ @Override
+ @SuppressFBWarnings(value = "RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT",
+ justification = "prefetch side effect")
+ public ReadableState<PaneInfo> readLater() {
+ previousPaneFuture.readLater();
+ return this;
+ }
+
+ @Override
+ public PaneInfo read() {
+ PaneInfo previousPane = previousPaneFuture.read();
+ return describePane(key, windowMaxTimestamp, previousPane, isFinal);
+ }
+ };
+ }
+
+ public void storeCurrentPaneInfo(ReduceFn<?, ?, ?, ?>.Context context, PaneInfo currentPane) {
+ context.state().access(PANE_INFO_TAG).write(currentPane);
+ }
+
+ private <W> PaneInfo describePane(
+ Object key, Instant windowMaxTimestamp, PaneInfo previousPane, boolean isFinal) {
+ boolean isFirst = previousPane == null;
+ Timing previousTiming = isFirst ? null : previousPane.getTiming();
+ long index = isFirst ? 0 : previousPane.getIndex() + 1;
+ long nonSpeculativeIndex = isFirst ? 0 : previousPane.getNonSpeculativeIndex() + 1;
+ Instant outputWM = timerInternals.currentOutputWatermarkTime();
+ Instant inputWM = timerInternals.currentInputWatermarkTime();
+
+ // True if it is not possible to assign the element representing this pane a timestamp
+ // which will make an ON_TIME pane for any following computation.
+ // Ie true if the element's latest possible timestamp is before the current output watermark.
+ boolean isLateForOutput = outputWM != null && windowMaxTimestamp.isBefore(outputWM);
+
+ // True if all emitted panes (if any) were EARLY panes.
+ // Once the ON_TIME pane has fired, all following panes must be considered LATE even
+ // if the output watermark is behind the end of the window.
+ boolean onlyEarlyPanesSoFar = previousTiming == null || previousTiming == Timing.EARLY;
+
+ // True is the input watermark hasn't passed the window's max timestamp.
+ boolean isEarlyForInput = !inputWM.isAfter(windowMaxTimestamp);
+
+ Timing timing;
+ if (isLateForOutput || !onlyEarlyPanesSoFar) {
+ // The output watermark has already passed the end of this window, or we have already
+ // emitted a non-EARLY pane. Irrespective of how this pane was triggered we must
+ // consider this pane LATE.
+ timing = Timing.LATE;
+ } else if (isEarlyForInput) {
+ // This is an EARLY firing.
+ timing = Timing.EARLY;
+ nonSpeculativeIndex = -1;
+ } else {
+ // This is the unique ON_TIME firing for the window.
+ timing = Timing.ON_TIME;
+ }
+
+ WindowTracing.debug(
+ "describePane: {} pane (prev was {}) for key:{}; windowMaxTimestamp:{}; "
+ + "inputWatermark:{}; outputWatermark:{}; isLateForOutput:{}",
+ timing, previousTiming, key, windowMaxTimestamp, inputWM, outputWM, isLateForOutput);
+
+ if (previousPane != null) {
+ // Timing transitions should follow EARLY* ON_TIME? LATE*
+ switch (previousTiming) {
+ case EARLY:
+ checkState(
+ timing == Timing.EARLY || timing == Timing.ON_TIME || timing == Timing.LATE,
+ "EARLY cannot transition to %s", timing);
+ break;
+ case ON_TIME:
+ checkState(
+ timing == Timing.LATE, "ON_TIME cannot transition to %s", timing);
+ break;
+ case LATE:
+ checkState(timing == Timing.LATE, "LATE cannot transtion to %s", timing);
+ break;
+ case UNKNOWN:
+ break;
+ }
+ checkState(!previousPane.isLast(), "Last pane was not last after all.");
+ }
+
+ return PaneInfo.createPane(isFirst, isFinal, timing, index, nonSpeculativeIndex);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4bf3a3b3/runners/core-java/src/main/java/org/apache/beam/runners/core/PeekingReiterator.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/PeekingReiterator.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/PeekingReiterator.java
new file mode 100644
index 0000000..fcdff3b
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/PeekingReiterator.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+
+import java.util.NoSuchElementException;
+import org.apache.beam.sdk.util.common.Reiterator;
+
+/**
+ * A {@link Reiterator} that supports one-element lookahead during iteration.
+ *
+ * <p>{@link #peek} returns the element that the next call to {@link #next} will return, without
+ * consuming it.
+ *
+ * @param <T> the type of elements returned by this iterator
+ */
+public final class PeekingReiterator<T> implements Reiterator<T> {
+  /** The buffered lookahead element; meaningful only while {@code nextElementComputed}. */
+  private T nextElement;
+  /** Whether {@code nextElement} holds an element taken from {@code iterator} but not consumed. */
+  private boolean nextElementComputed;
+  private final Reiterator<T> iterator;
+
+  public PeekingReiterator(Reiterator<T> iterator) {
+    this.iterator = checkNotNull(iterator);
+  }
+
+  /**
+   * Creates a copy of {@code it} that wraps {@code it}'s {@link Reiterator#copy} of the wrapped
+   * iterator and carries over any buffered lookahead element.
+   */
+  PeekingReiterator(PeekingReiterator<T> it) {
+    this.iterator = checkNotNull(checkNotNull(it).iterator.copy());
+    this.nextElement = it.nextElement;
+    this.nextElementComputed = it.nextElementComputed;
+  }
+
+  /**
+   * {@inheritDoc}
+   *
+   * <p>Does not buffer a lookahead element, so calling {@code hasNext} does not prevent a
+   * subsequent {@link #remove}.
+   */
+  @Override
+  public boolean hasNext() {
+    // Ask the wrapped iterator directly instead of peeking: buffering here would advance it past
+    // the element last returned by next(), which would make remove() fail.
+    return nextElementComputed || iterator.hasNext();
+  }
+
+  @Override
+  public T next() {
+    T result = peek();
+    nextElementComputed = false;
+    // Drop the reference so the consumed element is not retained by this iterator.
+    nextElement = null;
+    return result;
+  }
+
+  /**
+   * {@inheritDoc}
+   *
+   * <p>If {@link #peek} is called, {@code remove} is disallowed until
+   * {@link #next} has been subsequently called, because peeking advances the wrapped iterator
+   * past the element last returned by {@link #next}.
+   *
+   * @throws IllegalStateException if a peeked element is currently buffered
+   */
+  @Override
+  public void remove() {
+    checkState(!nextElementComputed,
+        "After peek(), remove() is disallowed until next() is called");
+    iterator.remove();
+  }
+
+  @Override
+  public PeekingReiterator<T> copy() {
+    return new PeekingReiterator<>(this);
+  }
+
+  /**
+   * Returns the element that would be returned by {@link #next}, without
+   * actually consuming the element.
+   *
+   * @throws NoSuchElementException if there is no next element
+   */
+  public T peek() {
+    computeNext();
+    if (!nextElementComputed) {
+      throw new NoSuchElementException();
+    }
+    return nextElement;
+  }
+
+  /** Buffers the next element of the wrapped iterator, if one exists and none is buffered. */
+  private void computeNext() {
+    if (nextElementComputed) {
+      return;
+    }
+    if (!iterator.hasNext()) {
+      return;
+    }
+    nextElement = iterator.next();
+    nextElementComputed = true;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4bf3a3b3/runners/core-java/src/main/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunner.java
new file mode 100644
index 0000000..deeac3c
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunner.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.ReadyCheckingSideInputReader;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.PCollectionView;
+
+/**
+ * A {@link DoFnRunner} that can refuse to process elements that are not ready, instead returning
+ * them via the {@link #processElementInReadyWindows(WindowedValue)}.
+ */
+public class PushbackSideInputDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, OutputT> {
+ private final DoFnRunner<InputT, OutputT> underlying;
+ private final Collection<PCollectionView<?>> views;
+ private final ReadyCheckingSideInputReader sideInputReader;
+
+ private Set<BoundedWindow> notReadyWindows;
+
+ public static <InputT, OutputT> PushbackSideInputDoFnRunner<InputT, OutputT> create(
+ DoFnRunner<InputT, OutputT> underlying,
+ Collection<PCollectionView<?>> views,
+ ReadyCheckingSideInputReader sideInputReader) {
+ return new PushbackSideInputDoFnRunner<>(underlying, views, sideInputReader);
+ }
+
+ private PushbackSideInputDoFnRunner(
+ DoFnRunner<InputT, OutputT> underlying,
+ Collection<PCollectionView<?>> views,
+ ReadyCheckingSideInputReader sideInputReader) {
+ this.underlying = underlying;
+ this.views = views;
+ this.sideInputReader = sideInputReader;
+ }
+
+ @Override
+ public void startBundle() {
+ notReadyWindows = new HashSet<>();
+ underlying.startBundle();
+ }
+
+ /**
+ * Call the underlying {@link DoFnRunner#processElement(WindowedValue)} for the provided element
+ * for each window the element is in that is ready.
+ *
+ * @param elem the element to process in all ready windows
+ * @return each element that could not be processed because it requires a side input window
+ * that is not ready.
+ */
+ public Iterable<WindowedValue<InputT>> processElementInReadyWindows(WindowedValue<InputT> elem) {
+ if (views.isEmpty()) {
+ processElement(elem);
+ return Collections.emptyList();
+ }
+ ImmutableList.Builder<WindowedValue<InputT>> pushedBack = ImmutableList.builder();
+ for (WindowedValue<InputT> windowElem : elem.explodeWindows()) {
+ BoundedWindow mainInputWindow = Iterables.getOnlyElement(windowElem.getWindows());
+ boolean isReady = !notReadyWindows.contains(mainInputWindow);
+ for (PCollectionView<?> view : views) {
+ BoundedWindow sideInputWindow =
+ view.getWindowingStrategyInternal()
+ .getWindowFn()
+ .getSideInputWindow(mainInputWindow);
+ if (!sideInputReader.isReady(view, sideInputWindow)) {
+ isReady = false;
+ break;
+ }
+ }
+ if (isReady) {
+ processElement(windowElem);
+ } else {
+ notReadyWindows.add(mainInputWindow);
+ pushedBack.add(windowElem);
+ }
+ }
+ return pushedBack.build();
+ }
+
+ @Override
+ public void processElement(WindowedValue<InputT> elem) {
+ underlying.processElement(elem);
+ }
+
+ /**
+ * Call the underlying {@link DoFnRunner#finishBundle()}.
+ */
+ @Override
+ public void finishBundle() {
+ notReadyWindows = null;
+ underlying.finishBundle();
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4bf3a3b3/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFn.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFn.java
new file mode 100644
index 0000000..bb20226
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFn.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import java.io.Serializable;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.Timers;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.util.state.MergingStateAccessor;
+import org.apache.beam.sdk.util.state.ReadableState;
+import org.apache.beam.sdk.util.state.StateAccessor;
+import org.joda.time.Instant;
+
+/**
+ * Specification for processing to happen after elements have been grouped by key.
+ *
+ * <p>Each value associated with the current key is passed to {@link #processValue}, window
+ * merges are reported to {@link #onMerge}, and trigger firings are reported to
+ * {@link #onTrigger}, where the implementation outputs the contents of the pane. Intermediate
+ * results can be kept in the state exposed by {@link Context#state}.
+ *
+ * @param <K> The type of key being processed.
+ * @param <InputT> The type of input values associated with the key.
+ * @param <OutputT> The output type that will be produced for each key.
+ * @param <W> The type of windows this operates on.
+ */
+public abstract class ReduceFn<K, InputT, OutputT, W extends BoundedWindow>
+    implements Serializable {
+
+  /** Information accessible to all the processing methods in this {@code ReduceFn}. */
+  public abstract class Context {
+    /** Returns the key that is being processed. */
+    public abstract K key();
+
+    /** Returns the window that is being processed. */
+    public abstract W window();
+
+    /** Returns the current {@link WindowingStrategy}. */
+    public abstract WindowingStrategy<?, W> windowingStrategy();
+
+    /** Returns the interface for accessing state. */
+    public abstract StateAccessor<K> state();
+
+    /** Returns the interface for accessing timers. */
+    public abstract Timers timers();
+  }
+
+  /** Information accessible within {@link #processValue}. */
+  public abstract class ProcessValueContext extends Context {
+    /** Returns the actual value being processed. */
+    public abstract InputT value();
+
+    /** Returns the timestamp associated with the value. */
+    public abstract Instant timestamp();
+  }
+
+  /** Information accessible within {@link #onMerge}. */
+  public abstract class OnMergeContext extends Context {
+    /**
+     * Returns the interface for accessing state, which can additionally access state in each of
+     * the windows being merged.
+     */
+    @Override
+    public abstract MergingStateAccessor<K, W> state();
+  }
+
+  /** Information accessible within {@link #onTrigger}. */
+  public abstract class OnTriggerContext extends Context {
+    /** Returns the {@link PaneInfo} for the trigger firing being processed. */
+    public abstract PaneInfo paneInfo();
+
+    /** Outputs the given value in the current window. */
+    public abstract void output(OutputT value);
+  }
+
+  //////////////////////////////////////////////////////////////////////////////////////////////////
+
+  /**
+   * Called for each value of type {@code InputT} associated with the current key.
+   *
+   * @throws Exception if processing the value fails
+   */
+  public abstract void processValue(ProcessValueContext c) throws Exception;
+
+  /**
+   * Called when windows are merged.
+   *
+   * @throws Exception if merging fails
+   */
+  public abstract void onMerge(OnMergeContext context) throws Exception;
+
+  /**
+   * Called when triggers fire.
+   *
+   * <p>Implementations of {@link ReduceFn} should call {@link OnTriggerContext#output} to emit
+   * any results that should be included in the pane produced by this trigger firing.
+   *
+   * @throws Exception if producing the pane fails
+   */
+  public abstract void onTrigger(OnTriggerContext context) throws Exception;
+
+  /**
+   * Called before {@link #onMerge} is invoked to provide an opportunity to prefetch any needed
+   * state. The default implementation does nothing.
+   *
+   * @param c the merging state accessor to prefetch state from
+   * @throws Exception if prefetching fails
+   */
+  public void prefetchOnMerge(MergingStateAccessor<K, W> c) throws Exception {}
+
+  /**
+   * Called before {@link #onTrigger} is invoked to provide an opportunity to prefetch any needed
+   * state. The default implementation does nothing.
+   *
+   * @param context the state accessor to prefetch state from
+   */
+  public void prefetchOnTrigger(StateAccessor<K> context) {}
+
+  /**
+   * Called to clear any persisted state that the {@link ReduceFn} may be holding. This will be
+   * called when the window is closing and will receive no future interactions.
+   *
+   * @throws Exception if clearing state fails
+   */
+  public abstract void clearState(Context context) throws Exception;
+
+  /**
+   * Returns a {@link ReadableState} whose value is {@code true} if there is no buffered state.
+   *
+   * @param context the state accessor to inspect
+   */
+  public abstract ReadableState<Boolean> isEmpty(StateAccessor<K> context);
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4bf3a3b3/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java
new file mode 100644
index 0000000..2043f14
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java
@@ -0,0 +1,499 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.common.collect.ImmutableMap;
+import java.util.Collection;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.ActiveWindowSet;
+import org.apache.beam.sdk.util.TimeDomain;
+import org.apache.beam.sdk.util.TimerInternals;
+import org.apache.beam.sdk.util.TimerInternals.TimerData;
+import org.apache.beam.sdk.util.Timers;
+import org.apache.beam.sdk.util.WindowingInternals;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.util.state.MergingStateAccessor;
+import org.apache.beam.sdk.util.state.ReadableState;
+import org.apache.beam.sdk.util.state.State;
+import org.apache.beam.sdk.util.state.StateAccessor;
+import org.apache.beam.sdk.util.state.StateContext;
+import org.apache.beam.sdk.util.state.StateContexts;
+import org.apache.beam.sdk.util.state.StateInternals;
+import org.apache.beam.sdk.util.state.StateNamespace;
+import org.apache.beam.sdk.util.state.StateNamespaces;
+import org.apache.beam.sdk.util.state.StateNamespaces.WindowNamespace;
+import org.apache.beam.sdk.util.state.StateTag;
+import org.joda.time.Instant;
+
+/**
+ * Factory for creating instances of the various {@link ReduceFn} contexts.
+ */
+class ReduceFnContextFactory<K, InputT, OutputT, W extends BoundedWindow> {
+  /** Callbacks that {@link #forTrigger} hands to the on-trigger context it creates. */
+  public interface OnTriggerCallbacks<OutputT> {
+    /**
+     * Receives one value to output. NOTE(review): presumably invoked for each value passed to
+     * {@code OnTriggerContext#output} — confirm against {@code OnTriggerContextImpl}.
+     */
+    void output(OutputT toOutput);
+  }
+
+  // What is being reduced.
+  private final K key;
+  private final ReduceFn<K, InputT, OutputT, W> reduceFn;
+  private final WindowingStrategy<?, W> windowingStrategy;
+
+  // Where per-window state and timers are kept.
+  private final StateInternals<K> stateInternals;
+  private final ActiveWindowSet<W> activeWindows;
+  private final TimerInternals timerInternals;
+
+  // Environment passed into the state contexts created by stateAccessor().
+  private final WindowingInternals<?, ?> windowingInternals;
+  private final PipelineOptions options;
+
+  ReduceFnContextFactory(
+      K key,
+      ReduceFn<K, InputT, OutputT, W> reduceFn,
+      WindowingStrategy<?, W> windowingStrategy,
+      StateInternals<K> stateInternals,
+      ActiveWindowSet<W> activeWindows,
+      TimerInternals timerInternals,
+      WindowingInternals<?, ?> windowingInternals,
+      PipelineOptions options) {
+    this.options = options;
+    this.windowingInternals = windowingInternals;
+    this.timerInternals = timerInternals;
+    this.activeWindows = activeWindows;
+    this.stateInternals = stateInternals;
+    this.windowingStrategy = windowingStrategy;
+    this.reduceFn = reduceFn;
+    this.key = key;
+  }
+
+  /**
+   * Where should we look for state associated with a given window?
+   *
+   * <p>(Nested enums are implicitly static, so no {@code static} modifier is needed.)
+   */
+  public enum StateStyle {
+    /** All state is associated with the window itself. */
+    DIRECT,
+    /** State is associated with the 'state address' windows tracked by the active window set. */
+    RENAMED
+  }
+
+  /** Builds a {@link StateAccessorImpl} for {@code window} that resolves state using {@code style}. */
+  private StateAccessorImpl<K, W> stateAccessor(W window, StateStyle style) {
+    Coder<W> windowCoder = windowingStrategy.getWindowFn().windowCoder();
+    StateContext<W> stateContext =
+        StateContexts.createFromComponents(options, windowingInternals, window);
+    return new StateAccessorImpl<K, W>(
+        activeWindows, windowCoder, stateInternals, stateContext, style);
+  }
+
+ public ReduceFn<K, InputT, OutputT, W>.Context base(W window, StateStyle style) {
+ return new ContextImpl(stateAccessor(window, style));
+ }
+
+ public ReduceFn<K, InputT, OutputT, W>.ProcessValueContext forValue(
+ W window, InputT value, Instant timestamp, StateStyle style) {
+ return new ProcessValueContextImpl(stateAccessor(window, style), value, timestamp);
+ }
+
+ public ReduceFn<K, InputT, OutputT, W>.OnTriggerContext forTrigger(W window,
+ ReadableState<PaneInfo> pane, StateStyle style, OnTriggerCallbacks<OutputT> callbacks) {
+ return new OnTriggerContextImpl(stateAccessor(window, style), pane, callbacks);
+ }
+
+ public ReduceFn<K, InputT, OutputT, W>.OnMergeContext forMerge(
+ Collection<W> activeToBeMerged, W mergeResult, StateStyle style) {
+ return new OnMergeContextImpl(
+ new MergingStateAccessorImpl<K, W>(activeWindows,
+ windowingStrategy.getWindowFn().windowCoder(),
+ stateInternals, style, activeToBeMerged, mergeResult));
+ }
+
+ public ReduceFn<K, InputT, OutputT, W>.OnMergeContext forPremerge(W window) {
+ return new OnPremergeContextImpl(new PremergingStateAccessorImpl<K, W>(
+ activeWindows, windowingStrategy.getWindowFn().windowCoder(), stateInternals, window));
+ }
+
+  /**
+   * {@link Timers} implementation that sets and deletes timers in a single window namespace
+   * through the factory's {@link TimerInternals}.
+   */
+  private class TimersImpl implements Timers {
+    private final StateNamespace namespace;
+
+    /**
+     * @param namespace the window namespace timers are scoped to
+     * @throws IllegalArgumentException if {@code namespace} is not a {@link WindowNamespace}
+     */
+    public TimersImpl(StateNamespace namespace) {
+      checkArgument(namespace instanceof WindowNamespace,
+          "Expected a WindowNamespace for timers, but got %s", namespace);
+      this.namespace = namespace;
+    }
+
+    @Override
+    public void setTimer(Instant timestamp, TimeDomain timeDomain) {
+      timerInternals.setTimer(TimerData.of(namespace, timestamp, timeDomain));
+    }
+
+    @Override
+    public void deleteTimer(Instant timestamp, TimeDomain timeDomain) {
+      timerInternals.deleteTimer(TimerData.of(namespace, timestamp, timeDomain));
+    }
+
+    @Override
+    public Instant currentProcessingTime() {
+      return timerInternals.currentProcessingTime();
+    }
+
+    @Override
+    @Nullable
+    public Instant currentSynchronizedProcessingTime() {
+      return timerInternals.currentSynchronizedProcessingTime();
+    }
+
+    /** Returns the input watermark, which serves as the current event time here. */
+    @Override
+    public Instant currentEventTime() {
+      return timerInternals.currentInputWatermarkTime();
+    }
+  }
+
+ // ======================================================================
+ // StateAccessors
+ // ======================================================================
+  /**
+   * {@link StateAccessor} for a single window. With {@link StateStyle#DIRECT} state lives in the
+   * window's own namespace; with {@link StateStyle#RENAMED} it lives in the namespace of the window
+   * returned by {@link ActiveWindowSet#writeStateAddress} for this window.
+   */
+  static class StateAccessorImpl<K, W extends BoundedWindow> implements StateAccessor<K> {
+    protected final ActiveWindowSet<W> activeWindows;
+    protected final StateContext<W> context;
+    protected final StateNamespace windowNamespace;
+    protected final Coder<W> windowCoder;
+    protected final StateInternals<K> stateInternals;
+    protected final StateStyle style;
+
+    public StateAccessorImpl(ActiveWindowSet<W> activeWindows, Coder<W> windowCoder,
+        StateInternals<K> stateInternals, StateContext<W> context, StateStyle style) {
+      this.activeWindows = activeWindows;
+      this.windowCoder = windowCoder;
+      this.stateInternals = stateInternals;
+      this.context = checkNotNull(context);
+      // Built directly rather than through the overridable namespaceFor(): calling an
+      // overridable method from a constructor would run a subclass override before the
+      // subclass has been initialized.
+      this.windowNamespace = StateNamespaces.window(windowCoder, context.window());
+      this.style = style;
+    }
+
+    /** Returns the state namespace for {@code window}. */
+    protected StateNamespace namespaceFor(W window) {
+      return StateNamespaces.window(windowCoder, window);
+    }
+
+    /** Returns the namespace of this accessor's own window. */
+    protected StateNamespace windowNamespace() {
+      return windowNamespace;
+    }
+
+    W window() {
+      return context.window();
+    }
+
+    StateNamespace namespace() {
+      return windowNamespace();
+    }
+
+    @Override
+    public <StateT extends State> StateT access(StateTag<? super K, StateT> address) {
+      switch (style) {
+        case DIRECT:
+          return stateInternals.state(windowNamespace(), address, context);
+        case RENAMED:
+          return stateInternals.state(
+              namespaceFor(activeWindows.writeStateAddress(context.window())), address, context);
+      }
+      // Only reachable if a StateStyle is added without a matching case above.
+      throw new IllegalStateException("Unknown StateStyle: " + style);
+    }
+  }
+
+  /**
+   * {@link MergingStateAccessor} whose own window is {@code mergeResult} and whose merging
+   * windows are {@code activeToBeMerged}.
+   */
+  static class MergingStateAccessorImpl<K, W extends BoundedWindow>
+      extends StateAccessorImpl<K, W> implements MergingStateAccessor<K, W> {
+    private final Collection<W> activeToBeMerged;
+
+    public MergingStateAccessorImpl(ActiveWindowSet<W> activeWindows, Coder<W> windowCoder,
+        StateInternals<K> stateInternals, StateStyle style, Collection<W> activeToBeMerged,
+        W mergeResult) {
+      super(activeWindows, windowCoder, stateInternals,
+          StateContexts.windowOnly(mergeResult), style);
+      this.activeToBeMerged = activeToBeMerged;
+    }
+
+    /**
+     * Accesses state of the merge result. For {@link StateStyle#RENAMED} the namespace comes from
+     * {@link ActiveWindowSet#mergedWriteStateAddress} over the windows being merged.
+     */
+    @Override
+    public <StateT extends State> StateT access(StateTag<? super K, StateT> address) {
+      switch (style) {
+        case DIRECT:
+          return stateInternals.state(windowNamespace(), address, context);
+        case RENAMED:
+          return stateInternals.state(
+              namespaceFor(activeWindows.mergedWriteStateAddress(
+                  activeToBeMerged, context.window())),
+              address,
+              context);
+      }
+      // Only reachable if a StateStyle is added without a matching case above.
+      throw new IllegalStateException("Unknown StateStyle: " + style);
+    }
+
+    /** Returns the state for {@code address} in each window being merged, keyed by that window. */
+    @Override
+    public <StateT extends State> Map<W, StateT> accessInEachMergingWindow(
+        StateTag<? super K, StateT> address) {
+      ImmutableMap.Builder<W, StateT> builder = ImmutableMap.builder();
+      for (W mergingWindow : activeToBeMerged) {
+        StateNamespace namespace;
+        switch (style) {
+          case DIRECT:
+            namespace = namespaceFor(mergingWindow);
+            break;
+          case RENAMED:
+            namespace = namespaceFor(activeWindows.writeStateAddress(mergingWindow));
+            break;
+          default:
+            // Only reachable if a StateStyle is added without a matching case above.
+            throw new IllegalStateException("Unknown StateStyle: " + style);
+        }
+        builder.put(mergingWindow, stateInternals.state(namespace, address, context));
+      }
+      return builder.build();
+    }
+  }
+
+  /**
+   * {@link MergingStateAccessor} used before merging. It always uses {@link StateStyle#RENAMED},
+   * and its merging windows are the state address windows that the active window set reports
+   * for this accessor's window.
+   */
+  static class PremergingStateAccessorImpl<K, W extends BoundedWindow>
+      extends StateAccessorImpl<K, W> implements MergingStateAccessor<K, W> {
+    public PremergingStateAccessorImpl(
+        ActiveWindowSet<W> activeWindows,
+        Coder<W> windowCoder,
+        StateInternals<K> stateInternals,
+        W window) {
+      super(
+          activeWindows,
+          windowCoder,
+          stateInternals,
+          StateContexts.windowOnly(window),
+          StateStyle.RENAMED);
+    }
+
+    /** Returns the state address windows recorded for this accessor's window. */
+    Collection<W> mergingWindows() {
+      return activeWindows.readStateAddresses(context.window());
+    }
+
+    /**
+     * Returns the state for {@code address} in each state address window of this accessor's
+     * window, keyed by that state address window.
+     */
+    @Override
+    public <StateT extends State> Map<W, StateT> accessInEachMergingWindow(
+        StateTag<? super K, StateT> address) {
+      Collection<W> stateAddresses = activeWindows.readStateAddresses(context.window());
+      ImmutableMap.Builder<W, StateT> statesByAddress = ImmutableMap.builder();
+      for (W addressWindow : stateAddresses) {
+        StateNamespace addressNamespace = namespaceFor(addressWindow);
+        statesByAddress.put(
+            addressWindow, stateInternals.state(addressNamespace, address, context));
+      }
+      return statesByAddress.build();
+    }
+  }
+
+ // ======================================================================
+ // Contexts
+ // ======================================================================
+
+  /** {@link ReduceFn.Context} exposing the factory's key and one window's state and timers. */
+  private class ContextImpl extends ReduceFn<K, InputT, OutputT, W>.Context {
+    private final TimersImpl timers;
+    private final StateAccessorImpl<K, W> state;
+
+    private ContextImpl(StateAccessorImpl<K, W> state) {
+      reduceFn.super();
+      this.timers = new TimersImpl(state.namespace());
+      this.state = state;
+    }
+
+    @Override
+    public StateAccessor<K> state() {
+      return state;
+    }
+
+    @Override
+    public Timers timers() {
+      return timers;
+    }
+
+    @Override
+    public K key() {
+      return key;
+    }
+
+    @Override
+    public W window() {
+      return state.window();
+    }
+
+    @Override
+    public WindowingStrategy<?, W> windowingStrategy() {
+      return windowingStrategy;
+    }
+  }
+
+ private class ProcessValueContextImpl
+ extends ReduceFn<K, InputT, OutputT, W>.ProcessValueContext {
+ private final InputT value;
+ private final Instant timestamp;
+ private final StateAccessorImpl<K, W> state;
+ private final TimersImpl timers;
+
+ private ProcessValueContextImpl(StateAccessorImpl<K, W> state,
+ InputT value, Instant timestamp) {
+ reduceFn.super();
+ this.state = state;
+ this.value = value;
+ this.timestamp = timestamp;
+ this.timers = new TimersImpl(state.namespace());
+ }
+
+ @Override
+ public K key() {
+ return key;
+ }
+
+ @Override
+ public W window() {
+ return state.window();
+ }
+
+ @Override
+ public WindowingStrategy<?, W> windowingStrategy() {
+ return windowingStrategy;
+ }
+
+ @Override
+ public StateAccessor<K> state() {
+ return state;
+ }
+
+ @Override
+ public InputT value() {
+ return value;
+ }
+
+ @Override
+ public Instant timestamp() {
+ return timestamp;
+ }
+
+ @Override
+ public Timers timers() {
+ return timers;
+ }
+ }
+
+ private class OnTriggerContextImpl extends ReduceFn<K, InputT, OutputT, W>.OnTriggerContext {
+ private final StateAccessorImpl<K, W> state;
+ private final ReadableState<PaneInfo> pane;
+ private final OnTriggerCallbacks<OutputT> callbacks;
+ private final TimersImpl timers;
+
+ private OnTriggerContextImpl(StateAccessorImpl<K, W> state, ReadableState<PaneInfo> pane,
+ OnTriggerCallbacks<OutputT> callbacks) {
+ reduceFn.super();
+ this.state = state;
+ this.pane = pane;
+ this.callbacks = callbacks;
+ this.timers = new TimersImpl(state.namespace());
+ }
+
+ @Override
+ public K key() {
+ return key;
+ }
+
+ @Override
+ public W window() {
+ return state.window();
+ }
+
+ @Override
+ public WindowingStrategy<?, W> windowingStrategy() {
+ return windowingStrategy;
+ }
+
+ @Override
+ public StateAccessor<K> state() {
+ return state;
+ }
+
+ @Override
+ public PaneInfo paneInfo() {
+ return pane.read();
+ }
+
+ @Override
+ public void output(OutputT value) {
+ callbacks.output(value);
+ }
+
+ @Override
+ public Timers timers() {
+ return timers;
+ }
+ }
+
+ private class OnMergeContextImpl extends ReduceFn<K, InputT, OutputT, W>.OnMergeContext {
+ private final MergingStateAccessorImpl<K, W> state;
+ private final TimersImpl timers;
+
+ private OnMergeContextImpl(MergingStateAccessorImpl<K, W> state) {
+ reduceFn.super();
+ this.state = state;
+ this.timers = new TimersImpl(state.namespace());
+ }
+
+ @Override
+ public K key() {
+ return key;
+ }
+
+ @Override
+ public WindowingStrategy<?, W> windowingStrategy() {
+ return windowingStrategy;
+ }
+
+ @Override
+ public MergingStateAccessor<K, W> state() {
+ return state;
+ }
+
+ @Override
+ public W window() {
+ return state.window();
+ }
+
+ @Override
+ public Timers timers() {
+ return timers;
+ }
+ }
+
+ private class OnPremergeContextImpl extends ReduceFn<K, InputT, OutputT, W>.OnMergeContext {
+ private final PremergingStateAccessorImpl<K, W> state;
+ private final TimersImpl timers;
+
+ private OnPremergeContextImpl(PremergingStateAccessorImpl<K, W> state) {
+ reduceFn.super();
+ this.state = state;
+ this.timers = new TimersImpl(state.namespace());
+ }
+
+ @Override
+ public K key() {
+ return key;
+ }
+
+ @Override
+ public WindowingStrategy<?, W> windowingStrategy() {
+ return windowingStrategy;
+ }
+
+ @Override
+ public MergingStateAccessor<K, W> state() {
+ return state;
+ }
+
+ @Override
+ public W window() {
+ return state.window();
+ }
+
+ @Override
+ public Timers timers() {
+ return timers;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4bf3a3b3/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
new file mode 100644
index 0000000..96d764a
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
@@ -0,0 +1,993 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableMap;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.beam.runners.core.GroupByKeyViaGroupByKeyOnly.GroupByKeyOnly;
+import org.apache.beam.runners.core.ReduceFnContextFactory.OnTriggerCallbacks;
+import org.apache.beam.runners.core.ReduceFnContextFactory.StateStyle;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing;
+import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.util.ActiveWindowSet;
+import org.apache.beam.sdk.util.MergingActiveWindowSet;
+import org.apache.beam.sdk.util.NonMergingActiveWindowSet;
+import org.apache.beam.sdk.util.TimeDomain;
+import org.apache.beam.sdk.util.TimerInternals;
+import org.apache.beam.sdk.util.TimerInternals.TimerData;
+import org.apache.beam.sdk.util.TriggerContextFactory;
+import org.apache.beam.sdk.util.WindowTracing;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowingInternals;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.util.WindowingStrategy.AccumulationMode;
+import org.apache.beam.sdk.util.state.ReadableState;
+import org.apache.beam.sdk.util.state.StateInternals;
+import org.apache.beam.sdk.util.state.StateNamespaces.WindowNamespace;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+/**
+ * Manages the execution of a {@link ReduceFn} after a {@link GroupByKeyOnly} has partitioned the
+ * {@link PCollection} by key.
+ *
+ * <p>The {@link #onTrigger} relies on a {@link TriggerRunner} to manage the execution of
+ * the triggering logic. The {@code ReduceFnRunner}'s responsibilities are:
+ *
+ * <ul>
+ * <li>Tracking the windows that are active (have buffered data) as elements arrive and
+ * triggers are fired.
+ * <li>Holding the watermark based on the timestamps of elements in a pane and releasing it
+ * when the trigger fires.
+ * <li>Calling the appropriate callbacks on {@link ReduceFn} based on trigger execution, timer
+ * firings, etc., and providing appropriate contexts to the {@link ReduceFn} for actions
+ * such as output.
+ * <li>Scheduling garbage collection of state associated with a specific window, and making that
+ * happen when the appropriate timer fires.
+ * </ul>
+ *
+ * @param <K> The type of key being processed.
+ * @param <InputT> The type of values associated with the key.
+ * @param <OutputT> The output type that will be produced for each key.
+ * @param <W> The type of windows this operates on.
+ */
+public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
+
/**
 * The {@link ReduceFnRunner} depends on most aspects of the {@link WindowingStrategy}.
 *
 * <ul>
 * <li>It runs the trigger from the {@link WindowingStrategy}.</li>
 * <li>It merges windows according to the {@link WindowingStrategy}.</li>
 * <li>It chooses how to track active windows and clear out expired windows
 * according to the {@link WindowingStrategy}, based on the allowed lateness and
 * whether windows can merge.</li>
 * <li>It decides whether to emit empty final panes according to whether the
 * {@link WindowingStrategy} requires it.</li>
 * <li>It uses discarding or accumulation mode according to the {@link WindowingStrategy}.</li>
 * </ul>
 */
private final WindowingStrategy<Object, W> windowingStrategy;

/** Output sink, built from the {@code WindowingInternals} handed to the constructor. */
private final OutputWindowedValue<KV<K, OutputT>> outputter;

/**
 * State storage for this key. It is shared with the merging active window set, the context
 * factory and the trigger context factory.
 */
private final StateInternals<K> stateInternals;

/**
 * Incremented by one for each (element, window) pair dropped in {@code processElement}
 * because that window's trigger is already closed.
 */
private final Aggregator<Long, Long> droppedDueToClosedWindow;

/** The key whose elements and timers this runner processes. */
private final K key;

/**
 * Track which windows are still active and the 'state address' windows which hold their state.
 *
 * <ul>
 * <li>State: Global map for all active windows for this computation and key.
 * <li>Lifetime: Cleared when no active windows need to be tracked. A window lives within
 * the active window set until its trigger is closed or the window is garbage collected.
 * </ul>
 */
private final ActiveWindowSet<W> activeWindows;

/**
 * Always a {@link SystemReduceFn}.
 *
 * <ul>
 * <li>State: A bag of accumulated values, or the intermediate result of a combiner.
 * <li>State style: RENAMED
 * <li>Merging: Concatenate or otherwise combine the state from each merged window.
 * <li>Lifetime: Cleared when a pane fires if DISCARDING_FIRED_PANES. Otherwise cleared
 * when trigger is finished or when the window is garbage collected.
 * </ul>
 */
private final ReduceFn<K, InputT, OutputT, W> reduceFn;

/**
 * Manage the setting and firing of timer events.
 *
 * <ul>
 * <li>Merging: End-of-window and garbage collection timers are cancelled when windows are
 * merged away. Timers created by triggers are never garbage collected and are left to
 * fire and be ignored.
 * <li>Lifetime: Timers automatically disappear after they fire.
 * </ul>
 */
private final TimerInternals timerInternals;

/**
 * Manage the execution and state for triggers.
 *
 * <ul>
 * <li>State: Tracks which sub-triggers have finished, and any additional state needed to
 * determine when the trigger should fire.
 * <li>State style: DIRECT
 * <li>Merging: Finished bits are explicitly managed. Other state is eagerly merged as
 * needed.
 * <li>Lifetime: Most trigger state is cleared when the final pane is emitted. However
 * the finished bits are left behind and must be cleared when the window is
 * garbage collected.
 * </ul>
 */
private final TriggerRunner<W> triggerRunner;

/**
 * Store the output watermark holds for each window.
 *
 * <ul>
 * <li>State: Bag of hold timestamps.
 * <li>State style: RENAMED
 * <li>Merging: Depending on {@link OutputTimeFn}, may need to be recalculated on merging.
 * When a pane fires it may be necessary to add (back) an end-of-window or garbage collection
 * hold.
 * <li>Lifetime: Cleared when a pane fires or when the window is garbage collected.
 * </ul>
 */
private final WatermarkHold<W> watermarkHold;

/**
 * Builds the {@link ReduceFn} contexts (base, per-value, merge and premerge) in either the
 * DIRECT or the RENAMED state style.
 */
private final ReduceFnContextFactory<K, InputT, OutputT, W> contextFactory;

/**
 * Store the previously emitted pane (if any) for each window.
 *
 * <ul>
 * <li>State: The previous {@link PaneInfo} passed to the user's {@code DoFn.ProcessElement}
 * method, if any.
 * <li>State style: DIRECT
 * <li>Merging: Always keyed by actual window, so does not depend on {@link #activeWindows}.
 * Cleared when window is merged away.
 * <li>Lifetime: Cleared when trigger is closed or window is garbage collected.
 * </ul>
 */
private final PaneInfoTracker paneInfoTracker;

/**
 * Store whether we've seen any elements for a window since the last pane was emitted.
 *
 * <ul>
 * <li>State: Unless DISCARDING_FIRED_PANES, a count of number of elements added so far.
 * <li>State style: RENAMED.
 * <li>Merging: Counts are summed when windows are merged.
 * <li>Lifetime: Cleared when pane fires or window is garbage collected.
 * </ul>
 */
private final NonEmptyPanes<K, W> nonEmptyPanes;
+
/**
 * Creates a runner for a single key.
 *
 * @param key the key whose elements and timers this runner handles
 * @param windowingStrategy the windowing strategy; it is stored with its element type widened
 *     to {@code Object}
 * @param stateInternals state storage for this key
 * @param timerInternals timer and watermark access for this key
 * @param windowingInternals wrapped as the output sink and also handed to the context factory
 * @param droppedDueToClosedWindow aggregator counting elements dropped because the window's
 *     trigger is closed
 * @param reduceFn the reduce function to run
 * @param options pipeline options, passed through to the context factory
 */
public ReduceFnRunner(
    K key,
    WindowingStrategy<?, W> windowingStrategy,
    StateInternals<K> stateInternals,
    TimerInternals timerInternals,
    WindowingInternals<?, KV<K, OutputT>> windowingInternals,
    Aggregator<Long, Long> droppedDueToClosedWindow,
    ReduceFn<K, InputT, OutputT, W> reduceFn,
    PipelineOptions options) {
  this.key = key;
  this.timerInternals = timerInternals;
  this.paneInfoTracker = new PaneInfoTracker(timerInternals);
  this.stateInternals = stateInternals;
  this.outputter = new OutputViaWindowingInternals<>(windowingInternals);
  this.droppedDueToClosedWindow = droppedDueToClosedWindow;
  this.reduceFn = reduceFn;

  // The runner never inspects the element type, so widening it to Object is safe.
  @SuppressWarnings("unchecked")
  WindowingStrategy<Object, W> objectWindowingStrategy =
      (WindowingStrategy<Object, W>) windowingStrategy;
  this.windowingStrategy = objectWindowingStrategy;

  // Needs this.windowingStrategy and this.reduceFn to be assigned already.
  this.nonEmptyPanes = NonEmptyPanes.create(this.windowingStrategy, this.reduceFn);

  // Note this may incur I/O to load persisted window set data.
  // createActiveWindowSet() reads this.windowingStrategy and this.stateInternals, so it must
  // run after both fields are assigned above.
  this.activeWindows = createActiveWindowSet();

  this.contextFactory =
      new ReduceFnContextFactory<K, InputT, OutputT, W>(key, reduceFn, this.windowingStrategy,
          stateInternals, this.activeWindows, timerInternals, windowingInternals, options);

  // The unqualified windowingStrategy below is the constructor parameter. It is the same object
  // as this.windowingStrategy, just with its original wildcard type.
  this.watermarkHold = new WatermarkHold<>(timerInternals, windowingStrategy);
  this.triggerRunner =
      new TriggerRunner<>(
          windowingStrategy.getTrigger(),
          new TriggerContextFactory<>(
              windowingStrategy.getWindowFn(), stateInternals, activeWindows));
}
+
+ private ActiveWindowSet<W> createActiveWindowSet() {
+ return windowingStrategy.getWindowFn().isNonMerging()
+ ? new NonMergingActiveWindowSet<W>()
+ : new MergingActiveWindowSet<W>(windowingStrategy.getWindowFn(), stateInternals);
+ }
+
+ @VisibleForTesting
+ boolean isFinished(W window) {
+ return triggerRunner.isClosed(contextFactory.base(window, StateStyle.DIRECT).state());
+ }
+
+ @VisibleForTesting
+ boolean hasNoActiveWindows() {
+ return activeWindows.getActiveAndNewWindows().isEmpty();
+ }
+
+ /**
+ * Incorporate {@code values} into the underlying reduce function, and manage holds, timers,
+ * triggers, and window merging.
+ *
+ * <p>The general strategy is:
+ * <ol>
+ * <li>Use {@link WindowedValue#getWindows} (itself determined using
+ * {@link WindowFn#assignWindows}) to determine which windows each element belongs to. Some
+ * of those windows will already have state associated with them. The rest are considered
+ * NEW.
+ * <li>Use {@link WindowFn#mergeWindows} to attempt to merge currently ACTIVE and NEW windows.
+ * Each NEW window will become either ACTIVE or be discardedL.
+ * (See {@link ActiveWindowSet} for definitions of these terms.)
+ * <li>If at all possible, eagerly substitute NEW windows with their ACTIVE state address
+ * windows before any state is associated with the NEW window. In the common case that
+ * windows for new elements are merged into existing ACTIVE windows then no additional
+ * storage or merging overhead will be incurred.
+ * <li>Otherwise, keep track of the state address windows for ACTIVE windows so that their
+ * states can be merged on-demand when a pane fires.
+ * <li>Process the element for each of the windows it's windows have been merged into according
+ * to {@link ActiveWindowSet}. Processing may require running triggers, setting timers,
+ * setting holds, and invoking {@link ReduceFn#onTrigger}.
+ * </ol>
+ */
+ public void processElements(Iterable<WindowedValue<InputT>> values) throws Exception {
+ // If an incoming element introduces a new window, attempt to merge it into an existing
+ // window eagerly.
+ Map<W, W> windowToMergeResult = collectAndMergeWindows(values);
+
+ Set<W> windowsToConsider = new HashSet<>();
+
+ // Process each element, using the updated activeWindows determined by collectAndMergeWindows.
+ for (WindowedValue<InputT> value : values) {
+ windowsToConsider.addAll(processElement(windowToMergeResult, value));
+ }
+
+ // Trigger output from any window for which the trigger is ready
+ for (W mergedWindow : windowsToConsider) {
+ ReduceFn<K, InputT, OutputT, W>.Context directContext =
+ contextFactory.base(mergedWindow, StateStyle.DIRECT);
+ ReduceFn<K, InputT, OutputT, W>.Context renamedContext =
+ contextFactory.base(mergedWindow, StateStyle.RENAMED);
+ triggerRunner.prefetchShouldFire(mergedWindow, directContext.state());
+ emitIfAppropriate(directContext, renamedContext);
+ }
+
+ // We're all done with merging and emitting elements so can compress the activeWindow state.
+ // Any windows which are still NEW must have come in on a new element which was then discarded
+ // due to the window's trigger being closed. We can thus delete them.
+ activeWindows.cleanupTemporaryWindows();
+ }
+
+ public void persist() {
+ activeWindows.persist();
+ }
+
/**
 * Extract the windows associated with the values, and invoke merge. Return a map
 * from windows to the merge result window. If a window is not in the domain of
 * the result map then it did not get merged into a different window.
 *
 * <p>For a non-merging {@code WindowFn} this is a no-op that returns an empty map. Otherwise
 * it has these side effects:
 * <ul>
 * <li>Every window of every value is registered in {@link #activeWindows}, as NEW if it is
 * not already ACTIVE.
 * <li>Legacy ACTIVE windows with several state addresses have their state merged eagerly.
 * <li>{@link ActiveWindowSet#merge} is run.
 * </ul>
 *
 * @param values the elements whose windows are collected and merged
 * @return map from each window that took part in a merge to its merge result window
 * @throws Exception if any of the merge callbacks fail
 */
private Map<W, W> collectAndMergeWindows(Iterable<WindowedValue<InputT>> values)
    throws Exception {
  // No-op if no merging can take place
  if (windowingStrategy.getWindowFn().isNonMerging()) {
    return ImmutableMap.of();
  }

  // Collect the windows from all elements and make sure they are already in the active
  // window set or are added as NEW windows.
  // NOTE(review): no lateness filtering happens here; late elements are handled later, in
  // processElement, via the trigger's closed state.
  for (WindowedValue<?> value : values) {
    for (BoundedWindow untypedWindow : value.getWindows()) {
      @SuppressWarnings("unchecked")
      W window = (W) untypedWindow;

      // For backwards compat with pre 1.4 only.
      // We may still have ACTIVE windows with multiple state addresses, representing
      // a window whose state has not yet been eagerly merged.
      // We'll go ahead and merge that state now so that we don't have to worry about
      // this legacy case anywhere else.
      if (activeWindows.isActive(window)) {
        Set<W> stateAddressWindows = activeWindows.readStateAddresses(window);
        if (stateAddressWindows.size() > 1) {
          // This is a legacy window whose state has not been eagerly merged.
          // Do that now.
          ReduceFn<K, InputT, OutputT, W>.OnMergeContext premergeContext =
              contextFactory.forPremerge(window);
          reduceFn.onMerge(premergeContext);
          watermarkHold.onMerge(premergeContext);
          // Record in the active window set that the state addresses are now merged.
          activeWindows.merged(window);
        }
      }

      // Add this window as NEW if it is not currently ACTIVE.
      // If we had already seen this window and closed its trigger, then the
      // window will not be currently ACTIVE. It will then be added as NEW here,
      // and fall into the merging logic as usual.
      activeWindows.ensureWindowExists(window);
    }
  }

  // Merge all of the active windows and retain a mapping from source windows to result windows.
  Map<W, W> windowToMergeResult = new HashMap<>();
  activeWindows.merge(new OnMergeCallback(windowToMergeResult));
  return windowToMergeResult;
}
+
/**
 * Merge callback passed to {@link ActiveWindowSet#merge}. For each group of windows being
 * merged it records the window-to-merge-result mapping into the caller-supplied map. It then
 * merges ReduceFn, watermark hold, non-empty pane and trigger state into the merge result.
 * Finally it cancels the timers and clears the pane info of ACTIVE windows that are merged away.
 */
private class OnMergeCallback implements ActiveWindowSet.MergeCallback<W> {
  // Filled in by onMerge; owned by collectAndMergeWindows, which returns it.
  private final Map<W, W> windowToMergeResult;

  OnMergeCallback(Map<W, W> windowToMergeResult) {
    this.windowToMergeResult = windowToMergeResult;
  }

  /**
   * Return the subset of {@code windows} which are currently ACTIVE. We only need to worry
   * about merging state from ACTIVE windows. NEW windows by definition have no existing state.
   *
   * <p>Note: this helper shares its name with the outer {@code activeWindows} field;
   * {@code activeWindows.isActive} below refers to that field.
   */
  private List<W> activeWindows(Iterable<W> windows) {
    List<W> active = new ArrayList<>();
    for (W window : windows) {
      if (activeWindows.isActive(window)) {
        active.add(window);
      }
    }
    return active;
  }

  /**
   * Called from the active window set to indicate {@code toBeMerged} (of which only
   * {@code activeToBeMerged} are ACTIVE and thus have state associated with them) will later
   * be merged into {@code mergeResult}. Only issues prefetches; no state is modified.
   */
  @Override
  public void prefetchOnMerge(
      Collection<W> toBeMerged, W mergeResult) throws Exception {
    List<W> activeToBeMerged = activeWindows(toBeMerged);
    ReduceFn<K, InputT, OutputT, W>.OnMergeContext directMergeContext =
        contextFactory.forMerge(activeToBeMerged, mergeResult, StateStyle.DIRECT);
    ReduceFn<K, InputT, OutputT, W>.OnMergeContext renamedMergeContext =
        contextFactory.forMerge(activeToBeMerged, mergeResult, StateStyle.RENAMED);

    // Prefetch various state.
    triggerRunner.prefetchForMerge(mergeResult, activeToBeMerged, directMergeContext.state());
    reduceFn.prefetchOnMerge(renamedMergeContext.state());
    watermarkHold.prefetchOnMerge(renamedMergeContext.state());
    nonEmptyPanes.prefetchOnMerge(renamedMergeContext.state());
  }

  /**
   * Called from the active window set to indicate {@code toBeMerged} (of which only
   * {@code activeToBeMerged} are ACTIVE and thus have state associated with them) are about
   * to be merged into {@code mergeResult}.
   */
  @Override
  public void onMerge(Collection<W> toBeMerged, W mergeResult) throws Exception {
    // Remember we have merged these windows. This may include mergeResult itself when it is
    // one of the windows in toBeMerged.
    for (W window : toBeMerged) {
      windowToMergeResult.put(window, mergeResult);
    }

    // At this point activeWindows has NOT incorporated the results of the merge.
    List<W> activeToBeMerged = activeWindows(toBeMerged);
    ReduceFn<K, InputT, OutputT, W>.OnMergeContext directMergeContext =
        contextFactory.forMerge(activeToBeMerged, mergeResult, StateStyle.DIRECT);
    ReduceFn<K, InputT, OutputT, W>.OnMergeContext renamedMergeContext =
        contextFactory.forMerge(activeToBeMerged, mergeResult, StateStyle.RENAMED);

    // Run the reduceFn to perform any needed merging.
    reduceFn.onMerge(renamedMergeContext);

    // Merge the watermark holds.
    watermarkHold.onMerge(renamedMergeContext);

    // Merge non-empty pane state.
    nonEmptyPanes.onMerge(renamedMergeContext.state());

    // Have the trigger merge state as needed.
    triggerRunner.onMerge(
        directMergeContext.window(), directMergeContext.timers(), directMergeContext.state());

    for (W active : activeToBeMerged) {
      if (active.equals(mergeResult)) {
        // Not merged away.
        continue;
      }
      // Cleanup flavor A: Currently ACTIVE window is about to be merged away.
      // Clear any state not already cleared by the onMerge calls above.
      WindowTracing.debug("ReduceFnRunner.onMerge: Merging {} into {}", active, mergeResult);
      ReduceFn<K, InputT, OutputT, W>.Context directClearContext =
          contextFactory.base(active, StateStyle.DIRECT);
      // No need for the end-of-window or garbage collection timers.
      // We will establish a new end-of-window or garbage collection timer for the mergeResult
      // window in processElement below. There must be at least one element for the mergeResult
      // window since a new element with a new window must have triggered this onMerge.
      cancelEndOfWindowAndGarbageCollectionTimers(directClearContext);
      // We no longer care about any previous panes of merged away windows. The
      // merge result window gets to start fresh if it is new.
      paneInfoTracker.clear(directClearContext.state());
    }
  }
}
+
/**
 * Process an element.
 *
 * <p>Each of the element's windows is first redirected to its merge result. For every
 * resulting window whose trigger is not closed, this method records pane content, schedules
 * the end-of-window or garbage collection timer, adds watermark holds, runs the
 * {@link ReduceFn} and updates the trigger. Windows whose trigger is closed are skipped, and
 * {@code droppedDueToClosedWindow} is incremented once per skipped window.
 *
 * @param windowToMergeResult map from a window to the window it was merged into; windows
 *     absent from the map are used as-is
 * @param value the value being processed
 * @return the windows in which the element was actually processed. This may contain the
 *     same window more than once if several of the element's windows merged into it.
 * @throws Exception propagated from the trigger, reduce function or state access
 */
private Collection<W> processElement(Map<W, W> windowToMergeResult, WindowedValue<InputT> value)
    throws Exception {
  // Redirect element windows to the ACTIVE windows they have been merged into.
  // The compressed representation (value, {window1, window2, ...}) actually represents
  // distinct elements (value, window1), (value, window2), ...
  // so if window1 and window2 merge, the resulting window will contain both copies
  // of the value.
  Collection<W> windows = new ArrayList<>();
  for (BoundedWindow untypedWindow : value.getWindows()) {
    @SuppressWarnings("unchecked")
    W window = (W) untypedWindow;
    W mergeResult = windowToMergeResult.get(window);
    if (mergeResult == null) {
      mergeResult = window;
    }
    windows.add(mergeResult);
  }

  // Prefetch in each of the windows if we're going to need to process triggers
  // (all prefetches are issued before any window is processed below).
  for (W window : windows) {
    ReduceFn<K, InputT, OutputT, W>.ProcessValueContext directContext = contextFactory.forValue(
        window, value.getValue(), value.getTimestamp(), StateStyle.DIRECT);
    triggerRunner.prefetchForValue(window, directContext.state());
  }

  // Process the element for each (mergeResultWindow, not closed) window it belongs to.
  List<W> triggerableWindows = new ArrayList<>(windows.size());
  for (W window : windows) {
    ReduceFn<K, InputT, OutputT, W>.ProcessValueContext directContext = contextFactory.forValue(
        window, value.getValue(), value.getTimestamp(), StateStyle.DIRECT);
    if (triggerRunner.isClosed(directContext.state())) {
      // This window has already been closed.
      droppedDueToClosedWindow.addValue(1L);
      WindowTracing.debug(
          "ReduceFnRunner.processElement: Dropping element at {} for key:{}; window:{} "
          + "since window is no longer active at inputWatermark:{}; outputWatermark:{}",
          value.getTimestamp(), key, window, timerInternals.currentInputWatermarkTime(),
          timerInternals.currentOutputWatermarkTime());
      continue;
    }

    triggerableWindows.add(window);
    activeWindows.ensureWindowIsActive(window);
    ReduceFn<K, InputT, OutputT, W>.ProcessValueContext renamedContext = contextFactory.forValue(
        window, value.getValue(), value.getTimestamp(), StateStyle.RENAMED);

    nonEmptyPanes.recordContent(renamedContext.state());

    // Make sure we've scheduled the end-of-window or garbage collection timer for this window.
    Instant timer = scheduleEndOfWindowOrGarbageCollectionTimer(directContext);

    // Hold back progress of the output watermark until we have processed the pane this
    // element will be included within. If the element is too late for that, place a hold at
    // the end-of-window or garbage collection time to allow empty panes to contribute elements
    // which won't be dropped due to lateness by a following computation (assuming the following
    // computation uses the same allowed lateness value...)
    @Nullable Instant hold = watermarkHold.addHolds(renamedContext);

    if (hold != null) {
      // Assert that holds have a proximate timer: the hold and the timer must agree on whether
      // they fall at or before the window's max timestamp.
      boolean holdInWindow = !hold.isAfter(window.maxTimestamp());
      boolean timerInWindow = !timer.isAfter(window.maxTimestamp());
      checkState(
          holdInWindow == timerInWindow,
          "set a hold at %s, a timer at %s, which disagree as to whether they are in window %s",
          hold,
          timer,
          directContext.window());
    }

    // Execute the reduceFn, which will buffer the value as appropriate
    reduceFn.processValue(renamedContext);

    // Run the trigger to update its state
    triggerRunner.processValue(
        directContext.window(),
        directContext.timestamp(),
        directContext.timers(),
        directContext.state());

    // At this point, if triggerRunner.shouldFire before the processValue then
    // triggerRunner.shouldFire after the processValue. In other words adding values
    // cannot take a trigger state from firing to non-firing.
    // (We don't actually assert this since it is too slow.)
  }

  return triggerableWindows;
}
+
/**
 * Called when an end-of-window, garbage collection, or trigger-specific timer fires.
 *
 * <p>An event-time timer at or after the window's garbage collection time emits a final pane
 * (if the window is still active and open) and then clears all window state. Any other timer
 * gives the trigger a chance to fire. In that case an end-of-window timer also schedules the
 * garbage collection timer.
 *
 * @param timer the fired timer; its namespace must be a {@link WindowNamespace}
 * @throws IllegalArgumentException if the timer's namespace is not a {@link WindowNamespace}
 * @throws Exception propagated from triggering, emission or state clearing
 */
public void onTimer(TimerData timer) throws Exception {
  // Which window is the timer for?
  checkArgument(timer.getNamespace() instanceof WindowNamespace,
      "Expected timer to be in WindowNamespace, but was in %s", timer.getNamespace());
  @SuppressWarnings("unchecked")
  WindowNamespace<W> windowNamespace = (WindowNamespace<W>) timer.getNamespace();
  W window = windowNamespace.getWindow();
  ReduceFn<K, InputT, OutputT, W>.Context directContext =
      contextFactory.base(window, StateStyle.DIRECT);
  ReduceFn<K, InputT, OutputT, W>.Context renamedContext =
      contextFactory.base(window, StateStyle.RENAMED);

  // Has this window had its trigger finish?
  // - The trigger may implement isClosed as constant false.
  // - If the window function does not support merging then all windows will be considered
  // active.
  // So we must take conjunction of activeWindows and triggerRunner state.
  boolean windowIsActiveAndOpen =
      activeWindows.isActive(window) && !triggerRunner.isClosed(directContext.state());

  if (!windowIsActiveAndOpen) {
    WindowTracing.debug(
        "ReduceFnRunner.onTimer: Note that timer {} is for non-ACTIVE window {}", timer, window);
  }

  // If this is an end-of-window timer then we may need to set a garbage collection timer
  // if allowed lateness is non-zero.
  boolean isEndOfWindow = TimeDomain.EVENT_TIME == timer.getDomain()
      && timer.getTimestamp().equals(window.maxTimestamp());

  // If this is a garbage collection timer then we should trigger and garbage collect the window.
  // We'll consider any event-time timer at or after the window's garbage collection time
  // (cleanupTime below) to be a signal to garbage collect.
  Instant cleanupTime = garbageCollectionTime(window);
  boolean isGarbageCollection = TimeDomain.EVENT_TIME == timer.getDomain()
      && !timer.getTimestamp().isBefore(cleanupTime);

  if (isGarbageCollection) {
    WindowTracing.debug(
        "ReduceFnRunner.onTimer: Cleaning up for key:{}; window:{} at {} with "
        + "inputWatermark:{}; outputWatermark:{}",
        key, window, timer.getTimestamp(), timerInternals.currentInputWatermarkTime(),
        timerInternals.currentOutputWatermarkTime());

    if (windowIsActiveAndOpen) {
      // We need to call onTrigger to emit the final pane if required.
      // The final pane *may* be ON_TIME if no prior ON_TIME pane has been emitted,
      // and the watermark has passed the end of the window.
      @Nullable Instant newHold =
          onTrigger(directContext, renamedContext, true/* isFinished */, isEndOfWindow);
      checkState(newHold == null,
          "Hold placed at %s despite isFinished being true.", newHold);
    }

    // Cleanup flavor B: Clear all the remaining state for this window since we'll never
    // see elements for it again.
    clearAllState(directContext, renamedContext, windowIsActiveAndOpen);
  } else {
    WindowTracing.debug(
        "ReduceFnRunner.onTimer: Triggering for key:{}; window:{} at {} with "
        + "inputWatermark:{}; outputWatermark:{}",
        key, window, timer.getTimestamp(), timerInternals.currentInputWatermarkTime(),
        timerInternals.currentOutputWatermarkTime());
    if (windowIsActiveAndOpen) {
      emitIfAppropriate(directContext, renamedContext);
    }

    if (isEndOfWindow) {
      // If the window strategy trigger includes a watermark trigger then at this point
      // there should be no data holds, either because we'd already cleared them on an
      // earlier onTrigger, or because we just cleared them on the above emitIfAppropriate.
      // We could assert this but it is very expensive.

      // Since we are processing an on-time firing we should schedule the garbage collection
      // timer. (If getAllowedLateness is zero then the timer event will be considered a
      // cleanup event and handled by the above).
      // Note we must do this even if the trigger is finished so that we are sure to cleanup
      // any final trigger finished bits.
      checkState(
          windowingStrategy.getAllowedLateness().isLongerThan(Duration.ZERO),
          "Unexpected zero getAllowedLateness");
      WindowTracing.debug(
          "ReduceFnRunner.onTimer: Scheduling cleanup timer for key:{}; window:{} at {} with "
          + "inputWatermark:{}; outputWatermark:{}",
          key, directContext.window(), cleanupTime, timerInternals.currentInputWatermarkTime(),
          timerInternals.currentOutputWatermarkTime());
      checkState(!cleanupTime.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE),
          "Cleanup time %s is beyond end-of-time", cleanupTime);
      directContext.timers().setTimer(cleanupTime, TimeDomain.EVENT_TIME);
    }
  }
}
+
+ /**
+ * Clear all the state associated with {@code directContext}'s window.
+ *
+ * <p>Should only be invoked if we know all future elements for this window will be considered
+ * beyond allowed lateness.
+ * This is a superset of the clearing done by {@link #emitIfAppropriate} below since:
+ * <ol>
+ * <li>We can clear the trigger finished bits since we'll never need to ask if the trigger is
+ * closed again.
+ * <li>We can clear any remaining garbage collection hold.
+ * </ol>
+ *
+ * <p>Reduce function state, watermark holds and the non-empty pane marker are cleared through
+ * {@code renamedContext} (the state address window). Trigger state, pane info, the trigger
+ * finished bits and the window's entry in the active window set are cleared through
+ * {@code directContext}.
+ *
+ * @param directContext context for the window itself
+ * @param renamedContext context whose state is keyed by the window's state address window
+ * @param windowIsActiveAndOpen true if the window is in the active window set and its trigger
+ * has not yet closed, so per-window state may still exist. If false, apart from the
+ * unconditional clearing at the end, only legacy (sdk &lt;= 1.3) holds keyed by the window
+ * itself may need clearing.
+ * @throws Exception propagated from the underlying state-clearing calls
+ */
+ private void clearAllState(
+ ReduceFn<K, InputT, OutputT, W>.Context directContext,
+ ReduceFn<K, InputT, OutputT, W>.Context renamedContext,
+ boolean windowIsActiveAndOpen)
+ throws Exception {
+ if (windowIsActiveAndOpen) {
+ // Since both the window is in the active window set AND the trigger was not yet closed,
+ // it is possible we still have state.
+ reduceFn.clearState(renamedContext);
+ watermarkHold.clearHolds(renamedContext);
+ nonEmptyPanes.clearPane(renamedContext.state());
+ // These calls work irrespective of whether the window is active or not, but
+ // are unnecessary if the window is not active.
+ triggerRunner.clearState(
+ directContext.window(), directContext.timers(), directContext.state());
+ paneInfoTracker.clear(directContext.state());
+ } else {
+ // If !windowIsActiveAndOpen then !activeWindows.isActive (1) or triggerRunner.isClosed (2).
+ // For (1), if !activeWindows.isActive then the window must be merging and has been
+ // explicitly removed by emitIfAppropriate. But in that case the trigger must have fired
+ // and been closed, so this case reduces to (2).
+ // For (2), if triggerRunner.isClosed then the trigger was fired and entered the
+ // closed state. In that case emitIfAppropriate will have cleared all state in
+ // reduceFn, triggerRunner (except for finished bits), paneInfoTracker and activeWindows.
+ // We also know nonEmptyPanes must have been unconditionally cleared by the trigger.
+ // Since the trigger fired the existing watermark holds must have been cleared, and since
+ // the trigger closed no new end of window or garbage collection hold will have been
+ // placed by WatermarkHold.extractAndRelease.
+ // Thus all the state clearing above is unnecessary.
+ //
+ // But(!) for backwards compatibility we must allow a pipeline to be updated from
+ // an sdk version <= 1.3. In that case it is possible we have an end-of-window or
+ // garbage collection hold keyed by the current window (reached via directContext) rather
+ // than the state address window (reached via renamedContext).
+ // However this can only happen if:
+ // - We have merging windows.
+ // - We are DISCARDING_FIRED_PANES.
+ // - A pane has fired.
+ // - But the trigger is not (yet) closed.
+ if (windowingStrategy.getMode() == AccumulationMode.DISCARDING_FIRED_PANES
+ && !windowingStrategy.getWindowFn().isNonMerging()) {
+ watermarkHold.clearHolds(directContext);
+ }
+ }
+
+ // Don't need to track the window in the active window set (state address windows) anymore.
+ activeWindows.remove(directContext.window());
+ // We'll never need to test for the trigger being closed again.
+ triggerRunner.clearFinished(directContext.state());
+ }
+
+  /**
+   * Decides whether buffered reduce function state should be dropped once a pane has fired.
+   *
+   * @param isFinished whether the trigger has closed, making this the window's last firing
+   * @return {@code true} for the last firing, or whenever the windowing strategy discards fired
+   *     panes; {@code false} otherwise
+   */
+  private boolean shouldDiscardAfterFiring(boolean isFinished) {
+    // The last firing never needs its buffered data again, and in discarding mode nothing is
+    // carried over from one pane to the next.
+    return isFinished
+        || windowingStrategy.getMode() == AccumulationMode.DISCARDING_FIRED_PANES;
+  }
+
+  /**
+   * Fires a pane for the window when its trigger is ready, then cleans up whatever state that
+   * firing makes unnecessary.
+   *
+   * <p>Does nothing at all when the trigger is not ready to fire.
+   *
+   * @param directContext context for the window itself
+   * @param renamedContext context whose state is keyed by the window's state address window
+   * @throws Exception propagated from the trigger, the reduce function or state access
+   */
+  private void emitIfAppropriate(
+      ReduceFn<K, InputT, OutputT, W>.Context directContext,
+      ReduceFn<K, InputT, OutputT, W>.Context renamedContext)
+      throws Exception {
+    W window = directContext.window();
+    if (!triggerRunner.shouldFire(window, directContext.timers(), directContext.state())) {
+      // Trigger not ready: leave every piece of state untouched.
+      return;
+    }
+
+    // Let the trigger observe this firing, then ask whether that transition closed it.
+    triggerRunner.onFire(window, directContext.timers(), directContext.state());
+    boolean triggerClosed = triggerRunner.isClosed(directContext.state());
+
+    // Produce the pane contents. This releases all element holds as a side effect, but may leave
+    // end-of-window or garbage collection holds in place.
+    onTrigger(directContext, renamedContext, triggerClosed, false /* isEndOfWindow */);
+
+    // Having just fired, the pane no longer holds anything.
+    nonEmptyPanes.clearPane(renamedContext.state());
+
+    // Cleanup flavor C: buffered data that must not persist between panes.
+    if (shouldDiscardAfterFiring(triggerClosed)) {
+      reduceFn.clearState(renamedContext);
+    }
+
+    if (!triggerClosed) {
+      return;
+    }
+
+    // Cleanup flavor D: a closed trigger ignores all future elements, so drop the remaining
+    // trigger and pane state and forget the window. The trigger's finished bits are kept so it
+    // is remembered as closed until the window is garbage collected.
+    triggerRunner.clearState(window, directContext.timers(), directContext.state());
+    paneInfoTracker.clear(directContext.state());
+    activeWindows.remove(window);
+  }
+
+  /**
+   * Decides whether a pane must actually be emitted.
+   *
+   * @param isEmpty whether the pane holds no elements
+   * @param isFinished whether this is known to be the window's final pane
+   * @param timing the timing of the candidate pane
+   * @return {@code true} if the pane has elements, is the unique ON_TIME pane, or is the final pane
+   *     of a strategy whose closing behavior is FIRE_ALWAYS (user wants it even when empty)
+   */
+  private boolean needToEmit(boolean isEmpty, boolean isFinished, PaneInfo.Timing timing) {
+    return !isEmpty
+        || timing == Timing.ON_TIME
+        || (isFinished && windowingStrategy.getClosingBehavior() == ClosingBehavior.FIRE_ALWAYS);
+  }
+
+ /**
+ * Run the {@link ReduceFn#onTrigger} method and produce any necessary output.
+ *
+ * <p>Extracts and releases the window's watermark hold, computes the next {@link PaneInfo} and
+ * validates any newly added hold. If {@link #needToEmit} requires a pane, {@code reduceFn.onTrigger}
+ * is run with a callback that stores the pane info and outputs each value paired with
+ * {@code key}, stamped with the old hold's timestamp, in this window and pane.
+ *
+ * @param directContext context for the window itself
+ * @param renamedContext context whose state is keyed by the window's state address window
+ * @param isFinished whether the trigger has closed, so this is the window's final pane; a new
+ * hold is then a {@code checkState} failure
+ * @param isEndOfWindow whether this is the end-of-window firing; a new end-of-window hold is then
+ * a {@code checkState} failure
+ * @return output watermark hold added, or {@literal null} if none.
+ * @throws Exception propagated from the reduce function or state access
+ */
+ @Nullable
+ private Instant onTrigger(
+ final ReduceFn<K, InputT, OutputT, W>.Context directContext,
+ ReduceFn<K, InputT, OutputT, W>.Context renamedContext,
+ boolean isFinished, boolean isEndOfWindow)
+ throws Exception {
+ Instant inputWM = timerInternals.currentInputWatermarkTime();
+
+ // Prefetch necessary states; every readLater() here is issued before any read() below.
+ ReadableState<WatermarkHold.OldAndNewHolds> outputTimestampFuture =
+ watermarkHold.extractAndRelease(renamedContext, isFinished).readLater();
+ ReadableState<PaneInfo> paneFuture =
+ paneInfoTracker.getNextPaneInfo(directContext, isFinished).readLater();
+ ReadableState<Boolean> isEmptyFuture =
+ nonEmptyPanes.isEmpty(renamedContext.state()).readLater();
+
+ reduceFn.prefetchOnTrigger(directContext.state());
+ triggerRunner.prefetchOnFire(directContext.window(), directContext.state());
+
+ // Calculate the pane info.
+ final PaneInfo pane = paneFuture.read();
+ // Read the window hold released by extractAndRelease above; oldHold is the output timestamp.
+
+ WatermarkHold.OldAndNewHolds pair = outputTimestampFuture.read();
+ final Instant outputTimestamp = pair.oldHold;
+ @Nullable Instant newHold = pair.newHold;
+
+ if (newHold != null) {
+ // We can't be finished yet.
+ checkState(
+ !isFinished, "new hold at %s but finished %s", newHold, directContext.window());
+ // The hold cannot be behind the input watermark.
+ checkState(
+ !newHold.isBefore(inputWM), "new hold %s is before input watermark %s", newHold, inputWM);
+ if (newHold.isAfter(directContext.window().maxTimestamp())) {
+ // The hold must be for garbage collection, which can't have happened yet.
+ checkState(
+ newHold.isEqual(garbageCollectionTime(directContext.window())),
+ "new hold %s should be at garbage collection for window %s plus %s",
+ newHold,
+ directContext.window(),
+ windowingStrategy.getAllowedLateness());
+ } else {
+ // The hold must be for the end-of-window, which can't have happened yet.
+ checkState(
+ newHold.isEqual(directContext.window().maxTimestamp()),
+ "new hold %s should be at end of window %s",
+ newHold,
+ directContext.window());
+ checkState(
+ !isEndOfWindow,
+ "new hold at %s for %s but this is the watermark trigger",
+ newHold,
+ directContext.window());
+ }
+ }
+
+ // Only emit a pane if it has data or empty panes are observable.
+ if (needToEmit(isEmptyFuture.read(), isFinished, pane.getTiming())) {
+ // Run reduceFn.onTrigger method.
+ final List<W> windows = Collections.singletonList(directContext.window());
+ ReduceFn<K, InputT, OutputT, W>.OnTriggerContext renamedTriggerContext =
+ contextFactory.forTrigger(directContext.window(), paneFuture, StateStyle.RENAMED,
+ new OnTriggerCallbacks<OutputT>() {
+ @Override
+ public void output(OutputT toOutput) {
+ // We're going to output panes, so commit the (now used) PaneInfo.
+ // TODO: This is unnecessary if the trigger isFinished since the saved
+ // state will be immediately deleted.
+ paneInfoTracker.storeCurrentPaneInfo(directContext, pane);
+
+ // Output the actual value.
+ outputter.outputWindowedValue(
+ KV.of(key, toOutput), outputTimestamp, windows, pane);
+ }
+ });
+
+ reduceFn.onTrigger(renamedTriggerContext);
+ }
+
+ return newHold;
+ }
+
+  /**
+   * Sets an event-time timer that will eventually cause this window's state to be garbage
+   * collected, and returns the time the timer was set for.
+   *
+   * <p>If the input watermark is already past the end of the window, the timer goes straight to
+   * the garbage collection time. Otherwise it is set at the end of the window and rolled forward to
+   * a garbage collection timer when it fires. The two-step approach avoids keeping nearly twice as
+   * many timers in flight when windows are long (eg hourly) relative to a small allowed lateness.
+   * With zero allowed lateness the two times coincide. {@code setTimer} is idempotent, so it is
+   * harmless if an {@link AfterWatermark} trigger has already set the same end-of-window timer.
+   *
+   * @param directContext context for the window being scheduled
+   * @return the timestamp at which the timer was set
+   */
+  private Instant scheduleEndOfWindowOrGarbageCollectionTimer(
+      ReduceFn<?, ?, ?, W>.Context directContext) {
+    Instant inputWM = timerInternals.currentInputWatermarkTime();
+    Instant endOfWindow = directContext.window().maxTimestamp();
+    boolean pastEndOfWindow = endOfWindow.isBefore(inputWM);
+    Instant fireAt =
+        pastEndOfWindow ? garbageCollectionTime(directContext.window()) : endOfWindow;
+    String timerKind = pastEndOfWindow ? "garbage collection" : "end-of-window";
+
+    WindowTracing.trace(
+        "ReduceFnRunner.scheduleEndOfWindowOrGarbageCollectionTimer: Scheduling {} timer at {} for "
+            + "key:{}; window:{} where inputWatermark:{}; outputWatermark:{}",
+        timerKind, fireAt, key, directContext.window(), inputWM,
+        timerInternals.currentOutputWatermarkTime());
+    checkState(!fireAt.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE),
+        "Timer %s is beyond end-of-time", fireAt);
+    directContext.timers().setTimer(fireAt, TimeDomain.EVENT_TIME);
+    return fireAt;
+  }
+
+  /**
+   * Deletes both the end-of-window and the garbage collection event-time timers for the window.
+   *
+   * <p>The garbage collection timer differs from the end-of-window timer only when it falls after
+   * the end of the window, i.e. with positive allowed lateness and no clipping to the end of the
+   * global window. Only in that case is there a second, distinct timer to delete.
+   *
+   * @param directContext context for the window whose timers should be removed
+   */
+  private void cancelEndOfWindowAndGarbageCollectionTimers(
+      ReduceFn<?, ?, ?, W>.Context directContext) {
+    WindowTracing.debug(
+        "ReduceFnRunner.cancelEndOfWindowAndGarbageCollectionTimers: Deleting timers for "
+            + "key:{}; window:{} where inputWatermark:{}; outputWatermark:{}",
+        key, directContext.window(), timerInternals.currentInputWatermarkTime(),
+        timerInternals.currentOutputWatermarkTime());
+    Instant eow = directContext.window().maxTimestamp();
+    directContext.timers().deleteTimer(eow, TimeDomain.EVENT_TIME);
+    Instant gc = garbageCollectionTime(directContext.window());
+    if (gc.isAfter(eow)) {
+      // A distinct garbage collection timer exists; it must be deleted by its own timestamp,
+      // not by deleting the end-of-window timer a second time.
+      directContext.timers().deleteTimer(gc, TimeDomain.EVENT_TIME);
+    }
+  }
+
+  /**
+   * Returns when {@code window} should be garbage collected: the end of the window plus the allowed
+   * lateness, clipped to the end of the global window ("end of time").
+   *
+   * @param window the window whose expiration time is wanted
+   * @return the garbage collection time, never after {@code GlobalWindow.INSTANCE.maxTimestamp()}
+   */
+  private Instant garbageCollectionTime(W window) {
+    Instant endOfTime = GlobalWindow.INSTANCE.maxTimestamp();
+    Duration allowedLateness = windowingStrategy.getAllowedLateness();
+    // Compare against (end of time - lateness) rather than computing (end of window + lateness)
+    // first, since that sum might overflow the maximum allowed Instant.
+    Instant latestUnclippedEnd = endOfTime.minus(allowedLateness);
+    if (window.maxTimestamp().isAfter(latestUnclippedEnd)) {
+      return endOfTime;
+    }
+    return window.maxTimestamp().plus(allowedLateness);
+  }
+
+  /**
+   * Sink for a value together with all of its windowing information.
+   *
+   * <p>Deliberately narrower than {@link WindowingInternals}: it exposes only the single output
+   * operation this runner needs.
+   */
+  private interface OutputWindowedValue<OutputT> {
+    /** Outputs {@code output} with the given timestamp, windows and pane. */
+    void outputWindowedValue(
+        OutputT output,
+        Instant timestamp,
+        Collection<? extends BoundedWindow> windows,
+        PaneInfo pane);
+  }
+
+  /** {@link OutputWindowedValue} that forwards every output to a {@link WindowingInternals}. */
+  private static class OutputViaWindowingInternals<OutputT>
+      implements OutputWindowedValue<OutputT> {
+
+    /** Receiver of every forwarded output. */
+    private final WindowingInternals<?, OutputT> windowingInternals;
+
+    public OutputViaWindowingInternals(WindowingInternals<?, OutputT> target) {
+      windowingInternals = target;
+    }
+
+    @Override
+    public void outputWindowedValue(
+        OutputT output, Instant timestamp, Collection<? extends BoundedWindow> windows,
+        PaneInfo pane) {
+      // Pure delegation; arguments are passed through unchanged.
+      windowingInternals.outputWindowedValue(output, timestamp, windows, pane);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4bf3a3b3/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
new file mode 100644
index 0000000..df74ed3
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import java.util.List;
+import org.apache.beam.runners.core.DoFnRunners.OutputManager;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.Aggregator.AggregatorFactory;
+import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.util.ExecutionContext.StepContext;
+import org.apache.beam.sdk.util.SideInputReader;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.values.TupleTag;
+
+/**
+ * A {@link DoFnRunnerBase} that runs an {@link OldDoFn} by building the element's process
+ * context and invoking {@code processElement} on it directly.
+ *
+ * @param <InputT> the type of the {@link OldDoFn} (main) input elements
+ * @param <OutputT> the type of the {@link OldDoFn} (main) output elements
+ */
+public class SimpleDoFnRunner<InputT, OutputT> extends DoFnRunnerBase<InputT, OutputT> {
+
+  protected SimpleDoFnRunner(
+      PipelineOptions options,
+      OldDoFn<InputT, OutputT> fn,
+      SideInputReader sideInputReader,
+      OutputManager outputManager,
+      TupleTag<OutputT> mainOutputTag,
+      List<TupleTag<?>> sideOutputTags,
+      StepContext stepContext,
+      AggregatorFactory aggregatorFactory,
+      WindowingStrategy<?, ?> windowingStrategy) {
+    super(
+        options,
+        fn,
+        sideInputReader,
+        outputManager,
+        mainOutputTag,
+        sideOutputTags,
+        stepContext,
+        aggregatorFactory,
+        windowingStrategy);
+  }
+
+  /**
+   * Builds the process context for {@code elem} and passes it to the user's
+   * {@code processElement}, converting any {@link Exception} it throws with
+   * {@code wrapUserCodeException}.
+   */
+  @Override
+  protected void invokeProcessElement(WindowedValue<InputT> elem) {
+    OldDoFn<InputT, OutputT>.ProcessContext context = createProcessContext(elem);
+    try {
+      // User code runs here, so guard it.
+      fn.processElement(context);
+    } catch (Exception userCodeFailure) {
+      throw wrapUserCodeException(userCodeFailure);
+    }
+  }
+}