You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2019/11/15 15:03:28 UTC

[incubator-doris] branch master updated: Optimize tablet report with expired transaction. (#2215)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new c5ce722  Optimize tablet report with expired transaction. (#2215)
c5ce722 is described below

commit c5ce72215de92bd78e8376bd9f7072c0705c93cf
Author: Mingyu Chen <mo...@163.com>
AuthorDate: Fri Nov 15 23:03:21 2019 +0800

    Optimize tablet report with expired transaction. (#2215)
    
    When there are lots of expired transactions on the BE and a large
    number of tablets, the report thread may become too slow, because it
    has to iterate over the whole transaction map for each tablet.
    
    But this is unnecessary. We should first build an expired transaction
    map keyed by tablet (tablet id, schema hash and tablet uid). Then, for
    each tablet, we only need to look up the expired transaction map once,
    instead of traversing the whole transaction map.
---
 be/src/agent/task_worker_pool.cpp                  |  9 +++---
 be/src/common/config.h                             |  2 +-
 be/src/olap/rowset/rowset_meta_manager.cpp         |  2 +-
 be/src/olap/tablet_manager.cpp                     | 17 +++++++---
 be/src/olap/txn_manager.cpp                        | 37 +++++++++-------------
 be/src/olap/txn_manager.h                          |  4 ++-
 .../java/org/apache/doris/master/MasterImpl.java   | 14 ++------
 .../org/apache/doris/master/ReportHandler.java     |  1 -
 .../java/org/apache/doris/qe/SimpleScheduler.java  |  8 ++---
 9 files changed, 44 insertions(+), 50 deletions(-)

diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp
index dbb60fd..5fec93f 100644
--- a/be/src/agent/task_worker_pool.cpp
+++ b/be/src/agent/task_worker_pool.cpp
@@ -883,7 +883,8 @@ void* TaskWorkerPool::_clear_transaction_task_worker_thread_callback(void* arg_t
             worker_pool_this->_tasks.pop_front();
         }
         LOG(INFO) << "get clear transaction task task, signature:" << agent_task_req.signature
-                  << ", transaction_id:" << clear_transaction_task_req.transaction_id;
+                  << ", transaction_id: " << clear_transaction_task_req.transaction_id
+                  << ", partition id size: " << clear_transaction_task_req.partition_id.size();
 
         TStatusCode::type status_code = TStatusCode::OK;
         vector<string> error_msgs;
@@ -893,7 +894,7 @@ void* TaskWorkerPool::_clear_transaction_task_worker_thread_callback(void* arg_t
             // transaction_id should be greater than zero.
             // If it is not greater than zero, no need to execute
             // the following clear_transaction_task() function.
-            if (clear_transaction_task_req.partition_id.empty()) {
+            if (!clear_transaction_task_req.partition_id.empty()) {
                 worker_pool_this->_env->storage_engine()->clear_transaction_task(
                         clear_transaction_task_req.transaction_id, clear_transaction_task_req.partition_id);
             } else {
@@ -901,10 +902,10 @@ void* TaskWorkerPool::_clear_transaction_task_worker_thread_callback(void* arg_t
                         clear_transaction_task_req.transaction_id);
             }
             LOG(INFO) << "finish to clear transaction task. signature:" << agent_task_req.signature
-                      << ", transaction_id:" << clear_transaction_task_req.transaction_id;
+                      << ", transaction_id: " << clear_transaction_task_req.transaction_id;
         } else {
             LOG(WARNING) << "invalid transaction id: " << clear_transaction_task_req.transaction_id
-                         << ", signature:" << agent_task_req.signature;
+                         << ", signature: " << agent_task_req.signature;
         }
 
         task_status.__set_status_code(status_code);
diff --git a/be/src/common/config.h b/be/src/common/config.h
index e335000..427cf53 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -494,7 +494,7 @@ namespace config {
 
     // max number of txns in txn manager
     // this is a self protection to avoid too many txns saving in manager
-    CONF_Int64(max_runnings_transactions, "1000");
+    CONF_Int64(max_runnings_transactions, "200");
 
 } // namespace config
 
diff --git a/be/src/olap/rowset/rowset_meta_manager.cpp b/be/src/olap/rowset/rowset_meta_manager.cpp
index e3144c8..3ba1c76 100644
--- a/be/src/olap/rowset/rowset_meta_manager.cpp
+++ b/be/src/olap/rowset/rowset_meta_manager.cpp
@@ -94,7 +94,7 @@ OLAPStatus RowsetMetaManager::save(OlapMeta* meta, TabletUid tablet_uid, const R
 
 OLAPStatus RowsetMetaManager::remove(OlapMeta* meta, TabletUid tablet_uid, const RowsetId& rowset_id) {
     std::string key = ROWSET_PREFIX + tablet_uid.to_string() + "_" + rowset_id.to_string();
-    LOG(INFO) << "start to remove rowset, key:" << key;
+    VLOG(3) << "start to remove rowset, key:" << key;
     OLAPStatus status = meta->remove(META_COLUMN_FAMILY_INDEX, key);
     LOG(INFO) << "remove rowset key:" << key << " finished";
     return status;
diff --git a/be/src/olap/tablet_manager.cpp b/be/src/olap/tablet_manager.cpp
index f47f7c3..aed6215 100644
--- a/be/src/olap/tablet_manager.cpp
+++ b/be/src/olap/tablet_manager.cpp
@@ -920,6 +920,11 @@ OLAPStatus TabletManager::report_tablet_info(TTabletInfo* tablet_info) {
 
 OLAPStatus TabletManager::report_all_tablets_info(std::map<TTabletId, TTablet>* tablets_info) {
     LOG(INFO) << "begin to process report all tablets info.";
+
+    // build the expired txn map first, outside the tablet map lock
+    std::map<TabletInfo, std::set<int64_t>> expire_txn_map;
+    StorageEngine::instance()->txn_manager()->build_expire_txn_map(&expire_txn_map);
+
     ReadLock rlock(&_tablet_map_lock);
     DorisMetrics::report_all_tablets_requests_total.increment(1);
 
@@ -941,11 +946,15 @@ OLAPStatus TabletManager::report_all_tablets_info(std::map<TTabletId, TTablet>*
             TTabletInfo tablet_info;
             tablet_ptr->build_tablet_report_info(&tablet_info);
 
-            // report expire transaction
+            // find expired transactions corresponding to this tablet
+            TabletInfo tinfo = TabletInfo(tablet_ptr->tablet_id(), tablet_ptr->schema_hash(), tablet_ptr->tablet_uid());
             vector<int64_t> transaction_ids;
-            // TODO(ygl): tablet manager and txn manager may be dead lock
-            StorageEngine::instance()->txn_manager()->get_expire_txns(tablet_ptr->tablet_id(), 
-                tablet_ptr->schema_hash(), tablet_ptr->tablet_uid(), &transaction_ids);
+            auto find = expire_txn_map.find(tinfo);
+            if (find != expire_txn_map.end()) {
+                for(auto& it : find->second) {
+                    transaction_ids.push_back(it);
+                }
+            }
             tablet_info.__set_transaction_ids(transaction_ids);
 
             tablet.tablet_infos.push_back(tablet_info);
diff --git a/be/src/olap/txn_manager.cpp b/be/src/olap/txn_manager.cpp
index 44fe9b2..79124be 100755
--- a/be/src/olap/txn_manager.cpp
+++ b/be/src/olap/txn_manager.cpp
@@ -298,9 +298,9 @@ OLAPStatus TxnManager::publish_txn(OlapMeta* meta, TPartitionId partition_id, TT
                       << ", rowsetid: " << rowset_ptr->rowset_id();
             if (it->second.empty()) {
                 _txn_tablet_map.erase(it);
+                _clear_txn_partition_map_unlocked(transaction_id, partition_id);
             }
         }
-        _clear_txn_partition_map_unlocked(transaction_id, partition_id);
         return OLAP_SUCCESS;
     }
 }
@@ -335,8 +335,8 @@ OLAPStatus TxnManager::rollback_txn(TPartitionId partition_id, TTransactionId tr
                   << ", tablet: " << tablet_info.to_string();
         if (it->second.empty()) {
             _txn_tablet_map.erase(it);
+            _clear_txn_partition_map_unlocked(transaction_id, partition_id);
         }
-        _clear_txn_partition_map_unlocked(transaction_id, partition_id);
     }
     return OLAP_SUCCESS;
 }
@@ -385,8 +385,8 @@ OLAPStatus TxnManager::delete_txn(OlapMeta* meta, TPartitionId partition_id, TTr
     it->second.erase(tablet_info);
     if (it->second.empty()) {
         _txn_tablet_map.erase(it);
+        _clear_txn_partition_map_unlocked(transaction_id, partition_id);
     }
-    _clear_txn_partition_map_unlocked(transaction_id, partition_id);
     return OLAP_SUCCESS;
 }
 
@@ -435,8 +435,8 @@ void TxnManager::force_rollback_tablet_related_txns(OlapMeta* meta, TTabletId ta
         }
         if (it.second.empty()) {
             _txn_tablet_map.erase(it.first);
+            _clear_txn_partition_map_unlocked(it.first.second, it.first.first);
         }
-        _clear_txn_partition_map_unlocked(it.first.second, it.first.first);
     }
 }
 
@@ -487,31 +487,24 @@ bool TxnManager::has_txn(TPartitionId partition_id, TTransactionId transaction_i
     return found;
 }
 
-bool TxnManager::get_expire_txns(TTabletId tablet_id, SchemaHash schema_hash, TabletUid tablet_uid, 
-    std::vector<int64_t>* transaction_ids) {
-    if (transaction_ids == nullptr) {
-        LOG(WARNING) << "parameter is null when get_expire_txns by tablet";
-        return false;
-    }
+void TxnManager::build_expire_txn_map(std::map<TabletInfo, std::set<int64_t>>* expire_txn_map) {
     time_t now = time(nullptr);
-    TabletInfo tablet_info(tablet_id, schema_hash, tablet_uid);
+    int64_t counter = 0;
+    // traverse the txn map, and get all expired txns
     ReadLock txn_rdlock(&_txn_map_lock);
     for (auto& it : _txn_tablet_map) {
-        auto txn_info = it.second.find(tablet_info);
-        if (txn_info != it.second.end()) {
-            double diff = difftime(now, txn_info->second.creation_time);
+        for (auto& t_map : it.second) {
+            double diff = difftime(now, t_map.second.creation_time);
             if (diff >= config::pending_data_expire_time_sec) {
-                transaction_ids->push_back(it.first.second);
-                LOG(INFO) << "find expire pending data. " 
-                        << " tablet_id=" << tablet_id
-                        << " schema_hash=" << schema_hash 
-                        << " tablet_uid=" << tablet_uid.to_string()
-                        << " transaction_id=" << it.first.second 
-                        << " exist_sec=" << diff;
+                if (expire_txn_map->find(t_map.first) == expire_txn_map->end()) {
+                    (*expire_txn_map)[t_map.first] = std::set<int64_t>();
+                }
+                (*expire_txn_map)[t_map.first].insert(it.first.second);
+                counter++;
             }
         }
     }
-    return true;
+    LOG(INFO) << "get " << counter << " expired txns";
 }
 
 void TxnManager::get_partition_ids(const TTransactionId transaction_id, std::vector<TPartitionId>* partition_ids) {
diff --git a/be/src/olap/txn_manager.h b/be/src/olap/txn_manager.h
index 8ea816e..4b22266 100755
--- a/be/src/olap/txn_manager.h
+++ b/be/src/olap/txn_manager.h
@@ -132,7 +132,9 @@ public:
     bool has_txn(TPartitionId partition_id, TTransactionId transaction_id,
                  TTabletId tablet_id, SchemaHash schema_hash, TabletUid tablet_uid);
 
-    bool get_expire_txns(TTabletId tablet_id, SchemaHash schema_hash, TabletUid tablet_uid, std::vector<int64_t>* transaction_ids);
+    // get all expired txns and save them in expire_txn_map.
+    // This is currently called before reporting all tablet info, to avoid iterating txn map for every tablet.
+    void build_expire_txn_map(std::map<TabletInfo, std::set<int64_t>>* expire_txn_map);
 
     void force_rollback_tablet_related_txns(OlapMeta* meta, TTabletId tablet_id, SchemaHash schema_hash, TabletUid tablet_uid);
 
diff --git a/fe/src/main/java/org/apache/doris/master/MasterImpl.java b/fe/src/main/java/org/apache/doris/master/MasterImpl.java
index db92f77..9ed27bc 100644
--- a/fe/src/main/java/org/apache/doris/master/MasterImpl.java
+++ b/fe/src/main/java/org/apache/doris/master/MasterImpl.java
@@ -44,7 +44,6 @@ import org.apache.doris.task.AgentTaskQueue;
 import org.apache.doris.task.AlterReplicaTask;
 import org.apache.doris.task.CheckConsistencyTask;
 import org.apache.doris.task.ClearAlterTask;
-import org.apache.doris.task.ClearTransactionTask;
 import org.apache.doris.task.CloneTask;
 import org.apache.doris.task.CreateReplicaTask;
 import org.apache.doris.task.CreateRollupTask;
@@ -118,7 +117,7 @@ public class MasterImpl {
         AgentTask task = AgentTaskQueue.getTask(backendId, taskType, signature);
         if (task == null) {
             if (taskType != TTaskType.DROP && taskType != TTaskType.STORAGE_MEDIUM_MIGRATE
-                    && taskType != TTaskType.RELEASE_SNAPSHOT) {
+                    && taskType != TTaskType.RELEASE_SNAPSHOT && taskType != TTaskType.CLEAR_TRANSACTION_TASK) {
                 String errMsg = "cannot find task. type: " + taskType + ", backendId: " + backendId
                         + ", signature: " + signature;
                 LOG.warn(errMsg);
@@ -164,9 +163,6 @@ public class MasterImpl {
                 case CLEAR_ALTER_TASK:
                     finishClearAlterTask(task, request);
                     break;
-                case CLEAR_TRANSACTION_TASK:
-                    finishClearTransactionTask(task, request);
-                    break;
                 case DROP:
                     finishDropReplica(task);
                     break;
@@ -390,7 +386,7 @@ public class MasterImpl {
             RollupHandler rollupHandler = Catalog.getInstance().getRollupHandler();
             AlterJob alterJob = rollupHandler.getAlterJob(olapTable.getId());
             if (alterJob == null && olapTable.getState() == OlapTableState.ROLLUP) {
-                // this happends when:
+                // this happens when:
                 // a rollup job is finish and a delete job is the next first job (no load job before)
                 // and delete task is first send to base tablet, so it will return 2 tablets info.
                 // the second tablet is rollup tablet and it is no longer exist in alterJobs queue.
@@ -562,12 +558,6 @@ public class MasterImpl {
         AgentTaskQueue.removeTask(task.getBackendId(), task.getTaskType(), task.getSignature());
     }
     
-    private void finishClearTransactionTask(AgentTask task, TFinishTaskRequest request) {
-        ClearTransactionTask clearTransactionTask = (ClearTransactionTask) task;
-        clearTransactionTask.setFinished(true);
-        AgentTaskQueue.removeTask(task.getBackendId(), task.getTaskType(), task.getSignature());
-    }
-    
     private void finishPublishVersion(AgentTask task, TFinishTaskRequest request) {
         List<Long> errorTabletIds = null;
         if (request.isSetError_tablet_ids()) {
diff --git a/fe/src/main/java/org/apache/doris/master/ReportHandler.java b/fe/src/main/java/org/apache/doris/master/ReportHandler.java
index 213da10..099a0e1 100644
--- a/fe/src/main/java/org/apache/doris/master/ReportHandler.java
+++ b/fe/src/main/java/org/apache/doris/master/ReportHandler.java
@@ -897,7 +897,6 @@ public class ReportHandler extends Daemon {
             ClearTransactionTask clearTransactionTask = new ClearTransactionTask(backendId, 
                     transactionId, transactionsToClear.get(transactionId));
             batchTask.addTask(clearTransactionTask);
-            AgentTaskQueue.addTask(clearTransactionTask);
         }
         
         AgentTaskExecutor.submit(batchTask);
diff --git a/fe/src/main/java/org/apache/doris/qe/SimpleScheduler.java b/fe/src/main/java/org/apache/doris/qe/SimpleScheduler.java
index 2e59ee1..8db4f01 100644
--- a/fe/src/main/java/org/apache/doris/qe/SimpleScheduler.java
+++ b/fe/src/main/java/org/apache/doris/qe/SimpleScheduler.java
@@ -29,6 +29,9 @@ import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -36,9 +39,6 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
-import org.apache.logging.log4j.Logger;
-import org.apache.logging.log4j.LogManager;
-
 public class SimpleScheduler {
     private static AtomicLong nextId = new AtomicLong(0);
     private static final Logger LOG = LogManager.getLogger(SimpleScheduler.class);
@@ -173,7 +173,7 @@ public class SimpleScheduler {
                             Long backendId = entry.getKey();
                             
                             // remove from blacklist if
-                            // 1. backend does not exist antmore
+                            // 1. backend does not exist anymore
                             // 2. backend is alive
                             if (clusterInfoService.getBackend(backendId) == null
                                     || clusterInfoService.checkBackendAvailable(backendId)) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org