Posted to commits@doris.apache.org by kx...@apache.org on 2023/06/25 07:25:01 UTC

[doris] branch branch-2.0 updated (5c338515d9 -> e5e4efe088)

This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a change to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git


    from 5c338515d9 [Bug](pipeline) access map may cause coredump in sink buffer (#21108)
     new e5997114bd [enhancement](merge-on-write) add async publish task when version is discontinuous for merge on write table when clone (#21025)
     new 5db5f6c70d [fix](catalog) do not call makeSureInitialized when create table from hms meta event (#21104)
     new e5e4efe088 [test](regression) update some case in p2  (#21094)

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 be/src/agent/task_worker_pool.cpp                  |  9 +-
 be/src/olap/data_dir.cpp                           | 15 ++++
 be/src/olap/olap_server.cpp                        | 98 ++++++++++++++++++++++
 be/src/olap/rowset/rowset.h                        |  7 ++
 be/src/olap/storage_engine.cpp                     |  4 +
 be/src/olap/storage_engine.h                       | 12 +++
 be/src/olap/tablet_meta_manager.cpp                | 39 +++++++++
 be/src/olap/tablet_meta_manager.h                  | 12 +++
 be/src/olap/task/engine_clone_task.cpp             | 16 +++-
 be/src/olap/task/engine_publish_version_task.cpp   | 63 +++++++++++++-
 be/src/olap/task/engine_publish_version_task.h     | 30 ++++++-
 .../doris/catalog/external/ExternalDatabase.java   |  9 +-
 .../catalog/external/HMSExternalDatabase.java      |  3 +-
 .../catalog/external/IcebergExternalDatabase.java  |  3 +-
 .../catalog/external/PaimonExternalDatabase.java   |  3 +-
 .../org/apache/doris/datasource/CatalogMgr.java    |  9 +-
 .../datasource/hive/event/AlterTableEvent.java     |  6 +-
 .../datasource/hive/event/CreateTableEvent.java    |  3 +-
 .../java/org/apache/doris/persist/EditLog.java     |  2 +-
 gensrc/proto/olap_file.proto                       |  5 ++
 .../tpcds_sf100_dup_without_key_p2/sql/q04.out     |  2 +-
 .../tpcds_sf100_dup_without_key_p2/sql/q49.out     |  2 +-
 22 files changed, 323 insertions(+), 29 deletions(-)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[doris] 02/03: [fix](catalog) do not call makeSureInitialized when create table from hms meta event (#21104)

Posted by kx...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 5db5f6c70d08664fb21ca0fd7c0e172fedf42b13
Author: Mingyu Chen <mo...@163.com>
AuthorDate: Sat Jun 24 21:50:36 2023 +0800

    [fix](catalog) do not call makeSureInitialized when create table from hms meta event (#21104)
    
    In this PR, I remove the `makeSureInitialized()` call in the `createTable()` method, because it is wrong and unnecessary there.
    I also rename the method to make its purpose clearer.
---
 .../java/org/apache/doris/catalog/external/ExternalDatabase.java | 9 +++++----
 .../org/apache/doris/catalog/external/HMSExternalDatabase.java   | 3 +--
 .../apache/doris/catalog/external/IcebergExternalDatabase.java   | 3 +--
 .../apache/doris/catalog/external/PaimonExternalDatabase.java    | 3 +--
 .../src/main/java/org/apache/doris/datasource/CatalogMgr.java    | 9 +++++----
 .../org/apache/doris/datasource/hive/event/AlterTableEvent.java  | 6 +++---
 .../org/apache/doris/datasource/hive/event/CreateTableEvent.java | 3 ++-
 fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java   | 2 +-
 8 files changed, 19 insertions(+), 19 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalDatabase.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalDatabase.java
index dd8e7fa1e3..e1b582a540 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalDatabase.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/ExternalDatabase.java
@@ -332,12 +332,13 @@ public abstract class ExternalDatabase<T extends ExternalTable>
         throw new NotImplementedException("dropTable() is not implemented");
     }
 
-    public void createTable(String tableName, long tableId) {
-        throw new NotImplementedException("createTable() is not implemented");
-    }
-
     @Override
     public CatalogIf getCatalog() {
         return extCatalog;
     }
+
+    // Only used for sync hive metastore event
+    public void replayCreateTableFromEvent(String tableName, long tableId) {
+        throw new NotImplementedException("createTable() is not implemented");
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalDatabase.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalDatabase.java
index d53c934052..093ebe8b40 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalDatabase.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalDatabase.java
@@ -74,9 +74,8 @@ public class HMSExternalDatabase extends ExternalDatabase<HMSExternalTable> {
     }
 
     @Override
-    public void createTable(String tableName, long tableId) {
+    public void replayCreateTableFromEvent(String tableName, long tableId) {
         LOG.debug("create table [{}]", tableName);
-        makeSureInitialized();
         tableNameToId.put(tableName, tableId);
         HMSExternalTable table = getExternalTable(tableName, tableId, extCatalog);
         idToTbl.put(tableId, table);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/IcebergExternalDatabase.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/IcebergExternalDatabase.java
index 3e8a7beef9..8653c3e2dd 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/IcebergExternalDatabase.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/IcebergExternalDatabase.java
@@ -60,9 +60,8 @@ public class IcebergExternalDatabase extends ExternalDatabase<IcebergExternalTab
     }
 
     @Override
-    public void createTable(String tableName, long tableId) {
+    public void replayCreateTableFromEvent(String tableName, long tableId) {
         LOG.debug("create table [{}]", tableName);
-        makeSureInitialized();
         tableNameToId.put(tableName, tableId);
         IcebergExternalTable table = new IcebergExternalTable(tableId, tableName, name,
                 (IcebergExternalCatalog) extCatalog);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/PaimonExternalDatabase.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/PaimonExternalDatabase.java
index 4f6014241a..34cc7b96c0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/PaimonExternalDatabase.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/PaimonExternalDatabase.java
@@ -60,9 +60,8 @@ public class PaimonExternalDatabase extends ExternalDatabase<PaimonExternalTable
     }
 
     @Override
-    public void createTable(String tableName, long tableId) {
+    public void replayCreateTableFromEvent(String tableName, long tableId) {
         LOG.debug("create table [{}]", tableName);
-        makeSureInitialized();
         tableNameToId.put(tableName, tableId);
         PaimonExternalTable table = new PaimonExternalTable(tableId, tableName, name,
                 (PaimonExternalCatalog) extCatalog);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java
index 67b9bb6c8d..96a7c1eae3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java
@@ -734,7 +734,8 @@ public class CatalogMgr implements Writable, GsonPostProcessable {
         return ((ExternalCatalog) catalog).tableExistInLocal(dbName, tableName);
     }
 
-    public void createExternalTable(String dbName, String tableName, String catalogName, boolean ignoreIfExists)
+    public void createExternalTableFromEvent(String dbName, String tableName, String catalogName,
+            boolean ignoreIfExists)
             throws DdlException {
         CatalogIf catalog = nameToCatalog.get(catalogName);
         if (catalog == null) {
@@ -763,11 +764,11 @@ public class CatalogMgr implements Writable, GsonPostProcessable {
         log.setDbId(db.getId());
         log.setTableName(tableName);
         log.setTableId(Env.getCurrentEnv().getNextId());
-        replayCreateExternalTable(log);
+        replayCreateExternalTableFromEvent(log);
         Env.getCurrentEnv().getEditLog().logCreateExternalTable(log);
     }
 
-    public void replayCreateExternalTable(ExternalObjectLog log) {
+    public void replayCreateExternalTableFromEvent(ExternalObjectLog log) {
         LOG.debug("ReplayCreateExternalTable,catalogId:[{}],dbId:[{}],tableId:[{}],tableName:[{}]", log.getCatalogId(),
                 log.getDbId(), log.getTableId(), log.getTableName());
         ExternalCatalog catalog = (ExternalCatalog) idToCatalog.get(log.getCatalogId());
@@ -782,7 +783,7 @@ public class CatalogMgr implements Writable, GsonPostProcessable {
         }
         db.writeLock();
         try {
-            db.createTable(log.getTableName(), log.getTableId());
+            db.replayCreateTableFromEvent(log.getTableName(), log.getTableId());
         } finally {
             db.writeUnlock();
         }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AlterTableEvent.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AlterTableEvent.java
index a0a86bf6c3..cbb1ee8478 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AlterTableEvent.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/AlterTableEvent.java
@@ -73,9 +73,9 @@ public class AlterTableEvent extends MetastoreTableEvent {
             return;
         }
         Env.getCurrentEnv().getCatalogMgr()
-            .dropExternalTable(tableBefore.getDbName(), tableBefore.getTableName(), catalogName, true);
+                .dropExternalTable(tableBefore.getDbName(), tableBefore.getTableName(), catalogName, true);
         Env.getCurrentEnv().getCatalogMgr()
-            .createExternalTable(tableAfter.getDbName(), tableAfter.getTableName(), catalogName, true);
+                .createExternalTableFromEvent(tableAfter.getDbName(), tableAfter.getTableName(), catalogName, true);
     }
 
     private void processRename() throws DdlException {
@@ -93,7 +93,7 @@ public class AlterTableEvent extends MetastoreTableEvent {
         Env.getCurrentEnv().getCatalogMgr()
                 .dropExternalTable(tableBefore.getDbName(), tableBefore.getTableName(), catalogName, true);
         Env.getCurrentEnv().getCatalogMgr()
-                .createExternalTable(tableAfter.getDbName(), tableAfter.getTableName(), catalogName, true);
+                .createExternalTableFromEvent(tableAfter.getDbName(), tableAfter.getTableName(), catalogName, true);
 
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/CreateTableEvent.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/CreateTableEvent.java
index a97d131dce..9ac8fd4e68 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/CreateTableEvent.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/event/CreateTableEvent.java
@@ -59,7 +59,8 @@ public class CreateTableEvent extends MetastoreTableEvent {
     protected void process() throws MetastoreNotificationException {
         try {
             infoLog("catalogName:[{}],dbName:[{}],tableName:[{}]", catalogName, dbName, tblName);
-            Env.getCurrentEnv().getCatalogMgr().createExternalTable(dbName, hmsTbl.getTableName(), catalogName, true);
+            Env.getCurrentEnv().getCatalogMgr()
+                    .createExternalTableFromEvent(dbName, hmsTbl.getTableName(), catalogName, true);
         } catch (DdlException e) {
             throw new MetastoreNotificationException(
                     debugString("Failed to process event"), e);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
index 04c1b58d6e..4244664d43 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
@@ -960,7 +960,7 @@ public class EditLog {
                 }
                 case OperationType.OP_CREATE_EXTERNAL_TABLE: {
                     final ExternalObjectLog log = (ExternalObjectLog) journal.getData();
-                    env.getCatalogMgr().replayCreateExternalTable(log);
+                    env.getCatalogMgr().replayCreateExternalTableFromEvent(log);
                     break;
                 }
                 case OperationType.OP_DROP_EXTERNAL_DB: {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[doris] 01/03: [enhancement](merge-on-write) add async publish task when version is discontinuous for merge on write table when clone (#21025)

Posted by kx...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git

commit e5997114bd23fd4422da27b6d40ca46a1fa2fdad
Author: Xin Liao <li...@126.com>
AuthorDate: Thu Jun 22 21:50:14 2023 +0800

    [enhancement](merge-on-write) add async publish task when version is discontinuous for merge on write table when clone (#21025)
    
    Version discontinuity may occur during clone. To handle this case, an async publish task is added when the version is discontinuous.
---
 be/src/agent/task_worker_pool.cpp                |  9 ++-
 be/src/olap/data_dir.cpp                         | 15 ++++
 be/src/olap/olap_server.cpp                      | 98 ++++++++++++++++++++++++
 be/src/olap/rowset/rowset.h                      |  7 ++
 be/src/olap/storage_engine.cpp                   |  4 +
 be/src/olap/storage_engine.h                     | 12 +++
 be/src/olap/tablet_meta_manager.cpp              | 39 ++++++++++
 be/src/olap/tablet_meta_manager.h                | 12 +++
 be/src/olap/task/engine_clone_task.cpp           | 16 +++-
 be/src/olap/task/engine_publish_version_task.cpp | 63 ++++++++++++++-
 be/src/olap/task/engine_publish_version_task.h   | 30 +++++++-
 gensrc/proto/olap_file.proto                     |  5 ++
 12 files changed, 302 insertions(+), 8 deletions(-)
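
A minimal, self-contained sketch of the in-memory bookkeeping this change introduces
(the map layout and the INT64_MAX "nothing pending" contract follow the patch below;
the trimmed-down class and the main() driver are illustrative assumptions, not the
actual StorageEngine code):

    #include <cstdint>
    #include <iostream>
    #include <limits>
    #include <map>
    #include <mutex>
    #include <utility>

    // Pending publish tasks, keyed by tablet_id, then by publish_version.
    // The value stores {transaction_id, partition_id}, as in the patch.
    class AsyncPublishBook {
    public:
        void add_task(int64_t partition_id, int64_t tablet_id, int64_t publish_version,
                      int64_t transaction_id) {
            std::lock_guard<std::mutex> lock(_mutex);
            _tasks[tablet_id][publish_version] = {transaction_id, partition_id};
        }

        // Smallest version still waiting to be published for this tablet, or INT64_MAX
        // when nothing is pending (same contract as get_pending_publish_min_version).
        int64_t min_pending_version(int64_t tablet_id) {
            std::lock_guard<std::mutex> lock(_mutex);
            auto it = _tasks.find(tablet_id);
            if (it == _tasks.end() || it->second.empty()) {
                return std::numeric_limits<int64_t>::max();
            }
            return it->second.begin()->first;  // std::map keeps versions ordered
        }

    private:
        std::mutex _mutex;
        std::map<int64_t, std::map<int64_t, std::pair<int64_t, int64_t>>> _tasks;
    };

    int main() {
        AsyncPublishBook book;
        book.add_task(/*partition_id=*/1, /*tablet_id=*/100, /*publish_version=*/9, /*txn_id=*/501);
        book.add_task(/*partition_id=*/1, /*tablet_id=*/100, /*publish_version=*/8, /*txn_id=*/500);
        std::cout << book.min_pending_version(100) << std::endl;  // prints 8
        std::cout << book.min_pending_version(200) << std::endl;  // INT64_MAX: nothing pending
        return 0;
    }

Keeping the inner std::map ordered by publish_version makes begin() the smallest pending
version, which is exactly what both the clone path and the background publish thread query.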

diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp
index a91540b445..463845d2c7 100644
--- a/be/src/agent/task_worker_pool.cpp
+++ b/be/src/agent/task_worker_pool.cpp
@@ -1448,13 +1448,15 @@ void PublishVersionTaskPool::_publish_version_worker_thread_callback() {
 
         std::vector<TTabletId> error_tablet_ids;
         std::vector<TTabletId> succ_tablet_ids;
+        // partition_id, tablet_id, publish_version
+        std::vector<std::tuple<int64_t, int64_t, int64_t>> discontinuous_version_tablets;
         uint32_t retry_time = 0;
         Status status;
         bool is_task_timeout = false;
         while (retry_time < PUBLISH_VERSION_MAX_RETRY) {
             error_tablet_ids.clear();
             EnginePublishVersionTask engine_task(publish_version_req, &error_tablet_ids,
-                                                 &succ_tablet_ids);
+                                                 &succ_tablet_ids, &discontinuous_version_tablets);
             status = _env->storage_engine()->execute_task(&engine_task);
             if (status.ok()) {
                 break;
@@ -1488,6 +1490,11 @@ void PublishVersionTaskPool::_publish_version_worker_thread_callback() {
             continue;
         }
 
+        for (auto& item : discontinuous_version_tablets) {
+            StorageEngine::instance()->add_async_publish_task(
+                    std::get<0>(item), std::get<1>(item), std::get<2>(item),
+                    publish_version_req.transaction_id, false);
+        }
         TFinishTaskRequest finish_task_request;
         if (!status) {
             DorisMetrics::instance()->publish_task_failed_total->increment(1);
diff --git a/be/src/olap/data_dir.cpp b/be/src/olap/data_dir.cpp
index b83ce43635..970138cd0e 100644
--- a/be/src/olap/data_dir.cpp
+++ b/be/src/olap/data_dir.cpp
@@ -452,6 +452,21 @@ Status DataDir::load() {
         }
     }
 
+    auto load_pending_publish_info_func = [](int64_t tablet_id, int64_t publish_version,
+                                             const string& info) {
+        PendingPublishInfoPB pending_publish_info_pb;
+        bool parsed = pending_publish_info_pb.ParseFromString(info);
+        if (!parsed) {
+            LOG(WARNING) << "parse pending publish info failed, tablt_id: " << tablet_id
+                         << " publish_version: " << publish_version;
+        }
+        StorageEngine::instance()->add_async_publish_task(
+                pending_publish_info_pb.partition_id(), tablet_id, publish_version,
+                pending_publish_info_pb.transaction_id(), true);
+        return true;
+    };
+    TabletMetaManager::traverse_pending_publish(_meta, load_pending_publish_info_func);
+
     // traverse rowset
     // 1. add committed rowset to txn map
     // 2. add visible rowset to tablet
diff --git a/be/src/olap/olap_server.cpp b/be/src/olap/olap_server.cpp
index 2fab94a2bf..57bf40a147 100644
--- a/be/src/olap/olap_server.cpp
+++ b/be/src/olap/olap_server.cpp
@@ -16,6 +16,7 @@
 // under the License.
 
 #include <gen_cpp/Types_types.h>
+#include <gen_cpp/olap_file.pb.h>
 #include <stdint.h>
 
 #include <algorithm>
@@ -60,7 +61,9 @@
 #include "olap/tablet.h"
 #include "olap/tablet_manager.h"
 #include "olap/tablet_meta.h"
+#include "olap/tablet_meta_manager.h"
 #include "olap/tablet_schema.h"
+#include "olap/task/engine_publish_version_task.h"
 #include "olap/task/index_builder.h"
 #include "runtime/client_cache.h"
 #include "service/brpc.h"
@@ -245,6 +248,11 @@ Status StorageEngine::start_bg_threads() {
             .set_max_threads(config::calc_delete_bitmap_max_thread)
             .build(&_calc_delete_bitmap_thread_pool);
 
+    RETURN_IF_ERROR(Thread::create(
+            "StorageEngine", "aync_publish_version_thread",
+            [this]() { this->_async_publish_callback(); }, &_async_publish_thread));
+    LOG(INFO) << "async publish thread started";
+
     LOG(INFO) << "all storage engine's background threads are started.";
     return Status::OK();
 }
@@ -1204,4 +1212,94 @@ void StorageEngine::_cache_file_cleaner_tasks_producer_callback() {
     } while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(interval)));
 }
 
+void StorageEngine::add_async_publish_task(int64_t partition_id, int64_t tablet_id,
+                                           int64_t publish_version, int64_t transaction_id,
+                                           bool is_recovery) {
+    if (!is_recovery) {
+        TabletSharedPtr tablet = tablet_manager()->get_tablet(tablet_id);
+        PendingPublishInfoPB pending_publish_info_pb;
+        pending_publish_info_pb.set_partition_id(partition_id);
+        pending_publish_info_pb.set_transaction_id(transaction_id);
+        TabletMetaManager::save_pending_publish_info(tablet->data_dir(), tablet->tablet_id(),
+                                                     publish_version,
+                                                     pending_publish_info_pb.SerializeAsString());
+    }
+    LOG(INFO) << "add pending publish task, tablet_id: " << tablet_id
+              << " version: " << publish_version << " txn_id:" << transaction_id
+              << " is_recovery: " << is_recovery;
+    std::lock_guard<std::mutex> lock(_async_publish_mutex);
+    _async_publish_tasks[tablet_id][publish_version] = {transaction_id, partition_id};
+}
+
+int64_t StorageEngine::get_pending_publish_min_version(int64_t tablet_id) {
+    std::lock_guard<std::mutex> lock(_async_publish_mutex);
+    auto iter = _async_publish_tasks.find(tablet_id);
+    if (iter == _async_publish_tasks.end()) {
+        return INT64_MAX;
+    }
+    if (iter->second.empty()) {
+        return INT64_MAX;
+    }
+    return iter->second.begin()->first;
+}
+
+void StorageEngine::_async_publish_callback() {
+    while (!_stop_background_threads_latch.wait_for(std::chrono::milliseconds(30))) {
+        // tablet, publish_version
+        std::vector<std::pair<TabletSharedPtr, int64_t>> need_removed_tasks;
+        {
+            std::lock_guard<std::mutex> lock(_async_publish_mutex);
+            for (auto tablet_iter = _async_publish_tasks.begin();
+                 tablet_iter != _async_publish_tasks.end();) {
+                if (tablet_iter->second.empty()) {
+                    tablet_iter = _async_publish_tasks.erase(tablet_iter);
+                    continue;
+                }
+                int64_t tablet_id = tablet_iter->first;
+                TabletSharedPtr tablet = tablet_manager()->get_tablet(tablet_id);
+                if (!tablet) {
+                    LOG(WARNING) << "tablet does not exist when async publush, tablet_id: "
+                                 << tablet_id;
+                    // TODO(liaoxin) remove pending publish info from db
+                    tablet_iter = _async_publish_tasks.erase(tablet_iter);
+                    continue;
+                }
+
+                auto task_iter = tablet_iter->second.begin();
+                int64_t version = task_iter->first;
+                int64_t transaction_id = task_iter->second.first;
+                int64_t partition_id = task_iter->second.second;
+                int64_t max_version;
+                {
+                    std::shared_lock rdlock(tablet->get_header_lock());
+                    max_version = tablet->max_version().second;
+                }
+
+                if (version <= max_version) {
+                    need_removed_tasks.emplace_back(tablet, version);
+                    tablet_iter->second.erase(task_iter);
+                    tablet_iter++;
+                    continue;
+                }
+                if (version != max_version + 1) {
+                    tablet_iter++;
+                    continue;
+                }
+
+                auto async_publish_task = std::make_shared<AsyncTabletPublishTask>(
+                        tablet, partition_id, transaction_id, version);
+                StorageEngine::instance()->tablet_publish_txn_thread_pool()->submit_func(
+                        [=]() { async_publish_task->handle(); });
+                tablet_iter->second.erase(task_iter);
+                need_removed_tasks.emplace_back(tablet, version);
+                tablet_iter++;
+            }
+        }
+        for (auto& [tablet, publish_version] : need_removed_tasks) {
+            TabletMetaManager::remove_pending_publish_info(tablet->data_dir(), tablet->tablet_id(),
+                                                           publish_version);
+        }
+    }
+}
+
 } // namespace doris
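
To summarize the per-version decision the new _async_publish_callback loop makes above,
a compact hedged sketch (the enum names and the driver are illustrative, not Doris code):

    #include <cstdint>
    #include <iostream>

    // Decision for a tablet's smallest pending version (mirrors the three branches above):
    //   version <= max_version      -> already visible, just drop the bookkeeping entry
    //   version == max_version + 1  -> the gap is closed, publish it now
    //   otherwise                   -> keep waiting for earlier versions to arrive
    enum class Action { kDrop, kPublishNow, kWait };

    Action decide(int64_t pending_version, int64_t tablet_max_version) {
        if (pending_version <= tablet_max_version) {
            return Action::kDrop;
        }
        if (pending_version == tablet_max_version + 1) {
            return Action::kPublishNow;
        }
        return Action::kWait;
    }

    int main() {
        // Tablet currently at max_version 7: version 8 can be published, 9 must wait.
        std::cout << (decide(8, 7) == Action::kPublishNow) << std::endl;  // 1
        std::cout << (decide(9, 7) == Action::kWait) << std::endl;        // 1
        std::cout << (decide(6, 7) == Action::kDrop) << std::endl;        // 1
        return 0;
    }
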
diff --git a/be/src/olap/rowset/rowset.h b/be/src/olap/rowset/rowset.h
index 7160af3424..8829cc0770 100644
--- a/be/src/olap/rowset/rowset.h
+++ b/be/src/olap/rowset/rowset.h
@@ -299,6 +299,12 @@ public:
 
     bool check_rowset_segment();
 
+    bool start_publish() {
+        bool expect = false;
+        return _is_publish_running.compare_exchange_strong(expect, true);
+    }
+    void finish_publish() { _is_publish_running.store(false); }
+
     [[nodiscard]] virtual Status add_to_binlog() { return Status::OK(); }
 
 protected:
@@ -337,6 +343,7 @@ protected:
     // rowset state machine
     RowsetStateMachine _rowset_state_machine;
     std::atomic<uint64_t> _delayed_expired_timestamp = 0;
+    std::atomic<bool> _is_publish_running {false};
 };
 
 } // namespace doris
diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp
index 94f1bb5a87..d10c11b1c2 100644
--- a/be/src/olap/storage_engine.cpp
+++ b/be/src/olap/storage_engine.cpp
@@ -162,6 +162,9 @@ StorageEngine::~StorageEngine() {
     if (_tablet_meta_checkpoint_thread_pool) {
         _tablet_meta_checkpoint_thread_pool->shutdown();
     }
+    if (_calc_delete_bitmap_thread_pool) {
+        _calc_delete_bitmap_thread_pool->shutdown();
+    }
     _clear();
     _s_instance = nullptr;
 }
@@ -556,6 +559,7 @@ void StorageEngine::stop() {
     THREAD_JOIN(_disk_stat_monitor_thread);
     THREAD_JOIN(_fd_cache_clean_thread);
     THREAD_JOIN(_tablet_checkpoint_tasks_producer_thread);
+    THREAD_JOIN(_async_publish_thread);
 #undef THREAD_JOIN
 
 #define THREADS_JOIN(threads)            \
diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h
index 15b1a98a78..0113101565 100644
--- a/be/src/olap/storage_engine.h
+++ b/be/src/olap/storage_engine.h
@@ -220,6 +220,10 @@ public:
 
     void gc_binlogs(const std::unordered_map<int64_t, int64_t>& gc_tablet_infos);
 
+    void add_async_publish_task(int64_t partition_id, int64_t tablet_id, int64_t publish_version,
+                                int64_t transaction_id, bool is_recover);
+    int64_t get_pending_publish_min_version(int64_t tablet_id);
+
 private:
     // Instance should be inited from `static open()`
     // MUST NOT be called in other circumstances.
@@ -318,6 +322,8 @@ private:
 
     void _gc_binlogs(int64_t tablet_id, int64_t version);
 
+    void _async_publish_callback();
+
 private:
     struct CompactionCandidate {
         CompactionCandidate(uint32_t nicumulative_compaction_, int64_t tablet_id_, uint32_t index_)
@@ -454,6 +460,12 @@ private:
     std::mutex _running_cooldown_mutex;
     std::unordered_set<int64_t> _running_cooldown_tablets;
 
+    // tablet_id, publish_version, transaction_id, partition_id
+    std::map<int64_t, std::map<int64_t, std::pair<int64_t, int64_t>>> _async_publish_tasks;
+    // aync publish for discontinuous versions of merge_on_write table
+    scoped_refptr<Thread> _async_publish_thread;
+    std::mutex _async_publish_mutex;
+
     DISALLOW_COPY_AND_ASSIGN(StorageEngine);
 };
 
diff --git a/be/src/olap/tablet_meta_manager.cpp b/be/src/olap/tablet_meta_manager.cpp
index ac4deacd14..d3bc41fa19 100644
--- a/be/src/olap/tablet_meta_manager.cpp
+++ b/be/src/olap/tablet_meta_manager.cpp
@@ -175,4 +175,43 @@ Status TabletMetaManager::load_json_meta(DataDir* store, const std::string& meta
     return save(store, tablet_id, schema_hash, meta_binary);
 }
 
+Status TabletMetaManager::save_pending_publish_info(DataDir* store, TTabletId tablet_id,
+                                                    int64_t publish_version,
+                                                    const std::string& meta_binary) {
+    std::string key = fmt::format("{}{}_{}", PENDING_PUBLISH_INFO, tablet_id, publish_version);
+    OlapMeta* meta = store->get_meta();
+    LOG(INFO) << "save pending publish rowset, key:" << key
+              << " meta_size=" << meta_binary.length();
+    return meta->put(META_COLUMN_FAMILY_INDEX, key, meta_binary);
+}
+
+Status TabletMetaManager::remove_pending_publish_info(DataDir* store, TTabletId tablet_id,
+                                                      int64_t publish_version) {
+    std::string key = fmt::format("{}{}_{}", PENDING_PUBLISH_INFO, tablet_id, publish_version);
+    OlapMeta* meta = store->get_meta();
+    Status res = meta->remove(META_COLUMN_FAMILY_INDEX, key);
+    LOG(INFO) << "remove pending publish_info, key:" << key << ", res:" << res;
+    return res;
+}
+
+Status TabletMetaManager::traverse_pending_publish(
+        OlapMeta* meta, std::function<bool(int64_t, int64_t, const std::string&)> const& func) {
+    auto traverse_header_func = [&func](const std::string& key, const std::string& value) -> bool {
+        std::vector<std::string> parts;
+        // key format: "ppi_" + tablet_id + "_" + publish_version
+        split_string<char>(key, '_', &parts);
+        if (parts.size() != 3) {
+            LOG(WARNING) << "invalid pending publish info key:" << key
+                         << ", split size:" << parts.size();
+            return true;
+        }
+        int64_t tablet_id = std::stol(parts[1], nullptr, 10);
+        int64_t version = std::stol(parts[2], nullptr, 10);
+        return func(tablet_id, version, value);
+    };
+    Status status =
+            meta->iterate(META_COLUMN_FAMILY_INDEX, PENDING_PUBLISH_INFO, traverse_header_func);
+    return status;
+}
+
 } // namespace doris
diff --git a/be/src/olap/tablet_meta_manager.h b/be/src/olap/tablet_meta_manager.h
index 15d6763477..6ba1d76757 100644
--- a/be/src/olap/tablet_meta_manager.h
+++ b/be/src/olap/tablet_meta_manager.h
@@ -34,6 +34,8 @@ const std::string OLD_HEADER_PREFIX = "hdr_";
 
 const std::string HEADER_PREFIX = "tabletmeta_";
 
+const std::string PENDING_PUBLISH_INFO = "ppi_";
+
 // Helper Class for managing tablet headers of one root path.
 class TabletMetaManager {
 public:
@@ -57,6 +59,16 @@ public:
                                    const string& header_prefix = "tabletmeta_");
 
     static Status load_json_meta(DataDir* store, const std::string& meta_path);
+
+    static Status save_pending_publish_info(DataDir* store, TTabletId tablet_id,
+                                            int64_t publish_version,
+                                            const std::string& meta_binary);
+
+    static Status remove_pending_publish_info(DataDir* store, TTabletId tablet_id,
+                                              int64_t publish_version);
+
+    static Status traverse_pending_publish(
+            OlapMeta* meta, std::function<bool(int64_t, int64_t, const std::string&)> const& func);
 };
 
 } // namespace doris
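
The pending-publish metadata above is persisted under keys of the form
"ppi_" + tablet_id + "_" + publish_version. A standalone sketch of that key round-trip
(using std::ostringstream and std::getline in place of the fmt and split_string helpers
the patch uses; illustrative only):

    #include <cstdint>
    #include <iostream>
    #include <sstream>
    #include <string>
    #include <vector>

    // Key layout from the patch: "ppi_" + tablet_id + "_" + publish_version.
    std::string make_pending_publish_key(int64_t tablet_id, int64_t publish_version) {
        std::ostringstream oss;
        oss << "ppi_" << tablet_id << "_" << publish_version;
        return oss.str();
    }

    // Recover (tablet_id, publish_version); returns false on malformed keys,
    // mirroring the 3-part check in traverse_pending_publish.
    bool parse_pending_publish_key(const std::string& key, int64_t* tablet_id, int64_t* version) {
        std::vector<std::string> parts;
        std::stringstream ss(key);
        std::string part;
        while (std::getline(ss, part, '_')) {
            parts.push_back(part);
        }
        if (parts.size() != 3) {
            return false;
        }
        *tablet_id = std::stoll(parts[1]);
        *version = std::stoll(parts[2]);
        return true;
    }

    int main() {
        std::string key = make_pending_publish_key(100, 8);
        std::cout << key << std::endl;  // ppi_100_8
        int64_t tablet_id = 0, version = 0;
        if (parse_pending_publish_key(key, &tablet_id, &version)) {
            std::cout << tablet_id << " " << version << std::endl;  // 100 8
        }
        return 0;
    }

The value stored under each key is a serialized PendingPublishInfoPB carrying partition_id
and transaction_id, so DataDir::load() can rebuild the in-memory task map after a restart.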
diff --git a/be/src/olap/task/engine_clone_task.cpp b/be/src/olap/task/engine_clone_task.cpp
index 912b3aa4e5..845cbe1117 100644
--- a/be/src/olap/task/engine_clone_task.cpp
+++ b/be/src/olap/task/engine_clone_task.cpp
@@ -130,7 +130,19 @@ Status EngineCloneTask::_do_clone() {
         auto local_data_path = fmt::format("{}/{}", tablet->tablet_path(), CLONE_PREFIX);
         bool allow_incremental_clone = false;
 
-        tablet->calc_missed_versions(_clone_req.committed_version, &missed_versions);
+        int64_t specified_version = _clone_req.committed_version;
+        if (tablet->enable_unique_key_merge_on_write()) {
+            int64_t min_pending_ver =
+                    StorageEngine::instance()->get_pending_publish_min_version(tablet->tablet_id());
+            if (min_pending_ver - 1 < specified_version) {
+                LOG(INFO) << "use min pending publish version for clone, min_pending_ver: "
+                          << min_pending_ver
+                          << " committed_version: " << _clone_req.committed_version;
+                specified_version = min_pending_ver - 1;
+            }
+        }
+
+        tablet->calc_missed_versions(specified_version, &missed_versions);
 
         // if missed version size is 0, then it is useless to clone from remote be, it means local data is
         // completed. Or remote be will just return header not the rowset files. clone will failed.
@@ -153,7 +165,7 @@ Status EngineCloneTask::_do_clone() {
         RETURN_IF_ERROR(_make_and_download_snapshots(*(tablet->data_dir()), local_data_path,
                                                      &src_host, &src_file_path, missed_versions,
                                                      &allow_incremental_clone));
-        RETURN_IF_ERROR(_finish_clone(tablet.get(), local_data_path, _clone_req.committed_version,
+        RETURN_IF_ERROR(_finish_clone(tablet.get(), local_data_path, specified_version,
                                       allow_incremental_clone));
     } else {
         LOG(INFO) << "clone tablet not exist, begin clone a new tablet from remote be. "
diff --git a/be/src/olap/task/engine_publish_version_task.cpp b/be/src/olap/task/engine_publish_version_task.cpp
index 748706e421..02af6bf674 100644
--- a/be/src/olap/task/engine_publish_version_task.cpp
+++ b/be/src/olap/task/engine_publish_version_task.cpp
@@ -70,11 +70,13 @@ void TabletPublishStatistics::record_in_bvar() {
 
 EnginePublishVersionTask::EnginePublishVersionTask(
         const TPublishVersionRequest& publish_version_req, std::vector<TTabletId>* error_tablet_ids,
-        std::vector<TTabletId>* succ_tablet_ids)
+        std::vector<TTabletId>* succ_tablet_ids,
+        std::vector<std::tuple<int64_t, int64_t, int64_t>>* discontinuous_version_tablets)
         : _total_task_num(0),
           _publish_version_req(publish_version_req),
           _error_tablet_ids(error_tablet_ids),
-          _succ_tablet_ids(succ_tablet_ids) {}
+          _succ_tablet_ids(succ_tablet_ids),
+          _discontinuous_version_tablets(discontinuous_version_tablets) {}
 
 void EnginePublishVersionTask::add_error_tablet_id(int64_t tablet_id) {
     std::lock_guard<std::mutex> lck(_tablet_ids_mutex);
@@ -178,6 +180,8 @@ Status EnginePublishVersionTask::finish() {
                     // publish failed
                     if (!tablet->check_version_exist(version)) {
                         add_error_tablet_id(tablet_info.tablet_id);
+                        _discontinuous_version_tablets->emplace_back(
+                                partition_id, tablet_info.tablet_id, version.first);
                         res = Status::Error<PUBLISH_VERSION_NOT_CONTINUOUS>();
                     }
                     continue;
@@ -250,7 +254,14 @@ TabletPublishTxnTask::TabletPublishTxnTask(EnginePublishVersionTask* engine_task
 
 void TabletPublishTxnTask::handle() {
     _stats.schedule_time_us = MonotonicMicros() - _stats.submit_time_us;
+    if (!_rowset->start_publish()) {
+        LOG(WARNING) << "publish is running. rowset_id=" << _rowset->rowset_id()
+                     << ", tablet_id=" << _tablet->tablet_id() << ", txn_id=" << _transaction_id;
+        _engine_publish_version_task->add_error_tablet_id(_tablet_info.tablet_id);
+        return;
+    }
     Defer defer {[&] {
+        _rowset->finish_publish();
         if (_engine_publish_version_task->finish_task() == 1) {
             _engine_publish_version_task->notify();
         }
@@ -289,4 +300,52 @@ void TabletPublishTxnTask::handle() {
               << (cost_us > 500 * 1000 ? _stats.to_string() : "");
 }
 
+void AsyncTabletPublishTask::handle() {
+    _stats.schedule_time_us = MonotonicMicros() - _stats.submit_time_us;
+    std::map<TabletInfo, RowsetSharedPtr> tablet_related_rs;
+    StorageEngine::instance()->txn_manager()->get_txn_related_tablets(
+            _transaction_id, _partition_id, &tablet_related_rs);
+    auto iter = tablet_related_rs.find(
+            TabletInfo(_tablet->tablet_id(), _tablet->schema_hash(), _tablet->tablet_uid()));
+    if (iter == tablet_related_rs.end()) {
+        return;
+    }
+    RowsetSharedPtr rowset = iter->second;
+    if (!rowset->start_publish()) {
+        LOG(WARNING) << "publish is running. rowset_id=" << rowset->rowset_id()
+                     << ", tablet_id=" << _tablet->tablet_id() << ", txn_id=" << _transaction_id;
+        return;
+    }
+    Defer defer {[&] { rowset->finish_publish(); }};
+    Version version(_version, _version);
+    auto publish_status = StorageEngine::instance()->txn_manager()->publish_txn(
+            _partition_id, _tablet, _transaction_id, version, &_stats);
+    if (publish_status != Status::OK()) {
+        LOG(WARNING) << "failed to publish version. rowset_id=" << rowset->rowset_id()
+                     << ", tablet_id=" << _tablet->tablet_id() << ", txn_id=" << _transaction_id
+                     << ", res=" << publish_status;
+        return;
+    }
+
+    // add visible rowset to tablet
+    int64_t t1 = MonotonicMicros();
+    publish_status = _tablet->add_inc_rowset(rowset);
+    _stats.add_inc_rowset_us = MonotonicMicros() - t1;
+    if (publish_status != Status::OK() && !publish_status.is<PUSH_VERSION_ALREADY_EXIST>()) {
+        LOG(WARNING) << "fail to add visible rowset to tablet. rowset_id=" << rowset->rowset_id()
+                     << ", tablet_id=" << _tablet->tablet_id() << ", txn_id=" << _transaction_id
+                     << ", res=" << publish_status;
+        return;
+    }
+    int64_t cost_us = MonotonicMicros() - _stats.submit_time_us;
+    // print stats if publish cost > 500ms
+    g_tablet_publish_latency << cost_us;
+    _stats.record_in_bvar();
+    LOG(INFO) << "async publish version successfully on tablet, table_id=" << _tablet->table_id()
+              << ", tablet=" << _tablet->full_name() << ", transaction_id=" << _transaction_id
+              << ", version=" << _version << ", num_rows=" << rowset->num_rows()
+              << ", res=" << publish_status << ", cost: " << cost_us << "(us) "
+              << (cost_us > 500 * 1000 ? _stats.to_string() : "");
+}
+
 } // namespace doris
diff --git a/be/src/olap/task/engine_publish_version_task.h b/be/src/olap/task/engine_publish_version_task.h
index bd6907b913..c8a68dedea 100644
--- a/be/src/olap/task/engine_publish_version_task.h
+++ b/be/src/olap/task/engine_publish_version_task.h
@@ -82,9 +82,10 @@ private:
 
 class EnginePublishVersionTask : public EngineTask {
 public:
-    EnginePublishVersionTask(const TPublishVersionRequest& publish_version_req,
-                             vector<TTabletId>* error_tablet_ids,
-                             std::vector<TTabletId>* succ_tablet_ids = nullptr);
+    EnginePublishVersionTask(
+            const TPublishVersionRequest& publish_version_req, vector<TTabletId>* error_tablet_ids,
+            std::vector<TTabletId>* succ_tablet_ids,
+            std::vector<std::tuple<int64_t, int64_t, int64_t>>* discontinous_version_tablets);
     ~EnginePublishVersionTask() {}
 
     virtual Status finish() override;
@@ -103,11 +104,34 @@ private:
     std::mutex _tablet_ids_mutex;
     vector<TTabletId>* _error_tablet_ids;
     vector<TTabletId>* _succ_tablet_ids;
+    std::vector<std::tuple<int64_t, int64_t, int64_t>>* _discontinuous_version_tablets;
 
     std::mutex _tablet_finish_mutex;
     std::condition_variable _tablet_finish_cond;
 };
 
+class AsyncTabletPublishTask {
+public:
+    AsyncTabletPublishTask(TabletSharedPtr tablet, int64_t partition_id, int64_t transaction_id,
+                           int64_t version)
+            : _tablet(tablet),
+              _partition_id(partition_id),
+              _transaction_id(transaction_id),
+              _version(version) {
+        _stats.submit_time_us = MonotonicMicros();
+    }
+    ~AsyncTabletPublishTask() = default;
+
+    void handle();
+
+private:
+    TabletSharedPtr _tablet;
+    int64_t _partition_id;
+    int64_t _transaction_id;
+    int64_t _version;
+    TabletPublishStatistics _stats;
+};
+
 } // namespace doris
 
 #endif // DORIS_BE_SRC_OLAP_TASK_ENGINE_PUBLISH_VERSION_TASK_H
diff --git a/gensrc/proto/olap_file.proto b/gensrc/proto/olap_file.proto
index 6a982fb58c..fe894dba9d 100644
--- a/gensrc/proto/olap_file.proto
+++ b/gensrc/proto/olap_file.proto
@@ -333,3 +333,8 @@ message BinlogMetaEntryPB {
     optional int64 creation_time = 5;
     optional string rowset_id_v2 = 6;
 }
+
+message PendingPublishInfoPB {
+    optional int64 partition_id = 1;
+    optional int64 transaction_id = 2;
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[doris] 03/03: [test](regression) update some case in p2 (#21094)

Posted by kx...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git

commit e5e4efe0881effa75e8ab39e855e2002c9e295da
Author: Mryange <59...@users.noreply.github.com>
AuthorDate: Sun Jun 25 11:16:56 2023 +0800

    [test](regression) update some case in p2  (#21094)
    
    update some cases in p2
---
 regression-test/data/tpcds_sf100_dup_without_key_p2/sql/q04.out | 2 +-
 regression-test/data/tpcds_sf100_dup_without_key_p2/sql/q49.out | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/regression-test/data/tpcds_sf100_dup_without_key_p2/sql/q04.out b/regression-test/data/tpcds_sf100_dup_without_key_p2/sql/q04.out
index e13306ac67..4aa93d19c4 100644
--- a/regression-test/data/tpcds_sf100_dup_without_key_p2/sql/q04.out
+++ b/regression-test/data/tpcds_sf100_dup_without_key_p2/sql/q04.out
@@ -36,6 +36,7 @@ AAAAAAAAAABKDAAA	Dave	Francis	Y
 AAAAAAAAAABKFBAA	Megan	Dykes	N
 AAAAAAAAAABKMBAA	Velma	Thompson	Y
 AAAAAAAAAABMNAAA	Shellie	Green	Y
+AAAAAAAAAABOCAAA	Jesus	Taylor	N
 AAAAAAAAAABOIBAA	Melissa	Thompson	N
 AAAAAAAAAABPHBAA	Rebecca	Coleman	N
 AAAAAAAAAABPPAAA	Michael	Somers	N
@@ -99,5 +100,4 @@ AAAAAAAAAAELCBAA	Edward	Lewis	Y
 AAAAAAAAAAELPAAA	Maxine	Hilton	Y
 AAAAAAAAAAEMBAAA	Jerry	Kaplan	N
 AAAAAAAAAAEMDAAA	Donna	Rivera	N
-AAAAAAAAAAEMHBAA	Quinton	Downes	N
 
diff --git a/regression-test/data/tpcds_sf100_dup_without_key_p2/sql/q49.out b/regression-test/data/tpcds_sf100_dup_without_key_p2/sql/q49.out
index 57bdcea49c..25eb4aa780 100644
--- a/regression-test/data/tpcds_sf100_dup_without_key_p2/sql/q49.out
+++ b/regression-test/data/tpcds_sf100_dup_without_key_p2/sql/q49.out
@@ -1,5 +1,5 @@
 -- This file is automatically generated. You should know what you did if you want to edit this
--- !q49_rewrite --
+-- !q49 --
 catalog	23543	0E-8	1	3
 catalog	140373	0E-8	1	287
 catalog	31125	0E-8	1	294


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org