You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by xx...@apache.org on 2022/12/26 03:08:49 UTC

[kylin] branch kylin5 updated: KYLIN-5377 Import models auto load data source tables.

This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/kylin5 by this push:
     new ea119412aa KYLIN-5377 Import models auto load data source tables.
ea119412aa is described below

commit ea119412aa082999b7c8881e4828778e1955ff81
Author: Jiale He <35...@users.noreply.github.com>
AuthorDate: Fri Oct 28 17:35:05 2022 +0800

    KYLIN-5377 Import models auto load data source tables.
    
    Co-authored-by: Jiale He <ji...@kyligence.io>
---
 .../org/apache/kylin/metadata/model/TableDesc.java |  20 +-
 .../metadata/model/schema/ImportModelContext.java  |  56 ++--
 .../model/schema/SchemaChangeCheckResult.java      |  14 +-
 .../apache/kylin/metadata/model/TableDescTest.java |  55 ++++
 .../model/schema/ImportModelContextTest.java       |   5 +-
 .../metadata/model/schema/SchemaUtilTest.java      |   2 +-
 .../localmeta/data/tableDesc/SSB.CUSTOMER_NEW.json |  41 +++
 .../rest/controller/NMetaStoreController.java      |  17 +-
 .../rest/controller/NMetaStoreControllerTest.java  |   8 +-
 .../kylin/rest/service/MetaStoreService.java       | 171 ++++++++----
 .../kylin/rest/service/MetaStoreServiceTest.java   | 309 ++++++++++++++++-----
 .../kylin/rest/service/TableServiceTest.java       |  59 ++--
 12 files changed, 553 insertions(+), 204 deletions(-)

diff --git a/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableDesc.java b/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableDesc.java
index 6c4da9b82e..33231ff7b9 100644
--- a/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableDesc.java
+++ b/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableDesc.java
@@ -28,6 +28,7 @@ import java.util.Set;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.ArrayUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.kylin.common.KylinConfig;
@@ -36,8 +37,8 @@ import org.apache.kylin.common.persistence.RootPersistentEntity;
 import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.common.util.StringSplitter;
 import org.apache.kylin.metadata.MetadataConstants;
-import org.apache.kylin.metadata.project.ProjectInstance;
 import org.apache.kylin.metadata.project.NProjectManager;
+import org.apache.kylin.metadata.project.ProjectInstance;
 import org.apache.kylin.metadata.streaming.KafkaConfig;
 import org.apache.kylin.metadata.streaming.KafkaConfigManager;
 
@@ -306,6 +307,23 @@ public class TableDesc extends RootPersistentEntity implements Serializable, ISo
         return null;
     }
 
