You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/08/28 11:53:40 UTC

[iotdb] branch master updated: [IOTDB-4085] Add StateWindowAccessStrategy in UDF (#7005)

This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 136bfd05e8 [IOTDB-4085] Add StateWindowAccessStrategy in UDF (#7005)
136bfd05e8 is described below

commit 136bfd05e8332c9b1f86338a8bea993a82aa09dc
Author: AACEPT <34...@users.noreply.github.com>
AuthorDate: Sun Aug 28 19:53:34 2022 +0800

    [IOTDB-4085] Add StateWindowAccessStrategy in UDF (#7005)
---
 .../db/query/udf/example/ExampleUDFConstant.java   |   2 +
 .../iotdb/db/query/udf/example/WindowStartEnd.java |  23 ++
 .../iotdb/itbase/constant/UDFTestConstant.java     |   2 +
 ...ueryIT.java => IoTDBUDFOtherWindowQueryIT.java} | 238 ++++++++++++++++++++-
 .../visitor/IntermediateLayerVisitor.java          |   1 +
 .../intermediate/ConstantIntermediateLayer.java    |   8 +
 .../dag/intermediate/IntermediateLayer.java        |   8 +
 .../MultiInputColumnIntermediateLayer.java         |   7 +
 ...InputColumnMultiReferenceIntermediateLayer.java | 121 +++++++++++
 ...nputColumnSingleReferenceIntermediateLayer.java | 121 +++++++++++
 .../transformation/dag/util/TransformUtils.java    |  90 ++++++++
 .../datastructure/util/ValueRecorder.java          |  88 ++++++++
 .../api/customizer/strategy/AccessStrategy.java    |   5 +-
 .../strategy/StateWindowAccessStrategy.java        | 115 ++++++++++
 14 files changed, 827 insertions(+), 2 deletions(-)

diff --git a/integration-test/src/main/java/org/apache/iotdb/db/query/udf/example/ExampleUDFConstant.java b/integration-test/src/main/java/org/apache/iotdb/db/query/udf/example/ExampleUDFConstant.java
index 3dfb8303a0..98b0e4aa19 100644
--- a/integration-test/src/main/java/org/apache/iotdb/db/query/udf/example/ExampleUDFConstant.java
+++ b/integration-test/src/main/java/org/apache/iotdb/db/query/udf/example/ExampleUDFConstant.java
@@ -24,10 +24,12 @@ public class ExampleUDFConstant {
   public static final String ACCESS_STRATEGY_SLIDING_SIZE = "size";
   public static final String ACCESS_STRATEGY_SLIDING_TIME = "time";
   public static final String ACCESS_STRATEGY_SESSION = "session";
+  public static final String ACCESS_STRATEGY_STATE = "state";
   public static final String WINDOW_SIZE_KEY = "windowSize";
   public static final String TIME_INTERVAL_KEY = "timeInterval";
   public static final String SLIDING_STEP_KEY = "slidingStep";
   public static final String SESSION_GAP_KEY = "sessionGap";
+  public static final String STATE_DELTA_KEY = "delta";
   public static final String DISPLAY_WINDOW_BEGIN_KEY = "displayWindowBegin";
   public static final String DISPLAY_WINDOW_END_KEY = "displayWindowEnd";
 }
diff --git a/integration-test/src/main/java/org/apache/iotdb/db/query/udf/example/WindowStartEnd.java b/integration-test/src/main/java/org/apache/iotdb/db/query/udf/example/WindowStartEnd.java
index bd3a369365..7045aca687 100644
--- a/integration-test/src/main/java/org/apache/iotdb/db/query/udf/example/WindowStartEnd.java
+++ b/integration-test/src/main/java/org/apache/iotdb/db/query/udf/example/WindowStartEnd.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.udf.api.customizer.parameter.UDFParameters;
 import org.apache.iotdb.udf.api.customizer.strategy.SessionTimeWindowAccessStrategy;
 import org.apache.iotdb.udf.api.customizer.strategy.SlidingSizeWindowAccessStrategy;
 import org.apache.iotdb.udf.api.customizer.strategy.SlidingTimeWindowAccessStrategy;
+import org.apache.iotdb.udf.api.customizer.strategy.StateWindowAccessStrategy;
 import org.apache.iotdb.udf.api.type.Type;
 
 import java.io.IOException;
@@ -70,6 +71,28 @@ public class WindowStartEnd implements UDTF {
                   parameters.getLong(ExampleUDFConstant.SESSION_GAP_KEY))
               : new SessionTimeWindowAccessStrategy(
                   parameters.getLong(ExampleUDFConstant.SESSION_GAP_KEY)));
+    } else if (ExampleUDFConstant.ACCESS_STRATEGY_STATE.equals(
+        parameters.getString(ExampleUDFConstant.ACCESS_STRATEGY_KEY))) {
+      if (parameters.hasAttribute(ExampleUDFConstant.DISPLAY_WINDOW_BEGIN_KEY)
+          && parameters.hasAttribute(ExampleUDFConstant.DISPLAY_WINDOW_END_KEY)) {
+        if (parameters.hasAttribute(ExampleUDFConstant.STATE_DELTA_KEY)) {
+          configurations.setAccessStrategy(
+              new StateWindowAccessStrategy(
+                  parameters.getLong(ExampleUDFConstant.DISPLAY_WINDOW_BEGIN_KEY),
+                  parameters.getLong(ExampleUDFConstant.DISPLAY_WINDOW_END_KEY),
+                  parameters.getLong(ExampleUDFConstant.STATE_DELTA_KEY)));
+        } else {
+          configurations.setAccessStrategy(
+              new StateWindowAccessStrategy(
+                  parameters.getLong(ExampleUDFConstant.DISPLAY_WINDOW_BEGIN_KEY),
+                  parameters.getLong(ExampleUDFConstant.DISPLAY_WINDOW_END_KEY)));
+        }
+      } else if (parameters.hasAttribute(ExampleUDFConstant.STATE_DELTA_KEY)) {
+        configurations.setAccessStrategy(
+            new StateWindowAccessStrategy(parameters.getLong(ExampleUDFConstant.STATE_DELTA_KEY)));
+      } else {
+        configurations.setAccessStrategy(new StateWindowAccessStrategy());
+      }
     }
   }
 
