Posted to issues@streampark.apache.org by GitBox <gi...@apache.org> on 2022/10/14 09:19:16 UTC
[GitHub] [incubator-streampark] MonsterChenzhuo opened a new issue, #1841: [Feature] FlinkJobStatusWatcher
MonsterChenzhuo opened a new issue, #1841:
URL: https://github.com/apache/incubator-streampark/issues/1841
### Search before asking
- [X] I had searched in the [feature](https://github.com/apache/streampark/issues?q=is%3Aissue+label%3A%22Feature%22) and found no similar feature requirement.
### Description
The threading model of FlinkJobStatusWatcher has a design flaw.
```scala
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.streampark.flink.kubernetes.watcher
import org.apache.streampark.common.util.Logger
import org.apache.streampark.flink.kubernetes.enums.FlinkJobState
import org.apache.streampark.flink.kubernetes.enums.FlinkK8sExecuteMode.{APPLICATION, SESSION}
import org.apache.streampark.flink.kubernetes.event.FlinkJobStatusChangeEvent
import org.apache.streampark.flink.kubernetes.model._
import org.apache.streampark.flink.kubernetes.{ChangeEventBus, FlinkTrackController, IngressController, JobStatusWatcherConfig, KubernetesRetriever}
import org.apache.streampark.flink.kubernetes.helper.KubernetesDeploymentHelper
import org.apache.hc.client5.http.fluent.Request
import org.apache.hc.core5.util.Timeout
import org.apache.streampark.archives.FetchArchives
import org.json4s.{DefaultFormats, JNothing, JNull}
import org.json4s.JsonAST.JArray
import org.json4s.jackson.JsonMethods.parse
import java.nio.charset.StandardCharsets
import java.util.concurrent.{Executors, ScheduledFuture, TimeUnit}
import javax.annotation.Nonnull
import javax.annotation.concurrent.ThreadSafe
import scala.concurrent.duration.DurationLong
import scala.concurrent.{Await, ExecutionContext, ExecutionContextExecutorService, Future}
import scala.language.{implicitConversions, postfixOps}
import scala.util.{Failure, Success, Try}
/**
* Watcher that continuously monitors flink job status in kubernetes mode.
* The traced flink identifiers come from FlinkTrackCachePool.trackIds, and the
* traced flink job statuses are written to FlinkTrackCachePool.jobStatuses.
*
*/
@ThreadSafe
class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig = JobStatusWatcherConfig.defaultConf)
(implicit val trackController: FlinkTrackController,
implicit val eventBus: ChangeEventBus) extends Logger with FlinkWatcher {
private val trackTaskExecPool = Executors.newWorkStealingPool()
private implicit val trackTaskExecutor: ExecutionContextExecutorService = ExecutionContext.fromExecutorService(trackTaskExecPool)
private val timerExec = Executors.newSingleThreadScheduledExecutor()
private var timerSchedule: ScheduledFuture[_] = _
/**
* start watcher process
*/
override def doStart(): Unit = {
timerSchedule = timerExec.scheduleAtFixedRate(() => doWatch(), 0, conf.requestIntervalSec, TimeUnit.SECONDS)
logInfo("[flink-k8s] FlinkJobStatusWatcher started.")
}
/**
* stop watcher process
*/
override def doStop(): Unit = {
// interrupt all running threads
timerSchedule.cancel(true)
logInfo("[flink-k8s] FlinkJobStatusWatcher stopped.")
}
/**
* closes resource, relinquishing any underlying resources.
*/
override def doClose(): Unit = {
timerExec.shutdownNow()
trackTaskExecutor.shutdownNow()
logInfo("[flink-k8s] FlinkJobStatusWatcher closed.")
}
/**
* single flink job status tracking task
*/
override def doWatch(): Unit = {
logInfo("========整个流程执行开始:"+Thread.currentThread().getName+"========")
// get all legal tracking ids
val trackIds = Try(trackController.collectAllTrackIds()).filter(_.nonEmpty).getOrElse(return)
logInfo("======"+trackIds.toString()+"======")
// retrieve flink job status in thread pool
val tracksFuture: Set[Future[Option[JobStatusCV]]] = trackIds.map { id =>
val future = Future {
id.executeMode match {
case SESSION => touchSessionJob(id)
case APPLICATION => touchApplicationJob(id)
}
}
future onComplete (_.getOrElse(None) match {
case Some(jobState) =>
val trackId = id.copy(jobId = jobState.jobId)
val latest: JobStatusCV = trackController.jobStatuses.get(trackId)
if (latest == null || latest.jobState != jobState.jobState || latest.jobId != jobState.jobId) {
// put job status to cache
trackController.jobStatuses.put(trackId, jobState)
// set jobId to trackIds
trackController.trackIds.update(trackId)
eventBus.postSync(FlinkJobStatusChangeEvent(trackId, jobState))
}
if (FlinkJobState.isEndState(jobState.jobState)) {
// remove trackId from cache of job that needs to be untracked
trackController.unTracking(trackId)
if (trackId.executeMode == APPLICATION) {
trackController.endpoints.invalidate(trackId.toClusterKey)
}
}
case _ =>
})
future
}
// blocking until all future are completed or timeout is reached
Try(Await.ready(Future.sequence(tracksFuture), conf.requestTimeoutSec seconds))
.failed.map { _ =>
logInfo(s"[FlinkJobStatusWatcher] tracking flink job status on kubernetes mode timeout," +
s" limitSeconds=${conf.requestTimeoutSec}," +
s" trackIds=${trackIds.mkString(",")}")
}
logInfo("========整个流程执行完成一遍:"+Thread.currentThread().getName+"========")
}
/**
* Get flink status information from kubernetes-native-session cluster.
* When the flink-cluster-client request fails, the job state would be
* LOST or SILENT.
*
* This method can be called directly from outside, without affecting the
* current cachePool result.
*/
def touchSessionJob(@Nonnull trackId: TrackId): Option[JobStatusCV] = {
val pollEmitTime = System.currentTimeMillis
val clusterId = trackId.clusterId
val namespace = trackId.namespace
val appId = trackId.appId
val jobId = trackId.jobId
val rsMap = touchSessionAllJob(clusterId, namespace, appId).toMap
val id = TrackId.onSession(namespace, clusterId, appId, jobId)
val jobState = rsMap.get(id).filter(_.jobState != FlinkJobState.SILENT).getOrElse {
val preCache = trackController.jobStatuses.get(id)
val state = inferSilentOrLostFromPreCache(preCache)
val nonFirstSilent = state == FlinkJobState.SILENT && preCache != null && preCache.jobState == FlinkJobState.SILENT
if (nonFirstSilent) {
JobStatusCV(jobState = state, jobId = id.jobId, pollEmitTime = preCache.pollEmitTime, pollAckTime = preCache.pollAckTime)
} else {
JobStatusCV(jobState = state, jobId = id.jobId, pollEmitTime = pollEmitTime, pollAckTime = System.currentTimeMillis)
}
}
Some(jobState)
}
/**
* Get all flink job status information from kubernetes-native-session cluster.
* An empty array is returned when the k8s-client or flink-cluster-client
* request fails.
*
* This method can be called directly from outside, without affecting the
* current cachePool result.
*/
protected[kubernetes] def touchSessionAllJob(@Nonnull clusterId: String, @Nonnull namespace: String, @Nonnull appId: Long): Array[(TrackId, JobStatusCV)] = {
lazy val defaultResult = Array.empty[(TrackId, JobStatusCV)]
val pollEmitTime = System.currentTimeMillis
val jobDetails = listJobsDetails(ClusterKey(SESSION, namespace, clusterId)).getOrElse(return defaultResult).jobs
if (jobDetails.isEmpty) {
defaultResult
} else {
jobDetails.map { d =>
TrackId.onSession(namespace, clusterId, appId, d.jid) -> d.toJobStatusCV(pollEmitTime, System.currentTimeMillis)
}
}
}
/**
* Get flink status information from kubernetes-native-application cluster.
* When the flink-cluster-client request fails, the job state is inferred
* from k8s events.
*
* This method can be called directly from outside, without affecting the
* current cachePool result.
*/
def touchApplicationJob(@Nonnull trackId: TrackId): Option[JobStatusCV] = {
implicit val pollEmitTime: Long = System.currentTimeMillis
val clusterId = trackId.clusterId
val namespace = trackId.namespace
val jobDetails = listJobsDetails(ClusterKey(APPLICATION, namespace, clusterId))
if (jobDetails.isEmpty || jobDetails.get.jobs.isEmpty) {
inferApplicationFlinkJobStateFromK8sEvent(trackId)
} else {
Some(jobDetails.get.jobs.head.toJobStatusCV(pollEmitTime, System.currentTimeMillis))
}
}
/**
* list flink jobs details
*/
private def listJobsDetails(clusterKey: ClusterKey): Option[JobDetails] = {
// get flink rest api
Try {
val clusterRestUrl = trackController.getClusterRestUrl(clusterKey).filter(_.nonEmpty).getOrElse(return None)
// list flink jobs from rest api
callJobsOverviewsApi(clusterRestUrl)
}.getOrElse {
logger.warn("Failed to visit remote flink jobs on kubernetes-native-mode cluster, and the retry access logic is performed.")
val clusterRestUrl = trackController.refreshClusterRestUrl(clusterKey).getOrElse(return None)
Try(callJobsOverviewsApi(clusterRestUrl)) match {
case Success(s) =>
logger.info("The retry is successful.")
s
case Failure(e) =>
logger.warn(s"The retry fetch failed, final status failed, errorStack=${e.getMessage}.")
None
}
}
}
/**
* list flink jobs details from rest api
*/
private def callJobsOverviewsApi(restUrl: String): Option[JobDetails] = {
val jobDetails = JobDetails.as(
Request.get(s"$restUrl/jobs/overview")
.connectTimeout(Timeout.ofSeconds(KubernetesRetriever.FLINK_REST_AWAIT_TIMEOUT_SEC))
.responseTimeout(Timeout.ofSeconds(KubernetesRetriever.FLINK_CLIENT_TIMEOUT_SEC))
.execute.returnContent().asString(StandardCharsets.UTF_8)
)
jobDetails
}
/**
* Infer the current flink state from the last relevant k8s events.
* This method is only used for application-mode job inference in
* case of a failed JM rest request.
*/
private def inferApplicationFlinkJobStateFromK8sEvent(@Nonnull trackId: TrackId)
(implicit pollEmitTime: Long): Option[JobStatusCV] = {
this.synchronized {
// infer from k8s deployment and event
val latest: JobStatusCV = trackController.jobStatuses.get(trackId)
logger.info(s"Query the local cache result:${trackController.canceling.has(trackId).toString},trackId ${trackId.toString}.")
val jobState = {
if (trackController.canceling.has(trackId)) FlinkJobState.CANCELED else {
// whether deployment exists on kubernetes cluster
val isDeployExists = KubernetesRetriever.isDeploymentExists(trackId.clusterId, trackId.namespace)
val deployStateOfTheError = KubernetesDeploymentHelper.getDeploymentStatusChanges(trackId.namespace, trackId.clusterId)
val isConnection = KubernetesDeploymentHelper.isTheK8sConnectionNormal()
if (isDeployExists) {
if (!deployStateOfTheError) {
logger.info("Task Enter the initialization process.")
FlinkJobState.K8S_INITIALIZING
} else if (isConnection) {
logger.info("Enter the task failure deletion process.")
KubernetesDeploymentHelper.watchPodTerminatedLog(trackId.namespace, trackId.clusterId)
KubernetesDeploymentHelper.deleteTaskDeployment(trackId.namespace, trackId.clusterId)
IngressController.deleteIngress(trackId.namespace, trackId.clusterId)
FlinkJobState.FAILED
} else {
inferSilentOrLostFromPreCache(latest)
}
} else if (isConnection) {
logger.info("The deployment is deleted and enters the task failure process.")
FlinkJobState.of(FetchArchives.fetchArchives("002c63091ecc95bd395260dbbdc69d2a"))
} else {
inferSilentOrLostFromPreCache(latest)
}
}
}
val jobStatusCV = JobStatusCV(
jobState = jobState,
jobId = null,
pollEmitTime = pollEmitTime,
pollAckTime = System.currentTimeMillis
)
if (jobState == FlinkJobState.SILENT && latest != null && latest.jobState == FlinkJobState.SILENT) {
Some(jobStatusCV.copy(pollEmitTime = latest.pollEmitTime, pollAckTime = latest.pollAckTime))
} else {
Some(jobStatusCV)
}
}
}
private[this] def inferSilentOrLostFromPreCache(preCache: JobStatusCV) = preCache match {
case preCache if preCache == null => FlinkJobState.SILENT
case preCache if preCache.jobState == FlinkJobState.SILENT &&
System.currentTimeMillis() - preCache.pollAckTime >= conf.silentStateJobKeepTrackingSec * 1000 => FlinkJobState.LOST
case _ => FlinkJobState.SILENT
}
}
object FlinkJobStatusWatcher {
private val effectEndStates: Seq[FlinkJobState.Value] = FlinkJobState.endingStates.filter(_ != FlinkJobState.LOST)
/**
* infer flink job state before persistence.
* so drama, so sad.
*
* @param current current flink job state
* @param previous previous flink job state from persistent storage
*/
def inferFlinkJobStateFromPersist(current: FlinkJobState.Value, previous: FlinkJobState.Value): FlinkJobState.Value = {
current match {
// effectEndStates excludes LOST, so the check has to be made against the previous state
case FlinkJobState.LOST => if (effectEndStates.contains(previous)) previous else FlinkJobState.TERMINATED
case FlinkJobState.POS_TERMINATED | FlinkJobState.TERMINATED => previous match {
case FlinkJobState.CANCELLING => FlinkJobState.CANCELED
case FlinkJobState.FAILING => FlinkJobState.FAILED
case _ => if (current == FlinkJobState.POS_TERMINATED) FlinkJobState.FINISHED else FlinkJobState.TERMINATED
}
case _ => current
}
}
}
private[kubernetes] case class JobDetails(jobs: Array[JobDetail] = Array())
private[kubernetes] case class JobDetail(jid: String,
name: String,
state: String,
startTime: Long,
endTime: Long,
duration: Long,
lastModification: Long,
tasks: JobTask) {
def toJobStatusCV(pollEmitTime: Long, pollAckTime: Long): JobStatusCV = {
JobStatusCV(
jobState = FlinkJobState.of(state),
jobId = jid,
jobName = name,
jobStartTime = startTime,
jobEndTime = endTime,
duration = duration,
taskTotal = tasks.total,
pollEmitTime = pollEmitTime,
pollAckTime = pollAckTime)
}
}
private[kubernetes] case class JobTask(total: Int,
created: Int,
scheduled: Int,
deploying: Int,
running: Int,
finished: Int,
canceling: Int,
canceled: Int,
failed: Int,
reconciling: Int,
initializing: Int)
private[kubernetes] object JobDetails {
@transient
implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats
def as(json: String): Option[JobDetails] = {
Try(parse(json)) match {
case Success(ok) =>
ok \ "jobs" match {
case JNothing | JNull => None
case JArray(arr) =>
val details = arr.map(x => {
val task = x \ "tasks"
JobDetail(
(x \ "jid").extractOpt[String].orNull,
(x \ "name").extractOpt[String].orNull,
(x \ "state").extractOpt[String].orNull,
(x \ "start-time").extractOpt[Long].getOrElse(0),
(x \ "end-time").extractOpt[Long].getOrElse(0),
(x \ "duration").extractOpt[Long].getOrElse(0),
(x \ "last-modification").extractOpt[Long].getOrElse(0),
JobTask(
(task \ "total").extractOpt[Int].getOrElse(0),
(task \ "created").extractOpt[Int].getOrElse(0),
(task \ "scheduled").extractOpt[Int].getOrElse(0),
(task \ "deploying").extractOpt[Int].getOrElse(0),
(task \ "running").extractOpt[Int].getOrElse(0),
(task \ "finished").extractOpt[Int].getOrElse(0),
(task \ "canceling").extractOpt[Int].getOrElse(0),
(task \ "canceled").extractOpt[Int].getOrElse(0),
(task \ "failed").extractOpt[Int].getOrElse(0),
(task \ "reconciling").extractOpt[Int].getOrElse(0),
(task \ "initializing").extractOpt[Int].getOrElse(0)
)
)
}).toArray
Some(JobDetails(details))
case _ => None
}
case Failure(_) => None
}
}
}
```
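First, the runtime log (the marker `整个流程执行开始` means "whole watch cycle started" and `整个流程执行完成一遍` means "one full watch cycle completed"):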
========整个流程执行开始:pool-7-thread-1========
2022-10-14 17:15:51 | INFO | pool-7-thread-1 | org.apache.streampark.flink.kubernetes.watcher.FlinkJobStatusWatcher:39] [StreamPark] ======Set(TrackId(kubernetes-application,native-flink,test33,100000,null))======
2022-10-14 17:15:51 | WARN | ForkJoinPool-1-worker-4 | org.apache.streampark.flink.kubernetes.watcher.FlinkJobStatusWatcher:220] Failed to visit remote flink jobs on kubernetes-native-mode cluster, and the retry access logic is performed.
2022-10-14 17:15:51 | INFO | ForkJoinPool-1-worker-4 | org.apache.flink.kubernetes.KubernetesClusterDescriptor:147] Retrieve flink cluster test33 successfully, JobManager Web Interface: http://10.216.138.19:30998
2022-10-14 17:15:52 | INFO | ForkJoinPool-1-worker-4 | org.apache.streampark.flink.kubernetes.KubernetesRetriever:131] retrieve flink jobManager rest url: http://10.216.138.19:30998
2022-10-14 17:16:02 | WARN | ForkJoinPool-1-worker-4 | org.apache.streampark.flink.kubernetes.watcher.FlinkJobStatusWatcher:227] The retry fetch failed, final status failed, errorStack=Connect to http://10.216.138.19:30998 [/10.216.138.19] failed: connect timed out.
2022-10-14 17:16:02 | INFO | ForkJoinPool-1-worker-4 | org.apache.streampark.flink.kubernetes.watcher.FlinkJobStatusWatcher:256] Query the local cache result:false,trackId TrackId(kubernetes-application,native-flink,test33,100000,null).
2022-10-14 17:16:02 | INFO | ForkJoinPool-1-worker-4 | org.apache.streampark.flink.kubernetes.watcher.FlinkJobStatusWatcher:278] The deployment is deleted and enters the task failure process.
/jobs/overview
{"jobs":[{"jid":"002c63091ecc95bd395260dbbdc69d2a","name":"SQLJob-passport-api_kp_ziroom_com_dpLogin-1594315680","state":"FINISHED","start-time":1594315681845,"end-time":1594315691522,"duration":9677,"last-modification":1594315691522,"tasks":{"total":2,"created":0,"scheduled":0,"deploying":0,"running":0,"finished":0,"canceling":0,"canceled":1,"failed":1,"reconciling":0}}]}
==============FINISHED==============
2022-10-14 17:16:02 | INFO | pool-7-thread-1 | org.apache.streampark.flink.kubernetes.watcher.FlinkJobStatusWatcher:39] [StreamPark] ========整个流程执行完成一遍:pool-7-thread-1========
2022-10-14 17:16:02 | INFO | pool-7-thread-1 | org.apache.streampark.flink.kubernetes.watcher.FlinkJobStatusWatcher:39] [StreamPark] ========整个流程执行开始:pool-7-thread-1========
2022-10-14 17:16:02 | INFO | pool-7-thread-1 | org.apache.streampark.flink.kubernetes.watcher.FlinkJobStatusWatcher:39] [StreamPark] ======Set(TrackId(kubernetes-application,native-flink,test33,100000,null))======
2022-10-14 17:16:03 | ERROR | ForkJoinPool-1-worker-1 | org.apache.streampark.flink.kubernetes.KubernetesRetriever:71] [StreamPark] Get flinkClient error, the error is: java.lang.RuntimeException: org.apache.flink.client.deployment.ClusterRetrieveException: Could not get the rest endpoint of test33
2022-10-14 17:16:03 | INFO | ForkJoinPool-1-worker-1 | org.apache.streampark.flink.kubernetes.watcher.FlinkJobStatusWatcher:256] Query the local cache result:false,trackId TrackId(kubernetes-application,native-flink,test33,100000,null).
2022-10-14 17:16:03 | INFO | ForkJoinPool-1-worker-1 | org.apache.streampark.flink.kubernetes.watcher.FlinkJobStatusWatcher:278] The deployment is deleted and enters the task failure process.
/jobs/overview
{"jobs":[{"jid":"002c63091ecc95bd395260dbbdc69d2a","name":"SQLJob-passport-api_kp_ziroom_com_dpLogin-1594315680","state":"FINISHED","start-time":1594315681845,"end-time":1594315691522,"duration":9677,"last-modification":1594315691522,"tasks":{"total":2,"created":0,"scheduled":0,"deploying":0,"running":0,"finished":0,"canceling":0,"canceled":1,"failed":1,"reconciling":0}}]}
==============FINISHED==============
2022-10-14 17:16:03 | INFO | pool-7-thread-1 | org.apache.streampark.flink.kubernetes.watcher.FlinkJobStatusWatcher:39] [StreamPark] ========整个流程执行完成一遍:pool-7-thread-1========
2022-10-14 17:16:03 | INFO | pool-7-thread-1 | org.apache.streampark.flink.kubernetes.watcher.FlinkJobStatusWatcher:39] [StreamPark] ========整个流程执行开始:pool-7-thread-1========
2022-10-14 17:16:03 | INFO | pool-7-thread-1 | org.apache.streampark.flink.kubernetes.watcher.FlinkJobStatusWatcher:39] [StreamPark] ======Set(TrackId(kubernetes-application,native-flink,test33,100000,null))======
2022-10-14 17:16:03 | ERROR | ForkJoinPool-1-worker-1 | org.apache.streampark.flink.kubernetes.KubernetesRetriever:71] [StreamPark] Get flinkClient error, the error is: java.lang.RuntimeException: org.apache.flink.client.deployment.ClusterRetrieveException: Could not get the rest endpoint of test33
2022-10-14 17:16:03 | INFO | ForkJoinPool-1-worker-1 | org.apache.streampark.flink.kubernetes.watcher.FlinkJobStatusWatcher:256] Query the local cache result:false,trackId TrackId(kubernetes-application,native-flink,test33,100000,null).
2022-10-14 17:16:04 | INFO | ForkJoinPool-1-worker-1 | org.apache.streampark.flink.kubernetes.watcher.FlinkJobStatusWatcher:278] The deployment is deleted and enters the task failure process.
/jobs/overview
{"jobs":[{"jid":"002c63091ecc95bd395260dbbdc69d2a","name":"SQLJob-passport-api_kp_ziroom_com_dpLogin-1594315680","state":"FINISHED","start-time":1594315681845,"end-time":1594315691522,"duration":9677,"last-modification":1594315691522,"tasks":{"total":2,"created":0,"scheduled":0,"deploying":0,"running":0,"finished":0,"canceling":0,"canceled":1,"failed":1,"reconciling":0}}]}
==============FINISHED==============
2022-10-14 17:16:04 | INFO | pool-7-thread-1 | org.apache.streampark.flink.kubernetes.watcher.FlinkJobStatusWatcher:39] [StreamPark] ========整个流程执行完成一遍:pool-7-thread-1========
2022-10-14 17:16:04 | INFO | pool-7-thread-1 | org.apache.streampark.flink.kubernetes.watcher.FlinkJobStatusWatcher:39] [StreamPark] ========整个流程执行开始:pool-7-thread-1========
2022-10-14 17:16:04 | INFO | pool-7-thread-1 | org.apache.streampark.flink.kubernetes.watcher.FlinkJobStatusWatcher:39] [StreamPark] ======Set(TrackId(kubernetes-application,native-flink,test33,100000,null))======
2022-10-14 17:16:04 | ERROR | ForkJoinPool-1-worker-1 | org.apache.streampark.flink.kubernetes.KubernetesRetriever:71] [StreamPark] Get flinkClient error, the error is: java.lang.RuntimeException: org.apache.flink.client.deployment.ClusterRetrieveException: Could not get the rest endpoint of test33
2022-10-14 17:16:04 | INFO | ForkJoinPool-1-worker-1 | org.apache.streampark.flink.kubernetes.watcher.FlinkJobStatusWatcher:256] Query the local cache result:false,trackId TrackId(kubernetes-application,native-flink,test33,100000,null).
2022-10-14 17:16:04 | INFO | ForkJoinPool-1-worker-1 | org.apache.streampark.flink.kubernetes.watcher.FlinkJobStatusWatcher:278] The deployment is deleted and enters the task failure process.
/jobs/overview
{"jobs":[{"jid":"002c63091ecc95bd395260dbbdc69d2a","name":"SQLJob-passport-api_kp_ziroom_com_dpLogin-1594315680","state":"FINISHED","start-time":1594315681845,"end-time":1594315691522,"duration":9677,"last-modification":1594315691522,"tasks":{"total":2,"created":0,"scheduled":0,"deploying":0,"running":0,"finished":0,"canceling":0,"canceled":1,"failed":1,"reconciling":0}}]}
==============FINISHED==============
2022-10-14 17:16:04 | INFO | pool-7-thread-1 | org.apache.streampark.flink.kubernetes.watcher.FlinkJobStatusWatcher:39] [StreamPark] ========整个流程执行完成一遍:pool-7-thread-1========
2022-10-14 17:16:08 | INFO | pool-7-thread-1 | org.apache.streampark.flink.kubernetes.watcher.FlinkJobStatusWatcher:39] [StreamPark] ========整个流程执行开始:pool-7-thread-1========
2022-10-14 17:16:13 | INFO | pool-7-thread-1 | org.apache.streampark.flink.kubernetes.watcher.FlinkJobStatusWatcher:39] [StreamPark] ========整个流程执行开始:pool-7-thread-1========
### Usage Scenario
_No response_
### Related issues
_No response_
### Are you willing to submit a PR?
- [ ] Yes I am willing to submit a PR!
### Code of Conduct
- [X] I agree to follow this project's [Code of Conduct](https://www.apache.org/foundation/policies/conduct)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: issues-unsubscribe@streampark.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
Re: [I] [Feature] FlinkJobStatusWatcher [incubator-streampark]
Posted by "linxq1995 (via GitHub)" <gi...@apache.org>.
linxq1995 commented on issue #1841:
URL: https://github.com/apache/incubator-streampark/issues/1841#issuecomment-2028043949
Hello, when I submit a job in Kubernetes application mode, the JobManager UI cannot be accessed. How can this be resolved?
Log:
The retry fetch failed, final status failed, errorStack=Connect to http://10.216.138.19:30998/ [/10.216.138.19] failed: connect timed out.
[GitHub] [incubator-streampark] wolfboys closed issue #1841: [Feature] FlinkJobStatusWatcher
Posted by GitBox <gi...@apache.org>.
wolfboys closed issue #1841: [Feature] FlinkJobStatusWatcher
URL: https://github.com/apache/incubator-streampark/issues/1841
[GitHub] [incubator-streampark] MonsterChenzhuo commented on issue #1841: [Feature] FlinkJobStatusWatcher
Posted by GitBox <gi...@apache.org>.
MonsterChenzhuo commented on issue #1841:
URL: https://github.com/apache/incubator-streampark/issues/1841#issuecomment-1284304239
Solution: use the Java 8 (1.8) fork/join framework. The list of trackIds is split recursively, with a default split granularity of 50. For example, with 500 jobs, the status-check step no longer has a single thread traverse the whole list and query each JobManager in turn. Instead, 10 threads each traverse their own sub_trackIds list, which is much more efficient.
<img width="1190" alt="image" src="https://user-images.githubusercontent.com/60029759/196754552-03ed1fb1-8d6b-4dbe-99f1-90df1db9a99a.png">
<img width="601" alt="image" src="https://user-images.githubusercontent.com/60029759/196754575-a80a80b0-3c09-4c51-a380-3c32396be599.png">
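The recursive split described in this comment could look roughly like the sketch below. It is not the code from the screenshots or from any StreamPark patch: `Probe`, `TrackTask`, `TrackTask.run`, and the `probeBatch` callback are hypothetical names used only for illustration, and plain strings stand in for the real `TrackId` values. The only library API used is `java.util.concurrent.ForkJoinPool` with `RecursiveTask`. The split happens on multiples of the threshold, so 500 ids end up as ten batches of 50. How many of those batches actually run at the same time is decided by the pool's parallelism, not by this code.

```scala
import java.util.concurrent.{ForkJoinPool, RecursiveTask}

// Hypothetical result type, standing in for the real JobStatusCV.
final case class Probe(trackId: String, state: String)

// Splits the id list recursively until a slice holds at most `threshold` ids,
// then hands the whole slice to `probeBatch` on a single worker thread.
class TrackTask(ids: Vector[String],
                threshold: Int,
                probeBatch: Vector[String] => Vector[Probe])
    extends RecursiveTask[Vector[Probe]] {

  require(threshold > 0, "threshold must be positive")

  override def compute(): Vector[Probe] =
    if (ids.size <= threshold) {
      probeBatch(ids)
    } else {
      // Split on a multiple of `threshold` so that leaf batches are full-sized.
      val chunks = (ids.size + threshold - 1) / threshold
      val mid = (chunks / 2) * threshold
      val (left, right) = ids.splitAt(mid)
      val leftTask = new TrackTask(left, threshold, probeBatch)
      val rightTask = new TrackTask(right, threshold, probeBatch)
      leftTask.fork()                       // schedule the left half on the pool
      val rightResult = rightTask.compute() // process the right half in this thread
      leftTask.join() ++ rightResult        // keep the original id order
    }
}

object TrackTask {
  // Runs one watch cycle over all ids and blocks until every batch is done.
  def run(ids: Vector[String], threshold: Int = 50)
         (probeBatch: Vector[String] => Vector[Probe]): Vector[Probe] = {
    val pool = new ForkJoinPool()
    try pool.invoke(new TrackTask(ids, threshold, probeBatch))
    finally pool.shutdown()
  }
}
```

A call such as `TrackTask.run(ids)(batch => batch.map(id => Probe(id, "RUNNING")))` returns one `Probe` per id in the original order. Since the real status probes are blocking HTTP requests, a sketch like this would still need a per-request timeout so that one unreachable JobManager cannot stall its whole batch.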