You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by da...@apache.org on 2023/06/30 08:57:19 UTC

[doris] branch master updated: [feature](binlog) Add AddPartitionRecord && DROP_PARTITION (#21344)

This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new de39632f1b [feature](binlog) Add AddPartitionRecord && DROP_PARTITION (#21344)
de39632f1b is described below

commit de39632f1b7d4c53929a88daa7ed9f412a307dc1
Author: Jack Drogon <ja...@gmail.com>
AuthorDate: Fri Jun 30 16:57:11 2023 +0800

    [feature](binlog) Add AddPartitionRecord && DROP_PARTITION (#21344)
    
    Signed-off-by: Jack Drogon <ja...@gmail.com>
---
 .../apache/doris/binlog/AddPartitionRecord.java    | 23 ++++++++++++++++++++++
 .../org/apache/doris/binlog/BinlogManager.java     | 12 +++++++++++
 .../apache/doris/persist/DropPartitionInfo.java    | 13 ++++++++++++
 .../java/org/apache/doris/persist/EditLog.java     | 12 +++++++----
 gensrc/thrift/FrontendService.thrift               |  2 ++
 5 files changed, 58 insertions(+), 4 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/AddPartitionRecord.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/AddPartitionRecord.java
index 5dc2f62749..9bc5ff7da0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/binlog/AddPartitionRecord.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/AddPartitionRecord.java
@@ -18,6 +18,7 @@
 package org.apache.doris.binlog;
 
 import org.apache.doris.catalog.DataProperty;
+import org.apache.doris.catalog.ListPartitionItem;
 import org.apache.doris.catalog.Partition;
 import org.apache.doris.catalog.PartitionItem;
 import org.apache.doris.catalog.PartitionKey;
@@ -51,6 +52,8 @@ public class AddPartitionRecord {
     private boolean isTempPartition = false;
     @SerializedName(value = "isMutable")
     private boolean isMutable = true;
+    @SerializedName(value = "sql")
+    private String sql;
 
     public AddPartitionRecord(long commitSeq, PartitionPersistInfo partitionPersistInfo) {
         this.commitSeq = commitSeq;
@@ -64,6 +67,26 @@ public class AddPartitionRecord {
         this.isInMemory = partitionPersistInfo.isInMemory();
         this.isTempPartition = partitionPersistInfo.isTempPartition();
         this.isMutable = partitionPersistInfo.isMutable();
+
+        StringBuilder sb = new StringBuilder();
+        sb.append("ADD PARTITION ").append("`").append(partition.getName()).append("`").append(" VALUES ");
+        if (this.listPartitionItem.equals(ListPartitionItem.DUMMY_ITEM)) {
+            // range
+            sb.append("[");
+            sb.append(range.lowerEndpoint().toSql());
+            sb.append(", ");
+            sb.append(range.upperEndpoint().toSql());
+            sb.append(")");
+        } else {
+            // list
+            sb.append("IN (");
+            sb.append(((ListPartitionItem) listPartitionItem).toSql());
+            sb.append(")");
+        }
+        sb.append("(\"version_info\" = \"");
+        sb.append(partition.getVisibleVersion()).append("\"");
+        sb.append(");");
+        this.sql = sb.toString();
     }
 
     public long getCommitSeq() {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java
index 59ba596152..bccc5dfc48 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java
@@ -22,6 +22,7 @@ import org.apache.doris.catalog.Env;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.Pair;
 import org.apache.doris.persist.BinlogGcInfo;
+import org.apache.doris.persist.DropPartitionInfo;
 import org.apache.doris.thrift.TBinlog;
 import org.apache.doris.thrift.TBinlogType;
 import org.apache.doris.thrift.TStatus;
@@ -128,6 +129,17 @@ public class BinlogManager {
         addBinlog(dbId, tableIds, commitSeq, timestamp, type, data);
     }
 
+    public void addDropPartitionRecord(DropPartitionInfo dropPartitionInfo, long commitSeq) {
+        long dbId = dropPartitionInfo.getDbId();
+        List<Long> tableIds = new ArrayList<Long>();
+        tableIds.add(dropPartitionInfo.getTableId());
+        long timestamp = -1;
+        TBinlogType type = TBinlogType.DROP_PARTITION;
+        String data = dropPartitionInfo.toJson();
+
+        addBinlog(dbId, tableIds, commitSeq, timestamp, type, data);
+    }
+
     // get binlog by dbId, return first binlog.version > version
     public Pair<TStatus, TBinlog> getBinlog(long dbId, long tableId, long commitSeq) {
         TStatus status = new TStatus(TStatusCode.OK);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/DropPartitionInfo.java b/fe/fe-core/src/main/java/org/apache/doris/persist/DropPartitionInfo.java
index 5a4a07f0cd..f4fac91526 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/DropPartitionInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/DropPartitionInfo.java
@@ -40,6 +40,8 @@ public class DropPartitionInfo implements Writable {
     private boolean forceDrop = false;
     @SerializedName(value = "recycleTime")
     private long recycleTime = 0;
+    @SerializedName(value = "sql")
+    private String sql;
 
     private DropPartitionInfo() {
     }
@@ -52,6 +54,17 @@ public class DropPartitionInfo implements Writable {
         this.isTempPartition = isTempPartition;
         this.forceDrop = forceDrop;
         this.recycleTime = recycleTime;
+
+        StringBuilder sb = new StringBuilder();
+        sb.append("DROP PARTITION ");
+        if (isTempPartition) {
+            sb.append("TEMPORARY ");
+        }
+        sb.append("`").append(partitionName).append("`");
+        if (forceDrop) {
+            sb.append(" FORCE");
+        }
+        this.sql = sb.toString();
     }
 
     public Long getDbId() {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
index 7be7253f1d..b67ca9259e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
@@ -234,8 +234,8 @@ public class EditLog {
                             "Begin to unprotect add partition. db = " + info.getDbId() + " table = " + info.getTableId()
                                     + " partitionName = " + info.getPartition().getName());
                     AddPartitionRecord addPartitionRecord = new AddPartitionRecord(logId, info);
-                    Env.getCurrentEnv().getBinlogManager().addAddPartitionRecord(addPartitionRecord);
                     env.replayAddPartition(info);
+                    env.getBinlogManager().addAddPartitionRecord(addPartitionRecord);
                     break;
                 }
                 case OperationType.OP_DROP_PARTITION: {
@@ -243,6 +243,7 @@ public class EditLog {
                     LOG.info("Begin to unprotect drop partition. db = " + info.getDbId() + " table = "
                             + info.getTableId() + " partitionName = " + info.getPartitionName());
                     env.replayDropPartition(info);
+                    env.getBinlogManager().addDropPartitionRecord(info, logId);
                     break;
                 }
                 case OperationType.OP_MODIFY_PARTITION: {
@@ -1204,14 +1205,17 @@ public class EditLog {
         logEdit(OperationType.OP_ALTER_EXTERNAL_TABLE_SCHEMA, info);
     }
 
-    public void logAddPartition(PartitionPersistInfo info) {
+    public long logAddPartition(PartitionPersistInfo info) {
         long logId = logEdit(OperationType.OP_ADD_PARTITION, info);
         AddPartitionRecord record = new AddPartitionRecord(logId, info);
         Env.getCurrentEnv().getBinlogManager().addAddPartitionRecord(record);
+        return logId;
     }
 
-    public void logDropPartition(DropPartitionInfo info) {
-        logEdit(OperationType.OP_DROP_PARTITION, info);
+    public long logDropPartition(DropPartitionInfo info) {
+        long logId = logEdit(OperationType.OP_DROP_PARTITION, info);
+        Env.getCurrentEnv().getBinlogManager().addDropPartitionRecord(info, logId);
+        return logId;
     }
 
     public void logErasePartition(long partitionId) {
diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift
index ca84522401..32e2e3b0a3 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -956,6 +956,8 @@ enum TBinlogType {
   UPSERT = 0,
   ADD_PARTITION = 1,
   CREATE_TABLE = 2,
+  DROP_PARTITION = 3,
+  DROP_TABLE = 4,
 }
 
 struct TBinlog {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org