You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampark.apache.org by be...@apache.org on 2022/09/29 04:00:47 UTC
[incubator-streampark] branch dev updated: [Feature] Show the project build log in Web UI (#1673)
This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev by this push:
new b53fd6390 [Feature] Show the project build log in Web UI (#1673)
b53fd6390 is described below
commit b53fd63900ee7ae7c4cd7058070b190b73954aee
Author: 1996fanrui <19...@gmail.com>
AuthorDate: Thu Sep 29 12:00:41 2022 +0800
[Feature] Show the project build log in Web UI (#1673)
* [Feature] Show the project build log in Web UI
* address comments
* When the project is building, query the log periodically
* Update the style
---
.../streampark/common/conf/CommonConfig.scala | 6 +
.../apache/streampark/common/conf/Workspace.scala | 10 +
.../common/util/CompletableFutureUtils.scala | 20 +-
.../streampark/console/base/util/FileUtils.java | 71 ++++++
.../console/core/controller/ProjectController.java | 19 +-
.../console/core/service/ProjectService.java | 6 +-
.../core/service/impl/ProjectServiceImpl.java | 273 ++++-----------------
.../console/core/task/AbstractLogFileTask.java | 117 +++++++++
.../console/core/task/ProjectBuildTask.java | 240 ++++++++++++++++++
.../console/base/util/FileUtilsTest.java | 119 +++++++++
.../streampark-console-webapp/src/api/index.js | 1 -
.../streampark-console-webapp/src/api/project.js | 4 -
.../src/views/flink/project/View.vue | 70 ++----
13 files changed, 659 insertions(+), 297 deletions(-)
diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/conf/CommonConfig.scala b/streampark-common/src/main/scala/org/apache/streampark/common/conf/CommonConfig.scala
index fc4a6cdf7..9b89105d1 100644
--- a/streampark-common/src/main/scala/org/apache/streampark/common/conf/CommonConfig.scala
+++ b/streampark-common/src/main/scala/org/apache/streampark/common/conf/CommonConfig.scala
@@ -95,4 +95,10 @@ object CommonConfig {
classType = classOf[String],
description = "kerberos default ttl")
+ /**
+ * Upper bound on how much log content a single read returns, written as a
+ * memory-size string (default "1mb"). The project build-log reader in
+ * ProjectServiceImpl.getBuildLog parses this value with FlinkMemorySize and
+ * passes the resulting byte count as the read limit to FileUtils.
+ */
+ val READ_LOG_MAX_SIZE: InternalOption = InternalOption(
+ key = "streampark.read-log.max-size",
+ defaultValue = "1mb",
+ classType = classOf[String],
+ description = "The maximum size of the default read log")
+
}
diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/conf/Workspace.scala b/streampark-common/src/main/scala/org/apache/streampark/common/conf/Workspace.scala
index 6a1b0f711..d8b918e26 100644
--- a/streampark-common/src/main/scala/org/apache/streampark/common/conf/Workspace.scala
+++ b/streampark-common/src/main/scala/org/apache/streampark/common/conf/Workspace.scala
@@ -112,5 +112,15 @@ case class Workspace(storageType: StorageType) {
*/
lazy val PROJECT_LOCAL_DIR = s"${Workspace.local.WORKSPACE}/project"
+ /**
+ * Local log directory: the "logs" sub-directory of the local workspace.
+ */
+ lazy val LOG_LOCAL_DIR = s"${Workspace.local.WORKSPACE}/logs"
+
+ /**
+ * Project build log directory: the "build_logs" sub-directory of LOG_LOCAL_DIR.
+ */
+ lazy val PROJECT_BUILD_LOG_DIR = s"$LOG_LOCAL_DIR/build_logs"
+
}
diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/CompletableFutureUtils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/CompletableFutureUtils.scala
index b5cb02c5b..40378bda5 100644
--- a/streampark-common/src/main/scala/org/apache/streampark/common/util/CompletableFutureUtils.scala
+++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/CompletableFutureUtils.scala
@@ -72,11 +72,27 @@ object CompletableFutureUtils {
exceptionally: Consumer[Throwable]): CompletableFuture[Unit] = {
future.applyToEither(setTimeout(timeout, unit), new JavaFunc[T, Unit] {
override def apply(t: T): Unit = {
- handle.accept(t)
+ if (handle != null) {
+ handle.accept(t)
+ }
}
}).exceptionally(new JavaFunc[Throwable, Unit] {
override def apply(t: Throwable): Unit = {
- exceptionally.accept(t)
+ if (exceptionally != null) {
+ exceptionally.accept(t)
+ }
+ }
+ })
+ }
+
+ /**
+  * Convenience overload of runTimeout for callers that have no result handler.
+  *
+  * Delegates to the five-argument overload with a null result handler and a
+  * failure callback that cancels `future` if it has not completed yet. The
+  * callback runs from the `exceptionally` stage of the delegate, i.e. when
+  * either `future` or the timeout future fails (presumably the latter fails
+  * once the timeout elapses -- setTimeout is defined outside this hunk).
+  *
+  * NOTE: per the JDK documentation, CompletableFuture.cancel(true) does not
+  * interrupt the thread executing the task; it only completes the future with
+  * a CancellationException.
+  *
+  * @param future  the asynchronous computation to guard
+  * @param timeout the timeout amount
+  * @param unit    the unit of `timeout`
+  * @return the guarded future produced by the five-argument overload
+  */
+ def runTimeout[T](future: CompletableFuture[T],
+                   timeout: Long,
+                   unit: TimeUnit): CompletableFuture[Unit] = {
+   // Failure callback: give up on the computation only when it is still pending.
+   val cancelIfPending = new Consumer[Throwable] {
+     override def accept(cause: Throwable): Unit = {
+       val stillPending = !future.isDone
+       if (stillPending) {
+         future.cancel(true)
+       }
+     }
+   }
+   runTimeout(future, timeout, unit, null, cancelIfPending)
+ }
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/FileUtils.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/FileUtils.java
new file mode 100644
index 000000000..1d4053da3
--- /dev/null
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/FileUtils.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.console.base.util;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+
+/**
+ * The file utils: helpers that read a bounded slice of a file into memory.
+ */
+public class FileUtils {
+
+    /**
+     * Read the end of the file.
+     *
+     * <p>Returns the last {@code maxSize} bytes of the file, or the whole file when it is not
+     * longer than {@code maxSize}.
+     *
+     * @param file The file
+     * @param maxSize Maximum number of bytes to read
+     * @return The file content
+     * @throws IOException if the file cannot be opened or read
+     */
+    public static byte[] readEndOfFile(File file, long maxSize) throws IOException {
+        // try-with-resources guarantees the file handle is released on every path.
+        try (RandomAccessFile raFile = new RandomAccessFile(file, "r")) {
+            long fileLength = raFile.length();
+            long readSize = Math.min(maxSize, fileLength);
+            // Position at the start of the tail; this is offset 0 when the whole file fits.
+            raFile.seek(fileLength - readSize);
+            return readBytes(raFile, readSize);
+        }
+    }
+
+    /**
+     * Read the file starting from the given offset.
+     *
+     * <p>Returns at most {@code maxSize} bytes; fewer when the file ends earlier, and an empty
+     * array when {@code startOffset} equals the file length.
+     *
+     * @param file The file
+     * @param startOffset The offset of the first byte to read
+     * @param maxSize Maximum number of bytes to read
+     * @return The file content
+     * @throws IOException if the file cannot be opened or read
+     * @throws IllegalArgumentException if {@code startOffset} is greater than the file length
+     */
+    public static byte[] readFileFromOffset(File file, long startOffset, long maxSize) throws IOException {
+        // Snapshot the length once so the bounds check and the read size agree with each other.
+        long fileLength = file.length();
+        if (fileLength < startOffset) {
+            throw new IllegalArgumentException(
+                String.format("The startOffset %s is great than the file length %s", startOffset, fileLength));
+        }
+        try (RandomAccessFile raFile = new RandomAccessFile(file, "r")) {
+            long readSize = Math.min(maxSize, fileLength - startOffset);
+            raFile.seek(startOffset);
+            return readBytes(raFile, readSize);
+        }
+    }
+
+    /**
+     * Read exactly {@code size} bytes from the current position of {@code raFile}.
+     */
+    private static byte[] readBytes(RandomAccessFile raFile, long size) throws IOException {
+        byte[] content = new byte[(int) size];
+        // readFully keeps reading until the buffer is full; a plain read() may stop early.
+        raFile.readFully(content);
+        return content;
+    }
+
+}
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ProjectController.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ProjectController.java
index acc97e780..f0f56945c 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ProjectController.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ProjectController.java
@@ -30,6 +30,7 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import java.util.List;
@@ -64,23 +65,17 @@ public class ProjectController {
@PostMapping("build")
@RequiresPermissions("project:build")
- public RestResponse build(Long id, String socketId) throws Exception {
- projectService.build(id, socketId);
+ public RestResponse build(Long id) throws Exception {
+ projectService.build(id);
return RestResponse.success();
}
@PostMapping("buildlog")
@RequiresPermissions("project:build")
- public RestResponse buildLog(Long id) throws Exception {
- projectService.tailBuildLog(id);
- return RestResponse.success();
- }
-
- @PostMapping("closebuild")
- @RequiresPermissions("project:build")
- public RestResponse closeBuild(Long id) {
- projectService.closeBuildLog(id);
- return RestResponse.success();
+ public RestResponse buildLog(
+ Long id,
+ @RequestParam(value = "startOffset", required = false) Long startOffset) {
+ return projectService.getBuildLog(id, startOffset);
}
@PostMapping("list")
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ProjectService.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ProjectService.java
index d05eb4d2d..a28c4e045 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ProjectService.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ProjectService.java
@@ -38,11 +38,9 @@ public interface ProjectService extends IService<Project> {
IPage<Project> page(Project project, RestRequest restRequest);
- void build(Long id, String socketId) throws Exception;
+ void build(Long id) throws Exception;
- void tailBuildLog(Long id);
-
- void closeBuildLog(Long id);
+ RestResponse getBuildLog(Long id, Long startOffset);
List<String> modules(Long id);
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProjectServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProjectServiceImpl.java
index 402242f33..655cf71ed 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProjectServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProjectServiceImpl.java
@@ -17,14 +17,18 @@
package org.apache.streampark.console.core.service.impl;
+import org.apache.streampark.common.conf.CommonConfig;
+import org.apache.streampark.common.conf.InternalConfigHolder;
+import org.apache.streampark.common.conf.Workspace;
+import org.apache.streampark.common.domain.FlinkMemorySize;
import org.apache.streampark.common.util.AssertUtils;
-import org.apache.streampark.common.util.CommandUtils;
+import org.apache.streampark.common.util.CompletableFutureUtils;
import org.apache.streampark.common.util.ThreadUtils;
-import org.apache.streampark.common.util.Utils;
+import org.apache.streampark.console.base.domain.ResponseCode;
import org.apache.streampark.console.base.domain.RestRequest;
import org.apache.streampark.console.base.domain.RestResponse;
import org.apache.streampark.console.base.mybatis.pager.MybatisPager;
-import org.apache.streampark.console.base.util.CommonUtils;
+import org.apache.streampark.console.base.util.FileUtils;
import org.apache.streampark.console.base.util.GZipUtils;
import org.apache.streampark.console.core.entity.Application;
import org.apache.streampark.console.core.entity.Project;
@@ -34,16 +38,13 @@ import org.apache.streampark.console.core.mapper.ProjectMapper;
import org.apache.streampark.console.core.service.ApplicationService;
import org.apache.streampark.console.core.service.ProjectService;
import org.apache.streampark.console.core.task.FlinkTrackingTask;
-import org.apache.streampark.console.core.websocket.WebSocketEndpoint;
+import org.apache.streampark.console.core.task.ProjectBuildTask;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import lombok.extern.slf4j.Slf4j;
-import org.eclipse.jgit.api.CloneCommand;
-import org.eclipse.jgit.api.Git;
-import org.eclipse.jgit.lib.StoredConfig;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Propagation;
@@ -51,18 +52,18 @@ import org.springframework.transaction.annotation.Transactional;
import java.io.File;
import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
-import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@@ -73,12 +74,6 @@ import java.util.concurrent.TimeUnit;
public class ProjectServiceImpl extends ServiceImpl<ProjectMapper, Project>
implements ProjectService {
- private final Map<Long, Byte> tailOutMap = new ConcurrentHashMap<>();
-
- private final Map<Long, StringBuilder> tailBuffer = new ConcurrentHashMap<>();
-
- private final Map<Long, Byte> tailBeginning = new ConcurrentHashMap<>();
-
@Autowired
private ApplicationService applicationService;
@@ -172,123 +167,13 @@ public class ProjectServiceImpl extends ServiceImpl<ProjectMapper, Project>
}
@Override
- public void build(Long id, String socketId) throws Exception {
+ public void build(Long id) throws Exception {
Project project = getById(id);
this.baseMapper.startBuild(project);
- StringBuilder builder = new StringBuilder();
- tailBuffer.put(id, builder.append(project.getLog4BuildStart()));
- boolean cloneSuccess = cloneSourceCode(project, socketId);
- if (!cloneSuccess) {
- log.error("[StreamPark] clone or pull error.");
- tailBuffer.remove(project.getId());
- this.baseMapper.failureBuild(project);
- return;
- }
- executorService.execute(() -> {
- boolean build = projectBuild(project, socketId);
- if (!build) {
- this.baseMapper.failureBuild(project);
- log.error("build error, project name: {} ", project.getName());
- return;
- }
- this.baseMapper.successBuild(project);
- try {
- this.deploy(project);
- List<Application> applications = getApplications(project);
- // Update the deploy state
- FlinkTrackingTask.refreshTracking(() -> applications.forEach((app) -> {
- log.info("update deploy by project: {}, appName:{}", project.getName(), app.getJobName());
- app.setLaunch(LaunchState.NEED_LAUNCH.get());
- app.setBuild(true);
- this.applicationService.updateLaunch(app);
- }));
- } catch (Exception e) {
- this.baseMapper.failureBuild(project);
- log.error("deploy error, project name: {}, detail: {}", project.getName(), e.getMessage());
- }
- });
- }
-
- private void deploy(Project project) throws Exception {
- File path = project.getAppSource();
- List<File> apps = new ArrayList<>();
- // find the compiled tar.gz (Stream Park project) file or jar (normal or official standard flink project) under the project path
- findTarOrJar(apps, path);
- if (apps.isEmpty()) {
- throw new RuntimeException("[StreamPark] can't find tar.gz or jar in " + path.getAbsolutePath());
- }
- for (File app : apps) {
- String appPath = app.getAbsolutePath();
- // 1). tar.gz file
- if (appPath.endsWith("tar.gz")) {
- File deployPath = project.getDistHome();
- if (!deployPath.exists()) {
- deployPath.mkdirs();
- }
- // xzvf jar
- if (app.exists()) {
- String cmd = String.format(
- "tar -xzvf %s -C %s",
- app.getAbsolutePath(),
- deployPath.getAbsolutePath()
- );
- CommandUtils.execute(cmd);
- }
- } else {
- // 2) .jar file(normal or official standard flink project)
- Utils.checkJarFile(app.toURI().toURL());
- String moduleName = app.getName().replace(".jar", "");
- File distHome = project.getDistHome();
- File targetDir = new File(distHome, moduleName);
- if (!targetDir.exists()) {
- targetDir.mkdirs();
- }
- File targetJar = new File(targetDir, app.getName());
- app.renameTo(targetJar);
- }
- }
- }
-
- private void findTarOrJar(List<File> list, File path) {
- for (File file : Objects.requireNonNull(path.listFiles())) {
- // navigate to the target directory:
- if (file.isDirectory() && "target".equals(file.getName())) {
- // find the tar.gz file or the jar file in the target path.
- // note: only one of the two can be selected, which cannot be satisfied at the same time.
- File tar = null;
- File jar = null;
- for (File targetFile : Objects.requireNonNull(file.listFiles())) {
- // 1) exit once the tar.gz file is found.
- if (targetFile.getName().endsWith("tar.gz")) {
- tar = targetFile;
- break;
- }
- // 2) try look for jar files, there may be multiple jars found.
- if (!targetFile.getName().startsWith("original-")
- && !targetFile.getName().endsWith("-sources.jar")
- && targetFile.getName().endsWith(".jar")) {
- if (jar == null) {
- jar = targetFile;
- } else {
- // there may be multiple jars found, in this case, select the jar with the largest and return
- if (targetFile.length() > jar.length()) {
- jar = targetFile;
- }
- }
- }
- }
- File target = tar == null ? jar : tar;
- if (target == null) {
- log.warn("[StreamPark] can't find tar.gz or jar in {}", file.getAbsolutePath());
- } else {
- list.add(target);
- }
- }
-
- if (file.isDirectory()) {
- findTarOrJar(list, file);
- }
- }
+ CompletableFuture<Void> buildTask = CompletableFuture.runAsync(
+ new ProjectBuildTask(getBuildLogPath(id), project, baseMapper, applicationService), executorService);
+ // TODO May need to define parameters to set the build timeout in the future.
+ CompletableFutureUtils.runTimeout(buildTask, 20, TimeUnit.MINUTES);
}
@Override
@@ -361,69 +246,6 @@ public class ProjectServiceImpl extends ServiceImpl<ProjectMapper, Project>
return null;
}
- private boolean cloneSourceCode(Project project, String socketId) {
- try {
- project.cleanCloned();
- log.info("clone {}, {} starting...", project.getName(), project.getUrl());
-
- WebSocketEndpoint.writeMessage(socketId, String.format("clone %s starting..., url: %s", project.getName(), project.getUrl()));
-
- tailBuffer.get(project.getId()).append(project.getLog4CloneStart());
- CloneCommand cloneCommand = Git.cloneRepository()
- .setURI(project.getUrl())
- .setDirectory(project.getAppSource())
- .setBranch(project.getBranches());
-
- if (CommonUtils.notEmpty(project.getUserName(), project.getPassword())) {
- cloneCommand.setCredentialsProvider(project.getCredentialsProvider());
- }
-
- Future<Git> future = executorService.submit(cloneCommand);
- Git git = future.get(60, TimeUnit.SECONDS);
-
- StoredConfig config = git.getRepository().getConfig();
- config.setBoolean("http", project.getUrl(), "sslVerify", false);
- config.setBoolean("https", project.getUrl(), "sslVerify", false);
- config.save();
-
- File workTree = git.getRepository().getWorkTree();
- gitWorkTree(project.getId(), workTree, "");
- String successMsg = String.format(
- "[StreamPark] project [%s] git clone successful!\n",
- project.getName()
- );
- tailBuffer.get(project.getId()).append(successMsg);
- WebSocketEndpoint.writeMessage(socketId, successMsg);
- git.close();
- return true;
- } catch (Exception e) {
- String errorLog = String.format(
- "[StreamPark] project [%s] branch [%s] git clone failure, err: %s",
- project.getName(),
- project.getBranches(),
- e
- );
- tailBuffer.get(project.getId()).append(errorLog);
- WebSocketEndpoint.writeMessage(socketId, errorLog);
- log.error(String.format("project %s clone error ", project.getName()), e);
- return false;
- }
- }
-
- private void gitWorkTree(Long id, File workTree, String space) {
- File[] files = workTree.listFiles();
- for (File file : Objects.requireNonNull(files)) {
- if (!file.getName().startsWith(".git")) {
- if (file.isFile()) {
- tailBuffer.get(id).append(space).append("/").append(file.getName()).append("\n");
- } else if (file.isDirectory()) {
- tailBuffer.get(id).append(space).append("/").append(file.getName()).append("\n");
- gitWorkTree(id, file, space.concat("/").concat(file.getName()));
- }
- }
- }
- }
-
private void eachFile(File file, List<Map<String, Object>> list, Boolean isRoot) {
if (file != null && file.exists() && file.listFiles() != null) {
if (isRoot) {
@@ -456,37 +278,44 @@ public class ProjectServiceImpl extends ServiceImpl<ProjectMapper, Project>
}
}
- private boolean projectBuild(Project project, String socketId) {
- StringBuilder builder = tailBuffer.get(project.getId());
- int code = CommandUtils.execute(project.getMavenWorkHome(),
- Collections.singletonList(project.getMavenArgs()),
- (line) -> {
- builder.append(line).append("\n");
- if (tailOutMap.containsKey(project.getId())) {
- if (tailBeginning.containsKey(project.getId())) {
- tailBeginning.remove(project.getId());
- Arrays.stream(builder.toString().split("\n"))
- .forEach(out -> WebSocketEndpoint.writeMessage(socketId, out));
- }
- WebSocketEndpoint.writeMessage(socketId, line);
- }
- });
- closeBuildLog(project.getId());
- log.info(builder.toString());
- tailBuffer.remove(project.getId());
- return code == 0;
- }
-
@Override
- public void tailBuildLog(Long id) {
- this.tailOutMap.put(id, Byte.valueOf("0"));
- this.tailBeginning.put(id, Byte.valueOf("0"));
+ public RestResponse getBuildLog(Long id, Long startOffset) {
+     // Returns a slice of the project's build log.
+     //  - startOffset == null and the project is not building: the tail of the log is returned.
+     //  - otherwise: the log is read from startOffset (0 when null), and the response carries the
+     //    next "offset" plus a "readFinished" flag so the caller can keep polling.
+     File logFile = Paths.get(getBuildLogPath(id)).toFile();
+     if (!logFile.exists()) {
+         String errorMsg = String.format("Build log file(fileName=%s) not found, please build first.", logFile);
+         log.warn(errorMsg);
+         return RestResponse.success().data(errorMsg);
+     }
+     // The project row may be gone while its log file is still on disk; treat that as "not building"
+     // instead of failing with a NullPointerException.
+     Project project = this.getById(id);
+     boolean isBuilding = project != null && project.getBuildState() == 0;
+     // Parsed outside the try block so that only failures of the file reads are mapped to a failure response.
+     long maxSize = FlinkMemorySize.parse(InternalConfigHolder.get(CommonConfig.READ_LOG_MAX_SIZE())).getBytes();
+     byte[] fileContent;
+     long endOffset = 0L;
+     boolean readFinished = true;
+     // Read log from earliest when project is building
+     if (startOffset == null && isBuilding) {
+         startOffset = 0L;
+     }
+     try {
+         if (startOffset == null) {
+             fileContent = FileUtils.readEndOfFile(logFile, maxSize);
+         } else {
+             fileContent = FileUtils.readFileFromOffset(logFile, startOffset, maxSize);
+             endOffset = startOffset + fileContent.length;
+             // Only finished once everything has been read AND nothing more will be appended.
+             readFinished = logFile.length() == endOffset && !isBuilding;
+         }
+         return RestResponse.success()
+             .data(new String(fileContent, StandardCharsets.UTF_8))
+             .put("offset", endOffset)
+             .put("readFinished", readFinished);
+     } catch (IOException | IllegalArgumentException e) {
+         // IllegalArgumentException: readFileFromOffset rejects an offset beyond the file length, which
+         // happens when a caller keeps polling with an old offset after the log was overwritten by a new build.
+         String error = String.format("Read build log file(fileName=%s) caused an exception: ", logFile);
+         log.error(error, e);
+         return RestResponse.fail(error + e.getMessage(), ResponseCode.CODE_FAIL);
+     }
}
- @Override
- public void closeBuildLog(Long id) {
- tailOutMap.remove(id);
- tailBeginning.remove(id);
+ private String getBuildLogPath(Long projectId) {
+ return String.format("%s/%s/build.log", Workspace.local().PROJECT_BUILD_LOG_DIR(), projectId);
}
}
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/AbstractLogFileTask.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/AbstractLogFileTask.java
new file mode 100644
index 000000000..9844f826c
--- /dev/null
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/AbstractLogFileTask.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.console.core.task;
+
+import ch.qos.logback.classic.Level;
+import ch.qos.logback.classic.Logger;
+import ch.qos.logback.classic.LoggerContext;
+import ch.qos.logback.classic.encoder.PatternLayoutEncoder;
+import ch.qos.logback.classic.spi.ILoggingEvent;
+import ch.qos.logback.core.FileAppender;
+import lombok.extern.slf4j.Slf4j;
+import org.slf4j.LoggerFactory;
+
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+
+/**
+ * Base class for tasks that write their own output to a dedicated log file.
+ *
+ * <p>{@link #run()} prepares the log file, attaches a logback file appender to a logger named after
+ * the current thread, runs the subclass logic and finally releases the logging resources again.
+ */
+@Slf4j
+public abstract class AbstractLogFileTask implements Runnable {
+
+    /** Location of the log file this task writes to. */
+    private final Path logPath;
+
+    /**
+     * Whether old log files need to be overwritten?
+     * If true, the log file will be clean before write.
+     * If false, the log will be appended to new log file.
+     */
+    private final boolean isOverwrite;
+
+    /** Logger bound to the log file; null until {@link #run()} has created it. */
+    protected Logger fileLogger;
+
+    private FileAppender<ILoggingEvent> fileAppender;
+
+    private PatternLayoutEncoder ple;
+
+    public AbstractLogFileTask(String logPath, boolean isOverwrite) {
+        this.logPath = Paths.get(logPath);
+        this.isOverwrite = isOverwrite;
+    }
+
+    @Override
+    public void run() {
+        try {
+            Path logDir = logPath.getParent();
+            // getParent() is null for a bare file name; there is no directory to create in that case.
+            if (logDir != null && !Files.isDirectory(logDir)) {
+                Files.createDirectories(logDir);
+                // Logged after the fact so the message is only emitted when creation succeeded.
+                log.info("Created log dir {}", logDir);
+            }
+            if (isOverwrite) {
+                Files.deleteIfExists(logPath);
+            }
+            this.fileLogger = createFileLogger();
+            doRun();
+        } catch (Throwable t) {
+            log.warn("Failed to run task.", t);
+            if (fileLogger != null) {
+                fileLogger.error("Failed to run task.", t);
+            }
+            processException(t);
+        } finally {
+            try {
+                doFinally();
+            } finally {
+                // Must run even when doFinally() throws, otherwise the log file handle stays open.
+                releaseFileLogger();
+            }
+        }
+    }
+
+    /**
+     * Stop the encoder and the appender and detach the appender from the logger.
+     * Each resource is only touched when it was actually created.
+     */
+    private void releaseFileLogger() {
+        if (ple != null) {
+            ple.stop();
+        }
+        if (fileAppender != null) {
+            fileAppender.stop();
+        }
+        if (fileLogger != null) {
+            fileLogger.detachAppender(fileAppender);
+        }
+    }
+
+    /**
+     * Create a non-additive INFO logger, named after the current thread, that writes to the log file.
+     */
+    private Logger createFileLogger() {
+        LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
+
+        ple = new PatternLayoutEncoder();
+        // NOTE(review): the time zone of the timestamps is hard-coded in the pattern below.
+        ple.setPattern("%d{yyyy-MM-dd HH:mm:ss.SSS,Asia/Singapore} %-5p - %m%n");
+        ple.setContext(lc);
+        ple.start();
+
+        this.fileAppender = new FileAppender<>();
+        fileAppender.setFile(logPath.toString());
+        fileAppender.setEncoder(ple);
+        fileAppender.setContext(lc);
+        fileAppender.start();
+
+        ch.qos.logback.classic.Logger logger =
+            (ch.qos.logback.classic.Logger) LoggerFactory.getLogger(Thread.currentThread().getName());
+        logger.addAppender(fileAppender);
+        logger.setLevel(Level.INFO);
+        logger.setAdditive(false);
+        return logger;
+    }
+
+    /** The task logic; called once the file logger has been created. */
+    protected abstract void doRun() throws Throwable;
+
+    /** Called with any Throwable raised while preparing the log file or running {@link #doRun()}. */
+    protected abstract void processException(Throwable t);
+
+    /** Called at the end of every run, whether it succeeded or failed, before logging resources are released. */
+    protected abstract void doFinally();
+
+}
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/ProjectBuildTask.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/ProjectBuildTask.java
new file mode 100644
index 000000000..b738058e6
--- /dev/null
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/ProjectBuildTask.java
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.console.core.task;
+
+import org.apache.streampark.common.util.CommandUtils;
+import org.apache.streampark.common.util.Utils;
+import org.apache.streampark.console.base.util.CommonUtils;
+import org.apache.streampark.console.core.entity.Application;
+import org.apache.streampark.console.core.entity.Project;
+import org.apache.streampark.console.core.enums.LaunchState;
+import org.apache.streampark.console.core.mapper.ProjectMapper;
+import org.apache.streampark.console.core.service.ApplicationService;
+
+import lombok.extern.slf4j.Slf4j;
+import org.eclipse.jgit.api.CloneCommand;
+import org.eclipse.jgit.api.Git;
+import org.eclipse.jgit.lib.StoredConfig;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Task that clones a project's git repository, runs the project's build command and deploys the
+ * produced artifacts, writing its progress to the task log file through {@code fileLogger}.
+ */
+@Slf4j
+public class ProjectBuildTask extends AbstractLogFileTask {
+
+    final Project project;
+
+    final ProjectMapper baseMapper;
+
+    final ApplicationService applicationService;
+
+    /**
+     * @param logPath            path of the build log file; an existing file is overwritten
+     * @param project            the project to build
+     * @param baseMapper         mapper used to record the build result
+     * @param applicationService service used to look up and update the project's applications
+     */
+    public ProjectBuildTask(
+        String logPath,
+        Project project,
+        ProjectMapper baseMapper,
+        ApplicationService applicationService) {
+        super(logPath, true);
+        this.project = project;
+        this.baseMapper = baseMapper;
+        this.applicationService = applicationService;
+    }
+
+    /**
+     * Clone, build, deploy, then flag every application of the project as needing a launch.
+     * A failed clone or build is recorded through {@code failureBuild} and ends the task.
+     */
+    @Override
+    protected void doRun() throws Throwable {
+        log.info("Project {} start build", project.getName());
+        fileLogger.info(project.getLog4BuildStart());
+        boolean cloneSuccess = cloneSourceCode(project);
+        if (!cloneSuccess) {
+            fileLogger.error("[StreamPark] clone or pull error.");
+            this.baseMapper.failureBuild(project);
+            return;
+        }
+        boolean build = projectBuild(project);
+        if (!build) {
+            this.baseMapper.failureBuild(project);
+            fileLogger.error("build error, project name: {} ", project.getName());
+            return;
+        }
+        this.baseMapper.successBuild(project);
+        this.deploy(project);
+        List<Application> applications = this.applicationService.getByProjectId(project.getId());
+        // Update the deploy state
+        FlinkTrackingTask.refreshTracking(() -> applications.forEach((app) -> {
+            fileLogger.info("update deploy by project: {}, appName:{}", project.getName(), app.getJobName());
+            app.setLaunch(LaunchState.NEED_LAUNCH.get());
+            app.setBuild(true);
+            this.applicationService.updateLaunch(app);
+        }));
+    }
+
+    /** Records the build as failed and writes the cause to the build log. */
+    @Override
+    protected void processException(Throwable t) {
+        this.baseMapper.failureBuild(project);
+        fileLogger.error("Build error, project name: {}", project.getName(), t);
+    }
+
+    /** Nothing to clean up for a project build. */
+    @Override
+    protected void doFinally() {
+    }
+
+    /**
+     * Clones the configured branch of the project repository into the project's source directory.
+     *
+     * @return {@code true} when the clone succeeded, {@code false} when any exception occurred
+     *     (the exception is written to the build log)
+     */
+    private boolean cloneSourceCode(Project project) {
+        try {
+            project.cleanCloned();
+            fileLogger.info("clone {}, {} starting...", project.getName(), project.getUrl());
+            fileLogger.info(project.getLog4CloneStart());
+            CloneCommand cloneCommand = Git.cloneRepository()
+                .setURI(project.getUrl())
+                .setDirectory(project.getAppSource())
+                .setBranch(project.getBranches());
+
+            if (CommonUtils.notEmpty(project.getUserName(), project.getPassword())) {
+                cloneCommand.setCredentialsProvider(project.getCredentialsProvider());
+            }
+            // try-with-resources: the repository handle is released even when saving the config
+            // or listing the work tree fails.
+            try (Git git = cloneCommand.call()) {
+                StoredConfig config = git.getRepository().getConfig();
+                config.setBoolean("http", project.getUrl(), "sslVerify", false);
+                config.setBoolean("https", project.getUrl(), "sslVerify", false);
+                config.save();
+
+                File workTree = git.getRepository().getWorkTree();
+                gitWorkTree(project.getId(), workTree, "");
+                String successMsg = String.format(
+                    "[StreamPark] project [%s] git clone successful!\n",
+                    project.getName()
+                );
+                fileLogger.info(successMsg);
+            }
+            return true;
+        } catch (Exception e) {
+            fileLogger.error(String.format(
+                "[StreamPark] project [%s] branch [%s] git clone failure, err: %s",
+                project.getName(),
+                project.getBranches(),
+                e
+            ));
+            fileLogger.error(String.format("project %s clone error ", project.getName()), e);
+            return false;
+        }
+    }
+
+    /**
+     * Writes the cloned work tree to the build log, one entry per line, recursing into
+     * directories. Entries whose name starts with ".git" (git metadata) are left out.
+     *
+     * @param id       project id (kept for signature compatibility, not used for the listing)
+     * @param workTree directory to list
+     * @param space    path prefix printed in front of each entry
+     */
+    private void gitWorkTree(Long id, File workTree, String space) {
+        File[] files = workTree.listFiles();
+        if (files == null) {
+            // The listing is informational only; an unreadable directory must not fail the clone.
+            fileLogger.warn("[StreamPark] can't list files in {}", workTree.getAbsolutePath());
+            return;
+        }
+        for (File file : files) {
+            // Skip git metadata; everything else belongs to the checked-out sources.
+            if (file.getName().startsWith(".git")) {
+                continue;
+            }
+            if (file.isFile()) {
+                fileLogger.info("{} / {}", space, file.getName());
+            } else if (file.isDirectory()) {
+                fileLogger.info("{} / {}", space, file.getName());
+                gitWorkTree(id, file, space.concat("/").concat(file.getName()));
+            }
+        }
+    }
+
+    /**
+     * Runs the project's build command in its maven work home and forwards every output line to
+     * the build log.
+     *
+     * @return {@code true} when the command exits with code 0
+     */
+    private boolean projectBuild(Project project) {
+        int code = CommandUtils.execute(project.getMavenWorkHome(),
+            Collections.singletonList(project.getMavenArgs()),
+            (line) -> fileLogger.info(line));
+        return code == 0;
+    }
+
+    /**
+     * Deploys the build output into the project's dist home: a tar.gz is extracted there, a jar is
+     * moved into a sub directory named after the jar.
+     *
+     * @throws Exception when no artifact is found, a jar fails validation or cannot be moved
+     */
+    private void deploy(Project project) throws Exception {
+        File path = project.getAppSource();
+        List<File> apps = new ArrayList<>();
+        // find the compiled tar.gz (Stream Park project) file or jar (normal or official standard flink project) under the project path
+        findTarOrJar(apps, path);
+        if (apps.isEmpty()) {
+            throw new RuntimeException("[StreamPark] can't find tar.gz or jar in " + path.getAbsolutePath());
+        }
+        for (File app : apps) {
+            String appPath = app.getAbsolutePath();
+            // 1). tar.gz file
+            if (appPath.endsWith("tar.gz")) {
+                File deployPath = project.getDistHome();
+                if (!deployPath.exists()) {
+                    deployPath.mkdirs();
+                }
+                // extract the archive into the dist home
+                if (app.exists()) {
+                    String cmd = String.format(
+                        "tar -xzvf %s -C %s",
+                        app.getAbsolutePath(),
+                        deployPath.getAbsolutePath()
+                    );
+                    CommandUtils.execute(cmd);
+                }
+            } else {
+                // 2) .jar file(normal or official standard flink project)
+                Utils.checkJarFile(app.toURI().toURL());
+                String moduleName = app.getName().replace(".jar", "");
+                File distHome = project.getDistHome();
+                File targetDir = new File(distHome, moduleName);
+                if (!targetDir.exists()) {
+                    targetDir.mkdirs();
+                }
+                File targetJar = new File(targetDir, app.getName());
+                // File.renameTo only reports failure through an ignored boolean, which would leave
+                // a missing or stale jar behind a "successful" build. Files.move replaces an
+                // existing target and throws when the jar cannot be moved.
+                java.nio.file.Files.move(
+                    app.toPath(),
+                    targetJar.toPath(),
+                    java.nio.file.StandardCopyOption.REPLACE_EXISTING);
+            }
+        }
+    }
+
+    /**
+     * Recursively collects one artifact per "target" directory below {@code path}: the first
+     * tar.gz found, otherwise the largest jar that is neither an "original-" nor a "-sources" jar.
+     *
+     * @param list receives the selected artifacts
+     * @param path directory to search
+     */
+    private void findTarOrJar(List<File> list, File path) {
+        for (File file : Objects.requireNonNull(path.listFiles())) {
+            // navigate to the target directory:
+            if (file.isDirectory() && "target".equals(file.getName())) {
+                // find the tar.gz file or the jar file in the target path.
+                // note: only one of the two can be selected, which cannot be satisfied at the same time.
+                File tar = null;
+                File jar = null;
+                for (File targetFile : Objects.requireNonNull(file.listFiles())) {
+                    // 1) exit once the tar.gz file is found.
+                    if (targetFile.getName().endsWith("tar.gz")) {
+                        tar = targetFile;
+                        break;
+                    }
+                    // 2) try look for jar files, there may be multiple jars found.
+                    if (!targetFile.getName().startsWith("original-")
+                        && !targetFile.getName().endsWith("-sources.jar")
+                        && targetFile.getName().endsWith(".jar")) {
+                        // there may be multiple jars found, in this case keep the largest one
+                        if (jar == null || targetFile.length() > jar.length()) {
+                            jar = targetFile;
+                        }
+                    }
+                }
+                File target = tar == null ? jar : tar;
+                if (target == null) {
+                    fileLogger.warn("[StreamPark] can't find tar.gz or jar in {}", file.getAbsolutePath());
+                } else {
+                    list.add(target);
+                }
+            }
+            if (file.isDirectory()) {
+                findTarOrJar(list, file);
+            }
+        }
+    }
+}
diff --git a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/base/util/FileUtilsTest.java b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/base/util/FileUtilsTest.java
new file mode 100644
index 000000000..68846b46f
--- /dev/null
+++ b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/base/util/FileUtilsTest.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.console.base.util;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.Random;
+
+/**
+ * Test for {@link FileUtils}
+ */
+public class FileUtilsTest {
+
+    @ClassRule
+    public static final TemporaryFolder TEMP_FOLDER = new TemporaryFolder();
+
+    /** Size in bytes of the random test files. */
+    private static final int FILE_SIZE = 1000000;
+
+    /** Returns {@code size} random bytes. */
+    private static byte[] randomBytes(int size) {
+        byte[] bytes = new byte[size];
+        new Random().nextBytes(bytes);
+        return bytes;
+    }
+
+    /** Writes {@code content} to a file named "tmp_file" inside a fresh temporary folder. */
+    private static File writeTmpFile(byte[] content) throws IOException {
+        File file = new File(TEMP_FOLDER.newFolder(), "tmp_file");
+        // try-with-resources closes the stream even when the write fails
+        try (FileOutputStream outputStream = new FileOutputStream(file)) {
+            outputStream.write(content);
+            outputStream.flush();
+        }
+        return file;
+    }
+
+    @Test
+    public void testReadEndOfFile() throws IOException {
+        int fileSize = FILE_SIZE;
+        byte[] fileBytes = randomBytes(fileSize);
+        File file = writeTmpFile(fileBytes);
+
+        // The read size is larger than the file size
+        byte[] readBytes = FileUtils.readEndOfFile(file, fileSize + 1);
+        assertArrayEquals(fileBytes, readBytes);
+
+        // The read size is equals the file size
+        readBytes = FileUtils.readEndOfFile(file, fileSize);
+        assertArrayEquals(fileBytes, readBytes);
+
+        // The read size is less than the file size
+        int readSize = 50000;
+        readBytes = FileUtils.readEndOfFile(file, readSize);
+        byte[] expectedBytes = new byte[readSize];
+        System.arraycopy(fileBytes, fileSize - readSize, expectedBytes, 0, expectedBytes.length);
+        assertArrayEquals(expectedBytes, readBytes);
+    }
+
+    @Test
+    public void testReadEndOfFileWithChinese() throws IOException {
+        final File outDir = TEMP_FOLDER.newFolder();
+
+        File file = new File(outDir, "tmp_file");
+        String logWithChinese = "Hello world! 你好啊,hello xxxx";
+        // Write and decode with an explicit charset: with the platform default the test would
+        // fail on JVMs whose default charset cannot represent the Chinese characters.
+        try (PrintWriter writer = new PrintWriter(file, "UTF-8")) {
+            writer.write(logWithChinese);
+        }
+
+        byte[] bytes = FileUtils.readEndOfFile(file, 1000000);
+        String readString = new String(bytes, "UTF-8");
+        assertEquals(logWithChinese, readString);
+    }
+
+    @Test
+    public void testReadFileFromOffset() throws IOException {
+        int fileSize = FILE_SIZE;
+        byte[] fileBytes = randomBytes(fileSize);
+        File file = writeTmpFile(fileBytes);
+
+        // The read size is larger than the file size
+        byte[] readBytes = FileUtils.readFileFromOffset(file, 0, fileSize + 1);
+        assertArrayEquals(fileBytes, readBytes);
+
+        // The read size is equals the file size
+        readBytes = FileUtils.readFileFromOffset(file, 0, fileSize);
+        assertArrayEquals(fileBytes, readBytes);
+
+        // The read size is less than the file size: read the file chunk by chunk
+        int readSize = 3456;
+        readBytes = new byte[fileSize];
+        int offset = 0;
+        while (offset < fileSize) {
+            byte[] tmpReadBytes = FileUtils.readFileFromOffset(file, offset, readSize);
+            // An empty chunk would never advance the offset and hang the test.
+            assertTrue("no bytes read at offset " + offset, tmpReadBytes.length > 0);
+            assertTrue(tmpReadBytes.length <= readSize);
+            System.arraycopy(tmpReadBytes, 0, readBytes, offset, tmpReadBytes.length);
+            offset += tmpReadBytes.length;
+        }
+        assertArrayEquals(fileBytes, readBytes);
+    }
+
+}
diff --git a/streampark-console/streampark-console-webapp/src/api/index.js b/streampark-console/streampark-console-webapp/src/api/index.js
index d0e50a0d5..e6c50f5d9 100644
--- a/streampark-console/streampark-console-webapp/src/api/index.js
+++ b/streampark-console/streampark-console-webapp/src/api/index.js
@@ -30,7 +30,6 @@ export default {
COPY: '/flink/project/copy',
BUILD: '/flink/project/build',
BUILD_LOG: '/flink/project/buildlog',
- CLOSE_BUILD: '/flink/project/closebuild',
LIST: '/flink/project/list',
FILE_LIST: '/flink/project/filelist',
MODULES: '/flink/project/modules',
diff --git a/streampark-console/streampark-console-webapp/src/api/project.js b/streampark-console/streampark-console-webapp/src/api/project.js
index 221ac5e33..658385fe8 100644
--- a/streampark-console/streampark-console-webapp/src/api/project.js
+++ b/streampark-console/streampark-console-webapp/src/api/project.js
@@ -53,10 +53,6 @@ export function buildlog (params) {
return http.post(api.Project.BUILD_LOG, params)
}
-export function closebuild(params) {
- return http.post(api.Project.CLOSE_BUILD, params)
-}
-
export function fileList (params) {
return http.post(api.Project.FILE_LIST, params)
}
diff --git a/streampark-console/streampark-console-webapp/src/views/flink/project/View.vue b/streampark-console/streampark-console-webapp/src/views/flink/project/View.vue
index 08ee5f1ca..7f73ba98e 100644
--- a/streampark-console/streampark-console-webapp/src/views/flink/project/View.vue
+++ b/streampark-console/streampark-console-webapp/src/views/flink/project/View.vue
@@ -163,19 +163,14 @@
<div class="operation">
- <a-tooltip
- title="See Build log"
- v-if="item.buildState === 0">
+ <a-tooltip title="See Build log">
<a-button
shape="circle"
size="small"
style="margin-left: 8px"
@click.native="handleSeeLog(item)"
class="control-button ctl-btn-color">
- <a-icon
- spin
- type="sync"
- style="color:#4a9ff5"/>
+ <a-icon type="eye"/>
</a-button>
</a-tooltip>
@@ -238,10 +233,9 @@
width="65%"
:body-style="controller.modalStyle"
:destroy-on-close="controller.modalDestroyOnClose"
- :footer="null"
- @cancel="handleClose">
+ :footer="null">
<template slot="title">
- <svg-icon name="code" />
+ <a-icon type="file-text" />
{{ controller.consoleName }}
</template>
<div
@@ -252,15 +246,13 @@
</div>
</template>
<script>
-import { build, buildlog, list,remove,closebuild } from '@api/project'
+import { build, buildlog, list,remove } from '@api/project'
import Ellipsis from '@comp/Ellipsis'
import {mapActions} from 'vuex'
import { Terminal } from 'xterm'
import 'xterm/css/xterm.css'
import SvgIcon from '@/components/SvgIcon'
-import {baseUrl} from '@/api/baseUrl'
-import storage from '@/utils/storage'
export default {
components: { Ellipsis, SvgIcon },
@@ -274,8 +266,6 @@ export default {
stompClient: null,
terminal: null,
projectId: null,
- socketId: null,
- storageKey: 'BUILD_SOCKET_ID',
controller: {
ellipsis: 100,
modalStyle: {
@@ -332,11 +322,8 @@ export default {
showConfirmButton: false,
timer: 2000
}).then((r)=> {
- this.socketId = this.uuid()
- storage.set(this.storageKey,this.socketId)
build({
id: record.id,
- socketId: this.socketId
})
})
},
@@ -407,40 +394,19 @@ export default {
})
const container = document.getElementById('terminal')
this.terminal.open(container, true)
-
- const url = baseUrl().concat('/websocket/' + this.handleGetSocketId())
-
- const socket = this.getSocket(url)
-
- socket.onopen = () => {
- buildlog({id:project.id})
- }
-
- socket.onmessage = (event) => {
- this.terminal.writeln(event.data)
- }
-
- socket.onclose = () => {
- this.socketId = null
- storage.rm(this.storageKey)
- }
-
- },
-
- handleGetSocketId() {
- if (this.socketId == null) {
- return storage.get(this.storageKey) || null
- }
- return this.socketId
+ this.fetchBuildLog(project, null)
},
- handleClose () {
- closebuild({ id: this.projectId })
- this.stompClient.disconnect()
- this.controller.visible = false
- this.terminal.clear()
- this.terminal.clearSelection()
- this.terminal = null
+ fetchBuildLog(project, startOffset) {
+ buildlog({
+ id: project.id,
+ startOffset: startOffset
+ }).then((resp) => {
+ this.terminal.write(resp.data)
+ if (resp.readFinished === false) {
+ window.setTimeout(() => this.fetchBuildLog(project, resp.offset), 500)
+ }
+ })
},
handleQuery (state) {
@@ -490,7 +456,7 @@ export default {
display: inline-block;
vertical-align: middle;
font-size: 14px;
- margin-left: 40px;
+ margin-left: 60px;
span {
line-height: 20px;
@@ -518,7 +484,7 @@ export default {
}
.operation {
- width: 120px;
+ width: 150px;
}
.ant-tag {