You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@streampark.apache.org by "RocMarshal (via GitHub)" <gi...@apache.org> on 2023/05/11 15:19:21 UTC

[GitHub] [incubator-streampark] RocMarshal opened a new pull request, #2744: [Issue-2535] Refactor FlinkClusterService.

RocMarshal opened a new pull request, #2744:
URL: https://github.com/apache/incubator-streampark/pull/2744

   <!--
   Thank you for contributing to StreamPark! Please make sure that your code changes
   are covered with tests. And in case of new features or big changes
   remember to adjust the documentation.
   
   ## Contribution Checklist
   
     - If this is your first time, please read our contributor guidelines: [Submit Code](https://streampark.apache.org/community/submit_guide/submit_code).
   
     - Make sure that the pull request corresponds to a [GITHUB issue](https://github.com/apache/incubator-streampark/issues).
   
     - Name the pull request in the form "[Feature] Title of the pull request", where *Feature* can be replaced by `Hotfix`, `Bug`, etc.
   
     - Fill out the template below to describe the changes contributed by the pull request. That will give reviewers the context they need to do the review.
   
     - If the PR is unfinished, add `[WIP]` in your PR title, e.g., `[WIP][Feature] Title of the pull request`.
   
   -->
   
   ## What changes were proposed in this pull request
   
   Issue Number: close #2535  <!-- REMOVE this line if no issue to close -->
   
   <!--(For example: This pull request proposed to add checkstyle plugin).-->
   
   ## Brief change log
   
   Refactor FlinkClusterService.
   
   ## Verifying this change
   
   <!--*(Please pick either of the following options)*-->
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   
   ## Does this pull request potentially affect one of the following parts
    - Dependencies (does it add or upgrade a dependency): (yes / **no**)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@streampark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-streampark] wolfboys merged pull request #2744: [Issue-2535] Refactor FlinkClusterService.

Posted by "wolfboys (via GitHub)" <gi...@apache.org>.
wolfboys merged PR #2744:
URL: https://github.com/apache/incubator-streampark/pull/2744


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@streampark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-streampark] RocMarshal commented on pull request #2744: [Issue-2535] Refactor FlinkClusterService.

Posted by "RocMarshal (via GitHub)" <gi...@apache.org>.
RocMarshal commented on PR #2744:
URL: https://github.com/apache/incubator-streampark/pull/2744#issuecomment-1545668998

   Hi, @zhoulii ,Thanks a lot for the review.
   I made some change based on your comments. Looking forward your next review in your free time. 
   Thx~!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@streampark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [incubator-streampark] zhoulii commented on a diff in pull request #2744: [Issue-2535] Refactor FlinkClusterService.

Posted by "zhoulii (via GitHub)" <gi...@apache.org>.
zhoulii commented on code in PR #2744:
URL: https://github.com/apache/incubator-streampark/pull/2744#discussion_r1191820116


##########
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java:
##########
@@ -422,4 +321,91 @@ private boolean isYarnNotDefaultQueue(FlinkCluster cluster) {
     return ExecutionMode.isYarnSessionMode(cluster.getExecutionModeEnum())
         && !yarnQueueService.isDefaultQueue(cluster.getYarnQueue());
   }
