You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2019/03/21 08:19:35 UTC

[beam] branch master updated: [BEAM-6865] DefaultJobBundleFactory: create using default environmentFactoryProviderMap

This is an automated email from the ASF dual-hosted git repository.

robertwb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 06054cc  [BEAM-6865] DefaultJobBundleFactory: create using default environmentFactoryProviderMap
     new c961139  Merge pull request #8086 [BEAM-6865] DefaultJobBundleFactory and environmentFactoryProviderMap
06054cc is described below

commit 06054ccae7995ed2a83d6d9653b911abb16636d7
Author: Kyle Weaver <kc...@google.com>
AuthorDate: Mon Mar 18 16:07:30 2019 -0700

    [BEAM-6865] DefaultJobBundleFactory: create using default environmentFactoryProviderMap
---
 .../FlinkDefaultExecutableStageContext.java        | 23 +--------------------
 .../control/DefaultJobBundleFactory.java           | 24 ++++++++++++++++++++++
 2 files changed, 25 insertions(+), 22 deletions(-)

diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDefaultExecutableStageContext.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDefaultExecutableStageContext.java
index ccad674..1632a4a 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDefaultExecutableStageContext.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDefaultExecutableStageContext.java
@@ -21,43 +21,22 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-import org.apache.beam.model.pipeline.v1.RunnerApi.StandardEnvironments;
-import org.apache.beam.runners.core.construction.BeamUrns;
-import org.apache.beam.runners.core.construction.Environments;
 import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
 import org.apache.beam.runners.core.construction.graph.ExecutableStage;
 import org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory;
 import org.apache.beam.runners.fnexecution.control.JobBundleFactory;
 import org.apache.beam.runners.fnexecution.control.StageBundleFactory;
-import org.apache.beam.runners.fnexecution.environment.DockerEnvironmentFactory;
-import org.apache.beam.runners.fnexecution.environment.EmbeddedEnvironmentFactory;
-import org.apache.beam.runners.fnexecution.environment.ExternalEnvironmentFactory;
-import org.apache.beam.runners.fnexecution.environment.ProcessEnvironmentFactory;
 import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
 import org.apache.beam.sdk.options.PortablePipelineOptions;
 import org.apache.beam.vendor.guava.v20_0.com.google.common.base.MoreObjects;
 import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions;
-import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableMap;
 
 /** Implementation of a {@link FlinkExecutableStageContext}. */
 class FlinkDefaultExecutableStageContext implements FlinkExecutableStageContext, AutoCloseable {
   private final JobBundleFactory jobBundleFactory;
 
   private static FlinkDefaultExecutableStageContext create(JobInfo jobInfo) {
-    JobBundleFactory jobBundleFactory =
-        DefaultJobBundleFactory.create(
-            jobInfo,
-            ImmutableMap.of(
-                BeamUrns.getUrn(StandardEnvironments.Environments.DOCKER),
-                new DockerEnvironmentFactory.Provider(
-                    PipelineOptionsTranslation.fromProto(jobInfo.pipelineOptions())),
-                BeamUrns.getUrn(StandardEnvironments.Environments.PROCESS),
-                new ProcessEnvironmentFactory.Provider(),
-                BeamUrns.getUrn(StandardEnvironments.Environments.EXTERNAL),
-                new ExternalEnvironmentFactory.Provider(),
-                Environments.ENVIRONMENT_EMBEDDED, // Non Public urn for testing.
-                new EmbeddedEnvironmentFactory.Provider(
-                    PipelineOptionsTranslation.fromProto(jobInfo.pipelineOptions()))));
+    JobBundleFactory jobBundleFactory = DefaultJobBundleFactory.create(jobInfo);
     return new FlinkDefaultExecutableStageContext(jobBundleFactory);
   }
 
diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactory.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactory.java
index 7a6e647..5e9ae9d 100644
--- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactory.java
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactory.java
@@ -26,6 +26,10 @@ import java.util.concurrent.Executors;
 import javax.annotation.concurrent.ThreadSafe;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.Target;
 import org.apache.beam.model.pipeline.v1.RunnerApi.Environment;
+import org.apache.beam.model.pipeline.v1.RunnerApi.StandardEnvironments;
+import org.apache.beam.runners.core.construction.BeamUrns;
+import org.apache.beam.runners.core.construction.Environments;
+import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
 import org.apache.beam.runners.core.construction.graph.ExecutableStage;
 import org.apache.beam.runners.fnexecution.GrpcContextHeaderAccessorProvider;
 import org.apache.beam.runners.fnexecution.GrpcFnServer;
@@ -35,7 +39,11 @@ import org.apache.beam.runners.fnexecution.artifact.BeamFileSystemArtifactRetrie
 import org.apache.beam.runners.fnexecution.control.ProcessBundleDescriptors.ExecutableProcessBundleDescriptor;
 import org.apache.beam.runners.fnexecution.control.SdkHarnessClient.BundleProcessor;
 import org.apache.beam.runners.fnexecution.data.GrpcDataService;
+import org.apache.beam.runners.fnexecution.environment.DockerEnvironmentFactory;
+import org.apache.beam.runners.fnexecution.environment.EmbeddedEnvironmentFactory;
 import org.apache.beam.runners.fnexecution.environment.EnvironmentFactory;
+import org.apache.beam.runners.fnexecution.environment.ExternalEnvironmentFactory;
+import org.apache.beam.runners.fnexecution.environment.ProcessEnvironmentFactory;
 import org.apache.beam.runners.fnexecution.environment.RemoteEnvironment;
 import org.apache.beam.runners.fnexecution.logging.GrpcLoggingService;
 import org.apache.beam.runners.fnexecution.logging.Slf4jLogWriter;
@@ -78,6 +86,22 @@ public class DefaultJobBundleFactory implements JobBundleFactory {
   private final MapControlClientPool clientPool;
   private final IdGenerator stageIdGenerator;
 
+  public static DefaultJobBundleFactory create(JobInfo jobInfo) {
+    Map<String, EnvironmentFactory.Provider> environmentFactoryProviderMap =
+        ImmutableMap.of(
+            BeamUrns.getUrn(StandardEnvironments.Environments.DOCKER),
+            new DockerEnvironmentFactory.Provider(
+                PipelineOptionsTranslation.fromProto(jobInfo.pipelineOptions())),
+            BeamUrns.getUrn(StandardEnvironments.Environments.PROCESS),
+            new ProcessEnvironmentFactory.Provider(),
+            BeamUrns.getUrn(StandardEnvironments.Environments.EXTERNAL),
+            new ExternalEnvironmentFactory.Provider(),
+            Environments.ENVIRONMENT_EMBEDDED, // Non Public urn for testing.
+            new EmbeddedEnvironmentFactory.Provider(
+                PipelineOptionsTranslation.fromProto(jobInfo.pipelineOptions())));
+    return new DefaultJobBundleFactory(jobInfo, environmentFactoryProviderMap);
+  }
+
   public static DefaultJobBundleFactory create(
       JobInfo jobInfo, Map<String, EnvironmentFactory.Provider> environmentFactoryProviderMap) {
     return new DefaultJobBundleFactory(jobInfo, environmentFactoryProviderMap);