You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@beam.apache.org by Juan Romero <js...@gmail.com> on 2023/04/19 00:21:46 UTC

Avoid using docker when I use a external transformation

Hi.

I have an issue when I try to run a kafka io pipeline in python on my local
machine, because in my local machine it is not possible to install docker.
Seems that beam try to use docker to pull and start the beam java sdk i
order to start the expansion service. I tried to start manually the
expansion service and define the expansion service url in the connector
properties but in anyway it keeps asking by docker process. My question is
if we can run a pipeline with external transformations without install
docker.

Looking forward to it. Thanks!!

Re: Avoid using docker when I use a external transformation

Posted by Juan Romero <js...@gmail.com>.
Thanks Cham!!. Yep I tried the last option, basically create the VM and
connect to it throughout ssh in visual studio code, but we don't want
incurring costs to maintain these machines. In anyway thanks for your
response and take your time to help me. Good luck  :)!!

Thanks!!

El mié, 26 abr 2023 a las 10:29, Chamikara Jayalath (<ch...@google.com>)
escribió:

> So, as I mentioned I don't think currently there are good options for
> executing multi-lang pipelines locally without the DOCKER environment type.
>
> BTW seems like Docker does work for AWS Workspaces on Linux but not on
> Windows:
> https://docs.aws.amazon.com/workspaces/latest/adminguide/amazon-workspaces-troubleshooting.html#docker_support
>
> Another option might be to set up a portable Beam runner and Docker in an
> EC2 node that all AWS Workspaces nodes have access to and execute pipelines
> using that.
>
> Thanks,
> Cham
>
> On Wed, Apr 26, 2023 at 8:19 AM Juan Romero <js...@gmail.com> wrote:
>
>> Hi Cham. I would like to know your options regarding the issue I am
>> having. Thanks!
>>
>> El lun, 24 abr 2023 a las 14:54, Juan Romero (<js...@gmail.com>)
>> escribió:
>>
>>> Hi Cham!!
>>>
>>> Actually we will run our production pipelines in dataflow. But:
>>>
>>> 1.  We want to set up the dev environment where all the developers can
>>> run the pipeline without deploying it in GCP Dataflow, only run the
>>> pipelines with the direct runner for development and testing purposes. The
>>> restriction is that all the developers are working in AWS Workspace (Remote
>>> Desktop) which does not support docker, for this reason we need to run the
>>> expansion service manually.
>>>
>>> 2. We want to create a custom docker image for all the workers which
>>> reduces the startup time (Dataflow is serverless and we can define a custom
>>> docker image for all the worker instances it creates). The thing here is
>>> that we would need to install docker in order to run the kafka
>>> transformation into a container which is not a good practice in
>>> accordance to what I have researched.
>>>
>>> Thank you Cham!!
>>>
>>> Looking forward to it!
>>>
>>>
>>>
>>>
>>>
>>> El lun, 24 abr 2023 a las 12:40, Chamikara Jayalath (<
>>> chamikara@google.com>) escribió:
>>>
>>>> I don't think the above options have been implemented correctly to
>>>> execute multi-lang pipelines without Docker correctly (at least we don't
>>>> have tests for this). As Robert said, we do not need Docker during
>>>> expansion (so you don't really need a manual expansion service if you are
>>>> using a released Beam version) but you will probably need Docker during
>>>> pipeline execution.
>>>>
>>>> Not sure if this will work for your scenario, but have you considered
>>>> setting up a Flink cluster in a machine that does have access to Docker and
>>>> executing your pipeline using that ? Another option might be to use
>>>> Dataflow which manages the environment for you.
>>>>
>>>> Thanks,
>>>> Cham
>>>>
>>>> On Mon, Apr 24, 2023 at 9:10 AM Robert Bradshaw via user <
>>>> user@beam.apache.org> wrote:
>>>>
>>>>> On Sat, Apr 22, 2023 at 12:53 PM Juan Romero <js...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi Robert. Thanks for your response. Let me show what I have been
>>>>>> trying till now step by step. I hope I can be clear in my explanation:
>>>>>>
>>>>>> 1. Firstly I downloaded the java expansion service jar from this url
>>>>>> :
>>>>>> https://mvnrepository.com/artifact/org.apache.beam/beam-sdks-java-io-expansion-service/2.45.0
>>>>>> 2. After I start the expansion service in the port 2233 with this
>>>>>> command:   *java -jar beam-sdks-java-io-expansion-service-2.45.0.jar
>>>>>> 2233* , like I show in this image:
>>>>>>
>>>>>> [image: image.png]
>>>>>>
>>>>>> 3. After that, I set the expansion service url attribute in the
>>>>>> connector (For ReadFromKafka and WriteToKafka) with the value
>>>>>> expansion_service='localhost:2233' :
>>>>>>
>>>>>> stream_data = (p
>>>>>>                | "Reading messages from Kafka" >> ReadFromKafka(
>>>>>>                     consumer_config=consumer_config,
>>>>>>                     topics=[source_topic],
>>>>>>                     with_metadata=True,
>>>>>>                     commit_offset_in_finalize=True,
>>>>>>                     max_num_records=10,
>>>>>>                     expansion_service='localhost:2233'
>>>>>>                 )
>>>>>>                #| "windowing" >> beam.WindowInto(beam.window.FixedWindows(3))
>>>>>>                | "Parse Messages" >> beam.ParDo(ParseRawMessage2())
>>>>>>                | "Clean address" >> beam.ParDo(CleanAddress())
>>>>>>                | beam.Map(lambda x: (x.offset.encode('utf-8'), serialize_message2(x))).with_output_types(typing.Tuple[bytes, bytes])
>>>>>> )
>>>>>>
>>>>>> writing = stream_data | WriteToKafka(
>>>>>>                     producer_config=producer_config_hardcoded,
>>>>>>                     topic=target_topic,
>>>>>>                     value_serializer='org.apache.kafka.common.serialization.ByteArraySerializer',
>>>>>>                     expansion_service='localhost:2233'
>>>>>>                 )
>>>>>>
>>>>>>
>>>>>> The pipeline obviously will run in the same server when the expansion
>>>>>> service is running. Actually is running into a container that has an Ubuntu
>>>>>> focal like base.
>>>>>>
>>>>>> 4. The when I try to run the pipeline with the direct runner and I
>>>>>> get the following message: *FileNotFoundError: [Errno 2] No such
>>>>>> file or directory: 'docker'  *like i show in the image*:*
>>>>>>
>>>>>> [image: image.png]
>>>>>>
>>>>>> Seems that although the expansion service is running the beam driver
>>>>>> program keeps asking for a docker process.
>>>>>>
>>>>>
>>>>> Correct. You need to set the default (worker) environment type when
>>>>> setting up the expansion service as well. This can be set via the pipeline
>>>>> options at
>>>>>
>>>>>
>>>>> https://github.com/apache/beam/blob/v2.46.0/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PortablePipelineOptions.java#L52
>>>>>
>>>>> https://github.com/apache/beam/blob/v2.46.0/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PortablePipelineOptions.java#L102
>>>>>
>>>>>
>>>>>> 5. To verify is the expansion service server is listening i execute
>>>>>> the command:  * telnet localhost 2233* and i get the following :
>>>>>>
>>>>>> [image: image.png]
>>>>>>
>>>>>> Seems that the server is listening.
>>>>>>
>>>>>>
>>>>>> Then I Tried what you suggested. I don't really know if I am missing
>>>>>> any piece.
>>>>>>
>>>>>> Would be helpful If you can help to see What i'm missing, or if
>>>>>> docker is compulsory in order to run a pipeline with external
>>>>>> transformation with the direct runner.
>>>>>>
>>>>>> Thank you!
>>>>>>
>>>>>> El mar, 18 abr 2023 a las 19:46, Robert Bradshaw via user (<
>>>>>> user@beam.apache.org>) escribió:
>>>>>>
>>>>>>> Docker is not necessary to expand the transform (indeed, by default
>>>>>>> it should just pull the Jar and invokes that directly to start the
>>>>>>> expansion service), but it is used as the environment in which to execute
>>>>>>> the expanded transform.
>>>>>>>
>>>>>>> It would be in theory possible to run the worker without docker as
>>>>>>> well. This would involve manually starting up a worker in Java, manually
>>>>>>> starting up an expansion service that points to this worker as its
>>>>>>> environment, and then using that expansion service from Python. I've never
>>>>>>> done that myself, so I don't know how easy it would be, but the "LOOPBACK"
>>>>>>> runner in Java could give some insight into how this could be done.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Tue, Apr 18, 2023 at 5:22 PM Juan Romero <js...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi.
>>>>>>>>
>>>>>>>> I have an issue when I try to run a kafka io pipeline in python on
>>>>>>>> my local machine, because in my local machine it is not possible to install
>>>>>>>> docker. Seems that beam try to use docker to pull and start the beam java
>>>>>>>> sdk i order to start the expansion service. I tried to start manually the
>>>>>>>> expansion service and define the expansion service url in the connector
>>>>>>>> properties but in anyway it keeps asking by docker process. My question is
>>>>>>>> if we can run a pipeline with external transformations without install
>>>>>>>> docker.
>>>>>>>>
>>>>>>>> Looking forward to it. Thanks!!
>>>>>>>>
>>>>>>>

