You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampark.apache.org by be...@apache.org on 2023/07/31 12:44:00 UTC

[incubator-streampark] branch dev updated: YarnSessionClient code optimization (#2911)

This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev by this push:
     new a672ffe27 YarnSessionClient code optimization (#2911)
a672ffe27 is described below

commit a672ffe278bf5be8183433f8dc80b818821f3fa1
Author: ChengJie1053 <18...@163.com>
AuthorDate: Mon Jul 31 20:43:53 2023 +0800

    YarnSessionClient code optimization (#2911)
    
    
    * YarnSessionClient code optimization
---
 .../org/apache/streampark/flink/client/impl/YarnSessionClient.scala | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala
index fbcb38c4c..40a278e40 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala
@@ -21,6 +21,7 @@ import org.apache.streampark.common.util.Utils
 import org.apache.streampark.flink.client.`trait`.YarnClientTrait
 import org.apache.streampark.flink.client.bean._
 
+import org.apache.commons.lang3.StringUtils
 import org.apache.flink.api.common.JobID
 import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader
 import org.apache.flink.client.program.{ClusterClient, PackagedProgram}
@@ -37,7 +38,6 @@ import java.util
 
 import scala.collection.JavaConverters._
 import scala.collection.convert.ImplicitConversions._
-import scala.collection.mutable.ListBuffer
 
 /** Submit Job to YARN Session Cluster */
 object YarnSessionClient extends YarnClientTrait {
@@ -207,11 +207,11 @@ object YarnSessionClient extends YarnClientTrait {
       deployClusterConfig(deployRequest, flinkConfig)
       val yarnClusterDescriptor = getSessionClusterDeployDescriptor(flinkConfig)
       clusterDescriptor = yarnClusterDescriptor._2
-      if (null != deployRequest.clusterId && deployRequest.clusterId.nonEmpty) {
+      if (StringUtils.isNotBlank(deployRequest.clusterId)) {
         try {
           val applicationStatus =
             clusterDescriptor.getYarnClient
-              .getApplicationReport(ConverterUtils.toApplicationId(deployRequest.clusterId))
+              .getApplicationReport(ApplicationId.fromString(deployRequest.clusterId))
               .getFinalApplicationStatus
           if (FinalApplicationStatus.UNDEFINED.equals(applicationStatus)) {
             // application is running