You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hu...@apache.org on 2022/10/25 07:04:19 UTC

[iotdb] 04/04: Implement basic task executor

This is an automated email from the ASF dual-hosted git repository.

hui pushed a commit to branch lmh/mppCQ
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit f53711e5531f8f3ce41830fa38c7ca60c9fdbb15
Author: Minghui Liu <li...@foxmail.com>
AuthorDate: Tue Oct 25 15:03:40 2022 +0800

    Implement basic task executor
---
 .../db/mpp/common/header/ColumnHeaderConstant.java | 11 ++++
 .../db/mpp/common/header/DatasetHeaderFactory.java |  4 ++
 .../config/executor/ClusterConfigTaskExecutor.java | 77 ++++++++++++++++++++++
 .../config/executor/IConfigTaskExecutor.java       |  8 +++
 .../executor/StandaloneConfigTaskExecutor.java     | 46 +++++++++++++
 .../config/metadata/CreateContinuousQueryTask.java |  2 +-
 .../config/metadata/DropContinuousQueryTask.java   |  2 +-
 .../config/metadata/ShowContinuousQueriesTask.java | 42 +++++++++++-
 .../iotdb/db/mpp/plan/parser/ASTVisitor.java       |  1 +
 .../metadata/CreateContinuousQueryStatement.java   | 13 ++++
 10 files changed, 203 insertions(+), 3 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/header/ColumnHeaderConstant.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/header/ColumnHeaderConstant.java
