Posted to user@beam.apache.org by Paweł Kordek <pa...@farfetch.com> on 2019/09/11 13:33:06 UTC

Python errors when using batch+windows+textio

Hi

I was developing a simple pipeline where I aggregate records by key and sum
values over a predefined window. I was getting some errors, and after
checking, I get exactly the same issues when running the Wikipedia sessions
example from the Beam repo. The output is as follows:
-------------------------------------------
INFO:root:Missing pipeline option (runner). Executing pipeline using the
default runner: DirectRunner.
INFO:root:==================== <function annotate_downstream_side_inputs at
0x7f333fc1fe60> ====================
INFO:root:==================== <function fix_side_input_pcoll_coders at
0x7f333fc1ff80> ====================
INFO:root:==================== <function lift_combiners at 0x7f333fc1d050>
====================
INFO:root:==================== <function expand_sdf at 0x7f333fc1d0e0>
====================
INFO:root:==================== <function expand_gbk at 0x7f333fc1d170>
====================
INFO:root:==================== <function sink_flattens at 0x7f333fc1d290>
====================
INFO:root:==================== <function greedily_fuse at 0x7f333fc1d320>
====================
INFO:root:==================== <function read_to_impulse at 0x7f333fc1d3b0>
====================
INFO:root:==================== <function impulse_to_input at
0x7f333fc1d440> ====================
INFO:root:==================== <function inject_timer_pcollections at
0x7f333fc1d5f0> ====================
INFO:root:==================== <function sort_stages at 0x7f333fc1d680>
====================
INFO:root:==================== <function window_pcollection_coders at
0x7f333fc1d710> ====================
INFO:root:Running
((((((ref_AppliedPTransform_ReadFromText/Read_3)+(ref_AppliedPTransform_ComputeTopSessions/ExtractUserAndTimestamp_5))+(ref_AppliedPTransform_ComputeTopSessions/Filter(<lambda
at
top_wikipedia_sessions.py:127>)_6))+(ref_AppliedPTransform_ComputeTopSessions/ComputeSessions/ComputeSessionsWindow_8))+(ref_AppliedPTransform_ComputeTopSessions/ComputeSessions/PerElement/PerElement:PairWithVoid_10))+(ComputeTopSessions/ComputeSessions/PerElement/CombinePerKey(CountCombineFn)/Precombine))+(ComputeTopSessions/ComputeSessions/PerElement/CombinePerKey(CountCombineFn)/Group/Write)
INFO:root:Running
(((((((ComputeTopSessions/ComputeSessions/PerElement/CombinePerKey(CountCombineFn)/Group/Read)+(ComputeTopSessions/ComputeSessions/PerElement/CombinePerKey(CountCombineFn)/Merge))+(ComputeTopSessions/ComputeSessions/PerElement/CombinePerKey(CountCombineFn)/ExtractOutputs))+(ref_AppliedPTransform_ComputeTopSessions/SessionsToStrings_18))+(ref_AppliedPTransform_ComputeTopSessions/TopPerMonth/TopPerMonthWindow_20))+(ref_AppliedPTransform_ComputeTopSessions/TopPerMonth/Top/KeyWithVoid_22))+(ComputeTopSessions/TopPerMonth/Top/CombinePerKey/Precombine))+(ComputeTopSessions/TopPerMonth/Top/CombinePerKey/Group/Write)
INFO:root:Running
(((ref_AppliedPTransform_WriteToText/Write/WriteImpl/DoOnce/Read_36)+(ref_AppliedPTransform_WriteToText/Write/WriteImpl/InitializeWrite_37))+(ref_PCollection_PCollection_19/Write))+(ref_PCollection_PCollection_20/Write)
INFO:root:Running
((((((((ComputeTopSessions/TopPerMonth/Top/CombinePerKey/Group/Read)+(ComputeTopSessions/TopPerMonth/Top/CombinePerKey/Merge))+(ComputeTopSessions/TopPerMonth/Top/CombinePerKey/ExtractOutputs))+(ref_AppliedPTransform_ComputeTopSessions/TopPerMonth/Top/UnKey_30))+(ref_AppliedPTransform_ComputeTopSessions/FormatOutput_31))+(ref_AppliedPTransform_WriteToText/Write/WriteImpl/WriteBundles_38))+(ref_AppliedPTransform_WriteToText/Write/WriteImpl/Pair_39))+(ref_AppliedPTransform_WriteToText/Write/WriteImpl/WindowInto(WindowIntoFn)_40))+(WriteToText/Write/WriteImpl/GroupByKey/Write)
Traceback (most recent call last):
  File "apache_beam/runners/common.py", line 829, in
apache_beam.runners.common.DoFnRunner._invoke_bundle_method
  File "apache_beam/runners/common.py", line 403, in
apache_beam.runners.common.DoFnInvoker.invoke_finish_bundle
  File "apache_beam/runners/common.py", line 406, in
apache_beam.runners.common.DoFnInvoker.invoke_finish_bundle
  File "apache_beam/runners/common.py", line 982, in
apache_beam.runners.common._OutputProcessor.finish_bundle_outputs
  File "apache_beam/runners/worker/operations.py", line 142, in
apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 122, in
apache_beam.runners.worker.operations.ConsumerSet.update_counters_start
  File "apache_beam/runners/worker/opcounters.py", line 196, in
apache_beam.runners.worker.opcounters.OperationCounters.update_from
  File "apache_beam/runners/worker/opcounters.py", line 214, in
apache_beam.runners.worker.opcounters.OperationCounters.do_sample
  File "apache_beam/coders/coder_impl.py", line 1014, in
apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables
  File "apache_beam/coders/coder_impl.py", line 1030, in
apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables
  File "apache_beam/coders/coder_impl.py", line 814, in
apache_beam.coders.coder_impl.SequenceCoderImpl.estimate_size
  File "apache_beam/coders/coder_impl.py", line 828, in
apache_beam.coders.coder_impl.SequenceCoderImpl.get_estimated_size_and_observables
  File "apache_beam/coders/coder_impl.py", line 145, in
apache_beam.coders.coder_impl.CoderImpl.get_estimated_size_and_observables
  File "apache_beam/coders/coder_impl.py", line 494, in
apache_beam.coders.coder_impl.IntervalWindowCoderImpl.estimate_size
TypeError: Cannot convert GlobalWindow to
apache_beam.utils.windowed_value._IntervalWindowBase

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "top_wikipedia_sessions.py", line 171, in <module>
    run()
  File "top_wikipedia_sessions.py", line 166, in run
    | WriteToText(known_args.output))
  File
"/home/pawel/miniconda3/envs/beam/lib/python3.7/site-packages/apache_beam/pipeline.py",
line 426, in __exit__
    self.run().wait_until_finish()
  File
"/home/pawel/miniconda3/envs/beam/lib/python3.7/site-packages/apache_beam/pipeline.py",
line 406, in run
    self._options).run(False)
  File
"/home/pawel/miniconda3/envs/beam/lib/python3.7/site-packages/apache_beam/pipeline.py",
line 419, in run
    return self.runner.run_pipeline(self, self._options)
  File
"/home/pawel/miniconda3/envs/beam/lib/python3.7/site-packages/apache_beam/runners/direct/direct_runner.py",
line 129, in run_pipeline
    return runner.run_pipeline(pipeline, options)
  File
"/home/pawel/miniconda3/envs/beam/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner.py",
line 366, in run_pipeline
    default_environment=self._default_environment))
  File
"/home/pawel/miniconda3/envs/beam/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner.py",
line 373, in run_via_runner_api
    return self.run_stages(stage_context, stages)
  File
"/home/pawel/miniconda3/envs/beam/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner.py",
line 455, in run_stages
    stage_context.safe_coders)
  File
"/home/pawel/miniconda3/envs/beam/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner.py",
line 733, in _run_stage
    result, splits = bundle_manager.process_bundle(data_input, data_output)
  File
"/home/pawel/miniconda3/envs/beam/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner.py",
line 1688, in process_bundle
    part, expected_outputs), part_inputs):
  File
"/home/pawel/miniconda3/envs/beam/lib/python3.7/concurrent/futures/_base.py",
line 598, in result_iterator
    yield fs.pop().result()
  File
"/home/pawel/miniconda3/envs/beam/lib/python3.7/concurrent/futures/_base.py",
line 435, in result
    return self.__get_result()
  File
"/home/pawel/miniconda3/envs/beam/lib/python3.7/concurrent/futures/_base.py",
line 384, in __get_result
    raise self._exception
  File
"/home/pawel/miniconda3/envs/beam/lib/python3.7/concurrent/futures/thread.py",
line 57, in run
    result = self.fn(*self.args, **self.kwargs)
  File
"/home/pawel/miniconda3/envs/beam/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner.py",
line 1688, in <lambda>
    part, expected_outputs), part_inputs):
  File
"/home/pawel/miniconda3/envs/beam/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner.py",
line 1626, in process_bundle
    result_future =
self._worker_handler.control_conn.push(process_bundle_req)
  File
"/home/pawel/miniconda3/envs/beam/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner.py",
line 1080, in push
    response = self.worker.do_instruction(request)
  File
"/home/pawel/miniconda3/envs/beam/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
line 343, in do_instruction
    request.instruction_id)
  File
"/home/pawel/miniconda3/envs/beam/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py",
line 369, in process_bundle
    bundle_processor.process_bundle(instruction_id))
  File
"/home/pawel/miniconda3/envs/beam/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py",
line 598, in process_bundle
    op.finish()
  File "apache_beam/runners/worker/operations.py", line 611, in
apache_beam.runners.worker.operations.DoOperation.finish
  File "apache_beam/runners/worker/operations.py", line 612, in
apache_beam.runners.worker.operations.DoOperation.finish
  File "apache_beam/runners/worker/operations.py", line 613, in
apache_beam.runners.worker.operations.DoOperation.finish
  File "apache_beam/runners/common.py", line 847, in
apache_beam.runners.common.DoFnRunner.finish
  File "apache_beam/runners/common.py", line 831, in
apache_beam.runners.common.DoFnRunner._invoke_bundle_method
  File "apache_beam/runners/common.py", line 872, in
apache_beam.runners.common.DoFnRunner._reraise_augmented
  File
"/home/pawel/miniconda3/envs/beam/lib/python3.7/site-packages/future/utils/__init__.py",
line 421, in raise_with_traceback
    raise exc.with_traceback(traceback)
  File "apache_beam/runners/common.py", line 829, in
apache_beam.runners.common.DoFnRunner._invoke_bundle_method
  File "apache_beam/runners/common.py", line 403, in
apache_beam.runners.common.DoFnInvoker.invoke_finish_bundle
  File "apache_beam/runners/common.py", line 406, in
apache_beam.runners.common.DoFnInvoker.invoke_finish_bundle
  File "apache_beam/runners/common.py", line 982, in
apache_beam.runners.common._OutputProcessor.finish_bundle_outputs
  File "apache_beam/runners/worker/operations.py", line 142, in
apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 122, in
apache_beam.runners.worker.operations.ConsumerSet.update_counters_start
  File "apache_beam/runners/worker/opcounters.py", line 196, in
apache_beam.runners.worker.opcounters.OperationCounters.update_from
  File "apache_beam/runners/worker/opcounters.py", line 214, in
apache_beam.runners.worker.opcounters.OperationCounters.do_sample
  File "apache_beam/coders/coder_impl.py", line 1014, in
apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables
  File "apache_beam/coders/coder_impl.py", line 1030, in
apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables
  File "apache_beam/coders/coder_impl.py", line 814, in
apache_beam.coders.coder_impl.SequenceCoderImpl.estimate_size
  File "apache_beam/coders/coder_impl.py", line 828, in
apache_beam.coders.coder_impl.SequenceCoderImpl.get_estimated_size_and_observables
  File "apache_beam/coders/coder_impl.py", line 145, in
apache_beam.coders.coder_impl.CoderImpl.get_estimated_size_and_observables
  File "apache_beam/coders/coder_impl.py", line 494, in
apache_beam.coders.coder_impl.IntervalWindowCoderImpl.estimate_size
TypeError: Cannot convert GlobalWindow to
apache_beam.utils.windowed_value._IntervalWindowBase [while running
'WriteToText/Write/WriteImpl/WriteBundles']
-------------------------------------------------------

To run it I've downloaded a single JSON file with Wiki data and ran it as
follows (running from BEAM_REPO/sdks/python/apache_beam/examples/complete):

python top_wikipedia_sessions.py --input /data/wiki/wiki_data-000000000492.json --output /tmp/beam/wiki

This fails somewhere close to the end; in fact, I can find some results
(possibly complete) in
/tmp/beam/beam-temp-wiki-5971176ad49411e9b16448ba4ef75ccc/c18aa9d5-221f-4dc3-bc1a-83c101ee54ba.wiki

I tried to find the exact cause in the code, but I don't understand Beam's
codebase well enough, so I would appreciate some hints/explanations. There is
a question on SO
<https://stackoverflow.com/questions/54745869/how-to-create-a-dataflow-pipeline-from-pub-sub-to-gcs-in-python/54791913#54791913>,
but it doesn't help much in explaining why this error is observed.
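
In case it helps, my own pipeline that originally hit this has roughly the
following shape. This is a simplified sketch with a made-up
"key,value,unix_timestamp" input format and placeholder paths, not the exact
code:

    import apache_beam as beam
    from apache_beam.transforms import window

    def parse(line):
        # Hypothetical input format: "key,value,unix_timestamp" per line.
        key, value, ts = line.split(',')
        return window.TimestampedValue((key, float(value)), float(ts))

    with beam.Pipeline() as p:
        (p
         | 'Read' >> beam.io.ReadFromText('/path/to/input.txt')
         | 'ParseAndTimestamp' >> beam.Map(parse)
         | 'FixedWindows' >> beam.WindowInto(window.FixedWindows(60 * 60))
         | 'SumPerKey' >> beam.CombinePerKey(sum)
         | 'Format' >> beam.Map(lambda kv: '%s,%s' % kv)
         | 'Write' >> beam.io.WriteToText('/path/to/output'))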

Thanks!
Pawel


Re: Python errors when using batch+windows+textio

Posted by Paweł Kordek <pa...@farfetch.com>.
Hi Kyle

I'm on 2.15. Thanks for pointing me to the JIRA; I'll watch it and also try
to see what's causing the problem.

Best regards
Pawel

On Fri, 13 Sep 2019 at 01:43, Kyle Weaver <kc...@google.com> wrote:

> Hi Pawel, could you tell us which version of the Beam Python SDK you are
> using?
>
> For the record, this looks like a known issue:
> https://issues.apache.org/jira/browse/BEAM-6860
>
> Kyle Weaver | Software Engineer | github.com/ibzib | kcweaver@google.com
>


Re: Python errors when using batch+windows+textio

Posted by Kyle Weaver <kc...@google.com>.
Hi Pawel, could you tell us which version of the Beam Python SDK you are
using?

For the record, this looks like a known issue:
https://issues.apache.org/jira/browse/BEAM-6860
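
I haven't verified it against this particular example, but a workaround that
has been suggested for this kind of error is to re-window back into the
global window after the windowed aggregation, right before the write. A
minimal sketch of the idea (data, window size, and output path are made up):

    import apache_beam as beam
    from apache_beam.transforms import window

    with beam.Pipeline() as p:
        (p
         | 'Create' >> beam.Create([('a', 1), ('a', 2), ('b', 3)])
         | 'Timestamp' >> beam.Map(
             lambda kv: window.TimestampedValue(kv, kv[1]))
         | 'FixedWindows' >> beam.WindowInto(window.FixedWindows(1))
         | 'SumPerKey' >> beam.CombinePerKey(sum)
         | 'Format' >> beam.Map(str)
         # Re-window into the global window before handing off to textio.
         | 'GlobalWindow' >> beam.WindowInto(window.GlobalWindows())
         | 'Write' >> beam.io.WriteToText('/tmp/workaround'))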

Kyle Weaver | Software Engineer | github.com/ibzib | kcweaver@google.com

