You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampark.apache.org by be...@apache.org on 2022/11/07 02:53:12 UTC

[incubator-streampark] branch dev updated: [bug] flinkjob program arguments bug fixed (#1976)

This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev by this push:
     new ecc7d300b [bug] flinkjob program arguments bug fixed (#1976)
ecc7d300b is described below

commit ecc7d300bc1e0000683c9d253fb0cf3741f860db
Author: benjobs <be...@apache.org>
AuthorDate: Mon Nov 7 10:53:06 2022 +0800

    [bug] flinkjob program arguments bug fixed (#1976)
---
 .../flink/submit/trait/FlinkSubmitTrait.scala      |  79 ++++++++--------
 .../flink/submit/test/ParameterTestCase.scala      | 105 ++++++++++-----------
 2 files changed, 90 insertions(+), 94 deletions(-)

diff --git a/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/trait/FlinkSubmitTrait.scala b/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/trait/FlinkSubmitTrait.scala
index 757ac7520..414959668 100644
--- a/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/trait/FlinkSubmitTrait.scala
+++ b/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/trait/FlinkSubmitTrait.scala
@@ -361,55 +361,52 @@ trait FlinkSubmitTrait extends Logger {
     val programArgs = new ArrayBuffer[String]()
 
     if (StringUtils.isNotEmpty(submitRequest.args)) {
-
-      val array = submitRequest.args.split("\\s")
-      val argsArray = new ArrayBuffer[String]()
-      val tempBuffer = new ArrayBuffer[String]()
-
-      def processElement(index: Int, num: Int): Unit = {
-
-        if (index == array.length) {
-          if (tempBuffer.nonEmpty) {
-            argsArray += tempBuffer.mkString(" ")
+      val multiLineChar = "\"\"\""
+      val array = submitRequest.args.split("\\s+")
+      if (array.filter(_.startsWith(multiLineChar)).isEmpty) {
+        array.foreach(programArgs +=)
+      } else {
+        val argsArray = new ArrayBuffer[String]()
+        val tempBuffer = new ArrayBuffer[String]()
+
+        def processElement(index: Int, multiLine: Boolean): Unit = {
+          if (index == array.length) {
+            if (tempBuffer.nonEmpty) {
+              argsArray += tempBuffer.mkString(" ")
+            }
+            return
           }
-          return
-        }
-
-        val next = index + 1
-        val elem = array(index)
-
-        if (elem.trim.nonEmpty) {
-          if (num == 0) {
-            if (elem.startsWith("'")) {
-              tempBuffer += elem
-              processElement(next, 1)
-            } else if (elem.startsWith("\"")) {
-              tempBuffer += elem
-              processElement(next, 2)
+          val next = index + 1
+          val elem = array(index)
+
+          if (elem.trim.nonEmpty) {
+            if (!multiLine) {
+              if (elem.startsWith(multiLineChar)) {
+                tempBuffer += elem.drop(3)
+                processElement(next, true)
+              } else {
+                argsArray += elem
+                processElement(next, false)
+              }
             } else {
-              argsArray += elem
-              processElement(next, 0)
+              if (elem.endsWith(multiLineChar)) {
+                tempBuffer += elem.dropRight(3)
+                argsArray += tempBuffer.mkString(" ")
+                tempBuffer.clear()
+                processElement(next, false)
+              } else {
+                tempBuffer += elem
+                processElement(next, multiLine)
+              }
             }
           } else {
             tempBuffer += elem
-            val end1 = elem.endsWith("'") && num == 1
-            val end2 = elem.endsWith("\"") && num == 2
-            if (end1 || end2) {
-              argsArray += tempBuffer.mkString(" ")
-              tempBuffer.clear()
-              processElement(next, 0)
-            } else {
-              processElement(next, num)
-            }
+            processElement(next, false)
           }
-        } else {
-          tempBuffer += elem
-          processElement(next, 0)
         }
+        processElement(0, false)
+        argsArray.foreach(x => programArgs += x.trim)
       }
-
-      processElement(0, 0)
-      argsArray.foreach(x => programArgs += x.trim.replaceAll("^[\"|']|[\"|']$", ""))
     }
 
     if (submitRequest.applicationType == ApplicationType.STREAMPARK_FLINK) {
diff --git a/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/test/scala/org/apache/streampark/flink/submit/test/ParameterTestCase.scala b/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/test/scala/org/apache/streampark/flink/submit/test/ParameterTestCase.scala
index 4b97ef9b9..f87fc9242 100644
--- a/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/test/scala/org/apache/streampark/flink/submit/test/ParameterTestCase.scala
+++ b/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/test/scala/org/apache/streampark/flink/submit/test/ParameterTestCase.scala
@@ -16,6 +16,7 @@
  */
 package org.apache.streampark.flink.submit.test
 
+import org.apache.commons.lang3.StringUtils
 import org.apache.flink.api.java.utils.ParameterTool
 import org.junit.jupiter.api.{Assertions, Test}
 
@@ -44,68 +45,66 @@ class ParameterTestCase {
   }
 
   @Test def testExtractProgramArgs(): Unit = {
+    val argsStr = "--host localhost:8123\n\n\n" +
+      "--sql \"\"\"insert into table_a select * from table_b\"\"\"\n" +
+      "--c d\r\n" +
+      "--x yyy"
+    val programArgs = new ArrayBuffer[String]()
+    if (StringUtils.isNotEmpty(argsStr)) {
+      val multiLineChar = "\"\"\""
+      val array = argsStr.split("\\s+")
+      if (array.filter(_.startsWith(multiLineChar)).isEmpty) {
+        array.foreach(programArgs +=)
+      } else {
+        val argsArray = new ArrayBuffer[String]()
+        val tempBuffer = new ArrayBuffer[String]()
 
-    val argsStr = "--url localhost:8123 \n" +
-      "--insertSql1 'insert \'\'into default.test values (?,?,?,?,?)' \n" +
-      "--insertSql2 'insert into default.test values (1,2,3,4,\"111\")'\n " +
-      "--insertSql2 \"insert into default.test values (1,2,3,4,\'111\')\" \n" +
-      "--insertSql2 'insert into default.test values (1,2,3,4,\"111\", \'22\', \'\')'"
-
-    val array = argsStr.split("\\s")
-    val argsArray = new ArrayBuffer[String]()
-    val tempBuffer = new ArrayBuffer[String]()
-
-    def processElement(index: Int, num: Int): Unit = {
-
-      if (index == array.length) {
-        if (tempBuffer.nonEmpty) {
-          argsArray += tempBuffer.mkString(" ")
-        }
-        return
-      }
-
-      val next = index + 1
-      val elem = array(index)
+        def processElement(index: Int, multiLine: Boolean): Unit = {
 
-      if (elem.trim.nonEmpty) {
-        if (num == 0) {
-          if (elem.startsWith("'")) {
-            tempBuffer += elem
-            processElement(next, 1)
-          } else if (elem.startsWith("\"")) {
-            tempBuffer += elem
-            processElement(next, 2)
-          } else {
-            argsArray += elem
-            processElement(next, 0)
+          if (index == array.length) {
+            if (tempBuffer.nonEmpty) {
+              argsArray += tempBuffer.mkString(" ")
+            }
+            return
           }
-        } else {
-          tempBuffer += elem
-          val end1 = elem.endsWith("'") && num == 1
-          val end2 = elem.endsWith("\"") && num == 2
-          if (end1 || end2) {
-            argsArray += tempBuffer.mkString(" ")
-            tempBuffer.clear()
-            processElement(next, 0)
+
+          val next = index + 1
+          val elem = array(index)
+
+          if (elem.trim.nonEmpty) {
+            if (!multiLine) {
+              if (elem.startsWith(multiLineChar)) {
+                tempBuffer += elem.drop(3)
+                processElement(next, true)
+              } else {
+                argsArray += elem
+                processElement(next, false)
+              }
+            } else {
+              if (elem.endsWith(multiLineChar)) {
+                tempBuffer += elem.dropRight(3)
+                argsArray += tempBuffer.mkString(" ")
+                tempBuffer.clear()
+                processElement(next, false)
+              } else {
+                tempBuffer += elem
+                processElement(next, multiLine)
+              }
+            }
           } else {
-            processElement(next, num)
+            tempBuffer += elem
+            processElement(next, false)
           }
         }
-      } else {
-        tempBuffer += elem
-        processElement(next, 0)
+
+        processElement(0, false)
+        argsArray.foreach(x => programArgs += x.trim)
       }
     }
 
-    processElement(0, 0)
-
-    val programArgs = argsArray.map(_.trim.replaceAll("^[\"|']|[\"|']$", "")).toList
-
     Assertions.assertEquals("localhost:8123", programArgs(1))
-    Assertions.assertEquals("insert \'\'into default.test values (?,?,?,?,?)", programArgs(3))
-    Assertions.assertEquals("insert into default.test values (1,2,3,4,\"111\")", programArgs(5))
-    Assertions.assertEquals("insert into default.test values (1,2,3,4,\'111\')", programArgs(7))
-    Assertions.assertEquals("insert into default.test values (1,2,3,4,\"111\", \'22\', \'\')", programArgs(9))
-
+    Assertions.assertEquals("insert into table_a select * from table_b", programArgs(3))
+    Assertions.assertEquals("d", programArgs(5))
+    Assertions.assertEquals("yyy", programArgs(7))
   }
 }