You are viewing a plain text version of this content. (The canonical link that accompanied this notice is not preserved in this plain-text copy.)
Posted to commits@streampark.apache.org by be...@apache.org on 2022/11/07 02:53:12 UTC
[incubator-streampark] branch dev updated: [bug] flinkjob program arguments bug fixed (#1976)
This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev by this push:
new ecc7d300b [bug] flinkjob program arguments bug fixed (#1976)
ecc7d300b is described below
commit ecc7d300bc1e0000683c9d253fb0cf3741f860db
Author: benjobs <be...@apache.org>
AuthorDate: Mon Nov 7 10:53:06 2022 +0800
[bug] flinkjob program arguments bug fixed (#1976)
---
.../flink/submit/trait/FlinkSubmitTrait.scala | 79 ++++++++--------
.../flink/submit/test/ParameterTestCase.scala | 105 ++++++++++-----------
2 files changed, 90 insertions(+), 94 deletions(-)
diff --git a/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/trait/FlinkSubmitTrait.scala b/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/trait/FlinkSubmitTrait.scala
index 757ac7520..414959668 100644
--- a/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/trait/FlinkSubmitTrait.scala
+++ b/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/main/scala/org/apache/streampark/flink/submit/trait/FlinkSubmitTrait.scala
@@ -361,55 +361,52 @@ trait FlinkSubmitTrait extends Logger {
val programArgs = new ArrayBuffer[String]()
if (StringUtils.isNotEmpty(submitRequest.args)) {
-
- val array = submitRequest.args.split("\\s")
- val argsArray = new ArrayBuffer[String]()
- val tempBuffer = new ArrayBuffer[String]()
-
- def processElement(index: Int, num: Int): Unit = {
-
- if (index == array.length) {
- if (tempBuffer.nonEmpty) {
- argsArray += tempBuffer.mkString(" ")
+ val multiLineChar = "\"\"\""
+ val array = submitRequest.args.split("\\s+")
+ if (array.filter(_.startsWith(multiLineChar)).isEmpty) {
+ array.foreach(programArgs +=)
+ } else {
+ val argsArray = new ArrayBuffer[String]()
+ val tempBuffer = new ArrayBuffer[String]()
+
+ def processElement(index: Int, multiLine: Boolean): Unit = {
+ if (index == array.length) {
+ if (tempBuffer.nonEmpty) {
+ argsArray += tempBuffer.mkString(" ")
+ }
+ return
}
- return
- }
-
- val next = index + 1
- val elem = array(index)
-
- if (elem.trim.nonEmpty) {
- if (num == 0) {
- if (elem.startsWith("'")) {
- tempBuffer += elem
- processElement(next, 1)
- } else if (elem.startsWith("\"")) {
- tempBuffer += elem
- processElement(next, 2)
+ val next = index + 1
+ val elem = array(index)
+
+ if (elem.trim.nonEmpty) {
+ if (!multiLine) {
+ if (elem.startsWith(multiLineChar)) {
+ tempBuffer += elem.drop(3)
+ processElement(next, true)
+ } else {
+ argsArray += elem
+ processElement(next, false)
+ }
} else {
- argsArray += elem
- processElement(next, 0)
+ if (elem.endsWith(multiLineChar)) {
+ tempBuffer += elem.dropRight(3)
+ argsArray += tempBuffer.mkString(" ")
+ tempBuffer.clear()
+ processElement(next, false)
+ } else {
+ tempBuffer += elem
+ processElement(next, multiLine)
+ }
}
} else {
tempBuffer += elem
- val end1 = elem.endsWith("'") && num == 1
- val end2 = elem.endsWith("\"") && num == 2
- if (end1 || end2) {
- argsArray += tempBuffer.mkString(" ")
- tempBuffer.clear()
- processElement(next, 0)
- } else {
- processElement(next, num)
- }
+ processElement(next, false)
}
- } else {
- tempBuffer += elem
- processElement(next, 0)
}
+ processElement(0, false)
+ argsArray.foreach(x => programArgs += x.trim)
}
-
- processElement(0, 0)
- argsArray.foreach(x => programArgs += x.trim.replaceAll("^[\"|']|[\"|']$", ""))
}
if (submitRequest.applicationType == ApplicationType.STREAMPARK_FLINK) {
diff --git a/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/test/scala/org/apache/streampark/flink/submit/test/ParameterTestCase.scala b/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/test/scala/org/apache/streampark/flink/submit/test/ParameterTestCase.scala
index 4b97ef9b9..f87fc9242 100644
--- a/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/test/scala/org/apache/streampark/flink/submit/test/ParameterTestCase.scala
+++ b/streampark-flink/streampark-flink-submit/streampark-flink-submit-core/src/test/scala/org/apache/streampark/flink/submit/test/ParameterTestCase.scala
@@ -16,6 +16,7 @@
*/
package org.apache.streampark.flink.submit.test
+import org.apache.commons.lang3.StringUtils
import org.apache.flink.api.java.utils.ParameterTool
import org.junit.jupiter.api.{Assertions, Test}
@@ -44,68 +45,66 @@ class ParameterTestCase {
}
@Test def testExtractProgramArgs(): Unit = {
+ val argsStr = "--host localhost:8123\n\n\n" +
+ "--sql \"\"\"insert into table_a select * from table_b\"\"\"\n" +
+ "--c d\r\n" +
+ "--x yyy"
+ val programArgs = new ArrayBuffer[String]()
+ if (StringUtils.isNotEmpty(argsStr)) {
+ val multiLineChar = "\"\"\""
+ val array = argsStr.split("\\s+")
+ if (array.filter(_.startsWith(multiLineChar)).isEmpty) {
+ array.foreach(programArgs +=)
+ } else {
+ val argsArray = new ArrayBuffer[String]()
+ val tempBuffer = new ArrayBuffer[String]()
- val argsStr = "--url localhost:8123 \n" +
- "--insertSql1 'insert \'\'into default.test values (?,?,?,?,?)' \n" +
- "--insertSql2 'insert into default.test values (1,2,3,4,\"111\")'\n " +
- "--insertSql2 \"insert into default.test values (1,2,3,4,\'111\')\" \n" +
- "--insertSql2 'insert into default.test values (1,2,3,4,\"111\", \'22\', \'\')'"
-
- val array = argsStr.split("\\s")
- val argsArray = new ArrayBuffer[String]()
- val tempBuffer = new ArrayBuffer[String]()
-
- def processElement(index: Int, num: Int): Unit = {
-
- if (index == array.length) {
- if (tempBuffer.nonEmpty) {
- argsArray += tempBuffer.mkString(" ")
- }
- return
- }
-
- val next = index + 1
- val elem = array(index)
+ def processElement(index: Int, multiLine: Boolean): Unit = {
- if (elem.trim.nonEmpty) {
- if (num == 0) {
- if (elem.startsWith("'")) {
- tempBuffer += elem
- processElement(next, 1)
- } else if (elem.startsWith("\"")) {
- tempBuffer += elem
- processElement(next, 2)
- } else {
- argsArray += elem
- processElement(next, 0)
+ if (index == array.length) {
+ if (tempBuffer.nonEmpty) {
+ argsArray += tempBuffer.mkString(" ")
+ }
+ return
}
- } else {
- tempBuffer += elem
- val end1 = elem.endsWith("'") && num == 1
- val end2 = elem.endsWith("\"") && num == 2
- if (end1 || end2) {
- argsArray += tempBuffer.mkString(" ")
- tempBuffer.clear()
- processElement(next, 0)
+
+ val next = index + 1
+ val elem = array(index)
+
+ if (elem.trim.nonEmpty) {
+ if (!multiLine) {
+ if (elem.startsWith(multiLineChar)) {
+ tempBuffer += elem.drop(3)
+ processElement(next, true)
+ } else {
+ argsArray += elem
+ processElement(next, false)
+ }
+ } else {
+ if (elem.endsWith(multiLineChar)) {
+ tempBuffer += elem.dropRight(3)
+ argsArray += tempBuffer.mkString(" ")
+ tempBuffer.clear()
+ processElement(next, false)
+ } else {
+ tempBuffer += elem
+ processElement(next, multiLine)
+ }
+ }
} else {
- processElement(next, num)
+ tempBuffer += elem
+ processElement(next, false)
}
}
- } else {
- tempBuffer += elem
- processElement(next, 0)
+
+ processElement(0, false)
+ argsArray.foreach(x => programArgs += x.trim)
}
}
- processElement(0, 0)
-
- val programArgs = argsArray.map(_.trim.replaceAll("^[\"|']|[\"|']$", "")).toList
-
Assertions.assertEquals("localhost:8123", programArgs(1))
- Assertions.assertEquals("insert \'\'into default.test values (?,?,?,?,?)", programArgs(3))
- Assertions.assertEquals("insert into default.test values (1,2,3,4,\"111\")", programArgs(5))
- Assertions.assertEquals("insert into default.test values (1,2,3,4,\'111\')", programArgs(7))
- Assertions.assertEquals("insert into default.test values (1,2,3,4,\"111\", \'22\', \'\')", programArgs(9))
-
+ Assertions.assertEquals("insert into table_a select * from table_b", programArgs(3))
+ Assertions.assertEquals("d", programArgs(5))
+ Assertions.assertEquals("yyy", programArgs(7))
}
}