You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampark.apache.org by be...@apache.org on 2022/11/30 14:10:32 UTC

[incubator-streampark] branch dev updated: [improve] history upload-jars improvement (#2114)

This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev by this push:
     new 4f7a3ed9b [improve] history upload-jars improvement (#2114)
4f7a3ed9b is described below

commit 4f7a3ed9b0bcf084bc1874a21d28c85d76c6e3a5
Author: benjobs <be...@apache.org>
AuthorDate: Wed Nov 30 22:10:27 2022 +0800

    [improve] history upload-jars improvement (#2114)
    
    * [improve] history upload-jars improvement
---
 .../controller/ApplicationHistoryController.java   | 27 ++++------
 .../console/core/mapper/ApplicationMapper.java     | 12 ++---
 .../core/service/ApplicationHistoryService.java    | 28 ----------
 .../console/core/service/ApplicationService.java   | 14 +++++
 .../impl/ApplicationHistoryServiceImpl.java        | 59 ----------------------
 .../core/service/impl/ApplicationServiceImpl.java  | 48 ++++++++++++++++++
 .../streampark-console-webapp/index.html           | 15 +++---
 7 files changed, 85 insertions(+), 118 deletions(-)

diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationHistoryController.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationHistoryController.java
index 1bdb9a82e..e97c00de0 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationHistoryController.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationHistoryController.java
@@ -18,10 +18,8 @@
 package org.apache.streampark.console.core.controller;
 
 import org.apache.streampark.common.enums.ExecutionMode;
-import org.apache.streampark.common.enums.StorageType;
 import org.apache.streampark.console.base.domain.RestResponse;
-import org.apache.streampark.console.core.mapper.ApplicationMapper;
-import org.apache.streampark.console.core.service.ApplicationHistoryService;
+import org.apache.streampark.console.core.service.ApplicationService;
 
 import io.swagger.annotations.Api;
 import lombok.extern.slf4j.Slf4j;
@@ -42,27 +40,20 @@ import java.util.List;
 @RequestMapping("flink/history")
 public class ApplicationHistoryController {
 
-    private static final int DEFAULT_HISTORY_RECORD_LIMIT = 25;
-
-    private static final int DEFAULT_HISTORY_POD_TMPL_RECORD_LIMIT = 5;
-
-    @Autowired
-    private ApplicationHistoryService applicationHistoryService;
-
     @Autowired
-    private ApplicationMapper applicationMapper;
+    private ApplicationService applicationService;
 
     @PostMapping("uploadJars")
     @RequiresPermissions("app:create")
     public RestResponse listUploadJars() {
-        List<String> jars = applicationHistoryService.listUploadJars(StorageType.LFS, DEFAULT_HISTORY_RECORD_LIMIT);
+        List<String> jars = applicationService.historyUploadJars();
         return RestResponse.success(jars);
     }
 
     @PostMapping("k8sNamespaces")
     @RequiresPermissions("app:create")
     public RestResponse listK8sNamespace() {
-        List<String> namespaces = applicationMapper.getRecentK8sNamespace(DEFAULT_HISTORY_RECORD_LIMIT);
+        List<String> namespaces = applicationService.getRecentK8sNamespace();
         return RestResponse.success(namespaces);
     }
 
@@ -74,7 +65,7 @@ public class ApplicationHistoryController {
             case KUBERNETES_NATIVE_SESSION:
             case YARN_SESSION:
             case REMOTE:
-                clusterIds = applicationMapper.getRecentK8sClusterId(executionMode, DEFAULT_HISTORY_RECORD_LIMIT);
+                clusterIds = applicationService.getRecentK8sClusterId(executionMode);
                 break;
             default:
                 clusterIds = new ArrayList<>(0);
@@ -86,28 +77,28 @@ public class ApplicationHistoryController {
     @PostMapping("flinkBaseImages")
     @RequiresPermissions("app:create")
     public RestResponse listFlinkBaseImage() {
-        List<String> images = applicationMapper.getRecentFlinkBaseImage(DEFAULT_HISTORY_RECORD_LIMIT);
+        List<String> images = applicationService.getRecentFlinkBaseImage();
         return RestResponse.success(images);
     }
 
     @PostMapping("flinkPodTemplates")
     @RequiresPermissions("app:create")
     public RestResponse listPodTemplate() {
-        List<String> templates = applicationMapper.getRecentK8sPodTemplate(DEFAULT_HISTORY_POD_TMPL_RECORD_LIMIT);
+        List<String> templates = applicationService.getRecentK8sPodTemplate();
         return RestResponse.success(templates);
     }
 
     @PostMapping("flinkJmPodTemplates")
     @RequiresPermissions("app:create")
     public RestResponse listJmPodTemplate() {
-        List<String> templates = applicationMapper.getRecentK8sJmPodTemplate(DEFAULT_HISTORY_POD_TMPL_RECORD_LIMIT);
+        List<String> templates = applicationService.getRecentK8sJmPodTemplate();
         return RestResponse.success(templates);
     }
 
     @PostMapping("flinkTmPodTemplates")
     @RequiresPermissions("app:create")
     public RestResponse listTmPodTemplate() {
-        List<String> templates = applicationMapper.getRecentK8sTmPodTemplate(DEFAULT_HISTORY_POD_TMPL_RECORD_LIMIT);
+        List<String> templates = applicationService.getRecentK8sTmPodTemplate();
         return RestResponse.success(templates);
     }
 
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/ApplicationMapper.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/ApplicationMapper.java
index e8ed4bed0..f6d94dc97 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/ApplicationMapper.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/ApplicationMapper.java
@@ -38,17 +38,17 @@ public interface ApplicationMapper extends BaseMapper<Application> {
 
     boolean mapping(@Param("application") Application appParam);
 
-    List<String> getRecentK8sNamespace(@Param("limitSize") int limit);
+    List<String> getRecentK8sNamespace(@Param("limitSize") Integer limit);
 
-    List<String> getRecentK8sClusterId(@Param("executionMode") int executionMode, @Param("limitSize") int limit);
+    List<String> getRecentK8sClusterId(@Param("executionMode") Integer executionMode, @Param("limitSize") Integer limit);
 
-    List<String> getRecentFlinkBaseImage(@Param("limitSize") int limit);
+    List<String> getRecentFlinkBaseImage(@Param("limitSize") Integer limit);
 
-    List<String> getRecentK8sPodTemplate(@Param("limitSize") int limit);
+    List<String> getRecentK8sPodTemplate(@Param("limitSize") Integer limit);
 
-    List<String> getRecentK8sJmPodTemplate(@Param("limitSize") int limit);
+    List<String> getRecentK8sJmPodTemplate(@Param("limitSize") Integer limit);
 
-    List<String> getRecentK8sTmPodTemplate(@Param("limitSize") int limit);
+    List<String> getRecentK8sTmPodTemplate(@Param("limitSize") Integer limit);
 
     void resetOptionState();
 
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationHistoryService.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationHistoryService.java
deleted file mode 100644
index 51d5216f6..000000000
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationHistoryService.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.streampark.console.core.service;
-
-import org.apache.streampark.common.enums.StorageType;
-
-import java.util.List;
-
-public interface ApplicationHistoryService {
-
-    List<String> listUploadJars(StorageType storageType, int limit);
-
-}
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationService.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationService.java
index 274ab0d13..f1aa9e2c7 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationService.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationService.java
@@ -101,4 +101,18 @@ public interface ApplicationService extends IService<Application> {
     boolean existsRunningJobByClusterId(Long clusterId);
 
     boolean existsJobByClusterId(Long id);
+
+    List<String> getRecentK8sNamespace();
+
+    List<String> getRecentK8sClusterId(Integer executionMode);
+
+    List<String> getRecentFlinkBaseImage();
+
+    List<String> getRecentK8sPodTemplate();
+
+    List<String> getRecentK8sJmPodTemplate();
+
+    List<String> getRecentK8sTmPodTemplate();
+
+    List<String> historyUploadJars();
 }
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationHistoryServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationHistoryServiceImpl.java
deleted file mode 100644
index 590b9e7d3..000000000
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationHistoryServiceImpl.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.streampark.console.core.service.impl;
-
-import static org.apache.streampark.common.enums.StorageType.LFS;
-
-import org.apache.streampark.common.conf.Workspace;
-import org.apache.streampark.common.enums.StorageType;
-import org.apache.streampark.common.fs.LfsOperator;
-import org.apache.streampark.console.core.service.ApplicationHistoryService;
-
-import lombok.extern.slf4j.Slf4j;
-import org.springframework.stereotype.Service;
-
-import java.io.File;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Comparator;
-import java.util.List;
-import java.util.stream.Collectors;
-
-@Slf4j
-@Service
-public class ApplicationHistoryServiceImpl implements ApplicationHistoryService {
-
-    @Override
-    public List<String> listUploadJars(StorageType storageType, int limit) {
-        switch (storageType) {
-            case LFS:
-                return Arrays.stream(LfsOperator.listDir(Workspace.of(LFS).APP_UPLOADS()))
-                    .filter(File::isFile)
-                    .sorted(Comparator.comparingLong(File::lastModified).reversed())
-                    .map(File::getName)
-                    .filter(fn -> fn.endsWith(".jar"))
-                    .limit(limit)
-                    .collect(Collectors.toList());
-            case HDFS:
-                // temporarily does not provide support for hdfs.
-            default:
-                return new ArrayList<>(0);
-        }
-    }
-
-}
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
index 87e6601f1..b851f3cd7 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
@@ -17,6 +17,7 @@
 
 package org.apache.streampark.console.core.service.impl;
 
+import static org.apache.streampark.common.enums.StorageType.LFS;
 import static org.apache.streampark.console.core.task.K8sFlinkTrackMonitorWrapper.Bridge.toTrackId;
 import static org.apache.streampark.console.core.task.K8sFlinkTrackMonitorWrapper.isKubernetesApp;
 
@@ -29,6 +30,7 @@ import org.apache.streampark.common.enums.ExecutionMode;
 import org.apache.streampark.common.enums.ResolveOrder;
 import org.apache.streampark.common.enums.StorageType;
 import org.apache.streampark.common.fs.HdfsOperator;
+import org.apache.streampark.common.fs.LfsOperator;
 import org.apache.streampark.common.util.AssertUtils;
 import org.apache.streampark.common.util.CompletableFutureUtils;
 import org.apache.streampark.common.util.DeflaterUtils;
@@ -127,6 +129,7 @@ import java.io.Serializable;
 import java.net.URI;
 import java.util.Arrays;
 import java.util.Base64;
+import java.util.Comparator;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
@@ -153,6 +156,10 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
 
     private final Map<Long, Boolean> tailBeginning = new ConcurrentHashMap<>();
 
+    private static final int DEFAULT_HISTORY_RECORD_LIMIT = 25;
+
+    private static final int DEFAULT_HISTORY_POD_TMPL_RECORD_LIMIT = 5;
+
     private final ExecutorService executorService = new ThreadPoolExecutor(
         Runtime.getRuntime().availableProcessors() * 5,
         Runtime.getRuntime().availableProcessors() * 10,
@@ -549,6 +556,47 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
         return baseMapper.existsJobByClusterId(clusterId);
     }
 
+    @Override
+    public List<String> getRecentK8sNamespace() {
+        return baseMapper.getRecentK8sNamespace(DEFAULT_HISTORY_RECORD_LIMIT);
+    }
+
+    @Override
+    public List<String> getRecentK8sClusterId(Integer executionMode) {
+        return baseMapper.getRecentK8sClusterId(executionMode, DEFAULT_HISTORY_RECORD_LIMIT);
+    }
+
+    @Override
+    public List<String> getRecentFlinkBaseImage() {
+        return baseMapper.getRecentFlinkBaseImage(DEFAULT_HISTORY_RECORD_LIMIT);
+    }
+
+    @Override
+    public List<String> getRecentK8sPodTemplate() {
+        return baseMapper.getRecentK8sPodTemplate(DEFAULT_HISTORY_POD_TMPL_RECORD_LIMIT);
+    }
+
+    @Override
+    public List<String> getRecentK8sJmPodTemplate() {
+        return baseMapper.getRecentK8sJmPodTemplate(DEFAULT_HISTORY_POD_TMPL_RECORD_LIMIT);
+    }
+
+    @Override
+    public List<String> getRecentK8sTmPodTemplate() {
+        return baseMapper.getRecentK8sTmPodTemplate(DEFAULT_HISTORY_POD_TMPL_RECORD_LIMIT);
+    }
+
+    @Override
+    public List<String> historyUploadJars() {
+        return Arrays.stream(LfsOperator.listDir(Workspace.of(LFS).APP_UPLOADS()))
+            .filter(File::isFile)
+            .sorted(Comparator.comparingLong(File::lastModified).reversed())
+            .map(File::getName)
+            .filter(fn -> fn.endsWith(".jar"))
+            .limit(DEFAULT_HISTORY_RECORD_LIMIT)
+            .collect(Collectors.toList());
+    }
+
     @Override
     public String getYarnName(Application appParam) {
         String[] args = new String[2];
diff --git a/streampark-console/streampark-console-webapp/index.html b/streampark-console/streampark-console-webapp/index.html
index 9e061e6ab..151c3bbc5 100644
--- a/streampark-console/streampark-console-webapp/index.html
+++ b/streampark-console/streampark-console-webapp/index.html
@@ -34,8 +34,8 @@
 <body>
   <script>
     (() => {
-      var htmlRoot = document.getElementById('htmlRoot');
-      var theme = window.localStorage.getItem('__APP__DARK__MODE__');
+      let htmlRoot = document.getElementById('htmlRoot');
+      let theme = window.localStorage.getItem('__APP__DARK__MODE__');
       if (htmlRoot && theme) {
         htmlRoot.setAttribute('data-theme', theme);
         theme = htmlRoot = null;
@@ -171,12 +171,13 @@
     </style>
     <div class="app-loading">
       <div class="app-loading-wrap">
-        <img src="/resource/img/logo.svg" class="app-loading-logo" alt="Logo" />
         <div class="app-loading-dots">
-          <span class="dot dot-spin"><i></i><i></i><i></i><i></i></span>
-        </div>
-        <div class="app-loading-title">
-          <%= title %>
+          <span class="dot dot-spin">
+            <i></i>
+            <i></i>
+            <i></i>
+            <i></i>
+          </span>
         </div>
       </div>
     </div>