+
+  private ShutDownResponse shutdownInternal(FlinkCluster flinkCluster, String clusterId)
+      throws InterruptedException, ExecutionException, TimeoutException {
+    ShutDownRequest stopRequest =
+        new ShutDownRequest(
+            flinkEnvService.getById(flinkCluster.getVersionId()).getFlinkVersion(),
+            flinkCluster.getExecutionModeEnum(),
+            flinkCluster.getProperties(),
+            clusterId,
+            getExtendDeployDesc(flinkCluster, "shutdown"));
+    Future<ShutDownResponse> future =
+        executorService.submit(() -> FlinkClient.shutdown(stopRequest));
+    return future.get(60, TimeUnit.SECONDS);
+  }
+
+  private DeployResponse startInternal(FlinkCluster flinkCluster)
+      throws InterruptedException, ExecutionException, TimeoutException {
+    DeployRequest deployRequest =
+        new DeployRequest(
+            flinkEnvService.getById(flinkCluster.getVersionId()).getFlinkVersion(),
+            flinkCluster.getExecutionModeEnum(),
+            flinkCluster.getProperties(),
+            flinkCluster.getClusterId(),
+            getExtendDeployDesc(flinkCluster, "start"));
+    log.info("Deploy cluster request " + deployRequest);
+    Future<DeployResponse> future = executorService.submit(() -> FlinkClient.deploy(deployRequest));
+    return future.get(60, TimeUnit.SECONDS);
+  }
+
+  private void checkActiveIfNeeded(FlinkCluster flinkCluster) {
+    if (ExecutionMode.YARN_SESSION.equals(flinkCluster.getExecutionModeEnum())) {
+      ApiAlertException.throwIfFalse(
+          ClusterState.STARTED.getValue().equals((flinkCluster.getClusterState())),
+          "Current cluster is not active, please check!");
+      if (!flinkCluster.verifyClusterConnection()) {
+        flinkCluster.setAddress(null);
+        flinkCluster.setClusterState(ClusterState.LOST.getValue());
+        updateById(flinkCluster);
+        throw new ApiAlertException("Current cluster is not active, please check!");
+      }
+    }
+  }
+
+  private void updateCluster(FlinkCluster cluster, FlinkCluster flinkCluster) {
+    flinkCluster.setClusterName(cluster.getClusterName());
+    flinkCluster.setDescription(cluster.getDescription());
+    if (ExecutionMode.REMOTE.equals(flinkCluster.getExecutionModeEnum())) {
+      flinkCluster.setAddress(cluster.getAddress());
+    } else {
+      flinkCluster.setAddress(null);
+      flinkCluster.setClusterId(cluster.getClusterId());
+      flinkCluster.setVersionId(cluster.getVersionId());
+      flinkCluster.setDynamicProperties(cluster.getDynamicProperties());
+      flinkCluster.setOptions(cluster.getOptions());
+      flinkCluster.setResolveOrder(cluster.getResolveOrder());
+      flinkCluster.setK8sHadoopIntegration(cluster.getK8sHadoopIntegration());
+      flinkCluster.setK8sConf(cluster.getK8sConf());
+      flinkCluster.setK8sNamespace(cluster.getK8sNamespace());
+      flinkCluster.setK8sRestExposedType(cluster.getK8sRestExposedType());
+      flinkCluster.setServiceAccount(cluster.getServiceAccount());
+      flinkCluster.setFlinkImage(cluster.getFlinkImage());
+      flinkCluster.setYarnQueue(cluster.getYarnQueue());
+    }
+  }
+
+  @Nullable
+  private KubernetesDeployParam getExtendDeployDesc(

Review Comment:
   Since this method is only used for get deploy params for kubernetes mode, maybe we can name it `getKubernetesDeployDesc` ?



##########
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java:
##########
@@ -153,54 +153,21 @@ public Boolean create(FlinkCluster flinkCluster) {
   public void start(FlinkCluster cluster) {
     FlinkCluster flinkCluster = getById(cluster.getId());
     try {
-      ExecutionMode executionModeEnum = flinkCluster.getExecutionModeEnum();
-      KubernetesDeployParam kubernetesDeployParam = null;
-      switch (executionModeEnum) {
-        case YARN_SESSION:
-          break;
-        case KUBERNETES_NATIVE_SESSION:
-          kubernetesDeployParam =
-              new KubernetesDeployParam(
-                  flinkCluster.getClusterId(),
-                  flinkCluster.getK8sNamespace(),
-                  flinkCluster.getK8sConf(),
-                  flinkCluster.getServiceAccount(),
-                  flinkCluster.getFlinkImage(),
-                  flinkCluster.getK8sRestExposedTypeEnum());
-          break;
-        default:
-          throw new ApiAlertException(
-              "the ExecutionModeEnum " + executionModeEnum.getName() + "can't start!");
-      }
-      FlinkEnv flinkEnv = flinkEnvService.getById(flinkCluster.getVersionId());
-      DeployRequest deployRequest =
-          new DeployRequest(
-              flinkEnv.getFlinkVersion(),
-              executionModeEnum,
-              flinkCluster.getProperties(),
-              flinkCluster.getClusterId(),
-              kubernetesDeployParam);
-      log.info("deploy cluster request " + deployRequest);
-      Future<DeployResponse> future =
-          executorService.submit(() -> FlinkClient.deploy(deployRequest));
-      DeployResponse deployResponse = future.get(60, TimeUnit.SECONDS);
-      if (deployResponse != null) {
-        if (ExecutionMode.YARN_SESSION.equals(executionModeEnum)) {
-          String address =
-              YarnUtils.getRMWebAppURL() + "/proxy/" + deployResponse.clusterId() + "/";
-          flinkCluster.setAddress(address);
-        } else {
-          flinkCluster.setAddress(deployResponse.address());
-        }
-        flinkCluster.setClusterId(deployResponse.clusterId());
-        flinkCluster.setClusterState(ClusterState.STARTED.getValue());
-        flinkCluster.setException(null);
-        updateById(flinkCluster);
-        FlinkRESTAPIWatcher.removeFlinkCluster(flinkCluster);
+      DeployResponse deployResponse = startInternal(flinkCluster);
+      ApiAlertException.throwIfNull(
+          deployResponse,
+          "Deploy cluster failed, unknown reason,please check you params or StreamPark error log");
+      if (ExecutionMode.YARN_SESSION.equals(flinkCluster.getExecutionModeEnum())) {

Review Comment:
   ```suggestion
         if (ExecutionMode.isYarnSessionMode(flinkCluster.getExecutionModeEnum())) {
   ```



##########
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java:
##########
@@ -422,4 +321,91 @@ private boolean isYarnNotDefaultQueue(FlinkCluster cluster) {
     return ExecutionMode.isYarnSessionMode(cluster.getExecutionModeEnum())
         && !yarnQueueService.isDefaultQueue(cluster.getYarnQueue());
   }
+
+  private ShutDownResponse shutdownInternal(FlinkCluster flinkCluster, String clusterId)
+      throws InterruptedException, ExecutionException, TimeoutException {
+    ShutDownRequest stopRequest =
+        new ShutDownRequest(
+            flinkEnvService.getById(flinkCluster.getVersionId()).getFlinkVersion(),
+            flinkCluster.getExecutionModeEnum(),
+            flinkCluster.getProperties(),
+            clusterId,
+            getExtendDeployDesc(flinkCluster, "shutdown"));
+    Future<ShutDownResponse> future =
+        executorService.submit(() -> FlinkClient.shutdown(stopRequest));
+    return future.get(60, TimeUnit.SECONDS);
+  }
+
+  private DeployResponse startInternal(FlinkCluster flinkCluster)
+      throws InterruptedException, ExecutionException, TimeoutException {
+    DeployRequest deployRequest =
+        new DeployRequest(
+            flinkEnvService.getById(flinkCluster.getVersionId()).getFlinkVersion(),
+            flinkCluster.getExecutionModeEnum(),
+            flinkCluster.getProperties(),
+            flinkCluster.getClusterId(),
+            getExtendDeployDesc(flinkCluster, "start"));
+    log.info("Deploy cluster request " + deployRequest);
+    Future<DeployResponse> future = executorService.submit(() -> FlinkClient.deploy(deployRequest));
+    return future.get(60, TimeUnit.SECONDS);
+  }
+
+  private void checkActiveIfNeeded(FlinkCluster flinkCluster) {
+    if (ExecutionMode.YARN_SESSION.equals(flinkCluster.getExecutionModeEnum())) {
+      ApiAlertException.throwIfFalse(
+          ClusterState.STARTED.getValue().equals((flinkCluster.getClusterState())),
+          "Current cluster is not active, please check!");
+      if (!flinkCluster.verifyClusterConnection()) {
+        flinkCluster.setAddress(null);
+        flinkCluster.setClusterState(ClusterState.LOST.getValue());
+        updateById(flinkCluster);
+        throw new ApiAlertException("Current cluster is not active, please check!");
+      }
+    }
+  }
+
+  private void updateCluster(FlinkCluster cluster, FlinkCluster flinkCluster) {
+    flinkCluster.setClusterName(cluster.getClusterName());
+    flinkCluster.setDescription(cluster.getDescription());
+    if (ExecutionMode.REMOTE.equals(flinkCluster.getExecutionModeEnum())) {

Review Comment:
   ```suggestion
       if (ExecutionMode.isRemoteMode(flinkCluster.getExecutionModeEnum())) {
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@streampark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org