Posted to user@beam.apache.org by wzjmit <wz...@gmail.com> on 2021/10/30 05:01:45 UTC

[Question] GoSDK with flink portable runner

Hi, I'm trying to follow this tutorial (https://beam.apache.org/documentation/runners/flink/) to use a portable runner with the Go SDK.

My setup is as follows:
- a Flink cluster on a Linux host, started with `./bin/start-cluster.sh`
- the JobService on the same host, started with `docker run --net=host apache/beam_flink1.13_job_server:latest --flink-master=localhost:8081`
- a pipeline run with `./go_example_bin -runner=universal -endpoint=localhost:8099`

The job always fails with an exception like
`java.lang.IllegalStateException: No container running for id 3c831fb28521b7575bdf78d50141a77140674e7c3b2a21786d46d9e47b2bea18`.

It seems that the worker container exits unexpectedly. The worker boot log is as follows:

2021/10/30 03:24:54 Provision info:
pipeline_options:{fields:{key:"beam:option:app_name:v1"  value:{string_value:"go-job-1-1635564280252965730"}}  fields:{key:"beam:option:experiments:v1"  value:{list_value:{values:{string_value:"beam_fn_api"}}}}  fields:{key:"beam:option:flink_master:v1"  value:{string_value:"localhost:8281"}}  fields:{key:"beam:option:go_options:v1"  value:{struct_value:{fields:{key:"options"  value:{struct_value:{fields:{key:"hooks"  value:{string_value:"{}"}}}}}}}}  fields:{key:"beam:option:job_name:v1"  value:{string_value:"go0job0101635564280252965730-root-1030032442-454e2e1d"}}  fields:{key:"beam:option:options_id:v1"  value:{number_value:5}}  fields:{key:"beam:option:output_executable_path:v1"  value:{null_value:NULL_VALUE}}  fields:{key:"beam:option:parallelism:v1"  value:{number_value:-1}}  fields:{key:"beam:option:retain_docker_containers:v1"  value:{bool_value:false}}  fields:{key:"beam:option:runner:v1"  value:{null_value:NULL_VALUE}}}  logging_endpoint:{url:"localhost:14431"}  artifact_endpoint:{url:"localhost:29083"}  control_endpoint:{url:"localhost:12061"}  dependencies:{type_urn:"beam:artifact:type:file:v1"  type_payload:"\ng/tmp/beam-artifact-staging/05eadb2d287a6baaade937ec1d1d2c1d460cbf33658368b55612325600a6d9f4/1-go-worker"  role_urn:"beam:artifact:role:staging_to:v1"  role_payload:"\n\x06worker"}
2021/10/30 03:24:54 Initializing Go harness: /opt/apache/beam/boot --id=1-1 --provision_endpoint=localhost:14493
2021/10/30 03:25:02 Failed to retrieve staged files: failed to retrieve /tmp/staged in 3 attempts: failed to retrieve chunk for /tmp/staged/worker
        caused by:
rpc error: code = Unknown desc = ; failed to retrieve chunk for /tmp/staged/worker
        caused by:
rpc error: code = Unknown desc = ; failed to retrieve chunk for /tmp/staged/worker
        caused by:
rpc error: code = Unknown desc = ; failed to retrieve chunk for /tmp/staged/worker
        caused by:
rpc error: code = Unknown desc =
    

BTW, the staged artifact can be found at
`/tmp/beam-artifact-staging/05eadb2d287a6baaade937ec1d1d2c1d460cbf33658368b55612325600a6d9f4/1-go-worker`
in the job service container, so I can't figure out why the streaming RPC (GetArtifact) fails to retrieve it from the ArtifactRetrievalService.


Many thanks
wzjmit
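The `dependencies` entry in the provision info above carries two small protobuf payloads. The sketch below decodes them by hand with only the Go standard library, assuming (from my reading of Beam's runner API protos) that `type_payload` is an `ArtifactFilePayload` whose field 1 is the file path, and `role_payload` is an `ArtifactStagingToRolePayload` whose field 1 is the staged name. `decodeField1String` is a hypothetical helper written for illustration, not a Beam API. It shows that the artifact is referenced by a path on the filesystem where it was staged, and that the worker is asked to stage it under the name `worker`, which appears to be where `/tmp/staged/worker` in the error comes from.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// decodeField1String is a hypothetical helper (not part of Beam) that reads
// field 1 of a protobuf message, assuming it is a length-delimited string
// (wire type 2) and that it is the first field in the encoding. Any bytes
// after that field are ignored. This is a sketch, not a general decoder.
func decodeField1String(payload []byte) (string, error) {
	key, n := binary.Uvarint(payload)
	if n <= 0 {
		return "", errors.New("bad key varint")
	}
	if key != (1<<3)|2 { // field number 1, wire type 2 (length-delimited)
		return "", fmt.Errorf("unexpected key %d", key)
	}
	payload = payload[n:]
	length, m := binary.Uvarint(payload)
	if m <= 0 {
		return "", errors.New("bad length varint")
	}
	payload = payload[m:]
	if uint64(len(payload)) < length {
		return "", errors.New("truncated payload")
	}
	return string(payload[:length]), nil
}

func main() {
	// Payloads copied from the "dependencies" entry of the boot log.
	// "\ng" is the key byte 0x0a followed by the length byte 'g' (103).
	typePayload := []byte("\ng/tmp/beam-artifact-staging/05eadb2d287a6baaade937ec1d1d2c1d460cbf33658368b55612325600a6d9f4/1-go-worker")
	rolePayload := []byte("\n\x06worker")

	path, err := decodeField1String(typePayload)
	if err != nil {
		panic(err)
	}
	name, err := decodeField1String(rolePayload)
	if err != nil {
		panic(err)
	}
	fmt.Println("file path:  ", path)
	fmt.Println("staged name:", name)
}
```

The file path only means something to a process that can see the job server container's filesystem, which is the crux of the reply below.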
    

Re: [Question] GoSDK with flink portable runner

Posted by Kyle Weaver <kc...@google.com>.
The worker needs to be able to access the staged artifact as well.
There's still no easy way to mount the volume [1], so the workarounds
are to use the pipeline option --environment_type=LOOPBACK (when running
locally) or to set the job server's --artifacts-dir argument to point
to a distributed filesystem.

[1] https://issues.apache.org/jira/browse/BEAM-5440

On Fri, Nov 19, 2021 at 11:39 AM Luke Cwik <lc...@google.com> wrote:
>
> +Robert Burke

Re: [Question] GoSDK with flink portable runner

Posted by Luke Cwik <lc...@google.com>.
+Robert Burke <re...@google.com>

On Fri, Oct 29, 2021 at 10:02 PM wzjmit <wz...@gmail.com> wrote: