Posted to commits@iotdb.apache.org by qi...@apache.org on 2020/12/29 15:03:07 UTC

[iotdb] branch master updated: [IOTDB-1070] Add interface `terminate` for UDTF (#2366)

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 4c16060  [IOTDB-1070] Add interface `terminate` for UDTF (#2366)
4c16060 is described below

commit 4c160603878fc2fb244088e092929e32c717aaf4
Author: Steve Yurong Su <st...@outlook.com>
AuthorDate: Tue Dec 29 23:02:42 2020 +0800

    [IOTDB-1070] Add interface `terminate` for UDTF (#2366)
---
 .../Operation Manual/UDF User Defined Function.md  |  53 +++++++
 .../Operation Manual/UDF User Defined Function.md  |  53 +++++++
 .../org/apache/iotdb/db/query/udf/api/UDTF.java    |  12 ++
 .../db/query/udf/api/collector/PointCollector.java |   4 +-
 .../api/customizer/config/UDTFConfigurations.java  |   3 +-
 .../db/query/udf/core/executor/UDTFExecutor.java   |  21 ++-
 .../udf/core/transformer/UDFQueryTransformer.java  |  18 ++-
 .../integration/IoTDBUDTFAlignByTimeQueryIT.java   | 169 +++++++++++++++++++++
 .../org/apache/iotdb/db/query/udf/example/Max.java |  64 ++++++++
 .../db/query/udf/example/TerminateTester.java      |  64 ++++++++
 10 files changed, 451 insertions(+), 10 deletions(-)

diff --git a/docs/UserGuide/Operation Manual/UDF User Defined Function.md b/docs/UserGuide/Operation Manual/UDF User Defined Function.md
index 7a0a9b8..cef2567 100644
--- a/docs/UserGuide/Operation Manual/UDF User Defined Function.md	
+++ b/docs/UserGuide/Operation Manual/UDF User Defined Function.md	
@@ -67,6 +67,7 @@ The following table shows all the interfaces available for user implementation.
 | `void beforeDestroy() `                                      | This method is called by the framework after the last input data is processed, and will only be called once in the life cycle of each UDF instance. | Optional                                              |
 | `void transform(Row row, PointCollector collector) throws Exception` | This method is called by the framework. This data processing method will be called when you choose to use the `RowByRowAccessStrategy` strategy (set in `beforeStart`) to consume raw data. Input data is passed in by `Row`, and the transformation result should be output by `PointCollector`. You need to call the data collection method provided by `collector`  to determine the output data. | Required to implement at lea [...]
 | `void transform(RowWindow rowWindow, PointCollector collector) throws Exception` | This method is called by the framework. This data processing method will be called when you choose to use the `SlidingSizeWindowAccessStrategy` or `SlidingTimeWindowAccessStrategy` strategy (set in `beforeStart`) to consume raw data. Input data is passed in by `RowWindow`, and the transformation result should be output by `PointCollector`. You need to call the data collection method provided by `collecto [...]
+| `void terminate(PointCollector collector) throws Exception`  | This method is called by the framework after all `transform` calls have been executed. In a single UDF query, this method is called exactly once. You need to call the data collection method provided by `collector` to determine the output data. | Optional                                              |
 
 Note that every time the framework executes a UDTF query, a new UDF instance will be constructed. When the query ends, the corresponding instance will be destroyed. Therefore, the internal data of the instances in different UDTF queries (even in the same SQL statement) are isolated. You can maintain some state data in the UDTF without considering the influence of concurrency and other factors.
 
@@ -289,6 +290,58 @@ public class Counter implements UDTF {
 
 
 
+### `void terminate(PointCollector collector) throws Exception`
+
+In some scenarios, a UDF needs to traverse all the original data to calculate the final output data points. The `terminate` interface provides support for those scenarios.
+
+This method is called after all `transform` calls have been executed and before the `beforeDestroy` method is executed. You can implement the `transform` method to perform pure data processing (without outputting any data points) and implement the `terminate` method to output the processing results.
+
+The processing results must be output through the `PointCollector`. You can output any number of data points in one `terminate` call. Note that the type of the output data points must match the type you set in the `beforeStart` method, and the timestamps of the output data points must be strictly monotonically increasing.
+
+Below is a complete UDF example that implements the `void terminate(PointCollector collector) throws Exception` method. It takes one time series whose data type is `INT32` as input, and outputs the maximum value point of the series.
+
+```java
+import java.io.IOException;
+import org.apache.iotdb.db.query.udf.api.UDTF;
+import org.apache.iotdb.db.query.udf.api.access.Row;
+import org.apache.iotdb.db.query.udf.api.collector.PointCollector;
+import org.apache.iotdb.db.query.udf.api.customizer.config.UDTFConfigurations;
+import org.apache.iotdb.db.query.udf.api.customizer.parameter.UDFParameters;
+import org.apache.iotdb.db.query.udf.api.customizer.strategy.RowByRowAccessStrategy;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+
+public class Max implements UDTF {
+
+  private Long time;
+  private int value;
+
+  @Override
+  public void beforeStart(UDFParameters parameters, UDTFConfigurations configurations) {
+    configurations
+        .setOutputDataType(TSDataType.INT32)
+        .setAccessStrategy(new RowByRowAccessStrategy());
+  }
+
+  @Override
+  public void transform(Row row, PointCollector collector) {
+    int candidateValue = row.getInt(0);
+    if (time == null || value < candidateValue) {
+      time = row.getTime();
+      value = candidateValue;
+    }
+  }
+
+  @Override
+  public void terminate(PointCollector collector) throws IOException {
+    if (time != null) {
+      collector.putInt(time, value);
+    }
+  }
+}
+```
+
+
+
 ## Maven Project Example
 
 If you use Maven, you can build your own UDF project referring to our **udf-example** module. You can find the project [here](https://github.com/apache/iotdb/tree/master/example/udf).
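The call order documented in the hunk above (every `transform` call first, then a single `terminate` call) can be sketched without the IoTDB dependency. This is only a minimal simulation under stated assumptions: `SketchCollector`, `SketchMax` and `TerminateLifecycleSketch` are hypothetical stand-ins that mirror the shape of `PointCollector.putInt` and of the `Max` example, and the driver loop is an assumption about the call order, not the real executor.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for PointCollector; it only mirrors putInt(long, int).
class SketchCollector {
  final List<long[]> points = new ArrayList<>();

  void putInt(long time, int value) {
    points.add(new long[] {time, value});
  }
}

// Hypothetical stand-in mirroring the Max example: transform only updates
// internal state, terminate emits the single result point.
class SketchMax {
  private Long time;
  private int value;

  void transform(long rowTime, int rowValue) {
    if (time == null || value < rowValue) {
      time = rowTime;
      value = rowValue;
    }
  }

  void terminate(SketchCollector collector) {
    if (time != null) {
      collector.putInt(time, value);
    }
  }
}

class TerminateLifecycleSketch {

  // Assumed call order: all transform calls, then terminate exactly once.
  static List<long[]> run(long[] times, int[] values) {
    SketchMax udf = new SketchMax();
    SketchCollector collector = new SketchCollector();
    for (int i = 0; i < times.length; i++) {
      udf.transform(times[i], values[i]);
    }
    udf.terminate(collector);
    return collector.points;
  }

  public static void main(String[] args) {
    for (long[] p : run(new long[] {0, 2, 4, 6}, new int[] {0, 9, 3, 9})) {
      System.out.println(p[0] + " -> " + p[1]);
    }
  }
}
```

Because the comparison is a strict `value < rowValue`, a tie keeps the earliest timestamp, and an empty input produces no output point at all.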
diff --git a/docs/zh/UserGuide/Operation Manual/UDF User Defined Function.md b/docs/zh/UserGuide/Operation Manual/UDF User Defined Function.md
index d461c00..90444a9 100644
--- a/docs/zh/UserGuide/Operation Manual/UDF User Defined Function.md	
+++ b/docs/zh/UserGuide/Operation Manual/UDF User Defined Function.md	
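@@ -67,6 +67,7 @@ IoTDB supports two types of UDFs, as shown in the table below.
 | `void beforeDestroy() `                                      | The termination method of the UDTF. This method is called by the framework, and is called only once, after the last record has been processed. | No                 |
 | `void transform(Row row, PointCollector collector) throws Exception` | This method is called by the framework. It is called when you choose the `RowByRowAccessStrategy` strategy in `beforeStart` to consume the raw data. Input is passed in as a `Row`, and results are output through the `PointCollector`. You need to call the data collection methods provided by `collector` inside this method to decide the final output data. | Implement either this method or the one below |
 | `void transform(RowWindow rowWindow, PointCollector collector) throws Exception` | This method is called by the framework. It is called when you choose the `SlidingSizeWindowAccessStrategy` or `SlidingTimeWindowAccessStrategy` strategy in `beforeStart` to consume the raw data. Input is passed in as a `RowWindow`, and results are output through the `PointCollector`. You need to call the data collection methods provided by `collector` inside this method to decide the final output data. | Implement either this method or the one above |
+| `void terminate(PointCollector collector) throws Exception`  | This method is called by the framework. It is called after all `transform` calls have completed and before the `beforeDestroy` method is executed. In a single UDF query, this method is called exactly once. You need to call the data collection methods provided by `collector` inside this method to decide the final output data. | No                 |
 
 Note that each time the framework executes a UDTF query, it constructs a brand-new UDF class instance, and that instance is destroyed when the query ends. The internal data of UDF class instances in different UDTF queries (even within the same SQL statement) is therefore isolated. You can safely maintain state data in a UDTF without considering the effect of concurrency on the internal state of the UDF class instance.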
 
@@ -287,6 +288,58 @@ public class Counter implements UDTF {
 
 
 
+### `void terminate(PointCollector collector) throws Exception`
+
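+In some scenarios, a UDF has to traverse all the raw data before it can produce its final output. The `terminate` interface supports this kind of UDF.
+
+This method is called after all `transform` calls have completed and before the `beforeDestroy` method is executed. You can use the `transform` method purely for data processing, and then use `terminate` to output the results.
+
+Results must be output through the `PointCollector`. You can output any number of data points in a single `terminate` call. Note that the type of the output data points must match the type you set in the `beforeStart` method, and the timestamps of the output data points must be strictly monotonically increasing.
+
+Below is a complete UDF example that implements the `void terminate(PointCollector collector) throws Exception` method. It takes one time series of type `INT32` as input and outputs the maximum value point of that series.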
+
+```java
+import java.io.IOException;
+import org.apache.iotdb.db.query.udf.api.UDTF;
+import org.apache.iotdb.db.query.udf.api.access.Row;
+import org.apache.iotdb.db.query.udf.api.collector.PointCollector;
+import org.apache.iotdb.db.query.udf.api.customizer.config.UDTFConfigurations;
+import org.apache.iotdb.db.query.udf.api.customizer.parameter.UDFParameters;
+import org.apache.iotdb.db.query.udf.api.customizer.strategy.RowByRowAccessStrategy;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+
+public class Max implements UDTF {
+
+  private Long time;
+  private int value;
+
+  @Override
+  public void beforeStart(UDFParameters parameters, UDTFConfigurations configurations) {
+    configurations
+        .setOutputDataType(TSDataType.INT32)
+        .setAccessStrategy(new RowByRowAccessStrategy());
+  }
+
+  @Override
+  public void transform(Row row, PointCollector collector) {
+    int candidateValue = row.getInt(0);
+    if (time == null || value < candidateValue) {
+      time = row.getTime();
+      value = candidateValue;
+    }
+  }
+
+  @Override
+  public void terminate(PointCollector collector) throws IOException {
+    if (time != null) {
+      collector.putInt(time, value);
+    }
+  }
+}
+```
+
+
+
 ## Complete Maven Project Example
 
 If you use [Maven](http://search.maven.org/), you can refer to our example project **udf-example**. You can find it [here](https://github.com/apache/iotdb/tree/master/example/udf).
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/api/UDTF.java b/server/src/main/java/org/apache/iotdb/db/query/udf/api/UDTF.java
index 944204f..a106d5e 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/udf/api/UDTF.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/api/UDTF.java
@@ -98,4 +98,16 @@ public interface UDTF extends UDF {
   @SuppressWarnings("squid:S112")
   default void transform(RowWindow rowWindow, PointCollector collector) throws Exception {
   }
+
+  /**
+   * This method will be called once after all {@link UDTF#transform(Row, PointCollector)} calls or
+   * {@link UDTF#transform(RowWindow, PointCollector)} calls have been executed. In a single UDF
+   * query, this method is called exactly once.
+   *
+   * @param collector used to collect output data points
+   * @throws Exception the user can throw errors if necessary
+   */
+  @SuppressWarnings("squid:S112")
+  default void terminate(PointCollector collector) throws Exception {
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/api/collector/PointCollector.java b/server/src/main/java/org/apache/iotdb/db/query/udf/api/collector/PointCollector.java
index 2e46924..269dfbb 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/udf/api/collector/PointCollector.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/api/collector/PointCollector.java
@@ -30,8 +30,8 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.utils.Binary;
 
 /**
- * Used to collect time series data points generated by {@link UDTF#transform(Row, PointCollector)}
- * or {@link UDTF#transform(RowWindow, PointCollector)}.
+ * Used to collect time series data points generated by {@link UDTF#transform(Row, PointCollector)},
+ * {@link UDTF#transform(RowWindow, PointCollector)} or {@link UDTF#terminate(PointCollector)}.
  */
 public interface PointCollector {
 
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/api/customizer/config/UDTFConfigurations.java b/server/src/main/java/org/apache/iotdb/db/query/udf/api/customizer/config/UDTFConfigurations.java
index db5b110..d2e6dd8 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/udf/api/customizer/config/UDTFConfigurations.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/api/customizer/config/UDTFConfigurations.java
@@ -57,7 +57,8 @@ public class UDTFConfigurations extends UDFConfigurations {
   /**
    * Used to specify the output data type of the UDTF. In other words, the data type you set here
    * determines the type of data that the PointCollector in {@link UDTF#transform(Row,
-   * PointCollector)} or {@link UDTF#transform(RowWindow, PointCollector)} can receive.
+   * PointCollector)}, {@link UDTF#transform(RowWindow, PointCollector)} or {@link
+   * UDTF#terminate(PointCollector)} can receive.
    *
    * @param outputDataType the output data type of the UDTF
    * @return this
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/executor/UDTFExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/executor/UDTFExecutor.java
index b38ae1e..e582bb2 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/udf/core/executor/UDTFExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/executor/UDTFExecutor.java
@@ -49,8 +49,7 @@ public class UDTFExecutor {
       udtf.beforeStart(new UDFParameters(context.getPaths(), context.getAttributes()),
           configurations);
     } catch (Exception e) {
-      throw new QueryProcessException(
-          "Error occurred during initialization of udf:\n" + e.toString());
+      onError("beforeStart(UDFParameters, UDTFConfigurations)", e);
     }
     configurations.check();
     collector = ElasticSerializableTVList
@@ -62,7 +61,7 @@ public class UDTFExecutor {
     try {
       udtf.transform(row, collector);
     } catch (Exception e) {
-      throw new QueryProcessException("Error occurred during execution of udf:\n" + e.toString());
+      onError("transform(Row, PointCollector)", e);
     }
   }
 
@@ -70,7 +69,15 @@ public class UDTFExecutor {
     try {
       udtf.transform(rowWindow, collector);
     } catch (Exception e) {
-      throw new QueryProcessException("Error occurred during execution of udf:\n" + e.toString());
+      onError("transform(RowWindow, PointCollector)", e);
+    }
+  }
+
+  public void terminate() throws QueryProcessException {
+    try {
+      udtf.terminate(collector);
+    } catch (Exception e) {
+      onError("terminate(PointCollector)", e);
     }
   }
 
@@ -78,6 +85,12 @@ public class UDTFExecutor {
     udtf.beforeDestroy();
   }
 
+  private void onError(String methodName, Exception e) throws QueryProcessException {
+    throw new QueryProcessException(String
+        .format("Error occurred during executing UDTF#%s: %s", methodName, System.lineSeparator())
+        + e.toString());
+  }
+
   public UDFContext getContext() {
     return context;
   }
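The message built by the new `onError` helper can be reproduced in isolation. The sketch below is an illustration only: `OnErrorMessageSketch` and `buildMessage` are hypothetical names, and the method returns the string instead of wrapping it in a `QueryProcessException`, so that it runs without IoTDB on the classpath. The format string and the `System.lineSeparator()` argument are copied from the hunk above.

```java
class OnErrorMessageSketch {

  // Same format string and arguments as onError in the diff; the real method
  // throws QueryProcessException with this text, here it is simply returned.
  static String buildMessage(String methodName, Exception e) {
    return String
        .format("Error occurred during executing UDTF#%s: %s", methodName, System.lineSeparator())
        + e.toString();
  }

  public static void main(String[] args) {
    System.out.println(
        buildMessage("terminate(PointCollector)", new IllegalStateException("boom")));
  }
}
```

The method name appears on the first line and the original exception's `toString()` on the second, so the failing lifecycle method is visible without a stack trace.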
diff --git a/server/src/main/java/org/apache/iotdb/db/query/udf/core/transformer/UDFQueryTransformer.java b/server/src/main/java/org/apache/iotdb/db/query/udf/core/transformer/UDFQueryTransformer.java
index aafc463..e958ca9 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/udf/core/transformer/UDFQueryTransformer.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/udf/core/transformer/UDFQueryTransformer.java
@@ -33,24 +33,25 @@ public abstract class UDFQueryTransformer extends Transformer {
   protected final TSDataType udfOutputDataType;
   protected final LayerPointReader udfOutput;
 
+  protected boolean terminated;
+
   protected UDFQueryTransformer(UDTFExecutor executor) {
     this.executor = executor;
     udfOutputDataType = executor.getConfigurations().getOutputDataType();
     udfOutput = executor.getCollector().getPointReaderUsingEvictionStrategy();
+    terminated = false;
   }
 
   @Override
   protected boolean cacheValue() throws QueryProcessException, IOException {
     while (!cacheValueFromUDFOutput()) {
-      if (!executeUDFOnce()) {
+      if (!executeUDFOnce() && !terminate()) {
         return false;
       }
     }
     return true;
   }
 
-  protected abstract boolean executeUDFOnce() throws QueryProcessException, IOException;
-
   protected final boolean cacheValueFromUDFOutput() throws QueryProcessException, IOException {
     boolean hasNext = udfOutput.next();
     if (hasNext) {
@@ -82,6 +83,17 @@ public abstract class UDFQueryTransformer extends Transformer {
     return hasNext;
   }
 
+  protected abstract boolean executeUDFOnce() throws QueryProcessException, IOException;
+
+  protected final boolean terminate() throws QueryProcessException {
+    if (terminated) {
+      return false;
+    }
+    executor.terminate();
+    terminated = true;
+    return true;
+  }
+
   @Override
   public final TSDataType getDataType() {
     return udfOutputDataType;
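The `terminated` flag together with the `!executeUDFOnce() && !terminate()` condition is what keeps `terminate` down to a single call while still draining whatever it emits. A self-contained sketch of that loop shape follows. `TerminateOnceSketch` is hypothetical, integer queues stand in for the input rows and for `udfOutput`, and the summing "UDF" is an assumption chosen only so that output first appears in `terminate`.

```java
import java.util.ArrayDeque;
import java.util.Deque;

class TerminateOnceSketch {

  private final Deque<Integer> input = new ArrayDeque<>();     // rows not yet consumed
  private final Deque<Integer> udfOutput = new ArrayDeque<>(); // points emitted by the UDF
  private boolean terminated = false;
  private int sum = 0;
  int terminateCalls = 0;

  TerminateOnceSketch(int... rows) {
    for (int row : rows) {
      input.add(row);
    }
  }

  // Consumes one input row; this toy UDF emits nothing during transform.
  private boolean executeUDFOnce() {
    Integer row = input.poll();
    if (row == null) {
      return false;
    }
    sum += row;
    return true;
  }

  // Guarded like the diff: the first call runs terminate, later calls return false.
  private boolean terminate() {
    if (terminated) {
      return false;
    }
    terminateCalls++;
    udfOutput.add(sum);
    terminated = true;
    return true;
  }

  // Same loop shape as cacheValue(): keep going until output exists or
  // both the input and the terminate step are exhausted.
  Integer next() {
    while (udfOutput.isEmpty()) {
      if (!executeUDFOnce() && !terminate()) {
        return null;
      }
    }
    return udfOutput.poll();
  }

  public static void main(String[] args) {
    TerminateOnceSketch t = new TerminateOnceSketch(1, 2, 3);
    System.out.println(t.next());          // 6
    System.out.println(t.next());          // null
    System.out.println(t.terminateCalls);  // 1
  }
}
```

Returning `true` from the first `terminate()` sends the loop around once more, which gives the output reader a chance to pick up points written during termination before the transformer reports that it has no more values.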
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBUDTFAlignByTimeQueryIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBUDTFAlignByTimeQueryIT.java
index 9b6cab2..e130fd3 100644
--- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBUDTFAlignByTimeQueryIT.java
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBUDTFAlignByTimeQueryIT.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.db.integration;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -87,6 +88,10 @@ public class IoTDBUDTFAlignByTimeQueryIT {
             CompressionType.UNCOMPRESSED, null);
     IoTDB.metaManager.createTimeseries(new PartialPath("root.vehicle.d3.s2"), TSDataType.DOUBLE,
         TSEncoding.PLAIN, CompressionType.UNCOMPRESSED, null);
+    IoTDB.metaManager.createTimeseries(new PartialPath("root.vehicle.d4.s1"), TSDataType.INT32,
+        TSEncoding.PLAIN, CompressionType.UNCOMPRESSED, null);
+    IoTDB.metaManager.createTimeseries(new PartialPath("root.vehicle.d4.s2"), TSDataType.INT32,
+        TSEncoding.PLAIN, CompressionType.UNCOMPRESSED, null);
   }
 
   private static void generateData() {
@@ -104,6 +109,8 @@ public class IoTDBUDTFAlignByTimeQueryIT {
             .format("insert into root.vehicle.d2(timestamp,s1,s2) values(%d,%d,%d)", i, i, i)));
         statement.execute((String
             .format("insert into root.vehicle.d3(timestamp,s1,s2) values(%d,%d,%d)", i, i, i)));
+        statement.execute((String
+            .format("insert into root.vehicle.d4(timestamp,s1) values(%d,%d)", 2 * i, 3 * i)));
       }
     } catch (SQLException throwable) {
       fail(throwable.getMessage());
@@ -117,6 +124,9 @@ public class IoTDBUDTFAlignByTimeQueryIT {
       statement.execute("create function udf as \"org.apache.iotdb.db.query.udf.example.Adder\"");
       statement.execute(
           "create function multiplier as \"org.apache.iotdb.db.query.udf.example.Multiplier\"");
+      statement.execute("create function max as \"org.apache.iotdb.db.query.udf.example.Max\"");
+      statement.execute(
+          "create function terminate as \"org.apache.iotdb.db.query.udf.example.TerminateTester\"");
     } catch (SQLException throwable) {
       fail(throwable.getMessage());
     }
@@ -288,6 +298,69 @@ public class IoTDBUDTFAlignByTimeQueryIT {
   }
 
   @Test
+  public void queryWithoutValueFilter6() {
+    String sqlStr = "select max(s1), max(s2) from root.vehicle.d4";
+
+    try (Statement statement = DriverManager.getConnection(
+        Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root").createStatement()) {
+      ResultSet resultSet = statement.executeQuery(sqlStr);
+
+      assertEquals(1 + 2, resultSet.getMetaData().getColumnCount());
+
+      assertEquals("Time", resultSet.getMetaData().getColumnName(1));
+
+      String columnS1 = "max(root.vehicle.d4.s1)";
+      String columnS2 = "max(root.vehicle.d4.s2)";
+      assertTrue(columnS1.equals(resultSet.getMetaData().getColumnName(2))
+          || columnS2.equals(resultSet.getMetaData().getColumnName(2)));
+      assertTrue(columnS1.equals(resultSet.getMetaData().getColumnName(3))
+          || columnS2.equals(resultSet.getMetaData().getColumnName(3)));
+
+      assertTrue(resultSet.next());
+      assertEquals(3 * (ITERATION_TIMES - 1), Integer.parseInt(resultSet.getString(columnS1)));
+      assertNull(resultSet.getString(columnS2));
+      assertFalse(resultSet.next());
+    } catch (SQLException throwable) {
+      fail(throwable.getMessage());
+    }
+  }
+
+  @Test
+  public void queryWithoutValueFilter7() {
+    String sqlStr = "select terminate(s1), terminate(s2) from root.vehicle.d4";
+
+    try (Statement statement = DriverManager.getConnection(
+        Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root").createStatement()) {
+      ResultSet resultSet = statement.executeQuery(sqlStr);
+
+      assertEquals(1 + 2, resultSet.getMetaData().getColumnCount());
+
+      assertEquals("Time", resultSet.getMetaData().getColumnName(1));
+
+      String columnS1 = "terminate(root.vehicle.d4.s1)";
+      String columnS2 = "terminate(root.vehicle.d4.s2)";
+      assertTrue(columnS1.equals(resultSet.getMetaData().getColumnName(2))
+          || columnS2.equals(resultSet.getMetaData().getColumnName(2)));
+      assertTrue(columnS1.equals(resultSet.getMetaData().getColumnName(3))
+          || columnS2.equals(resultSet.getMetaData().getColumnName(3)));
+
+      for (int i = 0; i < ITERATION_TIMES; ++i) {
+        assertTrue(resultSet.next());
+        assertEquals(1, Integer.parseInt(resultSet.getString(columnS1)));
+        assertNull(resultSet.getString(columnS2));
+      }
+
+      assertTrue(resultSet.next());
+      assertEquals(ITERATION_TIMES, Integer.parseInt(resultSet.getString(columnS1)));
+      assertNull(resultSet.getString(columnS2));
+
+      assertFalse(resultSet.next());
+    } catch (SQLException throwable) {
+      fail(throwable.getMessage());
+    }
+  }
+
+  @Test
   public void queryWithValueFilter1() {
     String sqlStr =
         "select udf(d2.s2, d2.s1), udf(d2.s1, d2.s2), d2.s1, d2.s2, udf(d2.s1, d2.s2), udf(d2.s2, d2.s1), d2.s1, d2.s2 from root.vehicle"
@@ -522,4 +595,100 @@ public class IoTDBUDTFAlignByTimeQueryIT {
       fail(throwable.getMessage());
     }
   }
+
+  @Test
+  public void queryWithValueFilter8() {
+    String sqlStr = "select max(s1), max(s2) from root.vehicle.d4"
+        + String.format(" where root.vehicle.d4.s1 >= %d and root.vehicle.d4.s2 < %d ",
+        (int) (0.3 * ITERATION_TIMES), (int) (0.7 * ITERATION_TIMES));
+
+    try (Statement statement = DriverManager.getConnection(
+        Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root").createStatement()) {
+      ResultSet resultSet = statement.executeQuery(sqlStr);
+
+      assertEquals(1 + 2, resultSet.getMetaData().getColumnCount());
+
+      assertEquals("Time", resultSet.getMetaData().getColumnName(1));
+
+      String columnS1 = "max(root.vehicle.d4.s1)";
+      String columnS2 = "max(root.vehicle.d4.s2)";
+      assertTrue(columnS1.equals(resultSet.getMetaData().getColumnName(2))
+          || columnS2.equals(resultSet.getMetaData().getColumnName(2)));
+      assertTrue(columnS1.equals(resultSet.getMetaData().getColumnName(3))
+          || columnS2.equals(resultSet.getMetaData().getColumnName(3)));
+
+      assertFalse(resultSet.next());
+    } catch (SQLException throwable) {
+      fail(throwable.getMessage());
+    }
+  }
+
+  @Test
+  public void queryWithValueFilter9() {
+    String sqlStr = "select max(s1), max(s2) from root.vehicle.d4"
+        + String.format(" where root.vehicle.d4.s1 >= %d and root.vehicle.d4.s1 < %d ",
+        (int) (0.3 * ITERATION_TIMES), (int) (0.7 * ITERATION_TIMES));
+
+    try (Statement statement = DriverManager.getConnection(
+        Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root").createStatement()) {
+      ResultSet resultSet = statement.executeQuery(sqlStr);
+
+      assertEquals(1 + 2, resultSet.getMetaData().getColumnCount());
+
+      assertEquals("Time", resultSet.getMetaData().getColumnName(1));
+
+      String columnS1 = "max(root.vehicle.d4.s1)";
+      String columnS2 = "max(root.vehicle.d4.s2)";
+      assertTrue(columnS1.equals(resultSet.getMetaData().getColumnName(2))
+          || columnS2.equals(resultSet.getMetaData().getColumnName(2)));
+      assertTrue(columnS1.equals(resultSet.getMetaData().getColumnName(3))
+          || columnS2.equals(resultSet.getMetaData().getColumnName(3)));
+
+      assertTrue(resultSet.next());
+      assertEquals((int) (0.7 * ITERATION_TIMES) - 1,
+          Integer.parseInt(resultSet.getString(columnS1)));
+      assertNull(resultSet.getString(columnS2));
+      assertFalse(resultSet.next());
+    } catch (SQLException throwable) {
+      fail(throwable.getMessage());
+    }
+  }
+
+  @Test
+  public void queryWithValueFilter10() {
+    String sqlStr = "select terminate(s1), terminate(s2) from root.vehicle.d4"
+        + String.format(" where root.vehicle.d4.s1 >= %d and root.vehicle.d4.s1 < %d ",
+        (int) (0.3 * ITERATION_TIMES), (int) (0.7 * ITERATION_TIMES));
+
+    try (Statement statement = DriverManager.getConnection(
+        Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root").createStatement()) {
+      ResultSet resultSet = statement.executeQuery(sqlStr);
+
+      assertEquals(1 + 2, resultSet.getMetaData().getColumnCount());
+
+      assertEquals("Time", resultSet.getMetaData().getColumnName(1));
+
+      String columnS1 = "terminate(root.vehicle.d4.s1)";
+      String columnS2 = "terminate(root.vehicle.d4.s2)";
+      assertTrue(columnS1.equals(resultSet.getMetaData().getColumnName(2))
+          || columnS2.equals(resultSet.getMetaData().getColumnName(2)));
+      assertTrue(columnS1.equals(resultSet.getMetaData().getColumnName(3))
+          || columnS2.equals(resultSet.getMetaData().getColumnName(3)));
+
+      for (int i = 0; i < (int) ((0.7 - 0.3) * ITERATION_TIMES) / 3 + 1; ++i) {
+        assertTrue(resultSet.next());
+        assertEquals(1, Integer.parseInt(resultSet.getString(columnS1)));
+        assertNull(resultSet.getString(columnS2));
+      }
+
+      assertTrue(resultSet.next());
+      assertEquals((int) ((0.7 - 0.3) * ITERATION_TIMES) / 3 + 1,
+          Integer.parseInt(resultSet.getString(columnS1)));
+      assertNull(resultSet.getString(columnS2));
+
+      assertFalse(resultSet.next());
+    } catch (SQLException throwable) {
+      fail(throwable.getMessage());
+    }
+  }
 }
diff --git a/server/src/test/java/org/apache/iotdb/db/query/udf/example/Max.java b/server/src/test/java/org/apache/iotdb/db/query/udf/example/Max.java
new file mode 100644
index 0000000..e545151
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/query/udf/example/Max.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.query.udf.example;
+
+import java.io.IOException;
+import org.apache.iotdb.db.query.udf.api.UDTF;
+import org.apache.iotdb.db.query.udf.api.access.Row;
+import org.apache.iotdb.db.query.udf.api.collector.PointCollector;
+import org.apache.iotdb.db.query.udf.api.customizer.config.UDTFConfigurations;
+import org.apache.iotdb.db.query.udf.api.customizer.parameter.UDFParameters;
+import org.apache.iotdb.db.query.udf.api.customizer.strategy.RowByRowAccessStrategy;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+
+public class Max implements UDTF {
+
+  private Long time;
+  private int value;
+
+  @Override
+  public void beforeStart(UDFParameters parameters, UDTFConfigurations configurations) {
+    System.out.println("Max#beforeStart");
+    configurations
+        .setOutputDataType(TSDataType.INT32)
+        .setAccessStrategy(new RowByRowAccessStrategy());
+  }
+
+  @Override
+  public void transform(Row row, PointCollector collector) {
+    int candidateValue = row.getInt(0);
+    if (time == null || value < candidateValue) {
+      time = row.getTime();
+      value = candidateValue;
+    }
+  }
+
+  @Override
+  public void terminate(PointCollector collector) throws IOException {
+    if (time != null) {
+      collector.putInt(time, value);
+    }
+  }
+
+  @Override
+  public void beforeDestroy() {
+    System.out.println("Max#beforeDestroy");
+  }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/query/udf/example/TerminateTester.java b/server/src/test/java/org/apache/iotdb/db/query/udf/example/TerminateTester.java
new file mode 100644
index 0000000..d5d797f
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/query/udf/example/TerminateTester.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.query.udf.example;
+
+import org.apache.iotdb.db.query.udf.api.UDTF;
+import org.apache.iotdb.db.query.udf.api.access.Row;
+import org.apache.iotdb.db.query.udf.api.collector.PointCollector;
+import org.apache.iotdb.db.query.udf.api.customizer.config.UDTFConfigurations;
+import org.apache.iotdb.db.query.udf.api.customizer.parameter.UDFParameters;
+import org.apache.iotdb.db.query.udf.api.customizer.strategy.RowByRowAccessStrategy;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+
+public class TerminateTester implements UDTF {
+
+  private Long maxTime;
+  private int count;
+
+  @Override
+  public void beforeStart(UDFParameters parameters, UDTFConfigurations configurations) {
+    System.out.println("TerminateTester#beforeStart");
+    configurations
+        .setOutputDataType(TSDataType.INT32)
+        .setAccessStrategy(new RowByRowAccessStrategy());
+    maxTime = null;
+    count = 0;
+  }
+
+  @Override
+  public void transform(Row row, PointCollector collector) throws Exception {
+    maxTime = row.getTime();
+    ++count;
+
+    collector.putInt(maxTime, 1);
+  }
+
+  @Override
+  public void terminate(PointCollector collector) throws Exception {
+    if (maxTime != null) {
+      collector.putInt(maxTime + 1, count);
+    }
+  }
+
+  @Override
+  public void beforeDestroy() {
+    System.out.println("TerminateTester#beforeDestroy");
+  }
+}
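`TerminateTester` emits its final point at `maxTime + 1`, which keeps the output timestamps strictly increasing as the documentation requires. The sketch below replays that behaviour against a stand-in collector. `MonotonicCollectorSketch` and `TerminateTesterSketch` are hypothetical, and the exception thrown on a non-increasing timestamp is an assumption made for illustration; the real `PointCollector` may react differently.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical collector that rejects non-increasing timestamps (assumed behaviour).
class MonotonicCollectorSketch {
  final List<long[]> points = new ArrayList<>();
  private Long lastTime;

  void putInt(long time, int value) {
    if (lastTime != null && time <= lastTime) {
      throw new IllegalArgumentException(
          "timestamp " + time + " is not greater than " + lastTime);
    }
    lastTime = time;
    points.add(new long[] {time, value});
  }
}

class TerminateTesterSketch {

  // Mirrors TerminateTester: one point per row, then a count point at maxTime + 1.
  static List<long[]> run(long[] times) {
    MonotonicCollectorSketch collector = new MonotonicCollectorSketch();
    Long maxTime = null;
    int count = 0;
    for (long t : times) {      // stands in for the transform calls
      maxTime = t;
      ++count;
      collector.putInt(maxTime, 1);
    }
    if (maxTime != null) {      // stands in for terminate
      collector.putInt(maxTime + 1, count);
    }
    return collector.points;
  }

  public static void main(String[] args) {
    for (long[] p : run(new long[] {0, 2, 4})) {
      System.out.println(p[0] + " -> " + p[1]);
    }
  }
}
```

For input timestamps 0, 2 and 4 this yields three points with value 1 followed by a point at timestamp 5 carrying the count 3, the same pattern the integration tests assert on.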