You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2019/03/20 09:21:18 UTC
[beam] branch master updated: [BEAM-6864] Pass JobInfo to PortablePipelineRunner.run
This is an automated email from the ASF dual-hosted git repository.
robertwb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 7325762 [BEAM-6864] Pass JobInfo to PortablePipelineRunner.run
new 3beb96d Merge pull request #8087 [BEAM-6864] Pass JobInfo to PortablePipelineRunner.run
7325762 is described below
commit 7325762b5077e05329cc6038701691e6304d64fd
Author: Kyle Weaver <kc...@google.com>
AuthorDate: Mon Mar 18 16:52:00 2019 -0700
[BEAM-6864] Pass JobInfo to PortablePipelineRunner.run
---
.../apache/beam/runners/flink/FlinkJobInvoker.java | 11 ++++++++--
.../beam/runners/flink/FlinkPipelineRunner.java | 24 +++++-----------------
.../fnexecution/jobsubmission/JobInvocation.java | 11 +++++-----
.../jobsubmission/PortablePipelineRunner.java | 3 ++-
.../beam/runners/samza/SamzaJobServerDriver.java | 9 +++++++-
.../beam/runners/samza/SamzaPipelineRunner.java | 3 ++-
6 files changed, 32 insertions(+), 29 deletions(-)
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvoker.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvoker.java
index 38dcdc8..3dc22c2 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvoker.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvoker.java
@@ -27,6 +27,7 @@ import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
import org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation;
import org.apache.beam.runners.fnexecution.jobsubmission.JobInvoker;
+import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
import org.apache.beam.sdk.options.PortablePipelineOptions;
import org.apache.beam.vendor.grpc.v1p13p1.com.google.protobuf.Struct;
import org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.ListeningExecutorService;
@@ -93,8 +94,14 @@ public class FlinkJobInvoker extends JobInvoker {
FlinkPipelineOptions flinkOptions,
@Nullable String confDir,
List<String> filesToStage) {
+ JobInfo jobInfo =
+ JobInfo.create(
+ invocationId,
+ flinkOptions.getJobName(),
+ retrievalToken,
+ PipelineOptionsTranslation.toProto(flinkOptions));
FlinkPipelineRunner pipelineRunner =
- new FlinkPipelineRunner(invocationId, retrievalToken, flinkOptions, confDir, filesToStage);
- return new JobInvocation(invocationId, executorService, pipeline, pipelineRunner);
+ new FlinkPipelineRunner(flinkOptions, confDir, filesToStage);
+ return new JobInvocation(jobInfo, executorService, pipeline, pipelineRunner);
}
}
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java
index c340e23..7bd2709 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java
@@ -24,7 +24,6 @@ import java.util.List;
import javax.annotation.Nullable;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.model.pipeline.v1.RunnerApi.Pipeline;
-import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
import org.apache.beam.runners.core.construction.graph.ExecutableStage;
import org.apache.beam.runners.core.construction.graph.GreedyPipelineFuser;
import org.apache.beam.runners.core.construction.graph.PipelineTrimmer;
@@ -40,27 +39,19 @@ import org.slf4j.LoggerFactory;
public class FlinkPipelineRunner implements PortablePipelineRunner {
private static final Logger LOG = LoggerFactory.getLogger(FlinkPipelineRunner.class);
- private final String id;
- private final String retrievalToken;
private final FlinkPipelineOptions pipelineOptions;
private final String confDir;
private final List<String> filesToStage;
public FlinkPipelineRunner(
- String id,
- String retrievalToken,
- FlinkPipelineOptions pipelineOptions,
- @Nullable String confDir,
- List<String> filesToStage) {
- this.id = id;
- this.retrievalToken = retrievalToken;
+ FlinkPipelineOptions pipelineOptions, @Nullable String confDir, List<String> filesToStage) {
this.pipelineOptions = pipelineOptions;
this.confDir = confDir;
this.filesToStage = filesToStage;
}
@Override
- public PipelineResult run(final Pipeline pipeline) throws Exception {
+ public PipelineResult run(final Pipeline pipeline, JobInfo jobInfo) throws Exception {
MetricsEnvironment.setMetricsSupported(false);
FlinkPortablePipelineTranslator<?> translator;
@@ -70,12 +61,13 @@ public class FlinkPipelineRunner implements PortablePipelineRunner {
} else {
translator = new FlinkStreamingPortablePipelineTranslator();
}
- return runPipelineWithTranslator(pipeline, translator);
+ return runPipelineWithTranslator(pipeline, jobInfo, translator);
}
private <T extends FlinkPortablePipelineTranslator.TranslationContext>
PipelineResult runPipelineWithTranslator(
- final Pipeline pipeline, FlinkPortablePipelineTranslator<T> translator) throws Exception {
+ final Pipeline pipeline, JobInfo jobInfo, FlinkPortablePipelineTranslator<T> translator)
+ throws Exception {
LOG.info("Translating pipeline to Flink program.");
// Don't let the fuser fuse any subcomponents of native transforms.
@@ -88,12 +80,6 @@ public class FlinkPipelineRunner implements PortablePipelineRunner {
.anyMatch(proto -> ExecutableStage.URN.equals(proto.getSpec().getUrn()))
? trimmedPipeline
: GreedyPipelineFuser.fuse(trimmedPipeline).toPipeline();
- JobInfo jobInfo =
- JobInfo.create(
- id,
- pipelineOptions.getJobName(),
- retrievalToken,
- PipelineOptionsTranslation.toProto(pipelineOptions));
FlinkPortablePipelineTranslator.Executor executor =
translator.translate(
diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobInvocation.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobInvocation.java
index 292e6e8..d32aef4 100644
--- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobInvocation.java
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobInvocation.java
@@ -31,6 +31,7 @@ import org.apache.beam.model.jobmanagement.v1.JobApi.JobState;
import org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.model.pipeline.v1.RunnerApi.Pipeline;
+import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.FutureCallback;
import org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.Futures;
@@ -46,7 +47,7 @@ public class JobInvocation {
private final RunnerApi.Pipeline pipeline;
private final PortablePipelineRunner pipelineRunner;
- private final String id;
+ private final JobInfo jobInfo;
private final ListeningExecutorService executorService;
private List<Consumer<Enum>> stateObservers;
private List<Consumer<JobMessage>> messageObservers;
@@ -54,11 +55,11 @@ public class JobInvocation {
@Nullable private ListenableFuture<PipelineResult> invocationFuture;
public JobInvocation(
- String id,
+ JobInfo jobInfo,
ListeningExecutorService executorService,
Pipeline pipeline,
PortablePipelineRunner pipelineRunner) {
- this.id = id;
+ this.jobInfo = jobInfo;
this.executorService = executorService;
this.pipeline = pipeline;
this.pipelineRunner = pipelineRunner;
@@ -69,7 +70,7 @@ public class JobInvocation {
}
private PipelineResult runPipeline() throws Exception {
- return pipelineRunner.run(pipeline);
+ return pipelineRunner.run(pipeline, jobInfo);
}
/** Start the job. */
@@ -119,7 +120,7 @@ public class JobInvocation {
/** @return Unique identifier for the job invocation. */
public String getId() {
- return id;
+ return jobInfo.jobId();
}
/** Cancel the job. */
diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/PortablePipelineRunner.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/PortablePipelineRunner.java
index d6cf083..08adee6 100644
--- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/PortablePipelineRunner.java
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/PortablePipelineRunner.java
@@ -18,9 +18,10 @@
package org.apache.beam.runners.fnexecution.jobsubmission;
import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
import org.apache.beam.sdk.PipelineResult;
/** Runs a portable Beam pipeline on some execution engine. */
public interface PortablePipelineRunner {
- PipelineResult run(RunnerApi.Pipeline pipeline) throws Exception;
+ PipelineResult run(RunnerApi.Pipeline pipeline, JobInfo jobInfo) throws Exception;
}
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaJobServerDriver.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaJobServerDriver.java
index c582d17..9effd57 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaJobServerDriver.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaJobServerDriver.java
@@ -31,6 +31,7 @@ import org.apache.beam.runners.fnexecution.artifact.BeamFileSystemArtifactStagin
import org.apache.beam.runners.fnexecution.jobsubmission.InMemoryJobService;
import org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation;
import org.apache.beam.runners.fnexecution.jobsubmission.JobInvoker;
+import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
import org.apache.beam.vendor.grpc.v1p13p1.com.google.protobuf.Struct;
import org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.ListeningExecutorService;
import org.kohsuke.args4j.CmdLineException;
@@ -102,7 +103,13 @@ public class SamzaJobServerDriver {
String.format(
"%s_%s", samzaPipelineOptions.getJobName(), UUID.randomUUID().toString());
SamzaPipelineRunner pipelineRunner = new SamzaPipelineRunner(samzaPipelineOptions);
- return new JobInvocation(invocationId, executorService, pipeline, pipelineRunner);
+ JobInfo jobInfo =
+ JobInfo.create(
+ invocationId,
+ samzaPipelineOptions.getJobName(),
+ retrievalToken,
+ PipelineOptionsTranslation.toProto(samzaPipelineOptions));
+ return new JobInvocation(jobInfo, executorService, pipeline, pipelineRunner);
}
};
return InMemoryJobService.create(
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineRunner.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineRunner.java
index 1a94e1d..4759d88 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineRunner.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineRunner.java
@@ -21,6 +21,7 @@ import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.model.pipeline.v1.RunnerApi.Pipeline;
import org.apache.beam.runners.core.construction.graph.GreedyPipelineFuser;
import org.apache.beam.runners.fnexecution.jobsubmission.PortablePipelineRunner;
+import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
import org.apache.beam.runners.samza.util.PortablePipelineDotRenderer;
import org.apache.beam.sdk.PipelineResult;
import org.slf4j.Logger;
@@ -34,7 +35,7 @@ public class SamzaPipelineRunner implements PortablePipelineRunner {
private final SamzaPipelineOptions options;
@Override
- public PipelineResult run(final Pipeline pipeline) {
+ public PipelineResult run(final Pipeline pipeline, JobInfo jobInfo) {
// Fused pipeline proto.
final RunnerApi.Pipeline fusedPipeline = GreedyPipelineFuser.fuse(pipeline).toPipeline();
LOG.info("Portable pipeline to run:");