You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampark.apache.org by be...@apache.org on 2022/11/07 13:34:37 UTC

[incubator-streampark] branch dev updated: [bug]Resolve that running logs do not display exceptions. (#1981)

This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev by this push:
     new 732a5abb1 [bug]Resolve that running logs do not display exceptions. (#1981)
732a5abb1 is described below

commit 732a5abb1dbe570185974377ec615a02877a4072
Author: monster <60...@users.noreply.github.com>
AuthorDate: Mon Nov 7 21:34:32 2022 +0800

    [bug]Resolve that running logs do not display exceptions. (#1981)
    
    * [bug]Resolve that running logs do not display exceptions.
---
 .../flink/app/components/AppView/LogModal.vue      |  1 +
 .../core/controller/ApplicationController.java     |  3 ++-
 .../console/core/service/LoggerService.java        |  2 +-
 .../core/service/impl/ApplicationServiceImpl.java  |  2 +-
 .../core/service/impl/LoggerServiceImpl.java       | 15 ++++++------
 .../helper/KubernetesDeploymentHelper.scala        | 27 ++++++++++------------
 .../kubernetes/watcher/FlinkJobStatusWatcher.scala |  2 +-
 7 files changed, 25 insertions(+), 27 deletions(-)

diff --git a/streampark-console/streampark-console-newui/src/views/flink/app/components/AppView/LogModal.vue b/streampark-console/streampark-console-newui/src/views/flink/app/components/AppView/LogModal.vue
index d5b01e30b..f2e3c387e 100644
--- a/streampark-console/streampark-console-newui/src/views/flink/app/components/AppView/LogModal.vue
+++ b/streampark-console/streampark-console-newui/src/views/flink/app/components/AppView/LogModal.vue
@@ -61,6 +61,7 @@
       getLogLoading.value = true;
       const { data } = await fetchStartLog({
         namespace: app.k8sNamespace,
+        jobName: app.jobName,
         jobId: app.jobId,
         limit: 100000000,
         skipLineNum: 0,
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationController.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationController.java
index 1e257e3c5..1ec165574 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationController.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationController.java
@@ -335,9 +335,10 @@ public class ApplicationController {
     @PostMapping(value = "/detail")
     public RestResponse detail(@ApiParam("K8s name spaces") @RequestParam(value = "namespace", required = false) String namespace,
                                @ApiParam("Job name") @RequestParam(value = "jobName", required = false) String jobName,
+                               @ApiParam("Job id") @RequestParam(value = "jobId", required = false) String jobId,
                                @ApiParam("Number of log lines skipped loading") @RequestParam(value = "skipLineNum", required = false) Integer skipLineNum,
                                @ApiParam("Number of log lines loaded at once") @RequestParam(value = "limit", required = false) Integer limit) {
-        return RestResponse.success(MoreFutures.derefUsingDefaultTimeout(logService.queryLog(namespace, jobName, skipLineNum, limit)));
+        return RestResponse.success(MoreFutures.derefUsingDefaultTimeout(logService.queryLog(namespace, jobName, jobId, skipLineNum, limit)));
     }
 
 }
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/LoggerService.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/LoggerService.java
index 5b7dd3a22..6b6795405 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/LoggerService.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/LoggerService.java
@@ -20,5 +20,5 @@ package org.apache.streampark.console.core.service;
 import java.util.concurrent.CompletionStage;
 
 public interface LoggerService {
-    CompletionStage<String> queryLog(String namespac, String jobName, int skipLineNum, int limit);
+    CompletionStage<String> queryLog(String namespac, String jobName, String jobId, int skipLineNum, int limit);
 }
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
index 47e86973a..980a1bc92 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
@@ -928,7 +928,7 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
         CompletableFuture<CancelResponse> cancelFuture = cancelFutureMap.remove(app.getId());
         Application application = this.baseMapper.getApp(app);
         if (isKubernetesApp(application)) {
-            KubernetesDeploymentHelper.watchPodTerminatedLog(application.getK8sNamespace(), application.getJobName());
+            KubernetesDeploymentHelper.watchPodTerminatedLog(application.getK8sNamespace(), application.getJobName(), application.getJobId());
             KubernetesDeploymentHelper.deleteTaskDeployment(application.getK8sNamespace(), application.getJobName());
             KubernetesDeploymentHelper.deleteTaskConfigMap(application.getK8sNamespace(), application.getJobName());
             IngressController.deleteIngress(application.getK8sNamespace(), application.getJobName());
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/LoggerServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/LoggerServiceImpl.java
index 7ae6ec5fa..ecc171ba2 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/LoggerServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/LoggerServiceImpl.java
@@ -17,6 +17,7 @@
 
 package org.apache.streampark.console.core.service.impl;
 
+import org.apache.streampark.console.base.util.WebUtils;
 import org.apache.streampark.console.core.service.LogClientService;
 import org.apache.streampark.console.core.service.LoggerService;
 import org.apache.streampark.flink.kubernetes.helper.KubernetesDeploymentHelper;
@@ -25,8 +26,6 @@ import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
-import java.io.File;
-import java.io.IOException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionStage;
 
@@ -47,20 +46,20 @@ public class LoggerServiceImpl implements LoggerService {
      * @param limit       limit
      * @return log string data
      */
-    public CompletionStage<String> queryLog(String nameSpace, String jobName, int skipLineNum, int limit) {
-        return CompletableFuture.supplyAsync(() -> jobDeploymentsWatch(nameSpace, jobName)
+    public CompletionStage<String> queryLog(String nameSpace, String jobName, String jobId, int skipLineNum, int limit) {
+        return CompletableFuture.supplyAsync(() -> jobDeploymentsWatch(nameSpace, jobName, jobId)
         ).exceptionally(e -> {
             try {
-                return String.format("%s/%s_%s_err.log", new File("").getCanonicalPath(), nameSpace, jobName);
-            } catch (IOException ex) {
+                return String.format("%s/%s_err.log", WebUtils.getAppTempDir(), jobId);
+            } catch (Exception ex) {
                 log.error("Generate log path exception:{}", ex.getMessage());
                 return null;
             }
         }).thenApply(path -> logClient.rollViewLog(String.valueOf(path), skipLineNum, limit));
     }
 
-    private String jobDeploymentsWatch(String nameSpace, String jobName) {
-        return KubernetesDeploymentHelper.watchDeploymentLog(nameSpace, jobName);
+    private String jobDeploymentsWatch(String nameSpace, String jobName, String jobId) {
+        return KubernetesDeploymentHelper.watchDeploymentLog(nameSpace, jobName, jobId);
     }
 }
 
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/helper/KubernetesDeploymentHelper.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/helper/KubernetesDeploymentHelper.scala
index 375dd4d23..e9922385b 100644
--- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/helper/KubernetesDeploymentHelper.scala
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/helper/KubernetesDeploymentHelper.scala
@@ -17,10 +17,9 @@
 
 package org.apache.streampark.flink.kubernetes.helper
 
-
 import com.google.common.base.Charsets
 import com.google.common.io.Files
-import org.apache.streampark.common.util.Logger
+import org.apache.streampark.common.util.{Logger, SystemPropertyUtils}
 import org.apache.streampark.common.util.Utils.tryWithResource
 import org.apache.streampark.flink.kubernetes.KubernetesRetriever
 import io.fabric8.kubernetes.api.model.Pod
@@ -82,25 +81,23 @@ object KubernetesDeploymentHelper extends Logger {
     }
   }
 
-  def watchDeploymentLog(nameSpace: String, jobName: String): String = {
+  def watchDeploymentLog(nameSpace: String, jobName: String, jobId: String): String = {
     tryWithResource(KubernetesRetriever.newK8sClient()) { client =>
-      Try {
-        val projectPath = new File("").getCanonicalPath
-        val path = s"$projectPath/${nameSpace}_$jobName.log"
-        val file = new File(path)
-        val log = client.apps.deployments.inNamespace(nameSpace).withName(jobName).getLog
-        Files.asCharSink(file, Charsets.UTF_8).write(log)
-        path
-      }.getOrElse(null)
-    }(error => throw error)
+      val projectPath = SystemPropertyUtils.get("java.io.tmpdir", "temp")
+      val path = s"$projectPath/$jobId.log"
+      val file = new File(path)
+      val log = client.apps.deployments.inNamespace(nameSpace).withName(jobName).getLog
+      Files.asCharSink(file, Charsets.UTF_8).write(log)
+      path
+    }
   }
 
-  def watchPodTerminatedLog(nameSpace: String, jobName: String): String = {
+  def watchPodTerminatedLog(nameSpace: String, jobName: String, jobId: String): String = {
     tryWithResource(KubernetesRetriever.newK8sClient()) { client =>
       Try {
         val podName = getPods(nameSpace, jobName).head.getMetadata.getName
-        val projectPath = new File("").getCanonicalPath
-        val path = s"$projectPath/${nameSpace}_${jobName}_err.log"
+        val projectPath = SystemPropertyUtils.get("java.io.tmpdir", "temp")
+        val path = s"$projectPath/${jobId}_err.log"
         val file = new File(path)
         val log = client.pods.inNamespace(nameSpace).withName(podName).terminated().withPrettyOutput.getLog
         Files.asCharSink(file, Charsets.UTF_8).write(log)
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
index acaec9594..d4bd7e923 100644
--- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
@@ -272,7 +272,7 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig = JobStatusWatcherConfi
             FlinkJobState.K8S_INITIALIZING
           } else if (isConnection) {
             logger.info("Enter the task failure deletion process.")
-            KubernetesDeploymentHelper.watchPodTerminatedLog(trackId.namespace, trackId.clusterId)
+            KubernetesDeploymentHelper.watchPodTerminatedLog(trackId.namespace, trackId.clusterId, trackId.jobId)
             KubernetesDeploymentHelper.deleteTaskDeployment(trackId.namespace, trackId.clusterId)
             IngressController.deleteIngress(trackId.namespace, trackId.clusterId)
             FlinkJobState.FAILED