Posted to commits@kylin.apache.org by xx...@apache.org on 2023/01/06 11:10:05 UTC

[kylin] branch kylin5 updated (c0290399fc -> f760606ff3)

This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a change to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git


    from c0290399fc KYLIN-5368 Fix when stop ke job on yarn are not be killed for yarn cluster mode
     new 60a2b5b4fa KYLIN-5342 upgrade spark version 3.2.0-kylin-4.6.1.0 before Release Candidate
     new f241d43dca KYLIN-5343 Add column datatype check when import model
     new dea257ad3b KYLIN-5344 Fix epoch update when epoch checker disabled
     new 4af690015c KYLIN-5345 remove non existent user/usergroup in recommand advice list
     new bda0206639 KYLIN-5346 add monitor metrics for long running jobs
     new 1edec930c1 Revert KYLIN-5372 due to this fix cannot override snapshot build scenarios
     new 7bd187ae0d KYLIN-5347 use spark session hadoop config when aws serverless environment on build snapshot for partition table
     new eade3c6e1e KYLIN-5348 add server.servlet.encoding.force=true to enable setting UTF-8
     new 5bca044690 KYLIN-5349 Support project-level configuration of concurrent task limits
     new be5e5fa209 KYLIN-5356 Backend configuration of users supports the project administrator role
     new 236172adec KYLIN-5357 fix snyk vulnerabilities
     new f760606ff3 remove unused profile

The 12 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 pom.xml                                            |  24 +--
 .../apache/kylin/rest/config/MetricsConfig.java    |   1 +
 .../rest/config/initialize/MetricsRegistry.java    |  63 +++++++-
 ...sResponse.java => OpenAccessGroupResponse.java} |  16 +-
 ...rsResponse.java => OpenAccessUserResponse.java} |  16 +-
 .../config/initialize/MetricsRegistryTest.java     |  65 ++++++---
 .../apache/kylin/common/metrics/MetricsTag.java    |   2 +
 .../metrics/prometheus/PrometheusMetrics.java      |   1 +
 .../job/impl/threadpool/NDefaultScheduler.java     |  19 ++-
 .../apache/kylin/job/runners/FetcherRunner.java    |  27 +++-
 .../job/impl/threadpool/NDefaultSchedulerTest.java |  79 +++++++++-
 .../kylin/metadata/epoch/EpochOrchestrator.java    |   8 +-
 .../org/apache/kylin/metadata/model/TableDesc.java |  18 ---
 .../metadata/model/schema/ModelImportChecker.java  |  31 ++--
 .../model/schema/SchemaChangeCheckResult.java      |  99 +++++++------
 .../kylin/metadata/model/schema/SchemaUtil.java    |  24 ++-
 .../schema/strategy/ComputedColumnStrategy.java    |  37 +++--
 .../schema/strategy/MultiplePartitionStrategy.java |   8 +-
 .../schema/strategy/OverWritableStrategy.java      |  16 +-
 .../schema/strategy/SchemaChangeStrategy.java      |  32 ++--
 .../model/schema/strategy/TableColumnStrategy.java |  27 ++--
 .../model/schema/strategy/TableStrategy.java       |   9 +-
 .../schema/strategy/UnOverWritableStrategy.java    |  18 ++-
 .../org/apache/kylin/rest/constant/Constant.java   |   2 +-
 .../metadata/epoch/EpochOrchestratorTest.java      |  13 +-
 .../apache/kylin/metadata/model/TableDescTest.java |  55 -------
 .../metadata/model/schema/SchemaUtilTest.java      |  14 ++
 .../kylin/rest/service/ModelBuildService.java      |  17 ++-
 .../kylin/rest/service/ModelServiceBuildTest.java  |  46 ++++++
 .../localmeta/data/tableDesc/SSB.CUSTOMER_NEW.json |  14 +-
 src/kylin-it/pom.xml                               |   5 -
 .../rest/controller/open/OpenUserController.java   |  10 +-
 .../rest/controller/v2/NAccessControllerV2.java    | 124 ++++++++++++++--
 .../rest/controller/NAccessControllerV2Test.java   | 125 +++++++++++++++-
 .../kylin/rest/service/MetaStoreService.java       | 158 ++++++++++----------
 .../kylin/rest/service/MetaStoreServiceTest.java   | 161 ++++-----------------
 .../org/apache/kylin/query/schema/OLAPTable.java   |  18 ---
 src/server/pom.xml                                 |   3 +-
 src/server/src/main/resources/application.yaml     |   3 +-
 .../engine/spark/application/SparkApplication.java |   5 -
 .../engine/spark/builder/SnapshotBuilder.scala     |   4 +
 .../org/apache/spark/application/JobWorker.scala   |   8 +-
 .../apache/spark/application/TestJobMonitor.scala  |   2 +-
 .../apache/spark/application/TestJobWorker.scala   | 141 +-----------------
 44 files changed, 909 insertions(+), 659 deletions(-)
 copy src/common-service/src/main/java/org/apache/kylin/rest/response/{ServersResponse.java => OpenAccessGroupResponse.java} (79%)
 copy src/common-service/src/main/java/org/apache/kylin/rest/response/{ServersResponse.java => OpenAccessUserResponse.java} (79%)


[kylin] 08/12: KYLIN-5348 add server.servlet.encoding.force=true to enable setting UTF-8

Posted by xx...@apache.org.

xxyu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit eade3c6e1e5e30ef91ab73eb1b80590a97f98fdd
Author: lixiang <44...@qq.com>
AuthorDate: Wed Oct 26 14:21:36 2022 +0800

    KYLIN-5348 add server.servlet.encoding.force=true to enable setting UTF-8
---
 src/server/src/main/resources/application.yaml | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/src/server/src/main/resources/application.yaml b/src/server/src/main/resources/application.yaml
index c9d3b74e5a..583a28a682 100644
--- a/src/server/src/main/resources/application.yaml
+++ b/src/server/src/main/resources/application.yaml
@@ -31,6 +31,7 @@ server:
     context-path: "/kylin"
     encoding:
       charset: UTF-8
+      force: true
   max-http-header-size: 32KB
 
 management:
@@ -129,4 +130,4 @@ spring:
     import: optional:${KYLIN_CONF}/kylin.properties
   web:
     resources:
-      static-locations: "file://${KYLIN_HOME}/kystudio/dist"
\ No newline at end of file
+      static-locations: "file://${KYLIN_HOME}/kystudio/dist"
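
Note on the `force: true` setting above: in Spring Boot, `server.servlet.encoding.force` applies the configured charset to HTTP requests and responses even when a different encoding has already been set. The sketch below is not Kylin or Spring code. It only illustrates, in plain Java, the kind of corruption a charset mismatch produces; the class name `CharsetMismatchDemo` and the sample string are made up for this example.

```java
import java.nio.charset.StandardCharsets;

public class CharsetMismatchDemo {

    // Encode as UTF-8, then decode with the matching charset (round trip is lossless).
    static String decodeAsUtf8(String text) {
        byte[] utf8 = text.getBytes(StandardCharsets.UTF_8);
        return new String(utf8, StandardCharsets.UTF_8);
    }

    // Encode as UTF-8, then decode with ISO-8859-1 to simulate a mismatched reader.
    static String decodeAsLatin1(String text) {
        byte[] utf8 = text.getBytes(StandardCharsets.UTF_8);
        return new String(utf8, StandardCharsets.ISO_8859_1);
    }

    public static void main(String[] args) {
        String original = "\u6a21\u578b"; // two CJK characters, three UTF-8 bytes each
        System.out.println("utf-8 round trip intact:   " + decodeAsUtf8(original).equals(original));
        System.out.println("latin-1 round trip intact: " + decodeAsLatin1(original).equals(original));
    }
}
```

With a mismatched decoder every UTF-8 byte becomes its own character, so the two-character sample comes back as six unrelated characters.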


[kylin] 06/12: Revert KYLIN-5372 due to this fix cannot override snapshot build scenarios

Posted by xx...@apache.org.

xxyu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 1edec930c1c6331d4c98452061c8820f2d70f4ae
Author: huangsheng <hu...@163.com>
AuthorDate: Fri Nov 4 16:37:34 2022 +0800

    Revert KYLIN-5372 due to this fix cannot override snapshot build scenarios
---
 .../engine/spark/application/SparkApplication.java |   5 -
 .../org/apache/spark/application/JobWorker.scala   |   8 +-
 .../apache/spark/application/TestJobMonitor.scala  |   2 +-
 .../apache/spark/application/TestJobWorker.scala   | 141 +--------------------
 4 files changed, 3 insertions(+), 153 deletions(-)

diff --git a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java
index 2e8a503156..9cc144902d 100644
--- a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java
+++ b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java
@@ -26,7 +26,6 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.security.AccessControlException;
 import org.apache.kylin.cluster.IClusterManager;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.exception.KylinException;
@@ -325,10 +324,6 @@ public abstract class SparkApplication implements Application {
     }
 
     protected void handleException(Exception e) throws Exception {
-        if (e instanceof AccessControlException) {
-            logger.error("Permission denied.", e);
-            throw new NoRetryException("Permission denied.");
-        }
         throw e;
     }
 
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/spark/application/JobWorker.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/spark/application/JobWorker.scala
index 77b8834f02..069ac171cc 100644
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/spark/application/JobWorker.scala
+++ b/src/spark-project/engine-spark/src/main/scala/org/apache/spark/application/JobWorker.scala
@@ -18,7 +18,6 @@
 
 package org.apache.spark.application
 
-
 import java.util.concurrent.Executors
 
 import org.apache.kylin.engine.spark.application.SparkApplication
@@ -49,6 +48,7 @@ class JobWorker(application: SparkApplication, args: Array[String], eventLoop: K
     execute()
   }
 
+
   private def execute(): Unit = {
     pool.execute(new Runnable {
       override def run(): Unit = {
@@ -56,12 +56,6 @@ class JobWorker(application: SparkApplication, args: Array[String], eventLoop: K
           application.execute(args)
           eventLoop.post(JobSucceeded())
         } catch {
-          // Compatible with runtime exceptions thrown by the SparkApplication.execute(args: Array[String])
-          case runtimeException: RuntimeException =>
-            runtimeException.getCause match {
-              case noRetryException: NoRetryException => eventLoop.post(UnknownThrowable(noRetryException))
-              case throwable: Throwable => eventLoop.post(ResourceLack(throwable))
-            }
           case exception: NoRetryException => eventLoop.post(UnknownThrowable(exception))
           case throwable: Throwable => eventLoop.post(ResourceLack(throwable))
         }
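
Note on the catch block above: after the revert, only two cases remain. A `NoRetryException` thrown directly is posted as `UnknownThrowable`, and every other throwable is posted as `ResourceLack`. A `RuntimeException` that merely wraps a `NoRetryException` is no longer unwrapped, so it falls into the second case. The following Java sketch models that dispatch under stated assumptions: `JobFailureClassifier`, the `Event` enum, and the nested `NoRetryException` are hypothetical stand-ins written for this note, not the Kylin classes.

```java
import java.util.concurrent.Callable;

public class JobFailureClassifier {

    /** Hypothetical stand-in for Kylin's NoRetryException. */
    static class NoRetryException extends Exception {
        NoRetryException(String message) {
            super(message);
        }
    }

    /** Hypothetical stand-ins for the events posted to the event loop. */
    enum Event { JOB_SUCCEEDED, UNKNOWN_THROWABLE, RESOURCE_LACK }

    // Mirrors the two catch cases that remain after the revert:
    // only a directly thrown NoRetryException is treated as non-retryable.
    static Event classify(Callable<Void> job) {
        try {
            job.call();
            return Event.JOB_SUCCEEDED;
        } catch (NoRetryException e) {
            return Event.UNKNOWN_THROWABLE;
        } catch (Throwable t) {
            // Includes RuntimeException whose cause is a NoRetryException.
            return Event.RESOURCE_LACK;
        }
    }

    public static void main(String[] args) {
        System.out.println(classify(() -> null));
        System.out.println(classify(() -> {
            throw new NoRetryException("Permission denied.");
        }));
        System.out.println(classify(() -> {
            throw new RuntimeException("wrapped", new NoRetryException("Permission denied."));
        }));
    }
}
```

The third call shows the behavioral difference introduced by the revert: the wrapped exception is classified by its outer type, not by its cause.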
diff --git a/src/spark-project/engine-spark/src/test/scala/org/apache/spark/application/TestJobMonitor.scala b/src/spark-project/engine-spark/src/test/scala/org/apache/spark/application/TestJobMonitor.scala
index 1864d8749c..b9a1ff87c2 100644
--- a/src/spark-project/engine-spark/src/test/scala/org/apache/spark/application/TestJobMonitor.scala
+++ b/src/spark-project/engine-spark/src/test/scala/org/apache/spark/application/TestJobMonitor.scala
@@ -22,7 +22,6 @@ import java.util
 import java.util.concurrent.CountDownLatch
 import java.util.concurrent.atomic.AtomicBoolean
 import com.amazonaws.services.s3.model.AmazonS3Exception
-import org.apache.hadoop.security.AccessControlException
 import org.apache.kylin.cluster.{AvailableResource, IClusterManager, ResourceInfo}
 import org.apache.kylin.common.KylinConfig
 import org.apache.kylin.engine.spark.job.KylinBuildEnv
@@ -308,6 +307,7 @@ class TestJobMonitor extends SparderBaseFunSuite with BeforeAndAfterEach {
     }
   }
 
+
   test("post JobFailed event when receive class not found event") {
     withEventLoop { eventLoop =>
       Mockito.when(config.getSparkEngineMaxRetryTime).thenReturn(1)
diff --git a/src/spark-project/engine-spark/src/test/scala/org/apache/spark/application/TestJobWorker.scala b/src/spark-project/engine-spark/src/test/scala/org/apache/spark/application/TestJobWorker.scala
index 64afc26b85..73035a24c3 100644
--- a/src/spark-project/engine-spark/src/test/scala/org/apache/spark/application/TestJobWorker.scala
+++ b/src/spark-project/engine-spark/src/test/scala/org/apache/spark/application/TestJobWorker.scala
@@ -18,10 +18,9 @@
 
 package org.apache.spark.application
 
-import org.apache.hadoop.security.AccessControlException
-
 import java.util.concurrent.CountDownLatch
 import java.util.concurrent.atomic.AtomicBoolean
+
 import org.apache.kylin.engine.spark.application.SparkApplication
 import org.apache.kylin.engine.spark.scheduler._
 import org.apache.spark.scheduler.KylinJobEventLoop
@@ -54,30 +53,6 @@ class TestJobWorker extends SparderBaseFunSuite with BeforeAndAfter {
     eventLoop.stop()
   }
 
-  test("post ResourceLack event when job failed with runtime exception for lack of resource") {
-    val eventLoop = new KylinJobEventLoop
-    eventLoop.start()
-    val worker = new JobWorker(new ResourceLackJobWithRuntimeException(), Array.empty, eventLoop)
-    val latch = new CountDownLatch(2)
-    val receiveResourceLack = new AtomicBoolean(false)
-    val listener = new KylinJobListener {
-      override def onReceive(event: KylinJobEvent): Unit = {
-        if (event.isInstanceOf[ResourceLack]) {
-          receiveResourceLack.getAndSet(true)
-        }
-        latch.countDown()
-      }
-    }
-    eventLoop.registerListener(listener)
-    eventLoop.post(RunJob())
-    // receive RunJob and ResourceLack
-    latch.await()
-    assert(receiveResourceLack.get())
-    eventLoop.unregisterListener(listener)
-    worker.stop()
-    eventLoop.stop()
-  }
-
   test("post JobSucceeded event when job succeeded") {
     val eventLoop = new KylinJobEventLoop
     eventLoop.start()
@@ -125,78 +100,6 @@ class TestJobWorker extends SparderBaseFunSuite with BeforeAndAfter {
     worker.stop()
     eventLoop.stop()
   }
-
-  test("post Permission denied event when PermissionDenied occurred with handle Exception function") {
-    val eventLoop = new KylinJobEventLoop
-    eventLoop.start()
-    val worker = new JobWorker(new PermissionDeniedJobWithHandleException(), Array.empty, eventLoop)
-    val latch = new CountDownLatch(2)
-    val receivePermissionDenied = new AtomicBoolean(false)
-    val listener = new KylinJobListener {
-      override def onReceive(event: KylinJobEvent): Unit = {
-        if (event.isInstanceOf[UnknownThrowable]) {
-          receivePermissionDenied.getAndSet(true)
-        }
-        latch.countDown()
-      }
-    }
-    eventLoop.registerListener(listener)
-    eventLoop.post(RunJob())
-    // receive RunJob and PermissionDenied
-    latch.await()
-    assert(receivePermissionDenied.get())
-    eventLoop.unregisterListener(listener)
-    worker.stop()
-    eventLoop.stop()
-  }
-
-  test("post Permission denied event when RuntimeException occurred") {
-    val eventLoop = new KylinJobEventLoop
-    eventLoop.start()
-    val worker = new JobWorker(new PermissionDeniedJobWithRuntimeException(), Array.empty, eventLoop)
-    val latch = new CountDownLatch(2)
-    val receivePermissionDenied = new AtomicBoolean(false)
-    val listener = new KylinJobListener {
-      override def onReceive(event: KylinJobEvent): Unit = {
-        if (event.isInstanceOf[UnknownThrowable]) {
-          receivePermissionDenied.getAndSet(true)
-        }
-        latch.countDown()
-      }
-    }
-    eventLoop.registerListener(listener)
-    eventLoop.post(RunJob())
-    // receive RunJob and PermissionDenied
-    latch.await()
-    assert(receivePermissionDenied.get())
-    eventLoop.unregisterListener(listener)
-    worker.stop()
-    eventLoop.stop()
-  }
-
-  test("post Permission denied event when AccessControlException occurred") {
-    val eventLoop = new KylinJobEventLoop
-    eventLoop.start()
-    val worker = new JobWorker(new PermissionDeniedJobWithNoRetryException(), Array.empty, eventLoop)
-    val latch = new CountDownLatch(2)
-    val receivePermissionDenied = new AtomicBoolean(false)
-    val listener = new KylinJobListener {
-      override def onReceive(event: KylinJobEvent): Unit = {
-        if (event.isInstanceOf[UnknownThrowable]) {
-          receivePermissionDenied.getAndSet(true)
-        }
-        latch.countDown()
-      }
-    }
-    eventLoop.registerListener(listener)
-    eventLoop.post(RunJob())
-    // receive RunJob and PermissionDenied
-    latch.await()
-    assert(receivePermissionDenied.get())
-    eventLoop.unregisterListener(listener)
-    worker.stop()
-    eventLoop.stop()
-  }
 }
 
 class UnknownThrowableJob extends SparkApplication {
@@ -207,35 +110,6 @@ class UnknownThrowableJob extends SparkApplication {
   override protected def doExecute(): Unit = {}
 }
 
-class PermissionDeniedJobWithHandleException extends SparkApplication {
-  override def execute(args: Array[String]): Unit = {
-    try {
-      throw new AccessControlException()
-    } catch {
-      case e : Exception => handleException(e)
-    }
-  }
-  override protected def doExecute(): Unit = {}
-}
-
-class PermissionDeniedJobWithRuntimeException extends SparkApplication {
-  override def execute(args: Array[String]): Unit = {
-    try {
-      throw new AccessControlException()
-    } catch {
-      case e : Exception => throw new RuntimeException("Error execute " + this.getClass.getName, new NoRetryException("Permission denied."))
-    }
-  }
-  override protected def doExecute(): Unit = {}
-}
-
-class PermissionDeniedJobWithNoRetryException extends SparkApplication {
-  override def execute(args: Array[String]): Unit = {
-      throw new NoRetryException("Permission Denied")
-  }
-  override protected def doExecute(): Unit = {}
-}
-
 class ResourceLackJob extends SparkApplication {
 
   override def execute(args: Array[String]): Unit = {
@@ -245,19 +119,6 @@ class ResourceLackJob extends SparkApplication {
   override protected def doExecute(): Unit = {}
 }
 
-class ResourceLackJobWithRuntimeException extends SparkApplication {
-
-  override def execute(args: Array[String]): Unit = {
-    try {
-      throw new Exception()
-    } catch {
-      case e: Exception => throw new RuntimeException("Error execute " + this.getClass.getName, e)
-    }
-  }
-
-  override protected def doExecute(): Unit = {}
-}
-
 class MockSucceedJob extends SparkApplication {
   override def execute(args: Array[String]): Unit = {}
 


[kylin] 01/12: KYLIN-5342 upgrade spark version 3.2.0-kylin-4.6.1.0 before Release Candidate

Posted by xx...@apache.org.

xxyu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 60a2b5b4fa4f7a1c71dc9776b23a99dc48f7c1c3
Author: huangsheng <hu...@163.com>
AuthorDate: Thu Nov 3 14:36:25 2022 +0800

    KYLIN-5342 upgrade spark version 3.2.0-kylin-4.6.1.0 before Release Candidate
---
 pom.xml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pom.xml b/pom.xml
index d9401bc208..cf05b751a6 100644
--- a/pom.xml
+++ b/pom.xml
@@ -65,7 +65,7 @@
 
         <!-- Spark versions -->
         <delta.version>1.2.1</delta.version>
-        <spark.version>3.2.0-kylin-4.6.1.0-SNAPSHOT</spark.version>
+        <spark.version>3.2.0-kylin-4.6.1.0</spark.version>
 
         <roaring.version>0.9.2-kylin-r4</roaring.version>
 


[kylin] 07/12: KYLIN-5347 use spark session hadoop config when aws serverless environment on build snapshot for partition table

Posted by xx...@apache.org.

xxyu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 7bd187ae0d6dc88c3518d61b8afd665f4ee13797
Author: xingjian.zheng <xi...@kyligence.io>
AuthorDate: Fri Nov 4 20:37:20 2022 +0800

    KYLIN-5347 use spark session hadoop config when aws serverless environment on build snapshot for partition table
---
 .../scala/org/apache/kylin/engine/spark/builder/SnapshotBuilder.scala | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/SnapshotBuilder.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/SnapshotBuilder.scala
index ab2c13e3d0..ce5a080601 100644
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/SnapshotBuilder.scala
+++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/SnapshotBuilder.scala
@@ -459,6 +459,10 @@ class SnapshotBuilder(var jobId: String) extends Logging with Serializable {
   }
 
   private[builder] def decideSparkJobArg(sourceData: Dataset[Row]): (Int, Double) = {
+    var hadoopConf = SparderEnv.getHadoopConfiguration()
+    if (kylinConfig.getClusterManagerClassName.contains("AWSServerless")) {
+      hadoopConf = sourceData.sparkSession.sparkContext.hadoopConfiguration
+    }
     try {
       val sizeInMB = ResourceDetectUtils.getPaths(sourceData.queryExecution.sparkPlan)
         .map(path => HadoopUtil.getContentSummary(path.getFileSystem(SparderEnv.getHadoopConfiguration()), path).getLength)
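
Note on the hunk above: the added lines pick the Hadoop configuration from the Spark session when the cluster manager class name contains "AWSServerless", and otherwise keep the one from `SparderEnv`. How `hadoopConf` is consumed is not visible here; the context line shown still calls `SparderEnv.getHadoopConfiguration()` directly. The Java sketch below only mirrors the selection rule. `HadoopConfChooser`, its `choose` method, and the example class name are hypothetical and use plain strings in place of real configuration objects.

```java
import java.util.function.Supplier;

public class HadoopConfChooser {

    // Returns the session-scoped value only for cluster managers whose class
    // name contains "AWSServerless"; otherwise falls back to the global value.
    static <T> T choose(String clusterManagerClassName, Supplier<T> globalConf, Supplier<T> sessionConf) {
        if (clusterManagerClassName.contains("AWSServerless")) {
            return sessionConf.get();
        }
        return globalConf.get();
    }

    public static void main(String[] args) {
        // The class names below are made up for illustration.
        System.out.println(choose("org.example.AWSServerlessClusterManager", () -> "global", () -> "session"));
        System.out.println(choose("org.example.YarnClusterManager", () -> "global", () -> "session"));
    }
}
```

Using suppliers keeps the lookup lazy, so the session configuration is only read in the one environment that needs it.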


[kylin] 05/12: KYLIN-5346 add monitor metrics for long running jobs

Posted by xx...@apache.org.

xxyu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit bda0206639c03fb124959e48e346894adb4881f2
Author: Liang.Hua <36...@users.noreply.github.com>
AuthorDate: Fri Nov 4 11:47:07 2022 +0800

    KYLIN-5346 add monitor metrics for long running jobs
---
 .../apache/kylin/rest/config/MetricsConfig.java    |  1 +
 .../rest/config/initialize/MetricsRegistry.java    | 63 ++++++++++++++++++++-
 .../config/initialize/MetricsRegistryTest.java     | 65 ++++++++++++++++------
 .../apache/kylin/common/metrics/MetricsTag.java    |  2 +
 .../metrics/prometheus/PrometheusMetrics.java      |  1 +
 5 files changed, 113 insertions(+), 19 deletions(-)

diff --git a/src/common-service/src/main/java/org/apache/kylin/rest/config/MetricsConfig.java b/src/common-service/src/main/java/org/apache/kylin/rest/config/MetricsConfig.java
index f5920789d0..a0126eee3e 100644
--- a/src/common-service/src/main/java/org/apache/kylin/rest/config/MetricsConfig.java
+++ b/src/common-service/src/main/java/org/apache/kylin/rest/config/MetricsConfig.java
@@ -79,6 +79,7 @@ public class MetricsConfig {
             Set<String> allProjects = NProjectManager.getInstance(KylinConfig.getInstanceFromEnv()).listAllProjects()
                     .stream().map(ProjectInstance::getName).collect(Collectors.toSet());
 
+            MetricsRegistry.refreshProjectLongRunningJobs(KylinConfig.getInstanceFromEnv(), allProjects);
             Sets.SetView<String> newProjects = Sets.difference(allProjects, allControlledProjects);
             for (String newProject : newProjects) {
                 log.info("Register project metrics for {}", newProject);
diff --git a/src/common-service/src/main/java/org/apache/kylin/rest/config/initialize/MetricsRegistry.java b/src/common-service/src/main/java/org/apache/kylin/rest/config/initialize/MetricsRegistry.java
index f8d653dcba..987df10e52 100644
--- a/src/common-service/src/main/java/org/apache/kylin/rest/config/initialize/MetricsRegistry.java
+++ b/src/common-service/src/main/java/org/apache/kylin/rest/config/initialize/MetricsRegistry.java
@@ -43,6 +43,7 @@ import org.apache.kylin.common.metrics.prometheus.PrometheusMetrics;
 import org.apache.kylin.common.persistence.metadata.JdbcDataSource;
 import org.apache.kylin.common.scheduler.EventBusFactory;
 import org.apache.kylin.job.dao.ExecutablePO;
+import org.apache.kylin.job.execution.AbstractExecutable;
 import org.apache.kylin.job.execution.ExecutableState;
 import org.apache.kylin.job.execution.JobTypeEnum;
 import org.apache.kylin.job.execution.NExecutableManager;
@@ -57,8 +58,6 @@ import org.apache.kylin.metadata.model.NTableMetadataManager;
 import org.apache.kylin.metadata.model.TableDesc;
 import org.apache.kylin.metadata.project.NProjectManager;
 import org.apache.kylin.metadata.project.ProjectInstance;
-import org.apache.kylin.metadata.user.ManagedUser;
-import org.apache.kylin.metadata.user.NKylinUserManager;
 import org.apache.kylin.query.util.LoadCounter;
 import org.apache.kylin.query.util.LoadDesc;
 import org.apache.kylin.rest.service.ProjectService;
@@ -72,6 +71,8 @@ import com.codahale.metrics.jvm.GarbageCollectorMetricSet;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
+import org.apache.kylin.metadata.user.ManagedUser;
+import org.apache.kylin.metadata.user.NKylinUserManager;
 import io.micrometer.core.instrument.Gauge;
 import io.micrometer.core.instrument.Meter;
 import io.micrometer.core.instrument.MeterRegistry;
@@ -88,6 +89,10 @@ public class MetricsRegistry {
     private static final String GLOBAL = "global";
 
     private static final Map<String, Long> totalStorageSizeMap = Maps.newHashMap();
+    private static volatile Map<String, Map<Integer, Long>> projectPendingJobMap = Maps.newHashMap();
+    private static volatile Map<String, Map<Double, Long>> projectRunningJobMap = Maps.newHashMap();
+    private static final int[] PENDING_JOB_TIMEOUT_MINUTE = new int[] { 5, 10, 15, 30 };
+    private static final double[] RUNNING_JOB_TIMEOUT_HOUR = new double[] { 0.5d, 1d, 1.5d, 2d, 3d };
 
     private static final Logger logger = LoggerFactory.getLogger(MetricsRegistry.class);
 
@@ -99,6 +104,42 @@ public class MetricsRegistry {
         });
     }
 
+    public static void refreshProjectLongRunningJobs(KylinConfig kylinConfig, Set<String> projects) {
+        Map<String, Map<Integer, Long>> tempProjectPendingJobMap = Maps.newHashMap();
+        Map<String, Map<Double, Long>> tempProjectRunningJobMap = Maps.newHashMap();
+        for (String project : projects) {
+            final NExecutableManager executableManager = NExecutableManager.getInstance(kylinConfig, project);
+            tempProjectPendingJobMap.put(project, collectTimeoutToPendingJobsMap(executableManager));
+            tempProjectRunningJobMap.put(project, collectTimeoutToRunningJobsMap(executableManager));
+        }
+        projectPendingJobMap = tempProjectPendingJobMap;
+        projectRunningJobMap = tempProjectRunningJobMap;
+    }
+    
+    private static Map<Integer, Long> collectTimeoutToPendingJobsMap(NExecutableManager executableManager) {
+        Map<Integer, Long> timeoutToPendingJobsMap = Maps.newHashMap();
+        List<AbstractExecutable> pendingJobs = executableManager.getAllJobs().stream()
+                .filter(e -> ExecutableState.READY.name().equals(e.getOutput().getStatus()))
+                .map(executableManager::fromPO).collect(Collectors.toList());
+        for (int pendingJobMin : PENDING_JOB_TIMEOUT_MINUTE) {
+            timeoutToPendingJobsMap.put(pendingJobMin,
+                    pendingJobs.stream().filter(e -> e.getWaitTime() / 1000.0 > pendingJobMin * 60).count());
+        }
+        return timeoutToPendingJobsMap;
+    }
+
+    private static Map<Double, Long> collectTimeoutToRunningJobsMap(NExecutableManager executableManager) {
+        Map<Double, Long> timeoutToRunningJobsMap = Maps.newHashMap();
+        List<AbstractExecutable> runningJobs = executableManager.getAllJobs().stream()
+                .filter(e -> ExecutableState.RUNNING.name().equals(e.getOutput().getStatus()))
+                .map(executableManager::fromPO).collect(Collectors.toList());
+        for (double runningJobHour : RUNNING_JOB_TIMEOUT_HOUR) {
+            timeoutToRunningJobsMap.put(runningJobHour,
+                    runningJobs.stream().filter(e -> e.getDuration() / 1000.0 > runningJobHour * 3600).count());
+        }
+        return timeoutToRunningJobsMap;
+    }
+
     public static void removeProjectFromStorageSizeMap(String project) {
         totalStorageSizeMap.remove(project);
     }
@@ -189,6 +230,24 @@ public class MetricsRegistry {
                         : scheduler.getContext().getRunningJobs().values().stream()
                                 .filter(job -> ExecutableState.RUNNING.equals(job.getOutput().getState())).count())
                 .tags(projectTag).tags(MetricsTag.STATE.getVal(), MetricsTag.RUNNING.getVal()).register(meterRegistry);
+
+        for (double runningTimeoutHour : RUNNING_JOB_TIMEOUT_HOUR) {
+            Gauge.builder(PrometheusMetrics.JOB_LONG_RUNNING.getValue(),
+                    () -> MetricsRegistry.projectRunningJobMap.getOrDefault(project, Maps.newHashMap())
+                            .getOrDefault(runningTimeoutHour, 0L))
+                    .tags(projectTag).tags(MetricsTag.STATE.getVal(), MetricsTag.RUNNING.getVal(),
+                            MetricsTag.TIMEOUT.getVal(), runningTimeoutHour + "h")
+                    .register(meterRegistry);
+        }
+
+        for (int waitTimeoutMin : PENDING_JOB_TIMEOUT_MINUTE) {
+            Gauge.builder(PrometheusMetrics.JOB_LONG_RUNNING.getValue(),
+                    () -> MetricsRegistry.projectPendingJobMap.getOrDefault(project, Maps.newHashMap())
+                            .getOrDefault(waitTimeoutMin, 0L))
+                    .tags(projectTag).tags(MetricsTag.STATE.getVal(), MetricsTag.WAITING.getVal(),
+                            MetricsTag.TIMEOUT.getVal(), waitTimeoutMin + "m")
+                    .register(meterRegistry);
+        }
     }
 
     public static void registerHostMetrics(String host) {
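
Note on the long-running job metrics above: for each timeout threshold the code counts the jobs whose duration (or wait time) exceeds it, so the buckets are cumulative and a job past the largest threshold is counted under every smaller one as well. The sketch below reproduces that counting for running jobs in plain Java. The threshold array and the comparison are copied from the diff; the class `LongRunningJobBuckets`, the method `countExceeding`, and the sample durations are assumptions made for this example, with raw millisecond values standing in for `AbstractExecutable.getDuration()`.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class LongRunningJobBuckets {

    // Same thresholds (in hours) as RUNNING_JOB_TIMEOUT_HOUR in the diff.
    static final double[] RUNNING_JOB_TIMEOUT_HOUR = { 0.5d, 1d, 1.5d, 2d, 3d };

    // For each threshold, count the jobs whose duration in milliseconds exceeds it.
    static Map<Double, Long> countExceeding(List<Long> durationsMs) {
        Map<Double, Long> timeoutToJobs = new LinkedHashMap<>();
        for (double hours : RUNNING_JOB_TIMEOUT_HOUR) {
            long count = durationsMs.stream()
                    .filter(ms -> ms / 1000.0 > hours * 3600)
                    .count();
            timeoutToJobs.put(hours, count);
        }
        return timeoutToJobs;
    }

    public static void main(String[] args) {
        // Sample durations: 20 minutes, 70 minutes, 4 hours.
        List<Long> durations = Arrays.asList(20L * 60_000, 70L * 60_000, 4L * 3_600_000);
        countExceeding(durations)
                .forEach((hours, count) -> System.out.println(hours + "h -> " + count));
    }
}
```

Because the comparison is strict, a job that has run for exactly one hour is not counted in the 1h bucket.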
diff --git a/src/common-service/src/test/java/org/apache/kylin/rest/config/initialize/MetricsRegistryTest.java b/src/common-service/src/test/java/org/apache/kylin/rest/config/initialize/MetricsRegistryTest.java
index d414abf2e5..a3e02ca687 100644
--- a/src/common-service/src/test/java/org/apache/kylin/rest/config/initialize/MetricsRegistryTest.java
+++ b/src/common-service/src/test/java/org/apache/kylin/rest/config/initialize/MetricsRegistryTest.java
@@ -22,23 +22,15 @@ import static org.apache.kylin.common.persistence.metadata.jdbc.JdbcUtil.datasou
 
 import java.io.IOException;
 import java.util.Collection;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.stream.Collectors;
 
 import org.apache.commons.dbcp2.BasicDataSourceFactory;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.job.execution.BaseTestExecutable;
-import org.apache.kylin.job.execution.DefaultOutput;
-import org.apache.kylin.job.execution.Executable;
-import org.apache.kylin.job.execution.ExecutableContext;
-import org.apache.kylin.job.execution.ExecutableState;
-import org.apache.kylin.job.execution.JobTypeEnum;
-import org.apache.kylin.job.execution.NExecutableManager;
-import org.apache.kylin.job.execution.SucceedTestExecutable;
-import org.apache.kylin.job.impl.threadpool.NDefaultScheduler;
-import org.apache.kylin.rest.util.SpringContext;
 import org.apache.kylin.common.metrics.MetricsController;
 import org.apache.kylin.common.metrics.MetricsGroup;
 import org.apache.kylin.common.metrics.MetricsName;
@@ -46,9 +38,19 @@ import org.apache.kylin.common.metrics.MetricsTag;
 import org.apache.kylin.common.metrics.prometheus.PrometheusMetrics;
 import org.apache.kylin.common.persistence.metadata.JdbcDataSource;
 import org.apache.kylin.common.util.NLocalFileMetadataTestCase;
+import org.apache.kylin.job.dao.ExecutableOutputPO;
+import org.apache.kylin.job.dao.ExecutablePO;
+import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.job.execution.DefaultOutput;
+import org.apache.kylin.job.execution.Executable;
+import org.apache.kylin.job.execution.ExecutableContext;
+import org.apache.kylin.job.execution.ExecutableState;
+import org.apache.kylin.job.execution.NExecutableManager;
+import org.apache.kylin.job.impl.threadpool.NDefaultScheduler;
 import org.apache.kylin.query.util.LoadCounter;
 import org.apache.kylin.rest.response.StorageVolumeInfoResponse;
 import org.apache.kylin.rest.service.ProjectService;
+import org.apache.kylin.rest.util.SpringContext;
 import org.apache.spark.sql.SparderEnv;
 import org.junit.Assert;
 import org.junit.Before;
@@ -77,7 +79,7 @@ import lombok.var;
 
 @RunWith(PowerMockRunner.class)
 @PrepareForTest({ SpringContext.class, MetricsGroup.class, UserGroupInformation.class, JdbcDataSource.class,
-        SparderEnv.class, NDefaultScheduler.class, LoadCounter.class})
+        SparderEnv.class, NDefaultScheduler.class, NExecutableManager.class, LoadCounter.class })
 public class MetricsRegistryTest extends NLocalFileMetadataTestCase {
 
     private MeterRegistry meterRegistry;
@@ -107,6 +109,7 @@ public class MetricsRegistryTest extends NLocalFileMetadataTestCase {
         PowerMockito.mockStatic(SpringContext.class);
         PowerMockito.mockStatic(SparderEnv.class);
         PowerMockito.mockStatic(NDefaultScheduler.class);
+        PowerMockito.mockStatic(NExecutableManager.class);
         PowerMockito.mockStatic(LoadCounter.class);
     }
 
@@ -209,6 +212,33 @@ public class MetricsRegistryTest extends NLocalFileMetadataTestCase {
         MetricsRegistry.registerProjectPrometheusMetrics(kylinConfig, project);
         Collection<Gauge> gauges6 = meterRegistry.find(PrometheusMetrics.JOB_COUNTS.getValue()).gauges();
         gauges6.forEach(e -> Assert.assertEquals(1, e.value(), 0));
+        Collection<Meter> meters4 = meterRegistry.find(PrometheusMetrics.JOB_LONG_RUNNING.getValue()).meters();
+        meters4.forEach(meter -> meterRegistry.remove(meter));
+        MetricsRegistry.registerProjectPrometheusMetrics(kylinConfig, project);
+        Collection<Gauge> gauges7 = meterRegistry.find(PrometheusMetrics.JOB_LONG_RUNNING.getValue()).gauges();
+        Assert.assertEquals(0, gauges7.stream().filter(e -> e.value() == 1).count());
+        NExecutableManager executableManager = PowerMockito.mock(NExecutableManager.class);
+        PowerMockito.when(NExecutableManager.getInstance(kylinConfig, "default")).thenReturn(executableManager);
+        ExecutablePO mockExecutablePO = Mockito.mock(ExecutablePO.class);
+        ExecutablePO mockExecutablePO1 = Mockito.mock(ExecutablePO.class);
+        AbstractExecutable mockAbstractExecutable = Mockito.mock(AbstractExecutable.class);
+        AbstractExecutable mockAbstractExecutable1 = Mockito.mock(AbstractExecutable.class);
+        ExecutableOutputPO mockExecutableOutputPO = Mockito.mock(ExecutableOutputPO.class);
+        ExecutableOutputPO mockExecutableOutputPO1 = Mockito.mock(ExecutableOutputPO.class);
+        Mockito.when(mockExecutablePO.getOutput()).thenReturn(mockExecutableOutputPO);
+        Mockito.when(mockExecutablePO1.getOutput()).thenReturn(mockExecutableOutputPO1);
+        Mockito.when(mockExecutableOutputPO.getStatus()).thenReturn(ExecutableState.READY.name());
+        Mockito.when(mockExecutableOutputPO1.getStatus()).thenReturn(ExecutableState.RUNNING.name());
+        Mockito.when(mockAbstractExecutable.getWaitTime()).thenReturn(8 * 60 * 1000L);
+        Mockito.when(mockAbstractExecutable1.getDuration()).thenReturn(3 * 60 * 60 * 1000L);
+        Mockito.when(executableManager.fromPO(mockExecutablePO)).thenReturn(mockAbstractExecutable);
+        Mockito.when(executableManager.fromPO(mockExecutablePO1)).thenReturn(mockAbstractExecutable1);
+        Mockito.when(executableManager.getAllJobs())
+                .thenReturn(Lists.newArrayList(mockExecutablePO, mockExecutablePO1));
+        Set<String> projectSet = new HashSet<>();
+        projectSet.add(project);
+        MetricsRegistry.refreshProjectLongRunningJobs(kylinConfig, projectSet);
+        Assert.assertEquals(5, gauges7.stream().filter(e -> e.value() == 1).count());
     }
 
 
@@ -220,17 +250,18 @@ public class MetricsRegistryTest extends NLocalFileMetadataTestCase {
         Mockito.when(projectService.getStorageVolumeInfoResponse(project)).thenReturn(response);
         PowerMockito.when(SpringContext.getBean(ProjectService.class)).thenReturn(projectService);
 
+        val manager = PowerMockito.mock(NExecutableManager.class);
+        PowerMockito.when(NExecutableManager.getInstance(getTestConfig(), project)).thenReturn(manager);
         MetricsRegistry.registerProjectMetrics(getTestConfig(), project, "localhost");
         MetricsRegistry.registerHostMetrics("localhost");
         List<Meter> meters = meterRegistry.getMeters();
         Assert.assertEquals(0, meters.size());
 
-        val manager = NExecutableManager.getInstance(KylinConfig.getInstanceFromEnv(), project);
-        BaseTestExecutable executable = new SucceedTestExecutable();
-        executable.setParam("test1", "test1");
-        executable.setProject(project);
-        executable.setJobType(JobTypeEnum.INDEX_BUILD);
-        manager.addJob(executable);
+        ExecutablePO mockExecutablePO = Mockito.mock(ExecutablePO.class);
+        ExecutableOutputPO mockExecutableOutputPO = Mockito.mock(ExecutableOutputPO.class);
+        Mockito.when(mockExecutablePO.getOutput()).thenReturn(mockExecutableOutputPO);
+        Mockito.when(mockExecutableOutputPO.getStatus()).thenReturn(ExecutableState.READY.name());
+        Mockito.when(manager.getAllJobs()).thenReturn(Lists.newArrayList(mockExecutablePO));
 
         var result = MetricsController.getDefaultMetricRegistry()
                 .getGauges(MetricFilter.contains(MetricsName.JOB_RUNNING_GAUGE.getVal()));
diff --git a/src/core-common/src/main/java/org/apache/kylin/common/metrics/MetricsTag.java b/src/core-common/src/main/java/org/apache/kylin/common/metrics/MetricsTag.java
index de7f235048..926070d66a 100644
--- a/src/core-common/src/main/java/org/apache/kylin/common/metrics/MetricsTag.java
+++ b/src/core-common/src/main/java/org/apache/kylin/common/metrics/MetricsTag.java
@@ -36,6 +36,8 @@ public enum MetricsTag {
     RUNNING("running"), //
     JOB_CATEGORY("category"), //
     HOST("host"), //
+    TIMEOUT("timeout"), //
+    WAITING("waiting"), //
     HIT_SECOND_STORAGE("hit_second_storage");
 
     private final String value;
diff --git a/src/core-common/src/main/java/org/apache/kylin/common/metrics/prometheus/PrometheusMetrics.java b/src/core-common/src/main/java/org/apache/kylin/common/metrics/prometheus/PrometheusMetrics.java
index 550edbe5ed..52c1e3d892 100644
--- a/src/core-common/src/main/java/org/apache/kylin/common/metrics/prometheus/PrometheusMetrics.java
+++ b/src/core-common/src/main/java/org/apache/kylin/common/metrics/prometheus/PrometheusMetrics.java
@@ -42,6 +42,7 @@ public enum PrometheusMetrics {
 
     JOB_COUNTS("ke_job_counts", Type.PROJECT_METRIC), //
     JOB_MINUTES("ke_job_minutes", Type.PROJECT_METRIC), //
+    JOB_LONG_RUNNING("ke_long_running_jobs", Type.PROJECT_METRIC), //
 
     MODEL_BUILD_DURATION("ke_model_build_minutes", Type.PROJECT_METRIC | Type.MODEL_METRIC);
 

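Note on the long-running job metric: the test above mocks one READY job with a wait time of 8 minutes and one RUNNING job with a duration of 3 hours, then checks the ke_long_running_jobs gauges. The body of MetricsRegistry.refreshProjectLongRunningJobs is not shown in this part of the patch, so what follows is only a minimal sketch of how such a classification might work. The thresholds, the Job class and the countLongRunning helper are hypothetical names invented for illustration. Only the WAITING and TIMEOUT tag names come from the MetricsTag change.

```java
import java.util.Arrays;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;

class LongRunningJobSketch {
    enum State { READY, RUNNING, SUCCEED }

    // Mirrors the two tags added to MetricsTag in this patch.
    enum Tag { WAITING, TIMEOUT }

    // Hypothetical stand-in for a job; not a Kylin class.
    static final class Job {
        final State state;
        final long waitMs;
        final long durationMs;

        Job(State state, long waitMs, long durationMs) {
            this.state = state;
            this.waitMs = waitMs;
            this.durationMs = durationMs;
        }
    }

    // Hypothetical thresholds, chosen only for illustration.
    static final long WAIT_LIMIT_MS = 5 * 60 * 1000L;
    static final long DURATION_LIMIT_MS = 2 * 60 * 60 * 1000L;

    // Counts READY jobs that waited too long and RUNNING jobs that ran too long.
    static Map<Tag, Integer> countLongRunning(List<Job> jobs) {
        Map<Tag, Integer> counts = new EnumMap<>(Tag.class);
        for (Tag tag : Tag.values()) {
            counts.put(tag, 0);
        }
        for (Job job : jobs) {
            if (job.state == State.READY && job.waitMs > WAIT_LIMIT_MS) {
                counts.merge(Tag.WAITING, 1, Integer::sum);
            } else if (job.state == State.RUNNING && job.durationMs > DURATION_LIMIT_MS) {
                counts.merge(Tag.TIMEOUT, 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        List<Job> jobs = Arrays.asList(
                new Job(State.READY, 8 * 60 * 1000L, 0L),         // same wait time as the mocked READY job
                new Job(State.RUNNING, 0L, 3 * 60 * 60 * 1000L),  // same duration as the mocked RUNNING job
                new Job(State.RUNNING, 0L, 10 * 60 * 1000L));     // short job, not counted
        System.out.println(countLongRunning(jobs));
    }
}
```

A gauge per tag could then report the corresponding count. How the real implementation maps counts to gauge values is not visible here.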

[kylin] 12/12: remove unused profile

Posted by xx...@apache.org.

xxyu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit f760606ff3ed99a5189d0f66295a7515fa604220
Author: junqing.cai <ju...@kyligence.io>
AuthorDate: Wed Nov 9 20:19:13 2022 +0800

    remove unused profile
---
 pom.xml                                                |  1 -
 .../kylin/rest/controller/open/OpenUserController.java | 10 +++++-----
 .../java/org/apache/kylin/query/schema/OLAPTable.java  | 18 ------------------
 3 files changed, 5 insertions(+), 24 deletions(-)

diff --git a/pom.xml b/pom.xml
index c2bea76ab1..6ad384d72d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -41,7 +41,6 @@
         <maven-model.version>3.3.9</maven-model.version>
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
         <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
-        <kylin.module.version>5.0.0-SNAPSHOT</kylin.module.version>
         <arthas.version>3.6.3-kyarthas-r2</arthas.version>
 
         <!-- Keep same order with dependencyManagement -->
diff --git a/src/metadata-server/src/main/java/org/apache/kylin/rest/controller/open/OpenUserController.java b/src/metadata-server/src/main/java/org/apache/kylin/rest/controller/open/OpenUserController.java
index db19bb337a..0346534bbc 100644
--- a/src/metadata-server/src/main/java/org/apache/kylin/rest/controller/open/OpenUserController.java
+++ b/src/metadata-server/src/main/java/org/apache/kylin/rest/controller/open/OpenUserController.java
@@ -26,16 +26,15 @@ import java.util.stream.Collectors;
 
 import org.apache.kylin.common.exception.KylinException;
 import org.apache.kylin.rest.constant.Constant;
+import org.apache.kylin.rest.controller.NBasicController;
+import org.apache.kylin.rest.controller.NUserController;
 import org.apache.kylin.rest.request.CachedUserUpdateRequest;
+import org.apache.kylin.rest.request.PasswordChangeRequest;
+import org.apache.kylin.rest.request.UserRequest;
 import org.apache.kylin.rest.response.DataResult;
 import org.apache.kylin.rest.response.EnvelopeResponse;
 import org.apache.kylin.rest.response.UserInfoResponse;
 import org.apache.kylin.rest.service.UserService;
-import org.apache.kylin.metadata.user.ManagedUser;
-import org.apache.kylin.rest.controller.NBasicController;
-import org.apache.kylin.rest.controller.NUserController;
-import org.apache.kylin.rest.request.PasswordChangeRequest;
-import org.apache.kylin.rest.request.UserRequest;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.security.access.prepost.PreAuthorize;
 import org.springframework.stereotype.Controller;
@@ -49,6 +48,7 @@ import org.springframework.web.bind.annotation.RequestMapping;
 import org.springframework.web.bind.annotation.RequestParam;
 import org.springframework.web.bind.annotation.ResponseBody;
 
+import org.apache.kylin.metadata.user.ManagedUser;
 import io.swagger.annotations.ApiOperation;
 
 @Controller
diff --git a/src/query-common/src/main/java/org/apache/kylin/query/schema/OLAPTable.java b/src/query-common/src/main/java/org/apache/kylin/query/schema/OLAPTable.java
index 96e8fa02f4..9551f54bd0 100644
--- a/src/query-common/src/main/java/org/apache/kylin/query/schema/OLAPTable.java
+++ b/src/query-common/src/main/java/org/apache/kylin/query/schema/OLAPTable.java
@@ -16,24 +16,6 @@
  * limitations under the License.
  */
 
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
 package org.apache.kylin.query.schema;
 
 import java.util.ArrayList;


[kylin] 02/12: KYLIN-5343 Add column datatype check when import model

Posted by xx...@apache.org.

xxyu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit f241d43dca49719c9848510ca936ecc63c06bab1
Author: Jiale He <35...@users.noreply.github.com>
AuthorDate: Thu Nov 3 19:14:06 2022 +0800

    KYLIN-5343 Add column datatype check when import model
---
 .../org/apache/kylin/metadata/model/TableDesc.java |  18 ---
 .../metadata/model/schema/ModelImportChecker.java  |  31 ++--
 .../model/schema/SchemaChangeCheckResult.java      |  99 +++++++------
 .../kylin/metadata/model/schema/SchemaUtil.java    |  24 ++-
 .../schema/strategy/ComputedColumnStrategy.java    |  37 +++--
 .../schema/strategy/MultiplePartitionStrategy.java |   8 +-
 .../schema/strategy/OverWritableStrategy.java      |  16 +-
 .../schema/strategy/SchemaChangeStrategy.java      |  32 ++--
 .../model/schema/strategy/TableColumnStrategy.java |  27 ++--
 .../model/schema/strategy/TableStrategy.java       |   9 +-
 .../schema/strategy/UnOverWritableStrategy.java    |  18 ++-
 .../apache/kylin/metadata/model/TableDescTest.java |  55 -------
 .../metadata/model/schema/SchemaUtilTest.java      |  14 ++
 .../localmeta/data/tableDesc/SSB.CUSTOMER_NEW.json |  14 +-
 .../kylin/rest/service/MetaStoreService.java       | 158 ++++++++++----------
 .../kylin/rest/service/MetaStoreServiceTest.java   | 161 ++++-----------------
 16 files changed, 316 insertions(+), 405 deletions(-)
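
Note on the name checks in this commit: ModelImportChecker.check maps each alias in the import package through context.getNewModels().getOrDefault(model, model), then passes the set of existing aliases and the set of broken aliases to every strategy. The bodies of hasSameName and hasSameWithBroken are not part of the hunks shown here, so the sketch below simply assumes they are set-membership tests. All class and method names in it are hypothetical.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

class ImportNameCheckSketch {
    // Apply user-chosen renames to the aliases found in the import package;
    // aliases without a rename are kept as-is.
    static Set<String> resolveImportAliases(Set<String> packagedAliases, Map<String, String> newNames) {
        return packagedAliases.stream()
                .map(alias -> newNames.getOrDefault(alias, alias))
                .collect(Collectors.toSet());
    }

    // Assumption: a name conflict is plain membership in the target project's aliases.
    static boolean hasSameName(String alias, Set<String> originalModels) {
        return originalModels.contains(alias);
    }

    // Assumption: same idea, restricted to models whose dataflow is broken.
    static boolean hasSameNameBroken(String alias, Set<String> originalBrokenModels) {
        return originalBrokenModels.contains(alias);
    }

    public static void main(String[] args) {
        Set<String> packaged = new HashSet<>();
        packaged.add("sales");
        packaged.add("orders");

        Map<String, String> renames = new HashMap<>();
        renames.put("orders", "orders_v2");

        Set<String> original = new HashSet<>();
        original.add("sales");
        Set<String> broken = new HashSet<>();
        broken.add("sales");

        for (String alias : resolveImportAliases(packaged, renames)) {
            System.out.println(alias + " sameName=" + hasSameName(alias, original)
                    + " sameNameBroken=" + hasSameNameBroken(alias, broken));
        }
    }
}
```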

diff --git a/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableDesc.java b/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableDesc.java
index 33231ff7b9..1202ba6b07 100644
--- a/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableDesc.java
+++ b/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/TableDesc.java
@@ -28,7 +28,6 @@ import java.util.Set;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
-import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.ArrayUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.kylin.common.KylinConfig;
@@ -307,23 +306,6 @@ public class TableDesc extends RootPersistentEntity implements Serializable, ISo
         return null;
     }
 
-    public Pair<Set<ColumnDesc>, Set<ColumnDesc>> findColumns(Set<ColumnDesc> columnDescSet) {
-        Set<ColumnDesc> existColSet = Sets.newHashSet();
-        Set<ColumnDesc> notExistColSet = Sets.newHashSet();
-        if (CollectionUtils.isEmpty(columnDescSet)) {
-            return Pair.newPair(existColSet, notExistColSet);
-        }
-        for (ColumnDesc searchColumnDesc : columnDescSet) {
-            ColumnDesc columnDesc = findColumnByName(searchColumnDesc.getName());
-            if (Objects.isNull(columnDesc)) {
-                notExistColSet.add(searchColumnDesc);
-            } else {
-                existColSet.add(columnDesc);
-            }
-        }
-        return Pair.newPair(existColSet, notExistColSet);
-    }
-
     @Override
     public String getResourcePath() {
         return concatResourcePath(getIdentity(), project);
diff --git a/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/schema/ModelImportChecker.java b/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/schema/ModelImportChecker.java
index 560ca58538..49e8977d2d 100644
--- a/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/schema/ModelImportChecker.java
+++ b/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/schema/ModelImportChecker.java
@@ -24,6 +24,8 @@ import java.util.Set;
 import java.util.stream.Collectors;
 
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.metadata.cube.model.NDataflow;
+import org.apache.kylin.metadata.cube.model.NDataflowManager;
 import org.apache.kylin.metadata.model.NDataModelManager;
 import org.apache.kylin.metadata.model.schema.strategy.ComputedColumnStrategy;
 import org.apache.kylin.metadata.model.schema.strategy.MultiplePartitionStrategy;
@@ -43,24 +45,29 @@ public class ModelImportChecker {
             new UnOverWritableStrategy(), new TableColumnStrategy(), new TableStrategy(), new OverWritableStrategy(),
             new MultiplePartitionStrategy());
 
-    public static SchemaChangeCheckResult check(SchemaUtil.SchemaDifference difference,
-            ImportModelContext importModelContext) {
+    public static SchemaChangeCheckResult check(SchemaUtil.SchemaDifference diff, ImportModelContext context) {
+        String targetProject = context.getTargetProject();
+        KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+
         Set<String> importModels = NDataModelManager
-                .getInstance(importModelContext.getImportKylinConfig(), importModelContext.getTargetProject())
-                .listAllModelAlias().stream().map(model -> importModelContext.getNewModels().getOrDefault(model, model))
+                .getInstance(context.getImportKylinConfig(), targetProject).listAllModelAlias().stream()
+                .map(model -> context.getNewModels().getOrDefault(model, model))
                 .collect(Collectors.toSet());
 
-        Set<String> originalModels = NDataModelManager
-                .getInstance(KylinConfig.getInstanceFromEnv(), importModelContext.getTargetProject())
-                .listAllModelAlias();
+        // all models include broken
+        Set<String> originalModels = NDataModelManager.getInstance(kylinConfig, targetProject).listAllModelAlias();
+        // broken models
+        Set<String> originBrokenModels = NDataflowManager.getInstance(kylinConfig, targetProject).listAllDataflows(true)
+                .stream().filter(NDataflow::checkBrokenWithRelatedInfo).map(df -> df.getModel().getAlias())
+                .collect(Collectors.toSet());
 
         val result = new SchemaChangeCheckResult();
         for (SchemaChangeStrategy strategy : strategies) {
-            result.addMissingItems(strategy.missingItems(difference, importModels, originalModels));
-            result.addNewItems(strategy.newItems(difference, importModels, originalModels));
-            result.addReduceItems(strategy.reduceItems(difference, importModels, originalModels));
-            result.addUpdateItems(strategy.updateItems(difference, importModels, originalModels));
-            result.areEqual(strategy.areEqual(difference, importModels));
+            result.addMissingItems(strategy.missingItems(diff, importModels, originalModels, originBrokenModels));
+            result.addNewItems(strategy.newItems(diff, importModels, originalModels, originBrokenModels));
+            result.addReduceItems(strategy.reduceItems(diff, importModels, originalModels, originBrokenModels));
+            result.addUpdateItems(strategy.updateItems(diff, importModels, originalModels, originBrokenModels));
+            result.areEqual(strategy.areEqual(diff, importModels));
         }
         return result;
     }
diff --git a/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/schema/SchemaChangeCheckResult.java b/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/schema/SchemaChangeCheckResult.java
index 90b116cd85..fbe1b0489d 100644
--- a/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/schema/SchemaChangeCheckResult.java
+++ b/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/schema/SchemaChangeCheckResult.java
@@ -24,19 +24,20 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
+import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import org.apache.kylin.metadata.model.TableDesc;
+
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonUnwrapped;
-import com.google.common.collect.Sets;
 
 import lombok.AllArgsConstructor;
 import lombok.Data;
-import lombok.Getter;
 import lombok.NoArgsConstructor;
 import lombok.Setter;
+import lombok.val;
 
 @Data
 public class SchemaChangeCheckResult {
@@ -44,6 +45,9 @@ public class SchemaChangeCheckResult {
     @JsonProperty
     private Map<String, ModelSchemaChange> models = new HashMap<>();
 
+    @JsonIgnore
+    private List<TableDesc> existTableList = new ArrayList<>();
+
     @Data
     public static class ModelSchemaChange {
         private int differences;
@@ -67,13 +71,13 @@ public class SchemaChangeCheckResult {
         @JsonProperty("importable")
         public boolean importable() {
             return Stream.of(missingItems, newItems, updateItems, reduceItems).flatMap(Collection::stream)
-                    .allMatch(BaseItem::isImportable) || isLoadTableAble();
+                    .allMatch(BaseItem::isImportable);
         }
 
         @JsonProperty("creatable")
         public boolean creatable() {
             return Stream.of(missingItems, newItems, updateItems, reduceItems).flatMap(Collection::stream)
-                    .allMatch(BaseItem::isCreatable) || isLoadTableAble();
+                    .allMatch(BaseItem::isCreatable);
         }
 
         @JsonProperty("")
@@ -92,13 +96,12 @@ public class SchemaChangeCheckResult {
                     .allMatch(BaseItem::isHasSameName);
         }
 
-        @Setter
-        @JsonIgnore
-        private boolean loadTableAble = false;
-
-        @Getter
-        @JsonIgnore
-        private Set<String> loadTables = Sets.newHashSet();
+        @JsonProperty("has_same_name_broken")
+        public boolean hasSameNameBroken() {
+            val set = Stream.of(missingItems, newItems, updateItems, reduceItems).flatMap(Collection::stream)
+                    .collect(Collectors.toSet());
+            return !set.isEmpty() && set.stream().allMatch(BaseItem::isHasSameNameBroken);
+        }
     }
 
     @Data
@@ -117,6 +120,9 @@ public class SchemaChangeCheckResult {
         @JsonProperty("has_same_name")
         boolean hasSameName;
 
+        @JsonProperty("has_same_name_broken")
+        boolean hasSameNameBroken;
+
         @JsonProperty("importable")
         boolean importable;
         @JsonProperty("creatable")
@@ -142,40 +148,45 @@ public class SchemaChangeCheckResult {
     @NoArgsConstructor
     @AllArgsConstructor
     public static class ChangedItem extends BaseItem {
-        @Getter(PRIVATE)
+
         private SchemaNode schemaNode;
 
         public ChangedItem(SchemaNodeType type, SchemaNode schemaNode, String modelAlias, UN_IMPORT_REASON reason,
-                String conflictItem, boolean hasSameName, boolean importable, boolean creatable, boolean overwritable) {
-            super(type, modelAlias, new ConflictReason(reason, conflictItem), hasSameName, importable, creatable,
-                    overwritable);
+                String conflictItem, BaseItemParameter parameter) {
+            super(type, modelAlias, new ConflictReason(reason, conflictItem), parameter.hasSameName,
+                    parameter.hasSameNameBroken, parameter.importable, parameter.creatable, parameter.overwritable);
             this.schemaNode = schemaNode;
         }
 
         public static ChangedItem createUnImportableSchemaNode(SchemaNodeType type, SchemaNode schemaNode,
-                UN_IMPORT_REASON reason, String conflictItem, boolean hasSameName) {
-            return new ChangedItem(type, schemaNode, null, reason, conflictItem, hasSameName, false, false, false);
+                UN_IMPORT_REASON reason, String conflictItem, boolean hasSameName, boolean hasSameNameBroken) {
+            return new ChangedItem(type, schemaNode, null, reason, conflictItem,
+                    new BaseItemParameter(hasSameName, hasSameNameBroken, false, false, false));
         }
 
         public static ChangedItem createUnImportableSchemaNode(SchemaNodeType type, SchemaNode schemaNode,
-                String modelAlias, UN_IMPORT_REASON reason, String conflictItem, boolean hasSameName) {
-            return new ChangedItem(type, schemaNode, modelAlias, reason, conflictItem, hasSameName, false, false,
-                    false);
+                String modelAlias, UN_IMPORT_REASON reason, String conflictItem, boolean hasSameName,
+                boolean hasSameNameBroken) {
+            return new ChangedItem(type, schemaNode, modelAlias, reason, conflictItem,
+                    new BaseItemParameter(hasSameName, hasSameNameBroken, false, false, false));
         }
 
         public static ChangedItem createOverwritableSchemaNode(SchemaNodeType type, SchemaNode schemaNode,
-                boolean hasSameName) {
-            return new ChangedItem(type, schemaNode, null, null, null, hasSameName, true, true, true);
+                boolean hasSameName, boolean hasSameNameBroken) {
+            return new ChangedItem(type, schemaNode, null, null, null,
+                    new BaseItemParameter(hasSameName, hasSameNameBroken, true, true, true));
         }
 
         public static ChangedItem createOverwritableSchemaNode(SchemaNodeType type, SchemaNode schemaNode,
-                String modelAlias, boolean hasSameName) {
-            return new ChangedItem(type, schemaNode, modelAlias, null, null, hasSameName, true, true, true);
+                String modelAlias, boolean hasSameName, boolean hasSameNameBroken) {
+            return new ChangedItem(type, schemaNode, modelAlias, null, null,
+                    new BaseItemParameter(hasSameName, hasSameNameBroken, true, true, true));
         }
 
         public static ChangedItem createCreatableSchemaNode(SchemaNodeType type, SchemaNode schemaNode,
-                boolean hasSameName) {
-            return new ChangedItem(type, schemaNode, null, null, null, hasSameName, true, true, false);
+                boolean hasSameName, boolean hasSameNameBroken) {
+            return new ChangedItem(type, schemaNode, null, null, null,
+                    new BaseItemParameter(hasSameName, hasSameNameBroken, true, true, false));
         }
 
         public String getModelAlias() {
@@ -219,26 +230,22 @@ public class SchemaChangeCheckResult {
             return getDetail(secondSchemaNode);
         }
 
-        public UpdatedItem(SchemaNode firstSchemaNode, SchemaNode secondSchemaNode, String modelAlias,
-                UN_IMPORT_REASON reason, String conflictItem, boolean hasSameName, boolean importable,
-                boolean creatable, boolean overwritable) {
-            super(secondSchemaNode.getType(), modelAlias, new ConflictReason(reason, conflictItem), hasSameName,
-                    importable, creatable, overwritable);
-            this.firstSchemaNode = firstSchemaNode;
-            this.secondSchemaNode = secondSchemaNode;
+        public UpdatedItem(SchemaNode first, SchemaNode second, String modelAlias, UN_IMPORT_REASON reason,
+                String conflictItem, BaseItemParameter parameter) {
+            super(second.getType(), modelAlias, new ConflictReason(reason, conflictItem), parameter.hasSameName,
+                    parameter.hasSameNameBroken, parameter.importable, parameter.creatable, parameter.overwritable);
+            this.firstSchemaNode = first;
+            this.secondSchemaNode = second;
         }
 
         public static UpdatedItem getSchemaUpdate(SchemaNode first, SchemaNode second, String modelAlias,
-                UN_IMPORT_REASON reason, String conflictItem, boolean hasSameName, boolean importable,
-                boolean creatable, boolean overwritable) {
-            return new UpdatedItem(first, second, modelAlias, reason, conflictItem, hasSameName, importable, creatable,
-                    overwritable);
+                UN_IMPORT_REASON reason, String conflictItem, BaseItemParameter parameter) {
+            return new UpdatedItem(first, second, modelAlias, reason, conflictItem, parameter);
         }
 
         public static UpdatedItem getSchemaUpdate(SchemaNode first, SchemaNode second, String modelAlias,
-                boolean hasSameName, boolean importable, boolean creatable, boolean overwritable) {
-            return getSchemaUpdate(first, second, modelAlias, UN_IMPORT_REASON.NONE, null, hasSameName, importable,
-                    creatable, overwritable);
+                BaseItemParameter parameter) {
+            return getSchemaUpdate(first, second, modelAlias, UN_IMPORT_REASON.NONE, null, parameter);
         }
     }
 
@@ -302,4 +309,14 @@ public class SchemaChangeCheckResult {
         MISSING_TABLE, //
         NONE;
     }
+
+    @Data
+    @AllArgsConstructor
+    public static class BaseItemParameter {
+        private boolean hasSameName;
+        private boolean hasSameNameBroken;
+        private boolean importable;
+        private boolean creatable;
+        private boolean overwritable;
+    }
 }
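
Note on hasSameNameBroken() in ModelSchemaChange above: the method checks !set.isEmpty() before calling allMatch. A likely reason is that Stream.allMatch returns true for an empty stream, so a model with no changed items would otherwise be reported as clashing with a broken model. The standalone sketch below shows the difference. The class and method names are hypothetical and use plain Boolean flags in place of BaseItem.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

class AllMatchGuardSketch {
    // Without the guard: vacuously true when there are no items.
    static boolean unguarded(List<Boolean> flags) {
        return flags.stream().allMatch(Boolean::booleanValue);
    }

    // With the guard, as in hasSameNameBroken(): an empty collection yields false.
    static boolean guarded(List<Boolean> flags) {
        return !flags.isEmpty() && flags.stream().allMatch(Boolean::booleanValue);
    }

    public static void main(String[] args) {
        List<Boolean> none = Collections.emptyList();
        List<Boolean> allBroken = Arrays.asList(true, true);
        List<Boolean> mixed = Arrays.asList(true, false);

        System.out.println("empty:  unguarded=" + unguarded(none) + " guarded=" + guarded(none));
        System.out.println("all:    unguarded=" + unguarded(allBroken) + " guarded=" + guarded(allBroken));
        System.out.println("mixed:  unguarded=" + unguarded(mixed) + " guarded=" + guarded(mixed));
    }
}
```

The two versions differ only on the empty input. For non-empty inputs they agree.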
diff --git a/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/schema/SchemaUtil.java b/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/schema/SchemaUtil.java
index e90867b965..24b2512a49 100644
--- a/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/schema/SchemaUtil.java
+++ b/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/schema/SchemaUtil.java
@@ -25,15 +25,15 @@ import java.util.function.Function;
 import java.util.stream.Collectors;
 
 import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.metadata.model.ColumnDesc;
-import org.apache.kylin.metadata.model.JoinTableDesc;
-import org.apache.kylin.metadata.model.TableDesc;
-import org.apache.kylin.metadata.model.TableRef;
 import org.apache.kylin.metadata.cube.model.IndexPlan;
 import org.apache.kylin.metadata.cube.model.NIndexPlanManager;
+import org.apache.kylin.metadata.model.ColumnDesc;
+import org.apache.kylin.metadata.model.JoinTableDesc;
 import org.apache.kylin.metadata.model.NDataModel;
 import org.apache.kylin.metadata.model.NDataModelManager;
 import org.apache.kylin.metadata.model.NTableMetadataManager;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.metadata.model.TableRef;
 
 import com.google.common.collect.Lists;
 
@@ -48,6 +48,22 @@ import lombok.val;
 
 public class SchemaUtil {
 
+    public static SchemaDifference diff(String project, KylinConfig sourceConfig, KylinConfig targetConfig,
+            List<TableDesc> incrTableDescList) {
+        val sourceGraph = dependencyGraph(project, sourceConfig, incrTableDescList);
+        val targetGraph = dependencyGraph(project, targetConfig);
+        return new SchemaDifference(sourceGraph, targetGraph);
+    }
+
+    public static Graph<SchemaNode> dependencyGraph(String project, KylinConfig config,
+            List<TableDesc> incrTableDescList) {
+        val tableManager = NTableMetadataManager.getInstance(config, project);
+        val planManager = NIndexPlanManager.getInstance(config, project);
+        List<TableDesc> tableDescs = Lists.newArrayList(tableManager.listAllTables());
+        tableDescs.addAll(incrTableDescList);
+        return dependencyGraph(tableDescs, planManager.listAllIndexPlans());
+    }
+
     public static SchemaDifference diff(String project, KylinConfig sourceConfig, KylinConfig targetConfig) {
         val sourceGraph = dependencyGraph(project, sourceConfig);
         val targetGraph = dependencyGraph(project, targetConfig);
diff --git a/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/schema/strategy/ComputedColumnStrategy.java b/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/schema/strategy/ComputedColumnStrategy.java
index 6b3af9e78a..8274da560b 100644
--- a/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/schema/strategy/ComputedColumnStrategy.java
+++ b/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/schema/strategy/ComputedColumnStrategy.java
@@ -48,7 +48,7 @@ public class ComputedColumnStrategy implements SchemaChangeStrategy {
     @Override
     public List<SchemaChangeCheckResult.ChangedItem> newItemFunction(SchemaUtil.SchemaDifference difference,
             Map.Entry<SchemaNode.SchemaNodeIdentifier, SchemaNode> entry, Set<String> importModels,
-            Set<String> originalModels) {
+            Set<String> originalModels, Set<String> originalBrokenModels) {
         List<SchemaNode> allComputedColumns = difference.getSourceGraph().nodes().stream()
                 .filter(schemaNode -> supportedSchemaNodeTypes().contains(schemaNode.getType()))
                 .collect(Collectors.toList());
@@ -59,7 +59,7 @@ public class ComputedColumnStrategy implements SchemaChangeStrategy {
         if (hasComputedColumnNameWithDifferentExpression(entry.getValue(), allComputedColumns)) {
             return Collections.singletonList(SchemaChangeCheckResult.ChangedItem.createUnImportableSchemaNode(
                     entry.getKey().getType(), entry.getValue(), SAME_CC_NAME_HAS_DIFFERENT_EXPR, null,
-                    hasSameName(modelAlias, originalModels)));
+                    hasSameName(modelAlias, originalModels), hasSameWithBroken(modelAlias, originalBrokenModels)));
         }
 
         // different cc name with same expression
@@ -67,21 +67,25 @@ public class ComputedColumnStrategy implements SchemaChangeStrategy {
         if (optional.isPresent()) {
             return Collections.singletonList(SchemaChangeCheckResult.ChangedItem.createUnImportableSchemaNode(
                     entry.getKey().getType(), entry.getValue(), DIFFERENT_CC_NAME_HAS_SAME_EXPR,
-                    optional.get().getDetail(), hasSameName(modelAlias, originalModels)));
+                    optional.get().getDetail(), hasSameName(modelAlias, originalModels),
+                    hasSameWithBroken(modelAlias, originalBrokenModels)));
         }
 
         if (overwritable(importModels, originalModels, modelAlias)) {
             return Collections.singletonList(SchemaChangeCheckResult.ChangedItem.createOverwritableSchemaNode(
-                    entry.getKey().getType(), entry.getValue(), hasSameName(modelAlias, originalModels)));
+                    entry.getKey().getType(), entry.getValue(), hasSameName(modelAlias, originalModels),
+                    hasSameWithBroken(modelAlias, originalBrokenModels)));
         } else {
             return Collections.singletonList(SchemaChangeCheckResult.ChangedItem.createCreatableSchemaNode(
-                    entry.getKey().getType(), entry.getValue(), hasSameName(modelAlias, originalModels)));
+                    entry.getKey().getType(), entry.getValue(), hasSameName(modelAlias, originalModels),
+                    hasSameWithBroken(modelAlias, originalBrokenModels)));
         }
     }
 
     @Override
     public List<SchemaChangeCheckResult.UpdatedItem> updateItemFunction(SchemaUtil.SchemaDifference difference,
-            MapDifference.ValueDifference<SchemaNode> diff, Set<String> importModels, Set<String> originalModels) {
+            MapDifference.ValueDifference<SchemaNode> diff, Set<String> importModels, Set<String> originalModels,
+            Set<String> originalBrokenModels) {
         List<SchemaNode> allComputedColumns = difference.getSourceGraph().nodes().stream()
                 .filter(schemaNode -> supportedSchemaNodeTypes().contains(schemaNode.getType()))
                 .collect(Collectors.toList());
@@ -90,36 +94,43 @@ public class ComputedColumnStrategy implements SchemaChangeStrategy {
         String modelAlias = diff.rightValue().getSubject();
         // same cc name with different expression
         if (hasComputedColumnNameWithDifferentExpression(schemaNode, allComputedColumns)) {
+            val parameter = new SchemaChangeCheckResult.BaseItemParameter(hasSameName(modelAlias, originalModels),
+                    hasSameWithBroken(modelAlias, originalBrokenModels), false, false, false);
             return Collections.singletonList(SchemaChangeCheckResult.UpdatedItem.getSchemaUpdate(diff.leftValue(),
-                    diff.rightValue(), modelAlias, SAME_CC_NAME_HAS_DIFFERENT_EXPR, null,
-                    hasSameName(modelAlias, originalModels), false, false, false));
+                    diff.rightValue(), modelAlias, SAME_CC_NAME_HAS_DIFFERENT_EXPR, null, parameter));
         }
 
         // different cc name with same expression
         val optional = hasExpressionWithDifferentComputedColumn(schemaNode, allComputedColumns);
         if (optional.isPresent()) {
+            val parameter = new SchemaChangeCheckResult.BaseItemParameter(hasSameName(modelAlias, originalModels),
+                    hasSameWithBroken(modelAlias, originalBrokenModels), false, false, false);
             return Collections.singletonList(SchemaChangeCheckResult.UpdatedItem.getSchemaUpdate(diff.leftValue(),
                     diff.rightValue(), modelAlias, DIFFERENT_CC_NAME_HAS_SAME_EXPR, optional.get().getDetail(),
-                    hasSameName(modelAlias, originalModels), false, false, false));
+                    parameter));
         }
 
         boolean overwritable = overwritable(importModels, originalModels, modelAlias);
+        val parameter = new SchemaChangeCheckResult.BaseItemParameter(hasSameName(modelAlias, originalModels),
+                hasSameWithBroken(modelAlias, originalBrokenModels), true, true, overwritable);
         return Collections.singletonList(SchemaChangeCheckResult.UpdatedItem.getSchemaUpdate(diff.leftValue(),
-                diff.rightValue(), modelAlias, hasSameName(modelAlias, originalModels), true, true, overwritable));
+                diff.rightValue(), modelAlias, parameter));
     }
 
     @Override
     public List<SchemaChangeCheckResult.ChangedItem> reduceItemFunction(SchemaUtil.SchemaDifference difference,
             Map.Entry<SchemaNode.SchemaNodeIdentifier, SchemaNode> entry, Set<String> importModels,
-            Set<String> originalModels) {
+            Set<String> originalModels, Set<String> originalBrokenModels) {
         String modelAlias = entry.getValue().getSubject();
         boolean overwritable = overwritable(importModels, originalModels, modelAlias);
         if (overwritable) {
             return Collections.singletonList(SchemaChangeCheckResult.ChangedItem.createOverwritableSchemaNode(
-                    entry.getKey().getType(), entry.getValue(), hasSameName(modelAlias, originalModels)));
+                    entry.getKey().getType(), entry.getValue(), hasSameName(modelAlias, originalModels),
+                    hasSameWithBroken(modelAlias, originalBrokenModels)));
         } else {
             return Collections.singletonList(SchemaChangeCheckResult.ChangedItem.createCreatableSchemaNode(
-                    entry.getKey().getType(), entry.getValue(), hasSameName(modelAlias, originalModels)));
+                    entry.getKey().getType(), entry.getValue(), hasSameName(modelAlias, originalModels),
+                    hasSameWithBroken(modelAlias, originalBrokenModels)));
         }
     }
 
diff --git a/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/schema/strategy/MultiplePartitionStrategy.java b/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/schema/strategy/MultiplePartitionStrategy.java
index 17c63fdec3..b53420bfd4 100644
--- a/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/schema/strategy/MultiplePartitionStrategy.java
+++ b/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/schema/strategy/MultiplePartitionStrategy.java
@@ -31,6 +31,7 @@ import org.apache.kylin.metadata.model.schema.SchemaUtil;
 import org.apache.kylin.metadata.model.util.MultiPartitionUtil;
 
 import io.kyligence.kap.guava20.shaded.common.collect.MapDifference;
+import lombok.val;
 
 public class MultiplePartitionStrategy extends UnOverWritableStrategy {
     @Override
@@ -40,7 +41,8 @@ public class MultiplePartitionStrategy extends UnOverWritableStrategy {
 
     @Override
     public List<SchemaChangeCheckResult.UpdatedItem> updateItemFunction(SchemaUtil.SchemaDifference difference,
-            MapDifference.ValueDifference<SchemaNode> diff, Set<String> importModels, Set<String> originalModels) {
+            MapDifference.ValueDifference<SchemaNode> diff, Set<String> importModels, Set<String> originalModels,
+            Set<String> originalBrokenModels) {
         String modelAlias = diff.rightValue().getSubject();
 
         boolean overwritable = overwritable(diff);
@@ -66,8 +68,10 @@ public class MultiplePartitionStrategy extends UnOverWritableStrategy {
             }
         }
 
+        val parameter = new SchemaChangeCheckResult.BaseItemParameter(hasSameName(modelAlias, originalModels),
+                hasSameWithBroken(modelAlias, originalBrokenModels), true, true, overwritable);
         return Collections.singletonList(SchemaChangeCheckResult.UpdatedItem.getSchemaUpdate(diff.leftValue(),
-                diff.rightValue(), modelAlias, hasSameName(modelAlias, originalModels), true, true, overwritable));
+                diff.rightValue(), modelAlias, parameter));
     }
 
     /**
diff --git a/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/schema/strategy/OverWritableStrategy.java b/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/schema/strategy/OverWritableStrategy.java
index 998b3c201a..6ccf37feda 100644
--- a/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/schema/strategy/OverWritableStrategy.java
+++ b/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/schema/strategy/OverWritableStrategy.java
@@ -40,27 +40,29 @@ public class OverWritableStrategy implements SchemaChangeStrategy {
     @Override
     public List<SchemaChangeCheckResult.ChangedItem> newItemFunction(SchemaUtil.SchemaDifference difference,
             Map.Entry<SchemaNode.SchemaNodeIdentifier, SchemaNode> entry, Set<String> importModels,
-            Set<String> originalModels) {
-        return createSchemaChange(difference, entry, importModels, originalModels);
+            Set<String> originalModels, Set<String> originalBrokenModels) {
+        return createSchemaChange(difference, entry, importModels, originalModels, originalBrokenModels);
     }
 
     @Override
     public List<SchemaChangeCheckResult.ChangedItem> reduceItemFunction(SchemaUtil.SchemaDifference difference,
             Map.Entry<SchemaNode.SchemaNodeIdentifier, SchemaNode> entry, Set<String> importModels,
-            Set<String> originalModels) {
-        return createSchemaChange(difference, entry, importModels, originalModels);
+            Set<String> originalModels, Set<String> originalBrokenModels) {
+        return createSchemaChange(difference, entry, importModels, originalModels, originalBrokenModels);
     }
 
     private List<SchemaChangeCheckResult.ChangedItem> createSchemaChange(SchemaUtil.SchemaDifference difference,
             Map.Entry<SchemaNode.SchemaNodeIdentifier, SchemaNode> entry, Set<String> importModels,
-            Set<String> originalModels) {
+            Set<String> originalModels, Set<String> originalBrokenModels) {
         String modelAlias = entry.getValue().getSubject();
         if (overwritable(importModels, originalModels, modelAlias)) {
             return Collections.singletonList(SchemaChangeCheckResult.ChangedItem.createOverwritableSchemaNode(
-                    entry.getKey().getType(), entry.getValue(), hasSameName(modelAlias, originalModels)));
+                    entry.getKey().getType(), entry.getValue(), hasSameName(modelAlias, originalModels),
+                    hasSameWithBroken(modelAlias, originalBrokenModels)));
         } else {
             return Collections.singletonList(SchemaChangeCheckResult.ChangedItem.createCreatableSchemaNode(
-                    entry.getKey().getType(), entry.getValue(), hasSameName(modelAlias, originalModels)));
+                    entry.getKey().getType(), entry.getValue(), hasSameName(modelAlias, originalModels),
+                    hasSameWithBroken(modelAlias, originalBrokenModels)));
         }
     }
 }
diff --git a/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/schema/strategy/SchemaChangeStrategy.java b/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/schema/strategy/SchemaChangeStrategy.java
index ce77f66bf1..30c182d440 100644
--- a/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/schema/strategy/SchemaChangeStrategy.java
+++ b/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/schema/strategy/SchemaChangeStrategy.java
@@ -33,64 +33,68 @@ import org.apache.kylin.metadata.model.schema.SchemaUtil;
 import io.kyligence.kap.guava20.shaded.common.collect.MapDifference;
 import io.kyligence.kap.guava20.shaded.common.graph.Graph;
 import io.kyligence.kap.guava20.shaded.common.graph.Graphs;
+import lombok.val;
 
 public interface SchemaChangeStrategy {
     List<SchemaNodeType> supportedSchemaNodeTypes();
 
     default List<SchemaChangeCheckResult.ChangedItem> missingItemFunction(SchemaUtil.SchemaDifference difference,
             Map.Entry<SchemaNode.SchemaNodeIdentifier, SchemaNode> entry, Set<String> importModels,
-            Set<String> originalModels) {
+            Set<String> originalModels, Set<String> originalBrokenModels) {
         return Collections.emptyList();
     }
 
     default List<SchemaChangeCheckResult.ChangedItem> missingItems(SchemaUtil.SchemaDifference difference,
-            Set<String> importModels, Set<String> originalModels) {
+            Set<String> importModels, Set<String> originalModels, Set<String> originalBrokenModels) {
         return Collections.emptyList();
     }
 
     default List<SchemaChangeCheckResult.ChangedItem> newItemFunction(SchemaUtil.SchemaDifference difference,
             Map.Entry<SchemaNode.SchemaNodeIdentifier, SchemaNode> entry, Set<String> importModels,
-            Set<String> originalModels) {
+            Set<String> originalModels, Set<String> originalBrokenModels) {
         return Collections.emptyList();
     }
 
     default List<SchemaChangeCheckResult.ChangedItem> newItems(SchemaUtil.SchemaDifference difference,
-            Set<String> importModels, Set<String> originalModels) {
+            Set<String> importModels, Set<String> originalModels, Set<String> originalBrokenModels) {
         return difference.getNodeDiff().entriesOnlyOnRight().entrySet().stream()
                 .filter(entry -> supportedSchemaNodeTypes().contains(entry.getKey().getType()))
-                .map(entry -> newItemFunction(difference, entry, importModels, originalModels))
+                .map(entry -> newItemFunction(difference, entry, importModels, originalModels, originalBrokenModels))
                 .flatMap(Collection::stream).filter(schemaChange -> importModels.contains(schemaChange.getModelAlias()))
                 .collect(Collectors.toList());
     }
 
     default List<SchemaChangeCheckResult.UpdatedItem> updateItemFunction(SchemaUtil.SchemaDifference difference,
-            MapDifference.ValueDifference<SchemaNode> diff, Set<String> importModels, Set<String> originalModels) {
+            MapDifference.ValueDifference<SchemaNode> diff, Set<String> importModels, Set<String> originalModels,
+            Set<String> originalBrokenModels) {
         String modelAlias = diff.rightValue().getSubject();
         boolean overwritable = overwritable(importModels, originalModels, modelAlias);
+        val parameter = new SchemaChangeCheckResult.BaseItemParameter(hasSameName(modelAlias, originalModels),
+                hasSameWithBroken(modelAlias, originalBrokenModels), true, true, overwritable);
         return Collections.singletonList(SchemaChangeCheckResult.UpdatedItem.getSchemaUpdate(diff.leftValue(),
-                diff.rightValue(), modelAlias, hasSameName(modelAlias, originalModels), true, true, overwritable));
+                diff.rightValue(), modelAlias, parameter));
     }
 
     default List<SchemaChangeCheckResult.UpdatedItem> updateItems(SchemaUtil.SchemaDifference difference,
-            Set<String> importModels, Set<String> originalModels) {
+            Set<String> importModels, Set<String> originalModels, Set<String> originalBrokenModels) {
         return difference.getNodeDiff().entriesDiffering().values().stream()
                 .filter(entry -> supportedSchemaNodeTypes().contains(entry.leftValue().getType()))
-                .map(diff -> updateItemFunction(difference, diff, importModels, originalModels))
+                .map(diff -> updateItemFunction(difference, diff, importModels, originalModels, originalBrokenModels))
                 .flatMap(Collection::stream).filter(schemaChange -> importModels.contains(schemaChange.getModelAlias()))
                 .collect(Collectors.toList());
     }
 
     default List<SchemaChangeCheckResult.ChangedItem> reduceItemFunction(SchemaUtil.SchemaDifference difference,
             Map.Entry<SchemaNode.SchemaNodeIdentifier, SchemaNode> entry, Set<String> importModels,
-            Set<String> originalModels) {
+            Set<String> originalModels, Set<String> originalBrokenModels) {
         return Collections.emptyList();
     }
 
     default List<SchemaChangeCheckResult.ChangedItem> reduceItems(SchemaUtil.SchemaDifference difference,
-            Set<String> importModels, Set<String> originalModels) {
+            Set<String> importModels, Set<String> originalModels, Set<String> originalBrokenModels) {
         return difference.getNodeDiff().entriesOnlyOnLeft().entrySet().stream()
                 .filter(entry -> supportedSchemaNodeTypes().contains(entry.getKey().getType()))
-                .map(entry -> reduceItemFunction(difference, entry, importModels, originalModels))
+                .map(entry -> reduceItemFunction(difference, entry, importModels, originalModels, originalBrokenModels))
                 .flatMap(Collection::stream).filter(schemaChange -> importModels.contains(schemaChange.getModelAlias()))
                 .collect(Collectors.toList());
     }
@@ -115,4 +119,8 @@ public interface SchemaChangeStrategy {
                 .map(SchemaNode::getSubject).collect(Collectors.toSet());
     }
 
+    default boolean hasSameWithBroken(String modelAlias, Set<String> originalBrokenModels) {
+        return originalBrokenModels.contains(modelAlias);
+    }
+
 }
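
Note on the SchemaChangeStrategy.java change above: the hunks add a hasSameWithBroken default method and replace the trailing boolean arguments of getSchemaUpdate with one BaseItemParameter object. The sketch below is a simplified, standalone illustration of that pattern and is not the Kylin code. ItemFlagsSketch, StrategySketch and BrokenModelFlagDemo are hypothetical names, and the field names importable and creatable are guesses at what the third and fourth constructor arguments mean, since this diff does not show them. The hasSameName body is likewise assumed to be a plain set lookup.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-in for the parameter object introduced by the diff.
// Grouping the flags keeps call sites short and makes it harder to swap
// two adjacent booleans by accident.
final class ItemFlagsSketch {
    final boolean hasSameName;
    final boolean hasSameWithBroken;
    final boolean importable;   // assumed meaning, not shown in the diff
    final boolean creatable;    // assumed meaning, not shown in the diff
    final boolean overwritable;

    ItemFlagsSketch(boolean hasSameName, boolean hasSameWithBroken, boolean importable, boolean creatable,
            boolean overwritable) {
        this.hasSameName = hasSameName;
        this.hasSameWithBroken = hasSameWithBroken;
        this.importable = importable;
        this.creatable = creatable;
        this.overwritable = overwritable;
    }
}

// Hypothetical stand-in for the strategy interface. Only hasSameWithBroken
// mirrors a method body that appears in the diff.
interface StrategySketch {
    default boolean hasSameName(String modelAlias, Set<String> originalModels) {
        return originalModels.contains(modelAlias); // assumed behaviour
    }

    default boolean hasSameWithBroken(String modelAlias, Set<String> originalBrokenModels) {
        return originalBrokenModels.contains(modelAlias);
    }
}

final class BrokenModelFlagDemo implements StrategySketch {
    // Builds the flag object the way the default updateItemFunction does in
    // the diff: both name checks first, then true, true, overwritable.
    static ItemFlagsSketch flagsFor(String modelAlias, Set<String> originalModels, Set<String> originalBrokenModels,
            boolean overwritable) {
        StrategySketch strategy = new BrokenModelFlagDemo();
        return new ItemFlagsSketch(strategy.hasSameName(modelAlias, originalModels),
                strategy.hasSameWithBroken(modelAlias, originalBrokenModels), true, true, overwritable);
    }

    public static void main(String[] args) {
        Set<String> originals = new HashSet<>(Arrays.asList("model_a", "model_b"));
        Set<String> broken = new HashSet<>(Arrays.asList("model_b"));
        ItemFlagsSketch flags = flagsFor("model_b", originals, broken, true);
        System.out.println("hasSameName=" + flags.hasSameName + ", hasSameWithBroken=" + flags.hasSameWithBroken);
    }
}
```

In this sketch, adding another flag later would only touch the flag class and the places that build it, not every method signature along the way.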
diff --git a/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/schema/strategy/TableColumnStrategy.java b/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/schema/strategy/TableColumnStrategy.java
index 2157d95a88..752f57dfed 100644
--- a/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/schema/strategy/TableColumnStrategy.java
+++ b/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/schema/strategy/TableColumnStrategy.java
@@ -35,6 +35,7 @@ import org.apache.kylin.metadata.model.schema.SchemaUtil;
 
 import io.kyligence.kap.guava20.shaded.common.collect.MapDifference;
 import io.kyligence.kap.guava20.shaded.common.graph.Graphs;
+import lombok.val;
 
 public class TableColumnStrategy implements SchemaChangeStrategy {
     @Override
@@ -44,10 +45,10 @@ public class TableColumnStrategy implements SchemaChangeStrategy {
 
     @Override
     public List<SchemaChangeCheckResult.ChangedItem> missingItems(SchemaUtil.SchemaDifference difference,
-            Set<String> importModels, Set<String> originalModels) {
+            Set<String> importModels, Set<String> originalModels, Set<String> originalBrokenModels) {
         return difference.getNodeDiff().entriesOnlyOnRight().entrySet().stream()
                 .filter(pair -> supportedSchemaNodeTypes().contains(pair.getKey().getType()))
-                .map(pair -> missingItemFunction(difference, pair, importModels, originalModels))
+                .map(pair -> missingItemFunction(difference, pair, importModels, originalModels, originalBrokenModels))
                 .flatMap(Collection::stream).filter(schemaChange -> importModels.contains(schemaChange.getModelAlias()))
                 .collect(Collectors.toList());
     }
@@ -55,34 +56,40 @@ public class TableColumnStrategy implements SchemaChangeStrategy {
     @Override
     public List<SchemaChangeCheckResult.ChangedItem> missingItemFunction(SchemaUtil.SchemaDifference difference,
             Map.Entry<SchemaNode.SchemaNodeIdentifier, SchemaNode> entry, Set<String> importModels,
-            Set<String> originalModels) {
+            Set<String> originalModels, Set<String> originalBrokenModels) {
         return reachableModel(difference.getTargetGraph(), entry.getValue()).stream()
                 .map(modelAlias -> SchemaChangeCheckResult.ChangedItem.createUnImportableSchemaNode(
                         entry.getKey().getType(), entry.getValue(), modelAlias, USED_UNLOADED_TABLE,
-                        entry.getValue().getDetail(), hasSameName(modelAlias, originalModels)))
+                        entry.getValue().getDetail(), hasSameName(modelAlias, originalModels),
+                        hasSameWithBroken(modelAlias, originalBrokenModels)))
                 .collect(Collectors.toList());
     }
 
     @Override
     public List<SchemaChangeCheckResult.UpdatedItem> updateItemFunction(SchemaUtil.SchemaDifference difference,
-            MapDifference.ValueDifference<SchemaNode> diff, Set<String> importModels, Set<String> originalModels) {
+            MapDifference.ValueDifference<SchemaNode> diff, Set<String> importModels, Set<String> originalModels,
+            Set<String> originalBrokenModels) {
         return Graphs.reachableNodes(difference.getTargetGraph(), diff.rightValue()).stream()
                 .filter(SchemaNode::isModelNode).map(SchemaNode::getSubject).distinct()
-                .map(modelAlias -> SchemaChangeCheckResult.UpdatedItem.getSchemaUpdate(diff.leftValue(),
-                        diff.rightValue(), modelAlias, TABLE_COLUMN_DATATYPE_CHANGED, diff.rightValue().getDetail(),
-                        hasSameName(modelAlias, originalModels), false, false, false))
+                .map(modelAlias -> {
+                    val parameter = new SchemaChangeCheckResult.BaseItemParameter(
+                            hasSameName(modelAlias, originalModels),
+                            hasSameWithBroken(modelAlias, originalBrokenModels), false, false, false);
+                    return SchemaChangeCheckResult.UpdatedItem.getSchemaUpdate(diff.leftValue(), diff.rightValue(),
+                            modelAlias, TABLE_COLUMN_DATATYPE_CHANGED, diff.rightValue().getDetail(), parameter);
+                })
                 .collect(Collectors.toList());
     }
 
     @Override
     public List<SchemaChangeCheckResult.ChangedItem> reduceItemFunction(SchemaUtil.SchemaDifference difference,
             Map.Entry<SchemaNode.SchemaNodeIdentifier, SchemaNode> entry, Set<String> importModels,
-            Set<String> originalModels) {
+            Set<String> originalModels, Set<String> originalBrokenModels) {
         return Graphs.reachableNodes(difference.getSourceGraph(), entry.getValue()).stream()
                 .filter(SchemaNode::isModelNode).map(SchemaNode::getSubject).distinct()
                 .map(modelAlias -> SchemaChangeCheckResult.ChangedItem.createOverwritableSchemaNode(
                         entry.getKey().getType(), entry.getValue(), modelAlias,
-                        hasSameName(modelAlias, originalModels)))
+                        hasSameName(modelAlias, originalModels), hasSameWithBroken(modelAlias, originalBrokenModels)))
                 .collect(Collectors.toList());
     }
 }
diff --git a/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/schema/strategy/TableStrategy.java b/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/schema/strategy/TableStrategy.java
index b727aa8faa..55746d6fcf 100644
--- a/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/schema/strategy/TableStrategy.java
+++ b/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/schema/strategy/TableStrategy.java
@@ -40,10 +40,10 @@ public class TableStrategy implements SchemaChangeStrategy {
 
     @Override
     public List<SchemaChangeCheckResult.ChangedItem> missingItems(SchemaUtil.SchemaDifference difference,
-            Set<String> importModels, Set<String> originalModels) {
+            Set<String> importModels, Set<String> originalModels, Set<String> originalBrokenModels) {
         return difference.getNodeDiff().entriesOnlyOnRight().entrySet().stream()
                 .filter(pair -> supportedSchemaNodeTypes().contains(pair.getKey().getType()))
-                .map(pair -> missingItemFunction(difference, pair, importModels, originalModels))
+                .map(pair -> missingItemFunction(difference, pair, importModels, originalModels, originalBrokenModels))
                 .flatMap(Collection::stream).filter(schemaChange -> importModels.contains(schemaChange.getModelAlias()))
                 .collect(Collectors.toList());
     }
@@ -51,11 +51,12 @@ public class TableStrategy implements SchemaChangeStrategy {
     @Override
     public List<SchemaChangeCheckResult.ChangedItem> missingItemFunction(SchemaUtil.SchemaDifference difference,
             Map.Entry<SchemaNode.SchemaNodeIdentifier, SchemaNode> entry, Set<String> importModels,
-            Set<String> originalModels) {
+            Set<String> originalModels, Set<String> originalBrokenModels) {
         return reachableModel(difference.getTargetGraph(), entry.getValue()).stream()
                 .map(modelAlias -> SchemaChangeCheckResult.ChangedItem.createUnImportableSchemaNode(
                         entry.getKey().getType(), entry.getValue(), modelAlias, MISSING_TABLE,
-                        entry.getValue().getDetail(), hasSameName(modelAlias, originalModels)))
+                        entry.getValue().getDetail(), hasSameName(modelAlias, originalModels),
+                        hasSameWithBroken(modelAlias, originalBrokenModels)))
                 .collect(Collectors.toList());
     }
 }
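
Note on the missingItems pipeline in TableStrategy.java and TableColumnStrategy.java above: each entry is filtered by supported node type, expanded into one item per reachable model, flattened, and then restricted to the models being imported. The sketch below reproduces that stream shape with simplified types. NodeSketch, ItemSketch and MissingItemsPipelineSketch are hypothetical names, and the precomputed reachableModels list stands in for the graph lookup done by reachableModel(...) in the real code.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical, simplified schema node: a type, a name, and the model
// aliases that can reach it.
final class NodeSketch {
    final String type;
    final String name;
    final List<String> reachableModels;

    NodeSketch(String type, String name, List<String> reachableModels) {
        this.type = type;
        this.name = name;
        this.reachableModels = reachableModels;
    }
}

// Hypothetical result item: which model is affected and by which node.
final class ItemSketch {
    final String modelAlias;
    final String conflictItem;

    ItemSketch(String modelAlias, String conflictItem) {
        this.modelAlias = modelAlias;
        this.conflictItem = conflictItem;
    }
}

final class MissingItemsPipelineSketch {
    static List<ItemSketch> missingItems(List<NodeSketch> onlyOnRight, Set<String> supportedTypes,
            Set<String> importModels) {
        return onlyOnRight.stream()
                // keep only node types this strategy handles
                .filter(node -> supportedTypes.contains(node.type))
                // one item per model that can reach the node
                .map(node -> node.reachableModels.stream()
                        .map(alias -> new ItemSketch(alias, node.name))
                        .collect(Collectors.toList()))
                .flatMap(Collection::stream)
                // drop items for models that are not part of this import
                .filter(item -> importModels.contains(item.modelAlias))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<NodeSketch> nodes = Arrays.asList(
                new NodeSketch("TABLE", "SSB.CUSTOMER_NEW", Arrays.asList("model_a", "model_b")),
                new NodeSketch("COLUMN", "SSB.CUSTOMER_NEW.C_NAME", Arrays.asList("model_a")));
        Set<String> supported = new HashSet<>(Arrays.asList("TABLE"));
        Set<String> importing = new HashSet<>(Arrays.asList("model_a"));
        for (ItemSketch item : missingItems(nodes, supported, importing)) {
            System.out.println(item.modelAlias + " -> " + item.conflictItem);
        }
    }
}
```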
diff --git a/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/schema/strategy/UnOverWritableStrategy.java b/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/schema/strategy/UnOverWritableStrategy.java
index 20f85b7b24..f405c855e7 100644
--- a/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/schema/strategy/UnOverWritableStrategy.java
+++ b/src/core-metadata/src/main/java/org/apache/kylin/metadata/model/schema/strategy/UnOverWritableStrategy.java
@@ -30,6 +30,7 @@ import org.apache.kylin.metadata.model.schema.SchemaNodeType;
 import org.apache.kylin.metadata.model.schema.SchemaUtil;
 
 import io.kyligence.kap.guava20.shaded.common.collect.MapDifference;
+import lombok.val;
 
 public class UnOverWritableStrategy implements SchemaChangeStrategy {
 
@@ -42,26 +43,31 @@ public class UnOverWritableStrategy implements SchemaChangeStrategy {
     @Override
     public List<SchemaChangeCheckResult.ChangedItem> newItemFunction(SchemaUtil.SchemaDifference difference,
             Map.Entry<SchemaNode.SchemaNodeIdentifier, SchemaNode> entry, Set<String> importModels,
-            Set<String> originalModels) {
+            Set<String> originalModels, Set<String> originalBrokenModels) {
         String modelAlias = entry.getValue().getSubject();
         return Collections.singletonList(SchemaChangeCheckResult.ChangedItem.createCreatableSchemaNode(
-                entry.getKey().getType(), entry.getValue(), hasSameName(modelAlias, originalModels)));
+                entry.getKey().getType(), entry.getValue(), hasSameName(modelAlias, originalModels),
+                hasSameWithBroken(modelAlias, originalBrokenModels)));
     }
 
     @Override
     public List<SchemaChangeCheckResult.UpdatedItem> updateItemFunction(SchemaUtil.SchemaDifference difference,
-            MapDifference.ValueDifference<SchemaNode> diff, Set<String> importModels, Set<String> originalModels) {
+            MapDifference.ValueDifference<SchemaNode> diff, Set<String> importModels, Set<String> originalModels,
+            Set<String> originalBrokenModels) {
         String modelAlias = diff.rightValue().getSubject();
+        val parameter = new SchemaChangeCheckResult.BaseItemParameter(hasSameName(modelAlias, originalModels),
+                hasSameWithBroken(modelAlias, originalBrokenModels), true, true, false);
         return Collections.singletonList(SchemaChangeCheckResult.UpdatedItem.getSchemaUpdate(diff.leftValue(),
-                diff.rightValue(), modelAlias, hasSameName(modelAlias, originalModels), true, true, false));
+                diff.rightValue(), modelAlias, parameter));
     }
 
     @Override
     public List<SchemaChangeCheckResult.ChangedItem> reduceItemFunction(SchemaUtil.SchemaDifference difference,
             Map.Entry<SchemaNode.SchemaNodeIdentifier, SchemaNode> entry, Set<String> importModels,
-            Set<String> originalModels) {
+            Set<String> originalModels, Set<String> originalBrokenModels) {
         String modelAlias = entry.getValue().getSubject();
         return Collections.singletonList(SchemaChangeCheckResult.ChangedItem.createCreatableSchemaNode(
-                entry.getKey().getType(), entry.getValue(), hasSameName(modelAlias, originalModels)));
+                entry.getKey().getType(), entry.getValue(), hasSameName(modelAlias, originalModels),
+                hasSameWithBroken(modelAlias, originalBrokenModels)));
     }
 }
diff --git a/src/core-metadata/src/test/java/org/apache/kylin/metadata/model/TableDescTest.java b/src/core-metadata/src/test/java/org/apache/kylin/metadata/model/TableDescTest.java
index 7eff5cffa9..66e6036843 100644
--- a/src/core-metadata/src/test/java/org/apache/kylin/metadata/model/TableDescTest.java
+++ b/src/core-metadata/src/test/java/org/apache/kylin/metadata/model/TableDescTest.java
@@ -21,7 +21,6 @@ package org.apache.kylin.metadata.model;
 import static org.apache.kylin.metadata.model.NTableMetadataManager.getInstance;
 
 import java.util.Locale;
-import java.util.Set;
 
 import org.apache.kylin.common.util.NLocalFileMetadataTestCase;
 import org.junit.After;
@@ -29,10 +28,6 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
-import com.google.common.collect.Sets;
-
-import lombok.val;
-
 public class TableDescTest extends NLocalFileMetadataTestCase {
     private final String project = "default";
     private NTableMetadataManager tableMetadataManager;
@@ -72,54 +67,4 @@ public class TableDescTest extends NLocalFileMetadataTestCase {
         final TableDesc tableDesc = tableMetadataManager.getTableDesc(tableName);
         Assert.assertFalse(tableDesc.isRangePartition());
     }
-
-    @Test
-    public void testFindColumns() {
-        final String tableName = "DEFAULT.TEST_KYLIN_FACT";
-        final TableDesc tableDesc = tableMetadataManager.getTableDesc(tableName);
-        ColumnDesc[] columns = tableDesc.getColumns();
-        Assert.assertEquals(12, columns.length);
-
-        {
-            // test search column empty
-            Set<ColumnDesc> searchColSet = Sets.newHashSet();
-            val pair = tableDesc.findColumns(searchColSet);
-            Assert.assertTrue(pair.getFirst().isEmpty());
-            Assert.assertTrue(pair.getSecond().isEmpty());
-        }
-
-        {
-            // test all founded
-            Set<ColumnDesc> searchColSet = Sets.newHashSet(
-                    new ColumnDesc("1", "TRANS_ID", "bigint", "TRANS_ID", "", "", ""),
-                    new ColumnDesc("2", "ORDER_ID", "bigint", "TRANS_ID", "", "", ""));
-            val pair = tableDesc.findColumns(searchColSet);
-            Assert.assertFalse(pair.getFirst().isEmpty());
-            Assert.assertTrue(pair.getSecond().isEmpty());
-            Assert.assertEquals(2, pair.getFirst().size());
-        }
-
-        {
-            // test part founded
-            Set<ColumnDesc> searchColSet = Sets.newHashSet(
-                    new ColumnDesc("1", "TRANS_ID_1", "bigint", "TRANS_ID", "", "", ""),
-                    new ColumnDesc("2", "ORDER_ID", "bigint", "TRANS_ID", "", "", ""));
-            val pair = tableDesc.findColumns(searchColSet);
-            Assert.assertFalse(pair.getFirst().isEmpty());
-            Assert.assertFalse(pair.getSecond().isEmpty());
-            Assert.assertEquals(1, pair.getFirst().size());
-            Assert.assertEquals(1, pair.getSecond().size());
-        }
-
-        {
-            // test part founded
-            Set<ColumnDesc> searchColSet = Sets.newHashSet(
-                    new ColumnDesc("1", "TRANS_ID_1", "bigint", "TRANS_ID", "", "", ""),
-                    new ColumnDesc("2", "ORDER_ID_1", "bigint", "TRANS_ID", "", "", ""));
-            val pair = tableDesc.findColumns(searchColSet);
-            Assert.assertTrue(pair.getFirst().isEmpty());
-            Assert.assertFalse(pair.getSecond().isEmpty());
-            Assert.assertEquals(2, pair.getSecond().size());
-        }
-    }
 }
diff --git a/src/core-metadata/src/test/java/org/apache/kylin/metadata/model/schema/SchemaUtilTest.java b/src/core-metadata/src/test/java/org/apache/kylin/metadata/model/schema/SchemaUtilTest.java
index 4b43a9e538..61a605201f 100644
--- a/src/core-metadata/src/test/java/org/apache/kylin/metadata/model/schema/SchemaUtilTest.java
+++ b/src/core-metadata/src/test/java/org/apache/kylin/metadata/model/schema/SchemaUtilTest.java
@@ -42,6 +42,8 @@ import org.apache.kylin.common.persistence.RawResource;
 import org.apache.kylin.common.persistence.ResourceStore;
 import org.apache.kylin.common.util.NLocalFileMetadataTestCase;
 import org.apache.kylin.metadata.model.ModelJoinRelationTypeEnum;
+import org.apache.kylin.metadata.model.NTableMetadataManager;
+import org.apache.kylin.metadata.model.TableDesc;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -49,6 +51,7 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
 
+import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
 import io.kyligence.kap.guava20.shaded.common.collect.Sets;
@@ -555,6 +558,17 @@ public class SchemaUtilTest extends NLocalFileMetadataTestCase {
                         && schemaChange.getConflictReason()
                                 .getReason() == SchemaChangeCheckResult.UN_IMPORT_REASON.MISSING_TABLE
                         && schemaChange.getConflictReason().getConflictItem().equals("SSB.CUSTOMER_NEW")));
+
+        NTableMetadataManager manager = NTableMetadataManager.getInstance(getTestConfig(), getTargetProject());
+        TableDesc tableDesc = manager.getTableDesc("SSB.CUSTOMER");
+        tableDesc.setName("CUSTOMER_NEW");
+        tableDesc.init(getTargetProject());
+        val diff = SchemaUtil.diff(getTargetProject(), KylinConfig.getInstanceFromEnv(),
+                importModelContext.getTargetKylinConfig(), Lists.newArrayList(tableDesc));
+        val checkResult = ModelImportChecker.check(diff, importModelContext);
+        Assert.assertFalse(checkResult.getModels().isEmpty());
+        val change = checkResult.getModels().get(getTargetModel());
+        Assert.assertTrue(change.getMissingItems().isEmpty());
     }
 
     @Test
diff --git a/src/examples/test_case_data/localmeta/data/tableDesc/SSB.CUSTOMER_NEW.json b/src/examples/test_case_data/localmeta/data/tableDesc/SSB.CUSTOMER_NEW.json
index 88ecdea540..076c6e26ee 100644
--- a/src/examples/test_case_data/localmeta/data/tableDesc/SSB.CUSTOMER_NEW.json
+++ b/src/examples/test_case_data/localmeta/data/tableDesc/SSB.CUSTOMER_NEW.json
@@ -9,31 +9,31 @@
   }, {
     "id" : "2",
     "name" : "C_NAME",
-    "datatype" : "varchar(25)"
+    "datatype": "varchar(4096)"
   }, {
     "id" : "3",
     "name" : "C_ADDRESS",
-    "datatype" : "varchar(40)"
+    "datatype": "varchar(4096)"
   }, {
     "id" : "4",
     "name" : "C_CITY",
-    "datatype" : "varchar(10)"
+    "datatype": "varchar(4096)"
   }, {
     "id" : "5",
     "name" : "C_NATION",
-    "datatype" : "varchar(15)"
+    "datatype": "varchar(4096)"
   }, {
     "id" : "6",
     "name" : "C_REGION",
-    "datatype" : "varchar(12)"
+    "datatype": "varchar(4096)"
   }, {
     "id" : "7",
     "name" : "C_PHONE",
-    "datatype" : "varchar(15)"
+    "datatype": "varchar(4096)"
   }, {
     "id" : "8",
     "name" : "C_MKTSEGMENT",
-    "datatype" : "varchar(10)"
+    "datatype": "varchar(4096)"
   } ],
   "database" : "SSB",
   "last_modified" : 1457444146362,
diff --git a/src/modeling-service/src/main/java/org/apache/kylin/rest/service/MetaStoreService.java b/src/modeling-service/src/main/java/org/apache/kylin/rest/service/MetaStoreService.java
index 21097bcd85..32d3f93901 100644
--- a/src/modeling-service/src/main/java/org/apache/kylin/rest/service/MetaStoreService.java
+++ b/src/modeling-service/src/main/java/org/apache/kylin/rest/service/MetaStoreService.java
@@ -28,10 +28,8 @@ import static org.apache.kylin.common.exception.code.ErrorCodeServer.MODEL_NAME_
 import static org.apache.kylin.common.persistence.ResourceStore.METASTORE_UUID_TAG;
 import static org.apache.kylin.common.persistence.ResourceStore.VERSION_FILE;
 import static org.apache.kylin.metadata.model.schema.ImportModelContext.MODEL_REC_PATH;
-import static org.apache.kylin.metadata.model.schema.SchemaChangeCheckResult.UN_IMPORT_REASON.DIFFERENT_CC_NAME_HAS_SAME_EXPR;
-import static org.apache.kylin.metadata.model.schema.SchemaChangeCheckResult.UN_IMPORT_REASON.SAME_CC_NAME_HAS_DIFFERENT_EXPR;
-import static org.apache.kylin.metadata.model.schema.SchemaChangeCheckResult.UN_IMPORT_REASON.TABLE_COLUMN_DATATYPE_CHANGED;
-import static org.apache.kylin.metadata.model.schema.SchemaNodeType.MODEL_TABLE;
+import static org.apache.kylin.metadata.model.schema.SchemaNodeType.MODEL_DIM;
+import static org.apache.kylin.metadata.model.schema.SchemaNodeType.MODEL_FACT;
 
 import java.io.ByteArrayOutputStream;
 import java.io.File;
@@ -39,8 +37,8 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.nio.charset.Charset;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.Comparator;
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -72,13 +70,13 @@ import org.apache.kylin.common.persistence.metadata.MetadataStore;
 import org.apache.kylin.common.persistence.transaction.UnitOfWork;
 import org.apache.kylin.common.util.JsonUtil;
 import org.apache.kylin.common.util.MetadataChecker;
+import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.common.util.RandomUtil;
 import org.apache.kylin.metadata.cube.model.IndexEntity;
 import org.apache.kylin.metadata.cube.model.IndexPlan;
 import org.apache.kylin.metadata.cube.model.NDataflowManager;
 import org.apache.kylin.metadata.cube.model.NIndexPlanManager;
 import org.apache.kylin.metadata.cube.model.RuleBasedIndex;
-import org.apache.kylin.metadata.model.ColumnDesc;
 import org.apache.kylin.metadata.model.JoinTableDesc;
 import org.apache.kylin.metadata.model.MultiPartitionDesc;
 import org.apache.kylin.metadata.model.NDataModel;
@@ -99,6 +97,7 @@ import org.apache.kylin.metadata.realization.RealizationStatusEnum;
 import org.apache.kylin.rest.aspect.Transaction;
 import org.apache.kylin.rest.constant.ModelStatusToDisplayEnum;
 import org.apache.kylin.rest.request.ModelImportRequest;
+import org.apache.kylin.rest.request.ModelImportRequest.ImportType;
 import org.apache.kylin.rest.request.UpdateRuleBasedCuboidRequest;
 import org.apache.kylin.rest.response.LoadTableResponse;
 import org.apache.kylin.rest.response.ModelPreviewResponse;
@@ -137,9 +136,6 @@ public class MetaStoreService extends BasicService {
     private static final Pattern MD5_PATTERN = Pattern.compile(".*([a-fA-F\\d]{32})\\.zip");
     private static final String RULE_SCHEDULER_DATA_KEY = "kylin.index.rule-scheduler-data";
 
-    private static final Set<SchemaChangeCheckResult.UN_IMPORT_REASON> UN_IMPORT_REASONS = Sets.newHashSet(
-            SAME_CC_NAME_HAS_DIFFERENT_EXPR, DIFFERENT_CC_NAME_HAS_SAME_EXPR, TABLE_COLUMN_DATATYPE_CHANGED);
-
     @Autowired
     public AclEvaluate aclEvaluate;
 
@@ -378,12 +374,12 @@ public class MetaStoreService extends BasicService {
 
         if (request != null) {
             val newModels = request.getModels().stream()
-                    .filter(modelImport -> modelImport.getImportType() == ModelImportRequest.ImportType.NEW)
+                    .filter(modelImport -> modelImport.getImportType() == ImportType.NEW)
                     .collect(Collectors.toMap(ModelImportRequest.ModelImport::getOriginalName,
                             ModelImportRequest.ModelImport::getTargetName));
 
             val unImportModels = request.getModels().stream()
-                    .filter(modelImport -> modelImport.getImportType() == ModelImportRequest.ImportType.UN_IMPORT)
+                    .filter(modelImport -> modelImport.getImportType() == ImportType.UN_IMPORT)
                     .map(ModelImportRequest.ModelImport::getOriginalName).collect(Collectors.toList());
 
             return new ImportModelContext(targetProject, srcProject, rawResourceMap, newModels, unImportModels);
@@ -418,83 +414,44 @@ public class MetaStoreService extends BasicService {
 
     public SchemaChangeCheckResult checkModelMetadata(String targetProject, ImportModelContext context,
             MultipartFile uploadFile) throws IOException {
-        Map<String, RawResource> rawResourceMap = getRawResourceFromUploadFile(uploadFile);
 
-        checkModelMetadataFile(ResourceStore.getKylinMetaStore(context.getTargetKylinConfig()).getMetadataStore(),
+        KylinConfig targetKylinConfig = context.getTargetKylinConfig();
+        Map<String, RawResource> rawResourceMap = getRawResourceFromUploadFile(uploadFile);
+        checkModelMetadataFile(ResourceStore.getKylinMetaStore(targetKylinConfig).getMetadataStore(),
                 rawResourceMap.keySet());
 
-        SchemaUtil.SchemaDifference difference = SchemaUtil.diff(targetProject, KylinConfig.getInstanceFromEnv(),
-                context.getTargetKylinConfig());
-
-        SchemaChangeCheckResult checkResult = ModelImportChecker.check(difference, context);
-
-        Set<String> loadAbleTables = getLoadAbleTables(targetProject, context.getTargetMissTableList());
-        if (CollectionUtils.isEmpty(loadAbleTables)) {
-            return checkResult;
-        }
-        // mark every model loadTableAble
-        return checkTableLoadAble(loadAbleTables, checkResult);
-    }
-
-    public SchemaChangeCheckResult checkTableLoadAble(Set<String> loadAbleTables, SchemaChangeCheckResult checkResult) {
-        checkResult.getModels().forEach((modelName, change) -> {
-            if (change.creatable() || change.importable() || change.overwritable()) {
-                return;
-            }
-            // Verify that tables used by the model can be fully loaded
-            Set<String> missedTableSet = change.getMissingItems().stream()//
-                    .filter(item -> item.getType().equals(MODEL_TABLE))
-                    .map(SchemaChangeCheckResult.ChangedItem::getDetail).collect(Collectors.toSet());
-            if (missedTableSet.isEmpty() || !loadAbleTables.containsAll(missedTableSet)) {
-                return;
-            }
-            // Verify that model has no conflicts
-            List<SchemaChangeCheckResult.BaseItem> items = Lists.newArrayList();
-            items.addAll(change.getNewItems());
-            items.addAll(change.getUpdateItems());
-            boolean hasConflict = items.stream().anyMatch(item -> {
-                val reason = item.getConflictReason().getReason();
-                return UN_IMPORT_REASONS.contains(reason);
-            });
-            if (hasConflict) {
-                return;
-            }
-            change.setLoadTableAble(true);
-            change.getLoadTables().addAll(missedTableSet);
-        });
+        // check missing table exists in datasource
+        List<TableDesc> existTableList = searchTablesInDataSource(targetProject, context.getTargetMissTableList());
+        // diff (local metadata + searched tables) and import metadata
+        val diff = SchemaUtil.diff(targetProject, KylinConfig.getInstanceFromEnv(), targetKylinConfig, existTableList);
+        SchemaChangeCheckResult checkResult = ModelImportChecker.check(diff, context);
+        checkResult.getExistTableList().addAll(existTableList);
         return checkResult;
     }
 
-    public Set<String> getLoadAbleTables(String targetProject, List<TableDesc> missTableList) {
+    public List<TableDesc> searchTablesInDataSource(String targetProject, List<TableDesc> missTableList) {
         if (CollectionUtils.isEmpty(missTableList)) {
-            return Sets.newHashSet();
+            return Collections.emptyList();
         }
         ProjectInstance projectInstance = NProjectManager.getInstance(KylinConfig.getInstanceFromEnv())
                 .getProject(targetProject);
         ISourceMetadataExplorer explorer = SourceFactory.getSource(projectInstance).getSourceMetadataExplorer();
-        Set<String> loadAbleList = Sets.newHashSet();
+
+        List<TableDesc> existTableSet = Lists.newArrayList();
         for (TableDesc missTableDesc : missTableList) {
             try {
-                // get new table desc from datasource
+                // check datasource exist table
+                // no need to check column
                 TableDesc newTableDesc = explorer
                         .loadTableMetadata(missTableDesc.getDatabase(), missTableDesc.getName(), targetProject)
                         .getFirst();
-                // check column all exists
-                Set<ColumnDesc> columnDescList = Arrays.stream(missTableDesc.getColumns())
-                        .filter(col -> !col.isComputedColumn()).collect(Collectors.toSet());
-                Set<ColumnDesc> notExistColSet = newTableDesc.findColumns(columnDescList).getSecond();
-                if (CollectionUtils.isNotEmpty(notExistColSet)) {
-                    // some column not exist in new table desc, mark table cannot load
-                    String missCols = notExistColSet.stream().map(ColumnDesc::getName).collect(Collectors.joining(","));
-                    logger.warn("Can not find columns [{}] in table [{}]", missCols, newTableDesc.getIdentity());
-                    continue;
-                }
-                loadAbleList.add(newTableDesc.getIdentity());
+                newTableDesc.init(targetProject);
+                existTableSet.add(newTableDesc);
             } catch (Exception e) {
                 logger.warn("try load table: {} failed.", missTableDesc.getIdentity(), e);
             }
         }
-        return loadAbleList;
+        return existTableSet;
     }
 
     private void checkModelMetadataFile(MetadataStore metadataStore, Set<String> rawResourceList) {
@@ -654,24 +611,55 @@ public class MetaStoreService extends BasicService {
         }
     }
 
-    public LoadTableResponse innerLoadTables(String project, Set<SchemaChangeCheckResult.ModelSchemaChange> changes)
-            throws Exception {
-        Set<String> loadTables = Sets.newHashSet();
-        changes.forEach(change -> loadTables.addAll(change.getLoadTables()));
-        return tableExtService.loadDbTables(loadTables.toArray(new String[0]), project, false);
+    public LoadTableResponse innerLoadTables(String project, Set<String> needLoadTables) throws Exception {
+        return tableExtService.loadDbTables(needLoadTables.toArray(new String[0]), project, false);
+    }
+
+    public Pair<Set<String>, Map<String, Set<String>>> checkNewModelTables(SchemaChangeCheckResult checkResult,
+            ModelImportRequest request) {
+        List<String> existTableList = checkResult.getExistTableList().stream().map(TableDesc::getIdentity)
+                .collect(Collectors.toList());
+        List<String> newImportModelList = request.getModels().stream()
+                .filter(modelRequest -> modelRequest.getImportType() == ImportType.NEW)
+                .map(ModelImportRequest.ModelImport::getTargetName).collect(Collectors.toList());
+        // all tables need to be loaded
+        Set<String> needLoadTableSet = Sets.newHashSet();
+        // every model need to be loaded tables
+        Map<String, Set<String>> modelTablesMap = Maps.newHashMap();
+
+        checkResult.getModels().forEach((modelName, change) -> {
+            if (!newImportModelList.contains(modelName) || !change.creatable()) {
+                return;
+            }
+            Set<String> modelTables = Sets.newHashSet();
+            change.getNewItems().stream()//
+                    .filter(item -> item.getSchemaNode().getType().equals(MODEL_DIM)
+                            || item.getSchemaNode().getType().equals(MODEL_FACT))
+                    .map(SchemaChangeCheckResult.ChangedItem::getDetail).filter(existTableList::contains)
+                    .forEach(table -> {
+                        needLoadTableSet.add(table);
+                        modelTables.add(table);
+                    });
+            modelTablesMap.put(modelName, modelTables);
+        });
+        return Pair.newPair(needLoadTableSet, modelTablesMap);
     }
 
     private void innerImportModelMetadata(String project, MultipartFile metadataFile, ModelImportRequest request,
             ImportModelContext context, List<Exception> exceptions) throws Exception {
         val schemaChangeCheckResult = checkModelMetadata(project, context, metadataFile);
 
-        val schemaChanges = schemaChangeCheckResult.getModels().entrySet().stream()//
-                .filter(entry -> context.getNewModels().containsValue(entry.getKey())).map(Map.Entry::getValue)
-                .collect(Collectors.toSet());
-        boolean needLoadTable = schemaChanges.stream().anyMatch(change -> !change.getLoadTables().isEmpty());
+        val pair = checkNewModelTables(schemaChangeCheckResult, request);
+        Set<String> needLoadTableSet = pair.getFirst();
+        Map<String, Set<String>> modelTablesMap = pair.getSecond();
+
         LoadTableResponse loadTableResponse = null;
+        boolean needLoadTable = CollectionUtils.isNotEmpty(needLoadTableSet);
         if (needLoadTable) {
-            loadTableResponse = innerLoadTables(project, schemaChanges);
+            // try load tables
+            String needLoadTableStr = String.join(",", needLoadTableSet);
+            logger.info("try load tables: [{}]", needLoadTableStr);
+            loadTableResponse = innerLoadTables(project, needLoadTableSet);
             if (CollectionUtils.isNotEmpty(loadTableResponse.getFailed())) {
                 String loadFailedTables = String.join(",", loadTableResponse.getFailed());
                 logger.warn("Load Table failed: [{}]", loadFailedTables);
@@ -685,10 +673,10 @@ public class MetaStoreService extends BasicService {
         for (ModelImportRequest.ModelImport modelImport : request.getModels()) {
             try {
                 validateModelImport(project, modelImport, schemaChangeCheckResult);
-                if (modelImport.getImportType() == ModelImportRequest.ImportType.NEW) {
-                    val modelSchemaChange = schemaChangeCheckResult.getModels().get(modelImport.getTargetName());
-                    if (needLoadTable && modelSchemaChange.isLoadTableAble()) {
-                        Set<String> needLoadTables = modelSchemaChange.getLoadTables();
+                if (modelImport.getImportType() == ImportType.NEW) {
+                    if (needLoadTable) {
+                        Set<String> needLoadTables = modelTablesMap.getOrDefault(modelImport.getTargetName(),
+                                Collections.emptySet());
                         if (!loadTableResponse.getLoaded().containsAll(needLoadTables)) {
                             logger.warn("Import model [{}] failed, skip import.", modelImport.getOriginalName());
                             continue;
@@ -698,7 +686,7 @@ public class MetaStoreService extends BasicService {
                     var nDataModel = importDataModelManager.copyForWrite(importDataModel);
                     createNewModel(nDataModel, modelImport, project, importIndexPlanManager);
                     importRecommendations(project, nDataModel.getUuid(), importDataModel.getUuid(), targetKylinConfig);
-                } else if (modelImport.getImportType() == ModelImportRequest.ImportType.OVERWRITE) {
+                } else if (modelImport.getImportType() == ImportType.OVERWRITE) {
                     val importDataModel = importDataModelManager.getDataModelDescByAlias(modelImport.getOriginalName());
                     val nDataModel = importDataModelManager.copyForWrite(importDataModel);
 
@@ -733,7 +721,7 @@ public class MetaStoreService extends BasicService {
 
         Message msg = MsgPicker.getMsg();
 
-        if (modelImport.getImportType() == ModelImportRequest.ImportType.OVERWRITE) {
+        if (modelImport.getImportType() == ImportType.OVERWRITE) {
             NDataModel dataModel = NDataModelManager.getInstance(KylinConfig.getInstanceFromEnv(), project)
                     .getDataModelDescByAlias(modelImport.getOriginalName());
 
@@ -753,7 +741,7 @@ public class MetaStoreService extends BasicService {
                         String.format(Locale.ROOT, msg.getUnSuitableImportType(createType), modelImport.getImportType(),
                                 modelImport.getOriginalName()));
             }
-        } else if (modelImport.getImportType() == ModelImportRequest.ImportType.NEW) {
+        } else if (modelImport.getImportType() == ImportType.NEW) {
 
             if (!org.apache.commons.lang.StringUtils.containsOnly(modelImport.getTargetName(),
                     ModelService.VALID_NAME_FOR_MODEL)) {
diff --git a/src/modeling-service/src/test/java/org/apache/kylin/rest/service/MetaStoreServiceTest.java b/src/modeling-service/src/test/java/org/apache/kylin/rest/service/MetaStoreServiceTest.java
index dbbedc48bc..948ee05789 100644
--- a/src/modeling-service/src/test/java/org/apache/kylin/rest/service/MetaStoreServiceTest.java
+++ b/src/modeling-service/src/test/java/org/apache/kylin/rest/service/MetaStoreServiceTest.java
@@ -20,8 +20,6 @@ package org.apache.kylin.rest.service;
 
 import static org.apache.kylin.common.constant.Constants.KE_VERSION;
 import static org.apache.kylin.common.exception.code.ErrorCodeServer.MODEL_ID_NOT_EXIST;
-import static org.apache.kylin.metadata.model.schema.SchemaChangeCheckResult.UN_IMPORT_REASON.SAME_CC_NAME_HAS_DIFFERENT_EXPR;
-import static org.apache.kylin.metadata.model.schema.SchemaNodeType.MODEL_TABLE;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
@@ -39,7 +37,6 @@ import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.Random;
-import java.util.Set;
 import java.util.stream.Collectors;
 import java.util.zip.ZipEntry;
 import java.util.zip.ZipInputStream;
@@ -71,6 +68,7 @@ import org.apache.kylin.metadata.model.NDataModel;
 import org.apache.kylin.metadata.model.NDataModelManager;
 import org.apache.kylin.metadata.model.NTableMetadataManager;
 import org.apache.kylin.metadata.model.PartitionDesc;
+import org.apache.kylin.metadata.model.TableDesc;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.metadata.model.schema.SchemaChangeCheckResult;
 import org.apache.kylin.metadata.model.schema.SchemaNodeType;
@@ -105,7 +103,6 @@ import com.fasterxml.jackson.databind.node.ArrayNode;
 import com.google.common.base.Objects;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
 
 import io.kyligence.kap.guava20.shaded.common.io.ByteSource;
 import org.apache.kylin.metadata.recommendation.candidate.JdbcRawRecStore;
@@ -755,17 +752,16 @@ public class MetaStoreServiceTest extends ServiceTestBase {
                 "src/test/resources/ut_model_metadata/metastore_model_metadata_c4a20039c16dfbb5dcc5610c5052d7b3.zip");
         val multipartFile = new MockMultipartFile(file.getName(), file.getName(), null,
                 Files.newInputStream(file.toPath()));
-        val metadataCheckResponse = metaStoreService.checkModelMetadata("original_project", multipartFile, null);
+        val checkResult = metaStoreService.checkModelMetadata("original_project", multipartFile, null);
+        Assert.assertEquals(1, checkResult.getExistTableList().size());
+        Assert.assertEquals("SSB.CUSTOMER_NEW", checkResult.getExistTableList().get(0).getIdentity());
 
-        SchemaChangeCheckResult.ModelSchemaChange modelSchemaChange = metadataCheckResponse.getModels()
+        SchemaChangeCheckResult.ModelSchemaChange change = checkResult.getModels()
                 .get("missing_table_model");
-        Assert.assertNotNull(modelSchemaChange);
-
-        Assert.assertEquals(11, modelSchemaChange.getDifferences());
-        Assert.assertTrue(
-                modelSchemaChange.getMissingItems().stream().anyMatch(sc -> sc.getType() == SchemaNodeType.MODEL_TABLE
-                        && sc.getDetail().equals("SSB.CUSTOMER_NEW") && !sc.isImportable()));
-        Assert.assertTrue(modelSchemaChange.importable());
+        Assert.assertNotNull(change);
+        Assert.assertTrue(change.getMissingItems().isEmpty());
+        Assert.assertTrue(change.importable());
+        Assert.assertTrue(change.creatable());
     }
 
     @Test
@@ -1244,10 +1240,6 @@ public class MetaStoreServiceTest extends ServiceTestBase {
         Assert.assertNull(manager.getTableDesc("SSB.CUSTOMER_NEW"));
         metaStoreService.importModelMetadata("original_project", multipartFile, request);
         Assert.assertNotNull(manager.getTableDesc("SSB.CUSTOMER_NEW"));
-
-        {
-
-        }
     }
 
     @Test
@@ -1389,135 +1381,46 @@ public class MetaStoreServiceTest extends ServiceTestBase {
 
     @Test
     public void testMissTable() throws IOException {
+        String table = "SSB.CUSTOMER_NEW";
         val file = new File(
                 "src/test/resources/ut_meta/schema_utils/model_missing_table_update/model_table_missing_update_model_metadata_2020_11_16_02_37_33_3182D4A7694DA64E3D725C140CF80A47.zip");
         val multipartFile = new MockMultipartFile(file.getName(), file.getName(), null,
                 Files.newInputStream(file.toPath()));
-        val metadataCheckResponse = metaStoreService.checkModelMetadata("original_project", multipartFile, null);
-
-        val modelSchemaChange = metadataCheckResponse.getModels().get("ssb_model");
+        val checkResult = metaStoreService.checkModelMetadata("original_project", multipartFile, null);
+        val modelSchemaChange = checkResult.getModels().get("ssb_model");
+        Assert.assertEquals(1, checkResult.getExistTableList().size());
+        Assert.assertEquals(table, checkResult.getExistTableList().get(0).getIdentity());
         Assert.assertNotNull(modelSchemaChange);
-        Assert.assertTrue(modelSchemaChange.isLoadTableAble());
-        Set<String> loadTables = modelSchemaChange.getLoadTables();
-        Assert.assertEquals(1, loadTables.size());
-        Assert.assertEquals("SSB.CUSTOMER_NEW", loadTables.iterator().next());
         Assert.assertTrue(modelSchemaChange.creatable());
         Assert.assertTrue(modelSchemaChange.importable());
         Assert.assertFalse(modelSchemaChange.overwritable());
-
-        testModelImportable(metadataCheckResponse);
-
-        {
-            val mockChange = Mockito.spy(modelSchemaChange);
-            mockChange.setLoadTableAble(false);
-            mockChange.setLoadTables(Sets.newHashSet());
-            Mockito.doReturn(false).when(mockChange).overwritable();
-            Mockito.doReturn(false).when(mockChange).creatable();
-            Mockito.doReturn(false).when(mockChange).importable();
-            metadataCheckResponse.getModels().put("ssb_model", mockChange);
-            metaStoreService.checkTableLoadAble(Sets.newHashSet("SSB.CUSTOMER_NEW"), metadataCheckResponse);
-            val change = metadataCheckResponse.getModels().get("ssb_model");
-            Assert.assertTrue(change.isLoadTableAble());
-            Assert.assertFalse(change.getLoadTables().isEmpty());
-        }
-
-        {
-            val mockChange = Mockito.spy(modelSchemaChange);
-            mockChange.setLoadTableAble(false);
-            mockChange.setLoadTables(Sets.newHashSet());
-            metadataCheckResponse.getModels().put("ssb_model", mockChange);
-            metaStoreService.checkTableLoadAble(Sets.newHashSet("SSB.CUSTOMER_NEWNEW"), metadataCheckResponse);
-            val change = metadataCheckResponse.getModels().get("ssb_model");
-            Assert.assertFalse(change.isLoadTableAble());
-            Assert.assertTrue(change.getLoadTables().isEmpty());
-        }
-
-        {
-            val mockChange = Mockito.spy(modelSchemaChange);
-            mockChange.setLoadTableAble(false);
-            mockChange.setLoadTables(Sets.newHashSet());
-            val missItems = mockChange.getMissingItems().stream().filter(item -> item.getType() != MODEL_TABLE)
-                    .collect(Collectors.toList());
-            ReflectionTestUtils.setField(mockChange, "missingItems", missItems);
-            metadataCheckResponse.getModels().put("ssb_model", mockChange);
-            metaStoreService.checkTableLoadAble(Sets.newHashSet("SSB.CUSTOMER_NEW"), metadataCheckResponse);
-            val change = metadataCheckResponse.getModels().get("ssb_model");
-            Assert.assertFalse(change.isLoadTableAble());
-            Assert.assertTrue(change.getLoadTables().isEmpty());
-        }
-
-        {
-            val mockChange = Mockito.spy(modelSchemaChange);
-            mockChange.setLoadTableAble(false);
-            mockChange.setLoadTables(Sets.newHashSet());
-            val newItems = mockChange.getNewItems().stream()
-                    .peek(item -> item.getConflictReason().setReason(SAME_CC_NAME_HAS_DIFFERENT_EXPR))
-                    .collect(Collectors.toList());
-            ReflectionTestUtils.setField(mockChange, "newItems", newItems);
-            metadataCheckResponse.getModels().put("ssb_model", mockChange);
-            metaStoreService.checkTableLoadAble(Sets.newHashSet("SSB.CUSTOMER_NEW"), metadataCheckResponse);
-            val change = metadataCheckResponse.getModels().get("ssb_model");
-            Assert.assertFalse(change.isLoadTableAble());
-            Assert.assertTrue(change.getLoadTables().isEmpty());
-        }
     }
 
-    private void testModelImportable(SchemaChangeCheckResult metadataCheckResponse) {
-        val modelSchemaChange = metadataCheckResponse.getModels().get("ssb_model");
-        {
-            val mockChange = Mockito.spy(modelSchemaChange);
-            mockChange.setLoadTableAble(false);
-            mockChange.setLoadTables(Sets.newHashSet());
-            Mockito.doReturn(true).when(mockChange).overwritable();
-            Mockito.doReturn(true).when(mockChange).creatable();
-            Mockito.doReturn(true).when(mockChange).importable();
-            metadataCheckResponse.getModels().put("ssb_model", mockChange);
-            metaStoreService.checkTableLoadAble(Sets.newHashSet("SSB.CUSTOMER_NEW"), metadataCheckResponse);
-            val change = metadataCheckResponse.getModels().get("ssb_model");
-            Assert.assertFalse(change.isLoadTableAble());
-            Assert.assertTrue(change.getLoadTables().isEmpty());
-        }
-
+    @Test
+    public void testSearchTablesInDataSource() {
         {
-            val mockChange = Mockito.spy(modelSchemaChange);
-            mockChange.setLoadTableAble(false);
-            mockChange.setLoadTables(Sets.newHashSet());
-            Mockito.doReturn(true).when(mockChange).overwritable();
-            Mockito.doReturn(false).when(mockChange).creatable();
-            Mockito.doReturn(false).when(mockChange).importable();
-            metadataCheckResponse.getModels().put("ssb_model", mockChange);
-            metaStoreService.checkTableLoadAble(Sets.newHashSet("SSB.CUSTOMER_NEW"), metadataCheckResponse);
-            val change = metadataCheckResponse.getModels().get("ssb_model");
-            Assert.assertFalse(change.isLoadTableAble());
-            Assert.assertTrue(change.getLoadTables().isEmpty());
+            val existTables = metaStoreService.searchTablesInDataSource("original_project", Lists.newArrayList());
+            Assert.assertTrue(existTables.isEmpty());
         }
 
         {
-            val mockChange = Mockito.spy(modelSchemaChange);
-            mockChange.setLoadTableAble(false);
-            mockChange.setLoadTables(Sets.newHashSet());
-            Mockito.doReturn(false).when(mockChange).overwritable();
-            Mockito.doReturn(true).when(mockChange).creatable();
-            Mockito.doReturn(false).when(mockChange).importable();
-            metadataCheckResponse.getModels().put("ssb_model", mockChange);
-            metaStoreService.checkTableLoadAble(Sets.newHashSet("SSB.CUSTOMER_NEW"), metadataCheckResponse);
-            val change = metadataCheckResponse.getModels().get("ssb_model");
-            Assert.assertFalse(change.isLoadTableAble());
-            Assert.assertTrue(change.getLoadTables().isEmpty());
+            TableDesc tableDesc = new TableDesc();
+            tableDesc.setDatabase("SSB");
+            tableDesc.setName("CUSTOMER_NEW");
+            val existTables = metaStoreService.searchTablesInDataSource("original_project",
+                    Lists.newArrayList(tableDesc));
+            Assert.assertFalse(existTables.isEmpty());
+            Assert.assertEquals(1, existTables.size());
+            Assert.assertEquals("SSB.CUSTOMER_NEW", existTables.get(0).getIdentity());
         }
 
         {
-            val mockChange = Mockito.spy(modelSchemaChange);
-            mockChange.setLoadTableAble(false);
-            mockChange.setLoadTables(Sets.newHashSet());
-            Mockito.doReturn(false).when(mockChange).overwritable();
-            Mockito.doReturn(false).when(mockChange).creatable();
-            Mockito.doReturn(true).when(mockChange).importable();
-            metadataCheckResponse.getModels().put("ssb_model", mockChange);
-            metaStoreService.checkTableLoadAble(Sets.newHashSet("SSB.CUSTOMER_NEW"), metadataCheckResponse);
-            val change = metadataCheckResponse.getModels().get("ssb_model");
-            Assert.assertFalse(change.isLoadTableAble());
-            Assert.assertTrue(change.getLoadTables().isEmpty());
+            TableDesc tableDesc = new TableDesc();
+            tableDesc.setDatabase("SSB");
+            tableDesc.setName("CUSTOMER_NEW_NEW");
+            val existTables = metaStoreService.searchTablesInDataSource("original_project",
+                    Lists.newArrayList(tableDesc));
+            Assert.assertTrue(existTables.isEmpty());
         }
     }
 


[kylin] 03/12: KYLIN-5344 Fix epoch update when epoch checker disabled

Posted by xx...@apache.org.

xxyu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit dea257ad3b1140d8f9b1b4c706739428ff6fae60
Author: Jiale He <ji...@kyligence.io>
AuthorDate: Thu Nov 3 20:14:04 2022 +0800

    KYLIN-5344 Fix epoch update when epoch checker disabled
---
 .../org/apache/kylin/metadata/epoch/EpochOrchestrator.java  |  8 +++++---
 .../apache/kylin/metadata/epoch/EpochOrchestratorTest.java  | 13 +++++++++++--
 2 files changed, 16 insertions(+), 5 deletions(-)

diff --git a/src/core-metadata/src/main/java/org/apache/kylin/metadata/epoch/EpochOrchestrator.java b/src/core-metadata/src/main/java/org/apache/kylin/metadata/epoch/EpochOrchestrator.java
index 26c36824ae..631e2f5b33 100644
--- a/src/core-metadata/src/main/java/org/apache/kylin/metadata/epoch/EpochOrchestrator.java
+++ b/src/core-metadata/src/main/java/org/apache/kylin/metadata/epoch/EpochOrchestrator.java
@@ -70,13 +70,15 @@ public class EpochOrchestrator {
 
     private void startEpochChecker(KylinConfig kylinConfig) {
         // first renew and update epoch at org.apache.kylin.rest.discovery.KylinServiceDiscoveryCache#createServiceCache
+        long pollSecond = kylinConfig.getEpochCheckerIntervalSecond();
+        logger.info("Try to update/renew epoch every {} seconds", pollSecond);
         if (!kylinConfig.getEpochCheckerEnabled()) {
             // this logic can be used when there is only one All or Job KE node
-            logger.info("Disable epoch timing renew and update, renew and update epoch only once");
+            logger.info("Disable epoch timing renew, renew epoch only once");
+            checkerPool = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("EpochChecker"));
+            checkerPool.scheduleWithFixedDelay(new EpochChecker(), 1, pollSecond, TimeUnit.SECONDS);
             return;
         }
-        long pollSecond = kylinConfig.getEpochCheckerIntervalSecond();
-        logger.info("Try to update epoch every {} seconds", pollSecond);
         logger.info("Renew executor work size is :{}", kylinConfig.getRenewEpochWorkerPoolSize());
         checkerPool = Executors.newScheduledThreadPool(2, new NamedThreadFactory("EpochChecker"));
         checkerPool.scheduleWithFixedDelay(new EpochChecker(), 1, pollSecond, TimeUnit.SECONDS);
diff --git a/src/core-metadata/src/test/java/org/apache/kylin/metadata/epoch/EpochOrchestratorTest.java b/src/core-metadata/src/test/java/org/apache/kylin/metadata/epoch/EpochOrchestratorTest.java
index af47f373e5..dd510cb84c 100644
--- a/src/core-metadata/src/test/java/org/apache/kylin/metadata/epoch/EpochOrchestratorTest.java
+++ b/src/core-metadata/src/test/java/org/apache/kylin/metadata/epoch/EpochOrchestratorTest.java
@@ -62,7 +62,16 @@ class EpochOrchestratorTest {
 
         val epochOrchestrator = new EpochOrchestrator(config);
         val obj = ReflectionTestUtils.getField(epochOrchestrator, "checkerPool");
-        Assertions.assertTrue(Objects.isNull(obj));
-    }
+        Assertions.assertTrue(Objects.nonNull(obj));
+        Assertions.assertTrue(obj instanceof ScheduledExecutorService);
 
+        val pool = (ScheduledExecutorService) obj;
+        Object obj2 = ReflectionTestUtils.getField(pool, "e");
+        Assertions.assertNotNull(obj2);
+        Assertions.assertTrue(obj2 instanceof ScheduledExecutorService);
+        ScheduledExecutorService executors = (ScheduledExecutorService) obj2;
+        Object obj3 = ReflectionTestUtils.getField(executors, "corePoolSize");
+        Assertions.assertNotNull(obj3);
+        Assertions.assertEquals(1, ((Integer) obj3).intValue());
+    }
 }
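
Note on the change above: with the epoch checker disabled, the commit still schedules an EpochChecker, but on a single-threaded scheduler instead of the two-thread pool. The sketch below is JDK-only and is not Kylin code; it only illustrates how a task submitted through Executors.newSingleThreadScheduledExecutor with scheduleWithFixedDelay keeps firing on one thread. The class name EpochRenewSketch, the method renewsRepeatedly, and the no-op task standing in for the checker are all hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the epoch checker scheduling; not Kylin code.
class EpochRenewSketch {

    // Schedules a trivial "renew" task on a single-threaded scheduler and reports
    // whether it fired at least expectedRuns times before the timeout elapsed.
    static boolean renewsRepeatedly(int expectedRuns, long delayMillis, long timeoutMillis) {
        CountDownLatch latch = new CountDownLatch(expectedRuns);
        ScheduledExecutorService pool = Executors.newSingleThreadScheduledExecutor();
        try {
            // Fixed delay: each run starts delayMillis after the previous run finished.
            pool.scheduleWithFixedDelay(latch::countDown, 0, delayMillis, TimeUnit.MILLISECONDS);
            return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            // Restore the interrupt flag and report that the runs were not observed.
            Thread.currentThread().interrupt();
            return false;
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(renewsRepeatedly(3, 10, 2000));
    }
}
```

Because the delay is measured from the end of one run to the start of the next, a slow run pushes later runs back rather than letting them overlap, which is presumably why one worker thread is enough in the disabled-checker case.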


[kylin] 10/12: KYLIN-5356 Backend configuration of users supports the project administrator role

Posted by xx...@apache.org.

xxyu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit be5e5fa209fb26f8f5747ca3469403a46c8c6a13
Author: Liang.Hua <36...@users.noreply.github.com>
AuthorDate: Mon Oct 31 15:50:44 2022 +0800

    KYLIN-5356 Backend configuration of users supports the project administrator role
---
 .../rest/response/OpenAccessGroupResponse.java     |  40 +++++++
 .../rest/response/OpenAccessUserResponse.java      |  40 +++++++
 .../rest/controller/v2/NAccessControllerV2.java    | 124 ++++++++++++++++++--
 .../rest/controller/NAccessControllerV2Test.java   | 125 ++++++++++++++++++++-
 4 files changed, 316 insertions(+), 13 deletions(-)

diff --git a/src/common-service/src/main/java/org/apache/kylin/rest/response/OpenAccessGroupResponse.java b/src/common-service/src/main/java/org/apache/kylin/rest/response/OpenAccessGroupResponse.java
new file mode 100644
index 0000000000..96042b432f
--- /dev/null
+++ b/src/common-service/src/main/java/org/apache/kylin/rest/response/OpenAccessGroupResponse.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.rest.response;
+
+import java.util.List;
+
+import org.apache.kylin.common.util.Pair;
+import org.codehaus.jackson.annotate.JsonProperty;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+@Data
+@AllArgsConstructor
+@NoArgsConstructor
+public class OpenAccessGroupResponse {
+
+    @JsonProperty("groups")
+    private List<Pair<String, Integer>> groups;
+
+    @JsonProperty("size")
+    private int size;
+}
diff --git a/src/common-service/src/main/java/org/apache/kylin/rest/response/OpenAccessUserResponse.java b/src/common-service/src/main/java/org/apache/kylin/rest/response/OpenAccessUserResponse.java
new file mode 100644
index 0000000000..c7f046bc57
--- /dev/null
+++ b/src/common-service/src/main/java/org/apache/kylin/rest/response/OpenAccessUserResponse.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.rest.response;
+
+import java.util.List;
+
+import org.codehaus.jackson.annotate.JsonProperty;
+
+import org.apache.kylin.metadata.user.ManagedUser;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+@Data
+@AllArgsConstructor
+@NoArgsConstructor
+public class OpenAccessUserResponse {
+
+    @JsonProperty("users")
+    private List<ManagedUser> users;
+
+    @JsonProperty("size")
+    private int size;
+}
diff --git a/src/metadata-server/src/main/java/org/apache/kylin/rest/controller/v2/NAccessControllerV2.java b/src/metadata-server/src/main/java/org/apache/kylin/rest/controller/v2/NAccessControllerV2.java
index 2b9f9d2f4b..e4465dd69f 100644
--- a/src/metadata-server/src/main/java/org/apache/kylin/rest/controller/v2/NAccessControllerV2.java
+++ b/src/metadata-server/src/main/java/org/apache/kylin/rest/controller/v2/NAccessControllerV2.java
@@ -17,8 +17,8 @@
  */
 package org.apache.kylin.rest.controller.v2;
 
-import static org.apache.kylin.common.exception.ServerErrorCode.USER_NOT_EXIST;
 import static org.apache.kylin.common.constant.HttpConstant.HTTP_VND_APACHE_KYLIN_V2_JSON;
+import static org.apache.kylin.common.exception.ServerErrorCode.USER_NOT_EXIST;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -26,22 +26,35 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
+import java.util.Set;
 import java.util.stream.Collectors;
 
+import org.apache.commons.lang.StringUtils;
+import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.exception.KylinException;
 import org.apache.kylin.common.persistence.AclEntity;
+import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.metadata.project.NProjectManager;
+import org.apache.kylin.metadata.project.ProjectInstance;
 import org.apache.kylin.rest.constant.Constant;
+import org.apache.kylin.rest.controller.NBasicController;
 import org.apache.kylin.rest.response.AccessEntryResponse;
 import org.apache.kylin.rest.response.EnvelopeResponse;
+import org.apache.kylin.rest.response.OpenAccessGroupResponse;
+import org.apache.kylin.rest.response.OpenAccessUserResponse;
+import org.apache.kylin.rest.security.AclEntityType;
 import org.apache.kylin.rest.service.AccessService;
+import org.apache.kylin.rest.service.AclTCRService;
+import org.apache.kylin.rest.service.IUserGroupService;
 import org.apache.kylin.rest.service.UserService;
+import org.apache.kylin.rest.util.AclEvaluate;
 import org.apache.kylin.rest.util.PagingUtil;
-import org.apache.kylin.rest.controller.NBasicController;
-import org.apache.kylin.rest.service.AclTCRService;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Qualifier;
 import org.springframework.security.access.prepost.PreAuthorize;
+import org.springframework.security.acls.domain.GrantedAuthoritySid;
+import org.springframework.security.acls.domain.PrincipalSid;
 import org.springframework.stereotype.Controller;
 import org.springframework.web.bind.annotation.GetMapping;
 import org.springframework.web.bind.annotation.PathVariable;
@@ -49,6 +62,10 @@ import org.springframework.web.bind.annotation.RequestMapping;
 import org.springframework.web.bind.annotation.RequestParam;
 import org.springframework.web.bind.annotation.ResponseBody;
 
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+import org.apache.kylin.metadata.user.ManagedUser;
 import io.swagger.annotations.ApiOperation;
 
 @Controller
@@ -63,18 +80,26 @@ public class NAccessControllerV2 extends NBasicController {
     @Qualifier("userService")
     protected UserService userService;
 
+    @Autowired
+    @Qualifier("userGroupService")
+    private IUserGroupService userGroupService;
+
     @Autowired
     @Qualifier("aclTCRService")
     private AclTCRService aclTCRService;
 
+    @Autowired
+    private AclEvaluate aclEvaluate;
+
     private static final String PROJECT_NAME = "project_name";
     private static final String TABLE_NAME = "table_name";
 
-    private void checkUserName(String userName) {
+    private ManagedUser checkAndGetUser(String userName) {
         if (!userService.userExists(userName)) {
             throw new KylinException(USER_NOT_EXIST,
                     String.format(Locale.ROOT, "User '%s' does not exists.", userName));
         }
+        return (ManagedUser) userService.loadUserByUsername(userName);
     }
 
     /**
@@ -89,7 +114,7 @@ public class NAccessControllerV2 extends NBasicController {
     @ResponseBody
     @PreAuthorize(Constant.ACCESS_HAS_ROLE_ADMIN)
     public EnvelopeResponse getAllAccessEntitiesOfUser(@PathVariable("userName") String username) throws IOException {
-        checkUserName(username);
+        checkAndGetUser(username);
 
         List<Object> dataList = new ArrayList<>();
         List<String> projectList = accessService.getGrantedProjectsOfUser(username);
@@ -115,13 +140,92 @@ public class NAccessControllerV2 extends NBasicController {
             @RequestParam(value = "pageOffset", required = false, defaultValue = "0") Integer pageOffset,
             @RequestParam(value = "pageSize", required = false, defaultValue = "10") Integer pageSize)
             throws IOException {
-        AclEntity ae = accessService.getAclEntity(type, getProject(project).getUuid());
-        List<AccessEntryResponse> resultsAfterFuzzyMatching = this.accessService.generateAceResponsesByFuzzMatching(ae,
-                nameSeg, isCaseSensitive);
-        List<AccessEntryResponse> sublist = PagingUtil.cutPage(resultsAfterFuzzyMatching, pageOffset, pageSize);
+        List<AccessEntryResponse> accessList = getAccessList(type, project, nameSeg, isCaseSensitive);
+        List<AccessEntryResponse> sublist = PagingUtil.cutPage(accessList, pageOffset, pageSize);
         HashMap<String, Object> data = new HashMap<>();
         data.put("sids", sublist);
-        data.put("size", resultsAfterFuzzyMatching.size());
+        data.put("size", accessList.size());
         return new EnvelopeResponse<>(KylinException.CODE_SUCCESS, data, "");
     }
+
+    @ApiOperation(value = "getAllAccessUsers", tags = { "MID" })
+    @GetMapping(value = "/all/users", produces = { HTTP_VND_APACHE_KYLIN_V2_JSON })
+    @ResponseBody
+    public EnvelopeResponse<OpenAccessUserResponse> getAllAccessUsers(
+            @RequestParam(value = "project", required = false) String project,
+            @RequestParam(value = "userName", required = false) String userName,
+            @RequestParam(value = "pageOffset", required = false, defaultValue = "0") Integer pageOffset,
+            @RequestParam(value = "pageSize", required = false, defaultValue = "10") Integer pageSize)
+            throws IOException {
+        Set<ManagedUser> users = StringUtils.isNotEmpty(userName) ? Sets.newHashSet(checkAndGetUser(userName))
+                : getUsersOfProjects(getGrantedProjects(project));
+        return new EnvelopeResponse<>(KylinException.CODE_SUCCESS, new OpenAccessUserResponse(
+                PagingUtil.cutPage(Lists.newArrayList(users), pageOffset, pageSize), users.size()), "");
+    }
+
+    @ApiOperation(value = "getAllAccessGroups", tags = { "MID" })
+    @GetMapping(value = "/all/groups", produces = { HTTP_VND_APACHE_KYLIN_V2_JSON })
+    @ResponseBody
+    public EnvelopeResponse<OpenAccessGroupResponse> getAllAccessGroups(
+            @RequestParam(value = "project", required = false) String project,
+            @RequestParam(value = "groupName", required = false) String groupName,
+            @RequestParam(value = "pageOffset", required = false, defaultValue = "0") Integer pageOffset,
+            @RequestParam(value = "pageSize", required = false, defaultValue = "10") Integer pageSize)
+            throws IOException {
+        List<Pair<String, Integer>> result = StringUtils.isNotEmpty(groupName)
+                ? Lists.newArrayList(Pair.newPair(groupName, userGroupService.getGroupMembersByName(groupName).size()))
+                : getUserGroupsOfProjects(getGrantedProjects(project));
+        return new EnvelopeResponse<>(KylinException.CODE_SUCCESS, new OpenAccessGroupResponse(
+                PagingUtil.cutPage(Lists.newArrayList(result), pageOffset, pageSize), result.size()), "");
+    }
+
+    private List<AccessEntryResponse> getAccessList(String type, String projectName, String nameSeg,
+            boolean isCaseSensitive) throws IOException {
+        AclEntity aclEntity = accessService.getAclEntity(type, getProject(projectName).getUuid());
+        return this.accessService.generateAceResponsesByFuzzMatching(aclEntity, nameSeg, isCaseSensitive);
+    }
+
+    private List<String> getGrantedProjects(String projectName) {
+        NProjectManager projectManager = NProjectManager.getInstance(KylinConfig.getInstanceFromEnv());
+        if (StringUtils.isBlank(projectName)) {
+            return projectManager.listAllProjects().stream().map(ProjectInstance::getName)
+                    .filter(name -> aclEvaluate.hasProjectAdminPermission(name)).collect(Collectors.toList());
+        } else if (aclEvaluate.hasProjectReadPermission(projectManager.getProject(projectName))) {
+            return Lists.newArrayList(projectName);
+        }
+        return Lists.newArrayList();
+    }
+
+    private Set<ManagedUser> getUsersOfProjects(List<String> projects) throws IOException {
+        Set<ManagedUser> allUsers = Sets.newHashSet();
+        for (String projectName : projects) {
+            List<AccessEntryResponse> responses = getAccessList(AclEntityType.PROJECT_INSTANCE, projectName, null,
+                    false);
+            allUsers.addAll(responses.stream().filter(response -> response.getSid() instanceof PrincipalSid)
+                    .map(response -> (ManagedUser) userService
+                            .loadUserByUsername(((PrincipalSid) response.getSid()).getPrincipal()))
+                    .collect(Collectors.toSet()));
+        }
+        return allUsers;
+    }
+
+    private List<Pair<String, Integer>> getUserGroupsOfProjects(List<String> projects) throws IOException {
+        List<Pair<String, Integer>> allUserGroups = Lists.newArrayList();
+        List<String> grantedGroups = Lists.newArrayList();
+        for (String projectName : projects) {
+            List<AccessEntryResponse> responses = getAccessList(AclEntityType.PROJECT_INSTANCE, projectName, null,
+                    false);
+            for (AccessEntryResponse response : responses) {
+                if (response.getSid() instanceof GrantedAuthoritySid) {
+                    String grantedAuthority = ((GrantedAuthoritySid) response.getSid()).getGrantedAuthority();
+                    if (!grantedGroups.contains(grantedAuthority)) {
+                        grantedGroups.add(grantedAuthority);
+                        allUserGroups.add(Pair.newPair(grantedAuthority,
+                                userGroupService.getGroupMembersByName(grantedAuthority).size()));
+                    }
+                }
+            }
+        }
+        return allUserGroups;
+    }
 }
diff --git a/src/metadata-server/src/test/java/org/apache/kylin/rest/controller/NAccessControllerV2Test.java b/src/metadata-server/src/test/java/org/apache/kylin/rest/controller/NAccessControllerV2Test.java
index 6aae71d82b..526ab7251d 100644
--- a/src/metadata-server/src/test/java/org/apache/kylin/rest/controller/NAccessControllerV2Test.java
+++ b/src/metadata-server/src/test/java/org/apache/kylin/rest/controller/NAccessControllerV2Test.java
@@ -19,12 +19,30 @@ package org.apache.kylin.rest.controller;
 
 import static org.apache.kylin.common.constant.HttpConstant.HTTP_VND_APACHE_KYLIN_V2_JSON;
 
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Locale;
+
+import org.apache.kylin.common.persistence.AclEntity;
+import org.apache.kylin.common.persistence.RootPersistentEntity;
+import org.apache.kylin.common.util.NLocalFileMetadataTestCase;
+import org.apache.kylin.metadata.project.NProjectManager;
+import org.apache.kylin.metadata.project.ProjectInstance;
 import org.apache.kylin.rest.constant.Constant;
-import org.apache.kylin.rest.service.AccessService;
-import org.apache.kylin.rest.service.UserService;
 import org.apache.kylin.rest.controller.v2.NAccessControllerV2;
+import org.apache.kylin.rest.response.AccessEntryResponse;
+import org.apache.kylin.rest.response.EnvelopeResponse;
+import org.apache.kylin.rest.response.OpenAccessGroupResponse;
+import org.apache.kylin.rest.response.OpenAccessUserResponse;
+import org.apache.kylin.rest.security.AclEntityType;
+import org.apache.kylin.rest.service.AccessService;
 import org.apache.kylin.rest.service.AclTCRService;
+import org.apache.kylin.rest.service.IUserGroupService;
+import org.apache.kylin.rest.service.ProjectService;
+import org.apache.kylin.rest.service.UserService;
+import org.apache.kylin.rest.util.AclEvaluate;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.InjectMocks;
@@ -32,8 +50,11 @@ import org.mockito.Mock;
 import org.mockito.Mockito;
 import org.mockito.MockitoAnnotations;
 import org.springframework.http.MediaType;
+import org.springframework.security.acls.domain.GrantedAuthoritySid;
+import org.springframework.security.acls.domain.PrincipalSid;
 import org.springframework.security.authentication.TestingAuthenticationToken;
 import org.springframework.security.core.Authentication;
+import org.springframework.security.core.GrantedAuthority;
 import org.springframework.security.core.context.SecurityContextHolder;
 import org.springframework.test.web.servlet.MockMvc;
 import org.springframework.test.web.servlet.request.MockMvcRequestBuilders;
@@ -42,7 +63,9 @@ import org.springframework.test.web.servlet.setup.MockMvcBuilders;
 
 import com.google.common.collect.Lists;
 
-public class NAccessControllerV2Test {
+import org.apache.kylin.metadata.user.ManagedUser;
+
+public class NAccessControllerV2Test extends NLocalFileMetadataTestCase {
 
     private MockMvc mockMvc;
 
@@ -55,6 +78,15 @@ public class NAccessControllerV2Test {
     @Mock
     private AclTCRService aclTCRService;
 
+    @Mock
+    private AclEvaluate aclEvaluate;
+
+    @Mock
+    private ProjectService projectService;
+
+    @Mock
+    private IUserGroupService userGroupService;
+
     @InjectMocks
     private NAccessControllerV2 nAccessControllerV2 = Mockito.spy(new NAccessControllerV2());
 
@@ -67,10 +99,12 @@ public class NAccessControllerV2Test {
                 .defaultRequest(MockMvcRequestBuilders.get("/")).build();
 
         SecurityContextHolder.getContext().setAuthentication(authentication);
+        createTestMetadata();
     }
 
     @After
     public void tearDown() {
+        cleanupTestMetadata();
     }
 
     @Test
@@ -88,4 +122,89 @@ public class NAccessControllerV2Test {
         Mockito.verify(nAccessControllerV2).getAllAccessEntitiesOfUser(userName);
     }
 
+    @Test
+    public void testGetAllAccessUsers() throws Exception {
+        String project = "default";
+        String userName = "user01";
+        mockMvc.perform(MockMvcRequestBuilders.get("/api/access/all/users").contentType(MediaType.APPLICATION_JSON)
+                .accept(MediaType.parseMediaType(HTTP_VND_APACHE_KYLIN_V2_JSON)))
+                .andExpect(MockMvcResultMatchers.status().isOk()).andReturn();
+
+        Mockito.verify(nAccessControllerV2).getAllAccessUsers(null, null, 0, 10);
+
+        Mockito.doNothing().when(aclEvaluate).checkProjectAdminPermission(project);
+        mockMvc.perform(MockMvcRequestBuilders.get("/api/access/all/users").param("project", project)
+                .param("userName", userName).contentType(MediaType.APPLICATION_JSON)
+                .accept(MediaType.parseMediaType(HTTP_VND_APACHE_KYLIN_V2_JSON)))
+                .andExpect(MockMvcResultMatchers.status().is5xxServerError()).andReturn();
+
+        Mockito.verify(nAccessControllerV2).getAllAccessUsers(project, userName, 0, 10);
+
+        List<GrantedAuthority> authorities = new ArrayList<>();
+        ManagedUser user = new ManagedUser(userName, "123", false, authorities);
+        Authentication authentication = new TestingAuthenticationToken(user, userName, Constant.ROLE_ADMIN);
+        SecurityContextHolder.getContext().setAuthentication(authentication);
+        Mockito.doReturn(user).when(userService).loadUserByUsername(userName);
+        try {
+            nAccessControllerV2.getAllAccessUsers(project, userName, 0, 10);
+        } catch (Exception e) {
+            Assert.assertEquals(String.format(Locale.ROOT, "User '%s' does not exists.", userName), e.getMessage());
+        }
+
+        ProjectInstance projectInstance = NProjectManager.getInstance(getTestConfig()).getProject(project);
+        Mockito.doReturn(Lists.newArrayList(projectInstance)).when(projectService).getReadableProjects(project, true);
+        AccessEntryResponse accessEntryResponse = Mockito.mock(AccessEntryResponse.class);
+        AclEntity aclEntity = Mockito.mock(RootPersistentEntity.class);
+        PrincipalSid principalSid = Mockito.mock(PrincipalSid.class);
+        Mockito.doReturn(principalSid).when(accessEntryResponse).getSid();
+        Mockito.doReturn(userName).when(principalSid).getPrincipal();
+        Mockito.doReturn(aclEntity).when(accessService).getAclEntity(AclEntityType.PROJECT_INSTANCE,
+                projectInstance.getUuid());
+        Mockito.doReturn(Lists.newArrayList(accessEntryResponse)).when(accessService)
+                .generateAceResponsesByFuzzMatching(aclEntity, null, false);
+        EnvelopeResponse<OpenAccessUserResponse> envelopeResponse1 = nAccessControllerV2.getAllAccessUsers(project,
+                null, 0, 10);
+        Assert.assertNotNull(envelopeResponse1.getData());
+    }
+
+    @Test
+    public void testGetAllAccessGroups() throws Exception {
+        String project = "default";
+        String groupName = "group01";
+        mockMvc.perform(MockMvcRequestBuilders.get("/api/access/all/groups").contentType(MediaType.APPLICATION_JSON)
+                .accept(MediaType.parseMediaType(HTTP_VND_APACHE_KYLIN_V2_JSON)))
+                .andExpect(MockMvcResultMatchers.status().isOk()).andReturn();
+        Mockito.verify(nAccessControllerV2).getAllAccessGroups(null, null, 0, 10);
+
+        mockMvc.perform(MockMvcRequestBuilders.get("/api/access/all/groups").param("project", project)
+                .param("groupName", groupName).contentType(MediaType.APPLICATION_JSON)
+                .accept(MediaType.parseMediaType(HTTP_VND_APACHE_KYLIN_V2_JSON)))
+                .andExpect(MockMvcResultMatchers.status().isOk()).andReturn();
+
+        Mockito.verify(nAccessControllerV2).getAllAccessGroups(project, groupName, 0, 10);
+
+        List<GrantedAuthority> authorities = new ArrayList<>();
+        ManagedUser user = new ManagedUser("user", "123", false, authorities);
+        Authentication authentication = new TestingAuthenticationToken(user, "user", Constant.ROLE_ADMIN);
+        Mockito.doReturn(Lists.newArrayList(user)).when(userGroupService).getGroupMembersByName(groupName);
+        EnvelopeResponse<OpenAccessGroupResponse> envelopeResponse = nAccessControllerV2.getAllAccessGroups(project,
+                groupName, 0, 10);
+        Assert.assertNotNull(envelopeResponse.getData());
+
+        ProjectInstance projectInstance = NProjectManager.getInstance(getTestConfig()).getProject(project);
+        Mockito.doReturn(Lists.newArrayList(projectInstance)).when(projectService).getReadableProjects(project, true);
+        AccessEntryResponse accessEntryResponse = Mockito.mock(AccessEntryResponse.class);
+        AclEntity aclEntity = Mockito.mock(RootPersistentEntity.class);
+        GrantedAuthoritySid grantedAuthoritySid = Mockito.mock(GrantedAuthoritySid.class);
+        Mockito.doReturn(grantedAuthoritySid).when(accessEntryResponse).getSid();
+        Mockito.doReturn(groupName).when(grantedAuthoritySid).getGrantedAuthority();
+        Mockito.doReturn(aclEntity).when(accessService).getAclEntity(AclEntityType.PROJECT_INSTANCE,
+                projectInstance.getUuid());
+        Mockito.doReturn(Lists.newArrayList(accessEntryResponse)).when(accessService)
+                .generateAceResponsesByFuzzMatching(aclEntity, null, false);
+        EnvelopeResponse<OpenAccessGroupResponse> envelopeResponse1 = nAccessControllerV2.getAllAccessGroups(project,
+                null, 0, 10);
+        Assert.assertNotNull(envelopeResponse1.getData());
+    }
+
 }
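
Note on the change above: getUserGroupsOfProjects reports each granted group once even when it appears in several projects, and the controller then pages the combined list. The sketch below shows that shape with plain JDK collections. It uses a LinkedHashSet for the de-duplication, which is a substitution for the List.contains check in the diff, not the committed code. GroupDedupSketch, distinctGroups and page are hypothetical names, and page assumes pageOffset is a zero-based page index, which may not match PagingUtil.cutPage exactly.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical illustration; not part of NAccessControllerV2.
class GroupDedupSketch {

    // Collects every group name once, in first-seen order, across all projects.
    static List<String> distinctGroups(List<List<String>> groupsPerProject) {
        Set<String> seen = new LinkedHashSet<>();
        for (List<String> groups : groupsPerProject) {
            seen.addAll(groups);
        }
        return new ArrayList<>(seen);
    }

    // Returns one page of the list; pageOffset is treated as a zero-based
    // page index (an assumption). Out-of-range pages yield an empty list.
    static <T> List<T> page(List<T> all, int pageOffset, int pageSize) {
        int from = Math.min(pageOffset * pageSize, all.size());
        int to = Math.min(from + pageSize, all.size());
        return new ArrayList<>(all.subList(from, to));
    }

    public static void main(String[] args) {
        List<String> groups = distinctGroups(Arrays.asList(
                Arrays.asList("g1", "g2"),
                Arrays.asList("g2", "g3")));
        System.out.println(page(groups, 0, 2));
    }
}
```

As in the controller, the reported size would be taken from the full de-duplicated list rather than from the returned page.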


[kylin] 09/12: KYLIN-5349 Support project-level configuration of concurrent task limits

Posted by xx...@apache.org.

xxyu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 5bca044690dabb78e18e86890d4283bd67b015ae
Author: Hang Jia <75...@qq.com>
AuthorDate: Thu Oct 27 10:57:59 2022 +0800

    KYLIN-5349 Support project-level configuration of concurrent task limits
---
 .../job/impl/threadpool/NDefaultScheduler.java     | 19 ++++--
 .../apache/kylin/job/runners/FetcherRunner.java    | 27 +++++++-
 .../job/impl/threadpool/NDefaultSchedulerTest.java | 79 +++++++++++++++++++++-
 .../kylin/rest/service/ModelBuildService.java      | 17 ++++-
 .../kylin/rest/service/ModelServiceBuildTest.java  | 46 +++++++++++++
 5 files changed, 177 insertions(+), 11 deletions(-)

diff --git a/src/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/NDefaultScheduler.java b/src/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/NDefaultScheduler.java
index d3efe67bb9..8be23b4c19 100644
--- a/src/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/NDefaultScheduler.java
+++ b/src/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/NDefaultScheduler.java
@@ -32,8 +32,10 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import lombok.Setter;
 import org.apache.commons.lang3.RandomUtils;
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.persistence.transaction.UnitOfWork;
 import org.apache.kylin.common.util.ExecutorServiceUtil;
 import org.apache.kylin.common.util.NamedThreadFactory;
+import org.apache.kylin.common.util.SystemInfoCollector;
 import org.apache.kylin.job.Scheduler;
 import org.apache.kylin.job.engine.JobEngineConfig;
 import org.apache.kylin.job.execution.AbstractExecutable;
@@ -43,16 +45,17 @@ import org.apache.kylin.job.runners.FetcherRunner;
 import org.apache.kylin.job.runners.JobCheckRunner;
 import org.apache.kylin.job.runners.LicenseCapacityCheckRunner;
 import org.apache.kylin.job.runners.QuotaStorageCheckRunner;
-import org.apache.kylin.common.persistence.transaction.UnitOfWork;
-import org.apache.kylin.common.util.SystemInfoCollector;
-import org.apache.kylin.metadata.epoch.EpochManager;
+import org.apache.kylin.metadata.project.NProjectManager;
+import org.apache.kylin.metadata.project.ProjectInstance;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
+import org.apache.kylin.metadata.epoch.EpochManager;
 import lombok.Getter;
 import lombok.SneakyThrows;
 import lombok.val;
@@ -152,7 +155,7 @@ public class NDefaultScheduler implements Scheduler<AbstractExecutable> {
         //load all executable, set them to a consistent status
         fetcherPool = Executors.newScheduledThreadPool(1,
                 new NamedThreadFactory("FetchJobWorker(project:" + project + ")"));
-        int corePoolSize = jobEngineConfig.getMaxConcurrentJobLimit();
+        int corePoolSize = getMaxConcurrentJobLimitByProject(config, jobEngineConfig, project);
         if (config.getAutoSetConcurrentJob()) {
             val availableMemoryRate = config.getMaxLocalConsumptionRatio();
             synchronized (NDefaultScheduler.class) {
@@ -238,4 +241,12 @@ public class NDefaultScheduler implements Scheduler<AbstractExecutable> {
         return 1.0 * memoryRemaining.availablePermits();
     }
 
+    public int getMaxConcurrentJobLimitByProject(KylinConfig config, JobEngineConfig jobEngineConfig, String project) {
+        ProjectInstance prjInstance = NProjectManager.getInstance(config).getProject(project);
+        if (Strings.isNullOrEmpty(project) || prjInstance == null) {
+            return jobEngineConfig.getMaxConcurrentJobLimit();
+        }
+        return prjInstance.getConfig().getMaxConcurrentJobLimit();
+    }
+
 }
diff --git a/src/core-job/src/main/java/org/apache/kylin/job/runners/FetcherRunner.java b/src/core-job/src/main/java/org/apache/kylin/job/runners/FetcherRunner.java
index ab659c48a8..425a228f74 100644
--- a/src/core-job/src/main/java/org/apache/kylin/job/runners/FetcherRunner.java
+++ b/src/core-job/src/main/java/org/apache/kylin/job/runners/FetcherRunner.java
@@ -20,6 +20,7 @@ package org.apache.kylin.job.runners;
 import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.kylin.common.KylinConfig;
@@ -103,6 +104,8 @@ public class FetcherRunner extends AbstractDefaultSchedulerRunner {
                 reSchedule = false;
                 return;
             }
+            checkAndUpdateJobPoolNum();
+
             val executableManager = NExecutableManager.getInstance(KylinConfig.getInstanceFromEnv(), project);
             Map<String, Executable> runningJobs = context.getRunningJobs();
 
@@ -236,7 +239,9 @@ public class FetcherRunner extends AbstractDefaultSchedulerRunner {
     }
 
     private boolean isJobPoolFull() {
-        if (context.getRunningJobs().size() >= nDefaultScheduler.getJobEngineConfig().getMaxConcurrentJobLimit()) {
+        int corePoolSize = nDefaultScheduler.getMaxConcurrentJobLimitByProject(context.getConfig(),
+                nDefaultScheduler.getJobEngineConfig(), project);
+        if (context.getRunningJobs().size() >= corePoolSize) {
             logger.warn("There are too many jobs running, Job Fetch will wait until next schedule time.");
             return true;
         }
@@ -246,4 +251,24 @@ public class FetcherRunner extends AbstractDefaultSchedulerRunner {
     void scheduleNext() {
         fetcherPool.schedule(this, 0, TimeUnit.SECONDS);
     }
+
+    private void checkAndUpdateJobPoolNum() {
+        final ThreadPoolExecutor pool = (ThreadPoolExecutor) jobPool;
+        int maximumPoolSize = pool.getMaximumPoolSize();
+        int maxConcurrentJobLimit = nDefaultScheduler.getMaxConcurrentJobLimitByProject(context.getConfig(),
+                nDefaultScheduler.getJobEngineConfig(), project);
+        int activeCount = pool.getActiveCount();
+        if (maximumPoolSize == maxConcurrentJobLimit) {
+            return;
+        }
+        if (maximumPoolSize < maxConcurrentJobLimit) {
+            pool.setCorePoolSize(maxConcurrentJobLimit);
+            pool.setMaximumPoolSize(maxConcurrentJobLimit);
+            return;
+        }
+        if (activeCount <= maxConcurrentJobLimit) {
+            pool.setCorePoolSize(maxConcurrentJobLimit);
+            pool.setMaximumPoolSize(maxConcurrentJobLimit);
+        }
+    }
 }
diff --git a/src/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/NDefaultSchedulerTest.java b/src/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/NDefaultSchedulerTest.java
index 77990e29ca..f6602d2cd1 100644
--- a/src/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/NDefaultSchedulerTest.java
+++ b/src/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/NDefaultSchedulerTest.java
@@ -71,10 +71,10 @@ import org.apache.kylin.metadata.cube.model.NDataflow;
 import org.apache.kylin.metadata.cube.model.NDataflowManager;
 import org.apache.kylin.metadata.cube.model.NDataflowUpdate;
 import org.apache.kylin.metadata.cube.model.NIndexPlanManager;
-import org.apache.kylin.metadata.epoch.EpochManager;
 import org.apache.kylin.metadata.model.ManagementType;
 import org.apache.kylin.metadata.model.NDataModelManager;
 import org.apache.kylin.metadata.project.EnhancedUnitOfWork;
+import org.apache.kylin.metadata.project.NProjectManager;
 import org.apache.kylin.metadata.realization.RealizationStatusEnum;
 import org.assertj.core.api.Assertions;
 import org.awaitility.core.ConditionTimeoutException;
@@ -93,6 +93,7 @@ import org.slf4j.LoggerFactory;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 
+import org.apache.kylin.metadata.epoch.EpochManager;
 import lombok.val;
 import lombok.var;
 
@@ -940,8 +941,7 @@ public class NDefaultSchedulerTest extends BaseSchedulerTest {
         Assert.assertEquals(RealizationStatusEnum.ONLINE, updateDf.getStatus());
     }
 
-    private DefaultExecutable testDataflowStatusWhenJobError(ManagementType tableOriented,
-            JobTypeEnum indexBuild) {
+    private DefaultExecutable testDataflowStatusWhenJobError(ManagementType tableOriented, JobTypeEnum indexBuild) {
         val dfMgr = NDataflowManager.getInstance(getTestConfig(), project);
         val modelMgr = NDataModelManager.getInstance(getTestConfig(), project);
         modelMgr.updateDataModel("89af4ee2-2cdb-4b07-b39e-4c29856309aa", copyForWrite -> {
@@ -2026,4 +2026,77 @@ public class NDefaultSchedulerTest extends BaseSchedulerTest {
             scheduler.getContext().setReachQuotaLimit(false);
         }
     }
+
+    @Test
+    @Repeat(3)
+    public void testProjectConcurrentJobLimit() {
+        String project = "heterogeneous_segment";
+        String modelId = "747f864b-9721-4b97-acde-0aa8e8656cba";
+        KylinConfig config = KylinConfig.getInstanceFromEnv();
+        config.setProperty("kylin.job.max-concurrent-jobs", "1");
+        config.setProperty("kylin.engine.driver-memory-base", "512");
+
+        val scheduler = NDefaultScheduler.getInstance(project);
+        val originExecutableManager = NExecutableManager.getInstance(KylinConfig.getInstanceFromEnv(), project);
+        val executableManager = Mockito.spy(originExecutableManager);
+        executableManager.deleteAllJob();
+        Mockito.doAnswer(invocation -> {
+            String jobId = invocation.getArgument(0);
+            originExecutableManager.destroyProcess(jobId);
+            return null;
+        }).when(executableManager).destroyProcess(Mockito.anyString());
+
+        scheduler.init(new JobEngineConfig(config));
+        val projectManager = NProjectManager.getInstance(config);
+
+        if (!scheduler.hasStarted()) {
+            throw new RuntimeException("scheduler has not been started");
+        }
+        int memory = NDefaultScheduler.getMemoryRemaining().availablePermits();
+        val df = NDataflowManager.getInstance(getTestConfig(), project).getDataflow(modelId);
+        val job1 = generateJob(df, project);
+        val job2 = generatePartial(df, project);
+        executableManager.addJob(job1);
+        executableManager.addJob(job2);
+        waitForJobByStatus(job1.getId(), 60000, ExecutableState.RUNNING, executableManager);
+        Assert.assertNotEquals(memory, NDefaultScheduler.getMemoryRemaining().availablePermits());
+        var runningExecutables = executableManager.getRunningExecutables(project, modelId);
+        runningExecutables.sort(Comparator.comparing(AbstractExecutable::getCreateTime));
+        Assert.assertEquals(ExecutableState.RUNNING, runningExecutables.get(0).getStatus());
+        Assert.assertEquals(ExecutableState.READY, runningExecutables.get(1).getStatus());
+
+        projectManager.getProject(project).getConfig().setProperty("kylin.job.max-concurrent-jobs", "2");
+        Assert.assertNotEquals(memory, NDefaultScheduler.getMemoryRemaining().availablePermits());
+        val job3 = generateJob(df, project);
+        executableManager.addJob(job3);
+        waitForJobByStatus(job1.getId(), 60000, ExecutableState.RUNNING, executableManager);
+        waitForJobByStatus(job2.getId(), 60000, ExecutableState.RUNNING, executableManager);
+        runningExecutables = executableManager.getRunningExecutables(project, modelId);
+        runningExecutables.sort(Comparator.comparing(AbstractExecutable::getCreateTime));
+        Assert.assertEquals(ExecutableState.RUNNING, runningExecutables.get(0).getStatus());
+        Assert.assertEquals(ExecutableState.RUNNING, runningExecutables.get(1).getStatus());
+        Assert.assertEquals(ExecutableState.READY, runningExecutables.get(2).getStatus());
+
+        projectManager.getProject(project).getConfig().setProperty("kylin.job.max-concurrent-jobs", "1");
+        waitForJobByStatus(job1.getId(), 60000, ExecutableState.RUNNING, executableManager);
+        waitForJobByStatus(job2.getId(), 60000, ExecutableState.RUNNING, executableManager);
+
+        runningExecutables.sort(Comparator.comparing(AbstractExecutable::getCreateTime));
+        Assert.assertEquals(ExecutableState.RUNNING, runningExecutables.get(0).getStatus());
+        Assert.assertEquals(ExecutableState.RUNNING, runningExecutables.get(1).getStatus());
+        Assert.assertEquals(ExecutableState.READY, runningExecutables.get(2).getStatus());
+
+        waitForJobByStatus(job1.getId(), 60000, null, executableManager);
+        runningExecutables = executableManager.getRunningExecutables(project, modelId);
+        Assert.assertEquals(2, runningExecutables.size());
+        runningExecutables.sort(Comparator.comparing(AbstractExecutable::getCreateTime));
+        Assert.assertEquals(ExecutableState.RUNNING, runningExecutables.get(0).getStatus());
+        Assert.assertEquals(ExecutableState.READY, runningExecutables.get(1).getStatus());
+
+        scheduler.shutdown();
+        Assert.assertEquals(memory, NDefaultScheduler.getMemoryRemaining().availablePermits());
+
+        Assert.assertEquals(1,
+                scheduler.getMaxConcurrentJobLimitByProject(config, scheduler.getJobEngineConfig(), "xxxxx"));
+    }
 }
diff --git a/src/data-loading-service/src/main/java/org/apache/kylin/rest/service/ModelBuildService.java b/src/data-loading-service/src/main/java/org/apache/kylin/rest/service/ModelBuildService.java
index 5548fd30d5..93724c8c26 100644
--- a/src/data-loading-service/src/main/java/org/apache/kylin/rest/service/ModelBuildService.java
+++ b/src/data-loading-service/src/main/java/org/apache/kylin/rest/service/ModelBuildService.java
@@ -66,6 +66,8 @@ import org.apache.kylin.metadata.model.SegmentStatusEnumToDisplay;
 import org.apache.kylin.metadata.model.TableDesc;
 import org.apache.kylin.metadata.model.util.MultiPartitionUtil;
 import org.apache.kylin.metadata.project.EnhancedUnitOfWork;
+import org.apache.kylin.metadata.project.NProjectManager;
+import org.apache.kylin.metadata.project.ProjectInstance;
 import org.apache.kylin.metadata.sourceusage.SourceUsageManager;
 import org.apache.kylin.query.util.PushDownUtil;
 import org.apache.kylin.rest.aspect.Transaction;
@@ -85,6 +87,7 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
 import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 
@@ -471,7 +474,7 @@ public class ModelBuildService extends AbstractModelService implements ModelBuil
             String segmentId, Set<Long> partitionIds, int priority, String yarnQueue, Object tag) {
         val jobIds = Lists.<String> newArrayList();
         if (parallelBuild) {
-            checkConcurrentSubmit(partitionIds.size());
+            checkConcurrentSubmit(partitionIds.size(), project);
             partitionIds.forEach(partitionId -> {
                 val jobParam = new JobParam(Sets.newHashSet(segmentId), null, modelId, getUsername(),
                         Sets.newHashSet(partitionId), null).withPriority(priority).withYarnQueue(yarnQueue)
@@ -490,14 +493,22 @@ public class ModelBuildService extends AbstractModelService implements ModelBuil
         return JobInfoResponse.of(jobIds, JobTypeEnum.SUB_PARTITION_BUILD.toString());
     }
 
-    private void checkConcurrentSubmit(int partitionSize) {
-        int runningJobLimit = getConfig().getMaxConcurrentJobLimit();
+    private void checkConcurrentSubmit(int partitionSize, String project) {
+        int runningJobLimit = getMaxConcurrentJobLimitByProject(getConfig(), project);
         int submitJobLimit = runningJobLimit * 5;
         if (partitionSize > submitJobLimit) {
             throw new KylinException(JOB_CONCURRENT_SUBMIT_LIMIT, submitJobLimit);
         }
     }
 
+    public int getMaxConcurrentJobLimitByProject(KylinConfig config, String project) {
+        ProjectInstance prjInstance = NProjectManager.getInstance(config).getProject(project);
+        if (Strings.isNullOrEmpty(project) || prjInstance == null) {
+            return config.getMaxConcurrentJobLimit();
+        }
+        return prjInstance.getConfig().getMaxConcurrentJobLimit();
+    }
+
     @Override
     @Transaction(project = 0)
     public void refreshSegments(String project, String table, String refreshStart, String refreshEnd,
diff --git a/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/ModelServiceBuildTest.java b/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/ModelServiceBuildTest.java
index 3dad56193f..6c9ea32d8b 100644
--- a/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/ModelServiceBuildTest.java
+++ b/src/data-loading-service/src/test/java/org/apache/kylin/rest/service/ModelServiceBuildTest.java
@@ -39,6 +39,7 @@ import java.util.Arrays;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Locale;
+import java.util.Map;
 import java.util.Set;
 import java.util.TimeZone;
 import java.util.stream.Collectors;
@@ -125,6 +126,7 @@ import org.mockito.Mockito;
 import org.springframework.test.util.ReflectionTestUtils;
 
 import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 
 import lombok.val;
@@ -1647,4 +1649,48 @@ public class ModelServiceBuildTest extends SourceTestCase {
         Assert.assertThrows(KylinException.class, () -> DateFormat.proposeDateFormat("not_exits"));
     }
 
+    @Test
+    public void testGetMaxConcurrentJobLimitByProject() {
+        String project = getProject();
+        val modelId = "b780e4e4-69af-449e-b09f-05c90dfa04b6";
+        val segmentId = "ff839b0b-2c23-4420-b332-0df70e36c343";
+        val buildPartitions = Lists.<String[]> newArrayList();
+        buildPartitions.add(new String[] { "ASIA" });
+        buildPartitions.add(new String[] { "EUROPE" });
+        buildPartitions.add(new String[] { "MIDDLE EAST" });
+        buildPartitions.add(new String[] { "AMERICA" });
+        buildPartitions.add(new String[] { "MOROCCO" });
+        buildPartitions.add(new String[] { "INDONESIA" });
+
+        overwriteSystemProp("kylin.job.max-concurrent-jobs", "1");
+        Assert.assertEquals(1,
+                modelBuildService.getMaxConcurrentJobLimitByProject(modelBuildService.getConfig(), project));
+        try {
+            modelBuildService.buildSegmentPartitionByValue(getProject(), modelId, segmentId, buildPartitions, true,
+                    false, 0, null, null);
+            Assert.fail();
+        } catch (Exception e) {
+            Assert.assertTrue(e instanceof KylinException);
+            Assert.assertEquals(JOB_CONCURRENT_SUBMIT_LIMIT.getMsg(5), e.getMessage());
+            Assert.assertEquals(0, getRunningExecutables(getProject(), modelId).size());
+        }
+
+        val segmentId2 = "d2edf0c5-5eb2-4968-9ad5-09efbf659324";
+        Map<String, String> testOverrideP = Maps.newLinkedHashMap();
+        testOverrideP.put("kylin.job.max-concurrent-jobs", "2");
+        projectService.updateProjectConfig(project, testOverrideP);
+        Assert.assertEquals(2,
+                modelBuildService.getMaxConcurrentJobLimitByProject(modelBuildService.getConfig(), project));
+        try {
+            modelBuildService.buildSegmentPartitionByValue(getProject(), modelId, segmentId2, buildPartitions, true,
+                    false, 0, null, null);
+        } catch (Exception e) {
+            Assert.fail();
+        }
+        Assert.assertEquals(6, getRunningExecutables(getProject(), modelId).size());
+
+        Assert.assertEquals(1,
+                modelBuildService.getMaxConcurrentJobLimitByProject(modelBuildService.getConfig(), "xxxxx"));
+    }
+
 }
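
Note on checkAndUpdateJobPoolNum in FetcherRunner: the pool grows immediately when the limit is raised, and shrinks only once the number of active jobs has dropped to the new limit. Below is a minimal standalone sketch of that resize rule using plain java.util.concurrent; the class name PoolResizeSketch and the method resize are hypothetical. One deliberate difference from the diff: when growing, this sketch raises the maximum before the core size, because on Java 9 and later ThreadPoolExecutor.setCorePoolSize is documented to throw IllegalArgumentException if the core size would exceed the current maximum. Whether that matters for Kylin depends on the JDK it runs on, which this email does not state.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class PoolResizeSketch {

    /**
     * Resizes the pool to newLimit threads.
     * Growing is applied at once; shrinking is deferred while more than
     * newLimit tasks are still running. Returns true if the pool was resized.
     */
    static boolean resize(ThreadPoolExecutor pool, int newLimit) {
        int currentMax = pool.getMaximumPoolSize();
        if (currentMax == newLimit) {
            return false; // nothing to do
        }
        if (newLimit > currentMax) {
            // Grow: raise the maximum first so core never exceeds maximum.
            pool.setMaximumPoolSize(newLimit);
            pool.setCorePoolSize(newLimit);
            return true;
        }
        if (pool.getActiveCount() <= newLimit) {
            // Shrink: lower core first so maximum never drops below core.
            pool.setCorePoolSize(newLimit);
            pool.setMaximumPoolSize(newLimit);
            return true;
        }
        return false; // too many active tasks; try again on the next fetch cycle
    }

    public static void main(String[] args) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>());
        resize(pool, 2);
        System.out.println(pool.getCorePoolSize() + "/" + pool.getMaximumPoolSize());
        resize(pool, 1); // idle pool, so the shrink is applied
        System.out.println(pool.getCorePoolSize() + "/" + pool.getMaximumPoolSize());
        pool.shutdown();
    }
}
```

Deferring the shrink means a lowered limit takes effect only after running jobs finish, which is the behaviour testProjectConcurrentJobLimit checks when the limit goes from 2 back to 1 while two jobs are still RUNNING.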


[kylin] 11/12: KYLIN-5357 fix snyk vulnerabilities

Posted by xx...@apache.org.

xxyu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 236172adece9da6506dd8e2a542dfe02524b95bd
Author: Jiale He <ji...@kyligence.io>
AuthorDate: Tue Nov 8 17:10:35 2022 +0800

    KYLIN-5357 fix snyk vulnerabilities
    
    * KYLIN-5357 upgrade esapi, h2database, spring-security-web
    
    * KYLIN-5357 exclude xalan
---
 pom.xml              | 21 ++++++++++++++-------
 src/kylin-it/pom.xml |  5 -----
 src/server/pom.xml   |  3 +--
 3 files changed, 15 insertions(+), 14 deletions(-)

diff --git a/pom.xml b/pom.xml
index cf05b751a6..c2bea76ab1 100644
--- a/pom.xml
+++ b/pom.xml
@@ -123,7 +123,6 @@
         <log4j2-cachefile-transformer.version>2.14.1</log4j2-cachefile-transformer.version>
         <slf4j.version>1.7.30</slf4j.version>
         <apache-log4j.version>2.12.1</apache-log4j.version>
-        <xalan.version>2.7.2</xalan.version>
         <ehcache.version>2.10.9.2</ehcache.version>
         <redis.version>3.8.0</redis.version>
         <apache-httpclient.version>4.5.13</apache-httpclient.version>
@@ -158,7 +157,7 @@
         <spring-boot-admin.version>2.6.6</spring-boot-admin.version>
         <spring-session.version>2.6.1-kylin-r3</spring-session.version>
         <spring.framework.security.extensions.version>1.0.10.RELEASE</spring.framework.security.extensions.version>
-        <spring-security-web.version>5.6.4</spring-security-web.version>
+        <spring-security-web.version>5.6.9</spring-security-web.version>
         <opensaml.version>2.6.6</opensaml.version>
         <aspectj.version>1.8.9</aspectj.version>
         <unboundid-ldapsdk.version>3.1.1</unboundid-ldapsdk.version>
@@ -1951,6 +1950,11 @@
                 <artifactId>lombok</artifactId>
                 <version>${lombok.version}</version>
             </dependency>
+            <dependency>
+                <groupId>org.owasp.esapi</groupId>
+                <artifactId>esapi</artifactId>
+                <version>${esapi.version}</version>
+            </dependency>
 
             <!-- Logging -->
             <dependency>
@@ -2077,11 +2081,6 @@
                 <artifactId>xercesImpl</artifactId>
                 <version>${xerces.version}</version>
             </dependency>
-            <dependency>
-                <groupId>xalan</groupId>
-                <artifactId>xalan</artifactId>
-                <version>${xalan.version}</version>
-            </dependency>
             <dependency>
                 <groupId>com.github.jbellis</groupId>
                 <artifactId>jamm</artifactId>
@@ -2303,6 +2302,14 @@
                         <groupId>org.bouncycastle</groupId>
                         <artifactId>bcprov-jdk15on</artifactId>
                     </exclusion>
+                    <exclusion>
+                        <groupId>xalan</groupId>
+                        <artifactId>xalan</artifactId>
+                    </exclusion>
+                    <exclusion>
+                        <groupId>org.springframework.security</groupId>
+                        <artifactId>spring-security-web</artifactId>
+                    </exclusion>
                 </exclusions>
             </dependency>
             <dependency>
diff --git a/src/kylin-it/pom.xml b/src/kylin-it/pom.xml
index 26f612b535..322af3ea24 100644
--- a/src/kylin-it/pom.xml
+++ b/src/kylin-it/pom.xml
@@ -250,11 +250,6 @@
             <artifactId>xercesImpl</artifactId>
             <scope>test</scope>
         </dependency>
-        <dependency>
-            <groupId>xalan</groupId>
-            <artifactId>xalan</artifactId>
-            <scope>test</scope>
-        </dependency>
         <dependency>
             <groupId>com.h2database</groupId>
             <artifactId>h2</artifactId>
diff --git a/src/server/pom.xml b/src/server/pom.xml
index 92d041bbf9..755e7c6ce4 100644
--- a/src/server/pom.xml
+++ b/src/server/pom.xml
@@ -202,8 +202,7 @@
         <dependency>
             <groupId>com.h2database</groupId>
             <artifactId>h2</artifactId>
-            <version>1.4.197</version>
-            <scope>compile</scope>
+            <scope>test</scope>
         </dependency>
 
         <!-- mysql -->


[kylin] 04/12: KYLIN-5345 remove non existent user/usergroup in recommand advice list

Posted by xx...@apache.org.

xxyu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 4af690015ccb02897ede8a89fa441d3523892fdc
Author: binbin.zheng <bi...@kyligence.io>
AuthorDate: Thu Nov 3 21:17:10 2022 +0800

    KYLIN-5345 remove non existent user/usergroup in recommand advice list
---
 .../src/main/java/org/apache/kylin/rest/constant/Constant.java          | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/core-metadata/src/main/java/org/apache/kylin/rest/constant/Constant.java b/src/core-metadata/src/main/java/org/apache/kylin/rest/constant/Constant.java
index a29c7f9646..927ff939ac 100644
--- a/src/core-metadata/src/main/java/org/apache/kylin/rest/constant/Constant.java
+++ b/src/core-metadata/src/main/java/org/apache/kylin/rest/constant/Constant.java
@@ -30,7 +30,7 @@ public class Constant {
     public final static String FakeCatalogName = "defaultCatalog";
 
     public final static String IDENTITY_USER = "user";
-    public final static String IDENTITY_ROLE = "role";
+    public final static String ADMIN = "ADMIN";
 
     @ThirdPartyDependencies({
             @ThirdPartyDependencies.ThirdPartyDependent(repository = "static-user-manager", classes = {