You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by mx...@apache.org on 2018/12/04 13:34:34 UTC

[beam] branch release-2.9.0 updated: [BEAM-6111] Fix flaky PortableTimersExecutionTest

This is an automated email from the ASF dual-hosted git repository.

mxm pushed a commit to branch release-2.9.0
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/release-2.9.0 by this push:
     new 3b6ce57  [BEAM-6111] Fix flaky PortableTimersExecutionTest
3b6ce57 is described below

commit 3b6ce57116b11ffc37cc42409e57d5723336eedd
Author: Maximilian Michels <mx...@apache.org>
AuthorDate: Fri Nov 30 16:24:10 2018 +0100

    [BEAM-6111] Fix flaky PortableTimersExecutionTest
    
    This is caused by executing the state completion request in GrpcStateService
    asynchronously with the default threadpool. I think this wasn't intended because
    whenCompleteAsync was used instead of whenComplete. The default thread pool in
    the test is Flink's thread pool which doesn't take care to log exceptions.
    
    https://github.com/apache/beam/blob/c526f6bf62a1d63c7181eb7252c134e42d5c8677/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/state/GrpcStateService.java#L82
    
    With the current implementation the StateRequest will always be a completed
    CompleteableFuture anyways, so there is no need to schedule asynchronously.
    
    The following exception was thrown here:
    https://github.com/apache/beam/blob/c526f6bf62a1d63c7181eb7252c134e42d5c8677/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/state/GrpcStateService.java#L85
    
    ```
    java.lang.IllegalStateException: sendHeaders has already been called
    ```
---
 .../org/apache/beam/runners/fnexecution/state/GrpcStateService.java     | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/state/GrpcStateService.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/state/GrpcStateService.java
index eee0305..88738f3 100644
--- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/state/GrpcStateService.java
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/state/GrpcStateService.java
@@ -128,7 +128,7 @@ public class GrpcStateService extends BeamFnStateGrpc.BeamFnStateImplBase
           requestHandlers.getOrDefault(request.getInstructionReference(), this::handlerNotFound);
       try {
         CompletionStage<StateResponse.Builder> result = handler.handle(request);
-        result.whenCompleteAsync(
+        result.whenComplete(
             (StateResponse.Builder responseBuilder, Throwable t) ->
                 // note that this is threadsafe if and only if outboundObserver is threadsafe.
                 outboundObserver.onNext(