You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by yi...@apache.org on 2022/07/20 08:21:05 UTC
[doris] branch master updated: [refactor](policy) refactor some policy create and check logic (#11007)
This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 7bdce8f572 [refactor](policy) refactor some policy create and check logic (#11007)
7bdce8f572 is described below
commit 7bdce8f57276f23452db075597be61ca4ccccfaf
Author: Mingyu Chen <mo...@gmail.com>
AuthorDate: Wed Jul 20 16:20:59 2022 +0800
[refactor](policy) refactor some policy create and check logic (#11007)
* [refactor](policy) refactor some policy create and check logic
---
.../main/java/org/apache/doris/alter/Alter.java | 5 +--
.../org/apache/doris/analysis/AlterPolicyStmt.java | 9 ++--
.../java/org/apache/doris/catalog/Catalog.java | 2 +
.../java/org/apache/doris/catalog/ResourceMgr.java | 3 +-
.../main/java/org/apache/doris/common/Config.java | 3 --
.../doris/common/util/DynamicPartitionUtil.java | 14 +++----
.../apache/doris/common/util/PropertyAnalyzer.java | 5 +--
.../doris/datasource/InternalDataSource.java | 26 +-----------
.../main/java/org/apache/doris/policy/Policy.java | 21 +++++-----
.../java/org/apache/doris/policy/PolicyMgr.java | 42 ++++++++++++-------
.../java/org/apache/doris/policy/RowPolicy.java | 17 ++++----
.../org/apache/doris/policy/StoragePolicy.java | 48 +++++++++++++---------
.../apache/doris/analysis/CancelLoadStmtTest.java | 41 ++++++++++--------
.../java/org/apache/doris/policy/PolicyTest.java | 7 ++--
14 files changed, 124 insertions(+), 119 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java b/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java
index 659a550be4..d41de3557f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java
@@ -59,7 +59,6 @@ import org.apache.doris.common.UserException;
import org.apache.doris.common.util.DynamicPartitionUtil;
import org.apache.doris.common.util.MetaLockUtils;
import org.apache.doris.common.util.PropertyAnalyzer;
-import org.apache.doris.datasource.InternalDataSource;
import org.apache.doris.persist.AlterViewInfo;
import org.apache.doris.persist.BatchModifyPartitionsInfo;
import org.apache.doris.persist.ModifyCommentOperationLog;
@@ -161,7 +160,7 @@ public class Alter {
}
String currentStoragePolicy = currentAlterOps.getTableStoragePolicy(alterClauses);
// check currentStoragePolicy resource exist.
- InternalDataSource.checkStoragePolicyExist(currentStoragePolicy);
+ Catalog.getCurrentCatalog().getPolicyMgr().checkStoragePolicyExist(currentStoragePolicy);
olapTable.setStoragePolicy(currentStoragePolicy);
needProcessOutsideTableLock = true;
@@ -713,7 +712,7 @@ public class Alter {
String currentStoragePolicy = PropertyAnalyzer.analyzeStoragePolicy(properties);
if (!currentStoragePolicy.equals("")) {
// check currentStoragePolicy resource exist.
- InternalDataSource.checkStoragePolicyExist(currentStoragePolicy);
+ Catalog.getCurrentCatalog().getPolicyMgr().checkStoragePolicyExist(currentStoragePolicy);
partitionInfo.setStoragePolicy(partition.getId(), currentStoragePolicy);
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterPolicyStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterPolicyStmt.java
index d0fea950c8..ab94a30830 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterPolicyStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterPolicyStmt.java
@@ -19,7 +19,6 @@ package org.apache.doris.analysis;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.common.AnalysisException;
-import org.apache.doris.common.Config;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.UserException;
@@ -77,10 +76,10 @@ public class AlterPolicyStmt extends DdlStmt {
StoragePolicy storagePolicy = (StoragePolicy) hasPolicy.get();
// default storage policy use alter storage policy to add s3 resource.
- if (!policyName.equalsIgnoreCase(Config.default_storage_policy)
- && properties.containsKey(StoragePolicy.STORAGE_RESOURCE)) {
+ if (!policyName.equalsIgnoreCase(StoragePolicy.DEFAULT_STORAGE_POLICY_NAME) && properties.containsKey(
+ StoragePolicy.STORAGE_RESOURCE)) {
throw new AnalysisException("not support change storage policy's storage resource"
- + ", you can change s3 properties by alter resource");
+ + ", you can change s3 properties by alter resource");
}
boolean hasCooldownDatetime = false;
@@ -114,7 +113,7 @@ public class AlterPolicyStmt extends DdlStmt {
}
do {
- if (policyName.equalsIgnoreCase(Config.default_storage_policy)) {
+ if (policyName.equalsIgnoreCase(StoragePolicy.DEFAULT_STORAGE_POLICY_NAME)) {
// default storage policy
if (storagePolicy.getStorageResource() != null && hasCooldownDatetime) {
// alter cooldown datetime, can do
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java
index e4f67a461f..d865769094 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java
@@ -1261,6 +1261,8 @@ public class Catalog {
initDefaultCluster();
}
+ getPolicyMgr().createDefaultStoragePolicy();
+
// MUST set master ip before starting checkpoint thread.
// because checkpoint thread need this info to select non-master FE to push image
this.masterIp = FrontendOptions.getLocalHostAddress();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/ResourceMgr.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/ResourceMgr.java
index afbf36d935..6f194180be 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/ResourceMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/ResourceMgr.java
@@ -32,7 +32,6 @@ import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.persist.DropResourceOperationLog;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.policy.Policy;
-import org.apache.doris.policy.PolicyTypeEnum;
import org.apache.doris.policy.StoragePolicy;
import org.apache.doris.qe.ConnectContext;
@@ -109,7 +108,7 @@ public class ResourceMgr implements Writable {
}
// Check whether the resource is in use before deleting it, except spark resource
- StoragePolicy checkedStoragePolicy = new StoragePolicy(PolicyTypeEnum.STORAGE, null);
+ StoragePolicy checkedStoragePolicy = StoragePolicy.ofCheck(null);
checkedStoragePolicy.setStorageResource(resourceName);
if (Catalog.getCurrentCatalog().getPolicyMgr().existPolicy(checkedStoragePolicy)) {
Policy policy = Catalog.getCurrentCatalog().getPolicyMgr().getPolicy(checkedStoragePolicy);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
index 307d0be79a..7367f97e8f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
@@ -1691,9 +1691,6 @@ public class Config extends ConfigBase {
@ConfField(mutable = true, masterOnly = true)
public static boolean use_date_v2_by_default = false;
- @ConfField
- public static String default_storage_policy = "default_storage_policy";
-
@ConfField(mutable = false, masterOnly = true)
public static boolean enable_multi_tags = false;
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/DynamicPartitionUtil.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/DynamicPartitionUtil.java
index e308b93b9c..173a47f9aa 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/DynamicPartitionUtil.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/DynamicPartitionUtil.java
@@ -38,7 +38,6 @@ import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.FeNameFormat;
import org.apache.doris.common.UserException;
-import org.apache.doris.policy.PolicyTypeEnum;
import org.apache.doris.policy.StoragePolicy;
import com.google.common.base.Preconditions;
@@ -343,23 +342,24 @@ public class DynamicPartitionUtil {
}
}
- private static void checkRemoteStoragePolicy(String val) throws DdlException {
- if (Strings.isNullOrEmpty(val)) {
+ private static void checkRemoteStoragePolicy(String policyName) throws DdlException {
+ if (Strings.isNullOrEmpty(policyName)) {
LOG.info(DynamicPartitionProperty.REMOTE_STORAGE_POLICY + " is null, remove this key");
return;
}
- if (val.isEmpty()) {
+ if (policyName.isEmpty()) {
throw new DdlException(DynamicPartitionProperty.REMOTE_STORAGE_POLICY + " is empty.");
}
- StoragePolicy checkedPolicyCondition = new StoragePolicy(PolicyTypeEnum.STORAGE, val);
+ StoragePolicy checkedPolicyCondition = StoragePolicy.ofCheck(policyName);
if (!Catalog.getCurrentCatalog().getPolicyMgr().existPolicy(checkedPolicyCondition)) {
- throw new DdlException(DynamicPartitionProperty.REMOTE_STORAGE_POLICY + ": " + val + " doesn't exist.");
+ throw new DdlException(
+ DynamicPartitionProperty.REMOTE_STORAGE_POLICY + ": " + policyName + " doesn't exist.");
}
StoragePolicy storagePolicy = (StoragePolicy) Catalog.getCurrentCatalog()
.getPolicyMgr().getPolicy(checkedPolicyCondition);
if (Strings.isNullOrEmpty(storagePolicy.getCooldownTtl())) {
throw new DdlException("Storage policy cooldown type need to be cooldownTtl for properties "
- + DynamicPartitionProperty.REMOTE_STORAGE_POLICY + ": " + val);
+ + DynamicPartitionProperty.REMOTE_STORAGE_POLICY + ": " + policyName);
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java
index 76df8ad09d..0f2b861ba3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java
@@ -31,7 +31,6 @@ import org.apache.doris.catalog.Type;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.policy.Policy;
-import org.apache.doris.policy.PolicyTypeEnum;
import org.apache.doris.policy.StoragePolicy;
import org.apache.doris.resource.Tag;
import org.apache.doris.thrift.TCompressionType;
@@ -202,7 +201,7 @@ public class PropertyAnalyzer {
if (hasRemoteStoragePolicy) {
// check remote storage policy
- StoragePolicy checkedPolicy = new StoragePolicy(PolicyTypeEnum.STORAGE, remoteStoragePolicy);
+ StoragePolicy checkedPolicy = StoragePolicy.ofCheck(remoteStoragePolicy);
Policy policy = Catalog.getCurrentCatalog().getPolicyMgr().getPolicy(checkedPolicy);
if (!(policy instanceof StoragePolicy)) {
throw new AnalysisException("No PolicyStorage: " + remoteStoragePolicy);
@@ -527,7 +526,7 @@ public class PropertyAnalyzer {
if (properties != null && properties.containsKey(PROPERTIES_REMOTE_STORAGE_POLICY)) {
remoteStoragePolicy = properties.get(PROPERTIES_REMOTE_STORAGE_POLICY);
// check remote storage policy existence
- StoragePolicy checkedStoragePolicy = new StoragePolicy(PolicyTypeEnum.STORAGE, remoteStoragePolicy);
+ StoragePolicy checkedStoragePolicy = StoragePolicy.ofCheck(remoteStoragePolicy);
Policy policy = Catalog.getCurrentCatalog().getPolicyMgr().getPolicy(checkedStoragePolicy);
if (!(policy instanceof StoragePolicy)) {
throw new AnalysisException("StoragePolicy: " + remoteStoragePolicy + " does not exist.");
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalDataSource.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalDataSource.java
index 51f97d0116..434552ad62 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalDataSource.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalDataSource.java
@@ -150,9 +150,6 @@ import org.apache.doris.persist.PartitionPersistInfo;
import org.apache.doris.persist.RecoverInfo;
import org.apache.doris.persist.ReplicaPersistInfo;
import org.apache.doris.persist.TruncateTableInfo;
-import org.apache.doris.policy.Policy;
-import org.apache.doris.policy.PolicyTypeEnum;
-import org.apache.doris.policy.StoragePolicy;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.resource.Tag;
import org.apache.doris.system.Backend;
@@ -191,7 +188,6 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
-import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
@@ -1797,7 +1793,7 @@ public class InternalDataSource implements DataSourceIf<Database> {
// set storage policy
String storagePolicy = PropertyAnalyzer.analyzeStoragePolicy(properties);
- checkStoragePolicyExist(storagePolicy);
+ Catalog.getCurrentCatalog().getPolicyMgr().checkStoragePolicyExist(storagePolicy);
olapTable.setStoragePolicy(storagePolicy);
@@ -1992,7 +1988,7 @@ public class InternalDataSource implements DataSourceIf<Database> {
if (!partionStoragePolicy.equals("")) {
storagePolicy = partionStoragePolicy;
}
- checkStoragePolicyExist(storagePolicy);
+ Catalog.getCurrentCatalog().getPolicyMgr().checkStoragePolicyExist(storagePolicy);
Partition partition = createPartitionWithIndices(db.getClusterName(), db.getId(), olapTable.getId(),
olapTable.getBaseIndexId(), entry.getValue(), entry.getKey(), olapTable.getIndexIdToMeta(),
partitionDistributionInfo, dataProperty.getStorageMedium(),
@@ -2051,24 +2047,6 @@ public class InternalDataSource implements DataSourceIf<Database> {
}
}
- public static void checkStoragePolicyExist(String storagePolicy) throws DdlException {
- if (!storagePolicy.equals("")) {
- // when create table use storage policy
- // if not exist default storage policy, create it
- // if exist, just return.
- Catalog.getCurrentCatalog().getPolicyMgr().createDefaultStoragePolicy();
-
- List<Policy> policiesByType = Catalog.getCurrentCatalog()
- .getPolicyMgr().getPoliciesByType(PolicyTypeEnum.STORAGE);
- policiesByType.stream().filter(policy -> policy.getPolicyName().equals(storagePolicy)).findAny()
- .orElseThrow(() -> new DdlException("Storage policy does not exist. name: " + storagePolicy));
- Optional<Policy> hasDefaultPolicy = policiesByType.stream()
- .filter(policy -> policy.getPolicyName().equals(Config.default_storage_policy)).findAny();
-
- StoragePolicy.checkDefaultStoragePolicyValid(storagePolicy, hasDefaultPolicy);
- }
- }
-
private void createMysqlTable(Database db, CreateTableStmt stmt) throws DdlException {
String tableName = stmt.getTableName();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/policy/Policy.java b/fe/fe-core/src/main/java/org/apache/doris/policy/Policy.java
index acc3d8b0c2..a46773a39d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/policy/Policy.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/policy/Policy.java
@@ -57,10 +57,8 @@ public abstract class Policy implements Writable, GsonPostProcessable {
@SerializedName(value = "policyName")
protected String policyName = null;
- public Policy() {
- if (Catalog.getCurrentCatalog().isMaster()) {
- policyId = Catalog.getCurrentCatalog().getNextId();
- }
+ public Policy(PolicyTypeEnum type) {
+ this.type = type;
}
/**
@@ -69,8 +67,8 @@ public abstract class Policy implements Writable, GsonPostProcessable {
* @param type policy type
* @param policyName policy name
*/
- public Policy(final PolicyTypeEnum type, final String policyName) {
- policyId = Catalog.getCurrentCatalog().getNextId();
+ public Policy(long policyId, final PolicyTypeEnum type, final String policyName) {
+ this.policyId = policyId;
this.type = type;
this.policyName = policyName;
}
@@ -79,13 +77,13 @@ public abstract class Policy implements Writable, GsonPostProcessable {
* Trans stmt to Policy.
**/
public static Policy fromCreateStmt(CreatePolicyStmt stmt) throws AnalysisException {
+ long policyId = Catalog.getCurrentCatalog().getNextId();
switch (stmt.getType()) {
case STORAGE:
- StoragePolicy storagePolicy = new StoragePolicy(stmt.getType(), stmt.getPolicyName());
+ StoragePolicy storagePolicy = new StoragePolicy(policyId, stmt.getPolicyName());
storagePolicy.init(stmt.getProperties(), stmt.isIfNotExists());
return storagePolicy;
case ROW:
- default:
// stmt must be analyzed.
DatabaseIf db = Catalog.getCurrentCatalog().getDataSourceMgr()
.getCatalogOrAnalysisException(stmt.getTableName().getCtl())
@@ -93,9 +91,10 @@ public abstract class Policy implements Writable, GsonPostProcessable {
UserIdentity userIdent = stmt.getUser();
userIdent.analyze(ConnectContext.get().getClusterName());
TableIf table = db.getTableOrAnalysisException(stmt.getTableName().getTbl());
- return new RowPolicy(stmt.getType(), stmt.getPolicyName(), db.getId(), userIdent,
- stmt.getOrigStmt().originStmt, table.getId(), stmt.getFilterType(),
- stmt.getWherePredicate());
+ return new RowPolicy(policyId, stmt.getPolicyName(), db.getId(), userIdent,
+ stmt.getOrigStmt().originStmt, table.getId(), stmt.getFilterType(), stmt.getWherePredicate());
+ default:
+ throw new AnalysisException("Unknown policy type: " + stmt.getType());
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/policy/PolicyMgr.java b/fe/fe-core/src/main/java/org/apache/doris/policy/PolicyMgr.java
index 7bdbf7b203..7759ab381b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/policy/PolicyMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/policy/PolicyMgr.java
@@ -24,7 +24,6 @@ import org.apache.doris.analysis.DropPolicyStmt;
import org.apache.doris.analysis.ShowPolicyStmt;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.common.AnalysisException;
-import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.io.Text;
@@ -40,6 +39,7 @@ import com.google.common.collect.Sets;
import com.google.gson.annotations.SerializedName;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
+import org.apache.parquet.Strings;
import java.io.DataInput;
import java.io.DataOutput;
@@ -89,17 +89,16 @@ public class PolicyMgr implements Writable {
lock.readLock().unlock();
}
- public void createDefaultStoragePolicy() throws DdlException {
- Optional<Policy> hasDefault = findPolicy(Config.default_storage_policy, PolicyTypeEnum.STORAGE);
- if (hasDefault.isPresent()) {
- // already exist default storage policy, just return.
- return;
- }
-
+ public void createDefaultStoragePolicy() {
writeLock();
try {
- StoragePolicy defaultStoragePolicy =
- new StoragePolicy(PolicyTypeEnum.STORAGE, Config.default_storage_policy);
+ Optional<Policy> hasDefault = findPolicy(StoragePolicy.DEFAULT_STORAGE_POLICY_NAME, PolicyTypeEnum.STORAGE);
+ if (hasDefault.isPresent()) {
+ // already exist default storage policy, just return.
+ return;
+ }
+ long policyId = Catalog.getCurrentCatalog().getNextId();
+ StoragePolicy defaultStoragePolicy = new StoragePolicy(policyId, StoragePolicy.DEFAULT_STORAGE_POLICY_NAME);
unprotectedAdd(defaultStoragePolicy);
Catalog.getCurrentCatalog().getEditLog().logCreatePolicy(defaultStoragePolicy);
} finally {
@@ -413,10 +412,6 @@ public class PolicyMgr implements Writable {
throw new DdlException("Current not support alter row policy");
}
- if (storagePolicyName.equalsIgnoreCase(Config.default_storage_policy)) {
- createDefaultStoragePolicy();
- }
-
Optional<Policy> policy = findPolicy(storagePolicyName, PolicyTypeEnum.STORAGE);
if (!policy.isPresent()) {
@@ -429,4 +424,23 @@ public class PolicyMgr implements Writable {
Catalog.getCurrentCatalog().getEditLog().logAlterStoragePolicy(storagePolicy);
LOG.info("Alter storage policy success. policy: {}", storagePolicy);
}
+
+ public void checkStoragePolicyExist(String storagePolicyName) throws DdlException {
+ if (Strings.isNullOrEmpty(storagePolicyName)) {
+ return;
+ }
+ readLock();
+ try {
+ List<Policy> policiesByType = Catalog.getCurrentCatalog().getPolicyMgr()
+ .getPoliciesByType(PolicyTypeEnum.STORAGE);
+ policiesByType.stream().filter(policy -> policy.getPolicyName().equals(storagePolicyName)).findAny()
+ .orElseThrow(() -> new DdlException("Storage policy does not exist. name: " + storagePolicyName));
+ Optional<Policy> hasDefaultPolicy = policiesByType.stream()
+ .filter(policy -> policy.getPolicyName().equals(StoragePolicy.DEFAULT_STORAGE_POLICY_NAME))
+ .findAny();
+ StoragePolicy.checkDefaultStoragePolicyValid(storagePolicyName, hasDefaultPolicy);
+ } finally {
+ readUnlock();
+ }
+ }
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/policy/RowPolicy.java b/fe/fe-core/src/main/java/org/apache/doris/policy/RowPolicy.java
index 94c0458069..925463d279 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/policy/RowPolicy.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/policy/RowPolicy.java
@@ -88,12 +88,14 @@ public class RowPolicy extends Policy {
private Expr wherePredicate = null;
- public RowPolicy() {}
+ public RowPolicy() {
+ super(PolicyTypeEnum.ROW);
+ }
/**
* Policy for Table. Policy of ROW or others.
*
- * @param type PolicyType
+ * @param policyId policy id
* @param policyName policy name
* @param dbId database i
* @param user username
@@ -102,10 +104,9 @@ public class RowPolicy extends Policy {
* @param filterType filter type
* @param wherePredicate where predicate
*/
- public RowPolicy(final PolicyTypeEnum type, final String policyName, long dbId,
- UserIdentity user, String originStmt, final long tableId,
- final FilterType filterType, final Expr wherePredicate) {
- super(type, policyName);
+ public RowPolicy(long policyId, final String policyName, long dbId, UserIdentity user, String originStmt,
+ final long tableId, final FilterType filterType, final Expr wherePredicate) {
+ super(policyId, PolicyTypeEnum.ROW, policyName);
this.user = user;
this.dbId = dbId;
this.tableId = tableId;
@@ -141,8 +142,8 @@ public class RowPolicy extends Policy {
@Override
public RowPolicy clone() {
- return new RowPolicy(this.type, this.policyName, this.dbId, this.user, this.originStmt, this.tableId,
- this.filterType, this.wherePredicate);
+ return new RowPolicy(this.policyId, this.policyName, this.dbId, this.user, this.originStmt, this.tableId,
+ this.filterType, this.wherePredicate);
}
private boolean checkMatched(long dbId, long tableId, PolicyTypeEnum type,
diff --git a/fe/fe-core/src/main/java/org/apache/doris/policy/StoragePolicy.java b/fe/fe-core/src/main/java/org/apache/doris/policy/StoragePolicy.java
index 0a754e9ec6..b45ccba7e4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/policy/StoragePolicy.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/policy/StoragePolicy.java
@@ -23,7 +23,6 @@ import org.apache.doris.catalog.Resource;
import org.apache.doris.catalog.S3Resource;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.common.AnalysisException;
-import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.io.Text;
import org.apache.doris.persist.gson.GsonUtils;
@@ -57,16 +56,18 @@ import java.util.Optional;
**/
@Data
public class StoragePolicy extends Policy {
- public static boolean checkDefaultStoragePolicyValid(final String storagePolicyName,
- Optional<Policy> defaultPolicy) throws DdlException {
+ public static final String DEFAULT_STORAGE_POLICY_NAME = "default_storage_policy";
+
+ public static boolean checkDefaultStoragePolicyValid(final String storagePolicyName, Optional<Policy> defaultPolicy)
+ throws DdlException {
if (!defaultPolicy.isPresent()) {
return false;
}
- if (storagePolicyName.equalsIgnoreCase(Config.default_storage_policy)
- && (((StoragePolicy) defaultPolicy.get()).getStorageResource() == null)) {
+ if (storagePolicyName.equalsIgnoreCase(DEFAULT_STORAGE_POLICY_NAME) && (
+ ((StoragePolicy) defaultPolicy.get()).getStorageResource() == null)) {
throw new DdlException("Use default storage policy, but not give s3 info,"
- + " please use alter resource to add default storage policy S3 info.");
+ + " please use alter resource to add default storage policy S3 info.");
}
return true;
}
@@ -116,21 +117,23 @@ public class StoragePolicy extends Policy {
private Map<String, String> props;
- public StoragePolicy() {}
+ public StoragePolicy() {
+ super(PolicyTypeEnum.STORAGE);
+ }
/**
* Policy for Storage Migration.
*
- * @param type PolicyType
+ * @param policyId policy id
* @param policyName policy name
* @param storageResource resource name for storage
* @param cooldownDatetime cool down time
* @param cooldownTtl cool down time cost after partition is created
* @param cooldownTtlMs seconds for cooldownTtl
*/
- public StoragePolicy(final PolicyTypeEnum type, final String policyName, final String storageResource,
- final Date cooldownDatetime, final String cooldownTtl, long cooldownTtlMs) {
- super(type, policyName);
+ public StoragePolicy(long policyId, final String policyName, final String storageResource,
+ final Date cooldownDatetime, final String cooldownTtl, long cooldownTtlMs) {
+ super(policyId, PolicyTypeEnum.STORAGE, policyName);
this.storageResource = storageResource;
this.cooldownDatetime = cooldownDatetime;
this.cooldownTtl = cooldownTtl;
@@ -140,11 +143,17 @@ public class StoragePolicy extends Policy {
/**
* Policy for Storage Migration.
*
- * @param type PolicyType
+ * @param policyId policy id
* @param policyName policy name
*/
- public StoragePolicy(final PolicyTypeEnum type, final String policyName) {
- super(type, policyName);
+ public StoragePolicy(long policyId, final String policyName) {
+ super(policyId, PolicyTypeEnum.STORAGE, policyName);
+ }
+
+ public static StoragePolicy ofCheck(String policyName) {
+ StoragePolicy storagePolicy = new StoragePolicy();
+ storagePolicy.policyName = policyName;
+ return storagePolicy;
}
/**
@@ -240,8 +249,8 @@ public class StoragePolicy extends Policy {
@Override
public StoragePolicy clone() {
- return new StoragePolicy(this.type, this.policyName, this.storageResource,
- this.cooldownDatetime, this.cooldownTtl, this.cooldownTtlMs);
+ return new StoragePolicy(this.policyId, this.policyName, this.storageResource, this.cooldownDatetime,
+ this.cooldownTtl, this.cooldownTtlMs);
}
@Override
@@ -356,11 +365,10 @@ public class StoragePolicy extends Policy {
}
});
- if (policyName.equalsIgnoreCase(Config.default_storage_policy) && storageResource == null) {
+ if (policyName.equalsIgnoreCase(DEFAULT_STORAGE_POLICY_NAME) && storageResource == null) {
// here first time set S3 resource to default storage policy.
- String alterStorageResource = Optional.ofNullable(properties.get(STORAGE_RESOURCE))
- .orElseThrow(() ->
- new DdlException("first time set default storage policy, but not give storageResource"));
+ String alterStorageResource = Optional.ofNullable(properties.get(STORAGE_RESOURCE)).orElseThrow(
+ () -> new DdlException("first time set default storage policy, but not give storageResource"));
// check alterStorageResource resource exist.
checkIsS3ResourceAndExist(alterStorageResource);
storageResource = alterStorageResource;
diff --git a/fe/fe-core/src/test/java/org/apache/doris/analysis/CancelLoadStmtTest.java b/fe/fe-core/src/test/java/org/apache/doris/analysis/CancelLoadStmtTest.java
index 3621cbcbad..25937af53f 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/analysis/CancelLoadStmtTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/analysis/CancelLoadStmtTest.java
@@ -18,6 +18,9 @@
package org.apache.doris.analysis;
import org.apache.doris.analysis.CompoundPredicate.Operator;
+import org.apache.doris.catalog.Catalog;
+import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.Table;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.ExceptionChecker;
import org.apache.doris.common.FeConstants;
@@ -36,13 +39,15 @@ import java.util.List;
public class CancelLoadStmtTest extends TestWithFeService {
private Analyzer analyzer;
+ private String dbName = "testDb";
+ private String tblName = "table1";
@Override
protected void runBeforeAll() throws Exception {
FeConstants.runningUnitTest = true;
- createDatabase("testDb");
- useDatabase("testDb");
- createTable("create table table1\n" + "(k1 int, k2 int) distributed by hash(k1) buckets 1\n"
+ createDatabase(dbName);
+ useDatabase(dbName);
+ createTable("create table " + tblName + "\n" + "(k1 int, k2 int) distributed by hash(k1) buckets 1\n"
+ "properties(\"replication_num\" = \"1\");");
analyzer = new Analyzer(connectContext.getCatalog(), connectContext);
}
@@ -55,36 +60,36 @@ public class CancelLoadStmtTest extends TestWithFeService {
SlotRef stateSlotRef = new SlotRef(null, "state");
StringLiteral stateStringLiteral = new StringLiteral("FINISHED");
- BinaryPredicate labelBinaryPredicate =
- new BinaryPredicate(BinaryPredicate.Operator.EQ, labelSlotRef, labelStringLiteral);
+ BinaryPredicate labelBinaryPredicate = new BinaryPredicate(BinaryPredicate.Operator.EQ, labelSlotRef,
+ labelStringLiteral);
CancelLoadStmt stmt = new CancelLoadStmt(null, labelBinaryPredicate);
stmt.analyze(analyzer);
Assertions.assertEquals("CANCEL LOAD FROM default_cluster:testDb WHERE `label` = 'doris_test_label'",
stmt.toString());
- BinaryPredicate stateBinaryPredicate =
- new BinaryPredicate(BinaryPredicate.Operator.EQ, stateSlotRef, stateStringLiteral);
+ BinaryPredicate stateBinaryPredicate = new BinaryPredicate(BinaryPredicate.Operator.EQ, stateSlotRef,
+ stateStringLiteral);
stmt = new CancelLoadStmt(null, stateBinaryPredicate);
stmt.analyze(analyzer);
Assertions.assertEquals("CANCEL LOAD FROM default_cluster:testDb WHERE `state` = 'FINISHED'", stmt.toString());
- LikePredicate labelLikePredicate =
- new LikePredicate(LikePredicate.Operator.LIKE, labelSlotRef, labelStringLiteral);
+ LikePredicate labelLikePredicate = new LikePredicate(LikePredicate.Operator.LIKE, labelSlotRef,
+ labelStringLiteral);
stmt = new CancelLoadStmt(null, labelLikePredicate);
stmt.analyze(analyzer);
Assertions.assertEquals("CANCEL LOAD FROM default_cluster:testDb WHERE `label` LIKE 'doris_test_label'",
stmt.toString());
- CompoundPredicate compoundAndPredicate =
- new CompoundPredicate(Operator.AND, labelBinaryPredicate, stateBinaryPredicate);
+ CompoundPredicate compoundAndPredicate = new CompoundPredicate(Operator.AND, labelBinaryPredicate,
+ stateBinaryPredicate);
stmt = new CancelLoadStmt(null, compoundAndPredicate);
stmt.analyze(analyzer);
Assertions.assertEquals(
"CANCEL LOAD FROM default_cluster:testDb WHERE `label` = 'doris_test_label' AND `state` = 'FINISHED'",
stmt.toString());
- CompoundPredicate compoundOrPredicate =
- new CompoundPredicate(Operator.OR, labelBinaryPredicate, stateBinaryPredicate);
+ CompoundPredicate compoundOrPredicate = new CompoundPredicate(Operator.OR, labelBinaryPredicate,
+ stateBinaryPredicate);
stmt = new CancelLoadStmt(null, compoundOrPredicate);
stmt.analyze(analyzer);
Assertions.assertEquals(
@@ -93,11 +98,15 @@ public class CancelLoadStmtTest extends TestWithFeService {
// test match
List<LoadJob> loadJobs = new ArrayList<>();
- InsertLoadJob insertLoadJob1 = new InsertLoadJob("doris_test_label", 1L, 10003L, 10005L, 0, "", "");
+ Database db = Catalog.getCurrentInternalCatalog().getDbOrMetaException("default_cluster:testDb");
+ long dbId = db.getId();
+ Table tbl = db.getTableNullable(tblName);
+ long tblId = tbl.getId();
+ InsertLoadJob insertLoadJob1 = new InsertLoadJob("doris_test_label", 1L, dbId, tblId, 0, "", "");
loadJobs.add(insertLoadJob1);
- InsertLoadJob insertLoadJob2 = new InsertLoadJob("doris_test_label_1", 2L, 10003L, 10005L, 0, "", "");
+ InsertLoadJob insertLoadJob2 = new InsertLoadJob("doris_test_label_1", 2L, dbId, tblId, 0, "", "");
loadJobs.add(insertLoadJob2);
- InsertLoadJob insertLoadJob3 = new InsertLoadJob("doris_test_label_2", 3L, 10003L, 10005L, 0, "", "");
+ InsertLoadJob insertLoadJob3 = new InsertLoadJob("doris_test_label_2", 3L, dbId, tblId, 0, "", "");
loadJobs.add(insertLoadJob3);
// label
stmt = new CancelLoadStmt(null, labelBinaryPredicate);
diff --git a/fe/fe-core/src/test/java/org/apache/doris/policy/PolicyTest.java b/fe/fe-core/src/test/java/org/apache/doris/policy/PolicyTest.java
index 800b520836..5fa893b52b 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/policy/PolicyTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/policy/PolicyTest.java
@@ -201,13 +201,13 @@ public class PolicyTest extends TestWithFeService {
long dbId = 10;
UserIdentity user = new UserIdentity("test_policy", "%");
String originStmt = "CREATE ROW POLICY test_row_policy ON test.table1"
- + " AS PERMISSIVE TO test_policy USING (k1 = 1)";
+ + " AS PERMISSIVE TO test_policy USING (k1 = 1)";
long tableId = 100;
FilterType filterType = FilterType.PERMISSIVE;
Expr wherePredicate = null;
- Policy rowPolicy = new RowPolicy(type, policyName, dbId, user,
- originStmt, tableId, filterType, wherePredicate);
+ Policy rowPolicy = new RowPolicy(10000, policyName, dbId, user, originStmt, tableId, filterType,
+ wherePredicate);
ByteArrayOutputStream emptyOutputStream = new ByteArrayOutputStream();
DataOutputStream output = new DataOutputStream(emptyOutputStream);
@@ -218,6 +218,7 @@ public class PolicyTest extends TestWithFeService {
Policy newPolicy = Policy.read(input);
Assertions.assertTrue(newPolicy instanceof RowPolicy);
RowPolicy newRowPolicy = (RowPolicy) newPolicy;
+ Assertions.assertEquals(rowPolicy.getPolicyId(), newRowPolicy.getPolicyId());
Assertions.assertEquals(type, newRowPolicy.getType());
Assertions.assertEquals(policyName, newRowPolicy.getPolicyName());
Assertions.assertEquals(dbId, newRowPolicy.getDbId());
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org