You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@beam.apache.org by "ASF GitHub Bot (Jira)" <ji...@apache.org> on 2019/10/02 00:21:00 UTC

[jira] [Work logged] (BEAM-7981) ParDo function wrapper doesn't support Iterable output types

     [ https://issues.apache.org/jira/browse/BEAM-7981?focusedWorklogId=321635&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-321635 ]

ASF GitHub Bot logged work on BEAM-7981:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 02/Oct/19 00:20
            Start Date: 02/Oct/19 00:20
    Worklog Time Spent: 10m 
      Work Description: udim commented on pull request #9708: [BEAM-7981] Fix double iterable stripping
URL: https://github.com/apache/beam/pull/9708
 
 
   ... in CallableWrapperDoFn
   
   ------------------------
   
   Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
   
    - [ ] [**Choose reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and mention them in a comment (`R: @username`).
    - [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
    - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   Post-Commit Tests Status (on master branch)
   ------------------------------------------------------------------------------------------------
   
   Lang | SDK | Apex | Dataflow | Flink | Gearpump | Samza | Spark
   --- | --- | --- | --- | --- | --- | --- | ---
   Go | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/) | --- | --- | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/) | --- | --- | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/)
   Java | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Apex/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Apex/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/)<br>[![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/)<br>[![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Gearpump/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Gearpump/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/)<br>[![Build Status](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/)
   Python | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Python2/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Python2/lastCompletedBuild/)<br>[![Build Status](https://builds.apache.org/job/beam_PostCommit_Python35/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Python35/lastCompletedBuild/)<br>[![Build Status](https://builds.apache.org/job/beam_PostCommit_Python36/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Python36/lastCompletedBuild/)<br>[![Build Status](https://builds.apache.org/job/beam_PostCommit_Python37/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Python37/lastCompletedBuild/) | --- | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Py_VR_Dataflow/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Py_VR_Dataflow/lastCompletedBuild/)<br>[![Build Status](https://builds.apache.org/job/beam_PostCommit_Py_ValCont/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Py_ValCont/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PreCommit_Python2_PVR_Flink_Cron/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PreCommit_Python2_PVR_Flink_Cron/lastCompletedBuild/)<br>[![Build Status](https://builds.apache.org/job/beam_PostCommit_Python35_VR_Flink/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Python35_VR_Flink/lastCompletedBuild/) | --- | --- | [![Build Status](https://builds.apache.org/job/beam_PostCommit_Python_VR_Spark/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_Python_VR_Spark/lastCompletedBuild/)
   XLang | --- | --- | --- | [![Build Status](https://builds.apache.org/job/beam_PostCommit_XVR_Flink/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PostCommit_XVR_Flink/lastCompletedBuild/) | --- | --- | ---
   
   Pre-Commit Tests Status (on master branch)
   ------------------------------------------------------------------------------------------------
   
   --- |Java | Python | Go | Website
   --- | --- | --- | --- | ---
   Non-portable | [![Build Status](https://builds.apache.org/job/beam_PreCommit_Java_Cron/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PreCommit_Java_Cron/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PreCommit_Python_Cron/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PreCommit_Python_Cron/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PreCommit_Go_Cron/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PreCommit_Go_Cron/lastCompletedBuild/) | [![Build Status](https://builds.apache.org/job/beam_PreCommit_Website_Cron/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PreCommit_Website_Cron/lastCompletedBuild/) 
   Portable | --- | [![Build Status](https://builds.apache.org/job/beam_PreCommit_Portable_Python_Cron/lastCompletedBuild/badge/icon)](https://builds.apache.org/job/beam_PreCommit_Portable_Python_Cron/lastCompletedBuild/) | --- | ---
   
   See [.test-infra/jenkins/README](https://github.com/apache/beam/blob/master/.test-infra/jenkins/README.md) for trigger phrase, status and link of all Jenkins jobs.
   
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Issue Time Tracking
-------------------

            Worklog Id:     (was: 321635)
    Remaining Estimate: 0h
            Time Spent: 10m

> ParDo function wrapper doesn't support Iterable output types
> ------------------------------------------------------------
>
>                 Key: BEAM-7981
>                 URL: https://issues.apache.org/jira/browse/BEAM-7981
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-py-core
>            Reporter: Udi Meiri
>            Assignee: Udi Meiri
>            Priority: Major
>          Time Spent: 10m
>  Remaining Estimate: 0h
>
> I believe the bug is in CallableWrapperDoFn.default_type_hints, which converts Iterable[str] to str.
> This test will be included (commented out) in https://github.com/apache/beam/pull/9283
> {code}
>   def test_typed_callable_iterable_output(self):
>     @typehints.with_input_types(int)
>     @typehints.with_output_types(typehints.Iterable[str])
>     def do_fn(element):
>       return [[str(element)] * 2]
>     result = [1, 2] | beam.ParDo(do_fn)
>     self.assertEqual([['1', '1'], ['2', '2']], sorted(result))
> {code}
> Result:
> {code}
> ======================================================================
> ERROR: test_typed_callable_iterable_output (apache_beam.typehints.typed_pipeline_test.MainInputTest)
> ----------------------------------------------------------------------
> Traceback (most recent call last):
>   File "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/typehints/typed_pipeline_test.py", line 104, in test_typed_callable_iterable_output
>     result = [1, 2] | beam.ParDo(do_fn)
>   File "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/transforms/ptransform.py", line 519, in __ror__
>     p.run().wait_until_finish()
>   File "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/pipeline.py", line 406, in run
>     self._options).run(False)
>   File "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/pipeline.py", line 419, in run
>     return self.runner.run_pipeline(self, self._options)
>   File "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/direct/direct_runner.py", line 129, in run_pipeline
>     return runner.run_pipeline(pipeline, options)
>   File "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/portability/fn_api_runner.py", line 366, in run_pipeline
>     default_environment=self._default_environment))
>   File "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/portability/fn_api_runner.py", line 373, in run_via_runner_api
>     return self.run_stages(stage_context, stages)
>   File "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/portability/fn_api_runner.py", line 455, in run_stages
>     stage_context.safe_coders)
>   File "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/portability/fn_api_runner.py", line 733, in _run_stage
>     result, splits = bundle_manager.process_bundle(data_input, data_output)
>   File "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/portability/fn_api_runner.py", line 1663, in process_bundle
>     part, expected_outputs), part_inputs):
>   File "/usr/lib/python3.7/concurrent/futures/_base.py", line 586, in result_iterator
>     yield fs.pop().result()
>   File "/usr/lib/python3.7/concurrent/futures/_base.py", line 432, in result
>     return self.__get_result()
>   File "/usr/lib/python3.7/concurrent/futures/_base.py", line 384, in __get_result
>     raise self._exception
>   File "/usr/lib/python3.7/concurrent/futures/thread.py", line 57, in run
>     result = self.fn(*self.args, **self.kwargs)
>   File "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/portability/fn_api_runner.py", line 1663, in <lambda>
>     part, expected_outputs), part_inputs):
>   File "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/portability/fn_api_runner.py", line 1601, in process_bundle
>     result_future = self._worker_handler.control_conn.push(process_bundle_req)
>   File "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/portability/fn_api_runner.py", line 1080, in push
>     response = self.worker.do_instruction(request)
>   File "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/worker/sdk_worker.py", line 343, in do_instruction
>     request.instruction_id)
>   File "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/worker/sdk_worker.py", line 369, in process_bundle
>     bundle_processor.process_bundle(instruction_id))
>   File "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/worker/bundle_processor.py", line 593, in process_bundle
>     data.ptransform_id].process_encoded(data.data)
>   File "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/worker/bundle_processor.py", line 143, in process_encoded
>     self.output(decoded_value)
>   File "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/worker/operations.py", line 256, in output
>     cython.cast(Receiver, self.receivers[output_index]).receive(windowed_value)
>   File "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/worker/operations.py", line 143, in receive
>     self.consumer.process(windowed_value)
>   File "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/worker/operations.py", line 435, in process
>     self.output(windowed_value)
>   File "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/worker/operations.py", line 256, in output
>     cython.cast(Receiver, self.receivers[output_index]).receive(windowed_value)
>   File "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/worker/operations.py", line 143, in receive
>     self.consumer.process(windowed_value)
>   File "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/worker/operations.py", line 594, in process
>     delayed_application = self.dofn_receiver.receive(o)
>   File "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/common.py", line 776, in receive
>     self.process(windowed_value)
>   File "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/common.py", line 782, in process
>     self._reraise_augmented(exn)
>   File "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/common.py", line 849, in _reraise_augmented
>     raise_with_traceback(new_exn)
>   File "/usr/local/google/home/ehudm/virtualenvs/beam-py37/lib/python3.7/site-packages/future/utils/__init__.py", line 419, in raise_with_traceback
>     raise exc.with_traceback(traceback)
>   File "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/common.py", line 780, in process
>     return self.do_fn_invoker.invoke_process(windowed_value)
>   File "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/common.py", line 441, in invoke_process
>     windowed_value, self.process_method(windowed_value.value))
>   File "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/common.py", line 919, in process_outputs
>     self.main_receivers.receive(windowed_value)
>   File "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/worker/operations.py", line 142, in receive
>     self.update_counters_start(windowed_value)
>   File "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/worker/operations.py", line 122, in update_counters_start
>     self.opcounter.update_from(windowed_value)
>   File "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/worker/opcounters.py", line 196, in update_from
>     self.do_sample(windowed_value)
>   File "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/worker/opcounters.py", line 214, in do_sample
>     self.coder_impl.get_estimated_size_and_observables(windowed_value))
>   File "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/coders/coder_impl.py", line 1024, in get_estimated_size_and_observables
>     value.value, nested=nested))
>   File "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/coders/coder_impl.py", line 220, in get_estimated_size_and_observables
>     return self.estimate_size(value, nested), []
>   File "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/coders/coder_impl.py", line 212, in estimate_size
>     return self._get_nested_size(self._size_estimator(value), nested)
>   File "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/coders/coders.py", line 135, in estimate_size
>     return len(self.encode(value))
>   File "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/coders/coders.py", line 326, in encode
>     return value.encode('utf-8')
> AttributeError: 'list' object has no attribute 'encode' [while running 'ParDo(CallableWrapperDoFn)']
> -------------------- >> begin captured logging << --------------------
> root: INFO: Generating grammar tables from /usr/lib/python3.7/lib2to3/Grammar.txt
> root: INFO: Generating grammar tables from /usr/lib/python3.7/lib2to3/PatternGrammar.txt
> avro.schema: Level 5: Register new name for 'org.apache.avro.file.Header'
> avro.schema: Level 5: Register new name for 'org.apache.avro.file.magic'
> avro.schema: Level 5: Register new name for 'org.apache.avro.file.sync'
> root: INFO: ==================== <function annotate_downstream_side_inputs at 0x7f3a39918158> ====================
> root: DEBUG: 3 [1, 1, 1]
> root: DEBUG: Stages: ['ref_AppliedPTransform_CreatePInput0/Read_3\n  CreatePInput0/Read:beam:transform:read:v1\n  must follow: \n  downstream_side_inputs: ', 'ref_AppliedPTransform_ParDo(CallableWrapperDoFn)_4\n  ParDo(CallableWrapperDoFn):beam:transform:pardo:v1\n  must follow: \n  downstream_side_inputs: ', 'ref_AppliedPTransform__MaterializeValues0_5\n  _MaterializeValues0:beam:transform:pardo:v1\n  must follow: \n  downstream_side_inputs: ']
> root: INFO: ==================== <function fix_side_input_pcoll_coders at 0x7f3a39918268> ====================
> root: DEBUG: 3 [1, 1, 1]
> root: DEBUG: Stages: ['ref_AppliedPTransform_CreatePInput0/Read_3\n  CreatePInput0/Read:beam:transform:read:v1\n  must follow: \n  downstream_side_inputs: ', 'ref_AppliedPTransform_ParDo(CallableWrapperDoFn)_4\n  ParDo(CallableWrapperDoFn):beam:transform:pardo:v1\n  must follow: \n  downstream_side_inputs: ', 'ref_AppliedPTransform__MaterializeValues0_5\n  _MaterializeValues0:beam:transform:pardo:v1\n  must follow: \n  downstream_side_inputs: ']
> root: INFO: ==================== <function lift_combiners at 0x7f3a399182f0> ====================
> root: DEBUG: 3 [1, 1, 1]
> root: DEBUG: Stages: ['ref_AppliedPTransform_CreatePInput0/Read_3\n  CreatePInput0/Read:beam:transform:read:v1\n  must follow: \n  downstream_side_inputs: ', 'ref_AppliedPTransform_ParDo(CallableWrapperDoFn)_4\n  ParDo(CallableWrapperDoFn):beam:transform:pardo:v1\n  must follow: \n  downstream_side_inputs: ', 'ref_AppliedPTransform__MaterializeValues0_5\n  _MaterializeValues0:beam:transform:pardo:v1\n  must follow: \n  downstream_side_inputs: ']
> root: INFO: ==================== <function expand_sdf at 0x7f3a39918378> ====================
> root: DEBUG: 3 [1, 1, 1]
> root: DEBUG: Stages: ['ref_AppliedPTransform_CreatePInput0/Read_3\n  CreatePInput0/Read:beam:transform:read:v1\n  must follow: \n  downstream_side_inputs: ', 'ref_AppliedPTransform_ParDo(CallableWrapperDoFn)_4\n  ParDo(CallableWrapperDoFn):beam:transform:pardo:v1\n  must follow: \n  downstream_side_inputs: ', 'ref_AppliedPTransform__MaterializeValues0_5\n  _MaterializeValues0:beam:transform:pardo:v1\n  must follow: \n  downstream_side_inputs: ']
> root: INFO: ==================== <function expand_gbk at 0x7f3a39918400> ====================
> root: DEBUG: 3 [1, 1, 1]
> root: DEBUG: Stages: ['ref_AppliedPTransform_CreatePInput0/Read_3\n  CreatePInput0/Read:beam:transform:read:v1\n  must follow: \n  downstream_side_inputs: ', 'ref_AppliedPTransform_ParDo(CallableWrapperDoFn)_4\n  ParDo(CallableWrapperDoFn):beam:transform:pardo:v1\n  must follow: \n  downstream_side_inputs: ', 'ref_AppliedPTransform__MaterializeValues0_5\n  _MaterializeValues0:beam:transform:pardo:v1\n  must follow: \n  downstream_side_inputs: ']
> root: INFO: ==================== <function sink_flattens at 0x7f3a39918510> ====================
> root: DEBUG: 3 [1, 1, 1]
> root: DEBUG: Stages: ['ref_AppliedPTransform_CreatePInput0/Read_3\n  CreatePInput0/Read:beam:transform:read:v1\n  must follow: \n  downstream_side_inputs: ', 'ref_AppliedPTransform_ParDo(CallableWrapperDoFn)_4\n  ParDo(CallableWrapperDoFn):beam:transform:pardo:v1\n  must follow: \n  downstream_side_inputs: ', 'ref_AppliedPTransform__MaterializeValues0_5\n  _MaterializeValues0:beam:transform:pardo:v1\n  must follow: \n  downstream_side_inputs: ']
> root: INFO: ==================== <function greedily_fuse at 0x7f3a39918598> ====================
> root: DEBUG: 1 [3]
> root: DEBUG: Stages: ['((ref_AppliedPTransform_CreatePInput0/Read_3)+(ref_AppliedPTransform_ParDo(CallableWrapperDoFn)_4))+(ref_AppliedPTransform__MaterializeValues0_5)\n  CreatePInput0/Read:beam:transform:read:v1\nParDo(CallableWrapperDoFn):beam:transform:pardo:v1\n_MaterializeValues0:beam:transform:pardo:v1\n  must follow: \n  downstream_side_inputs: ']
> root: INFO: ==================== <function read_to_impulse at 0x7f3a39918620> ====================
> root: DEBUG: 1 [4]
> root: DEBUG: Stages: ['((ref_AppliedPTransform_CreatePInput0/Read_3)+(ref_AppliedPTransform_ParDo(CallableWrapperDoFn)_4))+(ref_AppliedPTransform__MaterializeValues0_5)\n  ParDo(CallableWrapperDoFn):beam:transform:pardo:v1\n_MaterializeValues0:beam:transform:pardo:v1\nCreatePInput0/Read/Impulse:beam:transform:impulse:v1\nCreatePInput0/Read:beam:transform:read_from_impulse_python:v1\n  must follow: \n  downstream_side_inputs: ']
> root: INFO: ==================== <function impulse_to_input at 0x7f3a399186a8> ====================
> root: DEBUG: 1 [4]
> root: DEBUG: Stages: ['((ref_AppliedPTransform_CreatePInput0/Read_3)+(ref_AppliedPTransform_ParDo(CallableWrapperDoFn)_4))+(ref_AppliedPTransform__MaterializeValues0_5)\n  ParDo(CallableWrapperDoFn):beam:transform:pardo:v1\n_MaterializeValues0:beam:transform:pardo:v1\nCreatePInput0/Read:beam:transform:read_from_impulse_python:v1\nCreatePInput0/Read/Impulse:beam:source:runner:0.1\n  must follow: \n  downstream_side_inputs: ']
> root: INFO: ==================== <function inject_timer_pcollections at 0x7f3a39918840> ====================
> root: DEBUG: 1 [4]
> root: DEBUG: Stages: ['((ref_AppliedPTransform_CreatePInput0/Read_3)+(ref_AppliedPTransform_ParDo(CallableWrapperDoFn)_4))+(ref_AppliedPTransform__MaterializeValues0_5)\n  ParDo(CallableWrapperDoFn):beam:transform:pardo:v1\n_MaterializeValues0:beam:transform:pardo:v1\nCreatePInput0/Read:beam:transform:read_from_impulse_python:v1\nCreatePInput0/Read/Impulse:beam:source:runner:0.1\n  must follow: \n  downstream_side_inputs: ']
> root: INFO: ==================== <function sort_stages at 0x7f3a399188c8> ====================
> root: DEBUG: 1 [4]
> root: DEBUG: Stages: ['((ref_AppliedPTransform_CreatePInput0/Read_3)+(ref_AppliedPTransform_ParDo(CallableWrapperDoFn)_4))+(ref_AppliedPTransform__MaterializeValues0_5)\n  ParDo(CallableWrapperDoFn):beam:transform:pardo:v1\n_MaterializeValues0:beam:transform:pardo:v1\nCreatePInput0/Read:beam:transform:read_from_impulse_python:v1\nCreatePInput0/Read/Impulse:beam:source:runner:0.1\n  must follow: \n  downstream_side_inputs: ']
> root: INFO: ==================== <function window_pcollection_coders at 0x7f3a39918950> ====================
> root: DEBUG: 1 [4]
> root: DEBUG: Stages: ['((ref_AppliedPTransform_CreatePInput0/Read_3)+(ref_AppliedPTransform_ParDo(CallableWrapperDoFn)_4))+(ref_AppliedPTransform__MaterializeValues0_5)\n  ParDo(CallableWrapperDoFn):beam:transform:pardo:v1\n_MaterializeValues0:beam:transform:pardo:v1\nCreatePInput0/Read:beam:transform:read_from_impulse_python:v1\nCreatePInput0/Read/Impulse:beam:source:runner:0.1\n  must follow: \n  downstream_side_inputs: ']
> root: INFO: Running ((ref_AppliedPTransform_CreatePInput0/Read_3)+(ref_AppliedPTransform_ParDo(CallableWrapperDoFn)_4))+(ref_AppliedPTransform__MaterializeValues0_5)
> root: DEBUG: start <DoOperation _MaterializeValues0 output_tags=['out'], receivers=[ConsumerSet[_MaterializeValues0.out0, coder=WindowedValueCoder[FastPrimitivesCoder], len(consumers)=0]]>
> root: DEBUG: start <DoOperation ParDo(CallableWrapperDoFn) output_tags=['out'], receivers=[SingletonConsumerSet[ParDo(CallableWrapperDoFn).out0, coder=WindowedValueCoder[StrUtf8Coder], len(consumers)=1]]>
> root: DEBUG: start <ImpulseReadOperation receivers=[SingletonConsumerSet[CreatePInput0/Read.out0, coder=WindowedValueCoder[VarIntCoder], len(consumers)=1]]>
> root: DEBUG: start <DataInputOperation receivers=[SingletonConsumerSet[CreatePInput0/Read/Impulse.out0, coder=WindowedValueCoder[BytesCoder], len(consumers)=1]]>
> --------------------- >> end captured logging << ---------------------
> ----------------------------------------------------------------------
> Ran 1 test in 0.068s
> FAILED (errors=1)
> Error
> Traceback (most recent call last):
>   File "/usr/lib/python3.7/unittest/case.py", line 59, in testPartExecutor
>     yield
>   File "/usr/lib/python3.7/unittest/case.py", line 615, in run
>     testMethod()
>   File "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/typehints/typed_pipeline_test.py", line 104, in test_typed_callable_iterable_output
>     result = [1, 2] | beam.ParDo(do_fn)
>   File "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/transforms/ptransform.py", line 519, in __ror__
>     p.run().wait_until_finish()
>   File "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/pipeline.py", line 406, in run
>     self._options).run(False)
>   File "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/pipeline.py", line 419, in run
>     return self.runner.run_pipeline(self, self._options)
>   File "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/direct/direct_runner.py", line 129, in run_pipeline
>     return runner.run_pipeline(pipeline, options)
>   File "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/portability/fn_api_runner.py", line 366, in run_pipeline
>     default_environment=self._default_environment))
>   File "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/portability/fn_api_runner.py", line 373, in run_via_runner_api
>     return self.run_stages(stage_context, stages)
>   File "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/portability/fn_api_runner.py", line 455, in run_stages
>     stage_context.safe_coders)
>   File "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/portability/fn_api_runner.py", line 733, in _run_stage
>     result, splits = bundle_manager.process_bundle(data_input, data_output)
>   File "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/portability/fn_api_runner.py", line 1663, in process_bundle
>     part, expected_outputs), part_inputs):
>   File "/usr/lib/python3.7/concurrent/futures/_base.py", line 586, in result_iterator
>     yield fs.pop().result()
>   File "/usr/lib/python3.7/concurrent/futures/_base.py", line 432, in result
>     return self.__get_result()
>   File "/usr/lib/python3.7/concurrent/futures/_base.py", line 384, in __get_result
>     raise self._exception
>   File "/usr/lib/python3.7/concurrent/futures/thread.py", line 57, in run
>     result = self.fn(*self.args, **self.kwargs)
>   File "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/portability/fn_api_runner.py", line 1663, in <lambda>
>     part, expected_outputs), part_inputs):
>   File "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/portability/fn_api_runner.py", line 1601, in process_bundle
>     result_future = self._worker_handler.control_conn.push(process_bundle_req)
>   File "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/portability/fn_api_runner.py", line 1080, in push
>     response = self.worker.do_instruction(request)
>   File "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/worker/sdk_worker.py", line 343, in do_instruction
>     request.instruction_id)
>   File "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/worker/sdk_worker.py", line 369, in process_bundle
>     bundle_processor.process_bundle(instruction_id))
>   File "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/worker/bundle_processor.py", line 593, in process_bundle
>     data.ptransform_id].process_encoded(data.data)
>   File "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/worker/bundle_processor.py", line 143, in process_encoded
>     self.output(decoded_value)
>   File "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/worker/operations.py", line 256, in output
>     cython.cast(Receiver, self.receivers[output_index]).receive(windowed_value)
>   File "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/worker/operations.py", line 143, in receive
>     self.consumer.process(windowed_value)
>   File "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/worker/operations.py", line 435, in process
>     self.output(windowed_value)
>   File "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/worker/operations.py", line 256, in output
>     cython.cast(Receiver, self.receivers[output_index]).receive(windowed_value)
>   File "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/worker/operations.py", line 143, in receive
>     self.consumer.process(windowed_value)
>   File "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/worker/operations.py", line 594, in process
>     delayed_application = self.dofn_receiver.receive(o)
>   File "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/common.py", line 776, in receive
>     self.process(windowed_value)
>   File "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/common.py", line 782, in process
>     self._reraise_augmented(exn)
>   File "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/common.py", line 849, in _reraise_augmented
>     raise_with_traceback(new_exn)
>   File "/usr/local/google/home/ehudm/virtualenvs/beam-py37/lib/python3.7/site-packages/future/utils/__init__.py", line 419, in raise_with_traceback
>     raise exc.with_traceback(traceback)
>   File "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/common.py", line 780, in process
>     return self.do_fn_invoker.invoke_process(windowed_value)
>   File "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/common.py", line 441, in invoke_process
>     windowed_value, self.process_method(windowed_value.value))
>   File "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/common.py", line 919, in process_outputs
>     self.main_receivers.receive(windowed_value)
>   File "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/worker/operations.py", line 142, in receive
>     self.update_counters_start(windowed_value)
>   File "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/worker/operations.py", line 122, in update_counters_start
>     self.opcounter.update_from(windowed_value)
>   File "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/worker/opcounters.py", line 196, in update_from
>     self.do_sample(windowed_value)
>   File "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/runners/worker/opcounters.py", line 214, in do_sample
>     self.coder_impl.get_estimated_size_and_observables(windowed_value))
>   File "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/coders/coder_impl.py", line 1024, in get_estimated_size_and_observables
>     value.value, nested=nested))
>   File "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/coders/coder_impl.py", line 220, in get_estimated_size_and_observables
>     return self.estimate_size(value, nested), []
>   File "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/coders/coder_impl.py", line 212, in estimate_size
>     return self._get_nested_size(self._size_estimator(value), nested)
>   File "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/coders/coders.py", line 135, in estimate_size
>     return len(self.encode(value))
>   File "/usr/local/google/home/ehudm/src/beam/sdks/python/apache_beam/coders/coders.py", line 326, in encode
>     return value.encode('utf-8')
> Exception: 'list' object has no attribute 'encode' [while running 'ParDo(CallableWrapperDoFn)']
> -------------------- >> begin captured logging << --------------------
> root: INFO: Generating grammar tables from /usr/lib/python3.7/lib2to3/Grammar.txt
> root: INFO: Generating grammar tables from /usr/lib/python3.7/lib2to3/PatternGrammar.txt
> avro.schema: Level 5: Register new name for 'org.apache.avro.file.Header'
> avro.schema: Level 5: Register new name for 'org.apache.avro.file.magic'
> avro.schema: Level 5: Register new name for 'org.apache.avro.file.sync'
> root: INFO: ==================== <function annotate_downstream_side_inputs at 0x7f3a39918158> ====================
> root: DEBUG: 3 [1, 1, 1]
> root: DEBUG: Stages: ['ref_AppliedPTransform_CreatePInput0/Read_3\n  CreatePInput0/Read:beam:transform:read:v1\n  must follow: \n  downstream_side_inputs: ', 'ref_AppliedPTransform_ParDo(CallableWrapperDoFn)_4\n  ParDo(CallableWrapperDoFn):beam:transform:pardo:v1\n  must follow: \n  downstream_side_inputs: ', 'ref_AppliedPTransform__MaterializeValues0_5\n  _MaterializeValues0:beam:transform:pardo:v1\n  must follow: \n  downstream_side_inputs: ']
> root: INFO: ==================== <function fix_side_input_pcoll_coders at 0x7f3a39918268> ====================
> root: DEBUG: 3 [1, 1, 1]
> root: DEBUG: Stages: ['ref_AppliedPTransform_CreatePInput0/Read_3\n  CreatePInput0/Read:beam:transform:read:v1\n  must follow: \n  downstream_side_inputs: ', 'ref_AppliedPTransform_ParDo(CallableWrapperDoFn)_4\n  ParDo(CallableWrapperDoFn):beam:transform:pardo:v1\n  must follow: \n  downstream_side_inputs: ', 'ref_AppliedPTransform__MaterializeValues0_5\n  _MaterializeValues0:beam:transform:pardo:v1\n  must follow: \n  downstream_side_inputs: ']
> root: INFO: ==================== <function lift_combiners at 0x7f3a399182f0> ====================
> root: DEBUG: 3 [1, 1, 1]
> root: DEBUG: Stages: ['ref_AppliedPTransform_CreatePInput0/Read_3\n  CreatePInput0/Read:beam:transform:read:v1\n  must follow: \n  downstream_side_inputs: ', 'ref_AppliedPTransform_ParDo(CallableWrapperDoFn)_4\n  ParDo(CallableWrapperDoFn):beam:transform:pardo:v1\n  must follow: \n  downstream_side_inputs: ', 'ref_AppliedPTransform__MaterializeValues0_5\n  _MaterializeValues0:beam:transform:pardo:v1\n  must follow: \n  downstream_side_inputs: ']
> root: INFO: ==================== <function expand_sdf at 0x7f3a39918378> ====================
> root: DEBUG: 3 [1, 1, 1]
> root: DEBUG: Stages: ['ref_AppliedPTransform_CreatePInput0/Read_3\n  CreatePInput0/Read:beam:transform:read:v1\n  must follow: \n  downstream_side_inputs: ', 'ref_AppliedPTransform_ParDo(CallableWrapperDoFn)_4\n  ParDo(CallableWrapperDoFn):beam:transform:pardo:v1\n  must follow: \n  downstream_side_inputs: ', 'ref_AppliedPTransform__MaterializeValues0_5\n  _MaterializeValues0:beam:transform:pardo:v1\n  must follow: \n  downstream_side_inputs: ']
> root: INFO: ==================== <function expand_gbk at 0x7f3a39918400> ====================
> root: DEBUG: 3 [1, 1, 1]
> root: DEBUG: Stages: ['ref_AppliedPTransform_CreatePInput0/Read_3\n  CreatePInput0/Read:beam:transform:read:v1\n  must follow: \n  downstream_side_inputs: ', 'ref_AppliedPTransform_ParDo(CallableWrapperDoFn)_4\n  ParDo(CallableWrapperDoFn):beam:transform:pardo:v1\n  must follow: \n  downstream_side_inputs: ', 'ref_AppliedPTransform__MaterializeValues0_5\n  _MaterializeValues0:beam:transform:pardo:v1\n  must follow: \n  downstream_side_inputs: ']
> root: INFO: ==================== <function sink_flattens at 0x7f3a39918510> ====================
> root: DEBUG: 3 [1, 1, 1]
> root: DEBUG: Stages: ['ref_AppliedPTransform_CreatePInput0/Read_3\n  CreatePInput0/Read:beam:transform:read:v1\n  must follow: \n  downstream_side_inputs: ', 'ref_AppliedPTransform_ParDo(CallableWrapperDoFn)_4\n  ParDo(CallableWrapperDoFn):beam:transform:pardo:v1\n  must follow: \n  downstream_side_inputs: ', 'ref_AppliedPTransform__MaterializeValues0_5\n  _MaterializeValues0:beam:transform:pardo:v1\n  must follow: \n  downstream_side_inputs: ']
> root: INFO: ==================== <function greedily_fuse at 0x7f3a39918598> ====================
> root: DEBUG: 1 [3]
> root: DEBUG: Stages: ['((ref_AppliedPTransform_CreatePInput0/Read_3)+(ref_AppliedPTransform_ParDo(CallableWrapperDoFn)_4))+(ref_AppliedPTransform__MaterializeValues0_5)\n  CreatePInput0/Read:beam:transform:read:v1\nParDo(CallableWrapperDoFn):beam:transform:pardo:v1\n_MaterializeValues0:beam:transform:pardo:v1\n  must follow: \n  downstream_side_inputs: ']
> root: INFO: ==================== <function read_to_impulse at 0x7f3a39918620> ====================
> root: DEBUG: 1 [4]
> root: DEBUG: Stages: ['((ref_AppliedPTransform_CreatePInput0/Read_3)+(ref_AppliedPTransform_ParDo(CallableWrapperDoFn)_4))+(ref_AppliedPTransform__MaterializeValues0_5)\n  ParDo(CallableWrapperDoFn):beam:transform:pardo:v1\n_MaterializeValues0:beam:transform:pardo:v1\nCreatePInput0/Read/Impulse:beam:transform:impulse:v1\nCreatePInput0/Read:beam:transform:read_from_impulse_python:v1\n  must follow: \n  downstream_side_inputs: ']
> root: INFO: ==================== <function impulse_to_input at 0x7f3a399186a8> ====================
> root: DEBUG: 1 [4]
> root: DEBUG: Stages: ['((ref_AppliedPTransform_CreatePInput0/Read_3)+(ref_AppliedPTransform_ParDo(CallableWrapperDoFn)_4))+(ref_AppliedPTransform__MaterializeValues0_5)\n  ParDo(CallableWrapperDoFn):beam:transform:pardo:v1\n_MaterializeValues0:beam:transform:pardo:v1\nCreatePInput0/Read:beam:transform:read_from_impulse_python:v1\nCreatePInput0/Read/Impulse:beam:source:runner:0.1\n  must follow: \n  downstream_side_inputs: ']
> root: INFO: ==================== <function inject_timer_pcollections at 0x7f3a39918840> ====================
> root: DEBUG: 1 [4]
> root: DEBUG: Stages: ['((ref_AppliedPTransform_CreatePInput0/Read_3)+(ref_AppliedPTransform_ParDo(CallableWrapperDoFn)_4))+(ref_AppliedPTransform__MaterializeValues0_5)\n  ParDo(CallableWrapperDoFn):beam:transform:pardo:v1\n_MaterializeValues0:beam:transform:pardo:v1\nCreatePInput0/Read:beam:transform:read_from_impulse_python:v1\nCreatePInput0/Read/Impulse:beam:source:runner:0.1\n  must follow: \n  downstream_side_inputs: ']
> root: INFO: ==================== <function sort_stages at 0x7f3a399188c8> ====================
> root: DEBUG: 1 [4]
> root: DEBUG: Stages: ['((ref_AppliedPTransform_CreatePInput0/Read_3)+(ref_AppliedPTransform_ParDo(CallableWrapperDoFn)_4))+(ref_AppliedPTransform__MaterializeValues0_5)\n  ParDo(CallableWrapperDoFn):beam:transform:pardo:v1\n_MaterializeValues0:beam:transform:pardo:v1\nCreatePInput0/Read:beam:transform:read_from_impulse_python:v1\nCreatePInput0/Read/Impulse:beam:source:runner:0.1\n  must follow: \n  downstream_side_inputs: ']
> root: INFO: ==================== <function window_pcollection_coders at 0x7f3a39918950> ====================
> root: DEBUG: 1 [4]
> root: DEBUG: Stages: ['((ref_AppliedPTransform_CreatePInput0/Read_3)+(ref_AppliedPTransform_ParDo(CallableWrapperDoFn)_4))+(ref_AppliedPTransform__MaterializeValues0_5)\n  ParDo(CallableWrapperDoFn):beam:transform:pardo:v1\n_MaterializeValues0:beam:transform:pardo:v1\nCreatePInput0/Read:beam:transform:read_from_impulse_python:v1\nCreatePInput0/Read/Impulse:beam:source:runner:0.1\n  must follow: \n  downstream_side_inputs: ']
> root: INFO: Running ((ref_AppliedPTransform_CreatePInput0/Read_3)+(ref_AppliedPTransform_ParDo(CallableWrapperDoFn)_4))+(ref_AppliedPTransform__MaterializeValues0_5)
> root: DEBUG: start <DoOperation _MaterializeValues0 output_tags=['out'], receivers=[ConsumerSet[_MaterializeValues0.out0, coder=WindowedValueCoder[FastPrimitivesCoder], len(consumers)=0]]>
> root: DEBUG: start <DoOperation ParDo(CallableWrapperDoFn) output_tags=['out'], receivers=[SingletonConsumerSet[ParDo(CallableWrapperDoFn).out0, coder=WindowedValueCoder[StrUtf8Coder], len(consumers)=1]]>
> root: DEBUG: start <ImpulseReadOperation receivers=[SingletonConsumerSet[CreatePInput0/Read.out0, coder=WindowedValueCoder[VarIntCoder], len(consumers)=1]]>
> root: DEBUG: start <DataInputOperation receivers=[SingletonConsumerSet[CreatePInput0/Read/Impulse.out0, coder=WindowedValueCoder[BytesCoder], len(consumers)=1]]>
> --------------------- >> end captured logging << ---------------------
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)