You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2018/12/03 22:38:22 UTC
[beam] branch master updated: [BEAM-6160] Use service server rather
than service (#7168)
This is an automated email from the ASF dual-hosted git repository.
lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 78c1a10 [BEAM-6160] Use service server rather than service (#7168)
78c1a10 is described below
commit 78c1a105aa44fb892b155f3f336c262491e76362
Author: Boyuan Zhang <36...@users.noreply.github.com>
AuthorDate: Mon Dec 3 14:38:14 2018 -0800
[BEAM-6160] Use service server rather than service (#7168)
---
.../dataflow/worker/BatchDataflowWorker.java | 4 +-
.../worker/BeamFnMapTaskExecutorFactory.java | 13 +-
.../worker/DataflowMapTaskExecutorFactory.java | 9 +-
.../dataflow/worker/DataflowRunnerHarness.java | 21 +-
.../worker/IntrinsicMapTaskExecutorFactory.java | 9 +-
.../dataflow/worker/SdkHarnessRegistries.java | 15 +-
.../dataflow/worker/SdkHarnessRegistry.java | 9 +-
.../dataflow/worker/StreamingDataflowWorker.java | 4 +-
.../runners/dataflow/worker/fn/ServerFactory.java | 229 -------------------
.../worker/fn/data/BeamFnDataGrpcService.java | 78 ++++---
.../worker/fn/BeamFnControlServiceTest.java | 7 +-
.../dataflow/worker/fn/ServerFactoryTest.java | 244 ---------------------
.../beam/runners/fnexecution/GrpcFnServer.java | 17 +-
.../fnexecution/InProcessServerFactory.java | 31 ++-
.../beam/runners/fnexecution/ServerFactory.java | 184 +++++++++++++---
.../GrpcContextHeaderAccessorProviderTest.java | 4 +-
.../runners/fnexecution/ServerFactoryTest.java | 40 +++-
17 files changed, 333 insertions(+), 585 deletions(-)
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchDataflowWorker.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchDataflowWorker.java
index b103dce..2666dea 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchDataflowWorker.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BatchDataflowWorker.java
@@ -336,9 +336,9 @@ public class BatchDataflowWorker implements Closeable {
worker =
mapTaskExecutorFactory.create(
sdkWorkerHarness.getControlClientHandler(),
- sdkWorkerHarness.getDataService(),
+ sdkWorkerHarness.getGrpcDataFnServer(),
sdkHarnessRegistry.beamFnDataApiServiceDescriptor(),
- sdkWorkerHarness.getStateService(),
+ sdkWorkerHarness.getGrpcStateFnServer(),
network,
options,
stageName,
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BeamFnMapTaskExecutorFactory.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BeamFnMapTaskExecutorFactory.java
index 067b221..8e191e2 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BeamFnMapTaskExecutorFactory.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/BeamFnMapTaskExecutorFactory.java
@@ -55,6 +55,7 @@ import org.apache.beam.runners.dataflow.worker.counters.CounterSet;
import org.apache.beam.runners.dataflow.worker.counters.NameContext;
import org.apache.beam.runners.dataflow.worker.fn.control.BeamFnMapTaskExecutor;
import org.apache.beam.runners.dataflow.worker.fn.control.RegisterAndProcessBundleOperation;
+import org.apache.beam.runners.dataflow.worker.fn.data.BeamFnDataGrpcService;
import org.apache.beam.runners.dataflow.worker.fn.data.RemoteGrpcPortReadOperation;
import org.apache.beam.runners.dataflow.worker.fn.data.RemoteGrpcPortWriteOperation;
import org.apache.beam.runners.dataflow.worker.graph.Edges.Edge;
@@ -82,8 +83,10 @@ import org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation;
import org.apache.beam.runners.dataflow.worker.util.common.worker.Receiver;
import org.apache.beam.runners.dataflow.worker.util.common.worker.Sink;
import org.apache.beam.runners.dataflow.worker.util.common.worker.WriteOperation;
+import org.apache.beam.runners.fnexecution.GrpcFnServer;
import org.apache.beam.runners.fnexecution.control.InstructionRequestHandler;
import org.apache.beam.runners.fnexecution.data.FnDataService;
+import org.apache.beam.runners.fnexecution.state.GrpcStateService;
import org.apache.beam.runners.fnexecution.state.StateDelegator;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
@@ -115,9 +118,9 @@ public class BeamFnMapTaskExecutorFactory implements DataflowMapTaskExecutorFact
@Override
public DataflowMapTaskExecutor create(
InstructionRequestHandler instructionRequestHandler,
- FnDataService beamFnDataService,
+ GrpcFnServer<BeamFnDataGrpcService.DataService> grpcDataFnServer,
Endpoints.ApiServiceDescriptor dataApiServiceDescriptor,
- StateDelegator beamFnStateDelegator,
+ GrpcFnServer<GrpcStateService> grpcStateFnServer,
MutableNetwork<Node, Edge> network,
PipelineOptions options,
String stageName,
@@ -143,7 +146,7 @@ public class BeamFnMapTaskExecutorFactory implements DataflowMapTaskExecutorFact
createOperationTransformForRegisterFnNodes(
idGenerator,
instructionRequestHandler,
- beamFnStateDelegator,
+ grpcStateFnServer.getService(),
stageName,
executionContext));
@@ -153,7 +156,7 @@ public class BeamFnMapTaskExecutorFactory implements DataflowMapTaskExecutorFact
network,
createOperationTransformForGrpcPortNodes(
network,
- beamFnDataService,
+ grpcDataFnServer.getService(),
// TODO: Set NameContext properly for these operations.
executionContext.createOperationContext(
NameContext.create(stageName, stageName, stageName, stageName))));
@@ -165,7 +168,7 @@ public class BeamFnMapTaskExecutorFactory implements DataflowMapTaskExecutorFact
network,
idGenerator,
instructionRequestHandler,
- beamFnDataService,
+ grpcDataFnServer.getService(),
dataApiServiceDescriptor,
executionContext,
stageName));
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowMapTaskExecutorFactory.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowMapTaskExecutorFactory.java
index 56b2cbc..b6bcb5a 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowMapTaskExecutorFactory.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowMapTaskExecutorFactory.java
@@ -22,11 +22,12 @@ import com.google.common.graph.MutableNetwork;
import java.util.function.Supplier;
import org.apache.beam.model.pipeline.v1.Endpoints;
import org.apache.beam.runners.dataflow.worker.counters.CounterSet;
+import org.apache.beam.runners.dataflow.worker.fn.data.BeamFnDataGrpcService;
import org.apache.beam.runners.dataflow.worker.graph.Edges.Edge;
import org.apache.beam.runners.dataflow.worker.graph.Nodes.Node;
+import org.apache.beam.runners.fnexecution.GrpcFnServer;
import org.apache.beam.runners.fnexecution.control.InstructionRequestHandler;
-import org.apache.beam.runners.fnexecution.data.FnDataService;
-import org.apache.beam.runners.fnexecution.state.StateDelegator;
+import org.apache.beam.runners.fnexecution.state.GrpcStateService;
import org.apache.beam.sdk.options.PipelineOptions;
/** Creates a {@link DataflowMapTaskExecutor} from a {@link MapTask} definition. */
@@ -38,9 +39,9 @@ public interface DataflowMapTaskExecutorFactory {
*/
DataflowMapTaskExecutor create(
InstructionRequestHandler instructionRequestHandler,
- FnDataService beamFnDataService,
+ GrpcFnServer<BeamFnDataGrpcService.DataService> grpcDataFnServer,
Endpoints.ApiServiceDescriptor dataApiServiceDescriptor,
- StateDelegator beamFnStateDelegator,
+ GrpcFnServer<GrpcStateService> grpcStateFnServer,
MutableNetwork<Node, Edge> network,
PipelineOptions options,
String stageName,
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowRunnerHarness.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowRunnerHarness.java
index c4f3736..8fb610b 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowRunnerHarness.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowRunnerHarness.java
@@ -26,14 +26,16 @@ import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.beam.model.pipeline.v1.Endpoints.ApiServiceDescriptor;
import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.runners.dataflow.DataflowRunner;
+import org.apache.beam.runners.dataflow.options.DataflowPipelineDebugOptions;
import org.apache.beam.runners.dataflow.options.DataflowWorkerHarnessOptions;
import org.apache.beam.runners.dataflow.worker.fn.BeamFnControlService;
-import org.apache.beam.runners.dataflow.worker.fn.ServerFactory;
import org.apache.beam.runners.dataflow.worker.fn.data.BeamFnDataGrpcService;
import org.apache.beam.runners.dataflow.worker.fn.logging.BeamFnLoggingService;
import org.apache.beam.runners.dataflow.worker.fn.stream.ServerStreamObserverFactory;
import org.apache.beam.runners.dataflow.worker.logging.DataflowWorkerLoggingInitializer;
import org.apache.beam.runners.fnexecution.GrpcContextHeaderAccessorProvider;
+import org.apache.beam.runners.fnexecution.ServerFactory;
import org.apache.beam.runners.fnexecution.control.FnApiControlClient;
import org.apache.beam.runners.fnexecution.state.GrpcStateService;
import org.apache.beam.sdk.io.FileSystems;
@@ -76,7 +78,16 @@ public class DataflowRunnerHarness {
// Initialized registered file systems.˜
FileSystems.setDefaultPipelineOptions(pipelineOptions);
- ServerFactory serverFactory = ServerFactory.fromOptions(pipelineOptions);
+ DataflowPipelineDebugOptions dataflowOptions =
+ pipelineOptions.as(DataflowPipelineDebugOptions.class);
+ ServerFactory serverFactory;
+ if (DataflowRunner.hasExperiment(dataflowOptions, "beam_fn_api_epoll_domain_socket")) {
+ serverFactory = ServerFactory.createEpollDomainSocket();
+ } else if (DataflowRunner.hasExperiment(dataflowOptions, "beam_fn_api_epoll")) {
+ serverFactory = ServerFactory.createEpollSocket();
+ } else {
+ serverFactory = ServerFactory.createDefault();
+ }
ServerStreamObserverFactory streamObserverFactory =
ServerStreamObserverFactory.fromOptions(pipelineOptions);
@@ -103,11 +114,11 @@ public class DataflowRunnerHarness {
servicesServer =
serverFactory.create(
- controlApiService,
- ImmutableList.of(beamFnControlService, beamFnDataService, beamFnStateService));
+ ImmutableList.of(beamFnControlService, beamFnDataService, beamFnStateService),
+ controlApiService);
loggingServer =
- serverFactory.create(loggingApiService, ImmutableList.of(beamFnLoggingService));
+ serverFactory.create(ImmutableList.of(beamFnLoggingService), loggingApiService);
start(
pipeline,
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/IntrinsicMapTaskExecutorFactory.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/IntrinsicMapTaskExecutorFactory.java
index 2bd0c1e..e632719b 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/IntrinsicMapTaskExecutorFactory.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/IntrinsicMapTaskExecutorFactory.java
@@ -47,6 +47,7 @@ import org.apache.beam.runners.dataflow.util.CloudObjects;
import org.apache.beam.runners.dataflow.worker.counters.CounterFactory;
import org.apache.beam.runners.dataflow.worker.counters.CounterSet;
import org.apache.beam.runners.dataflow.worker.counters.NameContext;
+import org.apache.beam.runners.dataflow.worker.fn.data.BeamFnDataGrpcService.DataService;
import org.apache.beam.runners.dataflow.worker.graph.Edges.Edge;
import org.apache.beam.runners.dataflow.worker.graph.Edges.MultiOutputInfoEdge;
import org.apache.beam.runners.dataflow.worker.graph.Networks;
@@ -69,9 +70,9 @@ import org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation;
import org.apache.beam.runners.dataflow.worker.util.common.worker.Receiver;
import org.apache.beam.runners.dataflow.worker.util.common.worker.Sink;
import org.apache.beam.runners.dataflow.worker.util.common.worker.WriteOperation;
+import org.apache.beam.runners.fnexecution.GrpcFnServer;
import org.apache.beam.runners.fnexecution.control.InstructionRequestHandler;
-import org.apache.beam.runners.fnexecution.data.FnDataService;
-import org.apache.beam.runners.fnexecution.state.StateDelegator;
+import org.apache.beam.runners.fnexecution.state.GrpcStateService;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.options.PipelineOptions;
@@ -98,9 +99,9 @@ public class IntrinsicMapTaskExecutorFactory implements DataflowMapTaskExecutorF
@Override
public DataflowMapTaskExecutor create(
InstructionRequestHandler instructionRequestHandler,
- FnDataService beamFnDataService,
+ GrpcFnServer<DataService> grpcDataFnServer,
Endpoints.ApiServiceDescriptor dataApiServiceDescriptor,
- StateDelegator beamFnStateDelegator,
+ GrpcFnServer<GrpcStateService> grpcStateFnServer,
MutableNetwork<Node, Edge> network,
PipelineOptions options,
String stageName,
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SdkHarnessRegistries.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SdkHarnessRegistries.java
index 3e3284e..a0d54ea 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SdkHarnessRegistries.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SdkHarnessRegistries.java
@@ -26,8 +26,8 @@ import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nullable;
import org.apache.beam.model.pipeline.v1.Endpoints.ApiServiceDescriptor;
import org.apache.beam.runners.dataflow.worker.fn.data.BeamFnDataGrpcService;
+import org.apache.beam.runners.fnexecution.GrpcFnServer;
import org.apache.beam.runners.fnexecution.control.FnApiControlClient;
-import org.apache.beam.runners.fnexecution.data.FnDataService;
import org.apache.beam.runners.fnexecution.state.GrpcStateService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -187,14 +187,15 @@ public class SdkHarnessRegistries {
@Override
@Nullable
- public FnDataService getDataService() {
- return beamFnDataGrpcService.getDataService(getWorkerId());
+ public GrpcFnServer<BeamFnDataGrpcService.DataService> getGrpcDataFnServer() {
+ return GrpcFnServer.create(
+ beamFnDataGrpcService.getDataService(getWorkerId()), beamFnDataApiServiceDescriptor());
}
@Override
@Nullable
- public GrpcStateService getStateService() {
- return beamFnStateService;
+ public GrpcFnServer<GrpcStateService> getGrpcStateFnServer() {
+ return GrpcFnServer.create(beamFnStateService, beamFnDataApiServiceDescriptor());
}
}
}
@@ -228,13 +229,13 @@ public class SdkHarnessRegistries {
@Nullable
@Override
- public FnDataService getDataService() {
+ public GrpcFnServer<BeamFnDataGrpcService.DataService> getGrpcDataFnServer() {
return null;
}
@Nullable
@Override
- public GrpcStateService getStateService() {
+ public GrpcFnServer<GrpcStateService> getGrpcStateFnServer() {
return null;
}
};
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SdkHarnessRegistry.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SdkHarnessRegistry.java
index 8c11d14..383d292 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SdkHarnessRegistry.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SdkHarnessRegistry.java
@@ -19,9 +19,10 @@ package org.apache.beam.runners.dataflow.worker;
import javax.annotation.Nullable;
import org.apache.beam.model.pipeline.v1.Endpoints.ApiServiceDescriptor;
+import org.apache.beam.runners.dataflow.worker.fn.data.BeamFnDataGrpcService;
+import org.apache.beam.runners.fnexecution.GrpcFnServer;
import org.apache.beam.runners.fnexecution.control.FnApiControlClient;
-import org.apache.beam.runners.fnexecution.data.FnDataService;
-import org.apache.beam.runners.fnexecution.state.StateDelegator;
+import org.apache.beam.runners.fnexecution.state.GrpcStateService;
/** Registry used to manage all the connections (Control, Data, State) from SdkHarness */
public interface SdkHarnessRegistry {
@@ -60,9 +61,9 @@ public interface SdkHarnessRegistry {
public String getWorkerId();
@Nullable
- public FnDataService getDataService();
+ public GrpcFnServer<BeamFnDataGrpcService.DataService> getGrpcDataFnServer();
@Nullable
- public StateDelegator getStateService();
+ public GrpcFnServer<GrpcStateService> getGrpcStateFnServer();
}
}
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
index 070973b..80556a8 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
@@ -1143,9 +1143,9 @@ public class StreamingDataflowWorker {
DataflowMapTaskExecutor mapTaskExecutor =
mapTaskExecutorFactory.create(
worker.getControlClientHandler(),
- worker.getDataService(),
+ worker.getGrpcDataFnServer(),
sdkHarnessRegistry.beamFnDataApiServiceDescriptor(),
- worker.getStateService(),
+ worker.getGrpcStateFnServer(),
mapTaskNetwork,
options,
mapTask.getStageName(),
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/ServerFactory.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/ServerFactory.java
deleted file mode 100644
index 616ce0f..0000000
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/ServerFactory.java
+++ /dev/null
@@ -1,229 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.dataflow.worker.fn;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-import com.google.common.net.HostAndPort;
-import java.io.File;
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import java.util.List;
-import org.apache.beam.model.pipeline.v1.Endpoints;
-import org.apache.beam.runners.dataflow.DataflowRunner;
-import org.apache.beam.runners.dataflow.options.DataflowPipelineDebugOptions;
-import org.apache.beam.runners.fnexecution.GrpcContextHeaderAccessorProvider;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.vendor.grpc.v1_13_1.io.grpc.BindableService;
-import org.apache.beam.vendor.grpc.v1_13_1.io.grpc.Server;
-import org.apache.beam.vendor.grpc.v1_13_1.io.grpc.ServerBuilder;
-import org.apache.beam.vendor.grpc.v1_13_1.io.grpc.ServerInterceptors;
-import org.apache.beam.vendor.grpc.v1_13_1.io.grpc.netty.NettyServerBuilder;
-import org.apache.beam.vendor.grpc.v1_13_1.io.netty.channel.epoll.EpollEventLoopGroup;
-import org.apache.beam.vendor.grpc.v1_13_1.io.netty.channel.epoll.EpollServerDomainSocketChannel;
-import org.apache.beam.vendor.grpc.v1_13_1.io.netty.channel.epoll.EpollServerSocketChannel;
-import org.apache.beam.vendor.grpc.v1_13_1.io.netty.channel.unix.DomainSocketAddress;
-import org.apache.beam.vendor.grpc.v1_13_1.io.netty.util.internal.ThreadLocalRandom;
-
-/**
- * A {@link Server gRPC Server} factory that returns a server based upon {@link PipelineOptions}
- * experiments. <br>
- * TODO: Kill {@link ServerFactory} instead use {@link
- * org.apache.beam.runners.fnexecution.ServerFactory}.
- */
-@Deprecated
-public abstract class ServerFactory {
- public static ServerFactory fromOptions(PipelineOptions options) {
- DataflowPipelineDebugOptions dataflowOptions = options.as(DataflowPipelineDebugOptions.class);
- if (DataflowRunner.hasExperiment(dataflowOptions, "beam_fn_api_epoll_domain_socket")) {
- return new EpollDomainSocket();
- } else if (DataflowRunner.hasExperiment(dataflowOptions, "beam_fn_api_epoll")) {
- return new EpollSocket();
- }
- return new Default();
- }
-
- /**
- * Allocates a port for a server using an ephemeral port chosen automatically. The chosen port is
- * accessible to the caller from the URL set in the input {@link
- * Endpoints.ApiServiceDescriptor.Builder}. Server applies {@link
- * GrpcContextHeaderAccessorProvider#interceptor()} to all incoming requests.
- */
- public abstract Server allocatePortAndCreate(
- Endpoints.ApiServiceDescriptor.Builder builder, List<BindableService> services)
- throws IOException;
-
- /**
- * Creates an instance of this server at the address specified by the given service descriptor.
- * Server applies {@link GrpcContextHeaderAccessorProvider#interceptor()} to all incoming
- * requests.
- */
- public abstract Server create(
- Endpoints.ApiServiceDescriptor serviceDescriptor, List<BindableService> services)
- throws IOException;
-
- /**
- * Creates a {@link Server gRPC Server} using a Unix domain socket. Note that this requires <a
- * href="http://netty.io/wiki/forked-tomcat-native.html">Netty TcNative</a> available to be able
- * to provide a {@link EpollServerDomainSocketChannel}.
- *
- * <p>The unix domain socket is located at ${java.io.tmpdir}/fnapi${random[0-10000)}.sock
- */
- private static class EpollDomainSocket extends ServerFactory {
- private static File getFileForPort(int port) {
- return new File(System.getProperty("java.io.tmpdir"), String.format("fnapi%d.sock", port));
- }
-
- @Override
- public Server allocatePortAndCreate(
- Endpoints.ApiServiceDescriptor.Builder apiServiceDescriptor, List<BindableService> services)
- throws IOException {
- File tmp;
- do {
- tmp = getFileForPort(ThreadLocalRandom.current().nextInt(10000));
- } while (tmp.exists());
- apiServiceDescriptor.setUrl("unix://" + tmp.getAbsolutePath());
- return create(apiServiceDescriptor.build(), services);
- }
-
- @Override
- public Server create(
- Endpoints.ApiServiceDescriptor serviceDescriptor, List<BindableService> services)
- throws IOException {
- SocketAddress socketAddress = SocketAddressFactory.createFrom(serviceDescriptor.getUrl());
- checkArgument(
- socketAddress instanceof DomainSocketAddress,
- "%s requires a Unix domain socket address, got %s",
- EpollDomainSocket.class.getSimpleName(),
- serviceDescriptor.getUrl());
- return createServer((DomainSocketAddress) socketAddress, services);
- }
-
- private static Server createServer(
- DomainSocketAddress domainSocket, List<BindableService> services) throws IOException {
- NettyServerBuilder builder =
- NettyServerBuilder.forAddress(domainSocket)
- .channelType(EpollServerDomainSocketChannel.class)
- .workerEventLoopGroup(new EpollEventLoopGroup())
- .bossEventLoopGroup(new EpollEventLoopGroup())
- .maxMessageSize(Integer.MAX_VALUE);
- for (BindableService service : services) {
- // Wrap the service to extract headers
- builder.addService(
- ServerInterceptors.intercept(service, GrpcContextHeaderAccessorProvider.interceptor()));
- }
- return builder.build().start();
- }
- }
-
- /**
- * Creates a {@link Server gRPC Server} using an Epoll socket. Note that this requires <a
- * href="http://netty.io/wiki/forked-tomcat-native.html">Netty TcNative</a> available to be able
- * to provide a {@link EpollServerSocketChannel}.
- *
- * <p>The server is created listening any open port on "localhost".
- */
- private static class EpollSocket extends ServerFactory {
- @Override
- public Server allocatePortAndCreate(
- Endpoints.ApiServiceDescriptor.Builder apiServiceDescriptor, List<BindableService> services)
- throws IOException {
- InetSocketAddress address = new InetSocketAddress(InetAddress.getLoopbackAddress(), 0);
- Server server = createServer(address, services);
- apiServiceDescriptor.setUrl(
- HostAndPort.fromParts(address.getHostName(), server.getPort()).toString());
- return server;
- }
-
- @Override
- public Server create(
- Endpoints.ApiServiceDescriptor serviceDescriptor, List<BindableService> services)
- throws IOException {
- SocketAddress socketAddress = SocketAddressFactory.createFrom(serviceDescriptor.getUrl());
- checkArgument(
- socketAddress instanceof InetSocketAddress,
- "%s requires a host:port socket address, got %s",
- EpollSocket.class.getSimpleName(),
- serviceDescriptor.getUrl());
- return createServer((InetSocketAddress) socketAddress, services);
- }
-
- private static Server createServer(InetSocketAddress socket, List<BindableService> services)
- throws IOException {
- ServerBuilder builder =
- NettyServerBuilder.forAddress(socket)
- .channelType(EpollServerSocketChannel.class)
- .workerEventLoopGroup(new EpollEventLoopGroup())
- .bossEventLoopGroup(new EpollEventLoopGroup())
- .maxMessageSize(Integer.MAX_VALUE);
- for (BindableService service : services) {
- // Wrap the service to extract headers
- builder.addService(
- ServerInterceptors.intercept(service, GrpcContextHeaderAccessorProvider.interceptor()));
- }
- return builder.build().start();
- }
- }
-
- /**
- * Creates a {@link Server gRPC Server} using the default server factory.
- *
- * <p>The server is created listening any open port on "localhost".
- */
- private static class Default extends ServerFactory {
- @Override
- public Server allocatePortAndCreate(
- Endpoints.ApiServiceDescriptor.Builder apiServiceDescriptor, List<BindableService> services)
- throws IOException {
- InetSocketAddress address = new InetSocketAddress(InetAddress.getLoopbackAddress(), 0);
- Server server = createServer(address, services);
- apiServiceDescriptor.setUrl(
- HostAndPort.fromParts(address.getHostName(), server.getPort()).toString());
- return server;
- }
-
- @Override
- public Server create(
- Endpoints.ApiServiceDescriptor serviceDescriptor, List<BindableService> services)
- throws IOException {
- SocketAddress socketAddress = SocketAddressFactory.createFrom(serviceDescriptor.getUrl());
- checkArgument(
- socketAddress instanceof InetSocketAddress,
- "Default ServerFactory requires a host:port socket address, got %s",
- serviceDescriptor.getUrl());
- return createServer((InetSocketAddress) socketAddress, services);
- }
-
- private static Server createServer(InetSocketAddress socket, List<BindableService> services)
- throws IOException {
- NettyServerBuilder builder =
- NettyServerBuilder.forPort(socket.getPort())
- // Set the message size to max value here. The actual size is governed by the
- // buffer size in the layers above.
- .maxMessageSize(Integer.MAX_VALUE);
- for (BindableService service : services) {
- // Wrap the service to extract headers
- builder.addService(
- ServerInterceptors.intercept(service, GrpcContextHeaderAccessorProvider.interceptor()));
- }
- return builder.build().start();
- }
- }
-}
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/data/BeamFnDataGrpcService.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/data/BeamFnDataGrpcService.java
index be295f9..d81b902 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/data/BeamFnDataGrpcService.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/fn/data/BeamFnDataGrpcService.java
@@ -32,6 +32,7 @@ import org.apache.beam.model.fnexecution.v1.BeamFnApi.Elements;
import org.apache.beam.model.fnexecution.v1.BeamFnDataGrpc;
import org.apache.beam.model.pipeline.v1.Endpoints;
import org.apache.beam.runners.dataflow.worker.fn.grpc.BeamFnService;
+import org.apache.beam.runners.fnexecution.FnService;
import org.apache.beam.runners.fnexecution.HeaderAccessor;
import org.apache.beam.runners.fnexecution.data.FnDataService;
import org.apache.beam.sdk.coders.Coder;
@@ -204,42 +205,55 @@ public class BeamFnDataGrpcService extends BeamFnDataGrpc.BeamFnDataImplBase
}
}
- /** Get the DataService for the clientId */
- public FnDataService getDataService(final String clientId) {
- return new FnDataService() {
- @Override
- public <T> InboundDataClient receive(
- LogicalEndpoint inputLocation,
- Coder<WindowedValue<T>> coder,
- FnDataReceiver<WindowedValue<T>> consumer) {
- LOG.debug("Registering consumer for {}", inputLocation);
+ // A wrapper class
+ public class DataService extends BeamFnDataGrpc.BeamFnDataImplBase
+ implements FnDataService, FnService {
+ private final String clientId;
- return new DeferredInboundDataClient(clientId, inputLocation, coder, consumer);
- }
+ public DataService(String clientId) {
+ this.clientId = clientId;
+ }
- @Override
- public <T> CloseableFnDataReceiver<WindowedValue<T>> send(
- LogicalEndpoint outputLocation, Coder<WindowedValue<T>> coder) {
- LOG.debug("Creating output consumer for {}", outputLocation);
- try {
- if (outboundBufferLimit.isPresent()) {
- return BeamFnDataBufferingOutboundObserver.forLocationWithBufferLimit(
- outboundBufferLimit.get(),
- outputLocation,
- coder,
- getClientFuture(clientId).get().getOutboundObserver());
- } else {
- return BeamFnDataBufferingOutboundObserver.forLocation(
- outputLocation, coder, getClientFuture(clientId).get().getOutboundObserver());
- }
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new RuntimeException(e);
- } catch (ExecutionException e) {
- throw new RuntimeException(e);
+ @Override
+ public <T> InboundDataClient receive(
+ LogicalEndpoint inputLocation,
+ Coder<WindowedValue<T>> coder,
+ FnDataReceiver<WindowedValue<T>> consumer) {
+ LOG.debug("Registering consumer for {}", inputLocation);
+
+ return new DeferredInboundDataClient(this.clientId, inputLocation, coder, consumer);
+ }
+
+ @Override
+ public <T> CloseableFnDataReceiver<WindowedValue<T>> send(
+ LogicalEndpoint outputLocation, Coder<WindowedValue<T>> coder) {
+ LOG.debug("Creating output consumer for {}", outputLocation);
+ try {
+ if (outboundBufferLimit.isPresent()) {
+ return BeamFnDataBufferingOutboundObserver.forLocationWithBufferLimit(
+ outboundBufferLimit.get(),
+ outputLocation,
+ coder,
+ getClientFuture(this.clientId).get().getOutboundObserver());
+ } else {
+ return BeamFnDataBufferingOutboundObserver.forLocation(
+ outputLocation, coder, getClientFuture(this.clientId).get().getOutboundObserver());
}
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(e);
+ } catch (ExecutionException e) {
+ throw new RuntimeException(e);
}
- };
+ }
+
+ @Override
+ public void close() throws Exception {}
+ }
+
+ /** Get the DataService for the clientId */
+ public DataService getDataService(final String clientId) {
+ return new DataService(clientId);
}
@Override
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/BeamFnControlServiceTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/BeamFnControlServiceTest.java
index 4620ecb..b660732 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/BeamFnControlServiceTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/BeamFnControlServiceTest.java
@@ -31,6 +31,7 @@ import org.apache.beam.model.fnexecution.v1.BeamFnControlGrpc;
import org.apache.beam.model.pipeline.v1.Endpoints;
import org.apache.beam.runners.dataflow.worker.fn.stream.ServerStreamObserverFactory;
import org.apache.beam.runners.fnexecution.GrpcContextHeaderAccessorProvider;
+import org.apache.beam.runners.fnexecution.ServerFactory;
import org.apache.beam.runners.fnexecution.control.FnApiControlClient;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
@@ -73,8 +74,7 @@ public class BeamFnControlServiceTest {
descriptor,
ServerStreamObserverFactory.fromOptions(options)::from,
GrpcContextHeaderAccessorProvider.getHeaderAccessor());
- Server server =
- ServerFactory.fromOptions(options).create(descriptor, ImmutableList.of(service));
+ Server server = ServerFactory.createDefault().create(ImmutableList.of(service), descriptor);
String url = service.getApiServiceDescriptor().getUrl();
BeamFnControlGrpc.BeamFnControlStub clientStub =
BeamFnControlGrpc.newStub(ManagedChannelBuilder.forTarget(url).usePlaintext(true).build());
@@ -102,8 +102,7 @@ public class BeamFnControlServiceTest {
descriptor,
ServerStreamObserverFactory.fromOptions(options)::from,
GrpcContextHeaderAccessorProvider.getHeaderAccessor());
- Server server =
- ServerFactory.fromOptions(options).create(descriptor, ImmutableList.of(service));
+ Server server = ServerFactory.createDefault().create(ImmutableList.of(service), descriptor);
String url = service.getApiServiceDescriptor().getUrl();
BeamFnControlGrpc.BeamFnControlStub clientStub =
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/ServerFactoryTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/ServerFactoryTest.java
deleted file mode 100644
index 70b9980..0000000
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/fn/ServerFactoryTest.java
+++ /dev/null
@@ -1,244 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.dataflow.worker.fn;
-
-import static org.hamcrest.Matchers.allOf;
-import static org.hamcrest.Matchers.anyOf;
-import static org.hamcrest.Matchers.contains;
-import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.greaterThan;
-import static org.hamcrest.Matchers.lessThan;
-import static org.hamcrest.Matchers.startsWith;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assume.assumeTrue;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.net.HostAndPort;
-import com.google.common.util.concurrent.Uninterruptibles;
-import java.net.Inet4Address;
-import java.net.InetAddress;
-import java.net.SocketAddress;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-import org.apache.beam.model.fnexecution.v1.BeamFnApi;
-import org.apache.beam.model.fnexecution.v1.BeamFnDataGrpc;
-import org.apache.beam.model.pipeline.v1.Endpoints;
-import org.apache.beam.model.pipeline.v1.Endpoints.ApiServiceDescriptor;
-import org.apache.beam.runners.dataflow.harness.test.TestStreams;
-import org.apache.beam.runners.dataflow.options.DataflowPipelineDebugOptions;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.vendor.grpc.v1_13_1.io.grpc.ManagedChannel;
-import org.apache.beam.vendor.grpc.v1_13_1.io.grpc.ManagedChannelBuilder;
-import org.apache.beam.vendor.grpc.v1_13_1.io.grpc.Server;
-import org.apache.beam.vendor.grpc.v1_13_1.io.grpc.netty.NettyChannelBuilder;
-import org.apache.beam.vendor.grpc.v1_13_1.io.grpc.stub.CallStreamObserver;
-import org.apache.beam.vendor.grpc.v1_13_1.io.grpc.stub.StreamObserver;
-import org.apache.beam.vendor.grpc.v1_13_1.io.netty.channel.epoll.Epoll;
-import org.apache.beam.vendor.grpc.v1_13_1.io.netty.channel.epoll.EpollDomainSocketChannel;
-import org.apache.beam.vendor.grpc.v1_13_1.io.netty.channel.epoll.EpollEventLoopGroup;
-import org.apache.beam.vendor.grpc.v1_13_1.io.netty.channel.epoll.EpollSocketChannel;
-import org.apache.beam.vendor.grpc.v1_13_1.io.netty.channel.unix.DomainSocketAddress;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/** Tests for {@link ServerFactory}. */
-@RunWith(JUnit4.class)
-public class ServerFactoryTest {
- private static final BeamFnApi.Elements CLIENT_DATA =
- BeamFnApi.Elements.newBuilder()
- .addData(BeamFnApi.Elements.Data.newBuilder().setInstructionReference("1"))
- .build();
- private static final BeamFnApi.Elements SERVER_DATA =
- BeamFnApi.Elements.newBuilder()
- .addData(BeamFnApi.Elements.Data.newBuilder().setInstructionReference("1"))
- .build();
-
- @Test
- public void testCreatingDefaultServer() throws Exception {
- Endpoints.ApiServiceDescriptor apiServiceDescriptor =
- runTestUsing(PipelineOptionsFactory.create());
- HostAndPort hostAndPort = HostAndPort.fromString(apiServiceDescriptor.getUrl());
- assertThat(
- hostAndPort.getHost(),
- anyOf(
- equalTo(InetAddress.getLoopbackAddress().getHostName()),
- equalTo(InetAddress.getLoopbackAddress().getHostAddress())));
- assertThat(hostAndPort.getPort(), allOf(greaterThan(0), lessThan(65536)));
- }
-
- @Test
- public void testCreatingEpollServer() throws Exception {
- assumeTrue(Epoll.isAvailable());
- // tcnative only supports the ipv4 address family
- assumeTrue(InetAddress.getLoopbackAddress() instanceof Inet4Address);
- Endpoints.ApiServiceDescriptor apiServiceDescriptor =
- runTestUsing(
- PipelineOptionsFactory.fromArgs(new String[] {"--experiments=beam_fn_api_epoll"})
- .create());
- HostAndPort hostAndPort = HostAndPort.fromString(apiServiceDescriptor.getUrl());
- assertThat(
- hostAndPort.getHost(),
- anyOf(
- equalTo(InetAddress.getLoopbackAddress().getHostName()),
- equalTo(InetAddress.getLoopbackAddress().getHostAddress())));
- assertThat(hostAndPort.getPort(), allOf(greaterThan(0), lessThan(65536)));
- }
-
- @Test
- public void testCreatingUnixDomainSocketServer() throws Exception {
- assumeTrue(Epoll.isAvailable());
- Endpoints.ApiServiceDescriptor apiServiceDescriptor =
- runTestUsing(
- PipelineOptionsFactory.fromArgs(
- new String[] {
- "--experiments=beam_fn_api_epoll,beam_fn_api_epoll_domain_socket"
- })
- .create());
- assertThat(
- apiServiceDescriptor.getUrl(),
- startsWith("unix://" + System.getProperty("java.io.tmpdir")));
- }
-
- private Endpoints.ApiServiceDescriptor runTestUsing(PipelineOptions options) throws Exception {
- ManagedChannelFactory channelFactory = ManagedChannelFactory.from(options);
- Endpoints.ApiServiceDescriptor.Builder apiServiceDescriptorBuilder =
- Endpoints.ApiServiceDescriptor.newBuilder();
-
- Collection<BeamFnApi.Elements> serverElements = new ArrayList<>();
- CountDownLatch clientHangedUp = new CountDownLatch(1);
- CallStreamObserver<BeamFnApi.Elements> serverInboundObserver =
- TestStreams.withOnNext(serverElements::add)
- .withOnCompleted(clientHangedUp::countDown)
- .build();
- TestDataService service = new TestDataService(serverInboundObserver);
-
- ServerFactory serverFactory = ServerFactory.fromOptions(options);
- Server server =
- serverFactory.allocatePortAndCreate(apiServiceDescriptorBuilder, ImmutableList.of(service));
- assertFalse(server.isShutdown());
- ManagedChannel channel = channelFactory.forDescriptor(apiServiceDescriptorBuilder.build());
- BeamFnDataGrpc.BeamFnDataStub stub = BeamFnDataGrpc.newStub(channel);
- Collection<BeamFnApi.Elements> clientElements = new ArrayList<>();
- CountDownLatch serverHangedUp = new CountDownLatch(1);
- CallStreamObserver<BeamFnApi.Elements> clientInboundObserver =
- TestStreams.withOnNext(clientElements::add)
- .withOnCompleted(serverHangedUp::countDown)
- .build();
-
- StreamObserver<BeamFnApi.Elements> clientOutboundObserver = stub.data(clientInboundObserver);
- StreamObserver<BeamFnApi.Elements> serverOutboundObserver = service.outboundObservers.take();
-
- clientOutboundObserver.onNext(CLIENT_DATA);
- serverOutboundObserver.onNext(SERVER_DATA);
- clientOutboundObserver.onCompleted();
- clientHangedUp.await();
- serverOutboundObserver.onCompleted();
- serverHangedUp.await();
-
- assertThat(clientElements, contains(SERVER_DATA));
- assertThat(serverElements, contains(CLIENT_DATA));
- server.shutdown();
- server.awaitTermination(1, TimeUnit.SECONDS);
- server.shutdownNow();
-
- return apiServiceDescriptorBuilder.build();
- }
-
- /** A test gRPC service that uses the provided inbound observer for all clients. */
- private static class TestDataService extends BeamFnDataGrpc.BeamFnDataImplBase {
- private final LinkedBlockingQueue<StreamObserver<BeamFnApi.Elements>> outboundObservers;
- private final StreamObserver<BeamFnApi.Elements> inboundObserver;
-
- private TestDataService(StreamObserver<BeamFnApi.Elements> inboundObserver) {
- this.inboundObserver = inboundObserver;
- this.outboundObservers = new LinkedBlockingQueue<>();
- }
-
- @Override
- public StreamObserver<BeamFnApi.Elements> data(
- StreamObserver<BeamFnApi.Elements> outboundObserver) {
- Uninterruptibles.putUninterruptibly(outboundObservers, outboundObserver);
- return inboundObserver;
- }
- }
-
- /**
- * Uses {@link PipelineOptions} to configure which underlying {@link ManagedChannel}
- * implementation to use.
- *
- * <p>TODO: Remove this fork once available from a common shared library.
- */
- public abstract static class ManagedChannelFactory {
- public static ManagedChannelFactory from(PipelineOptions options) {
- List<String> experiments = options.as(DataflowPipelineDebugOptions.class).getExperiments();
- if (experiments != null && experiments.contains("beam_fn_api_epoll")) {
- org.apache.beam.vendor.grpc.v1_13_1.io.netty.channel.epoll.Epoll.ensureAvailability();
- return new Epoll();
- }
- return new Default();
- }
-
- public abstract ManagedChannel forDescriptor(ApiServiceDescriptor apiServiceDescriptor);
-
- /**
- * Creates a {@link ManagedChannel} backed by an {@link EpollDomainSocketChannel} if the address
- * is a {@link DomainSocketAddress}. Otherwise creates a {@link ManagedChannel} backed by an
- * {@link EpollSocketChannel}.
- */
- private static class Epoll extends ManagedChannelFactory {
- @Override
- public ManagedChannel forDescriptor(ApiServiceDescriptor apiServiceDescriptor) {
- SocketAddress address = SocketAddressFactory.createFrom(apiServiceDescriptor.getUrl());
- return NettyChannelBuilder.forAddress(address)
- .channelType(
- address instanceof DomainSocketAddress
- ? EpollDomainSocketChannel.class
- : EpollSocketChannel.class)
- .eventLoopGroup(new EpollEventLoopGroup())
- .usePlaintext(true)
- // Set the message size to max value here. The actual size is governed by the
- // buffer size in the layers above.
- .maxInboundMessageSize(Integer.MAX_VALUE)
- .build();
- }
- }
-
- /**
- * Creates a {@link ManagedChannel} relying on the {@link ManagedChannelBuilder} to create
- * instances.
- */
- private static class Default extends ManagedChannelFactory {
- @Override
- public ManagedChannel forDescriptor(ApiServiceDescriptor apiServiceDescriptor) {
- return ManagedChannelBuilder.forTarget(apiServiceDescriptor.getUrl())
- .usePlaintext(true)
- // Set the message size to max value here. The actual size is governed by the
- // buffer size in the layers above.
- .maxInboundMessageSize(Integer.MAX_VALUE)
- .build();
- }
- }
- }
-}
diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/GrpcFnServer.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/GrpcFnServer.java
index 52bcbaf..aabfd1f 100644
--- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/GrpcFnServer.java
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/GrpcFnServer.java
@@ -17,6 +17,7 @@
*/
package org.apache.beam.runners.fnexecution;
+import com.google.common.collect.ImmutableList;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import org.apache.beam.model.pipeline.v1.Endpoints.ApiServiceDescriptor;
@@ -33,7 +34,8 @@ public class GrpcFnServer<ServiceT extends FnService> implements AutoCloseable {
public static <ServiceT extends FnService> GrpcFnServer<ServiceT> allocatePortAndCreateFor(
ServiceT service, ServerFactory factory) throws IOException {
ApiServiceDescriptor.Builder apiServiceDescriptor = ApiServiceDescriptor.newBuilder();
- Server server = factory.allocatePortAndCreate(service, apiServiceDescriptor);
+ Server server =
+ factory.allocateAddressAndCreate(ImmutableList.of(service), apiServiceDescriptor);
return new GrpcFnServer<>(server, service, apiServiceDescriptor.build());
}
@@ -43,7 +45,18 @@ public class GrpcFnServer<ServiceT extends FnService> implements AutoCloseable {
*/
public static <ServiceT extends FnService> GrpcFnServer<ServiceT> create(
ServiceT service, ApiServiceDescriptor endpoint, ServerFactory factory) throws IOException {
- return new GrpcFnServer<>(factory.create(service, endpoint), service, endpoint);
+ return new GrpcFnServer<>(
+ factory.create(ImmutableList.of(service), endpoint), service, endpoint);
+ }
+
+ /** @deprecated This create function is used for Dataflow migration purpose only. */
+ @Deprecated
+ public static <ServiceT extends FnService> GrpcFnServer<ServiceT> create(
+ ServiceT service, ApiServiceDescriptor endpoint) {
+ return new GrpcFnServer(null, service, endpoint) {
+ @Override
+ public void close() throws Exception {}
+ };
}
private final Server server;
diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/InProcessServerFactory.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/InProcessServerFactory.java
index 9300d6b..6f6afdd 100644
--- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/InProcessServerFactory.java
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/InProcessServerFactory.java
@@ -18,6 +18,7 @@
package org.apache.beam.runners.fnexecution;
import java.io.IOException;
+import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.beam.model.pipeline.v1.Endpoints.ApiServiceDescriptor;
import org.apache.beam.vendor.grpc.v1_13_1.io.grpc.BindableService;
@@ -39,20 +40,32 @@ public class InProcessServerFactory extends ServerFactory {
private InProcessServerFactory() {}
@Override
- public Server allocatePortAndCreate(BindableService service, ApiServiceDescriptor.Builder builder)
- throws IOException {
+ public Server allocateAddressAndCreate(
+ List<BindableService> services, ApiServiceDescriptor.Builder builder) throws IOException {
String name = String.format("InProcessServer_%s", serviceNameUniqifier.getAndIncrement());
builder.setUrl(name);
- return InProcessServerBuilder.forName(name).addService(service).build().start();
+ InProcessServerBuilder serverBuilder = InProcessServerBuilder.forName(name);
+ services
+ .stream()
+ .forEach(
+ service ->
+ serverBuilder.addService(
+ ServerInterceptors.intercept(
+ service, GrpcContextHeaderAccessorProvider.interceptor())));
+ return serverBuilder.build().start();
}
@Override
- public Server create(BindableService service, ApiServiceDescriptor serviceDescriptor)
+ public Server create(List<BindableService> services, ApiServiceDescriptor serviceDescriptor)
throws IOException {
- return InProcessServerBuilder.forName(serviceDescriptor.getUrl())
- .addService(
- ServerInterceptors.intercept(service, GrpcContextHeaderAccessorProvider.interceptor()))
- .build()
- .start();
+ InProcessServerBuilder builder = InProcessServerBuilder.forName(serviceDescriptor.getUrl());
+ services
+ .stream()
+ .forEach(
+ service ->
+ builder.addService(
+ ServerInterceptors.intercept(
+ service, GrpcContextHeaderAccessorProvider.interceptor())));
+ return builder.build().start();
}
}
diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/ServerFactory.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/ServerFactory.java
index 54a758c..18818c6 100644
--- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/ServerFactory.java
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/ServerFactory.java
@@ -20,58 +20,81 @@ package org.apache.beam.runners.fnexecution;
import static com.google.common.base.Preconditions.checkArgument;
import com.google.common.net.HostAndPort;
+import java.io.File;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
+import java.util.List;
import java.util.function.Supplier;
import org.apache.beam.model.pipeline.v1.Endpoints;
import org.apache.beam.sdk.fn.channel.SocketAddressFactory;
import org.apache.beam.vendor.grpc.v1_13_1.io.grpc.BindableService;
import org.apache.beam.vendor.grpc.v1_13_1.io.grpc.Server;
+import org.apache.beam.vendor.grpc.v1_13_1.io.grpc.ServerBuilder;
import org.apache.beam.vendor.grpc.v1_13_1.io.grpc.ServerInterceptors;
import org.apache.beam.vendor.grpc.v1_13_1.io.grpc.netty.NettyServerBuilder;
+import org.apache.beam.vendor.grpc.v1_13_1.io.netty.channel.epoll.EpollEventLoopGroup;
+import org.apache.beam.vendor.grpc.v1_13_1.io.netty.channel.epoll.EpollServerDomainSocketChannel;
+import org.apache.beam.vendor.grpc.v1_13_1.io.netty.channel.epoll.EpollServerSocketChannel;
+import org.apache.beam.vendor.grpc.v1_13_1.io.netty.channel.unix.DomainSocketAddress;
+import org.apache.beam.vendor.grpc.v1_13_1.io.netty.util.internal.ThreadLocalRandom;
/** A {@link Server gRPC server} factory. */
public abstract class ServerFactory {
- /** Create a default {@link ServerFactory}. */
+ /** Create a default {@link InetSocketAddressServerFactory}. */
public static ServerFactory createDefault() {
return new InetSocketAddressServerFactory(UrlFactory.createDefault());
}
- /** Create a {@link ServerFactory} that uses the given url factory. */
+ /** Create a {@link InetSocketAddressServerFactory} that uses the given url factory. */
public static ServerFactory createWithUrlFactory(UrlFactory urlFactory) {
return new InetSocketAddressServerFactory(urlFactory);
}
- /** Create a {@link ServerFactory} that uses ports from a supplier. */
+ /** Create a {@link InetSocketAddressServerFactory} that uses ports from a supplier. */
public static ServerFactory createWithPortSupplier(Supplier<Integer> portSupplier) {
return new InetSocketAddressServerFactory(UrlFactory.createDefault(), portSupplier);
}
- /** Create a {@link ServerFactory} that uses the given url factory and ports from a supplier. */
+ /**
+ * Create a {@link InetSocketAddressServerFactory} that uses the given url factory and ports from
+ * a supplier.
+ */
public static ServerFactory createWithUrlFactoryAndPortSupplier(
UrlFactory urlFactory, Supplier<Integer> portSupplier) {
return new InetSocketAddressServerFactory(urlFactory, portSupplier);
}
+ /** Create a {@link EpollSocket}. */
+ public static ServerFactory createEpollSocket() {
+ return new EpollSocket();
+ }
+
+ /** Create a {@link EpollDomainSocket}. */
+ public static ServerFactory createEpollDomainSocket() {
+ return new EpollDomainSocket();
+ }
+
/**
- * Creates an instance of this server using an ephemeral port chosen automatically. The chosen
- * port is accessible to the caller from the URL set in the input {@link
- * Endpoints.ApiServiceDescriptor.Builder}. Server applies {@link
+ * Creates an instance of this server using an ephemeral address. The allocation of the address is
+ * server type dependent, which means the address may be a port for certain type of server, or a
+ * file path for other certain types. The chosen address is accessible to the caller from the URL
+ * set in the input {@link Endpoints.ApiServiceDescriptor.Builder}. Server applies {@link
* GrpcContextHeaderAccessorProvider#interceptor()} to all incoming requests.
*/
- public abstract Server allocatePortAndCreate(
- BindableService service, Endpoints.ApiServiceDescriptor.Builder builder) throws IOException;
+ public abstract Server allocateAddressAndCreate(
+ List<BindableService> services, Endpoints.ApiServiceDescriptor.Builder builder)
+ throws IOException;
/**
- * Creates an instance of this server at the address specified by the given service descriptor.
- * Server applies {@link GrpcContextHeaderAccessorProvider#interceptor()} to all incoming
- * requests.
+ * Creates an instance of this server at the address specified by the given service descriptor and
+ * bound to multiple services. Server applies {@link
+ * GrpcContextHeaderAccessorProvider#interceptor()} to all incoming requests.
*/
public abstract Server create(
- BindableService service, Endpoints.ApiServiceDescriptor serviceDescriptor) throws IOException;
-
+ List<BindableService> services, Endpoints.ApiServiceDescriptor serviceDescriptor)
+ throws IOException;
/**
* Creates a {@link Server gRPC Server} using the default server factory.
*
@@ -91,18 +114,19 @@ public abstract class ServerFactory {
}
@Override
- public Server allocatePortAndCreate(
- BindableService service, Endpoints.ApiServiceDescriptor.Builder apiServiceDescriptor)
+ public Server allocateAddressAndCreate(
+ List<BindableService> services, Endpoints.ApiServiceDescriptor.Builder apiServiceDescriptor)
throws IOException {
InetSocketAddress address =
new InetSocketAddress(InetAddress.getLoopbackAddress(), portSupplier.get());
- Server server = createServer(service, address);
+ Server server = createServer(services, address);
apiServiceDescriptor.setUrl(urlFactory.createUrl(address.getHostName(), server.getPort()));
return server;
}
@Override
- public Server create(BindableService service, Endpoints.ApiServiceDescriptor serviceDescriptor)
+ public Server create(
+ List<BindableService> services, Endpoints.ApiServiceDescriptor serviceDescriptor)
throws IOException {
SocketAddress socketAddress = SocketAddressFactory.createFrom(serviceDescriptor.getUrl());
checkArgument(
@@ -111,24 +135,128 @@ public abstract class ServerFactory {
getClass().getSimpleName(),
ServerFactory.class.getSimpleName(),
serviceDescriptor.getUrl());
- return createServer(service, (InetSocketAddress) socketAddress);
+ return createServer(services, (InetSocketAddress) socketAddress);
}
- private static Server createServer(BindableService service, InetSocketAddress socket)
+ private static Server createServer(List<BindableService> services, InetSocketAddress socket)
throws IOException {
- // Note: Every ServerFactory should apply GrpcContextHeaderAccessorProvider to the service.
- Server server =
+ NettyServerBuilder builder =
NettyServerBuilder.forPort(socket.getPort())
- .addService(
- ServerInterceptors.intercept(
- service, GrpcContextHeaderAccessorProvider.interceptor()))
// Set the message size to max value here. The actual size is governed by the
// buffer size in the layers above.
- .maxMessageSize(Integer.MAX_VALUE)
- .build();
- server.start();
+ .maxMessageSize(Integer.MAX_VALUE);
+ services
+ .stream()
+ .forEach(
+ service ->
+ builder.addService(
+ ServerInterceptors.intercept(
+ service, GrpcContextHeaderAccessorProvider.interceptor())));
+ return builder.build().start();
+ }
+ }
+
+ /**
+ * Creates a {@link Server gRPC Server} using a Unix domain socket. Note that this requires <a
+ * href="http://netty.io/wiki/forked-tomcat-native.html">Netty TcNative</a> available to be able
+ * to provide a {@link EpollServerDomainSocketChannel}.
+ *
+ * <p>The unix domain socket is located at ${java.io.tmpdir}/fnapi${random[0-10000)}.sock
+ */
+ private static class EpollDomainSocket extends ServerFactory {
+ private static File chooseRandomTmpFile(int port) {
+ return new File(System.getProperty("java.io.tmpdir"), String.format("fnapi%d.sock", port));
+ }
+
+ @Override
+ public Server allocateAddressAndCreate(
+ List<BindableService> services, Endpoints.ApiServiceDescriptor.Builder apiServiceDescriptor)
+ throws IOException {
+ File tmp;
+ do {
+ tmp = chooseRandomTmpFile(ThreadLocalRandom.current().nextInt(10000));
+ } while (tmp.exists());
+ apiServiceDescriptor.setUrl("unix://" + tmp.getAbsolutePath());
+ return create(services, apiServiceDescriptor.build());
+ }
+
+ @Override
+ public Server create(
+ List<BindableService> services, Endpoints.ApiServiceDescriptor serviceDescriptor)
+ throws IOException {
+ SocketAddress socketAddress = SocketAddressFactory.createFrom(serviceDescriptor.getUrl());
+ checkArgument(
+ socketAddress instanceof DomainSocketAddress,
+ "%s requires a Unix domain socket address, got %s",
+ EpollDomainSocket.class.getSimpleName(),
+ serviceDescriptor.getUrl());
+ return createServer(services, (DomainSocketAddress) socketAddress);
+ }
+
+ private static Server createServer(
+ List<BindableService> services, DomainSocketAddress domainSocket) throws IOException {
+ NettyServerBuilder builder =
+ NettyServerBuilder.forAddress(domainSocket)
+ .channelType(EpollServerDomainSocketChannel.class)
+ .workerEventLoopGroup(new EpollEventLoopGroup())
+ .bossEventLoopGroup(new EpollEventLoopGroup())
+ .maxMessageSize(Integer.MAX_VALUE);
+ for (BindableService service : services) {
+ // Wrap the service to extract headers
+ builder.addService(
+ ServerInterceptors.intercept(service, GrpcContextHeaderAccessorProvider.interceptor()));
+ }
+ return builder.build().start();
+ }
+ }
+
+ /**
+ * Creates a {@link Server gRPC Server} using an Epoll socket. Note that this requires <a
+ * href="http://netty.io/wiki/forked-tomcat-native.html">Netty TcNative</a> available to be able
+ * to provide a {@link EpollServerSocketChannel}.
+ *
+ * <p>The server is created listening any open port on "localhost".
+ */
+ private static class EpollSocket extends ServerFactory {
+ @Override
+ public Server allocateAddressAndCreate(
+ List<BindableService> services, Endpoints.ApiServiceDescriptor.Builder apiServiceDescriptor)
+ throws IOException {
+ InetSocketAddress address = new InetSocketAddress(InetAddress.getLoopbackAddress(), 0);
+ Server server = createServer(services, address);
+ apiServiceDescriptor.setUrl(
+ HostAndPort.fromParts(address.getHostName(), server.getPort()).toString());
return server;
}
+
+ @Override
+ public Server create(
+ List<BindableService> services, Endpoints.ApiServiceDescriptor serviceDescriptor)
+ throws IOException {
+ SocketAddress socketAddress = SocketAddressFactory.createFrom(serviceDescriptor.getUrl());
+ checkArgument(
+ socketAddress instanceof InetSocketAddress,
+ "%s requires a host:port socket address, got %s",
+ EpollSocket.class.getSimpleName(),
+ serviceDescriptor.getUrl());
+ return createServer(services, (InetSocketAddress) socketAddress);
+ }
+
+ private static Server createServer(List<BindableService> services, InetSocketAddress socket)
+ throws IOException {
+ ServerBuilder builder =
+ NettyServerBuilder.forAddress(socket)
+ .channelType(EpollServerSocketChannel.class)
+ .workerEventLoopGroup(new EpollEventLoopGroup())
+ .bossEventLoopGroup(new EpollEventLoopGroup())
+ .maxMessageSize(Integer.MAX_VALUE);
+ for (BindableService service : services) {
+ // Wrap the service to extract headers
+ builder.addService(
+ ServerInterceptors.intercept(service, GrpcContextHeaderAccessorProvider.interceptor()));
+ }
+ return builder.build().start();
+ }
}
/**
diff --git a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/GrpcContextHeaderAccessorProviderTest.java b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/GrpcContextHeaderAccessorProviderTest.java
index ece8e83..8d146b7 100644
--- a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/GrpcContextHeaderAccessorProviderTest.java
+++ b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/GrpcContextHeaderAccessorProviderTest.java
@@ -17,6 +17,7 @@
*/
package org.apache.beam.runners.fnexecution;
+import com.google.common.collect.ImmutableList;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
@@ -54,7 +55,8 @@ public class GrpcContextHeaderAccessorProviderTest {
TestDataService testService = new TestDataService(Mockito.mock(StreamObserver.class), consumer);
ApiServiceDescriptor serviceDescriptor =
ApiServiceDescriptor.newBuilder().setUrl("testServer").build();
- Server server = InProcessServerFactory.create().create(testService, serviceDescriptor);
+ Server server =
+ InProcessServerFactory.create().create(ImmutableList.of(testService), serviceDescriptor);
final Metadata.Key<String> workerIdKey =
Metadata.Key.of("worker_id", Metadata.ASCII_STRING_MARSHALLER);
Channel channel =
diff --git a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/ServerFactoryTest.java b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/ServerFactoryTest.java
index 2942faa..58d7e1d 100644
--- a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/ServerFactoryTest.java
+++ b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/ServerFactoryTest.java
@@ -24,11 +24,15 @@ import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThan;
+import static org.hamcrest.Matchers.startsWith;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
+import static org.junit.Assume.assumeTrue;
+import com.google.common.collect.ImmutableList;
import com.google.common.net.HostAndPort;
import com.google.common.util.concurrent.Uninterruptibles;
+import java.net.Inet4Address;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Collection;
@@ -45,6 +49,7 @@ import org.apache.beam.vendor.grpc.v1_13_1.io.grpc.ManagedChannel;
import org.apache.beam.vendor.grpc.v1_13_1.io.grpc.Server;
import org.apache.beam.vendor.grpc.v1_13_1.io.grpc.stub.CallStreamObserver;
import org.apache.beam.vendor.grpc.v1_13_1.io.grpc.stub.StreamObserver;
+import org.apache.beam.vendor.grpc.v1_13_1.io.netty.channel.epoll.Epoll;
import org.junit.Test;
/** Tests for {@link ServerFactory}. */
@@ -79,7 +84,8 @@ public class ServerFactoryTest {
TestStreams.withOnNext((Elements unused) -> {}).withOnCompleted(() -> {}).build();
TestDataService service = new TestDataService(observer);
ApiServiceDescriptor.Builder descriptorBuilder = ApiServiceDescriptor.newBuilder();
- Server server = serverFactory.allocatePortAndCreate(service, descriptorBuilder);
+ Server server =
+ serverFactory.allocateAddressAndCreate(ImmutableList.of(service), descriptorBuilder);
// Immediately terminate server. We don't actually use it here.
server.shutdown();
assertThat(descriptorBuilder.getUrl(), is("foo"));
@@ -111,7 +117,7 @@ public class ServerFactoryTest {
ApiServiceDescriptor.Builder descriptorBuilder = ApiServiceDescriptor.newBuilder();
Server server = null;
try {
- server = serverFactory.allocatePortAndCreate(service, descriptorBuilder);
+ server = serverFactory.allocateAddressAndCreate(ImmutableList.of(service), descriptorBuilder);
assertThat(descriptorBuilder.getUrl(), is("foo:65535"));
} finally {
if (server != null) {
@@ -120,6 +126,32 @@ public class ServerFactoryTest {
}
}
+ @Test
+ public void testCreatingEpollServer() throws Exception {
+ assumeTrue(Epoll.isAvailable());
+ // tcnative only supports the ipv4 address family
+ assumeTrue(InetAddress.getLoopbackAddress() instanceof Inet4Address);
+ Endpoints.ApiServiceDescriptor apiServiceDescriptor =
+ runTestUsing(ServerFactory.createEpollSocket(), ManagedChannelFactory.createEpoll());
+ HostAndPort hostAndPort = HostAndPort.fromString(apiServiceDescriptor.getUrl());
+ assertThat(
+ hostAndPort.getHost(),
+ anyOf(
+ equalTo(InetAddress.getLoopbackAddress().getHostName()),
+ equalTo(InetAddress.getLoopbackAddress().getHostAddress())));
+ assertThat(hostAndPort.getPort(), allOf(greaterThan(0), lessThan(65536)));
+ }
+
+ @Test
+ public void testCreatingUnixDomainSocketServer() throws Exception {
+ assumeTrue(Epoll.isAvailable());
+ Endpoints.ApiServiceDescriptor apiServiceDescriptor =
+ runTestUsing(ServerFactory.createEpollDomainSocket(), ManagedChannelFactory.createEpoll());
+ assertThat(
+ apiServiceDescriptor.getUrl(),
+ startsWith("unix://" + System.getProperty("java.io.tmpdir")));
+ }
+
private Endpoints.ApiServiceDescriptor runTestUsing(
ServerFactory serverFactory, ManagedChannelFactory channelFactory) throws Exception {
Endpoints.ApiServiceDescriptor.Builder apiServiceDescriptorBuilder =
@@ -132,7 +164,9 @@ public class ServerFactoryTest {
.withOnCompleted(clientHangedUp::countDown)
.build();
TestDataService service = new TestDataService(serverInboundObserver);
- Server server = serverFactory.allocatePortAndCreate(service, apiServiceDescriptorBuilder);
+ Server server =
+ serverFactory.allocateAddressAndCreate(
+ ImmutableList.of(service), apiServiceDescriptorBuilder);
assertFalse(server.isShutdown());
ManagedChannel channel = channelFactory.forDescriptor(apiServiceDescriptorBuilder.build());