You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/04/29 23:56:12 UTC
[14/17] incubator-beam git commit: Move InProcessRunner to its own
module
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessBundleOutputManager.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessBundleOutputManager.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessBundleOutputManager.java
new file mode 100644
index 0000000..f374f99
--- /dev/null
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessBundleOutputManager.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
+import org.apache.beam.runners.direct.InProcessPipelineRunner.UncommittedBundle;
+import org.apache.beam.sdk.util.DoFnRunners.OutputManager;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.TupleTag;
+
+import java.util.Map;
+
+/**
+ * An {@link OutputManager} that outputs to {@link CommittedBundle Bundles} used by the
+ * {@link InProcessPipelineRunner}.
+ *
+ * <p>Each output is routed to the {@link UncommittedBundle} registered for its {@link TupleTag}
+ * in the map supplied at construction time.
+ */
+public class InProcessBundleOutputManager implements OutputManager {
+  /** The bundle that receives the elements output to each tag. */
+  private final Map<TupleTag<?>, UncommittedBundle<?>> bundles;
+
+  /**
+   * Creates an {@link InProcessBundleOutputManager} that writes to the provided bundles.
+   *
+   * @param outputBundles the bundle to add elements to for each {@link TupleTag}; the map is
+   *     retained by reference, not copied
+   */
+  public static InProcessBundleOutputManager create(
+      Map<TupleTag<?>, UncommittedBundle<?>> outputBundles) {
+    return new InProcessBundleOutputManager(outputBundles);
+  }
+
+  /**
+   * Creates an {@link InProcessBundleOutputManager} that writes to the provided bundles.
+   *
+   * @param bundles the bundle to add elements to for each {@link TupleTag}; the map is retained
+   *     by reference, not copied
+   */
+  public InProcessBundleOutputManager(Map<TupleTag<?>, UncommittedBundle<?>> bundles) {
+    this.bundles = bundles;
+  }
+
+  /**
+   * Adds {@code output} to the bundle registered for {@code tag}.
+   *
+   * @param tag the tag identifying the output the element belongs to
+   * @param output the element to add to the bundle for {@code tag}
+   * @throws IllegalArgumentException if no bundle is registered for {@code tag}
+   */
+  @SuppressWarnings("unchecked")
+  @Override
+  public <T> void output(TupleTag<T> tag, WindowedValue<T> output) {
+    // The map is keyed and valued with wildcards, so the bundle can only be used as a raw type.
+    // This assumes whoever built the map paired each tag with a bundle of the matching type.
+    @SuppressWarnings("rawtypes")
+    UncommittedBundle bundle = bundles.get(tag);
+    if (bundle == null) {
+      // Fail with the offending tag rather than an anonymous NullPointerException below.
+      throw new IllegalArgumentException("Unknown output tag " + tag);
+    }
+    bundle.add(output);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessEvaluationContext.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessEvaluationContext.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessEvaluationContext.java
new file mode 100644
index 0000000..d9a7ff0
--- /dev/null
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessEvaluationContext.java
@@ -0,0 +1,425 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import org.apache.beam.runners.direct.GroupByKeyEvaluatorFactory.InProcessGroupByKeyOnly;
+import org.apache.beam.runners.direct.InMemoryWatermarkManager.FiredTimers;
+import org.apache.beam.runners.direct.InMemoryWatermarkManager.TransformWatermarks;
+import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
+import org.apache.beam.runners.direct.InProcessPipelineRunner.PCollectionViewWriter;
+import org.apache.beam.runners.direct.InProcessPipelineRunner.UncommittedBundle;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.transforms.AppliedPTransform;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.Trigger;
+import org.apache.beam.sdk.util.ExecutionContext;
+import org.apache.beam.sdk.util.SideInputReader;
+import org.apache.beam.sdk.util.TimerInternals.TimerData;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.util.common.CounterSet;
+import org.apache.beam.sdk.util.state.CopyOnAccessInMemoryStateInternals;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollection.IsBounded;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.PValue;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import javax.annotation.Nullable;
+
+/**
+ * The evaluation context for a specific pipeline being executed by the
+ * {@link InProcessPipelineRunner}. Contains state shared within the execution across all
+ * transforms.
+ *
+ * <p>{@link InProcessEvaluationContext} contains shared state for an execution of the
+ * {@link InProcessPipelineRunner} that can be used while evaluating a {@link PTransform}. This
+ * consists of views into underlying state and watermark implementations, access to read and write
+ * {@link PCollectionView PCollectionViews}, and constructing {@link CounterSet CounterSets} and
+ * {@link ExecutionContext ExecutionContexts}. This includes executing callbacks asynchronously when
+ * state changes to the appropriate point (e.g. when a {@link PCollectionView} is requested and
+ * known to be empty).
+ *
+ * <p>{@link InProcessEvaluationContext} also handles results by committing and finalizing bundles
+ * based on the current global state and updating the global state appropriately. This includes
+ * updating the per-{@link StepAndKey} state, updating global watermarks, and executing any
+ * callbacks that can be executed.
+ */
+class InProcessEvaluationContext {
+ /** The step name for each {@link AppliedPTransform} in the {@link Pipeline}. */
+ private final Map<AppliedPTransform<?, ?, ?>, String> stepNames;
+
+ /** The options that were used to create this {@link Pipeline}. */
+ private final InProcessPipelineOptions options;
+
+ /** Creates the root, intermediate and keyed bundles handed out by this context. */
+ private final BundleFactory bundleFactory;
+ /** The current processing time and event time watermarks and timers. */
+ private final InMemoryWatermarkManager watermarkManager;
+
+ /** Executes callbacks based on the progression of the watermark. */
+ private final WatermarkCallbackExecutor callbackExecutor;
+
+ /** The stateInternals of the world, by applied PTransform and key. */
+ private final ConcurrentMap<StepAndKey, CopyOnAccessInMemoryStateInternals<?>>
+ applicationStateInternals;
+
+ /**
+ * Backs both the {@link PCollectionViewWriter PCollectionViewWriters} and the side input readers
+ * created by this context.
+ */
+ private final InProcessSideInputContainer sideInputContainer;
+
+ /** Accumulates the counters of every result that has been handled by this context. */
+ private final CounterSet mergedCounters;
+
+  /**
+   * Creates an {@link InProcessEvaluationContext} for a single pipeline execution.
+   *
+   * <p>All arguments are forwarded unchanged to the private constructor.
+   *
+   * @param options the options the pipeline was created with
+   * @param bundleFactory the factory used to create all bundles for this execution
+   * @param rootTransforms the root transforms of the pipeline
+   * @param valueToConsumers the transforms that consume each {@link PValue}
+   * @param stepNames the step name of each {@link AppliedPTransform} in the pipeline
+   * @param views the {@link PCollectionView PCollectionViews} used in the pipeline
+   */
+  public static InProcessEvaluationContext create(
+      InProcessPipelineOptions options,
+      BundleFactory bundleFactory,
+      Collection<AppliedPTransform<?, ?, ?>> rootTransforms,
+      Map<PValue, Collection<AppliedPTransform<?, ?, ?>>> valueToConsumers,
+      Map<AppliedPTransform<?, ?, ?>, String> stepNames,
+      Collection<PCollectionView<?>> views) {
+    InProcessEvaluationContext context =
+        new InProcessEvaluationContext(
+            options, bundleFactory, rootTransforms, valueToConsumers, stepNames, views);
+    return context;
+  }
+
+  /**
+   * Creates the context and the shared watermark, side input, state and callback machinery.
+   *
+   * @param options the options the pipeline was created with; also supplies the clock
+   * @param bundleFactory the factory used to create all bundles for this execution
+   * @param rootTransforms the root transforms of the pipeline
+   * @param valueToConsumers the transforms that consume each {@link PValue}
+   * @param stepNames the step name of each {@link AppliedPTransform} in the pipeline
+   * @param views the {@link PCollectionView PCollectionViews} used in the pipeline
+   * @throws NullPointerException if any argument is {@code null}
+   */
+  private InProcessEvaluationContext(
+      InProcessPipelineOptions options,
+      BundleFactory bundleFactory,
+      Collection<AppliedPTransform<?, ?, ?>> rootTransforms,
+      Map<PValue, Collection<AppliedPTransform<?, ?, ?>>> valueToConsumers,
+      Map<AppliedPTransform<?, ?, ?>, String> stepNames,
+      Collection<PCollectionView<?>> views) {
+    this.options = checkNotNull(options);
+    this.bundleFactory = checkNotNull(bundleFactory);
+    checkNotNull(rootTransforms);
+    checkNotNull(valueToConsumers);
+    this.stepNames = checkNotNull(stepNames);
+    checkNotNull(views);
+
+    // Use the clock configured in the options, so that the watermark manager observes the same
+    // processing time as the execution contexts handed out by getExecutionContext.
+    this.watermarkManager =
+        InMemoryWatermarkManager.create(options.getClock(), rootTransforms, valueToConsumers);
+
+    this.applicationStateInternals = new ConcurrentHashMap<>();
+    this.mergedCounters = new CounterSet();
+    this.callbackExecutor = WatermarkCallbackExecutor.create();
+
+    // Created last: the container is handed a reference to this context, so every other field
+    // should already be assigned in case the container calls back into it.
+    this.sideInputContainer = InProcessSideInputContainer.create(this, views);
+  }
+
+  /**
+   * Commits the outputs of an {@link InProcessTransformResult} and folds the result into the
+   * shared state of this context.
+   *
+   * <p>In order, this commits the output bundles, updates watermarks and timers, fires any
+   * callbacks that became available, merges the result's counters, and replaces (or clears) the
+   * stored state for the step and key that produced the result.
+   *
+   * @param completedBundle the bundle that was processed to produce the result. Potentially
+   *                        {@code null} if the transform that produced the result is a root
+   *                        transform
+   * @param completedTimers the timers that were delivered to produce the {@code completedBundle},
+   *                        or an empty iterable if no timers were delivered
+   * @param result the result of evaluating the input bundle
+   * @return the committed bundles contained within the handled {@code result}
+   */
+  public synchronized CommittedResult handleResult(
+      @Nullable CommittedBundle<?> completedBundle,
+      Iterable<TimerData> completedTimers,
+      InProcessTransformResult result) {
+    // Outputs are committed first; the watermark update below consumes the committed bundles.
+    Iterable<? extends CommittedBundle<?>> outputs = commitBundles(result.getOutputBundles());
+
+    watermarkManager.updateWatermarks(
+        completedBundle,
+        result.getTransform(),
+        result.getTimerUpdate().withCompletedTimers(completedTimers),
+        outputs,
+        result.getWatermarkHold());
+    fireAllAvailableCallbacks();
+
+    if (result.getCounters() != null) {
+      mergedCounters.merge(result.getCounters());
+    }
+
+    CopyOnAccessInMemoryStateInternals<?> resultState = result.getState();
+    if (resultState != null) {
+      CopyOnAccessInMemoryStateInternals<?> committed = resultState.commit();
+      // Root transforms have no input bundle, and therefore no key.
+      StepAndKey stepAndKey =
+          StepAndKey.of(
+              result.getTransform(), completedBundle == null ? null : completedBundle.getKey());
+      if (committed.isEmpty()) {
+        // Nothing left to remember for this step and key.
+        applicationStateInternals.remove(stepAndKey);
+      } else {
+        applicationStateInternals.put(stepAndKey, committed);
+      }
+    }
+    return CommittedResult.create(result, outputs);
+  }
+
+  /**
+   * Commits each of the provided bundles at the synchronized processing output time of the
+   * transform that produces its {@link PCollection}, and returns the non-empty results.
+   */
+  private Iterable<? extends CommittedBundle<?>> commitBundles(
+      Iterable<? extends UncommittedBundle<?>> bundles) {
+    ImmutableList.Builder<CommittedBundle<?>> nonEmpty = ImmutableList.builder();
+    for (UncommittedBundle<?> uncommitted : bundles) {
+      TransformWatermarks producerWatermarks =
+          watermarkManager.getWatermarks(
+              uncommitted.getPCollection().getProducingTransformInternal());
+      CommittedBundle<?> committed =
+          uncommitted.commit(producerWatermarks.getSynchronizedProcessingOutputTime());
+      if (Iterables.isEmpty(committed.getElements())) {
+        // Empty bundles don't impact watermarks and shouldn't trigger downstream execution.
+        continue;
+      }
+      nonEmpty.add(committed);
+    }
+    return nonEmpty.build();
+  }
+
+  /** Fires the available callbacks of every step known to this context. */
+  private void fireAllAvailableCallbacks() {
+    for (AppliedPTransform<?, ?, ?> step : stepNames.keySet()) {
+      fireAvailableCallbacks(step);
+    }
+  }
+
+  /** Hands the current output watermark of the transform to the callback executor. */
+  private void fireAvailableCallbacks(AppliedPTransform<?, ?, ?> producingTransform) {
+    callbackExecutor.fireForWatermark(
+        producingTransform,
+        watermarkManager.getWatermarks(producingTransform).getOutputWatermark());
+  }
+
+  /**
+   * Creates an {@link UncommittedBundle} for use by a source. Delegates to the
+   * {@link BundleFactory} this context was created with.
+   *
+   * @param output the {@link PCollection} the elements of the bundle belong to
+   */
+  public <T> UncommittedBundle<T> createRootBundle(PCollection<T> output) {
+    UncommittedBundle<T> rootBundle = bundleFactory.createRootBundle(output);
+    return rootBundle;
+  }
+
+  /**
+   * Creates an {@link UncommittedBundle} whose elements belong to the specified
+   * {@link PCollection}. Delegates to the {@link BundleFactory} this context was created with.
+   *
+   * @param input the bundle that is being processed to produce the new bundle
+   * @param output the {@link PCollection} the elements of the new bundle belong to
+   */
+  public <T> UncommittedBundle<T> createBundle(CommittedBundle<?> input, PCollection<T> output) {
+    UncommittedBundle<T> outputBundle = bundleFactory.createBundle(input, output);
+    return outputBundle;
+  }
+
+  /**
+   * Creates an {@link UncommittedBundle} with the specified key. For use by
+   * {@link InProcessGroupByKeyOnly} {@link PTransform PTransforms}. Delegates to the
+   * {@link BundleFactory} this context was created with.
+   *
+   * @param input the bundle that is being processed to produce the new bundle
+   * @param key the key shared by the elements of the new bundle
+   * @param output the {@link PCollection} the elements of the new bundle belong to
+   */
+  public <T> UncommittedBundle<T> createKeyedBundle(
+      CommittedBundle<?> input, Object key, PCollection<T> output) {
+    UncommittedBundle<T> keyedBundle = bundleFactory.createKeyedBundle(input, key, output);
+    return keyedBundle;
+  }
+
+  /**
+   * Creates a {@link PCollectionViewWriter} whose elements will be used in the provided
+   * {@link PCollectionView}.
+   *
+   * @param input the {@link PCollection} the view is created from; not consulted by the writer
+   * @param output the view that the written values become the contents of
+   */
+  public <ElemT, ViewT> PCollectionViewWriter<ElemT, ViewT> createPCollectionViewWriter(
+      PCollection<Iterable<ElemT>> input, final PCollectionView<ViewT> output) {
+    PCollectionViewWriter<ElemT, ViewT> writer =
+        new PCollectionViewWriter<ElemT, ViewT>() {
+          @Override
+          public void add(Iterable<WindowedValue<ElemT>> contents) {
+            // Every write goes straight to the shared side input container.
+            sideInputContainer.write(output, contents);
+          }
+        };
+    return writer;
+  }
+
+  /**
+   * Schedule a callback to be executed after output would be produced for the given window
+   * if there had been input.
+   *
+   * <p>Output would be produced when the watermark for a {@link PValue} passes the point at
+   * which the trigger for the specified window (with the specified windowing strategy) must have
+   * fired from the perspective of that {@link PValue}, as specified by the value of
+   * {@link Trigger#getWatermarkThatGuaranteesFiring(BoundedWindow)} for the trigger of the
+   * {@link WindowingStrategy}. When the callback has fired, either values will have been produced
+   * for a key in that window, the window is empty, or all elements in the window are late. The
+   * callback will be executed regardless of whether values have been produced.
+   *
+   * <p>After registering the callback, any callbacks that are already available for the producing
+   * transform are fired immediately.
+   *
+   * @param value the {@link PValue} whose producing transform's watermark gates the callback
+   * @param window the window for which output would be produced
+   * @param windowingStrategy the windowing strategy that {@code window} was assigned under
+   * @param runnable the callback to execute
+   */
+  public void scheduleAfterOutputWouldBeProduced(
+      PValue value,
+      BoundedWindow window,
+      WindowingStrategy<?, ?> windowingStrategy,
+      Runnable runnable) {
+    AppliedPTransform<?, ?, ?> producing = getProducing(value);
+    callbackExecutor.callOnGuaranteedFiring(producing, window, windowingStrategy, runnable);
+
+    // Fire for the transform the callback was just registered against. Repeating the lookup by
+    // a different route could yield another transform, or none at all, and rescans every step.
+    fireAvailableCallbacks(producing);
+  }
+
+  /**
+   * Returns the transform recorded on the value as its producer, falling back to a search of the
+   * known steps when the value has none recorded.
+   */
+  private AppliedPTransform<?, ?, ?> getProducing(PValue value) {
+    AppliedPTransform<?, ?, ?> recorded = value.getProducingTransformInternal();
+    return recorded != null ? recorded : lookupProducing(value);
+  }
+
+  /**
+   * Searches the known steps for one whose output is, or expands to contain, the provided value.
+   *
+   * @return the first matching step in iteration order, or {@code null} if no step matches
+   */
+  private AppliedPTransform<?, ?, ?> lookupProducing(PValue value) {
+    for (AppliedPTransform<?, ?, ?> candidate : stepNames.keySet()) {
+      if (candidate.getOutput().equals(value)) {
+        return candidate;
+      }
+      if (candidate.getOutput().expand().contains(value)) {
+        return candidate;
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Returns the {@link InProcessPipelineOptions} that this context was created with.
+   */
+  public InProcessPipelineOptions getPipelineOptions() {
+    return this.options;
+  }
+
+  /**
+   * Get an {@link ExecutionContext} for the provided {@link AppliedPTransform} and key.
+   *
+   * <p>The returned context is backed by the state most recently stored for the step and key, if
+   * any, and by the watermarks of the provided transform. A new context is created on each call.
+   *
+   * @param application the transform that will execute within the returned context
+   * @param key the key of the elements that will be processed within the returned context
+   */
+  public InProcessExecutionContext getExecutionContext(
+      AppliedPTransform<?, ?, ?> application, Object key) {
+    StepAndKey stepAndKey = StepAndKey.of(application, key);
+    // The map stores state with a wildcard key type, so narrowing it to Object is unchecked. The
+    // cast is performed here, in the smallest possible scope, rather than left as a bare warning.
+    @SuppressWarnings("unchecked")
+    CopyOnAccessInMemoryStateInternals<Object> existingState =
+        (CopyOnAccessInMemoryStateInternals<Object>) applicationStateInternals.get(stepAndKey);
+    return new InProcessExecutionContext(
+        options.getClock(), key, existingState, watermarkManager.getWatermarks(application));
+  }
+
+  /**
+   * Returns every step used in this {@link Pipeline}. The result is the key set of the step name
+   * map rather than a copy.
+   */
+  public Collection<AppliedPTransform<?, ?, ?>> getSteps() {
+    Collection<AppliedPTransform<?, ?, ?>> steps = stepNames.keySet();
+    return steps;
+  }
+
+ /**
+ * Get the Step Name for the provided application.
+ */
+ public String getStepName(AppliedPTransform<?, ?, ?> application) {
+ return stepNames.get(application);
+ }
+
+ /**
+ * Returns a {@link ReadyCheckingSideInputReader} capable of reading the provided
+ * {@link PCollectionView PCollectionViews}.
+ *
+ * @param sideInputs the {@link PCollectionView PCollectionViews} the result should be able to
+ * read
+ * @return a {@link SideInputReader} that can read all of the provided {@link PCollectionView
+ * PCollectionViews}
+ */
+ public ReadyCheckingSideInputReader createSideInputReader(
+ final List<PCollectionView<?>> sideInputs) {
+ return sideInputContainer.createReaderForViews(sideInputs);
+ }
+
+  /**
+   * A {@link SideInputReader} that can additionally report whether the contents of a
+   * {@link PCollectionView} have been set in a window.
+   */
+  interface ReadyCheckingSideInputReader extends SideInputReader {
+    /**
+     * Returns true if the {@link PCollectionView} is ready in the provided {@link BoundedWindow}.
+     *
+     * @param view the view to check
+     * @param window the window in which to check the view
+     */
+    boolean isReady(PCollectionView<?> view, BoundedWindow window);
+  }
+
+ /**
+ * Create a {@link CounterSet} for this {@link Pipeline}. The {@link CounterSet} is independent
+ * of all other {@link CounterSet CounterSets} created by this call.
+ *
+ * The {@link InProcessEvaluationContext} is responsible for unifying the counters present in
+ * all created {@link CounterSet CounterSets} when the transforms that call this method
+ * complete.
+ */
+ public CounterSet createCounterSet() {
+ return new CounterSet();
+ }
+
+  /**
+   * Returns the {@link CounterSet} into which the counters of every handled result have been
+   * merged via {@link CounterSet#merge(CounterSet)}. The shared instance is returned, not a copy.
+   */
+  public CounterSet getCounters() {
+    return this.mergedCounters;
+  }
+
+  /**
+   * Extracts, from the watermark manager, all timers that have fired and have not already been
+   * extracted.
+   *
+   * <p>This is a destructive operation. Timers will only appear in the result of this method once
+   * for each time they are set.
+   */
+  public Map<AppliedPTransform<?, ?, ?>, Map<Object, FiredTimers>> extractFiredTimers() {
+    return watermarkManager.extractFiredTimers();
+  }
+
+ /**
+ * Returns true if the step will not produce additional output.
+ *
+ * <p>If the provided transform produces only {@link IsBounded#BOUNDED}
+ * {@link PCollection PCollections}, returns true if the watermark is at
+ * {@link BoundedWindow#TIMESTAMP_MAX_VALUE positive infinity}.
+ *
+ * <p>If the provided transform produces any {@link IsBounded#UNBOUNDED}
+ * {@link PCollection PCollections}, returns the value of
+ * {@link InProcessPipelineOptions#isShutdownUnboundedProducersWithMaxWatermark()}.
+ */
+ public boolean isDone(AppliedPTransform<?, ?, ?> transform) {
+ // if the PTransform's watermark isn't at the max value, it isn't done
+ if (watermarkManager
+ .getWatermarks(transform)
+ .getOutputWatermark()
+ .isBefore(BoundedWindow.TIMESTAMP_MAX_VALUE)) {
+ return false;
+ }
+ // If the PTransform has any unbounded outputs, and unbounded producers should not be shut down,
+ // the PTransform may produce additional output. It is not done.
+ for (PValue output : transform.getOutput().expand()) {
+ if (output instanceof PCollection) {
+ IsBounded bounded = ((PCollection<?>) output).isBounded();
+ if (bounded.equals(IsBounded.UNBOUNDED)
+ && !options.isShutdownUnboundedProducersWithMaxWatermark()) {
+ return false;
+ }
+ }
+ }
+ // The PTransform's watermark was at positive infinity and all of its outputs are known to be
+ // done. It is done.
+ return true;
+ }
+
+  /**
+   * Returns true if every step of the pipeline is done, as determined by
+   * {@link #isDone(AppliedPTransform)}.
+   */
+  public boolean isDone() {
+    for (AppliedPTransform<?, ?, ?> step : stepNames.keySet()) {
+      if (isDone(step)) {
+        continue;
+      }
+      // A single unfinished step is enough to keep the pipeline running.
+      return false;
+    }
+    return true;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessExecutionContext.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessExecutionContext.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessExecutionContext.java
new file mode 100644
index 0000000..44d8bd9
--- /dev/null
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessExecutionContext.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import org.apache.beam.runners.direct.InMemoryWatermarkManager.TimerUpdate;
+import org.apache.beam.runners.direct.InMemoryWatermarkManager.TransformWatermarks;
+import org.apache.beam.sdk.util.BaseExecutionContext;
+import org.apache.beam.sdk.util.ExecutionContext;
+import org.apache.beam.sdk.util.TimerInternals;
+import org.apache.beam.sdk.util.state.CopyOnAccessInMemoryStateInternals;
+
+/**
+ * Execution Context for the {@link InProcessPipelineRunner}.
+ *
+ * <p>This implementation is not thread safe. A new {@link InProcessExecutionContext} must be
+ * created for each thread that requires it.
+ */
+class InProcessExecutionContext
+    extends BaseExecutionContext<InProcessExecutionContext.InProcessStepContext> {
+  /** The source of the current time for timers created within this context. */
+  private final Clock clock;
+  /** The key that the state and timers of this context are scoped to. */
+  private final Object key;
+  /** The state that each step context layers its own state internals on top of. */
+  private final CopyOnAccessInMemoryStateInternals<Object> existingState;
+  /** The watermarks handed to the timer internals of each step context. */
+  private final TransformWatermarks watermarks;
+
+  /**
+   * Creates an {@link InProcessExecutionContext}.
+   *
+   * @param clock the clock used by timers created within this context
+   * @param key the key that state and timers are scoped to
+   * @param existingState the state underlying the state internals of each step
+   * @param watermarks the watermarks used by timers created within this context
+   */
+  public InProcessExecutionContext(
+      Clock clock,
+      Object key,
+      CopyOnAccessInMemoryStateInternals<Object> existingState,
+      TransformWatermarks watermarks) {
+    this.clock = clock;
+    this.key = key;
+    this.existingState = existingState;
+    this.watermarks = watermarks;
+  }
+
+  @Override
+  protected InProcessStepContext createStepContext(String stepName, String transformName) {
+    InProcessStepContext stepContext = new InProcessStepContext(this, stepName, transformName);
+    return stepContext;
+  }
+
+  /**
+   * Step Context for the {@link InProcessPipelineRunner}.
+   *
+   * <p>State and timer internals are created on first access.
+   */
+  public class InProcessStepContext
+      extends org.apache.beam.sdk.util.BaseExecutionContext.StepContext {
+    private CopyOnAccessInMemoryStateInternals<Object> stateInternals;
+    private InProcessTimerInternals timerInternals;
+
+    public InProcessStepContext(
+        ExecutionContext executionContext, String stepName, String transformName) {
+      super(executionContext, stepName, transformName);
+    }
+
+    @Override
+    public CopyOnAccessInMemoryStateInternals<Object> stateInternals() {
+      if (stateInternals != null) {
+        return stateInternals;
+      }
+      // First access: layer fresh state internals over the state of the enclosing context.
+      stateInternals = CopyOnAccessInMemoryStateInternals.withUnderlying(key, existingState);
+      return stateInternals;
+    }
+
+    @Override
+    public InProcessTimerInternals timerInternals() {
+      if (timerInternals != null) {
+        return timerInternals;
+      }
+      // First access: start an empty timer update for the key of the enclosing context.
+      timerInternals =
+          InProcessTimerInternals.create(clock, watermarks, TimerUpdate.builder(key));
+      return timerInternals;
+    }
+
+    /**
+     * Commits the state of this step, and returns the committed state. If the step has not
+     * accessed any state, returns null.
+     */
+    public CopyOnAccessInMemoryStateInternals<?> commitState() {
+      return stateInternals == null ? null : stateInternals.commit();
+    }
+
+    /**
+     * Gets the timer update of the {@link TimerInternals} of this {@link InProcessStepContext},
+     * which is empty if the {@link TimerInternals} were never accessed.
+     */
+    public TimerUpdate getTimerUpdate() {
+      return timerInternals == null ? TimerUpdate.empty() : timerInternals.getTimerUpdate();
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessExecutor.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessExecutor.java
new file mode 100644
index 0000000..d811e1b
--- /dev/null
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessExecutor.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
+import org.apache.beam.sdk.transforms.AppliedPTransform;
+import org.apache.beam.sdk.transforms.PTransform;
+
+import java.util.Collection;
+
+/**
+ * An executor that schedules and executes {@link AppliedPTransform AppliedPTransforms} for both
+ * source and intermediate {@link PTransform PTransforms}.
+ */
+interface InProcessExecutor {
+ /**
+ * Starts this executor. The provided collection is the collection of root transforms to
+ * initially schedule.
+ *
+ * @param rootTransforms the root {@link AppliedPTransform AppliedPTransforms} to schedule first
+ */
+ void start(Collection<AppliedPTransform<?, ?, ?>> rootTransforms);
+
+ /**
+ * Blocks until the job being executed enters a terminal state. A job is completed after all
+ * root {@link AppliedPTransform AppliedPTransforms} have completed, and all
+ * {@link CommittedBundle Bundles} have been consumed. Jobs may also terminate abnormally.
+ *
+ * @throws Throwable whenever an executor thread throws anything, transfers the throwable to the
+ * waiting thread and rethrows it
+ */
+ void awaitCompletion() throws Throwable;
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessPipelineOptions.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessPipelineOptions.java
new file mode 100644
index 0000000..512b3bd
--- /dev/null
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessPipelineOptions.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.options.ApplicationNameOptions;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.Hidden;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.Validation.Required;
+import org.apache.beam.sdk.transforms.PTransform;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ * Options that can be used to configure the {@link InProcessPipelineRunner}.
+ */
+public interface InProcessPipelineOptions extends PipelineOptions, ApplicationNameOptions {
+ /**
+ * Gets the {@link ExecutorServiceFactory} to use to create instances of {@link ExecutorService}
+ * to execute {@link PTransform PTransforms}.
+ *
+ * <p>Note that {@link ExecutorService ExecutorServices} returned by the factory must ensure that
+ * it cannot enter a state in which it will not schedule additional pending work unless currently
+ * scheduled work completes, as this may cause the {@link Pipeline} to cease processing.
+ *
+ * <p>Defaults to a {@link CachedThreadPoolExecutorServiceFactory}, which produces instances of
+ * {@link Executors#newCachedThreadPool()}.
+ */
+ @JsonIgnore
+ @Required
+ @Hidden
+ @Default.InstanceFactory(CachedThreadPoolExecutorServiceFactory.class)
+ ExecutorServiceFactory getExecutorServiceFactory();
+
+ /** Setter paired with {@link #getExecutorServiceFactory()}. */
+ void setExecutorServiceFactory(ExecutorServiceFactory executorService);
+
+ /**
+ * Gets the {@link Clock} used by this pipeline. The clock is used in place of accessing the
+ * system time when time values are required by the evaluator.
+ *
+ * <p>Defaults to an instance produced by {@link NanosOffsetClock.Factory}.
+ */
+ @Default.InstanceFactory(NanosOffsetClock.Factory.class)
+ @JsonIgnore
+ @Required
+ @Hidden
+ @Description(
+ "The processing time source used by the pipeline. When the current time is "
+ + "needed by the evaluator, the result of clock#now() is used.")
+ Clock getClock();
+
+ /** Setter paired with {@link #getClock()}. */
+ void setClock(Clock clock);
+
+ /**
+ * Whether producers that have reached the maximum representable watermark should be shut down
+ * even if they produce unbounded {@code PCollections}. Defaults to {@code false}.
+ */
+ @Default.Boolean(false)
+ @Description(
+ "If the pipeline should shut down producers which have reached the maximum "
+ + "representable watermark. If this is set to true, a pipeline in which all PTransforms "
+ + "have reached the maximum watermark will be shut down, even if there are unbounded "
+ + "sources that could produce additional (late) data. By default, if the pipeline "
+ + "contains any unbounded PCollections, it will run until explicitly shut down.")
+ boolean isShutdownUnboundedProducersWithMaxWatermark();
+
+ /** Setter paired with {@link #isShutdownUnboundedProducersWithMaxWatermark()}. */
+ void setShutdownUnboundedProducersWithMaxWatermark(boolean shutdown);
+
+ /**
+ * Whether running the {@link Pipeline} should block until it completes. Defaults to
+ * {@code true}.
+ */
+ @Default.Boolean(true)
+ @Description(
+ "If the pipeline should block awaiting completion of the pipeline. If set to true, "
+ + "a call to Pipeline#run() will block until all PTransforms are complete. Otherwise, "
+ + "the Pipeline will execute asynchronously. If set to false, the completion of the "
+ + "pipeline can be awaited on by use of InProcessPipelineResult#awaitCompletion().")
+ boolean isBlockOnRun();
+
+ /** Setter paired with {@link #isBlockOnRun()}. */
+ void setBlockOnRun(boolean b);
+
+ /**
+ * Whether the runner should check that elements are not mutated by
+ * {@link PTransform PTransforms}. Defaults to {@code true}.
+ */
+ @Default.Boolean(true)
+ @Description(
+ "Controls whether the runner should ensure that all of the elements of every "
+ + "PCollection are not mutated. PTransforms are not permitted to mutate input elements "
+ + "at any point, or output elements after they are output.")
+ boolean isTestImmutability();
+
+ /** Setter paired with {@link #isTestImmutability()}. */
+ void setTestImmutability(boolean test);
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessPipelineRunner.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessPipelineRunner.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessPipelineRunner.java
new file mode 100644
index 0000000..bb8c0de
--- /dev/null
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessPipelineRunner.java
@@ -0,0 +1,370 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import org.apache.beam.runners.direct.GroupByKeyEvaluatorFactory.InProcessGroupByKeyOnly;
+import org.apache.beam.runners.direct.GroupByKeyEvaluatorFactory.InProcessGroupByKeyOverrideFactory;
+import org.apache.beam.runners.direct.ViewEvaluatorFactory.InProcessViewOverrideFactory;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.Pipeline.PipelineExecutionException;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.io.AvroIO;
+import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.runners.AggregatorPipelineExtractor;
+import org.apache.beam.sdk.runners.AggregatorRetrievalException;
+import org.apache.beam.sdk.runners.AggregatorValues;
+import org.apache.beam.sdk.runners.PipelineRunner;
+import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.transforms.AppliedPTransform;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.View.CreatePCollectionView;
+import org.apache.beam.sdk.util.MapAggregatorValues;
+import org.apache.beam.sdk.util.TimerInternals.TimerData;
+import org.apache.beam.sdk.util.UserCodeException;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.common.Counter;
+import org.apache.beam.sdk.util.common.CounterSet;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollection.IsBounded;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.PInput;
+import org.apache.beam.sdk.values.POutput;
+import org.apache.beam.sdk.values.PValue;
+
+import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+
+import org.joda.time.Instant;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+
+import javax.annotation.Nullable;
+
+/**
+ * An In-Memory implementation of the Dataflow Programming Model. Supports Unbounded
+ * {@link PCollection PCollections}.
+ */
+@Experimental
+public class InProcessPipelineRunner
+ extends PipelineRunner<InProcessPipelineRunner.InProcessPipelineResult> {
+ /**
+ * The default set of transform overrides to use in the {@link InProcessPipelineRunner}.
+ *
+ * <p>Each entry maps a {@link PTransform} class to the {@link PTransformOverrideFactory} that
+ * produces its replacement. Entries are looked up by the exact runtime class of the transform
+ * being applied, so subclasses of a listed transform are not overridden.
+ *
+ * <p>The map is immutable and the reference is {@code final}: it is shared by every runner
+ * instance and must never be reassigned.
+ */
+ @SuppressWarnings("rawtypes")
+ private static final Map<Class<? extends PTransform>, PTransformOverrideFactory>
+ defaultTransformOverrides =
+ ImmutableMap.<Class<? extends PTransform>, PTransformOverrideFactory>builder()
+ .put(GroupByKey.class, new InProcessGroupByKeyOverrideFactory())
+ .put(CreatePCollectionView.class, new InProcessViewOverrideFactory())
+ .put(AvroIO.Write.Bound.class, new AvroIOShardedWriteFactory())
+ .put(TextIO.Write.Bound.class, new TextIOShardedWriteFactory())
+ .build();
+
+ /**
+ * Part of a {@link PCollection}. Elements are output to a bundle, which will cause them to be
+ * executed by {@link PTransform PTransforms} that consume the {@link PCollection} this bundle is
+ * a part of at a later point. This is an uncommitted bundle and can have elements added to it.
+ *
+ * @param <T> the type of elements that can be added to this bundle
+ */
+ public static interface UncommittedBundle<T> {
+ /**
+ * Returns the PCollection that the elements of this {@link UncommittedBundle} belong to.
+ */
+ PCollection<T> getPCollection();
+
+ /**
+ * Outputs an element to this bundle.
+ *
+ * @param element the element to add to this bundle
+ * @return this bundle
+ */
+ UncommittedBundle<T> add(WindowedValue<T> element);
+
+ /**
+ * Commits this {@link UncommittedBundle}, returning an immutable {@link CommittedBundle}
+ * containing all of the elements that were added to it. The {@link #add(WindowedValue)} method
+ * will throw an {@link IllegalStateException} if called after a call to commit.
+ * @param synchronizedProcessingTime the synchronized processing time at which this bundle was
+ * committed
+ */
+ CommittedBundle<T> commit(Instant synchronizedProcessingTime);
+ }
+
+ /**
+ * Part of a {@link PCollection}. Elements are output to an {@link UncommittedBundle}, which will
+ * eventually be committed. Committed elements are executed by the {@link PTransform PTransforms}
+ * that consume the {@link PCollection} this bundle is
+ * a part of at a later point.
+ * @param <T> the type of elements contained within this bundle
+ */
+ public static interface CommittedBundle<T> {
+ /**
+ * Returns the PCollection that the elements of this bundle belong to.
+ */
+ PCollection<T> getPCollection();
+
+ /**
+ * Returns the (possibly null) key that was output in the most recent {@link GroupByKey} in the
+ * execution of this bundle.
+ */
+ @Nullable
+ Object getKey();
+
+ /**
+ * Returns an {@link Iterable} containing all of the elements that have been added to this
+ * {@link CommittedBundle}.
+ */
+ Iterable<WindowedValue<T>> getElements();
+
+ /**
+ * Returns the processing time output watermark at the time the producing {@link PTransform}
+ * committed this bundle. Downstream synchronized processing time watermarks cannot progress
+ * past this point before consuming this bundle.
+ *
+ * <p>This value is no greater than the earliest incomplete processing time or synchronized
+ * processing time {@link TimerData timer} at the time this bundle was committed, including any
+ * timers that fired to produce this bundle.
+ */
+ Instant getSynchronizedProcessingOutputWatermark();
+
+ /**
+ * Return a new {@link CommittedBundle} that is like this one, except calls to
+ * {@link #getElements()} will return the provided elements. This bundle is unchanged.
+ *
+ * <p>
+ * The value of the {@link #getSynchronizedProcessingOutputWatermark() synchronized processing
+ * output watermark} of the returned {@link CommittedBundle} is equal to the value returned from
+ * the current bundle. This is used to ensure a {@link PTransform} that could not complete
+ * processing on input elements properly holds the synchronized processing time to the
+ * appropriate value.
+ */
+ CommittedBundle<T> withElements(Iterable<WindowedValue<T>> elements);
+ }
+
+ /**
+ * A {@link PCollectionViewWriter} is responsible for writing contents of a {@link PCollection} to
+ * a storage mechanism that can be read from while constructing a {@link PCollectionView}.
+ * @param <ElemT> the type of elements the input {@link PCollection} contains.
+ * @param <ViewT> the type of the PCollectionView this writer writes to.
+ */
+ public static interface PCollectionViewWriter<ElemT, ViewT> {
+ void add(Iterable<WindowedValue<ElemT>> values);
+ }
+
+ ////////////////////////////////////////////////////////////////////////////////////////////////
+ private final InProcessPipelineOptions options;
+
+ public static InProcessPipelineRunner fromOptions(PipelineOptions options) {
+ return new InProcessPipelineRunner(options.as(InProcessPipelineOptions.class));
+ }
+
+ private InProcessPipelineRunner(InProcessPipelineOptions options) {
+ this.options = options;
+ }
+
+ /**
+ * Returns the {@link PipelineOptions} used to create this {@link InProcessPipelineRunner}.
+ */
+ public InProcessPipelineOptions getPipelineOptions() {
+ return options;
+ }
+
+ /**
+ * Applies the provided transform to the provided input, substituting the runner-specific
+ * replacement when one is registered for the transform's class.
+ */
+ @Override
+ public <OutputT extends POutput, InputT extends PInput> OutputT apply(
+ PTransform<InputT, OutputT> transform, InputT input) {
+ // Overrides are registered against the exact runtime class of the transform.
+ PTransformOverrideFactory factory = defaultTransformOverrides.get(transform.getClass());
+ if (factory == null) {
+ // Nothing is registered for this transform, so apply it unchanged.
+ return super.apply(transform, input);
+ }
+ PTransform<InputT, OutputT> replacement = factory.override(transform);
+ return super.apply(replacement, input);
+ }
+
+ /**
+ * Starts execution of the provided {@link Pipeline} and returns a handle to the execution.
+ *
+ * <p>If {@link InProcessPipelineOptions#isBlockOnRun()} is true, this method waits for the
+ * execution to complete before returning. While waiting, a {@link UserCodeException} is
+ * rethrown as a {@link PipelineExecutionException} wrapping its cause, and any other
+ * {@link Throwable} is passed to {@link Throwables#propagate(Throwable)}.
+ *
+ * @param pipeline the pipeline to execute
+ * @return the result of the execution, which is still running if this call did not block
+ */
+ @Override
+ public InProcessPipelineResult run(Pipeline pipeline) {
+ // First traversal: gather root transforms, consumers, step names and views.
+ ConsumerTrackingPipelineVisitor consumerTrackingVisitor = new ConsumerTrackingPipelineVisitor();
+ pipeline.traverseTopologically(consumerTrackingVisitor);
+ // Finish specifying every value the visitor reported as unfinalized.
+ for (PValue unfinalized : consumerTrackingVisitor.getUnfinalizedPValues()) {
+ unfinalized.finishSpecifying();
+ }
+ // Second traversal: find the keyed PValues.
+ // NOTE(review): the classes passed here are presumably the transforms whose outputs are
+ // treated as keyed -- confirm against KeyedPValueTrackingVisitor.
+ @SuppressWarnings("rawtypes")
+ KeyedPValueTrackingVisitor keyedPValueVisitor =
+ KeyedPValueTrackingVisitor.create(
+ ImmutableSet.<Class<? extends PTransform>>of(
+ GroupByKey.class, InProcessGroupByKeyOnly.class));
+ pipeline.traverseTopologically(keyedPValueVisitor);
+
+ InProcessEvaluationContext context =
+ InProcessEvaluationContext.create(
+ getPipelineOptions(),
+ createBundleFactory(getPipelineOptions()),
+ consumerTrackingVisitor.getRootTransforms(),
+ consumerTrackingVisitor.getValueToConsumers(),
+ consumerTrackingVisitor.getStepNames(),
+ consumerTrackingVisitor.getViews());
+
+ // A new executor service is created by the configured factory on every call to run.
+ ExecutorService executorService =
+ context.getPipelineOptions().getExecutorServiceFactory().create();
+ InProcessExecutor executor =
+ ExecutorServiceParallelExecutor.create(
+ executorService,
+ consumerTrackingVisitor.getValueToConsumers(),
+ keyedPValueVisitor.getKeyedPValues(),
+ TransformEvaluatorRegistry.defaultRegistry(),
+ defaultModelEnforcements(options),
+ context);
+ executor.start(consumerTrackingVisitor.getRootTransforms());
+
+ // The result is only constructed once the executor has been started.
+ Map<Aggregator<?, ?>, Collection<PTransform<?, ?>>> aggregatorSteps =
+ new AggregatorPipelineExtractor(pipeline).getAggregatorSteps();
+ InProcessPipelineResult result =
+ new InProcessPipelineResult(executor, context, aggregatorSteps);
+ if (options.isBlockOnRun()) {
+ try {
+ result.awaitCompletion();
+ } catch (UserCodeException userException) {
+ // Surface the user's own exception rather than the wrapper.
+ throw new PipelineExecutionException(userException.getCause());
+ } catch (Throwable t) {
+ // propagate(...) always throws, so control never reaches the return below from here.
+ Throwables.propagate(t);
+ }
+ }
+ return result;
+ }
+
+ private Map<Class<? extends PTransform>, Collection<ModelEnforcementFactory>>
+ defaultModelEnforcements(InProcessPipelineOptions options) {
+ ImmutableMap.Builder<Class<? extends PTransform>, Collection<ModelEnforcementFactory>>
+ enforcements = ImmutableMap.builder();
+ Collection<ModelEnforcementFactory> parDoEnforcements = createParDoEnforcements(options);
+ enforcements.put(ParDo.Bound.class, parDoEnforcements);
+ enforcements.put(ParDo.BoundMulti.class, parDoEnforcements);
+ return enforcements.build();
+ }
+
+ private Collection<ModelEnforcementFactory> createParDoEnforcements(
+ InProcessPipelineOptions options) {
+ ImmutableList.Builder<ModelEnforcementFactory> enforcements = ImmutableList.builder();
+ if (options.isTestImmutability()) {
+ enforcements.add(ImmutabilityEnforcementFactory.create());
+ }
+ return enforcements.build();
+ }
+
+ /**
+ * Creates the {@link BundleFactory} for a run, wrapping it in an
+ * {@link ImmutabilityCheckingBundleFactory} when immutability testing is enabled.
+ */
+ private BundleFactory createBundleFactory(InProcessPipelineOptions pipelineOptions) {
+ BundleFactory baseFactory = InProcessBundleFactory.create();
+ if (!pipelineOptions.isTestImmutability()) {
+ return baseFactory;
+ }
+ return ImmutabilityCheckingBundleFactory.create(baseFactory);
+ }
+
+ /**
+ * The result of running a {@link Pipeline} with the {@link InProcessPipelineRunner}.
+ *
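+ * <p>Reports the current {@link State} of the execution, exposes the values of
+ * {@link Aggregator Aggregators}, and allows callers to {@link #awaitCompletion() await
+ * completion} of the pipeline.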
+ */
+ public static class InProcessPipelineResult implements PipelineResult {
+ private final InProcessExecutor executor;
+ private final InProcessEvaluationContext evaluationContext;
+ private final Map<Aggregator<?, ?>, Collection<PTransform<?, ?>>> aggregatorSteps;
+ private State state;
+
+ private InProcessPipelineResult(
+ InProcessExecutor executor,
+ InProcessEvaluationContext evaluationContext,
+ Map<Aggregator<?, ?>, Collection<PTransform<?, ?>>> aggregatorSteps) {
+ this.executor = executor;
+ this.evaluationContext = evaluationContext;
+ this.aggregatorSteps = aggregatorSteps;
+ // Only ever constructed after the executor has started.
+ this.state = State.RUNNING;
+ }
+
+ @Override
+ public State getState() {
+ return state;
+ }
+
+ /**
+ * Returns the values of the provided {@link Aggregator}, keyed by the full name of each step
+ * in which it is used.
+ *
+ * <p>Steps for which no counter exists are omitted. If the aggregator is not used by any step
+ * of the pipeline, the returned values are empty.
+ *
+ * @param aggregator the aggregator to retrieve values for
+ * @return the per-step values available for {@code aggregator}
+ */
+ @Override
+ public <T> AggregatorValues<T> getAggregatorValues(Aggregator<?, T> aggregator)
+ throws AggregatorRetrievalException {
+ Map<String, T> stepValues = new HashMap<>();
+ Collection<PTransform<?, ?>> steps = aggregatorSteps.get(aggregator);
+ if (steps == null) {
+ // No step uses this aggregator. Report no values instead of failing with a
+ // NullPointerException when the missing entry is dereferenced below.
+ return new MapAggregatorValues<>(stepValues);
+ }
+ CounterSet counters = evaluationContext.getCounters();
+ for (AppliedPTransform<?, ?, ?> transform : evaluationContext.getSteps()) {
+ if (steps.contains(transform.getTransform())) {
+ String stepName =
+ String.format(
+ "user-%s-%s", evaluationContext.getStepName(transform), aggregator.getName());
+ // The counter registered under this name is assumed to aggregate values of type T.
+ @SuppressWarnings("unchecked")
+ Counter<T> counter = (Counter<T>) counters.getExistingCounter(stepName);
+ if (counter != null) {
+ stepValues.put(transform.getFullName(), counter.getAggregate());
+ }
+ }
+ }
+ return new MapAggregatorValues<>(stepValues);
+ }
+
+ /**
+ * Blocks until the {@link Pipeline} execution represented by this
+ * {@link InProcessPipelineResult} is complete, returning the terminal state.
+ *
+ * <p>If the state is already terminal, this returns it immediately without waiting on the
+ * executor.
+ *
+ * <p>If the pipeline terminates abnormally by throwing an exception, this will rethrow the
+ * exception. Future calls to {@link #getState()} will return
+ * {@link org.apache.beam.sdk.PipelineResult.State#FAILED}.
+ *
+ * <p>If the calling thread is interrupted while waiting, the interrupt flag is restored and
+ * the {@link InterruptedException} is rethrown; the state is left unchanged in that case.
+ *
+ * <p>NOTE: if the {@link Pipeline} contains an {@link IsBounded#UNBOUNDED unbounded}
+ * {@link PCollection}, and the {@link PipelineRunner} was created with
+ * {@link InProcessPipelineOptions#isShutdownUnboundedProducersWithMaxWatermark()} set to false,
+ * this method will never return.
+ *
+ * See also {@link InProcessExecutor#awaitCompletion()}.
+ *
+ * @return the state after waiting, or the existing state if it was already terminal
+ * @throws Throwable whatever the executor threw while the pipeline was being awaited
+ */
+ public State awaitCompletion() throws Throwable {
+ if (!state.isTerminal()) {
+ try {
+ executor.awaitCompletion();
+ state = State.DONE;
+ } catch (InterruptedException e) {
+ // Interruption concerns the waiting thread, not the pipeline: keep the current state.
+ Thread.currentThread().interrupt();
+ throw e;
+ } catch (Throwable t) {
+ // Record the failure before rethrowing so getState() reflects it.
+ state = State.FAILED;
+ throw t;
+ }
+ }
+ return state;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessRegistrar.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessRegistrar.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessRegistrar.java
new file mode 100644
index 0000000..4a09de7
--- /dev/null
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessRegistrar.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsRegistrar;
+import org.apache.beam.sdk.runners.PipelineRunner;
+import org.apache.beam.sdk.runners.PipelineRunnerRegistrar;
+
+import com.google.auto.service.AutoService;
+import com.google.common.collect.ImmutableList;
+
+/**
+ * Contains the {@link PipelineRunnerRegistrar} and {@link PipelineOptionsRegistrar} for the
+ * {@link InProcessPipelineRunner}.
+ */
+public class InProcessRegistrar {
+ // Not instantiable: this class only groups the nested registrar implementations.
+ private InProcessRegistrar() {}
+ /**
+ * Registers the {@link InProcessPipelineRunner}.
+ */
+ @AutoService(PipelineRunnerRegistrar.class)
+ public static class InProcessRunner implements PipelineRunnerRegistrar {
+ /**
+ * Returns a list containing only {@link InProcessPipelineRunner}.
+ */
+ @Override
+ public Iterable<Class<? extends PipelineRunner<?>>> getPipelineRunners() {
+ return ImmutableList.<Class<? extends PipelineRunner<?>>>of(InProcessPipelineRunner.class);
+ }
+ }
+
+ /**
+ * Registers the {@link InProcessPipelineOptions}.
+ */
+ @AutoService(PipelineOptionsRegistrar.class)
+ public static class InProcessOptions implements PipelineOptionsRegistrar {
+ /**
+ * Returns a list containing only {@link InProcessPipelineOptions}.
+ */
+ @Override
+ public Iterable<Class<? extends PipelineOptions>> getPipelineOptions() {
+ return ImmutableList.<Class<? extends PipelineOptions>>of(InProcessPipelineOptions.class);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessSideInputContainer.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessSideInputContainer.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessSideInputContainer.java
new file mode 100644
index 0000000..f4980ef
--- /dev/null
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessSideInputContainer.java
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import org.apache.beam.runners.direct.InProcessEvaluationContext.ReadyCheckingSideInputReader;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.PCollectionViewWindow;
+import org.apache.beam.sdk.util.SideInputReader;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.values.PCollectionView;
+
+import com.google.common.base.MoreObjects;
+import com.google.common.base.Throwables;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.SettableFuture;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import javax.annotation.Nullable;
+
+/**
+ * An in-process container for {@link PCollectionView PCollectionViews}, which provides methods for
+ * constructing {@link SideInputReader SideInputReaders} which block until a side input is
+ * available and writing to a {@link PCollectionView}.
+ */
+class InProcessSideInputContainer {
+ private final InProcessEvaluationContext evaluationContext;
+ private final Collection<PCollectionView<?>> containedViews;
+ private final LoadingCache<PCollectionViewWindow<?>,
+ SettableFuture<Iterable<? extends WindowedValue<?>>>> viewByWindows;
+
+ /**
+ * Create a new {@link InProcessSideInputContainer} with the provided views and the provided
+ * context.
+ */
+ public static InProcessSideInputContainer create(
+ InProcessEvaluationContext context, Collection<PCollectionView<?>> containedViews) {
+ CacheLoader<PCollectionViewWindow<?>, SettableFuture<Iterable<? extends WindowedValue<?>>>>
+ loader = new CacheLoader<PCollectionViewWindow<?>,
+ SettableFuture<Iterable<? extends WindowedValue<?>>>>() {
+ @Override
+ public SettableFuture<Iterable<? extends WindowedValue<?>>> load(
+ PCollectionViewWindow<?> view) {
+ return SettableFuture.create();
+ }
+ };
+ LoadingCache<PCollectionViewWindow<?>, SettableFuture<Iterable<? extends WindowedValue<?>>>>
+ viewByWindows = CacheBuilder.newBuilder().build(loader);
+ return new InProcessSideInputContainer(context, containedViews, viewByWindows);
+ }
+
+ private InProcessSideInputContainer(InProcessEvaluationContext context,
+ Collection<PCollectionView<?>> containedViews,
+ LoadingCache<PCollectionViewWindow<?>, SettableFuture<Iterable<? extends WindowedValue<?>>>>
+ viewByWindows) {
+ this.evaluationContext = context;
+ this.containedViews = ImmutableSet.copyOf(containedViews);
+ this.viewByWindows = viewByWindows;
+ }
+
+ /**
+ * Returns a {@link ReadyCheckingSideInputReader} restricted to the provided views and backed
+ * by the contents of this {@link InProcessSideInputContainer}.
+ *
+ * <p>The reader reads from this container's per-window contents, so values written to the
+ * container after the reader is created are visible through it.
+ *
+ * @param newContainedViews the views the returned reader may read
+ * @throws IllegalArgumentException if any requested view is not contained in this container
+ */
+ public ReadyCheckingSideInputReader createReaderForViews(
+ Collection<PCollectionView<?>> newContainedViews) {
+ if (!containedViews.containsAll(newContainedViews)) {
+ // Copy both sides into sets only to report exactly which views are unknown.
+ Set<PCollectionView<?>> currentlyContained = ImmutableSet.copyOf(containedViews);
+ Set<PCollectionView<?>> newRequested = ImmutableSet.copyOf(newContainedViews);
+ throw new IllegalArgumentException("Can't create a SideInputReader with unknown views "
+ + Sets.difference(newRequested, currentlyContained));
+ }
+ return new SideInputContainerSideInputReader(newContainedViews);
+ }
+
+ /**
+ * Write the provided values to the provided view.
+ *
+ * <p>The windowed values are first exploded, then for each window the pane is determined. For
+ * each window, if the pane is later than the current pane stored within this container, write
+ * all of the values to the container as the new values of the {@link PCollectionView}.
+ *
+ * <p>The provided iterable is expected to contain only a single window and pane.
+ */
+ public void write(PCollectionView<?> view, Iterable<? extends WindowedValue<?>> values) {
+ Map<BoundedWindow, Collection<WindowedValue<?>>> valuesPerWindow =
+ indexValuesByWindow(values);
+ for (Map.Entry<BoundedWindow, Collection<WindowedValue<?>>> windowValues :
+ valuesPerWindow.entrySet()) {
+ updatePCollectionViewWindowValues(view, windowValues.getKey(), windowValues.getValue());
+ }
+ }
+
+ /**
+ * Groups the provided values by {@link BoundedWindow}. A value that is in several windows is
+ * listed once under each of them.
+ */
+ private Map<BoundedWindow, Collection<WindowedValue<?>>> indexValuesByWindow(
+ Iterable<? extends WindowedValue<?>> values) {
+ Map<BoundedWindow, Collection<WindowedValue<?>>> byWindow = new HashMap<>();
+ for (WindowedValue<?> element : values) {
+ for (BoundedWindow window : element.getWindows()) {
+ if (!byWindow.containsKey(window)) {
+ // First value seen for this window: start its collection.
+ byWindow.put(window, new ArrayList<WindowedValue<?>>());
+ }
+ byWindow.get(window).add(element);
+ }
+ }
+ return byWindow;
+ }
+
+ /**
+ * Set the value of the {@link PCollectionView} in the {@link BoundedWindow} to be based on the
+ * specified values, if the values are part of a later pane than currently exist within the
+ * {@link PCollectionViewWindow}.
+ */
+ private void updatePCollectionViewWindowValues(
+ PCollectionView<?> view, BoundedWindow window, Collection<WindowedValue<?>> windowValues) {
+ PCollectionViewWindow<?> windowedView = PCollectionViewWindow.of(view, window);
+ SettableFuture<Iterable<? extends WindowedValue<?>>> future = null;
+ try {
+ future = viewByWindows.get(windowedView);
+ if (future.isDone()) {
+ Iterator<? extends WindowedValue<?>> existingValues = future.get().iterator();
+ PaneInfo newPane = windowValues.iterator().next().getPane();
+ // The current value may have no elements, if no elements were produced for the window,
+ // but we are receiving late data.
+ if (!existingValues.hasNext()
+ || newPane.getIndex() > existingValues.next().getPane().getIndex()) {
+ viewByWindows.invalidate(windowedView);
+ viewByWindows.get(windowedView).set(windowValues);
+ }
+ } else {
+ future.set(windowValues);
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ if (future != null && !future.isDone()) {
+ future.set(Collections.<WindowedValue<?>>emptyList());
+ }
+ } catch (ExecutionException e) {
+ Throwables.propagate(e.getCause());
+ }
+ }
+
+ private final class SideInputContainerSideInputReader implements ReadyCheckingSideInputReader {
+ private final Collection<PCollectionView<?>> readerViews;
+
+ private SideInputContainerSideInputReader(Collection<PCollectionView<?>> readerViews) {
+ this.readerViews = ImmutableSet.copyOf(readerViews);
+ }
+
+ @Override
+ public boolean isReady(final PCollectionView<?> view, final BoundedWindow window) {
+ checkArgument(
+ readerViews.contains(view),
+ "Tried to check if view %s was ready in a SideInputReader that does not contain it. "
+ + "Contained views; %s",
+ view,
+ readerViews);
+ return getViewFuture(view, window).isDone();
+ }
+
+ /**
+ * Returns the value of the provided {@link PCollectionView} in the provided window, blocking
+ * until the contents of the view are available.
+ *
+ * @return the value of the view, or {@code null} if the calling thread was interrupted while
+ * waiting (the interrupt flag is restored before returning)
+ * @throws IllegalArgumentException if {@code view} is not one of the views of this reader
+ * @throws RuntimeException wrapping the {@link ExecutionException} if the contents could not
+ * be obtained
+ */
+ @Override
+ @Nullable
+ public <T> T get(final PCollectionView<T> view, final BoundedWindow window) {
+ // Use a message template, as isReady does, so the view is only formatted on failure.
+ checkArgument(
+ readerViews.contains(view), "calling get(PCollectionView) with unknown view: %s", view);
+ try {
+ final Future<Iterable<? extends WindowedValue<?>>> future = getViewFuture(view, window);
+ // Safe covariant cast
+ @SuppressWarnings("unchecked")
+ Iterable<WindowedValue<?>> values = (Iterable<WindowedValue<?>>) future.get();
+ return view.fromIterableInternal(values);
+ } catch (InterruptedException e) {
+ // Preserve the interruption for the caller; there is no value to return.
+ Thread.currentThread().interrupt();
+ return null;
+ } catch (ExecutionException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+ * Gets the future containing the contents of the provided {@link PCollectionView} in the
+ * provided {@link BoundedWindow}, setting up a callback to populate the future with empty
+ * contents if necessary.
+ */
+ private <T> Future<Iterable<? extends WindowedValue<?>>> getViewFuture(
+ final PCollectionView<T> view, final BoundedWindow window) {
+ PCollectionViewWindow<T> windowedView = PCollectionViewWindow.of(view, window);
+ final SettableFuture<Iterable<? extends WindowedValue<?>>> future =
+ viewByWindows.getUnchecked(windowedView);
+
+ WindowingStrategy<?, ?> windowingStrategy = view.getWindowingStrategyInternal();
+ evaluationContext.scheduleAfterOutputWouldBeProduced(
+ view, window, windowingStrategy, new WriteEmptyViewContents(view, window, future));
+ return future;
+ }
+
+ @Override
+ public <T> boolean contains(PCollectionView<T> view) {
+ return readerViews.contains(view);
+ }
+
+ @Override
+ public boolean isEmpty() {
+ return readerViews.isEmpty();
+ }
+ }
+
+ private static class WriteEmptyViewContents implements Runnable {
+ private final PCollectionView<?> view;
+ private final BoundedWindow window;
+ private final SettableFuture<Iterable<? extends WindowedValue<?>>> future;
+
+ private WriteEmptyViewContents(PCollectionView<?> view, BoundedWindow window,
+ SettableFuture<Iterable<? extends WindowedValue<?>>> future) {
+ this.future = future;
+ this.view = view;
+ this.window = window;
+ }
+
+ @Override
+ public void run() {
+ // The requested window has closed without producing elements, so reflect that in
+ // the PCollectionView. If set has already been called, will do nothing.
+ future.set(Collections.<WindowedValue<?>>emptyList());
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this)
+ .add("view", view)
+ .add("window", window)
+ .toString();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessTimerInternals.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessTimerInternals.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessTimerInternals.java
new file mode 100644
index 0000000..cd54f59
--- /dev/null
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessTimerInternals.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import org.apache.beam.runners.direct.InMemoryWatermarkManager.TimerUpdate;
+import org.apache.beam.runners.direct.InMemoryWatermarkManager.TimerUpdate.TimerUpdateBuilder;
+import org.apache.beam.runners.direct.InMemoryWatermarkManager.TransformWatermarks;
+import org.apache.beam.sdk.util.TimerInternals;
+
+import org.joda.time.Instant;
+
+import javax.annotation.Nullable;
+
+/**
+ * An implementation of {@link TimerInternals} where all relevant data exists in memory.
+ *
+ * <p>Timer changes are recorded in the {@link TimerUpdateBuilder} supplied at construction;
+ * times are read from the supplied {@link Clock} and {@link TransformWatermarks}.
+ */
+public class InProcessTimerInternals implements TimerInternals {
+ private final Clock processingTimeClock;
+ private final TransformWatermarks watermarks;
+ private final TimerUpdateBuilder timerUpdateBuilder;
+
+ /**
+ * Creates a new {@link InProcessTimerInternals}.
+ *
+ * @param clock the source of the current processing time
+ * @param watermarks the source of the watermark and synchronized processing time values
+ * @param timerUpdateBuilder the builder in which set and deleted timers are recorded
+ */
+ public static InProcessTimerInternals create(
+ Clock clock, TransformWatermarks watermarks, TimerUpdateBuilder timerUpdateBuilder) {
+ return new InProcessTimerInternals(clock, watermarks, timerUpdateBuilder);
+ }
+
+ private InProcessTimerInternals(
+ Clock clock, TransformWatermarks watermarks, TimerUpdateBuilder timerUpdateBuilder) {
+ this.processingTimeClock = clock;
+ this.watermarks = watermarks;
+ this.timerUpdateBuilder = timerUpdateBuilder;
+ }
+
+ /**
+ * Records the provided timer as set in the underlying {@link TimerUpdateBuilder}.
+ */
+ @Override
+ public void setTimer(TimerData timerKey) {
+ timerUpdateBuilder.setTimer(timerKey);
+ }
+
+ /**
+ * Records the provided timer as deleted in the underlying {@link TimerUpdateBuilder}.
+ */
+ @Override
+ public void deleteTimer(TimerData timerKey) {
+ timerUpdateBuilder.deletedTimer(timerKey);
+ }
+
+ /**
+ * Returns the {@link TimerUpdate} built from the underlying {@link TimerUpdateBuilder}.
+ */
+ public TimerUpdate getTimerUpdate() {
+ return timerUpdateBuilder.build();
+ }
+
+ /**
+ * Returns the current time as reported by the processing time {@link Clock}.
+ */
+ @Override
+ public Instant currentProcessingTime() {
+ return processingTimeClock.now();
+ }
+
+ /**
+ * Returns the synchronized processing input time of the {@link TransformWatermarks}.
+ */
+ @Override
+ @Nullable
+ public Instant currentSynchronizedProcessingTime() {
+ return watermarks.getSynchronizedProcessingInputTime();
+ }
+
+ /**
+ * Returns the input watermark of the {@link TransformWatermarks}.
+ */
+ @Override
+ public Instant currentInputWatermarkTime() {
+ return watermarks.getInputWatermark();
+ }
+
+ /**
+ * Returns the output watermark of the {@link TransformWatermarks}.
+ */
+ @Override
+ @Nullable
+ public Instant currentOutputWatermarkTime() {
+ return watermarks.getOutputWatermark();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessTransformResult.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessTransformResult.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessTransformResult.java
new file mode 100644
index 0000000..a132c33
--- /dev/null
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessTransformResult.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import org.apache.beam.runners.direct.InMemoryWatermarkManager.TimerUpdate;
+import org.apache.beam.runners.direct.InProcessPipelineRunner.UncommittedBundle;
+import org.apache.beam.sdk.transforms.AppliedPTransform;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.common.CounterSet;
+import org.apache.beam.sdk.util.state.CopyOnAccessInMemoryStateInternals;
+
+import org.joda.time.Instant;
+
+import javax.annotation.Nullable;
+
+/**
+ * The result of evaluating an {@link AppliedPTransform} with a {@link TransformEvaluator}.
+ */
+public interface InProcessTransformResult {
+ /**
+ * Returns the {@link AppliedPTransform} that produced this result.
+ */
+ AppliedPTransform<?, ?, ?> getTransform();
+
+ /**
+ * Returns the {@link UncommittedBundle (uncommitted) Bundles} output by this transform. These
+ * will be committed by the evaluation context as part of completing this result.
+ */
+ Iterable<? extends UncommittedBundle<?>> getOutputBundles();
+
+ /**
+ * Returns the {@link CounterSet} used by this {@link PTransform}, or null if this transform did
+ * not use a {@link CounterSet}.
+ */
+ @Nullable CounterSet getCounters();
+
+ /**
+ * Returns the Watermark Hold for the transform at the time this result was produced.
+ *
+ * <p>If the transform does not set any watermark hold, returns
+ * {@link BoundedWindow#TIMESTAMP_MAX_VALUE}.
+ */
+ Instant getWatermarkHold();
+
+ /**
+ * Returns the State used by the transform.
+ *
+ * <p>If this evaluation did not access state, this may return null.
+ */
+ @Nullable
+ CopyOnAccessInMemoryStateInternals<?> getState();
+
+ /**
+ * Returns the {@link TimerUpdate} that was produced as a result of this evaluation. If the
+ * evaluation was triggered due to the delivery of one or more timers, those timers must be added
+ * to the builder of the update before it is complete.
+ *
+ * <p>If this evaluation did not add or remove any timers, returns an empty TimerUpdate.
+ */
+ TimerUpdate getTimerUpdate();
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitor.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitor.java
new file mode 100644
index 0000000..b7c755e
--- /dev/null
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitor.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import static com.google.common.base.Preconditions.checkState;
+
+import org.apache.beam.sdk.Pipeline.PipelineVisitor;
+import org.apache.beam.sdk.runners.TransformTreeNode;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PValue;
+
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * A {@link PipelineVisitor} that collects every keyed {@link PValue} in a pipeline. A
+ * {@link PValue} is keyed when it is produced by a {@link PTransform} that produces keyed outputs;
+ * such a {@link PTransform} is assumed to colocate output elements that share a key.
+ *
+ * <p>All {@link GroupByKey} transforms, or their runner-specific implementation primitive, produce
+ * keyed output.
+ */
+// TODO: Handle Key-preserving transforms when appropriate and more aggressively make PTransforms
+// unkeyed
+class KeyedPValueTrackingVisitor implements PipelineVisitor {
+  /** The transform classes whose outputs are treated as keyed. */
+  @SuppressWarnings("rawtypes")
+  private final Set<Class<? extends PTransform>> producesKeyedOutputs;
+  /** The keyed values collected during traversal. */
+  private final Set<PValue> keyedValues;
+  /** Set once the root node has been left; the visitor may not be used afterwards. */
+  private boolean finalized;
+
+  private KeyedPValueTrackingVisitor(
+      @SuppressWarnings("rawtypes") Set<Class<? extends PTransform>> producesKeyedOutputs) {
+    this.keyedValues = new HashSet<>();
+    this.producesKeyedOutputs = producesKeyedOutputs;
+  }
+
+  /**
+   * Creates a visitor that treats the outputs of the provided transform classes as keyed.
+   */
+  public static KeyedPValueTrackingVisitor create(
+      @SuppressWarnings("rawtypes") Set<Class<? extends PTransform>> producesKeyedOutputs) {
+    return new KeyedPValueTrackingVisitor(producesKeyedOutputs);
+  }
+
+  @Override
+  public void enterCompositeTransform(TransformTreeNode node) {
+    checkNotFinalized(node);
+  }
+
+  @Override
+  public void leaveCompositeTransform(TransformTreeNode node) {
+    checkNotFinalized(node);
+    if (node.isRootNode()) {
+      // Leaving the root node completes the traversal.
+      finalized = true;
+      return;
+    }
+    if (producesKeyedOutput(node)) {
+      keyedValues.addAll(node.getExpandedOutputs());
+    }
+  }
+
+  @Override
+  public void visitTransform(TransformTreeNode node) {}
+
+  @Override
+  public void visitValue(PValue value, TransformTreeNode producer) {
+    if (!producesKeyedOutput(producer)) {
+      return;
+    }
+    keyedValues.addAll(value.expand());
+  }
+
+  /**
+   * Returns the keyed values collected by this visitor.
+   *
+   * @throws IllegalStateException if the root node of a pipeline has not yet been left
+   */
+  public Set<PValue> getKeyedPValues() {
+    checkState(
+        finalized, "can't call getKeyedPValues before a Pipeline has been completely traversed");
+    return keyedValues;
+  }
+
+  /** Fails if this visitor has already completed the traversal of a pipeline. */
+  private void checkNotFinalized(TransformTreeNode node) {
+    checkState(
+        !finalized,
+        "Attempted to use a %s that has already been finalized on a pipeline (visiting node %s)",
+        KeyedPValueTrackingVisitor.class.getSimpleName(),
+        node);
+  }
+
+  /** Returns whether the transform of the provided node is one that produces keyed outputs. */
+  private boolean producesKeyedOutput(TransformTreeNode node) {
+    return producesKeyedOutputs.contains(node.getTransform().getClass());
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ModelEnforcement.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ModelEnforcement.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ModelEnforcement.java
new file mode 100644
index 0000000..cc9b6da
--- /dev/null
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ModelEnforcement.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.transforms.AppliedPTransform;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.PCollection;
+
+/**
+ * Enforcement tools that verify that executing code conforms to the model.
+ *
+ * <p>ModelEnforcement is performed on a per-element and per-bundle basis. The
+ * {@link ModelEnforcement} is provided with the input bundle as part of
+ * {@link ModelEnforcementFactory#forBundle(CommittedBundle, AppliedPTransform)}, each element
+ * before and after that element is provided to an underlying {@link TransformEvaluator}, and the
+ * output {@link InProcessTransformResult} and committed output bundles after the
+ * {@link TransformEvaluator} has completed.
+ *
+ * <p>Typically, {@link ModelEnforcement} will obtain required metadata (such as the {@link Coder}
+ * of the input {@link PCollection}) on construction, and then enforce per-element behavior
+ * (such as the immutability of input elements). When the element is output or the bundle is
+ * completed, the required conditions can be enforced across all elements.
+ *
+ * @param <T> the type of the elements in the input bundle
+ */
+public interface ModelEnforcement<T> {
+ /**
+ * Called before a call to {@link TransformEvaluator#processElement(WindowedValue)} on the
+ * provided {@link WindowedValue}.
+ *
+ * @param element the element that is about to be processed
+ */
+ void beforeElement(WindowedValue<T> element);
+
+ /**
+ * Called after a call to {@link TransformEvaluator#processElement(WindowedValue)} on the
+ * provided {@link WindowedValue}.
+ *
+ * @param element the element that was just processed
+ */
+ void afterElement(WindowedValue<T> element);
+
+ /**
+ * Called after a bundle has been completed and {@link TransformEvaluator#finishBundle()} has been
+ * called, producing the provided {@link InProcessTransformResult} and
+ * {@link CommittedBundle output bundles}.
+ *
+ * @param input the input bundle that was processed
+ * @param result the result produced by finishing the bundle
+ * @param outputs the committed output bundles
+ */
+ void afterFinish(
+ CommittedBundle<T> input,
+ InProcessTransformResult result,
+ Iterable<? extends CommittedBundle<?>> outputs);
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ModelEnforcementFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ModelEnforcementFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ModelEnforcementFactory.java
new file mode 100644
index 0000000..6162ba0
--- /dev/null
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ModelEnforcementFactory.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
+import org.apache.beam.sdk.transforms.AppliedPTransform;
+
+/**
+ * Creates {@link ModelEnforcement} instances for an {@link AppliedPTransform} on an input
+ * {@link CommittedBundle bundle}. {@link ModelEnforcement} instances are created before the
+ * {@link TransformEvaluator} is created.
+ */
+public interface ModelEnforcementFactory {
+ /**
+ * Creates a {@link ModelEnforcement} for the provided input {@link CommittedBundle bundle} and
+ * the {@link AppliedPTransform} that consumes it.
+ *
+ * @param input the input bundle
+ * @param consumer the transform that consumes the input bundle
+ * @param <T> the type of the elements in the input bundle
+ */
+ <T> ModelEnforcement<T> forBundle(CommittedBundle<T> input, AppliedPTransform<?, ?, ?> consumer);
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/main/java/org/apache/beam/runners/direct/NanosOffsetClock.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/NanosOffsetClock.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/NanosOffsetClock.java
new file mode 100644
index 0000000..ffdee9d
--- /dev/null
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/NanosOffsetClock.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import org.apache.beam.sdk.options.DefaultValueFactory;
+import org.apache.beam.sdk.options.PipelineOptions;
+
+import org.joda.time.Instant;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A {@link Clock} that uses {@link System#nanoTime()} to track the progress of time.
+ */
+public class NanosOffsetClock implements Clock {
+ private final long baseMillis;
+ private final long nanosAtBaseMillis;
+
+ public static NanosOffsetClock create() {
+ return new NanosOffsetClock();
+ }
+
+ private NanosOffsetClock() {
+ baseMillis = System.currentTimeMillis();
+ nanosAtBaseMillis = System.nanoTime();
+ }
+
+ @Override
+ public Instant now() {
+ return new Instant(
+ baseMillis + (TimeUnit.MILLISECONDS.convert(
+ System.nanoTime() - nanosAtBaseMillis, TimeUnit.NANOSECONDS)));
+ }
+
+ /**
+ * Creates instances of {@link NanosOffsetClock}.
+ */
+ public static class Factory implements DefaultValueFactory<Clock> {
+ @Override
+ public Clock create(PipelineOptions options) {
+ return new NanosOffsetClock();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e13cacb8/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PTransformOverrideFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PTransformOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PTransformOverrideFactory.java
new file mode 100644
index 0000000..81e4863
--- /dev/null
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PTransformOverrideFactory.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PInput;
+import org.apache.beam.sdk.values.POutput;
+
+/**
+ * A factory that can replace a {@link PTransform} with an override.
+ */
+interface PTransformOverrideFactory {
+ /**
+ * Create a {@link PTransform} override for the provided {@link PTransform} if applicable.
+ * Otherwise, return the input {@link PTransform}.
+ *
+ * <p>The returned PTransform must be semantically equivalent to the input {@link PTransform}.
+ *
+ * @param transform the {@link PTransform} to create an override for
+ * @return the override, or the input {@link PTransform} if no override is applicable
+ */
+ <InputT extends PInput, OutputT extends POutput> PTransform<InputT, OutputT> override(
+ PTransform<InputT, OutputT> transform);
+}