You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by th...@apache.org on 2018/08/30 18:02:50 UTC

[beam] branch master updated: [BEAM-5187] Add a ProcessJobBundleFactory for process-based execution (#6287)

This is an automated email from the ASF dual-hosted git repository.

thw pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 5720c1d  [BEAM-5187] Add a ProcessJobBundleFactory for process-based execution (#6287)
5720c1d is described below

commit 5720c1d22771a65ad5d7be6a06ad8aa0754fa64b
Author: Maximilian Michels <ma...@posteo.de>
AuthorDate: Thu Aug 30 20:02:45 2018 +0200

    [BEAM-5187] Add a ProcessJobBundleFactory for process-based execution (#6287)
---
 .../control/DockerJobBundleFactory.java            | 266 ++-------------------
 ...undleFactory.java => JobBundleFactoryBase.java} | 137 +++--------
 .../control/ProcessJobBundleFactory.java           |  84 +++++++
 .../environment/ProcessEnvironment.java            |  77 ++++++
 .../environment/ProcessEnvironmentFactory.java     | 157 ++++++++++++
 .../fnexecution/environment/ProcessManager.java    | 225 +++++++++++++++++
 .../control/ProcessJobBundleFactoryTest.java       | 195 +++++++++++++++
 .../environment/ProcessEnvironmentFactoryTest.java | 127 ++++++++++
 .../environment/ProcessEnvironmentTest.java        |  44 ++++
 .../environment/ProcessManagerTest.java            | 103 ++++++++
 10 files changed, 1056 insertions(+), 359 deletions(-)

diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/DockerJobBundleFactory.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/DockerJobBundleFactory.java
index 1e7f48b..3178a2e 100644
--- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/DockerJobBundleFactory.java
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/DockerJobBundleFactory.java
@@ -18,46 +18,19 @@
 package org.apache.beam.runners.fnexecution.control;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
-import com.google.common.cache.RemovalNotification;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Iterables;
 import com.google.common.net.HostAndPort;
-import java.io.IOException;
-import java.util.Map;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import javax.annotation.concurrent.ThreadSafe;
-import org.apache.beam.model.fnexecution.v1.BeamFnApi.Target;
-import org.apache.beam.model.pipeline.v1.RunnerApi.Environment;
-import org.apache.beam.runners.core.construction.graph.ExecutableStage;
-import org.apache.beam.runners.fnexecution.GrpcContextHeaderAccessorProvider;
 import org.apache.beam.runners.fnexecution.GrpcFnServer;
 import org.apache.beam.runners.fnexecution.ServerFactory;
 import org.apache.beam.runners.fnexecution.artifact.ArtifactRetrievalService;
-import org.apache.beam.runners.fnexecution.artifact.BeamFileSystemArtifactRetrievalService;
-import org.apache.beam.runners.fnexecution.control.ProcessBundleDescriptors.ExecutableProcessBundleDescriptor;
-import org.apache.beam.runners.fnexecution.control.SdkHarnessClient.BundleProcessor;
-import org.apache.beam.runners.fnexecution.data.GrpcDataService;
 import org.apache.beam.runners.fnexecution.environment.DockerEnvironmentFactory;
 import org.apache.beam.runners.fnexecution.environment.EnvironmentFactory;
-import org.apache.beam.runners.fnexecution.environment.RemoteEnvironment;
 import org.apache.beam.runners.fnexecution.logging.GrpcLoggingService;
-import org.apache.beam.runners.fnexecution.logging.Slf4jLogWriter;
 import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
 import org.apache.beam.runners.fnexecution.provisioning.StaticGrpcProvisionService;
-import org.apache.beam.runners.fnexecution.state.GrpcStateService;
-import org.apache.beam.runners.fnexecution.state.StateRequestHandler;
-import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.fn.IdGenerator;
-import org.apache.beam.sdk.fn.IdGenerators;
-import org.apache.beam.sdk.fn.data.FnDataReceiver;
-import org.apache.beam.sdk.fn.stream.OutboundObserverFactory;
-import org.apache.beam.sdk.util.WindowedValue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -67,7 +40,7 @@ import org.slf4j.LoggerFactory;
  * thread-safe. Instead, a new stage factory should be created for each client.
  */
 @ThreadSafe
-public class DockerJobBundleFactory implements JobBundleFactory {
+public class DockerJobBundleFactory extends JobBundleFactoryBase {
   private static final Logger LOG = LoggerFactory.getLogger(DockerJobBundleFactory.class);
 
   // Port offset for MacOS since we don't have host networking and need to use published ports
@@ -77,7 +50,7 @@ public class DockerJobBundleFactory implements JobBundleFactory {
 
   /** Factory that creates {@link JobBundleFactory} for the given {@link JobInfo}. */
   public interface JobBundleFactoryFactory {
-    DockerJobBundleFactory create(JobInfo jobInfo) throws Exception;
+    JobBundleFactory create(JobInfo jobInfo) throws Exception;
   }
   // TODO (BEAM-4819): a hacky way to override the factory for testing.
   // Should be replaced with mechanism that let's users configure their own factory
@@ -85,7 +58,7 @@ public class DockerJobBundleFactory implements JobBundleFactory {
       new AtomicReference(
           new JobBundleFactoryFactory() {
             @Override
-            public DockerJobBundleFactory create(JobInfo jobInfo) throws Exception {
+            public JobBundleFactory create(JobInfo jobInfo) throws Exception {
               return new DockerJobBundleFactory(jobInfo);
             }
           });
@@ -94,51 +67,12 @@ public class DockerJobBundleFactory implements JobBundleFactory {
   // or attempt to document the supported Docker version(s)?
   private static final String DOCKER_FOR_MAC_HOST = "host.docker.internal";
 
-  private final IdGenerator stageIdGenerator;
-  private final GrpcFnServer<FnApiControlClientPoolService> controlServer;
-  private final GrpcFnServer<GrpcLoggingService> loggingServer;
-  private final GrpcFnServer<ArtifactRetrievalService> retrievalServer;
-  private final GrpcFnServer<StaticGrpcProvisionService> provisioningServer;
-
-  private final LoadingCache<Environment, WrappedSdkHarnessClient> environmentCache;
-
-  public static DockerJobBundleFactory create(JobInfo jobInfo) throws Exception {
+  public static JobBundleFactory create(JobInfo jobInfo) throws Exception {
     return FACTORY.get().create(jobInfo);
   }
 
   protected DockerJobBundleFactory(JobInfo jobInfo) throws Exception {
-    ServerFactory serverFactory = getServerFactory();
-    IdGenerator stageIdGenerator = IdGenerators.incrementingLongs();
-    ControlClientPool clientPool = MapControlClientPool.create();
-
-    GrpcFnServer<FnApiControlClientPoolService> controlServer =
-        GrpcFnServer.allocatePortAndCreateFor(
-            FnApiControlClientPoolService.offeringClientsToPool(
-                clientPool.getSink(), GrpcContextHeaderAccessorProvider.getHeaderAccessor()),
-            serverFactory);
-    GrpcFnServer<GrpcLoggingService> loggingServer =
-        GrpcFnServer.allocatePortAndCreateFor(
-            GrpcLoggingService.forWriter(Slf4jLogWriter.getDefault()), serverFactory);
-    GrpcFnServer<ArtifactRetrievalService> retrievalServer =
-        GrpcFnServer.allocatePortAndCreateFor(
-            BeamFileSystemArtifactRetrievalService.create(), serverFactory);
-    GrpcFnServer<StaticGrpcProvisionService> provisioningServer =
-        GrpcFnServer.allocatePortAndCreateFor(
-            StaticGrpcProvisionService.create(jobInfo.toProvisionInfo()), serverFactory);
-    EnvironmentFactory environmentFactory =
-        getEnvironmentFactory(
-            controlServer,
-            loggingServer,
-            retrievalServer,
-            provisioningServer,
-            clientPool.getSource(),
-            IdGenerators.incrementingLongs());
-    this.stageIdGenerator = stageIdGenerator;
-    this.controlServer = controlServer;
-    this.loggingServer = loggingServer;
-    this.retrievalServer = retrievalServer;
-    this.provisioningServer = provisioningServer;
-    this.environmentCache = createEnvironmentCache(environmentFactory, serverFactory);
+    super(jobInfo);
   }
 
   @VisibleForTesting
@@ -150,54 +84,14 @@ public class DockerJobBundleFactory implements JobBundleFactory {
       GrpcFnServer<GrpcLoggingService> loggingServer,
       GrpcFnServer<ArtifactRetrievalService> retrievalServer,
       GrpcFnServer<StaticGrpcProvisionService> provisioningServer) {
-    this.stageIdGenerator = stageIdGenerator;
-    this.controlServer = controlServer;
-    this.loggingServer = loggingServer;
-    this.retrievalServer = retrievalServer;
-    this.provisioningServer = provisioningServer;
-    this.environmentCache = createEnvironmentCache(environmentFactory, serverFactory);
-  }
-
-  private LoadingCache<Environment, WrappedSdkHarnessClient> createEnvironmentCache(
-      EnvironmentFactory environmentFactory, ServerFactory serverFactory) {
-    return CacheBuilder.newBuilder()
-        .removalListener(
-            ((RemovalNotification<Environment, WrappedSdkHarnessClient> notification) -> {
-              LOG.debug("Cleaning up for environment {}", notification.getKey().getUrl());
-              try {
-                notification.getValue().close();
-              } catch (Exception e) {
-                LOG.warn(
-                    String.format("Error cleaning up environment %s", notification.getKey()), e);
-              }
-            }))
-        .build(
-            new CacheLoader<Environment, WrappedSdkHarnessClient>() {
-              @Override
-              public WrappedSdkHarnessClient load(Environment environment) throws Exception {
-                RemoteEnvironment remoteEnvironment =
-                    environmentFactory.createEnvironment(environment);
-                return WrappedSdkHarnessClient.wrapping(remoteEnvironment, serverFactory);
-              }
-            });
-  }
-
-  @Override
-  public StageBundleFactory forStage(ExecutableStage executableStage) {
-    WrappedSdkHarnessClient wrappedClient =
-        environmentCache.getUnchecked(executableStage.getEnvironment());
-    ExecutableProcessBundleDescriptor processBundleDescriptor;
-    try {
-      processBundleDescriptor =
-          ProcessBundleDescriptors.fromExecutableStage(
-              stageIdGenerator.getId(),
-              executableStage,
-              wrappedClient.getDataServer().getApiServiceDescriptor(),
-              wrappedClient.getStateServer().getApiServiceDescriptor());
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-    return SimpleStageBundleFactory.create(wrappedClient, processBundleDescriptor);
+    super(
+        environmentFactory,
+        serverFactory,
+        stageIdGenerator,
+        controlServer,
+        loggingServer,
+        retrievalServer,
+        provisioningServer);
   }
 
   @Override
@@ -213,6 +107,7 @@ public class DockerJobBundleFactory implements JobBundleFactory {
     provisioningServer.close();
   }
 
+  @Override
   protected ServerFactory getServerFactory() {
     switch (getPlatform()) {
       case LINUX:
@@ -253,138 +148,6 @@ public class DockerJobBundleFactory implements JobBundleFactory {
     return Platform.OTHER;
   }
 
-  private static class SimpleStageBundleFactory implements StageBundleFactory {
-
-    private final BundleProcessor processor;
-    private final ExecutableProcessBundleDescriptor processBundleDescriptor;
-
-    // Store the wrapped client in order to keep a live reference into the cache.
-    private WrappedSdkHarnessClient wrappedClient;
-
-    static SimpleStageBundleFactory create(
-        WrappedSdkHarnessClient wrappedClient,
-        ExecutableProcessBundleDescriptor processBundleDescriptor) {
-      @SuppressWarnings("unchecked")
-      BundleProcessor processor =
-          wrappedClient
-              .getClient()
-              .getProcessor(
-                  processBundleDescriptor.getProcessBundleDescriptor(),
-                  processBundleDescriptor.getRemoteInputDestinations(),
-                  wrappedClient.getStateServer().getService());
-      return new SimpleStageBundleFactory(processBundleDescriptor, processor, wrappedClient);
-    }
-
-    SimpleStageBundleFactory(
-        ExecutableProcessBundleDescriptor processBundleDescriptor,
-        BundleProcessor processor,
-        WrappedSdkHarnessClient wrappedClient) {
-      this.processBundleDescriptor = processBundleDescriptor;
-      this.processor = processor;
-      this.wrappedClient = wrappedClient;
-    }
-
-    @Override
-    public RemoteBundle getBundle(
-        OutputReceiverFactory outputReceiverFactory,
-        StateRequestHandler stateRequestHandler,
-        BundleProgressHandler progressHandler)
-        throws Exception {
-      // TODO: Consider having BundleProcessor#newBundle take in an OutputReceiverFactory rather
-      // than constructing the receiver map here. Every bundle factory will need this.
-      ImmutableMap.Builder<Target, RemoteOutputReceiver<?>> outputReceivers =
-          ImmutableMap.builder();
-      for (Map.Entry<Target, Coder<WindowedValue<?>>> targetCoder :
-          processBundleDescriptor.getOutputTargetCoders().entrySet()) {
-        Target target = targetCoder.getKey();
-        Coder<WindowedValue<?>> coder = targetCoder.getValue();
-        String bundleOutputPCollection =
-            Iterables.getOnlyElement(
-                processBundleDescriptor
-                    .getProcessBundleDescriptor()
-                    .getTransformsOrThrow(target.getPrimitiveTransformReference())
-                    .getInputsMap()
-                    .values());
-        FnDataReceiver<WindowedValue<?>> outputReceiver =
-            outputReceiverFactory.create(bundleOutputPCollection);
-        outputReceivers.put(target, RemoteOutputReceiver.of(coder, outputReceiver));
-      }
-      return processor.newBundle(outputReceivers.build(), stateRequestHandler, progressHandler);
-    }
-
-    @Override
-    public void close() throws Exception {
-      // Clear reference to encourage cache eviction. Values are weakly referenced.
-      wrappedClient = null;
-    }
-  }
-
-  /**
-   * Holder for an {@link SdkHarnessClient} along with its associated state and data servers. As of
-   * now, there is a 1:1 relationship between data services and harness clients. The servers are
-   * packaged here to tie server lifetimes to harness client lifetimes.
-   */
-  private static class WrappedSdkHarnessClient implements AutoCloseable {
-    private final RemoteEnvironment environment;
-    private final ExecutorService executor;
-    // TODO: How should data server lifetime be scoped? It is necessary here for now because
-    // SdkHarnessClient requires one at construction.
-    private final GrpcFnServer<GrpcDataService> dataServer;
-    private final GrpcFnServer<GrpcStateService> stateServer;
-    private final SdkHarnessClient client;
-
-    static WrappedSdkHarnessClient wrapping(
-        RemoteEnvironment environment, ServerFactory serverFactory) throws Exception {
-      ExecutorService executor = Executors.newCachedThreadPool();
-      GrpcFnServer<GrpcDataService> dataServer =
-          GrpcFnServer.allocatePortAndCreateFor(
-              GrpcDataService.create(executor, OutboundObserverFactory.serverDirect()),
-              serverFactory);
-      GrpcFnServer<GrpcStateService> stateServer =
-          GrpcFnServer.allocatePortAndCreateFor(GrpcStateService.create(), serverFactory);
-      SdkHarnessClient client =
-          SdkHarnessClient.usingFnApiClient(
-              environment.getInstructionRequestHandler(), dataServer.getService());
-      return new WrappedSdkHarnessClient(environment, executor, dataServer, stateServer, client);
-    }
-
-    private WrappedSdkHarnessClient(
-        RemoteEnvironment environment,
-        ExecutorService executor,
-        GrpcFnServer<GrpcDataService> dataServer,
-        GrpcFnServer<GrpcStateService> stateServer,
-        SdkHarnessClient client) {
-      this.executor = executor;
-      this.environment = environment;
-      this.dataServer = dataServer;
-      this.stateServer = stateServer;
-      this.client = client;
-    }
-
-    SdkHarnessClient getClient() {
-      return client;
-    }
-
-    GrpcFnServer<GrpcStateService> getStateServer() {
-      return stateServer;
-    }
-
-    GrpcFnServer<GrpcDataService> getDataServer() {
-      return dataServer;
-    }
-
-    @Override
-    public void close() throws Exception {
-      try (AutoCloseable stateServerCloser = stateServer;
-          AutoCloseable dataServerCloser = dataServer;
-          AutoCloseable envCloser = environment;
-          AutoCloseable executorCloser = executor::shutdown) {
-        // Wrap resources in try-with-resources to ensure all are cleaned up.
-      }
-      // TODO: Wait for executor shutdown?
-    }
-  }
-
   private enum Platform {
     MAC,
     LINUX,
@@ -392,6 +155,7 @@ public class DockerJobBundleFactory implements JobBundleFactory {
   }
 
   /** Create {@link EnvironmentFactory} for the given services. */
+  @Override
   protected EnvironmentFactory getEnvironmentFactory(
       GrpcFnServer<FnApiControlClientPoolService> controlServiceServer,
       GrpcFnServer<GrpcLoggingService> loggingServiceServer,
diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/DockerJobBundleFactory.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/JobBundleFactoryBase.java
similarity index 72%
copy from runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/DockerJobBundleFactory.java
copy to runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/JobBundleFactoryBase.java
index 1e7f48b..3c1bd6e 100644
--- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/DockerJobBundleFactory.java
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/JobBundleFactoryBase.java
@@ -24,13 +24,11 @@ import com.google.common.cache.LoadingCache;
 import com.google.common.cache.RemovalNotification;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
-import com.google.common.net.HostAndPort;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import java.io.IOException;
 import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
 import javax.annotation.concurrent.ThreadSafe;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.Target;
 import org.apache.beam.model.pipeline.v1.RunnerApi.Environment;
@@ -43,7 +41,6 @@ import org.apache.beam.runners.fnexecution.artifact.BeamFileSystemArtifactRetrie
 import org.apache.beam.runners.fnexecution.control.ProcessBundleDescriptors.ExecutableProcessBundleDescriptor;
 import org.apache.beam.runners.fnexecution.control.SdkHarnessClient.BundleProcessor;
 import org.apache.beam.runners.fnexecution.data.GrpcDataService;
-import org.apache.beam.runners.fnexecution.environment.DockerEnvironmentFactory;
 import org.apache.beam.runners.fnexecution.environment.EnvironmentFactory;
 import org.apache.beam.runners.fnexecution.environment.RemoteEnvironment;
 import org.apache.beam.runners.fnexecution.logging.GrpcLoggingService;
@@ -62,51 +59,24 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * A {@link JobBundleFactory} that uses a {@link DockerEnvironmentFactory} for environment
- * management. Note that returned {@link StageBundleFactory stage bundle factories} are not
- * thread-safe. Instead, a new stage factory should be created for each client.
+ * A base for a {@link JobBundleFactory} for which the implementation can specify a custom {@link
+ * EnvironmentFactory} for environment management. Note that returned {@link StageBundleFactory
+ * stage bundle factories} are not thread-safe. Instead, a new stage factory should be created for
+ * each client.
  */
 @ThreadSafe
-public class DockerJobBundleFactory implements JobBundleFactory {
-  private static final Logger LOG = LoggerFactory.getLogger(DockerJobBundleFactory.class);
+public abstract class JobBundleFactoryBase implements JobBundleFactory {
+  private static final Logger LOG = LoggerFactory.getLogger(JobBundleFactoryBase.class);
 
-  // Port offset for MacOS since we don't have host networking and need to use published ports
-  private static final int MAC_PORT_START = 8100;
-  private static final int MAC_PORT_END = 8200;
-  private static final AtomicInteger MAC_PORT = new AtomicInteger(MAC_PORT_START);
+  final IdGenerator stageIdGenerator;
+  final GrpcFnServer<FnApiControlClientPoolService> controlServer;
+  final GrpcFnServer<GrpcLoggingService> loggingServer;
+  final GrpcFnServer<ArtifactRetrievalService> retrievalServer;
+  final GrpcFnServer<StaticGrpcProvisionService> provisioningServer;
 
-  /** Factory that creates {@link JobBundleFactory} for the given {@link JobInfo}. */
-  public interface JobBundleFactoryFactory {
-    DockerJobBundleFactory create(JobInfo jobInfo) throws Exception;
-  }
-  // TODO (BEAM-4819): a hacky way to override the factory for testing.
-  // Should be replaced with mechanism that let's users configure their own factory
-  public static final AtomicReference<JobBundleFactoryFactory> FACTORY =
-      new AtomicReference(
-          new JobBundleFactoryFactory() {
-            @Override
-            public DockerJobBundleFactory create(JobInfo jobInfo) throws Exception {
-              return new DockerJobBundleFactory(jobInfo);
-            }
-          });
-
-  // TODO: This host name seems to change with every other Docker release. Do we attempt to keep up
-  // or attempt to document the supported Docker version(s)?
-  private static final String DOCKER_FOR_MAC_HOST = "host.docker.internal";
-
-  private final IdGenerator stageIdGenerator;
-  private final GrpcFnServer<FnApiControlClientPoolService> controlServer;
-  private final GrpcFnServer<GrpcLoggingService> loggingServer;
-  private final GrpcFnServer<ArtifactRetrievalService> retrievalServer;
-  private final GrpcFnServer<StaticGrpcProvisionService> provisioningServer;
-
-  private final LoadingCache<Environment, WrappedSdkHarnessClient> environmentCache;
+  final LoadingCache<Environment, WrappedSdkHarnessClient> environmentCache;
 
-  public static DockerJobBundleFactory create(JobInfo jobInfo) throws Exception {
-    return FACTORY.get().create(jobInfo);
-  }
-
-  protected DockerJobBundleFactory(JobInfo jobInfo) throws Exception {
+  JobBundleFactoryBase(JobInfo jobInfo) throws Exception {
     ServerFactory serverFactory = getServerFactory();
     IdGenerator stageIdGenerator = IdGenerators.incrementingLongs();
     ControlClientPool clientPool = MapControlClientPool.create();
@@ -142,7 +112,7 @@ public class DockerJobBundleFactory implements JobBundleFactory {
   }
 
   @VisibleForTesting
-  DockerJobBundleFactory(
+  JobBundleFactoryBase(
       EnvironmentFactory environmentFactory,
       ServerFactory serverFactory,
       IdGenerator stageIdGenerator,
@@ -158,6 +128,15 @@ public class DockerJobBundleFactory implements JobBundleFactory {
     this.environmentCache = createEnvironmentCache(environmentFactory, serverFactory);
   }
 
+  /** Create {@link EnvironmentFactory} for the given services. */
+  abstract EnvironmentFactory getEnvironmentFactory(
+      GrpcFnServer<FnApiControlClientPoolService> controlServiceServer,
+      GrpcFnServer<GrpcLoggingService> loggingServiceServer,
+      GrpcFnServer<ArtifactRetrievalService> retrievalServiceServer,
+      GrpcFnServer<StaticGrpcProvisionService> provisioningServiceServer,
+      ControlClientPool.Source clientSource,
+      IdGenerator idGenerator);
+
   private LoadingCache<Environment, WrappedSdkHarnessClient> createEnvironmentCache(
       EnvironmentFactory environmentFactory, ServerFactory serverFactory) {
     return CacheBuilder.newBuilder()
@@ -214,52 +193,17 @@ public class DockerJobBundleFactory implements JobBundleFactory {
   }
 
   protected ServerFactory getServerFactory() {
-    switch (getPlatform()) {
-      case LINUX:
-        return ServerFactory.createDefault();
-      case MAC:
-        // NOTE: Deployment on Macs is intended for local development. As of 18.03, Docker-for-Mac
-        // does not implement host networking (--networking=host is effectively a no-op). Instead,
-        // we use a special DNS entry that points to the host:
-        // https://docs.docker.com/docker-for-mac/networking/#use-cases-and-workarounds
-        // The special hostname has historically changed between versions, so this is subject to
-        // breakages and will likely only support the latest version at any time.
-
-        // We need to use a fixed port range due to non-existing host networking in Docker-for-Mac.
-        // The port range needs to be published when bringing up the Docker container, see
-        // DockerEnvironmentFactory.
-
-        return ServerFactory.createWithUrlFactoryAndPortSupplier(
-            (host, port) -> HostAndPort.fromParts(DOCKER_FOR_MAC_HOST, port).toString(),
-            // We only use the published Docker ports 8100-8200 in a round-robin fashion
-            () -> MAC_PORT.getAndUpdate(val -> val == MAC_PORT_END ? MAC_PORT_START : val + 1));
-      default:
-        LOG.warn("Unknown Docker platform. Falling back to default server factory");
-        return ServerFactory.createDefault();
-    }
-  }
-
-  private static Platform getPlatform() {
-    String osName = System.getProperty("os.name").toLowerCase();
-    // TODO: Make this more robust?
-    // The DOCKER_MAC_CONTAINER environment variable is necessary to detect whether we run on
-    // a container on MacOs. MacOs internally uses a Linux VM which makes it indistinguishable from Linux.
-    // We still need to apply port mapping due to missing host networking.
-    if (osName.startsWith("mac") || "1".equals(System.getenv("DOCKER_MAC_CONTAINER"))) {
-      return Platform.MAC;
-    } else if (osName.startsWith("linux")) {
-      return Platform.LINUX;
-    }
-    return Platform.OTHER;
+    return ServerFactory.createDefault();
   }
 
-  private static class SimpleStageBundleFactory implements StageBundleFactory {
+  /** A simple stage bundle factory for remotely processing bundles. */
+  protected static class SimpleStageBundleFactory implements StageBundleFactory {
 
     private final BundleProcessor processor;
     private final ExecutableProcessBundleDescriptor processBundleDescriptor;
 
     // Store the wrapped client in order to keep a live reference into the cache.
-    private WrappedSdkHarnessClient wrappedClient;
+    @SuppressFBWarnings private WrappedSdkHarnessClient wrappedClient;
 
     static SimpleStageBundleFactory create(
         WrappedSdkHarnessClient wrappedClient,
@@ -324,7 +268,7 @@ public class DockerJobBundleFactory implements JobBundleFactory {
    * now, there is a 1:1 relationship between data services and harness clients. The servers are
    * packaged here to tie server lifetimes to harness client lifetimes.
    */
-  private static class WrappedSdkHarnessClient implements AutoCloseable {
+  protected static class WrappedSdkHarnessClient implements AutoCloseable {
     private final RemoteEnvironment environment;
     private final ExecutorService executor;
     // TODO: How should data server lifetime be scoped? It is necessary here for now because
@@ -384,27 +328,4 @@ public class DockerJobBundleFactory implements JobBundleFactory {
       // TODO: Wait for executor shutdown?
     }
   }
-
-  private enum Platform {
-    MAC,
-    LINUX,
-    OTHER,
-  }
-
-  /** Create {@link EnvironmentFactory} for the given services. */
-  protected EnvironmentFactory getEnvironmentFactory(
-      GrpcFnServer<FnApiControlClientPoolService> controlServiceServer,
-      GrpcFnServer<GrpcLoggingService> loggingServiceServer,
-      GrpcFnServer<ArtifactRetrievalService> retrievalServiceServer,
-      GrpcFnServer<StaticGrpcProvisionService> provisioningServiceServer,
-      ControlClientPool.Source clientSource,
-      IdGenerator idGenerator) {
-    return DockerEnvironmentFactory.forServices(
-        controlServiceServer,
-        loggingServiceServer,
-        retrievalServiceServer,
-        provisioningServiceServer,
-        clientSource,
-        idGenerator);
-  }
 }
diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/ProcessJobBundleFactory.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/ProcessJobBundleFactory.java
new file mode 100644
index 0000000..d165733
--- /dev/null
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/ProcessJobBundleFactory.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.fnexecution.control;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.beam.runners.fnexecution.GrpcFnServer;
+import org.apache.beam.runners.fnexecution.ServerFactory;
+import org.apache.beam.runners.fnexecution.artifact.ArtifactRetrievalService;
+import org.apache.beam.runners.fnexecution.environment.EnvironmentFactory;
+import org.apache.beam.runners.fnexecution.environment.ProcessEnvironmentFactory;
+import org.apache.beam.runners.fnexecution.logging.GrpcLoggingService;
+import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
+import org.apache.beam.runners.fnexecution.provisioning.StaticGrpcProvisionService;
+import org.apache.beam.sdk.fn.IdGenerator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link JobBundleFactoryBase} which uses a {@link ProcessEnvironmentFactory} to run the SDK
+ * harness in an external process.
+ */
+public class ProcessJobBundleFactory extends JobBundleFactoryBase {
+  private static final Logger LOG = LoggerFactory.getLogger(ProcessJobBundleFactory.class);
+
+  public static ProcessJobBundleFactory create(JobInfo jobInfo) throws Exception {
+    return new ProcessJobBundleFactory(jobInfo);
+  }
+
+  protected ProcessJobBundleFactory(JobInfo jobInfo) throws Exception {
+    super(jobInfo);
+  }
+
+  @VisibleForTesting
+  ProcessJobBundleFactory(
+      ProcessEnvironmentFactory envFactory,
+      ServerFactory serverFactory,
+      IdGenerator stageIdGenerator,
+      GrpcFnServer<FnApiControlClientPoolService> controlServer,
+      GrpcFnServer<GrpcLoggingService> loggingServer,
+      GrpcFnServer<ArtifactRetrievalService> retrievalServer,
+      GrpcFnServer<StaticGrpcProvisionService> provisioningServer) {
+    super(
+        envFactory,
+        serverFactory,
+        stageIdGenerator,
+        controlServer,
+        loggingServer,
+        retrievalServer,
+        provisioningServer);
+  }
+
+  @Override
+  protected EnvironmentFactory getEnvironmentFactory(
+      GrpcFnServer<FnApiControlClientPoolService> controlServiceServer,
+      GrpcFnServer<GrpcLoggingService> loggingServiceServer,
+      GrpcFnServer<ArtifactRetrievalService> retrievalServiceServer,
+      GrpcFnServer<StaticGrpcProvisionService> provisioningServiceServer,
+      ControlClientPool.Source clientSource,
+      IdGenerator idGenerator) {
+    return ProcessEnvironmentFactory.create(
+        controlServiceServer,
+        loggingServiceServer,
+        retrievalServiceServer,
+        provisioningServiceServer,
+        clientSource,
+        idGenerator);
+  }
+}
diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/ProcessEnvironment.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/ProcessEnvironment.java
new file mode 100644
index 0000000..f6c91ee
--- /dev/null
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/ProcessEnvironment.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.fnexecution.environment;
+
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.fnexecution.control.InstructionRequestHandler;
+
+/**
+ * Environment for process-based execution. The environment is responsible for stopping the process.
+ */
+public class ProcessEnvironment implements RemoteEnvironment {
+
+  private final ProcessManager processManager;
+  private final RunnerApi.Environment environment;
+  private final String workerId;
+  private final InstructionRequestHandler instructionHandler;
+  private final Object lock = new Object();
+
+  private boolean isClosed;
+
+  public static RemoteEnvironment create(
+      ProcessManager processManager,
+      RunnerApi.Environment environment,
+      String workerId,
+      InstructionRequestHandler instructionHandler) {
+    return new ProcessEnvironment(processManager, environment, workerId, instructionHandler);
+  }
+
+  private ProcessEnvironment(
+      ProcessManager processManager,
+      RunnerApi.Environment environment,
+      String workerId,
+      InstructionRequestHandler instructionHandler) {
+
+    this.processManager = processManager;
+    this.environment = environment;
+    this.workerId = workerId;
+    this.instructionHandler = instructionHandler;
+  }
+
+  @Override
+  public RunnerApi.Environment getEnvironment() {
+    return environment;
+  }
+
+  @Override
+  public InstructionRequestHandler getInstructionRequestHandler() {
+    return instructionHandler;
+  }
+
+  @Override
+  public void close() throws Exception {
+    synchronized (lock) {
+      if (!isClosed) {
+        instructionHandler.close();
+        processManager.stopProcess(workerId);
+        isClosed = true;
+      }
+    }
+  }
+}
diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/ProcessEnvironmentFactory.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/ProcessEnvironmentFactory.java
new file mode 100644
index 0000000..7b89ecc
--- /dev/null
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/ProcessEnvironmentFactory.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.fnexecution.environment;
+
+import com.google.common.collect.ImmutableList;
+import java.time.Duration;
+import java.util.List;
+import java.util.concurrent.TimeoutException;
+import org.apache.beam.model.pipeline.v1.RunnerApi.Environment;
+import org.apache.beam.runners.fnexecution.GrpcFnServer;
+import org.apache.beam.runners.fnexecution.artifact.ArtifactRetrievalService;
+import org.apache.beam.runners.fnexecution.control.ControlClientPool;
+import org.apache.beam.runners.fnexecution.control.FnApiControlClientPoolService;
+import org.apache.beam.runners.fnexecution.control.InstructionRequestHandler;
+import org.apache.beam.runners.fnexecution.logging.GrpcLoggingService;
+import org.apache.beam.runners.fnexecution.provisioning.StaticGrpcProvisionService;
+import org.apache.beam.sdk.fn.IdGenerator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An {@link EnvironmentFactory} which forks processes based on the given URL in the Environment.
+ * The returned {@link ProcessEnvironment} has to make sure to stop the processes.
+ */
+public class ProcessEnvironmentFactory implements EnvironmentFactory {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ProcessEnvironmentFactory.class);
+
+  public static ProcessEnvironmentFactory create(
+      GrpcFnServer<FnApiControlClientPoolService> controlServiceServer,
+      GrpcFnServer<GrpcLoggingService> loggingServiceServer,
+      GrpcFnServer<ArtifactRetrievalService> retrievalServiceServer,
+      GrpcFnServer<StaticGrpcProvisionService> provisioningServiceServer,
+      ControlClientPool.Source clientSource,
+      IdGenerator idGenerator) {
+    return create(
+        ProcessManager.create(),
+        controlServiceServer,
+        loggingServiceServer,
+        retrievalServiceServer,
+        provisioningServiceServer,
+        clientSource,
+        idGenerator);
+  }
+
+  public static ProcessEnvironmentFactory create(
+      ProcessManager processManager,
+      GrpcFnServer<FnApiControlClientPoolService> controlServiceServer,
+      GrpcFnServer<GrpcLoggingService> loggingServiceServer,
+      GrpcFnServer<ArtifactRetrievalService> retrievalServiceServer,
+      GrpcFnServer<StaticGrpcProvisionService> provisioningServiceServer,
+      ControlClientPool.Source clientSource,
+      IdGenerator idGenerator) {
+    return new ProcessEnvironmentFactory(
+        processManager,
+        controlServiceServer,
+        loggingServiceServer,
+        retrievalServiceServer,
+        provisioningServiceServer,
+        idGenerator,
+        clientSource);
+  }
+
+  private final ProcessManager processManager;
+  private final GrpcFnServer<FnApiControlClientPoolService> controlServiceServer;
+  private final GrpcFnServer<GrpcLoggingService> loggingServiceServer;
+  private final GrpcFnServer<ArtifactRetrievalService> retrievalServiceServer;
+  private final GrpcFnServer<StaticGrpcProvisionService> provisioningServiceServer;
+  private final IdGenerator idGenerator;
+  private final ControlClientPool.Source clientSource;
+
+  private ProcessEnvironmentFactory(
+      ProcessManager processManager,
+      GrpcFnServer<FnApiControlClientPoolService> controlServiceServer,
+      GrpcFnServer<GrpcLoggingService> loggingServiceServer,
+      GrpcFnServer<ArtifactRetrievalService> retrievalServiceServer,
+      GrpcFnServer<StaticGrpcProvisionService> provisioningServiceServer,
+      IdGenerator idGenerator,
+      ControlClientPool.Source clientSource) {
+    this.processManager = processManager;
+    this.controlServiceServer = controlServiceServer;
+    this.loggingServiceServer = loggingServiceServer;
+    this.retrievalServiceServer = retrievalServiceServer;
+    this.provisioningServiceServer = provisioningServiceServer;
+    this.idGenerator = idGenerator;
+    this.clientSource = clientSource;
+  }
+
+  /** Creates a new, active {@link RemoteEnvironment} backed by a forked process. */
+  @Override
+  public RemoteEnvironment createEnvironment(Environment environment) throws Exception {
+    String workerId = idGenerator.getId();
+
+    // TODO The Environment Protobuf message needs to be changed for process environment
+    String executable = environment.getUrl();
+    String loggingEndpoint = loggingServiceServer.getApiServiceDescriptor().getUrl();
+    String artifactEndpoint = retrievalServiceServer.getApiServiceDescriptor().getUrl();
+    String provisionEndpoint = provisioningServiceServer.getApiServiceDescriptor().getUrl();
+    String controlEndpoint = controlServiceServer.getApiServiceDescriptor().getUrl();
+
+    List<String> args =
+        ImmutableList.of(
+            String.format("--id=%s", workerId),
+            String.format("--logging_endpoint=%s", loggingEndpoint),
+            String.format("--artifact_endpoint=%s", artifactEndpoint),
+            String.format("--provision_endpoint=%s", provisionEndpoint),
+            String.format("--control_endpoint=%s", controlEndpoint));
+
+    LOG.debug("Creating Process for worker ID {}", workerId);
+    // Wrap the blocking call to clientSource.get in case an exception is thrown.
+    InstructionRequestHandler instructionHandler = null;
+    try {
+      ProcessManager.RunningProcess process =
+          processManager.startProcess(workerId, executable, args);
+      // Wait on a client from the gRPC server.
+      while (instructionHandler == null) {
+        try {
+          // If the process is not alive anymore, we abort.
+          process.isAliveOrThrow();
+          instructionHandler = clientSource.take(workerId, Duration.ofMinutes(2));
+        } catch (TimeoutException timeoutEx) {
+          LOG.info(
+              "Still waiting for startup of environment '{}' for worker id {}",
+              environment.getUrl(),
+              workerId);
+        } catch (InterruptedException interruptEx) {
+          Thread.currentThread().interrupt();
+          throw new RuntimeException(interruptEx);
+        }
+      }
+    } catch (Exception e) {
+      try {
+        processManager.stopProcess(workerId);
+      } catch (Exception processKillException) {
+        e.addSuppressed(processKillException);
+      }
+      throw e;
+    }
+
+    return ProcessEnvironment.create(processManager, environment, workerId, instructionHandler);
+  }
+}
diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/ProcessManager.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/ProcessManager.java
new file mode 100644
index 0000000..8f224fd
--- /dev/null
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/ProcessManager.java
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.fnexecution.environment;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.concurrent.ThreadSafe;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** A simple process manager which forks processes and kills them if necessary. */
+@ThreadSafe
+public class ProcessManager {
+  private static final Logger LOG = LoggerFactory.getLogger(ProcessManager.class);
+
+  /** For debugging purposes, we inherit I/O of processes. */
+  private static final boolean INHERIT_IO = LOG.isDebugEnabled();
+
+  /** A list of all managers to ensure all processes shutdown on JVM exit . */
+  private static final List<ProcessManager> ALL_PROCESS_MANAGERS = new ArrayList<>();
+
+  static {
+    // Install a shutdown hook to ensure processes are stopped/killed.
+    Runtime.getRuntime().addShutdownHook(ShutdownHook.create());
+  }
+
+  private final Map<String, Process> processes;
+
+  public static ProcessManager create() {
+    synchronized (ALL_PROCESS_MANAGERS) {
+      ProcessManager processManager = new ProcessManager();
+      ALL_PROCESS_MANAGERS.add(processManager);
+      return processManager;
+    }
+  }
+
+  private ProcessManager() {
+    this.processes = Collections.synchronizedMap(new HashMap<>());
+  }
+
+  static class RunningProcess {
+    private Process process;
+
+    RunningProcess(Process process) {
+      this.process = process;
+    }
+
+    /** Checks if the underlying process is still running. */
+    void isAliveOrThrow() throws IllegalStateException {
+      if (!process.isAlive()) {
+        throw new IllegalStateException("Process died with exit code " + process.exitValue());
+      }
+    }
+
+    @VisibleForTesting
+    Process getUnderlyingProcess() {
+      return process;
+    }
+  }
+
+  /**
+   * Forks a process with the given command and arguments.
+   *
+   * @param id A unique id for the process
+   * @param command the name of the executable to run
+   * @param args arguments to provide to the executable
+   * @return A RunningProcess which can be checked for liveness
+   */
+  RunningProcess startProcess(String id, String command, List<String> args) throws IOException {
+    return startProcess(id, command, args, Collections.emptyMap());
+  }
+
+  /**
+   * Forks a process with the given command, arguments, and additional environment variables.
+   *
+   * @param id A unique id for the process
+   * @param command The name of the executable to run
+   * @param args Arguments to provide to the executable
+   * @param env Additional environment variables for the process to be forked
+   * @return A RunningProcess which can be checked for liveness
+   */
+  public RunningProcess startProcess(
+      String id, String command, List<String> args, Map<String, String> env) throws IOException {
+    checkNotNull(id, "Process id must not be null");
+    checkNotNull(command, "Command must not be null");
+    checkNotNull(args, "Process args must not be null");
+    checkNotNull(env, "Environment map must not be null");
+
+    ProcessBuilder pb =
+        new ProcessBuilder(ImmutableList.<String>builder().add(command).addAll(args).build());
+    pb.environment().putAll(env);
+
+    if (INHERIT_IO) {
+      LOG.debug(
+          "==> DEBUG enabled: Inheriting stdout/stderr of process (adjustable in ProcessManager)");
+      pb.inheritIO();
+    } else {
+      pb.redirectErrorStream(true);
+      // Pipe stdout and stderr to /dev/null to avoid blocking the process due to filled PIPE buffer
+      if (System.getProperty("os.name", "").startsWith("Windows")) {
+        pb.redirectOutput(new File("nul"));
+      } else {
+        pb.redirectOutput(new File("/dev/null"));
+      }
+    }
+
+    LOG.debug("Attempting to start process with command: {}", pb.command());
+    Process newProcess = pb.start();
+    Process oldProcess = processes.put(id, newProcess);
+    if (oldProcess != null) {
+      stopProcess(id, oldProcess);
+      stopProcess(id, newProcess);
+      throw new IllegalStateException("There was already a process running with id " + id);
+    }
+
+    return new RunningProcess(newProcess);
+  }
+
+  /** Stops a previously started process identified by its unique id. */
+  public void stopProcess(String id) {
+    checkNotNull(id, "Process id must not be null");
+    Process process = checkNotNull(processes.remove(id), "Process for id does not exist: " + id);
+    stopProcess(id, process);
+  }
+
+  private void stopProcess(String id, Process process) {
+    if (process.isAlive()) {
+      LOG.debug("Attempting to stop process with id {}", id);
+      // first try to kill gracefully
+      process.destroy();
+      long maxTimeToWait = 2000;
+      if (waitForProcessToDie(process, maxTimeToWait)) {
+        LOG.debug("Process for worker {} shut down gracefully.", id);
+      } else {
+        LOG.info("Process for worker {} still running. Killing.", id);
+        process.destroyForcibly();
+        if (waitForProcessToDie(process, maxTimeToWait)) {
+          LOG.debug("Process for worker {} killed.", id);
+        } else {
+          LOG.warn("Process for worker {} could not be killed.", id);
+        }
+      }
+    }
+  }
+
+  /** Returns true if the process exists within maxWaitTimeMillis. */
+  private static boolean waitForProcessToDie(Process process, long maxWaitTimeMillis) {
+    final long startTime = System.currentTimeMillis();
+    while (process.isAlive() && System.currentTimeMillis() - startTime < maxWaitTimeMillis) {
+      try {
+        Thread.sleep(100);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new RuntimeException("Interrupted while waiting on process", e);
+      }
+    }
+    return !process.isAlive();
+  }
+
+  private static class ShutdownHook extends Thread {
+
+    private static ShutdownHook create() {
+      return new ShutdownHook();
+    }
+
+    private ShutdownHook() {}
+
+    @Override
+    @SuppressFBWarnings("SWL_SLEEP_WITH_LOCK_HELD")
+    public void run() {
+      synchronized (ALL_PROCESS_MANAGERS) {
+        ALL_PROCESS_MANAGERS.forEach(ProcessManager::stopAllProcesses);
+        for (ProcessManager pm : ALL_PROCESS_MANAGERS) {
+          if (pm.processes.values().stream().anyMatch(Process::isAlive)) {
+            try {
+              // Graceful shutdown period
+              Thread.sleep(200);
+              break;
+            } catch (InterruptedException e) {
+              Thread.currentThread().interrupt();
+              throw new RuntimeException(e);
+            }
+          }
+        }
+        ALL_PROCESS_MANAGERS.forEach(ProcessManager::killAllProcesses);
+      }
+    }
+  }
+
+  /** Stop all remaining processes gracefully, i.e. upon JVM shutdown */
+  private void stopAllProcesses() {
+    processes.forEach((id, process) -> process.destroy());
+  }
+
+  /** Kill all remaining processes forcibly, i.e. upon JVM shutdown */
+  private void killAllProcesses() {
+    processes.forEach((id, process) -> process.destroyForcibly());
+  }
+}
diff --git a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/ProcessJobBundleFactoryTest.java b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/ProcessJobBundleFactoryTest.java
new file mode 100644
index 0000000..99f94dd
--- /dev/null
+++ b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/ProcessJobBundleFactoryTest.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.fnexecution.control;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.InstructionResponse;
+import org.apache.beam.model.pipeline.v1.RunnerApi.Coder;
+import org.apache.beam.model.pipeline.v1.RunnerApi.Components;
+import org.apache.beam.model.pipeline.v1.RunnerApi.Environment;
+import org.apache.beam.model.pipeline.v1.RunnerApi.ExecutableStagePayload;
+import org.apache.beam.model.pipeline.v1.RunnerApi.FunctionSpec;
+import org.apache.beam.model.pipeline.v1.RunnerApi.PCollection;
+import org.apache.beam.model.pipeline.v1.RunnerApi.SdkFunctionSpec;
+import org.apache.beam.model.pipeline.v1.RunnerApi.WindowingStrategy;
+import org.apache.beam.runners.core.construction.ModelCoders;
+import org.apache.beam.runners.core.construction.graph.ExecutableStage;
+import org.apache.beam.runners.fnexecution.GrpcFnServer;
+import org.apache.beam.runners.fnexecution.ServerFactory;
+import org.apache.beam.runners.fnexecution.artifact.ArtifactRetrievalService;
+import org.apache.beam.runners.fnexecution.environment.ProcessEnvironmentFactory;
+import org.apache.beam.runners.fnexecution.environment.RemoteEnvironment;
+import org.apache.beam.runners.fnexecution.logging.GrpcLoggingService;
+import org.apache.beam.runners.fnexecution.provisioning.StaticGrpcProvisionService;
+import org.apache.beam.sdk.fn.IdGenerator;
+import org.apache.beam.sdk.fn.IdGenerators;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+/** Tests for {@link ProcessEnvironmentFactory}. */
+@RunWith(JUnit4.class)
+public class ProcessJobBundleFactoryTest {
+
+  @Mock private ProcessEnvironmentFactory envFactory;
+  @Mock private RemoteEnvironment remoteEnvironment;
+  @Mock private InstructionRequestHandler instructionHandler;
+  @Mock private ServerFactory serverFactory;
+  @Mock GrpcFnServer<FnApiControlClientPoolService> controlServer;
+  @Mock GrpcFnServer<GrpcLoggingService> loggingServer;
+  @Mock GrpcFnServer<ArtifactRetrievalService> retrievalServer;
+  @Mock GrpcFnServer<StaticGrpcProvisionService> provisioningServer;
+
+  private final Environment environment = Environment.newBuilder().setUrl("env-url").build();
+  private final IdGenerator stageIdGenerator = IdGenerators.incrementingLongs();
+  private final InstructionResponse instructionResponse =
+      InstructionResponse.newBuilder().setInstructionId("instruction-id").build();
+
+  @Before
+  public void setUpMocks() throws Exception {
+    MockitoAnnotations.initMocks(this);
+    when(envFactory.createEnvironment(environment)).thenReturn(remoteEnvironment);
+    when(remoteEnvironment.getInstructionRequestHandler()).thenReturn(instructionHandler);
+    when(instructionHandler.handle(any()))
+        .thenReturn(CompletableFuture.completedFuture(instructionResponse));
+  }
+
+  @Test
+  public void createsCorrectEnvironment() throws Exception {
+    try (ProcessJobBundleFactory bundleFactory =
+        new ProcessJobBundleFactory(
+            envFactory,
+            serverFactory,
+            stageIdGenerator,
+            controlServer,
+            loggingServer,
+            retrievalServer,
+            provisioningServer)) {
+      bundleFactory.forStage(getExecutableStage(environment));
+      verify(envFactory).createEnvironment(environment);
+    }
+  }
+
+  @Test
+  public void closesEnvironmentOnCleanup() throws Exception {
+    ProcessJobBundleFactory bundleFactory =
+        new ProcessJobBundleFactory(
+            envFactory,
+            serverFactory,
+            stageIdGenerator,
+            controlServer,
+            loggingServer,
+            retrievalServer,
+            provisioningServer);
+    try (AutoCloseable unused = bundleFactory) {
+      bundleFactory.forStage(getExecutableStage(environment));
+    }
+    verify(remoteEnvironment).close();
+  }
+
+  @Test
+  public void cachesEnvironment() throws Exception {
+    try (ProcessJobBundleFactory bundleFactory =
+        new ProcessJobBundleFactory(
+            envFactory,
+            serverFactory,
+            stageIdGenerator,
+            controlServer,
+            loggingServer,
+            retrievalServer,
+            provisioningServer)) {
+      StageBundleFactory bf1 = bundleFactory.forStage(getExecutableStage(environment));
+      StageBundleFactory bf2 = bundleFactory.forStage(getExecutableStage(environment));
+      // NOTE: We hang on to stage bundle references to ensure their underlying environments are not
+      // garbage collected. For additional safety, we print the factories to ensure the referernces
+      // are not optimized away.
+      System.out.println("bundle factory 1:" + bf1);
+      System.out.println("bundle factory 1:" + bf2);
+      verify(envFactory).createEnvironment(environment);
+      verifyNoMoreInteractions(envFactory);
+    }
+  }
+
+  @Test
+  public void doesNotCacheDifferentEnvironments() throws Exception {
+    Environment envFoo = Environment.newBuilder().setUrl("foo-env-url").build();
+    RemoteEnvironment remoteEnvFoo = mock(RemoteEnvironment.class);
+    InstructionRequestHandler fooInstructionHandler = mock(InstructionRequestHandler.class);
+    when(envFactory.createEnvironment(envFoo)).thenReturn(remoteEnvFoo);
+    when(remoteEnvFoo.getInstructionRequestHandler()).thenReturn(fooInstructionHandler);
+    // Don't bother creating a distinct instruction response because we don't use it here.
+    when(fooInstructionHandler.handle(any()))
+        .thenReturn(CompletableFuture.completedFuture(instructionResponse));
+
+    try (ProcessJobBundleFactory bundleFactory =
+        new ProcessJobBundleFactory(
+            envFactory,
+            serverFactory,
+            stageIdGenerator,
+            controlServer,
+            loggingServer,
+            retrievalServer,
+            provisioningServer)) {
+      bundleFactory.forStage(getExecutableStage(environment));
+      bundleFactory.forStage(getExecutableStage(envFoo));
+      verify(envFactory).createEnvironment(environment);
+      verify(envFactory).createEnvironment(envFoo);
+      verifyNoMoreInteractions(envFactory);
+    }
+  }
+
+  private static ExecutableStage getExecutableStage(Environment environment) {
+    return ExecutableStage.fromPayload(
+        ExecutableStagePayload.newBuilder()
+            .setInput("input-pc")
+            .setEnvironment(environment)
+            .setComponents(
+                Components.newBuilder()
+                    .putPcollections(
+                        "input-pc",
+                        PCollection.newBuilder()
+                            .setWindowingStrategyId("windowing-strategy")
+                            .setCoderId("coder-id")
+                            .build())
+                    .putWindowingStrategies(
+                        "windowing-strategy",
+                        WindowingStrategy.newBuilder().setWindowCoderId("coder-id").build())
+                    .putCoders(
+                        "coder-id",
+                        Coder.newBuilder()
+                            .setSpec(
+                                SdkFunctionSpec.newBuilder()
+                                    .setSpec(
+                                        FunctionSpec.newBuilder()
+                                            .setUrn(ModelCoders.INTERVAL_WINDOW_CODER_URN)
+                                            .build())
+                                    .build())
+                            .build())
+                    .build())
+            .build());
+  }
+}
diff --git a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/environment/ProcessEnvironmentFactoryTest.java b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/environment/ProcessEnvironmentFactoryTest.java
new file mode 100644
index 0000000..b60dcda
--- /dev/null
+++ b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/environment/ProcessEnvironmentFactoryTest.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.fnexecution.environment;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Matchers.anyList;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import org.apache.beam.model.pipeline.v1.Endpoints.ApiServiceDescriptor;
+import org.apache.beam.model.pipeline.v1.RunnerApi.Environment;
+import org.apache.beam.runners.fnexecution.GrpcFnServer;
+import org.apache.beam.runners.fnexecution.artifact.ArtifactRetrievalService;
+import org.apache.beam.runners.fnexecution.control.FnApiControlClientPoolService;
+import org.apache.beam.runners.fnexecution.control.InstructionRequestHandler;
+import org.apache.beam.runners.fnexecution.logging.GrpcLoggingService;
+import org.apache.beam.runners.fnexecution.provisioning.StaticGrpcProvisionService;
+import org.apache.beam.sdk.fn.IdGenerator;
+import org.apache.beam.sdk.fn.IdGenerators;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+
+/** Tests for {@link ProcessEnvironmentFactory}. */
+@RunWith(JUnit4.class)
+public class ProcessEnvironmentFactoryTest {
+
+  private static final ApiServiceDescriptor SERVICE_DESCRIPTOR =
+      ApiServiceDescriptor.newBuilder().setUrl("service-url").build();
+  private static final String COMMAND = "my-command";
+  private static final Environment ENVIRONMENT = Environment.newBuilder().setUrl(COMMAND).build();
+
+  private static final InspectibleIdGenerator ID_GENERATOR = new InspectibleIdGenerator();
+
+  @Mock private ProcessManager processManager;
+
+  @Mock private GrpcFnServer<FnApiControlClientPoolService> controlServiceServer;
+  @Mock private GrpcFnServer<GrpcLoggingService> loggingServiceServer;
+  @Mock private GrpcFnServer<ArtifactRetrievalService> retrievalServiceServer;
+  @Mock private GrpcFnServer<StaticGrpcProvisionService> provisioningServiceServer;
+
+  @Mock private InstructionRequestHandler client;
+  private ProcessEnvironmentFactory factory;
+
+  @Before
+  public void initMocks() throws IOException {
+    MockitoAnnotations.initMocks(this);
+
+    when(processManager.startProcess(anyString(), anyString(), anyList()))
+        .thenReturn(Mockito.mock(ProcessManager.RunningProcess.class));
+    when(controlServiceServer.getApiServiceDescriptor()).thenReturn(SERVICE_DESCRIPTOR);
+    when(loggingServiceServer.getApiServiceDescriptor()).thenReturn(SERVICE_DESCRIPTOR);
+    when(retrievalServiceServer.getApiServiceDescriptor()).thenReturn(SERVICE_DESCRIPTOR);
+    when(provisioningServiceServer.getApiServiceDescriptor()).thenReturn(SERVICE_DESCRIPTOR);
+    factory =
+        ProcessEnvironmentFactory.create(
+            processManager,
+            controlServiceServer,
+            loggingServiceServer,
+            retrievalServiceServer,
+            provisioningServiceServer,
+            (workerId, timeout) -> client,
+            ID_GENERATOR);
+  }
+
+  @Test
+  public void createsCorrectEnvironment() throws Exception {
+    RemoteEnvironment handle = factory.createEnvironment(ENVIRONMENT);
+    assertThat(handle.getInstructionRequestHandler(), is(client));
+    assertThat(handle.getEnvironment(), equalTo(ENVIRONMENT));
+    Mockito.verify(processManager).startProcess(eq(ID_GENERATOR.currentId), anyString(), anyList());
+  }
+
+  @Test
+  public void destroysCorrectContainer() throws Exception {
+    RemoteEnvironment handle = factory.createEnvironment(ENVIRONMENT);
+    handle.close();
+    verify(processManager).stopProcess(ID_GENERATOR.currentId);
+  }
+
+  @Test
+  public void createsMultipleEnvironments() throws Exception {
+    Environment fooEnv = Environment.newBuilder().setUrl("foo").build();
+    RemoteEnvironment fooHandle = factory.createEnvironment(fooEnv);
+    assertThat(fooHandle.getEnvironment(), is(equalTo(fooEnv)));
+
+    Environment barEnv = Environment.newBuilder().setUrl("bar").build();
+    RemoteEnvironment barHandle = factory.createEnvironment(barEnv);
+    assertThat(barHandle.getEnvironment(), is(equalTo(barEnv)));
+  }
+
+  private static class InspectibleIdGenerator implements IdGenerator {
+
+    private IdGenerator generator = IdGenerators.incrementingLongs();
+    String currentId;
+
+    @Override
+    public String getId() {
+      currentId = generator.getId();
+      return currentId;
+    }
+  }
+}
diff --git a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/environment/ProcessEnvironmentTest.java b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/environment/ProcessEnvironmentTest.java
new file mode 100644
index 0000000..26a8328
--- /dev/null
+++ b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/environment/ProcessEnvironmentTest.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.fnexecution.environment;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
+import org.apache.beam.model.pipeline.v1.RunnerApi.Environment;
+import org.apache.beam.runners.fnexecution.control.InstructionRequestHandler;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link ProcessEnvironment}. */
+@RunWith(JUnit4.class)
+public class ProcessEnvironmentTest {
+
+  @Test
+  public void closeClosesInstructionRequestHandler() throws Exception {
+    InstructionRequestHandler handler = mock(InstructionRequestHandler.class);
+    RemoteEnvironment env =
+        ProcessEnvironment.create(
+            mock(ProcessManager.class), Environment.getDefaultInstance(), "1", handler);
+
+    env.close();
+    verify(handler).close();
+  }
+}
diff --git a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/environment/ProcessManagerTest.java b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/environment/ProcessManagerTest.java
new file mode 100644
index 0000000..cdcb53e
--- /dev/null
+++ b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/environment/ProcessManagerTest.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.fnexecution.environment;
+
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link ProcessManager}. */
+@RunWith(JUnit4.class)
+public class ProcessManagerTest {
+
+  @Test
+  public void testRunSimpleCommand() throws IOException {
+    ProcessManager processManager = ProcessManager.create();
+    processManager.startProcess("1", "bash", Collections.emptyList());
+    processManager.stopProcess("1");
+    processManager.startProcess("2", "bash", Arrays.asList("-c", "ls"));
+    processManager.stopProcess("2");
+    processManager.startProcess("1", "bash", Arrays.asList("-c", "ls", "-l", "-a"));
+    processManager.stopProcess("1");
+  }
+
+  @Test
+  public void testRunInvalidExecutable() throws IOException {
+    ProcessManager processManager = ProcessManager.create();
+    try {
+      processManager.startProcess("1", "asfasfls", Collections.emptyList());
+      fail();
+    } catch (IOException e) {
+      assertThat(e.getMessage(), containsString("Cannot run program \"asfasfls\""));
+    }
+  }
+
+  @Test
+  public void testDuplicateId() throws IOException {
+    ProcessManager processManager = ProcessManager.create();
+    processManager.startProcess("1", "bash", Arrays.asList("-c", "ls"));
+    try {
+      processManager.startProcess("1", "bash", Arrays.asList("-c", "ls"));
+      fail();
+    } catch (IllegalStateException e) {
+      // this is what we want
+    } finally {
+      processManager.stopProcess("1");
+    }
+  }
+
+  @Test
+  public void testLivenessCheck() throws IOException {
+    ProcessManager processManager = ProcessManager.create();
+    ProcessManager.RunningProcess process =
+        processManager.startProcess("1", "bash", Arrays.asList("-c", "sleep", "1000"));
+    process.isAliveOrThrow();
+    processManager.stopProcess("1");
+    try {
+      process.isAliveOrThrow();
+      fail();
+    } catch (IllegalStateException e) {
+      // this is what we want
+    }
+  }
+
+  @Test
+  public void testEnvironmentVariables() throws IOException, InterruptedException {
+    ProcessManager processManager = ProcessManager.create();
+    ProcessManager.RunningProcess process =
+        processManager.startProcess(
+            "1",
+            "bash",
+            Arrays.asList("-c", "sleep $PARAM"),
+            Collections.singletonMap("PARAM", "-h"));
+    for (int i = 0; i < 10 && process.getUnderlyingProcess().isAlive(); i++) {
+      Thread.sleep(100);
+    }
+    assertThat(process.getUnderlyingProcess().exitValue(), is(1));
+    processManager.stopProcess("1");
+  }
+}