Posted to user-zh@flink.apache.org by Xingbo Huang <hx...@gmail.com> on 2021/02/02 03:01:19 UTC

Re: PyFlink Expected IPC message of type schema but got record batch

Hi,

Sorry for the late reply. Thanks for reporting this issue, which has been
recorded as FLINK-21208 [1]. I will fix it as soon as possible.

[1] https://issues.apache.org/jira/browse/FLINK-21208

Best,
Xingbo
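Until the fix is released, one possible interim workaround, assuming the problem is triggered by the large Arrow batches produced by such big windows, is to reduce the Arrow batch size via the `python.fn-execution.arrow.batch.size` option. Whether this actually avoids the bug in FLINK-21208 is untested; the snippet below only shows how the option would be set, and the environment setup is illustrative:

```python
from pyflink.table import EnvironmentSettings, TableEnvironment

env_settings = EnvironmentSettings.new_instance().in_streaming_mode().build()
t_env = TableEnvironment.create(env_settings)

# Shrink the number of rows per Arrow batch exchanged with the Python
# worker (the default is 10000); smaller batches mean smaller IPC messages.
t_env.get_config().get_configuration().set_string(
    "python.fn-execution.arrow.batch.size", "1000")
```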

苗红宾 <ho...@126.com> wrote on Sunday, January 31, 2021 at 3:28 PM:

> Hi:
>
> Hope you are doing well! I have a question about PyFlink; details below:
>
> Use case: sliding windows of size 10 minutes that slide by 5 minutes for
> data aggregation, followed by further processing; roughly 2 GB of data
> per window, about 1 million records.
>
> Job params:
>
> bin/yarn-session.sh -s 2 -jm 2048 -tm 48768 \
> -Dyarn.containers.vcores=4 \
> -Dtaskmanager.memory.managed.consumer-weights=DATAPROC:30,PYTHON:70 \
> -Dtaskmanager.memory.managed.fraction=0.7 \
> -Dtaskmanager.memory.task.off-heap.size=5120m \
> -nm $task_name -qu $queue -d
>
>
> Exception message:
>
> Traceback (most recent call last):
>   File
> "/data1/hadoopdata/nodemanager/local/usercache/prod_intl_discount_car/appcache/application_1571902879759_12031/python-dist-2659d300-efda-4c34-863d-d5a3a8aa369f/python-archives/venv.zip/venv/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
> line 253, in _execute
>     response = task()
>   File
> "/data1/hadoopdata/nodemanager/local/usercache/prod_intl_discount_car/appcache/application_1571902879759_12031/python-dist-2659d300-efda-4c34-863d-d5a3a8aa369f/python-archives/venv.zip/venv/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
> line 310, in <lambda>
>     lambda: self.create_worker().do_instruction(request), request)
>   File
> "/data1/hadoopdata/nodemanager/local/usercache/prod_intl_discount_car/appcache/application_1571902879759_12031/python-dist-2659d300-efda-4c34-863d-d5a3a8aa369f/python-archives/venv.zip/venv/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
> line 480, in do_instruction
>     getattr(request, request_type), request.instruction_id)
>   File
> "/data1/hadoopdata/nodemanager/local/usercache/prod_intl_discount_car/appcache/application_1571902879759_12031/python-dist-2659d300-efda-4c34-863d-d5a3a8aa369f/python-archives/venv.zip/venv/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
> line 515, in process_bundle
>     bundle_processor.process_bundle(instruction_id))
>   File
> "/data1/hadoopdata/nodemanager/local/usercache/prod_intl_discount_car/appcache/application_1571902879759_12031/python-dist-2659d300-efda-4c34-863d-d5a3a8aa369f/python-archives/venv.zip/venv/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
> line 978, in process_bundle
>     element.data)
>   File
> "/data1/hadoopdata/nodemanager/local/usercache/prod_intl_discount_car/appcache/application_1571902879759_12031/python-dist-2659d300-efda-4c34-863d-d5a3a8aa369f/python-archives/venv.zip/venv/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
> line 218, in process_encoded
>     self.output(decoded_value)
>   File "apache_beam/runners/worker/operations.py", line 330, in
> apache_beam.runners.worker.operations.Operation.output
>   File "apache_beam/runners/worker/operations.py", line 332, in
> apache_beam.runners.worker.operations.Operation.output
>   File "apache_beam/runners/worker/operations.py", line 195, in
> apache_beam.runners.worker.operations.SingletonConsumerSet.receive
>   File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 71, in
> pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.process
>   File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 73, in
> pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.process
>   File
> "/data1/hadoopdata/nodemanager/local/usercache/prod_intl_discount_car/appcache/application_1571902879759_12031/python-dist-2659d300-efda-4c34-863d-d5a3a8aa369f/python-archives/venv.zip/venv/lib/python3.7/site-packages/pyflink/fn_execution/beam/beam_coder_impl_slow.py",
> line 627, in decode_from_stream
>     yield self._decode_one_batch_from_stream(in_stream,
> in_stream.read_var_int64())
>   File
> "/data1/hadoopdata/nodemanager/local/usercache/prod_intl_discount_car/appcache/application_1571902879759_12031/python-dist-2659d300-efda-4c34-863d-d5a3a8aa369f/python-archives/venv.zip/venv/lib/python3.7/site-packages/pyflink/fn_execution/beam/beam_coder_impl_slow.py",
> line 638, in _decode_one_batch_from_stream
>     return arrow_to_pandas(self._timezone, self._field_types,
> [next(self._batch_reader)])
>   File
> "/data1/hadoopdata/nodemanager/local/usercache/prod_intl_discount_car/appcache/application_1571902879759_12031/python-dist-2659d300-efda-4c34-863d-d5a3a8aa369f/python-archives/venv.zip/venv/lib/python3.7/site-packages/pyflink/fn_execution/beam/beam_coder_impl_slow.py",
> line 631, in _load_from_stream
>     reader = pa.ipc.open_stream(stream)
>   File
> "/data1/hadoopdata/nodemanager/local/usercache/prod_intl_discount_car/appcache/application_1571902879759_12031/python-dist-2659d300-efda-4c34-863d-d5a3a8aa369f/python-archives/venv.zip/venv/lib/python3.7/site-packages/pyarrow/ipc.py",
> line 137, in open_stream
>     return RecordBatchStreamReader(source)
>   File
> "/data1/hadoopdata/nodemanager/local/usercache/prod_intl_discount_car/appcache/application_1571902879759_12031/python-dist-2659d300-efda-4c34-863d-d5a3a8aa369f/python-archives/venv.zip/venv/lib/python3.7/site-packages/pyarrow/ipc.py",
> line 61, in __init__
>     self._open(source)
>   File "pyarrow/ipc.pxi", line 352, in
> pyarrow.lib._RecordBatchStreamReader._open
>   File "pyarrow/error.pxi", line 99, in pyarrow.lib.check_status
> OSError: Expected IPC message of type schema but got record batch
>
>
> Many Thanks!