You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/07/15 08:54:02 UTC
[inlong] branch master updated: [INLONG-5068][Sort] Fix RegexpReplace replacing uncorrectly (#5069)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 78d12c05b [INLONG-5068][Sort] Fix RegexpReplace replacing uncorrectly (#5069)
78d12c05b is described below
commit 78d12c05b3c7637a67817dc25877e0931d71fed6
Author: yunqingmoswu <44...@users.noreply.github.com>
AuthorDate: Fri Jul 15 16:53:57 2022 +0800
[INLONG-5068][Sort] Fix RegexpReplace replacing uncorrectly (#5069)
---
.../inlong/sort/function/RegexpReplaceFirstFunction.java | 4 ++--
...placeFirstFunction.java => RegexpReplaceFunction.java} | 10 +++++-----
.../apache/inlong/sort/parser/impl/FlinkSqlParser.java | 2 ++
.../inlong/sort/parser/impl/NativeFlinkSqlParser.java | 15 ++++++++-------
.../inlong/sort/function/RegexpReplaceFunctionTest.java | 14 ++++++++------
5 files changed, 25 insertions(+), 20 deletions(-)
diff --git a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/function/RegexpReplaceFirstFunction.java b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/function/RegexpReplaceFirstFunction.java
index 19c46f3f3..5a66247d9 100644
--- a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/function/RegexpReplaceFirstFunction.java
+++ b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/function/RegexpReplaceFirstFunction.java
@@ -20,7 +20,7 @@ package org.apache.inlong.sort.function;
import org.apache.flink.table.functions.ScalarFunction;
/**
- * RegexpReplaceFirstFunction class isIt is a custom function, used to replace the value in the string.
+ * RegexpReplaceFirstFunction class. It is a custom function, used to replace the value in the string.
* Specifically, given a string to be replaced, a regular expression to be replaced, and the replaced target string,
* the regular expression will be satisfied replace the first element of the formula with the target string.
*/
@@ -38,7 +38,7 @@ public class RegexpReplaceFirstFunction extends ScalarFunction {
if (field != null) {
return field.replaceFirst(regex, replacement);
}
- return field;
+ return null;
}
}
diff --git a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/function/RegexpReplaceFirstFunction.java b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/function/RegexpReplaceFunction.java
similarity index 79%
copy from inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/function/RegexpReplaceFirstFunction.java
copy to inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/function/RegexpReplaceFunction.java
index 19c46f3f3..89e186f9f 100644
--- a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/function/RegexpReplaceFirstFunction.java
+++ b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/function/RegexpReplaceFunction.java
@@ -20,11 +20,11 @@ package org.apache.inlong.sort.function;
import org.apache.flink.table.functions.ScalarFunction;
/**
- * RegexpReplaceFirstFunction class isIt is a custom function, used to replace the value in the string.
+ * RegexpReplaceFirstFunction class. It is a custom function, used to replace the value in the string.
* Specifically, given a string to be replaced, a regular expression to be replaced, and the replaced target string,
- * the regular expression will be satisfied replace the first element of the formula with the target string.
+ * the regular expression will be satisfied replace the all element of the formula with the target string.
*/
-public class RegexpReplaceFirstFunction extends ScalarFunction {
+public class RegexpReplaceFunction extends ScalarFunction {
private static final long serialVersionUID = -7185622027483662395L;
@@ -36,9 +36,9 @@ public class RegexpReplaceFirstFunction extends ScalarFunction {
*/
public String eval(String field, String regex, String replacement) {
if (field != null) {
- return field.replaceFirst(regex, replacement);
+ return field.replaceAll(regex, replacement);
}
- return field;
+ return null;
}
}
diff --git a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/parser/impl/FlinkSqlParser.java b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/parser/impl/FlinkSqlParser.java
index f818a2b61..b15b1ddab 100644
--- a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/parser/impl/FlinkSqlParser.java
+++ b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/parser/impl/FlinkSqlParser.java
@@ -23,6 +23,7 @@ import org.apache.flink.table.api.TableEnvironment;
import org.apache.inlong.sort.formats.base.TableFormatUtils;
import org.apache.inlong.sort.formats.common.FormatInfo;
import org.apache.inlong.sort.function.RegexpReplaceFirstFunction;
+import org.apache.inlong.sort.function.RegexpReplaceFunction;
import org.apache.inlong.sort.parser.Parser;
import org.apache.inlong.sort.parser.result.FlinkSqlParseResult;
import org.apache.inlong.sort.parser.result.ParseResult;
@@ -101,6 +102,7 @@ public class FlinkSqlParser implements Parser {
*/
private void registerUDF() {
tableEnv.createTemporarySystemFunction("REGEXP_REPLACE_FIRST", RegexpReplaceFirstFunction.class);
+ tableEnv.createTemporarySystemFunction("REGEXP_REPLACE", RegexpReplaceFunction.class);
}
/**
diff --git a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/parser/impl/NativeFlinkSqlParser.java b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/parser/impl/NativeFlinkSqlParser.java
index bffd3d1c9..3a1cc2941 100644
--- a/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/parser/impl/NativeFlinkSqlParser.java
+++ b/inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/parser/impl/NativeFlinkSqlParser.java
@@ -21,6 +21,7 @@ package org.apache.inlong.sort.parser.impl;
import com.google.common.base.Preconditions;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.inlong.sort.function.RegexpReplaceFirstFunction;
+import org.apache.inlong.sort.function.RegexpReplaceFunction;
import org.apache.inlong.sort.parser.Parser;
import org.apache.inlong.sort.parser.result.FlinkSqlParseResult;
import org.apache.inlong.sort.parser.result.ParseResult;
@@ -48,13 +49,6 @@ public class NativeFlinkSqlParser implements Parser {
registerUDF();
}
- /**
- * Register udf
- */
- private void registerUDF() {
- tableEnv.createTemporarySystemFunction("REGEXP_REPLACE_FIRST", RegexpReplaceFirstFunction.class);
- }
-
/**
* Get a instance of NativeFlinkSqlParser
*
@@ -66,6 +60,13 @@ public class NativeFlinkSqlParser implements Parser {
return new NativeFlinkSqlParser(tableEnv, statements);
}
+ /**
+ * Register udf
+ */
+ private void registerUDF() {
+ tableEnv.createTemporarySystemFunction("REGEXP_REPLACE_FIRST", RegexpReplaceFirstFunction.class);
+ tableEnv.createTemporarySystemFunction("REGEXP_REPLACE", RegexpReplaceFunction.class);
+ }
/**
* parse flink sql script file
diff --git a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/function/RegexpReplaceFunctionTest.java b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/function/RegexpReplaceFunctionTest.java
index 35a243f9e..324c547dd 100644
--- a/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/function/RegexpReplaceFunctionTest.java
+++ b/inlong-sort/sort-core/src/test/java/org/apache/inlong/sort/function/RegexpReplaceFunctionTest.java
@@ -31,7 +31,6 @@ import org.apache.flink.util.CloseableIterator;
import org.apache.inlong.sort.formats.common.StringFormatInfo;
import org.apache.inlong.sort.protocol.FieldInfo;
import org.apache.inlong.sort.protocol.transformation.StringConstantParam;
-import org.apache.inlong.sort.protocol.transformation.function.RegexpReplaceFunction;
import org.junit.Assert;
import org.junit.Test;
@@ -60,18 +59,21 @@ public class RegexpReplaceFunctionTest extends AbstractTestBase {
env.setParallelism(1);
env.enableCheckpointing(10000);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
+ tableEnv.createTemporaryFunction("REGEXP_REPLACE", RegexpReplaceFunction.class);
// step 1. Generate test data and convert to DataStream
List<Row> data = new ArrayList<>();
- data.add(Row.of("inlong is a data integration tool and inlong has been used by many companies"));
+ data.add(Row.of("111222333,111222333,111222333"));
TypeInformation<?>[] types = {
BasicTypeInfo.STRING_TYPE_INFO};
String[] names = {"f1"};
RowTypeInfo typeInfo = new RowTypeInfo(types, names);
DataStream<Row> dataStream = env.fromCollection(data).returns(typeInfo);
// step 2. Convert from DataStream to Table and execute the REGEXP_REPLACE function
- RegexpReplaceFunction regexpReplaceFunction = new RegexpReplaceFunction(new FieldInfo("f1",
- new StringFormatInfo()), new StringConstantParam("inlong*"),
- new StringConstantParam("INLONG"));
+ org.apache.inlong.sort.protocol.transformation.function.RegexpReplaceFunction regexpReplaceFunction =
+ new org.apache.inlong.sort.protocol.transformation.function.RegexpReplaceFunction(
+ new FieldInfo("f1", new StringFormatInfo()),
+ new StringConstantParam("(\\d{3})\\d*(\\d{3})"),
+ new StringConstantParam("$1***$2"));
Table tempView = tableEnv.fromDataStream(dataStream).as("f1");
tableEnv.createTemporaryView("temp_view", tempView);
String sqlQuery = String.format("SELECT %s as f1 FROM temp_view", regexpReplaceFunction.format());
@@ -85,7 +87,7 @@ public class RegexpReplaceFunctionTest extends AbstractTestBase {
result.add(next);
}
// step 4. Whether the comparison results are as expected
- String expect = "INLONG is a data integration tool and INLONG has been used by many companies";
+ String expect = "111***333,111***333,111***333";
Assert.assertEquals(expect, result.get(0));
}