You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/07/25 02:21:46 UTC

[4/5] beam git commit: Dehydrate then rehydrate Pipeline before DirectRunner.run()

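Dehydrate then rehydrate Pipeline before DirectRunner.run()

DirectRunner.run() now first converts the incoming Pipeline to its proto
form (PipelineTranslation.toProto) and back (PipelineTranslation.fromProto),
and applies transform overrides and executes against the round-tripped
Pipeline rather than the original object. An IOException during the round
trip is rethrown as a RuntimeException. This commit also removes
ViewOverrideFactoryTest.replacementSucceeds.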


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/8ca45915
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/8ca45915
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/8ca45915

Branch: refs/heads/master
Commit: 8ca459158888693839edb14f824fa6835ebe3e67
Parents: 4348159
Author: Kenneth Knowles <kl...@google.com>
Authored: Fri May 26 11:23:05 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Mon Jul 24 18:53:26 2017 -0700

----------------------------------------------------------------------
 .../beam/runners/direct/DirectRunner.java       | 11 +++++-
 .../runners/direct/ViewOverrideFactoryTest.java | 41 --------------------
 2 files changed, 10 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/8ca45915/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
index 4621224..c5f29e5 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
@@ -22,6 +22,7 @@ import com.google.common.base.Supplier;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
+import java.io.IOException;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.EnumSet;
@@ -31,6 +32,7 @@ import java.util.Set;
 import org.apache.beam.runners.core.SplittableParDoViaKeyedWorkItems;
 import org.apache.beam.runners.core.construction.PTransformMatchers;
 import org.apache.beam.runners.core.construction.PTransformTranslation;
+import org.apache.beam.runners.core.construction.PipelineTranslation;
 import org.apache.beam.runners.core.construction.SplittableParDo;
 import org.apache.beam.runners.direct.DirectRunner.DirectPipelineResult;
 import org.apache.beam.runners.direct.TestStreamEvaluatorFactory.DirectTestStreamFactory;
@@ -156,7 +158,14 @@ public class DirectRunner extends PipelineRunner<DirectPipelineResult> {
   }
 
   @Override
-  public DirectPipelineResult run(Pipeline pipeline) {
+  public DirectPipelineResult run(Pipeline originalPipeline) {
+    Pipeline pipeline;
+    try {
+      pipeline = PipelineTranslation.fromProto(
+          PipelineTranslation.toProto(originalPipeline));
+    } catch (IOException exception) {
+      throw new RuntimeException("Error preparing pipeline for direct execution.", exception);
+    }
     pipeline.replaceAll(defaultTransformOverrides());
     MetricsEnvironment.setMetricsSupported(true);
     DirectGraphVisitor graphVisitor = new DirectGraphVisitor();

http://git-wip-us.apache.org/repos/asf/beam/blob/8ca45915/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewOverrideFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewOverrideFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewOverrideFactoryTest.java
index 6af9273..94d8d70 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewOverrideFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewOverrideFactoryTest.java
@@ -23,22 +23,17 @@ import static org.hamcrest.Matchers.hasSize;
 import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.assertThat;
 
-import com.google.common.collect.ImmutableSet;
 import java.io.Serializable;
 import java.util.List;
-import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.beam.runners.direct.ViewOverrideFactory.WriteView;
 import org.apache.beam.sdk.Pipeline.PipelineVisitor;
 import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.runners.PTransformOverrideFactory.PTransformReplacement;
 import org.apache.beam.sdk.runners.TransformHierarchy.Node;
-import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.View.CreatePCollectionView;
 import org.apache.beam.sdk.transforms.ViewFn;
 import org.apache.beam.sdk.transforms.windowing.WindowMappingFn;
@@ -62,42 +57,6 @@ public class ViewOverrideFactoryTest implements Serializable {
       new ViewOverrideFactory<>();
 
   @Test
-  public void replacementSucceeds() {
-    PCollection<Integer> ints = p.apply("CreateContents", Create.of(1, 2, 3));
-    final PCollectionView<List<Integer>> view =
-        PCollectionViews.listView(ints, WindowingStrategy.globalDefault(), ints.getCoder());
-    PTransformReplacement<PCollection<Integer>, PCollection<Integer>>
-        replacementTransform =
-            factory.getReplacementTransform(
-                AppliedPTransform
-                    .<PCollection<Integer>, PCollection<Integer>,
-                        PTransform<PCollection<Integer>, PCollection<Integer>>>
-                        of(
-                            "foo",
-                            ints.expand(),
-                            view.expand(),
-                            CreatePCollectionView.<Integer, List<Integer>>of(view),
-                            p));
-    ints.apply(replacementTransform.getTransform());
-
-    PCollection<Set<Integer>> outputViewContents =
-        p.apply("CreateSingleton", Create.of(0))
-            .apply(
-                "OutputContents",
-                ParDo.of(
-                        new DoFn<Integer, Set<Integer>>() {
-                          @ProcessElement
-                          public void outputSideInput(ProcessContext context) {
-                            context.output(ImmutableSet.copyOf(context.sideInput(view)));
-                          }
-                        })
-                    .withSideInputs(view));
-    PAssert.thatSingleton(outputViewContents).isEqualTo(ImmutableSet.of(1, 2, 3));
-
-    p.run();
-  }
-
-  @Test
   public void replacementGetViewReturnsOriginal() {
     final PCollection<Integer> ints = p.apply("CreateContents", Create.of(1, 2, 3));
     final PCollectionView<List<Integer>> view =