index 2ebb1705a7..7924503897 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/header/ColumnHeaderConstant.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/header/ColumnHeaderConstant.java
@@ -121,6 +121,11 @@ public class ColumnHeaderConstant {
   public static final String COLUMN_TARGET_TIMESERIES = "target timeseries";
   public static final String COLUMN_WRITTEN = "written";
 
+  // column names for show cq
+  public static final String COLUMN_CQ_ID = "cq id";
+  public static final String COLUMN_QUERY = "query";
+  public static final String COLUMN_STATE = "state";
+
   public static final List<ColumnHeader> lastQueryColumnHeaders =
       ImmutableList.of(
           new ColumnHeader(COLUMN_TIMESERIES, TSDataType.TEXT),
@@ -303,4 +308,10 @@ public class ColumnHeaderConstant {
 
   public static final List<ColumnHeader> getSeriesSlotListColumnHeaders =
       ImmutableList.of(new ColumnHeader(COLUMN_SERIES_SLOT_ID, TSDataType.INT32));
+
+  public static final List<ColumnHeader> showContinuousQueriesColumnHeaders =
+      ImmutableList.of(
+          new ColumnHeader(COLUMN_CQ_ID, TSDataType.TEXT),
+          new ColumnHeader(COLUMN_QUERY, TSDataType.TEXT),
+          new ColumnHeader(COLUMN_STATE, TSDataType.TEXT));
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/header/DatasetHeaderFactory.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/header/DatasetHeaderFactory.java
index f64300890b..020f6426e1 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/header/DatasetHeaderFactory.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/header/DatasetHeaderFactory.java
@@ -146,4 +146,8 @@ public class DatasetHeaderFactory {
         ? new DatasetHeader(ColumnHeaderConstant.selectIntoAlignByDeviceColumnHeaders, true)
         : new DatasetHeader(ColumnHeaderConstant.selectIntoColumnHeaders, true);
   }
+
+  public static DatasetHeader getShowContinuousQueriesHeader() {
+    return new DatasetHeader(ColumnHeaderConstant.showContinuousQueriesColumnHeaders, true);
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java
index 4857dbcd40..3e7e264756 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/ClusterConfigTaskExecutor.java
@@ -32,10 +32,12 @@ import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.commons.path.PathPatternTree;
 import org.apache.iotdb.commons.trigger.service.TriggerExecutableManager;
 import org.apache.iotdb.confignode.rpc.thrift.TCountStorageGroupResp;
+import org.apache.iotdb.confignode.rpc.thrift.TCreateCQReq;
 import org.apache.iotdb.confignode.rpc.thrift.TCreateFunctionReq;
 import org.apache.iotdb.confignode.rpc.thrift.TCreateTriggerReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDeleteStorageGroupsReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDeleteTimeSeriesReq;
+import org.apache.iotdb.confignode.rpc.thrift.TDropCQReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDropFunctionReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDropPipeSinkReq;
 import org.apache.iotdb.confignode.rpc.thrift.TDropTriggerReq;
@@ -52,6 +54,7 @@ import org.apache.iotdb.confignode.rpc.thrift.TGetTriggerTableResp;
 import org.apache.iotdb.confignode.rpc.thrift.TPipeInfo;
 import org.apache.iotdb.confignode.rpc.thrift.TPipeSinkInfo;
 import org.apache.iotdb.confignode.rpc.thrift.TSetStorageGroupReq;
+import org.apache.iotdb.confignode.rpc.thrift.TShowCQResp;
 import org.apache.iotdb.confignode.rpc.thrift.TShowClusterResp;
 import org.apache.iotdb.confignode.rpc.thrift.TShowConfigNodesResp;
 import org.apache.iotdb.confignode.rpc.thrift.TShowDataNodesResp;
@@ -76,6 +79,7 @@ import org.apache.iotdb.db.mpp.plan.execution.config.metadata.GetTimeSlotListTas
 import org.apache.iotdb.db.mpp.plan.execution.config.metadata.SetStorageGroupTask;
 import org.apache.iotdb.db.mpp.plan.execution.config.metadata.ShowClusterTask;
 import org.apache.iotdb.db.mpp.plan.execution.config.metadata.ShowConfigNodesTask;
+import org.apache.iotdb.db.mpp.plan.execution.config.metadata.ShowContinuousQueriesTask;
 import org.apache.iotdb.db.mpp.plan.execution.config.metadata.ShowDataNodesTask;
 import org.apache.iotdb.db.mpp.plan.execution.config.metadata.ShowRegionTask;
 import org.apache.iotdb.db.mpp.plan.execution.config.metadata.ShowStorageGroupTask;
@@ -87,6 +91,7 @@ import org.apache.iotdb.db.mpp.plan.execution.config.metadata.template.ShowSchem
 import org.apache.iotdb.db.mpp.plan.execution.config.sys.sync.ShowPipeSinkTask;
 import org.apache.iotdb.db.mpp.plan.execution.config.sys.sync.ShowPipeTask;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.CountStorageGroupStatement;
+import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateContinuousQueryStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateTriggerStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.DeleteStorageGroupStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.DeleteTimeSeriesStatement;
@@ -1115,4 +1120,76 @@ public class ClusterConfigTaskExecutor implements IConfigTaskExecutor {
     GetTimeSlotListTask.buildTSBlock(resp, future);
     return future;
   }
+
+  @Override
+  public SettableFuture<ConfigTaskResult> createContinuousQuery(
+      CreateContinuousQueryStatement createContinuousQueryStatement) {
+    SettableFuture<ConfigTaskResult> future = SettableFuture.create();
+    try (ConfigNodeClient client =
+        CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.partitionRegionId)) {
+      TCreateCQReq tCreateCQReq =
+          new TCreateCQReq(
+              createContinuousQueryStatement.getCqId(),
+              createContinuousQueryStatement.getEveryInterval(),
+              createContinuousQueryStatement.getBoundaryTime(),
+              createContinuousQueryStatement.getStartTimeOffset(),
+              createContinuousQueryStatement.getEndTimeOffset(),
+              createContinuousQueryStatement.getTimeoutPolicy().getType(),
+              createContinuousQueryStatement.getQueryBody(),
+              createContinuousQueryStatement.getSql(),
+              createContinuousQueryStatement.getZoneId());
+      final TSStatus executionStatus = client.createCQ(tCreateCQReq);
+      if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != executionStatus.getCode()) {
+        LOGGER.error(
+            "[{}] Failed to create continuous query {}. TSStatus is {}",
+            executionStatus,
+            createContinuousQueryStatement.getCqId(),
+            executionStatus.message);
+        future.setException(new IoTDBException(executionStatus.message, executionStatus.code));
+      } else {
+        future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
+      }
+    } catch (TException | IOException e) {
+      future.setException(e);
+    }
+    return future;
+  }
+
+  @Override
+  public SettableFuture<ConfigTaskResult> dropContinuousQuery(String cqId) {
+    SettableFuture<ConfigTaskResult> future = SettableFuture.create();
+    try (ConfigNodeClient client =
+        CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.partitionRegionId)) {
+      final TSStatus executionStatus = client.dropCQ(new TDropCQReq(cqId));
+      if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != executionStatus.getCode()) {
+        LOGGER.error("[{}] Failed to drop continuous query {}.", executionStatus, cqId);
+        future.setException(new IoTDBException(executionStatus.message, executionStatus.code));
+      } else {
+        future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
+      }
+    } catch (TException | IOException e) {
+      future.setException(e);
+    }
+    return future;
+  }
+
+  @Override
+  public SettableFuture<ConfigTaskResult> showContinuousQueries() {
+    SettableFuture<ConfigTaskResult> future = SettableFuture.create();
+    try (ConfigNodeClient client =
+        CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.partitionRegionId)) {
+      TShowCQResp showCQResp = client.showCQ();
+      if (showCQResp.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+        future.setException(
+            new IoTDBException(showCQResp.getStatus().message, showCQResp.getStatus().code));
+        return future;
+      }
+      // convert cqList and buildTsBlock
+      ShowContinuousQueriesTask.buildTsBlock(showCQResp.getCqList(), future);
+    } catch (TException | IOException e) {
+      future.setException(e);
+    }
+
+    return future;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/IConfigTaskExecutor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/IConfigTaskExecutor.java
index 9cbae3e4f6..b8eaa69050 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/IConfigTaskExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/IConfigTaskExecutor.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TFlushReq;
 import org.apache.iotdb.commons.cluster.NodeStatus;
 import org.apache.iotdb.db.mpp.plan.execution.config.ConfigTaskResult;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.CountStorageGroupStatement;