diff --git a/integration-test/src/main/java/org/apache/iotdb/itbase/constant/UDFTestConstant.java b/integration-test/src/main/java/org/apache/iotdb/itbase/constant/UDFTestConstant.java
index 31dfeeb661..8114f59610 100644
--- a/integration-test/src/main/java/org/apache/iotdb/itbase/constant/UDFTestConstant.java
+++ b/integration-test/src/main/java/org/apache/iotdb/itbase/constant/UDFTestConstant.java
@@ -24,10 +24,12 @@ public class UDFTestConstant {
   public static final String ACCESS_STRATEGY_SLIDING_SIZE = "size";
   public static final String ACCESS_STRATEGY_SLIDING_TIME = "time";
   public static final String ACCESS_STRATEGY_SESSION = "session";
+  public static final String ACCESS_STRATEGY_STATE = "state";
   public static final String WINDOW_SIZE_KEY = "windowSize";
   public static final String TIME_INTERVAL_KEY = "timeInterval";
   public static final String SLIDING_STEP_KEY = "slidingStep";
   public static final String SESSION_GAP_KEY = "sessionGap";
+  public static final String STATE_DELTA_KEY = "delta";
   public static final String DISPLAY_WINDOW_BEGIN_KEY = "displayWindowBegin";
   public static final String DISPLAY_WINDOW_END_KEY = "displayWindowEnd";
 }
diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/udf/IoTDBUDFSessionWindowQueryIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/udf/IoTDBUDFOtherWindowQueryIT.java
similarity index 53%
rename from integration-test/src/test/java/org/apache/iotdb/db/it/udf/IoTDBUDFSessionWindowQueryIT.java
rename to integration-test/src/test/java/org/apache/iotdb/db/it/udf/IoTDBUDFOtherWindowQueryIT.java
index 5698d7d4bc..07744152a6 100644
--- a/integration-test/src/test/java/org/apache/iotdb/db/it/udf/IoTDBUDFSessionWindowQueryIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/udf/IoTDBUDFOtherWindowQueryIT.java
@@ -41,9 +41,10 @@ import java.util.ArrayList;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 
+/** Test for SessionWindow, StateWindow and UserDefinedWindow. */
 @RunWith(IoTDBTestRunner.class)
 @Category({ClusterIT.class})
