Posted to commits@flink.apache.org by di...@apache.org on 2020/06/03 15:15:14 UTC

[flink] branch release-1.10 updated (ffc3d86 -> c70460ad)

This is an automated email from the ASF dual-hosted git repository.

dianfu pushed a change to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from ffc3d86  [FLINK-18012] Deactivate slot timeout when calling TaskSlotTable.tryMarkSlotActive
     new bd2ca49  [FLINK-17959][checkstyle] Exclude all beam classes
     new eafc602  [FLINK-17959][python] Port Beam GrpcStateService class to flink-python module
     new c70460ad [FLINK-17959][python] Fix the 'call already cancelled' exception when executing Python UDF

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../fnexecution/state/GrpcStateService.java        | 171 +++++++++++++++++++++
 tools/maven/suppressions.xml                       |   3 +-
 2 files changed, 172 insertions(+), 2 deletions(-)
 create mode 100644 flink-python/src/main/java/org/apache/beam/runners/fnexecution/state/GrpcStateService.java


[flink] 01/03: [FLINK-17959][checkstyle] Exclude all beam classes

Posted by di...@apache.org.

dianfu pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git

commit bd2ca499263a9b5819d3cc2320f59ad5cb7ef5e9
Author: Dian Fu <di...@apache.org>
AuthorDate: Wed Jun 3 17:22:39 2020 +0800

    [FLINK-17959][checkstyle] Exclude all beam classes
    
    This closes #12459.
---
 tools/maven/suppressions.xml | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/tools/maven/suppressions.xml b/tools/maven/suppressions.xml
index 0a18753..117dd85 100644
--- a/tools/maven/suppressions.xml
+++ b/tools/maven/suppressions.xml
@@ -29,8 +29,7 @@ under the License.
 		<suppress files="org[\\/]apache[\\/]calcite.*" checks="[a-zA-Z0-9]*"/>
 
 		<!-- Temporarily fix TM Metaspace memory leak caused by Apache Beam sdk harness. -->
-		<suppress files="org[\\/]apache[\\/]beam[\\/]vendor[\\/]grpc[\\/]v1p21p0[\\/]io[\\/]netty[\\/]buffer.*.java" checks="[a-zA-Z0-9]*"/>
-		<suppress files="org[\\/]apache[\\/]beam[\\/]runners[\\/]fnexecution[\\/]environment.*.java" checks="[a-zA-Z0-9]*"/>
+		<suppress files="org[\\/]apache[\\/]beam.*.java" checks="[a-zA-Z0-9]*"/>
 
 		<!-- Python streaming API follows python naming conventions -->
 		<suppress
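
Note on the widened pattern: the single new `files` expression replaces the two narrower Beam entries, so it should also cover the GrpcStateService.java file added under org/apache/beam in this push. The sketch below checks a few paths against that expression. It assumes the expression is searched for anywhere in the path rather than matched against the whole path; the class name BeamSuppressionCheck and the sample paths are made up for illustration.

```java
import java.util.regex.Pattern;

final class BeamSuppressionCheck {
    // The `files` attribute of the new <suppress> element, written as a Java string literal.
    // The character class [\\/] accepts either path separator.
    static final Pattern BEAM_FILES = Pattern.compile("org[\\\\/]apache[\\\\/]beam.*.java");

    // Assumption: a path is suppressed when the pattern is found anywhere in it.
    static boolean isSuppressed(String path) {
        return BEAM_FILES.matcher(path).find();
    }

    public static void main(String[] args) {
        String[] samplePaths = {
            "flink-python/src/main/java/org/apache/beam/runners/fnexecution/state/GrpcStateService.java",
            "org\\apache\\beam\\vendor\\Example.java",
            "flink-core/src/main/java/org/apache/flink/util/Example.java"
        };
        for (String path : samplePaths) {
            System.out.println(isSuppressed(path) + "  " + path);
        }
    }
}
```

The dot before `java` is unescaped in the expression, so it matches any character, not only a literal dot. That makes the pattern slightly broader than "Java files under org/apache/beam".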


[flink] 03/03: [FLINK-17959][python] Fix the 'call already cancelled' exception when executing Python UDF

Posted by di...@apache.org.

dianfu pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git

commit c70460ad878a5c33aa4a16a054e21e94df524d81
Author: Dian Fu <di...@apache.org>
AuthorDate: Wed Jun 3 16:45:18 2020 +0800

    [FLINK-17959][python] Fix the 'call already cancelled' exception when executing Python UDF
    
    This closes #12459.
---
 .../org/apache/beam/runners/fnexecution/state/GrpcStateService.java  | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/flink-python/src/main/java/org/apache/beam/runners/fnexecution/state/GrpcStateService.java b/flink-python/src/main/java/org/apache/beam/runners/fnexecution/state/GrpcStateService.java
index 9081778..010232a 100644
--- a/flink-python/src/main/java/org/apache/beam/runners/fnexecution/state/GrpcStateService.java
+++ b/flink-python/src/main/java/org/apache/beam/runners/fnexecution/state/GrpcStateService.java
@@ -142,7 +142,10 @@ public class GrpcStateService extends BeamFnStateGrpc.BeamFnStateImplBase
 
     @Override
     public void onError(Throwable t) {
-      outboundObserver.onCompleted();
+      if (!t.getMessage().contains("cancelled before receiving half close")) {
+        // ignore the exception "cancelled before receiving half close" as we don't care about it.
+        outboundObserver.onError(t);
+      }
     }
 
     @Override
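
Note on the message check: the new guard calls t.getMessage() directly, and Throwable.getMessage() may return null for exceptions created without a message. The sketch below shows one null-safe way to express the same decision. It is not the committed code; the class OnErrorFilter and the method shouldForward are hypothetical names, and treating a null message as "forward the error" is an assumption.

```java
final class OnErrorFilter {
    // Message fragment taken from the patch above.
    static final String IGNORED_FRAGMENT = "cancelled before receiving half close";

    // Hypothetical helper: returns true when the error should be passed on
    // to the outbound observer, false when it should be ignored.
    static boolean shouldForward(Throwable t) {
        String message = t.getMessage(); // null when the exception carries no message
        return message == null || !message.contains(IGNORED_FRAGMENT);
    }

    public static void main(String[] args) {
        System.out.println(shouldForward(new RuntimeException()));
        System.out.println(shouldForward(new RuntimeException("CANCELLED: cancelled before receiving half close")));
        System.out.println(shouldForward(new IllegalStateException("boom")));
    }
}
```

In the observer, the guard would then read `if (OnErrorFilter.shouldForward(t)) { outboundObserver.onError(t); }`, again only as a sketch.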


[flink] 02/03: [FLINK-17959][python] Port Beam GrpcStateService class to flink-python module

Posted by di...@apache.org.

dianfu pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git

commit eafc602022668af68e520e734f9faa15eaa6ac61
Author: Dian Fu <di...@apache.org>
AuthorDate: Wed Jun 3 17:24:26 2020 +0800

    [FLINK-17959][python] Port Beam GrpcStateService class to flink-python module
    
    This closes #12459.
---
 .../fnexecution/state/GrpcStateService.java        | 168 +++++++++++++++++++++
 1 file changed, 168 insertions(+)

diff --git a/flink-python/src/main/java/org/apache/beam/runners/fnexecution/state/GrpcStateService.java b/flink-python/src/main/java/org/apache/beam/runners/fnexecution/state/GrpcStateService.java
new file mode 100644
index 0000000..9081778
--- /dev/null
+++ b/flink-python/src/main/java/org/apache/beam/runners/fnexecution/state/GrpcStateService.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.fnexecution.state;
+
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Throwables.getStackTraceAsString;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ConcurrentMap;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateRequest;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateResponse;
+import org.apache.beam.model.fnexecution.v1.BeamFnStateGrpc;
+import org.apache.beam.runners.fnexecution.FnService;
+import org.apache.beam.vendor.grpc.v1p21p0.io.grpc.stub.ServerCallStreamObserver;
+import org.apache.beam.vendor.grpc.v1p21p0.io.grpc.stub.StreamObserver;
+
+/** An implementation of the Beam Fn State service. */
+public class GrpcStateService extends BeamFnStateGrpc.BeamFnStateImplBase
+    implements StateDelegator, FnService {
+  /** Create a new {@link GrpcStateService}. */
+  public static GrpcStateService create() {
+    return new GrpcStateService();
+  }
+
+  private final ConcurrentLinkedQueue<Inbound> clients;
+  private final ConcurrentMap<String, StateRequestHandler> requestHandlers;
+
+  private GrpcStateService() {
+    this.requestHandlers = new ConcurrentHashMap<>();
+    this.clients = new ConcurrentLinkedQueue<>();
+  }
+
+  @Override
+  public void close() throws Exception {
+    Exception thrown = null;
+    for (Inbound inbound : clients) {
+      try {
+        // the call may be cancelled because the sdk harness hung up
+        // (we terminate the environment before terminating the service endpoints)
+        if (inbound.outboundObserver instanceof ServerCallStreamObserver) {
+          if (((ServerCallStreamObserver) inbound.outboundObserver).isCancelled()) {
+            // skip to avoid call already closed exception
+            continue;
+          }
+        }
+        inbound.outboundObserver.onCompleted();
+      } catch (Exception t) {
+        if (thrown == null) {
+          thrown = t;
+        } else {
+          thrown.addSuppressed(t);
+        }
+      }
+    }
+    if (thrown != null) {
+      throw thrown;
+    }
+  }
+
+  @Override
+  public StreamObserver<StateRequest> state(StreamObserver<StateResponse> responseObserver) {
+    Inbound rval = new Inbound(responseObserver);
+    clients.add(rval);
+    return rval;
+  }
+
+  @Override
+  public StateDelegator.Registration registerForProcessBundleInstructionId(
+      String processBundleInstructionId, StateRequestHandler handler) {
+    requestHandlers.putIfAbsent(processBundleInstructionId, handler);
+    return new Registration(processBundleInstructionId);
+  }
+
+  private class Registration implements StateDelegator.Registration {
+    private final String processBundleInstructionId;
+
+    private Registration(String processBundleInstructionId) {
+      this.processBundleInstructionId = processBundleInstructionId;
+    }
+
+    @Override
+    public void deregister() {
+      requestHandlers.remove(processBundleInstructionId);
+    }
+
+    @Override
+    public void abort() {
+      deregister();
+      // TODO: Abort in-flight state requests. Flag this processBundleInstructionId as a fail.
+    }
+  }
+
+  /**
+   * An inbound {@link StreamObserver} which delegates requests to registered handlers.
+   *
+   * <p>Is only threadsafe if the outbound observer is threadsafe.
+   *
+   * <p>TODO: Handle when the client indicates completion or an error on the inbound stream and
+   * there are pending requests.
+   */
+  private class Inbound implements StreamObserver<StateRequest> {
+    private final StreamObserver<StateResponse> outboundObserver;
+
+    Inbound(StreamObserver<StateResponse> outboundObserver) {
+      this.outboundObserver = outboundObserver;
+    }
+
+    @Override
+    public void onNext(StateRequest request) {
+      StateRequestHandler handler =
+          requestHandlers.getOrDefault(request.getInstructionReference(), this::handlerNotFound);
+      try {
+        CompletionStage<StateResponse.Builder> result = handler.handle(request);
+        result.whenComplete(
+            (StateResponse.Builder responseBuilder, Throwable t) ->
+                // note that this is threadsafe if and only if outboundObserver is threadsafe.
+                outboundObserver.onNext(
+                    t == null
+                        ? responseBuilder.setId(request.getId()).build()
+                        : createErrorResponse(request.getId(), t)));
+      } catch (Exception e) {
+        outboundObserver.onNext(createErrorResponse(request.getId(), e));
+      }
+    }
+
+    @Override
+    public void onError(Throwable t) {
+      outboundObserver.onCompleted();
+    }
+
+    @Override
+    public void onCompleted() {
+      outboundObserver.onCompleted();
+    }
+
+    private CompletionStage<StateResponse.Builder> handlerNotFound(StateRequest request) {
+      CompletableFuture<StateResponse.Builder> result = new CompletableFuture<>();
+      result.complete(
+          StateResponse.newBuilder()
+              .setError(
+                  String.format(
+                      "Unknown process bundle instruction id '%s'",
+                      request.getInstructionReference())));
+      return result;
+    }
+
+    private StateResponse createErrorResponse(String id, Throwable t) {
+      return StateResponse.newBuilder().setId(id).setError(getStackTraceAsString(t)).build();
+    }
+  }
+}
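
Note on the dispatch logic: the ported class keeps one handler per process bundle instruction id, and routes each request to the matching handler or to a fallback that reports an unknown id. The sketch below reproduces only that lookup pattern with plain strings in place of the Beam request and response types. DispatchSketch, Handler, register and dispatch are hypothetical names, and unlike the original, which writes to the outbound observer from a callback, the sketch blocks on the result to keep the example short.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

final class DispatchSketch {
    // Hypothetical stand-in for StateRequestHandler.
    interface Handler {
        CompletionStage<String> handle(String instructionId, String payload);
    }

    private final ConcurrentMap<String, Handler> handlers = new ConcurrentHashMap<>();

    // Like registerForProcessBundleInstructionId: the first handler registered
    // for an id is kept. The returned Runnable removes the entry for that id.
    Runnable register(String instructionId, Handler handler) {
        handlers.putIfAbsent(instructionId, handler);
        return () -> handlers.remove(instructionId);
    }

    // Like Inbound.onNext: look up the handler, fall back to an error response
    // for unknown ids, and turn failures into an error string.
    String dispatch(String instructionId, String payload) {
        Handler handler = handlers.getOrDefault(instructionId, DispatchSketch::handlerNotFound);
        try {
            return handler.handle(instructionId, payload)
                .handle((response, t) -> t == null ? response : "error: " + t)
                .toCompletableFuture()
                .join(); // simplification: the original does not block here
        } catch (Exception e) {
            return "error: " + e;
        }
    }

    private static CompletionStage<String> handlerNotFound(String instructionId, String payload) {
        return CompletableFuture.completedFuture(
            String.format("Unknown process bundle instruction id '%s'", instructionId));
    }

    public static void main(String[] args) {
        DispatchSketch service = new DispatchSketch();
        Runnable deregister = service.register(
            "bundle-1", (id, payload) -> CompletableFuture.completedFuture("ok:" + payload));
        System.out.println(service.dispatch("bundle-1", "get")); // prints "ok:get"
        deregister.run();
        // After deregistration the fallback handler answers.
        System.out.println(service.dispatch("bundle-1", "get"));
    }
}
```

Using getOrDefault with a fallback handler means an unknown id yields an ordinary error response instead of a null check at every call site, which is the same choice the ported class makes.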