You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2022/05/27 13:52:13 UTC

[incubator-doris] branch master updated: [feature] Support compression prop (#8923)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new b2c2cdb122 [feature] Support compression prop (#8923)
b2c2cdb122 is described below

commit b2c2cdb122b608c6391932ce5ef7b233ea9dc6bb
Author: Lightman <31...@users.noreply.github.com>
AuthorDate: Fri May 27 21:52:05 2022 +0800

    [feature] Support compression prop (#8923)
---
 be/src/olap/rowset/segment_v2/column_writer.cpp    |  4 +--
 be/src/olap/rowset/segment_v2/segment_writer.cpp   | 10 ++++---
 be/src/olap/rowset/segment_v2/segment_writer.h     |  2 +-
 be/src/olap/tablet_meta.cpp                        | 31 ++++++++++++++++++++--
 be/src/olap/tablet_meta.h                          |  3 ++-
 be/src/olap/tablet_schema.cpp                      |  2 ++
 be/src/olap/tablet_schema.h                        |  4 +++
 .../java/org/apache/doris/alter/RollupJobV2.java   |  3 ++-
 .../org/apache/doris/alter/SchemaChangeJobV2.java  |  3 ++-
 .../java/org/apache/doris/backup/RestoreJob.java   |  3 ++-
 .../java/org/apache/doris/catalog/Catalog.java     | 28 ++++++++++++++++---
 .../java/org/apache/doris/catalog/OlapTable.java   | 16 +++++++++++
 .../org/apache/doris/catalog/TableProperty.java    | 16 ++++++++++-
 .../apache/doris/common/util/PropertyAnalyzer.java | 31 ++++++++++++++++++++++
 .../org/apache/doris/master/ReportHandler.java     |  3 ++-
 .../org/apache/doris/task/CreateReplicaTask.java   | 10 +++++--
 .../org/apache/doris/catalog/CreateTableTest.java  | 10 +++++++
 .../java/org/apache/doris/task/AgentTaskTest.java  |  3 ++-
 gensrc/proto/olap_file.proto                       |  2 ++
 gensrc/thrift/AgentService.thrift                  | 13 +++++++++
 20 files changed, 176 insertions(+), 21 deletions(-)

diff --git a/be/src/olap/rowset/segment_v2/column_writer.cpp b/be/src/olap/rowset/segment_v2/column_writer.cpp
index 9a54b210c8..6c76ddff62 100644
--- a/be/src/olap/rowset/segment_v2/column_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/column_writer.cpp
@@ -121,7 +121,7 @@ Status ColumnWriter::create(const ColumnWriterOptions& opts, const TabletColumn*
             length_options.meta->set_length(
                     get_scalar_type_info<OLAP_FIELD_TYPE_UNSIGNED_INT>()->size());
             length_options.meta->set_encoding(DEFAULT_ENCODING);
-            length_options.meta->set_compression(LZ4F);
+            length_options.meta->set_compression(opts.meta->compression());
 
             length_options.need_zone_map = false;
             length_options.need_bloom_filter = false;
@@ -149,7 +149,7 @@ Status ColumnWriter::create(const ColumnWriterOptions& opts, const TabletColumn*
                 null_options.meta->set_length(
                         get_scalar_type_info<OLAP_FIELD_TYPE_TINYINT>()->size());
                 null_options.meta->set_encoding(DEFAULT_ENCODING);
-                null_options.meta->set_compression(LZ4F);
+                null_options.meta->set_compression(opts.meta->compression());
 
                 null_options.need_zone_map = false;
                 null_options.need_bloom_filter = false;
diff --git a/be/src/olap/rowset/segment_v2/segment_writer.cpp b/be/src/olap/rowset/segment_v2/segment_writer.cpp
index 4db2549683..f95b62c9c2 100644
--- a/be/src/olap/rowset/segment_v2/segment_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/segment_writer.cpp
@@ -63,17 +63,19 @@ SegmentWriter::~SegmentWriter() {
 };
 
 void SegmentWriter::init_column_meta(ColumnMetaPB* meta, uint32_t* column_id,
-                                     const TabletColumn& column) {
+                                     const TabletColumn& column,
+                                     const TabletSchema* tablet_schema) {
     // TODO(zc): Do we need this column_id??
     meta->set_column_id((*column_id)++);
     meta->set_unique_id(column.unique_id());
     meta->set_type(column.type());
     meta->set_length(column.length());
     meta->set_encoding(DEFAULT_ENCODING);
-    meta->set_compression(LZ4F);
+    meta->set_compression(tablet_schema->compression_type());
     meta->set_is_nullable(column.is_nullable());
     for (uint32_t i = 0; i < column.get_subtype_count(); ++i) {
-        init_column_meta(meta->add_children_columns(), column_id, column.get_sub_column(i));
+        init_column_meta(meta->add_children_columns(), column_id, column.get_sub_column(i),
+                         tablet_schema);
     }
 }
 
@@ -84,7 +86,7 @@ Status SegmentWriter::init(uint32_t write_mbytes_per_sec __attribute__((unused))
         ColumnWriterOptions opts;
         opts.meta = _footer.add_columns();
 
-        init_column_meta(opts.meta, &column_id, column);
+        init_column_meta(opts.meta, &column_id, column, _tablet_schema);
 
         // now we create zone map for key columns in AGG_KEYS or all column in UNIQUE_KEYS or DUP_KEYS
         // and not support zone map for array type.
diff --git a/be/src/olap/rowset/segment_v2/segment_writer.h b/be/src/olap/rowset/segment_v2/segment_writer.h
index ab928b51e1..67fb9e02a2 100644
--- a/be/src/olap/rowset/segment_v2/segment_writer.h
+++ b/be/src/olap/rowset/segment_v2/segment_writer.h
@@ -80,7 +80,7 @@ public:
     Status finalize(uint64_t* segment_file_size, uint64_t* index_size);
 
     static void init_column_meta(ColumnMetaPB* meta, uint32_t* column_id,
-                                 const TabletColumn& column);
+                                 const TabletColumn& column, const TabletSchema* tablet_schema);
 
 private:
     DISALLOW_COPY_AND_ASSIGN(SegmentWriter);
diff --git a/be/src/olap/tablet_meta.cpp b/be/src/olap/tablet_meta.cpp
index 0cd9202a72..e299d1a048 100644
--- a/be/src/olap/tablet_meta.cpp
+++ b/be/src/olap/tablet_meta.cpp
@@ -43,7 +43,7 @@ Status TabletMeta::create(const TCreateTabletReq& request, const TabletUid& tabl
             request.tablet_schema.schema_hash, shard_id, request.tablet_schema, next_unique_id,
             col_ordinal_to_unique_id, tablet_uid,
             request.__isset.tablet_type ? request.tablet_type : TTabletType::TABLET_TYPE_DISK,
-            request.storage_medium, request.storage_param.storage_name));
+            request.storage_medium, request.storage_param.storage_name, request.compression_type));
     return Status::OK();
 }
 