Re: Avoid using docker when I use a external transformation

Posted by Chamikara Jayalath via user <us...@beam.apache.org>.
So, as I mentioned I don't think currently there are good options for
executing multi-lang pipelines locally without the DOCKER environment type.

BTW seems like Docker does work for AWS Workspaces on Linux but not on
Windows:
https://docs.aws.amazon.com/workspaces/latest/adminguide/amazon-workspaces-troubleshooting.html#docker_support

Another option might be to set up a portable Beam runner and Docker in an
EC2 node that all AWS Workspaces nodes have access to and execute pipelines
using that.

Thanks,
Cham

On Wed, Apr 26, 2023 at 8:19 AM Juan Romero <js...@gmail.com> wrote:

> Hi Cham. I would like to know your options regarding the issue I am
> having. Thanks!
>
> El lun, 24 abr 2023 a las 14:54, Juan Romero (<js...@gmail.com>)
> escribió:
>
>> Hi Cham!!
>>
>> Actually we will run our production pipelines in dataflow. But:
>>
>> 1.  We want to set up the dev environment where all the developers can
>> run the pipeline without deploying it in GCP Dataflow, only run the
>> pipelines with the direct runner for development and testing purposes. The
>> restriction is that all the developers are working in AWS Workspace (Remote
>> Desktop) which does not support docker, for this reason we need to run the
>> expansion service manually.
>>
>> 2. We want to create a custom docker image for all the workers which
>> reduces the startup time (Dataflow is serverless and we can define a custom
>> docker image for all the worker instances it creates). The thing here is
>> that we would need to install docker in order to run the kafka
>> transformation into a container which is not a good practice in
>> accordance to what I have researched.
>>
>> Thank you Cham!!
>>
>> Looking forward to it!
>>
>>
>>
>>
>>
>> El lun, 24 abr 2023 a las 12:40, Chamikara Jayalath (<
>> chamikara@google.com>) escribió:
>>
>>> I don't think the above options have been implemented correctly to
>>> execute multi-lang pipelines without Docker correctly (at least we don't
>>> have tests for this). As Robert said, we do not need Docker during
>>> expansion (so you don't really need a manual expansion service if you are
>>> using a released Beam version) but you will probably need Docker during
>>> pipeline execution.
>>>
>>> Not sure if this will work for your scenario, but have you considered
>>> setting up a Flink cluster in a machine that does have access to Docker and
>>> executing your pipeline using that ? Another option might be to use
>>> Dataflow which manages the environment for you.
>>>
>>> Thanks,
>>> Cham
>>>
>>> On Mon, Apr 24, 2023 at 9:10 AM Robert Bradshaw via user <
>>> user@beam.apache.org> wrote:
>>>
>>>> On Sat, Apr 22, 2023 at 12:53 PM Juan Romero <js...@gmail.com> wrote:
>>>>
>>>>> Hi Robert. Thanks for your response. Let me show what I have been
>>>>> trying till now step by step. I hope I can be clear in my explanation:
>>>>>
>>>>> 1. Firstly I downloaded the java expansion service jar from this url :
>>>>> https://mvnrepository.com/artifact/org.apache.beam/beam-sdks-java-io-expansion-service/2.45.0
>>>>> 2. After I start the expansion service in the port 2233 with this
>>>>> command:   *java -jar beam-sdks-java-io-expansion-service-2.45.0.jar
>>>>> 2233* , like I show in this image:
>>>>>
>>>>> [image: image.png]
>>>>>
>>>>> 3. After that, I set the expansion service url attribute in the
>>>>> connector (For ReadFromKafka and WriteToKafka) with the value
>>>>> expansion_service='localhost:2233' :
>>>>>
>>>>> stream_data = (p
>>>>>                | "Reading messages from Kafka" >> ReadFromKafka(
>>>>>                     consumer_config=consumer_config,
>>>>>                     topics=[source_topic],
>>>>>                     with_metadata=True,
>>>>>                     commit_offset_in_finalize=True,
>>>>>                     max_num_records=10,
>>>>>                     expansion_service='localhost:2233'
>>>>>                 )
>>>>>                #| "windowing" >> beam.WindowInto(beam.window.FixedWindows(3))
>>>>>                | "Parse Messages" >> beam.ParDo(ParseRawMessage2())
>>>>>                | "Clean address" >> beam.ParDo(CleanAddress())
>>>>>                | beam.Map(lambda x: (x.offset.encode('utf-8'), serialize_message2(x))).with_output_types(typing.Tuple[bytes, bytes])
>>>>> )
>>>>>
>>>>> writing = stream_data | WriteToKafka(
>>>>>                     producer_config=producer_config_hardcoded,
>>>>>                     topic=target_topic,
>>>>>                     value_serializer='org.apache.kafka.common.serialization.ByteArraySerializer',
>>>>>                     expansion_service='localhost:2233'
>>>>>                 )
>>>>>
>>>>>
>>>>> The pipeline obviously will run in the same server when the expansion
>>>>> service is running. Actually is running into a container that has an Ubuntu
>>>>> focal like base.
>>>>>
>>>>> 4. The when I try to run the pipeline with the direct runner and I get
>>>>> the following message: *FileNotFoundError: [Errno 2] No such file or
>>>>> directory: 'docker'  *like i show in the image*:*
>>>>>
>>>>> [image: image.png]
>>>>>
>>>>> Seems that although the expansion service is running the beam driver
>>>>> program keeps asking for a docker process.
>>>>>
>>>>
>>>> Correct. You need to set the default (worker) environment type when
>>>> setting up the expansion service as well. This can be set via the pipeline
>>>> options at
>>>>
>>>>
>>>> https://github.com/apache/beam/blob/v2.46.0/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PortablePipelineOptions.java#L52
>>>>
>>>> https://github.com/apache/beam/blob/v2.46.0/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PortablePipelineOptions.java#L102
>>>>
>>>>
>>>>> 5. To verify is the expansion service server is listening i execute
>>>>> the command:  * telnet localhost 2233* and i get the following :
>>>>>
>>>>> [image: image.png]
>>>>>
>>>>> Seems that the server is listening.
>>>>>
>>>>>
>>>>> Then I Tried what you suggested. I don't really know if I am missing
>>>>> any piece.
>>>>>
>>>>> Would be helpful If you can help to see What i'm missing, or if docker
>>>>> is compulsory in order to run a pipeline with external transformation with
>>>>> the direct runner.
>>>>>
>>>>> Thank you!
>>>>>
>>>>> El mar, 18 abr 2023 a las 19:46, Robert Bradshaw via user (<
>>>>> user@beam.apache.org>) escribió:
>>>>>
>>>>>> Docker is not necessary to expand the transform (indeed, by default
>>>>>> it should just pull the Jar and invokes that directly to start the
>>>>>> expansion service), but it is used as the environment in which to execute
>>>>>> the expanded transform.
>>>>>>
>>>>>> It would be in theory possible to run the worker without docker as
>>>>>> well. This would involve manually starting up a worker in Java, manually
>>>>>> starting up an expansion service that points to this worker as its
>>>>>> environment, and then using that expansion service from Python. I've never
>>>>>> done that myself, so I don't know how easy it would be, but the "LOOPBACK"
>>>>>> runner in Java could give some insight into how this could be done.
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Tue, Apr 18, 2023 at 5:22 PM Juan Romero <js...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi.
>>>>>>>
>>>>>>> I have an issue when I try to run a kafka io pipeline in python on
>>>>>>> my local machine, because in my local machine it is not possible to install
>>>>>>> docker. Seems that beam try to use docker to pull and start the beam java
>>>>>>> sdk i order to start the expansion service. I tried to start manually the
>>>>>>> expansion service and define the expansion service url in the connector
>>>>>>> properties but in anyway it keeps asking by docker process. My question is
>>>>>>> if we can run a pipeline with external transformations without install
>>>>>>> docker.
>>>>>>>
>>>>>>> Looking forward to it. Thanks!!
>>>>>>>
>>>>>>

