You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by we...@apache.org on 2022/04/27 14:55:27 UTC

[incubator-seatunnel] branch dev updated: [Feature][seatunnel-transforms] Add Replace transforms for spark #1750 (#1751)

This is an automated email from the ASF dual-hosted git repository.

wenjun pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new f1bf9535 [Feature][seatunnel-transforms] Add Replace transforms for spark #1750 (#1751)
f1bf9535 is described below

commit f1bf9535ee4e7f8f1163d629356047b556d2b4b0
Author: wuzhenhua <10...@users.noreply.github.com>
AuthorDate: Wed Apr 27 22:55:22 2022 +0800

    [Feature][seatunnel-transforms] Add Replace transforms for spark #1750 (#1751)
    
    * Add Replace transforms for spark #1750
    * fix mistake and update doc for spark replace transform.
---
 docs/en/transform/replace.md                       | 79 ++++++++++++++++++++
 seatunnel-core/seatunnel-core-spark/pom.xml        |  6 ++
 .../seatunnel-transforms-spark/pom.xml             |  1 +
 .../pom.xml                                        | 32 ++++++--
 .../org.apache.seatunnel.spark.BaseSparkTransform  | 18 +++++
 .../apache/seatunnel/spark/transform/Replace.scala | 86 ++++++++++++++++++++++
 .../seatunnel/spark/transform/TestReplace.scala    | 40 ++++++++++
 7 files changed, 254 insertions(+), 8 deletions(-)

diff --git a/docs/en/transform/replace.md b/docs/en/transform/replace.md
new file mode 100644
index 00000000..8286007c
--- /dev/null
+++ b/docs/en/transform/replace.md
@@ -0,0 +1,79 @@
+# Replace
+
+## Description
+
+Examines the string value of a given field and replaces the substrings that match the given string literal or regular expression with the given replacement.
+
+:::tip
+
+This transform is **ONLY** supported by Spark.
+
+:::
+
+## Options
+
+| name           | type   | required | default value |
+| -------------- | ------ | -------- | ------------- |
+| source_field   | string | no       | raw_message   |
+| fields         | string | yes      | -             |
+| pattern        | string | yes      | -             |
+| replacement    | string | yes      | -             |
+| is_regex       | boolean| no       | false         |
+| replace_first  | boolean| no       | false         |
+
+### source_field [string]
+
+Source field, if not configured, the default is `raw_message`
+
+### fields [string]
+
+The name of the field that the replaced value is written to.
+
+### pattern [string]
+
+The string to match.
+
+### is_regex [boolean]
+
+Whether or not to interpret the pattern as a regex (true) or string literal (false).
+
+### replacement [string]
+
+The replacement pattern (is_regex is true) or string literal (is_regex is false).
+
+### replace_first [boolean]
+
+Whether or not to skip any matches beyond the first match.
+
+### common options [string]
+
+Transform plugin common parameters, please refer to [Transform Plugin](common-options.mdx) for details
+
+## Examples
+Every occurrence of `a` in the `message` field is replaced by `b`, and the result is written to the `_replaced` field.
+
+```bash
+replace {
+    source_field = "message"
+    fields = "_replaced"
+    pattern = "a"
+    replacement = "b"
+}
+```
+
+Use `Replace` as a UDF in SQL.
+
+```bash
+  Replace {
+    fields = "_replaced"
+    pattern = "([^ ]*) ([^ ]*)"
+    replacement = "$2"
+    is_regex = true
+    replace_first = true
+  }
+
+  # Use the Replace function (confirm that the fake table exists)
+  sql {
+    sql = "select * from (select raw_message, replace(raw_message) as info_row from fake) t1"
+  }
+```
diff --git a/seatunnel-core/seatunnel-core-spark/pom.xml b/seatunnel-core/seatunnel-core-spark/pom.xml
index de6f59b9..53ae179d 100644
--- a/seatunnel-core/seatunnel-core-spark/pom.xml
+++ b/seatunnel-core/seatunnel-core-spark/pom.xml
@@ -68,6 +68,12 @@
             <artifactId>seatunnel-transform-spark-json</artifactId>
             <version>${project.version}</version>
         </dependency>
+
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-transform-spark-replace</artifactId>
+            <version>${project.version}</version>
+        </dependency>
     </dependencies>
 
     <build>
