You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@flink.apache.org by "Hequn Cheng (Jira)" <ji...@apache.org> on 2020/05/27 08:42:00 UTC
[jira] [Created] (FLINK-17959) Exception: "CANCELLED: call already
cancelled" is thrown when run python udf
Hequn Cheng created FLINK-17959:
-----------------------------------
Summary: Exception: "CANCELLED: call already cancelled" is thrown when run python udf
Key: FLINK-17959
URL: https://issues.apache.org/jira/browse/FLINK-17959
Project: Flink
Issue Type: Bug
Components: API / Python
Affects Versions: 1.10.1, 1.11.0
Reporter: Hequn Cheng
The exception is thrown when running Python UDF:
{code:java}
May 27, 2020 3:20:49 PM org.apache.beam.vendor.grpc.v1p21p0.io.grpc.internal.SerializingExecutor run
SEVERE: Exception while executing runnable org.apache.beam.vendor.grpc.v1p21p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1Closed@3960b30e
org.apache.beam.vendor.grpc.v1p21p0.io.grpc.StatusRuntimeException: CANCELLED: call already cancelled
at org.apache.beam.vendor.grpc.v1p21p0.io.grpc.Status.asRuntimeException(Status.java:524)
at org.apache.beam.vendor.grpc.v1p21p0.io.grpc.stub.ServerCalls$ServerCallStreamObserverImpl.onCompleted(ServerCalls.java:366)
at org.apache.beam.runners.fnexecution.state.GrpcStateService$Inbound.onError(GrpcStateService.java:145)
at org.apache.beam.vendor.grpc.v1p21p0.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onCancel(ServerCalls.java:270)
at org.apache.beam.vendor.grpc.v1p21p0.io.grpc.PartialForwardingServerCallListener.onCancel(PartialForwardingServerCallListener.java:40)
at org.apache.beam.vendor.grpc.v1p21p0.io.grpc.ForwardingServerCallListener.onCancel(ForwardingServerCallListener.java:23)
at org.apache.beam.vendor.grpc.v1p21p0.io.grpc.ForwardingServerCallListener$SimpleForwardingServerCallListener.onCancel(ForwardingServerCallListener.java:40)
at org.apache.beam.vendor.grpc.v1p21p0.io.grpc.Contexts$ContextualizedServerCallListener.onCancel(Contexts.java:96)
at org.apache.beam.vendor.grpc.v1p21p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.closed(ServerCallImpl.java:337)
at org.apache.beam.vendor.grpc.v1p21p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1Closed.runInContext(ServerImpl.java:793)
at org.apache.beam.vendor.grpc.v1p21p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
at org.apache.beam.vendor.grpc.v1p21p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
{code}
The job can output the right results however it seems something goes wrong during the shutdown procedure.
You can reproduce the exception with the following code(note: the exception happens occasionally):
{code}
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, DataTypes
from pyflink.table.descriptors import Schema, OldCsv, FileSystem
from pyflink.table.udf import udf
env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)
t_env = StreamTableEnvironment.create(env)
add = udf(lambda i, j: i + j, [DataTypes.BIGINT(), DataTypes.BIGINT()], DataTypes.BIGINT())
t_env.register_function("add", add)
t_env.connect(FileSystem().path('/tmp/input')) \
.with_format(OldCsv()
.field('a', DataTypes.BIGINT())
.field('b', DataTypes.BIGINT())) \
.with_schema(Schema()
.field('a', DataTypes.BIGINT())
.field('b', DataTypes.BIGINT())) \
.create_temporary_table('mySource')
t_env.connect(FileSystem().path('/tmp/output')) \
.with_format(OldCsv()
.field('sum', DataTypes.BIGINT())) \
.with_schema(Schema()
.field('sum', DataTypes.BIGINT())) \
.create_temporary_table('mySink')
t_env.from_path('mySource')\
.select("add(a, b)") \
.insert_into('mySink')
t_env.execute("tutorial_job")
{code}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)