You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by xx...@apache.org on 2022/08/09 02:47:07 UTC

[kylin] branch kylin5 updated: KYLIN-5217 Create a initial commit

This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/kylin5 by this push:
     new 605b4f52ec KYLIN-5217 Create a initial commit
605b4f52ec is described below

commit 605b4f52ecd607fb6c6943d8014c1a2736254f17
Author: sunbiaobiao <su...@gmail.com>
AuthorDate: Tue Aug 9 10:47:02 2022 +0800

    KYLIN-5217 Create a initial commit
---
 .../kap/engine/spark/job/EnviromentAdaptor.java    |  29 -
 .../kap/engine/spark/job/IJobProgressReport.java   |  37 --
 .../kap/engine/spark/job/ISparkJobHandler.java     |  47 --
 .../kap/engine/spark/job/ParamsConstants.java      |  29 -
 .../kap/engine/spark/job/SparkAppDescription.java  |  53 --
 .../engine/spark/application/SparkApplication.java | 620 ---------------------
 .../kylin/engine/spark/application/SparkEntry.java |  27 -
 .../spark/source/NSparkCubingSourceInput.java      |  77 ---
 .../engine/spark/source/NSparkDataSource.java      |  79 ---
 .../spark/source/NSparkMetadataExplorer.java       | 313 -----------
 .../kylin/engine/spark/source/NSparkTable.java     |  65 ---
 .../kylin/engine/spark/source/NSparkTableMeta.java | 111 ----
 .../spark/source/NSparkTableMetaBuilder.java       | 140 -----
 .../spark/source/NSparkTableMetaExplorer.java      | 203 -------
 .../engine/spark/source/NSparkTableReader.java     |  87 ---
 .../kylin/engine/spark/source/SparkSqlUtil.java    |  66 ---
 16 files changed, 1983 deletions(-)

