You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by "Ahmet Altay (JIRA)" <ji...@apache.org> on 2018/07/18 00:03:00 UTC
[jira] [Commented] (BEAM-4805) beam.Map doesn't work on functions defined with *args
[ https://issues.apache.org/jira/browse/BEAM-4805?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16547237#comment-16547237 ]
Ahmet Altay commented on BEAM-4805:
-----------------------------------
Map expects a function with the following signature:
f(element, *args, **kwargs)
Type hints will also be applied to the first argument (the element). The extra args and kwargs are used for passing side inputs. We could improve the pydoc or documentation for this. However, this is intended behaviour.
> beam.Map doesn't work on functions defined with *args
> -----------------------------------------------------
>
> Key: BEAM-4805
> URL: https://issues.apache.org/jira/browse/BEAM-4805
> Project: Beam
> Issue Type: Bug
> Components: sdk-py-core
> Reporter: Stephan Hoyer
> Assignee: Ahmet Altay
> Priority: Major
>
> Consider the following example:
> {code:python}
> import apache_beam as beam
> def f(*args, **kwargs):
> return args, kwargs
> [1, 2, 3] | beam.Map(f)
> {code}
> When I run this code using the latest released version of Beam (2.5.0), I see the following error:
> {noformat}
> TypeError                                 Traceback (most recent call last)
> <ipython-input-20-9003b3f5887a> in <module>()
> ----> 1 range(3) | beam.Map(f)
> /usr/local/lib/python2.7/dist-packages/apache_beam/transforms/ptransform.pyc in __ror__(self, left, label)
> 491 _allocate_materialized_pipeline(p)
> 492 materialized_result = _AddMaterializationTransforms().visit(result)
> --> 493 p.run().wait_until_finish()
> 494 _release_materialized_pipeline(p)
> 495 return _FinalizeMaterialization().visit(materialized_result)
> /usr/local/lib/python2.7/dist-packages/apache_beam/pipeline.pyc in run(self, test_runner_api)
> 388 if test_runner_api and self._verify_runner_api_compatible():
> 389 return Pipeline.from_runner_api(
> --> 390 self.to_runner_api(), self.runner, self._options).run(False)
> 391
> 392 if self._options.view_as(TypeOptions).runtime_type_check:
> /usr/local/lib/python2.7/dist-packages/apache_beam/pipeline.pyc in run(self, test_runner_api)
> 401 finally:
> 402 shutil.rmtree(tmpdir)
> --> 403 return self.runner.run_pipeline(self)
> 404
> 405 def __enter__(self):
> /usr/local/lib/python2.7/dist-packages/apache_beam/runners/direct/direct_runner.pyc in run_pipeline(self, pipeline)
> 132 runner = BundleBasedDirectRunner()
> 133
> --> 134 return runner.run_pipeline(pipeline)
> 135
> 136
> /usr/local/lib/python2.7/dist-packages/apache_beam/runners/portability/fn_api_runner.pyc in run_pipeline(self, pipeline)
> 216 from apache_beam.runners.dataflow.dataflow_runner import DataflowRunner
> 217 pipeline.visit(DataflowRunner.group_by_key_input_visitor())
> --> 218 return self.run_via_runner_api(pipeline.to_runner_api())
> 219
> 220 def run_via_runner_api(self, pipeline_proto):
> /usr/local/lib/python2.7/dist-packages/apache_beam/runners/portability/fn_api_runner.pyc in run_via_runner_api(self, pipeline_proto)
> 219
> 220 def run_via_runner_api(self, pipeline_proto):
> --> 221 return self.run_stages(*self.create_stages(pipeline_proto))
> 222
> 223 def create_stages(self, pipeline_proto):
> /usr/local/lib/python2.7/dist-packages/apache_beam/runners/portability/fn_api_runner.pyc in run_stages(self, pipeline_components, stages, safe_coders)
> 857 metrics_by_stage[stage.name] = self.run_stage(
> 858 controller, pipeline_components, stage,
> --> 859 pcoll_buffers, safe_coders).process_bundle.metrics
> 860 finally:
> 861 controller.close()
> /usr/local/lib/python2.7/dist-packages/apache_beam/runners/portability/fn_api_runner.pyc in run_stage(self, controller, pipeline_components, stage, pcoll_buffers, safe_coders)
> 968 return BundleManager(
> 969 controller, get_buffer, process_bundle_descriptor,
> --> 970 self._progress_frequency).process_bundle(data_input, data_output)
> 971
> 972 # These classes are used to interact with the worker.
> /usr/local/lib/python2.7/dist-packages/apache_beam/runners/portability/fn_api_runner.pyc in process_bundle(self, inputs, expected_outputs)
> 1172 process_bundle=beam_fn_api_pb2.ProcessBundleRequest(
> 1173 process_bundle_descriptor_reference=self._bundle_descriptor.id))
> -> 1174 result_future = self._controller.control_handler.push(process_bundle)
> 1175
> 1176 with ProgressRequester(
> /usr/local/lib/python2.7/dist-packages/apache_beam/runners/portability/fn_api_runner.pyc in push(self, request)
> 1052 request.instruction_id = 'control_%s' % self._uid_counter
> 1053 logging.debug('CONTROL REQUEST %s', request)
> -> 1054 response = self.worker.do_instruction(request)
> 1055 logging.debug('CONTROL RESPONSE %s', response)
> 1056 return ControlFuture(request.instruction_id, response)
> /usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.pyc in do_instruction(self, request)
> 206 # E.g. if register is set, this will call self.register(request.register))
> 207 return getattr(self, request_type)(getattr(request, request_type),
> --> 208 request.instruction_id)
> 209 else:
> 210 raise NotImplementedError
> /usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.pyc in process_bundle(self, request, instruction_id)
> 228 try:
> 229 with state_handler.process_instruction_id(instruction_id):
> --> 230 processor.process_bundle(instruction_id)
> 231 finally:
> 232 del self.bundle_processors[instruction_id]
> /usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/bundle_processor.pyc in process_bundle(self, instruction_id)
> 287 for op in reversed(self.ops.values()):
> 288 logging.info('start %s', op)
> --> 289 op.start()
> 290
> 291 # Inject inputs from data plane.
> /usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/operations.so in apache_beam.runners.worker.operations.ReadOperation.start()
> /usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/operations.so in apache_beam.runners.worker.operations.ReadOperation.start()
> /usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/operations.so in apache_beam.runners.worker.operations.ReadOperation.start()
> /usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/operations.so in apache_beam.runners.worker.operations.Operation.output()
> /usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/operations.so in apache_beam.runners.worker.operations.ConsumerSet.receive()
> /usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/operations.so in apache_beam.runners.worker.operations.DoOperation.process()
> /usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/operations.so in apache_beam.runners.worker.operations.DoOperation.process()
> /usr/local/lib/python2.7/dist-packages/apache_beam/runners/common.so in apache_beam.runners.common.DoFnRunner.receive()
> /usr/local/lib/python2.7/dist-packages/apache_beam/runners/common.so in apache_beam.runners.common.DoFnRunner.process()
> /usr/local/lib/python2.7/dist-packages/apache_beam/runners/common.so in apache_beam.runners.common.DoFnRunner._reraise_augmented()
> /usr/local/lib/python2.7/dist-packages/apache_beam/runners/common.so in apache_beam.runners.common.DoFnRunner.process()
> /usr/local/lib/python2.7/dist-packages/apache_beam/runners/common.so in apache_beam.runners.common.SimpleInvoker.invoke_process()
> /usr/local/lib/python2.7/dist-packages/apache_beam/runners/common.so in apache_beam.runners.common._OutputProcessor.process_outputs()
> /usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/operations.so in apache_beam.runners.worker.operations.ConsumerSet.receive()
> /usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/operations.so in apache_beam.runners.worker.operations.ConsumerSet.update_counters_start()
> /usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/opcounters.so in apache_beam.runners.worker.opcounters.OperationCounters.update_from()
> /usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/opcounters.so in apache_beam.runners.worker.opcounters.OperationCounters.do_sample()
> /usr/local/lib/python2.7/dist-packages/apache_beam/coders/coder_impl.so in apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables()
> /usr/local/lib/python2.7/dist-packages/apache_beam/coders/coder_impl.so in apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables()
> /usr/local/lib/python2.7/dist-packages/apache_beam/coders/coder_impl.so in apache_beam.coders.coder_impl.AbstractComponentCoderImpl.get_estimated_size_and_observables()
> /usr/local/lib/python2.7/dist-packages/apache_beam/coders/coder_impl.so in apache_beam.coders.coder_impl.CoderImpl.get_estimated_size_and_observables()
> /usr/local/lib/python2.7/dist-packages/apache_beam/coders/coder_impl.so in apache_beam.coders.coder_impl.VarIntCoderImpl.estimate_size()
> /usr/local/lib/python2.7/dist-packages/apache_beam/coders/stream.pyx in apache_beam.coders.stream.get_varint_size()
> 220 return (<double*><char*>&as_long)[0]
> 221
> --> 222 cpdef libc.stdint.int64_t get_varint_size(libc.stdint.int64_t value):
> 223 """Returns the size of the given integer value when encode as a VarInt."""
> 224 cdef libc.stdint.int64_t varint_size = 0
> TypeError: an integer is required [while running 'Map(f)']
> {noformat}
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)