You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by xx...@apache.org on 2022/12/28 08:06:41 UTC
[kylin] 01/02: KYLIN-5374 support DDL on view
This is an automated email from the ASF dual-hosted git repository.
xxyu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git
commit db8cb0e9022911c6a907a584b434c9102b51c265
Author: ChenLiang <31...@users.noreply.github.com>
AuthorDate: Sat Oct 29 19:40:21 2022 +0800
KYLIN-5374 support DDL on view
KYLIN-5374 support DDL on view
---
pom.xml | 5 +
.../kylin/rest/controller/NAdminController.java | 1 +
src/common-service/pom.xml | 4 +
.../org/apache/kylin/common/KylinConfigBase.java | 4 +
.../kylin/common/exception/ServerErrorCode.java | 5 +-
.../org/apache/kylin/common/msg/CnMessage.java | 35 ++++++
.../java/org/apache/kylin/common/msg/Message.java | 37 ++++++
.../resources/kylin_errorcode_conf_en.properties | 2 +
.../resources/kylin_errorcode_conf_zh.properties | 2 +
src/datasource-service/pom.xml | 13 +-
.../apache/kylin/rest/request/ViewDDLRequest.java | 31 +++++
.../apache/kylin/rest/service/SparkDDLService.java | 75 +++++++++++
.../apache/kylin/rest/service/SparkDDLTest.java | 137 +++++++++++++++++++++
.../spark/sql/common/SparkDDLTestUtils.scala | 137 +++++++++++++++++++++
.../kylin/rest/controller/SparkDDLController.java | 68 ++++++++++
.../rest/controller/SparkDDLControllerTest.java | 89 +++++++++++++
.../spark/source/NSparkMetadataExplorer.java | 37 ++++++
src/spark-project/spark-ddl-plugin/pom.xml | 58 +++++++++
.../java/org/apache/kylin/spark/ddl/DDLCheck.java | 35 ++++++
.../apache/kylin/spark/ddl/DDLCheckContext.java | 50 ++++++++
.../apache/kylin/spark/ddl/SourceTableCheck.java | 76 ++++++++++++
.../services/org.apache.kylin.spark.ddl.DDLCheck | 2 +
.../org/apache/kylin/spark/ddl/ViewCheck.scala | 123 ++++++++++++++++++
23 files changed, 1024 insertions(+), 2 deletions(-)
diff --git a/pom.xml b/pom.xml
index 5c1ba4eecc..896608f332 100644
--- a/pom.xml
+++ b/pom.xml
@@ -652,6 +652,11 @@
<artifactId>kylin-soft-affinity-cache</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.kylin</groupId>
+ <artifactId>kylin-spark-ddl</artifactId>
+ <version>${project.version}</version>
+ </dependency>
<dependency>
<groupId>io.dropwizard.metrics</groupId>
diff --git a/src/common-server/src/main/java/org/apache/kylin/rest/controller/NAdminController.java b/src/common-server/src/main/java/org/apache/kylin/rest/controller/NAdminController.java
index 802b044449..d134bf04b9 100644
--- a/src/common-server/src/main/java/org/apache/kylin/rest/controller/NAdminController.java
+++ b/src/common-server/src/main/java/org/apache/kylin/rest/controller/NAdminController.java
@@ -74,6 +74,7 @@ public class NAdminController extends NBasicController {
propertyKeys.add("kylin.streaming.enabled");
propertyKeys.add("kylin.model.measure-name-check-enabled");
propertyKeys.add("kylin.security.remove-ldap-custom-security-limit-enabled");
+ propertyKeys.add("kylin.source.ddl.enabled");
// add second storage
if (StringUtils.isNotEmpty(KylinConfig.getInstanceFromEnv().getSecondStorage())) {
diff --git a/src/common-service/pom.xml b/src/common-service/pom.xml
index 10acf44d80..40c94752dc 100644
--- a/src/common-service/pom.xml
+++ b/src/common-service/pom.xml
@@ -50,6 +50,10 @@
<groupId>org.apache.kylin</groupId>
<artifactId>kylin-tool</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.kylin</groupId>
+ <artifactId>kylin-spark-ddl</artifactId>
+ </dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
diff --git a/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index e8d777d013..9c4601705f 100644
--- a/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/src/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -3661,4 +3661,8 @@ public abstract class KylinConfigBase implements Serializable {
public int getSecondStorageWaitLockTimeout() {
return Integer.parseInt(getOptional("kylin.second-storage.wait-lock-timeout", "180"));
}
+
+ public boolean getDDLEnabled(){
+ return Boolean.parseBoolean(getOptional("kylin.source.ddl.enabled", FALSE));
+ }
}
diff --git a/src/core-common/src/main/java/org/apache/kylin/common/exception/ServerErrorCode.java b/src/core-common/src/main/java/org/apache/kylin/common/exception/ServerErrorCode.java
index 856a5da4db..1e6c9722bf 100644
--- a/src/core-common/src/main/java/org/apache/kylin/common/exception/ServerErrorCode.java
+++ b/src/core-common/src/main/java/org/apache/kylin/common/exception/ServerErrorCode.java
@@ -285,7 +285,10 @@ public enum ServerErrorCode implements ErrorCodeSupplier {
INVALID_JDBC_SOURCE_CONFIG("KE-010039001"), //
// 10040XXX cache
- REDIS_CLEAR_ERROR("KE-010040001"); //
+ REDIS_CLEAR_ERROR("KE-010040001"), //
+
+ // 10050XXX SQL DDL
+ DDL_CHECK_ERROR("KE-010050001");
private final ErrorCode errorCode;
diff --git a/src/core-common/src/main/java/org/apache/kylin/common/msg/CnMessage.java b/src/core-common/src/main/java/org/apache/kylin/common/msg/CnMessage.java
index 3688c2db26..3306e85266 100644
--- a/src/core-common/src/main/java/org/apache/kylin/common/msg/CnMessage.java
+++ b/src/core-common/src/main/java/org/apache/kylin/common/msg/CnMessage.java
@@ -1754,4 +1754,39 @@ public class CnMessage extends Message {
public String getSecondStorageNodeNotAvailable(String nodeName) {
return String.format(Locale.ROOT, "分层存储节点'%s'不可用。", nodeName);
}
+
+ @Override
+ public String getDDLUnSupported() {
+ return "不支持的 DDL 语法,仅支持 `create view`, `drop view`, `alter view`, `show create view` 语法";
+ }
+
+ @Override
+ public String getDDLViewNameError() {
+ return "视图名需要以 KE_ 开头";
+ }
+
+ @Override
+ public String getDDLDropError() {
+ return "仅支持删除 view 类型表且 view 名称需要以 KE_ 开头";
+ }
+
+ @Override
+ public String getDDLTableNotLoad(String table) {
+ return String.format(Locale.ROOT, "'%s' 没有加载到数据源", table);
+ }
+
+ @Override
+ public String getDDLTableNotSupport(String table) {
+ return String.format(Locale.ROOT, "仅支持 hive 数据表,但 '%s' 不是 hive 表", table);
+ }
+
+ @Override
+ public String getDDLPermissionDenied() {
+ return "只有系统或者项目管理员可以进行 DDL 操作";
+ }
+
+ @Override
+ public String getDDLDatabaseAccessnDenied() {
+ return "用户没有视图所在数据库的权限";
+ }
}
diff --git a/src/core-common/src/main/java/org/apache/kylin/common/msg/Message.java b/src/core-common/src/main/java/org/apache/kylin/common/msg/Message.java
index d64b7179c1..365474db28 100644
--- a/src/core-common/src/main/java/org/apache/kylin/common/msg/Message.java
+++ b/src/core-common/src/main/java/org/apache/kylin/common/msg/Message.java
@@ -60,6 +60,15 @@ public class Message {
private static final String LICENSE_MISMATCH_LICENSE = "The license doesn’t match the current cluster information. Please upload a new license, or contact Kyligence.";
private static final String LICENSE_NOT_EFFECTIVE = "License is not effective yet, please apply for a new license.";
private static final String LICENSE_EXPIRED = "The license has expired. Please upload a new license, or contact Kyligence.";
+ private static final String DDL_UNSUPPORTED = "Unsupported DDL syntax, only support single `create view`, `drop "
+ + "view`, `alter view`, `show create table`";
+ private static final String DDL_VIEW_NAME_ERROR = "View names need to start with KE_";
+ private static final String DDL_DROP_ERROR = "Only support drop view";
+ private static final String DDL_TABLE_NOT_LOADED = "Table '%s' is not loaded into the data source ";
+ private static final String DDL_TABLE_NOT_SUPPORT = "Only support hive table, but '%s' is not hive table";
+ private static final String DDL_PERMISSION_DENIED = "Only Administrator or Project Administrator can do DDL operations";
+ private static final String DDL_DATABASE_ACCESSN_DENIED = "The user does not have the database permission to "
+ + "which the view belongs.";
protected Message() {
@@ -1582,4 +1591,32 @@ public class Message {
public String getSecondStorageNodeNotAvailable(String nodeName) {
return String.format(Locale.ROOT, "Tiered storage node '%s' not available.", nodeName);
}
+
+ public String getDDLUnSupported() {
+ return DDL_UNSUPPORTED;
+ }
+
+ public String getDDLViewNameError() {
+ return DDL_VIEW_NAME_ERROR;
+ }
+
+ public String getDDLDropError() {
+ return DDL_DROP_ERROR;
+ }
+
+ public String getDDLTableNotLoad(String table) {
+ return String.format(Locale.ROOT, DDL_TABLE_NOT_LOADED, table);
+ }
+
+ public String getDDLTableNotSupport(String table) {
+ return String.format(Locale.ROOT, DDL_TABLE_NOT_SUPPORT, table);
+ }
+
+ public String getDDLPermissionDenied() {
+ return DDL_PERMISSION_DENIED;
+ }
+
+ public String getDDLDatabaseAccessnDenied() {
+ return DDL_DATABASE_ACCESSN_DENIED;
+ }
}
diff --git a/src/core-common/src/main/resources/kylin_errorcode_conf_en.properties b/src/core-common/src/main/resources/kylin_errorcode_conf_en.properties
index 47ebee5f8b..ad67f4540f 100644
--- a/src/core-common/src/main/resources/kylin_errorcode_conf_en.properties
+++ b/src/core-common/src/main/resources/kylin_errorcode_conf_en.properties
@@ -273,3 +273,5 @@ KE-010039001=Invalid Connection Info
# cache
KE-010040001=Clear Redis Cache Failure
+# SQL DDL
+KE-010050001=DDL Operation Failure
\ No newline at end of file
diff --git a/src/core-common/src/main/resources/kylin_errorcode_conf_zh.properties b/src/core-common/src/main/resources/kylin_errorcode_conf_zh.properties
index 0e43ba0c5a..ef6f47a33e 100644
--- a/src/core-common/src/main/resources/kylin_errorcode_conf_zh.properties
+++ b/src/core-common/src/main/resources/kylin_errorcode_conf_zh.properties
@@ -272,3 +272,5 @@ KE-010039001=连接信息有误
# cache
KE-010040001=清空Redis缓存失败
+# SQL DDL
+KE-010050001=DDL操作失败
\ No newline at end of file
diff --git a/src/datasource-service/pom.xml b/src/datasource-service/pom.xml
index ec0d9cc85f..69e2d8427b 100644
--- a/src/datasource-service/pom.xml
+++ b/src/datasource-service/pom.xml
@@ -131,5 +131,16 @@
<scope>test</scope>
</dependency>
</dependencies>
-
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ </plugin>
+ <plugin>
+ <groupId>net.alchim31.maven</groupId>
+ <artifactId>scala-maven-plugin</artifactId>
+ </plugin>
+ </plugins>
+ </build>
</project>
diff --git a/src/datasource-service/src/main/java/org/apache/kylin/rest/request/ViewDDLRequest.java b/src/datasource-service/src/main/java/org/apache/kylin/rest/request/ViewDDLRequest.java
new file mode 100644
index 0000000000..37c29aa835
--- /dev/null
+++ b/src/datasource-service/src/main/java/org/apache/kylin/rest/request/ViewDDLRequest.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.rest.request;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import lombok.Data;
+
+@Data
+public class ViewDDLRequest {
+ @JsonProperty("sql")
+ private String sql;
+ @JsonProperty("project")
+ private String project;
+}
diff --git a/src/datasource-service/src/main/java/org/apache/kylin/rest/service/SparkDDLService.java b/src/datasource-service/src/main/java/org/apache/kylin/rest/service/SparkDDLService.java
new file mode 100644
index 0000000000..7bce47ad22
--- /dev/null
+++ b/src/datasource-service/src/main/java/org/apache/kylin/rest/service/SparkDDLService.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kylin.rest.service;
+
+import static org.apache.kylin.common.exception.ServerErrorCode.DDL_CHECK_ERROR;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.ServiceLoader;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.exception.KylinException;
+import org.apache.kylin.rest.util.AclPermissionUtil;
+import org.apache.kylin.spark.ddl.DDLCheck;
+import org.apache.kylin.spark.ddl.DDLCheckContext;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparderEnv;
+import org.springframework.stereotype.Service;
+
+import com.google.common.collect.Lists;
+
+import lombok.val;
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+@Service
+public class SparkDDLService extends BasicService {
+
+ private final ServiceLoader<DDLCheck> ddlChecks = ServiceLoader.load(DDLCheck.class);
+
+ public String executeDDLSql(String project, String sql) {
+ if (!KylinConfig.getInstanceFromEnv().getDDLEnabled()) {
+ throw new KylinException(DDL_CHECK_ERROR, "DDL function has not been turned on.");
+ }
+ val groups = getCurrentUserGroups();
+ val context = new DDLCheckContext(sql, project, AclPermissionUtil.getCurrentUsername(),
+ groups);
+ for (DDLCheck checker : ddlChecks) {
+ checker.check(context);
+ }
+ final StringBuilder result = new StringBuilder();
+ List<Row> rows = SparderEnv.getSparkSession().sql(sql).collectAsList();
+ rows.forEach(row -> result.append(row.get(0).toString() + "\n"));
+ return result.toString();
+ }
+
+ public List<List<String>> pluginsDescription(String project) {
+ if (!KylinConfig.getInstanceFromEnv().getDDLEnabled()) {
+ throw new KylinException(DDL_CHECK_ERROR, "DDL function has not been turned on.");
+ }
+ List<String> descriptionEN = Lists.newArrayList();
+ List<String> descriptionCN = Lists.newArrayList();
+ for (DDLCheck checker : ddlChecks) {
+ String[] description = checker.description(project);
+ descriptionEN.addAll(Arrays.asList(description[0].split("\n")));
+ descriptionCN.addAll(Arrays.asList(description[1].split("\n")));
+ }
+ return Lists.newArrayList(descriptionEN, descriptionCN);
+ }
+}
\ No newline at end of file
diff --git a/src/datasource-service/src/test/java/org/apache/kylin/rest/service/SparkDDLTest.java b/src/datasource-service/src/test/java/org/apache/kylin/rest/service/SparkDDLTest.java
new file mode 100644
index 0000000000..df4af60863
--- /dev/null
+++ b/src/datasource-service/src/test/java/org/apache/kylin/rest/service/SparkDDLTest.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kylin.rest.service;
+
+import java.util.List;
+
+import org.apache.kylin.common.msg.MsgPicker;
+import org.apache.kylin.common.util.NLocalFileMetadataTestCase;
+import org.apache.kylin.metadata.model.NTableMetadataManager;
+import org.apache.kylin.rest.constant.Constant;
+import org.apache.spark.sql.SparderEnv;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.common.SparkDDLTestUtils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.security.authentication.TestingAuthenticationToken;
+import org.springframework.security.core.Authentication;
+import org.springframework.security.core.context.SecurityContextHolder;
+import org.springframework.test.util.ReflectionTestUtils;
+
+public class SparkDDLTest extends NLocalFileMetadataTestCase {
+ @Autowired
+ private final SparkDDLService ddlService = Mockito.spy(new SparkDDLService());
+ @Autowired
+ private final IUserGroupService userGroupService = Mockito.spy(NUserGroupService.class);
+
+ private static final String CREATEVIEW_SQL1 =
+ "CREATE VIEW `ssb`.`ke_order_view` as select LO_ORDERKEY, C_NAME from SSB.P_LINEORDER t1 left join "
+ + "SSB. CUSTOMER t2 on t1. LO_CUSTKEY = t2. C_CUSTKEY";
+ private static final String CREATEVIEW_SQL2 = "CREATE VIEW `ssb`.`order_view2` as select * from SSB.P_LINEORDER";
+ private static final String CREATEVIEW_SQL3 = "CREATE VIEW `ssb`.`order_view2` as abc";
+ private static final String CREATEVIEW_SQL4 = "CREATE VIEW `ssb`.`order_view2` as select * from SSB.unload_table";
+ private static final String CREATEVIEW_SQL5 = "CREATE VIEW `ke_order_view2` as select * from SSB.P_LINEORDER";
+ private static final String CREATEVIEW_SQL6 = "abc";
+ private static final String ALTERVIEW_SQL =
+ "alter view `ssb`.`ke_order_view` as select lo_orderkey from SSB.P_LINEORDER";
+ private static final String DROPVIEW_SQL1 = "drop view `ssb`.`ke_order_view`";
+ private static final String DROPVIEW_SQL2 = "drop table `ssb`.`ke_table1`";
+ private static final String DROPVIEW_SQL3 = "drop table `ssb`.`ke_order_view`";
+ private static final String DROPVIEW_SQL4 = "drop table `ke_table2`";
+ private static final String SHOWVIEW_SQL = "show create table ssb.ke_order_view";
+
+ @AfterClass
+ public static void tearDownResource() {
+ staticCleanupTestMetadata();
+ }
+
+ @Before
+ public void setup() {
+ createTestMetadata();
+ Authentication authentication = new TestingAuthenticationToken("ADMIN", "ADMIN", Constant.ROLE_ADMIN);
+ SecurityContextHolder.getContext().setAuthentication(authentication);
+ ReflectionTestUtils.setField(ddlService, "userGroupService", userGroupService);
+ }
+
+ @After
+ public void cleanup() {
+ cleanupTestMetadata();
+ }
+
+ @Test
+ public void testDDL() {
+ try {
+ assertKylinExeption(
+ () ->
+ ddlService.executeDDLSql("ssb", CREATEVIEW_SQL5),
+ "DDL function has not been turned on.");
+
+ getTestConfig().setProperty("kylin.source.ddl.enabled", "true");
+ NTableMetadataManager tableManager = NTableMetadataManager.getInstance(getTestConfig(), "ssb");
+ SparkDDLTestUtils.prepare();
+ ddlService.executeDDLSql("ssb", CREATEVIEW_SQL5);
+ assertKylinExeption(
+ () ->
+ ddlService.executeDDLSql("ssb", CREATEVIEW_SQL2),
+ MsgPicker.getMsg().getDDLViewNameError());
+ assertKylinExeption(() ->
+ ddlService.executeDDLSql("ssb", CREATEVIEW_SQL3), "");
+ assertKylinExeption(() ->
+ ddlService.executeDDLSql("ssb", CREATEVIEW_SQL6), "");
+ assertKylinExeption(
+ () ->
+ ddlService.executeDDLSql("ssb", CREATEVIEW_SQL4),
+ MsgPicker.getMsg().getDDLTableNotLoad("SSB.unload_table"));
+ assertKylinExeption(
+ () ->
+ ddlService.executeDDLSql("ssb", DROPVIEW_SQL2),
+ MsgPicker.getMsg().getDDLDropError());
+ assertKylinExeption(
+ () ->
+ ddlService.executeDDLSql("ssb", DROPVIEW_SQL3), "");
+
+ ddlService.executeDDLSql("ssb", CREATEVIEW_SQL1);
+ ddlService.executeDDLSql("ssb", ALTERVIEW_SQL);
+ String createViewSQL = ddlService.executeDDLSql("ssb", SHOWVIEW_SQL);
+ Assert.assertTrue(createViewSQL.contains("ke_order_view"));
+ ddlService.executeDDLSql("ssb", DROPVIEW_SQL1);
+
+ Authentication authentication = new TestingAuthenticationToken("USER1",
+ "", Constant.GROUP_ALL_USERS);
+ SecurityContextHolder.getContext().setAuthentication(authentication);
+ assertKylinExeption(
+ () ->
+ ddlService.executeDDLSql("ssb", CREATEVIEW_SQL1),
+ MsgPicker.getMsg().getDDLPermissionDenied());
+
+ // ddl description
+ List<List<String>> description = ddlService.pluginsDescription("ssb");
+ Assert.assertTrue(description.size() > 0);
+ } finally {
+ SparkSession spark = SparderEnv.getSparkSession();
+ if (spark != null && !spark.sparkContext().isStopped()) {
+ spark.stop();
+ }
+ }
+ }
+}
diff --git a/src/datasource-service/src/test/scala/org/apache/spark/sql/common/SparkDDLTestUtils.scala b/src/datasource-service/src/test/scala/org/apache/spark/sql/common/SparkDDLTestUtils.scala
new file mode 100644
index 0000000000..22af162c2a
--- /dev/null
+++ b/src/datasource-service/src/test/scala/org/apache/spark/sql/common/SparkDDLTestUtils.scala
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.common
+
+import java.io.File
+
+import org.apache.commons.io.FileUtils
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars
+import org.apache.kylin.common.util.RandomUtil
+
+import org.apache.spark.sql.{SparderEnv, SparkSession}
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.internal.StaticSQLConf
+import org.apache.spark.util.Utils
+
+object SparkDDLTestUtils {
+
+ def prepare(): Unit = {
+ val conf = new SparkConf(false)
+ val warehouse = s"./spark-warehouse/${RandomUtil.randomUUIDStr()}";
+ val file = new File(warehouse)
+ if (file.exists()) {
+ FileUtils.deleteDirectory(file)
+ }
+ conf.set(StaticSQLConf.WAREHOUSE_PATH.key, warehouse)
+ // Copied from TestHive
+ // HDFS root scratch dir requires the write all (733) permission. For each connecting user,
+ // an HDFS scratch dir: ${hive.exec.scratchdir}/<username> is created, with
+ // ${hive.scratch.dir.permission}. To resolve the permission issue, the simplest way is to
+ // delete it. Later, it will be re-created with the right permission.
+ val scratchDir = Utils.createTempDir()
+ if (scratchDir.exists()) {
+ FileUtils.deleteDirectory(scratchDir)
+ }
+ conf.set(ConfVars.SCRATCHDIR.varname, scratchDir.toString)
+ conf.set("spark.hadoop.javax.jdo.option.ConnectionURL", "jdbc:derby:memory:db;create=true")
+ val sparkSession = SparkSession.builder
+ .master("local[2]")
+ .appName(getClass.getSimpleName)
+ .config("fs.file.impl", classOf[DebugFilesystem].getCanonicalName)
+ .config(conf)
+ .enableHiveSupport()
+ .getOrCreate
+ SparderEnv.setSparkSession(sparkSession)
+ prepareTable
+ }
+
+ def prepareTable(): Unit = {
+ val spark = SparderEnv.getSparkSession
+ spark.sql("CREATE DATABASE if not exists SSB")
+ spark.sql("drop table if exists `ssb`.`lineorder`")
+ spark.sql(
+ s"""
+ |CREATE TABLE if not exists `ssb`.`lineorder`(
+ | `lo_orderkey` bigint,
+ | `lo_linenumber` bigint,
+ | `lo_custkey` int,
+ | `lo_partkey` int,
+ | `lo_suppkey` int,
+ | `lo_orderdate` int,
+ | `lo_orderpriotity` string,
+ | `lo_shippriotity` int,
+ | `lo_quantity` bigint,
+ | `lo_extendedprice` bigint,
+ | `lo_ordtotalprice` bigint,
+ | `lo_discount` bigint,
+ | `lo_revenue` bigint,
+ | `lo_supplycost` bigint,
+ | `lo_tax` bigint,
+ | `lo_commitdate` int,
+ | `lo_shipmode` string)""".stripMargin)
+ spark.sql("drop view if exists `ssb`.p_lineorder")
+ spark.sql(
+ s"""
+ |CREATE VIEW if not exists `ssb`.`p_lineorder` AS SELECT
+ | `lineorder`.`lo_orderkey`,
+ | `lineorder`.`lo_linenumber`,
+ | `lineorder`.`lo_custkey`,
+ | `lineorder`.`lo_partkey`,
+ | `lineorder`.`lo_suppkey`,
+ | `lineorder`.`lo_orderdate`,
+ | `lineorder`.`lo_orderpriotity`,
+ | `lineorder`.`lo_shippriotity`,
+ | `lineorder`.`lo_quantity`,
+ | `lineorder`.`lo_extendedprice`,
+ | `lineorder`.`lo_ordtotalprice`,
+ | `lineorder`.`lo_discount`,
+ | `lineorder`.`lo_revenue`,
+ | `lineorder`.`lo_supplycost`,
+ | `lineorder`.`lo_tax`,
+ | `lineorder`.`lo_commitdate`,
+ | `lineorder`.`lo_shipmode`,
+ | `lineorder`.`lo_extendedprice`*`lineorder`.`lo_discount` AS `V_REVENUE`
+ |FROM `ssb`.`LINEORDER`
+ """.stripMargin)
+ spark.sql("drop table if exists `ssb`.`customer`")
+ spark.sql(
+ s"""
+ |CREATE TABLE if not exists `ssb`.`customer`(
+ | `c_custkey` int,
+ | `c_name` string,
+ | `c_address` string,
+ | `c_city` string,
+ | `c_nation` string,
+ | `c_region` string,
+ | `c_phone` string,
+ | `c_mktsegment` string)
+ """.stripMargin)
+ spark.sql(
+ s"""
+ |CREATE TABLE if not exists `ssb`.`unload_table`(
+ | `c1` int,
+ | `c2` string)
+ """.stripMargin)
+ spark.sql(
+ s"""
+ |CREATE TABLE if not exists `ssb`.`ke_table1`(
+ | `c1` int,
+ | `c2` string)
+ """.stripMargin)
+ }
+}
diff --git a/src/metadata-server/src/main/java/org/apache/kylin/rest/controller/SparkDDLController.java b/src/metadata-server/src/main/java/org/apache/kylin/rest/controller/SparkDDLController.java
new file mode 100644
index 0000000000..38c2e9d003
--- /dev/null
+++ b/src/metadata-server/src/main/java/org/apache/kylin/rest/controller/SparkDDLController.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kylin.rest.controller;
+
+import static org.apache.kylin.common.constant.HttpConstant.HTTP_VND_APACHE_KYLIN_JSON;
+import static org.apache.kylin.common.constant.HttpConstant.HTTP_VND_APACHE_KYLIN_V4_PUBLIC_JSON;
+
+import java.util.List;
+
+import org.apache.kylin.common.exception.KylinException;
+import org.apache.kylin.rest.request.ViewDDLRequest;
+import org.apache.kylin.rest.response.EnvelopeResponse;
+import org.apache.kylin.rest.service.SparkDDLService;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.RequestBody;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RequestParam;
+import org.springframework.web.bind.annotation.ResponseBody;
+import org.springframework.web.bind.annotation.RestController;
+
+import io.swagger.annotations.ApiOperation;
+import lombok.extern.slf4j.Slf4j;
+
+@RestController
+@RequestMapping(value = "/api/spark_source", produces = {HTTP_VND_APACHE_KYLIN_V4_PUBLIC_JSON,
+ HTTP_VND_APACHE_KYLIN_JSON})
+@Slf4j
+public class SparkDDLController extends NBasicController {
+
+ @Autowired
+ private SparkDDLService sparkDDLService;
+
+ @ApiOperation(value = "ddl")
+ @PostMapping(value = "/ddl")
+ @ResponseBody
+ public EnvelopeResponse<String> executeSQL(@RequestBody ViewDDLRequest request)
+ throws Exception {
+ checkProjectName(request.getProject());
+ String result = sparkDDLService.executeDDLSql(request.getProject(), request.getSql());
+ return new EnvelopeResponse<>(KylinException.CODE_SUCCESS, result, "");
+ }
+
+ @ApiOperation(value = "ddl_description")
+ @GetMapping(value = "/ddl/description")
+ @ResponseBody
+ public EnvelopeResponse<List<List<String>>> description(@RequestParam("project") String project) {
+ checkProjectName(project);
+ return new EnvelopeResponse<>(KylinException.CODE_SUCCESS,
+ sparkDDLService.pluginsDescription(project), "");
+ }
+}
diff --git a/src/metadata-server/src/test/java/org/apache/kylin/rest/controller/SparkDDLControllerTest.java b/src/metadata-server/src/test/java/org/apache/kylin/rest/controller/SparkDDLControllerTest.java
new file mode 100644
index 0000000000..f7d9938989
--- /dev/null
+++ b/src/metadata-server/src/test/java/org/apache/kylin/rest/controller/SparkDDLControllerTest.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.rest.controller;
+
+import static org.apache.kylin.common.constant.HttpConstant.HTTP_VND_APACHE_KYLIN_V4_PUBLIC_JSON;
+
+import org.apache.kylin.common.util.JsonUtil;
+import org.apache.kylin.common.util.NLocalFileMetadataTestCase;
+import org.apache.kylin.rest.constant.Constant;
+import org.apache.kylin.rest.request.ViewDDLRequest;
+import org.apache.kylin.rest.service.SparkDDLService;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+import org.springframework.http.MediaType;
+import org.springframework.security.authentication.TestingAuthenticationToken;
+import org.springframework.security.core.Authentication;
+import org.springframework.security.core.context.SecurityContextHolder;
+import org.springframework.test.web.servlet.MockMvc;
+import org.springframework.test.web.servlet.request.MockMvcRequestBuilders;
+import org.springframework.test.web.servlet.result.MockMvcResultMatchers;
+import org.springframework.test.web.servlet.setup.MockMvcBuilders;
+
+public class SparkDDLControllerTest extends NLocalFileMetadataTestCase {
+ private MockMvc mockMvc;
+
+ @Mock
+ private SparkDDLService sparkDDLService;
+
+ @InjectMocks
+ private SparkDDLController ddlController = Mockito.spy(new SparkDDLController());
+
+ private final Authentication authentication = new TestingAuthenticationToken("ADMIN", "ADMIN", Constant.ROLE_ADMIN);
+
+ @Before
+ public void setup() {
+ MockitoAnnotations.initMocks(this);
+ mockMvc = MockMvcBuilders.standaloneSetup(ddlController)
+ .defaultRequest(MockMvcRequestBuilders.get("/")).build();
+ SecurityContextHolder.getContext().setAuthentication(authentication);
+ overwriteSystemProp("HADOOP_USER_NAME", "root");
+ createTestMetadata();
+ }
+
+ @After
+ public void tearDown() {
+ cleanupTestMetadata();
+ }
+
+ @Test
+ public void testExecuteSQL() throws Exception {
+ ViewDDLRequest request = new ViewDDLRequest();
+ request.setProject("ssb");
+
+ mockMvc.perform(MockMvcRequestBuilders.post("/api/spark_source/ddl")
+ .contentType(MediaType.APPLICATION_JSON)
+ .content(JsonUtil.writeValueAsString(request))
+ .accept(MediaType.parseMediaType(HTTP_VND_APACHE_KYLIN_V4_PUBLIC_JSON)))
+ .andExpect(MockMvcResultMatchers.status().isOk()).andReturn();
+ }
+
+ @Test
+ public void testDescription() throws Exception {
+ mockMvc.perform(MockMvcRequestBuilders.get("/api/spark_source/ddl/description?project=ssb")
+ .contentType(MediaType.APPLICATION_JSON)
+ .accept(MediaType.parseMediaType(HTTP_VND_APACHE_KYLIN_V4_PUBLIC_JSON)))
+ .andExpect(MockMvcResultMatchers.status().isOk()).andReturn();
+ }
+}
\ No newline at end of file
diff --git a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkMetadataExplorer.java b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkMetadataExplorer.java
index fdc55553ad..b99b9727f7 100644
--- a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkMetadataExplorer.java
+++ b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkMetadataExplorer.java
@@ -28,8 +28,10 @@ import java.util.Set;
import java.util.stream.Collectors;
import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.kylin.common.KapConfig;
import org.apache.kylin.common.KylinConfig;
@@ -165,6 +167,41 @@ public class NSparkMetadataExplorer implements ISourceMetadataExplorer, ISampleD
return isAccess;
}
+ public boolean checkDatabaseHadoopAccessFast(String database) throws Exception {
+ boolean isAccess = true;
+ val spark = SparderEnv.getSparkSession();
+ try {
+ String databaseLocation = spark.catalog().getDatabase(database).locationUri();
+ RemoteIterator<FileStatus> tablesIterator = getFilesIterator(databaseLocation);
+ if (tablesIterator.hasNext()) {
+ Path tablePath = tablesIterator.next().getPath();
+ getFilesIterator(tablePath.toString());
+ }
+ } catch (Exception e) {
+ isAccess = false;
+ try {
+ logger.error("Read hive database {} error:{}, ugi name: {}.", database, e.getMessage(),
+ UserGroupInformation.getCurrentUser().getUserName());
+ } catch (IOException ex) {
+ logger.error("fetch user curr ugi info error.", e);
+ }
+ }
+ return isAccess;
+ }
+
+ private RemoteIterator<FileStatus> getFilesIterator(String location) throws IOException {
+ String hiveSpecFsLocation = SparderEnv.getSparkSession().sessionState().conf()
+ .getConf(SQLConf.HIVE_SPECIFIC_FS_LOCATION());
+ FileSystem fs = null == hiveSpecFsLocation ? HadoopUtil.getWorkingFileSystem()
+ : HadoopUtil.getFileSystem(hiveSpecFsLocation);
+ if (location.startsWith(fs.getScheme()) || location.startsWith("/")) {
+ fs.listStatus(new Path(location));
+ return fs.listStatusIterator(new Path(location));
+ } else {
+ return HadoopUtil.getFileSystem(location).listStatusIterator(new Path(location));
+ }
+ }
+
@Override
public Pair<TableDesc, TableExtDesc> loadTableMetadata(final String database, String tableName, String prj)
throws Exception {
diff --git a/src/spark-project/spark-ddl-plugin/pom.xml b/src/spark-project/spark-ddl-plugin/pom.xml
new file mode 100644
index 0000000000..9df7219174
--- /dev/null
+++ b/src/spark-project/spark-ddl-plugin/pom.xml
@@ -0,0 +1,58 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>kylin</artifactId>
+ <groupId>org.apache.kylin</groupId>
+ <version>5.0.0-SNAPSHOT</version>
+ <relativePath>../../../pom.xml</relativePath>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>kylin-spark-ddl</artifactId>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.kylin</groupId>
+ <artifactId>kylin-systools</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.kylin</groupId>
+ <artifactId>kylin-core-metadata</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.kylin</groupId>
+ <artifactId>kylin-sparder</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-sql_2.12</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.junit.vintage</groupId>
+ <artifactId>junit-vintage-engine</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.kylin</groupId>
+ <artifactId>kylin-engine-spark</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ </dependency>
+
+ </dependencies>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ </plugin>
+ <plugin>
+ <groupId>net.alchim31.maven</groupId>
+ <artifactId>scala-maven-plugin</artifactId>
+ </plugin>
+ </plugins>
+ </build>
+</project>
\ No newline at end of file
diff --git a/src/spark-project/spark-ddl-plugin/src/main/java/org/apache/kylin/spark/ddl/DDLCheck.java b/src/spark-project/spark-ddl-plugin/src/main/java/org/apache/kylin/spark/ddl/DDLCheck.java
new file mode 100644
index 0000000000..10e3e3439c
--- /dev/null
+++ b/src/spark-project/spark-ddl-plugin/src/main/java/org/apache/kylin/spark/ddl/DDLCheck.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kylin.spark.ddl;
+
+import static org.apache.kylin.common.exception.ServerErrorCode.DDL_CHECK_ERROR;
+
+import org.apache.kylin.common.exception.KylinException;
+
+public interface DDLCheck {
+
+ default String[] description(String project) {
+ return new String[] {"", ""};
+ }
+
+ void check(DDLCheckContext context);
+
+ default void throwException(String msg) {
+ throw new KylinException(DDL_CHECK_ERROR, msg);
+ }
+}
diff --git a/src/spark-project/spark-ddl-plugin/src/main/java/org/apache/kylin/spark/ddl/DDLCheckContext.java b/src/spark-project/spark-ddl-plugin/src/main/java/org/apache/kylin/spark/ddl/DDLCheckContext.java
new file mode 100644
index 0000000000..e5e6d1819f
--- /dev/null
+++ b/src/spark-project/spark-ddl-plugin/src/main/java/org/apache/kylin/spark/ddl/DDLCheckContext.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kylin.spark.ddl;
+
+import java.util.Set;
+
+public class DDLCheckContext {
+ private String sql;
+ private String project;
+ private String userName;
+ private Set<String> groups;
+
+ public DDLCheckContext(String sql, String project, String userName, Set<String> groups) {
+ this.sql = sql;
+ this.project = project;
+ this.userName = userName;
+ this.groups = groups;
+ }
+
+ public String getSql() {
+ return sql;
+ }
+
+ public String getProject() {
+ return project;
+ }
+
+ public String getUserName() {
+ return userName;
+ }
+
+ public Set<String> getGroups() {
+ return groups;
+ }
+}
diff --git a/src/spark-project/spark-ddl-plugin/src/main/java/org/apache/kylin/spark/ddl/SourceTableCheck.java b/src/spark-project/spark-ddl-plugin/src/main/java/org/apache/kylin/spark/ddl/SourceTableCheck.java
new file mode 100644
index 0000000000..c3c54623e5
--- /dev/null
+++ b/src/spark-project/spark-ddl-plugin/src/main/java/org/apache/kylin/spark/ddl/SourceTableCheck.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kylin.spark.ddl;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.msg.MsgPicker;
+import org.apache.kylin.metadata.model.ISourceAware;
+import org.apache.kylin.metadata.model.NTableMetadataManager;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.rest.util.AclPermissionUtil;
+import org.apache.spark.sql.SparderEnv;
+import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation;
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
+
+import lombok.val;
+import scala.collection.Seq;
+
+public class SourceTableCheck implements DDLCheck {
+
+ @Override
+ public String[] description(String project) {
+ return new String[] {
+ "The source table used to define the view needs to be loaded into the data source already",
+ "定义 view 用到的来源表需要已经加载到数据源"
+ };
+ }
+
+ @Override
+ public void check(DDLCheckContext context) {
+ val spark = SparderEnv.getSparkSession();
+ LogicalPlan logicalPlan = null;
+ try {
+ logicalPlan = spark.sessionState().sqlParser().parsePlan(context.getSql());
+ } catch (Throwable t) {
+ throwException(t.getMessage());
+ }
+ val tableManager = NTableMetadataManager.getInstance(
+ KylinConfig.getInstanceFromEnv(),
+ context.getProject());
+ if (!AclPermissionUtil.hasProjectAdminPermission(context.getProject(), context.getGroups())) {
+ throwException(MsgPicker.getMsg().getDDLPermissionDenied());
+ }
+ Seq<LogicalPlan> relationLeaves = logicalPlan.collectLeaves();
+ if (relationLeaves == null) {
+ return;
+ }
+ for (LogicalPlan plan : scala.collection.JavaConverters.seqAsJavaListConverter(relationLeaves).asJava()) {
+ if (plan instanceof UnresolvedRelation) {
+ val tableName = ((UnresolvedRelation) plan).tableName();
+ TableDesc tableDesc = tableManager.getTableDesc(tableName);
+ if (tableDesc == null) {
+ throwException(MsgPicker.getMsg().getDDLTableNotLoad(tableName));
+ }
+ if (ISourceAware.ID_HIVE != tableDesc.getSourceType()
+ && ISourceAware.ID_SPARK != tableDesc.getSourceType()) {
+ throwException(MsgPicker.getMsg().getDDLTableNotSupport(tableName));
+ }
+ }
+ }
+ }
+}
diff --git a/src/spark-project/spark-ddl-plugin/src/main/resources/META-INF/services/org.apache.kylin.spark.ddl.DDLCheck b/src/spark-project/spark-ddl-plugin/src/main/resources/META-INF/services/org.apache.kylin.spark.ddl.DDLCheck
new file mode 100644
index 0000000000..4f3a7527a8
--- /dev/null
+++ b/src/spark-project/spark-ddl-plugin/src/main/resources/META-INF/services/org.apache.kylin.spark.ddl.DDLCheck
@@ -0,0 +1,2 @@
+org.apache.kylin.spark.ddl.SourceTableCheck
+org.apache.kylin.spark.ddl.ViewCheck
\ No newline at end of file
diff --git a/src/spark-project/spark-ddl-plugin/src/main/scala/org/apache/kylin/spark/ddl/ViewCheck.scala b/src/spark-project/spark-ddl-plugin/src/main/scala/org/apache/kylin/spark/ddl/ViewCheck.scala
new file mode 100644
index 0000000000..4fc451bdf2
--- /dev/null
+++ b/src/spark-project/spark-ddl-plugin/src/main/scala/org/apache/kylin/spark/ddl/ViewCheck.scala
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kylin.spark.ddl
+
+import java.security.PrivilegedExceptionAction
+
+import scala.collection.mutable.ListBuffer
+
+import org.apache.kylin.common.msg.MsgPicker
+import org.apache.kylin.engine.spark.source.NSparkMetadataExplorer
+import org.apache.kylin.rest.security.KerberosLoginManager
+import org.slf4j.LoggerFactory
+
+import org.apache.spark.sql.SparderEnv
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.execution.{CommandExecutionMode, CommandResultExec, SparkPlan}
+import org.apache.spark.sql.execution.command._
+
+class ViewCheck extends DDLCheck {
+ private val log = LoggerFactory.getLogger(classOf[ViewCheck])
+ private val PREFIX = "KE_"
+ private val source = new NSparkMetadataExplorer
+
+ override def description(project: String): Array[String] = {
+ val databasesHasAccess = listAllDatabasesHasAccess(project)
+ Array(
+ "View name should start with `KE_`\n"
+ + "Only support `create view`,`alter view`,`drop view`,`show create table` syntax\n"
+ + s"Only supports creating views in ${databasesHasAccess}",
+ "View 名称需要以`KE_`开头\n"
+ + "仅支持 `create view`, `drop view`, `alter view`, `show create table` 语法\n"
+ + s"仅支持在 ${databasesHasAccess} 上述 database 中创建 view")
+ }
+
+ override def check(context: DDLCheckContext): Unit = {
+ log.info("start checking DDL view name")
+ val sql = context.getSql
+ val project = context.getProject
+ val spark = SparderEnv.getSparkSession
+ var plan: SparkPlan = null
+ try {
+ val logicalPlan = spark.sessionState.sqlParser.parsePlan(sql)
+ plan = stripRootCommandResult(spark.sessionState.executePlan(
+ logicalPlan, CommandExecutionMode.SKIP).executedPlan)
+ } catch {
+ case e: Exception => throwException(e.getMessage)
+ }
+ plan match {
+ case ExecutedCommandExec(view: CreateViewCommand) =>
+ checkTableName(view.name)
+ checkAccess(view.name, project)
+ case ExecutedCommandExec(view: ShowCreateTableCommand) =>
+ checkTableName(view.table)
+ checkAccess(view.table, project)
+ case ExecutedCommandExec(table: DropTableCommand) =>
+ checkTableName(table.tableName)
+ checkAccess(table.tableName, project)
+ if (!table.isView) {
+ throwException(MsgPicker.getMsg.getDDLDropError)
+ }
+ case ExecutedCommandExec(table: AlterViewAsCommand) =>
+ checkTableName(table.name)
+ checkAccess(table.name, project)
+ case _ => throwException(MsgPicker.getMsg.getDDLUnSupported)
+ }
+ }
+
+ private def checkTableName(identifier: TableIdentifier): Unit = {
+ if (!identifier.table.toUpperCase().startsWith(PREFIX)) {
+ throwException(MsgPicker.getMsg.getDDLViewNameError)
+ }
+ }
+
+ def checkAccess(identifier: TableIdentifier, project: String): Unit = {
+ val database = identifier.database.get
+ val ugi = KerberosLoginManager.getInstance.getProjectUGI(project)
+ val hasDatabaseAccess = ugi.doAs(new PrivilegedExceptionAction[Boolean]() {
+ override def run(): Boolean = {
+ source.checkDatabaseHadoopAccessFast(database)
+ }
+ })
+ if (!hasDatabaseAccess) {
+ throwException(MsgPicker.getMsg.getDDLDatabaseAccessnDenied)
+ }
+ }
+
+ def listAllDatabasesHasAccess(project: String): String = {
+ val ugi = KerberosLoginManager.getInstance.getProjectUGI(project)
+ val databasesHasAccess = ugi.doAs(new PrivilegedExceptionAction[List[String]]() {
+ override def run(): List[String] = {
+ val databases = source.listDatabases()
+ val databasesHasAccess = ListBuffer[String]()
+ databases.forEach(db => {
+ if (source.checkDatabaseHadoopAccessFast(db)) {
+ databasesHasAccess.append(db)
+ }
+ })
+ databasesHasAccess.toList
+ }
+ })
+ databasesHasAccess.mkString(",")
+ }
+
+ private def stripRootCommandResult(executedPlan: SparkPlan) = executedPlan match {
+ case CommandResultExec(_, plan, _) => plan
+ case other => other
+ }
+}