You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2022/01/14 10:45:16 UTC

[iotdb] branch iotdb-2353-cq created (now 3f4cb90)

This is an automated email from the ASF dual-hosted git repository.

rong pushed a change to branch iotdb-2353-cq
in repository https://gitbox.apache.org/repos/asf/iotdb.git.


      at 3f4cb90  iotdb-2353

This branch includes the following new commits:

     new 3f4cb90  iotdb-2353

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  Any revisions
listed as "add" were already present in the repository and have only
been added to this reference.


[iotdb] 01/01: iotdb-2353

Posted by ro...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch iotdb-2353-cq
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 3f4cb908f6aba934a82daeaff864184de3427dc1
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Fri Jan 14 14:18:09 2022 +0800

    iotdb-2353
---
 .../org/apache/iotdb/db/qp/sql/IoTDBSqlLexer.g4    |  4 +
 .../org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4   |  2 +-
 .../db/cq/ContinuousQuerySchemaCheckTask.java      | 96 ++++++++++++++++++++++
 .../apache/iotdb/db/cq/ContinuousQueryService.java | 42 ++++++----
 .../apache/iotdb/db/cq/ContinuousQueryTask.java    | 32 ++++----
 .../logical/sys/CreateContinuousQueryOperator.java |  8 +-
 .../qp/physical/sys/CreateContinuousQueryPlan.java | 18 ++--
 .../apache/iotdb/db/qp/sql/IoTDBSqlVisitor.java    |  6 +-
 .../db/service/thrift/impl/TSServiceImpl.java      | 10 +--
 9 files changed, 168 insertions(+), 50 deletions(-)

diff --git a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlLexer.g4 b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlLexer.g4
index be310fb..2162e8e 100644
--- a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlLexer.g4
+++ b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlLexer.g4
@@ -93,6 +93,10 @@ BEGIN
     : B E G I N
     ;
 
+BOUNDARY
+    : B O U N D A R Y
+    ;
+
 BY
     : B Y
     ;
diff --git a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4 b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
index 7046cdd..6d5d50e 100644
--- a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
+++ b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
@@ -141,7 +141,7 @@ cqGroupByTimeClause
     ;
 
 resampleClause
-    : RESAMPLE (EVERY DURATION_LITERAL)? (FOR DURATION_LITERAL)?;
+    : RESAMPLE (EVERY DURATION_LITERAL)? (FOR DURATION_LITERAL)? (BOUNDARY dateExpression)?;
 
 // Create Snapshot for Schema
 createSnapshot
