Posted to issues@streampark.apache.org by "RocMarshal (via GitHub)" <gi...@apache.org> on 2023/02/26 14:33:26 UTC

[GitHub] [incubator-streampark] RocMarshal commented on a diff in pull request #2365: [Refactor] refactor flinkClient

RocMarshal commented on code in PR #2365:
URL: https://github.com/apache/incubator-streampark/pull/2365#discussion_r1118096276


##########
streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/FlinkClientHandler.scala:
##########
@@ -25,55 +25,55 @@ object FlinkClientHandler {
 
   def submit(submitInfo: SubmitRequest): SubmitResponse = {
     submitInfo.executionMode match {
-      case ExecutionMode.LOCAL => LocalSubmit.submit(submitInfo)
-      case ExecutionMode.REMOTE => RemoteSubmit.submit(submitInfo)
-      case ExecutionMode.YARN_APPLICATION => YarnApplicationSubmit.submit(submitInfo)
-      case ExecutionMode.YARN_SESSION => YarnSessionSubmit.submit(submitInfo)
-      case ExecutionMode.YARN_PER_JOB => YarnPerJobSubmit.submit(submitInfo)
-      case ExecutionMode.KUBERNETES_NATIVE_SESSION => KubernetesNativeSessionSubmit.submit(submitInfo)
-      case ExecutionMode.KUBERNETES_NATIVE_APPLICATION => KubernetesNativeApplicationSubmit.submit(submitInfo)
+      case ExecutionMode.LOCAL => LocalClient.submit(submitInfo)
+      case ExecutionMode.REMOTE => RemoteClient.submit(submitInfo)
+      case ExecutionMode.YARN_APPLICATION => YarnApplicationClient.submit(submitInfo)
+      case ExecutionMode.YARN_SESSION => YarnSessionClient.submit(submitInfo)
+      case ExecutionMode.YARN_PER_JOB => YarnPerJobClient.submit(submitInfo)
+      case ExecutionMode.KUBERNETES_NATIVE_SESSION => KubernetesNativeSessionClient.submit(submitInfo)
+      case ExecutionMode.KUBERNETES_NATIVE_APPLICATION => KubernetesNativeApplicationClient.submit(submitInfo)
       case _ => throw new UnsupportedOperationException(s"Unsupported ${submitInfo.executionMode} submit ")

Review Comment:
   There are some fixed constraints in the client:
   1. The ExecutionMode field is required by every existing XXXRequest, and every handler method dispatches on it like these lines.
   2. Every request needs a method that drives the action bound to `xxxClient`.
   3. Each ExecutionMode has its own dedicated xxxClient.
   
   If we want to remove the case lines, as was mentioned in the `trigger savepoint PR`, here are 2 rough candidate strategies from me:
   
   1. Hold a map that stores `ExecutionMode -> Concrete FlinkClientTrait`, then look up the `Concrete FlinkClientTrait` in it and call the corresponding method. Move the mapping between `xxxRequest` and `method name` into the concrete `xxxRequest` as a behavior.
   With this approach, to introduce a new client action we need to
   - add an `xxxRequest` first,
   - add an `xxxRequest -> method name` mapping,
   - then add a method declaration in `FlinkClientTrait`,
   - next, add the corresponding method implementation in each `xxxClient` for its ExecutionMode,
   - finally, add the same method to `FlinkClientHandler`.
   In other words, this only removes the redundant case lines from every method of `FlinkClientHandler`; it doesn't follow the OCP (open-closed principle) well.
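   
   A minimal sketch of strategy 1, just to make the idea concrete. All types here are simplified, hypothetical stand-ins (for example, `ExecutionMode` is modeled as a Scala `Enumeration` and the clients return a dummy response), not the real StreamPark classes:
   
   ```scala
   // Hypothetical stand-ins for the real StreamPark types; names are illustrative only.
   object ExecutionMode extends Enumeration {
     val LOCAL, REMOTE, YARN_SESSION = Value
   }
   
   case class SubmitRequest(executionMode: ExecutionMode.Value, jobName: String)
   case class SubmitResponse(message: String)
   
   trait FlinkClientTrait {
     def submit(request: SubmitRequest): SubmitResponse
   }
   
   object LocalClient extends FlinkClientTrait {
     def submit(request: SubmitRequest): SubmitResponse =
       SubmitResponse(s"local:${request.jobName}")
   }
   
   object RemoteClient extends FlinkClientTrait {
     def submit(request: SubmitRequest): SubmitResponse =
       SubmitResponse(s"remote:${request.jobName}")
   }
   
   object FlinkClientHandler {
     // One lookup table replaces the per-method `match` blocks.
     private val clients: Map[ExecutionMode.Value, FlinkClientTrait] = Map(
       ExecutionMode.LOCAL -> LocalClient,
       ExecutionMode.REMOTE -> RemoteClient
     )
   
     // Modes without a registered client are rejected, as the `case _` branch does today.
     private def clientFor(mode: ExecutionMode.Value): FlinkClientTrait =
       clients.getOrElse(
         mode,
         throw new UnsupportedOperationException(s"Unsupported $mode"))
   
     // Every new action still needs a forwarding method like this one.
     def submit(request: SubmitRequest): SubmitResponse =
       clientFor(request.executionMode).submit(request)
   }
   ```
   
   The case lines are gone, but `FlinkClientHandler` still has to be edited for every new action, which is the OCP concern above.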
   
   2. Introduce an abstract base class with an `identifier` behavior that represents the method name and a `getFlinkClientTrait` behavior that represents the mapping from `execution mode` to `Concrete FlinkClientTrait`.
   Also make `FlinkClientHandler` a child of `FlinkClientTrait`, which helps unify the interface of the Client classes.
   With this approach, to introduce a new client action we need to
   - add an `xxxRequest` first, implementing the members required by the abstract parent above,
   - then add a method declaration in `FlinkClientTrait`,
   - next, add the corresponding method implementations in all of the child object definitions.
   
   This way we can remove the redundant case lines from every method of `FlinkClientHandler` and follow the OCP better. However, having a `method name` attribute on xxxRequest to drive the call in the xxxClient feels a little weird to me.
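   
   A rough sketch of what strategy 2 could look like. `ClientRequest` is a made-up name for the abstract parent, the request fields and `String` return types are simplified assumptions, and only two modes and two actions are shown:
   
   ```scala
   // Hypothetical stand-ins; none of these definitions are taken from the StreamPark code base.
   object ExecutionMode extends Enumeration {
     val LOCAL, REMOTE = Value
   }
   
   trait FlinkClientTrait {
     def submit(request: SubmitRequest): String
     def cancel(request: CancelRequest): String
   }
   
   object LocalClient extends FlinkClientTrait {
     def submit(request: SubmitRequest): String = s"local submit ${request.jobName}"
     def cancel(request: CancelRequest): String = s"local cancel ${request.jobId}"
   }
   
   object RemoteClient extends FlinkClientTrait {
     def submit(request: SubmitRequest): String = s"remote submit ${request.jobName}"
     def cancel(request: CancelRequest): String = s"remote cancel ${request.jobId}"
   }
   
   // The abstract parent: each request names its action and resolves its own client.
   abstract class ClientRequest {
     def executionMode: ExecutionMode.Value
   
     // Method name for a caller that invokes the handler by name (not shown here).
     def identifier: String
   
     // The only place where execution mode is mapped to a concrete client.
     def getFlinkClientTrait: FlinkClientTrait = executionMode match {
       case ExecutionMode.LOCAL  => LocalClient
       case ExecutionMode.REMOTE => RemoteClient
     }
   }
   
   case class SubmitRequest(executionMode: ExecutionMode.Value, jobName: String)
       extends ClientRequest {
     val identifier: String = "submit"
   }
   
   case class CancelRequest(executionMode: ExecutionMode.Value, jobId: String)
       extends ClientRequest {
     val identifier: String = "cancel"
   }
   
   // The handler implements the same trait and only forwards; no `match` per action.
   object FlinkClientHandler extends FlinkClientTrait {
     def submit(request: SubmitRequest): String = request.getFlinkClientTrait.submit(request)
     def cancel(request: CancelRequest): String = request.getFlinkClientTrait.cancel(request)
   }
   ```
   
   Here adding a new ExecutionMode touches only `getFlinkClientTrait`, and the compiler forces every client (and the handler) to implement any method newly declared in `FlinkClientTrait`.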
   
   If I'm wrong, please correct me. Any suggestions are appreciated.
   Thx ~



##########
streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/FlinkClient.scala:
##########
@@ -17,144 +17,59 @@
 
 package org.apache.streampark.flink.client
 
-import java.util
-import java.util.{Map => JavaMap}
-import java.util.regex.Pattern
-import javax.annotation.Nonnull
-
-import scala.collection.JavaConverters._
-import scala.collection.mutable
-
-import org.apache.commons.lang3.StringUtils
-
-import org.apache.streampark.common.util.{Logger, Utils}
+import org.apache.streampark.common.conf.FlinkVersion
+import org.apache.streampark.common.util.Logger
 import org.apache.streampark.flink.proxy.FlinkShimsProxy
 import org.apache.streampark.flink.client.bean._
 
-object FlinkClient extends Logger {
-
-  private[this] lazy val PROPERTY_PATTERN = Pattern.compile("(.*?)=(.*?)")
-
-  private[this] lazy val MULTI_PROPERTY_REGEXP = "-D(.*?)\\s*=\\s*[\\\"|'](.*)[\\\"|']"
+import scala.language.{implicitConversions, reflectiveCalls}
+import scala.reflect.ClassTag
 
-  private[this] lazy val MULTI_PROPERTY_PATTERN = Pattern.compile(MULTI_PROPERTY_REGEXP)
+object FlinkClient extends Logger {
 
   private[this] val FLINK_CLIENT_HANDLER_CLASS_NAME = "org.apache.streampark.flink.client.FlinkClientHandler"
 
-  private[this] val SUBMIT_REQUEST_CLASS_NAME = "org.apache.streampark.flink.client.bean.SubmitRequest"
+  private[this] val SUBMIT_REQUEST = "org.apache.streampark.flink.client.bean.SubmitRequest" -> "submit"
 
-  private[this] val DEPLOY_REQUEST_CLASS_NAME = "org.apache.streampark.flink.client.bean.DeployRequest"
+  private[this] val DEPLOY_REQUEST = "org.apache.streampark.flink.client.bean.DeployRequest" -> "deploy"
 
-  private[this] val CANCEL_REQUEST_CLASS_NAME = "org.apache.streampark.flink.client.bean.CancelRequest"
+  private[this] val CANCEL_REQUEST = "org.apache.streampark.flink.client.bean.CancelRequest" -> "cancel"
 
-  private[this] val SHUTDOWN_REQUEST_CLASS_NAME = "org.apache.streampark.flink.client.bean.ShutDownRequest"
+  private[this] val SHUTDOWN_REQUEST = "org.apache.streampark.flink.client.bean.ShutDownRequest" -> "shutdown"
 
-  private[this] val SAVEPOINT_REQUEST_CLASS_NAME = "org.apache.streampark.flink.client.bean.TriggerSavepointRequest"
+  private[this] val SAVEPOINT_REQUEST = "org.apache.streampark.flink.client.bean.TriggerSavepointRequest" -> "triggerSavepoint"

Review Comment:
   How about abstracting the mapping as a behavior and moving the mapping info into the concrete Request implementations?
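   
   Something like the following is what I have in mind, as a sketch only. It assumes the caller invokes the handler method reflectively by name, which is what the `-> "submit"` pairs suggest. `ClientRequest`, `methodName`, `Handler` and `ReflectiveInvoker` are hypothetical names, and plain Java reflection stands in for whatever proxy mechanism is actually used:
   
   ```scala
   // Hypothetical types for illustration; the real request beans and handler differ.
   trait ClientRequest {
     // The method on the handler that serves this request.
     def methodName: String
   }
   
   case class SubmitRequest(jobName: String) extends ClientRequest {
     val methodName: String = "submit"
   }
   
   case class CancelRequest(jobId: String) extends ClientRequest {
     val methodName: String = "cancel"
   }
   
   class Handler {
     def submit(request: SubmitRequest): String = s"submitted ${request.jobName}"
     def cancel(request: CancelRequest): String = s"cancelled ${request.jobId}"
   }
   
   object ReflectiveInvoker {
     // Looks the method up by the name the request itself supplies,
     // so no central class-name -> method-name table is needed.
     def invoke(handler: AnyRef, request: ClientRequest): AnyRef = {
       val method = handler.getClass.getMethod(request.methodName, request.getClass)
       method.invoke(handler, request)
     }
   }
   ```
   
   With this shape, a new request type brings its own method name, and the constants in `FlinkClient` would no longer need to grow with every action.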


