You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by da...@apache.org on 2016/06/20 22:16:13 UTC
[20/50] [abbrv] incubator-beam git commit: Rename
InProcessPipelineRunner to DirectRunner
Rename InProcessPipelineRunner to DirectRunner
Completes BEAM-243
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/9400fc9a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/9400fc9a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/9400fc9a
Branch: refs/heads/python-sdk
Commit: 9400fc9a699f218a7948c21639428f5f00134ec5
Parents: d2ceaf5
Author: Thomas Groh <tg...@google.com>
Authored: Wed Jun 15 10:45:15 2016 -0700
Committer: Davor Bonaci <da...@google.com>
Committed: Mon Jun 20 15:14:29 2016 -0700
----------------------------------------------------------------------
runners/direct-java/pom.xml | 2 +-
.../direct/AbstractModelEnforcement.java | 2 +-
.../direct/BoundedReadEvaluatorFactory.java | 4 +-
.../beam/runners/direct/BundleFactory.java | 4 +-
.../beam/runners/direct/CommittedResult.java | 2 +-
.../beam/runners/direct/CompletionCallback.java | 2 +-
.../beam/runners/direct/DirectOptions.java | 101 +++++
.../beam/runners/direct/DirectRegistrar.java | 55 +++
.../beam/runners/direct/DirectRunner.java | 371 +++++++++++++++++++
.../direct/EncodabilityEnforcementFactory.java | 2 +-
.../direct/ExecutorServiceParallelExecutor.java | 2 +-
.../runners/direct/FlattenEvaluatorFactory.java | 6 +-
.../ImmutabilityCheckingBundleFactory.java | 4 +-
.../direct/ImmutabilityEnforcementFactory.java | 2 +-
.../direct/InMemoryWatermarkManager.java | 2 +-
.../runners/direct/InProcessBundleFactory.java | 4 +-
.../direct/InProcessBundleOutputManager.java | 6 +-
.../direct/InProcessEvaluationContext.java | 20 +-
.../direct/InProcessExecutionContext.java | 4 +-
.../beam/runners/direct/InProcessExecutor.java | 2 +-
...rocessGroupAlsoByWindowEvaluatorFactory.java | 4 +-
...InProcessGroupByKeyOnlyEvaluatorFactory.java | 6 +-
.../direct/InProcessPipelineOptions.java | 101 -----
.../runners/direct/InProcessPipelineRunner.java | 370 ------------------
.../beam/runners/direct/InProcessRegistrar.java | 55 ---
.../direct/InProcessTransformResult.java | 2 +-
.../beam/runners/direct/ModelEnforcement.java | 2 +-
.../runners/direct/ModelEnforcementFactory.java | 2 +-
.../runners/direct/ParDoInProcessEvaluator.java | 4 +-
.../direct/ParDoMultiEvaluatorFactory.java | 4 +-
.../direct/ParDoSingleEvaluatorFactory.java | 4 +-
.../direct/PassthroughTransformEvaluator.java | 2 +-
.../runners/direct/StepTransformResult.java | 2 +-
.../beam/runners/direct/TransformEvaluator.java | 2 +-
.../direct/TransformEvaluatorFactory.java | 2 +-
.../direct/TransformEvaluatorRegistry.java | 2 +-
.../beam/runners/direct/TransformExecutor.java | 2 +-
.../direct/UnboundedReadEvaluatorFactory.java | 4 +-
.../runners/direct/ViewEvaluatorFactory.java | 6 +-
.../runners/direct/WindowEvaluatorFactory.java | 6 +-
.../direct/AvroIOShardedWriteFactoryTest.java | 2 +-
.../direct/BoundedReadEvaluatorFactoryTest.java | 4 +-
.../runners/direct/CommittedResultTest.java | 12 +-
.../runners/direct/DirectRegistrarTest.java | 74 ++++
.../beam/runners/direct/DirectRunnerTest.java | 339 +++++++++++++++++
.../EncodabilityEnforcementFactoryTest.java | 2 +-
.../direct/FlattenEvaluatorFactoryTest.java | 4 +-
.../direct/GroupByKeyEvaluatorFactoryTest.java | 4 +-
.../ImmutabilityCheckingBundleFactoryTest.java | 4 +-
.../ImmutabilityEnforcementFactoryTest.java | 2 +-
.../direct/InMemoryWatermarkManagerTest.java | 4 +-
.../direct/InProcessBundleFactoryTest.java | 4 +-
.../direct/InProcessEvaluationContextTest.java | 10 +-
...ocessGroupByKeyOnlyEvaluatorFactoryTest.java | 4 +-
.../direct/InProcessPipelineRegistrarTest.java | 74 ----
.../direct/InProcessPipelineRunnerTest.java | 339 -----------------
.../direct/ParDoInProcessEvaluatorTest.java | 6 +-
.../direct/ParDoMultiEvaluatorFactoryTest.java | 4 +-
.../direct/ParDoSingleEvaluatorFactoryTest.java | 4 +-
.../direct/TextIOShardedWriteFactoryTest.java | 2 +-
.../runners/direct/TransformExecutorTest.java | 2 +-
.../UnboundedReadEvaluatorFactoryTest.java | 4 +-
.../direct/ViewEvaluatorFactoryTest.java | 4 +-
.../direct/WindowEvaluatorFactoryTest.java | 4 +-
.../translation/TransformTranslatorTest.java | 4 +-
.../beam/sdk/options/PipelineOptions.java | 2 +-
testing/travis/test_wordcount.sh | 4 +-
67 files changed, 1051 insertions(+), 1050 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9400fc9a/runners/direct-java/pom.xml
----------------------------------------------------------------------
diff --git a/runners/direct-java/pom.xml b/runners/direct-java/pom.xml
index 8667aee..3010757 100644
--- a/runners/direct-java/pom.xml
+++ b/runners/direct-java/pom.xml
@@ -102,7 +102,7 @@
<systemPropertyVariables>
<beamTestPipelineOptions>
[
- "--runner=org.apache.beam.runners.direct.InProcessPipelineRunner"
+ "--runner=DirectRunner"
]
</beamTestPipelineOptions>
</systemPropertyVariables>
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9400fc9a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/AbstractModelEnforcement.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/AbstractModelEnforcement.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/AbstractModelEnforcement.java
index 948beb6..2ae0275 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/AbstractModelEnforcement.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/AbstractModelEnforcement.java
@@ -17,7 +17,7 @@
*/
package org.apache.beam.runners.direct;
-import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
+import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
import org.apache.beam.sdk.util.WindowedValue;
/**
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9400fc9a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java
index f15d446..63d248a 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java
@@ -17,8 +17,8 @@
*/
package org.apache.beam.runners.direct;
-import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
-import org.apache.beam.runners.direct.InProcessPipelineRunner.UncommittedBundle;
+import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
+import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.BoundedSource.BoundedReader;
import org.apache.beam.sdk.io.Read.Bounded;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9400fc9a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BundleFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BundleFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BundleFactory.java
index a0511df..a546cfb 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BundleFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BundleFactory.java
@@ -18,8 +18,8 @@
package org.apache.beam.runners.direct;
import org.apache.beam.runners.direct.InProcessGroupByKey.InProcessGroupByKeyOnly;
-import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
-import org.apache.beam.runners.direct.InProcessPipelineRunner.UncommittedBundle;
+import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
+import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PCollection;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9400fc9a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CommittedResult.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CommittedResult.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CommittedResult.java
index 4a42e34..b241493 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CommittedResult.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CommittedResult.java
@@ -18,7 +18,7 @@
package org.apache.beam.runners.direct;
-import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
+import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
import org.apache.beam.sdk.transforms.AppliedPTransform;
import com.google.auto.value.AutoValue;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9400fc9a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CompletionCallback.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CompletionCallback.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CompletionCallback.java
index 7c2c068..8ee4b44 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CompletionCallback.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/CompletionCallback.java
@@ -17,7 +17,7 @@
*/
package org.apache.beam.runners.direct;
-import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
+import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
/**
* A callback for completing a bundle of input.
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9400fc9a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectOptions.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectOptions.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectOptions.java
new file mode 100644
index 0000000..3901c04
--- /dev/null
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectOptions.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.options.ApplicationNameOptions;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.Hidden;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.Validation.Required;
+import org.apache.beam.sdk.transforms.PTransform;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ * Options that can be used to configure the {@link org.apache.beam.runners.direct.DirectRunner}.
+ */
+public interface DirectOptions extends PipelineOptions, ApplicationNameOptions {
+ /**
+ * Gets the {@link ExecutorServiceFactory} to use to create instances of {@link ExecutorService}
+ * to execute {@link PTransform PTransforms}.
+ *
+ * <p>Note that {@link ExecutorService ExecutorServices} returned by the factory must ensure that
+ * it cannot enter a state in which it will not schedule additional pending work unless currently
+ * scheduled work completes, as this may cause the {@link Pipeline} to cease processing.
+ *
+ * <p>Defaults to a {@link FixedThreadPoolExecutorServiceFactory}, which produces instances of
+ * {@link Executors#newFixedThreadPool(int)}.
+ */
+ @JsonIgnore
+ @Required
+ @Hidden
+ @Default.InstanceFactory(FixedThreadPoolExecutorServiceFactory.class)
+ ExecutorServiceFactory getExecutorServiceFactory();
+
+ void setExecutorServiceFactory(ExecutorServiceFactory executorService);
+
+ /**
+ * Gets the {@link Clock} used by this pipeline. The clock is used in place of accessing the
+ * system time when time values are required by the evaluator.
+ */
+ @Default.InstanceFactory(NanosOffsetClock.Factory.class)
+ @JsonIgnore
+ @Required
+ @Hidden
+ @Description(
+ "The processing time source used by the pipeline. When the current time is "
+ + "needed by the evaluator, the result of clock#now() is used.")
+ Clock getClock();
+
+ void setClock(Clock clock);
+
+ @Default.Boolean(false)
+ @Description(
+ "If the pipeline should shut down producers which have reached the maximum "
+ + "representable watermark. If this is set to true, a pipeline in which all PTransforms "
+ + "have reached the maximum watermark will be shut down, even if there are unbounded "
+ + "sources that could produce additional (late) data. By default, if the pipeline "
+ + "contains any unbounded PCollections, it will run until explicitly shut down.")
+ boolean isShutdownUnboundedProducersWithMaxWatermark();
+
+ void setShutdownUnboundedProducersWithMaxWatermark(boolean shutdown);
+
+ @Default.Boolean(true)
+ @Description(
+ "If the pipeline should block awaiting completion of the pipeline. If set to true, "
+ + "a call to Pipeline#run() will block until all PTransforms are complete. Otherwise, "
+ + "the Pipeline will execute asynchronously. If set to false, the completion of the "
+ + "pipeline can be awaited on by use of DirectPipelineResult#awaitCompletion().")
+ boolean isBlockOnRun();
+
+ void setBlockOnRun(boolean b);
+
+ @Default.Boolean(true)
+ @Description(
+ "Controls whether the runner should ensure that all of the elements of every "
+ + "PCollection are not mutated. PTransforms are not permitted to mutate input elements "
+ + "at any point, or output elements after they are output.")
+ boolean isTestImmutability();
+
+ void setTestImmutability(boolean test);
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9400fc9a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRegistrar.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRegistrar.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRegistrar.java
new file mode 100644
index 0000000..eb027fa
--- /dev/null
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRegistrar.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsRegistrar;
+import org.apache.beam.sdk.runners.PipelineRunner;
+import org.apache.beam.sdk.runners.PipelineRunnerRegistrar;
+
+import com.google.auto.service.AutoService;
+import com.google.common.collect.ImmutableList;
+
+/**
+ * Contains the {@link PipelineRunnerRegistrar} and {@link PipelineOptionsRegistrar} for the
+ * {@link DirectRunner}.
+ */
+public class DirectRegistrar {
+ private DirectRegistrar() {}
+ /**
+ * Registers the {@link DirectRunner}.
+ */
+ @AutoService(PipelineRunnerRegistrar.class)
+ public static class InProcessRunner implements PipelineRunnerRegistrar {
+ @Override
+ public Iterable<Class<? extends PipelineRunner<?>>> getPipelineRunners() {
+ return ImmutableList.<Class<? extends PipelineRunner<?>>>of(DirectRunner.class);
+ }
+ }
+
+ /**
+ * Registers the {@link DirectOptions}.
+ */
+ @AutoService(PipelineOptionsRegistrar.class)
+ public static class InProcessOptions implements PipelineOptionsRegistrar {
+ @Override
+ public Iterable<Class<? extends PipelineOptions>> getPipelineOptions() {
+ return ImmutableList.<Class<? extends PipelineOptions>>of(DirectOptions.class);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9400fc9a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
new file mode 100644
index 0000000..2f5a0bc
--- /dev/null
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
@@ -0,0 +1,371 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.direct;
+
+import org.apache.beam.runners.direct.DirectRunner.DirectPipelineResult;
+import org.apache.beam.runners.direct.InProcessGroupByKey.InProcessGroupByKeyOnly;
+import org.apache.beam.runners.direct.ViewEvaluatorFactory.InProcessViewOverrideFactory;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.Pipeline.PipelineExecutionException;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.io.AvroIO;
+import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.runners.AggregatorPipelineExtractor;
+import org.apache.beam.sdk.runners.AggregatorRetrievalException;
+import org.apache.beam.sdk.runners.AggregatorValues;
+import org.apache.beam.sdk.runners.PipelineRunner;
+import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.transforms.AppliedPTransform;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.View.CreatePCollectionView;
+import org.apache.beam.sdk.util.MapAggregatorValues;
+import org.apache.beam.sdk.util.TimerInternals.TimerData;
+import org.apache.beam.sdk.util.UserCodeException;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.common.Counter;
+import org.apache.beam.sdk.util.common.CounterSet;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollection.IsBounded;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.PInput;
+import org.apache.beam.sdk.values.POutput;
+import org.apache.beam.sdk.values.PValue;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+
+import org.joda.time.Instant;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+
+/**
+ * An In-Memory implementation of the Dataflow Programming Model. Supports Unbounded
+ * {@link PCollection PCollections}.
+ */
+@Experimental
+public class DirectRunner
+ extends PipelineRunner<DirectPipelineResult> {
+ /**
+ * The default set of transform overrides to use in the {@link DirectRunner}.
+ *
+ * <p>A transform override must have a single-argument constructor that takes an instance of the
+ * type of transform it is overriding.
+ */
+ @SuppressWarnings("rawtypes")
+ private static Map<Class<? extends PTransform>, PTransformOverrideFactory>
+ defaultTransformOverrides =
+ ImmutableMap.<Class<? extends PTransform>, PTransformOverrideFactory>builder()
+ .put(GroupByKey.class, new InProcessGroupByKeyOverrideFactory())
+ .put(CreatePCollectionView.class, new InProcessViewOverrideFactory())
+ .put(AvroIO.Write.Bound.class, new AvroIOShardedWriteFactory())
+ .put(TextIO.Write.Bound.class, new TextIOShardedWriteFactory())
+ .build();
+
+ /**
+ * Part of a {@link PCollection}. Elements are output to a bundle, which will cause them to be
+ * executed by {@link PTransform PTransforms} that consume the {@link PCollection} this bundle is
+ * a part of at a later point. This is an uncommitted bundle and can have elements added to it.
+ *
+ * @param <T> the type of elements that can be added to this bundle
+ */
+ public static interface UncommittedBundle<T> {
+ /**
+ * Returns the PCollection that the elements of this {@link UncommittedBundle} belong to.
+ */
+ PCollection<T> getPCollection();
+
+ /**
+ * Outputs an element to this bundle.
+ *
+ * @param element the element to add to this bundle
+ * @return this bundle
+ */
+ UncommittedBundle<T> add(WindowedValue<T> element);
+
+ /**
+ * Commits this {@link UncommittedBundle}, returning an immutable {@link CommittedBundle}
+ * containing all of the elements that were added to it. The {@link #add(WindowedValue)} method
+ * will throw an {@link IllegalStateException} if called after a call to commit.
+ * @param synchronizedProcessingTime the synchronized processing time at which this bundle was
+ * committed
+ */
+ CommittedBundle<T> commit(Instant synchronizedProcessingTime);
+ }
+
+ /**
+ * Part of a {@link PCollection}. Elements are output to an {@link UncommittedBundle}, which will
+ * eventually be committed. Committed elements are executed by the {@link PTransform PTransforms}
+ * that consume the {@link PCollection} this bundle is
+ * a part of at a later point.
+ * @param <T> the type of elements contained within this bundle
+ */
+ public static interface CommittedBundle<T> {
+ /**
+ * Returns the PCollection that the elements of this bundle belong to.
+ */
+ PCollection<T> getPCollection();
+
+ /**
+ * Returns the key that was output in the most recent {@link GroupByKey} in the
+ * execution of this bundle.
+ */
+ StructuralKey<?> getKey();
+
+ /**
+ * Returns an {@link Iterable} containing all of the elements that have been added to this
+ * {@link CommittedBundle}.
+ */
+ Iterable<WindowedValue<T>> getElements();
+
+ /**
+ * Returns the processing time output watermark at the time the producing {@link PTransform}
+ * committed this bundle. Downstream synchronized processing time watermarks cannot progress
+ * past this point before consuming this bundle.
+ *
+ * <p>This value is no greater than the earliest incomplete processing time or synchronized
+ * processing time {@link TimerData timer} at the time this bundle was committed, including any
+ * timers that fired to produce this bundle.
+ */
+ Instant getSynchronizedProcessingOutputWatermark();
+
+ /**
+ * Return a new {@link CommittedBundle} that is like this one, except calls to
+ * {@link #getElements()} will return the provided elements. This bundle is unchanged.
+ *
+ * <p>
+ * The value of the {@link #getSynchronizedProcessingOutputWatermark() synchronized processing
+ * output watermark} of the returned {@link CommittedBundle} is equal to the value returned from
+ * the current bundle. This is used to ensure a {@link PTransform} that could not complete
+ * processing on input elements properly holds the synchronized processing time to the
+ * appropriate value.
+ */
+ CommittedBundle<T> withElements(Iterable<WindowedValue<T>> elements);
+ }
+
+ /**
+ * A {@link PCollectionViewWriter} is responsible for writing contents of a {@link PCollection} to
+ * a storage mechanism that can be read from while constructing a {@link PCollectionView}.
+ * @param <ElemT> the type of elements the input {@link PCollection} contains.
+ * @param <ViewT> the type of the PCollectionView this writer writes to.
+ */
+ public static interface PCollectionViewWriter<ElemT, ViewT> {
+ void add(Iterable<WindowedValue<ElemT>> values);
+ }
+
+ ////////////////////////////////////////////////////////////////////////////////////////////////
+ private final DirectOptions options;
+
+ public static DirectRunner fromOptions(PipelineOptions options) {
+ return new DirectRunner(options.as(DirectOptions.class));
+ }
+
+ private DirectRunner(DirectOptions options) {
+ this.options = options;
+ }
+
+ /**
+ * Returns the {@link PipelineOptions} used to create this {@link DirectRunner}.
+ */
+ public DirectOptions getPipelineOptions() {
+ return options;
+ }
+
+ @Override
+ public <OutputT extends POutput, InputT extends PInput> OutputT apply(
+ PTransform<InputT, OutputT> transform, InputT input) {
+ PTransformOverrideFactory overrideFactory = defaultTransformOverrides.get(transform.getClass());
+ if (overrideFactory != null) {
+ PTransform<InputT, OutputT> customTransform = overrideFactory.override(transform);
+
+ return super.apply(customTransform, input);
+ }
+ // If there is no override, or we should not apply the override, apply the original transform
+ return super.apply(transform, input);
+ }
+
+ @Override
+ public DirectPipelineResult run(Pipeline pipeline) {
+ ConsumerTrackingPipelineVisitor consumerTrackingVisitor = new ConsumerTrackingPipelineVisitor();
+ pipeline.traverseTopologically(consumerTrackingVisitor);
+ for (PValue unfinalized : consumerTrackingVisitor.getUnfinalizedPValues()) {
+ unfinalized.finishSpecifying();
+ }
+ @SuppressWarnings("rawtypes")
+ KeyedPValueTrackingVisitor keyedPValueVisitor =
+ KeyedPValueTrackingVisitor.create(
+ ImmutableSet.<Class<? extends PTransform>>of(
+ GroupByKey.class, InProcessGroupByKeyOnly.class));
+ pipeline.traverseTopologically(keyedPValueVisitor);
+
+ DisplayDataValidator.validatePipeline(pipeline);
+
+ InProcessEvaluationContext context =
+ InProcessEvaluationContext.create(
+ getPipelineOptions(),
+ createBundleFactory(getPipelineOptions()),
+ consumerTrackingVisitor.getRootTransforms(),
+ consumerTrackingVisitor.getValueToConsumers(),
+ consumerTrackingVisitor.getStepNames(),
+ consumerTrackingVisitor.getViews());
+
+ // independent executor service for each run
+ ExecutorService executorService =
+ context.getPipelineOptions().getExecutorServiceFactory().create();
+ InProcessExecutor executor =
+ ExecutorServiceParallelExecutor.create(
+ executorService,
+ consumerTrackingVisitor.getValueToConsumers(),
+ keyedPValueVisitor.getKeyedPValues(),
+ TransformEvaluatorRegistry.defaultRegistry(),
+ defaultModelEnforcements(options),
+ context);
+ executor.start(consumerTrackingVisitor.getRootTransforms());
+
+ Map<Aggregator<?, ?>, Collection<PTransform<?, ?>>> aggregatorSteps =
+ new AggregatorPipelineExtractor(pipeline).getAggregatorSteps();
+ DirectPipelineResult result =
+ new DirectPipelineResult(executor, context, aggregatorSteps);
+ if (options.isBlockOnRun()) {
+ try {
+ result.awaitCompletion();
+ } catch (UserCodeException userException) {
+ throw new PipelineExecutionException(userException.getCause());
+ } catch (Throwable t) {
+ if (t instanceof RuntimeException) {
+ throw (RuntimeException) t;
+ }
+ throw new RuntimeException(t);
+ }
+ }
+ return result;
+ }
+
+ private Map<Class<? extends PTransform>, Collection<ModelEnforcementFactory>>
+ defaultModelEnforcements(DirectOptions options) {
+ ImmutableMap.Builder<Class<? extends PTransform>, Collection<ModelEnforcementFactory>>
+ enforcements = ImmutableMap.builder();
+ Collection<ModelEnforcementFactory> parDoEnforcements = createParDoEnforcements(options);
+ enforcements.put(ParDo.Bound.class, parDoEnforcements);
+ enforcements.put(ParDo.BoundMulti.class, parDoEnforcements);
+ return enforcements.build();
+ }
+
+ private Collection<ModelEnforcementFactory> createParDoEnforcements(
+ DirectOptions options) {
+ ImmutableList.Builder<ModelEnforcementFactory> enforcements = ImmutableList.builder();
+ if (options.isTestImmutability()) {
+ enforcements.add(ImmutabilityEnforcementFactory.create());
+ }
+ return enforcements.build();
+ }
+
+ private BundleFactory createBundleFactory(DirectOptions pipelineOptions) {
+ BundleFactory bundleFactory = InProcessBundleFactory.create();
+ if (pipelineOptions.isTestImmutability()) {
+ bundleFactory = ImmutabilityCheckingBundleFactory.create(bundleFactory);
+ }
+ return bundleFactory;
+ }
+
+ /**
+ * The result of running a {@link Pipeline} with the {@link DirectRunner}.
+ *
+ * Provides the pipeline state, aggregator values, and {@link #awaitCompletion()}.
+ */
+ public static class DirectPipelineResult implements PipelineResult {
+ private final InProcessExecutor executor;
+ private final InProcessEvaluationContext evaluationContext;
+ private final Map<Aggregator<?, ?>, Collection<PTransform<?, ?>>> aggregatorSteps;
+ private State state;
+
+ private DirectPipelineResult(
+ InProcessExecutor executor,
+ InProcessEvaluationContext evaluationContext,
+ Map<Aggregator<?, ?>, Collection<PTransform<?, ?>>> aggregatorSteps) {
+ this.executor = executor;
+ this.evaluationContext = evaluationContext;
+ this.aggregatorSteps = aggregatorSteps;
+ // Only ever constructed after the executor has started.
+ this.state = State.RUNNING;
+ }
+
+ @Override
+ public State getState() {
+ return state;
+ }
+
+ @Override
+ public <T> AggregatorValues<T> getAggregatorValues(Aggregator<?, T> aggregator)
+ throws AggregatorRetrievalException {
+ CounterSet counters = evaluationContext.getCounters();
+ Collection<PTransform<?, ?>> steps = aggregatorSteps.get(aggregator);
+ Map<String, T> stepValues = new HashMap<>();
+ for (AppliedPTransform<?, ?, ?> transform : evaluationContext.getSteps()) {
+ if (steps.contains(transform.getTransform())) {
+ String stepName =
+ String.format(
+ "user-%s-%s", evaluationContext.getStepName(transform), aggregator.getName());
+ Counter<T> counter = (Counter<T>) counters.getExistingCounter(stepName);
+ if (counter != null) {
+ stepValues.put(transform.getFullName(), counter.getAggregate());
+ }
+ }
+ }
+ return new MapAggregatorValues<>(stepValues);
+ }
+
+ /**
+ * Blocks until the {@link Pipeline} execution represented by this
+ * {@link DirectPipelineResult} is complete, returning the terminal state.
+ *
+ * <p>If the pipeline terminates abnormally by throwing an exception, this will rethrow the
+ * exception. Future calls to {@link #getState()} will return
+ * {@link org.apache.beam.sdk.PipelineResult.State#FAILED}.
+ *
+ * <p>NOTE: if the {@link Pipeline} contains an {@link IsBounded#UNBOUNDED unbounded}
+ * {@link PCollection}, and the {@link PipelineRunner} was created with
+ * {@link DirectOptions#isShutdownUnboundedProducersWithMaxWatermark()} set to false,
+ * this method will never return.
+ *
+ * See also {@link InProcessExecutor#awaitCompletion()}.
+ */
+ public State awaitCompletion() throws Throwable {
+ if (!state.isTerminal()) {
+ try {
+ executor.awaitCompletion();
+ state = State.DONE;
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw e;
+ } catch (Throwable t) {
+ state = State.FAILED;
+ throw t;
+ }
+ }
+ return state;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9400fc9a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EncodabilityEnforcementFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EncodabilityEnforcementFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EncodabilityEnforcementFactory.java
index ccf4c2b..bed61ec 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EncodabilityEnforcementFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EncodabilityEnforcementFactory.java
@@ -19,7 +19,7 @@ package org.apache.beam.runners.direct;
import static com.google.common.base.Preconditions.checkArgument;
-import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
+import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.util.CoderUtils;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9400fc9a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
index 980d764..14570a5 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
@@ -18,7 +18,7 @@
package org.apache.beam.runners.direct;
import org.apache.beam.runners.direct.InMemoryWatermarkManager.FiredTimers;
-import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
+import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.transforms.PTransform;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9400fc9a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FlattenEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FlattenEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FlattenEvaluatorFactory.java
index 2efaad3..bbe8787 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FlattenEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FlattenEvaluatorFactory.java
@@ -17,8 +17,8 @@
*/
package org.apache.beam.runners.direct;
-import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
-import org.apache.beam.runners.direct.InProcessPipelineRunner.UncommittedBundle;
+import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
+import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.Flatten.FlattenPCollectionList;
@@ -28,7 +28,7 @@ import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
/**
- * The {@link InProcessPipelineRunner} {@link TransformEvaluatorFactory} for the {@link Flatten}
+ * The {@link DirectRunner} {@link TransformEvaluatorFactory} for the {@link Flatten}
* {@link PTransform}.
*/
class FlattenEvaluatorFactory implements TransformEvaluatorFactory {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9400fc9a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactory.java
index 2a965ed..dcbe3d1 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactory.java
@@ -19,8 +19,8 @@ package org.apache.beam.runners.direct;
import static com.google.common.base.Preconditions.checkNotNull;
-import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
-import org.apache.beam.runners.direct.InProcessPipelineRunner.UncommittedBundle;
+import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
+import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.transforms.DoFn;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9400fc9a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactory.java
index bfecc9d..d121442 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityEnforcementFactory.java
@@ -17,7 +17,7 @@
*/
package org.apache.beam.runners.direct;
-import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
+import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.transforms.AppliedPTransform;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9400fc9a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InMemoryWatermarkManager.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InMemoryWatermarkManager.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InMemoryWatermarkManager.java
index 95095fa..fb8eb7c 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InMemoryWatermarkManager.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InMemoryWatermarkManager.java
@@ -17,7 +17,7 @@
*/
package org.apache.beam.runners.direct;
-import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
+import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.transforms.PTransform;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9400fc9a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessBundleFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessBundleFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessBundleFactory.java
index 52bc575..0c7449c 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessBundleFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessBundleFactory.java
@@ -19,8 +19,8 @@ package org.apache.beam.runners.direct;
import static com.google.common.base.Preconditions.checkState;
-import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
-import org.apache.beam.runners.direct.InProcessPipelineRunner.UncommittedBundle;
+import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
+import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.PCollection;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9400fc9a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessBundleOutputManager.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessBundleOutputManager.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessBundleOutputManager.java
index f374f99..bd07040 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessBundleOutputManager.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessBundleOutputManager.java
@@ -17,8 +17,8 @@
*/
package org.apache.beam.runners.direct;
-import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
-import org.apache.beam.runners.direct.InProcessPipelineRunner.UncommittedBundle;
+import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
+import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
import org.apache.beam.sdk.util.DoFnRunners.OutputManager;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.TupleTag;
@@ -27,7 +27,7 @@ import java.util.Map;
/**
* An {@link OutputManager} that outputs to {@link CommittedBundle Bundles} used by the
- * {@link InProcessPipelineRunner}.
+ * {@link DirectRunner}.
*/
public class InProcessBundleOutputManager implements OutputManager {
private final Map<TupleTag<?>, UncommittedBundle<?>> bundles;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9400fc9a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessEvaluationContext.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessEvaluationContext.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessEvaluationContext.java
index db8baa0..220ff83 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessEvaluationContext.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessEvaluationContext.java
@@ -22,9 +22,9 @@ import static com.google.common.base.Preconditions.checkNotNull;
import org.apache.beam.runners.direct.InMemoryWatermarkManager.FiredTimers;
import org.apache.beam.runners.direct.InMemoryWatermarkManager.TransformWatermarks;
import org.apache.beam.runners.direct.InProcessGroupByKey.InProcessGroupByKeyOnly;
-import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
-import org.apache.beam.runners.direct.InProcessPipelineRunner.PCollectionViewWriter;
-import org.apache.beam.runners.direct.InProcessPipelineRunner.UncommittedBundle;
+import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
+import org.apache.beam.runners.direct.DirectRunner.PCollectionViewWriter;
+import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.transforms.PTransform;
@@ -58,11 +58,11 @@ import javax.annotation.Nullable;
/**
* The evaluation context for a specific pipeline being executed by the
- * {@link InProcessPipelineRunner}. Contains state shared within the execution across all
+ * {@link DirectRunner}. Contains state shared within the execution across all
* transforms.
*
* <p>{@link InProcessEvaluationContext} contains shared state for an execution of the
- * {@link InProcessPipelineRunner} that can be used while evaluating a {@link PTransform}. This
+ * {@link DirectRunner} that can be used while evaluating a {@link PTransform}. This
* consists of views into underlying state and watermark implementations, access to read and write
* {@link PCollectionView PCollectionViews}, and constructing {@link CounterSet CounterSets} and
* {@link ExecutionContext ExecutionContexts}. This includes executing callbacks asynchronously when
@@ -79,7 +79,7 @@ class InProcessEvaluationContext {
private final Map<AppliedPTransform<?, ?, ?>, String> stepNames;
/** The options that were used to create this {@link Pipeline}. */
- private final InProcessPipelineOptions options;
+ private final DirectOptions options;
private final BundleFactory bundleFactory;
/** The current processing time and event time watermarks and timers. */
@@ -97,7 +97,7 @@ class InProcessEvaluationContext {
private final CounterSet mergedCounters;
public static InProcessEvaluationContext create(
- InProcessPipelineOptions options,
+ DirectOptions options,
BundleFactory bundleFactory,
Collection<AppliedPTransform<?, ?, ?>> rootTransforms,
Map<PValue, Collection<AppliedPTransform<?, ?, ?>>> valueToConsumers,
@@ -108,7 +108,7 @@ class InProcessEvaluationContext {
}
private InProcessEvaluationContext(
- InProcessPipelineOptions options,
+ DirectOptions options,
BundleFactory bundleFactory,
Collection<AppliedPTransform<?, ?, ?>> rootTransforms,
Map<PValue, Collection<AppliedPTransform<?, ?, ?>>> valueToConsumers,
@@ -295,7 +295,7 @@ class InProcessEvaluationContext {
/**
* Get the options used by this {@link Pipeline}.
*/
- public InProcessPipelineOptions getPipelineOptions() {
+ public DirectOptions getPipelineOptions() {
return options;
}
@@ -389,7 +389,7 @@ class InProcessEvaluationContext {
*
* <p>If the provided transform produces any {@link IsBounded#UNBOUNDED}
* {@link PCollection PCollections}, returns the value of
- * {@link InProcessPipelineOptions#isShutdownUnboundedProducersWithMaxWatermark()}.
+ * {@link DirectOptions#isShutdownUnboundedProducersWithMaxWatermark()}.
*/
public boolean isDone(AppliedPTransform<?, ?, ?> transform) {
// if the PTransform's watermark isn't at the max value, it isn't done
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9400fc9a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessExecutionContext.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessExecutionContext.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessExecutionContext.java
index 4f10b3a..d2558ce 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessExecutionContext.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessExecutionContext.java
@@ -25,7 +25,7 @@ import org.apache.beam.sdk.util.TimerInternals;
import org.apache.beam.sdk.util.state.CopyOnAccessInMemoryStateInternals;
/**
- * Execution Context for the {@link InProcessPipelineRunner}.
+ * Execution Context for the {@link DirectRunner}.
*
* This implementation is not thread safe. A new {@link InProcessExecutionContext} must be created
* for each thread that requires it.
@@ -51,7 +51,7 @@ class InProcessExecutionContext
}
/**
- * Step Context for the {@link InProcessPipelineRunner}.
+ * Step Context for the {@link DirectRunner}.
*/
public class InProcessStepContext
extends org.apache.beam.sdk.util.BaseExecutionContext.StepContext {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9400fc9a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessExecutor.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessExecutor.java
index d811e1b..1cfa544 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessExecutor.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessExecutor.java
@@ -17,7 +17,7 @@
*/
package org.apache.beam.runners.direct;
-import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
+import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.transforms.PTransform;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9400fc9a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessGroupAlsoByWindowEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessGroupAlsoByWindowEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessGroupAlsoByWindowEvaluatorFactory.java
index 5ded8b6..53b93d0 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessGroupAlsoByWindowEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessGroupAlsoByWindowEvaluatorFactory.java
@@ -19,7 +19,7 @@ package org.apache.beam.runners.direct;
import org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetDoFn;
import org.apache.beam.runners.direct.InProcessGroupByKey.InProcessGroupAlsoByWindow;
-import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
+import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.transforms.DoFn;
@@ -41,7 +41,7 @@ import com.google.common.collect.ImmutableMap;
import java.util.Collections;
/**
- * The {@link InProcessPipelineRunner} {@link TransformEvaluatorFactory} for the
+ * The {@link DirectRunner} {@link TransformEvaluatorFactory} for the
* {@link InProcessGroupAlsoByWindow} {@link PTransform}.
*/
class InProcessGroupAlsoByWindowEvaluatorFactory implements TransformEvaluatorFactory {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9400fc9a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessGroupByKeyOnlyEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessGroupByKeyOnlyEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessGroupByKeyOnlyEvaluatorFactory.java
index a10d496..3604bca 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessGroupByKeyOnlyEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessGroupByKeyOnlyEvaluatorFactory.java
@@ -22,8 +22,8 @@ import static org.apache.beam.sdk.util.CoderUtils.encodeToByteArray;
import static com.google.common.base.Preconditions.checkState;
import org.apache.beam.runners.direct.InProcessGroupByKey.InProcessGroupByKeyOnly;
-import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
-import org.apache.beam.runners.direct.InProcessPipelineRunner.UncommittedBundle;
+import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
+import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
import org.apache.beam.runners.direct.StepTransformResult.Builder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
@@ -45,7 +45,7 @@ import java.util.List;
import java.util.Map;
/**
- * The {@link InProcessPipelineRunner} {@link TransformEvaluatorFactory} for the
+ * The {@link DirectRunner} {@link TransformEvaluatorFactory} for the
* {@link GroupByKeyOnly} {@link PTransform}.
*/
class InProcessGroupByKeyOnlyEvaluatorFactory implements TransformEvaluatorFactory {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9400fc9a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessPipelineOptions.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessPipelineOptions.java
deleted file mode 100644
index 0498521..0000000
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessPipelineOptions.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.direct;
-
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.options.ApplicationNameOptions;
-import org.apache.beam.sdk.options.Default;
-import org.apache.beam.sdk.options.Description;
-import org.apache.beam.sdk.options.Hidden;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.Validation.Required;
-import org.apache.beam.sdk.transforms.PTransform;
-
-import com.fasterxml.jackson.annotation.JsonIgnore;
-
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
-/**
- * Options that can be used to configure the {@link InProcessPipelineRunner}.
- */
-public interface InProcessPipelineOptions extends PipelineOptions, ApplicationNameOptions {
- /**
- * Gets the {@link ExecutorServiceFactory} to use to create instances of {@link ExecutorService}
- * to execute {@link PTransform PTransforms}.
- *
- * <p>Note that {@link ExecutorService ExecutorServices} returned by the factory must ensure that
- * it cannot enter a state in which it will not schedule additional pending work unless currently
- * scheduled work completes, as this may cause the {@link Pipeline} to cease processing.
- *
- * <p>Defaults to a {@link FixedThreadPoolExecutorServiceFactory}, which produces instances of
- * {@link Executors#newCachedThreadPool()}.
- */
- @JsonIgnore
- @Required
- @Hidden
- @Default.InstanceFactory(FixedThreadPoolExecutorServiceFactory.class)
- ExecutorServiceFactory getExecutorServiceFactory();
-
- void setExecutorServiceFactory(ExecutorServiceFactory executorService);
-
- /**
- * Gets the {@link Clock} used by this pipeline. The clock is used in place of accessing the
- * system time when time values are required by the evaluator.
- */
- @Default.InstanceFactory(NanosOffsetClock.Factory.class)
- @JsonIgnore
- @Required
- @Hidden
- @Description(
- "The processing time source used by the pipeline. When the current time is "
- + "needed by the evaluator, the result of clock#now() is used.")
- Clock getClock();
-
- void setClock(Clock clock);
-
- @Default.Boolean(false)
- @Description(
- "If the pipeline should shut down producers which have reached the maximum "
- + "representable watermark. If this is set to true, a pipeline in which all PTransforms "
- + "have reached the maximum watermark will be shut down, even if there are unbounded "
- + "sources that could produce additional (late) data. By default, if the pipeline "
- + "contains any unbounded PCollections, it will run until explicitly shut down.")
- boolean isShutdownUnboundedProducersWithMaxWatermark();
-
- void setShutdownUnboundedProducersWithMaxWatermark(boolean shutdown);
-
- @Default.Boolean(true)
- @Description(
- "If the pipeline should block awaiting completion of the pipeline. If set to true, "
- + "a call to Pipeline#run() will block until all PTransforms are complete. Otherwise, "
- + "the Pipeline will execute asynchronously. If set to false, the completion of the "
- + "pipeline can be awaited on by use of InProcessPipelineResult#awaitCompletion().")
- boolean isBlockOnRun();
-
- void setBlockOnRun(boolean b);
-
- @Default.Boolean(true)
- @Description(
- "Controls whether the runner should ensure that all of the elements of every "
- + "PCollection are not mutated. PTransforms are not permitted to mutate input elements "
- + "at any point, or output elements after they are output.")
- boolean isTestImmutability();
-
- void setTestImmutability(boolean test);
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9400fc9a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessPipelineRunner.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessPipelineRunner.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessPipelineRunner.java
deleted file mode 100644
index 8847c58..0000000
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessPipelineRunner.java
+++ /dev/null
@@ -1,370 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.direct;
-
-import org.apache.beam.runners.direct.InProcessGroupByKey.InProcessGroupByKeyOnly;
-import org.apache.beam.runners.direct.ViewEvaluatorFactory.InProcessViewOverrideFactory;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.Pipeline.PipelineExecutionException;
-import org.apache.beam.sdk.PipelineResult;
-import org.apache.beam.sdk.annotations.Experimental;
-import org.apache.beam.sdk.io.AvroIO;
-import org.apache.beam.sdk.io.TextIO;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.runners.AggregatorPipelineExtractor;
-import org.apache.beam.sdk.runners.AggregatorRetrievalException;
-import org.apache.beam.sdk.runners.AggregatorValues;
-import org.apache.beam.sdk.runners.PipelineRunner;
-import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.AppliedPTransform;
-import org.apache.beam.sdk.transforms.GroupByKey;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.View.CreatePCollectionView;
-import org.apache.beam.sdk.util.MapAggregatorValues;
-import org.apache.beam.sdk.util.TimerInternals.TimerData;
-import org.apache.beam.sdk.util.UserCodeException;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.util.common.Counter;
-import org.apache.beam.sdk.util.common.CounterSet;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollection.IsBounded;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.PInput;
-import org.apache.beam.sdk.values.POutput;
-import org.apache.beam.sdk.values.PValue;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
-
-import org.joda.time.Instant;
-
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.ExecutorService;
-
-/**
- * An In-Memory implementation of the Dataflow Programming Model. Supports Unbounded
- * {@link PCollection PCollections}.
- */
-@Experimental
-public class InProcessPipelineRunner
- extends PipelineRunner<InProcessPipelineRunner.InProcessPipelineResult> {
- /**
- * The default set of transform overrides to use in the {@link InProcessPipelineRunner}.
- *
- * <p>A transform override must have a single-argument constructor that takes an instance of the
- * type of transform it is overriding.
- */
- @SuppressWarnings("rawtypes")
- private static Map<Class<? extends PTransform>, PTransformOverrideFactory>
- defaultTransformOverrides =
- ImmutableMap.<Class<? extends PTransform>, PTransformOverrideFactory>builder()
- .put(GroupByKey.class, new InProcessGroupByKeyOverrideFactory())
- .put(CreatePCollectionView.class, new InProcessViewOverrideFactory())
- .put(AvroIO.Write.Bound.class, new AvroIOShardedWriteFactory())
- .put(TextIO.Write.Bound.class, new TextIOShardedWriteFactory())
- .build();
-
- /**
- * Part of a {@link PCollection}. Elements are output to a bundle, which will cause them to be
- * executed by {@link PTransform PTransforms} that consume the {@link PCollection} this bundle is
- * a part of at a later point. This is an uncommitted bundle and can have elements added to it.
- *
- * @param <T> the type of elements that can be added to this bundle
- */
- public static interface UncommittedBundle<T> {
- /**
- * Returns the PCollection that the elements of this {@link UncommittedBundle} belong to.
- */
- PCollection<T> getPCollection();
-
- /**
- * Outputs an element to this bundle.
- *
- * @param element the element to add to this bundle
- * @return this bundle
- */
- UncommittedBundle<T> add(WindowedValue<T> element);
-
- /**
- * Commits this {@link UncommittedBundle}, returning an immutable {@link CommittedBundle}
- * containing all of the elements that were added to it. The {@link #add(WindowedValue)} method
- * will throw an {@link IllegalStateException} if called after a call to commit.
- * @param synchronizedProcessingTime the synchronized processing time at which this bundle was
- * committed
- */
- CommittedBundle<T> commit(Instant synchronizedProcessingTime);
- }
-
- /**
- * Part of a {@link PCollection}. Elements are output to an {@link UncommittedBundle}, which will
- * eventually be committed. Committed elements are executed by the {@link PTransform PTransforms}
- * that consume the {@link PCollection} this bundle is
- * a part of at a later point.
- * @param <T> the type of elements contained within this bundle
- */
- public static interface CommittedBundle<T> {
- /**
- * Returns the PCollection that the elements of this bundle belong to.
- */
- PCollection<T> getPCollection();
-
- /**
- * Returns the key that was output in the most recent {@link GroupByKey} in the
- * execution of this bundle.
- */
- StructuralKey<?> getKey();
-
- /**
- * Returns an {@link Iterable} containing all of the elements that have been added to this
- * {@link CommittedBundle}.
- */
- Iterable<WindowedValue<T>> getElements();
-
- /**
- * Returns the processing time output watermark at the time the producing {@link PTransform}
- * committed this bundle. Downstream synchronized processing time watermarks cannot progress
- * past this point before consuming this bundle.
- *
- * <p>This value is no greater than the earliest incomplete processing time or synchronized
- * processing time {@link TimerData timer} at the time this bundle was committed, including any
- * timers that fired to produce this bundle.
- */
- Instant getSynchronizedProcessingOutputWatermark();
-
- /**
- * Return a new {@link CommittedBundle} that is like this one, except calls to
- * {@link #getElements()} will return the provided elements. This bundle is unchanged.
- *
- * <p>
- * The value of the {@link #getSynchronizedProcessingOutputWatermark() synchronized processing
- * output watermark} of the returned {@link CommittedBundle} is equal to the value returned from
- * the current bundle. This is used to ensure a {@link PTransform} that could not complete
- * processing on input elements properly holds the synchronized processing time to the
- * appropriate value.
- */
- CommittedBundle<T> withElements(Iterable<WindowedValue<T>> elements);
- }
-
- /**
- * A {@link PCollectionViewWriter} is responsible for writing contents of a {@link PCollection} to
- * a storage mechanism that can be read from while constructing a {@link PCollectionView}.
- * @param <ElemT> the type of elements the input {@link PCollection} contains.
- * @param <ViewT> the type of the PCollectionView this writer writes to.
- */
- public static interface PCollectionViewWriter<ElemT, ViewT> {
- void add(Iterable<WindowedValue<ElemT>> values);
- }
-
- ////////////////////////////////////////////////////////////////////////////////////////////////
- private final InProcessPipelineOptions options;
-
- public static InProcessPipelineRunner fromOptions(PipelineOptions options) {
- return new InProcessPipelineRunner(options.as(InProcessPipelineOptions.class));
- }
-
- private InProcessPipelineRunner(InProcessPipelineOptions options) {
- this.options = options;
- }
-
- /**
- * Returns the {@link PipelineOptions} used to create this {@link InProcessPipelineRunner}.
- */
- public InProcessPipelineOptions getPipelineOptions() {
- return options;
- }
-
- @Override
- public <OutputT extends POutput, InputT extends PInput> OutputT apply(
- PTransform<InputT, OutputT> transform, InputT input) {
- PTransformOverrideFactory overrideFactory = defaultTransformOverrides.get(transform.getClass());
- if (overrideFactory != null) {
- PTransform<InputT, OutputT> customTransform = overrideFactory.override(transform);
-
- return super.apply(customTransform, input);
- }
- // If there is no override, or we should not apply the override, apply the original transform
- return super.apply(transform, input);
- }
-
- @Override
- public InProcessPipelineResult run(Pipeline pipeline) {
- ConsumerTrackingPipelineVisitor consumerTrackingVisitor = new ConsumerTrackingPipelineVisitor();
- pipeline.traverseTopologically(consumerTrackingVisitor);
- for (PValue unfinalized : consumerTrackingVisitor.getUnfinalizedPValues()) {
- unfinalized.finishSpecifying();
- }
- @SuppressWarnings("rawtypes")
- KeyedPValueTrackingVisitor keyedPValueVisitor =
- KeyedPValueTrackingVisitor.create(
- ImmutableSet.<Class<? extends PTransform>>of(
- GroupByKey.class, InProcessGroupByKeyOnly.class));
- pipeline.traverseTopologically(keyedPValueVisitor);
-
- DisplayDataValidator.validatePipeline(pipeline);
-
- InProcessEvaluationContext context =
- InProcessEvaluationContext.create(
- getPipelineOptions(),
- createBundleFactory(getPipelineOptions()),
- consumerTrackingVisitor.getRootTransforms(),
- consumerTrackingVisitor.getValueToConsumers(),
- consumerTrackingVisitor.getStepNames(),
- consumerTrackingVisitor.getViews());
-
- // independent executor service for each run
- ExecutorService executorService =
- context.getPipelineOptions().getExecutorServiceFactory().create();
- InProcessExecutor executor =
- ExecutorServiceParallelExecutor.create(
- executorService,
- consumerTrackingVisitor.getValueToConsumers(),
- keyedPValueVisitor.getKeyedPValues(),
- TransformEvaluatorRegistry.defaultRegistry(),
- defaultModelEnforcements(options),
- context);
- executor.start(consumerTrackingVisitor.getRootTransforms());
-
- Map<Aggregator<?, ?>, Collection<PTransform<?, ?>>> aggregatorSteps =
- new AggregatorPipelineExtractor(pipeline).getAggregatorSteps();
- InProcessPipelineResult result =
- new InProcessPipelineResult(executor, context, aggregatorSteps);
- if (options.isBlockOnRun()) {
- try {
- result.awaitCompletion();
- } catch (UserCodeException userException) {
- throw new PipelineExecutionException(userException.getCause());
- } catch (Throwable t) {
- if (t instanceof RuntimeException) {
- throw (RuntimeException) t;
- }
- throw new RuntimeException(t);
- }
- }
- return result;
- }
-
- private Map<Class<? extends PTransform>, Collection<ModelEnforcementFactory>>
- defaultModelEnforcements(InProcessPipelineOptions options) {
- ImmutableMap.Builder<Class<? extends PTransform>, Collection<ModelEnforcementFactory>>
- enforcements = ImmutableMap.builder();
- Collection<ModelEnforcementFactory> parDoEnforcements = createParDoEnforcements(options);
- enforcements.put(ParDo.Bound.class, parDoEnforcements);
- enforcements.put(ParDo.BoundMulti.class, parDoEnforcements);
- return enforcements.build();
- }
-
- private Collection<ModelEnforcementFactory> createParDoEnforcements(
- InProcessPipelineOptions options) {
- ImmutableList.Builder<ModelEnforcementFactory> enforcements = ImmutableList.builder();
- if (options.isTestImmutability()) {
- enforcements.add(ImmutabilityEnforcementFactory.create());
- }
- return enforcements.build();
- }
-
- private BundleFactory createBundleFactory(InProcessPipelineOptions pipelineOptions) {
- BundleFactory bundleFactory = InProcessBundleFactory.create();
- if (pipelineOptions.isTestImmutability()) {
- bundleFactory = ImmutabilityCheckingBundleFactory.create(bundleFactory);
- }
- return bundleFactory;
- }
-
- /**
- * The result of running a {@link Pipeline} with the {@link InProcessPipelineRunner}.
- *
- * Throws {@link UnsupportedOperationException} for all methods.
- */
- public static class InProcessPipelineResult implements PipelineResult {
- private final InProcessExecutor executor;
- private final InProcessEvaluationContext evaluationContext;
- private final Map<Aggregator<?, ?>, Collection<PTransform<?, ?>>> aggregatorSteps;
- private State state;
-
- private InProcessPipelineResult(
- InProcessExecutor executor,
- InProcessEvaluationContext evaluationContext,
- Map<Aggregator<?, ?>, Collection<PTransform<?, ?>>> aggregatorSteps) {
- this.executor = executor;
- this.evaluationContext = evaluationContext;
- this.aggregatorSteps = aggregatorSteps;
- // Only ever constructed after the executor has started.
- this.state = State.RUNNING;
- }
-
- @Override
- public State getState() {
- return state;
- }
-
- @Override
- public <T> AggregatorValues<T> getAggregatorValues(Aggregator<?, T> aggregator)
- throws AggregatorRetrievalException {
- CounterSet counters = evaluationContext.getCounters();
- Collection<PTransform<?, ?>> steps = aggregatorSteps.get(aggregator);
- Map<String, T> stepValues = new HashMap<>();
- for (AppliedPTransform<?, ?, ?> transform : evaluationContext.getSteps()) {
- if (steps.contains(transform.getTransform())) {
- String stepName =
- String.format(
- "user-%s-%s", evaluationContext.getStepName(transform), aggregator.getName());
- Counter<T> counter = (Counter<T>) counters.getExistingCounter(stepName);
- if (counter != null) {
- stepValues.put(transform.getFullName(), counter.getAggregate());
- }
- }
- }
- return new MapAggregatorValues<>(stepValues);
- }
-
- /**
- * Blocks until the {@link Pipeline} execution represented by this
- * {@link InProcessPipelineResult} is complete, returning the terminal state.
- *
- * <p>If the pipeline terminates abnormally by throwing an exception, this will rethrow the
- * exception. Future calls to {@link #getState()} will return
- * {@link org.apache.beam.sdk.PipelineResult.State#FAILED}.
- *
- * <p>NOTE: if the {@link Pipeline} contains an {@link IsBounded#UNBOUNDED unbounded}
- * {@link PCollection}, and the {@link PipelineRunner} was created with
- * {@link InProcessPipelineOptions#isShutdownUnboundedProducersWithMaxWatermark()} set to false,
- * this method will never return.
- *
- * See also {@link InProcessExecutor#awaitCompletion()}.
- */
- public State awaitCompletion() throws Throwable {
- if (!state.isTerminal()) {
- try {
- executor.awaitCompletion();
- state = State.DONE;
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw e;
- } catch (Throwable t) {
- state = State.FAILED;
- throw t;
- }
- }
- return state;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9400fc9a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessRegistrar.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessRegistrar.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessRegistrar.java
deleted file mode 100644
index 4a09de7..0000000
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessRegistrar.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.direct;
-
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsRegistrar;
-import org.apache.beam.sdk.runners.PipelineRunner;
-import org.apache.beam.sdk.runners.PipelineRunnerRegistrar;
-
-import com.google.auto.service.AutoService;
-import com.google.common.collect.ImmutableList;
-
-/**
- * Contains the {@link PipelineRunnerRegistrar} and {@link PipelineOptionsRegistrar} for the
- * {@link InProcessPipelineRunner}.
- */
-public class InProcessRegistrar {
- private InProcessRegistrar() {}
- /**
- * Registers the {@link InProcessPipelineRunner}.
- */
- @AutoService(PipelineRunnerRegistrar.class)
- public static class InProcessRunner implements PipelineRunnerRegistrar {
- @Override
- public Iterable<Class<? extends PipelineRunner<?>>> getPipelineRunners() {
- return ImmutableList.<Class<? extends PipelineRunner<?>>>of(InProcessPipelineRunner.class);
- }
- }
-
- /**
- * Registers the {@link InProcessPipelineOptions}.
- */
- @AutoService(PipelineOptionsRegistrar.class)
- public static class InProcessOptions implements PipelineOptionsRegistrar {
- @Override
- public Iterable<Class<? extends PipelineOptions>> getPipelineOptions() {
- return ImmutableList.<Class<? extends PipelineOptions>>of(InProcessPipelineOptions.class);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9400fc9a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessTransformResult.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessTransformResult.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessTransformResult.java
index 0bc3ea1..92127b4 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessTransformResult.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessTransformResult.java
@@ -18,7 +18,7 @@
package org.apache.beam.runners.direct;
import org.apache.beam.runners.direct.InMemoryWatermarkManager.TimerUpdate;
-import org.apache.beam.runners.direct.InProcessPipelineRunner.UncommittedBundle;
+import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9400fc9a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ModelEnforcement.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ModelEnforcement.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ModelEnforcement.java
index cc9b6da..758ee24 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ModelEnforcement.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ModelEnforcement.java
@@ -17,7 +17,7 @@
*/
package org.apache.beam.runners.direct;
-import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
+import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.util.WindowedValue;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9400fc9a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ModelEnforcementFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ModelEnforcementFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ModelEnforcementFactory.java
index 6162ba0..e0bbfcb 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ModelEnforcementFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ModelEnforcementFactory.java
@@ -17,7 +17,7 @@
*/
package org.apache.beam.runners.direct;
-import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
+import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
import org.apache.beam.sdk.transforms.AppliedPTransform;
/**
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9400fc9a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoInProcessEvaluator.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoInProcessEvaluator.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoInProcessEvaluator.java
index a3e2f18..b9f4808 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoInProcessEvaluator.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoInProcessEvaluator.java
@@ -18,8 +18,8 @@
package org.apache.beam.runners.direct;
import org.apache.beam.runners.direct.InProcessExecutionContext.InProcessStepContext;
-import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
-import org.apache.beam.runners.direct.InProcessPipelineRunner.UncommittedBundle;
+import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
+import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.util.DoFnRunner;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9400fc9a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactory.java
index 8945242..58d6f00 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactory.java
@@ -17,7 +17,7 @@
*/
package org.apache.beam.runners.direct;
-import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
+import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
@@ -32,7 +32,7 @@ import com.google.common.cache.LoadingCache;
import java.util.Map;
/**
- * The {@link InProcessPipelineRunner} {@link TransformEvaluatorFactory} for the
+ * The {@link DirectRunner} {@link TransformEvaluatorFactory} for the
* {@link BoundMulti} primitive {@link PTransform}.
*/
class ParDoMultiEvaluatorFactory implements TransformEvaluatorFactory {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9400fc9a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactory.java
index 15704d7..afbb6ed 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactory.java
@@ -17,7 +17,7 @@
*/
package org.apache.beam.runners.direct;
-import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
+import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
@@ -32,7 +32,7 @@ import com.google.common.collect.ImmutableMap;
import java.util.Collections;
/**
- * The {@link InProcessPipelineRunner} {@link TransformEvaluatorFactory} for the
+ * The {@link DirectRunner} {@link TransformEvaluatorFactory} for the
* {@link Bound ParDo.Bound} primitive {@link PTransform}.
*/
class ParDoSingleEvaluatorFactory implements TransformEvaluatorFactory {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9400fc9a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PassthroughTransformEvaluator.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PassthroughTransformEvaluator.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PassthroughTransformEvaluator.java
index aef62b2..ba792d3 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PassthroughTransformEvaluator.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/PassthroughTransformEvaluator.java
@@ -17,7 +17,7 @@
*/
package org.apache.beam.runners.direct;
-import org.apache.beam.runners.direct.InProcessPipelineRunner.UncommittedBundle;
+import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.util.WindowedValue;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9400fc9a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java
index b2e3897..eacea91 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepTransformResult.java
@@ -20,7 +20,7 @@ package org.apache.beam.runners.direct;
import static com.google.common.base.Preconditions.checkNotNull;
import org.apache.beam.runners.direct.InMemoryWatermarkManager.TimerUpdate;
-import org.apache.beam.runners.direct.InProcessPipelineRunner.UncommittedBundle;
+import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
import org.apache.beam.sdk.transforms.AppliedPTransform;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.WindowedValue;