You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by GitBox <gi...@apache.org> on 2022/07/11 03:03:07 UTC

[GitHub] [doris] pengxiangyu commented on a diff in pull request #10693: [Feature] move cold data to object storage without losing any feature…

pengxiangyu commented on code in PR #10693:
URL: https://github.com/apache/doris/pull/10693#discussion_r917497115


##########
fe/fe-core/src/main/java/org/apache/doris/task/NotifyUpdateStoragePolicyTask.java:
##########
@@ -0,0 +1,63 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.task;
+
+import org.apache.doris.catalog.S3Resource;
+import org.apache.doris.policy.StoragePolicy;
+import org.apache.doris.thrift.TGetStoragePolicy;
+import org.apache.doris.thrift.TS3StorageParam;
+import org.apache.doris.thrift.TTaskType;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.Map;
+
+public class NotifyUpdateStoragePolicyTask extends AgentTask {
+    private static final Logger LOG = LogManager.getLogger(NotifyUpdateStoragePolicyTask.class);
+    private String policyName;
+    private Map<String, String> properties;
+
+    public NotifyUpdateStoragePolicyTask(long backendId, String name, Map<String, String> properties) {
+        super(null, backendId, TTaskType.NOTIFY_UPDATE_STORAGE_POLICY, -1, -1, -1, -1, -1, -1, -1);
+        this.policyName = name;
+        this.properties = properties;
+    }
+
+    public TGetStoragePolicy toThrift() {
+        TGetStoragePolicy ret = new TGetStoragePolicy();
+
+        ret.policy_name = policyName;
+        ret.cooldown_datetime = Long.parseLong(properties.get(StoragePolicy.COOLDOWN_DATETIME));
+        ret.cooldown_ttl = Long.parseLong(properties.get(StoragePolicy.COOLDOWN_TTL));
+        ret.s3_storage_param = new TS3StorageParam();
+        ret.s3_storage_param.s3_max_conn = Integer.parseInt(properties.get(S3Resource.S3_MAX_CONNECTIONS));
+        ret.s3_storage_param.s3_request_timeout_ms = Integer.parseInt(properties.get(S3Resource.S3_REQUEST_TIMEOUT_MS));
+        ret.s3_storage_param.s3_conn_timeout_ms = Integer.parseInt(properties.get(S3Resource.S3_CONNECTION_TIMEOUT_MS));

Review Comment:
   It will cause an exception when the value is not an integer, check it and set a default value for invalid resource. 



##########
fe/fe-core/src/main/java/org/apache/doris/task/DropReplicaTask.java:
##########
@@ -23,11 +23,14 @@
 public class DropReplicaTask extends AgentTask {
     private int schemaHash; // set -1L as unknown
     private long replicaId;
+    private boolean isDropTableOrPartition;

Review Comment:
   what is the use case for isDropTableOrPartition? Add some annotation.



##########
fe/fe-core/src/main/java/org/apache/doris/policy/StoragePolicy.java:
##########
@@ -19,29 +19,58 @@
 
 import org.apache.doris.catalog.Catalog;
 import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.Resource;
+import org.apache.doris.catalog.S3Resource;
 import org.apache.doris.catalog.ScalarType;
 import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.Config;
+import org.apache.doris.common.DdlException;
+import org.apache.doris.common.io.Text;
+import org.apache.doris.persist.gson.GsonUtils;
 import org.apache.doris.qe.ShowResultSetMetaData;
+import org.apache.doris.system.SystemInfoService;
+import org.apache.doris.task.AgentBatchTask;
+import org.apache.doris.task.AgentTaskExecutor;
+import org.apache.doris.task.NotifyUpdateStoragePolicyTask;
 
 import com.google.common.base.Strings;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import com.google.gson.annotations.SerializedName;
 import lombok.Data;
+import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
+import java.io.DataInput;
 import java.io.IOException;
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
+import java.util.Arrays;
 import java.util.Date;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
+import java.util.regex.Pattern;
 
 /**
  * Save policy for storage migration.
  **/
 @Data
 public class StoragePolicy extends Policy {
+    public static boolean checkDefaultStoragePolicyValid(final String storagePolicyName,
+                                                         Optional<Policy> defaultPolicy) throws DdlException {
+        if (!defaultPolicy.isPresent()) {
+            return false;
+        }
+

Review Comment:
   Please don't check s3 type in StoragePolicy, it's not for s3 resource only.



##########
fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java:
##########
@@ -992,6 +999,57 @@ public TWaitingTxnStatusResult waitingTxnStatus(TWaitingTxnStatusRequest request
 
     @Override
     public TGetStoragePolicyResult refreshStoragePolicy() throws TException {
-        throw new TException("not implement");
+        LOG.debug("refresh storage policy request");
+        TGetStoragePolicyResult result = new TGetStoragePolicyResult();
+        TStatus status = new TStatus(TStatusCode.OK);
+        result.setStatus(status);
+
+        List<Policy> policyList = Catalog.getCurrentCatalog().getPolicyMgr().getPoliciesByType(PolicyTypeEnum.STORAGE);
+        policyList.forEach(
+                iter -> {
+                    // default policy not init.
+                    if (((StoragePolicy) iter).getStorageResource() == null) {
+                        return;
+                    }
+                    TGetStoragePolicy rEntry = new TGetStoragePolicy();
+                    rEntry.setPolicyName(iter.getPolicyName());
+                    //java 8 not support ifPresentOrElse
+                    final long[] ttlCoolDown = {-1};
+                    Optional.ofNullable(((StoragePolicy) iter).getCooldownTtl())
+                            .ifPresent(ttl -> ttlCoolDown[0] = Integer.parseInt(ttl));
+                    rEntry.setCooldownTtl(ttlCoolDown[0]);
+
+                    final long[] secondTimestamp = {-1};
+                    Optional.ofNullable(((StoragePolicy) iter).getCooldownDatetime())
+                        .ifPresent(date -> secondTimestamp[0] = date.getTime() / 1000);
+                    rEntry.setCooldownDatetime(secondTimestamp[0]);
+
+                    Optional.ofNullable(((StoragePolicy) iter).getMd5Checksum()).ifPresent(rEntry::setMd5Checksum);
+
+                    TS3StorageParam s3Info = new TS3StorageParam();
+                    Optional.ofNullable(((StoragePolicy) iter).getStorageResource()).ifPresent(resource -> {

Review Comment:
   Only s3?what abort hdfs and others?



##########
fe/fe-core/src/main/java/org/apache/doris/policy/StoragePolicy.java:
##########
@@ -149,26 +190,55 @@ public void init(final Map<String, String> props) throws AnalysisException {
         if (!hasCooldownDatetime && !hasCooldownTtl) {
             throw new AnalysisException(COOLDOWN_DATETIME + " or " + COOLDOWN_TTL + " must be set");
         }
-        if (!Catalog.getCurrentCatalog().getResourceMgr().containsResource(this.storageResource)) {
-            throw new AnalysisException("storage resource doesn't exist: " + this.storageResource);
+
+        // no set ttl use -1
+        if (!hasCooldownTtl) {
+            this.cooldownTtl = "-1";
+        }
+
+        Resource r = checkIsS3ResourceAndExist(this.storageResource);
+        if (!((S3Resource) r).policyAddToSet(super.getPolicyName()) && !ifNotExists) {
+            throw new AnalysisException("this policy has been added to s3 resource once, policy has been created.");
+        }
+        this.md5Checksum = calcPropertiesMd5();
+    }
+
+    private static Resource checkIsS3ResourceAndExist(final String storageResource) throws AnalysisException {
+        // check storage_resource type is S3, current just support S3
+        Resource resource =
+                Optional.ofNullable(Catalog.getCurrentCatalog().getResourceMgr().getResource(storageResource))
+                    .orElseThrow(() -> new AnalysisException("storage resource doesn't exist: " + storageResource));
+
+        if (resource.getType() != Resource.ResourceType.S3) {
+            throw new AnalysisException("current storage policy just support resource type S3");
         }
+        return resource;
     }
 
     /**
      * Use for SHOW POLICY.
      **/
     public List<String> getShowInfo() throws AnalysisException {
-        String props = "";
+        final String[] props = {""};

Review Comment:
   where is props[1], props[2]....? Why do you change it to String[] ?



##########
fe/fe-core/src/main/java/org/apache/doris/task/NotifyUpdateStoragePolicyTask.java:
##########
@@ -0,0 +1,63 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.task;
+
+import org.apache.doris.catalog.S3Resource;
+import org.apache.doris.policy.StoragePolicy;
+import org.apache.doris.thrift.TGetStoragePolicy;
+import org.apache.doris.thrift.TS3StorageParam;
+import org.apache.doris.thrift.TTaskType;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.Map;
+
+public class NotifyUpdateStoragePolicyTask extends AgentTask {
+    private static final Logger LOG = LogManager.getLogger(NotifyUpdateStoragePolicyTask.class);
+    private String policyName;
+    private Map<String, String> properties;
+
+    public NotifyUpdateStoragePolicyTask(long backendId, String name, Map<String, String> properties) {
+        super(null, backendId, TTaskType.NOTIFY_UPDATE_STORAGE_POLICY, -1, -1, -1, -1, -1, -1, -1);
+        this.policyName = name;
+        this.properties = properties;
+    }
+
+    public TGetStoragePolicy toThrift() {
+        TGetStoragePolicy ret = new TGetStoragePolicy();
+
+        ret.policy_name = policyName;
+        ret.cooldown_datetime = Long.parseLong(properties.get(StoragePolicy.COOLDOWN_DATETIME));
+        ret.cooldown_ttl = Long.parseLong(properties.get(StoragePolicy.COOLDOWN_TTL));

Review Comment:
   Only s3?what abort the other resources?



##########
fe/fe-core/src/main/java/org/apache/doris/policy/StoragePolicy.java:
##########
@@ -244,4 +314,100 @@ private static long getMsByCooldownTtl(String cooldownTtl) throws AnalysisExcept
         }
         return cooldownTtlMs;
     }
+

Review Comment:
   Add annotation for this function



##########
fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java:
##########
@@ -992,6 +999,57 @@ public TWaitingTxnStatusResult waitingTxnStatus(TWaitingTxnStatusRequest request
 
     @Override
     public TGetStoragePolicyResult refreshStoragePolicy() throws TException {
-        throw new TException("not implement");
+        LOG.debug("refresh storage policy request");
+        TGetStoragePolicyResult result = new TGetStoragePolicyResult();
+        TStatus status = new TStatus(TStatusCode.OK);
+        result.setStatus(status);
+
+        List<Policy> policyList = Catalog.getCurrentCatalog().getPolicyMgr().getPoliciesByType(PolicyTypeEnum.STORAGE);
+        policyList.forEach(
+                iter -> {
+                    // default policy not init.

Review Comment:
   Check the type of iter by 'instance of' is needed.



##########
fe/fe-core/src/main/java/org/apache/doris/policy/PolicyMgr.java:
##########
@@ -362,4 +398,30 @@ public static PolicyMgr read(DataInput in) throws IOException {
         policyMgr.updateMergeTablePolicyMap();
         return policyMgr;
     }
+
+    public Optional<Policy> findStoragePolicy(final String storagePolicyName) {

Review Comment:
   findPolicy() is better. This is not a Manager for StoragePolicy only.



##########
fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java:
##########
@@ -304,6 +305,8 @@ public static void execute(Catalog catalog, DdlStmt ddlStmt) throws Exception {
             catalog.getPolicyMgr().createPolicy((CreatePolicyStmt) ddlStmt);
         } else if (ddlStmt instanceof DropPolicyStmt) {
             catalog.getPolicyMgr().dropPolicy((DropPolicyStmt) ddlStmt);
+        } else if (ddlStmt instanceof AlterStoragePolicyStmt) {

Review Comment:
   AlterPolicyStmt is better, it's not for storage only.



##########
fe/fe-core/src/main/java/org/apache/doris/policy/StoragePolicy.java:
##########
@@ -115,12 +148,17 @@ public StoragePolicy(final PolicyTypeEnum type, final String policyName) {
         super(type, policyName);
     }
 
+    public static boolean isInteger(String str) {

Review Comment:
   Interger.parse() is enough. please don't create a basic function here.



##########
fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java:
##########
@@ -992,6 +999,57 @@ public TWaitingTxnStatusResult waitingTxnStatus(TWaitingTxnStatusRequest request
 
     @Override
     public TGetStoragePolicyResult refreshStoragePolicy() throws TException {
-        throw new TException("not implement");
+        LOG.debug("refresh storage policy request");
+        TGetStoragePolicyResult result = new TGetStoragePolicyResult();
+        TStatus status = new TStatus(TStatusCode.OK);
+        result.setStatus(status);
+
+        List<Policy> policyList = Catalog.getCurrentCatalog().getPolicyMgr().getPoliciesByType(PolicyTypeEnum.STORAGE);
+        policyList.forEach(
+                iter -> {
+                    // default policy not init.
+                    if (((StoragePolicy) iter).getStorageResource() == null) {
+                        return;
+                    }
+                    TGetStoragePolicy rEntry = new TGetStoragePolicy();
+                    rEntry.setPolicyName(iter.getPolicyName());
+                    //java 8 not support ifPresentOrElse
+                    final long[] ttlCoolDown = {-1};
+                    Optional.ofNullable(((StoragePolicy) iter).getCooldownTtl())

Review Comment:
   Create a not instance storagePolicy is better. Don't cast type every time.



##########
fe/fe-core/src/main/java/org/apache/doris/policy/StoragePolicy.java:
##########
@@ -149,26 +190,55 @@ public void init(final Map<String, String> props) throws AnalysisException {
         if (!hasCooldownDatetime && !hasCooldownTtl) {
             throw new AnalysisException(COOLDOWN_DATETIME + " or " + COOLDOWN_TTL + " must be set");
         }
-        if (!Catalog.getCurrentCatalog().getResourceMgr().containsResource(this.storageResource)) {
-            throw new AnalysisException("storage resource doesn't exist: " + this.storageResource);
+
+        // no set ttl use -1
+        if (!hasCooldownTtl) {
+            this.cooldownTtl = "-1";
+        }
+
+        Resource r = checkIsS3ResourceAndExist(this.storageResource);
+        if (!((S3Resource) r).policyAddToSet(super.getPolicyName()) && !ifNotExists) {
+            throw new AnalysisException("this policy has been added to s3 resource once, policy has been created.");
+        }
+        this.md5Checksum = calcPropertiesMd5();
+    }
+
+    private static Resource checkIsS3ResourceAndExist(final String storageResource) throws AnalysisException {

Review Comment:
   Don't check s3 resource in policy, Won't you add the other resource?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org