You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/07/25 02:21:46 UTC
[4/5] beam git commit: Dehydrate then rehydrate Pipeline before DirectRunner.run()
Dehydrate then rehydrate Pipeline before DirectRunner.run()
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/8ca45915
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/8ca45915
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/8ca45915
Branch: refs/heads/master
Commit: 8ca459158888693839edb14f824fa6835ebe3e67
Parents: 4348159
Author: Kenneth Knowles <kl...@google.com>
Authored: Fri May 26 11:23:05 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Mon Jul 24 18:53:26 2017 -0700
----------------------------------------------------------------------
.../beam/runners/direct/DirectRunner.java | 11 +++++-
.../runners/direct/ViewOverrideFactoryTest.java | 41 --------------------
2 files changed, 10 insertions(+), 42 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/8ca45915/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
index 4621224..c5f29e5 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
@@ -22,6 +22,7 @@ import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
+import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
@@ -31,6 +32,7 @@ import java.util.Set;
import org.apache.beam.runners.core.SplittableParDoViaKeyedWorkItems;
import org.apache.beam.runners.core.construction.PTransformMatchers;
import org.apache.beam.runners.core.construction.PTransformTranslation;
+import org.apache.beam.runners.core.construction.PipelineTranslation;
import org.apache.beam.runners.core.construction.SplittableParDo;
import org.apache.beam.runners.direct.DirectRunner.DirectPipelineResult;
import org.apache.beam.runners.direct.TestStreamEvaluatorFactory.DirectTestStreamFactory;
@@ -156,7 +158,14 @@ public class DirectRunner extends PipelineRunner<DirectPipelineResult> {
}
@Override
- public DirectPipelineResult run(Pipeline pipeline) {
+ public DirectPipelineResult run(Pipeline originalPipeline) {
+ Pipeline pipeline;
+ try {
+ pipeline = PipelineTranslation.fromProto(
+ PipelineTranslation.toProto(originalPipeline));
+ } catch (IOException exception) {
+ throw new RuntimeException("Error preparing pipeline for direct execution.", exception);
+ }
pipeline.replaceAll(defaultTransformOverrides());
MetricsEnvironment.setMetricsSupported(true);
DirectGraphVisitor graphVisitor = new DirectGraphVisitor();
http://git-wip-us.apache.org/repos/asf/beam/blob/8ca45915/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewOverrideFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewOverrideFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewOverrideFactoryTest.java
index 6af9273..94d8d70 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewOverrideFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewOverrideFactoryTest.java
@@ -23,22 +23,17 @@ import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat;
-import com.google.common.collect.ImmutableSet;
import java.io.Serializable;
import java.util.List;
-import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.beam.runners.direct.ViewOverrideFactory.WriteView;
import org.apache.beam.sdk.Pipeline.PipelineVisitor;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.runners.PTransformOverrideFactory.PTransformReplacement;
import org.apache.beam.sdk.runners.TransformHierarchy.Node;
-import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View.CreatePCollectionView;
import org.apache.beam.sdk.transforms.ViewFn;
import org.apache.beam.sdk.transforms.windowing.WindowMappingFn;
@@ -62,42 +57,6 @@ public class ViewOverrideFactoryTest implements Serializable {
new ViewOverrideFactory<>();
@Test
- public void replacementSucceeds() {
- PCollection<Integer> ints = p.apply("CreateContents", Create.of(1, 2, 3));
- final PCollectionView<List<Integer>> view =
- PCollectionViews.listView(ints, WindowingStrategy.globalDefault(), ints.getCoder());
- PTransformReplacement<PCollection<Integer>, PCollection<Integer>>
- replacementTransform =
- factory.getReplacementTransform(
- AppliedPTransform
- .<PCollection<Integer>, PCollection<Integer>,
- PTransform<PCollection<Integer>, PCollection<Integer>>>
- of(
- "foo",
- ints.expand(),
- view.expand(),
- CreatePCollectionView.<Integer, List<Integer>>of(view),
- p));
- ints.apply(replacementTransform.getTransform());
-
- PCollection<Set<Integer>> outputViewContents =
- p.apply("CreateSingleton", Create.of(0))
- .apply(
- "OutputContents",
- ParDo.of(
- new DoFn<Integer, Set<Integer>>() {
- @ProcessElement
- public void outputSideInput(ProcessContext context) {
- context.output(ImmutableSet.copyOf(context.sideInput(view)));
- }
- })
- .withSideInputs(view));
- PAssert.thatSingleton(outputViewContents).isEqualTo(ImmutableSet.of(1, 2, 3));
-
- p.run();
- }
-
- @Test
public void replacementGetViewReturnsOriginal() {
final PCollection<Integer> ints = p.apply("CreateContents", Create.of(1, 2, 3));
final PCollectionView<List<Integer>> view =