Posted to issues@flink.apache.org by "YufeiLiu (Jira)" <ji...@apache.org> on 2021/01/30 13:23:00 UTC

[jira] [Issue Comment Deleted] (FLINK-21208) pyarrow exception when using window with pandas udaf

     [ https://issues.apache.org/jira/browse/FLINK-21208?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

YufeiLiu updated FLINK-21208:
-----------------------------
    Comment: was deleted

(was: I found that more than one Beam worker has been started, and they seem to share the same stream; the schema is only written at the beginning of the stream, so the second worker throws this error. Another question is why the Beam worker didn't exit after the job failure.
cc [~dian.fu])
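For reference, a minimal pyarrow sketch (independent of the Flink/Beam code, written only for illustration) of why a reader attached to such a stream fails: pa.ipc.open_stream expects an Arrow IPC stream to begin with a schema message, so a consumer that joins after the schema has already been written sees a bare record batch first and raises exactly the OSError in the trace below.

{code:python}
# Illustration only (not from the issue): a late-joining reader of a shared
# IPC stream sees record batch messages without the leading schema message.
import pyarrow as pa

batch = pa.record_batch([pa.array([1, 2, 3])], names=["header"])

# RecordBatch.serialize() yields a record batch IPC message with no schema
# header, i.e. what a second reader of an already-open stream would observe.
buf = batch.serialize()

try:
    pa.ipc.open_stream(pa.BufferReader(buf))
except OSError as e:
    print(e)  # Expected IPC message of type schema but got record batch
{code}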

> pyarrow exception when using window with pandas udaf
> ----------------------------------------------------
>
>                 Key: FLINK-21208
>                 URL: https://issues.apache.org/jira/browse/FLINK-21208
>             Project: Flink
>          Issue Type: Bug
>          Components: API / Python
>    Affects Versions: 1.12.0, 1.12.1
>            Reporter: YufeiLiu
>            Priority: Major
>
> I wrote a pyflink demo and executed it in a local environment. The logic is simple: generate records and aggregate them in a 100-second tumble window, using a pandas udaf.
> But the job failed after several minutes. I don't think it's a resource problem because the amount of data is small. Here is the error trace:
> {code:java}
> Caused by: org.apache.flink.streaming.runtime.tasks.AsynchronousException: Caught exception while processing timer.
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask$StreamTaskAsyncExceptionHandler.handleAsyncException(StreamTask.java:1108)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.handleAsyncException(StreamTask.java:1082)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1213)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$null$17(StreamTask.java:1202)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
> 	at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78)
> 	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:302)
> 	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:184)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:575)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:539)
> 	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
> 	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
> 	at java.lang.Thread.run(Thread.java:748)
> Caused by: TimerException{java.lang.RuntimeException: Failed to close remote bundle}
> 	... 11 more
> Caused by: java.lang.RuntimeException: Failed to close remote bundle
> 	at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.finishBundle(BeamPythonFunctionRunner.java:371)
> 	at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.flush(BeamPythonFunctionRunner.java:325)
> 	at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.invokeFinishBundle(AbstractPythonFunctionOperator.java:291)
> 	at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.checkInvokeFinishBundleByTime(AbstractPythonFunctionOperator.java:285)
> 	at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionOperator.lambda$open$0(AbstractPythonFunctionOperator.java:134)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1211)
> 	... 10 more
> Caused by: java.util.concurrent.ExecutionException: java.lang.RuntimeException: Error received from SDK harness for instruction 3: Traceback (most recent call last):
>   File "/Users/lyf/Library/Python/3.7/lib/python/site-packages/apache_beam/runners/worker/sdk_worker.py", line 253, in _execute
>     response = task()
>   File "/Users/lyf/Library/Python/3.7/lib/python/site-packages/apache_beam/runners/worker/sdk_worker.py", line 310, in <lambda>
>     lambda: self.create_worker().do_instruction(request), request)
>   File "/Users/lyf/Library/Python/3.7/lib/python/site-packages/apache_beam/runners/worker/sdk_worker.py", line 480, in do_instruction
>     getattr(request, request_type), request.instruction_id)
>   File "/Users/lyf/Library/Python/3.7/lib/python/site-packages/apache_beam/runners/worker/sdk_worker.py", line 515, in process_bundle
>     bundle_processor.process_bundle(instruction_id))
>   File "/Users/lyf/Library/Python/3.7/lib/python/site-packages/apache_beam/runners/worker/bundle_processor.py", line 978, in process_bundle
>     element.data)
>   File "/Users/lyf/Library/Python/3.7/lib/python/site-packages/apache_beam/runners/worker/bundle_processor.py", line 218, in process_encoded
>     self.output(decoded_value)
>   File "apache_beam/runners/worker/operations.py", line 330, in apache_beam.runners.worker.operations.Operation.output
>   File "apache_beam/runners/worker/operations.py", line 332, in apache_beam.runners.worker.operations.Operation.output
>   File "apache_beam/runners/worker/operations.py", line 195, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
>   File "apache_beam/runners/worker/operations.py", line 292, in apache_beam.runners.worker.operations.Operation.process
>   File "/Users/lyf/Library/Python/3.7/lib/python/site-packages/pyflink/fn_execution/beam/beam_operations_slow.py", line 73, in process
>     for value in o.value:
>   File "/Users/lyf/Library/Python/3.7/lib/python/site-packages/pyflink/fn_execution/beam/beam_coder_impl_slow.py", line 625, in decode_from_stream
>     yield self._decode_one_batch_from_stream(in_stream, in_stream.read_var_int64())
>   File "/Users/lyf/Library/Python/3.7/lib/python/site-packages/pyflink/fn_execution/beam/beam_coder_impl_slow.py", line 636, in _decode_one_batch_from_stream
>     return arrow_to_pandas(self._timezone, self._field_types, [next(self._batch_reader)])
>   File "/Users/lyf/Library/Python/3.7/lib/python/site-packages/pyflink/fn_execution/beam/beam_coder_impl_slow.py", line 629, in _load_from_stream
>     reader = pa.ipc.open_stream(stream)
>   File "/Users/lyf/Library/Python/3.7/lib/python/site-packages/pyarrow/ipc.py", line 146, in open_stream
>     return RecordBatchStreamReader(source)
>   File "/Users/lyf/Library/Python/3.7/lib/python/site-packages/pyarrow/ipc.py", line 62, in __init__
>     self._open(source)
>   File "pyarrow/ipc.pxi", line 360, in pyarrow.lib._RecordBatchStreamReader._open
>   File "pyarrow/error.pxi", line 123, in pyarrow.lib.pyarrow_internal_check_status
>   File "pyarrow/error.pxi", line 100, in pyarrow.lib.check_status
> OSError: Expected IPC message of type schema but got record batch
> 	at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
> 	at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
> 	at org.apache.beam.sdk.util.MoreFutures.get(MoreFutures.java:57)
> 	at org.apache.beam.runners.fnexecution.control.SdkHarnessClient$BundleProcessor$ActiveBundle.close(SdkHarnessClient.java:458)
> 	at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory$1.close(DefaultJobBundleFactory.java:547)
> 	at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.finishBundle(BeamPythonFunctionRunner.java:369)
> 	... 15 more
> Caused by: java.lang.RuntimeException: Error received from SDK harness for instruction 3: Traceback (most recent call last):
>   File "/Users/lyf/Library/Python/3.7/lib/python/site-packages/apache_beam/runners/worker/sdk_worker.py", line 253, in _execute
>     response = task()
>   File "/Users/lyf/Library/Python/3.7/lib/python/site-packages/apache_beam/runners/worker/sdk_worker.py", line 310, in <lambda>
>     lambda: self.create_worker().do_instruction(request), request)
>   File "/Users/lyf/Library/Python/3.7/lib/python/site-packages/apache_beam/runners/worker/sdk_worker.py", line 480, in do_instruction
>     getattr(request, request_type), request.instruction_id)
>   File "/Users/lyf/Library/Python/3.7/lib/python/site-packages/apache_beam/runners/worker/sdk_worker.py", line 515, in process_bundle
>     bundle_processor.process_bundle(instruction_id))
>   File "/Users/lyf/Library/Python/3.7/lib/python/site-packages/apache_beam/runners/worker/bundle_processor.py", line 978, in process_bundle
>     element.data)
>   File "/Users/lyf/Library/Python/3.7/lib/python/site-packages/apache_beam/runners/worker/bundle_processor.py", line 218, in process_encoded
>     self.output(decoded_value)
>   File "apache_beam/runners/worker/operations.py", line 330, in apache_beam.runners.worker.operations.Operation.output
>   File "apache_beam/runners/worker/operations.py", line 332, in apache_beam.runners.worker.operations.Operation.output
>   File "apache_beam/runners/worker/operations.py", line 195, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
>   File "apache_beam/runners/worker/operations.py", line 292, in apache_beam.runners.worker.operations.Operation.process
>   File "/Users/lyf/Library/Python/3.7/lib/python/site-packages/pyflink/fn_execution/beam/beam_operations_slow.py", line 73, in process
>     for value in o.value:
>   File "/Users/lyf/Library/Python/3.7/lib/python/site-packages/pyflink/fn_execution/beam/beam_coder_impl_slow.py", line 625, in decode_from_stream
>     yield self._decode_one_batch_from_stream(in_stream, in_stream.read_var_int64())
>   File "/Users/lyf/Library/Python/3.7/lib/python/site-packages/pyflink/fn_execution/beam/beam_coder_impl_slow.py", line 636, in _decode_one_batch_from_stream
>     return arrow_to_pandas(self._timezone, self._field_types, [next(self._batch_reader)])
>   File "/Users/lyf/Library/Python/3.7/lib/python/site-packages/pyflink/fn_execution/beam/beam_coder_impl_slow.py", line 629, in _load_from_stream
>     reader = pa.ipc.open_stream(stream)
>   File "/Users/lyf/Library/Python/3.7/lib/python/site-packages/pyarrow/ipc.py", line 146, in open_stream
>     return RecordBatchStreamReader(source)
>   File "/Users/lyf/Library/Python/3.7/lib/python/site-packages/pyarrow/ipc.py", line 62, in __init__
>     self._open(source)
>   File "pyarrow/ipc.pxi", line 360, in pyarrow.lib._RecordBatchStreamReader._open
>   File "pyarrow/error.pxi", line 123, in pyarrow.lib.pyarrow_internal_check_status
>   File "pyarrow/error.pxi", line 100, in pyarrow.lib.check_status
> OSError: Expected IPC message of type schema but got record batch
> 	at org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:177)
> 	at org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:157)
> 	at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onMessage(ServerCalls.java:251)
> 	at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ForwardingServerCallListener.onMessage(ForwardingServerCallListener.java:33)
> 	at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.Contexts$ContextualizedServerCallListener.onMessage(Contexts.java:76)
> 	at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailableInternal(ServerCallImpl.java:309)
> 	at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailable(ServerCallImpl.java:292)
> 	at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1MessagesAvailable.runInContext(ServerImpl.java:782)
> 	at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
> 	at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> 	... 1 more
> {code}
> And my test code:
> {code:python}
> from pyflink.common import Row
> from pyflink.datastream import StreamExecutionEnvironment
> from pyflink.table import DataTypes, EnvironmentSettings, StreamTableEnvironment
> from pyflink.table.udf import udaf, AggregateFunction
> from pyflink.table.window import Tumble
>
>
> class MyTestAggregateFunction(AggregateFunction):
>
>     def get_value(self, accumulator):
>         return accumulator[0]
>
>     def create_accumulator(self):
>         return Row(0)
>
>     def accumulate(self, accumulator, *args):
>         # args[0] is a pandas.Series holding the `header` column of the batch.
>         accumulator[0] = len(args[0])
>
>     def get_result_type(self):
>         return DataTypes.BIGINT()
>
>
> if __name__ == '__main__':
>     env = StreamExecutionEnvironment.get_execution_environment()
>     f_s_settings = EnvironmentSettings.new_instance().use_blink_planner().in_streaming_mode().build()
>     t_env = StreamTableEnvironment.create(env, None, f_s_settings)
>     my_udaf = udaf(MyTestAggregateFunction(), func_type="pandas")
>     t_env.register_function('my_udaf', my_udaf)
>     t_env.sql_update("""
>     CREATE TABLE `source_table` (
>         `header` STRING,
>         ts AS PROCTIME()
>     ) WITH (
>         'connector' = 'datagen',
>         'rows-per-second' = '100'
>     )
>     """)
>     t_env.sql_update("""
>     CREATE TABLE `sink_table` (
>         `content` BIGINT,
>         `wstart` TIMESTAMP(3)
>     ) WITH (
>         'connector' = 'print'
>     )
>     """)
>     t_env.scan("source_table") \
>         .window(Tumble.over("100.second").on("ts").alias("w")) \
>         .group_by('w') \
>         .select("my_udaf(header), w.start") \
>         .insert_into("sink_table")
>     t_env.execute("test_job")
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)