You are viewing a plain-text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@beam.apache.org by ro...@apache.org on 2019/03/20 09:21:18 UTC

[beam] branch master updated: [BEAM-6864] Pass JobInfo to PortablePipelineRunner.run

This is an automated email from the ASF dual-hosted git repository.

robertwb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 7325762  [BEAM-6864] Pass JobInfo to PortablePipelineRunner.run
     new 3beb96d  Merge pull request #8087 [BEAM-6864] Pass JobInfo to PortablePipelineRunner.run
7325762 is described below

commit 7325762b5077e05329cc6038701691e6304d64fd
Author: Kyle Weaver <kc...@google.com>
AuthorDate: Mon Mar 18 16:52:00 2019 -0700

    [BEAM-6864] Pass JobInfo to PortablePipelineRunner.run
---
 .../apache/beam/runners/flink/FlinkJobInvoker.java | 11 ++++++++--
 .../beam/runners/flink/FlinkPipelineRunner.java    | 24 +++++-----------------
 .../fnexecution/jobsubmission/JobInvocation.java   | 11 +++++-----
 .../jobsubmission/PortablePipelineRunner.java      |  3 ++-
 .../beam/runners/samza/SamzaJobServerDriver.java   |  9 +++++++-
 .../beam/runners/samza/SamzaPipelineRunner.java    |  3 ++-
 6 files changed, 32 insertions(+), 29 deletions(-)

diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvoker.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvoker.java
index 38dcdc8..3dc22c2 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvoker.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvoker.java
@@ -27,6 +27,7 @@ import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
 import org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation;
 import org.apache.beam.runners.fnexecution.jobsubmission.JobInvoker;
+import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
 import org.apache.beam.sdk.options.PortablePipelineOptions;
 import org.apache.beam.vendor.grpc.v1p13p1.com.google.protobuf.Struct;
 import org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.ListeningExecutorService;
@@ -93,8 +94,14 @@ public class FlinkJobInvoker extends JobInvoker {
       FlinkPipelineOptions flinkOptions,
       @Nullable String confDir,
       List<String> filesToStage) {
+    JobInfo jobInfo =
+        JobInfo.create(
+            invocationId,
+            flinkOptions.getJobName(),
+            retrievalToken,
+            PipelineOptionsTranslation.toProto(flinkOptions));
     FlinkPipelineRunner pipelineRunner =
-        new FlinkPipelineRunner(invocationId, retrievalToken, flinkOptions, confDir, filesToStage);
-    return new JobInvocation(invocationId, executorService, pipeline, pipelineRunner);
+        new FlinkPipelineRunner(flinkOptions, confDir, filesToStage);
+    return new JobInvocation(jobInfo, executorService, pipeline, pipelineRunner);
   }
 }
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java
index c340e23..7bd2709 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java
@@ -24,7 +24,6 @@ import java.util.List;
 import javax.annotation.Nullable;
 import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.model.pipeline.v1.RunnerApi.Pipeline;
-import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
 import org.apache.beam.runners.core.construction.graph.ExecutableStage;
 import org.apache.beam.runners.core.construction.graph.GreedyPipelineFuser;
 import org.apache.beam.runners.core.construction.graph.PipelineTrimmer;
