Posted to user-zh@flink.apache.org by "casel.chen" <ca...@126.com> on 2023/04/15 09:02:12 UTC

ValueError: unsupported pickle protocol: 5

I am trying to submit a PyFlink job to Kubernetes, following the steps in this article [1] and using this PyFlink image [2]; the Flink version is 1.15.2. Running the wordcount jar job works fine, but the PyFlink job fails with the log shown below. The Python 3.7.9 installed locally matches the version inside the PyFlink image.
Could this be a problem with the pickle package version?
How can I check which pickle version is currently installed?
Which pickle version is expected?
How do I install the expected pickle version?
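
(For reference, a minimal check, assuming only the standard library plus the cloudpickle package that pyflink pulls in: pickle is part of the Python standard library, so the protocol it can read is fixed by the interpreter version — protocol 5 only exists on Python 3.8+, while Python 3.7 tops out at protocol 4. Running the snippet below both on the submitting machine and inside the image should show whether the two sides disagree.)

import pickle
import sys
import cloudpickle  # the library doing the loads() in the traceback below

print(sys.version)              # e.g. 3.7.9 inside the image
print(pickle.HIGHEST_PROTOCOL)  # 4 on Python 3.7, 5 on Python 3.8+
print(cloudpickle.__version__)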


./bin/flink run \
-m localhost:8081 \
-py ./examples/python/table/word_count.py


2023-04-15 16:52:27
org.apache.flink.runtime.taskmanager.AsynchronousException: Caught exception while processing timer.
at org.apache.flink.streaming.runtime.tasks.StreamTask$StreamTaskAsyncExceptionHandler.handleAsyncException(StreamTask.java:1535)
at org.apache.flink.streaming.runtime.tasks.StreamTask.handleAsyncException(StreamTask.java:1510)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1650)
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$null$21(StreamTask.java:1639)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93)
at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:338)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:324)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
at java.base/java.lang.Thread.run(Unknown Source)
Caused by: TimerException{java.lang.RuntimeException: Error while waiting for BeamPythonFunctionRunner flush}
... 14 more
Caused by: java.lang.RuntimeException: Error while waiting for BeamPythonFunctionRunner flush
at org.apache.flink.streaming.api.operators.python.AbstractExternalPythonFunctionOperator.invokeFinishBundle(AbstractExternalPythonFunctionOperator.java:106)
at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.checkInvokeFinishBundleByTime(AbstractPythonFunctionOperator.java:299)
at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.lambda$open$0(AbstractPythonFunctionOperator.java:115)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1648)
... 13 more
Caused by: java.lang.RuntimeException: Failed to close remote bundle
at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.finishBundle(BeamPythonFunctionRunner.java:382)
at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.flush(BeamPythonFunctionRunner.java:366)
at org.apache.flink.streaming.api.operators.python.AbstractExternalPythonFunctionOperator.lambda$invokeFinishBundle$0(AbstractExternalPythonFunctionOperator.java:85)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
... 1 more
Caused by: java.util.concurrent.ExecutionException: java.lang.RuntimeException: Error received from SDK harness for instruction 1: Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 289, in _execute
response = task()
File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 362, in <lambda>
lambda: self.create_worker().do_instruction(request), request)
File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 607, in do_instruction
getattr(request, request_type), request.instruction_id)
File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 638, in process_bundle
instruction_id, request.process_bundle_descriptor_id)
File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 467, in get
self.data_channel_factory)
File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 868, in __init__
self.ops = self.create_execution_tree(self.process_bundle_descriptor)
File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 925, in create_execution_tree
descriptor.transforms, key=topological_height, reverse=True)])
File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 924, in <listcomp>
get_operation(transform_id))) for transform_id in sorted(
File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 812, in wrapper
result = cache[args] = func(*args)
File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 906, in get_operation
pcoll_id in descriptor.transforms[transform_id].outputs.items()
File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 906, in <dictcomp>
pcoll_id in descriptor.transforms[transform_id].outputs.items()
File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 904, in <listcomp>
tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 812, in wrapper
result = cache[args] = func(*args)
File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 909, in get_operation
transform_id, transform_consumers)
File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1198, in create_operation
return creator(self, transform_id, transform_proto, payload, consumers)
File "/usr/local/lib/python3.7/site-packages/pyflink/fn_execution/beam/beam_operations.py", line 58, in create_table_function
table_operations.TableFunctionOperation)
File "/usr/local/lib/python3.7/site-packages/pyflink/fn_execution/beam/beam_operations.py", line 206, in _create_user_defined_function_operation
internal_operation_cls)
File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 198, in pyflink.fn_execution.beam.beam_operations_fast.StatelessFunctionOperation.__init__
File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 129, in pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.__init__
File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 202, in pyflink.fn_execution.beam.beam_operations_fast.StatelessFunctionOperation.generate_operation
File "/usr/local/lib/python3.7/site-packages/pyflink/fn_execution/table/operations.py", line 131, in __init__
super(TableFunctionOperation, self).__init__(serialized_fn)
File "/usr/local/lib/python3.7/site-packages/pyflink/fn_execution/table/operations.py", line 80, in __init__
self.func, self.user_defined_funcs = self.generate_func(serialized_fn)
File "/usr/local/lib/python3.7/site-packages/pyflink/fn_execution/table/operations.py", line 141, in generate_func
operation_utils.extract_user_defined_function(serialized_fn.udfs[0])
File "/usr/local/lib/python3.7/site-packages/pyflink/fn_execution/utils/operation_utils.py", line 143, in extract_user_defined_function
user_defined_func = pickle.loads(user_defined_function_proto.payload)
File "/usr/local/lib/python3.7/site-packages/pyflink/fn_execution/pickle.py", line 29, in loads
return cloudpickle.loads(payload)
ValueError: unsupported pickle protocol: 5


at java.base/java.util.concurrent.CompletableFuture.reportGet(Unknown Source)
at java.base/java.util.concurrent.CompletableFuture.get(Unknown Source)
at org.apache.beam.sdk.util.MoreFutures.get(MoreFutures.java:60)
at org.apache.beam.runners.fnexecution.control.SdkHarnessClient$BundleProcessor$ActiveBundle.close(SdkHarnessClient.java:504)
at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory$1.close(DefaultJobBundleFactory.java:555)
at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.finishBundle(BeamPythonFunctionRunner.java:380)
... 7 more
Caused by: java.lang.RuntimeException: Error received from SDK harness for instruction 1: Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 289, in _execute
response = task()
File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 362, in <lambda>
lambda: self.create_worker().do_instruction(request), request)
File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 607, in do_instruction
getattr(request, request_type), request.instruction_id)
File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 638, in process_bundle
instruction_id, request.process_bundle_descriptor_id)
File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 467, in get
self.data_channel_factory)
File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 868, in __init__
self.ops = self.create_execution_tree(self.process_bundle_descriptor)
File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 925, in create_execution_tree
descriptor.transforms, key=topological_height, reverse=True)])
File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 924, in <listcomp>
get_operation(transform_id))) for transform_id in sorted(
File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 812, in wrapper
result = cache[args] = func(*args)
File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 906, in get_operation
pcoll_id in descriptor.transforms[transform_id].outputs.items()
File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 906, in <dictcomp>
pcoll_id in descriptor.transforms[transform_id].outputs.items()
File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 904, in <listcomp>
tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 812, in wrapper
result = cache[args] = func(*args)
File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 909, in get_operation
transform_id, transform_consumers)
File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1198, in create_operation
return creator(self, transform_id, transform_proto, payload, consumers)
File "/usr/local/lib/python3.7/site-packages/pyflink/fn_execution/beam/beam_operations.py", line 58, in create_table_function
table_operations.TableFunctionOperation)
File "/usr/local/lib/python3.7/site-packages/pyflink/fn_execution/beam/beam_operations.py", line 206, in _create_user_defined_function_operation
internal_operation_cls)
File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 198, in pyflink.fn_execution.beam.beam_operations_fast.StatelessFunctionOperation.__init__
File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 129, in pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.__init__
File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 202, in pyflink.fn_execution.beam.beam_operations_fast.StatelessFunctionOperation.generate_operation
File "/usr/local/lib/python3.7/site-packages/pyflink/fn_execution/table/operations.py", line 131, in __init__
super(TableFunctionOperation, self).__init__(serialized_fn)
File "/usr/local/lib/python3.7/site-packages/pyflink/fn_execution/table/operations.py", line 80, in __init__
self.func, self.user_defined_funcs = self.generate_func(serialized_fn)
File "/usr/local/lib/python3.7/site-packages/pyflink/fn_execution/table/operations.py", line 141, in generate_func
operation_utils.extract_user_defined_function(serialized_fn.udfs[0])
File "/usr/local/lib/python3.7/site-packages/pyflink/fn_execution/utils/operation_utils.py", line 143, in extract_user_defined_function
user_defined_func = pickle.loads(user_defined_function_proto.payload)
File "/usr/local/lib/python3.7/site-packages/pyflink/fn_execution/pickle.py", line 29, in loads
return cloudpickle.loads(payload)
ValueError: unsupported pickle protocol: 5


at org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:180)
at org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:160)
at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onMessage(ServerCalls.java:251)
at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ForwardingServerCallListener.onMessage(ForwardingServerCallListener.java:33)
at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.Contexts$ContextualizedServerCallListener.onMessage(Contexts.java:76)
at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailableInternal(ServerCallImpl.java:309)
at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailable(ServerCallImpl.java:292)
at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1MessagesAvailable.runInContext(ServerImpl.java:782)
at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
... 3 more


[1] https://blog.devgenius.io/playing-pyflink-from-scratch-65c18908c366
[2] https://hub.docker.com/r/wirelessr/pyflink

Re: Re: Re: sink mysql: data loss on tables with an auto-increment id

Posted by Shammon FY <zj...@gmail.com>.
To unsubscribe, send an email (any content) to user-zh-unsubscribe@flink.apache.org
<us...@flink.apache.org>; see
https://flink.apache.org/community.html#how-to-subscribe-to-a-mailing-list

On Wed, Apr 19, 2023 at 9:31 AM 王国成 <ta...@163.com> wrote:

> Unsubscribe
>
> At 2023-04-19 09:15:09, "Shammon FY" <zj...@gmail.com> wrote:
> >If you want MySQL to generate the auto-increment primary key, you can simply leave the primary key column out of the table in the Flink DDL and have the Flink job write directly to that table.
> >
> >On Tue, Apr 18, 2023 at 5:38 PM Jeff <zi...@126.com> wrote:
> >
> >> When specifying columns for the sink, it is not possible to leave out the auto-increment primary key column.
> >>
> >> At 2023-04-17 07:29:16, "Shammon FY" <zj...@gmail.com> wrote:
> >> >Hi
> >> >
> >> >If you want to use MySQL's auto-increment primary key, you should probably not write the auto-increment column when inserting; could you try specifying the columns to insert explicitly in the INSERT statement?
> >> >
> >> >On Sun, Apr 16, 2023 at 7:58 PM Jeff <zi...@126.com> wrote:
> >> >
> >> >> When sinking data to a table in the mysql catalog, if the table has only an auto-increment primary key id and no other unique index, only one row from each written batch is kept and the other rows are lost.
> >> >>
> >> >>
> >> >> MySQL table DDL:
> >> >>
> >> >> create table test (id bigint primary key auto_increment , passport
> >> >> varchar);
> >> >>
> >> >>
> >> >> flink sql:
> >> >> insert into mysql_catalog.test select 0, passport from source_table;
> >> >>
> >> >> The reason for selecting 0 is to indicate that the physical table's auto-increment value should be used.
> >>
>

Re: Re: Re: sink mysql: data loss on tables with an auto-increment id

Posted by 王国成 <ta...@163.com>.
Unsubscribe











At 2023-04-19 09:15:09, "Shammon FY" <zj...@gmail.com> wrote:
>If you want MySQL to generate the auto-increment primary key, you can simply leave the primary key column out of the table in the Flink DDL and have the Flink job write directly to that table.
>
>On Tue, Apr 18, 2023 at 5:38 PM Jeff <zi...@126.com> wrote:
>
>> When specifying columns for the sink, it is not possible to leave out the auto-increment primary key column.
>>
>> At 2023-04-17 07:29:16, "Shammon FY" <zj...@gmail.com> wrote:
>> >Hi
>> >
>> >If you want to use MySQL's auto-increment primary key, you should probably not write the auto-increment column when inserting; could you try specifying the columns to insert explicitly in the INSERT statement?
>> >
>> >On Sun, Apr 16, 2023 at 7:58 PM Jeff <zi...@126.com> wrote:
>> >
>> >> When sinking data to a table in the mysql catalog, if the table has only an auto-increment primary key id and no other unique index, only one row from each written batch is kept and the other rows are lost.
>> >>
>> >>
>> >> MySQL table DDL:
>> >>
>> >> create table test (id bigint primary key auto_increment , passport
>> >> varchar);
>> >>
>> >>
>> >> flink sql:
>> >> insert into mysql_catalog.test select 0, passport from source_table;
>> >>
>> >> The reason for selecting 0 is to indicate that the physical table's auto-increment value should be used.
>>

Re: Re: sink mysql: data loss on tables with an auto-increment id

Posted by Shammon FY <zj...@gmail.com>.
If you want MySQL to generate the auto-increment primary key, you can simply leave the primary key column out of the table in the Flink DDL and have the Flink job write directly to that table.
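
A minimal sketch of that idea, purely for illustration (the connector options, URL, and table names below are placeholders, source_table is assumed to be registered already, and the PyFlink Table API is used only because it is compact — the same two statements work in the SQL client). The Flink-side sink table simply omits the auto-increment id column, so MySQL assigns it on insert:

from pyflink.table import EnvironmentSettings, TableEnvironment

t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())

# Flink-side sink table without the auto-increment `id` column (placeholder options)
t_env.execute_sql("""
    CREATE TABLE test_sink (
        passport STRING
    ) WITH (
        'connector' = 'jdbc',
        'url' = 'jdbc:mysql://localhost:3306/mydb',
        'table-name' = 'test',
        'username' = '...',
        'password' = '...'
    )
""")

# Only the non-key column is written; MySQL fills in `id` itself
t_env.execute_sql("INSERT INTO test_sink SELECT passport FROM source_table")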

On Tue, Apr 18, 2023 at 5:38 PM Jeff <zi...@126.com> wrote:

> When specifying columns for the sink, it is not possible to leave out the auto-increment primary key column.
>
> At 2023-04-17 07:29:16, "Shammon FY" <zj...@gmail.com> wrote:
> >Hi
> >
> >If you want to use MySQL's auto-increment primary key, you should probably not write the auto-increment column when inserting; could you try specifying the columns to insert explicitly in the INSERT statement?
> >
> >On Sun, Apr 16, 2023 at 7:58 PM Jeff <zi...@126.com> wrote:
> >
> >> When sinking data to a table in the mysql catalog, if the table has only an auto-increment primary key id and no other unique index, only one row from each written batch is kept and the other rows are lost.
> >>
> >>
> >> MySQL table DDL:
> >>
> >> create table test (id bigint primary key auto_increment , passport
> >> varchar);
> >>
> >>
> >> flink sql:
> >> insert into mysql_catalog.test select 0, passport from source_table;
> >>
> >> The reason for selecting 0 is to indicate that the physical table's auto-increment value should be used.
>

Re: Re: sink mysql: data loss on tables with an auto-increment id

Posted by Jeff <zi...@126.com>.
When specifying columns for the sink, it is not possible to leave out the auto-increment primary key column.

















At 2023-04-17 07:29:16, "Shammon FY" <zj...@gmail.com> wrote:
>Hi
>
>If you want to use MySQL's auto-increment primary key, you should probably not write the auto-increment column when inserting; could you try specifying the columns to insert explicitly in the INSERT statement?
>
>On Sun, Apr 16, 2023 at 7:58 PM Jeff <zi...@126.com> wrote:
>
>> When sinking data to a table in the mysql catalog, if the table has only an auto-increment primary key id and no other unique index, only one row from each written batch is kept and the other rows are lost.
>>
>>
>> MySQL table DDL:
>>
>> create table test (id bigint primary key auto_increment , passport
>> varchar);
>>
>>
>> flink sql:
>> insert into mysql_catalog.test select 0, passport from source_table;
>>
>> The reason for selecting 0 is to indicate that the physical table's auto-increment value should be used.

Re: sink mysql: data loss on tables with an auto-increment id

Posted by Shammon FY <zj...@gmail.com>.
Hi

If you want to use MySQL's auto-increment primary key, you should probably not write the auto-increment column when inserting; could you try specifying the columns to insert explicitly in the INSERT statement?
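
For illustration, a sketch of what that could look like, assuming the Flink version in use accepts a partial column list on INSERT (the column names are taken from the DDL quoted below, and mysql_catalog / source_table are assumed to be registered already):

from pyflink.table import EnvironmentSettings, TableEnvironment

t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())

# Write only `passport` and let MySQL assign the auto-increment `id`
t_env.execute_sql(
    "INSERT INTO mysql_catalog.test (passport) "
    "SELECT passport FROM source_table"
)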

On Sun, Apr 16, 2023 at 7:58 PM Jeff <zi...@126.com> wrote:

> When sinking data to a table in the mysql catalog, if the table has only an auto-increment primary key id and no other unique index, only one row from each written batch is kept and the other rows are lost.
>
>
> MySQL table DDL:
>
> create table test (id bigint primary key auto_increment , passport
> varchar);
>
>
> flink sql:
> insert into mysql_catalog.test select 0, passport from source_table;
>
> The reason for selecting 0 is to indicate that the physical table's auto-increment value should be used.

sink mysql: data loss on tables with an auto-increment id

Posted by Jeff <zi...@126.com>.
When sinking data to a table in the mysql catalog, if the table has only an auto-increment primary key id and no other unique index, only one row from each written batch is kept and the other rows are lost.


MySQL table DDL:

create table test (id bigint primary key auto_increment , passport varchar);


flink sql:
insert into mysql_catalog.test select 0, passport from source_table;
 
The reason for selecting 0 is to indicate that the physical table's auto-increment value should be used.