Posted to user@beam.apache.org by Eugene Kirpichov <ek...@gmail.com> on 2020/08/26 23:10:30 UTC

Getting Beam(Python)-on-Flink-on-k8s to work

Hi folks,

I'm still working with Pachama <https://pachama.com/> right now; we have a
Kubernetes Engine cluster on GCP and want to run Beam Python batch
pipelines with custom containers against it.
Flink and Cloud Dataflow are the two options; Cloud Dataflow doesn't
support custom containers for batch pipelines yet, so we're going with Flink.

I'm struggling to find complete documentation on how to do this. There
seems to be lots of conflicting or incomplete information: several ways to
deploy Flink, several ways to get Beam working with it, bizarre
StackOverflow questions, and no documentation explaining a complete working
example.

== My requests ==
* Could people briefly share their working setup? Would be good to know
which directions are promising.
* It would be particularly helpful if someone could volunteer an hour of
their time to talk to me about their working Beam/Flink/k8s setup. It's for
a good cause (fixing the planet :) ) and on my side I volunteer to write up
the findings to share with the community so others suffer less.

== Appendix: My findings so far ==
There are multiple ways to deploy Flink on k8s:
- The GCP marketplace Flink operator
<https://cloud.google.com/blog/products/data-analytics/open-source-processing-engines-for-kubernetes>
(couldn't get it to work) and the respective CLI version
<https://github.com/GoogleCloudPlatform/flink-on-k8s-operator> (buggy, but
I got it working)
- https://github.com/lyft/flinkk8soperator (haven't tried)
- Flink's native k8s support
<https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/deployment/native_kubernetes.html>
(super easy to get working)

I confirmed that my Flink cluster was operational by running a simple
Wordcount job, initiated from my machine. However I wasn't yet able to get
Beam working:

- With the Flink operator, I was able to submit a Beam job, but hit the
issue that I need Docker installed on my Flink nodes. I haven't yet tried
changing the operator's yaml files to add Docker inside them. I also
haven't tried this
<https://github.com/GoogleCloudPlatform/flink-on-k8s-operator/blob/master/docs/beam_guide.md>
yet because it implies submitting jobs using "kubectl apply", which is
weird - why not just submit it through the Flink job server?

- With Flink's native k8s support, I tried two things (sketched below):
  - Creating a fat portable jar using --output_executable_path. The jar is
huge (200+MB) and takes forever to upload to my Flink cluster - this is a
non-starter. But even when I do upload it, I hit the same missing-Docker
issue. Haven't tried fixing it yet.
  - Simply running my pipeline with --runner=FlinkRunner
--environment_type=DOCKER --flink_master=$PUBLIC_IP:8081. The Java process
appears to send 1+GB of data somewhere, but the job never even starts.
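
For reference, the two submission modes above amount to roughly the
following (a sketch only; "mypipeline" stands in for the actual pipeline
module, the flags are the standard Beam Python pipeline options):

# Mode 1: build a self-contained fat jar to upload to Flink manually:
python -m mypipeline \
  --runner=FlinkRunner \
  --environment_type=DOCKER \
  --output_executable_path=/tmp/pipeline-flink.jar

# Mode 2: submit straight to the Flink master:
python -m mypipeline \
  --runner=FlinkRunner \
  --environment_type=DOCKER \
  --flink_master=$PUBLIC_IP:8081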

I looked at a few conference talks:
- https://www.cncf.io/wp-content/uploads/2020/02/CNCF-Webinar_-Apache-Flink-on-Kubernetes-Operator-1.pdf
seems to imply that I need to add a Beam worker "sidecar" to the Flink
workers, and that I need to submit my job using "kubectl apply".
- https://www.youtube.com/watch?v=8k1iezoc5Sc which mentions the sidecar
as well as the fat jar option.

-- 
Eugene Kirpichov
http://www.linkedin.com/in/eugenekirpichov

Re: Getting Beam(Python)-on-Flink-on-k8s to work

Posted by Sam Bourne <sa...@gmail.com>.
On Sat, Aug 29, 2020 at 10:59 AM Eugene Kirpichov <ek...@gmail.com>
wrote:


>
> On Fri, Aug 28, 2020 at 6:52 PM Sam Bourne <sa...@gmail.com> wrote:
>
>> Hi Eugene,
>>
>> Glad that helped you out and thanks for the PR tweaking it for GCP.
>>
>> To fetch the containers from GCR, I had to log into Docker inside the
>> Flink nodes, specifically inside the taskmanager container, using something
>> like "kubectl exec pod/flink-taskmanager-blahblah -c taskmanager -- docker
>> login -u oauth2accesstoken --password $(gcloud auth print-access-token)"
>>
>> Ouch that seems painful. I find this “precaching” step pretty silly and
>> have considered making the DockerEnvironmentFactory a little more
>> intelligent about how it deals with timeouts (e.g. no activity). It doesn’t
>> seem like it would be too difficult to also add first-class support for
>> pulling images from protected repositories. Extending the DockerPayload
>> protobuf to pass along the additional information and tweaking the
>> DockerEnvironmentFactory? I’m not a java expert but that might be worth
>> exploring if this continues to be problematic.
>>
> Yeah it makes sense to have some first-class support for Docker
> credentials. It's kind of a no-brainer that it's necessary with custom
> containers, many companies probably wouldn't want to push their custom
> containers to a public repo.
> I was thinking of embedding the credentials JSON file into the
> taskmanager container through its Dockerfile, that's workable but also
> pretty silly - having to rebuild this container just for the sake of
> putting in the credentials.
> DockerPayload might be the right place to put credentials, but I wonder if
> there's a way to do something more secure, with k8s secrets. I'm not too
> well-versed in credential management.
>
Using k8s secrets you could mount your credentials into the container and
tweak the pull/run command
<https://github.com/apache/beam/blob/master/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/DockerCommand.java#L77>
to first log in using a pattern like cat /tmp/password.txt | docker login
--username foo --password-stdin. Maybe the DockerPayload protobuf could
include the password as raw text or an absolute filepath, and switch the
login command depending on which is provided.
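
A minimal sketch of that pattern for GCR (the secret name and mount path
here are made up for illustration):

# create a secret holding a registry token:
kubectl create secret generic gcr-token \
  --from-literal=password.txt="$(gcloud auth print-access-token)"

# mount it into the taskmanager container at /tmp via the deployment spec,
# then log in without the password showing up in argv:
cat /tmp/password.txt | docker login --username oauth2accesstoken \
  --password-stdin us.gcr.io

Caveat: gcloud access tokens expire after about an hour, so this shows the
plumbing rather than a production setup.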

>> I found the time it takes to pull can be dramatically improved if you store
>> everything in memory
>> <https://github.com/sambvfx/beam-flink-k8s/blob/master/k8s/flink.yaml#L186>.
>>
>> In the end, I got my pipeline to start, create the uber jar (about 240MB
>> in size), take a few minutes to transmit it to Flink
>>
>> You could explore spinning up the beam-flink-job-server
>> <https://github.com/sambvfx/beam-flink-k8s/blob/master/k8s/beam-flink-jobserver.yaml>
>> and using the PortableRunner. In theory that should reduce the amount of
>> data you’re syncing to the cluster. It does require exposing at least two
>> ingress points (8099 and 8098) so you can hit the job and artifact services
>> respectively.
>>
> Right, good idea! Haven't tried spinning up the job server. Exposing the
> job and artifact services seems pretty easy; but would also need to replace
> the jobserver image "apache/beam_flink1.10_job_server:2.23.0" with a
> custom-built one with the Beam 2.24 snapshot we're using.
>

This may be necessary anyway, depending on how you handle the docker login
stuff. In any case, good luck!


>
>
>> Cheers,
>> Sam
>>
>> On Fri, Aug 28, 2020 at 5:50 PM Eugene Kirpichov <ek...@gmail.com>
>> wrote:
>>
>>> Woohoo thanks Kyle, adding --save_main_session made it work!!!
>>>
>>> On Fri, Aug 28, 2020 at 5:02 PM Kyle Weaver <kc...@google.com> wrote:
>>>
>>>> > rpc error: code = Unimplemented desc = Method not found:
>>>> org.apache.beam.model.job_management.v1.LegacyArtifactRetrievalService/GetManifest
>>>>
>>>> This is a known issue: https://issues.apache.org/jira/browse/BEAM-10762
>>>>
>>>> On Fri, Aug 28, 2020 at 4:57 PM Eugene Kirpichov <ek...@gmail.com>
>>>> wrote:
>>>>
>>>>> P.S. Ironic how back in 2018 I was TL-ing the portable runners effort
>>>>> for a few months on Google side, and now I need community help to get it to
>>>>> work at all.
>>>>> Still pretty miraculous how far Beam's portability has come since
>>>>> then, even if it has a steep learning curve.
>>>>>
>>>>> On Fri, Aug 28, 2020 at 4:54 PM Eugene Kirpichov <ek...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi Sam,
>>>>>>
>>>>>> You're a wizard - this got me *way* farther than my previous
>>>>>> attempts. Here's a PR
>>>>>> https://github.com/sambvfx/beam-flink-k8s/pull/1 with a couple of
>>>>>> changes I had to make.
>>>>>>
>>>>>> I had to make some additional changes that do not make sense to
>>>>>> share, but here they are for the record:
>>>>>> - Because I'm running on k8s engine and not minikube, I had to put
>>>>>> the docker-flink image on GCR, changing flink.yaml "image:
>>>>>> docker-flink:1.10" -> "image: us.gcr.io/$PROJECT_ID/docker-flink:1.10".
>>>>>> I of course also had to build and push the container.
>>>>>> - Because I'm running with a custom container based on an unreleased
>>>>>> version of Beam, I had to push my custom container to GCR too, and change
>>>>>> your instructions to use that image name instead of the default one
>>>>>> - To fetch the containers from GCR, I had to log into Docker inside
>>>>>> the Flink nodes, specifically inside the taskmanager container, using
>>>>>> something like "kubectl exec pod/flink-taskmanager-blahblah -c taskmanager
>>>>>> -- docker login -u oauth2accesstoken --password $(gcloud auth
>>>>>> print-access-token)"
>>>>>> - Again because I'm using an unreleased Beam SDK (due to a bug whose
>>>>>> fix will be released in 2.24), I had to also build a custom Flink job
>>>>>> server jar and point to it via --flink_job_server_jar.
>>>>>>
>>>>>> In the end, I got my pipeline to start, create the uber jar (about
>>>>>> 240MB in size), take a few minutes to transmit it to Flink (which is a long
>>>>>> time, but it'll do for a prototype); the Flink UI was displaying the
>>>>>> pipeline, and was able to *start* the worker container - however it
>>>>>> quickly failed with the following error:
>>>>>>
>>>>>> 2020/08/28 15:49:09 Initializing python harness:
>>>>>> /opt/apache/beam/boot --id=1-1 --provision_endpoint=localhost:45111
>>>>>> 2020/08/28 15:49:09 Failed to retrieve staged files: failed to get
>>>>>> manifest
>>>>>> caused by:
>>>>>> rpc error: code = Unimplemented desc = Method not found:
>>>>>> org.apache.beam.model.job_management.v1.LegacyArtifactRetrievalService/GetManifest
>>>>>> (followed by a bunch of other garbage)
>>>>>>
>>>>>> I'm assuming this might be because I got tangled in my custom images
>>>>>> related to the unreleased Beam SDK, and should be fixed if running on clean
>>>>>> Beam 2.24.
>>>>>>
>>>>>> Thank you again!
>>>>>>
>>>>>> On Fri, Aug 28, 2020 at 10:21 AM Eugene Kirpichov <
>>>>>> ekirpichov@gmail.com> wrote:
>>>>>>
>>>>>>> Holy shit, thanks Sam, this is more help than I could have asked
>>>>>>> for!!
>>>>>>> I'll give this a shot later today and report back.
>>>>>>>
>>>>>>> On Thu, Aug 27, 2020 at 10:27 PM Sam Bourne <sa...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi Eugene!
>>>>>>>>
>>>>>>>> I’m struggling to find complete documentation on how to do this.
>>>>>>>> There seems to be lots of conflicting or incomplete information: several
>>>>>>>> ways to deploy Flink, several ways to get Beam working with it, bizarre
>>>>>>>> StackOverflow questions, and no documentation explaining a complete working
>>>>>>>> example.
>>>>>>>>
>>>>>>>> This *is* possible and I went through all the same frustrations of
>>>>>>>> sparse and confusing documentation. I’m glossing over a lot of details, but
>>>>>>>> the key thing was setting up the flink taskworker(s) to run docker. This
>>>>>>>> requires running docker-in-docker as the taskworker itself is a docker
>>>>>>>> container in k8s.
>>>>>>>>
>>>>>>>> First create a custom flink container with docker:
>>>>>>>>
>>>>>>>> # docker-flink Dockerfile
>>>>>>>>
>>>>>>>> FROM flink:1.10
>>>>>>>> # install docker
>>>>>>>> RUN apt-get ...
>>>>>>>>
>>>>>>>> Then setup the taskmanager deployment to use a sidecar
>>>>>>>> docker-in-docker service. This dind service is where the python sdk harness
>>>>>>>> container actually runs.
>>>>>>>>
>>>>>>>> kind: Deployment
>>>>>>>> ...
>>>>>>>>   containers:
>>>>>>>>   - name: docker
>>>>>>>>     image: docker:19.03.5-dind
>>>>>>>>     ...
>>>>>>>>   - name: taskmanager
>>>>>>>>     image: myregistry:5000/docker-flink:1.10
>>>>>>>>     env:
>>>>>>>>     - name: DOCKER_HOST
>>>>>>>>       value: tcp://localhost:2375
>>>>>>>> ...
>>>>>>>>
>>>>>>>> I quickly threw all these pieces together in a repo here:
>>>>>>>> https://github.com/sambvfx/beam-flink-k8s
>>>>>>>>
>>>>>>>> I added a working (via minikube) step-by-step in the README to
>>>>>>>> prove to myself that I didn’t miss anything, but feel free to submit any
>>>>>>>> PRs if you want to add anything useful.
>>>>>>>>
>>>>>>>> The documents you linked are very informative. It would be great to
>>>>>>>> aggregate all this into digestible documentation. Let me know if you have
>>>>>>>> any further questions!
>>>>>>>>
>>>>>>>> Cheers,
>>>>>>>> Sam
>>>>>>>>
>>>>>>>> On Thu, Aug 27, 2020 at 10:25 AM Eugene Kirpichov <
>>>>>>>> ekirpichov@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi Kyle,
>>>>>>>>>
>>>>>>>>> Thanks for the response!
>>>>>>>>>
>>>>>>>>> On Wed, Aug 26, 2020 at 5:28 PM Kyle Weaver <kc...@google.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> > - With the Flink operator, I was able to submit a Beam job, but
>>>>>>>>>> hit the issue that I need Docker installed on my Flink nodes. I haven't yet
>>>>>>>>>> tried changing the operator's yaml files to add Docker inside them.
>>>>>>>>>>
>>>>>>>>>> Running Beam workers via Docker on the Flink nodes is not
>>>>>>>>>> recommended (and probably not even possible), since the Flink nodes are
>>>>>>>>>> themselves already running inside Docker containers. Running workers as
>>>>>>>>>> sidecars avoids that problem. For example:
>>>>>>>>>> https://github.com/GoogleCloudPlatform/flink-on-k8s-operator/blob/master/examples/beam/with_job_server/beam_flink_cluster.yaml#L17-L20
>>>>>>>>>>
>>>>>>>>>> The main problem with the sidecar approach is that I can't use
>>>>>>>>> the Flink cluster as a "service" for anybody to submit their jobs with
>>>>>>>>> custom containers - the container version is fixed.
>>>>>>>>> Do I understand it correctly?
>>>>>>>>> Seems like the Docker-in-Docker approach is viable, and is
>>>>>>>>> mentioned in the Beam Flink K8s design doc
>>>>>>>>> <https://docs.google.com/document/d/1z3LNrRtr8kkiFHonZ5JJM_L4NWNBBNcqRc_yAf6G0VI/edit#heading=h.dtj1gnks47dq>
>>>>>>>>> .
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>> > I also haven't tried this
>>>>>>>>>> <https://github.com/GoogleCloudPlatform/flink-on-k8s-operator/blob/master/docs/beam_guide.md> yet
>>>>>>>>>> because it implies submitting jobs using "kubectl apply"  which is weird -
>>>>>>>>>> why not just submit it through the Flink job server?
>>>>>>>>>>
>>>>>>>>>> I'm guessing it goes through k8s for monitoring purposes. I see
>>>>>>>>>> no reason it shouldn't be possible to submit to the job server directly
>>>>>>>>>> through Python, network permitting, though I haven't tried this.

Re: Getting Beam(Python)-on-Flink-on-k8s to work

Posted by Eugene Kirpichov <ek...@gmail.com>.
On Fri, Aug 28, 2020 at 6:52 PM Sam Bourne <sa...@gmail.com> wrote:

> Hi Eugene,
>
> Glad that helped you out and thanks for the PR tweaking it for GCP.
>
> To fetch the containers from GCR, I had to log into Docker inside the
> Flink nodes, specifically inside the taskmanager container, using something
> like "kubectl exec pod/flink-taskmanager-blahblah -c taskmanager -- docker
> login -u oauth2accesstoken --password $(gcloud auth print-access-token)"
>
> Ouch that seems painful. I find this “precaching” step pretty silly and
> have considered making the DockerEnvironmentFactory a little more
> intelligent about how it deals with timeouts (e.g. no activity). It doesn’t
> seem like it would be too difficult to also add first-class support for
> pulling images from protected repositories. Extending the DockerPayload
> protobuf to pass along the additional information and tweaking the
> DockerEnvironmentFactory? I’m not a java expert but that might be worth
> exploring if this continues to be problematic.
>
Yeah it makes sense to have some first-class support for Docker
credentials. It's kind of a no-brainer that it's necessary with custom
containers; many companies probably wouldn't want to push their custom
containers to a public repo.
I was thinking of embedding the credentials JSON file into the
taskmanager container through its Dockerfile; that's workable but also
pretty silly - having to rebuild this container just for the sake of
putting in the credentials.
DockerPayload might be the right place to put credentials, but I wonder if
there's a way to do something more secure, with k8s secrets. I'm not too
well-versed in credential management.


> I found the time it takes to pull can be dramatically improved if you store
> everything in memory
> <https://github.com/sambvfx/beam-flink-k8s/blob/master/k8s/flink.yaml#L186>.
>
> In the end, I got my pipeline to start, create the uber jar (about 240MB
> in size), take a few minutes to transmit it to Flink
>
> You could explore spinning up the beam-flink-job-server
> <https://github.com/sambvfx/beam-flink-k8s/blob/master/k8s/beam-flink-jobserver.yaml>
> and using the PortableRunner. In theory that should reduce the amount of
> data you’re syncing to the cluster. It does require exposing at least two
> ingress points (8099 and 8098) so you can hit the job and artifact services
> respectively.
>
Right, good idea! Haven't tried spinning up the job server. Exposing the
job and artifact services seems pretty easy, but we would also need to
replace the jobserver image "apache/beam_flink1.10_job_server:2.23.0" with
a custom-built one containing the Beam 2.24 snapshot we're using.



-- 
Eugene Kirpichov
http://www.linkedin.com/in/eugenekirpichov

Re: Getting Beam(Python)-on-Flink-on-k8s to work

Posted by Sam Bourne <sa...@gmail.com>.
Hi Eugene,

Glad that helped you out and thanks for the PR tweaking it for GCP.

To fetch the containers from GCR, I had to log into Docker inside the Flink
nodes, specifically inside the taskmanager container, using something like
"kubectl exec pod/flink-taskmanager-blahblah -c taskmanager -- docker login
-u oauth2accesstoken --password $(gcloud auth print-access-token)"

Ouch that seems painful. I find this “precaching” step pretty silly and
have considered making the DockerEnvironmentFactory a little more
intelligent about how it deals with timeouts (e.g. no activity). It doesn’t
seem like it would be too difficult to also add first-class support for
pulling images from protected repositories. Extending the DockerPayload
protobuf to pass along the additional information and tweaking the
DockerEnvironmentFactory? I’m not a java expert but that might be worth
exploring if this continues to be problematic.

I found the time it takes to pull can be dramatically improved if you store
everything in memory
<https://github.com/sambvfx/beam-flink-k8s/blob/master/k8s/flink.yaml#L186>.

In the end, I got my pipeline to start, create the uber jar (about 240MB in
size), take a few minutes to transmit it to Flink

You could explore spinning up the beam-flink-job-server
<https://github.com/sambvfx/beam-flink-k8s/blob/master/k8s/beam-flink-jobserver.yaml>
and using the PortableRunner. In theory that should reduce the amount of
data you’re syncing to the cluster. It does require exposing at least two
ingress points (8099 and 8098) so you can hit the job and artifact services
respectively.
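
Roughly, such a submission would look like this (a sketch; $JOBSERVER_IP
stands in for wherever the job service is exposed, "mypipeline" for the
pipeline module):

python -m mypipeline \
  --runner=PortableRunner \
  --job_endpoint=$JOBSERVER_IP:8099 \
  --environment_type=DOCKER

If I understand the portable flow correctly, the artifact endpoint is
normally handed back by the job service, which is why 8098 has to be
reachable as well.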

Cheers,
Sam


Re: Getting Beam(Python)-on-Flink-on-k8s to work

Posted by Eugene Kirpichov <ek...@gmail.com>.
Woohoo thanks Kyle, adding --save_main_session made it work!!!
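
For the record, that's just one extra flag on the submission command from
before, e.g. (with "mypipeline" again a placeholder):

python -m mypipeline \
  --runner=FlinkRunner \
  --environment_type=DOCKER \
  --flink_master=$PUBLIC_IP:8081 \
  --save_main_session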

On Fri, Aug 28, 2020 at 5:02 PM Kyle Weaver <kc...@google.com> wrote:

> > rpc error: code = Unimplemented desc = Method not found:
> org.apache.beam.model.job_management.v1.LegacyArtifactRetrievalService/GetManifest
>
> This is a known issue: https://issues.apache.org/jira/browse/BEAM-10762
>
> On Fri, Aug 28, 2020 at 4:57 PM Eugene Kirpichov <ek...@gmail.com>
> wrote:
>
>> P.S. Ironic how back in 2018 I was TL-ing the portable runners effort for
>> a few months on Google side, and now I need community help to get it to
>> work at all.
>> Still pretty miraculous how far Beam's portability has come since
>> then, even if it has a steep learning curve.
>>
>> On Fri, Aug 28, 2020 at 4:54 PM Eugene Kirpichov <ek...@gmail.com>
>> wrote:
>>
>>> Hi Sam,
>>>
>>> You're a wizard - this got me *way* farther than my previous attempts.
>>> Here's a PR https://github.com/sambvfx/beam-flink-k8s/pull/1 with a
>>> couple of changes I had to make.
>>>
>>> I had to make some additional changes that do not make sense to share,
>>> but here they are for the record:
>>> - Because I'm running on k8s engine and not minikube, I had to put the
>>> docker-flink image on GCR, changing flink.yaml "image: docker-flink:1.10"
>>> -> "image: us.gcr.io/$PROJECT_ID/docker-flink:1.10". I of course also
>>> had to build and push the container.
>>> - Because I'm running with a custom container based on an unreleased
>>> version of Beam, I had to push my custom container to GCR too, and change
>>> your instructions to use that image name instead of the default one
>>> - To fetch the containers from GCR, I had to log into Docker inside the
>>> Flink nodes, specifically inside the taskmanager container, using something
>>> like "kubectl exec pod/flink-taskmanager-blahblah -c taskmanager -- docker
>>> login -u oauth2accesstoken --password $(gcloud auth print-access-token)"
>>> - Again because I'm using an unreleased Beam SDK (due to a bug whose fix
>>> will be released in 2.24), I had to also build a custom Flink job server
>>> jar and point to it via --flink_job_server_jar.
>>>
>>> In the end, I got my pipeline to start, create the uber jar (about 240MB
>>> in size), take a few minutes to transmit it to Flink (which is a long time,
>>> but it'll do for a prototype); the Flink UI was displaying the pipeline,
>>> and was able to *start* the worker container - however it quickly
>>> failed with the following error:
>>>
>>> 2020/08/28 15:49:09 Initializing python harness: /opt/apache/beam/boot
>>> --id=1-1 --provision_endpoint=localhost:45111
>>> 2020/08/28 15:49:09 Failed to retrieve staged files: failed to get
>>> manifest
>>> caused by:
>>> rpc error: code = Unimplemented desc = Method not found:
>>> org.apache.beam.model.job_management.v1.LegacyArtifactRetrievalService/GetManifest
>>> (followed by a bunch of other garbage)
>>>
>>> I'm assuming this might be because I got tangled in my custom images
>>> related to the unreleased Beam SDK, and should be fixed if running on clean
>>> Beam 2.24.
>>>
>>> Thank you again!
>>>
>>> On Fri, Aug 28, 2020 at 10:21 AM Eugene Kirpichov <ek...@gmail.com>
>>> wrote:
>>>
>>>> Holy shit, thanks Sam, this is more help than I could have asked for!!
>>>> I'll give this a shot later today and report back.
>>>>
>>>> On Thu, Aug 27, 2020 at 10:27 PM Sam Bourne <sa...@gmail.com> wrote:
>>>>
>>>>> Hi Eugene!
>>>>>
>>>>> I’m struggling to find complete documentation on how to do this. There
>>>>> seems to be lots of conflicting or incomplete information: several ways to
>>>>> deploy Flink, several ways to get Beam working with it, bizarre
>>>>> StackOverflow questions, and no documentation explaining a complete working
>>>>> example.
>>>>>
>>>>> This *is* possible and I went through all the same frustrations of
>>>>> sparse and confusing documentation. I’m glossing over a lot of details, but
>>>>> the key thing was setting up the flink taskworker(s) to run docker. This
>>>>> requires running docker-in-docker as the taskworker itself is a docker
>>>>> container in k8s.
>>>>>
>>>>> First create a custom flink container with docker:
>>>>>
>>>>> # docker-flink Dockerfile
>>>>>
>>>>> FROM flink:1.10
>>>>> # install docker
>>>>> RUN apt-get ...
>>>>>
>>>>> Then setup the taskmanager deployment to use a sidecar
>>>>> docker-in-docker service. This dind service is where the python sdk harness
>>>>> container actually runs.
>>>>>
>>>>> kind: Deployment
>>>>> ...
>>>>>   containers:
>>>>>   - name: docker
>>>>>     image: docker:19.03.5-dind
>>>>>     ...
>>>>>   - name: taskmanger
>>>>>     image: myregistry:5000/docker-flink:1.10
>>>>>     env:
>>>>>     - name: DOCKER_HOST
>>>>>       value: tcp://localhost:2375
>>>>> ...
>>>>>
>>>>> I quickly threw all these pieces together in a repo here:
>>>>> https://github.com/sambvfx/beam-flink-k8s
>>>>>
>>>>> I added a working (via minikube) step-by-step in the README to prove
>>>>> to myself that I didn’t miss anything, but feel free to submit any PRs if
>>>>> you want to add anything useful.
>>>>>
>>>>> The documents you linked are very informative. It would be great to
>>>>> aggregate all this into digestible documentation. Let me know if you have
>>>>> any further questions!
>>>>>
>>>>> Cheers,
>>>>> Sam

-- 
Eugene Kirpichov
http://www.linkedin.com/in/eugenekirpichov

Re: Getting Beam(Python)-on-Flink-on-k8s to work

Posted by Kyle Weaver <kc...@google.com>.
> rpc error: code = Unimplemented desc = Method not found:
> org.apache.beam.model.job_management.v1.LegacyArtifactRetrievalService/GetManifest

This is a known issue: https://issues.apache.org/jira/browse/BEAM-10762


Re: Getting Beam(Python)-on-Flink-on-k8s to work

Posted by Eugene Kirpichov <ek...@gmail.com>.
P.S. Ironic how back in 2018 I was TL-ing the portable runners effort for a
few months on the Google side, and now I need community help to get it to
work at all.
Still pretty miraculous how far Beam's portability has come since then,
even if it has a steep learning curve.

-- 
Eugene Kirpichov
http://www.linkedin.com/in/eugenekirpichov

Re: Getting Beam(Python)-on-Flink-on-k8s to work

Posted by Eugene Kirpichov <ek...@gmail.com>.
Hi Sam,

You're a wizard - this got me *way* farther than my previous attempts.
Here's a PR https://github.com/sambvfx/beam-flink-k8s/pull/1 with a couple
of changes I had to make.

I had to make some additional changes that are too specific to my setup to
be worth upstreaming, but here they are for the record:
- Because I'm running on Google Kubernetes Engine and not minikube, I had
to put the docker-flink image on GCR, changing flink.yaml "image:
docker-flink:1.10" -> "image: us.gcr.io/$PROJECT_ID/docker-flink:1.10". I
of course also had to build and push the container (see the sketch below).
- Because I'm running with a custom container based on an unreleased
version of Beam, I had to push my custom container to GCR too, and change
your instructions to use that image name instead of the default one.
- To fetch the containers from GCR, I had to log into Docker inside the
Flink nodes, specifically inside the taskmanager container, using something
like "kubectl exec pod/flink-taskmanager-blahblah -c taskmanager -- docker
login -u oauth2accesstoken --password $(gcloud auth print-access-token)"
- Again because I'm using an unreleased Beam SDK (due to a bug whose fix
will be released in 2.24), I had to also build a custom Flink job server
jar and point to it via --flink_job_server_jar.
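
For the record, the build-and-push step was roughly the following (a
sketch - the image name and project are placeholders, and it assumes
gcloud is set up as a Docker credential helper on the build machine):

docker build -t us.gcr.io/$PROJECT_ID/docker-flink:1.10 .
docker push us.gcr.io/$PROJECT_ID/docker-flink:1.10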

In the end, I got my pipeline to start, create the uber jar (about 240MB
in size), and take a few minutes to transmit it to Flink (which is a long
time, but it'll do for a prototype); the Flink UI was displaying the
pipeline, and the worker container did *start* - however it quickly failed
with the following error:

2020/08/28 15:49:09 Initializing python harness: /opt/apache/beam/boot
--id=1-1 --provision_endpoint=localhost:45111
2020/08/28 15:49:09 Failed to retrieve staged files: failed to get manifest
caused by:
rpc error: code = Unimplemented desc = Method not found:
org.apache.beam.model.job_management.v1.LegacyArtifactRetrievalService/GetManifest
(followed by a bunch of other garbage)

I'm assuming this might be because I got tangled in my custom images
related to the unreleased Beam SDK, and should be fixed if running on clean
Beam 2.24.
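
For reference, here's roughly what the full invocation adds up to (a
sketch - the image, project, jar path and master address are placeholders
for my actual values):

python my_pipeline.py \
  --runner=FlinkRunner \
  --flink_master=$FLINK_MASTER_IP:8081 \
  --flink_job_server_jar=/path/to/beam-runners-flink-1.10-job-server.jar \
  --environment_type=DOCKER \
  --environment_config=us.gcr.io/$PROJECT_ID/my-beam-sdk:latest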

Thank you again!

-- 
Eugene Kirpichov
http://www.linkedin.com/in/eugenekirpichov

Re: Getting Beam(Python)-on-Flink-on-k8s to work

Posted by Sam Bourne <sa...@gmail.com>.
Hi Eugene!

I’m struggling to find complete documentation on how to do this. There
seems to be lots of conflicting or incomplete information: several ways to
deploy Flink, several ways to get Beam working with it, bizarre
StackOverflow questions, and no documentation explaining a complete working
example.

This *is* possible and I went through all the same frustrations of sparse
and confusing documentation. I'm glossing over a lot of details, but the
key thing was setting up the flink taskmanager(s) to run docker. This
requires running docker-in-docker, as the taskmanager itself is a docker
container in k8s.

First create a custom flink container with docker:

# docker-flink Dockerfile

FROM flink:1.10
# install docker
RUN apt-get ...
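
For completeness, on the Debian-based flink:1.10 image the install step
might look something like this (a sketch; only the Docker client is
strictly needed, since the daemon runs in the dind sidecar):

FROM flink:1.10
# install the Docker CLI; the daemon itself runs in the dind sidecar
RUN apt-get update && \
    apt-get install -y --no-install-recommends docker.io && \
    rm -rf /var/lib/apt/lists/*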

Then set up the taskmanager deployment to use a sidecar docker-in-docker
service. This dind service is where the python sdk harness container
actually runs.

kind: Deployment
...
  containers:
  - name: docker
    image: docker:19.03.5-dind
    ...
  - name: taskmanager
    image: myregistry:5000/docker-flink:1.10
    env:
    - name: DOCKER_HOST
      value: tcp://localhost:2375
...
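
A couple of details worth noting: the dind container has to run
privileged, and newer dind images enable TLS by default (listening on
2376), so plain tcp://localhost:2375 typically needs TLS disabled
explicitly. Roughly (a sketch of the relevant bits, not a verbatim copy
from the repo):

  - name: docker
    image: docker:19.03.5-dind
    securityContext:
      privileged: true  # dind requires a privileged container
    env:
    - name: DOCKER_TLS_CERTDIR
      value: ""  # disable TLS so the daemon listens on tcp://localhost:2375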

I quickly threw all these pieces together in a repo here:
https://github.com/sambvfx/beam-flink-k8s

I added a working (via minikube) step-by-step in the README to prove to
myself that I didn’t miss anything, but feel free to submit any PRs if you
want to add anything useful.

The documents you linked are very informative. It would be great to
aggregate all this into digestible documentation. Let me know if you have
any further questions!

Cheers,
Sam


Re: Getting Beam(Python)-on-Flink-on-k8s to work

Posted by Eugene Kirpichov <ek...@gmail.com>.
Hi Kyle,

Thanks for the response!

On Wed, Aug 26, 2020 at 5:28 PM Kyle Weaver <kc...@google.com> wrote:

> > - With the Flink operator, I was able to submit a Beam job, but hit the
> > issue that I need Docker installed on my Flink nodes. I haven't yet
> > tried changing the operator's yaml files to add Docker inside them.
>
> Running Beam workers via Docker on the Flink nodes is not recommended (and
> probably not even possible), since the Flink nodes are themselves already
> running inside Docker containers. Running workers as sidecars avoids that
> problem. For example:
> https://github.com/GoogleCloudPlatform/flink-on-k8s-operator/blob/master/examples/beam/with_job_server/beam_flink_cluster.yaml#L17-L20
>
The main problem with the sidecar approach is that I can't use the Flink
cluster as a "service" for anybody to submit their jobs with custom
containers - the container version is fixed.
Do I understand it correctly?
Seems like the Docker-in-Docker approach is viable, and is mentioned in
the Beam Flink K8s design doc
<https://docs.google.com/document/d/1z3LNrRtr8kkiFHonZ5JJM_L4NWNBBNcqRc_yAf6G0VI/edit#heading=h.dtj1gnks47dq>.


> > I also haven't tried this
> > <https://github.com/GoogleCloudPlatform/flink-on-k8s-operator/blob/master/docs/beam_guide.md>
> > yet because it implies submitting jobs using "kubectl apply" which is
> > weird - why not just submit it through the Flink job server?
>
> I'm guessing it goes through k8s for monitoring purposes. I see no reason
> it shouldn't be possible to submit to the job server directly through
> Python, network permitting, though I haven't tried this.

-- 
Eugene Kirpichov
http://www.linkedin.com/in/eugenekirpichov

Re: Getting Beam(Python)-on-Flink-on-k8s to work

Posted by Kyle Weaver <kc...@google.com>.
> - With the Flink operator, I was able to submit a Beam job, but hit the
> issue that I need Docker installed on my Flink nodes. I haven't yet tried
> changing the operator's yaml files to add Docker inside them.

Running Beam workers via Docker on the Flink nodes is not recommended (and
probably not even possible), since the Flink nodes are themselves already
running inside Docker containers. Running workers as sidecars avoids that
problem. For example:
https://github.com/GoogleCloudPlatform/flink-on-k8s-operator/blob/master/examples/beam/with_job_server/beam_flink_cluster.yaml#L17-L20
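
For reference, the sidecar part of that manifest looks roughly like the
sketch below (not copied verbatim; image tags are illustrative). The
--worker_pool flag makes the Beam Python SDK container run as a
long-lived worker pool, listening on port 50000 by default, which the
pipeline then targets with --environment_type=EXTERNAL (see the Python
sketch further down):

  # Sketch of a FlinkCluster resource for the flink-on-k8s-operator,
  # running the Beam SDK harness as a sidecar on each task manager.
  apiVersion: flinkoperator.k8s.io/v1beta1
  kind: FlinkCluster
  metadata:
    name: beam-flink-cluster
  spec:
    image:
      name: flink:1.10.1                           # illustrative tag
    jobManager:
      accessScope: Cluster
    taskManager:
      replicas: 2
      sidecars:
        - name: beam-worker-pool
          image: apache/beam_python3.7_sdk:2.23.0  # illustrative tag
          args: ["--worker_pool"]                  # serves on :50000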

> I also haven't tried this
> <https://github.com/GoogleCloudPlatform/flink-on-k8s-operator/blob/master/docs/beam_guide.md>
> yet because it implies submitting jobs using "kubectl apply" which is
> weird - why not just submit it through the Flink job server?

I'm guessing it goes through k8s for monitoring purposes. I see no reason
it shouldn't be possible to submit to the job server directly through
Python, network permitting, though I haven't tried this.
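
Network permitting, a direct submission from Python would look
something like this untested sketch (<job-server-address> is a
placeholder; 8099 is the job server's default gRPC port, and
localhost:50000 is where the worker pool sidecar listens):

  import apache_beam as beam
  from apache_beam.options.pipeline_options import PipelineOptions

  # Untested sketch: submit straight to the Flink job server's gRPC
  # endpoint; workers get the SDK harness from the sidecar pool.
  options = PipelineOptions([
      "--runner=PortableRunner",
      "--job_endpoint=<job-server-address>:8099",  # placeholder
      "--environment_type=EXTERNAL",
      "--environment_config=localhost:50000",      # sidecar pool
  ])

  with beam.Pipeline(options=options) as p:
      p | beam.Create(["smoke", "test"]) | beam.Map(print)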


