Posted to user@beam.apache.org by Robbe Sneyders <ro...@ml6.eu> on 2020/04/29 15:24:12 UTC

Beam + Flink + Docker - Write to host system

Hi all,

We're working on a project where we're limited to one big development
machine for now. We want to start developing data processing pipelines in
Python, which should eventually be ported to a currently unknown setup on a
separate cluster or cloud, so we went with Beam for its portability.

For the development setup, we wanted as little overhead as possible, so we
deployed a one-node Flink cluster with docker-compose. The whole setup is
defined by the following docker-compose.yml:

```
version: "2.1"
services:
  flink-jobmanager:
    image: flink:1.9
    network_mode: host
    command: jobmanager
    environment:
      - JOB_MANAGER_RPC_ADDRESS=localhost

  flink-taskmanager:
    image: flink:1.9
    network_mode: host
    depends_on:
      - flink-jobmanager
    command: taskmanager
    environment:
      - JOB_MANAGER_RPC_ADDRESS=localhost
    volumes:
      - staging-dir:/tmp/beam-artifact-staging
      - /usr/bin/docker:/usr/bin/docker
      - /var/run/docker.sock:/var/run/docker.sock
    user: flink:${DOCKER_GID}

  beam-jobserver:
    image: apache/beam_flink1.9_job_server:2.20.0
    network_mode: host
    command: --flink-master=localhost:8081
    volumes:
      - staging-dir:/tmp/beam-artifact-staging

volumes:
  staging-dir:
```
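Since the taskmanager mounts the Docker socket and runs as `flink:${DOCKER_GID}`, the host's docker group ID has to be supplied when bringing the stack up. A minimal sketch of how that variable could be populated; the `getent` lookup is an assumption about the host, and any equivalent lookup works:

```
# Look up the GID of the host's 'docker' group so the flink user inside the
# taskmanager container is allowed to use the mounted Docker socket.
DOCKER_GID=$(getent group docker | cut -d: -f3)
export DOCKER_GID=${DOCKER_GID:-0}   # fall back to 0 if the group is absent
echo "Using DOCKER_GID=${DOCKER_GID}"
# docker-compose up -d
```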

We can submit and run pipelines with the following options:
```
'runner': 'PortableRunner',
'job_endpoint': 'localhost:8099',
```
The environment type for the SDK Harness is configured to the default
'docker'.
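For context, a minimal sketch of how these options might be wired into a Python pipeline; the dict-to-flags conversion below is one common pattern, and everything beyond the two options shown above is an assumption:

```python
# The options from above, as a plain dict.
options = {
    'runner': 'PortableRunner',
    'job_endpoint': 'localhost:8099',
}

# Beam's PipelineOptions accepts argv-style flags, so convert the dict:
flags = ['--{}={}'.format(k, v) for k, v in options.items()]

# With apache_beam installed, the pipeline would then be built roughly as:
#   from apache_beam.options.pipeline_options import PipelineOptions
#   import apache_beam as beam
#   with beam.Pipeline(options=PipelineOptions(flags)) as p:
#       ...  # transforms go here
```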

However, we cannot write output files to the host system. To fix this,
I tried to mount a host directory into the Beam SDK container (I had to
rebuild the Beam Job Server jar and image to do this). This seems to have
worked, as the output file is created on the host system. However, the
pipeline silently fails, and the output file remains empty. Running the
pipeline with DirectRunner confirms that the pipeline itself works.

Looking at the output logs, the following error is thrown in the Flink Task
Manager:
```
flink-taskmanager_1  | java.lang.NoClassDefFoundError:
org/apache/beam/vendor/grpc/v1p26p0/io/netty/buffer/PoolArena$1
```
I don't know if this is a result of me rebuilding the Job Server, or caused
by another issue.

We currently do not have a distributed file system available. Is there any
way to make writing to the host system possible?

Kind regards,
Robbe

Robbe Sneyders
ML6 Gent
M: +32 474 71 31 08

Re: Beam + Flink + Docker - Write to host system

Posted by Kyle Weaver <kc...@google.com>.
This does look a lot like FLINK-10672; maybe leave a comment there.

If you have a simple pipeline you can share that can reproduce the error,
that would be extremely helpful.

>
> On Thu, 30 Apr 2020 at 22:35, Kyle Weaver <kc...@google.com> wrote:
>
>> If you are using only a single task manager but want to get parallelism >
>> 1, you will need to increase taskmanager.numberOfTaskSlots in
>> your flink-conf.yaml.
>> https://ci.apache.org/projects/flink/flink-docs-stable/internals/job_scheduling.html#scheduling
>>
>> On Thu, Apr 30, 2020 at 8:19 AM Robbe Sneyders <ro...@ml6.eu>
>> wrote:
>>
>>> Hi Kyle,
>>>
>>> Thanks for the quick response.
>>> The problem was that the pipeline could not access the input file. The
>>> Task Manager errors seem unrelated indeed.
>>>
>>> I'm now able to run the pipeline completely, but I'm running into
>>> problems when using parallelism.
>>> The pipeline can be summarized as:
>>> read file -> shuffle -> process -> write files
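A sketch of that shape, for reference. The JSON helper mirrors the 'Load json, Process element, Dump json' step names visible in the logs, with the real processing stubbed out; the commented transforms show the usual Beam spelling of read -> shuffle -> process -> write:

```python
import json

def process_element(line):
    """Hypothetical per-element step: load JSON, process, dump JSON."""
    record = json.loads(line)
    record['processed'] = True  # stand-in for the real per-element work
    return json.dumps(record)

# With apache_beam installed, the pipeline would look roughly like:
#   (p
#    | beam.io.ReadFromText(input_path)      # read file
#    | beam.Reshuffle()                      # shuffle
#    | beam.Map(process_element)             # process
#    | beam.io.WriteToText(output_path))     # write files
```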
>>>
>>> When using parallelism > 1, the pipeline stalls and the Task Manager
>>> outputs the following warnings:
>>> flink-taskmanager_1  | 2020-04-30 09:24:46,272 INFO
>>>  org.apache.beam.sdk.fn.stream.DirectStreamObserver - Output channel
>>> stalled for 255s, outbound thread CHAIN MapPartition (MapPartition at
>>> [4]{Discard array, Load json, Process element, Dump json}) -> FlatMap
>>> (FlatMap at ExtractOutput[0]) (7/10). See:
>>> https://issues.apache.org/jira/browse/BEAM-4280 for the history for
>>> this issue.
>>>
>>> The referenced issue [1] doesn't contain a lot of information and is
>>> resolved. There is a Flink issue [2] that seems related, although I'm not
>>> seeing the reported stack trace. I guess this problem occurs because I'm
>>> reading and writing to the same disk in parallel.
>>>
>>> Increasing the Task Manager memory seems to resolve the issue partially.
>>> I'm still getting the stalled-channel warnings, but the pipeline does
>>> proceed, step by step, although slowly.
>>>
>>> Using BATCH_FORCED execution mode removes the warnings, but the pipeline
>>> still runs a lot slower than with parallelism=1.
>>>
>>> The pipeline shouldn't be I/O bound, so I guess I should still be able
>>> to get some benefit out of running tasks in parallel?
>>>
>>> 1. https://issues.apache.org/jira/browse/BEAM-4280
>>> 2.
>>> https://issues.apache.org/jira/browse/FLINK-10672?page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel&focusedCommentId=16970692#comment-16970692
>>>
>>> Kind regards,
>>> Robbe
>>>
>>>
>>>
>>> On Wed, 29 Apr 2020 at 19:28, Kyle Weaver <kc...@google.com> wrote:
>>>
>>>> > This seems to have worked, as the output file is created on the host
>>>> system. However the pipeline silently fails, and the output file remains
>>>> empty.
>>>>
>>>> Have you checked the SDK container logs? They are most likely to
>>>> contain relevant failure information.
>>>>
>>>> > I don't know if this is a result of me rebuilding the Job Server, or
>>>> caused by another issue.
>>>>
>>>> Looks like there is an old but unresolved bug with the same error:
>>>> https://issues.apache.org/jira/browse/BEAM-5397
>>>>

Re: Beam + Flink + Docker - Write to host system

Posted by Maximilian Michels <mx...@apache.org>.
Hey Robbe,

The issue with a higher parallelism is likely due to the single Python
process which processes the data.

You may want to use the `sdk_worker_parallelism` pipeline option, which
brings up multiple Python SDK worker processes.

Best,
Max
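A hedged sketch of passing that option, extending the options dict from earlier in the thread; the value 10 and the extra keys are illustrative assumptions:

```python
# Pipeline options extended with sdk_worker_parallelism, which controls how
# many SDK harness (Python worker) processes are started.
options = {
    'runner': 'PortableRunner',
    'job_endpoint': 'localhost:8099',
    'environment_type': 'DOCKER',
    'sdk_worker_parallelism': 10,  # illustrative; tune to the machine
}

flags = ['--{}={}'.format(k, v) for k, v in options.items()]
# With apache_beam installed:
#   from apache_beam.options.pipeline_options import PipelineOptions
#   pipeline_options = PipelineOptions(flags)
```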


Re: Beam + Flink + Docker - Write to host system

Posted by Robbe Sneyders <ro...@ml6.eu>.
Yes, the task manager has one task slot per CPU core available, and the
dashboard shows that the work is parallelized across multiple subtasks.

However when using parallelism, the pipeline stalls, the Task Manager
starts throwing 'Output channel stalled' warnings, and high back pressure
is created at the Partition step as is shown in the tables below.

The Task Manager should have more than enough memory.
JVM Heap Size: 30.0 GB
Flink Managed Memory: 21.0 GB

Any idea what could cause this and how I could resolve it?

Parallelism = 1:

Step: CHAIN MapPartition ([1]Read input/Read/_SDFBoundedSourceWrapper/ParDo(SDFBoundedSourceDoFn)/ProcessSizedElementsAndRestrictions0) -> FlatMap (ExtractOutput[0])
  Status: RUNNING | Received: 43.0 KB / 52 records | Sent: 831 MB / 33060 records | Parallelism: 1 | Duration: 3m 2s | Tasks: 1

Step: Partition
  Status: RUNNING | Received: 831 MB / 33059 records | Sent: 831 MB / 33059 records | Parallelism: 1 | Duration: 2m 58s | Tasks: 1

Step: CHAIN MapPartition ([4]{Discard array, Load json, Process element, Dump json}) -> FlatMap (ExtractOutput[0])
  Status: RUNNING | Received: 831 MB / 33057 records | Sent: 641 MB / 32439 records | Parallelism: 1 | Duration: 2m 58s | Tasks: 1

Step: CHAIN MapPartition ([3]Write output/Write/WriteImpl/{WriteBundles, Pair, WindowInto(WindowIntoFn)}) -> FlatMap (ExtractOutput[0]) -> Map (Key Extractor) -> GroupCombine (Write output/Write/WriteImpl/GroupByKey) -> Map (Key Extractor)
  Status: RUNNING | Received: 641 MB / 32438 records | Sent: 0 B / 0 records | Parallelism: 1 | Duration: 2m 58s | Tasks: 1

Parallelism = 10:

Step: CHAIN MapPartition ([1]Read input/Read/_SDFBoundedSourceWrapper/ParDo(SDFBoundedSourceDoFn)/ProcessSizedElementsAndRestrictions0) -> FlatMap (ExtractOutput[0])
  Status: RUNNING | Received: 43.1 KB / 52 records | Sent: 493 MB / 19625 records | Parallelism: 10 | Duration: 7m 15s | Tasks: 19

Step: Partition
  Status: RUNNING | Received: 486 MB / 19363 records | Sent: 486 MB / 19363 records | Parallelism: 10 | Duration: 7m 14s | Tasks: 10

Step: CHAIN MapPartition ([4]{Discard array, Load json, Process element, Dump json}) -> FlatMap (ExtractOutput[0])
  Status: RUNNING | Received: 477 MB / 18987 records | Sent: 0 B / 0 records | Parallelism: 10 | Duration: 7m 14s | Tasks: 10

Step: CHAIN MapPartition ([3]Write output/Write/WriteImpl/{WriteBundles, Pair, WindowInto(WindowIntoFn)}) -> FlatMap (ExtractOutput[0]) -> Map (Key Extractor) -> GroupCombine (Write output/Write/WriteImpl/GroupByKey) -> Map (Key Extractor)
  Status: RUNNING | Received: 1.16 KB / 0 records | Sent: 0 B / 0 records | Parallelism: 10 | Duration: 7m 14s | Tasks: 10




Re: Beam + Flink + Docker - Write to host system

Posted by Kyle Weaver <kc...@google.com>.
If you are using only a single task manager but want to get parallelism >
1, you will need to increase taskmanager.numberOfTaskSlots in
your flink-conf.yaml.
https://ci.apache.org/projects/flink/flink-docs-stable/internals/job_scheduling.html#scheduling
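
As a sketch of that change (the config path is an assumption based on the stock flink:1.9 image layout, where the configuration lives under /opt/flink/conf), you could bind-mount a modified flink-conf.yaml into the flink-taskmanager service from the docker-compose setup:

```yaml
# flink-conf.yaml (sketch): let the single task manager offer 10 slots,
# so a job with parallelism up to 10 can actually be scheduled.
taskmanager.numberOfTaskSlots: 10

# In docker-compose.yml, under flink-taskmanager -> volumes (assumed mount
# point, based on the official Flink image):
#   - ./flink-conf.yaml:/opt/flink/conf/flink-conf.yaml
```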


Re: Beam + Flink + Docker - Write to host system

Posted by Robbe Sneyders <ro...@ml6.eu>.
Hi Kyle,

Thanks for the quick response.
The problem was that the pipeline could not access the input file. The Task
Manager errors do indeed seem unrelated.

I'm now able to run the pipeline completely, but I'm running into problems
when using parallelism.
The pipeline can be summarized as:
read file -> shuffle -> process -> write files

When using parallelism > 1, the pipeline stalls and the Task Manager
outputs the following warning:
flink-taskmanager_1  | 2020-04-30 09:24:46,272 INFO
 org.apache.beam.sdk.fn.stream.DirectStreamObserver - Output channel
stalled for 255s, outbound thread CHAIN MapPartition (MapPartition at
[4]{Discard array, Load json, Process element, Dump json}) -> FlatMap
(FlatMap at ExtractOutput[0]) (7/10). See:
https://issues.apache.org/jira/browse/BEAM-4280 for the history for this
issue.
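
For context, the per-element steps named in that stalled chain (Load json, Process element, Dump json) can be sketched as plain Python functions; the record fields and the transformation below are hypothetical placeholders, not the actual pipeline code:

```python
import json


def load_json(line: str) -> dict:
    # Parse one input line into a record.
    return json.loads(line)


def process_element(record: dict) -> dict:
    # Stand-in for the actual per-record processing step.
    record["processed"] = True
    return record


def dump_json(record: dict) -> str:
    # Serialize the record back to a single output line.
    return json.dumps(record)


# One element flowing through the chain:
result = dump_json(process_element(load_json('{"id": 1}')))
```

In the portable pipeline these would run inside the SDK harness container, with the shuffle in between forcing a full exchange across task slots.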

The referenced issue [1] doesn't contain much information and is already
resolved. There is a Flink issue [2] that seems related, although I'm not
seeing the reported stacktrace. I suspect this happens because I'm
reading from and writing to the same disk in parallel.

Increasing the Task Manager memory seems to resolve the issue partially.
I'm still getting the stalled-channel warnings, but the pipeline does
proceed, albeit step-wise and slowly.

Using BATCH_FORCED execution mode removes the warnings, but the pipeline
still runs a lot slower than with parallelism=1.

The pipeline shouldn't be I/O-bound, so I'd expect to still get some
benefit from running tasks in parallel?

1. https://issues.apache.org/jira/browse/BEAM-4280
2.
https://issues.apache.org/jira/browse/FLINK-10672?page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel&focusedCommentId=16970692#comment-16970692

Kind regards,
Robbe




Re: Beam + Flink + Docker - Write to host system

Posted by Kyle Weaver <kc...@google.com>.
> This seems to have worked, as the output file is created on the host
system. However the pipeline silently fails, and the output file remains
empty.

Have you checked the SDK container logs? They are most likely to contain
relevant failure information.

> I don't know if this is a result of me rebuilding the Job Server, or
caused by another issue.

Looks like there is an old but unresolved bug with the same error:
https://issues.apache.org/jira/browse/BEAM-5397
