You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@beam.apache.org by "Stephan Hoyer (Jira)" <ji...@apache.org> on 2021/04/27 18:44:00 UTC
[jira] [Updated] (BEAM-4805) beam.Map doesn't work on functions
defined with *args
[ https://issues.apache.org/jira/browse/BEAM-4805?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Stephan Hoyer updated BEAM-4805:
--------------------------------
Resolution: Fixed
Status: Resolved (was: Open)
> beam.Map doesn't work on functions defined with *args
> -----------------------------------------------------
>
> Key: BEAM-4805
> URL: https://issues.apache.org/jira/browse/BEAM-4805
> Project: Beam
> Issue Type: Bug
> Components: sdk-py-core
> Reporter: Stephan Hoyer
> Priority: P3
>
> Consider the following example:
> {code:python}
> import apache_beam as beam
> def f(*args, **kwargs):
>     return args, kwargs
> [1, 2, 3] | beam.Map(f)
> {code}
> When I run this code using the latest released version of Beam (2.5.0), I see the following error (the traceback below was captured with the equivalent input `range(3)` instead of `[1, 2, 3]`):
> {noformat}
> TypeError                                 Traceback (most recent call last)
> <ipython-input-20-9003b3f5887a> in <module>()
> ----> 1 range(3) | beam.Map(f)
> /usr/local/lib/python2.7/dist-packages/apache_beam/transforms/ptransform.pyc in __ror__(self, left, label)
> 491 _allocate_materialized_pipeline(p)
> 492 materialized_result = _AddMaterializationTransforms().visit(result)
> --> 493 p.run().wait_until_finish()
> 494 _release_materialized_pipeline(p)
> 495 return _FinalizeMaterialization().visit(materialized_result)
> /usr/local/lib/python2.7/dist-packages/apache_beam/pipeline.pyc in run(self, test_runner_api)
> 388 if test_runner_api and self._verify_runner_api_compatible():
> 389 return Pipeline.from_runner_api(
> --> 390 self.to_runner_api(), self.runner, self._options).run(False)
> 391
> 392 if self._options.view_as(TypeOptions).runtime_type_check:
> /usr/local/lib/python2.7/dist-packages/apache_beam/pipeline.pyc in run(self, test_runner_api)
> 401 finally:
> 402 shutil.rmtree(tmpdir)
> --> 403 return self.runner.run_pipeline(self)
> 404
> 405 def __enter__(self):
> /usr/local/lib/python2.7/dist-packages/apache_beam/runners/direct/direct_runner.pyc in run_pipeline(self, pipeline)
> 132 runner = BundleBasedDirectRunner()
> 133
> --> 134 return runner.run_pipeline(pipeline)
> 135
> 136
> /usr/local/lib/python2.7/dist-packages/apache_beam/runners/portability/fn_api_runner.pyc in run_pipeline(self, pipeline)
> 216 from apache_beam.runners.dataflow.dataflow_runner import DataflowRunner
> 217 pipeline.visit(DataflowRunner.group_by_key_input_visitor())
> --> 218 return self.run_via_runner_api(pipeline.to_runner_api())
> 219
> 220 def run_via_runner_api(self, pipeline_proto):
> /usr/local/lib/python2.7/dist-packages/apache_beam/runners/portability/fn_api_runner.pyc in run_via_runner_api(self, pipeline_proto)
> 219
> 220 def run_via_runner_api(self, pipeline_proto):
> --> 221 return self.run_stages(*self.create_stages(pipeline_proto))
> 222
> 223 def create_stages(self, pipeline_proto):
> /usr/local/lib/python2.7/dist-packages/apache_beam/runners/portability/fn_api_runner.pyc in run_stages(self, pipeline_components, stages, safe_coders)
> 857 metrics_by_stage[stage.name] = self.run_stage(
> 858 controller, pipeline_components, stage,
> --> 859 pcoll_buffers, safe_coders).process_bundle.metrics
> 860 finally:
> 861 controller.close()
> /usr/local/lib/python2.7/dist-packages/apache_beam/runners/portability/fn_api_runner.pyc in run_stage(self, controller, pipeline_components, stage, pcoll_buffers, safe_coders)
> 968 return BundleManager(
> 969 controller, get_buffer, process_bundle_descriptor,
> --> 970 self._progress_frequency).process_bundle(data_input, data_output)
> 971
> 972 # These classes are used to interact with the worker.
> /usr/local/lib/python2.7/dist-packages/apache_beam/runners/portability/fn_api_runner.pyc in process_bundle(self, inputs, expected_outputs)
> 1172 process_bundle=beam_fn_api_pb2.ProcessBundleRequest(
> 1173 process_bundle_descriptor_reference=self._bundle_descriptor.id))
> -> 1174 result_future = self._controller.control_handler.push(process_bundle)
> 1175
> 1176 with ProgressRequester(
> /usr/local/lib/python2.7/dist-packages/apache_beam/runners/portability/fn_api_runner.pyc in push(self, request)
> 1052 request.instruction_id = 'control_%s' % self._uid_counter
> 1053 logging.debug('CONTROL REQUEST %s', request)
> -> 1054 response = self.worker.do_instruction(request)
> 1055 logging.debug('CONTROL RESPONSE %s', response)
> 1056 return ControlFuture(request.instruction_id, response)
> /usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.pyc in do_instruction(self, request)
> 206 # E.g. if register is set, this will call self.register(request.register))
> 207 return getattr(self, request_type)(getattr(request, request_type),
> --> 208 request.instruction_id)
> 209 else:
> 210 raise NotImplementedError
> /usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.pyc in process_bundle(self, request, instruction_id)
> 228 try:
> 229 with state_handler.process_instruction_id(instruction_id):
> --> 230 processor.process_bundle(instruction_id)
> 231 finally:
> 232 del self.bundle_processors[instruction_id]
> /usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/bundle_processor.pyc in process_bundle(self, instruction_id)
> 287 for op in reversed(self.ops.values()):
> 288 logging.info('start %s', op)
> --> 289 op.start()
> 290
> 291 # Inject inputs from data plane.
> /usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/operations.so in apache_beam.runners.worker.operations.ReadOperation.start()
> /usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/operations.so in apache_beam.runners.worker.operations.ReadOperation.start()
> /usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/operations.so in apache_beam.runners.worker.operations.ReadOperation.start()
> /usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/operations.so in apache_beam.runners.worker.operations.Operation.output()
> /usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/operations.so in apache_beam.runners.worker.operations.ConsumerSet.receive()
> /usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/operations.so in apache_beam.runners.worker.operations.DoOperation.process()
> /usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/operations.so in apache_beam.runners.worker.operations.DoOperation.process()
> /usr/local/lib/python2.7/dist-packages/apache_beam/runners/common.so in apache_beam.runners.common.DoFnRunner.receive()
> /usr/local/lib/python2.7/dist-packages/apache_beam/runners/common.so in apache_beam.runners.common.DoFnRunner.process()
> /usr/local/lib/python2.7/dist-packages/apache_beam/runners/common.so in apache_beam.runners.common.DoFnRunner._reraise_augmented()
> /usr/local/lib/python2.7/dist-packages/apache_beam/runners/common.so in apache_beam.runners.common.DoFnRunner.process()
> /usr/local/lib/python2.7/dist-packages/apache_beam/runners/common.so in apache_beam.runners.common.SimpleInvoker.invoke_process()
> /usr/local/lib/python2.7/dist-packages/apache_beam/runners/common.so in apache_beam.runners.common._OutputProcessor.process_outputs()
> /usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/operations.so in apache_beam.runners.worker.operations.ConsumerSet.receive()
> /usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/operations.so in apache_beam.runners.worker.operations.ConsumerSet.update_counters_start()
> /usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/opcounters.so in apache_beam.runners.worker.opcounters.OperationCounters.update_from()
> /usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/opcounters.so in apache_beam.runners.worker.opcounters.OperationCounters.do_sample()
> /usr/local/lib/python2.7/dist-packages/apache_beam/coders/coder_impl.so in apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables()
> /usr/local/lib/python2.7/dist-packages/apache_beam/coders/coder_impl.so in apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables()
> /usr/local/lib/python2.7/dist-packages/apache_beam/coders/coder_impl.so in apache_beam.coders.coder_impl.AbstractComponentCoderImpl.get_estimated_size_and_observables()
> /usr/local/lib/python2.7/dist-packages/apache_beam/coders/coder_impl.so in apache_beam.coders.coder_impl.CoderImpl.get_estimated_size_and_observables()
> /usr/local/lib/python2.7/dist-packages/apache_beam/coders/coder_impl.so in apache_beam.coders.coder_impl.VarIntCoderImpl.estimate_size()
> /usr/local/lib/python2.7/dist-packages/apache_beam/coders/stream.pyx in apache_beam.coders.stream.get_varint_size()
> 220 return (<double*><char*>&as_long)[0]
> 221
> --> 222 cpdef libc.stdint.int64_t get_varint_size(libc.stdint.int64_t value):
> 223 """Returns the size of the given integer value when encode as a VarInt."""
> 224 cdef libc.stdint.int64_t varint_size = 0
> TypeError: an integer is required [while running 'Map(f)']
> {noformat}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)