Posted to user@beam.apache.org by Lydian <ly...@gmail.com> on 2022/11/14 10:29:52 UTC

Local Test Beam Portable Flink Runner in mac

Hi,

I am working on setting up a local Beam Flink runner for faster
development.
I have built a Docker image that contains the required Flink and Beam
dependencies, and launched separate containers (job manager, task
manager and Beam job server) via docker-compose. I am using bridge mode
(because Docker doesn't support the "host" network on Mac) and have exposed
all the related ports to localhost.

The test pipeline is written in Python and runs on the portable runner, with
`--environment_type` set to `LOOPBACK` so that it uses my
local Python code to run the change.  (Our pipeline is written in Python,
but we need cross-language support to access data from Kafka.)

Here's my understanding of what happens:
1. I start my short Python script with the following args:
```
   '--streaming',
   '--runner=portableRunner',
   '--environment_type=LOOPBACK',
   '--job_endpoint=localhost:8099',
   '--artifact_endpoint=localhost:8098',
   '--defaultEnvironmentType=EXTERNAL',
   '--defaultEnvironmentConfig=host.docker.internal:5000',
```
2. The job launches the Beam Java expansion service in process mode, because
I am using this transform:
```
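# Both names come from the Beam Python SDK's Kafka IO module.
from apache_beam.io.kafka import ReadFromKafka, default_io_expansion_service

ReadFromKafka(
    consumer_config={
        "bootstrap.servers": "kafka:9092",
        'auto.offset.reset': 'earliest',
    },
    topics=["test.topic"],
    with_metadata=False,
    # Extra args are passed to the Java expansion service at startup.
    expansion_service=default_io_expansion_service(
        append_args=[
            '--defaultEnvironmentType=PROCESS',
            "--defaultEnvironmentConfig={\"command\":\"/opt/apache/beam/java_boot\"}",
            '--experiments=use_deprecated_read',
        ]
    ),
)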
```
3. The job is then submitted to the Beam job server.
4. The job server then submits the actual job to the Flink job manager.
5. The Flink job manager distributes the work to the task manager.
6. The task manager launches a Java worker.
7. Once the Java worker is done, it returns the processed content back to
the original Python process (because we are running in LOOPBACK).

However, the very last step fails because it looks like
LOOPBACK opens a random port on localhost, and I have no idea how to
make the Java worker talk to the host on that random port.

I know the problem could easily be fixed by setting network_mode to
host. However, we are using Macs for development, and host networking is
not supported by Docker on Mac.  Has anyone tried the same thing
before, and is there a suggested workaround for Mac users?  Thanks!

I also have my script and infra in this gist [1]; hopefully that
makes it easier to understand. Thanks!

[1] https://gist.github.com/lydian/0db7614652c2ccdc733884134bf67f9b

Sincerely,
Lydian Lee

Re: Local Test Beam Portable Flink Runner in mac

Posted by Ankur Goenka <an...@gmail.com>.
Hi Lydian,
The LOOPBACK environment is nothing but an EXTERNAL environment with
automated setup of the SDK Harness process manager in the pipeline submission
process.
The LOOPBACK environment sets up a server on a random port, which is then
used to start the SDK Harness process. This is probably what you are hitting
at the moment.
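
To illustrate why such a port cannot be listed in a static docker-compose port mapping: a server that binds to port 0 gets whichever free port the OS hands out, and the number usually differs between runs. The sketch below uses only the Python standard library. It is not Beam's code, `pick_ephemeral_port` is a hypothetical helper name, and it assumes LOOPBACK picks its port in a similar way.

```python
import socket


def pick_ephemeral_port(host="127.0.0.1"):
    """Bind to port 0 and return the port the OS chose (hypothetical helper)."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.bind((host, 0))  # port 0 means "let the OS choose a free port"
        return sock.getsockname()[1]


if __name__ == "__main__":
    # The printed number is not known in advance, so it cannot be
    # published ahead of time in a fixed port mapping.
    print(pick_ephemeral_port())
```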

[1] You can overcome this particular issue by using the EXTERNAL environment
directly and manually starting
https://github.com/apache/beam/blob/e439f4120ef4c25aa36e5b03756dc7391bdbd211/sdks/python/apache_beam/runners/worker/worker_pool_main.py#L201
on a fixed port that matches the one in your environment config.
However, I think the next problem you will hit is the ports used by
the actual SDK Harness process to communicate with the Flink tasks (Runner
Harness).
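
A minimal sketch of what [1] could look like from the Python side, assuming the worker pool should be reachable at `host.docker.internal:50000`. The port number is an arbitrary choice, and `worker_pool_command` and `external_env_args` are hypothetical helper names. The code only builds argument lists and does not import Beam; the `--service_port` flag is believed to be what `worker_pool_main.py` accepts, so check it against your Beam version.

```python
import sys

WORKER_POOL_PORT = 50000  # assumption: any fixed port that is free


def worker_pool_command(port=WORKER_POOL_PORT):
    """Command line that starts the Python worker pool on a fixed port."""
    return [
        sys.executable, "-m",
        "apache_beam.runners.worker.worker_pool_main",
        f"--service_port={port}",
    ]


def external_env_args(host="host.docker.internal", port=WORKER_POOL_PORT):
    """Pipeline args that use EXTERNAL with a known address instead of LOOPBACK."""
    return [
        "--streaming",
        "--runner=PortableRunner",
        "--job_endpoint=localhost:8099",
        "--artifact_endpoint=localhost:8098",
        "--environment_type=EXTERNAL",
        f"--environment_config={host}:{port}",
    ]
```

One possible usage: start the pool with `subprocess.Popen(worker_pool_command())` before submitting, then pass `external_env_args()` to the pipeline in place of the LOOPBACK arguments. This only addresses the worker pool port, not the SDK Harness ports mentioned above.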

My recommendation would be to start the EXTERNAL environment mentioned in
[1] inside the Flink taskmanager Docker container, though this doesn't fit
the single-process-per-container model.


Thanks,
Ankur

