Posted to issues@streampark.apache.org by GitBox <gi...@apache.org> on 2022/10/14 09:19:16 UTC

[GitHub] [incubator-streampark] MonsterChenzhuo opened a new issue, #1841: [Feature] FlinkJobStatusWatcher

MonsterChenzhuo opened a new issue, #1841:
URL: https://github.com/apache/incubator-streampark/issues/1841

   ### Search before asking
   
   - [X] I had searched in the [feature](https://github.com/apache/streampark/issues?q=is%3Aissue+label%3A%22Feature%22) and found no similar feature requirement.
   
   
   ### Description
   
   The threading model of FlinkJobStatusWatcher has a design flaw.
   ```scala
   /*
    * Licensed to the Apache Software Foundation (ASF) under one or more
    * contributor license agreements.  See the NOTICE file distributed with
    * this work for additional information regarding copyright ownership.
    * The ASF licenses this file to You under the Apache License, Version 2.0
    * (the "License"); you may not use this file except in compliance with
    * the License.  You may obtain a copy of the License at
    *
    *    http://www.apache.org/licenses/LICENSE-2.0
    *
    * Unless required by applicable law or agreed to in writing, software
    * distributed under the License is distributed on an "AS IS" BASIS,
    * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    * See the License for the specific language governing permissions and
    * limitations under the License.
    */
   
   package org.apache.streampark.flink.kubernetes.watcher
   
   import org.apache.streampark.common.util.Logger
   import org.apache.streampark.flink.kubernetes.enums.FlinkJobState
   import org.apache.streampark.flink.kubernetes.enums.FlinkK8sExecuteMode.{APPLICATION, SESSION}
   import org.apache.streampark.flink.kubernetes.event.FlinkJobStatusChangeEvent
   import org.apache.streampark.flink.kubernetes.model._
   import org.apache.streampark.flink.kubernetes.{ChangeEventBus, FlinkTrackController, IngressController, JobStatusWatcherConfig, KubernetesRetriever}
   import org.apache.streampark.flink.kubernetes.helper.KubernetesDeploymentHelper
   import org.apache.hc.client5.http.fluent.Request
   import org.apache.hc.core5.util.Timeout
   import org.apache.streampark.archives.FetchArchives
   import org.json4s.{DefaultFormats, JNothing, JNull}
   import org.json4s.JsonAST.JArray
   import org.json4s.jackson.JsonMethods.parse
   
   import java.nio.charset.StandardCharsets
   import java.util.concurrent.{Executors, ScheduledFuture, TimeUnit}
   import javax.annotation.Nonnull
   import javax.annotation.concurrent.ThreadSafe
   import scala.concurrent.duration.DurationLong
   import scala.concurrent.{Await, ExecutionContext, ExecutionContextExecutorService, Future}
   import scala.language.{implicitConversions, postfixOps}
   import scala.util.{Failure, Success, Try}
   
   /**
    * Watcher that continuously monitors flink job status in kubernetes mode.
    * The tracked flink identifiers come from FlinkTrackCachePool.trackIds, and the
    * tracked job status results are written to FlinkTrackCachePool.jobStatuses.
    *
    */
   @ThreadSafe
   class FlinkJobStatusWatcher(conf: JobStatusWatcherConfig = JobStatusWatcherConfig.defaultConf)
                              (implicit val trackController: FlinkTrackController,
                               implicit val eventBus: ChangeEventBus) extends Logger with FlinkWatcher {
   
     private val trackTaskExecPool = Executors.newWorkStealingPool()
     private implicit val trackTaskExecutor: ExecutionContextExecutorService = ExecutionContext.fromExecutorService(trackTaskExecPool)
   
     private val timerExec = Executors.newSingleThreadScheduledExecutor()
     private var timerSchedule: ScheduledFuture[_] = _
   
     /**
      * start watcher process
      */
     override def doStart(): Unit = {
       timerSchedule = timerExec.scheduleAtFixedRate(() => doWatch(), 0, conf.requestIntervalSec, TimeUnit.SECONDS)
       logInfo("[flink-k8s] FlinkJobStatusWatcher started.")
     }
   
     /**
      * stop watcher process
      */
     override def doStop(): Unit = {
       // interrupt all running threads
       timerSchedule.cancel(true)
       logInfo("[flink-k8s] FlinkJobStatusWatcher stopped.")
     }
   
     /**
      * closes resource, relinquishing any underlying resources.
      */
     override def doClose(): Unit = {
       timerExec.shutdownNow()
       trackTaskExecutor.shutdownNow()
       logInfo("[flink-k8s] FlinkJobStatusWatcher closed.")
     }
   
     /**
      * single flink job status tracking task
      */
     override def doWatch(): Unit = {
   
       logInfo("========整个流程执行开始:"+Thread.currentThread().getName+"========")
       // get all legal tracking ids
       val trackIds = Try(trackController.collectAllTrackIds()).filter(_.nonEmpty).getOrElse(return)
       logInfo("======"+trackIds.toString()+"======")
       // retrieve flink job status in thread pool
       val tracksFuture: Set[Future[Option[JobStatusCV]]] = trackIds.map { id =>
   
         val future = Future {
           id.executeMode match {
             case SESSION => touchSessionJob(id)
             case APPLICATION => touchApplicationJob(id)
           }
         }
   
         future onComplete (_.getOrElse(None) match {
           case Some(jobState) =>
             val trackId = id.copy(jobId = jobState.jobId)
             val latest: JobStatusCV = trackController.jobStatuses.get(trackId)
             if (latest == null || latest.jobState != jobState.jobState || latest.jobId != jobState.jobId) {
               // put job status to cache
               trackController.jobStatuses.put(trackId, jobState)
               // set jobId to trackIds
               trackController.trackIds.update(trackId)
               eventBus.postSync(FlinkJobStatusChangeEvent(trackId, jobState))
             }
             if (FlinkJobState.isEndState(jobState.jobState)) {
               // remove trackId from cache of job that needs to be untracked
               trackController.unTracking(trackId)
               if (trackId.executeMode == APPLICATION) {
                 trackController.endpoints.invalidate(trackId.toClusterKey)
               }
             }
           case _ =>
         })
         future
       }
   
       // blocking until all future are completed or timeout is reached
       Try(Await.ready(Future.sequence(tracksFuture), conf.requestTimeoutSec seconds))
         .failed.map { _ =>
         logInfo(s"[FlinkJobStatusWatcher] tracking flink job status on kubernetes mode timeout," +
           s" limitSeconds=${conf.requestTimeoutSec}," +
           s" trackIds=${trackIds.mkString(",")}")
       }
       logInfo("========整个流程执行完成一遍:"+Thread.currentThread().getName+"========")
     }
   
     /**
      * Get flink status information from kubernetes-native-session cluster.
      * When the flink-cluster-client request fails, the job state would be
      * LOST or SILENT.
      *
      * This method can be called directly from outside, without affecting the
      * current cachePool result.
      */
     def touchSessionJob(@Nonnull trackId: TrackId): Option[JobStatusCV] = {
       val pollEmitTime = System.currentTimeMillis
       val clusterId = trackId.clusterId
       val namespace = trackId.namespace
       val appId = trackId.appId
       val jobId = trackId.jobId
   
       val rsMap = touchSessionAllJob(clusterId, namespace, appId).toMap
       val id = TrackId.onSession(namespace, clusterId, appId, jobId)
       val jobState = rsMap.get(id).filter(_.jobState != FlinkJobState.SILENT).getOrElse {
         val preCache = trackController.jobStatuses.get(id)
         val state = inferSilentOrLostFromPreCache(preCache)
         val nonFirstSilent = state == FlinkJobState.SILENT && preCache != null && preCache.jobState == FlinkJobState.SILENT
         if (nonFirstSilent) {
           JobStatusCV(jobState = state, jobId = id.jobId, pollEmitTime = preCache.pollEmitTime, pollAckTime = preCache.pollAckTime)
         } else {
           JobStatusCV(jobState = state, jobId = id.jobId, pollEmitTime = pollEmitTime, pollAckTime = System.currentTimeMillis)
         }
       }
   
       Some(jobState)
     }
   
     /**
      * Get all flink job status information from kubernetes-native-session cluster.
      * An empty array is returned when the k8s-client or flink-cluster-client
      * request fails.
      *
      * This method can be called directly from outside, without affecting the
      * current cachePool result.
      */
     protected[kubernetes] def touchSessionAllJob(@Nonnull clusterId: String, @Nonnull namespace: String, @Nonnull appId: Long): Array[(TrackId, JobStatusCV)] = {
       lazy val defaultResult = Array.empty[(TrackId, JobStatusCV)]
       val pollEmitTime = System.currentTimeMillis
       val jobDetails = listJobsDetails(ClusterKey(SESSION, namespace, clusterId)).getOrElse(return defaultResult).jobs
       if (jobDetails.isEmpty) {
         defaultResult
       } else {
         jobDetails.map { d =>
           TrackId.onSession(namespace, clusterId, appId, d.jid) -> d.toJobStatusCV(pollEmitTime, System.currentTimeMillis)
         }
       }
     }
   
     /**
      * Get flink status information from kubernetes-native-application cluster.
      * When the flink-cluster-client request fails, the job state is inferred
      * from k8s events.
      *
      * This method can be called directly from outside, without affecting the
      * current cachePool result.
      */
     def touchApplicationJob(@Nonnull trackId: TrackId): Option[JobStatusCV] = {
       implicit val pollEmitTime: Long = System.currentTimeMillis
       val clusterId = trackId.clusterId
       val namespace = trackId.namespace
       val jobDetails = listJobsDetails(ClusterKey(APPLICATION, namespace, clusterId))
       if (jobDetails.isEmpty || jobDetails.get.jobs.isEmpty) {
         inferApplicationFlinkJobStateFromK8sEvent(trackId)
       } else {
         Some(jobDetails.get.jobs.head.toJobStatusCV(pollEmitTime, System.currentTimeMillis))
       }
     }
   
     /**
      * list flink jobs details
      */
     private def listJobsDetails(clusterKey: ClusterKey): Option[JobDetails] = {
       // get flink rest api
       Try {
         val clusterRestUrl = trackController.getClusterRestUrl(clusterKey).filter(_.nonEmpty).getOrElse(return None)
         // list flink jobs from rest api
         callJobsOverviewsApi(clusterRestUrl)
       }.getOrElse {
         logger.warn("Failed to visit remote flink jobs on kubernetes-native-mode cluster, and the retry access logic is performed.")
         val clusterRestUrl = trackController.refreshClusterRestUrl(clusterKey).getOrElse(return None)
         Try(callJobsOverviewsApi(clusterRestUrl)) match {
           case Success(s) =>
             logger.info("The retry is successful.")
             s
           case Failure(e) =>
             logger.warn(s"The retry fetch failed, final status failed, errorStack=${e.getMessage}.")
             None
         }
       }
     }
   
     /**
      * list flink jobs details from rest api
      */
     private def callJobsOverviewsApi(restUrl: String): Option[JobDetails] = {
       val jobDetails = JobDetails.as(
         Request.get(s"$restUrl/jobs/overview")
           .connectTimeout(Timeout.ofSeconds(KubernetesRetriever.FLINK_REST_AWAIT_TIMEOUT_SEC))
           .responseTimeout(Timeout.ofSeconds(KubernetesRetriever.FLINK_CLIENT_TIMEOUT_SEC))
           .execute.returnContent().asString(StandardCharsets.UTF_8)
       )
       jobDetails
     }
   
     /**
      * Infer the current flink state from the last relevant k8s events.
      * This method is only used for application-mode job inference in
      * case of a failed JM rest request.
      */
     private def inferApplicationFlinkJobStateFromK8sEvent(@Nonnull trackId: TrackId)
                                                          (implicit pollEmitTime: Long): Option[JobStatusCV] = {
       this.synchronized {
         // infer from k8s deployment and event
         val latest: JobStatusCV = trackController.jobStatuses.get(trackId)
         logger.info(s"Query the local cache result:${trackController.canceling.has(trackId).toString},trackId ${trackId.toString}.")
         val jobState = {
           if (trackController.canceling.has(trackId)) FlinkJobState.CANCELED else {
             // whether deployment exists on kubernetes cluster
             val isDeployExists = KubernetesRetriever.isDeploymentExists(trackId.clusterId, trackId.namespace)
             val deployStateOfTheError = KubernetesDeploymentHelper.getDeploymentStatusChanges(trackId.namespace, trackId.clusterId)
             val isConnection = KubernetesDeploymentHelper.isTheK8sConnectionNormal()
   
             if (isDeployExists) {
               if (!deployStateOfTheError) {
                 logger.info("Task Enter the initialization process.")
                 FlinkJobState.K8S_INITIALIZING
               } else if (isConnection) {
                 logger.info("Enter the task failure deletion process.")
                 KubernetesDeploymentHelper.watchPodTerminatedLog(trackId.namespace, trackId.clusterId)
                 KubernetesDeploymentHelper.deleteTaskDeployment(trackId.namespace, trackId.clusterId)
                 IngressController.deleteIngress(trackId.namespace, trackId.clusterId)
                 FlinkJobState.FAILED
               } else {
                 inferSilentOrLostFromPreCache(latest)
               }
             } else if (isConnection) {
               logger.info("The deployment is deleted and enters the task failure process.")
               FlinkJobState.of(FetchArchives.fetchArchives("002c63091ecc95bd395260dbbdc69d2a"))
             } else {
               inferSilentOrLostFromPreCache(latest)
             }
   
         }
       }
   
       val jobStatusCV = JobStatusCV(
         jobState = jobState,
         jobId = null,
         pollEmitTime = pollEmitTime,
         pollAckTime = System.currentTimeMillis
       )
   
       if (jobState == FlinkJobState.SILENT && latest != null && latest.jobState == FlinkJobState.SILENT) {
         Some(jobStatusCV.copy(pollEmitTime = latest.pollEmitTime, pollAckTime = latest.pollAckTime))
       } else {
         Some(jobStatusCV)
       }
       }
     }
   
     private[this] def inferSilentOrLostFromPreCache(preCache: JobStatusCV) = preCache match {
       case preCache if preCache == null => FlinkJobState.SILENT
       case preCache if preCache.jobState == FlinkJobState.SILENT &&
         System.currentTimeMillis() - preCache.pollAckTime >= conf.silentStateJobKeepTrackingSec * 1000 => FlinkJobState.LOST
       case _ => FlinkJobState.SILENT
     }
   
   }
   
   object FlinkJobStatusWatcher {
   
     private val effectEndStates: Seq[FlinkJobState.Value] = FlinkJobState.endingStates.filter(_ != FlinkJobState.LOST)
   
     /**
      * infer flink job state before persistence.
      * so drama, so sad.
      *
      * @param current  current flink job state
      * @param previous previous flink job state from persistent storage
      */
     def inferFlinkJobStateFromPersist(current: FlinkJobState.Value, previous: FlinkJobState.Value): FlinkJobState.Value = {
       current match {
         case FlinkJobState.LOST => if (effectEndStates.contains(current)) previous else FlinkJobState.TERMINATED
         case FlinkJobState.POS_TERMINATED | FlinkJobState.TERMINATED => previous match {
           case FlinkJobState.CANCELLING => FlinkJobState.CANCELED
           case FlinkJobState.FAILING => FlinkJobState.FAILED
           case _ => if (current == FlinkJobState.POS_TERMINATED) FlinkJobState.FINISHED else FlinkJobState.TERMINATED
         }
         case _ => current
       }
     }
   
   }
   
   private[kubernetes] case class JobDetails(jobs: Array[JobDetail] = Array())
   
   
   private[kubernetes] case class JobDetail(jid: String,
                                            name: String,
                                            state: String,
                                            startTime: Long,
                                            endTime: Long,
                                            duration: Long,
                                            lastModification: Long,
                                            tasks: JobTask) {
     def toJobStatusCV(pollEmitTime: Long, pollAckTime: Long): JobStatusCV = {
       JobStatusCV(
         jobState = FlinkJobState.of(state),
         jobId = jid,
         jobName = name,
         jobStartTime = startTime,
         jobEndTime = endTime,
         duration = duration,
         taskTotal = tasks.total,
         pollEmitTime = pollEmitTime,
         pollAckTime = pollAckTime)
     }
   }
   
   private[kubernetes] case class JobTask(total: Int,
                                          created: Int,
                                          scheduled: Int,
                                          deploying: Int,
                                          running: Int,
                                          finished: Int,
                                          canceling: Int,
                                          canceled: Int,
                                          failed: Int,
                                          reconciling: Int,
                                          initializing: Int)
   
   
   private[kubernetes] object JobDetails {
   
     @transient
     implicit lazy val formats: DefaultFormats.type = org.json4s.DefaultFormats
   
     def as(json: String): Option[JobDetails] = {
   
       Try(parse(json)) match {
         case Success(ok) =>
           ok \ "jobs" match {
             case JNothing | JNull => None
             case JArray(arr) =>
               val details = arr.map(x => {
                 val task = x \ "tasks"
                 JobDetail(
                   (x \ "jid").extractOpt[String].orNull,
                   (x \ "name").extractOpt[String].orNull,
                   (x \ "state").extractOpt[String].orNull,
                   (x \ "start-time").extractOpt[Long].getOrElse(0),
                   (x \ "end-time").extractOpt[Long].getOrElse(0),
                   (x \ "duration").extractOpt[Long].getOrElse(0),
                   (x \ "last-modification").extractOpt[Long].getOrElse(0),
                   JobTask(
                     (task \ "total").extractOpt[Int].getOrElse(0),
                     (task \ "created").extractOpt[Int].getOrElse(0),
                     (task \ "scheduled").extractOpt[Int].getOrElse(0),
                     (task \ "deploying").extractOpt[Int].getOrElse(0),
                     (task \ "running").extractOpt[Int].getOrElse(0),
                     (task \ "finished").extractOpt[Int].getOrElse(0),
                     (task \ "canceling").extractOpt[Int].getOrElse(0),
                     (task \ "canceled").extractOpt[Int].getOrElse(0),
                     (task \ "failed").extractOpt[Int].getOrElse(0),
                     (task \ "reconciling").extractOpt[Int].getOrElse(0),
                     (task \ "initializing").extractOpt[Int].getOrElse(0)
                   )
                 )
               }).toArray
               Some(JobDetails(details))
             case _ => None
           }
         case Failure(_) => None
       }
   
     }
   
   }
   
   ```
   
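   First, the runtime log (the marker `整个流程执行开始` means "whole flow started" and `整个流程执行完成一遍` means "whole flow completed one pass"):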
   ========整个流程执行开始:pool-7-thread-1========
   2022-10-14 17:15:51 | INFO  | pool-7-thread-1 | org.apache.streampark.flink.kubernetes.watcher.FlinkJobStatusWatcher:39] [StreamPark] ======Set(TrackId(kubernetes-application,native-flink,test33,100000,null))======
   2022-10-14 17:15:51 | WARN  | ForkJoinPool-1-worker-4 | org.apache.streampark.flink.kubernetes.watcher.FlinkJobStatusWatcher:220] Failed to visit remote flink jobs on kubernetes-native-mode cluster, and the retry access logic is performed.
   2022-10-14 17:15:51 | INFO  | ForkJoinPool-1-worker-4 | org.apache.flink.kubernetes.KubernetesClusterDescriptor:147] Retrieve flink cluster test33 successfully, JobManager Web Interface: http://10.216.138.19:30998
   2022-10-14 17:15:52 | INFO  | ForkJoinPool-1-worker-4 | org.apache.streampark.flink.kubernetes.KubernetesRetriever:131] retrieve flink jobManager rest url: http://10.216.138.19:30998
   2022-10-14 17:16:02 | WARN  | ForkJoinPool-1-worker-4 | org.apache.streampark.flink.kubernetes.watcher.FlinkJobStatusWatcher:227] The retry fetch failed, final status failed, errorStack=Connect to http://10.216.138.19:30998 [/10.216.138.19] failed: connect timed out.
   2022-10-14 17:16:02 | INFO  | ForkJoinPool-1-worker-4 | org.apache.streampark.flink.kubernetes.watcher.FlinkJobStatusWatcher:256] Query the local cache result:false,trackId TrackId(kubernetes-application,native-flink,test33,100000,null).
   2022-10-14 17:16:02 | INFO  | ForkJoinPool-1-worker-4 | org.apache.streampark.flink.kubernetes.watcher.FlinkJobStatusWatcher:278] The deployment is deleted and enters the task failure process.
   /jobs/overview
   {"jobs":[{"jid":"002c63091ecc95bd395260dbbdc69d2a","name":"SQLJob-passport-api_kp_ziroom_com_dpLogin-1594315680","state":"FINISHED","start-time":1594315681845,"end-time":1594315691522,"duration":9677,"last-modification":1594315691522,"tasks":{"total":2,"created":0,"scheduled":0,"deploying":0,"running":0,"finished":0,"canceling":0,"canceled":1,"failed":1,"reconciling":0}}]}
   ==============FINISHED==============
   2022-10-14 17:16:02 | INFO  | pool-7-thread-1 | org.apache.streampark.flink.kubernetes.watcher.FlinkJobStatusWatcher:39] [StreamPark] ========整个流程执行完成一遍:pool-7-thread-1========
   2022-10-14 17:16:02 | INFO  | pool-7-thread-1 | org.apache.streampark.flink.kubernetes.watcher.FlinkJobStatusWatcher:39] [StreamPark] ========整个流程执行开始:pool-7-thread-1========
   2022-10-14 17:16:02 | INFO  | pool-7-thread-1 | org.apache.streampark.flink.kubernetes.watcher.FlinkJobStatusWatcher:39] [StreamPark] ======Set(TrackId(kubernetes-application,native-flink,test33,100000,null))======
   2022-10-14 17:16:03 | ERROR | ForkJoinPool-1-worker-1 | org.apache.streampark.flink.kubernetes.KubernetesRetriever:71] [StreamPark] Get flinkClient error, the error is: java.lang.RuntimeException: org.apache.flink.client.deployment.ClusterRetrieveException: Could not get the rest endpoint of test33
   2022-10-14 17:16:03 | INFO  | ForkJoinPool-1-worker-1 | org.apache.streampark.flink.kubernetes.watcher.FlinkJobStatusWatcher:256] Query the local cache result:false,trackId TrackId(kubernetes-application,native-flink,test33,100000,null).
   2022-10-14 17:16:03 | INFO  | ForkJoinPool-1-worker-1 | org.apache.streampark.flink.kubernetes.watcher.FlinkJobStatusWatcher:278] The deployment is deleted and enters the task failure process.
   /jobs/overview
   {"jobs":[{"jid":"002c63091ecc95bd395260dbbdc69d2a","name":"SQLJob-passport-api_kp_ziroom_com_dpLogin-1594315680","state":"FINISHED","start-time":1594315681845,"end-time":1594315691522,"duration":9677,"last-modification":1594315691522,"tasks":{"total":2,"created":0,"scheduled":0,"deploying":0,"running":0,"finished":0,"canceling":0,"canceled":1,"failed":1,"reconciling":0}}]}
   ==============FINISHED==============
   2022-10-14 17:16:03 | INFO  | pool-7-thread-1 | org.apache.streampark.flink.kubernetes.watcher.FlinkJobStatusWatcher:39] [StreamPark] ========整个流程执行完成一遍:pool-7-thread-1========
   2022-10-14 17:16:03 | INFO  | pool-7-thread-1 | org.apache.streampark.flink.kubernetes.watcher.FlinkJobStatusWatcher:39] [StreamPark] ========整个流程执行开始:pool-7-thread-1========
   2022-10-14 17:16:03 | INFO  | pool-7-thread-1 | org.apache.streampark.flink.kubernetes.watcher.FlinkJobStatusWatcher:39] [StreamPark] ======Set(TrackId(kubernetes-application,native-flink,test33,100000,null))======
   2022-10-14 17:16:03 | ERROR | ForkJoinPool-1-worker-1 | org.apache.streampark.flink.kubernetes.KubernetesRetriever:71] [StreamPark] Get flinkClient error, the error is: java.lang.RuntimeException: org.apache.flink.client.deployment.ClusterRetrieveException: Could not get the rest endpoint of test33
   2022-10-14 17:16:03 | INFO  | ForkJoinPool-1-worker-1 | org.apache.streampark.flink.kubernetes.watcher.FlinkJobStatusWatcher:256] Query the local cache result:false,trackId TrackId(kubernetes-application,native-flink,test33,100000,null).
   2022-10-14 17:16:04 | INFO  | ForkJoinPool-1-worker-1 | org.apache.streampark.flink.kubernetes.watcher.FlinkJobStatusWatcher:278] The deployment is deleted and enters the task failure process.
   /jobs/overview
   {"jobs":[{"jid":"002c63091ecc95bd395260dbbdc69d2a","name":"SQLJob-passport-api_kp_ziroom_com_dpLogin-1594315680","state":"FINISHED","start-time":1594315681845,"end-time":1594315691522,"duration":9677,"last-modification":1594315691522,"tasks":{"total":2,"created":0,"scheduled":0,"deploying":0,"running":0,"finished":0,"canceling":0,"canceled":1,"failed":1,"reconciling":0}}]}
   ==============FINISHED==============
   2022-10-14 17:16:04 | INFO  | pool-7-thread-1 | org.apache.streampark.flink.kubernetes.watcher.FlinkJobStatusWatcher:39] [StreamPark] ========整个流程执行完成一遍:pool-7-thread-1========
   2022-10-14 17:16:04 | INFO  | pool-7-thread-1 | org.apache.streampark.flink.kubernetes.watcher.FlinkJobStatusWatcher:39] [StreamPark] ========整个流程执行开始:pool-7-thread-1========
   2022-10-14 17:16:04 | INFO  | pool-7-thread-1 | org.apache.streampark.flink.kubernetes.watcher.FlinkJobStatusWatcher:39] [StreamPark] ======Set(TrackId(kubernetes-application,native-flink,test33,100000,null))======
   2022-10-14 17:16:04 | ERROR | ForkJoinPool-1-worker-1 | org.apache.streampark.flink.kubernetes.KubernetesRetriever:71] [StreamPark] Get flinkClient error, the error is: java.lang.RuntimeException: org.apache.flink.client.deployment.ClusterRetrieveException: Could not get the rest endpoint of test33
   2022-10-14 17:16:04 | INFO  | ForkJoinPool-1-worker-1 | org.apache.streampark.flink.kubernetes.watcher.FlinkJobStatusWatcher:256] Query the local cache result:false,trackId TrackId(kubernetes-application,native-flink,test33,100000,null).
   2022-10-14 17:16:04 | INFO  | ForkJoinPool-1-worker-1 | org.apache.streampark.flink.kubernetes.watcher.FlinkJobStatusWatcher:278] The deployment is deleted and enters the task failure process.
   /jobs/overview
   {"jobs":[{"jid":"002c63091ecc95bd395260dbbdc69d2a","name":"SQLJob-passport-api_kp_ziroom_com_dpLogin-1594315680","state":"FINISHED","start-time":1594315681845,"end-time":1594315691522,"duration":9677,"last-modification":1594315691522,"tasks":{"total":2,"created":0,"scheduled":0,"deploying":0,"running":0,"finished":0,"canceling":0,"canceled":1,"failed":1,"reconciling":0}}]}
   ==============FINISHED==============
   2022-10-14 17:16:04 | INFO  | pool-7-thread-1 | org.apache.streampark.flink.kubernetes.watcher.FlinkJobStatusWatcher:39] [StreamPark] ========整个流程执行完成一遍:pool-7-thread-1========
   2022-10-14 17:16:08 | INFO  | pool-7-thread-1 | org.apache.streampark.flink.kubernetes.watcher.FlinkJobStatusWatcher:39] [StreamPark] ========整个流程执行开始:pool-7-thread-1========
   2022-10-14 17:16:13 | INFO  | pool-7-thread-1 | org.apache.streampark.flink.kubernetes.watcher.FlinkJobStatusWatcher:39] [StreamPark] ========整个流程执行开始:pool-7-thread-1========
   
   ### Usage Scenario
   
   _No response_
   
   ### Related issues
   
   _No response_
   
   ### Are you willing to submit a PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of Conduct](https://www.apache.org/foundation/policies/conduct)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@streampark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [I] [Feature] FlinkJobStatusWatcher [incubator-streampark]

Posted by "linxq1995 (via GitHub)" <gi...@apache.org>.
linxq1995 commented on issue #1841:
URL: https://github.com/apache/incubator-streampark/issues/1841#issuecomment-2028043949

   Hello, when submitting a job in Kubernetes application mode, the JobManager UI cannot be accessed. How can this be resolved?
   Log:
   The retry fetch failed, final status failed, errorStack=Connect to http://10.216.138.19:30998/ [/10.216.138.19] failed: connect timed out.




[GitHub] [incubator-streampark] wolfboys closed issue #1841: [Feature] FlinkJobStatusWatcher

Posted by GitBox <gi...@apache.org>.
wolfboys closed issue #1841: [Feature] FlinkJobStatusWatcher 
URL: https://github.com/apache/incubator-streampark/issues/1841




[GitHub] [incubator-streampark] MonsterChenzhuo commented on issue #1841: [Feature] FlinkJobStatusWatcher

Posted by GitBox <gi...@apache.org>.
MonsterChenzhuo commented on issue #1841:
URL: https://github.com/apache/incubator-streampark/issues/1841#issuecomment-1284304239

   Solution: use the fork/join mechanism in Java 1.8 to split the trackIds list recursively. The split granularity defaults to 50. With 500 jobs, for example, the status-check logic no longer visits the JobManagers from a single thread; instead 10 threads each traverse their own sub_trackIds list, which is much more efficient.
   <img width="1190" alt="image" src="https://user-images.githubusercontent.com/60029759/196754552-03ed1fb1-8d6b-4dbe-99f1-90df1db9a99a.png">
   <img width="601" alt="image" src="https://user-images.githubusercontent.com/60029759/196754575-a80a80b0-3c09-4c51-a380-3c32396be599.png">
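The recursive split described above could look roughly like the following. This is only a minimal sketch, not the code from the screenshots: `TrackIdSplitDemo`, `PollTask`, `pollAll`, and the `poll` function are hypothetical names, and the ids are plain integers standing in for real `TrackId` values. It is written in Java because the comment names the Java fork/join framework; the same `java.util.concurrent` classes can be used from the Scala watcher.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;
import java.util.function.Function;

// Hypothetical sketch: recursively splits a list of track ids into slices of
// at most THRESHOLD entries and polls each slice in its own fork/join task.
class TrackIdSplitDemo {

    // Split granularity; 50 is the default mentioned in the comment above.
    static final int THRESHOLD = 50;

    // Returns one inner list of poll results per slice, in input order.
    static class PollTask<T, R> extends RecursiveTask<List<List<R>>> {
        private final List<T> ids;
        private final Function<T, R> poll;

        PollTask(List<T> ids, Function<T, R> poll) {
            this.ids = ids;
            this.poll = poll;
        }

        @Override
        protected List<List<R>> compute() {
            int n = ids.size();
            if (n <= THRESHOLD) {
                // Small enough: poll this slice sequentially on the current worker.
                List<R> slice = new ArrayList<>(n);
                for (T id : ids) {
                    slice.add(poll.apply(id));
                }
                List<List<R>> single = new ArrayList<>();
                single.add(slice);
                return single;
            }
            // Split on a multiple of THRESHOLD so the leaves are full slices.
            int chunks = (n + THRESHOLD - 1) / THRESHOLD;
            int mid = (chunks / 2) * THRESHOLD;
            PollTask<T, R> left = new PollTask<T, R>(ids.subList(0, mid), poll);
            PollTask<T, R> right = new PollTask<T, R>(ids.subList(mid, n), poll);
            left.fork();                                  // run the left half asynchronously
            List<List<R>> rightSlices = right.compute();  // run the right half here
            List<List<R>> all = new ArrayList<>(left.join());
            all.addAll(rightSlices);
            return all;
        }
    }

    static <T, R> List<List<R>> pollAll(List<T> ids, Function<T, R> poll) {
        ForkJoinPool pool = new ForkJoinPool();
        try {
            return pool.invoke(new PollTask<T, R>(ids, poll));
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        List<Integer> ids = new ArrayList<>();
        for (int i = 0; i < 500; i++) {
            ids.add(i);
        }
        // Stand-in for the real status request against a JobManager.
        List<List<String>> slices = pollAll(ids, id -> "state-of-" + id);
        System.out.println(slices.size() + " slices"); // prints "10 slices"
    }
}
```

With 500 ids the sketch produces 10 slices of 50. How many of them actually run at the same time depends on the pool's parallelism, which for `new ForkJoinPool()` defaults to the number of available processors, so blocking HTTP calls may need a larger, explicitly sized pool.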