@@ -54,7 +54,8 @@ TabletMeta::TabletMeta(int64_t table_id, int64_t partition_id, int64_t tablet_id
                        uint32_t next_unique_id,
                        const std::unordered_map<uint32_t, uint32_t>& col_ordinal_to_unique_id,
                        TabletUid tablet_uid, TTabletType::type tabletType,
-                       TStorageMedium::type t_storage_medium, const std::string& storage_name)
+                       TStorageMedium::type t_storage_medium, const std::string& storage_name,
+                       TCompressionType::type compression_type)
         : _tablet_uid(0, 0), _schema(new TabletSchema) {
     TabletMetaPB tablet_meta_pb;
     tablet_meta_pb.set_table_id(table_id);
@@ -90,8 +91,34 @@ TabletMeta::TabletMeta(int64_t table_id, int64_t partition_id, int64_t tablet_id
         LOG(WARNING) << "unknown tablet keys type";
         break;
     }
+    // compress_kind is used to compress segment files
     schema->set_compress_kind(COMPRESS_LZ4);
 
+    // compression_type is used to compress segment pages
+    switch (compression_type) {
+    case TCompressionType::NO_COMPRESSION:
+        schema->set_compression_type(NO_COMPRESSION);
+        break;
+    case TCompressionType::SNAPPY:
+        schema->set_compression_type(SNAPPY);
+        break;
+    case TCompressionType::LZ4:
+        schema->set_compression_type(LZ4);
+        break;
+    case TCompressionType::LZ4F:
+        schema->set_compression_type(LZ4F);
+        break;
+    case TCompressionType::ZLIB:
+        schema->set_compression_type(ZLIB);
+        break;
+    case TCompressionType::ZSTD:
+        schema->set_compression_type(ZSTD);
+        break;
+    default:
+        schema->set_compression_type(LZ4F);
+        break;
+    }
+
     switch (tablet_schema.sort_type) {
     case TSortType::type::ZORDER:
         schema->set_sort_type(SortType::ZORDER);
diff --git a/be/src/olap/tablet_meta.h b/be/src/olap/tablet_meta.h
index c0b165a7bb..c8709941e5 100644
--- a/be/src/olap/tablet_meta.h
+++ b/be/src/olap/tablet_meta.h
@@ -83,7 +83,8 @@ public:
                uint64_t shard_id, const TTabletSchema& tablet_schema, uint32_t next_unique_id,
                const std::unordered_map<uint32_t, uint32_t>& col_ordinal_to_unique_id,
                TabletUid tablet_uid, TTabletType::type tabletType,
-               TStorageMedium::type t_storage_medium, const std::string& remote_storage_name);
+               TStorageMedium::type t_storage_medium, const std::string& remote_storage_name,
+               TCompressionType::type compression_type);
     // If need add a filed in TableMeta, filed init copy in copy construct function
     TabletMeta(const TabletMeta& tablet_meta);
     TabletMeta(TabletMeta&& tablet_meta) = delete;