diff --git a/seatunnel-transforms/seatunnel-transforms-spark/pom.xml b/seatunnel-transforms/seatunnel-transforms-spark/pom.xml
index 6d7a28c2..0626dfc0 100644
--- a/seatunnel-transforms/seatunnel-transforms-spark/pom.xml
+++ b/seatunnel-transforms/seatunnel-transforms-spark/pom.xml
@@ -33,6 +33,7 @@
     <modules>
         <module>seatunnel-transform-spark-json</module>
         <module>seatunnel-transform-spark-split</module>
+        <module>seatunnel-transform-spark-replace</module>
         <module>seatunnel-transform-spark-sql</module>
     </modules>
 
diff --git a/seatunnel-transforms/seatunnel-transforms-spark/pom.xml b/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-replace/pom.xml
similarity index 59%
copy from seatunnel-transforms/seatunnel-transforms-spark/pom.xml
copy to seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-replace/pom.xml
index 6d7a28c2..ef29c636 100644
--- a/seatunnel-transforms/seatunnel-transforms-spark/pom.xml
+++ b/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-replace/pom.xml
@@ -22,18 +22,34 @@
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <parent>
         <groupId>org.apache.seatunnel</groupId>
-        <artifactId>seatunnel-transforms</artifactId>
+        <artifactId>seatunnel-transforms-spark</artifactId>
         <version>${revision}</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
 
-    <artifactId>seatunnel-transforms-spark</artifactId>
-    <packaging>pom</packaging>
+    <artifactId>seatunnel-transform-spark-replace</artifactId>
 
-    <modules>
-        <module>seatunnel-transform-spark-json</module>
-        <module>seatunnel-transform-spark-split</module>
-        <module>seatunnel-transform-spark-sql</module>
-    </modules>
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-api-spark</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
 
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-core_${scala.binary.version}</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-sql_${scala.binary.version}</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+        </dependency>
+    </dependencies>
 </project>
