You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink itself was not preserved in this plain-text copy).
Posted to commits@beam.apache.org by go...@apache.org on 2019/03/01 21:10:49 UTC
[beam] branch master updated: [BEAM-6725] share some Flink job invocation code with Samza runner
This is an automated email from the ASF dual-hosted git repository.
goenka pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 194d85f [BEAM-6725] share some Flink job invocation code with Samza runner
new ee8f260 Merge pull request #7941 from ibzib/portable-job-invocation
194d85f is described below
commit 194d85f950f5dbe2c114e7cd20e6dbb54adfd5a5
Author: Kyle Weaver <kc...@google.com>
AuthorDate: Fri Feb 22 16:25:19 2019 -0800
[BEAM-6725] share some Flink job invocation code with Samza runner
---
.../apache/beam/runners/flink/FlinkJobInvoker.java | 45 +++---
.../beam/runners/flink/FlinkPipelineOptions.java | 2 +-
...JobInvocation.java => FlinkPipelineRunner.java} | 163 ++-------------------
.../beam/runners/flink/FlinkSavepointTest.java | 5 +-
.../beam/runners/flink/PortableExecutionTest.java | 5 +-
.../runners/flink/PortableStateExecutionTest.java | 5 +-
.../runners/flink/PortableTimersExecutionTest.java | 5 +-
.../fnexecution/jobsubmission/JobInvocation.java | 148 ++++++++++++++++++-
.../fnexecution/jobsubmission/JobInvoker.java | 33 ++++-
...JobInvoker.java => PortablePipelineRunner.java} | 12 +-
.../beam/runners/samza/SamzaJobInvocation.java | 128 ----------------
.../beam/runners/samza/SamzaJobServerDriver.java | 14 +-
.../beam/runners/samza/SamzaPipelineOptions.java | 2 +-
.../beam/runners/samza/SamzaPipelineRunner.java | 57 +++++++
14 files changed, 290 insertions(+), 334 deletions(-)
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvoker.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvoker.java
index 9dde9e3..38dcdc8 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvoker.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvoker.java
@@ -20,9 +20,8 @@ package org.apache.beam.runners.flink;
import static org.apache.beam.runners.core.construction.PipelineResources.detectClassPathResourcesToStage;
import java.io.IOException;
+import java.util.List;
import java.util.UUID;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ThreadFactory;
import javax.annotation.Nullable;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
@@ -31,39 +30,30 @@ import org.apache.beam.runners.fnexecution.jobsubmission.JobInvoker;
import org.apache.beam.sdk.options.PortablePipelineOptions;
import org.apache.beam.vendor.grpc.v1p13p1.com.google.protobuf.Struct;
import org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.ListeningExecutorService;
-import org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.MoreExecutors;
-import org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** Job Invoker for the {@link FlinkRunner}. */
-public class FlinkJobInvoker implements JobInvoker {
+public class FlinkJobInvoker extends JobInvoker {
private static final Logger LOG = LoggerFactory.getLogger(FlinkJobInvoker.class);
public static FlinkJobInvoker create(FlinkJobServerDriver.FlinkServerConfiguration serverConfig) {
- ThreadFactory threadFactory =
- new ThreadFactoryBuilder()
- .setNameFormat("flink-runner-job-invoker")
- .setDaemon(true)
- .build();
- ListeningExecutorService executorService =
- MoreExecutors.listeningDecorator(Executors.newCachedThreadPool(threadFactory));
- return new FlinkJobInvoker(executorService, serverConfig);
+ return new FlinkJobInvoker(serverConfig);
}
- private final ListeningExecutorService executorService;
private final FlinkJobServerDriver.FlinkServerConfiguration serverConfig;
- private FlinkJobInvoker(
- ListeningExecutorService executorService,
- FlinkJobServerDriver.FlinkServerConfiguration serverConfig) {
- this.executorService = executorService;
+ private FlinkJobInvoker(FlinkJobServerDriver.FlinkServerConfiguration serverConfig) {
+ super("flink-runner-job-invoker");
this.serverConfig = serverConfig;
}
@Override
- public JobInvocation invoke(
- RunnerApi.Pipeline pipeline, Struct options, @Nullable String retrievalToken)
+ protected JobInvocation invokeWithExecutor(
+ RunnerApi.Pipeline pipeline,
+ Struct options,
+ @Nullable String retrievalToken,
+ ListeningExecutorService executorService)
throws IOException {
// TODO: How to make Java/Python agree on names of keys and their values?
LOG.trace("Parsing pipeline options");
@@ -85,7 +75,7 @@ public class FlinkJobInvoker implements JobInvoker {
flinkOptions.setRunner(null);
- return FlinkJobInvocation.create(
+ return createJobInvocation(
invocationId,
retrievalToken,
executorService,
@@ -94,4 +84,17 @@ public class FlinkJobInvoker implements JobInvoker {
serverConfig.getFlinkConfDir(),
detectClassPathResourcesToStage(FlinkJobInvoker.class.getClassLoader()));
}
+
+ static JobInvocation createJobInvocation(
+ String invocationId,
+ String retrievalToken,
+ ListeningExecutorService executorService,
+ RunnerApi.Pipeline pipeline,
+ FlinkPipelineOptions flinkOptions,
+ @Nullable String confDir,
+ List<String> filesToStage) {
+ FlinkPipelineRunner pipelineRunner =
+ new FlinkPipelineRunner(invocationId, retrievalToken, flinkOptions, confDir, filesToStage);
+ return new JobInvocation(invocationId, executorService, pipeline, pipelineRunner);
+ }
}
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
index cc94793..e1103bc 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
@@ -28,7 +28,7 @@ import org.apache.flink.api.common.ExecutionMode;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
-/** Options which can be used to configure a Flink PipelineRunner. */
+/** Options which can be used to configure a Flink PortablePipelineRunner. */
public interface FlinkPipelineOptions
extends PipelineOptions, ApplicationNameOptions, StreamingOptions {
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvocation.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java
similarity index 50%
rename from runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvocation.java
rename to runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java
index 421c38e..9ccd973 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvocation.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java
@@ -17,91 +17,53 @@
*/
package org.apache.beam.runners.flink;
-import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkArgument;
import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkNotNull;
-import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Throwables.getRootCause;
-import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Throwables.getStackTraceAsString;
-import java.io.IOException;
-import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Set;
-import java.util.function.Consumer;
import javax.annotation.Nullable;
-import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessage;
-import org.apache.beam.model.jobmanagement.v1.JobApi.JobState;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.model.pipeline.v1.RunnerApi.Pipeline;
import org.apache.beam.runners.core.construction.PTransformTranslation;
import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
import org.apache.beam.runners.core.construction.graph.ExecutableStage;
import org.apache.beam.runners.core.construction.graph.GreedyPipelineFuser;
-import org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation;
+import org.apache.beam.runners.fnexecution.jobsubmission.PortablePipelineRunner;
import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.metrics.MetricsEnvironment;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableSet;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Sets;
-import org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.FutureCallback;
-import org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.Futures;
-import org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.ListenableFuture;
-import org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.ListeningExecutorService;
import org.apache.flink.api.common.JobExecutionResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-/** Invocation of a Flink Job via {@link FlinkRunner}. */
-public class FlinkJobInvocation implements JobInvocation {
- private static final Logger LOG = LoggerFactory.getLogger(FlinkJobInvocation.class);
-
- public static FlinkJobInvocation create(
- String id,
- String retrievalToken,
- ListeningExecutorService executorService,
- Pipeline pipeline,
- FlinkPipelineOptions pipelineOptions,
- @Nullable String confDir,
- List<String> filesToStage) {
- return new FlinkJobInvocation(
- id, retrievalToken, executorService, pipeline, pipelineOptions, confDir, filesToStage);
- }
+/** Runs a Pipeline on Flink via {@link FlinkRunner}. */
+public class FlinkPipelineRunner implements PortablePipelineRunner {
+ private static final Logger LOG = LoggerFactory.getLogger(FlinkPipelineRunner.class);
private final String id;
private final String retrievalToken;
- private final ListeningExecutorService executorService;
- private final RunnerApi.Pipeline pipeline;
private final FlinkPipelineOptions pipelineOptions;
private final String confDir;
private final List<String> filesToStage;
- private JobState.Enum jobState;
- private List<Consumer<JobState.Enum>> stateObservers;
- private List<Consumer<JobMessage>> messageObservers;
-
- @Nullable private ListenableFuture<PipelineResult> invocationFuture;
- private FlinkJobInvocation(
+ public FlinkPipelineRunner(
String id,
String retrievalToken,
- ListeningExecutorService executorService,
- Pipeline pipeline,
FlinkPipelineOptions pipelineOptions,
@Nullable String confDir,
List<String> filesToStage) {
this.id = id;
this.retrievalToken = retrievalToken;
- this.executorService = executorService;
- this.pipeline = pipeline;
this.pipelineOptions = pipelineOptions;
this.confDir = confDir;
this.filesToStage = filesToStage;
- this.invocationFuture = null;
- this.jobState = JobState.Enum.STOPPED;
- this.stateObservers = new ArrayList<>();
- this.messageObservers = new ArrayList<>();
}
- private PipelineResult runPipeline() throws Exception {
+ @Override
+ public PipelineResult run(final Pipeline pipeline) throws Exception {
MetricsEnvironment.setMetricsSupported(false);
FlinkPortablePipelineTranslator<?> translator;
@@ -111,12 +73,12 @@ public class FlinkJobInvocation implements JobInvocation {
} else {
translator = new FlinkStreamingPortablePipelineTranslator();
}
- return runPipelineWithTranslator(translator);
+ return runPipelineWithTranslator(pipeline, translator);
}
private <T extends FlinkPortablePipelineTranslator.TranslationContext>
- PipelineResult runPipelineWithTranslator(FlinkPortablePipelineTranslator<T> translator)
- throws Exception {
+ PipelineResult runPipelineWithTranslator(
+ final Pipeline pipeline, FlinkPortablePipelineTranslator<T> translator) throws Exception {
LOG.info("Translating pipeline to Flink program.");
// Don't let the fuser fuse any subcomponents of native transforms.
@@ -178,111 +140,6 @@ public class FlinkJobInvocation implements JobInvocation {
}
}
- @Override
- public synchronized void start() {
- LOG.info("Starting job invocation {}", getId());
- if (getState() != JobState.Enum.STOPPED) {
- throw new IllegalStateException(String.format("Job %s already running.", getId()));
- }
- setState(JobState.Enum.STARTING);
- invocationFuture = executorService.submit(this::runPipeline);
- // TODO: Defer transitioning until the pipeline is up and running.
- setState(JobState.Enum.RUNNING);
- Futures.addCallback(
- invocationFuture,
- new FutureCallback<PipelineResult>() {
- @Override
- public void onSuccess(@Nullable PipelineResult pipelineResult) {
- if (pipelineResult != null) {
- checkArgument(
- pipelineResult.getState() == PipelineResult.State.DONE,
- "Success on non-Done state: " + pipelineResult.getState());
- setState(JobState.Enum.DONE);
- } else {
- setState(JobState.Enum.UNSPECIFIED);
- }
- }
-
- @Override
- public void onFailure(Throwable throwable) {
- String message = String.format("Error during job invocation %s.", getId());
- LOG.error(message, throwable);
- sendMessage(
- JobMessage.newBuilder()
- .setMessageText(getStackTraceAsString(throwable))
- .setImportance(JobMessage.MessageImportance.JOB_MESSAGE_DEBUG)
- .build());
- sendMessage(
- JobMessage.newBuilder()
- .setMessageText(getRootCause(throwable).toString())
- .setImportance(JobMessage.MessageImportance.JOB_MESSAGE_ERROR)
- .build());
- setState(JobState.Enum.FAILED);
- }
- },
- executorService);
- }
-
- @Override
- public String getId() {
- return id;
- }
-
- @Override
- public synchronized void cancel() {
- LOG.info("Canceling job invocation {}", getId());
- if (this.invocationFuture != null) {
- this.invocationFuture.cancel(true /* mayInterruptIfRunning */);
- Futures.addCallback(
- invocationFuture,
- new FutureCallback<PipelineResult>() {
- @Override
- public void onSuccess(@Nullable PipelineResult pipelineResult) {
- if (pipelineResult != null) {
- try {
- pipelineResult.cancel();
- } catch (IOException exn) {
- throw new RuntimeException(exn);
- }
- }
- }
-
- @Override
- public void onFailure(Throwable throwable) {}
- },
- executorService);
- }
- }
-
- @Override
- public JobState.Enum getState() {
- return this.jobState;
- }
-
- @Override
- public synchronized void addStateListener(Consumer<JobState.Enum> stateStreamObserver) {
- stateStreamObserver.accept(getState());
- stateObservers.add(stateStreamObserver);
- }
-
- @Override
- public synchronized void addMessageListener(Consumer<JobMessage> messageStreamObserver) {
- messageObservers.add(messageStreamObserver);
- }
-
- private synchronized void setState(JobState.Enum state) {
- this.jobState = state;
- for (Consumer<JobState.Enum> observer : stateObservers) {
- observer.accept(state);
- }
- }
-
- private synchronized void sendMessage(JobMessage message) {
- for (Consumer<JobMessage> observer : messageObservers) {
- observer.accept(message);
- }
- }
-
/** Indicates whether the given pipeline has any unbounded PCollections. */
private static boolean hasUnboundedPCollections(RunnerApi.Pipeline pipeline) {
checkNotNull(pipeline);
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkSavepointTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkSavepointTest.java
index 756b706..41e9299 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkSavepointTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkSavepointTest.java
@@ -27,6 +27,7 @@ import java.util.concurrent.Executors;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.construction.Environments;
import org.apache.beam.runners.core.construction.PipelineTranslation;
+import org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
@@ -194,8 +195,8 @@ public class FlinkSavepointTest implements Serializable {
ListeningExecutorService executorService =
MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(1));
try {
- FlinkJobInvocation jobInvocation =
- FlinkJobInvocation.create(
+ JobInvocation jobInvocation =
+ FlinkJobInvoker.createJobInvocation(
"id",
"none",
executorService,
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableExecutionTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableExecutionTest.java
index c817cb9..6214ff4 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableExecutionTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableExecutionTest.java
@@ -26,6 +26,7 @@ import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.construction.Environments;
import org.apache.beam.runners.core.construction.JavaReadViaImpulse;
import org.apache.beam.runners.core.construction.PipelineTranslation;
+import org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.BigEndianLongCoder;
import org.apache.beam.sdk.coders.KvCoder;
@@ -140,8 +141,8 @@ public class PortableExecutionTest implements Serializable {
RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(p);
// execute the pipeline
- FlinkJobInvocation jobInvocation =
- FlinkJobInvocation.create(
+ JobInvocation jobInvocation =
+ FlinkJobInvoker.createJobInvocation(
"fakeId",
"fakeRetrievalToken",
flinkJobExecutor,
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableStateExecutionTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableStateExecutionTest.java
index e5dd6b9..c96ebb8 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableStateExecutionTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableStateExecutionTest.java
@@ -26,6 +26,7 @@ import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.construction.Environments;
import org.apache.beam.runners.core.construction.JavaReadViaImpulse;
import org.apache.beam.runners.core.construction.PipelineTranslation;
+import org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.options.PipelineOptions;
@@ -194,8 +195,8 @@ public class PortableStateExecutionTest implements Serializable {
RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(p);
- FlinkJobInvocation jobInvocation =
- FlinkJobInvocation.create(
+ JobInvocation jobInvocation =
+ FlinkJobInvoker.createJobInvocation(
"id",
"none",
flinkJobExecutor,
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableTimersExecutionTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableTimersExecutionTest.java
index 544a4e5..bffc903 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableTimersExecutionTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableTimersExecutionTest.java
@@ -31,6 +31,7 @@ import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.construction.Environments;
import org.apache.beam.runners.core.construction.JavaReadViaImpulse;
import org.apache.beam.runners.core.construction.PipelineTranslation;
+import org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.options.PipelineOptions;
@@ -181,8 +182,8 @@ public class PortableTimersExecutionTest implements Serializable {
RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(pipeline);
- FlinkJobInvocation jobInvocation =
- FlinkJobInvocation.create(
+ JobInvocation jobInvocation =
+ FlinkJobInvoker.createJobInvocation(
"id",
"none",
flinkJobExecutor,
diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobInvocation.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobInvocation.java
index 831edc9..292e6e8 100644
--- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobInvocation.java
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobInvocation.java
@@ -17,31 +17,165 @@
*/
package org.apache.beam.runners.fnexecution.jobsubmission;
+import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkArgument;
+import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Throwables.getRootCause;
+import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Throwables.getStackTraceAsString;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
import java.util.function.Consumer;
+import javax.annotation.Nullable;
import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessage;
import org.apache.beam.model.jobmanagement.v1.JobApi.JobState;
import org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.model.pipeline.v1.RunnerApi.Pipeline;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.FutureCallback;
+import org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.Futures;
+import org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.ListenableFuture;
+import org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.ListeningExecutorService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/** Internal representation of a Job which has been invoked (prepared and run) by a client. */
-public interface JobInvocation {
+public class JobInvocation {
+
+ private static final Logger LOG = LoggerFactory.getLogger(JobInvocation.class);
+
+ private final RunnerApi.Pipeline pipeline;
+ private final PortablePipelineRunner pipelineRunner;
+ private final String id;
+ private final ListeningExecutorService executorService;
+ private List<Consumer<Enum>> stateObservers;
+ private List<Consumer<JobMessage>> messageObservers;
+ private JobState.Enum jobState;
+ @Nullable private ListenableFuture<PipelineResult> invocationFuture;
+
+ public JobInvocation(
+ String id,
+ ListeningExecutorService executorService,
+ Pipeline pipeline,
+ PortablePipelineRunner pipelineRunner) {
+ this.id = id;
+ this.executorService = executorService;
+ this.pipeline = pipeline;
+ this.pipelineRunner = pipelineRunner;
+ this.stateObservers = new ArrayList<>();
+ this.messageObservers = new ArrayList<>();
+ this.invocationFuture = null;
+ this.jobState = JobState.Enum.STOPPED;
+ }
+
+ private PipelineResult runPipeline() throws Exception {
+ return pipelineRunner.run(pipeline);
+ }
/** Start the job. */
- void start();
+ public synchronized void start() {
+ LOG.info("Starting job invocation {}", getId());
+ if (getState() != JobState.Enum.STOPPED) {
+ throw new IllegalStateException(String.format("Job %s already running.", getId()));
+ }
+ setState(JobState.Enum.STARTING);
+ invocationFuture = executorService.submit(this::runPipeline);
+ // TODO: Defer transitioning until the pipeline is up and running.
+ setState(JobState.Enum.RUNNING);
+ Futures.addCallback(
+ invocationFuture,
+ new FutureCallback<PipelineResult>() {
+ @Override
+ public void onSuccess(@Nullable PipelineResult pipelineResult) {
+ if (pipelineResult != null) {
+ checkArgument(
+ pipelineResult.getState() == PipelineResult.State.DONE,
+ "Success on non-Done state: " + pipelineResult.getState());
+ setState(JobState.Enum.DONE);
+ } else {
+ setState(JobState.Enum.UNSPECIFIED);
+ }
+ }
+
+ @Override
+ public void onFailure(Throwable throwable) {
+ String message = String.format("Error during job invocation %s.", getId());
+ LOG.error(message, throwable);
+ sendMessage(
+ JobMessage.newBuilder()
+ .setMessageText(getStackTraceAsString(throwable))
+ .setImportance(JobMessage.MessageImportance.JOB_MESSAGE_DEBUG)
+ .build());
+ sendMessage(
+ JobMessage.newBuilder()
+ .setMessageText(getRootCause(throwable).toString())
+ .setImportance(JobMessage.MessageImportance.JOB_MESSAGE_ERROR)
+ .build());
+ setState(JobState.Enum.FAILED);
+ }
+ },
+ executorService);
+ }
/** @return Unique identifier for the job invocation. */
- String getId();
+ public String getId() {
+ return id;
+ }
/** Cancel the job. */
- void cancel();
+ public synchronized void cancel() {
+ LOG.info("Canceling job invocation {}", getId());
+ if (this.invocationFuture != null) {
+ this.invocationFuture.cancel(true /* mayInterruptIfRunning */);
+ Futures.addCallback(
+ invocationFuture,
+ new FutureCallback<PipelineResult>() {
+ @Override
+ public void onSuccess(@Nullable PipelineResult pipelineResult) {
+ if (pipelineResult != null) {
+ try {
+ pipelineResult.cancel();
+ } catch (IOException exn) {
+ throw new RuntimeException(exn);
+ }
+ }
+ }
+
+ @Override
+ public void onFailure(Throwable throwable) {}
+ },
+ executorService);
+ }
+ }
/** Retrieve the job's current state. */
- JobState.Enum getState();
+ public JobState.Enum getState() {
+ return this.jobState;
+ }
/** Listen for job state changes with a {@link Consumer}. */
- void addStateListener(Consumer<Enum> stateStreamObserver);
+ public synchronized void addStateListener(Consumer<JobState.Enum> stateStreamObserver) {
+ stateStreamObserver.accept(getState());
+ stateObservers.add(stateStreamObserver);
+ }
/** Listen for job messages with a {@link Consumer}. */
- void addMessageListener(Consumer<JobMessage> messageStreamObserver);
+ public synchronized void addMessageListener(Consumer<JobMessage> messageStreamObserver) {
+ messageObservers.add(messageStreamObserver);
+ }
+
+ private synchronized void setState(JobState.Enum state) {
+ this.jobState = state;
+ for (Consumer<JobState.Enum> observer : stateObservers) {
+ observer.accept(state);
+ }
+ }
+
+ private synchronized void sendMessage(JobMessage message) {
+ for (Consumer<JobMessage> observer : messageObservers) {
+ observer.accept(message);
+ }
+ }
static Boolean isTerminated(Enum state) {
switch (state) {
diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobInvoker.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobInvoker.java
index cfd18a1..1ed74ad 100644
--- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobInvoker.java
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobInvoker.java
@@ -18,13 +18,40 @@
package org.apache.beam.runners.fnexecution.jobsubmission;
import java.io.IOException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
import javax.annotation.Nullable;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.vendor.grpc.v1p13p1.com.google.protobuf.Struct;
+import org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.ListeningExecutorService;
+import org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.MoreExecutors;
+import org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+/** Factory to create {@link JobInvocation} instances. */
+public abstract class JobInvoker {
+
+ private final ListeningExecutorService executorService;
-/** Factory to create a {@link JobInvocation} instances. */
-public interface JobInvoker {
/** Start running a job, abstracting its state as a {@link JobInvocation} instance. */
- JobInvocation invoke(RunnerApi.Pipeline pipeline, Struct options, @Nullable String retrievalToken)
+ protected abstract JobInvocation invokeWithExecutor(
+ RunnerApi.Pipeline pipeline,
+ Struct options,
+ @Nullable String retrievalToken,
+ ListeningExecutorService executorService)
throws IOException;
+
+ JobInvocation invoke(RunnerApi.Pipeline pipeline, Struct options, @Nullable String retrievalToken)
+ throws IOException {
+ return invokeWithExecutor(pipeline, options, retrievalToken, this.executorService);
+ }
+
+ private ListeningExecutorService createExecutorService(String name) {
+ ThreadFactory threadFactory =
+ new ThreadFactoryBuilder().setNameFormat(name).setDaemon(true).build();
+ return MoreExecutors.listeningDecorator(Executors.newCachedThreadPool(threadFactory));
+ }
+
+ protected JobInvoker(String name) {
+ this.executorService = createExecutorService(name);
+ }
}
diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobInvoker.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/PortablePipelineRunner.java
similarity index 67%
copy from runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobInvoker.java
copy to runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/PortablePipelineRunner.java
index cfd18a1..d6cf083 100644
--- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobInvoker.java
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/PortablePipelineRunner.java
@@ -17,14 +17,10 @@
*/
package org.apache.beam.runners.fnexecution.jobsubmission;
-import java.io.IOException;
-import javax.annotation.Nullable;
import org.apache.beam.model.pipeline.v1.RunnerApi;
-import org.apache.beam.vendor.grpc.v1p13p1.com.google.protobuf.Struct;
+import org.apache.beam.sdk.PipelineResult;
-/** Factory to create a {@link JobInvocation} instances. */
-public interface JobInvoker {
- /** Start running a job, abstracting its state as a {@link JobInvocation} instance. */
- JobInvocation invoke(RunnerApi.Pipeline pipeline, Struct options, @Nullable String retrievalToken)
- throws IOException;
+/** Runs a portable Beam pipeline on some execution engine. */
+public interface PortablePipelineRunner {
+ PipelineResult run(RunnerApi.Pipeline pipeline) throws Exception;
}
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaJobInvocation.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaJobInvocation.java
deleted file mode 100644
index d70e10a..0000000
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaJobInvocation.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.samza;
-
-import static org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum.CANCELLED;
-import static org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum.DONE;
-import static org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum.FAILED;
-import static org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum.RUNNING;
-import static org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum.STARTING;
-import static org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum.STOPPED;
-import static org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum.UNRECOGNIZED;
-import static org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum.UPDATED;
-
-import java.util.function.Consumer;
-import org.apache.beam.model.jobmanagement.v1.JobApi;
-import org.apache.beam.model.pipeline.v1.RunnerApi;
-import org.apache.beam.runners.core.construction.graph.GreedyPipelineFuser;
-import org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation;
-import org.apache.beam.runners.samza.util.PortablePipelineDotRenderer;
-import org.apache.beam.sdk.fn.IdGenerator;
-import org.apache.beam.sdk.fn.IdGenerators;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/** Invocation of a Samza job via {@link SamzaRunner}. */
-public class SamzaJobInvocation implements JobInvocation {
- private static final Logger LOG = LoggerFactory.getLogger(SamzaJobInvocation.class);
- private static final IdGenerator idGenerator = IdGenerators.incrementingLongs();
-
- private final SamzaPipelineOptions options;
- private final RunnerApi.Pipeline originalPipeline;
- private volatile SamzaPipelineResult pipelineResult;
- private final String id;
-
- public SamzaJobInvocation(RunnerApi.Pipeline pipeline, SamzaPipelineOptions options, String id) {
- this.originalPipeline = pipeline;
- this.options = options;
- this.id = id;
- }
-
- private SamzaPipelineResult invokeSamzaJob() {
- // Fused pipeline proto.
- final RunnerApi.Pipeline fusedPipeline =
- GreedyPipelineFuser.fuse(originalPipeline).toPipeline();
- LOG.info("Portable pipeline to run:");
- LOG.info(PortablePipelineDotRenderer.toDotString(fusedPipeline));
- // the pipeline option coming from sdk will set the sdk specific runner which will break
- // serialization
- // so we need to reset the runner here to a valid Java runner
- options.setRunner(SamzaRunner.class);
- try {
- final SamzaRunner runner = new SamzaRunner(options);
- return runner.runPortablePipeline(fusedPipeline);
- } catch (Exception e) {
- throw new RuntimeException("Failed to invoke samza job", e);
- }
- }
-
- @Override
- public void start() {
- LOG.info("Starting job invocation {}", getId());
- pipelineResult = invokeSamzaJob();
- }
-
- @Override
- public String getId() {
- return id;
- }
-
- @Override
- public void cancel() {
- try {
- if (pipelineResult != null) {
- pipelineResult.cancel();
- }
- } catch (Exception e) {
- throw new RuntimeException("Failed to cancel job.", e);
- }
- }
-
- @Override
- public JobApi.JobState.Enum getState() {
- if (pipelineResult == null) {
- return STARTING;
- }
- switch (pipelineResult.getState()) {
- case RUNNING:
- return RUNNING;
- case FAILED:
- return FAILED;
- case DONE:
- return DONE;
- case STOPPED:
- return STOPPED;
- case UPDATED:
- return UPDATED;
- case CANCELLED:
- return CANCELLED;
- default:
- return UNRECOGNIZED;
- }
- }
-
- @Override
- public void addStateListener(Consumer<JobApi.JobState.Enum> stateStreamObserver) {
- LOG.info("state listener not yet implemented. Directly use getState() instead");
- }
-
- @Override
- public synchronized void addMessageListener(Consumer<JobApi.JobMessage> messageStreamObserver) {
- LOG.info("message listener not yet implemented.");
- }
-}
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaJobServerDriver.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaJobServerDriver.java
index 834f066..c582d17 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaJobServerDriver.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaJobServerDriver.java
@@ -32,6 +32,7 @@ import org.apache.beam.runners.fnexecution.jobsubmission.InMemoryJobService;
import org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation;
import org.apache.beam.runners.fnexecution.jobsubmission.JobInvoker;
import org.apache.beam.vendor.grpc.v1p13p1.com.google.protobuf.Struct;
+import org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.ListeningExecutorService;
import org.kohsuke.args4j.CmdLineException;
import org.kohsuke.args4j.CmdLineParser;
import org.kohsuke.args4j.Option;
@@ -39,6 +40,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** Driver program that starts a job server. */
+// TODO extend JobServerDriver
public class SamzaJobServerDriver {
private static final Logger LOG = LoggerFactory.getLogger(SamzaJobServerDriver.class);
@@ -78,10 +80,13 @@ public class SamzaJobServerDriver {
private static InMemoryJobService createJobService(int controlPort) throws IOException {
JobInvoker jobInvoker =
- new JobInvoker() {
+ new JobInvoker("samza-job-invoker") {
@Override
- public JobInvocation invoke(
- RunnerApi.Pipeline pipeline, Struct options, @Nullable String retrievalToken)
+ protected JobInvocation invokeWithExecutor(
+ RunnerApi.Pipeline pipeline,
+ Struct options,
+ @Nullable String retrievalToken,
+ ListeningExecutorService executorService)
throws IOException {
SamzaPipelineOptions samzaPipelineOptions =
PipelineOptionsTranslation.fromProto(options).as(SamzaPipelineOptions.class);
@@ -96,7 +101,8 @@ public class SamzaJobServerDriver {
String invocationId =
String.format(
"%s_%s", samzaPipelineOptions.getJobName(), UUID.randomUUID().toString());
- return new SamzaJobInvocation(pipeline, samzaPipelineOptions, invocationId);
+ SamzaPipelineRunner pipelineRunner = new SamzaPipelineRunner(samzaPipelineOptions);
+ return new JobInvocation(invocationId, executorService, pipeline, pipelineRunner);
}
};
return InMemoryJobService.create(
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineOptions.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineOptions.java
index d49dece..65204c4 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineOptions.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineOptions.java
@@ -24,7 +24,7 @@ import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.samza.config.ConfigFactory;
import org.apache.samza.config.factories.PropertiesConfigFactory;
-/** Options which can be used to configure a Samza PipelineRunner. */
+/** Options which can be used to configure the Samza runners ({@link SamzaRunner} and the portable {@link SamzaPipelineRunner}). */
public interface SamzaPipelineOptions extends PipelineOptions {
@Description(
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineRunner.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineRunner.java
new file mode 100644
index 0000000..1a94e1d
--- /dev/null
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineRunner.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.samza;
+
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.model.pipeline.v1.RunnerApi.Pipeline;
+import org.apache.beam.runners.core.construction.graph.GreedyPipelineFuser;
+import org.apache.beam.runners.fnexecution.jobsubmission.PortablePipelineRunner;
+import org.apache.beam.runners.samza.util.PortablePipelineDotRenderer;
+import org.apache.beam.sdk.PipelineResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link PortablePipelineRunner} that executes a portable pipeline on Samza: the pipeline is fused
+ * and then handed to a freshly constructed {@link SamzaRunner}.
+ */
+public class SamzaPipelineRunner implements PortablePipelineRunner {
+
+  private static final Logger LOG = LoggerFactory.getLogger(SamzaPipelineRunner.class);
+
+  private final SamzaPipelineOptions options;
+
+  /**
+   * @param options Samza options used to build the {@link SamzaRunner}; note that {@link #run}
+   *     overwrites their runner setting.
+   */
+  public SamzaPipelineRunner(SamzaPipelineOptions options) {
+    this.options = options;
+  }
+
+  /**
+   * Fuses {@code pipeline}, logs its DOT rendering, and submits the fused pipeline to a new
+   * {@link SamzaRunner}.
+   *
+   * @param pipeline the unfused portable pipeline proto
+   * @return the result returned by {@link SamzaRunner}
+   * @throws RuntimeException wrapping any exception raised while creating the runner or running
+   *     the pipeline
+   */
+  @Override
+  public PipelineResult run(final Pipeline pipeline) {
+    final RunnerApi.Pipeline fused = GreedyPipelineFuser.fuse(pipeline).toPipeline();
+    LOG.info("Portable pipeline to run:");
+    LOG.info(PortablePipelineDotRenderer.toDotString(fused));
+
+    // Options coming from the SDK name an SDK-specific runner, which breaks serialization;
+    // reset it to a valid Java runner before constructing SamzaRunner.
+    options.setRunner(SamzaRunner.class);
+
+    try {
+      return new SamzaRunner(options).runPortablePipeline(fused);
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to invoke samza job", e);
+    }
+  }
+}