diff --git a/be/src/olap/tablet_schema.cpp b/be/src/olap/tablet_schema.cpp
index 702fef36a5..6e9703f569 100644
--- a/be/src/olap/tablet_schema.cpp
+++ b/be/src/olap/tablet_schema.cpp
@@ -441,6 +441,7 @@ void TabletSchema::init_from_pb(const TabletSchemaPB& schema) {
     _sequence_col_idx = schema.sequence_col_idx();
     _sort_type = schema.sort_type();
     _sort_col_num = schema.sort_col_num();
+    _compression_type = schema.compression_type();
 }
 
 void TabletSchema::to_schema_pb(TabletSchemaPB* tablet_meta_pb) {
@@ -461,6 +462,7 @@ void TabletSchema::to_schema_pb(TabletSchemaPB* tablet_meta_pb) {
     tablet_meta_pb->set_sequence_col_idx(_sequence_col_idx);
     tablet_meta_pb->set_sort_type(_sort_type);
     tablet_meta_pb->set_sort_col_num(_sort_col_num);
+    tablet_meta_pb->set_compression_type(_compression_type);
 }
 
 uint32_t TabletSchema::mem_size() const {
diff --git a/be/src/olap/tablet_schema.h b/be/src/olap/tablet_schema.h
index 8554bebb3b..7c3209ee82 100644
--- a/be/src/olap/tablet_schema.h
+++ b/be/src/olap/tablet_schema.h
@@ -20,6 +20,7 @@
 #include <vector>
 
 #include "gen_cpp/olap_file.pb.h"
+#include "gen_cpp/segment_v2.pb.h"
 #include "olap/olap_define.h"
 #include "olap/types.h"
 
@@ -145,6 +146,8 @@ public:
     void set_delete_sign_idx(int32_t delete_sign_idx) { _delete_sign_idx = delete_sign_idx; }
     bool has_sequence_col() const { return _sequence_col_idx != -1; }
     int32_t sequence_col_idx() const { return _sequence_col_idx; }
+    segment_v2::CompressionTypePB compression_type() const { return _compression_type; }
+
     vectorized::Block create_block(
             const std::vector<uint32_t>& return_columns,
             const std::unordered_set<uint32_t>* tablet_columns_need_convert_null = nullptr) const;
@@ -168,6 +171,7 @@ private:
     size_t _num_short_key_columns = 0;
     size_t _num_rows_per_row_block = 0;
     CompressKind _compress_kind = COMPRESS_NONE;
+    segment_v2::CompressionTypePB _compression_type = segment_v2::CompressionTypePB::LZ4F;
     size_t _next_column_unique_id = 0;
 
     bool _has_bf_fpp = false;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java
index 53027c2357..7f3a115ed4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java
@@ -231,7 +231,8 @@ public class RollupJobV2 extends AlterJobV2 implements GsonPostProcessable {
                                 rollupSchema, tbl.getCopiedBfColumns(), tbl.getBfFpp(), countDownLatch,
                                 tbl.getCopiedIndexes(),
                                 tbl.isInMemory(),
-                                tabletType);
+                                tabletType,
+                                tbl.getCompressionType());
                         createReplicaTask.setBaseTablet(tabletIdMap.get(rollupTabletId), baseSchemaHash);
                         if (this.storageFormat != null) {
                             createReplicaTask.setStorageFormat(this.storageFormat);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
index 2d55523d75..a7faa58743 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
@@ -253,7 +253,8 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
                                     originKeysType, TStorageType.COLUMN, storageMedium,
                                     shadowSchema, bfColumns, bfFpp, countDownLatch, indexes,
                                     tbl.isInMemory(),
-                                    tbl.getPartitionInfo().getTabletType(partitionId));
+                                    tbl.getPartitionInfo().getTabletType(partitionId),
+                                    tbl.getCompressionType());
                             createReplicaTask.setBaseTablet(partitionIndexTabletMap.get(partitionId, shadowIdxId).get(shadowTabletId), originSchemaHash);
                             if (this.storageFormat != null) {
                                 createReplicaTask.setStorageFormat(this.storageFormat);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
index 9445f74413..922b314a21 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
@@ -964,7 +964,8 @@ public class RestoreJob extends AbstractJob {
                             indexMeta.getSchema(), bfColumns, bfFpp, null,
                             localTbl.getCopiedIndexes(),
                             localTbl.isInMemory(),
-                            localTbl.getPartitionInfo().getTabletType(restorePart.getId()));
+                            localTbl.getPartitionInfo().getTabletType(restorePart.getId()),
+                            localTbl.getCompressionType());
                     task.setInRestoreMode(true);
                     batchTask.addTask(task);
                 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java