Re: Avoid using docker when I use a external transformation

Posted by Juan Romero <js...@gmail.com>.
Hi Cham. I would like to know your options regarding the issue I am having.
Thanks!

El lun, 24 abr 2023 a las 14:54, Juan Romero (<js...@gmail.com>) escribió:

> Hi Cham!!
>
> Actually we will run our production pipelines in dataflow. But:
>
> 1.  We want to set up the dev environment where all the developers can run
> the pipeline without deploying it in GCP Dataflow, only run the
> pipelines with the direct runner for development and testing purposes. The
> restriction is that all the developers are working in AWS Workspace (Remote
> Desktop) which does not support docker, for this reason we need to run the
> expansion service manually.
>
> 2. We want to create a custom docker image for all the workers which
> reduces the startup time (Dataflow is serverless and we can define a custom
> docker image for all the worker instances it creates). The thing here is
> that we would need to install docker in order to run the kafka
> transformation into a container which is not a good practice in
> accordance to what I have researched.
>
> Thank you Cham!!
>
> Looking forward to it!
>
>
>
>
>
> El lun, 24 abr 2023 a las 12:40, Chamikara Jayalath (<ch...@google.com>)
> escribió:
>
>> I don't think the above options have been implemented correctly to
>> execute multi-lang pipelines without Docker correctly (at least we don't
>> have tests for this). As Robert said, we do not need Docker during
>> expansion (so you don't really need a manual expansion service if you are
>> using a released Beam version) but you will probably need Docker during
>> pipeline execution.
>>
>> Not sure if this will work for your scenario, but have you considered
>> setting up a Flink cluster in a machine that does have access to Docker and
>> executing your pipeline using that ? Another option might be to use
>> Dataflow which manages the environment for you.
>>
>> Thanks,
>> Cham
>>
>> On Mon, Apr 24, 2023 at 9:10 AM Robert Bradshaw via user <
>> user@beam.apache.org> wrote:
>>
>>> On Sat, Apr 22, 2023 at 12:53 PM Juan Romero <js...@gmail.com> wrote:
>>>
>>>> Hi Robert. Thanks for your response. Let me show what I have been
>>>> trying till now step by step. I hope I can be clear in my explanation:
>>>>
>>>> 1. Firstly I downloaded the java expansion service jar from this url :
>>>> https://mvnrepository.com/artifact/org.apache.beam/beam-sdks-java-io-expansion-service/2.45.0
>>>> 2. After I start the expansion service in the port 2233 with this
>>>> command:   *java -jar beam-sdks-java-io-expansion-service-2.45.0.jar
>>>> 2233* , like I show in this image:
>>>>
>>>> [image: image.png]
>>>>
>>>> 3. After that, I set the expansion service url attribute in the
>>>> connector (For ReadFromKafka and WriteToKafka) with the value
>>>> expansion_service='localhost:2233' :
>>>>
>>>> stream_data = (p
>>>>                | "Reading messages from Kafka" >> ReadFromKafka(
>>>>                     consumer_config=consumer_config,
>>>>                     topics=[source_topic],
>>>>                     with_metadata=True,
>>>>                     commit_offset_in_finalize=True,
>>>>                     max_num_records=10,
>>>>                     expansion_service='localhost:2233'
>>>>                 )
>>>>                #| "windowing" >> beam.WindowInto(beam.window.FixedWindows(3))
>>>>                | "Parse Messages" >> beam.ParDo(ParseRawMessage2())
>>>>                | "Clean address" >> beam.ParDo(CleanAddress())
>>>>                | beam.Map(lambda x: (x.offset.encode('utf-8'), serialize_message2(x))).with_output_types(typing.Tuple[bytes, bytes])
>>>> )
>>>>
>>>> writing = stream_data | WriteToKafka(
>>>>                     producer_config=producer_config_hardcoded,
>>>>                     topic=target_topic,
>>>>                     value_serializer='org.apache.kafka.common.serialization.ByteArraySerializer',
>>>>                     expansion_service='localhost:2233'
>>>>                 )
>>>>
>>>>
>>>> The pipeline obviously will run in the same server when the expansion
>>>> service is running. Actually is running into a container that has an Ubuntu
>>>> focal like base.
>>>>
>>>> 4. The when I try to run the pipeline with the direct runner and I get
>>>> the following message: *FileNotFoundError: [Errno 2] No such file or
>>>> directory: 'docker'  *like i show in the image*:*
>>>>
>>>> [image: image.png]
>>>>
>>>> Seems that although the expansion service is running the beam driver
>>>> program keeps asking for a docker process.
>>>>
>>>
>>> Correct. You need to set the default (worker) environment type when
>>> setting up the expansion service as well. This can be set via the pipeline
>>> options at
>>>
>>>
>>> https://github.com/apache/beam/blob/v2.46.0/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PortablePipelineOptions.java#L52
>>>
>>> https://github.com/apache/beam/blob/v2.46.0/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PortablePipelineOptions.java#L102
>>>
>>>
>>>> 5. To verify is the expansion service server is listening i execute the
>>>> command:  * telnet localhost 2233* and i get the following :
>>>>
>>>> [image: image.png]
>>>>
>>>> Seems that the server is listening.
>>>>
>>>>
>>>> Then I Tried what you suggested. I don't really know if I am missing
>>>> any piece.
>>>>
>>>> Would be helpful If you can help to see What i'm missing, or if docker
>>>> is compulsory in order to run a pipeline with external transformation with
>>>> the direct runner.
>>>>
>>>> Thank you!
>>>>
>>>> El mar, 18 abr 2023 a las 19:46, Robert Bradshaw via user (<
>>>> user@beam.apache.org>) escribió:
>>>>
>>>>> Docker is not necessary to expand the transform (indeed, by default it
>>>>> should just pull the Jar and invokes that directly to start the expansion
>>>>> service), but it is used as the environment in which to execute the
>>>>> expanded transform.
>>>>>
>>>>> It would be in theory possible to run the worker without docker as
>>>>> well. This would involve manually starting up a worker in Java, manually
>>>>> starting up an expansion service that points to this worker as its
>>>>> environment, and then using that expansion service from Python. I've never
>>>>> done that myself, so I don't know how easy it would be, but the "LOOPBACK"
>>>>> runner in Java could give some insight into how this could be done.
>>>>>
>>>>>
>>>>>
>>>>> On Tue, Apr 18, 2023 at 5:22 PM Juan Romero <js...@gmail.com> wrote:
>>>>>
>>>>>> Hi.
>>>>>>
>>>>>> I have an issue when I try to run a kafka io pipeline in python on my
>>>>>> local machine, because in my local machine it is not possible to install
>>>>>> docker. Seems that beam try to use docker to pull and start the beam java
>>>>>> sdk i order to start the expansion service. I tried to start manually the
>>>>>> expansion service and define the expansion service url in the connector
>>>>>> properties but in anyway it keeps asking by docker process. My question is
>>>>>> if we can run a pipeline with external transformations without install
>>>>>> docker.
>>>>>>
>>>>>> Looking forward to it. Thanks!!
>>>>>>
>>>>>

