You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by zh...@apache.org on 2019/11/13 03:23:02 UTC
[incubator-doris] branch master updated: Sending clear txn task
explicitly after transaction being aborted (#2182)
This is an automated email from the ASF dual-hosted git repository.
zhaoc pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new 11872d5 Sending clear txn task explicitly after transaction being aborted (#2182)
11872d5 is described below
commit 11872d5cf60afa9a165cfd0f19a55f8863e3aaae
Author: Mingyu Chen <mo...@163.com>
AuthorDate: Wed Nov 13 11:22:45 2019 +0800
Sending clear txn task explicitly after transaction being aborted (#2182)
---
be/src/agent/task_worker_pool.cpp | 9 ++++-
be/src/common/config.h | 5 +++
be/src/olap/delta_writer.cpp | 2 +-
be/src/olap/olap_define.h | 1 +
be/src/olap/push_handler.cpp | 9 +++--
be/src/olap/storage_engine.cpp | 9 ++++-
be/src/olap/storage_engine.h | 3 +-
be/src/olap/txn_manager.cpp | 44 +++++++++++++++++++++-
be/src/olap/txn_manager.h | 15 ++++++++
be/test/olap/txn_manager_test.cpp | 1 +
.../java/org/apache/doris/master/MasterImpl.java | 12 ++----
.../org/apache/doris/master/ReportHandler.java | 4 +-
.../java/org/apache/doris/task/ClearAlterTask.java | 9 -----
.../apache/doris/task/ClearTransactionTask.java | 25 +++---------
.../doris/transaction/GlobalTransactionMgr.java | 43 +++++++++++++++++++++
15 files changed, 141 insertions(+), 50 deletions(-)
diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp
index 9e51d17..dbb60fd 100644
--- a/be/src/agent/task_worker_pool.cpp
+++ b/be/src/agent/task_worker_pool.cpp
@@ -893,8 +893,13 @@ void* TaskWorkerPool::_clear_transaction_task_worker_thread_callback(void* arg_t
// transaction_id should be greater than zero.
// If it is not greater than zero, no need to execute
// the following clear_transaction_task() function.
- worker_pool_this->_env->storage_engine()->clear_transaction_task(
- clear_transaction_task_req.transaction_id, clear_transaction_task_req.partition_id);
+            // A request with an explicit partition list clears exactly those partitions.
+            // A request with an empty list only carries the transaction id, so use the
+            // overload that looks the partitions up by transaction id instead.
+            if (!clear_transaction_task_req.partition_id.empty()) {
+                worker_pool_this->_env->storage_engine()->clear_transaction_task(
+                    clear_transaction_task_req.transaction_id, clear_transaction_task_req.partition_id);
+            } else {
+                worker_pool_this->_env->storage_engine()->clear_transaction_task(
+                    clear_transaction_task_req.transaction_id);
+            }
LOG(INFO) << "finish to clear transaction task. signature:" << agent_task_req.signature
<< ", transaction_id:" << clear_transaction_task_req.transaction_id;
} else {
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 9aa7d2b..e335000 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -491,6 +491,11 @@ namespace config {
// brpc config
CONF_Int64(brpc_max_body_size, "67108864")
+
+ // Max number of txns kept in the txn manager.
+ // This is a self-protection limit to avoid saving too many txns in the manager.
+ CONF_Int64(max_runnings_transactions, "1000");
+
} // namespace config
} // namespace doris
diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp
index 711d50c..57179b7 100644
--- a/be/src/olap/delta_writer.cpp
+++ b/be/src/olap/delta_writer.cpp
@@ -123,7 +123,7 @@ OLAPStatus DeltaWriter::init() {
if (!new_migration_rlock.own_lock()) {
return OLAP_ERR_RWLOCK_ERROR;
}
- _storage_engine->txn_manager()->prepare_txn(_req.partition_id, _new_tablet, _req.txn_id, _req.load_id);
+ RETURN_NOT_OK(_storage_engine->txn_manager()->prepare_txn(_req.partition_id, _new_tablet, _req.txn_id, _req.load_id));
}
}
}
diff --git a/be/src/olap/olap_define.h b/be/src/olap/olap_define.h
index f4b40f1..f1efdb0 100644
--- a/be/src/olap/olap_define.h
+++ b/be/src/olap/olap_define.h
@@ -166,6 +166,7 @@ enum OLAPStatus {
OLAP_ERR_VERSION_ALREADY_MERGED = -230,
OLAP_ERR_LZO_DISABLED = -231,
OLAP_ERR_DISK_REACH_CAPACITY_LIMIT = -232,
+ OLAP_ERR_TOO_MANY_TRANSACTIONS = -233,
// CommandExecutor
// [-300, -400)
diff --git a/be/src/olap/push_handler.cpp b/be/src/olap/push_handler.cpp
index 298634d..614b366 100644
--- a/be/src/olap/push_handler.cpp
+++ b/be/src/olap/push_handler.cpp
@@ -93,8 +93,8 @@ OLAPStatus PushHandler::_do_streaming_ingestion(
PUniqueId load_id;
load_id.set_hi(0);
load_id.set_lo(0);
- OLAPStatus res = StorageEngine::instance()->txn_manager()->prepare_txn(request.partition_id,
- tablet, request.transaction_id, load_id);
+ RETURN_NOT_OK(StorageEngine::instance()->txn_manager()->prepare_txn(request.partition_id,
+ tablet, request.transaction_id, load_id));
// prepare txn will be always successful
// if current tablet is under schema change, origin tablet is successful and
@@ -140,8 +140,8 @@ OLAPStatus PushHandler::_do_streaming_ingestion(
PUniqueId load_id;
load_id.set_hi(0);
load_id.set_lo(0);
- res = StorageEngine::instance()->txn_manager()->prepare_txn(request.partition_id,
- related_tablet, request.transaction_id, load_id);
+ RETURN_NOT_OK(StorageEngine::instance()->txn_manager()->prepare_txn(request.partition_id,
+ related_tablet, request.transaction_id, load_id));
// prepare txn will always be successful
tablet_vars->push_back(TabletVars());
TabletVars& new_item = tablet_vars->back();
@@ -158,6 +158,7 @@ OLAPStatus PushHandler::_do_streaming_ingestion(
// not call validate request here, because realtime load does not
// contain version info
+ OLAPStatus res;
// check delete condition if push for delete
std::queue<DeletePredicatePB> del_preds;
if (push_type == PUSH_FOR_DELETE) {
diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp
index 99cb877..4479451 100644
--- a/be/src/olap/storage_engine.cpp
+++ b/be/src/olap/storage_engine.cpp
@@ -477,8 +477,15 @@ OLAPStatus StorageEngine::clear() {
return OLAP_SUCCESS;
}
+void StorageEngine::clear_transaction_task(const TTransactionId transaction_id) {
+    // This overload is for clear tasks that carry no partition ids: fetch the
+    // partitions recorded for the transaction from the txn manager, then
+    // delegate to the overload that takes an explicit partition list.
+    std::vector<int64_t> recorded_partition_ids;
+    StorageEngine::instance()->txn_manager()->get_partition_ids(transaction_id, &recorded_partition_ids);
+    clear_transaction_task(transaction_id, recorded_partition_ids);
+}
+
void StorageEngine::clear_transaction_task(const TTransactionId transaction_id,
- const vector<TPartitionId> partition_ids) {
+ const vector<TPartitionId>& partition_ids) {
LOG(INFO) << "begin to clear transaction task. transaction_id=" << transaction_id;
for (const TPartitionId& partition_id : partition_ids) {
diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h
index c6634b0..a360f22 100644
--- a/be/src/olap/storage_engine.h
+++ b/be/src/olap/storage_engine.h
@@ -86,8 +86,9 @@ public:
const bool is_schema_change_tablet,
const TabletSharedPtr ref_tablet);
+ void clear_transaction_task(const TTransactionId transaction_id);
void clear_transaction_task(const TTransactionId transaction_id,
- const std::vector<TPartitionId> partition_ids);
+ const std::vector<TPartitionId>& partition_ids);
// Instance should be inited from create_instance
// MUST NOT be called in other circumstances.
diff --git a/be/src/olap/txn_manager.cpp b/be/src/olap/txn_manager.cpp
index e103b27..44fe9b2 100755
--- a/be/src/olap/txn_manager.cpp
+++ b/be/src/olap/txn_manager.cpp
@@ -134,11 +134,21 @@ OLAPStatus TxnManager::prepare_txn(
}
}
}
+
+    // Reject the request when too many transactions are already running.
+    // The limit is checked against _txn_partition_map (one entry per transaction id),
+    // so the warning reports that same count rather than the size of _txn_tablet_map.
+    if (_txn_partition_map.size() > config::max_runnings_transactions) {
+        LOG(WARNING) << "too many transactions: " << _txn_partition_map.size() << ", limit: " << config::max_runnings_transactions;
+        return OLAP_ERR_TOO_MANY_TRANSACTIONS;
+    }
+
// not found load id
// case 1: user start a new txn, rowset_ptr = null
// case 2: loading txn from meta env
TabletTxnInfo load_info(load_id, nullptr);
_txn_tablet_map[key][tablet_info] = load_info;
+ _insert_txn_partition_map_unlocked(transaction_id, partition_id);
+
LOG(INFO) << "add transaction to engine successfully."
<< "partition_id: " << key.first
<< ", transaction_id: " << key.second
@@ -225,6 +235,7 @@ OLAPStatus TxnManager::commit_txn(
WriteLock wrlock(&_txn_map_lock);
TabletTxnInfo load_info(load_id, rowset_ptr);
_txn_tablet_map[key][tablet_info] = load_info;
+ _insert_txn_partition_map_unlocked(transaction_id, partition_id);
LOG(INFO) << "commit transaction to engine successfully."
<< " partition_id: " << key.first
<< ", transaction_id: " << key.second
@@ -289,6 +300,7 @@ OLAPStatus TxnManager::publish_txn(OlapMeta* meta, TPartitionId partition_id, TT
_txn_tablet_map.erase(it);
}
}
+ _clear_txn_partition_map_unlocked(transaction_id, partition_id);
return OLAP_SUCCESS;
}
}
@@ -324,7 +336,7 @@ OLAPStatus TxnManager::rollback_txn(TPartitionId partition_id, TTransactionId tr
if (it->second.empty()) {
_txn_tablet_map.erase(it);
}
- return OLAP_SUCCESS;
+ _clear_txn_partition_map_unlocked(transaction_id, partition_id);
}
return OLAP_SUCCESS;
}
@@ -374,6 +386,7 @@ OLAPStatus TxnManager::delete_txn(OlapMeta* meta, TPartitionId partition_id, TTr
if (it->second.empty()) {
_txn_tablet_map.erase(it);
}
+ _clear_txn_partition_map_unlocked(transaction_id, partition_id);
return OLAP_SUCCESS;
}
@@ -423,6 +436,7 @@ void TxnManager::force_rollback_tablet_related_txns(OlapMeta* meta, TTabletId ta
if (it.second.empty()) {
_txn_tablet_map.erase(it.first);
}
+ _clear_txn_partition_map_unlocked(it.first.second, it.first.first);
}
}
@@ -500,4 +514,32 @@ bool TxnManager::get_expire_txns(TTabletId tablet_id, SchemaHash schema_hash, Ta
return true;
}
+void TxnManager::get_partition_ids(const TTransactionId transaction_id, std::vector<TPartitionId>* partition_ids) {
+    // Appends every partition id recorded for transaction_id to *partition_ids.
+    // The output vector is left untouched when the transaction id is not in the map.
+    ReadLock txn_rdlock(&_txn_map_lock);
+    auto entry = _txn_partition_map.find(transaction_id);
+    if (entry == _txn_partition_map.end()) {
+        return;
+    }
+    partition_ids->insert(partition_ids->end(), entry->second.begin(), entry->second.end());
+}
+
+void TxnManager::_insert_txn_partition_map_unlocked(int64_t transaction_id, int64_t partition_id) {
+    // Records partition_id under transaction_id in _txn_partition_map.
+    // operator[] default-constructs an empty set the first time a transaction id is
+    // seen, so no separate find/insert step is needed; inserting an id that is
+    // already present is a no-op.
+    _txn_partition_map[transaction_id].insert(partition_id);
+}
+
+void TxnManager::_clear_txn_partition_map_unlocked(int64_t transaction_id, int64_t partition_id) {
+    // Removes partition_id from the set recorded for transaction_id and drops the
+    // transaction's entry once its set becomes empty. Unknown transaction ids are ignored.
+    auto entry = _txn_partition_map.find(transaction_id);
+    if (entry == _txn_partition_map.end()) {
+        return;
+    }
+    auto& partitions = entry->second;
+    partitions.erase(partition_id);
+    if (partitions.empty()) {
+        _txn_partition_map.erase(entry);
+    }
+}
+
} // namespace doris
diff --git a/be/src/olap/txn_manager.h b/be/src/olap/txn_manager.h
index ef7f46e..8ea816e 100755
--- a/be/src/olap/txn_manager.h
+++ b/be/src/olap/txn_manager.h
@@ -27,6 +27,8 @@
#include <string>
#include <vector>
#include <thread>
+#include <unordered_map>
+#include <unordered_set>
#include <rapidjson/document.h>
#include <pthread.h>
@@ -70,6 +72,7 @@ public:
~TxnManager() {
_txn_tablet_map.clear();
+ _txn_partition_map.clear();
_txn_locks.clear();
}
@@ -132,16 +135,28 @@ public:
bool get_expire_txns(TTabletId tablet_id, SchemaHash schema_hash, TabletUid tablet_uid, std::vector<int64_t>* transaction_ids);
void force_rollback_tablet_related_txns(OlapMeta* meta, TTabletId tablet_id, SchemaHash schema_hash, TabletUid tablet_uid);
+
+ void get_partition_ids(const TTransactionId transaction_id, std::vector<TPartitionId>* partition_ids);
private:
RWMutex* _get_txn_lock(TTransactionId txn_id) {
return _txn_locks[txn_id % _txn_lock_num].get();
}
+ // insert or remove (transaction_id, partition_id) from _txn_partition_map
+ // get _txn_map_lock before calling
+ void _insert_txn_partition_map_unlocked(int64_t transaction_id, int64_t partition_id);
+ void _clear_txn_partition_map_unlocked(int64_t transaction_id, int64_t partition_id);
+
private:
RWMutex _txn_map_lock;
using TxnKey = std::pair<int64_t, int64_t>; // partition_id, transaction_id;
std::map<TxnKey, std::map<TabletInfo, TabletTxnInfo>> _txn_tablet_map;
+ // transaction_id -> corresponding partition ids
+ // This is mainly for the clear txn task received from FE, which may only have a transaction id,
+ // so we need this map to find out which partitions correspond to a transaction id.
+ // This map should be constructed/destroyed/modified alongside '_txn_tablet_map'
+ std::unordered_map<int64_t, std::unordered_set<int64_t>> _txn_partition_map;
const int32_t _txn_lock_num = 100;
std::map<int32_t, std::shared_ptr<RWMutex>> _txn_locks;
diff --git a/be/test/olap/txn_manager_test.cpp b/be/test/olap/txn_manager_test.cpp
index be11224..0c93e5a 100644
--- a/be/test/olap/txn_manager_test.cpp
+++ b/be/test/olap/txn_manager_test.cpp
@@ -92,6 +92,7 @@ public:
}
virtual void SetUp() {
+ config::max_runnings_transactions = 1000;
std::vector<StorePath> paths;
paths.emplace_back("_engine_data_path", -1);
diff --git a/fe/src/main/java/org/apache/doris/master/MasterImpl.java b/fe/src/main/java/org/apache/doris/master/MasterImpl.java
index 1393fd6..db92f77 100644
--- a/fe/src/main/java/org/apache/doris/master/MasterImpl.java
+++ b/fe/src/main/java/org/apache/doris/master/MasterImpl.java
@@ -558,18 +558,14 @@ public class MasterImpl {
private void finishClearAlterTask(AgentTask task, TFinishTaskRequest request) {
ClearAlterTask clearAlterTask = (ClearAlterTask) task;
- clearAlterTask.setFinished();
- AgentTaskQueue.removeTask(task.getBackendId(),
- task.getTaskType(),
- task.getSignature());
+ clearAlterTask.setFinished(true);
+ AgentTaskQueue.removeTask(task.getBackendId(), task.getTaskType(), task.getSignature());
}
private void finishClearTransactionTask(AgentTask task, TFinishTaskRequest request) {
ClearTransactionTask clearTransactionTask = (ClearTransactionTask) task;
- clearTransactionTask.setFinished();
- AgentTaskQueue.removeTask(task.getBackendId(),
- task.getTaskType(),
- task.getSignature());
+ clearTransactionTask.setFinished(true);
+ AgentTaskQueue.removeTask(task.getBackendId(), task.getTaskType(), task.getSignature());
}
private void finishPublishVersion(AgentTask task, TFinishTaskRequest request) {
diff --git a/fe/src/main/java/org/apache/doris/master/ReportHandler.java b/fe/src/main/java/org/apache/doris/master/ReportHandler.java
index 2a40961..213da10 100644
--- a/fe/src/main/java/org/apache/doris/master/ReportHandler.java
+++ b/fe/src/main/java/org/apache/doris/master/ReportHandler.java
@@ -81,7 +81,6 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.thrift.TException;
-import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -896,8 +895,7 @@ public class ReportHandler extends Daemon {
AgentBatchTask batchTask = new AgentBatchTask();
for (Long transactionId : transactionsToClear.keySet()) {
ClearTransactionTask clearTransactionTask = new ClearTransactionTask(backendId,
- transactionId,
- new ArrayList<Long>(transactionsToClear.get(transactionId)));
+ transactionId, transactionsToClear.get(transactionId));
batchTask.addTask(clearTransactionTask);
AgentTaskQueue.addTask(clearTransactionTask);
}
diff --git a/fe/src/main/java/org/apache/doris/task/ClearAlterTask.java b/fe/src/main/java/org/apache/doris/task/ClearAlterTask.java
index 53e8015..1417861 100644
--- a/fe/src/main/java/org/apache/doris/task/ClearAlterTask.java
+++ b/fe/src/main/java/org/apache/doris/task/ClearAlterTask.java
@@ -22,7 +22,6 @@ import org.apache.doris.thrift.TTaskType;
public class ClearAlterTask extends AgentTask {
private int schemaHash;
- private boolean isFinished;
public ClearAlterTask(long backendId, long dbId, long tableId, long partitionId, long indexId,
long tabletId, int schemaHash) {
@@ -40,12 +39,4 @@ public class ClearAlterTask extends AgentTask {
public int getSchemaHash() {
return schemaHash;
}
-
- public void setFinished() {
- this.isFinished = true;
- }
-
- public boolean isFinished() {
- return isFinished;
- }
}
diff --git a/fe/src/main/java/org/apache/doris/task/ClearTransactionTask.java b/fe/src/main/java/org/apache/doris/task/ClearTransactionTask.java
index 7b1caea..6f07898 100644
--- a/fe/src/main/java/org/apache/doris/task/ClearTransactionTask.java
+++ b/fe/src/main/java/org/apache/doris/task/ClearTransactionTask.java
@@ -17,24 +17,17 @@
package org.apache.doris.task;
-import java.util.List;
-
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
import org.apache.doris.thrift.TClearTransactionTaskRequest;
import org.apache.doris.thrift.TTaskType;
-public class ClearTransactionTask extends AgentTask {
+import java.util.List;
- private static final Logger LOG = LogManager.getLogger(ClearTransactionTask.class);
+public class ClearTransactionTask extends AgentTask {
private long transactionId;
private List<Long> partitionIds;
- private boolean isFinished;
- public ClearTransactionTask(long backendId, long transactionId,
- List<Long> partitionIds) {
+ public ClearTransactionTask(long backendId, long transactionId, List<Long> partitionIds) {
super(null, backendId, TTaskType.CLEAR_TRANSACTION_TASK, -1L, -1L, -1L, -1L, -1L, transactionId);
this.transactionId = transactionId;
this.partitionIds = partitionIds;
@@ -42,16 +35,8 @@ public class ClearTransactionTask extends AgentTask {
}
public TClearTransactionTaskRequest toThrift() {
- TClearTransactionTaskRequest clearTransactionTaskRequest = new TClearTransactionTaskRequest(transactionId,
- partitionIds);
+ TClearTransactionTaskRequest clearTransactionTaskRequest = new TClearTransactionTaskRequest(
+ transactionId, partitionIds);
return clearTransactionTaskRequest;
}
-
- public void setFinished() {
- this.isFinished = true;
- }
-
- public boolean isFinished() {
- return this.isFinished;
- }
}
diff --git a/fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java b/fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
index 5908d18..15d3e19 100644
--- a/fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
+++ b/fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java
@@ -39,7 +39,10 @@ import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.common.util.Util;
import org.apache.doris.metric.MetricRepo;
import org.apache.doris.persist.EditLog;
+import org.apache.doris.task.AgentBatchTask;
+import org.apache.doris.task.AgentTaskExecutor;
import org.apache.doris.task.AgentTaskQueue;
+import org.apache.doris.task.ClearTransactionTask;
import org.apache.doris.task.PublishVersionTask;
import org.apache.doris.thrift.TTaskType;
import org.apache.doris.thrift.TUniqueId;
@@ -95,6 +98,8 @@ public class GlobalTransactionMgr {
private Catalog catalog;
+ private List<ClearTransactionTask> clearTransactionTasks = Lists.newArrayList();
+
public GlobalTransactionMgr(Catalog catalog) {
this.catalog = catalog;
}
@@ -484,6 +489,7 @@ public class GlobalTransactionMgr {
return transactionState.getTransactionStatus() == TransactionStatus.VISIBLE;
}
+ // for http cancel stream load api
public void abortTransaction(Long dbId, String label, String reason) throws UserException {
Preconditions.checkNotNull(label);
Long transactionId = null;
@@ -532,9 +538,46 @@ public class GlobalTransactionMgr {
writeUnlock();
transactionState.afterStateTransform(TransactionStatus.ABORTED, txnOperated, reason);
}
+
+ // Send clear txn tasks to BE to clear the transaction on BE.
+ // This is because parts of a txn may have succeeded on some BEs, and these parts should be cleared
+ // explicitly, or they will remain on BE forever.
+ // (The report process will also do the diff and send clear txn tasks to BE, but that is our
+ // last line of defense.)
+ if (txnOperated && transactionState.getTransactionStatus() == TransactionStatus.ABORTED) {
+ clearBackendTransactions(transactionState);
+ }
+
return;
}
+    // Queue one clear-transaction task per backend for an aborted txn, and submit the
+    // queued tasks as a single batch once enough of them have accumulated.
+    private void clearBackendTransactions(TransactionState transactionState) {
+        Preconditions.checkState(transactionState.getTransactionStatus() == TransactionStatus.ABORTED);
+        // For an aborted transaction we don't know which backends are involved,
+        // so a clear task is created for every backend id returned below.
+        List<Long> backendIds = Catalog.getCurrentSystemInfo().getBackendIds(false);
+        AgentBatchTask pendingBatch = null;
+        synchronized (clearTransactionTasks) {
+            for (Long backendId : backendIds) {
+                clearTransactionTasks.add(
+                        new ClearTransactionTask(backendId, transactionState.getTransactionId(), Lists.newArrayList()));
+            }
+
+            // Group the sends instead of sending on every abort, to avoid too many task rpcs:
+            // only flush once more than two tasks per backend are queued.
+            if (clearTransactionTasks.size() > backendIds.size() * 2) {
+                pendingBatch = new AgentBatchTask();
+                for (ClearTransactionTask queuedTask : clearTransactionTasks) {
+                    pendingBatch.addTask(queuedTask);
+                }
+                clearTransactionTasks.clear();
+            }
+        }
+
+        // The batch, if any, is submitted after the synchronized block has been left.
+        if (pendingBatch != null) {
+            AgentTaskExecutor.submit(pendingBatch);
+        }
+    }
+
/*
* get all txns which is ready to publish
* a ready-to-publish txn's partition's visible version should be ONE less than txn's commit version.
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org