You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2023/01/23 14:18:03 UTC
[doris] 05/05: (improvement)[bucket] Add auto bucket implement (#15250)
This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch branch-1.2-lts
in repository https://gitbox.apache.org/repos/asf/doris.git
commit d07811258500093691f020d0c92966cdc26e4e27
Author: Drogon <ja...@gmail.com>
AuthorDate: Wed Jan 18 19:50:18 2023 +0800
(improvement)[bucket] Add auto bucket implement (#15250)
---
fe/fe-core/src/main/cup/sql_parser.cup | 20 +-
.../org/apache/doris/analysis/CreateTableStmt.java | 34 ++-
.../apache/doris/analysis/DistributionDesc.java | 50 ++--
.../doris/analysis/HashDistributionDesc.java | 49 +---
.../doris/analysis/RandomDistributionDesc.java | 33 +--
.../org/apache/doris/catalog/DistributionInfo.java | 20 ++
.../main/java/org/apache/doris/catalog/Env.java | 6 +
.../apache/doris/catalog/HashDistributionInfo.java | 26 +-
.../java/org/apache/doris/catalog/OlapTable.java | 47 +++-
.../doris/catalog/RandomDistributionInfo.java | 20 +-
.../org/apache/doris/catalog/TableProperty.java | 8 +
.../doris/clone/DynamicPartitionScheduler.java | 86 +++++-
.../java/org/apache/doris/common/FeConstants.java | 4 +
.../apache/doris/common/util/AutoBucketUtils.java | 98 +++++++
.../apache/doris/common/util/PropertyAnalyzer.java | 25 +-
.../apache/doris/datasource/InternalCatalog.java | 36 ++-
fe/fe-core/src/main/jflex/sql_scanner.flex | 1 +
.../doris/common/util/AutoBucketUtilsTest.java | 294 +++++++++++++++++++++
.../org/apache/doris/utframe/UtFrameUtils.java | 83 +++++-
.../suites/autobucket/test_autobucket.groovy | 41 +++
20 files changed, 835 insertions(+), 146 deletions(-)
diff --git a/fe/fe-core/src/main/cup/sql_parser.cup b/fe/fe-core/src/main/cup/sql_parser.cup
index 83d105c734..12b4643552 100644
--- a/fe/fe-core/src/main/cup/sql_parser.cup
+++ b/fe/fe-core/src/main/cup/sql_parser.cup
@@ -45,6 +45,7 @@ import org.apache.doris.catalog.StructField;
import org.apache.doris.catalog.StructType;
import org.apache.doris.catalog.View;
import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.FeConstants;
import org.apache.doris.common.Version;
import org.apache.doris.mysql.MysqlPassword;
import org.apache.doris.load.loadv2.LoadTask;
@@ -602,7 +603,8 @@ terminal String
KW_WORK,
KW_WRITE,
KW_YEAR,
- KW_MTMV;
+ KW_MTMV,
+ KW_AUTO;
terminal COMMA, COLON, DOT, DOTDOTDOT, AT, STAR, LPAREN, RPAREN, SEMICOLON, LBRACKET, RBRACKET, DIVIDE, MOD, ADD, SUBTRACT;
terminal BITAND, BITOR, BITXOR, BITNOT;
@@ -2884,12 +2886,16 @@ opt_distribution ::=
/* Hash distributed */
| KW_DISTRIBUTED KW_BY KW_HASH LPAREN ident_list:columns RPAREN opt_distribution_number:numDistribution
{:
- RESULT = new HashDistributionDesc(numDistribution, columns);
+ int bucketNum = (numDistribution == null ? -1 : numDistribution);
+ boolean is_auto_bucket = (numDistribution == null);
+ RESULT = new HashDistributionDesc(bucketNum, is_auto_bucket, columns);
:}
/* Random distributed */
| KW_DISTRIBUTED KW_BY KW_RANDOM opt_distribution_number:numDistribution
{:
- RESULT = new RandomDistributionDesc(numDistribution);
+ int bucketNum = (numDistribution == null ? -1 : numDistribution);
+ boolean is_auto_bucket = (numDistribution == null);
+ RESULT = new RandomDistributionDesc(bucketNum, is_auto_bucket);
:}
;
@@ -2907,13 +2913,17 @@ opt_rollup ::=
opt_distribution_number ::=
/* Empty */
{:
- /* If distribution number is null, default distribution number is 10. */
- RESULT = 10;
+ /* If distribution number is null, default distribution number is FeConstants.default_bucket_num. */
+ RESULT = FeConstants.default_bucket_num;
:}
| KW_BUCKETS INTEGER_LITERAL:numDistribution
{:
RESULT = numDistribution.intValue();
:}
+ | KW_BUCKETS KW_AUTO
+ {:
+ RESULT = null;
+ :}
;
opt_keys ::=
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateTableStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateTableStmt.java
index 826aee4663..867cb6aa38 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateTableStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateTableStmt.java
@@ -31,6 +31,8 @@ import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.FeNameFormat;
import org.apache.doris.common.UserException;
+import org.apache.doris.common.util.AutoBucketUtils;
+import org.apache.doris.common.util.ParseUtil;
import org.apache.doris.common.util.PrintableMap;
import org.apache.doris.common.util.PropertyAnalyzer;
import org.apache.doris.common.util.Util;
@@ -94,6 +96,32 @@ public class CreateTableStmt extends DdlStmt {
engineNames.add("jdbc");
}
+ // if auto bucket is enabled, rewrite the distribution bucket num &&
+ // set properties[PropertyAnalyzer.PROPERTIES_AUTO_BUCKET] = "true"
+ private static Map<String, String> maybeRewriteByAutoBucket(DistributionDesc distributionDesc,
+ Map<String, String> properties) throws AnalysisException {
+ if (distributionDesc == null || !distributionDesc.isAutoBucket()) {
+ return properties;
+ }
+
+ // auto bucket is enabled
+ Map<String, String> newProperties = properties;
+ if (newProperties == null) {
+ newProperties = new HashMap<String, String>();
+ }
+ newProperties.put(PropertyAnalyzer.PROPERTIES_AUTO_BUCKET, "true");
+
+ if (!newProperties.containsKey(PropertyAnalyzer.PROPERTIES_ESTIMATE_PARTITION_SIZE)) {
+ distributionDesc.setBuckets(FeConstants.default_bucket_num);
+ } else {
+ long partitionSize = ParseUtil
+ .analyzeDataVolumn(newProperties.get(PropertyAnalyzer.PROPERTIES_ESTIMATE_PARTITION_SIZE));
+ distributionDesc.setBuckets(AutoBucketUtils.getBucketsNum(partitionSize));
+ }
+
+ return newProperties;
+ }
+
public CreateTableStmt() {
// for persist
tableName = new TableName();
@@ -260,7 +288,11 @@ public class CreateTableStmt extends DdlStmt {
}
@Override
- public void analyze(Analyzer analyzer) throws UserException {
+ public void analyze(Analyzer analyzer) throws UserException, AnalysisException {
+ if (Strings.isNullOrEmpty(engineName) || engineName.equalsIgnoreCase("olap")) {
+ this.properties = maybeRewriteByAutoBucket(distributionDesc, properties);
+ }
+
super.analyze(analyzer);
tableName.analyze(analyzer);
FeNameFormat.checkTableName(tableName.getTbl());
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/DistributionDesc.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/DistributionDesc.java
index 3e755c750c..02005a3985 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/DistributionDesc.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/DistributionDesc.java
@@ -22,57 +22,47 @@ import org.apache.doris.catalog.DistributionInfo;
import org.apache.doris.catalog.DistributionInfo.DistributionInfoType;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.DdlException;
-import org.apache.doris.common.io.Text;
-import org.apache.doris.common.io.Writable;
import org.apache.commons.lang.NotImplementedException;
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
import java.util.List;
import java.util.Set;
-public class DistributionDesc implements Writable {
+public class DistributionDesc {
protected DistributionInfoType type;
+ protected int numBucket;
+ protected boolean autoBucket;
- public DistributionDesc() {
+ public DistributionDesc(int numBucket) {
+ this(numBucket, false);
+ }
+ public DistributionDesc(int numBucket, boolean autoBucket) {
+ this.numBucket = numBucket;
+ this.autoBucket = autoBucket;
}
- public void analyze(Set<String> colSet, List<ColumnDef> columnDefs, KeysDesc keysDesc) throws AnalysisException {
- throw new NotImplementedException();
+ public int getBuckets() {
+ return numBucket;
}
- public String toSql() {
- throw new NotImplementedException();
+ public int setBuckets(int numBucket) {
+ return this.numBucket = numBucket;
}
- public DistributionInfo toDistributionInfo(List<Column> columns) throws DdlException {
- throw new NotImplementedException();
+ public boolean isAutoBucket() {
+ return autoBucket;
}
- public static DistributionDesc read(DataInput in) throws IOException {
- DistributionInfoType type = DistributionInfoType.valueOf(Text.readString(in));
- if (type == DistributionInfoType.HASH) {
- DistributionDesc desc = new HashDistributionDesc();
- desc.readFields(in);
- return desc;
- } else if (type == DistributionInfoType.RANDOM) {
- DistributionDesc desc = new RandomDistributionDesc();
- desc.readFields(in);
- return desc;
- } else {
- throw new IOException("Unknown distribution type: " + type);
- }
+ public void analyze(Set<String> colSet, List<ColumnDef> columnDefs, KeysDesc keysDesc) throws AnalysisException {
+ throw new NotImplementedException();
}
- @Override
- public void write(DataOutput out) throws IOException {
- Text.writeString(out, type.name());
+ public String toSql() {
+ throw new NotImplementedException();
}
- public void readFields(DataInput in) throws IOException {
+ public DistributionInfo toDistributionInfo(List<Column> columns) throws DdlException {
throw new NotImplementedException();
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/HashDistributionDesc.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/HashDistributionDesc.java
index 6d14fb3844..8b312a6ba7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/HashDistributionDesc.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/HashDistributionDesc.java
@@ -25,29 +25,25 @@ import org.apache.doris.catalog.KeysType;
import org.apache.doris.catalog.PrimitiveType;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.DdlException;
-import org.apache.doris.common.io.Text;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
import java.util.List;
import java.util.Set;
public class HashDistributionDesc extends DistributionDesc {
- private int numBucket;
private List<String> distributionColumnNames;
- public HashDistributionDesc() {
+ public HashDistributionDesc(int numBucket, List<String> distributionColumnNames) {
+ super(numBucket);
type = DistributionInfoType.HASH;
- distributionColumnNames = Lists.newArrayList();
+ this.distributionColumnNames = distributionColumnNames;
}
- public HashDistributionDesc(int numBucket, List<String> distributionColumnNames) {
+ public HashDistributionDesc(int numBucket, boolean autoBucket, List<String> distributionColumnNames) {
+ super(numBucket, autoBucket);
type = DistributionInfoType.HASH;
- this.numBucket = numBucket;
this.distributionColumnNames = distributionColumnNames;
}
@@ -55,14 +51,10 @@ public class HashDistributionDesc extends DistributionDesc {
return distributionColumnNames;
}
- public int getBuckets() {
- return numBucket;
- }
-
@Override
public void analyze(Set<String> colSet, List<ColumnDef> columnDefs, KeysDesc keysDesc) throws AnalysisException {
if (numBucket <= 0) {
- throw new AnalysisException("Number of hash distribution should be larger than zero.");
+ throw new AnalysisException("Number of hash distribution should be greater than zero.");
}
if (distributionColumnNames == null || distributionColumnNames.size() == 0) {
throw new AnalysisException("Number of hash column should be larger than zero.");
@@ -100,7 +92,11 @@ public class HashDistributionDesc extends DistributionDesc {
i++;
}
stringBuilder.append(")\n");
- stringBuilder.append("BUCKETS ").append(numBucket);
+ if (autoBucket) {
+ stringBuilder.append("BUCKETS AUTO");
+ } else {
+ stringBuilder.append("BUCKETS ").append(numBucket);
+ }
return stringBuilder.toString();
}
@@ -139,27 +135,8 @@ public class HashDistributionDesc extends DistributionDesc {
}
}
- HashDistributionInfo hashDistributionInfo = new HashDistributionInfo(numBucket, distributionColumns);
+ HashDistributionInfo hashDistributionInfo =
+ new HashDistributionInfo(numBucket, autoBucket, distributionColumns);
return hashDistributionInfo;
}
-
- @Override
- public void write(DataOutput out) throws IOException {
- super.write(out);
-
- out.writeInt(numBucket);
- int count = distributionColumnNames.size();
- out.writeInt(count);
- for (String colName : distributionColumnNames) {
- Text.writeString(out, colName);
- }
- }
-
- public void readFields(DataInput in) throws IOException {
- numBucket = in.readInt();
- int count = in.readInt();
- for (int i = 0; i < count; i++) {
- distributionColumnNames.add(Text.readString(in));
- }
- }
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/RandomDistributionDesc.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/RandomDistributionDesc.java
index e445aa5bdf..3b89dfaff7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/RandomDistributionDesc.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/RandomDistributionDesc.java
@@ -23,28 +23,24 @@ import org.apache.doris.catalog.DistributionInfo.DistributionInfoType;
import org.apache.doris.catalog.RandomDistributionInfo;
import org.apache.doris.common.AnalysisException;
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
import java.util.List;
import java.util.Set;
public class RandomDistributionDesc extends DistributionDesc {
- int numBucket;
-
- public RandomDistributionDesc() {
+ public RandomDistributionDesc(int numBucket) {
+ super(numBucket);
type = DistributionInfoType.RANDOM;
}
- public RandomDistributionDesc(int numBucket) {
+ public RandomDistributionDesc(int numBucket, boolean autoBucket) {
+ super(numBucket, autoBucket);
type = DistributionInfoType.RANDOM;
- this.numBucket = numBucket;
}
@Override
public void analyze(Set<String> colSet, List<ColumnDef> columnDefs, KeysDesc keysDesc) throws AnalysisException {
if (numBucket <= 0) {
- throw new AnalysisException("Number of random distribution should be larger than zero.");
+ throw new AnalysisException("Number of random distribution should be greater than zero.");
}
}
@@ -52,23 +48,18 @@ public class RandomDistributionDesc extends DistributionDesc {
public String toSql() {
StringBuilder stringBuilder = new StringBuilder();
stringBuilder.append("DISTRIBUTED BY RANDOM\n")
- .append("BUCKETS ").append(numBucket);
+ .append("BUCKETS ");
+ if (autoBucket) {
+ stringBuilder.append("AUTO");
+ } else {
+ stringBuilder.append(numBucket);
+ }
return stringBuilder.toString();
}
@Override
public DistributionInfo toDistributionInfo(List<Column> columns) {
- RandomDistributionInfo randomDistributionInfo = new RandomDistributionInfo(numBucket);
+ RandomDistributionInfo randomDistributionInfo = new RandomDistributionInfo(numBucket, autoBucket);
return randomDistributionInfo;
}
-
- @Override
- public void write(DataOutput out) throws IOException {
- super.write(out);
- out.writeInt(numBucket);
- }
-
- public void readFields(DataInput in) throws IOException {
- numBucket = in.readInt();
- }
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/DistributionInfo.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/DistributionInfo.java
index e7f66c42f1..227900905e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/DistributionInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/DistributionInfo.java
@@ -40,12 +40,28 @@ public abstract class DistributionInfo implements Writable {
@SerializedName(value = "type")
protected DistributionInfoType type;
+ @SerializedName(value = "bucketNum")
+ protected int bucketNum;
+
+ @SerializedName(value = "autoBucket")
+ protected boolean autoBucket;
+
public DistributionInfo() {
// for persist
}
public DistributionInfo(DistributionInfoType type) {
+ this(type, 0, false);
+ }
+
+ public DistributionInfo(DistributionInfoType type, int bucketNum) {
+ this(type, bucketNum, false);
+ }
+
+ public DistributionInfo(DistributionInfoType type, int bucketNum, boolean autoBucket) {
this.type = type;
+ this.bucketNum = bucketNum;
+ this.autoBucket = autoBucket;
}
public DistributionInfoType getType() {
@@ -62,6 +78,10 @@ public abstract class DistributionInfo implements Writable {
throw new NotImplementedException("not implemented");
}
+ public void markAutoBucket() {
+ autoBucket = true;
+ }
+
public DistributionDesc toDistributionDesc() {
throw new NotImplementedException();
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
index 2cfd39f70a..42368f40da 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
@@ -2981,6 +2981,12 @@ public class Env {
sb.append(olapTable.getCompressionType()).append("\"");
}
+ // estimate_partition_size
+ if (!olapTable.getEstimatePartitionSize().equals("")) {
+ sb.append(",\n\"").append(PropertyAnalyzer.PROPERTIES_ESTIMATE_PARTITION_SIZE).append("\" = \"");
+ sb.append(olapTable.getEstimatePartitionSize()).append("\"");
+ }
+
// unique key table with merge on write
if (olapTable.getEnableUniqueKeyMergeOnWrite()) {
sb.append(",\n\"").append(PropertyAnalyzer.ENABLE_UNIQUE_KEY_MERGE_ON_WRITE).append("\" = \"");
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/HashDistributionInfo.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/HashDistributionInfo.java
index a651b8c0ab..d746bd355f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/HashDistributionInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/HashDistributionInfo.java
@@ -37,8 +37,6 @@ import java.util.Objects;
public class HashDistributionInfo extends DistributionInfo {
@SerializedName(value = "distributionColumns")
private List<Column> distributionColumns;
- @SerializedName(value = "bucketNum")
- private int bucketNum;
public HashDistributionInfo() {
super();
@@ -46,9 +44,13 @@ public class HashDistributionInfo extends DistributionInfo {
}
public HashDistributionInfo(int bucketNum, List<Column> distributionColumns) {
- super(DistributionInfoType.HASH);
+ super(DistributionInfoType.HASH, bucketNum);
+ this.distributionColumns = distributionColumns;
+ }
+
+ public HashDistributionInfo(int bucketNum, boolean autoBucket, List<Column> distributionColumns) {
+ super(DistributionInfoType.HASH, bucketNum, autoBucket);
this.distributionColumns = distributionColumns;
- this.bucketNum = bucketNum;
}
public List<Column> getDistributionColumns() {
@@ -65,6 +67,7 @@ public class HashDistributionInfo extends DistributionInfo {
this.bucketNum = bucketNum;
}
+ @Override
public void write(DataOutput out) throws IOException {
super.write(out);
int columnCount = distributionColumns.size();
@@ -75,6 +78,7 @@ public class HashDistributionInfo extends DistributionInfo {
out.writeInt(bucketNum);
}
+ @Override
public void readFields(DataInput in) throws IOException {
super.readFields(in);
int columnCount = in.readInt();
@@ -117,7 +121,7 @@ public class HashDistributionInfo extends DistributionInfo {
for (Column col : distributionColumns) {
distriColNames.add(col.getName());
}
- DistributionDesc distributionDesc = new HashDistributionDesc(bucketNum, distriColNames);
+ DistributionDesc distributionDesc = new HashDistributionDesc(bucketNum, autoBucket, distriColNames);
return distributionDesc;
}
@@ -133,7 +137,11 @@ public class HashDistributionInfo extends DistributionInfo {
String colList = Joiner.on(", ").join(colNames);
builder.append(colList);
- builder.append(") BUCKETS ").append(bucketNum);
+ if (autoBucket) {
+ builder.append(") BUCKETS AUTO");
+ } else {
+ builder.append(") BUCKETS ").append(bucketNum);
+ }
return builder.toString();
}
@@ -148,7 +156,11 @@ public class HashDistributionInfo extends DistributionInfo {
}
builder.append("]; ");
- builder.append("bucket num: ").append(bucketNum).append("; ");
+ if (autoBucket) {
+ builder.append("bucket num: auto;");
+ } else {
+ builder.append("bucket num: ").append(bucketNum).append(";");
+ }
return builder.toString();
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
index 87ba47bb27..c83acb1af4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
@@ -591,8 +591,8 @@ public class OlapTable extends Table {
if (full) {
return indexIdToMeta.get(indexId).getSchema();
} else {
- return indexIdToMeta.get(indexId).getSchema().stream().filter(column ->
- column.isVisible()).collect(Collectors.toList());
+ return indexIdToMeta.get(indexId).getSchema().stream().filter(column -> column.isVisible())
+ .collect(Collectors.toList());
}
}
@@ -1124,7 +1124,6 @@ public class OlapTable extends Table {
return false;
}
-
@Override
public void write(DataOutput out) throws IOException {
super.write(out);
@@ -1274,6 +1273,9 @@ public class OlapTable extends Table {
if (in.readBoolean()) {
tableProperty = TableProperty.read(in);
}
+ if (isAutoBucket()) {
+ defaultDistributionInfo.markAutoBucket();
+ }
// temp partitions
tempPartitions = TempPartitions.read(in);
@@ -1617,6 +1619,36 @@ public class OlapTable extends Table {
tableProperty.buildInMemory();
}
+ public Boolean isAutoBucket() {
+ if (tableProperty != null) {
+ return tableProperty.isAutoBucket();
+ }
+ return false;
+ }
+
+ public void setIsAutoBucket(boolean isAutoBucket) {
+ if (tableProperty == null) {
+ tableProperty = new TableProperty(new HashMap<>());
+ }
+ tableProperty.modifyTableProperties(PropertyAnalyzer.PROPERTIES_AUTO_BUCKET,
+ Boolean.valueOf(isAutoBucket).toString());
+ }
+
+ public void setEstimatePartitionSize(String estimatePartitionSize) {
+ if (tableProperty == null) {
+ tableProperty = new TableProperty(new HashMap<>());
+ }
+ tableProperty.modifyTableProperties(PropertyAnalyzer.PROPERTIES_ESTIMATE_PARTITION_SIZE,
+ estimatePartitionSize);
+ }
+
+ public String getEstimatePartitionSize() {
+ if (tableProperty != null) {
+ return tableProperty.getEstimatePartitionSize();
+ }
+ return "";
+ }
+
public boolean getEnableLightSchemaChange() {
if (tableProperty != null) {
return tableProperty.getUseSchemaLightChange();
@@ -1870,11 +1902,11 @@ public class OlapTable extends Table {
return false;
}
List<Expr> partitionExps = aggregateInfo.getPartitionExprs() != null
- ? aggregateInfo.getPartitionExprs() : groupingExps;
+ ? aggregateInfo.getPartitionExprs()
+ : groupingExps;
DistributionInfo distribution = getDefaultDistributionInfo();
if (distribution instanceof HashDistributionInfo) {
- List<Column> distributeColumns =
- ((HashDistributionInfo) distribution).getDistributionColumns();
+ List<Column> distributeColumns = ((HashDistributionInfo) distribution).getDistributionColumns();
PartitionInfo partitionInfo = getPartitionInfo();
if (partitionInfo instanceof RangePartitionInfo) {
List<Column> rangeColumns = partitionInfo.getPartitionColumns();
@@ -1882,8 +1914,7 @@ public class OlapTable extends Table {
return false;
}
}
- List<SlotRef> partitionSlots =
- partitionExps.stream().map(Expr::unwrapSlotRef).collect(Collectors.toList());
+ List<SlotRef> partitionSlots = partitionExps.stream().map(Expr::unwrapSlotRef).collect(Collectors.toList());
if (partitionSlots.contains(null)) {
return false;
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/RandomDistributionInfo.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/RandomDistributionInfo.java
index 2e11b5cfd4..31371e7799 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/RandomDistributionInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/RandomDistributionInfo.java
@@ -29,21 +29,21 @@ import java.util.Objects;
* Random partition.
*/
public class RandomDistributionInfo extends DistributionInfo {
-
- private int bucketNum;
-
public RandomDistributionInfo() {
super();
}
public RandomDistributionInfo(int bucketNum) {
- super(DistributionInfoType.RANDOM);
- this.bucketNum = bucketNum;
+ super(DistributionInfoType.RANDOM, bucketNum);
+ }
+
+ public RandomDistributionInfo(int bucketNum, boolean autoBucket) {
+ super(DistributionInfoType.RANDOM, bucketNum, autoBucket);
}
@Override
public DistributionDesc toDistributionDesc() {
- DistributionDesc distributionDesc = new RandomDistributionDesc(bucketNum);
+ DistributionDesc distributionDesc = new RandomDistributionDesc(bucketNum, autoBucket);
return distributionDesc;
}
@@ -55,15 +55,21 @@ public class RandomDistributionInfo extends DistributionInfo {
@Override
public String toSql() {
StringBuilder builder = new StringBuilder();
- builder.append("DISTRIBUTED BY RANDOM BUCKETS ").append(bucketNum);
+ if (autoBucket) {
+ builder.append("DISTRIBUTED BY RANDOM() BUCKETS AUTO");
+ } else {
+ builder.append("DISTRIBUTED BY RANDOM() BUCKETS ").append(bucketNum);
+ }
return builder.toString();
}
+ @Override
public void write(DataOutput out) throws IOException {
super.write(out);
out.writeInt(bucketNum);
}
+ @Override
public void readFields(DataInput in) throws IOException {
super.readFields(in);
bucketNum = in.readInt();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableProperty.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableProperty.java
index 860d404cd0..04db48067e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableProperty.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableProperty.java
@@ -244,6 +244,14 @@ public class TableProperty implements Writable {
return isInMemory;
}
+ public boolean isAutoBucket() {
+ return Boolean.parseBoolean(properties.getOrDefault(PropertyAnalyzer.PROPERTIES_AUTO_BUCKET, "false"));
+ }
+
+ public String getEstimatePartitionSize() {
+ return properties.getOrDefault(PropertyAnalyzer.PROPERTIES_ESTIMATE_PARTITION_SIZE, "");
+ }
+
public TStorageFormat getStorageFormat() {
// Force convert all V1 table to V2 table
if (TStorageFormat.V1 == storageFormat) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/clone/DynamicPartitionScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/clone/DynamicPartitionScheduler.java
index 754126caaa..0640a82cca 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/clone/DynamicPartitionScheduler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/clone/DynamicPartitionScheduler.java
@@ -32,6 +32,7 @@ import org.apache.doris.catalog.DynamicPartitionProperty;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.HashDistributionInfo;
import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.PartitionItem;
import org.apache.doris.catalog.PartitionKey;
import org.apache.doris.catalog.RangePartitionInfo;
@@ -42,6 +43,7 @@ import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.Pair;
+import org.apache.doris.common.util.AutoBucketUtils;
import org.apache.doris.common.util.DynamicPartitionUtil;
import org.apache.doris.common.util.MasterDaemon;
import org.apache.doris.common.util.PropertyAnalyzer;
@@ -140,6 +142,75 @@ public class DynamicPartitionScheduler extends MasterDaemon {
return defaultRuntimeInfo;
}
+ // exponential moving average
+ private static long ema(ArrayList<Long> history, int period) {
+ double alpha = 2.0 / (period + 1);
+ double ema = history.get(0);
+ for (int i = 1; i < history.size(); i++) {
+ ema = alpha * history.get(i) + (1 - alpha) * ema;
+ }
+ return (long) ema;
+ }
+
+ private static long getNextPartitionSize(ArrayList<Long> historyPartitionsSize) {
+ if (historyPartitionsSize.size() < 2) {
+ return historyPartitionsSize.get(0);
+ }
+
+ int size = historyPartitionsSize.size() > 7 ? 7 : historyPartitionsSize.size();
+
+ boolean isAscending = true;
+ for (int i = 1; i < size; i++) {
+ if (historyPartitionsSize.get(i) < historyPartitionsSize.get(i - 1)) {
+ isAscending = false;
+ break;
+ }
+ }
+
+ if (isAscending) {
+ ArrayList<Long> historyDeltaSize = Lists.newArrayList();
+ for (int i = 1; i < size; i++) {
+ historyDeltaSize.add(historyPartitionsSize.get(i) - historyPartitionsSize.get(i - 1));
+ }
+ return historyPartitionsSize.get(size - 1) + ema(historyDeltaSize, 7);
+ } else {
+ return ema(historyPartitionsSize, 7);
+ }
+ }
+
+ private static int getBucketsNum(DynamicPartitionProperty property, OlapTable table) {
+ if (!table.isAutoBucket()) {
+ return property.getBuckets();
+ }
+
+ List<Partition> partitions = Lists.newArrayList();
+ RangePartitionInfo info = (RangePartitionInfo) (table.getPartitionInfo());
+ List<Map.Entry<Long, PartitionItem>> idToItems = new ArrayList<>(info.getIdToItem(false).entrySet());
+ idToItems.sort(Comparator.comparing(o -> ((RangePartitionItem) o.getValue()).getItems().upperEndpoint()));
+ for (Map.Entry<Long, PartitionItem> idToItem : idToItems) {
+ Partition partition = table.getPartition(idToItem.getKey());
+ if (partition != null) {
+ partitions.add(partition);
+ }
+ }
+
+ // auto bucket
+ if (partitions.size() == 0) {
+ return property.getBuckets();
+ }
+
+ ArrayList<Long> partitionSizeArray = Lists.newArrayList();
+ for (Partition partition : partitions) {
+ if (partition.getVisibleVersion() >= 2) {
+ partitionSizeArray.add(partition.getDataSize());
+ }
+ }
+
+ // multiply by 5 to estimate the uncompressed data size
+ long uncompressedPartitionSize = getNextPartitionSize(partitionSizeArray) * 5;
+ return AutoBucketUtils.getBucketsNum(uncompressedPartitionSize);
+ }
+
private ArrayList<AddPartitionClause> getAddPartitionClause(Database db, OlapTable olapTable,
Column partitionColumn, String partitionFormat) {
ArrayList<AddPartitionClause> addPartitionClauses = new ArrayList<>();
@@ -231,21 +302,22 @@ public class DynamicPartitionScheduler extends MasterDaemon {
String partitionName = dynamicPartitionProperty.getPrefix()
+ DynamicPartitionUtil.getFormattedPartitionName(dynamicPartitionProperty.getTimeZone(),
- prevBorder, dynamicPartitionProperty.getTimeUnit());
+ prevBorder, dynamicPartitionProperty.getTimeUnit());
SinglePartitionDesc rangePartitionDesc = new SinglePartitionDesc(true, partitionName,
partitionKeyDesc, partitionProperties);
DistributionDesc distributionDesc = null;
DistributionInfo distributionInfo = olapTable.getDefaultDistributionInfo();
+ int bucketsNum = getBucketsNum(dynamicPartitionProperty, olapTable);
if (distributionInfo.getType() == DistributionInfo.DistributionInfoType.HASH) {
HashDistributionInfo hashDistributionInfo = (HashDistributionInfo) distributionInfo;
List<String> distColumnNames = new ArrayList<>();
for (Column distributionColumn : hashDistributionInfo.getDistributionColumns()) {
distColumnNames.add(distributionColumn.getName());
}
- distributionDesc = new HashDistributionDesc(dynamicPartitionProperty.getBuckets(), distColumnNames);
+ distributionDesc = new HashDistributionDesc(bucketsNum, distColumnNames);
} else {
- distributionDesc = new RandomDistributionDesc(dynamicPartitionProperty.getBuckets());
+ distributionDesc = new RandomDistributionDesc(bucketsNum);
}
// add partition according to partition desc and distribution desc
addPartitionClauses.add(new AddPartitionClause(rangePartitionDesc, distributionDesc, null, false));
@@ -265,8 +337,8 @@ public class DynamicPartitionScheduler extends MasterDaemon {
}
private void setStoragePolicyProperty(HashMap<String, String> partitionProperties,
- DynamicPartitionProperty property, ZonedDateTime now, int offset,
- String storagePolicyName) {
+ DynamicPartitionProperty property, ZonedDateTime now, int offset,
+ String storagePolicyName) {
partitionProperties.put(PropertyAnalyzer.PROPERTIES_STORAGE_POLICY, storagePolicyName);
String baseTime = DynamicPartitionUtil.getPartitionRangeString(
property, now, offset, DynamicPartitionUtil.DATETIME_FORMAT);
@@ -341,8 +413,8 @@ public class DynamicPartitionScheduler extends MasterDaemon {
dynamicPartitionProperty, range.lowerEndpoint().toString(), partitionFormat);
String upperBorderOfReservedHistory = DynamicPartitionUtil.getHistoryPartitionRangeString(
dynamicPartitionProperty, range.upperEndpoint().toString(), partitionFormat);
- Range<PartitionKey> reservedHistoryPartitionKeyRange
- = getClosedRange(db, olapTable, partitionColumn, partitionFormat,
+ Range<PartitionKey> reservedHistoryPartitionKeyRange = getClosedRange(db, olapTable,
+ partitionColumn, partitionFormat,
lowerBorderOfReservedHistory, upperBorderOfReservedHistory);
reservedHistoryPartitionKeyRangeList.add(reservedHistoryPartitionKeyRange);
} catch (IllegalArgumentException e) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java b/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java
index dfde23e0a0..1b05795220 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java
@@ -22,6 +22,10 @@ import org.apache.doris.persist.meta.FeMetaFormat;
public class FeConstants {
// Database and table's default configurations, we will never change them
public static short default_replication_num = 3;
+
+ // Default bucket number; also used by auto bucket when estimate_partition_size is not set
+ public static int default_bucket_num = 10;
+
/*
* Those two fields is responsible for determining the default key columns in duplicate table.
* If user does not specify key of duplicate table in create table stmt,
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/AutoBucketUtils.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/AutoBucketUtils.java
new file mode 100644
index 0000000000..49ec8c6bf7
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/AutoBucketUtils.java
@@ -0,0 +1,98 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.common.util;
+
+import org.apache.doris.catalog.DiskInfo;
+import org.apache.doris.catalog.DiskInfo.DiskState;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.system.Backend;
+import org.apache.doris.system.SystemInfoService;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+public class AutoBucketUtils {
+ private static Logger logger = LogManager.getLogger(AutoBucketUtils.class);
+
+ static final long SIZE_100MB = 100 * 1024 * 1024L;
+ static final long SIZE_1GB = 1 * 1024 * 1024 * 1024L;
+ static final long SIZE_1TB = 1024 * SIZE_1GB;
+
+ private static int getBENum() {
+ SystemInfoService infoService = Env.getCurrentSystemInfo();
+ ImmutableMap<Long, Backend> backends = infoService.getBackendsInCluster(null);
+
+ int activeBENum = 0;
+ for (Backend backend : backends.values()) {
+ if (backend.isAlive()) {
+ ++activeBENum;
+ }
+ }
+ return activeBENum;
+ }
+
+ private static int getBucketsNumByBEDisks() {
+ SystemInfoService infoService = Env.getCurrentSystemInfo();
+ ImmutableMap<Long, Backend> backends = infoService.getBackendsInCluster(null);
+
+ int buckets = 0;
+ for (Backend backend : backends.values()) {
+ if (!backend.isLoadAvailable()) {
+ continue;
+ }
+
+ ImmutableMap<String, DiskInfo> disks = backend.getDisks();
+ for (DiskInfo diskInfo : disks.values()) {
+ if (diskInfo.getState() == DiskState.ONLINE && diskInfo.hasPathHash()) {
+ buckets += (diskInfo.getAvailableCapacityB() - 1) / (50 * SIZE_1GB) + 1;
+ }
+ }
+ }
+ return buckets;
+ }
+
+ private static int convertParitionSizeToBucketsNum(long partitionSize) {
+ partitionSize /= 5; // for compression 5:1
+
+ // <= 100MB, 1 bucket
+ // <= 1GB, 2 buckets
+ // > 1GB, round up to ceil(size / 1GB) buckets
+ if (partitionSize <= SIZE_100MB) {
+ return 1;
+ } else if (partitionSize <= SIZE_1GB) {
+ return 2;
+ } else {
+ return (int) ((partitionSize - 1) / SIZE_1GB + 1);
+ }
+ }
+
+ public static int getBucketsNum(long partitionSize) {
+ int bucketsNumByPartitionSize = convertParitionSizeToBucketsNum(partitionSize);
+ int bucketsNumByBE = getBucketsNumByBEDisks();
+ int bucketsNum = Math.min(128, Math.min(bucketsNumByPartitionSize, bucketsNumByBE));
+ int beNum = getBENum();
+ logger.debug("AutoBucketsUtil: bucketsNumByPartitionSize {}, bucketsNumByBE {}, bucketsNum {}, beNum {}",
+ bucketsNumByPartitionSize, bucketsNumByBE, bucketsNum, beNum);
+ if (bucketsNum < bucketsNumByPartitionSize && bucketsNum < beNum) {
+ bucketsNum = beNum;
+ }
+ logger.debug("AutoBucketsUtil: final bucketsNum {}", bucketsNum);
+ return bucketsNum;
+ }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java
index ff59546543..c4fdf9b9d1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/PropertyAnalyzer.java
@@ -92,6 +92,10 @@ public class PropertyAnalyzer {
public static final String PROPERTIES_INMEMORY = "in_memory";
+ // _auto_bucket is only set by the create table stmt's bucket rewrite and cannot be changed
+ public static final String PROPERTIES_AUTO_BUCKET = "_auto_bucket";
+ public static final String PROPERTIES_ESTIMATE_PARTITION_SIZE = "estimate_partition_size";
+
public static final String PROPERTIES_TABLET_TYPE = "tablet_type";
public static final String PROPERTIES_STRICT_RANGE = "strict_range";
@@ -131,7 +135,7 @@ public class PropertyAnalyzer {
/**
* check and replace members of DataProperty by properties.
*
- * @param properties key->value for members to change.
+ * @param properties key->value for members to change.
* @param oldDataProperty old DataProperty
* @return new DataProperty
* @throws AnalysisException property has invalid key->value
@@ -246,7 +250,8 @@ public class PropertyAnalyzer {
throws AnalysisException {
Short replicationNum = oldReplicationNum;
String propKey = Strings.isNullOrEmpty(prefix)
- ? PROPERTIES_REPLICATION_NUM : prefix + "." + PROPERTIES_REPLICATION_NUM;
+ ? PROPERTIES_REPLICATION_NUM
+ : prefix + "." + PROPERTIES_REPLICATION_NUM;
if (properties != null && properties.containsKey(propKey)) {
try {
replicationNum = Short.valueOf(properties.get(propKey));
@@ -348,7 +353,7 @@ public class PropertyAnalyzer {
}
public static Set<String> analyzeBloomFilterColumns(Map<String, String> properties, List<Column> columns,
- KeysType keysType) throws AnalysisException {
+ KeysType keysType) throws AnalysisException {
Set<String> bfColumns = null;
if (properties != null && properties.containsKey(PROPERTIES_BF_COLUMNS)) {
bfColumns = Sets.newHashSet();
@@ -484,7 +489,7 @@ public class PropertyAnalyzer {
}
// analyzeCompressionType will parse the compression type from properties
- public static TCompressionType analyzeCompressionType(Map<String, String> properties) throws AnalysisException {
+ public static TCompressionType analyzeCompressionType(Map<String, String> properties) throws AnalysisException {
String compressionType = "";
if (properties != null && properties.containsKey(PROPERTIES_COMPRESSION)) {
compressionType = properties.get(PROPERTIES_COMPRESSION);
@@ -546,6 +551,15 @@ public class PropertyAnalyzer {
return defaultVal;
}
+ public static String analyzeEstimatePartitionSize(Map<String, String> properties) {
+ String estimatePartitionSize = "";
+ if (properties != null && properties.containsKey(PROPERTIES_ESTIMATE_PARTITION_SIZE)) {
+ estimatePartitionSize = properties.get(PROPERTIES_ESTIMATE_PARTITION_SIZE);
+ properties.remove(PROPERTIES_ESTIMATE_PARTITION_SIZE);
+ }
+ return estimatePartitionSize;
+ }
+
public static String analyzeStoragePolicy(Map<String, String> properties) throws AnalysisException {
String storagePolicy = "";
if (properties != null && properties.containsKey(PROPERTIES_STORAGE_POLICY)) {
@@ -761,7 +775,6 @@ public class PropertyAnalyzer {
throw new AnalysisException(PropertyAnalyzer.ENABLE_UNIQUE_KEY_MERGE_ON_WRITE + " must be `true` or `false`");
}
-
/**
* Check the type property of the catalog props.
*/
@@ -777,5 +790,3 @@ public class PropertyAnalyzer {
}
}
}
-
-
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
index 415d306f31..0466ac4af0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
@@ -241,7 +241,6 @@ public class InternalCatalog implements CatalogIf<Database> {
return INTERNAL_CATALOG_NAME;
}
-
@Override
public List<String> getDbNames() {
return Lists.newArrayList(fullNameToDb.keySet());
@@ -734,12 +733,12 @@ public class InternalCatalog implements CatalogIf<Database> {
if (Strings.isNullOrEmpty(newPartitionName)) {
if (olapTable.getPartition(partitionName) != null) {
throw new DdlException("partition[" + partitionName + "] "
- + "already exist in table[" + tableName + "]");
+ + "already exist in table[" + tableName + "]");
}
} else {
if (olapTable.getPartition(newPartitionName) != null) {
throw new DdlException("partition[" + newPartitionName + "] "
- + "already exist in table[" + tableName + "]");
+ + "already exist in table[" + tableName + "]");
}
}
@@ -932,7 +931,7 @@ public class InternalCatalog implements CatalogIf<Database> {
}
public boolean unprotectDropTable(Database db, Table table, boolean isForceDrop, boolean isReplay,
- long recycleTime) {
+ long recycleTime) {
if (table.getType() == TableType.ELASTICSEARCH) {
esRepository.deRegisterTable(table.getId());
} else if (table.getType() == TableType.OLAP) {
@@ -964,7 +963,7 @@ public class InternalCatalog implements CatalogIf<Database> {
}
public void replayDropTable(Database db, long tableId, boolean isForceDrop,
- Long recycleTime) throws MetaNotFoundException {
+ Long recycleTime) throws MetaNotFoundException {
Table table = db.getTableOrMetaException(tableId);
db.writeLock();
table.writeLock();
@@ -1002,10 +1001,10 @@ public class InternalCatalog implements CatalogIf<Database> {
schemaHash = olapTable.getSchemaHashByIndexId(info.getIndexId());
}
- Replica replica =
- new Replica(info.getReplicaId(), info.getBackendId(), info.getVersion(), schemaHash, info.getDataSize(),
- info.getRemoteDataSize(), info.getRowCount(), ReplicaState.NORMAL, info.getLastFailedVersion(),
- info.getLastSuccessVersion());
+ Replica replica = new Replica(info.getReplicaId(), info.getBackendId(), info.getVersion(), schemaHash,
+ info.getDataSize(),
+ info.getRemoteDataSize(), info.getRowCount(), ReplicaState.NORMAL, info.getLastFailedVersion(),
+ info.getLastSuccessVersion());
tablet.addReplica(replica);
}
@@ -1368,8 +1367,8 @@ public class InternalCatalog implements CatalogIf<Database> {
if (distributionInfo.getType() == DistributionInfoType.HASH) {
HashDistributionInfo hashDistributionInfo = (HashDistributionInfo) distributionInfo;
List<Column> newDistriCols = hashDistributionInfo.getDistributionColumns();
- List<Column> defaultDistriCols
- = ((HashDistributionInfo) defaultDistributionInfo).getDistributionColumns();
+ List<Column> defaultDistriCols = ((HashDistributionInfo) defaultDistributionInfo)
+ .getDistributionColumns();
if (!newDistriCols.equals(defaultDistriCols)) {
throw new DdlException(
"Cannot assign hash distribution with different distribution cols. " + "default is: "
@@ -1629,7 +1628,7 @@ public class InternalCatalog implements CatalogIf<Database> {
olapTable.dropTempPartition(info.getPartitionName(), true);
} else {
Partition partition = olapTable.dropPartition(info.getDbId(), info.getPartitionName(),
- info.isForceDrop());
+ info.isForceDrop());
if (!info.isForceDrop() && partition != null && info.getRecycleTime() != 0) {
Env.getCurrentRecycleBin().setRecycleTimeByIdForReplay(partition.getId(), info.getRecycleTime());
}
@@ -1660,7 +1659,7 @@ public class InternalCatalog implements CatalogIf<Database> {
DistributionInfo distributionInfo, TStorageMedium storageMedium, ReplicaAllocation replicaAlloc,
Long versionInfo, Set<String> bfColumns, double bfFpp, Set<Long> tabletIdSet, List<Index> indexes,
boolean isInMemory, TStorageFormat storageFormat, TTabletType tabletType, TCompressionType compressionType,
- DataSortInfo dataSortInfo, boolean enableUniqueKeyMergeOnWrite, String storagePolicy,
+ DataSortInfo dataSortInfo, boolean enableUniqueKeyMergeOnWrite, String storagePolicy,
IdGeneratorBuffer idGeneratorBuffer, boolean disableAutoCompaction) throws DdlException {
// create base index first.
Preconditions.checkArgument(baseIndexId != -1);
@@ -1920,6 +1919,17 @@ public class InternalCatalog implements CatalogIf<Database> {
olapTable.setReplicationAllocation(replicaAlloc);
+ // set auto bucket
+ boolean isAutoBucket = PropertyAnalyzer.analyzeBooleanProp(properties, PropertyAnalyzer.PROPERTIES_AUTO_BUCKET,
+ false);
+ olapTable.setIsAutoBucket(isAutoBucket);
+
+ // set estimate partition size
+ if (isAutoBucket) {
+ String estimatePartitionSize = PropertyAnalyzer.analyzeEstimatePartitionSize(properties);
+ olapTable.setEstimatePartitionSize(estimatePartitionSize);
+ }
+
// set in memory
boolean isInMemory = PropertyAnalyzer.analyzeBooleanProp(properties, PropertyAnalyzer.PROPERTIES_INMEMORY,
false);
diff --git a/fe/fe-core/src/main/jflex/sql_scanner.flex b/fe/fe-core/src/main/jflex/sql_scanner.flex
index 1f50df0105..7cbc2c896b 100644
--- a/fe/fe-core/src/main/jflex/sql_scanner.flex
+++ b/fe/fe-core/src/main/jflex/sql_scanner.flex
@@ -465,6 +465,7 @@ import org.apache.doris.qe.SqlModeHelper;
keywordMap.put("write", new Integer(SqlParserSymbols.KW_WRITE));
keywordMap.put("year", new Integer(SqlParserSymbols.KW_YEAR));
keywordMap.put("mtmv", new Integer(SqlParserSymbols.KW_MTMV));
+ keywordMap.put("auto", new Integer(SqlParserSymbols.KW_AUTO));
}
// map from token id to token description
diff --git a/fe/fe-core/src/test/java/org/apache/doris/common/util/AutoBucketUtilsTest.java b/fe/fe-core/src/test/java/org/apache/doris/common/util/AutoBucketUtilsTest.java
new file mode 100644
index 0000000000..fa61bd48a2
--- /dev/null
+++ b/fe/fe-core/src/test/java/org/apache/doris/common/util/AutoBucketUtilsTest.java
@@ -0,0 +1,294 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.common.util;
+
+import org.apache.doris.catalog.Env;
+import org.apache.doris.common.Config;
+import org.apache.doris.common.FeConstants;
+import org.apache.doris.persist.EditLog;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.ResultSetMetaData;
+import org.apache.doris.qe.ShowResultSet;
+import org.apache.doris.system.Backend;
+import org.apache.doris.system.SystemInfoService;
+import org.apache.doris.thrift.TDisk;
+import org.apache.doris.thrift.TStorageMedium;
+import org.apache.doris.utframe.UtFrameUtils;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import mockit.Expectations;
+import mockit.Mocked;
+import org.hamcrest.MatcherAssert;
+import org.hamcrest.core.StringContains;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.UUID;
+
+public class AutoBucketUtilsTest {
+ private static String databaseName = "AutoBucketUtilsTest";
+ // use a unique dir so that it won't conflict with other unit tests which
+ // may also start a Mocked Frontend
+ private static String runningDirBase = "fe";
+ private static String runningDir = runningDirBase + "/mocked/AutoBucketUtilsTest/" + UUID.randomUUID().toString()
+ + "/";
+ private static List<Backend> backends = Lists.newArrayList();
+ private static Random random = new Random(System.currentTimeMillis());
+ private ConnectContext connectContext;
+
+ // create backends by be num, disk num and disk capacity
+ private static void createClusterWithBackends(int beNum, int diskNum, long diskCapacity) throws Exception {
+ UtFrameUtils.createDorisClusterWithMultiTag(runningDir, beNum);
+ // must set disk info, or the tablet scheduler won't work
+ backends = Env.getCurrentSystemInfo().getClusterBackends(SystemInfoService.DEFAULT_CLUSTER);
+ for (Backend be : backends) {
+ setDiskInfos(diskNum, diskCapacity, be);
+ }
+ }
+
+ private static ImmutableMap<Long, Backend> createBackends(int beNum, int diskNum, long diskCapacity)
+ throws Exception {
+ // must set disk info, or the tablet scheduler won't work
+ Map<Long, Backend> backends = Maps.newHashMap();
+ for (int i = 0; i < beNum; ++i) {
+ Backend be = new Backend(10000 + i, "127.0.0." + (i + 1), 9000 + i);
+ be.setAlive(true);
+ backends.put(be.getId(), be);
+ }
+ for (Backend be : backends.values()) {
+ setDiskInfos(diskNum, diskCapacity, be);
+ }
+ return ImmutableMap.copyOf(backends);
+ }
+
+ private static void setDiskInfos(int diskNum, long diskCapacity, Backend be) {
+ Map<String, TDisk> backendDisks = Maps.newHashMap();
+ for (int i = 0; i < diskNum; ++i) {
+ TDisk disk = new TDisk();
+ disk.setRootPath("/home/doris/" + UUID.randomUUID().toString());
+ disk.setDiskTotalCapacity(diskCapacity);
+ disk.setDataUsedCapacity(0);
+ disk.setUsed(true);
+ disk.setDiskAvailableCapacity(disk.disk_total_capacity - disk.data_used_capacity);
+ disk.setPathHash(random.nextLong());
+ disk.setStorageMedium(TStorageMedium.HDD);
+ backendDisks.put(disk.getRootPath(), disk);
+ }
+ be.updateDisks(backendDisks);
+ }
+
+ private void expectations(Env env, EditLog editLog, SystemInfoService systemInfoService,
+ ImmutableMap<Long, Backend> backends) {
+ new Expectations() {
+ {
+ Env.getCurrentSystemInfo();
+ minTimes = 0;
+ result = systemInfoService;
+
+ systemInfoService.getBackendsInCluster(null);
+ minTimes = 0;
+ result = backends;
+
+ Env.getCurrentEnv();
+ minTimes = 0;
+ result = env;
+
+ env.getEditLog();
+ minTimes = 0;
+ result = editLog;
+
+ editLog.logBackendStateChange((Backend) any);
+ minTimes = 0;
+ }
+ };
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ FeConstants.runningUnitTest = true;
+ FeConstants.tablet_checker_interval_ms = 1000;
+ FeConstants.default_scheduler_interval_millisecond = 100;
+ Config.tablet_repair_delay_factor_second = 1;
+ connectContext = UtFrameUtils.createDefaultCtx();
+ }
+
+ @After
+ public void tearDown() {
+ Env.getCurrentEnv().clear();
+ UtFrameUtils.cleanDorisFeDir(runningDirBase);
+ }
+
+ private static String genTableNameWithoutDatabase(String estimatePartitionSize) {
+ return "size_" + estimatePartitionSize;
+ }
+
+ private static String genTableName(String estimatePartitionSize) {
+ return databaseName + "." + genTableNameWithoutDatabase(estimatePartitionSize);
+ }
+
+ private static String genTableNameByTag(String estimatePartitionSize, String tag) {
+ return databaseName + "." + genTableNameWithoutDatabase(estimatePartitionSize) + "_" + tag;
+ }
+
+ private static String genCreateTableSql(String estimatePartitionSize) {
+ return "CREATE TABLE IF NOT EXISTS " + genTableName(estimatePartitionSize) + "\n"
+ + "(\n"
+ + "`user_id` LARGEINT NOT NULL\n"
+ + ")\n"
+ + "DISTRIBUTED BY HASH(`user_id`) BUCKETS AUTO\n"
+ + "PROPERTIES (\n"
+ + "\"estimate_partition_size\" = \"" + estimatePartitionSize + "\",\n"
+ + "\"replication_num\" = \"1\"\n"
+ + ")";
+ }
+
+ private void createTable(String sql) throws Exception {
+ // create database first
+ UtFrameUtils.createDatabase(connectContext, databaseName);
+ UtFrameUtils.createTable(connectContext, sql);
+ }
+
+ private void createTableBySize(String estimatePartitionSize) throws Exception {
+ createTable(genCreateTableSql(estimatePartitionSize));
+ }
+
+ private int getPartitionBucketNum(String tableName) throws Exception {
+ ShowResultSet result = UtFrameUtils.showPartitionsByName(connectContext, tableName);
+ ResultSetMetaData metaData = result.getMetaData();
+
+ for (int i = 0; i < metaData.getColumnCount(); ++i) {
+ if (metaData.getColumn(i).getName().equalsIgnoreCase("buckets")) {
+ return Integer.valueOf(result.getResultRows().get(0).get(i));
+ }
+ }
+
+ throw new Exception("No buckets column in show partitions result");
+ }
+
+ // this case also checks create table && show partitions
+ @Test
+ public void testWithoutEstimatePartitionSize() throws Exception {
+ String tableName = genTableName("");
+ String sql = "CREATE TABLE IF NOT EXISTS " + tableName + "\n"
+ + "(\n"
+ + "`user_id` LARGEINT NOT NULL\n"
+ + ")\n"
+ + "DISTRIBUTED BY HASH(`user_id`) BUCKETS AUTO\n"
+ + "PROPERTIES (\n"
+ + "\"replication_num\" = \"1\"\n"
+ + ")";
+
+ createClusterWithBackends(1, 1, 2000000000);
+
+ createTable(sql);
+ ShowResultSet showCreateTableResult = UtFrameUtils.showCreateTableByName(connectContext, tableName);
+ String showCreateTableResultSql = showCreateTableResult.getResultRows().get(0).get(1);
+ MatcherAssert.assertThat(showCreateTableResultSql,
+ StringContains.containsString("DISTRIBUTED BY HASH(`user_id`) BUCKETS AUTO\n"));
+ int bucketNum = getPartitionBucketNum(tableName);
+ Assert.assertEquals(FeConstants.default_bucket_num, bucketNum);
+ }
+
+ @Test
+ public void test100MB(@Mocked Env env, @Mocked EditLog editLog, @Mocked SystemInfoService systemInfoService)
+ throws Exception {
+ long estimatePartitionSize = AutoBucketUtils.SIZE_100MB;
+ ImmutableMap<Long, Backend> backends = createBackends(10, 3, 2000000000);
+ expectations(env, editLog, systemInfoService, backends);
+ Assert.assertEquals(1, AutoBucketUtils.getBucketsNum(estimatePartitionSize));
+ }
+
+ @Test
+ public void test500MB(@Mocked Env env, @Mocked EditLog editLog, @Mocked SystemInfoService systemInfoService)
+ throws Exception {
+ long estimatePartitionSize = 5 * AutoBucketUtils.SIZE_100MB;
+ ImmutableMap<Long, Backend> backends = createBackends(10, 3, 2000000000);
+ expectations(env, editLog, systemInfoService, backends);
+ Assert.assertEquals(1, AutoBucketUtils.getBucketsNum(estimatePartitionSize));
+ }
+
+ @Test
+ public void test1G(@Mocked Env env, @Mocked EditLog editLog, @Mocked SystemInfoService systemInfoService)
+ throws Exception {
+ long estimatePartitionSize = AutoBucketUtils.SIZE_1GB;
+ ImmutableMap<Long, Backend> backends = createBackends(3, 2, 500 * AutoBucketUtils.SIZE_1GB);
+ expectations(env, editLog, systemInfoService, backends);
+ Assert.assertEquals(2, AutoBucketUtils.getBucketsNum(estimatePartitionSize));
+ }
+
+ @Test
+ public void test100G(@Mocked Env env, @Mocked EditLog editLog, @Mocked SystemInfoService systemInfoService)
+ throws Exception {
+ long estimatePartitionSize = 100 * AutoBucketUtils.SIZE_1GB;
+ ImmutableMap<Long, Backend> backends = createBackends(3, 2, 500 * AutoBucketUtils.SIZE_1GB);
+ expectations(env, editLog, systemInfoService, backends);
+ Assert.assertEquals(20, AutoBucketUtils.getBucketsNum(estimatePartitionSize));
+ }
+
+ @Test
+ public void test500G_0(@Mocked Env env, @Mocked EditLog editLog, @Mocked SystemInfoService systemInfoService)
+ throws Exception {
+ long estimatePartitionSize = 500 * AutoBucketUtils.SIZE_1GB;
+ ImmutableMap<Long, Backend> backends = createBackends(3, 1, AutoBucketUtils.SIZE_1TB);
+ expectations(env, editLog, systemInfoService, backends);
+ Assert.assertEquals(63, AutoBucketUtils.getBucketsNum(estimatePartitionSize));
+ }
+
+ @Test
+ public void test500G_1(@Mocked Env env, @Mocked EditLog editLog, @Mocked SystemInfoService systemInfoService)
+ throws Exception {
+ long estimatePartitionSize = 500 * AutoBucketUtils.SIZE_1GB;
+ ImmutableMap<Long, Backend> backends = createBackends(10, 3, 2 * AutoBucketUtils.SIZE_1TB);
+ expectations(env, editLog, systemInfoService, backends);
+ Assert.assertEquals(100, AutoBucketUtils.getBucketsNum(estimatePartitionSize));
+ }
+
+ @Test
+ public void test500G_2(@Mocked Env env, @Mocked EditLog editLog, @Mocked SystemInfoService systemInfoService)
+ throws Exception {
+ long estimatePartitionSize = 500 * AutoBucketUtils.SIZE_1GB;
+ ImmutableMap<Long, Backend> backends = createBackends(1, 1, 100 * AutoBucketUtils.SIZE_1TB);
+ expectations(env, editLog, systemInfoService, backends);
+ Assert.assertEquals(100, AutoBucketUtils.getBucketsNum(estimatePartitionSize));
+ }
+
+ @Test
+ public void test1T_0(@Mocked Env env, @Mocked EditLog editLog, @Mocked SystemInfoService systemInfoService)
+ throws Exception {
+ long estimatePartitionSize = AutoBucketUtils.SIZE_1TB;
+ ImmutableMap<Long, Backend> backends = createBackends(10, 3, 2 * AutoBucketUtils.SIZE_1TB);
+ expectations(env, editLog, systemInfoService, backends);
+ Assert.assertEquals(128, AutoBucketUtils.getBucketsNum(estimatePartitionSize));
+ }
+
+ @Test
+ public void test1T_1(@Mocked Env env, @Mocked EditLog editLog, @Mocked SystemInfoService systemInfoService)
+ throws Exception {
+ long estimatePartitionSize = AutoBucketUtils.SIZE_1TB;
+ ImmutableMap<Long, Backend> backends = createBackends(200, 7, 4 * AutoBucketUtils.SIZE_1TB);
+ expectations(env, editLog, systemInfoService, backends);
+ Assert.assertEquals(200, AutoBucketUtils.getBucketsNum(estimatePartitionSize));
+ }
+}
diff --git a/fe/fe-core/src/test/java/org/apache/doris/utframe/UtFrameUtils.java b/fe/fe-core/src/test/java/org/apache/doris/utframe/UtFrameUtils.java
index adc5a86a39..0161152904 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/utframe/UtFrameUtils.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/utframe/UtFrameUtils.java
@@ -18,14 +18,20 @@
package org.apache.doris.utframe;
import org.apache.doris.analysis.Analyzer;
+import org.apache.doris.analysis.CreateDbStmt;
+import org.apache.doris.analysis.CreateTableStmt;
import org.apache.doris.analysis.ExplainOptions;
import org.apache.doris.analysis.QueryStmt;
+import org.apache.doris.analysis.ShowCreateTableStmt;
+import org.apache.doris.analysis.ShowPartitionsStmt;
import org.apache.doris.analysis.SqlParser;
import org.apache.doris.analysis.SqlScanner;
import org.apache.doris.analysis.StatementBase;
import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.catalog.DiskInfo;
import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.Replica;
+import org.apache.doris.catalog.TabletMeta;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
@@ -35,6 +41,8 @@ import org.apache.doris.planner.Planner;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.OriginStatement;
import org.apache.doris.qe.QueryState;
+import org.apache.doris.qe.ShowExecutor;
+import org.apache.doris.qe.ShowResultSet;
import org.apache.doris.qe.StmtExecutor;
import org.apache.doris.system.Backend;
import org.apache.doris.system.SystemInfoService;
@@ -62,6 +70,7 @@ import java.net.ServerSocket;
import java.net.SocketException;
import java.nio.channels.SocketChannel;
import java.nio.file.Files;
+import java.util.ConcurrentModificationException;
import java.util.List;
import java.util.Map;
import java.util.UUID;
@@ -265,7 +274,7 @@ public class UtFrameUtils {
// start be
MockedBackend backend = MockedBackendFactory.createBackend(beHost, beHeartbeatPort, beThriftPort, beBrpcPort,
beHttpPort, new DefaultHeartbeatServiceImpl(beThriftPort, beHttpPort, beBrpcPort),
- new DefaultBeThriftServiceImpl(), new DefaultPBackendServiceImpl());
+ new DefaultBeThriftServiceImpl(), new DefaultPBackendServiceImpl());
backend.setFeAddress(new TNetworkAddress("127.0.0.1", feRpcPort));
backend.start();
@@ -305,7 +314,7 @@ public class UtFrameUtils {
datagramSocket.setReuseAddress(true);
break;
} catch (SocketException e) {
- System.out.println("The port " + port + " is invalid and try another port.");
+ System.out.println("The port " + port + " is invalid and try another port.");
}
} catch (IOException e) {
throw new IllegalStateException("Could not find a free TCP/IP port to start HTTP Server on");
@@ -355,8 +364,8 @@ public class UtFrameUtils {
}
public static String getStmtDigest(ConnectContext connectContext, String originStmt) throws Exception {
- SqlScanner input =
- new SqlScanner(new StringReader(originStmt), connectContext.getSessionVariable().getSqlMode());
+ SqlScanner input = new SqlScanner(new StringReader(originStmt),
+ connectContext.getSessionVariable().getSqlMode());
SqlParser parser = new SqlParser(input);
StatementBase statementBase = SqlParserUtils.getFirstStmt(parser);
Preconditions.checkState(statementBase instanceof QueryStmt);
@@ -370,4 +379,70 @@ public class UtFrameUtils {
String realVNodeName = idx + ":V" + nodeName;
return planResult.contains(realNodeName) || planResult.contains(realVNodeName);
}
+
+ public static void createDatabase(ConnectContext ctx, String db) throws Exception {
+ String createDbStmtStr = "CREATE DATABASE " + db;
+ CreateDbStmt createDbStmt = (CreateDbStmt) parseAndAnalyzeStmt(createDbStmtStr, ctx);
+ Env.getCurrentEnv().createDb(createDbStmt);
+ }
+
+ public static void createTable(ConnectContext ctx, String sql) throws Exception {
+ try {
+ createTables(ctx, sql);
+ } catch (ConcurrentModificationException e) {
+ e.printStackTrace();
+ throw e;
+ }
+ }
+
+ public static void createTables(ConnectContext ctx, String... sqls) throws Exception {
+ for (String sql : sqls) {
+ CreateTableStmt stmt = (CreateTableStmt) parseAndAnalyzeStmt(sql, ctx);
+ Env.getCurrentEnv().createTable(stmt);
+ }
+ updateReplicaPathHash();
+ }
+
+ public static ShowResultSet showCreateTable(ConnectContext ctx, String sql) throws Exception {
+ ShowCreateTableStmt stmt = (ShowCreateTableStmt) parseAndAnalyzeStmt(sql, ctx);
+ ShowExecutor executor = new ShowExecutor(ctx, stmt);
+ return executor.execute();
+ }
+
+ public static ShowResultSet showCreateTableByName(ConnectContext ctx, String table) throws Exception {
+ String sql = "show create table " + table;
+ return showCreateTable(ctx, sql);
+ }
+
+ public static ShowResultSet showPartitions(ConnectContext ctx, String sql) throws Exception {
+ ShowPartitionsStmt stmt = (ShowPartitionsStmt) parseAndAnalyzeStmt(sql, ctx);
+ ShowExecutor executor = new ShowExecutor(ctx, stmt);
+ return executor.execute();
+ }
+
+ public static ShowResultSet showPartitionsByName(ConnectContext ctx, String table) throws Exception {
+ String sql = "show partitions from " + table;
+ return showPartitions(ctx, sql);
+ }
+
+ private static void updateReplicaPathHash() {
+ com.google.common.collect.Table<Long, Long, Replica> replicaMetaTable = Env.getCurrentInvertedIndex()
+ .getReplicaMetaTable();
+ for (com.google.common.collect.Table.Cell<Long, Long, Replica> cell : replicaMetaTable.cellSet()) {
+ long beId = cell.getColumnKey();
+ Backend be = Env.getCurrentSystemInfo().getBackend(beId);
+ if (be == null) {
+ continue;
+ }
+ Replica replica = cell.getValue();
+ TabletMeta tabletMeta = Env.getCurrentInvertedIndex().getTabletMeta(cell.getRowKey());
+ ImmutableMap<String, DiskInfo> diskMap = be.getDisks();
+ for (DiskInfo diskInfo : diskMap.values()) {
+ if (diskInfo.getStorageMedium() == tabletMeta.getStorageMedium()) {
+ replica.setPathHash(diskInfo.getPathHash());
+ break;
+ }
+ }
+ }
+ }
}
diff --git a/regression-test/suites/autobucket/test_autobucket.groovy b/regression-test/suites/autobucket/test_autobucket.groovy
new file mode 100644
index 0000000000..29945e0f9a
--- /dev/null
+++ b/regression-test/suites/autobucket/test_autobucket.groovy
@@ -0,0 +1,41 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_autobucket") {
+ sql "drop table if exists autobucket_test"
+ result = sql """
+ CREATE TABLE `autobucket_test` (
+ `user_id` largeint(40) NOT NULL
+ ) ENGINE=OLAP
+ DUPLICATE KEY(`user_id`)
+ COMMENT 'OLAP'
+ DISTRIBUTED BY HASH(`user_id`) BUCKETS AUTO
+ PROPERTIES (
+ "replication_allocation" = "tag.location.default: 1"
+ )
+ """
+
+ result = sql "show create table autobucket_test"
+ assertTrue(result.toString().containsIgnoreCase("BUCKETS AUTO"))
+
+ result = sql "show partitions from autobucket_test"
+ logger.info("${result}")
+ // XXX: the buckets column is at position 8; may later be looked up via sql meta instead
+ assertEquals(Integer.valueOf(result.get(0).get(8)), 10)
+
+ sql "drop table if exists autobucket_test"
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org