Posted to commits@kylin.apache.org by xx...@apache.org on 2022/12/28 08:06:41 UTC

[kylin] 01/02: KYLIN-5374 support DDL on view

This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit db8cb0e9022911c6a907a584b434c9102b51c265
Author: ChenLiang <31...@users.noreply.github.com>
AuthorDate: Sat Oct 29 19:40:21 2022 +0800

    KYLIN-5374 support DDL on view
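    
    Illustrative usage, sketched from the tests in this commit (assumes
    kylin.source.ddl.enabled=true and a project admin user; view names must
    start with KE_). Statements are submitted via POST /api/spark_source/ddl
    with a JSON body of {"project": ..., "sql": ...}:
    
        CREATE VIEW `ssb`.`ke_order_view2` AS SELECT * FROM SSB.P_LINEORDER;
        SHOW CREATE TABLE ssb.ke_order_view2;
        DROP VIEW `ssb`.`ke_order_view2`;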
---
 pom.xml                                            |   5 +
 .../kylin/rest/controller/NAdminController.java    |   1 +
 src/common-service/pom.xml                         |   4 +
 .../org/apache/kylin/common/KylinConfigBase.java   |   4 +
 .../kylin/common/exception/ServerErrorCode.java    |   5 +-
 .../org/apache/kylin/common/msg/CnMessage.java     |  35 ++++++
 .../java/org/apache/kylin/common/msg/Message.java  |  37 ++++++
 .../resources/kylin_errorcode_conf_en.properties   |   2 +
 .../resources/kylin_errorcode_conf_zh.properties   |   2 +
 src/datasource-service/pom.xml                     |  13 +-
 .../apache/kylin/rest/request/ViewDDLRequest.java  |  31 +++++
 .../apache/kylin/rest/service/SparkDDLService.java |  75 +++++++++++
 .../apache/kylin/rest/service/SparkDDLTest.java    | 137 +++++++++++++++++++++
 .../spark/sql/common/SparkDDLTestUtils.scala       | 137 +++++++++++++++++++++
 .../kylin/rest/controller/SparkDDLController.java  |  68 ++++++++++
 .../rest/controller/SparkDDLControllerTest.java    |  89 +++++++++++++
 .../spark/source/NSparkMetadataExplorer.java       |  37 ++++++
 src/spark-project/spark-ddl-plugin/pom.xml         |  58 +++++++++
 .../java/org/apache/kylin/spark/ddl/DDLCheck.java  |  35 ++++++
 .../apache/kylin/spark/ddl/DDLCheckContext.java    |  50 ++++++++
 .../apache/kylin/spark/ddl/SourceTableCheck.java   |  76 ++++++++++++
 .../services/org.apache.kylin.spark.ddl.DDLCheck   |   2 +
 .../org/apache/kylin/spark/ddl/ViewCheck.scala     | 123 ++++++++++++++++++
 23 files changed, 1024 insertions(+), 2 deletions(-)

diff --git a/pom.xml b/pom.xml
index 5c1ba4eecc..896608f332 100644
--- a/pom.xml
+++ b/pom.xml
@@ -652,6 +652,11 @@
                 <artifactId>kylin-soft-affinity-cache</artifactId>
                 <version>${project.version}</version>
             </dependency>
+            <dependency>
+                <groupId>org.apache.kylin</groupId>
+                <artifactId>kylin-spark-ddl</artifactId>
+                <version>${project.version}</version>
+            </dependency>
 
             <dependency>
                 <groupId>io.dropwizard.metrics</groupId>
diff --git a/src/common-server/src/main/java/org/apache/kylin/rest/controller/NAdminController.java b/src/common-server/src/main/java/org/apache/kylin/rest/controller/NAdminController.java
index 802b044449..d134bf04b9 100644
--- a/src/common-server/src/main/java/org/apache/kylin/rest/controller/NAdminController.java
+++ b/src/common-server/src/main/java/org/apache/kylin/rest/controller/NAdminController.java
@@ -74,6 +74,7 @@ public class NAdminController extends NBasicController {
         propertyKeys.add("kylin.streaming.enabled");
         propertyKeys.add("kylin.model.measure-name-check-enabled");
         propertyKeys.add("kylin.security.remove-ldap-custom-security-limit-enabled");
+        propertyKeys.add("kylin.source.ddl.enabled");
 
         // add second storage
         if (StringUtils.isNotEmpty(KylinConfig.getInstanceFromEnv().getSecondStorage())) {
diff --git a/src/common-service/pom.xml b/src/common-service/pom.xml
index 10acf44d80..40c94752dc 100644
--- a/src/common-service/pom.xml
+++ b/src/common-service/pom.xml
@@ -50,6 +50,10 @@
             <groupId>org.apache.kylin</groupId>
             <artifactId>kylin-tool</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.apache.kylin</groupId>
+            <artifactId>kylin-spark-ddl</artifactId>
+        </dependency>
 
         <dependency>
             <groupId>com.fasterxml.jackson.core</groupId>
diff --git a/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index e8d777d013..9c4601705f 100644
--- a/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -3661,4 +3661,8 @@ public abstract class KylinConfigBase implements Serializable {
     public int getSecondStorageWaitLockTimeout() {
         return Integer.parseInt(getOptional("kylin.second-storage.wait-lock-timeout", "180"));
     }
+
+    public boolean getDDLEnabled() {
+        return Boolean.parseBoolean(getOptional("kylin.source.ddl.enabled", FALSE));
+    }
 }
diff --git a/src/core-common/src/main/java/org/apache/kylin/common/exception/ServerErrorCode.java b/src/core-common/src/main/java/org/apache/kylin/common/exception/ServerErrorCode.java
index 856a5da4db..1e6c9722bf 100644
--- a/src/core-common/src/main/java/org/apache/kylin/common/exception/ServerErrorCode.java
+++ b/src/core-common/src/main/java/org/apache/kylin/common/exception/ServerErrorCode.java
@@ -285,7 +285,10 @@ public enum ServerErrorCode implements ErrorCodeSupplier {
     INVALID_JDBC_SOURCE_CONFIG("KE-010039001"), //
 
     // 10040XXX cache
-    REDIS_CLEAR_ERROR("KE-010040001"); //
+    REDIS_CLEAR_ERROR("KE-010040001"), //
+
+    // 10050XXX SQL DDL
+    DDL_CHECK_ERROR("KE-010050001");
 
     private final ErrorCode errorCode;
 
diff --git a/src/core-common/src/main/java/org/apache/kylin/common/msg/CnMessage.java b/src/core-common/src/main/java/org/apache/kylin/common/msg/CnMessage.java
index 3688c2db26..3306e85266 100644
--- a/src/core-common/src/main/java/org/apache/kylin/common/msg/CnMessage.java
+++ b/src/core-common/src/main/java/org/apache/kylin/common/msg/CnMessage.java
@@ -1754,4 +1754,39 @@ public class CnMessage extends Message {
     public String getSecondStorageNodeNotAvailable(String nodeName) {
         return String.format(Locale.ROOT, "分层存储节点'%s'不可用。", nodeName);
     }
+
+    @Override
+    public String getDDLUnSupported() {
+        return "不支持的 DDL 语法,仅支持 `create view`, `drop view`, `alter view`, `show create view` 语法";
+    }
+
+    @Override
+    public String getDDLViewNameError() {
+        return "视图名需要以 KE_ 开头";
+    }
+
+    @Override
+    public String getDDLDropError() {
+        return "仅支持删除 view 类型表且 view 名称需要以 KE_ 开头";
+    }
+
+    @Override
+    public String getDDLTableNotLoad(String table) {
+        return String.format(Locale.ROOT, "'%s' 没有加载到数据源", table);
+    }
+
+    @Override
+    public String getDDLTableNotSupport(String table) {
+        return String.format(Locale.ROOT, "仅支持 hive 数据表,但 '%s' 不是 hive 表", table);
+    }
+
+    @Override
+    public String getDDLPermissionDenied() {
+        return "只有系统或者项目管理员可以进行 DDL 操作";
+    }
+
+    @Override
+    public String getDDLDatabaseAccessnDenied() {
+        return "用户没有视图所在数据库的权限";
+    }
 }
diff --git a/src/core-common/src/main/java/org/apache/kylin/common/msg/Message.java b/src/core-common/src/main/java/org/apache/kylin/common/msg/Message.java
index d64b7179c1..365474db28 100644
--- a/src/core-common/src/main/java/org/apache/kylin/common/msg/Message.java
+++ b/src/core-common/src/main/java/org/apache/kylin/common/msg/Message.java
@@ -60,6 +60,15 @@ public class Message {
     private static final String LICENSE_MISMATCH_LICENSE = "The license doesn’t match the current cluster information. Please upload a new license, or contact Kyligence.";
     private static final String LICENSE_NOT_EFFECTIVE = "License is not effective yet, please apply for a new license.";
     private static final String LICENSE_EXPIRED = "The license has expired. Please upload a new license, or contact Kyligence.";
+    private static final String DDL_UNSUPPORTED = "Unsupported DDL syntax. Only a single `create view`, `drop view`, "
+        + "`alter view` or `show create table` statement is supported";
+    private static final String DDL_VIEW_NAME_ERROR = "View names need to start with KE_";
+    private static final String DDL_DROP_ERROR = "Only dropping views is supported";
+    private static final String DDL_TABLE_NOT_LOADED = "Table '%s' is not loaded into the data source";
+    private static final String DDL_TABLE_NOT_SUPPORT = "Only Hive tables are supported, but '%s' is not a Hive table";
+    private static final String DDL_PERMISSION_DENIED = "Only Administrator or Project Administrator can do DDL operations";
+    private static final String DDL_DATABASE_ACCESSN_DENIED = "The user does not have permission on the database to "
+        + "which the view belongs.";
 
     protected Message() {
 
@@ -1582,4 +1591,32 @@ public class Message {
     public String getSecondStorageNodeNotAvailable(String nodeName) {
         return String.format(Locale.ROOT, "Tiered storage node '%s' not available.", nodeName);
     }
+
+    public String getDDLUnSupported() {
+        return DDL_UNSUPPORTED;
+    }
+
+    public String getDDLViewNameError() {
+        return DDL_VIEW_NAME_ERROR;
+    }
+
+    public String getDDLDropError() {
+        return DDL_DROP_ERROR;
+    }
+
+    public String getDDLTableNotLoad(String table) {
+        return String.format(Locale.ROOT, DDL_TABLE_NOT_LOADED, table);
+    }
+
+    public String getDDLTableNotSupport(String table) {
+        return String.format(Locale.ROOT, DDL_TABLE_NOT_SUPPORT, table);
+    }
+
+    public String getDDLPermissionDenied() {
+        return DDL_PERMISSION_DENIED;
+    }
+
+    public String getDDLDatabaseAccessnDenied() {
+        return DDL_DATABASE_ACCESSN_DENIED;
+    }
 }
diff --git a/src/core-common/src/main/resources/kylin_errorcode_conf_en.properties b/src/core-common/src/main/resources/kylin_errorcode_conf_en.properties
index 47ebee5f8b..ad67f4540f 100644
--- a/src/core-common/src/main/resources/kylin_errorcode_conf_en.properties
+++ b/src/core-common/src/main/resources/kylin_errorcode_conf_en.properties
@@ -273,3 +273,5 @@ KE-010039001=Invalid Connection Info
 
 # cache
 KE-010040001=Clear Redis Cache Failure
+# SQL DDL
+KE-010050001=DDL Operation Failure
\ No newline at end of file
diff --git a/src/core-common/src/main/resources/kylin_errorcode_conf_zh.properties b/src/core-common/src/main/resources/kylin_errorcode_conf_zh.properties
index 0e43ba0c5a..ef6f47a33e 100644
--- a/src/core-common/src/main/resources/kylin_errorcode_conf_zh.properties
+++ b/src/core-common/src/main/resources/kylin_errorcode_conf_zh.properties
@@ -272,3 +272,5 @@ KE-010039001=连接信息有误
 
 # cache
 KE-010040001=清空Redis缓存失败
+# SQL DDL
+KE-010050001=DDL操作失败
\ No newline at end of file
diff --git a/src/datasource-service/pom.xml b/src/datasource-service/pom.xml
index ec0d9cc85f..69e2d8427b 100644
--- a/src/datasource-service/pom.xml
+++ b/src/datasource-service/pom.xml
@@ -131,5 +131,16 @@
             <scope>test</scope>
         </dependency>
     </dependencies>
-
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-compiler-plugin</artifactId>
+            </plugin>
+            <plugin>
+                <groupId>net.alchim31.maven</groupId>
+                <artifactId>scala-maven-plugin</artifactId>
+            </plugin>
+        </plugins>
+    </build>
 </project>
diff --git a/src/datasource-service/src/main/java/org/apache/kylin/rest/request/ViewDDLRequest.java b/src/datasource-service/src/main/java/org/apache/kylin/rest/request/ViewDDLRequest.java
new file mode 100644
index 0000000000..37c29aa835
--- /dev/null
+++ b/src/datasource-service/src/main/java/org/apache/kylin/rest/request/ViewDDLRequest.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.rest.request;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import lombok.Data;
+
+@Data
+public class ViewDDLRequest {
+  @JsonProperty("sql")
+  private String sql;
+  @JsonProperty("project")
+  private String project;
+}
diff --git a/src/datasource-service/src/main/java/org/apache/kylin/rest/service/SparkDDLService.java b/src/datasource-service/src/main/java/org/apache/kylin/rest/service/SparkDDLService.java
new file mode 100644
index 0000000000..7bce47ad22
--- /dev/null
+++ b/src/datasource-service/src/main/java/org/apache/kylin/rest/service/SparkDDLService.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kylin.rest.service;
+
+import static org.apache.kylin.common.exception.ServerErrorCode.DDL_CHECK_ERROR;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.ServiceLoader;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.exception.KylinException;
+import org.apache.kylin.rest.util.AclPermissionUtil;
+import org.apache.kylin.spark.ddl.DDLCheck;
+import org.apache.kylin.spark.ddl.DDLCheckContext;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparderEnv;
+import org.springframework.stereotype.Service;
+
+import com.google.common.collect.Lists;
+
+import lombok.val;
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+@Service
+public class SparkDDLService extends BasicService {
+
+  private final ServiceLoader<DDLCheck> ddlChecks = ServiceLoader.load(DDLCheck.class);
+
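+  // Rejects the statement unless kylin.source.ddl.enabled is set, runs every
+  // DDLCheck discovered via the ServiceLoader SPI, then executes the SQL on
+  // the shared SparkSession and returns the first column of each result row.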
+  public String executeDDLSql(String project, String sql) {
+    if (!KylinConfig.getInstanceFromEnv().getDDLEnabled()) {
+      throw new KylinException(DDL_CHECK_ERROR, "DDL function has not been turned on.");
+    }
+    val groups = getCurrentUserGroups();
+    val context = new DDLCheckContext(sql, project, AclPermissionUtil.getCurrentUsername(),
+        groups);
+    for (DDLCheck checker : ddlChecks) {
+      checker.check(context);
+    }
+    final StringBuilder result = new StringBuilder();
+    List<Row> rows = SparderEnv.getSparkSession().sql(sql).collectAsList();
+    rows.forEach(row -> result.append(row.get(0)).append('\n'));
+    return result.toString();
+  }
+
+  public List<List<String>> pluginsDescription(String project) {
+    if (!KylinConfig.getInstanceFromEnv().getDDLEnabled()) {
+      throw new KylinException(DDL_CHECK_ERROR, "DDL function has not been turned on.");
+    }
+    List<String> descriptionEN = Lists.newArrayList();
+    List<String> descriptionCN = Lists.newArrayList();
+    for (DDLCheck checker : ddlChecks) {
+      String[] description = checker.description(project);
+      descriptionEN.addAll(Arrays.asList(description[0].split("\n")));
+      descriptionCN.addAll(Arrays.asList(description[1].split("\n")));
+    }
+    return Lists.newArrayList(descriptionEN, descriptionCN);
+  }
+}
\ No newline at end of file
diff --git a/src/datasource-service/src/test/java/org/apache/kylin/rest/service/SparkDDLTest.java b/src/datasource-service/src/test/java/org/apache/kylin/rest/service/SparkDDLTest.java
new file mode 100644
index 0000000000..df4af60863
--- /dev/null
+++ b/src/datasource-service/src/test/java/org/apache/kylin/rest/service/SparkDDLTest.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kylin.rest.service;
+
+import java.util.List;
+
+import org.apache.kylin.common.msg.MsgPicker;
+import org.apache.kylin.common.util.NLocalFileMetadataTestCase;
+import org.apache.kylin.metadata.model.NTableMetadataManager;
+import org.apache.kylin.rest.constant.Constant;
+import org.apache.spark.sql.SparderEnv;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.common.SparkDDLTestUtils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.security.authentication.TestingAuthenticationToken;
+import org.springframework.security.core.Authentication;
+import org.springframework.security.core.context.SecurityContextHolder;
+import org.springframework.test.util.ReflectionTestUtils;
+
+public class SparkDDLTest extends NLocalFileMetadataTestCase {
+  @Autowired
+  private final SparkDDLService ddlService = Mockito.spy(new SparkDDLService());
+  @Autowired
+  private final IUserGroupService userGroupService = Mockito.spy(NUserGroupService.class);
+
+  private static final String CREATEVIEW_SQL1 =
+      "CREATE VIEW `ssb`.`ke_order_view` as select LO_ORDERKEY, C_NAME from SSB.P_LINEORDER t1 left join "
+          + "SSB. CUSTOMER t2 on t1. LO_CUSTKEY = t2. C_CUSTKEY";
+  private static final String CREATEVIEW_SQL2 = "CREATE VIEW `ssb`.`order_view2` as select * from SSB.P_LINEORDER";
+  private static final String CREATEVIEW_SQL3 = "CREATE VIEW `ssb`.`order_view2` as abc";
+  private static final String CREATEVIEW_SQL4 = "CREATE VIEW `ssb`.`order_view2` as select * from SSB.unload_table";
+  private static final String CREATEVIEW_SQL5 = "CREATE VIEW `ke_order_view2` as select * from SSB.P_LINEORDER";
+  private static final String CREATEVIEW_SQL6 = "abc";
+  private static final String ALTERVIEW_SQL =
+      "alter view `ssb`.`ke_order_view` as select lo_orderkey from SSB.P_LINEORDER";
+  private static final String DROPVIEW_SQL1 = "drop view `ssb`.`ke_order_view`";
+  private static final String DROPVIEW_SQL2 = "drop table `ssb`.`ke_table1`";
+  private static final String DROPVIEW_SQL3 = "drop table `ssb`.`ke_order_view`";
+  private static final String DROPVIEW_SQL4 = "drop table `ke_table2`";
+  private static final String SHOWVIEW_SQL = "show create table ssb.ke_order_view";
+
+  @AfterClass
+  public static void tearDownResource() {
+    staticCleanupTestMetadata();
+  }
+
+  @Before
+  public void setup() {
+    createTestMetadata();
+    Authentication authentication = new TestingAuthenticationToken("ADMIN", "ADMIN", Constant.ROLE_ADMIN);
+    SecurityContextHolder.getContext().setAuthentication(authentication);
+    ReflectionTestUtils.setField(ddlService, "userGroupService", userGroupService);
+  }
+
+  @After
+  public void cleanup() {
+    cleanupTestMetadata();
+  }
+
+  @Test
+  public void testDDL() {
+    try {
+      assertKylinExeption(
+          () ->
+              ddlService.executeDDLSql("ssb", CREATEVIEW_SQL5),
+          "DDL function has not been turned on.");
+
+      getTestConfig().setProperty("kylin.source.ddl.enabled", "true");
+      NTableMetadataManager tableManager = NTableMetadataManager.getInstance(getTestConfig(), "ssb");
+      SparkDDLTestUtils.prepare();
+      ddlService.executeDDLSql("ssb", CREATEVIEW_SQL5);
+      assertKylinExeption(
+          () ->
+              ddlService.executeDDLSql("ssb", CREATEVIEW_SQL2),
+          MsgPicker.getMsg().getDDLViewNameError());
+      assertKylinExeption(() ->
+          ddlService.executeDDLSql("ssb", CREATEVIEW_SQL3), "");
+      assertKylinExeption(() ->
+          ddlService.executeDDLSql("ssb", CREATEVIEW_SQL6), "");
+      assertKylinExeption(
+          () ->
+              ddlService.executeDDLSql("ssb", CREATEVIEW_SQL4),
+          MsgPicker.getMsg().getDDLTableNotLoad("SSB.unload_table"));
+      assertKylinExeption(
+          () ->
+              ddlService.executeDDLSql("ssb", DROPVIEW_SQL2),
+          MsgPicker.getMsg().getDDLDropError());
+      assertKylinExeption(
+          () ->
+              ddlService.executeDDLSql("ssb", DROPVIEW_SQL3), "");
+
+      ddlService.executeDDLSql("ssb", CREATEVIEW_SQL1);
+      ddlService.executeDDLSql("ssb", ALTERVIEW_SQL);
+      String createViewSQL = ddlService.executeDDLSql("ssb", SHOWVIEW_SQL);
+      Assert.assertTrue(createViewSQL.contains("ke_order_view"));
+      ddlService.executeDDLSql("ssb", DROPVIEW_SQL1);
+
+      Authentication authentication = new TestingAuthenticationToken("USER1",
+          "", Constant.GROUP_ALL_USERS);
+      SecurityContextHolder.getContext().setAuthentication(authentication);
+      assertKylinExeption(
+          () ->
+              ddlService.executeDDLSql("ssb", CREATEVIEW_SQL1),
+          MsgPicker.getMsg().getDDLPermissionDenied());
+
+      // ddl description
+      List<List<String>> description = ddlService.pluginsDescription("ssb");
+      Assert.assertTrue(description.size() > 0);
+    } finally {
+      SparkSession spark = SparderEnv.getSparkSession();
+      if (spark != null && !spark.sparkContext().isStopped()) {
+        spark.stop();
+      }
+    }
+  }
+}
diff --git a/src/datasource-service/src/test/scala/org/apache/spark/sql/common/SparkDDLTestUtils.scala b/src/datasource-service/src/test/scala/org/apache/spark/sql/common/SparkDDLTestUtils.scala
new file mode 100644
index 0000000000..22af162c2a
--- /dev/null
+++ b/src/datasource-service/src/test/scala/org/apache/spark/sql/common/SparkDDLTestUtils.scala
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.common
+
+import java.io.File
+
+import org.apache.commons.io.FileUtils
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars
+import org.apache.kylin.common.util.RandomUtil
+
+import org.apache.spark.sql.{SparderEnv, SparkSession}
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.internal.StaticSQLConf
+import org.apache.spark.util.Utils
+
+object SparkDDLTestUtils {
+
+  def prepare(): Unit = {
+    val conf = new SparkConf(false)
+    val warehouse = s"./spark-warehouse/${RandomUtil.randomUUIDStr()}"
+    val file = new File(warehouse)
+    if (file.exists()) {
+      FileUtils.deleteDirectory(file)
+    }
+    conf.set(StaticSQLConf.WAREHOUSE_PATH.key, warehouse)
+    // Copied from TestHive
+    // HDFS root scratch dir requires the write all (733) permission. For each connecting user,
+    // an HDFS scratch dir: ${hive.exec.scratchdir}/<username> is created, with
+    // ${hive.scratch.dir.permission}. To resolve the permission issue, the simplest way is to
+    // delete it. Later, it will be re-created with the right permission.
+    val scratchDir = Utils.createTempDir()
+    if (scratchDir.exists()) {
+      FileUtils.deleteDirectory(scratchDir)
+    }
+    conf.set(ConfVars.SCRATCHDIR.varname, scratchDir.toString)
+    conf.set("spark.hadoop.javax.jdo.option.ConnectionURL", "jdbc:derby:memory:db;create=true")
+    val sparkSession = SparkSession.builder
+      .master("local[2]")
+      .appName(getClass.getSimpleName)
+      .config("fs.file.impl", classOf[DebugFilesystem].getCanonicalName)
+      .config(conf)
+      .enableHiveSupport()
+      .getOrCreate
+    SparderEnv.setSparkSession(sparkSession)
+    prepareTable()
+  }
+
+  def prepareTable(): Unit = {
+    val spark = SparderEnv.getSparkSession
+    spark.sql("CREATE DATABASE if not exists SSB")
+    spark.sql("drop table if exists `ssb`.`lineorder`")
+    spark.sql(
+      s"""
+         |CREATE TABLE if not exists `ssb`.`lineorder`(
+         |  `lo_orderkey` bigint,
+         |  `lo_linenumber` bigint,
+         |  `lo_custkey` int,
+         |  `lo_partkey` int,
+         |  `lo_suppkey` int,
+         |  `lo_orderdate` int,
+         |  `lo_orderpriotity` string,
+         |  `lo_shippriotity` int,
+         |  `lo_quantity` bigint,
+         |  `lo_extendedprice` bigint,
+         |  `lo_ordtotalprice` bigint,
+         |  `lo_discount` bigint,
+         |  `lo_revenue` bigint,
+         |  `lo_supplycost` bigint,
+         |  `lo_tax` bigint,
+         |  `lo_commitdate` int,
+         |  `lo_shipmode` string)""".stripMargin)
+    spark.sql("drop view if exists `ssb`.p_lineorder")
+    spark.sql(
+      s"""
+         |CREATE VIEW if not exists `ssb`.`p_lineorder` AS SELECT
+         |  `lineorder`.`lo_orderkey`,
+         |  `lineorder`.`lo_linenumber`,
+         |  `lineorder`.`lo_custkey`,
+         |  `lineorder`.`lo_partkey`,
+         |  `lineorder`.`lo_suppkey`,
+         |  `lineorder`.`lo_orderdate`,
+         |  `lineorder`.`lo_orderpriotity`,
+         |  `lineorder`.`lo_shippriotity`,
+         |  `lineorder`.`lo_quantity`,
+         |  `lineorder`.`lo_extendedprice`,
+         |  `lineorder`.`lo_ordtotalprice`,
+         |  `lineorder`.`lo_discount`,
+         |  `lineorder`.`lo_revenue`,
+         |  `lineorder`.`lo_supplycost`,
+         |  `lineorder`.`lo_tax`,
+         |  `lineorder`.`lo_commitdate`,
+         |  `lineorder`.`lo_shipmode`,
+         |  `lineorder`.`lo_extendedprice`*`lineorder`.`lo_discount` AS `V_REVENUE`
+         |FROM `ssb`.`LINEORDER`
+       """.stripMargin)
+    spark.sql("drop table if exists `ssb`.`customer`")
+    spark.sql(
+      s"""
+         |CREATE TABLE if not exists `ssb`.`customer`(
+         |  `c_custkey` int,
+         |  `c_name` string,
+         |  `c_address` string,
+         |  `c_city` string,
+         |  `c_nation` string,
+         |  `c_region` string,
+         |  `c_phone` string,
+         |  `c_mktsegment` string)
+       """.stripMargin)
+    spark.sql(
+      s"""
+         |CREATE TABLE if not exists `ssb`.`unload_table`(
+         |  `c1` int,
+         |  `c2` string)
+       """.stripMargin)
+    spark.sql(
+      s"""
+         |CREATE TABLE if not exists `ssb`.`ke_table1`(
+         |  `c1` int,
+         |  `c2` string)
+       """.stripMargin)
+  }
+}
diff --git a/src/metadata-server/src/main/java/org/apache/kylin/rest/controller/SparkDDLController.java b/src/metadata-server/src/main/java/org/apache/kylin/rest/controller/SparkDDLController.java
new file mode 100644
index 0000000000..38c2e9d003
--- /dev/null
+++ b/src/metadata-server/src/main/java/org/apache/kylin/rest/controller/SparkDDLController.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kylin.rest.controller;
+
+import static org.apache.kylin.common.constant.HttpConstant.HTTP_VND_APACHE_KYLIN_JSON;
+import static org.apache.kylin.common.constant.HttpConstant.HTTP_VND_APACHE_KYLIN_V4_PUBLIC_JSON;
+
+import java.util.List;
+
+import org.apache.kylin.common.exception.KylinException;
+import org.apache.kylin.rest.request.ViewDDLRequest;
+import org.apache.kylin.rest.response.EnvelopeResponse;
+import org.apache.kylin.rest.service.SparkDDLService;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.RequestBody;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RequestParam;
+import org.springframework.web.bind.annotation.ResponseBody;
+import org.springframework.web.bind.annotation.RestController;
+
+import io.swagger.annotations.ApiOperation;
+import lombok.extern.slf4j.Slf4j;
+
+@RestController
+@RequestMapping(value = "/api/spark_source", produces = {HTTP_VND_APACHE_KYLIN_V4_PUBLIC_JSON,
+                                                         HTTP_VND_APACHE_KYLIN_JSON})
+@Slf4j
+public class SparkDDLController extends NBasicController {
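+  // POST /api/spark_source/ddl executes a checked DDL statement;
+  // GET /api/spark_source/ddl/description returns each check plugin's rules (EN and CN).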
+
+  @Autowired
+  private SparkDDLService sparkDDLService;
+
+  @ApiOperation(value = "ddl")
+  @PostMapping(value = "/ddl")
+  @ResponseBody
+  public EnvelopeResponse<String> executeSQL(@RequestBody ViewDDLRequest request)
+      throws Exception {
+    checkProjectName(request.getProject());
+    String result = sparkDDLService.executeDDLSql(request.getProject(), request.getSql());
+    return new EnvelopeResponse<>(KylinException.CODE_SUCCESS, result, "");
+  }
+
+  @ApiOperation(value = "ddl_description")
+  @GetMapping(value = "/ddl/description")
+  @ResponseBody
+  public EnvelopeResponse<List<List<String>>> description(@RequestParam("project") String project) {
+    checkProjectName(project);
+    return new EnvelopeResponse<>(KylinException.CODE_SUCCESS,
+        sparkDDLService.pluginsDescription(project), "");
+  }
+}
diff --git a/src/metadata-server/src/test/java/org/apache/kylin/rest/controller/SparkDDLControllerTest.java b/src/metadata-server/src/test/java/org/apache/kylin/rest/controller/SparkDDLControllerTest.java
new file mode 100644
index 0000000000..f7d9938989
--- /dev/null
+++ b/src/metadata-server/src/test/java/org/apache/kylin/rest/controller/SparkDDLControllerTest.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.rest.controller;
+
+import static org.apache.kylin.common.constant.HttpConstant.HTTP_VND_APACHE_KYLIN_V4_PUBLIC_JSON;
+
+import org.apache.kylin.common.util.JsonUtil;
+import org.apache.kylin.common.util.NLocalFileMetadataTestCase;
+import org.apache.kylin.rest.constant.Constant;
+import org.apache.kylin.rest.request.ViewDDLRequest;
+import org.apache.kylin.rest.service.SparkDDLService;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+import org.springframework.http.MediaType;
+import org.springframework.security.authentication.TestingAuthenticationToken;
+import org.springframework.security.core.Authentication;
+import org.springframework.security.core.context.SecurityContextHolder;
+import org.springframework.test.web.servlet.MockMvc;
+import org.springframework.test.web.servlet.request.MockMvcRequestBuilders;
+import org.springframework.test.web.servlet.result.MockMvcResultMatchers;
+import org.springframework.test.web.servlet.setup.MockMvcBuilders;
+
+public class SparkDDLControllerTest extends NLocalFileMetadataTestCase {
+  private MockMvc mockMvc;
+
+  @Mock
+  private SparkDDLService sparkDDLService;
+
+  @InjectMocks
+  private SparkDDLController ddlController = Mockito.spy(new SparkDDLController());
+
+  private final Authentication authentication = new TestingAuthenticationToken("ADMIN", "ADMIN", Constant.ROLE_ADMIN);
+
+  @Before
+  public void setup() {
+    MockitoAnnotations.initMocks(this);
+    mockMvc = MockMvcBuilders.standaloneSetup(ddlController)
+        .defaultRequest(MockMvcRequestBuilders.get("/")).build();
+    SecurityContextHolder.getContext().setAuthentication(authentication);
+    overwriteSystemProp("HADOOP_USER_NAME", "root");
+    createTestMetadata();
+  }
+
+  @After
+  public void tearDown() {
+    cleanupTestMetadata();
+  }
+
+  @Test
+  public void testExecuteSQL() throws Exception {
+    ViewDDLRequest request = new ViewDDLRequest();
+    request.setProject("ssb");
+
+    mockMvc.perform(MockMvcRequestBuilders.post("/api/spark_source/ddl")
+        .contentType(MediaType.APPLICATION_JSON)
+        .content(JsonUtil.writeValueAsString(request))
+        .accept(MediaType.parseMediaType(HTTP_VND_APACHE_KYLIN_V4_PUBLIC_JSON)))
+        .andExpect(MockMvcResultMatchers.status().isOk()).andReturn();
+  }
+
+  @Test
+  public void testDescription() throws Exception {
+    mockMvc.perform(MockMvcRequestBuilders.get("/api/spark_source/ddl/description?project=ssb")
+        .contentType(MediaType.APPLICATION_JSON)
+        .accept(MediaType.parseMediaType(HTTP_VND_APACHE_KYLIN_V4_PUBLIC_JSON)))
+        .andExpect(MockMvcResultMatchers.status().isOk()).andReturn();
+  }
+}
\ No newline at end of file
diff --git a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkMetadataExplorer.java b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkMetadataExplorer.java
index fdc55553ad..b99b9727f7 100644
--- a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkMetadataExplorer.java
+++ b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkMetadataExplorer.java
@@ -28,8 +28,10 @@ import java.util.Set;
 import java.util.stream.Collectors;
 
 import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.kylin.common.KapConfig;
 import org.apache.kylin.common.KylinConfig;
@@ -165,6 +167,41 @@ public class NSparkMetadataExplorer implements ISourceMetadataExplorer, ISampleD
         return isAccess;
     }
 
+    public boolean checkDatabaseHadoopAccessFast(String database) throws Exception {
+        boolean isAccess = true;
+        val spark = SparderEnv.getSparkSession();
+        try {
+            String databaseLocation = spark.catalog().getDatabase(database).locationUri();
+            RemoteIterator<FileStatus> tablesIterator = getFilesIterator(databaseLocation);
+            if (tablesIterator.hasNext()) {
+                Path tablePath = tablesIterator.next().getPath();
+                getFilesIterator(tablePath.toString());
+            }
+        } catch (Exception e) {
+            isAccess = false;
+            try {
+                logger.error("Read hive database {} error:{}, ugi name: {}.", database, e.getMessage(),
+                    UserGroupInformation.getCurrentUser().getUserName());
+            } catch (IOException ex) {
+                logger.error("fetch user curr ugi info error.", e);
+            }
+        }
+        return isAccess;
+    }
+
+    private RemoteIterator<FileStatus> getFilesIterator(String location) throws IOException {
+        String hiveSpecFsLocation = SparderEnv.getSparkSession().sessionState().conf()
+            .getConf(SQLConf.HIVE_SPECIFIC_FS_LOCATION());
+        FileSystem fs = null == hiveSpecFsLocation ? HadoopUtil.getWorkingFileSystem()
+            : HadoopUtil.getFileSystem(hiveSpecFsLocation);
+        if (location.startsWith(fs.getScheme()) || location.startsWith("/")) {
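+            // List eagerly first so an access failure surfaces here instead of
+            // when the returned iterator is consumed.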
+            fs.listStatus(new Path(location));
+            return fs.listStatusIterator(new Path(location));
+        } else {
+            return HadoopUtil.getFileSystem(location).listStatusIterator(new Path(location));
+        }
+    }
+
     @Override
     public Pair<TableDesc, TableExtDesc> loadTableMetadata(final String database, String tableName, String prj)
             throws Exception {
diff --git a/src/spark-project/spark-ddl-plugin/pom.xml b/src/spark-project/spark-ddl-plugin/pom.xml
new file mode 100644
index 0000000000..9df7219174
--- /dev/null
+++ b/src/spark-project/spark-ddl-plugin/pom.xml
@@ -0,0 +1,58 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>kylin</artifactId>
+        <groupId>org.apache.kylin</groupId>
+        <version>5.0.0-SNAPSHOT</version>
+        <relativePath>../../../pom.xml</relativePath>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>kylin-spark-ddl</artifactId>
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.kylin</groupId>
+            <artifactId>kylin-systools</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.kylin</groupId>
+            <artifactId>kylin-core-metadata</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.kylin</groupId>
+            <artifactId>kylin-sparder</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-sql_2.12</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.junit.vintage</groupId>
+            <artifactId>junit-vintage-engine</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.kylin</groupId>
+            <artifactId>kylin-engine-spark</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-common</artifactId>
+        </dependency>
+
+    </dependencies>
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-compiler-plugin</artifactId>
+            </plugin>
+            <plugin>
+                <groupId>net.alchim31.maven</groupId>
+                <artifactId>scala-maven-plugin</artifactId>
+            </plugin>
+        </plugins>
+    </build>
+</project>
\ No newline at end of file
diff --git a/src/spark-project/spark-ddl-plugin/src/main/java/org/apache/kylin/spark/ddl/DDLCheck.java b/src/spark-project/spark-ddl-plugin/src/main/java/org/apache/kylin/spark/ddl/DDLCheck.java
new file mode 100644
index 0000000000..10e3e3439c
--- /dev/null
+++ b/src/spark-project/spark-ddl-plugin/src/main/java/org/apache/kylin/spark/ddl/DDLCheck.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kylin.spark.ddl;
+
+import static org.apache.kylin.common.exception.ServerErrorCode.DDL_CHECK_ERROR;
+
+import org.apache.kylin.common.exception.KylinException;
+
+public interface DDLCheck {
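+  // Implementations are discovered through the JDK ServiceLoader; they are
+  // registered in META-INF/services/org.apache.kylin.spark.ddl.DDLCheck.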
+
+  default String[] description(String project) {
+    return new String[] {"", ""};
+  }
+
+  void check(DDLCheckContext context);
+
+  default void throwException(String msg) {
+    throw new KylinException(DDL_CHECK_ERROR, msg);
+  }
+}
diff --git a/src/spark-project/spark-ddl-plugin/src/main/java/org/apache/kylin/spark/ddl/DDLCheckContext.java b/src/spark-project/spark-ddl-plugin/src/main/java/org/apache/kylin/spark/ddl/DDLCheckContext.java
new file mode 100644
index 0000000000..e5e6d1819f
--- /dev/null
+++ b/src/spark-project/spark-ddl-plugin/src/main/java/org/apache/kylin/spark/ddl/DDLCheckContext.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kylin.spark.ddl;
+
+import java.util.Set;
+
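+/**
+ * Carries the DDL statement plus the caller's identity (user name and groups)
+ * through every DDLCheck.
+ */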
+public class DDLCheckContext {
+  private String sql;
+  private String project;
+  private String userName;
+  private Set<String> groups;
+
+  public DDLCheckContext(String sql, String project, String userName, Set<String> groups) {
+    this.sql = sql;
+    this.project = project;
+    this.userName = userName;
+    this.groups = groups;
+  }
+
+  public String getSql() {
+    return sql;
+  }
+
+  public String getProject() {
+    return project;
+  }
+
+  public String getUserName() {
+    return userName;
+  }
+
+  public Set<String> getGroups() {
+    return groups;
+  }
+}
diff --git a/src/spark-project/spark-ddl-plugin/src/main/java/org/apache/kylin/spark/ddl/SourceTableCheck.java b/src/spark-project/spark-ddl-plugin/src/main/java/org/apache/kylin/spark/ddl/SourceTableCheck.java
new file mode 100644
index 0000000000..c3c54623e5
--- /dev/null
+++ b/src/spark-project/spark-ddl-plugin/src/main/java/org/apache/kylin/spark/ddl/SourceTableCheck.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kylin.spark.ddl;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.msg.MsgPicker;
+import org.apache.kylin.metadata.model.ISourceAware;
+import org.apache.kylin.metadata.model.NTableMetadataManager;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.rest.util.AclPermissionUtil;
+import org.apache.spark.sql.SparderEnv;
+import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation;
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
+
+import lombok.val;
+import scala.collection.Seq;
+
+public class SourceTableCheck implements DDLCheck {
+
+  @Override
+  public String[] description(String project) {
+    return new String[] {
+        "The source table used to define the view needs to be loaded into the data source already",
+        "定义 view 用到的来源表需要已经加载到数据源"
+    };
+  }
+
+  @Override
+  public void check(DDLCheckContext context) {
+    val spark = SparderEnv.getSparkSession();
+    LogicalPlan logicalPlan = null;
+    try {
+      logicalPlan = spark.sessionState().sqlParser().parsePlan(context.getSql());
+    } catch (Throwable t) {
+      throwException(t.getMessage());
+    }
+    val tableManager = NTableMetadataManager.getInstance(
+        KylinConfig.getInstanceFromEnv(),
+        context.getProject());
+    if (!AclPermissionUtil.hasProjectAdminPermission(context.getProject(), context.getGroups())) {
+      throwException(MsgPicker.getMsg().getDDLPermissionDenied());
+    }
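+    // Every UnresolvedRelation leaf of the parsed plan is a source table the
+    // view definition references; each must already be loaded from a Hive or
+    // Spark source.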
+    Seq<LogicalPlan> relationLeaves = logicalPlan.collectLeaves();
+    if (relationLeaves == null) {
+      return;
+    }
+    for (LogicalPlan plan : scala.collection.JavaConverters.seqAsJavaListConverter(relationLeaves).asJava()) {
+      if (plan instanceof UnresolvedRelation) {
+        val tableName = ((UnresolvedRelation) plan).tableName();
+        TableDesc tableDesc = tableManager.getTableDesc(tableName);
+        if (tableDesc == null) {
+          throwException(MsgPicker.getMsg().getDDLTableNotLoad(tableName));
+        }
+        if (ISourceAware.ID_HIVE != tableDesc.getSourceType()
+            && ISourceAware.ID_SPARK != tableDesc.getSourceType()) {
+          throwException(MsgPicker.getMsg().getDDLTableNotSupport(tableName));
+        }
+      }
+    }
+  }
+}
diff --git a/src/spark-project/spark-ddl-plugin/src/main/resources/META-INF/services/org.apache.kylin.spark.ddl.DDLCheck b/src/spark-project/spark-ddl-plugin/src/main/resources/META-INF/services/org.apache.kylin.spark.ddl.DDLCheck
new file mode 100644
index 0000000000..4f3a7527a8
--- /dev/null
+++ b/src/spark-project/spark-ddl-plugin/src/main/resources/META-INF/services/org.apache.kylin.spark.ddl.DDLCheck
@@ -0,0 +1,2 @@
+org.apache.kylin.spark.ddl.SourceTableCheck
+org.apache.kylin.spark.ddl.ViewCheck
\ No newline at end of file
diff --git a/src/spark-project/spark-ddl-plugin/src/main/scala/org/apache/kylin/spark/ddl/ViewCheck.scala b/src/spark-project/spark-ddl-plugin/src/main/scala/org/apache/kylin/spark/ddl/ViewCheck.scala
new file mode 100644
index 0000000000..4fc451bdf2
--- /dev/null
+++ b/src/spark-project/spark-ddl-plugin/src/main/scala/org/apache/kylin/spark/ddl/ViewCheck.scala
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kylin.spark.ddl
+
+import java.security.PrivilegedExceptionAction
+
+import scala.collection.mutable.ListBuffer
+
+import org.apache.kylin.common.msg.MsgPicker
+import org.apache.kylin.engine.spark.source.NSparkMetadataExplorer
+import org.apache.kylin.rest.security.KerberosLoginManager
+import org.slf4j.LoggerFactory
+
+import org.apache.spark.sql.SparderEnv
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.execution.{CommandExecutionMode, CommandResultExec, SparkPlan}
+import org.apache.spark.sql.execution.command._
+
+class ViewCheck extends DDLCheck {
+  private val log = LoggerFactory.getLogger(classOf[ViewCheck])
+  private val PREFIX = "KE_"
+  private val source = new NSparkMetadataExplorer
+
+  override def description(project: String): Array[String] = {
+    val databasesHasAccess = listAllDatabasesHasAccess(project)
+    Array(
+      "View name should start with `KE_`\n"
+        + "Only support `create view`,`alter view`,`drop view`,`show create table` syntax\n"
+        + s"Only supports creating views in ${databasesHasAccess}",
+      "View 名称需要以`KE_`开头\n"
+        + "仅支持 `create view`, `drop view`, `alter view`, `show create table` 语法\n"
+        + s"仅支持在 ${databasesHasAccess} 上述 database 中创建 view")
+  }
+
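+  // Parses the SQL, plans it with CommandExecutionMode.SKIP (no execution), and
+  // pattern-matches the command node so only the four supported statements pass.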
+  override def check(context: DDLCheckContext): Unit = {
+    log.info("start checking DDL view name")
+    val sql = context.getSql
+    val project = context.getProject
+    val spark = SparderEnv.getSparkSession
+    var plan: SparkPlan = null
+    try {
+      val logicalPlan = spark.sessionState.sqlParser.parsePlan(sql)
+      plan = stripRootCommandResult(spark.sessionState.executePlan(
+        logicalPlan, CommandExecutionMode.SKIP).executedPlan)
+    } catch {
+      case e: Exception => throwException(e.getMessage)
+    }
+    plan match {
+      case ExecutedCommandExec(view: CreateViewCommand) =>
+        checkTableName(view.name)
+        checkAccess(view.name, project)
+      case ExecutedCommandExec(view: ShowCreateTableCommand) =>
+        checkTableName(view.table)
+        checkAccess(view.table, project)
+      case ExecutedCommandExec(table: DropTableCommand) =>
+        checkTableName(table.tableName)
+        checkAccess(table.tableName, project)
+        if (!table.isView) {
+          throwException(MsgPicker.getMsg.getDDLDropError)
+        }
+      case ExecutedCommandExec(table: AlterViewAsCommand) =>
+        checkTableName(table.name)
+        checkAccess(table.name, project)
+      case _ => throwException(MsgPicker.getMsg.getDDLUnSupported)
+    }
+  }
+
+  private def checkTableName(identifier: TableIdentifier): Unit = {
+    if (!identifier.table.toUpperCase().startsWith(PREFIX)) {
+      throwException(MsgPicker.getMsg.getDDLViewNameError)
+    }
+  }
+
+  def checkAccess(identifier: TableIdentifier, project: String): Unit = {
+    val database = identifier.database.get
+    val ugi = KerberosLoginManager.getInstance.getProjectUGI(project)
+    val hasDatabaseAccess = ugi.doAs(new PrivilegedExceptionAction[Boolean]() {
+      override def run(): Boolean = {
+        source.checkDatabaseHadoopAccessFast(database)
+      }
+    })
+    if (!hasDatabaseAccess) {
+      throwException(MsgPicker.getMsg.getDDLDatabaseAccessnDenied)
+    }
+  }
+
+  def listAllDatabasesHasAccess(project: String): String = {
+    val ugi = KerberosLoginManager.getInstance.getProjectUGI(project)
+    val databasesHasAccess = ugi.doAs(new PrivilegedExceptionAction[List[String]]() {
+      override def run(): List[String] = {
+        val databases = source.listDatabases()
+        val databasesHasAccess = ListBuffer[String]()
+        databases.forEach(db => {
+          if (source.checkDatabaseHadoopAccessFast(db)) {
+            databasesHasAccess.append(db)
+          }
+        })
+        databasesHasAccess.toList
+      }
+    })
+    databasesHasAccess.mkString(",")
+  }
+
+  private def stripRootCommandResult(executedPlan: SparkPlan) = executedPlan match {
+    case CommandResultExec(_, plan, _) => plan
+    case other => other
+  }
+}