You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@streampark.apache.org by GitBox <gi...@apache.org> on 2023/01/05 16:25:58 UTC

[GitHub] [incubator-streampark] monrg opened a new pull request, #2229: [Bug] fix after flink 1.15 K8S session start&stop error

monrg opened a new pull request, #2229:
URL: https://github.com/apache/incubator-streampark/pull/2229

   
   
   ## What changes were proposed in this pull request
   
   Issue Number:  2188
   
   1. because after flink 1.15 FlinkKubeClient removed the getRestService method, Therefore, the logic to verify the existence of the getRestService method is added
   
   2.When the streampark shutdown the k8s session network exception, the response returned is null, the actual k8s session is closed, but the streampark is still in the started state, this scenario should be lost state
   
   
   
   ## Brief change log
   
   fix after flink 1.15 K8S session start&Stop error
   
   ## Verifying this change
   
   This change is already covered by existing tests, such as *(please describe tests)*.
   
   This change added tests and can be verified as follows:
   
   - *Manually verified the change by testing locally.* -->
   
   ## Does this pull request potentially affect one of the following parts
    - Dependencies (does it add or upgrade a dependency): no
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@streampark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-streampark] wolfboys commented on a diff in pull request #2229: [Bug] fix after flink 1.15 K8S session start&stop error

Posted by GitBox <gi...@apache.org>.
wolfboys commented on code in PR #2229:
URL: https://github.com/apache/incubator-streampark/pull/2229#discussion_r1063062543


##########
streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/impl/KubernetesNativeSessionSubmit.scala:
##########
@@ -152,8 +152,9 @@ object KubernetesNativeSessionSubmit extends KubernetesNativeSubmitTrait with Lo
       val kubernetesClusterDescriptor = getK8sClusterDescriptorAndSpecification(flinkConfig)
       clusterDescriptor = kubernetesClusterDescriptor._1
       kubeClient = FlinkKubeClientFactory.getInstance.fromConfiguration(flinkConfig, "client")
-
-      if (deployRequest.clusterId != null && kubeClient.getRestService(deployRequest.clusterId).isPresent) {
+      // after flink 1.15 FlinkKubeClient removed the getRestService method
+      if (deployRequest.clusterId != null && hasMethod(kubeClient.getClass, "getRestService")

Review Comment:
   This code logic is strange,
   
   e.g: `if ( a.hasMethod("bbb") && a.bbb() )`
   if a has no bbb method, Next, call the bbb method,  The code compiles will fail.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@streampark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-streampark] 1996fanrui commented on a diff in pull request #2229: [Bug] fix after flink 1.15 K8S session start&stop error

Posted by GitBox <gi...@apache.org>.
1996fanrui commented on code in PR #2229:
URL: https://github.com/apache/incubator-streampark/pull/2229#discussion_r1063974750


##########
streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/impl/KubernetesNativeSessionSubmit.scala:
##########
@@ -152,8 +152,9 @@ object KubernetesNativeSessionSubmit extends KubernetesNativeSubmitTrait with Lo
       val kubernetesClusterDescriptor = getK8sClusterDescriptorAndSpecification(flinkConfig)
       clusterDescriptor = kubernetesClusterDescriptor._1
       kubeClient = FlinkKubeClientFactory.getInstance.fromConfiguration(flinkConfig, "client")
-
-      if (deployRequest.clusterId != null && kubeClient.getRestService(deployRequest.clusterId).isPresent) {
+      // after flink 1.15 FlinkKubeClient removed the getRestService method
+      if (deployRequest.clusterId != null && hasMethod(kubeClient.getClass, "getRestService")

Review Comment:
   Thanks @monrg 's contribution.
   
   It's similar to `FlinkClientTrait`, we can add the `FlinkKubeClientTrait` in the `streampark-flink-shims-base` module, it depends flink-1.14 by default.
   
   Every flink-shims extends the `FlinkKubeClientTrait` and call the `FlinkKubeClient#getRestService before flink 1.15` or the `FlinkKubeClient#getService since flink-1.15`.
   
   And KubernetesNativeSessionSubmit always calls the `FlinkKubeClientTrait#getService`, don't care call `getService` or `getRestService`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@streampark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-streampark] 1996fanrui commented on a diff in pull request #2229: [Bug] fix after flink 1.15 K8S session start&stop error

Posted by GitBox <gi...@apache.org>.
1996fanrui commented on code in PR #2229:
URL: https://github.com/apache/incubator-streampark/pull/2229#discussion_r1063975108


##########
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java:
##########
@@ -263,9 +263,7 @@ public void shutdown(FlinkCluster cluster) {
         || ExecutionMode.REMOTE.equals(executionModeEnum)) {
       if (ClusterState.STARTED.equals(ClusterState.of(flinkCluster.getClusterState()))) {
         if (!flinkCluster.verifyClusterConnection()) {
-          updateWrapper.set(FlinkCluster::getAddress, null);
-          updateWrapper.set(FlinkCluster::getClusterState, ClusterState.LOST.getValue());
-          update(updateWrapper);
+          updateClusterIsLost(updateWrapper);

Review Comment:
   There are some conflicts here, please rebase the dev branch first, thanks~



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@streampark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-streampark] monrg commented on a diff in pull request #2229: [Bug] fix after flink 1.15 K8S session start&stop error

Posted by GitBox <gi...@apache.org>.
monrg commented on code in PR #2229:
URL: https://github.com/apache/incubator-streampark/pull/2229#discussion_r1063969832


##########
streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/impl/KubernetesNativeSessionSubmit.scala:
##########
@@ -152,8 +152,9 @@ object KubernetesNativeSessionSubmit extends KubernetesNativeSubmitTrait with Lo
       val kubernetesClusterDescriptor = getK8sClusterDescriptorAndSpecification(flinkConfig)
       clusterDescriptor = kubernetesClusterDescriptor._1
       kubeClient = FlinkKubeClientFactory.getInstance.fromConfiguration(flinkConfig, "client")
-
-      if (deployRequest.clusterId != null && kubeClient.getRestService(deployRequest.clusterId).isPresent) {
+      // after flink 1.15 FlinkKubeClient removed the getRestService method
+      if (deployRequest.clusterId != null && hasMethod(kubeClient.getClass, "getRestService")

Review Comment:
   The compilation used is 1.14, the actual run will load the corresponding version of the dependency, after 1.15 removed the method getRestService, so it will fail. I will look at the detailed logic of flink again and implement it under optimization
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@streampark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-streampark] monrg closed pull request #2229: [Bug] fix after flink 1.15 K8S session start&stop error

Posted by GitBox <gi...@apache.org>.
monrg closed pull request #2229: [Bug] fix after flink 1.15 K8S session start&stop error
URL: https://github.com/apache/incubator-streampark/pull/2229


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@streampark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org