Re: Avoid using docker when I use a external transformation

Posted by Juan Romero <js...@gmail.com>.
Hi Cham!!

Actually we will run our production pipelines in dataflow. But:

1.  We want to set up the dev environment where all the developers can run
the pipeline without deploying it in GCP Dataflow, only run the
pipelines with the direct runner for development and testing purposes. The
restriction is that all the developers are working in AWS Workspace (Remote
Desktop) which does not support docker, for this reason we need to run the
expansion service manually.

2. We want to create a custom docker image for all the workers which
reduces the startup time (Dataflow is serverless and we can define a custom
docker image for all the worker instances it creates). The thing here is
that we would need to install docker in order to run the kafka
transformation into a container which is not a good practice in
accordance to what I have researched.

Thank you Cham!!

Looking forward to it!





El lun, 24 abr 2023 a las 12:40, Chamikara Jayalath (<ch...@google.com>)
escribió:

> I don't think the above options have been implemented correctly to execute
> multi-lang pipelines without Docker correctly (at least we don't have tests
> for this). As Robert said, we do not need Docker during expansion (so you
> don't really need a manual expansion service if you are using a released
> Beam version) but you will probably need Docker during pipeline execution.
>
> Not sure if this will work for your scenario, but have you considered
> setting up a Flink cluster in a machine that does have access to Docker and
> executing your pipeline using that ? Another option might be to use
> Dataflow which manages the environment for you.
>
> Thanks,
> Cham
>
> On Mon, Apr 24, 2023 at 9:10 AM Robert Bradshaw via user <
> user@beam.apache.org> wrote:
>
>> On Sat, Apr 22, 2023 at 12:53 PM Juan Romero <js...@gmail.com> wrote:
>>
>>> Hi Robert. Thanks for your response. Let me show what I have been trying
>>> till now step by step. I hope I can be clear in my explanation:
>>>
>>> 1. Firstly I downloaded the java expansion service jar from this url :
>>> https://mvnrepository.com/artifact/org.apache.beam/beam-sdks-java-io-expansion-service/2.45.0
>>> 2. After I start the expansion service in the port 2233 with this
>>> command:   *java -jar beam-sdks-java-io-expansion-service-2.45.0.jar
>>> 2233* , like I show in this image:
>>>
>>> [image: image.png]
>>>
>>> 3. After that, I set the expansion service url attribute in the
>>> connector (For ReadFromKafka and WriteToKafka) with the value
>>> expansion_service='localhost:2233' :
>>>
>>> stream_data = (p
>>>                | "Reading messages from Kafka" >> ReadFromKafka(
>>>                     consumer_config=consumer_config,
>>>                     topics=[source_topic],
>>>                     with_metadata=True,
>>>                     commit_offset_in_finalize=True,
>>>                     max_num_records=10,
>>>                     expansion_service='localhost:2233'
>>>                 )
>>>                #| "windowing" >> beam.WindowInto(beam.window.FixedWindows(3))
>>>                | "Parse Messages" >> beam.ParDo(ParseRawMessage2())
>>>                | "Clean address" >> beam.ParDo(CleanAddress())
>>>                | beam.Map(lambda x: (x.offset.encode('utf-8'), serialize_message2(x))).with_output_types(typing.Tuple[bytes, bytes])
>>> )
>>>
>>> writing = stream_data | WriteToKafka(
>>>                     producer_config=producer_config_hardcoded,
>>>                     topic=target_topic,
>>>                     value_serializer='org.apache.kafka.common.serialization.ByteArraySerializer',
>>>                     expansion_service='localhost:2233'
>>>                 )
>>>
>>>
>>> The pipeline obviously will run in the same server when the expansion
>>> service is running. Actually is running into a container that has an Ubuntu
>>> focal like base.
>>>
>>> 4. The when I try to run the pipeline with the direct runner and I get
>>> the following message: *FileNotFoundError: [Errno 2] No such file or
>>> directory: 'docker'  *like i show in the image*:*
>>>
>>> [image: image.png]
>>>
>>> Seems that although the expansion service is running the beam driver
>>> program keeps asking for a docker process.
>>>
>>
>> Correct. You need to set the default (worker) environment type when
>> setting up the expansion service as well. This can be set via the pipeline
>> options at
>>
>>
>> https://github.com/apache/beam/blob/v2.46.0/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PortablePipelineOptions.java#L52
>>
>> https://github.com/apache/beam/blob/v2.46.0/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PortablePipelineOptions.java#L102
>>
>>
>>> 5. To verify is the expansion service server is listening i execute the
>>> command:  * telnet localhost 2233* and i get the following :
>>>
>>> [image: image.png]
>>>
>>> Seems that the server is listening.
>>>
>>>
>>> Then I Tried what you suggested. I don't really know if I am missing any
>>> piece.
>>>
>>> Would be helpful If you can help to see What i'm missing, or if docker
>>> is compulsory in order to run a pipeline with external transformation with
>>> the direct runner.
>>>
>>> Thank you!
>>>
>>> El mar, 18 abr 2023 a las 19:46, Robert Bradshaw via user (<
>>> user@beam.apache.org>) escribió:
>>>
>>>> Docker is not necessary to expand the transform (indeed, by default it
>>>> should just pull the Jar and invokes that directly to start the expansion
>>>> service), but it is used as the environment in which to execute the
>>>> expanded transform.
>>>>
>>>> It would be in theory possible to run the worker without docker as
>>>> well. This would involve manually starting up a worker in Java, manually
>>>> starting up an expansion service that points to this worker as its
>>>> environment, and then using that expansion service from Python. I've never
>>>> done that myself, so I don't know how easy it would be, but the "LOOPBACK"
>>>> runner in Java could give some insight into how this could be done.
>>>>
>>>>
>>>>
>>>> On Tue, Apr 18, 2023 at 5:22 PM Juan Romero <js...@gmail.com> wrote:
>>>>
>>>>> Hi.
>>>>>
>>>>> I have an issue when I try to run a kafka io pipeline in python on my
>>>>> local machine, because in my local machine it is not possible to install
>>>>> docker. Seems that beam try to use docker to pull and start the beam java
>>>>> sdk i order to start the expansion service. I tried to start manually the
>>>>> expansion service and define the expansion service url in the connector
>>>>> properties but in anyway it keeps asking by docker process. My question is
>>>>> if we can run a pipeline with external transformations without install
>>>>> docker.
>>>>>
>>>>> Looking forward to it. Thanks!!
>>>>>
>>>>

Re: Avoid using docker when I use a external transformation

Posted by Chamikara Jayalath via user <us...@beam.apache.org>.
I don't think the above options have been implemented correctly to execute
multi-lang pipelines without Docker correctly (at least we don't have tests
for this). As Robert said, we do not need Docker during expansion (so you
don't really need a manual expansion service if you are using a released
Beam version) but you will probably need Docker during pipeline execution.

Not sure if this will work for your scenario, but have you considered
setting up a Flink cluster in a machine that does have access to Docker and
executing your pipeline using that ? Another option might be to use
Dataflow which manages the environment for you.

Thanks,
Cham

On Mon, Apr 24, 2023 at 9:10 AM Robert Bradshaw via user <
user@beam.apache.org> wrote:

> On Sat, Apr 22, 2023 at 12:53 PM Juan Romero <js...@gmail.com> wrote:
>
>> Hi Robert. Thanks for your response. Let me show what I have been trying
>> till now step by step. I hope I can be clear in my explanation:
>>
>> 1. Firstly I downloaded the java expansion service jar from this url :
>> https://mvnrepository.com/artifact/org.apache.beam/beam-sdks-java-io-expansion-service/2.45.0
>> 2. After I start the expansion service in the port 2233 with this
>> command:   *java -jar beam-sdks-java-io-expansion-service-2.45.0.jar
>> 2233* , like I show in this image:
>>
>> [image: image.png]
>>
>> 3. After that, I set the expansion service url attribute in the connector
>> (For ReadFromKafka and WriteToKafka) with the value expansion_service='
>> localhost:2233' :
>>
>> stream_data = (p
>>                | "Reading messages from Kafka" >> ReadFromKafka(
>>                     consumer_config=consumer_config,
>>                     topics=[source_topic],
>>                     with_metadata=True,
>>                     commit_offset_in_finalize=True,
>>                     max_num_records=10,
>>                     expansion_service='localhost:2233'
>>                 )
>>                #| "windowing" >> beam.WindowInto(beam.window.FixedWindows(3))
>>                | "Parse Messages" >> beam.ParDo(ParseRawMessage2())
>>                | "Clean address" >> beam.ParDo(CleanAddress())
>>                | beam.Map(lambda x: (x.offset.encode('utf-8'), serialize_message2(x))).with_output_types(typing.Tuple[bytes, bytes])
>> )
>>
>> writing = stream_data | WriteToKafka(
>>                     producer_config=producer_config_hardcoded,
>>                     topic=target_topic,
>>                     value_serializer='org.apache.kafka.common.serialization.ByteArraySerializer',
>>                     expansion_service='localhost:2233'
>>                 )
>>
>>
>> The pipeline obviously will run in the same server when the expansion
>> service is running. Actually is running into a container that has an Ubuntu
>> focal like base.
>>
>> 4. The when I try to run the pipeline with the direct runner and I get
>> the following message: *FileNotFoundError: [Errno 2] No such file or
>> directory: 'docker'  *like i show in the image*:*
>>
>> [image: image.png]
>>
>> Seems that although the expansion service is running the beam driver
>> program keeps asking for a docker process.
>>
>
> Correct. You need to set the default (worker) environment type when
> setting up the expansion service as well. This can be set via the pipeline
> options at
>
>
> https://github.com/apache/beam/blob/v2.46.0/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PortablePipelineOptions.java#L52
>
> https://github.com/apache/beam/blob/v2.46.0/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PortablePipelineOptions.java#L102
>
>
>> 5. To verify is the expansion service server is listening i execute the
>> command:  * telnet localhost 2233* and i get the following :
>>
>> [image: image.png]
>>
>> Seems that the server is listening.
>>
>>
>> Then I Tried what you suggested. I don't really know if I am missing any
>> piece.
>>
>> Would be helpful If you can help to see What i'm missing, or if docker is
>> compulsory in order to run a pipeline with external transformation with the
>> direct runner.
>>
>> Thank you!
>>
>> El mar, 18 abr 2023 a las 19:46, Robert Bradshaw via user (<
>> user@beam.apache.org>) escribió:
>>
>>> Docker is not necessary to expand the transform (indeed, by default it
>>> should just pull the Jar and invokes that directly to start the expansion
>>> service), but it is used as the environment in which to execute the
>>> expanded transform.
>>>
>>> It would be in theory possible to run the worker without docker as well.
>>> This would involve manually starting up a worker in Java, manually starting
>>> up an expansion service that points to this worker as its environment, and
>>> then using that expansion service from Python. I've never done that myself,
>>> so I don't know how easy it would be, but the "LOOPBACK" runner in Java
>>> could give some insight into how this could be done.
>>>
>>>
>>>
>>> On Tue, Apr 18, 2023 at 5:22 PM Juan Romero <js...@gmail.com> wrote:
>>>
>>>> Hi.
>>>>
>>>> I have an issue when I try to run a kafka io pipeline in python on my
>>>> local machine, because in my local machine it is not possible to install
>>>> docker. Seems that beam try to use docker to pull and start the beam java
>>>> sdk i order to start the expansion service. I tried to start manually the
>>>> expansion service and define the expansion service url in the connector
>>>> properties but in anyway it keeps asking by docker process. My question is
>>>> if we can run a pipeline with external transformations without install
>>>> docker.
>>>>
>>>> Looking forward to it. Thanks!!
>>>>
>>>

