You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by da...@apache.org on 2023/06/30 08:57:19 UTC
[doris] branch master updated: [feature](binlog) Add AddPartitionRecord && DROP_PARTITION (#21344)
This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new de39632f1b [feature](binlog) Add AddPartitionRecord && DROP_PARTITION (#21344)
de39632f1b is described below
commit de39632f1b7d4c53929a88daa7ed9f412a307dc1
Author: Jack Drogon <ja...@gmail.com>
AuthorDate: Fri Jun 30 16:57:11 2023 +0800
[feature](binlog) Add AddPartitionRecord && DROP_PARTITION (#21344)
Signed-off-by: Jack Drogon <ja...@gmail.com>
---
.../apache/doris/binlog/AddPartitionRecord.java | 23 ++++++++++++++++++++++
.../org/apache/doris/binlog/BinlogManager.java | 12 +++++++++++
.../apache/doris/persist/DropPartitionInfo.java | 13 ++++++++++++
.../java/org/apache/doris/persist/EditLog.java | 12 +++++++----
gensrc/thrift/FrontendService.thrift | 2 ++
5 files changed, 58 insertions(+), 4 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/AddPartitionRecord.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/AddPartitionRecord.java
index 5dc2f62749..9bc5ff7da0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/binlog/AddPartitionRecord.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/AddPartitionRecord.java
@@ -18,6 +18,7 @@
package org.apache.doris.binlog;
import org.apache.doris.catalog.DataProperty;
+import org.apache.doris.catalog.ListPartitionItem;
import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.PartitionItem;
import org.apache.doris.catalog.PartitionKey;
@@ -51,6 +52,8 @@ public class AddPartitionRecord {
private boolean isTempPartition = false;
@SerializedName(value = "isMutable")
private boolean isMutable = true;
+ @SerializedName(value = "sql")
+ private String sql;
public AddPartitionRecord(long commitSeq, PartitionPersistInfo partitionPersistInfo) {
this.commitSeq = commitSeq;
@@ -64,6 +67,26 @@ public class AddPartitionRecord {
this.isInMemory = partitionPersistInfo.isInMemory();
this.isTempPartition = partitionPersistInfo.isTempPartition();
this.isMutable = partitionPersistInfo.isMutable();
+
+ StringBuilder sb = new StringBuilder();
+ sb.append("ADD PARTITION ").append("`").append(partition.getName()).append("`").append(" VALUES ");
+ if (this.listPartitionItem.equals(ListPartitionItem.DUMMY_ITEM)) {
+ // range
+ sb.append("[");
+ sb.append(range.lowerEndpoint().toSql());
+ sb.append(", ");
+ sb.append(range.upperEndpoint().toSql());
+ sb.append(")");
+ } else {
+ // list
+ sb.append("IN (");
+ sb.append(((ListPartitionItem) listPartitionItem).toSql());
+ sb.append(")");
+ }
+ sb.append("(\"version_info\" = \"");
+ sb.append(partition.getVisibleVersion()).append("\"");
+ sb.append(");");
+ this.sql = sb.toString();
}
public long getCommitSeq() {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java
index 59ba596152..bccc5dfc48 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java
@@ -22,6 +22,7 @@ import org.apache.doris.catalog.Env;
import org.apache.doris.common.Config;
import org.apache.doris.common.Pair;
import org.apache.doris.persist.BinlogGcInfo;
+import org.apache.doris.persist.DropPartitionInfo;
import org.apache.doris.thrift.TBinlog;
import org.apache.doris.thrift.TBinlogType;
import org.apache.doris.thrift.TStatus;
@@ -128,6 +129,17 @@ public class BinlogManager {
addBinlog(dbId, tableIds, commitSeq, timestamp, type, data);
}
+ public void addDropPartitionRecord(DropPartitionInfo dropPartitionInfo, long commitSeq) {
+ long dbId = dropPartitionInfo.getDbId();
+ List<Long> tableIds = new ArrayList<Long>();
+ tableIds.add(dropPartitionInfo.getTableId());
+ long timestamp = -1;
+ TBinlogType type = TBinlogType.DROP_PARTITION;
+ String data = dropPartitionInfo.toJson();
+
+ addBinlog(dbId, tableIds, commitSeq, timestamp, type, data);
+ }
+
// get binlog by dbId, return first binlog.version > version
public Pair<TStatus, TBinlog> getBinlog(long dbId, long tableId, long commitSeq) {
TStatus status = new TStatus(TStatusCode.OK);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/DropPartitionInfo.java b/fe/fe-core/src/main/java/org/apache/doris/persist/DropPartitionInfo.java
index 5a4a07f0cd..f4fac91526 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/DropPartitionInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/DropPartitionInfo.java
@@ -40,6 +40,8 @@ public class DropPartitionInfo implements Writable {
private boolean forceDrop = false;
@SerializedName(value = "recycleTime")
private long recycleTime = 0;
+ @SerializedName(value = "sql")
+ private String sql;
private DropPartitionInfo() {
}
@@ -52,6 +54,17 @@ public class DropPartitionInfo implements Writable {
this.isTempPartition = isTempPartition;
this.forceDrop = forceDrop;
this.recycleTime = recycleTime;
+
+ StringBuilder sb = new StringBuilder();
+ sb.append("DROP PARTITION ");
+ if (isTempPartition) {
+ sb.append("TEMPORARY ");
+ }
+ sb.append("`").append(partitionName).append("`");
+ if (forceDrop) {
+ sb.append(" FORCE");
+ }
+ this.sql = sb.toString();
}
public Long getDbId() {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
index 7be7253f1d..b67ca9259e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
@@ -234,8 +234,8 @@ public class EditLog {
"Begin to unprotect add partition. db = " + info.getDbId() + " table = " + info.getTableId()
+ " partitionName = " + info.getPartition().getName());
AddPartitionRecord addPartitionRecord = new AddPartitionRecord(logId, info);
- Env.getCurrentEnv().getBinlogManager().addAddPartitionRecord(addPartitionRecord);
env.replayAddPartition(info);
+ env.getBinlogManager().addAddPartitionRecord(addPartitionRecord);
break;
}
case OperationType.OP_DROP_PARTITION: {
@@ -243,6 +243,7 @@ public class EditLog {
LOG.info("Begin to unprotect drop partition. db = " + info.getDbId() + " table = "
+ info.getTableId() + " partitionName = " + info.getPartitionName());
env.replayDropPartition(info);
+ env.getBinlogManager().addDropPartitionRecord(info, logId);
break;
}
case OperationType.OP_MODIFY_PARTITION: {
@@ -1204,14 +1205,17 @@ public class EditLog {
logEdit(OperationType.OP_ALTER_EXTERNAL_TABLE_SCHEMA, info);
}
- public void logAddPartition(PartitionPersistInfo info) {
+ public long logAddPartition(PartitionPersistInfo info) {
long logId = logEdit(OperationType.OP_ADD_PARTITION, info);
AddPartitionRecord record = new AddPartitionRecord(logId, info);
Env.getCurrentEnv().getBinlogManager().addAddPartitionRecord(record);
+ return logId;
}
- public void logDropPartition(DropPartitionInfo info) {
- logEdit(OperationType.OP_DROP_PARTITION, info);
+ public long logDropPartition(DropPartitionInfo info) {
+ long logId = logEdit(OperationType.OP_DROP_PARTITION, info);
+ Env.getCurrentEnv().getBinlogManager().addDropPartitionRecord(info, logId);
+ return logId;
}
public void logErasePartition(long partitionId) {
diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift
index ca84522401..32e2e3b0a3 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -956,6 +956,8 @@ enum TBinlogType {
UPSERT = 0,
ADD_PARTITION = 1,
CREATE_TABLE = 2,
+ DROP_PARTITION = 3,
+ DROP_TABLE = 4,
}
struct TBinlog {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org