+import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateContinuousQueryStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateTriggerStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.DeleteStorageGroupStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.DeleteTimeSeriesStatement;
@@ -141,4 +142,11 @@ public interface IConfigTaskExecutor {
 
   SettableFuture<ConfigTaskResult> getTimeSlotList(
       GetTimeSlotListStatement getTimeSlotListStatement);
+
+  SettableFuture<ConfigTaskResult> createContinuousQuery(
+      CreateContinuousQueryStatement createContinuousQueryStatement);
+
+  SettableFuture<ConfigTaskResult> dropContinuousQuery(String cqId);
+
+  SettableFuture<ConfigTaskResult> showContinuousQueries();
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/StandaloneConfigTaskExecutor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/StandaloneConfigTaskExecutor.java
index a934b815c7..af40b74604 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/StandaloneConfigTaskExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/executor/StandaloneConfigTaskExecutor.java
@@ -41,6 +41,7 @@ import org.apache.iotdb.db.mpp.plan.execution.config.metadata.ShowTTLTask;
 import org.apache.iotdb.db.mpp.plan.execution.config.sys.sync.ShowPipeSinkTask;
 import org.apache.iotdb.db.mpp.plan.execution.config.sys.sync.ShowPipeTask;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.CountStorageGroupStatement;
+import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateContinuousQueryStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateTriggerStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.DeleteStorageGroupStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.DeleteTimeSeriesStatement;
@@ -620,4 +621,49 @@ public class StandaloneConfigTaskExecutor implements IConfigTaskExecutor {
             TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()));
     return future;
   }