Re: Avoid using docker when I use a external transformation

Posted by Robert Bradshaw via user <us...@beam.apache.org>.
On Sat, Apr 22, 2023 at 12:53 PM Juan Romero <js...@gmail.com> wrote:

> Hi Robert. Thanks for your response. Let me show what I have been trying
> till now step by step. I hope I can be clear in my explanation:
>
> 1. Firstly I downloaded the java expansion service jar from this url :
> https://mvnrepository.com/artifact/org.apache.beam/beam-sdks-java-io-expansion-service/2.45.0
> 2. After I start the expansion service in the port 2233 with this
> command:   *java -jar beam-sdks-java-io-expansion-service-2.45.0.jar 2233*
> , like I show in this image:
>
> [image: image.png]
>
> 3. After that, I set the expansion service url attribute in the connector
> (For ReadFromKafka and WriteToKafka) with the value expansion_service='
> localhost:2233' :
>
> stream_data = (p
>                | "Reading messages from Kafka" >> ReadFromKafka(
>                     consumer_config=consumer_config,
>                     topics=[source_topic],
>                     with_metadata=True,
>                     commit_offset_in_finalize=True,
>                     max_num_records=10,
>                     expansion_service='localhost:2233'
>                 )
>                #| "windowing" >> beam.WindowInto(beam.window.FixedWindows(3))
>                | "Parse Messages" >> beam.ParDo(ParseRawMessage2())
>                | "Clean address" >> beam.ParDo(CleanAddress())
>                | beam.Map(lambda x: (x.offset.encode('utf-8'), serialize_message2(x))).with_output_types(typing.Tuple[bytes, bytes])
> )
>
> writing = stream_data | WriteToKafka(
>                     producer_config=producer_config_hardcoded,
>                     topic=target_topic,
>                     value_serializer='org.apache.kafka.common.serialization.ByteArraySerializer',
>                     expansion_service='localhost:2233'
>                 )
>
>
> The pipeline obviously will run in the same server when the expansion
> service is running. Actually is running into a container that has an Ubuntu
> focal like base.
>
> 4. The when I try to run the pipeline with the direct runner and I get the
> following message: *FileNotFoundError: [Errno 2] No such file or
> directory: 'docker'  *like i show in the image*:*
>
> [image: image.png]
>
> Seems that although the expansion service is running the beam driver
> program keeps asking for a docker process.
>

