You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by hu...@apache.org on 2022/10/25 07:04:18 UTC
[iotdb] 03/04: add cq related tasks
This is an automated email from the ASF dual-hosted git repository.
hui pushed a commit to branch lmh/mppCQ
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 15f2f82f3bdc3ebc8ab3d4e089e1028a21ed3cc8
Author: Minghui Liu <li...@foxmail.com>
AuthorDate: Tue Oct 25 11:10:48 2022 +0800
add cq related tasks
---
.../plan/execution/config/ConfigTaskVisitor.java | 24 +++++++++++++
.../config/metadata/CreateContinuousQueryTask.java | 42 ++++++++++++++++++++++
.../config/metadata/DropContinuousQueryTask.java} | 30 +++++++---------
.../metadata/ShowContinuousQueriesTask.java} | 29 +++++----------
.../db/mpp/plan/statement/StatementVisitor.java | 18 ++++++++++
.../metadata/CreateContinuousQueryStatement.java | 6 ++++
.../metadata/DropContinuousQueryStatement.java | 6 ++++
.../metadata/ShowContinuousQueriesStatement.java | 6 ++++
8 files changed, 124 insertions(+), 37 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/ConfigTaskVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/ConfigTaskVisitor.java
index 7fc68c0d46..92c29a5f4a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/ConfigTaskVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/ConfigTaskVisitor.java
@@ -20,10 +20,12 @@
package org.apache.iotdb.db.mpp.plan.execution.config;
import org.apache.iotdb.db.mpp.plan.execution.config.metadata.CountStorageGroupTask;
+import org.apache.iotdb.db.mpp.plan.execution.config.metadata.CreateContinuousQueryTask;
import org.apache.iotdb.db.mpp.plan.execution.config.metadata.CreateFunctionTask;
import org.apache.iotdb.db.mpp.plan.execution.config.metadata.CreateTriggerTask;
import org.apache.iotdb.db.mpp.plan.execution.config.metadata.DeleteStorageGroupTask;
import org.apache.iotdb.db.mpp.plan.execution.config.metadata.DeleteTimeSeriesTask;
+import org.apache.iotdb.db.mpp.plan.execution.config.metadata.DropContinuousQueryTask;
import org.apache.iotdb.db.mpp.plan.execution.config.metadata.DropFunctionTask;
import org.apache.iotdb.db.mpp.plan.execution.config.metadata.DropTriggerTask;
import org.apache.iotdb.db.mpp.plan.execution.config.metadata.GetRegionIdTask;
@@ -33,6 +35,7 @@ import org.apache.iotdb.db.mpp.plan.execution.config.metadata.SetStorageGroupTas
import org.apache.iotdb.db.mpp.plan.execution.config.metadata.SetTTLTask;
import org.apache.iotdb.db.mpp.plan.execution.config.metadata.ShowClusterTask;
import org.apache.iotdb.db.mpp.plan.execution.config.metadata.ShowConfigNodesTask;
+import org.apache.iotdb.db.mpp.plan.execution.config.metadata.ShowContinuousQueriesTask;
import org.apache.iotdb.db.mpp.plan.execution.config.metadata.ShowDataNodesTask;
import org.apache.iotdb.db.mpp.plan.execution.config.metadata.ShowFunctionsTask;
import org.apache.iotdb.db.mpp.plan.execution.config.metadata.ShowRegionTask;
@@ -63,10 +66,12 @@ import org.apache.iotdb.db.mpp.plan.statement.Statement;
import org.apache.iotdb.db.mpp.plan.statement.StatementNode;
import org.apache.iotdb.db.mpp.plan.statement.StatementVisitor;
import org.apache.iotdb.db.mpp.plan.statement.metadata.CountStorageGroupStatement;
+import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateContinuousQueryStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateFunctionStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateTriggerStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.DeleteStorageGroupStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.DeleteTimeSeriesStatement;
+import org.apache.iotdb.db.mpp.plan.statement.metadata.DropContinuousQueryStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.DropFunctionStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.DropTriggerStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.GetRegionIdStatement;
@@ -76,6 +81,7 @@ import org.apache.iotdb.db.mpp.plan.statement.metadata.SetStorageGroupStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.SetTTLStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.ShowClusterStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.ShowConfigNodesStatement;
+import org.apache.iotdb.db.mpp.plan.statement.metadata.ShowContinuousQueriesStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.ShowDataNodesStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.ShowFunctionsStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.ShowRegionStatement;
@@ -344,6 +350,24 @@ public class ConfigTaskVisitor
return new GetTimeSlotListTask(getTimeSlotListStatement);
}
+ @Override
+ public IConfigTask visitCreateContinuousQuery(
+ CreateContinuousQueryStatement createContinuousQueryStatement, TaskContext context) {
+ return new CreateContinuousQueryTask(createContinuousQueryStatement); // context is not used
+ }
+
+ @Override
+ public IConfigTask visitDropContinuousQuery(
+ DropContinuousQueryStatement dropContinuousQueryStatement, TaskContext context) {
+ return new DropContinuousQueryTask(dropContinuousQueryStatement); // context is not used
+ }
+
+ @Override
+ public IConfigTask visitShowContinuousQueries(
+ ShowContinuousQueriesStatement showContinuousQueriesStatement, TaskContext context) {
+ return new ShowContinuousQueriesTask(); // neither the statement nor the context is passed on
+ }
+
public static class TaskContext {
private final String queryId;
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/metadata/CreateContinuousQueryTask.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/metadata/CreateContinuousQueryTask.java
new file mode 100644
index 0000000000..c7e8617e44
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/metadata/CreateContinuousQueryTask.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.plan.execution.config.metadata;
+
+import org.apache.iotdb.db.mpp.plan.execution.config.ConfigTaskResult;
+import org.apache.iotdb.db.mpp.plan.execution.config.IConfigTask;
+import org.apache.iotdb.db.mpp.plan.execution.config.executor.IConfigTaskExecutor;
+import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateContinuousQueryStatement;
+
+import com.google.common.util.concurrent.ListenableFuture;
+
+public class CreateContinuousQueryTask implements IConfigTask { // config task wrapping a CreateContinuousQueryStatement
+
+ private final CreateContinuousQueryStatement createContinuousQueryStatement; // assigned once; no method in this class reads it
+
+ public CreateContinuousQueryTask(CreateContinuousQueryStatement createContinuousQueryStatement) {
+ this.createContinuousQueryStatement = createContinuousQueryStatement;
+ }
+
+ @Override
+ public ListenableFuture<ConfigTaskResult> execute(IConfigTaskExecutor configTaskExecutor)
+ throws InterruptedException {
+ return null; // TODO(review): returns null rather than a future; configTaskExecutor is never used
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/metadata/ShowContinuousQueriesStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/metadata/DropContinuousQueryTask.java
similarity index 50%
copy from server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/metadata/ShowContinuousQueriesStatement.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/metadata/DropContinuousQueryTask.java
index 94edcdd8d5..a0c5a953f5 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/metadata/ShowContinuousQueriesStatement.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/metadata/DropContinuousQueryTask.java
@@ -17,30 +17,26 @@
* under the License.
*/
-package org.apache.iotdb.db.mpp.plan.statement.metadata;
+package org.apache.iotdb.db.mpp.plan.execution.config.metadata;
-import org.apache.iotdb.commons.path.PartialPath;
-import org.apache.iotdb.db.mpp.plan.analyze.QueryType;
-import org.apache.iotdb.db.mpp.plan.constant.StatementType;
-import org.apache.iotdb.db.mpp.plan.statement.IConfigStatement;
+import org.apache.iotdb.db.mpp.plan.execution.config.ConfigTaskResult;
+import org.apache.iotdb.db.mpp.plan.execution.config.IConfigTask;
+import org.apache.iotdb.db.mpp.plan.execution.config.executor.IConfigTaskExecutor;
+import org.apache.iotdb.db.mpp.plan.statement.metadata.DropContinuousQueryStatement;
-import java.util.Collections;
-import java.util.List;
+import com.google.common.util.concurrent.ListenableFuture;
-public class ShowContinuousQueriesStatement extends ShowStatement implements IConfigStatement {
+public class DropContinuousQueryTask implements IConfigTask {
- public ShowContinuousQueriesStatement() {
- super();
- statementType = StatementType.SHOW_CONTINUOUS_QUERIES;
- }
+ private final String cqId;
- @Override
- public QueryType getQueryType() {
- return QueryType.READ;
+ public DropContinuousQueryTask(DropContinuousQueryStatement dropContinuousQueryStatement) {
+ this.cqId = dropContinuousQueryStatement.getCqId(); // only the id is kept; nothing in this class reads cqId
}
@Override
- public List<PartialPath> getPaths() {
- return Collections.emptyList();
+ public ListenableFuture<ConfigTaskResult> execute(IConfigTaskExecutor configTaskExecutor)
+ throws InterruptedException {
+ return null; // TODO(review): returns null rather than a future; configTaskExecutor is never used
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/metadata/ShowContinuousQueriesStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/metadata/ShowContinuousQueriesTask.java
similarity index 53%
copy from server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/metadata/ShowContinuousQueriesStatement.java
copy to server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/metadata/ShowContinuousQueriesTask.java
index 94edcdd8d5..249f0dfd3a 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/metadata/ShowContinuousQueriesStatement.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/metadata/ShowContinuousQueriesTask.java
@@ -17,30 +17,19 @@
* under the License.
*/
-package org.apache.iotdb.db.mpp.plan.statement.metadata;
+package org.apache.iotdb.db.mpp.plan.execution.config.metadata;
-import org.apache.iotdb.commons.path.PartialPath;
-import org.apache.iotdb.db.mpp.plan.analyze.QueryType;
-import org.apache.iotdb.db.mpp.plan.constant.StatementType;
-import org.apache.iotdb.db.mpp.plan.statement.IConfigStatement;
+import org.apache.iotdb.db.mpp.plan.execution.config.ConfigTaskResult;
+import org.apache.iotdb.db.mpp.plan.execution.config.IConfigTask;
+import org.apache.iotdb.db.mpp.plan.execution.config.executor.IConfigTaskExecutor;
-import java.util.Collections;
-import java.util.List;
+import com.google.common.util.concurrent.ListenableFuture;
-public class ShowContinuousQueriesStatement extends ShowStatement implements IConfigStatement {
-
- public ShowContinuousQueriesStatement() {
- super();
- statementType = StatementType.SHOW_CONTINUOUS_QUERIES;
- }
-
- @Override
- public QueryType getQueryType() {
- return QueryType.READ;
- }
+public class ShowContinuousQueriesTask implements IConfigTask {
@Override
- public List<PartialPath> getPaths() {
- return Collections.emptyList();
+ public ListenableFuture<ConfigTaskResult> execute(IConfigTaskExecutor configTaskExecutor)
+ throws InterruptedException {
+ return null; // TODO(review): returns null rather than a future; configTaskExecutor is never used
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/StatementVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/StatementVisitor.java
index be99227d27..2559f09c4f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/StatementVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/StatementVisitor.java
@@ -37,12 +37,14 @@ import org.apache.iotdb.db.mpp.plan.statement.metadata.CountNodesStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.CountStorageGroupStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.CountTimeSeriesStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateAlignedTimeSeriesStatement;
+import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateContinuousQueryStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateFunctionStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateMultiTimeSeriesStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateTimeSeriesStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.CreateTriggerStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.DeleteStorageGroupStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.DeleteTimeSeriesStatement;
+import org.apache.iotdb.db.mpp.plan.statement.metadata.DropContinuousQueryStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.DropFunctionStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.DropTriggerStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.GetRegionIdStatement;
@@ -54,6 +56,7 @@ import org.apache.iotdb.db.mpp.plan.statement.metadata.ShowChildNodesStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.ShowChildPathsStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.ShowClusterStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.ShowConfigNodesStatement;
+import org.apache.iotdb.db.mpp.plan.statement.metadata.ShowContinuousQueriesStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.ShowDataNodesStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.ShowDevicesStatement;
import org.apache.iotdb.db.mpp.plan.statement.metadata.ShowFunctionsStatement;
@@ -409,4 +412,19 @@ public abstract class StatementVisitor<R, C> {
public R visitGetTimeSlotList(GetTimeSlotListStatement getTimeSlotListStatement, C context) {
return visitStatement(getTimeSlotListStatement, context);
}
+
+ public R visitCreateContinuousQuery(
+ CreateContinuousQueryStatement createContinuousQueryStatement, C context) {
+ return visitStatement(createContinuousQueryStatement, context); // default: delegate to visitStatement
+ }
+
+ public R visitDropContinuousQuery(
+ DropContinuousQueryStatement dropContinuousQueryStatement, C context) {
+ return visitStatement(dropContinuousQueryStatement, context); // default: delegate to visitStatement
+ }
+
+ public R visitShowContinuousQueries(
+ ShowContinuousQueriesStatement showContinuousQueriesStatement, C context) {
+ return visitStatement(showContinuousQueriesStatement, context); // default: delegate to visitStatement
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/metadata/CreateContinuousQueryStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/metadata/CreateContinuousQueryStatement.java
index fd6120b6da..9ff67cc071 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/metadata/CreateContinuousQueryStatement.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/metadata/CreateContinuousQueryStatement.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.db.mpp.plan.analyze.QueryType;
import org.apache.iotdb.db.mpp.plan.constant.StatementType;
import org.apache.iotdb.db.mpp.plan.statement.IConfigStatement;
import org.apache.iotdb.db.mpp.plan.statement.Statement;
+import org.apache.iotdb.db.mpp.plan.statement.StatementVisitor;
import org.apache.iotdb.db.mpp.plan.statement.crud.QueryStatement;
import java.util.Collections;
@@ -131,4 +132,9 @@ public class CreateContinuousQueryStatement extends Statement implements IConfig
public List<? extends PartialPath> getPaths() {
return Collections.emptyList();
}
+
+ @Override
+ public <R, C> R accept(StatementVisitor<R, C> visitor, C context) {
+ return visitor.visitCreateContinuousQuery(this, context); // hand this statement to its dedicated visit method
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/metadata/DropContinuousQueryStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/metadata/DropContinuousQueryStatement.java
index fe35672421..85bc2382e6 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/metadata/DropContinuousQueryStatement.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/metadata/DropContinuousQueryStatement.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.db.mpp.plan.analyze.QueryType;
import org.apache.iotdb.db.mpp.plan.constant.StatementType;
import org.apache.iotdb.db.mpp.plan.statement.IConfigStatement;
import org.apache.iotdb.db.mpp.plan.statement.Statement;
+import org.apache.iotdb.db.mpp.plan.statement.StatementVisitor;
import java.util.Collections;
import java.util.List;
@@ -51,4 +52,9 @@ public class DropContinuousQueryStatement extends Statement implements IConfigSt
public List<? extends PartialPath> getPaths() {
return Collections.emptyList();
}
+
+ @Override
+ public <R, C> R accept(StatementVisitor<R, C> visitor, C context) {
+ return visitor.visitDropContinuousQuery(this, context); // hand this statement to its dedicated visit method
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/metadata/ShowContinuousQueriesStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/metadata/ShowContinuousQueriesStatement.java
index 94edcdd8d5..413a4c88de 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/metadata/ShowContinuousQueriesStatement.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/metadata/ShowContinuousQueriesStatement.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.mpp.plan.analyze.QueryType;
import org.apache.iotdb.db.mpp.plan.constant.StatementType;
import org.apache.iotdb.db.mpp.plan.statement.IConfigStatement;
+import org.apache.iotdb.db.mpp.plan.statement.StatementVisitor;
import java.util.Collections;
import java.util.List;
@@ -43,4 +44,9 @@ public class ShowContinuousQueriesStatement extends ShowStatement implements ICo
public List<PartialPath> getPaths() {
return Collections.emptyList();
}
+
+ @Override
+ public <R, C> R accept(StatementVisitor<R, C> visitor, C context) {
+ return visitor.visitShowContinuousQueries(this, context); // hand this statement to its dedicated visit method
+ }
}