Posted to issues@streampark.apache.org by GitBox <gi...@apache.org> on 2022/10/12 09:24:07 UTC

[GitHub] [incubator-streampark] wolfboys commented on a diff in pull request #1820: [bug] After optimizing the core parameter. job submit failed bug fix

wolfboys commented on code in PR #1820:
URL: https://github.com/apache/incubator-streampark/pull/1820#discussion_r993219187


##########
streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala:
##########
@@ -127,50 +127,62 @@ private[flink] class FlinkTableInitializer(args: Array[String], apiType: ApiType
     localTableEnv
   }
 
-
   /**
    * In case of table SQL, the parameter conf is not required, it depends on the developer.
    */
+
   override def initParameter(): (ParameterTool, Configuration) = {
-    val (userParameter: ParameterTool, flinkConf: Configuration) = super.initParameter()
-    (userParameter.get(KEY_FLINK_SQL()) match {
-      case null => userParameter
+    val (appParameter: ParameterTool, flinkConf: Configuration) = {
+      val argsMap = ParameterTool.fromArgs(args)
+      argsMap.get(KEY_APP_CONF(), null) match {
+        case null | "" =>
+          logWarn("Usage:can't fond config,you can set \"--conf $path \" in main arguments")
+          ParameterTool.fromSystemProperties().mergeWith(argsMap) -> new Configuration()
+        case file => super.parseConfig(file)
+      }
+    }
+
+    val appParam = appParameter.get(KEY_FLINK_SQL()) match {
+      case null => appParameter
       case param =>
         // for streampark-console
         Try(DeflaterUtils.unzipString(param)) match {
-          case Success(value) => userParameter.mergeWith(ParameterTool.fromMap(Map(KEY_FLINK_SQL() -> value)))
+          case Success(value) => appParameter.mergeWith(ParameterTool.fromMap(Map(KEY_FLINK_SQL() -> value)))
           case Failure(_) =>
             val sqlFile = new File(param)
             Try(PropertiesUtils.fromYamlFile(sqlFile.getAbsolutePath)) match {
-              case Success(value) => userParameter.mergeWith(ParameterTool.fromMap(value))
+              case Success(value) => appParameter.mergeWith(ParameterTool.fromMap(value))
               case Failure(e) =>
                 new IllegalArgumentException(s"[StreamPark] init sql error.$e")
-                userParameter
+                appParameter
             }
         }
-    }, flinkConf)
+    }
+
+    appParam -> flinkConf
   }
 
   def initEnvironment(tableMode: TableMode): Unit = {
     val builder = EnvironmentSettings.newInstance()
-    val plannerType = Try(PlannerType.withName(userParameter.get(KEY_FLINK_TABLE_PLANNER))).getOrElse {
-      logWarn(s" $KEY_FLINK_TABLE_PLANNER undefined,use default by: blinkPlanner")
-      PlannerType.blink
-    }
-
-    plannerType match {
-      case PlannerType.blink =>
-        logInfo("blinkPlanner will be use.")
-        builder.useBlinkPlanner()
-      case PlannerType.old =>
-        logInfo("oldPlanner will be use.")
-        builder.useOldPlanner()
-      case PlannerType.any =>
-        logInfo("anyPlanner will be use.")
-        builder.useAnyPlanner()
+    val plannerType = Try(PlannerType.withName(parameter.get(KEY_FLINK_TABLE_PLANNER))).getOrElse(PlannerType.blink)
+
+    try {
+      plannerType match {
+        case PlannerType.blink =>
+          logInfo("blinkPlanner will be use.")
+          builder.useBlinkPlanner()
+        case PlannerType.old =>
+          logInfo("oldPlanner will be use.")
+          builder.useOldPlanner()
+        case PlannerType.any =>
+          logInfo("anyPlanner will be use.")
+          builder.useAnyPlanner()
+      }
+    } catch {
+      case e: IncompatibleClassChangeError =>

Review Comment:
   The Flink table environment API is public and stable, so in the short term only this part will be affected. In the long term, as you said, we need a set of standard specifications to solve these problems. How about you take on this part of the work? Please discuss it fully on the mailing list first; it can be done in several steps.
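
   To make the short-term option concrete, here is a minimal sketch of isolating a version-sensitive call behind one guard, written without any Flink dependency. `SettingsBuilder`, `OldRuntimeBuilder`, `NewRuntimeBuilder` and `PlannerCompat` are hypothetical names used only for illustration; they are not StreamPark or Flink classes. The diff above does not show what the `catch` handler does, so the fallback here (skip the call and report it) is an assumption.

```scala
// Hypothetical stand-in for a version-specific settings builder.
// This is NOT Flink's EnvironmentSettings.Builder.
trait SettingsBuilder {
  def useBlinkPlanner(): SettingsBuilder
}

// Simulates an older runtime where the planner-selection method still exists.
object OldRuntimeBuilder extends SettingsBuilder {
  def useBlinkPlanner(): SettingsBuilder = this
}

// Simulates a newer runtime where the method is gone. Against real binaries
// the JVM would raise NoSuchMethodError at the call site; here it is thrown
// by hand so the sketch runs on its own.
object NewRuntimeBuilder extends SettingsBuilder {
  def useBlinkPlanner(): SettingsBuilder =
    throw new NoSuchMethodError("useBlinkPlanner")
}

object PlannerCompat {
  /** Returns true if the planner was selected explicitly, false if the call was skipped. */
  def selectPlanner(builder: SettingsBuilder): Boolean =
    try {
      builder.useBlinkPlanner()
      true
    } catch {
      // NoSuchMethodError is a subclass of IncompatibleClassChangeError,
      // so a single case covers removed methods and changed signatures.
      case _: IncompatibleClassChangeError => false
    }
}
```

   With this shape, `PlannerCompat.selectPlanner(OldRuntimeBuilder)` returns `true` and `PlannerCompat.selectPlanner(NewRuntimeBuilder)` returns `false`. Keeping the guard in one place is only one possible stepping stone toward the longer-term specification; the actual design should come out of the mailing-list discussion.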


