You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2022/04/28 00:51:24 UTC
[iotdb] branch master updated: [IOTDB-3019] Feature/show storage group (#5687)
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new b4611ed823 [IOTDB-3019] Feature/show storage group (#5687)
b4611ed823 is described below
commit b4611ed82319f4d5918e2f8bae4fd2ffc03822df
Author: ZhangHongYin <46...@users.noreply.github.com>
AuthorDate: Thu Apr 28 08:51:18 2022 +0800
[IOTDB-3019] Feature/show storage group (#5687)
---
.../db/auth/authorizer/ClusterAuthorizer.java | 4 +-
.../apache/iotdb/db/mpp/execution/Coordinator.java | 6 +-
.../db/mpp/execution/config/ConfigExecution.java | 8 +-
.../db/mpp/execution/config/ConfigTaskResult.java | 10 +-
.../db/mpp/execution/config/ConfigTaskVisitor.java | 9 ++
.../mpp/execution/config/SetStorageGroupTask.java | 4 +
.../mpp/execution/config/ShowStorageGroupTask.java | 113 +++++++++++++++++++++
...{ConfigStatement.java => IConfigStatement.java} | 4 +-
.../metadata/SetStorageGroupStatement.java | 5 +-
.../metadata/ShowStorageGroupStatement.java | 9 +-
.../db/mpp/sql/statement/sys/AuthorStatement.java | 5 +-
.../db/mpp/execution/ConfigExecutionTest.java | 11 +-
12 files changed, 170 insertions(+), 18 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/auth/authorizer/ClusterAuthorizer.java b/server/src/main/java/org/apache/iotdb/db/auth/authorizer/ClusterAuthorizer.java
index ed628258a3..85ed49a992 100644
--- a/server/src/main/java/org/apache/iotdb/db/auth/authorizer/ClusterAuthorizer.java
+++ b/server/src/main/java/org/apache/iotdb/db/auth/authorizer/ClusterAuthorizer.java
@@ -94,8 +94,8 @@ public class ClusterAuthorizer {
authorizerResp.getStatus());
future.setException(new StatementExecutionException(authorizerResp.getStatus()));
} else {
- // TODO: Construct tsBlock
- future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS, tsBlock));
+ // TODO: Construct result
+ future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS, tsBlock, null));
}
} catch (IoTDBConnectionException | BadNodeUrlException e) {
LOGGER.error("Failed to connect to config node.");
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/Coordinator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/Coordinator.java
index 5d34edd441..7e0c78b268 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/Coordinator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/Coordinator.java
@@ -30,7 +30,7 @@ import org.apache.iotdb.db.mpp.common.SessionInfo;
import org.apache.iotdb.db.mpp.execution.config.ConfigExecution;
import org.apache.iotdb.db.mpp.sql.analyze.IPartitionFetcher;
import org.apache.iotdb.db.mpp.sql.analyze.ISchemaFetcher;
-import org.apache.iotdb.db.mpp.sql.statement.ConfigStatement;
+import org.apache.iotdb.db.mpp.sql.statement.IConfigStatement;
import org.apache.iotdb.db.mpp.sql.statement.Statement;
import org.slf4j.Logger;
@@ -87,8 +87,8 @@ public class Coordinator {
MPPQueryContext queryContext,
IPartitionFetcher partitionFetcher,
ISchemaFetcher schemaFetcher) {
- if (statement instanceof ConfigStatement) {
- queryContext.setQueryType(((ConfigStatement) statement).getQueryType());
+ if (statement instanceof IConfigStatement) {
+ queryContext.setQueryType(((IConfigStatement) statement).getQueryType());
return new ConfigExecution(queryContext, statement, executor);
}
return new QueryExecution(
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/config/ConfigExecution.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/config/ConfigExecution.java
index fcb955010f..6ed015abf7 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/config/ConfigExecution.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/config/ConfigExecution.java
@@ -49,6 +49,7 @@ public class ConfigExecution implements IQueryExecution {
private final QueryStateMachine stateMachine;
private final SettableFuture<ConfigTaskResult> taskFuture;
private TsBlock resultSet;
+ private DatasetHeader datasetHeader;
private boolean resultSetConsumed;
private final IConfigTask task;
@@ -115,6 +116,7 @@ public class ConfigExecution implements IQueryExecution {
ConfigTaskResult taskResult = taskFuture.get();
TSStatusCode statusCode = taskResult.getStatusCode();
resultSet = taskResult.getResultSet();
+ datasetHeader = taskResult.getResultSetHeader();
String message =
statusCode == TSStatusCode.SUCCESS_STATUS ? "" : stateMachine.getFailureMessage();
return new ExecutionResult(context.getQueryId(), RpcUtils.getStatus(statusCode, message));
@@ -144,14 +146,12 @@ public class ConfigExecution implements IQueryExecution {
@Override
public int getOutputValueColumnCount() {
- // TODO
- return 0;
+ return datasetHeader.getColumnHeaders().size();
}
@Override
public DatasetHeader getDatasetHeader() {
- // TODO
- return null;
+ return datasetHeader;
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/config/ConfigTaskResult.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/config/ConfigTaskResult.java
index 7a8248b50e..ae6bd6bccb 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/config/ConfigTaskResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/config/ConfigTaskResult.java
@@ -19,20 +19,24 @@
package org.apache.iotdb.db.mpp.execution.config;
+import org.apache.iotdb.db.mpp.common.header.DatasetHeader;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
public class ConfigTaskResult {
private TSStatusCode statusCode;
private TsBlock resultSet;
+ private DatasetHeader resultSetHeader;
public ConfigTaskResult(TSStatusCode statusCode) {
this.statusCode = statusCode;
}
- public ConfigTaskResult(TSStatusCode statusCode, TsBlock resultSet) {
+ public ConfigTaskResult(
+ TSStatusCode statusCode, TsBlock resultSet, DatasetHeader resultSetHeader) {
this.statusCode = statusCode;
this.resultSet = resultSet;
+ this.resultSetHeader = resultSetHeader;
}
public TSStatusCode getStatusCode() {
@@ -50,4 +54,8 @@ public class ConfigTaskResult {
public void setResultSet(TsBlock resultSet) {
this.resultSet = resultSet;
}
+
+ public DatasetHeader getResultSetHeader() {
+ return resultSetHeader;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/config/ConfigTaskVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/config/ConfigTaskVisitor.java
index 7b06744fdb..6acef97ea9 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/config/ConfigTaskVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/config/ConfigTaskVisitor.java
@@ -22,20 +22,29 @@ package org.apache.iotdb.db.mpp.execution.config;
import org.apache.iotdb.db.mpp.sql.statement.Statement;
import org.apache.iotdb.db.mpp.sql.statement.StatementVisitor;
import org.apache.iotdb.db.mpp.sql.statement.metadata.SetStorageGroupStatement;
+import org.apache.iotdb.db.mpp.sql.statement.metadata.ShowStorageGroupStatement;
import org.apache.iotdb.db.mpp.sql.statement.sys.AuthorStatement;
import org.apache.iotdb.tsfile.exception.NotImplementedException;
public class ConfigTaskVisitor
extends StatementVisitor<IConfigTask, ConfigTaskVisitor.TaskContext> {
+ @Override
public IConfigTask visitStatement(Statement statement, TaskContext context) {
throw new NotImplementedException("ConfigTask is not implemented for: " + statement);
}
+ @Override
public IConfigTask visitSetStorageGroup(SetStorageGroupStatement statement, TaskContext context) {
return new SetStorageGroupTask(statement);
}
+ @Override
+ public IConfigTask visitShowStorageGroup(
+ ShowStorageGroupStatement statement, TaskContext context) {
+ return new ShowStorageGroupTask(statement);
+ }
+
@Override
public IConfigTask visitAuthor(AuthorStatement statement, TaskContext context) {
return new AuthorizerConfigTask(statement);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/config/SetStorageGroupTask.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/config/SetStorageGroupTask.java
index 81eae020fe..d23936cc48 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/config/SetStorageGroupTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/config/SetStorageGroupTask.java
@@ -76,6 +76,10 @@ public class SetStorageGroupTask implements IConfigTask {
} catch (IoTDBConnectionException | BadNodeUrlException e) {
LOGGER.error("Failed to connect to config node.");
future.setException(e);
+ } finally {
+ if (configNodeClient != null) {
+ configNodeClient.close();
+ }
}
} else {
try {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/config/ShowStorageGroupTask.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/config/ShowStorageGroupTask.java
new file mode 100644
index 0000000000..d7fdb2bf5a
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/config/ShowStorageGroupTask.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.execution.config;
+
+import org.apache.iotdb.commons.conf.IoTDBConstant;
+import org.apache.iotdb.commons.exception.BadNodeUrlException;
+import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupSchemaResp;
+import org.apache.iotdb.db.client.ConfigNodeClient;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.localconfignode.LocalConfigNode;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.mpp.common.header.ColumnHeader;
+import org.apache.iotdb.db.mpp.common.header.DatasetHeader;
+import org.apache.iotdb.db.mpp.sql.statement.metadata.ShowStorageGroupStatement;
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.iotdb.tsfile.utils.Binary;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+public class ShowStorageGroupTask implements IConfigTask {
+ private static final Logger LOGGER = LoggerFactory.getLogger(ShowStorageGroupTask.class);
+
+ private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+ private static final TSDataType[] RESOURCE_TYPES = {TSDataType.TEXT};
+
+ private ShowStorageGroupStatement showStorageGroupStatement;
+
+ public ShowStorageGroupTask(ShowStorageGroupStatement showStorageGroupStatement) {
+ this.showStorageGroupStatement = showStorageGroupStatement;
+ }
+
+ @Override
+ public ListenableFuture<ConfigTaskResult> execute() throws InterruptedException {
+ SettableFuture<ConfigTaskResult> future = SettableFuture.create();
+ List<String> storageGroupPaths = new ArrayList<>();
+ if (config.isClusterMode()) {
+ List<String> storageGroupPathPattern =
+ Arrays.asList(showStorageGroupStatement.getPathPattern().getNodes());
+ ConfigNodeClient client = null;
+ try {
+ client = new ConfigNodeClient();
+ TStorageGroupSchemaResp resp =
+ client.getMatchedStorageGroupSchemas(storageGroupPathPattern);
+ storageGroupPaths = new ArrayList<>(resp.getStorageGroupSchemaMap().keySet());
+ } catch (IoTDBConnectionException | BadNodeUrlException e) {
+ LOGGER.error("Failed to connect to config node.");
+ future.setException(e);
+ } finally {
+ if (client != null) {
+ client.close();
+ }
+ }
+ } else {
+ try {
+ List<PartialPath> partialPaths =
+ LocalConfigNode.getInstance()
+ .getMatchedStorageGroups(
+ showStorageGroupStatement.getPathPattern(),
+ showStorageGroupStatement.isPrefixPath());
+ for (PartialPath partialPath : partialPaths) {
+ storageGroupPaths.add(partialPath.getFullPath());
+ }
+ } catch (MetadataException e) {
+ future.setException(e);
+ }
+ }
+ // build TSBlock
+ TsBlockBuilder builder = new TsBlockBuilder(Collections.singletonList(TSDataType.TEXT));
+ for (String storageGroupPath : storageGroupPaths) {
+ // The Time column will be ignored by the setting of ColumnHeader.
+ // So we can put a meaningless value here
+ builder.getTimeColumnBuilder().writeLong(0L);
+ builder.getColumnBuilder(0).writeBinary(new Binary(storageGroupPath));
+ builder.declarePosition();
+ }
+ ColumnHeader storageGroupColumnHeader =
+ new ColumnHeader(IoTDBConstant.COLUMN_STORAGE_GROUP, TSDataType.TEXT);
+ DatasetHeader datasetHeader =
+ new DatasetHeader(Collections.singletonList(storageGroupColumnHeader), true);
+ future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS, builder.build(), datasetHeader));
+ return future;
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/ConfigStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/IConfigStatement.java
similarity index 92%
rename from server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/ConfigStatement.java
rename to server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/IConfigStatement.java
index 35d4f2c949..a162ac45bd 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/ConfigStatement.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/IConfigStatement.java
@@ -25,12 +25,12 @@ import org.apache.iotdb.db.mpp.sql.analyze.QueryType;
* ConfigStatement represents the statement which should be executed by ConfigNode All the
* statements which need to be transformed into IConfigTask should extend this class
*/
-public abstract class ConfigStatement extends Statement {
+public interface IConfigStatement {
/**
* Determine whether the operation to be performed is read or write
*
* @return QueryType
*/
- public abstract QueryType getQueryType();
+ QueryType getQueryType();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/metadata/SetStorageGroupStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/metadata/SetStorageGroupStatement.java
index 73c2f11cf9..f05d70b59c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/metadata/SetStorageGroupStatement.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/metadata/SetStorageGroupStatement.java
@@ -22,10 +22,11 @@ package org.apache.iotdb.db.mpp.sql.statement.metadata;
import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.mpp.sql.analyze.QueryType;
import org.apache.iotdb.db.mpp.sql.constant.StatementType;
-import org.apache.iotdb.db.mpp.sql.statement.ConfigStatement;
+import org.apache.iotdb.db.mpp.sql.statement.IConfigStatement;
+import org.apache.iotdb.db.mpp.sql.statement.Statement;
import org.apache.iotdb.db.mpp.sql.statement.StatementVisitor;
-public class SetStorageGroupStatement extends ConfigStatement {
+public class SetStorageGroupStatement extends Statement implements IConfigStatement {
private PartialPath storageGroupPath;
public SetStorageGroupStatement() {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/metadata/ShowStorageGroupStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/metadata/ShowStorageGroupStatement.java
index 226b7c15f4..efb57c57a1 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/metadata/ShowStorageGroupStatement.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/metadata/ShowStorageGroupStatement.java
@@ -20,6 +20,8 @@
package org.apache.iotdb.db.mpp.sql.statement.metadata;
import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.mpp.sql.analyze.QueryType;
+import org.apache.iotdb.db.mpp.sql.statement.IConfigStatement;
import org.apache.iotdb.db.mpp.sql.statement.StatementVisitor;
/**
@@ -29,7 +31,7 @@ import org.apache.iotdb.db.mpp.sql.statement.StatementVisitor;
*
* <p>SHOW STORAGE GROUP prefixPath?
*/
-public class ShowStorageGroupStatement extends ShowStatement {
+public class ShowStorageGroupStatement extends ShowStatement implements IConfigStatement {
private final PartialPath pathPattern;
@@ -46,4 +48,9 @@ public class ShowStorageGroupStatement extends ShowStatement {
public <R, C> R accept(StatementVisitor<R, C> visitor, C context) {
return visitor.visitShowStorageGroup(this, context);
}
+
+ @Override
+ public QueryType getQueryType() {
+ return QueryType.READ;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/sys/AuthorStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/sys/AuthorStatement.java
index f375bb2f45..671f36717c 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/sys/AuthorStatement.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/statement/sys/AuthorStatement.java
@@ -21,11 +21,12 @@ package org.apache.iotdb.db.mpp.sql.statement.sys;
import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.mpp.sql.analyze.QueryType;
import org.apache.iotdb.db.mpp.sql.constant.StatementType;
-import org.apache.iotdb.db.mpp.sql.statement.ConfigStatement;
+import org.apache.iotdb.db.mpp.sql.statement.IConfigStatement;
+import org.apache.iotdb.db.mpp.sql.statement.Statement;
import org.apache.iotdb.db.mpp.sql.statement.StatementVisitor;
import org.apache.iotdb.db.qp.logical.sys.AuthorOperator;
-public class AuthorStatement extends ConfigStatement {
+public class AuthorStatement extends Statement implements IConfigStatement {
private final AuthorOperator.AuthorType authorType;
private String userName;
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/ConfigExecutionTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/ConfigExecutionTest.java
index 58d376bf87..5184750ae2 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/ConfigExecutionTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/ConfigExecutionTest.java
@@ -22,11 +22,14 @@ package org.apache.iotdb.db.mpp.execution;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.db.mpp.common.MPPQueryContext;
import org.apache.iotdb.db.mpp.common.QueryId;
+import org.apache.iotdb.db.mpp.common.header.ColumnHeader;
+import org.apache.iotdb.db.mpp.common.header.DatasetHeader;
import org.apache.iotdb.db.mpp.execution.config.ConfigExecution;
import org.apache.iotdb.db.mpp.execution.config.ConfigTaskResult;
import org.apache.iotdb.db.mpp.execution.config.IConfigTask;
import org.apache.iotdb.db.mpp.sql.analyze.QueryType;
import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import org.apache.iotdb.tsfile.read.common.block.column.IntColumn;
import org.apache.iotdb.tsfile.read.common.block.column.TimeColumn;
@@ -36,6 +39,7 @@ import com.google.common.util.concurrent.SettableFuture;
import org.junit.Assert;
import org.junit.Test;
+import java.util.Collections;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
@@ -59,8 +63,13 @@ public class ConfigExecutionTest {
new TsBlock(
new TimeColumn(1, new long[] {0}),
new IntColumn(1, Optional.of(new boolean[] {false}), new int[] {1}));
+ DatasetHeader datasetHeader =
+ new DatasetHeader(
+ Collections.singletonList(new ColumnHeader("TestValue", TSDataType.INT32)), false);
IConfigTask task =
- () -> immediateFuture(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS, tsBlock));
+ () ->
+ immediateFuture(
+ new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS, tsBlock, datasetHeader));
ConfigExecution execution =
new ConfigExecution(genMPPQueryContext(), null, getExecutor(), task);
execution.start();