You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by da...@apache.org on 2023/06/28 03:10:19 UTC
[doris] branch master updated: [improvement](backup) Add BackupJobInfo with tableCommitSeqMap (#21255)
This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 08fe22cb0c [improvement](backup) Add BackupJobInfo with tableCommitSeqMap (#21255)
08fe22cb0c is described below
commit 08fe22cb0c1c16aaaf23b69d167fdab397c3742e
Author: Jack Drogon <ja...@gmail.com>
AuthorDate: Wed Jun 28 11:10:12 2023 +0800
[improvement](backup) Add BackupJobInfo with tableCommitSeqMap (#21255)
Signed-off-by: Jack Drogon <ja...@gmail.com>
---
.../java/org/apache/doris/backup/BackupJob.java | 23 +++++++++++++--
.../org/apache/doris/backup/BackupJobInfo.java | 6 +++-
.../org/apache/doris/journal/JournalEntity.java | 6 ++++
.../java/org/apache/doris/persist/BarrierLog.java | 34 ++++++++++++++++++++++
.../java/org/apache/doris/persist/EditLog.java | 17 ++++++++---
.../org/apache/doris/persist/OperationType.java | 2 ++
.../org/apache/doris/backup/BackupHandlerTest.java | 2 +-
7 files changed, 82 insertions(+), 8 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java
index 61eda11a79..565efda2d0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java
@@ -75,6 +75,7 @@ import java.util.stream.Collectors;
public class BackupJob extends AbstractJob {
private static final Logger LOG = LogManager.getLogger(BackupJob.class);
+ private static final String TABLE_COMMIT_SEQ_PREFIX = "table_commit_seq:";
public enum BackupJobState {
PENDING, // Job is newly created. Send snapshot tasks and save copied meta info, then transfer to SNAPSHOTING
@@ -110,7 +111,7 @@ public class BackupJob extends AbstractJob {
// save the local file path of meta info and job info file
private String localMetaInfoFilePath = null;
private String localJobInfoFilePath = null;
- // backup properties
+ // backup properties, plus per-table commit seq entries keyed by table id
private Map<String, String> properties = Maps.newHashMap();
private byte[] metaInfoBytes = null;
@@ -431,6 +432,12 @@ public class BackupJob extends AbstractJob {
private void prepareSnapshotTaskForOlapTableWithoutLock(OlapTable olapTable,
TableRef backupTableRef, AgentBatchTask batchTask) {
+ // Add a barrier edit log entry to obtain the barrier commit seq
+ long commitSeq = env.getEditLog().logBarrier();
+ // key format is "table_commit_seq:{tableId}"
+ String tableKey = String.format("%s%d", TABLE_COMMIT_SEQ_PREFIX, olapTable.getId());
+ properties.put(tableKey, String.valueOf(commitSeq));
+
// check backup table again
if (backupTableRef.getPartitionNames() != null) {
for (String partName : backupTableRef.getPartitionNames().getPartitionNames()) {
@@ -680,8 +687,20 @@ public class BackupJob extends AbstractJob {
metaInfoBytes = Files.readAllBytes(metaInfoFile.toPath());
// 3. save job info file
+ Map<Long, Long> tableCommitSeqMap = Maps.newHashMap();
+ // iterate over properties and convert each table commit seq entry from string to long
+ // key is "${TABLE_COMMIT_SEQ_PREFIX}{tableId}"; only the tableId part is parsed to long
+ for (Map.Entry<String, String> entry : properties.entrySet()) {
+ String key = entry.getKey();
+ String value = entry.getValue();
+ if (key.startsWith(TABLE_COMMIT_SEQ_PREFIX)) {
+ long tableId = Long.parseLong(key.substring(TABLE_COMMIT_SEQ_PREFIX.length()));
+ long commitSeq = Long.parseLong(value);
+ tableCommitSeqMap.put(tableId, commitSeq);
+ }
+ }
jobInfo = BackupJobInfo.fromCatalog(createTime, label, dbName, dbId,
- getContent(), backupMeta, snapshotInfos);
+ getContent(), backupMeta, snapshotInfos, tableCommitSeqMap);
LOG.debug("job info: {}. {}", jobInfo, this);
File jobInfoFile = new File(jobDir, Repository.PREFIX_JOB_INFO + createTimeStr);
if (!jobInfoFile.createNewFile()) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJobInfo.java b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJobInfo.java
index 4457740440..fff2b755c1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJobInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJobInfo.java
@@ -99,6 +99,9 @@ public class BackupJobInfo implements Writable {
@SerializedName("tablet_snapshot_path_map")
public Map<Long, String> tabletSnapshotPathMap = Maps.newHashMap();
+ @SerializedName("table_commit_seq_map")
+ public Map<Long, Long> tableCommitSeqMap;
+
public static class ExtraInfo {
public static class NetworkAddrss {
@SerializedName("ip")
@@ -575,7 +578,7 @@ public class BackupJobInfo implements Writable {
public static BackupJobInfo fromCatalog(long backupTime, String label, String dbName, long dbId,
BackupContent content, BackupMeta backupMeta,
- Map<Long, SnapshotInfo> snapshotInfos) {
+ Map<Long, SnapshotInfo> snapshotInfos, Map<Long, Long> tableCommitSeqMap) {
BackupJobInfo jobInfo = new BackupJobInfo();
jobInfo.backupTime = backupTime;
@@ -584,6 +587,7 @@ public class BackupJobInfo implements Writable {
jobInfo.dbId = dbId;
jobInfo.metaVersion = FeConstants.meta_version;
jobInfo.content = content;
+ jobInfo.tableCommitSeqMap = tableCommitSeqMap;
Collection<Table> tbls = backupMeta.getTables().values();
// tbls
diff --git a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java
index 39b804eb40..2a29b18c85 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java
@@ -69,6 +69,7 @@ import org.apache.doris.persist.AlterViewInfo;
import org.apache.doris.persist.AnalyzeDeletionLog;
import org.apache.doris.persist.BackendReplicasInfo;
import org.apache.doris.persist.BackendTabletsInfo;
+import org.apache.doris.persist.BarrierLog;
import org.apache.doris.persist.BatchDropInfo;
import org.apache.doris.persist.BatchModifyPartitionsInfo;
import org.apache.doris.persist.BatchRemoveTransactionsOperation;
@@ -829,6 +830,11 @@ public class JournalEntity implements Writable {
isRead = true;
break;
}
+ case OperationType.OP_BARRIER: {
+ data = new BarrierLog();
+ isRead = true;
+ break;
+ }
default: {
IOException e = new IOException();
LOG.error("UNKNOWN Operation Type {}", opCode, e);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/BarrierLog.java b/fe/fe-core/src/main/java/org/apache/doris/persist/BarrierLog.java
new file mode 100644
index 0000000000..e44cbb2f31
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/BarrierLog.java
@@ -0,0 +1,34 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.persist;
+
+import org.apache.doris.common.io.Text;
+import org.apache.doris.common.io.Writable;
+
+import java.io.DataOutput;
+import java.io.IOException;
+
+public class BarrierLog implements Writable { // journal entry with no fields; its only payload is an empty string
+ public BarrierLog() { // nothing to initialize
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ Text.writeString(out, ""); // the serialized form is just an empty string
+ }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
index 4244664d43..7be7253f1d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
@@ -1037,6 +1037,11 @@ public class EditLog {
env.replayGcBinlog(binlogGcInfo);
break;
}
+ case OperationType.OP_BARRIER: {
+ // this log exists only to provide a barrier commit seq; nothing needs to be replayed
+ LOG.info("replay barrier");
+ break;
+ }
default: {
IOException e = new IOException();
LOG.error("UNKNOWN Operation Type {}", opCode, e);
@@ -1800,11 +1805,15 @@ public class EditLog {
logEdit(OperationType.OP_DELETE_ANALYSIS_TASK, log);
}
- public void logAlterDatabaseProperty(AlterDatabasePropertyInfo log) {
- logEdit(OperationType.OP_ALTER_DATABASE_PROPERTY, log);
+ public long logAlterDatabaseProperty(AlterDatabasePropertyInfo log) {
+ return logEdit(OperationType.OP_ALTER_DATABASE_PROPERTY, log);
+ }
+
+ public long logGcBinlog(BinlogGcInfo log) {
+ return logEdit(OperationType.OP_GC_BINLOG, log);
}
- public void logGcBinlog(BinlogGcInfo log) {
- logEdit(OperationType.OP_GC_BINLOG, log);
+ public long logBarrier() {
+ return logEdit(OperationType.OP_BARRIER, new BarrierLog());
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java b/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java
index 674374f917..451a3eb7aa 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java
@@ -308,6 +308,8 @@ public class OperationType {
public static final short OP_GC_BINLOG = 435;
+ public static final short OP_BARRIER = 436;
+
/**
* Get opcode name by op code.
diff --git a/fe/fe-core/src/test/java/org/apache/doris/backup/BackupHandlerTest.java b/fe/fe-core/src/test/java/org/apache/doris/backup/BackupHandlerTest.java
index 50d39e0ab6..97e689b697 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/backup/BackupHandlerTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/backup/BackupHandlerTest.java
@@ -228,7 +228,7 @@ public class BackupHandlerTest {
BackupJobInfo info = BackupJobInfo.fromCatalog(System.currentTimeMillis(),
"ss2", CatalogMocker.TEST_DB_NAME,
CatalogMocker.TEST_DB_ID, BackupStmt.BackupContent.ALL,
- backupMeta, snapshotInfos);
+ backupMeta, snapshotInfos, null);
infos.add(info);
return Status.OK;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org