You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/07/25 02:21:43 UTC
[1/5] beam git commit: Fix tests that passed invalid input to
DynamicDestinations
Repository: beam
Updated Branches:
refs/heads/master 0064fb37a -> 01408c864
Fix tests that passed invalid input to DynamicDestinations
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/12c277f3
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/12c277f3
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/12c277f3
Branch: refs/heads/master
Commit: 12c277f31b2b8a295e0a41cab3290943a3eff7cd
Parents: efe2dc1
Author: Kenneth Knowles <kl...@google.com>
Authored: Fri Jul 21 20:32:20 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Mon Jul 24 18:53:25 2017 -0700
----------------------------------------------------------------------
.../construction/PTransformMatchersTest.java | 25 +++++++++++++++++-
.../direct/WriteWithShardingFactoryTest.java | 27 +++++++++++++++++++-
.../runners/dataflow/DataflowRunnerTest.java | 24 ++++++++++++++++-
.../beam/sdk/io/DynamicFileDestinations.java | 6 ++++-
4 files changed, 78 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/12c277f3/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
index 99d3dd1..316645b 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
@@ -27,6 +27,7 @@ import com.google.common.base.MoreObjects;
import com.google.common.collect.ImmutableMap;
import java.io.Serializable;
import java.util.Collections;
+import javax.annotation.Nullable;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
@@ -62,7 +63,9 @@ import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.View.CreatePCollectionView;
import org.apache.beam.sdk.transforms.ViewFn;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
@@ -547,7 +550,8 @@ public class PTransformMatchersTest implements Serializable {
WriteFiles<Integer, Void, Integer> write =
WriteFiles.to(
new FileBasedSink<Integer, Void>(
- StaticValueProvider.of(outputDirectory), DynamicFileDestinations.constant(null)) {
+ StaticValueProvider.of(outputDirectory),
+ DynamicFileDestinations.constant(new FakeFilenamePolicy())) {
@Override
public WriteOperation<Integer, Void> createWriteOperation() {
return null;
@@ -580,4 +584,23 @@ public class PTransformMatchersTest implements Serializable {
write,
p);
}
+
+ private static class FakeFilenamePolicy extends FilenamePolicy {
+ @Override
+ public ResourceId windowedFilename(
+ int shardNumber,
+ int numShards,
+ BoundedWindow window,
+ PaneInfo paneInfo,
+ FileBasedSink.OutputFileHints outputFileHints) {
+ throw new UnsupportedOperationException("should not be called");
+ }
+
+ @Nullable
+ @Override
+ public ResourceId unwindowedFilename(
+ int shardNumber, int numShards, FileBasedSink.OutputFileHints outputFileHints) {
+ throw new UnsupportedOperationException("should not be called");
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/12c277f3/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
index 546a181..6dd069c 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
@@ -36,6 +36,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
+import javax.annotation.Nullable;
import org.apache.beam.runners.direct.WriteWithShardingFactory.CalculateShardsFn;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.coders.VoidCoder;
@@ -55,7 +56,9 @@ import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFnTester;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.SerializableFunctions;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.PCollectionViews;
@@ -141,7 +144,8 @@ public class WriteWithShardingFactoryTest implements Serializable {
PTransform<PCollection<Object>, PDone> original =
WriteFiles.to(
new FileBasedSink<Object, Void>(
- StaticValueProvider.of(outputDirectory), DynamicFileDestinations.constant(null)) {
+ StaticValueProvider.of(outputDirectory),
+ DynamicFileDestinations.constant(new FakeFilenamePolicy())) {
@Override
public WriteOperation<Object, Void> createWriteOperation() {
throw new IllegalArgumentException("Should not be used");
@@ -234,4 +238,25 @@ public class WriteWithShardingFactoryTest implements Serializable {
List<Integer> shards = fnTester.processBundle((long) count);
assertThat(shards, containsInAnyOrder(13));
}
+
+ private static class FakeFilenamePolicy extends FileBasedSink.FilenamePolicy {
+ @Override
+ public ResourceId windowedFilename(
+ int shardNumber,
+ int numShards,
+ BoundedWindow window,
+ PaneInfo paneInfo,
+ FileBasedSink.OutputFileHints outputFileHints) {
+ throw new IllegalArgumentException("Should not be used");
+ }
+
+ @Nullable
+ @Override
+ public ResourceId unwindowedFilename(
+ int shardNumber,
+ int numShards,
+ FileBasedSink.OutputFileHints outputFileHints) {
+ throw new IllegalArgumentException("Should not be used");
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/beam/blob/12c277f3/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
index 94985f8..7556a28 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
@@ -64,6 +64,7 @@ import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.regex.Pattern;
+import javax.annotation.Nullable;
import org.apache.beam.runners.dataflow.DataflowRunner.StreamingShardedWriteFactory;
import org.apache.beam.runners.dataflow.options.DataflowPipelineDebugOptions;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
@@ -81,6 +82,7 @@ import org.apache.beam.sdk.io.FileBasedSink;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.WriteFiles;
+import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptions.CheckEnabled;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
@@ -102,6 +104,8 @@ import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunctions;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.transforms.windowing.Sessions;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.util.GcsUtil;
@@ -1293,7 +1297,25 @@ public class DataflowRunnerTest implements Serializable {
TestSink(String tmpFolder) {
super(
StaticValueProvider.of(FileSystems.matchNewResource(tmpFolder, true)),
- DynamicFileDestinations.constant(null));
+ DynamicFileDestinations.constant(
+ new FilenamePolicy() {
+ @Override
+ public ResourceId windowedFilename(
+ int shardNumber,
+ int numShards,
+ BoundedWindow window,
+ PaneInfo paneInfo,
+ OutputFileHints outputFileHints) {
+ throw new UnsupportedOperationException("should not be called");
+ }
+
+ @Nullable
+ @Override
+ public ResourceId unwindowedFilename(
+ int shardNumber, int numShards, OutputFileHints outputFileHints) {
+ throw new UnsupportedOperationException("should not be called");
+ }
+ }));
}
@Override
http://git-wip-us.apache.org/repos/asf/beam/blob/12c277f3/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DynamicFileDestinations.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DynamicFileDestinations.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DynamicFileDestinations.java
index e7ef0f6..d05a01a7 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DynamicFileDestinations.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DynamicFileDestinations.java
@@ -18,6 +18,9 @@
package org.apache.beam.sdk.io;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+
import javax.annotation.Nullable;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.DefaultFilenamePolicy.Params;
@@ -34,7 +37,7 @@ public class DynamicFileDestinations {
private final FilenamePolicy filenamePolicy;
public ConstantFilenamePolicy(FilenamePolicy filenamePolicy) {
- this.filenamePolicy = filenamePolicy;
+ this.filenamePolicy = checkNotNull(filenamePolicy);
}
@Override
@@ -59,6 +62,7 @@ public class DynamicFileDestinations {
@Override
public void populateDisplayData(DisplayData.Builder builder) {
+ checkState(filenamePolicy != null);
filenamePolicy.populateDisplayData(builder);
}
}
[5/5] beam git commit: This closes #3334: [BEAM-2333] Go to proto and
back before running a pipeline in Java DirectRunner
Posted by ke...@apache.org.
This closes #3334: [BEAM-2333] Go to proto and back before running a pipeline in Java DirectRunner
Dehydrate then rehydrate Pipeline before DirectRunner.run()
Add Pipeline rehydration from proto
Fix tests that passed invalid input to DynamicDestinations
Add stub DisplayDataTranslation
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/01408c86
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/01408c86
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/01408c86
Branch: refs/heads/master
Commit: 01408c864e9d844f4ffb74cc3f18276ff6a5c447
Parents: 0064fb3 8ca4591
Author: Kenneth Knowles <kl...@google.com>
Authored: Mon Jul 24 18:59:57 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Mon Jul 24 18:59:57 2017 -0700
----------------------------------------------------------------------
.../construction/DisplayDataTranslation.java | 39 +++
.../construction/PTransformTranslation.java | 12 +-
.../core/construction/PipelineTranslation.java | 280 +++++++++++++++++++
.../core/construction/RehydratedComponents.java | 3 +-
.../core/construction/SdkComponents.java | 52 ----
.../construction/PTransformMatchersTest.java | 25 +-
.../construction/PipelineTranslationTest.java | 199 +++++++++++++
.../core/construction/SdkComponentsTest.java | 107 -------
.../beam/runners/direct/DirectRunner.java | 11 +-
.../runners/direct/ViewOverrideFactoryTest.java | 41 ---
.../direct/WriteWithShardingFactoryTest.java | 27 +-
.../runners/dataflow/DataflowRunnerTest.java | 24 +-
.../src/main/proto/beam_runner_api.proto | 4 +-
.../main/java/org/apache/beam/sdk/Pipeline.java | 15 +-
.../beam/sdk/io/DynamicFileDestinations.java | 6 +-
.../beam/sdk/runners/TransformHierarchy.java | 69 +++++
16 files changed, 704 insertions(+), 210 deletions(-)
----------------------------------------------------------------------
[3/5] beam git commit: Add Pipeline rehydration from proto
Posted by ke...@apache.org.
Add Pipeline rehydration from proto
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/43481595
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/43481595
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/43481595
Branch: refs/heads/master
Commit: 43481595ebc854f4a7188609fd53267497e68124
Parents: 12c277f
Author: Kenneth Knowles <kl...@google.com>
Authored: Fri May 26 11:22:50 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Mon Jul 24 18:53:26 2017 -0700
----------------------------------------------------------------------
.../construction/PTransformTranslation.java | 8 +
.../core/construction/PipelineTranslation.java | 280 +++++++++++++++++++
.../core/construction/RehydratedComponents.java | 3 +-
.../core/construction/SdkComponents.java | 52 ----
.../construction/PipelineTranslationTest.java | 199 +++++++++++++
.../core/construction/SdkComponentsTest.java | 107 -------
.../src/main/proto/beam_runner_api.proto | 4 +-
.../main/java/org/apache/beam/sdk/Pipeline.java | 15 +-
.../beam/sdk/runners/TransformHierarchy.java | 69 +++++
9 files changed, 574 insertions(+), 163 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/43481595/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
index d459645..b8365c9 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
@@ -92,6 +92,7 @@ public class PTransformTranslation {
List<AppliedPTransform<?, ?, ?>> subtransforms,
SdkComponents components)
throws IOException {
+ // TODO include DisplayData https://issues.apache.org/jira/browse/BEAM-2645
RunnerApi.PTransform.Builder transformBuilder = RunnerApi.PTransform.newBuilder();
for (Map.Entry<TupleTag<?>, PValue> taggedInput : appliedPTransform.getInputs().entrySet()) {
checkArgument(
@@ -136,6 +137,7 @@ public class PTransformTranslation {
}
transformBuilder.setSpec(payload);
}
+ rawPTransform.registerComponents(components);
} else if (KNOWN_PAYLOAD_TRANSLATORS.containsKey(transform.getClass())) {
FunctionSpec payload =
KNOWN_PAYLOAD_TRANSLATORS
@@ -225,6 +227,8 @@ public class PTransformTranslation {
public Any getPayload() {
return null;
}
+
+ public void registerComponents(SdkComponents components) {}
}
/**
@@ -255,6 +259,10 @@ public class PTransformTranslation {
transformSpec.setParameter(payload);
}
+ // Transforms like Combine may have Coders that need to be added but do not
+ // occur in a black-box traversal
+ transform.getTransform().registerComponents(components);
+
return transformSpec.build();
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/43481595/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineTranslation.java
new file mode 100644
index 0000000..9e4839a
--- /dev/null
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineTranslation.java
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.core.construction;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.auto.value.AutoValue;
+import com.google.common.base.MoreObjects;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.ListMultimap;
+import com.google.protobuf.Any;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+import org.apache.beam.runners.core.construction.PTransformTranslation.RawPTransform;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.Pipeline.PipelineVisitor;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.runners.AppliedPTransform;
+import org.apache.beam.sdk.runners.TransformHierarchy;
+import org.apache.beam.sdk.runners.TransformHierarchy.Node;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.display.HasDisplayData;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.PCollectionViews;
+import org.apache.beam.sdk.values.PInput;
+import org.apache.beam.sdk.values.POutput;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TupleTag;
+
+/** Utilities for going to/from Runner API pipelines. */
+public class PipelineTranslation {
+
+ public static RunnerApi.Pipeline toProto(final Pipeline pipeline) {
+ final SdkComponents components = SdkComponents.create();
+ final Collection<String> rootIds = new HashSet<>();
+ pipeline.traverseTopologically(
+ new PipelineVisitor.Defaults() {
+ private final ListMultimap<Node, AppliedPTransform<?, ?, ?>> children =
+ ArrayListMultimap.create();
+
+ @Override
+ public void leaveCompositeTransform(Node node) {
+ if (node.isRootNode()) {
+ for (AppliedPTransform<?, ?, ?> pipelineRoot : children.get(node)) {
+ rootIds.add(components.getExistingPTransformId(pipelineRoot));
+ }
+ } else {
+ // TODO: Include DisplayData in the proto
+ children.put(node.getEnclosingNode(), node.toAppliedPTransform(pipeline));
+ try {
+ components.registerPTransform(
+ node.toAppliedPTransform(pipeline), children.get(node));
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ @Override
+ public void visitPrimitiveTransform(Node node) {
+ // TODO: Include DisplayData in the proto
+ children.put(node.getEnclosingNode(), node.toAppliedPTransform(pipeline));
+ try {
+ components.registerPTransform(
+ node.toAppliedPTransform(pipeline),
+ Collections.<AppliedPTransform<?, ?, ?>>emptyList());
+ } catch (IOException e) {
+ throw new IllegalStateException(e);
+ }
+ }
+ });
+ return RunnerApi.Pipeline.newBuilder()
+ .setComponents(components.toComponents())
+ .addAllRootTransformIds(rootIds)
+ .build();
+ }
+
+ private static DisplayData evaluateDisplayData(HasDisplayData component) {
+ return DisplayData.from(component);
+ }
+
+ public static Pipeline fromProto(final RunnerApi.Pipeline pipelineProto)
+ throws IOException {
+ TransformHierarchy transforms = new TransformHierarchy();
+ Pipeline pipeline = Pipeline.forTransformHierarchy(transforms, PipelineOptionsFactory.create());
+
+ // Keeping the PCollections straight is a semantic necessity, but being careful not to explode
+ // the number of coders and windowing strategies is also nice, and helps testing.
+ RehydratedComponents rehydratedComponents =
+ RehydratedComponents.forComponents(pipelineProto.getComponents()).withPipeline(pipeline);
+
+ for (String rootId : pipelineProto.getRootTransformIdsList()) {
+ addRehydratedTransform(
+ transforms,
+ pipelineProto.getComponents().getTransformsOrThrow(rootId),
+ pipeline,
+ pipelineProto.getComponents().getTransformsMap(),
+ rehydratedComponents);
+ }
+
+ return pipeline;
+ }
+
+ private static void addRehydratedTransform(
+ TransformHierarchy transforms,
+ RunnerApi.PTransform transformProto,
+ Pipeline pipeline,
+ Map<String, RunnerApi.PTransform> transformProtos,
+ RehydratedComponents rehydratedComponents)
+ throws IOException {
+
+ Map<TupleTag<?>, PValue> rehydratedInputs = new HashMap<>();
+ for (Map.Entry<String, String> inputEntry : transformProto.getInputsMap().entrySet()) {
+ rehydratedInputs.put(
+ new TupleTag<>(inputEntry.getKey()),
+ rehydratedComponents.getPCollection(inputEntry.getValue()));
+ }
+
+ Map<TupleTag<?>, PValue> rehydratedOutputs = new HashMap<>();
+ for (Map.Entry<String, String> outputEntry : transformProto.getOutputsMap().entrySet()) {
+ rehydratedOutputs.put(
+ new TupleTag<>(outputEntry.getKey()),
+ rehydratedComponents.getPCollection(outputEntry.getValue()));
+ }
+
+ RunnerApi.FunctionSpec transformSpec = transformProto.getSpec();
+
+ // By default, no "additional" inputs, since that is an SDK-specific thing.
+ // Only ParDo really separates main from side inputs
+ Map<TupleTag<?>, PValue> additionalInputs = Collections.emptyMap();
+
+ // TODO: ParDoTranslator should own it - https://issues.apache.org/jira/browse/BEAM-2674
+ if (transformSpec.getUrn().equals(PTransformTranslation.PAR_DO_TRANSFORM_URN)) {
+ RunnerApi.ParDoPayload payload =
+ transformSpec.getParameter().unpack(RunnerApi.ParDoPayload.class);
+
+ List<PCollectionView<?>> views = new ArrayList<>();
+ for (Map.Entry<String, RunnerApi.SideInput> sideInputEntry :
+ payload.getSideInputsMap().entrySet()) {
+ String localName = sideInputEntry.getKey();
+ RunnerApi.SideInput sideInput = sideInputEntry.getValue();
+ PCollection<?> pCollection =
+ (PCollection<?>) checkNotNull(rehydratedInputs.get(new TupleTag<>(localName)));
+ views.add(
+ ParDoTranslation.viewFromProto(
+ sideInputEntry.getValue(),
+ sideInputEntry.getKey(),
+ pCollection,
+ transformProto,
+ rehydratedComponents));
+ }
+ additionalInputs = PCollectionViews.toAdditionalInputs(views);
+ }
+
+ // TODO: CombineTranslator should own it - https://issues.apache.org/jira/browse/BEAM-2674
+ List<Coder<?>> additionalCoders = Collections.emptyList();
+ if (transformSpec.getUrn().equals(PTransformTranslation.COMBINE_TRANSFORM_URN)) {
+ RunnerApi.CombinePayload payload =
+ transformSpec.getParameter().unpack(RunnerApi.CombinePayload.class);
+ additionalCoders =
+ (List)
+ Collections.singletonList(
+ rehydratedComponents.getCoder(payload.getAccumulatorCoderId()));
+ }
+
+ RehydratedPTransform transform =
+ RehydratedPTransform.of(
+ transformSpec.getUrn(),
+ transformSpec.getParameter(),
+ additionalInputs,
+ additionalCoders);
+
+ if (isPrimitive(transformProto)) {
+ transforms.addFinalizedPrimitiveNode(
+ transformProto.getUniqueName(), rehydratedInputs, transform, rehydratedOutputs);
+ } else {
+ transforms.pushFinalizedNode(
+ transformProto.getUniqueName(), rehydratedInputs, transform, rehydratedOutputs);
+
+ for (String childTransformId : transformProto.getSubtransformsList()) {
+ addRehydratedTransform(
+ transforms,
+ transformProtos.get(childTransformId),
+ pipeline,
+ transformProtos,
+ rehydratedComponents);
+ }
+
+ transforms.popNode();
+ }
+ }
+
+ // A primitive transform is one with outputs that are not in its input and also
+ // not produced by a subtransform.
+ private static boolean isPrimitive(RunnerApi.PTransform transformProto) {
+ return transformProto.getSubtransformsCount() == 0
+ && !transformProto
+ .getInputsMap()
+ .values()
+ .containsAll(transformProto.getOutputsMap().values());
+ }
+
+ @AutoValue
+ abstract static class RehydratedPTransform extends RawPTransform<PInput, POutput> {
+
+ @Nullable
+ public abstract String getUrn();
+
+ @Nullable
+ public abstract Any getPayload();
+
+ @Override
+ public abstract Map<TupleTag<?>, PValue> getAdditionalInputs();
+
+ public abstract List<Coder<?>> getCoders();
+
+ public static RehydratedPTransform of(
+ String urn,
+ Any payload,
+ Map<TupleTag<?>, PValue> additionalInputs,
+ List<Coder<?>> additionalCoders) {
+ return new AutoValue_PipelineTranslation_RehydratedPTransform(
+ urn, payload, additionalInputs, additionalCoders);
+ }
+
+ @Override
+ public POutput expand(PInput input) {
+ throw new IllegalStateException(
+ String.format(
+ "%s should never be asked to expand;"
+ + " it is the result of deserializing an already-constructed Pipeline",
+ getClass().getSimpleName()));
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this)
+ .add("urn", getUrn())
+ .add("payload", getPayload())
+ .toString();
+ }
+
+ @Override
+ public void registerComponents(SdkComponents components) {
+ for (Coder<?> coder : getCoders()) {
+ try {
+ components.registerCoder(coder);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/43481595/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/RehydratedComponents.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/RehydratedComponents.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/RehydratedComponents.java
index a9a34d7..ccdd4a7 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/RehydratedComponents.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/RehydratedComponents.java
@@ -93,7 +93,8 @@ public class RehydratedComponents {
PCollection.class.getSimpleName(),
Pipeline.class.getSimpleName());
return PCollectionTranslation.fromProto(
- components.getPcollectionsOrThrow(id), pipeline, RehydratedComponents.this);
+ components.getPcollectionsOrThrow(id), pipeline, RehydratedComponents.this)
+ .setName(id);
}
});
http://git-wip-us.apache.org/repos/asf/beam/blob/43481595/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java
index 0d3ba60..54d2e9d 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SdkComponents.java
@@ -22,24 +22,16 @@ import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import com.google.common.base.Equivalence;
-import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.BiMap;
import com.google.common.collect.HashBiMap;
-import com.google.common.collect.ListMultimap;
import java.io.IOException;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
import java.util.List;
import java.util.Set;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.Pipeline.PipelineVisitor;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.common.runner.v1.RunnerApi;
import org.apache.beam.sdk.common.runner.v1.RunnerApi.Components;
import org.apache.beam.sdk.runners.AppliedPTransform;
-import org.apache.beam.sdk.runners.TransformHierarchy.Node;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.util.NameUtils;
import org.apache.beam.sdk.values.PCollection;
@@ -62,50 +54,6 @@ public class SdkComponents {
return new SdkComponents();
}
- public static RunnerApi.Pipeline translatePipeline(Pipeline pipeline) {
- final SdkComponents components = create();
- final Collection<String> rootIds = new HashSet<>();
- pipeline.traverseTopologically(
- new PipelineVisitor.Defaults() {
- private final ListMultimap<Node, AppliedPTransform<?, ?, ?>> children =
- ArrayListMultimap.create();
-
- @Override
- public void leaveCompositeTransform(Node node) {
- if (node.isRootNode()) {
- for (AppliedPTransform<?, ?, ?> pipelineRoot : children.get(node)) {
- rootIds.add(components.getExistingPTransformId(pipelineRoot));
- }
- } else {
- children.put(node.getEnclosingNode(), node.toAppliedPTransform(getPipeline()));
- try {
- components.registerPTransform(
- node.toAppliedPTransform(getPipeline()), children.get(node));
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
- }
-
- @Override
- public void visitPrimitiveTransform(Node node) {
- children.put(node.getEnclosingNode(), node.toAppliedPTransform(getPipeline()));
- try {
- components.registerPTransform(
- node.toAppliedPTransform(getPipeline()),
- Collections.<AppliedPTransform<?, ?, ?>>emptyList());
- } catch (IOException e) {
- throw new IllegalStateException(e);
- }
- }
- });
- // TODO: Display Data
- return RunnerApi.Pipeline.newBuilder()
- .setComponents(components.toComponents())
- .addAllRootTransformIds(rootIds)
- .build();
- }
-
private SdkComponents() {
this.componentsBuilder = RunnerApi.Components.newBuilder();
this.transformIds = HashBiMap.create();
http://git-wip-us.apache.org/repos/asf/beam/blob/43481595/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PipelineTranslationTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PipelineTranslationTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PipelineTranslationTest.java
new file mode 100644
index 0000000..9e6dff4
--- /dev/null
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PipelineTranslationTest.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.core.construction;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+import com.google.common.base.Equivalence;
+import com.google.common.collect.ImmutableList;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.Pipeline.PipelineVisitor;
+import org.apache.beam.sdk.coders.BigEndianLongCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.StructuredCoder;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi;
+import org.apache.beam.sdk.io.GenerateSequence;
+import org.apache.beam.sdk.runners.TransformHierarchy.Node;
+import org.apache.beam.sdk.transforms.Count;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.transforms.WithKeys;
+import org.apache.beam.sdk.transforms.windowing.AfterPane;
+import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.joda.time.Duration;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
+
+/** Tests for {@link PipelineTranslation}. */
+@RunWith(Parameterized.class)
+public class PipelineTranslationTest {
+ @Parameter(0)
+ public Pipeline pipeline;
+
+ @Parameters(name = "{index}")
+ public static Iterable<Pipeline> testPipelines() {
+ Pipeline trivialPipeline = Pipeline.create();
+ trivialPipeline.apply(Create.of(1, 2, 3));
+
+ Pipeline sideInputPipeline = Pipeline.create();
+ final PCollectionView<String> singletonView =
+ sideInputPipeline.apply(Create.of("foo")).apply(View.<String>asSingleton());
+ sideInputPipeline
+ .apply(Create.of("main input"))
+ .apply(
+ ParDo.of(
+ new DoFn<String, String>() {
+ @ProcessElement
+ public void process(ProcessContext c) {
+ // never actually executed; has no effect on translation
+ c.sideInput(singletonView);
+ }
+ })
+ .withSideInputs(singletonView));
+
+ Pipeline complexPipeline = Pipeline.create();
+ BigEndianLongCoder customCoder = BigEndianLongCoder.of();
+ PCollection<Long> elems = complexPipeline.apply(GenerateSequence.from(0L).to(207L));
+ PCollection<Long> counted = elems.apply(Count.<Long>globally()).setCoder(customCoder);
+ PCollection<Long> windowed =
+ counted.apply(
+ Window.<Long>into(FixedWindows.of(Duration.standardMinutes(7)))
+ .triggering(
+ AfterWatermark.pastEndOfWindow()
+ .withEarlyFirings(AfterPane.elementCountAtLeast(19)))
+ .accumulatingFiredPanes()
+ .withAllowedLateness(Duration.standardMinutes(3L)));
+ final WindowingStrategy<?, ?> windowedStrategy = windowed.getWindowingStrategy();
+ PCollection<KV<String, Long>> keyed = windowed.apply(WithKeys.<String, Long>of("foo"));
+ PCollection<KV<String, Iterable<Long>>> grouped =
+ keyed.apply(GroupByKey.<String, Long>create());
+
+ return ImmutableList.of(trivialPipeline, sideInputPipeline, complexPipeline);
+ }
+
+ @Test
+ public void testProtoDirectly() {
+ final RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(pipeline);
+ pipeline.traverseTopologically(
+ new PipelineProtoVerificationVisitor(pipelineProto));
+ }
+
+ @Test
+ public void testProtoAgainstRehydrated() throws Exception {
+ RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(pipeline);
+ Pipeline rehydrated = PipelineTranslation.fromProto(pipelineProto);
+
+ rehydrated.traverseTopologically(
+ new PipelineProtoVerificationVisitor(pipelineProto));
+ }
+
+ private static class PipelineProtoVerificationVisitor extends PipelineVisitor.Defaults {
+
+ private final RunnerApi.Pipeline pipelineProto;
+ Set<Node> transforms;
+ Set<PCollection<?>> pcollections;
+ Set<Equivalence.Wrapper<? extends Coder<?>>> coders;
+ Set<WindowingStrategy<?, ?>> windowingStrategies;
+
+ public PipelineProtoVerificationVisitor(RunnerApi.Pipeline pipelineProto) {
+ this.pipelineProto = pipelineProto;
+ transforms = new HashSet<>();
+ pcollections = new HashSet<>();
+ coders = new HashSet<>();
+ windowingStrategies = new HashSet<>();
+ }
+
+ @Override
+ public void leaveCompositeTransform(Node node) {
+ if (node.isRootNode()) {
+ assertThat(
+ "Unexpected number of PTransforms",
+ pipelineProto.getComponents().getTransformsCount(),
+ equalTo(transforms.size()));
+ assertThat(
+ "Unexpected number of PCollections",
+ pipelineProto.getComponents().getPcollectionsCount(),
+ equalTo(pcollections.size()));
+ assertThat(
+ "Unexpected number of Coders",
+ pipelineProto.getComponents().getCodersCount(),
+ equalTo(coders.size()));
+ assertThat(
+ "Unexpected number of Windowing Strategies",
+ pipelineProto.getComponents().getWindowingStrategiesCount(),
+ equalTo(windowingStrategies.size()));
+ } else {
+ transforms.add(node);
+ if (PTransformTranslation.COMBINE_TRANSFORM_URN.equals(
+ PTransformTranslation.urnForTransformOrNull(node.getTransform()))) {
+ // Combine translation introduces a coder that is not assigned to any PCollection
+ // in the default expansion, and must be explicitly added here.
+ try {
+ addCoders(
+ CombineTranslation.getAccumulatorCoder(node.toAppliedPTransform(getPipeline())));
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+ }
+
+ @Override
+ public void visitPrimitiveTransform(Node node) {
+ transforms.add(node);
+ }
+
+ @Override
+ public void visitValue(PValue value, Node producer) {
+ if (value instanceof PCollection) {
+ PCollection pc = (PCollection) value;
+ pcollections.add(pc);
+ addCoders(pc.getCoder());
+ windowingStrategies.add(pc.getWindowingStrategy());
+ addCoders(pc.getWindowingStrategy().getWindowFn().windowCoder());
+ }
+ }
+
+ private void addCoders(Coder<?> coder) {
+ coders.add(Equivalence.<Coder<?>>identity().wrap(coder));
+ if (CoderTranslation.KNOWN_CODER_URNS.containsKey(coder.getClass())) {
+ for (Coder<?> component : ((StructuredCoder<?>) coder).getComponents()) {
+ addCoders(component);
+ }
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/43481595/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SdkComponentsTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SdkComponentsTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SdkComponentsTest.java
index ce6a99f..82840d6 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SdkComponentsTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SdkComponentsTest.java
@@ -24,43 +24,25 @@ import static org.hamcrest.Matchers.isEmptyOrNullString;
import static org.hamcrest.Matchers.not;
import static org.junit.Assert.assertThat;
-import com.google.common.base.Equivalence;
import java.io.IOException;
import java.util.Collections;
-import java.util.HashSet;
-import java.util.Set;
-import org.apache.beam.sdk.Pipeline.PipelineVisitor;
-import org.apache.beam.sdk.coders.BigEndianLongCoder;
import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.SetCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.coders.StructuredCoder;
import org.apache.beam.sdk.coders.VarLongCoder;
-import org.apache.beam.sdk.common.runner.v1.RunnerApi;
import org.apache.beam.sdk.common.runner.v1.RunnerApi.Components;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.runners.AppliedPTransform;
-import org.apache.beam.sdk.runners.TransformHierarchy.Node;
import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.GroupByKey;
-import org.apache.beam.sdk.transforms.WithKeys;
-import org.apache.beam.sdk.transforms.windowing.AfterPane;
-import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
-import org.apache.beam.sdk.transforms.windowing.FixedWindows;
-import org.apache.beam.sdk.transforms.windowing.Window;
-import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PValue;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.sdk.values.WindowingStrategy.AccumulationMode;
import org.hamcrest.Matchers;
-import org.joda.time.Duration;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
@@ -78,95 +60,6 @@ public class SdkComponentsTest {
private SdkComponents components = SdkComponents.create();
@Test
- public void translatePipeline() {
- BigEndianLongCoder customCoder = BigEndianLongCoder.of();
- PCollection<Long> elems = pipeline.apply(GenerateSequence.from(0L).to(207L));
- PCollection<Long> counted = elems.apply(Count.<Long>globally()).setCoder(customCoder);
- PCollection<Long> windowed =
- counted.apply(
- Window.<Long>into(FixedWindows.of(Duration.standardMinutes(7)))
- .triggering(
- AfterWatermark.pastEndOfWindow()
- .withEarlyFirings(AfterPane.elementCountAtLeast(19)))
- .accumulatingFiredPanes()
- .withAllowedLateness(Duration.standardMinutes(3L)));
- final WindowingStrategy<?, ?> windowedStrategy = windowed.getWindowingStrategy();
- PCollection<KV<String, Long>> keyed = windowed.apply(WithKeys.<String, Long>of("foo"));
- PCollection<KV<String, Iterable<Long>>> grouped =
- keyed.apply(GroupByKey.<String, Long>create());
-
- final RunnerApi.Pipeline pipelineProto = SdkComponents.translatePipeline(pipeline);
- pipeline.traverseTopologically(
- new PipelineVisitor.Defaults() {
- Set<Node> transforms = new HashSet<>();
- Set<PCollection<?>> pcollections = new HashSet<>();
- Set<Equivalence.Wrapper<? extends Coder<?>>> coders = new HashSet<>();
- Set<WindowingStrategy<?, ?>> windowingStrategies = new HashSet<>();
-
- @Override
- public void leaveCompositeTransform(Node node) {
- if (node.isRootNode()) {
- assertThat(
- "Unexpected number of PTransforms",
- pipelineProto.getComponents().getTransformsCount(),
- equalTo(transforms.size()));
- assertThat(
- "Unexpected number of PCollections",
- pipelineProto.getComponents().getPcollectionsCount(),
- equalTo(pcollections.size()));
- assertThat(
- "Unexpected number of Coders",
- pipelineProto.getComponents().getCodersCount(),
- equalTo(coders.size()));
- assertThat(
- "Unexpected number of Windowing Strategies",
- pipelineProto.getComponents().getWindowingStrategiesCount(),
- equalTo(windowingStrategies.size()));
- } else {
- transforms.add(node);
- if (PTransformTranslation.COMBINE_TRANSFORM_URN.equals(
- PTransformTranslation.urnForTransformOrNull(node.getTransform()))) {
- // Combine translation introduces a coder that is not assigned to any PCollection
- // in the default expansion, and must be explicitly added here.
- try {
- addCoders(
- CombineTranslation.getAccumulatorCoder(
- node.toAppliedPTransform(getPipeline())));
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
- }
- }
-
- @Override
- public void visitPrimitiveTransform(Node node) {
- transforms.add(node);
- }
-
- @Override
- public void visitValue(PValue value, Node producer) {
- if (value instanceof PCollection) {
- PCollection pc = (PCollection) value;
- pcollections.add(pc);
- addCoders(pc.getCoder());
- windowingStrategies.add(pc.getWindowingStrategy());
- addCoders(pc.getWindowingStrategy().getWindowFn().windowCoder());
- }
- }
-
- private void addCoders(Coder<?> coder) {
- coders.add(Equivalence.<Coder<?>>identity().wrap(coder));
- if (coder instanceof StructuredCoder) {
- for (Coder<?> component : ((StructuredCoder<?>) coder).getComponents()) {
- addCoders(component);
- }
- }
- }
- });
- }
-
- @Test
public void registerCoder() throws IOException {
Coder<?> coder =
KvCoder.of(StringUtf8Coder.of(), IterableCoder.of(SetCoder.of(ByteArrayCoder.of())));
http://git-wip-us.apache.org/repos/asf/beam/blob/43481595/sdks/common/runner-api/src/main/proto/beam_runner_api.proto
----------------------------------------------------------------------
diff --git a/sdks/common/runner-api/src/main/proto/beam_runner_api.proto b/sdks/common/runner-api/src/main/proto/beam_runner_api.proto
index 711da2a..0c433fa 100644
--- a/sdks/common/runner-api/src/main/proto/beam_runner_api.proto
+++ b/sdks/common/runner-api/src/main/proto/beam_runner_api.proto
@@ -92,7 +92,9 @@ message Pipeline {
// this pipeline.
Components components = 1;
- // (Required) The ids of all PTransforms that are not contained within another PTransform
+ // (Required) The ids of all PTransforms that are not contained within another PTransform.
+ // These must be in shallow topological order, so that traversing them recursively
+ // in this order yields a recursive topological traversal.
repeated string root_transform_ids = 2;
// (Optional) Static display data for the pipeline. If there is none,
http://git-wip-us.apache.org/repos/asf/beam/blob/43481595/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
index bdf8a12..760efb3 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
@@ -180,6 +180,12 @@ public class Pipeline {
return begin().apply(name, root);
}
+ @Internal
+ public static Pipeline forTransformHierarchy(
+ TransformHierarchy transforms, PipelineOptions options) {
+ return new Pipeline(transforms, options);
+ }
+
/**
* <b><i>For internal use only; no backwards-compatibility guarantees.</i></b>
*
@@ -476,16 +482,21 @@ public class Pipeline {
/////////////////////////////////////////////////////////////////////////////
// Below here are internal operations, never called by users.
- private final TransformHierarchy transforms = new TransformHierarchy();
+ private final TransformHierarchy transforms;
private Set<String> usedFullNames = new HashSet<>();
private CoderRegistry coderRegistry;
private final List<String> unstableNames = new ArrayList<>();
private final PipelineOptions defaultOptions;
- protected Pipeline(PipelineOptions options) {
+ private Pipeline(TransformHierarchy transforms, PipelineOptions options) {
+ this.transforms = transforms;
this.defaultOptions = options;
}
+ protected Pipeline(PipelineOptions options) {
+ this(new TransformHierarchy(), options);
+ }
+
@Override
public String toString() {
return "Pipeline#" + hashCode();
http://git-wip-us.apache.org/repos/asf/beam/blob/43481595/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java
index d8ff59e..c2d5771 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/TransformHierarchy.java
@@ -34,6 +34,7 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
+import javax.annotation.Nullable;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.Pipeline.PipelineVisitor;
import org.apache.beam.sdk.Pipeline.PipelineVisitor.CompositeBehavior;
@@ -98,6 +99,48 @@ public class TransformHierarchy {
return current;
}
+ @Internal
+ public Node pushFinalizedNode(
+ String name,
+ Map<TupleTag<?>, PValue> inputs,
+ PTransform<?, ?> transform,
+ Map<TupleTag<?>, PValue> outputs) {
+ checkNotNull(
+ transform, "A %s must be provided for all Nodes", PTransform.class.getSimpleName());
+ checkNotNull(
+ name, "A name must be provided for all %s Nodes", PTransform.class.getSimpleName());
+ checkNotNull(
+ inputs, "An input must be provided for all %s Nodes", PTransform.class.getSimpleName());
+ Node node = new Node(current, transform, name, inputs, outputs);
+ node.finishedSpecifying = true;
+ current.addComposite(node);
+ current = node;
+ return current;
+ }
+
+ @Internal
+ public Node addFinalizedPrimitiveNode(
+ String name,
+ Map<TupleTag<?>, PValue> inputs,
+ PTransform<?, ?> transform,
+ Map<TupleTag<?>, PValue> outputs) {
+ checkNotNull(
+ transform, "A %s must be provided for all Nodes", PTransform.class.getSimpleName());
+ checkNotNull(
+ name, "A name must be provided for all %s Nodes", PTransform.class.getSimpleName());
+ checkNotNull(
+ inputs, "Inputs must be provided for all %s Nodes", PTransform.class.getSimpleName());
+ checkNotNull(
+ outputs, "Outputs must be provided for all %s Nodes", PTransform.class.getSimpleName());
+ Node node = new Node(current, transform, name, inputs, outputs);
+ node.finishedSpecifying = true;
+ for (PValue output : outputs.values()) {
+ producers.put(output, node);
+ }
+ current.addComposite(node);
+ return node;
+ }
+
public Node replaceNode(Node existing, PInput input, PTransform<?, ?> transform) {
checkNotNull(existing);
checkNotNull(input);
@@ -321,6 +364,32 @@ public class TransformHierarchy {
}
/**
+ * Creates a new {@link Node} with the given parent and transform, where inputs and outputs
+ * are already known.
+ *
+ * <p>EnclosingNode and transform may both be null for a root-level node, which holds all other
+ * nodes.
+ *
+ * @param enclosingNode the composite node containing this node
+ * @param transform the PTransform tracked by this node
+ * @param fullName the fully qualified name of the transform
+ * @param inputs the expanded inputs to the transform
+ * @param outputs the expanded outputs of the transform
+ */
+ private Node(
+ @Nullable Node enclosingNode,
+ @Nullable PTransform<?, ?> transform,
+ String fullName,
+ @Nullable Map<TupleTag<?>, PValue> inputs,
+ @Nullable Map<TupleTag<?>, PValue> outputs) {
+ this.enclosingNode = enclosingNode;
+ this.transform = transform;
+ this.fullName = fullName;
+ this.inputs = inputs == null ? Collections.<TupleTag<?>, PValue>emptyMap() : inputs;
+ this.outputs = outputs == null ? Collections.<TupleTag<?>, PValue>emptyMap() : outputs;
+ }
+
+ /**
* Returns the transform associated with this transform node.
*/
public PTransform<?, ?> getTransform() {
[4/5] beam git commit: Dehydrate then rehydrate Pipeline before
DirectRunner.run()
Posted by ke...@apache.org.
Dehydrate then rehydrate Pipeline before DirectRunner.run()
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/8ca45915
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/8ca45915
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/8ca45915
Branch: refs/heads/master
Commit: 8ca459158888693839edb14f824fa6835ebe3e67
Parents: 4348159
Author: Kenneth Knowles <kl...@google.com>
Authored: Fri May 26 11:23:05 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Mon Jul 24 18:53:26 2017 -0700
----------------------------------------------------------------------
.../beam/runners/direct/DirectRunner.java | 11 +++++-
.../runners/direct/ViewOverrideFactoryTest.java | 41 --------------------
2 files changed, 10 insertions(+), 42 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/8ca45915/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
index 4621224..c5f29e5 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
@@ -22,6 +22,7 @@ import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
+import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
@@ -31,6 +32,7 @@ import java.util.Set;
import org.apache.beam.runners.core.SplittableParDoViaKeyedWorkItems;
import org.apache.beam.runners.core.construction.PTransformMatchers;
import org.apache.beam.runners.core.construction.PTransformTranslation;
+import org.apache.beam.runners.core.construction.PipelineTranslation;
import org.apache.beam.runners.core.construction.SplittableParDo;
import org.apache.beam.runners.direct.DirectRunner.DirectPipelineResult;
import org.apache.beam.runners.direct.TestStreamEvaluatorFactory.DirectTestStreamFactory;
@@ -156,7 +158,14 @@ public class DirectRunner extends PipelineRunner<DirectPipelineResult> {
}
@Override
- public DirectPipelineResult run(Pipeline pipeline) {
+ public DirectPipelineResult run(Pipeline originalPipeline) {
+ Pipeline pipeline;
+ try {
+ pipeline = PipelineTranslation.fromProto(
+ PipelineTranslation.toProto(originalPipeline));
+ } catch (IOException exception) {
+ throw new RuntimeException("Error preparing pipeline for direct execution.", exception);
+ }
pipeline.replaceAll(defaultTransformOverrides());
MetricsEnvironment.setMetricsSupported(true);
DirectGraphVisitor graphVisitor = new DirectGraphVisitor();
http://git-wip-us.apache.org/repos/asf/beam/blob/8ca45915/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewOverrideFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewOverrideFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewOverrideFactoryTest.java
index 6af9273..94d8d70 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewOverrideFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewOverrideFactoryTest.java
@@ -23,22 +23,17 @@ import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat;
-import com.google.common.collect.ImmutableSet;
import java.io.Serializable;
import java.util.List;
-import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.beam.runners.direct.ViewOverrideFactory.WriteView;
import org.apache.beam.sdk.Pipeline.PipelineVisitor;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.runners.PTransformOverrideFactory.PTransformReplacement;
import org.apache.beam.sdk.runners.TransformHierarchy.Node;
-import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View.CreatePCollectionView;
import org.apache.beam.sdk.transforms.ViewFn;
import org.apache.beam.sdk.transforms.windowing.WindowMappingFn;
@@ -62,42 +57,6 @@ public class ViewOverrideFactoryTest implements Serializable {
new ViewOverrideFactory<>();
@Test
- public void replacementSucceeds() {
- PCollection<Integer> ints = p.apply("CreateContents", Create.of(1, 2, 3));
- final PCollectionView<List<Integer>> view =
- PCollectionViews.listView(ints, WindowingStrategy.globalDefault(), ints.getCoder());
- PTransformReplacement<PCollection<Integer>, PCollection<Integer>>
- replacementTransform =
- factory.getReplacementTransform(
- AppliedPTransform
- .<PCollection<Integer>, PCollection<Integer>,
- PTransform<PCollection<Integer>, PCollection<Integer>>>
- of(
- "foo",
- ints.expand(),
- view.expand(),
- CreatePCollectionView.<Integer, List<Integer>>of(view),
- p));
- ints.apply(replacementTransform.getTransform());
-
- PCollection<Set<Integer>> outputViewContents =
- p.apply("CreateSingleton", Create.of(0))
- .apply(
- "OutputContents",
- ParDo.of(
- new DoFn<Integer, Set<Integer>>() {
- @ProcessElement
- public void outputSideInput(ProcessContext context) {
- context.output(ImmutableSet.copyOf(context.sideInput(view)));
- }
- })
- .withSideInputs(view));
- PAssert.thatSingleton(outputViewContents).isEqualTo(ImmutableSet.of(1, 2, 3));
-
- p.run();
- }
-
- @Test
public void replacementGetViewReturnsOriginal() {
final PCollection<Integer> ints = p.apply("CreateContents", Create.of(1, 2, 3));
final PCollectionView<List<Integer>> view =
[2/5] beam git commit: Add stub DisplayDataTranslation
Posted by ke...@apache.org.
Add stub DisplayDataTranslation
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/efe2dc17
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/efe2dc17
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/efe2dc17
Branch: refs/heads/master
Commit: efe2dc17af2a66c5c33076467cb46f73bb7fb9ab
Parents: 0064fb3
Author: Kenneth Knowles <kl...@google.com>
Authored: Wed Jul 19 20:09:52 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Mon Jul 24 18:53:25 2017 -0700
----------------------------------------------------------------------
.../construction/DisplayDataTranslation.java | 39 ++++++++++++++++++++
.../construction/PTransformTranslation.java | 4 +-
2 files changed, 42 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/efe2dc17/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/DisplayDataTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/DisplayDataTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/DisplayDataTranslation.java
new file mode 100644
index 0000000..ff7f9f2
--- /dev/null
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/DisplayDataTranslation.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.core.construction;
+
+import com.google.protobuf.Any;
+import com.google.protobuf.BoolValue;
+import org.apache.beam.sdk.common.runner.v1.RunnerApi;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+
+/** Utilities for going to/from DisplayData protos. */
+public class DisplayDataTranslation {
+ public static RunnerApi.DisplayData toProto(DisplayData displayData) {
+ // TODO https://issues.apache.org/jira/browse/BEAM-2645
+ return RunnerApi.DisplayData.newBuilder()
+ .addItems(
+ RunnerApi.DisplayData.Item.newBuilder()
+ .setId(RunnerApi.DisplayData.Identifier.newBuilder().setKey("stubImplementation"))
+ .setLabel("Stub implementation")
+ .setType(RunnerApi.DisplayData.Type.BOOLEAN)
+ .setValue(Any.pack(BoolValue.newBuilder().setValue(true).build())))
+ .build();
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/efe2dc17/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
index 3b94724..d459645 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformTranslation.java
@@ -33,6 +33,7 @@ import org.apache.beam.sdk.common.runner.v1.RunnerApi;
import org.apache.beam.sdk.common.runner.v1.RunnerApi.FunctionSpec;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PInput;
import org.apache.beam.sdk.values.POutput;
@@ -118,7 +119,8 @@ public class PTransformTranslation {
}
transformBuilder.setUniqueName(appliedPTransform.getFullName());
- // TODO: Display Data
+ transformBuilder.setDisplayData(
+ DisplayDataTranslation.toProto(DisplayData.from(appliedPTransform.getTransform())));
PTransform<?, ?> transform = appliedPTransform.getTransform();
// A RawPTransform directly vends its payload. Because it will generally be