You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@beam.apache.org by "Ning Kang (Jira)" <ji...@apache.org> on 2021/01/13 02:31:00 UTC

[jira] [Comment Edited] (BEAM-10708) InteractiveRunner cannot execute pipeline with cross-language transform

    [ https://issues.apache.org/jira/browse/BEAM-10708?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17263715#comment-17263715 ] 

Ning Kang edited comment on BEAM-10708 at 1/13/21, 2:30 AM:
------------------------------------------------------------

As of today (2021-01-12), a pipeline that includes a SqlTransform fails when it is executed with InteractiveRunner() or round-tripped through `from_runner_api(to_runner_api())`. The stack trace looks like the following:

{code:python}
---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
<ipython-input-9-ae3e3b2a5b98> in <module>
----> 1 ib.show(pcoll)

~/beam/sdks/python/apache_beam/runners/interactive/utils.py in run_within_progress_indicator(*args, **kwargs)
    226   def run_within_progress_indicator(*args, **kwargs):
    227     with ProgressIndicator('Processing...', 'Done.'):
--> 228       return func(*args, **kwargs)
    229 
    230   return run_within_progress_indicator

~/beam/sdks/python/apache_beam/runners/interactive/interactive_beam.py in show(*pcolls, **configs)
    484   recording_manager = ie.current_env().get_recording_manager(
    485       user_pipeline, create_if_absent=True)
--> 486   recording = recording_manager.record(pcolls, max_n=n, max_duration=duration)
    487 
    488   # Catch a KeyboardInterrupt to gracefully cancel the recording and

~/beam/sdks/python/apache_beam/runners/interactive/recording_manager.py in record(self, pcolls, max_n, max_duration)
    420     # arbitrary variables.
    421     self._watch(pcolls)
--> 422     pipeline_instrument = pi.PipelineInstrument(self.user_pipeline)
    423     self.record_pipeline()
    424 

~/beam/sdks/python/apache_beam/runners/interactive/pipeline_instrument.py in __init__(self, pipeline, options)
    113     # proto is stable. The snapshot of pipeline will not be mutated within this
    114     # module and can be used to recover original pipeline if needed.
--> 115     self._pipeline_snap = beam.pipeline.Pipeline.from_runner_api(
    116         pipeline.to_runner_api(use_fake_coders=True), pipeline.runner, options)
    117     ie.current_env().add_derived_pipeline(self._pipeline, self._pipeline_snap)

~/beam/sdks/python/apache_beam/pipeline.py in from_runner_api(proto, runner, options, return_context)
    900     if proto.root_transform_ids:
    901       root_transform_id, = proto.root_transform_ids
--> 902       p.transforms_stack = [context.transforms.get_by_id(root_transform_id)]
    903     else:
    904       p.transforms_stack = [AppliedPTransform(None, None, '', None)]

~/beam/sdks/python/apache_beam/runners/pipeline_context.py in get_by_id(self, id)
    113     # type: (str) -> PortableObjectT
    114     if id not in self._id_to_obj:
--> 115       self._id_to_obj[id] = self._obj_type.from_runner_api(
    116           self._id_to_proto[id], self._pipeline_context)
    117     return self._id_to_obj[id]

~/beam/sdks/python/apache_beam/pipeline.py in from_runner_api(proto, context)
   1250     result.parts = []
   1251     for transform_id in proto.subtransforms:
-> 1252       part = context.transforms.get_by_id(transform_id)
   1253       part.parent = result
   1254       result.parts.append(part)

~/beam/sdks/python/apache_beam/runners/pipeline_context.py in get_by_id(self, id)
    113     # type: (str) -> PortableObjectT
    114     if id not in self._id_to_obj:
--> 115       self._id_to_obj[id] = self._obj_type.from_runner_api(
    116           self._id_to_proto[id], self._pipeline_context)
    117     return self._id_to_obj[id]

~/beam/sdks/python/apache_beam/pipeline.py in from_runner_api(proto, context)
   1250     result.parts = []
   1251     for transform_id in proto.subtransforms:
-> 1252       part = context.transforms.get_by_id(transform_id)
   1253       part.parent = result
   1254       result.parts.append(part)

~/beam/sdks/python/apache_beam/runners/pipeline_context.py in get_by_id(self, id)
    113     # type: (str) -> PortableObjectT
    114     if id not in self._id_to_obj:
--> 115       self._id_to_obj[id] = self._obj_type.from_runner_api(
    116           self._id_to_proto[id], self._pipeline_context)
    117     return self._id_to_obj[id]

~/beam/sdks/python/apache_beam/pipeline.py in from_runner_api(proto, context)
   1250     result.parts = []
   1251     for transform_id in proto.subtransforms:
-> 1252       part = context.transforms.get_by_id(transform_id)
   1253       part.parent = result
   1254       result.parts.append(part)

~/beam/sdks/python/apache_beam/runners/pipeline_context.py in get_by_id(self, id)
    113     # type: (str) -> PortableObjectT
    114     if id not in self._id_to_obj:
--> 115       self._id_to_obj[id] = self._obj_type.from_runner_api(
    116           self._id_to_proto[id], self._pipeline_context)
    117     return self._id_to_obj[id]

~/beam/sdks/python/apache_beam/pipeline.py in from_runner_api(proto, context)
   1250     result.parts = []
   1251     for transform_id in proto.subtransforms:
-> 1252       part = context.transforms.get_by_id(transform_id)
   1253       part.parent = result
   1254       result.parts.append(part)

~/beam/sdks/python/apache_beam/runners/pipeline_context.py in get_by_id(self, id)
    113     # type: (str) -> PortableObjectT
    114     if id not in self._id_to_obj:
--> 115       self._id_to_obj[id] = self._obj_type.from_runner_api(
    116           self._id_to_proto[id], self._pipeline_context)
    117     return self._id_to_obj[id]

~/beam/sdks/python/apache_beam/pipeline.py in from_runner_api(proto, context)
   1250     result.parts = []
   1251     for transform_id in proto.subtransforms:
-> 1252       part = context.transforms.get_by_id(transform_id)
   1253       part.parent = result
   1254       result.parts.append(part)

~/beam/sdks/python/apache_beam/runners/pipeline_context.py in get_by_id(self, id)
    113     # type: (str) -> PortableObjectT
    114     if id not in self._id_to_obj:
--> 115       self._id_to_obj[id] = self._obj_type.from_runner_api(
    116           self._id_to_proto[id], self._pipeline_context)
    117     return self._id_to_obj[id]

~/beam/sdks/python/apache_beam/pipeline.py in from_runner_api(proto, context)
   1227     ]
   1228 