Correct. You need to set the default (worker) environment type when setting
up the expansion service as well. This can be set via the pipeline options
at

https://github.com/apache/beam/blob/v2.46.0/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PortablePipelineOptions.java#L52
https://github.com/apache/beam/blob/v2.46.0/sdks/java/core/src/main/java/org/apache/beam/sdk/options/PortablePipelineOptions.java#L102


> 5. To verify is the expansion service server is listening i execute the
> command:  * telnet localhost 2233* and i get the following :
>
> [image: image.png]
>
> Seems that the server is listening.
>
>
> Then I Tried what you suggested. I don't really know if I am missing any
> piece.
>
> Would be helpful If you can help to see What i'm missing, or if docker is
> compulsory in order to run a pipeline with external transformation with the
> direct runner.
>
> Thank you!
>
> El mar, 18 abr 2023 a las 19:46, Robert Bradshaw via user (<
> user@beam.apache.org>) escribió:
>
>> Docker is not necessary to expand the transform (indeed, by default it
>> should just pull the Jar and invokes that directly to start the expansion
>> service), but it is used as the environment in which to execute the
>> expanded transform.
>>
>> It would be in theory possible to run the worker without docker as well.
>> This would involve manually starting up a worker in Java, manually starting
>> up an expansion service that points to this worker as its environment, and
>> then using that expansion service from Python. I've never done that myself,
>> so I don't know how easy it would be, but the "LOOPBACK" runner in Java
>> could give some insight into how this could be done.
>>
>>
>>
>> On Tue, Apr 18, 2023 at 5:22 PM Juan Romero <js...@gmail.com> wrote:
>>
>>> Hi.
>>>
>>> I have an issue when I try to run a kafka io pipeline in python on my
>>> local machine, because in my local machine it is not possible to install
>>> docker. Seems that beam try to use docker to pull and start the beam java
>>> sdk i order to start the expansion service. I tried to start manually the
>>> expansion service and define the expansion service url in the connector
>>> properties but in anyway it keeps asking by docker process. My question is
>>> if we can run a pipeline with external transformations without install
>>> docker.
>>>
>>> Looking forward to it. Thanks!!
>>>
>>

