You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by we...@apache.org on 2022/04/29 06:48:27 UTC
[incubator-seatunnel] branch dev updated: [Feature][seatunnel-transforms] Add Uuid transform for spark (#1770)
This is an automated email from the ASF dual-hosted git repository.
wenjun pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 64f513e8 [Feature][seatunnel-transforms] Add Uuid transform for spark (#1770)
64f513e8 is described below
commit 64f513e82ba7048180d6a424a33c9720e7ba4678
Author: wuzhenhua <10...@users.noreply.github.com>
AuthorDate: Fri Apr 29 14:48:22 2022 +0800
[Feature][seatunnel-transforms] Add Uuid transform for spark (#1770)
* [Feature][seatunnel-transforms] Add Uuid transform for spark and fix some error at replace doc
* Refactored transform config parameter constants and default values
Co-authored-by: xiaowu <cx...@gmail.com>
---
docs/en/transform/replace.md | 10 +--
docs/en/transform/uuid.md | 62 +++++++++++++++++++
seatunnel-core/seatunnel-core-spark/pom.xml | 6 ++
.../seatunnel-transforms-spark/pom.xml | 1 +
.../apache/seatunnel/spark/transform/Json.scala | 22 ++++---
.../seatunnel/spark/transform/JsonConfig.scala} | 26 +++-----
.../apache/seatunnel/spark/transform/Replace.scala | 26 ++++----
.../seatunnel/spark/transform/ReplaceConfig.scala} | 29 ++++-----
.../seatunnel/spark/transform/TestReplace.scala | 1 +
.../apache/seatunnel/spark/transform/Split.scala | 24 ++++----
.../seatunnel/spark/transform/SplitConfig.scala} | 26 +++-----
.../org/apache/seatunnel/spark/transform/Sql.scala | 1 +
.../{ => seatunnel-transform-spark-uuid}/pom.xml | 33 +++++++---
.../org.apache.seatunnel.spark.BaseSparkTransform | 18 ++++++
.../apache/seatunnel/spark/transform/UUID.scala} | 71 +++++++++++-----------
.../seatunnel/spark/transform/UUIDConfig.scala} | 26 +++-----
.../seatunnel/spark/transform/TestUUID.scala} | 31 ++++++----
17 files changed, 256 insertions(+), 157 deletions(-)
diff --git a/docs/en/transform/replace.md b/docs/en/transform/replace.md
index 8286007c..ecfce6dd 100644
--- a/docs/en/transform/replace.md
+++ b/docs/en/transform/replace.md
@@ -1,4 +1,4 @@
-# Json
+# Replace
## Description
@@ -33,13 +33,13 @@ The name of the field to replaced.
The string to match.
-### is_regex [string]
+### replacement [string]
-Whether or not to interpret the pattern as a regex (true) or string literal (false).
+The replacement pattern (is_regex is true) or string literal (is_regex is false).
-### replacement [boolean]
+### is_regex [boolean]
-The replacement pattern (is_regex is true) or string literal (is_regex is false).
+Whether or not to interpret the pattern as a regex (true) or string literal (false).
### replace_first [boolean]
diff --git a/docs/en/transform/uuid.md b/docs/en/transform/uuid.md
new file mode 100644
index 00000000..4633a843
--- /dev/null
+++ b/docs/en/transform/uuid.md
@@ -0,0 +1,62 @@
+# UUID
+
+## Description
+
+Generate a universally unique identifier on a specified field.
+
+:::tip
+
+This transform is **ONLY** supported by Spark.
+
+:::
+
+## Options
+
+| name | type | required | default value |
+| -------------- | ------ | -------- | ------------- |
+| fields | string | yes | - |
+| prefix | string | no | - |
+| secure | boolean| no | false |
+
+### fields [string]
+
+The name of the field to generate.
+
+### prefix [string]
+
+The prefix string constant to prepend to each generated UUID.
+
+### secure [boolean]
+
+The cryptographically secure algorithm can be comparatively slow.
+The non-secure algorithm uses a secure random seed but is otherwise deterministic, and is much faster.
+
+### common options [string]
+
+Transform plugin common parameters, please refer to [Transform Plugin](common-options.mdx) for details
+
+## Examples
+
+```bash
+ UUID {
+ fields = "u"
+ prefix = "uuid-"
+ secure = true
+  }
+```
+
+Use `UUID` as udf in sql.
+
+```bash
+ UUID {
+ fields = "u"
+ prefix = "uuid-"
+ secure = true
+ }
+
+ # Use the uuid function (confirm that the fake table exists)
+ sql {
+ sql = "select * from (select raw_message, UUID() as info_row from fake) t1"
+ }
+```
diff --git a/seatunnel-core/seatunnel-core-spark/pom.xml b/seatunnel-core/seatunnel-core-spark/pom.xml
index 53ae179d..86410e07 100644
--- a/seatunnel-core/seatunnel-core-spark/pom.xml
+++ b/seatunnel-core/seatunnel-core-spark/pom.xml
@@ -74,6 +74,12 @@
<artifactId>seatunnel-transform-spark-replace</artifactId>
<version>${project.version}</version>
</dependency>
+
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-transform-spark-uuid</artifactId>
+ <version>${project.version}</version>
+ </dependency>
</dependencies>
<build>
diff --git a/seatunnel-transforms/seatunnel-transforms-spark/pom.xml b/seatunnel-transforms/seatunnel-transforms-spark/pom.xml
index 0626dfc0..e95a4ebc 100644
--- a/seatunnel-transforms/seatunnel-transforms-spark/pom.xml
+++ b/seatunnel-transforms/seatunnel-transforms-spark/pom.xml
@@ -34,6 +34,7 @@
<module>seatunnel-transform-spark-json</module>
<module>seatunnel-transform-spark-split</module>
<module>seatunnel-transform-spark-replace</module>
+ <module>seatunnel-transform-spark-uuid</module>
<module>seatunnel-transform-spark-sql</module>
</modules>
diff --git a/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-json/src/main/scala/org/apache/seatunnel/spark/transform/Json.scala b/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-json/src/main/scala/org/apache/seatunnel/spark/transform/Json.scala
index 519be92b..63e50cee 100644
--- a/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-json/src/main/scala/org/apache/seatunnel/spark/transform/Json.scala
+++ b/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-json/src/main/scala/org/apache/seatunnel/spark/transform/Json.scala
@@ -14,11 +14,13 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.seatunnel.spark.transform
import org.apache.seatunnel.common.config.{Common, ConfigRuntimeException}
import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory
import org.apache.seatunnel.spark.{BaseSparkTransform, SparkEnvironment}
+import org.apache.seatunnel.spark.transform.JsonConfig._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{DataType, StructType}
import org.apache.spark.sql.{Dataset, Row, SparkSession}
@@ -40,19 +42,19 @@ class Json extends BaseSparkTransform {
var useCustomSchema: Boolean = false
override def process(df: Dataset[Row], env: SparkEnvironment): Dataset[Row] = {
- val srcField = config.getString("source_field")
+ val srcField = config.getString(SOURCE_FILED)
val spark = env.getSparkSession
import spark.implicits._
- config.getString("target_field") match {
+ config.getString(TARGET_FILED) match {
case Constants.ROW_ROOT => {
val jsonRDD = df.select(srcField).as[String].rdd
val newDF = srcField match {
// for backward-compatibility for spark < 2.2.0, we created rdd, not Dataset[String]
- case "raw_message" => {
+ case DEFAULT_SOURCE_FILED => {
val tmpDF =
if (this.useCustomSchema) {
spark.read.schema(this.customSchema).json(jsonRDD)
@@ -94,14 +96,14 @@ class Json extends BaseSparkTransform {
override def prepare(env: SparkEnvironment): Unit = {
val defaultConfig = ConfigFactory.parseMap(
Map(
- "source_field" -> "raw_message",
- "target_field" -> Constants.ROW_ROOT,
- "schema_dir" -> Paths.get(Common.pluginFilesDir("json").toString, "schemas").toString,
- "schema_file" -> ""))
+ SOURCE_FILED -> DEFAULT_SOURCE_FILED,
+ TARGET_FILED -> Constants.ROW_ROOT,
+ SCHEMA_DIR -> Paths.get(Common.pluginFilesDir("json").toString, "schemas").toString,
+ SCHEMA_FILE -> DEFAULT_SCHEMA_FILE))
config = config.withFallback(defaultConfig)
- val schemaFile = config.getString("schema_file")
+ val schemaFile = config.getString(SCHEMA_FILE)
if (schemaFile.trim != "") {
- parseCustomJsonSchema(env.getSparkSession, config.getString("schema_dir"), schemaFile)
+ parseCustomJsonSchema(env.getSparkSession, config.getString(SCHEMA_DIR), schemaFile)
}
}
@@ -136,5 +138,5 @@ class Json extends BaseSparkTransform {
}
}
- override def getPluginName: String = "json"
+ override def getPluginName: String = PLUGIN_NAME
}
diff --git a/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-sql/src/main/scala/org/apache/seatunnel/spark/transform/Sql.scala b/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-json/src/main/scala/org/apache/seatunnel/spark/transform/JsonConfig.scala
similarity index 60%
copy from seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-sql/src/main/scala/org/apache/seatunnel/spark/transform/Sql.scala
copy to seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-json/src/main/scala/org/apache/seatunnel/spark/transform/JsonConfig.scala
index 2881b1a2..560fce0c 100644
--- a/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-sql/src/main/scala/org/apache/seatunnel/spark/transform/Sql.scala
+++ b/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-json/src/main/scala/org/apache/seatunnel/spark/transform/JsonConfig.scala
@@ -14,23 +14,17 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.seatunnel.spark.transform
-
-import org.apache.seatunnel.common.config.CheckConfigUtil.checkAllExists
-import org.apache.seatunnel.common.config.CheckResult
-import org.apache.seatunnel.spark.{BaseSparkTransform, SparkEnvironment}
-import org.apache.spark.sql.{Dataset, Row}
-
-class Sql extends BaseSparkTransform {
- override def process(data: Dataset[Row], env: SparkEnvironment): Dataset[Row] = {
- env.getSparkSession.sql(config.getString("sql"))
- }
-
- override def checkConfig(): CheckResult = {
- checkAllExists(config, "sql")
- }
+package org.apache.seatunnel.spark.transform
- override def getPluginName: String = "sql"
+object JsonConfig {
+ val PLUGIN_NAME = "json"
+ val FIELDS = "fields"
+ val SOURCE_FILED = "source_field"
+ val DEFAULT_SOURCE_FILED = "raw_message"
+ val TARGET_FILED = "target_field"
+ val SCHEMA_DIR = "schema_dir"
+ val SCHEMA_FILE = "schema_file"
+ val DEFAULT_SCHEMA_FILE = ""
}
diff --git a/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-replace/src/main/scala/org/apache/seatunnel/spark/transform/Replace.scala b/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-replace/src/main/scala/org/apache/seatunnel/spark/transform/Replace.scala
index 609dec15..27e6cf9c 100644
--- a/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-replace/src/main/scala/org/apache/seatunnel/spark/transform/Replace.scala
+++ b/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-replace/src/main/scala/org/apache/seatunnel/spark/transform/Replace.scala
@@ -14,6 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.seatunnel.spark.transform
import scala.collection.JavaConversions._
@@ -26,42 +27,43 @@ import org.apache.seatunnel.common.config.CheckConfigUtil.checkAllExists
import org.apache.seatunnel.common.config.CheckResult
import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory
import org.apache.seatunnel.spark.{BaseSparkTransform, SparkEnvironment}
+import org.apache.seatunnel.spark.transform.ReplaceConfig._
import org.apache.spark.sql.{Dataset, Row}
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions.{col, udf}
class Replace extends BaseSparkTransform {
override def process(df: Dataset[Row], env: SparkEnvironment): Dataset[Row] = {
- val srcField = config.getString("source_field")
- val key = config.getString("fields")
+ val srcField = config.getString(SOURCE_FILED)
+ val key = config.getString(FIELDS)
val func: UserDefinedFunction = udf((s: String) => {
replace(
s,
- config.getString("pattern"),
- config.getString("replacement"),
- config.getBoolean("is_regex"),
- config.getBoolean("replace_first"))
+ config.getString(PATTERN),
+ config.getString(REPLACEMENT),
+ config.getBoolean(REPLACE_REGEX),
+ config.getBoolean(REPLACE_FIRST))
})
var filterDf = df.withColumn(Constants.ROW_TMP, func(col(srcField)))
filterDf = filterDf.withColumn(key, col(Constants.ROW_TMP))
val ds = filterDf.drop(Constants.ROW_TMP)
if (func != null) {
- env.getSparkSession.udf.register("Replace", func)
+ env.getSparkSession.udf.register(UDF_NAME, func)
}
ds
}
override def checkConfig(): CheckResult = {
- checkAllExists(config, "fields", "pattern", "replacement")
+ checkAllExists(config, FIELDS, PATTERN, REPLACEMENT)
}
override def prepare(env: SparkEnvironment): Unit = {
val defaultConfig = ConfigFactory.parseMap(
Map(
- "source_field" -> "raw_message",
- "is_regex" -> false,
- "replace_first" -> false))
+ SOURCE_FILED -> DEFAULT_SOURCE_FILED,
+ REPLACE_REGEX -> DEFAULT_REPLACE_REGEX,
+ REPLACE_FIRST -> DEFAULT_REPLACE_FIRST))
config = config.withFallback(defaultConfig)
}
@@ -83,4 +85,6 @@ class Replace extends BaseSparkTransform {
}
implicit def toReg(pattern: String): Regex = pattern.r
+
+ override def getPluginName: String = PLUGIN_NAME
}
diff --git a/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-sql/src/main/scala/org/apache/seatunnel/spark/transform/Sql.scala b/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-replace/src/main/scala/org/apache/seatunnel/spark/transform/ReplaceConfig.scala
similarity index 60%
copy from seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-sql/src/main/scala/org/apache/seatunnel/spark/transform/Sql.scala
copy to seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-replace/src/main/scala/org/apache/seatunnel/spark/transform/ReplaceConfig.scala
index 2881b1a2..51e15153 100644
--- a/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-sql/src/main/scala/org/apache/seatunnel/spark/transform/Sql.scala
+++ b/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-replace/src/main/scala/org/apache/seatunnel/spark/transform/ReplaceConfig.scala
@@ -14,23 +14,20 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.seatunnel.spark.transform
-
-import org.apache.seatunnel.common.config.CheckConfigUtil.checkAllExists
-import org.apache.seatunnel.common.config.CheckResult
-import org.apache.seatunnel.spark.{BaseSparkTransform, SparkEnvironment}
-import org.apache.spark.sql.{Dataset, Row}
-
-class Sql extends BaseSparkTransform {
- override def process(data: Dataset[Row], env: SparkEnvironment): Dataset[Row] = {
- env.getSparkSession.sql(config.getString("sql"))
- }
-
- override def checkConfig(): CheckResult = {
- checkAllExists(config, "sql")
- }
+package org.apache.seatunnel.spark.transform
- override def getPluginName: String = "sql"
+object ReplaceConfig {
+ val PLUGIN_NAME = "replace"
+ val UDF_NAME = "Replace"
+ val FIELDS = "fields"
+ val SOURCE_FILED = "source_field"
+ val DEFAULT_SOURCE_FILED = "raw_message"
+ val PATTERN = "pattern"
+ val REPLACEMENT = "replacement"
+ val REPLACE_REGEX = "is_regex"
+ val DEFAULT_REPLACE_REGEX = false
+ val REPLACE_FIRST = "replace_first"
+ val DEFAULT_REPLACE_FIRST = false
}
diff --git a/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-replace/src/test/scala/org/apache/seatunnel/spark/transform/TestReplace.scala b/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-replace/src/test/scala/org/apache/seatunnel/spark/transform/TestReplace.scala
index dddac46a..fdb4287d 100644
--- a/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-replace/src/test/scala/org/apache/seatunnel/spark/transform/TestReplace.scala
+++ b/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-replace/src/test/scala/org/apache/seatunnel/spark/transform/TestReplace.scala
@@ -14,6 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.seatunnel.spark.transform
import junit.framework.TestCase.assertEquals
diff --git a/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-split/src/main/scala/org/apache/seatunnel/spark/transform/Split.scala b/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-split/src/main/scala/org/apache/seatunnel/spark/transform/Split.scala
index 498776c0..764c5bab 100644
--- a/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-split/src/main/scala/org/apache/seatunnel/spark/transform/Split.scala
+++ b/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-split/src/main/scala/org/apache/seatunnel/spark/transform/Split.scala
@@ -14,6 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.seatunnel.spark.transform
import scala.collection.JavaConversions._
@@ -23,6 +24,7 @@ import org.apache.seatunnel.common.config.CheckConfigUtil.checkAllExists
import org.apache.seatunnel.common.config.CheckResult
import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory
import org.apache.seatunnel.spark.{BaseSparkTransform, SparkEnvironment}
+import org.apache.seatunnel.spark.transform.SplitConfig._
import org.apache.spark.sql.{Dataset, Row}
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions.{col, udf}
@@ -30,15 +32,15 @@ import org.apache.spark.sql.functions.{col, udf}
class Split extends BaseSparkTransform {
override def process(df: Dataset[Row], env: SparkEnvironment): Dataset[Row] = {
- val srcField = config.getString("source_field")
- val keys = config.getStringList("fields")
+ val srcField = config.getString(SOURCE_FILED)
+ val keys = config.getStringList(FIELDS)
// https://stackoverflow.com/a/33345698/1145750
var func: UserDefinedFunction = null
- val ds = config.getString("target_field") match {
+ val ds = config.getString(TARGET_FILED) match {
case Constants.ROW_ROOT =>
func = udf((s: String) => {
- split(s, config.getString("separator"), keys.size())
+ split(s, config.getString(SPLIT_SEPARATOR), keys.size())
})
var filterDf = df.withColumn(Constants.ROW_TMP, func(col(srcField)))
for (i <- 0 until keys.size()) {
@@ -47,28 +49,28 @@ class Split extends BaseSparkTransform {
filterDf.drop(Constants.ROW_TMP)
case targetField: String =>
func = udf((s: String) => {
- val values = split(s, config.getString("separator"), keys.size)
+ val values = split(s, config.getString(SPLIT_SEPARATOR), keys.size)
val kvs = (keys zip values).toMap
kvs
})
df.withColumn(targetField, func(col(srcField)))
}
if (func != null) {
- env.getSparkSession.udf.register("Split", func)
+ env.getSparkSession.udf.register(UDF_NAME, func)
}
ds
}
override def checkConfig(): CheckResult = {
- checkAllExists(config, "fields")
+ checkAllExists(config, FIELDS)
}
override def prepare(env: SparkEnvironment): Unit = {
val defaultConfig = ConfigFactory.parseMap(
Map(
- "separator" -> " ",
- "source_field" -> "raw_message",
- "target_field" -> Constants.ROW_ROOT))
+ SPLIT_SEPARATOR -> DEFAULT_SPLIT_SEPARATOR,
+ SOURCE_FILED -> DEFAULT_SOURCE_FILED,
+ TARGET_FILED -> Constants.ROW_ROOT))
config = config.withFallback(defaultConfig)
}
@@ -86,5 +88,5 @@ class Split extends BaseSparkTransform {
filled.toSeq
}
- override def getPluginName: String = "split"
+ override def getPluginName: String = PLUGIN_NAME
}
diff --git a/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-sql/src/main/scala/org/apache/seatunnel/spark/transform/Sql.scala b/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-split/src/main/scala/org/apache/seatunnel/spark/transform/SplitConfig.scala
similarity index 60%
copy from seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-sql/src/main/scala/org/apache/seatunnel/spark/transform/Sql.scala
copy to seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-split/src/main/scala/org/apache/seatunnel/spark/transform/SplitConfig.scala
index 2881b1a2..b23036ab 100644
--- a/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-sql/src/main/scala/org/apache/seatunnel/spark/transform/Sql.scala
+++ b/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-split/src/main/scala/org/apache/seatunnel/spark/transform/SplitConfig.scala
@@ -14,23 +14,17 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.seatunnel.spark.transform
-
-import org.apache.seatunnel.common.config.CheckConfigUtil.checkAllExists
-import org.apache.seatunnel.common.config.CheckResult
-import org.apache.seatunnel.spark.{BaseSparkTransform, SparkEnvironment}
-import org.apache.spark.sql.{Dataset, Row}
-
-class Sql extends BaseSparkTransform {
- override def process(data: Dataset[Row], env: SparkEnvironment): Dataset[Row] = {
- env.getSparkSession.sql(config.getString("sql"))
- }
-
- override def checkConfig(): CheckResult = {
- checkAllExists(config, "sql")
- }
+package org.apache.seatunnel.spark.transform
- override def getPluginName: String = "sql"
+object SplitConfig {
+ val PLUGIN_NAME = "split"
+ val UDF_NAME = "Split"
+ val FIELDS = "fields"
+ val SOURCE_FILED = "source_field"
+ val DEFAULT_SOURCE_FILED = "raw_message"
+ val TARGET_FILED = "target_field"
+ val SPLIT_SEPARATOR = "separator"
+ val DEFAULT_SPLIT_SEPARATOR = " "
}
diff --git a/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-sql/src/main/scala/org/apache/seatunnel/spark/transform/Sql.scala b/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-sql/src/main/scala/org/apache/seatunnel/spark/transform/Sql.scala
index 2881b1a2..b15a4dd5 100644
--- a/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-sql/src/main/scala/org/apache/seatunnel/spark/transform/Sql.scala
+++ b/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-sql/src/main/scala/org/apache/seatunnel/spark/transform/Sql.scala
@@ -14,6 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.seatunnel.spark.transform
import org.apache.seatunnel.common.config.CheckConfigUtil.checkAllExists
diff --git a/seatunnel-transforms/seatunnel-transforms-spark/pom.xml b/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-uuid/pom.xml
similarity index 59%
copy from seatunnel-transforms/seatunnel-transforms-spark/pom.xml
copy to seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-uuid/pom.xml
index 0626dfc0..f9185ef4 100644
--- a/seatunnel-transforms/seatunnel-transforms-spark/pom.xml
+++ b/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-uuid/pom.xml
@@ -22,19 +22,34 @@
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>org.apache.seatunnel</groupId>
- <artifactId>seatunnel-transforms</artifactId>
+ <artifactId>seatunnel-transforms-spark</artifactId>
<version>${revision}</version>
</parent>
<modelVersion>4.0.0</modelVersion>
- <artifactId>seatunnel-transforms-spark</artifactId>
- <packaging>pom</packaging>
+ <artifactId>seatunnel-transform-spark-uuid</artifactId>
- <modules>
- <module>seatunnel-transform-spark-json</module>
- <module>seatunnel-transform-spark-split</module>
- <module>seatunnel-transform-spark-replace</module>
- <module>seatunnel-transform-spark-sql</module>
- </modules>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-api-spark</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-core_${scala.binary.version}</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-sql_${scala.binary.version}</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ </dependency>
+ </dependencies>
</project>
diff --git a/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-uuid/src/main/resources/META-INF/services/org.apache.seatunnel.spark.BaseSparkTransform b/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-uuid/src/main/resources/META-INF/services/org.apache.seatunnel.spark.BaseSparkTransform
new file mode 100644
index 00000000..281a7242
--- /dev/null
+++ b/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-uuid/src/main/resources/META-INF/services/org.apache.seatunnel.spark.BaseSparkTransform
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.seatunnel.spark.transform.UUID
diff --git a/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-replace/src/main/scala/org/apache/seatunnel/spark/transform/Replace.scala b/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-uuid/src/main/scala/org/apache/seatunnel/spark/transform/UUID.scala
similarity index 54%
copy from seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-replace/src/main/scala/org/apache/seatunnel/spark/transform/Replace.scala
copy to seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-uuid/src/main/scala/org/apache/seatunnel/spark/transform/UUID.scala
index 609dec15..829df588 100644
--- a/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-replace/src/main/scala/org/apache/seatunnel/spark/transform/Replace.scala
+++ b/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-uuid/src/main/scala/org/apache/seatunnel/spark/transform/UUID.scala
@@ -14,73 +14,76 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.seatunnel.spark.transform
+import java.security.SecureRandom
+
import scala.collection.JavaConversions._
-import scala.util.matching.Regex
import com.google.common.annotations.VisibleForTesting
-import org.apache.commons.lang3.StringUtils
+import org.apache.commons.math3.random.{RandomGenerator, Well19937c}
import org.apache.seatunnel.common.Constants
import org.apache.seatunnel.common.config.CheckConfigUtil.checkAllExists
import org.apache.seatunnel.common.config.CheckResult
import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory
import org.apache.seatunnel.spark.{BaseSparkTransform, SparkEnvironment}
+import org.apache.seatunnel.spark.transform.UUIDConfig._
import org.apache.spark.sql.{Dataset, Row}
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions.{col, udf}
-class Replace extends BaseSparkTransform {
+class UUID extends BaseSparkTransform {
+ private var prng: RandomGenerator = _
+
override def process(df: Dataset[Row], env: SparkEnvironment): Dataset[Row] = {
- val srcField = config.getString("source_field")
- val key = config.getString("fields")
+ val key = config.getString(FIELDS)
- val func: UserDefinedFunction = udf((s: String) => {
- replace(
- s,
- config.getString("pattern"),
- config.getString("replacement"),
- config.getBoolean("is_regex"),
- config.getBoolean("replace_first"))
+ val func: UserDefinedFunction = udf(() => {
+ generate(config.getString(UUID_PREFIX))
})
- var filterDf = df.withColumn(Constants.ROW_TMP, func(col(srcField)))
+ var filterDf = df.withColumn(Constants.ROW_TMP, func())
filterDf = filterDf.withColumn(key, col(Constants.ROW_TMP))
val ds = filterDf.drop(Constants.ROW_TMP)
if (func != null) {
- env.getSparkSession.udf.register("Replace", func)
+ env.getSparkSession.udf.register(UDF_NAME, func)
}
ds
}
override def checkConfig(): CheckResult = {
- checkAllExists(config, "fields", "pattern", "replacement")
+ checkAllExists(config, FIELDS)
}
override def prepare(env: SparkEnvironment): Unit = {
- val defaultConfig = ConfigFactory.parseMap(
- Map(
- "source_field" -> "raw_message",
- "is_regex" -> false,
- "replace_first" -> false))
+ val defaultConfig = ConfigFactory.parseMap(Map(UUID_PREFIX -> DEFAULT_UUID_PREFIX, UUID_SECURE -> DEFAULT_UUID_SECURE))
config = config.withFallback(defaultConfig)
+
+ /**
+ * The secure algorithm can be comparatively slow.
+ * The new nonSecure algorithm never blocks and is much faster.
+ * The nonSecure algorithm uses a secure random seed but is otherwise deterministic,
+ * though it is one of the strongest uniform pseudo-random number generators known so far.
+ * thanks for whoschek@cloudera.com
+ */
+ if (config.getBoolean(UUID_SECURE)) {
+ val rand = new SecureRandom
+ val seed = for (_ <- 0 until 728) yield rand.nextInt
+ prng = new Well19937c(seed.toArray)
+ }
}
@VisibleForTesting
- def replace(
- str: String,
- pattern: String,
- replacement: String,
- isRegex: Boolean,
- replaceFirst: Boolean): String = {
+ def generate(prefix: String): String = {
+ val UUID = if (prng == null) java.util.UUID.randomUUID else new java.util.UUID(prng.nextLong, prng.nextLong)
+ prefix + UUID
+ }
- if (isRegex) {
- if (replaceFirst) pattern.replaceFirstIn(str, replacement)
- else pattern.replaceAllIn(str, replacement)
- } else {
- val max = if (replaceFirst) 1 else -1
- StringUtils.replace(str, pattern, replacement, max)
- }
+ // Only used for test
+ @VisibleForTesting
+ def setPrng(prng: RandomGenerator): Unit = {
+ this.prng = prng
}
- implicit def toReg(pattern: String): Regex = pattern.r
+ override def getPluginName: String = PLUGIN_NAME
}
diff --git a/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-sql/src/main/scala/org/apache/seatunnel/spark/transform/Sql.scala b/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-uuid/src/main/scala/org/apache/seatunnel/spark/transform/UUIDConfig.scala
similarity index 60%
copy from seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-sql/src/main/scala/org/apache/seatunnel/spark/transform/Sql.scala
copy to seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-uuid/src/main/scala/org/apache/seatunnel/spark/transform/UUIDConfig.scala
index 2881b1a2..4cf1aeae 100644
--- a/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-sql/src/main/scala/org/apache/seatunnel/spark/transform/Sql.scala
+++ b/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-uuid/src/main/scala/org/apache/seatunnel/spark/transform/UUIDConfig.scala
@@ -14,23 +14,17 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.seatunnel.spark.transform
-
-import org.apache.seatunnel.common.config.CheckConfigUtil.checkAllExists
-import org.apache.seatunnel.common.config.CheckResult
-import org.apache.seatunnel.spark.{BaseSparkTransform, SparkEnvironment}
-import org.apache.spark.sql.{Dataset, Row}
-
-class Sql extends BaseSparkTransform {
- override def process(data: Dataset[Row], env: SparkEnvironment): Dataset[Row] = {
- env.getSparkSession.sql(config.getString("sql"))
- }
-
- override def checkConfig(): CheckResult = {
- checkAllExists(config, "sql")
- }
+package org.apache.seatunnel.spark.transform
- override def getPluginName: String = "sql"
+object UUIDConfig {
+ val PLUGIN_NAME = "UUID"
+ val UDF_NAME = PLUGIN_NAME
+ val FIELDS = "fields"
+ val DEFAULT_SOURCE_FILED = "raw_message"
+ val UUID_PREFIX = "prefix"
+ val DEFAULT_UUID_PREFIX = ""
+ val UUID_SECURE = "secure"
+ val DEFAULT_UUID_SECURE = false
}
diff --git a/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-replace/src/test/scala/org/apache/seatunnel/spark/transform/TestReplace.scala b/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-uuid/src/test/scala/org/apache/seatunnel/spark/transform/TestUUID.scala
similarity index 62%
copy from seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-replace/src/test/scala/org/apache/seatunnel/spark/transform/TestReplace.scala
copy to seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-uuid/src/test/scala/org/apache/seatunnel/spark/transform/TestUUID.scala
index dddac46a..7d375118 100644
--- a/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-replace/src/test/scala/org/apache/seatunnel/spark/transform/TestReplace.scala
+++ b/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-uuid/src/test/scala/org/apache/seatunnel/spark/transform/TestUUID.scala
@@ -14,27 +14,32 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.seatunnel.spark.transform
+import java.security.SecureRandom
+
import junit.framework.TestCase.assertEquals
+import org.apache.commons.math3.random.Well19937c
import org.junit.Test
-class TestReplace {
+class TestUUID {
@Test
- def testReplaceReg() {
- val udf = new Replace
- assertEquals(
- "world",
- udf.replace("hello world", "([^ ]*) ([^ ]*)", "$2", isRegex = true, replaceFirst = false))
- assertEquals(
- "hello world",
- udf.replace("hello world", "([^ ]*)", "$1", isRegex = true, replaceFirst = true))
+ def testUuid() {
+ val UUID = new UUID
+ assertEquals(36, UUID.generate("").length)
+ assertEquals(37, UUID.generate("u").length)
}
@Test
- def testReplaceLiteral() {
- val udf = new Replace
- assertEquals("fee", udf.replace("foo", "o", "e", isRegex = false, replaceFirst = false))
- assertEquals("feo", udf.replace("foo", "o", "e", isRegex = false, replaceFirst = true))
+ def testSecureUuid() {
+ val rand = new SecureRandom
+ val seed = for (_ <- 0 until 728) yield rand.nextInt
+ val prng = new Well19937c(seed.toArray)
+
+ val UUID = new UUID
+ UUID.setPrng(prng)
+ assertEquals(36, UUID.generate("").length)
+ assertEquals(37, UUID.generate("u").length)
}
}