You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@flink.apache.org by "Hequn Cheng (Jira)" <ji...@apache.org> on 2020/05/27 08:42:00 UTC

[jira] [Created] (FLINK-17959) Exception: "CANCELLED: call already cancelled" is thrown when run python udf

Hequn Cheng created FLINK-17959:
-----------------------------------

             Summary: Exception: "CANCELLED: call already cancelled" is thrown when run python udf
                 Key: FLINK-17959
                 URL: https://issues.apache.org/jira/browse/FLINK-17959
             Project: Flink
          Issue Type: Bug
          Components: API / Python
    Affects Versions: 1.10.1, 1.11.0
            Reporter: Hequn Cheng


The exception is thrown when running Python UDF:
{code:java}
May 27, 2020 3:20:49 PM org.apache.beam.vendor.grpc.v1p21p0.io.grpc.internal.SerializingExecutor run
SEVERE: Exception while executing runnable org.apache.beam.vendor.grpc.v1p21p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1Closed@3960b30e
org.apache.beam.vendor.grpc.v1p21p0.io.grpc.StatusRuntimeException: CANCELLED: call already cancelled
	at org.apache.beam.vendor.grpc.v1p21p0.io.grpc.Status.asRuntimeException(Status.java:524)
	at org.apache.beam.vendor.grpc.v1p21p0.io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl.onCompleted(ServerCalls.java:366)
	at org.apache.beam.runners.fnexecution.state.GrpcStateService$Inbound.onError(GrpcStateService.java:145)
	at org.apache.beam.vendor.grpc.v1p21p0.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onCancel(ServerCalls.java:270)
	at org.apache.beam.vendor.grpc.v1p21p0.io.grpc.PartialForwardingServerCallListener.onCancel(PartialForwardingServerCallListener.java:40)
	at org.apache.beam.vendor.grpc.v1p21p0.io.grpc.ForwardingServerCallListener.onCancel(ForwardingServerCallListener.java:23)
	at org.apache.beam.vendor.grpc.v1p21p0.io.grpc.ForwardingServerCallListener$SimpleForwardingServerCallListener.onCancel(ForwardingServerCallListener.java:40)
	at org.apache.beam.vendor.grpc.v1p21p0.io.grpc.Contexts$ContextualizedServerCallListener.onCancel(Contexts.java:96)
	at org.apache.beam.vendor.grpc.v1p21p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.closed(ServerCallImpl.java:337)
	at org.apache.beam.vendor.grpc.v1p21p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1Closed.runInContext(ServerImpl.java:793)
	at org.apache.beam.vendor.grpc.v1p21p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
	at org.apache.beam.vendor.grpc.v1p21p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
{code}

The job can output the right results however it seems something goes wrong during the shutdown procedure.

You can reproduce the exception with the following code(note: the exception happens occasionally):
{code}
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, DataTypes
from pyflink.table.descriptors import Schema, OldCsv, FileSystem
from pyflink.table.udf import udf

env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)
t_env = StreamTableEnvironment.create(env)

add = udf(lambda i, j: i + j, [DataTypes.BIGINT(), DataTypes.BIGINT()], DataTypes.BIGINT())

t_env.register_function("add", add)

t_env.connect(FileSystem().path('/tmp/input')) \
    .with_format(OldCsv()
                 .field('a', DataTypes.BIGINT())
                 .field('b', DataTypes.BIGINT())) \
    .with_schema(Schema()
                 .field('a', DataTypes.BIGINT())
                 .field('b', DataTypes.BIGINT())) \
    .create_temporary_table('mySource')

t_env.connect(FileSystem().path('/tmp/output')) \
    .with_format(OldCsv()
                 .field('sum', DataTypes.BIGINT())) \
    .with_schema(Schema()
                 .field('sum', DataTypes.BIGINT())) \
    .create_temporary_table('mySink')

t_env.from_path('mySource')\
    .select("add(a, b)") \
    .insert_into('mySink')

t_env.execute("tutorial_job")
{code}




--
This message was sent by Atlassian Jira
(v8.3.4#803005)