You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/08/28 11:53:40 UTC
[iotdb] branch master updated: [IOTDB-4085] Add StateWindowAccessStrategy in UDF (#7005)
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 136bfd05e8 [IOTDB-4085] Add StateWindowAccessStrategy in UDF (#7005)
136bfd05e8 is described below
commit 136bfd05e8332c9b1f86338a8bea993a82aa09dc
Author: AACEPT <34...@users.noreply.github.com>
AuthorDate: Sun Aug 28 19:53:34 2022 +0800
[IOTDB-4085] Add StateWindowAccessStrategy in UDF (#7005)
---
.../db/query/udf/example/ExampleUDFConstant.java | 2 +
.../iotdb/db/query/udf/example/WindowStartEnd.java | 23 ++
.../iotdb/itbase/constant/UDFTestConstant.java | 2 +
...ueryIT.java => IoTDBUDFOtherWindowQueryIT.java} | 238 ++++++++++++++++++++-
.../visitor/IntermediateLayerVisitor.java | 1 +
.../intermediate/ConstantIntermediateLayer.java | 8 +
.../dag/intermediate/IntermediateLayer.java | 8 +
.../MultiInputColumnIntermediateLayer.java | 7 +
...InputColumnMultiReferenceIntermediateLayer.java | 121 +++++++++++
...nputColumnSingleReferenceIntermediateLayer.java | 121 +++++++++++
.../transformation/dag/util/TransformUtils.java | 90 ++++++++
.../datastructure/util/ValueRecorder.java | 88 ++++++++
.../api/customizer/strategy/AccessStrategy.java | 5 +-
.../strategy/StateWindowAccessStrategy.java | 115 ++++++++++
14 files changed, 827 insertions(+), 2 deletions(-)
diff --git a/integration-test/src/main/java/org/apache/iotdb/db/query/udf/example/ExampleUDFConstant.java b/integration-test/src/main/java/org/apache/iotdb/db/query/udf/example/ExampleUDFConstant.java
index 3dfb8303a0..98b0e4aa19 100644
--- a/integration-test/src/main/java/org/apache/iotdb/db/query/udf/example/ExampleUDFConstant.java
+++ b/integration-test/src/main/java/org/apache/iotdb/db/query/udf/example/ExampleUDFConstant.java
@@ -24,10 +24,12 @@ public class ExampleUDFConstant {
public static final String ACCESS_STRATEGY_SLIDING_SIZE = "size";
public static final String ACCESS_STRATEGY_SLIDING_TIME = "time";
public static final String ACCESS_STRATEGY_SESSION = "session";
+ public static final String ACCESS_STRATEGY_STATE = "state";
public static final String WINDOW_SIZE_KEY = "windowSize";
public static final String TIME_INTERVAL_KEY = "timeInterval";
public static final String SLIDING_STEP_KEY = "slidingStep";
public static final String SESSION_GAP_KEY = "sessionGap";
+ public static final String STATE_DELTA_KEY = "delta";
public static final String DISPLAY_WINDOW_BEGIN_KEY = "displayWindowBegin";
public static final String DISPLAY_WINDOW_END_KEY = "displayWindowEnd";
}
diff --git a/integration-test/src/main/java/org/apache/iotdb/db/query/udf/example/WindowStartEnd.java b/integration-test/src/main/java/org/apache/iotdb/db/query/udf/example/WindowStartEnd.java
index bd3a369365..7045aca687 100644
--- a/integration-test/src/main/java/org/apache/iotdb/db/query/udf/example/WindowStartEnd.java
+++ b/integration-test/src/main/java/org/apache/iotdb/db/query/udf/example/WindowStartEnd.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.udf.api.customizer.parameter.UDFParameters;
import org.apache.iotdb.udf.api.customizer.strategy.SessionTimeWindowAccessStrategy;
import org.apache.iotdb.udf.api.customizer.strategy.SlidingSizeWindowAccessStrategy;
import org.apache.iotdb.udf.api.customizer.strategy.SlidingTimeWindowAccessStrategy;
+import org.apache.iotdb.udf.api.customizer.strategy.StateWindowAccessStrategy;
import org.apache.iotdb.udf.api.type.Type;
import java.io.IOException;
@@ -70,6 +71,28 @@ public class WindowStartEnd implements UDTF {
parameters.getLong(ExampleUDFConstant.SESSION_GAP_KEY))
: new SessionTimeWindowAccessStrategy(
parameters.getLong(ExampleUDFConstant.SESSION_GAP_KEY)));
+ } else if (ExampleUDFConstant.ACCESS_STRATEGY_STATE.equals(
+ parameters.getString(ExampleUDFConstant.ACCESS_STRATEGY_KEY))) {
+ if (parameters.hasAttribute(ExampleUDFConstant.DISPLAY_WINDOW_BEGIN_KEY)
+ && parameters.hasAttribute(ExampleUDFConstant.DISPLAY_WINDOW_END_KEY)) {
+ if (parameters.hasAttribute(ExampleUDFConstant.STATE_DELTA_KEY)) {
+ configurations.setAccessStrategy(
+ new StateWindowAccessStrategy(
+ parameters.getLong(ExampleUDFConstant.DISPLAY_WINDOW_BEGIN_KEY),
+ parameters.getLong(ExampleUDFConstant.DISPLAY_WINDOW_END_KEY),
+ parameters.getLong(ExampleUDFConstant.STATE_DELTA_KEY)));
+ } else {
+ configurations.setAccessStrategy(
+ new StateWindowAccessStrategy(
+ parameters.getLong(ExampleUDFConstant.DISPLAY_WINDOW_BEGIN_KEY),
+ parameters.getLong(ExampleUDFConstant.DISPLAY_WINDOW_END_KEY)));
+ }
+ } else if (parameters.hasAttribute(ExampleUDFConstant.STATE_DELTA_KEY)) {
+ configurations.setAccessStrategy(
+ new StateWindowAccessStrategy(parameters.getLong(ExampleUDFConstant.STATE_DELTA_KEY)));
+ } else {
+ configurations.setAccessStrategy(new StateWindowAccessStrategy());
+ }
}
}
diff --git a/integration-test/src/main/java/org/apache/iotdb/itbase/constant/UDFTestConstant.java b/integration-test/src/main/java/org/apache/iotdb/itbase/constant/UDFTestConstant.java
index 31dfeeb661..8114f59610 100644
--- a/integration-test/src/main/java/org/apache/iotdb/itbase/constant/UDFTestConstant.java
+++ b/integration-test/src/main/java/org/apache/iotdb/itbase/constant/UDFTestConstant.java
@@ -24,10 +24,12 @@ public class UDFTestConstant {
public static final String ACCESS_STRATEGY_SLIDING_SIZE = "size";
public static final String ACCESS_STRATEGY_SLIDING_TIME = "time";
public static final String ACCESS_STRATEGY_SESSION = "session";
+ public static final String ACCESS_STRATEGY_STATE = "state";
public static final String WINDOW_SIZE_KEY = "windowSize";
public static final String TIME_INTERVAL_KEY = "timeInterval";
public static final String SLIDING_STEP_KEY = "slidingStep";
public static final String SESSION_GAP_KEY = "sessionGap";
+ public static final String STATE_DELTA_KEY = "delta";
public static final String DISPLAY_WINDOW_BEGIN_KEY = "displayWindowBegin";
public static final String DISPLAY_WINDOW_END_KEY = "displayWindowEnd";
}
diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/udf/IoTDBUDFSessionWindowQueryIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/udf/IoTDBUDFOtherWindowQueryIT.java
similarity index 53%
rename from integration-test/src/test/java/org/apache/iotdb/db/it/udf/IoTDBUDFSessionWindowQueryIT.java
rename to integration-test/src/test/java/org/apache/iotdb/db/it/udf/IoTDBUDFOtherWindowQueryIT.java
index 5698d7d4bc..07744152a6 100644
--- a/integration-test/src/test/java/org/apache/iotdb/db/it/udf/IoTDBUDFSessionWindowQueryIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/udf/IoTDBUDFOtherWindowQueryIT.java
@@ -41,9 +41,10 @@ import java.util.ArrayList;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
+/** Test for SessionWindow, StateWindow and UserDefinedWindow. */
@RunWith(IoTDBTestRunner.class)
@Category({ClusterIT.class})
-public class IoTDBUDFSessionWindowQueryIT {
+public class IoTDBUDFOtherWindowQueryIT {
protected static final int ITERATION_TIMES = 1000;
@@ -86,12 +87,17 @@ public class IoTDBUDFSessionWindowQueryIT {
Statement statement = connection.createStatement()) {
statement.execute("SET STORAGE GROUP TO root.vehicle");
statement.execute("CREATE TIMESERIES root.vehicle.d1.s3 with datatype=INT32,encoding=PLAIN");
+ statement.execute("CREATE TIMESERIES root.vehicle.d1.s4 with datatype=INT32,encoding=PLAIN");
+ statement.execute(
+ "CREATE TIMESERIES root.vehicle.d1.s5 with datatype=BOOLEAN,encoding=PLAIN");
+ statement.execute("CREATE TIMESERIES root.vehicle.d1.s6 with datatype=TEXT,encoding=PLAIN");
} catch (SQLException throwable) {
fail(throwable.getMessage());
}
}
private static void generateData() {
+ // SessionWindow
try (Connection connection = EnvFactory.getEnv().getConnection();
Statement statement = connection.createStatement()) {
for (int i = 0; i < ITERATION_TIMES; ++i) {
@@ -102,6 +108,51 @@ public class IoTDBUDFSessionWindowQueryIT {
statement.execute(
(String.format("insert into root.vehicle.d1(timestamp,s3) values(%d,%d)", i, i)));
}
+
+ // StateWindow INT32
+ for (int i = 0; i < ITERATION_TIMES; ++i) {
+ if (i == 1 || i == 2 || i == 3 || i == 15 || i == 17 || i == 53 || i == 54 || i == 996
+ || i == 997 || i == 998) {
+ statement.execute(
+ (String.format(
+ "insert into root.vehicle.d1(timestamp,s4) values(%d,%d)", i, i + 100)));
+ } else if (i == 500 || i == 501) {
+ statement.execute(
+ (String.format("insert into root.vehicle.d1(timestamp,s4) values(%d,%d)", i, 12)));
+ } else if (i % 2 == 0) {
+ statement.execute(
+ (String.format("insert into root.vehicle.d1(timestamp,s4) values(%d,%d)", i, 0)));
+ } else {
+ statement.execute(
+ (String.format("insert into root.vehicle.d1(timestamp,s4) values(%d,%d)", i, 3)));
+ }
+ }
+
+ // StateWindow BOOLEAN
+ for (int i = 0; i < ITERATION_TIMES; ++i) {
+ if (i == 1 || i == 2 || i == 3 || i == 15 || i == 17 || i == 53 || i == 54 || i == 996
+ || i == 997 || i == 998) {
+ statement.execute(
+ (String.format("insert into root.vehicle.d1(timestamp,s5) values(%d, true)", i)));
+ } else {
+ statement.execute(
+ (String.format("insert into root.vehicle.d1(timestamp,s5) values(%d, false)", i)));
+ }
+ }
+
+ // StateWindow TEXT
+ for (int i = 0; i < ITERATION_TIMES; ++i) {
+ if (i < 500) {
+ statement.execute(
+ (String.format("insert into root.vehicle.d1(timestamp,s6) values(%d, '<500')", i)));
+ } else if (i < 993) {
+ statement.execute(
+ (String.format("insert into root.vehicle.d1(timestamp,s6) values(%d, '<993')", i)));
+ } else {
+ statement.execute(
+ (String.format("insert into root.vehicle.d1(timestamp,s6) values(%d, '>=994')", i)));
+ }
+ }
} catch (SQLException throwable) {
fail(throwable.getMessage());
}
@@ -280,4 +331,189 @@ public class IoTDBUDFSessionWindowQueryIT {
Long displayEnd = -100L;
testSessionTimeWindowSSOutOfRange(sessionGap, displayBegin, displayEnd);
}
+
+  /**
+   * Runs {@code window_start_end} on {@code root.vehicle.d1} with the state window access
+   * strategy and checks every returned row against the expected window boundaries.
+   *
+   * @param measurement name of the series to query, e.g. {@code s4}
+   * @param delta value sent as the {@code delta} attribute, or {@code null} to omit it
+   * @param windowStart expected value of the first result column for each row, in order
+   * @param windowEnd expected value of the second result column for each row, in order
+   * @param displayBegin display window begin, or {@code null} to omit the display range
+   * @param displayEnd display window end; only sent when {@code displayBegin} is not null
+   */
+  private void testStateWindowSS(
+      String measurement,
+      String delta,
+      long[] windowStart,
+      long[] windowEnd,
+      Long displayBegin,
+      Long displayEnd) {
+    // Attributes are appended one by one so that every combination of optional arguments yields
+    // valid SQL; in particular a null delta is never rendered as the literal string "null".
+    StringBuilder sql =
+        new StringBuilder(
+            String.format(
+                "select window_start_end(%s, '%s'='%s'",
+                measurement,
+                UDFTestConstant.ACCESS_STRATEGY_KEY,
+                UDFTestConstant.ACCESS_STRATEGY_STATE));
+    if (displayBegin != null) {
+      sql.append(
+          String.format(
+              ", '%s'='%s', '%s'='%s'",
+              UDFTestConstant.DISPLAY_WINDOW_BEGIN_KEY,
+              displayBegin,
+              UDFTestConstant.DISPLAY_WINDOW_END_KEY,
+              displayEnd));
+    }
+    if (delta != null) {
+      sql.append(String.format(", '%s'='%s'", UDFTestConstant.STATE_DELTA_KEY, delta));
+    }
+    sql.append(") from root.vehicle.d1");
+
+    try (Connection conn = EnvFactory.getEnv().getConnection();
+        Statement statement = conn.createStatement();
+        ResultSet resultSet = statement.executeQuery(sql.toString())) {
+      assertEquals(2, resultSet.getMetaData().getColumnCount());
+      int cnt = 0;
+      while (resultSet.next()) {
+        if (cnt >= windowStart.length) {
+          fail("More windows returned than the " + windowStart.length + " expected");
+        }
+        // JUnit's contract is assertEquals(expected, actual).
+        assertEquals(windowStart[cnt], resultSet.getLong(1));
+        assertEquals(windowEnd[cnt], resultSet.getLong(2));
+        cnt++;
+      }
+      // Without this check an empty or truncated result set would pass silently.
+      assertEquals(windowStart.length, cnt);
+    } catch (SQLException e) {
+      fail(e.getMessage());
+    }
+  }
+
+  /** INT32 series s4 with delta 10, displayWindowBegin 10 and displayWindowEnd 976. */
+  @Test
+  public void testStateWindowSS1() {
+    final long[] expectedStarts = {10, 15, 16, 17, 18, 53, 55};
+    final long[] expectedEnds = {14, 15, 16, 17, 52, 54, 975};
+    testStateWindowSS("s4", "10", expectedStarts, expectedEnds, 10L, 976L);
+  }
+
+  /** INT32 series s4 with delta 9 and a display window of 0 to 100000. */
+  @Test
+  public void testStateWindowSS2() {
+    final long[] expectedStarts = {0, 1, 4, 15, 16, 17, 18, 53, 55, 500, 502, 996, 999};
+    final long[] expectedEnds = {0, 3, 14, 15, 16, 17, 52, 54, 499, 501, 995, 998, 999};
+    testStateWindowSS("s4", "9", expectedStarts, expectedEnds, 0L, 100000L);
+  }
+
+  /** INT32 series s4 with delta 102 and a display window that starts before the first point. */
+  @Test
+  public void testStateWindowSS3() {
+    final long[] expectedStarts = {0, 2, 4, 15, 16, 17, 18, 53, 55, 996, 999};
+    final long[] expectedEnds = {1, 3, 14, 15, 16, 17, 52, 54, 995, 998, 999};
+    testStateWindowSS("s4", "102", expectedStarts, expectedEnds, -100L, 100000L);
+  }
+
+  /** INT32 series s4 with delta 102 and no display window attributes. */
+  @Test
+  public void testStateWindowSS4() {
+    final long[] expectedStarts = {0, 2, 4, 15, 16, 17, 18, 53, 55, 996, 999};
+    final long[] expectedEnds = {1, 3, 14, 15, 16, 17, 52, 54, 995, 998, 999};
+    testStateWindowSS("s4", "102", expectedStarts, expectedEnds, null, null);
+  }
+
+  /** INT32 series s4 with delta 2 and a display window of -100 to 1; one window is expected. */
+  @Test
+  public void testStateWindowSS5() {
+    final long[] expectedStarts = {0};
+    final long[] expectedEnds = {0};
+    testStateWindowSS("s4", "2", expectedStarts, expectedEnds, -100L, 1L);
+  }
+
+  /**
+   * Runs {@code window_start_end} on {@code root.vehicle.d1} with the state window access
+   * strategy and asserts that the query returns no rows at all.
+   *
+   * @param measurement name of the series to query, e.g. {@code s4}
+   * @param delta value sent as the {@code delta} attribute, or {@code null} to omit it
+   * @param displayBegin display window begin, or {@code null} to omit the display range
+   * @param displayEnd display window end; only sent when {@code displayBegin} is not null
+   */
+  private void testStateWindowSSOutOfRange(
+      String measurement, String delta, Long displayBegin, Long displayEnd) {
+    // Attributes are appended one by one so that every combination of optional arguments yields
+    // valid SQL; in particular a null delta is never rendered as the literal string "null".
+    StringBuilder sql =
+        new StringBuilder(
+            String.format(
+                "select window_start_end(%s, '%s'='%s'",
+                measurement,
+                UDFTestConstant.ACCESS_STRATEGY_KEY,
+                UDFTestConstant.ACCESS_STRATEGY_STATE));
+    if (displayBegin != null) {
+      sql.append(
+          String.format(
+              ", '%s'='%s', '%s'='%s'",
+              UDFTestConstant.DISPLAY_WINDOW_BEGIN_KEY,
+              displayBegin,
+              UDFTestConstant.DISPLAY_WINDOW_END_KEY,
+              displayEnd));
+    }
+    if (delta != null) {
+      sql.append(String.format(", '%s'='%s'", UDFTestConstant.STATE_DELTA_KEY, delta));
+    }
+    sql.append(") from root.vehicle.d1");
+
+    try (Connection conn = EnvFactory.getEnv().getConnection();
+        Statement statement = conn.createStatement();
+        ResultSet resultSet = statement.executeQuery(sql.toString())) {
+      int count = 0;
+      while (resultSet.next()) {
+        count++;
+      }
+      // JUnit's contract is assertEquals(expected, actual).
+      assertEquals(0, count);
+    } catch (SQLException e) {
+      fail(e.getMessage());
+    }
+  }
+
+  /** Display window of -100 to -1 on s4 with delta 2; no rows are expected. */
+  @Test
+  public void testStateWindowSS6() {
+    testStateWindowSSOutOfRange("s4", "2", -100L, -1L);
+  }
+
+  /** Display window of 10000 to 100005 on s4 with delta 2; no rows are expected. */
+  @Test
+  public void testStateWindowSS7() {
+    testStateWindowSSOutOfRange("s4", "2", 10000L, 100005L);
+  }
+
+  /** BOOLEAN series s5 with neither delta nor display window attributes. */
+  @Test
+  public void testStateWindowSS8() {
+    final long[] expectedStarts = {0, 1, 4, 15, 16, 17, 18, 53, 55, 996, 999};
+    final long[] expectedEnds = {0, 3, 14, 15, 16, 17, 52, 54, 995, 998, 999};
+    testStateWindowSS("s5", null, expectedStarts, expectedEnds, null, null);
+  }
+
+  /** TEXT series s6 with neither delta nor display window attributes. */
+  @Test
+  public void testStateWindowSS9() {
+    final long[] expectedStarts = {0, 500, 993};
+    final long[] expectedEnds = {499, 992, 999};
+    testStateWindowSS("s6", null, expectedStarts, expectedEnds, null, null);
+  }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/visitor/IntermediateLayerVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/visitor/IntermediateLayerVisitor.java
index f7d9a4e6c5..e68c5d7aa8 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/visitor/IntermediateLayerVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/expression/visitor/IntermediateLayerVisitor.java
@@ -397,6 +397,7 @@ public class IntermediateLayerVisitor
case SLIDING_SIZE_WINDOW:
case SLIDING_TIME_WINDOW:
case SESSION_TIME_WINDOW:
+ case STATE_WINDOW:
return new UDFQueryRowWindowTransformer(
udfInputIntermediateLayer.constructRowWindowReader(
accessStrategy, context.memoryAssigner.assign()),
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/intermediate/ConstantIntermediateLayer.java b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/intermediate/ConstantIntermediateLayer.java
index 440c2e6fbd..dc9865a5ed 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/intermediate/ConstantIntermediateLayer.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/intermediate/ConstantIntermediateLayer.java
@@ -28,6 +28,7 @@ import org.apache.iotdb.db.mpp.transformation.dag.input.ConstantInputReader;
import org.apache.iotdb.udf.api.customizer.strategy.SessionTimeWindowAccessStrategy;
import org.apache.iotdb.udf.api.customizer.strategy.SlidingSizeWindowAccessStrategy;
import org.apache.iotdb.udf.api.customizer.strategy.SlidingTimeWindowAccessStrategy;
+import org.apache.iotdb.udf.api.customizer.strategy.StateWindowAccessStrategy;
/** IntermediateLayer for constants. */
public class ConstantIntermediateLayer extends IntermediateLayer {
@@ -71,4 +72,11 @@ public class ConstantIntermediateLayer extends IntermediateLayer {
// Not allowed since the timestamp of a constant row is not defined.
throw new UnsupportedOperationException();
}
+
+  /**
+   * State windows cannot be built over a constant: the timestamp of a constant row is not
+   * defined, so this operation is rejected.
+   *
+   * @throws UnsupportedOperationException always
+   */
+  @Override
+  protected LayerRowWindowReader constructRowStateWindowReader(
+      StateWindowAccessStrategy strategy, float memoryBudgetInMB) {
+    throw new UnsupportedOperationException();
+  }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/intermediate/IntermediateLayer.java b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/intermediate/IntermediateLayer.java
index 0a8cd2e6b4..94608a54ba 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/intermediate/IntermediateLayer.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/intermediate/IntermediateLayer.java
@@ -28,6 +28,7 @@ import org.apache.iotdb.udf.api.customizer.strategy.AccessStrategy;
import org.apache.iotdb.udf.api.customizer.strategy.SessionTimeWindowAccessStrategy;
import org.apache.iotdb.udf.api.customizer.strategy.SlidingSizeWindowAccessStrategy;
import org.apache.iotdb.udf.api.customizer.strategy.SlidingTimeWindowAccessStrategy;
+import org.apache.iotdb.udf.api.customizer.strategy.StateWindowAccessStrategy;
import java.io.IOException;
@@ -63,6 +64,9 @@ public abstract class IntermediateLayer {
case SESSION_TIME_WINDOW:
return constructRowSessionTimeWindowReader(
(SessionTimeWindowAccessStrategy) strategy, memoryBudgetInMB);
+ case STATE_WINDOW:
+ return constructRowStateWindowReader(
+ (StateWindowAccessStrategy) strategy, memoryBudgetInMB);
default:
throw new IllegalStateException(
"Unexpected access strategy: " + strategy.getAccessStrategyType());
@@ -81,6 +85,10 @@ public abstract class IntermediateLayer {
SessionTimeWindowAccessStrategy strategy, float memoryBudgetInMB)
throws QueryProcessException, IOException;
+ /**
+ * Constructs the row window reader used for {@link StateWindowAccessStrategy}. Invoked by the
+ * STATE_WINDOW branch of this class's access-strategy dispatch.
+ *
+ * @param strategy the state window strategy selected by the UDF
+ * @param memoryBudgetInMB memory budget forwarded unchanged from the dispatching method
+ */
+ protected abstract LayerRowWindowReader constructRowStateWindowReader(
+ StateWindowAccessStrategy strategy, float memoryBudgetInMB)
+ throws QueryProcessException, IOException;
+
@Override
public String toString() {
return expression.toString();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/intermediate/MultiInputColumnIntermediateLayer.java b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/intermediate/MultiInputColumnIntermediateLayer.java
index 4edf946b97..588ba5ae3a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/intermediate/MultiInputColumnIntermediateLayer.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/intermediate/MultiInputColumnIntermediateLayer.java
@@ -39,6 +39,7 @@ import org.apache.iotdb.udf.api.access.RowWindow;
import org.apache.iotdb.udf.api.customizer.strategy.SessionTimeWindowAccessStrategy;
import org.apache.iotdb.udf.api.customizer.strategy.SlidingSizeWindowAccessStrategy;
import org.apache.iotdb.udf.api.customizer.strategy.SlidingTimeWindowAccessStrategy;
+import org.apache.iotdb.udf.api.customizer.strategy.StateWindowAccessStrategy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -772,4 +773,10 @@ public class MultiInputColumnIntermediateLayer extends IntermediateLayer
}
};
}
+
+  /**
+   * State windows are not implemented for this layer.
+   *
+   * @throws UnsupportedOperationException always; the message names the unsupported combination
+   *     so the failure can be diagnosed from the error alone
+   */
+  @Override
+  protected LayerRowWindowReader constructRowStateWindowReader(
+      StateWindowAccessStrategy strategy, float memoryBudgetInMB) {
+    throw new UnsupportedOperationException(
+        "StateWindowAccessStrategy is not supported by MultiInputColumnIntermediateLayer");
+  }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/intermediate/SingleInputColumnMultiReferenceIntermediateLayer.java b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/intermediate/SingleInputColumnMultiReferenceIntermediateLayer.java
index 9171fcc6ed..a61d75083e 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/intermediate/SingleInputColumnMultiReferenceIntermediateLayer.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/intermediate/SingleInputColumnMultiReferenceIntermediateLayer.java
@@ -30,7 +30,9 @@ import org.apache.iotdb.db.mpp.transformation.dag.adapter.ElasticSerializableTVL
import org.apache.iotdb.db.mpp.transformation.dag.memory.SafetyLine;
import org.apache.iotdb.db.mpp.transformation.dag.memory.SafetyLine.SafetyPile;
import org.apache.iotdb.db.mpp.transformation.dag.util.LayerCacheUtils;
+import org.apache.iotdb.db.mpp.transformation.dag.util.TransformUtils;
import org.apache.iotdb.db.mpp.transformation.datastructure.tv.ElasticSerializableTVList;
+import org.apache.iotdb.db.mpp.transformation.datastructure.util.ValueRecorder;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.utils.Binary;
import org.apache.iotdb.udf.api.access.Row;
@@ -38,6 +40,7 @@ import org.apache.iotdb.udf.api.access.RowWindow;
import org.apache.iotdb.udf.api.customizer.strategy.SessionTimeWindowAccessStrategy;
import org.apache.iotdb.udf.api.customizer.strategy.SlidingSizeWindowAccessStrategy;
import org.apache.iotdb.udf.api.customizer.strategy.SlidingTimeWindowAccessStrategy;
+import org.apache.iotdb.udf.api.customizer.strategy.StateWindowAccessStrategy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -659,4 +662,122 @@ public class SingleInputColumnMultiReferenceIntermediateLayer extends Intermedia
}
};
}
+
+ /**
+ * Builds a reader that cuts the cached points of this layer's input column into state windows.
+ * While fetching points it asks TransformUtils.splitWindowForStateWindow whether the newest
+ * point starts a new window, and it stops producing windows once the next window would begin at
+ * or after the strategy's displayWindowEnd.
+ *
+ * @param strategy supplies displayWindowBegin, displayWindowEnd and delta
+ * @param memoryBudgetInMB not read by this implementation
+ */
+ @Override
+ protected LayerRowWindowReader constructRowStateWindowReader(
+ StateWindowAccessStrategy strategy, float memoryBudgetInMB) {
+ final long displayWindowBegin = strategy.getDisplayWindowBegin();
+ final long displayWindowEnd = strategy.getDisplayWindowEnd();
+ final double delta = strategy.getDelta();
+
+ final SafetyPile safetyPile = safetyLine.addSafetyPile();
+ final ElasticSerializableTVListBackedSingleColumnWindow window =
+ new ElasticSerializableTVListBackedSingleColumnWindow(tvList);
+
+ return new LayerRowWindowReader() {
+
+ private boolean isFirstIteration = true;
+ private boolean hasAtLeastOneRow = false;
+
+ // Time and index bounds of the window under construction; all four are passed to window.seek.
+ private long nextWindowTimeBegin = displayWindowBegin;
+ private long nextWindowTimeEnd = 0;
+ private int nextIndexBegin = 0;
+ private int nextIndexEnd = 1;
+
+ // Handed to splitWindowForStateWindow on every split check.
+ private ValueRecorder valueRecorder = new ValueRecorder();
+
+ @Override
+ public YieldableState yield() throws IOException, QueryProcessException {
+ if (isFirstIteration) {
+ // Make sure at least one point is cached before reading the first timestamp.
+ if (tvList.size() == 0) {
+ final YieldableState yieldableState =
+ LayerCacheUtils.yieldPoint(
+ parentLayerPointReaderDataType, parentLayerPointReader, tvList);
+ if (yieldableState != YieldableState.YIELDABLE) {
+ return yieldableState;
+ }
+ }
+ // The first window cannot begin before the first cached point.
+ nextWindowTimeBegin = Math.max(displayWindowBegin, tvList.getTime(0));
+ hasAtLeastOneRow = tvList.size() != 0;
+ isFirstIteration = false;
+ }
+
+ // Nothing (more) to show once the next window would begin at or after displayWindowEnd.
+ if (!hasAtLeastOneRow || displayWindowEnd <= nextWindowTimeBegin) {
+ return YieldableState.NOT_YIELDABLE_NO_MORE_DATA;
+ }
+
+ // Fetch points until the newest cached timestamp reaches displayWindowEnd, a split is
+ // reported, or the parent reader has no more data.
+ while (tvList.getTime(tvList.size() - 1) < displayWindowEnd) {
+ final YieldableState yieldableState =
+ LayerCacheUtils.yieldPoint(
+ parentLayerPointReaderDataType, parentLayerPointReader, tvList);
+ if (yieldableState == YieldableState.YIELDABLE) {
+ // The split check is skipped while the previous point is still before
+ // displayWindowBegin. On a split the newest point is excluded from the current window.
+ if (tvList.getTime(tvList.size() - 2) >= displayWindowBegin
+ && TransformUtils.splitWindowForStateWindow(
+ parentLayerPointReaderDataType, valueRecorder, delta, tvList)) {
+ nextIndexEnd = tvList.size() - 1;
+ break;
+ } else {
+ // NOTE(review): nextIndexEnd starts at 1 and grows by one per fetched point, which
+ // appears to assume tvList held exactly one point after the first iteration. The
+ // tvList.size() == 0 guard above suggests it may already be filled here - confirm.
+ nextIndexEnd++;
+ }
+ } else if (yieldableState == YieldableState.NOT_YIELDABLE_WAITING_FOR_DATA) {
+ return YieldableState.NOT_YIELDABLE_WAITING_FOR_DATA;
+ } else if (yieldableState == YieldableState.NOT_YIELDABLE_NO_MORE_DATA) {
+ // Input exhausted: the current window runs to the end of the cached points.
+ nextIndexEnd = tvList.size();
+ break;
+ }
+ }
+
+ nextWindowTimeEnd = tvList.getTime(nextIndexEnd - 1);
+
+ // An empty index range means no points are left for another window.
+ if (nextIndexBegin == nextIndexEnd) {
+ return YieldableState.NOT_YIELDABLE_NO_MORE_DATA;
+ }
+
+ // Locate the index of the first point at or after nextWindowTimeBegin. The scan only has to
+ // skip points when displayWindowBegin is later than the first cached timestamp; otherwise
+ // the condition below already holds on the first iteration.
+ for (int i = nextIndexBegin; i < tvList.size(); ++i) {
+ if (nextWindowTimeBegin <= tvList.getTime(i)) {
+ nextIndexBegin = i;
+ break;
+ }
+ // The window's begin time is later than every cached timestamp, so nothing can be
+ // shown.
+ if (i == tvList.size() - 1) {
+ return YieldableState.NOT_YIELDABLE_NO_MORE_DATA;
+ }
+ }
+
+ window.seek(nextIndexBegin, nextIndexEnd, nextWindowTimeBegin, nextWindowTimeEnd);
+
+ return YieldableState.YIELDABLE;
+ }
+
+ // Always reports false; this reader positions its window in yield().
+ @Override
+ public boolean next() throws IOException, QueryProcessException {
+ return false;
+ }
+
+ @Override
+ public void readyForNext() throws IOException, QueryProcessException {
+ // The next window begins at the point right after the current one, if it is cached.
+ if (nextIndexEnd < tvList.size()) {
+ nextWindowTimeBegin = tvList.getTime(nextIndexEnd);
+ }
+ // Presumably lets points before this index be evicted from the shared list - confirm
+ // against SafetyLine.
+ safetyPile.moveForwardTo(nextIndexBegin + 1);
+ tvList.setEvictionUpperBound(safetyLine.getSafetyLine());
+ nextIndexBegin = nextIndexEnd;
+ }
+
+ @Override
+ public TSDataType[] getDataTypes() {
+ return new TSDataType[] {parentLayerPointReaderDataType};
+ }
+
+ @Override
+ public RowWindow currentWindow() {
+ return window;
+ }
+ };
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/intermediate/SingleInputColumnSingleReferenceIntermediateLayer.java b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/intermediate/SingleInputColumnSingleReferenceIntermediateLayer.java
index 6f46cd0f01..35090627f0 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/intermediate/SingleInputColumnSingleReferenceIntermediateLayer.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/intermediate/SingleInputColumnSingleReferenceIntermediateLayer.java
@@ -28,13 +28,16 @@ import org.apache.iotdb.db.mpp.transformation.api.YieldableState;
import org.apache.iotdb.db.mpp.transformation.dag.adapter.ElasticSerializableTVListBackedSingleColumnWindow;
import org.apache.iotdb.db.mpp.transformation.dag.adapter.LayerPointReaderBackedSingleColumnRow;
import org.apache.iotdb.db.mpp.transformation.dag.util.LayerCacheUtils;
+import org.apache.iotdb.db.mpp.transformation.dag.util.TransformUtils;
import org.apache.iotdb.db.mpp.transformation.datastructure.tv.ElasticSerializableTVList;
+import org.apache.iotdb.db.mpp.transformation.datastructure.util.ValueRecorder;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.udf.api.access.Row;
import org.apache.iotdb.udf.api.access.RowWindow;
import org.apache.iotdb.udf.api.customizer.strategy.SessionTimeWindowAccessStrategy;
import org.apache.iotdb.udf.api.customizer.strategy.SlidingSizeWindowAccessStrategy;
import org.apache.iotdb.udf.api.customizer.strategy.SlidingTimeWindowAccessStrategy;
+import org.apache.iotdb.udf.api.customizer.strategy.StateWindowAccessStrategy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -530,4 +533,122 @@ public class SingleInputColumnSingleReferenceIntermediateLayer extends Intermedi
}
};
}
+
+ /**
+ * Builds a reader that cuts the points of this layer's input column into state windows. Points
+ * are cached in a list created here; while fetching, TransformUtils.splitWindowForStateWindow
+ * decides whether the newest point starts a new window. No further windows are produced once
+ * the next window would begin at or after the strategy's displayWindowEnd.
+ *
+ * @param strategy supplies displayWindowBegin, displayWindowEnd and delta
+ * @param memoryBudgetInMB passed to the newly created ElasticSerializableTVList
+ */
+ @Override
+ protected LayerRowWindowReader constructRowStateWindowReader(
+ StateWindowAccessStrategy strategy, float memoryBudgetInMB) {
+
+ final long displayWindowBegin = strategy.getDisplayWindowBegin();
+ final long displayWindowEnd = strategy.getDisplayWindowEnd();
+ final double delta = strategy.getDelta();
+
+ final ElasticSerializableTVList tvList =
+ ElasticSerializableTVList.newElasticSerializableTVList(
+ dataType, queryId, memoryBudgetInMB, CACHE_BLOCK_SIZE);
+ final ElasticSerializableTVListBackedSingleColumnWindow window =
+ new ElasticSerializableTVListBackedSingleColumnWindow(tvList);
+
+ return new LayerRowWindowReader() {
+
+ private boolean isFirstIteration = true;
+ private boolean hasAtLeastOneRow = false;
+
+ // Time and index bounds of the window under construction; all four are passed to window.seek.
+ private long nextWindowTimeBegin = displayWindowBegin;
+ private long nextWindowTimeEnd = 0;
+ private int nextIndexBegin = 0;
+ private int nextIndexEnd = 1;
+
+ // Handed to splitWindowForStateWindow on every split check.
+ private ValueRecorder valueRecorder = new ValueRecorder();
+
+ @Override
+ public YieldableState yield() throws IOException, QueryProcessException {
+ if (isFirstIteration) {
+ // Make sure at least one point is cached before reading the first timestamp.
+ if (tvList.size() == 0) {
+ final YieldableState yieldableState =
+ LayerCacheUtils.yieldPoint(dataType, parentLayerPointReader, tvList);
+ if (yieldableState != YieldableState.YIELDABLE) {
+ return yieldableState;
+ }
+ }
+ // The first window cannot begin before the first cached point.
+ nextWindowTimeBegin = Math.max(displayWindowBegin, tvList.getTime(0));
+ hasAtLeastOneRow = tvList.size() != 0;
+ isFirstIteration = false;
+ }
+
+ // Nothing (more) to show once the next window would begin at or after displayWindowEnd.
+ if (!hasAtLeastOneRow || displayWindowEnd <= nextWindowTimeBegin) {
+ return YieldableState.NOT_YIELDABLE_NO_MORE_DATA;
+ }
+
+ // Fetch points until the newest cached timestamp reaches displayWindowEnd, a split is
+ // reported, or the parent reader has no more data.
+ while (tvList.getTime(tvList.size() - 1) < displayWindowEnd) {
+ final YieldableState yieldableState =
+ LayerCacheUtils.yieldPoint(dataType, parentLayerPointReader, tvList);
+ if (yieldableState == YieldableState.YIELDABLE) {
+ // The split check is skipped while the previous point is still before
+ // displayWindowBegin. On a split the newest point is excluded from the current window.
+ if (tvList.getTime(tvList.size() - 2) >= displayWindowBegin
+ && TransformUtils.splitWindowForStateWindow(
+ dataType, valueRecorder, delta, tvList)) {
+ nextIndexEnd = tvList.size() - 1;
+ break;
+ } else {
+ // No split: the end index advances by one for the fetched point.
+ nextIndexEnd++;
+ }
+ } else if (yieldableState == YieldableState.NOT_YIELDABLE_WAITING_FOR_DATA) {
+ return YieldableState.NOT_YIELDABLE_WAITING_FOR_DATA;
+ } else if (yieldableState == YieldableState.NOT_YIELDABLE_NO_MORE_DATA) {
+ // Input exhausted: the current window runs to the end of the cached points.
+ nextIndexEnd = tvList.size();
+ break;
+ }
+ }
+
+ nextWindowTimeEnd = tvList.getTime(nextIndexEnd - 1);
+
+ // An empty index range means no points are left for another window.
+ if (nextIndexBegin == nextIndexEnd) {
+ return YieldableState.NOT_YIELDABLE_NO_MORE_DATA;
+ }
+
+ // Locate the index of the first point at or after nextWindowTimeBegin. The scan only has to
+ // skip points when displayWindowBegin is later than the first cached timestamp; otherwise
+ // the condition below already holds on the first iteration.
+ for (int i = nextIndexBegin; i < tvList.size(); ++i) {
+ if (nextWindowTimeBegin <= tvList.getTime(i)) {
+ nextIndexBegin = i;
+ break;
+ }
+ // The window's begin time is later than every cached timestamp, so nothing can be
+ // shown.
+ if (i == tvList.size() - 1) {
+ return YieldableState.NOT_YIELDABLE_NO_MORE_DATA;
+ }
+ }
+
+ window.seek(nextIndexBegin, nextIndexEnd, nextWindowTimeBegin, nextWindowTimeEnd);
+
+ return YieldableState.YIELDABLE;
+ }
+
+ // Always reports false; this reader positions its window in yield().
+ @Override
+ public boolean next() throws IOException, QueryProcessException {
+ return false;
+ }
+
+ @Override
+ public void readyForNext() throws IOException {
+ // The next window begins at the point right after the current one, if it is cached.
+ if (nextIndexEnd < tvList.size()) {
+ nextWindowTimeBegin = tvList.getTime(nextIndexEnd);
+ }
+ // Presumably allows cached points before this index to be evicted - confirm against
+ // ElasticSerializableTVList.
+ tvList.setEvictionUpperBound(nextIndexBegin + 1);
+ nextIndexBegin = nextIndexEnd;
+ }
+
+ @Override
+ public TSDataType[] getDataTypes() {
+ return new TSDataType[] {dataType};
+ }
+
+ @Override
+ public RowWindow currentWindow() {
+ return window;
+ }
+ };
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/util/TransformUtils.java b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/util/TransformUtils.java
index fd426281ff..ed95095cda 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/util/TransformUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/dag/util/TransformUtils.java
@@ -21,8 +21,11 @@ package org.apache.iotdb.db.mpp.transformation.dag.util;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.mpp.plan.expression.leaf.ConstantOperand;
+import org.apache.iotdb.db.mpp.transformation.datastructure.tv.ElasticSerializableTVList;
+import org.apache.iotdb.db.mpp.transformation.datastructure.util.ValueRecorder;
import org.apache.iotdb.db.utils.CommonUtils;
import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.block.column.BinaryColumn;
import org.apache.iotdb.tsfile.read.common.block.column.BooleanColumn;
import org.apache.iotdb.tsfile.read.common.block.column.Column;
@@ -34,6 +37,7 @@ import org.apache.iotdb.tsfile.utils.Binary;
import org.apache.commons.lang3.Validate;
+import java.io.IOException;
import java.util.Objects;
import java.util.Optional;
@@ -96,4 +100,90 @@ public class TransformUtils {
throw new UnsupportedOperationException(e);
}
}
+
+ /**
+  * Decides whether the newest point of {@code tvList} differs enough from the value remembered
+  * in {@code valueRecorder} to be reported as a split.
+  *
+  * <p>If {@code valueRecorder} has not recorded anything yet, the second-to-last value of {@code
+  * tvList} is recorded first and the recorder is marked as recorded; the caller is therefore
+  * expected to pass a list holding at least two points in that case. The last value of {@code
+  * tvList} is then compared with the recorded value:
+  *
+  * <ul>
+  *   <li>INT32, INT64, FLOAT, DOUBLE: split when the absolute difference is {@code >= delta}.
+  *   <li>BOOLEAN, TEXT: split when the two values are not equal ({@code delta} is ignored).
+  * </ul>
+  *
+  * <p>Whenever a split is reported, the last value of {@code tvList} replaces the recorded value.
+  *
+  * <p>Integer differences are computed without overflow, so two values that are far apart (for
+  * example {@code Integer.MAX_VALUE} and {@code -1}) are correctly seen as a large difference.
+  *
+  * <p>NOTE(review): with {@code delta == 0} every non-NaN numeric difference satisfies {@code >=
+  * delta}, so numeric types report a split for every point, whereas BOOLEAN and TEXT split only
+  * on a change. Confirm that this is the intended behaviour.
+  *
+  * @param dataType type of the values stored in {@code tvList}
+  * @param valueRecorder holder of the value the newest point is compared with; updated in place
+  * @param delta threshold applied to the absolute difference of numeric values
+  * @param tvList list whose last (and possibly second-to-last) value is inspected
+  * @return {@code true} if the newest point is reported as a split
+  * @throws IOException if reading a value from {@code tvList} fails
+  * @throws RuntimeException if {@code dataType} is none of the six handled types
+  */
+ public static boolean splitWindowForStateWindow(
+     TSDataType dataType,
+     ValueRecorder valueRecorder,
+     double delta,
+     ElasticSerializableTVList tvList)
+     throws IOException {
+   boolean res;
+   switch (dataType) {
+     case INT32:
+       {
+         if (!valueRecorder.hasRecorded()) {
+           valueRecorder.recordInt(tvList.getInt(tvList.size() - 2));
+           valueRecorder.setRecorded(true);
+         }
+         int current = tvList.getInt(tvList.size() - 1);
+         // Widened to long inside the helper: an int subtraction could overflow.
+         res = absoluteDifference(current, valueRecorder.getInt()) >= delta;
+         if (res) {
+           valueRecorder.recordInt(current);
+         }
+         break;
+       }
+     case INT64:
+       {
+         if (!valueRecorder.hasRecorded()) {
+           valueRecorder.recordLong(tvList.getLong(tvList.size() - 2));
+           valueRecorder.setRecorded(true);
+         }
+         long current = tvList.getLong(tvList.size() - 1);
+         res = absoluteDifference(current, valueRecorder.getLong()) >= delta;
+         if (res) {
+           valueRecorder.recordLong(current);
+         }
+         break;
+       }
+     case FLOAT:
+       {
+         if (!valueRecorder.hasRecorded()) {
+           valueRecorder.recordFloat(tvList.getFloat(tvList.size() - 2));
+           valueRecorder.setRecorded(true);
+         }
+         float current = tvList.getFloat(tvList.size() - 1);
+         res = Math.abs(current - valueRecorder.getFloat()) >= delta;
+         if (res) {
+           valueRecorder.recordFloat(current);
+         }
+         break;
+       }
+     case DOUBLE:
+       {
+         if (!valueRecorder.hasRecorded()) {
+           valueRecorder.recordDouble(tvList.getDouble(tvList.size() - 2));
+           valueRecorder.setRecorded(true);
+         }
+         double current = tvList.getDouble(tvList.size() - 1);
+         res = Math.abs(current - valueRecorder.getDouble()) >= delta;
+         if (res) {
+           valueRecorder.recordDouble(current);
+         }
+         break;
+       }
+     case BOOLEAN:
+       {
+         if (!valueRecorder.hasRecorded()) {
+           valueRecorder.recordBoolean(tvList.getBoolean(tvList.size() - 2));
+           valueRecorder.setRecorded(true);
+         }
+         boolean current = tvList.getBoolean(tvList.size() - 1);
+         res = current != valueRecorder.getBoolean();
+         if (res) {
+           valueRecorder.recordBoolean(current);
+         }
+         break;
+       }
+     case TEXT:
+       {
+         if (!valueRecorder.hasRecorded()) {
+           valueRecorder.recordString(tvList.getString(tvList.size() - 2));
+           valueRecorder.setRecorded(true);
+         }
+         String current = tvList.getString(tvList.size() - 1);
+         res = !current.equals(valueRecorder.getString());
+         if (res) {
+           valueRecorder.recordString(current);
+         }
+         break;
+       }
+     default:
+       throw new RuntimeException("The data type of the state window strategy is not valid.");
+   }
+   return res;
+ }
+
+ /**
+  * Returns {@code |current - recorded|} as a double, immune to two's-complement overflow.
+  *
+  * <p>When the long subtraction does not overflow, the result equals the exact difference
+  * converted to double. When it does overflow, the difference is computed in double arithmetic
+  * instead; its magnitude is then at least 2^63, so the rounding of the operands is irrelevant.
+  */
+ private static double absoluteDifference(long current, long recorded) {
+   long difference = current - recorded;
+   // Same overflow test as Math.subtractExact: the operands have different signs and the sign
+   // of the result differs from the sign of the first operand.
+   if (((current ^ recorded) & (current ^ difference)) < 0) {
+     return Math.abs((double) current - (double) recorded);
+   }
+   // Convert before taking the absolute value: Math.abs(Long.MIN_VALUE) would stay negative.
+   return Math.abs((double) difference);
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/transformation/datastructure/util/ValueRecorder.java b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/datastructure/util/ValueRecorder.java
new file mode 100644
index 0000000000..91e0ab2ac1
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/transformation/datastructure/util/ValueRecorder.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.transformation.datastructure.util;
+
+/**
+ * Mutable holder that remembers one value per supported type (int, long, float, double, boolean
+ * and String) together with a flag telling whether a value has been recorded.
+ *
+ * <p>The typed slots are independent of each other: recording a value of one type neither
+ * changes the other slots nor the {@code recorded} flag. The flag is only changed through {@link
+ * #setRecorded(boolean)}.
+ *
+ * <p>This class performs no synchronization.
+ */
+public class ValueRecorder {
+
+  /** Whether the caller has marked this recorder as holding a value. */
+  private boolean recorded = false;
+
+  // One slot per supported type; each keeps its default until the matching record method is
+  // called.
+  private int windowFirstValueInt = 0;
+  private long windowFirstValueLong = 0;
+  private float windowFirstValueFloat = 0;
+  private double windowFirstValueDouble = 0;
+  private boolean windowFirstValueBoolean = false;
+  private String windowFirstValueString = "";
+
+  /** Returns the flag last passed to {@link #setRecorded(boolean)}; {@code false} initially. */
+  public boolean hasRecorded() {
+    return recorded;
+  }
+
+  /** Sets the flag returned by {@link #hasRecorded()}. */
+  public void setRecorded(boolean recorded) {
+    this.recorded = recorded;
+  }
+
+  /** Stores {@code value} in the int slot. */
+  public void recordInt(int value) {
+    windowFirstValueInt = value;
+  }
+
+  /** Returns the content of the int slot; {@code 0} if nothing was stored. */
+  public int getInt() {
+    return windowFirstValueInt;
+  }
+
+  /** Stores {@code value} in the long slot. */
+  public void recordLong(long value) {
+    windowFirstValueLong = value;
+  }
+
+  /** Returns the content of the long slot; {@code 0} if nothing was stored. */
+  public long getLong() {
+    return windowFirstValueLong;
+  }
+
+  /** Stores {@code value} in the float slot. */
+  public void recordFloat(float value) {
+    windowFirstValueFloat = value;
+  }
+
+  /** Returns the content of the float slot; {@code 0} if nothing was stored. */
+  public float getFloat() {
+    return windowFirstValueFloat;
+  }
+
+  /** Stores {@code value} in the double slot. */
+  public void recordDouble(double value) {
+    windowFirstValueDouble = value;
+  }
+
+  /** Returns the content of the double slot; {@code 0} if nothing was stored. */
+  public double getDouble() {
+    return windowFirstValueDouble;
+  }
+
+  /** Stores {@code value} in the boolean slot. */
+  public void recordBoolean(boolean value) {
+    windowFirstValueBoolean = value;
+  }
+
+  /** Returns the content of the boolean slot; {@code false} if nothing was stored. */
+  public boolean getBoolean() {
+    return windowFirstValueBoolean;
+  }
+
+  /** Stores {@code value} (which is not copied or null-checked) in the String slot. */
+  public void recordString(String value) {
+    windowFirstValueString = value;
+  }
+
+  /** Returns the content of the String slot; the empty string if nothing was stored. */
+  public String getString() {
+    return windowFirstValueString;
+  }
+}
diff --git a/udf-api/src/main/java/org/apache/iotdb/udf/api/customizer/strategy/AccessStrategy.java b/udf-api/src/main/java/org/apache/iotdb/udf/api/customizer/strategy/AccessStrategy.java
index d543559a5f..a6ff234536 100644
--- a/udf-api/src/main/java/org/apache/iotdb/udf/api/customizer/strategy/AccessStrategy.java
+++ b/udf-api/src/main/java/org/apache/iotdb/udf/api/customizer/strategy/AccessStrategy.java
@@ -44,7 +44,10 @@ public interface AccessStrategy {
SLIDING_SIZE_WINDOW,
/** @see SessionTimeWindowAccessStrategy */
- SESSION_TIME_WINDOW
+ SESSION_TIME_WINDOW,
+
+ /** @see StateWindowAccessStrategy */
+ STATE_WINDOW
}
/**
diff --git a/udf-api/src/main/java/org/apache/iotdb/udf/api/customizer/strategy/StateWindowAccessStrategy.java b/udf-api/src/main/java/org/apache/iotdb/udf/api/customizer/strategy/StateWindowAccessStrategy.java
new file mode 100644
index 0000000000..73c89143ef
--- /dev/null
+++ b/udf-api/src/main/java/org/apache/iotdb/udf/api/customizer/strategy/StateWindowAccessStrategy.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.udf.api.customizer.strategy;
+
+import java.time.ZoneId;
+
+/**
+ * Access strategy of type {@link AccessStrategyType#STATE_WINDOW}, configured by a display
+ * window {@code [displayWindowBegin, displayWindowEnd]} and a non-negative threshold {@code
+ * delta}.
+ *
+ * <p>The constructors do not validate their arguments; validation happens in {@link #check()}.
+ */
+public class StateWindowAccessStrategy implements AccessStrategy {
+
+  private final long displayWindowBegin;
+  private final long displayWindowEnd;
+  private final double delta;
+
+  /** Not set by any constructor; {@code null} until {@link #setZoneId(ZoneId)} is called. */
+  private ZoneId zoneId;
+
+  /**
+   * @param displayWindowBegin must satisfy {@code displayWindowBegin <= displayWindowEnd}
+   * @param displayWindowEnd must satisfy {@code displayWindowBegin <= displayWindowEnd}
+   * @param delta must satisfy {@code delta >= 0}
+   */
+  public StateWindowAccessStrategy(long displayWindowBegin, long displayWindowEnd, double delta) {
+    this.displayWindowBegin = displayWindowBegin;
+    this.displayWindowEnd = displayWindowEnd;
+    this.delta = delta;
+  }
+
+  /**
+   * Uses {@code delta = 0}.
+   *
+   * @param displayWindowBegin must satisfy {@code displayWindowBegin <= displayWindowEnd}
+   * @param displayWindowEnd must satisfy {@code displayWindowBegin <= displayWindowEnd}
+   */
+  public StateWindowAccessStrategy(long displayWindowBegin, long displayWindowEnd) {
+    this(displayWindowBegin, displayWindowEnd, 0.0);
+  }
+
+  /**
+   * Sets the display window bounds to {@code Long.MIN_VALUE} and {@code Long.MAX_VALUE}, which
+   * is intended to make the display window span from the minimum to the maximum timestamp of the
+   * query result set.
+   *
+   * @param delta must satisfy {@code delta >= 0}
+   */
+  public StateWindowAccessStrategy(double delta) {
+    this(Long.MIN_VALUE, Long.MAX_VALUE, delta);
+  }
+
+  /**
+   * Sets the display window bounds to {@code Long.MIN_VALUE} and {@code Long.MAX_VALUE}, which
+   * is intended to make the display window span from the minimum to the maximum timestamp of the
+   * query result set, and uses {@code delta = 0}.
+   */
+  public StateWindowAccessStrategy() {
+    this(Long.MIN_VALUE, Long.MAX_VALUE, 0.0);
+  }
+
+  /**
+   * Validates the configured parameters.
+   *
+   * @throws RuntimeException if {@code delta} is negative or NaN, or if {@code displayWindowEnd}
+   *     is smaller than {@code displayWindowBegin}
+   */
+  @Override
+  public void check() {
+    // Written as a negated ">=" so that NaN, for which every comparison is false, is rejected
+    // as well instead of silently passing a "delta < 0" test.
+    if (!(delta >= 0)) {
+      throw new RuntimeException(
+          String.format("Parameter delta(%f) should be positive or equal to 0.", delta));
+    }
+    if (displayWindowEnd < displayWindowBegin) {
+      throw new RuntimeException(
+          String.format(
+              "displayWindowEnd(%d) < displayWindowBegin(%d)",
+              displayWindowEnd, displayWindowBegin));
+    }
+  }
+
+  @Override
+  public AccessStrategyType getAccessStrategyType() {
+    return AccessStrategyType.STATE_WINDOW;
+  }
+
+  /** Returns the lower bound of the display window as passed to (or defaulted by) the constructor. */
+  public long getDisplayWindowBegin() {
+    return displayWindowBegin;
+  }
+
+  /** Returns the upper bound of the display window as passed to (or defaulted by) the constructor. */
+  public long getDisplayWindowEnd() {
+    return displayWindowEnd;
+  }
+
+  /** Returns the threshold {@code delta} as passed to (or defaulted by) the constructor. */
+  public double getDelta() {
+    return delta;
+  }
+
+  /** Returns the zone id last passed to {@link #setZoneId(ZoneId)}, or {@code null} if none. */
+  public ZoneId getZoneId() {
+    return zoneId;
+  }
+
+  /** Stores the zone id returned by {@link #getZoneId()}. */
+  public void setZoneId(ZoneId zoneId) {
+    this.zoneId = zoneId;
+  }
+}