You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hu...@apache.org on 2022/10/25 07:04:18 UTC

[iotdb] 03/04: add cq related tasks

This is an automated email from the ASF dual-hosted git repository.

hui pushed a commit to branch lmh/mppCQ
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 15f2f82f3bdc3ebc8ab3d4e089e1028a21ed3cc8
Author: Minghui Liu <li...@foxmail.com>
AuthorDate: Tue Oct 25 11:10:48 2022 +0800

    add cq related tasks
---
 .../plan/execution/config/ConfigTaskVisitor.java   | 24 +++++++++++++
 .../config/metadata/CreateContinuousQueryTask.java | 42 ++++++++++++++++++++++
 .../config/metadata/DropContinuousQueryTask.java}  | 30 +++++++---------
 .../metadata/ShowContinuousQueriesTask.java}       | 29 +++++----------
 .../db/mpp/plan/statement/StatementVisitor.java    | 18 ++++++++++
 .../metadata/CreateContinuousQueryStatement.java   |  6 ++++
 .../metadata/DropContinuousQueryStatement.java     |  6 ++++
 .../metadata/ShowContinuousQueriesStatement.java   |  6 ++++
 8 files changed, 124 insertions(+), 37 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/ConfigTaskVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/ConfigTaskVisitor.java
index 7fc68c0d46..92c29a5f4a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/ConfigTaskVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/ConfigTaskVisitor.java
@@ -20,10 +20,12 @@
 package org.apache.iotdb.db.mpp.plan.execution.config;
 
 import org.apache.iotdb.db.mpp.plan.execution.config.metadata.CountStorageGroupTask;
+import org.apache.iotdb.db.mpp.plan.execution.config.metadata.CreateContinuousQueryTask;
 import org.apache.iotdb.db.mpp.plan.execution.config.metadata.CreateFunctionTask;
 import org.apache.iotdb.db.mpp.plan.execution.config.metadata.CreateTriggerTask;
 import org.apache.iotdb.db.mpp.plan.execution.config.metadata.DeleteStorageGroupTask;
 import org.apache.iotdb.db.mpp.plan.execution.config.metadata.DeleteTimeSeriesTask;
+import org.apache.iotdb.db.mpp.plan.execution.config.metadata.DropContinuousQueryTask;
 import org.apache.iotdb.db.mpp.plan.execution.config.metadata.DropFunctionTask;
 import org.apache.iotdb.db.mpp.plan.execution.config.metadata.DropTriggerTask;
 import org.apache.iotdb.db.mpp.plan.execution.config.metadata.GetRegionIdTask;
@@ -33,6 +35,7 @@ import org.apache.iotdb.db.mpp.plan.execution.config.metadata.SetStorageGroupTas
 import org.apache.iotdb.db.mpp.plan.execution.config.metadata.SetTTLTask;
 import org.apache.iotdb.db.mpp.plan.execution.config.metadata.ShowClusterTask;
 import org.apache.iotdb.db.mpp.plan.execution.config.metadata.ShowConfigNodesTask;
+import org.apache.iotdb.db.mpp.plan.execution.config.metadata.ShowContinuousQueriesTask;
 import org.apache.iotdb.db.mpp.plan.execution.config.metadata.ShowDataNodesTask;
 import org.apache.iotdb.db.mpp.plan.execution.config.metadata.ShowFunctionsTask;
 import org.apache.iotdb.db.mpp.plan.execution.config.metadata.ShowRegionTask;
@@ -63,10 +66,12 @@ import org.apache.iotdb.db.mpp.plan.statement.Statement;
 import org.apache.iotdb.db.mpp.plan.statement.StatementNode;
 import org.apache.iotdb.db.mpp.plan.statement.StatementVisitor;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.CountStorageGroupStatement;