index 50c3c40e78..45de5422c7 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java
@@ -245,6 +245,7 @@ import org.apache.doris.task.CreateReplicaTask;
 import org.apache.doris.task.DropReplicaTask;
 import org.apache.doris.task.MasterTaskExecutor;
 import org.apache.doris.thrift.BackendService;
+import org.apache.doris.thrift.TCompressionType;
 import org.apache.doris.thrift.TNetworkAddress;
 import org.apache.doris.thrift.TStorageFormat;
 import org.apache.doris.thrift.TStorageMedium;
@@ -3026,6 +3027,7 @@ public class Catalog {
      *     6.2. replicationNum
      *     6.3. inMemory
      *     6.4. storageFormat
+     *     6.5. compressionType
      * 7. set index meta
      * 8. check colocation properties
      * 9. create tablet in BE
@@ -3314,6 +3316,7 @@ public class Catalog {
                     singlePartitionDesc.isInMemory(),
                     olapTable.getStorageFormat(),
                     singlePartitionDesc.getTabletType(),
+                    olapTable.getCompressionType(),
                     olapTable.getDataSortInfo()
             );
 
@@ -3545,6 +3548,7 @@ public class Catalog {
                                                  boolean isInMemory,
                                                  TStorageFormat storageFormat,
                                                  TTabletType tabletType,
+                                                 TCompressionType compressionType,
                                                  DataSortInfo dataSortInfo) throws DdlException {
         // create base index first.
         Preconditions.checkArgument(baseIndexId != -1);
@@ -3612,7 +3616,8 @@ public class Catalog {
                             indexes,
                             isInMemory,
                             tabletType,
-                            dataSortInfo);
+                            dataSortInfo,
+                            compressionType);
                     task.setStorageFormat(storageFormat);
                     batchTask.addTask(task);
                     // add to AgentTaskQueue for handling finish report.
@@ -3730,6 +3735,15 @@ public class Catalog {
         }
         olapTable.setStorageFormat(storageFormat);
 
+        // get compression type
+        TCompressionType compressionType = TCompressionType.LZ4;
+        try {
+            compressionType = PropertyAnalyzer.analyzeCompressionType(properties);
+        } catch (AnalysisException e) {
+            throw new DdlException(e.getMessage());
+        }
+        olapTable.setCompressionType(compressionType);
+
         // check data sort properties
         DataSortInfo dataSortInfo = PropertyAnalyzer.analyzeDataSortInfo(properties, keysType,
                 keysDesc.keysColumnSize(), storageFormat);
@@ -3778,6 +3792,7 @@ public class Catalog {
             throw new DdlException(e.getMessage());
         }
 
+
         if (partitionInfo.getType() == PartitionType.UNPARTITIONED) {
             // if this is an unpartitioned table, we should analyze data property and replication num here.
             // if this is a partitioned table, there properties are already analyzed in RangePartitionDesc analyze phase.
@@ -3914,7 +3929,7 @@ public class Catalog {
                         partitionInfo.getReplicaAllocation(partitionId),
                         versionInfo, bfColumns, bfFpp,
                         tabletIdSet, olapTable.getCopiedIndexes(),
-                        isInMemory, storageFormat, tabletType, olapTable.getDataSortInfo());
+                        isInMemory, storageFormat, tabletType, compressionType, olapTable.getDataSortInfo());
                 olapTable.addPartition(partition);
             } else if (partitionInfo.getType() == PartitionType.RANGE || partitionInfo.getType() == PartitionType.LIST) {
                 try {
@@ -3965,7 +3980,8 @@ public class Catalog {
                             versionInfo, bfColumns, bfFpp,
                             tabletIdSet, olapTable.getCopiedIndexes(),
                             isInMemory, storageFormat,
-                            partitionInfo.getTabletType(entry.getValue()), olapTable.getDataSortInfo());
+                            partitionInfo.getTabletType(entry.getValue()),
+                            compressionType, olapTable.getDataSortInfo());
                     olapTable.addPartition(partition);
                 }
             } else {
@@ -4339,6 +4355,11 @@ public class Catalog {
                 sb.append(",\n\"").append(PropertyAnalyzer.PROPERTIES_REMOTE_STORAGE_RESOURCE).append("\" = \"");
                 sb.append(remoteStorageResource).append("\"");
             }
+            // compression type
+            if (olapTable.getCompressionType() != TCompressionType.LZ4F) {
+                sb.append(",\n\"").append(PropertyAnalyzer.PROPERTIES_COMPRESSION).append("\" = \"");
+                sb.append(olapTable.getCompressionType()).append("\"");
+            }
 
             sb.append("\n)");
         } else if (table.getType() == TableType.MYSQL) {
@@ -6882,6 +6903,7 @@ public class Catalog {
                         copiedTbl.isInMemory(),
                         copiedTbl.getStorageFormat(),
                         copiedTbl.getPartitionInfo().getTabletType(oldPartitionId),
+                        copiedTbl.getCompressionType(),
                         copiedTbl.getDataSortInfo());
                 newPartitions.add(newPartition);
             }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
