You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain text copy.
Posted to commits@beam.apache.org by tg...@apache.org on 2017/03/31 21:09:06 UTC

[2/2] beam git commit: Use Batch Replacement in the Apex Runner

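Use Batch Replacement in the Apex Runner: getOverrides() now returns a list of PTransformOverride, and run() applies them with a single Pipeline.replaceAll call instead of calling Pipeline.replace once per override.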


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d89e9d7d
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d89e9d7d
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d89e9d7d

Branch: refs/heads/master
Commit: d89e9d7d3a3ea952e6eb0784f717203460afe90f
Parents: c81f83b
Author: Thomas Groh <tg...@google.com>
Authored: Thu Mar 30 15:55:28 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Fri Mar 31 14:08:55 2017 -0700

----------------------------------------------------------------------
 .../apache/beam/runners/apex/ApexRunner.java    | 40 ++++++++++----------
 1 file changed, 20 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/d89e9d7d/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
index dfc8f63..d23fc14 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
@@ -22,7 +22,7 @@ import com.datatorrent.api.Context.DAGContext;
 import com.datatorrent.api.DAG;
 import com.datatorrent.api.StreamingApplication;
 import com.google.common.base.Throwables;
-import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableList;
 import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
@@ -31,7 +31,6 @@ import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
-import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.apex.api.EmbeddedAppLauncher;
@@ -48,9 +47,7 @@ import org.apache.beam.sdk.coders.CoderRegistry;
 import org.apache.beam.sdk.coders.ListCoder;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsValidator;
-import org.apache.beam.sdk.runners.PTransformMatcher;
 import org.apache.beam.sdk.runners.PTransformOverride;
-import org.apache.beam.sdk.runners.PTransformOverrideFactory;
 import org.apache.beam.sdk.runners.PipelineRunner;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.Combine.GloballyAsSingletonView;
@@ -96,27 +93,30 @@ public class ApexRunner extends PipelineRunner<ApexRunnerResult> {
     return new ApexRunner(apexPipelineOptions);
   }
 
-  private Map<PTransformMatcher, PTransformOverrideFactory> getOverrides() {
-    return ImmutableMap.<PTransformMatcher, PTransformOverrideFactory>builder()
-        .put(PTransformMatchers.classEqualTo(Create.Values.class), new PrimitiveCreate.Factory())
-        .put(
-            PTransformMatchers.classEqualTo(View.AsSingleton.class),
-            new StreamingViewAsSingleton.Factory())
-        .put(
-            PTransformMatchers.classEqualTo(View.AsIterable.class),
-            new StreamingViewAsIterable.Factory())
-        .put(
-            PTransformMatchers.classEqualTo(Combine.GloballyAsSingletonView.class),
-            new StreamingCombineGloballyAsSingletonView.Factory())
+  private List<PTransformOverride> getOverrides() {
+    return ImmutableList.<PTransformOverride>builder()
+        .add(
+            PTransformOverride.of(
+                PTransformMatchers.classEqualTo(Create.Values.class),
+                new PrimitiveCreate.Factory()))
+        .add(
+            PTransformOverride.of(
+                PTransformMatchers.classEqualTo(View.AsSingleton.class),
+                new StreamingViewAsSingleton.Factory()))
+        .add(
+            PTransformOverride.of(
+                PTransformMatchers.classEqualTo(View.AsIterable.class),
+                new StreamingViewAsIterable.Factory()))
+        .add(
+            PTransformOverride.of(
+                PTransformMatchers.classEqualTo(Combine.GloballyAsSingletonView.class),
+                new StreamingCombineGloballyAsSingletonView.Factory()))
         .build();
   }
 
   @Override
   public ApexRunnerResult run(final Pipeline pipeline) {
-    for (Map.Entry<PTransformMatcher, PTransformOverrideFactory> override :
-        getOverrides().entrySet()) {
-      pipeline.replace(PTransformOverride.of(override.getKey(), override.getValue()));
-    }
+    pipeline.replaceAll(getOverrides());
 
     final ApexPipelineTranslator translator = new ApexPipelineTranslator(options);
     final AtomicReference<DAG> apexDAG = new AtomicReference<>();