-> 1229     transform = ptransform.PTransform.from_runner_api(proto, context)
   1230     # Ordering is important here.
   1231     # TODO(BEAM-9635): use key, value pairs instead of depending on tags with

~/beam/sdks/python/apache_beam/transforms/ptransform.py in from_runner_api(cls, proto, context)
    728     parameter_type, constructor = cls._known_urns[proto.spec.urn]
    729 
--> 730     return constructor(
    731         proto,
    732         proto_utils.parse_Bytes(proto.spec.payload, parameter_type),

~/beam/sdks/python/apache_beam/transforms/core.py in from_runner_api_parameter(unused_ptransform, pardo_payload, context)
   1417   def from_runner_api_parameter(unused_ptransform, pardo_payload, context):
   1418     fn, args, kwargs, si_tags_and_types, windowing = pickler.loads(
-> 1419         DoFnInfo.from_runner_api(
   1420             pardo_payload.do_fn, context).serialized_dofn_data())
   1421     if si_tags_and_types:

~/beam/sdks/python/apache_beam/transforms/core.py in from_runner_api(spec, unused_context)
   1491       return StatelessDoFnInfo(spec.urn)
   1492     else:
-> 1493       raise ValueError('Unexpected DoFn type: %s' % spec.urn)
   1494 
   1495   @staticmethod

ValueError: Unexpected DoFn type: beam:dofn:javasdk:0.1
{code}



was (Author: ningk):
As of today (2021-01-12), a pipeline that includes a SqlTransform fails when it is executed with InteractiveRunner(). The stack trace looks like the following:

{code:python}
---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
<ipython-input-9-ae3e3b2a5b98> in <module>
----> 1 ib.show(pcoll)

~/beam/sdks/python/apache_beam/runners/interactive/utils.py in run_within_progress_indicator(*args, **kwargs)
    226   def run_within_progress_indicator(*args, **kwargs):
    227     with ProgressIndicator('Processing...', 'Done.'):
--> 228       return func(*args, **kwargs)
    229 
    230   return run_within_progress_indicator

~/beam/sdks/python/apache_beam/runners/interactive/interactive_beam.py in show(*pcolls, **configs)
    484   recording_manager = ie.current_env().get_recording_manager(
    485       user_pipeline, create_if_absent=True)
--> 486   recording = recording_manager.record(pcolls, max_n=n, max_duration=duration)
    487 
    488   # Catch a KeyboardInterrupt to gracefully cancel the recording and

~/beam/sdks/python/apache_beam/runners/interactive/recording_manager.py in record(self, pcolls, max_n, max_duration)
    420     # arbitrary variables.
    421     self._watch(pcolls)
--> 422     pipeline_instrument = pi.PipelineInstrument(self.user_pipeline)
    423     self.record_pipeline()
    424 

~/beam/sdks/python/apache_beam/runners/interactive/pipeline_instrument.py in __init__(self, pipeline, options)
    113     # proto is stable. The snapshot of pipeline will not be mutated within this
    114     # module and can be used to recover original pipeline if needed.
--> 115     self._pipeline_snap = beam.pipeline.Pipeline.from_runner_api(
    116         pipeline.to_runner_api(use_fake_coders=True), pipeline.runner, options)
    117     ie.current_env().add_derived_pipeline(self._pipeline, self._pipeline_snap)

~/beam/sdks/python/apache_beam/pipeline.py in from_runner_api(proto, runner, options, return_context)
    900     if proto.root_transform_ids:
    901       root_transform_id, = proto.root_transform_ids
--> 902       p.transforms_stack = [context.transforms.get_by_id(root_transform_id)]
    903     else:
    904       p.transforms_stack = [AppliedPTransform(None, None, '', None)]

~/beam/sdks/python/apache_beam/runners/pipeline_context.py in get_by_id(self, id)
    113     # type: (str) -> PortableObjectT
    114     if id not in self._id_to_obj:
--> 115       self._id_to_obj[id] = self._obj_type.from_runner_api(
    116           self._id_to_proto[id], self._pipeline_context)
    117     return self._id_to_obj[id]

~/beam/sdks/python/apache_beam/pipeline.py in from_runner_api(proto, context)
   1250     result.parts = []
   1251     for transform_id in proto.subtransforms:
-> 1252       part = context.transforms.get_by_id(transform_id)
   1253       part.parent = result
   1254       result.parts.append(part)

~/beam/sdks/python/apache_beam/runners/pipeline_context.py in get_by_id(self, id)
    113     # type: (str) -> PortableObjectT
    114     if id not in self._id_to_obj:
--> 115       self._id_to_obj[id] = self._obj_type.from_runner_api(
    116           self._id_to_proto[id], self._pipeline_context)
    117     return self._id_to_obj[id]

~/beam/sdks/python/apache_beam/pipeline.py in from_runner_api(proto, context)
   1250     result.parts = []
   1251     for transform_id in proto.subtransforms:
-> 1252       part = context.transforms.get_by_id(transform_id)
   1253       part.parent = result
   1254       result.parts.append(part)

~/beam/sdks/python/apache_beam/runners/pipeline_context.py in get_by_id(self, id)
    113     # type: (str) -> PortableObjectT
    114     if id not in self._id_to_obj:
--> 115       self._id_to_obj[id] = self._obj_type.from_runner_api(
    116           self._id_to_proto[id], self._pipeline_context)
    117     return self._id_to_obj[id]

~/beam/sdks/python/apache_beam/pipeline.py in from_runner_api(proto, context)
   1250     result.parts = []
   1251     for transform_id in proto.subtransforms:
-> 1252       part = context.transforms.get_by_id(transform_id)
   1253       part.parent = result
   1254       result.parts.append(part)

~/beam/sdks/python/apache_beam/runners/pipeline_context.py in get_by_id(self, id)
    113     # type: (str) -> PortableObjectT
    114     if id not in self._id_to_obj:
--> 115       self._id_to_obj[id] = self._obj_type.from_runner_api(
    116           self._id_to_proto[id], self._pipeline_context)
    117     return self._id_to_obj[id]

~/beam/sdks/python/apache_beam/pipeline.py in from_runner_api(proto, context)
   1250     result.parts = []
   1251     for transform_id in proto.subtransforms:
-> 1252       part = context.transforms.get_by_id(transform_id)
   1253       part.parent = result
   1254       result.parts.append(part)

~/beam/sdks/python/apache_beam/runners/pipeline_context.py in get_by_id(self, id)
    113     # type: (str) -> PortableObjectT
    114     if id not in self._id_to_obj:
--> 115       self._id_to_obj[id] = self._obj_type.from_runner_api(
    116           self._id_to_proto[id], self._pipeline_context)
    117     return self._id_to_obj[id]

~/beam/sdks/python/apache_beam/pipeline.py in from_runner_api(proto, context)
   1250     result.parts = []
   1251     for transform_id in proto.subtransforms:
-> 1252       part = context.transforms.get_by_id(transform_id)
   1253       part.parent = result
   1254       result.parts.append(part)

~/beam/sdks/python/apache_beam/runners/pipeline_context.py in get_by_id(self, id)
    113     # type: (str) -> PortableObjectT
    114     if id not in self._id_to_obj:
--> 115       self._id_to_obj[id] = self._obj_type.from_runner_api(
    116           self._id_to_proto[id], self._pipeline_context)
    117     return self._id_to_obj[id]

~/beam/sdks/python/apache_beam/pipeline.py in from_runner_api(proto, context)
   1227     ]
   1228 
