You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/04/20 12:14:06 UTC

[incubator-inlong] branch master updated: [INLONG-3827][Sort] Add functions definition to support transform (#3850)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new eb487b070 [INLONG-3827][Sort] Add functions definition to support transform (#3850)
eb487b070 is described below

commit eb487b07061c7b1d0b0b66132a749c26e2e95fe3
Author: yunqingmoswu <44...@users.noreply.github.com>
AuthorDate: Wed Apr 20 20:14:00 2022 +0800

    [INLONG-3827][Sort] Add functions definition to support transform (#3850)
    
    Co-authored-by: yunqingmo <yu...@tencent.com>
---
 .../protocol/transformation/FilterFunction.java    |  7 ++
 .../sort/protocol/transformation/Function.java     | 24 ++++++-
 .../protocol/transformation/FunctionParam.java     | 24 ++++++-
 .../transformation/GroupTimeWindowFunction.java    |  9 +++
 .../transformation/TimeWindowFunction.java         | 20 +++++-
 .../transformation/function/HopEndFunction.java    | 72 +++++++++++++++++++
 .../transformation/function/HopFunction.java       | 72 +++++++++++++++++++
 .../transformation/function/HopStartFunction.java  | 72 +++++++++++++++++++
 .../function/MultiValueFilterFunction.java         | 82 ++++++++++++++++++++++
 .../function/SessionEndFunction.java               | 72 +++++++++++++++++++
 .../transformation/function/SessionFunction.java   | 72 +++++++++++++++++++
 .../function/SessionStartFunction.java             | 72 +++++++++++++++++++
 .../function/SingleValueFilterFunction.java        | 78 ++++++++++++++++++++
 .../transformation/function/TumbleEndFunction.java | 72 +++++++++++++++++++
 .../transformation/function/TumbleFunction.java    | 72 +++++++++++++++++++
 .../function/TumbleStartFunction.java              | 72 +++++++++++++++++++
 .../transformation/function/FunctionBaseTest.java  | 65 +++++++++++++++++
 .../function/HopEndFunctionTest.java               | 49 +++++++++++++
 .../transformation/function/HopFunctionTest.java   | 49 +++++++++++++
 .../function/HopStartFunctionTest.java             | 49 +++++++++++++
 .../function/MultiValueFilterFunctionTest.java     | 49 +++++++++++++
 .../function/SessionEndFunctionTest.java           | 49 +++++++++++++
 .../function/SessionFunctionTest.java              | 49 +++++++++++++
 .../function/SessionStartFunctionTest.java         | 49 +++++++++++++
 .../function/SingleValueFilterFunctionTest.java    | 50 +++++++++++++
 .../function/TumbleEndFunctionTest.java            | 49 +++++++++++++
 .../function/TumbleFunctionTest.java               | 49 +++++++++++++
 .../function/TumbleStartFunctionTest.java          | 49 +++++++++++++
 28 files changed, 1494 insertions(+), 3 deletions(-)

diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/FilterFunction.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/FilterFunction.java
index ffcefe35f..e1c98fcf1 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/FilterFunction.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/FilterFunction.java
@@ -17,12 +17,19 @@
 
 package org.apache.inlong.sort.protocol.transformation;
 
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonSubTypes;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeInfo;
+import org.apache.inlong.sort.protocol.transformation.function.MultiValueFilterFunction;
+import org.apache.inlong.sort.protocol.transformation.function.SingleValueFilterFunction;
 
 @JsonTypeInfo(
         use = JsonTypeInfo.Id.NAME,
         include = JsonTypeInfo.As.PROPERTY,
         property = "type")
+@JsonSubTypes({
+        @JsonSubTypes.Type(value = SingleValueFilterFunction.class, name = "singleValueFilter"),
+        @JsonSubTypes.Type(value = MultiValueFilterFunction.class, name = "multiValueFilter")}
+)
 public interface FilterFunction extends Function {
 
 }
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/Function.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/Function.java
index 3fb189e06..957d06282 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/Function.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/Function.java
@@ -20,6 +20,17 @@ package org.apache.inlong.sort.protocol.transformation;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonSubTypes;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeInfo;
+import org.apache.inlong.sort.protocol.transformation.function.HopEndFunction;
+import org.apache.inlong.sort.protocol.transformation.function.HopFunction;
+import org.apache.inlong.sort.protocol.transformation.function.HopStartFunction;
+import org.apache.inlong.sort.protocol.transformation.function.MultiValueFilterFunction;
+import org.apache.inlong.sort.protocol.transformation.function.SessionEndFunction;
+import org.apache.inlong.sort.protocol.transformation.function.SessionFunction;
+import org.apache.inlong.sort.protocol.transformation.function.SessionStartFunction;
+import org.apache.inlong.sort.protocol.transformation.function.SingleValueFilterFunction;
+import org.apache.inlong.sort.protocol.transformation.function.TumbleEndFunction;
+import org.apache.inlong.sort.protocol.transformation.function.TumbleFunction;
+import org.apache.inlong.sort.protocol.transformation.function.TumbleStartFunction;
 
 import java.util.List;
 
@@ -28,7 +39,18 @@ import java.util.List;
         include = JsonTypeInfo.As.PROPERTY,
         property = "type")
 @JsonSubTypes({
-        @JsonSubTypes.Type(value = WatermarkField.class, name = "watermark")
+        @JsonSubTypes.Type(value = WatermarkField.class, name = "watermark"),
+        @JsonSubTypes.Type(value = HopStartFunction.class, name = "hopStart"),
+        @JsonSubTypes.Type(value = HopEndFunction.class, name = "hopEnd"),
+        @JsonSubTypes.Type(value = TumbleStartFunction.class, name = "tumbleStart"),
+        @JsonSubTypes.Type(value = TumbleEndFunction.class, name = "tumbleEnd"),
+        @JsonSubTypes.Type(value = SessionStartFunction.class, name = "sessionStart"),
+        @JsonSubTypes.Type(value = SessionEndFunction.class, name = "sessionEnd"),
+        @JsonSubTypes.Type(value = SessionFunction.class, name = "session"),
+        @JsonSubTypes.Type(value = TumbleFunction.class, name = "tumble"),
+        @JsonSubTypes.Type(value = HopFunction.class, name = "hop"),
+        @JsonSubTypes.Type(value = SingleValueFilterFunction.class, name = "singleValueFilter"),
+        @JsonSubTypes.Type(value = MultiValueFilterFunction.class, name = "multiValueFilter")
 })
 public interface Function extends FunctionParam {
 
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/FunctionParam.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/FunctionParam.java
index 2c2e767a6..1c37fac3e 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/FunctionParam.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/FunctionParam.java
@@ -22,6 +22,17 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonSub
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeInfo;
 import org.apache.inlong.sort.protocol.BuiltInFieldInfo;
 import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.transformation.function.HopEndFunction;
+import org.apache.inlong.sort.protocol.transformation.function.HopFunction;
+import org.apache.inlong.sort.protocol.transformation.function.HopStartFunction;
+import org.apache.inlong.sort.protocol.transformation.function.MultiValueFilterFunction;
+import org.apache.inlong.sort.protocol.transformation.function.SessionEndFunction;
+import org.apache.inlong.sort.protocol.transformation.function.SessionFunction;
+import org.apache.inlong.sort.protocol.transformation.function.SessionStartFunction;
+import org.apache.inlong.sort.protocol.transformation.function.SingleValueFilterFunction;
+import org.apache.inlong.sort.protocol.transformation.function.TumbleEndFunction;
+import org.apache.inlong.sort.protocol.transformation.function.TumbleFunction;
+import org.apache.inlong.sort.protocol.transformation.function.TumbleStartFunction;
 import org.apache.inlong.sort.protocol.transformation.operator.AndOperator;
 import org.apache.inlong.sort.protocol.transformation.operator.EmptyOperator;
 import org.apache.inlong.sort.protocol.transformation.operator.EqualOperator;
@@ -58,7 +69,18 @@ import org.apache.inlong.sort.protocol.transformation.operator.OrOperator;
         @JsonSubTypes.Type(value = MoreThanOrEqualOperator.class, name = "moreThanOrEqual"),
         @JsonSubTypes.Type(value = InOperator.class, name = "in"),
         @JsonSubTypes.Type(value = NotInOperator.class, name = "notIn"),
-        @JsonSubTypes.Type(value = WatermarkField.class, name = "watermark")
+        @JsonSubTypes.Type(value = WatermarkField.class, name = "watermark"),
+        @JsonSubTypes.Type(value = HopStartFunction.class, name = "hopStart"),
+        @JsonSubTypes.Type(value = HopEndFunction.class, name = "hopEnd"),
+        @JsonSubTypes.Type(value = TumbleStartFunction.class, name = "tumbleStart"),
+        @JsonSubTypes.Type(value = TumbleEndFunction.class, name = "tumbleEnd"),
+        @JsonSubTypes.Type(value = SessionStartFunction.class, name = "sessionStart"),
+        @JsonSubTypes.Type(value = SessionEndFunction.class, name = "sessionEnd"),
+        @JsonSubTypes.Type(value = SessionFunction.class, name = "session"),
+        @JsonSubTypes.Type(value = TumbleFunction.class, name = "tumble"),
+        @JsonSubTypes.Type(value = HopFunction.class, name = "hop"),
+        @JsonSubTypes.Type(value = SingleValueFilterFunction.class, name = "singleValueFilter"),
+        @JsonSubTypes.Type(value = MultiValueFilterFunction.class, name = "multiValueFilter")
 })
 public interface FunctionParam {
 
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/GroupTimeWindowFunction.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/GroupTimeWindowFunction.java
index 1c62ab381..25bf3e240 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/GroupTimeWindowFunction.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/GroupTimeWindowFunction.java
@@ -17,12 +17,21 @@
 
 package org.apache.inlong.sort.protocol.transformation;
 
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonSubTypes;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeInfo;
+import org.apache.inlong.sort.protocol.transformation.function.HopFunction;
+import org.apache.inlong.sort.protocol.transformation.function.SessionFunction;
+import org.apache.inlong.sort.protocol.transformation.function.TumbleFunction;
 
 @JsonTypeInfo(
         use = JsonTypeInfo.Id.NAME,
         include = JsonTypeInfo.As.PROPERTY,
         property = "type")
+@JsonSubTypes({
+        @JsonSubTypes.Type(value = HopFunction.class, name = "hop"),
+        @JsonSubTypes.Type(value = SessionFunction.class, name = "session"),
+        @JsonSubTypes.Type(value = TumbleFunction.class, name = "tumble")
+})
 public interface GroupTimeWindowFunction extends TimeWindowFunction {
 
 }
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/TimeWindowFunction.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/TimeWindowFunction.java
index e84eb5b63..95ec6dcf9 100644
--- a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/TimeWindowFunction.java
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/TimeWindowFunction.java
@@ -19,13 +19,31 @@ package org.apache.inlong.sort.protocol.transformation;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonSubTypes;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeInfo;
+import org.apache.inlong.sort.protocol.transformation.function.HopEndFunction;
+import org.apache.inlong.sort.protocol.transformation.function.HopFunction;
+import org.apache.inlong.sort.protocol.transformation.function.HopStartFunction;
+import org.apache.inlong.sort.protocol.transformation.function.SessionEndFunction;
+import org.apache.inlong.sort.protocol.transformation.function.SessionFunction;
+import org.apache.inlong.sort.protocol.transformation.function.SessionStartFunction;
+import org.apache.inlong.sort.protocol.transformation.function.TumbleEndFunction;
+import org.apache.inlong.sort.protocol.transformation.function.TumbleFunction;
+import org.apache.inlong.sort.protocol.transformation.function.TumbleStartFunction;
 
 @JsonTypeInfo(
         use = JsonTypeInfo.Id.NAME,
         include = JsonTypeInfo.As.PROPERTY,
         property = "type")
 @JsonSubTypes({
-        @JsonSubTypes.Type(value = WatermarkField.class, name = "watermark")
+        @JsonSubTypes.Type(value = WatermarkField.class, name = "watermark"),
+        @JsonSubTypes.Type(value = HopStartFunction.class, name = "hopStart"),
+        @JsonSubTypes.Type(value = HopEndFunction.class, name = "hopEnd"),
+        @JsonSubTypes.Type(value = TumbleStartFunction.class, name = "tumbleStart"),
+        @JsonSubTypes.Type(value = TumbleEndFunction.class, name = "tumbleEnd"),
+        @JsonSubTypes.Type(value = SessionStartFunction.class, name = "sessionStart"),
+        @JsonSubTypes.Type(value = SessionEndFunction.class, name = "sessionEnd"),
+        @JsonSubTypes.Type(value = SessionFunction.class, name = "session"),
+        @JsonSubTypes.Type(value = TumbleFunction.class, name = "tumble"),
+        @JsonSubTypes.Type(value = HopFunction.class, name = "hop")
 })
 public interface TimeWindowFunction extends Function {
 
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/function/HopEndFunction.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/function/HopEndFunction.java
new file mode 100644
index 000000000..7b1f15d67
--- /dev/null
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/function/HopEndFunction.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol.transformation.function;
+
+import com.google.common.base.Preconditions;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.transformation.ConstantParam;
+import org.apache.inlong.sort.protocol.transformation.FunctionParam;
+import org.apache.inlong.sort.protocol.transformation.TimeUnitConstantParam;
+import org.apache.inlong.sort.protocol.transformation.TimeWindowFunction;
+
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Time window function whose SQL name is {@code HOP_END}.
+ *
+ * <p>It is rendered as {@code HOP_END(<timeAttr>, INTERVAL <interval> <timeUnit>)}
+ * and is registered for JSON (de)serialization under the type name {@code hopEnd}.</p>
+ */
+@JsonTypeName("hopEnd")
+@Data
+@NoArgsConstructor
+public class HopEndFunction implements TimeWindowFunction {
+
+    @JsonProperty("timeAttr")
+    private FieldInfo timeAttr;
+    @JsonProperty("interval")
+    private ConstantParam interval;
+    @JsonProperty("timeUnit")
+    private TimeUnitConstantParam timeUnit;
+
+    /**
+     * Creates the function from its three mandatory parts.
+     *
+     * @param timeAttr the time attribute field, must not be null
+     * @param interval the interval constant, must not be null
+     * @param timeUnit the unit the interval is expressed in, must not be null
+     * @throws NullPointerException if any argument is null
+     */
+    @JsonCreator
+    public HopEndFunction(@JsonProperty("timeAttr") FieldInfo timeAttr,
+            @JsonProperty("interval") ConstantParam interval,
+            @JsonProperty("timeUnit") TimeUnitConstantParam timeUnit) {
+        // Validate everything up front, in declaration order, before touching any field.
+        Preconditions.checkNotNull(timeAttr, "timeAttr is null");
+        Preconditions.checkNotNull(interval, "interval is null");
+        Preconditions.checkNotNull(timeUnit, "timeUnit is null");
+        this.timeAttr = timeAttr;
+        this.interval = interval;
+        this.timeUnit = timeUnit;
+    }
+
+    /**
+     * Renders this function call as text.
+     *
+     * @return {@code HOP_END(<timeAttr>, INTERVAL <interval value> <timeUnit>)}
+     */
+    @Override
+    public String format() {
+        final String functionName = getName();
+        final String attribute = timeAttr.format();
+        // The raw value of the interval is used here, not its format() output.
+        final Object amount = interval.getValue();
+        final String unit = timeUnit.format();
+        return String.format("%s(%s, INTERVAL %s %s)", functionName, attribute, amount, unit);
+    }
+
+    /**
+     * Lists the parameters of this function.
+     *
+     * @return a fixed-size list holding the time attribute, the interval and the time unit, in that order
+     */
+    @Override
+    public List<FunctionParam> getParams() {
+        return Arrays.<FunctionParam>asList(timeAttr, interval, timeUnit);
+    }
+
+    /**
+     * @return the constant name {@code HOP_END}
+     */
+    @Override
+    public String getName() {
+        return "HOP_END";
+    }
+
+}
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/function/HopFunction.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/function/HopFunction.java
new file mode 100644
index 000000000..a84a1ef56
--- /dev/null
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/function/HopFunction.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol.transformation.function;
+
+import com.google.common.base.Preconditions;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.transformation.ConstantParam;
+import org.apache.inlong.sort.protocol.transformation.FunctionParam;
+import org.apache.inlong.sort.protocol.transformation.GroupTimeWindowFunction;
+import org.apache.inlong.sort.protocol.transformation.TimeUnitConstantParam;
+
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Group time window function whose SQL name is {@code HOP}.
+ *
+ * <p>It is rendered as {@code HOP(<timeAttr>, INTERVAL <interval> <timeUnit>)}
+ * and is registered for JSON (de)serialization under the type name {@code hop}.</p>
+ */
+@JsonTypeName("hop")
+@Data
+@NoArgsConstructor
+public class HopFunction implements GroupTimeWindowFunction {
+
+    @JsonProperty("timeAttr")
+    private FieldInfo timeAttr;
+    @JsonProperty("interval")
+    private ConstantParam interval;
+    @JsonProperty("timeUnit")
+    private TimeUnitConstantParam timeUnit;
+
+    /**
+     * Creates the function from its three mandatory parts.
+     *
+     * @param timeAttr the time attribute field, must not be null
+     * @param interval the interval constant, must not be null
+     * @param timeUnit the unit the interval is expressed in, must not be null
+     * @throws NullPointerException if any argument is null
+     */
+    @JsonCreator
+    public HopFunction(@JsonProperty("timeAttr") FieldInfo timeAttr,
+            @JsonProperty("interval") ConstantParam interval,
+            @JsonProperty("timeUnit") TimeUnitConstantParam timeUnit) {
+        // Validate everything up front, in declaration order, before touching any field.
+        Preconditions.checkNotNull(timeAttr, "timeAttr is null");
+        Preconditions.checkNotNull(interval, "interval is null");
+        Preconditions.checkNotNull(timeUnit, "timeUnit is null");
+        this.timeAttr = timeAttr;
+        this.interval = interval;
+        this.timeUnit = timeUnit;
+    }
+
+    /**
+     * Renders this function call as text.
+     *
+     * @return {@code HOP(<timeAttr>, INTERVAL <interval value> <timeUnit>)}
+     */
+    @Override
+    public String format() {
+        final String functionName = getName();
+        final String attribute = timeAttr.format();
+        // The raw value of the interval is used here, not its format() output.
+        final Object amount = interval.getValue();
+        final String unit = timeUnit.format();
+        return String.format("%s(%s, INTERVAL %s %s)", functionName, attribute, amount, unit);
+    }
+
+    /**
+     * Lists the parameters of this function.
+     *
+     * @return a fixed-size list holding the time attribute, the interval and the time unit, in that order
+     */
+    @Override
+    public List<FunctionParam> getParams() {
+        return Arrays.<FunctionParam>asList(timeAttr, interval, timeUnit);
+    }
+
+    /**
+     * @return the constant name {@code HOP}
+     */
+    @Override
+    public String getName() {
+        return "HOP";
+    }
+
+}
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/function/HopStartFunction.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/function/HopStartFunction.java
new file mode 100644
index 000000000..42d3694d5
--- /dev/null
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/function/HopStartFunction.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol.transformation.function;
+
+import com.google.common.base.Preconditions;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.transformation.ConstantParam;
+import org.apache.inlong.sort.protocol.transformation.FunctionParam;
+import org.apache.inlong.sort.protocol.transformation.TimeUnitConstantParam;
+import org.apache.inlong.sort.protocol.transformation.TimeWindowFunction;
+
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Time window function whose SQL name is {@code HOP_START}.
+ *
+ * <p>It is rendered as {@code HOP_START(<timeAttr>, INTERVAL <interval> <timeUnit>)}
+ * and is registered for JSON (de)serialization under the type name {@code hopStart}.</p>
+ */
+@JsonTypeName("hopStart")
+@Data
+@NoArgsConstructor
+public class HopStartFunction implements TimeWindowFunction {
+
+    @JsonProperty("timeAttr")
+    private FieldInfo timeAttr;
+    @JsonProperty("interval")
+    private ConstantParam interval;
+    @JsonProperty("timeUnit")
+    private TimeUnitConstantParam timeUnit;
+
+    /**
+     * Creates the function from its three mandatory parts.
+     *
+     * @param timeAttr the time attribute field, must not be null
+     * @param interval the interval constant, must not be null
+     * @param timeUnit the unit the interval is expressed in, must not be null
+     * @throws NullPointerException if any argument is null
+     */
+    @JsonCreator
+    public HopStartFunction(@JsonProperty("timeAttr") FieldInfo timeAttr,
+            @JsonProperty("interval") ConstantParam interval,
+            @JsonProperty("timeUnit") TimeUnitConstantParam timeUnit) {
+        // Validate everything up front, in declaration order, before touching any field.
+        Preconditions.checkNotNull(timeAttr, "timeAttr is null");
+        Preconditions.checkNotNull(interval, "interval is null");
+        Preconditions.checkNotNull(timeUnit, "timeUnit is null");
+        this.timeAttr = timeAttr;
+        this.interval = interval;
+        this.timeUnit = timeUnit;
+    }
+
+    /**
+     * Renders this function call as text.
+     *
+     * @return {@code HOP_START(<timeAttr>, INTERVAL <interval value> <timeUnit>)}
+     */
+    @Override
+    public String format() {
+        final String functionName = getName();
+        final String attribute = timeAttr.format();
+        // The raw value of the interval is used here, not its format() output.
+        final Object amount = interval.getValue();
+        final String unit = timeUnit.format();
+        return String.format("%s(%s, INTERVAL %s %s)", functionName, attribute, amount, unit);
+    }
+
+    /**
+     * Lists the parameters of this function.
+     *
+     * @return a fixed-size list holding the time attribute, the interval and the time unit, in that order
+     */
+    @Override
+    public List<FunctionParam> getParams() {
+        return Arrays.<FunctionParam>asList(timeAttr, interval, timeUnit);
+    }
+
+    /**
+     * @return the constant name {@code HOP_START}
+     */
+    @Override
+    public String getName() {
+        return "HOP_START";
+    }
+
+}
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/function/MultiValueFilterFunction.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/function/MultiValueFilterFunction.java
new file mode 100644
index 000000000..19bb7f5a1
--- /dev/null
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/function/MultiValueFilterFunction.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol.transformation.function;
+
+import com.google.common.base.Preconditions;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.inlong.sort.protocol.transformation.FilterFunction;
+import org.apache.inlong.sort.protocol.transformation.FunctionParam;
+import org.apache.inlong.sort.protocol.transformation.LogicOperator;
+import org.apache.inlong.sort.protocol.transformation.MultiValueCompareOperator;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Filter function that relates one source parameter to a list of target parameters.
+ *
+ * <p>It is rendered as
+ * {@code <logicOperator> <source> <compareOperator> (<target1>,<target2>,...)}
+ * and is registered for JSON (de)serialization under the type name {@code multiValueFilter}.</p>
+ */
+@JsonTypeName("multiValueFilter")
+@Data
+@NoArgsConstructor
+public class MultiValueFilterFunction implements FilterFunction {
+
+    @JsonProperty("source")
+    private FunctionParam source;
+    @JsonProperty("targets")
+    private List<FunctionParam> targets;
+    @JsonProperty("compareOperator")
+    private MultiValueCompareOperator compareOperator;
+    @JsonProperty("logicOperator")
+    private LogicOperator logicOperator;
+
+    /**
+     * Creates the filter from its four mandatory parts.
+     *
+     * @param source the parameter on the left-hand side of the comparison, must not be null
+     * @param targets the parameters on the right-hand side, must be neither null nor empty
+     * @param compareOperator the operator placed between source and targets, must not be null
+     * @param logicOperator the operator placed in front of the whole expression, must not be null
+     * @throws NullPointerException if any argument is null
+     * @throws IllegalStateException if {@code targets} is empty
+     */
+    @JsonCreator
+    public MultiValueFilterFunction(
+            @JsonProperty("source") FunctionParam source,
+            @JsonProperty("targets") List<FunctionParam> targets,
+            @JsonProperty("compareOperator") MultiValueCompareOperator compareOperator,
+            @JsonProperty("logicOperator") LogicOperator logicOperator) {
+        this.source = Preconditions.checkNotNull(source, "source is null");
+        this.targets = Preconditions.checkNotNull(targets, "targets is null");
+        Preconditions.checkState(!targets.isEmpty(), "targets is empty");
+        this.compareOperator = Preconditions.checkNotNull(compareOperator, "compareOperator is null");
+        this.logicOperator = Preconditions.checkNotNull(logicOperator, "logicOperator is null");
+    }
+
+    /**
+     * Renders this filter as text.
+     *
+     * @return {@code <logicOperator> <source> <compareOperator> (<targets joined by ",">)}
+     */
+    @Override
+    public String format() {
+        String targetStr = StringUtils
+                .join(targets.stream().map(FunctionParam::format).collect(Collectors.toList()), ",");
+        return String.format("%s %s %s (%s)", logicOperator.format(),
+                source.format(), compareOperator.format(), targetStr);
+    }
+
+    /**
+     * Lists the parameters in the order they appear in {@link #format()}:
+     * logic operator, source, compare operator, then every target.
+     *
+     * @return a newly created list; modifying it does not affect this function
+     */
+    @Override
+    public List<FunctionParam> getParams() {
+        // Arrays.asList yields a fixed-size list whose addAll throws
+        // UnsupportedOperationException, so the targets are appended to a growable list instead.
+        List<FunctionParam> params = new ArrayList<>(targets.size() + 3);
+        params.add(logicOperator);
+        params.add(source);
+        params.add(compareOperator);
+        params.addAll(targets);
+        return params;
+    }
+
+    /**
+     * @return the constant name {@code multiValueFilter}
+     */
+    @Override
+    public String getName() {
+        return "multiValueFilter";
+    }
+}
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/function/SessionEndFunction.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/function/SessionEndFunction.java
new file mode 100644
index 000000000..3e13c7a1c
--- /dev/null
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/function/SessionEndFunction.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol.transformation.function;
+
+import com.google.common.base.Preconditions;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.transformation.ConstantParam;
+import org.apache.inlong.sort.protocol.transformation.FunctionParam;
+import org.apache.inlong.sort.protocol.transformation.TimeUnitConstantParam;
+import org.apache.inlong.sort.protocol.transformation.TimeWindowFunction;
+
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Time window function whose SQL name is {@code SESSION_END}.
+ *
+ * <p>It is rendered as {@code SESSION_END(<timeAttr>, INTERVAL <interval> <timeUnit>)}
+ * and is registered for JSON (de)serialization under the type name {@code sessionEnd}.</p>
+ */
+@JsonTypeName("sessionEnd")
+@Data
+@NoArgsConstructor
+public class SessionEndFunction implements TimeWindowFunction {
+
+    @JsonProperty("timeAttr")
+    private FieldInfo timeAttr;
+    @JsonProperty("interval")
+    private ConstantParam interval;
+    @JsonProperty("timeUnit")
+    private TimeUnitConstantParam timeUnit;
+
+    /**
+     * Creates the function from its three mandatory parts.
+     *
+     * @param timeAttr the time attribute field, must not be null
+     * @param interval the interval constant, must not be null
+     * @param timeUnit the unit the interval is expressed in, must not be null
+     * @throws NullPointerException if any argument is null
+     */
+    @JsonCreator
+    public SessionEndFunction(@JsonProperty("timeAttr") FieldInfo timeAttr,
+            @JsonProperty("interval") ConstantParam interval,
+            @JsonProperty("timeUnit") TimeUnitConstantParam timeUnit) {
+        // Validate everything up front, in declaration order, before touching any field.
+        Preconditions.checkNotNull(timeAttr, "timeAttr is null");
+        Preconditions.checkNotNull(interval, "interval is null");
+        Preconditions.checkNotNull(timeUnit, "timeUnit is null");
+        this.timeAttr = timeAttr;
+        this.interval = interval;
+        this.timeUnit = timeUnit;
+    }
+
+    /**
+     * Renders this function call as text.
+     *
+     * @return {@code SESSION_END(<timeAttr>, INTERVAL <interval value> <timeUnit>)}
+     */
+    @Override
+    public String format() {
+        final String functionName = getName();
+        final String attribute = timeAttr.format();
+        // The raw value of the interval is used here, not its format() output.
+        final Object amount = interval.getValue();
+        final String unit = timeUnit.format();
+        return String.format("%s(%s, INTERVAL %s %s)", functionName, attribute, amount, unit);
+    }
+
+    /**
+     * Lists the parameters of this function.
+     *
+     * @return a fixed-size list holding the time attribute, the interval and the time unit, in that order
+     */
+    @Override
+    public List<FunctionParam> getParams() {
+        return Arrays.<FunctionParam>asList(timeAttr, interval, timeUnit);
+    }
+
+    /**
+     * @return the constant name {@code SESSION_END}
+     */
+    @Override
+    public String getName() {
+        return "SESSION_END";
+    }
+
+}
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/function/SessionFunction.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/function/SessionFunction.java
new file mode 100644
index 000000000..565ba3d0d
--- /dev/null
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/function/SessionFunction.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol.transformation.function;
+
+import com.google.common.base.Preconditions;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.transformation.ConstantParam;
+import org.apache.inlong.sort.protocol.transformation.FunctionParam;
+import org.apache.inlong.sort.protocol.transformation.GroupTimeWindowFunction;
+import org.apache.inlong.sort.protocol.transformation.TimeUnitConstantParam;
+
+import java.util.Arrays;
+import java.util.List;
+
+/** Group time window function formatted as {@code SESSION(timeAttr, INTERVAL interval timeUnit)}. */
+@JsonTypeName("session")
+@Data
+@NoArgsConstructor
+public class SessionFunction implements GroupTimeWindowFunction {
+    @JsonProperty("timeAttr") private FieldInfo timeAttr;
+    @JsonProperty("interval") private ConstantParam interval;
+    @JsonProperty("timeUnit") private TimeUnitConstantParam timeUnit;
+
+    /** JSON creator; rejects a null timeAttr, interval or timeUnit via {@code Preconditions.checkNotNull}. */
+    @JsonCreator
+    public SessionFunction(@JsonProperty("timeAttr") FieldInfo timeAttr,
+            @JsonProperty("interval") ConstantParam interval,
+            @JsonProperty("timeUnit") TimeUnitConstantParam timeUnit) {
+        this.timeAttr = Preconditions.checkNotNull(timeAttr, "timeAttr is null");
+        this.interval = Preconditions.checkNotNull(interval, "interval is null");
+        this.timeUnit = Preconditions.checkNotNull(timeUnit, "timeUnit is null");
+    }
+
+    /** Joins the name, formatted time attribute, interval value and formatted unit into the call text. */
+    @Override
+    public String format() {
+        final String pattern = "%s(%s, INTERVAL %s %s)";
+        return String.format(pattern, getName(), timeAttr.format(), interval.getValue(), timeUnit.format());
+    }
+
+    /** Lists the parameters in call order: time attribute, interval, time unit. */
+    @Override
+    public List<FunctionParam> getParams() {
+        return Arrays.asList(timeAttr, interval, timeUnit);
+    }
+
+    /** Name placed in front of the parenthesized arguments by {@link #format()}. */
+    @Override
+    public String getName() {
+        return "SESSION";
+    }
+}
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/function/SessionStartFunction.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/function/SessionStartFunction.java
new file mode 100644
index 000000000..8bb725b34
--- /dev/null
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/function/SessionStartFunction.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol.transformation.function;
+
+import com.google.common.base.Preconditions;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.transformation.ConstantParam;
+import org.apache.inlong.sort.protocol.transformation.FunctionParam;
+import org.apache.inlong.sort.protocol.transformation.TimeUnitConstantParam;
+import org.apache.inlong.sort.protocol.transformation.TimeWindowFunction;
+
+import java.util.Arrays;
+import java.util.List;
+
+/** Time window function formatted as {@code SESSION_START(timeAttr, INTERVAL interval timeUnit)}. */
+@JsonTypeName("sessionStart")
+@Data
+@NoArgsConstructor
+public class SessionStartFunction implements TimeWindowFunction {
+    @JsonProperty("timeAttr") private FieldInfo timeAttr;
+    @JsonProperty("interval") private ConstantParam interval;
+    @JsonProperty("timeUnit") private TimeUnitConstantParam timeUnit;
+
+    /** JSON creator; rejects a null timeAttr, interval or timeUnit via {@code Preconditions.checkNotNull}. */
+    @JsonCreator
+    public SessionStartFunction(@JsonProperty("timeAttr") FieldInfo timeAttr,
+            @JsonProperty("interval") ConstantParam interval,
+            @JsonProperty("timeUnit") TimeUnitConstantParam timeUnit) {
+        this.timeAttr = Preconditions.checkNotNull(timeAttr, "timeAttr is null");
+        this.interval = Preconditions.checkNotNull(interval, "interval is null");
+        this.timeUnit = Preconditions.checkNotNull(timeUnit, "timeUnit is null");
+    }
+
+    /** Joins the name, formatted time attribute, interval value and formatted unit into the call text. */
+    @Override
+    public String format() {
+        final String pattern = "%s(%s, INTERVAL %s %s)";
+        return String.format(pattern, getName(), timeAttr.format(), interval.getValue(), timeUnit.format());
+    }
+
+    /** Lists the parameters in call order: time attribute, interval, time unit. */
+    @Override
+    public List<FunctionParam> getParams() {
+        return Arrays.asList(timeAttr, interval, timeUnit);
+    }
+
+    /** Name placed in front of the parenthesized arguments by {@link #format()}. */
+    @Override
+    public String getName() {
+        return "SESSION_START";
+    }
+}
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/function/SingleValueFilterFunction.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/function/SingleValueFilterFunction.java
new file mode 100644
index 000000000..e525586fd
--- /dev/null
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/function/SingleValueFilterFunction.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol.transformation.function;
+
+import com.google.common.base.Preconditions;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.inlong.sort.protocol.transformation.FilterFunction;
+import org.apache.inlong.sort.protocol.transformation.FunctionParam;
+import org.apache.inlong.sort.protocol.transformation.LogicOperator;
+import org.apache.inlong.sort.protocol.transformation.SingleValueCompareOperator;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.List;
+
+/** Filter function formatted as {@code logicOperator source compareOperator target}, separated by single spaces. */
+@JsonTypeName("singleValueFilter")
+@Data
+@NoArgsConstructor
+public class SingleValueFilterFunction implements FilterFunction, Serializable {
+    private static final long serialVersionUID = 8953419088907830331L;
+
+    @JsonProperty("source") private FunctionParam source;
+    @JsonProperty("target") private FunctionParam target;
+    @JsonProperty("compareOperator") private SingleValueCompareOperator compareOperator;
+    @JsonProperty("logicOperator") private LogicOperator logicOperator;
+
+    /** JSON creator; every argument is checked with {@code Preconditions.checkNotNull} before it is stored. */
+    @JsonCreator
+    public SingleValueFilterFunction(
+            @JsonProperty("logicOperator") LogicOperator logicOperator,
+            @JsonProperty("source") FunctionParam source,
+            @JsonProperty("compareOperator") SingleValueCompareOperator compareOperator,
+            @JsonProperty("target") FunctionParam target) {
+        this.source = Preconditions.checkNotNull(source, "source is null");
+        this.target = Preconditions.checkNotNull(target, "target is null");
+        this.compareOperator = Preconditions.checkNotNull(compareOperator, "compareOperator is null");
+        this.logicOperator = Preconditions.checkNotNull(logicOperator, "logicOperator is null");
+    }
+
+    /** Returns the fixed name of this filter, {@code singleValueFilter}. */
+    @Override
+    public String getName() {
+        return "singleValueFilter";
+    }
+
+    /** Lists the parameters in the order they are formatted: logic operator, source, compare operator, target. */
+    @Override
+    public List<FunctionParam> getParams() {
+        return Arrays.asList(logicOperator, source, compareOperator, target);
+    }
+
+    /** Formats the four parts in order and joins them with single spaces. */
+    @Override
+    public String format() {
+        return String.format("%s %s %s %s",
+                logicOperator.format(), source.format(), compareOperator.format(), target.format());
+    }
+}
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/function/TumbleEndFunction.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/function/TumbleEndFunction.java
new file mode 100644
index 000000000..289c6f626
--- /dev/null
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/function/TumbleEndFunction.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol.transformation.function;
+
+import com.google.common.base.Preconditions;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.transformation.ConstantParam;
+import org.apache.inlong.sort.protocol.transformation.FunctionParam;
+import org.apache.inlong.sort.protocol.transformation.TimeUnitConstantParam;
+import org.apache.inlong.sort.protocol.transformation.TimeWindowFunction;
+
+import java.util.Arrays;
+import java.util.List;
+
+/** Time window function formatted as {@code TUMBLE_END(timeAttr, INTERVAL interval timeUnit)}. */
+@JsonTypeName("tumbleEnd")
+@Data
+@NoArgsConstructor
+public class TumbleEndFunction implements TimeWindowFunction {
+    @JsonProperty("timeAttr") private FieldInfo timeAttr;
+    @JsonProperty("interval") private ConstantParam interval;
+    @JsonProperty("timeUnit") private TimeUnitConstantParam timeUnit;
+
+    /** JSON creator; rejects a null timeAttr, interval or timeUnit via {@code Preconditions.checkNotNull}. */
+    @JsonCreator
+    public TumbleEndFunction(@JsonProperty("timeAttr") FieldInfo timeAttr,
+            @JsonProperty("interval") ConstantParam interval,
+            @JsonProperty("timeUnit") TimeUnitConstantParam timeUnit) {
+        this.timeAttr = Preconditions.checkNotNull(timeAttr, "timeAttr is null");
+        this.interval = Preconditions.checkNotNull(interval, "interval is null");
+        this.timeUnit = Preconditions.checkNotNull(timeUnit, "timeUnit is null");
+    }
+
+    /** Builds the call text; the unit goes through format(), as in TumbleFunction and TumbleStartFunction. */
+    @Override
+    public String format() {
+        final String pattern = "%s(%s, INTERVAL %s %s)";
+        return String.format(pattern, getName(), timeAttr.format(), interval.getValue(), timeUnit.format());
+    }
+
+    /** Lists the parameters in call order: time attribute, interval, time unit. */
+    @Override
+    public List<FunctionParam> getParams() {
+        return Arrays.asList(timeAttr, interval, timeUnit);
+    }
+
+    /** Name placed in front of the parenthesized arguments by {@link #format()}. */
+    @Override
+    public String getName() {
+        return "TUMBLE_END";
+    }
+}
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/function/TumbleFunction.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/function/TumbleFunction.java
new file mode 100644
index 000000000..d3c580996
--- /dev/null
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/function/TumbleFunction.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol.transformation.function;
+
+import com.google.common.base.Preconditions;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.transformation.ConstantParam;
+import org.apache.inlong.sort.protocol.transformation.FunctionParam;
+import org.apache.inlong.sort.protocol.transformation.GroupTimeWindowFunction;
+import org.apache.inlong.sort.protocol.transformation.TimeUnitConstantParam;
+
+import java.util.Arrays;
+import java.util.List;
+
+/** Group time window function formatted as {@code TUMBLE(timeAttr, INTERVAL interval timeUnit)}. */
+@JsonTypeName("tumble")
+@Data
+@NoArgsConstructor
+public class TumbleFunction implements GroupTimeWindowFunction {
+    @JsonProperty("timeAttr") private FieldInfo timeAttr;
+    @JsonProperty("interval") private ConstantParam interval;
+    @JsonProperty("timeUnit") private TimeUnitConstantParam timeUnit;
+
+    /** JSON creator; rejects a null timeAttr, interval or timeUnit via {@code Preconditions.checkNotNull}. */
+    @JsonCreator
+    public TumbleFunction(@JsonProperty("timeAttr") FieldInfo timeAttr,
+            @JsonProperty("interval") ConstantParam interval,
+            @JsonProperty("timeUnit") TimeUnitConstantParam timeUnit) {
+        this.timeAttr = Preconditions.checkNotNull(timeAttr, "timeAttr is null");
+        this.interval = Preconditions.checkNotNull(interval, "interval is null");
+        this.timeUnit = Preconditions.checkNotNull(timeUnit, "timeUnit is null");
+    }
+
+    /** Joins the name, formatted time attribute, interval value and formatted unit into the call text. */
+    @Override
+    public String format() {
+        final String pattern = "%s(%s, INTERVAL %s %s)";
+        return String.format(pattern, getName(), timeAttr.format(), interval.getValue(), timeUnit.format());
+    }
+
+    /** Lists the parameters in call order: time attribute, interval, time unit. */
+    @Override
+    public List<FunctionParam> getParams() {
+        return Arrays.asList(timeAttr, interval, timeUnit);
+    }
+
+    /** Name placed in front of the parenthesized arguments by {@link #format()}. */
+    @Override
+    public String getName() {
+        return "TUMBLE";
+    }
+}
diff --git a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/function/TumbleStartFunction.java b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/function/TumbleStartFunction.java
new file mode 100644
index 000000000..e996dcb60
--- /dev/null
+++ b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/transformation/function/TumbleStartFunction.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol.transformation.function;
+
+import com.google.common.base.Preconditions;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.transformation.ConstantParam;
+import org.apache.inlong.sort.protocol.transformation.FunctionParam;
+import org.apache.inlong.sort.protocol.transformation.TimeUnitConstantParam;
+import org.apache.inlong.sort.protocol.transformation.TimeWindowFunction;
+
+import java.util.Arrays;
+import java.util.List;
+
+/** Time window function formatted as {@code TUMBLE_START(timeAttr, INTERVAL interval timeUnit)}. */
+@JsonTypeName("tumbleStart")
+@Data
+@NoArgsConstructor
+public class TumbleStartFunction implements TimeWindowFunction {
+    @JsonProperty("timeAttr") private FieldInfo timeAttr;
+    @JsonProperty("interval") private ConstantParam interval;
+    @JsonProperty("timeUnit") private TimeUnitConstantParam timeUnit;
+
+    /** JSON creator; rejects a null timeAttr, interval or timeUnit via {@code Preconditions.checkNotNull}. */
+    @JsonCreator
+    public TumbleStartFunction(@JsonProperty("timeAttr") FieldInfo timeAttr,
+            @JsonProperty("interval") ConstantParam interval,
+            @JsonProperty("timeUnit") TimeUnitConstantParam timeUnit) {
+        this.timeAttr = Preconditions.checkNotNull(timeAttr, "timeAttr is null");
+        this.interval = Preconditions.checkNotNull(interval, "interval is null");
+        this.timeUnit = Preconditions.checkNotNull(timeUnit, "timeUnit is null");
+    }
+
+    /** Joins the name, formatted time attribute, interval value and formatted unit into the call text. */
+    @Override
+    public String format() {
+        final String pattern = "%s(%s, INTERVAL %s %s)";
+        return String.format(pattern, getName(), timeAttr.format(), interval.getValue(), timeUnit.format());
+    }
+
+    /** Lists the parameters in call order: time attribute, interval, time unit. */
+    @Override
+    public List<FunctionParam> getParams() {
+        return Arrays.asList(timeAttr, interval, timeUnit);
+    }
+
+    /** Name placed in front of the parenthesized arguments by {@link #format()}. */
+    @Override
+    public String getName() {
+        return "TUMBLE_START";
+    }
+}
diff --git a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/FunctionBaseTest.java b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/FunctionBaseTest.java
new file mode 100644
index 000000000..14f0ad976
--- /dev/null
+++ b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/FunctionBaseTest.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol.transformation.function;
+
+import com.google.common.base.Preconditions;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.inlong.sort.protocol.transformation.Function;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/** Base JUnit test that checks JSON serialization, deserialization and format() output of one Function. */
+public abstract class FunctionBaseTest {
+    private final ObjectMapper objectMapper = new ObjectMapper();
+    private String expectFormat;
+    private String expectSerializeStr;
+    private Function function;
+
+    // Fixtures supplied by each concrete test: the function, its expected format() text, its expected JSON.
+    public abstract Function getFunction();
+    public abstract String getExpectFormat();
+    public abstract String getExpectSerializeStr();
+
+    /** Loads the fixtures before every test; the expected format and the function must be non-null. */
+    @Before
+    public void init() {
+        this.expectFormat = Preconditions.checkNotNull(getExpectFormat());
+        this.function = Preconditions.checkNotNull(getFunction());
+        this.expectSerializeStr = getExpectSerializeStr();
+    }
+
+    @Test
+    public void testSerialize() throws JsonProcessingException {
+        assertEquals(expectSerializeStr, objectMapper.writeValueAsString(function));
+    }
+
+    @Test
+    public void testDeserialize() throws JsonProcessingException {
+        // Reuse the shared mapper; the fixture is "expected" and the parsed object "actual" in failure output.
+        Function actual = objectMapper.readValue(expectSerializeStr, function.getClass());
+        assertEquals(function, actual);
+    }
+
+    @Test
+    public void testFormat() throws JsonProcessingException {
+        assertEquals(expectFormat, function.format());
+    }
+}
diff --git a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/HopEndFunctionTest.java b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/HopEndFunctionTest.java
new file mode 100644
index 000000000..4d9db2875
--- /dev/null
+++ b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/HopEndFunctionTest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol.transformation.function;
+
+import org.apache.inlong.sort.formats.common.TimestampFormatInfo;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.transformation.ConstantParam;
+import org.apache.inlong.sort.protocol.transformation.Function;
+import org.apache.inlong.sort.protocol.transformation.TimeUnitConstantParam;
+import org.apache.inlong.sort.protocol.transformation.TimeUnitConstantParam.TimeUnit;
+
+public class HopEndFunctionTest extends FunctionBaseTest {
+
+    @Override
+    public Function getFunction() {
+        return new HopEndFunction(new FieldInfo("time_field", new TimestampFormatInfo()),
+                new ConstantParam("1"),
+                new TimeUnitConstantParam(TimeUnit.SECOND));
+    }
+
+    @Override
+    public String getExpectFormat() {
+        return "HOP_END(`time_field`, INTERVAL 1 SECOND)";
+    }
+
+    @Override
+    public String getExpectSerializeStr() {
+        return "{\"type\":\"hopEnd\",\"timeAttr\":{\"type\":\"base\",\"name\":\"time_field\","
+                + "\"formatInfo\":{\"type\":\"timestamp\",\"format\":\"yyyy-MM-dd HH:mm:ss\"}},"
+                + "\"interval\":{\"type\":\"constant\",\"value\":\"1\"},\"timeUnit\":{\"type\":\"timeUnitConstant\","
+                + "\"timeUnit\":\"SECOND\",\"value\":\"SECOND\"}}";
+
+    }
+}
diff --git a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/HopFunctionTest.java b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/HopFunctionTest.java
new file mode 100644
index 000000000..9d268d08f
--- /dev/null
+++ b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/HopFunctionTest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol.transformation.function;
+
+import org.apache.inlong.sort.formats.common.TimestampFormatInfo;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.transformation.ConstantParam;
+import org.apache.inlong.sort.protocol.transformation.Function;
+import org.apache.inlong.sort.protocol.transformation.TimeUnitConstantParam;
+import org.apache.inlong.sort.protocol.transformation.TimeUnitConstantParam.TimeUnit;
+
+public class HopFunctionTest extends FunctionBaseTest {
+
+    @Override
+    public Function getFunction() {
+        return new HopFunction(new FieldInfo("time_field", new TimestampFormatInfo()),
+                new ConstantParam("1"),
+                new TimeUnitConstantParam(TimeUnit.SECOND));
+    }
+
+    @Override
+    public String getExpectFormat() {
+        return "HOP(`time_field`, INTERVAL 1 SECOND)";
+    }
+
+    @Override
+    public String getExpectSerializeStr() {
+        return "{\"type\":\"hop\",\"timeAttr\":{\"type\":\"base\",\"name\":\"time_field\","
+                + "\"formatInfo\":{\"type\":\"timestamp\",\"format\":\"yyyy-MM-dd HH:mm:ss\"}},"
+                + "\"interval\":{\"type\":\"constant\",\"value\":\"1\"},\"timeUnit\":{\"type\":\"timeUnitConstant\","
+                + "\"timeUnit\":\"SECOND\",\"value\":\"SECOND\"}}";
+
+    }
+}
diff --git a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/HopStartFunctionTest.java b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/HopStartFunctionTest.java
new file mode 100644
index 000000000..16a33b67e
--- /dev/null
+++ b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/HopStartFunctionTest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol.transformation.function;
+
+import org.apache.inlong.sort.formats.common.TimestampFormatInfo;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.transformation.ConstantParam;
+import org.apache.inlong.sort.protocol.transformation.Function;
+import org.apache.inlong.sort.protocol.transformation.TimeUnitConstantParam;
+import org.apache.inlong.sort.protocol.transformation.TimeUnitConstantParam.TimeUnit;
+
+public class HopStartFunctionTest extends FunctionBaseTest {
+
+    @Override
+    public Function getFunction() {
+        return new HopStartFunction(new FieldInfo("time_field", new TimestampFormatInfo()),
+                new ConstantParam("1"),
+                new TimeUnitConstantParam(TimeUnit.SECOND));
+    }
+
+    @Override
+    public String getExpectFormat() {
+        return "HOP_START(`time_field`, INTERVAL 1 SECOND)";
+    }
+
+    @Override
+    public String getExpectSerializeStr() {
+        return "{\"type\":\"hopStart\",\"timeAttr\":{\"type\":\"base\",\"name\":\"time_field\","
+                + "\"formatInfo\":{\"type\":\"timestamp\",\"format\":\"yyyy-MM-dd HH:mm:ss\"}},"
+                + "\"interval\":{\"type\":\"constant\",\"value\":\"1\"},\"timeUnit\":{\"type\":\"timeUnitConstant\","
+                + "\"timeUnit\":\"SECOND\",\"value\":\"SECOND\"}}";
+
+    }
+}
diff --git a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/MultiValueFilterFunctionTest.java b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/MultiValueFilterFunctionTest.java
new file mode 100644
index 000000000..f5858677f
--- /dev/null
+++ b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/MultiValueFilterFunctionTest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol.transformation.function;
+
+import org.apache.inlong.sort.formats.common.TimestampFormatInfo;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.transformation.ConstantParam;
+import org.apache.inlong.sort.protocol.transformation.Function;
+import org.apache.inlong.sort.protocol.transformation.TimeUnitConstantParam;
+import org.apache.inlong.sort.protocol.transformation.TimeUnitConstantParam.TimeUnit;
+
+public class MultiValueFilterFunctionTest extends FunctionBaseTest {
+
+    @Override
+    public Function getFunction() {
+        return new HopFunction(new FieldInfo("time_field", new TimestampFormatInfo()),
+                new ConstantParam("1"),
+                new TimeUnitConstantParam(TimeUnit.SECOND));
+    }
+
+    @Override
+    public String getExpectFormat() {
+        return "HOP(`time_field`, INTERVAL 1 SECOND)";
+    }
+
+    @Override
+    public String getExpectSerializeStr() {
+        return "{\"type\":\"hop\",\"timeAttr\":{\"type\":\"base\",\"name\":\"time_field\","
+                + "\"formatInfo\":{\"type\":\"timestamp\",\"format\":\"yyyy-MM-dd HH:mm:ss\"}},"
+                + "\"interval\":{\"type\":\"constant\",\"value\":\"1\"},\"timeUnit\":{\"type\":\"timeUnitConstant\","
+                + "\"timeUnit\":\"SECOND\",\"value\":\"SECOND\"}}";
+
+    }
+}
diff --git a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/SessionEndFunctionTest.java b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/SessionEndFunctionTest.java
new file mode 100644
index 000000000..c6e646b03
--- /dev/null
+++ b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/SessionEndFunctionTest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol.transformation.function;
+
+import org.apache.inlong.sort.formats.common.TimestampFormatInfo;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.transformation.ConstantParam;
+import org.apache.inlong.sort.protocol.transformation.Function;
+import org.apache.inlong.sort.protocol.transformation.TimeUnitConstantParam;
+import org.apache.inlong.sort.protocol.transformation.TimeUnitConstantParam.TimeUnit;
+
+public class SessionEndFunctionTest extends FunctionBaseTest {
+
+    @Override
+    public Function getFunction() {
+        return new SessionEndFunction(new FieldInfo("time_field", new TimestampFormatInfo()),
+                new ConstantParam("1"),
+                new TimeUnitConstantParam(TimeUnit.SECOND));
+    }
+
+    @Override
+    public String getExpectFormat() {
+        return "SESSION_END(`time_field`, INTERVAL 1 SECOND)";
+    }
+
+    @Override
+    public String getExpectSerializeStr() {
+        return "{\"type\":\"sessionEnd\",\"timeAttr\":{\"type\":\"base\",\"name\":\"time_field\","
+                + "\"formatInfo\":{\"type\":\"timestamp\",\"format\":\"yyyy-MM-dd HH:mm:ss\"}},"
+                + "\"interval\":{\"type\":\"constant\",\"value\":\"1\"},\"timeUnit\":{\"type\":\"timeUnitConstant\","
+                + "\"timeUnit\":\"SECOND\",\"value\":\"SECOND\"}}";
+
+    }
+}
diff --git a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/SessionFunctionTest.java b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/SessionFunctionTest.java
new file mode 100644
index 000000000..a5dfea513
--- /dev/null
+++ b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/SessionFunctionTest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol.transformation.function;
+
+import org.apache.inlong.sort.formats.common.TimestampFormatInfo;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.transformation.ConstantParam;
+import org.apache.inlong.sort.protocol.transformation.Function;
+import org.apache.inlong.sort.protocol.transformation.TimeUnitConstantParam;
+import org.apache.inlong.sort.protocol.transformation.TimeUnitConstantParam.TimeUnit;
+
+public class SessionFunctionTest extends FunctionBaseTest {
+
+    @Override
+    public Function getFunction() {
+        return new SessionFunction(new FieldInfo("time_field", new TimestampFormatInfo()),
+                new ConstantParam("1"),
+                new TimeUnitConstantParam(TimeUnit.SECOND));
+    }
+
+    @Override
+    public String getExpectFormat() {
+        return "SESSION(`time_field`, INTERVAL 1 SECOND)";
+    }
+
+    @Override
+    public String getExpectSerializeStr() {
+        return "{\"type\":\"session\",\"timeAttr\":{\"type\":\"base\",\"name\":\"time_field\","
+                + "\"formatInfo\":{\"type\":\"timestamp\",\"format\":\"yyyy-MM-dd HH:mm:ss\"}},"
+                + "\"interval\":{\"type\":\"constant\",\"value\":\"1\"},\"timeUnit\":{\"type\":\"timeUnitConstant\","
+                + "\"timeUnit\":\"SECOND\",\"value\":\"SECOND\"}}";
+
+    }
+}
diff --git a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/SessionStartFunctionTest.java b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/SessionStartFunctionTest.java
new file mode 100644
index 000000000..b49d6cbb6
--- /dev/null
+++ b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/SessionStartFunctionTest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol.transformation.function;
+
+import org.apache.inlong.sort.formats.common.TimestampFormatInfo;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.transformation.ConstantParam;
+import org.apache.inlong.sort.protocol.transformation.Function;
+import org.apache.inlong.sort.protocol.transformation.TimeUnitConstantParam;
+import org.apache.inlong.sort.protocol.transformation.TimeUnitConstantParam.TimeUnit;
+
+public class SessionStartFunctionTest extends FunctionBaseTest {
+
+    @Override
+    public Function getFunction() {
+        return new SessionStartFunction(new FieldInfo("time_field", new TimestampFormatInfo()),
+                new ConstantParam("1"),
+                new TimeUnitConstantParam(TimeUnit.SECOND));
+    }
+
+    @Override
+    public String getExpectFormat() {
+        return "SESSION_START(`time_field`, INTERVAL 1 SECOND)";
+    }
+
+    @Override
+    public String getExpectSerializeStr() {
+        return "{\"type\":\"sessionStart\",\"timeAttr\":{\"type\":\"base\",\"name\":\"time_field\","
+                + "\"formatInfo\":{\"type\":\"timestamp\",\"format\":\"yyyy-MM-dd HH:mm:ss\"}},"
+                + "\"interval\":{\"type\":\"constant\",\"value\":\"1\"},\"timeUnit\":{\"type\":\"timeUnitConstant\","
+                + "\"timeUnit\":\"SECOND\",\"value\":\"SECOND\"}}";
+
+    }
+}
diff --git a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/SingleValueFilterFunctionTest.java b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/SingleValueFilterFunctionTest.java
new file mode 100644
index 000000000..51299a8b8
--- /dev/null
+++ b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/SingleValueFilterFunctionTest.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol.transformation.function;
+
+import org.apache.inlong.sort.formats.common.TimestampFormatInfo;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.transformation.ConstantParam;
+import org.apache.inlong.sort.protocol.transformation.Function;
+import org.apache.inlong.sort.protocol.transformation.operator.EmptyOperator;
+import org.apache.inlong.sort.protocol.transformation.operator.EqualOperator;
+
+public class SingleValueFilterFunctionTest extends FunctionBaseTest {
+
+    @Override
+    public Function getFunction() {
+        return new SingleValueFilterFunction(EmptyOperator.getInstance(),
+                new FieldInfo("single_value_field", new TimestampFormatInfo()),
+                EqualOperator.getInstance(),
+                new ConstantParam("'123'"));
+    }
+
+    @Override
+    public String getExpectFormat() {
+        return " `single_value_field` = '123'";
+    }
+
+    @Override
+    public String getExpectSerializeStr() {
+        return "{\"type\":\"singleValueFilter\",\"logicOperator\":{\"type\":\"empty\"},"
+                + "\"source\":{\"type\":\"base\",\"name\":\"single_value_field\","
+                + "\"formatInfo\":{\"type\":\"timestamp\",\"format\":\"yyyy-MM-dd HH:mm:ss\"}},"
+                + "\"compareOperator\":{\"type\":\"equal\"},\"target\":{\"type\":\"constant\",\"value\":\"'123'\"}}";
+
+    }
+}
diff --git a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/TumbleEndFunctionTest.java b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/TumbleEndFunctionTest.java
new file mode 100644
index 000000000..51a22dcc7
--- /dev/null
+++ b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/TumbleEndFunctionTest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol.transformation.function;
+
+import org.apache.inlong.sort.formats.common.TimestampFormatInfo;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.transformation.ConstantParam;
+import org.apache.inlong.sort.protocol.transformation.Function;
+import org.apache.inlong.sort.protocol.transformation.TimeUnitConstantParam;
+import org.apache.inlong.sort.protocol.transformation.TimeUnitConstantParam.TimeUnit;
+
+public class TumbleEndFunctionTest extends FunctionBaseTest {
+
+    @Override
+    public Function getFunction() {
+        return new TumbleEndFunction(new FieldInfo("time_field", new TimestampFormatInfo()),
+                new ConstantParam("1"),
+                new TimeUnitConstantParam(TimeUnit.SECOND));
+    }
+
+    @Override
+    public String getExpectFormat() {
+        return "TUMBLE_END(`time_field`, INTERVAL 1 SECOND)";
+    }
+
+    @Override
+    public String getExpectSerializeStr() {
+        return "{\"type\":\"tumbleEnd\",\"timeAttr\":{\"type\":\"base\",\"name\":\"time_field\","
+                + "\"formatInfo\":{\"type\":\"timestamp\",\"format\":\"yyyy-MM-dd HH:mm:ss\"}},"
+                + "\"interval\":{\"type\":\"constant\",\"value\":\"1\"},\"timeUnit\":{\"type\":\"timeUnitConstant\","
+                + "\"timeUnit\":\"SECOND\",\"value\":\"SECOND\"}}";
+
+    }
+}
diff --git a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/TumbleFunctionTest.java b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/TumbleFunctionTest.java
new file mode 100644
index 000000000..befb16736
--- /dev/null
+++ b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/TumbleFunctionTest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol.transformation.function;
+
+import org.apache.inlong.sort.formats.common.TimestampFormatInfo;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.transformation.ConstantParam;
+import org.apache.inlong.sort.protocol.transformation.Function;
+import org.apache.inlong.sort.protocol.transformation.TimeUnitConstantParam;
+import org.apache.inlong.sort.protocol.transformation.TimeUnitConstantParam.TimeUnit;
+
+public class TumbleFunctionTest extends FunctionBaseTest {
+
+    @Override
+    public Function getFunction() {
+        return new TumbleFunction(new FieldInfo("time_field", new TimestampFormatInfo()),
+                new ConstantParam("1"),
+                new TimeUnitConstantParam(TimeUnit.SECOND));
+    }
+
+    @Override
+    public String getExpectFormat() {
+        return "TUMBLE(`time_field`, INTERVAL 1 SECOND)";
+    }
+
+    @Override
+    public String getExpectSerializeStr() {
+        return "{\"type\":\"tumble\",\"timeAttr\":{\"type\":\"base\",\"name\":\"time_field\","
+                + "\"formatInfo\":{\"type\":\"timestamp\",\"format\":\"yyyy-MM-dd HH:mm:ss\"}},"
+                + "\"interval\":{\"type\":\"constant\",\"value\":\"1\"},\"timeUnit\":{\"type\":\"timeUnitConstant\","
+                + "\"timeUnit\":\"SECOND\",\"value\":\"SECOND\"}}";
+
+    }
+}
diff --git a/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/TumbleStartFunctionTest.java b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/TumbleStartFunctionTest.java
new file mode 100644
index 000000000..10323a281
--- /dev/null
+++ b/inlong-sort/sort-common/src/test/java/org/apache/inlong/sort/protocol/transformation/function/TumbleStartFunctionTest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.protocol.transformation.function;
+
+import org.apache.inlong.sort.formats.common.TimestampFormatInfo;
+import org.apache.inlong.sort.protocol.FieldInfo;
+import org.apache.inlong.sort.protocol.transformation.ConstantParam;
+import org.apache.inlong.sort.protocol.transformation.Function;
+import org.apache.inlong.sort.protocol.transformation.TimeUnitConstantParam;
+import org.apache.inlong.sort.protocol.transformation.TimeUnitConstantParam.TimeUnit;
+
+public class TumbleStartFunctionTest extends FunctionBaseTest {
+
+    @Override
+    public Function getFunction() {
+        return new TumbleStartFunction(new FieldInfo("time_field", new TimestampFormatInfo()),
+                new ConstantParam("1"),
+                new TimeUnitConstantParam(TimeUnit.SECOND));
+    }
+
+    @Override
+    public String getExpectFormat() {
+        return "TUMBLE_START(`time_field`, INTERVAL 1 SECOND)";
+    }
+
+    @Override
+    public String getExpectSerializeStr() {
+        return "{\"type\":\"tumbleStart\",\"timeAttr\":{\"type\":\"base\",\"name\":\"time_field\","
+                + "\"formatInfo\":{\"type\":\"timestamp\",\"format\":\"yyyy-MM-dd HH:mm:ss\"}},"
+                + "\"interval\":{\"type\":\"constant\",\"value\":\"1\"},\"timeUnit\":{\"type\":\"timeUnitConstant\","
+                + "\"timeUnit\":\"SECOND\",\"value\":\"SECOND\"}}";
+
+    }
+}