You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampark.apache.org by be...@apache.org on 2022/11/30 14:10:32 UTC
[incubator-streampark] branch dev updated: [improve] history upload-jars improvement (#2114)
This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev by this push:
new 4f7a3ed9b [improve] history upload-jars improvement (#2114)
4f7a3ed9b is described below
commit 4f7a3ed9b0bcf084bc1874a21d28c85d76c6e3a5
Author: benjobs <be...@apache.org>
AuthorDate: Wed Nov 30 22:10:27 2022 +0800
[improve] history upload-jars improvement (#2114)
* [improve] history upload-jars improvement
---
.../controller/ApplicationHistoryController.java | 27 ++++------
.../console/core/mapper/ApplicationMapper.java | 12 ++---
.../core/service/ApplicationHistoryService.java | 28 ----------
.../console/core/service/ApplicationService.java | 14 +++++
.../impl/ApplicationHistoryServiceImpl.java | 59 ----------------------
.../core/service/impl/ApplicationServiceImpl.java | 48 ++++++++++++++++++
.../streampark-console-webapp/index.html | 15 +++---
7 files changed, 85 insertions(+), 118 deletions(-)
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationHistoryController.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationHistoryController.java
index 1bdb9a82e..e97c00de0 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationHistoryController.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationHistoryController.java
@@ -18,10 +18,8 @@
package org.apache.streampark.console.core.controller;
import org.apache.streampark.common.enums.ExecutionMode;
-import org.apache.streampark.common.enums.StorageType;
import org.apache.streampark.console.base.domain.RestResponse;
-import org.apache.streampark.console.core.mapper.ApplicationMapper;
-import org.apache.streampark.console.core.service.ApplicationHistoryService;
+import org.apache.streampark.console.core.service.ApplicationService;
import io.swagger.annotations.Api;
import lombok.extern.slf4j.Slf4j;
@@ -42,27 +40,20 @@ import java.util.List;
@RequestMapping("flink/history")
public class ApplicationHistoryController {
- private static final int DEFAULT_HISTORY_RECORD_LIMIT = 25;
-
- private static final int DEFAULT_HISTORY_POD_TMPL_RECORD_LIMIT = 5;
-
- @Autowired
- private ApplicationHistoryService applicationHistoryService;
-
@Autowired
- private ApplicationMapper applicationMapper;
+ private ApplicationService applicationService;
@PostMapping("uploadJars")
@RequiresPermissions("app:create")
public RestResponse listUploadJars() {
- List<String> jars = applicationHistoryService.listUploadJars(StorageType.LFS, DEFAULT_HISTORY_RECORD_LIMIT);
+ List<String> jars = applicationService.historyUploadJars();
return RestResponse.success(jars);
}
@PostMapping("k8sNamespaces")
@RequiresPermissions("app:create")
public RestResponse listK8sNamespace() {
- List<String> namespaces = applicationMapper.getRecentK8sNamespace(DEFAULT_HISTORY_RECORD_LIMIT);
+ List<String> namespaces = applicationService.getRecentK8sNamespace();
return RestResponse.success(namespaces);
}
@@ -74,7 +65,7 @@ public class ApplicationHistoryController {
case KUBERNETES_NATIVE_SESSION:
case YARN_SESSION:
case REMOTE:
- clusterIds = applicationMapper.getRecentK8sClusterId(executionMode, DEFAULT_HISTORY_RECORD_LIMIT);
+ clusterIds = applicationService.getRecentK8sClusterId(executionMode);
break;
default:
clusterIds = new ArrayList<>(0);
@@ -86,28 +77,28 @@ public class ApplicationHistoryController {
@PostMapping("flinkBaseImages")
@RequiresPermissions("app:create")
public RestResponse listFlinkBaseImage() {
- List<String> images = applicationMapper.getRecentFlinkBaseImage(DEFAULT_HISTORY_RECORD_LIMIT);
+ List<String> images = applicationService.getRecentFlinkBaseImage();
return RestResponse.success(images);
}
@PostMapping("flinkPodTemplates")
@RequiresPermissions("app:create")
public RestResponse listPodTemplate() {
- List<String> templates = applicationMapper.getRecentK8sPodTemplate(DEFAULT_HISTORY_POD_TMPL_RECORD_LIMIT);
+ List<String> templates = applicationService.getRecentK8sPodTemplate();
return RestResponse.success(templates);
}
@PostMapping("flinkJmPodTemplates")
@RequiresPermissions("app:create")
public RestResponse listJmPodTemplate() {
- List<String> templates = applicationMapper.getRecentK8sJmPodTemplate(DEFAULT_HISTORY_POD_TMPL_RECORD_LIMIT);
+ List<String> templates = applicationService.getRecentK8sJmPodTemplate();
return RestResponse.success(templates);
}
@PostMapping("flinkTmPodTemplates")
@RequiresPermissions("app:create")
public RestResponse listTmPodTemplate() {
- List<String> templates = applicationMapper.getRecentK8sTmPodTemplate(DEFAULT_HISTORY_POD_TMPL_RECORD_LIMIT);
+ List<String> templates = applicationService.getRecentK8sTmPodTemplate();
return RestResponse.success(templates);
}
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/ApplicationMapper.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/ApplicationMapper.java
index e8ed4bed0..f6d94dc97 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/ApplicationMapper.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/ApplicationMapper.java
@@ -38,17 +38,17 @@ public interface ApplicationMapper extends BaseMapper<Application> {
boolean mapping(@Param("application") Application appParam);
- List<String> getRecentK8sNamespace(@Param("limitSize") int limit);
+ List<String> getRecentK8sNamespace(@Param("limitSize") Integer limit);
- List<String> getRecentK8sClusterId(@Param("executionMode") int executionMode, @Param("limitSize") int limit);
+ List<String> getRecentK8sClusterId(@Param("executionMode") Integer executionMode, @Param("limitSize") Integer limit);
- List<String> getRecentFlinkBaseImage(@Param("limitSize") int limit);
+ List<String> getRecentFlinkBaseImage(@Param("limitSize") Integer limit);
- List<String> getRecentK8sPodTemplate(@Param("limitSize") int limit);
+ List<String> getRecentK8sPodTemplate(@Param("limitSize") Integer limit);
- List<String> getRecentK8sJmPodTemplate(@Param("limitSize") int limit);
+ List<String> getRecentK8sJmPodTemplate(@Param("limitSize") Integer limit);
- List<String> getRecentK8sTmPodTemplate(@Param("limitSize") int limit);
+ List<String> getRecentK8sTmPodTemplate(@Param("limitSize") Integer limit);
void resetOptionState();
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationHistoryService.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationHistoryService.java
deleted file mode 100644
index 51d5216f6..000000000
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationHistoryService.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.streampark.console.core.service;
-
-import org.apache.streampark.common.enums.StorageType;
-
-import java.util.List;
-
-public interface ApplicationHistoryService {
-
- List<String> listUploadJars(StorageType storageType, int limit);
-
-}
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationService.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationService.java
index 274ab0d13..f1aa9e2c7 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationService.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationService.java
@@ -101,4 +101,18 @@ public interface ApplicationService extends IService<Application> {
boolean existsRunningJobByClusterId(Long clusterId);
boolean existsJobByClusterId(Long id);
+
+ List<String> getRecentK8sNamespace();
+
+ List<String> getRecentK8sClusterId(Integer executionMode);
+
+ List<String> getRecentFlinkBaseImage();
+
+ List<String> getRecentK8sPodTemplate();
+
+ List<String> getRecentK8sJmPodTemplate();
+
+ List<String> getRecentK8sTmPodTemplate();
+
+ List<String> historyUploadJars();
}
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationHistoryServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationHistoryServiceImpl.java
deleted file mode 100644
index 590b9e7d3..000000000
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationHistoryServiceImpl.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.streampark.console.core.service.impl;
-
-import static org.apache.streampark.common.enums.StorageType.LFS;
-
-import org.apache.streampark.common.conf.Workspace;
-import org.apache.streampark.common.enums.StorageType;
-import org.apache.streampark.common.fs.LfsOperator;
-import org.apache.streampark.console.core.service.ApplicationHistoryService;
-
-import lombok.extern.slf4j.Slf4j;
-import org.springframework.stereotype.Service;
-
-import java.io.File;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Comparator;
-import java.util.List;
-import java.util.stream.Collectors;
-
-@Slf4j
-@Service
-public class ApplicationHistoryServiceImpl implements ApplicationHistoryService {
-
- @Override
- public List<String> listUploadJars(StorageType storageType, int limit) {
- switch (storageType) {
- case LFS:
- return Arrays.stream(LfsOperator.listDir(Workspace.of(LFS).APP_UPLOADS()))
- .filter(File::isFile)
- .sorted(Comparator.comparingLong(File::lastModified).reversed())
- .map(File::getName)
- .filter(fn -> fn.endsWith(".jar"))
- .limit(limit)
- .collect(Collectors.toList());
- case HDFS:
- // temporarily does not provide support for hdfs.
- default:
- return new ArrayList<>(0);
- }
- }
-
-}
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
index 87e6601f1..b851f3cd7 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
@@ -17,6 +17,7 @@
package org.apache.streampark.console.core.service.impl;
+import static org.apache.streampark.common.enums.StorageType.LFS;
import static org.apache.streampark.console.core.task.K8sFlinkTrackMonitorWrapper.Bridge.toTrackId;
import static org.apache.streampark.console.core.task.K8sFlinkTrackMonitorWrapper.isKubernetesApp;
@@ -29,6 +30,7 @@ import org.apache.streampark.common.enums.ExecutionMode;
import org.apache.streampark.common.enums.ResolveOrder;
import org.apache.streampark.common.enums.StorageType;
import org.apache.streampark.common.fs.HdfsOperator;
+import org.apache.streampark.common.fs.LfsOperator;
import org.apache.streampark.common.util.AssertUtils;
import org.apache.streampark.common.util.CompletableFutureUtils;
import org.apache.streampark.common.util.DeflaterUtils;
@@ -127,6 +129,7 @@ import java.io.Serializable;
import java.net.URI;
import java.util.Arrays;
import java.util.Base64;
+import java.util.Comparator;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
@@ -153,6 +156,10 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
private final Map<Long, Boolean> tailBeginning = new ConcurrentHashMap<>();
+ private static final int DEFAULT_HISTORY_RECORD_LIMIT = 25;
+
+ private static final int DEFAULT_HISTORY_POD_TMPL_RECORD_LIMIT = 5;
+
private final ExecutorService executorService = new ThreadPoolExecutor(
Runtime.getRuntime().availableProcessors() * 5,
Runtime.getRuntime().availableProcessors() * 10,
@@ -549,6 +556,47 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
return baseMapper.existsJobByClusterId(clusterId);
}
+ @Override
+ public List<String> getRecentK8sNamespace() {
+ return baseMapper.getRecentK8sNamespace(DEFAULT_HISTORY_RECORD_LIMIT);
+ }
+
+ @Override
+ public List<String> getRecentK8sClusterId(Integer executionMode) {
+ return baseMapper.getRecentK8sClusterId(executionMode, DEFAULT_HISTORY_RECORD_LIMIT);
+ }
+
+ @Override
+ public List<String> getRecentFlinkBaseImage() {
+ return baseMapper.getRecentFlinkBaseImage(DEFAULT_HISTORY_RECORD_LIMIT);
+ }
+
+ @Override
+ public List<String> getRecentK8sPodTemplate() {
+ return baseMapper.getRecentK8sPodTemplate(DEFAULT_HISTORY_POD_TMPL_RECORD_LIMIT);
+ }
+
+ @Override
+ public List<String> getRecentK8sJmPodTemplate() {
+ return baseMapper.getRecentK8sJmPodTemplate(DEFAULT_HISTORY_POD_TMPL_RECORD_LIMIT);
+ }
+
+ @Override
+ public List<String> getRecentK8sTmPodTemplate() {
+ return baseMapper.getRecentK8sTmPodTemplate(DEFAULT_HISTORY_POD_TMPL_RECORD_LIMIT);
+ }
+
+ @Override
+ public List<String> historyUploadJars() {
+ return Arrays.stream(LfsOperator.listDir(Workspace.of(LFS).APP_UPLOADS()))
+ .filter(File::isFile)
+ .sorted(Comparator.comparingLong(File::lastModified).reversed())
+ .map(File::getName)
+ .filter(fn -> fn.endsWith(".jar"))
+ .limit(DEFAULT_HISTORY_RECORD_LIMIT)
+ .collect(Collectors.toList());
+ }
+
@Override
public String getYarnName(Application appParam) {
String[] args = new String[2];
diff --git a/streampark-console/streampark-console-webapp/index.html b/streampark-console/streampark-console-webapp/index.html
index 9e061e6ab..151c3bbc5 100644
--- a/streampark-console/streampark-console-webapp/index.html
+++ b/streampark-console/streampark-console-webapp/index.html
@@ -34,8 +34,8 @@
<body>
<script>
(() => {
- var htmlRoot = document.getElementById('htmlRoot');
- var theme = window.localStorage.getItem('__APP__DARK__MODE__');
+ let htmlRoot = document.getElementById('htmlRoot');
+ let theme = window.localStorage.getItem('__APP__DARK__MODE__');
if (htmlRoot && theme) {
htmlRoot.setAttribute('data-theme', theme);
theme = htmlRoot = null;
@@ -171,12 +171,13 @@
</style>
<div class="app-loading">
<div class="app-loading-wrap">
- <img src="/resource/img/logo.svg" class="app-loading-logo" alt="Logo" />
<div class="app-loading-dots">
- <span class="dot dot-spin"><i></i><i></i><i></i><i></i></span>
- </div>
- <div class="app-loading-title">
- <%= title %>
+ <span class="dot dot-spin">
+ <i></i>
+ <i></i>
+ <i></i>
+ <i></i>
+ </span>
</div>
</div>
</div>