You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/04/20 12:14:06 UTC
[incubator-inlong] branch master updated: [INLONG-3827][Sort] Add functions definition to support transform (#3850)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new eb487b070 [INLONG-3827][Sort] Add functions definition to support transform (#3850)
eb487b070 is described below
commit eb487b07061c7b1d0b0b66132a749c26e2e95fe3
Author: yunqingmoswu <44...@users.noreply.github.com>
AuthorDate: Wed Apr 20 20:14:00 2022 +0800
[INLONG-3827][Sort] Add functions definition to support transform (#3850)
Co-authored-by: yunqingmo <yu...@tencent.com>
---
.../protocol/transformation/FilterFunction.java | 7 ++
.../sort/protocol/transformation/Function.java | 24 ++++++-
.../protocol/transformation/FunctionParam.java | 24 ++++++-
.../transformation/GroupTimeWindowFunction.java | 9 +++
.../transformation/TimeWindowFunction.java | 20 +++++-
.../transformation/function/HopEndFunction.java | 72 +++++++++++++++++++
.../transformation/function/HopFunction.java | 72 +++++++++++++++++++
.../transformation/function/HopStartFunction.java | 72 +++++++++++++++++++
.../function/MultiValueFilterFunction.java | 82 ++++++++++++++++++++++
.../function/SessionEndFunction.java | 72 +++++++++++++++++++
.../transformation/function/SessionFunction.java | 72 +++++++++++++++++++
.../function/SessionStartFunction.java | 72 +++++++++++++++++++
.../function/SingleValueFilterFunction.java | 78 ++++++++++++++++++++
.../transformation/function/TumbleEndFunction.java | 72 +++++++++++++++++++
.../transformation/function/TumbleFunction.java | 72 +++++++++++++++++++
.../function/TumbleStartFunction.java | 72 +++++++++++++++++++
.../transformation/function/FunctionBaseTest.java | 65 +++++++++++++++++
.../function/HopEndFunctionTest.java | 49 +++++++++++++
.../transformation/function/HopFunctionTest.java | 49 +++++++++++++
.../function/HopStartFunctionTest.java | 49 +++++++++++++
.../function/MultiValueFilterFunctionTest.java | 49 +++++++++++++
.../function/SessionEndFunctionTest.java | 49 +++++++++++++
.../function/SessionFunctionTest.java | 49 +++++++++++++
.../function/SessionStartFunctionTest.java | 49 +++++++++++++
.../function/SingleValueFilterFunctionTest.java | 50 +++++++++++++
.../function/TumbleEndFunctionTest.java | 49 +++++++++++++
.../function/TumbleFunctionTest.java | 49 +++++++++++++
.../function/TumbleStartFunctionTest.java | 49 +++++++++++++
28 files changed, 1494 insertions(+), 3 deletions(-)
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/FilterFunction.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/FilterFunction.java
index ffcefe35f..e1c98fcf1 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/FilterFunction.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/FilterFunction.java
@@ -17,12 +17,19 @@
package org.apache.inlong.sort.protocol.transformation;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonSubTypes;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeInfo;
+import org.apache.inlong.sort.protocol.transformation.function.MultiValueFilterFunction;
+import org.apache.inlong.sort.protocol.transformation.function.SingleValueFilterFunction;
@JsonTypeInfo(
use = JsonTypeInfo.Id.NAME,
include = JsonTypeInfo.As.PROPERTY,
property = "type")
+@JsonSubTypes({
+ @JsonSubTypes.Type(value = SingleValueFilterFunction.class, name = "singleValueFilter"),
+ @JsonSubTypes.Type(value = MultiValueFilterFunction.class, name = "multiValueFilter")}
+)
public interface FilterFunction extends Function {
}
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/Function.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/Function.java
index 3fb189e06..957d06282 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/Function.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/Function.java
@@ -20,6 +20,17 @@ package org.apache.inlong.sort.protocol.transformation;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonSubTypes;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeInfo;
+import org.apache.inlong.sort.protocol.transformation.function.HopEndFunction;
+import org.apache.inlong.sort.protocol.transformation.function.HopFunction;
+import org.apache.inlong.sort.protocol.transformation.function.HopStartFunction;
+import org.apache.inlong.sort.protocol.transformation.function.MultiValueFilterFunction;
+import org.apache.inlong.sort.protocol.transformation.function.SessionEndFunction;
+import org.apache.inlong.sort.protocol.transformation.function.SessionFunction;
+import org.apache.inlong.sort.protocol.transformation.function.SessionStartFunction;
+import org.apache.inlong.sort.protocol.transformation.function.SingleValueFilterFunction;
+import org.apache.inlong.sort.protocol.transformation.function.TumbleEndFunction;
+import org.apache.inlong.sort.protocol.transformation.function.TumbleFunction;
+import org.apache.inlong.sort.protocol.transformation.function.TumbleStartFunction;
import java.util.List;
@@ -28,7 +39,18 @@ import java.util.List;
include = JsonTypeInfo.As.PROPERTY,
property = "type")
@JsonSubTypes({
- @JsonSubTypes.Type(value = WatermarkField.class, name = "watermark")
+ @JsonSubTypes.Type(value = WatermarkField.class, name = "watermark"),
+ @JsonSubTypes.Type(value = HopStartFunction.class, name = "hopStart"),
+ @JsonSubTypes.Type(value = HopEndFunction.class, name = "hopEnd"),
+ @JsonSubTypes.Type(value = TumbleStartFunction.class, name = "tumbleStart"),
+ @JsonSubTypes.Type(value = TumbleEndFunction.class, name = "tumbleEnd"),
+ @JsonSubTypes.Type(value = SessionStartFunction.class, name = "sessionStart"),
+ @JsonSubTypes.Type(value = SessionEndFunction.class, name = "sessionEnd"),
+ @JsonSubTypes.Type(value = SessionFunction.class, name = "session"),
+ @JsonSubTypes.Type(value = TumbleFunction.class, name = "tumble"),
+ @JsonSubTypes.Type(value = HopFunction.class, name = "hop"),
+ @JsonSubTypes.Type(value = SingleValueFilterFunction.class, name = "singleValueFilter"),
+ @JsonSubTypes.Type(value = MultiValueFilterFunction.class, name = "multiValueFilter")
})
public interface Function extends FunctionParam {
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/FunctionParam.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/FunctionParam.java
index 2c2e767a6..1c37fac3e 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/FunctionParam.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/FunctionParam.java
@@ -22,6 +22,17 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonSub
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeInfo;
import org.apache.inlong.sort.protocol.BuiltInFieldInfo;
import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.transformation.function.HopEndFunction;
+import org.apache.inlong.sort.protocol.transformation.function.HopFunction;
+import org.apache.inlong.sort.protocol.transformation.function.HopStartFunction;
+import org.apache.inlong.sort.protocol.transformation.function.MultiValueFilterFunction;
+import org.apache.inlong.sort.protocol.transformation.function.SessionEndFunction;
+import org.apache.inlong.sort.protocol.transformation.function.SessionFunction;
+import org.apache.inlong.sort.protocol.transformation.function.SessionStartFunction;
+import org.apache.inlong.sort.protocol.transformation.function.SingleValueFilterFunction;
+import org.apache.inlong.sort.protocol.transformation.function.TumbleEndFunction;
+import org.apache.inlong.sort.protocol.transformation.function.TumbleFunction;
+import org.apache.inlong.sort.protocol.transformation.function.TumbleStartFunction;
import org.apache.inlong.sort.protocol.transformation.operator.AndOperator;
import org.apache.inlong.sort.protocol.transformation.operator.EmptyOperator;
import org.apache.inlong.sort.protocol.transformation.operator.EqualOperator;
@@ -58,7 +69,18 @@ import org.apache.inlong.sort.protocol.transformation.operator.OrOperator;
@JsonSubTypes.Type(value = MoreThanOrEqualOperator.class, name = "moreThanOrEqual"),
@JsonSubTypes.Type(value = InOperator.class, name = "in"),
@JsonSubTypes.Type(value = NotInOperator.class, name = "notIn"),
- @JsonSubTypes.Type(value = WatermarkField.class, name = "watermark")
+ @JsonSubTypes.Type(value = WatermarkField.class, name = "watermark"),
+ @JsonSubTypes.Type(value = HopStartFunction.class, name = "hopStart"),
+ @JsonSubTypes.Type(value = HopEndFunction.class, name = "hopEnd"),
+ @JsonSubTypes.Type(value = TumbleStartFunction.class, name = "tumbleStart"),
+ @JsonSubTypes.Type(value = TumbleEndFunction.class, name = "tumbleEnd"),
+ @JsonSubTypes.Type(value = SessionStartFunction.class, name = "sessionStart"),
+ @JsonSubTypes.Type(value = SessionEndFunction.class, name = "sessionEnd"),
+ @JsonSubTypes.Type(value = SessionFunction.class, name = "session"),
+ @JsonSubTypes.Type(value = TumbleFunction.class, name = "tumble"),
+ @JsonSubTypes.Type(value = HopFunction.class, name = "hop"),
+ @JsonSubTypes.Type(value = SingleValueFilterFunction.class, name = "singleValueFilter"),
+ @JsonSubTypes.Type(value = MultiValueFilterFunction.class, name = "multiValueFilter")
})
public interface FunctionParam {
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/GroupTimeWindowFunction.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/GroupTimeWindowFunction.java
index 1c62ab381..25bf3e240 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/GroupTimeWindowFunction.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/GroupTimeWindowFunction.java
@@ -17,12 +17,21 @@
package org.apache.inlong.sort.protocol.transformation;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonSubTypes;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeInfo;
+import org.apache.inlong.sort.protocol.transformation.function.HopFunction;
+import org.apache.inlong.sort.protocol.transformation.function.SessionFunction;
+import org.apache.inlong.sort.protocol.transformation.function.TumbleFunction;
@JsonTypeInfo(
use = JsonTypeInfo.Id.NAME,
include = JsonTypeInfo.As.PROPERTY,
property = "type")
+@JsonSubTypes({
+ @JsonSubTypes.Type(value = HopFunction.class, name = "hop"),
+ @JsonSubTypes.Type(value = SessionFunction.class, name = "session"),
+ @JsonSubTypes.Type(value = TumbleFunction.class, name = "tumble")
+})
public interface GroupTimeWindowFunction extends TimeWindowFunction {
}
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/TimeWindowFunction.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/TimeWindowFunction.java
index e84eb5b63..95ec6dcf9 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/TimeWindowFunction.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/TimeWindowFunction.java
@@ -19,13 +19,31 @@ package org.apache.inlong.sort.protocol.transformation;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonSubTypes;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeInfo;
+import org.apache.inlong.sort.protocol.transformation.function.HopEndFunction;
+import org.apache.inlong.sort.protocol.transformation.function.HopFunction;
+import org.apache.inlong.sort.protocol.transformation.function.HopStartFunction;
+import org.apache.inlong.sort.protocol.transformation.function.SessionEndFunction;
+import org.apache.inlong.sort.protocol.transformation.function.SessionFunction;
+import org.apache.inlong.sort.protocol.transformation.function.SessionStartFunction;
+import org.apache.inlong.sort.protocol.transformation.function.TumbleEndFunction;
+import org.apache.inlong.sort.protocol.transformation.function.TumbleFunction;
+import org.apache.inlong.sort.protocol.transformation.function.TumbleStartFunction;
@JsonTypeInfo(
use = JsonTypeInfo.Id.NAME,
include = JsonTypeInfo.As.PROPERTY,
property = "type")
@JsonSubTypes({
- @JsonSubTypes.Type(value = WatermarkField.class, name = "watermark")
+ @JsonSubTypes.Type(value = WatermarkField.class, name = "watermark"),
+ @JsonSubTypes.Type(value = HopStartFunction.class, name = "hopStart"),
+ @JsonSubTypes.Type(value = HopEndFunction.class, name = "hopEnd"),
+ @JsonSubTypes.Type(value = TumbleStartFunction.class, name = "tumbleStart"),
+ @JsonSubTypes.Type(value = TumbleEndFunction.class, name = "tumbleEnd"),
+ @JsonSubTypes.Type(value = SessionStartFunction.class, name = "sessionStart"),
+ @JsonSubTypes.Type(value = SessionEndFunction.class, name = "sessionEnd"),
+ @JsonSubTypes.Type(value = SessionFunction.class, name = "session"),
+ @JsonSubTypes.Type(value = TumbleFunction.class, name = "tumble"),
+ @JsonSubTypes.Type(value = HopFunction.class, name = "hop")
})
public interface TimeWindowFunction extends Function {
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/function/HopEndFunction.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/function/HopEndFunction.java
new file mode 100644
index 000000000..7b1f15d67
--- /dev/null
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/function/HopEndFunction.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol.transformation.function;
+
+import com.google.common.base.Preconditions;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.transformation.ConstantParam;
+import org.apache.inlong.sort.protocol.transformation.FunctionParam;
+import org.apache.inlong.sort.protocol.transformation.TimeUnitConstantParam;
+import org.apache.inlong.sort.protocol.transformation.TimeWindowFunction;
+
+import java.util.Arrays;
+import java.util.List;
+
+@JsonTypeName("hopEnd")
+@Data
+@NoArgsConstructor
+public class HopEndFunction implements TimeWindowFunction {
+
+ @JsonProperty("timeAttr")
+ private FieldInfo timeAttr;
+ @JsonProperty("interval")
+ private ConstantParam interval;
+ @JsonProperty("timeUnit")
+ private TimeUnitConstantParam timeUnit;
+
+ @JsonCreator
+ public HopEndFunction(@JsonProperty("timeAttr") FieldInfo timeAttr,
+ @JsonProperty("interval") ConstantParam interval,
+ @JsonProperty("timeUnit") TimeUnitConstantParam timeUnit) {
+ this.timeAttr = Preconditions.checkNotNull(timeAttr, "timeAttr is null");
+ this.interval = Preconditions.checkNotNull(interval, "interval is null");
+ this.timeUnit = Preconditions.checkNotNull(timeUnit, "timeUnit is null");
+ }
+
+ @Override
+ public String format() {
+ return String.format("%s(%s, INTERVAL %s %s)", getName(),
+ timeAttr.format(), interval.getValue(), timeUnit.format());
+ }
+
+ @Override
+ public List<FunctionParam> getParams() {
+ return Arrays.asList(timeAttr, interval, timeUnit);
+ }
+
+ @Override
+ public String getName() {
+ return "HOP_END";
+ }
+
+}
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/function/HopFunction.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/function/HopFunction.java
new file mode 100644
index 000000000..a84a1ef56
--- /dev/null
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/function/HopFunction.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol.transformation.function;
+
+import com.google.common.base.Preconditions;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.transformation.ConstantParam;
+import org.apache.inlong.sort.protocol.transformation.FunctionParam;
+import org.apache.inlong.sort.protocol.transformation.GroupTimeWindowFunction;
+import org.apache.inlong.sort.protocol.transformation.TimeUnitConstantParam;
+
+import java.util.Arrays;
+import java.util.List;
+
+@JsonTypeName("hop")
+@Data
+@NoArgsConstructor
+public class HopFunction implements GroupTimeWindowFunction {
+
+ @JsonProperty("timeAttr")
+ private FieldInfo timeAttr;
+ @JsonProperty("interval")
+ private ConstantParam interval;
+ @JsonProperty("timeUnit")
+ private TimeUnitConstantParam timeUnit;
+
+ @JsonCreator
+ public HopFunction(@JsonProperty("timeAttr") FieldInfo timeAttr,
+ @JsonProperty("interval") ConstantParam interval,
+ @JsonProperty("timeUnit") TimeUnitConstantParam timeUnit) {
+ this.timeAttr = Preconditions.checkNotNull(timeAttr, "timeAttr is null");
+ this.interval = Preconditions.checkNotNull(interval, "interval is null");
+ this.timeUnit = Preconditions.checkNotNull(timeUnit, "timeUnit is null");
+ }
+
+ @Override
+ public String format() {
+ return String.format("%s(%s, INTERVAL %s %s)", getName(),
+ timeAttr.format(), interval.getValue(), timeUnit.format());
+ }
+
+ @Override
+ public List<FunctionParam> getParams() {
+ return Arrays.asList(timeAttr, interval, timeUnit);
+ }
+
+ @Override
+ public String getName() {
+ return "HOP";
+ }
+
+}
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/function/HopStartFunction.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/function/HopStartFunction.java
new file mode 100644
index 000000000..42d3694d5
--- /dev/null
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/function/HopStartFunction.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol.transformation.function;
+
+import com.google.common.base.Preconditions;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.transformation.ConstantParam;
+import org.apache.inlong.sort.protocol.transformation.FunctionParam;
+import org.apache.inlong.sort.protocol.transformation.TimeUnitConstantParam;
+import org.apache.inlong.sort.protocol.transformation.TimeWindowFunction;
+
+import java.util.Arrays;
+import java.util.List;
+
+@JsonTypeName("hopStart")
+@Data
+@NoArgsConstructor
+public class HopStartFunction implements TimeWindowFunction {
+
+ @JsonProperty("timeAttr")
+ private FieldInfo timeAttr;
+ @JsonProperty("interval")
+ private ConstantParam interval;
+ @JsonProperty("timeUnit")
+ private TimeUnitConstantParam timeUnit;
+
+ @JsonCreator
+ public HopStartFunction(@JsonProperty("timeAttr") FieldInfo timeAttr,
+ @JsonProperty("interval") ConstantParam interval,
+ @JsonProperty("timeUnit") TimeUnitConstantParam timeUnit) {
+ this.timeAttr = Preconditions.checkNotNull(timeAttr, "timeAttr is null");
+ this.interval = Preconditions.checkNotNull(interval, "interval is null");
+ this.timeUnit = Preconditions.checkNotNull(timeUnit, "timeUnit is null");
+ }
+
+ @Override
+ public String format() {
+ return String.format("%s(%s, INTERVAL %s %s)", getName(),
+ timeAttr.format(), interval.getValue(), timeUnit.format());
+ }
+
+ @Override
+ public List<FunctionParam> getParams() {
+ return Arrays.asList(timeAttr, interval, timeUnit);
+ }
+
+ @Override
+ public String getName() {
+ return "HOP_START";
+ }
+
+}
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/function/MultiValueFilterFunction.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/function/MultiValueFilterFunction.java
new file mode 100644
index 000000000..19bb7f5a1
--- /dev/null
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/function/MultiValueFilterFunction.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol.transformation.function;
+
+import com.google.common.base.Preconditions;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.inlong.sort.protocol.transformation.FilterFunction;
+import org.apache.inlong.sort.protocol.transformation.FunctionParam;
+import org.apache.inlong.sort.protocol.transformation.LogicOperator;
+import org.apache.inlong.sort.protocol.transformation.MultiValueCompareOperator;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+@JsonTypeName("multiValueFilter")
+@Data
+@NoArgsConstructor
+public class MultiValueFilterFunction implements FilterFunction {
+
+ @JsonProperty("source")
+ private FunctionParam source;
+ @JsonProperty("targets")
+ private List<FunctionParam> targets;
+ @JsonProperty("compareOperator")
+ private MultiValueCompareOperator compareOperator;
+ @JsonProperty("logicOperator")
+ private LogicOperator logicOperator;
+
+ @JsonCreator
+ public MultiValueFilterFunction(
+ @JsonProperty("source") FunctionParam source,
+ @JsonProperty("targets") List<FunctionParam> targets,
+ @JsonProperty("compareOperator") MultiValueCompareOperator compareOperator,
+ @JsonProperty("logicOperator") LogicOperator logicOperator) {
+ this.source = Preconditions.checkNotNull(source, "source is null");
+ this.targets = Preconditions.checkNotNull(targets, "targets is null");
+ Preconditions.checkState(!targets.isEmpty(), "targets is empty");
+ this.compareOperator = Preconditions.checkNotNull(compareOperator, "compareOperator is null");
+ this.logicOperator = Preconditions.checkNotNull(logicOperator, "logicOperator is null");
+ }
+
+ @Override
+ public String format() {
+ String targetStr = StringUtils
+ .join(targets.stream().map(FunctionParam::format).collect(Collectors.toList()), ",");
+ return String.format("%s %s %s (%s)", logicOperator.format(),
+ source.format(), compareOperator.format(), targetStr);
+ }
+
+ @Override
+ public List<FunctionParam> getParams() {
+ List<FunctionParam> params = Arrays.asList(logicOperator, source, compareOperator);
+ params.addAll(targets);
+ return params;
+ }
+
+ @Override
+ public String getName() {
+ return "multiValueFilter";
+ }
+}
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/function/SessionEndFunction.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/function/SessionEndFunction.java
new file mode 100644
index 000000000..3e13c7a1c
--- /dev/null
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/function/SessionEndFunction.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol.transformation.function;
+
+import com.google.common.base.Preconditions;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.transformation.ConstantParam;
+import org.apache.inlong.sort.protocol.transformation.FunctionParam;
+import org.apache.inlong.sort.protocol.transformation.TimeUnitConstantParam;
+import org.apache.inlong.sort.protocol.transformation.TimeWindowFunction;
+
+import java.util.Arrays;
+import java.util.List;
+
+@JsonTypeName("sessionEnd")
+@Data
+@NoArgsConstructor
+public class SessionEndFunction implements TimeWindowFunction {
+
+ @JsonProperty("timeAttr")
+ private FieldInfo timeAttr;
+ @JsonProperty("interval")
+ private ConstantParam interval;
+ @JsonProperty("timeUnit")
+ private TimeUnitConstantParam timeUnit;
+
+ @JsonCreator
+ public SessionEndFunction(@JsonProperty("timeAttr") FieldInfo timeAttr,
+ @JsonProperty("interval") ConstantParam interval,
+ @JsonProperty("timeUnit") TimeUnitConstantParam timeUnit) {
+ this.timeAttr = Preconditions.checkNotNull(timeAttr, "timeAttr is null");
+ this.interval = Preconditions.checkNotNull(interval, "interval is null");
+ this.timeUnit = Preconditions.checkNotNull(timeUnit, "timeUnit is null");
+ }
+
+ @Override
+ public String format() {
+ return String.format("%s(%s, INTERVAL %s %s)", getName(),
+ timeAttr.format(), interval.getValue(), timeUnit.format());
+ }
+
+ @Override
+ public List<FunctionParam> getParams() {
+ return Arrays.asList(timeAttr, interval, timeUnit);
+ }
+
+ @Override
+ public String getName() {
+ return "SESSION_END";
+ }
+
+}
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/function/SessionFunction.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/function/SessionFunction.java
new file mode 100644
index 000000000..565ba3d0d
--- /dev/null
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/function/SessionFunction.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol.transformation.function;
+
+import com.google.common.base.Preconditions;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.transformation.ConstantParam;
+import org.apache.inlong.sort.protocol.transformation.FunctionParam;
+import org.apache.inlong.sort.protocol.transformation.GroupTimeWindowFunction;
+import org.apache.inlong.sort.protocol.transformation.TimeUnitConstantParam;
+
+import java.util.Arrays;
+import java.util.List;
+
+@JsonTypeName("session")
+@Data
+@NoArgsConstructor
+public class SessionFunction implements GroupTimeWindowFunction {
+
+ @JsonProperty("timeAttr")
+ private FieldInfo timeAttr;
+ @JsonProperty("interval")
+ private ConstantParam interval;
+ @JsonProperty("timeUnit")
+ private TimeUnitConstantParam timeUnit;
+
+ @JsonCreator
+ public SessionFunction(@JsonProperty("timeAttr") FieldInfo timeAttr,
+ @JsonProperty("interval") ConstantParam interval,
+ @JsonProperty("timeUnit") TimeUnitConstantParam timeUnit) {
+ this.timeAttr = Preconditions.checkNotNull(timeAttr, "timeAttr is null");
+ this.interval = Preconditions.checkNotNull(interval, "interval is null");
+ this.timeUnit = Preconditions.checkNotNull(timeUnit, "timeUnit is null");
+ }
+
+ @Override
+ public String format() {
+ return String.format("%s(%s, INTERVAL %s %s)", getName(),
+ timeAttr.format(), interval.getValue(), timeUnit.format());
+ }
+
+ @Override
+ public List<FunctionParam> getParams() {
+ return Arrays.asList(timeAttr, interval, timeUnit);
+ }
+
+ @Override
+ public String getName() {
+ return "SESSION";
+ }
+
+}
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/function/SessionStartFunction.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/function/SessionStartFunction.java
new file mode 100644
index 000000000..8bb725b34
--- /dev/null
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/function/SessionStartFunction.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol.transformation.function;
+
+import com.google.common.base.Preconditions;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.transformation.ConstantParam;
+import org.apache.inlong.sort.protocol.transformation.FunctionParam;
+import org.apache.inlong.sort.protocol.transformation.TimeUnitConstantParam;
+import org.apache.inlong.sort.protocol.transformation.TimeWindowFunction;
+
+import java.util.Arrays;
+import java.util.List;
+
+@JsonTypeName("sessionStart")
+@Data
+@NoArgsConstructor
+public class SessionStartFunction implements TimeWindowFunction {
+
+ @JsonProperty("timeAttr")
+ private FieldInfo timeAttr;
+ @JsonProperty("interval")
+ private ConstantParam interval;
+ @JsonProperty("timeUnit")
+ private TimeUnitConstantParam timeUnit;
+
+ @JsonCreator
+ public SessionStartFunction(@JsonProperty("timeAttr") FieldInfo timeAttr,
+ @JsonProperty("interval") ConstantParam interval,
+ @JsonProperty("timeUnit") TimeUnitConstantParam timeUnit) {
+ this.timeAttr = Preconditions.checkNotNull(timeAttr, "timeAttr is null");
+ this.interval = Preconditions.checkNotNull(interval, "interval is null");
+ this.timeUnit = Preconditions.checkNotNull(timeUnit, "timeUnit is null");
+ }
+
+ @Override
+ public String format() {
+ return String.format("%s(%s, INTERVAL %s %s)", getName(),
+ timeAttr.format(), interval.getValue(), timeUnit.format());
+ }
+
+ @Override
+ public List<FunctionParam> getParams() {
+ return Arrays.asList(timeAttr, interval, timeUnit);
+ }
+
+ @Override
+ public String getName() {
+ return "SESSION_START";
+ }
+
+}
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/function/SingleValueFilterFunction.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/function/SingleValueFilterFunction.java
new file mode 100644
index 000000000..e525586fd
--- /dev/null
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/function/SingleValueFilterFunction.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol.transformation.function;
+
+import com.google.common.base.Preconditions;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.inlong.sort.protocol.transformation.FilterFunction;
+import org.apache.inlong.sort.protocol.transformation.FunctionParam;
+import org.apache.inlong.sort.protocol.transformation.LogicOperator;
+import org.apache.inlong.sort.protocol.transformation.SingleValueCompareOperator;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.List;
+
+@JsonTypeName("singleValueFilter")
+@Data
+@NoArgsConstructor
+public class SingleValueFilterFunction implements FilterFunction, Serializable {
+
+ private static final long serialVersionUID = 8953419088907830331L;
+
+ @JsonProperty("source")
+ private FunctionParam source;
+ @JsonProperty("target")
+ private FunctionParam target;
+ @JsonProperty("compareOperator")
+ private SingleValueCompareOperator compareOperator;
+ @JsonProperty("logicOperator")
+ private LogicOperator logicOperator;
+
+ @JsonCreator
+ public SingleValueFilterFunction(
+ @JsonProperty("logicOperator") LogicOperator logicOperator,
+ @JsonProperty("source") FunctionParam source,
+ @JsonProperty("compareOperator") SingleValueCompareOperator compareOperator,
+ @JsonProperty("target") FunctionParam target) {
+ this.source = Preconditions.checkNotNull(source, "source is null");
+ this.target = Preconditions.checkNotNull(target, "target is null");
+ this.compareOperator = Preconditions.checkNotNull(compareOperator, "compareOperator is null");
+ this.logicOperator = Preconditions.checkNotNull(logicOperator, "logicOperator is null");
+ }
+
+ @Override
+ public String getName() {
+ return "singleValueFilter";
+ }
+
+ @Override
+ public List<FunctionParam> getParams() {
+ return Arrays.asList(logicOperator, source, compareOperator, target);
+ }
+
+ @Override
+ public String format() {
+ return String.format("%s %s %s %s", logicOperator.format(),
+ source.format(), compareOperator.format(), target.format());
+ }
+}
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/function/TumbleEndFunction.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/function/TumbleEndFunction.java
new file mode 100644
index 000000000..289c6f626
--- /dev/null
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/function/TumbleEndFunction.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol.transformation.function;
+
+import com.google.common.base.Preconditions;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.transformation.ConstantParam;
+import org.apache.inlong.sort.protocol.transformation.FunctionParam;
+import org.apache.inlong.sort.protocol.transformation.TimeUnitConstantParam;
+import org.apache.inlong.sort.protocol.transformation.TimeWindowFunction;
+
+import java.util.Arrays;
+import java.util.List;
+
+@JsonTypeName("tumbleEnd")
+@Data
+@NoArgsConstructor
+public class TumbleEndFunction implements TimeWindowFunction {
+
+ @JsonProperty("timeAttr")
+ private FieldInfo timeAttr;
+ @JsonProperty("interval")
+ private ConstantParam interval;
+ @JsonProperty("timeUnit")
+ private TimeUnitConstantParam timeUnit;
+
+ @JsonCreator
+ public TumbleEndFunction(@JsonProperty("timeAttr") FieldInfo timeAttr,
+ @JsonProperty("interval") ConstantParam interval,
+ @JsonProperty("timeUnit") TimeUnitConstantParam timeUnit) {
+ this.timeAttr = Preconditions.checkNotNull(timeAttr, "timeAttr is null");
+ this.interval = Preconditions.checkNotNull(interval, "interval is null");
+ this.timeUnit = Preconditions.checkNotNull(timeUnit, "timeUnit is null");
+ }
+
+ @Override
+ public String format() {
+ return String.format("%s(%s, INTERVAL %s %s)", getName(),
+ timeAttr.format(), interval.getValue(), timeUnit.getValue());
+ }
+
+ @Override
+ public List<FunctionParam> getParams() {
+ return Arrays.asList(timeAttr, interval, timeUnit);
+ }
+
+ @Override
+ public String getName() {
+ return "TUMBLE_END";
+ }
+
+}
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/function/TumbleFunction.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/function/TumbleFunction.java
new file mode 100644
index 000000000..d3c580996
--- /dev/null
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/function/TumbleFunction.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol.transformation.function;
+
+import com.google.common.base.Preconditions;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.transformation.ConstantParam;
+import org.apache.inlong.sort.protocol.transformation.FunctionParam;
+import org.apache.inlong.sort.protocol.transformation.GroupTimeWindowFunction;
+import org.apache.inlong.sort.protocol.transformation.TimeUnitConstantParam;
+
+import java.util.Arrays;
+import java.util.List;
+
+@JsonTypeName("tumble")
+@Data
+@NoArgsConstructor
+public class TumbleFunction implements GroupTimeWindowFunction {
+
+ @JsonProperty("timeAttr")
+ private FieldInfo timeAttr;
+ @JsonProperty("interval")
+ private ConstantParam interval;
+ @JsonProperty("timeUnit")
+ private TimeUnitConstantParam timeUnit;
+
+ @JsonCreator
+ public TumbleFunction(@JsonProperty("timeAttr") FieldInfo timeAttr,
+ @JsonProperty("interval") ConstantParam interval,
+ @JsonProperty("timeUnit") TimeUnitConstantParam timeUnit) {
+ this.timeAttr = Preconditions.checkNotNull(timeAttr, "timeAttr is null");
+ this.interval = Preconditions.checkNotNull(interval, "interval is null");
+ this.timeUnit = Preconditions.checkNotNull(timeUnit, "timeUnit is null");
+ }
+
+ @Override
+ public String format() {
+ return String.format("%s(%s, INTERVAL %s %s)", getName(),
+ timeAttr.format(), interval.getValue(), timeUnit.format());
+ }
+
+ @Override
+ public List<FunctionParam> getParams() {
+ return Arrays.asList(timeAttr, interval, timeUnit);
+ }
+
+ @Override
+ public String getName() {
+ return "TUMBLE";
+ }
+
+}
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/function/TumbleStartFunction.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/function/TumbleStartFunction.java
new file mode 100644
index 000000000..e996dcb60
--- /dev/null
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/function/TumbleStartFunction.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol.transformation.function;
+
+import com.google.common.base.Preconditions;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.transformation.ConstantParam;
+import org.apache.inlong.sort.protocol.transformation.FunctionParam;
+import org.apache.inlong.sort.protocol.transformation.TimeUnitConstantParam;
+import org.apache.inlong.sort.protocol.transformation.TimeWindowFunction;
+
+import java.util.Arrays;
+import java.util.List;
+
+@JsonTypeName("tumbleStart")
+@Data
+@NoArgsConstructor
+public class TumbleStartFunction implements TimeWindowFunction {
+
+ @JsonProperty("timeAttr")
+ private FieldInfo timeAttr;
+ @JsonProperty("interval")
+ private ConstantParam interval;
+ @JsonProperty("timeUnit")
+ private TimeUnitConstantParam timeUnit;
+
+ @JsonCreator
+ public TumbleStartFunction(@JsonProperty("timeAttr") FieldInfo timeAttr,
+ @JsonProperty("interval") ConstantParam interval,
+ @JsonProperty("timeUnit") TimeUnitConstantParam timeUnit) {
+ this.timeAttr = Preconditions.checkNotNull(timeAttr, "timeAttr is null");
+ this.interval = Preconditions.checkNotNull(interval, "interval is null");
+ this.timeUnit = Preconditions.checkNotNull(timeUnit, "timeUnit is null");
+ }
+
+ @Override
+ public String format() {
+ return String.format("%s(%s, INTERVAL %s %s)", getName(),
+ timeAttr.format(), interval.getValue(), timeUnit.format());
+ }
+
+ @Override
+ public List<FunctionParam> getParams() {
+ return Arrays.asList(timeAttr, interval, timeUnit);
+ }
+
+ @Override
+ public String getName() {
+ return "TUMBLE_START";
+ }
+
+}
diff --git a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/FunctionBaseTest.java b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/FunctionBaseTest.java
new file mode 100644
index 000000000..14f0ad976
--- /dev/null
+++ b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/FunctionBaseTest.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol.transformation.function;
+
+import com.google.common.base.Preconditions;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.inlong.sort.protocol.transformation.Function;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public abstract class FunctionBaseTest {
+
+ private final ObjectMapper objectMapper = new ObjectMapper();
+ private String expectFormat;
+ private String expectSerializeStr;
+ private Function function;
+
+ public abstract Function getFunction();
+
+ public abstract String getExpectFormat();
+
+ public abstract String getExpectSerializeStr();
+
+ @Before
+ public void init() {
+ this.expectFormat = Preconditions.checkNotNull(getExpectFormat());
+ this.function = Preconditions.checkNotNull(getFunction());
+ this.expectSerializeStr = getExpectSerializeStr();
+ }
+
+ @Test
+ public void testSerialize() throws JsonProcessingException {
+ assertEquals(expectSerializeStr, objectMapper.writeValueAsString(function));
+ }
+
+ @Test
+ public void testDeserialize() throws JsonProcessingException {
+ ObjectMapper objectMapper = new ObjectMapper();
+ Function expected = objectMapper.readValue(expectSerializeStr, function.getClass());
+ assertEquals(expected, function);
+ }
+
+ @Test
+ public void testFormat() throws JsonProcessingException {
+ assertEquals(expectFormat, function.format());
+ }
+}
diff --git a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/HopEndFunctionTest.java b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/HopEndFunctionTest.java
new file mode 100644
index 000000000..4d9db2875
--- /dev/null
+++ b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/HopEndFunctionTest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol.transformation.function;
+
+import org.apache.inlong.sort.formats.common.TimestampFormatInfo;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.transformation.ConstantParam;
+import org.apache.inlong.sort.protocol.transformation.Function;
+import org.apache.inlong.sort.protocol.transformation.TimeUnitConstantParam;
+import org.apache.inlong.sort.protocol.transformation.TimeUnitConstantParam.TimeUnit;
+
+public class HopEndFunctionTest extends FunctionBaseTest {
+
+ @Override
+ public Function getFunction() {
+ return new HopEndFunction(new FieldInfo("time_field", new TimestampFormatInfo()),
+ new ConstantParam("1"),
+ new TimeUnitConstantParam(TimeUnit.SECOND));
+ }
+
+ @Override
+ public String getExpectFormat() {
+ return "HOP_END(`time_field`, INTERVAL 1 SECOND)";
+ }
+
+ @Override
+ public String getExpectSerializeStr() {
+ return "{\"type\":\"hopEnd\",\"timeAttr\":{\"type\":\"base\",\"name\":\"time_field\","
+ + "\"formatInfo\":{\"type\":\"timestamp\",\"format\":\"yyyy-MM-dd HH:mm:ss\"}},"
+ + "\"interval\":{\"type\":\"constant\",\"value\":\"1\"},\"timeUnit\":{\"type\":\"timeUnitConstant\","
+ + "\"timeUnit\":\"SECOND\",\"value\":\"SECOND\"}}";
+
+ }
+}
diff --git a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/HopFunctionTest.java b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/HopFunctionTest.java
new file mode 100644
index 000000000..9d268d08f
--- /dev/null
+++ b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/HopFunctionTest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol.transformation.function;
+
+import org.apache.inlong.sort.formats.common.TimestampFormatInfo;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.transformation.ConstantParam;
+import org.apache.inlong.sort.protocol.transformation.Function;
+import org.apache.inlong.sort.protocol.transformation.TimeUnitConstantParam;
+import org.apache.inlong.sort.protocol.transformation.TimeUnitConstantParam.TimeUnit;
+
+public class HopFunctionTest extends FunctionBaseTest {
+
+ @Override
+ public Function getFunction() {
+ return new HopFunction(new FieldInfo("time_field", new TimestampFormatInfo()),
+ new ConstantParam("1"),
+ new TimeUnitConstantParam(TimeUnit.SECOND));
+ }
+
+ @Override
+ public String getExpectFormat() {
+ return "HOP(`time_field`, INTERVAL 1 SECOND)";
+ }
+
+ @Override
+ public String getExpectSerializeStr() {
+ return "{\"type\":\"hop\",\"timeAttr\":{\"type\":\"base\",\"name\":\"time_field\","
+ + "\"formatInfo\":{\"type\":\"timestamp\",\"format\":\"yyyy-MM-dd HH:mm:ss\"}},"
+ + "\"interval\":{\"type\":\"constant\",\"value\":\"1\"},\"timeUnit\":{\"type\":\"timeUnitConstant\","
+ + "\"timeUnit\":\"SECOND\",\"value\":\"SECOND\"}}";
+
+ }
+}
diff --git a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/HopStartFunctionTest.java b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/HopStartFunctionTest.java
new file mode 100644
index 000000000..16a33b67e
--- /dev/null
+++ b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/HopStartFunctionTest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol.transformation.function;
+
+import org.apache.inlong.sort.formats.common.TimestampFormatInfo;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.transformation.ConstantParam;
+import org.apache.inlong.sort.protocol.transformation.Function;
+import org.apache.inlong.sort.protocol.transformation.TimeUnitConstantParam;
+import org.apache.inlong.sort.protocol.transformation.TimeUnitConstantParam.TimeUnit;
+
+public class HopStartFunctionTest extends FunctionBaseTest {
+
+ @Override
+ public Function getFunction() {
+ return new HopStartFunction(new FieldInfo("time_field", new TimestampFormatInfo()),
+ new ConstantParam("1"),
+ new TimeUnitConstantParam(TimeUnit.SECOND));
+ }
+
+ @Override
+ public String getExpectFormat() {
+ return "HOP_START(`time_field`, INTERVAL 1 SECOND)";
+ }
+
+ @Override
+ public String getExpectSerializeStr() {
+ return "{\"type\":\"hopStart\",\"timeAttr\":{\"type\":\"base\",\"name\":\"time_field\","
+ + "\"formatInfo\":{\"type\":\"timestamp\",\"format\":\"yyyy-MM-dd HH:mm:ss\"}},"
+ + "\"interval\":{\"type\":\"constant\",\"value\":\"1\"},\"timeUnit\":{\"type\":\"timeUnitConstant\","
+ + "\"timeUnit\":\"SECOND\",\"value\":\"SECOND\"}}";
+
+ }
+}
diff --git a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/MultiValueFilterFunctionTest.java b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/MultiValueFilterFunctionTest.java
new file mode 100644
index 000000000..f5858677f
--- /dev/null
+++ b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/MultiValueFilterFunctionTest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol.transformation.function;
+
+import org.apache.inlong.sort.formats.common.TimestampFormatInfo;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.transformation.ConstantParam;
+import org.apache.inlong.sort.protocol.transformation.Function;
+import org.apache.inlong.sort.protocol.transformation.TimeUnitConstantParam;
+import org.apache.inlong.sort.protocol.transformation.TimeUnitConstantParam.TimeUnit;
+
+public class MultiValueFilterFunctionTest extends FunctionBaseTest {
+
+ @Override
+ public Function getFunction() {
+ return new HopFunction(new FieldInfo("time_field", new TimestampFormatInfo()),
+ new ConstantParam("1"),
+ new TimeUnitConstantParam(TimeUnit.SECOND));
+ }
+
+ @Override
+ public String getExpectFormat() {
+ return "HOP(`time_field`, INTERVAL 1 SECOND)";
+ }
+
+ @Override
+ public String getExpectSerializeStr() {
+ return "{\"type\":\"hop\",\"timeAttr\":{\"type\":\"base\",\"name\":\"time_field\","
+ + "\"formatInfo\":{\"type\":\"timestamp\",\"format\":\"yyyy-MM-dd HH:mm:ss\"}},"
+ + "\"interval\":{\"type\":\"constant\",\"value\":\"1\"},\"timeUnit\":{\"type\":\"timeUnitConstant\","
+ + "\"timeUnit\":\"SECOND\",\"value\":\"SECOND\"}}";
+
+ }
+}
diff --git a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/SessionEndFunctionTest.java b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/SessionEndFunctionTest.java
new file mode 100644
index 000000000..c6e646b03
--- /dev/null
+++ b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/SessionEndFunctionTest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol.transformation.function;
+
+import org.apache.inlong.sort.formats.common.TimestampFormatInfo;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.transformation.ConstantParam;
+import org.apache.inlong.sort.protocol.transformation.Function;
+import org.apache.inlong.sort.protocol.transformation.TimeUnitConstantParam;
+import org.apache.inlong.sort.protocol.transformation.TimeUnitConstantParam.TimeUnit;
+
+public class SessionEndFunctionTest extends FunctionBaseTest {
+
+ @Override
+ public Function getFunction() {
+ return new SessionEndFunction(new FieldInfo("time_field", new TimestampFormatInfo()),
+ new ConstantParam("1"),
+ new TimeUnitConstantParam(TimeUnit.SECOND));
+ }
+
+ @Override
+ public String getExpectFormat() {
+ return "SESSION_END(`time_field`, INTERVAL 1 SECOND)";
+ }
+
+ @Override
+ public String getExpectSerializeStr() {
+ return "{\"type\":\"sessionEnd\",\"timeAttr\":{\"type\":\"base\",\"name\":\"time_field\","
+ + "\"formatInfo\":{\"type\":\"timestamp\",\"format\":\"yyyy-MM-dd HH:mm:ss\"}},"
+ + "\"interval\":{\"type\":\"constant\",\"value\":\"1\"},\"timeUnit\":{\"type\":\"timeUnitConstant\","
+ + "\"timeUnit\":\"SECOND\",\"value\":\"SECOND\"}}";
+
+ }
+}
diff --git a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/SessionFunctionTest.java b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/SessionFunctionTest.java
new file mode 100644
index 000000000..a5dfea513
--- /dev/null
+++ b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/SessionFunctionTest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol.transformation.function;
+
+import org.apache.inlong.sort.formats.common.TimestampFormatInfo;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.transformation.ConstantParam;
+import org.apache.inlong.sort.protocol.transformation.Function;
+import org.apache.inlong.sort.protocol.transformation.TimeUnitConstantParam;
+import org.apache.inlong.sort.protocol.transformation.TimeUnitConstantParam.TimeUnit;
+
+public class SessionFunctionTest extends FunctionBaseTest {
+
+ @Override
+ public Function getFunction() {
+ return new SessionFunction(new FieldInfo("time_field", new TimestampFormatInfo()),
+ new ConstantParam("1"),
+ new TimeUnitConstantParam(TimeUnit.SECOND));
+ }
+
+ @Override
+ public String getExpectFormat() {
+ return "SESSION(`time_field`, INTERVAL 1 SECOND)";
+ }
+
+ @Override
+ public String getExpectSerializeStr() {
+ return "{\"type\":\"session\",\"timeAttr\":{\"type\":\"base\",\"name\":\"time_field\","
+ + "\"formatInfo\":{\"type\":\"timestamp\",\"format\":\"yyyy-MM-dd HH:mm:ss\"}},"
+ + "\"interval\":{\"type\":\"constant\",\"value\":\"1\"},\"timeUnit\":{\"type\":\"timeUnitConstant\","
+ + "\"timeUnit\":\"SECOND\",\"value\":\"SECOND\"}}";
+
+ }
+}
diff --git a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/SessionStartFunctionTest.java b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/SessionStartFunctionTest.java
new file mode 100644
index 000000000..b49d6cbb6
--- /dev/null
+++ b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/SessionStartFunctionTest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol.transformation.function;
+
+import org.apache.inlong.sort.formats.common.TimestampFormatInfo;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.transformation.ConstantParam;
+import org.apache.inlong.sort.protocol.transformation.Function;
+import org.apache.inlong.sort.protocol.transformation.TimeUnitConstantParam;
+import org.apache.inlong.sort.protocol.transformation.TimeUnitConstantParam.TimeUnit;
+
+public class SessionStartFunctionTest extends FunctionBaseTest {
+
+ @Override
+ public Function getFunction() {
+ return new SessionStartFunction(new FieldInfo("time_field", new TimestampFormatInfo()),
+ new ConstantParam("1"),
+ new TimeUnitConstantParam(TimeUnit.SECOND));
+ }
+
+ @Override
+ public String getExpectFormat() {
+ return "SESSION_START(`time_field`, INTERVAL 1 SECOND)";
+ }
+
+ @Override
+ public String getExpectSerializeStr() {
+ return "{\"type\":\"sessionStart\",\"timeAttr\":{\"type\":\"base\",\"name\":\"time_field\","
+ + "\"formatInfo\":{\"type\":\"timestamp\",\"format\":\"yyyy-MM-dd HH:mm:ss\"}},"
+ + "\"interval\":{\"type\":\"constant\",\"value\":\"1\"},\"timeUnit\":{\"type\":\"timeUnitConstant\","
+ + "\"timeUnit\":\"SECOND\",\"value\":\"SECOND\"}}";
+
+ }
+}
diff --git a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/SingleValueFilterFunctionTest.java b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/SingleValueFilterFunctionTest.java
new file mode 100644
index 000000000..51299a8b8
--- /dev/null
+++ b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/SingleValueFilterFunctionTest.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol.transformation.function;
+
+import org.apache.inlong.sort.formats.common.TimestampFormatInfo;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.transformation.ConstantParam;
+import org.apache.inlong.sort.protocol.transformation.Function;
+import org.apache.inlong.sort.protocol.transformation.operator.EmptyOperator;
+import org.apache.inlong.sort.protocol.transformation.operator.EqualOperator;
+
+public class SingleValueFilterFunctionTest extends FunctionBaseTest {
+
+ @Override
+ public Function getFunction() {
+ return new SingleValueFilterFunction(EmptyOperator.getInstance(),
+ new FieldInfo("single_value_field", new TimestampFormatInfo()),
+ EqualOperator.getInstance(),
+ new ConstantParam("'123'"));
+ }
+
+ @Override
+ public String getExpectFormat() {
+ return " `single_value_field` = '123'";
+ }
+
+ @Override
+ public String getExpectSerializeStr() {
+ return "{\"type\":\"singleValueFilter\",\"logicOperator\":{\"type\":\"empty\"},"
+ + "\"source\":{\"type\":\"base\",\"name\":\"single_value_field\","
+ + "\"formatInfo\":{\"type\":\"timestamp\",\"format\":\"yyyy-MM-dd HH:mm:ss\"}},"
+ + "\"compareOperator\":{\"type\":\"equal\"},\"target\":{\"type\":\"constant\",\"value\":\"'123'\"}}";
+
+ }
+}
diff --git a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/TumbleEndFunctionTest.java b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/TumbleEndFunctionTest.java
new file mode 100644
index 000000000..51a22dcc7
--- /dev/null
+++ b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/TumbleEndFunctionTest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol.transformation.function;
+
+import org.apache.inlong.sort.formats.common.TimestampFormatInfo;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.transformation.ConstantParam;
+import org.apache.inlong.sort.protocol.transformation.Function;
+import org.apache.inlong.sort.protocol.transformation.TimeUnitConstantParam;
+import org.apache.inlong.sort.protocol.transformation.TimeUnitConstantParam.TimeUnit;
+
+public class TumbleEndFunctionTest extends FunctionBaseTest {
+
+ @Override
+ public Function getFunction() {
+ return new TumbleEndFunction(new FieldInfo("time_field", new TimestampFormatInfo()),
+ new ConstantParam("1"),
+ new TimeUnitConstantParam(TimeUnit.SECOND));
+ }
+
+ @Override
+ public String getExpectFormat() {
+ return "TUMBLE_END(`time_field`, INTERVAL 1 SECOND)";
+ }
+
+ @Override
+ public String getExpectSerializeStr() {
+ return "{\"type\":\"tumbleEnd\",\"timeAttr\":{\"type\":\"base\",\"name\":\"time_field\","
+ + "\"formatInfo\":{\"type\":\"timestamp\",\"format\":\"yyyy-MM-dd HH:mm:ss\"}},"
+ + "\"interval\":{\"type\":\"constant\",\"value\":\"1\"},\"timeUnit\":{\"type\":\"timeUnitConstant\","
+ + "\"timeUnit\":\"SECOND\",\"value\":\"SECOND\"}}";
+
+ }
+}
diff --git a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/TumbleFunctionTest.java b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/TumbleFunctionTest.java
new file mode 100644
index 000000000..befb16736
--- /dev/null
+++ b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/TumbleFunctionTest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol.transformation.function;
+
+import org.apache.inlong.sort.formats.common.TimestampFormatInfo;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.transformation.ConstantParam;
+import org.apache.inlong.sort.protocol.transformation.Function;
+import org.apache.inlong.sort.protocol.transformation.TimeUnitConstantParam;
+import org.apache.inlong.sort.protocol.transformation.TimeUnitConstantParam.TimeUnit;
+
+public class TumbleFunctionTest extends FunctionBaseTest {
+
+ @Override
+ public Function getFunction() {
+ return new TumbleFunction(new FieldInfo("time_field", new TimestampFormatInfo()),
+ new ConstantParam("1"),
+ new TimeUnitConstantParam(TimeUnit.SECOND));
+ }
+
+ @Override
+ public String getExpectFormat() {
+ return "TUMBLE(`time_field`, INTERVAL 1 SECOND)";
+ }
+
+ @Override
+ public String getExpectSerializeStr() {
+ return "{\"type\":\"tumble\",\"timeAttr\":{\"type\":\"base\",\"name\":\"time_field\","
+ + "\"formatInfo\":{\"type\":\"timestamp\",\"format\":\"yyyy-MM-dd HH:mm:ss\"}},"
+ + "\"interval\":{\"type\":\"constant\",\"value\":\"1\"},\"timeUnit\":{\"type\":\"timeUnitConstant\","
+ + "\"timeUnit\":\"SECOND\",\"value\":\"SECOND\"}}";
+
+ }
+}
diff --git a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/TumbleStartFunctionTest.java b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/TumbleStartFunctionTest.java
new file mode 100644
index 000000000..10323a281
--- /dev/null
+++ b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/TumbleStartFunctionTest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol.transformation.function;
+
+import org.apache.inlong.sort.formats.common.TimestampFormatInfo;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.transformation.ConstantParam;
+import org.apache.inlong.sort.protocol.transformation.Function;
+import org.apache.inlong.sort.protocol.transformation.TimeUnitConstantParam;
+import org.apache.inlong.sort.protocol.transformation.TimeUnitConstantParam.TimeUnit;
+
+public class TumbleStartFunctionTest extends FunctionBaseTest {
+
+ @Override
+ public Function getFunction() {
+ return new TumbleStartFunction(new FieldInfo("time_field", new TimestampFormatInfo()),
+ new ConstantParam("1"),
+ new TimeUnitConstantParam(TimeUnit.SECOND));
+ }
+
+ @Override
+ public String getExpectFormat() {
+ return "TUMBLE_START(`time_field`, INTERVAL 1 SECOND)";
+ }
+
+ @Override
+ public String getExpectSerializeStr() {
+ return "{\"type\":\"tumbleStart\",\"timeAttr\":{\"type\":\"base\",\"name\":\"time_field\","
+ + "\"formatInfo\":{\"type\":\"timestamp\",\"format\":\"yyyy-MM-dd HH:mm:ss\"}},"
+ + "\"interval\":{\"type\":\"constant\",\"value\":\"1\"},\"timeUnit\":{\"type\":\"timeUnitConstant\","
+ + "\"timeUnit\":\"SECOND\",\"value\":\"SECOND\"}}";
+
+ }
+}