-public class IoTDBUDFSessionWindowQueryIT {
+public class IoTDBUDFOtherWindowQueryIT {
 
   protected static final int ITERATION_TIMES = 1000;
 
@@ -86,12 +87,17 @@ public class IoTDBUDFSessionWindowQueryIT {
         Statement statement = connection.createStatement()) {
       statement.execute("SET STORAGE GROUP TO root.vehicle");
       statement.execute("CREATE TIMESERIES root.vehicle.d1.s3 with datatype=INT32,encoding=PLAIN");
+      statement.execute("CREATE TIMESERIES root.vehicle.d1.s4 with datatype=INT32,encoding=PLAIN");
+      statement.execute(
+          "CREATE TIMESERIES root.vehicle.d1.s5 with datatype=BOOLEAN,encoding=PLAIN");
+      statement.execute("CREATE TIMESERIES root.vehicle.d1.s6 with datatype=TEXT,encoding=PLAIN");
     } catch (SQLException throwable) {
       fail(throwable.getMessage());
     }
   }
 
   private static void generateData() {
+    // SessionWindow
     try (Connection connection = EnvFactory.getEnv().getConnection();
         Statement statement = connection.createStatement()) {
       for (int i = 0; i < ITERATION_TIMES; ++i) {
@@ -102,6 +108,51 @@ public class IoTDBUDFSessionWindowQueryIT {
         statement.execute(
             (String.format("insert into root.vehicle.d1(timestamp,s3) values(%d,%d)", i, i)));
       }
+
+      // StateWindow INT32
+      for (int i = 0; i < ITERATION_TIMES; ++i) {
+        if (i == 1 || i == 2 || i == 3 || i == 15 || i == 17 || i == 53 || i == 54 || i == 996
+            || i == 997 || i == 998) {
+          statement.execute(
+              (String.format(
+                  "insert into root.vehicle.d1(timestamp,s4) values(%d,%d)", i, i + 100)));
+        } else if (i == 500 || i == 501) {
+          statement.execute(
+              (String.format("insert into root.vehicle.d1(timestamp,s4) values(%d,%d)", i, 12)));
+        } else if (i % 2 == 0) {
+          statement.execute(
+              (String.format("insert into root.vehicle.d1(timestamp,s4) values(%d,%d)", i, 0)));
+        } else {
+          statement.execute(
+              (String.format("insert into root.vehicle.d1(timestamp,s4) values(%d,%d)", i, 3)));
+        }
+      }
+
+      // StateWindow BOOLEAN
+      for (int i = 0; i < ITERATION_TIMES; ++i) {
+        if (i == 1 || i == 2 || i == 3 || i == 15 || i == 17 || i == 53 || i == 54 || i == 996
+            || i == 997 || i == 998) {
+          statement.execute(
+              (String.format("insert into root.vehicle.d1(timestamp,s5) values(%d, true)", i)));
+        } else {
+          statement.execute(
+              (String.format("insert into root.vehicle.d1(timestamp,s5) values(%d, false)", i)));
+        }
+      }
+
+      // StateWindow TEXT
+      for (int i = 0; i < ITERATION_TIMES; ++i) {
+        if (i < 500) {
+          statement.execute(
+              (String.format("insert into root.vehicle.d1(timestamp,s6) values(%d, '<500')", i)));
+        } else if (i < 993) {
+          statement.execute(
+              (String.format("insert into root.vehicle.d1(timestamp,s6) values(%d, '<993')", i)));
+        } else {
+          statement.execute(
+              (String.format("insert into root.vehicle.d1(timestamp,s6) values(%d, '>=994')", i)));
+        }
+      }
     } catch (SQLException throwable) {
       fail(throwable.getMessage());
     }
@@ -280,4 +331,189 @@ public class IoTDBUDFSessionWindowQueryIT {
     Long displayEnd = -100L;
     testSessionTimeWindowSSOutOfRange(sessionGap, displayBegin, displayEnd);
   }
+
+  private void testStateWindowSS(
+      String measurement,
+      String delta,
+      long[] windowStart,
+      long[] windowEnd,
+      Long displayBegin,
+      Long displayEnd) {
+    String sql;
+    if (displayBegin == null) {
+      if (delta == null) {
+        sql =
+            String.format(
+                "select window_start_end(%s, '%s'='%s') from root.vehicle.d1",
+                measurement,
+                UDFTestConstant.ACCESS_STRATEGY_KEY,
+                UDFTestConstant.ACCESS_STRATEGY_STATE);
+      } else {
+        sql =
+            String.format(
+                "select window_start_end(%s, '%s'='%s', '%s'='%s') from root.vehicle.d1",
+                measurement,
+                UDFTestConstant.ACCESS_STRATEGY_KEY,
+                UDFTestConstant.ACCESS_STRATEGY_STATE,
+                UDFTestConstant.STATE_DELTA_KEY,
+                delta);
+      }
+    } else {
+      sql =
+          String.format(
+              "select window_start_end(%s, '%s'='%s', '%s'='%s', '%s'='%s', '%s'='%s') from root.vehicle.d1",
+              measurement,
+              UDFTestConstant.ACCESS_STRATEGY_KEY,
+              UDFTestConstant.ACCESS_STRATEGY_STATE,
+              UDFTestConstant.DISPLAY_WINDOW_BEGIN_KEY,
+              displayBegin,
+              UDFTestConstant.DISPLAY_WINDOW_END_KEY,
+              displayEnd,
+              UDFTestConstant.STATE_DELTA_KEY,
+              delta);
+    }
+
+    try (Connection conn = EnvFactory.getEnv().getConnection();
+        Statement statement = conn.createStatement();
+        ResultSet resultSet = statement.executeQuery(sql)) {
+      assertEquals(2, resultSet.getMetaData().getColumnCount());
+      int cnt = 0;
+      while (resultSet.next()) {
+        Assert.assertEquals(resultSet.getLong(1), windowStart[cnt]);
+        Assert.assertEquals(resultSet.getLong(2), windowEnd[cnt]);
+        cnt++;
+      }
+    } catch (SQLException e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+  @Test
+  public void testStateWindowSS1() {
+    String delta = "10";
+    Long displayBegin = 10L;
+    Long displayEnd = 976L;
+    long[] windowStart = new long[] {10, 15, 16, 17, 18, 53, 55};
+    long[] windowEnd = new long[] {14, 15, 16, 17, 52, 54, 975};
+    testStateWindowSS("s4", delta, windowStart, windowEnd, displayBegin, displayEnd);
+  }
+
+  @Test
+  public void testStateWindowSS2() {
+    String delta = "9";
+    Long displayBegin = 0L;
+    Long displayEnd = 100000L;
+    long[] windowStart = new long[] {0, 1, 4, 15, 16, 17, 18, 53, 55, 500, 502, 996, 999};
+    long[] windowEnd = new long[] {0, 3, 14, 15, 16, 17, 52, 54, 499, 501, 995, 998, 999};
+    testStateWindowSS("s4", delta, windowStart, windowEnd, displayBegin, displayEnd);
+  }
+
+  @Test
+  public void testStateWindowSS3() {
+    String delta = "102";
+    Long displayBegin = -100L;
+    Long displayEnd = 100000L;
+    long[] windowStart = new long[] {0, 2, 4, 15, 16, 17, 18, 53, 55, 996, 999};
+    long[] windowEnd = new long[] {1, 3, 14, 15, 16, 17, 52, 54, 995, 998, 999};
+    testStateWindowSS("s4", delta, windowStart, windowEnd, displayBegin, displayEnd);
+  }
+
+  @Test
+  public void testStateWindowSS4() {
+    String delta = "102";
+    long[] windowStart = new long[] {0, 2, 4, 15, 16, 17, 18, 53, 55, 996, 999};
+    long[] windowEnd = new long[] {1, 3, 14, 15, 16, 17, 52, 54, 995, 998, 999};
+    testStateWindowSS("s4", delta, windowStart, windowEnd, null, null);
+  }
+
+  @Test
+  public void testStateWindowSS5() {
+    String delta = "2";
+    Long displayBegin = -100L;
+    Long displayEnd = 1L;
+    long[] windowStart = new long[] {0};
+    long[] windowEnd = new long[] {0};
+    testStateWindowSS("s4", delta, windowStart, windowEnd, displayBegin, displayEnd);
+  }
+
+  private void testStateWindowSSOutOfRange(
+      String measurement, String delta, Long displayBegin, Long displayEnd) {
+    String sql;
+    if (displayBegin == null) {
+      if (delta == null) {
+        sql =
+            String.format(
+                "select window_start_end(%s, '%s'='%s') from root.vehicle.d1",
+                measurement,
+                UDFTestConstant.ACCESS_STRATEGY_KEY,
+                UDFTestConstant.ACCESS_STRATEGY_STATE);
+      } else {
+        sql =
+            String.format(
+                "select window_start_end(%s, '%s'='%s', '%s'='%s') from root.vehicle.d1",
+                measurement,
+                UDFTestConstant.ACCESS_STRATEGY_KEY,
+                UDFTestConstant.ACCESS_STRATEGY_STATE,
+                UDFTestConstant.STATE_DELTA_KEY,
+                delta);
+      }
+    } else {
+      sql =
+          String.format(
+              "select window_start_end(%s, '%s'='%s', '%s'='%s', '%s'='%s', '%s'='%s') from root.vehicle.d1",
+              measurement,
+              UDFTestConstant.ACCESS_STRATEGY_KEY,
+              UDFTestConstant.ACCESS_STRATEGY_STATE,
+              UDFTestConstant.DISPLAY_WINDOW_BEGIN_KEY,
+              displayBegin,
+              UDFTestConstant.DISPLAY_WINDOW_END_KEY,
+              displayEnd,
+              UDFTestConstant.STATE_DELTA_KEY,
+              delta);
+    }
+
+    try (Connection conn = EnvFactory.getEnv().getConnection();
+        Statement statement = conn.createStatement();
+        ResultSet resultSet = statement.executeQuery(sql)) {
+      int count = 0;
+      while (resultSet.next()) {
+        count++;
+      }
+      assertEquals(count, 0);
+    } catch (SQLException e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+  @Test
+  public void testStateWindowSS6() {
+    String delta = "2";
+    Long displayBegin = -100L;
+    Long displayEnd = -1L;
+    testStateWindowSSOutOfRange("s4", delta, displayBegin, displayEnd);
+  }
+
+  @Test
+  public void testStateWindowSS7() {
+    String delta = "2";
+    Long displayBegin = 10000L;
+    Long displayEnd = 100005L;
+    testStateWindowSSOutOfRange("s4", delta, displayBegin, displayEnd);
+  }
+
+  @Test
+  public void testStateWindowSS8() {
+    long[] windowStart = new long[] {0, 1, 4, 15, 16, 17, 18, 53, 55, 996, 999};
+    long[] windowEnd = new long[] {0, 3, 14, 15, 16, 17, 52, 54, 995, 998, 999};
+    testStateWindowSS("s5", null, windowStart, windowEnd, null, null);
+  }
+
+  @Test
+  public void testStateWindowSS9() {
+    long[] windowStart = new long[] {0, 500, 993};
+    long[] windowEnd = new long[] {499, 992, 999};
+    testStateWindowSS("s6", null, windowStart, windowEnd, null, null);
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/visitor/IntermediateLayerVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/visitor/IntermediateLayerVisitor.java
index f7d9a4e6c5..e68c5d7aa8 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/visitor/IntermediateLayerVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/visitor/IntermediateLayerVisitor.java
@@ -397,6 +397,7 @@ public class IntermediateLayerVisitor
       case SLIDING_SIZE_WINDOW:
       case SLIDING_TIME_WINDOW:
       case SESSION_TIME_WINDOW:
+      case STATE_WINDOW:
         return new UDFQueryRowWindowTransformer(
             udfInputIntermediateLayer.constructRowWindowReader(
                 accessStrategy, context.memoryAssigner.assign()),
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/intermediate/ConstantIntermediateLayer.java b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/intermediate/ConstantIntermediateLayer.java
index 440c2e6fbd..dc9865a5ed 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/intermediate/ConstantIntermediateLayer.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/intermediate/ConstantIntermediateLayer.java
@@ -28,6 +28,7 @@ import org.apache.iotdb.db.mpp.transformation.dag.input.ConstantInputReader;
 import org.apache.iotdb.udf.api.customizer.strategy.SessionTimeWindowAccessStrategy;
 import org.apache.iotdb.udf.api.customizer.strategy.SlidingSizeWindowAccessStrategy;
 import org.apache.iotdb.udf.api.customizer.strategy.SlidingTimeWindowAccessStrategy;
+import org.apache.iotdb.udf.api.customizer.strategy.StateWindowAccessStrategy;
 
 /** IntermediateLayer for constants. */
 public class ConstantIntermediateLayer extends IntermediateLayer {
@@ -71,4 +72,11 @@ public class ConstantIntermediateLayer extends IntermediateLayer {
     // Not allowed since the timestamp of a constant row is not defined.
     throw new UnsupportedOperationException();
   }
+
+  @Override
+  protected LayerRowWindowReader constructRowStateWindowReader(
+      StateWindowAccessStrategy strategy, float memoryBudgetInMB) {
+    // Not allowed since the timestamp of a constant row is not defined.
+    throw new UnsupportedOperationException();
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/intermediate/IntermediateLayer.java b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/intermediate/IntermediateLayer.java
index 0a8cd2e6b4..94608a54ba 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/intermediate/IntermediateLayer.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/intermediate/IntermediateLayer.java
@@ -28,6 +28,7 @@ import org.apache.iotdb.udf.api.customizer.strategy.AccessStrategy;
 import org.apache.iotdb.udf.api.customizer.strategy.SessionTimeWindowAccessStrategy;
 import org.apache.iotdb.udf.api.customizer.strategy.SlidingSizeWindowAccessStrategy;
 import org.apache.iotdb.udf.api.customizer.strategy.SlidingTimeWindowAccessStrategy;
+import org.apache.iotdb.udf.api.customizer.strategy.StateWindowAccessStrategy;
 
 import java.io.IOException;
 
@@ -63,6 +64,9 @@ public abstract class IntermediateLayer {
       case SESSION_TIME_WINDOW:
         return constructRowSessionTimeWindowReader(
             (SessionTimeWindowAccessStrategy) strategy, memoryBudgetInMB);
+      case STATE_WINDOW:
+        return constructRowStateWindowReader(
+            (StateWindowAccessStrategy) strategy, memoryBudgetInMB);
       default:
         throw new IllegalStateException(
             "Unexpected access strategy: " + strategy.getAccessStrategyType());
@@ -81,6 +85,10 @@ public abstract class IntermediateLayer {
       SessionTimeWindowAccessStrategy strategy, float memoryBudgetInMB)
       throws QueryProcessException, IOException;
 
+  protected abstract LayerRowWindowReader constructRowStateWindowReader(
+      StateWindowAccessStrategy strategy, float memoryBudgetInMB)
+      throws QueryProcessException, IOException;
+
   @Override
   public String toString() {
     return expression.toString();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/intermediate/MultiInputColumnIntermediateLayer.java b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/intermediate/MultiInputColumnIntermediateLayer.java
index 4edf946b97..588ba5ae3a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/intermediate/MultiInputColumnIntermediateLayer.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/intermediate/MultiInputColumnIntermediateLayer.java
@@ -39,6 +39,7 @@ import org.apache.iotdb.udf.api.access.RowWindow;
 import org.apache.iotdb.udf.api.customizer.strategy.SessionTimeWindowAccessStrategy;
 import org.apache.iotdb.udf.api.customizer.strategy.SlidingSizeWindowAccessStrategy;
 import org.apache.iotdb.udf.api.customizer.strategy.SlidingTimeWindowAccessStrategy;
+import org.apache.iotdb.udf.api.customizer.strategy.StateWindowAccessStrategy;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -772,4 +773,10 @@ public class MultiInputColumnIntermediateLayer extends IntermediateLayer
       }
     };
   }
+
+  @Override
+  protected LayerRowWindowReader constructRowStateWindowReader(
+      StateWindowAccessStrategy strategy, float memoryBudgetInMB) {
+    throw new UnsupportedOperationException();
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/intermediate/SingleInputColumnMultiReferenceIntermediateLayer.java b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/intermediate/SingleInputColumnMultiReferenceIntermediateLayer.java
index 9171fcc6ed..a61d75083e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/intermediate/SingleInputColumnMultiReferenceIntermediateLayer.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/intermediate/SingleInputColumnMultiReferenceIntermediateLayer.java
@@ -30,7 +30,9 @@ import org.apache.iotdb.db.mpp.transformation.dag.adapter.ElasticSerializableTVL
 import org.apache.iotdb.db.mpp.transformation.dag.memory.SafetyLine;
 import org.apache.iotdb.db.mpp.transformation.dag.memory.SafetyLine.SafetyPile;
 import org.apache.iotdb.db.mpp.transformation.dag.util.LayerCacheUtils;
+import org.apache.iotdb.db.mpp.transformation.dag.util.TransformUtils;
 import org.apache.iotdb.db.mpp.transformation.datastructure.tv.ElasticSerializableTVList;
+import org.apache.iotdb.db.mpp.transformation.datastructure.util.ValueRecorder;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.utils.Binary;
 import org.apache.iotdb.udf.api.access.Row;
@@ -38,6 +40,7 @@ import org.apache.iotdb.udf.api.access.RowWindow;
 import org.apache.iotdb.udf.api.customizer.strategy.SessionTimeWindowAccessStrategy;
 import org.apache.iotdb.udf.api.customizer.strategy.SlidingSizeWindowAccessStrategy;
 import org.apache.iotdb.udf.api.customizer.strategy.SlidingTimeWindowAccessStrategy;
+import org.apache.iotdb.udf.api.customizer.strategy.StateWindowAccessStrategy;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -659,4 +662,122 @@ public class SingleInputColumnMultiReferenceIntermediateLayer extends Intermedia
       }
     };
   }
+
+  @Override
+  protected LayerRowWindowReader constructRowStateWindowReader(
+      StateWindowAccessStrategy strategy, float memoryBudgetInMB) {
+    final long displayWindowBegin = strategy.getDisplayWindowBegin();
+    final long displayWindowEnd = strategy.getDisplayWindowEnd();
+    final double delta = strategy.getDelta();
+
+    final SafetyPile safetyPile = safetyLine.addSafetyPile();
+    final ElasticSerializableTVListBackedSingleColumnWindow window =
+        new ElasticSerializableTVListBackedSingleColumnWindow(tvList);
+
+    return new LayerRowWindowReader() {
+
+      private boolean isFirstIteration = true;
+      private boolean hasAtLeastOneRow = false;
+
+      private long nextWindowTimeBegin = displayWindowBegin;
+      private long nextWindowTimeEnd = 0;
+      private int nextIndexBegin = 0;
+      private int nextIndexEnd = 1;
+
+      private ValueRecorder valueRecorder = new ValueRecorder();
+
+      @Override
+      public YieldableState yield() throws IOException, QueryProcessException {
+        if (isFirstIteration) {
+          if (tvList.size() == 0) {
+            final YieldableState yieldableState =
+                LayerCacheUtils.yieldPoint(
+                    parentLayerPointReaderDataType, parentLayerPointReader, tvList);
+            if (yieldableState != YieldableState.YIELDABLE) {
+              return yieldableState;
+            }
+          }
+          nextWindowTimeBegin = Math.max(displayWindowBegin, tvList.getTime(0));
+          hasAtLeastOneRow = tvList.size() != 0;
+          isFirstIteration = false;
+        }
+
+        if (!hasAtLeastOneRow || displayWindowEnd <= nextWindowTimeBegin) {
+          return YieldableState.NOT_YIELDABLE_NO_MORE_DATA;
+        }
+
+        while (tvList.getTime(tvList.size() - 1) < displayWindowEnd) {
+          final YieldableState yieldableState =
+              LayerCacheUtils.yieldPoint(
+                  parentLayerPointReaderDataType, parentLayerPointReader, tvList);
+          if (yieldableState == YieldableState.YIELDABLE) {
+            if (tvList.getTime(tvList.size() - 2) >= displayWindowBegin
+                && TransformUtils.splitWindowForStateWindow(
+                    parentLayerPointReaderDataType, valueRecorder, delta, tvList)) {
+              nextIndexEnd = tvList.size() - 1;
+              break;
+            } else {
+              nextIndexEnd++;
+            }
+          } else if (yieldableState == YieldableState.NOT_YIELDABLE_WAITING_FOR_DATA) {
+            return YieldableState.NOT_YIELDABLE_WAITING_FOR_DATA;
+          } else if (yieldableState == YieldableState.NOT_YIELDABLE_NO_MORE_DATA) {
+            nextIndexEnd = tvList.size();
+            break;
+          }
+        }
+
+        nextWindowTimeEnd = tvList.getTime(nextIndexEnd - 1);
+
+        if (nextIndexBegin == nextIndexEnd) {
+          return YieldableState.NOT_YIELDABLE_NO_MORE_DATA;
+        }
+
+        // Only if encounter user set the strategy's displayWindowBegin, which will go into the for
+        // loop to find the true index of the first window begin.
+        // For other situation, we will only go into if (nextWindowTimeBegin <= tvList.getTime(i))
+        // once.
+        for (int i = nextIndexBegin; i < tvList.size(); ++i) {
+          if (nextWindowTimeBegin <= tvList.getTime(i)) {
+            nextIndexBegin = i;
+            break;
+          }
+          // The first window's beginning time is greater than all the timestamp of the query result
+          // set
+          if (i == tvList.size() - 1) {
+            return YieldableState.NOT_YIELDABLE_NO_MORE_DATA;
+          }
+        }
+
+        window.seek(nextIndexBegin, nextIndexEnd, nextWindowTimeBegin, nextWindowTimeEnd);
+
+        return YieldableState.YIELDABLE;
+      }
+
+      @Override
+      public boolean next() throws IOException, QueryProcessException {
+        return false;
+      }
+
+      @Override
+      public void readyForNext() throws IOException, QueryProcessException {
+        if (nextIndexEnd < tvList.size()) {
+          nextWindowTimeBegin = tvList.getTime(nextIndexEnd);
+        }
+        safetyPile.moveForwardTo(nextIndexBegin + 1);
+        tvList.setEvictionUpperBound(safetyLine.getSafetyLine());
+        nextIndexBegin = nextIndexEnd;
+      }
+
+      @Override
+      public TSDataType[] getDataTypes() {
+        return new TSDataType[] {parentLayerPointReaderDataType};
+      }
+
+      @Override
+      public RowWindow currentWindow() {
+        return window;
+      }
+    };
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/intermediate/SingleInputColumnSingleReferenceIntermediateLayer.java b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/intermediate/SingleInputColumnSingleReferenceIntermediateLayer.java
index 6f46cd0f01..35090627f0 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/intermediate/SingleInputColumnSingleReferenceIntermediateLayer.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/intermediate/SingleInputColumnSingleReferenceIntermediateLayer.java
@@ -28,13 +28,16 @@ import org.apache.iotdb.db.mpp.transformation.api.YieldableState;
 import org.apache.iotdb.db.mpp.transformation.dag.adapter.ElasticSerializableTVListBackedSingleColumnWindow;
 import org.apache.iotdb.db.mpp.transformation.dag.adapter.LayerPointReaderBackedSingleColumnRow;
 import org.apache.iotdb.db.mpp.transformation.dag.util.LayerCacheUtils;
+import org.apache.iotdb.db.mpp.transformation.dag.util.TransformUtils;
 import org.apache.iotdb.db.mpp.transformation.datastructure.tv.ElasticSerializableTVList;
+import org.apache.iotdb.db.mpp.transformation.datastructure.util.ValueRecorder;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.udf.api.access.Row;
 import org.apache.iotdb.udf.api.access.RowWindow;
 import org.apache.iotdb.udf.api.customizer.strategy.SessionTimeWindowAccessStrategy;
 import org.apache.iotdb.udf.api.customizer.strategy.SlidingSizeWindowAccessStrategy;
 import org.apache.iotdb.udf.api.customizer.strategy.SlidingTimeWindowAccessStrategy;
+import org.apache.iotdb.udf.api.customizer.strategy.StateWindowAccessStrategy;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -530,4 +533,122 @@ public class SingleInputColumnSingleReferenceIntermediateLayer extends Intermedi
       }
     };
   }
+
+  @Override
+  protected LayerRowWindowReader constructRowStateWindowReader(
+      StateWindowAccessStrategy strategy, float memoryBudgetInMB) {
+
+    final long displayWindowBegin = strategy.getDisplayWindowBegin();
+    final long displayWindowEnd = strategy.getDisplayWindowEnd();
+    final double delta = strategy.getDelta();
+
+    final ElasticSerializableTVList tvList =
+        ElasticSerializableTVList.newElasticSerializableTVList(
+            dataType, queryId, memoryBudgetInMB, CACHE_BLOCK_SIZE);
+    final ElasticSerializableTVListBackedSingleColumnWindow window =
+        new ElasticSerializableTVListBackedSingleColumnWindow(tvList);
+
+    return new LayerRowWindowReader() {
+
+      private boolean isFirstIteration = true;
+      private boolean hasAtLeastOneRow = false;
+
+      private long nextWindowTimeBegin = displayWindowBegin;
+      private long nextWindowTimeEnd = 0;
+      private int nextIndexBegin = 0;
+      private int nextIndexEnd = 1;
+
+      private ValueRecorder valueRecorder = new ValueRecorder();
+
+      @Override
+      public YieldableState yield() throws IOException, QueryProcessException {
+        if (isFirstIteration) {
+          if (tvList.size() == 0) {
+            final YieldableState yieldableState =
+                LayerCacheUtils.yieldPoint(dataType, parentLayerPointReader, tvList);
+            if (yieldableState != YieldableState.YIELDABLE) {
+              return yieldableState;
+            }
+          }
+          nextWindowTimeBegin = Math.max(displayWindowBegin, tvList.getTime(0));
+          hasAtLeastOneRow = tvList.size() != 0;
+          isFirstIteration = false;
+        }
+
+        if (!hasAtLeastOneRow || displayWindowEnd <= nextWindowTimeBegin) {
+          return YieldableState.NOT_YIELDABLE_NO_MORE_DATA;
+        }
+
+        while (tvList.getTime(tvList.size() - 1) < displayWindowEnd) {
+          final YieldableState yieldableState =
+              LayerCacheUtils.yieldPoint(dataType, parentLayerPointReader, tvList);
+          if (yieldableState == YieldableState.YIELDABLE) {
+            if (tvList.getTime(tvList.size() - 2) >= displayWindowBegin
+                && TransformUtils.splitWindowForStateWindow(
+                    dataType, valueRecorder, delta, tvList)) {
+              nextIndexEnd = tvList.size() - 1;
+              break;
+            } else {
+              nextIndexEnd++;
+            }
+          } else if (yieldableState == YieldableState.NOT_YIELDABLE_WAITING_FOR_DATA) {
+            return YieldableState.NOT_YIELDABLE_WAITING_FOR_DATA;
+          } else if (yieldableState == YieldableState.NOT_YIELDABLE_NO_MORE_DATA) {
+            nextIndexEnd = tvList.size();
+            break;
+          }
+        }
+
+        nextWindowTimeEnd = tvList.getTime(nextIndexEnd - 1);
+
+        if (nextIndexBegin == nextIndexEnd) {
+          return YieldableState.NOT_YIELDABLE_NO_MORE_DATA;
+        }
+
+        // Only if encounter user set the strategy's displayWindowBegin, which will go into the for
+        // loop to find the true index of the first window begin.
+        // For other situation, we will only go into if (nextWindowTimeBegin <= tvList.getTime(i))
+        // once.
+        for (int i = nextIndexBegin; i < tvList.size(); ++i) {
+          if (nextWindowTimeBegin <= tvList.getTime(i)) {
+            nextIndexBegin = i;
+            break;
+          }
+          // The first window's beginning time is greater than all the timestamp of the query result
+          // set
+          if (i == tvList.size() - 1) {
+            return YieldableState.NOT_YIELDABLE_NO_MORE_DATA;
+          }
+        }
+
+        window.seek(nextIndexBegin, nextIndexEnd, nextWindowTimeBegin, nextWindowTimeEnd);
+
+        return YieldableState.YIELDABLE;
+      }
+
+      @Override
+      public boolean next() throws IOException, QueryProcessException {
+        return false;
+      }
+
+      @Override
+      public void readyForNext() throws IOException {
+        if (nextIndexEnd < tvList.size()) {
+          nextWindowTimeBegin = tvList.getTime(nextIndexEnd);
+        }
+        tvList.setEvictionUpperBound(nextIndexBegin + 1);
+        nextIndexBegin = nextIndexEnd;
+      }
+
+      @Override
+      public TSDataType[] getDataTypes() {
+        return new TSDataType[] {dataType};
+      }
+
+      @Override
+      public RowWindow currentWindow() {
+        return window;
+      }
+    };
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/util/TransformUtils.java b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/util/TransformUtils.java
index fd426281ff..ed95095cda 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/util/TransformUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/util/TransformUtils.java
@@ -21,8 +21,11 @@ package org.apache.iotdb.db.mpp.transformation.dag.util;
 
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.mpp.plan.expression.leaf.ConstantOperand;
+import org.apache.iotdb.db.mpp.transformation.datastructure.tv.ElasticSerializableTVList;
+import org.apache.iotdb.db.mpp.transformation.datastructure.util.ValueRecorder;
 import org.apache.iotdb.db.utils.CommonUtils;
 import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.read.common.block.column.BinaryColumn;
 import org.apache.iotdb.tsfile.read.common.block.column.BooleanColumn;
 import org.apache.iotdb.tsfile.read.common.block.column.Column;
@@ -34,6 +37,7 @@ import org.apache.iotdb.tsfile.utils.Binary;
 
 import org.apache.commons.lang3.Validate;
 
+import java.io.IOException;
 import java.util.Objects;
 import java.util.Optional;
 
@@ -96,4 +100,90 @@ public class TransformUtils {
       throw new UnsupportedOperationException(e);
     }
   }
+
+  public static boolean splitWindowForStateWindow(
+      TSDataType dataType,
+      ValueRecorder valueRecorder,
+      double delta,
+      ElasticSerializableTVList tvList)
+      throws IOException {
+    boolean res;
+    switch (dataType) {
+      case INT32:
+        {
+          if (!valueRecorder.hasRecorded()) {
+            valueRecorder.recordInt(tvList.getInt(tvList.size() - 2));
+            valueRecorder.setRecorded(true);
+          }
+          res = Math.abs(tvList.getInt(tvList.size() - 1) - valueRecorder.getInt()) >= delta;
+          if (res) {
+            valueRecorder.recordInt(tvList.getInt(tvList.size() - 1));
+          }
+          break;
+        }
+      case INT64:
+        {
+          if (!valueRecorder.hasRecorded()) {
+            valueRecorder.recordLong(tvList.getLong(tvList.size() - 2));
+            valueRecorder.setRecorded(true);
+          }
+          res = Math.abs(tvList.getLong(tvList.size() - 1) - valueRecorder.getLong()) >= delta;
+          if (res) {
+            valueRecorder.recordLong(tvList.getLong(tvList.size() - 1));
+          }
+          break;
+        }
+      case FLOAT:
+        {
+          if (!valueRecorder.hasRecorded()) {
+            valueRecorder.recordFloat(tvList.getFloat(tvList.size() - 2));
+            valueRecorder.setRecorded(true);
+          }
+          res = Math.abs(tvList.getFloat(tvList.size() - 1) - valueRecorder.getFloat()) >= delta;
+          if (res) {
+            valueRecorder.recordFloat(tvList.getFloat(tvList.size() - 1));
+          }
+          break;
+        }
+      case DOUBLE:
+        {
+          if (!valueRecorder.hasRecorded()) {
+            valueRecorder.recordDouble(tvList.getDouble(tvList.size() - 2));
+            valueRecorder.setRecorded(true);
+          }
+          res = Math.abs(tvList.getDouble(tvList.size() - 1) - valueRecorder.getDouble()) >= delta;
+          if (res) {
+            valueRecorder.recordDouble(tvList.getDouble(tvList.size() - 1));
+          }
+          break;
+        }
+      case BOOLEAN:
+        {
+          if (!valueRecorder.hasRecorded()) {
+            valueRecorder.recordBoolean(tvList.getBoolean(tvList.size() - 2));
+            valueRecorder.setRecorded(true);
+          }
+          res = tvList.getBoolean(tvList.size() - 1) != valueRecorder.getBoolean();
+          if (res) {
+            valueRecorder.recordBoolean(tvList.getBoolean(tvList.size() - 1));
+          }
+          break;
+        }
+      case TEXT:
+        {
+          if (!valueRecorder.hasRecorded()) {
+            valueRecorder.recordString(tvList.getString(tvList.size() - 2));
+            valueRecorder.setRecorded(true);
+          }
+          res = !tvList.getString(tvList.size() - 1).equals(valueRecorder.getString());
+          if (res) {
+            valueRecorder.recordString(tvList.getString(tvList.size() - 1));
+          }
+          break;
+        }
+      default:
+        throw new RuntimeException("The data type of the state window strategy is not valid.");
+    }
+    return res;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/datastructure/util/ValueRecorder.java b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/datastructure/util/ValueRecorder.java
new file mode 100644
index 0000000000..91e0ab2ac1
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/datastructure/util/ValueRecorder.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.transformation.datastructure.util;
+
+public class ValueRecorder {
+
+  boolean recorded = false;
+
+  int windowFirstValueInt = 0;
+  long windowFirstValueLong = 0;
+  float windowFirstValueFloat = 0;
+  double windowFirstValueDouble = 0;
+  boolean windowFirstValueBoolean = false;
+  String windowFirstValueString = "";
+
+  public boolean hasRecorded() {
+    return recorded;
+  }
+
+  public void setRecorded(boolean recorded) {
+    this.recorded = recorded;
+  }
+
+  public void recordInt(int value) {
+    windowFirstValueInt = value;
+  }
+
+  public void recordLong(long value) {
+    windowFirstValueLong = value;
+  }
+
+  public void recordFloat(float value) {
+    windowFirstValueFloat = value;
+  }
+
+  public void recordDouble(double value) {
+    windowFirstValueDouble = value;
+  }
+
+  public void recordBoolean(boolean value) {
+    windowFirstValueBoolean = value;
+  }
+
+  public void recordString(String value) {
+    windowFirstValueString = value;
+  }
+
+  public int getInt() {
+    return windowFirstValueInt;
+  }
+
+  public long getLong() {
+    return windowFirstValueLong;
+  }
+
+  public float getFloat() {
+    return windowFirstValueFloat;
+  }
+
+  public double getDouble() {
+    return windowFirstValueDouble;
+  }
+
+  public boolean getBoolean() {
+    return windowFirstValueBoolean;
+  }
+
+  public String getString() {
+    return windowFirstValueString;
+  }
+}
diff --git a/udf-api/src/main/java/org/apache/iotdb/udf/api/customizer/strategy/AccessStrategy.java b/udf-api/src/main/java/org/apache/iotdb/udf/api/customizer/strategy/AccessStrategy.java
index d543559a5f..a6ff234536 100644
--- a/udf-api/src/main/java/org/apache/iotdb/udf/api/customizer/strategy/AccessStrategy.java
+++ b/udf-api/src/main/java/org/apache/iotdb/udf/api/customizer/strategy/AccessStrategy.java
@@ -44,7 +44,10 @@ public interface AccessStrategy {
     SLIDING_SIZE_WINDOW,
 
     /** @see SessionTimeWindowAccessStrategy */
-    SESSION_TIME_WINDOW
+    SESSION_TIME_WINDOW,
+
+    /** @see StateWindowAccessStrategy */
+    STATE_WINDOW
   }
 
   /**
diff --git a/udf-api/src/main/java/org/apache/iotdb/udf/api/customizer/strategy/StateWindowAccessStrategy.java b/udf-api/src/main/java/org/apache/iotdb/udf/api/customizer/strategy/StateWindowAccessStrategy.java
new file mode 100644
index 0000000000..73c89143ef
--- /dev/null
+++ b/udf-api/src/main/java/org/apache/iotdb/udf/api/customizer/strategy/StateWindowAccessStrategy.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.udf.api.customizer.strategy;
+
+import java.time.ZoneId;
+
+public class StateWindowAccessStrategy implements AccessStrategy {
+
+  private final long displayWindowBegin;
+  private final long displayWindowEnd;
+  private final double delta;
+
+  private ZoneId zoneId;
+
+  /**
+   * @param displayWindowBegin displayWindowBegin < displayWindowEnd
+   * @param displayWindowEnd displayWindowBegin < displayWindowEnd
+   * @param delta delta >= 0
+   */
+  public StateWindowAccessStrategy(long displayWindowBegin, long displayWindowEnd, double delta) {
+    this.displayWindowBegin = displayWindowBegin;
+    this.displayWindowEnd = displayWindowEnd;
+    this.delta = delta;
+  }
+
+  /**
+   * @param displayWindowBegin displayWindowBegin < displayWindowEnd
+   * @param displayWindowEnd displayWindowBegin < displayWindowEnd
+   */
+  public StateWindowAccessStrategy(long displayWindowBegin, long displayWindowEnd) {
+    this.displayWindowBegin = displayWindowBegin;
+    this.displayWindowEnd = displayWindowEnd;
+    this.delta = 0.0;
+  }
+
+  /**
+   * Display window begin will be set to the same as the minimum timestamp of the query result set,
+   * and display window end will be set to the same as the maximum timestamp of the query result
+   * set.
+   *
+   * @param delta 0 < delta
+   */
+  public StateWindowAccessStrategy(double delta) {
+    this.displayWindowBegin = Long.MIN_VALUE;
+    this.displayWindowEnd = Long.MAX_VALUE;
+    this.delta = delta;
+  }
+
+  /**
+   * Display window begin will be set to the same as the minimum timestamp of the query result set,
+   * and display window end will be set to the same as the maximum timestamp of the query result
+   * set. delta default equals 0.
+   */
+  public StateWindowAccessStrategy() {
+    this.displayWindowBegin = Long.MIN_VALUE;
+    this.displayWindowEnd = Long.MAX_VALUE;
+    this.delta = 0.0;
+  }
+
+  @Override
+  public void check() {
+    if (delta < 0) {
+      throw new RuntimeException(
+          String.format("Parameter delta(%f) should be positive or equal to 0.", delta));
+    }
+    if (displayWindowEnd < displayWindowBegin) {
+      throw new RuntimeException(
+          String.format(
+              "displayWindowEnd(%d) < displayWindowBegin(%d)",
+              displayWindowEnd, displayWindowBegin));
+    }
+  }
+
+  @Override
+  public AccessStrategyType getAccessStrategyType() {
+    return AccessStrategyType.STATE_WINDOW;
+  }
+
+  public long getDisplayWindowBegin() {
+    return displayWindowBegin;
+  }
+
+  public long getDisplayWindowEnd() {
+    return displayWindowEnd;
+  }
+
+  public double getDelta() {
+    return delta;
+  }
+
+  public ZoneId getZoneId() {
+    return zoneId;
+  }
+
+  public void setZoneId(ZoneId zoneId) {
+    this.zoneId = zoneId;
+  }
+}