You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hu...@apache.org on 2022/10/25 07:04:19 UTC
[iotdb] 04/04: Implement basic task executor
This is an automated email from the ASF dual-hosted git repository.
hui pushed a commit to branch lmh/mppCQ
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit f53711e5531f8f3ce41830fa38c7ca60c9fdbb15
Author: Minghui Liu <li...@foxmail.com>
AuthorDate: Tue Oct 25 15:03:40 2022 +0800
Implement basic task executor
---
.../db/mpp/common/header/ColumnHeaderConstant.java | 11 ++++
.../db/mpp/common/header/DatasetHeaderFactory.java | 4 ++
.../config/executor/ClusterConfigTaskExecutor.java | 77 ++++++++++++++++++++++
.../config/executor/IConfigTaskExecutor.java | 8 +++
.../executor/StandaloneConfigTaskExecutor.java | 46 +++++++++++++
.../config/metadata/CreateContinuousQueryTask.java | 2 +-
.../config/metadata/DropContinuousQueryTask.java | 2 +-
.../config/metadata/ShowContinuousQueriesTask.java | 42 +++++++++++-
.../iotdb/db/mpp/plan/parser/ASTVisitor.java | 1 +
.../metadata/CreateContinuousQueryStatement.java | 13 ++++
10 files changed, 203 insertions(+), 3 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/header/ColumnHeaderConstant.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/header/ColumnHeaderConstant.java
index 2ebb1705a7..7924503897 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/header/ColumnHeaderConstant.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/header/ColumnHeaderConstant.java
@@ -121,6 +121,11 @@ public class ColumnHeaderConstant {
public static final String COLUMN_TARGET_TIMESERIES = "target timeseries";
public static final String COLUMN_WRITTEN = "written";
+ // column names for show cq
+ public static final String COLUMN_CQ_ID = "cq id";
+ public static final String COLUMN_QUERY = "query";
+ public static final String COLUMN_STATE = "state";
+
public static final List<ColumnHeader> lastQueryColumnHeaders =
ImmutableList.of(
new ColumnHeader(COLUMN_TIMESERIES, TSDataType.TEXT),
@@ -303,4 +308,10 @@ public class ColumnHeaderConstant {
public static final List<ColumnHeader> getSeriesSlotListColumnHeaders =
ImmutableList.of(new ColumnHeader(COLUMN_SERIES_SLOT_ID, TSDataType.INT32));
+
+ public static final List<ColumnHeader> showContinuousQueriesColumnHeaders =
+ ImmutableList.of(
+ new ColumnHeader(COLUMN_CQ_ID, TSDataType.TEXT),
+ new ColumnHeader(COLUMN_QUERY, TSDataType.TEXT),
+ new ColumnHeader(COLUMN_STATE, TSDataType.TEXT));
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/header/DatasetHeaderFactory.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/header/DatasetHeaderFactory.java
index f64300890b..020f6426e1 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/header/DatasetHeaderFactory.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/header/DatasetHeaderFactory.java
@@ -146,4 +146,8 @@ public class DatasetHeaderFactory {
? new DatasetHeader(ColumnHeaderConstant.selectIntoAlignByDeviceColumnHeaders, true)
: new DatasetHeader(ColumnHeaderConstant.selectIntoColumnHeaders, true);
}
+
+ public static DatasetHeader getShowContinuousQueriesHeader() {
+ return new DatasetHeader(ColumnHeaderConstant.showContinuousQueriesColumnHeaders, true);
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java
index 4857dbcd40..3e7e264756 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java
@@ -32,10 +32,12 @@ import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.path.PathPatternTree;
import org.apache.iotdb.commons.trigger.service.TriggerExecutableManager;
import org.apache.iotdb.confignode.rpc.thrift.TCountStorageGroupResp;
+import org.apache.iotdb.confignode.rpc.thrift.TCreateCQReq;
import org.apache.iotdb.confignode.rpc.thrift.TCreateFunctionReq;
import org.apache.iotdb.confignode.rpc.thrift.TCreateTriggerReq;
import org.apache.iotdb.confignode.rpc.thrift.TDeleteStorageGroupsReq;
import org.apache.iotdb.confignode.rpc.thrift.TDeleteTimeSeriesReq;
+import org.apache.iotdb.confignode.rpc.thrift.TDropCQReq;
import org.apache.iotdb.confignode.rpc.thrift.TDropFunctionReq;
import org.apache.iotdb.confignode.rpc.thrift.TDropPipeSinkReq;
import org.apache.iotdb.confignode.rpc.thrift.TDropTriggerReq;
@@ -52,6 +54,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TGetTriggerTableResp;
import org.apache.iotdb.confignode.rpc.thrift.TPipeInfo;
import org.apache.iotdb.confignode.rpc.thrift.TPipeSinkInfo;
import org.apache.iotdb.confignode.rpc.thrift.TSetStorageGroupReq;
+import org.apache.iotdb.confignode.rpc.thrift.TShowCQResp;
import org.apache.iotdb.confignode.rpc.thrift.TShowClusterResp;
import org.apache.iotdb.confignode.rpc.thrift.TShowConfigNodesResp;
import org.apache.iotdb.confignode.rpc.thrift.TShowDataNodesResp;
@@ -76,6 +79,7 @@ import org.apache.iotdb.db.mpp.plan.execution.config.metadata.GetTimeSlotListTas
import org.apache.iotdb.db.mpp.plan.execution.config.metadata.SetStorageGroupTask;
import org.apache.iotdb.db.mpp.plan.execution.config.metadata.ShowClusterTask;
import org.apache.iotdb.db.mpp.plan.execution.config.metadata.ShowConfigNodesTask;
+import org.apache.iotdb.db.mpp.plan.execution.config.metadata.ShowContinuousQueriesTask;
import org.apache.iotdb.db.mpp.plan.execution.config.metadata.ShowDataNodesTask;
import org.apache.iotdb.db.mpp.plan.execution.config.metadata.ShowRegionTask;
import org.apache.iotdb.db.mpp.plan.execution.config.metadata.ShowStorageGroupTask;
@@ -87,6 +91,7 @@ import org.apache.iotdb.db.mpp.plan.execution.config.metadata.template.ShowSchem
import org.apache.iotdb.db.mpp.plan.execution.config.sys.sync.ShowPipeSinkTask;
import org.apache.iotdb.db.mpp.plan.execution.config.sys.sync.ShowPipeTask;
import org.apache.iotdb.db.mpp.plan.statement.metadata.CountStorageGroupStatement;
+import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateContinuousQueryStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateTriggerStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.DeleteStorageGroupStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.DeleteTimeSeriesStatement;
@@ -1115,4 +1120,76 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor {
GetTimeSlotListTask.buildTSBlock(resp, future);
return future;
}
+
+ @Override
+ public SettableFuture<ConfigTaskResult> createContinuousQuery(
+ CreateContinuousQueryStatement createContinuousQueryStatement) {
+ SettableFuture<ConfigTaskResult> future = SettableFuture.create();
+ try (ConfigNodeClient client =
+ CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.partitionRegionId)) {
+ TCreateCQReq tCreateCQReq =
+ new TCreateCQReq(
+ createContinuousQueryStatement.getCqId(),
+ createContinuousQueryStatement.getEveryInterval(),
+ createContinuousQueryStatement.getBoundaryTime(),
+ createContinuousQueryStatement.getStartTimeOffset(),
+ createContinuousQueryStatement.getEndTimeOffset(),
+ createContinuousQueryStatement.getTimeoutPolicy().getType(),
+ createContinuousQueryStatement.getQueryBody(),
+ createContinuousQueryStatement.getSql(),
+ createContinuousQueryStatement.getZoneId());
+ final TSStatus executionStatus = client.createCQ(tCreateCQReq);
+ if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != executionStatus.getCode()) {
+ LOGGER.error(
+ "[{}] Failed to create continuous query {}. TSStatus is {}",
+ executionStatus,
+ createContinuousQueryStatement.getCqId(),
+ executionStatus.message);
+ future.setException(new IoTDBException(executionStatus.message, executionStatus.code));
+ } else {
+ future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
+ }
+ } catch (TException | IOException e) {
+ future.setException(e);
+ }
+ return future;
+ }
+
+ @Override
+ public SettableFuture<ConfigTaskResult> dropContinuousQuery(String cqId) {
+ SettableFuture<ConfigTaskResult> future = SettableFuture.create();
+ try (ConfigNodeClient client =
+ CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.partitionRegionId)) {
+ final TSStatus executionStatus = client.dropCQ(new TDropCQReq(cqId));
+ if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != executionStatus.getCode()) {
+ LOGGER.error("[{}] Failed to drop continuous query {}.", executionStatus, cqId);
+ future.setException(new IoTDBException(executionStatus.message, executionStatus.code));
+ } else {
+ future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
+ }
+ } catch (TException | IOException e) {
+ future.setException(e);
+ }
+ return future;
+ }
+
+ @Override
+ public SettableFuture<ConfigTaskResult> showContinuousQueries() {
+ SettableFuture<ConfigTaskResult> future = SettableFuture.create();
+ try (ConfigNodeClient client =
+ CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.partitionRegionId)) {
+ TShowCQResp showCQResp = client.showCQ();
+ if (showCQResp.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ future.setException(
+ new IoTDBException(showCQResp.getStatus().message, showCQResp.getStatus().code));
+ return future;
+ }
+ // convert cqList and buildTsBlock
+ ShowContinuousQueriesTask.buildTsBlock(showCQResp.getCqList(), future);
+ } catch (TException | IOException e) {
+ future.setException(e);
+ }
+
+ return future;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/IConfigTaskExecutor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/IConfigTaskExecutor.java
index 9cbae3e4f6..b8eaa69050 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/IConfigTaskExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/IConfigTaskExecutor.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TFlushReq;
import org.apache.iotdb.commons.cluster.NodeStatus;
import org.apache.iotdb.db.mpp.plan.execution.config.ConfigTaskResult;
import org.apache.iotdb.db.mpp.plan.statement.metadata.CountStorageGroupStatement;
+import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateContinuousQueryStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateTriggerStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.DeleteStorageGroupStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.DeleteTimeSeriesStatement;
@@ -141,4 +142,11 @@ public interface IConfigTaskExecutor {
SettableFuture<ConfigTaskResult> getTimeSlotList(
GetTimeSlotListStatement getTimeSlotListStatement);
+
+ SettableFuture<ConfigTaskResult> createContinuousQuery(
+ CreateContinuousQueryStatement createContinuousQueryStatement);
+
+ SettableFuture<ConfigTaskResult> dropContinuousQuery(String cqId);
+
+ SettableFuture<ConfigTaskResult> showContinuousQueries();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/StandaloneConfigTaskExecutor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/StandaloneConfigTaskExecutor.java
index a934b815c7..af40b74604 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/StandaloneConfigTaskExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/StandaloneConfigTaskExecutor.java
@@ -41,6 +41,7 @@ import org.apache.iotdb.db.mpp.plan.execution.config.metadata.ShowTTLTask;
import org.apache.iotdb.db.mpp.plan.execution.config.sys.sync.ShowPipeSinkTask;
import org.apache.iotdb.db.mpp.plan.execution.config.sys.sync.ShowPipeTask;
import org.apache.iotdb.db.mpp.plan.statement.metadata.CountStorageGroupStatement;
+import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateContinuousQueryStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateTriggerStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.DeleteStorageGroupStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.DeleteTimeSeriesStatement;
@@ -620,4 +621,49 @@ public class StandaloneConfigTaskExecutor implements IConfigTaskExecutor {
TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()));
return future;
}
+
+ @Override
+ public SettableFuture<ConfigTaskResult> createContinuousQuery(
+ CreateContinuousQueryStatement createContinuousQueryStatement) {
+ SettableFuture<ConfigTaskResult> future = SettableFuture.create();
+ try {
+ // todo: implementation
+ future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
+ } catch (Exception e) {
+ final String message =
+ String.format(
+ "Failed to create continuous query %s, because %s.",
+ createContinuousQueryStatement.getCqId(), e.getMessage());
+ LOGGER.error(message, e);
+ future.setException(
+ new IoTDBException(message, TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()));
+ }
+ return future;
+ }
+
+ @Override
+ public SettableFuture<ConfigTaskResult> dropContinuousQuery(String cqId) {
+ SettableFuture<ConfigTaskResult> future = SettableFuture.create();
+ try {
+ // todo: implementation
+ future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
+ } catch (Exception e) {
+ final String message =
+ String.format("Failed to continuous query trigger %s, because %s.", cqId, e.getMessage());
+ LOGGER.error(message, e);
+ future.setException(
+ new IoTDBException(message, TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()));
+ }
+ return future;
+ }
+
+ @Override
+ public SettableFuture<ConfigTaskResult> showContinuousQueries() {
+ SettableFuture<ConfigTaskResult> future = SettableFuture.create();
+ future.setException(
+ new IoTDBException(
+ "Executing show continuous queries in standalone mode is not supported",
+ TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()));
+ return future;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/metadata/CreateContinuousQueryTask.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/metadata/CreateContinuousQueryTask.java
index c7e8617e44..060162e731 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/metadata/CreateContinuousQueryTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/metadata/CreateContinuousQueryTask.java
@@ -37,6 +37,6 @@ public class CreateContinuousQueryTask implements IConfigTask {
@Override
public ListenableFuture<ConfigTaskResult> execute(IConfigTaskExecutor configTaskExecutor)
throws InterruptedException {
- return null;
+ return configTaskExecutor.createContinuousQuery(createContinuousQueryStatement);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/metadata/DropContinuousQueryTask.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/metadata/DropContinuousQueryTask.java
index a0c5a953f5..17a76a56fa 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/metadata/DropContinuousQueryTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/metadata/DropContinuousQueryTask.java
@@ -37,6 +37,6 @@ public class DropContinuousQueryTask implements IConfigTask {
@Override
public ListenableFuture<ConfigTaskResult> execute(IConfigTaskExecutor configTaskExecutor)
throws InterruptedException {
- return null;
+ return configTaskExecutor.dropContinuousQuery(cqId);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/metadata/ShowContinuousQueriesTask.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/metadata/ShowContinuousQueriesTask.java
index 249f0dfd3a..deb4fb10e6 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/metadata/ShowContinuousQueriesTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/metadata/ShowContinuousQueriesTask.java
@@ -19,17 +19,57 @@
package org.apache.iotdb.db.mpp.plan.execution.config.metadata;
+import org.apache.iotdb.commons.cq.CQState;
+import org.apache.iotdb.confignode.rpc.thrift.TCQEntry;
+import org.apache.iotdb.db.mpp.common.header.ColumnHeader;
+import org.apache.iotdb.db.mpp.common.header.ColumnHeaderConstant;
+import org.apache.iotdb.db.mpp.common.header.DatasetHeader;
+import org.apache.iotdb.db.mpp.common.header.DatasetHeaderFactory;
import org.apache.iotdb.db.mpp.plan.execution.config.ConfigTaskResult;
import org.apache.iotdb.db.mpp.plan.execution.config.IConfigTask;
import org.apache.iotdb.db.mpp.plan.execution.config.executor.IConfigTaskExecutor;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
+import org.apache.iotdb.tsfile.utils.Binary;
import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+
+import java.util.List;
+import java.util.stream.Collectors;
public class ShowContinuousQueriesTask implements IConfigTask {
@Override
public ListenableFuture<ConfigTaskResult> execute(IConfigTaskExecutor configTaskExecutor)
throws InterruptedException {
- return null;
+ return configTaskExecutor.showContinuousQueries();
+ }
+
+ public static void buildTsBlock(List<TCQEntry> cqList, SettableFuture<ConfigTaskResult> future) {
+ List<TSDataType> outputDataTypes =
+ ColumnHeaderConstant.showContinuousQueriesColumnHeaders.stream()
+ .map(ColumnHeader::getColumnType)
+ .collect(Collectors.toList());
+ TsBlockBuilder builder = new TsBlockBuilder(outputDataTypes);
+
+ if (cqList != null && !cqList.isEmpty()) {
+ TimeColumnBuilder timeColumnBuilder = builder.getTimeColumnBuilder();
+ ColumnBuilder[] columnBuilders = builder.getValueColumnBuilders();
+ for (TCQEntry cqEntry : cqList) {
+ timeColumnBuilder.writeLong(0);
+ columnBuilders[0].writeBinary(Binary.valueOf(cqEntry.getCqId()));
+ columnBuilders[1].writeBinary(Binary.valueOf(cqEntry.getSql()));
+ columnBuilders[2].writeBinary(
+ Binary.valueOf(CQState.values()[cqEntry.getState()].toString()));
+ builder.declarePosition();
+ }
+ }
+
+ DatasetHeader datasetHeader = DatasetHeaderFactory.getShowContinuousQueriesHeader();
+ future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS, builder.build(), datasetHeader));
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/ASTVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/ASTVisitor.java
index 2ba49c7ed7..767c37fccb 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/ASTVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/ASTVisitor.java
@@ -814,6 +814,7 @@ public class ASTVisitor extends IoTDBSqlParserBaseVisitor<Statement> {
@Override
public Statement visitCreateContinuousQuery(IoTDBSqlParser.CreateContinuousQueryContext ctx) {
CreateContinuousQueryStatement statement = new CreateContinuousQueryStatement();
+ statement.setSql(ctx.getText());
statement.setCqId(parseIdentifier(ctx.cqId.getText()));
statement.setQueryBody(ctx.selectStatement().getText());
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/metadata/CreateContinuousQueryStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/metadata/CreateContinuousQueryStatement.java
index 9ff67cc071..7857fd3685 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/metadata/CreateContinuousQueryStatement.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/metadata/CreateContinuousQueryStatement.java
@@ -33,6 +33,7 @@ import java.util.List;
public class CreateContinuousQueryStatement extends Statement implements IConfigStatement {
+ private String sql;
private String cqId;
// The query execution time interval, default value is group_by_interval in group by clause.
@@ -59,6 +60,14 @@ public class CreateContinuousQueryStatement extends Statement implements IConfig
statementType = StatementType.CREATE_CONTINUOUS_QUERY;
}
+ public String getSql() {
+ return sql;
+ }
+
+ public void setSql(String sql) {
+ this.sql = sql;
+ }
+
public String getCqId() {
return cqId;
}
@@ -123,6 +132,10 @@ public class CreateContinuousQueryStatement extends Statement implements IConfig
this.queryBodyStatement = queryBodyStatement;
}
+ public String getZoneId() {
+ return queryBodyStatement.getSelectComponent().getZoneId().getId();
+ }
+
@Override
public QueryType getQueryType() {
return QueryType.WRITE;