You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by da...@apache.org on 2023/06/28 03:10:19 UTC

[doris] branch master updated: [improvement](backup) Add BackupJobInfo with tableCommitSeqMap (#21255)

This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 08fe22cb0c [improvement](backup) Add BackupJobInfo with tableCommitSeqMap (#21255)
08fe22cb0c is described below

commit 08fe22cb0c1c16aaaf23b69d167fdab397c3742e
Author: Jack Drogon <ja...@gmail.com>
AuthorDate: Wed Jun 28 11:10:12 2023 +0800

    [improvement](backup) Add BackupJobInfo with tableCommitSeqMap (#21255)
    
    Signed-off-by: Jack Drogon <ja...@gmail.com>
---
 .../java/org/apache/doris/backup/BackupJob.java    | 23 +++++++++++++--
 .../org/apache/doris/backup/BackupJobInfo.java     |  6 +++-
 .../org/apache/doris/journal/JournalEntity.java    |  6 ++++
 .../java/org/apache/doris/persist/BarrierLog.java  | 34 ++++++++++++++++++++++
 .../java/org/apache/doris/persist/EditLog.java     | 17 ++++++++---
 .../org/apache/doris/persist/OperationType.java    |  2 ++
 .../org/apache/doris/backup/BackupHandlerTest.java |  2 +-
 7 files changed, 82 insertions(+), 8 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java
index 61eda11a79..565efda2d0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java
@@ -75,6 +75,7 @@ import java.util.stream.Collectors;
 
 public class BackupJob extends AbstractJob {
     private static final Logger LOG = LogManager.getLogger(BackupJob.class);
+    private static final String TABLE_COMMIT_SEQ_PREFIX = "table_commit_seq:";
 
     public enum BackupJobState {
         PENDING, // Job is newly created. Send snapshot tasks and save copied meta info, then transfer to SNAPSHOTING
@@ -110,7 +111,7 @@ public class BackupJob extends AbstractJob {
     // save the local file path of meta info and job info file
     private String localMetaInfoFilePath = null;
     private String localJobInfoFilePath = null;
-    // backup properties
+    // backup properties, plus per-table commit seqs stored under "table_commit_seq:{tableId}" keys
     private Map<String, String> properties = Maps.newHashMap();
 
     private byte[] metaInfoBytes = null;
@@ -431,6 +432,12 @@ public class BackupJob extends AbstractJob {
 
     private void prepareSnapshotTaskForOlapTableWithoutLock(OlapTable olapTable,
             TableRef backupTableRef, AgentBatchTask batchTask) {
+        // Write a barrier edit log entry and use its return value as the commit seq for this table
+        long commitSeq = env.getEditLog().logBarrier();
+        // format as "table_commit_seq:{tableId}"
+        String tableKey = String.format("%s%d", TABLE_COMMIT_SEQ_PREFIX, olapTable.getId());
+        properties.put(tableKey, String.valueOf(commitSeq));
+
         // check backup table again
         if (backupTableRef.getPartitionNames() != null) {
             for (String partName : backupTableRef.getPartitionNames().getPartitionNames()) {
@@ -680,8 +687,20 @@ public class BackupJob extends AbstractJob {
             metaInfoBytes = Files.readAllBytes(metaInfoFile.toPath());
 
             // 3. save job info file
+            Map<Long, Long> tableCommitSeqMap = Maps.newHashMap();
+            // iterate over properties and parse each table-commit-seq entry from strings into longs
+            // key is "${TABLE_COMMIT_SEQ_PREFIX}{tableId}"; only the tableId suffix is parsed as a long
+            for (Map.Entry<String, String> entry : properties.entrySet()) {
+                String key = entry.getKey();
+                String value = entry.getValue();
+                if (key.startsWith(TABLE_COMMIT_SEQ_PREFIX)) {
+                    long tableId = Long.parseLong(key.substring(TABLE_COMMIT_SEQ_PREFIX.length()));
+                    long commitSeq = Long.parseLong(value);
+                    tableCommitSeqMap.put(tableId, commitSeq);
+                }
+            }
             jobInfo = BackupJobInfo.fromCatalog(createTime, label, dbName, dbId,
-                    getContent(), backupMeta, snapshotInfos);
+                    getContent(), backupMeta, snapshotInfos, tableCommitSeqMap);
             LOG.debug("job info: {}. {}", jobInfo, this);
             File jobInfoFile = new File(jobDir, Repository.PREFIX_JOB_INFO + createTimeStr);
             if (!jobInfoFile.createNewFile()) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJobInfo.java b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJobInfo.java
index 4457740440..fff2b755c1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJobInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJobInfo.java
@@ -99,6 +99,9 @@ public class BackupJobInfo implements Writable {
     @SerializedName("tablet_snapshot_path_map")
     public Map<Long, String> tabletSnapshotPathMap = Maps.newHashMap();
 
+    @SerializedName("table_commit_seq_map")
+    public Map<Long, Long> tableCommitSeqMap;
+
     public static class ExtraInfo {
         public static class NetworkAddrss {
             @SerializedName("ip")
@@ -575,7 +578,7 @@ public class BackupJobInfo implements Writable {
 
     public static BackupJobInfo fromCatalog(long backupTime, String label, String dbName, long dbId,
                                             BackupContent content, BackupMeta backupMeta,
-                                            Map<Long, SnapshotInfo> snapshotInfos) {
+                                            Map<Long, SnapshotInfo> snapshotInfos, Map<Long, Long> tableCommitSeqMap) {
 
         BackupJobInfo jobInfo = new BackupJobInfo();
         jobInfo.backupTime = backupTime;
@@ -584,6 +587,7 @@ public class BackupJobInfo implements Writable {
         jobInfo.dbId = dbId;
         jobInfo.metaVersion = FeConstants.meta_version;
         jobInfo.content = content;
+        jobInfo.tableCommitSeqMap = tableCommitSeqMap;
 
         Collection<Table> tbls = backupMeta.getTables().values();
         // tbls
diff --git a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java
index 39b804eb40..2a29b18c85 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java
@@ -69,6 +69,7 @@ import org.apache.doris.persist.AlterViewInfo;
 import org.apache.doris.persist.AnalyzeDeletionLog;
 import org.apache.doris.persist.BackendReplicasInfo;
 import org.apache.doris.persist.BackendTabletsInfo;
+import org.apache.doris.persist.BarrierLog;
 import org.apache.doris.persist.BatchDropInfo;
 import org.apache.doris.persist.BatchModifyPartitionsInfo;
 import org.apache.doris.persist.BatchRemoveTransactionsOperation;
@@ -829,6 +830,11 @@ public class JournalEntity implements Writable {
                 isRead = true;
                 break;
             }
+            case OperationType.OP_BARRIER: {
+                data = new BarrierLog();
+                isRead = true;
+                break;
+            }
             default: {
                 IOException e = new IOException();
                 LOG.error("UNKNOWN Operation Type {}", opCode, e);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/BarrierLog.java b/fe/fe-core/src/main/java/org/apache/doris/persist/BarrierLog.java
new file mode 100644
index 0000000000..e44cbb2f31
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/BarrierLog.java
@@ -0,0 +1,34 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.persist;
+
+import org.apache.doris.common.io.Text;
+import org.apache.doris.common.io.Writable;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+public class BarrierLog implements Writable {
+    public BarrierLog() {
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+        Text.writeString(out, "");
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
index 4244664d43..7be7253f1d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
@@ -1037,6 +1037,11 @@ public class EditLog {
                     env.replayGcBinlog(binlogGcInfo);
                     break;
                 }
+                case OperationType.OP_BARRIER: {
+                    // this log entry exists only to provide a barrier commit seq; there is nothing to replay
+                    LOG.info("replay barrier");
+                    break;
+                }
                 default: {
                     IOException e = new IOException();
                     LOG.error("UNKNOWN Operation Type {}", opCode, e);
@@ -1800,11 +1805,15 @@ public class EditLog {
         logEdit(OperationType.OP_DELETE_ANALYSIS_TASK, log);
     }
 
-    public void logAlterDatabaseProperty(AlterDatabasePropertyInfo log) {
-        logEdit(OperationType.OP_ALTER_DATABASE_PROPERTY, log);
+    public long logAlterDatabaseProperty(AlterDatabasePropertyInfo log) {
+        return logEdit(OperationType.OP_ALTER_DATABASE_PROPERTY, log);
+    }
+
+    public long logGcBinlog(BinlogGcInfo log) {
+        return logEdit(OperationType.OP_GC_BINLOG, log);
     }
 
-    public void logGcBinlog(BinlogGcInfo log) {
-        logEdit(OperationType.OP_GC_BINLOG, log);
+    public long logBarrier() {
+        return logEdit(OperationType.OP_BARRIER, new BarrierLog());
     }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java b/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java
index 674374f917..451a3eb7aa 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java
@@ -308,6 +308,8 @@ public class OperationType {
 
     public static final short OP_GC_BINLOG = 435;
 
+    public static final short OP_BARRIER = 436;
+
 
     /**
      * Get opcode name by op code.
diff --git a/fe/fe-core/src/test/java/org/apache/doris/backup/BackupHandlerTest.java b/fe/fe-core/src/test/java/org/apache/doris/backup/BackupHandlerTest.java
index 50d39e0ab6..97e689b697 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/backup/BackupHandlerTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/backup/BackupHandlerTest.java
@@ -228,7 +228,7 @@ public class BackupHandlerTest {
                 BackupJobInfo info = BackupJobInfo.fromCatalog(System.currentTimeMillis(),
                         "ss2", CatalogMocker.TEST_DB_NAME,
                         CatalogMocker.TEST_DB_ID, BackupStmt.BackupContent.ALL,
-                        backupMeta, snapshotInfos);
+                        backupMeta, snapshotInfos, null);
                 infos.add(info);
                 return Status.OK;
             }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org