Posted to user@flink.apache.org by Lee Parayno <le...@gmail.com> on 2023/01/17 19:40:30 UTC

Apache Beam MinimalWordCount on Flink on Kubernetes using Flink Kubernetes Operator on GCP

I have a Kubernetes cluster in GCP running the Flink Kubernetes Operator.

I'm trying to package the Apache Beam MinimalWordCount example, using the
Flink Runner, and deploy it to the cluster as a FlinkDeployment.

The job's Docker image is built with this Dockerfile:

FROM flink

ENV FLINK_CLASSPATH /opt/flink/lib/*
ENV CLASSPATH /opt/flink/lib/*

# Add Google Dependencies
ADD https://repo1.maven.org/maven2/com/google/guava/guava/31.1-jre/guava-31.1-jre.jar /opt/flink/lib/

# Add Google Cloud Platform Dependencies
ADD https://repo1.maven.org/maven2/com/google/cloud/google-cloud-core/2.9.0/google-cloud-core-2.9.0.jar /opt/flink/lib/
ADD https://repo1.maven.org/maven2/com/google/cloud/google-cloud-core-http/2.9.0/google-cloud-core-http-2.9.0.jar /opt/flink/lib/
ADD https://repo1.maven.org/maven2/com/google/cloud/google-cloud-core-grpc/2.9.0/google-cloud-core-grpc-2.9.0.jar /opt/flink/lib/

# Add dependencies for accessing Google Cloud Storage
ADD https://repo1.maven.org/maven2/com/google/cloud/google-cloud-storage/2.9.3/google-cloud-storage-2.9.3.jar /opt/flink/lib/
ADD https://repo1.maven.org/maven2/com/google/auth/google-auth-library-oauth2-http/1.9.0/google-auth-library-oauth2-http-1.9.0.jar /opt/flink/lib/
ADD https://repo1.maven.org/maven2/com/google/http-client/google-http-client/1.42.3/google-http-client-1.42.3.jar /opt/flink/lib/

# Apache Beam
ADD https://repo1.maven.org/maven2/org/apache/beam/beam-sdks-java-core/2.43.0/beam-sdks-java-core-2.43.0.jar /opt/flink/lib/
ADD https://repo1.maven.org/maven2/org/apache/beam/beam-runners-direct-java/2.43.0/beam-runners-direct-java-2.43.0.jar /opt/flink/lib/
ADD https://repo1.maven.org/maven2/org/apache/beam/beam-sdks-java-extensions-google-cloud-platform-core/2.43.0/beam-sdks-java-extensions-google-cloud-platform-core-2.43.0.jar /opt/flink/lib/
ADD https://repo1.maven.org/maven2/org/apache/beam/beam-runners-flink_2.11/2.16.0/beam-runners-flink_2.11-2.16.0.jar /opt/flink/lib/

ADD target/helloworld-bundled-1.0-SNAPSHOT.jar /opt/flink/lib/

This is the yaml for the FlinkDeployment:
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: minimal-word-count2
spec:
  #image: flink:1.15
  image: <docker repo>/flink_with_minimal_word_count
  flinkVersion: v1_16
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "1"
  serviceAccount: flink
  ingress:
    template: "flink.k8s.io/{{namespace}}/{{name}}(/|$)(.*)"
    className: "nginx"
    annotations:
      nginx.ingress.kubernetes.io/rewrite-target: "/$2"
  jobManager:
    resource:
      memory: "1048m"
      cpu: 0.75
  taskManager:
    # template:
    #   spec:
    #     env:
    #       - name: CLASSPATH
    #         value: "/opt/flink/lib/dependencies:/opt/flink/lib/*"
    resource:
      memory: "1048m"
      cpu: 0.75
  job:
    #jarURI: local:///opt/flink/usrlib/helloworld-1.0-SNAPSHOT-jar-with-dependencies.jar
    #jarURI: local:///opt/flink/usrlib/helloworld-bundled-1.0-SNAPSHOT.jar
    jarURI: local:///opt/flink/lib/helloworld-bundled-1.0-SNAPSHOT.jar
    parallelism: 2
    upgradeMode: stateless

When I apply the yaml, the pod crashes with this error:
org.apache.flink.util.FlinkException: Could not load the provided entrypoint class.
    at org.apache.flink.client.program.DefaultPackagedProgramRetriever.getPackagedProgram(DefaultPackagedProgramRetriever.java:215) ~[flink-dist-1.16.0.jar:1.16.0]
    at org.apache.flink.kubernetes.entrypoint.KubernetesApplicationClusterEntrypoint.getPackagedProgram(KubernetesApplicationClusterEntrypoint.java:100) ~[flink-dist-1.16.0.jar:1.16.0]
    at org.apache.flink.kubernetes.entrypoint.KubernetesApplicationClusterEntrypoint.main(KubernetesApplicationClusterEntrypoint.java:70) [flink-dist-1.16.0.jar:1.16.0]
Caused by: org.apache.flink.client.program.ProgramInvocationException: JAR file does not exist '/opt/flink/lib/helloworld-bundled-1.0-SNAPSHOT.jar'
    at org.apache.flink.client.program.PackagedProgram.checkJarFile(PackagedProgram.java:617) ~[flink-dist-1.16.0.jar:1.16.0]
    at org.apache.flink.client.program.PackagedProgram.loadJarFile(PackagedProgram.java:465) ~[flink-dist-1.16.0.jar:1.16.0]
    at org.apache.flink.client.program.PackagedProgram.<init>(PackagedProgram.java:135) ~[flink-dist-1.16.0.jar:1.16.0]
    at org.apache.flink.client.program.PackagedProgram.<init>(PackagedProgram.java:65) ~[flink-dist-1.16.0.jar:1.16.0]
    at org.apache.flink.client.program.PackagedProgram$Builder.build(PackagedProgram.java:691) ~[flink-dist-1.16.0.jar:1.16.0]
    at org.apache.flink.client.program.DefaultPackagedProgramRetriever.getPackagedProgram(DefaultPackagedProgramRetriever.java:213) ~[flink-dist-1.16.0.jar:1.16.0]
    ... 2 more
Caused by: java.io.IOException: JAR file does not exist '/opt/flink/lib/helloworld-bundled-1.0-SNAPSHOT.jar'
    at org.apache.flink.util.JarUtils.checkJarFile(JarUtils.java:46) ~[flink-dist-1.16.0.jar:1.16.0]
    at org.apache.flink.client.program.PackagedProgram.checkJarFile(PackagedProgram.java:615) ~[flink-dist-1.16.0.jar:1.16.0]
    at org.apache.flink.client.program.PackagedProgram.loadJarFile(PackagedProgram.java:465) ~[flink-dist-1.16.0.jar:1.16.0]
    at org.apache.flink.client.program.PackagedProgram.<init>(PackagedProgram.java:135) ~[flink-dist-1.16.0.jar:1.16.0]
    at org.apache.flink.client.program.PackagedProgram.<init>(PackagedProgram.java:65) ~[flink-dist-1.16.0.jar:1.16.0]
    at org.apache.flink.client.program.PackagedProgram$Builder.build(PackagedProgram.java:691) ~[flink-dist-1.16.0.jar:1.16.0]
    at org.apache.flink.client.program.DefaultPackagedProgramRetriever.getPackagedProgram(DefaultPackagedProgramRetriever.java:213) ~[flink-dist-1.16.0.jar:1.16.0]
    ... 2 more

If I exec into the container, the jar is in /opt/flink/lib (or in
/opt/flink/usrlib when I tried copying the fat jar there), but the
PackagedProgram client doesn't appear to be able to find it.

Does the PackagedProgram need a different configuration setting to find the
jar? What am I missing?

Re: Apache Beam MinimalWordCount on Flink on Kubernetes using Flink Kubernetes Operator on GCP

Posted by Yang Wang <wa...@apache.org>.
The "JAR file does not exist" exception is thrown on the JobManager side,
not on the client.
Please be aware that the local:// scheme in the jarURI refers to a path
inside the JobManager pod.

You could use an init-container to download your user jar and mount it into
the JobManager's main container.
Refer to the examples[1] for more information.
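
As a rough sketch of the init-container approach (not copied from the linked
example): the fragment below assumes the user jar is reachable over HTTP at a
placeholder URL, and uses a hypothetical container name, volume name, and
mount path. An emptyDir volume is shared between the init container and the
Flink main container, so the path given in jarURI exists inside the
JobManager pod.

```yaml
# Sketch only: a fragment of a FlinkDeployment spec, not a complete manifest.
spec:
  podTemplate:
    apiVersion: v1
    kind: Pod
    metadata:
      name: pod-template
    spec:
      initContainers:
        - name: fetch-user-jar          # hypothetical name
          image: busybox:1.35.0         # assumption: any image that provides wget
          command:
            - /bin/sh
            - -c
            # example.com is a placeholder; replace it with the real artifact location
            - wget -O /opt/flink/downloads/helloworld-bundled-1.0-SNAPSHOT.jar https://example.com/artifacts/helloworld-bundled-1.0-SNAPSHOT.jar
          volumeMounts:
            - name: downloads
              mountPath: /opt/flink/downloads
      containers:
        # The operator expects the Flink container to keep this name
        - name: flink-main-container
          volumeMounts:
            - name: downloads
              mountPath: /opt/flink/downloads
      volumes:
        - name: downloads               # hypothetical volume shared by both containers
          emptyDir: {}
  job:
    # local:// is resolved inside the JobManager pod, so it points at the mounted volume
    jarURI: local:///opt/flink/downloads/helloworld-bundled-1.0-SNAPSHOT.jar
```

The jar is fetched again each time the pod starts, so this trades a slower
startup for not having to rebuild the image on every job change.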

Of course, you could also build your own Flink image (NOT the
flink-kubernetes-operator image) with the user jar bundled[2].

[1] https://github.com/apache/flink-kubernetes-operator/blob/main/examples/pod-template.yaml#L65
[2] https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/deployment/resource-providers/native_kubernetes/#application-mode


Best,
Yang