+    public Pair<Set<ColumnDesc>, Set<ColumnDesc>> findColumns(Set<ColumnDesc> columnDescSet) {
+        Set<ColumnDesc> existColSet = Sets.newHashSet(); // this table's columns that matched a searched name
+        Set<ColumnDesc> notExistColSet = Sets.newHashSet(); // searched columns with no match in this table
+        if (CollectionUtils.isEmpty(columnDescSet)) {
+            return Pair.newPair(existColSet, notExistColSet); // nothing to search for: both sets are empty
+        }
+        for (ColumnDesc searchColumnDesc : columnDescSet) {
+            ColumnDesc columnDesc = findColumnByName(searchColumnDesc.getName()); // lookup is by name only
+            if (Objects.isNull(columnDesc)) {
+                notExistColSet.add(searchColumnDesc);
+            } else {
+                existColSet.add(columnDesc); // keep this table's ColumnDesc, not the caller's instance
+            }
+        }
+        return Pair.newPair(existColSet, notExistColSet); // first = found, second = not found
+    }
+
     @Override
     public String getResourcePath() {
         return concatResourcePath(getIdentity(), project);
diff --git a/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/schema/ImportModelContext.java b/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/schema/ImportModelContext.java
index d0b673f883..24fa7559ca 100644
--- a/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/schema/ImportModelContext.java
+++ b/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/schema/ImportModelContext.java
@@ -41,10 +41,9 @@ import org.apache.kylin.common.persistence.InMemResourceStore;
 import org.apache.kylin.common.persistence.RawResource;
 import org.apache.kylin.common.persistence.ResourceStore;
 import org.apache.kylin.common.util.JsonUtil;
+import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.common.util.RandomUtil;
 import org.apache.kylin.cube.model.SelectRule;
-import org.apache.kylin.metadata.model.TableDesc;
-import org.apache.kylin.metadata.project.ProjectInstance;
 import org.apache.kylin.metadata.cube.cuboid.NAggregationGroup;
 import org.apache.kylin.metadata.cube.model.IndexEntity;
 import org.apache.kylin.metadata.cube.model.IndexPlan;
@@ -56,6 +55,8 @@ import org.apache.kylin.metadata.cube.model.RuleBasedIndex;
 import org.apache.kylin.metadata.model.NDataModel;
 import org.apache.kylin.metadata.model.NDataModelManager;
 import org.apache.kylin.metadata.model.NTableMetadataManager;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.metadata.project.ProjectInstance;
 import org.apache.kylin.metadata.recommendation.candidate.RawRecItem;
 import org.apache.kylin.metadata.recommendation.entity.DimensionRecItemV2;
 import org.apache.kylin.metadata.recommendation.entity.LayoutRecItemV2;
@@ -94,6 +95,10 @@ public class ImportModelContext implements AutoCloseable {
     private final NTableMetadataManager importTableMetadataManager;
     private final NIndexPlanManager importIndexPlanManager;
 
+    @Getter
+    private final List<TableDesc> targetMissTableList;
+    @Getter
+    private final List<TableDesc> loadTableList;
     @Getter
     private final Map<String, String> newModels;
     private final List<String> unImportModels;
@@ -133,31 +138,39 @@ public class ImportModelContext implements AutoCloseable {
 
         targetKylinConfig.setProperty("kylin.metadata.validate-computed-column", "false");
 
+        val pairTable = getPairTable();
+        targetMissTableList = pairTable.getFirst();
+        loadTableList = pairTable.getSecond();
         loadTable();
         loadModel();
     }
 
-    private void loadTable() {
+    private Pair<List<TableDesc>, List<TableDesc>> getPairTable() {
+        List<TableDesc> missTables = Lists.newArrayList();
+        List<TableDesc> loadTables = Lists.newArrayList();
+
         List<TableDesc> tables = importTableMetadataManager.listAllTables();
         for (TableDesc tableDesc : tables) {
             TableDesc newTable = targetTableMetadataManager.copyForWrite(tableDesc);
             TableDesc originalTable = targetTableMetadataManager.getTableDesc(newTable.getIdentity());
-            long mvcc = -1;
-            if (originalTable != null) {
-                mvcc = originalTable.getMvcc();
-            }
-            newTable.setMvcc(mvcc);
             newTable.setLastModified(System.currentTimeMillis());
-            targetTableMetadataManager.saveSourceTable(newTable);
+            if (Objects.isNull(originalTable)) {
+                newTable.setMvcc(-1);
+                missTables.add(newTable);
+            } else {
+                newTable.setMvcc(originalTable.getMvcc());
+            }
+            loadTables.add(newTable);
+        }
+        return Pair.newPair(missTables, loadTables);
+    }
+
+    private void loadTable() {
+        for (TableDesc tableDesc : loadTableList) {
+            targetTableMetadataManager.saveSourceTable(tableDesc);
         }
     }
 
-    /**
-     *
-     * @param newDataModel
-     * @param importModel
-     * @throws IOException
-     */
     private void createNewModel(NDataModel newDataModel, NDataModel importModel) throws IOException {
         newDataModel.setProject(targetProject);
         newDataModel.setAlias(newModels.getOrDefault(importModel.getAlias(), newDataModel.getAlias()));
@@ -180,7 +193,6 @@ public class ImportModelContext implements AutoCloseable {
      *
      * @param originalDataModel model from current env
      * @param newDataModel model from import
-     * @return
      */
     private static Map<Integer, Integer> prepareIdChangedMap(NDataModel originalDataModel, NDataModel newDataModel) {
         Map<Integer, Integer> idChangedMap = new HashMap<>();
@@ -237,12 +249,6 @@ public class ImportModelContext implements AutoCloseable {
         return idChangedMap;
     }
 
-    /**
-     *
-     * @param newDataModel
-     * @param originalDataModel
-     * @param hasModelOverrideProps
-     */
     private void updateModel(NDataModel newDataModel, NDataModel originalDataModel, boolean hasModelOverrideProps) {
         newDataModel.setUuid(originalDataModel.getUuid());
         newDataModel.setProject(targetProject);
@@ -254,12 +260,6 @@ public class ImportModelContext implements AutoCloseable {
         targetDataModelManager.updateDataModelDesc(newDataModel);
     }
 
-    /**
-     *
-     * @param originalDataModel
-     * @param targetIndexPlan
-     * @param hasModelOverrideProps
-     */
     private void updateIndexPlan(NDataModel originalDataModel, IndexPlan targetIndexPlan,
             boolean hasModelOverrideProps) {
         targetIndexPlanManger.updateIndexPlan(originalDataModel.getUuid(), copyForWrite -> {
diff --git a/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/schema/SchemaChangeCheckResult.java b/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/schema/SchemaChangeCheckResult.java
index b64c1321fb..90b116cd85 100644
--- a/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/schema/SchemaChangeCheckResult.java
+++ b/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/schema/SchemaChangeCheckResult.java
@@ -24,11 +24,13 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.stream.Stream;
 
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonUnwrapped;
+import com.google.common.collect.Sets;
 
 import lombok.AllArgsConstructor;
 import lombok.Data;
@@ -65,13 +67,13 @@ public class SchemaChangeCheckResult {
         @JsonProperty("importable")
         public boolean importable() {
             return Stream.of(missingItems, newItems, updateItems, reduceItems).flatMap(Collection::stream)
-                    .allMatch(BaseItem::isImportable);
+                    .allMatch(BaseItem::isImportable) || isLoadTableAble();
         }
 
         @JsonProperty("creatable")
         public boolean creatable() {
             return Stream.of(missingItems, newItems, updateItems, reduceItems).flatMap(Collection::stream)
-                    .allMatch(BaseItem::isCreatable);
+                    .allMatch(BaseItem::isCreatable) || isLoadTableAble();
         }
 
         @JsonProperty("")
@@ -89,6 +91,14 @@ public class SchemaChangeCheckResult {
             return Stream.of(missingItems, newItems, updateItems, reduceItems).flatMap(Collection::stream)
                     .allMatch(BaseItem::isHasSameName);
         }
+
+        @Setter
+        @JsonIgnore
+        private boolean loadTableAble = false;
+
+        @Getter
+        @JsonIgnore
+        private Set<String> loadTables = Sets.newHashSet();
     }
 
     @Data
diff --git a/src/core-metadata/src/test/java/org/apache/kylin/metadata/model/TableDescTest.java b/src/core-metadata/src/test/java/org/apache/kylin/metadata/model/TableDescTest.java
index 66e6036843..7eff5cffa9 100644
--- a/src/core-metadata/src/test/java/org/apache/kylin/metadata/model/TableDescTest.java
+++ b/src/core-metadata/src/test/java/org/apache/kylin/metadata/model/TableDescTest.java
@@ -21,6 +21,7 @@ package org.apache.kylin.metadata.model;
 import static org.apache.kylin.metadata.model.NTableMetadataManager.getInstance;
 
 import java.util.Locale;
+import java.util.Set;
 
 import org.apache.kylin.common.util.NLocalFileMetadataTestCase;
 import org.junit.After;
@@ -28,6 +29,10 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
+import com.google.common.collect.Sets;
+
+import lombok.val;
+
 public class TableDescTest extends NLocalFileMetadataTestCase {
     private final String project = "default";
     private NTableMetadataManager tableMetadataManager;
@@ -67,4 +72,54 @@ public class TableDescTest extends NLocalFileMetadataTestCase {
         final TableDesc tableDesc = tableMetadataManager.getTableDesc(tableName);
         Assert.assertFalse(tableDesc.isRangePartition());
     }
+
+    @Test
+    public void testFindColumns() {
+        final String tableName = "DEFAULT.TEST_KYLIN_FACT";
+        final TableDesc tableDesc = tableMetadataManager.getTableDesc(tableName);
+        ColumnDesc[] columns = tableDesc.getColumns();
+        Assert.assertEquals(12, columns.length);
+
+        {
+            // empty search set: nothing found, nothing missing
+            Set<ColumnDesc> searchColSet = Sets.newHashSet();
+            val pair = tableDesc.findColumns(searchColSet);
+            Assert.assertTrue(pair.getFirst().isEmpty());
+            Assert.assertTrue(pair.getSecond().isEmpty());
+        }
+
+        {
+            // all searched columns are found
+            Set<ColumnDesc> searchColSet = Sets.newHashSet(
+                    new ColumnDesc("1", "TRANS_ID", "bigint", "TRANS_ID", "", "", ""),
+                    new ColumnDesc("2", "ORDER_ID", "bigint", "TRANS_ID", "", "", ""));
+            val pair = tableDesc.findColumns(searchColSet);
+            Assert.assertFalse(pair.getFirst().isEmpty());
+            Assert.assertTrue(pair.getSecond().isEmpty());
+            Assert.assertEquals(2, pair.getFirst().size());
+        }
+
+        {
+            // only some of the searched columns are found
+            Set<ColumnDesc> searchColSet = Sets.newHashSet(
+                    new ColumnDesc("1", "TRANS_ID_1", "bigint", "TRANS_ID", "", "", ""),
+                    new ColumnDesc("2", "ORDER_ID", "bigint", "TRANS_ID", "", "", ""));
+            val pair = tableDesc.findColumns(searchColSet);
+            Assert.assertFalse(pair.getFirst().isEmpty());
+            Assert.assertFalse(pair.getSecond().isEmpty());
+            Assert.assertEquals(1, pair.getFirst().size());
+            Assert.assertEquals(1, pair.getSecond().size());
+        }
+
+        {
+            // none of the searched columns is found
+            Set<ColumnDesc> searchColSet = Sets.newHashSet(
+                    new ColumnDesc("1", "TRANS_ID_1", "bigint", "TRANS_ID", "", "", ""),
+                    new ColumnDesc("2", "ORDER_ID_1", "bigint", "TRANS_ID", "", "", ""));
+            val pair = tableDesc.findColumns(searchColSet);
+            Assert.assertTrue(pair.getFirst().isEmpty());
+            Assert.assertFalse(pair.getSecond().isEmpty());
+            Assert.assertEquals(2, pair.getSecond().size());
+        }
+    }
 }
diff --git a/src/core-metadata/src/test/java/org/apache/kylin/metadata/model/schema/ImportModelContextTest.java b/src/core-metadata/src/test/java/org/apache/kylin/metadata/model/schema/ImportModelContextTest.java
index 8cd432a3ef..a066863c12 100644
--- a/src/core-metadata/src/test/java/org/apache/kylin/metadata/model/schema/ImportModelContextTest.java
+++ b/src/core-metadata/src/test/java/org/apache/kylin/metadata/model/schema/ImportModelContextTest.java
@@ -150,9 +150,8 @@ public class ImportModelContextTest extends NLocalFileMetadataTestCase {
 
         ResourceStore.setRS(importKylinConfig, importResourceStore);
 
-        rawResourceMap.forEach((resPath, raw) -> {
-            importResourceStore.putResourceWithoutCheck(resPath, raw.getByteSource(), raw.getTimestamp(), 0);
-        });
+        rawResourceMap.forEach((resPath, raw) -> importResourceStore.putResourceWithoutCheck(resPath,
+                raw.getByteSource(), raw.getTimestamp(), 0));
 
         return importKylinConfig;
     }
diff --git a/src/core-metadata/src/test/java/org/apache/kylin/metadata/model/schema/SchemaUtilTest.java b/src/core-metadata/src/test/java/org/apache/kylin/metadata/model/schema/SchemaUtilTest.java
index 92cb7d2d53..4b43a9e538 100644
--- a/src/core-metadata/src/test/java/org/apache/kylin/metadata/model/schema/SchemaUtilTest.java
+++ b/src/core-metadata/src/test/java/org/apache/kylin/metadata/model/schema/SchemaUtilTest.java
@@ -40,8 +40,8 @@ import org.apache.kylin.common.exception.KylinException;
 import org.apache.kylin.common.msg.MsgPicker;
 import org.apache.kylin.common.persistence.RawResource;
 import org.apache.kylin.common.persistence.ResourceStore;
-import org.apache.kylin.metadata.model.ModelJoinRelationTypeEnum;
 import org.apache.kylin.common.util.NLocalFileMetadataTestCase;
+import org.apache.kylin.metadata.model.ModelJoinRelationTypeEnum;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
diff --git a/src/examples/test_case_data/localmeta/data/tableDesc/SSB.CUSTOMER_NEW.json b/src/examples/test_case_data/localmeta/data/tableDesc/SSB.CUSTOMER_NEW.json
new file mode 100644
index 0000000000..88ecdea540
--- /dev/null
+++ b/src/examples/test_case_data/localmeta/data/tableDesc/SSB.CUSTOMER_NEW.json
@@ -0,0 +1,41 @@
+{
+  "uuid" : "d70320ec-949f-44df-8bf4-92dc005dd07d",
+  "version" : "2.1",
+  "name" : "CUSTOMER_NEW",
+  "columns" : [ {
+    "id" : "1",
+    "name" : "C_CUSTKEY",
+    "datatype" : "integer"
+  }, {
+    "id" : "2",
+    "name" : "C_NAME",
+    "datatype" : "varchar(25)"
+  }, {
+    "id" : "3",
+    "name" : "C_ADDRESS",
+    "datatype" : "varchar(40)"
+  }, {
+    "id" : "4",
+    "name" : "C_CITY",
+    "datatype" : "varchar(10)"
+  }, {
+    "id" : "5",
+    "name" : "C_NATION",
+    "datatype" : "varchar(15)"
+  }, {
+    "id" : "6",
+    "name" : "C_REGION",
+    "datatype" : "varchar(12)"
+  }, {
+    "id" : "7",
+    "name" : "C_PHONE",
+    "datatype" : "varchar(15)"
+  }, {
+    "id" : "8",
+    "name" : "C_MKTSEGMENT",
+    "datatype" : "varchar(10)"
+  } ],
+  "database" : "SSB",
+  "last_modified" : 1457444146362,
+  "source_type" : 9
+}
\ No newline at end of file
diff --git a/src/metadata-server/src/main/java/org/apache/kylin/rest/controller/NMetaStoreController.java b/src/metadata-server/src/main/java/org/apache/kylin/rest/controller/NMetaStoreController.java
index fbccca969e..84649c4bcb 100644
--- a/src/metadata-server/src/main/java/org/apache/kylin/rest/controller/NMetaStoreController.java
+++ b/src/metadata-server/src/main/java/org/apache/kylin/rest/controller/NMetaStoreController.java
@@ -18,10 +18,12 @@
 
 package org.apache.kylin.rest.controller;
 
+import static org.apache.kylin.common.constant.HttpConstant.HTTP_VND_APACHE_KYLIN_JSON;
 import static org.apache.kylin.common.exception.ServerErrorCode.EMPTY_MODEL_ID;
 import static org.apache.kylin.common.exception.ServerErrorCode.FILE_FORMAT_ERROR;
 import static org.apache.kylin.common.exception.ServerErrorCode.FILE_NOT_EXIST;
-import static org.apache.kylin.common.constant.HttpConstant.HTTP_VND_APACHE_KYLIN_JSON;
+import static org.apache.kylin.rest.request.ModelImportRequest.ImportType.NEW;
+import static org.apache.kylin.rest.request.ModelImportRequest.ImportType.OVERWRITE;
 import static org.springframework.http.MediaType.MULTIPART_FORM_DATA_VALUE;
 
 import java.io.ByteArrayInputStream;
@@ -37,14 +39,14 @@ import javax.xml.bind.DatatypeConverter;
 
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.kylin.common.exception.KylinException;
-import org.apache.kylin.common.util.ZipFileUtils;
-import org.apache.kylin.rest.response.EnvelopeResponse;
 import org.apache.kylin.common.persistence.transaction.UnitOfWork;
+import org.apache.kylin.common.util.ZipFileUtils;
 import org.apache.kylin.metadata.model.schema.SchemaChangeCheckResult;
 import org.apache.kylin.rest.request.MetadataCleanupRequest;
 import org.apache.kylin.rest.request.ModelImportRequest;
 import org.apache.kylin.rest.request.ModelPreviewRequest;
 import org.apache.kylin.rest.request.StorageCleanupRequest;
+import org.apache.kylin.rest.response.EnvelopeResponse;
 import org.apache.kylin.rest.response.ModelPreviewResponse;
 import org.apache.kylin.rest.service.MetaStoreService;
 import org.apache.kylin.tool.util.HashFunction;
@@ -61,6 +63,8 @@ import org.springframework.web.bind.annotation.RequestPart;
 import org.springframework.web.bind.annotation.ResponseBody;
 import org.springframework.web.multipart.MultipartFile;
 
+import com.google.common.collect.Lists;
+
 import io.swagger.annotations.ApiOperation;
 
 @Controller
@@ -71,6 +75,8 @@ public class NMetaStoreController extends NBasicController {
     @Qualifier("metaStoreService")
     private MetaStoreService metaStoreService;
 
+    private static final List<ModelImportRequest.ImportType> IMPORT_TYPE = Lists.newArrayList(NEW, OVERWRITE);
+
     @ApiOperation(value = "previewModels", tags = { "MID" })
     @GetMapping(value = "/previews/models")
     @ResponseBody
@@ -133,13 +139,10 @@ public class NMetaStoreController extends NBasicController {
         checkProjectName(project);
         checkUploadFile(metadataFile);
         if (request.getModels().stream()
-                .noneMatch(modelImport -> modelImport.getImportType() == ModelImportRequest.ImportType.NEW
-                        || modelImport.getImportType() == ModelImportRequest.ImportType.OVERWRITE)) {
+                .noneMatch(modelImport -> IMPORT_TYPE.contains(modelImport.getImportType()))) {
             throw new KylinException(EMPTY_MODEL_ID, "At least one model should be selected to import!");
         }
-
         metaStoreService.importModelMetadata(project, metadataFile, request);
-
         return new EnvelopeResponse<>(KylinException.CODE_SUCCESS, "", "");
     }
 
diff --git a/src/metadata-server/src/test/java/org/apache/kylin/rest/controller/NMetaStoreControllerTest.java b/src/metadata-server/src/test/java/org/apache/kylin/rest/controller/NMetaStoreControllerTest.java
index 2999d12365..d592092295 100644
--- a/src/metadata-server/src/test/java/org/apache/kylin/rest/controller/NMetaStoreControllerTest.java
+++ b/src/metadata-server/src/test/java/org/apache/kylin/rest/controller/NMetaStoreControllerTest.java
@@ -21,8 +21,8 @@ package org.apache.kylin.rest.controller;
 import static org.apache.kylin.common.constant.HttpConstant.HTTP_VND_APACHE_KYLIN_JSON;
 
 import java.io.File;
-import java.io.FileInputStream;
 import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
@@ -30,9 +30,9 @@ import java.util.List;
 import javax.servlet.http.HttpServletResponse;
 
 import org.apache.kylin.common.util.JsonUtil;
-import org.apache.kylin.rest.constant.Constant;
 import org.apache.kylin.common.util.NLocalFileMetadataTestCase;
 import org.apache.kylin.metadata.model.schema.SchemaChangeCheckResult;
+import org.apache.kylin.rest.constant.Constant;
 import org.apache.kylin.rest.request.ModelImportRequest;
 import org.apache.kylin.rest.request.ModelPreviewRequest;
 import org.apache.kylin.rest.service.MetaStoreService;
@@ -116,7 +116,7 @@ public class NMetaStoreControllerTest extends NLocalFileMetadataTestCase {
     public void testUploadAndCheckModelMetadata() throws Exception {
         File file = new File("src/test/resources/ut_model_metadata/ut_model_matadata.zip");
         MockMultipartFile multipartFile = new MockMultipartFile("file", "ut_model_matadata.zip", "text/plain",
-                new FileInputStream(file));
+                Files.newInputStream(file.toPath()));
 
         SchemaChangeCheckResult schemaChangeCheckResult = new SchemaChangeCheckResult();
         Mockito.when(metaStoreService.checkModelMetadata("default", multipartFile, null))
@@ -135,7 +135,7 @@ public class NMetaStoreControllerTest extends NLocalFileMetadataTestCase {
     public void testImportModelMetadata() throws Throwable {
         File file = new File("src/test/resources/ut_model_metadata/ut_model_matadata.zip");
         MockMultipartFile multipartFile = new MockMultipartFile("file", "ut_model_matadata.zip", "text/plain",
-                new FileInputStream(file));
+                Files.newInputStream(file.toPath()));
 
         final ModelImportRequest request = new ModelImportRequest();
         List<ModelImportRequest.ModelImport> models = new ArrayList<>();
diff --git a/src/modeling-service/src/main/java/org/apache/kylin/rest/service/MetaStoreService.java b/src/modeling-service/src/main/java/org/apache/kylin/rest/service/MetaStoreService.java
index d6df59f000..21097bcd85 100644
--- a/src/modeling-service/src/main/java/org/apache/kylin/rest/service/MetaStoreService.java
+++ b/src/modeling-service/src/main/java/org/apache/kylin/rest/service/MetaStoreService.java
@@ -28,6 +28,10 @@ import static org.apache.kylin.common.exception.code.ErrorCodeServer.MODEL_NAME_
 import static org.apache.kylin.common.persistence.ResourceStore.METASTORE_UUID_TAG;
 import static org.apache.kylin.common.persistence.ResourceStore.VERSION_FILE;
 import static org.apache.kylin.metadata.model.schema.ImportModelContext.MODEL_REC_PATH;
+import static org.apache.kylin.metadata.model.schema.SchemaChangeCheckResult.UN_IMPORT_REASON.DIFFERENT_CC_NAME_HAS_SAME_EXPR;
+import static org.apache.kylin.metadata.model.schema.SchemaChangeCheckResult.UN_IMPORT_REASON.SAME_CC_NAME_HAS_DIFFERENT_EXPR;
+import static org.apache.kylin.metadata.model.schema.SchemaChangeCheckResult.UN_IMPORT_REASON.TABLE_COLUMN_DATATYPE_CHANGED;
+import static org.apache.kylin.metadata.model.schema.SchemaNodeType.MODEL_TABLE;
 
 import java.io.ByteArrayOutputStream;
 import java.io.File;
@@ -35,6 +39,7 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.nio.charset.Charset;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Comparator;
 import java.util.LinkedHashMap;
@@ -73,6 +78,7 @@ import org.apache.kylin.metadata.cube.model.IndexPlan;
 import org.apache.kylin.metadata.cube.model.NDataflowManager;
 import org.apache.kylin.metadata.cube.model.NIndexPlanManager;
 import org.apache.kylin.metadata.cube.model.RuleBasedIndex;
+import org.apache.kylin.metadata.model.ColumnDesc;
 import org.apache.kylin.metadata.model.JoinTableDesc;
 import org.apache.kylin.metadata.model.MultiPartitionDesc;
 import org.apache.kylin.metadata.model.NDataModel;
@@ -87,16 +93,20 @@ import org.apache.kylin.metadata.model.schema.SchemaChangeCheckResult;
 import org.apache.kylin.metadata.model.schema.SchemaNodeType;
 import org.apache.kylin.metadata.model.schema.SchemaUtil;
 import org.apache.kylin.metadata.project.NProjectManager;
+import org.apache.kylin.metadata.project.ProjectInstance;
 import org.apache.kylin.metadata.query.util.QueryHisStoreUtil;
 import org.apache.kylin.metadata.realization.RealizationStatusEnum;
 import org.apache.kylin.rest.aspect.Transaction;
 import org.apache.kylin.rest.constant.ModelStatusToDisplayEnum;
 import org.apache.kylin.rest.request.ModelImportRequest;
 import org.apache.kylin.rest.request.UpdateRuleBasedCuboidRequest;
+import org.apache.kylin.rest.response.LoadTableResponse;
 import org.apache.kylin.rest.response.ModelPreviewResponse;
 import org.apache.kylin.rest.response.SimplifiedTablePreviewResponse;
 import org.apache.kylin.rest.util.AclEvaluate;
 import org.apache.kylin.rest.util.AclPermissionUtil;
+import org.apache.kylin.source.ISourceMetadataExplorer;
+import org.apache.kylin.source.SourceFactory;
 import org.apache.kylin.tool.routine.RoutineTool;
 import org.apache.kylin.tool.util.HashFunction;
 import org.slf4j.Logger;
@@ -107,6 +117,7 @@ import org.springframework.web.multipart.MultipartFile;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
 
 import io.kyligence.kap.guava20.shaded.common.io.ByteSource;
 import org.apache.kylin.metadata.recommendation.candidate.JdbcRawRecStore;
@@ -126,6 +137,9 @@ public class MetaStoreService extends BasicService {
     private static final Pattern MD5_PATTERN = Pattern.compile(".*([a-fA-F\\d]{32})\\.zip");
     private static final String RULE_SCHEDULER_DATA_KEY = "kylin.index.rule-scheduler-data";
 
+    private static final Set<SchemaChangeCheckResult.UN_IMPORT_REASON> UN_IMPORT_REASONS = Sets.newHashSet(
+            SAME_CC_NAME_HAS_DIFFERENT_EXPR, DIFFERENT_CC_NAME_HAS_SAME_EXPR, TABLE_COLUMN_DATATYPE_CHANGED);
+
     @Autowired
     public AclEvaluate aclEvaluate;
 
@@ -135,6 +149,9 @@ public class MetaStoreService extends BasicService {
     @Autowired
     public IndexPlanService indexPlanService;
 
+    @Autowired
+    public TableExtService tableExtService;
+
     @Setter
     @Autowired(required = false)
     private List<ModelChangeSupporter> modelChangeSupporters = Lists.newArrayList();
@@ -409,7 +426,75 @@ public class MetaStoreService extends BasicService {
         SchemaUtil.SchemaDifference difference = SchemaUtil.diff(targetProject, KylinConfig.getInstanceFromEnv(),
                 context.getTargetKylinConfig());
 
-        return ModelImportChecker.check(difference, context);
+        SchemaChangeCheckResult checkResult = ModelImportChecker.check(difference, context);
+
+        Set<String> loadAbleTables = getLoadAbleTables(targetProject, context.getTargetMissTableList());
+        if (CollectionUtils.isEmpty(loadAbleTables)) {
+            return checkResult;
+        }
+        // mark each model whose missing tables can all be loaded (and that has no blocking conflict) as loadTableAble
+        return checkTableLoadAble(loadAbleTables, checkResult);
+    }
+
+    public SchemaChangeCheckResult checkTableLoadAble(Set<String> loadAbleTables, SchemaChangeCheckResult checkResult) {
+        checkResult.getModels().forEach((modelName, change) -> {
+            if (change.creatable() || change.importable() || change.overwritable()) {
+                return; // model already passes one of the checks, leave it unmarked
+            }
+            // Verify that every missing MODEL_TABLE item of this model is one of the loadable tables
+            Set<String> missedTableSet = change.getMissingItems().stream()//
+                    .filter(item -> item.getType().equals(MODEL_TABLE))
+                    .map(SchemaChangeCheckResult.ChangedItem::getDetail).collect(Collectors.toSet());
+            if (missedTableSet.isEmpty() || !loadAbleTables.containsAll(missedTableSet)) {
+                return;
+            }
+            // Verify that no new or updated item has a conflict reason listed in UN_IMPORT_REASONS
+            List<SchemaChangeCheckResult.BaseItem> items = Lists.newArrayList();
+            items.addAll(change.getNewItems());
+            items.addAll(change.getUpdateItems());
+            boolean hasConflict = items.stream().anyMatch(item -> {
+                val reason = item.getConflictReason().getReason();
+                return UN_IMPORT_REASONS.contains(reason);
+            });
+            if (hasConflict) {
+                return;
+            }
+            change.setLoadTableAble(true);
+            change.getLoadTables().addAll(missedTableSet); // remember this model's missing tables
+        });
+        return checkResult; // same instance that was passed in, with its model changes updated in place
+    }
+
+    public Set<String> getLoadAbleTables(String targetProject, List<TableDesc> missTableList) {
+        if (CollectionUtils.isEmpty(missTableList)) {
+            return Sets.newHashSet();
+        }
+        ProjectInstance projectInstance = NProjectManager.getInstance(KylinConfig.getInstanceFromEnv())
+                .getProject(targetProject);
+        ISourceMetadataExplorer explorer = SourceFactory.getSource(projectInstance).getSourceMetadataExplorer();
+        Set<String> loadAbleList = Sets.newHashSet();
+        for (TableDesc missTableDesc : missTableList) {
+            try {
+                // load the table's metadata from the target project's data source
+                TableDesc newTableDesc = explorer
+                        .loadTableMetadata(missTableDesc.getDatabase(), missTableDesc.getName(), targetProject)
+                        .getFirst();
+                // check that every non-computed column of the missing table exists in the loaded table
+                Set<ColumnDesc> columnDescList = Arrays.stream(missTableDesc.getColumns())
+                        .filter(col -> !col.isComputedColumn()).collect(Collectors.toSet());
+                Set<ColumnDesc> notExistColSet = newTableDesc.findColumns(columnDescList).getSecond();
+                if (CollectionUtils.isNotEmpty(notExistColSet)) {
+                    // some columns are missing from the loaded table desc, so skip it: it is not reported as loadable
+                    String missCols = notExistColSet.stream().map(ColumnDesc::getName).collect(Collectors.joining(","));
+                    logger.warn("Can not find columns [{}] in table [{}]", missCols, newTableDesc.getIdentity());
+                    continue;
+                }
+                loadAbleList.add(newTableDesc.getIdentity());
+            } catch (Exception e) { // best effort: a table that fails here is logged and left out of the result
+                logger.warn("try load table: {} failed.", missTableDesc.getIdentity(), e);
+            }
+        }
+        return loadAbleList;
+    }
 
     private void checkModelMetadataFile(MetadataStore metadataStore, Set<String> rawResourceList) {
@@ -431,13 +516,6 @@ public class MetaStoreService extends BasicService {
         return anyPath.split(File.separator)[1];
     }
 
-    /**
-     *
-     * @param nDataModel
-     * @param modelImport
-     * @param project
-     * @param importIndexPlanManager
-     */
     private void createNewModel(NDataModel nDataModel, ModelImportRequest.ModelImport modelImport, String project,
             NIndexPlanManager importIndexPlanManager) {
         NDataModelManager dataModelManager = getManager(NDataModelManager.class, project);
@@ -460,13 +538,6 @@ public class MetaStoreService extends BasicService {
         dataflowManager.createDataflow(indexPlan, nDataModel.getOwner(), RealizationStatusEnum.OFFLINE);
     }
 
-    /**
-     *
-     * @param project
-     * @param nDataModel
-     * @param modelImport
-     * @param hasModelOverrideProps
-     */
     private void updateModel(String project, NDataModel nDataModel, ModelImportRequest.ModelImport modelImport,
             boolean hasModelOverrideProps) {
         NDataModelManager dataModelManager = getManager(NDataModelManager.class, project);
@@ -497,13 +568,6 @@ public class MetaStoreService extends BasicService {
         dataModelManager.updateDataModelDesc(nDataModel);
     }
 
-    /**
-     *
-     * @param project
-     * @param nDataModel
-     * @param targetIndexPlan
-     * @param hasModelOverrideProps
-     */
     private void updateIndexPlan(String project, NDataModel nDataModel, IndexPlan targetIndexPlan,
             boolean hasModelOverrideProps) {
         NIndexPlanManager indexPlanManager = getManager(NIndexPlanManager.class, project);
@@ -532,12 +596,6 @@ public class MetaStoreService extends BasicService {
         });
     }
 
-    /**
-     *
-     * @param project
-     * @param modelSchemaChange
-     * @param targetIndexPlan
-     */
     private void removeIndexes(String project, SchemaChangeCheckResult.ModelSchemaChange modelSchemaChange,
             IndexPlan targetIndexPlan) {
         if (modelSchemaChange != null) {
@@ -555,12 +613,6 @@ public class MetaStoreService extends BasicService {
         }
     }
 
-    /**
-     *
-     * @param project
-     * @param modelSchemaChange
-     * @param targetIndexPlan
-     */
     private void addWhiteListIndex(String project, SchemaChangeCheckResult.ModelSchemaChange modelSchemaChange,
             IndexPlan targetIndexPlan) {
         if (modelSchemaChange != null) {
@@ -585,7 +637,7 @@ public class MetaStoreService extends BasicService {
 
     @Transaction(project = 0, retry = 1)
     public void importModelMetadata(String project, MultipartFile metadataFile, ModelImportRequest request)
-            throws IOException {
+            throws Exception {
         aclEvaluate.checkProjectWritePermission(project);
 
         List<Exception> exceptions = new ArrayList<>();
@@ -602,23 +654,50 @@ public class MetaStoreService extends BasicService {
         }
     }
 
+    /**
+     * Loads, in a single call, every table requested by the given model schema changes.
+     *
+     * @param project name of the project the tables are loaded into
+     * @param changes schema changes whose load-table sets are merged (duplicates collapse)
+     * @return the response produced by the table service for the merged table list
+     * @throws Exception whatever the table service raises while loading
+     */
+    public LoadTableResponse innerLoadTables(String project, Set<SchemaChangeCheckResult.ModelSchemaChange> changes)
+            throws Exception {
+        Set<String> tablesToLoad = Sets.newHashSet();
+        for (SchemaChangeCheckResult.ModelSchemaChange change : changes) {
+            tablesToLoad.addAll(change.getLoadTables());
+        }
+        String[] tableNames = tablesToLoad.toArray(new String[0]);
+        return tableExtService.loadDbTables(tableNames, project, false);
+    }
+
     private void innerImportModelMetadata(String project, MultipartFile metadataFile, ModelImportRequest request,
-            ImportModelContext importModelContext, List<Exception> exceptions) throws IOException {
-        val schemaChangeCheckResult = checkModelMetadata(project, importModelContext, metadataFile);
+            ImportModelContext context, List<Exception> exceptions) throws Exception {
+        val schemaChangeCheckResult = checkModelMetadata(project, context, metadataFile);
 
-        val importDataModelManager = NDataModelManager.getInstance(importModelContext.getTargetKylinConfig(), project);
-        val importIndexPlanManager = NIndexPlanManager.getInstance(importModelContext.getTargetKylinConfig(), project);
+        val schemaChanges = schemaChangeCheckResult.getModels().entrySet().stream()//
+                .filter(entry -> context.getNewModels().containsValue(entry.getKey())).map(Map.Entry::getValue)
+                .collect(Collectors.toSet());
+        boolean needLoadTable = schemaChanges.stream().anyMatch(change -> !change.getLoadTables().isEmpty());
+        LoadTableResponse loadTableResponse = null;
+        if (needLoadTable) {
+            loadTableResponse = innerLoadTables(project, schemaChanges);
+            if (CollectionUtils.isNotEmpty(loadTableResponse.getFailed())) {
+                String loadFailedTables = String.join(",", loadTableResponse.getFailed());
+                logger.warn("Load Table failed: [{}]", loadFailedTables);
+            }
+        }
+
+        KylinConfig targetKylinConfig = context.getTargetKylinConfig();
+        val importDataModelManager = NDataModelManager.getInstance(targetKylinConfig, project);
+        val importIndexPlanManager = NIndexPlanManager.getInstance(targetKylinConfig, project);
 
         for (ModelImportRequest.ModelImport modelImport : request.getModels()) {
             try {
                 validateModelImport(project, modelImport, schemaChangeCheckResult);
                 if (modelImport.getImportType() == ModelImportRequest.ImportType.NEW) {
+                    val modelSchemaChange = schemaChangeCheckResult.getModels().get(modelImport.getTargetName());
+                    if (needLoadTable && modelSchemaChange.isLoadTableAble()) {
+                        Set<String> needLoadTables = modelSchemaChange.getLoadTables();
+                        if (!loadTableResponse.getLoaded().containsAll(needLoadTables)) {
+                            logger.warn("Import model [{}] failed, skip import.", modelImport.getOriginalName());
+                            continue;
+                        }
+                    }
                     var importDataModel = importDataModelManager.getDataModelDescByAlias(modelImport.getTargetName());
                     var nDataModel = importDataModelManager.copyForWrite(importDataModel);
-
                     createNewModel(nDataModel, modelImport, project, importIndexPlanManager);
-                    importRecommendations(project, nDataModel.getUuid(), importDataModel.getUuid(),
-                            importModelContext.getTargetKylinConfig());
+                    importRecommendations(project, nDataModel.getUuid(), importDataModel.getUuid(), targetKylinConfig);
                 } else if (modelImport.getImportType() == ModelImportRequest.ImportType.OVERWRITE) {
                     val importDataModel = importDataModelManager.getDataModelDescByAlias(modelImport.getOriginalName());
                     val nDataModel = importDataModelManager.copyForWrite(importDataModel);
@@ -640,7 +719,7 @@ public class MetaStoreService extends BasicService {
                     addWhiteListIndex(project, modelSchemaChange, targetIndexPlan);
 
                     importRecommendations(project, nDataModel.getUuid(), importDataModel.getUuid(),
-                            importModelContext.getTargetKylinConfig());
+                            targetKylinConfig);
                 }
             } catch (Exception e) {
                 logger.warn("Import model {} exception", modelImport.getOriginalName(), e);
@@ -698,12 +777,6 @@ public class MetaStoreService extends BasicService {
         }
     }
 
-    /**
-     * @param project
-     * @param targetModelId
-     * @param srcModelId
-     * @param kylinConfig
-     */
     private void importRecommendations(String project, String targetModelId, String srcModelId, KylinConfig kylinConfig)
             throws IOException {
         val projectManager = getManager(NProjectManager.class);
diff --git a/src/modeling-service/src/test/java/org/apache/kylin/rest/service/MetaStoreServiceTest.java b/src/modeling-service/src/test/java/org/apache/kylin/rest/service/MetaStoreServiceTest.java
index 50ea987fc5..dbbedc48bc 100644
--- a/src/modeling-service/src/test/java/org/apache/kylin/rest/service/MetaStoreServiceTest.java
+++ b/src/modeling-service/src/test/java/org/apache/kylin/rest/service/MetaStoreServiceTest.java
@@ -20,6 +20,8 @@ package org.apache.kylin.rest.service;
 
 import static org.apache.kylin.common.constant.Constants.KE_VERSION;
 import static org.apache.kylin.common.exception.code.ErrorCodeServer.MODEL_ID_NOT_EXIST;
+import static org.apache.kylin.metadata.model.schema.SchemaChangeCheckResult.UN_IMPORT_REASON.SAME_CC_NAME_HAS_DIFFERENT_EXPR;
+import static org.apache.kylin.metadata.model.schema.SchemaNodeType.MODEL_TABLE;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
@@ -28,6 +30,7 @@ import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -36,6 +39,7 @@ import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.Random;
+import java.util.Set;
 import java.util.stream.Collectors;
 import java.util.zip.ZipEntry;
 import java.util.zip.ZipInputStream;
@@ -65,6 +69,7 @@ import org.apache.kylin.metadata.model.MultiPartitionDesc;
 import org.apache.kylin.metadata.model.MultiPartitionKeyMappingImpl;
 import org.apache.kylin.metadata.model.NDataModel;
 import org.apache.kylin.metadata.model.NDataModelManager;
+import org.apache.kylin.metadata.model.NTableMetadataManager;
 import org.apache.kylin.metadata.model.PartitionDesc;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.metadata.model.schema.SchemaChangeCheckResult;
@@ -74,6 +79,7 @@ import org.apache.kylin.metadata.project.ProjectInstance;
 import org.apache.kylin.rest.constant.Constant;
 import org.apache.kylin.rest.request.ModelConfigRequest;
 import org.apache.kylin.rest.request.ModelImportRequest;
+import org.apache.kylin.rest.response.LoadTableResponse;
 import org.apache.kylin.rest.response.ModelPreviewResponse;
 import org.hamcrest.BaseMatcher;
 import org.hamcrest.Description;
@@ -99,6 +105,7 @@ import com.fasterxml.jackson.databind.node.ArrayNode;
 import com.google.common.base.Objects;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
 
 import io.kyligence.kap.guava20.shaded.common.io.ByteSource;
 import org.apache.kylin.metadata.recommendation.candidate.JdbcRawRecStore;
@@ -129,7 +136,8 @@ public class MetaStoreServiceTest extends ServiceTestBase {
     public void setup() {
         createTestMetadata("src/test/resources/ut_meta/metastore_model");
         overwriteSystemProp("HADOOP_USER_NAME", "root");
-        ReflectionTestUtils.setField(metaStoreService, "modelChangeSupporters", Arrays.asList(modelChangeSupporter));
+        ReflectionTestUtils.setField(metaStoreService, "modelChangeSupporters",
+                Collections.singletonList(modelChangeSupporter));
         try {
             SecurityContextHolder.getContext().setAuthentication(authentication);
             jdbcTemplate = JdbcUtil.getJdbcTemplate(getTestConfig());
@@ -166,11 +174,8 @@ public class MetaStoreServiceTest extends ServiceTestBase {
         Assert.assertTrue(
                 modelPreviewResponseList.stream().anyMatch(ModelPreviewResponse::isHasMultiplePartitionValues));
 
-        val dfMgr = modelService.getManager(NDataflowManager.class, "default");
         val id = "7212bf0c-0716-4cef-b623-69c161981262";
-        val dataflow = dfMgr.getDataflow(id);
         val idxPlanMgr = modelService.getManager(NIndexPlanManager.class, "default");
-        val indexPlan = idxPlanMgr.getIndexPlan(id);
 
         idxPlanMgr.updateIndexPlan(id, updater -> {
             val overrideProps = new LinkedHashMap<String, String>();
@@ -365,7 +370,7 @@ public class MetaStoreServiceTest extends ServiceTestBase {
 
         RawResource rw = rawResourceMap.get(ResourceStore.VERSION_FILE);
         try (InputStream inputStream = rw.getByteSource().openStream()) {
-            Assert.assertEquals("unknown", IOUtils.toString(inputStream));
+            Assert.assertEquals("unknown", IOUtils.toString(inputStream, StandardCharsets.UTF_8));
         }
 
         overwriteSystemProp(KE_VERSION, "4.3.x");
@@ -378,38 +383,37 @@ public class MetaStoreServiceTest extends ServiceTestBase {
 
         rw = rawResourceMap.get(ResourceStore.VERSION_FILE);
         try (InputStream inputStream = rw.getByteSource().openStream()) {
-            Assert.assertEquals("4.3.x", IOUtils.toString(inputStream));
+            Assert.assertEquals("4.3.x", IOUtils.toString(inputStream, StandardCharsets.UTF_8));
         }
     }
 
     @Test
-    public void testExportNotExistsModel() throws Exception {
+    public void testExportNotExistsModel() {
         String notExistsUuid = RandomUtil.randomUUIDStr();
-        thrown.expect(KylinException.class);
-        thrown.expectMessage(MODEL_ID_NOT_EXIST.getMsg(notExistsUuid));
-        metaStoreService.getCompressedModelMetadata(PROJECT_DEFAULT, Lists.newArrayList(notExistsUuid), false, false,
-                false);
+        List<String> modelList = Lists.newArrayList(notExistsUuid);
+        // Exporting an unknown model id must fail with MODEL_ID_NOT_EXIST. The String overload of
+        // Assert.assertThrows only labels a failed assertion, so the exception text is checked explicitly.
+        KylinException exception = Assert.assertThrows(KylinException.class,
+                () -> metaStoreService.getCompressedModelMetadata(PROJECT_DEFAULT, modelList, false, false, false));
+        Assert.assertTrue(exception.getMessage().contains(MODEL_ID_NOT_EXIST.getMsg(notExistsUuid)));
     }
 
     @Test
-    public void testExportBrokenModel() throws Exception {
+    public void testExportBrokenModel() {
         // broken model id
         String brokenModelId = "8b5a2d39-304f-4a20-a9da-942f461534d8";
-        thrown.expect(KylinException.class);
-        thrown.expectMessage(String.format(Locale.ROOT,
+        String msg = String.format(Locale.ROOT,
                 "Can’t export model \"%s\"  as it’s in \"BROKEN\" status. Please re-select and try again.",
-                brokenModelId));
-        metaStoreService.getCompressedModelMetadata(PROJECT_DEFAULT, Lists.newArrayList(brokenModelId), false, false,
-                false);
+                brokenModelId);
+        List<String> modelList = Lists.newArrayList(brokenModelId);
+        // The String overload of Assert.assertThrows only labels a failed assertion; it does not match the
+        // exception text. Capture the exception and check its message so the BROKEN-status error is verified.
+        KylinException exception = Assert.assertThrows(KylinException.class,
+                () -> metaStoreService.getCompressedModelMetadata(PROJECT_DEFAULT, modelList, false, false, false));
+        Assert.assertTrue(exception.getMessage().contains(msg));
 
     }
 
     @Test
-    public void testExportEmptyModel() throws Exception {
+    public void testExportEmptyModel() {
         // empty model list
-        thrown.expect(KylinException.class);
-        thrown.expectMessage("Please select at least one model to export.");
-        metaStoreService.getCompressedModelMetadata(PROJECT_DEFAULT, Lists.newArrayList(), false, false, false);
+        List<String> emptyList = Lists.newArrayList();
+        // The String overload of Assert.assertThrows only labels a failed assertion, so the expected
+        // error text is verified on the captured exception instead.
+        KylinException exception = Assert.assertThrows(KylinException.class,
+                () -> metaStoreService.getCompressedModelMetadata(PROJECT_DEFAULT, emptyList, false, false, false));
+        Assert.assertTrue(exception.getMessage().contains("Please select at least one model to export."));
     }
 
     private Map<String, RawResource> getRawResourceFromZipFile(InputStream inputStream) throws IOException {
@@ -434,7 +438,8 @@ public class MetaStoreServiceTest extends ServiceTestBase {
     public void testCheckModelMetadataModelCCUpdate() throws IOException {
         val file = new File(
                 "src/test/resources/ut_model_metadata/metastore_model_metadata_c4a20039c16dfbb5dcc5610c5052d7b3.zip");
-        val multipartFile = new MockMultipartFile(file.getName(), file.getName(), null, new FileInputStream(file));
+        val multipartFile = new MockMultipartFile(file.getName(), file.getName(), null,
+                Files.newInputStream(file.toPath()));
         val metadataCheckResponse = metaStoreService.checkModelMetadata("original_project", multipartFile, null);
 
         SchemaChangeCheckResult.ModelSchemaChange modelSchemaChange = metadataCheckResponse.getModels()
@@ -455,7 +460,8 @@ public class MetaStoreServiceTest extends ServiceTestBase {
     public void testCheckModelMetadataNoChanges() throws IOException {
         val file = new File(
                 "src/test/resources/ut_model_metadata/metastore_model_metadata_c4a20039c16dfbb5dcc5610c5052d7b3.zip");
-        val multipartFile = new MockMultipartFile(file.getName(), file.getName(), null, new FileInputStream(file));
+        val multipartFile = new MockMultipartFile(file.getName(), file.getName(), null,
+                Files.newInputStream(file.toPath()));
         val metadataCheckResponse = metaStoreService.checkModelMetadata("original_project", multipartFile, null);
 
         SchemaChangeCheckResult.ModelSchemaChange modelSchemaChange = metadataCheckResponse.getModels()
@@ -469,7 +475,8 @@ public class MetaStoreServiceTest extends ServiceTestBase {
     public void testCheckModelMetadataModelAggUpdate() throws IOException {
         val file = new File(
                 "src/test/resources/ut_model_metadata/metastore_model_metadata_c4a20039c16dfbb5dcc5610c5052d7b3.zip");
-        val multipartFile = new MockMultipartFile(file.getName(), file.getName(), null, new FileInputStream(file));
+        val multipartFile = new MockMultipartFile(file.getName(), file.getName(), null,
+                Files.newInputStream(file.toPath()));
         val metadataCheckResponse = metaStoreService.checkModelMetadata("original_project", multipartFile, null);
 
         SchemaChangeCheckResult.ModelSchemaChange modelSchemaChange = metadataCheckResponse.getModels()
@@ -488,7 +495,8 @@ public class MetaStoreServiceTest extends ServiceTestBase {
     public void testCheckModelMetadataModelDimConflict() throws IOException {
         val file = new File(
                 "src/test/resources/ut_model_metadata/metastore_model_metadata_c4a20039c16dfbb5dcc5610c5052d7b3.zip");
-        val multipartFile = new MockMultipartFile(file.getName(), file.getName(), null, new FileInputStream(file));
+        val multipartFile = new MockMultipartFile(file.getName(), file.getName(), null,
+                Files.newInputStream(file.toPath()));
         val metadataCheckResponse = metaStoreService.checkModelMetadata("original_project", multipartFile, null);
 
         SchemaChangeCheckResult.ModelSchemaChange modelSchemaChange = metadataCheckResponse.getModels()
@@ -512,7 +520,8 @@ public class MetaStoreServiceTest extends ServiceTestBase {
     public void testCheckModelMetadataModelJoinConflict() throws IOException {
         val file = new File(
                 "src/test/resources/ut_model_metadata/metastore_model_metadata_c4a20039c16dfbb5dcc5610c5052d7b3.zip");
-        val multipartFile = new MockMultipartFile(file.getName(), file.getName(), null, new FileInputStream(file));
+        val multipartFile = new MockMultipartFile(file.getName(), file.getName(), null,
+                Files.newInputStream(file.toPath()));
         val metadataCheckResponse = metaStoreService.checkModelMetadata("original_project", multipartFile, null);
 
         SchemaChangeCheckResult.ModelSchemaChange modelSchemaChange = metadataCheckResponse.getModels()
@@ -545,7 +554,8 @@ public class MetaStoreServiceTest extends ServiceTestBase {
     public void testCheckModelMetadataModelFactConflict() throws IOException {
         val file = new File(
                 "src/test/resources/ut_model_metadata/metastore_model_metadata_c4a20039c16dfbb5dcc5610c5052d7b3.zip");
-        val multipartFile = new MockMultipartFile(file.getName(), file.getName(), null, new FileInputStream(file));
+        val multipartFile = new MockMultipartFile(file.getName(), file.getName(), null,
+                Files.newInputStream(file.toPath()));
         val metadataCheckResponse = metaStoreService.checkModelMetadata("original_project", multipartFile, null);
 
         SchemaChangeCheckResult.ModelSchemaChange modelSchemaChange = metadataCheckResponse.getModels()
@@ -570,7 +580,8 @@ public class MetaStoreServiceTest extends ServiceTestBase {
     public void testCheckModelMetadataModelColumnUpdate() throws IOException {
         val file = new File(
                 "src/test/resources/ut_model_metadata/metastore_model_metadata_c4a20039c16dfbb5dcc5610c5052d7b3.zip");
-        val multipartFile = new MockMultipartFile(file.getName(), file.getName(), null, new FileInputStream(file));
+        val multipartFile = new MockMultipartFile(file.getName(), file.getName(), null,
+                Files.newInputStream(file.toPath()));
         val metadataCheckResponse = metaStoreService.checkModelMetadata("original_project", multipartFile, null);
 
         SchemaChangeCheckResult.ModelSchemaChange modelSchemaChange = metadataCheckResponse.getModels()
@@ -594,7 +605,8 @@ public class MetaStoreServiceTest extends ServiceTestBase {
     public void testCheckModelMetadataModelFilterConflict() throws IOException {
         val file = new File(
                 "src/test/resources/ut_model_metadata/metastore_model_metadata_c4a20039c16dfbb5dcc5610c5052d7b3.zip");
-        val multipartFile = new MockMultipartFile(file.getName(), file.getName(), null, new FileInputStream(file));
+        val multipartFile = new MockMultipartFile(file.getName(), file.getName(), null,
+                Files.newInputStream(file.toPath()));
         val metadataCheckResponse = metaStoreService.checkModelMetadata("original_project", multipartFile, null);
 
         SchemaChangeCheckResult.ModelSchemaChange modelSchemaChange = metadataCheckResponse.getModels()
@@ -609,7 +621,8 @@ public class MetaStoreServiceTest extends ServiceTestBase {
     public void testCheckModelMetadataModelPartitionConflict() throws IOException {
         val file = new File(
                 "src/test/resources/ut_model_metadata/metastore_model_metadata_c4a20039c16dfbb5dcc5610c5052d7b3.zip");
-        val multipartFile = new MockMultipartFile(file.getName(), file.getName(), null, new FileInputStream(file));
+        val multipartFile = new MockMultipartFile(file.getName(), file.getName(), null,
+                Files.newInputStream(file.toPath()));
         val metadataCheckResponse = metaStoreService.checkModelMetadata("original_project", multipartFile, null);
 
         SchemaChangeCheckResult.ModelSchemaChange modelSchemaChange = metadataCheckResponse.getModels()
@@ -679,7 +692,8 @@ public class MetaStoreServiceTest extends ServiceTestBase {
     public void testCheckModelMetadataModelDifferentMultiplePartitionColumnWithEmptyValue() throws IOException {
         val file = new File(
                 "src/test/resources/ut_meta/schema_utils/model_different_multiple_column_with_empty_partition_value/model_different_multiple_column_with_empty_partition_value_2021_01_18_11_30_10_E70AE88EBB2371A8F3FE3979B9DCBB06.zip");
-        val multipartFile = new MockMultipartFile(file.getName(), file.getName(), null, new FileInputStream(file));
+        val multipartFile = new MockMultipartFile(file.getName(), file.getName(), null,
+                Files.newInputStream(file.toPath()));
         val metadataCheckResponse = metaStoreService.checkModelMetadata("original_project", multipartFile, null);
 
         SchemaChangeCheckResult.ModelSchemaChange modelSchemaChange = metadataCheckResponse.getModels()
@@ -693,7 +707,7 @@ public class MetaStoreServiceTest extends ServiceTestBase {
                         && pair.getFirstAttributes().get("partitions")
                                 .equals(Arrays.asList(Collections.singletonList("p1"), Collections.singletonList("p2"),
                                         Collections.singletonList("p3")))
-                        && ((List) pair.getSecondAttributes().get("partitions")).isEmpty()));
+                        && ((List<?>) pair.getSecondAttributes().get("partitions")).isEmpty()));
     }
 
     @Test
@@ -739,7 +753,8 @@ public class MetaStoreServiceTest extends ServiceTestBase {
     public void testCheckModelMetadataModelMissingTable() throws IOException {
         val file = new File(
                 "src/test/resources/ut_model_metadata/metastore_model_metadata_c4a20039c16dfbb5dcc5610c5052d7b3.zip");
-        val multipartFile = new MockMultipartFile(file.getName(), file.getName(), null, new FileInputStream(file));
+        val multipartFile = new MockMultipartFile(file.getName(), file.getName(), null,
+                Files.newInputStream(file.toPath()));
         val metadataCheckResponse = metaStoreService.checkModelMetadata("original_project", multipartFile, null);
 
         SchemaChangeCheckResult.ModelSchemaChange modelSchemaChange = metadataCheckResponse.getModels()
@@ -750,14 +765,15 @@ public class MetaStoreServiceTest extends ServiceTestBase {
         Assert.assertTrue(
                 modelSchemaChange.getMissingItems().stream().anyMatch(sc -> sc.getType() == SchemaNodeType.MODEL_TABLE
                         && sc.getDetail().equals("SSB.CUSTOMER_NEW") && !sc.isImportable()));
-        Assert.assertFalse(modelSchemaChange.importable());
+        Assert.assertTrue(modelSchemaChange.importable());
     }
 
     @Test
     public void testCheckModelMetadataModelIndex() throws IOException {
         val file = new File(
                 "src/test/resources/ut_model_metadata/metastore_model_metadata_c4a20039c16dfbb5dcc5610c5052d7b3.zip");
-        val multipartFile = new MockMultipartFile(file.getName(), file.getName(), null, new FileInputStream(file));
+        val multipartFile = new MockMultipartFile(file.getName(), file.getName(), null,
+                Files.newInputStream(file.toPath()));
         val metadataCheckResponse = metaStoreService.checkModelMetadata("original_project", multipartFile, null);
 
         SchemaChangeCheckResult.ModelSchemaChange modelSchemaChange = metadataCheckResponse.getModels()
@@ -770,7 +786,7 @@ public class MetaStoreServiceTest extends ServiceTestBase {
                 .filter(sc -> sc.getType() == SchemaNodeType.WHITE_LIST_INDEX)
                 .filter(sc -> sc.getDetail().equals("20000000001"))
                 .filter(SchemaChangeCheckResult.BaseItem::isOverwritable).anyMatch(sc -> {
-                    String col_orders = String.join(",", ((ArrayList<String>) sc.getAttributes().get("col_orders")));
+                    String col_orders = String.join(",", ((List<String>) sc.getAttributes().get("col_orders")));
                     return col_orders.equals(
                             "P_LINEORDER.LO_CUSTKEY,P_LINEORDER.LO_SUPPKEY,P_LINEORDER.LO_ORDERDATE,P_LINEORDER.LO_QUANTITY,P_LINEORDER.LO_DISCOUNT,P_LINEORDER.LO_LINENUMBER,P_LINEORDER.LO_PARTKEY,P_LINEORDER.LO_ORDERKEY");
                 }));
@@ -780,7 +796,7 @@ public class MetaStoreServiceTest extends ServiceTestBase {
                         .filter(sc -> sc.getDetail().equals("20000000001"))
                         .filter(SchemaChangeCheckResult.BaseItem::isOverwritable).anyMatch(sc -> {
                             String col_orders = String.join(",",
-                                    ((ArrayList<String>) sc.getAttributes().get("col_orders")));
+                                    ((List<String>) sc.getAttributes().get("col_orders")));
                             return col_orders.equals(
                                     "P_LINEORDER.LO_LINENUMBER,P_LINEORDER.LO_SUPPKEY,P_LINEORDER.LO_QUANTITY,P_LINEORDER.LO_PARTKEY,P_LINEORDER.LO_ORDERKEY,P_LINEORDER.LO_CUSTKEY,P_LINEORDER.LO_DISCOUNT,P_LINEORDER.LO_ORDERDATE");
                         }));
@@ -790,7 +806,7 @@ public class MetaStoreServiceTest extends ServiceTestBase {
                         .filter(sc -> sc.getDetail().equals("20000010001"))
                         .filter(SchemaChangeCheckResult.BaseItem::isOverwritable).anyMatch(sc -> {
                             String col_orders = String.join(",",
-                                    ((ArrayList<String>) sc.getAttributes().get("col_orders")));
+                                    ((List<String>) sc.getAttributes().get("col_orders")));
                             return col_orders.equals("P_LINEORDER.LO_SUPPKEY,P_LINEORDER.LO_QUANTITY");
                         }));
     }
@@ -798,29 +814,30 @@ public class MetaStoreServiceTest extends ServiceTestBase {
     @Test
     public void testCheckModelMetadataWithoutMD5Checksum() throws Exception {
         File file = new File("src/test/resources/ut_model_metadata/metastore_model_metadata.zip");
-        val multipartFile = new MockMultipartFile(file.getName(), file.getName(), null, new FileInputStream(file));
-        thrown.expect(KylinException.class);
-        thrown.expectMessage(
-                "Can’t parse the metadata file. Please don’t modify the content or zip the file manually after unzip.");
-        metaStoreService.checkModelMetadata("default", multipartFile, null);
+        val multipartFile = new MockMultipartFile(file.getName(), file.getName(), null,
+                Files.newInputStream(file.toPath()));
+        // The String overload of Assert.assertThrows only labels a failed assertion, so the parse-error
+        // text is verified on the captured exception instead.
+        KylinException exception = Assert.assertThrows(KylinException.class,
+                () -> metaStoreService.checkModelMetadata("default", multipartFile, null));
+        Assert.assertTrue(exception.getMessage().contains(
+                "Can’t parse the metadata file. Please don’t modify the content or zip the file manually after unzip."));
     }
 
     @Test
     public void testCheckModelMetadataWithWrongMD5Checksum() throws Exception {
         File file = new File(
                 "src/test/resources/ut_model_metadata/metastore_model_metadata_c4a20039c16dfbb5dcc5610c5052d7b1.zip");
-        val multipartFile = new MockMultipartFile(file.getName(), file.getName(), null, new FileInputStream(file));
-        thrown.expect(KylinException.class);
-        thrown.expectMessage(
-                "Can’t parse the metadata file. Please don’t modify the content or zip the file manually after unzip.");
-        metaStoreService.checkModelMetadata("default", multipartFile, null);
+        val multipartFile = new MockMultipartFile(file.getName(), file.getName(), null,
+                Files.newInputStream(file.toPath()));
+        // The String overload of Assert.assertThrows only labels a failed assertion, so the parse-error
+        // text is verified on the captured exception instead.
+        KylinException exception = Assert.assertThrows(KylinException.class,
+                () -> metaStoreService.checkModelMetadata("default", multipartFile, null));
+        Assert.assertTrue(exception.getMessage().contains(
+                "Can’t parse the metadata file. Please don’t modify the content or zip the file manually after unzip."));
     }
 
     @Test
     public void testImportModelMetadata() throws Exception {
         File file = new File(
                 "src/test/resources/ut_model_metadata/metastore_model_metadata_c4a20039c16dfbb5dcc5610c5052d7b3.zip");
-        val multipartFile = new MockMultipartFile(file.getName(), file.getName(), null, new FileInputStream(file));
+        val multipartFile = new MockMultipartFile(file.getName(), file.getName(), null,
+                Files.newInputStream(file.toPath()));
         ModelImportRequest request = new ModelImportRequest();
         List<ModelImportRequest.ModelImport> models = new ArrayList<>();
         models.add(new ModelImportRequest.ModelImport("model_index", "model_index",
@@ -885,12 +902,11 @@ public class MetaStoreServiceTest extends ServiceTestBase {
     public void testImportModelMetadataWithRecInExpertModeProject() throws Exception {
         String id = "761215ee-3f21-4d1a-aae5-3d0d9d6ede85";
         NIndexPlanManager indexPlanManager = NIndexPlanManager.getInstance(getTestConfig(), "original_project");
-        indexPlanManager.updateIndexPlan(id, copyForWrite -> {
-            copyForWrite.setRuleBasedIndex(new RuleBasedIndex());
-        });
+        indexPlanManager.updateIndexPlan(id, copyForWrite -> copyForWrite.setRuleBasedIndex(new RuleBasedIndex()));
         String fileName = "issue_model_metadata_2022_06_17_14_54_54_F89122A7E22F485D8359616BC1C30718.zip";
         File file = new File("src/test/resources/ut_model_metadata/" + fileName);
-        val multipartFile = new MockMultipartFile(file.getName(), file.getName(), null, new FileInputStream(file));
+        val multipartFile = new MockMultipartFile(file.getName(), file.getName(), null,
+                Files.newInputStream(file.toPath()));
         ModelImportRequest request = new ModelImportRequest();
         List<ModelImportRequest.ModelImport> models = new ArrayList<>();
         models.add(new ModelImportRequest.ModelImport("model_index", "model_index",
@@ -1219,27 +1235,43 @@ public class MetaStoreServiceTest extends ServiceTestBase {
     public void testImportModelMetadataWithUnCreatable() throws Exception {
         File file = new File(
                 "src/test/resources/ut_model_metadata/metastore_model_metadata_c4a20039c16dfbb5dcc5610c5052d7b3.zip");
-        var multipartFile = new MockMultipartFile(file.getName(), file.getName(), null, new FileInputStream(file));
+        var multipartFile = new MockMultipartFile(file.getName(), file.getName(), null,
+                Files.newInputStream(file.toPath()));
         ModelImportRequest request = new ModelImportRequest();
-        List<ModelImportRequest.ModelImport> models = new ArrayList<>();
-        models.add(new ModelImportRequest.ModelImport("missing_table_model", "missing_table_model_1",
-                ModelImportRequest.ImportType.NEW));
-
-        request.setModels(models);
+        request.setModels(Lists.newArrayList(new ModelImportRequest.ModelImport("missing_table_model",
+                "missing_table_model_1", ModelImportRequest.ImportType.NEW)));
+        val manager = NTableMetadataManager.getInstance(getTestConfig(), "original_project");
+        Assert.assertNull(manager.getTableDesc("SSB.CUSTOMER_NEW"));
+        metaStoreService.importModelMetadata("original_project", multipartFile, request);
+        Assert.assertNotNull(manager.getTableDesc("SSB.CUSTOMER_NEW"));
 
-        thrown.expectCause(new BaseMatcher<Throwable>() {
-            @Override
-            public boolean matches(Object item) {
-                return ((Exception) item).getMessage().contains(
-                        "Can’t select ImportType \"NEW\" for the model \"missing_table_model_1\". Please select \"UN_IMPORT\".");
-            }
+        {
 
-            @Override
-            public void describeTo(Description description) {
+        }
+    }
 
-            }
-        });
-        metaStoreService.importModelMetadata("original_project", multipartFile, request);
+    /** Importing must leave SSB.CUSTOMER_NEW unloaded when the stubbed table load reports it as failed. */
+    @Test
+    public void testImportModelWithLoadTableFailed() throws Exception {
+        File zip = new File(
+                "src/test/resources/ut_model_metadata/metastore_model_metadata_c4a20039c16dfbb5dcc5610c5052d7b3.zip");
+        var upload = new MockMultipartFile(zip.getName(), zip.getName(), null, Files.newInputStream(zip.toPath()));
+        ModelImportRequest importRequest = new ModelImportRequest();
+        importRequest.setModels(Lists.newArrayList(new ModelImportRequest.ModelImport("missing_table_model",
+                "missing_table_model_1", ModelImportRequest.ImportType.NEW)));
+        val tableManager = NTableMetadataManager.getInstance(getTestConfig(), "original_project");
+        Assert.assertNull(tableManager.getTableDesc("SSB.CUSTOMER_NEW"));
+        LoadTableResponse failedLoad = new LoadTableResponse();
+        failedLoad.getFailed().add("SSB.CUSTOMER_NEW");
+        // Stub both the table-ext collaborator and the service's own innerLoadTables with the failed response.
+        val serviceSpy = Mockito.spy(metaStoreService);
+        val tableExtSpy = Mockito.spy((TableExtService) ReflectionTestUtils.getField(serviceSpy, "tableExtService"));
+        Mockito.doReturn(failedLoad).when(tableExtSpy).loadDbTables(new String[] { "SSB.CUSTOMER_NEW" },
+                "original_project", false);
+        ReflectionTestUtils.setField(serviceSpy, "tableExtService", tableExtSpy);
+        Mockito.doReturn(failedLoad).when(serviceSpy).innerLoadTables(Mockito.anyString(), Mockito.anySet());
+        serviceSpy.importModelMetadata("original_project", upload, importRequest);
+        Assert.assertNull(tableManager.getTableDesc("SSB.CUSTOMER_NEW"));
     }
 
     @Test
@@ -1355,6 +1387,140 @@ public class MetaStoreServiceTest extends ServiceTestBase {
                         && changedItem.getDetail().equals("SSB.P_LINEORDER.LO_SUPPKEY")));
     }
 
+    /**
+     * Verifies the check result for the "model_table_missing_update" archive and the conditions
+     * under which {@code checkTableLoadAble} flags "ssb_model" as load-table-able.
+     *
+     * <p>Straight after {@code checkModelMetadata} the change must be creatable and importable,
+     * not overwritable, and name "SSB.CUSTOMER_NEW" as the only table to load. The scenarios that
+     * follow re-run {@code checkTableLoadAble} on spies of that change, each starting from a
+     * cleared load-table state (flag {@code false}, empty table set) and each stored back into the
+     * response under "ssb_model" before the check runs.
+     *
+     * @throws IOException if the archive under src/test/resources cannot be read
+     */
+    @Test
+    public void testMissTable() throws IOException {
+        val file = new File(
+                "src/test/resources/ut_meta/schema_utils/model_missing_table_update/model_table_missing_update_model_metadata_2020_11_16_02_37_33_3182D4A7694DA64E3D725C140CF80A47.zip");
+        val multipartFile = new MockMultipartFile(file.getName(), file.getName(), null,
+                Files.newInputStream(file.toPath()));
+        val metadataCheckResponse = metaStoreService.checkModelMetadata("original_project", multipartFile, null);
+
+        // Expectations on the untouched check result.
+        val modelSchemaChange = metadataCheckResponse.getModels().get("ssb_model");
+        Assert.assertNotNull(modelSchemaChange);
+        Assert.assertTrue(modelSchemaChange.isLoadTableAble());
+        Set<String> loadTables = modelSchemaChange.getLoadTables();
+        Assert.assertEquals(1, loadTables.size());
+        Assert.assertEquals("SSB.CUSTOMER_NEW", loadTables.iterator().next());
+        Assert.assertTrue(modelSchemaChange.creatable());
+        Assert.assertTrue(modelSchemaChange.importable());
+        Assert.assertFalse(modelSchemaChange.overwritable());
+
+        // Scenarios in which at least one of overwritable/creatable/importable is stubbed to true.
+        testModelImportable(metadataCheckResponse);
+
+        // Scenario: all three flags stubbed to false and "SSB.CUSTOMER_NEW" passed to the check.
+        // Expected: the change is flagged load-table-able and its table set is non-empty.
+        {
+            val mockChange = Mockito.spy(modelSchemaChange);
+            mockChange.setLoadTableAble(false);
+            mockChange.setLoadTables(Sets.newHashSet());
+            Mockito.doReturn(false).when(mockChange).overwritable();
+            Mockito.doReturn(false).when(mockChange).creatable();
+            Mockito.doReturn(false).when(mockChange).importable();
+            metadataCheckResponse.getModels().put("ssb_model", mockChange);
+            metaStoreService.checkTableLoadAble(Sets.newHashSet("SSB.CUSTOMER_NEW"), metadataCheckResponse);
+            val change = metadataCheckResponse.getModels().get("ssb_model");
+            Assert.assertTrue(change.isLoadTableAble());
+            Assert.assertFalse(change.getLoadTables().isEmpty());
+        }
+
+        // Scenario: only "SSB.CUSTOMER_NEWNEW" is passed to the check; the three flags are not stubbed.
+        // Expected: the change stays not load-table-able with an empty table set.
+        {
+            val mockChange = Mockito.spy(modelSchemaChange);
+            mockChange.setLoadTableAble(false);
+            mockChange.setLoadTables(Sets.newHashSet());
+            metadataCheckResponse.getModels().put("ssb_model", mockChange);
+            metaStoreService.checkTableLoadAble(Sets.newHashSet("SSB.CUSTOMER_NEWNEW"), metadataCheckResponse);
+            val change = metadataCheckResponse.getModels().get("ssb_model");
+            Assert.assertFalse(change.isLoadTableAble());
+            Assert.assertTrue(change.getLoadTables().isEmpty());
+        }
+
+        // Scenario: the MODEL_TABLE entries are removed from the spy's missing items.
+        // Expected: the change stays not load-table-able with an empty table set.
+        {
+            val mockChange = Mockito.spy(modelSchemaChange);
+            mockChange.setLoadTableAble(false);
+            mockChange.setLoadTables(Sets.newHashSet());
+            val missItems = mockChange.getMissingItems().stream().filter(item -> item.getType() != MODEL_TABLE)
+                    .collect(Collectors.toList());
+            ReflectionTestUtils.setField(mockChange, "missingItems", missItems);
+            metadataCheckResponse.getModels().put("ssb_model", mockChange);
+            metaStoreService.checkTableLoadAble(Sets.newHashSet("SSB.CUSTOMER_NEW"), metadataCheckResponse);
+            val change = metadataCheckResponse.getModels().get("ssb_model");
+            Assert.assertFalse(change.isLoadTableAble());
+            Assert.assertTrue(change.getLoadTables().isEmpty());
+        }
+
+        // Scenario: every new item carries the conflict reason SAME_CC_NAME_HAS_DIFFERENT_EXPR.
+        // Expected: the change stays not load-table-able with an empty table set.
+        {
+            val mockChange = Mockito.spy(modelSchemaChange);
+            mockChange.setLoadTableAble(false);
+            mockChange.setLoadTables(Sets.newHashSet());
+            // Tag the items with forEach on a copy of the list: Stream.peek exists mainly for
+            // debugging and should not carry the mutation this scenario depends on.
+            val newItems = Lists.newArrayList(mockChange.getNewItems());
+            newItems.forEach(item -> item.getConflictReason().setReason(SAME_CC_NAME_HAS_DIFFERENT_EXPR));
+            ReflectionTestUtils.setField(mockChange, "newItems", newItems);
+            metadataCheckResponse.getModels().put("ssb_model", mockChange);
+            metaStoreService.checkTableLoadAble(Sets.newHashSet("SSB.CUSTOMER_NEW"), metadataCheckResponse);
+            val change = metadataCheckResponse.getModels().get("ssb_model");
+            Assert.assertFalse(change.isLoadTableAble());
+            Assert.assertTrue(change.getLoadTables().isEmpty());
+        }
+    }
+
+    /**
+     * Covers the combinations in which at least one of {@code overwritable()}, {@code creatable()}
+     * or {@code importable()} is stubbed to return {@code true}; in each of them
+     * {@code checkTableLoadAble} must leave "ssb_model" not load-table-able with no tables to load.
+     *
+     * <p>Every case spies the change found in the response on entry, resets its load-table state,
+     * stubs the three flags, stores the spy back under "ssb_model" and runs the check with
+     * "SSB.CUSTOMER_NEW". The response therefore holds the last spy when this method returns.
+     *
+     * @param metadataCheckResponse check result whose "ssb_model" entry is spied and replaced
+     */
+    private void testModelImportable(SchemaChangeCheckResult metadataCheckResponse) {
+        val modelSchemaChange = metadataCheckResponse.getModels().get("ssb_model");
+        // One row per case: { overwritable, creatable, importable }.
+        boolean[][] flagCases = {
+                { true, true, true },
+                { true, false, false },
+                { false, true, false },
+                { false, false, true } };
+
+        for (boolean[] flags : flagCases) {
+            String caseName = "overwritable=" + flags[0] + ", creatable=" + flags[1] + ", importable=" + flags[2];
+            val mockChange = Mockito.spy(modelSchemaChange);
+            mockChange.setLoadTableAble(false);
+            mockChange.setLoadTables(Sets.newHashSet());
+            Mockito.doReturn(flags[0]).when(mockChange).overwritable();
+            Mockito.doReturn(flags[1]).when(mockChange).creatable();
+            Mockito.doReturn(flags[2]).when(mockChange).importable();
+            metadataCheckResponse.getModels().put("ssb_model", mockChange);
+            metaStoreService.checkTableLoadAble(Sets.newHashSet("SSB.CUSTOMER_NEW"), metadataCheckResponse);
+            val change = metadataCheckResponse.getModels().get("ssb_model");
+            Assert.assertFalse(caseName, change.isLoadTableAble());
+            Assert.assertTrue(caseName, change.getLoadTables().isEmpty());
+        }
+    }
+
     @Test
     public void testGetModelMetadataProjectName() throws IOException {
         File file = new File(
@@ -1373,7 +1539,8 @@ public class MetaStoreServiceTest extends ServiceTestBase {
     @Test
     public void testMetadataChecker() throws IOException {
         File file = new File("src/test/resources/ut_model_metadata/ut_model_matadata.zip");
-        val multipartFile = new MockMultipartFile(file.getName(), file.getName(), null, new FileInputStream(file));
+        val multipartFile = new MockMultipartFile(file.getName(), file.getName(), null,
+                Files.newInputStream(file.toPath()));
         KylinConfig modelConfig = KylinConfig.createKylinConfig(KylinConfig.getInstanceFromEnv());
         MetadataChecker metadataChecker = new MetadataChecker(MetadataStore.createMetadataStore(modelConfig));
         Map<String, RawResource> rawResourceMap = getRawResourceFromUploadFile(multipartFile);
diff --git a/src/modeling-service/src/test/java/org/apache/kylin/rest/service/TableServiceTest.java b/src/modeling-service/src/test/java/org/apache/kylin/rest/service/TableServiceTest.java
index ff8a49d330..1747593dbb 100644
--- a/src/modeling-service/src/test/java/org/apache/kylin/rest/service/TableServiceTest.java
+++ b/src/modeling-service/src/test/java/org/apache/kylin/rest/service/TableServiceTest.java
@@ -148,7 +148,7 @@ public class TableServiceTest extends CSVSourceTestCase {
     @InjectMocks
     private FusionModelService fusionModelService = Mockito.spy(new FusionModelService());
 
-    private StreamingJobListener eventListener = new StreamingJobListener();
+    private final StreamingJobListener eventListener = new StreamingJobListener();
 
     @Before
     public void setup() {
@@ -458,8 +458,8 @@ public class TableServiceTest extends CSVSourceTestCase {
     @Test
     public void testAddAndBroadcastSparkSession() {
         getTestConfig().setProperty("kylin.env.use-dynamic-S3-role-credential-in-table", "true");
-        TableExtDesc.S3RoleCredentialInfo roleCredentialInfo = null;
-        tableService.addAndBroadcastSparkSession(roleCredentialInfo);
+        tableService.addAndBroadcastSparkSession(null);
+        TableExtDesc.S3RoleCredentialInfo roleCredentialInfo;
         roleCredentialInfo = new TableExtDesc.S3RoleCredentialInfo("testbucket2", "", "");
         tableService.addAndBroadcastSparkSession(roleCredentialInfo);
         assert !SparderEnv.getSparkSession().conf().contains("fs.s3a.bucket2.testbucket.aws.credentials.provider");
@@ -588,7 +588,6 @@ public class TableServiceTest extends CSVSourceTestCase {
         val originSize = nTableMetadataManager.listAllTables().size();
 
         // Add partition_key and data_loading_range
-        DateRangeRequest request = mockDateRangeRequest();
         tableService.setPartitionKey(tableName, "default", "CAL_DT", "yyyy-MM-dd");
 
         // unload table
@@ -607,7 +606,6 @@ public class TableServiceTest extends CSVSourceTestCase {
     @Test
     public void testUnloadKafkaTable() {
         String project = "streaming_test";
-        NProjectManager npr = NProjectManager.getInstance(KylinConfig.getInstanceFromEnv());
         NTableMetadataManager tableManager = NTableMetadataManager.getInstance(KylinConfig.getInstanceFromEnv(),
                 project);
         StreamingJobManager mgr = StreamingJobManager.getInstance(getTestConfig(), project);
@@ -868,7 +866,7 @@ public class TableServiceTest extends CSVSourceTestCase {
         NDataLoadingRangeManager rangeManager = NDataLoadingRangeManager.getInstance(KylinConfig.getInstanceFromEnv(),
                 "default");
         NDataLoadingRange dataLoadingRange = rangeManager.getDataLoadingRange("DEFAULT.TEST_KYLIN_FACT");
-        SegmentRange segmentRange = new SegmentRange.TimePartitionedSegmentRange(1294364400000L, 1294364500000L);
+        SegmentRange<Long> segmentRange = new SegmentRange.TimePartitionedSegmentRange(1294364400000L, 1294364500000L);
         dataLoadingRange.setCoveredRange(segmentRange);
         NDataLoadingRange updateRange = rangeManager.copyForWrite(dataLoadingRange);
         rangeManager.updateDataLoadingRange(updateRange);
@@ -1059,13 +1057,6 @@ public class TableServiceTest extends CSVSourceTestCase {
         return request;
     }
 
-    private DateRangeRequest mockeDateRangeRequestWithoutTime() {
-        DateRangeRequest request = new DateRangeRequest();
-        request.setProject("default");
-        request.setTable("DEFAULT.TEST_KYLIN_FACT");
-        return request;
-    }
-
     @Test
     public void testGetProjectTables() throws Exception {
         NInitTablesResponse response;
@@ -1073,36 +1064,31 @@ public class TableServiceTest extends CSVSourceTestCase {
                 (databaseName, tableName) -> tableService.getTableNameResponses("default", databaseName, tableName));
         Assert.assertEquals(0, response.getDatabases().size());
 
-        response = tableService.getProjectTables("default", "SSB.CU", 0, 14, true, (databaseName, tableName) -> {
-            return tableService.getTableNameResponses("default", databaseName, tableName);
-        });
+        response = tableService.getProjectTables("default", "SSB.CU", 0, 14, true,
+                (databaseName, tableName) -> tableService.getTableNameResponses("default", databaseName, tableName));
         Assert.assertEquals(1, response.getDatabases().size());
-        Assert.assertEquals(1, response.getDatabases().get(0).getTables().size());
+        Assert.assertEquals(2, response.getDatabases().get(0).getTables().size());
 
-        response = tableService.getProjectTables("default", "", 0, 14, true, (databaseName, tableName) -> {
-            return tableService.getTableNameResponses("default", databaseName, tableName);
-        });
+        response = tableService.getProjectTables("default", "", 0, 14, true,
+                (databaseName, tableName) -> tableService.getTableNameResponses("default", databaseName, tableName));
         Assert.assertEquals(3, response.getDatabases().size());
-        Assert.assertEquals(20,
+        Assert.assertEquals(21,
                 response.getDatabases().get(0).getTables().size() + response.getDatabases().get(1).getTables().size()
                         + response.getDatabases().get(2).getTables().size());
 
-        response = tableService.getProjectTables("default", "TEST", 0, 14, true, (databaseName, tableName) -> {
-            return tableService.getTableNameResponses("default", databaseName, tableName);
-        });
+        response = tableService.getProjectTables("default", "TEST", 0, 14, true,
+                (databaseName, tableName) -> tableService.getTableNameResponses("default", databaseName, tableName));
         Assert.assertEquals(2, response.getDatabases().size());
         Assert.assertEquals(13,
                 response.getDatabases().get(0).getTables().size() + response.getDatabases().get(1).getTables().size());
 
-        response = tableService.getProjectTables("default", "EDW.", 0, 14, true, (databaseName, tableName) -> {
-            return tableService.getTableNameResponses("default", databaseName, tableName);
-        });
+        response = tableService.getProjectTables("default", "EDW.", 0, 14, true,
+                (databaseName, tableName) -> tableService.getTableNameResponses("default", databaseName, tableName));
         Assert.assertEquals(1, response.getDatabases().size());
         Assert.assertEquals(3, response.getDatabases().get(0).getTables().size());
 
-        response = tableService.getProjectTables("default", "EDW.", 0, 14, false, (databaseName, tableName) -> {
-            return tableService.getTableDesc("default", true, tableName, databaseName, true);
-        });
+        response = tableService.getProjectTables("default", "EDW.", 0, 14, false,
+                (databaseName, tableName) -> tableService.getTableDesc("default", true, tableName, databaseName, true));
         Assert.assertEquals(1, response.getDatabases().size());
         Assert.assertEquals(3, response.getDatabases().get(0).getTables().size());
 
@@ -1167,7 +1153,7 @@ public class TableServiceTest extends CSVSourceTestCase {
                 .setProperty("kylin.source.hive.databases", "ssb");
         tableService.loadProjectHiveTableNameToCacheImmediately("default", true);
         tables = tableService.getTableNameResponsesInCache("default", "SSB", "");
-        Assert.assertEquals(6, tables.size());
+        Assert.assertEquals(7, tables.size());
 
         NProjectManager.getInstance(KylinConfig.getInstanceFromEnv()).getProject("default").setPrincipal("default");
         tableService.loadHiveTableNameToCache();
@@ -1199,7 +1185,6 @@ public class TableServiceTest extends CSVSourceTestCase {
         testData.put("t", Arrays.asList("aa", "ab", "bc"));
         NHiveSourceInfo sourceInfo = new NHiveSourceInfo();
         sourceInfo.setTables(testData);
-        UserGroupInformation ugi = UserGroupInformation.getLoginUser();
         DataSourceState.getInstance().putCache("project#default", sourceInfo, Arrays.asList("aa", "ab", "bc"));
         List<?> tables = tableService.getTableNameResponsesInCache("default", "t", "a");
         Assert.assertEquals(2, tables.size());
@@ -1306,7 +1291,7 @@ public class TableServiceTest extends CSVSourceTestCase {
         KylinConfig config = getTestConfig();
         config.setProperty("kylin.source.load-hive-tablename-enabled", "false");
         config.setProperty("kylin.query.security.acl-tcr-enabled", "true");
-        Assert.assertEquals(6, tableService.getHiveTableNameResponses("default", "SSB", "").size());
+        Assert.assertEquals(7, tableService.getHiveTableNameResponses("default", "SSB", "").size());
         Assert.assertEquals(11, tableService.getHiveTableNameResponses("default", "DEFAULT", "").size());
 
         val table = NTableMetadataManager.getInstance(getTestConfig(), "default").getTableDesc("DEFAULT.TEST_ENCODING");
@@ -1321,7 +1306,7 @@ public class TableServiceTest extends CSVSourceTestCase {
         acl.setTable(aclTable);
         manager.updateAclTCR(acl, "test", true);
 
-        Assert.assertEquals(6, tableService.getHiveTableNameResponses("default", "SSB", "").size());
+        Assert.assertEquals(7, tableService.getHiveTableNameResponses("default", "SSB", "").size());
         Assert.assertEquals(11, tableService.getHiveTableNameResponses("default", "DEFAULT", "").size());
         config.setProperty("kylin.source.load-hive-tablename-enabled", "true");
         config.setProperty("kylin.query.security.acl-tcr-enabled", "false");
@@ -1393,10 +1378,8 @@ public class TableServiceTest extends CSVSourceTestCase {
         try {
             List<Integer> sourceTypes = Arrays.asList(1, 9);
             val tableDescs2 = tableService.getTableDescByTypes(project, true, "", "SSB", false, sourceTypes);
-            assert tableDescs2.stream().filter(tableDesc -> tableDesc.getSourceType() == 1).collect(Collectors.toList())
-                    .size() > 0;
-            assert tableDescs2.stream().filter(tableDesc -> tableDesc.getSourceType() == 9).collect(Collectors.toList())
-                    .size() > 0;
+            assert tableDescs2.stream().anyMatch(tableDesc -> tableDesc.getSourceType() == 1);
+            assert tableDescs2.stream().anyMatch(tableDesc -> tableDesc.getSourceType() == 9);
         } catch (Exception e) {
             Assert.fail();
         }