You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by li...@apache.org on 2021/01/28 10:50:40 UTC

[incubator-doris] branch master updated: [BackupAndRestore] Support backup and restore view and external odbc table (#5299)

This is an automated email from the ASF dual-hosted git repository.

lingmiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 6bd22bc  [BackupAndRestore] Support backup and restore view and external odbc table (#5299)
6bd22bc is described below

commit 6bd22bc573c3f68fe9f874ad212bfabb19e3d8c0
Author: EmmyMiao87 <52...@qq.com>
AuthorDate: Thu Jan 28 18:50:18 2021 +0800

    [BackupAndRestore] Support backup and restore view and external odbc table (#5299)
    
    [BackupAndRestore] Support backup and restore view and external odbc table
    
    1. Support backup and restore view and odbc table. The syntax is the same as that of the backup and restore table.
    2. If the table associated with the view does not exist in the snapshot,
       the view can still be backed up successfully, but the TableNotFound exception will be thrown when querying the view.
    3. If an odbc table is associated with an odbc resource, the odbc resource will be backed up and restored together.
    4. If the same view, odbc table or resource already exists in the database, its metadata will be compared with the metadata in the snapshot.
       If it is inconsistent, the restoration will fail.
    5. This PR also modifies the json format of the backup information.
       A `new_backup_objects` object is added to the root node to store backup meta-information other than olap table,
       such as views and external tables.
       ```
       {
           "backup_objects": {},
           "new_backup_objects": {
               "view": [
                   {"name": "view1", "id": "10001"}
               ],
               "odbc_table": [
                   {"name":"xxx", xxx}
               ],
               "odbc_resources": [
                   {"name": "bj_oracle"}
               ]
           }
       }
       ```
    6. This PR changes the serialization and deserialization method of backup information
       from manual construction to automatic analysis by Gson tools.
    
    Change-Id: I216469bf2a6484177185d8354dcca2dc19f653f3
---
 .../org/apache/doris/backup/BackupHandler.java     | 113 +++--
 .../java/org/apache/doris/backup/BackupJob.java    | 254 +++++++---
 .../org/apache/doris/backup/BackupJobInfo.java     | 523 ++++++++++++---------
 .../java/org/apache/doris/backup/BackupMeta.java   |  35 +-
 .../java/org/apache/doris/backup/RestoreJob.java   | 415 +++++++++++-----
 .../java/org/apache/doris/catalog/Catalog.java     |   2 +-
 .../java/org/apache/doris/catalog/Database.java    |  20 +-
 .../java/org/apache/doris/catalog/EsTable.java     |  53 +--
 .../java/org/apache/doris/catalog/MetaObject.java  |  10 +-
 .../java/org/apache/doris/catalog/MysqlTable.java  |  47 +-
 .../apache/doris/catalog/OdbcCatalogResource.java  |  43 +-
 .../java/org/apache/doris/catalog/OdbcTable.java   |  74 +--
 .../java/org/apache/doris/catalog/OlapTable.java   |   3 +-
 .../java/org/apache/doris/catalog/Resource.java    |  20 +
 .../java/org/apache/doris/catalog/ResourceMgr.java |  28 +-
 .../org/apache/doris/catalog/SparkResource.java    |   9 +-
 .../main/java/org/apache/doris/catalog/View.java   |  46 +-
 .../org/apache/doris/common/AnalysisException.java |   1 +
 .../org/apache/doris/common/FeMetaVersion.java     |   4 +-
 .../java/org/apache/doris/common/io/DeepCopy.java  |  41 +-
 .../java/org/apache/doris/common/util/Util.java    |   1 +
 .../java/org/apache/doris/persist/EditLog.java     |   2 +-
 .../org/apache/doris/backup/BackupHandlerTest.java |   5 +-
 .../org/apache/doris/backup/BackupJobInfoTest.java |  28 +-
 .../org/apache/doris/backup/BackupJobTest.java     |   2 +-
 .../org/apache/doris/backup/RestoreJobTest.java    |  27 +-
 .../org/apache/doris/common/io/DeepCopyTest.java   |   3 +-
 27 files changed, 1159 insertions(+), 650 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java
index 445dcc7..d454ce0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupHandler.java
@@ -28,10 +28,9 @@ import org.apache.doris.analysis.RestoreStmt;
 import org.apache.doris.analysis.TableRef;
 import org.apache.doris.backup.AbstractJob.JobType;
 import org.apache.doris.backup.BackupJob.BackupJobState;
-import org.apache.doris.backup.BackupJobInfo.BackupTableInfo;
+import org.apache.doris.backup.BackupJobInfo.BackupOlapTableInfo;
 import org.apache.doris.catalog.Catalog;
 import org.apache.doris.catalog.Database;
-import org.apache.doris.catalog.MaterializedIndex.IndexExtState;
 import org.apache.doris.catalog.OlapTable;
 import org.apache.doris.catalog.Partition;
 import org.apache.doris.catalog.Table;
@@ -274,14 +273,15 @@ public class BackupHandler extends MasterDaemon implements Writable {
         // This is just a pre-check to avoid most of invalid backup requests.
         // Also calculate the signature for incremental backup check.
         List<TableRef> tblRefs = stmt.getTableRefs();
-
-        List<Table> backupTbls = Lists.newArrayList();
         for (TableRef tblRef : tblRefs) {
             String tblName = tblRef.getName().getTbl();
             Table tbl = db.getTable(tblName);
             if (tbl == null) {
                 ErrorReport.reportDdlException(ErrorCode.ERR_BAD_TABLE_ERROR, tblName);
             }
+            if (tbl.getType() == TableType.VIEW || tbl.getType() == TableType.ODBC) {
+                continue;
+            }
             if (tbl.getType() != TableType.OLAP) {
                 ErrorReport.reportDdlException(ErrorCode.ERR_NOT_OLAP_TABLE, tblName);
             }
@@ -307,22 +307,11 @@ public class BackupHandler extends MasterDaemon implements Writable {
                         }
                     }
                 }
-
-                // copy a table with selected partitions for calculating the signature
-                List<String> reservedPartitions = partitionNames == null ? null : partitionNames.getPartitionNames();
-                OlapTable copiedTbl = olapTbl.selectiveCopy(reservedPartitions, true, IndexExtState.VISIBLE);
-                if (copiedTbl == null) {
-                    ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR,
-                            "Failed to copy table " + tblName + " with selected partitions");
-                }
-                backupTbls.add(copiedTbl);
             } finally {
                 tbl.readUnlock();
             }
         }
 
-        BackupMeta curBackupMeta = new BackupMeta(backupTbls);
-
         // Check if label already be used
         List<String> existSnapshotNames = Lists.newArrayList();
         Status st = repository.listSnapshots(existSnapshotNames);
@@ -336,24 +325,6 @@ public class BackupHandler extends MasterDaemon implements Writable {
             } else {
                 ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR, "Currently does not support "
                         + "incremental backup");
-
-                // TODO:
-                // This is a incremental backup, the existing snapshot in repository will be treated
-                // as base snapshot.
-                // But first we need to check if the existing snapshot has same meta.
-                List<BackupMeta> backupMetas = Lists.newArrayList();
-                st = repository.getSnapshotMetaFile(stmt.getLabel(), backupMetas, -1);
-                if (!st.ok()) {
-                    ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR,
-                                                   "Failed to get existing meta info for repository: "
-                                                           + st.getErrMsg());
-                }
-                Preconditions.checkState(backupMetas.size() == 1);
-
-                if (!curBackupMeta.compatibleWith(backupMetas.get(0))) {
-                    ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR,
-                                                   "Can not make incremental backup. Meta does not compatible");
-                }
             }
         }
 
@@ -401,42 +372,66 @@ public class BackupHandler extends MasterDaemon implements Writable {
 
     private void checkAndFilterRestoreObjsExistInSnapshot(BackupJobInfo jobInfo, List<TableRef> tblRefs)
             throws DdlException {
-        Set<String> allTbls = Sets.newHashSet();
+        Set<String> olapTableNames = Sets.newHashSet();
+        Set<String> viewNames = Sets.newHashSet();
+        Set<String> odbcTableNames = Sets.newHashSet();
         for (TableRef tblRef : tblRefs) {
             String tblName = tblRef.getName().getTbl();
-            if (!jobInfo.containsTbl(tblName)) {
+            TableType tableType = jobInfo.getTypeByTblName(tblName);
+            if (tableType == null) {
                 ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR,
-                                               "Table " + tblName + " does not exist in snapshot " + jobInfo.name);
+                        "Table " + tblName + " does not exist in snapshot " + jobInfo.name);
             }
-            BackupTableInfo tblInfo = jobInfo.getTableInfo(tblName);
-            PartitionNames partitionNames = tblRef.getPartitionNames();
-            if (partitionNames != null) {
-                if (partitionNames.isTemp()) {
-                    ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR,
-                            "Do not support restoring temporary partitions");
-                }
-                // check the selected partitions
-                for (String partName : partitionNames.getPartitionNames()) {
-                    if (!tblInfo.containsPart(partName)) {
-                        ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR,
-                                                       "Partition " + partName + " of table " + tblName
-                                                               + " does not exist in snapshot " + jobInfo.name);
-                    }
-                }
+            switch (tableType) {
+                case OLAP:
+                    checkAndFilterRestoreOlapTableExistInSnapshot(jobInfo.backupOlapTableObjects, tblRef);
+                    olapTableNames.add(tblName);
+                    break;
+                case VIEW:
+                    viewNames.add(tblName);
+                    break;
+                case ODBC:
+                    odbcTableNames.add(tblName);
+                    break;
+                default:
+                    break;
             }
-            
+
             // set alias
             if (tblRef.hasExplicitAlias()) {
                 jobInfo.setAlias(tblName, tblRef.getExplicitAlias());
             }
-
-            // only retain restore partitions
-            tblInfo.retainPartitions(partitionNames == null ? null : partitionNames.getPartitionNames());
-            allTbls.add(tblName);
         }
-        
+
         // only retain restore tables