index c7401e08ae..25eb7f65db 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
@@ -48,6 +48,7 @@ import org.apache.doris.qe.OriginStatement;
 import org.apache.doris.resource.Tag;
 import org.apache.doris.system.Backend;
 import org.apache.doris.system.SystemInfoService;
+import org.apache.doris.thrift.TCompressionType;
 import org.apache.doris.thrift.TOlapTable;
 import org.apache.doris.thrift.TSortType;
 import org.apache.doris.thrift.TStorageFormat;
@@ -1679,6 +1680,14 @@ public class OlapTable extends Table {
         return !tempPartitions.isEmpty();
     }
 
+    public void setCompressionType(TCompressionType compressionType) {
+        if (tableProperty == null) {
+            tableProperty = new TableProperty(new HashMap<>());
+        }
+        tableProperty.modifyTableProperties(PropertyAnalyzer.PROPERTIES_COMPRESSION, compressionType.name());
+        tableProperty.buildCompressionType();
+    }
+
     public void setStorageFormat(TStorageFormat storageFormat) {
         if (tableProperty == null) {
             tableProperty = new TableProperty(new HashMap<>());
@@ -1694,6 +1703,13 @@ public class OlapTable extends Table {
         return tableProperty.getStorageFormat();
     }
 
+    public TCompressionType getCompressionType() {
+        if (tableProperty == null) {
+            return TCompressionType.LZ4F;
+        }
+        return tableProperty.getCompressionType();
+    }
+
     public DataSortInfo getDataSortInfo() {
         if (tableProperty == null) {
             return new DataSortInfo(TSortType.LEXICAL, this.getKeysNum());
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableProperty.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableProperty.java
index f9d2063284..b9c3835c96 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableProperty.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableProperty.java
@@ -25,6 +25,7 @@ import org.apache.doris.common.io.Writable;
 import org.apache.doris.common.util.PropertyAnalyzer;
 import org.apache.doris.persist.OperationType;
 import org.apache.doris.persist.gson.GsonUtils;
+import org.apache.doris.thrift.TCompressionType;
 import org.apache.doris.thrift.TStorageFormat;
 
 import com.google.common.base.Strings;
@@ -67,6 +68,8 @@ public class TableProperty implements Writable {
      */
     private TStorageFormat storageFormat = TStorageFormat.DEFAULT;
 
+    private TCompressionType compressionType = TCompressionType.LZ4F;
+
     private DataSortInfo dataSortInfo = new DataSortInfo();
 
     // remote storage resource, for cold data
@@ -147,6 +150,12 @@ public class TableProperty implements Writable {
         return this;
     }
 
+    public TableProperty buildCompressionType() {
+        compressionType = TCompressionType.valueOf(properties.getOrDefault(PropertyAnalyzer.PROPERTIES_COMPRESSION,
+                TCompressionType.LZ4F.name()));
+        return this;
+    }
+
     public TableProperty buildStorageFormat() {
         storageFormat = TStorageFormat.valueOf(properties.getOrDefault(PropertyAnalyzer.PROPERTIES_STORAGE_FORMAT,
                 TStorageFormat.DEFAULT.name()));
@@ -227,6 +236,10 @@ public class TableProperty implements Writable {
         return remoteStorageResource;
     }
 
+    public TCompressionType getCompressionType() {
+        return compressionType;
+    }
+
     public void buildReplicaAllocation() {
         try {
             // Must copy the properties because "analyzeReplicaAllocation" with remove the property
@@ -251,7 +264,8 @@ public class TableProperty implements Writable {
                 .buildInMemory()
                 .buildStorageFormat()
                 .buildDataSortInfo()
-                .buildRemoteStorageResource();
+                .buildRemoteStorageResource()
+                .buildCompressionType();
         if (Catalog.getCurrentCatalogJournalVersion() < FeMetaVersion.VERSION_105) {
             // get replica num from property map and create replica allocation
             String repNum = tableProperty.properties.remove(PropertyAnalyzer.PROPERTIES_REPLICATION_NUM);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java
index 20aac35971..5202122410 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java
@@ -32,6 +32,7 @@ import org.apache.doris.catalog.Type;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.Config;
 import org.apache.doris.resource.Tag;
+import org.apache.doris.thrift.TCompressionType;
 import org.apache.doris.thrift.TSortType;
 import org.apache.doris.thrift.TStorageFormat;
 import org.apache.doris.thrift.TStorageMedium;
@@ -75,6 +76,7 @@ public class PropertyAnalyzer {
     public static final String PROPERTIES_COLOCATE_WITH = "colocate_with";
 
     public static final String PROPERTIES_TIMEOUT = "timeout";
+    public static final String PROPERTIES_COMPRESSION = "compression";
 
     public static final String PROPERTIES_DISTRIBUTION_TYPE = "distribution_type";
     public static final String PROPERTIES_SEND_CLEAR_ALTER_TASK = "send_clear_alter_tasks";
@@ -433,6 +435,35 @@ public class PropertyAnalyzer {
         return timeout;
     }
 
+    // analyzeCompressionType will parse the compression type from properties
+    public static TCompressionType analyzeCompressionType(Map<String, String> properties) throws  AnalysisException {
+        String compressionType = "";
+        if (properties != null && properties.containsKey(PROPERTIES_COMPRESSION)) {
+            compressionType = properties.get(PROPERTIES_COMPRESSION);
+            properties.remove(PROPERTIES_COMPRESSION);
+        } else {
+            return TCompressionType.LZ4F;
+        }
+
+        if (compressionType.equalsIgnoreCase("no_compression")) {
+            return TCompressionType.NO_COMPRESSION;
+        } else if (compressionType.equalsIgnoreCase("lz4")) {
+            return TCompressionType.LZ4;
+        } else if (compressionType.equalsIgnoreCase("lz4f")) {
+            return TCompressionType.LZ4F;
+        } else if (compressionType.equalsIgnoreCase("zlib")) {
+            return TCompressionType.ZLIB;
+        } else if (compressionType.equalsIgnoreCase("zstd")) {
+            return TCompressionType.ZSTD;
+        } else if (compressionType.equalsIgnoreCase("snappy")) {
+            return TCompressionType.SNAPPY;
+        } else if (compressionType.equalsIgnoreCase("default_compression")) {
+            return TCompressionType.LZ4F;
+        } else {
+            throw new AnalysisException("unknown compression type: " + compressionType);
+        }
+    }
+
     // analyzeStorageFormat will parse the storage format from properties
     // sql: alter table tablet_name set ("storage_format" = "v2")
     // Use this sql to convert all tablets(base and rollup index) to a new format segment
diff --git a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
index d130fbaf0a..cdca5251fa 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
@@ -601,7 +601,8 @@ public class ReportHandler extends Daemon {
                                             TStorageMedium.HDD, indexMeta.getSchema(), bfColumns, bfFpp, null,
                                             olapTable.getCopiedIndexes(),
                                             olapTable.isInMemory(),
-                                            olapTable.getPartitionInfo().getTabletType(partitionId));
+                                            olapTable.getPartitionInfo().getTabletType(partitionId),
+                                            olapTable.getCompressionType());
                                     createReplicaTask.setIsRecoverTask(true);
                                     createReplicaBatchTask.addTask(createReplicaTask);
                                 } else {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/CreateReplicaTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/CreateReplicaTask.java
index 29068f7542..8718dd7663 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/CreateReplicaTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/CreateReplicaTask.java
@@ -25,6 +25,7 @@ import org.apache.doris.catalog.KeysType;
 import org.apache.doris.common.MarkedCountDownLatch;
 import org.apache.doris.common.Status;
 import org.apache.doris.thrift.TColumn;
+import org.apache.doris.thrift.TCompressionType;
 import org.apache.doris.thrift.TCreateTabletReq;
 import org.apache.doris.thrift.TOlapTableIndex;
 import org.apache.doris.thrift.TStatusCode;
@@ -54,6 +55,7 @@ public class CreateReplicaTask extends AgentTask {
     private KeysType keysType;
     private TStorageType storageType;
     private TStorageMedium storageMedium;
+    private TCompressionType compressionType;
 
     private List<Column> columns;
 
@@ -93,7 +95,7 @@ public class CreateReplicaTask extends AgentTask {
                              Set<String> bfColumns, double bfFpp, MarkedCountDownLatch<Long, Long> latch,
                              List<Index> indexes,
                              boolean isInMemory,
-                             TTabletType tabletType) {
+                             TTabletType tabletType, TCompressionType compressionType) {
         super(null, backendId, TTaskType.CREATE, dbId, tableId, partitionId, indexId, tabletId);
 
         this.shortKeyColumnCount = shortKeyColumnCount;
@@ -104,6 +106,7 @@ public class CreateReplicaTask extends AgentTask {
         this.keysType = keysType;
         this.storageType = storageType;
         this.storageMedium = storageMedium;
+        this.compressionType = compressionType;
 
         this.columns = columns;
 
@@ -125,7 +128,8 @@ public class CreateReplicaTask extends AgentTask {
                              List<Index> indexes,
                              boolean isInMemory,
                              TTabletType tabletType,
-                             DataSortInfo dataSortInfo) {
+                             DataSortInfo dataSortInfo,
+                             TCompressionType compressionType) {
         super(null, backendId, TTaskType.CREATE, dbId, tableId, partitionId, indexId, tabletId);
 
         this.shortKeyColumnCount = shortKeyColumnCount;
@@ -136,6 +140,7 @@ public class CreateReplicaTask extends AgentTask {
         this.keysType = keysType;
         this.storageType = storageType;
         this.storageMedium = storageMedium;
+        this.compressionType = compressionType;
 
         this.columns = columns;
 
@@ -267,6 +272,7 @@ public class CreateReplicaTask extends AgentTask {
         }
 
         createTabletReq.setTabletType(tabletType);
+        createTabletReq.setCompressionType(compressionType);
         return createTabletReq;
     }
 }
diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/CreateTableTest.java b/fe/fe-core/src/test/java/org/apache/doris/catalog/CreateTableTest.java
index 93dbf9f7ea..513b3cef1c 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/catalog/CreateTableTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/CreateTableTest.java
@@ -138,6 +138,16 @@ public class CreateTableTest {
                 .expectThrowsNoException(() -> createTable("create table test.tb7(key1 int, key2 varchar(10)) \n"
                         + "distributed by hash(key1) buckets 1 properties('replication_num' = '1', 'storage_medium' = 'ssd');"));
 
+        ExceptionChecker
+                .expectThrowsNoException(() -> createTable("create table test.compression1(key1 int, key2 varchar(10)) \n"
+                        + "distributed by hash(key1) buckets 1 \n"
+                        + "properties('replication_num' = '1', 'compression' = 'lz4f');"));
+
+        ExceptionChecker
+                .expectThrowsNoException(() -> createTable("create table test.compression2(key1 int, key2 varchar(10)) \n"
+                        + "distributed by hash(key1) buckets 1 \n"
+                        + "properties('replication_num' = '1', 'compression' = 'snappy');"));
+
         ExceptionChecker
                 .expectThrowsNoException(() -> createTable("create table test.tbl8\n" + "(k1 varchar(40), k2 int, v1 int)\n"
                         + "unique key(k1, k2)\n"
diff --git a/fe/fe-core/src/test/java/org/apache/doris/task/AgentTaskTest.java b/fe/fe-core/src/test/java/org/apache/doris/task/AgentTaskTest.java
index db47288484..31b84fa4af 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/task/AgentTaskTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/task/AgentTaskTest.java
@@ -28,6 +28,7 @@ import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.MarkedCountDownLatch;
 import org.apache.doris.thrift.TAgentTaskRequest;
 import org.apache.doris.thrift.TBackend;
+import org.apache.doris.thrift.TCompressionType;
 import org.apache.doris.thrift.TPriority;
 import org.apache.doris.thrift.TPushType;
 import org.apache.doris.thrift.TStorageMedium;
@@ -110,7 +111,7 @@ public class AgentTaskTest {
                                                   version, KeysType.AGG_KEYS,
                                                   storageType, TStorageMedium.SSD,
                                                   columns, null, 0, latch, null,
-                                                  false, TTabletType.TABLET_TYPE_DISK);
+                                                  false, TTabletType.TABLET_TYPE_DISK, TCompressionType.LZ4F);
 
         // drop
         dropTask = new DropReplicaTask(backendId1, tabletId1, schemaHash1);
diff --git a/gensrc/proto/olap_file.proto b/gensrc/proto/olap_file.proto
index b67d9c11cd..ac7c08580b 100644
--- a/gensrc/proto/olap_file.proto
+++ b/gensrc/proto/olap_file.proto
@@ -23,6 +23,7 @@ option java_package = "org.apache.doris.proto";
 
 import "olap_common.proto";
 import "types.proto";
+import "segment_v2.proto";
 
 message ZoneMap {
     required bytes min = 1;
@@ -195,6 +196,7 @@ message TabletSchemaPB {
     optional int32 sequence_col_idx = 10 [default= -1];
     optional SortType sort_type = 11;
     optional int32 sort_col_num = 12;
+    optional segment_v2.CompressionTypePB compression_type = 13;
 }
 
 enum TabletStatePB {
diff --git a/gensrc/thrift/AgentService.thrift b/gensrc/thrift/AgentService.thrift
index 1eb01e8c5f..2233fae3fe 100644
--- a/gensrc/thrift/AgentService.thrift
+++ b/gensrc/thrift/AgentService.thrift
@@ -84,6 +84,18 @@ struct TStorageParam {
     3: optional TS3StorageParam s3_storage_param
 }
 
+enum TCompressionType { // compression codec selector; carried in TCreateTabletReq.compression_type (field 16)
+    UNKNOWN_COMPRESSION = 0,
+    DEFAULT_COMPRESSION = 1,
+    NO_COMPRESSION = 2,
+    SNAPPY = 3,
+    LZ4 = 4,
+    LZ4F = 5, // declared default of TCreateTabletReq.compression_type
+    ZLIB = 6,
+    ZSTD = 7 // NOTE(review): numbering presumably mirrors segment_v2.CompressionTypePB — confirm before adding or renumbering values
+}
+
+
 struct TCreateTabletReq {
     1: required Types.TTabletId tablet_id
     2: required TTabletSchema tablet_schema
@@ -105,6 +117,7 @@ struct TCreateTabletReq {
     13: optional TStorageFormat storage_format
     14: optional TTabletType tablet_type
     15: optional TStorageParam storage_param
+    16: optional TCompressionType compression_type = TCompressionType.LZ4F
 }
 
 struct TDropTabletReq {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org