Posted to user@beam.apache.org by Alexey Romanenko <ar...@gmail.com> on 2020/06/01 15:01:20 UTC

Re: Cross-Language pipeline fails with PROCESS SDK Harness

Thanks! The problem was the virtualenv setup in the console where the worker should be running.

It would be useful to print out such errors with Error level log, I think.
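
For anyone hitting the same "Process died with exit code 1" with the PROCESS harness, a small pre-flight check along these lines might help. It is only a sketch under stated assumptions: that the worker needs an executable boot binary and an active virtualenv in the console it is started from, and that the check is run from that same console. The function names are hypothetical and not part of Beam.

```python
import os
import sys


def in_virtualenv():
    """Return True when the running interpreter belongs to a virtual environment."""
    # venv makes sys.prefix differ from sys.base_prefix; the legacy
    # virtualenv tool sets sys.real_prefix instead.
    base_prefix = getattr(sys, "base_prefix", sys.prefix)
    return hasattr(sys, "real_prefix") or sys.prefix != base_prefix


def check_process_worker(boot_path):
    """Collect problems that could keep the boot command from starting.

    boot_path is the path configured as "command" in environment_config.
    """
    problems = []
    if not os.path.isfile(boot_path):
        problems.append("boot executable not found: {}".format(boot_path))
    elif not os.access(boot_path, os.X_OK):
        problems.append("boot executable is not executable: {}".format(boot_path))
    if not in_virtualenv():
        problems.append(
            "no virtualenv is active for interpreter {}".format(sys.executable))
    return problems
```

Calling check_process_worker() with your own boot path in the console that starts the worker returns a list of messages; an empty list means none of these particular problems was found, not that the worker is guaranteed to start.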

> On 29 May 2020, at 18:55, Kyle Weaver <kc...@google.com> wrote:
> 
> That's probably a problem with your worker. You'll need to get additional logs to debug (see https://jira.apache.org/jira/browse/BEAM-8278).
> 
> On Fri, May 29, 2020 at 12:48 PM Alexey Romanenko <aromanenko.dev@gmail.com> wrote:
> Many thanks! It helped to avoid the error. I saw this option in the xlang tests before, but I didn't add it because the name confused me =)
> Also, I think we need to add "--sdk_location=container" for the Expansion Service.
> 
> Finally, I've managed to run both the Java-only and the xlang pipeline (with a Python external transform), and it works with the Docker harness (though I observe some new exceptions at runtime).
> 
> On the other hand, with Process Harness it still fails with an error:
> 
> 20/05/29 18:33:30 INFO org.apache.beam.runners.fnexecution.environment.ProcessEnvironmentFactory: Still waiting for startup of environment '/dev/github/beam2/sdks/python/container/build/target/launcher/darwin_amd64/boot' for worker id 1-10
> 20/05/29 18:33:30 ERROR org.apache.spark.executor.Executor: Exception in task 1.0 in stage 0.0 (TID 1)
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.UncheckedExecutionException: java.lang.IllegalStateException: Process died with exit code 1
> 
> If it's not a known issue, I'll create a Jira for it.
> 
>> On 29 May 2020, at 16:46, Kyle Weaver <kcweaver@google.com> wrote:
>> 
>> Alexey, can you try adding --experiments=beam_fn_api to your pipeline options? We add the option automatically in Python [1] but we don't in Java.
>> 
>> I filed BEAM-10151 [2] to document this workflow. Alexey, perhaps you can help with that.
>> 
>> [1] https://github.com/apache/beam/blob/a5b2046b10bebc59c5bde41d4cb6498058fdada2/sdks/python/apache_beam/pipeline.py#L209
>> [2] https://jira.apache.org/jira/browse/BEAM-10151
>> 
>> On Fri, May 29, 2020 at 10:05 AM Alexey Romanenko <aromanenko.dev@gmail.com> wrote:
>> Yes, I ran a Java-only pipeline with the Portable Runner and got the same error.
>> 
>> I also did the same (without the cross-language component) against Beam 2.19 and 2.20.
>> It works fine against Beam 2.19 (as expected, since I had already tested it) and fails with the same kind of error against Beam 2.20:
>> 
>> 20/05/29 15:59:23 ERROR org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation: Error during job invocation classificationpipeline-aromanenko-0529135917-9d94008d_25acfc79-abdb-4d04-be01-ad053334f6d1.
>> java.lang.IllegalArgumentException: GreedyPipelineFuser requires all root nodes to be runner-implemented beam:transform:impulse:v1 or beam:transform:read:v1 primitives, but transform Create.Values/Read(CreateSource) executes in environment Optional[urn: "beam:env:docker:v1"
>> payload: "\n\033apache/beam_java_sdk:2.20.0"
>> 
>> Do you think it's a bug, or am I missing something in the configuration?
>> 
>>> On 28 May 2020, at 22:25, Kyle Weaver <kcweaver@google.com> wrote:
>>> 
>>> Can you try removing the cross-language component(s) from the pipeline and see if it still has the same error?
>>> 
>>> On Thu, May 28, 2020 at 4:15 PM Alexey Romanenko <aromanenko.dev@gmail.com> wrote:
>>> For testing purposes, it's just Create.of("Name1", "Name2", ...)
>>> 
>>>> On 28 May 2020, at 19:29, Kyle Weaver <kcweaver@google.com> wrote:
>>>> 
>>>> What source are you using?
>>>> 
>>>> On Thu, May 28, 2020 at 1:24 PM Alexey Romanenko <aromanenko.dev@gmail.com> wrote:
>>>> Hello,
>>>> 
>>>> I'm trying to run a cross-language pipeline (Beam 2.21, a Java pipeline with an external Python transform) with a PROCESS SDK harness and the Spark Portable Runner, but it fails.
>>>> To do that, I have a running Spark Runner Job Server (Spark local) and a standalone Expansion Service (Python) that contains the code of my Python transform, which should be called from the main Java pipeline.
>>>> 
>>>> Once the job has been submitted to the Job Server and started running, it fails with this error:
>>>> 
>>>> 20/05/28 18:55:12 INFO org.apache.beam.runners.spark.SparkJobInvoker: Invoking job classificationpipeline-aromanenko-0528165508-5a8f57b9_63b11806-bed9-48e5-a9d9-314dbc93e719
>>>> 20/05/28 18:55:12 INFO org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation: Starting job invocation classificationpipeline-aromanenko-0528165508-5a8f57b9_63b11806-bed9-48e5-a9d9-314dbc93e719
>>>> 20/05/28 18:55:12 ERROR org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation: Error during job invocation classificationpipeline-aromanenko-0528165508-5a8f57b9_63b11806-bed9-48e5-a9d9-314dbc93e719.
>>>> java.lang.IllegalArgumentException: GreedyPipelineFuser requires all root nodes to be runner-implemented beam:transform:impulse:v1 or beam:transform:read:v1 primitives, but transform Create.Values/Read(CreateSource) executes in environment Optional[urn: "beam:env:docker:v1"
>>>> payload: "\n\033apache/beam_java_sdk:2.21.0"
>>>> capabilities: "beam:coder:bytes:v1"
>>>> ….
>>>> 
>>>> 
>>>> Some code snippets of my pipeline that can be helpful.
>>>> 
>>>> Java transform:
>>>> private static final String URN = "ml:genreclassifier:python:v1";
>>>> @Override
>>>> public PCollection<KV<String, String>> expand(PCollection<String> input) {
>>>>   PCollection<KV<String, String>> output =
>>>>       input.apply(
>>>>           "ExternalGenreClassifier",
>>>>           External.of(URN, new byte[] {}, options.getExpansionServiceURL())
>>>>               .<KV<String, String>>withOutputType());
>>>>   return output;
>>>> }
>>>> 
>>>> expansion_service.py
>>>> 
>>>> @ptransform.PTransform.register_urn('ml:genreclassifier:python:v1', None)
>>>> class GenreClassifier(ptransform.PTransform):
>>>>     def __init__(self):
>>>>         super(GenreClassifier, self).__init__()
>>>> 
>>>>     def expand(self, pcoll):
>>>>         return pcoll | "GenreClassifier" >> beam.ParDo(_GenreClassifierFn())
>>>> 
>>>>     def to_runner_api_parameter(self, unused_context):
>>>>         return 'ml:genreclassifier:python:v1', None
>>>> 
>>>>     @staticmethod
>>>>     def from_runner_api_parameter(unused_ptransform, unused_parameter, unused_context):
>>>>         return GenreClassifier()
>>>> 
>>>> def main(unused_argv):
>>>>     ...
>>>>     server = grpc.server(UnboundedThreadPoolExecutor())
>>>>     beam_expansion_api_pb2_grpc.add_ExpansionServiceServicer_to_server(
>>>>         expansion_service.ExpansionServiceServicer(
>>>>             PipelineOptions.from_dictionary({
>>>>                 'environment_type': 'PROCESS',
>>>>                 'environment_config': '{"command": "/dev/beam/sdks/python/container/build/target/launcher/darwin_amd64/boot"}',
>>>>                 'sdk_location': 'container',
>>>>             })
>>>>         ), server
>>>>     )
>>>>     server.add_insecure_port('localhost:{}'.format(options.port))
>>>>     server.start()
>>>> 
>>>> Does anyone have an idea what’s wrong with my setup/pipeline and how to fix it?
>>>> 
>>>> 
>>> 
>> 
> 
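
A note on the environment_config string in the quoted expansion_service.py: hand-written JSON inside a Python string is easy to break with a stray typographic quote. One way to avoid that is to let json.dumps build the string. This is a minimal sketch; the helper name process_environment_options and the path passed to it are hypothetical, and only the three option keys come from the snippet quoted above.

```python
import json


def process_environment_options(boot_path):
    """Build the option dictionary for a PROCESS SDK harness environment."""
    return {
        "environment_type": "PROCESS",
        # json.dumps emits plain ASCII double quotes and escapes the path,
        # so a typographic quote cannot slip into the config string.
        "environment_config": json.dumps({"command": boot_path}),
        "sdk_location": "container",
    }


# Hypothetical path, for illustration only.
options = process_environment_options("/path/to/boot")
print(options["environment_config"])  # prints {"command": "/path/to/boot"}
```

The resulting dictionary can be passed to PipelineOptions.from_dictionary() in place of the literal dictionary used in the quoted code.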


Re: Cross-Language pipeline fails with PROCESS SDK Harness

Posted by Chamikara Jayalath <ch...@google.com>.
Great. Thanks.

On Mon, Jun 1, 2020 at 9:14 AM Alexey Romanenko <ar...@gmail.com>
wrote:

> Yes, I tested it with the cross-language transform (Java pipeline with
> Python external transform).
>
> On 1 Jun 2020, at 17:49, Chamikara Jayalath <ch...@google.com> wrote:
>
> To clarify, is the error resolved with the cross-language transform as
> well? If not, please file a Jira.
>
> On Mon, Jun 1, 2020 at 8:24 AM Kyle Weaver <kc...@google.com> wrote:
>
>> > It would be useful to print out such errors with Error level log, I
>> think.
>>
>> I agree, using environment_type=PROCESS is difficult enough without
>> hiding the logs by default. I re-opened the issue.

Re: Cross-Language pipeline fails with PROCESS SDK Harness

Posted by Alexey Romanenko <ar...@gmail.com>.
Yes, I tested it with the cross-language transform (Java pipeline with Python external transform).

> On 1 Jun 2020, at 17:49, Chamikara Jayalath <ch...@google.com> wrote:
> 
> To clarify, is the error resolved with the cross-language transform as well? If not, please file a Jira.
> 
> On Mon, Jun 1, 2020 at 8:24 AM Kyle Weaver <kcweaver@google.com> wrote:
> > It would be useful to print out such errors with Error level log, I think.
> 
> I agree, using environment_type=PROCESS is difficult enough without hiding the logs by default. I re-opened the issue.


Re: Cross-Language pipeline fails with PROCESS SDK Harness

Posted by Chamikara Jayalath <ch...@google.com>.
To clarify, is the error resolved with the cross-language transform as well?
If not, please file a Jira.

On Mon, Jun 1, 2020 at 8:24 AM Kyle Weaver <kc...@google.com> wrote:

> > It would be useful to print out such errors with Error level log, I
> think.
>
> I agree, using environment_type=PROCESS is difficult enough without hiding
> the logs by default. I re-opened the issue.
>
> On Mon, Jun 1, 2020 at 11:01 AM Alexey Romanenko <ar...@gmail.com>
> wrote:
>
>> Thanks! It was an issue with a setting *virtualenv* for a worker console
>> where it should be running.
>>
>> It would be useful to print out such errors with Error level log, I think.
>>
>> On 29 May 2020, at 18:55, Kyle Weaver <kc...@google.com> wrote:
>>
>> That's probably a problem with your worker. You'll need to get additional
>> logs to debug (see https://jira.apache.org/jira/browse/BEAM-8278)
>>
>> On Fri, May 29, 2020 at 12:48 PM Alexey Romanenko <
>> aromanenko.dev@gmail.com> wrote:
>>
>>> Many thanks! It helped to avoid the error. I saw this option in the
>>> xlang tests before but I didn’t add it since I was confused because of the
>>> name =)
>>> Also, I think we need to added “*—**sdk_location=container*” for
>>> Expansion Service
>>>
>>> Finally, I've managed to only Java and xlang pipeline (with Python
>>> external) and it works for Docker Harness (though, I observe some new
>>> exceptions in the runtime).
>>>
>>> On the other hand, with Process Harness it still fails with an error:
>>>
>>> 20/05/29 18:33:30 INFO
>>> org.apache.beam.runners.fnexecution.environment.ProcessEnvironmentFactory:
>>> Still waiting for startup of
>>> environment '/dev/github/beam2/sdks/python/container/build/target/launcher/darwin_amd64/boot'
>>> for worker id 1-10
>>> 20/05/29 18:33:30 ERROR org.apache.spark.executor.Executor: Exception in
>>> task 1.0 in stage 0.0 (TID 1)
>>> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.UncheckedExecutionException:
>>> java.lang.IllegalStateException: Process died with exit code 1
>>>
>>> If it’s unknown issue, I’ll create a Jira for that.
>>>
>>> On 29 May 2020, at 16:46, Kyle Weaver <kc...@google.com> wrote:
>>>
>>> Alexey, can you try adding --experiments=beam_fn_api to your pipeline
>>> options? We add the option automatically in Python [1] but we don't in Java.
>>>
>>> I filed BEAM-10151 [2] to document this workflow. Alexey, perhaps you
>>> can help with that.
>>>
>>> [1]
>>> https://github.com/apache/beam/blob/a5b2046b10bebc59c5bde41d4cb6498058fdada2/sdks/python/apache_beam/pipeline.py#L209
>>> [2] https://jira.apache.org/jira/browse/BEAM-10151
>>>
>>> On Fri, May 29, 2020 at 10:05 AM Alexey Romanenko <
>>> aromanenko.dev@gmail.com> wrote:
>>>
>>>> Yes, I did run only Java pipeline with Portable Runner and there is the
>>>> same error.
>>>>
>>>> Also, I did the same (without cross-language component) against Beam
>>>> 2.19 and 2.20.
>>>> It works fine against Beam 2.19 (as expected, since I tested it already
>>>> before) and fails with kind the same error against Beam 2.20:
>>>>
>>>> 20/05/29 15:59:23 ERROR
>>>> org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation: Error
>>>> during job invocation
>>>> classificationpipeline-aromanenko-0529135917-9d94008d_25acfc79-abdb-4d04-be01-ad053334f6d1.
>>>> java.lang.IllegalArgumentException: GreedyPipelineFuser requires all
>>>> root nodes to be runner-implemented beam:transform:impulse:v1 or
>>>> beam:transform:read:v1 primitives, but
>>>> transform Create.Values/Read(CreateSource) executes in environment
>>>> Optional[urn: "beam:env:docker:v1"
>>>> payload: "\n\033apache/beam_java_sdk:2.20.0"
>>>>
>>>> Do you think it’s a bug or I miss something in configuration?
>>>>
>>>> On 28 May 2020, at 22:25, Kyle Weaver <kc...@google.com> wrote:
>>>>
>>>> Can you try removing the cross-language component(s) from the
>>>> pipeline and see if it still has the same error?
>>>>
>>>> On Thu, May 28, 2020 at 4:15 PM Alexey Romanenko <
>>>> aromanenko.dev@gmail.com> wrote:
>>>>
>>>>> For testing purposes, it’s just “Create.of(“Name1”, “Name2”, ...)"
>>>>>
>>>>> On 28 May 2020, at 19:29, Kyle Weaver <kc...@google.com> wrote:
>>>>>
>>>>> What source are you using?
>>>>>
>>>>> On Thu, May 28, 2020 at 1:24 PM Alexey Romanenko <
>>>>> aromanenko.dev@gmail.com> wrote:
>>>>>
>>>>>> Hello,
>>>>>>
>>>>>> I’m trying to run a Cross-Language pipeline (Beam 2.21, Java pipeline
>>>>>> with an external Python transform) with a PROCESS SDK Harness and Spark
>>>>>> Portable Runner but it fails.
>>>>>> To do that I have a running Spark Runner Job Server (Spark local) and
>>>>>> standalone Expansion Service (Python) which contains a code of my Python
>>>>>> transform that should be called from main Java pipeline.
>>>>>>
>>>>>> Once job has been submitted on Job Server and started running, it
>>>>>> fails with this error:
>>>>>>
>>>>>> 20/05/28 18:55:12 INFO org.apache.beam.runners.spark.SparkJobInvoker:
>>>>>> Invoking job
>>>>>> classificationpipeline-aromanenko-0528165508-5a8f57b9_63b11806-bed9-48e5-a9d9-314dbc93e719
>>>>>> 20/05/28 18:55:12 INFO
>>>>>> org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation: Starting
>>>>>> job invocation
>>>>>> classificationpipeline-aromanenko-0528165508-5a8f57b9_63b11806-bed9-48e5-a9d9-314dbc93e719
>>>>>> 20/05/28 18:55:12 ERROR
>>>>>> org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation: Error
>>>>>> during job invocation
>>>>>> classificationpipeline-aromanenko-0528165508-5a8f57b9_63b11806-bed9-48e5-a9d9-314dbc93e719.
>>>>>>
>>>>>> *java.lang.IllegalArgumentException: GreedyPipelineFuser requires all
>>>>>> root nodes to be runner-implemented beam:transform:impulse:v1 or
>>>>>> beam:transform:read:v1 primitives, but
>>>>>> transform Create.Values/Read(CreateSource) executes in environment
>>>>>> Optional[urn: "beam:env:docker:v1"*payload:
>>>>>> "\n\033apache/beam_java_sdk:2.21.0"
>>>>>> capabilities: "beam:coder:bytes:v1”
>>>>>> ….
>>>>>>
>>>>>>
>>>>>> Some code snippets of my pipeline that can be helpful.
>>>>>>
>>>>>> *Java transform:*
>>>>>>
>>>>>> private static final String URN = "ml:genreclassifier:python:v1";
>>>>>>
>>>>>> @Override
>>>>>> public PCollection<KV<String, String>> expand(PCollection<String> input) {
>>>>>>   PCollection<KV<String, String>> output =
>>>>>>       input.apply(
>>>>>>           "ExternalGenreClassifier",
>>>>>>           External.of(URN, new byte[] {}, options.getExpansionServiceURL())
>>>>>>               .<KV<String, String>>withOutputType());
>>>>>>   return output;
>>>>>> }
>>>>>>
>>>>>>
>>>>>> *expansion_service.py*
>>>>>>
>>>>>> @ptransform.PTransform.register_urn('ml:genreclassifier:python:v1', None)
>>>>>> class GenreClassifier(ptransform.PTransform):
>>>>>>     def __init__(self):
>>>>>>         super(GenreClassifier, self).__init__()
>>>>>>
>>>>>>     def expand(self, pcoll):
>>>>>>         return pcoll | "GenreClassifier" >> beam.ParDo(_GenreClassifierFn())
>>>>>>
>>>>>>     def to_runner_api_parameter(self, unused_context):
>>>>>>         return 'ml:genreclassifier:python:v1', None
>>>>>>
>>>>>>     @staticmethod
>>>>>>     def from_runner_api_parameter(unused_ptransform, unused_parameter, unused_context):
>>>>>>         return GenreClassifier()
>>>>>>
>>>>>>
>>>>>> def main(unused_argv):
>>>>>>     ...
>>>>>>     server = grpc.server(UnboundedThreadPoolExecutor())
>>>>>>     beam_expansion_api_pb2_grpc.add_ExpansionServiceServicer_to_server(
>>>>>>         expansion_service.ExpansionServiceServicer(
>>>>>>             PipelineOptions.from_dictionary({
>>>>>>                 'environment_type': 'PROCESS',
>>>>>>                 'environment_config': '{"command": "/dev/beam/sdks/python/container/build/target/launcher/darwin_amd64/boot"}',
>>>>>>                 'sdk_location': 'container',
>>>>>>             })
>>>>>>         ), server
>>>>>>     )
>>>>>>     server.add_insecure_port('localhost:{}'.format(options.port))
>>>>>>     server.start()
>>>>>>
>>>>>> Does anyone have an idea what’s wrong with my setup/pipeline and how
>>>>>> to fix it?
>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>

Re: Cross-Language pipeline fails with PROCESS SDK Harness

Posted by Kyle Weaver <kc...@google.com>.
> It would be useful to log such errors at the Error level, I think.

I agree, using environment_type=PROCESS is difficult enough without hiding
the logs by default. I re-opened the issue.
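
As a rough illustration of the kind of check that can surface a broken
PROCESS worker setup before the job is submitted, here is a minimal Python
sketch. It is not part of Beam; the function name preflight is hypothetical,
and it only tests two things mentioned in this thread: that the boot
executable exists and is executable, and that the interpreter is running
inside a virtualenv. The actual cause of an "exit code 1" may well be
something else.

```python
import os
import sys


def preflight(boot_path):
    """Return a list of human-readable problems; an empty list means
    nothing obvious was found (hypothetical helper, not a Beam API)."""
    problems = []

    # The PROCESS environment runs the command given in environment_config,
    # so the file has to exist and carry an execute bit.
    if not os.path.isfile(boot_path):
        problems.append("boot executable not found: %s" % boot_path)
    elif not os.access(boot_path, os.X_OK):
        problems.append("boot file is not executable: %s" % boot_path)

    # venv and recent virtualenv set sys.base_prefix to differ from
    # sys.prefix; legacy virtualenv sets sys.real_prefix instead.
    in_venv = (
        hasattr(sys, "real_prefix")
        or getattr(sys, "base_prefix", sys.prefix) != sys.prefix
    )
    if not in_venv:
        problems.append("no virtualenv appears to be active for this interpreter")

    return problems


if __name__ == "__main__":
    # Path taken from the logs quoted in this thread; adjust to your checkout.
    path = "/dev/github/beam2/sdks/python/container/build/target/launcher/darwin_amd64/boot"
    for problem in preflight(path):
        print("WARNING:", problem)
```

Running it in the same console that starts the worker gives a quick hint
about whether the environment is set up as expected.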

On Mon, Jun 1, 2020 at 11:01 AM Alexey Romanenko <ar...@gmail.com>
wrote:

> Thanks! It was an issue with setting up *virtualenv* in the console where
> the worker should be running.
>
> It would be useful to log such errors at the Error level, I think.
>
> On 29 May 2020, at 18:55, Kyle Weaver <kc...@google.com> wrote:
>
> That's probably a problem with your worker. You'll need to get additional
> logs to debug (see https://jira.apache.org/jira/browse/BEAM-8278)
>
> On Fri, May 29, 2020 at 12:48 PM Alexey Romanenko <
> aromanenko.dev@gmail.com> wrote:
>
>> Many thanks! It helped to avoid the error. I saw this option in the xlang
>> tests before but I didn’t add it since I was confused because of the name =)
>> Also, I think we need to add “--sdk_location=container” for the
>> Expansion Service.
>>
>> Finally, I've managed to run the Java-only and xlang (with Python
>> external) pipelines, and they work with the Docker Harness (though I
>> observe some new exceptions at runtime).
>>
>> On the other hand, with Process Harness it still fails with an error:
>>
>> 20/05/29 18:33:30 INFO
>> org.apache.beam.runners.fnexecution.environment.ProcessEnvironmentFactory:
>> Still waiting for startup of
>> environment '/dev/github/beam2/sdks/python/container/build/target/launcher/darwin_amd64/boot'
>> for worker id 1-10
>> 20/05/29 18:33:30 ERROR org.apache.spark.executor.Executor: Exception in
>> task 1.0 in stage 0.0 (TID 1)
>> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.UncheckedExecutionException:
>> java.lang.IllegalStateException: Process died with exit code 1
>>
>> If it’s an unknown issue, I’ll create a Jira for that.
>>
>> On 29 May 2020, at 16:46, Kyle Weaver <kc...@google.com> wrote:
>>
>> Alexey, can you try adding --experiments=beam_fn_api to your pipeline
>> options? We add the option automatically in Python [1] but we don't in Java.
>>
>> I filed BEAM-10151 [2] to document this workflow. Alexey, perhaps you can
>> help with that.
>>
>> [1]
>> https://github.com/apache/beam/blob/a5b2046b10bebc59c5bde41d4cb6498058fdada2/sdks/python/apache_beam/pipeline.py#L209
>> [2] https://jira.apache.org/jira/browse/BEAM-10151
>>
>> On Fri, May 29, 2020 at 10:05 AM Alexey Romanenko <
>> aromanenko.dev@gmail.com> wrote:
>>
>>> Yes, I ran a Java-only pipeline with the Portable Runner and I get the
>>> same error.
>>>
>>> Also, I did the same (without the cross-language component) against Beam
>>> 2.19 and 2.20.
>>> It works fine against Beam 2.19 (as expected, since I had already tested
>>> it before) and fails with kind of the same error against Beam 2.20:
>>>
>>> 20/05/29 15:59:23 ERROR
>>> org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation: Error
>>> during job invocation
>>> classificationpipeline-aromanenko-0529135917-9d94008d_25acfc79-abdb-4d04-be01-ad053334f6d1.
>>> java.lang.IllegalArgumentException: GreedyPipelineFuser requires all
>>> root nodes to be runner-implemented beam:transform:impulse:v1 or
>>> beam:transform:read:v1 primitives, but
>>> transform Create.Values/Read(CreateSource) executes in environment
>>> Optional[urn: "beam:env:docker:v1"
>>> payload: "\n\033apache/beam_java_sdk:2.20.0"
>>>
>>> Do you think it’s a bug, or am I missing something in the configuration?
>>>
>>> On 28 May 2020, at 22:25, Kyle Weaver <kc...@google.com> wrote:
>>>
>>> Can you try removing the cross-language component(s) from the
>>> pipeline and see if it still has the same error?
>>>
>>> On Thu, May 28, 2020 at 4:15 PM Alexey Romanenko <
>>> aromanenko.dev@gmail.com> wrote:
>>>
>>>> For testing purposes, it’s just Create.of("Name1", "Name2", ...)
>>>>
>>>> On 28 May 2020, at 19:29, Kyle Weaver <kc...@google.com> wrote:
>>>>
>>>> What source are you using?
>>>>
>>>> On Thu, May 28, 2020 at 1:24 PM Alexey Romanenko <
>>>> aromanenko.dev@gmail.com> wrote:
>>>>
>>>>> Hello,
>>>>>
>>>>> I’m trying to run a Cross-Language pipeline (Beam 2.21, Java pipeline
>>>>> with an external Python transform) with a PROCESS SDK Harness and the
>>>>> Spark Portable Runner, but it fails.
>>>>> To do that, I have a running Spark Runner Job Server (Spark local) and a
>>>>> standalone Expansion Service (Python) which contains the code of my Python
>>>>> transform that should be called from the main Java pipeline.
>>>>>
>>>>> Once the job has been submitted to the Job Server and started running, it
>>>>> fails with this error:
>>>>>
>>>>> 20/05/28 18:55:12 INFO org.apache.beam.runners.spark.SparkJobInvoker:
>>>>> Invoking job
>>>>> classificationpipeline-aromanenko-0528165508-5a8f57b9_63b11806-bed9-48e5-a9d9-314dbc93e719
>>>>> 20/05/28 18:55:12 INFO
>>>>> org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation: Starting
>>>>> job invocation
>>>>> classificationpipeline-aromanenko-0528165508-5a8f57b9_63b11806-bed9-48e5-a9d9-314dbc93e719
>>>>> 20/05/28 18:55:12 ERROR
>>>>> org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation: Error
>>>>> during job invocation
>>>>> classificationpipeline-aromanenko-0528165508-5a8f57b9_63b11806-bed9-48e5-a9d9-314dbc93e719.
>>>>>
>>>>> java.lang.IllegalArgumentException: GreedyPipelineFuser requires all
>>>>> root nodes to be runner-implemented beam:transform:impulse:v1 or
>>>>> beam:transform:read:v1 primitives, but
>>>>> transform Create.Values/Read(CreateSource) executes in environment
>>>>> Optional[urn: "beam:env:docker:v1"
>>>>> payload: "\n\033apache/beam_java_sdk:2.21.0"
>>>>> capabilities: "beam:coder:bytes:v1"
>>>>> ….
>>>>>
>>>>>
>>>>> Some code snippets of my pipeline that can be helpful.
>>>>>
>>>>> *Java transform:*
>>>>>
>>>>> private static final String URN = "ml:genreclassifier:python:v1";
>>>>>
>>>>> @Override
>>>>> public PCollection<KV<String, String>> expand(PCollection<String> input) {
>>>>>   PCollection<KV<String, String>> output =
>>>>>       input.apply(
>>>>>           "ExternalGenreClassifier",
>>>>>           External.of(URN, new byte[] {}, options.getExpansionServiceURL())
>>>>>               .<KV<String, String>>withOutputType());
>>>>>   return output;
>>>>> }
>>>>>
>>>>>
>>>>> *expansion_service.py*
>>>>>
>>>>> @ptransform.PTransform.register_urn('ml:genreclassifier:python:v1', None)
>>>>> class GenreClassifier(ptransform.PTransform):
>>>>>     def __init__(self):
>>>>>         super(GenreClassifier, self).__init__()
>>>>>
>>>>>     def expand(self, pcoll):
>>>>>         return pcoll | "GenreClassifier" >> beam.ParDo(_GenreClassifierFn())
>>>>>
>>>>>     def to_runner_api_parameter(self, unused_context):
>>>>>         return 'ml:genreclassifier:python:v1', None
>>>>>
>>>>>     @staticmethod
>>>>>     def from_runner_api_parameter(unused_ptransform, unused_parameter, unused_context):
>>>>>         return GenreClassifier()
>>>>>
>>>>>
>>>>> def main(unused_argv):
>>>>>     ...
>>>>>     server = grpc.server(UnboundedThreadPoolExecutor())
>>>>>     beam_expansion_api_pb2_grpc.add_ExpansionServiceServicer_to_server(
>>>>>         expansion_service.ExpansionServiceServicer(
>>>>>             PipelineOptions.from_dictionary({
>>>>>                 'environment_type': 'PROCESS',
>>>>>                 'environment_config': '{"command": "/dev/beam/sdks/python/container/build/target/launcher/darwin_amd64/boot"}',
>>>>>                 'sdk_location': 'container',
>>>>>             })
>>>>>         ), server
>>>>>     )
>>>>>     server.add_insecure_port('localhost:{}'.format(options.port))
>>>>>     server.start()
>>>>>
>>>>> Does anyone have an idea what’s wrong with my setup/pipeline and how
>>>>> to fix it?
>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>
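
The quoted expansion_service.py snippet passes environment_config as a
hand-written JSON string, which is easy to break with a stray typographic
quote. Below is a minimal sketch, assuming the same three option keys as in
that snippet, that builds the string with json.dumps instead. The helper name
process_environment_options is hypothetical, and the path is the one quoted
in the thread.

```python
import json


def process_environment_options(boot_path):
    """Build the options dictionary for a PROCESS SDK harness.

    Hypothetical helper: the keys mirror the snippet quoted in this thread,
    and json.dumps guarantees that environment_config is valid JSON.
    """
    return {
        "environment_type": "PROCESS",
        "environment_config": json.dumps({"command": boot_path}),
        "sdk_location": "container",
    }


opts = process_environment_options(
    "/dev/beam/sdks/python/container/build/target/launcher/darwin_amd64/boot"
)
print(opts["environment_config"])
```

The resulting dictionary could then be handed to
PipelineOptions.from_dictionary, as in the original snippet.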