You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/07/15 08:54:02 UTC

[inlong] branch master updated: [INLONG-5068][Sort] Fix RegexpReplace replacing uncorrectly (#5069)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 78d12c05b [INLONG-5068][Sort] Fix RegexpReplace replacing uncorrectly (#5069)
78d12c05b is described below

commit 78d12c05b3c7637a67817dc25877e0931d71fed6
Author: yunqingmoswu <44...@users.noreply.github.com>
AuthorDate: Fri Jul 15 16:53:57 2022 +0800

    [INLONG-5068][Sort] Fix RegexpReplace replacing uncorrectly (#5069)
---
 .../inlong/sort/function/RegexpReplaceFirstFunction.java  |  4 ++--
 ...placeFirstFunction.java => RegexpReplaceFunction.java} | 10 +++++-----
 .../apache/inlong/sort/parser/impl/FlinkSqlParser.java    |  2 ++
 .../inlong/sort/parser/impl/NativeFlinkSqlParser.java     | 15 ++++++++-------
 .../inlong/sort/function/RegexpReplaceFunctionTest.java   | 14 ++++++++------
 5 files changed, 25 insertions(+), 20 deletions(-)

diff --git a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/function/RegexpReplaceFirstFunction.java b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/function/RegexpReplaceFirstFunction.java
index 19c46f3f3..5a66247d9 100644
--- a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/function/RegexpReplaceFirstFunction.java
+++ b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/function/RegexpReplaceFirstFunction.java
@@ -20,7 +20,7 @@ package org.apache.inlong.sort.function;
 import org.apache.flink.table.functions.ScalarFunction;
 
 /**
- * RegexpReplaceFirstFunction class isIt is a custom function, used to replace the value in the string.
+ * RegexpReplaceFirstFunction class. It is a custom function, used to replace the value in the string.
  * Specifically, given a string to be replaced, a regular expression to be replaced, and the replaced target string,
  * the regular expression will be satisfied replace the first element of the formula with the target string.
  */
@@ -38,7 +38,7 @@ public class RegexpReplaceFirstFunction extends ScalarFunction {
         if (field != null) {
             return field.replaceFirst(regex, replacement);
         }
-        return field;
+        return null;
     }
 
 }
diff --git a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/function/RegexpReplaceFirstFunction.java b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/function/RegexpReplaceFunction.java
similarity index 79%
copy from inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/function/RegexpReplaceFirstFunction.java
copy to inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/function/RegexpReplaceFunction.java
index 19c46f3f3..89e186f9f 100644
--- a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/function/RegexpReplaceFirstFunction.java
+++ b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/function/RegexpReplaceFunction.java
@@ -20,11 +20,11 @@ package org.apache.inlong.sort.function;
 import org.apache.flink.table.functions.ScalarFunction;
 
 /**
- * RegexpReplaceFirstFunction class isIt is a custom function, used to replace the value in the string.
+ * RegexpReplaceFunction class. It is a custom function, used to replace the value in the string.
  * Specifically, given a string to be replaced, a regular expression to be replaced, and the replaced target string,
- * the regular expression will be satisfied replace the first element of the formula with the target string.
+ * every substring that matches the regular expression is replaced with the target string.
  */
