You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2019/03/21 08:19:35 UTC
[beam] branch master updated: [BEAM-6865] DefaultJobBundleFactory: create using default environmentFactoryProviderMap
This is an automated email from the ASF dual-hosted git repository.
robertwb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 06054cc [BEAM-6865] DefaultJobBundleFactory: create using default environmentFactoryProviderMap
new c961139 Merge pull request #8086 [BEAM-6865] DefaultJobBundleFactory and environmentFactoryProviderMap
06054cc is described below
commit 06054ccae7995ed2a83d6d9653b911abb16636d7
Author: Kyle Weaver <kc...@google.com>
AuthorDate: Mon Mar 18 16:07:30 2019 -0700
[BEAM-6865] DefaultJobBundleFactory: create using default environmentFactoryProviderMap
---
.../FlinkDefaultExecutableStageContext.java | 23 +--------------------
.../control/DefaultJobBundleFactory.java | 24 ++++++++++++++++++++++
2 files changed, 25 insertions(+), 22 deletions(-)
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDefaultExecutableStageContext.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDefaultExecutableStageContext.java
index ccad674..1632a4a 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDefaultExecutableStageContext.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDefaultExecutableStageContext.java
@@ -21,43 +21,22 @@ import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
-import org.apache.beam.model.pipeline.v1.RunnerApi.StandardEnvironments;
-import org.apache.beam.runners.core.construction.BeamUrns;
-import org.apache.beam.runners.core.construction.Environments;
import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
import org.apache.beam.runners.core.construction.graph.ExecutableStage;
import org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory;
import org.apache.beam.runners.fnexecution.control.JobBundleFactory;
import org.apache.beam.runners.fnexecution.control.StageBundleFactory;
-import org.apache.beam.runners.fnexecution.environment.DockerEnvironmentFactory;
-import org.apache.beam.runners.fnexecution.environment.EmbeddedEnvironmentFactory;
-import org.apache.beam.runners.fnexecution.environment.ExternalEnvironmentFactory;
-import org.apache.beam.runners.fnexecution.environment.ProcessEnvironmentFactory;
import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
import org.apache.beam.sdk.options.PortablePipelineOptions;
import org.apache.beam.vendor.guava.v20_0.com.google.common.base.MoreObjects;
import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions;
-import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableMap;
/** Implementation of a {@link FlinkExecutableStageContext}. */
class FlinkDefaultExecutableStageContext implements FlinkExecutableStageContext, AutoCloseable {
private final JobBundleFactory jobBundleFactory;
private static FlinkDefaultExecutableStageContext create(JobInfo jobInfo) {
- JobBundleFactory jobBundleFactory =
- DefaultJobBundleFactory.create(
- jobInfo,
- ImmutableMap.of(
- BeamUrns.getUrn(StandardEnvironments.Environments.DOCKER),
- new DockerEnvironmentFactory.Provider(
- PipelineOptionsTranslation.fromProto(jobInfo.pipelineOptions())),
- BeamUrns.getUrn(StandardEnvironments.Environments.PROCESS),
- new ProcessEnvironmentFactory.Provider(),
- BeamUrns.getUrn(StandardEnvironments.Environments.EXTERNAL),
- new ExternalEnvironmentFactory.Provider(),
- Environments.ENVIRONMENT_EMBEDDED, // Non Public urn for testing.
- new EmbeddedEnvironmentFactory.Provider(
- PipelineOptionsTranslation.fromProto(jobInfo.pipelineOptions()))));
+ JobBundleFactory jobBundleFactory = DefaultJobBundleFactory.create(jobInfo);
return new FlinkDefaultExecutableStageContext(jobBundleFactory);
}
diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactory.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactory.java
index 7a6e647..5e9ae9d 100644
--- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactory.java
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/DefaultJobBundleFactory.java
@@ -26,6 +26,10 @@ import java.util.concurrent.Executors;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.Target;
import org.apache.beam.model.pipeline.v1.RunnerApi.Environment;
+import org.apache.beam.model.pipeline.v1.RunnerApi.StandardEnvironments;
+import org.apache.beam.runners.core.construction.BeamUrns;
+import org.apache.beam.runners.core.construction.Environments;
+import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
import org.apache.beam.runners.core.construction.graph.ExecutableStage;
import org.apache.beam.runners.fnexecution.GrpcContextHeaderAccessorProvider;
import org.apache.beam.runners.fnexecution.GrpcFnServer;
@@ -35,7 +39,11 @@ import org.apache.beam.runners.fnexecution.artifact.BeamFileSystemArtifactRetrie
import org.apache.beam.runners.fnexecution.control.ProcessBundleDescriptors.ExecutableProcessBundleDescriptor;
import org.apache.beam.runners.fnexecution.control.SdkHarnessClient.BundleProcessor;
import org.apache.beam.runners.fnexecution.data.GrpcDataService;
+import org.apache.beam.runners.fnexecution.environment.DockerEnvironmentFactory;
+import org.apache.beam.runners.fnexecution.environment.EmbeddedEnvironmentFactory;
import org.apache.beam.runners.fnexecution.environment.EnvironmentFactory;
+import org.apache.beam.runners.fnexecution.environment.ExternalEnvironmentFactory;
+import org.apache.beam.runners.fnexecution.environment.ProcessEnvironmentFactory;
import org.apache.beam.runners.fnexecution.environment.RemoteEnvironment;
import org.apache.beam.runners.fnexecution.logging.GrpcLoggingService;
import org.apache.beam.runners.fnexecution.logging.Slf4jLogWriter;
@@ -78,6 +86,22 @@ public class DefaultJobBundleFactory implements JobBundleFactory {
private final MapControlClientPool clientPool;
private final IdGenerator stageIdGenerator;
+ public static DefaultJobBundleFactory create(JobInfo jobInfo) {
+ Map<String, EnvironmentFactory.Provider> environmentFactoryProviderMap =
+ ImmutableMap.of(
+ BeamUrns.getUrn(StandardEnvironments.Environments.DOCKER),
+ new DockerEnvironmentFactory.Provider(
+ PipelineOptionsTranslation.fromProto(jobInfo.pipelineOptions())),
+ BeamUrns.getUrn(StandardEnvironments.Environments.PROCESS),
+ new ProcessEnvironmentFactory.Provider(),
+ BeamUrns.getUrn(StandardEnvironments.Environments.EXTERNAL),
+ new ExternalEnvironmentFactory.Provider(),
+ Environments.ENVIRONMENT_EMBEDDED, // Non Public urn for testing.
+ new EmbeddedEnvironmentFactory.Provider(
+ PipelineOptionsTranslation.fromProto(jobInfo.pipelineOptions())));
+ return new DefaultJobBundleFactory(jobInfo, environmentFactoryProviderMap);
+ }
+
public static DefaultJobBundleFactory create(
JobInfo jobInfo, Map<String, EnvironmentFactory.Provider> environmentFactoryProviderMap) {
return new DefaultJobBundleFactory(jobInfo, environmentFactoryProviderMap);