Posted to commits@kylin.apache.org by xx...@apache.org on 2022/08/09 02:47:07 UTC
[kylin] branch kylin5 updated: KYLIN-5217 Create an initial commit
This is an automated email from the ASF dual-hosted git repository.
xxyu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/kylin5 by this push:
new 605b4f52ec KYLIN-5217 Create an initial commit
605b4f52ec is described below
commit 605b4f52ecd607fb6c6943d8014c1a2736254f17
Author: sunbiaobiao <su...@gmail.com>
AuthorDate: Tue Aug 9 10:47:02 2022 +0800
KYLIN-5217 Create an initial commit
---
.../kap/engine/spark/job/EnviromentAdaptor.java | 29 -
.../kap/engine/spark/job/IJobProgressReport.java | 37 --
.../kap/engine/spark/job/ISparkJobHandler.java | 47 --
.../kap/engine/spark/job/ParamsConstants.java | 29 -
.../kap/engine/spark/job/SparkAppDescription.java | 53 --
.../engine/spark/application/SparkApplication.java | 620 ---------------------
.../kylin/engine/spark/application/SparkEntry.java | 27 -
.../spark/source/NSparkCubingSourceInput.java | 77 ---
.../engine/spark/source/NSparkDataSource.java | 79 ---
.../spark/source/NSparkMetadataExplorer.java | 313 -----------
.../kylin/engine/spark/source/NSparkTable.java | 65 ---
.../kylin/engine/spark/source/NSparkTableMeta.java | 111 ----
.../spark/source/NSparkTableMetaBuilder.java | 140 -----
.../spark/source/NSparkTableMetaExplorer.java | 203 -------
.../engine/spark/source/NSparkTableReader.java | 87 ---
.../kylin/engine/spark/source/SparkSqlUtil.java | 66 ---
16 files changed, 1983 deletions(-)
diff --git a/src/spark-project/engine-build-sdk/src/main/java/io/kyligence/kap/engine/spark/job/EnviromentAdaptor.java b/src/spark-project/engine-build-sdk/src/main/java/io/kyligence/kap/engine/spark/job/EnviromentAdaptor.java
deleted file mode 100644
index 0301d20c91..0000000000
--- a/src/spark-project/engine-build-sdk/src/main/java/io/kyligence/kap/engine/spark/job/EnviromentAdaptor.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.kyligence.kap.engine.spark.job;
-
-import java.util.Map;
-
-import org.apache.spark.sql.SparkSession;
-
-public interface EnviromentAdaptor {
-
-
- Boolean prepareEnviroment(SparkSession spark, Map<String, String> params);
-}
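
For reference, an adaptor only has to ready the build environment against the given SparkSession before the job proper runs; SparkApplication#execute (further down this commit) instantiates it reflectively from config.getBuildJobEnviromentAdaptor(). A minimal, hypothetical no-op sketch (the class name is illustrative):

    package io.kyligence.kap.engine.spark.job;

    import java.util.Map;

    import org.apache.spark.sql.SparkSession;

    // Hypothetical no-op adaptor, for illustration only. A real adaptor might
    // distribute credentials or apply session-level settings here.
    public class NoopEnviromentAdaptor implements EnviromentAdaptor {

        @Override
        public Boolean prepareEnviroment(SparkSession spark, Map<String, String> params) {
            return Boolean.TRUE;
        }
    }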
diff --git a/src/spark-project/engine-build-sdk/src/main/java/io/kyligence/kap/engine/spark/job/IJobProgressReport.java b/src/spark-project/engine-build-sdk/src/main/java/io/kyligence/kap/engine/spark/job/IJobProgressReport.java
deleted file mode 100644
index 9e5e58f20f..0000000000
--- a/src/spark-project/engine-build-sdk/src/main/java/io/kyligence/kap/engine/spark/job/IJobProgressReport.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.kyligence.kap.engine.spark.job;
-
-import java.util.Map;
-
-public interface IJobProgressReport {
-
- boolean updateSparkJobInfo(Map<String, String> params, String url, String json);
-
- boolean updateSparkJobExtraInfo(Map<String, String> params, String url, String project, String jobId,
- Map<String, String> extraInfo);
-
- default boolean executeFinish(Map<String, String> params, String project, String jobId) {
- return true;
- }
-
- default void initArgsParams(Map<String, String> argsParams) {
- }
-
-}
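
SparkApplication (later in this commit) loads the reporter reflectively from config.getBuildJobProgressReporter() and falls back to RestfulJobProgressReport; only the two abstract methods must be supplied, since executeFinish and initArgsParams have defaults. A hypothetical stdout-backed sketch, for illustration only:

    package io.kyligence.kap.engine.spark.job;

    import java.util.Map;

    // Hypothetical reporter: a real implementation such as
    // RestfulJobProgressReport would POST these payloads to the given url.
    public class StdoutJobProgressReport implements IJobProgressReport {

        @Override
        public boolean updateSparkJobInfo(Map<String, String> params, String url, String json) {
            System.out.println("job info -> " + url + ": " + json);
            return true;
        }

        @Override
        public boolean updateSparkJobExtraInfo(Map<String, String> params, String url, String project,
                String jobId, Map<String, String> extraInfo) {
            System.out.println("extra info for job " + jobId + " -> " + url + ": " + extraInfo);
            return true;
        }
    }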
diff --git a/src/spark-project/engine-build-sdk/src/main/java/io/kyligence/kap/engine/spark/job/ISparkJobHandler.java b/src/spark-project/engine-build-sdk/src/main/java/io/kyligence/kap/engine/spark/job/ISparkJobHandler.java
deleted file mode 100644
index c167b52597..0000000000
--- a/src/spark-project/engine-build-sdk/src/main/java/io/kyligence/kap/engine/spark/job/ISparkJobHandler.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.kyligence.kap.engine.spark.job;
-
-import java.util.Map;
-import java.util.Properties;
-
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.job.exception.ExecuteException;
-
-public interface ISparkJobHandler {
-
- void killOrphanApplicationIfExists(String project, String jobStepId, KylinConfig config,
- Map<String, String> sparkConf);
-
- void checkApplicationJar(KylinConfig config) throws ExecuteException;
-
- String createArgsFileOnRemoteFileSystem(KylinConfig config, String project, String jobId,
- Map<String, String> params) throws ExecuteException;
-
- Object generateSparkCmd(KylinConfig config, SparkAppDescription desc);
-
- default void modifyDump(Properties props) {
- }
-
- default void prepareEnviroment(String project, String jobStepId, Map<String, String> params) {
- }
-
- Map<String, String> runSparkSubmit(Object cmd, String parentId) throws ExecuteException;
-
-}
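
The handler abstracts the submit side of a build step: clean up orphaned applications, stage the args file, render the submit command, then run it. A skeletal, hypothetical implementation (all return values are placeholders):

    package io.kyligence.kap.engine.spark.job;

    import java.util.Collections;
    import java.util.Map;

    import org.apache.kylin.common.KylinConfig;
    import org.apache.kylin.job.exception.ExecuteException;

    // Hypothetical skeleton, for illustration only.
    public class NoopSparkJobHandler implements ISparkJobHandler {

        @Override
        public void killOrphanApplicationIfExists(String project, String jobStepId, KylinConfig config,
                Map<String, String> sparkConf) {
            // A real handler would locate and kill a leftover YARN/K8s application.
        }

        @Override
        public void checkApplicationJar(KylinConfig config) throws ExecuteException {
            // A real handler would verify that the job jar exists and is readable.
        }

        @Override
        public String createArgsFileOnRemoteFileSystem(KylinConfig config, String project, String jobId,
                Map<String, String> params) throws ExecuteException {
            return "/tmp/" + jobId + "_args.json"; // placeholder path
        }

        @Override
        public Object generateSparkCmd(KylinConfig config, SparkAppDescription desc) {
            return "spark-submit --class " + desc.getSparkSubmitClassName(); // placeholder command
        }

        @Override
        public Map<String, String> runSparkSubmit(Object cmd, String parentId) throws ExecuteException {
            return Collections.emptyMap(); // a real handler would return submit metadata
        }
    }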
diff --git a/src/spark-project/engine-build-sdk/src/main/java/io/kyligence/kap/engine/spark/job/ParamsConstants.java b/src/spark-project/engine-build-sdk/src/main/java/io/kyligence/kap/engine/spark/job/ParamsConstants.java
deleted file mode 100644
index e218d319c7..0000000000
--- a/src/spark-project/engine-build-sdk/src/main/java/io/kyligence/kap/engine/spark/job/ParamsConstants.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.kyligence.kap.engine.spark.job;
-
-public class ParamsConstants {
-
- private ParamsConstants() {
- throw new IllegalStateException("Utility class");
- }
-
- public static final String TIME_OUT = "time_out";
- public static final String JOB_TMP_DIR = "job_tmp_dir";
-}
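
These two keys are what SparkApplication#getReportParams (later in this diff) writes into the reporter's parameter map. An equivalent sketch, assuming a KylinConfig is at hand:

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.kylin.common.KylinConfig;

    // Mirrors SparkApplication#getReportParams shown later in this commit.
    final class ReportParamsExample {
        static Map<String, String> build(KylinConfig config, String project) {
            Map<String, String> reportParams = new HashMap<>();
            reportParams.put(ParamsConstants.TIME_OUT, String.valueOf(config.getUpdateJobInfoTimeout()));
            reportParams.put(ParamsConstants.JOB_TMP_DIR, config.getJobTmpDir(project, true));
            return reportParams;
        }
    }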
diff --git a/src/spark-project/engine-build-sdk/src/main/java/io/kyligence/kap/engine/spark/job/SparkAppDescription.java b/src/spark-project/engine-build-sdk/src/main/java/io/kyligence/kap/engine/spark/job/SparkAppDescription.java
deleted file mode 100644
index 8f105a5172..0000000000
--- a/src/spark-project/engine-build-sdk/src/main/java/io/kyligence/kap/engine/spark/job/SparkAppDescription.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package io.kyligence.kap.engine.spark.job;
-
-import java.util.Map;
-import java.util.Set;
-
-import lombok.Data;
-
-@Data
-public class SparkAppDescription {
-
- private String hadoopConfDir;
-
- private String kylinJobJar;
-
- private String appArgs;
-
- private String jobNamePrefix;
-
- private String project;
-
- private String jobId;
-
- private int stepId;
-
- private String sparkSubmitClassName;
-
- private Map<String, String> sparkConf;
-
- private Set<String> sparkJars;
-
- private Set<String> sparkFiles;
-
- private String comma;
-
-}
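
As a Lombok @Data bean, the description is populated through generated setters and then handed to ISparkJobHandler#generateSparkCmd. An illustrative fragment — every value below is hypothetical, and handler/config are assumed to be in scope:

    SparkAppDescription desc = new SparkAppDescription();
    desc.setProject("demo_project");
    desc.setJobId("job_0001");
    desc.setStepId(1);
    desc.setJobNamePrefix("job_step_"); // matches SparkApplication.JOB_NAME_PREFIX below
    desc.setSparkSubmitClassName("org.apache.kylin.engine.spark.application.SparkEntry");
    desc.setComma(",");
    Object cmd = handler.generateSparkCmd(config, desc);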
diff --git a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java
deleted file mode 100644
index cac224d44b..0000000000
--- a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java
+++ /dev/null
@@ -1,620 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.engine.spark.application;
-
-import static org.apache.kylin.engine.spark.job.StageType.WAITE_FOR_RESOURCE;
-import static org.apache.kylin.engine.spark.utils.SparkConfHelper.COUNT_DISTICT;
-
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.net.InetAddress;
-import java.net.URI;
-import java.net.UnknownHostException;
-import java.nio.charset.Charset;
-import java.text.ParseException;
-import java.text.SimpleDateFormat;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Locale;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.kylin.cluster.IClusterManager;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.exception.KylinException;
-import org.apache.kylin.common.persistence.ResourceStore;
-import org.apache.kylin.common.persistence.metadata.MetadataStore;
-import org.apache.kylin.common.util.Application;
-import org.apache.kylin.common.util.ClassUtil;
-import org.apache.kylin.common.util.HadoopUtil;
-import org.apache.kylin.common.util.JsonUtil;
-import org.apache.kylin.common.util.TimeZoneUtils;
-import org.apache.kylin.common.util.Unsafe;
-import org.apache.kylin.engine.spark.job.BuildJobInfos;
-import org.apache.kylin.engine.spark.job.KylinBuildEnv;
-import org.apache.kylin.engine.spark.job.LogJobInfoUtils;
-import org.apache.kylin.engine.spark.job.NSparkCubingUtil;
-import org.apache.kylin.engine.spark.job.ResourceDetect;
-import org.apache.kylin.engine.spark.job.RestfulJobProgressReport;
-import org.apache.kylin.engine.spark.job.SegmentBuildJob;
-import org.apache.kylin.engine.spark.job.SparkJobConstants;
-import org.apache.kylin.engine.spark.job.UdfManager;
-import org.apache.kylin.engine.spark.scheduler.ClusterMonitor;
-import org.apache.kylin.engine.spark.utils.JobMetricsUtils;
-import org.apache.kylin.engine.spark.utils.SparkConfHelper;
-import org.apache.kylin.metadata.cube.model.NBatchConstants;
-import org.apache.kylin.metadata.model.NDataModel;
-import org.apache.kylin.metadata.model.NDataModelManager;
-import org.apache.kylin.metadata.model.NTableMetadataManager;
-import org.apache.kylin.metadata.model.PartitionDesc;
-import org.apache.kylin.query.pushdown.SparkSubmitter;
-import org.apache.kylin.query.util.PushDownUtil;
-import org.apache.spark.SparkConf;
-import org.apache.spark.application.NoRetryException;
-import org.apache.spark.launcher.SparkLauncher;
-import org.apache.spark.sql.KylinSession;
-import org.apache.spark.sql.KylinSession$;
-import org.apache.spark.sql.SparderEnv;
-import org.apache.spark.sql.SparkSession;
-import org.apache.spark.sql.SparkSessionExtensions;
-import org.apache.spark.sql.catalyst.catalog.CatalogTableType;
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
-import org.apache.spark.sql.catalyst.rules.Rule;
-import org.apache.spark.sql.execution.datasource.AlignmentTableStats;
-import org.apache.spark.sql.hive.utils.ResourceDetectUtils;
-import org.apache.spark.util.Utils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.Maps;
-
-import io.kyligence.kap.engine.spark.job.EnviromentAdaptor;
-import io.kyligence.kap.engine.spark.job.IJobProgressReport;
-import io.kyligence.kap.engine.spark.job.ParamsConstants;
-import lombok.val;
-import scala.runtime.AbstractFunction1;
-import scala.runtime.BoxedUnit;
-
-public abstract class SparkApplication implements Application {
- private static final Logger logger = LoggerFactory.getLogger(SparkApplication.class);
- private Map<String, String> params = Maps.newHashMap();
- public static final String JOB_NAME_PREFIX = "job_step_";
- private IJobProgressReport report;
-
- protected volatile KylinConfig config;
- protected volatile String jobId;
- protected String project;
- protected int layoutSize = -1;
- protected BuildJobInfos infos;
- /**
- * path for spark app args on HDFS
- */
- protected String path;
-
- private ClusterMonitor clusterMonitor;
- private final AtomicLong atomicDisconnectSparkMasterTimes = new AtomicLong(0);
- private final AtomicBoolean atomicUnreachableSparkMaster = new AtomicBoolean(false);
- private final AtomicReference<SparkConf> atomicSparkConf = new AtomicReference<>(null);
- private final AtomicReference<SparkSession> atomicSparkSession = new AtomicReference<>(null);
- private final AtomicReference<KylinBuildEnv> atomicBuildEnv = new AtomicReference<>(null);
-
- public void execute(String[] args) {
- try {
- path = args[0];
- String argsLine = readArgsFromHDFS();
- params = JsonUtil.readValueAsMap(argsLine);
- logger.info("Execute {} with args : {}", this.getClass().getName(), argsLine);
- execute();
- } catch (Exception e) {
- throw new RuntimeException("Error execute " + this.getClass().getName(), e);
- }
- }
-
- public AtomicBoolean getAtomicUnreachableSparkMaster() {
- return atomicUnreachableSparkMaster;
- }
-
- public final Map<String, String> getParams() {
- return this.params;
- }
-
- public final String getParam(String key) {
- return this.params.get(key);
- }
-
- public final void setParam(String key, String value) {
- this.params.put(key, value);
- }
-
- public final boolean contains(String key) {
- return params.containsKey(key);
- }
-
- public String getJobId() {
- return jobId;
- }
-
- public String getProject() {
- return project;
- }
-
- public KylinConfig getConfig() {
- return config;
- }
-
- public IJobProgressReport getReport() {
- if (report == null)
- return new RestfulJobProgressReport();
- return report;
- }
-
- /// backwards compatibility, must have been initialized before invoking #doExecute.
- protected SparkSession ss;
-
- public SparkSession getSparkSession() throws NoRetryException {
- SparkSession sparkSession = atomicSparkSession.get();
- if (Objects.isNull(sparkSession)) {
- // shouldn't reach here
- throw new NoRetryException("spark session shouldn't be null");
- }
- return sparkSession;
- }
-
- public String readArgsFromHDFS() {
- val fs = HadoopUtil.getFileSystem(path);
- String argsLine = null;
- Path filePath = new Path(path);
- try (FSDataInputStream inputStream = fs.open(filePath)) {
- BufferedReader br = new BufferedReader(new InputStreamReader(inputStream, Charset.defaultCharset()));
- argsLine = br.readLine();
- } catch (IOException e) {
- logger.error("Error occurred when reading args file: {}", path, e);
- }
- return argsLine;
- }
-
- /**
- * get tracking url by application id
- *
- * @param sparkSession build sparkSession
- * @return
- */
- public String getTrackingUrl(IClusterManager clusterManager, SparkSession sparkSession) {
- return clusterManager.getBuildTrackingUrl(sparkSession);
- }
-
- private String tryReplaceHostAddress(String url) {
- String originHost = null;
- try {
- URI uri = URI.create(url);
- originHost = uri.getHost();
- String hostAddress = InetAddress.getByName(originHost).getHostAddress();
- return url.replace(originHost, hostAddress);
- } catch (UnknownHostException uhe) {
- logger.error("failed to get the ip address of {}, step back to use the origin tracking url.", originHost,
- uhe);
- return url;
- }
- }
-
- private Map<String, String> getTrackingInfo(SparkSession sparkSession, boolean ipAddressPreferred) {
- IClusterManager clusterManager = atomicBuildEnv.get().clusterManager();
- String applicationId = sparkSession.sparkContext().applicationId();
- Map<String, String> extraInfo = new HashMap<>();
- extraInfo.put("yarn_app_id", applicationId);
- try {
- String trackingUrl = getTrackingUrl(clusterManager, sparkSession);
- if (StringUtils.isBlank(trackingUrl)) {
- logger.warn("Get tracking url of application {}, but empty url found.", applicationId);
- return extraInfo;
- }
- if (ipAddressPreferred) {
- trackingUrl = tryReplaceHostAddress(trackingUrl);
- }
- extraInfo.put("yarn_app_url", trackingUrl);
- } catch (Exception e) {
- logger.error("get tracking url failed!", e);
- }
- return extraInfo;
- }
-
- protected void exchangeSparkSession() {
- exchangeSparkSession(atomicSparkConf.get());
- }
-
- protected final void execute() throws Exception {
- String hdfsMetalUrl = getParam(NBatchConstants.P_DIST_META_URL);
- jobId = getParam(NBatchConstants.P_JOB_ID);
- project = getParam(NBatchConstants.P_PROJECT_NAME);
- if (getParam(NBatchConstants.P_LAYOUT_IDS) != null) {
- layoutSize = StringUtils.split(getParam(NBatchConstants.P_LAYOUT_IDS), ",").length;
- }
- try (KylinConfig.SetAndUnsetThreadLocalConfig autoCloseConfig = KylinConfig
- .setAndUnsetThreadLocalConfig(KylinConfig.loadKylinConfigFromHdfs(hdfsMetalUrl))) {
- config = autoCloseConfig.get();
- report = (IJobProgressReport) ClassUtil.newInstance(config.getBuildJobProgressReporter());
- report.initArgsParams(getParams());
- //// KylinBuildEnv
- final KylinBuildEnv buildEnv = KylinBuildEnv.getOrCreate(config);
- atomicBuildEnv.set(buildEnv);
- infos = buildEnv.buildJobInfos();
- infos.recordJobId(jobId);
- infos.recordProject(project);
- infos.recordJobStepId(System.getProperty("spark.driver.param.taskId", jobId));
-
- monitorSparkMaster();
-
- HadoopUtil.setCurrentConfiguration(new Configuration());
- ////////
- exchangeSparkConf(buildEnv.sparkConf());
-
- TimeZoneUtils.setDefaultTimeZone(config);
-
- /// wait until resource is enough
- waiteForResource(atomicSparkConf.get(), buildEnv);
-
- ///
- logger.info("Prepare job environment");
- prepareSparkSession();
-
- /// backwards compatibility
- ss = getSparkSession();
- val master = ss.conf().get(SparkLauncher.SPARK_MASTER, "");
- if (!master.equals("local")) {
- EnviromentAdaptor adaptor = (EnviromentAdaptor) ClassUtil
- .newInstance(config.getBuildJobEnviromentAdaptor());
- adaptor.prepareEnviroment(ss, params);
- }
-
- if (config.useDynamicS3RoleCredentialInTable()) {
- val tableMetadataManager = NTableMetadataManager.getInstance(config, project);
- tableMetadataManager.listAllTables().forEach(tableDesc -> SparderEnv
- .addS3CredentialFromTableToSpark(tableMetadataManager.getOrCreateTableExt(tableDesc), ss));
- }
-
- if (!config.isUTEnv()) {
- Unsafe.setProperty("kylin.env", config.getDeployEnv());
- }
- logger.info("Start job");
- infos.startJob();
- // should be invoked after method prepareSparkSession
- extraInit();
-
- waiteForResourceSuccess();
- doExecute();
- // Output metadata to another folder
- val resourceStore = ResourceStore.getKylinMetaStore(config);
- val outputConfig = KylinConfig.createKylinConfig(config);
- outputConfig.setMetadataUrl(getParam(NBatchConstants.P_OUTPUT_META_URL));
- MetadataStore.createMetadataStore(outputConfig).dump(resourceStore);
- } catch (Exception e) {
- handleException(e);
- } finally {
- if (infos != null) {
- infos.jobEnd();
- }
- destroySparkSession();
- extraDestroy();
- executeFinish();
- }
- }
-
- protected void handleException(Exception e) throws Exception {
- throw e;
- }
-
- private SparkSession createSpark(SparkConf sparkConf) {
- SparkSession.Builder sessionBuilder = SparkSession.builder()
- .withExtensions(new AbstractFunction1<SparkSessionExtensions, BoxedUnit>() {
- @Override
- public BoxedUnit apply(SparkSessionExtensions v1) {
- v1.injectPostHocResolutionRule(new AbstractFunction1<SparkSession, Rule<LogicalPlan>>() {
- @Override
- public Rule<LogicalPlan> apply(SparkSession session) {
- return new AlignmentTableStats(session);
- }
- });
- return BoxedUnit.UNIT;
- }
- }).enableHiveSupport().config(sparkConf)
- .config("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false");
-
- // If this is UT and SparkSession is already created, then use SparkSession.
- // Otherwise, we always use KylinSession
- boolean createWithSparkSession = !isJobOnCluster(sparkConf) && SparderEnv.isSparkAvailable();
- if (createWithSparkSession) {
- boolean isKylinSession = SparderEnv.getSparkSession() instanceof KylinSession;
- createWithSparkSession = !isKylinSession;
- }
-
- if (createWithSparkSession) {
- return sessionBuilder.getOrCreate();
- } else {
- return KylinSession$.MODULE$.KylinBuilder(sessionBuilder).buildCluster().getOrCreateKylinSession();
- }
- }
-
- public boolean isJobOnCluster(SparkConf conf) {
- return !Utils.isLocalMaster(conf) && !config.isUTEnv();
- }
-
- protected void extraInit() {
- }
-
- public void extraDestroy() {
- if (clusterMonitor != null) {
- clusterMonitor.shutdown();
- }
- }
-
- protected abstract void doExecute() throws Exception;
-
- protected void onLayoutFinished(long layoutId) {
- //do nothing
- }
-
- protected void onExecuteFinished() {
- //do nothing
- }
-
- protected String calculateRequiredCores() throws Exception {
- return SparkJobConstants.DEFAULT_REQUIRED_CORES;
- }
-
- private void autoSetSparkConf(SparkConf sparkConf) throws Exception {
- SparkConfHelper helper = new SparkConfHelper();
- // copy user defined spark conf
- if (sparkConf.getAll() != null) {
- Arrays.stream(sparkConf.getAll()).forEach(config -> helper.setConf(config._1, config._2));
- }
- helper.setClusterManager(KylinBuildEnv.get().clusterManager());
-
- chooseContentSize(helper);
-
- helper.setOption(SparkConfHelper.LAYOUT_SIZE, Integer.toString(layoutSize));
- helper.setOption(SparkConfHelper.REQUIRED_CORES, calculateRequiredCores());
- helper.setConf(COUNT_DISTICT, hasCountDistinct().toString());
- helper.generateSparkConf();
- helper.applySparkConf(sparkConf);
- }
-
- private void waiteForResource(SparkConf sparkConf, KylinBuildEnv buildEnv) throws Exception {
- val waiteForResource = WAITE_FOR_RESOURCE.create(this, null, null);
- infos.recordStageId(waiteForResource.getId());
- waiteForResource.execute();
- }
-
- protected void waiteForResourceSuccess() throws Exception {
- val waiteForResource = WAITE_FOR_RESOURCE.create(this, null, null);
- waiteForResource.onStageFinished(true);
- infos.recordStageId("");
- }
-
- protected void executeFinish() {
- try {
- getReport().executeFinish(getReportParams(), project, getJobId());
- } catch (Exception e) {
- logger.error("executeFinish failed", e);
- }
- }
-
- protected void chooseContentSize(SparkConfHelper helper) {
- Path shareDir = config.getJobTmpShareDir(project, jobId);
- // add content size with unit
- helper.setOption(SparkConfHelper.SOURCE_TABLE_SIZE, chooseContentSize(shareDir));
- }
-
- protected boolean checkRangePartitionTableIsExist(NDataModel modelDesc) {
- return modelDesc.getAllTableRefs().stream().anyMatch(p -> p.getTableDesc().isRangePartition());
- }
-
- protected String chooseContentSize(Path shareDir) {
- // return size with unit
- return ResourceDetectUtils.getMaxResourceSize(shareDir) + "b";
- }
-
- protected Boolean hasCountDistinct() throws IOException {
- Path countDistinct = new Path(config.getJobTmpShareDir(project, jobId),
- ResourceDetectUtils.countDistinctSuffix());
- FileSystem fileSystem = countDistinct.getFileSystem(HadoopUtil.getCurrentConfiguration());
- Boolean exist;
- if (fileSystem.exists(countDistinct)) {
- exist = ResourceDetectUtils.readResourcePathsAs(countDistinct);
- } else {
- exist = false;
- logger.debug("File count_distinct.json doesn't exist, set hasCountDistinct to false.");
- }
- logger.debug("Exist count distinct measure: {}", exist);
- return exist;
- }
-
- public void logJobInfo() {
- try {
- logger.info(generateInfo());
- if (KylinConfig.getInstanceFromEnv().skipRecordJobExecutionTime()) {
- logger.info("skip record job wait and run time");
- return;
- }
- Map<String, String> extraInfo = new HashMap<>();
- extraInfo.put("yarn_job_wait_time", ((Long) KylinBuildEnv.get().buildJobInfos().waitTime()).toString());
- extraInfo.put("yarn_job_run_time", ((Long) KylinBuildEnv.get().buildJobInfos().buildTime()).toString());
-
- getReport().updateSparkJobExtraInfo(getReportParams(), "/kylin/api/jobs/wait_and_run_time", project, jobId,
- extraInfo);
- } catch (Exception e) {
- logger.warn("Error occurred when generate job info.", e);
- }
- }
-
- private Map<String, String> getReportParams() {
- val reportParams = new HashMap<String, String>();
- reportParams.put(ParamsConstants.TIME_OUT, String.valueOf(config.getUpdateJobInfoTimeout()));
- reportParams.put(ParamsConstants.JOB_TMP_DIR, config.getJobTmpDir(project, true));
- return reportParams;
- }
-
- protected String generateInfo() {
- return LogJobInfoUtils.sparkApplicationInfo();
- }
-
- public Set<String> getIgnoredSnapshotTables() {
- return NSparkCubingUtil.toIgnoredTableSet(getParam(NBatchConstants.P_IGNORED_SNAPSHOT_TABLES));
- }
-
- protected Map<String, String> getSparkConfigOverride(KylinConfig config) {
- return config.getSparkConfigOverride();
- }
-
- protected void checkDateFormatIfExist(String project, String modelId) throws Exception {
- if (config.isUTEnv()) {
- return;
- }
- val modelManager = NDataModelManager.getInstance(config, project);
- NDataModel modelDesc = modelManager.getDataModelDesc(modelId);
- if (checkRangePartitionTableIsExist(modelDesc)) {
- logger.info("Range partitioned tables do not support pushdown, so do not need to perform subsequent logic");
- return;
- }
-
- val partitionDesc = modelDesc.getPartitionDesc();
- if (PartitionDesc.isEmptyPartitionDesc(partitionDesc)
- || org.apache.commons.lang.StringUtils.isEmpty(partitionDesc.getPartitionDateFormat()))
- return;
-
- if (CatalogTableType.VIEW().name().equals(modelDesc.getRootFactTable().getTableDesc().getTableType()))
- return;
-
- String partitionColumn = modelDesc.getPartitionDesc().getPartitionDateColumnRef().getExpressionInSourceDB();
-
- SparkSession sparkSession = atomicSparkSession.get();
- try (SparkSubmitter.OverriddenSparkSession ignored = SparkSubmitter.getInstance()
- .overrideSparkSession(sparkSession)) {
- String dateString = PushDownUtil.getFormatIfNotExist(modelDesc.getRootFactTableName(), partitionColumn,
- project);
- val sdf = new SimpleDateFormat(modelDesc.getPartitionDesc().getPartitionDateFormat(),
- Locale.getDefault(Locale.Category.FORMAT));
- val date = sdf.parse(dateString);
- if (date == null || !dateString.equals(sdf.format(date))) {
- throw new NoRetryException("date format not match");
- }
- } catch (KylinException ignore) {
- // ignore it when pushdown return empty row
- } catch (ParseException | NoRetryException e) {
- throw new NoRetryException("date format not match");
- }
- }
-
- private void exchangeSparkConf(SparkConf sparkConf) throws Exception {
- if (isJobOnCluster(sparkConf) && !(this instanceof ResourceDetect)) {
- Map<String, String> baseSparkConf = getSparkConfigOverride(config);
- if (!baseSparkConf.isEmpty()) {
- baseSparkConf.forEach(sparkConf::set);
- String baseSparkConfStr = JsonUtil.writeValueAsString(baseSparkConf);
- logger.info("Override user-defined spark conf: {}", baseSparkConfStr);
- }
- if (config.isAutoSetSparkConf()) {
- logger.info("Set spark conf automatically.");
- try {
- autoSetSparkConf(sparkConf);
- } catch (Exception e) {
- logger.warn("Auto set spark conf failed. Load spark conf from system properties", e);
- }
- }
- }
-
- atomicSparkConf.set(sparkConf);
- }
-
- private void exchangeSparkSession(SparkConf sparkConf) {
- SparkSession sparkSession = atomicSparkSession.get();
- if (Objects.nonNull(sparkSession)) {
- // destroy previous spark session
- destroySparkSession();
- }
-
- sparkSession = createSpark(sparkConf);
- if (!config.isUTEnv() && !sparkConf.get("spark.master").startsWith("k8s")) {
- getReport().updateSparkJobExtraInfo(getReportParams(), "/kylin/api/jobs/spark", project, jobId,
- getTrackingInfo(sparkSession, config.isTrackingUrlIpAddressEnabled()));
- }
-
- // for spark metrics
- JobMetricsUtils.registerListener(sparkSession);
- SparderEnv.registerListener(sparkSession.sparkContext());
-
- //#8341
- SparderEnv.setSparkSession(sparkSession);
- UdfManager.create(sparkSession);
-
- ///
- atomicSparkSession.set(sparkSession);
- }
-
- private void prepareSparkSession() throws NoRetryException {
- SparkConf sparkConf = atomicSparkConf.get();
- if (Objects.isNull(sparkConf)) {
- // shouldn't reach here
- throw new NoRetryException("spark conf shouldn't be null");
- }
-
- /// SegmentBuildJob only!!!
- if (config.isSnapshotSpecifiedSparkConf() && (this instanceof SegmentBuildJob)) {
- // snapshot specified spark conf, based on the exchanged spark conf.
- SparkConf clonedSparkConf = sparkConf.clone();
- Map<String, String> snapshotSparkConf = config.getSnapshotBuildingConfigOverride();
- snapshotSparkConf.forEach(clonedSparkConf::set);
- logger.info("exchange sparkSession using snapshot specified sparkConf");
- exchangeSparkSession(clonedSparkConf);
- return;
- }
- // normal logic
- exchangeSparkSession(sparkConf);
- }
-
- private void destroySparkSession() {
- SparkSession sparkSession = atomicSparkSession.get();
- if (Objects.isNull(sparkSession)) {
- logger.info("no initialized sparkSession instance");
- return;
- }
- if (sparkSession.conf().get("spark.master").startsWith("local")) {
- // for UT use? but very strange for resource detect mode (spark local).
- return;
- }
- JobMetricsUtils.unRegisterListener(sparkSession);
- sparkSession.stop();
- }
-
- private void monitorSparkMaster() {
- clusterMonitor = new ClusterMonitor();
- clusterMonitor.monitorSparkMaster(atomicBuildEnv, atomicSparkSession, atomicDisconnectSparkMasterTimes,
- atomicUnreachableSparkMaster);
- }
-
-}
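
SparkApplication is abstract: concrete jobs such as SegmentBuildJob or the ResourceDetect steps supply doExecute(), while execute() provides everything around it (config loading from HDFS, session creation, resource waiting, the metadata dump). A minimal hypothetical subclass:

    package org.apache.kylin.engine.spark.application;

    // Hypothetical subclass, for illustration only.
    public class HelloSparkJob extends SparkApplication {

        @Override
        protected void doExecute() throws Exception {
            // By this point execute() has prepared config and the Spark session.
            getSparkSession().sql("select 1").show();
        }

        public static void main(String[] args) {
            // args[0] is the HDFS path of the serialized job params (see execute()).
            new HelloSparkJob().execute(args);
        }
    }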
diff --git a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/application/SparkEntry.java b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/application/SparkEntry.java
deleted file mode 100644
index 31974f65bc..0000000000
--- a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/application/SparkEntry.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.engine.spark.application;
-
-import org.apache.spark.application.JobWorkSpace;
-
-public class SparkEntry {
- public static void main(String[] args) {
- JobWorkSpace.execute(args);
- }
-}
diff --git a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkCubingSourceInput.java b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkCubingSourceInput.java
deleted file mode 100644
index 5411c31bbd..0000000000
--- a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkCubingSourceInput.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kylin.engine.spark.source;
-
-import static org.apache.kylin.engine.spark.stats.utils.HiveTableRefChecker.isNeedCreateHiveTemporaryTable;
-
-import java.util.List;
-import java.util.Locale;
-import java.util.Map;
-
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.metadata.model.ColumnDesc;
-import org.apache.kylin.metadata.model.TableDesc;
-import org.apache.kylin.engine.spark.NSparkCubingEngine;
-import org.apache.kylin.engine.spark.job.KylinBuildEnv;
-import org.apache.kylin.engine.spark.utils.HiveTransactionTableHelper;
-import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Row;
-import org.apache.spark.sql.SparkSession;
-import org.apache.spark.sql.types.StructType;
-import org.apache.spark.sql.util.SparderTypeUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Joiner;
-import com.google.common.collect.Lists;
-
-public class NSparkCubingSourceInput implements NSparkCubingEngine.NSparkCubingSource {
- private static final Logger logger = LoggerFactory.getLogger(NSparkCubingSourceInput.class);
-
- @Override
- public Dataset<Row> getSourceData(TableDesc table, SparkSession ss, Map<String, String> params) {
- ColumnDesc[] columnDescs = table.getColumns();
- List<String> tblColNames = Lists.newArrayListWithCapacity(columnDescs.length);
- StructType kylinSchema = new StructType();
- for (ColumnDesc columnDesc : columnDescs) {
- if (!columnDesc.isComputedColumn()) {
- kylinSchema = kylinSchema.add(columnDesc.getName(),
- SparderTypeUtil.toSparkType(columnDesc.getType(), false), true);
- tblColNames.add("`" + columnDesc.getName() + "`");
- }
- }
- String[] colNames = tblColNames.toArray(new String[0]);
- String colString = Joiner.on(",").join(colNames);
- String sql;
- KylinConfig kylinConfig = KylinBuildEnv.get().kylinConfig();
- logger.info("isRangePartition:{};isTransactional:{};isReadTransactionalTableEnabled:{}",
- table.isRangePartition(), table.isTransactional(), kylinConfig.isReadTransactionalTableEnabled());
- if (isNeedCreateHiveTemporaryTable(table.isRangePartition(), table.isTransactional(),
- kylinConfig.isReadTransactionalTableEnabled())) {
- sql = HiveTransactionTableHelper.doGetQueryHiveTemporaryTableSql(table, params, colString,
- KylinBuildEnv.get());
- } else {
- sql = String.format(Locale.ROOT, "select %s from %s", colString, table.getIdentity());
- }
- Dataset<Row> df = ss.sql(sql);
- StructType sparkSchema = df.schema();
- logger.debug("Source data sql is: {}", sql);
- logger.debug("Kylin schema: {}", kylinSchema.treeString());
- return df.select(SparderTypeUtil.alignDataTypeAndName(sparkSchema, kylinSchema));
- }
-}
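
For an ordinary table (no Hive temporary table needed) the fallback branch simply selects the backtick-quoted, non-computed columns from the source table. A fragment with illustrative names:

    import java.util.Locale;

    // Illustrative: the column list is Joiner.on(",")-joined, so for columns
    // LO_ORDERKEY and LO_QUANTITY of table SSB.P_LINEORDER the SQL becomes:
    //   select `LO_ORDERKEY`,`LO_QUANTITY` from SSB.P_LINEORDER
    String colString = "`LO_ORDERKEY`,`LO_QUANTITY`";
    String sql = String.format(Locale.ROOT, "select %s from %s", colString, "SSB.P_LINEORDER");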
diff --git a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkDataSource.java b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkDataSource.java
deleted file mode 100644
index 6a501bc71e..0000000000
--- a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkDataSource.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.engine.spark.source;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.metadata.model.IBuildable;
-import org.apache.kylin.metadata.model.SegmentRange;
-import org.apache.kylin.metadata.model.TableDesc;
-import org.apache.kylin.source.IReadableTable;
-import org.apache.kylin.source.ISampleDataDeployer;
-import org.apache.kylin.source.ISource;
-import org.apache.kylin.source.ISourceMetadataExplorer;
-import org.apache.kylin.engine.spark.NSparkCubingEngine;
-
-public class NSparkDataSource implements ISource {
- // for reflection
- public NSparkDataSource(KylinConfig config) {
-
- }
-
- @Override
- public ISourceMetadataExplorer getSourceMetadataExplorer() {
- return new NSparkMetadataExplorer();
- }
-
- @Override
- public <I> I adaptToBuildEngine(Class<I> engineInterface) {
- if (engineInterface == NSparkCubingEngine.NSparkCubingSource.class) {
- return (I) new NSparkCubingSourceInput();
- } else {
- throw new IllegalArgumentException("Unsupported engine interface: " + engineInterface);
- }
- }
-
- @Override
- public IReadableTable createReadableTable(TableDesc tableDesc) {
- return new NSparkTable(tableDesc);
- }
-
- @Override
- public SegmentRange enrichSourcePartitionBeforeBuild(IBuildable buildable, SegmentRange srcPartition) {
- return srcPartition;
- }
-
- @Override
- public ISampleDataDeployer getSampleDataDeployer() {
- return new NSparkMetadataExplorer();
- }
-
- @Override
- public SegmentRange getSegmentRange(String start, String end) {
- start = StringUtils.isEmpty(start) ? "0" : start;
- end = StringUtils.isEmpty(end) ? "" + Long.MAX_VALUE : end;
- return new SegmentRange.TimePartitionedSegmentRange(Long.parseLong(start), Long.parseLong(end));
- }
-
- @Override
- public boolean supportBuildSnapShotByPartition() {
- return true;
- }
-
-}
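
Note how getSegmentRange handles blank bounds: an empty start becomes 0 and an empty end becomes Long.MAX_VALUE, so a full-range build needs no explicit dates. Illustrative usage (the epoch-millisecond bounds are hypothetical):

    import org.apache.kylin.common.KylinConfig;
    import org.apache.kylin.metadata.model.SegmentRange;

    public class SegmentRangeExample {
        public static void main(String[] args) {
            NSparkDataSource source = new NSparkDataSource(KylinConfig.getInstanceFromEnv());
            SegmentRange full = source.getSegmentRange("", "");   // [0, Long.MAX_VALUE)
            SegmentRange jan = source.getSegmentRange("1640995200000", "1643673600000");
            System.out.println(full + " / " + jan);
        }
    }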
diff --git a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkMetadataExplorer.java b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkMetadataExplorer.java
deleted file mode 100644
index 7e4f7deea0..0000000000
--- a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkMetadataExplorer.java
+++ /dev/null
@@ -1,313 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kylin.engine.spark.source;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Locale;
-import java.util.Optional;
-import java.util.Set;
-import java.util.stream.Collectors;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.kylin.common.KapConfig;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.HadoopUtil;
-import org.apache.kylin.common.util.Pair;
-import org.apache.kylin.common.util.RandomUtil;
-import org.apache.kylin.metadata.model.ColumnDesc;
-import org.apache.kylin.metadata.model.ISourceAware;
-import org.apache.kylin.metadata.model.TableDesc;
-import org.apache.kylin.metadata.model.TableExtDesc;
-import org.apache.kylin.source.ISampleDataDeployer;
-import org.apache.kylin.source.ISourceMetadataExplorer;
-import org.apache.kylin.metadata.model.NTableMetadataManager;
-import org.apache.spark.sql.AnalysisException;
-import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Row;
-import org.apache.spark.sql.SparderEnv;
-import org.apache.spark.sql.SparkSession;
-import org.apache.spark.sql.catalog.Database;
-import org.apache.spark.sql.catalyst.catalog.CatalogTableType;
-import org.apache.spark.sql.internal.SQLConf;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.clearspring.analytics.util.Lists;
-import com.google.common.collect.Sets;
-
-import lombok.val;
-
-public class NSparkMetadataExplorer implements ISourceMetadataExplorer, ISampleDataDeployer, Serializable {
-
- private static final Logger logger = LoggerFactory.getLogger(NSparkMetadataExplorer.class);
-
- public static String generateCreateSchemaSql(String schemaName) {
- return String.format(Locale.ROOT, "CREATE DATABASE IF NOT EXISTS %s", schemaName);
- }
-
- public static String[] generateCreateTableSql(TableDesc tableDesc) {
- String dropSql = "DROP TABLE IF EXISTS " + tableDesc.getIdentity();
-
- StringBuilder ddl = new StringBuilder();
- ddl.append("CREATE TABLE " + tableDesc.getIdentity() + "\n");
- ddl.append("(" + "\n");
-
- for (int i = 0; i < tableDesc.getColumns().length; i++) {
- ColumnDesc col = tableDesc.getColumns()[i];
- if (i > 0) {
- ddl.append(",");
- }
- ddl.append(col.getName() + " " + col.getDatatype() + "\n");
- }
-
- ddl.append(")" + "\n");
- ddl.append("USING com.databricks.spark.csv");
-
- return new String[] { dropSql, ddl.toString() };
- }
-
- public NSparkTableMetaExplorer getTableMetaExplorer() {
- return new NSparkTableMetaExplorer();
- }
-
- @Override
- public List<String> listDatabases() throws Exception {
- Dataset<Row> dataset = SparderEnv.getSparkSession().sql("show databases").select("namespace");
- return dataset.collectAsList().stream().map(row -> row.getString(0)).collect(Collectors.toList());
- }
-
- @Override
- public List<String> listTables(String database) throws Exception {
- val ugi = UserGroupInformation.getCurrentUser();
- val config = KylinConfig.getInstanceFromEnv();
- val spark = SparderEnv.getSparkSession();
-
- List<String> tables = Lists.newArrayList();
- try {
- String sql = "show tables";
- if (StringUtils.isNotBlank(database)) {
- sql = String.format(Locale.ROOT, sql + " in %s", database);
- }
- Dataset<Row> dataset = SparderEnv.getSparkSession().sql(sql).select("tableName");
- tables = dataset.collectAsList().stream().map(row -> row.getString(0)).collect(Collectors.toList());
-
- if (config.getTableAccessFilterEnable() && config.getKerberosProjectLevelEnable()
- && UserGroupInformation.isSecurityEnabled()) {
- List<String> accessTables = Lists.newArrayList();
- for (String table : tables) {
- val tableName = database + "." + table;
- if (checkTableAccess(tableName)) {
- accessTables.add(table);
- }
- }
- return accessTables;
- }
- } catch (Exception e) {
- logger.error("List hive tables failed. user: {}, db: {}", ugi.getUserName(), database);
- }
-
- return tables;
- }
-
- public boolean checkTableAccess(String tableName) {
- boolean isAccess = true;
- try {
- val spark = SparderEnv.getSparkSession();
- val sparkTable = spark.catalog().getTable(tableName);
- Set<String> needCheckTables = Sets.newHashSet();
- if (sparkTable.tableType().equals(CatalogTableType.VIEW().name())) {
- needCheckTables = SparkSqlUtil.getViewOrignalTables(tableName, SparderEnv.getSparkSession());
- } else {
- needCheckTables.add(tableName);
- }
- String hiveSpecFsLocation = spark.sessionState().conf().getConf(SQLConf.HIVE_SPECIFIC_FS_LOCATION());
- FileSystem fs = null == hiveSpecFsLocation ? HadoopUtil.getWorkingFileSystem()
- : HadoopUtil.getFileSystem(hiveSpecFsLocation);
- for (String table : needCheckTables) {
- fs.listStatus(new Path(getLoc(spark, table, hiveSpecFsLocation)));
- }
- } catch (Exception e) {
- isAccess = false;
- try {
- logger.error("Read hive table {} error:{}, ugi name: {}.", tableName, e.getMessage(),
- UserGroupInformation.getCurrentUser().getUserName());
- } catch (IOException ex) {
- logger.error("fetch user curr ugi info error.", e);
- }
- }
- return isAccess;
- }
-
- @Override
- public Pair<TableDesc, TableExtDesc> loadTableMetadata(final String database, String tableName, String prj)
- throws Exception {
- KylinConfig config = KylinConfig.getInstanceFromEnv();
- NTableMetadataManager metaMgr = NTableMetadataManager.getInstance(config, prj);
-
- NSparkTableMeta tableMeta = getTableMetaExplorer().getSparkTableMeta(database, tableName);
- TableDesc tableDesc = metaMgr.getTableDesc(database + "." + tableName);
-
- // make a new TableDesc instance, don't modify the one in use
- if (tableDesc == null) {
- tableDesc = new TableDesc();
- tableDesc.setDatabase(database.toUpperCase(Locale.ROOT));
- tableDesc.setName(tableName.toUpperCase(Locale.ROOT));
- tableDesc.setUuid(RandomUtil.randomUUIDStr());
- tableDesc.setLastModified(0);
- } else {
- tableDesc = new TableDesc(tableDesc);
- }
-
- if (tableMeta.tableType != null) {
- tableDesc.setTableType(tableMeta.tableType);
- }
- //set table type = spark
- tableDesc.setSourceType(ISourceAware.ID_SPARK);
- tableDesc.setTransactional(tableMeta.isTransactional);
- tableDesc.setRangePartition(tableMeta.isRangePartition);
-
- Set<String> partColumnSet = Optional.ofNullable(tableMeta.partitionColumns) //
- .orElseGet(Collections::emptyList).stream().map(field -> field.name) //
- .collect(Collectors.toSet());
- int columnNumber = tableMeta.allColumns.size();
- List<ColumnDesc> columns = new ArrayList<ColumnDesc>(columnNumber);
- for (int i = 0; i < columnNumber; i++) {
- NSparkTableMeta.SparkTableColumnMeta field = tableMeta.allColumns.get(i);
- ColumnDesc cdesc = new ColumnDesc();
- cdesc.setName(field.name.toUpperCase(Locale.ROOT));
- cdesc.setCaseSensitiveName(field.name);
- // use "double" in kylin for "float"
- if ("float".equalsIgnoreCase(field.dataType)) {
- cdesc.setDatatype("double");
- } else {
- cdesc.setDatatype(field.dataType);
- }
- cdesc.setId(String.valueOf(i + 1));
- cdesc.setComment(field.comment);
- cdesc.setPartitioned(partColumnSet.contains(field.name));
- columns.add(cdesc);
- }
- tableDesc.setColumns(columns.toArray(new ColumnDesc[columnNumber]));
- List<String> partCols = tableMeta.partitionColumns.stream().map(col -> col.name).collect(Collectors.toList());
- if (!partCols.isEmpty()) {
- tableDesc.setPartitionColumn(partCols.get(0).toUpperCase(Locale.ROOT));
- } else {
- tableDesc.setPartitionColumn(null);
- }
- StringBuilder partitionColumnBuilder = new StringBuilder();
- for (int i = 0, n = tableMeta.partitionColumns.size(); i < n; i++) {
- if (i > 0)
- partitionColumnBuilder.append(", ");
- partitionColumnBuilder.append(tableMeta.partitionColumns.get(i).name.toUpperCase(Locale.ROOT));
- }
-
- TableExtDesc tableExtDesc = new TableExtDesc();
- tableExtDesc.setIdentity(tableDesc.getIdentity());
- tableExtDesc.setUuid(RandomUtil.randomUUIDStr());
- tableExtDesc.setLastModified(0);
- tableExtDesc.init(prj);
-
- tableExtDesc.addDataSourceProp(TableExtDesc.LOCATION_PROPERTY_KEY, tableMeta.sdLocation);
- tableExtDesc.addDataSourceProp("owner", tableMeta.owner);
- tableExtDesc.addDataSourceProp("create_time", tableMeta.createTime);
- tableExtDesc.addDataSourceProp("last_access_time", tableMeta.lastAccessTime);
- tableExtDesc.addDataSourceProp("partition_column", partitionColumnBuilder.toString());
- tableExtDesc.addDataSourceProp("total_file_size", String.valueOf(tableMeta.fileSize));
- tableExtDesc.addDataSourceProp("total_file_number", String.valueOf(tableMeta.fileNum));
- tableExtDesc.addDataSourceProp("hive_inputFormat", tableMeta.sdInputFormat);
- tableExtDesc.addDataSourceProp("hive_outputFormat", tableMeta.sdOutputFormat);
- tableExtDesc.addDataSourceProp(TableExtDesc.S3_ROLE_PROPERTY_KEY, tableMeta.s3Role);
- tableExtDesc.addDataSourceProp(TableExtDesc.S3_ENDPOINT_KEY, tableMeta.s3Endpoint);
- return Pair.newPair(tableDesc, tableExtDesc);
- }
-
- @Override
- public List<String> getRelatedKylinResources(TableDesc table) {
- return Collections.emptyList();
- }
-
- @Override
- public boolean checkDatabaseAccess(String database) throws Exception {
- boolean hiveDBAccessFilterEnable = KapConfig.getInstanceFromEnv().getDBAccessFilterEnable();
- if (hiveDBAccessFilterEnable) {
- logger.info("Check database {} access start.", database);
- try {
- Database db = SparderEnv.getSparkSession().catalog().getDatabase(database);
- } catch (AnalysisException e) {
- UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
- logger.info("The current user: {} does not have permission to access database {}", ugi.getUserName(),
- database);
- return false;
- }
- }
-
- return true;
- }
-
- @Override
- public boolean checkTablesAccess(Set<String> tables) {
- return tables.stream().allMatch(this::checkTableAccess);
- }
-
- @Override
- public Set<String> getTablePartitions(String database, String table, String prj, String partCol) {
- return getTableMetaExplorer().checkAndGetTablePartitions(database, table, partCol);
- }
-
- @Override
- public void createSampleDatabase(String database) throws Exception {
- SparderEnv.getSparkSession().sql(generateCreateSchemaSql(database));
- }
-
- @Override
- public void createSampleTable(TableDesc table) throws Exception {
- String[] createTableSqls = generateCreateTableSql(table);
- for (String sql : createTableSqls) {
- SparderEnv.getSparkSession().sql(sql);
- }
- }
-
- @Override
- public void loadSampleData(String tableName, String tableFileDir) throws Exception {
- Dataset<Row> dataset = SparderEnv.getSparkSession().read().csv(tableFileDir + "/" + tableName + ".csv").toDF();
- if (tableName.indexOf(".") > 0) {
- tableName = tableName.substring(tableName.indexOf(".") + 1);
- }
- dataset.createOrReplaceTempView(tableName);
- }
-
- @Override
- public void createWrapperView(String origTableName, String viewName) throws Exception {
- throw new UnsupportedOperationException("unsupport create wrapper view");
- }
-
- public String getLoc(SparkSession spark, String table, String hiveSpecFsLocation) {
- String loc = spark.sql("desc formatted " + table).where("col_name == 'Location'").head().getString(1);
- if (null == hiveSpecFsLocation || null == loc) {
- return loc;
- }
- return loc.replace("hdfs://hacluster", hiveSpecFsLocation);
- }
-}
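
To make generateCreateTableSql concrete: for a hypothetical TableDesc DB.SAMPLE with columns ID integer and NAME varchar(255), tracing the method above yields this pair of statements (note where the comma lands):

    DROP TABLE IF EXISTS DB.SAMPLE

    CREATE TABLE DB.SAMPLE
    (
    ID integer
    ,NAME varchar(255)
    )
    USING com.databricks.spark.csv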
diff --git a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkTable.java b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkTable.java
deleted file mode 100644
index 6ab6643154..0000000000
--- a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkTable.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.engine.spark.source;
-
-import java.io.IOException;
-import java.util.Locale;
-
-import org.apache.kylin.metadata.model.TableDesc;
-import org.apache.kylin.source.IReadableTable;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class NSparkTable implements IReadableTable {
-
- private static final Logger logger = LoggerFactory.getLogger(NSparkTable.class);
-
- final private String database;
- final private String tableName;
-
- public NSparkTable(TableDesc tableDesc) {
- this.database = tableDesc.getDatabase();
- this.tableName = tableDesc.getName();
- }
-
- @Override
- public TableReader getReader() throws IOException {
- return new NSparkTableReader(database, tableName);
- }
-
- @Override
- public TableSignature getSignature() throws IOException {
- // TODO: 07/12/2017 get modify time
- String path = String.format(Locale.ROOT, "%s.%s", database, tableName);
- long lastModified = System.currentTimeMillis(); // assume table is ever changing
- int size = 0;
- return new TableSignature(path, size, lastModified);
- }
-
- @Override
- public boolean exists() {
- return true;
- }
-
- @Override
- public String toString() {
- return "spark: database=[" + database + "], table=[" + tableName + "]";
- }
-
-}
diff --git a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkTableMeta.java b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkTableMeta.java
deleted file mode 100644
index 29f48bfbfc..0000000000
--- a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkTableMeta.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kylin.engine.spark.source;
-
-import java.util.List;
-
-public class NSparkTableMeta {
- public static class SparkTableColumnMeta {
- String name;
- String dataType;
- String comment;
-
- public SparkTableColumnMeta(String name, String dataType, String comment) {
- this.name = name;
- this.dataType = dataType;
- this.comment = comment;
- }
-
- @Override
- public String toString() {
- return "SparkTableColumnMeta{" + "name='" + name + '\'' + ", dataType='" + dataType + '\'' + ", comment='"
- + comment + '\'' + '}';
- }
-
- public String getName() {
- return name;
- }
-
- public String getDataType() {
- return dataType;
- }
-
- public String getComment() {
- return comment;
- }
- }
-
- String tableName;
- String sdLocation; // sd is short for storage descriptor
- String sdInputFormat;
- String sdOutputFormat;
- String owner;
- String provider;
- String tableType;
- String createTime;
- String lastAccessTime;
- long fileSize;
- long fileNum;
- boolean isNative;
- List<SparkTableColumnMeta> allColumns;
- List<SparkTableColumnMeta> partitionColumns;
- boolean isTransactional;
- boolean isRangePartition;
- String s3Role;
- String s3Endpoint;
-
- public List<SparkTableColumnMeta> getAllColumns() {
- return allColumns;
- }
-
- public NSparkTableMeta(String tableName, String sdLocation, String sdInputFormat, String sdOutputFormat,
- String owner, String provider, String tableType, String createTime, String lastAccessTime, long fileSize,
- long fileNum, boolean isNative, List<SparkTableColumnMeta> allColumns,
- List<SparkTableColumnMeta> partitionColumns, boolean isTransactional, boolean isRangePartition,
- String s3Role, String s3Endpoint) {
- this.tableName = tableName;
- this.sdLocation = sdLocation;
- this.sdInputFormat = sdInputFormat;
- this.sdOutputFormat = sdOutputFormat;
- this.owner = owner;
- this.provider = provider;
- this.tableType = tableType;
- this.createTime = createTime;
- this.lastAccessTime = lastAccessTime;
- this.fileSize = fileSize;
- this.fileNum = fileNum;
- this.isNative = isNative;
- this.allColumns = allColumns;
- this.partitionColumns = partitionColumns;
- this.isTransactional = isTransactional;
- this.isRangePartition = isRangePartition;
- this.s3Role = s3Role;
- this.s3Endpoint = s3Endpoint;
- }
-
- @Override
- public String toString() {
- return "SparkTableMeta{" + "tableName='" + tableName + '\'' + ", sdLocation='" + sdLocation + '\''
- + ", sdInputFormat='" + sdInputFormat + '\'' + ", sdOutputFormat='" + sdOutputFormat + '\''
- + ", owner='" + owner + ", provider='" + provider + '\'' + ", tableType='" + tableType
- + ", createTime='" + createTime + '\'' + ", lastAccessTime=" + lastAccessTime + ", fileSize=" + fileSize
- + ", fileNum=" + fileNum + ", isNative=" + isNative + ", allColumns=" + allColumns
- + ", partitionColumns=" + partitionColumns + ", isTransactional=" + isTransactional
- + ", isRangePartition=" + isRangePartition + ", s3Role=" + s3Role + ", s3Endpoint=" + s3Endpoint + '}';
- }
-}
diff --git a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkTableMetaBuilder.java b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkTableMetaBuilder.java
deleted file mode 100644
index 2d4a9eeae1..0000000000
--- a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkTableMetaBuilder.java
+++ /dev/null
@@ -1,140 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.engine.spark.source;
-
-import java.util.List;
-
-import com.google.common.collect.Lists;
-
-public class NSparkTableMetaBuilder {
- private String tableName;
- private String sdLocation;
- private String sdInputFormat;
- private String sdOutputFormat;
- private String owner;
- private String provider;
- private String tableType;
- private String createTime;
- private String lastAccessTime;
- private long fileSize;
- private long fileNum;
- private boolean isNative = true;
- private List<NSparkTableMeta.SparkTableColumnMeta> allColumns = Lists.newArrayList();
- private List<NSparkTableMeta.SparkTableColumnMeta> partitionColumns = Lists.newArrayList();
- private boolean isTransactional = false;
- private boolean isRangePartition = false;
- private String s3Role;
- private String s3Endpoint;
-
- public NSparkTableMetaBuilder setTableName(String tableName) {
- this.tableName = tableName;
- return this;
- }
-
- public NSparkTableMetaBuilder setSdLocation(String sdLocation) {
- this.sdLocation = sdLocation;
- return this;
- }
-
- public NSparkTableMetaBuilder setSdInputFormat(String sdInputFormat) {
- this.sdInputFormat = sdInputFormat;
- return this;
- }
-
- public NSparkTableMetaBuilder setSdOutputFormat(String sdOutputFormat) {
- this.sdOutputFormat = sdOutputFormat;
- return this;
- }
-
- public NSparkTableMetaBuilder setOwner(String owner) {
- this.owner = owner;
- return this;
- }
-
- public NSparkTableMetaBuilder setProvider(String provider) {
- this.provider = provider;
- return this;
- }
-
- public NSparkTableMetaBuilder setTableType(String tableType) {
- this.tableType = tableType;
- return this;
- }
-
- public NSparkTableMetaBuilder setCreateTime(String createTime) {
- this.createTime = createTime;
- return this;
- }
-
- public NSparkTableMetaBuilder setLastAccessTime(String lastAccessTime) {
- this.lastAccessTime = lastAccessTime;
- return this;
- }
-
- public NSparkTableMetaBuilder setFileSize(long fileSize) {
- this.fileSize = fileSize;
- return this;
- }
-
- public NSparkTableMetaBuilder setFileNum(long fileNum) {
- this.fileNum = fileNum;
- return this;
- }
-
- public NSparkTableMetaBuilder setIsNative(boolean isNative) {
- this.isNative = isNative;
- return this;
- }
-
- public NSparkTableMetaBuilder setAllColumns(List<NSparkTableMeta.SparkTableColumnMeta> allColumns) {
- this.allColumns = allColumns;
- return this;
- }
-
- public NSparkTableMetaBuilder setPartitionColumns(List<NSparkTableMeta.SparkTableColumnMeta> partitionColumns) {
- this.partitionColumns = partitionColumns;
- return this;
- }
-
- public NSparkTableMetaBuilder setIsTransactional(boolean isTransactional) {
- this.isTransactional = isTransactional;
- return this;
- }
-
- public NSparkTableMetaBuilder setIsRangePartition(boolean isRangePartition) {
- this.isRangePartition = isRangePartition;
- return this;
- }
-
- public NSparkTableMetaBuilder setS3Role(String s3Role) {
- this.s3Role = s3Role;
- return this;
- }
-
- public NSparkTableMetaBuilder setS3Endpoint(String s3Endpoint) {
- this.s3Endpoint = s3Endpoint;
- return this;
- }
-
- public NSparkTableMeta createSparkTableMeta() {
- return new NSparkTableMeta(tableName, sdLocation, sdInputFormat, sdOutputFormat, owner, provider, tableType,
- createTime, lastAccessTime, fileSize, fileNum, isNative, allColumns, partitionColumns, isTransactional,
- isRangePartition, s3Role, s3Endpoint);
- }
-}
\ No newline at end of file
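
For reference, the builder removed above was how NSparkTableMetaExplorer assembled its result; a typical chained call looked like the following sketch (all values are illustrative, and the class is assumed to sit in the same package as the removed types):

    import com.google.common.collect.Lists;

    public class BuilderUsageSketch {
        public static void main(String[] args) {
            NSparkTableMeta meta = new NSparkTableMetaBuilder()
                    .setTableName("kylin_sales")
                    .setOwner("admin")
                    .setTableType("MANAGED")
                    .setAllColumns(Lists.newArrayList(
                            new NSparkTableMeta.SparkTableColumnMeta("id", "bigint", null),
                            new NSparkTableMeta.SparkTableColumnMeta("price", "decimal(19,4)", "sale price")))
                    .createSparkTableMeta();
            // unset fields keep the builder defaults (isNative=true, empty column lists, ...)
            System.out.println(meta);
        }
    }
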
diff --git a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkTableMetaExplorer.java b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkTableMetaExplorer.java
deleted file mode 100644
index 033edb84f9..0000000000
--- a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkTableMetaExplorer.java
+++ /dev/null
@@ -1,203 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kylin.engine.spark.source;
-
-import java.io.Serializable;
-import java.net.URI;
-import java.util.List;
-import java.util.Locale;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
-import java.util.stream.Collectors;
-
-import org.apache.commons.jnet.Installer;
-import org.apache.hadoop.fs.FsUrlStreamHandlerFactory;
-import org.apache.spark.sql.SparderEnv;
-import org.apache.spark.sql.catalyst.TableIdentifier;
-import org.apache.spark.sql.catalyst.catalog.CatalogTable;
-import org.apache.spark.sql.catalyst.catalog.CatalogTableType;
-import org.apache.spark.sql.catalyst.catalog.SessionCatalog;
-import org.apache.spark.sql.types.StructType;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-
-import scala.Option;
-import scala.collection.JavaConversions;
-
-public class NSparkTableMetaExplorer implements Serializable {
-
- private static final Logger logger = LoggerFactory.getLogger(NSparkTableMetaExplorer.class);
-
- enum PROVIDER {
- HIVE("hive"), UNSPECIFIED("");
-
- private static final PROVIDER[] ALL = new PROVIDER[] { HIVE };
- private String value;
-
- PROVIDER(String value) {
- this.value = value;
- }
-
- public static PROVIDER fromString(Option<String> value) {
- if (value.isEmpty()) {
- return UNSPECIFIED;
- }
-
- for (PROVIDER provider : ALL) {
- if (provider.value.equals(value.get())) {
- return provider;
- }
- }
- return UNSPECIFIED;
- }
- }
-
- private static final List<String> UNSUPPORTED_TYPE = Lists.newArrayList("array", "map", "struct", "binary");
- private static final String CHAR_VARCHAR_TYPE_STRING_METADATA_KEY = "__CHAR_VARCHAR_TYPE_STRING";
- public static final String S3_ROLE_PROPERTY_KEY = "role";
- public static final String S3_ENDPOINT_PROPERTY_KEY = "s3_endpoint";
-
- public NSparkTableMeta getSparkTableMeta(String database, String tableName) {
- SessionCatalog catalog = SparderEnv.getSparkSession().sessionState().catalog();
- TableIdentifier tableIdentifier = TableIdentifier.apply(tableName,
- Option.apply(database.isEmpty() ? null : database));
- CatalogTable tableMetadata = catalog.getTempViewOrPermanentTableMetadata(tableIdentifier);
- checkTableIsValid(tableMetadata, tableIdentifier, tableName);
- return getSparkTableMeta(tableName, tableMetadata);
- }
-
- public Set<String> checkAndGetTablePartitions(String database, String tableName, String partitionCol) {
- SessionCatalog catalog = SparderEnv.getSparkSession().sessionState().catalog();
- TableIdentifier tableIdentifier = TableIdentifier.apply(tableName,
- Option.apply(database.isEmpty() ? null : database));
-
- CatalogTable tableMetadata = catalog.getTempViewOrPermanentTableMetadata(tableIdentifier);
-
- String firstPartCol = tableMetadata.partitionColumnNames().isEmpty() ? null
- : tableMetadata.partitionColumnNames().head().toLowerCase(Locale.ROOT);
-
- if (!partitionCol.equalsIgnoreCase(firstPartCol)) {
- throw new IllegalArgumentException(
- String.format(Locale.ROOT, "table partition col %s not match col %s", firstPartCol, partitionCol));
- }
- return JavaConversions.seqAsJavaList(catalog.listPartitions(tableIdentifier, Option.empty())).stream()
- .map(item -> JavaConversions.mapAsJavaMap(item.spec()).entrySet().stream()
- .filter(entry -> partitionCol.equalsIgnoreCase(entry.getKey())) //
- .findFirst() //
- .map(Map.Entry::getValue) //
- .orElse(null))
- .filter(Objects::nonNull).collect(Collectors.toSet());
- }
-
- private NSparkTableMeta getSparkTableMeta(String tableName, CatalogTable tableMetadata) {
- NSparkTableMetaBuilder builder = new NSparkTableMetaBuilder();
- builder.setTableName(tableName);
- builder.setAllColumns(getColumns(tableMetadata, tableMetadata.schema()));
- builder.setOwner(tableMetadata.owner());
- builder.setCreateTime(tableMetadata.createTime() + "");
- builder.setLastAccessTime(tableMetadata.lastAccessTime() + "");
- builder.setTableType(tableMetadata.tableType().name());
- builder.setPartitionColumns(getColumns(tableMetadata, tableMetadata.partitionSchema()));
- builder.setIsRangePartition(isRangePartition(tableMetadata));
- if (tableMetadata.storage().inputFormat().isDefined()) {
- builder.setSdInputFormat(tableMetadata.storage().inputFormat().get());
- }
- if (tableMetadata.storage().outputFormat().isDefined()) {
- builder.setSdOutputFormat(tableMetadata.storage().outputFormat().get());
- }
- Option<URI> uriOption = tableMetadata.storage().locationUri();
- if (uriOption.isDefined()) {
- builder.setSdLocation(uriOption.get().toString());
- }
- if (tableMetadata.provider().isDefined()) {
- builder.setProvider(tableMetadata.provider().get());
- }
- if (tableMetadata.properties().contains("totalSize")) {
- builder.setFileSize(Long.parseLong(tableMetadata.properties().get("totalSize").get()));
- }
- if (tableMetadata.properties().contains("numFiles")) {
- builder.setFileNum(Long.parseLong(tableMetadata.properties().get("numFiles").get()));
- }
- if (tableMetadata.properties().contains("transactional")) {
- builder.setIsTransactional(Boolean.parseBoolean(tableMetadata.properties().get("transactional").get()));
- }
- if (tableMetadata.properties().contains(S3_ROLE_PROPERTY_KEY)) {
- builder.setS3Role(tableMetadata.properties().get(S3_ROLE_PROPERTY_KEY).get());
- }
-
- if (tableMetadata.properties().contains(S3_ENDPOINT_PROPERTY_KEY)) {
- builder.setS3Endpoint(tableMetadata.properties().get(S3_ENDPOINT_PROPERTY_KEY).get());
- }
- return builder.createSparkTableMeta();
- }
-
- private List<NSparkTableMeta.SparkTableColumnMeta> getColumns(CatalogTable tableMetadata, StructType schema) {
- return getColumns(tableMetadata, schema, true);
- }
-
- private List<NSparkTableMeta.SparkTableColumnMeta> getColumns(CatalogTable tableMetadata, StructType schema,
- boolean isCheckRepeatColumn) {
- List<NSparkTableMeta.SparkTableColumnMeta> allColumns = Lists.newArrayListWithCapacity(schema.size());
- Set<String> columnCacheTemp = Sets.newHashSet();
- for (org.apache.spark.sql.types.StructField field : schema.fields()) {
- String type = field.dataType().simpleString();
- if (field.metadata().contains(CHAR_VARCHAR_TYPE_STRING_METADATA_KEY)) {
- type = field.metadata().getString(CHAR_VARCHAR_TYPE_STRING_METADATA_KEY);
- }
- String finalType = type;
- if (UNSUPPORTED_TYPE.stream().anyMatch(finalType::contains)) {
- logger.info("Loading table {}: ignoring column {} with unsupported type {}",
- tableMetadata.identifier().identifier(), field.name(), finalType);
- continue;
- }
- if (isCheckRepeatColumn && columnCacheTemp.contains(field.name())) {
- logger.info("The【{}】column is already included and does not need to be added again", field.name());
- continue;
- }
- columnCacheTemp.add(field.name());
- allColumns.add(new NSparkTableMeta.SparkTableColumnMeta(field.name(), type,
- field.getComment().isDefined() ? field.getComment().get() : null));
- }
-
- return allColumns;
- }
-
- private void checkTableIsValid(CatalogTable tableMetadata, TableIdentifier tableIdentifier, String tableName) {
- if (CatalogTableType.VIEW().equals(tableMetadata.tableType())) {
- try {
- Installer.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory());
- SparderEnv.getSparkSession().table(tableIdentifier).queryExecution().analyzed();
- } catch (Throwable e) {
- logger.error("Error for parser view: " + tableName, e);
- throw new RuntimeException("Error for parser view: " + tableName + ", " + e.getMessage()
- + "(There are maybe syntactic differences between HIVE and SparkSQL)", e);
- }
- }
- }
-
- private Boolean isRangePartition(CatalogTable tableMetadata) {
- List<NSparkTableMeta.SparkTableColumnMeta> allColumns = getColumns(tableMetadata, tableMetadata.schema(),
- false);
- return allColumns.stream().collect(Collectors.groupingBy(p -> p.name)).values().stream()
- .anyMatch(p -> p.size() > 1);
- }
-}
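
The isRangePartition() check removed above is worth calling out: it infers range partitioning purely from a duplicate column name in the schema when column de-duplication is turned off. The core of that check, isolated into plain Java (the interpretation follows the removed code, not any Hive documentation):

    import java.util.Arrays;
    import java.util.List;
    import java.util.stream.Collectors;

    public class RangePartitionCheckSketch {
        // same grouping logic as the removed isRangePartition()
        static boolean hasDuplicateColumn(List<String> columnNames) {
            return columnNames.stream().collect(Collectors.groupingBy(n -> n)).values().stream()
                    .anyMatch(group -> group.size() > 1);
        }

        public static void main(String[] args) {
            System.out.println(hasDuplicateColumn(Arrays.asList("id", "dt", "dt"))); // true
            System.out.println(hasDuplicateColumn(Arrays.asList("id", "dt")));       // false
        }
    }
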
diff --git a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkTableReader.java b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkTableReader.java
deleted file mode 100644
index 669f418c9c..0000000000
--- a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/NSparkTableReader.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kylin.engine.spark.source;
-
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Locale;
-
-import org.apache.kylin.source.IReadableTable.TableReader;
-import org.apache.spark.sql.Row;
-import org.apache.spark.sql.SparderEnv;
-import org.apache.spark.sql.SparkSession;
-import org.apache.spark.sql.types.StructField;
-
-public class NSparkTableReader implements TableReader {
- private String dbName;
- private String tableName;
- private SparkSession ss;
- private List<Row> records;
- private Iterator<Row> iterator;
- private Row currentRow;
-
- public NSparkTableReader(String dbName, String tableName) {
- this.dbName = dbName;
- this.tableName = tableName;
- initialize();
- }
-
- public static String[] getRowAsStringArray(Row record) {
- StructField[] fields = record.schema().fields();
- String[] arr = new String[fields.length];
- for (int i = 0; i < arr.length; i++) {
- Object o = record.get(i);
- arr[i] = (o == null) ? null : o.toString();
- }
- return arr;
- }
-
- private void initialize() {
- ss = SparderEnv.getSparkSession();
- String master = ss.sparkContext().master();
- String tableIdentity = tableName;
- // Spark SQL cannot use a database prefix for a temp view created from CSV, but it needs the prefix when reading from Hive
- if (!master.toLowerCase(Locale.ROOT).contains("local")) {
- tableIdentity = String.format(Locale.ROOT, "%s.%s", dbName, tableName);
- }
- records = SparkSqlUtil.queryAll(ss, tableIdentity);
- iterator = records.iterator();
- }
-
- @Override
- public boolean next() throws IOException {
- boolean hasNext = iterator != null && iterator.hasNext();
- if (hasNext) {
- currentRow = iterator.next();
- }
- return hasNext;
- }
-
- @Override
- public String[] getRow() {
- return getRowAsStringArray(currentRow);
- }
-
- @Override
- public void close() throws IOException {
- this.records = null;
- this.iterator = null;
- this.currentRow = null;
- }
-}
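
The reader above follows Kylin's TableReader contract (next()/getRow()/close()). Note that it materializes the entire table on the driver via collectAsList, so it only suited small tables such as sample data. A usage sketch, assuming the same package as the removed reader (it needs a live Spark session registered with SparderEnv to actually run):

    import org.apache.kylin.source.IReadableTable.TableReader;

    public class ReaderUsageSketch {
        public static void main(String[] args) throws Exception {
            TableReader reader = new NSparkTableReader("default", "kylin_sales");
            try {
                while (reader.next()) {
                    // each row comes back as a String[] in column order
                    System.out.println(String.join(",", reader.getRow()));
                }
            } finally {
                reader.close(); // drops the cached rows and iterator
            }
        }
    }
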
diff --git a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/SparkSqlUtil.java b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/SparkSqlUtil.java
deleted file mode 100644
index cea86bc9fa..0000000000
--- a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/source/SparkSqlUtil.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kylin.engine.spark.source;
-
-import java.util.List;
-import java.util.Locale;
-import java.util.Set;
-
-import org.apache.spark.sql.AnalysisException;
-import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Row;
-import org.apache.spark.sql.SparkSession;
-import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation;
-import org.apache.spark.sql.catalyst.catalog.CatalogTableType;
-
-import com.google.common.collect.Sets;
-
-import lombok.val;
-
-public class SparkSqlUtil {
- public static Dataset<Row> query(SparkSession ss, String sql) {
- return ss.sql(sql);
- }
-
- public static List<Row> queryForList(SparkSession ss, String sql) {
- return ss.sql(sql).collectAsList();
- }
-
- public static List<Row> queryAll(SparkSession ss, String table) {
- String sql = String.format(Locale.ROOT, "select * from %s", table);
- return queryForList(ss, sql);
- }
-
- public static Set<String> getViewOrignalTables(String viewName, SparkSession spark) throws AnalysisException {
- String viewText = spark.sql("desc formatted " + viewName).where("col_name = 'View Text'").head().getString(1);
- val logicalPlan = spark.sessionState().sqlParser().parsePlan(viewText);
- Set<String> viewTables = Sets.newHashSet();
- for (Object l : scala.collection.JavaConverters.seqAsJavaListConverter(logicalPlan.collectLeaves()).asJava()) {
- if (l instanceof UnresolvedRelation) {
- val tableName = ((UnresolvedRelation) l).tableName();
- // a nested view: recurse to collect its base tables
- if (spark.catalog().getTable(tableName).tableType().equals(CatalogTableType.VIEW().name())) {
- viewTables.addAll(getViewOrignalTables(tableName, spark));
- } else {
- viewTables.add(tableName);
- }
- }
- }
- return viewTables;
- }
-}
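
A closing note on getViewOrignalTables: it parses the view's text, walks the plan leaves, and recurses whenever a leaf is itself a view, so the returned set contains only base tables, at any nesting depth. The recursion shape, stripped of the Spark catalog calls (Node here is a stand-in for a plan leaf):

    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;

    public class ViewResolutionSketch {
        // stand-in for a plan leaf: a base table, or a view with its own leaves
        static class Node {
            final String name;
            final List<Node> viewLeaves; // null for a base table
            Node(String name, List<Node> viewLeaves) {
                this.name = name;
                this.viewLeaves = viewLeaves;
            }
        }

        // same shape as the removed getViewOrignalTables(): recurse into
        // nested views, collect everything else as a base table
        static Set<String> baseTables(List<Node> leaves) {
            Set<String> tables = new HashSet<>();
            for (Node leaf : leaves) {
                if (leaf.viewLeaves != null) {
                    tables.addAll(baseTables(leaf.viewLeaves));
                } else {
                    tables.add(leaf.name);
                }
            }
            return tables;
        }

        public static void main(String[] args) {
            Node t1 = new Node("db.t1", null);
            Node inner = new Node("db.v_inner", Arrays.asList(new Node("db.t2", null)));
            System.out.println(baseTables(Arrays.asList(t1, inner))); // [db.t1, db.t2]
        }
    }
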