+import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateContinuousQueryStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateFunctionStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateTriggerStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.DeleteStorageGroupStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.DeleteTimeSeriesStatement;
+import org.apache.iotdb.db.mpp.plan.statement.metadata.DropContinuousQueryStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.DropFunctionStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.DropTriggerStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.GetRegionIdStatement;
@@ -76,6 +81,7 @@ import org.apache.iotdb.db.mpp.plan.statement.metadata.SetStorageGroupStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.SetTTLStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.ShowClusterStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.ShowConfigNodesStatement;
+import org.apache.iotdb.db.mpp.plan.statement.metadata.ShowContinuousQueriesStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.ShowDataNodesStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.ShowFunctionsStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.ShowRegionStatement;
@@ -344,6 +350,24 @@ public class ConfigTaskVisitor
     return new GetTimeSlotListTask(getTimeSlotListStatement);
   }
 
+  @Override
+  public IConfigTask visitCreateContinuousQuery(
+      CreateContinuousQueryStatement createContinuousQueryStatement, TaskContext context) {
+    return new CreateContinuousQueryTask(createContinuousQueryStatement);
+  }
+
+  @Override
+  public IConfigTask visitDropContinuousQuery(
+      DropContinuousQueryStatement dropContinuousQueryStatement, TaskContext context) {
+    return new DropContinuousQueryTask(dropContinuousQueryStatement);
+  }
+
+  @Override
+  public IConfigTask visitShowContinuousQueries(
+      ShowContinuousQueriesStatement showContinuousQueriesStatement, TaskContext context) {
+    return new ShowContinuousQueriesTask();
+  }
+
   public static class TaskContext {
 
     private final String queryId;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/metadata/CreateContinuousQueryTask.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/metadata/CreateContinuousQueryTask.java
new file mode 100644
index 0000000000..c7e8617e44
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/metadata/CreateContinuousQueryTask.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.plan.execution.config.metadata;
+
+import org.apache.iotdb.db.mpp.plan.execution.config.ConfigTaskResult;
+import org.apache.iotdb.db.mpp.plan.execution.config.IConfigTask;
+import org.apache.iotdb.db.mpp.plan.execution.config.executor.IConfigTaskExecutor;
+import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateContinuousQueryStatement;
+
+import com.google.common.util.concurrent.ListenableFuture;
+
+public class CreateContinuousQueryTask implements IConfigTask {
+
+  private final CreateContinuousQueryStatement createContinuousQueryStatement;
+
+  public CreateContinuousQueryTask(CreateContinuousQueryStatement createContinuousQueryStatement) {
+    this.createContinuousQueryStatement = createContinuousQueryStatement;
+  }
+
+  @Override
+  public ListenableFuture<ConfigTaskResult> execute(IConfigTaskExecutor configTaskExecutor)
+      throws InterruptedException {
+    return null;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/metadata/ShowContinuousQueriesStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/metadata/DropContinuousQueryTask.java
similarity index 50%
copy from server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/metadata/ShowContinuousQueriesStatement.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/metadata/DropContinuousQueryTask.java
index 94edcdd8d5..a0c5a953f5 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/metadata/ShowContinuousQueriesStatement.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/metadata/DropContinuousQueryTask.java
@@ -17,30 +17,26 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.mpp.plan.statement.metadata;
+package org.apache.iotdb.db.mpp.plan.execution.config.metadata;
 
-import org.apache.iotdb.commons.path.PartialPath;
-import org.apache.iotdb.db.mpp.plan.analyze.QueryType;
-import org.apache.iotdb.db.mpp.plan.constant.StatementType;
-import org.apache.iotdb.db.mpp.plan.statement.IConfigStatement;
+import org.apache.iotdb.db.mpp.plan.execution.config.ConfigTaskResult;
+import org.apache.iotdb.db.mpp.plan.execution.config.IConfigTask;
+import org.apache.iotdb.db.mpp.plan.execution.config.executor.IConfigTaskExecutor;
+import org.apache.iotdb.db.mpp.plan.statement.metadata.DropContinuousQueryStatement;
 
-import java.util.Collections;
-import java.util.List;
+import com.google.common.util.concurrent.ListenableFuture;
 
-public class ShowContinuousQueriesStatement extends ShowStatement implements IConfigStatement {
+public class DropContinuousQueryTask implements IConfigTask {
 
-  public ShowContinuousQueriesStatement() {
-    super();
-    statementType = StatementType.SHOW_CONTINUOUS_QUERIES;
-  }
+  private final String cqId;
 
-  @Override
-  public QueryType getQueryType() {
-    return QueryType.READ;
+  public DropContinuousQueryTask(DropContinuousQueryStatement dropContinuousQueryStatement) {
+    this.cqId = dropContinuousQueryStatement.getCqId();
   }
 
   @Override
-  public List<PartialPath> getPaths() {
-    return Collections.emptyList();
+  public ListenableFuture<ConfigTaskResult> execute(IConfigTaskExecutor configTaskExecutor)
+      throws InterruptedException {
+    return null;
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/metadata/ShowContinuousQueriesStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/metadata/ShowContinuousQueriesTask.java
similarity index 53%
copy from server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/metadata/ShowContinuousQueriesStatement.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/metadata/ShowContinuousQueriesTask.java
index 94edcdd8d5..249f0dfd3a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/metadata/ShowContinuousQueriesStatement.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/metadata/ShowContinuousQueriesTask.java
@@ -17,30 +17,19 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.mpp.plan.statement.metadata;
+package org.apache.iotdb.db.mpp.plan.execution.config.metadata;
 
-import org.apache.iotdb.commons.path.PartialPath;
-import org.apache.iotdb.db.mpp.plan.analyze.QueryType;
-import org.apache.iotdb.db.mpp.plan.constant.StatementType;
-import org.apache.iotdb.db.mpp.plan.statement.IConfigStatement;
+import org.apache.iotdb.db.mpp.plan.execution.config.ConfigTaskResult;
+import org.apache.iotdb.db.mpp.plan.execution.config.IConfigTask;
+import org.apache.iotdb.db.mpp.plan.execution.config.executor.IConfigTaskExecutor;
 
-import java.util.Collections;
-import java.util.List;
+import com.google.common.util.concurrent.ListenableFuture;
 
-public class ShowContinuousQueriesStatement extends ShowStatement implements IConfigStatement {
-
-  public ShowContinuousQueriesStatement() {
-    super();
-    statementType = StatementType.SHOW_CONTINUOUS_QUERIES;
-  }
-
-  @Override
-  public QueryType getQueryType() {
-    return QueryType.READ;
-  }
+public class ShowContinuousQueriesTask implements IConfigTask {
 
   @Override
-  public List<PartialPath> getPaths() {
-    return Collections.emptyList();
+  public ListenableFuture<ConfigTaskResult> execute(IConfigTaskExecutor configTaskExecutor)
+      throws InterruptedException {
+    return null;
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/StatementVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/StatementVisitor.java
index be99227d27..2559f09c4f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/StatementVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/StatementVisitor.java
@@ -37,12 +37,14 @@ import org.apache.iotdb.db.mpp.plan.statement.metadata.CountNodesStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.CountStorageGroupStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.CountTimeSeriesStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateAlignedTimeSeriesStatement;
+import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateContinuousQueryStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateFunctionStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateMultiTimeSeriesStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateTimeSeriesStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateTriggerStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.DeleteStorageGroupStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.DeleteTimeSeriesStatement;
+import org.apache.iotdb.db.mpp.plan.statement.metadata.DropContinuousQueryStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.DropFunctionStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.DropTriggerStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.GetRegionIdStatement;
@@ -54,6 +56,7 @@ import org.apache.iotdb.db.mpp.plan.statement.metadata.ShowChildNodesStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.ShowChildPathsStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.ShowClusterStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.ShowConfigNodesStatement;
+import org.apache.iotdb.db.mpp.plan.statement.metadata.ShowContinuousQueriesStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.ShowDataNodesStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.ShowDevicesStatement;
 import org.apache.iotdb.db.mpp.plan.statement.metadata.ShowFunctionsStatement;
@@ -409,4 +412,19 @@ public abstract class StatementVisitor<R, C> {
   public R visitGetTimeSlotList(GetTimeSlotListStatement getTimeSlotListStatement, C context) {
     return visitStatement(getTimeSlotListStatement, context);
   }
+
+  public R visitCreateContinuousQuery(
+      CreateContinuousQueryStatement createContinuousQueryStatement, C context) {
+    return visitStatement(createContinuousQueryStatement, context);
+  }
+
+  public R visitDropContinuousQuery(
+      DropContinuousQueryStatement dropContinuousQueryStatement, C context) {
+    return visitStatement(dropContinuousQueryStatement, context);
+  }
+
+  public R visitShowContinuousQueries(
+      ShowContinuousQueriesStatement showContinuousQueriesStatement, C context) {
+    return visitStatement(showContinuousQueriesStatement, context);
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/metadata/CreateContinuousQueryStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/metadata/CreateContinuousQueryStatement.java
index fd6120b6da..9ff67cc071 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/metadata/CreateContinuousQueryStatement.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/metadata/CreateContinuousQueryStatement.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.db.mpp.plan.analyze.QueryType;
 import org.apache.iotdb.db.mpp.plan.constant.StatementType;
 import org.apache.iotdb.db.mpp.plan.statement.IConfigStatement;
 import org.apache.iotdb.db.mpp.plan.statement.Statement;
+import org.apache.iotdb.db.mpp.plan.statement.StatementVisitor;
 import org.apache.iotdb.db.mpp.plan.statement.crud.QueryStatement;
 
 import java.util.Collections;
@@ -131,4 +132,9 @@ public class CreateContinuousQueryStatement extends Statement implements IConfig
   public List<? extends PartialPath> getPaths() {
     return Collections.emptyList();
   }
+
+  @Override
+  public <R, C> R accept(StatementVisitor<R, C> visitor, C context) {
+    return visitor.visitCreateContinuousQuery(this, context);
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/metadata/DropContinuousQueryStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/metadata/DropContinuousQueryStatement.java
index fe35672421..85bc2382e6 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/metadata/DropContinuousQueryStatement.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/metadata/DropContinuousQueryStatement.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.db.mpp.plan.analyze.QueryType;
 import org.apache.iotdb.db.mpp.plan.constant.StatementType;
 import org.apache.iotdb.db.mpp.plan.statement.IConfigStatement;
 import org.apache.iotdb.db.mpp.plan.statement.Statement;
+import org.apache.iotdb.db.mpp.plan.statement.StatementVisitor;
 
 import java.util.Collections;
 import java.util.List;
@@ -51,4 +52,9 @@ public class DropContinuousQueryStatement extends Statement implements IConfigSt
   public List<? extends PartialPath> getPaths() {
     return Collections.emptyList();
   }
+
+  @Override
+  public <R, C> R accept(StatementVisitor<R, C> visitor, C context) {
+    return visitor.visitDropContinuousQuery(this, context);
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/metadata/ShowContinuousQueriesStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/metadata/ShowContinuousQueriesStatement.java
index 94edcdd8d5..413a4c88de 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/metadata/ShowContinuousQueriesStatement.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/metadata/ShowContinuousQueriesStatement.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.db.mpp.plan.analyze.QueryType;
 import org.apache.iotdb.db.mpp.plan.constant.StatementType;
 import org.apache.iotdb.db.mpp.plan.statement.IConfigStatement;
+import org.apache.iotdb.db.mpp.plan.statement.StatementVisitor;
 
 import java.util.Collections;
 import java.util.List;
@@ -43,4 +44,9 @@ public class ShowContinuousQueriesStatement extends ShowStatement implements ICo
   public List<PartialPath> getPaths() {
     return Collections.emptyList();
   }
+
+  @Override
+  public <R, C> R accept(StatementVisitor<R, C> visitor, C context) {
+    return visitor.visitShowContinuousQueries(this, context);
+  }
 }