You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by da...@apache.org on 2022/07/14 04:11:04 UTC

[doris] branch master updated: [feature-wip](unique-key-merge-on-write) Add option to enable unique-key-merge-on-write, DSIP-018[5/1] (#10814)

This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 13e9cb146f [feature-wip](unique-key-merge-on-write) Add option to enable unique-key-merge-on-write, DSIP-018[5/1] (#10814)
13e9cb146f is described below

commit 13e9cb146ff405112a61cb80293829d91fb3cdae
Author: zhannngchen <48...@users.noreply.github.com>
AuthorDate: Thu Jul 14 12:10:58 2022 +0800

    [feature-wip](unique-key-merge-on-write) Add option to enable unique-key-merge-on-write, DSIP-018[5/1] (#10814)
    
    * Add option in FE
    
    * add opt in be
    
    * some fix
    
    * update
    
    * fix code style
    
    * fix typo
    
    * fix typo
    
    * update
    
    * code format
---
 be/src/olap/tablet.h                               |  5 +++
 be/src/olap/tablet_manager.cpp                     |  9 ++---
 be/src/olap/tablet_meta.cpp                        | 13 ++++++--
 be/src/olap/tablet_meta.h                          |  7 +++-
 be/test/olap/test_data/header_without_inc_rs.txt   |  3 +-
 .../java/org/apache/doris/alter/RollupJobV2.java   |  4 ++-
 .../org/apache/doris/alter/SchemaChangeJobV2.java  |  4 ++-
 .../java/org/apache/doris/backup/RestoreJob.java   |  4 ++-
 .../java/org/apache/doris/catalog/OlapTable.java   | 14 ++++++++
 .../org/apache/doris/catalog/TableProperty.java    |  9 +++++
 .../apache/doris/common/util/PropertyAnalyzer.java | 20 ++++++++++++
 .../doris/datasource/InternalDataSource.java       | 21 ++++++++----
 .../org/apache/doris/master/ReportHandler.java     |  4 ++-
 .../org/apache/doris/task/CreateReplicaTask.java   | 38 +++-------------------
 .../java/org/apache/doris/task/AgentTaskTest.java  |  2 +-
 gensrc/proto/olap_file.proto                       |  2 ++
 gensrc/thrift/AgentService.thrift                  |  1 +
 17 files changed, 108 insertions(+), 52 deletions(-)

diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h
index 83f5e3d2e2..46b331dc97 100644
--- a/be/src/olap/tablet.h
+++ b/be/src/olap/tablet.h
@@ -90,6 +90,7 @@ public:
     int version_count() const;
     Version max_version() const;
     CumulativeCompactionPolicy* cumulative_compaction_policy();
+    bool enable_unique_key_merge_on_write() const;
 
     // properties encapsulated in TabletSchema
     KeysType keys_type() const;
@@ -430,6 +431,10 @@ inline void Tablet::set_cumulative_layer_point(int64_t new_point) {
     _cumulative_point = new_point;
 }
 
+inline bool Tablet::enable_unique_key_merge_on_write() const {
+    return _tablet_meta->enable_unique_key_merge_on_write();
+}
+
 // TODO(lingbin): Why other methods that need to get information from _tablet_meta
 // are not locked, here needs a comment to explain.
 inline size_t Tablet::tablet_footprint() {
diff --git a/be/src/olap/tablet_manager.cpp b/be/src/olap/tablet_manager.cpp
index 719166b97e..5c4cd04a17 100644
--- a/be/src/olap/tablet_manager.cpp
+++ b/be/src/olap/tablet_manager.cpp
@@ -791,10 +791,11 @@ Status TabletManager::load_tablet_from_dir(DataDir* store, TTabletId tablet_id,
               << " tablet_id=" << tablet_id << " schema_hash=" << schema_hash
               << " path = " << schema_hash_path << " force = " << force << " restore = " << restore;
     // not add lock here, because load_tablet_from_meta already add lock
-    string header_path = TabletMeta::construct_header_file_path(schema_hash_path, tablet_id);
+    std::string header_path = TabletMeta::construct_header_file_path(schema_hash_path, tablet_id);
     // should change shard id before load tablet
-    string shard_path = path_util::dir_name(path_util::dir_name(path_util::dir_name(header_path)));
-    string shard_str = shard_path.substr(shard_path.find_last_of('/') + 1);
+    std::string shard_path =
+            path_util::dir_name(path_util::dir_name(path_util::dir_name(header_path)));
+    std::string shard_str = shard_path.substr(shard_path.find_last_of('/') + 1);
     int32_t shard = stol(shard_str);
     // load dir is called by clone, restore, storage migration
     // should change tablet uid when tablet object changed
@@ -817,7 +818,7 @@ Status TabletManager::load_tablet_from_dir(DataDir* store, TTabletId tablet_id,
     // has to change shard id here, because meta file maybe copied from other source
     // its shard is different from local shard
     tablet_meta->set_shard_id(shard);
-    string meta_binary;
+    std::string meta_binary;
     tablet_meta->serialize(&meta_binary);
     RETURN_NOT_OK_LOG(load_tablet_from_meta(store, tablet_id, schema_hash, meta_binary, true, force,
                                             restore, true),
diff --git a/be/src/olap/tablet_meta.cpp b/be/src/olap/tablet_meta.cpp
index ac6fe44e4a..30b0ffd319 100644
--- a/be/src/olap/tablet_meta.cpp
+++ b/be/src/olap/tablet_meta.cpp
@@ -45,7 +45,10 @@ Status TabletMeta::create(const TCreateTabletReq& request, const TabletUid& tabl
             col_ordinal_to_unique_id, tablet_uid,
             request.__isset.tablet_type ? request.tablet_type : TTabletType::TABLET_TYPE_DISK,
             request.storage_medium, request.storage_param.storage_name, request.compression_type,
-            request.storage_policy));
+            request.storage_policy,
+            request.__isset.enable_unique_key_merge_on_write
+                    ? request.enable_unique_key_merge_on_write
+                    : false));
     return Status::OK();
 }
 
@@ -58,7 +61,8 @@ TabletMeta::TabletMeta(int64_t table_id, int64_t partition_id, int64_t tablet_id
                        const std::unordered_map<uint32_t, uint32_t>& col_ordinal_to_unique_id,
                        TabletUid tablet_uid, TTabletType::type tabletType,
                        TStorageMedium::type t_storage_medium, const std::string& storage_name,
-                       TCompressionType::type compression_type, const std::string& storage_policy)
+                       TCompressionType::type compression_type, const std::string& storage_policy,
+                       bool enable_unique_key_merge_on_write)
         : _tablet_uid(0, 0), _schema(new TabletSchema), _delete_bitmap(new DeleteBitmap()) {
     TabletMetaPB tablet_meta_pb;
     tablet_meta_pb.set_table_id(table_id);
@@ -77,6 +81,7 @@ TabletMeta::TabletMeta(int64_t table_id, int64_t partition_id, int64_t tablet_id
                                            : TabletTypePB::TABLET_TYPE_MEMORY);
     tablet_meta_pb.set_storage_medium(fs::fs_util::get_storage_medium_pb(t_storage_medium));
     tablet_meta_pb.set_remote_storage_name(storage_name);
+    tablet_meta_pb.set_enable_unique_key_merge_on_write(enable_unique_key_merge_on_write);
     tablet_meta_pb.set_storage_policy(storage_policy);
     TabletSchemaPB* schema = tablet_meta_pb.mutable_schema();
     schema->set_num_short_key_columns(tablet_schema.short_key_column_count);
@@ -462,6 +467,9 @@ void TabletMeta::init_from_pb(const TabletMetaPB& tablet_meta_pb) {
     _remote_storage_name = tablet_meta_pb.remote_storage_name();
     _storage_medium = tablet_meta_pb.storage_medium();
     _cooldown_resource = tablet_meta_pb.storage_policy();
+    if (tablet_meta_pb.has_enable_unique_key_merge_on_write()) {
+        _enable_unique_key_merge_on_write = tablet_meta_pb.enable_unique_key_merge_on_write();
+    }
 
     if (tablet_meta_pb.has_delete_bitmap()) {
         int rst_ids_size = tablet_meta_pb.delete_bitmap().rowset_ids_size();
@@ -528,6 +536,7 @@ void TabletMeta::to_meta_pb(TabletMetaPB* tablet_meta_pb) {
     tablet_meta_pb->set_remote_storage_name(_remote_storage_name);
     tablet_meta_pb->set_storage_medium(_storage_medium);
     tablet_meta_pb->set_storage_policy(_cooldown_resource);
+    tablet_meta_pb->set_enable_unique_key_merge_on_write(_enable_unique_key_merge_on_write);
 
     {
         std::shared_lock l(delete_bitmap().lock);
diff --git a/be/src/olap/tablet_meta.h b/be/src/olap/tablet_meta.h
index 26163a15e4..6c907dd6be 100644
--- a/be/src/olap/tablet_meta.h
+++ b/be/src/olap/tablet_meta.h
@@ -89,7 +89,8 @@ public:
                TabletUid tablet_uid, TTabletType::type tabletType,
                TStorageMedium::type t_storage_medium, const std::string& remote_storage_name,
                TCompressionType::type compression_type,
-               const std::string& storage_policy = std::string());
+               const std::string& storage_policy = std::string(),
+               bool enable_unique_key_merge_on_write = false);
     // If need add a filed in TableMeta, filed init copy in copy construct function
     TabletMeta(const TabletMeta& tablet_meta);
     TabletMeta(TabletMeta&& tablet_meta) = delete;
@@ -205,6 +206,8 @@ public:
 
     DeleteBitmap& delete_bitmap() { return *_delete_bitmap; }
 
+    bool enable_unique_key_merge_on_write() { return _enable_unique_key_merge_on_write; }
+
 private:
     Status _save_meta(DataDir* data_dir);
 
@@ -244,6 +247,8 @@ private:
     // FIXME(cyx): Currently `cooldown_resource` is equivalent to `storage_policy`.
     io::ResourceId _cooldown_resource;
 
+    // may be true iff unique keys model.
+    bool _enable_unique_key_merge_on_write = false;
     std::unique_ptr<DeleteBitmap> _delete_bitmap;
 
     mutable std::shared_mutex _meta_lock;
diff --git a/be/test/olap/test_data/header_without_inc_rs.txt b/be/test/olap/test_data/header_without_inc_rs.txt
index 040b37b244..660a6957bf 100644
--- a/be/test/olap/test_data/header_without_inc_rs.txt
+++ b/be/test/olap/test_data/header_without_inc_rs.txt
@@ -151,5 +151,6 @@
     "remote_storage_name": "",
     "replica_id": 0,
     "storage_policy": "",
-    "delete_bitmap": {}
+    "delete_bitmap": {},
+    "enable_unique_key_merge_on_write": false
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java
index bcd73e37be..56121a385b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java
@@ -236,7 +236,9 @@ public class RollupJobV2 extends AlterJobV2 implements GsonPostProcessable {
                                 tbl.getCopiedIndexes(),
                                 tbl.isInMemory(),
                                 tabletType,
-                                tbl.getCompressionType());
+                                null,
+                                tbl.getCompressionType(),
+                                tbl.getEnableUniqueKeyMergeOnWrite());
                         createReplicaTask.setBaseTablet(tabletIdMap.get(rollupTabletId), baseSchemaHash);
                         if (this.storageFormat != null) {
                             createReplicaTask.setStorageFormat(this.storageFormat);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
index 44ff3cdfae..ade4bcce3e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
@@ -259,7 +259,9 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
                                     shadowSchema, bfColumns, bfFpp, countDownLatch, indexes,
                                     tbl.isInMemory(),
                                     tbl.getPartitionInfo().getTabletType(partitionId),
-                                    tbl.getCompressionType());
+                                    null,
+                                    tbl.getCompressionType(),
+                                    tbl.getEnableUniqueKeyMergeOnWrite());
                             createReplicaTask.setBaseTablet(partitionIndexTabletMap.get(partitionId, shadowIdxId)
                                     .get(shadowTabletId), originSchemaHash);
                             if (this.storageFormat != null) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
index dbdc3ad575..570550d2d6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
@@ -973,7 +973,9 @@ public class RestoreJob extends AbstractJob {
                             localTbl.getCopiedIndexes(),
                             localTbl.isInMemory(),
                             localTbl.getPartitionInfo().getTabletType(restorePart.getId()),
-                            localTbl.getCompressionType());
+                            null,
+                            localTbl.getCompressionType(),
+                            localTbl.getEnableUniqueKeyMergeOnWrite());
                     task.setInRestoreMode(true);
                     batchTask.addTask(task);
                 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
index 9e23ebe374..0b1dda64a2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
@@ -1761,6 +1761,20 @@ public class OlapTable extends Table {
         return tableProperty.getRemoteStoragePolicy();
     }
 
+    public void setEnableUniqueKeyMergeOnWrite(boolean speedup) {
+        if (tableProperty == null) {
+            tableProperty = new TableProperty(new HashMap<>());
+        }
+        tableProperty.setEnableUniqueKeyMergeOnWrite(speedup);
+    }
+
+    public boolean getEnableUniqueKeyMergeOnWrite() {
+        if (tableProperty == null) {
+            return false;
+        }
+        return tableProperty.getEnableUniqueKeyMergeOnWrite();
+    }
+
     // For non partitioned table:
     //   The table's distribute hash columns need to be a subset of the aggregate columns.
     //
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableProperty.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableProperty.java
index 049fea9fcf..ef823eac65 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableProperty.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableProperty.java
@@ -239,6 +239,15 @@ public class TableProperty implements Writable {
         return compressionType;
     }
 
+    public void setEnableUniqueKeyMergeOnWrite(boolean enable) {
+        properties.put(PropertyAnalyzer.ENABLE_UNIQUE_KEY_MERGE_ON_WRITE, Boolean.toString(enable));
+    }
+
+    public boolean getEnableUniqueKeyMergeOnWrite() {
+        return Boolean.parseBoolean(properties.getOrDefault(
+                    PropertyAnalyzer.ENABLE_UNIQUE_KEY_MERGE_ON_WRITE, "false"));
+    }
+
     public void buildReplicaAllocation() {
         try {
             // Must copy the properties because "analyzeReplicaAllocation" with remove the property
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java
index b5a73778cd..296f621532 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java
@@ -114,6 +114,8 @@ public class PropertyAnalyzer {
     private static final double MAX_FPP = 0.05;
     private static final double MIN_FPP = 0.0001;
 
+    public static final String ENABLE_UNIQUE_KEY_MERGE_ON_WRITE = "enable_unique_key_merge_on_write";
+
     /**
      * check and replace members of DataProperty by properties.
      *
@@ -666,4 +668,22 @@ public class PropertyAnalyzer {
         DataSortInfo dataSortInfo = new DataSortInfo(sortType, colNum);
         return dataSortInfo;
     }
+
+    public static boolean analyzeUniqueKeyMergeOnWrite(Map<String, String> properties) throws AnalysisException {
+        if (properties == null || properties.isEmpty()) {
+            return false;
+        }
+        String value = properties.get(PropertyAnalyzer.ENABLE_UNIQUE_KEY_MERGE_ON_WRITE);
+        if (value == null) {
+            return false;
+        }
+        properties.remove(PropertyAnalyzer.ENABLE_UNIQUE_KEY_MERGE_ON_WRITE);
+        if (value.equals("true")) {
+            return true;
+        } else if (value.equals("false")) {
+            return false;
+        }
+        throw new AnalysisException(PropertyAnalyzer.ENABLE_UNIQUE_KEY_MERGE_ON_WRITE
+                                    + " must be `true` or `false`");
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalDataSource.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalDataSource.java
index 0dc170154a..c94b493021 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalDataSource.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalDataSource.java
@@ -1321,7 +1321,8 @@ public class InternalDataSource implements DataSourceIf<Database> {
                     dataProperty.getStorageMedium(), singlePartitionDesc.getReplicaAlloc(),
                     singlePartitionDesc.getVersionInfo(), bfColumns, olapTable.getBfFpp(), tabletIdSet,
                     olapTable.getCopiedIndexes(), singlePartitionDesc.isInMemory(), olapTable.getStorageFormat(),
-                    singlePartitionDesc.getTabletType(), olapTable.getCompressionType(), olapTable.getDataSortInfo());
+                    singlePartitionDesc.getTabletType(), olapTable.getCompressionType(), olapTable.getDataSortInfo(),
+                    olapTable.getEnableUniqueKeyMergeOnWrite());
 
             // check again
             table = db.getOlapTableOrDdlException(tableName);
@@ -1542,7 +1543,7 @@ public class InternalDataSource implements DataSourceIf<Database> {
             DistributionInfo distributionInfo, TStorageMedium storageMedium, ReplicaAllocation replicaAlloc,
             Long versionInfo, Set<String> bfColumns, double bfFpp, Set<Long> tabletIdSet, List<Index> indexes,
             boolean isInMemory, TStorageFormat storageFormat, TTabletType tabletType, TCompressionType compressionType,
-            DataSortInfo dataSortInfo) throws DdlException {
+            DataSortInfo dataSortInfo, boolean enableUniqueKeyMergeOnWrite) throws DdlException {
         // create base index first.
         Preconditions.checkArgument(baseIndexId != -1);
         MaterializedIndex baseIndex = new MaterializedIndex(baseIndexId, IndexState.NORMAL);
@@ -1602,7 +1603,7 @@ public class InternalDataSource implements DataSourceIf<Database> {
                     CreateReplicaTask task = new CreateReplicaTask(backendId, dbId, tableId, partitionId, indexId,
                             tabletId, replicaId, shortKeyColumnCount, schemaHash, version, keysType, storageType,
                             storageMedium, schema, bfColumns, bfFpp, countDownLatch, indexes, isInMemory, tabletType,
-                            dataSortInfo, compressionType);
+                            dataSortInfo, compressionType, enableUniqueKeyMergeOnWrite);
                     task.setStorageFormat(storageFormat);
                     batchTask.addTask(task);
                     // add to AgentTaskQueue for handling finish report.
@@ -1741,6 +1742,14 @@ public class InternalDataSource implements DataSourceIf<Database> {
                 keysDesc.keysColumnSize(), storageFormat);
         olapTable.setDataSortInfo(dataSortInfo);
 
+        boolean enableUniqueKeyMergeOnWrite = false;
+        try {
+            enableUniqueKeyMergeOnWrite = PropertyAnalyzer.analyzeUniqueKeyMergeOnWrite(properties);
+        } catch (AnalysisException e) {
+            throw new DdlException(e.getMessage());
+        }
+        olapTable.setEnableUniqueKeyMergeOnWrite(enableUniqueKeyMergeOnWrite);
+
         // analyze bloom filter columns
         Set<String> bfColumns = null;
         double bfFpp = 0;
@@ -1919,7 +1928,7 @@ public class InternalDataSource implements DataSourceIf<Database> {
                         partitionDistributionInfo, partitionInfo.getDataProperty(partitionId).getStorageMedium(),
                         partitionInfo.getReplicaAllocation(partitionId), versionInfo, bfColumns, bfFpp, tabletIdSet,
                         olapTable.getCopiedIndexes(), isInMemory, storageFormat, tabletType, compressionType,
-                        olapTable.getDataSortInfo());
+                        olapTable.getDataSortInfo(), olapTable.getEnableUniqueKeyMergeOnWrite());
                 olapTable.addPartition(partition);
             } else if (partitionInfo.getType() == PartitionType.RANGE
                     || partitionInfo.getType() == PartitionType.LIST) {
@@ -1970,7 +1979,7 @@ public class InternalDataSource implements DataSourceIf<Database> {
                             partitionInfo.getReplicaAllocation(entry.getValue()), versionInfo, bfColumns, bfFpp,
                             tabletIdSet, olapTable.getCopiedIndexes(), isInMemory, storageFormat,
                             partitionInfo.getTabletType(entry.getValue()), compressionType,
-                            olapTable.getDataSortInfo());
+                            olapTable.getDataSortInfo(), olapTable.getEnableUniqueKeyMergeOnWrite());
                     olapTable.addPartition(partition);
                 }
             } else {
@@ -2348,7 +2357,7 @@ public class InternalDataSource implements DataSourceIf<Database> {
                         copiedTbl.getCopiedBfColumns(), copiedTbl.getBfFpp(), tabletIdSet, copiedTbl.getCopiedIndexes(),
                         copiedTbl.isInMemory(), copiedTbl.getStorageFormat(),
                         copiedTbl.getPartitionInfo().getTabletType(oldPartitionId), copiedTbl.getCompressionType(),
-                        copiedTbl.getDataSortInfo());
+                        copiedTbl.getDataSortInfo(), copiedTbl.getEnableUniqueKeyMergeOnWrite());
                 newPartitions.add(newPartition);
             }
         } catch (DdlException e) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
index 96719a2987..46e41188a3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
@@ -603,7 +603,9 @@ public class ReportHandler extends Daemon {
                                             olapTable.getCopiedIndexes(),
                                             olapTable.isInMemory(),
                                             olapTable.getPartitionInfo().getTabletType(partitionId),
-                                            olapTable.getCompressionType());
+                                            null,
+                                            olapTable.getCompressionType(),
+                                            olapTable.getEnableUniqueKeyMergeOnWrite());
                                     createReplicaTask.setIsRecoverTask(true);
                                     createReplicaBatchTask.addTask(createReplicaTask);
                                 } else {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/CreateReplicaTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/CreateReplicaTask.java
index 0e45b86cf2..c876bfa77a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/CreateReplicaTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/CreateReplicaTask.java
@@ -89,38 +89,7 @@ public class CreateReplicaTask extends AgentTask {
 
     private DataSortInfo dataSortInfo;
 
-    public CreateReplicaTask(long backendId, long dbId, long tableId, long partitionId, long indexId, long tabletId,
-                             long replicaId, short shortKeyColumnCount, int schemaHash, long version,
-                             KeysType keysType, TStorageType storageType,
-                             TStorageMedium storageMedium, List<Column> columns,
-                             Set<String> bfColumns, double bfFpp, MarkedCountDownLatch<Long, Long> latch,
-                             List<Index> indexes,
-                             boolean isInMemory,
-                             TTabletType tabletType, TCompressionType compressionType) {
-        super(null, backendId, TTaskType.CREATE, dbId, tableId, partitionId, indexId, tabletId);
-
-        this.replicaId = replicaId;
-        this.shortKeyColumnCount = shortKeyColumnCount;
-        this.schemaHash = schemaHash;
-
-        this.version = version;
-
-        this.keysType = keysType;
-        this.storageType = storageType;
-        this.storageMedium = storageMedium;
-        this.compressionType = compressionType;
-
-        this.columns = columns;
-
-        this.bfColumns = bfColumns;
-        this.indexes = indexes;
-        this.bfFpp = bfFpp;
-
-        this.latch = latch;
-
-        this.isInMemory = isInMemory;
-        this.tabletType = tabletType;
-    }
+    private boolean enableUniqueKeyMergeOnWrite;
 
     public CreateReplicaTask(long backendId, long dbId, long tableId, long partitionId, long indexId, long tabletId,
                              long replicaId, short shortKeyColumnCount, int schemaHash, long version,
@@ -131,7 +100,8 @@ public class CreateReplicaTask extends AgentTask {
                              boolean isInMemory,
                              TTabletType tabletType,
                              DataSortInfo dataSortInfo,
-                             TCompressionType compressionType) {
+                             TCompressionType compressionType,
+                             boolean enableUniqueKeyMergeOnWrite) {
         super(null, backendId, TTaskType.CREATE, dbId, tableId, partitionId, indexId, tabletId);
 
         this.replicaId = replicaId;
@@ -156,6 +126,7 @@ public class CreateReplicaTask extends AgentTask {
         this.isInMemory = isInMemory;
         this.tabletType = tabletType;
         this.dataSortInfo = dataSortInfo;
+        this.enableUniqueKeyMergeOnWrite = (keysType == KeysType.UNIQUE_KEYS && enableUniqueKeyMergeOnWrite);
     }
 
     public void setIsRecoverTask(boolean isRecoverTask) {
@@ -277,6 +248,7 @@ public class CreateReplicaTask extends AgentTask {
 
         createTabletReq.setTabletType(tabletType);
         createTabletReq.setCompressionType(compressionType);
+        createTabletReq.setEnableUniqueKeyMergeOnWrite(enableUniqueKeyMergeOnWrite);
         return createTabletReq;
     }
 }
diff --git a/fe/fe-core/src/test/java/org/apache/doris/task/AgentTaskTest.java b/fe/fe-core/src/test/java/org/apache/doris/task/AgentTaskTest.java
index a0ba30612a..2aae6679af 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/task/AgentTaskTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/task/AgentTaskTest.java
@@ -109,7 +109,7 @@ public class AgentTaskTest {
                                                   version, KeysType.AGG_KEYS,
                                                   storageType, TStorageMedium.SSD,
                                                   columns, null, 0, latch, null,
-                                                  false, TTabletType.TABLET_TYPE_DISK, TCompressionType.LZ4F);
+                                                  false, TTabletType.TABLET_TYPE_DISK, null, TCompressionType.LZ4F, false);
 
         // drop
         dropTask = new DropReplicaTask(backendId1, tabletId1, replicaId1, schemaHash1);
diff --git a/gensrc/proto/olap_file.proto b/gensrc/proto/olap_file.proto
index e7a3893fc1..0ce1ca34cf 100644
--- a/gensrc/proto/olap_file.proto
+++ b/gensrc/proto/olap_file.proto
@@ -283,6 +283,8 @@ message TabletMetaPB {
     optional int64 replica_id = 21 [default = 0];
     optional string storage_policy = 22;
     optional DeleteBitmapPB delete_bitmap = 23;
+    // Use primary key index to speed up tabel unique key model
+    optional bool enable_unique_key_merge_on_write = 24 [default = false];
 }
 
 message OLAPIndexHeaderMessage {
diff --git a/gensrc/thrift/AgentService.thrift b/gensrc/thrift/AgentService.thrift
index 78c1fe0c3b..49d967f078 100644
--- a/gensrc/thrift/AgentService.thrift
+++ b/gensrc/thrift/AgentService.thrift
@@ -121,6 +121,7 @@ struct TCreateTabletReq {
     16: optional TCompressionType compression_type = TCompressionType.LZ4F
     17: optional Types.TReplicaId replica_id = 0
     18: optional string storage_policy
+    19: optional bool enable_unique_key_merge_on_write = false
 }
 
 struct TDropTabletReq {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org