You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@beam.apache.org by "Ning Kang (Jira)" <ji...@apache.org> on 2021/01/13 02:31:00 UTC
[jira] [Comment Edited] (BEAM-10708) InteractiveRunner cannot
execute pipeline with cross-language transform
[ https://issues.apache.org/jira/browse/BEAM-10708?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17263715#comment-17263715 ]
Ning Kang edited comment on BEAM-10708 at 1/13/21, 2:30 AM:
------------------------------------------------------------
As of today (2021-01-12), when a pipeline that includes a SqlTransform is executed with InteractiveRunner() or passed through `from_runner_api(to_runner_api())`, it fails with a stack trace like the following:
{code:python}
---------------------------------------------------------------------------
ValueError Traceback (most recent call last)
<ipython-input-9-ae3e3b2a5b98> in <module>
----> 1 ib.show(pcoll)
~/beam/sdks/python/apache_beam/runners/interactive/utils.py in run_within_progress_indicator(*args, **kwargs)
226 def run_within_progress_indicator(*args, **kwargs):
227 with ProgressIndicator('Processing...', 'Done.'):
--> 228 return func(*args, **kwargs)
229
230 return run_within_progress_indicator
~/beam/sdks/python/apache_beam/runners/interactive/interactive_beam.py in show(*pcolls, **configs)
484 recording_manager = ie.current_env().get_recording_manager(
485 user_pipeline, create_if_absent=True)
--> 486 recording = recording_manager.record(pcolls, max_n=n, max_duration=duration)
487
488 # Catch a KeyboardInterrupt to gracefully cancel the recording and
~/beam/sdks/python/apache_beam/runners/interactive/recording_manager.py in record(self, pcolls, max_n, max_duration)
420 # arbitrary variables.
421 self._watch(pcolls)
--> 422 pipeline_instrument = pi.PipelineInstrument(self.user_pipeline)
423 self.record_pipeline()
424
~/beam/sdks/python/apache_beam/runners/interactive/pipeline_instrument.py in __init__(self, pipeline, options)
113 # proto is stable. The snapshot of pipeline will not be mutated within this
114 # module and can be used to recover original pipeline if needed.
--> 115 self._pipeline_snap = beam.pipeline.Pipeline.from_runner_api(
116 pipeline.to_runner_api(use_fake_coders=True), pipeline.runner, options)
117 ie.current_env().add_derived_pipeline(self._pipeline, self._pipeline_snap)
~/beam/sdks/python/apache_beam/pipeline.py in from_runner_api(proto, runner, options, return_context)
900 if proto.root_transform_ids:
901 root_transform_id, = proto.root_transform_ids
--> 902 p.transforms_stack = [context.transforms.get_by_id(root_transform_id)]
903 else:
904 p.transforms_stack = [AppliedPTransform(None, None, '', None)]
~/beam/sdks/python/apache_beam/runners/pipeline_context.py in get_by_id(self, id)
113 # type: (str) -> PortableObjectT
114 if id not in self._id_to_obj:
--> 115 self._id_to_obj[id] = self._obj_type.from_runner_api(
116 self._id_to_proto[id], self._pipeline_context)
117 return self._id_to_obj[id]
~/beam/sdks/python/apache_beam/pipeline.py in from_runner_api(proto, context)
1250 result.parts = []
1251 for transform_id in proto.subtransforms:
-> 1252 part = context.transforms.get_by_id(transform_id)
1253 part.parent = result
1254 result.parts.append(part)
~/beam/sdks/python/apache_beam/runners/pipeline_context.py in get_by_id(self, id)
113 # type: (str) -> PortableObjectT
114 if id not in self._id_to_obj:
--> 115 self._id_to_obj[id] = self._obj_type.from_runner_api(
116 self._id_to_proto[id], self._pipeline_context)
117 return self._id_to_obj[id]
~/beam/sdks/python/apache_beam/pipeline.py in from_runner_api(proto, context)
1250 result.parts = []
1251 for transform_id in proto.subtransforms:
-> 1252 part = context.transforms.get_by_id(transform_id)
1253 part.parent = result
1254 result.parts.append(part)
~/beam/sdks/python/apache_beam/runners/pipeline_context.py in get_by_id(self, id)
113 # type: (str) -> PortableObjectT
114 if id not in self._id_to_obj:
--> 115 self._id_to_obj[id] = self._obj_type.from_runner_api(
116 self._id_to_proto[id], self._pipeline_context)
117 return self._id_to_obj[id]
~/beam/sdks/python/apache_beam/pipeline.py in from_runner_api(proto, context)
1250 result.parts = []
1251 for transform_id in proto.subtransforms:
-> 1252 part = context.transforms.get_by_id(transform_id)
1253 part.parent = result
1254 result.parts.append(part)
~/beam/sdks/python/apache_beam/runners/pipeline_context.py in get_by_id(self, id)
113 # type: (str) -> PortableObjectT
114 if id not in self._id_to_obj:
--> 115 self._id_to_obj[id] = self._obj_type.from_runner_api(
116 self._id_to_proto[id], self._pipeline_context)
117 return self._id_to_obj[id]
~/beam/sdks/python/apache_beam/pipeline.py in from_runner_api(proto, context)
1250 result.parts = []
1251 for transform_id in proto.subtransforms:
-> 1252 part = context.transforms.get_by_id(transform_id)
1253 part.parent = result
1254 result.parts.append(part)
~/beam/sdks/python/apache_beam/runners/pipeline_context.py in get_by_id(self, id)
113 # type: (str) -> PortableObjectT
114 if id not in self._id_to_obj:
--> 115 self._id_to_obj[id] = self._obj_type.from_runner_api(
116 self._id_to_proto[id], self._pipeline_context)
117 return self._id_to_obj[id]
~/beam/sdks/python/apache_beam/pipeline.py in from_runner_api(proto, context)
1250 result.parts = []
1251 for transform_id in proto.subtransforms:
-> 1252 part = context.transforms.get_by_id(transform_id)
1253 part.parent = result
1254 result.parts.append(part)
~/beam/sdks/python/apache_beam/runners/pipeline_context.py in get_by_id(self, id)
113 # type: (str) -> PortableObjectT
114 if id not in self._id_to_obj:
--> 115 self._id_to_obj[id] = self._obj_type.from_runner_api(
116 self._id_to_proto[id], self._pipeline_context)
117 return self._id_to_obj[id]
~/beam/sdks/python/apache_beam/pipeline.py in from_runner_api(proto, context)
1227 ]
1228
-> 1229 transform = ptransform.PTransform.from_runner_api(proto, context)
1230 # Ordering is important here.
1231 # TODO(BEAM-9635): use key, value pairs instead of depending on tags with
~/beam/sdks/python/apache_beam/transforms/ptransform.py in from_runner_api(cls, proto, context)
728 parameter_type, constructor = cls._known_urns[proto.spec.urn]
729
--> 730 return constructor(
731 proto,
732 proto_utils.parse_Bytes(proto.spec.payload, parameter_type),
~/beam/sdks/python/apache_beam/transforms/core.py in from_runner_api_parameter(unused_ptransform, pardo_payload, context)
1417 def from_runner_api_parameter(unused_ptransform, pardo_payload, context):
1418 fn, args, kwargs, si_tags_and_types, windowing = pickler.loads(
-> 1419 DoFnInfo.from_runner_api(
1420 pardo_payload.do_fn, context).serialized_dofn_data())
1421 if si_tags_and_types:
~/beam/sdks/python/apache_beam/transforms/core.py in from_runner_api(spec, unused_context)
1491 return StatelessDoFnInfo(spec.urn)
1492 else:
-> 1493 raise ValueError('Unexpected DoFn type: %s' % spec.urn)
1494
1495 @staticmethod
ValueError: Unexpected DoFn type: beam:dofn:javasdk:0.1
{code}
was (Author: ningk):
As of today (2021-01-12), when a pipeline including a SqlTransform is executed with InteractiveRunner(), an example failure stack trace is as follows:
{code:python}
---------------------------------------------------------------------------
ValueError Traceback (most recent call last)
<ipython-input-9-ae3e3b2a5b98> in <module>
----> 1 ib.show(pcoll)
~/beam/sdks/python/apache_beam/runners/interactive/utils.py in run_within_progress_indicator(*args, **kwargs)
226 def run_within_progress_indicator(*args, **kwargs):
227 with ProgressIndicator('Processing...', 'Done.'):
--> 228 return func(*args, **kwargs)
229
230 return run_within_progress_indicator
~/beam/sdks/python/apache_beam/runners/interactive/interactive_beam.py in show(*pcolls, **configs)
484 recording_manager = ie.current_env().get_recording_manager(
485 user_pipeline, create_if_absent=True)
--> 486 recording = recording_manager.record(pcolls, max_n=n, max_duration=duration)
487
488 # Catch a KeyboardInterrupt to gracefully cancel the recording and
~/beam/sdks/python/apache_beam/runners/interactive/recording_manager.py in record(self, pcolls, max_n, max_duration)
420 # arbitrary variables.
421 self._watch(pcolls)
--> 422 pipeline_instrument = pi.PipelineInstrument(self.user_pipeline)
423 self.record_pipeline()
424
~/beam/sdks/python/apache_beam/runners/interactive/pipeline_instrument.py in __init__(self, pipeline, options)
113 # proto is stable. The snapshot of pipeline will not be mutated within this
114 # module and can be used to recover original pipeline if needed.
--> 115 self._pipeline_snap = beam.pipeline.Pipeline.from_runner_api(
116 pipeline.to_runner_api(use_fake_coders=True), pipeline.runner, options)
117 ie.current_env().add_derived_pipeline(self._pipeline, self._pipeline_snap)
~/beam/sdks/python/apache_beam/pipeline.py in from_runner_api(proto, runner, options, return_context)
900 if proto.root_transform_ids:
901 root_transform_id, = proto.root_transform_ids
--> 902 p.transforms_stack = [context.transforms.get_by_id(root_transform_id)]
903 else:
904 p.transforms_stack = [AppliedPTransform(None, None, '', None)]
~/beam/sdks/python/apache_beam/runners/pipeline_context.py in get_by_id(self, id)
113 # type: (str) -> PortableObjectT
114 if id not in self._id_to_obj:
--> 115 self._id_to_obj[id] = self._obj_type.from_runner_api(
116 self._id_to_proto[id], self._pipeline_context)
117 return self._id_to_obj[id]
~/beam/sdks/python/apache_beam/pipeline.py in from_runner_api(proto, context)
1250 result.parts = []
1251 for transform_id in proto.subtransforms:
-> 1252 part = context.transforms.get_by_id(transform_id)
1253 part.parent = result
1254 result.parts.append(part)
~/beam/sdks/python/apache_beam/runners/pipeline_context.py in get_by_id(self, id)
113 # type: (str) -> PortableObjectT
114 if id not in self._id_to_obj:
--> 115 self._id_to_obj[id] = self._obj_type.from_runner_api(
116 self._id_to_proto[id], self._pipeline_context)
117 return self._id_to_obj[id]
~/beam/sdks/python/apache_beam/pipeline.py in from_runner_api(proto, context)
1250 result.parts = []
1251 for transform_id in proto.subtransforms:
-> 1252 part = context.transforms.get_by_id(transform_id)
1253 part.parent = result
1254 result.parts.append(part)
~/beam/sdks/python/apache_beam/runners/pipeline_context.py in get_by_id(self, id)
113 # type: (str) -> PortableObjectT
114 if id not in self._id_to_obj:
--> 115 self._id_to_obj[id] = self._obj_type.from_runner_api(
116 self._id_to_proto[id], self._pipeline_context)
117 return self._id_to_obj[id]
~/beam/sdks/python/apache_beam/pipeline.py in from_runner_api(proto, context)
1250 result.parts = []
1251 for transform_id in proto.subtransforms:
-> 1252 part = context.transforms.get_by_id(transform_id)
1253 part.parent = result
1254 result.parts.append(part)
~/beam/sdks/python/apache_beam/runners/pipeline_context.py in get_by_id(self, id)
113 # type: (str) -> PortableObjectT
114 if id not in self._id_to_obj:
--> 115 self._id_to_obj[id] = self._obj_type.from_runner_api(
116 self._id_to_proto[id], self._pipeline_context)
117 return self._id_to_obj[id]
~/beam/sdks/python/apache_beam/pipeline.py in from_runner_api(proto, context)
1250 result.parts = []
1251 for transform_id in proto.subtransforms:
-> 1252 part = context.transforms.get_by_id(transform_id)
1253 part.parent = result
1254 result.parts.append(part)
~/beam/sdks/python/apache_beam/runners/pipeline_context.py in get_by_id(self, id)
113 # type: (str) -> PortableObjectT
114 if id not in self._id_to_obj:
--> 115 self._id_to_obj[id] = self._obj_type.from_runner_api(
116 self._id_to_proto[id], self._pipeline_context)
117 return self._id_to_obj[id]
~/beam/sdks/python/apache_beam/pipeline.py in from_runner_api(proto, context)
1250 result.parts = []
1251 for transform_id in proto.subtransforms:
-> 1252 part = context.transforms.get_by_id(transform_id)
1253 part.parent = result
1254 result.parts.append(part)
~/beam/sdks/python/apache_beam/runners/pipeline_context.py in get_by_id(self, id)
113 # type: (str) -> PortableObjectT
114 if id not in self._id_to_obj:
--> 115 self._id_to_obj[id] = self._obj_type.from_runner_api(
116 self._id_to_proto[id], self._pipeline_context)
117 return self._id_to_obj[id]
~/beam/sdks/python/apache_beam/pipeline.py in from_runner_api(proto, context)
1227 ]
1228
-> 1229 transform = ptransform.PTransform.from_runner_api(proto, context)
1230 # Ordering is important here.
1231 # TODO(BEAM-9635): use key, value pairs instead of depending on tags with
~/beam/sdks/python/apache_beam/transforms/ptransform.py in from_runner_api(cls, proto, context)
728 parameter_type, constructor = cls._known_urns[proto.spec.urn]
729
--> 730 return constructor(
731 proto,
732 proto_utils.parse_Bytes(proto.spec.payload, parameter_type),
~/beam/sdks/python/apache_beam/transforms/core.py in from_runner_api_parameter(unused_ptransform, pardo_payload, context)
1417 def from_runner_api_parameter(unused_ptransform, pardo_payload, context):
1418 fn, args, kwargs, si_tags_and_types, windowing = pickler.loads(
-> 1419 DoFnInfo.from_runner_api(
1420 pardo_payload.do_fn, context).serialized_dofn_data())
1421 if si_tags_and_types:
~/beam/sdks/python/apache_beam/transforms/core.py in from_runner_api(spec, unused_context)
1491 return StatelessDoFnInfo(spec.urn)
1492 else:
-> 1493 raise ValueError('Unexpected DoFn type: %s' % spec.urn)
1494
1495 @staticmethod
ValueError: Unexpected DoFn type: beam:dofn:javasdk:0.1
{code}
> InteractiveRunner cannot execute pipeline with cross-language transform
> -----------------------------------------------------------------------
>
> Key: BEAM-10708
> URL: https://issues.apache.org/jira/browse/BEAM-10708
> Project: Beam
> Issue Type: Bug
> Components: cross-language
> Reporter: Brian Hulette
> Assignee: Ning Kang
> Priority: P2
>
> The InteractiveRunner crashes when given a pipeline that includes a cross-language transform.
> Here's the example I tried to run in a jupyter notebook:
> {code:python}
> p = beam.Pipeline(InteractiveRunner())
> pc = (p | SqlTransform("""SELECT
> CAST(1 AS INT) AS `id`,
> CAST('foo' AS VARCHAR) AS `str`,
> CAST(3.14 AS DOUBLE) AS `flt`"""))
> df = interactive_beam.collect(pc)
> {code}
> The problem occurs when [pipeline_fragment.py|https://github.com/apache/beam/blob/dce1eb83b8d5137c56ac58568820c24bd8fda526/sdks/python/apache_beam/runners/interactive/pipeline_fragment.py#L66] creates a copy of the pipeline by [writing it to proto and reading it back|https://github.com/apache/beam/blob/dce1eb83b8d5137c56ac58568820c24bd8fda526/sdks/python/apache_beam/runners/interactive/pipeline_fragment.py#L120]. Reading it back fails because some of the pipeline is not written in Python.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)