Posted to commits@doris.apache.org by li...@apache.org on 2022/06/15 03:00:12 UTC
[incubator-doris] branch master updated: Add storage policy for remote storage migration (#9997)
This is an automated email from the ASF dual-hosted git repository.
lihaopeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new c4d0fba713 Add storage policy for remote storage migration (#9997)
c4d0fba713 is described below
commit c4d0fba713934c7f441d52bd4e95d80660fe829f
Author: pengxiangyu <di...@163.com>
AuthorDate: Wed Jun 15 11:00:06 2022 +0800
Add storage policy for remote storage migration (#9997)
---
.../Create/CREATE-POLICY.md | 41 ++++-
.../Create/CREATE-POLICY.md | 41 ++++-
fe/fe-core/src/main/cup/sql_parser.cup | 13 ++
.../apache/doris/analysis/CreatePolicyStmt.java | 51 ++++--
.../org/apache/doris/analysis/DropPolicyStmt.java | 25 ++-
.../org/apache/doris/analysis/ShowPolicyStmt.java | 32 ++--
.../java/org/apache/doris/catalog/Resource.java | 6 +-
.../java/org/apache/doris/catalog/ResourceMgr.java | 3 +-
.../doris/catalog/StoragePolicyResource.java | 136 --------------
.../java/org/apache/doris/persist/EditLog.java | 9 +-
.../org/apache/doris/persist/gson/GsonUtils.java | 4 +-
.../main/java/org/apache/doris/policy/Policy.java | 18 +-
.../java/org/apache/doris/policy/PolicyMgr.java | 3 +
.../java/org/apache/doris/policy/RowPolicy.java | 15 ++
.../org/apache/doris/policy/StoragePolicy.java | 196 +++++++++++++++++++++
15 files changed, 397 insertions(+), 196 deletions(-)
diff --git a/docs/en/docs/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-POLICY.md b/docs/en/docs/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-POLICY.md
index 7202f2a30b..73e7b240c7 100644
--- a/docs/en/docs/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-POLICY.md
+++ b/docs/en/docs/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-POLICY.md
@@ -32,11 +32,13 @@ CREATE POLICY
### Description
-Create security policies and explain to view the rewritten SQL.
+Create policies, such as:
+1. Create security policies (ROW POLICY); use explain to view the rewritten SQL.
+2. Create a storage migration policy (STORAGE POLICY), used for converting between hot and cold data.
-#### Row security policy
-grammar:
+#### Grammar:
+1. ROW POLICY
```sql
CREATE ROW POLICY test_row_policy_1 ON test.table1
AS {RESTRICTIVE|PERMISSIVE} TO test USING (id in (1, 2));
@@ -49,6 +51,21 @@ illustrate:
- It is connected with AND between RESTRICTIVE AND PERMISSIVE
- It cannot be created for users root and admin
+2. STORAGE POLICY
+```sql
+CREATE STORAGE POLICY test_storage_policy_1
+PROPERTIES ("key"="value", ...);
+```
+illustrate:
+- PROPERTIES supports the following keys:
+ 1. storage_resource: name of the storage resource used by the policy.
+ 2. cooldown_datetime: the time at which a tablet is cooled down; cannot be set together with cooldown_ttl.
+ 3. cooldown_ttl: how long data stays hot, i.e. the time between the creation of a tablet and
+ its migration to cold data, formatted as:
+ 1d: 1 day
+ 1h: 1 hour
+ 50000: 50000 seconds
+
### Example
1. Create a set of row security policies
@@ -76,6 +93,24 @@ illustrate:
select * from (select * from table1 where c1 = 'a' and c2 = 'b' or c3 = 'c' or c4 = 'd')
```
+2. Create policy for storage
+ 1. Create policy on cooldown_datetime
+ ```sql
+ CREATE STORAGE POLICY testPolicy
+ PROPERTIES(
+ "storage_resource" = "s3",
+ "cooldown_datetime" = "2022-06-08 00:00:00"
+ );
+ ```
+ 2. Create policy on cooldown_ttl
+ ```sql
+ CREATE STORAGE POLICY testPolicy
+ PROPERTIES(
+ "storage_resource" = "s3",
+ "cooldown_ttl" = "1d"
+ );
+ ```
+
### Keywords
CREATE, POLICY
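Note (illustrative, not part of this commit): the cooldown_ttl formats documented above (1d, 1h, or a plain number of seconds) could be normalized to seconds roughly as sketched here. The class name CooldownTtlSketch and the method toSeconds are hypothetical; the commit itself stores cooldown_ttl as an unparsed string, and accepting only the lowercase suffixes d and h is an assumption drawn from the documented examples.

```java
// Hypothetical helper, not part of this commit: converts a cooldown_ttl
// value ("1d", "1h", or plain seconds such as "50000") into seconds.
final class CooldownTtlSketch {
    private CooldownTtlSketch() {}

    static long toSeconds(String ttl) {
        if (ttl == null || ttl.trim().isEmpty()) {
            throw new IllegalArgumentException("cooldown_ttl is empty");
        }
        String value = ttl.trim();
        char last = value.charAt(value.length() - 1);
        long unitSeconds = 1L;   // no suffix: the value is already in seconds
        String digits = value;
        if (last == 'd') {
            unitSeconds = 24L * 60 * 60;
            digits = value.substring(0, value.length() - 1);
        } else if (last == 'h') {
            unitSeconds = 60L * 60;
            digits = value.substring(0, value.length() - 1);
        }
        long amount;
        try {
            amount = Long.parseLong(digits);
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("invalid cooldown_ttl: " + ttl, e);
        }
        if (amount < 0) {
            throw new IllegalArgumentException("cooldown_ttl must not be negative: " + ttl);
        }
        // multiplyExact throws ArithmeticException instead of overflowing silently.
        return Math.multiplyExact(amount, unitSeconds);
    }
}
```

Rejecting malformed values at policy creation time would surface mistakes in the CREATE STORAGE POLICY statement itself, which may be easier to diagnose than a failure during a later migration.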
diff --git a/docs/zh-CN/docs/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-POLICY.md b/docs/zh-CN/docs/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-POLICY.md
index f17db5b5c5..b3b9c7f041 100644
--- a/docs/zh-CN/docs/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-POLICY.md
+++ b/docs/zh-CN/docs/sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-POLICY.md
@@ -32,16 +32,18 @@ CREATE POLICY
### Description
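-Create a security policy; explain can be used to view the rewritten SQL.
+Create a policy. The following kinds are supported:
-#### Row security policy
-Syntax:
+1. Create a security policy (ROW POLICY); explain can be used to view the rewritten SQL.
+2. Create a data migration policy (STORAGE POLICY), used for converting between hot and cold data.
+#### Syntax: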
+
+1. ROW POLICY
```sql
CREATE ROW POLICY test_row_policy_1 ON test.table1
AS {RESTRICTIVE|PERMISSIVE} TO test USING (id in (1, 2));
```
-
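Parameter description:
- filterType: RESTRICTIVE connects a set of policies with AND; PERMISSIVE connects a set of policies with OR
@@ -49,6 +51,20 @@ AS {RESTRICTIVE|PERMISSIVE} TO test USING (id in (1, 2));
- RESTRICTIVE and PERMISSIVE policies are connected to each other with AND
- Policies cannot be created for the root and admin users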
+2. STORAGE POLICY
+```sql
+CREATE STORAGE POLICY test_storage_policy_1
+PROPERTIES ("key"="value", ...);
+```
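+Parameter description:
+- The following keys are specified in PROPERTIES:
+ 1. storage_resource: the name of the storage resource used by the policy.
+ 2. cooldown_datetime: the time at which hot data becomes cold data; cannot be set together with cooldown_ttl.
+ 3. cooldown_ttl: how long data stays hot. Counted from the creation of the tablet; after the specified time the data becomes cold data. Supported formats:
+ 1d: 1 day
+ 1h: 1 hour
+ 50000: 50000 seconds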
+
### Example
1. Create a set of row security policies
@@ -75,6 +91,23 @@ AS {RESTRICTIVE|PERMISSIVE} TO test USING (id in (1, 2));
```sql
select * from (select * from table1 where c1 = 'a' and c2 = 'b' or c3 = 'c' or c4 = 'd')
```
+2. Create a data migration policy
+ 1. Create a data migration policy by specifying the data cooldown time
+ ```sql
+ CREATE STORAGE POLICY testPolicy
+ PROPERTIES(
+ "storage_resource" = "s3",
+ "cooldown_datetime" = "2022-06-08 00:00:00"
+ );
+ ```
+ 2. Create a data migration policy by specifying how long data stays hot
+ ```sql
+ CREATE STORAGE POLICY testPolicy
+ PROPERTIES(
+ "storage_resource" = "s3",
+ "cooldown_ttl" = "1d"
+ );
+ ```
### Keywords
diff --git a/fe/fe-core/src/main/cup/sql_parser.cup b/fe/fe-core/src/main/cup/sql_parser.cup
index 281c4c884f..aecbef4687 100644
--- a/fe/fe-core/src/main/cup/sql_parser.cup
+++ b/fe/fe-core/src/main/cup/sql_parser.cup
@@ -1361,6 +1361,11 @@ create_stmt ::=
{:
RESULT = new CreatePolicyStmt(PolicyTypeEnum.ROW, ifNotExists, policyName, tbl, filterType, user, wherePredicate);
:}
+ /* storage policy */
+ | KW_CREATE KW_STORAGE KW_POLICY opt_if_not_exists:ifNotExists ident:policyName opt_properties:properties
+ {:
+ RESULT = new CreatePolicyStmt(PolicyTypeEnum.STORAGE, ifNotExists, policyName, properties);
+ :}
;
channel_desc_list ::=
@@ -2074,6 +2079,10 @@ drop_stmt ::=
{:
RESULT = new DropPolicyStmt(PolicyTypeEnum.ROW, ifExists, policyName, tbl, user);
:}
+ | KW_DROP KW_STORAGE KW_POLICY opt_if_exists:ifExists ident:policyName
+ {:
+ RESULT = new DropPolicyStmt(PolicyTypeEnum.STORAGE, ifExists, policyName, null, null);
+ :}
;
// Recover statement
@@ -2574,6 +2583,10 @@ show_stmt ::=
{:
RESULT = new ShowPolicyStmt(PolicyTypeEnum.ROW, null);
:}
+ | KW_SHOW KW_STORAGE KW_POLICY
+ {:
+ RESULT = new ShowPolicyStmt(PolicyTypeEnum.STORAGE, null);
+ :}
;
show_param ::=
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreatePolicyStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreatePolicyStmt.java
index 3f8c80c9cf..e73d4c8f7f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreatePolicyStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreatePolicyStmt.java
@@ -21,6 +21,7 @@ import org.apache.doris.catalog.Catalog;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.UserException;
+import org.apache.doris.common.util.PrintableMap;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.policy.FilterType;
import org.apache.doris.policy.PolicyTypeEnum;
@@ -28,6 +29,8 @@ import org.apache.doris.qe.ConnectContext;
import lombok.Getter;
+import java.util.Map;
+
/**
* Create policy statement.
* syntax:
@@ -45,17 +48,20 @@ public class CreatePolicyStmt extends DdlStmt {
private final String policyName;
@Getter
- private final TableName tableName;
+ private TableName tableName = null;
@Getter
- private final FilterType filterType;
+ private FilterType filterType = null;
@Getter
- private final UserIdentity user;
+ private UserIdentity user = null;
@Getter
private Expr wherePredicate;
+ @Getter
+ private Map<String, String> properties;
+
/**
* Use for cup.
**/
@@ -70,14 +76,31 @@ public class CreatePolicyStmt extends DdlStmt {
this.wherePredicate = wherePredicate;
}
+ /**
+ * Use for cup.
+ */
+ public CreatePolicyStmt(PolicyTypeEnum type, boolean ifNotExists, String policyName,
+ Map<String, String> properties) {
+ this.type = type;
+ this.ifNotExists = ifNotExists;
+ this.policyName = policyName;
+ this.properties = properties;
+ }
+
@Override
public void analyze(Analyzer analyzer) throws UserException {
super.analyze(analyzer);
- tableName.analyze(analyzer);
- user.analyze(analyzer.getClusterName());
- if (user.isRootUser() || user.isAdminUser()) {
- ErrorReport.reportAnalysisException(ErrorCode.ERR_TABLEACCESS_DENIED_ERROR, "CreatePolicyStmt",
- user.getQualifiedUser(), user.getHost(), tableName.getTbl());
+ switch (type) {
+ case STORAGE:
+ break;
+ case ROW:
+ default:
+ tableName.analyze(analyzer);
+ user.analyze(analyzer.getClusterName());
+ if (user.isRootUser() || user.isAdminUser()) {
+ ErrorReport.reportAnalysisException(ErrorCode.ERR_TABLEACCESS_DENIED_ERROR, "CreatePolicyStmt",
+ user.getQualifiedUser(), user.getHost(), tableName.getTbl());
+ }
}
// check auth
if (!Catalog.getCurrentCatalog().getAuth().checkGlobalPriv(ConnectContext.get(), PrivPredicate.ADMIN)) {
@@ -92,8 +115,16 @@ public class CreatePolicyStmt extends DdlStmt {
if (ifNotExists) {
sb.append("IF NOT EXISTS");
}
- sb.append(policyName).append(" ON ").append(tableName.toSql()).append(" AS ").append(filterType)
- .append(" TO ").append(user.getQualifiedUser()).append(" USING ").append(wherePredicate.toSql());
+ sb.append(policyName);
+ switch (type) {
+ case STORAGE:
+ sb.append(" PROPERTIES(").append(new PrintableMap<>(properties, " = ", true, false)).append(")");
+ break;
+ case ROW:
+ default:
+ sb.append(" ON ").append(tableName.toSql()).append(" AS ").append(filterType)
+ .append(" TO ").append(user.getQualifiedUser()).append(" USING ").append(wherePredicate.toSql());
+ }
return sb.toString();
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/DropPolicyStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/DropPolicyStmt.java
index 87fb616c0a..c1fdd13aad 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/DropPolicyStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/DropPolicyStmt.java
@@ -54,9 +54,15 @@ public class DropPolicyStmt extends DdlStmt {
@Override
public void analyze(Analyzer analyzer) throws UserException {
super.analyze(analyzer);
- tableName.analyze(analyzer);
- if (user != null) {
- user.analyze(analyzer.getClusterName());
+ switch (type) {
+ case STORAGE:
+ break;
+ case ROW:
+ default:
+ tableName.analyze(analyzer);
+ if (user != null) {
+ user.analyze(analyzer.getClusterName());
+ }
}
// check auth
if (!Catalog.getCurrentCatalog().getAuth().checkGlobalPriv(ConnectContext.get(), PrivPredicate.ADMIN)) {
@@ -71,9 +77,16 @@ public class DropPolicyStmt extends DdlStmt {
if (ifExists) {
sb.append("IF EXISTS ");
}
- sb.append(policyName).append(" ON ").append(tableName.toSql());
- if (user != null) {
- sb.append(" FOR ").append(user.getQualifiedUser());
+ sb.append(policyName);
+ switch (type) {
+ case STORAGE:
+ break;
+ case ROW:
+ default:
+ sb.append(" ON ").append(tableName.toSql());
+ if (user != null) {
+ sb.append(" FOR ").append(user.getQualifiedUser());
+ }
}
return sb.toString();
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowPolicyStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowPolicyStmt.java
index f450952ba1..c7b84bcef7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowPolicyStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowPolicyStmt.java
@@ -25,6 +25,8 @@ import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.UserException;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.policy.PolicyTypeEnum;
+import org.apache.doris.policy.RowPolicy;
+import org.apache.doris.policy.StoragePolicy;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.ShowResultSetMetaData;
@@ -48,18 +50,6 @@ public class ShowPolicyStmt extends ShowStmt {
this.user = user;
}
- private static final ShowResultSetMetaData ROW_META_DATA =
- ShowResultSetMetaData.builder()
- .addColumn(new Column("PolicyName", ScalarType.createVarchar(100)))
- .addColumn(new Column("DbName", ScalarType.createVarchar(100)))
- .addColumn(new Column("TableName", ScalarType.createVarchar(100)))
- .addColumn(new Column("Type", ScalarType.createVarchar(20)))
- .addColumn(new Column("FilterType", ScalarType.createVarchar(20)))
- .addColumn(new Column("WherePredicate", ScalarType.createVarchar(65535)))
- .addColumn(new Column("User", ScalarType.createVarchar(20)))
- .addColumn(new Column("OriginStmt", ScalarType.createVarchar(65535)))
- .build();
-
@Override
public void analyze(Analyzer analyzer) throws UserException {
super.analyze(analyzer);
@@ -76,14 +66,26 @@ public class ShowPolicyStmt extends ShowStmt {
public String toSql() {
StringBuilder sb = new StringBuilder();
sb.append("SHOW ").append(type).append(" POLICY");
- if (user != null) {
- sb.append(" FOR ").append(user);
+ switch (type) {
+ case STORAGE:
+ break;
+ case ROW:
+ default:
+ if (user != null) {
+ sb.append(" FOR ").append(user);
+ }
}
return sb.toString();
}
@Override
public ShowResultSetMetaData getMetaData() {
- return ROW_META_DATA;
+ switch (type) {
+ case STORAGE:
+ return StoragePolicy.STORAGE_META_DATA;
+ case ROW:
+ default:
+ return RowPolicy.ROW_META_DATA;
+ }
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Resource.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Resource.java
index 726139d4c4..d2c2e7f48f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Resource.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Resource.java
@@ -44,8 +44,7 @@ public abstract class Resource implements Writable {
UNKNOWN,
SPARK,
ODBC_CATALOG,
- S3,
- STORAGE_POLICY;
+ S3;
public static ResourceType fromString(String resourceType) {
for (ResourceType type : ResourceType.values()) {
@@ -96,9 +95,6 @@ public abstract class Resource implements Writable {
case S3:
resource = new S3Resource(name);
break;
- case STORAGE_POLICY:
- resource = new StoragePolicyResource(name);
- break;
default:
throw new DdlException("Unknown resource type: " + type);
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/ResourceMgr.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/ResourceMgr.java
index 19d39e4dc5..d7d57272ab 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/ResourceMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/ResourceMgr.java
@@ -72,8 +72,7 @@ public class ResourceMgr implements Writable {
public void createResource(CreateResourceStmt stmt) throws DdlException {
if (stmt.getResourceType() != ResourceType.SPARK
&& stmt.getResourceType() != ResourceType.ODBC_CATALOG
- && stmt.getResourceType() != ResourceType.S3
- && stmt.getResourceType() != ResourceType.STORAGE_POLICY) {
+ && stmt.getResourceType() != ResourceType.S3) {
throw new DdlException("Only support SPARK, ODBC_CATALOG and REMOTE_STORAGE resource.");
}
Resource resource = Resource.fromStmt(stmt);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/StoragePolicyResource.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/StoragePolicyResource.java
deleted file mode 100644
index 285be637f7..0000000000
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/StoragePolicyResource.java
+++ /dev/null
@@ -1,136 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.catalog;
-
-import org.apache.doris.common.AnalysisException;
-import org.apache.doris.common.DdlException;
-import org.apache.doris.common.proc.BaseProcResult;
-
-import com.google.common.base.Preconditions;
-import com.google.common.base.Strings;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.gson.annotations.SerializedName;
-
-import java.util.Map;
-
-/**
- * Policy resource for olap table.
- * Syntax:
- * CREATE RESOURCE "storage_policy_name"
- * PROPERTIES(
- * "type"="storage_policy",
- * "cooldown_datetime" = "2022-06-01", // time when data is transferred to medium
- * "cooldown_ttl" = "1h", // data is transferred to medium after 1 hour
- * "s3_resource" = "my_s3" // point to a s3 resource
- * );
- */
-public class StoragePolicyResource extends Resource {
- // required
- private static final String STORAGE_RESOURCE = "storage_resource";
- // optional
- private static final String COOLDOWN_DATETIME = "cooldown_datetime";
- private static final String COOLDOWN_TTL = "cooldown_ttl";
-
- private static final String DEFAULT_COOLDOWN_DATETIME = "9999-01-01 00:00:00";
- private static final String DEFAULT_COOLDOWN_TTL = "1h";
-
- @SerializedName(value = "properties")
- private Map<String, String> properties;
-
- public StoragePolicyResource(String name) {
- this(name, Maps.newHashMap());
- }
-
- public StoragePolicyResource(String name, Map<String, String> properties) {
- super(name, ResourceType.STORAGE_POLICY);
- this.properties = properties;
- }
-
- public String getProperty(String propertyKey) {
- return properties.get(propertyKey);
- }
-
- @Override
- protected void setProperties(Map<String, String> properties) throws DdlException {
- Preconditions.checkState(properties != null);
- this.properties = properties;
- // check properties
- // required
- checkRequiredProperty(STORAGE_RESOURCE);
- // optional
- checkOptionalProperty(COOLDOWN_DATETIME, DEFAULT_COOLDOWN_DATETIME);
- checkOptionalProperty(COOLDOWN_TTL, DEFAULT_COOLDOWN_TTL);
- if (properties.containsKey(COOLDOWN_DATETIME) && properties.containsKey(COOLDOWN_TTL)
- && !properties.get(COOLDOWN_DATETIME).isEmpty() && !properties.get(COOLDOWN_TTL).isEmpty()) {
- throw new DdlException("Only one of [" + COOLDOWN_DATETIME + "] and [" + COOLDOWN_TTL
- + "] can be specified in properties.");
- }
- }
-
- private void checkRequiredProperty(String propertyKey) throws DdlException {
- String value = properties.get(propertyKey);
-
- if (Strings.isNullOrEmpty(value)) {
- throw new DdlException("Missing [" + propertyKey + "] in properties.");
- }
- }
-
- private void checkOptionalProperty(String propertyKey, String defaultValue) {
- this.properties.putIfAbsent(propertyKey, defaultValue);
- }
-
- @Override
- public void modifyProperties(Map<String, String> properties) throws DdlException {
- if (properties.containsKey(COOLDOWN_DATETIME) && properties.containsKey(COOLDOWN_TTL)
- && !properties.get(COOLDOWN_DATETIME).isEmpty() && !properties.get(COOLDOWN_TTL).isEmpty()) {
- throw new DdlException("Only one of [" + COOLDOWN_DATETIME + "] and [" + COOLDOWN_TTL
- + "] can be specified in properties.");
- }
- // modify properties
- replaceIfEffectiveValue(this.properties, STORAGE_RESOURCE, properties.get(STORAGE_RESOURCE));
- replaceIfEffectiveValue(this.properties, COOLDOWN_DATETIME, properties.get(COOLDOWN_DATETIME));
- replaceIfEffectiveValue(this.properties, COOLDOWN_TTL, properties.get(COOLDOWN_TTL));
- }
-
- @Override
- public void checkProperties(Map<String, String> properties) throws AnalysisException {
- // check properties
- Map<String, String> copiedProperties = Maps.newHashMap(properties);
- copiedProperties.remove(STORAGE_RESOURCE);
- copiedProperties.remove(COOLDOWN_DATETIME);
- copiedProperties.remove(COOLDOWN_TTL);
-
- if (!copiedProperties.isEmpty()) {
- throw new AnalysisException("Unknown policy resource properties: " + copiedProperties);
- }
- }
-
- @Override
- public Map<String, String> getCopiedProperties() {
- return Maps.newHashMap(properties);
- }
-
- @Override
- protected void getProcNodeData(BaseProcResult result) {
- String lowerCaseType = type.name().toLowerCase();
- for (Map.Entry<String, String> entry : properties.entrySet()) {
- result.addRow(Lists.newArrayList(name, lowerCaseType, entry.getKey(), entry.getValue()));
- }
- }
-}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
index de497deb75..ca958128ab 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
@@ -65,7 +65,6 @@ import org.apache.doris.mysql.privilege.UserPropertyInfo;
import org.apache.doris.plugin.PluginInfo;
import org.apache.doris.policy.DropPolicyLog;
import org.apache.doris.policy.Policy;
-import org.apache.doris.policy.RowPolicy;
import org.apache.doris.system.Backend;
import org.apache.doris.system.Frontend;
import org.apache.doris.transaction.TransactionState;
@@ -813,7 +812,7 @@ public class EditLog {
break;
}
case OperationType.OP_CREATE_POLICY: {
- RowPolicy log = (RowPolicy) journal.getData();
+ Policy log = (Policy) journal.getData();
catalog.getPolicyMgr().replayCreate(log);
break;
}
@@ -1426,11 +1425,7 @@ public class EditLog {
}
public void logCreatePolicy(Policy policy) {
- if (policy instanceof RowPolicy) {
- logEdit(OperationType.OP_CREATE_POLICY, policy);
- } else {
- LOG.error("invalid policy: " + policy.getType().name());
- }
+ logEdit(OperationType.OP_CREATE_POLICY, policy);
}
public void logDropPolicy(DropPolicyLog log) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java b/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java
index 08a61c5bb0..1fcb098b96 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java
@@ -37,6 +37,7 @@ import org.apache.doris.load.sync.SyncJob;
import org.apache.doris.load.sync.canal.CanalSyncJob;
import org.apache.doris.policy.Policy;
import org.apache.doris.policy.RowPolicy;
+import org.apache.doris.policy.StoragePolicy;
import com.google.common.base.Preconditions;
import com.google.common.collect.ArrayListMultimap;
@@ -136,7 +137,8 @@ public class GsonUtils {
// runtime adapter for class "Policy"
private static RuntimeTypeAdapterFactory<Policy> policyTypeAdapterFactory = RuntimeTypeAdapterFactory
.of(Policy.class, "clazz")
- .registerSubtype(RowPolicy.class, RowPolicy.class.getSimpleName());
+ .registerSubtype(RowPolicy.class, RowPolicy.class.getSimpleName())
+ .registerSubtype(StoragePolicy.class, StoragePolicy.class.getSimpleName());
// the builder of GSON instance.
// Add any other adapters if necessary.
diff --git a/fe/fe-core/src/main/java/org/apache/doris/policy/Policy.java b/fe/fe-core/src/main/java/org/apache/doris/policy/Policy.java
index 894bc463ad..332b9a0026 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/policy/Policy.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/policy/Policy.java
@@ -77,16 +77,20 @@ public abstract class Policy implements Writable, GsonPostProcessable {
* Trans stmt to Policy.
**/
public static Policy fromCreateStmt(CreatePolicyStmt stmt) throws AnalysisException {
- String curDb = stmt.getTableName().getDb();
- if (curDb == null) {
- curDb = ConnectContext.get().getDatabase();
- }
- Database db = Catalog.getCurrentCatalog().getDbOrAnalysisException(curDb);
- UserIdentity userIdent = stmt.getUser();
- userIdent.analyze(ConnectContext.get().getClusterName());
switch (stmt.getType()) {
+ case STORAGE:
+ StoragePolicy storagePolicy = new StoragePolicy(stmt.getType(), stmt.getPolicyName());
+ storagePolicy.init(stmt.getProperties());
+ return storagePolicy;
case ROW:
default:
+ String curDb = stmt.getTableName().getDb();
+ if (curDb == null) {
+ curDb = ConnectContext.get().getDatabase();
+ }
+ Database db = Catalog.getCurrentCatalog().getDbOrAnalysisException(curDb);
+ UserIdentity userIdent = stmt.getUser();
+ userIdent.analyze(ConnectContext.get().getClusterName());
Table table = db.getTableOrAnalysisException(stmt.getTableName().getTbl());
return new RowPolicy(stmt.getType(), stmt.getPolicyName(), db.getId(), userIdent,
stmt.getOrigStmt().originStmt, table.getId(), stmt.getFilterType(),
diff --git a/fe/fe-core/src/main/java/org/apache/doris/policy/PolicyMgr.java b/fe/fe-core/src/main/java/org/apache/doris/policy/PolicyMgr.java
index 0746e191b1..ab567117aa 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/policy/PolicyMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/policy/PolicyMgr.java
@@ -208,6 +208,9 @@ public class PolicyMgr implements Writable {
long currentDbId = ConnectContext.get().getCurrentDbId();
Policy checkedPolicy = null;
switch (showStmt.getType()) {
+ case STORAGE:
+ checkedPolicy = new StoragePolicy();
+ break;
case ROW:
default:
RowPolicy rowPolicy = new RowPolicy();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/policy/RowPolicy.java b/fe/fe-core/src/main/java/org/apache/doris/policy/RowPolicy.java
index ab4b0a74c6..b989583a4e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/policy/RowPolicy.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/policy/RowPolicy.java
@@ -23,10 +23,13 @@ import org.apache.doris.analysis.SqlParser;
import org.apache.doris.analysis.SqlScanner;
import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.catalog.Catalog;
+import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.ScalarType;
import org.apache.doris.catalog.Table;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.util.SqlParserUtils;
+import org.apache.doris.qe.ShowResultSetMetaData;
import com.google.common.collect.Lists;
import com.google.gson.annotations.SerializedName;
@@ -45,6 +48,18 @@ import java.util.List;
@Data
public class RowPolicy extends Policy {
+ public static final ShowResultSetMetaData ROW_META_DATA =
+ ShowResultSetMetaData.builder()
+ .addColumn(new Column("PolicyName", ScalarType.createVarchar(100)))
+ .addColumn(new Column("DbName", ScalarType.createVarchar(100)))
+ .addColumn(new Column("TableName", ScalarType.createVarchar(100)))
+ .addColumn(new Column("Type", ScalarType.createVarchar(20)))
+ .addColumn(new Column("FilterType", ScalarType.createVarchar(20)))
+ .addColumn(new Column("WherePredicate", ScalarType.createVarchar(65535)))
+ .addColumn(new Column("User", ScalarType.createVarchar(20)))
+ .addColumn(new Column("OriginStmt", ScalarType.createVarchar(65535)))
+ .build();
+
private static final Logger LOG = LogManager.getLogger(RowPolicy.class);
/**
diff --git a/fe/fe-core/src/main/java/org/apache/doris/policy/StoragePolicy.java b/fe/fe-core/src/main/java/org/apache/doris/policy/StoragePolicy.java
new file mode 100644
index 0000000000..c5a48c4a7e
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/policy/StoragePolicy.java
@@ -0,0 +1,196 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.policy;
+
+import org.apache.doris.catalog.Catalog;
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.ScalarType;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.qe.ShowResultSetMetaData;
+
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+import com.google.gson.annotations.SerializedName;
+import lombok.Data;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.io.IOException;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Save policy for storage migration.
+ **/
+@Data
+public class StoragePolicy extends Policy {
+
+ public static final ShowResultSetMetaData STORAGE_META_DATA =
+ ShowResultSetMetaData.builder()
+ .addColumn(new Column("PolicyName", ScalarType.createVarchar(100)))
+ .addColumn(new Column("Type", ScalarType.createVarchar(20)))
+ .addColumn(new Column("StorageResource", ScalarType.createVarchar(20)))
+ .addColumn(new Column("CooldownDatetime", ScalarType.createVarchar(20)))
+ .addColumn(new Column("CooldownTtl", ScalarType.createVarchar(20)))
+ .addColumn(new Column("properties", ScalarType.createVarchar(65535)))
+ .build();
+
+ private static final Logger LOG = LogManager.getLogger(StoragePolicy.class);
+ // required
+ private static final String STORAGE_RESOURCE = "storage_resource";
+ // optional
+ private static final String COOLDOWN_DATETIME = "cooldown_datetime";
+ private static final String COOLDOWN_TTL = "cooldown_ttl";
+
+ @SerializedName(value = "storageResource")
+ private String storageResource = null;
+
+ @SerializedName(value = "cooldownDatetime")
+ private Date cooldownDatetime = null;
+
+ @SerializedName(value = "cooldownTtl")
+ private String cooldownTtl = null;
+
+ private Map<String, String> props;
+
+ public StoragePolicy() {}
+
+ /**
+ * Policy for Storage Migration.
+ *
+ * @param type PolicyType
+ * @param policyName policy name
+ * @param storageResource resource name for storage
+ * @param cooldownDatetime cool down time
+ * @param cooldownTtl cool down time cost after partition is created
+ */
+ public StoragePolicy(final PolicyTypeEnum type, final String policyName, final String storageResource,
+ final Date cooldownDatetime, final String cooldownTtl) {
+ super(type, policyName);
+ this.storageResource = storageResource;
+ this.cooldownDatetime = cooldownDatetime;
+ this.cooldownTtl = cooldownTtl;
+ }
+
+ /**
+ * Policy for Storage Migration.
+ *
+ * @param type PolicyType
+ * @param policyName policy name
+ */
+ public StoragePolicy(final PolicyTypeEnum type, final String policyName) {
+ super(type, policyName);
+ }
+
+ /**
+ * Init props for storage policy.
+ *
+ * @param props properties for storage policy
+ */
+ public void init(final Map<String, String> props) throws AnalysisException {
+ if (props == null) {
+ throw new AnalysisException("properties config is required");
+ }
+ checkRequiredProperty(props, STORAGE_RESOURCE);
+ this.storageResource = props.get(STORAGE_RESOURCE);
+ boolean hasCooldownDatetime = false;
+ boolean hasCooldownTtl = false;
+ if (props.containsKey(COOLDOWN_DATETIME)) {
+ hasCooldownDatetime = true;
+ SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+ try {
+ this.cooldownDatetime = df.parse(props.get(COOLDOWN_DATETIME));
+ } catch (ParseException e) {
+ throw new AnalysisException(String.format("cooldown_datetime format error: %s",
+ props.get(COOLDOWN_DATETIME)), e);
+ }
+ }
+ if (props.containsKey(COOLDOWN_TTL)) {
+ hasCooldownTtl = true;
+ this.cooldownTtl = props.get(COOLDOWN_TTL);
+ }
+ if (hasCooldownDatetime && hasCooldownTtl) {
+ throw new AnalysisException(COOLDOWN_DATETIME + " and " + COOLDOWN_TTL + " can't be set together.");
+ }
+ if (!hasCooldownDatetime && !hasCooldownTtl) {
+ throw new AnalysisException(COOLDOWN_DATETIME + " or " + COOLDOWN_TTL + " must be set");
+ }
+ if (!Catalog.getCurrentCatalog().getResourceMgr().containsResource(this.storageResource)) {
+ throw new AnalysisException("storage resource doesn't exist: " + this.storageResource);
+ }
+ }
+
+ /**
+ * Use for SHOW POLICY.
+ **/
+ public List<String> getShowInfo() throws AnalysisException {
+ String props = "";
+ if (Catalog.getCurrentCatalog().getResourceMgr().containsResource(this.storageResource)) {
+ props = Catalog.getCurrentCatalog().getResourceMgr().getResource(this.storageResource).toString();
+ }
+ SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+ return Lists.newArrayList(this.policyName, this.type.name(), this.storageResource,
+ df.format(this.cooldownDatetime), this.cooldownTtl, props);
+ }
+
+ @Override
+ public void gsonPostProcess() throws IOException {}
+
+ @Override
+ public StoragePolicy clone() {
+ return new StoragePolicy(this.type, this.policyName, this.storageResource,
+ this.cooldownDatetime, this.cooldownTtl);
+ }
+
+ @Override
+ public boolean matchPolicy(Policy checkedPolicyCondition) {
+ if (!(checkedPolicyCondition instanceof StoragePolicy)) {
+ return false;
+ }
+ StoragePolicy storagePolicy = (StoragePolicy) checkedPolicyCondition;
+ return checkMatched(storagePolicy.getType(), storagePolicy.getPolicyName());
+ }
+
+ @Override
+ public boolean matchPolicy(DropPolicyLog checkedDropCondition) {
+ return checkMatched(checkedDropCondition.getType(), checkedDropCondition.getPolicyName());
+ }
+
+ /**
+ * check required key in properties.
+ *
+ * @param props properties for storage policy
+ * @param propertyKey key for property
+ * @throws AnalysisException exception for properties error
+ */
+ private void checkRequiredProperty(final Map<String, String> props, String propertyKey) throws AnalysisException {
+ String value = props.get(propertyKey);
+
+ if (Strings.isNullOrEmpty(value)) {
+ throw new AnalysisException("Missing [" + propertyKey + "] in properties.");
+ }
+ }
+
+ @Override
+ public boolean isInvalid() {
+ return false;
+ }
+}
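Note (illustrative, not part of this commit): in getShowInfo above, cooldownDatetime appears to stay null when a policy is created with cooldown_ttl only, and SimpleDateFormat.format throws a NullPointerException for a null Date. A null-safe formatting step might look like the sketch below. The class name ShowInfoSketch, the method formatCooldownDatetime, and the choice of an empty string as the placeholder are all assumptions.

```java
import java.text.SimpleDateFormat;
import java.util.Date;

// Hypothetical helper, not part of this commit: formats the cooldown
// datetime for SHOW STORAGE POLICY without failing on a missing value.
final class ShowInfoSketch {
    private ShowInfoSketch() {}

    static String formatCooldownDatetime(Date cooldownDatetime) {
        if (cooldownDatetime == null) {
            // Policy was created with cooldown_ttl only; nothing to format.
            return "";
        }
        // SimpleDateFormat is not thread-safe, so a new instance is created per call.
        return new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(cooldownDatetime);
    }
}
```

With such a helper, the row returned for a cooldown_ttl policy would carry an empty CooldownDatetime column instead of raising an error.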