diff --git a/server/src/main/java/org/apache/iotdb/db/cq/ContinuousQuerySchemaCheckTask.java b/server/src/main/java/org/apache/iotdb/db/cq/ContinuousQuerySchemaCheckTask.java
new file mode 100644
index 0000000..73c0481
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/cq/ContinuousQuerySchemaCheckTask.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.cq;
+
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.ContinuousQueryException;
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.exception.metadata.PathNotExistException;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.qp.physical.crud.GroupByTimePlan;
+import org.apache.iotdb.db.qp.physical.sys.CreateContinuousQueryPlan;
+import org.apache.iotdb.db.service.IoTDB;
+import org.apache.iotdb.db.utils.TypeInferenceUtils;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+
+public class ContinuousQuerySchemaCheckTask extends ContinuousQueryTask {
+
+  public static final IoTDBConfig CONFIG = IoTDBDescriptor.getInstance().getConfig();
+
+  public ContinuousQuerySchemaCheckTask(
+      CreateContinuousQueryPlan continuousQueryPlan, long windowEndTimestamp) {
+    super(continuousQueryPlan, windowEndTimestamp);
+  }
+
+  /** we only do some checks here. we don't write any data. */
+  @Override
+  protected void doInsert(QueryDataSet queryDataSet, GroupByTimePlan queryPlan)
+      throws MetadataException, IOException, ContinuousQueryException {
+    Set<PartialPath> targetPaths = new HashSet<>(generateTargetPaths(queryDataSet.getPaths()));
+    checkTargetPathNumber(queryDataSet, targetPaths);
+    checkTargetPathDataType(queryPlan, targetPaths);
+    tryExecuteQueryOnce(queryDataSet);
+  }
+
+  private void checkTargetPathNumber(QueryDataSet queryDataSet, Set<PartialPath> targetPaths)
+      throws ContinuousQueryException {
+    if (targetPaths.size() != queryDataSet.getDataTypes().size()) {
+      throw new ContinuousQueryException(
+          "the target paths generated by the pattern in into clause are duplicated. please change the pattern.");
+    }
+  }
+
+  private void checkTargetPathDataType(GroupByTimePlan queryPlan, Set<PartialPath> targetPaths)
+      throws MetadataException, ContinuousQueryException {
+    TSDataType sourceDataType =
+        TypeInferenceUtils.getAggrDataType(
+            queryPlan.getAggregations().get(0), queryPlan.getDataTypes().get(0));
+    for (PartialPath targetPath : targetPaths) {
+      try {
+        TSDataType targetPathDataType = IoTDB.metaManager.getSeriesSchema(targetPath).getType();
+        if (!sourceDataType.equals(targetPathDataType)) {
+          throw new ContinuousQueryException(
+              String.format(
+                  "target path (%s) data type (%s) and source data type (%s) dose not match.",
+                  targetPath.getFullPath(), targetPathDataType.name(), sourceDataType.name()));
+        }
+      } catch (PathNotExistException pathNotExistException) {
+        if (!CONFIG.isAutoCreateSchemaEnabled()) {
+          throw new ContinuousQueryException(
+              String.format("target path (%s) dose not exist.", targetPath.getFullPath()));
+        }
+
+        // else ignored. we use the auto-create-schema feature.
+      }
+    }
+  }
+
+  private void tryExecuteQueryOnce(QueryDataSet queryDataSet) throws IOException {
+    if (queryDataSet.hasNext()) {
+      queryDataSet.next();
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/cq/ContinuousQueryService.java b/server/src/main/java/org/apache/iotdb/db/cq/ContinuousQueryService.java
index 84471bd..c92bf8f 100644
--- a/server/src/main/java/org/apache/iotdb/db/cq/ContinuousQueryService.java
+++ b/server/src/main/java/org/apache/iotdb/db/cq/ContinuousQueryService.java
@@ -45,6 +45,8 @@ public class ContinuousQueryService implements IService {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(ContinuousQueryService.class);
 
+  private static final long SYSTEM_STARTUP_TIME = DatetimeUtils.currentTime();
+
   private static final ContinuousQueryTaskPoolManager TASK_POOL_MANAGER =
       ContinuousQueryTaskPoolManager.getInstance();
   private static final long TASK_SUBMIT_CHECK_INTERVAL =
@@ -63,13 +65,9 @@ public class ContinuousQueryService implements IService {
   @Override
   public void start() {
     for (CreateContinuousQueryPlan plan : continuousQueryPlans.values()) {
-      long durationFromCreation = DatetimeUtils.currentTime() - plan.getCreationTimestamp();
-      long nextExecutionTimestamp =
-          plan.getCreationTimestamp()
-              + plan.getEveryInterval()
-                  * (durationFromCreation / plan.getEveryInterval()
-                      + ((durationFromCreation % plan.getEveryInterval() == 0) ? 0 : 1));
-      nextExecutionTimestamps.put(plan.getContinuousQueryName(), nextExecutionTimestamp);
+      nextExecutionTimestamps.put(
+          plan.getContinuousQueryName(),
+          calculateNextExecutionTimestamp(plan, SYSTEM_STARTUP_TIME));
     }
 
     continuousQueryTaskSubmitThread =
@@ -84,6 +82,23 @@ public class ContinuousQueryService implements IService {
     LOGGER.info("Continuous query service started.");
   }
 
+  private long calculateNextExecutionTimestamp(
+      CreateContinuousQueryPlan plan, final long currentTime) {
+    final long expectedFirstExecutionTime =
+        plan.getFirstExecutionTimeBoundary() + plan.getForInterval();
+
+    if (currentTime <= expectedFirstExecutionTime) {
+      return expectedFirstExecutionTime;
+    }
+
+    final long durationFromExpectedFirstExecutionTime = currentTime - expectedFirstExecutionTime;
+    final long everyInterval = plan.getEveryInterval();
+    return expectedFirstExecutionTime
+        + everyInterval
+            * (durationFromExpectedFirstExecutionTime / everyInterval
+                + (durationFromExpectedFirstExecutionTime % everyInterval == 0 ? 0 : 1));
+  }
+
   private void checkAndSubmitTasks() {
     long currentTimestamp = DatetimeUtils.currentTime();
     for (CreateContinuousQueryPlan plan : continuousQueryPlans.values()) {
@@ -127,8 +142,7 @@ public class ContinuousQueryService implements IService {
           String.format("Continuous Query [%s] already exists", plan.getContinuousQueryName()));
     }
 
-    // some exceptions will only occur at runtime
-    tryExecuteCQTaskOnceBeforeRegistration(plan);
+    checkSchemaBeforeRegistration(plan);
 
     acquireRegistrationLock();
     try {
@@ -144,10 +158,10 @@ public class ContinuousQueryService implements IService {
     }
   }
 
-  private void tryExecuteCQTaskOnceBeforeRegistration(CreateContinuousQueryPlan plan)
+  private void checkSchemaBeforeRegistration(CreateContinuousQueryPlan plan)
       throws ContinuousQueryException {
     try {
-      new ContinuousQueryTask(plan, plan.getCreationTimestamp()).run();
+      new ContinuousQuerySchemaCheckTask(plan, plan.getFirstExecutionTimeBoundary()).run();
     } catch (Exception e) {
       throw new ContinuousQueryException("Failed to create continuous query task.", e);
     }
@@ -155,11 +169,9 @@ public class ContinuousQueryService implements IService {
 
   private void doRegister(CreateContinuousQueryPlan plan) {
     continuousQueryPlans.put(plan.getContinuousQueryName(), plan);
-    // one cq task has been executed in tryExecuteCQTaskOnceBeforeRegistration
-    // so nextExecutionTimestamp should start with
-    //     plan.getCreationTimestamp() + plan.getEveryInterval()
     nextExecutionTimestamps.put(
-        plan.getContinuousQueryName(), plan.getCreationTimestamp() + plan.getEveryInterval());
+        plan.getContinuousQueryName(),
+        calculateNextExecutionTimestamp(plan, DatetimeUtils.currentTime()));
   }
 
   @TestOnly
diff --git a/server/src/main/java/org/apache/iotdb/db/cq/ContinuousQueryTask.java b/server/src/main/java/org/apache/iotdb/db/cq/ContinuousQueryTask.java
index 276827e..f05c977 100644
--- a/server/src/main/java/org/apache/iotdb/db/cq/ContinuousQueryTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/cq/ContinuousQueryTask.java
@@ -23,7 +23,6 @@ import org.apache.iotdb.db.conf.IoTDBConstant;
 import org.apache.iotdb.db.exception.StorageEngineException;
 import org.apache.iotdb.db.exception.metadata.IllegalPathException;
 import org.apache.iotdb.db.exception.metadata.MetadataException;
-import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.qp.physical.crud.GroupByTimePlan;
@@ -55,17 +54,17 @@ import java.util.regex.Pattern;
 
 public class ContinuousQueryTask extends WrappedRunnable {
 
-  private static final Logger LOGGER = LoggerFactory.getLogger(ContinuousQueryTask.class);
+  protected static final Logger LOGGER = LoggerFactory.getLogger(ContinuousQueryTask.class);
 
-  private static final Pattern PATH_NODE_NAME_PATTERN = Pattern.compile("\\$\\{\\w+}");
-  private static final int EXECUTION_BATCH_SIZE = IoTDBConstant.DEFAULT_FETCH_SIZE;
+  protected static final Pattern PATH_NODE_NAME_PATTERN = Pattern.compile("\\$\\{\\w+}");
+  protected static final int EXECUTION_BATCH_SIZE = IoTDBConstant.DEFAULT_FETCH_SIZE;
 
-  private final ServiceProvider serviceProvider;
+  protected final ServiceProvider serviceProvider;
 
   // To save the continuous query info
-  private final CreateContinuousQueryPlan continuousQueryPlan;
+  protected final CreateContinuousQueryPlan continuousQueryPlan;
   // Next timestamp to execute a query
-  private final long windowEndTimestamp;
+  protected final long windowEndTimestamp;
 
   public ContinuousQueryTask(
       CreateContinuousQueryPlan continuousQueryPlan, long windowEndTimestamp) {
@@ -112,11 +111,11 @@ public class ContinuousQueryTask extends WrappedRunnable {
       // insert data into target timeseries
       doInsert(queryDataSet, queryPlan);
     } finally {
-      QueryResourceManager.getInstance().endQuery(queryId);
+      ServiceProvider.SESSION_MANAGER.releaseQueryResourceNoExceptions(queryId);
     }
   }
 
-  private String generateSQL() {
+  protected String generateSQL() {
     return continuousQueryPlan.getQuerySqlBeforeGroupByClause()
         + "group by (["
         + (windowEndTimestamp - continuousQueryPlan.getForInterval())
@@ -129,9 +128,8 @@ public class ContinuousQueryTask extends WrappedRunnable {
         + continuousQueryPlan.getQuerySqlAfterGroupByClause();
   }
 
-  private void doInsert(QueryDataSet queryDataSet, GroupByTimePlan queryPlan)
-      throws IOException, IllegalPathException, QueryProcessException, StorageGroupNotSetException,
-          StorageEngineException {
+  protected void doInsert(QueryDataSet queryDataSet, GroupByTimePlan queryPlan)
+      throws IOException, MetadataException, QueryProcessException, StorageEngineException {
     int columnSize = queryDataSet.getDataTypes().size();
     TSDataType dataType =
         TypeInferenceUtils.getAggrDataType(
@@ -173,7 +171,7 @@ public class ContinuousQueryTask extends WrappedRunnable {
     }
   }
 
-  private InsertTabletPlan[] generateInsertTabletPlans(
+  protected InsertTabletPlan[] generateInsertTabletPlans(
       int columnSize, QueryDataSet result, TSDataType dataType) throws IllegalPathException {
     List<PartialPath> targetPaths = generateTargetPaths(result.getPaths());
     InsertTabletPlan[] insertTabletPlans = new InsertTabletPlan[columnSize];
@@ -189,7 +187,7 @@ public class ContinuousQueryTask extends WrappedRunnable {
     return insertTabletPlans;
   }
 
-  private Object[][] constructColumns(int columnSize, int fetchSize, TSDataType dataType) {
+  protected Object[][] constructColumns(int columnSize, int fetchSize, TSDataType dataType) {
     Object[][] columns = new Object[columnSize][1];
     for (int i = 0; i < columnSize; i++) {
       switch (dataType) {
@@ -212,7 +210,7 @@ public class ContinuousQueryTask extends WrappedRunnable {
     return columns;
   }
 
-  private void fillColumns(
+  protected void fillColumns(
       Object[][] columns,
       TSDataType dataType,
       RowRecord record,
@@ -246,7 +244,7 @@ public class ContinuousQueryTask extends WrappedRunnable {
     }
   }
 
-  private List<PartialPath> generateTargetPaths(List<Path> rawPaths) throws IllegalPathException {
+  protected List<PartialPath> generateTargetPaths(List<Path> rawPaths) throws IllegalPathException {
     List<PartialPath> targetPaths = new ArrayList<>(rawPaths.size());
     for (Path rawPath : rawPaths) {
       targetPaths.add(new PartialPath(fillTargetPathTemplate((PartialPath) rawPath)));
@@ -254,7 +252,7 @@ public class ContinuousQueryTask extends WrappedRunnable {
     return targetPaths;
   }
 
-  private String fillTargetPathTemplate(PartialPath rawPath) {
+  protected String fillTargetPathTemplate(PartialPath rawPath) {
     String[] nodes = rawPath.getNodes();
     int indexOfLeftBracket = nodes[0].indexOf("(");
     if (indexOfLeftBracket != -1) {
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/CreateContinuousQueryOperator.java b/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/CreateContinuousQueryOperator.java
index 99c8705..1dca384 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/CreateContinuousQueryOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/logical/sys/CreateContinuousQueryOperator.java
@@ -37,6 +37,7 @@ public class CreateContinuousQueryOperator extends Operator {
   private long forInterval;
   private long groupByTimeInterval;
   private String groupByTimeIntervalString;
+  private Long firstExecutionTimeBoundary;
 
   public CreateContinuousQueryOperator(int tokenIntType) {
     super(tokenIntType);
@@ -79,6 +80,10 @@ public class CreateContinuousQueryOperator extends Operator {
     this.groupByTimeIntervalString = groupByTimeIntervalString;
   }
 
+  public void setFirstExecutionTimeBoundary(long firstExecutionTimeBoundary) {
+    this.firstExecutionTimeBoundary = firstExecutionTimeBoundary;
+  }
+
   public void setQueryOperator(QueryOperator queryOperator) {
     this.queryOperator = queryOperator;
   }
@@ -97,6 +102,7 @@ public class CreateContinuousQueryOperator extends Operator {
         everyInterval,
         forInterval,
         groupByTimeInterval,
-        groupByTimeIntervalString);
+        groupByTimeIntervalString,
+        firstExecutionTimeBoundary);
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/CreateContinuousQueryPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/CreateContinuousQueryPlan.java
index 94575be..eed9fb7 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/CreateContinuousQueryPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/CreateContinuousQueryPlan.java
@@ -41,7 +41,7 @@ public class CreateContinuousQueryPlan extends PhysicalPlan {
   private long forInterval;
   private long groupByTimeInterval;
   private String groupByTimeIntervalString;
-  private long creationTimestamp;
+  private long firstExecutionTimeBoundary;
 
   public CreateContinuousQueryPlan() {
     super(Operator.OperatorType.CREATE_CONTINUOUS_QUERY);
@@ -54,7 +54,8 @@ public class CreateContinuousQueryPlan extends PhysicalPlan {
       long everyInterval,
       long forInterval,
       long groupByTimeIntervalUnit,
-      String groupByTimeIntervalString) {
+      String groupByTimeIntervalString,
+      Long firstExecutionTimeBoundary) {
     super(Operator.OperatorType.CREATE_CONTINUOUS_QUERY);
     querySql = querySql.toLowerCase();
     this.querySql = querySql;
@@ -69,7 +70,10 @@ public class CreateContinuousQueryPlan extends PhysicalPlan {
     this.forInterval = forInterval;
     this.groupByTimeInterval = groupByTimeIntervalUnit;
     this.groupByTimeIntervalString = groupByTimeIntervalString;
-    this.creationTimestamp = DatetimeUtils.currentTime();
+    this.firstExecutionTimeBoundary =
+        firstExecutionTimeBoundary != null
+            ? firstExecutionTimeBoundary
+            : DatetimeUtils.currentTime();
   }
 
   public String getQuerySql() {
@@ -112,8 +116,8 @@ public class CreateContinuousQueryPlan extends PhysicalPlan {
     return groupByTimeIntervalString;
   }
 
-  public long getCreationTimestamp() {
-    return creationTimestamp;
+  public long getFirstExecutionTimeBoundary() {
+    return firstExecutionTimeBoundary;
   }
 
   @Override
@@ -133,7 +137,7 @@ public class CreateContinuousQueryPlan extends PhysicalPlan {
     buffer.putLong(forInterval);
     buffer.putLong(groupByTimeInterval);
     ReadWriteIOUtils.write(groupByTimeIntervalString, buffer);
-    buffer.putLong(creationTimestamp);
+    buffer.putLong(firstExecutionTimeBoundary);
   }
 
   @Override
@@ -147,6 +151,6 @@ public class CreateContinuousQueryPlan extends PhysicalPlan {
     forInterval = ReadWriteIOUtils.readLong(buffer);
     groupByTimeInterval = ReadWriteIOUtils.readLong(buffer);
     groupByTimeIntervalString = ReadWriteIOUtils.readString(buffer);
-    creationTimestamp = ReadWriteIOUtils.readLong(buffer);
+    firstExecutionTimeBoundary = ReadWriteIOUtils.readLong(buffer);
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/sql/IoTDBSqlVisitor.java b/server/src/main/java/org/apache/iotdb/db/qp/sql/IoTDBSqlVisitor.java
index 9727952..bc6ac6f 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/sql/IoTDBSqlVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/sql/IoTDBSqlVisitor.java
@@ -576,7 +576,6 @@ public class IoTDBSqlVisitor extends IoTDBSqlParserBaseVisitor<Operator> {
 
   public void parseResampleClause(
       IoTDBSqlParser.ResampleClauseContext ctx, CreateContinuousQueryOperator operator) {
-
     if (ctx.DURATION_LITERAL().size() == 1) {
       if (ctx.EVERY() != null) {
         operator.setEveryInterval(
@@ -591,6 +590,10 @@ public class IoTDBSqlVisitor extends IoTDBSqlParserBaseVisitor<Operator> {
       operator.setForInterval(
           DatetimeUtils.convertDurationStrToLong(ctx.DURATION_LITERAL(1).getText()));
     }
+
+    if (ctx.BOUNDARY() != null) {
+      operator.setFirstExecutionTimeBoundary(parseDateExpression(ctx.dateExpression()));
+    }
   }
 
   // Create Snapshot for Schema
@@ -2785,7 +2788,6 @@ public class IoTDBSqlVisitor extends IoTDBSqlParserBaseVisitor<Operator> {
   private Map<String, String> extractMap(
       List<IoTDBSqlParser.PropertyClauseContext> property2,
       IoTDBSqlParser.PropertyClauseContext property3) {
-    String value;
     Map<String, String> tags = new HashMap<>(property2.size());
     if (property3 != null) {
       for (IoTDBSqlParser.PropertyClauseContext property : property2) {
diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java
index a942ac7..06ced83 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/TSServiceImpl.java
@@ -319,10 +319,6 @@ public class TSServiceImpl implements TSIService.Iface {
     return resp.setStatus(status);
   }
 
-  private String getMetadataInString() {
-    return IoTDB.metaManager.getMetadataInString();
-  }
-
   protected List<MeasurementPath> getPaths(PartialPath path) throws MetadataException {
     return IoTDB.metaManager.getMeasurementPaths(path);
   }
@@ -1824,7 +1820,7 @@ public class TSServiceImpl implements TSIService.Iface {
   }
 
   @Override
-  public TSStatus appendSchemaTemplate(TSAppendSchemaTemplateReq req) throws TException {
+  public TSStatus appendSchemaTemplate(TSAppendSchemaTemplateReq req) {
     int size = req.getMeasurementsSize();
     String[] measurements = new String[size];
     TSDataType[] dataTypes = new TSDataType[size];
@@ -1846,7 +1842,7 @@ public class TSServiceImpl implements TSIService.Iface {
   }
 
   @Override
-  public TSStatus pruneSchemaTemplate(TSPruneSchemaTemplateReq req) throws TException {
+  public TSStatus pruneSchemaTemplate(TSPruneSchemaTemplateReq req) {
     PruneTemplatePlan plan =
         new PruneTemplatePlan(req.getName(), Collections.singletonList(req.getPath()));
     TSStatus status = serviceProvider.checkAuthority(plan, req.getSessionId());
@@ -1854,7 +1850,7 @@ public class TSServiceImpl implements TSIService.Iface {
   }
 
   @Override
-  public TSQueryTemplateResp querySchemaTemplate(TSQueryTemplateReq req) throws TException {
+  public TSQueryTemplateResp querySchemaTemplate(TSQueryTemplateReq req) {
     try {
       TSQueryTemplateResp resp = new TSQueryTemplateResp();
       String path;