You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampark.apache.org by mo...@apache.org on 2022/11/07 10:44:27 UTC
[incubator-streampark] branch loggg updated: [bug]Resolve that running logs do not display exceptions.
This is an automated email from the ASF dual-hosted git repository.
monster pushed a commit to branch loggg
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/loggg by this push:
new 6f197dc1f [bug]Resolve that running logs do not display exceptions.
6f197dc1f is described below
commit 6f197dc1fb4429ac9d98fd10d16806af7f3038d9
Author: Monster <25...@qq.com>
AuthorDate: Mon Nov 7 18:44:09 2022 +0800
[bug]Resolve that running logs do not display exceptions.
---
.../flink/app/components/AppView/LogModal.vue | 1 +
.../core/controller/ApplicationController.java | 3 ++-
.../console/core/service/LoggerService.java | 2 +-
.../core/service/impl/ApplicationServiceImpl.java | 2 +-
.../core/service/impl/LoggerServiceImpl.java | 10 ++++-----
.../helper/KubernetesDeploymentHelper.scala | 24 ++++++++++------------
.../kubernetes/watcher/FlinkJobStatusWatcher.scala | 2 +-
7 files changed, 22 insertions(+), 22 deletions(-)
diff --git a/streampark-console/streampark-console-newui/src/views/flink/app/components/AppView/LogModal.vue b/streampark-console/streampark-console-newui/src/views/flink/app/components/AppView/LogModal.vue
index d5b01e30b..f2e3c387e 100644
--- a/streampark-console/streampark-console-newui/src/views/flink/app/components/AppView/LogModal.vue
+++ b/streampark-console/streampark-console-newui/src/views/flink/app/components/AppView/LogModal.vue
@@ -61,6 +61,7 @@
getLogLoading.value = true;
const { data } = await fetchStartLog({
namespace: app.k8sNamespace,
+ jobName: app.jobName,
jobId: app.jobId,
limit: 100000000,
skipLineNum: 0,
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationController.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationController.java
index 1e257e3c5..1ec165574 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationController.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationController.java
@@ -335,9 +335,10 @@ public class ApplicationController {
@PostMapping(value = "/detail")
public RestResponse detail(@ApiParam("K8s name spaces") @RequestParam(value = "namespace", required = false) String namespace,
@ApiParam("Job name") @RequestParam(value = "jobName", required = false) String jobName,
+ @ApiParam("Job id") @RequestParam(value = "jobId", required = false) String jobId,
@ApiParam("Number of log lines skipped loading") @RequestParam(value = "skipLineNum", required = false) Integer skipLineNum,
@ApiParam("Number of log lines loaded at once") @RequestParam(value = "limit", required = false) Integer limit) {
- return RestResponse.success(MoreFutures.derefUsingDefaultTimeout(logService.queryLog(namespace, jobName, skipLineNum, limit)));
+ return RestResponse.success(MoreFutures.derefUsingDefaultTimeout(logService.queryLog(namespace, jobName, jobId, skipLineNum, limit)));
}
}
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/LoggerService.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/LoggerService.java
index 5b7dd3a22..6b6795405 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/LoggerService.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/LoggerService.java
@@ -20,5 +20,5 @@ package org.apache.streampark.console.core.service;
import java.util.concurrent.CompletionStage;
public interface LoggerService {
- CompletionStage<String> queryLog(String namespac, String jobName, int skipLineNum, int limit);
+ CompletionStage<String> queryLog(String namespac, String jobName, String jobId, int skipLineNum, int limit);
}
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
index 47e86973a..980a1bc92 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
@@ -928,7 +928,7 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
CompletableFuture<CancelResponse> cancelFuture = cancelFutureMap.remove(app.getId());
Application application = this.baseMapper.getApp(app);
if (isKubernetesApp(application)) {
- KubernetesDeploymentHelper.watchPodTerminatedLog(application.getK8sNamespace(), application.getJobName());
+ KubernetesDeploymentHelper.watchPodTerminatedLog(application.getK8sNamespace(), application.getJobName(), application.getJobId());
KubernetesDeploymentHelper.deleteTaskDeployment(application.getK8sNamespace(), application.getJobName());
KubernetesDeploymentHelper.deleteTaskConfigMap(application.getK8sNamespace(), application.getJobName());
IngressController.deleteIngress(application.getK8sNamespace(), application.getJobName());
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/LoggerServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/LoggerServiceImpl.java
index 7ae6ec5fa..611e447b5 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/LoggerServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/LoggerServiceImpl.java
@@ -47,11 +47,11 @@ public class LoggerServiceImpl implements LoggerService {
* @param limit limit
* @return log string data
*/
- public CompletionStage<String> queryLog(String nameSpace, String jobName, int skipLineNum, int limit) {
- return CompletableFuture.supplyAsync(() -> jobDeploymentsWatch(nameSpace, jobName)
+ public CompletionStage<String> queryLog(String nameSpace, String jobName, String jobId, int skipLineNum, int limit) {
+ return CompletableFuture.supplyAsync(() -> jobDeploymentsWatch(nameSpace, jobName, jobId)
).exceptionally(e -> {
try {
- return String.format("%s/%s_%s_err.log", new File("").getCanonicalPath(), nameSpace, jobName);
+ return String.format("%s/%s_err.log", new File("temp").getCanonicalPath(), jobId);
} catch (IOException ex) {
log.error("Generate log path exception:{}", ex.getMessage());
return null;
@@ -59,8 +59,8 @@ public class LoggerServiceImpl implements LoggerService {
}).thenApply(path -> logClient.rollViewLog(String.valueOf(path), skipLineNum, limit));
}
- private String jobDeploymentsWatch(String nameSpace, String jobName) {
- return KubernetesDeploymentHelper.watchDeploymentLog(nameSpace, jobName);
+ private String jobDeploymentsWatch(String nameSpace, String jobName, String jobId) {
+ return KubernetesDeploymentHelper.watchDeploymentLog(nameSpace, jobName, jobId);
}
}
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/helper/KubernetesDeploymentHelper.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/helper/KubernetesDeploymentHelper.scala
index 375dd4d23..3dfd1617d 100644
--- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/helper/KubernetesDeploymentHelper.scala
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/helper/KubernetesDeploymentHelper.scala
@@ -82,25 +82,23 @@ object KubernetesDeploymentHelper extends Logger {
}
}
- def watchDeploymentLog(nameSpace: String, jobName: String): String = {
+ def watchDeploymentLog(nameSpace: String, jobName: String, jobId: String): String = {
tryWithResource(KubernetesRetriever.newK8sClient()) { client =>
- Try {
- val projectPath = new File("").getCanonicalPath
- val path = s"$projectPath/${nameSpace}_$jobName.log"
- val file = new File(path)
- val log = client.apps.deployments.inNamespace(nameSpace).withName(jobName).getLog
- Files.asCharSink(file, Charsets.UTF_8).write(log)
- path
- }.getOrElse(null)
- }(error => throw error)
+ val projectPath = new File("temp").getCanonicalPath
+ val path = s"$projectPath/$jobId.log"
+ val file = new File(path)
+ val log = client.apps.deployments.inNamespace(nameSpace).withName(jobName).getLog
+ Files.asCharSink(file, Charsets.UTF_8).write(log)
+ path
+ }
}
- def watchPodTerminatedLog(nameSpace: String, jobName: String): String = {
+ def watchPodTerminatedLog(nameSpace: String, jobName: String, jobId: String): String = {
tryWithResource(KubernetesRetriever.newK8sClient()) { client =>
Try {
val podName = getPods(nameSpace, jobName).head.getMetadata.getName
- val projectPath = new File("").getCanonicalPath
- val path = s"$projectPath/${nameSpace}_${jobName}_err.log"
+ val projectPath = new File("temp").getCanonicalPath
+ val path = s"$projectPath/${jobId}_err.log"
val file = new File(path)
val log = client.pods.inNamespace(nameSpace).withName(podName).terminated().withPrettyOutput.getLog
Files.asCharSink(file, Charsets.UTF_8).write(log)
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
index acaec9594..d4bd7e923 100644
--- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
@@ -272,7 +272,7 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig = JobStatusWatcherConfi
FlinkJobState.K8S_INITIALIZING
} else if (isConnection) {
logger.info("Enter the task failure deletion process.")
- KubernetesDeploymentHelper.watchPodTerminatedLog(trackId.namespace, trackId.clusterId)
+ KubernetesDeploymentHelper.watchPodTerminatedLog(trackId.namespace, trackId.clusterId, trackId.jobId)
KubernetesDeploymentHelper.deleteTaskDeployment(trackId.namespace, trackId.clusterId)
IngressController.deleteIngress(trackId.namespace, trackId.clusterId)
FlinkJobState.FAILED