diff --git a/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-replace/src/main/resources/META-INF/services/org.apache.seatunnel.spark.BaseSparkTransform b/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-replace/src/main/resources/META-INF/services/org.apache.seatunnel.spark.BaseSparkTransform
new file mode 100644
index 00000000..84bbac24
--- /dev/null
+++ b/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-replace/src/main/resources/META-INF/services/org.apache.seatunnel.spark.BaseSparkTransform
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.seatunnel.spark.transform.Replace
diff --git a/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-replace/src/main/scala/org/apache/seatunnel/spark/transform/Replace.scala b/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-replace/src/main/scala/org/apache/seatunnel/spark/transform/Replace.scala
new file mode 100644
index 00000000..609dec15
--- /dev/null
+++ b/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-replace/src/main/scala/org/apache/seatunnel/spark/transform/Replace.scala
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.spark.transform
+
+import scala.collection.JavaConversions._
+import scala.util.matching.Regex
+
+import com.google.common.annotations.VisibleForTesting
+import org.apache.commons.lang3.StringUtils
+import org.apache.seatunnel.common.Constants
+import org.apache.seatunnel.common.config.CheckConfigUtil.checkAllExists
+import org.apache.seatunnel.common.config.CheckResult
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory
+import org.apache.seatunnel.spark.{BaseSparkTransform, SparkEnvironment}
+import org.apache.spark.sql.{Dataset, Row}
+import org.apache.spark.sql.expressions.UserDefinedFunction
+import org.apache.spark.sql.functions.{col, udf}
+
+/**
+ * Spark transform that rewrites the string value of one column and stores the result in
+ * another column.
+ *
+ * Options read from `config`: `source_field` (input column, defaults to `raw_message`),
+ * `fields` (output column), `pattern`, `replacement`, `is_regex` (defaults to `false`) and
+ * `replace_first` (defaults to `false`).
+ */
+class Replace extends BaseSparkTransform {
+
+  /**
+   * Writes the replaced value of the `source_field` column into the `fields` column and
+   * registers the same function on the Spark session under the name `Replace`.
+   *
+   * @param df  input dataset
+   * @param env environment providing the Spark session the UDF is registered on
+   * @return the dataset with the output column set
+   */
+  override def process(df: Dataset[Row], env: SparkEnvironment): Dataset[Row] = {
+    val srcField = config.getString("source_field")
+    val key = config.getString("fields")
+    // Resolve the options once, outside the UDF, instead of on every invocation.
+    val pattern = config.getString("pattern")
+    val replacement = config.getString("replacement")
+    val isRegex = config.getBoolean("is_regex")
+    val replaceFirst = config.getBoolean("replace_first")
+
+    val func: UserDefinedFunction =
+      udf((s: String) => replace(s, pattern, replacement, isRegex, replaceFirst))
+
+    // Compute into a temporary column, copy it to the target column, then drop the temporary.
+    val ds = df
+      .withColumn(Constants.ROW_TMP, func(col(srcField)))
+      .withColumn(key, col(Constants.ROW_TMP))
+      .drop(Constants.ROW_TMP)
+
+    // `udf(...)` always yields a function, so no null check is needed before registering.
+    env.getSparkSession.udf.register("Replace", func)
+    ds
+  }
+
+  /** Verifies that the mandatory `fields`, `pattern` and `replacement` options are present. */
+  override def checkConfig(): CheckResult = {
+    checkAllExists(config, "fields", "pattern", "replacement")
+  }
+
+  /** Supplies defaults for the optional options that the user config does not set. */
+  override def prepare(env: SparkEnvironment): Unit = {
+    val defaultConfig = ConfigFactory.parseMap(
+      Map(
+        "source_field" -> "raw_message",
+        "is_regex" -> false,
+        "replace_first" -> false))
+    config = config.withFallback(defaultConfig)
+  }
+
+  /**
+   * Replaces occurrences of `pattern` in `str` with `replacement`.
+   *
+   * @param str          input value; `null` yields `null` in both modes
+   * @param pattern      regular expression if `isRegex` is true, otherwise a literal substring
+   * @param replacement  replacement text; in regex mode group references such as `$1` apply
+   * @param isRegex      whether `pattern` is interpreted as a regular expression
+   * @param replaceFirst whether only the first match is replaced instead of all matches
+   * @return the string after replacement
+   */
+  @VisibleForTesting
+  def replace(
+      str: String,
+      pattern: String,
+      replacement: String,
+      isRegex: Boolean,
+      replaceFirst: Boolean): String = {
+
+    if (str == null) {
+      // Regex matching throws on null input; return null like StringUtils.replace does.
+      null
+    } else if (isRegex) {
+      // Compile explicitly rather than relying on the implicit String => Regex conversion.
+      val regex = pattern.r
+      if (replaceFirst) regex.replaceFirstIn(str, replacement)
+      else regex.replaceAllIn(str, replacement)
+    } else {
+      // A max of -1 tells StringUtils.replace to replace every occurrence.
+      val max = if (replaceFirst) 1 else -1
+      StringUtils.replace(str, pattern, replacement, max)
+    }
+  }
+
+  /** Implicit view from a pattern string to its compiled [[scala.util.matching.Regex]]. */
+  implicit def toReg(pattern: String): Regex = pattern.r
+}
diff --git a/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-replace/src/test/scala/org/apache/seatunnel/spark/transform/TestReplace.scala b/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-replace/src/test/scala/org/apache/seatunnel/spark/transform/TestReplace.scala
new file mode 100644
index 00000000..dddac46a
--- /dev/null
+++ b/seatunnel-transforms/seatunnel-transforms-spark/seatunnel-transform-spark-replace/src/test/scala/org/apache/seatunnel/spark/transform/TestReplace.scala
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.spark.transform
+
+import junit.framework.TestCase.assertEquals
+import org.junit.Test
+
+/** Unit tests for the string replacement logic of [[Replace]]. */
+class TestReplace {
+
+  /** Regex mode: group references are expanded and `replaceFirst` limits the match count. */
+  @Test
+  def testReplaceReg(): Unit = {
+    val udf = new Replace
+    assertEquals(
+      "world",
+      udf.replace("hello world", "([^ ]*) ([^ ]*)", "$2", isRegex = true, replaceFirst = false))
+    assertEquals(
+      "hello world",
+      udf.replace("hello world", "([^ ]*)", "$1", isRegex = true, replaceFirst = true))
+    // Replacing with a different value distinguishes first-only from replace-all.
+    assertEquals(
+      "hell0 world",
+      udf.replace("hello world", "o", "0", isRegex = true, replaceFirst = true))
+    assertEquals(
+      "hell0 w0rld",
+      udf.replace("hello world", "o", "0", isRegex = true, replaceFirst = false))
+  }
+
+  /** Literal mode: the pattern is matched verbatim, including regex metacharacters. */
+  @Test
+  def testReplaceLiteral(): Unit = {
+    val udf = new Replace
+    assertEquals("fee", udf.replace("foo", "o", "e", isRegex = false, replaceFirst = false))
+    assertEquals("feo", udf.replace("foo", "o", "e", isRegex = false, replaceFirst = true))
+    assertEquals("a-b-c", udf.replace("a.b.c", ".", "-", isRegex = false, replaceFirst = false))
+    assertEquals("a-b.c", udf.replace("a.b.c", ".", "-", isRegex = false, replaceFirst = true))
+  }
+}