You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2022/05/09 15:42:00 UTC
[iotdb] branch master updated: [IOTDB-3119] extend set storage group (#5831)
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 1968719d30 [IOTDB-3119] extend set storage group (#5831)
1968719d30 is described below
commit 1968719d30b10dbf0da6dd68042ecff46e74e402
Author: ZhangHongYin <46...@users.noreply.github.com>
AuthorDate: Mon May 9 23:41:53 2022 +0800
[IOTDB-3119] extend set storage group (#5831)
---
.../org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4 | 6 ++-
.../antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4 | 11 +++++
.../confignode1conf/iotdb-confignode.properties | 8 ++--
.../confignode2conf/iotdb-confignode.properties | 8 ++--
.../confignode3conf/iotdb-confignode.properties | 8 ++--
.../iotdb/db/mpp/common/header/HeaderConstant.java | 10 ++++-
.../plan/execution/config/SetStorageGroupTask.java | 36 +++++++++++++---
.../execution/config/ShowStorageGroupTask.java | 50 ++++++++++++----------
.../iotdb/db/mpp/plan/parser/ASTVisitor.java | 18 ++++++++
.../metadata/SetStorageGroupStatement.java | 36 ++++++++++++++++
10 files changed, 150 insertions(+), 41 deletions(-)
diff --git a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4 b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
index 9103af1929..56ca7bd9b5 100644
--- a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
+++ b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
@@ -75,7 +75,11 @@ syncStatement
// Create Storage Group
setStorageGroup
- : SET STORAGE GROUP TO prefixPath
+ : SET STORAGE GROUP TO prefixPath (WITH storageGroupAttributeClause (COMMA storageGroupAttributeClause)*)?
+ ;
+
+storageGroupAttributeClause
+ : (TTL | SCHEMA_REPLICATION_FACTOR | DATA_REPLICATION_FACTOR | TIME_PARTITION_INTERVAL) '=' INTEGER_LITERAL
;
createStorageGroup
diff --git a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4 b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4
index 5c59eb24f0..f92fcbba5b 100644
--- a/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4
+++ b/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/SqlLexer.g4
@@ -836,6 +836,17 @@ DROP_CONTINUOUS_QUERY
: D R O P '_' C O N T I N U O U S '_' Q U E R Y
;
+SCHEMA_REPLICATION_FACTOR
+ : S C H E M A '_' R E P L I C A T I O N '_' F A C T O R
+ ;
+
+DATA_REPLICATION_FACTOR
+ : D A T A '_' R E P L I C A T I O N '_' F A C T O R
+ ;
+
+TIME_PARTITION_INTERVAL
+ : T I M E '_' P A R T I T I O N '_' I N T E R V A L
+ ;
/**
* 3. Operators
diff --git a/confignode/src/test/resources/confignode1conf/iotdb-confignode.properties b/confignode/src/test/resources/confignode1conf/iotdb-confignode.properties
index e0a3226f92..592edc8389 100644
--- a/confignode/src/test/resources/confignode1conf/iotdb-confignode.properties
+++ b/confignode/src/test/resources/confignode1conf/iotdb-confignode.properties
@@ -17,10 +17,10 @@
# under the License.
#
-config_node_rpc_address=0.0.0.0
-config_node_rpc_port=22277
-config_node_internal_port=22278
-config_node_group_address_list=0.0.0.0:22278,0.0.0.0:22280,0.0.0.0:22282
+rpc_address=0.0.0.0
+rpc_port=22277
+consensus_port=22278
+target_confignode=0.0.0.0:22277
config_node_consensus_protocol_class=org.apache.iotdb.consensus.ratis.RatisConsensus
data_node_consensus_protocol_class=org.apache.iotdb.consensus.ratis.RatisConsensus
diff --git a/confignode/src/test/resources/confignode2conf/iotdb-confignode.properties b/confignode/src/test/resources/confignode2conf/iotdb-confignode.properties
index 86c8452372..5f42ac6cb5 100644
--- a/confignode/src/test/resources/confignode2conf/iotdb-confignode.properties
+++ b/confignode/src/test/resources/confignode2conf/iotdb-confignode.properties
@@ -17,10 +17,10 @@
# under the License.
#
-config_node_rpc_address=0.0.0.0
-config_node_rpc_port=22279
-config_node_internal_port=22280
-config_node_group_address_list=0.0.0.0:22278,0.0.0.0:22280,0.0.0.0:22282
+rpc_address=0.0.0.0
+rpc_port=22279
+consensus_port=22280
+target_confignode=0.0.0.0:22277
config_node_consensus_protocol_class=org.apache.iotdb.consensus.ratis.RatisConsensus
data_node_consensus_protocol_class=org.apache.iotdb.consensus.ratis.RatisConsensus
diff --git a/confignode/src/test/resources/confignode3conf/iotdb-confignode.properties b/confignode/src/test/resources/confignode3conf/iotdb-confignode.properties
index f2da7f3495..bfc3305cc2 100644
--- a/confignode/src/test/resources/confignode3conf/iotdb-confignode.properties
+++ b/confignode/src/test/resources/confignode3conf/iotdb-confignode.properties
@@ -17,10 +17,10 @@
# under the License.
#
-config_node_rpc_address=0.0.0.0
-config_node_rpc_port=22281
-config_node_internal_port=22282
-config_node_group_address_list=0.0.0.0:22278,0.0.0.0:22280,0.0.0.0:22282
+rpc_address=0.0.0.0
+rpc_port=22281
+consensus_port=22282
+target_confignode=0.0.0.0:22277
config_node_consensus_protocol_class=org.apache.iotdb.consensus.ratis.RatisConsensus
data_node_consensus_protocol_class=org.apache.iotdb.consensus.ratis.RatisConsensus
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/common/header/HeaderConstant.java b/server/src/main/java/org/apache/iotdb/db/mpp/common/header/HeaderConstant.java
index e05e8669d9..744351b580 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/common/header/HeaderConstant.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/common/header/HeaderConstant.java
@@ -42,6 +42,9 @@ public class HeaderConstant {
public static final String COLUMN_IS_ALIGNED = "isAligned";
public static final String COLUMN_COUNT = "count";
public static final String COLUMN_TTL = "ttl";
+ public static final String COLUMN_SCHEMA_REPLICATION_FACTOR = "schema_replication_factor";
+ public static final String COLUMN_DATA_REPLICATION_FACTOR = "data_replication_factor";
+ public static final String COLUMN_TIME_PARTITION_INTERVAL = "time_partition_interval";
// column names for count statement
public static final String COLUMN_COLUMN = "column";
@@ -118,7 +121,12 @@ public class HeaderConstant {
true);
showStorageGroupHeader =
new DatasetHeader(
- Collections.singletonList(new ColumnHeader(COLUMN_STORAGE_GROUP, TSDataType.TEXT)),
+ Arrays.asList(
+ new ColumnHeader(COLUMN_STORAGE_GROUP, TSDataType.TEXT),
+ new ColumnHeader(COLUMN_TTL, TSDataType.INT64),
+ new ColumnHeader(COLUMN_SCHEMA_REPLICATION_FACTOR, TSDataType.INT32),
+ new ColumnHeader(COLUMN_DATA_REPLICATION_FACTOR, TSDataType.INT32),
+ new ColumnHeader(COLUMN_TIME_PARTITION_INTERVAL, TSDataType.INT64)),
true);
showTTLHeader =
new DatasetHeader(
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/SetStorageGroupTask.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/SetStorageGroupTask.java
index f3e5ed34e5..8bd63e5b0f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/SetStorageGroupTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/SetStorageGroupTask.java
@@ -37,6 +37,8 @@ import com.google.common.util.concurrent.SettableFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
+
public class SetStorageGroupTask implements IConfigTask {
private static final Logger LOGGER = LoggerFactory.getLogger(SetStorageGroupTask.class);
@@ -54,8 +56,7 @@ public class SetStorageGroupTask implements IConfigTask {
// TODO:(this judgement needs to be integrated in a high level framework)
if (config.isClusterMode()) {
// Construct request using statement
- TStorageGroupSchema storageGroupSchema = new TStorageGroupSchema();
- storageGroupSchema.setName(setStorageGroupStatement.getStorageGroupPath().getFullPath());
+ TStorageGroupSchema storageGroupSchema = constructStorageGroupSchema();
TSetStorageGroupReq req = new TSetStorageGroupReq(storageGroupSchema);
ConfigNodeClient configNodeClient = null;
try {
@@ -82,9 +83,12 @@ public class SetStorageGroupTask implements IConfigTask {
}
} else {
try {
- LocalConfigNode.getInstance()
- .setStorageGroup(setStorageGroupStatement.getStorageGroupPath());
- } catch (MetadataException e) {
+ LocalConfigNode localConfigNode = LocalConfigNode.getInstance();
+ localConfigNode.setStorageGroup(setStorageGroupStatement.getStorageGroupPath());
+ localConfigNode.setTTL(
+ setStorageGroupStatement.getStorageGroupPath(), setStorageGroupStatement.getTtl());
+ // Not in cluster mode: schemaReplicationFactor, dataReplicationFactor and timePartitionInterval are ignored
+ } catch (MetadataException | IOException e) {
future.setException(e);
}
future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
@@ -93,4 +97,26 @@ public class SetStorageGroupTask implements IConfigTask {
// If your operation is async, you can return the corresponding future directly.
return future;
}
+
+ /** Constructs the TStorageGroupSchema for the request, copying only the attributes set on the statement. */
+ private TStorageGroupSchema constructStorageGroupSchema() {
+ TStorageGroupSchema storageGroupSchema = new TStorageGroupSchema();
+ storageGroupSchema.setName(setStorageGroupStatement.getStorageGroupPath().getFullPath());
+ if (setStorageGroupStatement.getTtl() != null) {
+ storageGroupSchema.setTTL(setStorageGroupStatement.getTtl());
+ }
+ if (setStorageGroupStatement.getSchemaReplicationFactor() != null) {
+ storageGroupSchema.setSchemaReplicationFactor(
+ setStorageGroupStatement.getSchemaReplicationFactor());
+ }
+ if (setStorageGroupStatement.getDataReplicationFactor() != null) {
+ storageGroupSchema.setDataReplicationFactor(
+ setStorageGroupStatement.getDataReplicationFactor());
+ }
+ if (setStorageGroupStatement.getTimePartitionInterval() != null) {
+ storageGroupSchema.setTimePartitionInterval(
+ setStorageGroupStatement.getTimePartitionInterval());
+ }
+ return storageGroupSchema;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/ShowStorageGroupTask.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/ShowStorageGroupTask.java
index 853a0136a1..cd705d9cb1 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/ShowStorageGroupTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/execution/config/ShowStorageGroupTask.java
@@ -19,20 +19,20 @@
package org.apache.iotdb.db.mpp.plan.execution.config;
-import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupSchema;
import org.apache.iotdb.confignode.rpc.thrift.TStorageGroupSchemaResp;
import org.apache.iotdb.db.client.ConfigNodeClient;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.localconfignode.LocalConfigNode;
-import org.apache.iotdb.db.mpp.common.header.ColumnHeader;
+import org.apache.iotdb.db.metadata.mnode.IStorageGroupMNode;
import org.apache.iotdb.db.mpp.common.header.DatasetHeader;
+import org.apache.iotdb.db.mpp.common.header.HeaderConstant;
import org.apache.iotdb.db.mpp.plan.statement.metadata.ShowStorageGroupStatement;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.TSStatusCode;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
import org.apache.iotdb.tsfile.utils.Binary;
@@ -41,10 +41,10 @@ import com.google.common.util.concurrent.SettableFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
public class ShowStorageGroupTask implements IConfigTask {
private static final Logger LOGGER = LoggerFactory.getLogger(ShowStorageGroupTask.class);
@@ -60,7 +60,7 @@ public class ShowStorageGroupTask implements IConfigTask {
@Override
public ListenableFuture<ConfigTaskResult> execute() throws InterruptedException {
SettableFuture<ConfigTaskResult> future = SettableFuture.create();
- List<String> storageGroupPaths = new ArrayList<>();
+ Map<String, TStorageGroupSchema> storageGroupSchemaMap = new HashMap<>();
if (config.isClusterMode()) {
List<String> storageGroupPathPattern =
Arrays.asList(showStorageGroupStatement.getPathPattern().getNodes());
@@ -69,7 +69,7 @@ public class ShowStorageGroupTask implements IConfigTask {
client = new ConfigNodeClient();
TStorageGroupSchemaResp resp =
client.getMatchedStorageGroupSchemas(storageGroupPathPattern);
- storageGroupPaths = new ArrayList<>(resp.getStorageGroupSchemaMap().keySet());
+ storageGroupSchemaMap = resp.getStorageGroupSchemaMap();
} catch (IoTDBConnectionException e) {
LOGGER.error("Failed to connect to config node.");
future.setException(e);
@@ -80,31 +80,37 @@ public class ShowStorageGroupTask implements IConfigTask {
}
} else {
try {
+ LocalConfigNode localConfigNode = LocalConfigNode.getInstance();
List<PartialPath> partialPaths =
- LocalConfigNode.getInstance()
- .getMatchedStorageGroups(
- showStorageGroupStatement.getPathPattern(),
- showStorageGroupStatement.isPrefixPath());
- for (PartialPath partialPath : partialPaths) {
- storageGroupPaths.add(partialPath.getFullPath());
+ localConfigNode.getMatchedStorageGroups(
+ showStorageGroupStatement.getPathPattern(),
+ showStorageGroupStatement.isPrefixPath());
+ for (PartialPath storageGroupPath : partialPaths) {
+ IStorageGroupMNode storageGroupMNode =
+ localConfigNode.getStorageGroupNodeByPath(storageGroupPath);
+ String storageGroup = storageGroupMNode.getFullPath();
+ TStorageGroupSchema storageGroupSchema = storageGroupMNode.getStorageGroupSchema();
+ storageGroupSchemaMap.put(storageGroup, storageGroupSchema);
}
} catch (MetadataException e) {
future.setException(e);
}
}
// build TSBlock
- TsBlockBuilder builder = new TsBlockBuilder(Collections.singletonList(TSDataType.TEXT));
- for (String storageGroupPath : storageGroupPaths) {
- // The Time column will be ignored by the setting of ColumnHeader.
- // So we can put a meaningless value here
+ TsBlockBuilder builder =
+ new TsBlockBuilder(HeaderConstant.showStorageGroupHeader.getRespDataTypes());
+ for (Map.Entry<String, TStorageGroupSchema> entry : storageGroupSchemaMap.entrySet()) {
+ String storageGroup = entry.getKey();
+ TStorageGroupSchema storageGroupSchema = entry.getValue();
builder.getTimeColumnBuilder().writeLong(0L);
- builder.getColumnBuilder(0).writeBinary(new Binary(storageGroupPath));
+ builder.getColumnBuilder(0).writeBinary(new Binary(storageGroup));
+ builder.getColumnBuilder(1).writeLong(storageGroupSchema.getTTL());
+ builder.getColumnBuilder(2).writeInt(storageGroupSchema.getSchemaReplicationFactor());
+ builder.getColumnBuilder(3).writeInt(storageGroupSchema.getDataReplicationFactor());
+ builder.getColumnBuilder(4).writeLong(storageGroupSchema.getTimePartitionInterval());
builder.declarePosition();
}
- ColumnHeader storageGroupColumnHeader =
- new ColumnHeader(IoTDBConstant.COLUMN_STORAGE_GROUP, TSDataType.TEXT);
- DatasetHeader datasetHeader =
- new DatasetHeader(Collections.singletonList(storageGroupColumnHeader), true);
+ DatasetHeader datasetHeader = HeaderConstant.showStorageGroupHeader;
future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS, builder.build(), datasetHeader));
return future;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/ASTVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/ASTVisitor.java
index 75c5580d4d..7000d07459 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/ASTVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/parser/ASTVisitor.java
@@ -1400,6 +1400,24 @@ public class ASTVisitor extends IoTDBSqlParserBaseVisitor<Statement> {
SetStorageGroupStatement setStorageGroupStatement = new SetStorageGroupStatement();
PartialPath path = parsePrefixPath(ctx.prefixPath());
setStorageGroupStatement.setStorageGroupPath(path);
+ if (ctx.storageGroupAttributeClause() != null) {
+ for (IoTDBSqlParser.StorageGroupAttributeClauseContext attribute :
+ ctx.storageGroupAttributeClause()) {
+ if (attribute.TTL() != null) {
+ long ttl = Long.parseLong(attribute.INTEGER_LITERAL().getText());
+ setStorageGroupStatement.setTtl(ttl);
+ } else if (attribute.SCHEMA_REPLICATION_FACTOR() != null) {
+ int schemaReplicationFactor = Integer.parseInt(attribute.INTEGER_LITERAL().getText());
+ setStorageGroupStatement.setSchemaReplicationFactor(schemaReplicationFactor);
+ } else if (attribute.DATA_REPLICATION_FACTOR() != null) {
+ int dataReplicationFactor = Integer.parseInt(attribute.INTEGER_LITERAL().getText());
+ setStorageGroupStatement.setDataReplicationFactor(dataReplicationFactor);
+ } else if (attribute.TIME_PARTITION_INTERVAL() != null) {
+ long timePartitionInterval = Long.parseLong(attribute.INTEGER_LITERAL().getText());
+ setStorageGroupStatement.setTimePartitionInterval(timePartitionInterval);
+ }
+ }
+ }
return setStorageGroupStatement;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/metadata/SetStorageGroupStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/metadata/SetStorageGroupStatement.java
index 4a5ffbf376..9afdbea821 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/metadata/SetStorageGroupStatement.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/metadata/SetStorageGroupStatement.java
@@ -31,6 +31,10 @@ import java.util.List;
public class SetStorageGroupStatement extends Statement implements IConfigStatement {
private PartialPath storageGroupPath;
+ private Long ttl = null;
+ private Integer schemaReplicationFactor = null;
+ private Integer dataReplicationFactor = null;
+ private Long timePartitionInterval = null;
public SetStorageGroupStatement() {
super();
@@ -50,6 +54,22 @@ public class SetStorageGroupStatement extends Statement implements IConfigStatem
this.storageGroupPath = storageGroupPath;
}
+ public void setTtl(Long ttl) {
+ this.ttl = ttl;
+ }
+
+ public void setSchemaReplicationFactor(Integer schemaReplicationFactor) {
+ this.schemaReplicationFactor = schemaReplicationFactor;
+ }
+
+ public void setDataReplicationFactor(Integer dataReplicationFactor) {
+ this.dataReplicationFactor = dataReplicationFactor;
+ }
+
+ public void setTimePartitionInterval(Long timePartitionInterval) {
+ this.timePartitionInterval = timePartitionInterval;
+ }
+
@Override
public QueryType getQueryType() {
return QueryType.WRITE;
@@ -61,4 +81,20 @@ public class SetStorageGroupStatement extends Statement implements IConfigStatem
? Collections.singletonList(storageGroupPath)
: Collections.emptyList();
}
+
+ public Long getTtl() {
+ return ttl;
+ }
+
+ public Integer getSchemaReplicationFactor() {
+ return schemaReplicationFactor;
+ }
+
+ public Integer getDataReplicationFactor() {
+ return dataReplicationFactor;
+ }
+
+ public Long getTimePartitionInterval() {
+ return timePartitionInterval;
+ }
}