You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by th...@apache.org on 2018/08/30 18:02:50 UTC
[beam] branch master updated: [BEAM-5187] Add a ProcessJobBundleFactory for process-based execution (#6287)
This is an automated email from the ASF dual-hosted git repository.
thw pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 5720c1d [BEAM-5187] Add a ProcessJobBundleFactory for process-based execution (#6287)
5720c1d is described below
commit 5720c1d22771a65ad5d7be6a06ad8aa0754fa64b
Author: Maximilian Michels <ma...@posteo.de>
AuthorDate: Thu Aug 30 20:02:45 2018 +0200
[BEAM-5187] Add a ProcessJobBundleFactory for process-based execution (#6287)
---
.../control/DockerJobBundleFactory.java | 266 ++-------------------
...undleFactory.java => JobBundleFactoryBase.java} | 137 +++--------
.../control/ProcessJobBundleFactory.java | 84 +++++++
.../environment/ProcessEnvironment.java | 77 ++++++
.../environment/ProcessEnvironmentFactory.java | 157 ++++++++++++
.../fnexecution/environment/ProcessManager.java | 225 +++++++++++++++++
.../control/ProcessJobBundleFactoryTest.java | 195 +++++++++++++++
.../environment/ProcessEnvironmentFactoryTest.java | 127 ++++++++++
.../environment/ProcessEnvironmentTest.java | 44 ++++
.../environment/ProcessManagerTest.java | 103 ++++++++
10 files changed, 1056 insertions(+), 359 deletions(-)
diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/DockerJobBundleFactory.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/DockerJobBundleFactory.java
index 1e7f48b..3178a2e 100644
--- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/DockerJobBundleFactory.java
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/DockerJobBundleFactory.java
@@ -18,46 +18,19 @@
package org.apache.beam.runners.fnexecution.control;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
-import com.google.common.cache.RemovalNotification;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Iterables;
import com.google.common.net.HostAndPort;
-import java.io.IOException;
-import java.util.Map;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.concurrent.ThreadSafe;
-import org.apache.beam.model.fnexecution.v1.BeamFnApi.Target;
-import org.apache.beam.model.pipeline.v1.RunnerApi.Environment;
-import org.apache.beam.runners.core.construction.graph.ExecutableStage;
-import org.apache.beam.runners.fnexecution.GrpcContextHeaderAccessorProvider;
import org.apache.beam.runners.fnexecution.GrpcFnServer;
import org.apache.beam.runners.fnexecution.ServerFactory;
import org.apache.beam.runners.fnexecution.artifact.ArtifactRetrievalService;
-import org.apache.beam.runners.fnexecution.artifact.BeamFileSystemArtifactRetrievalService;
-import org.apache.beam.runners.fnexecution.control.ProcessBundleDescriptors.ExecutableProcessBundleDescriptor;
-import org.apache.beam.runners.fnexecution.control.SdkHarnessClient.BundleProcessor;
-import org.apache.beam.runners.fnexecution.data.GrpcDataService;
import org.apache.beam.runners.fnexecution.environment.DockerEnvironmentFactory;
import org.apache.beam.runners.fnexecution.environment.EnvironmentFactory;
-import org.apache.beam.runners.fnexecution.environment.RemoteEnvironment;
import org.apache.beam.runners.fnexecution.logging.GrpcLoggingService;
-import org.apache.beam.runners.fnexecution.logging.Slf4jLogWriter;
import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
import org.apache.beam.runners.fnexecution.provisioning.StaticGrpcProvisionService;
-import org.apache.beam.runners.fnexecution.state.GrpcStateService;
-import org.apache.beam.runners.fnexecution.state.StateRequestHandler;
-import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.fn.IdGenerator;
-import org.apache.beam.sdk.fn.IdGenerators;
-import org.apache.beam.sdk.fn.data.FnDataReceiver;
-import org.apache.beam.sdk.fn.stream.OutboundObserverFactory;
-import org.apache.beam.sdk.util.WindowedValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -67,7 +40,7 @@ import org.slf4j.LoggerFactory;
* thread-safe. Instead, a new stage factory should be created for each client.
*/
@ThreadSafe
-public class DockerJobBundleFactory implements JobBundleFactory {
+public class DockerJobBundleFactory extends JobBundleFactoryBase {
private static final Logger LOG = LoggerFactory.getLogger(DockerJobBundleFactory.class);
// Port offset for MacOS since we don't have host networking and need to use published ports
@@ -77,7 +50,7 @@ public class DockerJobBundleFactory implements JobBundleFactory {
/** Factory that creates {@link JobBundleFactory} for the given {@link JobInfo}. */
public interface JobBundleFactoryFactory {
- DockerJobBundleFactory create(JobInfo jobInfo) throws Exception;
+ JobBundleFactory create(JobInfo jobInfo) throws Exception;
}
// TODO (BEAM-4819): a hacky way to override the factory for testing.
// Should be replaced with mechanism that let's users configure their own factory
@@ -85,7 +58,7 @@ public class DockerJobBundleFactory implements JobBundleFactory {
new AtomicReference(
new JobBundleFactoryFactory() {
@Override
- public DockerJobBundleFactory create(JobInfo jobInfo) throws Exception {
+ public JobBundleFactory create(JobInfo jobInfo) throws Exception {
return new DockerJobBundleFactory(jobInfo);
}
});
@@ -94,51 +67,12 @@ public class DockerJobBundleFactory implements JobBundleFactory {
// or attempt to document the supported Docker version(s)?
private static final String DOCKER_FOR_MAC_HOST = "host.docker.internal";
- private final IdGenerator stageIdGenerator;
- private final GrpcFnServer<FnApiControlClientPoolService> controlServer;
- private final GrpcFnServer<GrpcLoggingService> loggingServer;
- private final GrpcFnServer<ArtifactRetrievalService> retrievalServer;
- private final GrpcFnServer<StaticGrpcProvisionService> provisioningServer;
-
- private final LoadingCache<Environment, WrappedSdkHarnessClient> environmentCache;
-
- public static DockerJobBundleFactory create(JobInfo jobInfo) throws Exception {
+ public static JobBundleFactory create(JobInfo jobInfo) throws Exception {
return FACTORY.get().create(jobInfo);
}
protected DockerJobBundleFactory(JobInfo jobInfo) throws Exception {
- ServerFactory serverFactory = getServerFactory();
- IdGenerator stageIdGenerator = IdGenerators.incrementingLongs();
- ControlClientPool clientPool = MapControlClientPool.create();
-
- GrpcFnServer<FnApiControlClientPoolService> controlServer =
- GrpcFnServer.allocatePortAndCreateFor(
- FnApiControlClientPoolService.offeringClientsToPool(
- clientPool.getSink(), GrpcContextHeaderAccessorProvider.getHeaderAccessor()),
- serverFactory);
- GrpcFnServer<GrpcLoggingService> loggingServer =
- GrpcFnServer.allocatePortAndCreateFor(
- GrpcLoggingService.forWriter(Slf4jLogWriter.getDefault()), serverFactory);
- GrpcFnServer<ArtifactRetrievalService> retrievalServer =
- GrpcFnServer.allocatePortAndCreateFor(
- BeamFileSystemArtifactRetrievalService.create(), serverFactory);
- GrpcFnServer<StaticGrpcProvisionService> provisioningServer =
- GrpcFnServer.allocatePortAndCreateFor(
- StaticGrpcProvisionService.create(jobInfo.toProvisionInfo()), serverFactory);
- EnvironmentFactory environmentFactory =
- getEnvironmentFactory(
- controlServer,
- loggingServer,
- retrievalServer,
- provisioningServer,
- clientPool.getSource(),
- IdGenerators.incrementingLongs());
- this.stageIdGenerator = stageIdGenerator;
- this.controlServer = controlServer;
- this.loggingServer = loggingServer;
- this.retrievalServer = retrievalServer;
- this.provisioningServer = provisioningServer;
- this.environmentCache = createEnvironmentCache(environmentFactory, serverFactory);
+ super(jobInfo);
}
@VisibleForTesting
@@ -150,54 +84,14 @@ public class DockerJobBundleFactory implements JobBundleFactory {
GrpcFnServer<GrpcLoggingService> loggingServer,
GrpcFnServer<ArtifactRetrievalService> retrievalServer,
GrpcFnServer<StaticGrpcProvisionService> provisioningServer) {
- this.stageIdGenerator = stageIdGenerator;
- this.controlServer = controlServer;
- this.loggingServer = loggingServer;
- this.retrievalServer = retrievalServer;
- this.provisioningServer = provisioningServer;
- this.environmentCache = createEnvironmentCache(environmentFactory, serverFactory);
- }
-
- private LoadingCache<Environment, WrappedSdkHarnessClient> createEnvironmentCache(
- EnvironmentFactory environmentFactory, ServerFactory serverFactory) {
- return CacheBuilder.newBuilder()
- .removalListener(
- ((RemovalNotification<Environment, WrappedSdkHarnessClient> notification) -> {
- LOG.debug("Cleaning up for environment {}", notification.getKey().getUrl());
- try {
- notification.getValue().close();
- } catch (Exception e) {
- LOG.warn(
- String.format("Error cleaning up environment %s", notification.getKey()), e);
- }
- }))
- .build(
- new CacheLoader<Environment, WrappedSdkHarnessClient>() {
- @Override
- public WrappedSdkHarnessClient load(Environment environment) throws Exception {
- RemoteEnvironment remoteEnvironment =
- environmentFactory.createEnvironment(environment);
- return WrappedSdkHarnessClient.wrapping(remoteEnvironment, serverFactory);
- }
- });
- }
-
- @Override
- public StageBundleFactory forStage(ExecutableStage executableStage) {
- WrappedSdkHarnessClient wrappedClient =
- environmentCache.getUnchecked(executableStage.getEnvironment());
- ExecutableProcessBundleDescriptor processBundleDescriptor;
- try {
- processBundleDescriptor =
- ProcessBundleDescriptors.fromExecutableStage(
- stageIdGenerator.getId(),
- executableStage,
- wrappedClient.getDataServer().getApiServiceDescriptor(),
- wrappedClient.getStateServer().getApiServiceDescriptor());
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- return SimpleStageBundleFactory.create(wrappedClient, processBundleDescriptor);
+ super(
+ environmentFactory,
+ serverFactory,
+ stageIdGenerator,
+ controlServer,
+ loggingServer,
+ retrievalServer,
+ provisioningServer);
}
@Override
@@ -213,6 +107,7 @@ public class DockerJobBundleFactory implements JobBundleFactory {
provisioningServer.close();
}
+ @Override
protected ServerFactory getServerFactory() {
switch (getPlatform()) {
case LINUX:
@@ -253,138 +148,6 @@ public class DockerJobBundleFactory implements JobBundleFactory {
return Platform.OTHER;
}
- private static class SimpleStageBundleFactory implements StageBundleFactory {
-
- private final BundleProcessor processor;
- private final ExecutableProcessBundleDescriptor processBundleDescriptor;
-
- // Store the wrapped client in order to keep a live reference into the cache.
- private WrappedSdkHarnessClient wrappedClient;
-
- static SimpleStageBundleFactory create(
- WrappedSdkHarnessClient wrappedClient,
- ExecutableProcessBundleDescriptor processBundleDescriptor) {
- @SuppressWarnings("unchecked")
- BundleProcessor processor =
- wrappedClient
- .getClient()
- .getProcessor(
- processBundleDescriptor.getProcessBundleDescriptor(),
- processBundleDescriptor.getRemoteInputDestinations(),
- wrappedClient.getStateServer().getService());
- return new SimpleStageBundleFactory(processBundleDescriptor, processor, wrappedClient);
- }
-
- SimpleStageBundleFactory(
- ExecutableProcessBundleDescriptor processBundleDescriptor,
- BundleProcessor processor,
- WrappedSdkHarnessClient wrappedClient) {
- this.processBundleDescriptor = processBundleDescriptor;
- this.processor = processor;
- this.wrappedClient = wrappedClient;
- }
-
- @Override
- public RemoteBundle getBundle(
- OutputReceiverFactory outputReceiverFactory,
- StateRequestHandler stateRequestHandler,
- BundleProgressHandler progressHandler)
- throws Exception {
- // TODO: Consider having BundleProcessor#newBundle take in an OutputReceiverFactory rather
- // than constructing the receiver map here. Every bundle factory will need this.
- ImmutableMap.Builder<Target, RemoteOutputReceiver<?>> outputReceivers =
- ImmutableMap.builder();
- for (Map.Entry<Target, Coder<WindowedValue<?>>> targetCoder :
- processBundleDescriptor.getOutputTargetCoders().entrySet()) {
- Target target = targetCoder.getKey();
- Coder<WindowedValue<?>> coder = targetCoder.getValue();
- String bundleOutputPCollection =
- Iterables.getOnlyElement(
- processBundleDescriptor
- .getProcessBundleDescriptor()
- .getTransformsOrThrow(target.getPrimitiveTransformReference())
- .getInputsMap()
- .values());
- FnDataReceiver<WindowedValue<?>> outputReceiver =
- outputReceiverFactory.create(bundleOutputPCollection);
- outputReceivers.put(target, RemoteOutputReceiver.of(coder, outputReceiver));
- }
- return processor.newBundle(outputReceivers.build(), stateRequestHandler, progressHandler);
- }
-
- @Override
- public void close() throws Exception {
- // Clear reference to encourage cache eviction. Values are weakly referenced.
- wrappedClient = null;
- }
- }
-
- /**
- * Holder for an {@link SdkHarnessClient} along with its associated state and data servers. As of
- * now, there is a 1:1 relationship between data services and harness clients. The servers are
- * packaged here to tie server lifetimes to harness client lifetimes.
- */
- private static class WrappedSdkHarnessClient implements AutoCloseable {
- private final RemoteEnvironment environment;
- private final ExecutorService executor;
- // TODO: How should data server lifetime be scoped? It is necessary here for now because
- // SdkHarnessClient requires one at construction.
- private final GrpcFnServer<GrpcDataService> dataServer;
- private final GrpcFnServer<GrpcStateService> stateServer;
- private final SdkHarnessClient client;
-
- static WrappedSdkHarnessClient wrapping(
- RemoteEnvironment environment, ServerFactory serverFactory) throws Exception {
- ExecutorService executor = Executors.newCachedThreadPool();
- GrpcFnServer<GrpcDataService> dataServer =
- GrpcFnServer.allocatePortAndCreateFor(
- GrpcDataService.create(executor, OutboundObserverFactory.serverDirect()),
- serverFactory);
- GrpcFnServer<GrpcStateService> stateServer =
- GrpcFnServer.allocatePortAndCreateFor(GrpcStateService.create(), serverFactory);
- SdkHarnessClient client =
- SdkHarnessClient.usingFnApiClient(
- environment.getInstructionRequestHandler(), dataServer.getService());
- return new WrappedSdkHarnessClient(environment, executor, dataServer, stateServer, client);
- }
-
- private WrappedSdkHarnessClient(
- RemoteEnvironment environment,
- ExecutorService executor,
- GrpcFnServer<GrpcDataService> dataServer,
- GrpcFnServer<GrpcStateService> stateServer,
- SdkHarnessClient client) {
- this.executor = executor;
- this.environment = environment;
- this.dataServer = dataServer;
- this.stateServer = stateServer;
- this.client = client;
- }
-
- SdkHarnessClient getClient() {
- return client;
- }
-
- GrpcFnServer<GrpcStateService> getStateServer() {
- return stateServer;
- }
-
- GrpcFnServer<GrpcDataService> getDataServer() {
- return dataServer;
- }
-
- @Override
- public void close() throws Exception {
- try (AutoCloseable stateServerCloser = stateServer;
- AutoCloseable dataServerCloser = dataServer;
- AutoCloseable envCloser = environment;
- AutoCloseable executorCloser = executor::shutdown) {
- // Wrap resources in try-with-resources to ensure all are cleaned up.
- }
- // TODO: Wait for executor shutdown?
- }
- }
-
private enum Platform {
MAC,
LINUX,
@@ -392,6 +155,7 @@ public class DockerJobBundleFactory implements JobBundleFactory {
}
/** Create {@link EnvironmentFactory} for the given services. */
+ @Override
protected EnvironmentFactory getEnvironmentFactory(
GrpcFnServer<FnApiControlClientPoolService> controlServiceServer,
GrpcFnServer<GrpcLoggingService> loggingServiceServer,
diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/DockerJobBundleFactory.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/JobBundleFactoryBase.java
similarity index 72%
copy from runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/DockerJobBundleFactory.java
copy to runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/JobBundleFactoryBase.java
index 1e7f48b..3c1bd6e 100644
--- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/DockerJobBundleFactory.java
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/JobBundleFactoryBase.java
@@ -24,13 +24,11 @@ import com.google.common.cache.LoadingCache;
import com.google.common.cache.RemovalNotification;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
-import com.google.common.net.HostAndPort;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.beam.model.fnexecution.v1.BeamFnApi.Target;
import org.apache.beam.model.pipeline.v1.RunnerApi.Environment;
@@ -43,7 +41,6 @@ import org.apache.beam.runners.fnexecution.artifact.BeamFileSystemArtifactRetrie
import org.apache.beam.runners.fnexecution.control.ProcessBundleDescriptors.ExecutableProcessBundleDescriptor;
import org.apache.beam.runners.fnexecution.control.SdkHarnessClient.BundleProcessor;
import org.apache.beam.runners.fnexecution.data.GrpcDataService;
-import org.apache.beam.runners.fnexecution.environment.DockerEnvironmentFactory;
import org.apache.beam.runners.fnexecution.environment.EnvironmentFactory;
import org.apache.beam.runners.fnexecution.environment.RemoteEnvironment;
import org.apache.beam.runners.fnexecution.logging.GrpcLoggingService;
@@ -62,51 +59,24 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * A {@link JobBundleFactory} that uses a {@link DockerEnvironmentFactory} for environment
- * management. Note that returned {@link StageBundleFactory stage bundle factories} are not
- * thread-safe. Instead, a new stage factory should be created for each client.
+ * A base for a {@link JobBundleFactory} for which the implementation can specify a custom {@link
+ * EnvironmentFactory} for environment management. Note that returned {@link StageBundleFactory
+ * stage bundle factories} are not thread-safe. Instead, a new stage factory should be created for
+ * each client.
*/
@ThreadSafe
-public class DockerJobBundleFactory implements JobBundleFactory {
- private static final Logger LOG = LoggerFactory.getLogger(DockerJobBundleFactory.class);
+public abstract class JobBundleFactoryBase implements JobBundleFactory {
+ private static final Logger LOG = LoggerFactory.getLogger(JobBundleFactoryBase.class);
- // Port offset for MacOS since we don't have host networking and need to use published ports
- private static final int MAC_PORT_START = 8100;
- private static final int MAC_PORT_END = 8200;
- private static final AtomicInteger MAC_PORT = new AtomicInteger(MAC_PORT_START);
+ final IdGenerator stageIdGenerator;
+ final GrpcFnServer<FnApiControlClientPoolService> controlServer;
+ final GrpcFnServer<GrpcLoggingService> loggingServer;
+ final GrpcFnServer<ArtifactRetrievalService> retrievalServer;
+ final GrpcFnServer<StaticGrpcProvisionService> provisioningServer;
- /** Factory that creates {@link JobBundleFactory} for the given {@link JobInfo}. */
- public interface JobBundleFactoryFactory {
- DockerJobBundleFactory create(JobInfo jobInfo) throws Exception;
- }
- // TODO (BEAM-4819): a hacky way to override the factory for testing.
- // Should be replaced with mechanism that let's users configure their own factory
- public static final AtomicReference<JobBundleFactoryFactory> FACTORY =
- new AtomicReference(
- new JobBundleFactoryFactory() {
- @Override
- public DockerJobBundleFactory create(JobInfo jobInfo) throws Exception {
- return new DockerJobBundleFactory(jobInfo);
- }
- });
-
- // TODO: This host name seems to change with every other Docker release. Do we attempt to keep up
- // or attempt to document the supported Docker version(s)?
- private static final String DOCKER_FOR_MAC_HOST = "host.docker.internal";
-
- private final IdGenerator stageIdGenerator;
- private final GrpcFnServer<FnApiControlClientPoolService> controlServer;
- private final GrpcFnServer<GrpcLoggingService> loggingServer;
- private final GrpcFnServer<ArtifactRetrievalService> retrievalServer;
- private final GrpcFnServer<StaticGrpcProvisionService> provisioningServer;
-
- private final LoadingCache<Environment, WrappedSdkHarnessClient> environmentCache;
+ final LoadingCache<Environment, WrappedSdkHarnessClient> environmentCache;
- public static DockerJobBundleFactory create(JobInfo jobInfo) throws Exception {
- return FACTORY.get().create(jobInfo);
- }
-
- protected DockerJobBundleFactory(JobInfo jobInfo) throws Exception {
+ JobBundleFactoryBase(JobInfo jobInfo) throws Exception {
ServerFactory serverFactory = getServerFactory();
IdGenerator stageIdGenerator = IdGenerators.incrementingLongs();
ControlClientPool clientPool = MapControlClientPool.create();
@@ -142,7 +112,7 @@ public class DockerJobBundleFactory implements JobBundleFactory {
}
@VisibleForTesting
- DockerJobBundleFactory(
+ JobBundleFactoryBase(
EnvironmentFactory environmentFactory,
ServerFactory serverFactory,
IdGenerator stageIdGenerator,
@@ -158,6 +128,15 @@ public class DockerJobBundleFactory implements JobBundleFactory {
this.environmentCache = createEnvironmentCache(environmentFactory, serverFactory);
}
+ /** Create {@link EnvironmentFactory} for the given services. */
+ abstract EnvironmentFactory getEnvironmentFactory(
+ GrpcFnServer<FnApiControlClientPoolService> controlServiceServer,
+ GrpcFnServer<GrpcLoggingService> loggingServiceServer,
+ GrpcFnServer<ArtifactRetrievalService> retrievalServiceServer,
+ GrpcFnServer<StaticGrpcProvisionService> provisioningServiceServer,
+ ControlClientPool.Source clientSource,
+ IdGenerator idGenerator);
+
private LoadingCache<Environment, WrappedSdkHarnessClient> createEnvironmentCache(
EnvironmentFactory environmentFactory, ServerFactory serverFactory) {
return CacheBuilder.newBuilder()
@@ -214,52 +193,17 @@ public class DockerJobBundleFactory implements JobBundleFactory {
}
protected ServerFactory getServerFactory() {
- switch (getPlatform()) {
- case LINUX:
- return ServerFactory.createDefault();
- case MAC:
- // NOTE: Deployment on Macs is intended for local development. As of 18.03, Docker-for-Mac
- // does not implement host networking (--networking=host is effectively a no-op). Instead,
- // we use a special DNS entry that points to the host:
- // https://docs.docker.com/docker-for-mac/networking/#use-cases-and-workarounds
- // The special hostname has historically changed between versions, so this is subject to
- // breakages and will likely only support the latest version at any time.
-
- // We need to use a fixed port range due to non-existing host networking in Docker-for-Mac.
- // The port range needs to be published when bringing up the Docker container, see
- // DockerEnvironmentFactory.
-
- return ServerFactory.createWithUrlFactoryAndPortSupplier(
- (host, port) -> HostAndPort.fromParts(DOCKER_FOR_MAC_HOST, port).toString(),
- // We only use the published Docker ports 8100-8200 in a round-robin fashion
- () -> MAC_PORT.getAndUpdate(val -> val == MAC_PORT_END ? MAC_PORT_START : val + 1));
- default:
- LOG.warn("Unknown Docker platform. Falling back to default server factory");
- return ServerFactory.createDefault();
- }
- }
-
- private static Platform getPlatform() {
- String osName = System.getProperty("os.name").toLowerCase();
- // TODO: Make this more robust?
- // The DOCKER_MAC_CONTAINER environment variable is necessary to detect whether we run on
- // a container on MacOs. MacOs internally uses a Linux VM which makes it indistinguishable from Linux.
- // We still need to apply port mapping due to missing host networking.
- if (osName.startsWith("mac") || "1".equals(System.getenv("DOCKER_MAC_CONTAINER"))) {
- return Platform.MAC;
- } else if (osName.startsWith("linux")) {
- return Platform.LINUX;
- }
- return Platform.OTHER;
+ return ServerFactory.createDefault();
}
- private static class SimpleStageBundleFactory implements StageBundleFactory {
+ /** A simple stage bundle factory for remotely processing bundles. */
+ protected static class SimpleStageBundleFactory implements StageBundleFactory {
private final BundleProcessor processor;
private final ExecutableProcessBundleDescriptor processBundleDescriptor;
// Store the wrapped client in order to keep a live reference into the cache.
- private WrappedSdkHarnessClient wrappedClient;
+ @SuppressFBWarnings private WrappedSdkHarnessClient wrappedClient;
static SimpleStageBundleFactory create(
WrappedSdkHarnessClient wrappedClient,
@@ -324,7 +268,7 @@ public class DockerJobBundleFactory implements JobBundleFactory {
* now, there is a 1:1 relationship between data services and harness clients. The servers are
* packaged here to tie server lifetimes to harness client lifetimes.
*/
- private static class WrappedSdkHarnessClient implements AutoCloseable {
+ protected static class WrappedSdkHarnessClient implements AutoCloseable {
private final RemoteEnvironment environment;
private final ExecutorService executor;
// TODO: How should data server lifetime be scoped? It is necessary here for now because
@@ -384,27 +328,4 @@ public class DockerJobBundleFactory implements JobBundleFactory {
// TODO: Wait for executor shutdown?
}
}
-
- private enum Platform {
- MAC,
- LINUX,
- OTHER,
- }
-
- /** Create {@link EnvironmentFactory} for the given services. */
- protected EnvironmentFactory getEnvironmentFactory(
- GrpcFnServer<FnApiControlClientPoolService> controlServiceServer,
- GrpcFnServer<GrpcLoggingService> loggingServiceServer,
- GrpcFnServer<ArtifactRetrievalService> retrievalServiceServer,
- GrpcFnServer<StaticGrpcProvisionService> provisioningServiceServer,
- ControlClientPool.Source clientSource,
- IdGenerator idGenerator) {
- return DockerEnvironmentFactory.forServices(
- controlServiceServer,
- loggingServiceServer,
- retrievalServiceServer,
- provisioningServiceServer,
- clientSource,
- idGenerator);
- }
}
diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/ProcessJobBundleFactory.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/ProcessJobBundleFactory.java
new file mode 100644
index 0000000..d165733
--- /dev/null
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/ProcessJobBundleFactory.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.fnexecution.control;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.runners.fnexecution.GrpcFnServer;
+import org.apache.beam.runners.fnexecution.ServerFactory;
+import org.apache.beam.runners.fnexecution.artifact.ArtifactRetrievalService;
+import org.apache.beam.runners.fnexecution.environment.EnvironmentFactory;
+import org.apache.beam.runners.fnexecution.environment.ProcessEnvironmentFactory;
+import org.apache.beam.runners.fnexecution.logging.GrpcLoggingService;
+import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
+import org.apache.beam.runners.fnexecution.provisioning.StaticGrpcProvisionService;
+import org.apache.beam.sdk.fn.IdGenerator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link JobBundleFactoryBase} which uses a {@link ProcessEnvironmentFactory} to run the SDK
+ * harness in an external process.
+ */
+public class ProcessJobBundleFactory extends JobBundleFactoryBase {
+ private static final Logger LOG = LoggerFactory.getLogger(ProcessJobBundleFactory.class);
+
+ public static ProcessJobBundleFactory create(JobInfo jobInfo) throws Exception {
+ return new ProcessJobBundleFactory(jobInfo);
+ }
+
+ protected ProcessJobBundleFactory(JobInfo jobInfo) throws Exception {
+ super(jobInfo);
+ }
+
+ @VisibleForTesting
+ ProcessJobBundleFactory(
+ ProcessEnvironmentFactory envFactory,
+ ServerFactory serverFactory,
+ IdGenerator stageIdGenerator,
+ GrpcFnServer<FnApiControlClientPoolService> controlServer,
+ GrpcFnServer<GrpcLoggingService> loggingServer,
+ GrpcFnServer<ArtifactRetrievalService> retrievalServer,
+ GrpcFnServer<StaticGrpcProvisionService> provisioningServer) {
+ super(
+ envFactory,
+ serverFactory,
+ stageIdGenerator,
+ controlServer,
+ loggingServer,
+ retrievalServer,
+ provisioningServer);
+ }
+
+ @Override
+ protected EnvironmentFactory getEnvironmentFactory(
+ GrpcFnServer<FnApiControlClientPoolService> controlServiceServer,
+ GrpcFnServer<GrpcLoggingService> loggingServiceServer,
+ GrpcFnServer<ArtifactRetrievalService> retrievalServiceServer,
+ GrpcFnServer<StaticGrpcProvisionService> provisioningServiceServer,
+ ControlClientPool.Source clientSource,
+ IdGenerator idGenerator) {
+ return ProcessEnvironmentFactory.create(
+ controlServiceServer,
+ loggingServiceServer,
+ retrievalServiceServer,
+ provisioningServiceServer,
+ clientSource,
+ idGenerator);
+ }
+}
diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/ProcessEnvironment.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/ProcessEnvironment.java
new file mode 100644
index 0000000..f6c91ee
--- /dev/null
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/ProcessEnvironment.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.fnexecution.environment;
+
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.fnexecution.control.InstructionRequestHandler;
+
+/**
+ * Environment for process-based execution. The environment is responsible for stopping the process.
+ */
+public class ProcessEnvironment implements RemoteEnvironment {
+
+ private final ProcessManager processManager;
+ private final RunnerApi.Environment environment;
+ private final String workerId;
+ private final InstructionRequestHandler instructionHandler;
+ private final Object lock = new Object();
+
+ private boolean isClosed;
+
+ public static RemoteEnvironment create(
+ ProcessManager processManager,
+ RunnerApi.Environment environment,
+ String workerId,
+ InstructionRequestHandler instructionHandler) {
+ return new ProcessEnvironment(processManager, environment, workerId, instructionHandler);
+ }
+
+ private ProcessEnvironment(
+ ProcessManager processManager,
+ RunnerApi.Environment environment,
+ String workerId,
+ InstructionRequestHandler instructionHandler) {
+
+ this.processManager = processManager;
+ this.environment = environment;
+ this.workerId = workerId;
+ this.instructionHandler = instructionHandler;
+ }
+
+ @Override
+ public RunnerApi.Environment getEnvironment() {
+ return environment;
+ }
+
+ @Override
+ public InstructionRequestHandler getInstructionRequestHandler() {
+ return instructionHandler;
+ }
+
+ @Override
+ public void close() throws Exception {
+ synchronized (lock) {
+ if (!isClosed) {
+ instructionHandler.close();
+ processManager.stopProcess(workerId);
+ isClosed = true;
+ }
+ }
+ }
+}
diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/ProcessEnvironmentFactory.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/ProcessEnvironmentFactory.java
new file mode 100644
index 0000000..7b89ecc
--- /dev/null
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/ProcessEnvironmentFactory.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.fnexecution.environment;
+
+import com.google.common.collect.ImmutableList;
+import java.time.Duration;
+import java.util.List;
+import java.util.concurrent.TimeoutException;
+import org.apache.beam.model.pipeline.v1.RunnerApi.Environment;
+import org.apache.beam.runners.fnexecution.GrpcFnServer;
+import org.apache.beam.runners.fnexecution.artifact.ArtifactRetrievalService;
+import org.apache.beam.runners.fnexecution.control.ControlClientPool;
+import org.apache.beam.runners.fnexecution.control.FnApiControlClientPoolService;
+import org.apache.beam.runners.fnexecution.control.InstructionRequestHandler;
+import org.apache.beam.runners.fnexecution.logging.GrpcLoggingService;
+import org.apache.beam.runners.fnexecution.provisioning.StaticGrpcProvisionService;
+import org.apache.beam.sdk.fn.IdGenerator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An {@link EnvironmentFactory} which forks processes based on the given URL in the Environment.
+ * The returned {@link ProcessEnvironment} has to make sure to stop the processes.
+ */
+public class ProcessEnvironmentFactory implements EnvironmentFactory {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ProcessEnvironmentFactory.class);
+
+ public static ProcessEnvironmentFactory create(
+ GrpcFnServer<FnApiControlClientPoolService> controlServiceServer,
+ GrpcFnServer<GrpcLoggingService> loggingServiceServer,
+ GrpcFnServer<ArtifactRetrievalService> retrievalServiceServer,
+ GrpcFnServer<StaticGrpcProvisionService> provisioningServiceServer,
+ ControlClientPool.Source clientSource,
+ IdGenerator idGenerator) {
+ return create(
+ ProcessManager.create(),
+ controlServiceServer,
+ loggingServiceServer,
+ retrievalServiceServer,
+ provisioningServiceServer,
+ clientSource,
+ idGenerator);
+ }
+
+ public static ProcessEnvironmentFactory create(
+ ProcessManager processManager,
+ GrpcFnServer<FnApiControlClientPoolService> controlServiceServer,
+ GrpcFnServer<GrpcLoggingService> loggingServiceServer,
+ GrpcFnServer<ArtifactRetrievalService> retrievalServiceServer,
+ GrpcFnServer<StaticGrpcProvisionService> provisioningServiceServer,
+ ControlClientPool.Source clientSource,
+ IdGenerator idGenerator) {
+ return new ProcessEnvironmentFactory(
+ processManager,
+ controlServiceServer,
+ loggingServiceServer,
+ retrievalServiceServer,
+ provisioningServiceServer,
+ idGenerator,
+ clientSource);
+ }
+
+ private final ProcessManager processManager;
+ private final GrpcFnServer<FnApiControlClientPoolService> controlServiceServer;
+ private final GrpcFnServer<GrpcLoggingService> loggingServiceServer;
+ private final GrpcFnServer<ArtifactRetrievalService> retrievalServiceServer;
+ private final GrpcFnServer<StaticGrpcProvisionService> provisioningServiceServer;
+ private final IdGenerator idGenerator;
+ private final ControlClientPool.Source clientSource;
+
+ private ProcessEnvironmentFactory(
+ ProcessManager processManager,
+ GrpcFnServer<FnApiControlClientPoolService> controlServiceServer,
+ GrpcFnServer<GrpcLoggingService> loggingServiceServer,
+ GrpcFnServer<ArtifactRetrievalService> retrievalServiceServer,
+ GrpcFnServer<StaticGrpcProvisionService> provisioningServiceServer,
+ IdGenerator idGenerator,
+ ControlClientPool.Source clientSource) {
+ this.processManager = processManager;
+ this.controlServiceServer = controlServiceServer;
+ this.loggingServiceServer = loggingServiceServer;
+ this.retrievalServiceServer = retrievalServiceServer;
+ this.provisioningServiceServer = provisioningServiceServer;
+ this.idGenerator = idGenerator;
+ this.clientSource = clientSource;
+ }
+
+ /** Creates a new, active {@link RemoteEnvironment} backed by a forked process. */
+ @Override
+ public RemoteEnvironment createEnvironment(Environment environment) throws Exception {
+ String workerId = idGenerator.getId();
+
+ // TODO The Environment Protobuf message needs to be changed for process environment
+ String executable = environment.getUrl();
+ String loggingEndpoint = loggingServiceServer.getApiServiceDescriptor().getUrl();
+ String artifactEndpoint = retrievalServiceServer.getApiServiceDescriptor().getUrl();
+ String provisionEndpoint = provisioningServiceServer.getApiServiceDescriptor().getUrl();
+ String controlEndpoint = controlServiceServer.getApiServiceDescriptor().getUrl();
+
+ List<String> args =
+ ImmutableList.of(
+ String.format("--id=%s", workerId),
+ String.format("--logging_endpoint=%s", loggingEndpoint),
+ String.format("--artifact_endpoint=%s", artifactEndpoint),
+ String.format("--provision_endpoint=%s", provisionEndpoint),
+ String.format("--control_endpoint=%s", controlEndpoint));
+
+ LOG.debug("Creating Process for worker ID {}", workerId);
+ // Wrap the blocking call to clientSource.get in case an exception is thrown.
+ InstructionRequestHandler instructionHandler = null;
+ try {
+ ProcessManager.RunningProcess process =
+ processManager.startProcess(workerId, executable, args);
+ // Wait on a client from the gRPC server.
+ while (instructionHandler == null) {
+ try {
+ // If the process is not alive anymore, we abort.
+ process.isAliveOrThrow();
+ instructionHandler = clientSource.take(workerId, Duration.ofMinutes(2));
+ } catch (TimeoutException timeoutEx) {
+ LOG.info(
+ "Still waiting for startup of environment '{}' for worker id {}",
+ environment.getUrl(),
+ workerId);
+ } catch (InterruptedException interruptEx) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(interruptEx);
+ }
+ }
+ } catch (Exception e) {
+ try {
+ processManager.stopProcess(workerId);
+ } catch (Exception processKillException) {
+ e.addSuppressed(processKillException);
+ }
+ throw e;
+ }
+
+ return ProcessEnvironment.create(processManager, environment, workerId, instructionHandler);
+ }
+}
diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/ProcessManager.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/ProcessManager.java
new file mode 100644
index 0000000..8f224fd
--- /dev/null
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/ProcessManager.java
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.fnexecution.environment;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.concurrent.ThreadSafe;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A simple process manager which forks processes and kills them if necessary.
+ *
+ * <p>Processes are tracked by a caller-supplied id. A JVM shutdown hook stops (and, after a short
+ * grace period, forcibly kills) every process that is still registered with any manager.
+ */
+@ThreadSafe
+public class ProcessManager {
+  private static final Logger LOG = LoggerFactory.getLogger(ProcessManager.class);
+
+  /** For debugging purposes, we inherit I/O of processes. Evaluated once, when the class loads. */
+  private static final boolean INHERIT_IO = LOG.isDebugEnabled();
+
+  /**
+   * A list of all managers to ensure all processes shut down on JVM exit.
+   *
+   * <p>NOTE(review): managers are only ever added to this list, never removed, so every manager
+   * created through {@link #create()} stays reachable for the lifetime of the JVM.
+   */
+  private static final List<ProcessManager> ALL_PROCESS_MANAGERS = new ArrayList<>();
+
+  static {
+    // Install a shutdown hook to ensure processes are stopped/killed.
+    Runtime.getRuntime().addShutdownHook(ShutdownHook.create());
+  }
+
+  /** Running processes by id; a synchronized map, so iteration must lock on the map itself. */
+  private final Map<String, Process> processes;
+
+  /** Creates a new manager and registers it with the JVM shutdown hook. */
+  public static ProcessManager create() {
+    synchronized (ALL_PROCESS_MANAGERS) {
+      ProcessManager processManager = new ProcessManager();
+      ALL_PROCESS_MANAGERS.add(processManager);
+      return processManager;
+    }
+  }
+
+  private ProcessManager() {
+    this.processes = Collections.synchronizedMap(new HashMap<>());
+  }
+
+  /** Handle to a started process which allows checking whether it is still alive. */
+  static class RunningProcess {
+    private final Process process;
+
+    RunningProcess(Process process) {
+      this.process = process;
+    }
+
+    /**
+     * Checks if the underlying process is still running.
+     *
+     * @throws IllegalStateException if the process has terminated; the message carries its exit
+     *     code
+     */
+    void isAliveOrThrow() throws IllegalStateException {
+      if (!process.isAlive()) {
+        throw new IllegalStateException("Process died with exit code " + process.exitValue());
+      }
+    }
+
+    @VisibleForTesting
+    Process getUnderlyingProcess() {
+      return process;
+    }
+  }
+
+  /**
+   * Forks a process with the given command and arguments.
+   *
+   * @param id A unique id for the process
+   * @param command the name of the executable to run
+   * @param args arguments to provide to the executable
+   * @return A RunningProcess which can be checked for liveness
+   */
+  RunningProcess startProcess(String id, String command, List<String> args) throws IOException {
+    return startProcess(id, command, args, Collections.emptyMap());
+  }
+
+  /**
+   * Forks a process with the given command, arguments, and additional environment variables.
+   *
+   * @param id A unique id for the process
+   * @param command The name of the executable to run
+   * @param args Arguments to provide to the executable
+   * @param env Additional environment variables for the process to be forked
+   * @return A RunningProcess which can be checked for liveness
+   * @throws IOException if the process could not be started
+   * @throws IllegalStateException if a process was already registered under {@code id}; in that
+   *     case both the previously registered and the newly started process are stopped
+   */
+  public RunningProcess startProcess(
+      String id, String command, List<String> args, Map<String, String> env) throws IOException {
+    checkNotNull(id, "Process id must not be null");
+    checkNotNull(command, "Command must not be null");
+    checkNotNull(args, "Process args must not be null");
+    checkNotNull(env, "Environment map must not be null");
+
+    ProcessBuilder pb =
+        new ProcessBuilder(ImmutableList.<String>builder().add(command).addAll(args).build());
+    pb.environment().putAll(env);
+
+    if (INHERIT_IO) {
+      LOG.debug(
+          "==> DEBUG enabled: Inheriting stdout/stderr of process (adjustable in ProcessManager)");
+      pb.inheritIO();
+    } else {
+      pb.redirectErrorStream(true);
+      // Pipe stdout and stderr to /dev/null to avoid blocking the process due to filled PIPE buffer
+      if (System.getProperty("os.name", "").startsWith("Windows")) {
+        pb.redirectOutput(new File("nul"));
+      } else {
+        pb.redirectOutput(new File("/dev/null"));
+      }
+    }
+
+    LOG.debug("Attempting to start process with command: {}", pb.command());
+    Process newProcess = pb.start();
+    Process oldProcess = processes.put(id, newProcess);
+    if (oldProcess != null) {
+      stopProcess(id, oldProcess);
+      stopProcess(id, newProcess);
+      throw new IllegalStateException("There was already a process running with id " + id);
+    }
+
+    return new RunningProcess(newProcess);
+  }
+
+  /**
+   * Stops a previously started process identified by its unique id.
+   *
+   * @throws NullPointerException if {@code id} is null or no process is registered under it
+   */
+  public void stopProcess(String id) {
+    checkNotNull(id, "Process id must not be null");
+    Process process = checkNotNull(processes.remove(id), "Process for id does not exist: " + id);
+    stopProcess(id, process);
+  }
+
+  /** Stops the process gracefully first and falls back to a forcible kill after a timeout. */
+  private void stopProcess(String id, Process process) {
+    if (process.isAlive()) {
+      LOG.debug("Attempting to stop process with id {}", id);
+      // first try to kill gracefully
+      process.destroy();
+      long maxTimeToWait = 2000;
+      if (waitForProcessToDie(process, maxTimeToWait)) {
+        LOG.debug("Process for worker {} shut down gracefully.", id);
+      } else {
+        LOG.info("Process for worker {} still running. Killing.", id);
+        process.destroyForcibly();
+        if (waitForProcessToDie(process, maxTimeToWait)) {
+          LOG.debug("Process for worker {} killed.", id);
+        } else {
+          LOG.warn("Process for worker {} could not be killed.", id);
+        }
+      }
+    }
+  }
+
+  /**
+   * Returns true if the process exits within maxWaitTimeMillis.
+   *
+   * <p>Relies on {@link Process#waitFor(long, java.util.concurrent.TimeUnit)} instead of polling
+   * against the wall clock, so the wait is not affected by system clock adjustments.
+   */
+  private static boolean waitForProcessToDie(Process process, long maxWaitTimeMillis) {
+    try {
+      return process.waitFor(maxWaitTimeMillis, java.util.concurrent.TimeUnit.MILLISECONDS);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new RuntimeException("Interrupted while waiting on process", e);
+    }
+  }
+
+  /** JVM shutdown hook which stops, and after a grace period kills, all registered processes. */
+  private static class ShutdownHook extends Thread {
+
+    private static ShutdownHook create() {
+      return new ShutdownHook();
+    }
+
+    private ShutdownHook() {}
+
+    @Override
+    @SuppressFBWarnings("SWL_SLEEP_WITH_LOCK_HELD")
+    public void run() {
+      synchronized (ALL_PROCESS_MANAGERS) {
+        ALL_PROCESS_MANAGERS.forEach(ProcessManager::stopAllProcesses);
+        if (ALL_PROCESS_MANAGERS.stream().anyMatch(ProcessManager::hasAliveProcess)) {
+          try {
+            // Graceful shutdown period
+            Thread.sleep(200);
+          } catch (InterruptedException e) {
+            // Restore the flag but fall through: skipping the forcible kill would leak processes.
+            Thread.currentThread().interrupt();
+          }
+        }
+        ALL_PROCESS_MANAGERS.forEach(ProcessManager::killAllProcesses);
+      }
+    }
+  }
+
+  /** Returns true if at least one registered process is still alive. */
+  private boolean hasAliveProcess() {
+    // Streaming over a view of a synchronized map requires holding the map's lock manually.
+    synchronized (processes) {
+      return processes.values().stream().anyMatch(Process::isAlive);
+    }
+  }
+
+  /** Stop all remaining processes gracefully, i.e. upon JVM shutdown */
+  private void stopAllProcesses() {
+    processes.forEach((id, process) -> process.destroy());
+  }
+
+  /** Kill all remaining processes forcibly, i.e. upon JVM shutdown */
+  private void killAllProcesses() {
+    processes.forEach((id, process) -> process.destroyForcibly());
+  }
+}
diff --git a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/ProcessJobBundleFactoryTest.java b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/ProcessJobBundleFactoryTest.java
new file mode 100644
index 0000000..99f94dd
--- /dev/null
+++ b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/ProcessJobBundleFactoryTest.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.fnexecution.control;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.InstructionResponse;
+import org.apache.beam.model.pipeline.v1.RunnerApi.Coder;
+import org.apache.beam.model.pipeline.v1.RunnerApi.Components;
+import org.apache.beam.model.pipeline.v1.RunnerApi.Environment;
+import org.apache.beam.model.pipeline.v1.RunnerApi.ExecutableStagePayload;
+import org.apache.beam.model.pipeline.v1.RunnerApi.FunctionSpec;
+import org.apache.beam.model.pipeline.v1.RunnerApi.PCollection;
+import org.apache.beam.model.pipeline.v1.RunnerApi.SdkFunctionSpec;
+import org.apache.beam.model.pipeline.v1.RunnerApi.WindowingStrategy;
+import org.apache.beam.runners.core.construction.ModelCoders;
+import org.apache.beam.runners.core.construction.graph.ExecutableStage;
+import org.apache.beam.runners.fnexecution.GrpcFnServer;
+import org.apache.beam.runners.fnexecution.ServerFactory;
+import org.apache.beam.runners.fnexecution.artifact.ArtifactRetrievalService;
+import org.apache.beam.runners.fnexecution.environment.ProcessEnvironmentFactory;
+import org.apache.beam.runners.fnexecution.environment.RemoteEnvironment;
+import org.apache.beam.runners.fnexecution.logging.GrpcLoggingService;
+import org.apache.beam.runners.fnexecution.provisioning.StaticGrpcProvisionService;
+import org.apache.beam.sdk.fn.IdGenerator;
+import org.apache.beam.sdk.fn.IdGenerators;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+/** Tests for {@link ProcessJobBundleFactory}. */
+@RunWith(JUnit4.class)
+public class ProcessJobBundleFactoryTest {
+
+  @Mock private ProcessEnvironmentFactory envFactory;
+  @Mock private RemoteEnvironment remoteEnvironment;
+  @Mock private InstructionRequestHandler instructionHandler;
+  @Mock private ServerFactory serverFactory;
+  @Mock GrpcFnServer<FnApiControlClientPoolService> controlServer;
+  @Mock GrpcFnServer<GrpcLoggingService> loggingServer;
+  @Mock GrpcFnServer<ArtifactRetrievalService> retrievalServer;
+  @Mock GrpcFnServer<StaticGrpcProvisionService> provisioningServer;
+
+  private final Environment environment = Environment.newBuilder().setUrl("env-url").build();
+  private final IdGenerator stageIdGenerator = IdGenerators.incrementingLongs();
+  private final InstructionResponse instructionResponse =
+      InstructionResponse.newBuilder().setInstructionId("instruction-id").build();
+
+  @Before
+  public void setUpMocks() throws Exception {
+    MockitoAnnotations.initMocks(this);
+    when(envFactory.createEnvironment(environment)).thenReturn(remoteEnvironment);
+    when(remoteEnvironment.getInstructionRequestHandler()).thenReturn(instructionHandler);
+    when(instructionHandler.handle(any()))
+        .thenReturn(CompletableFuture.completedFuture(instructionResponse));
+  }
+
+  @Test
+  public void createsCorrectEnvironment() throws Exception {
+    try (ProcessJobBundleFactory bundleFactory = createBundleFactory()) {
+      bundleFactory.forStage(getExecutableStage(environment));
+      verify(envFactory).createEnvironment(environment);
+    }
+  }
+
+  @Test
+  public void closesEnvironmentOnCleanup() throws Exception {
+    ProcessJobBundleFactory bundleFactory = createBundleFactory();
+    try (AutoCloseable unused = bundleFactory) {
+      bundleFactory.forStage(getExecutableStage(environment));
+    }
+    verify(remoteEnvironment).close();
+  }
+
+  @Test
+  public void cachesEnvironment() throws Exception {
+    try (ProcessJobBundleFactory bundleFactory = createBundleFactory()) {
+      StageBundleFactory bf1 = bundleFactory.forStage(getExecutableStage(environment));
+      StageBundleFactory bf2 = bundleFactory.forStage(getExecutableStage(environment));
+      // NOTE: We hang on to stage bundle references to ensure their underlying environments are not
+      // garbage collected. For additional safety, we print the factories to ensure the references
+      // are not optimized away.
+      System.out.println("bundle factory 1:" + bf1);
+      System.out.println("bundle factory 2:" + bf2);
+      verify(envFactory).createEnvironment(environment);
+      verifyNoMoreInteractions(envFactory);
+    }
+  }
+
+  @Test
+  public void doesNotCacheDifferentEnvironments() throws Exception {
+    Environment envFoo = Environment.newBuilder().setUrl("foo-env-url").build();
+    RemoteEnvironment remoteEnvFoo = mock(RemoteEnvironment.class);
+    InstructionRequestHandler fooInstructionHandler = mock(InstructionRequestHandler.class);
+    when(envFactory.createEnvironment(envFoo)).thenReturn(remoteEnvFoo);
+    when(remoteEnvFoo.getInstructionRequestHandler()).thenReturn(fooInstructionHandler);
+    // Don't bother creating a distinct instruction response because we don't use it here.
+    when(fooInstructionHandler.handle(any()))
+        .thenReturn(CompletableFuture.completedFuture(instructionResponse));
+
+    try (ProcessJobBundleFactory bundleFactory = createBundleFactory()) {
+      bundleFactory.forStage(getExecutableStage(environment));
+      bundleFactory.forStage(getExecutableStage(envFoo));
+      verify(envFactory).createEnvironment(environment);
+      verify(envFactory).createEnvironment(envFoo);
+      verifyNoMoreInteractions(envFactory);
+    }
+  }
+
+  /** Builds the factory under test from the mocked collaborators of this test class. */
+  private ProcessJobBundleFactory createBundleFactory() {
+    return new ProcessJobBundleFactory(
+        envFactory,
+        serverFactory,
+        stageIdGenerator,
+        controlServer,
+        loggingServer,
+        retrievalServer,
+        provisioningServer);
+  }
+
+  /** Builds a minimal executable stage with a single input PCollection in the given environment. */
+  private static ExecutableStage getExecutableStage(Environment environment) {
+    return ExecutableStage.fromPayload(
+        ExecutableStagePayload.newBuilder()
+            .setInput("input-pc")
+            .setEnvironment(environment)
+            .setComponents(
+                Components.newBuilder()
+                    .putPcollections(
+                        "input-pc",
+                        PCollection.newBuilder()
+                            .setWindowingStrategyId("windowing-strategy")
+                            .setCoderId("coder-id")
+                            .build())
+                    .putWindowingStrategies(
+                        "windowing-strategy",
+                        WindowingStrategy.newBuilder().setWindowCoderId("coder-id").build())
+                    .putCoders(
+                        "coder-id",
+                        Coder.newBuilder()
+                            .setSpec(
+                                SdkFunctionSpec.newBuilder()
+                                    .setSpec(
+                                        FunctionSpec.newBuilder()
+                                            .setUrn(ModelCoders.INTERVAL_WINDOW_CODER_URN)
+                                            .build())
+                                    .build())
+                            .build())
+                    .build())
+            .build());
+  }
+}
diff --git a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/environment/ProcessEnvironmentFactoryTest.java b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/environment/ProcessEnvironmentFactoryTest.java
new file mode 100644
index 0000000..b60dcda
--- /dev/null
+++ b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/environment/ProcessEnvironmentFactoryTest.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.fnexecution.environment;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Matchers.anyList;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import org.apache.beam.model.pipeline.v1.Endpoints.ApiServiceDescriptor;
+import org.apache.beam.model.pipeline.v1.RunnerApi.Environment;
+import org.apache.beam.runners.fnexecution.GrpcFnServer;
+import org.apache.beam.runners.fnexecution.artifact.ArtifactRetrievalService;
+import org.apache.beam.runners.fnexecution.control.FnApiControlClientPoolService;
+import org.apache.beam.runners.fnexecution.control.InstructionRequestHandler;
+import org.apache.beam.runners.fnexecution.logging.GrpcLoggingService;
+import org.apache.beam.runners.fnexecution.provisioning.StaticGrpcProvisionService;
+import org.apache.beam.sdk.fn.IdGenerator;
+import org.apache.beam.sdk.fn.IdGenerators;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+
+/** Unit tests covering {@link ProcessEnvironmentFactory} with a mocked {@link ProcessManager}. */
+@RunWith(JUnit4.class)
+public class ProcessEnvironmentFactoryTest {
+
+  private static final ApiServiceDescriptor SERVICE_DESCRIPTOR =
+      ApiServiceDescriptor.newBuilder().setUrl("service-url").build();
+  private static final String COMMAND = "my-command";
+  private static final Environment ENVIRONMENT = Environment.newBuilder().setUrl(COMMAND).build();
+
+  private static final InspectibleIdGenerator ID_GENERATOR = new InspectibleIdGenerator();
+
+  @Mock private ProcessManager processManager;
+
+  @Mock private GrpcFnServer<FnApiControlClientPoolService> controlServiceServer;
+  @Mock private GrpcFnServer<GrpcLoggingService> loggingServiceServer;
+  @Mock private GrpcFnServer<ArtifactRetrievalService> retrievalServiceServer;
+  @Mock private GrpcFnServer<StaticGrpcProvisionService> provisioningServiceServer;
+
+  @Mock private InstructionRequestHandler client;
+  private ProcessEnvironmentFactory factory;
+
+  @Before
+  public void initMocks() throws IOException {
+    MockitoAnnotations.initMocks(this);
+
+    // Every started process is represented by a mock handle.
+    ProcessManager.RunningProcess startedProcess =
+        Mockito.mock(ProcessManager.RunningProcess.class);
+    when(processManager.startProcess(anyString(), anyString(), anyList()))
+        .thenReturn(startedProcess);
+
+    // All four services report the same descriptor.
+    when(controlServiceServer.getApiServiceDescriptor()).thenReturn(SERVICE_DESCRIPTOR);
+    when(loggingServiceServer.getApiServiceDescriptor()).thenReturn(SERVICE_DESCRIPTOR);
+    when(retrievalServiceServer.getApiServiceDescriptor()).thenReturn(SERVICE_DESCRIPTOR);
+    when(provisioningServiceServer.getApiServiceDescriptor()).thenReturn(SERVICE_DESCRIPTOR);
+
+    // The client source hands out the mocked client immediately.
+    factory =
+        ProcessEnvironmentFactory.create(
+            processManager,
+            controlServiceServer,
+            loggingServiceServer,
+            retrievalServiceServer,
+            provisioningServiceServer,
+            (workerId, timeout) -> client,
+            ID_GENERATOR);
+  }
+
+  @Test
+  public void createsCorrectEnvironment() throws Exception {
+    RemoteEnvironment created = factory.createEnvironment(ENVIRONMENT);
+
+    assertThat(created.getInstructionRequestHandler(), is(client));
+    assertThat(created.getEnvironment(), equalTo(ENVIRONMENT));
+    verify(processManager).startProcess(eq(ID_GENERATOR.currentId), anyString(), anyList());
+  }
+
+  @Test
+  public void destroysCorrectContainer() throws Exception {
+    RemoteEnvironment created = factory.createEnvironment(ENVIRONMENT);
+
+    created.close();
+
+    verify(processManager).stopProcess(ID_GENERATOR.currentId);
+  }
+
+  @Test
+  public void createsMultipleEnvironments() throws Exception {
+    Environment fooEnv = Environment.newBuilder().setUrl("foo").build();
+    RemoteEnvironment fooCreated = factory.createEnvironment(fooEnv);
+    assertThat(fooCreated.getEnvironment(), is(equalTo(fooEnv)));
+
+    Environment barEnv = Environment.newBuilder().setUrl("bar").build();
+    RemoteEnvironment barCreated = factory.createEnvironment(barEnv);
+    assertThat(barCreated.getEnvironment(), is(equalTo(barEnv)));
+  }
+
+  /** An id generator which remembers the most recently generated id for later verification. */
+  private static class InspectibleIdGenerator implements IdGenerator {
+
+    private IdGenerator generator = IdGenerators.incrementingLongs();
+    String currentId;
+
+    @Override
+    public String getId() {
+      String nextId = generator.getId();
+      currentId = nextId;
+      return nextId;
+    }
+  }
+}
diff --git a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/environment/ProcessEnvironmentTest.java b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/environment/ProcessEnvironmentTest.java
new file mode 100644
index 0000000..26a8328
--- /dev/null
+++ b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/environment/ProcessEnvironmentTest.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.fnexecution.environment;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
+import org.apache.beam.model.pipeline.v1.RunnerApi.Environment;
+import org.apache.beam.runners.fnexecution.control.InstructionRequestHandler;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Unit tests for {@link ProcessEnvironment}. */
+@RunWith(JUnit4.class)
+public class ProcessEnvironmentTest {
+
+  /** Closing the environment must also close the instruction request handler it was given. */
+  @Test
+  public void closeClosesInstructionRequestHandler() throws Exception {
+    ProcessManager processManager = mock(ProcessManager.class);
+    InstructionRequestHandler requestHandler = mock(InstructionRequestHandler.class);
+    Environment environment = Environment.getDefaultInstance();
+
+    RemoteEnvironment remoteEnvironment =
+        ProcessEnvironment.create(processManager, environment, "1", requestHandler);
+    remoteEnvironment.close();
+
+    verify(requestHandler).close();
+  }
+}
diff --git a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/environment/ProcessManagerTest.java b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/environment/ProcessManagerTest.java
new file mode 100644
index 0000000..cdcb53e
--- /dev/null
+++ b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/environment/ProcessManagerTest.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.fnexecution.environment;
+
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link ProcessManager}.
+ *
+ * <p>These tests launch real {@code bash} processes, so {@code bash} must be available to the JVM
+ * running them.
+ */
+@RunWith(JUnit4.class)
+public class ProcessManagerTest {
+
+  /** Processes can be started and stopped one after another, and a stopped id can be reused. */
+  @Test
+  public void testRunSimpleCommand() throws IOException {
+    ProcessManager processManager = ProcessManager.create();
+    processManager.startProcess("1", "bash", Collections.emptyList());
+    processManager.stopProcess("1");
+    processManager.startProcess("2", "bash", Arrays.asList("-c", "ls"));
+    processManager.stopProcess("2");
+    // bash -c takes the whole command as ONE string; any further arguments only become $0, $1, ...
+    // so the flags have to be part of the command string to reach ls.
+    processManager.startProcess("1", "bash", Arrays.asList("-c", "ls -l -a"));
+    processManager.stopProcess("1");
+  }
+
+  /** Starting an executable that does not exist must surface as an {@link IOException}. */
+  @Test
+  public void testRunInvalidExecutable() throws IOException {
+    ProcessManager processManager = ProcessManager.create();
+    try {
+      processManager.startProcess("1", "asfasfls", Collections.emptyList());
+      fail("Expected an IOException for a non-existent executable");
+    } catch (IOException e) {
+      assertThat(e.getMessage(), containsString("Cannot run program \"asfasfls\""));
+    }
+  }
+
+  /** Starting a second process under an id that is still in use must be rejected. */
+  @Test
+  public void testDuplicateId() throws IOException {
+    ProcessManager processManager = ProcessManager.create();
+    processManager.startProcess("1", "bash", Arrays.asList("-c", "ls"));
+    try {
+      processManager.startProcess("1", "bash", Arrays.asList("-c", "ls"));
+      fail("Expected an IllegalStateException for a duplicate process id");
+    } catch (IllegalStateException e) {
+      // this is what we want
+    } finally {
+      processManager.stopProcess("1");
+    }
+  }
+
+  /** {@code isAliveOrThrow} passes while the process runs and throws once it has been stopped. */
+  @Test
+  public void testLivenessCheck() throws IOException {
+    ProcessManager processManager = ProcessManager.create();
+    // The command must be a single string: with "-c", "sleep", "1000" bash would run a bare
+    // `sleep` (1000 only becomes $0), which exits right away and makes the first check racy.
+    ProcessManager.RunningProcess process =
+        processManager.startProcess("1", "bash", Arrays.asList("-c", "sleep 1000"));
+    try {
+      process.isAliveOrThrow();
+    } finally {
+      // Stop the process even if the liveness check fails, so no sleeping process is left behind.
+      processManager.stopProcess("1");
+    }
+    try {
+      process.isAliveOrThrow();
+      fail("Expected an IllegalStateException for a stopped process");
+    } catch (IllegalStateException e) {
+      // this is what we want
+    }
+  }
+
+  /**
+   * Environment variables passed to {@code startProcess} must be visible to the started process.
+   *
+   * <p>The command expands {@code $PARAM} to {@code -h}; the test expects {@code sleep} to reject
+   * that option and exit with status 1.
+   */
+  @Test
+  public void testEnvironmentVariables() throws IOException, InterruptedException {
+    ProcessManager processManager = ProcessManager.create();
+    ProcessManager.RunningProcess process =
+        processManager.startProcess(
+            "1",
+            "bash",
+            Arrays.asList("-c", "sleep $PARAM"),
+            Collections.singletonMap("PARAM", "-h"));
+    try {
+      // Give the process up to roughly one second (10 x 100 ms) to exit on its own.
+      for (int i = 0; i < 10 && process.getUnderlyingProcess().isAlive(); i++) {
+        Thread.sleep(100);
+      }
+      assertThat(process.getUnderlyingProcess().exitValue(), is(1));
+    } finally {
+      // Always deregister the process, also when the assertion above fails.
+      processManager.stopProcess("1");
+    }
+  }
+}