-public class RegexpReplaceFirstFunction extends ScalarFunction {
+public class RegexpReplaceFunction extends ScalarFunction {
 
     private static final long serialVersionUID = -7185622027483662395L;
 
@@ -36,9 +36,9 @@ public class RegexpReplaceFirstFunction extends ScalarFunction {
      */
     public String eval(String field, String regex, String replacement) {
         if (field != null) {
-            return field.replaceFirst(regex, replacement);
+            return field.replaceAll(regex, replacement);
         }
-        return field;
+        return null;
     }
 
 }
diff --git a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/parser/impl/FlinkSqlParser.java b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/parser/impl/FlinkSqlParser.java
index f818a2b61..b15b1ddab 100644
--- a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/parser/impl/FlinkSqlParser.java
+++ b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/parser/impl/FlinkSqlParser.java
@@ -23,6 +23,7 @@ import org.apache.flink.table.api.TableEnvironment;
 import org.apache.inlong.sort.formats.base.TableFormatUtils;
 import org.apache.inlong.sort.formats.common.FormatInfo;
 import org.apache.inlong.sort.function.RegexpReplaceFirstFunction;
+import org.apache.inlong.sort.function.RegexpReplaceFunction;
 import org.apache.inlong.sort.parser.Parser;
 import org.apache.inlong.sort.parser.result.FlinkSqlParseResult;
 import org.apache.inlong.sort.parser.result.ParseResult;
@@ -101,6 +102,7 @@ public class FlinkSqlParser implements Parser {
      */
     private void registerUDF() {
         tableEnv.createTemporarySystemFunction("REGEXP_REPLACE_FIRST", RegexpReplaceFirstFunction.class);
+        tableEnv.createTemporarySystemFunction("REGEXP_REPLACE", RegexpReplaceFunction.class);
     }
 
     /**
diff --git a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/parser/impl/NativeFlinkSqlParser.java b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/parser/impl/NativeFlinkSqlParser.java
index bffd3d1c9..3a1cc2941 100644
--- a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/parser/impl/NativeFlinkSqlParser.java
+++ b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/parser/impl/NativeFlinkSqlParser.java
@@ -21,6 +21,7 @@ package org.apache.inlong.sort.parser.impl;
 import com.google.common.base.Preconditions;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.inlong.sort.function.RegexpReplaceFirstFunction;
+import org.apache.inlong.sort.function.RegexpReplaceFunction;
 import org.apache.inlong.sort.parser.Parser;
 import org.apache.inlong.sort.parser.result.FlinkSqlParseResult;
 import org.apache.inlong.sort.parser.result.ParseResult;
@@ -48,13 +49,6 @@ public class NativeFlinkSqlParser implements Parser {
         registerUDF();
     }
 
-    /**
-     * Register udf
-     */
-    private void registerUDF() {
-        tableEnv.createTemporarySystemFunction("REGEXP_REPLACE_FIRST", RegexpReplaceFirstFunction.class);
-    }
-
     /**
      * Get a instance of NativeFlinkSqlParser
      *
@@ -66,6 +60,13 @@ public class NativeFlinkSqlParser implements Parser {
         return new NativeFlinkSqlParser(tableEnv, statements);
     }
 
+    /**
+     * Register udf
+     */
+    private void registerUDF() {
+        tableEnv.createTemporarySystemFunction("REGEXP_REPLACE_FIRST", RegexpReplaceFirstFunction.class);
+        tableEnv.createTemporarySystemFunction("REGEXP_REPLACE", RegexpReplaceFunction.class);
+    }
 
     /**
      * parse flink sql script file
diff --git a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/function/RegexpReplaceFunctionTest.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/function/RegexpReplaceFunctionTest.java
index 35a243f9e..324c547dd 100644
--- a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/function/RegexpReplaceFunctionTest.java
+++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/function/RegexpReplaceFunctionTest.java
@@ -31,7 +31,6 @@ import org.apache.flink.util.CloseableIterator;
 import org.apache.inlong.sort.formats.common.StringFormatInfo;
 import org.apache.inlong.sort.protocol.FieldInfo;
 import org.apache.inlong.sort.protocol.transformation.StringConstantParam;
-import org.apache.inlong.sort.protocol.transformation.function.RegexpReplaceFunction;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -60,18 +59,21 @@ public class RegexpReplaceFunctionTest extends AbstractTestBase {
         env.setParallelism(1);
         env.enableCheckpointing(10000);
         StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
+        tableEnv.createTemporaryFunction("REGEXP_REPLACE", RegexpReplaceFunction.class);
         // step 1. Generate test data and convert to DataStream
         List<Row> data = new ArrayList<>();
-        data.add(Row.of("inlong is a data integration tool and inlong has been used by many companies"));
+        data.add(Row.of("111222333,111222333,111222333"));
         TypeInformation<?>[] types = {
                 BasicTypeInfo.STRING_TYPE_INFO};
         String[] names = {"f1"};
         RowTypeInfo typeInfo = new RowTypeInfo(types, names);
         DataStream<Row> dataStream = env.fromCollection(data).returns(typeInfo);
         // step 2. Convert from DataStream to Table and execute the REGEXP_REPLACE function
-        RegexpReplaceFunction regexpReplaceFunction = new RegexpReplaceFunction(new FieldInfo("f1",
-                new StringFormatInfo()), new StringConstantParam("inlong*"),
-                new StringConstantParam("INLONG"));
+        org.apache.inlong.sort.protocol.transformation.function.RegexpReplaceFunction regexpReplaceFunction =
+                new org.apache.inlong.sort.protocol.transformation.function.RegexpReplaceFunction(
+                        new FieldInfo("f1", new StringFormatInfo()),
+                        new StringConstantParam("(\\d{3})\\d*(\\d{3})"),
+                        new StringConstantParam("$1***$2"));
         Table tempView = tableEnv.fromDataStream(dataStream).as("f1");
         tableEnv.createTemporaryView("temp_view", tempView);
         String sqlQuery = String.format("SELECT %s as f1 FROM temp_view", regexpReplaceFunction.format());
@@ -85,7 +87,7 @@ public class RegexpReplaceFunctionTest extends AbstractTestBase {
             result.add(next);
         }
         // step 4. Whether the comparison results are as expected
-        String expect = "INLONG is a data integration tool and INLONG has been used by many companies";
+        String expect = "111***333,111***333,111***333";
         Assert.assertEquals(expect, result.get(0));
     }