Posted to commits@seatunnel.apache.org by we...@apache.org on 2022/04/29 06:48:27 UTC

[incubator-seatunnel] branch dev updated: [Feature][seatunnel-transforms] Add Uuid transform for spark (#1770)

This is an automated email from the ASF dual-hosted git repository.

wenjun pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 64f513e8 [Feature][seatunnel-transforms] Add Uuid transform for spark (#1770)
64f513e8 is described below

commit 64f513e82ba7048180d6a424a33c9720e7ba4678
Author: wuzhenhua <10...@users.noreply.github.com>
AuthorDate: Fri Apr 29 14:48:22 2022 +0800

    [Feature][seatunnel-transforms] Add Uuid transform for spark (#1770)
    
    * [Feature][seatunnel-transforms] Add Uuid transform for spark and fix some error at replace doc
    * Refactored transform config parameter constants and default values
    Co-authored-by: xiaowu <cx...@gmail.com>
---
 docs/en/transform/replace.md                       | 10 +--
 docs/en/transform/uuid.md                          | 62 +++++++++++++++++++
 seatunnel-core/seatunnel-core-spark/pom.xml        |  6 ++
 .../seatunnel-transforms-spark/pom.xml             |  1 +
 .../apache/seatunnel/spark/transform/Json.scala    | 22 ++++---
 .../seatunnel/spark/transform/JsonConfig.scala}    | 26 +++-----
 .../apache/seatunnel/spark/transform/Replace.scala | 26 ++++----
 .../seatunnel/spark/transform/ReplaceConfig.scala} | 29 ++++-----
 .../seatunnel/spark/transform/TestReplace.scala    |  1 +
 .../apache/seatunnel/spark/transform/Split.scala   | 24 ++++----
 .../seatunnel/spark/transform/SplitConfig.scala}   | 26 +++-----
 .../org/apache/seatunnel/spark/transform/Sql.scala |  1 +
 .../{ => seatunnel-transform-spark-uuid}/pom.xml   | 33 +++++++---
 .../org.apache.seatunnel.spark.BaseSparkTransform  | 18 ++++++
 .../apache/seatunnel/spark/transform/UUID.scala}   | 71 +++++++++++-----------
 .../seatunnel/spark/transform/UUIDConfig.scala}    | 26 +++-----
 .../seatunnel/spark/transform/TestUUID.scala}      | 31 ++++++----
 17 files changed, 256 insertions(+), 157 deletions(-)

diff --git a/docs/en/transform/replace.md b/docs/en/transform/replace.md
index 8286007c..ecfce6dd 100644
--- a/docs/en/transform/replace.md
+++ b/docs/en/transform/replace.md
@@ -1,4 +1,4 @@
-# Json
+# Replace
 
 ## Description
 
@@ -33,13 +33,13 @@ The name of the field to replaced.
 
 The string to match.
 
-### is_regex [string]
+### replacement [string]
 
-Whether or not to interpret the pattern as a regex (true) or string literal (false).
+The replacement pattern (is_regex is true) or string literal (is_regex is false).
 
-### replacement [boolean]
+### is_regex [boolean]
 
-The replacement pattern (is_regex is true) or string literal (is_regex is false).
+Whether or not to interpret the pattern as a regex (true) or string literal (false).
 
 ### replace_first [boolean]
 
diff --git a/docs/en/transform/uuid.md b/docs/en/transform/uuid.md
new file mode 100644
index 00000000..4633a843
--- /dev/null
+++ b/docs/en/transform/uuid.md
@@ -0,0 +1,62 @@
+# UUID
+
+## Description
+
+Generate a universally unique identifier on a specified field.
+
+:::tip
+
+This transform is **ONLY** supported by Spark.
+
+:::
+
+## Options
+
+| name           | type   | required | default value |
+| -------------- | ------ | -------- | ------------- |
+| fields         | string | yes      | -             |
+| prefix         | string | no       | -             |
+| secure         | boolean | no      | false         |
+
+### fields [string]
+
+The name of the field where the generated UUID is stored.
+
+### prefix [string]
+
+The prefix string constant to prepend to each generated UUID.
+
+### secure [boolean]
+
+Whether to use a secure random seed. The cryptographically secure algorithm can be comparatively slow;
+the non-secure algorithm uses a secure random seed but is otherwise deterministic, and is much faster.
+
+### common options [string]
+
+Transform plugin common parameters; please refer to [Transform Plugin](common-options.mdx) for details.
+
+## Examples
+
+```bash
+  UUID {
+    fields = "u"
+    prefix = "uuid-"
+    secure = true
+  }
+```
+
+Use `UUID` as a UDF in SQL.
+
+```bash
+  UUID {
+    fields = "u"
+    prefix = "uuid-"
+    secure = true
+  }
+
+  # Use the UUID function (make sure the fake table exists)
+  sql {
+    sql = "select * from (select raw_message, UUID() as info_row from fake) t1"
+  }
+```
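
For reference, a minimal sketch (not part of the patch) of what the `secure` option selects, mirroring the UUID.scala change further down:

```scala
import java.security.SecureRandom

import org.apache.commons.math3.random.Well19937c

// secure = false (default): prng stays null and generate() falls back to
// java.util.UUID.randomUUID.
val defaultId = java.util.UUID.randomUUID.toString

// secure = true: prepare() seeds a Well19937c generator once from SecureRandom
// and generate() builds each UUID from two of its longs.
val rand = new SecureRandom
val seed = Array.fill(728)(rand.nextInt)
val prng = new Well19937c(seed)
val seededId = new java.util.UUID(prng.nextLong, prng.nextLong).toString
```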
diff --git a/seatunnel-core/seatunnel-core-spark/pom.xml b/seatunnel-core/seatunnel-core-spark/pom.xml
index 53ae179d..86410e07 100644
--- a/seatunnel-core/seatunnel-core-spark/pom.xml
+++ b/seatunnel-core/seatunnel-core-spark/pom.xml
@@ -74,6 +74,12 @@
             <artifactId>seatunnel-transform-spark-replace</artifactId>
             <version>${project.version}</version>
         </dependency>
+
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-transform-spark-uuid</artifactId>
+            <version>${project.version}</version>
+        </dependency>
     </dependencies>
 
     <build>
diff --git a/seatunnel-transforms/seatunnel-transforms-spark/pom.xml b/seatunnel-transforms/seatunnel-transforms-spark/pom.xml
index 0626dfc0..e95a4ebc 100644
--- a/seatunnel-transforms/seatunnel-transforms-spark/pom.xml
+++ b/seatunnel-transforms/seatunnel-transforms-spark/pom.xml
@@ -34,6 +34,7 @@
         <module>seatunnel-transform-spark-json</module>
         <module>seatunnel-transform-spark-split</module>
         <module>seatunnel-transform-spark-replace</module>
+        <module>seatunnel-transform-spark-uuid</module>
         <module>seatunnel-transform-spark-sql</module>
     </modules>
 
diff --git a/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-json/src/main/scala/org/apache/seatunnel/spark/transform/Json.scala b/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-json/src/main/scala/org/apache/seatunnel/spark/transform/Json.scala
index 519be92b..63e50cee 100644
--- a/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-json/src/main/scala/org/apache/seatunnel/spark/transform/Json.scala
+++ b/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-json/src/main/scala/org/apache/seatunnel/spark/transform/Json.scala
@@ -14,11 +14,13 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.seatunnel.spark.transform
 
 import org.apache.seatunnel.common.config.{Common, ConfigRuntimeException}
 import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory
 import org.apache.seatunnel.spark.{BaseSparkTransform, SparkEnvironment}
+import org.apache.seatunnel.spark.transform.JsonConfig._
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.types.{DataType, StructType}
 import org.apache.spark.sql.{Dataset, Row, SparkSession}
@@ -40,19 +42,19 @@ class Json extends BaseSparkTransform {
   var useCustomSchema: Boolean = false
 
   override def process(df: Dataset[Row], env: SparkEnvironment): Dataset[Row] = {
-    val srcField = config.getString("source_field")
+    val srcField = config.getString(SOURCE_FILED)
     val spark = env.getSparkSession
 
     import spark.implicits._
 
-    config.getString("target_field") match {
+    config.getString(TARGET_FILED) match {
       case Constants.ROW_ROOT => {
 
         val jsonRDD = df.select(srcField).as[String].rdd
 
         val newDF = srcField match {
           // for backward-compatibility for spark < 2.2.0, we created rdd, not Dataset[String]
-          case "raw_message" => {
+          case DEFAULT_SOURCE_FILED => {
             val tmpDF =
               if (this.useCustomSchema) {
                 spark.read.schema(this.customSchema).json(jsonRDD)
@@ -94,14 +96,14 @@ class Json extends BaseSparkTransform {
   override def prepare(env: SparkEnvironment): Unit = {
     val defaultConfig = ConfigFactory.parseMap(
       Map(
-        "source_field" -> "raw_message",
-        "target_field" -> Constants.ROW_ROOT,
-        "schema_dir" -> Paths.get(Common.pluginFilesDir("json").toString, "schemas").toString,
-        "schema_file" -> ""))
+        SOURCE_FILED -> DEFAULT_SOURCE_FILED,
+        TARGET_FILED -> Constants.ROW_ROOT,
+        SCHEMA_DIR -> Paths.get(Common.pluginFilesDir("json").toString, "schemas").toString,
+        SCHEMA_FILE -> DEFAULT_SCHEMA_FILE))
     config = config.withFallback(defaultConfig)
-    val schemaFile = config.getString("schema_file")
+    val schemaFile = config.getString(SCHEMA_FILE)
     if (schemaFile.trim != "") {
-      parseCustomJsonSchema(env.getSparkSession, config.getString("schema_dir"), schemaFile)
+      parseCustomJsonSchema(env.getSparkSession, config.getString(SCHEMA_DIR), schemaFile)
     }
   }
 
@@ -136,5 +138,5 @@ class Json extends BaseSparkTransform {
     }
   }
 
-  override def getPluginName: String = "json"
+  override def getPluginName: String = PLUGIN_NAME
 }
diff --git a/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-sql/src/main/scala/org/apache/seatunnel/spark/transform/Sql.scala b/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-json/src/main/scala/org/apache/seatunnel/spark/transform/JsonConfig.scala
similarity index 60%
copy from seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-sql/src/main/scala/org/apache/seatunnel/spark/transform/Sql.scala
copy to seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-json/src/main/scala/org/apache/seatunnel/spark/transform/JsonConfig.scala
index 2881b1a2..560fce0c 100644
--- a/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-sql/src/main/scala/org/apache/seatunnel/spark/transform/Sql.scala
+++ b/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-json/src/main/scala/org/apache/seatunnel/spark/transform/JsonConfig.scala
@@ -14,23 +14,17 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.seatunnel.spark.transform
-
-import org.apache.seatunnel.common.config.CheckConfigUtil.checkAllExists
-import org.apache.seatunnel.common.config.CheckResult
-import org.apache.seatunnel.spark.{BaseSparkTransform, SparkEnvironment}
-import org.apache.spark.sql.{Dataset, Row}
-
-class Sql extends BaseSparkTransform {
 
-  override def process(data: Dataset[Row], env: SparkEnvironment): Dataset[Row] = {
-    env.getSparkSession.sql(config.getString("sql"))
-  }
-
-  override def checkConfig(): CheckResult = {
-    checkAllExists(config, "sql")
-  }
+package org.apache.seatunnel.spark.transform
 
-  override def getPluginName: String = "sql"
+object JsonConfig {
+  val PLUGIN_NAME = "json"
 
+  val FIELDS = "fields"
+  val SOURCE_FILED = "source_field"
+  val DEFAULT_SOURCE_FILED = "raw_message"
+  val TARGET_FILED = "target_field"
+  val SCHEMA_DIR = "schema_dir"
+  val SCHEMA_FILE = "schema_file"
+  val DEFAULT_SCHEMA_FILE = ""
 }
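
The *Config companions introduced in this commit all follow the same pattern: option keys and their defaults live in one object, and prepare() merges user config over the defaults. A minimal sketch (ExampleConfig is hypothetical; the real objects are JsonConfig, ReplaceConfig, SplitConfig and UUIDConfig):

```scala
import scala.collection.JavaConversions._

import org.apache.seatunnel.shade.com.typesafe.config.{Config, ConfigFactory}

object ExampleConfig {
  val SOURCE_FILED = "source_field"        // key as written in user config
  val DEFAULT_SOURCE_FILED = "raw_message" // value used when the key is absent
}

def withDefaults(config: Config): Config = {
  val defaults = ConfigFactory.parseMap(
    Map(ExampleConfig.SOURCE_FILED -> ExampleConfig.DEFAULT_SOURCE_FILED))
  // withFallback keeps user-supplied values and fills in missing keys
  config.withFallback(defaults)
}
```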
diff --git a/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-replace/src/main/scala/org/apache/seatunnel/spark/transform/Replace.scala b/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-replace/src/main/scala/org/apache/seatunnel/spark/transform/Replace.scala
index 609dec15..27e6cf9c 100644
--- a/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-replace/src/main/scala/org/apache/seatunnel/spark/transform/Replace.scala
+++ b/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-replace/src/main/scala/org/apache/seatunnel/spark/transform/Replace.scala
@@ -14,6 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.seatunnel.spark.transform
 
 import scala.collection.JavaConversions._
@@ -26,42 +27,43 @@ import org.apache.seatunnel.common.config.CheckConfigUtil.checkAllExists
 import org.apache.seatunnel.common.config.CheckResult
 import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory
 import org.apache.seatunnel.spark.{BaseSparkTransform, SparkEnvironment}
+import org.apache.seatunnel.spark.transform.ReplaceConfig._
 import org.apache.spark.sql.{Dataset, Row}
 import org.apache.spark.sql.expressions.UserDefinedFunction
 import org.apache.spark.sql.functions.{col, udf}
 
 class Replace extends BaseSparkTransform {
   override def process(df: Dataset[Row], env: SparkEnvironment): Dataset[Row] = {
-    val srcField = config.getString("source_field")
-    val key = config.getString("fields")
+    val srcField = config.getString(SOURCE_FILED)
+    val key = config.getString(FIELDS)
 
     val func: UserDefinedFunction = udf((s: String) => {
       replace(
         s,
-        config.getString("pattern"),
-        config.getString("replacement"),
-        config.getBoolean("is_regex"),
-        config.getBoolean("replace_first"))
+        config.getString(PATTERN),
+        config.getString(REPLACEMENT),
+        config.getBoolean(REPLACE_REGEX),
+        config.getBoolean(REPLACE_FIRST))
     })
     var filterDf = df.withColumn(Constants.ROW_TMP, func(col(srcField)))
     filterDf = filterDf.withColumn(key, col(Constants.ROW_TMP))
     val ds = filterDf.drop(Constants.ROW_TMP)
     if (func != null) {
-      env.getSparkSession.udf.register("Replace", func)
+      env.getSparkSession.udf.register(UDF_NAME, func)
     }
     ds
   }
 
   override def checkConfig(): CheckResult = {
-    checkAllExists(config, "fields", "pattern", "replacement")
+    checkAllExists(config, FIELDS, PATTERN, REPLACEMENT)
   }
 
   override def prepare(env: SparkEnvironment): Unit = {
     val defaultConfig = ConfigFactory.parseMap(
       Map(
-        "source_field" -> "raw_message",
-        "is_regex" -> false,
-        "replace_first" -> false))
+        SOURCE_FILED -> DEFAULT_SOURCE_FILED,
+        REPLACE_REGEX -> DEFAULT_REPLACE_REGEX,
+        REPLACE_FIRST -> DEFAULT_REPLACE_FIRST))
     config = config.withFallback(defaultConfig)
   }
 
@@ -83,4 +85,6 @@ class Replace extends BaseSparkTransform {
   }
 
   implicit def toReg(pattern: String): Regex = pattern.r
+
+  override def getPluginName: String = PLUGIN_NAME
 }
diff --git a/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-sql/src/main/scala/org/apache/seatunnel/spark/transform/Sql.scala b/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-replace/src/main/scala/org/apache/seatunnel/spark/transform/ReplaceConfig.scala
similarity index 60%
copy from seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-sql/src/main/scala/org/apache/seatunnel/spark/transform/Sql.scala
copy to seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-replace/src/main/scala/org/apache/seatunnel/spark/transform/ReplaceConfig.scala
index 2881b1a2..51e15153 100644
--- a/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-sql/src/main/scala/org/apache/seatunnel/spark/transform/Sql.scala
+++ b/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-replace/src/main/scala/org/apache/seatunnel/spark/transform/ReplaceConfig.scala
@@ -14,23 +14,20 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.seatunnel.spark.transform
-
-import org.apache.seatunnel.common.config.CheckConfigUtil.checkAllExists
-import org.apache.seatunnel.common.config.CheckResult
-import org.apache.seatunnel.spark.{BaseSparkTransform, SparkEnvironment}
-import org.apache.spark.sql.{Dataset, Row}
-
-class Sql extends BaseSparkTransform {
 
-  override def process(data: Dataset[Row], env: SparkEnvironment): Dataset[Row] = {
-    env.getSparkSession.sql(config.getString("sql"))
-  }
-
-  override def checkConfig(): CheckResult = {
-    checkAllExists(config, "sql")
-  }
+package org.apache.seatunnel.spark.transform
 
-  override def getPluginName: String = "sql"
+object ReplaceConfig {
+  val PLUGIN_NAME = "replace"
+  val UDF_NAME = "Replace"
 
+  val FIELDS = "fields"
+  val SOURCE_FILED = "source_field"
+  val DEFAULT_SOURCE_FILED = "raw_message"
+  val PATTERN = "pattern"
+  val REPLACEMENT = "replacement"
+  val REPLACE_REGEX = "is_regex"
+  val DEFAULT_REPLACE_REGEX = false
+  val REPLACE_FIRST = "replace_first"
+  val DEFAULT_REPLACE_FIRST = false
 }
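
With the keys centralized in ReplaceConfig, a Replace block in a job config looks like the sketch below (the field name is illustrative; pattern and replacement are taken from the TestReplace cases):

```bash
  Replace {
    fields = "msg"
    pattern = "([^ ]*) ([^ ]*)"
    replacement = "$2"
    is_regex = true
    replace_first = false
    # source_field defaults to "raw_message"
  }
```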
diff --git a/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-replace/src/test/scala/org/apache/seatunnel/spark/transform/TestReplace.scala b/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-replace/src/test/scala/org/apache/seatunnel/spark/transform/TestReplace.scala
index dddac46a..fdb4287d 100644
--- a/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-replace/src/test/scala/org/apache/seatunnel/spark/transform/TestReplace.scala
+++ b/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-replace/src/test/scala/org/apache/seatunnel/spark/transform/TestReplace.scala
@@ -14,6 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.seatunnel.spark.transform
 
 import junit.framework.TestCase.assertEquals
diff --git a/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-split/src/main/scala/org/apache/seatunnel/spark/transform/Split.scala b/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-split/src/main/scala/org/apache/seatunnel/spark/transform/Split.scala
index 498776c0..764c5bab 100644
--- a/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-split/src/main/scala/org/apache/seatunnel/spark/transform/Split.scala
+++ b/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-split/src/main/scala/org/apache/seatunnel/spark/transform/Split.scala
@@ -14,6 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.seatunnel.spark.transform
 
 import scala.collection.JavaConversions._
@@ -23,6 +24,7 @@ import org.apache.seatunnel.common.config.CheckConfigUtil.checkAllExists
 import org.apache.seatunnel.common.config.CheckResult
 import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory
 import org.apache.seatunnel.spark.{BaseSparkTransform, SparkEnvironment}
+import org.apache.seatunnel.spark.transform.SplitConfig._
 import org.apache.spark.sql.{Dataset, Row}
 import org.apache.spark.sql.expressions.UserDefinedFunction
 import org.apache.spark.sql.functions.{col, udf}
@@ -30,15 +32,15 @@ import org.apache.spark.sql.functions.{col, udf}
 class Split extends BaseSparkTransform {
 
   override def process(df: Dataset[Row], env: SparkEnvironment): Dataset[Row] = {
-    val srcField = config.getString("source_field")
-    val keys = config.getStringList("fields")
+    val srcField = config.getString(SOURCE_FILED)
+    val keys = config.getStringList(FIELDS)
 
     // https://stackoverflow.com/a/33345698/1145750
     var func: UserDefinedFunction = null
-    val ds = config.getString("target_field") match {
+    val ds = config.getString(TARGET_FILED) match {
       case Constants.ROW_ROOT =>
         func = udf((s: String) => {
-          split(s, config.getString("separator"), keys.size())
+          split(s, config.getString(SPLIT_SEPARATOR), keys.size())
         })
         var filterDf = df.withColumn(Constants.ROW_TMP, func(col(srcField)))
         for (i <- 0 until keys.size()) {
@@ -47,28 +49,28 @@ class Split extends BaseSparkTransform {
         filterDf.drop(Constants.ROW_TMP)
       case targetField: String =>
         func = udf((s: String) => {
-          val values = split(s, config.getString("separator"), keys.size)
+          val values = split(s, config.getString(SPLIT_SEPARATOR), keys.size)
           val kvs = (keys zip values).toMap
           kvs
         })
         df.withColumn(targetField, func(col(srcField)))
     }
     if (func != null) {
-      env.getSparkSession.udf.register("Split", func)
+      env.getSparkSession.udf.register(UDF_NAME, func)
     }
     ds
   }
 
   override def checkConfig(): CheckResult = {
-    checkAllExists(config, "fields")
+    checkAllExists(config, FIELDS)
   }
 
   override def prepare(env: SparkEnvironment): Unit = {
     val defaultConfig = ConfigFactory.parseMap(
       Map(
-        "separator" -> " ",
-        "source_field" -> "raw_message",
-        "target_field" -> Constants.ROW_ROOT))
+        SPLIT_SEPARATOR -> DEFAULT_SPLIT_SEPARATOR,
+        SOURCE_FILED -> DEFAULT_SOURCE_FILED,
+        TARGET_FILED -> Constants.ROW_ROOT))
     config = config.withFallback(defaultConfig)
   }
 
@@ -86,5 +88,5 @@ class Split extends BaseSparkTransform {
     filled.toSeq
   }
 
-  override def getPluginName: String = "split"
+  override def getPluginName: String = PLUGIN_NAME
 }
diff --git a/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-sql/src/main/scala/org/apache/seatunnel/spark/transform/Sql.scala b/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-split/src/main/scala/org/apache/seatunnel/spark/transform/SplitConfig.scala
similarity index 60%
copy from seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-sql/src/main/scala/org/apache/seatunnel/spark/transform/Sql.scala
copy to seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-split/src/main/scala/org/apache/seatunnel/spark/transform/SplitConfig.scala
index 2881b1a2..b23036ab 100644
--- a/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-sql/src/main/scala/org/apache/seatunnel/spark/transform/Sql.scala
+++ b/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-split/src/main/scala/org/apache/seatunnel/spark/transform/SplitConfig.scala
@@ -14,23 +14,17 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.seatunnel.spark.transform
-
-import org.apache.seatunnel.common.config.CheckConfigUtil.checkAllExists
-import org.apache.seatunnel.common.config.CheckResult
-import org.apache.seatunnel.spark.{BaseSparkTransform, SparkEnvironment}
-import org.apache.spark.sql.{Dataset, Row}
-
-class Sql extends BaseSparkTransform {
 
-  override def process(data: Dataset[Row], env: SparkEnvironment): Dataset[Row] = {
-    env.getSparkSession.sql(config.getString("sql"))
-  }
-
-  override def checkConfig(): CheckResult = {
-    checkAllExists(config, "sql")
-  }
+package org.apache.seatunnel.spark.transform
 
-  override def getPluginName: String = "sql"
+object SplitConfig {
+  val PLUGIN_NAME = "split"
+  val UDF_NAME = "Split"
 
+  val FIELDS = "fields"
+  val SOURCE_FILED = "source_field"
+  val DEFAULT_SOURCE_FILED = "raw_message"
+  val TARGET_FILED = "target_field"
+  val SPLIT_SEPARATOR = "separator"
+  val DEFAULT_SPLIT_SEPARATOR = " "
 }
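
Likewise for Split, a sketch of a job-config block using the SplitConfig keys (field names are illustrative):

```bash
  Split {
    fields = ["first", "second"]
    separator = " "
    # source_field defaults to "raw_message", target_field to the row root
  }
```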
diff --git a/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-sql/src/main/scala/org/apache/seatunnel/spark/transform/Sql.scala b/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-sql/src/main/scala/org/apache/seatunnel/spark/transform/Sql.scala
index 2881b1a2..b15a4dd5 100644
--- a/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-sql/src/main/scala/org/apache/seatunnel/spark/transform/Sql.scala
+++ b/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-sql/src/main/scala/org/apache/seatunnel/spark/transform/Sql.scala
@@ -14,6 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.seatunnel.spark.transform
 
 import org.apache.seatunnel.common.config.CheckConfigUtil.checkAllExists
diff --git a/seatunnel-transforms/seatunnel-transforms-spark/pom.xml b/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-uuid/pom.xml
similarity index 59%
copy from seatunnel-transforms/seatunnel-transforms-spark/pom.xml
copy to seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-uuid/pom.xml
index 0626dfc0..f9185ef4 100644
--- a/seatunnel-transforms/seatunnel-transforms-spark/pom.xml
+++ b/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-uuid/pom.xml
@@ -22,19 +22,34 @@
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <parent>
         <groupId>org.apache.seatunnel</groupId>
-        <artifactId>seatunnel-transforms</artifactId>
+        <artifactId>seatunnel-transforms-spark</artifactId>
         <version>${revision}</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
 
-    <artifactId>seatunnel-transforms-spark</artifactId>
-    <packaging>pom</packaging>
+    <artifactId>seatunnel-transform-spark-uuid</artifactId>
 
-    <modules>
-        <module>seatunnel-transform-spark-json</module>
-        <module>seatunnel-transform-spark-split</module>
-        <module>seatunnel-transform-spark-replace</module>
-        <module>seatunnel-transform-spark-sql</module>
-    </modules>
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-api-spark</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
 
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-core_${scala.binary.version}</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-sql_${scala.binary.version}</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+        </dependency>
+    </dependencies>
 </project>
diff --git a/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-uuid/src/main/resources/META-INF/services/org.apache.seatunnel.spark.BaseSparkTransform b/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-uuid/src/main/resources/META-INF/services/org.apache.seatunnel.spark.BaseSparkTransform
new file mode 100644
index 00000000..281a7242
--- /dev/null
+++ b/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-uuid/src/main/resources/META-INF/services/org.apache.seatunnel.spark.BaseSparkTransform
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.seatunnel.spark.transform.UUID
diff --git a/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-replace/src/main/scala/org/apache/seatunnel/spark/transform/Replace.scala b/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-uuid/src/main/scala/org/apache/seatunnel/spark/transform/UUID.scala
similarity index 54%
copy from seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-replace/src/main/scala/org/apache/seatunnel/spark/transform/Replace.scala
copy to seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-uuid/src/main/scala/org/apache/seatunnel/spark/transform/UUID.scala
index 609dec15..829df588 100644
--- a/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-replace/src/main/scala/org/apache/seatunnel/spark/transform/Replace.scala
+++ b/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-uuid/src/main/scala/org/apache/seatunnel/spark/transform/UUID.scala
@@ -14,73 +14,76 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.seatunnel.spark.transform
 
+import java.security.SecureRandom
+
 import scala.collection.JavaConversions._
-import scala.util.matching.Regex
 
 import com.google.common.annotations.VisibleForTesting
-import org.apache.commons.lang3.StringUtils
+import org.apache.commons.math3.random.{RandomGenerator, Well19937c}
 import org.apache.seatunnel.common.Constants
 import org.apache.seatunnel.common.config.CheckConfigUtil.checkAllExists
 import org.apache.seatunnel.common.config.CheckResult
 import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory
 import org.apache.seatunnel.spark.{BaseSparkTransform, SparkEnvironment}
+import org.apache.seatunnel.spark.transform.UUIDConfig._
 import org.apache.spark.sql.{Dataset, Row}
 import org.apache.spark.sql.expressions.UserDefinedFunction
 import org.apache.spark.sql.functions.{col, udf}
 
-class Replace extends BaseSparkTransform {
+class UUID extends BaseSparkTransform {
+  private var prng: RandomGenerator = _
+
   override def process(df: Dataset[Row], env: SparkEnvironment): Dataset[Row] = {
-    val srcField = config.getString("source_field")
-    val key = config.getString("fields")
+    val key = config.getString(FIELDS)
 
-    val func: UserDefinedFunction = udf((s: String) => {
-      replace(
-        s,
-        config.getString("pattern"),
-        config.getString("replacement"),
-        config.getBoolean("is_regex"),
-        config.getBoolean("replace_first"))
+    val func: UserDefinedFunction = udf(() => {
+      generate(config.getString(UUID_PREFIX))
     })
-    var filterDf = df.withColumn(Constants.ROW_TMP, func(col(srcField)))
+    var filterDf = df.withColumn(Constants.ROW_TMP, func())
     filterDf = filterDf.withColumn(key, col(Constants.ROW_TMP))
     val ds = filterDf.drop(Constants.ROW_TMP)
     if (func != null) {
-      env.getSparkSession.udf.register("Replace", func)
+      env.getSparkSession.udf.register(UDF_NAME, func)
     }
     ds
   }
 
   override def checkConfig(): CheckResult = {
-    checkAllExists(config, "fields", "pattern", "replacement")
+    checkAllExists(config, FIELDS)
   }
 
   override def prepare(env: SparkEnvironment): Unit = {
-    val defaultConfig = ConfigFactory.parseMap(
-      Map(
-        "source_field" -> "raw_message",
-        "is_regex" -> false,
-        "replace_first" -> false))
+    val defaultConfig = ConfigFactory.parseMap(Map(UUID_PREFIX -> DEFAULT_UUID_PREFIX, UUID_SECURE -> DEFAULT_UUID_SECURE))
     config = config.withFallback(defaultConfig)
+
+    /**
+     * The secure algorithm can be comparatively slow.
+     * The new nonSecure algorithm never blocks and is much faster.
+     * The nonSecure algorithm uses a secure random seed but is otherwise deterministic,
+     * though it is one of the strongest uniform pseudo-random number generators known so far.
+     * Thanks to whoschek@cloudera.com.
+     */
+    if (config.getBoolean(UUID_SECURE)) {
+      val rand = new SecureRandom
+      val seed = for (_ <- 0 until 728) yield rand.nextInt
+      prng = new Well19937c(seed.toArray)
+    }
   }
 
   @VisibleForTesting
-  def replace(
-      str: String,
-      pattern: String,
-      replacement: String,
-      isRegex: Boolean,
-      replaceFirst: Boolean): String = {
+  def generate(prefix: String): String = {
+    val UUID = if (prng == null) java.util.UUID.randomUUID else new java.util.UUID(prng.nextLong, prng.nextLong)
+    prefix + UUID
+  }
 
-    if (isRegex) {
-      if (replaceFirst) pattern.replaceFirstIn(str, replacement)
-      else pattern.replaceAllIn(str, replacement)
-    } else {
-      val max = if (replaceFirst) 1 else -1
-      StringUtils.replace(str, pattern, replacement, max)
-    }
+  // Only used for test
+  @VisibleForTesting
+  def setPrng(prng: RandomGenerator): Unit = {
+    this.prng = prng
   }
 
-  implicit def toReg(pattern: String): Regex = pattern.r
+  override def getPluginName: String = PLUGIN_NAME
 }
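
A quick usage sketch of the class above; generate is @VisibleForTesting, so this mirrors what TestUUID does:

```scala
val transform = new UUID        // prng stays null unless prepare() saw secure = true
val id = transform.generate("uuid-")
// a random UUID string is 36 characters, so the prefixed result is 41
assert(id.length == "uuid-".length + 36)
```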
diff --git a/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-sql/src/main/scala/org/apache/seatunnel/spark/transform/Sql.scala b/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-uuid/src/main/scala/org/apache/seatunnel/spark/transform/UUIDConfig.scala
similarity index 60%
copy from seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-sql/src/main/scala/org/apache/seatunnel/spark/transform/Sql.scala
copy to seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-uuid/src/main/scala/org/apache/seatunnel/spark/transform/UUIDConfig.scala
index 2881b1a2..4cf1aeae 100644
--- a/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-sql/src/main/scala/org/apache/seatunnel/spark/transform/Sql.scala
+++ b/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-uuid/src/main/scala/org/apache/seatunnel/spark/transform/UUIDConfig.scala
@@ -14,23 +14,17 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.seatunnel.spark.transform
-
-import org.apache.seatunnel.common.config.CheckConfigUtil.checkAllExists
-import org.apache.seatunnel.common.config.CheckResult
-import org.apache.seatunnel.spark.{BaseSparkTransform, SparkEnvironment}
-import org.apache.spark.sql.{Dataset, Row}
-
-class Sql extends BaseSparkTransform {
 
-  override def process(data: Dataset[Row], env: SparkEnvironment): Dataset[Row] = {
-    env.getSparkSession.sql(config.getString("sql"))
-  }
-
-  override def checkConfig(): CheckResult = {
-    checkAllExists(config, "sql")
-  }
+package org.apache.seatunnel.spark.transform
 
-  override def getPluginName: String = "sql"
+object UUIDConfig {
+  val PLUGIN_NAME = "UUID"
+  val UDF_NAME = PLUGIN_NAME
 
+  val FIELDS = "fields"
+  val DEFAULT_SOURCE_FILED = "raw_message"
+  val UUID_PREFIX = "prefix"
+  val DEFAULT_UUID_PREFIX = ""
+  val UUID_SECURE = "secure"
+  val DEFAULT_UUID_SECURE = false
 }
diff --git a/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-replace/src/test/scala/org/apache/seatunnel/spark/transform/TestReplace.scala b/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-uuid/src/test/scala/org/apache/seatunnel/spark/transform/TestUUID.scala
similarity index 62%
copy from seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-replace/src/test/scala/org/apache/seatunnel/spark/transform/TestReplace.scala
copy to seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-uuid/src/test/scala/org/apache/seatunnel/spark/transform/TestUUID.scala
index dddac46a..7d375118 100644
--- a/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-replace/src/test/scala/org/apache/seatunnel/spark/transform/TestReplace.scala
+++ b/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-uuid/src/test/scala/org/apache/seatunnel/spark/transform/TestUUID.scala
@@ -14,27 +14,32 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.seatunnel.spark.transform
 
+import java.security.SecureRandom
+
 import junit.framework.TestCase.assertEquals
+import org.apache.commons.math3.random.Well19937c
 import org.junit.Test
 
-class TestReplace {
+class TestUUID {
   @Test
-  def testReplaceReg() {
-    val udf = new Replace
-    assertEquals(
-      "world",
-      udf.replace("hello world", "([^ ]*) ([^ ]*)", "$2", isRegex = true, replaceFirst = false))
-    assertEquals(
-      "hello world",
-      udf.replace("hello world", "([^ ]*)", "$1", isRegex = true, replaceFirst = true))
+  def testUuid() {
+    val UUID = new UUID
+    assertEquals(36, UUID.generate("").length)
+    assertEquals(37, UUID.generate("u").length)
   }
 
   @Test
-  def testReplaceLiteral() {
-    val udf = new Replace
-    assertEquals("fee", udf.replace("foo", "o", "e", isRegex = false, replaceFirst = false))
-    assertEquals("feo", udf.replace("foo", "o", "e", isRegex = false, replaceFirst = true))
+  def testSecureUuid() {
+    val rand = new SecureRandom
+    val seed = for (_ <- 0 until 728) yield rand.nextInt
+    val prng = new Well19937c(seed.toArray)
+
+    val UUID = new UUID
+    UUID.setPrng(prng)
+    assertEquals(36, UUID.generate("").length)
+    assertEquals(37, UUID.generate("u").length)
   }
 }