You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2018/12/03 22:38:22 UTC

[beam] branch master updated: [BEAM-6160] Use service server rather than service (#7168)

This is an automated email from the ASF dual-hosted git repository.

lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 78c1a10  [BEAM-6160] Use service server rather than service (#7168)
78c1a10 is described below

commit 78c1a105aa44fb892b155f3f336c262491e76362
Author: Boyuan Zhang <36...@users.noreply.github.com>
AuthorDate: Mon Dec 3 14:38:14 2018 -0800

    [BEAM-6160] Use service server rather than service (#7168)
---
 .../dataflow/worker/BatchDataflowWorker.java       |   4 +-
 .../worker/BeamFnMapTaskExecutorFactory.java       |  13 +-
 .../worker/DataflowMapTaskExecutorFactory.java     |   9 +-
 .../dataflow/worker/DataflowRunnerHarness.java     |  21 +-
 .../worker/IntrinsicMapTaskExecutorFactory.java    |   9 +-
 .../dataflow/worker/SdkHarnessRegistries.java      |  15 +-
 .../dataflow/worker/SdkHarnessRegistry.java        |   9 +-
 .../dataflow/worker/StreamingDataflowWorker.java   |   4 +-
 .../runners/dataflow/worker/fn/ServerFactory.java  | 229 -------------------
 .../worker/fn/data/BeamFnDataGrpcService.java      |  78 ++++---
 .../worker/fn/BeamFnControlServiceTest.java        |   7 +-
 .../dataflow/worker/fn/ServerFactoryTest.java      | 244 ---------------------
 .../beam/runners/fnexecution/GrpcFnServer.java     |  17 +-
 .../fnexecution/InProcessServerFactory.java        |  31 ++-
 .../beam/runners/fnexecution/ServerFactory.java    | 184 +++++++++++++---
 .../GrpcContextHeaderAccessorProviderTest.java     |   4 +-
 .../runners/fnexecution/ServerFactoryTest.java     |  40 +++-
 17 files changed, 333 insertions(+), 585 deletions(-)

diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchDataflowWorker.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchDataflowWorker.java
index b103dce..2666dea 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchDataflowWorker.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchDataflowWorker.java
@@ -336,9 +336,9 @@ public class BatchDataflowWorker implements Closeable {
         worker =
             mapTaskExecutorFactory.create(
                 sdkWorkerHarness.getControlClientHandler(),
-                sdkWorkerHarness.getDataService(),
+                sdkWorkerHarness.getGrpcDataFnServer(),
                 sdkHarnessRegistry.beamFnDataApiServiceDescriptor(),
-                sdkWorkerHarness.getStateService(),
+                sdkWorkerHarness.getGrpcStateFnServer(),
                 network,
                 options,
                 stageName,
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BeamFnMapTaskExecutorFactory.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BeamFnMapTaskExecutorFactory.java
index 067b221..8e191e2 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BeamFnMapTaskExecutorFactory.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BeamFnMapTaskExecutorFactory.java
@@ -55,6 +55,7 @@ import org.apache.beam.runners.dataflow.worker.counters.CounterSet;
 import org.apache.beam.runners.dataflow.worker.counters.NameContext;
 import org.apache.beam.runners.dataflow.worker.fn.control.BeamFnMapTaskExecutor;
 import org.apache.beam.runners.dataflow.worker.fn.control.RegisterAndProcessBundleOperation;
+import org.apache.beam.runners.dataflow.worker.fn.data.BeamFnDataGrpcService;
 import org.apache.beam.runners.dataflow.worker.fn.data.RemoteGrpcPortReadOperation;
 import org.apache.beam.runners.dataflow.worker.fn.data.RemoteGrpcPortWriteOperation;
 import org.apache.beam.runners.dataflow.worker.graph.Edges.Edge;
@@ -82,8 +83,10 @@ import org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation;
 import org.apache.beam.runners.dataflow.worker.util.common.worker.Receiver;
 import org.apache.beam.runners.dataflow.worker.util.common.worker.Sink;
 import org.apache.beam.runners.dataflow.worker.util.common.worker.WriteOperation;
+import org.apache.beam.runners.fnexecution.GrpcFnServer;
 import org.apache.beam.runners.fnexecution.control.InstructionRequestHandler;
 import org.apache.beam.runners.fnexecution.data.FnDataService;
+import org.apache.beam.runners.fnexecution.state.GrpcStateService;
 import org.apache.beam.runners.fnexecution.state.StateDelegator;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.KvCoder;
@@ -115,9 +118,9 @@ public class BeamFnMapTaskExecutorFactory implements DataflowMapTaskExecutorFact
   @Override
   public DataflowMapTaskExecutor create(
       InstructionRequestHandler instructionRequestHandler,
-      FnDataService beamFnDataService,
+      GrpcFnServer<BeamFnDataGrpcService.DataService> grpcDataFnServer,
       Endpoints.ApiServiceDescriptor dataApiServiceDescriptor,
-      StateDelegator beamFnStateDelegator,
+      GrpcFnServer<GrpcStateService> grpcStateFnServer,
       MutableNetwork<Node, Edge> network,
       PipelineOptions options,
       String stageName,
@@ -143,7 +146,7 @@ public class BeamFnMapTaskExecutorFactory implements DataflowMapTaskExecutorFact
         createOperationTransformForRegisterFnNodes(
             idGenerator,
             instructionRequestHandler,
-            beamFnStateDelegator,
+            grpcStateFnServer.getService(),
             stageName,
             executionContext));
 
@@ -153,7 +156,7 @@ public class BeamFnMapTaskExecutorFactory implements DataflowMapTaskExecutorFact
         network,
         createOperationTransformForGrpcPortNodes(
             network,
-            beamFnDataService,
+            grpcDataFnServer.getService(),
             // TODO: Set NameContext properly for these operations.
             executionContext.createOperationContext(
                 NameContext.create(stageName, stageName, stageName, stageName))));
@@ -165,7 +168,7 @@ public class BeamFnMapTaskExecutorFactory implements DataflowMapTaskExecutorFact
             network,
             idGenerator,
             instructionRequestHandler,
-            beamFnDataService,
+            grpcDataFnServer.getService(),
             dataApiServiceDescriptor,
             executionContext,
             stageName));
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowMapTaskExecutorFactory.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowMapTaskExecutorFactory.java
index 56b2cbc..b6bcb5a 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowMapTaskExecutorFactory.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowMapTaskExecutorFactory.java
@@ -22,11 +22,12 @@ import com.google.common.graph.MutableNetwork;
 import java.util.function.Supplier;
 import org.apache.beam.model.pipeline.v1.Endpoints;
 import org.apache.beam.runners.dataflow.worker.counters.CounterSet;
+import org.apache.beam.runners.dataflow.worker.fn.data.BeamFnDataGrpcService;
 import org.apache.beam.runners.dataflow.worker.graph.Edges.Edge;
 import org.apache.beam.runners.dataflow.worker.graph.Nodes.Node;
+import org.apache.beam.runners.fnexecution.GrpcFnServer;
 import org.apache.beam.runners.fnexecution.control.InstructionRequestHandler;
-import org.apache.beam.runners.fnexecution.data.FnDataService;
-import org.apache.beam.runners.fnexecution.state.StateDelegator;
+import org.apache.beam.runners.fnexecution.state.GrpcStateService;
 import org.apache.beam.sdk.options.PipelineOptions;
 
 /** Creates a {@link DataflowMapTaskExecutor} from a {@link MapTask} definition. */
@@ -38,9 +39,9 @@ public interface DataflowMapTaskExecutorFactory {
    */
   DataflowMapTaskExecutor create(
       InstructionRequestHandler instructionRequestHandler,
-      FnDataService beamFnDataService,
+      GrpcFnServer<BeamFnDataGrpcService.DataService> grpcDataFnServer,
       Endpoints.ApiServiceDescriptor dataApiServiceDescriptor,
-      StateDelegator beamFnStateDelegator,
+      GrpcFnServer<GrpcStateService> grpcStateFnServer,
       MutableNetwork<Node, Edge> network,
       PipelineOptions options,
       String stageName,
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowRunnerHarness.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowRunnerHarness.java
index c4f3736..8fb610b 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowRunnerHarness.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowRunnerHarness.java
@@ -26,14 +26,16 @@ import java.util.concurrent.TimeUnit;
 import javax.annotation.Nullable;
 import org.apache.beam.model.pipeline.v1.Endpoints.ApiServiceDescriptor;
 import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.dataflow.DataflowRunner;
+import org.apache.beam.runners.dataflow.options.DataflowPipelineDebugOptions;
 import org.apache.beam.runners.dataflow.options.DataflowWorkerHarnessOptions;
 import org.apache.beam.runners.dataflow.worker.fn.BeamFnControlService;
-import org.apache.beam.runners.dataflow.worker.fn.ServerFactory;
 import org.apache.beam.runners.dataflow.worker.fn.data.BeamFnDataGrpcService;
 import org.apache.beam.runners.dataflow.worker.fn.logging.BeamFnLoggingService;
 import org.apache.beam.runners.dataflow.worker.fn.stream.ServerStreamObserverFactory;
 import org.apache.beam.runners.dataflow.worker.logging.DataflowWorkerLoggingInitializer;
 import org.apache.beam.runners.fnexecution.GrpcContextHeaderAccessorProvider;
+import org.apache.beam.runners.fnexecution.ServerFactory;
 import org.apache.beam.runners.fnexecution.control.FnApiControlClient;
 import org.apache.beam.runners.fnexecution.state.GrpcStateService;
 import org.apache.beam.sdk.io.FileSystems;
@@ -76,7 +78,16 @@ public class DataflowRunnerHarness {
     // Initialized registered file systems.˜
     FileSystems.setDefaultPipelineOptions(pipelineOptions);
 
-    ServerFactory serverFactory = ServerFactory.fromOptions(pipelineOptions);
+    DataflowPipelineDebugOptions dataflowOptions =
+        pipelineOptions.as(DataflowPipelineDebugOptions.class);
+    ServerFactory serverFactory;
+    if (DataflowRunner.hasExperiment(dataflowOptions, "beam_fn_api_epoll_domain_socket")) {
+      serverFactory = ServerFactory.createEpollDomainSocket();
+    } else if (DataflowRunner.hasExperiment(dataflowOptions, "beam_fn_api_epoll")) {
+      serverFactory = ServerFactory.createEpollSocket();
+    } else {
+      serverFactory = ServerFactory.createDefault();
+    }
     ServerStreamObserverFactory streamObserverFactory =
         ServerStreamObserverFactory.fromOptions(pipelineOptions);
 
@@ -103,11 +114,11 @@ public class DataflowRunnerHarness {
 
       servicesServer =
           serverFactory.create(
-              controlApiService,
-              ImmutableList.of(beamFnControlService, beamFnDataService, beamFnStateService));
+              ImmutableList.of(beamFnControlService, beamFnDataService, beamFnStateService),
+              controlApiService);
 
       loggingServer =
-          serverFactory.create(loggingApiService, ImmutableList.of(beamFnLoggingService));
+          serverFactory.create(ImmutableList.of(beamFnLoggingService), loggingApiService);
 
       start(
           pipeline,
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/IntrinsicMapTaskExecutorFactory.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/IntrinsicMapTaskExecutorFactory.java
index 2bd0c1e..e632719b 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/IntrinsicMapTaskExecutorFactory.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/IntrinsicMapTaskExecutorFactory.java
@@ -47,6 +47,7 @@ import org.apache.beam.runners.dataflow.util.CloudObjects;
 import org.apache.beam.runners.dataflow.worker.counters.CounterFactory;
 import org.apache.beam.runners.dataflow.worker.counters.CounterSet;
 import org.apache.beam.runners.dataflow.worker.counters.NameContext;
+import org.apache.beam.runners.dataflow.worker.fn.data.BeamFnDataGrpcService.DataService;
 import org.apache.beam.runners.dataflow.worker.graph.Edges.Edge;
 import org.apache.beam.runners.dataflow.worker.graph.Edges.MultiOutputInfoEdge;
 import org.apache.beam.runners.dataflow.worker.graph.Networks;
@@ -69,9 +70,9 @@ import org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation;
 import org.apache.beam.runners.dataflow.worker.util.common.worker.Receiver;
 import org.apache.beam.runners.dataflow.worker.util.common.worker.Sink;
 import org.apache.beam.runners.dataflow.worker.util.common.worker.WriteOperation;
+import org.apache.beam.runners.fnexecution.GrpcFnServer;
 import org.apache.beam.runners.fnexecution.control.InstructionRequestHandler;
-import org.apache.beam.runners.fnexecution.data.FnDataService;
-import org.apache.beam.runners.fnexecution.state.StateDelegator;
+import org.apache.beam.runners.fnexecution.state.GrpcStateService;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.options.PipelineOptions;
@@ -98,9 +99,9 @@ public class IntrinsicMapTaskExecutorFactory implements DataflowMapTaskExecutorF
   @Override
   public DataflowMapTaskExecutor create(
       InstructionRequestHandler instructionRequestHandler,
-      FnDataService beamFnDataService,
+      GrpcFnServer<DataService> grpcDataFnServer,
       Endpoints.ApiServiceDescriptor dataApiServiceDescriptor,
-      StateDelegator beamFnStateDelegator,
+      GrpcFnServer<GrpcStateService> grpcStateFnServer,
       MutableNetwork<Node, Edge> network,
       PipelineOptions options,
       String stageName,
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SdkHarnessRegistries.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SdkHarnessRegistries.java
index 3e3284e..a0d54ea 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SdkHarnessRegistries.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SdkHarnessRegistries.java
@@ -26,8 +26,8 @@ import java.util.concurrent.atomic.AtomicInteger;
 import javax.annotation.Nullable;
 import org.apache.beam.model.pipeline.v1.Endpoints.ApiServiceDescriptor;
 import org.apache.beam.runners.dataflow.worker.fn.data.BeamFnDataGrpcService;
+import org.apache.beam.runners.fnexecution.GrpcFnServer;
 import org.apache.beam.runners.fnexecution.control.FnApiControlClient;
-import org.apache.beam.runners.fnexecution.data.FnDataService;
 import org.apache.beam.runners.fnexecution.state.GrpcStateService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -187,14 +187,15 @@ public class SdkHarnessRegistries {
 
       @Override
       @Nullable
-      public FnDataService getDataService() {
-        return beamFnDataGrpcService.getDataService(getWorkerId());
+      public GrpcFnServer<BeamFnDataGrpcService.DataService> getGrpcDataFnServer() {
+        return GrpcFnServer.create(
+            beamFnDataGrpcService.getDataService(getWorkerId()), beamFnDataApiServiceDescriptor());
       }
 
       @Override
       @Nullable
-      public GrpcStateService getStateService() {
-        return beamFnStateService;
+      public GrpcFnServer<GrpcStateService> getGrpcStateFnServer() {
+        return GrpcFnServer.create(beamFnStateService, beamFnDataApiServiceDescriptor());
       }
     }
   }
@@ -228,13 +229,13 @@ public class SdkHarnessRegistries {
 
           @Nullable
           @Override
-          public FnDataService getDataService() {
+          public GrpcFnServer<BeamFnDataGrpcService.DataService> getGrpcDataFnServer() {
             return null;
           }
 
           @Nullable
           @Override
-          public GrpcStateService getStateService() {
+          public GrpcFnServer<GrpcStateService> getGrpcStateFnServer() {
             return null;
           }
         };
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SdkHarnessRegistry.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SdkHarnessRegistry.java
index 8c11d14..383d292 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SdkHarnessRegistry.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SdkHarnessRegistry.java
@@ -19,9 +19,10 @@ package org.apache.beam.runners.dataflow.worker;
 
 import javax.annotation.Nullable;
 import org.apache.beam.model.pipeline.v1.Endpoints.ApiServiceDescriptor;
+import org.apache.beam.runners.dataflow.worker.fn.data.BeamFnDataGrpcService;
+import org.apache.beam.runners.fnexecution.GrpcFnServer;
 import org.apache.beam.runners.fnexecution.control.FnApiControlClient;
-import org.apache.beam.runners.fnexecution.data.FnDataService;
-import org.apache.beam.runners.fnexecution.state.StateDelegator;
+import org.apache.beam.runners.fnexecution.state.GrpcStateService;
 
 /** Registry used to manage all the connections (Control, Data, State) from SdkHarness */
 public interface SdkHarnessRegistry {
@@ -60,9 +61,9 @@ public interface SdkHarnessRegistry {
     public String getWorkerId();
 
     @Nullable
-    public FnDataService getDataService();
+    public GrpcFnServer<BeamFnDataGrpcService.DataService> getGrpcDataFnServer();
 
     @Nullable
-    public StateDelegator getStateService();
+    public GrpcFnServer<GrpcStateService> getGrpcStateFnServer();
   }
 }
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
index 070973b..80556a8 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
@@ -1143,9 +1143,9 @@ public class StreamingDataflowWorker {
         DataflowMapTaskExecutor mapTaskExecutor =
             mapTaskExecutorFactory.create(
                 worker.getControlClientHandler(),
-                worker.getDataService(),
+                worker.getGrpcDataFnServer(),
                 sdkHarnessRegistry.beamFnDataApiServiceDescriptor(),
-                worker.getStateService(),
+                worker.getGrpcStateFnServer(),
                 mapTaskNetwork,
                 options,
                 mapTask.getStageName(),
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/ServerFactory.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/ServerFactory.java
deleted file mode 100644
index 616ce0f..0000000
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/ServerFactory.java
+++ /dev/null
@@ -1,229 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.dataflow.worker.fn;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-import com.google.common.net.HostAndPort;
-import java.io.File;
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import java.util.List;
-import org.apache.beam.model.pipeline.v1.Endpoints;
-import org.apache.beam.runners.dataflow.DataflowRunner;
-import org.apache.beam.runners.dataflow.options.DataflowPipelineDebugOptions;
-import org.apache.beam.runners.fnexecution.GrpcContextHeaderAccessorProvider;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.vendor.grpc.v1_13_1.io.grpc.BindableService;
-import org.apache.beam.vendor.grpc.v1_13_1.io.grpc.Server;
-import org.apache.beam.vendor.grpc.v1_13_1.io.grpc.ServerBuilder;
-import org.apache.beam.vendor.grpc.v1_13_1.io.grpc.ServerInterceptors;
-import org.apache.beam.vendor.grpc.v1_13_1.io.grpc.netty.NettyServerBuilder;
-import org.apache.beam.vendor.grpc.v1_13_1.io.netty.channel.epoll.EpollEventLoopGroup;
-import org.apache.beam.vendor.grpc.v1_13_1.io.netty.channel.epoll.EpollServerDomainSocketChannel;
-import org.apache.beam.vendor.grpc.v1_13_1.io.netty.channel.epoll.EpollServerSocketChannel;
-import org.apache.beam.vendor.grpc.v1_13_1.io.netty.channel.unix.DomainSocketAddress;
-import org.apache.beam.vendor.grpc.v1_13_1.io.netty.util.internal.ThreadLocalRandom;
-
-/**
- * A {@link Server gRPC Server} factory that returns a server based upon {@link PipelineOptions}
- * experiments. <br>
- * TODO: Kill {@link ServerFactory} instead use {@link
- * org.apache.beam.runners.fnexecution.ServerFactory}.
- */
-@Deprecated
-public abstract class ServerFactory {
-  public static ServerFactory fromOptions(PipelineOptions options) {
-    DataflowPipelineDebugOptions dataflowOptions = options.as(DataflowPipelineDebugOptions.class);
-    if (DataflowRunner.hasExperiment(dataflowOptions, "beam_fn_api_epoll_domain_socket")) {
-      return new EpollDomainSocket();
-    } else if (DataflowRunner.hasExperiment(dataflowOptions, "beam_fn_api_epoll")) {
-      return new EpollSocket();
-    }
-    return new Default();
-  }
-
-  /**
-   * Allocates a port for a server using an ephemeral port chosen automatically. The chosen port is
-   * accessible to the caller from the URL set in the input {@link
-   * Endpoints.ApiServiceDescriptor.Builder}. Server applies {@link
-   * GrpcContextHeaderAccessorProvider#interceptor()} to all incoming requests.
-   */
-  public abstract Server allocatePortAndCreate(
-      Endpoints.ApiServiceDescriptor.Builder builder, List<BindableService> services)
-      throws IOException;
-
-  /**
-   * Creates an instance of this server at the address specified by the given service descriptor.
-   * Server applies {@link GrpcContextHeaderAccessorProvider#interceptor()} to all incoming
-   * requests.
-   */
-  public abstract Server create(
-      Endpoints.ApiServiceDescriptor serviceDescriptor, List<BindableService> services)
-      throws IOException;
-
-  /**
-   * Creates a {@link Server gRPC Server} using a Unix domain socket. Note that this requires <a
-   * href="http://netty.io/wiki/forked-tomcat-native.html">Netty TcNative</a> available to be able
-   * to provide a {@link EpollServerDomainSocketChannel}.
-   *
-   * <p>The unix domain socket is located at ${java.io.tmpdir}/fnapi${random[0-10000)}.sock
-   */
-  private static class EpollDomainSocket extends ServerFactory {
-    private static File getFileForPort(int port) {
-      return new File(System.getProperty("java.io.tmpdir"), String.format("fnapi%d.sock", port));
-    }
-
-    @Override
-    public Server allocatePortAndCreate(
-        Endpoints.ApiServiceDescriptor.Builder apiServiceDescriptor, List<BindableService> services)
-        throws IOException {
-      File tmp;
-      do {
-        tmp = getFileForPort(ThreadLocalRandom.current().nextInt(10000));
-      } while (tmp.exists());
-      apiServiceDescriptor.setUrl("unix://" + tmp.getAbsolutePath());
-      return create(apiServiceDescriptor.build(), services);
-    }
-
-    @Override
-    public Server create(
-        Endpoints.ApiServiceDescriptor serviceDescriptor, List<BindableService> services)
-        throws IOException {
-      SocketAddress socketAddress = SocketAddressFactory.createFrom(serviceDescriptor.getUrl());
-      checkArgument(
-          socketAddress instanceof DomainSocketAddress,
-          "%s requires a Unix domain socket address, got %s",
-          EpollDomainSocket.class.getSimpleName(),
-          serviceDescriptor.getUrl());
-      return createServer((DomainSocketAddress) socketAddress, services);
-    }
-
-    private static Server createServer(
-        DomainSocketAddress domainSocket, List<BindableService> services) throws IOException {
-      NettyServerBuilder builder =
-          NettyServerBuilder.forAddress(domainSocket)
-              .channelType(EpollServerDomainSocketChannel.class)
-              .workerEventLoopGroup(new EpollEventLoopGroup())
-              .bossEventLoopGroup(new EpollEventLoopGroup())
-              .maxMessageSize(Integer.MAX_VALUE);
-      for (BindableService service : services) {
-        // Wrap the service to extract headers
-        builder.addService(
-            ServerInterceptors.intercept(service, GrpcContextHeaderAccessorProvider.interceptor()));
-      }
-      return builder.build().start();
-    }
-  }
-
-  /**
-   * Creates a {@link Server gRPC Server} using an Epoll socket. Note that this requires <a
-   * href="http://netty.io/wiki/forked-tomcat-native.html">Netty TcNative</a> available to be able
-   * to provide a {@link EpollServerSocketChannel}.
-   *
-   * <p>The server is created listening any open port on "localhost".
-   */
-  private static class EpollSocket extends ServerFactory {
-    @Override
-    public Server allocatePortAndCreate(
-        Endpoints.ApiServiceDescriptor.Builder apiServiceDescriptor, List<BindableService> services)
-        throws IOException {
-      InetSocketAddress address = new InetSocketAddress(InetAddress.getLoopbackAddress(), 0);
-      Server server = createServer(address, services);
-      apiServiceDescriptor.setUrl(
-          HostAndPort.fromParts(address.getHostName(), server.getPort()).toString());
-      return server;
-    }
-
-    @Override
-    public Server create(
-        Endpoints.ApiServiceDescriptor serviceDescriptor, List<BindableService> services)
-        throws IOException {
-      SocketAddress socketAddress = SocketAddressFactory.createFrom(serviceDescriptor.getUrl());
-      checkArgument(
-          socketAddress instanceof InetSocketAddress,
-          "%s requires a host:port socket address, got %s",
-          EpollSocket.class.getSimpleName(),
-          serviceDescriptor.getUrl());
-      return createServer((InetSocketAddress) socketAddress, services);
-    }
-
-    private static Server createServer(InetSocketAddress socket, List<BindableService> services)
-        throws IOException {
-      ServerBuilder builder =
-          NettyServerBuilder.forAddress(socket)
-              .channelType(EpollServerSocketChannel.class)
-              .workerEventLoopGroup(new EpollEventLoopGroup())
-              .bossEventLoopGroup(new EpollEventLoopGroup())
-              .maxMessageSize(Integer.MAX_VALUE);
-      for (BindableService service : services) {
-        // Wrap the service to extract headers
-        builder.addService(
-            ServerInterceptors.intercept(service, GrpcContextHeaderAccessorProvider.interceptor()));
-      }
-      return builder.build().start();
-    }
-  }
-
-  /**
-   * Creates a {@link Server gRPC Server} using the default server factory.
-   *
-   * <p>The server is created listening any open port on "localhost".
-   */
-  private static class Default extends ServerFactory {
-    @Override
-    public Server allocatePortAndCreate(
-        Endpoints.ApiServiceDescriptor.Builder apiServiceDescriptor, List<BindableService> services)
-        throws IOException {
-      InetSocketAddress address = new InetSocketAddress(InetAddress.getLoopbackAddress(), 0);
-      Server server = createServer(address, services);
-      apiServiceDescriptor.setUrl(
-          HostAndPort.fromParts(address.getHostName(), server.getPort()).toString());
-      return server;
-    }
-
-    @Override
-    public Server create(
-        Endpoints.ApiServiceDescriptor serviceDescriptor, List<BindableService> services)
-        throws IOException {
-      SocketAddress socketAddress = SocketAddressFactory.createFrom(serviceDescriptor.getUrl());
-      checkArgument(
-          socketAddress instanceof InetSocketAddress,
-          "Default ServerFactory requires a host:port socket address, got %s",
-          serviceDescriptor.getUrl());
-      return createServer((InetSocketAddress) socketAddress, services);
-    }
-
-    private static Server createServer(InetSocketAddress socket, List<BindableService> services)
-        throws IOException {
-      NettyServerBuilder builder =
-          NettyServerBuilder.forPort(socket.getPort())
-              // Set the message size to max value here. The actual size is governed by the
-              // buffer size in the layers above.
-              .maxMessageSize(Integer.MAX_VALUE);
-      for (BindableService service : services) {
-        // Wrap the service to extract headers
-        builder.addService(
-            ServerInterceptors.intercept(service, GrpcContextHeaderAccessorProvider.interceptor()));
-      }
-      return builder.build().start();
-    }
-  }
-}
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/data/BeamFnDataGrpcService.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/data/BeamFnDataGrpcService.java
index be295f9..d81b902 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/data/BeamFnDataGrpcService.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/data/BeamFnDataGrpcService.java
@@ -32,6 +32,7 @@ import org.apache.beam.model.fnexecution.v1.BeamFnApi.Elements;
 import org.apache.beam.model.fnexecution.v1.BeamFnDataGrpc;
 import org.apache.beam.model.pipeline.v1.Endpoints;
 import org.apache.beam.runners.dataflow.worker.fn.grpc.BeamFnService;
+import org.apache.beam.runners.fnexecution.FnService;
 import org.apache.beam.runners.fnexecution.HeaderAccessor;
 import org.apache.beam.runners.fnexecution.data.FnDataService;
 import org.apache.beam.sdk.coders.Coder;
@@ -204,42 +205,55 @@ public class BeamFnDataGrpcService extends BeamFnDataGrpc.BeamFnDataImplBase
     }
   }
 
-  /** Get the DataService for the clientId */
-  public FnDataService getDataService(final String clientId) {
-    return new FnDataService() {
-      @Override
-      public <T> InboundDataClient receive(
-          LogicalEndpoint inputLocation,
-          Coder<WindowedValue<T>> coder,
-          FnDataReceiver<WindowedValue<T>> consumer) {
-        LOG.debug("Registering consumer for {}", inputLocation);
+  // A wrapper class
+  public class DataService extends BeamFnDataGrpc.BeamFnDataImplBase
+      implements FnDataService, FnService {
+    private final String clientId;
 
-        return new DeferredInboundDataClient(clientId, inputLocation, coder, consumer);
-      }
+    public DataService(String clientId) {
+      this.clientId = clientId;
+    }
 
-      @Override
-      public <T> CloseableFnDataReceiver<WindowedValue<T>> send(
-          LogicalEndpoint outputLocation, Coder<WindowedValue<T>> coder) {
-        LOG.debug("Creating output consumer for {}", outputLocation);
-        try {
-          if (outboundBufferLimit.isPresent()) {
-            return BeamFnDataBufferingOutboundObserver.forLocationWithBufferLimit(
-                outboundBufferLimit.get(),
-                outputLocation,
-                coder,
-                getClientFuture(clientId).get().getOutboundObserver());
-          } else {
-            return BeamFnDataBufferingOutboundObserver.forLocation(
-                outputLocation, coder, getClientFuture(clientId).get().getOutboundObserver());
-          }
-        } catch (InterruptedException e) {
-          Thread.currentThread().interrupt();
-          throw new RuntimeException(e);
-        } catch (ExecutionException e) {
-          throw new RuntimeException(e);
+    @Override
+    public <T> InboundDataClient receive(
+        LogicalEndpoint inputLocation,
+        Coder<WindowedValue<T>> coder,
+        FnDataReceiver<WindowedValue<T>> consumer) {
+      LOG.debug("Registering consumer for {}", inputLocation);
+
+      return new DeferredInboundDataClient(this.clientId, inputLocation, coder, consumer);
+    }
+
+    @Override
+    public <T> CloseableFnDataReceiver<WindowedValue<T>> send(
+        LogicalEndpoint outputLocation, Coder<WindowedValue<T>> coder) {
+      LOG.debug("Creating output consumer for {}", outputLocation);
+      try {
+        if (outboundBufferLimit.isPresent()) {
+          return BeamFnDataBufferingOutboundObserver.forLocationWithBufferLimit(
+              outboundBufferLimit.get(),
+              outputLocation,
+              coder,
+              getClientFuture(this.clientId).get().getOutboundObserver());
+        } else {
+          return BeamFnDataBufferingOutboundObserver.forLocation(
+              outputLocation, coder, getClientFuture(this.clientId).get().getOutboundObserver());
         }
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new RuntimeException(e);
+      } catch (ExecutionException e) {
+        throw new RuntimeException(e);
       }
-    };
+    }
+
+    @Override
+    public void close() throws Exception {}
+  }
+
+  /** Get the DataService for the clientId */
+  public DataService getDataService(final String clientId) {
+    return new DataService(clientId);
   }
 
   @Override
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/BeamFnControlServiceTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/BeamFnControlServiceTest.java
index 4620ecb..b660732 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/BeamFnControlServiceTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/BeamFnControlServiceTest.java
@@ -31,6 +31,7 @@ import org.apache.beam.model.fnexecution.v1.BeamFnControlGrpc;
 import org.apache.beam.model.pipeline.v1.Endpoints;
 import org.apache.beam.runners.dataflow.worker.fn.stream.ServerStreamObserverFactory;
 import org.apache.beam.runners.fnexecution.GrpcContextHeaderAccessorProvider;
+import org.apache.beam.runners.fnexecution.ServerFactory;
 import org.apache.beam.runners.fnexecution.control.FnApiControlClient;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
@@ -73,8 +74,7 @@ public class BeamFnControlServiceTest {
             descriptor,
             ServerStreamObserverFactory.fromOptions(options)::from,
             GrpcContextHeaderAccessorProvider.getHeaderAccessor());
-    Server server =
-        ServerFactory.fromOptions(options).create(descriptor, ImmutableList.of(service));
+    Server server = ServerFactory.createDefault().create(ImmutableList.of(service), descriptor);
     String url = service.getApiServiceDescriptor().getUrl();
     BeamFnControlGrpc.BeamFnControlStub clientStub =
         BeamFnControlGrpc.newStub(ManagedChannelBuilder.forTarget(url).usePlaintext(true).build());
@@ -102,8 +102,7 @@ public class BeamFnControlServiceTest {
             descriptor,
             ServerStreamObserverFactory.fromOptions(options)::from,
             GrpcContextHeaderAccessorProvider.getHeaderAccessor());
-    Server server =
-        ServerFactory.fromOptions(options).create(descriptor, ImmutableList.of(service));
+    Server server = ServerFactory.createDefault().create(ImmutableList.of(service), descriptor);
 
     String url = service.getApiServiceDescriptor().getUrl();
     BeamFnControlGrpc.BeamFnControlStub clientStub =
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/ServerFactoryTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/ServerFactoryTest.java
deleted file mode 100644
index 70b9980..0000000
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/ServerFactoryTest.java
+++ /dev/null
@@ -1,244 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.dataflow.worker.fn;
-
-import static org.hamcrest.Matchers.allOf;
-import static org.hamcrest.Matchers.anyOf;
-import static org.hamcrest.Matchers.contains;
-import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.greaterThan;
-import static org.hamcrest.Matchers.lessThan;
-import static org.hamcrest.Matchers.startsWith;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assume.assumeTrue;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.net.HostAndPort;
-import com.google.common.util.concurrent.Uninterruptibles;
-import java.net.Inet4Address;
-import java.net.InetAddress;
-import java.net.SocketAddress;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-import org.apache.beam.model.fnexecution.v1.BeamFnApi;
-import org.apache.beam.model.fnexecution.v1.BeamFnDataGrpc;
-import org.apache.beam.model.pipeline.v1.Endpoints;
-import org.apache.beam.model.pipeline.v1.Endpoints.ApiServiceDescriptor;
-import org.apache.beam.runners.dataflow.harness.test.TestStreams;
-import org.apache.beam.runners.dataflow.options.DataflowPipelineDebugOptions;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.vendor.grpc.v1_13_1.io.grpc.ManagedChannel;
-import org.apache.beam.vendor.grpc.v1_13_1.io.grpc.ManagedChannelBuilder;
-import org.apache.beam.vendor.grpc.v1_13_1.io.grpc.Server;
-import org.apache.beam.vendor.grpc.v1_13_1.io.grpc.netty.NettyChannelBuilder;
-import org.apache.beam.vendor.grpc.v1_13_1.io.grpc.stub.CallStreamObserver;
-import org.apache.beam.vendor.grpc.v1_13_1.io.grpc.stub.StreamObserver;
-import org.apache.beam.vendor.grpc.v1_13_1.io.netty.channel.epoll.Epoll;
-import org.apache.beam.vendor.grpc.v1_13_1.io.netty.channel.epoll.EpollDomainSocketChannel;
-import org.apache.beam.vendor.grpc.v1_13_1.io.netty.channel.epoll.EpollEventLoopGroup;
-import org.apache.beam.vendor.grpc.v1_13_1.io.netty.channel.epoll.EpollSocketChannel;
-import org.apache.beam.vendor.grpc.v1_13_1.io.netty.channel.unix.DomainSocketAddress;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/** Tests for {@link ServerFactory}. */
-@RunWith(JUnit4.class)
-public class ServerFactoryTest {
-  private static final BeamFnApi.Elements CLIENT_DATA =
-      BeamFnApi.Elements.newBuilder()
-          .addData(BeamFnApi.Elements.Data.newBuilder().setInstructionReference("1"))
-          .build();
-  private static final BeamFnApi.Elements SERVER_DATA =
-      BeamFnApi.Elements.newBuilder()
-          .addData(BeamFnApi.Elements.Data.newBuilder().setInstructionReference("1"))
-          .build();
-
-  @Test
-  public void testCreatingDefaultServer() throws Exception {
-    Endpoints.ApiServiceDescriptor apiServiceDescriptor =
-        runTestUsing(PipelineOptionsFactory.create());
-    HostAndPort hostAndPort = HostAndPort.fromString(apiServiceDescriptor.getUrl());
-    assertThat(
-        hostAndPort.getHost(),
-        anyOf(
-            equalTo(InetAddress.getLoopbackAddress().getHostName()),
-            equalTo(InetAddress.getLoopbackAddress().getHostAddress())));
-    assertThat(hostAndPort.getPort(), allOf(greaterThan(0), lessThan(65536)));
-  }
-
-  @Test
-  public void testCreatingEpollServer() throws Exception {
-    assumeTrue(Epoll.isAvailable());
-    // tcnative only supports the ipv4 address family
-    assumeTrue(InetAddress.getLoopbackAddress() instanceof Inet4Address);
-    Endpoints.ApiServiceDescriptor apiServiceDescriptor =
-        runTestUsing(
-            PipelineOptionsFactory.fromArgs(new String[] {"--experiments=beam_fn_api_epoll"})
-                .create());
-    HostAndPort hostAndPort = HostAndPort.fromString(apiServiceDescriptor.getUrl());
-    assertThat(
-        hostAndPort.getHost(),
-        anyOf(
-            equalTo(InetAddress.getLoopbackAddress().getHostName()),
-            equalTo(InetAddress.getLoopbackAddress().getHostAddress())));
-    assertThat(hostAndPort.getPort(), allOf(greaterThan(0), lessThan(65536)));
-  }
-
-  @Test
-  public void testCreatingUnixDomainSocketServer() throws Exception {
-    assumeTrue(Epoll.isAvailable());
-    Endpoints.ApiServiceDescriptor apiServiceDescriptor =
-        runTestUsing(
-            PipelineOptionsFactory.fromArgs(
-                    new String[] {
-                      "--experiments=beam_fn_api_epoll,beam_fn_api_epoll_domain_socket"
-                    })
-                .create());
-    assertThat(
-        apiServiceDescriptor.getUrl(),
-        startsWith("unix://" + System.getProperty("java.io.tmpdir")));
-  }
-
-  private Endpoints.ApiServiceDescriptor runTestUsing(PipelineOptions options) throws Exception {
-    ManagedChannelFactory channelFactory = ManagedChannelFactory.from(options);
-    Endpoints.ApiServiceDescriptor.Builder apiServiceDescriptorBuilder =
-        Endpoints.ApiServiceDescriptor.newBuilder();
-
-    Collection<BeamFnApi.Elements> serverElements = new ArrayList<>();
-    CountDownLatch clientHangedUp = new CountDownLatch(1);
-    CallStreamObserver<BeamFnApi.Elements> serverInboundObserver =
-        TestStreams.withOnNext(serverElements::add)
-            .withOnCompleted(clientHangedUp::countDown)
-            .build();
-    TestDataService service = new TestDataService(serverInboundObserver);
-
-    ServerFactory serverFactory = ServerFactory.fromOptions(options);
-    Server server =
-        serverFactory.allocatePortAndCreate(apiServiceDescriptorBuilder, ImmutableList.of(service));
-    assertFalse(server.isShutdown());
-    ManagedChannel channel = channelFactory.forDescriptor(apiServiceDescriptorBuilder.build());
-    BeamFnDataGrpc.BeamFnDataStub stub = BeamFnDataGrpc.newStub(channel);
-    Collection<BeamFnApi.Elements> clientElements = new ArrayList<>();
-    CountDownLatch serverHangedUp = new CountDownLatch(1);
-    CallStreamObserver<BeamFnApi.Elements> clientInboundObserver =
-        TestStreams.withOnNext(clientElements::add)
-            .withOnCompleted(serverHangedUp::countDown)
-            .build();
-
-    StreamObserver<BeamFnApi.Elements> clientOutboundObserver = stub.data(clientInboundObserver);
-    StreamObserver<BeamFnApi.Elements> serverOutboundObserver = service.outboundObservers.take();
-
-    clientOutboundObserver.onNext(CLIENT_DATA);
-    serverOutboundObserver.onNext(SERVER_DATA);
-    clientOutboundObserver.onCompleted();
-    clientHangedUp.await();
-    serverOutboundObserver.onCompleted();
-    serverHangedUp.await();
-
-    assertThat(clientElements, contains(SERVER_DATA));
-    assertThat(serverElements, contains(CLIENT_DATA));
-    server.shutdown();
-    server.awaitTermination(1, TimeUnit.SECONDS);
-    server.shutdownNow();
-
-    return apiServiceDescriptorBuilder.build();
-  }
-
-  /** A test gRPC service that uses the provided inbound observer for all clients. */
-  private static class TestDataService extends BeamFnDataGrpc.BeamFnDataImplBase {
-    private final LinkedBlockingQueue<StreamObserver<BeamFnApi.Elements>> outboundObservers;
-    private final StreamObserver<BeamFnApi.Elements> inboundObserver;
-
-    private TestDataService(StreamObserver<BeamFnApi.Elements> inboundObserver) {
-      this.inboundObserver = inboundObserver;
-      this.outboundObservers = new LinkedBlockingQueue<>();
-    }
-
-    @Override
-    public StreamObserver<BeamFnApi.Elements> data(
-        StreamObserver<BeamFnApi.Elements> outboundObserver) {
-      Uninterruptibles.putUninterruptibly(outboundObservers, outboundObserver);
-      return inboundObserver;
-    }
-  }
-
-  /**
-   * Uses {@link PipelineOptions} to configure which underlying {@link ManagedChannel}
-   * implementation to use.
-   *
-   * <p>TODO: Remove this fork once available from a common shared library.
-   */
-  public abstract static class ManagedChannelFactory {
-    public static ManagedChannelFactory from(PipelineOptions options) {
-      List<String> experiments = options.as(DataflowPipelineDebugOptions.class).getExperiments();
-      if (experiments != null && experiments.contains("beam_fn_api_epoll")) {
-        org.apache.beam.vendor.grpc.v1_13_1.io.netty.channel.epoll.Epoll.ensureAvailability();
-        return new Epoll();
-      }
-      return new Default();
-    }
-
-    public abstract ManagedChannel forDescriptor(ApiServiceDescriptor apiServiceDescriptor);
-
-    /**
-     * Creates a {@link ManagedChannel} backed by an {@link EpollDomainSocketChannel} if the address
-     * is a {@link DomainSocketAddress}. Otherwise creates a {@link ManagedChannel} backed by an
-     * {@link EpollSocketChannel}.
-     */
-    private static class Epoll extends ManagedChannelFactory {
-      @Override
-      public ManagedChannel forDescriptor(ApiServiceDescriptor apiServiceDescriptor) {
-        SocketAddress address = SocketAddressFactory.createFrom(apiServiceDescriptor.getUrl());
-        return NettyChannelBuilder.forAddress(address)
-            .channelType(
-                address instanceof DomainSocketAddress
-                    ? EpollDomainSocketChannel.class
-                    : EpollSocketChannel.class)
-            .eventLoopGroup(new EpollEventLoopGroup())
-            .usePlaintext(true)
-            // Set the message size to max value here. The actual size is governed by the
-            // buffer size in the layers above.
-            .maxInboundMessageSize(Integer.MAX_VALUE)
-            .build();
-      }
-    }
-
-    /**
-     * Creates a {@link ManagedChannel} relying on the {@link ManagedChannelBuilder} to create
-     * instances.
-     */
-    private static class Default extends ManagedChannelFactory {
-      @Override
-      public ManagedChannel forDescriptor(ApiServiceDescriptor apiServiceDescriptor) {
-        return ManagedChannelBuilder.forTarget(apiServiceDescriptor.getUrl())
-            .usePlaintext(true)
-            // Set the message size to max value here. The actual size is governed by the
-            // buffer size in the layers above.
-            .maxInboundMessageSize(Integer.MAX_VALUE)
-            .build();
-      }
-    }
-  }
-}
diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/GrpcFnServer.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/GrpcFnServer.java
index 52bcbaf..aabfd1f 100644
--- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/GrpcFnServer.java
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/GrpcFnServer.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.runners.fnexecution;
 
+import com.google.common.collect.ImmutableList;
 import java.io.IOException;
 import java.util.concurrent.TimeUnit;
 import org.apache.beam.model.pipeline.v1.Endpoints.ApiServiceDescriptor;
@@ -33,7 +34,8 @@ public class GrpcFnServer<ServiceT extends FnService> implements AutoCloseable {
   public static <ServiceT extends FnService> GrpcFnServer<ServiceT> allocatePortAndCreateFor(
       ServiceT service, ServerFactory factory) throws IOException {
     ApiServiceDescriptor.Builder apiServiceDescriptor = ApiServiceDescriptor.newBuilder();
-    Server server = factory.allocatePortAndCreate(service, apiServiceDescriptor);
+    Server server =
+        factory.allocateAddressAndCreate(ImmutableList.of(service), apiServiceDescriptor);
     return new GrpcFnServer<>(server, service, apiServiceDescriptor.build());
   }
 
@@ -43,7 +45,18 @@ public class GrpcFnServer<ServiceT extends FnService> implements AutoCloseable {
    */
   public static <ServiceT extends FnService> GrpcFnServer<ServiceT> create(
       ServiceT service, ApiServiceDescriptor endpoint, ServerFactory factory) throws IOException {
-    return new GrpcFnServer<>(factory.create(service, endpoint), service, endpoint);
+    return new GrpcFnServer<>(
+        factory.create(ImmutableList.of(service), endpoint), service, endpoint);
+  }
+
+  /** @deprecated This create function is used for Dataflow migration purpose only. */
+  @Deprecated
+  public static <ServiceT extends FnService> GrpcFnServer<ServiceT> create(
+      ServiceT service, ApiServiceDescriptor endpoint) {
+    return new GrpcFnServer(null, service, endpoint) {
+      @Override
+      public void close() throws Exception {}
+    };
   }
 
   private final Server server;
diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/InProcessServerFactory.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/InProcessServerFactory.java
index 9300d6b..6f6afdd 100644
--- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/InProcessServerFactory.java
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/InProcessServerFactory.java
@@ -18,6 +18,7 @@
 package org.apache.beam.runners.fnexecution;
 
 import java.io.IOException;
+import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.beam.model.pipeline.v1.Endpoints.ApiServiceDescriptor;
 import org.apache.beam.vendor.grpc.v1_13_1.io.grpc.BindableService;
@@ -39,20 +40,32 @@ public class InProcessServerFactory extends ServerFactory {
   private InProcessServerFactory() {}
 
   @Override
-  public Server allocatePortAndCreate(BindableService service, ApiServiceDescriptor.Builder builder)
-      throws IOException {
+  public Server allocateAddressAndCreate(
+      List<BindableService> services, ApiServiceDescriptor.Builder builder) throws IOException {
     String name = String.format("InProcessServer_%s", serviceNameUniqifier.getAndIncrement());
     builder.setUrl(name);
-    return InProcessServerBuilder.forName(name).addService(service).build().start();
+    InProcessServerBuilder serverBuilder = InProcessServerBuilder.forName(name);
+    services
+        .stream()
+        .forEach(
+            service ->
+                serverBuilder.addService(
+                    ServerInterceptors.intercept(
+                        service, GrpcContextHeaderAccessorProvider.interceptor())));
+    return serverBuilder.build().start();
   }
 
   @Override
-  public Server create(BindableService service, ApiServiceDescriptor serviceDescriptor)
+  public Server create(List<BindableService> services, ApiServiceDescriptor serviceDescriptor)
       throws IOException {
-    return InProcessServerBuilder.forName(serviceDescriptor.getUrl())
-        .addService(
-            ServerInterceptors.intercept(service, GrpcContextHeaderAccessorProvider.interceptor()))
-        .build()
-        .start();
+    InProcessServerBuilder builder = InProcessServerBuilder.forName(serviceDescriptor.getUrl());
+    services
+        .stream()
+        .forEach(
+            service ->
+                builder.addService(
+                    ServerInterceptors.intercept(
+                        service, GrpcContextHeaderAccessorProvider.interceptor())));
+    return builder.build().start();
   }
 }
diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/ServerFactory.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/ServerFactory.java
index 54a758c..18818c6 100644
--- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/ServerFactory.java
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/ServerFactory.java
@@ -20,58 +20,81 @@ package org.apache.beam.runners.fnexecution;
 import static com.google.common.base.Preconditions.checkArgument;
 
 import com.google.common.net.HostAndPort;
+import java.io.File;
 import java.io.IOException;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
+import java.util.List;
 import java.util.function.Supplier;
 import org.apache.beam.model.pipeline.v1.Endpoints;
 import org.apache.beam.sdk.fn.channel.SocketAddressFactory;
 import org.apache.beam.vendor.grpc.v1_13_1.io.grpc.BindableService;
 import org.apache.beam.vendor.grpc.v1_13_1.io.grpc.Server;
+import org.apache.beam.vendor.grpc.v1_13_1.io.grpc.ServerBuilder;
 import org.apache.beam.vendor.grpc.v1_13_1.io.grpc.ServerInterceptors;
 import org.apache.beam.vendor.grpc.v1_13_1.io.grpc.netty.NettyServerBuilder;
+import org.apache.beam.vendor.grpc.v1_13_1.io.netty.channel.epoll.EpollEventLoopGroup;
+import org.apache.beam.vendor.grpc.v1_13_1.io.netty.channel.epoll.EpollServerDomainSocketChannel;
+import org.apache.beam.vendor.grpc.v1_13_1.io.netty.channel.epoll.EpollServerSocketChannel;
+import org.apache.beam.vendor.grpc.v1_13_1.io.netty.channel.unix.DomainSocketAddress;
+import org.apache.beam.vendor.grpc.v1_13_1.io.netty.util.internal.ThreadLocalRandom;
 
 /** A {@link Server gRPC server} factory. */
 public abstract class ServerFactory {
-  /** Create a default {@link ServerFactory}. */
+  /** Create a default {@link InetSocketAddressServerFactory}. */
   public static ServerFactory createDefault() {
     return new InetSocketAddressServerFactory(UrlFactory.createDefault());
   }
 
-  /** Create a {@link ServerFactory} that uses the given url factory. */
+  /** Create a {@link InetSocketAddressServerFactory} that uses the given url factory. */
   public static ServerFactory createWithUrlFactory(UrlFactory urlFactory) {
     return new InetSocketAddressServerFactory(urlFactory);
   }
 
-  /** Create a {@link ServerFactory} that uses ports from a supplier. */
+  /** Create a {@link InetSocketAddressServerFactory} that uses ports from a supplier. */
   public static ServerFactory createWithPortSupplier(Supplier<Integer> portSupplier) {
     return new InetSocketAddressServerFactory(UrlFactory.createDefault(), portSupplier);
   }
 
-  /** Create a {@link ServerFactory} that uses the given url factory and ports from a supplier. */
+  /**
+   * Create a {@link InetSocketAddressServerFactory} that uses the given url factory and ports from
+   * a supplier.
+   */
   public static ServerFactory createWithUrlFactoryAndPortSupplier(
       UrlFactory urlFactory, Supplier<Integer> portSupplier) {
     return new InetSocketAddressServerFactory(urlFactory, portSupplier);
   }
 
+  /** Create a {@link EpollSocket}. */
+  public static ServerFactory createEpollSocket() {
+    return new EpollSocket();
+  }
+
+  /** Create a {@link EpollDomainSocket}. */
+  public static ServerFactory createEpollDomainSocket() {
+    return new EpollDomainSocket();
+  }
+
   /**
-   * Creates an instance of this server using an ephemeral port chosen automatically. The chosen
-   * port is accessible to the caller from the URL set in the input {@link
-   * Endpoints.ApiServiceDescriptor.Builder}. Server applies {@link
+   * Creates an instance of this server using an ephemeral address. The allocation of the address is
+   * server type dependent, which means the address may be a port for certain type of server, or a
+   * file path for other certain types. The chosen address is accessible to the caller from the URL
+   * set in the input {@link Endpoints.ApiServiceDescriptor.Builder}. Server applies {@link
    * GrpcContextHeaderAccessorProvider#interceptor()} to all incoming requests.
    */
-  public abstract Server allocatePortAndCreate(
-      BindableService service, Endpoints.ApiServiceDescriptor.Builder builder) throws IOException;
+  public abstract Server allocateAddressAndCreate(
+      List<BindableService> services, Endpoints.ApiServiceDescriptor.Builder builder)
+      throws IOException;
 
   /**
-   * Creates an instance of this server at the address specified by the given service descriptor.
-   * Server applies {@link GrpcContextHeaderAccessorProvider#interceptor()} to all incoming
-   * requests.
+   * Creates an instance of this server at the address specified by the given service descriptor and
+   * bound to multiple services. Server applies {@link
+   * GrpcContextHeaderAccessorProvider#interceptor()} to all incoming requests.
    */
   public abstract Server create(
-      BindableService service, Endpoints.ApiServiceDescriptor serviceDescriptor) throws IOException;
-
+      List<BindableService> services, Endpoints.ApiServiceDescriptor serviceDescriptor)
+      throws IOException;
   /**
    * Creates a {@link Server gRPC Server} using the default server factory.
    *
@@ -91,18 +114,19 @@ public abstract class ServerFactory {
     }
 
     @Override
-    public Server allocatePortAndCreate(
-        BindableService service, Endpoints.ApiServiceDescriptor.Builder apiServiceDescriptor)
+    public Server allocateAddressAndCreate(
+        List<BindableService> services, Endpoints.ApiServiceDescriptor.Builder apiServiceDescriptor)
         throws IOException {
       InetSocketAddress address =
           new InetSocketAddress(InetAddress.getLoopbackAddress(), portSupplier.get());
-      Server server = createServer(service, address);
+      Server server = createServer(services, address);
       apiServiceDescriptor.setUrl(urlFactory.createUrl(address.getHostName(), server.getPort()));
       return server;
     }
 
     @Override
-    public Server create(BindableService service, Endpoints.ApiServiceDescriptor serviceDescriptor)
+    public Server create(
+        List<BindableService> services, Endpoints.ApiServiceDescriptor serviceDescriptor)
         throws IOException {
       SocketAddress socketAddress = SocketAddressFactory.createFrom(serviceDescriptor.getUrl());
       checkArgument(
@@ -111,24 +135,128 @@ public abstract class ServerFactory {
           getClass().getSimpleName(),
           ServerFactory.class.getSimpleName(),
           serviceDescriptor.getUrl());
-      return createServer(service, (InetSocketAddress) socketAddress);
+      return createServer(services, (InetSocketAddress) socketAddress);
     }
 
-    private static Server createServer(BindableService service, InetSocketAddress socket)
+    private static Server createServer(List<BindableService> services, InetSocketAddress socket)
         throws IOException {
-      // Note: Every ServerFactory should apply GrpcContextHeaderAccessorProvider to the service.
-      Server server =
+      NettyServerBuilder builder =
           NettyServerBuilder.forPort(socket.getPort())
-              .addService(
-                  ServerInterceptors.intercept(
-                      service, GrpcContextHeaderAccessorProvider.interceptor()))
               // Set the message size to max value here. The actual size is governed by the
               // buffer size in the layers above.
-              .maxMessageSize(Integer.MAX_VALUE)
-              .build();
-      server.start();
+              .maxMessageSize(Integer.MAX_VALUE);
+      services
+          .stream()
+          .forEach(
+              service ->
+                  builder.addService(
+                      ServerInterceptors.intercept(
+                          service, GrpcContextHeaderAccessorProvider.interceptor())));
+      return builder.build().start();
+    }
+  }
+
+  /**
+   * Creates a {@link Server gRPC Server} using a Unix domain socket. Note that this requires <a
+   * href="http://netty.io/wiki/forked-tomcat-native.html">Netty TcNative</a> available to be able
+   * to provide a {@link EpollServerDomainSocketChannel}.
+   *
+   * <p>The unix domain socket is located at ${java.io.tmpdir}/fnapi${random[0-10000)}.sock
+   */
+  private static class EpollDomainSocket extends ServerFactory {
+    private static File chooseRandomTmpFile(int port) {
+      return new File(System.getProperty("java.io.tmpdir"), String.format("fnapi%d.sock", port));
+    }
+
+    @Override
+    public Server allocateAddressAndCreate(
+        List<BindableService> services, Endpoints.ApiServiceDescriptor.Builder apiServiceDescriptor)
+        throws IOException {
+      File tmp;
+      do {
+        tmp = chooseRandomTmpFile(ThreadLocalRandom.current().nextInt(10000));
+      } while (tmp.exists());
+      apiServiceDescriptor.setUrl("unix://" + tmp.getAbsolutePath());
+      return create(services, apiServiceDescriptor.build());
+    }
+
+    @Override
+    public Server create(
+        List<BindableService> services, Endpoints.ApiServiceDescriptor serviceDescriptor)
+        throws IOException {
+      SocketAddress socketAddress = SocketAddressFactory.createFrom(serviceDescriptor.getUrl());
+      checkArgument(
+          socketAddress instanceof DomainSocketAddress,
+          "%s requires a Unix domain socket address, got %s",
+          EpollDomainSocket.class.getSimpleName(),
+          serviceDescriptor.getUrl());
+      return createServer(services, (DomainSocketAddress) socketAddress);
+    }
+
+    private static Server createServer(
+        List<BindableService> services, DomainSocketAddress domainSocket) throws IOException {
+      NettyServerBuilder builder =
+          NettyServerBuilder.forAddress(domainSocket)
+              .channelType(EpollServerDomainSocketChannel.class)
+              .workerEventLoopGroup(new EpollEventLoopGroup())
+              .bossEventLoopGroup(new EpollEventLoopGroup())
+              .maxMessageSize(Integer.MAX_VALUE);
+      for (BindableService service : services) {
+        // Wrap the service to extract headers
+        builder.addService(
+            ServerInterceptors.intercept(service, GrpcContextHeaderAccessorProvider.interceptor()));
+      }
+      return builder.build().start();
+    }
+  }
+
+  /**
+   * Creates a {@link Server gRPC Server} using an Epoll socket. Note that this requires <a
+   * href="http://netty.io/wiki/forked-tomcat-native.html">Netty TcNative</a> available to be able
+   * to provide a {@link EpollServerSocketChannel}.
+   *
+   * <p>The server is created listening any open port on "localhost".
+   */
+  private static class EpollSocket extends ServerFactory {
+    @Override
+    public Server allocateAddressAndCreate(
+        List<BindableService> services, Endpoints.ApiServiceDescriptor.Builder apiServiceDescriptor)
+        throws IOException {
+      InetSocketAddress address = new InetSocketAddress(InetAddress.getLoopbackAddress(), 0);
+      Server server = createServer(services, address);
+      apiServiceDescriptor.setUrl(
+          HostAndPort.fromParts(address.getHostName(), server.getPort()).toString());
       return server;
     }
+
+    @Override
+    public Server create(
+        List<BindableService> services, Endpoints.ApiServiceDescriptor serviceDescriptor)
+        throws IOException {
+      SocketAddress socketAddress = SocketAddressFactory.createFrom(serviceDescriptor.getUrl());
+      checkArgument(
+          socketAddress instanceof InetSocketAddress,
+          "%s requires a host:port socket address, got %s",
+          EpollSocket.class.getSimpleName(),
+          serviceDescriptor.getUrl());
+      return createServer(services, (InetSocketAddress) socketAddress);
+    }
+
+    private static Server createServer(List<BindableService> services, InetSocketAddress socket)
+        throws IOException {
+      ServerBuilder builder =
+          NettyServerBuilder.forAddress(socket)
+              .channelType(EpollServerSocketChannel.class)
+              .workerEventLoopGroup(new EpollEventLoopGroup())
+              .bossEventLoopGroup(new EpollEventLoopGroup())
+              .maxMessageSize(Integer.MAX_VALUE);
+      for (BindableService service : services) {
+        // Wrap the service to extract headers
+        builder.addService(
+            ServerInterceptors.intercept(service, GrpcContextHeaderAccessorProvider.interceptor()));
+      }
+      return builder.build().start();
+    }
   }
 
   /**
diff --git a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/GrpcContextHeaderAccessorProviderTest.java b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/GrpcContextHeaderAccessorProviderTest.java
index ece8e83..8d146b7 100644
--- a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/GrpcContextHeaderAccessorProviderTest.java
+++ b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/GrpcContextHeaderAccessorProviderTest.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.runners.fnexecution;
 
+import com.google.common.collect.ImmutableList;
 import java.util.concurrent.CompletableFuture;
 import java.util.function.Consumer;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi;
@@ -54,7 +55,8 @@ public class GrpcContextHeaderAccessorProviderTest {
     TestDataService testService = new TestDataService(Mockito.mock(StreamObserver.class), consumer);
     ApiServiceDescriptor serviceDescriptor =
         ApiServiceDescriptor.newBuilder().setUrl("testServer").build();
-    Server server = InProcessServerFactory.create().create(testService, serviceDescriptor);
+    Server server =
+        InProcessServerFactory.create().create(ImmutableList.of(testService), serviceDescriptor);
     final Metadata.Key<String> workerIdKey =
         Metadata.Key.of("worker_id", Metadata.ASCII_STRING_MARSHALLER);
     Channel channel =
diff --git a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/ServerFactoryTest.java b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/ServerFactoryTest.java
index 2942faa..58d7e1d 100644
--- a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/ServerFactoryTest.java
+++ b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/ServerFactoryTest.java
@@ -24,11 +24,15 @@ import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.greaterThan;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.lessThan;
+import static org.hamcrest.Matchers.startsWith;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertThat;
+import static org.junit.Assume.assumeTrue;
 
+import com.google.common.collect.ImmutableList;
 import com.google.common.net.HostAndPort;
 import com.google.common.util.concurrent.Uninterruptibles;
+import java.net.Inet4Address;
 import java.net.InetAddress;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -45,6 +49,7 @@ import org.apache.beam.vendor.grpc.v1_13_1.io.grpc.ManagedChannel;
 import org.apache.beam.vendor.grpc.v1_13_1.io.grpc.Server;
 import org.apache.beam.vendor.grpc.v1_13_1.io.grpc.stub.CallStreamObserver;
 import org.apache.beam.vendor.grpc.v1_13_1.io.grpc.stub.StreamObserver;
+import org.apache.beam.vendor.grpc.v1_13_1.io.netty.channel.epoll.Epoll;
 import org.junit.Test;
 
 /** Tests for {@link ServerFactory}. */
@@ -79,7 +84,8 @@ public class ServerFactoryTest {
         TestStreams.withOnNext((Elements unused) -> {}).withOnCompleted(() -> {}).build();
     TestDataService service = new TestDataService(observer);
     ApiServiceDescriptor.Builder descriptorBuilder = ApiServiceDescriptor.newBuilder();
-    Server server = serverFactory.allocatePortAndCreate(service, descriptorBuilder);
+    Server server =
+        serverFactory.allocateAddressAndCreate(ImmutableList.of(service), descriptorBuilder);
     // Immediately terminate server. We don't actually use it here.
     server.shutdown();
     assertThat(descriptorBuilder.getUrl(), is("foo"));
@@ -111,7 +117,7 @@ public class ServerFactoryTest {
     ApiServiceDescriptor.Builder descriptorBuilder = ApiServiceDescriptor.newBuilder();
     Server server = null;
     try {
-      server = serverFactory.allocatePortAndCreate(service, descriptorBuilder);
+      server = serverFactory.allocateAddressAndCreate(ImmutableList.of(service), descriptorBuilder);
       assertThat(descriptorBuilder.getUrl(), is("foo:65535"));
     } finally {
       if (server != null) {
@@ -120,6 +126,32 @@ public class ServerFactoryTest {
     }
   }
 
+  @Test
+  public void testCreatingEpollServer() throws Exception {
+    assumeTrue(Epoll.isAvailable());
+    // tcnative only supports the ipv4 address family
+    assumeTrue(InetAddress.getLoopbackAddress() instanceof Inet4Address);
+    Endpoints.ApiServiceDescriptor apiServiceDescriptor =
+        runTestUsing(ServerFactory.createEpollSocket(), ManagedChannelFactory.createEpoll());
+    HostAndPort hostAndPort = HostAndPort.fromString(apiServiceDescriptor.getUrl());
+    assertThat(
+        hostAndPort.getHost(),
+        anyOf(
+            equalTo(InetAddress.getLoopbackAddress().getHostName()),
+            equalTo(InetAddress.getLoopbackAddress().getHostAddress())));
+    assertThat(hostAndPort.getPort(), allOf(greaterThan(0), lessThan(65536)));
+  }
+
+  @Test
+  public void testCreatingUnixDomainSocketServer() throws Exception {
+    assumeTrue(Epoll.isAvailable());
+    Endpoints.ApiServiceDescriptor apiServiceDescriptor =
+        runTestUsing(ServerFactory.createEpollDomainSocket(), ManagedChannelFactory.createEpoll());
+    assertThat(
+        apiServiceDescriptor.getUrl(),
+        startsWith("unix://" + System.getProperty("java.io.tmpdir")));
+  }
+
   private Endpoints.ApiServiceDescriptor runTestUsing(
       ServerFactory serverFactory, ManagedChannelFactory channelFactory) throws Exception {
     Endpoints.ApiServiceDescriptor.Builder apiServiceDescriptorBuilder =
@@ -132,7 +164,9 @@ public class ServerFactoryTest {
             .withOnCompleted(clientHangedUp::countDown)
             .build();
     TestDataService service = new TestDataService(serverInboundObserver);
-    Server server = serverFactory.allocatePortAndCreate(service, apiServiceDescriptorBuilder);
+    Server server =
+        serverFactory.allocateAddressAndCreate(
+            ImmutableList.of(service), apiServiceDescriptorBuilder);
     assertFalse(server.isShutdown());
 
     ManagedChannel channel = channelFactory.forDescriptor(apiServiceDescriptorBuilder.build());