You are viewing a plain text version of this content. The canonical link for it was a hyperlink that is not preserved in this plain-text rendering.
Posted to commits@doris.apache.org by mo...@apache.org on 2022/05/27 13:52:13 UTC
[incubator-doris] branch master updated: [feature] Support compression prop (#8923)
This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new b2c2cdb122 [feature] Support compression prop (#8923)
b2c2cdb122 is described below
commit b2c2cdb122b608c6391932ce5ef7b233ea9dc6bb
Author: Lightman <31...@users.noreply.github.com>
AuthorDate: Fri May 27 21:52:05 2022 +0800
[feature] Support compression prop (#8923)
---
be/src/olap/rowset/segment_v2/column_writer.cpp | 4 +--
be/src/olap/rowset/segment_v2/segment_writer.cpp | 10 ++++---
be/src/olap/rowset/segment_v2/segment_writer.h | 2 +-
be/src/olap/tablet_meta.cpp | 31 ++++++++++++++++++++--
be/src/olap/tablet_meta.h | 3 ++-
be/src/olap/tablet_schema.cpp | 2 ++
be/src/olap/tablet_schema.h | 4 +++
.../java/org/apache/doris/alter/RollupJobV2.java | 3 ++-
.../org/apache/doris/alter/SchemaChangeJobV2.java | 3 ++-
.../java/org/apache/doris/backup/RestoreJob.java | 3 ++-
.../java/org/apache/doris/catalog/Catalog.java | 28 ++++++++++++++++---
.../java/org/apache/doris/catalog/OlapTable.java | 16 +++++++++++
.../org/apache/doris/catalog/TableProperty.java | 16 ++++++++++-
.../apache/doris/common/util/PropertyAnalyzer.java | 31 ++++++++++++++++++++++
.../org/apache/doris/master/ReportHandler.java | 3 ++-
.../org/apache/doris/task/CreateReplicaTask.java | 10 +++++--
.../org/apache/doris/catalog/CreateTableTest.java | 10 +++++++
.../java/org/apache/doris/task/AgentTaskTest.java | 3 ++-
gensrc/proto/olap_file.proto | 2 ++
gensrc/thrift/AgentService.thrift | 13 +++++++++
20 files changed, 176 insertions(+), 21 deletions(-)
diff --git a/be/src/olap/rowset/segment_v2/column_writer.cpp b/be/src/olap/rowset/segment_v2/column_writer.cpp
index 9a54b210c8..6c76ddff62 100644
--- a/be/src/olap/rowset/segment_v2/column_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/column_writer.cpp
@@ -121,7 +121,7 @@ Status ColumnWriter::create(const ColumnWriterOptions& opts, const TabletColumn*
length_options.meta->set_length(
get_scalar_type_info<OLAP_FIELD_TYPE_UNSIGNED_INT>()->size());
length_options.meta->set_encoding(DEFAULT_ENCODING);
- length_options.meta->set_compression(LZ4F);
+ length_options.meta->set_compression(opts.meta->compression());
length_options.need_zone_map = false;
length_options.need_bloom_filter = false;
@@ -149,7 +149,7 @@ Status ColumnWriter::create(const ColumnWriterOptions& opts, const TabletColumn*
null_options.meta->set_length(
get_scalar_type_info<OLAP_FIELD_TYPE_TINYINT>()->size());
null_options.meta->set_encoding(DEFAULT_ENCODING);
- null_options.meta->set_compression(LZ4F);
+ null_options.meta->set_compression(opts.meta->compression());
null_options.need_zone_map = false;
null_options.need_bloom_filter = false;
diff --git a/be/src/olap/rowset/segment_v2/segment_writer.cpp b/be/src/olap/rowset/segment_v2/segment_writer.cpp
index 4db2549683..f95b62c9c2 100644
--- a/be/src/olap/rowset/segment_v2/segment_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/segment_writer.cpp
@@ -63,17 +63,19 @@ SegmentWriter::~SegmentWriter() {
};
void SegmentWriter::init_column_meta(ColumnMetaPB* meta, uint32_t* column_id,
- const TabletColumn& column) {
+ const TabletColumn& column,
+ const TabletSchema* tablet_schema) {
// TODO(zc): Do we need this column_id??
meta->set_column_id((*column_id)++);
meta->set_unique_id(column.unique_id());
meta->set_type(column.type());
meta->set_length(column.length());
meta->set_encoding(DEFAULT_ENCODING);
- meta->set_compression(LZ4F);
+ meta->set_compression(tablet_schema->compression_type());
meta->set_is_nullable(column.is_nullable());
for (uint32_t i = 0; i < column.get_subtype_count(); ++i) {
- init_column_meta(meta->add_children_columns(), column_id, column.get_sub_column(i));
+ init_column_meta(meta->add_children_columns(), column_id, column.get_sub_column(i),
+ tablet_schema);
}
}
@@ -84,7 +86,7 @@ Status SegmentWriter::init(uint32_t write_mbytes_per_sec __attribute__((unused))
ColumnWriterOptions opts;
opts.meta = _footer.add_columns();
- init_column_meta(opts.meta, &column_id, column);
+ init_column_meta(opts.meta, &column_id, column, _tablet_schema);
// now we create zone map for key columns in AGG_KEYS or all column in UNIQUE_KEYS or DUP_KEYS
// and not support zone map for array type.
diff --git a/be/src/olap/rowset/segment_v2/segment_writer.h b/be/src/olap/rowset/segment_v2/segment_writer.h
index ab928b51e1..67fb9e02a2 100644
--- a/be/src/olap/rowset/segment_v2/segment_writer.h
+++ b/be/src/olap/rowset/segment_v2/segment_writer.h
@@ -80,7 +80,7 @@ public:
Status finalize(uint64_t* segment_file_size, uint64_t* index_size);
static void init_column_meta(ColumnMetaPB* meta, uint32_t* column_id,
- const TabletColumn& column);
+ const TabletColumn& column, const TabletSchema* tablet_schema);
private:
DISALLOW_COPY_AND_ASSIGN(SegmentWriter);
diff --git a/be/src/olap/tablet_meta.cpp b/be/src/olap/tablet_meta.cpp
index 0cd9202a72..e299d1a048 100644
--- a/be/src/olap/tablet_meta.cpp
+++ b/be/src/olap/tablet_meta.cpp
@@ -43,7 +43,7 @@ Status TabletMeta::create(const TCreateTabletReq& request, const TabletUid& tabl
request.tablet_schema.schema_hash, shard_id, request.tablet_schema, next_unique_id,
col_ordinal_to_unique_id, tablet_uid,
request.__isset.tablet_type ? request.tablet_type : TTabletType::TABLET_TYPE_DISK,
- request.storage_medium, request.storage_param.storage_name));
+ request.storage_medium, request.storage_param.storage_name, request.compression_type));
return Status::OK();
}
@@ -54,7 +54,8 @@ TabletMeta::TabletMeta(int64_t table_id, int64_t partition_id, int64_t tablet_id
uint32_t next_unique_id,
const std::unordered_map<uint32_t, uint32_t>& col_ordinal_to_unique_id,
TabletUid tablet_uid, TTabletType::type tabletType,
- TStorageMedium::type t_storage_medium, const std::string& storage_name)
+ TStorageMedium::type t_storage_medium, const std::string& storage_name,
+ TCompressionType::type compression_type)
: _tablet_uid(0, 0), _schema(new TabletSchema) {
TabletMetaPB tablet_meta_pb;
tablet_meta_pb.set_table_id(table_id);
@@ -90,8 +91,34 @@ TabletMeta::TabletMeta(int64_t table_id, int64_t partition_id, int64_t tablet_id
LOG(WARNING) << "unknown tablet keys type";
break;
}
+ // compress_kind used to compress segment files
schema->set_compress_kind(COMPRESS_LZ4);
+ // compression_type used to compress segment page
+ switch (compression_type) {
+ case TCompressionType::NO_COMPRESSION:
+ schema->set_compression_type(NO_COMPRESSION);
+ break;
+ case TCompressionType::SNAPPY:
+ schema->set_compression_type(SNAPPY);
+ break;
+ case TCompressionType::LZ4:
+ schema->set_compression_type(LZ4);
+ break;
+ case TCompressionType::LZ4F:
+ schema->set_compression_type(LZ4F);
+ break;
+ case TCompressionType::ZLIB:
+ schema->set_compression_type(ZLIB);
+ break;
+ case TCompressionType::ZSTD:
+ schema->set_compression_type(ZSTD);
+ break;
+ default:
+ schema->set_compression_type(LZ4F);
+ break;
+ }
+
switch (tablet_schema.sort_type) {
case TSortType::type::ZORDER:
schema->set_sort_type(SortType::ZORDER);
diff --git a/be/src/olap/tablet_meta.h b/be/src/olap/tablet_meta.h
index c0b165a7bb..c8709941e5 100644
--- a/be/src/olap/tablet_meta.h
+++ b/be/src/olap/tablet_meta.h
@@ -83,7 +83,8 @@ public:
uint64_t shard_id, const TTabletSchema& tablet_schema, uint32_t next_unique_id,
const std::unordered_map<uint32_t, uint32_t>& col_ordinal_to_unique_id,
TabletUid tablet_uid, TTabletType::type tabletType,
- TStorageMedium::type t_storage_medium, const std::string& remote_storage_name);
+ TStorageMedium::type t_storage_medium, const std::string& remote_storage_name,
+ TCompressionType::type compression_type);
// If need add a filed in TableMeta, filed init copy in copy construct function
TabletMeta(const TabletMeta& tablet_meta);
TabletMeta(TabletMeta&& tablet_meta) = delete;
diff --git a/be/src/olap/tablet_schema.cpp b/be/src/olap/tablet_schema.cpp
index 702fef36a5..6e9703f569 100644
--- a/be/src/olap/tablet_schema.cpp
+++ b/be/src/olap/tablet_schema.cpp
@@ -441,6 +441,7 @@ void TabletSchema::init_from_pb(const TabletSchemaPB& schema) {
_sequence_col_idx = schema.sequence_col_idx();
_sort_type = schema.sort_type();
_sort_col_num = schema.sort_col_num();
+ _compression_type = schema.compression_type();
}
void TabletSchema::to_schema_pb(TabletSchemaPB* tablet_meta_pb) {
@@ -461,6 +462,7 @@ void TabletSchema::to_schema_pb(TabletSchemaPB* tablet_meta_pb) {
tablet_meta_pb->set_sequence_col_idx(_sequence_col_idx);
tablet_meta_pb->set_sort_type(_sort_type);
tablet_meta_pb->set_sort_col_num(_sort_col_num);
+ tablet_meta_pb->set_compression_type(_compression_type);
}
uint32_t TabletSchema::mem_size() const {
diff --git a/be/src/olap/tablet_schema.h b/be/src/olap/tablet_schema.h
index 8554bebb3b..7c3209ee82 100644
--- a/be/src/olap/tablet_schema.h
+++ b/be/src/olap/tablet_schema.h
@@ -20,6 +20,7 @@
#include <vector>
#include "gen_cpp/olap_file.pb.h"
+#include "gen_cpp/segment_v2.pb.h"
#include "olap/olap_define.h"
#include "olap/types.h"
@@ -145,6 +146,8 @@ public:
void set_delete_sign_idx(int32_t delete_sign_idx) { _delete_sign_idx = delete_sign_idx; }
bool has_sequence_col() const { return _sequence_col_idx != -1; }
int32_t sequence_col_idx() const { return _sequence_col_idx; }
+ segment_v2::CompressionTypePB compression_type() const { return _compression_type; }
+
vectorized::Block create_block(
const std::vector<uint32_t>& return_columns,
const std::unordered_set<uint32_t>* tablet_columns_need_convert_null = nullptr) const;
@@ -168,6 +171,7 @@ private:
size_t _num_short_key_columns = 0;
size_t _num_rows_per_row_block = 0;
CompressKind _compress_kind = COMPRESS_NONE;
+ segment_v2::CompressionTypePB _compression_type = segment_v2::CompressionTypePB::LZ4F;
size_t _next_column_unique_id = 0;
bool _has_bf_fpp = false;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java
index 53027c2357..7f3a115ed4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java
@@ -231,7 +231,8 @@ public class RollupJobV2 extends AlterJobV2 implements GsonPostProcessable {
rollupSchema, tbl.getCopiedBfColumns(), tbl.getBfFpp(), countDownLatch,
tbl.getCopiedIndexes(),
tbl.isInMemory(),
- tabletType);
+ tabletType,
+ tbl.getCompressionType());
createReplicaTask.setBaseTablet(tabletIdMap.get(rollupTabletId), baseSchemaHash);
if (this.storageFormat != null) {
createReplicaTask.setStorageFormat(this.storageFormat);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
index 2d55523d75..a7faa58743 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
@@ -253,7 +253,8 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
originKeysType, TStorageType.COLUMN, storageMedium,
shadowSchema, bfColumns, bfFpp, countDownLatch, indexes,
tbl.isInMemory(),
- tbl.getPartitionInfo().getTabletType(partitionId));
+ tbl.getPartitionInfo().getTabletType(partitionId),
+ tbl.getCompressionType());
createReplicaTask.setBaseTablet(partitionIndexTabletMap.get(partitionId, shadowIdxId).get(shadowTabletId), originSchemaHash);
if (this.storageFormat != null) {
createReplicaTask.setStorageFormat(this.storageFormat);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
index 9445f74413..922b314a21 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
@@ -964,7 +964,8 @@ public class RestoreJob extends AbstractJob {
indexMeta.getSchema(), bfColumns, bfFpp, null,
localTbl.getCopiedIndexes(),
localTbl.isInMemory(),
- localTbl.getPartitionInfo().getTabletType(restorePart.getId()));
+ localTbl.getPartitionInfo().getTabletType(restorePart.getId()),
+ localTbl.getCompressionType());
task.setInRestoreMode(true);
batchTask.addTask(task);
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java
index 50c3c40e78..45de5422c7 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java
@@ -245,6 +245,7 @@ import org.apache.doris.task.CreateReplicaTask;
import org.apache.doris.task.DropReplicaTask;
import org.apache.doris.task.MasterTaskExecutor;
import org.apache.doris.thrift.BackendService;
+import org.apache.doris.thrift.TCompressionType;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TStorageFormat;
import org.apache.doris.thrift.TStorageMedium;
@@ -3026,6 +3027,7 @@ public class Catalog {
* 6.2. replicationNum
* 6.3. inMemory
* 6.4. storageFormat
+ * 6.5. compressionType
* 7. set index meta
* 8. check colocation properties
* 9. create tablet in BE
@@ -3314,6 +3316,7 @@ public class Catalog {
singlePartitionDesc.isInMemory(),
olapTable.getStorageFormat(),
singlePartitionDesc.getTabletType(),
+ olapTable.getCompressionType(),
olapTable.getDataSortInfo()
);
@@ -3545,6 +3548,7 @@ public class Catalog {
boolean isInMemory,
TStorageFormat storageFormat,
TTabletType tabletType,
+ TCompressionType compressionType,
DataSortInfo dataSortInfo) throws DdlException {
// create base index first.
Preconditions.checkArgument(baseIndexId != -1);
@@ -3612,7 +3616,8 @@ public class Catalog {
indexes,
isInMemory,
tabletType,
- dataSortInfo);
+ dataSortInfo,
+ compressionType);
task.setStorageFormat(storageFormat);
batchTask.addTask(task);
// add to AgentTaskQueue for handling finish report.
@@ -3730,6 +3735,15 @@ public class Catalog {
}
olapTable.setStorageFormat(storageFormat);
+ // get compression type; seed with LZ4F, the table-level default used by OlapTable, TableProperty and PropertyAnalyzer
+ TCompressionType compressionType = TCompressionType.LZ4F;
+ try {
+ compressionType = PropertyAnalyzer.analyzeCompressionType(properties);
+ } catch (AnalysisException e) {
+ throw new DdlException(e.getMessage());
+ }
+ olapTable.setCompressionType(compressionType);
+
// check data sort properties
DataSortInfo dataSortInfo = PropertyAnalyzer.analyzeDataSortInfo(properties, keysType,
keysDesc.keysColumnSize(), storageFormat);
@@ -3778,6 +3792,7 @@ public class Catalog {
throw new DdlException(e.getMessage());
}
+
if (partitionInfo.getType() == PartitionType.UNPARTITIONED) {
// if this is an unpartitioned table, we should analyze data property and replication num here.
// if this is a partitioned table, there properties are already analyzed in RangePartitionDesc analyze phase.
@@ -3914,7 +3929,7 @@ public class Catalog {
partitionInfo.getReplicaAllocation(partitionId),
versionInfo, bfColumns, bfFpp,
tabletIdSet, olapTable.getCopiedIndexes(),
- isInMemory, storageFormat, tabletType, olapTable.getDataSortInfo());
+ isInMemory, storageFormat, tabletType, compressionType, olapTable.getDataSortInfo());
olapTable.addPartition(partition);
} else if (partitionInfo.getType() == PartitionType.RANGE || partitionInfo.getType() == PartitionType.LIST) {
try {
@@ -3965,7 +3980,8 @@ public class Catalog {
versionInfo, bfColumns, bfFpp,
tabletIdSet, olapTable.getCopiedIndexes(),
isInMemory, storageFormat,
- partitionInfo.getTabletType(entry.getValue()), olapTable.getDataSortInfo());
+ partitionInfo.getTabletType(entry.getValue()),
+ compressionType, olapTable.getDataSortInfo());
olapTable.addPartition(partition);
}
} else {
@@ -4339,6 +4355,11 @@ public class Catalog {
sb.append(",\n\"").append(PropertyAnalyzer.PROPERTIES_REMOTE_STORAGE_RESOURCE).append("\" = \"");
sb.append(remoteStorageResource).append("\"");
}
+ // compression type
+ if (olapTable.getCompressionType() != TCompressionType.LZ4F) {
+ sb.append(",\n\"").append(PropertyAnalyzer.PROPERTIES_COMPRESSION).append("\" = \"");
+ sb.append(olapTable.getCompressionType()).append("\"");
+ }
sb.append("\n)");
} else if (table.getType() == TableType.MYSQL) {
@@ -6882,6 +6903,7 @@ public class Catalog {
copiedTbl.isInMemory(),
copiedTbl.getStorageFormat(),
copiedTbl.getPartitionInfo().getTabletType(oldPartitionId),
+ copiedTbl.getCompressionType(),
copiedTbl.getDataSortInfo());
newPartitions.add(newPartition);
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
index c7401e08ae..25eb7f65db 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
@@ -48,6 +48,7 @@ import org.apache.doris.qe.OriginStatement;
import org.apache.doris.resource.Tag;
import org.apache.doris.system.Backend;
import org.apache.doris.system.SystemInfoService;
+import org.apache.doris.thrift.TCompressionType;
import org.apache.doris.thrift.TOlapTable;
import org.apache.doris.thrift.TSortType;
import org.apache.doris.thrift.TStorageFormat;
@@ -1679,6 +1680,14 @@ public class OlapTable extends Table {
return !tempPartitions.isEmpty();
}
+ public void setCompressionType(TCompressionType compressionType) {
+ if (tableProperty == null) {
+ tableProperty = new TableProperty(new HashMap<>());
+ }
+ tableProperty.modifyTableProperties(PropertyAnalyzer.PROPERTIES_COMPRESSION, compressionType.name());
+ tableProperty.buildCompressionType();
+ }
+
public void setStorageFormat(TStorageFormat storageFormat) {
if (tableProperty == null) {
tableProperty = new TableProperty(new HashMap<>());
@@ -1694,6 +1703,13 @@ public class OlapTable extends Table {
return tableProperty.getStorageFormat();
}
+ public TCompressionType getCompressionType() {
+ if (tableProperty == null) {
+ return TCompressionType.LZ4F;
+ }
+ return tableProperty.getCompressionType();
+ }
+
public DataSortInfo getDataSortInfo() {
if (tableProperty == null) {
return new DataSortInfo(TSortType.LEXICAL, this.getKeysNum());
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableProperty.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableProperty.java
index f9d2063284..b9c3835c96 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableProperty.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableProperty.java
@@ -25,6 +25,7 @@ import org.apache.doris.common.io.Writable;
import org.apache.doris.common.util.PropertyAnalyzer;
import org.apache.doris.persist.OperationType;
import org.apache.doris.persist.gson.GsonUtils;
+import org.apache.doris.thrift.TCompressionType;
import org.apache.doris.thrift.TStorageFormat;
import com.google.common.base.Strings;
@@ -67,6 +68,8 @@ public class TableProperty implements Writable {
*/
private TStorageFormat storageFormat = TStorageFormat.DEFAULT;
+ private TCompressionType compressionType = TCompressionType.LZ4F;
+
private DataSortInfo dataSortInfo = new DataSortInfo();
// remote storage resource, for cold data
@@ -147,6 +150,12 @@ public class TableProperty implements Writable {
return this;
}
+ public TableProperty buildCompressionType() {
+ compressionType = TCompressionType.valueOf(properties.getOrDefault(PropertyAnalyzer.PROPERTIES_COMPRESSION,
+ TCompressionType.LZ4F.name()));
+ return this;
+ }
+
public TableProperty buildStorageFormat() {
storageFormat = TStorageFormat.valueOf(properties.getOrDefault(PropertyAnalyzer.PROPERTIES_STORAGE_FORMAT,
TStorageFormat.DEFAULT.name()));
@@ -227,6 +236,10 @@ public class TableProperty implements Writable {
return remoteStorageResource;
}
+ public TCompressionType getCompressionType() {
+ return compressionType;
+ }
+
public void buildReplicaAllocation() {
try {
// Must copy the properties because "analyzeReplicaAllocation" with remove the property
@@ -251,7 +264,8 @@ public class TableProperty implements Writable {
.buildInMemory()
.buildStorageFormat()
.buildDataSortInfo()
- .buildRemoteStorageResource();
+ .buildRemoteStorageResource()
+ .buildCompressionType();
if (Catalog.getCurrentCatalogJournalVersion() < FeMetaVersion.VERSION_105) {
// get replica num from property map and create replica allocation
String repNum = tableProperty.properties.remove(PropertyAnalyzer.PROPERTIES_REPLICATION_NUM);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java
index 20aac35971..5202122410 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java
@@ -32,6 +32,7 @@ import org.apache.doris.catalog.Type;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.resource.Tag;
+import org.apache.doris.thrift.TCompressionType;
import org.apache.doris.thrift.TSortType;
import org.apache.doris.thrift.TStorageFormat;
import org.apache.doris.thrift.TStorageMedium;
@@ -75,6 +76,7 @@ public class PropertyAnalyzer {
public static final String PROPERTIES_COLOCATE_WITH = "colocate_with";
public static final String PROPERTIES_TIMEOUT = "timeout";
+ public static final String PROPERTIES_COMPRESSION = "compression";
public static final String PROPERTIES_DISTRIBUTION_TYPE = "distribution_type";
public static final String PROPERTIES_SEND_CLEAR_ALTER_TASK = "send_clear_alter_tasks";
@@ -433,6 +435,35 @@ public class PropertyAnalyzer {
return timeout;
}
+ // analyzeCompressionType will parse the compression type from properties
+ public static TCompressionType analyzeCompressionType(Map<String, String> properties) throws AnalysisException {
+ String compressionType = "";
+ if (properties != null && properties.containsKey(PROPERTIES_COMPRESSION)) {
+ compressionType = properties.get(PROPERTIES_COMPRESSION);
+ properties.remove(PROPERTIES_COMPRESSION);
+ } else {
+ return TCompressionType.LZ4F;
+ }
+
+ if (compressionType.equalsIgnoreCase("no_compression")) {
+ return TCompressionType.NO_COMPRESSION;
+ } else if (compressionType.equalsIgnoreCase("lz4")) {
+ return TCompressionType.LZ4;
+ } else if (compressionType.equalsIgnoreCase("lz4f")) {
+ return TCompressionType.LZ4F;
+ } else if (compressionType.equalsIgnoreCase("zlib")) {
+ return TCompressionType.ZLIB;
+ } else if (compressionType.equalsIgnoreCase("zstd")) {
+ return TCompressionType.ZSTD;
+ } else if (compressionType.equalsIgnoreCase("snappy")) {
+ return TCompressionType.SNAPPY;
+ } else if (compressionType.equalsIgnoreCase("default_compression")) {
+ return TCompressionType.LZ4F;
+ } else {
+ throw new AnalysisException("unknown compression type: " + compressionType);
+ }
+ }
+
// analyzeStorageFormat will parse the storage format from properties
// sql: alter table tablet_name set ("storage_format" = "v2")
// Use this sql to convert all tablets(base and rollup index) to a new format segment
diff --git a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
index d130fbaf0a..cdca5251fa 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/master/ReportHandler.java
@@ -601,7 +601,8 @@ public class ReportHandler extends Daemon {
TStorageMedium.HDD, indexMeta.getSchema(), bfColumns, bfFpp, null,
olapTable.getCopiedIndexes(),
olapTable.isInMemory(),
- olapTable.getPartitionInfo().getTabletType(partitionId));
+ olapTable.getPartitionInfo().getTabletType(partitionId),
+ olapTable.getCompressionType());
createReplicaTask.setIsRecoverTask(true);
createReplicaBatchTask.addTask(createReplicaTask);
} else {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/CreateReplicaTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/CreateReplicaTask.java
index 29068f7542..8718dd7663 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/CreateReplicaTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/CreateReplicaTask.java
@@ -25,6 +25,7 @@ import org.apache.doris.catalog.KeysType;
import org.apache.doris.common.MarkedCountDownLatch;
import org.apache.doris.common.Status;
import org.apache.doris.thrift.TColumn;
+import org.apache.doris.thrift.TCompressionType;
import org.apache.doris.thrift.TCreateTabletReq;
import org.apache.doris.thrift.TOlapTableIndex;
import org.apache.doris.thrift.TStatusCode;
@@ -54,6 +55,7 @@ public class CreateReplicaTask extends AgentTask {
private KeysType keysType;
private TStorageType storageType;
private TStorageMedium storageMedium;
+ private TCompressionType compressionType;
private List<Column> columns;
@@ -93,7 +95,7 @@ public class CreateReplicaTask extends AgentTask {
Set<String> bfColumns, double bfFpp, MarkedCountDownLatch<Long, Long> latch,
List<Index> indexes,
boolean isInMemory,
- TTabletType tabletType) {
+ TTabletType tabletType, TCompressionType compressionType) {
super(null, backendId, TTaskType.CREATE, dbId, tableId, partitionId, indexId, tabletId);
this.shortKeyColumnCount = shortKeyColumnCount;
@@ -104,6 +106,7 @@ public class CreateReplicaTask extends AgentTask {
this.keysType = keysType;
this.storageType = storageType;
this.storageMedium = storageMedium;
+ this.compressionType = compressionType;
this.columns = columns;
@@ -125,7 +128,8 @@ public class CreateReplicaTask extends AgentTask {
List<Index> indexes,
boolean isInMemory,
TTabletType tabletType,
- DataSortInfo dataSortInfo) {
+ DataSortInfo dataSortInfo,
+ TCompressionType compressionType) {
super(null, backendId, TTaskType.CREATE, dbId, tableId, partitionId, indexId, tabletId);
this.shortKeyColumnCount = shortKeyColumnCount;
@@ -136,6 +140,7 @@ public class CreateReplicaTask extends AgentTask {
this.keysType = keysType;
this.storageType = storageType;
this.storageMedium = storageMedium;
+ this.compressionType = compressionType;
this.columns = columns;
@@ -267,6 +272,7 @@ public class CreateReplicaTask extends AgentTask {
}
createTabletReq.setTabletType(tabletType);
+ createTabletReq.setCompressionType(compressionType);
return createTabletReq;
}
}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/CreateTableTest.java b/fe/fe-core/src/test/java/org/apache/doris/catalog/CreateTableTest.java
index 93dbf9f7ea..513b3cef1c 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/catalog/CreateTableTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/CreateTableTest.java
@@ -138,6 +138,16 @@ public class CreateTableTest {
.expectThrowsNoException(() -> createTable("create table test.tb7(key1 int, key2 varchar(10)) \n"
+ "distributed by hash(key1) buckets 1 properties('replication_num' = '1', 'storage_medium' = 'ssd');"));
+ ExceptionChecker
+ .expectThrowsNoException(() -> createTable("create table test.compression1(key1 int, key2 varchar(10)) \n"
+ + "distributed by hash(key1) buckets 1 \n"
+ + "properties('replication_num' = '1', 'compression' = 'lz4f');"));
+
+ ExceptionChecker
+ .expectThrowsNoException(() -> createTable("create table test.compression2(key1 int, key2 varchar(10)) \n"
+ + "distributed by hash(key1) buckets 1 \n"
+ + "properties('replication_num' = '1', 'compression' = 'snappy');"));
+
ExceptionChecker
.expectThrowsNoException(() -> createTable("create table test.tbl8\n" + "(k1 varchar(40), k2 int, v1 int)\n"
+ "unique key(k1, k2)\n"
diff --git a/fe/fe-core/src/test/java/org/apache/doris/task/AgentTaskTest.java b/fe/fe-core/src/test/java/org/apache/doris/task/AgentTaskTest.java
index db47288484..31b84fa4af 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/task/AgentTaskTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/task/AgentTaskTest.java
@@ -28,6 +28,7 @@ import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.MarkedCountDownLatch;
import org.apache.doris.thrift.TAgentTaskRequest;
import org.apache.doris.thrift.TBackend;
+import org.apache.doris.thrift.TCompressionType;
import org.apache.doris.thrift.TPriority;
import org.apache.doris.thrift.TPushType;
import org.apache.doris.thrift.TStorageMedium;
@@ -110,7 +111,7 @@ public class AgentTaskTest {
version, KeysType.AGG_KEYS,
storageType, TStorageMedium.SSD,
columns, null, 0, latch, null,
- false, TTabletType.TABLET_TYPE_DISK);
+ false, TTabletType.TABLET_TYPE_DISK, TCompressionType.LZ4F);
// drop
dropTask = new DropReplicaTask(backendId1, tabletId1, schemaHash1);
diff --git a/gensrc/proto/olap_file.proto b/gensrc/proto/olap_file.proto
index b67d9c11cd..ac7c08580b 100644
--- a/gensrc/proto/olap_file.proto
+++ b/gensrc/proto/olap_file.proto
@@ -23,6 +23,7 @@ option java_package = "org.apache.doris.proto";
import "olap_common.proto";
import "types.proto";
+import "segment_v2.proto";
message ZoneMap {
required bytes min = 1;
@@ -195,6 +196,7 @@ message TabletSchemaPB {
optional int32 sequence_col_idx = 10 [default= -1];
optional SortType sort_type = 11;
optional int32 sort_col_num = 12;
+ optional segment_v2.CompressionTypePB compression_type = 13 [default = LZ4F]; // explicit default: tablet metas written before this field existed must not read back UNKNOWN_COMPRESSION
}
enum TabletStatePB {
diff --git a/gensrc/thrift/AgentService.thrift b/gensrc/thrift/AgentService.thrift
index 1eb01e8c5f..2233fae3fe 100644
--- a/gensrc/thrift/AgentService.thrift
+++ b/gensrc/thrift/AgentService.thrift
@@ -84,6 +84,18 @@ struct TStorageParam {
3: optional TS3StorageParam s3_storage_param
}
+enum TCompressionType {
+ UNKNOWN_COMPRESSION = 0,
+ DEFAULT_COMPRESSION = 1,
+ NO_COMPRESSION = 2,
+ SNAPPY = 3,
+ LZ4 = 4,
+ LZ4F = 5,
+ ZLIB = 6,
+ ZSTD = 7
+}
+
+
struct TCreateTabletReq {
1: required Types.TTabletId tablet_id
2: required TTabletSchema tablet_schema
@@ -105,6 +117,7 @@ struct TCreateTabletReq {
13: optional TStorageFormat storage_format
14: optional TTabletType tablet_type
15: optional TStorageParam storage_param
+ 16: optional TCompressionType compression_type = TCompressionType.LZ4F
}
struct TDropTabletReq {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org