diff --git a/src/spark-project/engine-build-sdk/src/main/java/io/kyligence/kap/engine/spark/job/EnviromentAdaptor.java b/src/spark-project/engine-build-sdk/src/main/java/io/kyligence/kap/engine/spark/job/EnviromentAdaptor.java
deleted file mode 100644
index 0301d20c91..0000000000
--- a/src/spark-project/engine-build-sdk/src/main/java/io/kyligence/kap/engine/spark/job/EnviromentAdaptor.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.kyligence.kap.engine.spark.job;
-
-import java.util.Map;
-
-import org.apache.spark.sql.SparkSession;
-
-public interface EnviromentAdaptor {
-
-
-    Boolean prepareEnviroment(SparkSession spark, Map<String, String> params);
-}
diff --git a/src/spark-project/engine-build-sdk/src/main/java/io/kyligence/kap/engine/spark/job/IJobProgressReport.java b/src/spark-project/engine-build-sdk/src/main/java/io/kyligence/kap/engine/spark/job/IJobProgressReport.java
deleted file mode 100644
index 9e5e58f20f..0000000000
--- a/src/spark-project/engine-build-sdk/src/main/java/io/kyligence/kap/engine/spark/job/IJobProgressReport.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.kyligence.kap.engine.spark.job;
-
-import java.util.Map;
-
-public interface IJobProgressReport {
-
-    boolean updateSparkJobInfo(Map<String, String> params, String url, String json);
-
-    boolean updateSparkJobExtraInfo(Map<String, String> params, String url, String project, String jobId,
-            Map<String, String> extraInfo);
-
-    default boolean executeFinish(Map<String, String> params, String project, String jobId) {
-        return true;
-    }
-
-    default void initArgsParams(Map<String, String> argsParams) {
-    }
-
-}
diff --git a/src/spark-project/engine-build-sdk/src/main/java/io/kyligence/kap/engine/spark/job/ISparkJobHandler.java b/src/spark-project/engine-build-sdk/src/main/java/io/kyligence/kap/engine/spark/job/ISparkJobHandler.java
deleted file mode 100644
index c167b52597..0000000000
--- a/src/spark-project/engine-build-sdk/src/main/java/io/kyligence/kap/engine/spark/job/ISparkJobHandler.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.kyligence.kap.engine.spark.job;
-
-import java.util.Map;
-import java.util.Properties;
-
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.job.exception.ExecuteException;
-
-public interface ISparkJobHandler {
-
-    void killOrphanApplicationIfExists(String project, String jobStepId, KylinConfig config,
-            Map<String, String> sparkConf);
-
-    void checkApplicationJar(KylinConfig config) throws ExecuteException;
-
-    String createArgsFileOnRemoteFileSystem(KylinConfig config, String project, String jobId,
-            Map<String, String> params) throws ExecuteException;
-
-    Object generateSparkCmd(KylinConfig config, SparkAppDescription desc);
-
-    default void modifyDump(Properties props) {
-    }
-
-    default void prepareEnviroment(String project, String jobStepId, Map<String, String> params) {
-    }
-
-    Map<String, String> runSparkSubmit(Object cmd, String parentId) throws ExecuteException;
-
-}
diff --git a/src/spark-project/engine-build-sdk/src/main/java/io/kyligence/kap/engine/spark/job/ParamsConstants.java b/src/spark-project/engine-build-sdk/src/main/java/io/kyligence/kap/engine/spark/job/ParamsConstants.java
deleted file mode 100644
index e218d319c7..0000000000
--- a/src/spark-project/engine-build-sdk/src/main/java/io/kyligence/kap/engine/spark/job/ParamsConstants.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.kyligence.kap.engine.spark.job;
-
-public class ParamsConstants {
-
-    private ParamsConstants() {
-        throw new IllegalStateException("Utility class");
-    }
-
-    public static final String TIME_OUT = "time_out";
-    public static final String JOB_TMP_DIR = "job_tmp_dir";
-}
diff --git a/src/spark-project/engine-build-sdk/src/main/java/io/kyligence/kap/engine/spark/job/SparkAppDescription.java b/src/spark-project/engine-build-sdk/src/main/java/io/kyligence/kap/engine/spark/job/SparkAppDescription.java
deleted file mode 100644
index 8f105a5172..0000000000
--- a/src/spark-project/engine-build-sdk/src/main/java/io/kyligence/kap/engine/spark/job/SparkAppDescription.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.kyligence.kap.engine.spark.job;
-
-import java.util.Map;
-import java.util.Set;
-
-import lombok.Data;
-
-@Data
-public class SparkAppDescription {
-
-    private String hadoopConfDir;
-
-    private String kylinJobJar;
-
-    private String appArgs;
-
-    private String jobNamePrefix;
-
-    private String project;
-
-    private String jobId;
-
-    private int stepId;
-
-    private String sparkSubmitClassName;
-
-    private Map<String, String> sparkConf;
-
-    private Set<String> sparkJars;
-
-    private Set<String> sparkFiles;
-
-    private String comma;
-
-}
diff --git a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java
deleted file mode 100644
index cac224d44b..0000000000
--- a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java
+++ /dev/null
@@ -1,620 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.engine.spark.application;
-
-import static org.apache.kylin.engine.spark.job.StageType.WAITE_FOR_RESOURCE;
-import static org.apache.kylin.engine.spark.utils.SparkConfHelper.COUNT_DISTICT;
-
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.net.InetAddress;
-import java.net.URI;
-import java.net.UnknownHostException;
-import java.nio.charset.Charset;
-import java.text.ParseException;
-import java.text.SimpleDateFormat;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Locale;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.kylin.cluster.IClusterManager;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.exception.KylinException;
-import org.apache.kylin.common.persistence.ResourceStore;
-import org.apache.kylin.common.persistence.metadata.MetadataStore;
-import org.apache.kylin.common.util.Application;
-import org.apache.kylin.common.util.ClassUtil;
-import org.apache.kylin.common.util.HadoopUtil;
-import org.apache.kylin.common.util.JsonUtil;
-import org.apache.kylin.common.util.TimeZoneUtils;
-import org.apache.kylin.common.util.Unsafe;
-import org.apache.kylin.engine.spark.job.BuildJobInfos;
-import org.apache.kylin.engine.spark.job.KylinBuildEnv;
-import org.apache.kylin.engine.spark.job.LogJobInfoUtils;
-import org.apache.kylin.engine.spark.job.NSparkCubingUtil;
-import org.apache.kylin.engine.spark.job.ResourceDetect;
-import org.apache.kylin.engine.spark.job.RestfulJobProgressReport;
-import org.apache.kylin.engine.spark.job.SegmentBuildJob;
-import org.apache.kylin.engine.spark.job.SparkJobConstants;
-import org.apache.kylin.engine.spark.job.UdfManager;
-import org.apache.kylin.engine.spark.scheduler.ClusterMonitor;
-import org.apache.kylin.engine.spark.utils.JobMetricsUtils;
-import org.apache.kylin.engine.spark.utils.SparkConfHelper;
-import org.apache.kylin.metadata.cube.model.NBatchConstants;
-import org.apache.kylin.metadata.model.NDataModel;
-import org.apache.kylin.metadata.model.NDataModelManager;
-import org.apache.kylin.metadata.model.NTableMetadataManager;
-import org.apache.kylin.metadata.model.PartitionDesc;
-import org.apache.kylin.query.pushdown.SparkSubmitter;
-import org.apache.kylin.query.util.PushDownUtil;
-import org.apache.spark.SparkConf;
-import org.apache.spark.application.NoRetryException;
-import org.apache.spark.launcher.SparkLauncher;
-import org.apache.spark.sql.KylinSession;
-import org.apache.spark.sql.KylinSession$;
-import org.apache.spark.sql.SparderEnv;
-import org.apache.spark.sql.SparkSession;
-import org.apache.spark.sql.SparkSessionExtensions;
-import org.apache.spark.sql.catalyst.catalog.CatalogTableType;
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
-import org.apache.spark.sql.catalyst.rules.Rule;
-import org.apache.spark.sql.execution.datasource.AlignmentTableStats;
-import org.apache.spark.sql.hive.utils.ResourceDetectUtils;
-import org.apache.spark.util.Utils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.Maps;
-
-import io.kyligence.kap.engine.spark.job.EnviromentAdaptor;
-import io.kyligence.kap.engine.spark.job.IJobProgressReport;
-import io.kyligence.kap.engine.spark.job.ParamsConstants;
-import lombok.val;
-import scala.runtime.AbstractFunction1;
-import scala.runtime.BoxedUnit;
-
-public abstract class SparkApplication implements Application {
-    private static final Logger logger = LoggerFactory.getLogger(SparkApplication.class);
-    private Map<String, String> params = Maps.newHashMap();
-    public static final String JOB_NAME_PREFIX = "job_step_";
-    private IJobProgressReport report;
-
-    protected volatile KylinConfig config;
-    protected volatile String jobId;
-    protected String project;
-    protected int layoutSize = -1;
-    protected BuildJobInfos infos;
-    /**
-     * path for spark app args on HDFS
-     */
-    protected String path;
-
-    private ClusterMonitor clusterMonitor;
-    private final AtomicLong atomicDisconnectSparkMasterTimes = new AtomicLong(0);
-    private final AtomicBoolean atomicUnreachableSparkMaster = new AtomicBoolean(false);
-    private final AtomicReference<SparkConf> atomicSparkConf = new AtomicReference<>(null);
-    private final AtomicReference<SparkSession> atomicSparkSession = new AtomicReference<>(null);
-    private final AtomicReference<KylinBuildEnv> atomicBuildEnv = new AtomicReference<>(null);
-
-    public void execute(String[] args) {
-        try {
-            path = args[0];
-            String argsLine = readArgsFromHDFS();
-            params = JsonUtil.readValueAsMap(argsLine);
-            logger.info("Execute {} with args : {}", this.getClass().getName(), argsLine);
-            execute();
-        } catch (Exception e) {
-            throw new RuntimeException("Error execute " + this.getClass().getName(), e);
-        }
-    }
-
-    public AtomicBoolean getAtomicUnreachableSparkMaster() {
-        return atomicUnreachableSparkMaster;
-    }
-
-    public final Map<String, String> getParams() {
-        return this.params;
-    }
-
-    public final String getParam(String key) {
-        return this.params.get(key);
-    }
-
-    public final void setParam(String key, String value) {
-        this.params.put(key, value);
-    }
-
-    public final boolean contains(String key) {
-        return params.containsKey(key);
-    }
-
-    public String getJobId() {
-        return jobId;
-    }
-
-    public String getProject() {
-        return project;
-    }
-
-    public KylinConfig getConfig() {
-        return config;
-    }
-
-    public IJobProgressReport getReport() {
-        if (report == null)
-            return new RestfulJobProgressReport();
-        return report;
-    }
-
-    /// backwards compatibility, must have been initialized before invoking #doExecute.
-    protected SparkSession ss;
-
-    public SparkSession getSparkSession() throws NoRetryException {
-        SparkSession sparkSession = atomicSparkSession.get();
-        if (Objects.isNull(sparkSession)) {
-            // shouldn't reach here
-            throw new NoRetryException("spark session shouldn't be null");
-        }
-        return sparkSession;
-    }
-
-    public String readArgsFromHDFS() {
-        val fs = HadoopUtil.getFileSystem(path);
-        String argsLine = null;
-        Path filePath = new Path(path);
-        try (FSDataInputStream inputStream = fs.open(filePath)) {
-            BufferedReader br = new BufferedReader(new InputStreamReader(inputStream, Charset.defaultCharset()));
-            argsLine = br.readLine();
-        } catch (IOException e) {
-            logger.error("Error occurred when reading args file: {}", path, e);
-        }
-        return argsLine;
-    }
-
-    /**
-     * get tracking url by application id
-     *
-     * @param sparkSession build sparkSession
-     * @return
-     */
-    public String getTrackingUrl(IClusterManager clusterManager, SparkSession sparkSession) {
-        return clusterManager.getBuildTrackingUrl(sparkSession);
-    }
-
-    private String tryReplaceHostAddress(String url) {
-        String originHost = null;
-        try {
-            URI uri = URI.create(url);
-            originHost = uri.getHost();
-            String hostAddress = InetAddress.getByName(originHost).getHostAddress();
-            return url.replace(originHost, hostAddress);
-        } catch (UnknownHostException uhe) {
-            logger.error("failed to get the ip address of {}, step back to use the origin tracking url.", originHost,
-                    uhe);
-            return url;
-        }
-    }
-
-    private Map<String, String> getTrackingInfo(SparkSession sparkSession, boolean ipAddressPreferred) {
-        IClusterManager clusterManager = atomicBuildEnv.get().clusterManager();
-        String applicationId = sparkSession.sparkContext().applicationId();
-        Map<String, String> extraInfo = new HashMap<>();
-        extraInfo.put("yarn_app_id", applicationId);
-        try {
-            String trackingUrl = getTrackingUrl(clusterManager, sparkSession);
-            if (StringUtils.isBlank(trackingUrl)) {
-                logger.warn("Get tracking url of application {}, but empty url found.", applicationId);
-                return extraInfo;
-            }
-            if (ipAddressPreferred) {
-                trackingUrl = tryReplaceHostAddress(trackingUrl);
-            }
-            extraInfo.put("yarn_app_url", trackingUrl);
-        } catch (Exception e) {
-            logger.error("get tracking url failed!", e);
-        }
-        return extraInfo;
-    }
-
-    protected void exchangeSparkSession() {
-        exchangeSparkSession(atomicSparkConf.get());
-    }
-
-    protected final void execute() throws Exception {
-        String hdfsMetalUrl = getParam(NBatchConstants.P_DIST_META_URL);
-        jobId = getParam(NBatchConstants.P_JOB_ID);
-        project = getParam(NBatchConstants.P_PROJECT_NAME);
-        if (getParam(NBatchConstants.P_LAYOUT_IDS) != null) {
-            layoutSize = StringUtils.split(getParam(NBatchConstants.P_LAYOUT_IDS), ",").length;
-        }
-        try (KylinConfig.SetAndUnsetThreadLocalConfig autoCloseConfig = KylinConfig
-                .setAndUnsetThreadLocalConfig(KylinConfig.loadKylinConfigFromHdfs(hdfsMetalUrl))) {
-            config = autoCloseConfig.get();
-            report = (IJobProgressReport) ClassUtil.newInstance(config.getBuildJobProgressReporter());
-            report.initArgsParams(getParams());
-            //// KylinBuildEnv
-            final KylinBuildEnv buildEnv = KylinBuildEnv.getOrCreate(config);
-            atomicBuildEnv.set(buildEnv);
-            infos = buildEnv.buildJobInfos();
-            infos.recordJobId(jobId);
-            infos.recordProject(project);
-            infos.recordJobStepId(System.getProperty("spark.driver.param.taskId", jobId));
-
-            monitorSparkMaster();
-
-            HadoopUtil.setCurrentConfiguration(new Configuration());
-            ////////
-            exchangeSparkConf(buildEnv.sparkConf());
-
-            TimeZoneUtils.setDefaultTimeZone(config);
-
-            /// wait until resource is enough
-            waiteForResource(atomicSparkConf.get(), buildEnv);
-
-            ///
-            logger.info("Prepare job environment");
-            prepareSparkSession();
-
-            /// backwards compatibility
-            ss = getSparkSession();
-            val master = ss.conf().get(SparkLauncher.SPARK_MASTER, "");
-            if (!master.equals("local")) {
-                EnviromentAdaptor adaptor = (EnviromentAdaptor) ClassUtil
-                        .newInstance(config.getBuildJobEnviromentAdaptor());
-                adaptor.prepareEnviroment(ss, params);
-            }
-
-            if (config.useDynamicS3RoleCredentialInTable()) {
-                val tableMetadataManager = NTableMetadataManager.getInstance(config, project);
-                tableMetadataManager.listAllTables().forEach(tableDesc -> SparderEnv
-                        .addS3CredentialFromTableToSpark(tableMetadataManager.getOrCreateTableExt(tableDesc), ss));
-            }
-
-            if (!config.isUTEnv()) {
-                Unsafe.setProperty("kylin.env", config.getDeployEnv());
-            }
-            logger.info("Start job");
-            infos.startJob();
-            // should be invoked after method prepareSparkSession
-            extraInit();
-
-            waiteForResourceSuccess();
-            doExecute();
-            // Output metadata to another folder
-            val resourceStore = ResourceStore.getKylinMetaStore(config);
-            val outputConfig = KylinConfig.createKylinConfig(config);
-            outputConfig.setMetadataUrl(getParam(NBatchConstants.P_OUTPUT_META_URL));
-            MetadataStore.createMetadataStore(outputConfig).dump(resourceStore);
-        } catch (Exception e) {
-            handleException(e);
-        } finally {
-            if (infos != null) {
-                infos.jobEnd();
-            }
-            destroySparkSession();
-            extraDestroy();
-            executeFinish();
-        }
-    }
-
-    protected void handleException(Exception e) throws Exception {
-        throw e;
-    }
-
-    private SparkSession createSpark(SparkConf sparkConf) {
-        SparkSession.Builder sessionBuilder = SparkSession.builder()
-                .withExtensions(new AbstractFunction1<SparkSessionExtensions, BoxedUnit>() {
-                    @Override
-                    public BoxedUnit apply(SparkSessionExtensions v1) {
-                        v1.injectPostHocResolutionRule(new AbstractFunction1<SparkSession, Rule<LogicalPlan>>() {
-                            @Override
-                            public Rule<LogicalPlan> apply(SparkSession session) {
-                                return new AlignmentTableStats(session);
-                            }
-                        });
-                        return BoxedUnit.UNIT;
-                    }
-                }).enableHiveSupport().config(sparkConf)
-                .config("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false");
-
-        // If this is UT and SparkSession is already created, then use SparkSession.
-        // Otherwise, we always use KylinSession
-        boolean createWithSparkSession = !isJobOnCluster(sparkConf) && SparderEnv.isSparkAvailable();
-        if (createWithSparkSession) {
-            boolean isKylinSession = SparderEnv.getSparkSession() instanceof KylinSession;
-            createWithSparkSession = !isKylinSession;
-        }
-
-        if (createWithSparkSession) {
-            return sessionBuilder.getOrCreate();
-        } else {
-            return KylinSession$.MODULE$.KylinBuilder(sessionBuilder).buildCluster().getOrCreateKylinSession();
-        }
-    }
-
-    public boolean isJobOnCluster(SparkConf conf) {
-        return !Utils.isLocalMaster(conf) && !config.isUTEnv();
-    }
-
-    protected void extraInit() {
-    }
-
-    public void extraDestroy() {
-        if (clusterMonitor != null) {
-            clusterMonitor.shutdown();
-        }
-    }
-
-    protected abstract void doExecute() throws Exception;
-
-    protected void onLayoutFinished(long layoutId) {
-        //do nothing
-    }
-
-    protected void onExecuteFinished() {
-        //do nothing
-    }
-
-    protected String calculateRequiredCores() throws Exception {
-        return SparkJobConstants.DEFAULT_REQUIRED_CORES;
-    }
-
-    private void autoSetSparkConf(SparkConf sparkConf) throws Exception {
-        SparkConfHelper helper = new SparkConfHelper();
-        // copy user defined spark conf
-        if (sparkConf.getAll() != null) {
-            Arrays.stream(sparkConf.getAll()).forEach(config -> helper.setConf(config._1, config._2));
-        }
-        helper.setClusterManager(KylinBuildEnv.get().clusterManager());
-
-        chooseContentSize(helper);
-
-        helper.setOption(SparkConfHelper.LAYOUT_SIZE, Integer.toString(layoutSize));
-        helper.setOption(SparkConfHelper.REQUIRED_CORES, calculateRequiredCores());
-        helper.setConf(COUNT_DISTICT, hasCountDistinct().toString());
-        helper.generateSparkConf();
-        helper.applySparkConf(sparkConf);
-    }
-
-    private void waiteForResource(SparkConf sparkConf, KylinBuildEnv buildEnv) throws Exception {
-        val waiteForResource = WAITE_FOR_RESOURCE.create(this, null, null);
-        infos.recordStageId(waiteForResource.getId());
-        waiteForResource.execute();
-    }
-
-    protected void waiteForResourceSuccess() throws Exception {
-        val waiteForResource = WAITE_FOR_RESOURCE.create(this, null, null);
-        waiteForResource.onStageFinished(true);
-        infos.recordStageId("");
-    }
-
-    protected void executeFinish() {
-        try {
-            getReport().executeFinish(getReportParams(), project, getJobId());
-        } catch (Exception e) {
-            logger.error("executeFinish failed", e);
-        }
-    }
-
-    protected void chooseContentSize(SparkConfHelper helper) {
-        Path shareDir = config.getJobTmpShareDir(project, jobId);
-        // add content size with unit
-        helper.setOption(SparkConfHelper.SOURCE_TABLE_SIZE, chooseContentSize(shareDir));
-    }
-
-    protected boolean checkRangePartitionTableIsExist(NDataModel modelDesc) {
-        return modelDesc.getAllTableRefs().stream().anyMatch(p -> p.getTableDesc().isRangePartition());
-    }
-
-    protected String chooseContentSize(Path shareDir) {
-        // return size with unit
-        return ResourceDetectUtils.getMaxResourceSize(shareDir) + "b";
-    }
-
-    protected Boolean hasCountDistinct() throws IOException {
-        Path countDistinct = new Path(config.getJobTmpShareDir(project, jobId),
-                ResourceDetectUtils.countDistinctSuffix());
-        FileSystem fileSystem = countDistinct.getFileSystem(HadoopUtil.getCurrentConfiguration());
-        Boolean exist;
-        if (fileSystem.exists(countDistinct)) {
-            exist = ResourceDetectUtils.readResourcePathsAs(countDistinct);
-        } else {
-            exist = false;
-            logger.debug("File count_distinct.json doesn't exist, set hasCountDistinct to false.");
-        }
-        logger.debug("Exist count distinct measure: {}", exist);
-        return exist;
-    }
-
-    public void logJobInfo() {
-        try {
-            logger.info(generateInfo());
-            if (KylinConfig.getInstanceFromEnv().skipRecordJobExecutionTime()) {
-                logger.info("skip record job wait and run time");
-                return;
-            }
-            Map<String, String> extraInfo = new HashMap<>();
-            extraInfo.put("yarn_job_wait_time", ((Long) KylinBuildEnv.get().buildJobInfos().waitTime()).toString());
-            extraInfo.put("yarn_job_run_time", ((Long) KylinBuildEnv.get().buildJobInfos().buildTime()).toString());
-
-            getReport().updateSparkJobExtraInfo(getReportParams(), "/kylin/api/jobs/wait_and_run_time", project, jobId,
-                    extraInfo);
-        } catch (Exception e) {
-            logger.warn("Error occurred when generate job info.", e);
-        }
-    }
-
-    private Map<String, String> getReportParams() {
-        val reportParams = new HashMap<String, String>();
-        reportParams.put(ParamsConstants.TIME_OUT, String.valueOf(config.getUpdateJobInfoTimeout()));
-        reportParams.put(ParamsConstants.JOB_TMP_DIR, config.getJobTmpDir(project, true));
-        return reportParams;
-    }
-
-    protected String generateInfo() {
-        return LogJobInfoUtils.sparkApplicationInfo();
-    }
-
-    public Set<String> getIgnoredSnapshotTables() {
-        return NSparkCubingUtil.toIgnoredTableSet(getParam(NBatchConstants.P_IGNORED_SNAPSHOT_TABLES));
-    }
-
-    protected Map<String, String> getSparkConfigOverride(KylinConfig config) {
-        return config.getSparkConfigOverride();
-    }
-
-    protected void checkDateFormatIfExist(String project, String modelId) throws Exception {
-        if (config.isUTEnv()) {
-            return;
-        }
-        val modelManager = NDataModelManager.getInstance(config, project);
-        NDataModel modelDesc = modelManager.getDataModelDesc(modelId);
-        if (checkRangePartitionTableIsExist(modelDesc)) {
-            logger.info("Range partitioned tables do not support pushdown, so do not need to perform subsequent logic");
-            return;
-        }
-
-        val partitionDesc = modelDesc.getPartitionDesc();
-        if (PartitionDesc.isEmptyPartitionDesc(partitionDesc)
-                || org.apache.commons.lang.StringUtils.isEmpty(partitionDesc.getPartitionDateFormat()))
-            return;
-
-        if (CatalogTableType.VIEW().name().equals(modelDesc.getRootFactTable().getTableDesc().getTableType()))
-            return;
-
-        String partitionColumn = modelDesc.getPartitionDesc().getPartitionDateColumnRef().getExpressionInSourceDB();
-
-        SparkSession sparkSession = atomicSparkSession.get();
-        try (SparkSubmitter.OverriddenSparkSession ignored = SparkSubmitter.getInstance()
-                .overrideSparkSession(sparkSession)) {
-            String dateString = PushDownUtil.getFormatIfNotExist(modelDesc.getRootFactTableName(), partitionColumn,
-                    project);
-            val sdf = new SimpleDateFormat(modelDesc.getPartitionDesc().getPartitionDateFormat(),
-                    Locale.getDefault(Locale.Category.FORMAT));
-            val date = sdf.parse(dateString);
-            if (date == null || !dateString.equals(sdf.format(date))) {
-                throw new NoRetryException("date format not match");
-            }
-        } catch (KylinException ignore) {
-            // ignore it when pushdown return empty row
-        } catch (ParseException | NoRetryException e) {
-            throw new NoRetryException("date format not match");
-        }
-    }
-
-    private void exchangeSparkConf(SparkConf sparkConf) throws Exception {
-        if (isJobOnCluster(sparkConf) && !(this instanceof ResourceDetect)) {
-            Map<String, String> baseSparkConf = getSparkConfigOverride(config);
-            if (!baseSparkConf.isEmpty()) {
-                baseSparkConf.forEach(sparkConf::set);
-                String baseSparkConfStr = JsonUtil.writeValueAsString(baseSparkConf);
-                logger.info("Override user-defined spark conf: {}", baseSparkConfStr);
-            }
-            if (config.isAutoSetSparkConf()) {
-                logger.info("Set spark conf automatically.");
-                try {
-                    autoSetSparkConf(sparkConf);
-                } catch (Exception e) {
-                    logger.warn("Auto set spark conf failed. Load spark conf from system properties", e);
-                }
-            }
-        }
-
-        atomicSparkConf.set(sparkConf);
-    }
-
-    private void exchangeSparkSession(SparkConf sparkConf) {
-        SparkSession sparkSession = atomicSparkSession.get();
-        if (Objects.nonNull(sparkSession)) {
-            // destroy previous spark session
-            destroySparkSession();
-        }
-
-        sparkSession = createSpark(sparkConf);
-        if (!config.isUTEnv() && !sparkConf.get("spark.master").startsWith("k8s")) {
-            getReport().updateSparkJobExtraInfo(getReportParams(), "/kylin/api/jobs/spark", project, jobId,
-                    getTrackingInfo(sparkSession, config.isTrackingUrlIpAddressEnabled()));
-        }
-
-        // for spark metrics
-        JobMetricsUtils.registerListener(sparkSession);
-        SparderEnv.registerListener(sparkSession.sparkContext());
-
-        //#8341
-        SparderEnv.setSparkSession(sparkSession);
-        UdfManager.create(sparkSession);
-
-        ///
-        atomicSparkSession.set(sparkSession);
-    }
-
-    private void prepareSparkSession() throws NoRetryException {
-        SparkConf sparkConf = atomicSparkConf.get();
-        if (Objects.isNull(sparkConf)) {
-            // shouldn't reach here
-            throw new NoRetryException("spark conf shouldn't be null");
-        }
-
-        /// SegmentBuildJob only!!!
-        if (config.isSnapshotSpecifiedSparkConf() && (this instanceof SegmentBuildJob)) {
-            // snapshot specified spark conf, based on the exchanged spark conf.
-            SparkConf clonedSparkConf = sparkConf.clone();
-            Map<String, String> snapshotSparkConf = config.getSnapshotBuildingConfigOverride();
-            snapshotSparkConf.forEach(clonedSparkConf::set);
-            logger.info("exchange sparkSession using snapshot specified sparkConf");
-            exchangeSparkSession(clonedSparkConf);
-            return;
-        }
-        // normal logic
-        exchangeSparkSession(sparkConf);
-    }
-
-    private void destroySparkSession() {
-        SparkSession sparkSession = atomicSparkSession.get();
-        if (Objects.isNull(sparkSession)) {
-            logger.info("no initialized sparkSession instance");
-            return;
-        }
-        if (sparkSession.conf().get("spark.master").startsWith("local")) {
-            // for UT use? but very strange for resource detect mode (spark local).
-            return;
-        }
-        JobMetricsUtils.unRegisterListener(sparkSession);
-        sparkSession.stop();
-    }
-
-    private void monitorSparkMaster() {
-        clusterMonitor = new ClusterMonitor();
-        clusterMonitor.monitorSparkMaster(atomicBuildEnv, atomicSparkSession, atomicDisconnectSparkMasterTimes,
-                atomicUnreachableSparkMaster);
-    }
-
-}
diff --git a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/application/SparkEntry.java b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/application/SparkEntry.java
deleted file mode 100644
index 31974f65bc..0000000000
--- a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/application/SparkEntry.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.engine.spark.application;
-
-import org.apache.spark.application.JobWorkSpace;
-
-public class SparkEntry {
-    public static void main(String[] args) {
-        JobWorkSpace.execute(args);
-    }
-}
diff --git a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkCubingSourceInput.java b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkCubingSourceInput.java
deleted file mode 100644
index 5411c31bbd..0000000000
--- a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkCubingSourceInput.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kylin.engine.spark.source;
-
-import static org.apache.kylin.engine.spark.stats.utils.HiveTableRefChecker.isNeedCreateHiveTemporaryTable;
-
-import java.util.List;
-import java.util.Locale;
-import java.util.Map;
-
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.metadata.model.ColumnDesc;
-import org.apache.kylin.metadata.model.TableDesc;
-import org.apache.kylin.engine.spark.NSparkCubingEngine;
-import org.apache.kylin.engine.spark.job.KylinBuildEnv;
-import org.apache.kylin.engine.spark.utils.HiveTransactionTableHelper;
-import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Row;
-import org.apache.spark.sql.SparkSession;
-import org.apache.spark.sql.types.StructType;
-import org.apache.spark.sql.util.SparderTypeUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Joiner;
-import com.google.common.collect.Lists;
-
-public class NSparkCubingSourceInput implements NSparkCubingEngine.NSparkCubingSource {
-    private static final Logger logger = LoggerFactory.getLogger(NSparkCubingSourceInput.class);
-
-    @Override
-    public Dataset<Row> getSourceData(TableDesc table, SparkSession ss, Map<String, String> params) {
-        ColumnDesc[] columnDescs = table.getColumns();
-        List<String> tblColNames = Lists.newArrayListWithCapacity(columnDescs.length);
-        StructType kylinSchema = new StructType();
-        for (ColumnDesc columnDesc : columnDescs) {
-            if (!columnDesc.isComputedColumn()) {
-                kylinSchema = kylinSchema.add(columnDesc.getName(),
-                        SparderTypeUtil.toSparkType(columnDesc.getType(), false), true);
-                tblColNames.add("`" + columnDesc.getName() + "`");
-            }
-        }
-        String[] colNames = tblColNames.toArray(new String[0]);
-        String colString = Joiner.on(",").join(colNames);
-        String sql;
-        KylinConfig kylinConfig = KylinBuildEnv.get().kylinConfig();
-        logger.info("isRangePartition:{};isTransactional:{};isReadTransactionalTableEnabled:{}",
-                table.isRangePartition(), table.isTransactional(), kylinConfig.isReadTransactionalTableEnabled());
-        if (isNeedCreateHiveTemporaryTable(table.isRangePartition(), table.isTransactional(),
-                kylinConfig.isReadTransactionalTableEnabled())) {
-            sql = HiveTransactionTableHelper.doGetQueryHiveTemporaryTableSql(table, params, colString,
-                    KylinBuildEnv.get());
-        } else {
-            sql = String.format(Locale.ROOT, "select %s from %s", colString, table.getIdentity());
-        }
-        Dataset<Row> df = ss.sql(sql);
-        StructType sparkSchema = df.schema();
-        logger.debug("Source data sql is: {}", sql);
-        logger.debug("Kylin schema: {}", kylinSchema.treeString());
-        return df.select(SparderTypeUtil.alignDataTypeAndName(sparkSchema, kylinSchema));
-    }
-}
diff --git a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkDataSource.java b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkDataSource.java
deleted file mode 100644
index 6a501bc71e..0000000000
--- a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkDataSource.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.engine.spark.source;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.metadata.model.IBuildable;
-import org.apache.kylin.metadata.model.SegmentRange;
-import org.apache.kylin.metadata.model.TableDesc;
-import org.apache.kylin.source.IReadableTable;
-import org.apache.kylin.source.ISampleDataDeployer;
-import org.apache.kylin.source.ISource;
-import org.apache.kylin.source.ISourceMetadataExplorer;
-import org.apache.kylin.engine.spark.NSparkCubingEngine;
-
-public class NSparkDataSource implements ISource {
-    // for reflection
-    public NSparkDataSource(KylinConfig config) {
-
-    }
-
-    @Override
-    public ISourceMetadataExplorer getSourceMetadataExplorer() {
-        return new NSparkMetadataExplorer();
-    }
-
-    @Override
-    public <I> I adaptToBuildEngine(Class<I> engineInterface) {
-        if (engineInterface == NSparkCubingEngine.NSparkCubingSource.class) {
-            return (I) new NSparkCubingSourceInput();
-        } else {
-            throw new IllegalArgumentException("Unsupported engine interface: " + engineInterface);
-        }
-    }
-
-    @Override
-    public IReadableTable createReadableTable(TableDesc tableDesc) {
-        return new NSparkTable(tableDesc);
-    }
-
-    @Override
-    public SegmentRange enrichSourcePartitionBeforeBuild(IBuildable buildable, SegmentRange srcPartition) {
-        return srcPartition;
-    }
-
-    @Override
-    public ISampleDataDeployer getSampleDataDeployer() {
-        return new NSparkMetadataExplorer();
-    }
-
-    @Override
-    public SegmentRange getSegmentRange(String start, String end) {
-        start = StringUtils.isEmpty(start) ? "0" : start;
-        end = StringUtils.isEmpty(end) ? "" + Long.MAX_VALUE : end;
-        return new SegmentRange.TimePartitionedSegmentRange(Long.parseLong(start), Long.parseLong(end));
-    }
-
-    @Override
-    public boolean supportBuildSnapShotByPartition() {
-        return true;
-    }
-
-}
diff --git a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkMetadataExplorer.java b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkMetadataExplorer.java
deleted file mode 100644
index 7e4f7deea0..0000000000
--- a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkMetadataExplorer.java
+++ /dev/null
@@ -1,313 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kylin.engine.spark.source;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Locale;
-import java.util.Optional;
-import java.util.Set;
-import java.util.stream.Collectors;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.kylin.common.KapConfig;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.HadoopUtil;
-import org.apache.kylin.common.util.Pair;
-import org.apache.kylin.common.util.RandomUtil;
-import org.apache.kylin.metadata.model.ColumnDesc;
-import org.apache.kylin.metadata.model.ISourceAware;
-import org.apache.kylin.metadata.model.TableDesc;
-import org.apache.kylin.metadata.model.TableExtDesc;
-import org.apache.kylin.source.ISampleDataDeployer;
-import org.apache.kylin.source.ISourceMetadataExplorer;
-import org.apache.kylin.metadata.model.NTableMetadataManager;
-import org.apache.spark.sql.AnalysisException;
-import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Row;
-import org.apache.spark.sql.SparderEnv;
-import org.apache.spark.sql.SparkSession;
-import org.apache.spark.sql.catalog.Database;
-import org.apache.spark.sql.catalyst.catalog.CatalogTableType;
-import org.apache.spark.sql.internal.SQLConf;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.clearspring.analytics.util.Lists;
-import com.google.common.collect.Sets;
-
-import lombok.val;
-
-public class NSparkMetadataExplorer implements ISourceMetadataExplorer, ISampleDataDeployer, Serializable {
-
-    private static final Logger logger = LoggerFactory.getLogger(NSparkMetadataExplorer.class);
-
-    public static String generateCreateSchemaSql(String schemaName) {
-        return String.format(Locale.ROOT, "CREATE DATABASE IF NOT EXISTS %s", schemaName);
-    }
-
-    public static String[] generateCreateTableSql(TableDesc tableDesc) {
-        String dropSql = "DROP TABLE IF EXISTS " + tableDesc.getIdentity();
-
-        StringBuilder ddl = new StringBuilder();
-        ddl.append("CREATE TABLE " + tableDesc.getIdentity() + "\n");
-        ddl.append("(" + "\n");
-
-        for (int i = 0; i < tableDesc.getColumns().length; i++) {
-            ColumnDesc col = tableDesc.getColumns()[i];
-            if (i > 0) {
-                ddl.append(",");
-            }
-            ddl.append(col.getName() + " " + col.getDatatype() + "\n");
-        }
-
-        ddl.append(")" + "\n");
-        ddl.append("USING com.databricks.spark.csv");
-
-        return new String[] { dropSql, ddl.toString() };
-    }
-
-    public NSparkTableMetaExplorer getTableMetaExplorer() {
-        return new NSparkTableMetaExplorer();
-    }
-
-    @Override
-    public List<String> listDatabases() throws Exception {
-        Dataset<Row> dataset = SparderEnv.getSparkSession().sql("show databases").select("namespace");
-        return dataset.collectAsList().stream().map(row -> row.getString(0)).collect(Collectors.toList());
-    }
-
-    @Override
-    public List<String> listTables(String database) throws Exception {
-        val ugi = UserGroupInformation.getCurrentUser();
-        val config = KylinConfig.getInstanceFromEnv();
-        val spark = SparderEnv.getSparkSession();
-
-        List<String> tables = Lists.newArrayList();
-        try {
-            String sql = "show tables";
-            if (StringUtils.isNotBlank(database)) {
-                sql = String.format(Locale.ROOT, sql + " in %s", database);
-            }
-            Dataset<Row> dataset = SparderEnv.getSparkSession().sql(sql).select("tableName");
-            tables = dataset.collectAsList().stream().map(row -> row.getString(0)).collect(Collectors.toList());
-
-            if (config.getTableAccessFilterEnable() && config.getKerberosProjectLevelEnable()
-                    && UserGroupInformation.isSecurityEnabled()) {
-                List<String> accessTables = Lists.newArrayList();
-                for (String table : tables) {
-                    val tableName = database + "." + table;
-                    if (checkTableAccess(tableName)) {
-                        accessTables.add(table);
-                    }
-                }
-                return accessTables;
-            }
-        } catch (Exception e) {
-            logger.error("List hive tables failed. user: {}, db: {}", ugi.getUserName(), database);
-        }
-
-        return tables;
-    }
-
-    public boolean checkTableAccess(String tableName) {
-        boolean isAccess = true;
-        try {
-            val spark = SparderEnv.getSparkSession();
-            val sparkTable = spark.catalog().getTable(tableName);
-            Set<String> needCheckTables = Sets.newHashSet();
-            if (sparkTable.tableType().equals(CatalogTableType.VIEW().name())) {
-                needCheckTables = SparkSqlUtil.getViewOrignalTables(tableName, SparderEnv.getSparkSession());
-            } else {
-                needCheckTables.add(tableName);
-            }
-            String hiveSpecFsLocation = spark.sessionState().conf().getConf(SQLConf.HIVE_SPECIFIC_FS_LOCATION());
-            FileSystem fs = null == hiveSpecFsLocation ? HadoopUtil.getWorkingFileSystem()
-                    : HadoopUtil.getFileSystem(hiveSpecFsLocation);
-            for (String table : needCheckTables) {
-                fs.listStatus(new Path(getLoc(spark, table, hiveSpecFsLocation)));
-            }
-        } catch (Exception e) {
-            isAccess = false;
-            try {
-                logger.error("Read hive table {} error:{}, ugi name: {}.", tableName, e.getMessage(),
-                        UserGroupInformation.getCurrentUser().getUserName());
-            } catch (IOException ex) {
-                logger.error("fetch user curr ugi info error.", e);
-            }
-        }
-        return isAccess;
-    }
-
-    @Override
-    public Pair<TableDesc, TableExtDesc> loadTableMetadata(final String database, String tableName, String prj)
-            throws Exception {
-        KylinConfig config = KylinConfig.getInstanceFromEnv();
-        NTableMetadataManager metaMgr = NTableMetadataManager.getInstance(config, prj);
-
-        NSparkTableMeta tableMeta = getTableMetaExplorer().getSparkTableMeta(database, tableName);
-        TableDesc tableDesc = metaMgr.getTableDesc(database + "." + tableName);
-
-        // make a new TableDesc instance, don't modify the one in use
-        if (tableDesc == null) {
-            tableDesc = new TableDesc();
-            tableDesc.setDatabase(database.toUpperCase(Locale.ROOT));
-            tableDesc.setName(tableName.toUpperCase(Locale.ROOT));
-            tableDesc.setUuid(RandomUtil.randomUUIDStr());
-            tableDesc.setLastModified(0);
-        } else {
-            tableDesc = new TableDesc(tableDesc);
-        }
-
-        if (tableMeta.tableType != null) {
-            tableDesc.setTableType(tableMeta.tableType);
-        }
-        //set table type = spark
-        tableDesc.setSourceType(ISourceAware.ID_SPARK);
-        tableDesc.setTransactional(tableMeta.isTransactional);
-        tableDesc.setRangePartition(tableMeta.isRangePartition);
-
-        Set<String> partColumnSet = Optional.ofNullable(tableMeta.partitionColumns) //
-                .orElseGet(Collections::emptyList).stream().map(field -> field.name) //
-                .collect(Collectors.toSet());
-        int columnNumber = tableMeta.allColumns.size();
-        List<ColumnDesc> columns = new ArrayList<ColumnDesc>(columnNumber);
-        for (int i = 0; i < columnNumber; i++) {
-            NSparkTableMeta.SparkTableColumnMeta field = tableMeta.allColumns.get(i);
-            ColumnDesc cdesc = new ColumnDesc();
-            cdesc.setName(field.name.toUpperCase(Locale.ROOT));
-            cdesc.setCaseSensitiveName(field.name);
-            // use "double" in kylin for "float"
-            if ("float".equalsIgnoreCase(field.dataType)) {
-                cdesc.setDatatype("double");
-            } else {
-                cdesc.setDatatype(field.dataType);
-            }
-            cdesc.setId(String.valueOf(i + 1));
-            cdesc.setComment(field.comment);
-            cdesc.setPartitioned(partColumnSet.contains(field.name));
-            columns.add(cdesc);
-        }
-        tableDesc.setColumns(columns.toArray(new ColumnDesc[columnNumber]));
-        List<String> partCols = tableMeta.partitionColumns.stream().map(col -> col.name).collect(Collectors.toList());
-        if (!partCols.isEmpty()) {
-            tableDesc.setPartitionColumn(partCols.get(0).toUpperCase(Locale.ROOT));
-        } else {
-            tableDesc.setPartitionColumn(null);
-        }
-        StringBuilder partitionColumnBuilder = new StringBuilder();
-        for (int i = 0, n = tableMeta.partitionColumns.size(); i < n; i++) {
-            if (i > 0)
-                partitionColumnBuilder.append(", ");
-            partitionColumnBuilder.append(tableMeta.partitionColumns.get(i).name.toUpperCase(Locale.ROOT));
-        }
-
-        TableExtDesc tableExtDesc = new TableExtDesc();
-        tableExtDesc.setIdentity(tableDesc.getIdentity());
-        tableExtDesc.setUuid(RandomUtil.randomUUIDStr());
-        tableExtDesc.setLastModified(0);
-        tableExtDesc.init(prj);
-
-        tableExtDesc.addDataSourceProp(TableExtDesc.LOCATION_PROPERTY_KEY, tableMeta.sdLocation);
-        tableExtDesc.addDataSourceProp("owner", tableMeta.owner);
-        tableExtDesc.addDataSourceProp("create_time", tableMeta.createTime);
-        tableExtDesc.addDataSourceProp("last_access_time", tableMeta.lastAccessTime);
-        tableExtDesc.addDataSourceProp("partition_column", partitionColumnBuilder.toString());
-        tableExtDesc.addDataSourceProp("total_file_size", String.valueOf(tableMeta.fileSize));
-        tableExtDesc.addDataSourceProp("total_file_number", String.valueOf(tableMeta.fileNum));
-        tableExtDesc.addDataSourceProp("hive_inputFormat", tableMeta.sdInputFormat);
-        tableExtDesc.addDataSourceProp("hive_outputFormat", tableMeta.sdOutputFormat);
-        tableExtDesc.addDataSourceProp(TableExtDesc.S3_ROLE_PROPERTY_KEY, tableMeta.s3Role);
-        tableExtDesc.addDataSourceProp(TableExtDesc.S3_ENDPOINT_KEY, tableMeta.s3Endpoint);
-        return Pair.newPair(tableDesc, tableExtDesc);
-    }
-
-    @Override
-    public List<String> getRelatedKylinResources(TableDesc table) {
-        return Collections.emptyList();
-    }
-
-    @Override
-    public boolean checkDatabaseAccess(String database) throws Exception {
-        boolean hiveDBAccessFilterEnable = KapConfig.getInstanceFromEnv().getDBAccessFilterEnable();
-        if (hiveDBAccessFilterEnable) {
-            logger.info("Check database {} access start.", database);
-            try {
-                Database db = SparderEnv.getSparkSession().catalog().getDatabase(database);
-            } catch (AnalysisException e) {
-                UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
-                logger.info("The current user: {} does not have permission to access database {}", ugi.getUserName(),
-                        database);
-                return false;
-            }
-        }
-
-        return true;
-    }
-
-    @Override
-    public boolean checkTablesAccess(Set<String> tables) {
-        return tables.stream().allMatch(this::checkTableAccess);
-    }
-
-    @Override
-    public Set<String> getTablePartitions(String database, String table, String prj, String partCol) {
-        return getTableMetaExplorer().checkAndGetTablePartitions(database, table, partCol);
-    }
-
-    @Override
-    public void createSampleDatabase(String database) throws Exception {
-        SparderEnv.getSparkSession().sql(generateCreateSchemaSql(database));
-    }
-
-    @Override
-    public void createSampleTable(TableDesc table) throws Exception {
-        String[] createTableSqls = generateCreateTableSql(table);
-        for (String sql : createTableSqls) {
-            SparderEnv.getSparkSession().sql(sql);
-        }
-    }
-
-    @Override
-    public void loadSampleData(String tableName, String tableFileDir) throws Exception {
-        Dataset<Row> dataset = SparderEnv.getSparkSession().read().csv(tableFileDir + "/" + tableName + ".csv").toDF();
-        if (tableName.indexOf(".") > 0) {
-            tableName = tableName.substring(tableName.indexOf(".") + 1);
-        }
-        dataset.createOrReplaceTempView(tableName);
-    }
-
-    @Override
-    public void createWrapperView(String origTableName, String viewName) throws Exception {
-        throw new UnsupportedOperationException("unsupport create wrapper view");
-    }
-
-    public String getLoc(SparkSession spark, String table, String hiveSpecFsLocation) {
-        String loc = spark.sql("desc formatted " + table).where("col_name == 'Location'").head().getString(1);
-        if (null == hiveSpecFsLocation || null == loc) {
-            return loc;
-        }
-        return loc.replace("hdfs://hacluster", hiveSpecFsLocation);
-    }
-}
diff --git a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkTable.java b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkTable.java
deleted file mode 100644
index 6ab6643154..0000000000
--- a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkTable.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.engine.spark.source;
-
-import java.io.IOException;
-import java.util.Locale;
-
-import org.apache.kylin.metadata.model.TableDesc;
-import org.apache.kylin.source.IReadableTable;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class NSparkTable implements IReadableTable {
-
-    private static final Logger logger = LoggerFactory.getLogger(NSparkTable.class);
-
-    final private String database;
-    final private String tableName;
-
-    public NSparkTable(TableDesc tableDesc) {
-        this.database = tableDesc.getDatabase();
-        this.tableName = tableDesc.getName();
-    }
-
-    @Override
-    public TableReader getReader() throws IOException {
-        return new NSparkTableReader(database, tableName);
-    }
-
-    @Override
-    public TableSignature getSignature() throws IOException {
-        // TODO: 07/12/2017 get modify time
-        String path = String.format(Locale.ROOT, "%s.%s", database, tableName);
-        long lastModified = System.currentTimeMillis(); // assume table is ever changing
-        int size = 0;
-        return new TableSignature(path, size, lastModified);
-    }
-
-    @Override
-    public boolean exists() {
-        return true;
-    }
-
-    @Override
-    public String toString() {
-        return "spark: database=[" + database + "], table=[" + tableName + "]";
-    }
-
-}
diff --git a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkTableMeta.java b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkTableMeta.java
deleted file mode 100644
index 29f48bfbfc..0000000000
--- a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkTableMeta.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kylin.engine.spark.source;
-
-import java.util.List;
-
-public class NSparkTableMeta {
-    public static class SparkTableColumnMeta {
-        String name;
-        String dataType;
-        String comment;
-
-        public SparkTableColumnMeta(String name, String dataType, String comment) {
-            this.name = name;
-            this.dataType = dataType;
-            this.comment = comment;
-        }
-
-        @Override
-        public String toString() {
-            return "SparkTableColumnMeta{" + "name='" + name + '\'' + ", dataType='" + dataType + '\'' + ", comment='"
-                    + comment + '\'' + '}';
-        }
-
-        public String getName() {
-            return name;
-        }
-
-        public String getDataType() {
-            return dataType;
-        }
-
-        public String getComment() {
-            return comment;
-        }
-    }
-
-    String tableName;
-    String sdLocation;//sd is short for storage descriptor
-    String sdInputFormat;
-    String sdOutputFormat;
-    String owner;
-    String provider;
-    String tableType;
-    String createTime;
-    String lastAccessTime;
-    long fileSize;
-    long fileNum;
-    boolean isNative;
-    List<SparkTableColumnMeta> allColumns;
-    List<SparkTableColumnMeta> partitionColumns;
-    boolean isTransactional;
-    boolean isRangePartition;
-    String s3Role;
-    String s3Endpoint;
-
-    public List<SparkTableColumnMeta> getAllColumns() {
-        return allColumns;
-    }
-
-    public NSparkTableMeta(String tableName, String sdLocation, String sdInputFormat, String sdOutputFormat,
-            String owner, String provider, String tableType, String createTime, String lastAccessTime, long fileSize,
-            long fileNum, boolean isNative, List<SparkTableColumnMeta> allColumns,
-            List<SparkTableColumnMeta> partitionColumns, boolean isTransactional, boolean isRangePartition,
-            String s3Role, String s3Endpoint) {
-        this.tableName = tableName;
-        this.sdLocation = sdLocation;
-        this.sdInputFormat = sdInputFormat;
-        this.sdOutputFormat = sdOutputFormat;
-        this.owner = owner;
-        this.provider = provider;
-        this.tableType = tableType;
-        this.createTime = createTime;
-        this.lastAccessTime = lastAccessTime;
-        this.fileSize = fileSize;
-        this.fileNum = fileNum;
-        this.isNative = isNative;
-        this.allColumns = allColumns;
-        this.partitionColumns = partitionColumns;
-        this.isTransactional = isTransactional;
-        this.isRangePartition = isRangePartition;
-        this.s3Role = s3Role;
-        this.s3Endpoint = s3Endpoint;
-    }
-
-    @Override
-    public String toString() {
-        return "SparkTableMeta{" + "tableName='" + tableName + '\'' + ", sdLocation='" + sdLocation + '\''
-                + ", sdInputFormat='" + sdInputFormat + '\'' + ", sdOutputFormat='" + sdOutputFormat + '\''
-                + ", owner='" + owner + ", provider='" + provider + '\'' + ", tableType='" + tableType
-                + ", createTime='" + createTime + '\'' + ", lastAccessTime=" + lastAccessTime + ", fileSize=" + fileSize
-                + ", fileNum=" + fileNum + ", isNative=" + isNative + ", allColumns=" + allColumns
-                + ", partitionColumns=" + partitionColumns + ", isTransactional=" + isTransactional
-                + ", isRangePartition=" + isRangePartition + ", s3Role=" + s3Role + ", s3Endpoint=" + s3Endpoint + '}';
-    }
-}
diff --git a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkTableMetaBuilder.java b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkTableMetaBuilder.java
deleted file mode 100644
index 2d4a9eeae1..0000000000
--- a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkTableMetaBuilder.java
+++ /dev/null
@@ -1,140 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.engine.spark.source;
-
-import java.util.List;
-
-import com.google.common.collect.Lists;
-
-public class NSparkTableMetaBuilder {
-    private String tableName;
-    private String sdLocation;
-    private String sdInputFormat;
-    private String sdOutputFormat;
-    private String owner;
-    private String provider;
-    private String tableType;
-    private String createTime;
-    private String lastAccessTime;
-    private long fileSize;
-    private long fileNum;
-    private boolean isNative = true;
-    private List<NSparkTableMeta.SparkTableColumnMeta> allColumns = Lists.newArrayList();
-    private List<NSparkTableMeta.SparkTableColumnMeta> partitionColumns = Lists.newArrayList();
-    private boolean isTransactional = false;
-    private boolean isRangePartition = false;
-    private String s3Role;
-    private String s3Endpoint;
-
-    public NSparkTableMetaBuilder setTableName(String tableName) {
-        this.tableName = tableName;
-        return this;
-    }
-
-    public NSparkTableMetaBuilder setSdLocation(String sdLocation) {
-        this.sdLocation = sdLocation;
-        return this;
-    }
-
-    public NSparkTableMetaBuilder setSdInputFormat(String sdInputFormat) {
-        this.sdInputFormat = sdInputFormat;
-        return this;
-    }
-
-    public NSparkTableMetaBuilder setSdOutputFormat(String sdOutputFormat) {
-        this.sdOutputFormat = sdOutputFormat;
-        return this;
-    }
-
-    public NSparkTableMetaBuilder setOwner(String owner) {
-        this.owner = owner;
-        return this;
-    }
-
-    public NSparkTableMetaBuilder setProvider(String provider) {
-        this.provider = provider;
-        return this;
-    }
-
-    public NSparkTableMetaBuilder setTableType(String tableType) {
-        this.tableType = tableType;
-        return this;
-    }
-
-    public NSparkTableMetaBuilder setCreateTime(String createTime) {
-        this.createTime = createTime;
-        return this;
-    }
-
-    public NSparkTableMetaBuilder setLastAccessTime(String lastAccessTime) {
-        this.lastAccessTime = lastAccessTime;
-        return this;
-    }
-
-    public NSparkTableMetaBuilder setFileSize(long fileSize) {
-        this.fileSize = fileSize;
-        return this;
-    }
-
-    public NSparkTableMetaBuilder setFileNum(long fileNum) {
-        this.fileNum = fileNum;
-        return this;
-    }
-
-    public NSparkTableMetaBuilder setIsNative(boolean isNative) {
-        this.isNative = isNative;
-        return this;
-    }
-
-    public NSparkTableMetaBuilder setAllColumns(List<NSparkTableMeta.SparkTableColumnMeta> allColumns) {
-        this.allColumns = allColumns;
-        return this;
-    }
-
-    public NSparkTableMetaBuilder setPartitionColumns(List<NSparkTableMeta.SparkTableColumnMeta> partitionColumns) {
-        this.partitionColumns = partitionColumns;
-        return this;
-    }
-
-    public NSparkTableMetaBuilder setIsTransactional(boolean isTransactional) {
-        this.isTransactional = isTransactional;
-        return this;
-    }
-
-    public NSparkTableMetaBuilder setIsRangePartition(boolean isRangePartition) {
-        this.isRangePartition = isRangePartition;
-        return this;
-    }
-
-    public NSparkTableMetaBuilder setS3Role(String s3Role) {
-        this.s3Role = s3Role;
-        return this;
-    }
-
-    public NSparkTableMetaBuilder setS3Endpoint(String s3Endpoint) {
-        this.s3Endpoint = s3Endpoint;
-        return this;
-    }
-
-    public NSparkTableMeta createSparkTableMeta() {
-        return new NSparkTableMeta(tableName, sdLocation, sdInputFormat, sdOutputFormat, owner, provider, tableType,
-                createTime, lastAccessTime, fileSize, fileNum, isNative, allColumns, partitionColumns, isTransactional,
-                isRangePartition, s3Role, s3Endpoint);
-    }
-}
\ No newline at end of file
diff --git a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkTableMetaExplorer.java b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkTableMetaExplorer.java
deleted file mode 100644
index 033edb84f9..0000000000
--- a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkTableMetaExplorer.java
+++ /dev/null
@@ -1,203 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kylin.engine.spark.source;
-
-import java.io.Serializable;
-import java.net.URI;
-import java.util.List;
-import java.util.Locale;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
-import java.util.stream.Collectors;
-
-import org.apache.commons.jnet.Installer;
-import org.apache.hadoop.fs.FsUrlStreamHandlerFactory;
-import org.apache.spark.sql.SparderEnv;
-import org.apache.spark.sql.catalyst.TableIdentifier;
-import org.apache.spark.sql.catalyst.catalog.CatalogTable;
-import org.apache.spark.sql.catalyst.catalog.CatalogTableType;
-import org.apache.spark.sql.catalyst.catalog.SessionCatalog;
-import org.apache.spark.sql.types.StructType;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-
-import scala.Option;
-import scala.collection.JavaConversions;
-
-public class NSparkTableMetaExplorer implements Serializable {
-
-    private static final Logger logger = LoggerFactory.getLogger(NSparkTableMetaExplorer.class);
-
-    enum PROVIDER {
-        HIVE("hive"), UNSPECIFIED("");
-
-        private static final PROVIDER[] ALL = new PROVIDER[] { HIVE };
-        private String value;
-
-        PROVIDER(String value) {
-            this.value = value;
-        }
-
-        public static PROVIDER fromString(Option<String> value) {
-            if (value.isEmpty()) {
-                return UNSPECIFIED;
-            }
-
-            for (PROVIDER provider : ALL) {
-                if (provider.value.equals(value.get())) {
-                    return provider;
-                }
-            }
-            return UNSPECIFIED;
-        }
-    }
-
-    private static final List<String> UNSUPOORT_TYPE = Lists.newArrayList("array", "map", "struct", "binary");
-    private static final String CHAR_VARCHAR_TYPE_STRING_METADATA_KEY = "__CHAR_VARCHAR_TYPE_STRING";
-    public static final String S3_ROLE_PROPERTY_KEY = "role";
-    public static final String S3_ENDPOINT_PROPERTY_KEY = "s3_endpoint";
-
-    public NSparkTableMeta getSparkTableMeta(String database, String tableName) {
-        SessionCatalog catalog = SparderEnv.getSparkSession().sessionState().catalog();
-        TableIdentifier tableIdentifier = TableIdentifier.apply(tableName,
-                Option.apply(database.isEmpty() ? null : database));
-        CatalogTable tableMetadata = catalog.getTempViewOrPermanentTableMetadata(tableIdentifier);
-        checkTableIsValid(tableMetadata, tableIdentifier, tableName);
-        return getSparkTableMeta(tableName, tableMetadata);
-    }
-
-    public Set<String> checkAndGetTablePartitions(String database, String tableName, String partitionCol) {
-        SessionCatalog catalog = SparderEnv.getSparkSession().sessionState().catalog();
-        TableIdentifier tableIdentifier = TableIdentifier.apply(tableName,
-                Option.apply(database.isEmpty() ? null : database));
-
-        CatalogTable tableMetadata = catalog.getTempViewOrPermanentTableMetadata(tableIdentifier);
-
-        String firstPartCol = tableMetadata.partitionColumnNames().isEmpty() ? null
-                : tableMetadata.partitionColumnNames().head().toLowerCase(Locale.ROOT);
-
-        if (!partitionCol.equalsIgnoreCase(firstPartCol)) {
-            throw new IllegalArgumentException(
-                    String.format(Locale.ROOT, "table partition col %s not match col %s", firstPartCol, partitionCol));
-        }
-        return JavaConversions.seqAsJavaList(catalog.listPartitions(tableIdentifier, Option.empty())).stream()
-                .map(item -> JavaConversions.mapAsJavaMap(item.spec()).entrySet().stream()
-                        .filter(entry -> partitionCol.equalsIgnoreCase(entry.getKey())) //
-                        .findFirst() //
-                        .map(Map.Entry::getValue) //
-                        .orElse(null))
-                .filter(Objects::nonNull).collect(Collectors.toSet());
-    }
-
-    private NSparkTableMeta getSparkTableMeta(String tableName, CatalogTable tableMetadata) {
-        NSparkTableMetaBuilder builder = new NSparkTableMetaBuilder();
-        builder.setTableName(tableName);
-        builder.setAllColumns(getColumns(tableMetadata, tableMetadata.schema()));
-        builder.setOwner(tableMetadata.owner());
-        builder.setCreateTime(tableMetadata.createTime() + "");
-        builder.setLastAccessTime(tableMetadata.lastAccessTime() + "");
-        builder.setTableType(tableMetadata.tableType().name());
-        builder.setPartitionColumns(getColumns(tableMetadata, tableMetadata.partitionSchema()));
-        builder.setIsRangePartition(isRangePartition(tableMetadata));
-        if (tableMetadata.storage().inputFormat().isDefined()) {
-            builder.setSdInputFormat(tableMetadata.storage().inputFormat().get());
-        }
-        if (tableMetadata.storage().outputFormat().isDefined()) {
-            builder.setSdOutputFormat(tableMetadata.storage().outputFormat().get());
-        }
-        Option<URI> uriOption = tableMetadata.storage().locationUri();
-        if (uriOption.isDefined()) {
-            builder.setSdLocation(uriOption.get().toString());
-        }
-        if (tableMetadata.provider().isDefined()) {
-            builder.setProvider(tableMetadata.provider().get());
-        }
-        if (tableMetadata.properties().contains("totalSize")) {
-            builder.setFileSize(Long.parseLong(tableMetadata.properties().get("totalSize").get()));
-        }
-        if (tableMetadata.properties().contains("numFiles")) {
-            builder.setFileNum(Long.parseLong(tableMetadata.properties().get("numFiles").get()));
-        }
-        if (tableMetadata.properties().contains("transactional")) {
-            builder.setIsTransactional(Boolean.parseBoolean(tableMetadata.properties().get("transactional").get()));
-        }
-        if (tableMetadata.properties().contains(S3_ROLE_PROPERTY_KEY)) {
-            builder.setS3Role(tableMetadata.properties().get(S3_ROLE_PROPERTY_KEY).get());
-        }
-
-        if (tableMetadata.properties().contains(S3_ENDPOINT_PROPERTY_KEY)) {
-            builder.setS3Endpoint(tableMetadata.properties().get(S3_ENDPOINT_PROPERTY_KEY).get());
-        }
-        return builder.createSparkTableMeta();
-    }
-
-    private List<NSparkTableMeta.SparkTableColumnMeta> getColumns(CatalogTable tableMetadata, StructType schema) {
-        return getColumns(tableMetadata, schema, true);
-    }
-
-    private List<NSparkTableMeta.SparkTableColumnMeta> getColumns(CatalogTable tableMetadata, StructType schema,
-            boolean isCheckRepeatColumn) {
-        List<NSparkTableMeta.SparkTableColumnMeta> allColumns = Lists.newArrayListWithCapacity(schema.size());
-        Set<String> columnCacheTemp = Sets.newHashSet();
-        for (org.apache.spark.sql.types.StructField field : schema.fields()) {
-            String type = field.dataType().simpleString();
-            if (field.metadata().contains(CHAR_VARCHAR_TYPE_STRING_METADATA_KEY)) {
-                type = field.metadata().getString(CHAR_VARCHAR_TYPE_STRING_METADATA_KEY);
-            }
-            String finalType = type;
-            if (UNSUPOORT_TYPE.stream().anyMatch(finalType::contains)) {
-                logger.info("Load table {} ignore column {}:{}", tableMetadata.identifier().identifier(), field.name(),
-                        finalType);
-                continue;
-            }
-            if (isCheckRepeatColumn && columnCacheTemp.contains(field.name())) {
-                logger.info("The【{}】column is already included and does not need to be added again", field.name());
-                continue;
-            }
-            columnCacheTemp.add(field.name());
-            allColumns.add(new NSparkTableMeta.SparkTableColumnMeta(field.name(), type,
-                    field.getComment().isDefined() ? field.getComment().get() : null));
-        }
-
-        return allColumns;
-    }
-
-    private void checkTableIsValid(CatalogTable tableMetadata, TableIdentifier tableIdentifier, String tableName) {
-        if (CatalogTableType.VIEW().equals(tableMetadata.tableType())) {
-            try {
-                Installer.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory());
-                SparderEnv.getSparkSession().table(tableIdentifier).queryExecution().analyzed();
-            } catch (Throwable e) {
-                logger.error("Error for parser view: " + tableName, e);
-                throw new RuntimeException("Error for parser view: " + tableName + ", " + e.getMessage()
-                        + "(There are maybe syntactic differences between HIVE and SparkSQL)", e);
-            }
-        }
-    }
-
-    private Boolean isRangePartition(CatalogTable tableMetadata) {
-        List<NSparkTableMeta.SparkTableColumnMeta> allColumns = getColumns(tableMetadata, tableMetadata.schema(),
-                false);
-        return allColumns.stream().collect(Collectors.groupingBy(p -> p.name)).values().stream()
-                .anyMatch(p -> p.size() > 1);
-    }
-}
diff --git a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkTableReader.java b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkTableReader.java
deleted file mode 100644
index 669f418c9c..0000000000
--- a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkTableReader.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kylin.engine.spark.source;
-
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Locale;
-
-import org.apache.kylin.source.IReadableTable.TableReader;
-import org.apache.spark.sql.Row;
-import org.apache.spark.sql.SparderEnv;
-import org.apache.spark.sql.SparkSession;
-import org.apache.spark.sql.types.StructField;
-
-public class NSparkTableReader implements TableReader {
-    private String dbName;
-    private String tableName;
-    private SparkSession ss;
-    private List<Row> records;
-    private Iterator<Row> iterator;
-    private Row currentRow;
-
-    public NSparkTableReader(String dbName, String tableName) {
-        this.dbName = dbName;
-        this.tableName = tableName;
-        initialize();
-    }
-
-    public static String[] getRowAsStringArray(Row record) {
-        StructField[] fields = record.schema().fields();
-        String[] arr = new String[fields.length];
-        for (int i = 0; i < arr.length; i++) {
-            Object o = record.get(i);
-            arr[i] = (o == null) ? null : o.toString();
-        }
-        return arr;
-    }
-
-    private void initialize() {
-        ss = SparderEnv.getSparkSession();
-        String master = ss.sparkContext().master();
-        String tableIdentity = tableName;
-        // spark sql can not add the database prefix when create tempView from csv, but when working with hive, it need the database prefix
-        if (!master.toLowerCase(Locale.ROOT).contains("local")) {
-            tableIdentity = String.format(Locale.ROOT, "%s.%s", dbName, tableName);
-        }
-        records = SparkSqlUtil.queryAll(ss, tableIdentity);
-        iterator = records.iterator();
-    }
-
-    @Override
-    public boolean next() throws IOException {
-        boolean hasNext = iterator != null && iterator.hasNext();
-        if (hasNext) {
-            currentRow = iterator.next();
-        }
-        return hasNext;
-    }
-
-    @Override
-    public String[] getRow() {
-        return getRowAsStringArray(currentRow);
-    }
-
-    @Override
-    public void close() throws IOException {
-        this.records = null;
-        this.iterator = null;
-        this.currentRow = null;
-    }
-}
diff --git a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/SparkSqlUtil.java b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/SparkSqlUtil.java
deleted file mode 100644
index cea86bc9fa..0000000000
--- a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/SparkSqlUtil.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kylin.engine.spark.source;
-
-import java.util.List;
-import java.util.Locale;
-import java.util.Set;
-
-import org.apache.spark.sql.AnalysisException;
-import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Row;
-import org.apache.spark.sql.SparkSession;
-import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation;
-import org.apache.spark.sql.catalyst.catalog.CatalogTableType;
-
-import com.google.common.collect.Sets;
-
-import lombok.val;
-
-public class SparkSqlUtil {
-    public static Dataset<Row> query(SparkSession ss, String sql) {
-        return ss.sql(sql);
-    }
-
-    public static List<Row> queryForList(SparkSession ss, String sql) {
-        return ss.sql(sql).collectAsList();
-    }
-
-    public static List<Row> queryAll(SparkSession ss, String table) {
-        String sql = String.format(Locale.ROOT, "select * from %s", table);
-        return queryForList(ss, sql);
-    }
-
-    public static Set<String> getViewOrignalTables(String viewName, SparkSession spark) throws AnalysisException {
-        String viewText = spark.sql("desc formatted " + viewName).where("col_name = 'View Text'").head().getString(1);
-        val logicalPlan = spark.sessionState().sqlParser().parsePlan(viewText);
-        Set<String> viewTables = Sets.newHashSet();
-        for (Object l : scala.collection.JavaConverters.seqAsJavaListConverter(logicalPlan.collectLeaves()).asJava()) {
-            if (l instanceof UnresolvedRelation) {
-                val tableName = ((UnresolvedRelation) l).tableName();
-                //if nested view
-                if (spark.catalog().getTable(tableName).tableType().equals(CatalogTableType.VIEW().name())) {
-                    viewTables.addAll(getViewOrignalTables(tableName, spark));
-                } else {
-                    viewTables.add(tableName);
-                }
-            }
-        }
-        return viewTables;
-    }
-}