+
+  @Override
+  public SettableFuture<ConfigTaskResult> createContinuousQuery(
+      CreateContinuousQueryStatement createContinuousQueryStatement) {
+    SettableFuture<ConfigTaskResult> future = SettableFuture.create();
+    try {
+      // todo: implementation
+      future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
+    } catch (Exception e) {
+      final String message =
+          String.format(
+              "Failed to create continuous query %s, because %s.",
+              createContinuousQueryStatement.getCqId(), e.getMessage());
+      LOGGER.error(message, e);
+      future.setException(
+          new IoTDBException(message, TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()));
+    }
+    return future;
+  }
+
+  @Override
+  public SettableFuture<ConfigTaskResult> dropContinuousQuery(String cqId) {
+    SettableFuture<ConfigTaskResult> future = SettableFuture.create();
+    try {
+      // todo: implementation
+      future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
+    } catch (Exception e) {
+      final String message =
+          String.format("Failed to continuous query trigger %s, because %s.", cqId, e.getMessage());
+      LOGGER.error(message, e);
+      future.setException(
+          new IoTDBException(message, TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()));
+    }
+    return future;
+  }
+
+  @Override
+  public SettableFuture<ConfigTaskResult> showContinuousQueries() {
+    SettableFuture<ConfigTaskResult> future = SettableFuture.create();
+    future.setException(
+        new IoTDBException(
+            "Executing show continuous queries in standalone mode is not supported",
+            TSStatusCode.EXECUTE_STATEMENT_ERROR.getStatusCode()));
+    return future;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/metadata/CreateContinuousQueryTask.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/metadata/CreateContinuousQueryTask.java
index c7e8617e44..060162e731 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/metadata/CreateContinuousQueryTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/metadata/CreateContinuousQueryTask.java
@@ -37,6 +37,6 @@ public class CreateContinuousQueryTask implements IConfigTask {
   @Override
   public ListenableFuture<ConfigTaskResult> execute(IConfigTaskExecutor configTaskExecutor)
       throws InterruptedException {
-    return null;
+    return configTaskExecutor.createContinuousQuery(createContinuousQueryStatement);
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/metadata/DropContinuousQueryTask.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/metadata/DropContinuousQueryTask.java
index a0c5a953f5..17a76a56fa 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/metadata/DropContinuousQueryTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/metadata/DropContinuousQueryTask.java
@@ -37,6 +37,6 @@ public class DropContinuousQueryTask implements IConfigTask {
   @Override
   public ListenableFuture<ConfigTaskResult> execute(IConfigTaskExecutor configTaskExecutor)
       throws InterruptedException {
-    return null;
+    return configTaskExecutor.dropContinuousQuery(cqId);
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/metadata/ShowContinuousQueriesTask.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/metadata/ShowContinuousQueriesTask.java
index 249f0dfd3a..deb4fb10e6 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/metadata/ShowContinuousQueriesTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/metadata/ShowContinuousQueriesTask.java
@@ -19,17 +19,57 @@
 
 package org.apache.iotdb.db.mpp.plan.execution.config.metadata;
 
+import org.apache.iotdb.commons.cq.CQState;
+import org.apache.iotdb.confignode.rpc.thrift.TCQEntry;
+import org.apache.iotdb.db.mpp.common.header.ColumnHeader;
+import org.apache.iotdb.db.mpp.common.header.ColumnHeaderConstant;
+import org.apache.iotdb.db.mpp.common.header.DatasetHeader;
+import org.apache.iotdb.db.mpp.common.header.DatasetHeaderFactory;
 import org.apache.iotdb.db.mpp.plan.execution.config.ConfigTaskResult;
 import org.apache.iotdb.db.mpp.plan.execution.config.IConfigTask;
 import org.apache.iotdb.db.mpp.plan.execution.config.executor.IConfigTaskExecutor;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
+import org.apache.iotdb.tsfile.utils.Binary;
 
 import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+
+import java.util.List;
+import java.util.stream.Collectors;
 
 public class ShowContinuousQueriesTask implements IConfigTask {
 
   @Override
   public ListenableFuture<ConfigTaskResult> execute(IConfigTaskExecutor configTaskExecutor)
       throws InterruptedException {
-    return null;
+    return configTaskExecutor.showContinuousQueries();
+  }
+
+  public static void buildTsBlock(List<TCQEntry> cqList, SettableFuture<ConfigTaskResult> future) {
+    List<TSDataType> outputDataTypes =
+        ColumnHeaderConstant.showContinuousQueriesColumnHeaders.stream()
+            .map(ColumnHeader::getColumnType)
+            .collect(Collectors.toList());
+    TsBlockBuilder builder = new TsBlockBuilder(outputDataTypes);
+
+    if (cqList != null && !cqList.isEmpty()) {
+      TimeColumnBuilder timeColumnBuilder = builder.getTimeColumnBuilder();
+      ColumnBuilder[] columnBuilders = builder.getValueColumnBuilders();
+      for (TCQEntry cqEntry : cqList) {
+        timeColumnBuilder.writeLong(0);
+        columnBuilders[0].writeBinary(Binary.valueOf(cqEntry.getCqId()));
+        columnBuilders[1].writeBinary(Binary.valueOf(cqEntry.getSql()));
+        columnBuilders[2].writeBinary(
+            Binary.valueOf(CQState.values()[cqEntry.getState()].toString()));
+        builder.declarePosition();
+      }
+    }
+
+    DatasetHeader datasetHeader = DatasetHeaderFactory.getShowContinuousQueriesHeader();
+    future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS, builder.build(), datasetHeader));
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/ASTVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/ASTVisitor.java
index 2ba49c7ed7..767c37fccb 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/ASTVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/ASTVisitor.java
@@ -814,6 +814,7 @@ public class ASTVisitor extends IoTDBSqlParserBaseVisitor<Statement> {
   @Override
   public Statement visitCreateContinuousQuery(IoTDBSqlParser.CreateContinuousQueryContext ctx) {
     CreateContinuousQueryStatement statement = new CreateContinuousQueryStatement();
+    statement.setSql(ctx.getText());
     statement.setCqId(parseIdentifier(ctx.cqId.getText()));
 
     statement.setQueryBody(ctx.selectStatement().getText());
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/metadata/CreateContinuousQueryStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/metadata/CreateContinuousQueryStatement.java
index 9ff67cc071..7857fd3685 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/metadata/CreateContinuousQueryStatement.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/metadata/CreateContinuousQueryStatement.java
@@ -33,6 +33,7 @@ import java.util.List;
 
 public class CreateContinuousQueryStatement extends Statement implements IConfigStatement {
 
+  private String sql;
   private String cqId;
 
   // The query execution time interval, default value is group_by_interval in group by clause.
@@ -59,6 +60,14 @@ public class CreateContinuousQueryStatement extends Statement implements IConfig
     statementType = StatementType.CREATE_CONTINUOUS_QUERY;
   }
 
+  public String getSql() {
+    return sql;
+  }
+
+  public void setSql(String sql) {
+    this.sql = sql;
+  }
+
   public String getCqId() {
     return cqId;
   }
@@ -123,6 +132,10 @@ public class CreateContinuousQueryStatement extends Statement implements IConfig
     this.queryBodyStatement = queryBodyStatement;
   }
 
+  public String getZoneId() {
+    return queryBodyStatement.getSelectComponent().getZoneId().getId();
+  }
+
   @Override
   public QueryType getQueryType() {
     return QueryType.WRITE;