You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@beam.apache.org by "Patrick Madsen (Jira)" <ji...@apache.org> on 2020/09/07 08:55:00 UTC

[jira] [Updated] (BEAM-10854) PeriodicImpulse default arguments are not valid values

     [ https://issues.apache.org/jira/browse/BEAM-10854?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Patrick Madsen updated BEAM-10854:
----------------------------------
    Labels: Periodic  (was: )

> PeriodicImpulse default arguments are not valid values
> ------------------------------------------------------
>
>                 Key: BEAM-10854
>                 URL: https://issues.apache.org/jira/browse/BEAM-10854
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-py-core
>    Affects Versions: 2.23.0
>            Reporter: Patrick Madsen
>            Priority: P2
>              Labels: Periodic
>
> Based on the examples from [Slowly updating side input using windowing|https://beam.apache.org/documentation/patterns/side-inputs/#slowly-updating-side-input-using-windowing], I attempted to test PeriodicImpulse without passing start_timestamp or stop_timestamp (i.e. relying on their default values), so that it would trigger immediately and run forever.
> The code below shows how:
> {code:python}
>         def pair_account_ids(
>             api_key: str, account_ids: Dict[str, str]
>         ) -> Optional[Tuple[str, str, int]]:
>             if api_key not in account_ids:
>                 return None
>             return (api_key, account_ids[api_key], int(time.time()))
>         def echo(elm) -> Dict[str, str]:
>             print(elm)
>             return elm
>         def api_keys(elm) -> Dict[str, str]:
>             return {"<api_key_1>": "<account_id_1>", "<api_key_2>": "<account_id_2>"}
>         pipeline_options = PipelineOptions(streaming=True)
>         with TestPipeline(
>             options=pipeline_options, runner=beam.runners.DirectRunner()
>         ) as p:
>             side_input = (
>                 p
>                 | "PeriodicImpulse"
>                 >> PeriodicImpulse(
>                     # start_timestamp=start,
>                     # stop_timestamp=stop,
>                     fire_interval=5,
>                     apply_windowing=True,
>                 )
>                 | "api_keys" >> beam.Map(api_keys)
>             )
>             main_input = (
>                 p
>                 | "MpImpulse"
>                 >> beam.Create(["<api_key_1>", "<api_key_2>", "<unknown_api_key>"])
>                 | "MapMpToTimestamped"
>                 >> beam.Map(lambda src: TimestampedValue(src, time.time()))
>                 | "WindowMpInto" >> beam.WindowInto(beam.window.FixedWindows(5))
>             )
>             result = (
>                 main_input
>                 | "Pair with AccountIDs"
>                 >> beam.Map(
>                     pair_account_ids, account_ids=beam.pvalue.AsSingleton(side_input)
>                 )
>                 | "filter" >> beam.Filter(lambda x: x is not None)
>                 | "echo 2" >> beam.Map(lambda x: print(f"{int(time.time())}: {x}"))
>             )
>         print(f"done:  {int(time.time())}")
> {code}
> However, the above pipeline throws the following exception:
> {code}
> Traceback (most recent call last):
>   File "/test/not_test.py", line 141, in test_side_input
>     | "echo 2" >> beam.Map(lambda x: print(f"{int(time.time())}: {x}"))
>   File "/usr/local/lib/python3.7/site-packages/apache_beam/pipeline.py", line 555, in __exit__
>     self.run().wait_until_finish()
>   File "/usr/local/lib/python3.7/site-packages/apache_beam/testing/test_pipeline.py", line 112, in run
>     False if self.not_use_test_runner_api else test_runner_api))
>   File "/usr/local/lib/python3.7/site-packages/apache_beam/pipeline.py", line 521, in run
>     allow_proto_holders=True).run(False)
>   File "/usr/local/lib/python3.7/site-packages/apache_beam/pipeline.py", line 534, in run
>     return self.runner.run_pipeline(self, self._options)
>   File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/direct/direct_runner.py", line 119, in run_pipeline
>     return runner.run_pipeline(pipeline, options)
>   File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 173, in run_pipeline
>     pipeline.to_runner_api(default_environment=self._default_environment))
>   File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 183, in run_via_runner_api
>     return self.run_stages(stage_context, stages)
>   File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 340, in run_stages
>     bundle_context_manager,
>   File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 519, in _run_stage
>     bundle_manager)
>   File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 557, in _run_bundle
>     data_input, data_output, input_timers, expected_timer_output)
>   File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 941, in process_bundle
>     timer_inputs)):
>   File "/usr/local/lib/python3.7/concurrent/futures/_base.py", line 598, in result_iterator
>     yield fs.pop().result()
>   File "/usr/local/lib/python3.7/concurrent/futures/_base.py", line 435, in result
>     return self.__get_result()
>   File "/usr/local/lib/python3.7/concurrent/futures/_base.py", line 384, in __get_result
>     raise self._exception
>   File "/usr/local/lib/python3.7/site-packages/apache_beam/utils/thread_pool_executor.py", line 44, in run
>     self._future.set_result(self._fn(*self._fn_args, **self._fn_kwargs))
>   File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 937, in execute
>     dry_run)
>   File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 837, in process_bundle
>     result_future = self._worker_handler.control_conn.push(process_bundle_req)
>   File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/portability/fn_api_runner/worker_handlers.py", line 352, in push
>     response = self.worker.do_instruction(request)
>   File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 480, in do_instruction
>     getattr(request, request_type), request.instruction_id)
>   File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 515, in process_bundle
>     bundle_processor.process_bundle(instruction_id))
>   File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 978, in process_bundle
>     element.data)
>   File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 218, in process_encoded
>     self.output(decoded_value)
>   File "apache_beam/runners/worker/operations.py", line 330, in apache_beam.runners.worker.operations.Operation.output
>   File "apache_beam/runners/worker/operations.py", line 332, in apache_beam.runners.worker.operations.Operation.output
>   File "apache_beam/runners/worker/operations.py", line 195, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
>   File "apache_beam/runners/worker/operations.py", line 755, in apache_beam.runners.worker.operations.SdfProcessSizedElements.process
>   File "apache_beam/runners/worker/operations.py", line 764, in apache_beam.runners.worker.operations.SdfProcessSizedElements.process
>   File "apache_beam/runners/common.py", line 971, in apache_beam.runners.common.DoFnRunner.process_with_sized_restriction
>   File "apache_beam/runners/common.py", line 711, in apache_beam.runners.common.PerWindowInvoker.invoke_process
>   File "apache_beam/runners/common.py", line 807, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
>   File "apache_beam/runners/common.py", line 1095, in apache_beam.runners.common._OutputProcessor.process_outputs
>   File "/usr/local/lib/python3.7/site-packages/apache_beam/transforms/periodicsequence.py", line 124, in process
>     timestamp.Timestamp(current_output_timestamp))
>   File "/usr/local/lib/python3.7/site-packages/apache_beam/utils/timestamp.py", line 64, in __init__
>     'Cannot interpret %s %s as seconds.' % (seconds, type(seconds)))
> TypeError: Cannot interpret Timestamp(1599216802.136201) <class 'apache_beam.utils.timestamp.Timestamp'> as seconds.
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)