Re: Avoid using docker when I use a external transformation

Posted by Juan Romero <js...@gmail.com>.
Hi Robert. Thanks for your response. Let me show what I have been trying
till now step by step. I hope I can be clear in my explanation:

1. Firstly I downloaded the java expansion service jar from this url :
https://mvnrepository.com/artifact/org.apache.beam/beam-sdks-java-io-expansion-service/2.45.0
2. After I start the expansion service in the port 2233 with this command:
 *java -jar beam-sdks-java-io-expansion-service-2.45.0.jar 2233* , like I
show in this image:

[image: image.png]

3. After that, I set the expansion service url attribute in the connector
(For ReadFromKafka and WriteToKafka) with the value expansion_service='
localhost:2233' :

stream_data = (p
               | "Reading messages from Kafka" >> ReadFromKafka(
                    consumer_config=consumer_config,
                    topics=[source_topic],
                    with_metadata=True,
                    commit_offset_in_finalize=True,
                    max_num_records=10,
                    expansion_service='localhost:2233'
                )
               #| "windowing" >> beam.WindowInto(beam.window.FixedWindows(3))
               | "Parse Messages" >> beam.ParDo(ParseRawMessage2())
               | "Clean address" >> beam.ParDo(CleanAddress())
               | beam.Map(lambda x: (x.offset.encode('utf-8'),
serialize_message2(x))).with_output_types(typing.Tuple[bytes, bytes])
)

writing = stream_data | WriteToKafka(
                    producer_config=producer_config_hardcoded,
                    topic=target_topic,

value_serializer='org.apache.kafka.common.serialization.ByteArraySerializer',
                    expansion_service='localhost:2233'
                )


The pipeline obviously will run in the same server when the expansion
service is running. Actually is running into a container that has an Ubuntu
focal like base.

4. The when I try to run the pipeline with the direct runner and I get the
following message: *FileNotFoundError: [Errno 2] No such file or directory:
'docker'  *like i show in the image*:*

[image: image.png]

Seems that although the expansion service is running the beam driver
program keeps asking for a docker process.

5. To verify is the expansion service server is listening i execute the
command:  * telnet localhost 2233* and i get the following :

[image: image.png]

Seems that the server is listening.


Then I Tried what you suggested. I don't really know if I am missing any
piece.

Would be helpful If you can help to see What i'm missing, or if docker is
compulsory in order to run a pipeline with external transformation with the
direct runner.

Thank you!

El mar, 18 abr 2023 a las 19:46, Robert Bradshaw via user (<
user@beam.apache.org>) escribió:

> Docker is not necessary to expand the transform (indeed, by default it
> should just pull the Jar and invokes that directly to start the expansion
> service), but it is used as the environment in which to execute the
> expanded transform.
>
> It would be in theory possible to run the worker without docker as well.
> This would involve manually starting up a worker in Java, manually starting
> up an expansion service that points to this worker as its environment, and
> then using that expansion service from Python. I've never done that myself,
> so I don't know how easy it would be, but the "LOOPBACK" runner in Java
> could give some insight into how this could be done.
>
>
>
> On Tue, Apr 18, 2023 at 5:22 PM Juan Romero <js...@gmail.com> wrote:
>
>> Hi.
>>
>> I have an issue when I try to run a kafka io pipeline in python on my
>> local machine, because in my local machine it is not possible to install
>> docker. Seems that beam try to use docker to pull and start the beam java
>> sdk i order to start the expansion service. I tried to start manually the
>> expansion service and define the expansion service url in the connector
>> properties but in anyway it keeps asking by docker process. My question is
>> if we can run a pipeline with external transformations without install
>> docker.
>>
>> Looking forward to it. Thanks!!
>>
>

Re: Avoid using docker when I use a external transformation

Posted by Robert Bradshaw via user <us...@beam.apache.org>.
Docker is not necessary to expand the transform (indeed, by default it
should just pull the Jar and invokes that directly to start the expansion
service), but it is used as the environment in which to execute the
expanded transform.

It would be in theory possible to run the worker without docker as well.
This would involve manually starting up a worker in Java, manually starting
up an expansion service that points to this worker as its environment, and
then using that expansion service from Python. I've never done that myself,
so I don't know how easy it would be, but the "LOOPBACK" runner in Java
could give some insight into how this could be done.



On Tue, Apr 18, 2023 at 5:22 PM Juan Romero <js...@gmail.com> wrote:

> Hi.
>
> I have an issue when I try to run a kafka io pipeline in python on my
> local machine, because in my local machine it is not possible to install
> docker. Seems that beam try to use docker to pull and start the beam java
> sdk i order to start the expansion service. I tried to start manually the
> expansion service and define the expansion service url in the connector
> properties but in anyway it keeps asking by docker process. My question is
> if we can run a pipeline with external transformations without install
> docker.
>
> Looking forward to it. Thanks!!
>