You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by di...@apache.org on 2020/06/03 15:11:19 UTC

[flink] 02/03: [FLINK-17959][python] Port Beam GrpcStateService class to flink-python module

This is an automated email from the ASF dual-hosted git repository.

dianfu pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit de05d81df34a7e7ccfc9e5fb5d75c4cc3cfa1006
Author: Dian Fu <di...@apache.org>
AuthorDate: Wed Jun 3 16:43:18 2020 +0800

    [FLINK-17959][python] Port Beam GrpcStateService class to flink-python module
    
    This closes #12459.
---
 .../fnexecution/state/GrpcStateService.java        | 167 +++++++++++++++++++++
 1 file changed, 167 insertions(+)

diff --git a/flink-python/src/main/java/org/apache/beam/runners/fnexecution/state/GrpcStateService.java b/flink-python/src/main/java/org/apache/beam/runners/fnexecution/state/GrpcStateService.java
new file mode 100644
index 0000000..9c72d81
--- /dev/null
+++ b/flink-python/src/main/java/org/apache/beam/runners/fnexecution/state/GrpcStateService.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.fnexecution.state;
+
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Throwables.getStackTraceAsString;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ConcurrentMap;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateRequest;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateResponse;
+import org.apache.beam.model.fnexecution.v1.BeamFnStateGrpc;
+import org.apache.beam.runners.fnexecution.FnService;
+import org.apache.beam.vendor.grpc.v1p21p0.io.grpc.stub.ServerCallStreamObserver;
+import org.apache.beam.vendor.grpc.v1p21p0.io.grpc.stub.StreamObserver;
+
+/** An implementation of the Beam Fn State service. */
+public class GrpcStateService extends BeamFnStateGrpc.BeamFnStateImplBase
+    implements StateDelegator, FnService {
+  /** Create a new {@link GrpcStateService}. */
+  public static GrpcStateService create() {
+    return new GrpcStateService();
+  }
+
+  private final ConcurrentLinkedQueue<Inbound> clients;
+  private final ConcurrentMap<String, StateRequestHandler> requestHandlers;
+
+  private GrpcStateService() {
+    this.requestHandlers = new ConcurrentHashMap<>();
+    this.clients = new ConcurrentLinkedQueue<>();
+  }
+
+  @Override
+  public void close() throws Exception {
+    Exception thrown = null;
+    for (Inbound inbound : clients) {
+      try {
+        // the call may be cancelled because the sdk harness hung up
+        // (we terminate the environment before terminating the service endpoints)
+        if (inbound.outboundObserver instanceof ServerCallStreamObserver) {
+          if (((ServerCallStreamObserver) inbound.outboundObserver).isCancelled()) {
+            // skip to avoid call already closed exception
+            continue;
+          }
+        }
+        inbound.outboundObserver.onCompleted();
+      } catch (Exception t) {
+        if (thrown == null) {
+          thrown = t;
+        } else {
+          thrown.addSuppressed(t);
+        }
+      }
+    }
+    if (thrown != null) {
+      throw thrown;
+    }
+  }
+
+  @Override
+  public StreamObserver<StateRequest> state(StreamObserver<StateResponse> responseObserver) {
+    Inbound rval = new Inbound(responseObserver);
+    clients.add(rval);
+    return rval;
+  }
+
+  @Override
+  public StateDelegator.Registration registerForProcessBundleInstructionId(
+      String processBundleInstructionId, StateRequestHandler handler) {
+    requestHandlers.putIfAbsent(processBundleInstructionId, handler);
+    return new Registration(processBundleInstructionId);
+  }
+
+  private class Registration implements StateDelegator.Registration {
+    private final String processBundleInstructionId;
+
+    private Registration(String processBundleInstructionId) {
+      this.processBundleInstructionId = processBundleInstructionId;
+    }
+
+    @Override
+    public void deregister() {
+      requestHandlers.remove(processBundleInstructionId);
+    }
+
+    @Override
+    public void abort() {
+      deregister();
+      // TODO: Abort in-flight state requests. Flag this processBundleInstructionId as a fail.
+    }
+  }
+
+  /**
+   * An inbound {@link StreamObserver} which delegates requests to registered handlers.
+   *
+   * <p>Is only threadsafe if the outbound observer is threadsafe.
+   *
+   * <p>TODO: Handle when the client indicates completion or an error on the inbound stream and
+   * there are pending requests.
+   */
+  private class Inbound implements StreamObserver<StateRequest> {
+    private final StreamObserver<StateResponse> outboundObserver;
+
+    Inbound(StreamObserver<StateResponse> outboundObserver) {
+      this.outboundObserver = outboundObserver;
+    }
+
+    @Override
+    public void onNext(StateRequest request) {
+      StateRequestHandler handler =
+          requestHandlers.getOrDefault(request.getInstructionId(), this::handlerNotFound);
+      try {
+        CompletionStage<StateResponse.Builder> result = handler.handle(request);
+        result.whenComplete(
+            (StateResponse.Builder responseBuilder, Throwable t) ->
+                // note that this is threadsafe if and only if outboundObserver is threadsafe.
+                outboundObserver.onNext(
+                    t == null
+                        ? responseBuilder.setId(request.getId()).build()
+                        : createErrorResponse(request.getId(), t)));
+      } catch (Exception e) {
+        outboundObserver.onNext(createErrorResponse(request.getId(), e));
+      }
+    }
+
+    @Override
+    public void onError(Throwable t) {
+      outboundObserver.onCompleted();
+    }
+
+    @Override
+    public void onCompleted() {
+      outboundObserver.onCompleted();
+    }
+
+    private CompletionStage<StateResponse.Builder> handlerNotFound(StateRequest request) {
+      CompletableFuture<StateResponse.Builder> result = new CompletableFuture<>();
+      result.complete(
+          StateResponse.newBuilder()
+              .setError(
+                  String.format(
+                      "Unknown process bundle instruction id '%s'", request.getInstructionId())));
+      return result;
+    }
+
+    private StateResponse createErrorResponse(String id, Throwable t) {
+      return StateResponse.newBuilder().setId(id).setError(getStackTraceAsString(t)).build();
+    }
+  }
+}