You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by tg...@apache.org on 2017/03/31 21:09:06 UTC
[2/2] beam git commit: Use Batch Replacement in the Apex Runner
Use Batch Replacement in the Apex Runner
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d89e9d7d
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d89e9d7d
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d89e9d7d
Branch: refs/heads/master
Commit: d89e9d7d3a3ea952e6eb0784f717203460afe90f
Parents: c81f83b
Author: Thomas Groh <tg...@google.com>
Authored: Thu Mar 30 15:55:28 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Fri Mar 31 14:08:55 2017 -0700
----------------------------------------------------------------------
.../apache/beam/runners/apex/ApexRunner.java | 40 ++++++++++----------
1 file changed, 20 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/d89e9d7d/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
index dfc8f63..d23fc14 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
@@ -22,7 +22,7 @@ import com.datatorrent.api.Context.DAGContext;
import com.datatorrent.api.DAG;
import com.datatorrent.api.StreamingApplication;
import com.google.common.base.Throwables;
-import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableList;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
@@ -31,7 +31,6 @@ import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
-import java.util.Map;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.apex.api.EmbeddedAppLauncher;
@@ -48,9 +47,7 @@ import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.coders.ListCoder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsValidator;
-import org.apache.beam.sdk.runners.PTransformMatcher;
import org.apache.beam.sdk.runners.PTransformOverride;
-import org.apache.beam.sdk.runners.PTransformOverrideFactory;
import org.apache.beam.sdk.runners.PipelineRunner;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.Combine.GloballyAsSingletonView;
@@ -96,27 +93,30 @@ public class ApexRunner extends PipelineRunner<ApexRunnerResult> {
return new ApexRunner(apexPipelineOptions);
}
- private Map<PTransformMatcher, PTransformOverrideFactory> getOverrides() {
- return ImmutableMap.<PTransformMatcher, PTransformOverrideFactory>builder()
- .put(PTransformMatchers.classEqualTo(Create.Values.class), new PrimitiveCreate.Factory())
- .put(
- PTransformMatchers.classEqualTo(View.AsSingleton.class),
- new StreamingViewAsSingleton.Factory())
- .put(
- PTransformMatchers.classEqualTo(View.AsIterable.class),
- new StreamingViewAsIterable.Factory())
- .put(
- PTransformMatchers.classEqualTo(Combine.GloballyAsSingletonView.class),
- new StreamingCombineGloballyAsSingletonView.Factory())
+ private List<PTransformOverride> getOverrides() {
+ return ImmutableList.<PTransformOverride>builder()
+ .add(
+ PTransformOverride.of(
+ PTransformMatchers.classEqualTo(Create.Values.class),
+ new PrimitiveCreate.Factory()))
+ .add(
+ PTransformOverride.of(
+ PTransformMatchers.classEqualTo(View.AsSingleton.class),
+ new StreamingViewAsSingleton.Factory()))
+ .add(
+ PTransformOverride.of(
+ PTransformMatchers.classEqualTo(View.AsIterable.class),
+ new StreamingViewAsIterable.Factory()))
+ .add(
+ PTransformOverride.of(
+ PTransformMatchers.classEqualTo(Combine.GloballyAsSingletonView.class),
+ new StreamingCombineGloballyAsSingletonView.Factory()))
.build();
}
@Override
public ApexRunnerResult run(final Pipeline pipeline) {
- for (Map.Entry<PTransformMatcher, PTransformOverrideFactory> override :
- getOverrides().entrySet()) {
- pipeline.replace(PTransformOverride.of(override.getKey(), override.getValue()));
- }
+ pipeline.replaceAll(getOverrides());
final ApexPipelineTranslator translator = new ApexPipelineTranslator(options);
final AtomicReference<DAG> apexDAG = new AtomicReference<>();