Posted to commits@iotdb.apache.org by qi...@apache.org on 2022/04/08 06:41:52 UTC

[iotdb] branch master updated: [IOTDB-2307]UDF Library Series Discovery Functions (#4848)

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new b5e16dfbb9 [IOTDB-2307]UDF Library Series Discovery Functions (#4848)
b5e16dfbb9 is described below

commit b5e16dfbb9cef3160b8b37e962b87e95ed112905
Author: Pengyu Chen <48...@users.noreply.github.com>
AuthorDate: Fri Apr 8 14:41:46 2022 +0800

    [IOTDB-2307]UDF Library Series Discovery Functions (#4848)
---
 docs/UserGuide/Library-UDF/Series-Discovery.md     | 170 ++++++++
 docs/zh/UserGuide/Library-UDF/Series-Discovery.md  | 171 ++++++++
 .../library/series/UDTFConsecutiveSequences.java   |  78 ++++
 .../library/series/UDTFConsecutiveWindows.java     |  90 ++++
 .../iotdb/library/series/util/ConsecutiveUtil.java | 129 ++++++
 .../apache/iotdb/library/series/SeriesTest.java    | 467 +++++++++++++++++++++
 6 files changed, 1105 insertions(+)

diff --git a/docs/UserGuide/Library-UDF/Series-Discovery.md b/docs/UserGuide/Library-UDF/Series-Discovery.md
new file mode 100644
index 0000000000..caf88a3df8
--- /dev/null
+++ b/docs/UserGuide/Library-UDF/Series-Discovery.md
@@ -0,0 +1,170 @@
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+    
+        http://www.apache.org/licenses/LICENSE-2.0
+    
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+# Series Discovery
+
+## ConsecutiveSequences
+
+### Usage
+
+This function finds locally longest consecutive subsequences in strictly equispaced multidimensional data.
+
+Strictly equispaced data is data whose time intervals are strictly equal. Missing data (missing rows or missing values) is allowed, but data redundancy and timestamp drift are not.
+
+A consecutive subsequence is a subsequence whose points are spaced exactly by the standard time interval and contain no missing data. A consecutive subsequence is locally longest if it is not a proper subsequence of any other consecutive subsequence.
+
+**Name:** CONSECUTIVESEQUENCES
+
+**Input Series:** Supports multiple input series of arbitrary type. The data must be strictly equispaced.
+
+**Parameters:** 
+
++ `gap`: The standard time interval, a positive number with a unit. Supported units are 'ms' (millisecond), 's' (second), 'm' (minute), 'h' (hour) and 'd' (day). If omitted, it is estimated as the mode of the time intervals.
+
+**Output Series:** Outputs a single series of type INT32. Each data point corresponds to one locally longest consecutive subsequence. The timestamp is the start time of the subsequence, and the value is the number of data points it contains. Subsequences with only one data point are not reported.
+
+**Note:** If the input series is not strictly equispaced, the output is not guaranteed to be correct.
+
+### Examples
+
+#### Manually Specify the Standard Time Interval
+
+The standard time interval can be specified manually with the `gap` parameter. Note that an incorrect `gap` value produces incorrect output.
+
+Input series: 
+
+```
++-----------------------------+---------------+---------------+
+|                         Time|root.test.d1.s1|root.test.d1.s2|
++-----------------------------+---------------+---------------+
+|2020-01-01T00:00:00.000+08:00|            1.0|            1.0|
+|2020-01-01T00:05:00.000+08:00|            1.0|            1.0|
+|2020-01-01T00:10:00.000+08:00|            1.0|            1.0|
+|2020-01-01T00:20:00.000+08:00|            1.0|            1.0|
+|2020-01-01T00:25:00.000+08:00|            1.0|            1.0|
+|2020-01-01T00:30:00.000+08:00|            1.0|            1.0|
+|2020-01-01T00:35:00.000+08:00|            1.0|            1.0|
+|2020-01-01T00:40:00.000+08:00|            1.0|           null|
+|2020-01-01T00:45:00.000+08:00|            1.0|            1.0|
+|2020-01-01T00:50:00.000+08:00|            1.0|            1.0|
++-----------------------------+---------------+---------------+
+```
+
+SQL for query: 
+
+```sql
+select consecutivesequences(s1,s2,'gap'='5m') from root.test.d1
+```
+
+Output series:
+
+```
++-----------------------------+------------------------------------------------------------------+
+|                         Time|consecutivesequences(root.test.d1.s1, root.test.d1.s2, "gap"="5m")|
++-----------------------------+------------------------------------------------------------------+
+|2020-01-01T00:00:00.000+08:00|                                                                 3|
+|2020-01-01T00:20:00.000+08:00|                                                                 4|
+|2020-01-01T00:45:00.000+08:00|                                                                 2|
++-----------------------------+------------------------------------------------------------------+
+```
+
+
+#### Automatically Estimate the Standard Time Interval
+
+When `gap` is omitted, the function estimates the standard time interval as the mode of the time intervals and obtains the same result. This usage is therefore recommended.
+
+The input series is the same as above. SQL for query:
+
+```sql
+select consecutivesequences(s1,s2) from root.test.d1
+```
+
+Output series:
+```
++-----------------------------+------------------------------------------------------+
+|                         Time|consecutivesequences(root.test.d1.s1, root.test.d1.s2)|
++-----------------------------+------------------------------------------------------+
+|2020-01-01T00:00:00.000+08:00|                                                     3|
+|2020-01-01T00:20:00.000+08:00|                                                     4|
+|2020-01-01T00:45:00.000+08:00|                                                     2|
++-----------------------------+------------------------------------------------------+
+```
+
+## ConsecutiveWindows
+
+### Usage
+
+This function finds consecutive windows of a specified length in strictly equispaced multidimensional data.
+
+Strictly equispaced data is data whose time intervals are strictly equal. Missing data (missing rows or missing values) is allowed, but data redundancy and timestamp drift are not.
+
+A consecutive window is a subsequence spanning the specified `length` whose points are spaced exactly by the standard time interval and contain no missing data.
+
+**Name:** CONSECUTIVEWINDOWS
+
+**Input Series:** Supports multiple input series of arbitrary type. The data must be strictly equispaced.
+
+**Parameters:** 
+
++ `gap`: The standard time interval, a positive number with a unit. Supported units are 'ms' (millisecond), 's' (second), 'm' (minute), 'h' (hour) and 'd' (day). If omitted, it is estimated as the mode of the time intervals.
++ `length`: The length of the window, a positive number with a unit. Supported units are 'ms', 's', 'm', 'h' and 'd'. This parameter is required.
+
+**Output Series:** Outputs a single series of type INT32. Each data point corresponds to one consecutive window. The timestamp is the start time of the window, and the value is the number of data points in the window (`length / gap + 1`).
+
+**Note:** If the input series is not strictly equispaced, the output is not guaranteed to be correct.
+
+### Examples
+
+
+Input series:
+
+```
++-----------------------------+---------------+---------------+
+|                         Time|root.test.d1.s1|root.test.d1.s2|
++-----------------------------+---------------+---------------+
+|2020-01-01T00:00:00.000+08:00|            1.0|            1.0|
+|2020-01-01T00:05:00.000+08:00|            1.0|            1.0|
+|2020-01-01T00:10:00.000+08:00|            1.0|            1.0|
+|2020-01-01T00:20:00.000+08:00|            1.0|            1.0|
+|2020-01-01T00:25:00.000+08:00|            1.0|            1.0|
+|2020-01-01T00:30:00.000+08:00|            1.0|            1.0|
+|2020-01-01T00:35:00.000+08:00|            1.0|            1.0|
+|2020-01-01T00:40:00.000+08:00|            1.0|           null|
+|2020-01-01T00:45:00.000+08:00|            1.0|            1.0|
+|2020-01-01T00:50:00.000+08:00|            1.0|            1.0|
++-----------------------------+---------------+---------------+
+```
+
+SQL for query:
+
+```sql
+select consecutivewindows(s1,s2,'length'='10m') from root.test.d1
+```
+
+Output series:
+```
++-----------------------------+--------------------------------------------------------------------+
+|                         Time|consecutivewindows(root.test.d1.s1, root.test.d1.s2, "length"="10m")|
++-----------------------------+--------------------------------------------------------------------+
+|2020-01-01T00:00:00.000+08:00|                                                                   3|
+|2020-01-01T00:20:00.000+08:00|                                                                   3|
+|2020-01-01T00:25:00.000+08:00|                                                                   3|
++-----------------------------+--------------------------------------------------------------------+
+```
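The ConsecutiveSequences semantics described above can be illustrated with a minimal, dependency-free Java sketch. It is not the IoTDB UDF API. The class `ConsecutiveSequencesSketch` and its `find` method are hypothetical names. The sketch takes timestamps, a per-row "has a null value" flag and a known `gap`, and it returns `{start, count}` pairs. Like `ConsecutiveUtil.process`, it reports only subsequences with more than one point. One difference: it tracks "no open subsequence" with `count == 0` rather than storing a `-gap` sentinel in `first`/`last`.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, dependency-free sketch of the ConsecutiveSequences semantics (fixed gap). */
public class ConsecutiveSequencesSketch {

  /** Returns {start, count} for every locally longest consecutive subsequence with > 1 point. */
  static List<long[]> find(long[] times, boolean[] hasNull, long gap) {
    List<long[]> out = new ArrayList<>();
    long first = 0;
    long last = 0;
    int count = 0; // 0 means no subsequence is currently open
    for (int i = 0; i < times.length; i++) {
      if (hasNull[i]) {
        // A row with a missing value closes the open subsequence.
        if (count > 1) {
          out.add(new long[] {first, count});
        }
        count = 0;
      } else if (count > 0 && times[i] == last + gap) {
        // Exactly one gap after the previous point: the subsequence grows.
        last = times[i];
        count++;
      } else {
        // Wrong gap (or nothing open): close the old subsequence and start a new one here.
        if (count > 1) {
          out.add(new long[] {first, count});
        }
        first = times[i];
        last = times[i];
        count = 1;
      }
    }
    if (count > 1) {
      out.add(new long[] {first, count});
    }
    return out;
  }

  public static void main(String[] args) {
    // Minutes after 2020-01-01T00:00+08:00 from the example input; s2 is null at 00:40.
    long[] times = {0, 5, 10, 20, 25, 30, 35, 40, 45, 50};
    boolean[] hasNull = new boolean[times.length];
    hasNull[7] = true;
    for (long[] r : find(times, hasNull, 5)) {
      System.out.println(r[0] + " -> " + r[1]);
    }
  }
}
```

Running `main` prints `0 -> 3`, `20 -> 4` and `45 -> 2` (minutes). This matches the example output table.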
diff --git a/docs/zh/UserGuide/Library-UDF/Series-Discovery.md b/docs/zh/UserGuide/Library-UDF/Series-Discovery.md
new file mode 100644
index 0000000000..8557ac58bd
--- /dev/null
+++ b/docs/zh/UserGuide/Library-UDF/Series-Discovery.md
@@ -0,0 +1,171 @@
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+    
+        http://www.apache.org/licenses/LICENSE-2.0
+    
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+
+# Series Discovery
+
+## ConsecutiveSequences
+
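+### Usage
+
+This function finds locally longest consecutive subsequences in strictly equispaced multidimensional data.
+
+Strictly equispaced data is data whose time intervals are strictly equal. Missing data (missing rows or missing values) is allowed, but data redundancy and timestamp drift are not.
+
+A consecutive subsequence is a subsequence whose points are spaced exactly by the standard time interval and contain no missing data. A consecutive subsequence is locally longest if it is not a proper subsequence of any other consecutive subsequence.
+
+
+**Name:** CONSECUTIVESEQUENCES
+
+**Input Series:** Supports multiple input series of arbitrary type, but the data must be strictly equispaced.
+
+**Parameters:**
+
++ `gap`: The standard time interval, a positive number with a unit. Five units are currently supported: 'ms' (millisecond), 's' (second), 'm' (minute), 'h' (hour) and 'd' (day). If omitted, the function estimates the standard time interval as the mode of the time intervals.
+
+**Output Series:** Outputs a single series of type INT32. Each data point in the output corresponds to one locally longest consecutive subsequence. The timestamp is the start time of the subsequence, and the value is the number of data points it contains.
+
+**Note:** For input that does not meet these requirements, the function makes no guarantee about the output.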
+
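+### Examples
+
+#### Manually Specify the Standard Time Interval
+
+The standard time interval can be specified manually with the `gap` parameter. Note that an incorrect parameter setting causes seriously wrong output.
+
+Input series: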
+
+```
++-----------------------------+---------------+---------------+
+|                         Time|root.test.d1.s1|root.test.d1.s2|
++-----------------------------+---------------+---------------+
+|2020-01-01T00:00:00.000+08:00|            1.0|            1.0|
+|2020-01-01T00:05:00.000+08:00|            1.0|            1.0|
+|2020-01-01T00:10:00.000+08:00|            1.0|            1.0|
+|2020-01-01T00:20:00.000+08:00|            1.0|            1.0|
+|2020-01-01T00:25:00.000+08:00|            1.0|            1.0|
+|2020-01-01T00:30:00.000+08:00|            1.0|            1.0|
+|2020-01-01T00:35:00.000+08:00|            1.0|            1.0|
+|2020-01-01T00:40:00.000+08:00|            1.0|           null|
+|2020-01-01T00:45:00.000+08:00|            1.0|            1.0|
+|2020-01-01T00:50:00.000+08:00|            1.0|            1.0|
++-----------------------------+---------------+---------------+
+```
+
+SQL for query:
+
+```sql
+select consecutivesequences(s1,s2,'gap'='5m') from root.test.d1
+```
+
+Output series:
+
+```
++-----------------------------+------------------------------------------------------------------+
+|                         Time|consecutivesequences(root.test.d1.s1, root.test.d1.s2, "gap"="5m")|
++-----------------------------+------------------------------------------------------------------+
+|2020-01-01T00:00:00.000+08:00|                                                                 3|
+|2020-01-01T00:20:00.000+08:00|                                                                 4|
+|2020-01-01T00:45:00.000+08:00|                                                                 2|
++-----------------------------+------------------------------------------------------------------+
+```
+
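+#### Automatically Estimate the Standard Time Interval
+
+When `gap` is omitted, the function estimates the standard time interval as the mode of the time intervals and obtains the same result. This usage is therefore recommended.
+
+The input series is the same as above. SQL for query: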
+
+```sql
+select consecutivesequences(s1,s2) from root.test.d1
+```
+
+Output series:
+```
++-----------------------------+------------------------------------------------------+
+|                         Time|consecutivesequences(root.test.d1.s1, root.test.d1.s2)|
++-----------------------------+------------------------------------------------------+
+|2020-01-01T00:00:00.000+08:00|                                                     3|
+|2020-01-01T00:20:00.000+08:00|                                                     4|
+|2020-01-01T00:45:00.000+08:00|                                                     2|
++-----------------------------+------------------------------------------------------+
+```
+
+## ConsecutiveWindows
+
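+### Usage
+
+This function finds consecutive windows of a specified length in strictly equispaced multidimensional data.
+
+Strictly equispaced data is data whose time intervals are strictly equal. Missing data (missing rows or missing values) is allowed, but data redundancy and timestamp drift are not.
+
+A consecutive window is a subsequence whose points are spaced exactly by the standard time interval and contain no missing data.
+
+
+**Name:** CONSECUTIVEWINDOWS
+
+**Input Series:** Supports multiple input series of arbitrary type, but the data must be strictly equispaced.
+
+**Parameters:**
+
++ `gap`: The standard time interval, a positive number with a unit. Five units are currently supported: 'ms' (millisecond), 's' (second), 'm' (minute), 'h' (hour) and 'd' (day). If omitted, the function estimates the standard time interval as the mode of the time intervals.
++ `length`: The sequence length, a positive number with a unit. Five units are currently supported: 'ms' (millisecond), 's' (second), 'm' (minute), 'h' (hour) and 'd' (day). This parameter is required.
+
+**Output Series:** Outputs a single series of type INT32. Each data point in the output corresponds to one consecutive subsequence of the specified length. The timestamp is the start time of the subsequence, and the value is the number of data points it contains.
+
+**Note:** For input that does not meet these requirements, the function makes no guarantee about the output.
+
+### Examples
+
+Input series: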
+
+```
++-----------------------------+---------------+---------------+
+|                         Time|root.test.d1.s1|root.test.d1.s2|
++-----------------------------+---------------+---------------+
+|2020-01-01T00:00:00.000+08:00|            1.0|            1.0|
+|2020-01-01T00:05:00.000+08:00|            1.0|            1.0|
+|2020-01-01T00:10:00.000+08:00|            1.0|            1.0|
+|2020-01-01T00:20:00.000+08:00|            1.0|            1.0|
+|2020-01-01T00:25:00.000+08:00|            1.0|            1.0|
+|2020-01-01T00:30:00.000+08:00|            1.0|            1.0|
+|2020-01-01T00:35:00.000+08:00|            1.0|            1.0|
+|2020-01-01T00:40:00.000+08:00|            1.0|           null|
+|2020-01-01T00:45:00.000+08:00|            1.0|            1.0|
+|2020-01-01T00:50:00.000+08:00|            1.0|            1.0|
++-----------------------------+---------------+---------------+
+```
+
+SQL for query:
+
+```sql
+select consecutivewindows(s1,s2,'length'='10m') from root.test.d1
+```
+
+Output series:
+```
++-----------------------------+--------------------------------------------------------------------+
+|                         Time|consecutivewindows(root.test.d1.s1, root.test.d1.s2, "length"="10m")|
++-----------------------------+--------------------------------------------------------------------+
+|2020-01-01T00:00:00.000+08:00|                                                                   3|
+|2020-01-01T00:20:00.000+08:00|                                                                   3|
+|2020-01-01T00:25:00.000+08:00|                                                                   3|
++-----------------------------+--------------------------------------------------------------------+
+```
diff --git a/library-udf/src/main/java/org/apache/iotdb/library/series/UDTFConsecutiveSequences.java b/library-udf/src/main/java/org/apache/iotdb/library/series/UDTFConsecutiveSequences.java
new file mode 100644
index 0000000000..66d5abb24d
--- /dev/null
+++ b/library-udf/src/main/java/org/apache/iotdb/library/series/UDTFConsecutiveSequences.java
@@ -0,0 +1,78 @@
+/*
+ * Copyright © 2021 iotdb-quality developer group (iotdb-quality@protonmail.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.iotdb.library.series;
+
+import org.apache.iotdb.db.query.udf.api.UDTF;
+import org.apache.iotdb.db.query.udf.api.access.Row;
+import org.apache.iotdb.db.query.udf.api.collector.PointCollector;
+import org.apache.iotdb.db.query.udf.api.customizer.config.UDTFConfigurations;
+import org.apache.iotdb.db.query.udf.api.customizer.parameter.UDFParameterValidator;
+import org.apache.iotdb.db.query.udf.api.customizer.parameter.UDFParameters;
+import org.apache.iotdb.db.query.udf.api.customizer.strategy.RowByRowAccessStrategy;
+import org.apache.iotdb.library.series.util.ConsecutiveUtil;
+import org.apache.iotdb.library.util.Util;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+
+import org.apache.commons.lang3.tuple.Pair;
+
+/** This function searches for all locally longest consecutive subsequences of the input series. */
+public class UDTFConsecutiveSequences implements UDTF {
+  private ConsecutiveUtil consUtil;
+
+  @Override
+  public void validate(UDFParameterValidator validator) throws Exception {
+    validator.validate(
+        x -> (long) x > 0,
+        "gap should be a time period whose unit is ms, s, m, h.",
+        Util.parseTime(validator.getParameters().getStringOrDefault("gap", "1ms")));
+  }
+
+  @Override
+  public void beforeStart(UDFParameters parameters, UDTFConfigurations configurations)
+      throws Exception {
+    configurations
+        .setAccessStrategy(new RowByRowAccessStrategy())
+        .setOutputDataType(TSDataType.INT32);
+    long gap = Util.parseTime(parameters.getStringOrDefault("gap", "0ms"));
+    consUtil = new ConsecutiveUtil(-gap, -gap, gap);
+  }
+
+  @Override
+  public void transform(Row row, PointCollector collector) throws Exception {
+    if (consUtil.getGap() == 0) {
+      if (consUtil.getWindow().size() < consUtil.getMaxLen()) { // window is not full
+        consUtil.getWindow().add(Pair.of(row.getTime(), consUtil.check(row)));
+        return;
+      }
+      // window is full: estimate the gap and replay the buffered rows
+      consUtil.calculateGap();
+      consUtil.cleanWindow(collector);
+    }
+    consUtil.process(row.getTime(), consUtil.check(row), collector); // includes the current row
+  }
+
+  @Override
+  public void terminate(PointCollector collector) throws Exception {
+    if (consUtil.getGap() == 0 && consUtil.getWindow().size() > 1) { // need two rows for a gap
+      consUtil.calculateGap();
+      consUtil.cleanWindow(collector);
+    }
+    if (consUtil.getCount() > 1) {
+      collector.putInt(consUtil.getFirst(), consUtil.getCount());
+    }
+  }
+}
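When `gap` is omitted, `transform` above buffers up to `getMaxLen()` (128) rows. It then estimates the gap with `calculateGap()`, which takes the mode of consecutive time differences via `Util.mode`, replays the buffer with `cleanWindow`, and streams later rows through `process`. Below is a minimal standalone Java sketch of that "buffer, estimate, replay, then stream" flow, reduced to timestamps only. `BufferedGapEstimatorSketch` and its methods are hypothetical names. Breaking ties towards the smaller interval is an assumption, because the tie-breaking rule of `Util.mode` is not part of this diff. The sketch also forwards the row that arrives when the buffer is already full, so no row is skipped at the switch-over.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.LongConsumer;

/** Hypothetical sketch: buffer timestamps, estimate the gap as their mode, replay, then stream. */
public class BufferedGapEstimatorSketch {
  private final int capacity;
  private final LongConsumer downstream;
  private final List<Long> buffer = new ArrayList<>();
  private long gap = 0; // 0 means "not estimated yet"

  BufferedGapEstimatorSketch(int capacity, LongConsumer downstream) {
    if (capacity < 2) {
      throw new IllegalArgumentException("need room for at least two timestamps");
    }
    this.capacity = capacity;
    this.downstream = downstream;
  }

  long getGap() {
    return gap;
  }

  void accept(long time) {
    if (gap == 0 && buffer.size() < capacity) {
      buffer.add(time); // still sampling intervals
      return;
    }
    if (gap == 0) {
      estimateAndReplay(); // buffer is full: estimate now
    }
    downstream.accept(time); // the row that found the buffer full is forwarded, not dropped
  }

  /** Call at end of input: estimates from whatever was buffered (needs two timestamps). */
  void finish() {
    if (gap == 0 && buffer.size() > 1) {
      estimateAndReplay();
    }
  }

  private void estimateAndReplay() {
    Map<Long, Integer> freq = new HashMap<>();
    for (int i = 1; i < buffer.size(); i++) {
      freq.merge(buffer.get(i) - buffer.get(i - 1), 1, Integer::sum);
    }
    int bestCount = 0;
    for (Map.Entry<Long, Integer> e : freq.entrySet()) {
      // Mode of the differences; ties go to the smaller difference (an assumption).
      if (e.getValue() > bestCount || (e.getValue() == bestCount && e.getKey() < gap)) {
        gap = e.getKey();
        bestCount = e.getValue();
      }
    }
    for (long t : buffer) {
      downstream.accept(t);
    }
    buffer.clear();
  }

  public static void main(String[] args) {
    List<Long> seen = new ArrayList<>();
    BufferedGapEstimatorSketch est = new BufferedGapEstimatorSketch(3, seen::add);
    for (long t : new long[] {0, 5, 10, 20, 25}) {
      est.accept(t);
    }
    est.finish();
    System.out.println(est.getGap() + " " + seen);
  }
}
```

With a three-row buffer, feeding 0, 5, 10, 20, 25 prints `5 [0, 5, 10, 20, 25]`. The gap is estimated when the fourth row arrives, and every row reaches the downstream consumer in order.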
diff --git a/library-udf/src/main/java/org/apache/iotdb/library/series/UDTFConsecutiveWindows.java b/library-udf/src/main/java/org/apache/iotdb/library/series/UDTFConsecutiveWindows.java
new file mode 100644
index 0000000000..9424a7ac26
--- /dev/null
+++ b/library-udf/src/main/java/org/apache/iotdb/library/series/UDTFConsecutiveWindows.java
@@ -0,0 +1,90 @@
+/*
+ * Copyright © 2021 iotdb-quality developer group (iotdb-quality@protonmail.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.iotdb.library.series;
+
+import org.apache.iotdb.db.query.udf.api.UDTF;
+import org.apache.iotdb.db.query.udf.api.access.Row;
+import org.apache.iotdb.db.query.udf.api.collector.PointCollector;
+import org.apache.iotdb.db.query.udf.api.customizer.config.UDTFConfigurations;
+import org.apache.iotdb.db.query.udf.api.customizer.parameter.UDFParameterValidator;
+import org.apache.iotdb.db.query.udf.api.customizer.parameter.UDFParameters;
+import org.apache.iotdb.db.query.udf.api.customizer.strategy.RowByRowAccessStrategy;
+import org.apache.iotdb.library.series.util.ConsecutiveUtil;
+import org.apache.iotdb.library.util.Util;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+
+import org.apache.commons.lang3.tuple.Pair;
+
+/** This function searches for consecutive subsequences of a given length in the input series. */
+public class UDTFConsecutiveWindows implements UDTF {
+  private ConsecutiveUtil consUtil;
+  private static final int maxLen = 128;
+  private long len;
+
+  @Override
+  public void validate(UDFParameterValidator validator) throws Exception {
+    validator
+        .validate(
+            x -> (long) x > 0,
+            "gap should be a time period whose unit is ms, s, m, h.",
+            Util.parseTime(validator.getParameters().getStringOrDefault("gap", "1ms")))
+        .validate(
+            x -> (long) x > 0,
+            "length should be a time period whose unit is ms, s, m, h.",
+            Util.parseTime(validator.getParameters().getString("length")));
+  }
+
+  @Override
+  public void beforeStart(UDFParameters parameters, UDTFConfigurations configurations)
+      throws Exception {
+    configurations
+        .setAccessStrategy(new RowByRowAccessStrategy())
+        .setOutputDataType(TSDataType.INT32);
+    long gap = Util.parseTime(parameters.getStringOrDefault("gap", "0ms"));
+    len = Util.parseTime(parameters.getString("length"));
+    int count = gap == 0 ? 0 : (int) (len / gap + 1);
+    consUtil = new ConsecutiveUtil(-gap, -gap, gap);
+    consUtil.setCount(count);
+  }
+
+  @Override
+  public void transform(Row row, PointCollector collector) throws Exception {
+    if (consUtil.getGap() == 0) {
+      if (consUtil.getWindow().size() < maxLen) { // window is not full
+        consUtil.getWindow().add(Pair.of(row.getTime(), consUtil.check(row)));
+        return;
+      }
+      // window is full: estimate the gap and replay the buffered rows
+      consUtil.calculateGap();
+      consUtil.cleanWindow(collector);
+    }
+    consUtil.process(row.getTime(), consUtil.check(row), collector); // includes the current row
+  }
+
+  @Override
+  public void terminate(PointCollector collector) throws Exception {
+    if (consUtil.getGap() == 0 && consUtil.getWindow().size() > 1) { // need two rows for a gap
+      consUtil.calculateGap();
+      consUtil.cleanWindow(collector);
+    }
+    for (;
+        consUtil.getFirst() + len <= consUtil.getLast();
+        consUtil.setFirst(consUtil.getFirst() + consUtil.getGap())) {
+      collector.putInt(consUtil.getFirst(), consUtil.getCount());
+    }
+  }
+}
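The ConsecutiveWindows documentation defines a window as a run of points that spans `length` with no missing data. Its example output lists every such window start, including overlapping ones (00:20 and 00:25). Below is a minimal, dependency-free Java sketch of those documented semantics. It is not the IoTDB UDF API, and `ConsecutiveWindowsSketch` and `find` are hypothetical names. It assumes `length` is a positive multiple of `gap`, so each window holds `length / gap + 1` points, which is the same count `beforeStart` computes.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, dependency-free sketch of the documented ConsecutiveWindows semantics. */
public class ConsecutiveWindowsSketch {

  /**
   * Returns {start, points} for every window [start, start + length] that lies inside a run of
   * non-null rows spaced exactly by gap. Assumes length is a positive multiple of gap.
   */
  static List<long[]> find(long[] times, boolean[] hasNull, long gap, long length) {
    List<long[]> out = new ArrayList<>();
    long points = length / gap + 1; // both window endpoints are included
    int i = 0;
    while (i < times.length) {
      if (hasNull[i]) {
        i++; // a row with a missing value cannot be part of any window
        continue;
      }
      // Extend j to the end of the maximal consecutive run that starts at i.
      int j = i;
      while (j + 1 < times.length && !hasNull[j + 1] && times[j + 1] == times[j] + gap) {
        j++;
      }
      // Report every start position whose window still fits inside the run.
      for (long start = times[i]; start + length <= times[j]; start += gap) {
        out.add(new long[] {start, points});
      }
      i = j + 1;
    }
    return out;
  }

  public static void main(String[] args) {
    long[] times = {0, 5, 10, 20, 25, 30, 35, 40, 45, 50}; // minutes, from the example input
    boolean[] hasNull = new boolean[times.length];
    hasNull[7] = true; // s2 is null at 00:40
    for (long[] w : find(times, hasNull, 5, 10)) {
      System.out.println(w[0] + " -> " + w[1]);
    }
  }
}
```

On the example input with `gap` = 5 and `length` = 10 (minutes), `main` prints `0 -> 3`, `20 -> 3` and `25 -> 3`. This matches the documented output table. Unlike the streaming `ConsecutiveUtil`, the sketch holds all rows in memory for simplicity.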
diff --git a/library-udf/src/main/java/org/apache/iotdb/library/series/util/ConsecutiveUtil.java b/library-udf/src/main/java/org/apache/iotdb/library/series/util/ConsecutiveUtil.java
new file mode 100644
index 0000000000..a5e085b46f
--- /dev/null
+++ b/library-udf/src/main/java/org/apache/iotdb/library/series/util/ConsecutiveUtil.java
@@ -0,0 +1,129 @@
+/*
+ * Copyright © 2021 iotdb-quality developer group (iotdb-quality@protonmail.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.iotdb.library.series.util;
+
+import org.apache.iotdb.db.query.udf.api.access.Row;
+import org.apache.iotdb.db.query.udf.api.collector.PointCollector;
+import org.apache.iotdb.library.util.Util;
+
+import org.apache.commons.lang3.tuple.Pair;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+/** Util for ConsecutiveSequences and ConsecutiveWindows */
+public class ConsecutiveUtil {
+  private static final int maxLen = 128;
+  private long first;
+  private long last;
+  private long gap;
+  private int count = 0;
+  private final ArrayList<Pair<Long, Boolean>> window = new ArrayList<>(maxLen);
+
+  public ConsecutiveUtil(long first, long last, long gap) {
+    this.first = first;
+    this.last = last;
+    this.gap = gap;
+  }
+
+  public ArrayList<Pair<Long, Boolean>> getWindow() {
+    return window;
+  }
+
+  public long getGap() {
+    return gap;
+  }
+
+  public void setGap(long gap) {
+    this.gap = gap;
+  }
+
+  public int getMaxLen() {
+    return maxLen;
+  }
+
+  public int getCount() {
+    return count;
+  }
+
+  public void setCount(int count) {
+    this.count = count;
+  }
+
+  public long getFirst() {
+    return first;
+  }
+
+  public void setFirst(long first) {
+    this.first = first;
+  }
+
+  public long getLast() {
+    return last;
+  }
+
+  /** Returns true if any column of the row is null. */
+  public boolean check(Row row) {
+    for (int i = 0; i < row.size(); i++) {
+      if (row.isNull(i)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /** Estimates the standard gap as the mode of the time differences in the window. */
+  public void calculateGap() {
+    long[] time = new long[window.size() - 1];
+    for (int i = 0; i < time.length; i++) {
+      time[i] = window.get(i + 1).getLeft() - window.get(i).getLeft();
+    }
+    gap = Util.mode(time);
+  }
+
+  /** Replays the buffered data points through {@link #process} using the estimated gap. */
+  public void cleanWindow(PointCollector collector) throws IOException {
+    if (window.isEmpty()) {
+      return;
+    }
+    first = last = -gap;
+    for (Pair<Long, Boolean> p : window) {
+      process(p.getLeft(), p.getRight(), collector);
+    }
+  }
+
+  /** process one row */
+  public void process(long time, boolean nullExist, PointCollector collector) throws IOException {
+    if (nullExist) { // consecutive subsequence ends with null
+      if (count > 1) {
+        collector.putInt(first, count);
+      }
+      first = last = -gap;
+      count = 0;
+    } else {
+      if (count > 0 && time == last + gap) { // correct gap and not null value, subsequence grows
+        last = time;
+        count++;
+      } else { // incorrect gap and not null value, subsequence ends, and new subsequence starts
+        if (count > 1) {
+          collector.putInt(first, count);
+        }
+        first = last = time;
+        count = 1;
+      }
+    }
+  }
+}
diff --git a/library-udf/src/test/java/org/apache/iotdb/library/series/SeriesTest.java b/library-udf/src/test/java/org/apache/iotdb/library/series/SeriesTest.java
new file mode 100644
index 0000000000..afb09147d2
--- /dev/null
+++ b/library-udf/src/test/java/org/apache/iotdb/library/series/SeriesTest.java
@@ -0,0 +1,467 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.library.series;
+
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.service.IoTDB;
+import org.apache.iotdb.integration.env.ConfigFactory;
+import org.apache.iotdb.integration.env.EnvFactory;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+
+import static org.junit.Assert.fail;
+
+public class SeriesTest {
+  protected static final int ITERATION_TIMES = 10_000;
+
+  protected static final long TIMESTAMP_INTERVAL = 60; // gap = 60ms
+
+  protected static final long START_TIMESTAMP = 0;
+
+  protected static final long END_TIMESTAMP = START_TIMESTAMP + ITERATION_TIMES * ITERATION_TIMES;
+
+  private static final float oldUdfCollectorMemoryBudgetInMB =
+      IoTDBDescriptor.getInstance().getConfig().getUdfCollectorMemoryBudgetInMB();
+  private static final float oldUdfTransformerMemoryBudgetInMB =
+      IoTDBDescriptor.getInstance().getConfig().getUdfTransformerMemoryBudgetInMB();
+  private static final float oldUdfReaderMemoryBudgetInMB =
+      IoTDBDescriptor.getInstance().getConfig().getUdfReaderMemoryBudgetInMB();
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    ConfigFactory.getConfig()
+        .setUdfCollectorMemoryBudgetInMB(5)
+        .setUdfTransformerMemoryBudgetInMB(5)
+        .setUdfReaderMemoryBudgetInMB(5);
+    EnvFactory.getEnv().initBeforeClass();
+    createTimeSeries();
+    generateData();
+    registerUDF();
+  }
+
+  private static void createTimeSeries() throws MetadataException {
+    IoTDB.metaManager.setStorageGroup(new PartialPath("root.vehicle"));
+    IoTDB.metaManager.createTimeseries(
+        new PartialPath("root.vehicle.d1.s1"),
+        TSDataType.INT32,
+        TSEncoding.PLAIN,
+        CompressionType.UNCOMPRESSED,
+        null);
+    IoTDB.metaManager.createTimeseries(
+        new PartialPath("root.vehicle.d1.s2"),
+        TSDataType.INT64,
+        TSEncoding.PLAIN,
+        CompressionType.UNCOMPRESSED,
+        null);
+    IoTDB.metaManager.createTimeseries(
+        new PartialPath("root.vehicle.d2.s1"),
+        TSDataType.FLOAT,
+        TSEncoding.PLAIN,
+        CompressionType.UNCOMPRESSED,
+        null);
+    IoTDB.metaManager.createTimeseries(
+        new PartialPath("root.vehicle.d2.s2"),
+        TSDataType.DOUBLE,
+        TSEncoding.PLAIN,
+        CompressionType.UNCOMPRESSED,
+        null);
+  }
+
+  private static void generateData() {
+    // Both devices get the same ten timestamps, spaced 300 apart. No point is written at
+    // 1577808900, and at 1577810400 only s1 is written (s2 == null); together these split
+    // each device's data into the three runs the tests below expect.
+    long[] timestamps = {
+      1577808000, 1577808300, 1577808600, 1577809200, 1577809500,
+      1577809800, 1577810100, 1577810400, 1577810700, 1577811000
+    };
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+      for (String device : new String[] {"root.vehicle.d1", "root.vehicle.d2"}) {
+        for (long timestamp : timestamps) {
+          if (timestamp == 1577810400L) {
+            // s2 == null at this timestamp
+            statement.execute(
+                String.format(
+                    "insert into %s(timestamp,s1) values(%d,%d)", device, timestamp, 1));
+          } else {
+            statement.execute(
+                String.format(
+                    "insert into %s(timestamp,s1,s2) values(%d,%d,%d)", device, timestamp, 1, 1));
+          }
+        }
+      }
+    } catch (SQLException throwable) {
+      fail(throwable.getMessage());
+    }
+  }
+
+  private static void registerUDF() {
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+      statement.execute(
+          "create function ConsecutiveSequences as 'org.apache.iotdb.library.series.UDTFConsecutiveSequences'");
+      statement.execute(
+          "create function ConsecutiveWindows as 'org.apache.iotdb.library.series.UDTFConsecutiveWindows'");
+    } catch (SQLException throwable) {
+      fail(throwable.getMessage());
+    }
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    EnvFactory.getEnv().cleanAfterClass();
+    ConfigFactory.getConfig()
+        .setUdfCollectorMemoryBudgetInMB(oldUdfCollectorMemoryBudgetInMB)
+        .setUdfTransformerMemoryBudgetInMB(oldUdfTransformerMemoryBudgetInMB)
+        .setUdfReaderMemoryBudgetInMB(oldUdfReaderMemoryBudgetInMB);
+  }
+
+  @Test
+  public void testConsecutiveSequences1() {
+    String sqlStr = "select ConsecutiveSequences(d1.s1,d1.s2) from root.vehicle";
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement();
+        ResultSet resultSet = statement.executeQuery(sqlStr)) {
+      // getRow() only reports the cursor position, so advance the cursor with next() and
+      // check for exactly three rows. JDBC columns are 1-based: column 1 is Time and
+      // column 2 is the UDF output.
+      Assert.assertTrue(resultSet.next());
+      Assert.assertEquals(1577808000L, Long.parseLong(resultSet.getString(1)));
+      Assert.assertEquals(3, Integer.parseInt(resultSet.getString(2)));
+
+      Assert.assertTrue(resultSet.next());
+      Assert.assertEquals(1577809200L, Long.parseLong(resultSet.getString(1)));
+      Assert.assertEquals(4, Integer.parseInt(resultSet.getString(2)));
+
+      Assert.assertTrue(resultSet.next());
+      Assert.assertEquals(1577810700L, Long.parseLong(resultSet.getString(1)));
+      Assert.assertEquals(2, Integer.parseInt(resultSet.getString(2)));
+
+      Assert.assertFalse(resultSet.next());
+    } catch (SQLException throwable) {
+      fail(throwable.getMessage());
+    }
+  }
+
+  @Test
+  public void testConsecutiveSequences2() {
+    String sqlStr = "select ConsecutiveSequences(d2.s1,d2.s2) from root.vehicle";
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement();
+        ResultSet resultSet = statement.executeQuery(sqlStr)) {
+      Assert.assertTrue(resultSet.next());
+      Assert.assertEquals(1577808000L, Long.parseLong(resultSet.getString(1)));
+      Assert.assertEquals(3, Integer.parseInt(resultSet.getString(2)));
+
+      Assert.assertTrue(resultSet.next());
+      Assert.assertEquals(1577809200L, Long.parseLong(resultSet.getString(1)));
+      Assert.assertEquals(4, Integer.parseInt(resultSet.getString(2)));
+
+      Assert.assertTrue(resultSet.next());
+      Assert.assertEquals(1577810700L, Long.parseLong(resultSet.getString(1)));
+      Assert.assertEquals(2, Integer.parseInt(resultSet.getString(2)));
+
+      Assert.assertFalse(resultSet.next());
+    } catch (SQLException throwable) {
+      fail(throwable.getMessage());
+    }
+  }
+
+  @Test
+  public void testConsecutiveSequences3() {
+    String sqlStr = "select ConsecutiveSequences(d1.s1,d1.s2,'gap'='5m') from root.vehicle";
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement();
+        ResultSet resultSet = statement.executeQuery(sqlStr)) {
+      Assert.assertTrue(resultSet.next());
+      Assert.assertEquals(1577808000L, Long.parseLong(resultSet.getString(1)));
+      Assert.assertEquals(3, Integer.parseInt(resultSet.getString(2)));
+
+      Assert.assertTrue(resultSet.next());
+      Assert.assertEquals(1577809200L, Long.parseLong(resultSet.getString(1)));
+      Assert.assertEquals(4, Integer.parseInt(resultSet.getString(2)));
+
+      Assert.assertTrue(resultSet.next());
+      Assert.assertEquals(1577810700L, Long.parseLong(resultSet.getString(1)));
+      Assert.assertEquals(2, Integer.parseInt(resultSet.getString(2)));
+
+      Assert.assertFalse(resultSet.next());
+    } catch (SQLException throwable) {
+      fail(throwable.getMessage());
+    }
+  }
+
+  @Test
+  public void testConsecutiveSequences4() {
+    String sqlStr = "select ConsecutiveSequences(d2.s1,d2.s2,'gap'='5m') from root.vehicle";
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement();
+        ResultSet resultSet = statement.executeQuery(sqlStr)) {
+      Assert.assertTrue(resultSet.next());
+      Assert.assertEquals(1577808000L, Long.parseLong(resultSet.getString(1)));
+      Assert.assertEquals(3, Integer.parseInt(resultSet.getString(2)));
+
+      Assert.assertTrue(resultSet.next());
+      Assert.assertEquals(1577809200L, Long.parseLong(resultSet.getString(1)));
+      Assert.assertEquals(4, Integer.parseInt(resultSet.getString(2)));
+
+      Assert.assertTrue(resultSet.next());
+      Assert.assertEquals(1577810700L, Long.parseLong(resultSet.getString(1)));
+      Assert.assertEquals(2, Integer.parseInt(resultSet.getString(2)));
+
+      Assert.assertFalse(resultSet.next());
+    } catch (SQLException throwable) {
+      fail(throwable.getMessage());
+    }
+  }
+
+  @Test
+  public void testConsecutiveWindows1() {
+    String sqlStr = "select ConsecutiveWindows(d1.s1,d1.s2,'length'='10m') from root.vehicle";
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement();
+        ResultSet resultSet = statement.executeQuery(sqlStr)) {
+      Assert.assertTrue(resultSet.next());
+      Assert.assertEquals(1577808000L, Long.parseLong(resultSet.getString(1)));
+      Assert.assertEquals(3, Integer.parseInt(resultSet.getString(2)));
+
+      Assert.assertTrue(resultSet.next());
+      Assert.assertEquals(1577809200L, Long.parseLong(resultSet.getString(1)));
+      Assert.assertEquals(3, Integer.parseInt(resultSet.getString(2)));
+
+      Assert.assertTrue(resultSet.next());
+      Assert.assertEquals(1577809500L, Long.parseLong(resultSet.getString(1)));
+      Assert.assertEquals(3, Integer.parseInt(resultSet.getString(2)));
+
+      Assert.assertFalse(resultSet.next());
+    } catch (SQLException throwable) {
+      fail(throwable.getMessage());
+    }
+  }
+
+  @Test
+  public void testConsecutiveWindows2() {
+    String sqlStr = "select ConsecutiveWindows(d2.s1,d2.s2,'length'='10m') from root.vehicle";
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement();
+        ResultSet resultSet = statement.executeQuery(sqlStr)) {
+      Assert.assertTrue(resultSet.next());
+      Assert.assertEquals(1577808000L, Long.parseLong(resultSet.getString(1)));
+      Assert.assertEquals(3, Integer.parseInt(resultSet.getString(2)));
+
+      Assert.assertTrue(resultSet.next());
+      Assert.assertEquals(1577809200L, Long.parseLong(resultSet.getString(1)));
+      Assert.assertEquals(3, Integer.parseInt(resultSet.getString(2)));
+
+      Assert.assertTrue(resultSet.next());
+      Assert.assertEquals(1577809500L, Long.parseLong(resultSet.getString(1)));
+      Assert.assertEquals(3, Integer.parseInt(resultSet.getString(2)));
+
+      Assert.assertFalse(resultSet.next());
+    } catch (SQLException throwable) {
+      fail(throwable.getMessage());
+    }
+  }
+
+  @Test
+  public void testConsecutiveWindows3() {
+    String sqlStr =
+        "select ConsecutiveWindows(d1.s1,d1.s2,'length'='10m','gap'='5m') from root.vehicle";
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement();
+        ResultSet resultSet = statement.executeQuery(sqlStr)) {
+      Assert.assertTrue(resultSet.next());
+      Assert.assertEquals(1577808000L, Long.parseLong(resultSet.getString(1)));
+      Assert.assertEquals(3, Integer.parseInt(resultSet.getString(2)));
+
+      Assert.assertTrue(resultSet.next());
+      Assert.assertEquals(1577809200L, Long.parseLong(resultSet.getString(1)));
+      Assert.assertEquals(3, Integer.parseInt(resultSet.getString(2)));
+
+      Assert.assertTrue(resultSet.next());
+      Assert.assertEquals(1577809500L, Long.parseLong(resultSet.getString(1)));
+      Assert.assertEquals(3, Integer.parseInt(resultSet.getString(2)));
+
+      Assert.assertFalse(resultSet.next());
+    } catch (SQLException throwable) {
+      fail(throwable.getMessage());
+    }
+  }
+
+  @Test
+  public void testConsecutiveWindows4() {
+    String sqlStr =
+        "select ConsecutiveWindows(d2.s1,d2.s2,'length'='10m','gap'='5m') from root.vehicle";
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement();
+        ResultSet resultSet = statement.executeQuery(sqlStr)) {
+      Assert.assertTrue(resultSet.next());
+      Assert.assertEquals(1577808000L, Long.parseLong(resultSet.getString(1)));
+      Assert.assertEquals(3, Integer.parseInt(resultSet.getString(2)));
+
+      Assert.assertTrue(resultSet.next());
+      Assert.assertEquals(1577809200L, Long.parseLong(resultSet.getString(1)));
+      Assert.assertEquals(3, Integer.parseInt(resultSet.getString(2)));
+
+      Assert.assertTrue(resultSet.next());
+      Assert.assertEquals(1577809500L, Long.parseLong(resultSet.getString(1)));
+      Assert.assertEquals(3, Integer.parseInt(resultSet.getString(2)));
+
+      Assert.assertFalse(resultSet.next());
+    } catch (SQLException throwable) {
+      fail(throwable.getMessage());
+    }
+  }
+}
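The rows asserted in SeriesTest can be reproduced by hand with the simplified model below. It is a sketch under stated assumptions, not the code in UDTFConsecutiveSequences, UDTFConsecutiveWindows or ConsecutiveUtil. It assumes that a sequence is a maximal run of points with no null value whose adjacent timestamps differ by exactly `gap`; that the default `gap` is the most frequent interval between adjacent timestamps; and that a window of `length` covers `length / gap + 1` points and slides one point at a time inside a run. `ConsecutiveModel`, `estimateGap`, `sequences` and `windows` are hypothetical names, and `gap` and `length` are given in the same units as the inserted timestamps.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical, simplified model of the behaviour SeriesTest asserts. It is NOT the
 * implementation in UDTFConsecutiveSequences, UDTFConsecutiveWindows or ConsecutiveUtil.
 */
public class ConsecutiveModel {

  /** Assumed default gap: the most frequent interval between adjacent timestamps. */
  public static long estimateGap(long[] times) {
    Map<Long, Integer> freq = new HashMap<>();
    long best = 0;
    int bestCount = 0;
    for (int i = 1; i < times.length; i++) {
      long interval = times[i] - times[i - 1];
      int count = freq.merge(interval, 1, Integer::sum);
      if (count > bestCount) {
        best = interval;
        bestCount = count;
      }
    }
    return best;
  }

  /**
   * Returns {startTime, pointCount} for each maximal run in which no point has a null
   * value and adjacent timestamps differ by exactly gap.
   */
  public static List<long[]> sequences(long[] times, boolean[] hasNull, long gap) {
    List<long[]> runs = new ArrayList<>();
    int start = -1; // index where the open run begins; -1 means no run is open
    for (int i = 0; i < times.length; i++) {
      boolean extendsRun = start >= 0 && !hasNull[i] && times[i] - times[i - 1] == gap;
      if (extendsRun) {
        continue;
      }
      if (start >= 0) {
        runs.add(new long[] {times[start], i - start}); // close the open run
      }
      start = hasNull[i] ? -1 : i; // a point with a null value cannot start a run
    }
    if (start >= 0) {
      runs.add(new long[] {times[start], times.length - start});
    }
    return runs;
  }

  /**
   * Within each run, emits every window of length / gap + 1 points, sliding by one point,
   * as {windowStartTime, pointCount}.
   */
  public static List<long[]> windows(long[] times, boolean[] hasNull, long gap, long length) {
    long size = length / gap + 1;
    List<long[]> result = new ArrayList<>();
    for (long[] run : sequences(times, hasNull, gap)) {
      for (long k = 0; k + size <= run[1]; k++) {
        // points inside a run are exactly gap apart, so the k-th start is run[0] + k * gap
        result.add(new long[] {run[0] + k * gap, size});
      }
    }
    return result;
  }

  public static void main(String[] args) {
    // Same layout as generateData(): no point at 1577808900, s2 is null at 1577810400.
    long[] times = {
      1577808000L, 1577808300L, 1577808600L, 1577809200L, 1577809500L,
      1577809800L, 1577810100L, 1577810400L, 1577810700L, 1577811000L
    };
    boolean[] hasNull = new boolean[times.length];
    hasNull[7] = true;
    long gap = estimateGap(times);
    for (long[] row : sequences(times, hasNull, gap)) {
      System.out.println("sequence " + row[0] + " " + row[1]);
    }
    for (long[] row : windows(times, hasNull, gap, 2 * gap)) {
      System.out.println("window " + row[0] + " " + row[1]);
    }
  }
}
```

With the estimated gap of 300, `main` prints three sequences of 3, 4 and 2 points starting at 1577808000, 1577809200 and 1577810700, and three 3-point windows starting at 1577808000, 1577809200 and 1577809500, which are the rows the assertions above expect.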