You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by da...@apache.org on 2016/06/20 22:15:55 UTC
[02/50] [abbrv] incubator-beam git commit: Remove the DirectPipelineRunner from the Core SDK
Remove the DirectPipelineRunner from the Core SDK
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/99654ca4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/99654ca4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/99654ca4
Branch: refs/heads/python-sdk
Commit: 99654ca4bed6758d7128d0f0ad376e8b479d4eba
Parents: 45e57e0
Author: Thomas Groh <tg...@google.com>
Authored: Tue Jun 14 17:52:49 2016 -0700
Committer: Davor Bonaci <da...@google.com>
Committed: Mon Jun 20 15:14:28 2016 -0700
----------------------------------------------------------------------
.../examples/common/DataflowExampleUtils.java | 11 +-
runners/spark/pom.xml | 6 +
.../translation/TransformTranslatorTest.java | 4 +-
.../java/org/apache/beam/sdk/io/PubsubIO.java | 1 -
.../main/java/org/apache/beam/sdk/io/Read.java | 44 -
.../java/org/apache/beam/sdk/io/TextIO.java | 1 -
.../beam/sdk/options/DirectPipelineOptions.java | 1 -
.../sdk/runners/DirectPipelineRegistrar.java | 55 -
.../beam/sdk/runners/DirectPipelineRunner.java | 1298 ------------------
.../org/apache/beam/sdk/transforms/Flatten.java | 32 -
.../org/apache/beam/sdk/transforms/ParDo.java | 302 +---
.../org/apache/beam/sdk/transforms/View.java | 24 -
.../sdk/util/DirectModeExecutionContext.java | 130 --
.../apache/beam/sdk/util/DoFnRunnerBase.java | 1 -
.../java/org/apache/beam/sdk/PipelineTest.java | 4 +-
.../io/BoundedReadFromUnboundedSourceTest.java | 6 -
.../runners/DirectPipelineRegistrarTest.java | 71 -
.../sdk/runners/DirectPipelineRunnerTest.java | 222 ---
.../beam/sdk/runners/PipelineRunnerTest.java | 9 +-
.../apache/beam/sdk/transforms/CombineTest.java | 21 -
.../beam/sdk/transforms/GroupByKeyTest.java | 13 +-
.../apache/beam/sdk/transforms/ViewTest.java | 29 +-
.../main/java/common/DataflowExampleUtils.java | 13 +-
testing/travis/test_wordcount.sh | 4 +-
24 files changed, 40 insertions(+), 2262 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/99654ca4/examples/java/src/main/java/org/apache/beam/examples/common/DataflowExampleUtils.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/common/DataflowExampleUtils.java b/examples/java/src/main/java/org/apache/beam/examples/common/DataflowExampleUtils.java
index fb4f3bf..a0b7319 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/common/DataflowExampleUtils.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/common/DataflowExampleUtils.java
@@ -17,6 +17,7 @@
*/
package org.apache.beam.examples.common;
+import org.apache.beam.runners.dataflow.BlockingDataflowPipelineRunner;
import org.apache.beam.runners.dataflow.DataflowPipelineJob;
import org.apache.beam.runners.dataflow.DataflowPipelineRunner;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
@@ -25,7 +26,7 @@ import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.BigQueryOptions;
-import org.apache.beam.sdk.runners.DirectPipelineRunner;
+import org.apache.beam.sdk.runners.PipelineRunner;
import org.apache.beam.sdk.transforms.IntraBundleParallelization;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.util.AttemptBoundedExponentialBackOff;
@@ -315,11 +316,13 @@ public class DataflowExampleUtils {
/**
* Do some runner setup: check that the DirectPipelineRunner is not used in conjunction with
- * streaming, and if streaming is specified, use the DataflowPipelineRunner. Return the streaming
- * flag value.
+ * streaming, and if streaming is specified, use the DataflowPipelineRunner.
*/
public void setupRunner() {
- if (options.isStreaming() && options.getRunner() != DirectPipelineRunner.class) {
+ Class<? extends PipelineRunner<?>> runner = options.getRunner();
+ if (options.isStreaming()
+ && (runner.equals(DataflowPipelineRunner.class)
+ || runner.equals(BlockingDataflowPipelineRunner.class))) {
// In order to cancel the pipelines automatically,
// {@literal DataflowPipelineRunner} is forced to be used.
options.setRunner(DataflowPipelineRunner.class);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/99654ca4/runners/spark/pom.xml
----------------------------------------------------------------------
diff --git a/runners/spark/pom.xml b/runners/spark/pom.xml
index 4110689..e7d0834 100644
--- a/runners/spark/pom.xml
+++ b/runners/spark/pom.xml
@@ -118,6 +118,12 @@
<artifactId>hamcrest-all</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.beam</groupId>
+ <artifactId>beam-runners-direct-java</artifactId>
+ <version>0.2.0-incubating-SNAPSHOT</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/99654ca4/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/TransformTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/TransformTranslatorTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/TransformTranslatorTest.java
index 4ef26d3..01f3070 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/TransformTranslatorTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/TransformTranslatorTest.java
@@ -21,12 +21,12 @@ package org.apache.beam.runners.spark.translation;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.junit.Assert.assertThat;
+import org.apache.beam.runners.direct.InProcessPipelineRunner;
import org.apache.beam.runners.spark.SparkPipelineRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.runners.DirectPipelineRunner;
import org.apache.beam.sdk.runners.PipelineRunner;
import org.apache.beam.sdk.values.PCollection;
@@ -58,7 +58,7 @@ public class TransformTranslatorTest {
*/
@Test
public void testTextIOReadAndWriteTransforms() throws IOException {
- String directOut = runPipeline(DirectPipelineRunner.class);
+ String directOut = runPipeline(InProcessPipelineRunner.class);
String sparkOut = runPipeline(SparkPipelineRunner.class);
List<String> directOutput =
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/99654ca4/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java
index 7e24253..2a5698c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java
@@ -23,7 +23,6 @@ import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.options.PubsubOptions;
-import org.apache.beam.sdk.runners.DirectPipelineRunner;
import org.apache.beam.sdk.runners.PipelineRunner;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/99654ca4/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
index fb40063..c0440f2 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
@@ -20,11 +20,9 @@ package org.apache.beam.sdk.io;
import static org.apache.beam.sdk.util.StringUtils.approximateSimpleName;
import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.runners.DirectPipelineRunner;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.util.SerializableUtils;
-import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollection.IsBounded;
@@ -32,9 +30,6 @@ import org.apache.beam.sdk.values.PInput;
import org.joda.time.Duration;
-import java.util.ArrayList;
-import java.util.List;
-
import javax.annotation.Nullable;
/**
@@ -153,45 +148,6 @@ public class Read {
.withLabel("Read Source"))
.include(source);
}
-
- static {
- registerDefaultTransformEvaluator();
- }
-
- @SuppressWarnings({"rawtypes", "unchecked"})
- private static void registerDefaultTransformEvaluator() {
- DirectPipelineRunner.registerDefaultTransformEvaluator(
- Bounded.class,
- new DirectPipelineRunner.TransformEvaluator<Bounded>() {
- @Override
- public void evaluate(
- Bounded transform, DirectPipelineRunner.EvaluationContext context) {
- evaluateReadHelper(transform, context);
- }
-
- private <T> void evaluateReadHelper(
- Read.Bounded<T> transform, DirectPipelineRunner.EvaluationContext context) {
- try {
- List<DirectPipelineRunner.ValueWithMetadata<T>> output = new ArrayList<>();
- BoundedSource<T> source = transform.getSource();
- try (BoundedSource.BoundedReader<T> reader =
- source.createReader(context.getPipelineOptions())) {
- for (boolean available = reader.start();
- available;
- available = reader.advance()) {
- output.add(
- DirectPipelineRunner.ValueWithMetadata.of(
- WindowedValue.timestampedValueInGlobalWindow(
- reader.getCurrent(), reader.getCurrentTimestamp())));
- }
- }
- context.setPCollectionValuesWithMetadata(context.getOutput(transform), output);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
- });
- }
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/99654ca4/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
index 13cb45e..bbef072 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
@@ -25,7 +25,6 @@ import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.io.Read.Bounded;
import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.runners.DirectPipelineRunner;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.util.IOChannelUtils;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/99654ca4/sdks/java/core/src/main/java/org/apache/beam/sdk/options/DirectPipelineOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/DirectPipelineOptions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/DirectPipelineOptions.java
index 4cdc0ca..c2095e3 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/DirectPipelineOptions.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/DirectPipelineOptions.java
@@ -18,7 +18,6 @@
package org.apache.beam.sdk.options;
import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.runners.DirectPipelineRunner;
import org.apache.beam.sdk.values.PCollection;
import com.fasterxml.jackson.annotation.JsonIgnore;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/99654ca4/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/DirectPipelineRegistrar.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/DirectPipelineRegistrar.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/DirectPipelineRegistrar.java
deleted file mode 100644
index 7dd0fdd..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/DirectPipelineRegistrar.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.runners;
-
-import org.apache.beam.sdk.options.DirectPipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsRegistrar;
-
-import com.google.auto.service.AutoService;
-import com.google.common.collect.ImmutableList;
-
-/**
- * Contains the {@link PipelineOptionsRegistrar} and {@link PipelineRunnerRegistrar} for
- * the {@link DirectPipeline}.
- */
-public class DirectPipelineRegistrar {
- private DirectPipelineRegistrar() { }
-
- /**
- * Register the {@link DirectPipelineRunner}.
- */
- @AutoService(PipelineRunnerRegistrar.class)
- public static class Runner implements PipelineRunnerRegistrar {
- @Override
- public Iterable<Class<? extends PipelineRunner<?>>> getPipelineRunners() {
- return ImmutableList.<Class<? extends PipelineRunner<?>>>of(DirectPipelineRunner.class);
- }
- }
-
- /**
- * Register the {@link DirectPipelineOptions}.
- */
- @AutoService(PipelineOptionsRegistrar.class)
- public static class Options implements PipelineOptionsRegistrar {
- @Override
- public Iterable<Class<? extends PipelineOptions>> getPipelineOptions() {
- return ImmutableList.<Class<? extends PipelineOptions>>of(DirectPipelineOptions.class);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/99654ca4/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/DirectPipelineRunner.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/DirectPipelineRunner.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/DirectPipelineRunner.java
deleted file mode 100644
index 1eb25c5..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/DirectPipelineRunner.java
+++ /dev/null
@@ -1,1298 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.runners;
-
-import static org.apache.beam.sdk.util.CoderUtils.encodeToByteArray;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkState;
-
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.Pipeline.PipelineVisitor;
-import org.apache.beam.sdk.PipelineResult;
-import org.apache.beam.sdk.coders.CannotProvideCoderException;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.ListCoder;
-import org.apache.beam.sdk.io.AvroIO;
-import org.apache.beam.sdk.io.FileBasedSink;
-import org.apache.beam.sdk.io.TextIO;
-import org.apache.beam.sdk.options.DirectPipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsValidator;
-import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.AppliedPTransform;
-import org.apache.beam.sdk.transforms.Combine;
-import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.GroupByKey;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.Partition;
-import org.apache.beam.sdk.transforms.Partition.PartitionFn;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.Window;
-import org.apache.beam.sdk.transforms.windowing.WindowFn;
-import org.apache.beam.sdk.util.AppliedCombineFn;
-import org.apache.beam.sdk.util.AssignWindows;
-import org.apache.beam.sdk.util.GroupByKeyViaGroupByKeyOnly;
-import org.apache.beam.sdk.util.GroupByKeyViaGroupByKeyOnly.GroupByKeyOnly;
-import org.apache.beam.sdk.util.IOChannelUtils;
-import org.apache.beam.sdk.util.MapAggregatorValues;
-import org.apache.beam.sdk.util.PerKeyCombineFnRunner;
-import org.apache.beam.sdk.util.PerKeyCombineFnRunners;
-import org.apache.beam.sdk.util.SerializableUtils;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.util.WindowingStrategy;
-import org.apache.beam.sdk.util.common.Counter;
-import org.apache.beam.sdk.util.common.CounterSet;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollectionList;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.PDone;
-import org.apache.beam.sdk.values.PInput;
-import org.apache.beam.sdk.values.POutput;
-import org.apache.beam.sdk.values.PValue;
-import org.apache.beam.sdk.values.TypedPValue;
-
-import com.google.common.base.Function;
-import com.google.common.collect.Lists;
-
-import org.joda.time.Instant;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-
-/**
- * Executes the operations in the pipeline directly, in this process, without
- * any optimization. Useful for small local execution and tests.
- *
- * <p>Throws an exception from {@link #run} if execution fails.
- *
- * <p><h3>Permissions</h3>
- * When reading from a Dataflow source or writing to a Dataflow sink using
- * {@code DirectPipelineRunner}, the Cloud Platform account that you configured with the
- * <a href="https://cloud.google.com/sdk/gcloud">gcloud</a> executable will need access to the
- * corresponding source/sink.
- *
- * <p>Please see <a href="https://cloud.google.com/dataflow/security-and-permissions">Google Cloud
- * Dataflow Security and Permissions</a> for more details.
- */
-@SuppressWarnings({"rawtypes", "unchecked"})
-public class DirectPipelineRunner
- extends PipelineRunner<DirectPipelineRunner.EvaluationResults> {
- private static final Logger LOG = LoggerFactory.getLogger(DirectPipelineRunner.class);
-
- /**
- * A source of random data, which can be seeded if determinism is desired.
- */
- private Random rand;
-
- /**
- * A map from PTransform class to the corresponding
- * TransformEvaluator to use to evaluate that transform.
- *
- * <p>A static map that contains system-wide defaults.
- */
- private static Map<Class, TransformEvaluator> defaultTransformEvaluators =
- new HashMap<>();
-
- /**
- * A map from PTransform class to the corresponding
- * TransformEvaluator to use to evaluate that transform.
- *
- * <p>An instance map that contains bindings for this DirectPipelineRunner.
- * Bindings in this map override those in the default map.
- */
- private Map<Class, TransformEvaluator> localTransformEvaluators =
- new HashMap<>();
-
- /**
- * Records that instances of the specified PTransform class
- * should be evaluated by default by the corresponding
- * TransformEvaluator.
- */
- public static <TransformT extends PTransform<?, ?>>
- void registerDefaultTransformEvaluator(
- Class<TransformT> transformClass,
- TransformEvaluator<? super TransformT> transformEvaluator) {
- if (defaultTransformEvaluators.put(transformClass, transformEvaluator)
- != null) {
- throw new IllegalArgumentException(
- "defining multiple evaluators for " + transformClass);
- }
- }
-
- /////////////////////////////////////////////////////////////////////////////
-
- /**
- * Records that instances of the specified PTransform class
- * should be evaluated by the corresponding TransformEvaluator.
- * Overrides any bindings specified by
- * {@link #registerDefaultTransformEvaluator}.
- */
- public <TransformT extends PTransform<?, ?>>
- void registerTransformEvaluator(
- Class<TransformT> transformClass,
- TransformEvaluator<TransformT> transformEvaluator) {
- if (localTransformEvaluators.put(transformClass, transformEvaluator)
- != null) {
- throw new IllegalArgumentException(
- "defining multiple evaluators for " + transformClass);
- }
- }
-
- /**
- * Returns the TransformEvaluator to use for instances of the
- * specified PTransform class, or null if none registered.
- */
- public <TransformT extends PTransform<?, ?>>
- TransformEvaluator<TransformT> getTransformEvaluator(Class<TransformT> transformClass) {
- TransformEvaluator<TransformT> transformEvaluator =
- localTransformEvaluators.get(transformClass);
- if (transformEvaluator == null) {
- transformEvaluator = defaultTransformEvaluators.get(transformClass);
- }
- return transformEvaluator;
- }
-
- /**
- * Constructs a DirectPipelineRunner from the given options.
- */
- public static DirectPipelineRunner fromOptions(PipelineOptions options) {
- DirectPipelineOptions directOptions =
- PipelineOptionsValidator.validate(DirectPipelineOptions.class, options);
- LOG.debug("Creating DirectPipelineRunner");
- return new DirectPipelineRunner(directOptions);
- }
-
- /**
- * Enable runtime testing to verify that all functions and {@link Coder}
- * instances can be serialized.
- *
- * <p>Enabled by default.
- *
- * <p>This method modifies the {@code DirectPipelineRunner} instance and
- * returns itself.
- */
- public DirectPipelineRunner withSerializabilityTesting(boolean enable) {
- this.testSerializability = enable;
- return this;
- }
-
- /**
- * Enable runtime testing to verify that all values can be encoded.
- *
- * <p>Enabled by default.
- *
- * <p>This method modifies the {@code DirectPipelineRunner} instance and
- * returns itself.
- */
- public DirectPipelineRunner withEncodabilityTesting(boolean enable) {
- this.testEncodability = enable;
- return this;
- }
-
- /**
- * Enable runtime testing to verify that functions do not depend on order
- * of the elements.
- *
- * <p>This is accomplished by randomizing the order of elements.
- *
- * <p>Enabled by default.
- *
- * <p>This method modifies the {@code DirectPipelineRunner} instance and
- * returns itself.
- */
- public DirectPipelineRunner withUnorderednessTesting(boolean enable) {
- this.testUnorderedness = enable;
- return this;
- }
-
- @Override
- public <OutputT extends POutput, InputT extends PInput> OutputT apply(
- PTransform<InputT, OutputT> transform, InputT input) {
- if (transform instanceof Combine.GroupedValues) {
- return (OutputT) applyTestCombine((Combine.GroupedValues) transform, (PCollection) input);
- } else if (transform instanceof TextIO.Write.Bound) {
- return (OutputT) applyTextIOWrite((TextIO.Write.Bound) transform, (PCollection<?>) input);
- } else if (transform instanceof AvroIO.Write.Bound) {
- return (OutputT) applyAvroIOWrite((AvroIO.Write.Bound) transform, (PCollection<?>) input);
- } else if (transform instanceof GroupByKey) {
- return (OutputT)
- ((PCollection) input).apply(new GroupByKeyViaGroupByKeyOnly((GroupByKey) transform));
- } else if (transform instanceof Window.Bound) {
- return (OutputT)
- ((PCollection) input).apply(new AssignWindowsAndSetStrategy((Window.Bound) transform));
- } else {
- return super.apply(transform, input);
- }
- }
-
- private <K, InputT, AccumT, OutputT> PCollection<KV<K, OutputT>> applyTestCombine(
- Combine.GroupedValues<K, InputT, OutputT> transform,
- PCollection<KV<K, Iterable<InputT>>> input) {
-
- PCollection<KV<K, OutputT>> output = input
- .apply(ParDo.of(TestCombineDoFn.create(transform, input, testSerializability, rand))
- .withSideInputs(transform.getSideInputs()));
-
- try {
- output.setCoder(transform.getDefaultOutputCoder(input));
- } catch (CannotProvideCoderException exc) {
- // let coder inference occur later, if it can
- }
- return output;
- }
-
- private static class ElementProcessingOrderPartitionFn<T> implements PartitionFn<T> {
- private int elementNumber;
- @Override
- public int partitionFor(T elem, int numPartitions) {
- return elementNumber++ % numPartitions;
- }
- }
-
- /**
- * Applies TextIO.Write honoring user requested sharding controls (i.e. withNumShards)
- * by applying a partition function based upon the number of shards the user requested.
- */
- private static class DirectTextIOWrite<T> extends PTransform<PCollection<T>, PDone> {
- private final TextIO.Write.Bound<T> transform;
-
- private DirectTextIOWrite(TextIO.Write.Bound<T> transform) {
- this.transform = transform;
- }
-
- @Override
- public PDone apply(PCollection<T> input) {
- checkState(transform.getNumShards() > 1,
- "DirectTextIOWrite is expected to only be used when sharding controls are required.");
-
- // Evenly distribute all the elements across the partitions.
- PCollectionList<T> partitionedElements =
- input.apply(Partition.of(transform.getNumShards(),
- new ElementProcessingOrderPartitionFn<T>()));
-
- // For each input PCollection partition, create a write transform that represents
- // one of the specific shards.
- for (int i = 0; i < transform.getNumShards(); ++i) {
- /*
- * This logic mirrors the file naming strategy within
- * {@link FileBasedSink#generateDestinationFilenames()}
- */
- String outputFilename = IOChannelUtils.constructName(
- transform.getFilenamePrefix(),
- transform.getShardNameTemplate(),
- getFileExtension(transform.getFilenameSuffix()),
- i,
- transform.getNumShards());
-
- String transformName = String.format("%s(Shard:%s)", transform.getName(), i);
- partitionedElements.get(i).apply(transformName,
- transform.withNumShards(1).withShardNameTemplate("").withSuffix("").to(outputFilename));
- }
- return PDone.in(input.getPipeline());
- }
- }
-
- /**
- * Returns the file extension to be used. If the user did not request a file
- * extension then this method returns the empty string. Otherwise this method
- * adds a {@code "."} to the beginning of the users extension if one is not present.
- *
- * <p>This is copied from {@link FileBasedSink} to not expose it.
- */
- private static String getFileExtension(String usersExtension) {
- if (usersExtension == null || usersExtension.isEmpty()) {
- return "";
- }
- if (usersExtension.startsWith(".")) {
- return usersExtension;
- }
- return "." + usersExtension;
- }
-
- /**
- * Apply the override for TextIO.Write.Bound if the user requested sharding controls
- * greater than one.
- */
- private <T> PDone applyTextIOWrite(TextIO.Write.Bound<T> transform, PCollection<T> input) {
- if (transform.getNumShards() <= 1) {
- // By default, the DirectPipelineRunner outputs to only 1 shard. Since the user never
- // requested sharding controls greater than 1, we default to outputting to 1 file.
- return super.apply(transform.withNumShards(1), input);
- }
- return input.apply(new DirectTextIOWrite<>(transform));
- }
-
- /**
- * Applies AvroIO.Write honoring user requested sharding controls (i.e. withNumShards)
- * by applying a partition function based upon the number of shards the user requested.
- */
- private static class DirectAvroIOWrite<T> extends PTransform<PCollection<T>, PDone> {
- private final AvroIO.Write.Bound<T> transform;
-
- private DirectAvroIOWrite(AvroIO.Write.Bound<T> transform) {
- this.transform = transform;
- }
-
- @Override
- public PDone apply(PCollection<T> input) {
- checkState(transform.getNumShards() > 1,
- "DirectAvroIOWrite is expected to only be used when sharding controls are required.");
-
- // Evenly distribute all the elements across the partitions.
- PCollectionList<T> partitionedElements =
- input.apply(Partition.of(transform.getNumShards(),
- new ElementProcessingOrderPartitionFn<T>()));
-
- // For each input PCollection partition, create a write transform that represents
- // one of the specific shards.
- for (int i = 0; i < transform.getNumShards(); ++i) {
- /*
- * This logic mirrors the file naming strategy within
- * {@link FileBasedSink#generateDestinationFilenames()}
- */
- String outputFilename = IOChannelUtils.constructName(
- transform.getFilenamePrefix(),
- transform.getShardNameTemplate(),
- getFileExtension(transform.getFilenameSuffix()),
- i,
- transform.getNumShards());
-
- String transformName = String.format("%s(Shard:%s)", transform.getName(), i);
- partitionedElements.get(i).apply(transformName,
- transform.withNumShards(1).withShardNameTemplate("").withSuffix("").to(outputFilename));
- }
- return PDone.in(input.getPipeline());
- }
- }
-
- private static class AssignWindowsAndSetStrategy<T, W extends BoundedWindow>
- extends PTransform<PCollection<T>, PCollection<T>> {
-
- private final Window.Bound<T> wrapped;
-
- public AssignWindowsAndSetStrategy(Window.Bound<T> wrapped) {
- this.wrapped = wrapped;
- }
-
- @Override
- public PCollection<T> apply(PCollection<T> input) {
- WindowingStrategy<?, ?> outputStrategy =
- wrapped.getOutputStrategyInternal(input.getWindowingStrategy());
-
- WindowFn<T, BoundedWindow> windowFn =
- (WindowFn<T, BoundedWindow>) outputStrategy.getWindowFn();
-
- // If the Window.Bound transform only changed parts other than the WindowFn, then
- // we skip AssignWindows even though it should be harmless in a perfect world.
- // The world is not perfect, and a GBK may have set it to InvalidWindows to forcibly
- // crash if another GBK is performed without explicitly setting the WindowFn. So we skip
- // AssignWindows in this case.
- if (wrapped.getWindowFn() == null) {
- return input.apply("Identity", ParDo.of(new IdentityFn<T>()))
- .setWindowingStrategyInternal(outputStrategy);
- } else {
- return input
- .apply("AssignWindows", new AssignWindows<T, BoundedWindow>(windowFn))
- .setWindowingStrategyInternal(outputStrategy);
- }
- }
- }
-
- private static class IdentityFn<T> extends DoFn<T, T> {
- @Override
- public void processElement(ProcessContext c) {
- c.output(c.element());
- }
- }
-
- /**
- * Apply the override for AvroIO.Write.Bound if the user requested sharding controls
- * greater than one.
- */
- private <T> PDone applyAvroIOWrite(AvroIO.Write.Bound<T> transform, PCollection<T> input) {
- if (transform.getNumShards() <= 1) {
- // By default, the DirectPipelineRunner outputs to only 1 shard. Since the user never
- // requested sharding controls greater than 1, we default to outputting to 1 file.
- return super.apply(transform.withNumShards(1), input);
- }
- return input.apply(new DirectAvroIOWrite<>(transform));
- }
-
- /**
- * The implementation may split the {@link KeyedCombineFn} into ADD, MERGE and EXTRACT phases (
- * see {@code org.apache.beam.sdk.runners.worker.CombineValuesFn}). In order to emulate
- * this for the {@link DirectPipelineRunner} and provide an experience closer to the service, go
- * through heavy serializability checks for the equivalent of the results of the ADD phase, but
- * after the {@link org.apache.beam.sdk.transforms.GroupByKey} shuffle, and the MERGE
- * phase. Doing these checks ensure that not only is the accumulator coder serializable, but
- * the accumulator coder can actually serialize the data in question.
- */
- public static class TestCombineDoFn<K, InputT, AccumT, OutputT>
- extends DoFn<KV<K, Iterable<InputT>>, KV<K, OutputT>> {
- private final PerKeyCombineFnRunner<? super K, ? super InputT, AccumT, OutputT> fnRunner;
- private final Coder<AccumT> accumCoder;
- private final boolean testSerializability;
- private final Random rand;
-
- public static <K, InputT, AccumT, OutputT> TestCombineDoFn<K, InputT, AccumT, OutputT> create(
- Combine.GroupedValues<K, InputT, OutputT> transform,
- PCollection<KV<K, Iterable<InputT>>> input,
- boolean testSerializability,
- Random rand) {
-
- AppliedCombineFn<? super K, ? super InputT, ?, OutputT> fn = transform.getAppliedFn(
- input.getPipeline().getCoderRegistry(), input.getCoder(), input.getWindowingStrategy());
-
- return new TestCombineDoFn(
- PerKeyCombineFnRunners.create(fn.getFn()),
- fn.getAccumulatorCoder(),
- testSerializability,
- rand);
- }
-
- public TestCombineDoFn(
- PerKeyCombineFnRunner<? super K, ? super InputT, AccumT, OutputT> fnRunner,
- Coder<AccumT> accumCoder,
- boolean testSerializability,
- Random rand) {
- this.fnRunner = fnRunner;
- this.accumCoder = accumCoder;
- this.testSerializability = testSerializability;
- this.rand = rand;
-
- // Check that this does not crash, specifically to catch anonymous CustomCoder subclasses.
- this.accumCoder.getEncodingId();
- }
-
- @Override
- public void processElement(ProcessContext c) throws Exception {
- K key = c.element().getKey();
- Iterable<InputT> values = c.element().getValue();
- List<AccumT> groupedPostShuffle =
- ensureSerializableByCoder(ListCoder.of(accumCoder),
- addInputsRandomly(fnRunner, key, values, rand, c),
- "After addInputs of KeyedCombineFn " + fnRunner.fn().toString());
- AccumT merged =
- ensureSerializableByCoder(accumCoder,
- fnRunner.mergeAccumulators(key, groupedPostShuffle, c),
- "After mergeAccumulators of KeyedCombineFn " + fnRunner.fn().toString());
- // Note: The serializability of KV<K, OutputT> is ensured by the
- // runner itself, since it's a transform output.
- c.output(KV.of(key, fnRunner.extractOutput(key, merged, c)));
- }
-
- /**
- * Create a random list of accumulators from the given list of values.
- *
- * <p>Visible for testing purposes only.
- */
- public static <K, AccumT, InputT> List<AccumT> addInputsRandomly(
- PerKeyCombineFnRunner<? super K, ? super InputT, AccumT, ?> fnRunner,
- K key,
- Iterable<InputT> values,
- Random random,
- DoFn<?, ?>.ProcessContext c) {
- List<AccumT> out = new ArrayList<AccumT>();
- int i = 0;
- AccumT accumulator = fnRunner.createAccumulator(key, c);
- boolean hasInput = false;
-
- for (InputT value : values) {
- accumulator = fnRunner.addInput(key, accumulator, value, c);
- hasInput = true;
-
- // For each index i, flip a 1/2^i weighted coin for whether to
- // create a new accumulator after index i is added, i.e. [0]
- // is guaranteed, [1] is an even 1/2, [2] is 1/4, etc. The
- // goal is to partition the inputs into accumulators, and make
- // the accumulators potentially lumpy. Also compact about half
- // of the accumulators.
- if (i == 0 || random.nextInt(1 << Math.min(i, 30)) == 0) {
- if (i % 2 == 0) {
- accumulator = fnRunner.compact(key, accumulator, c);
- }
- out.add(accumulator);
- accumulator = fnRunner.createAccumulator(key, c);
- hasInput = false;
- }
- i++;
- }
- if (hasInput) {
- out.add(accumulator);
- }
-
- Collections.shuffle(out, random);
- return out;
- }
-
- public <T> T ensureSerializableByCoder(
- Coder<T> coder, T value, String errorContext) {
- if (testSerializability) {
- return SerializableUtils.ensureSerializableByCoder(
- coder, value, errorContext);
- }
- return value;
- }
- }
-
- @Override
- public EvaluationResults run(Pipeline pipeline) {
- LOG.info("Executing pipeline using the DirectPipelineRunner.");
-
- Evaluator evaluator = new Evaluator(rand);
- evaluator.run(pipeline);
-
- // Log all counter values for debugging purposes.
- for (Counter counter : evaluator.getCounters()) {
- LOG.info("Final aggregator value: {}", counter);
- }
-
- LOG.info("Pipeline execution complete.");
-
- return evaluator;
- }
-
- /**
- * An evaluator of a PTransform.
- */
- public interface TransformEvaluator<TransformT extends PTransform> {
- public void evaluate(TransformT transform,
- EvaluationContext context);
- }
-
- /**
- * The interface provided to registered callbacks for interacting
- * with the {@code DirectPipelineRunner}, including reading and writing the
- * values of {@link PCollection}s and {@link PCollectionView}s.
- */
- public interface EvaluationResults extends PipelineResult {
- /**
- * Retrieves the value of the given PCollection.
- * Throws an exception if the PCollection's value hasn't already been set.
- */
- <T> List<T> getPCollection(PCollection<T> pc);
-
- /**
- * Retrieves the windowed value of the given PCollection.
- * Throws an exception if the PCollection's value hasn't already been set.
- */
- <T> List<WindowedValue<T>> getPCollectionWindowedValues(PCollection<T> pc);
-
- /**
- * Retrieves the values of each PCollection in the given
- * PCollectionList. Throws an exception if the PCollectionList's
- * value hasn't already been set.
- */
- <T> List<List<T>> getPCollectionList(PCollectionList<T> pcs);
-
- /**
- * Retrieves the values indicated by the given {@link PCollectionView}.
- * Note that within the {@link org.apache.beam.sdk.transforms.DoFn.Context}
- * implementation a {@link PCollectionView} should convert from this representation to a
- * suitable side input value.
- */
- <T, WindowedT> Iterable<WindowedValue<?>> getPCollectionView(PCollectionView<T> view);
- }
-
- /**
- * An immutable (value, timestamp) pair, along with other metadata necessary
- * for the implementation of {@code DirectPipelineRunner}.
- */
- public static class ValueWithMetadata<V> {
- /**
- * Returns a new {@code ValueWithMetadata} with the {@code WindowedValue}.
- * Key is null.
- */
- public static <V> ValueWithMetadata<V> of(WindowedValue<V> windowedValue) {
- return new ValueWithMetadata<>(windowedValue, null);
- }
-
- /**
- * Returns a new {@code ValueWithMetadata} with the implicit key associated
- * with this value set. The key is the last key grouped by in the chain of
- * productions that produced this element.
- * These keys are used internally by {@link DirectPipelineRunner} for keeping
- * persisted state separate across keys.
- */
- public ValueWithMetadata<V> withKey(Object key) {
- return new ValueWithMetadata<>(windowedValue, key);
- }
-
- /**
- * Returns a new {@code ValueWithMetadata} that is a copy of this one, but with
- * a different value.
- */
- public <T> ValueWithMetadata<T> withValue(T value) {
- return new ValueWithMetadata(windowedValue.withValue(value), getKey());
- }
-
- /**
- * Returns the {@code WindowedValue} associated with this element.
- */
- public WindowedValue<V> getWindowedValue() {
- return windowedValue;
- }
-
- /**
- * Returns the value associated with this element.
- *
- * @see #withValue
- */
- public V getValue() {
- return windowedValue.getValue();
- }
-
- /**
- * Returns the timestamp associated with this element.
- */
- public Instant getTimestamp() {
- return windowedValue.getTimestamp();
- }
-
- /**
- * Returns the collection of windows this element has been placed into. May
- * be null if the {@code PCollection} this element is in has not yet been
- * windowed.
- *
- * @see #getWindows()
- */
- public Collection<? extends BoundedWindow> getWindows() {
- return windowedValue.getWindows();
- }
-
-
- /**
- * Returns the key associated with this element. May be null if the
- * {@code PCollection} this element is in is not keyed.
- *
- * @see #withKey
- */
- public Object getKey() {
- return key;
- }
-
- ////////////////////////////////////////////////////////////////////////////
-
- private final Object key;
- private final WindowedValue<V> windowedValue;
-
- private ValueWithMetadata(WindowedValue<V> windowedValue,
- Object key) {
- this.windowedValue = windowedValue;
- this.key = key;
- }
- }
-
- /**
- * The interface provided to registered callbacks for interacting
- * with the {@code DirectPipelineRunner}, including reading and writing the
- * values of {@link PCollection}s and {@link PCollectionView}s.
- */
- public interface EvaluationContext extends EvaluationResults {
- /**
- * Returns the configured pipeline options.
- */
- DirectPipelineOptions getPipelineOptions();
-
- /**
- * Returns the input of the currently being processed transform.
- */
- <InputT extends PInput> InputT getInput(PTransform<InputT, ?> transform);
-
- /**
- * Returns the output of the currently being processed transform.
- */
- <OutputT extends POutput> OutputT getOutput(PTransform<?, OutputT> transform);
-
- /**
- * Sets the value of the given PCollection, where each element also has a timestamp
- * and collection of windows.
- * Throws an exception if the PCollection's value has already been set.
- */
- <T> void setPCollectionValuesWithMetadata(
- PCollection<T> pc, List<ValueWithMetadata<T>> elements);
-
- /**
- * Sets the value of the given PCollection, where each element also has a timestamp
- * and collection of windows.
- * Throws an exception if the PCollection's value has already been set.
- */
- <T> void setPCollectionWindowedValue(PCollection<T> pc, List<WindowedValue<T>> elements);
-
- /**
- * Shorthand for setting the value of a PCollection where the elements do not have
- * timestamps or windows.
- * Throws an exception if the PCollection's value has already been set.
- */
- <T> void setPCollection(PCollection<T> pc, List<T> elements);
-
- /**
- * Retrieves the value of the given PCollection, along with element metadata
- * such as timestamps and windows.
- * Throws an exception if the PCollection's value hasn't already been set.
- */
- <T> List<ValueWithMetadata<T>> getPCollectionValuesWithMetadata(PCollection<T> pc);
-
- /**
- * Sets the value associated with the given {@link PCollectionView}.
- * Throws an exception if the {@link PCollectionView}'s value has already been set.
- */
- <ElemT, T, WindowedT> void setPCollectionView(
- PCollectionView<T> pc,
- Iterable<WindowedValue<ElemT>> value);
-
- /**
- * Ensures that the element is encodable and decodable using the
- * TypePValue's coder, by encoding it and decoding it, and
- * returning the result.
- */
- <T> T ensureElementEncodable(TypedPValue<T> pvalue, T element);
-
- /**
- * If the evaluation context is testing unorderedness,
- * randomly permutes the order of the elements, in a
- * copy if !inPlaceAllowed, and returns the permuted list,
- * otherwise returns the argument unchanged.
- */
- <T> List<T> randomizeIfUnordered(List<T> elements,
- boolean inPlaceAllowed);
-
- /**
- * If the evaluation context is testing serializability, ensures
- * that the argument function is serializable and deserializable
- * by encoding it and then decoding it, and returning the result.
- * Otherwise returns the argument unchanged.
- */
- <FunctionT extends Serializable> FunctionT ensureSerializable(FunctionT fn);
-
- /**
- * If the evaluation context is testing serializability, ensures
- * that the argument Coder is serializable and deserializable
- * by encoding it and then decoding it, and returning the result.
- * Otherwise returns the argument unchanged.
- */
- <T> Coder<T> ensureCoderSerializable(Coder<T> coder);
-
- /**
- * If the evaluation context is testing serializability, ensures
- * that the given data is serializable and deserializable with the
- * given Coder by encoding it and then decoding it, and returning
- * the result. Otherwise returns the argument unchanged.
- *
- * <p>Error context is prefixed to any thrown exceptions.
- */
- <T> T ensureSerializableByCoder(Coder<T> coder,
- T data, String errorContext);
-
- /**
- * Returns a mutator, which can be used to add additional counters to
- * this EvaluationContext.
- */
- CounterSet.AddCounterMutator getAddCounterMutator();
-
- /**
- * Gets the step name for this transform.
- */
- public String getStepName(PTransform<?, ?> transform);
- }
-
-
- /////////////////////////////////////////////////////////////////////////////
-
- class Evaluator extends PipelineVisitor.Defaults implements EvaluationContext {
- /**
- * A map from PTransform to the step name of that transform. This is the internal name for the
- * transform (e.g. "s2").
- */
- private final Map<PTransform<?, ?>, String> stepNames = new HashMap<>();
- private final Map<PValue, Object> store = new HashMap<>();
- private final CounterSet counters = new CounterSet();
- private AppliedPTransform<?, ?, ?> currentTransform;
-
- private Map<Aggregator<?, ?>, Collection<PTransform<?, ?>>> aggregatorSteps = null;
-
- /**
- * A map from PTransform to the full name of that transform. This is the user name of the
- * transform (e.g. "RemoveDuplicates/Combine/GroupByKey").
- */
- private final Map<PTransform<?, ?>, String> fullNames = new HashMap<>();
-
- private Random rand;
-
- public Evaluator() {
- this(new Random());
- }
-
- public Evaluator(Random rand) {
- this.rand = rand;
- }
-
- public void run(Pipeline pipeline) {
- pipeline.traverseTopologically(this);
- aggregatorSteps = new AggregatorPipelineExtractor(pipeline).getAggregatorSteps();
- }
-
- @Override
- public DirectPipelineOptions getPipelineOptions() {
- return options;
- }
-
- @Override
- public <InputT extends PInput> InputT getInput(PTransform<InputT, ?> transform) {
- checkArgument(currentTransform != null && currentTransform.getTransform() == transform,
- "can only be called with current transform");
- return (InputT) currentTransform.getInput();
- }
-
- @Override
- public <OutputT extends POutput> OutputT getOutput(PTransform<?, OutputT> transform) {
- checkArgument(currentTransform != null && currentTransform.getTransform() == transform,
- "can only be called with current transform");
- return (OutputT) currentTransform.getOutput();
- }
-
- @Override
- public void visitPrimitiveTransform(TransformTreeNode node) {
- PTransform<?, ?> transform = node.getTransform();
- fullNames.put(transform, node.getFullName());
- TransformEvaluator evaluator =
- getTransformEvaluator(transform.getClass());
- if (evaluator == null) {
- throw new IllegalStateException(
- "no evaluator registered for " + transform);
- }
- LOG.debug("Evaluating {}", transform);
- currentTransform = AppliedPTransform.of(
- node.getFullName(), node.getInput(), node.getOutput(), (PTransform) transform);
- evaluator.evaluate(transform, this);
- currentTransform = null;
- }
-
- @Override
- public void visitValue(PValue value, TransformTreeNode producer) {
- LOG.debug("Checking evaluation of {}", value);
- if (value.getProducingTransformInternal() == null) {
- throw new RuntimeException(
- "internal error: expecting a PValue to have a producingTransform");
- }
- if (!producer.isCompositeNode()) {
- // Verify that primitive transform outputs are already computed.
- getPValue(value);
- }
- }
-
- /**
- * Sets the value of the given PValue.
- * Throws an exception if the PValue's value has already been set.
- */
- void setPValue(PValue pvalue, Object contents) {
- if (store.containsKey(pvalue)) {
- throw new IllegalStateException(
- "internal error: setting the value of " + pvalue
- + " more than once");
- }
- store.put(pvalue, contents);
- }
-
- /**
- * Retrieves the value of the given PValue.
- * Throws an exception if the PValue's value hasn't already been set.
- */
- Object getPValue(PValue pvalue) {
- if (!store.containsKey(pvalue)) {
- throw new IllegalStateException(
- "internal error: getting the value of " + pvalue
- + " before it has been computed");
- }
- return store.get(pvalue);
- }
-
- /**
- * Convert a list of T to a list of {@code ValueWithMetadata<T>}, with a timestamp of 0
- * and null windows.
- */
- <T> List<ValueWithMetadata<T>> toValueWithMetadata(List<T> values) {
- List<ValueWithMetadata<T>> result = new ArrayList<>(values.size());
- for (T value : values) {
- result.add(ValueWithMetadata.of(WindowedValue.valueInGlobalWindow(value)));
- }
- return result;
- }
-
- /**
- * Convert a list of {@code WindowedValue<T>} to a list of {@code ValueWithMetadata<T>}.
- */
- <T> List<ValueWithMetadata<T>> toValueWithMetadataFromWindowedValue(
- List<WindowedValue<T>> values) {
- List<ValueWithMetadata<T>> result = new ArrayList<>(values.size());
- for (WindowedValue<T> value : values) {
- result.add(ValueWithMetadata.of(value));
- }
- return result;
- }
-
- @Override
- public <T> void setPCollection(PCollection<T> pc, List<T> elements) {
- setPCollectionValuesWithMetadata(pc, toValueWithMetadata(elements));
- }
-
- @Override
- public <T> void setPCollectionWindowedValue(
- PCollection<T> pc, List<WindowedValue<T>> elements) {
- setPCollectionValuesWithMetadata(pc, toValueWithMetadataFromWindowedValue(elements));
- }
-
- @Override
- public <T> void setPCollectionValuesWithMetadata(
- PCollection<T> pc, List<ValueWithMetadata<T>> elements) {
- LOG.debug("Setting {} = {}", pc, elements);
- ensurePCollectionEncodable(pc, elements);
- setPValue(pc, elements);
- }
-
- @Override
- public <ElemT, T, WindowedT> void setPCollectionView(
- PCollectionView<T> view,
- Iterable<WindowedValue<ElemT>> value) {
- LOG.debug("Setting {} = {}", view, value);
- setPValue(view, value);
- }
-
- /**
- * Retrieves the value of the given {@link PCollection}.
- * Throws an exception if the {@link PCollection}'s value hasn't already been set.
- */
- @Override
- public <T> List<T> getPCollection(PCollection<T> pc) {
- List<T> result = new ArrayList<>();
- for (ValueWithMetadata<T> elem : getPCollectionValuesWithMetadata(pc)) {
- result.add(elem.getValue());
- }
- return result;
- }
-
- @Override
- public <T> List<WindowedValue<T>> getPCollectionWindowedValues(PCollection<T> pc) {
- return Lists.transform(
- getPCollectionValuesWithMetadata(pc),
- new Function<ValueWithMetadata<T>, WindowedValue<T>>() {
- @Override
- public WindowedValue<T> apply(ValueWithMetadata<T> input) {
- return input.getWindowedValue();
- }});
- }
-
- @Override
- public <T> List<ValueWithMetadata<T>> getPCollectionValuesWithMetadata(PCollection<T> pc) {
- List<ValueWithMetadata<T>> elements = (List<ValueWithMetadata<T>>) getPValue(pc);
- elements = randomizeIfUnordered(elements, false /* not inPlaceAllowed */);
- LOG.debug("Getting {} = {}", pc, elements);
- return elements;
- }
-
- @Override
- public <T> List<List<T>> getPCollectionList(PCollectionList<T> pcs) {
- List<List<T>> elementsList = new ArrayList<>();
- for (PCollection<T> pc : pcs.getAll()) {
- elementsList.add(getPCollection(pc));
- }
- return elementsList;
- }
-
- /**
- * Retrieves the value indicated by the given {@link PCollectionView}.
- * Note that within the {@link DoFnContext} a {@link PCollectionView}
- * converts from this representation to a suitable side input value.
- */
- @Override
- public <T, WindowedT> Iterable<WindowedValue<?>> getPCollectionView(PCollectionView<T> view) {
- Iterable<WindowedValue<?>> value = (Iterable<WindowedValue<?>>) getPValue(view);
- LOG.debug("Getting {} = {}", view, value);
- return value;
- }
-
- /**
- * If {@code testEncodability}, ensures that the {@link PCollection}'s coder and elements are
- * encodable and decodable by encoding them and decoding them, and returning the result.
- * Otherwise returns the argument elements.
- */
- <T> List<ValueWithMetadata<T>> ensurePCollectionEncodable(
- PCollection<T> pc, List<ValueWithMetadata<T>> elements) {
- ensureCoderSerializable(pc.getCoder());
- if (!testEncodability) {
- return elements;
- }
- List<ValueWithMetadata<T>> elementsCopy = new ArrayList<>(elements.size());
- for (ValueWithMetadata<T> element : elements) {
- elementsCopy.add(
- element.withValue(ensureElementEncodable(pc, element.getValue())));
- }
- return elementsCopy;
- }
-
- @Override
- public <T> T ensureElementEncodable(TypedPValue<T> pvalue, T element) {
- return ensureSerializableByCoder(
- pvalue.getCoder(), element, "Within " + pvalue.toString());
- }
-
- @Override
- public <T> List<T> randomizeIfUnordered(List<T> elements,
- boolean inPlaceAllowed) {
- if (!testUnorderedness) {
- return elements;
- }
- List<T> elementsCopy = new ArrayList<>(elements);
- Collections.shuffle(elementsCopy, rand);
- return elementsCopy;
- }
-
- @Override
- public <FunctionT extends Serializable> FunctionT ensureSerializable(FunctionT fn) {
- if (!testSerializability) {
- return fn;
- }
- return SerializableUtils.ensureSerializable(fn);
- }
-
- @Override
- public <T> Coder<T> ensureCoderSerializable(Coder<T> coder) {
- if (testSerializability) {
- SerializableUtils.ensureSerializable(coder);
- }
- return coder;
- }
-
- @Override
- public <T> T ensureSerializableByCoder(
- Coder<T> coder, T value, String errorContext) {
- if (testSerializability) {
- return SerializableUtils.ensureSerializableByCoder(
- coder, value, errorContext);
- }
- return value;
- }
-
- @Override
- public CounterSet.AddCounterMutator getAddCounterMutator() {
- return counters.getAddCounterMutator();
- }
-
- @Override
- public String getStepName(PTransform<?, ?> transform) {
- String stepName = stepNames.get(transform);
- if (stepName == null) {
- stepName = "s" + (stepNames.size() + 1);
- stepNames.put(transform, stepName);
- }
- return stepName;
- }
-
- /**
- * Returns the CounterSet generated during evaluation, which includes
- * user-defined Aggregators and may include system-defined counters.
- */
- public CounterSet getCounters() {
- return counters;
- }
-
- /**
- * Returns JobState.DONE in all situations. The Evaluator is not returned
- * until the pipeline has been traversed, so it will either be returned
- * after a successful run or the run call will terminate abnormally.
- */
- @Override
- public State getState() {
- return State.DONE;
- }
-
- @Override
- public <T> AggregatorValues<T> getAggregatorValues(Aggregator<?, T> aggregator) {
- Map<String, T> stepValues = new HashMap<>();
- for (PTransform<?, ?> step : aggregatorSteps.get(aggregator)) {
- String stepName = String.format("user-%s-%s", stepNames.get(step), aggregator.getName());
- String fullName = fullNames.get(step);
- Counter<?> counter = counters.getExistingCounter(stepName);
- if (counter == null) {
- throw new IllegalArgumentException(
- "Aggregator " + aggregator + " is not used in this pipeline");
- }
- stepValues.put(fullName, (T) counter.getAggregate());
- }
- return new MapAggregatorValues<>(stepValues);
- }
- }
-
- /////////////////////////////////////////////////////////////////////////////
-
- /**
- * The key by which GBK groups inputs - elements are grouped by the encoded form of the key,
- * but the original key may be accessed as well.
- */
- private static class GroupingKey<K> {
- private K key;
- private byte[] encodedKey;
-
- public GroupingKey(K key, byte[] encodedKey) {
- this.key = key;
- this.encodedKey = encodedKey;
- }
-
- public K getKey() {
- return key;
- }
-
- @Override
- public boolean equals(Object o) {
- if (o instanceof GroupingKey) {
- GroupingKey<?> that = (GroupingKey<?>) o;
- return Arrays.equals(this.encodedKey, that.encodedKey);
- } else {
- return false;
- }
- }
-
- @Override
- public int hashCode() {
- return Arrays.hashCode(encodedKey);
- }
- }
-
- private final DirectPipelineOptions options;
- private boolean testSerializability;
- private boolean testEncodability;
- private boolean testUnorderedness;
-
- /** Returns a new DirectPipelineRunner. */
- private DirectPipelineRunner(DirectPipelineOptions options) {
- this.options = options;
- // (Re-)register standard IO factories. Clobbers any prior credentials.
- IOChannelUtils.registerStandardIOFactories(options);
- long randomSeed;
- if (options.getDirectPipelineRunnerRandomSeed() != null) {
- randomSeed = options.getDirectPipelineRunnerRandomSeed();
- } else {
- randomSeed = new Random().nextLong();
- }
-
- LOG.debug("DirectPipelineRunner using random seed {}.", randomSeed);
- rand = new Random(randomSeed);
-
- testSerializability = options.isTestSerializability();
- testEncodability = options.isTestEncodability();
- testUnorderedness = options.isTestUnorderedness();
- }
-
- /**
- * Get the options used in this {@link Pipeline}.
- */
- public DirectPipelineOptions getPipelineOptions() {
- return options;
- }
-
- @Override
- public String toString() {
- return "DirectPipelineRunner#" + hashCode();
- }
-
- public static <K, V> void evaluateGroupByKeyOnly(
- GroupByKeyOnly<K, V> transform,
- EvaluationContext context) {
- PCollection<KV<K, V>> input = context.getInput(transform);
-
- List<ValueWithMetadata<KV<K, V>>> inputElems =
- context.getPCollectionValuesWithMetadata(input);
-
- Coder<K> keyCoder = GroupByKey.getKeyCoder(input.getCoder());
-
- Map<GroupingKey<K>, List<V>> groupingMap = new HashMap<>();
-
- for (ValueWithMetadata<KV<K, V>> elem : inputElems) {
- K key = elem.getValue().getKey();
- V value = elem.getValue().getValue();
- byte[] encodedKey;
- try {
- encodedKey = encodeToByteArray(keyCoder, key);
- } catch (CoderException exn) {
- // TODO: Put in better element printing:
- // truncate if too long.
- throw new IllegalArgumentException(
- "unable to encode key " + key + " of input to " + transform
- + " using " + keyCoder,
- exn);
- }
- GroupingKey<K> groupingKey =
- new GroupingKey<>(key, encodedKey);
- List<V> values = groupingMap.get(groupingKey);
- if (values == null) {
- values = new ArrayList<V>();
- groupingMap.put(groupingKey, values);
- }
- values.add(value);
- }
-
- List<ValueWithMetadata<KV<K, Iterable<V>>>> outputElems =
- new ArrayList<>();
- for (Map.Entry<GroupingKey<K>, List<V>> entry : groupingMap.entrySet()) {
- GroupingKey<K> groupingKey = entry.getKey();
- K key = groupingKey.getKey();
- List<V> values = entry.getValue();
- values = context.randomizeIfUnordered(values, true /* inPlaceAllowed */);
- outputElems.add(ValueWithMetadata
- .of(WindowedValue.valueInEmptyWindows(KV.<K, Iterable<V>>of(key, values)))
- .withKey(key));
- }
-
- context.setPCollectionValuesWithMetadata(context.getOutput(transform),
- outputElems);
- }
-
- @SuppressWarnings({"rawtypes", "unchecked"})
- public
- static <K, V> void registerGroupByKeyOnly() {
- registerDefaultTransformEvaluator(
- GroupByKeyOnly.class,
- new TransformEvaluator<GroupByKeyOnly>() {
- @Override
- public void evaluate(
- GroupByKeyOnly transform,
- EvaluationContext context) {
- evaluateGroupByKeyOnly(transform, context);
- }
- });
- }
-
- static {
- registerGroupByKeyOnly();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/99654ca4/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Flatten.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Flatten.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Flatten.java
index 7c6fed3..93917f3 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Flatten.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Flatten.java
@@ -20,16 +20,12 @@ package org.apache.beam.sdk.transforms;
import org.apache.beam.sdk.coders.CannotProvideCoderException;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.IterableLikeCoder;
-import org.apache.beam.sdk.runners.DirectPipelineRunner;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollection.IsBounded;
import org.apache.beam.sdk.values.PCollectionList;
-import java.util.ArrayList;
-import java.util.List;
-
/**
* {@code Flatten<T>} takes multiple {@code PCollection<T>}s bundled
* into a {@code PCollectionList<T>} and returns a single
@@ -189,32 +185,4 @@ public class Flatten {
.setCoder(elemCoder);
}
}
-
- /////////////////////////////////////////////////////////////////////////////
-
- static {
- DirectPipelineRunner.registerDefaultTransformEvaluator(
- FlattenPCollectionList.class,
- new DirectPipelineRunner.TransformEvaluator<FlattenPCollectionList>() {
- @Override
- public void evaluate(
- FlattenPCollectionList transform,
- DirectPipelineRunner.EvaluationContext context) {
- evaluateHelper(transform, context);
- }
- });
- }
-
- private static <T> void evaluateHelper(
- FlattenPCollectionList<T> transform,
- DirectPipelineRunner.EvaluationContext context) {
- List<DirectPipelineRunner.ValueWithMetadata<T>> outputElems = new ArrayList<>();
- PCollectionList<T> inputs = context.getInput(transform);
-
- for (PCollection<T> input : inputs.getAll()) {
- outputElems.addAll(context.getPCollectionValuesWithMetadata(input));
- }
-
- context.setPCollectionValuesWithMetadata(context.getOutput(transform), outputElems);
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/99654ca4/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
index 511f0d8..cb7d372 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
@@ -21,27 +21,12 @@ import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.coders.CannotProvideCoderException;
import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.runners.DirectPipelineRunner;
import org.apache.beam.sdk.runners.PipelineRunner;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.display.DisplayData.Builder;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
-import org.apache.beam.sdk.util.DirectModeExecutionContext;
-import org.apache.beam.sdk.util.DirectSideInputReader;
-import org.apache.beam.sdk.util.DoFnRunner;
-import org.apache.beam.sdk.util.DoFnRunnerBase;
-import org.apache.beam.sdk.util.DoFnRunners;
-import org.apache.beam.sdk.util.IllegalMutationException;
-import org.apache.beam.sdk.util.MutationDetector;
-import org.apache.beam.sdk.util.MutationDetectors;
-import org.apache.beam.sdk.util.PTuple;
import org.apache.beam.sdk.util.SerializableUtils;
-import org.apache.beam.sdk.util.SideInputReader;
import org.apache.beam.sdk.util.StringUtils;
-import org.apache.beam.sdk.util.UserCodeException;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.PCollectionView;
@@ -50,16 +35,10 @@ import org.apache.beam.sdk.values.TupleTagList;
import org.apache.beam.sdk.values.TypedPValue;
import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Maps;
import java.io.Serializable;
import java.util.Arrays;
-import java.util.Collections;
import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentMap;
-
-import javax.annotation.Nullable;
/**
* {@link ParDo} is the core element-wise transform in Google Cloud
@@ -84,7 +63,7 @@ import javax.annotation.Nullable;
* <p>Conceptually, when a {@link ParDo} transform is executed, the
* elements of the input {@link PCollection} are first divided up
* into some number of "bundles". These are farmed off to distributed
- * worker machines (or run locally, if using the {@link DirectPipelineRunner}).
+ * worker machines (or run locally, if using the {@code DirectRunner}).
* For each bundle of input elements processing proceeds as follows:
*
* <ol>
@@ -1072,288 +1051,11 @@ public class ParDo {
}
}
- /////////////////////////////////////////////////////////////////////////////
-
- static {
- DirectPipelineRunner.registerDefaultTransformEvaluator(
- Bound.class,
- new DirectPipelineRunner.TransformEvaluator<Bound>() {
- @Override
- public void evaluate(
- Bound transform,
- DirectPipelineRunner.EvaluationContext context) {
- evaluateSingleHelper(transform, context);
- }
- });
- }
-
- private static <InputT, OutputT> void evaluateSingleHelper(
- Bound<InputT, OutputT> transform,
- DirectPipelineRunner.EvaluationContext context) {
- TupleTag<OutputT> mainOutputTag = new TupleTag<>("out");
-
- DirectModeExecutionContext executionContext = DirectModeExecutionContext.create();
-
- PCollectionTuple outputs = PCollectionTuple.of(mainOutputTag, context.getOutput(transform));
-
- evaluateHelper(
- transform.fn,
- context.getStepName(transform),
- context.getInput(transform),
- transform.sideInputs,
- mainOutputTag,
- Collections.<TupleTag<?>>emptyList(),
- outputs,
- context,
- executionContext);
-
- context.setPCollectionValuesWithMetadata(
- context.getOutput(transform),
- executionContext.getOutput(mainOutputTag));
- }
-
- /////////////////////////////////////////////////////////////////////////////
-
- static {
- DirectPipelineRunner.registerDefaultTransformEvaluator(
- BoundMulti.class,
- new DirectPipelineRunner.TransformEvaluator<BoundMulti>() {
- @Override
- public void evaluate(
- BoundMulti transform,
- DirectPipelineRunner.EvaluationContext context) {
- evaluateMultiHelper(transform, context);
- }
- });
- }
-
- private static <InputT, OutputT> void evaluateMultiHelper(
- BoundMulti<InputT, OutputT> transform,
- DirectPipelineRunner.EvaluationContext context) {
-
- DirectModeExecutionContext executionContext = DirectModeExecutionContext.create();
-
- evaluateHelper(
- transform.fn,
- context.getStepName(transform),
- context.getInput(transform),
- transform.sideInputs,
- transform.mainOutputTag,
- transform.sideOutputTags.getAll(),
- context.getOutput(transform),
- context,
- executionContext);
-
- for (Map.Entry<TupleTag<?>, PCollection<?>> entry
- : context.getOutput(transform).getAll().entrySet()) {
- @SuppressWarnings("unchecked")
- TupleTag<Object> tag = (TupleTag<Object>) entry.getKey();
- @SuppressWarnings("unchecked")
- PCollection<Object> pc = (PCollection<Object>) entry.getValue();
-
- context.setPCollectionValuesWithMetadata(
- pc,
- (tag == transform.mainOutputTag
- ? executionContext.getOutput(tag)
- : executionContext.getSideOutput(tag)));
- }
- }
-
- /**
- * Evaluates a single-output or multi-output {@link ParDo} directly.
- *
- * <p>This evaluation method is intended for use in testing scenarios; it is designed for clarity
- * and correctness-checking, not speed.
- *
- * <p>Of particular note, this performs best-effort checking that inputs and outputs are not
- * mutated in violation of the requirements upon a {@link DoFn}.
- */
- private static <InputT, OutputT, ActualInputT extends InputT> void evaluateHelper(
- DoFn<InputT, OutputT> doFn,
- String stepName,
- PCollection<ActualInputT> input,
- List<PCollectionView<?>> sideInputs,
- TupleTag<OutputT> mainOutputTag,
- List<TupleTag<?>> sideOutputTags,
- PCollectionTuple outputs,
- DirectPipelineRunner.EvaluationContext context,
- DirectModeExecutionContext executionContext) {
- // TODO: Run multiple shards?
- DoFn<InputT, OutputT> fn = context.ensureSerializable(doFn);
-
- SideInputReader sideInputReader = makeSideInputReader(context, sideInputs);
-
- // When evaluating via the DirectPipelineRunner, this output manager checks each output for
- // illegal mutations when the next output comes along. We then verify again after finishBundle()
- // The common case we expect this to catch is a user mutating an input in order to repeatedly
- // emit "variations".
- ImmutabilityCheckingOutputManager<ActualInputT> outputManager =
- new ImmutabilityCheckingOutputManager<>(
- fn.getClass().getSimpleName(),
- new DoFnRunnerBase.ListOutputManager(),
- outputs);
-
- DoFnRunner<InputT, OutputT> fnRunner =
- DoFnRunners.createDefault(
- context.getPipelineOptions(),
- fn,
- sideInputReader,
- outputManager,
- mainOutputTag,
- sideOutputTags,
- executionContext.getOrCreateStepContext(stepName, stepName),
- context.getAddCounterMutator(),
- input.getWindowingStrategy());
-
- fnRunner.startBundle();
-
- for (DirectPipelineRunner.ValueWithMetadata<ActualInputT> elem
- : context.getPCollectionValuesWithMetadata(input)) {
- if (elem.getValue() instanceof KV) {
- // In case the DoFn needs keyed state, set the implicit keys to the keys
- // in the input elements.
- @SuppressWarnings("unchecked")
- KV<?, ?> kvElem = (KV<?, ?>) elem.getValue();
- executionContext.setKey(kvElem.getKey());
- } else {
- executionContext.setKey(elem.getKey());
- }
-
- // We check the input for mutations only through the call span of processElement.
- // This will miss some cases, but the check is ad hoc and best effort. The common case
- // is that the input is mutated to be used for output.
- try {
- MutationDetector inputMutationDetector = MutationDetectors.forValueWithCoder(
- elem.getWindowedValue().getValue(), input.getCoder());
- @SuppressWarnings("unchecked")
- WindowedValue<InputT> windowedElem = ((WindowedValue<InputT>) elem.getWindowedValue());
- fnRunner.processElement(windowedElem);
- inputMutationDetector.verifyUnmodified();
- } catch (CoderException e) {
- throw UserCodeException.wrap(e);
- } catch (IllegalMutationException exn) {
- throw new IllegalMutationException(
- String.format("DoFn %s mutated input value %s of class %s (new value was %s)."
- + " Input values must not be mutated in any way.",
- fn.getClass().getSimpleName(),
- exn.getSavedValue(), exn.getSavedValue().getClass(), exn.getNewValue()),
- exn.getSavedValue(),
- exn.getNewValue(),
- exn);
- }
- }
-
- // Note that the input could have been retained and mutated prior to this final output,
- // but for now it degrades readability too much to be worth trying to catch that particular
- // corner case.
- fnRunner.finishBundle();
- outputManager.verifyLatestOutputsUnmodified();
- }
-
- private static SideInputReader makeSideInputReader(
- DirectPipelineRunner.EvaluationContext context, List<PCollectionView<?>> sideInputs) {
- PTuple sideInputValues = PTuple.empty();
- for (PCollectionView<?> view : sideInputs) {
- sideInputValues = sideInputValues.and(
- view.getTagInternal(),
- context.getPCollectionView(view));
- }
- return DirectSideInputReader.of(sideInputValues);
- }
-
private static void populateDisplayData(
DisplayData.Builder builder, DoFn<?, ?> fn, Class<?> fnClass) {
builder
.include(fn)
.add(DisplayData.item("fn", fnClass)
- .withLabel("Transform Function"));
- }
-
- /**
- * A {@code DoFnRunner.OutputManager} that provides facilities for checking output values for
- * illegal mutations.
- *
- * <p>When used via the try-with-resources pattern, it is guaranteed that every value passed
- * to {@link #output} will have been checked for illegal mutation.
- */
- private static class ImmutabilityCheckingOutputManager<InputT>
- implements DoFnRunners.OutputManager, AutoCloseable {
-
- private final DoFnRunners.OutputManager underlyingOutputManager;
- private final ConcurrentMap<TupleTag<?>, MutationDetector> mutationDetectorForTag;
- private final PCollectionTuple outputs;
- private String doFnName;
-
- public ImmutabilityCheckingOutputManager(
- String doFnName,
- DoFnRunners.OutputManager underlyingOutputManager,
- PCollectionTuple outputs) {
- this.doFnName = doFnName;
- this.underlyingOutputManager = underlyingOutputManager;
- this.outputs = outputs;
- this.mutationDetectorForTag = Maps.newConcurrentMap();
- }
-
- @Override
- public <T> void output(TupleTag<T> tag, WindowedValue<T> output) {
-
- // Skip verifying undeclared outputs, since we don't have coders for them.
- if (outputs.has(tag)) {
- try {
- MutationDetector newDetector =
- MutationDetectors.forValueWithCoder(
- output.getValue(), outputs.get(tag).getCoder());
- MutationDetector priorDetector = mutationDetectorForTag.put(tag, newDetector);
- verifyOutputUnmodified(priorDetector);
- } catch (CoderException e) {
- throw UserCodeException.wrap(e);
- }
- }
-
- // Actually perform the output.
- underlyingOutputManager.output(tag, output);
- }
-
- /**
- * Throws {@link IllegalMutationException} if the prior output for any tag has been mutated
- * since being output.
- */
- public void verifyLatestOutputsUnmodified() {
- for (MutationDetector detector : mutationDetectorForTag.values()) {
- verifyOutputUnmodified(detector);
- }
- }
-
- /**
- * Adapts the error message from the provided {@code detector}.
- *
- * <p>The {@code detector} may be null, in which case no check is performed. This is merely
- * to consolidate null checking to this method.
- */
- private <T> void verifyOutputUnmodified(@Nullable MutationDetector detector) {
- if (detector == null) {
- return;
- }
-
- try {
- detector.verifyUnmodified();
- } catch (IllegalMutationException exn) {
- throw new IllegalMutationException(String.format(
- "DoFn %s mutated value %s after it was output (new value was %s)."
- + " Values must not be mutated in any way after being output.",
- doFnName, exn.getSavedValue(), exn.getNewValue()),
- exn.getSavedValue(), exn.getNewValue(),
- exn);
- }
- }
-
- /**
- * When used in a {@code try}-with-resources block, verifies all of the latest outputs upon
- * {@link #close()}.
- */
- @Override
- public void close() {
- verifyLatestOutputsUnmodified();
- }
+ .withLabel("Transform Function"));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/99654ca4/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java
index 3df915b..7a97c13 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java
@@ -17,10 +17,8 @@
*/
package org.apache.beam.sdk.transforms;
-import org.apache.beam.sdk.runners.DirectPipelineRunner;
import org.apache.beam.sdk.runners.PipelineRunner;
import org.apache.beam.sdk.util.PCollectionViews;
-import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
@@ -445,27 +443,5 @@ public class View {
public PCollectionView<ViewT> apply(PCollection<ElemT> input) {
return view;
}
-
- static {
- DirectPipelineRunner.registerDefaultTransformEvaluator(
- CreatePCollectionView.class,
- new DirectPipelineRunner.TransformEvaluator<CreatePCollectionView>() {
- @SuppressWarnings("rawtypes")
- @Override
- public void evaluate(
- CreatePCollectionView transform,
- DirectPipelineRunner.EvaluationContext context) {
- evaluateTyped(transform, context);
- }
-
- private <ElemT, ViewT> void evaluateTyped(
- CreatePCollectionView<ElemT, ViewT> transform,
- DirectPipelineRunner.EvaluationContext context) {
- List<WindowedValue<ElemT>> elems =
- context.getPCollectionWindowedValues(context.getInput(transform));
- context.setPCollectionView(context.getOutput(transform), elems);
- }
- });
- }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/99654ca4/sdks/java/core/src/main/java/org/apache/beam/sdk/util/DirectModeExecutionContext.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/DirectModeExecutionContext.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/DirectModeExecutionContext.java
deleted file mode 100644
index 85e36dd..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/DirectModeExecutionContext.java
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import org.apache.beam.sdk.runners.DirectPipelineRunner.ValueWithMetadata;
-import org.apache.beam.sdk.util.state.InMemoryStateInternals;
-import org.apache.beam.sdk.util.state.StateInternals;
-import org.apache.beam.sdk.values.TupleTag;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-
-import java.util.List;
-import java.util.Map;
-
-/**
- * {@link ExecutionContext} for use in direct mode.
- */
-public class DirectModeExecutionContext
- extends BaseExecutionContext<DirectModeExecutionContext.StepContext> {
-
- private Object key;
- private List<ValueWithMetadata<?>> output = Lists.newArrayList();
- private Map<TupleTag<?>, List<ValueWithMetadata<?>>> sideOutputs = Maps.newHashMap();
-
- protected DirectModeExecutionContext() {}
-
- public static DirectModeExecutionContext create() {
- return new DirectModeExecutionContext();
- }
-
- @Override
- protected StepContext createStepContext(String stepName, String transformName) {
- return new StepContext(this, stepName, transformName);
- }
-
- public Object getKey() {
- return key;
- }
-
- public void setKey(Object newKey) {
- // The direct mode runner may reorder elements, so we need to keep
- // around the state used for each key.
- for (ExecutionContext.StepContext stepContext : getAllStepContexts()) {
- ((StepContext) stepContext).switchKey(newKey);
- }
- key = newKey;
- }
-
- @Override
- public void noteOutput(WindowedValue<?> outputElem) {
- output.add(ValueWithMetadata.of(outputElem).withKey(getKey()));
- }
-
- @Override
- public void noteSideOutput(TupleTag<?> tag, WindowedValue<?> outputElem) {
- List<ValueWithMetadata<?>> output = sideOutputs.get(tag);
- if (output == null) {
- output = Lists.newArrayList();
- sideOutputs.put(tag, output);
- }
- output.add(ValueWithMetadata.of(outputElem).withKey(getKey()));
- }
-
- public <T> List<ValueWithMetadata<T>> getOutput(@SuppressWarnings("unused") TupleTag<T> tag) {
- @SuppressWarnings({"unchecked", "rawtypes"}) // Cast not expressible without rawtypes
- List<ValueWithMetadata<T>> typedOutput = (List) output;
- return typedOutput;
- }
-
- public <T> List<ValueWithMetadata<T>> getSideOutput(TupleTag<T> tag) {
- if (sideOutputs.containsKey(tag)) {
- @SuppressWarnings({"unchecked", "rawtypes"}) // Cast not expressible without rawtypes
- List<ValueWithMetadata<T>> typedOutput = (List) sideOutputs.get(tag);
- return typedOutput;
- } else {
- return Lists.newArrayList();
- }
- }
-
- /**
- * {@link ExecutionContext.StepContext} used in direct mode.
- */
- public static class StepContext extends BaseExecutionContext.StepContext {
-
- /** A map from each key to the state associated with it. */
- private final Map<Object, InMemoryStateInternals<Object>> stateInternals = Maps.newHashMap();
- private InMemoryStateInternals<Object> currentStateInternals = null;
-
- private StepContext(ExecutionContext executionContext, String stepName, String transformName) {
- super(executionContext, stepName, transformName);
- switchKey(null);
- }
-
- public void switchKey(Object newKey) {
- currentStateInternals = stateInternals.get(newKey);
- if (currentStateInternals == null) {
- currentStateInternals = InMemoryStateInternals.forKey(newKey);
- stateInternals.put(newKey, currentStateInternals);
- }
- }
-
- @Override
- public StateInternals<Object> stateInternals() {
- return checkNotNull(currentStateInternals);
- }
-
- @Override
- public TimerInternals timerInternals() {
- throw new UnsupportedOperationException("Direct mode cannot return timerInternals");
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/99654ca4/sdks/java/core/src/main/java/org/apache/beam/sdk/util/DoFnRunnerBase.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/DoFnRunnerBase.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/DoFnRunnerBase.java
index 75861fe..58b10a7 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/DoFnRunnerBase.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/DoFnRunnerBase.java
@@ -20,7 +20,6 @@ package org.apache.beam.sdk.util;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.runners.DirectPipelineRunner;
import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.Combine.CombineFn;
import org.apache.beam.sdk.transforms.DoFn;