You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by "Ahmet Altay (JIRA)" <ji...@apache.org> on 2018/07/18 00:03:00 UTC

[jira] [Commented] (BEAM-4805) beam.Map doesn't work on functions defined with *args

    [ https://issues.apache.org/jira/browse/BEAM-4805?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16547237#comment-16547237 ] 

Ahmet Altay commented on BEAM-4805:
-----------------------------------

Map expects a function with the following signature:

f(element, *args, **kwargs)

Type hints are also applied to the first argument (the element). The *args and **kwargs are used for passing side inputs. We could improve the pydoc or documentation to make this clearer; however, this is intended behaviour.

> beam.Map doesn't work on functions defined with *args
> -----------------------------------------------------
>
>                 Key: BEAM-4805
>                 URL: https://issues.apache.org/jira/browse/BEAM-4805
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-py-core
>            Reporter: Stephan Hoyer
>            Assignee: Ahmet Altay
>            Priority: Major
>
> Consider the following example:
> {code:python}
> import apache_beam as beam
> def f(*args, **kwargs):
>     return args, kwargs
> [1, 2, 3] | beam.Map(f)
> {code}
> When I run this code using the latest released version of Beam (2.5.0), I see the following error:
> {noformat}
> TypeError                                 Traceback (most recent call last)
> <ipython-input-20-9003b3f5887a> in <module>()
> ----> 1 range(3) | beam.Map(f)
> /usr/local/lib/python2.7/dist-packages/apache_beam/transforms/ptransform.pyc in __ror__(self, left, label)
>     491     _allocate_materialized_pipeline(p)
>     492     materialized_result = _AddMaterializationTransforms().visit(result)
> --> 493     p.run().wait_until_finish()
>     494     _release_materialized_pipeline(p)
>     495     return _FinalizeMaterialization().visit(materialized_result)
> /usr/local/lib/python2.7/dist-packages/apache_beam/pipeline.pyc in run(self, test_runner_api)
>     388     if test_runner_api and self._verify_runner_api_compatible():
>     389       return Pipeline.from_runner_api(
> --> 390           self.to_runner_api(), self.runner, self._options).run(False)
>     391 
>     392     if self._options.view_as(TypeOptions).runtime_type_check:
> /usr/local/lib/python2.7/dist-packages/apache_beam/pipeline.pyc in run(self, test_runner_api)
>     401       finally:
>     402         shutil.rmtree(tmpdir)
> --> 403     return self.runner.run_pipeline(self)
>     404 
>     405   def __enter__(self):
> /usr/local/lib/python2.7/dist-packages/apache_beam/runners/direct/direct_runner.pyc in run_pipeline(self, pipeline)
>     132       runner = BundleBasedDirectRunner()
>     133 
> --> 134     return runner.run_pipeline(pipeline)
>     135 
>     136 
> /usr/local/lib/python2.7/dist-packages/apache_beam/runners/portability/fn_api_runner.pyc in run_pipeline(self, pipeline)
>     216     from apache_beam.runners.dataflow.dataflow_runner import DataflowRunner
>     217     pipeline.visit(DataflowRunner.group_by_key_input_visitor())
> --> 218     return self.run_via_runner_api(pipeline.to_runner_api())
>     219 
>     220   def run_via_runner_api(self, pipeline_proto):
> /usr/local/lib/python2.7/dist-packages/apache_beam/runners/portability/fn_api_runner.pyc in run_via_runner_api(self, pipeline_proto)
>     219 
>     220   def run_via_runner_api(self, pipeline_proto):
> --> 221     return self.run_stages(*self.create_stages(pipeline_proto))
>     222 
>     223   def create_stages(self, pipeline_proto):
> /usr/local/lib/python2.7/dist-packages/apache_beam/runners/portability/fn_api_runner.pyc in run_stages(self, pipeline_components, stages, safe_coders)
>     857         metrics_by_stage[stage.name] = self.run_stage(
>     858             controller, pipeline_components, stage,
> --> 859             pcoll_buffers, safe_coders).process_bundle.metrics
>     860     finally:
>     861       controller.close()
> /usr/local/lib/python2.7/dist-packages/apache_beam/runners/portability/fn_api_runner.pyc in run_stage(self, controller, pipeline_components, stage, pcoll_buffers, safe_coders)
>     968     return BundleManager(
>     969         controller, get_buffer, process_bundle_descriptor,
> --> 970         self._progress_frequency).process_bundle(data_input, data_output)
>     971 
>     972   # These classes are used to interact with the worker.
> /usr/local/lib/python2.7/dist-packages/apache_beam/runners/portability/fn_api_runner.pyc in process_bundle(self, inputs, expected_outputs)
>    1172         process_bundle=beam_fn_api_pb2.ProcessBundleRequest(
>    1173             process_bundle_descriptor_reference=self._bundle_descriptor.id))
> -> 1174     result_future = self._controller.control_handler.push(process_bundle)
>    1175 
>    1176     with ProgressRequester(
> /usr/local/lib/python2.7/dist-packages/apache_beam/runners/portability/fn_api_runner.pyc in push(self, request)
>    1052         request.instruction_id = 'control_%s' % self._uid_counter
>    1053       logging.debug('CONTROL REQUEST %s', request)
> -> 1054       response = self.worker.do_instruction(request)
>    1055       logging.debug('CONTROL RESPONSE %s', response)
>    1056       return ControlFuture(request.instruction_id, response)
> /usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.pyc in do_instruction(self, request)
>     206       # E.g. if register is set, this will call self.register(request.register))
>     207       return getattr(self, request_type)(getattr(request, request_type),
> --> 208                                          request.instruction_id)
>     209     else:
>     210       raise NotImplementedError
> /usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.pyc in process_bundle(self, request, instruction_id)
>     228     try:
>     229       with state_handler.process_instruction_id(instruction_id):
> --> 230         processor.process_bundle(instruction_id)
>     231     finally:
>     232       del self.bundle_processors[instruction_id]
> /usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/bundle_processor.pyc in process_bundle(self, instruction_id)
>     287       for op in reversed(self.ops.values()):
>     288         logging.info('start %s', op)
> --> 289         op.start()
>     290 
>     291       # Inject inputs from data plane.
> /usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/operations.so in apache_beam.runners.worker.operations.ReadOperation.start()
> /usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/operations.so in apache_beam.runners.worker.operations.ReadOperation.start()
> /usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/operations.so in apache_beam.runners.worker.operations.ReadOperation.start()
> /usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/operations.so in apache_beam.runners.worker.operations.Operation.output()
> /usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/operations.so in apache_beam.runners.worker.operations.ConsumerSet.receive()
> /usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/operations.so in apache_beam.runners.worker.operations.DoOperation.process()
> /usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/operations.so in apache_beam.runners.worker.operations.DoOperation.process()
> /usr/local/lib/python2.7/dist-packages/apache_beam/runners/common.so in apache_beam.runners.common.DoFnRunner.receive()
> /usr/local/lib/python2.7/dist-packages/apache_beam/runners/common.so in apache_beam.runners.common.DoFnRunner.process()
> /usr/local/lib/python2.7/dist-packages/apache_beam/runners/common.so in apache_beam.runners.common.DoFnRunner._reraise_augmented()
> /usr/local/lib/python2.7/dist-packages/apache_beam/runners/common.so in apache_beam.runners.common.DoFnRunner.process()
> /usr/local/lib/python2.7/dist-packages/apache_beam/runners/common.so in apache_beam.runners.common.SimpleInvoker.invoke_process()
> /usr/local/lib/python2.7/dist-packages/apache_beam/runners/common.so in apache_beam.runners.common._OutputProcessor.process_outputs()
> /usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/operations.so in apache_beam.runners.worker.operations.ConsumerSet.receive()
> /usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/operations.so in apache_beam.runners.worker.operations.ConsumerSet.update_counters_start()
> /usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/opcounters.so in apache_beam.runners.worker.opcounters.OperationCounters.update_from()
> /usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/opcounters.so in apache_beam.runners.worker.opcounters.OperationCounters.do_sample()
> /usr/local/lib/python2.7/dist-packages/apache_beam/coders/coder_impl.so in apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables()
> /usr/local/lib/python2.7/dist-packages/apache_beam/coders/coder_impl.so in apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables()
> /usr/local/lib/python2.7/dist-packages/apache_beam/coders/coder_impl.so in apache_beam.coders.coder_impl.AbstractComponentCoderImpl.get_estimated_size_and_observables()
> /usr/local/lib/python2.7/dist-packages/apache_beam/coders/coder_impl.so in apache_beam.coders.coder_impl.CoderImpl.get_estimated_size_and_observables()
> /usr/local/lib/python2.7/dist-packages/apache_beam/coders/coder_impl.so in apache_beam.coders.coder_impl.VarIntCoderImpl.estimate_size()
> /usr/local/lib/python2.7/dist-packages/apache_beam/coders/stream.pyx in apache_beam.coders.stream.get_varint_size()
>     220     return (<double*><char*>&as_long)[0]
>     221 
> --> 222 cpdef libc.stdint.int64_t get_varint_size(libc.stdint.int64_t value):
>     223   """Returns the size of the given integer value when encode as a VarInt."""
>     224   cdef libc.stdint.int64_t varint_size = 0
> TypeError: an integer is required [while running 'Map(f)']
> {noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)