-        jobInfo.retainTables(allTbls);
+        jobInfo.retainOlapTables(olapTableNames);
+        jobInfo.retainView(viewNames);
+        jobInfo.retainOdbcTables(odbcTableNames);
+    }
+
+
+
+    public void checkAndFilterRestoreOlapTableExistInSnapshot(Map<String, BackupOlapTableInfo> backupOlapTableInfoMap,
+                                                              TableRef tableRef) throws DdlException {
+        String tblName = tableRef.getName().getTbl();
+        BackupOlapTableInfo tblInfo = backupOlapTableInfoMap.get(tblName);
+        PartitionNames partitionNames = tableRef.getPartitionNames();
+        if (partitionNames != null) {
+            if (partitionNames.isTemp()) {
+                ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR,
+                        "Do not support restoring temporary partitions");
+            }
+            // check the selected partitions
+            for (String partName : partitionNames.getPartitionNames()) {
+                if (!tblInfo.containsPart(partName)) {
+                    ErrorReport.reportDdlException(ErrorCode.ERR_COMMON_ERROR,
+                            "Partition " + partName + " of table " + tblName
+                                    + " does not exist in snapshot");
+                }
+            }
+        }
+        // only retain restore partitions
+        tblInfo.retainPartitions(partitionNames == null ? null : partitionNames.getPartitionNames());
     }
 
     public void cancel(CancelBackupStmt stmt) throws DdlException {
@@ -445,7 +440,7 @@ public class BackupHandler extends MasterDaemon implements Writable {
         if (db == null) {
             ErrorReport.reportDdlException(ErrorCode.ERR_BAD_DB_ERROR, dbName);
         }
-        
+
         AbstractJob job = dbIdToBackupOrRestoreJob.get(db.getId());
         if (job == null || (job instanceof BackupJob && stmt.isRestore())
                 || (job instanceof RestoreJob && !stmt.isRestore())) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java
index 914ba56..dd96fa8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJob.java
@@ -24,12 +24,16 @@ import org.apache.doris.catalog.Database;
 import org.apache.doris.catalog.FsBroker;
 import org.apache.doris.catalog.MaterializedIndex;
 import org.apache.doris.catalog.MaterializedIndex.IndexExtState;
+import org.apache.doris.catalog.OdbcTable;
 import org.apache.doris.catalog.OlapTable;
 import org.apache.doris.catalog.Partition;
 import org.apache.doris.catalog.Replica;
+import org.apache.doris.catalog.Resource;
 import org.apache.doris.catalog.Table;
 import org.apache.doris.catalog.Table.TableType;
 import org.apache.doris.catalog.Tablet;
+import org.apache.doris.catalog.View;
+import org.apache.doris.common.FeMetaVersion;
 import org.apache.doris.common.io.Text;
 import org.apache.doris.common.util.TimeUtils;
 import org.apache.doris.task.AgentBatchTask;
@@ -108,13 +112,15 @@ public class BackupJob extends AbstractJob {
     // save the local file path of meta info and job info file
     private String localMetaInfoFilePath = null;
     private String localJobInfoFilePath = null;
+    // backup properties
+    private Map<String, String> properties = Maps.newHashMap();
 
     public BackupJob() {
         super(JobType.BACKUP);
     }
 
     public BackupJob(String label, long dbId, String dbName, List<TableRef> tableRefs, long timeoutMs,
-            Catalog catalog, long repoId) {
+                     Catalog catalog, long repoId) {
         super(JobType.BACKUP, label, dbId, dbName, timeoutMs, catalog, repoId);
         this.tableRefs = tableRefs;
         this.state = BackupJobState.PENDING;
@@ -341,14 +347,10 @@ public class BackupJob extends AbstractJob {
 
         // generate job id
         jobId = catalog.getNextId();
-        AgentBatchTask batchTask = new AgentBatchTask();
-
         unfinishedTaskIds.clear();
         taskProgress.clear();
         taskErrMsg.clear();
-
-        List<Table> copiedTables = Lists.newArrayList();
-
+        AgentBatchTask batchTask = new AgentBatchTask();
         for (TableRef tableRef : tableRefs) {
             String tblName = tableRef.getName().getTbl();
             Table tbl = db.getTable(tblName);
@@ -356,84 +358,34 @@ public class BackupJob extends AbstractJob {
                 status = new Status(ErrCode.NOT_FOUND, "table " + tblName + " does not exist");
                 return;
             }
-            if (tbl.getType() != TableType.OLAP) {
-                status = new Status(ErrCode.COMMON_ERROR, "table " + tblName + " is not OLAP table");
-                return;
-            }
-
-            OlapTable olapTbl = (OlapTable) tbl;
-            olapTbl.readLock();
-            try {
-                // check backup table again
-                if (tableRef.getPartitionNames() != null) {
-                    for (String partName : tableRef.getPartitionNames().getPartitionNames()) {
-                        Partition partition = olapTbl.getPartition(partName);
-                        if (partition == null) {
-                            status = new Status(ErrCode.NOT_FOUND, "partition " + partName
-                                    + " does not exist  in table" + tblName);
+            switch (tbl.getType()){
+                case OLAP:
+                    prepareAndSendSnapshotTaskForOlapTable((OlapTable) tbl, tableRef, batchTask);
+                    break;
+                case VIEW:
+                    break;
+                case ODBC:
+                    OdbcTable odbcTable = (OdbcTable) tbl;
+                    if (odbcTable.getOdbcCatalogResourceName() != null) {
+                        String odbcResourceName = odbcTable.getOdbcCatalogResourceName();
+                        Resource resource = Catalog.getCurrentCatalog().getResourceMgr()
+                                .getResource(odbcResourceName);
+                        if (resource == null) {
+                            status = new Status(ErrCode.NOT_FOUND, "resource " + odbcResourceName
+                                    + " related to " + tblName + "does not exist.");
                             return;
                         }
                     }
-                }
-
-                // create snapshot tasks
-                List<Partition> partitions = Lists.newArrayList();
-                if (tableRef.getPartitionNames() == null) {
-                    partitions.addAll(olapTbl.getPartitions());
-                } else {
-                    for (String partName : tableRef.getPartitionNames().getPartitionNames()) {
-                        Partition partition = tbl.getPartition(partName);
-                        partitions.add(partition);
-                    }
-                }
-
-                // snapshot partitions
-                for (Partition partition : partitions) {
-                    long visibleVersion = partition.getVisibleVersion();
-                    long visibleVersionHash = partition.getVisibleVersionHash();
-                    List<MaterializedIndex> indexes = partition.getMaterializedIndices(IndexExtState.VISIBLE);
-                    for (MaterializedIndex index : indexes) {
-                        int schemaHash = olapTbl.getSchemaHashByIndexId(index.getId());
-                        List<Tablet> tablets = index.getTablets();
-                        for (Tablet tablet : tablets) {
-                            Replica replica = chooseReplica(tablet, visibleVersion, visibleVersionHash);
-                            if (replica == null) {
-                                status = new Status(ErrCode.COMMON_ERROR,
-                                        "failed to choose replica to make snapshot for tablet " + tablet.getId()
-                                                + ". visible version: " + visibleVersion
-                                                + ", visible version hash: " + visibleVersionHash);
-                                return;
-                            }
-                            SnapshotTask task = new SnapshotTask(null, replica.getBackendId(), tablet.getId(),
-                                    jobId, dbId, tbl.getId(), partition.getId(),
-                                    index.getId(), tablet.getId(),
-                                    visibleVersion, visibleVersionHash,
-                                    schemaHash, timeoutMs, false /* not restore task */);
-                            batchTask.addTask(task);
-                            unfinishedTaskIds.put(tablet.getId(), replica.getBackendId());
-                        }
-                    }
-
-                    LOG.info("snapshot for partition {}, version: {}, version hash: {}",
-                            partition.getId(), visibleVersion, visibleVersionHash);
-                }
-
-                // only copy visible indexes
-                List<String> reservedPartitions = tableRef.getPartitionNames() == null ? null
-                        : tableRef.getPartitionNames().getPartitionNames();
-                OlapTable copiedTbl = olapTbl.selectiveCopy(reservedPartitions, true, IndexExtState.VISIBLE);
-                if (copiedTbl == null) {
-                    status = new Status(ErrCode.COMMON_ERROR, "failed to copy table: " + tblName);
+                    break;
+                default:
+                    status = new Status(ErrCode.COMMON_ERROR,
+                            "backup job does not support this type of table " + tblName);
                     return;
-                }
-                copiedTables.add(copiedTbl);
-            } finally {
-                olapTbl.readUnlock();
             }
-
         }
 
-        backupMeta = new BackupMeta(copiedTables);
+        // copy all related schema at this moment
+        prepareBackupMeta(db);
 
         // send tasks
         for (AgentTask task : batchTask.getAllTasks()) {
@@ -448,6 +400,134 @@ public class BackupJob extends AbstractJob {
         LOG.info("finished to send snapshot tasks to backend. {}", this);
     }
 
+    private void prepareAndSendSnapshotTaskForOlapTable(OlapTable olapTable, TableRef backupTableRef, AgentBatchTask batchTask) {
+        String tblName = backupTableRef.getName().getTbl();
+        if (backupTableRef.getPartitionNames() != null) {
+            for (String partName : backupTableRef.getPartitionNames().getPartitionNames()) {
+                Partition partition = olapTable.getPartition(partName);
+                if (partition == null) {
+                    status = new Status(ErrCode.NOT_FOUND, "partition " + partName
+                            + " does not exist  in table" + tblName);
+                    return;
+                }
+            }
+        }
+        olapTable.readLock();
+        try {
+            // check backup table again
+            if (backupTableRef.getPartitionNames() != null) {
+                for (String partName : backupTableRef.getPartitionNames().getPartitionNames()) {
+                    Partition partition = olapTable.getPartition(partName);
+                    if (partition == null) {
+                        status = new Status(ErrCode.NOT_FOUND, "partition " + partName
+                                + " does not exist  in table" + tblName);
+                        return;
+                    }
+                }
+            }
+
+            // create snapshot tasks
+            List<Partition> partitions = Lists.newArrayList();
+            if (backupTableRef.getPartitionNames() == null) {
+                partitions.addAll(olapTable.getPartitions());
+            } else {
+                for (String partName : backupTableRef.getPartitionNames().getPartitionNames()) {
+                    Partition partition = olapTable.getPartition(partName);
+                    partitions.add(partition);
+                }
+            }
+
+            // snapshot partitions
+            for (Partition partition : partitions) {
+                long visibleVersion = partition.getVisibleVersion();
+                long visibleVersionHash = partition.getVisibleVersionHash();
+                List<MaterializedIndex> indexes = partition.getMaterializedIndices(IndexExtState.VISIBLE);
+                for (MaterializedIndex index : indexes) {
+                    int schemaHash = olapTable.getSchemaHashByIndexId(index.getId());
+                    List<Tablet> tablets = index.getTablets();
+                    for (Tablet tablet : tablets) {
+                        Replica replica = chooseReplica(tablet, visibleVersion, visibleVersionHash);
+                        if (replica == null) {
+                            status = new Status(ErrCode.COMMON_ERROR,
+                                    "failed to choose replica to make snapshot for tablet " + tablet.getId()
+                                            + ". visible version: " + visibleVersion
+                                            + ", visible version hash: " + visibleVersionHash);
+                            return;
+                        }
+                        SnapshotTask task = new SnapshotTask(null, replica.getBackendId(), tablet.getId(),
+                                jobId, dbId, olapTable.getId(), partition.getId(),
+                                index.getId(), tablet.getId(),
+                                visibleVersion, visibleVersionHash,
+                                schemaHash, timeoutMs, false /* not restore task */);
+                        batchTask.addTask(task);
+                        unfinishedTaskIds.put(tablet.getId(), replica.getBackendId());
+                    }
+                }
+
+                LOG.info("snapshot for partition {}, version: {}, version hash: {}",
+                        partition.getId(), visibleVersion, visibleVersionHash);
+            }
+        } finally {
+            olapTable.readUnlock();
+        }
+    }
+
+    private void prepareBackupMeta(Database db) {
+        // copy all related schema at this moment
+        List<Table> copiedTables = Lists.newArrayList();
+        List<Resource> copiedResources = Lists.newArrayList();
+        for (TableRef tableRef : tableRefs) {
+            String tblName = tableRef.getName().getTbl();
+            Table table = db.getTable(tblName);
+            table.readLock();
+            try {
+                if (table.getType() == TableType.OLAP) {
+                    OlapTable olapTable = (OlapTable) table;
+                    // only copy visible indexes
+                    List<String> reservedPartitions = tableRef.getPartitionNames() == null ? null
+                            : tableRef.getPartitionNames().getPartitionNames();
+                    OlapTable copiedTbl = olapTable.selectiveCopy(reservedPartitions, true, IndexExtState.VISIBLE);
+                    if (copiedTbl == null) {
+                        status = new Status(ErrCode.COMMON_ERROR, "failed to copy table: " + tblName);
+                        return;
+                    }
+                    copiedTables.add(copiedTbl);
+                } else if (table.getType() == TableType.VIEW) {
+                    View view = (View) table;
+                    View copiedView = view.clone();
+                    if (copiedView == null) {
+                        status = new Status(ErrCode.COMMON_ERROR, "failed to copy view: " + tblName);
+                        return;
+                    }
+                    copiedTables.add(copiedView);
+                } else if (table.getType() == TableType.ODBC) {
+                    OdbcTable odbcTable = (OdbcTable) table;
+                    OdbcTable copiedOdbcTable = odbcTable.clone();
+                    if (copiedOdbcTable == null) {
+                        status = new Status(ErrCode.COMMON_ERROR, "failed to copy odbc table: " + tblName);
+                        return;
+                    }
+                    copiedTables.add(copiedOdbcTable);
+                    if (copiedOdbcTable.getOdbcCatalogResourceName() != null) {
+                        Resource resource = Catalog.getCurrentCatalog().getResourceMgr()
+                                .getResource(copiedOdbcTable.getOdbcCatalogResourceName());
+                        Resource copiedResource = resource.clone();
+                        if (copiedResource == null) {
+                            status = new Status(ErrCode.COMMON_ERROR, "failed to copy odbc resource: "
+                                    + resource.getName());
+                            return;
+                        }
+                        copiedResources.add(copiedResource);
+                    }
+                }
+            }finally {
+                table.readUnlock();
+            }
+        }
+        backupMeta = new BackupMeta(copiedTables, copiedResources);
+    }
+
+
     private void waitingAllSnapshotsFinished() {
         if (unfinishedTaskIds.isEmpty()) {
             snapshotFinishedTime = System.currentTimeMillis();
@@ -568,7 +648,7 @@ public class BackupJob extends AbstractJob {
             localMetaInfoFilePath = metaInfoFile.getAbsolutePath();
 
             // 3. save job info file
-            jobInfo = BackupJobInfo.fromCatalog(createTime, label, dbName, dbId, backupMeta.getTables().values(),
+            jobInfo = BackupJobInfo.fromCatalog(createTime, label, dbName, dbId, backupMeta,
                                                 snapshotInfos);
             LOG.debug("job info: {}. {}", jobInfo, this);
             File jobInfoFile = new File(jobDir, Repository.PREFIX_JOB_INFO + createTimeStr);
@@ -800,6 +880,13 @@ public class BackupJob extends AbstractJob {
             out.writeBoolean(true);
             Text.writeString(out, localJobInfoFilePath);
         }
+
+        // write properties
+        out.writeInt(properties.size());
+        for (Map.Entry<String, String> entry : properties.entrySet()) {
+            Text.writeString(out, entry.getKey());
+            Text.writeString(out, entry.getValue());
+        }
     }
 
     public void readFields(DataInput in) throws IOException {
@@ -843,6 +930,17 @@ public class BackupJob extends AbstractJob {
         if (in.readBoolean()) {
             localJobInfoFilePath = Text.readString(in);
         }
+
+        if (Catalog.getCurrentCatalogJournalVersion() < FeMetaVersion.VERSION_95) {
+            return;
+        }
+        // read properties
+        size = in.readInt();
+        for (int i = 0; i < size; i++) {
+            String key = Text.readString(in);
+            String value = Text.readString(in);
+            properties.put(key, value);
+        }
     }
 
     @Override
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJobInfo.java b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJobInfo.java
index 10acdd0..b2dff76 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJobInfo.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupJobInfo.java
@@ -20,24 +20,24 @@ package org.apache.doris.backup;
 import org.apache.doris.backup.RestoreFileMapping.IdChain;
 import org.apache.doris.catalog.MaterializedIndex;
 import org.apache.doris.catalog.MaterializedIndex.IndexExtState;
+import org.apache.doris.catalog.OdbcCatalogResource;
+import org.apache.doris.catalog.OdbcTable;
 import org.apache.doris.catalog.OlapTable;
 import org.apache.doris.catalog.Partition;
+import org.apache.doris.catalog.Resource;
 import org.apache.doris.catalog.Table;
 import org.apache.doris.catalog.Tablet;
+import org.apache.doris.catalog.View;
 import org.apache.doris.common.FeConstants;
 import org.apache.doris.common.io.Text;
 import org.apache.doris.common.io.Writable;
 
 import com.google.common.base.Joiner;
-import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
-import org.json.JSONArray;
-import org.json.JSONException;
-import org.json.JSONObject;
 
 import java.io.DataInput;
 import java.io.DataOutput;
@@ -53,7 +53,12 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.stream.Collectors;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.annotations.Expose;
+import com.google.gson.annotations.SerializedName;
+import org.glassfish.jersey.internal.guava.Sets;
 
 /*
  * This is a memory structure mapping the job info file in repository.
@@ -64,28 +69,85 @@ public class BackupJobInfo implements Writable {
     private static final Logger LOG = LogManager.getLogger(BackupJobInfo.class);
 
     public String name;
+    @SerializedName("database")
     public String dbName;
+    @SerializedName("id")
     public long dbId;
+    @SerializedName("backup_time")
     public long backupTime;
-    public Map<String, BackupTableInfo> tables = Maps.newHashMap();
-    public boolean success;
-
+    // only include olap table
+    @SerializedName("backup_objects")
+    public Map<String, BackupOlapTableInfo> backupOlapTableObjects = Maps.newHashMap();
+    // include other objects: view, external table
+    @SerializedName("new_backup_objects")
+    public NewBackupObjects newBackupObjects = new NewBackupObjects();
+    public boolean success = true;
+    @SerializedName("backup_result")
+    public String successJson = "succeed";
+
+    @SerializedName("meta_version")
     public int metaVersion;
 
     // This map is used to save the table alias mapping info when processing a restore job.
     // origin -> alias
+    @Expose(serialize = false, deserialize = false)
     public Map<String, String> tblAlias = Maps.newHashMap();
 
-    public boolean containsTbl(String tblName) {
-        return tables.containsKey(tblName);
+    // Recompute the fields that are derived from the deserialized json:
+    // the success flag, the meta version and the ordered tablet list of each index.
+    public void initBackupJobInfoAfterDeserialize() {
+        // transform success, constant-first equals also tolerates a null "backup_result"
+        success = "succeed".equals(successJson);
+
+        // init meta version
+        if (metaVersion == 0) {
+            // meta_version does not exist
+            metaVersion = FeConstants.meta_version;
+        }
+
+        // init olap table info
+        for (BackupOlapTableInfo backupOlapTableInfo : backupOlapTableObjects.values()) {
+            for (BackupPartitionInfo backupPartitionInfo : backupOlapTableInfo.partitions.values()) {
+                for (BackupIndexInfo backupIndexInfo : backupPartitionInfo.indexes.values()) {
+                    // @Expose is ignored by a plain Gson, so the list may already have been
+                    // filled from the json: rebuild it instead of appending duplicates
+                    backupIndexInfo.sortedTabletInfoList = Lists.newArrayList();
+                    List<Long> sortedTabletIds = backupIndexInfo.getSortedTabletIds();
+                    for (Long tabletId : sortedTabletIds) {
+                        List<String> files = backupIndexInfo.getTabletFiles(tabletId);
+                        if (files == null) {
+                            continue;
+                        }
+                        backupIndexInfo.sortedTabletInfoList.add(new BackupTabletInfo(tabletId, files));
+                    }
+                }
+            }
+        }
+    }
+
+    // Type of the backup object named tblName (olap table, then view, then odbc table), or null if none.
+    public Table.TableType getTypeByTblName(String tblName) {
+        if (backupOlapTableObjects.containsKey(tblName)) {
+            return Table.TableType.OLAP;
+        }
+        boolean isView = newBackupObjects.views.stream().anyMatch(view -> view.name.equals(tblName));
+        if (isView) {
+            return Table.TableType.VIEW;
+        }
+        boolean isOdbcTable = newBackupObjects.odbcTables.stream()
+                .anyMatch(odbcTable -> odbcTable.dorisTableName.equals(tblName));
+        if (isOdbcTable) {
+            return Table.TableType.ODBC;
+        }
+        return null;
     }
 
-    public BackupTableInfo getTableInfo(String tblName) {
-        return tables.get(tblName);
+    public BackupOlapTableInfo getOlapTableInfo(String tblName) {
+        return backupOlapTableObjects.get(tblName);
     }
 
-    public void retainTables(Set<String> tblNames) {
-        Iterator<Map.Entry<String, BackupTableInfo>> iter = tables.entrySet().iterator();
+    public void retainOlapTables(Set<String> tblNames) {
+        Iterator<Map.Entry<String, BackupOlapTableInfo>> iter = backupOlapTableObjects.entrySet().iterator();
         while (iter.hasNext()) {
             if (!tblNames.contains(iter.next().getKey())) {
                 iter.remove();
@@ -93,6 +155,33 @@ public class BackupJobInfo implements Writable {
         }
     }
 
+    // Keep only the views whose names are listed in viewNames,
+    // every other view is dropped from this job info.
+    // The views list is modified in place.
+    public void retainView(Set<String> viewNames) {
+        List<BackupViewInfo> views = newBackupObjects.views;
+        // equivalent to walking the list with an iterator and removing the unlisted entries
+        views.removeIf(backupViewInfo -> !viewNames.contains(backupViewInfo.name));
+    }
+
+    // Keep only the listed odbc tables, and drop a resource only when no retained table still uses it.
+    public void retainOdbcTables(Set<String> odbcTableNames) {
+        Set<String> removedOdbcResourceNames = Sets.newHashSet();
+        Iterator<BackupOdbcTableInfo> odbcIter = newBackupObjects.odbcTables.listIterator();
+        while (odbcIter.hasNext()) {
+            BackupOdbcTableInfo backupOdbcTableInfo = odbcIter.next();
+            if (!odbcTableNames.contains(backupOdbcTableInfo.dorisTableName)) {
+                removedOdbcResourceNames.add(backupOdbcTableInfo.resourceName);
+                odbcIter.remove();
+            }
+        }
+        // a resource shared with a retained table must survive, or that table could not be restored
+        for (BackupOdbcTableInfo retainedOdbcTable : newBackupObjects.odbcTables) {
+            removedOdbcResourceNames.remove(retainedOdbcTable.resourceName);
+        }
+        newBackupObjects.odbcResources.removeIf(r -> removedOdbcResourceNames.contains(r.name));
+    }
+
     public void setAlias(String orig, String alias) {
         tblAlias.put(orig, alias);
     }
@@ -110,8 +199,54 @@ public class BackupJobInfo implements Writable {
         return alias;
     }
 
-    public static class BackupTableInfo {
+    // Brief summary of a BackupJobInfo, this is the structure serialized by getBrief().
+    public static class BriefBackupJobInfo {
+        public String name;
+        public String database;
+        @SerializedName("backup_time")
+        public long backupTime;
+        @SerializedName("olap_table_list")
+        public List<BriefBackupOlapTable> olapTableList = Lists.newArrayList();
+        @SerializedName("view_list")
+        public List<BackupViewInfo> viewList = Lists.newArrayList();
+        @SerializedName("odbc_table_list")
+        public List<BackupOdbcTableInfo> odbcTableList = Lists.newArrayList();
+        @SerializedName("odbc_resource_list")
+        public List<BackupOdbcResourceInfo> odbcResourceList = Lists.newArrayList();
+
+        public static BriefBackupJobInfo fromBackupJobInfo(BackupJobInfo backupJobInfo) {
+            BriefBackupJobInfo brief = new BriefBackupJobInfo();
+            brief.name = backupJobInfo.name;
+            brief.database = backupJobInfo.dbName;
+            brief.backupTime = backupJobInfo.backupTime;
+            brief.viewList = backupJobInfo.newBackupObjects.views; // these three lists are shared, not copied
+            brief.odbcTableList = backupJobInfo.newBackupObjects.odbcTables;
+            brief.odbcResourceList = backupJobInfo.newBackupObjects.odbcResources;
+            backupJobInfo.backupOlapTableObjects.forEach((tblName, tblInfo) -> {
+                BriefBackupOlapTable olapTable = new BriefBackupOlapTable();
+                olapTable.name = tblName;
+                olapTable.partitionNames = Lists.newArrayList(tblInfo.partitions.keySet());
+                brief.olapTableList.add(olapTable);
+            });
+            return brief;
+        }
+    }
+
+    public static class BriefBackupOlapTable {
         public String name;
+        @SerializedName("partition_names")
+        public List<String> partitionNames;
+    }
+
+    public static class NewBackupObjects {
+        public List<BackupViewInfo> views = Lists.newArrayList();
+        @SerializedName("odbc_tables")
+        public List<BackupOdbcTableInfo> odbcTables = Lists.newArrayList();
+        @SerializedName("odbc_resources")
+        public List<BackupOdbcResourceInfo> odbcResources = Lists.newArrayList();
+    }
+
+    public static class BackupOlapTableInfo {
         public long id;
         public Map<String, BackupPartitionInfo> partitions = Maps.newHashMap();
 
@@ -135,20 +270,12 @@ public class BackupJobInfo implements Writable {
                 }
             }
         }
-
-        @Override
-        public String toString() {
-            StringBuilder sb = new StringBuilder();
-            sb.append("name: ").append(name).append(", id: ").append(id);
-            sb.append(", partitions: [").append(Joiner.on(", ").join(partitions.keySet())).append("]");
-            return sb.toString();
-        }
     }
 
     public static class BackupPartitionInfo {
-        public String name;
         public long id;
         public long version;
+        @SerializedName("version_hash")
         public long versionHash;
         public Map<String, BackupIndexInfo> indexes = Maps.newHashMap();
 
@@ -158,24 +285,61 @@ public class BackupJobInfo implements Writable {
     }
 
     public static class BackupIndexInfo {
-        public String name;
         public long id;
+        @SerializedName("schema_hash")
         public int schemaHash;
-        public List<BackupTabletInfo> tablets = Lists.newArrayList();
+        public Map<Long, List<String>> tablets = Maps.newHashMap();
+        @SerializedName("tablets_order")
+        public List<Long> tabletsOrder = Lists.newArrayList();
+        @Expose(serialize = false, deserialize = false)
+        public List<BackupTabletInfo> sortedTabletInfoList = Lists.newArrayList();
+
+        public List<String> getTabletFiles(long tabletId) {
+            return tablets.get(tabletId);
+        }
 
-        public BackupTabletInfo getTablet(long tabletId) {
-            for (BackupTabletInfo backupTabletInfo : tablets) {
-                if (backupTabletInfo.id == tabletId) {
-                    return backupTabletInfo;
-                }
+        private List<Long> getSortedTabletIds() {
+            if (tabletsOrder == null || tabletsOrder.isEmpty()) {
+                // in previous versions the tablets order was not saved (which was a BUG),
+                // so we have to sort the tabletIds to restore the original order of tablets.
+                List<Long> tmpList = Lists.newArrayList(tablets.keySet());
+                tmpList.sort((o1, o2) -> Long.valueOf(o1).compareTo(Long.valueOf(o2)));
+                return tmpList;
+            } else {
+                return tabletsOrder;
             }
-            return null;
         }
     }
 
     public static class BackupTabletInfo {
         public long id;
-        public List<String> files = Lists.newArrayList();
+        public List<String> files;
+
+        public BackupTabletInfo(long id, List<String> files) {
+            this.id = id;
+            this.files = files;
+        }
+    }
+
+    public static class BackupViewInfo {
+        public long id;
+        public String name;
+    }
+
+    public static class BackupOdbcTableInfo {
+        public long id;
+        @SerializedName("doris_table_name")
+        public String dorisTableName;
+        @SerializedName("linked_odbc_database_name")
+        public String linkedOdbcDatabaseName;
+        @SerializedName("linked_odbc_table_name")
+        public String linkedOdbcTableName;
+        @SerializedName("resource_name")
+        public String resourceName;
+    }
+
+    public static class BackupOdbcResourceInfo {
+        public String name;
     }
 
     // eg: __db_10001/__tbl_10002/__part_10003/__idx_10002/__10004
@@ -185,7 +349,7 @@ public class BackupJobInfo implements Writable {
             return null;
         }
 
-        BackupTableInfo tblInfo = tables.get(tbl);
+        BackupOlapTableInfo tblInfo = backupOlapTableObjects.get(tbl);
         if (tblInfo == null) {
             LOG.debug("tbl {} does not exist", tbl);
             return null;
@@ -226,46 +390,69 @@ public class BackupJobInfo implements Writable {
     }
 
     public static BackupJobInfo fromCatalog(long backupTime, String label, String dbName, long dbId,
-            Collection<Table> tbls, Map<Long, SnapshotInfo> snapshotInfos) {
+                                            BackupMeta backupMeta, Map<Long, SnapshotInfo> snapshotInfos) {
 
         BackupJobInfo jobInfo = new BackupJobInfo();
         jobInfo.backupTime = backupTime;
         jobInfo.name = label;
         jobInfo.dbName = dbName;
         jobInfo.dbId = dbId;
-        jobInfo.success = true;
         jobInfo.metaVersion = FeConstants.meta_version;
 
+        Collection<Table> tbls = backupMeta.getTables().values();
         // tbls
         for (Table tbl : tbls) {
-            OlapTable olapTbl = (OlapTable) tbl;
-            BackupTableInfo tableInfo = new BackupTableInfo();
-            tableInfo.id = tbl.getId();
-            tableInfo.name = tbl.getName();
-            jobInfo.tables.put(tableInfo.name, tableInfo);
-            // partitions
-            for (Partition partition : olapTbl.getPartitions()) {
-                BackupPartitionInfo partitionInfo = new BackupPartitionInfo();
-                partitionInfo.id = partition.getId();
-                partitionInfo.name = partition.getName();
-                partitionInfo.version = partition.getVisibleVersion();
-                partitionInfo.versionHash = partition.getVisibleVersionHash();
-                tableInfo.partitions.put(partitionInfo.name, partitionInfo);
-                // indexes
-                for (MaterializedIndex index : partition.getMaterializedIndices(IndexExtState.VISIBLE)) {
-                    BackupIndexInfo idxInfo = new BackupIndexInfo();
-                    idxInfo.id = index.getId();
-                    idxInfo.name = olapTbl.getIndexNameById(index.getId());
-                    idxInfo.schemaHash = olapTbl.getSchemaHashByIndexId(index.getId());
-                    partitionInfo.indexes.put(idxInfo.name, idxInfo);
-                    // tablets
-                    for (Tablet tablet : index.getTablets()) {
-                        BackupTabletInfo tabletInfo = new BackupTabletInfo();
-                        tabletInfo.id = tablet.getId();
-                        tabletInfo.files.addAll(snapshotInfos.get(tablet.getId()).getFiles());
-                        idxInfo.tablets.add(tabletInfo);
+            if (tbl instanceof OlapTable) {
+                OlapTable olapTbl = (OlapTable) tbl;
+                BackupOlapTableInfo tableInfo = new BackupOlapTableInfo();
+                tableInfo.id = tbl.getId();
+                jobInfo.backupOlapTableObjects.put(tbl.getName(), tableInfo);
+                // partitions
+                for (Partition partition : olapTbl.getPartitions()) {
+                    BackupPartitionInfo partitionInfo = new BackupPartitionInfo();
+                    partitionInfo.id = partition.getId();
+                    partitionInfo.version = partition.getVisibleVersion();
+                    partitionInfo.versionHash = partition.getVisibleVersionHash();
+                    tableInfo.partitions.put(partition.getName(), partitionInfo);
+                    // indexes
+                    for (MaterializedIndex index : partition.getMaterializedIndices(IndexExtState.VISIBLE)) {
+                        BackupIndexInfo idxInfo = new BackupIndexInfo();
+                        idxInfo.id = index.getId();
+                        idxInfo.schemaHash = olapTbl.getSchemaHashByIndexId(index.getId());
+                        partitionInfo.indexes.put(olapTbl.getIndexNameById(index.getId()), idxInfo);
+                        // tablets
+                        for (Tablet tablet : index.getTablets()) {
+                            idxInfo.tablets.put(tablet.getId(),
+                                    Lists.newArrayList(snapshotInfos.get(tablet.getId()).getFiles()));
+                        }
+                        idxInfo.tabletsOrder.addAll(index.getTabletIdsInOrder());
                     }
                 }
+            } else if (tbl instanceof View) {
+                View view = (View) tbl;
+                BackupViewInfo backupViewInfo = new BackupViewInfo();
+                backupViewInfo.id = view.getId();
+                backupViewInfo.name = view.getName();
+                jobInfo.newBackupObjects.views.add(backupViewInfo);
+            } else if (tbl instanceof OdbcTable) {
+                OdbcTable odbcTable = (OdbcTable) tbl;
+                BackupOdbcTableInfo backupOdbcTableInfo = new BackupOdbcTableInfo();
+                backupOdbcTableInfo.id = odbcTable.getId();
+                backupOdbcTableInfo.dorisTableName = odbcTable.getName();
+                backupOdbcTableInfo.linkedOdbcDatabaseName = odbcTable.getOdbcDatabaseName();
+                backupOdbcTableInfo.linkedOdbcTableName = odbcTable.getOdbcTableName();
+                backupOdbcTableInfo.resourceName = odbcTable.getOdbcCatalogResourceName();
+                jobInfo.newBackupObjects.odbcTables.add(backupOdbcTableInfo);
+            }
+        }
+        // resources
+        Collection<Resource> resources = backupMeta.getResourceNameMap().values();
+        for (Resource resource : resources) {
+            if (resource instanceof OdbcCatalogResource) {
+                OdbcCatalogResource odbcCatalogResource = (OdbcCatalogResource) resource;
+                BackupOdbcResourceInfo backupOdbcResourceInfo = new BackupOdbcResourceInfo();
+                backupOdbcResourceInfo.name = odbcCatalogResource.getName();
+                jobInfo.newBackupObjects.odbcResources.add(backupOdbcResourceInfo);
             }
         }
 
@@ -275,12 +462,10 @@ public class BackupJobInfo implements Writable {
     public static BackupJobInfo fromFile(String path) throws IOException {
         byte[] bytes = Files.readAllBytes(Paths.get(path));
         String json = new String(bytes, StandardCharsets.UTF_8);
-        BackupJobInfo jobInfo = new BackupJobInfo();
-        genFromJson(json, jobInfo);
-        return jobInfo;
+        return genFromJson(json);
     }
 
-    private static void genFromJson(String json, BackupJobInfo jobInfo) {
+    private static BackupJobInfo genFromJson(String json) {
         /* parse the json string: 
          * {
          *   "backup_time": 1522231864000,
@@ -299,9 +484,9 @@ public class BackupJobInfo implements Writable {
          *                           "schema_hash": 3473401,
          *                           "tablets": {
          *                               "10008": ["__10029_seg1.dat", "__10029_seg2.dat"],
-         *                               "10007": ["__10029_seg1.dat", "__10029_seg2.dat"]
+         *                               "10007": ["__10030_seg1.dat", "__10030_seg2.dat"]
          *                           },
-         *                           "tablets_order": ["10029", "10030"]
+         *                           "tablets_order": ["10007", "10008"]
          *                       },
          *                       "table1": {
          *                           "id": 10008,
@@ -310,7 +495,7 @@ public class BackupJobInfo implements Writable {
          *                               "10004": ["__10027_seg1.dat", "__10027_seg2.dat"],
          *                               "10005": ["__10028_seg1.dat", "__10028_seg2.dat"]
          *                           },
-         *                           "tablets_order": ["10027, "10028"]
+         *                           "tablets_order": ["10004", "10005"]
          *                       }
          *                   },
          *                   "id": 10007
@@ -320,96 +505,38 @@ public class BackupJobInfo implements Writable {
          *           },
          *           "id": 10001
          *       }
+         *   },
+         *   "new_backup_objects": {
+         *       "views": [
+         *           {"id": 1,
+         *            "name": "view1"
+         *           }
+         *       ],
+         *       "odbc_tables": [
+         *           {"id": 2,
+         *            "doris_table_name": "oracle1",
+         *            "linked_odbc_database_name": "external_db1",
+         *            "linked_odbc_table_name": "external_table1",
+         *            "resource_name": "bj_oracle"
+         *           }
+         *       ],
+         *       "odbc_resources": [
+         *           {"name": "bj_oracle"}
+         *       ]
          *   }
          * }
          */
-        JSONObject root = new JSONObject(json);
-        jobInfo.name = (String) root.get("name");
-        jobInfo.dbName = (String) root.get("database");
-        jobInfo.dbId = root.getLong("id");
-        jobInfo.backupTime = root.getLong("backup_time");
-        
-        try {
-            jobInfo.metaVersion = root.getInt("meta_version");
-        } catch (JSONException e) {
-            // meta_version does not exist
-            jobInfo.metaVersion = FeConstants.meta_version;
-        }
-        
-        JSONObject backupObjs = root.getJSONObject("backup_objects");
-        String[] tblNames = JSONObject.getNames(backupObjs);
-        for (String tblName : tblNames) {
-            BackupTableInfo tblInfo = new BackupTableInfo();
-            tblInfo.name = tblName;
-            JSONObject tbl = backupObjs.getJSONObject(tblName);
-            tblInfo.id = tbl.getLong("id");
-            JSONObject parts = tbl.getJSONObject("partitions");
-            String[] partsNames = JSONObject.getNames(parts);
-            for (String partName : partsNames) {
-                BackupPartitionInfo partInfo = new BackupPartitionInfo();
-                partInfo.name = partName;
-                JSONObject part = parts.getJSONObject(partName);
-                partInfo.id = part.getLong("id");
-                partInfo.version = part.getLong("version");
-                partInfo.versionHash = part.getLong("version_hash");
-                JSONObject indexes = part.getJSONObject("indexes");
-                String[] indexNames = JSONObject.getNames(indexes);
-                for (String idxName : indexNames) {
-                    BackupIndexInfo indexInfo = new BackupIndexInfo();
-                    indexInfo.name = idxName;
-                    JSONObject idx = indexes.getJSONObject(idxName);
-                    indexInfo.id = idx.getLong("id");
-                    indexInfo.schemaHash = idx.getInt("schema_hash");
-                    JSONObject tablets = idx.getJSONObject("tablets");
-                    String[] tabletIds = JSONObject.getNames(tablets);
-
-                    JSONArray tabletsOrder = null;
-                    if (idx.has("tablets_order")) {
-                        tabletsOrder = idx.getJSONArray("tablets_order");
-                    }
-                    String[] orderedTabletIds = sortTabletIds(tabletIds, tabletsOrder);
-                    Preconditions.checkState(tabletIds.length == orderedTabletIds.length);
-
-                    for (String tabletId : orderedTabletIds) {
-                        BackupTabletInfo tabletInfo = new BackupTabletInfo();
-                        tabletInfo.id = Long.valueOf(tabletId);
-                        JSONArray files = tablets.getJSONArray(tabletId);
-                        for (Object object : files) {
-                            tabletInfo.files.add((String) object);
-                        }
-                        indexInfo.tablets.add(tabletInfo);
-                    }
-                    partInfo.indexes.put(indexInfo.name, indexInfo);
-                }
-                tblInfo.partitions.put(partName, partInfo);
-            }
-            jobInfo.tables.put(tblName, tblInfo);
-        }
-        
-        String result = root.getString("backup_result");
-        if (result.equals("succeed")) {
-            jobInfo.success = true;
-        } else {
-            jobInfo.success = false;
-        }
+        Gson gson = new Gson();
+        BackupJobInfo jobInfo = gson.fromJson(json, BackupJobInfo.class);
+        jobInfo.initBackupJobInfoAfterDeserialize();
+        return jobInfo;
     }
 
-    private static String[] sortTabletIds(String[] tabletIds, JSONArray tabletsOrder) {
-        if (tabletsOrder == null || tabletsOrder.toList().isEmpty()) {
-            // in previous version, we are not saving tablets order(which was a BUG),
-            // so we have to sort the tabletIds to restore the original order of tablets.
-            List<String> tmpList = Lists.newArrayList(tabletIds);
-            tmpList.sort((o1, o2) -> Long.valueOf(o1).compareTo(Long.valueOf(o2)));
-            return tmpList.toArray(new String[0]);
-        } else {
-            return (String[]) tabletsOrder.toList().toArray(new String[0]);
-        }
-    }
 
     public void writeToFile(File jobInfoFile) throws FileNotFoundException {
         PrintWriter printWriter = new PrintWriter(jobInfoFile);
         try {
-            printWriter.print(toJson(true).toString());
+            printWriter.print(toJson(false));
             printWriter.flush();
         } finally {
             printWriter.close();
@@ -418,92 +545,32 @@ public class BackupJobInfo implements Writable {
 
     // Only return basic info, table and partitions
     public String getBrief() {
-        return toJson(false).toString(1);
+        BriefBackupJobInfo briefBackupJobInfo = BriefBackupJobInfo.fromBackupJobInfo(this);
+        Gson gson = new GsonBuilder().setPrettyPrinting().create();
+        return gson.toJson(briefBackupJobInfo);
     }
 
-    public JSONObject toJson(boolean verbose) {
-        JSONObject root = new JSONObject();
-        root.put("name", name);
-        root.put("database", dbName);
-        if (verbose) {
-            root.put("id", dbId);
-        }
-        root.put("backup_time", backupTime);
-        JSONObject backupObj = new JSONObject();
-        root.put("backup_objects", backupObj);
-        root.put("meta_version", FeConstants.meta_version);
-        
-        for (BackupTableInfo tblInfo : tables.values()) {
-            JSONObject tbl = new JSONObject();
-            if (verbose) {
-                tbl.put("id", tblInfo.id);
-            }
-            JSONObject parts = new JSONObject();
-            tbl.put("partitions", parts);
-            for (BackupPartitionInfo partInfo : tblInfo.partitions.values()) {
-                JSONObject part = new JSONObject();
-                if (verbose) {
-                    part.put("id", partInfo.id);
-                    part.put("version", partInfo.version);
-                    part.put("version_hash", partInfo.versionHash);
-                    JSONObject indexes = new JSONObject();
-                    part.put("indexes", indexes);
-                    for (BackupIndexInfo idxInfo : partInfo.indexes.values()) {
-                        JSONObject idx = new JSONObject();
-                        idx.put("id", idxInfo.id);
-                        idx.put("schema_hash", idxInfo.schemaHash);
-                        JSONObject tablets = new JSONObject();
-                        JSONArray tabletsOrder = new JSONArray();
-                        idx.put("tablets", tablets);
-                        for (BackupTabletInfo tabletInfo : idxInfo.tablets) {
-                            JSONArray files = new JSONArray();
-                            tablets.put(String.valueOf(tabletInfo.id), files);
-                            for (String fileName : tabletInfo.files) {
-                                files.put(fileName);
-                            }
-                            // to save the order of tablets
-                            tabletsOrder.put(String.valueOf(tabletInfo.id));
-                        }
-                        indexes.put(idxInfo.name, idx);
-                    }
-                }
-                parts.put(partInfo.name, part);
-            }
-            backupObj.put(tblInfo.name, tbl);
+    public String toJson(boolean prettyPrinting) {
+        Gson gson;
+        if (prettyPrinting) {
+            gson = new GsonBuilder().setPrettyPrinting().create();
+        } else {
+            gson = new Gson();
         }
-
-        root.put("backup_result", "succeed");
-        return root;
-    }
-
-    public String toString(int indentFactor) {
-        return toJson(true).toString(indentFactor);
+        return gson.toJson(this);
     }
 
     public String getInfo() {
-        List<String> objs = Lists.newArrayList();
-        for (BackupTableInfo tblInfo : tables.values()) {
-            StringBuilder sb = new StringBuilder();
-            sb.append(tblInfo.name);
-            List<String> partNames = tblInfo.partitions.values().stream()
-                    .filter(n -> !n.name.equals(tblInfo.name)).map(n -> n.name).collect(Collectors.toList());
-            if (!partNames.isEmpty()) {
-                sb.append(" PARTITIONS [").append(Joiner.on(", ").join(partNames)).append("]");
-            }
-            objs.add(sb.toString());
-        }
-        return Joiner.on(", ").join(objs);
+        return getBrief();
     }
 
     public static BackupJobInfo read(DataInput in) throws IOException {
-        BackupJobInfo jobInfo = new BackupJobInfo();
-        jobInfo.readFields(in);
-        return jobInfo;
+        return BackupJobInfo.readFields(in);
     }
 
     @Override
     public void write(DataOutput out) throws IOException {
-        Text.writeString(out, toJson(true).toString());
+        Text.writeString(out, toJson(false));
         out.writeInt(tblAlias.size());
         for (Map.Entry<String, String> entry : tblAlias.entrySet()) {
             Text.writeString(out, entry.getKey());
@@ -511,20 +578,20 @@ public class BackupJobInfo implements Writable {
         }
     }
 
-    public void readFields(DataInput in) throws IOException {
+    public static BackupJobInfo readFields(DataInput in) throws IOException {
         String json = Text.readString(in);
-        genFromJson(json, this);
+        BackupJobInfo backupJobInfo = genFromJson(json);
         int size = in.readInt();
         for (int i = 0; i < size; i++) {
             String tbl = Text.readString(in);
             String alias = Text.readString(in);
-            tblAlias.put(tbl, alias);
+            backupJobInfo.tblAlias.put(tbl, alias);
         }
+        return backupJobInfo;
     }
 
-    @Override
     public String toString() {
-        return toJson(true).toString();
+        return toJson(true);
     }
 }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupMeta.java b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupMeta.java
index 081fc00..4e4beb6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/BackupMeta.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/BackupMeta.java
@@ -17,15 +17,15 @@
 
 package org.apache.doris.backup;
 
+import org.apache.doris.catalog.Catalog;
+import org.apache.doris.catalog.Resource;
 import org.apache.doris.catalog.Table;
+import org.apache.doris.common.FeMetaVersion;
 import org.apache.doris.common.io.Writable;
 import org.apache.doris.meta.MetaContext;
 
 import com.google.common.collect.Maps;
 
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
 import java.io.DataInput;
 import java.io.DataInputStream;
 import java.io.DataOutput;
@@ -38,32 +38,43 @@ import java.util.List;
 import java.util.Map;
 
 public class BackupMeta implements Writable {
-    private static final Logger LOG = LogManager.getLogger(BackupMeta.class);
 
     // tbl name -> tbl
     private Map<String, Table> tblNameMap = Maps.newHashMap();
     // tbl id -> tbl
     private Map<Long, Table> tblIdMap = Maps.newHashMap();
+    // resource name -> resource
+    private Map<String, Resource> resourceNameMap = Maps.newHashMap();
 
     private BackupMeta() {
-
     }
 
-    public BackupMeta(List<Table> tables) {
+    public BackupMeta(List<Table> tables, List<Resource> resources) {
         for (Table table : tables) {
             tblNameMap.put(table.getName(), table);
             tblIdMap.put(table.getId(), table);
         }
+        for (Resource resource : resources) {
+            resourceNameMap.put(resource.getName(), resource);
+        }
     }
 
     public Map<String, Table> getTables() {
         return tblNameMap;
     }
 
+    public Map<String, Resource> getResourceNameMap() {
+        return resourceNameMap;
+    }
+
     public Table getTable(String tblName) {
         return tblNameMap.get(tblName);
     }
 
+    public Resource getResource(String resourceName) {
+        return resourceNameMap.get(resourceName);
+    }
+
     public Table getTable(Long tblId) {
         return tblIdMap.get(tblId);
     }
@@ -108,6 +119,10 @@ public class BackupMeta implements Writable {
         for (Table table : tblNameMap.values()) {
             table.write(out);
         }
+        out.writeInt(resourceNameMap.size());
+        for (Resource resource : resourceNameMap.values()) {
+            resource.write(out);
+        }
     }
 
     public void readFields(DataInput in) throws IOException {
@@ -117,5 +132,13 @@ public class BackupMeta implements Writable {
             tblNameMap.put(tbl.getName(), tbl);
             tblIdMap.put(tbl.getId(), tbl);
         }
+        if (Catalog.getCurrentCatalogJournalVersion() < FeMetaVersion.VERSION_95) {
+            return;
+        }
+        size = in.readInt();
+        for (int i = 0; i < size; i++) {
+            Resource resource = Resource.read(in);
+            resourceNameMap.put(resource.getName(), resource);
+        }
     }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
index ae2559d..4f3d1c8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/RestoreJob.java
@@ -18,8 +18,8 @@
 package org.apache.doris.backup;
 
 import org.apache.doris.backup.BackupJobInfo.BackupIndexInfo;
+import org.apache.doris.backup.BackupJobInfo.BackupOlapTableInfo;
 import org.apache.doris.backup.BackupJobInfo.BackupPartitionInfo;
-import org.apache.doris.backup.BackupJobInfo.BackupTableInfo;
 import org.apache.doris.backup.BackupJobInfo.BackupTabletInfo;
 import org.apache.doris.backup.RestoreFileMapping.IdChain;
 import org.apache.doris.backup.Status.ErrCode;
@@ -30,6 +30,8 @@ import org.apache.doris.catalog.FsBroker;
 import org.apache.doris.catalog.MaterializedIndex;
 import org.apache.doris.catalog.MaterializedIndex.IndexExtState;
 import org.apache.doris.catalog.MaterializedIndexMeta;
+import org.apache.doris.catalog.OdbcCatalogResource;
+import org.apache.doris.catalog.OdbcTable;
 import org.apache.doris.catalog.OlapTable;
 import org.apache.doris.catalog.OlapTable.OlapTableState;
 import org.apache.doris.catalog.Partition;
@@ -39,11 +41,16 @@ import org.apache.doris.catalog.PartitionType;
 import org.apache.doris.catalog.RangePartitionInfo;
 import org.apache.doris.catalog.Replica;
 import org.apache.doris.catalog.Replica.ReplicaState;
+import org.apache.doris.catalog.Resource;
+import org.apache.doris.catalog.ResourceMgr;
 import org.apache.doris.catalog.Table;
 import org.apache.doris.catalog.Table.TableType;
 import org.apache.doris.catalog.Tablet;
 import org.apache.doris.catalog.TabletMeta;
+import org.apache.doris.catalog.View;
 import org.apache.doris.common.Config;
+import org.apache.doris.common.DdlException;
+import org.apache.doris.common.FeMetaVersion;
 import org.apache.doris.common.MarkedCountDownLatch;
 import org.apache.doris.common.Pair;
 import org.apache.doris.common.io.Text;
@@ -124,7 +131,8 @@ public class RestoreJob extends AbstractJob {
     // this 2 members is to save all newly restored objs
     // tbl name -> part
     private List<Pair<String, Partition>> restoredPartitions = Lists.newArrayList();
-    private List<OlapTable> restoredTbls = Lists.newArrayList();
+    private List<Table> restoredTbls = Lists.newArrayList();
+    private List<Resource> restoredResources = Lists.newArrayList();
 
     // save all restored partitions' version info which are already exist in catalog
     // table id -> partition id -> (version, version hash)
@@ -142,6 +150,8 @@ public class RestoreJob extends AbstractJob {
     // NOTICE: because we do not persist it, this info may be lost if Frontend restart,
     // and if you don't want to losing it, backup your data again by using latest Doris version.
     private int metaVersion = -1;
+    // restore properties
+    private Map<String, String> properties = Maps.newHashMap();
 
     public RestoreJob() {
         super(JobType.RESTORE);
@@ -382,14 +392,17 @@ public class RestoreJob extends AbstractJob {
 
     /**
      * Restore rules as follow:
+     * OlapTable
      * A. Table already exist
      *      A1. Partition already exist, generate file mapping
      *      A2. Partition does not exist, add restored partition to the table.
      *          Reset all index/tablet/replica id, and create replica on BE outside the db lock.
      * B. Table does not exist
-     *      B1. Add table to the db, reset all table/index/tablet/replica id, 
+     *      B1. Add table to the db, reset all table/index/tablet/replica id,
      *          and create replica on BE outside the db lock.
-     *          
+     * View
+     *      A. View already exists: accepted only if its signature equals the backed-up view's.
+     *      B. View does not exist: reset its id and add the view to the db.
      * All newly created table/partition/index/tablet/replica should be saved for rolling back.
      * 
      * Step:
@@ -418,17 +431,17 @@ public class RestoreJob extends AbstractJob {
 
         // Set all restored tbls' state to RESTORE
         // Table's origin state must be NORMAL and does not have unfinished load job.
-        for (BackupTableInfo tblInfo : jobInfo.tables.values()) {
-            Table tbl = db.getTable(jobInfo.getAliasByOriginNameIfSet(tblInfo.name));
+        for (String tableName : jobInfo.backupOlapTableObjects.keySet()) {
+            Table tbl = db.getTable(jobInfo.getAliasByOriginNameIfSet(tableName));
             if (tbl == null) {
                 continue;
             }
-                
+
             if (tbl.getType() != TableType.OLAP) {
                 status = new Status(ErrCode.COMMON_ERROR, "Only support retore OLAP table: " + tbl.getName());
                 return;
             }
-                
+
             OlapTable olapTbl = (OlapTable) tbl;
             olapTbl.writeLock();
             try {
@@ -455,22 +468,62 @@ public class RestoreJob extends AbstractJob {
             } finally {
                 olapTbl.writeUnlock();
             }
+        }
 
+        for (BackupJobInfo.BackupViewInfo backupViewInfo : jobInfo.newBackupObjects.views) {
+            Table tbl = db.getTable(jobInfo.getAliasByOriginNameIfSet(backupViewInfo.name));
+            if (tbl == null) {
+                continue;
+            }
+            if (tbl.getType() != TableType.VIEW) {
+                status = new Status(ErrCode.COMMON_ERROR,
+                        "The local table " + tbl.getName()
+                                + " with the same name but a different type of backup meta.");
+                return;
+            }
+        }
+        for (BackupJobInfo.BackupOdbcTableInfo backupOdbcTableInfo : jobInfo.newBackupObjects.odbcTables) {
+            Table tbl = db.getTable(jobInfo.getAliasByOriginNameIfSet(backupOdbcTableInfo.dorisTableName));
+            if (tbl == null) {
+                continue;
+            }
+            if (tbl.getType() != TableType.ODBC) {
+                status = new Status(ErrCode.COMMON_ERROR,
+                        "The local table " + tbl.getName()
+                                + " with the same name but a different type of backup meta.");
+                return;
+            }
+        }
+        for (BackupJobInfo.BackupOdbcResourceInfo backupOdbcResourceInfo : jobInfo.newBackupObjects.odbcResources) {
+            Resource resource = Catalog.getCurrentCatalog().getResourceMgr().
+                    getResource(backupOdbcResourceInfo.name);
+            if (resource == null) {
+                continue;
+            }
+            if (resource.getType() != Resource.ResourceType.ODBC_CATALOG) {
+                status = new Status(ErrCode.COMMON_ERROR,
+                        "The local resource " + resource.getName()
+                                + " with the same name but a different type of backup meta.");
+                return;
+            }
         }
 
+
         // Check and prepare meta objects.
         AgentBatchTask batchTask = new AgentBatchTask();
         db.readLock();
         try {
-            for (BackupTableInfo tblInfo : jobInfo.tables.values()) {
-                Table remoteTbl = backupMeta.getTable(tblInfo.name);
+            for (Map.Entry<String, BackupOlapTableInfo> olapTableEntry : jobInfo.backupOlapTableObjects.entrySet()) {
+                String tableName = olapTableEntry.getKey();
+                BackupOlapTableInfo tblInfo = olapTableEntry.getValue();
+                Table remoteTbl = backupMeta.getTable(tableName);
                 Preconditions.checkNotNull(remoteTbl);
-                Table localTbl = db.getTable(jobInfo.getAliasByOriginNameIfSet(tblInfo.name));
+                Table localTbl = db.getTable(jobInfo.getAliasByOriginNameIfSet(tableName));
                 if (localTbl != null) {
                     // table already exist, check schema
                     if (localTbl.getType() != TableType.OLAP) {
                         status = new Status(ErrCode.COMMON_ERROR,
-                                "Only support retore olap table: " + localTbl.getName());
+                                "The type of local table should be same as type of remote table: " + remoteTbl.getName());
                         return;
                     }
                     OlapTable localOlapTbl = (OlapTable) localTbl;
@@ -485,16 +538,18 @@ public class RestoreJob extends AbstractJob {
                             return;
                         }
                         LOG.debug("get intersect part names: {}, job: {}", intersectPartNames, this);
-                        if (!localOlapTbl.getSignature(BackupHandler.SIGNATURE_VERSION, intersectPartNames).equals(
-                                remoteOlapTbl.getSignature(BackupHandler.SIGNATURE_VERSION, intersectPartNames))) {
-                            status = new Status(ErrCode.COMMON_ERROR, "Table " + jobInfo.getAliasByOriginNameIfSet(tblInfo.name)
+                        if (localOlapTbl.getSignature(BackupHandler.SIGNATURE_VERSION, intersectPartNames)
+                                != remoteOlapTbl.getSignature(BackupHandler.SIGNATURE_VERSION, intersectPartNames)) {
+                            status = new Status(ErrCode.COMMON_ERROR, "Table " + jobInfo.getAliasByOriginNameIfSet(tableName)
                                     + " already exist but with different schema");
                             return;
                         }
 
                         // Table with same name and has same schema. Check partition
-                        for (BackupPartitionInfo backupPartInfo : tblInfo.partitions.values()) {
-                            Partition localPartition = localOlapTbl.getPartition(backupPartInfo.name);
+                        for (Map.Entry<String, BackupPartitionInfo> partitionEntry : tblInfo.partitions.entrySet()) {
+                            String partitionName = partitionEntry.getKey();
+                            BackupPartitionInfo backupPartInfo = partitionEntry.getValue();
+                            Partition localPartition = localOlapTbl.getPartition(partitionName);
                             if (localPartition != null) {
                                 // Partition already exist.
                                 PartitionInfo localPartInfo = localOlapTbl.getPartitionInfo();
@@ -507,19 +562,19 @@ public class RestoreJob extends AbstractJob {
                                     Range<PartitionKey> remoteRange = remoteRangePartInfo.getRange(backupPartInfo.id);
                                     if (localRange.equals(remoteRange)) {
                                         // Same partition, same range
-                                        if (genFileMappingWhenBackupReplicasEqual(localPartInfo, localPartition, localTbl, backupPartInfo, tblInfo)) {
+                                        if (genFileMappingWhenBackupReplicasEqual(localPartInfo, localPartition, localTbl, backupPartInfo, partitionName, tblInfo)) {
                                             return;
                                         }
                                     } else {
                                         // Same partition name, different range
-                                        status = new Status(ErrCode.COMMON_ERROR, "Partition " + backupPartInfo.name
+                                        status = new Status(ErrCode.COMMON_ERROR, "Partition " + partitionName
                                                 + " in table " + localTbl.getName()
                                                 + " has different range with partition in repository");
                                         return;
                                     }
                                 } else {
                                     // If this is a single partitioned table.
-                                    if (genFileMappingWhenBackupReplicasEqual(localPartInfo, localPartition, localTbl, backupPartInfo, tblInfo)) {
+                                    if (genFileMappingWhenBackupReplicasEqual(localPartInfo, localPartition, localTbl, backupPartInfo, partitionName, tblInfo)) {
                                         return;
                                     }
                                 }
@@ -533,14 +588,14 @@ public class RestoreJob extends AbstractJob {
                                             = (RangePartitionInfo) remoteOlapTbl.getPartitionInfo();
                                     Range<PartitionKey> remoteRange = remoteRangePartitionInfo.getRange(backupPartInfo.id);
                                     if (localRangePartitionInfo.getAnyIntersectRange(remoteRange, false) != null) {
-                                        status = new Status(ErrCode.COMMON_ERROR, "Partition " + backupPartInfo.name
+                                        status = new Status(ErrCode.COMMON_ERROR, "Partition " + partitionName
                                                 + " in table " + localTbl.getName()
                                                 + " has conflict range with existing ranges");
                                         return;
                                     } else {
                                         // this partition can be added to this table, set ids
                                         Partition restorePart = resetPartitionForRestore(localOlapTbl, remoteOlapTbl,
-                                                backupPartInfo.name,
+                                                partitionName,
                                                 db.getClusterName(),
                                                 restoreReplicationNum);
                                         if (restorePart == null) {
@@ -567,24 +622,66 @@ public class RestoreJob extends AbstractJob {
                     for (String partName : allPartNames) {
                         if (!tblInfo.containsPart(partName)) {
                             remoteOlapTbl.dropPartition(-1 /* db id is useless here */, partName,
-                                                        true /* act like replay to disable recycle bin action */);
+                                    true /* act like replay to disable recycle bin action */);
                         }
                     }
-                    
+
                     // reset all ids in this table
                     Status st = remoteOlapTbl.resetIdsForRestore(catalog, db, restoreReplicationNum);
                     if (!st.ok()) {
                         status = st;
                         return;
                     }
-                    
+
                     // DO NOT set remote table's new name here, cause we will still need the origin name later
                     // remoteOlapTbl.setName(jobInfo.getAliasByOriginNameIfSet(tblInfo.name));
                     remoteOlapTbl.setState(allowLoad ? OlapTableState.RESTORE_WITH_LOAD : OlapTableState.RESTORE);
                     LOG.debug("put remote table {} to restoredTbls", remoteOlapTbl.getName());
                     restoredTbls.add(remoteOlapTbl);
                 }
-            } // end of all restore tables
+            } // end of all restore olap tables
+
+            // restore views
+            for (BackupJobInfo.BackupViewInfo backupViewInfo : jobInfo.newBackupObjects.views) {
+                String backupViewName = backupViewInfo.name;
+                Table localTbl = db.getTable(jobInfo.getAliasByOriginNameIfSet(backupViewName));
+                View remoteView = (View) backupMeta.getTable(backupViewName);
+                if (localTbl != null) {
+                    Preconditions.checkState(localTbl.getType() == TableType.VIEW);
+                    View localView = (View) localTbl;
+                    if (!localView.getSignature(BackupHandler.SIGNATURE_VERSION)
+                            .equals(remoteView.getSignature(BackupHandler.SIGNATURE_VERSION))) {
+                        status = new Status(ErrCode.COMMON_ERROR, "View "
+                                + jobInfo.getAliasByOriginNameIfSet(backupViewName)
+                                + " already exist but with different schema");
+                        return;
+                    }
+                } else {
+                    remoteView.resetIdsForRestore(catalog);
+                    restoredTbls.add(remoteView);
+                }
+            }
+
+            // restore odbc external table
+            for (BackupJobInfo.BackupOdbcTableInfo backupOdbcTableInfo : jobInfo.newBackupObjects.odbcTables) {
+                String backupOdbcTableName = backupOdbcTableInfo.dorisTableName;
+                Table localTbl = db.getTable(jobInfo.getAliasByOriginNameIfSet(backupOdbcTableName));
+                OdbcTable remoteOdbcTable = (OdbcTable) backupMeta.getTable(backupOdbcTableName);
+                if (localTbl != null) {
+                    Preconditions.checkState(localTbl.getType() == TableType.ODBC);
+                    OdbcTable localOdbcTable = (OdbcTable) localTbl;
+                    if (!localOdbcTable.getSignature(BackupHandler.SIGNATURE_VERSION)
+                            .equals(remoteOdbcTable.getSignature(BackupHandler.SIGNATURE_VERSION))) {
+                        status = new Status(ErrCode.COMMON_ERROR, "Odbc table "
+                                + jobInfo.getAliasByOriginNameIfSet(backupOdbcTableName)
+                                + " already exist but with different schema");
+                        return;
+                    }
+                } else {
+                    remoteOdbcTable.resetIdsForRestore(catalog);
+                    restoredTbls.add(remoteOdbcTable);
+                }
+            }
 
             LOG.debug("finished to prepare restored partitions and tables. {}", this);
             // for now, nothing is modified in catalog
@@ -595,23 +692,26 @@ public class RestoreJob extends AbstractJob {
                 Preconditions.checkNotNull(localTbl, localTbl.getName());
                 Partition restorePart = entry.second;
                 OlapTable remoteTbl = (OlapTable) backupMeta.getTable(entry.first);
-                BackupPartitionInfo backupPartitionInfo 
-                        = jobInfo.getTableInfo(entry.first).getPartInfo(restorePart.getName());
+                BackupPartitionInfo backupPartitionInfo
+                        = jobInfo.getOlapTableInfo(entry.first).getPartInfo(restorePart.getName());
 
                 createReplicas(db, batchTask, localTbl, restorePart);
 
                 genFileMapping(localTbl, restorePart, remoteTbl.getId(), backupPartitionInfo,
-                               !allowLoad /* if allow load, do not overwrite when commit */);
+                        !allowLoad /* if allow load, do not overwrite when commit */);
             }
 
             // generate create replica task for all restored tables
-            for (OlapTable restoreTbl : restoredTbls) {
-                for (Partition restorePart : restoreTbl.getPartitions()) {
-                    createReplicas(db, batchTask, restoreTbl, restorePart);
-                    BackupTableInfo backupTableInfo = jobInfo.getTableInfo(restoreTbl.getName());
-                    genFileMapping(restoreTbl, restorePart, backupTableInfo.id,
-                                   backupTableInfo.getPartInfo(restorePart.getName()),
-                                   !allowLoad /* if allow load, do not overwrite when commit */);
+            for (Table restoreTbl : restoredTbls) {
+                if (restoreTbl.getType() == TableType.OLAP) {
+                    OlapTable restoreOlapTable = (OlapTable) restoreTbl;
+                    for (Partition restorePart : restoreOlapTable.getPartitions()) {
+                        createReplicas(db, batchTask, restoreOlapTable, restorePart);
+                        BackupOlapTableInfo backupOlapTableInfo = jobInfo.getOlapTableInfo(restoreOlapTable.getName());
+                        genFileMapping(restoreOlapTable, restorePart, backupOlapTableInfo.id,
+                                backupOlapTableInfo.getPartInfo(restorePart.getName()),
+                                !allowLoad /* if allow load, do not overwrite when commit */);
+                    }
                 }
                 // set restored table's new name after all 'genFileMapping'
                 restoreTbl.setName(jobInfo.getAliasByOriginNameIfSet(restoreTbl.getName()));
@@ -622,9 +722,17 @@ public class RestoreJob extends AbstractJob {
             db.readUnlock();
         }
 
+        // check and restore resources
+        checkAndRestoreResources();
+        if (!status.ok()) {
+            return;
+        }
+        LOG.debug("finished to restore resources. {}", this.jobId);
+
         // Send create replica task to BE outside the db lock
+        boolean ok = false;
+        MarkedCountDownLatch<Long, Long> latch = new MarkedCountDownLatch<Long, Long>(batchTask.getTaskNum());
         if (batchTask.getTaskNum() > 0) {
-            MarkedCountDownLatch<Long, Long> latch = new MarkedCountDownLatch<Long, Long>(batchTask.getTaskNum());
             for (AgentTask task : batchTask.getAllTasks()) {
                 latch.addMark(task.getBackendId(), task.getTabletId());
                 ((CreateReplicaTask) task).setLatch(latch);
@@ -635,67 +743,67 @@ public class RestoreJob extends AbstractJob {
             // estimate timeout, at most 10 min
             long timeout = Config.tablet_create_timeout_second * 1000L * batchTask.getTaskNum();
             timeout = Math.min(10 * 60 * 1000, timeout);
-            boolean ok = false;
             try {
                 LOG.info("begin to send create replica tasks to BE for restore. total {} tasks. timeout: {}",
-                         batchTask.getTaskNum(), timeout);
+                        batchTask.getTaskNum(), timeout);
                 ok = latch.await(timeout, TimeUnit.MILLISECONDS);
             } catch (InterruptedException e) {
                 LOG.warn("InterruptedException: ", e);
                 ok = false;
             }
-            
-            if (ok) {
-                LOG.debug("finished to create all restored replcias. {}", this);
-                // add restored partitions.
-                // table should be in State RESTORE, so no other partitions can be
-                // added to or removed from this table during the restore process.
-                for (Pair<String, Partition> entry : restoredPartitions) {
-                    OlapTable localTbl = (OlapTable) db.getTable(entry.first);
-                    localTbl.writeLock();
-                    try {
-                        Partition restoredPart = entry.second;
-                        OlapTable remoteTbl = (OlapTable) backupMeta.getTable(entry.first);
-                        RangePartitionInfo localPartitionInfo = (RangePartitionInfo) localTbl.getPartitionInfo();
-                        RangePartitionInfo remotePartitionInfo = (RangePartitionInfo) remoteTbl.getPartitionInfo();
-                        BackupPartitionInfo backupPartitionInfo
-                                = jobInfo.getTableInfo(entry.first).getPartInfo(restoredPart.getName());
-                        long remotePartId = backupPartitionInfo.id;
-                        Range<PartitionKey> remoteRange = remotePartitionInfo.getRange(remotePartId);
-                        DataProperty remoteDataProperty = remotePartitionInfo.getDataProperty(remotePartId);
-                        localPartitionInfo.addPartition(restoredPart.getId(), false, remoteRange,
-                                remoteDataProperty, (short) restoreReplicationNum,
-                                remotePartitionInfo.getIsInMemory(remotePartId));
-                        localTbl.addPartition(restoredPart);
-                    } finally {
-                        localTbl.writeUnlock();
-                    }
+        } else {
+            ok = true;
+        }
 
+        if (ok) {
+            LOG.debug("finished to create all restored replcias. {}", this);
+            // add restored partitions.
+            // table should be in State RESTORE, so no other partitions can be
+            // added to or removed from this table during the restore process.
+            for (Pair<String, Partition> entry : restoredPartitions) {
+                OlapTable localTbl = (OlapTable) db.getTable(entry.first);
+                localTbl.writeLock();
+                try {
+                    Partition restoredPart = entry.second;
+                    OlapTable remoteTbl = (OlapTable) backupMeta.getTable(entry.first);
+                    RangePartitionInfo localPartitionInfo = (RangePartitionInfo) localTbl.getPartitionInfo();
+                    RangePartitionInfo remotePartitionInfo = (RangePartitionInfo) remoteTbl.getPartitionInfo();
+                    BackupPartitionInfo backupPartitionInfo
+                            = jobInfo.getOlapTableInfo(entry.first).getPartInfo(restoredPart.getName());
+                    long remotePartId = backupPartitionInfo.id;
+                    Range<PartitionKey> remoteRange = remotePartitionInfo.getRange(remotePartId);
+                    DataProperty remoteDataProperty = remotePartitionInfo.getDataProperty(remotePartId);
+                    localPartitionInfo.addPartition(restoredPart.getId(), false, remoteRange,
+                            remoteDataProperty, (short) restoreReplicationNum,
+                            remotePartitionInfo.getIsInMemory(remotePartId));
+                    localTbl.addPartition(restoredPart);
+                } finally {
+                    localTbl.writeUnlock();
                 }
 
-                // add restored tables
-                for (OlapTable tbl : restoredTbls) {
-                    db.writeLock();
-                    try {
-                        if (!db.createTable(tbl)) {
-                            status = new Status(ErrCode.COMMON_ERROR, "Table " + tbl.getName()
-                                    + " already exist in db: " + db.getFullName());
-                            return;
-                        }
-                    } finally {
-                        db.writeUnlock();
-                    }
+            }
 
+            // add restored tables
+            for (Table tbl : restoredTbls) {
+                db.writeLock();
+                try {
+                    if (!db.createTable(tbl)) {
+                        status = new Status(ErrCode.COMMON_ERROR, "Table " + tbl.getName()
+                                + " already exist in db: " + db.getFullName());
+                        return;
+                    }
+                } finally {
+                    db.writeUnlock();
                 }
-            } else {
-                List<Entry<Long, Long>> unfinishedMarks = latch.getLeftMarks();
-                // only show at most 10 results
-                List<Entry<Long, Long>> subList = unfinishedMarks.subList(0, Math.min(unfinishedMarks.size(), 10));
-                String idStr = Joiner.on(", ").join(subList);
-                status = new Status(ErrCode.COMMON_ERROR,
-                        "Failed to create replicas for restore. unfinished marks: " + idStr);
-                return;
             }
+        } else {
+            List<Entry<Long, Long>> unfinishedMarks = latch.getLeftMarks();
+            // only show at most 10 results
+            List<Entry<Long, Long>> subList = unfinishedMarks.subList(0, Math.min(unfinishedMarks.size(), 10));
+            String idStr = Joiner.on(", ").join(subList);
+            status = new Status(ErrCode.COMMON_ERROR,
+                    "Failed to create replicas for restore. unfinished marks: " + idStr);
+            return;
         }
 
         LOG.info("finished to prepare meta. begin to make snapshot. {}", this);
@@ -753,13 +861,63 @@ public class RestoreJob extends AbstractJob {
 
         // No log here, PENDING state restore job will redo this method
         LOG.info("finished to prepare meta and send snapshot tasks, num: {}. {}",
-                 batchTask.getTaskNum(), this);
+                batchTask.getTaskNum(), this);
+    }
+
+    /**
+     * Checks the ODBC resources listed in the job info against the local catalog and
+     * creates the ones that do not exist locally.
+     *
+     * A resource that already exists locally must be of type ODBC_CATALOG and must have the
+     * same signature as the backed-up resource. A resource that does not exist locally is
+     * created through the ResourceMgr and recorded in restoredResources. On the first
+     * failure, status is set to a COMMON_ERROR and the method returns.
+     */
+    private void checkAndRestoreResources() {
+        ResourceMgr resourceMgr = Catalog.getCurrentCatalog().getResourceMgr();
+        for (BackupJobInfo.BackupOdbcResourceInfo backupOdbcResourceInfo : jobInfo.newBackupObjects.odbcResources) {
+            String backupResourceName = backupOdbcResourceInfo.name;
+            Resource localResource = resourceMgr.getResource(backupResourceName);
+            Resource remoteResource = backupMeta.getResource(backupResourceName);
+            // backupMeta.getResource() returns null for a name absent from the meta's resource map;
+            // fail with a status instead of hitting an NPE (or a ClassCastException) below.
+            if (!(remoteResource instanceof OdbcCatalogResource)) {
+                status = new Status(ErrCode.COMMON_ERROR, "Odbc resource " + backupResourceName
+                        + " is not found as an odbc resource in backup meta");
+                return;
+            }
+            OdbcCatalogResource remoteOdbcResource = (OdbcCatalogResource) remoteResource;
+            if (localResource != null) {
+                if (localResource.getType() != Resource.ResourceType.ODBC_CATALOG) {
+                    status = new Status(ErrCode.COMMON_ERROR, "The type of local resource "
+                            + backupResourceName + " is not same as restored resource");
+                    return;
+                }
+                OdbcCatalogResource localOdbcResource = (OdbcCatalogResource) localResource;
+                if (localOdbcResource.getSignature(BackupHandler.SIGNATURE_VERSION)
+                        != remoteOdbcResource.getSignature(BackupHandler.SIGNATURE_VERSION)) {
+                    status = new Status(ErrCode.COMMON_ERROR, "Odbc resource "
+                            + jobInfo.getAliasByOriginNameIfSet(backupResourceName)
+                            + " already exist but with different properties");
+                    return;
+                }
+            } else {
+                try {
+                    // restore resource
+                    resourceMgr.createResource(remoteOdbcResource);
+                } catch (DdlException e) {
+                    status = new Status(ErrCode.COMMON_ERROR, e.getMessage());
+                    return;
+                }
+                restoredResources.add(remoteOdbcResource);
+            }
+        }
     }
 
     private boolean genFileMappingWhenBackupReplicasEqual(PartitionInfo localPartInfo, Partition localPartition, Table localTbl,
-                                                          BackupPartitionInfo backupPartInfo, BackupTableInfo tblInfo) {
+                                                          BackupPartitionInfo backupPartInfo, String partitionName, BackupOlapTableInfo tblInfo) {
         if (localPartInfo.getReplicationNum(localPartition.getId()) != restoreReplicationNum) {
-            status = new Status(ErrCode.COMMON_ERROR, "Partition " + backupPartInfo.name
+            status = new Status(ErrCode.COMMON_ERROR, "Partition " + partitionName
                     + " in table " + localTbl.getName()
                     + " has different replication num '"
                     + localPartInfo.getReplicationNum(localPartition.getId())
@@ -877,10 +1018,10 @@ public class RestoreJob extends AbstractJob {
             LOG.debug("get index id: {}, index name: {}", localIdx.getId(),
                     localTbl.getIndexNameById(localIdx.getId()));
             BackupIndexInfo backupIdxInfo = backupPartInfo.getIdx(localTbl.getIndexNameById(localIdx.getId()));
-            Preconditions.checkState(backupIdxInfo.tablets.size() == localIdx.getTablets().size());
+            Preconditions.checkState(backupIdxInfo.sortedTabletInfoList.size() == localIdx.getTablets().size());
             for (int i = 0; i < localIdx.getTablets().size(); i++) {
                 Tablet localTablet = localIdx.getTablets().get(i);
-                BackupTabletInfo backupTabletInfo = backupIdxInfo.tablets.get(i);
+                BackupTabletInfo backupTabletInfo = backupIdxInfo.sortedTabletInfoList.get(i);
                 LOG.debug("get tablet mapping: {} to {}, index {}", backupTabletInfo.id, localTablet.getId(), i);
                 for (Replica localReplica : localTablet.getReplicas()) {
                     IdChain src = new IdChain(remoteTblId, backupPartInfo.id, backupIdxInfo.id, backupTabletInfo.id,
@@ -910,8 +1051,8 @@ public class RestoreJob extends AbstractJob {
         Database db = catalog.getDb(dbId);
 
         // replay set all existing tables's state to RESTORE
-        for (BackupTableInfo tblInfo : jobInfo.tables.values()) {
-            Table tbl = db.getTable(jobInfo.getAliasByOriginNameIfSet(tblInfo.name));
+        for (String tableName : jobInfo.backupOlapTableObjects.keySet()) {
+            Table tbl = db.getTable(jobInfo.getAliasByOriginNameIfSet(tableName));
             if (tbl == null) {
                 continue;
             }
@@ -931,7 +1072,7 @@ public class RestoreJob extends AbstractJob {
             OlapTable remoteTbl = (OlapTable) backupMeta.getTable(entry.first);
             RangePartitionInfo localPartitionInfo = (RangePartitionInfo) localTbl.getPartitionInfo();
             RangePartitionInfo remotePartitionInfo = (RangePartitionInfo) remoteTbl.getPartitionInfo();
-            BackupPartitionInfo backupPartitionInfo = jobInfo.getTableInfo(entry.first).getPartInfo(restorePart.getName());
+            BackupPartitionInfo backupPartitionInfo = jobInfo.getOlapTableInfo(entry.first).getPartInfo(restorePart.getName());
             long remotePartId = backupPartitionInfo.id;
             Range<PartitionKey> remoteRange = remotePartitionInfo.getRange(remotePartId);
             DataProperty remoteDataProperty = remotePartitionInfo.getDataProperty(remotePartId);
@@ -955,19 +1096,23 @@ public class RestoreJob extends AbstractJob {
         }
 
         // restored tables
-        for (OlapTable restoreTbl : restoredTbls) {
+        for (Table restoreTbl : restoredTbls) {
             db.writeLock();
             try {
                 db.createTable(restoreTbl);
             } finally {
                 db.writeUnlock();
             }
-            restoreTbl.writeLock();
+            if (restoreTbl.getType() != TableType.OLAP) {
+                continue;
+            }
+            OlapTable olapRestoreTbl = (OlapTable) restoreTbl;
+            olapRestoreTbl.writeLock();
             try {
                 // modify tablet inverted index
-                for (Partition restorePart : restoreTbl.getPartitions()) {
+                for (Partition restorePart : olapRestoreTbl.getPartitions()) {
                     for (MaterializedIndex restoreIdx : restorePart.getMaterializedIndices(IndexExtState.VISIBLE)) {
-                        int schemaHash = restoreTbl.getSchemaHashByIndexId(restoreIdx.getId());
+                        int schemaHash = olapRestoreTbl.getSchemaHashByIndexId(restoreIdx.getId());
                         TabletMeta tabletMeta = new TabletMeta(db.getId(), restoreTbl.getId(), restorePart.getId(),
                                 restoreIdx.getId(), schemaHash, TStorageMedium.HDD);
                         for (Tablet restoreTablet : restoreIdx.getTablets()) {
@@ -979,9 +1124,16 @@ public class RestoreJob extends AbstractJob {
                     }
                 }
             } finally {
-                restoreTbl.writeUnlock();
+                olapRestoreTbl.writeUnlock();
             }
         }
+
+        // restored resource
+        ResourceMgr resourceMgr = Catalog.getCurrentCatalog().getResourceMgr();
+        for (Resource resource : restoredResources) {
+            resourceMgr.replayCreateResource(resource);
+        }
+
         LOG.info("replay check and prepare meta. {}", this);
     }
 
@@ -1256,6 +1408,7 @@ public class RestoreJob extends AbstractJob {
         if (!isReplay) {
             restoredPartitions.clear();
             restoredTbls.clear();
+            restoredResources.clear();
 
             // release snapshot before clearing snapshotInfos
             releaseSnapshots();
@@ -1378,19 +1531,22 @@ public class RestoreJob extends AbstractJob {
             setTableStateToNormal(db);
 
             // remove restored tbls
-            for (OlapTable restoreTbl : restoredTbls) {
+            for (Table restoreTbl : restoredTbls) {
                 LOG.info("remove restored table when cancelled: {}", restoreTbl.getName());
-                restoreTbl.writeLock();
-                try {
-                    for (Partition part : restoreTbl.getPartitions()) {
-                        for (MaterializedIndex idx : part.getMaterializedIndices(IndexExtState.VISIBLE)) {
-                            for (Tablet tablet : idx.getTablets()) {
-                                Catalog.getCurrentInvertedIndex().deleteTablet(tablet.getId());
+                if (restoreTbl.getType() == TableType.OLAP) {
+                    OlapTable restoreOlapTable = (OlapTable) restoreTbl;
+                    restoreOlapTable.writeLock();
+                    try {
+                        for (Partition part : restoreOlapTable.getPartitions()) {
+                            for (MaterializedIndex idx : part.getMaterializedIndices(IndexExtState.VISIBLE)) {
+                                for (Tablet tablet : idx.getTablets()) {
+                                    Catalog.getCurrentInvertedIndex().deleteTablet(tablet.getId());
+                                }
                             }
                         }
+                    } finally {
+                        restoreTbl.writeUnlock();
                     }
-                } finally {
-                    restoreTbl.writeUnlock();
                 }
                 db.dropTableWithLock(restoreTbl.getName());
             }
@@ -1417,6 +1573,13 @@ public class RestoreJob extends AbstractJob {
                 }
 
             }
+
+            // remove restored resource
+            ResourceMgr resourceMgr = Catalog.getCurrentCatalog().getResourceMgr();
+            for (Resource resource : restoredResources) {
+                LOG.info("remove restored resource when cancelled: {}", resource.getName());
+                resourceMgr.dropResource(resource);
+            }
         }
 
         if (!isReplay) {
@@ -1441,8 +1604,8 @@ public class RestoreJob extends AbstractJob {
     }
 
     private void setTableStateToNormal(Database db) {
-        for (BackupTableInfo tblInfo : jobInfo.tables.values()) {
-            Table tbl = db.getTable(jobInfo.getAliasByOriginNameIfSet(tblInfo.name));
+        for (String tableName : jobInfo.backupOlapTableObjects.keySet()) {
+            Table tbl = db.getTable(jobInfo.getAliasByOriginNameIfSet(tableName));
             if (tbl == null) {
                 continue;
             }
@@ -1502,7 +1665,7 @@ public class RestoreJob extends AbstractJob {
         }
 
         out.writeInt(restoredTbls.size());
-        for (OlapTable tbl : restoredTbls) {
+        for (Table tbl : restoredTbls) {
             tbl.write(out);
         }
 
@@ -1527,6 +1690,18 @@ public class RestoreJob extends AbstractJob {
                 entry.getValue().write(out);
             }
         }
+
+        out.writeInt(restoredResources.size());
+        for (Resource resource: restoredResources) {
+            resource.write(out);
+        }
+
+        // write properties
+        out.writeInt(properties.size());
+        for (Map.Entry<String, String> entry : properties.entrySet()) {
+            Text.writeString(out, entry.getKey());
+            Text.writeString(out, entry.getValue());
+        }
     }
 
     @Override
@@ -1560,7 +1735,7 @@ public class RestoreJob extends AbstractJob {
 
         size = in.readInt();
         for (int i = 0; i < size; i++) {
-            restoredTbls.add((OlapTable) Table.read(in));
+            restoredTbls.add(Table.read(in));
         }
 
         size = in.readInt();
@@ -1585,6 +1760,24 @@ public class RestoreJob extends AbstractJob {
                 snapshotInfos.put(tabletId, beId, info);
             }
         }
+
+        if (Catalog.getCurrentCatalogJournalVersion() < FeMetaVersion.VERSION_95) {
+            return;
+        }
+
+        // restored resource
+        size = in.readInt();
+        for (int i = 0; i < size; i++) {
+            restoredResources.add(Resource.read(in));
+        }
+
+        // read properties
+        size = in.readInt();
+        for (int i = 0; i < size; i++) {
+            String key = Text.readString(in);
+            String value = Text.readString(in);
+            properties.put(key, value);
+        }
     }
 
     @Override
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java
index c0935e7..bf6d122 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java
@@ -4296,7 +4296,7 @@ public class Catalog {
         }
     }
 
-    public void replayAlterExteranlTableSchema(String dbName, String tableName, List<Column> newSchema) throws DdlException {
+    public void replayAlterExternalTableSchema(String dbName, String tableName, List<Column> newSchema) throws DdlException {
         Database db = this.fullNameToDb.get(dbName);
         Table table = db.getTable(tableName);
         table.writeLock();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Database.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Database.java
index 3de097e..202c8eb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Database.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Database.java
@@ -38,13 +38,13 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
+import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
-import java.io.UnsupportedEncodingException;
 import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.HashSet;
@@ -56,7 +56,6 @@ import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.stream.Collectors;
-import java.util.zip.Adler32;
 
 /**
  * Internal representation of db-related metadata. Owned by Catalog instance.
@@ -473,17 +472,12 @@ public class Database extends MetaObject implements Writable {
     }
 
     @Override
-    public int getSignature(int signatureVersion) {
-        Adler32 adler32 = new Adler32();
-        adler32.update(signatureVersion);
-        String charsetName = "UTF-8";
-        try {
-            adler32.update(this.fullQualifiedName.getBytes(charsetName));
-        } catch (UnsupportedEncodingException e) {
-            LOG.error("encoding error", e);
-            return -1;
-        }
-        return Math.abs((int) adler32.getValue());
+    public String getSignature(int signatureVersion) {
+        // MD5 over "<version><db name>". StringBuilder(int) would only presize the buffer, so concatenate instead.
+        String signatureString = signatureVersion + fullQualifiedName;
+        String md5 = DigestUtils.md5Hex(signatureString);
+        LOG.debug("get signature of database {}: {}. signature string: {}", fullQualifiedName, md5, signatureString);
+        return md5;
     }
 
     @Override
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/EsTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/EsTable.java
index a490279..8cae113 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/EsTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/EsTable.java
@@ -30,20 +30,19 @@ import org.apache.doris.thrift.TTableType;
 
 import com.google.common.base.Strings;
 
+import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
-import java.io.UnsupportedEncodingException;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.zip.Adler32;
 
 public class EsTable extends Table {
     private static final Logger LOG = LogManager.getLogger(EsTable.class);
@@ -253,40 +252,26 @@ public class EsTable extends Table {
     }
 
     @Override
-    public int getSignature(int signatureVersion) {
-        Adler32 adler32 = new Adler32();
-        adler32.update(signatureVersion);
-        String charsetName = "UTF-8";
-
-        try {
-            // name
-            adler32.update(name.getBytes(charsetName));
-            // type
-            adler32.update(type.name().getBytes(charsetName));
-            if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_68) {
-                for (Map.Entry<String, String> entry : tableContext.entrySet()) {
-                    adler32.update(entry.getValue().getBytes(charsetName));
-                }
-            } else {
-                // host
-                adler32.update(hosts.getBytes(charsetName));
-                // username
-                adler32.update(userName.getBytes(charsetName));
-                // passwd
-                adler32.update(passwd.getBytes(charsetName));
-                // index name
-                adler32.update(indexName.getBytes(charsetName));
-                // mappingType
-                adler32.update(mappingType.getBytes(charsetName));
-                // transport
-                adler32.update(transport.getBytes(charsetName));
+    public String getSignature(int signatureVersion) {
+        // StringBuilder(int) only sets the initial capacity; append the version so it is part of the signature.
+        StringBuilder sb = new StringBuilder().append(signatureVersion);
+        sb.append(name);
+        sb.append(type.name());
+        if (tableContext.isEmpty()) {
+            // tableContext is empty: sign the individual connection fields instead
+            sb.append(hosts).append(userName).append(passwd);
+            sb.append(indexName).append(mappingType).append(transport);
+        } else {
+            // NOTE(review): the signature follows tableContext's iteration order; confirm that this order
+            // is the same for two tables holding equal entries (e.g. not maps built in different orders).
+            for (Map.Entry<String, String> entry : tableContext.entrySet()) {
+                sb.append(entry.getKey());
+                sb.append(entry.getValue());
             }
-        } catch (UnsupportedEncodingException e) {
-            LOG.error("encoding error", e);
-            return -1;
         }
-
-        return Math.abs((int) adler32.getValue());
+        String md5 = DigestUtils.md5Hex(sb.toString());
+        LOG.debug("get signature of es table {}: {}. signature string: {}", name, md5, sb.toString());
+        return md5;
     }
 
     @Override
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/MetaObject.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/MetaObject.java
index 20af0e7..b27d883 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/MetaObject.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/MetaObject.java
@@ -20,10 +20,11 @@ package org.apache.doris.catalog;
 import org.apache.doris.common.FeMetaVersion;
 import org.apache.doris.common.io.Writable;
 
+import org.apache.commons.codec.digest.DigestUtils;
+
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
-import java.util.zip.Adler32;
 
 public class MetaObject implements Writable {
     
@@ -36,10 +37,9 @@ public class MetaObject implements Writable {
     }
 
     // implement this in derived class
-    public int getSignature(int signatureVersion) {
-        Adler32 adler32 = new Adler32();
-        adler32.update(signatureVersion);
-        return Math.abs((int) adler32.getValue());
+    public String getSignature(int signatureVersion) {
+        // base implementation: MD5 hex of the decimal signature version only
+        return DigestUtils.md5Hex(String.valueOf(signatureVersion));
     }
 
     public long getLastCheckTime() {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/MysqlTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/MysqlTable.java
index be7e49e..fcdfb90 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/MysqlTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/MysqlTable.java
@@ -17,7 +17,6 @@
 
 package org.apache.doris.catalog;
 
-import com.google.common.collect.Maps;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.FeMetaVersion;
 import org.apache.doris.common.io.Text;
@@ -28,17 +27,17 @@ import org.apache.doris.thrift.TTableDescriptor;
 import org.apache.doris.thrift.TTableType;
 
 import com.google.common.base.Strings;
+import com.google.common.collect.Maps;
 
+import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
-import java.io.UnsupportedEncodingException;
 import java.util.List;
 import java.util.Map;
-import java.util.zip.Adler32;
 
 public class MysqlTable extends Table {
     private static final Logger LOG = LogManager.getLogger(OlapTable.class);
@@ -205,35 +204,19 @@ public class MysqlTable extends Table {
     }
 
     @Override
-    public int getSignature(int signatureVersion) {
-        Adler32 adler32 = new Adler32();
-        adler32.update(signatureVersion);
-        String charsetName = "UTF-8";
-
-        try {
-            // name
-            adler32.update(name.getBytes(charsetName));
-            // type
-            adler32.update(type.name().getBytes(charsetName));
-            // host
-            adler32.update(getHost().getBytes(charsetName));
-            // port
-            adler32.update(getPort().getBytes(charsetName));
-            // username
-            adler32.update(getUserName().getBytes(charsetName));
-            // passwd
-            adler32.update(getPasswd().getBytes(charsetName));
-            // mysql db
-            adler32.update(mysqlDatabaseName.getBytes(charsetName));
-            // mysql table
-            adler32.update(mysqlTableName.getBytes(charsetName));
-
-        } catch (UnsupportedEncodingException e) {
-            LOG.error("encoding error", e);
-            return -1;
-        }
-
-        return Math.abs((int) adler32.getValue());
+    public String getSignature(int signatureVersion) {
+        // StringBuilder(int) only sets the initial capacity; append the version so it is part of the signature.
+        StringBuilder sb = new StringBuilder().append(signatureVersion);
+        sb.append(name);
+        sb.append(type.name());
+        // connection: host, port, user, password
+        sb.append(getHost()).append(getPort());
+        sb.append(getUserName()).append(getPasswd());
+        // remote mysql database and table
+        sb.append(mysqlDatabaseName).append(mysqlTableName);
+        String md5 = DigestUtils.md5Hex(sb.toString());
+        LOG.debug("get signature of mysql table {}: {}. signature string: {}", name, md5, sb.toString());
+        return md5;
     }
 
     @Override
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OdbcCatalogResource.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/OdbcCatalogResource.java
index 6cad971..9020cd8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OdbcCatalogResource.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OdbcCatalogResource.java
@@ -19,12 +19,19 @@ package org.apache.doris.catalog;
 
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.proc.BaseProcResult;
+
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
-import com.google.gson.annotations.SerializedName;
 
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.io.UnsupportedEncodingException;
 import java.util.Map;
+import java.util.zip.Adler32;
+
+import com.google.gson.annotations.SerializedName;
 
 /**
  * External ODBC Catalog resource for external table query.
@@ -45,6 +52,7 @@ import java.util.Map;
  * DROP RESOURCE "odbc_mysql";
  */
 public class OdbcCatalogResource extends Resource {
+    private static final Logger LOG = LogManager.getLogger(OdbcCatalogResource.class);
     // required
     private static final String HOST = "host";
     private static final String PORT = "port";
@@ -58,6 +66,11 @@ public class OdbcCatalogResource extends Resource {
     @SerializedName(value = "configs")
     private Map<String, String> configs;
 
+    // No-arg constructor, only for deep copy; it calls super() and does not initialize configs.
+    public OdbcCatalogResource() {
+        super();
+    }
+
     public OdbcCatalogResource(String name) {
         this(name, Maps.newHashMap());
     }
@@ -86,6 +99,34 @@ public class OdbcCatalogResource extends Resource {
         return value;
     }
 
+    /**
+     * Adler-32 signature over the signature version, resource name, type and configs.
+     * Returns -1 if the UTF-8 charset is unavailable.
+     * TODO(ml): change to md5 of string signature
+     */
+    public int getSignature(int signatureVersion) {
+        Adler32 adler32 = new Adler32();
+        adler32.update(signatureVersion);
+        final String charsetName = "UTF-8";
+        try {
+            adler32.update(name.getBytes(charsetName));
+            adler32.update(type.name().getBytes(charsetName));
+            LOG.debug("signature. resource name: {}, type: {}", name, type.name());
+            // Adler-32 is order sensitive: walk configs in sorted key order so equal maps always sign equally
+            for (Map.Entry<String, String> config : new java.util.TreeMap<>(configs).entrySet()) {
+                adler32.update(config.getKey().getBytes(charsetName));
+                adler32.update(config.getValue().getBytes(charsetName));
+                LOG.debug("signature. resource config: {}", config);
+            }
+        } catch (UnsupportedEncodingException e) {
+            LOG.error("encoding error", e);
+            return -1;
+        }
+        int signature = Math.abs((int) adler32.getValue());
+        LOG.debug("signature: {}", signature);
+        return signature;
+    }
+
     @Override
     protected void setProperties(Map<String, String> properties) throws DdlException {
         Preconditions.checkState(properties != null);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OdbcTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/OdbcTable.java
index a42fbd7..398ff6b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OdbcTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OdbcTable.java
@@ -17,30 +17,31 @@
 
 package org.apache.doris.catalog;
 
-import com.google.common.collect.Maps;
 import org.apache.doris.common.DdlException;
+import org.apache.doris.common.FeConstants;
+import org.apache.doris.common.io.DeepCopy;
 import org.apache.doris.common.io.Text;
 import org.apache.doris.mysql.privilege.PrivPredicate;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.thrift.TOdbcTable;
+import org.apache.doris.thrift.TOdbcTableType;
 import org.apache.doris.thrift.TTableDescriptor;
 import org.apache.doris.thrift.TTableType;
-import org.apache.doris.thrift.TOdbcTableType;
 
 import com.google.common.base.Strings;
+import com.google.common.collect.Maps;
 
+import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
-import java.io.UnsupportedEncodingException;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.zip.Adler32;
 
 public class OdbcTable extends Table {
     private static final Logger LOG = LogManager.getLogger(OlapTable.class);
@@ -292,6 +293,20 @@ public class OdbcTable extends Table {
         return TABLE_TYPE_MAP.get(getOdbcTableTypeName());
     }
 
+    @Override
+    public OdbcTable clone() { // deep copy through DeepCopy; a failed copy is logged and yields null
+        OdbcTable duplicate = new OdbcTable();
+        boolean copiedOk = DeepCopy.copy(this, duplicate, OdbcTable.class, FeConstants.meta_version);
+        if (!copiedOk) {
+            LOG.warn("failed to copy odbc table: " + getName());
+        }
+        return copiedOk ? duplicate : null;
+    }
+
+    public void resetIdsForRestore(Catalog catalog) {
+        this.id = catalog.getNextId(); // replace the table id with the next id handed out by the catalog
+    }
+
     public TTableDescriptor toThrift() {
         TOdbcTable tOdbcTable = new TOdbcTable();
 
@@ -311,40 +326,25 @@ public class OdbcTable extends Table {
     }
 
     @Override
-    public int getSignature(int signatureVersion) {
-        Adler32 adler32 = new Adler32();
-        adler32.update(signatureVersion);
-        String charsetName = "UTF-8";
-
-        try {
-            // resource name
-            adler32.update(odbcCatalogResourceName.getBytes(charsetName));
-            // name
-            adler32.update(name.getBytes(charsetName));
-            // type
-            adler32.update(type.name().getBytes(charsetName));
-            // host
-            adler32.update(host.getBytes(charsetName));
-            // port
-            adler32.update(port.getBytes(charsetName));
-            // username
-            adler32.update(userName.getBytes(charsetName));
-            // passwd
-            adler32.update(passwd.getBytes(charsetName));
-            // odbc db
-            adler32.update(odbcDatabaseName.getBytes(charsetName));
-            // odbc table
-            adler32.update(odbcTableName.getBytes(charsetName));
-            // odbc driver
-            adler32.update(driver.getBytes(charsetName));
-            // odbc type
-            adler32.update(odbcTableTypeName.getBytes(charsetName));
-        } catch (UnsupportedEncodingException e) {
-            LOG.error("encoding error", e);
-            return -1;
+    public String getSignature(int signatureVersion) {
+        // StringBuilder(int) only sets the initial capacity; append the version so it is part of the signature.
+        StringBuilder sb = new StringBuilder().append(signatureVersion);
+        sb.append(name).append(type);
+        if (odbcCatalogResourceName != null) {
+            // table is bound to a named odbc resource: sign the resource name plus remote db/table
+            sb.append(odbcCatalogResourceName);
+            sb.append(odbcDatabaseName).append(odbcTableName);
+        } else {
+            // no resource: sign the connection properties kept on the table itself
+            sb.append(host).append(port);
+            sb.append(userName).append(passwd);
+            sb.append(driver).append(odbcTableTypeName);
+            // the remote db/table must be signed here too, otherwise different targets sign equally
+            sb.append(odbcDatabaseName).append(odbcTableName);
         }
-
-        return Math.abs((int) adler32.getValue());
+        String md5 = DigestUtils.md5Hex(sb.toString());
+        LOG.debug("get signature of odbc table {}: {}. signature string: {}", name, md5, sb.toString());
+        return md5;
     }
 
     @Override
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
index 8882dde..e2115bc 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
@@ -977,6 +977,7 @@ public class OlapTable extends Table {
         return false;
     }
 
+
     @Override
     public void write(DataOutput out) throws IOException {
         super.write(out);
@@ -1203,7 +1204,7 @@ public class OlapTable extends Table {
 
     public OlapTable selectiveCopy(Collection<String> reservedPartitions, boolean resetState, IndexExtState extState) {
         OlapTable copied = new OlapTable();
-        if (!DeepCopy.copy(this, copied, OlapTable.class)) {
+        if (!DeepCopy.copy(this, copied, OlapTable.class, FeConstants.meta_version)) {
             LOG.warn("failed to copy olap table: " + getName());
             return null;
         }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Resource.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Resource.java
index fed15e6..96a1ec6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Resource.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Resource.java
@@ -19,11 +19,16 @@ package org.apache.doris.catalog;
 
 import org.apache.doris.analysis.CreateResourceStmt;
 import org.apache.doris.common.DdlException;
+import org.apache.doris.common.FeConstants;
+import org.apache.doris.common.io.DeepCopy;
 import org.apache.doris.common.io.Text;
 import org.apache.doris.common.io.Writable;
 import org.apache.doris.common.proc.BaseProcResult;
 import org.apache.doris.persist.gson.GsonUtils;
 
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
 import com.google.gson.annotations.SerializedName;
 
 import java.io.DataInput;
@@ -32,6 +37,8 @@ import java.io.IOException;
 import java.util.Map;
 
 public abstract class Resource implements Writable {
+    private static final Logger LOG = LogManager.getLogger(Resource.class);
+
     public enum ResourceType {
         UNKNOWN,
         SPARK,
@@ -52,6 +59,9 @@ public abstract class Resource implements Writable {
     @SerializedName(value = "type")
     protected ResourceType type;
 
+    public Resource() { // no-arg constructor: assigns neither name nor type
+    }
+
     public Resource(String name, ResourceType type) {
         this.name = name;
         this.type = type;
@@ -110,5 +120,15 @@ public abstract class Resource implements Writable {
         String json = Text.readString(in);
         return GsonUtils.GSON.fromJson(json, Resource.class);
     }
+
+    /** Deep-copies this resource through DeepCopy; a null result is logged and handed back to the caller. */
+    @Override
+    public Resource clone() {
+        Resource duplicate = DeepCopy.copy(this, Resource.class, FeConstants.meta_version);
+        if (duplicate == null) {
+            LOG.warn("failed to clone odbc resource: " + getName());
+        }
+        return duplicate;
+    }
 }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/ResourceMgr.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/ResourceMgr.java
index d112b28..7714722 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/ResourceMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/ResourceMgr.java
@@ -17,7 +17,6 @@
 
 package org.apache.doris.catalog;
 
-import com.google.common.collect.Lists;
 import org.apache.doris.analysis.CreateResourceStmt;
 import org.apache.doris.analysis.DropResourceStmt;
 import org.apache.doris.catalog.Resource.ResourceType;
@@ -34,8 +33,9 @@ import org.apache.doris.persist.gson.GsonUtils;
 import org.apache.doris.qe.ConnectContext;
 
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
-import com.google.gson.annotations.SerializedName;
+
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -46,6 +46,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import com.google.gson.annotations.SerializedName;
+
 /**
  * Resource manager is responsible for managing external resources used by Doris.
  * For example, Spark/MapReduce used for ETL, Spark/GPU used for queries, HDFS/S3 used for external storage.
@@ -70,17 +72,20 @@ public class ResourceMgr implements Writable {
         if (stmt.getResourceType() != ResourceType.SPARK && stmt.getResourceType() != ResourceType.ODBC_CATALOG) {
             throw new DdlException("Only support SPARK and ODBC_CATALOG resource.");
         }
-
-        String resourceName = stmt.getResourceName();
         Resource resource = Resource.fromStmt(stmt);
-        if (nameToResource.putIfAbsent(resourceName, resource) != null) {
-            throw new DdlException("Resource(" + resourceName + ") already exist");
-        }
+        createResource(resource);
         // log add
         Catalog.getCurrentCatalog().getEditLog().logCreateResource(resource);
         LOG.info("create resource success. resource: {}", resource);
     }
 
+    // Add the resource to nameToResource (no edit log is written here); fails if the name is taken.
+    public void createResource(Resource resource) throws DdlException {
+        if (nameToResource.putIfAbsent(resource.getName(), resource) != null) {
+            throw new DdlException("Resource(" + resource.getName() + ") already exist");
+        }
+    }
+
     public void replayCreateResource(Resource resource) {
         nameToResource.put(resource.getName(), resource);
     }
@@ -96,6 +101,15 @@ public class ResourceMgr implements Writable {
         LOG.info("drop resource success. resource name: {}", name);
     }
 
+    // Remove the resource from nameToResource without writing an edit log.
+    // A resource that is not present is only logged, never treated as an error.
+    public void dropResource(Resource resource) {
+        String name = resource.getName();
+        if (nameToResource.remove(name) == null) {
+            LOG.info("resource {} does not exist.", name);
+        }
+    }
+
     public void replayDropResource(DropResourceOperationLog operationLog) {
         nameToResource.remove(operationLog.getName());
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/SparkResource.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/SparkResource.java
index 2dd240e..e7ea689 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/SparkResource.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/SparkResource.java
@@ -26,14 +26,19 @@ import org.apache.doris.common.Pair;
 import org.apache.doris.common.proc.BaseProcResult;
 import org.apache.doris.load.loadv2.SparkRepository;
 import org.apache.doris.load.loadv2.SparkYarnConfigFiles;
+
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
-import com.google.gson.annotations.SerializedName;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 
 import java.io.File;
 import java.util.Map;
 
+import com.google.gson.annotations.SerializedName;
+
 /**
  * Spark resource for etl or query.
  * working_dir and broker[.xxx] are optional and used in spark ETL.
@@ -61,6 +66,8 @@ import java.util.Map;
  * DROP RESOURCE "spark0";
  */
 public class SparkResource extends Resource {
+    private static final Logger LOG = LogManager.getLogger(SparkResource.class);
+
     private static final String SPARK_MASTER = "spark.master";
     private static final String SPARK_SUBMIT_DEPLOY_MODE = "spark.submit.deployMode";
     private static final String WORKING_DIR = "working_dir";
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/View.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/View.java
index 19f1efd..ce5b945 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/View.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/View.java
@@ -21,13 +21,17 @@ import org.apache.doris.analysis.ParseNode;
 import org.apache.doris.analysis.QueryStmt;
 import org.apache.doris.analysis.SqlParser;
 import org.apache.doris.analysis.SqlScanner;
+import org.apache.doris.common.FeConstants;
 import org.apache.doris.common.UserException;
+import org.apache.doris.common.io.DeepCopy;
 import org.apache.doris.common.io.Text;
 import org.apache.doris.common.util.SqlParserUtils;
+import org.apache.doris.common.util.Util;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 
+import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -47,7 +51,7 @@ import java.util.List;
  * affect the metadata of the underlying tables (if any).
  */
 public class View extends Table {
-    private static final Logger LOG = LogManager.getLogger(Catalog.class);
+    private static final Logger LOG = LogManager.getLogger(View.class);
 
     // The original SQL-string given as view definition. Set during analysis.
     // Corresponds to Hive's viewOriginalText.
@@ -136,6 +140,10 @@ public class View extends Table {
         this.sqlMode = sqlMode;
     }
 
+    public void setSqlMode(long sqlMode) {
+        this.sqlMode = sqlMode;
+    }
+
     public String getInlineViewDef() {
         return inlineViewDef;
     }
@@ -199,7 +207,41 @@ public class View extends Table {
         return explicitColLabels;
     }
 
-    public boolean hasColLabels() { return colLabels_ != null; }
+    public boolean hasColLabels() {
+        return colLabels_ != null;
+    }
+
+    // Get the md5 of the signature string of this view.
+    // This method is used to determine whether the views have the same schema.
+    // The signature string contains: view name, type, full schema, inline view def, sql mode.
+    // signatureVersion is not part of the string and must not be used as the builder capacity.
+    @Override
+    public String getSignature(int signatureVersion) {
+        StringBuilder sb = new StringBuilder();
+        sb.append(name).append(type);
+        sb.append(Util.getSchemaSignatureString(fullSchema));
+        sb.append(inlineViewDef);
+        sb.append(sqlMode);
+        String signature = sb.toString();
+        String md5 = DigestUtils.md5Hex(signature);
+        LOG.debug("get signature of view {}: {}. signature string: {}", name, md5, signature);
+        return md5;
+    }
+
+    @Override
+    public View clone() {
+        View copied = new View();
+        if (DeepCopy.copy(this, copied, View.class, FeConstants.meta_version)) {
+            copied.setSqlMode(this.sqlMode); // set explicitly on the copy after the round trip
+            return copied;
+        }
+        LOG.warn("failed to copy view: " + getName());
+        return null;
+    }
+
+    public void resetIdsForRestore(Catalog catalog){
+        id = catalog.getNextId();
+    }
 
     @Override
     public void write(DataOutput out) throws IOException {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/AnalysisException.java b/fe/fe-core/src/main/java/org/apache/doris/common/AnalysisException.java
index 066fc4d..74cce3c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/AnalysisException.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/AnalysisException.java
@@ -21,6 +21,7 @@ package org.apache.doris.common;
  * Thrown for errors encountered during analysis of a SQL statement.
  */
 public class AnalysisException extends UserException {
+
     public AnalysisException(String msg, Throwable cause) {
         super(msg, cause);
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/FeMetaVersion.java b/fe/fe-core/src/main/java/org/apache/doris/common/FeMetaVersion.java
index ab17d33..3fee46c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/FeMetaVersion.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/FeMetaVersion.java
@@ -200,6 +200,8 @@ public final class FeMetaVersion {
     public static final int VERSION_93 = 93;
     // refactor load job property persist method
     public static final int VERSION_94 = 94;
+    // serialize resources in restore job
+    public static final int VERSION_95 = 95;
     // note: when increment meta version, should assign the latest version to VERSION_CURRENT
-    public static final int VERSION_CURRENT = VERSION_94;
+    public static final int VERSION_CURRENT = VERSION_95;
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/io/DeepCopy.java b/fe/fe-core/src/main/java/org/apache/doris/common/io/DeepCopy.java
index 93a5884..bdeac68 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/io/DeepCopy.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/io/DeepCopy.java
@@ -17,7 +17,6 @@
 
 package org.apache.doris.common.io;
 
-import org.apache.doris.common.FeConstants;
 import org.apache.doris.meta.MetaContext;
 
 import org.apache.logging.log4j.LogManager;
@@ -34,14 +33,15 @@ import java.lang.reflect.Method;
 public class DeepCopy {
     private static final Logger LOG = LogManager.getLogger(DeepCopy.class);
 
-    public static final String READ_METHOD_NAME = "readFields";
+    public static final String READ_FIELDS_METHOD_NAME = "readFields";
+    public static final String READ_METHOD_NAME = "read";
 
     // deep copy orig to dest.
     // the param "c" is the implementation class of "dest".
     // And the "dest" class must has method "readFields(DataInput)"
-    public static boolean copy(Writable orig, Writable dest, Class c) {
+    public static boolean copy(Writable orig, Writable dest, Class c, int metaVersion) {
         MetaContext metaContext = new MetaContext();
-        metaContext.setMetaVersion(FeConstants.meta_version);
+        metaContext.setMetaVersion(metaVersion);
         metaContext.setThreadLocalInfo();
 
         FastByteArrayOutputStream byteArrayOutputStream = new FastByteArrayOutputStream();
@@ -53,11 +53,10 @@ public class DeepCopy {
 
             DataInputStream in = new DataInputStream(byteArrayOutputStream.getInputStream());
             
-            Method readMethod = c.getDeclaredMethod(READ_METHOD_NAME, DataInput.class);
+            Method readMethod = c.getDeclaredMethod(READ_FIELDS_METHOD_NAME, DataInput.class);
             readMethod.invoke(dest, in);
             in.close();
         } catch (Exception e) {
-            e.printStackTrace();
             LOG.warn("failed to copy object.", e);
             return false;
         } finally {
@@ -65,4 +64,34 @@ public class DeepCopy {
         }
         return true;
     }
+
+    // Deep copy "orig" and return the copy as a new object.
+    // The param "c" is the implementation class of "orig",
+    // and class "c" must declare a method "read(DataInput)" that returns the deserialized object.
+    // Returns null (after logging a warning) if serialization, reflection or the cast to "c" fails.
+    public static <T> T copy(Writable orig, Class<T> c, int metaVersion) {
+        MetaContext metaContext = new MetaContext();
+        metaContext.setMetaVersion(metaVersion);
+        metaContext.setThreadLocalInfo();
+
+        FastByteArrayOutputStream byteArrayOutputStream = new FastByteArrayOutputStream();
+        DataOutputStream out = new DataOutputStream(byteArrayOutputStream);
+        try {
+            orig.write(out);
+            out.flush();
+            out.close();
+
+            DataInputStream in = new DataInputStream(byteArrayOutputStream.getInputStream());
+            Method readMethod = c.getDeclaredMethod(READ_METHOD_NAME, DataInput.class);
+            // Class.cast() is a checked cast: a wrong type fails here instead of at the caller.
+            T result = c.cast(readMethod.invoke(orig, in));
+            in.close();
+            return result;
+        } catch (Exception e) {
+            LOG.warn("failed to copy object.", e);
+            return null;
+        } finally {
+            MetaContext.remove();
+        }
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/Util.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/Util.java
index ea601fa..3cb2aa6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/Util.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/Util.java
@@ -73,6 +73,7 @@ public class Util {
         TYPE_STRING_MAP.put(PrimitiveType.HLL, "varchar(%d)");
         TYPE_STRING_MAP.put(PrimitiveType.BOOLEAN, "bool");
         TYPE_STRING_MAP.put(PrimitiveType.BITMAP, "bitmap");
+        TYPE_STRING_MAP.put(PrimitiveType.NULL_TYPE, "null");
     }
     
     private static class CmdWorker extends Thread {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
index 35988d4..b31f99b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
@@ -184,7 +184,7 @@ public class EditLog {
                     RefreshExternalTableInfo info = (RefreshExternalTableInfo) journal.getData();
                     LOG.info("Begin to unprotect alter external table schema. db = "
                             + info.getDbName() + " table = " + info.getTableName());
-                    catalog.replayAlterExteranlTableSchema(info.getDbName(), info.getTableName(), info.getNewSchema());
+                    catalog.replayAlterExternalTableSchema(info.getDbName(), info.getTableName(), info.getNewSchema());
                     break;
                 }
                 case OperationType.OP_DROP_TABLE: {
diff --git a/fe/fe-core/src/test/java/org/apache/doris/backup/BackupHandlerTest.java b/fe/fe-core/src/test/java/org/apache/doris/backup/BackupHandlerTest.java
index bf6c632..01ae668 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/backup/BackupHandlerTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/backup/BackupHandlerTest.java
@@ -32,6 +32,7 @@ import org.apache.doris.catalog.MaterializedIndex;
 import org.apache.doris.catalog.MaterializedIndex.IndexExtState;
 import org.apache.doris.catalog.OlapTable;
 import org.apache.doris.catalog.Partition;
+import org.apache.doris.catalog.Resource;
 import org.apache.doris.catalog.Table;
 import org.apache.doris.catalog.Tablet;
 import org.apache.doris.catalog.TabletInvertedIndex;
@@ -208,6 +209,8 @@ public class BackupHandlerTest {
                 OlapTable tbl = (OlapTable) db.getTable(CatalogMocker.TEST_TBL_NAME);
                 List<Table> tbls = Lists.newArrayList();
                 tbls.add(tbl);
+                List<Resource> resources = Lists.newArrayList();
+                BackupMeta backupMeta = new BackupMeta(tbls, resources);
                 Map<Long, SnapshotInfo> snapshotInfos = Maps.newHashMap();
                 for (Partition part : tbl.getPartitions()) {
                     for (MaterializedIndex idx : part.getMaterializedIndices(IndexExtState.VISIBLE)) {
@@ -222,7 +225,7 @@ public class BackupHandlerTest {
                 
                 BackupJobInfo info = BackupJobInfo.fromCatalog(System.currentTimeMillis(),
                                                                "ss2", CatalogMocker.TEST_DB_NAME, 
-                                                               CatalogMocker.TEST_DB_ID, tbls, snapshotInfos);
+                                                               CatalogMocker.TEST_DB_ID, backupMeta, snapshotInfos);
                 infos.add(info);
                 return Status.OK;
             }
diff --git a/fe/fe-core/src/test/java/org/apache/doris/backup/BackupJobInfoTest.java b/fe/fe-core/src/test/java/org/apache/doris/backup/BackupJobInfoTest.java
index 13fb4e8..b0beb30 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/backup/BackupJobInfoTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/backup/BackupJobInfoTest.java
@@ -115,6 +115,12 @@ public class BackupJobInfoTest {
                 + "            },\n"
                 + "            \"id\": 10001\n"
                 + "        }\n"
+                + "    },\n"
+                + "    \"new_backup_objects\": {\n"
+                + "        \"views\":[{\n"
+                + "            \"name\": \"view1\",\n"
+                + "            \"id\": \"10006\"\n"
+                + "        }]\n"
                 + "    }\n"
                 + "}";
 
@@ -144,20 +150,23 @@ public class BackupJobInfoTest {
             Assert.fail();
         }
         Assert.assertNotNull(jobInfo);
-        System.out.println(jobInfo.toString(1));
+        System.out.println(jobInfo.toString());
 
         Assert.assertEquals(1522231864000L, jobInfo.backupTime);
         Assert.assertEquals("snapshot1", jobInfo.name);
-        Assert.assertEquals(2, jobInfo.tables.size());
+        Assert.assertEquals(2, jobInfo.backupOlapTableObjects.size());
 
-        Assert.assertEquals(2, jobInfo.getTableInfo("table1").partitions.size());
-        Assert.assertEquals(2, jobInfo.getTableInfo("table1").getPartInfo("partition1").indexes.size());
+        Assert.assertEquals(2, jobInfo.getOlapTableInfo("table1").partitions.size());
+        Assert.assertEquals(2, jobInfo.getOlapTableInfo("table1").getPartInfo("partition1").indexes.size());
         Assert.assertEquals(2,
-                            jobInfo.getTableInfo("table1").getPartInfo("partition1").getIdx("rollup1").tablets.size());
-        System.out.println(jobInfo.getTableInfo("table1").getPartInfo("partition1").getIdx("rollup1").tablets);
+                            jobInfo.getOlapTableInfo("table1").getPartInfo("partition1").getIdx("rollup1").tablets.size());
+        System.out.println(jobInfo.getOlapTableInfo("table1").getPartInfo("partition1").getIdx("rollup1").tablets);
         Assert.assertEquals(2,
-                            jobInfo.getTableInfo("table1").getPartInfo("partition1")
-                            .getIdx("rollup1").getTablet(10007L).files.size());
+                            jobInfo.getOlapTableInfo("table1").getPartInfo("partition1")
+                            .getIdx("rollup1").getTabletFiles(10007L).size());
+
+        Assert.assertEquals(1, jobInfo.newBackupObjects.views.size());
+        Assert.assertEquals("view1", jobInfo.newBackupObjects.views.get(0).name);
 
         File tmpFile = new File("./tmp");
         try {
@@ -174,6 +183,9 @@ public class BackupJobInfoTest {
             Assert.assertEquals(jobInfo.dbId, newInfo.dbId);
             Assert.assertEquals(jobInfo.dbName, newInfo.dbName);
 
+            Assert.assertEquals(jobInfo.newBackupObjects.views.size(), newInfo.newBackupObjects.views.size());
+            Assert.assertEquals("view1", newInfo.newBackupObjects.views.get(0).name);
+
         } catch (IOException e) {
             e.printStackTrace();
             Assert.fail();
diff --git a/fe/fe-core/src/test/java/org/apache/doris/backup/BackupJobTest.java b/fe/fe-core/src/test/java/org/apache/doris/backup/BackupJobTest.java
index 9c1d7c1..62a1a3b 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/backup/BackupJobTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/backup/BackupJobTest.java
@@ -312,7 +312,7 @@ public class BackupJobTest {
             restoreJobInfo = BackupJobInfo.fromFile(job.getLocalJobInfoFilePath());
             Assert.assertEquals(UnitTestUtil.DB_NAME, restoreJobInfo.dbName);
             Assert.assertEquals(job.getLabel(), restoreJobInfo.name);
-            Assert.assertEquals(1, restoreJobInfo.tables.size());
+            Assert.assertEquals(1, restoreJobInfo.backupOlapTableObjects.values().size());
         } catch (IOException e) {
             e.printStackTrace();
             Assert.fail();
diff --git a/fe/fe-core/src/test/java/org/apache/doris/backup/RestoreJobTest.java b/fe/fe-core/src/test/java/org/apache/doris/backup/RestoreJobTest.java
index 5766516..87911f1 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/backup/RestoreJobTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/backup/RestoreJobTest.java
@@ -19,7 +19,7 @@ package org.apache.doris.backup;
 
 import org.apache.doris.backup.BackupJobInfo.BackupIndexInfo;
 import org.apache.doris.backup.BackupJobInfo.BackupPartitionInfo;
-import org.apache.doris.backup.BackupJobInfo.BackupTableInfo;
+import org.apache.doris.backup.BackupJobInfo.BackupOlapTableInfo;
 import org.apache.doris.backup.BackupJobInfo.BackupTabletInfo;
 import org.apache.doris.backup.RestoreJob.RestoreJobState;
 import org.apache.doris.catalog.Catalog;
@@ -28,6 +28,7 @@ import org.apache.doris.catalog.MaterializedIndex;
 import org.apache.doris.catalog.MaterializedIndex.IndexExtState;
 import org.apache.doris.catalog.OlapTable;
 import org.apache.doris.catalog.Partition;
+import org.apache.doris.catalog.Resource;
 import org.apache.doris.catalog.Table;
 import org.apache.doris.catalog.Tablet;
 import org.apache.doris.common.AnalysisException;
@@ -217,31 +218,26 @@ public class RestoreJobTest {
         jobInfo.success = true;
         
         expectedRestoreTbl = (OlapTable) db.getTable(CatalogMocker.TEST_TBL2_ID);
-        BackupTableInfo tblInfo = new BackupTableInfo();
+        BackupOlapTableInfo tblInfo = new BackupOlapTableInfo();
         tblInfo.id = CatalogMocker.TEST_TBL2_ID;
-        tblInfo.name = CatalogMocker.TEST_TBL2_NAME;
-        jobInfo.tables.put(tblInfo.name, tblInfo);
+        jobInfo.backupOlapTableObjects.put(CatalogMocker.TEST_TBL2_NAME, tblInfo);
         
         for (Partition partition : expectedRestoreTbl.getPartitions()) {
             BackupPartitionInfo partInfo = new BackupPartitionInfo();
             partInfo.id = partition.getId();
-            partInfo.name = partition.getName();
-            tblInfo.partitions.put(partInfo.name, partInfo);
+            tblInfo.partitions.put(partition.getName(), partInfo);
             
             for (MaterializedIndex index : partition.getMaterializedIndices(IndexExtState.VISIBLE)) {
                 BackupIndexInfo idxInfo = new BackupIndexInfo();
                 idxInfo.id = index.getId();
-                idxInfo.name = expectedRestoreTbl.getIndexNameById(index.getId());
                 idxInfo.schemaHash = expectedRestoreTbl.getSchemaHashByIndexId(index.getId());
-                partInfo.indexes.put(idxInfo.name, idxInfo);
+                partInfo.indexes.put(expectedRestoreTbl.getIndexNameById(index.getId()), idxInfo);
                 
                 for (Tablet tablet : index.getTablets()) {
-                    BackupTabletInfo tabletInfo = new BackupTabletInfo();
-                    tabletInfo.id = tablet.getId();
-                    tabletInfo.files.add(tabletInfo.id + ".dat");
-                    tabletInfo.files.add(tabletInfo.id + ".idx");
-                    tabletInfo.files.add(tabletInfo.id + ".hdr");
-                    idxInfo.tablets.add(tabletInfo);
+                    List<String> files = Lists.newArrayList(tablet.getId() + ".dat",
+                            tablet.getId()+ ".idx",  tablet.getId()+".hdr");
+                    BackupTabletInfo tabletInfo = new BackupTabletInfo(tablet.getId(), files);
+                    idxInfo.sortedTabletInfoList.add(tabletInfo);
                 }
             }
         }
@@ -253,8 +249,9 @@ public class RestoreJobTest {
                 jobInfo, false, 3, 100000, -1, catalog, repo.getId());
         
         List<Table> tbls = Lists.newArrayList();
+        List<Resource> resources = Lists.newArrayList();
         tbls.add(expectedRestoreTbl);
-        backupMeta = new BackupMeta(tbls);
+        backupMeta = new BackupMeta(tbls, resources);
     }
 
     @Ignore
diff --git a/fe/fe-core/src/test/java/org/apache/doris/common/io/DeepCopyTest.java b/fe/fe-core/src/test/java/org/apache/doris/common/io/DeepCopyTest.java
index 06d1561..71b42d7 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/common/io/DeepCopyTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/common/io/DeepCopyTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.doris.common.io;
 
+import org.apache.doris.common.FeConstants;
 import org.apache.doris.persist.TableInfo;
 
 import org.junit.Assert;
@@ -28,7 +29,7 @@ public class DeepCopyTest {
     public void test() {
         TableInfo info = TableInfo.createForTableRename(1, 2, "newTbl");
         TableInfo copied = new TableInfo();
-        boolean res = DeepCopy.copy(info, copied, TableInfo.class);
+        boolean res = DeepCopy.copy(info, copied, TableInfo.class, FeConstants.meta_version);
         Assert.assertTrue(res);
         Assert.assertEquals(1, copied.getDbId());
         Assert.assertEquals(2, copied.getTableId());


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org