@@ -40,27 +39,19 @@ import org.slf4j.LoggerFactory;
 public class FlinkPipelineRunner implements PortablePipelineRunner {
   private static final Logger LOG = LoggerFactory.getLogger(FlinkPipelineRunner.class);
 
-  private final String id;
-  private final String retrievalToken;
   private final FlinkPipelineOptions pipelineOptions;
   private final String confDir;
   private final List<String> filesToStage;
 
   public FlinkPipelineRunner(
-      String id,
-      String retrievalToken,
-      FlinkPipelineOptions pipelineOptions,
-      @Nullable String confDir,
-      List<String> filesToStage) {
-    this.id = id;
-    this.retrievalToken = retrievalToken;
+      FlinkPipelineOptions pipelineOptions, @Nullable String confDir, List<String> filesToStage) {
     this.pipelineOptions = pipelineOptions;
     this.confDir = confDir;
     this.filesToStage = filesToStage;
   }
 
   @Override
-  public PipelineResult run(final Pipeline pipeline) throws Exception {
+  public PipelineResult run(final Pipeline pipeline, JobInfo jobInfo) throws Exception {
     MetricsEnvironment.setMetricsSupported(false);
 
     FlinkPortablePipelineTranslator<?> translator;
@@ -70,12 +61,13 @@ public class FlinkPipelineRunner implements PortablePipelineRunner {
     } else {
       translator = new FlinkStreamingPortablePipelineTranslator();
     }
-    return runPipelineWithTranslator(pipeline, translator);
+    return runPipelineWithTranslator(pipeline, jobInfo, translator);
   }
 
   private <T extends FlinkPortablePipelineTranslator.TranslationContext>
       PipelineResult runPipelineWithTranslator(
-          final Pipeline pipeline, FlinkPortablePipelineTranslator<T> translator) throws Exception {
+          final Pipeline pipeline, JobInfo jobInfo, FlinkPortablePipelineTranslator<T> translator)
+          throws Exception {
     LOG.info("Translating pipeline to Flink program.");
 
     // Don't let the fuser fuse any subcomponents of native transforms.
@@ -88,12 +80,6 @@ public class FlinkPipelineRunner implements PortablePipelineRunner {
                 .anyMatch(proto -> ExecutableStage.URN.equals(proto.getSpec().getUrn()))
             ? trimmedPipeline
             : GreedyPipelineFuser.fuse(trimmedPipeline).toPipeline();
-    JobInfo jobInfo =
-        JobInfo.create(
-            id,
-            pipelineOptions.getJobName(),
-            retrievalToken,
-            PipelineOptionsTranslation.toProto(pipelineOptions));
 
     FlinkPortablePipelineTranslator.Executor executor =
         translator.translate(
diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobInvocation.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobInvocation.java
index 292e6e8..d32aef4 100644
--- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobInvocation.java
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobInvocation.java
@@ -31,6 +31,7 @@ import org.apache.beam.model.jobmanagement.v1.JobApi.JobState;
 import org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum;
 import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.model.pipeline.v1.RunnerApi.Pipeline;
+import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.FutureCallback;
 import org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.Futures;
@@ -46,7 +47,7 @@ public class JobInvocation {
 
   private final RunnerApi.Pipeline pipeline;
   private final PortablePipelineRunner pipelineRunner;
-  private final String id;
+  private final JobInfo jobInfo;
   private final ListeningExecutorService executorService;
   private List<Consumer<Enum>> stateObservers;
   private List<Consumer<JobMessage>> messageObservers;
@@ -54,11 +55,11 @@ public class JobInvocation {
   @Nullable private ListenableFuture<PipelineResult> invocationFuture;
 
   public JobInvocation(
-      String id,
+      JobInfo jobInfo,
       ListeningExecutorService executorService,
       Pipeline pipeline,
       PortablePipelineRunner pipelineRunner) {
-    this.id = id;
+    this.jobInfo = jobInfo;
     this.executorService = executorService;
     this.pipeline = pipeline;
     this.pipelineRunner = pipelineRunner;
@@ -69,7 +70,7 @@ public class JobInvocation {
   }
 
   private PipelineResult runPipeline() throws Exception {
-    return pipelineRunner.run(pipeline);
+    return pipelineRunner.run(pipeline, jobInfo);
   }
 
   /** Start the job. */
@@ -119,7 +120,7 @@ public class JobInvocation {
 
   /** @return Unique identifier for the job invocation. */
   public String getId() {
-    return id;
+    return jobInfo.jobId();
   }
 
   /** Cancel the job. */
diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/PortablePipelineRunner.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/PortablePipelineRunner.java
index d6cf083..08adee6 100644
--- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/PortablePipelineRunner.java
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/PortablePipelineRunner.java
@@ -18,9 +18,10 @@
 package org.apache.beam.runners.fnexecution.jobsubmission;
 
 import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
 import org.apache.beam.sdk.PipelineResult;
 
 /** Runs a portable Beam pipeline on some execution engine. */
 public interface PortablePipelineRunner {
-  PipelineResult run(RunnerApi.Pipeline pipeline) throws Exception;
+  PipelineResult run(RunnerApi.Pipeline pipeline, JobInfo jobInfo) throws Exception;
 }
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaJobServerDriver.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaJobServerDriver.java
index c582d17..9effd57 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaJobServerDriver.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaJobServerDriver.java
@@ -31,6 +31,7 @@ import org.apache.beam.runners.fnexecution.artifact.BeamFileSystemArtifactStagin
 import org.apache.beam.runners.fnexecution.jobsubmission.InMemoryJobService;
 import org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation;
 import org.apache.beam.runners.fnexecution.jobsubmission.JobInvoker;
+import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
 import org.apache.beam.vendor.grpc.v1p13p1.com.google.protobuf.Struct;
 import org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.ListeningExecutorService;
 import org.kohsuke.args4j.CmdLineException;
@@ -102,7 +103,13 @@ public class SamzaJobServerDriver {
                 String.format(
                     "%s_%s", samzaPipelineOptions.getJobName(), UUID.randomUUID().toString());
             SamzaPipelineRunner pipelineRunner = new SamzaPipelineRunner(samzaPipelineOptions);
-            return new JobInvocation(invocationId, executorService, pipeline, pipelineRunner);
+            JobInfo jobInfo =
+                JobInfo.create(
+                    invocationId,
+                    samzaPipelineOptions.getJobName(),
+                    retrievalToken,
+                    PipelineOptionsTranslation.toProto(samzaPipelineOptions));
+            return new JobInvocation(jobInfo, executorService, pipeline, pipelineRunner);
           }
         };
     return InMemoryJobService.create(
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineRunner.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineRunner.java
index 1a94e1d..4759d88 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineRunner.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineRunner.java
@@ -21,6 +21,7 @@ import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.model.pipeline.v1.RunnerApi.Pipeline;
 import org.apache.beam.runners.core.construction.graph.GreedyPipelineFuser;
 import org.apache.beam.runners.fnexecution.jobsubmission.PortablePipelineRunner;
+import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
 import org.apache.beam.runners.samza.util.PortablePipelineDotRenderer;
 import org.apache.beam.sdk.PipelineResult;
 import org.slf4j.Logger;
@@ -34,7 +35,7 @@ public class SamzaPipelineRunner implements PortablePipelineRunner {
   private final SamzaPipelineOptions options;
 
   @Override
-  public PipelineResult run(final Pipeline pipeline) {
+  public PipelineResult run(final Pipeline pipeline, JobInfo jobInfo) {
     // Fused pipeline proto.
     final RunnerApi.Pipeline fusedPipeline = GreedyPipelineFuser.fuse(pipeline).toPipeline();
     LOG.info("Portable pipeline to run:");