You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by li...@apache.org on 2020/07/24 02:03:54 UTC

[incubator-doris] branch master updated: Support checking database used data quota when data load job begin a new txn (#3955)

This is an automated email from the ASF dual-hosted git repository.

lichaoyong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 4608f97  Support checking database used data quota when data load job begin a new txn (#3955)
4608f97 is described below

commit 4608f9786ebaf0ab10b1b415d6a0b919f7b130ee
Author: caiconghui <55...@users.noreply.github.com>
AuthorDate: Thu Jul 23 21:03:43 2020 -0500

    Support checking database used data quota when data load job begin a new txn (#3955)
    
    Currently, the database used data quota is only checked when creating or altering a table and in some older types of load job, but not for routine load jobs or stream load jobs. This PR provides a uniform solution: the db used data quota is checked whenever a data load job begins a new txn.
---
 docs/en/administrator-guide/config/fe_config.md    |  6 +++
 docs/zh-CN/administrator-guide/config/fe_config.md |  6 +++
 .../java/org/apache/doris/catalog/Catalog.java     |  5 ++
 .../java/org/apache/doris/catalog/Database.java    | 17 +++---
 .../main/java/org/apache/doris/common/Config.java  |  6 +++
 .../doris/transaction/DatabaseTransactionMgr.java  | 28 ++++++++++
 .../doris/transaction/GlobalTransactionMgr.java    |  5 ++
 .../transaction/UpdateDbUsedDataQuotaDaemon.java   | 63 ++++++++++++++++++++++
 8 files changed, 127 insertions(+), 9 deletions(-)

diff --git a/docs/en/administrator-guide/config/fe_config.md b/docs/en/administrator-guide/config/fe_config.md
index 7612067..9d4f6ff 100644
--- a/docs/en/administrator-guide/config/fe_config.md
+++ b/docs/en/administrator-guide/config/fe_config.md
@@ -210,6 +210,12 @@ But at the same time, it will cause the submission of failed or failed execution
 
 ### `consistency_check_start_time`
 
+### `db_used_data_quota_update_interval_secs`
+
+To keep data loads fast, the check of whether a database has exceeded its data quota before a load begins does not compute the database's used data size in real time; instead, it reads a value that a daemon thread refreshes periodically.
+
+This configuration is used to set the time interval for updating the value of the amount of data used by the database.
+
 ### `default_rowset_type`
 
 ### `default_storage_medium`
diff --git a/docs/zh-CN/administrator-guide/config/fe_config.md b/docs/zh-CN/administrator-guide/config/fe_config.md
index ec41490..e3b90c4 100644
--- a/docs/zh-CN/administrator-guide/config/fe_config.md
+++ b/docs/zh-CN/administrator-guide/config/fe_config.md
@@ -208,6 +208,12 @@ FE 的配置项有两种方式进行配置:
 
 ### `consistency_check_start_time`
 
+### `db_used_data_quota_update_interval_secs`
+
+为了更好的数据导入性能,在数据导入之前的数据库已使用的数据量是否超出配额的检查中,我们并不实时计算数据库已经使用的数据量,而是获取后台线程周期性更新的值。
+
+该配置用于设置更新数据库使用的数据量的值的时间间隔。
+
 ### `default_rowset_type`
 
 ### `default_storage_medium`
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java
index aa61879..583b706 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java
@@ -224,6 +224,7 @@ import com.sleepycat.je.rep.NetworkRestore;
 import com.sleepycat.je.rep.NetworkRestoreConfig;
 
 import org.apache.commons.collections.CollectionUtils;
+import org.apache.doris.transaction.UpdateDbUsedDataQuotaDaemon;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.codehaus.jackson.map.ObjectMapper;
@@ -294,6 +295,7 @@ public class Catalog {
     private BackupHandler backupHandler;
     private PublishVersionDaemon publishVersionDaemon;
     private DeleteHandler deleteHandler;
+    private UpdateDbUsedDataQuotaDaemon updateDbUsedDataQuotaDaemon;
 
     private MasterDaemon labelCleaner; // To clean old LabelInfo, ExportJobInfos
     private MasterDaemon txnCleaner; // To clean aborted or timeout txns
@@ -478,6 +480,7 @@ public class Catalog {
         this.metaDir = Config.meta_dir;
         this.publishVersionDaemon = new PublishVersionDaemon();
         this.deleteHandler = new DeleteHandler();
+        this.updateDbUsedDataQuotaDaemon = new UpdateDbUsedDataQuotaDaemon();
 
         this.replayedJournalId = new AtomicLong(0L);
         this.isElectable = false;
@@ -1294,6 +1297,8 @@ public class Catalog {
         routineLoadTaskScheduler.start();
         // start dynamic partition task
         dynamicPartitionScheduler.start();
+        // start daemon thread to periodically update db used data quota for db txn manager
+        updateDbUsedDataQuotaDaemon.start();
     }
 
     // start threads that should running on all FE
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Database.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Database.java
index 3c0fb15..2362879 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Database.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Database.java
@@ -87,9 +87,9 @@ public class Database extends MetaObject implements Writable {
     // user define function
     private ConcurrentMap<String, ImmutableList<Function>> name2Function = Maps.newConcurrentMap();
 
-    private long dataQuotaBytes;
+    private volatile long dataQuotaBytes;
 
-    private long replicaQuotaSize;
+    private volatile long replicaQuotaSize;
 
     public enum DbState {
         NORMAL, LINK, MOVE
@@ -203,7 +203,7 @@ public class Database extends MetaObject implements Writable {
         return replicaQuotaSize;
     }
 
-    public long getDataQuotaLeftWithLock() {
+    public long getUsedDataQuotaWithLock() {
         long usedDataQuota = 0;
         readLock();
         try {
@@ -215,9 +215,7 @@ public class Database extends MetaObject implements Writable {
                 OlapTable olapTable = (OlapTable) table;
                 usedDataQuota = usedDataQuota + olapTable.getDataSize();
             }
-
-            long leftDataQuota = dataQuotaBytes - usedDataQuota;
-            return Math.max(leftDataQuota, 0L);
+            return usedDataQuota;
         } finally {
             readUnlock();
         }
@@ -248,16 +246,17 @@ public class Database extends MetaObject implements Writable {
         Pair<Double, String> quotaUnitPair = DebugUtil.getByteUint(dataQuotaBytes);
         String readableQuota = DebugUtil.DECIMAL_FORMAT_SCALE_3.format(quotaUnitPair.first) + " "
                 + quotaUnitPair.second;
-        long leftQuota = getDataQuotaLeftWithLock();
+        long usedDataQuota = getUsedDataQuotaWithLock();
+        long leftDataQuota = Math.max(dataQuotaBytes - usedDataQuota, 0);
 
-        Pair<Double, String> leftQuotaUnitPair = DebugUtil.getByteUint(leftQuota);
+        Pair<Double, String> leftQuotaUnitPair = DebugUtil.getByteUint(leftDataQuota);
         String readableLeftQuota = DebugUtil.DECIMAL_FORMAT_SCALE_3.format(leftQuotaUnitPair.first) + " "
                 + leftQuotaUnitPair.second;
 
         LOG.info("database[{}] data quota: left bytes: {} / total: {}",
                  fullQualifiedName, readableLeftQuota, readableQuota);
 
-        if (leftQuota <= 0L) {
+        if (leftDataQuota <= 0L) {
             throw new DdlException("Database[" + fullQualifiedName
                     + "] data size exceeds quota[" + readableQuota + "]");
         }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
index 714c4ef..9c14647 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
@@ -884,6 +884,12 @@ public class Config extends ConfigBase {
      */
     @ConfField(mutable = true, masterOnly = true)
     public static boolean disable_load_job = false;
+
+    /*
+     * One master daemon thread will update database used data quota for db txn manager every db_used_data_quota_update_interval_secs
+     */
+    @ConfField(mutable = true, masterOnly = true)
+    public static int db_used_data_quota_update_interval_secs = 300;
     
     /**
      * Load using hadoop cluster will be deprecated in future.
diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
index 4aad965..2c40344 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/DatabaseTransactionMgr.java
@@ -131,6 +131,9 @@ public class DatabaseTransactionMgr {
 
     private List<ClearTransactionTask> clearTransactionTasks = Lists.newArrayList();
 
+    // cached (not real-time) used data size of this db, kept so the data quota check stays fast
+    private volatile long usedQuotaDataBytes = -1;
+
     protected void readLock() {
         this.transactionLock.readLock().lock();
     }
@@ -246,6 +249,7 @@ public class DatabaseTransactionMgr {
     public long beginTransaction(List<Long> tableIdList, String label, TUniqueId requestId,
                                  TransactionState.TxnCoordinator coordinator, TransactionState.LoadJobSourceType sourceType, long listenerId, long timeoutSecond)
             throws DuplicatedRequestException, LabelAlreadyUsedException, BeginTransactionException, AnalysisException {
+        checkDatabaseDataQuota();
         writeLock();
         try {
             Preconditions.checkNotNull(coordinator);
@@ -310,6 +314,30 @@ public class DatabaseTransactionMgr {
     }
 
 
+    // Fast quota gate run before a new txn begins: compares the cached used bytes with the db data quota.
+    private void checkDatabaseDataQuota() throws AnalysisException {
+        Database database = catalog.getDb(dbId);
+        if (database == null) {
+            throw new AnalysisException("Database[" + dbId + "] does not exist");
+        }
+        // -1 means the cached value has not been set yet, so compute it here under the db read lock
+        if (usedQuotaDataBytes == -1) {
+            usedQuotaDataBytes = database.getUsedDataQuotaWithLock();
+        }
+        long quotaLimit = database.getDataQuota();
+        if (usedQuotaDataBytes < quotaLimit) {
+            return;
+        }
+        Pair<Double, String> unitPair = DebugUtil.getByteUint(quotaLimit);
+        String quotaText = DebugUtil.DECIMAL_FORMAT_SCALE_3.format(unitPair.first) + " " + unitPair.second;
+        throw new AnalysisException(
+                "Database[" + database.getFullName() + "] data size exceeds quota[" + quotaText + "]");
+    }
+
+    public void updateDatabaseUsedQuotaData(long usedQuotaDataBytes) {
+        this.usedQuotaDataBytes = usedQuotaDataBytes; // volatile write; the value is read back by checkDatabaseDataQuota()
+    }
+
     /**
      * commit transaction process as follows:
      * 1. validate whether `Load` is cancelled
diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
index 9cb7254..ce35a81 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
@@ -441,4 +441,9 @@ public class GlobalTransactionMgr implements Writable {
             }
         }
     }
+
+    // Hands the given used-bytes value to the transaction manager of database dbId.
+    public void updateDatabaseUsedQuotaData(long dbId, long usedQuotaDataBytes) throws AnalysisException {
+        getDatabaseTransactionMgr(dbId).updateDatabaseUsedQuotaData(usedQuotaDataBytes);
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/UpdateDbUsedDataQuotaDaemon.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/UpdateDbUsedDataQuotaDaemon.java
new file mode 100644
index 0000000..e8336a1
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/UpdateDbUsedDataQuotaDaemon.java
@@ -0,0 +1,63 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.transaction;
+
+import org.apache.doris.catalog.Catalog;
+import org.apache.doris.catalog.Database;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.Config;
+import org.apache.doris.common.util.MasterDaemon;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.List;
+
+// Master daemon that periodically recomputes every database's used data size for the db txn managers.
+public class UpdateDbUsedDataQuotaDaemon extends MasterDaemon {
+    private static final Logger LOG = LogManager.getLogger(UpdateDbUsedDataQuotaDaemon.class);
+
+    public UpdateDbUsedDataQuotaDaemon() {
+        // the config value is in seconds but MasterDaemon takes its interval in milliseconds, so convert
+        super("UPDATE_DB_USED_QUOTA", Config.db_used_data_quota_update_interval_secs * 1000L);
+    }
+
+    @Override
+    protected void runAfterCatalogReady() {
+        updateAllDatabaseUsedDataQuota();
+    }
+
+    private void updateAllDatabaseUsedDataQuota() {
+        Catalog catalog = Catalog.getCurrentCatalog();
+        List<Long> dbIdList = catalog.getDbIds();
+        GlobalTransactionMgr globalTransactionMgr = catalog.getGlobalTransactionMgr();
+        for (Long dbId : dbIdList) {
+            Database db = catalog.getDb(dbId);
+            if (db == null) {
+                LOG.warn("Database [" + dbId + "] doese not exist, skip to update database used data quota");
+                continue;
+            }
+            try {
+                long usedDataQuotaBytes = db.getUsedDataQuotaWithLock();
+                globalTransactionMgr.updateDatabaseUsedQuotaData(dbId, usedDataQuotaBytes);
+                LOG.debug("Update database[{}] used data quota bytes : {}.", db.getFullName(), usedDataQuotaBytes);
+            } catch (AnalysisException e) {
+                LOG.warn("Update database[" + dbId + "] used data quota failed", e);
+            }
+        }
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org