You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampark.apache.org by be...@apache.org on 2022/11/07 13:34:37 UTC
[incubator-streampark] branch dev updated: [bug]Resolve that running logs do not display exceptions. (#1981)
This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev by this push:
new 732a5abb1 [bug]Resolve that running logs do not display exceptions. (#1981)
732a5abb1 is described below
commit 732a5abb1dbe570185974377ec615a02877a4072
Author: monster <60...@users.noreply.github.com>
AuthorDate: Mon Nov 7 21:34:32 2022 +0800
[bug]Resolve that running logs do not display exceptions. (#1981)
* [bug]Resolve that running logs do not display exceptions.
---
.../flink/app/components/AppView/LogModal.vue | 1 +
.../core/controller/ApplicationController.java | 3 ++-
.../console/core/service/LoggerService.java | 2 +-
.../core/service/impl/ApplicationServiceImpl.java | 2 +-
.../core/service/impl/LoggerServiceImpl.java | 15 ++++++------
.../helper/KubernetesDeploymentHelper.scala | 27 ++++++++++------------
.../kubernetes/watcher/FlinkJobStatusWatcher.scala | 2 +-
7 files changed, 25 insertions(+), 27 deletions(-)
diff --git a/streampark-console/streampark-console-newui/src/views/flink/app/components/AppView/LogModal.vue b/streampark-console/streampark-console-newui/src/views/flink/app/components/AppView/LogModal.vue
index d5b01e30b..f2e3c387e 100644
--- a/streampark-console/streampark-console-newui/src/views/flink/app/components/AppView/LogModal.vue
+++ b/streampark-console/streampark-console-newui/src/views/flink/app/components/AppView/LogModal.vue
@@ -61,6 +61,7 @@
getLogLoading.value = true;
const { data } = await fetchStartLog({
namespace: app.k8sNamespace,
+ jobName: app.jobName,
jobId: app.jobId,
limit: 100000000,
skipLineNum: 0,
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationController.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationController.java
index 1e257e3c5..1ec165574 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationController.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/ApplicationController.java
@@ -335,9 +335,10 @@ public class ApplicationController {
@PostMapping(value = "/detail")
public RestResponse detail(@ApiParam("K8s name spaces") @RequestParam(value = "namespace", required = false) String namespace,
@ApiParam("Job name") @RequestParam(value = "jobName", required = false) String jobName,
+ @ApiParam("Job id") @RequestParam(value = "jobId", required = false) String jobId,
@ApiParam("Number of log lines skipped loading") @RequestParam(value = "skipLineNum", required = false) Integer skipLineNum,
@ApiParam("Number of log lines loaded at once") @RequestParam(value = "limit", required = false) Integer limit) {
- return RestResponse.success(MoreFutures.derefUsingDefaultTimeout(logService.queryLog(namespace, jobName, skipLineNum, limit)));
+ return RestResponse.success(MoreFutures.derefUsingDefaultTimeout(logService.queryLog(namespace, jobName, jobId, skipLineNum, limit)));
}
}
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/LoggerService.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/LoggerService.java
index 5b7dd3a22..6b6795405 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/LoggerService.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/LoggerService.java
@@ -20,5 +20,5 @@ package org.apache.streampark.console.core.service;
import java.util.concurrent.CompletionStage;
public interface LoggerService {
- CompletionStage<String> queryLog(String namespac, String jobName, int skipLineNum, int limit);
+ CompletionStage<String> queryLog(String namespac, String jobName, String jobId, int skipLineNum, int limit);
}
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
index 47e86973a..980a1bc92 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
@@ -928,7 +928,7 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
CompletableFuture<CancelResponse> cancelFuture = cancelFutureMap.remove(app.getId());
Application application = this.baseMapper.getApp(app);
if (isKubernetesApp(application)) {
- KubernetesDeploymentHelper.watchPodTerminatedLog(application.getK8sNamespace(), application.getJobName());
+ KubernetesDeploymentHelper.watchPodTerminatedLog(application.getK8sNamespace(), application.getJobName(), application.getJobId());
KubernetesDeploymentHelper.deleteTaskDeployment(application.getK8sNamespace(), application.getJobName());
KubernetesDeploymentHelper.deleteTaskConfigMap(application.getK8sNamespace(), application.getJobName());
IngressController.deleteIngress(application.getK8sNamespace(), application.getJobName());
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/LoggerServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/LoggerServiceImpl.java
index 7ae6ec5fa..ecc171ba2 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/LoggerServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/LoggerServiceImpl.java
@@ -17,6 +17,7 @@
package org.apache.streampark.console.core.service.impl;
+import org.apache.streampark.console.base.util.WebUtils;
import org.apache.streampark.console.core.service.LogClientService;
import org.apache.streampark.console.core.service.LoggerService;
import org.apache.streampark.flink.kubernetes.helper.KubernetesDeploymentHelper;
@@ -25,8 +26,6 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
-import java.io.File;
-import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
@@ -47,20 +46,20 @@ public class LoggerServiceImpl implements LoggerService {
* @param limit limit
* @return log string data
*/
- public CompletionStage<String> queryLog(String nameSpace, String jobName, int skipLineNum, int limit) {
- return CompletableFuture.supplyAsync(() -> jobDeploymentsWatch(nameSpace, jobName)
+ public CompletionStage<String> queryLog(String nameSpace, String jobName, String jobId, int skipLineNum, int limit) {
+ return CompletableFuture.supplyAsync(() -> jobDeploymentsWatch(nameSpace, jobName, jobId)
).exceptionally(e -> {
try {
- return String.format("%s/%s_%s_err.log", new File("").getCanonicalPath(), nameSpace, jobName);
- } catch (IOException ex) {
+ return String.format("%s/%s_err.log", WebUtils.getAppTempDir(), jobId);
+ } catch (Exception ex) {
log.error("Generate log path exception:{}", ex.getMessage());
return null;
}
}).thenApply(path -> logClient.rollViewLog(String.valueOf(path), skipLineNum, limit));
}
- private String jobDeploymentsWatch(String nameSpace, String jobName) {
- return KubernetesDeploymentHelper.watchDeploymentLog(nameSpace, jobName);
+ private String jobDeploymentsWatch(String nameSpace, String jobName, String jobId) {
+ return KubernetesDeploymentHelper.watchDeploymentLog(nameSpace, jobName, jobId);
}
}
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/helper/KubernetesDeploymentHelper.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/helper/KubernetesDeploymentHelper.scala
index 375dd4d23..e9922385b 100644
--- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/helper/KubernetesDeploymentHelper.scala
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/helper/KubernetesDeploymentHelper.scala
@@ -17,10 +17,9 @@
package org.apache.streampark.flink.kubernetes.helper
-
import com.google.common.base.Charsets
import com.google.common.io.Files
-import org.apache.streampark.common.util.Logger
+import org.apache.streampark.common.util.{Logger, SystemPropertyUtils}
import org.apache.streampark.common.util.Utils.tryWithResource
import org.apache.streampark.flink.kubernetes.KubernetesRetriever
import io.fabric8.kubernetes.api.model.Pod
@@ -82,25 +81,23 @@ object KubernetesDeploymentHelper extends Logger {
}
}
- def watchDeploymentLog(nameSpace: String, jobName: String): String = {
+ def watchDeploymentLog(nameSpace: String, jobName: String, jobId: String): String = {
tryWithResource(KubernetesRetriever.newK8sClient()) { client =>
- Try {
- val projectPath = new File("").getCanonicalPath
- val path = s"$projectPath/${nameSpace}_$jobName.log"
- val file = new File(path)
- val log = client.apps.deployments.inNamespace(nameSpace).withName(jobName).getLog
- Files.asCharSink(file, Charsets.UTF_8).write(log)
- path
- }.getOrElse(null)
- }(error => throw error)
+ val projectPath = SystemPropertyUtils.get("java.io.tmpdir", "temp")
+ val path = s"$projectPath/$jobId.log"
+ val file = new File(path)
+ val log = client.apps.deployments.inNamespace(nameSpace).withName(jobName).getLog
+ Files.asCharSink(file, Charsets.UTF_8).write(log)
+ path
+ }
}
- def watchPodTerminatedLog(nameSpace: String, jobName: String): String = {
+ def watchPodTerminatedLog(nameSpace: String, jobName: String, jobId: String): String = {
tryWithResource(KubernetesRetriever.newK8sClient()) { client =>
Try {
val podName = getPods(nameSpace, jobName).head.getMetadata.getName
- val projectPath = new File("").getCanonicalPath
- val path = s"$projectPath/${nameSpace}_${jobName}_err.log"
+ val projectPath = SystemPropertyUtils.get("java.io.tmpdir", "temp")
+ val path = s"$projectPath/${jobId}_err.log"
val file = new File(path)
val log = client.pods.inNamespace(nameSpace).withName(podName).terminated().withPrettyOutput.getLog
Files.asCharSink(file, Charsets.UTF_8).write(log)
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
index acaec9594..d4bd7e923 100644
--- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala
@@ -272,7 +272,7 @@ class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig = JobStatusWatcherConfi
FlinkJobState.K8S_INITIALIZING
} else if (isConnection) {
logger.info("Enter the task failure deletion process.")
- KubernetesDeploymentHelper.watchPodTerminatedLog(trackId.namespace, trackId.clusterId)
+ KubernetesDeploymentHelper.watchPodTerminatedLog(trackId.namespace, trackId.clusterId, trackId.jobId)
KubernetesDeploymentHelper.deleteTaskDeployment(trackId.namespace, trackId.clusterId)
IngressController.deleteIngress(trackId.namespace, trackId.clusterId)
FlinkJobState.FAILED