You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/11/17 20:30:57 UTC
[06/50] [abbrv] beam git commit: Add A FnService, FnServer to runner Fn Execution
Add A FnService, FnServer to runner Fn Execution
A FnService is a closeable BindableService. A FnServer is exactly one
FnService and a server which exposes it as an endpoint.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/bd4146d4
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/bd4146d4
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/bd4146d4
Branch: refs/heads/tez-runner
Commit: bd4146d4eebe744ba9453a245e32f397394f9366
Parents: 6e4781b
Author: Thomas Groh <tg...@google.com>
Authored: Mon Oct 30 17:36:37 2017 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Fri Nov 10 10:21:55 2017 -0800
----------------------------------------------------------------------
.../beam/runners/fnexecution/FnService.java | 24 ++++++
.../beam/runners/fnexecution/GrpcFnServer.java | 88 ++++++++++++++++++++
2 files changed, 112 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/bd4146d4/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/FnService.java
----------------------------------------------------------------------
diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/FnService.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/FnService.java
new file mode 100644
index 0000000..9ea0fce
--- /dev/null
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/FnService.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.fnexecution;
+
+import io.grpc.BindableService;
+
+/** A closeable gRPC {@link BindableService} used during the execution of user Fns. */
+public interface FnService extends AutoCloseable, BindableService {}
http://git-wip-us.apache.org/repos/asf/beam/blob/bd4146d4/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/GrpcFnServer.java
----------------------------------------------------------------------
diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/GrpcFnServer.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/GrpcFnServer.java
new file mode 100644
index 0000000..9f3dd3d
--- /dev/null
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/GrpcFnServer.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.fnexecution;
+
+import io.grpc.Server;
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+import org.apache.beam.model.pipeline.v1.Endpoints.ApiServiceDescriptor;
+
+/**
+ * A {@link Server gRPC Server} which manages a single {@link FnService}. The server is started
+ * on construction; closing the {@link GrpcFnServer} shuts it down and closes the service.
+ */
+public class GrpcFnServer<ServiceT extends FnService> implements AutoCloseable {
+ /**
+ * Create and start a {@link GrpcFnServer} for the provided {@link FnService} on an arbitrary
+ * port, using the server returned by {@code factory.allocatePortAndCreate}.
+ */
+ public static <ServiceT extends FnService> GrpcFnServer<ServiceT> allocatePortAndCreateFor(
+ ServiceT service, ServerFactory factory) throws IOException {
+ ApiServiceDescriptor.Builder apiServiceDescriptor = ApiServiceDescriptor.newBuilder();
+ Server server = factory.allocatePortAndCreate(service, apiServiceDescriptor);
+ return new GrpcFnServer<>(server, service, apiServiceDescriptor.build());
+ }
+
+ /**
+ * Create and start a {@link GrpcFnServer} for the provided {@link FnService} at the endpoint
+ * specified in the {@link ApiServiceDescriptor}.
+ */
+ public static <ServiceT extends FnService> GrpcFnServer<ServiceT> create(
+ ServiceT service, ApiServiceDescriptor endpoint, ServerFactory factory) throws IOException {
+ return new GrpcFnServer<>(factory.create(service, endpoint), service, endpoint);
+ }
+
+ private final Server server;
+ private final ServiceT service;
+ private final ApiServiceDescriptor apiServiceDescriptor;
+
+ private GrpcFnServer(Server server, ServiceT service, ApiServiceDescriptor apiServiceDescriptor)
+ throws IOException {
+ this.server = server;
+ this.service = service;
+ this.apiServiceDescriptor = apiServiceDescriptor;
+ server.start();
+ }
+
+ /**
+ * Get the {@link ApiServiceDescriptor} this {@link GrpcFnServer} was constructed with, which
+ * describes the endpoint it is bound to.
+ */
+ public ApiServiceDescriptor getApiServiceDescriptor() {
+ return apiServiceDescriptor;
+ }
+
+ /** Get the service exposed by this {@link GrpcFnServer}. */
+ public ServiceT getService() {
+ return service;
+ }
+
+ @Override
+ public void close() throws Exception {
+ try {
+ // Stop accepting new calls, close the service, then allow up to 60 seconds to terminate.
+ server.shutdown();
+ service.close();
+ server.awaitTermination(60, TimeUnit.SECONDS);
+ } finally {
+ server.shutdownNow();
+ server.awaitTermination();
+ }
+ }
+}