You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by xx...@apache.org on 2023/04/23 06:01:01 UTC
[kylin] 20/22: [DIRTY] run sparder with local mode when proposing recommendations
This is an automated email from the ASF dual-hosted git repository.
xxyu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git
commit c67ba03c6360b28ee625b3596b24511b0bcac2ba
Author: Pengfei.Zhan <pe...@kyligence.io>
AuthorDate: Fri Feb 3 22:26:44 2023 +0800
[DIRTY] run sparder with local mode when proposing recommendations
---
.../scala/org/apache/spark/sql/SparderEnv.scala | 22 +++++++++++-----------
1 file changed, 11 insertions(+), 11 deletions(-)
diff --git a/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/SparderEnv.scala b/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/SparderEnv.scala
index 2da5620517..41f6abe771 100644
--- a/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/SparderEnv.scala
+++ b/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/SparderEnv.scala
@@ -21,20 +21,18 @@ package org.apache.spark.sql
import java.lang.{Boolean => JBoolean, String => JString}
import java.security.PrivilegedAction
import java.util.Map
-import java.util.concurrent.{Callable, ExecutorService}
import java.util.concurrent.locks.ReentrantLock
+import java.util.concurrent.{Callable, ExecutorService}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.security.UserGroupInformation
-import org.apache.kylin.common.{KylinConfig, QueryContext}
import org.apache.kylin.common.exception.{KylinException, KylinTimeoutException, ServerErrorCode}
import org.apache.kylin.common.msg.MsgPicker
import org.apache.kylin.common.util.{DefaultHostInfoFetcher, HadoopUtil, S3AUtil}
+import org.apache.kylin.common.{KylinConfig, QueryContext}
import org.apache.kylin.metadata.model.{NTableMetadataManager, TableExtDesc}
import org.apache.kylin.metadata.project.NProjectManager
import org.apache.kylin.query.runtime.plan.QueryToExecutionIDCache
-
-import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.internal.Logging
import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent, SparkListenerLogRollUp}
import org.apache.spark.sql.KylinSession._
@@ -46,6 +44,7 @@ import org.apache.spark.sql.execution.ui.PostQueryExecutionForKylin
import org.apache.spark.sql.hive.ReplaceLocationRule
import org.apache.spark.sql.udf.UdfManager
import org.apache.spark.util.{ThreadUtils, Utils}
+import org.apache.spark.{SparkConf, SparkContext}
// scalastyle:off
object SparderEnv extends Logging {
@@ -217,7 +216,8 @@ object SparderEnv extends Logging {
val appName = "sparder-" + UserGroupInformation.getCurrentUser.getShortUserName + "-" + hostInfoFetcher.getHostname
val isLocalMode = KylinConfig.getInstanceFromEnv.isJobNodeOnly ||
- ("true").equals(System.getProperty("spark.local"))
+ "true".equals(System.getenv("SPARK_LOCAL")) ||
+ "true".equals(System.getProperty("spark.local"))
val sparkSession = isLocalMode match {
case true =>
SparkSession.builder
@@ -238,11 +238,11 @@ object SparderEnv extends Logging {
//if user defined other master in kylin.properties,
// it will get overwrite later in org.apache.spark.sql.KylinSession.KylinBuilder.initSparkConf
.withExtensions { ext =>
- ext.injectPlannerStrategy(_ => KylinSourceStrategy)
- ext.injectPlannerStrategy(_ => LayoutFileSourceStrategy)
- ext.injectPostHocResolutionRule(ReplaceLocationRule)
- ext.injectOptimizerRule(_ => new ConvertInnerJoinToSemiJoin())
- }
+ ext.injectPlannerStrategy(_ => KylinSourceStrategy)
+ ext.injectPlannerStrategy(_ => LayoutFileSourceStrategy)
+ ext.injectPostHocResolutionRule(ReplaceLocationRule)
+ ext.injectOptimizerRule(_ => new ConvertInnerJoinToSemiJoin())
+ }
.enableHiveSupport()
.getOrCreateKylinSession()
}
@@ -343,7 +343,7 @@ object SparderEnv extends Logging {
}
- def getHadoopConfiguration(): /**/Configuration = {
+ def getHadoopConfiguration(): /**/ Configuration = {
var configuration = HadoopUtil.getCurrentConfiguration
spark.conf.getAll.filter(item => item._1.startsWith("fs.")).foreach(item => configuration.set(item._1, item._2))
configuration