-> 1229     transform = ptransform.PTransform.from_runner_api(proto, context)
   1230     # Ordering is important here.
   1231     # TODO(BEAM-9635): use key, value pairs instead of depending on tags with

~/beam/sdks/python/apache_beam/transforms/ptransform.py in from_runner_api(cls, proto, context)
    728     parameter_type, constructor = cls._known_urns[proto.spec.urn]
    729 
--> 730     return constructor(
    731         proto,
    732         proto_utils.parse_Bytes(proto.spec.payload, parameter_type),

~/beam/sdks/python/apache_beam/transforms/core.py in from_runner_api_parameter(unused_ptransform, pardo_payload, context)
   1417   def from_runner_api_parameter(unused_ptransform, pardo_payload, context):
   1418     fn, args, kwargs, si_tags_and_types, windowing = pickler.loads(
-> 1419         DoFnInfo.from_runner_api(
   1420             pardo_payload.do_fn, context).serialized_dofn_data())
   1421     if si_tags_and_types:

~/beam/sdks/python/apache_beam/transforms/core.py in from_runner_api(spec, unused_context)
   1491       return StatelessDoFnInfo(spec.urn)
   1492     else:
-> 1493       raise ValueError('Unexpected DoFn type: %s' % spec.urn)
   1494 
   1495   @staticmethod

ValueError: Unexpected DoFn type: beam:dofn:javasdk:0.1
{code}


> InteractiveRunner cannot execute pipeline with cross-language transform
> -----------------------------------------------------------------------
>
>                 Key: BEAM-10708
>                 URL: https://issues.apache.org/jira/browse/BEAM-10708
>             Project: Beam
>          Issue Type: Bug
>          Components: cross-language
>            Reporter: Brian Hulette
>            Assignee: Ning Kang
>            Priority: P2
>
> The InteractiveRunner crashes when given a pipeline that includes a cross-language transform.
> Here's the example I tried to run in a Jupyter notebook:
> {code:python}
> p = beam.Pipeline(InteractiveRunner())
> pc = (p | SqlTransform("""SELECT
>             CAST(1 AS INT) AS `id`,
>             CAST('foo' AS VARCHAR) AS `str`,
>             CAST(3.14  AS DOUBLE) AS `flt`"""))
> df = interactive_beam.collect(pc)
> {code}
> The problem occurs when [pipeline_fragment.py|https://github.com/apache/beam/blob/dce1eb83b8d5137c56ac58568820c24bd8fda526/sdks/python/apache_beam/runners/interactive/pipeline_fragment.py#L66] creates a copy of the pipeline by [writing it to proto and reading it back|https://github.com/apache/beam/blob/dce1eb83b8d5137c56ac58568820c24bd8fda526/sdks/python/apache_beam/runners/interactive/pipeline_fragment.py#L120]. Reading it back fails because part of the pipeline (the cross-language transform) is not written in Python.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)