Posted to user@flink.apache.org by Adam Roberts <AR...@uk.ibm.com> on 2020/08/25 12:40:04 UTC

Example flink run with security options? Running on k8s in my case

Hey everyone, I've been experimenting with Flink using
<https://github.com/GoogleCloudPlatform/flink-on-k8s-operator> and I believe
I've successfully deployed a JobManager and TaskManager with security enabled,
and a self-signed certificate (the pods come up great).



However, I can't do much with this - I can't port-forward and access the UI,
nor can I submit jobs to it by running another pod and using the DNS name
lookup of the service.



I always get



The program finished with the following exception:  
org.apache.flink.client.program.ProgramInvocationException: The main method
caused an error: java.util.concurrent.ExecutionException:
org.apache.flink.runtime.client.JobSubmissionException: Failed to submit
JobGraph.



...



Caused by: org.apache.flink.runtime.rest.ConnectionClosedException: Channel
became inactive.  
    ... 37 more





and this is even with all of the -D security options provided.



The versions of Flink are the same for both my Job and my FlinkCluster
(1.11.1).



Is this a sensible thing to do? If I weren't using the operator for example,
would users be expected to flink run with all of these options?



Does anything look odd here? My guess is because security's on, the Job
Manager refuses to talk to my submitter.



Running as the flink user in the container, I do



      securityContext:
        runAsUser: 9999
        runAsGroup: 9999
      containers:
      - name: wordcount
        image: adamroberts/mycoolflink:latest
        args:
        - /opt/flink/bin/flink
        - run
        - -D
        - security.ssl.rest.keystore=/etc/flink-secrets/flink-tls-keystore.key
        - -D
        - security.ssl.rest.truststore=/etc/flink-secrets/flink-tls-ca-truststore.jks
        - -D
        - security.ssl.rest.keystore-password=thepass # Replace with value of flink-tls-keystore.password
        - -D
        - security.ssl.rest.key-password=thepass # Replace with value of tls.p12.password
        - -D
        - security.ssl.rest.truststore-password=thepass # Replace with value of flink-tls-ca.truststore.password
        - -D
        - security.ssl.internal.keystore=/etc/flink-secrets/flink-tls-keystore.key
        - -D
        - security.ssl.internal.truststore=/etc/flink-secrets/flink-tls-ca-truststore.jks
        - -D
        - security.ssl.internal.keystore-password=thepass # Replace with value of flink-tls-keystore.password
        - -D
        - security.ssl.internal.key-password=thepass # Replace with value of flink-tls-keystore.password
        - -D
        - security.ssl.internal.truststore-password=thepass # Replace with value of flink-tls-truststore.password
        - -m
        - tls-flink-cluster-1-11-jobmanager:8081
        - /opt/flink/examples/batch/WordCount.jar
        - --input
        - /opt/flink/NOTICE



with the secrets mounted in at the above location (if I exec into my
container, I can see they're all there OK). Note that it is a read-only file
system.



adamroberts/mycoolflink (at the time of this email) is just based on
<https://github.com/apache/flink-docker>.



Thanks!



Unless stated otherwise above:  
IBM United Kingdom Limited - Registered in England and Wales with number
741598.  
Registered office: PO Box 41, North Harbour, Portsmouth, Hampshire PO6 3AU  
  


Re: Example flink run with security options? Running on k8s in my case

Posted by Nico Kruber <ni...@ververica.com>.
Actually, your curl command may be incorrect since you didn't specify https as 
the protocol: Its man page says:
> If you specify URL without protocol:// prefix, curl will attempt to guess
> what protocol you might want. It will then default to HTTP but try other
> protocols based on often-used host name prefixes. For example, for host
> names starting with "ftp." curl will assume you want to speak FTP.

So I guess it wasn't actually using HTTPS and failed to connect. Unfortunately, 
an empty response doesn't tell you much since it could have established a 
connection which was then reset by the server. Please use one of Flink's REST 
endpoints[1] to verify - these should have some content in the response.

It may also be useful to pair curl with `--verbose` for more output and also 
look at the JM logs for any such problems.
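
As a rough sketch of the check described above, the helpers below build an explicit https:// URL and call curl with `--verbose` against the `/overview` REST endpoint. The function names `rest_url` and `check_rest` are made up for illustration, and `rest.pem` is assumed to be the PEM file exported with openssl elsewhere in this thread.

```shell
#!/bin/sh
# Hypothetical helper: build an explicit https:// URL so curl does not
# fall back to plain HTTP when the scheme is missing.
rest_url() {
  # $1 = host, $2 = port, $3 = REST path (leading slash optional)
  printf 'https://%s:%s/%s\n' "$1" "$2" "${3#/}"
}

# Hypothetical helper: query one REST endpoint verbosely, trusting the
# certificate(s) in the PEM file given as $4.
check_rest() {
  curl --verbose --cacert "$4" "$(rest_url "$1" "$2" "$3")"
}

# Example call (not run here; host and port are the ones used in this thread):
#   check_rest tls-flink-cluster-1-11-jobmanager 8081 /overview rest.pem
```

If the TLS handshake fails, the verbose output should show at which step, which is more informative than an empty response.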


I'm not sure how the GCP flink operator sets things up, but if submitting the 
job is independent of starting the JM and TM pods, you don't need any of the 
internal SSL configuration parameters for submitting a job. This is a per-
cluster setting!
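
To illustrate a client-side setup without the internal SSL parameters, here is a minimal sketch that prepares a separate, writable configuration directory carrying only REST SSL settings. The helper name `make_client_conf` and the `/tmp` target are assumptions, as is the idea that the client only needs the truststore when mutual authentication is not enabled; the paths and passwords are the ones quoted in this thread.

```shell
#!/bin/sh
# Hypothetical helper: create a writable client config directory that
# contains only the REST SSL settings (no security.ssl.internal.* keys).
make_client_conf() {
  src_dir="$1"   # existing config dir, e.g. /opt/flink/conf (assumed image layout)
  dst_dir="$2"   # writable target, e.g. /tmp/flink-client-conf (assumption)
  mkdir -p "$dst_dir"
  # Start from the image's configuration if it is present.
  if [ -d "$src_dir" ]; then
    cp -r "$src_dir/." "$dst_dir/"
  fi
  # Append the REST-only SSL settings for the client.
  cat >> "$dst_dir/flink-conf.yaml" <<'EOF'
security.ssl.rest.enabled: true
security.ssl.rest.truststore: /etc/flink-secrets/rest.truststore
security.ssl.rest.truststore-password: rest_truststore_password
EOF
}

# Example use (not run here):
#   make_client_conf /opt/flink/conf /tmp/flink-client-conf
#   FLINK_CONF_DIR=/tmp/flink-client-conf /opt/flink/bin/flink run \
#     -m tls-flink-cluster-1-11-jobmanager:8081 \
#     /opt/flink/examples/batch/WordCount.jar --input /opt/flink/NOTICE
```

Pointing `FLINK_CONF_DIR` at a copy avoids writing to the image's own config directory, which may matter on a read-only file system.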


As for the certificate generation: I'm not sure "myhost.company.org,ip:
127.0.0.1" would work here if the client is accessing the JM via the name 
"tls-flink-cluster-1-11-jobmanager"...I'm not 100% sure here, but I would 
assume there is verification on the actual URL that the certificate is 
supposed to secure. What you were saying when creating it was that the URL is 
either "myhost.company.org" or "127.0.0.1" which is not correct in the non-
local case.
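
If hostname verification is indeed the issue, one option might be to regenerate the REST keystore with the JobManager's in-cluster DNS names in the SAN. The sketch below is only that: `service_san` and `gen_rest_keystore` are hypothetical helpers, the namespace `abp` is taken from the `kubectl create -n abp` call in this thread, and `cluster.local` is assumed to be the cluster domain.

```shell
#!/bin/sh
# Hypothetical helper: build a keytool SAN value from a Kubernetes
# Service name and namespace (cluster domain assumed to be cluster.local).
service_san() {
  # $1 = Service name, $2 = namespace
  printf 'SAN=dns:%s,dns:%s.%s,dns:%s.%s.svc,dns:%s.%s.svc.cluster.local\n' \
    "$1" "$1" "$2" "$1" "$2" "$1" "$2"
}

# Hypothetical wrapper around the keytool call quoted in this thread,
# with the CN and SAN replaced by the Service's DNS names.
gen_rest_keystore() {
  keytool -genkeypair -alias flink.rest -keystore rest.keystore \
    -dname "CN=$1" -ext "$(service_san "$1" "$2")" \
    -storepass rest_keystore_password -keyalg RSA -keysize 4096 \
    -storetype PKCS12
}

# Example call (not run here):
#   gen_rest_keystore tls-flink-cluster-1-11-jobmanager abp
```

The truststore would then need to be re-exported from the new keystore and the Kubernetes secret recreated, since the old certificate no longer matches.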


Just one further note here: Because setting up SSL can be difficult, our 
Ververica Platform (also on the free-to-use community edition) comes with a 
SSL setup [2] that you can enable with a click of a button and it just works 
as expected. Maybe also something to check out (not just for configuring SSL). 
Feel free to contact me personally for more in this regard.


Nico


[1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/monitoring/rest_api.html
[2] https://docs.ververica.com/user_guide/deployments/configure_flink.html#ssl-tls-setup

On Thursday, 27 August 2020 13:36:45 CEST Adam Roberts wrote:
> Hey folks, outside of Kubernetes things are great yep, with the same
> generated files. 
> So to share what I'm doing a little more... and I've modified things to be
> more inline with the current docs 
> keytool -genkeypair -alias flink.internal -keystore internal.keystore -dname
> "CN=flink.internal" -storepass internal_store_password -keyalg RSA -keysize
> 4096 -storetype PKCS12
> keytool -genkeypair -alias flink.rest -keystore rest.keystore -dname
> "CN=myhost.company.org" -ext "SAN=dns:myhost.company.org,ip:127.0.0.1"
> -storepass rest_keystore_password -keyalg RSA -keysize 4096 -storetype PKCS12
> keytool -exportcert -keystore rest.keystore -alias flink.rest -storepass
> rest_keystore_password -file flink.cer
> keytool -importcert -keystore rest.truststore -alias flink.rest -storepass
> rest_truststore_password -file flink.cer -noprompt
>
> kubectl delete secret flink-tls-secret-2
> # Create the simpler secret from main docs for Flink
> cat << EOF | kubectl create -n abp -f -
>   apiVersion: v1
>   kind: Secret
>   type: Opaque
>   metadata:
>     name: flink-tls-secret-2
>   data:
>     rest.keystore: $(cat ./rest.keystore | base64 | tr -d '\n')
>     rest.truststore: $(cat ./rest.truststore | base64 | tr -d '\n')
>     internal.keystore: $(cat ./internal.keystore | base64 | tr -d '\n')
>     internal.truststore: $(cat ./internal.keystore | base64 | tr -d '\n')
> EOF
>  
> I run this script to get flink-tls-secret-2 with those files in, the keytool
> commands should be familiar since they're from the Flink 1.11 security
> docs). 
> Note I don't have a file called internal.truststore but neither do the docs,
> they mention file.truststore but don't tell me how that's made...maybe this
> is the problem? But things are fine with my normal Flink outside of
> Kubernetes set up. 
> The Job CustomResource does:
> 
> apiVersion: batch/v1
> kind: Job
> metadata:
>   name: sample-job
>   labels:
>     app: flink-job
> spec:
>   template:
>     spec:
>       # Run as flink user
>       securityContext:
>         runAsUser: 9999
>         runAsGroup: 9999
>       containers:
>       - name: wordcount
>         # Replace this to be a Docker image with your built Flink app at a known location
>         # Your build of Flink should be based on
>         # https://github.com/apache/flink-docker/tree/master/1.11/scala_2.12-java8-debian
>         # with a modification to the Dockerfile to add your jar in (with a COPY)
>         image: adamroberts/mycoolflink:latest
>         - /opt/flink/bin/flink
>         - run
>         - -D security.ssl.internal.enabled=true
>         - -D security.ssl.rest.enabled=true
>         - -D security.ssl.rest.keystore=/etc/flink-secrets/rest.keystore
>         - -D security.ssl.rest.truststore=/etc/flink-secrets/rest.truststore
>         - -D security.ssl.rest.keystore-password=rest_keystore_password
>         - -D security.ssl.rest.key-password=rest_keystore_password
>         - -D security.ssl.rest.truststore-password=rest_truststore_password
>         - -D security.ssl.internal.keystore=/etc/flink-secrets/internal.keystore
>         - -D security.ssl.internal.truststore=/etc/flink-secrets/internal.keystore
>         - -D security.ssl.internal.keystore-password=internal_store_password
>         - -D security.ssl.internal.key-password=internal_store_password
>         - -D security.ssl.internal.truststore-password=internal_store_password
>         - -m
>         - tls-flink-cluster-1-11-jobmanager:8081
>         - /opt/flink/examples/batch/WordCount.jar 
>         - --input 
>         - /opt/flink/NOTICE
>         volumeMounts:
>           - name: flink-secret-volume
>             mountPath: /etc/flink-secrets
>       volumes:
>       - name: flink-secret-volume
>         secret:
>           secretName: flink-tls-secret-2
>       restartPolicy: Never
>  
> If I modify that to be a simple curl image but keeping the secrets mounted
> in, I can kubectl exec in and curl the JobManager at 
> tls-flink-cluster-1-11-jobmanager:8081 - I get no response, but I get an
> error if I go to a different port or URL. 
> The secrets do look ok inside the container too.
>  
> The Cluster spec looks like this now
>  
> apiVersion: flinkoperator.k8s.io/v1beta1
> kind: FlinkCluster
> metadata:
>   name: tls-flink-cluster-1-11
> spec:
>   jobManager:
>     volumeMounts:
>     - name: flink-secret-volume
>       mountPath: /etc/flink-secrets
>     volumes:
>     - name: flink-secret-volume
>       secret:
>         secretName: flink-tls-secret-2
>     resources:
>       limits:
>         memory: 600Mi
>         cpu: "1.0"
>   taskManager:
>     volumeMounts:
>       - name: flink-secret-volume
>         mountPath: /etc/flink-secrets
>     volumes:
>     - name: flink-secret-volume
>       secret:
>         secretName: flink-tls-secret-2
>     replicas: 1
>     resources:
>       limits:
>         memory: 1Gi
>         cpu: "1.0"
>   image:
>     name: adamroberts/mycoolflink:latest
>   flinkProperties:
>     # https://ci.apache.org/projects/flink/flink-docs-stable/ops/security-ssl.html is helpful for this part.
>     web.submit.enable: "false"
>     security.ssl.rest.enabled: "true"
>     security.ssl.rest.keystore: "/etc/flink-secrets/rest.keystore"
>     security.ssl.rest.truststore: "/etc/flink-secrets/rest.truststore"
>     security.ssl.rest.keystore-password: "rest_keystore_password"
>     security.ssl.rest.key-password: "rest_keystore_password"
>     security.ssl.rest.truststore-password: "rest_truststore_password"
>     security.ssl.internal.enabled: "true"
>     security.ssl.internal.keystore: "/etc/flink-secrets/internal.keystore"
>     security.ssl.internal.truststore: "/etc/flink-secrets/internal.keystore"
>     security.ssl.internal.keystore-password: "internal_store_password"
>     security.ssl.internal.key-password: "internal_store_password"
>     security.ssl.internal.truststore-password: "internal_store_password"
>     taskmanager.numberOfTaskSlots: "1"
>     jobmanager.heap.size: ""                # set empty value (only for Flink version 1.11 or above)
>     jobmanager.memory.process.size: 1gb   # job manager memory limit  (only for Flink version 1.11 or above)
>     taskmanager.heap.size: ""               # set empty value
>     taskmanager.memory.process.size: 1gb    # task manager memory limit
>  
> Cheers,
> ----- Original message -----
> From: Andrey Zagrebin <az...@apache.org>
> To: Adam Roberts <AR...@uk.ibm.com>
> Cc: nkruber@apache.org, user <us...@flink.apache.org>
> Subject: [EXTERNAL] Re: Example flink run with security options? Running on
> k8s in my case
> Date: Wed, Aug 26, 2020 5:35 PM
>  
> Hi Adam,
> 
> maybe also check your SSL setup in a local cluster to exclude possibly
> related k8s things.
> 
> Best,
> Andrey
>  
> On Wed, Aug 26, 2020 at 3:59 PM Adam Roberts <AR...@uk.ibm.com> wrote:
> Hey Nico - thanks for the prompt response, good catch - I've just tried with
> the two security options (enabling rest and internal SSL communications)
> and still hit the same problem 
> I've also tried turning off security (both in my Job definition and in my
> Flink cluster JobManager/TaskManager settings) and the communication does
> happen successfully, suggesting all is well otherwise. 
> With regards to testing with just a regular curl, I switched security back
> on and did the curl, using this: 
> 
> openssl pkcs12 -passin pass:OhQYGhmtYLxWhnMC -in
> /etc/flink-secrets/flink-tls-keystore.key -out rest.pem -nodes
> 
> curl --cacert rest.pem tls-flink-cluster-1-11-jobmanager:8081
> 
> curl --cacert rest.pem --cert rest.pem
> tls-flink-cluster-1-11-jobmanager:8081
> 
>  
> from the Job CR pod, which is who runs the flink run against my JobManager
> i'd like to connect to. 
> That gives 
>  
> 
> $ openssl pkcs12 -passin pass:OhQYGhmtYLxWhnMC -in
> /etc/flink-secrets/flink-tls-keystore.key -out rest.pem -nodes
> 
> curl --cacert rest.pem tls-flink-cluster-1-11-jobmanager:8081
> 
> curl --cacert rest.pem --cert rest.pem
> tls-flink-cluster-1-11-jobmanager:8081
> 139676043637888:error:0D07207B:asn1 encoding routines:ASN1_get_object:header
> too long:../crypto/asn1/asn1_lib.c:101:
> 
> so I wonder if my security set up itself is flawed...I'll be happy to share
> the scripting I have to do that if folks feel it'll be of use, thanks again
> 
> ----- Original message -----
> From: Nico Kruber <nk...@apache.org>
> To: user@flink.apache.org
> Cc: Adam Roberts <AR...@uk.ibm.com>
> Subject: [EXTERNAL] Re: Example flink run with security options? Running on
> k8s in my case
> Date: Wed, Aug 26, 2020 11:40 AM
>  
> Hi Adam,
> the flink binary will pick up any configuration from the flink-conf.yaml of
> its directory. If that is the same as in the cluster, you wouldn't have to
> pass most of your parameters manually. However, if you prefer not having a
> flink-conf.yaml in place, you could remove the security.ssl.internal.*
> parameter from its call since those only affect internal communication.
> 
> If the client's connection to the JM is denied, you would actually have this
> in the JM logs as well which you could check.
> 
> To check whether your whole setup works, I would suggest to try without
> security enabled first and then enable it (just to rule out any other
> issues)
> 
> From the commands you mentioned, it looks like you're just missing
> security.ssl.rest.enabled=true and because of that, the client would not use
> SSL for the connection.
> 
> For more information and setup, I recommend reading through [1] which also
> contains an example at the bottom of the page and how to use curl to test or
> use the REST endpoint.
> 
> 
> Nico
> 
> 
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/security-ssl.html


-- 
Dr. Nico Kruber | Solutions Architect

Follow us @VervericaData Ververica
--
Join Flink Forward - The Apache Flink Conference
Stream Processing | Event Driven | Real Time
--
Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Yip Park Tung Jason, Jinwei (Kevin) Zhang, Karl Anton 
Wehner

RE: Example flink run with security options? Running on k8s in my case

Posted by Adam Roberts <AR...@uk.ibm.com>.
Hey folks, outside of Kubernetes things are great yep, with the same generated
files.



So to share what I'm doing a little more... and I've modified things to be
more in line with the current docs



keytool -genkeypair -alias flink.internal -keystore internal.keystore \
  -dname "CN=flink.internal" -storepass internal_store_password \
  -keyalg RSA -keysize 4096 -storetype PKCS12
keytool -genkeypair -alias flink.rest -keystore rest.keystore \
  -dname "CN=myhost.company.org" -ext "SAN=dns:myhost.company.org,ip:127.0.0.1" \
  -storepass rest_keystore_password -keyalg RSA -keysize 4096 -storetype PKCS12
keytool -exportcert -keystore rest.keystore -alias flink.rest \
  -storepass rest_keystore_password -file flink.cer
keytool -importcert -keystore rest.truststore -alias flink.rest \
  -storepass rest_truststore_password -file flink.cer -noprompt

kubectl delete secret flink-tls-secret-2  
# Create the simpler secret from main docs for Flink  
cat << EOF | kubectl create -n abp -f -  
  apiVersion: v1  
  kind: Secret  
  type: Opaque  
  metadata:  
    name: flink-tls-secret-2  
  data:  
    rest.keystore: $(cat ./rest.keystore | base64 | tr -d '\n')  
    rest.truststore: $(cat ./rest.truststore | base64 | tr -d '\n')  
    internal.keystore: $(cat ./internal.keystore | base64 | tr -d '\n')  
    internal.truststore: $(cat ./internal.keystore | base64 | tr -d '\n')  
EOF



I run this script to get flink-tls-secret-2 with those files in (the keytool
commands should be familiar since they're from the Flink 1.11 security docs).



Note I don't have a file called internal.truststore but neither do the docs,
they mention file.truststore but don't tell me how that's made...maybe this is
the problem? But things are fine with my normal Flink outside of Kubernetes
set up.



The Job CustomResource does:

  
apiVersion: batch/v1  
kind: Job  
metadata:  
  name: sample-job  
  labels:  
    app: flink-job  
spec:  
  template:  
    spec:  
      # Run as flink user  
      securityContext:  
        runAsUser: 9999  
        runAsGroup: 9999  
      containers:  
      - name: wordcount
        # Replace this to be a Docker image with your built Flink app at a known location
        # Your build of Flink should be based on <https://github.com/apache/flink-docker/tree/master/1.11/scala_2.12-java8-debian>
        # with a modification to the Dockerfile to add your jar in (with a COPY)
        image: adamroberts/mycoolflink:latest
        args:
        - /opt/flink/bin/flink
        - run
        - -D security.ssl.internal.enabled=true
        - -D security.ssl.rest.enabled=true
        - -D security.ssl.rest.keystore=/etc/flink-secrets/rest.keystore
        - -D security.ssl.rest.truststore=/etc/flink-secrets/rest.truststore
        - -D security.ssl.rest.keystore-password=rest_keystore_password
        - -D security.ssl.rest.key-password=rest_keystore_password
        - -D security.ssl.rest.truststore-password=rest_truststore_password
        - -D security.ssl.internal.keystore=/etc/flink-secrets/internal.keystore
        - -D security.ssl.internal.truststore=/etc/flink-secrets/internal.keystore
        - -D security.ssl.internal.keystore-password=internal_store_password
        - -D security.ssl.internal.key-password=internal_store_password
        - -D security.ssl.internal.truststore-password=internal_store_password
        - -m
        - tls-flink-cluster-1-11-jobmanager:8081
        - /opt/flink/examples/batch/WordCount.jar
        - --input
        - /opt/flink/NOTICE
        volumeMounts:
          - name: flink-secret-volume
            mountPath: /etc/flink-secrets
      volumes:
      - name: flink-secret-volume
        secret:  
          secretName: flink-tls-secret-2  
      restartPolicy: Never



If I modify that to be a simple curl image but keeping the secrets mounted in,
I can kubectl exec in and curl the JobManager at
tls-flink-cluster-1-11-jobmanager:8081 - I get no response, but I get an error
if I go to a different port or URL.



The secrets do look ok inside the container too.



The Cluster spec looks like this now



apiVersion: flinkoperator.k8s.io/v1beta1  
kind: FlinkCluster  
metadata:  
  name: tls-flink-cluster-1-11  
spec:  
  jobManager:  
    volumeMounts:
    - name: flink-secret-volume
      mountPath: /etc/flink-secrets
    volumes:
    - name: flink-secret-volume
      secret:
        secretName: flink-tls-secret-2
    resources:
      limits:
        memory: 600Mi
        cpu: "1.0"
  taskManager:
    volumeMounts:
      - name: flink-secret-volume
        mountPath: /etc/flink-secrets
    volumes:
    - name: flink-secret-volume
      secret:
        secretName: flink-tls-secret-2
    replicas: 1  
    resources:  
      limits:  
        memory: 1Gi  
        cpu: "1.0"  
  image:  
    name: adamroberts/mycoolflink:latest  
  flinkProperties:  
    # <https://ci.apache.org/projects/flink/flink-docs-stable/ops/security-ssl.html> is helpful for this part.  
    web.submit.enable: "false"

    security.ssl.rest.enabled: "true"

    security.ssl.rest.keystore: "/etc/flink-secrets/rest.keystore"  
    security.ssl.rest.truststore: "/etc/flink-secrets/rest.truststore"  
    security.ssl.rest.keystore-password: "rest_keystore_password"  
    security.ssl.rest.key-password: "rest_keystore_password"  
    security.ssl.rest.truststore-password: "rest_truststore_password"

    security.ssl.internal.enabled: "true"

    security.ssl.internal.keystore: "/etc/flink-secrets/internal.keystore"  
    security.ssl.internal.truststore: "/etc/flink-secrets/internal.keystore"  
    security.ssl.internal.keystore-password: "internal_store_password"  
    security.ssl.internal.key-password: "internal_store_password"  
    security.ssl.internal.truststore-password: "internal_store_password"

    taskmanager.numberOfTaskSlots: "1"  
    jobmanager.heap.size: ""                # set empty value (only for Flink version 1.11 or above)  
    jobmanager.memory.process.size: 1gb   # job manager memory limit  (only for Flink version 1.11 or above)  
    taskmanager.heap.size: ""               # set empty value  
    taskmanager.memory.process.size: 1gb    # task manager memory limit



Cheers,

> \----- Original message -----  
> From: Andrey Zagrebin <az...@apache.org>  
> To: Adam Roberts <AR...@uk.ibm.com>  
> Cc: nkruber@apache.org, user <us...@flink.apache.org>  
> Subject: [EXTERNAL] Re: Example flink run with security options? Running on
k8s in my case  
> Date: Wed, Aug 26, 2020 5:35 PM  
>  
>

> Hi Adam,  
>  
> maybe also check your SSL setup in a local cluster to exclude possibly
related k8s things.  
>  
> Best,  
> Andrey

>

>  
>

> On Wed, Aug 26, 2020 at 3:59 PM Adam Roberts
<[AROBERTS@uk.ibm.com](mailto:AROBERTS@uk.ibm.com)> wrote:

>

>> Hey Nico - thanks for the prompt response, good catch - I've just tried
with the two security options (enabling rest and internal SSL communications)
and still hit the same problem

>>

>>  
>>

>> I've also tried turning off security (both in my Job definition and in my
Flink cluster JobManager/TaskManager settings) and the communication does
happen successfully, suggesting all is well otherwise.

>>

>>  
>>

>> With regards to testing with just a regular curl, I switched security back
on and did the curl, using this:

>>

>>  
>>

>> openssl pkcs12 -passin pass:OhQYGhmtYLxWhnMC -in /etc/flink-secrets/flink-
tls-keystore.key -out rest.pem -nodes

>>

>> curl --cacert rest.pem tls-flink-cluster-1-11-jobmanager:8081

>>

>> curl --cacert rest.pem --cert rest.pem tls-flink-
cluster-1-11-jobmanager:8081

>>

>>  
>>

>> from the Job CR pod, which is who runs the flink run against my JobManager
i'd like to connect to.

>>

>>  
>>

>> That gives

>>

>>  
>>

>> $ openssl pkcs12 -passin pass:OhQYGhmtYLxWhnMC -in /etc/flink-
secrets/flink-tls-keystore.key -out rest.pem -nodes

>>

>> curl --cacert rest.pem tls-flink-cluster-1-11-jobmanager:8081

>>

>> curl --cacert rest.pem --cert rest.pem tls-flink-
cluster-1-11-jobmanager:8081139676043637888:error:0D07207B:asn1 encoding
routines:ASN1_get_object:header too long:../crypto/asn1/asn1_lib.c:101:

>>

>> so I wonder if my security set up itself is flawed...I'll be happy to share
the scripting I have to do that if folks feel it'll be of use, thanks again

>>

>>  
>>

>>> \----- Original message -----  
> From: Nico Kruber <[nkruber@apache.org](mailto:nkruber@apache.org)>  
> To: [user@flink.apache.org](mailto:user@flink.apache.org)  
> Cc: Adam Roberts <[AROBERTS@uk.ibm.com](mailto:AROBERTS@uk.ibm.com)>  
> Subject: [EXTERNAL] Re: Example flink run with security options? Running on
k8s in my case  
> Date: Wed, Aug 26, 2020 11:40 AM  
>  
>>>

>>> Hi Adam,  
> the flink binary will pick up any configuration from the flink-conf.yaml of  
> its directory. If that is the same as in the cluster, you wouldn't have to  
> pass most of your parameters manually. However, if you prefer not having a  
> flink-conf.yaml in place, you could remove the security.ssl.internal.*  
> parameter from its call since those only affect internal communication.  
>  
> If the client's connection to the JM is denied, you would actually have this  
> in the JM logs as well which you could check.  
>  
> To check whether your whole setup works, I would suggest to try without  
> security enabled first and then enable it (just to rule out any other
issues)  
>  
> From the commands you mentioned, it looks like you're just missing  
> security.ssl.rest.enabled=true and because of that, the client would not use  
> SSL for the connection.  
>  
> For more information and setup, I recommend reading through [1] which also  
> contains an example at the bottom of the page and how to use curl to test or  
> use the REST endpoint.  
>  
>  
> Nico  
>  
>  
> [1] <https://ci.apache.org/projects/flink/flink-docs-
release-1.11/ops/security-ssl.html>  
>  



  


Re: Example flink run with security options? Running on k8s in my case

Posted by Andrey Zagrebin <az...@apache.org>.
Hi Adam,

Maybe also check your SSL setup in a local cluster to rule out possible
k8s-related issues.

Best,
Andrey

On Wed, Aug 26, 2020 at 3:59 PM Adam Roberts <AR...@uk.ibm.com> wrote:

> Hey Nico - thanks for the prompt response, good catch - I've just tried
> with the two security options (enabling rest and internal SSL
> communications) and still hit the same problem
>
> I've also tried turning off security (both in my Job definition and in my
> Flink cluster JobManager/TaskManager settings) and the communication does
> happen successfully, suggesting all is well otherwise.
>
> With regards to testing with just a regular curl, I switched security back
> on and did the curl, using this:
>
>
> openssl pkcs12 -passin pass:OhQYGhmtYLxWhnMC -in
> /etc/flink-secrets/flink-tls-keystore.key -out rest.pem -nodes
>
> curl --cacert rest.pem tls-flink-cluster-1-11-jobmanager:8081
>
> curl --cacert rest.pem --cert rest.pem
> tls-flink-cluster-1-11-jobmanager:8081
>
> from the Job CR pod, which is what runs flink run against the JobManager
> I'd like to connect to.
>
> That gives
>
>
> $ openssl pkcs12 -passin pass:OhQYGhmtYLxWhnMC -in
> /etc/flink-secrets/flink-tls-keystore.key -out rest.pem -nodes
>
> curl --cacert rest.pem tls-flink-cluster-1-11-jobmanager:8081
>
> curl --cacert rest.pem --cert rest.pem
> tls-flink-cluster-1-11-jobmanager:8081
>
> 139676043637888:error:0D07207B:asn1 encoding routines:ASN1_get_object:header too long:../crypto/asn1/asn1_lib.c:101:
>
> so I wonder if my security setup itself is flawed. I'll be happy to share
> the scripts I have for that if folks feel they'll be of use. Thanks again.
>
>

RE: Example flink run with security options? Running on k8s in my case

Posted by Adam Roberts <AR...@uk.ibm.com>.
Hey Nico - thanks for the prompt response, good catch - I've just tried with
the two security options (enabling REST and internal SSL communication) and
still hit the same problem.



I've also tried turning off security (both in my Job definition and in my
Flink cluster JobManager/TaskManager settings) and the communication does
happen successfully, suggesting all is well otherwise.



With regard to testing with plain curl, I switched security back on
and ran curl, using this:



openssl pkcs12 -passin pass:OhQYGhmtYLxWhnMC -in /etc/flink-secrets/flink-tls-keystore.key -out rest.pem -nodes

curl --cacert rest.pem tls-flink-cluster-1-11-jobmanager:8081

curl --cacert rest.pem --cert rest.pem tls-flink-cluster-1-11-jobmanager:8081



from the Job CR pod, which is what runs flink run against the JobManager I'd
like to connect to.



That gives



$ openssl pkcs12 -passin pass:OhQYGhmtYLxWhnMC -in /etc/flink-secrets/flink-tls-keystore.key -out rest.pem -nodes

curl --cacert rest.pem tls-flink-cluster-1-11-jobmanager:8081

curl --cacert rest.pem --cert rest.pem tls-flink-cluster-1-11-jobmanager:8081

139676043637888:error:0D07207B:asn1 encoding routines:ASN1_get_object:header too long:../crypto/asn1/asn1_lib.c:101:

so I wonder if my security setup itself is flawed. I'll be happy to share
the scripts I have for that if folks feel they'll be of use. Thanks again.



> ----- Original message -----
> From: Nico Kruber <nk...@apache.org>
> To: user@flink.apache.org
> Cc: Adam Roberts <AR...@uk.ibm.com>
> Subject: [EXTERNAL] Re: Example flink run with security options? Running on k8s in my case
> Date: Wed, Aug 26, 2020 11:40 AM
>
> Hi Adam,  
> the flink binary will pick up any configuration from the flink-conf.yaml of  
> its directory. If that is the same as in the cluster, you wouldn't have to  
> pass most of your parameters manually. However, if you prefer not having a  
> flink-conf.yaml in place, you could remove the security.ssl.internal.*  
> parameter from its call since those only affect internal communication.  
>  
> If the client's connection to the JM is denied, you would actually have this  
> in the JM logs as well which you could check.  
>  
> To check whether your whole setup works, I would suggest trying without
> security enabled first and then enabling it (just to rule out any other issues).
>  
> From the commands you mentioned, it looks like you're just missing  
> security.ssl.rest.enabled=true and because of that, the client would not use  
> SSL for the connection.  
>  
> For more information and setup, I recommend reading through [1] which also  
> contains an example at the bottom of the page and how to use curl to test or  
> use the REST endpoint.  
>  
>  
> Nico  
>  
>  
> [1] <https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/security-ssl.html>
>  



  


Re: Example flink run with security options? Running on k8s in my case

Posted by Nico Kruber <nk...@apache.org>.
Hi Adam,
the flink binary will pick up any configuration from the flink-conf.yaml of 
its directory. If that is the same as in the cluster, you wouldn't have to 
pass most of your parameters manually. However, if you prefer not having a 
flink-conf.yaml in place, you could remove the security.ssl.internal.* 
parameter from its call since those only affect internal communication.
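
As an illustration of the flink-conf.yaml route described above, here is a minimal sketch of the REST SSL entries a client-side flink-conf.yaml might carry. The key names are the ones documented for Flink 1.11 SSL setup; the paths are taken from the original post, the passwords are placeholders, and whether the client needs the keystore entries at all depends on whether mutual authentication is enabled on the cluster, which the thread does not state.

```yaml
# Sketch only: client-side flink-conf.yaml entries for an SSL-enabled REST endpoint.
# Paths come from the original post; passwords are placeholders, not real values.
security.ssl.rest.enabled: true
security.ssl.rest.truststore: /etc/flink-secrets/flink-tls-ca-truststore.jks
security.ssl.rest.truststore-password: thepass
# Assumption: only needed if the cluster enforces mutual (client certificate) authentication.
security.ssl.rest.keystore: /etc/flink-secrets/flink-tls-keystore.key
security.ssl.rest.keystore-password: thepass
security.ssl.rest.key-password: thepass
```

With these entries in place, the security.ssl.internal.* options can be left out on the client side, since they only affect communication inside the cluster.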

If the client's connection to the JM is denied, you would actually have this 
in the JM logs as well which you could check.

To check whether your whole setup works, I would suggest trying without 
security enabled first and then enabling it (just to rule out any other issues).

From the commands you mentioned, it looks like you're just missing 
security.ssl.rest.enabled=true and because of that, the client would not use 
SSL for the connection.
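
For reference, a hedged sketch of how the args list from the original post might look with that option added and the security.ssl.internal.* entries dropped. It reuses the paths, service name and placeholder passwords from the original post. It also assumes that the flink client honours -D properties when combined with -m, which is worth verifying for the Flink version in use; putting the same keys in the client's flink-conf.yaml avoids that question.

```yaml
# Sketch only: container args for the submitting pod, REST SSL enabled on the client.
# Assumes "flink run" applies the -D properties in this mode (not confirmed in the thread).
args:
- /opt/flink/bin/flink
- run
- -D
- security.ssl.rest.enabled=true
- -D
- security.ssl.rest.keystore=/etc/flink-secrets/flink-tls-keystore.key
- -D
- security.ssl.rest.keystore-password=thepass # placeholder
- -D
- security.ssl.rest.key-password=thepass # placeholder
- -D
- security.ssl.rest.truststore=/etc/flink-secrets/flink-tls-ca-truststore.jks
- -D
- security.ssl.rest.truststore-password=thepass # placeholder
- -m
- tls-flink-cluster-1-11-jobmanager:8081
- /opt/flink/examples/batch/WordCount.jar
- --input
- /opt/flink/NOTICE
```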

For more information and setup, I recommend reading through [1] which also 
contains an example at the bottom of the page and how to use curl to test or 
use the REST endpoint.


Nico


[1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/security-ssl.html

On Tuesday, 25 August 2020 14:40:04 CEST Adam Roberts wrote:
> Hey everyone, I've been experimenting with Flink
> using https://github.com/GoogleCloudPlatform/flink-on-k8s-operator and I
> believe I've successfully deployed a JobManager and TaskManager with
> security enabled, and a self-signed certificate (the pods come up great). 
> However, I can't do much with this - I can't port-forward and access the UI,
> nor can I submit jobs to it by running another pod and using the DNS name
> lookup of the service. 
> I always get
>  
> The program finished with the following exception:
> org.apache.flink.client.program.ProgramInvocationException: The main method
> caused an error: java.util.concurrent.ExecutionException:
> org.apache.flink.runtime.client.JobSubmissionException: Failed to submit
> JobGraph. 
> ...
>  
> Caused by: org.apache.flink.runtime.rest.ConnectionClosedException: Channel
> became inactive. ... 37 more
>  
>  
> and this is even with all of the -D security options provided.
>  
> The versions of Flink are the same for both my Job and my FlinkCluster
> (1.11.1). 
> Is this a sensible thing to do? If I weren't using the operator for example,
> would users be expected to flink run with all of these options? 
> Does anything look odd here? My guess is because security's on, the Job
> Manager refuses to talk to my submitter. 
> Running as the flink user in the container, I do
>  
> 
>       securityContext:
>         runAsUser: 9999
>         runAsGroup: 9999
>       containers:
>       - name: wordcount
>         image: adamroberts/mycoolflink:latest
>         args:
>         - /opt/flink/bin/flink
>         - run
>         - -D
>         - security.ssl.rest.keystore=/etc/flink-secrets/flink-tls-keystore.key
>         - -D
>         - security.ssl.rest.truststore=/etc/flink-secrets/flink-tls-ca-truststore.jks
>         - -D
>         - security.ssl.rest.keystore-password=thepass # Replace with value of flink-tls-keystore.password
>         - -D
>         - security.ssl.rest.key-password=thepass # Replace with value of tls.p12.password
>         - -D
>         - security.ssl.rest.truststore-password=thepass # Replace with value of flink-tls-ca.truststore.password
>         - -D
>         - security.ssl.internal.keystore=/etc/flink-secrets/flink-tls-keystore.key
>         - -D
>         - security.ssl.internal.truststore=/etc/flink-secrets/flink-tls-ca-truststore.jks
>         - -D
>         - security.ssl.internal.keystore-password=thepass # Replace with value of flink-tls-keystore.password
>         - -D
>         - security.ssl.internal.key-password=thepass # Replace with value of flink-tls-keystore.password
>         - -D
>         - security.ssl.internal.truststore-password=thepass # Replace with value of flink-tls-truststore.password
>         - -m
>         - tls-flink-cluster-1-11-jobmanager:8081
>         - /opt/flink/examples/batch/WordCount.jar
>         - --input
>         - /opt/flink/NOTICE
> 
>  
> with the secrets mounted in at the above location (if I exec into my
> container, I can see they're all there OK). Note that it is a read-only
> file system. 
> adamroberts/mycoolflink (at this time of this email) is just based
> on https://github.com/apache/flink-docker. 
> Thanks!
>  
> Unless stated otherwise above:
> IBM United Kingdom Limited - Registered in England and Wales with number
> 741598. Registered office: PO Box 41, North Harbour, Portsmouth, Hampshire
> PO6 3AU