Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/04/13 08:44:54 UTC

[GitHub] [flink] zhengcanbin opened a new pull request #11715: [FLINK-16598][k8s] Respect the rest-port exposed by the external Service when retrieving Endpoint

zhengcanbin opened a new pull request #11715: [FLINK-16598][k8s] Respect the rest-port exposed by the external Service when retrieving Endpoint
URL: https://github.com/apache/flink/pull/11715
 
 
   ## What is the purpose of the change
   
   Currently, when a user submits a job to an existing native Kubernetes session cluster, we read `rest.port` from the Flink `Configuration`. This is wrong because it ignores the actual port exposed by the external Service that was created when the session cluster was deployed. This PR fixes the problem.
   
   
   ## Brief change log
   
     - *Refactor the rest-port retrieval logic in `Fabric8FlinkKubeClient#getRestEndpoint`*
     - *Introduce a new test class `KubernetesClientTestBase` that provides helper utilities for the Service*
   
   
   ## Verifying this change
   
   This change added unit tests and can be verified as follows:
   
     - *Start a native Kubernetes session cluster; by default, the rest port exposed by the rest Service is 8081*
     - *Then submit a job to the session cluster, but specify a different rest port via `-Drest.port=8082`*
     - *As expected, the job is successfully submitted to the session cluster*
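
The scenario in the steps above can be modelled without a cluster. The following is a minimal sketch, assuming a simplified `PortEntry` type and two hypothetical helpers, `lookupByConfiguredPort` and `lookupByName`; none of these are Flink or fabric8 APIs. The first helper mirrors the old approach of matching the Service port against the client-side `rest.port`, the second looks the port up by its name. What the old code did when nothing matched is not modelled from the source, so the sketch represents that case as an empty `Optional`, and the port name `"rest"` is an assumed value.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

final class RestPortScenario {

    // Hypothetical, simplified view of one port entry of a Kubernetes Service.
    static final class PortEntry {
        final String name;
        final int port;

        PortEntry(String name, int port) {
            this.name = name;
            this.port = port;
        }
    }

    // Old approach (as modelled here): trust the client-side rest.port value.
    static Optional<Integer> lookupByConfiguredPort(List<PortEntry> ports, int configuredRestPort) {
        return ports.stream()
            .filter(p -> p.port == configuredRestPort)
            .map(p -> p.port)
            .findFirst();
    }

    // New approach: ignore the client-side value and read the port by its name.
    static Optional<Integer> lookupByName(List<PortEntry> ports, String restPortName) {
        return ports.stream()
            .filter(p -> restPortName.equals(p.name))
            .map(p -> p.port)
            .findFirst();
    }

    public static void main(String[] args) {
        // The session cluster was deployed with the default rest port 8081.
        List<PortEntry> servicePorts = Arrays.asList(new PortEntry("rest", 8081));
        // The job is later submitted with -Drest.port=8082.
        int clientRestPort = 8082;

        System.out.println(lookupByConfiguredPort(servicePorts, clientRestPort)); // Optional.empty
        System.out.println(lookupByName(servicePorts, "rest"));                   // Optional[8081]
    }
}
```

The name-based lookup does not depend on any client-side option, which is why a mismatched `-Drest.port` no longer affects it in this model.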
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): (no)
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
     - The serializers: (no)
     - The runtime per-record code paths (performance sensitive): (no)
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: (no)
     - The S3 file system connector: (no)
   
   ## Documentation
   
     - Does this pull request introduce a new feature? (no)
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [flink] flinkbot commented on issue #11715: [FLINK-16598][k8s] Respect the rest-port exposed by the external Service when retrieving Endpoint

Posted by GitBox <gi...@apache.org>.
flinkbot commented on issue #11715: [FLINK-16598][k8s] Respect the rest-port exposed by the external Service when retrieving Endpoint
URL: https://github.com/apache/flink/pull/11715#issuecomment-612818039
 
 
   ## CI report:
   
   * 0d42d620980962950546b1c7090ed88e67a179c5 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


[GitHub] [flink] wangyang0918 commented on a change in pull request #11715: [FLINK-16598][k8s] Respect the rest-port exposed by the external Service when retrieving Endpoint

Posted by GitBox <gi...@apache.org>.
wangyang0918 commented on a change in pull request #11715: [FLINK-16598][k8s] Respect the rest-port exposed by the external Service when retrieving Endpoint
URL: https://github.com/apache/flink/pull/11715#discussion_r409308841
 
 

 ##########
 File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClient.java
 ##########
 @@ -264,17 +263,32 @@ private void setOwnerReference(Deployment deployment, List<HasMetadata> resource
 	}
 
 	/**
-	 * To get nodePort of configured ports.
+	 * Get rest port from the external Service.
 	 */
-	private int getServiceNodePort(Service service, ConfigOption<Integer> configPort) {
-		final int port = this.flinkConfig.getInteger(configPort);
-		if (service.getSpec() != null && service.getSpec().getPorts() != null) {
-			for (ServicePort p : service.getSpec().getPorts()) {
-				if (p.getPort() == port) {
-					return p.getNodePort();
-				}
-			}
+	private int getRestPortFromExternalService(Service externalService) {
+		final List<ServicePort> servicePortCandidates = externalService.getSpec().getPorts()
+			.stream()
+			.filter(x -> x.getName().equals(Constants.REST_PORT_NAME))
+			.collect(Collectors.toList());
+
+		if (servicePortCandidates.isEmpty()) {
+			throw new RuntimeException("Failed to find port \"" + Constants.REST_PORT_NAME + "\" in Service \"" +
+				KubernetesUtils.getRestServiceName(this.clusterId) + "\"");
+		}
+
+		final ServicePort externalServicePort = servicePortCandidates.get(0);
+
+		final KubernetesConfigOptions.ServiceExposedType externalServiceType =
+			KubernetesConfigOptions.ServiceExposedType.valueOf(externalService.getSpec().getType());
+
+		switch (externalServiceType) {
+			case ClusterIP:
+			case LoadBalancer:
 
 Review comment:
   I am not sure whether you really want to completely separate `LoadBalancer` and `NodePort`. If so, we should wait for the load balancer to become ready and return `EXTERNAL-IP:8081` as the JobManager URL. Otherwise, the Flink client will time out and should clean up the cluster resources.
   
   Then, for an unmanaged K8s cluster without a load balancer, we would force users to set `kubernetes.rest-service.exposed.type=NodePort` explicitly, rather than returning a confusing JobManager URL such as `MASTER_ADDRESS:8081`. The submission would also always fail with a timeout.
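
To make the distinction discussed here concrete, the following is a minimal sketch of how the port could be chosen per Service type. The diff above is cut off after `case LoadBalancer:`, so the remaining branches are an assumption based on general Kubernetes semantics (a `NodePort` Service is reached on the node port, the other types on the Service port). `ExposedType`, `PortEntry`, and `restPortFor` are hypothetical names, not Flink APIs, and the port values are illustrative.

```java
final class ExposedPortSketch {

    // Mirrors the three Service types named in the discussion.
    enum ExposedType { ClusterIP, NodePort, LoadBalancer }

    // Hypothetical stand-in for the rest entry of a Service's port list.
    static final class PortEntry {
        final int port;      // port exposed by the Service itself
        final int nodePort;  // port opened on every node

        PortEntry(int port, int nodePort) {
            this.port = port;
            this.nodePort = nodePort;
        }
    }

    // Picks the port a client would have to connect to for the given Service type.
    static int restPortFor(ExposedType type, PortEntry rest) {
        switch (type) {
            case ClusterIP:
            case LoadBalancer:
                return rest.port;
            case NodePort:
                return rest.nodePort;
            default:
                throw new IllegalArgumentException("Unsupported Service type: " + type);
        }
    }

    public static void main(String[] args) {
        PortEntry rest = new PortEntry(8081, 30081);
        System.out.println(restPortFor(ExposedType.LoadBalancer, rest)); // 8081
        System.out.println(restPortFor(ExposedType.NodePort, rest));     // 30081
    }
}
```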


[GitHub] [flink] wangyang0918 commented on a change in pull request #11715: [FLINK-16598][k8s] Respect the rest-port exposed by the external Service when retrieving Endpoint

Posted by GitBox <gi...@apache.org>.
wangyang0918 commented on a change in pull request #11715: [FLINK-16598][k8s] Respect the rest-port exposed by the external Service when retrieving Endpoint
URL: https://github.com/apache/flink/pull/11715#discussion_r409297786
 
 

 ##########
 File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClient.java
 ##########
 
 Review comment:
   The Azure pipeline passed only because we have disabled the K8s-related e2e tests. If you run this in an unmanaged K8s cluster without load balancer support, it will not work as it stands.


[GitHub] [flink] zhengcanbin commented on issue #11715: [FLINK-16598][k8s] Respect the rest-port exposed by the external Service when retrieving Endpoint

Posted by GitBox <gi...@apache.org>.
zhengcanbin commented on issue #11715: [FLINK-16598][k8s] Respect the rest-port exposed by the external Service when retrieving Endpoint
URL: https://github.com/apache/flink/pull/11715#issuecomment-614379194
 
 
   > Thanks for your contribution @zhengcanbin !
   > 
   > Is there a test that fails without this patch and passes with this patch so that we guard the behaviour?
   
   Thanks, @TisonKun! `Fabric8FlinkKubeClientTest#testServiceLoadBalancerWithNoIP` and `Fabric8FlinkKubeClientTest#testClusterIPService` are exactly the cases that cover this branch.


[GitHub] [flink] wangyang0918 commented on a change in pull request #11715: [FLINK-16598][k8s] Respect the rest-port exposed by the external Service when retrieving Endpoint

Posted by GitBox <gi...@apache.org>.
wangyang0918 commented on a change in pull request #11715: [FLINK-16598][k8s] Respect the rest-port exposed by the external Service when retrieving Endpoint
URL: https://github.com/apache/flink/pull/11715#discussion_r409364554
 
 

 ##########
 File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClient.java
 ##########
 
 Review comment:
   Glad to hear that you will create a new ticket to rework `getRestEndpoint` according to `LoadBalancer` or `NodePort`. This change makes sense to me.
   
   Currently, the problem in an unmanaged K8s cluster without a load balancer is that the Flink client prints a wrong JobManager URL of the form `K8s_Master:8081`; it should be `K8s_Master:NodePort`. I think this could be fixed in your upcoming tickets.
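
The two URL shapes contrasted here can be illustrated with a small sketch. It assumes a hypothetical `endpointFor` helper that falls back to the master address and node port whenever the Service has not been assigned an external address. The fallback rule, the names, and the addresses are illustrative assumptions rather than the behaviour of `getRestEndpoint`.

```java
import java.util.Optional;

final class EndpointSketch {

    // externalAddress is empty when no load balancer has assigned an address to the Service.
    static String endpointFor(Optional<String> externalAddress, String masterAddress,
                              int servicePort, int nodePort) {
        return externalAddress
            .map(address -> address + ":" + servicePort)
            .orElseGet(() -> masterAddress + ":" + nodePort);
    }

    public static void main(String[] args) {
        // Cluster with a load balancer: an external address has been assigned.
        System.out.println(endpointFor(Optional.of("203.0.113.10"), "k8s-master", 8081, 30081));
        // Unmanaged cluster without a load balancer: use the master address and the node port.
        System.out.println(endpointFor(Optional.empty(), "k8s-master", 8081, 30081));
    }
}
```

In this model the second call yields `k8s-master:30081` rather than `k8s-master:8081`, which is the shape asked for in the comment.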


[GitHub] [flink] zhengcanbin removed a comment on issue #11715: [FLINK-16598][k8s] Respect the rest-port exposed by the external Service when retrieving Endpoint

Posted by GitBox <gi...@apache.org>.
zhengcanbin removed a comment on issue #11715: [FLINK-16598][k8s] Respect the rest-port exposed by the external Service when retrieving Endpoint
URL: https://github.com/apache/flink/pull/11715#issuecomment-614359006
 
 
   > Thanks for your contribution @zhengcanbin !
   > 
   > Is there a test that fails without this patch and passes with this patch so that we guard the behaviour?
   
   Thanks, @TisonKun! Sorry for the late reply. We have a case described in [FLINK-16598](https://issues.apache.org/jira/browse/FLINK-16598).


[GitHub] [flink] flinkbot edited a comment on issue #11715: [FLINK-16598][k8s] Respect the rest-port exposed by the external Service when retrieving Endpoint

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on issue #11715: [FLINK-16598][k8s] Respect the rest-port exposed by the external Service when retrieving Endpoint
URL: https://github.com/apache/flink/pull/11715#issuecomment-612818039
 
 
   ## CI report:
   
   * 0d42d620980962950546b1c7090ed88e67a179c5 Travis: [CANCELED](https://travis-ci.com/github/flink-ci/flink/builds/160027473) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7392) 
   * a8ae7d9df47cc47da6abf9f91321b665213a1482 Travis: [PENDING](https://travis-ci.com/github/flink-ci/flink/builds/160035358) Azure: [PENDING](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7396) 
   

[GitHub] [flink] flinkbot edited a comment on issue #11715: [FLINK-16598][k8s] Respect the rest-port exposed by the external Service when retrieving Endpoint

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on issue #11715: [FLINK-16598][k8s] Respect the rest-port exposed by the external Service when retrieving Endpoint
URL: https://github.com/apache/flink/pull/11715#issuecomment-612818039
 
 
   ## CI report:
   
   * a8ae7d9df47cc47da6abf9f91321b665213a1482 Travis: [SUCCESS](https://travis-ci.com/github/flink-ci/flink/builds/160035358) Azure: [SUCCESS](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7396) 
   

[GitHub] [flink] flinkbot edited a comment on issue #11715: [FLINK-16598][k8s] Respect the rest-port exposed by the external Service when retrieving Endpoint

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on issue #11715: [FLINK-16598][k8s] Respect the rest-port exposed by the external Service when retrieving Endpoint
URL: https://github.com/apache/flink/pull/11715#issuecomment-612818039
 
 
   ## CI report:
   
   * a8ae7d9df47cc47da6abf9f91321b665213a1482 Travis: [SUCCESS](https://travis-ci.com/github/flink-ci/flink/builds/160035358) Azure: [PENDING](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7396) 
   

[GitHub] [flink] flinkbot edited a comment on issue #11715: [FLINK-16598][k8s] Respect the rest-port exposed by the external Service when retrieving Endpoint

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on issue #11715: [FLINK-16598][k8s] Respect the rest-port exposed by the external Service when retrieving Endpoint
URL: https://github.com/apache/flink/pull/11715#issuecomment-612818039
 
 
   ## CI report:
   
   * 0d42d620980962950546b1c7090ed88e67a179c5 Travis: [FAILURE](https://travis-ci.com/github/flink-ci/flink/builds/160027473) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7392) 
   

[GitHub] [flink] flinkbot commented on issue #11715: [FLINK-16598][k8s] Respect the rest-port exposed by the external Service when retrieving Endpoint

Posted by GitBox <gi...@apache.org>.
flinkbot commented on issue #11715: [FLINK-16598][k8s] Respect the rest-port exposed by the external Service when retrieving Endpoint
URL: https://github.com/apache/flink/pull/11715#issuecomment-612812558
 
 
   Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
   to review your pull request. We will use this comment to track the progress of the review.
   
   
   ## Automated Checks
   Last check on commit 0d42d620980962950546b1c7090ed88e67a179c5 (Mon Apr 13 08:48:12 UTC 2020)
   
   **Warnings:**
    * No documentation files were touched! Remember to keep the Flink docs up to date!
   
   
   <sub>Mention the bot in a comment to re-run the automated checks.</sub>
   ## Review Progress
   
   * ❓ 1. The [description] looks good.
   * ❓ 2. There is [consensus] that the contribution should go into Flink.
   * ❓ 3. Needs [attention] from.
   * ❓ 4. The change fits into the overall [architecture].
   * ❓ 5. Overall code [quality] is good.
   
   Please see the [Pull Request Review Guide](https://flink.apache.org/contributing/reviewing-prs.html) for a full explanation of the review process.<details>
    The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot approve description` to approve one or more aspects (aspects: `description`, `consensus`, `architecture` and `quality`)
    - `@flinkbot approve all` to approve all aspects
    - `@flinkbot approve-until architecture` to approve everything until `architecture`
    - `@flinkbot attention @username1 [@username2 ..]` to require somebody's attention
    - `@flinkbot disapprove architecture` to remove an approval you gave earlier
   </details>


[GitHub] [flink] TisonKun closed pull request #11715: [FLINK-16598][k8s] Respect the rest-port exposed by the external Service when retrieving Endpoint

Posted by GitBox <gi...@apache.org>.
TisonKun closed pull request #11715: [FLINK-16598][k8s] Respect the rest-port exposed by the external Service when retrieving Endpoint
URL: https://github.com/apache/flink/pull/11715
 
 
   


[GitHub] [flink] zhengcanbin commented on a change in pull request #11715: [FLINK-16598][k8s] Respect the rest-port exposed by the external Service when retrieving Endpoint

Posted by GitBox <gi...@apache.org>.
zhengcanbin commented on a change in pull request #11715: [FLINK-16598][k8s] Respect the rest-port exposed by the external Service when retrieving Endpoint
URL: https://github.com/apache/flink/pull/11715#discussion_r409334611
 
 

 ##########
 File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClient.java
 ##########
 
 Review comment:
   > However, in such case, we should wait for the load balancer ready and return the `EXTERNAL-IP:8081` as JobManager url. Otherwise, the Flink client will timeout and should clean-up the cluster resources.
   
   Currently, we clean up the K8s resources when errors happen during the creation of the `Deployment`, `ConfigMap`, and `Service`. If we fail to retrieve the Endpoint, we throw an Exception instead of cleaning up all the resources, which I think is reasonable since the external Service is a bypass to some extent.
   
   Regarding whether the `Service` is ready, I previously left a question at https://github.com/apache/flink/pull/11233. I will file another ticket for further discussion and a fix.
   
   > Then for unmanaged K8s cluster without load balancer, we will enforce the users to set kubernetes.rest-service.exposed.type=NodePort explicitly, rather than return a confusing JobManager url MASTER_ADDRESS:8081. Also the submission will always fail with timeout.
   
   Sorry, I don't understand the core problem here. It would be helpful if you could provide more information.
   
   Lastly, I was planning to rework the implementation of `Fabric8FlinkKubeClient#getRestEndpoint` to clearly separate the functionality of the `NodePort` and `LB` Services, and meanwhile fix some bugs in that method.
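
As a starting point for the follow-up discussion on Service readiness, the following is a minimal sketch of a bounded polling loop. It assumes a hypothetical `awaitAddress` helper and a caller-supplied lookup that returns the external address once one has been assigned; the attempt limit and the sleep interval are illustrative parameters, and nothing here reflects how Flink actually waits for a Service.

```java
import java.util.Optional;
import java.util.function.Supplier;

final class ServiceReadinessSketch {

    // Calls addressLookup up to maxAttempts times, sleeping pollMillis between attempts.
    // Returns the first address found, or empty if none appeared within the attempts.
    static Optional<String> awaitAddress(Supplier<Optional<String>> addressLookup,
                                         int maxAttempts, long pollMillis) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            Optional<String> address = addressLookup.get();
            if (address.isPresent()) {
                return address;
            }
            if (attempt < maxAttempts) {
                try {
                    Thread.sleep(pollMillis);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // preserve the interrupt for the caller
                    return Optional.empty();
                }
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Simulated lookup: the address only becomes available on the third call.
        Supplier<Optional<String>> lookup = () ->
            ++calls[0] < 3 ? Optional.<String>empty() : Optional.of("203.0.113.10");

        System.out.println(awaitAddress(lookup, 5, 10L)); // Optional[203.0.113.10]
        System.out.println(calls[0]);                     // 3
    }
}
```

On an empty result the caller would still have to decide between failing the submission and cleaning up resources, which is the open question in this thread.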

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [flink] flinkbot edited a comment on issue #11715: [FLINK-16598][k8s] Respect the rest-port exposed by the external Service when retrieving Endpoint

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on issue #11715: [FLINK-16598][k8s] Respect the rest-port exposed by the external Service when retrieving Endpoint
URL: https://github.com/apache/flink/pull/11715#issuecomment-612818039
 
 
   ## CI report:
   
   * 0d42d620980962950546b1c7090ed88e67a179c5 Travis: [PENDING](https://travis-ci.com/github/flink-ci/flink/builds/160027473) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7392) 
   * a8ae7d9df47cc47da6abf9f91321b665213a1482 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>


[GitHub] [flink] zhengcanbin commented on a change in pull request #11715: [FLINK-16598][k8s] Respect the rest-port exposed by the external Service when retrieving Endpoint

Posted by GitBox <gi...@apache.org>.
zhengcanbin commented on a change in pull request #11715: [FLINK-16598][k8s] Respect the rest-port exposed by the external Service when retrieving Endpoint
URL: https://github.com/apache/flink/pull/11715#discussion_r409299161
 
 

 ##########
 File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClient.java
 ##########
 @@ -264,17 +263,32 @@ private void setOwnerReference(Deployment deployment, List<HasMetadata> resource
 	}
 
 	/**
-	 * To get nodePort of configured ports.
+	 * Get rest port from the external Service.
 	 */
-	private int getServiceNodePort(Service service, ConfigOption<Integer> configPort) {
-		final int port = this.flinkConfig.getInteger(configPort);
-		if (service.getSpec() != null && service.getSpec().getPorts() != null) {
-			for (ServicePort p : service.getSpec().getPorts()) {
-				if (p.getPort() == port) {
-					return p.getNodePort();
-				}
-			}
+	private int getRestPortFromExternalService(Service externalService) {
+		final List<ServicePort> servicePortCandidates = externalService.getSpec().getPorts()
+			.stream()
+			.filter(x -> x.getName().equals(Constants.REST_PORT_NAME))
+			.collect(Collectors.toList());
+
+		if (servicePortCandidates.isEmpty()) {
+			throw new RuntimeException("Failed to find port \"" + Constants.REST_PORT_NAME + "\" in Service \"" +
+				KubernetesUtils.getRestServiceName(this.clusterId) + "\"");
+		}
+
+		final ServicePort externalServicePort = servicePortCandidates.get(0);
+
+		final KubernetesConfigOptions.ServiceExposedType externalServiceType =
+			KubernetesConfigOptions.ServiceExposedType.valueOf(externalService.getSpec().getType());
+
+		switch (externalServiceType) {
+			case ClusterIP:
+			case LoadBalancer:
 
 Review comment:
   If the cluster does not provide the LoadBalancer, I think we should throw an Exception instead of returning the NodePort URL. Can we separate the Service types more clearly?


[GitHub] [flink] zhengcanbin commented on issue #11715: [FLINK-16598][k8s] Respect the rest-port exposed by the external Service when retrieving Endpoint

Posted by GitBox <gi...@apache.org>.
zhengcanbin commented on issue #11715: [FLINK-16598][k8s] Respect the rest-port exposed by the external Service when retrieving Endpoint
URL: https://github.com/apache/flink/pull/11715#issuecomment-614359006
 
 
   > Thanks for your contribution @zhengcanbin !
   > 
   > Is there a test that fails without this patch and passes with this patch so that we guard the behaviour?
   
   Thanks, @TisonKun! Sorry for the late reply. We have a case described in [FLINK-16598](https://issues.apache.org/jira/browse/FLINK-16598).


[GitHub] [flink] zhengcanbin commented on issue #11715: [FLINK-16598][k8s] Respect the rest-port exposed by the external Service when retrieving Endpoint

Posted by GitBox <gi...@apache.org>.
zhengcanbin commented on issue #11715: [FLINK-16598][k8s] Respect the rest-port exposed by the external Service when retrieving Endpoint
URL: https://github.com/apache/flink/pull/11715#issuecomment-612843401
 
 
   @flinkbot run azure


[GitHub] [flink] wangyang0918 commented on a change in pull request #11715: [FLINK-16598][k8s] Respect the rest-port exposed by the external Service when retrieving Endpoint

Posted by GitBox <gi...@apache.org>.
wangyang0918 commented on a change in pull request #11715: [FLINK-16598][k8s] Respect the rest-port exposed by the external Service when retrieving Endpoint
URL: https://github.com/apache/flink/pull/11715#discussion_r409308841
 
 

 ##########
 File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClient.java
 ##########
 @@ -264,17 +263,32 @@ private void setOwnerReference(Deployment deployment, List<HasMetadata> resource
 	}
 
 	/**
-	 * To get nodePort of configured ports.
+	 * Get rest port from the external Service.
 	 */
-	private int getServiceNodePort(Service service, ConfigOption<Integer> configPort) {
-		final int port = this.flinkConfig.getInteger(configPort);
-		if (service.getSpec() != null && service.getSpec().getPorts() != null) {
-			for (ServicePort p : service.getSpec().getPorts()) {
-				if (p.getPort() == port) {
-					return p.getNodePort();
-				}
-			}
+	private int getRestPortFromExternalService(Service externalService) {
+		final List<ServicePort> servicePortCandidates = externalService.getSpec().getPorts()
+			.stream()
+			.filter(x -> x.getName().equals(Constants.REST_PORT_NAME))
+			.collect(Collectors.toList());
+
+		if (servicePortCandidates.isEmpty()) {
+			throw new RuntimeException("Failed to find port \"" + Constants.REST_PORT_NAME + "\" in Service \"" +
+				KubernetesUtils.getRestServiceName(this.clusterId) + "\"");
+		}
+
+		final ServicePort externalServicePort = servicePortCandidates.get(0);
+
+		final KubernetesConfigOptions.ServiceExposedType externalServiceType =
+			KubernetesConfigOptions.ServiceExposedType.valueOf(externalService.getSpec().getType());
+
+		switch (externalServiceType) {
+			case ClusterIP:
+			case LoadBalancer:
 
 Review comment:
   I am not sure whether you really want to completely separate `LoadBalancer` and `NodePort`. However, in that case, we should wait for the load balancer to be ready and return the `EXTERNAL-IP:8081` as JobManager url. Otherwise, the Flink client will time out and should clean up the cluster resources.
   
   Then for unmanaged K8s clusters without a load balancer, we will force users to set `kubernetes.rest-service.exposed.type=NodePort` explicitly, rather than return a confusing JobManager url `MASTER_ADDRESS:8081`. Also, the submission will always fail with a timeout.
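
To make the "wait for the load balancer or time out" idea concrete, here is a minimal sketch in plain Java. It is not Flink or fabric8 code: `LoadBalancerWaitSketch`, `awaitAddress`, and the `lookup` supplier are hypothetical names, and a real client would presumably re-read the Service status from the API server inside the supplier.

```java
import java.time.Duration;
import java.util.Optional;
import java.util.function.Supplier;

public class LoadBalancerWaitSketch {

    /**
     * Polls a (hypothetical) address lookup until it yields a value or the timeout elapses.
     * Throws IllegalStateException on timeout so the caller can fail fast and clean up.
     */
    public static String awaitAddress(
            Supplier<Optional<String>> lookup, Duration timeout, Duration interval) {
        final long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            final Optional<String> address = lookup.get();
            if (address.isPresent()) {
                return address.get();
            }
            // Overflow-safe comparison of nanoTime values.
            if (System.nanoTime() - deadline >= 0) {
                throw new IllegalStateException(
                    "Load balancer address not available after " + timeout);
            }
            try {
                Thread.sleep(interval.toMillis());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(
                    "Interrupted while waiting for the load balancer", e);
            }
        }
    }
}
```

With this shape, the client either gets a usable address or a clear error, instead of printing an endpoint that can never be reached.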


[GitHub] [flink] zhengcanbin commented on a change in pull request #11715: [FLINK-16598][k8s] Respect the rest-port exposed by the external Service when retrieving Endpoint

Posted by GitBox <gi...@apache.org>.
zhengcanbin commented on a change in pull request #11715: [FLINK-16598][k8s] Respect the rest-port exposed by the external Service when retrieving Endpoint
URL: https://github.com/apache/flink/pull/11715#discussion_r409380796
 
 

 ##########
 File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClient.java
 ##########
 @@ -264,17 +263,32 @@ private void setOwnerReference(Deployment deployment, List<HasMetadata> resource
 	}
 
 	/**
-	 * To get nodePort of configured ports.
+	 * Get rest port from the external Service.
 	 */
-	private int getServiceNodePort(Service service, ConfigOption<Integer> configPort) {
-		final int port = this.flinkConfig.getInteger(configPort);
-		if (service.getSpec() != null && service.getSpec().getPorts() != null) {
-			for (ServicePort p : service.getSpec().getPorts()) {
-				if (p.getPort() == port) {
-					return p.getNodePort();
-				}
-			}
+	private int getRestPortFromExternalService(Service externalService) {
+		final List<ServicePort> servicePortCandidates = externalService.getSpec().getPorts()
+			.stream()
+			.filter(x -> x.getName().equals(Constants.REST_PORT_NAME))
+			.collect(Collectors.toList());
+
+		if (servicePortCandidates.isEmpty()) {
+			throw new RuntimeException("Failed to find port \"" + Constants.REST_PORT_NAME + "\" in Service \"" +
+				KubernetesUtils.getRestServiceName(this.clusterId) + "\"");
+		}
+
+		final ServicePort externalServicePort = servicePortCandidates.get(0);
+
+		final KubernetesConfigOptions.ServiceExposedType externalServiceType =
+			KubernetesConfigOptions.ServiceExposedType.valueOf(externalService.getSpec().getType());
+
+		switch (externalServiceType) {
+			case ClusterIP:
+			case LoadBalancer:
 
 Review comment:
   > Glad to hear that you will create a new ticket to rework the `getRestEndpoint` according to `LoadBalancer` or `NodePort`. I think this change makes sense to me.
   > 
   > Currently, the problem in the unmanaged K8s cluster without LB is that the Flink client prints a wrong JobManager url of the form `K8s_Master:8081`; it should be `K8s_Master:NodePort`. I think this could be fixed in your upcoming tickets.
   
   A fix for this problem is planned.
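
As an illustration of the `K8s_Master:8081` versus `K8s_Master:NodePort` difference, the following sketch looks up the rest port by name and builds the URL from the node port rather than the Service port. It uses a hypothetical `Port` class instead of fabric8's `ServicePort`, and the port name `"rest"` and the value `30081` are assumed example values, not taken from the PR.

```java
import java.util.List;

public class RestPortLookupSketch {

    /** Hypothetical stand-in for the fields of a Service port used in this discussion. */
    public static final class Port {
        public final String name;
        public final int port;      // port exposed by the Service itself, e.g. 8081
        public final int nodePort;  // port opened on every node for NodePort access

        public Port(String name, int port, int nodePort) {
            this.name = name;
            this.port = port;
            this.nodePort = nodePort;
        }
    }

    /** Finds a port by its name instead of matching on a configured port number. */
    public static Port findByName(List<Port> ports, String name) {
        return ports.stream()
            .filter(p -> name.equals(p.name))
            .findFirst()
            .orElseThrow(() -> new IllegalStateException(
                "Failed to find port \"" + name + "\""));
    }

    /** For NodePort access, the client-facing port is the node port. */
    public static String nodePortUrl(String nodeAddress, Port restPort) {
        return nodeAddress + ":" + restPort.nodePort;
    }
}
```

Given a Service port of 8081 and a node port of 30081, `nodePortUrl` yields `<node>:30081`, which is the address a client outside the cluster would actually need.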


[GitHub] [flink] wangyang0918 commented on a change in pull request #11715: [FLINK-16598][k8s] Respect the rest-port exposed by the external Service when retrieving Endpoint

Posted by GitBox <gi...@apache.org>.
wangyang0918 commented on a change in pull request #11715: [FLINK-16598][k8s] Respect the rest-port exposed by the external Service when retrieving Endpoint
URL: https://github.com/apache/flink/pull/11715#discussion_r409330048
 
 

 ##########
 File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClient.java
 ##########
 @@ -264,17 +263,32 @@ private void setOwnerReference(Deployment deployment, List<HasMetadata> resource
 	}
 
 	/**
-	 * To get nodePort of configured ports.
+	 * Get rest port from the external Service.
 	 */
-	private int getServiceNodePort(Service service, ConfigOption<Integer> configPort) {
-		final int port = this.flinkConfig.getInteger(configPort);
-		if (service.getSpec() != null && service.getSpec().getPorts() != null) {
-			for (ServicePort p : service.getSpec().getPorts()) {
-				if (p.getPort() == port) {
-					return p.getNodePort();
-				}
-			}
+	private int getRestPortFromExternalService(Service externalService) {
+		final List<ServicePort> servicePortCandidates = externalService.getSpec().getPorts()
+			.stream()
+			.filter(x -> x.getName().equals(Constants.REST_PORT_NAME))
+			.collect(Collectors.toList());
+
+		if (servicePortCandidates.isEmpty()) {
+			throw new RuntimeException("Failed to find port \"" + Constants.REST_PORT_NAME + "\" in Service \"" +
+				KubernetesUtils.getRestServiceName(this.clusterId) + "\"");
+		}
+
+		final ServicePort externalServicePort = servicePortCandidates.get(0);
+
+		final KubernetesConfigOptions.ServiceExposedType externalServiceType =
+			KubernetesConfigOptions.ServiceExposedType.valueOf(externalService.getSpec().getType());
+
+		switch (externalServiceType) {
+			case ClusterIP:
+			case LoadBalancer:
 
 Review comment:
   I understand your concern and I do not insist on the fallback logic. What I am talking about is how to make the separation of `LoadBalancer` and `NodePort` more reasonable.
   
   If the user sets `kubernetes.rest-service.exposed.type=LoadBalancer`, then we should wait until the load balancer is ready or time out. Just as you say, we should not print a confusing JobManager url in the log. This is the current problem.
   
   The default value of `kubernetes.rest-service.exposed.type` needs a wider discussion.


[GitHub] [flink] zhengcanbin edited a comment on issue #11715: [FLINK-16598][k8s] Respect the rest-port exposed by the external Service when retrieving Endpoint

Posted by GitBox <gi...@apache.org>.
zhengcanbin edited a comment on issue #11715: [FLINK-16598][k8s] Respect the rest-port exposed by the external Service when retrieving Endpoint
URL: https://github.com/apache/flink/pull/11715#issuecomment-614379194
 
 
   > Thanks for your contribution @zhengcanbin !
   > 
   > Is there a test that fails without this patch and passes with this patch so that we guard the behaviour?
   
   Thanks, @TisonKun! `Fabric8FlinkKubeClientTest#testServiceLoadBalancerWithNoIP` and `Fabric8FlinkKubeClientTest#testClusterIPService` are exactly the cases covering this test branch.


[GitHub] [flink] wangyang0918 commented on a change in pull request #11715: [FLINK-16598][k8s] Respect the rest-port exposed by the external Service when retrieving Endpoint

Posted by GitBox <gi...@apache.org>.
wangyang0918 commented on a change in pull request #11715: [FLINK-16598][k8s] Respect the rest-port exposed by the external Service when retrieving Endpoint
URL: https://github.com/apache/flink/pull/11715#discussion_r409302511
 
 

 ##########
 File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClient.java
 ##########
 @@ -264,17 +263,32 @@ private void setOwnerReference(Deployment deployment, List<HasMetadata> resource
 	}
 
 	/**
-	 * To get nodePort of configured ports.
+	 * Get rest port from the external Service.
 	 */
-	private int getServiceNodePort(Service service, ConfigOption<Integer> configPort) {
-		final int port = this.flinkConfig.getInteger(configPort);
-		if (service.getSpec() != null && service.getSpec().getPorts() != null) {
-			for (ServicePort p : service.getSpec().getPorts()) {
-				if (p.getPort() == port) {
-					return p.getNodePort();
-				}
-			}
+	private int getRestPortFromExternalService(Service externalService) {
+		final List<ServicePort> servicePortCandidates = externalService.getSpec().getPorts()
+			.stream()
+			.filter(x -> x.getName().equals(Constants.REST_PORT_NAME))
+			.collect(Collectors.toList());
+
+		if (servicePortCandidates.isEmpty()) {
+			throw new RuntimeException("Failed to find port \"" + Constants.REST_PORT_NAME + "\" in Service \"" +
+				KubernetesUtils.getRestServiceName(this.clusterId) + "\"");
+		}
+
+		final ServicePort externalServicePort = servicePortCandidates.get(0);
+
+		final KubernetesConfigOptions.ServiceExposedType externalServiceType =
+			KubernetesConfigOptions.ServiceExposedType.valueOf(externalService.getSpec().getType());
+
+		switch (externalServiceType) {
+			case ClusterIP:
+			case LoadBalancer:
 
 Review comment:
   Since `LoadBalancer` is a superset of `NodePort`, K8s will always create a `NodePort` when the service exposed type is `LoadBalancer`. In my opinion, returning a node port url when the `LoadBalancer` is not available is a reasonable solution. I believe that many users are running unmanaged K8s clusters without a load balancer. Now they have to explicitly set `kubernetes.rest-service.exposed.type=NodePort`.
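
The fallback described here could look roughly like the sketch below. It is only an illustration under stated assumptions: `RestService` is a hypothetical, simplified view of the exposed Service (not a fabric8 type), and the addresses and ports in any usage are made-up example values.

```java
public class NodePortFallbackSketch {

    /** Hypothetical, simplified view of a Service of type LoadBalancer. */
    public static final class RestService {
        public final String loadBalancerAddress; // null or empty if no load balancer exists
        public final int servicePort;            // port exposed by the Service, e.g. 8081
        public final int nodePort;               // node port that K8s allocates as well

        public RestService(String loadBalancerAddress, int servicePort, int nodePort) {
            this.loadBalancerAddress = loadBalancerAddress;
            this.servicePort = servicePort;
            this.nodePort = nodePort;
        }
    }

    /** Prefers the load balancer address and falls back to a node address plus node port. */
    public static String resolveEndpoint(RestService service, String nodeAddress) {
        if (service.loadBalancerAddress != null && !service.loadBalancerAddress.isEmpty()) {
            return service.loadBalancerAddress + ":" + service.servicePort;
        }
        return nodeAddress + ":" + service.nodePort;
    }
}
```

The trade-off is that the fallback always returns some endpoint, even when the node address turns out to be unreachable from the submitting client.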


[GitHub] [flink] zhengcanbin commented on a change in pull request #11715: [FLINK-16598][k8s] Respect the rest-port exposed by the external Service when retrieving Endpoint

Posted by GitBox <gi...@apache.org>.
zhengcanbin commented on a change in pull request #11715: [FLINK-16598][k8s] Respect the rest-port exposed by the external Service when retrieving Endpoint
URL: https://github.com/apache/flink/pull/11715#discussion_r409299161
 
 

 ##########
 File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClient.java
 ##########
 @@ -264,17 +263,32 @@ private void setOwnerReference(Deployment deployment, List<HasMetadata> resource
 	}
 
 	/**
-	 * To get nodePort of configured ports.
+	 * Get rest port from the external Service.
 	 */
-	private int getServiceNodePort(Service service, ConfigOption<Integer> configPort) {
-		final int port = this.flinkConfig.getInteger(configPort);
-		if (service.getSpec() != null && service.getSpec().getPorts() != null) {
-			for (ServicePort p : service.getSpec().getPorts()) {
-				if (p.getPort() == port) {
-					return p.getNodePort();
-				}
-			}
+	private int getRestPortFromExternalService(Service externalService) {
+		final List<ServicePort> servicePortCandidates = externalService.getSpec().getPorts()
+			.stream()
+			.filter(x -> x.getName().equals(Constants.REST_PORT_NAME))
+			.collect(Collectors.toList());
+
+		if (servicePortCandidates.isEmpty()) {
+			throw new RuntimeException("Failed to find port \"" + Constants.REST_PORT_NAME + "\" in Service \"" +
+				KubernetesUtils.getRestServiceName(this.clusterId) + "\"");
+		}
+
+		final ServicePort externalServicePort = servicePortCandidates.get(0);
+
+		final KubernetesConfigOptions.ServiceExposedType externalServiceType =
+			KubernetesConfigOptions.ServiceExposedType.valueOf(externalService.getSpec().getType());
+
+		switch (externalServiceType) {
+			case ClusterIP:
+			case LoadBalancer:
 
 Review comment:
   If the cluster does not provide the LoadBalancer, I think we should throw an Exception instead of returning the NodePort URL, since this is not the expected behaviour. Can we separate the Service types more clearly?


[GitHub] [flink] zhengcanbin commented on a change in pull request #11715: [FLINK-16598][k8s] Respect the rest-port exposed by the external Service when retrieving Endpoint

Posted by GitBox <gi...@apache.org>.
zhengcanbin commented on a change in pull request #11715: [FLINK-16598][k8s] Respect the rest-port exposed by the external Service when retrieving Endpoint
URL: https://github.com/apache/flink/pull/11715#discussion_r409319298
 
 

 ##########
 File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClient.java
 ##########
 @@ -264,17 +263,32 @@ private void setOwnerReference(Deployment deployment, List<HasMetadata> resource
 	}
 
 	/**
-	 * To get nodePort of configured ports.
+	 * Get rest port from the external Service.
 	 */
-	private int getServiceNodePort(Service service, ConfigOption<Integer> configPort) {
-		final int port = this.flinkConfig.getInteger(configPort);
-		if (service.getSpec() != null && service.getSpec().getPorts() != null) {
-			for (ServicePort p : service.getSpec().getPorts()) {
-				if (p.getPort() == port) {
-					return p.getNodePort();
-				}
-			}
+	private int getRestPortFromExternalService(Service externalService) {
+		final List<ServicePort> servicePortCandidates = externalService.getSpec().getPorts()
+			.stream()
+			.filter(x -> x.getName().equals(Constants.REST_PORT_NAME))
+			.collect(Collectors.toList());
+
+		if (servicePortCandidates.isEmpty()) {
+			throw new RuntimeException("Failed to find port \"" + Constants.REST_PORT_NAME + "\" in Service \"" +
+				KubernetesUtils.getRestServiceName(this.clusterId) + "\"");
+		}
+
+		final ServicePort externalServicePort = servicePortCandidates.get(0);
+
+		final KubernetesConfigOptions.ServiceExposedType externalServiceType =
+			KubernetesConfigOptions.ServiceExposedType.valueOf(externalService.getSpec().getType());
+
+		switch (externalServiceType) {
+			case ClusterIP:
+			case LoadBalancer:
 
 Review comment:
   > Since `LoadBalancer` is a superset of `NodePort`, K8s will always create a `NodePort` when the service exposed type is `LoadBalancer`.
   
   Yes, you are right!
   
   > In my opinion, returning a node port url when the `LoadBalancer` is not available is a reasonable solution. I believe that many users are running unmanaged K8s clusters without a load balancer. Now they have to explicitly set `kubernetes.rest-service.exposed.type=NodePort`.
   
   I tend to think differently. If people use the LB Service, then the LB should work as expected; otherwise, we should throw an Exception when retrieving the Endpoint to tell users that the LB is unready or abnormal.
   Imagine what could happen if we fall back to the NodePort in such a situation: the jobs may never be submitted to the cluster because the Kubernetes node IPs/addresses are not accessible from the submission client due to network security policies, which could be the usual case, especially on Cloud.
   Therefore, such toleration or fallback could confuse users, and they may need to dive into the code to find out what really happened.
   If users run their workloads on K8s clusters without an LB, then it is reasonable for them to explicitly set `kubernetes.rest-service.exposed.type=NodePort`.
   
   Another concern: maybe we'd better change the default value of `kubernetes.rest-service.exposed.type` to NodePort or ClusterIP.


[GitHub] [flink] zhengcanbin commented on issue #11715: [FLINK-16598][k8s] Respect the rest-port exposed by the external Service when retrieving Endpoint

Posted by GitBox <gi...@apache.org>.
zhengcanbin commented on issue #11715: [FLINK-16598][k8s] Respect the rest-port exposed by the external Service when retrieving Endpoint
URL: https://github.com/apache/flink/pull/11715#issuecomment-612843367
 
 
   @flinkbot run travis


[GitHub] [flink] zhengcanbin commented on a change in pull request #11715: [FLINK-16598][k8s] Respect the rest-port exposed by the external Service when retrieving Endpoint

Posted by GitBox <gi...@apache.org>.
zhengcanbin commented on a change in pull request #11715: [FLINK-16598][k8s] Respect the rest-port exposed by the external Service when retrieving Endpoint
URL: https://github.com/apache/flink/pull/11715#discussion_r409319298
 
 

 ##########
 File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClient.java
 ##########
 @@ -264,17 +263,32 @@ private void setOwnerReference(Deployment deployment, List<HasMetadata> resource
 	}
 
 	/**
-	 * To get nodePort of configured ports.
+	 * Get rest port from the external Service.
 	 */
-	private int getServiceNodePort(Service service, ConfigOption<Integer> configPort) {
-		final int port = this.flinkConfig.getInteger(configPort);
-		if (service.getSpec() != null && service.getSpec().getPorts() != null) {
-			for (ServicePort p : service.getSpec().getPorts()) {
-				if (p.getPort() == port) {
-					return p.getNodePort();
-				}
-			}
+	private int getRestPortFromExternalService(Service externalService) {
+		final List<ServicePort> servicePortCandidates = externalService.getSpec().getPorts()
+			.stream()
+			.filter(x -> x.getName().equals(Constants.REST_PORT_NAME))
+			.collect(Collectors.toList());
+
+		if (servicePortCandidates.isEmpty()) {
+			throw new RuntimeException("Failed to find port \"" + Constants.REST_PORT_NAME + "\" in Service \"" +
+				KubernetesUtils.getRestServiceName(this.clusterId) + "\"");
+		}
+
+		final ServicePort externalServicePort = servicePortCandidates.get(0);
+
+		final KubernetesConfigOptions.ServiceExposedType externalServiceType =
+			KubernetesConfigOptions.ServiceExposedType.valueOf(externalService.getSpec().getType());
+
+		switch (externalServiceType) {
+			case ClusterIP:
+			case LoadBalancer:
 
 Review comment:
   > Since `LoadBalancer` is a superset of `NodePort`, K8s will always create a `NodePort` when the service exposed type is `LoadBalancer`.
   
   Yes, you are right!
   
   > In my opinion, returning a node port url when the `LoadBalancer` is not available is a reasonable solution. I believe that many users are running unmanaged K8s clusters without a load balancer. Now they have to explicitly set `kubernetes.rest-service.exposed.type=NodePort`.
   
   I tend to think differently. If people use the LB Service, then the LB should work as expected; otherwise, we should throw an Exception when retrieving the Endpoint to tell users that the LB is unready or abnormal.
   
   Imagine what could happen if we fall back to the NodePort in such a situation: the jobs may never be submitted to the cluster because the Kubernetes node IPs/addresses are not accessible from the submission client due to network security policies, which could be the usual case, especially on Cloud.
   
   Therefore, such toleration or fallback could confuse users, and they may need to dive into the code to find out what really happened.
   
   If users run their workloads on K8s clusters without an LB, then it is reasonable for them to explicitly set `kubernetes.rest-service.exposed.type=NodePort`.
   
   Another concern: maybe we'd better change the default value of `kubernetes.rest-service.exposed.type` to NodePort or ClusterIP.
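
The stricter behaviour argued for above can be sketched as follows. This is a hedged illustration, not the PR's implementation: `StrictEndpointSketch`, its `ExposedType` enum (which covers only the two types under discussion), and the flat parameter list are all hypothetical simplifications.

```java
public class StrictEndpointSketch {

    /** Hypothetical enum covering only the two exposed types discussed here. */
    public enum ExposedType { NodePort, LoadBalancer }

    /**
     * Resolves the endpoint strictly by the configured type: a LoadBalancer Service
     * without an address is reported as an error instead of falling back to NodePort.
     */
    public static String resolveEndpoint(
            ExposedType type,
            String loadBalancerAddress,
            String nodeAddress,
            int servicePort,
            int nodePort) {
        switch (type) {
            case LoadBalancer:
                if (loadBalancerAddress == null || loadBalancerAddress.isEmpty()) {
                    throw new IllegalStateException(
                        "The load balancer is not ready or not available; "
                            + "consider setting the exposed type to NodePort explicitly");
                }
                return loadBalancerAddress + ":" + servicePort;
            case NodePort:
                return nodeAddress + ":" + nodePort;
            default:
                throw new IllegalStateException("Unsupported exposed type: " + type);
        }
    }
}
```

Here the user sees an explicit error as soon as the load balancer is missing, rather than a node port URL that may be blocked by network policy.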


[GitHub] [flink] wangyang0918 commented on a change in pull request #11715: [FLINK-16598][k8s] Respect the rest-port exposed by the external Service when retrieving Endpoint

Posted by GitBox <gi...@apache.org>.
wangyang0918 commented on a change in pull request #11715: [FLINK-16598][k8s] Respect the rest-port exposed by the external Service when retrieving Endpoint
URL: https://github.com/apache/flink/pull/11715#discussion_r409295087
 
 

 ##########
 File path: flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/Fabric8FlinkKubeClient.java
 ##########
 @@ -264,17 +263,32 @@ private void setOwnerReference(Deployment deployment, List<HasMetadata> resource
 	}
 
 	/**
-	 * To get nodePort of configured ports.
+	 * Get rest port from the external Service.
 	 */
-	private int getServiceNodePort(Service service, ConfigOption<Integer> configPort) {
-		final int port = this.flinkConfig.getInteger(configPort);
-		if (service.getSpec() != null && service.getSpec().getPorts() != null) {
-			for (ServicePort p : service.getSpec().getPorts()) {
-				if (p.getPort() == port) {
-					return p.getNodePort();
-				}
-			}
+	private int getRestPortFromExternalService(Service externalService) {
+		final List<ServicePort> servicePortCandidates = externalService.getSpec().getPorts()
+			.stream()
+			.filter(x -> x.getName().equals(Constants.REST_PORT_NAME))
+			.collect(Collectors.toList());
+
+		if (servicePortCandidates.isEmpty()) {
+			throw new RuntimeException("Failed to find port \"" + Constants.REST_PORT_NAME + "\" in Service \"" +
+				KubernetesUtils.getRestServiceName(this.clusterId) + "\"");
+		}
+
+		final ServicePort externalServicePort = servicePortCandidates.get(0);
+
+		final KubernetesConfigOptions.ServiceExposedType externalServiceType =
+			KubernetesConfigOptions.ServiceExposedType.valueOf(externalService.getSpec().getType());
+
+		switch (externalServiceType) {
+			case ClusterIP:
+			case LoadBalancer:
 
 Review comment:
   I think we change the behavior here. When `REST_SERVICE_EXPOSED_TYPE` is `LoadBalancer` and the cluster does not provide a load balancer, a NodePort URL should be returned. After this change, that fallback no longer works.
   
   This change will also make the K8s session e2e tests fail.
   
   @zhengcanbin @TisonKun Could you please have a look?
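   
   The fallback described above could look roughly like the sketch below. It is only an illustration under stated assumptions: `ServiceView`, `resolveLoadBalancerEndpoint`, and the sample addresses are hypothetical stand-ins, not the fabric8 model classes or the actual code in `Fabric8FlinkKubeClient`. It also assumes the `LoadBalancer` Service still has a node port allocated for the rest port.

```java
/**
 * Sketch only: ServiceView is a hypothetical stand-in for the fabric8 Service model,
 * not a class from Flink or fabric8.
 */
final class RestEndpointSketch {

    /** The few Service fields this sketch needs. */
    static final class ServiceView {
        final int port;                   // port the Service exposes for the rest endpoint
        final Integer nodePort;           // node port of the same entry; null if none was allocated
        final String loadBalancerAddress; // null or empty when the cluster provides no load balancer

        ServiceView(int port, Integer nodePort, String loadBalancerAddress) {
            this.port = port;
            this.nodePort = nodePort;
            this.loadBalancerAddress = loadBalancerAddress;
        }
    }

    /**
     * For a Service of type LoadBalancer: use the load balancer address when one was
     * assigned, otherwise fall back to a NodePort-style endpoint on the given node address.
     */
    static String resolveLoadBalancerEndpoint(ServiceView service, String nodeAddress) {
        final String lb = service.loadBalancerAddress;
        if (lb != null && !lb.isEmpty()) {
            return lb + ":" + service.port;
        }
        if (service.nodePort == null) {
            throw new IllegalStateException("No load balancer address and no node port available");
        }
        // Fallback: the cluster did not provide a load balancer.
        return nodeAddress + ":" + service.nodePort;
    }

    public static void main(String[] args) {
        // All addresses and ports here are made-up sample values.
        ServiceView withLb = new ServiceView(8081, 30081, "203.0.113.10");
        ServiceView withoutLb = new ServiceView(8081, 30081, null);
        System.out.println(resolveLoadBalancerEndpoint(withLb, "10.0.0.5"));
        System.out.println(resolveLoadBalancerEndpoint(withoutLb, "10.0.0.5"));
    }
}
```

   The point of the sketch is that the Service type alone does not decide which port to return: for `LoadBalancer`, the choice depends on whether an address was actually assigned.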


[GitHub] [flink] flinkbot edited a comment on issue #11715: [FLINK-16598][k8s] Respect the rest-port exposed by the external Service when retrieving Endpoint

Posted by GitBox <gi...@apache.org>.
flinkbot edited a comment on issue #11715: [FLINK-16598][k8s] Respect the rest-port exposed by the external Service when retrieving Endpoint
URL: https://github.com/apache/flink/pull/11715#issuecomment-612818039
 
 
   ## CI report:
   
   * 0d42d620980962950546b1c7090ed88e67a179c5 Travis: [PENDING](https://travis-ci.com/github/flink-ci/flink/builds/160027473) Azure: [FAILURE](https://dev.azure.com/rmetzger/5bd3ef0a-4359-41af-abca-811b04098d2e/_build/results?buildId=7392) 
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run travis` re-run the last Travis build
    - `@flinkbot run azure` re-run the last Azure build
   </details>
