You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by da...@apache.org on 2016/06/20 22:16:10 UTC
[17/50] [abbrv] incubator-beam git commit: Remove InProcess Prefixes
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/babddbbc/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InMemoryWatermarkManager.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InMemoryWatermarkManager.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InMemoryWatermarkManager.java
deleted file mode 100644
index fb8eb7c..0000000
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InMemoryWatermarkManager.java
+++ /dev/null
@@ -1,1420 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.direct;
-
-import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.transforms.AppliedPTransform;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.TimeDomain;
-import org.apache.beam.sdk.util.TimerInternals;
-import org.apache.beam.sdk.util.TimerInternals.TimerData;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PValue;
-
-import com.google.auto.value.AutoValue;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
-import com.google.common.base.MoreObjects;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ComparisonChain;
-import com.google.common.collect.FluentIterable;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Ordering;
-import com.google.common.collect.SortedMultiset;
-import com.google.common.collect.TreeMultiset;
-
-import org.joda.time.Instant;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.EnumMap;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.NavigableSet;
-import java.util.Objects;
-import java.util.PriorityQueue;
-import java.util.Set;
-import java.util.TreeSet;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.atomic.AtomicReference;
-
-import javax.annotation.Nullable;
-
-/**
- * Manages watermarks of {@link PCollection PCollections} and input and output watermarks of
- * {@link AppliedPTransform AppliedPTransforms} to provide event-time and completion tracking for
- * in-memory execution. {@link InMemoryWatermarkManager} is designed to update and return a
- * consistent view of watermarks in the presence of concurrent updates.
- *
- * <p>An {@link InMemoryWatermarkManager} is provided with the collection of root
- * {@link AppliedPTransform AppliedPTransforms} and a map of {@link PCollection PCollections} to
- * all the {@link AppliedPTransform AppliedPTransforms} that consume them at construction time.
- *
- * <p>Whenever a root {@link AppliedPTransform transform} produces elements, the
- * {@link InMemoryWatermarkManager} is provided with the produced elements and the output watermark
- * of the producing {@link AppliedPTransform transform}. The
- * {@link InMemoryWatermarkManager watermark manager} is responsible for computing the watermarks
- * of all {@link AppliedPTransform transforms} that consume one or more
- * {@link PCollection PCollections}.
- *
- * <p>Whenever a non-root {@link AppliedPTransform} finishes processing one or more in-flight
- * elements (referred to as the input {@link CommittedBundle bundle}), the following occurs
- * atomically:
- * <ul>
- * <li>All of the in-flight elements are removed from the collection of pending elements for the
- * {@link AppliedPTransform}.</li>
- * <li>All of the elements produced by the {@link AppliedPTransform} are added to the collection
- * of pending elements for each {@link AppliedPTransform} that consumes them.</li>
- * <li>The input watermark for the {@link AppliedPTransform} becomes the maximum value of
- * <ul>
- * <li>the previous input watermark</li>
- * <li>the minimum of
- * <ul>
- * <li>the timestamps of all currently pending elements</li>
- * <li>all input {@link PCollection} watermarks</li>
- * </ul>
- * </li>
- * </ul>
- * </li>
- * <li>The output watermark for the {@link AppliedPTransform} becomes the maximum of
- * <ul>
- * <li>the previous output watermark</li>
- * <li>the minimum of
- * <ul>
- * <li>the current input watermark</li>
- * <li>the current watermark holds</li>
- * </ul>
- * </li>
- * </ul>
- * </li>
- * <li>The watermark of the output {@link PCollection} can be advanced to the output watermark of
- * the {@link AppliedPTransform}</li>
- * <li>The watermark of all downstream {@link AppliedPTransform AppliedPTransforms} can be
- * advanced.</li>
- * </ul>
- *
- * <p>The watermark of a {@link PCollection} is equal to the output watermark of the
- * {@link AppliedPTransform} that produces it.
- *
- * <p>The watermarks for a {@link PTransform} are updated as follows when output is committed:<pre>
- * Watermark_In' = MAX(Watermark_In, MIN(U(TS_Pending), U(Watermark_InputPCollection)))
- * Watermark_Out' = MAX(Watermark_Out, MIN(Watermark_In', U(StateHold)))
- * Watermark_PCollection = Watermark_Out_ProducingPTransform
- * </pre>
- */
-public class InMemoryWatermarkManager {
- /**
- * The watermark of some {@link Pipeline} element, usually a {@link PTransform} or a
- * {@link PCollection}.
- *
- * <p>A watermark is a monotonically increasing value, which represents the point up to which the
- * system believes it has received all of the data. Data that arrives with a timestamp that is
- * before the watermark is considered late. {@link BoundedWindow#TIMESTAMP_MAX_VALUE} is a special
- * timestamp which indicates we have received all of the data and there will be no more on-time or
- * late data. This value is represented by {@link InMemoryWatermarkManager#THE_END_OF_TIME}.
- */
- private static interface Watermark {
- /**
- * Returns the current value of this watermark.
- */
- Instant get();
-
- /**
- * Refreshes the value of this watermark from its input watermarks and watermark holds.
- *
- * @return true if the value of the watermark has changed (and thus dependent watermark must
- * also be updated
- */
- WatermarkUpdate refresh();
- }
-
- /**
- * The result of computing a {@link Watermark}.
- */
- private static enum WatermarkUpdate {
- /** The watermark is later than the value at the previous time it was computed. */
- ADVANCED(true),
- /** The watermark is equal to the value at the previous time it was computed. */
- NO_CHANGE(false);
-
- private final boolean advanced;
-
- private WatermarkUpdate(boolean advanced) {
- this.advanced = advanced;
- }
-
- public boolean isAdvanced() {
- return advanced;
- }
-
- /**
- * Returns the {@link WatermarkUpdate} that is a result of combining the two watermark updates.
- *
- * If either of the input {@link WatermarkUpdate WatermarkUpdates} were advanced, the result
- * {@link WatermarkUpdate} has been advanced.
- */
- public WatermarkUpdate union(WatermarkUpdate that) {
- if (this.advanced) {
- return this;
- }
- return that;
- }
-
- /**
- * Returns the {@link WatermarkUpdate} based on the former and current
- * {@link Instant timestamps}.
- */
- public static WatermarkUpdate fromTimestamps(Instant oldTime, Instant currentTime) {
- if (currentTime.isAfter(oldTime)) {
- return ADVANCED;
- }
- return NO_CHANGE;
- }
- }
-
- /**
- * The input {@link Watermark} of an {@link AppliedPTransform}.
- *
- * <p>At any point, the value of an {@link AppliedPTransformInputWatermark} is equal to the
- * minimum watermark across all of its input {@link Watermark Watermarks}, and the minimum
- * timestamp of all of the pending elements, restricted to be monotonically increasing.
- *
- * <p>See {@link #refresh()} for more information.
- */
- private static class AppliedPTransformInputWatermark implements Watermark {
- private final Collection<? extends Watermark> inputWatermarks;
- private final SortedMultiset<WindowedValue<?>> pendingElements;
- private final Map<StructuralKey<?>, NavigableSet<TimerData>> objectTimers;
-
- private AtomicReference<Instant> currentWatermark;
-
- public AppliedPTransformInputWatermark(Collection<? extends Watermark> inputWatermarks) {
- this.inputWatermarks = inputWatermarks;
- this.pendingElements = TreeMultiset.create(new WindowedValueByTimestampComparator());
- this.objectTimers = new HashMap<>();
- currentWatermark = new AtomicReference<>(BoundedWindow.TIMESTAMP_MIN_VALUE);
- }
-
- @Override
- public Instant get() {
- return currentWatermark.get();
- }
-
- /**
- * {@inheritDoc}.
- *
- * <p>When refresh is called, the value of the {@link AppliedPTransformInputWatermark} becomes
- * equal to the maximum value of
- * <ul>
- * <li>the previous input watermark</li>
- * <li>the minimum of
- * <ul>
- * <li>the timestamps of all currently pending elements</li>
- * <li>all input {@link PCollection} watermarks</li>
- * </ul>
- * </li>
- * </ul>
- */
- @Override
- public synchronized WatermarkUpdate refresh() {
- Instant oldWatermark = currentWatermark.get();
- Instant minInputWatermark = BoundedWindow.TIMESTAMP_MAX_VALUE;
- for (Watermark inputWatermark : inputWatermarks) {
- minInputWatermark = INSTANT_ORDERING.min(minInputWatermark, inputWatermark.get());
- }
- if (!pendingElements.isEmpty()) {
- minInputWatermark = INSTANT_ORDERING.min(
- minInputWatermark, pendingElements.firstEntry().getElement().getTimestamp());
- }
- Instant newWatermark = INSTANT_ORDERING.max(oldWatermark, minInputWatermark);
- currentWatermark.set(newWatermark);
- return WatermarkUpdate.fromTimestamps(oldWatermark, newWatermark);
- }
-
- private synchronized void addPendingElements(Iterable<? extends WindowedValue<?>> newPending) {
- for (WindowedValue<?> pendingElement : newPending) {
- pendingElements.add(pendingElement);
- }
- }
-
- private synchronized void removePendingElements(
- Iterable<? extends WindowedValue<?>> finishedElements) {
- for (WindowedValue<?> finishedElement : finishedElements) {
- pendingElements.remove(finishedElement);
- }
- }
-
- private synchronized void updateTimers(TimerUpdate update) {
- NavigableSet<TimerData> keyTimers = objectTimers.get(update.key);
- if (keyTimers == null) {
- keyTimers = new TreeSet<>();
- objectTimers.put(update.key, keyTimers);
- }
- for (TimerData timer : update.setTimers) {
- if (TimeDomain.EVENT_TIME.equals(timer.getDomain())) {
- keyTimers.add(timer);
- }
- }
- for (TimerData timer : update.deletedTimers) {
- if (TimeDomain.EVENT_TIME.equals(timer.getDomain())) {
- keyTimers.remove(timer);
- }
- }
- // We don't keep references to timers that have been fired and delivered via #getFiredTimers()
- }
-
- private synchronized Map<StructuralKey<?>, List<TimerData>> extractFiredEventTimeTimers() {
- return extractFiredTimers(currentWatermark.get(), objectTimers);
- }
-
- @Override
- public synchronized String toString() {
- return MoreObjects.toStringHelper(AppliedPTransformInputWatermark.class)
- .add("pendingElements", pendingElements)
- .add("currentWatermark", currentWatermark)
- .toString();
- }
- }
-
- /**
- * The output {@link Watermark} of an {@link AppliedPTransform}.
- *
- * <p>The value of an {@link AppliedPTransformOutputWatermark} is equal to the minimum of the
- * current watermark hold and the {@link AppliedPTransformInputWatermark} for the same
- * {@link AppliedPTransform}, restricted to be monotonically increasing. See
- * {@link #refresh()} for more information.
- */
- private static class AppliedPTransformOutputWatermark implements Watermark {
- private final Watermark inputWatermark;
- private final PerKeyHolds holds;
- private AtomicReference<Instant> currentWatermark;
-
- public AppliedPTransformOutputWatermark(AppliedPTransformInputWatermark inputWatermark) {
- this.inputWatermark = inputWatermark;
- holds = new PerKeyHolds();
- currentWatermark = new AtomicReference<>(BoundedWindow.TIMESTAMP_MIN_VALUE);
- }
-
- public synchronized void updateHold(Object key, Instant newHold) {
- if (newHold == null) {
- holds.removeHold(key);
- } else {
- holds.updateHold(key, newHold);
- }
- }
-
- @Override
- public Instant get() {
- return currentWatermark.get();
- }
-
- /**
- * {@inheritDoc}.
- *
- * <p>When refresh is called, the value of the {@link AppliedPTransformOutputWatermark} becomes
- * equal to the maximum value of:
- * <ul>
- * <li>the previous output watermark</li>
- * <li>the minimum of
- * <ul>
- * <li>the current input watermark</li>
- * <li>the current watermark holds</li>
- * </ul>
- * </li>
- * </ul>
- */
- @Override
- public synchronized WatermarkUpdate refresh() {
- Instant oldWatermark = currentWatermark.get();
- Instant newWatermark = INSTANT_ORDERING.min(inputWatermark.get(), holds.getMinHold());
- newWatermark = INSTANT_ORDERING.max(oldWatermark, newWatermark);
- currentWatermark.set(newWatermark);
- return WatermarkUpdate.fromTimestamps(oldWatermark, newWatermark);
- }
-
- @Override
- public synchronized String toString() {
- return MoreObjects.toStringHelper(AppliedPTransformOutputWatermark.class)
- .add("holds", holds)
- .add("currentWatermark", currentWatermark)
- .toString();
- }
- }
-
- /**
- * The input {@link TimeDomain#SYNCHRONIZED_PROCESSING_TIME} hold for an
- * {@link AppliedPTransform}.
- *
- * <p>At any point, the hold value of an {@link SynchronizedProcessingTimeInputWatermark} is equal
- * to the minimum across all pending bundles at the {@link AppliedPTransform} and all upstream
- * {@link TimeDomain#SYNCHRONIZED_PROCESSING_TIME} watermarks. The value of the input
- * synchronized processing time at any step is equal to the maximum of:
- * <ul>
- * <li>The most recently returned synchronized processing input time
- * <li>The minimum of
- * <ul>
- * <li>The current processing time
- * <li>The current synchronized processing time input hold
- * </ul>
- * </ul>
- */
- private static class SynchronizedProcessingTimeInputWatermark implements Watermark {
- private final Collection<? extends Watermark> inputWms;
- private final Collection<CommittedBundle<?>> pendingBundles;
- private final Map<StructuralKey<?>, NavigableSet<TimerData>> processingTimers;
- private final Map<StructuralKey<?>, NavigableSet<TimerData>> synchronizedProcessingTimers;
-
- private final PriorityQueue<TimerData> pendingTimers;
-
- private AtomicReference<Instant> earliestHold;
-
- public SynchronizedProcessingTimeInputWatermark(Collection<? extends Watermark> inputWms) {
- this.inputWms = inputWms;
- this.pendingBundles = new HashSet<>();
- this.processingTimers = new HashMap<>();
- this.synchronizedProcessingTimers = new HashMap<>();
- this.pendingTimers = new PriorityQueue<>();
- Instant initialHold = BoundedWindow.TIMESTAMP_MAX_VALUE;
- for (Watermark wm : inputWms) {
- initialHold = INSTANT_ORDERING.min(initialHold, wm.get());
- }
- earliestHold = new AtomicReference<>(initialHold);
- }
-
- @Override
- public Instant get() {
- return earliestHold.get();
- }
-
- /**
- * {@inheritDoc}.
- *
- * <p>When refresh is called, the value of the {@link SynchronizedProcessingTimeInputWatermark}
- * becomes equal to the minimum value of
- * <ul>
- * <li>the timestamps of all currently pending bundles</li>
- * <li>all input {@link PCollection} synchronized processing time watermarks</li>
- * </ul>
- *
- * <p>Note that this value is not monotonic, but the returned value for the synchronized
- * processing time must be.
- */
- @Override
- public synchronized WatermarkUpdate refresh() {
- Instant oldHold = earliestHold.get();
- Instant minTime = THE_END_OF_TIME.get();
- for (Watermark input : inputWms) {
- minTime = INSTANT_ORDERING.min(minTime, input.get());
- }
- for (CommittedBundle<?> bundle : pendingBundles) {
- // TODO: Track elements in the bundle by the processing time they were output instead of
- // entire bundles. Requried to support arbitrarily splitting and merging bundles between
- // steps
- minTime = INSTANT_ORDERING.min(minTime, bundle.getSynchronizedProcessingOutputWatermark());
- }
- earliestHold.set(minTime);
- return WatermarkUpdate.fromTimestamps(oldHold, minTime);
- }
-
- public synchronized void addPending(CommittedBundle<?> bundle) {
- pendingBundles.add(bundle);
- }
-
- public synchronized void removePending(CommittedBundle<?> bundle) {
- pendingBundles.remove(bundle);
- }
-
- /**
- * Return the earliest timestamp of the earliest timer that has not been completed. This is
- * either the earliest timestamp across timers that have not been completed, or the earliest
- * timestamp across timers that have been delivered but have not been completed.
- */
- public synchronized Instant getEarliestTimerTimestamp() {
- Instant earliest = THE_END_OF_TIME.get();
- for (NavigableSet<TimerData> timers : processingTimers.values()) {
- if (!timers.isEmpty()) {
- earliest = INSTANT_ORDERING.min(timers.first().getTimestamp(), earliest);
- }
- }
- for (NavigableSet<TimerData> timers : synchronizedProcessingTimers.values()) {
- if (!timers.isEmpty()) {
- earliest = INSTANT_ORDERING.min(timers.first().getTimestamp(), earliest);
- }
- }
- if (!pendingTimers.isEmpty()) {
- earliest = INSTANT_ORDERING.min(pendingTimers.peek().getTimestamp(), earliest);
- }
- return earliest;
- }
-
- private synchronized void updateTimers(TimerUpdate update) {
- Map<TimeDomain, NavigableSet<TimerData>> timerMap = timerMap(update.key);
- for (TimerData addedTimer : update.setTimers) {
- NavigableSet<TimerData> timerQueue = timerMap.get(addedTimer.getDomain());
- if (timerQueue != null) {
- timerQueue.add(addedTimer);
- }
- }
-
- for (TimerData completedTimer : update.completedTimers) {
- pendingTimers.remove(completedTimer);
- }
- for (TimerData deletedTimer : update.deletedTimers) {
- NavigableSet<TimerData> timerQueue = timerMap.get(deletedTimer.getDomain());
- if (timerQueue != null) {
- timerQueue.remove(deletedTimer);
- }
- }
- }
-
- private synchronized Map<StructuralKey<?>, List<TimerData>> extractFiredDomainTimers(
- TimeDomain domain, Instant firingTime) {
- Map<StructuralKey<?>, List<TimerData>> firedTimers;
- switch (domain) {
- case PROCESSING_TIME:
- firedTimers = extractFiredTimers(firingTime, processingTimers);
- break;
- case SYNCHRONIZED_PROCESSING_TIME:
- firedTimers =
- extractFiredTimers(
- INSTANT_ORDERING.min(firingTime, earliestHold.get()),
- synchronizedProcessingTimers);
- break;
- default:
- throw new IllegalArgumentException(
- "Called getFiredTimers on a Synchronized Processing Time watermark"
- + " and gave a non-processing time domain "
- + domain);
- }
- for (Map.Entry<StructuralKey<?>, ? extends Collection<TimerData>> firedTimer :
- firedTimers.entrySet()) {
- pendingTimers.addAll(firedTimer.getValue());
- }
- return firedTimers;
- }
-
- private Map<TimeDomain, NavigableSet<TimerData>> timerMap(StructuralKey<?> key) {
- NavigableSet<TimerData> processingQueue = processingTimers.get(key);
- if (processingQueue == null) {
- processingQueue = new TreeSet<>();
- processingTimers.put(key, processingQueue);
- }
- NavigableSet<TimerData> synchronizedProcessingQueue =
- synchronizedProcessingTimers.get(key);
- if (synchronizedProcessingQueue == null) {
- synchronizedProcessingQueue = new TreeSet<>();
- synchronizedProcessingTimers.put(key, synchronizedProcessingQueue);
- }
- EnumMap<TimeDomain, NavigableSet<TimerData>> result = new EnumMap<>(TimeDomain.class);
- result.put(TimeDomain.PROCESSING_TIME, processingQueue);
- result.put(TimeDomain.SYNCHRONIZED_PROCESSING_TIME, synchronizedProcessingQueue);
- return result;
- }
-
- @Override
- public synchronized String toString() {
- return MoreObjects.toStringHelper(SynchronizedProcessingTimeInputWatermark.class)
- .add("earliestHold", earliestHold)
- .toString();
- }
- }
-
- /**
- * The output {@link TimeDomain#SYNCHRONIZED_PROCESSING_TIME} hold for an
- * {@link AppliedPTransform}.
- *
- * <p>At any point, the hold value of an {@link SynchronizedProcessingTimeOutputWatermark} is
- * equal to the minimum across all incomplete timers at the {@link AppliedPTransform} and all
- * upstream {@link TimeDomain#SYNCHRONIZED_PROCESSING_TIME} watermarks. The value of the output
- * synchronized processing time at any step is equal to the maximum of:
- * <ul>
- * <li>The most recently returned synchronized processing output time
- * <li>The minimum of
- * <ul>
- * <li>The current processing time
- * <li>The current synchronized processing time output hold
- * </ul>
- * </ul>
- */
- private static class SynchronizedProcessingTimeOutputWatermark implements Watermark {
- private final SynchronizedProcessingTimeInputWatermark inputWm;
- private AtomicReference<Instant> latestRefresh;
-
- public SynchronizedProcessingTimeOutputWatermark(
- SynchronizedProcessingTimeInputWatermark inputWm) {
- this.inputWm = inputWm;
- this.latestRefresh = new AtomicReference<>(BoundedWindow.TIMESTAMP_MIN_VALUE);
- }
-
- @Override
- public Instant get() {
- return latestRefresh.get();
- }
-
- /**
- * {@inheritDoc}.
- *
- * <p>When refresh is called, the value of the {@link SynchronizedProcessingTimeOutputWatermark}
- * becomes equal to the minimum value of:
- * <ul>
- * <li>the current input watermark.
- * <li>all {@link TimeDomain#SYNCHRONIZED_PROCESSING_TIME} timers that are based on the input
- * watermark.
- * <li>all {@link TimeDomain#PROCESSING_TIME} timers that are based on the input watermark.
- * </ul>
- *
- * <p>Note that this value is not monotonic, but the returned value for the synchronized
- * processing time must be.
- */
- @Override
- public synchronized WatermarkUpdate refresh() {
- // Hold the output synchronized processing time to the input watermark, which takes into
- // account buffered bundles, and the earliest pending timer, which determines what to hold
- // downstream timers to.
- Instant oldRefresh = latestRefresh.get();
- Instant newTimestamp =
- INSTANT_ORDERING.min(inputWm.get(), inputWm.getEarliestTimerTimestamp());
- latestRefresh.set(newTimestamp);
- return WatermarkUpdate.fromTimestamps(oldRefresh, newTimestamp);
- }
-
- @Override
- public synchronized String toString() {
- return MoreObjects.toStringHelper(SynchronizedProcessingTimeOutputWatermark.class)
- .add("latestRefresh", latestRefresh)
- .toString();
- }
- }
-
- /**
- * The {@code Watermark} that is after the latest time it is possible to represent in the global
- * window. This is a distinguished value representing a complete {@link PTransform}.
- */
- private static final Watermark THE_END_OF_TIME = new Watermark() {
- @Override
- public WatermarkUpdate refresh() {
- // THE_END_OF_TIME is a distinguished value that cannot be advanced.
- return WatermarkUpdate.NO_CHANGE;
- }
-
- @Override
- public Instant get() {
- return BoundedWindow.TIMESTAMP_MAX_VALUE;
- }
- };
-
- private static final Ordering<Instant> INSTANT_ORDERING = Ordering.natural();
-
- /**
- * A function that takes a WindowedValue and returns the exploded representation of that
- * {@link WindowedValue}.
- */
- private static final Function<WindowedValue<?>, ? extends Iterable<? extends WindowedValue<?>>>
- EXPLODE_WINDOWS_FN =
- new Function<WindowedValue<?>, Iterable<? extends WindowedValue<?>>>() {
- @Override
- public Iterable<? extends WindowedValue<?>> apply(WindowedValue<?> input) {
- return input.explodeWindows();
- }
- };
-
- /**
- * For each (Object, PriorityQueue) pair in the provided map, remove each Timer that is before the
- * latestTime argument and put in in the result with the same key, then remove all of the keys
- * which have no more pending timers.
- *
- * The result collection retains ordering of timers (from earliest to latest).
- */
- private static Map<StructuralKey<?>, List<TimerData>> extractFiredTimers(
- Instant latestTime, Map<StructuralKey<?>, NavigableSet<TimerData>> objectTimers) {
- Map<StructuralKey<?>, List<TimerData>> result = new HashMap<>();
- Set<StructuralKey<?>> emptyKeys = new HashSet<>();
- for (Map.Entry<StructuralKey<?>, NavigableSet<TimerData>> pendingTimers :
- objectTimers.entrySet()) {
- NavigableSet<TimerData> timers = pendingTimers.getValue();
- if (!timers.isEmpty() && timers.first().getTimestamp().isBefore(latestTime)) {
- ArrayList<TimerData> keyFiredTimers = new ArrayList<>();
- result.put(pendingTimers.getKey(), keyFiredTimers);
- while (!timers.isEmpty() && timers.first().getTimestamp().isBefore(latestTime)) {
- keyFiredTimers.add(timers.first());
- timers.remove(timers.first());
- }
- }
- if (timers.isEmpty()) {
- emptyKeys.add(pendingTimers.getKey());
- }
- }
- objectTimers.keySet().removeAll(emptyKeys);
- return result;
- }
-
- ////////////////////////////////////////////////////////////////////////////////////////////////
-
- /**
- * The {@link Clock} providing the current time in the {@link TimeDomain#PROCESSING_TIME} domain.
- */
- private final Clock clock;
-
- /**
- * A map from each {@link PCollection} to all {@link AppliedPTransform PTransform applications}
- * that consume that {@link PCollection}.
- */
- private final Map<PValue, Collection<AppliedPTransform<?, ?, ?>>> consumers;
-
- /**
- * The input and output watermark of each {@link AppliedPTransform}.
- */
- private final Map<AppliedPTransform<?, ?, ?>, TransformWatermarks> transformToWatermarks;
-
- /**
- * A queue of pending updates to the state of this {@link InMemoryWatermarkManager}.
- */
- private final ConcurrentLinkedQueue<PendingWatermarkUpdate> pendingUpdates;
-
- /**
- * A queue of pending {@link AppliedPTransform AppliedPTransforms} that have potentially
- * stale data.
- */
- private final ConcurrentLinkedQueue<AppliedPTransform<?, ?, ?>> pendingRefreshes;
-
- /**
- * Creates a new {@link InMemoryWatermarkManager}. All watermarks within the newly created
- * {@link InMemoryWatermarkManager} start at {@link BoundedWindow#TIMESTAMP_MIN_VALUE}, the
- * minimum watermark, with no watermark holds or pending elements.
- *
- * @param rootTransforms the root-level transforms of the {@link Pipeline}
- * @param consumers a mapping between each {@link PCollection} in the {@link Pipeline} to the
- * transforms that consume it as a part of their input
- */
- public static InMemoryWatermarkManager create(
- Clock clock,
- Collection<AppliedPTransform<?, ?, ?>> rootTransforms,
- Map<PValue, Collection<AppliedPTransform<?, ?, ?>>> consumers) {
- return new InMemoryWatermarkManager(clock, rootTransforms, consumers);
- }
-
- private InMemoryWatermarkManager(
- Clock clock,
- Collection<AppliedPTransform<?, ?, ?>> rootTransforms,
- Map<PValue, Collection<AppliedPTransform<?, ?, ?>>> consumers) {
- this.clock = clock;
- this.consumers = consumers;
- this.pendingUpdates = new ConcurrentLinkedQueue<>();
- this.pendingRefreshes = new ConcurrentLinkedQueue<>();
-
- transformToWatermarks = new HashMap<>();
-
- for (AppliedPTransform<?, ?, ?> rootTransform : rootTransforms) {
- getTransformWatermark(rootTransform);
- }
- for (Collection<AppliedPTransform<?, ?, ?>> intermediateTransforms : consumers.values()) {
- for (AppliedPTransform<?, ?, ?> transform : intermediateTransforms) {
- getTransformWatermark(transform);
- }
- }
- }
-
- private TransformWatermarks getTransformWatermark(AppliedPTransform<?, ?, ?> transform) {
- TransformWatermarks wms = transformToWatermarks.get(transform);
- if (wms == null) {
- List<Watermark> inputCollectionWatermarks = getInputWatermarks(transform);
- AppliedPTransformInputWatermark inputWatermark =
- new AppliedPTransformInputWatermark(inputCollectionWatermarks);
- AppliedPTransformOutputWatermark outputWatermark =
- new AppliedPTransformOutputWatermark(inputWatermark);
-
- SynchronizedProcessingTimeInputWatermark inputProcessingWatermark =
- new SynchronizedProcessingTimeInputWatermark(getInputProcessingWatermarks(transform));
- SynchronizedProcessingTimeOutputWatermark outputProcessingWatermark =
- new SynchronizedProcessingTimeOutputWatermark(inputProcessingWatermark);
-
- wms =
- new TransformWatermarks(
- inputWatermark, outputWatermark, inputProcessingWatermark, outputProcessingWatermark);
- transformToWatermarks.put(transform, wms);
- }
- return wms;
- }
-
- private Collection<Watermark> getInputProcessingWatermarks(
- AppliedPTransform<?, ?, ?> transform) {
- ImmutableList.Builder<Watermark> inputWmsBuilder = ImmutableList.builder();
- Collection<? extends PValue> inputs = transform.getInput().expand();
- if (inputs.isEmpty()) {
- inputWmsBuilder.add(THE_END_OF_TIME);
- }
- for (PValue pvalue : inputs) {
- Watermark producerOutputWatermark =
- getTransformWatermark(pvalue.getProducingTransformInternal())
- .synchronizedProcessingOutputWatermark;
- inputWmsBuilder.add(producerOutputWatermark);
- }
- return inputWmsBuilder.build();
- }
-
- private List<Watermark> getInputWatermarks(AppliedPTransform<?, ?, ?> transform) {
- ImmutableList.Builder<Watermark> inputWatermarksBuilder = ImmutableList.builder();
- Collection<? extends PValue> inputs = transform.getInput().expand();
- if (inputs.isEmpty()) {
- inputWatermarksBuilder.add(THE_END_OF_TIME);
- }
- for (PValue pvalue : inputs) {
- Watermark producerOutputWatermark =
- getTransformWatermark(pvalue.getProducingTransformInternal()).outputWatermark;
- inputWatermarksBuilder.add(producerOutputWatermark);
- }
- List<Watermark> inputCollectionWatermarks = inputWatermarksBuilder.build();
- return inputCollectionWatermarks;
- }
-
- ////////////////////////////////////////////////////////////////////////////////////////////////
-
- /**
- * Gets the input and output watermarks for an {@link AppliedPTransform}. If the
- * {@link AppliedPTransform PTransform} has not processed any elements, return a watermark of
- * {@link BoundedWindow#TIMESTAMP_MIN_VALUE}.
- *
- * @return a snapshot of the input watermark and output watermark for the provided transform
- */
- public TransformWatermarks getWatermarks(AppliedPTransform<?, ?, ?> transform) {
- return transformToWatermarks.get(transform);
- }
-
- /**
- * Updates the watermarks of a transform with one or more inputs.
- *
- * <p>Each transform has two monotonically increasing watermarks: the input watermark, which can,
- * at any time, be updated to equal:
- * <pre>
- * MAX(CurrentInputWatermark, MIN(PendingElements, InputPCollectionWatermarks))
- * </pre>
- * and the output watermark, which can, at any time, be updated to equal:
- * <pre>
- * MAX(CurrentOutputWatermark, MIN(InputWatermark, WatermarkHolds))
- * </pre>.
- *
- * @param completed the input that has completed
- * @param timerUpdate the timers that were added, removed, and completed as part of producing
- * this update
- * @param result the result that was produced by processing the input
- * @param earliestHold the earliest watermark hold in the transform's state. {@code null} if there
- * is no hold
- */
- public void updateWatermarks(
- @Nullable CommittedBundle<?> completed,
- TimerUpdate timerUpdate,
- CommittedResult result,
- Instant earliestHold) {
- pendingUpdates.offer(PendingWatermarkUpdate.create(completed,
- timerUpdate,
- result,
- earliestHold));
- }
-
- /**
- * Applies all pending updates to this {@link InMemoryWatermarkManager}, causing the pending state
- * of all {@link TransformWatermarks} to be advanced as far as possible.
- */
- private void applyPendingUpdates() {
- Set<AppliedPTransform<?, ?, ?>> updatedTransforms = new HashSet<>();
- PendingWatermarkUpdate pending = pendingUpdates.poll();
- while (pending != null) {
- applyPendingUpdate(pending);
- updatedTransforms.add(pending.getTransform());
- pending = pendingUpdates.poll();
- }
- pendingRefreshes.addAll(updatedTransforms);
- }
-
- private void applyPendingUpdate(PendingWatermarkUpdate pending) {
- CommittedResult result = pending.getResult();
- AppliedPTransform transform = result.getTransform();
- CommittedBundle<?> inputBundle = pending.getInputBundle();
-
- updatePending(inputBundle, pending.getTimerUpdate(), result);
-
- TransformWatermarks transformWms = transformToWatermarks.get(transform);
- transformWms.setEventTimeHold(inputBundle == null ? null : inputBundle.getKey(),
- pending.getEarliestHold());
- }
-
- /**
- * First adds all produced elements to the queue of pending elements for each consumer, then adds
- * all pending timers to the collection of pending timers, then removes all completed and deleted
- * timers from the collection of pending timers, then removes all completed elements from the
- * pending queue of the transform.
- *
- * <p>It is required that all newly pending elements are added to the queue of pending elements
- * for each consumer prior to the completed elements being removed, as doing otherwise could cause
- * a Watermark to appear in a state in which the upstream (completed) element does not hold the
- * watermark but the element it produced is not yet pending. This can cause the watermark to
- * erroneously advance.
- */
- private void updatePending(
- CommittedBundle<?> input,
- TimerUpdate timerUpdate,
- CommittedResult result) {
- // Newly pending elements must be added before completed elements are removed, as the two
- // do not share a Mutex within this call and thus can be interleaved with external calls to
- // refresh.
- for (CommittedBundle<?> bundle : result.getOutputs()) {
- for (AppliedPTransform<?, ?, ?> consumer : consumers.get(bundle.getPCollection())) {
- TransformWatermarks watermarks = transformToWatermarks.get(consumer);
- watermarks.addPending(bundle);
- }
- }
-
- TransformWatermarks completedTransform = transformToWatermarks.get(result.getTransform());
- if (input != null) {
- // Add the unprocessed inputs
- completedTransform.addPending(result.getUnprocessedInputs());
- }
- completedTransform.updateTimers(timerUpdate);
- if (input != null) {
- completedTransform.removePending(input);
- }
- }
-
- /**
- * Refresh the watermarks contained within this {@link InMemoryWatermarkManager}, causing all
- * watermarks to be advanced as far as possible.
- */
- synchronized void refreshAll() {
- applyPendingUpdates();
- while (!pendingRefreshes.isEmpty()) {
- refreshWatermarks(pendingRefreshes.poll());
- }
- }
-
- private void refreshWatermarks(AppliedPTransform<?, ?, ?> toRefresh) {
- TransformWatermarks myWatermarks = transformToWatermarks.get(toRefresh);
- WatermarkUpdate updateResult = myWatermarks.refresh();
- Set<AppliedPTransform<?, ?, ?>> additionalRefreshes = new HashSet<>();
- if (updateResult.isAdvanced()) {
- for (PValue outputPValue : toRefresh.getOutput().expand()) {
- additionalRefreshes.addAll(consumers.get(outputPValue));
- }
- }
- pendingRefreshes.addAll(additionalRefreshes);
- }
-
- /**
- * Returns a map of each {@link PTransform} that has pending timers to those timers. All of the
- * pending timers will be removed from this {@link InMemoryWatermarkManager}.
- */
- public Map<AppliedPTransform<?, ?, ?>, Map<StructuralKey<?>, FiredTimers>> extractFiredTimers() {
- Map<AppliedPTransform<?, ?, ?>, Map<StructuralKey<?>, FiredTimers>> allTimers = new HashMap<>();
- for (Map.Entry<AppliedPTransform<?, ?, ?>, TransformWatermarks> watermarksEntry :
- transformToWatermarks.entrySet()) {
- Map<StructuralKey<?>, FiredTimers> keyFiredTimers =
- watermarksEntry.getValue().extractFiredTimers();
- if (!keyFiredTimers.isEmpty()) {
- allTimers.put(watermarksEntry.getKey(), keyFiredTimers);
- }
- }
- return allTimers;
- }
-
- /**
- * A (key, Instant) pair that holds the watermark. Holds are per-key, but the watermark is global,
- * and as such the watermark manager must track holds and the release of holds on a per-key basis.
- *
- * <p>The {@link #compareTo(KeyedHold)} method of {@link KeyedHold} is not consistent with equals,
- * as the key is arbitrarily ordered via identity, rather than object equality.
- */
- private static final class KeyedHold implements Comparable<KeyedHold> {
- private static final Ordering<Object> KEY_ORDERING = Ordering.arbitrary().nullsLast();
-
- private final Object key;
- private final Instant timestamp;
-
- /**
- * Create a new KeyedHold with the specified key and timestamp.
- */
- public static KeyedHold of(Object key, Instant timestamp) {
- return new KeyedHold(key, MoreObjects.firstNonNull(timestamp, THE_END_OF_TIME.get()));
- }
-
- private KeyedHold(Object key, Instant timestamp) {
- this.key = key;
- this.timestamp = timestamp;
- }
-
- @Override
- public int compareTo(KeyedHold that) {
- return ComparisonChain.start()
- .compare(this.timestamp, that.timestamp)
- .compare(this.key, that.key, KEY_ORDERING)
- .result();
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(timestamp, key);
- }
-
- @Override
- public boolean equals(Object other) {
- if (other == null || !(other instanceof KeyedHold)) {
- return false;
- }
- KeyedHold that = (KeyedHold) other;
- return Objects.equals(this.timestamp, that.timestamp) && Objects.equals(this.key, that.key);
- }
-
- /**
- * Get the value of this {@link KeyedHold}.
- */
- public Instant getTimestamp() {
- return timestamp;
- }
-
- @Override
- public String toString() {
- return MoreObjects.toStringHelper(KeyedHold.class)
- .add("key", key)
- .add("hold", timestamp)
- .toString();
- }
- }
-
- private static class PerKeyHolds {
- private final Map<Object, KeyedHold> keyedHolds;
- private final PriorityQueue<KeyedHold> allHolds;
-
- private PerKeyHolds() {
- this.keyedHolds = new HashMap<>();
- this.allHolds = new PriorityQueue<>();
- }
-
- /**
- * Gets the minimum hold across all keys in this {@link PerKeyHolds}, or THE_END_OF_TIME if
- * there are no holds within this {@link PerKeyHolds}.
- */
- public Instant getMinHold() {
- return allHolds.isEmpty() ? THE_END_OF_TIME.get() : allHolds.peek().getTimestamp();
- }
-
- /**
- * Updates the hold of the provided key to the provided value, removing any other holds for
- * the same key.
- */
- public void updateHold(@Nullable Object key, Instant newHold) {
- removeHold(key);
- KeyedHold newKeyedHold = KeyedHold.of(key, newHold);
- keyedHolds.put(key, newKeyedHold);
- allHolds.offer(newKeyedHold);
- }
-
- /**
- * Removes the hold of the provided key.
- */
- public void removeHold(Object key) {
- KeyedHold oldHold = keyedHolds.get(key);
- if (oldHold != null) {
- allHolds.remove(oldHold);
- }
- }
- }
-
- /**
- * A reference to the input and output watermarks of an {@link AppliedPTransform}.
- */
- public class TransformWatermarks {
- private final AppliedPTransformInputWatermark inputWatermark;
- private final AppliedPTransformOutputWatermark outputWatermark;
-
- private final SynchronizedProcessingTimeInputWatermark synchronizedProcessingInputWatermark;
- private final SynchronizedProcessingTimeOutputWatermark synchronizedProcessingOutputWatermark;
-
- private Instant latestSynchronizedInputWm;
- private Instant latestSynchronizedOutputWm;
-
- private TransformWatermarks(
- AppliedPTransformInputWatermark inputWatermark,
- AppliedPTransformOutputWatermark outputWatermark,
- SynchronizedProcessingTimeInputWatermark inputSynchProcessingWatermark,
- SynchronizedProcessingTimeOutputWatermark outputSynchProcessingWatermark) {
- this.inputWatermark = inputWatermark;
- this.outputWatermark = outputWatermark;
-
- this.synchronizedProcessingInputWatermark = inputSynchProcessingWatermark;
- this.synchronizedProcessingOutputWatermark = outputSynchProcessingWatermark;
- this.latestSynchronizedInputWm = BoundedWindow.TIMESTAMP_MIN_VALUE;
- this.latestSynchronizedOutputWm = BoundedWindow.TIMESTAMP_MIN_VALUE;
- }
-
- /**
- * Returns the input watermark of the {@link AppliedPTransform}.
- */
- public Instant getInputWatermark() {
- return Preconditions.checkNotNull(inputWatermark.get());
- }
-
- /**
- * Returns the output watermark of the {@link AppliedPTransform}.
- */
- public Instant getOutputWatermark() {
- return outputWatermark.get();
- }
-
- /**
- * Returns the synchronized processing input time of the {@link AppliedPTransform}.
- *
- * <p>The returned value is guaranteed to be monotonically increasing, and outside of the
- * presence of holds, will increase as the system time progresses.
- */
- public synchronized Instant getSynchronizedProcessingInputTime() {
- latestSynchronizedInputWm = INSTANT_ORDERING.max(
- latestSynchronizedInputWm,
- INSTANT_ORDERING.min(clock.now(), synchronizedProcessingInputWatermark.get()));
- return latestSynchronizedInputWm;
- }
-
- /**
- * Returns the synchronized processing output time of the {@link AppliedPTransform}.
- *
- * <p>The returned value is guaranteed to be monotonically increasing, and outside of the
- * presence of holds, will increase as the system time progresses.
- */
- public synchronized Instant getSynchronizedProcessingOutputTime() {
- latestSynchronizedOutputWm = INSTANT_ORDERING.max(
- latestSynchronizedOutputWm,
- INSTANT_ORDERING.min(clock.now(), synchronizedProcessingOutputWatermark.get()));
- return latestSynchronizedOutputWm;
- }
-
- private WatermarkUpdate refresh() {
- inputWatermark.refresh();
- synchronizedProcessingInputWatermark.refresh();
- WatermarkUpdate eventOutputUpdate = outputWatermark.refresh();
- WatermarkUpdate syncOutputUpdate = synchronizedProcessingOutputWatermark.refresh();
- return eventOutputUpdate.union(syncOutputUpdate);
- }
-
- private void setEventTimeHold(Object key, Instant newHold) {
- outputWatermark.updateHold(key, newHold);
- }
-
- private void removePending(CommittedBundle<?> bundle) {
- inputWatermark.removePendingElements(elementsFromBundle(bundle));
- synchronizedProcessingInputWatermark.removePending(bundle);
- }
-
- private void addPending(CommittedBundle<?> bundle) {
- inputWatermark.addPendingElements(elementsFromBundle(bundle));
- synchronizedProcessingInputWatermark.addPending(bundle);
- }
-
- private Iterable<? extends WindowedValue<?>> elementsFromBundle(CommittedBundle<?> bundle) {
- return FluentIterable.from(bundle.getElements()).transformAndConcat(EXPLODE_WINDOWS_FN);
- }
-
- private Map<StructuralKey<?>, FiredTimers> extractFiredTimers() {
- Map<StructuralKey<?>, List<TimerData>> eventTimeTimers =
- inputWatermark.extractFiredEventTimeTimers();
- Map<StructuralKey<?>, List<TimerData>> processingTimers;
- Map<StructuralKey<?>, List<TimerData>> synchronizedTimers;
- if (inputWatermark.get().equals(BoundedWindow.TIMESTAMP_MAX_VALUE)) {
- processingTimers = synchronizedProcessingInputWatermark.extractFiredDomainTimers(
- TimeDomain.PROCESSING_TIME, BoundedWindow.TIMESTAMP_MAX_VALUE);
- synchronizedTimers = synchronizedProcessingInputWatermark.extractFiredDomainTimers(
- TimeDomain.PROCESSING_TIME, BoundedWindow.TIMESTAMP_MAX_VALUE);
- } else {
- processingTimers = synchronizedProcessingInputWatermark.extractFiredDomainTimers(
- TimeDomain.PROCESSING_TIME, clock.now());
- synchronizedTimers = synchronizedProcessingInputWatermark.extractFiredDomainTimers(
- TimeDomain.SYNCHRONIZED_PROCESSING_TIME, getSynchronizedProcessingInputTime());
- }
- Map<StructuralKey<?>, Map<TimeDomain, List<TimerData>>> groupedTimers = new HashMap<>();
- groupFiredTimers(groupedTimers, eventTimeTimers, processingTimers, synchronizedTimers);
-
- Map<StructuralKey<?>, FiredTimers> keyFiredTimers = new HashMap<>();
- for (Map.Entry<StructuralKey<?>, Map<TimeDomain, List<TimerData>>> firedTimers :
- groupedTimers.entrySet()) {
- keyFiredTimers.put(firedTimers.getKey(), new FiredTimers(firedTimers.getValue()));
- }
- return keyFiredTimers;
- }
-
- @SafeVarargs
- private final void groupFiredTimers(
- Map<StructuralKey<?>, Map<TimeDomain, List<TimerData>>> groupedToMutate,
- Map<StructuralKey<?>, List<TimerData>>... timersToGroup) {
- for (Map<StructuralKey<?>, List<TimerData>> subGroup : timersToGroup) {
- for (Map.Entry<StructuralKey<?>, List<TimerData>> newTimers : subGroup.entrySet()) {
- Map<TimeDomain, List<TimerData>> grouped = groupedToMutate.get(newTimers.getKey());
- if (grouped == null) {
- grouped = new HashMap<>();
- groupedToMutate.put(newTimers.getKey(), grouped);
- }
- grouped.put(newTimers.getValue().get(0).getDomain(), newTimers.getValue());
- }
- }
- }
-
- private void updateTimers(TimerUpdate update) {
- inputWatermark.updateTimers(update);
- synchronizedProcessingInputWatermark.updateTimers(update);
- }
-
- @Override
- public String toString() {
- return MoreObjects.toStringHelper(TransformWatermarks.class)
- .add("inputWatermark", inputWatermark)
- .add("outputWatermark", outputWatermark)
- .add("inputProcessingTime", synchronizedProcessingInputWatermark)
- .add("outputProcessingTime", synchronizedProcessingOutputWatermark)
- .toString();
- }
- }
-
- /**
- * A collection of newly set, deleted, and completed timers.
- *
- * <p>setTimers and deletedTimers are collections of {@link TimerData} that have been added to the
- * {@link TimerInternals} of an executed step. completedTimers are timers that were delivered as
- * the input to the executed step.
- */
- public static class TimerUpdate {
- private final StructuralKey<?> key;
- private final Iterable<? extends TimerData> completedTimers;
-
- private final Iterable<? extends TimerData> setTimers;
- private final Iterable<? extends TimerData> deletedTimers;
-
- /**
- * Returns a TimerUpdate for a null key with no timers.
- */
- public static TimerUpdate empty() {
- return new TimerUpdate(
- null,
- Collections.<TimerData>emptyList(),
- Collections.<TimerData>emptyList(),
- Collections.<TimerData>emptyList());
- }
-
- /**
- * Creates a new {@link TimerUpdate} builder with the provided completed timers that needs the
- * set and deleted timers to be added to it.
- */
- public static TimerUpdateBuilder builder(StructuralKey<?> key) {
- return new TimerUpdateBuilder(key);
- }
-
- /**
- * A {@link TimerUpdate} builder that needs to be provided with set timers and deleted timers.
- */
- public static final class TimerUpdateBuilder {
- private final StructuralKey<?> key;
- private final Collection<TimerData> completedTimers;
- private final Collection<TimerData> setTimers;
- private final Collection<TimerData> deletedTimers;
-
- private TimerUpdateBuilder(StructuralKey<?> key) {
- this.key = key;
- this.completedTimers = new HashSet<>();
- this.setTimers = new HashSet<>();
- this.deletedTimers = new HashSet<>();
- }
-
- /**
- * Adds all of the provided timers to the collection of completed timers, and returns this
- * {@link TimerUpdateBuilder}.
- */
- public TimerUpdateBuilder withCompletedTimers(Iterable<TimerData> completedTimers) {
- Iterables.addAll(this.completedTimers, completedTimers);
- return this;
- }
-
- /**
- * Adds the provided timer to the collection of set timers, removing it from deleted timers if
- * it has previously been deleted. Returns this {@link TimerUpdateBuilder}.
- */
- public TimerUpdateBuilder setTimer(TimerData setTimer) {
- deletedTimers.remove(setTimer);
- setTimers.add(setTimer);
- return this;
- }
-
- /**
- * Adds the provided timer to the collection of deleted timers, removing it from set timers if
- * it has previously been set. Returns this {@link TimerUpdateBuilder}.
- */
- public TimerUpdateBuilder deletedTimer(TimerData deletedTimer) {
- deletedTimers.add(deletedTimer);
- setTimers.remove(deletedTimer);
- return this;
- }
-
- /**
- * Returns a new {@link TimerUpdate} with the most recently set completedTimers, setTimers,
- * and deletedTimers.
- */
- public TimerUpdate build() {
- return new TimerUpdate(
- key,
- ImmutableSet.copyOf(completedTimers),
- ImmutableSet.copyOf(setTimers),
- ImmutableSet.copyOf(deletedTimers));
- }
- }
-
- private TimerUpdate(
- StructuralKey<?> key,
- Iterable<? extends TimerData> completedTimers,
- Iterable<? extends TimerData> setTimers,
- Iterable<? extends TimerData> deletedTimers) {
- this.key = key;
- this.completedTimers = completedTimers;
- this.setTimers = setTimers;
- this.deletedTimers = deletedTimers;
- }
-
- @VisibleForTesting
- StructuralKey<?> getKey() {
- return key;
- }
-
- @VisibleForTesting
- Iterable<? extends TimerData> getCompletedTimers() {
- return completedTimers;
- }
-
- @VisibleForTesting
- Iterable<? extends TimerData> getSetTimers() {
- return setTimers;
- }
-
- @VisibleForTesting
- Iterable<? extends TimerData> getDeletedTimers() {
- return deletedTimers;
- }
-
- /**
- * Returns a {@link TimerUpdate} that is like this one, but with the specified completed timers.
- */
- public TimerUpdate withCompletedTimers(Iterable<TimerData> completedTimers) {
- return new TimerUpdate(this.key, completedTimers, setTimers, deletedTimers);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(key, completedTimers, setTimers, deletedTimers);
- }
-
- @Override
- public boolean equals(Object other) {
- if (other == null || !(other instanceof TimerUpdate)) {
- return false;
- }
- TimerUpdate that = (TimerUpdate) other;
- return Objects.equals(this.key, that.key)
- && Objects.equals(this.completedTimers, that.completedTimers)
- && Objects.equals(this.setTimers, that.setTimers)
- && Objects.equals(this.deletedTimers, that.deletedTimers);
- }
- }
-
- /**
- * A pair of {@link TimerData} and key which can be delivered to the appropriate
- * {@link AppliedPTransform}. A timer fires at the transform that set it with a specific key when
- * the time domain in which it lives progresses past a specified time, as determined by the
- * {@link InMemoryWatermarkManager}.
- */
- public static class FiredTimers {
- private final Map<TimeDomain, ? extends Collection<TimerData>> timers;
-
- private FiredTimers(Map<TimeDomain, ? extends Collection<TimerData>> timers) {
- this.timers = timers;
- }
-
- /**
- * Gets all of the timers that have fired within the provided {@link TimeDomain}. If no timers
- * fired within the provided domain, return an empty collection.
- *
- * <p>Timers within a {@link TimeDomain} are guaranteed to be in order of increasing timestamp.
- */
- public Collection<TimerData> getTimers(TimeDomain domain) {
- Collection<TimerData> domainTimers = timers.get(domain);
- if (domainTimers == null) {
- return Collections.emptyList();
- }
- return domainTimers;
- }
-
- @Override
- public String toString() {
- return MoreObjects.toStringHelper(FiredTimers.class).add("timers", timers).toString();
- }
- }
-
- private static class WindowedValueByTimestampComparator extends Ordering<WindowedValue<?>> {
- @Override
- public int compare(WindowedValue<?> o1, WindowedValue<?> o2) {
- return ComparisonChain.start()
- .compare(o1.getTimestamp(), o2.getTimestamp())
- .result();
- }
- }
-
- public Set<AppliedPTransform<?, ?, ?>> getCompletedTransforms() {
- Set<AppliedPTransform<?, ?, ?>> result = new HashSet<>();
- for (Map.Entry<AppliedPTransform<?, ?, ?>, TransformWatermarks> wms :
- transformToWatermarks.entrySet()) {
- if (wms.getValue().getOutputWatermark().equals(THE_END_OF_TIME.get())) {
- result.add(wms.getKey());
- }
- }
- return result;
- }
-
- @AutoValue
- abstract static class PendingWatermarkUpdate {
- @Nullable
- public abstract CommittedBundle<?> getInputBundle();
- public abstract TimerUpdate getTimerUpdate();
- public abstract CommittedResult getResult();
- public abstract Instant getEarliestHold();
-
- /**
- * Gets the {@link AppliedPTransform} that generated this result.
- */
- public AppliedPTransform<?, ?, ?> getTransform() {
- return getResult().getTransform();
- }
-
- public static PendingWatermarkUpdate create(
- CommittedBundle<?> inputBundle,
- TimerUpdate timerUpdate,
- CommittedResult result, Instant earliestHold) {
- return new AutoValue_InMemoryWatermarkManager_PendingWatermarkUpdate(inputBundle,
- timerUpdate,
- result,
- earliestHold);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/babddbbc/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessBundleFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessBundleFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessBundleFactory.java
deleted file mode 100644
index 0c7449c..0000000
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessBundleFactory.java
+++ /dev/null
@@ -1,161 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.direct;
-
-import static com.google.common.base.Preconditions.checkState;
-
-import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
-import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
-import org.apache.beam.sdk.coders.VoidCoder;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.PCollection;
-
-import com.google.common.base.MoreObjects;
-import com.google.common.collect.ImmutableList;
-
-import org.joda.time.Instant;
-
-/**
- * A factory that produces bundles that perform no additional validation.
- */
-class InProcessBundleFactory implements BundleFactory {
- public static InProcessBundleFactory create() {
- return new InProcessBundleFactory();
- }
-
- private InProcessBundleFactory() {}
-
- @Override
- public <T> UncommittedBundle<T> createRootBundle(PCollection<T> output) {
- return InProcessBundle.create(output, StructuralKey.of(null, VoidCoder.of()));
- }
-
- @Override
- public <T> UncommittedBundle<T> createBundle(CommittedBundle<?> input, PCollection<T> output) {
- return InProcessBundle.create(output, input.getKey());
- }
-
- @Override
- public <K, T> UncommittedBundle<T> createKeyedBundle(
- CommittedBundle<?> input, StructuralKey<K> key, PCollection<T> output) {
- return InProcessBundle.create(output, key);
- }
-
- /**
- * A {@link UncommittedBundle} that buffers elements in memory.
- */
- private static final class InProcessBundle<T> implements UncommittedBundle<T> {
- private final PCollection<T> pcollection;
- private final StructuralKey<?> key;
- private boolean committed = false;
- private ImmutableList.Builder<WindowedValue<T>> elements;
-
- /**
- * Create a new {@link InProcessBundle} for the specified {@link PCollection}.
- */
- public static <T> InProcessBundle<T> create(PCollection<T> pcollection, StructuralKey<?> key) {
- return new InProcessBundle<>(pcollection, key);
- }
-
- private InProcessBundle(PCollection<T> pcollection, StructuralKey<?> key) {
- this.pcollection = pcollection;
- this.key = key;
- this.elements = ImmutableList.builder();
- }
-
- @Override
- public PCollection<T> getPCollection() {
- return pcollection;
- }
-
- @Override
- public InProcessBundle<T> add(WindowedValue<T> element) {
- checkState(
- !committed,
- "Can't add element %s to committed bundle in PCollection %s",
- element,
- pcollection);
- elements.add(element);
- return this;
- }
-
- @Override
- public CommittedBundle<T> commit(final Instant synchronizedCompletionTime) {
- checkState(!committed, "Can't commit already committed bundle %s", this);
- committed = true;
- final Iterable<WindowedValue<T>> committedElements = elements.build();
- return new CommittedInProcessBundle<>(
- pcollection, key, committedElements, synchronizedCompletionTime);
- }
- }
-
- private static class CommittedInProcessBundle<T> implements CommittedBundle<T> {
- public CommittedInProcessBundle(
- PCollection<T> pcollection,
- StructuralKey<?> key,
- Iterable<WindowedValue<T>> committedElements,
- Instant synchronizedCompletionTime) {
- this.pcollection = pcollection;
- this.key = key;
- this.committedElements = committedElements;
- this.synchronizedCompletionTime = synchronizedCompletionTime;
- }
-
- private final PCollection<T> pcollection;
- /** The structural value key of the Bundle, as specified by the coder that created it. */
- private final StructuralKey<?> key;
- private final Iterable<WindowedValue<T>> committedElements;
- private final Instant synchronizedCompletionTime;
-
- @Override
- public StructuralKey<?> getKey() {
- return key;
- }
-
- @Override
- public Iterable<WindowedValue<T>> getElements() {
- return committedElements;
- }
-
- @Override
- public PCollection<T> getPCollection() {
- return pcollection;
- }
-
- @Override
- public Instant getSynchronizedProcessingOutputWatermark() {
- return synchronizedCompletionTime;
- }
-
- @Override
- public String toString() {
- return MoreObjects.toStringHelper(this)
- .omitNullValues()
- .add("pcollection", pcollection)
- .add("key", key)
- .add("elements", committedElements)
- .toString();
- }
-
- @Override
- public CommittedBundle<T> withElements(Iterable<WindowedValue<T>> elements) {
- return new CommittedInProcessBundle<>(
- pcollection, key, ImmutableList.copyOf(elements), synchronizedCompletionTime);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/babddbbc/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessBundleOutputManager.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessBundleOutputManager.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessBundleOutputManager.java
deleted file mode 100644
index bd07040..0000000
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessBundleOutputManager.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.direct;
-
-import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
-import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
-import org.apache.beam.sdk.util.DoFnRunners.OutputManager;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.TupleTag;
-
-import java.util.Map;
-
-/**
- * An {@link OutputManager} that outputs to {@link CommittedBundle Bundles} used by the
- * {@link DirectRunner}.
- */
-public class InProcessBundleOutputManager implements OutputManager {
- private final Map<TupleTag<?>, UncommittedBundle<?>> bundles;
-
- public static InProcessBundleOutputManager create(
- Map<TupleTag<?>, UncommittedBundle<?>> outputBundles) {
- return new InProcessBundleOutputManager(outputBundles);
- }
-
- public InProcessBundleOutputManager(Map<TupleTag<?>, UncommittedBundle<?>> bundles) {
- this.bundles = bundles;
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public <T> void output(TupleTag<T> tag, WindowedValue<T> output) {
- @SuppressWarnings("rawtypes")
- UncommittedBundle bundle = bundles.get(tag);
- bundle.add(output);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/babddbbc/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessEvaluationContext.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessEvaluationContext.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessEvaluationContext.java
deleted file mode 100644
index 220ff83..0000000
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessEvaluationContext.java
+++ /dev/null
@@ -1,429 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.direct;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import org.apache.beam.runners.direct.InMemoryWatermarkManager.FiredTimers;
-import org.apache.beam.runners.direct.InMemoryWatermarkManager.TransformWatermarks;
-import org.apache.beam.runners.direct.InProcessGroupByKey.InProcessGroupByKeyOnly;
-import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
-import org.apache.beam.runners.direct.DirectRunner.PCollectionViewWriter;
-import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.transforms.AppliedPTransform;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.Trigger;
-import org.apache.beam.sdk.util.ExecutionContext;
-import org.apache.beam.sdk.util.ReadyCheckingSideInputReader;
-import org.apache.beam.sdk.util.SideInputReader;
-import org.apache.beam.sdk.util.TimerInternals.TimerData;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.util.WindowingStrategy;
-import org.apache.beam.sdk.util.common.CounterSet;
-import org.apache.beam.sdk.util.state.CopyOnAccessInMemoryStateInternals;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollection.IsBounded;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.PValue;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterables;
-import com.google.common.util.concurrent.MoreExecutors;
-
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
-import javax.annotation.Nullable;
-
-/**
- * The evaluation context for a specific pipeline being executed by the
- * {@link DirectRunner}. Contains state shared within the execution across all
- * transforms.
- *
- * <p>{@link InProcessEvaluationContext} contains shared state for an execution of the
- * {@link DirectRunner} that can be used while evaluating a {@link PTransform}. This
- * consists of views into underlying state and watermark implementations, access to read and write
- * {@link PCollectionView PCollectionViews}, and constructing {@link CounterSet CounterSets} and
- * {@link ExecutionContext ExecutionContexts}. This includes executing callbacks asynchronously when
- * state changes to the appropriate point (e.g. when a {@link PCollectionView} is requested and
- * known to be empty).
- *
- * <p>{@link InProcessEvaluationContext} also handles results by committing finalizing bundles based
- * on the current global state and updating the global state appropriately. This includes updating
- * the per-{@link StepAndKey} state, updating global watermarks, and executing any callbacks that
- * can be executed.
- */
-class InProcessEvaluationContext {
- /** The step name for each {@link AppliedPTransform} in the {@link Pipeline}. */
- private final Map<AppliedPTransform<?, ?, ?>, String> stepNames;
-
- /** The options that were used to create this {@link Pipeline}. */
- private final DirectOptions options;
-
- private final BundleFactory bundleFactory;
- /** The current processing time and event time watermarks and timers. */
- private final InMemoryWatermarkManager watermarkManager;
-
- /** Executes callbacks based on the progression of the watermark. */
- private final WatermarkCallbackExecutor callbackExecutor;
-
- /** The stateInternals of the world, by applied PTransform and key. */
- private final ConcurrentMap<StepAndKey, CopyOnAccessInMemoryStateInternals<?>>
- applicationStateInternals;
-
- private final InProcessSideInputContainer sideInputContainer;
-
- private final CounterSet mergedCounters;
-
- public static InProcessEvaluationContext create(
- DirectOptions options,
- BundleFactory bundleFactory,
- Collection<AppliedPTransform<?, ?, ?>> rootTransforms,
- Map<PValue, Collection<AppliedPTransform<?, ?, ?>>> valueToConsumers,
- Map<AppliedPTransform<?, ?, ?>, String> stepNames,
- Collection<PCollectionView<?>> views) {
- return new InProcessEvaluationContext(
- options, bundleFactory, rootTransforms, valueToConsumers, stepNames, views);
- }
-
- private InProcessEvaluationContext(
- DirectOptions options,
- BundleFactory bundleFactory,
- Collection<AppliedPTransform<?, ?, ?>> rootTransforms,
- Map<PValue, Collection<AppliedPTransform<?, ?, ?>>> valueToConsumers,
- Map<AppliedPTransform<?, ?, ?>, String> stepNames,
- Collection<PCollectionView<?>> views) {
- this.options = checkNotNull(options);
- this.bundleFactory = checkNotNull(bundleFactory);
- checkNotNull(rootTransforms);
- checkNotNull(valueToConsumers);
- checkNotNull(stepNames);
- checkNotNull(views);
- this.stepNames = stepNames;
-
- this.watermarkManager =
- InMemoryWatermarkManager.create(
- NanosOffsetClock.create(), rootTransforms, valueToConsumers);
- this.sideInputContainer = InProcessSideInputContainer.create(this, views);
-
- this.applicationStateInternals = new ConcurrentHashMap<>();
- this.mergedCounters = new CounterSet();
-
- this.callbackExecutor =
- WatermarkCallbackExecutor.create(MoreExecutors.directExecutor());
- }
-
- /**
- * Handle the provided {@link InProcessTransformResult}, produced after evaluating the provided
- * {@link CommittedBundle} (potentially null, if the result of a root {@link PTransform}).
- *
- * <p>The result is the output of running the transform contained in the
- * {@link InProcessTransformResult} on the contents of the provided bundle.
- *
- * @param completedBundle the bundle that was processed to produce the result. Potentially
- * {@code null} if the transform that produced the result is a root
- * transform
- * @param completedTimers the timers that were delivered to produce the {@code completedBundle},
- * or an empty iterable if no timers were delivered
- * @param result the result of evaluating the input bundle
- * @return the committed bundles contained within the handled {@code result}
- */
- public CommittedResult handleResult(
- @Nullable CommittedBundle<?> completedBundle,
- Iterable<TimerData> completedTimers,
- InProcessTransformResult result) {
- Iterable<? extends CommittedBundle<?>> committedBundles =
- commitBundles(result.getOutputBundles());
- // Update watermarks and timers
- CommittedResult committedResult = CommittedResult.create(result,
- completedBundle == null
- ? null
- : completedBundle.withElements((Iterable) result.getUnprocessedElements()),
- committedBundles);
- watermarkManager.updateWatermarks(
- completedBundle,
- result.getTimerUpdate().withCompletedTimers(completedTimers),
- committedResult,
- result.getWatermarkHold());
- // Update counters
- if (result.getCounters() != null) {
- mergedCounters.merge(result.getCounters());
- }
- // Update state internals
- CopyOnAccessInMemoryStateInternals<?> theirState = result.getState();
- if (theirState != null) {
- CopyOnAccessInMemoryStateInternals<?> committedState = theirState.commit();
- StepAndKey stepAndKey =
- StepAndKey.of(
- result.getTransform(), completedBundle == null ? null : completedBundle.getKey());
- if (!committedState.isEmpty()) {
- applicationStateInternals.put(stepAndKey, committedState);
- } else {
- applicationStateInternals.remove(stepAndKey);
- }
- }
- return committedResult;
- }
-
- private Iterable<? extends CommittedBundle<?>> commitBundles(
- Iterable<? extends UncommittedBundle<?>> bundles) {
- ImmutableList.Builder<CommittedBundle<?>> completed = ImmutableList.builder();
- for (UncommittedBundle<?> inProgress : bundles) {
- AppliedPTransform<?, ?, ?> producing =
- inProgress.getPCollection().getProducingTransformInternal();
- TransformWatermarks watermarks = watermarkManager.getWatermarks(producing);
- CommittedBundle<?> committed =
- inProgress.commit(watermarks.getSynchronizedProcessingOutputTime());
- // Empty bundles don't impact watermarks and shouldn't trigger downstream execution, so
- // filter them out
- if (!Iterables.isEmpty(committed.getElements())) {
- completed.add(committed);
- }
- }
- return completed.build();
- }
-
- private void fireAllAvailableCallbacks() {
- for (AppliedPTransform<?, ?, ?> transform : stepNames.keySet()) {
- fireAvailableCallbacks(transform);
- }
- }
-
- private void fireAvailableCallbacks(AppliedPTransform<?, ?, ?> producingTransform) {
- TransformWatermarks watermarks = watermarkManager.getWatermarks(producingTransform);
- callbackExecutor.fireForWatermark(producingTransform, watermarks.getOutputWatermark());
- }
-
- /**
- * Create a {@link UncommittedBundle} for use by a source.
- */
- public <T> UncommittedBundle<T> createRootBundle(PCollection<T> output) {
- return bundleFactory.createRootBundle(output);
- }
-
- /**
- * Create a {@link UncommittedBundle} whose elements belong to the specified {@link
- * PCollection}.
- */
- public <T> UncommittedBundle<T> createBundle(CommittedBundle<?> input, PCollection<T> output) {
- return bundleFactory.createBundle(input, output);
- }
-
- /**
- * Create a {@link UncommittedBundle} with the specified keys at the specified step. For use by
- * {@link InProcessGroupByKeyOnly} {@link PTransform PTransforms}.
- */
- public <K, T> UncommittedBundle<T> createKeyedBundle(
- CommittedBundle<?> input, StructuralKey<K> key, PCollection<T> output) {
- return bundleFactory.createKeyedBundle(input, key, output);
- }
-
- /**
- * Create a {@link PCollectionViewWriter}, whose elements will be used in the provided
- * {@link PCollectionView}.
- */
- public <ElemT, ViewT> PCollectionViewWriter<ElemT, ViewT> createPCollectionViewWriter(
- PCollection<Iterable<ElemT>> input, final PCollectionView<ViewT> output) {
- return new PCollectionViewWriter<ElemT, ViewT>() {
- @Override
- public void add(Iterable<WindowedValue<ElemT>> values) {
- sideInputContainer.write(output, values);
- }
- };
- }
-
- /**
- * Schedule a callback to be executed after output would be produced for the given window
- * if there had been input.
- *
- * <p>Output would be produced when the watermark for a {@link PValue} passes the point at
- * which the trigger for the specified window (with the specified windowing strategy) must have
- * fired from the perspective of that {@link PValue}, as specified by the value of
- * {@link Trigger#getWatermarkThatGuaranteesFiring(BoundedWindow)} for the trigger of the
- * {@link WindowingStrategy}. When the callback has fired, either values will have been produced
- * for a key in that window, the window is empty, or all elements in the window are late. The
- * callback will be executed regardless of whether values have been produced.
- */
- public void scheduleAfterOutputWouldBeProduced(
- PValue value,
- BoundedWindow window,
- WindowingStrategy<?, ?> windowingStrategy,
- Runnable runnable) {
- AppliedPTransform<?, ?, ?> producing = getProducing(value);
- callbackExecutor.callOnGuaranteedFiring(producing, window, windowingStrategy, runnable);
-
- fireAvailableCallbacks(lookupProducing(value));
- }
-
- private AppliedPTransform<?, ?, ?> getProducing(PValue value) {
- if (value.getProducingTransformInternal() != null) {
- return value.getProducingTransformInternal();
- }
- return lookupProducing(value);
- }
-
- private AppliedPTransform<?, ?, ?> lookupProducing(PValue value) {
- for (AppliedPTransform<?, ?, ?> transform : stepNames.keySet()) {
- if (transform.getOutput().equals(value) || transform.getOutput().expand().contains(value)) {
- return transform;
- }
- }
- return null;
- }
-
- /**
- * Get the options used by this {@link Pipeline}.
- */
- public DirectOptions getPipelineOptions() {
- return options;
- }
-
- /**
- * Get an {@link ExecutionContext} for the provided {@link AppliedPTransform} and key.
- */
- public InProcessExecutionContext getExecutionContext(
- AppliedPTransform<?, ?, ?> application, StructuralKey<?> key) {
- StepAndKey stepAndKey = StepAndKey.of(application, key);
- return new InProcessExecutionContext(
- options.getClock(),
- key,
- (CopyOnAccessInMemoryStateInternals<Object>) applicationStateInternals.get(stepAndKey),
- watermarkManager.getWatermarks(application));
- }
-
- /**
- * Get all of the steps used in this {@link Pipeline}.
- */
- public Collection<AppliedPTransform<?, ?, ?>> getSteps() {
- return stepNames.keySet();
- }
-
- /**
- * Get the Step Name for the provided application.
- */
- public String getStepName(AppliedPTransform<?, ?, ?> application) {
- return stepNames.get(application);
- }
-
- /**
- * Returns a {@link ReadyCheckingSideInputReader} capable of reading the provided
- * {@link PCollectionView PCollectionViews}.
- *
- * @param sideInputs the {@link PCollectionView PCollectionViews} the result should be able to
- * read
- * @return a {@link SideInputReader} that can read all of the provided {@link PCollectionView
- * PCollectionViews}
- */
- public ReadyCheckingSideInputReader createSideInputReader(
- final List<PCollectionView<?>> sideInputs) {
- return sideInputContainer.createReaderForViews(sideInputs);
- }
-
-
- /**
- * Create a {@link CounterSet} for this {@link Pipeline}. The {@link CounterSet} is independent
- * of all other {@link CounterSet CounterSets} created by this call.
- *
- * The {@link InProcessEvaluationContext} is responsible for unifying the counters present in
- * all created {@link CounterSet CounterSets} when the transforms that call this method
- * complete.
- */
- public CounterSet createCounterSet() {
- return new CounterSet();
- }
-
- /**
- * Returns all of the counters that have been merged into this context via calls to
- * {@link CounterSet#merge(CounterSet)}.
- */
- public CounterSet getCounters() {
- return mergedCounters;
- }
-
- @VisibleForTesting
- void forceRefresh() {
- watermarkManager.refreshAll();
- fireAllAvailableCallbacks();
- }
-
- /**
- * Extracts all timers that have been fired and have not already been extracted.
- *
- * <p>This is a destructive operation. Timers will only appear in the result of this method once
- * for each time they are set.
- */
- public Map<AppliedPTransform<?, ?, ?>, Map<StructuralKey<?>, FiredTimers>> extractFiredTimers() {
- forceRefresh();
- Map<AppliedPTransform<?, ?, ?>, Map<StructuralKey<?>, FiredTimers>> fired =
- watermarkManager.extractFiredTimers();
- return fired;
- }
-
- /**
- * Returns true if the step will not produce additional output.
- *
- * <p>If the provided transform produces only {@link IsBounded#BOUNDED}
- * {@link PCollection PCollections}, returns true if the watermark is at
- * {@link BoundedWindow#TIMESTAMP_MAX_VALUE positive infinity}.
- *
- * <p>If the provided transform produces any {@link IsBounded#UNBOUNDED}
- * {@link PCollection PCollections}, returns the value of
- * {@link DirectOptions#isShutdownUnboundedProducersWithMaxWatermark()}.
- */
- public boolean isDone(AppliedPTransform<?, ?, ?> transform) {
- // if the PTransform's watermark isn't at the max value, it isn't done
- if (watermarkManager
- .getWatermarks(transform)
- .getOutputWatermark()
- .isBefore(BoundedWindow.TIMESTAMP_MAX_VALUE)) {
- return false;
- }
- // If the PTransform has any unbounded outputs, and unbounded producers should not be shut down,
- // the PTransform may produce additional output. It is not done.
- for (PValue output : transform.getOutput().expand()) {
- if (output instanceof PCollection) {
- IsBounded bounded = ((PCollection<?>) output).isBounded();
- if (bounded.equals(IsBounded.UNBOUNDED)
- && !options.isShutdownUnboundedProducersWithMaxWatermark()) {
- return false;
- }
- }
- }
- // The PTransform's watermark was at positive infinity and all of its outputs are known to be
- // done. It is done.
- return true;
- }
-
- /**
- * Returns true if all steps are done.
- */
- public boolean isDone() {
- for (AppliedPTransform<?, ?, ?> transform : stepNames.keySet()) {
- if (!isDone(transform)) {
- return false;
- }
- }
- return true;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/babddbbc/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessExecutionContext.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessExecutionContext.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessExecutionContext.java
deleted file mode 100644
index d2558ce..0000000
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessExecutionContext.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.direct;
-
-import org.apache.beam.runners.direct.InMemoryWatermarkManager.TimerUpdate;
-import org.apache.beam.runners.direct.InMemoryWatermarkManager.TransformWatermarks;
-import org.apache.beam.sdk.util.BaseExecutionContext;
-import org.apache.beam.sdk.util.ExecutionContext;
-import org.apache.beam.sdk.util.TimerInternals;
-import org.apache.beam.sdk.util.state.CopyOnAccessInMemoryStateInternals;
-
-/**
- * Execution Context for the {@link DirectRunner}.
- *
- * This implementation is not thread safe. A new {@link InProcessExecutionContext} must be created
- * for each thread that requires it.
- */
-class InProcessExecutionContext
- extends BaseExecutionContext<InProcessExecutionContext.InProcessStepContext> {
- private final Clock clock;
- private final StructuralKey<?> key;
- private final CopyOnAccessInMemoryStateInternals<Object> existingState;
- private final TransformWatermarks watermarks;
-
- public InProcessExecutionContext(Clock clock, StructuralKey<?> key,
- CopyOnAccessInMemoryStateInternals<Object> existingState, TransformWatermarks watermarks) {
- this.clock = clock;
- this.key = key;
- this.existingState = existingState;
- this.watermarks = watermarks;
- }
-
- @Override
- protected InProcessStepContext createStepContext(String stepName, String transformName) {
- return new InProcessStepContext(this, stepName, transformName);
- }
-
- /**
- * Step Context for the {@link DirectRunner}.
- */
- public class InProcessStepContext
- extends org.apache.beam.sdk.util.BaseExecutionContext.StepContext {
- private CopyOnAccessInMemoryStateInternals<Object> stateInternals;
- private InProcessTimerInternals timerInternals;
-
- public InProcessStepContext(
- ExecutionContext executionContext, String stepName, String transformName) {
- super(executionContext, stepName, transformName);
- }
-
- @Override
- public CopyOnAccessInMemoryStateInternals<Object> stateInternals() {
- if (stateInternals == null) {
- stateInternals = CopyOnAccessInMemoryStateInternals.withUnderlying(key, existingState);
- }
- return stateInternals;
- }
-
- @Override
- public InProcessTimerInternals timerInternals() {
- if (timerInternals == null) {
- timerInternals =
- InProcessTimerInternals.create(clock, watermarks, TimerUpdate.builder(key));
- }
- return timerInternals;
- }
-
- /**
- * Commits the state of this step, and returns the committed state. If the step has not
- * accessed any state, return null.
- */
- public CopyOnAccessInMemoryStateInternals<?> commitState() {
- if (stateInternals != null) {
- return stateInternals.commit();
- }
- return null;
- }
-
- /**
- * Gets the timer update of the {@link TimerInternals} of this {@link InProcessStepContext},
- * which is empty if the {@link TimerInternals} were never accessed.
- */
- public TimerUpdate getTimerUpdate() {
- if (timerInternals == null) {
- return TimerUpdate.empty();
- }
- return timerInternals.getTimerUpdate();
- }
- }
-}