You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by xx...@apache.org on 2023/04/23 06:01:01 UTC

[kylin] 20/22: [DIRTY] run sparder with local mode when proposing recommendations

This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit c67ba03c6360b28ee625b3596b24511b0bcac2ba
Author: Pengfei.Zhan <pe...@kyligence.io>
AuthorDate: Fri Feb 3 22:26:44 2023 +0800

    [DIRTY] run sparder with local mode when proposing recommendations
---
 .../scala/org/apache/spark/sql/SparderEnv.scala    | 22 +++++++++++-----------
 1 file changed, 11 insertions(+), 11 deletions(-)

diff --git a/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/SparderEnv.scala b/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/SparderEnv.scala
index 2da5620517..41f6abe771 100644
--- a/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/SparderEnv.scala
+++ b/src/spark-project/sparder/src/main/scala/org/apache/spark/sql/SparderEnv.scala
@@ -21,20 +21,18 @@ package org.apache.spark.sql
 import java.lang.{Boolean => JBoolean, String => JString}
 import java.security.PrivilegedAction
 import java.util.Map
-import java.util.concurrent.{Callable, ExecutorService}
 import java.util.concurrent.locks.ReentrantLock
+import java.util.concurrent.{Callable, ExecutorService}
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.security.UserGroupInformation
-import org.apache.kylin.common.{KylinConfig, QueryContext}
 import org.apache.kylin.common.exception.{KylinException, KylinTimeoutException, ServerErrorCode}
 import org.apache.kylin.common.msg.MsgPicker
 import org.apache.kylin.common.util.{DefaultHostInfoFetcher, HadoopUtil, S3AUtil}
+import org.apache.kylin.common.{KylinConfig, QueryContext}
 import org.apache.kylin.metadata.model.{NTableMetadataManager, TableExtDesc}
 import org.apache.kylin.metadata.project.NProjectManager
 import org.apache.kylin.query.runtime.plan.QueryToExecutionIDCache
-
-import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.internal.Logging
 import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent, SparkListenerLogRollUp}
 import org.apache.spark.sql.KylinSession._
@@ -46,6 +44,7 @@ import org.apache.spark.sql.execution.ui.PostQueryExecutionForKylin
 import org.apache.spark.sql.hive.ReplaceLocationRule
 import org.apache.spark.sql.udf.UdfManager
 import org.apache.spark.util.{ThreadUtils, Utils}
+import org.apache.spark.{SparkConf, SparkContext}
 
 // scalastyle:off
 object SparderEnv extends Logging {
@@ -217,7 +216,8 @@ object SparderEnv extends Logging {
       val appName = "sparder-" + UserGroupInformation.getCurrentUser.getShortUserName + "-" + hostInfoFetcher.getHostname
 
       val isLocalMode = KylinConfig.getInstanceFromEnv.isJobNodeOnly ||
-        ("true").equals(System.getProperty("spark.local"))
+        "true".equals(System.getenv("SPARK_LOCAL")) ||
+        "true".equals(System.getProperty("spark.local"))
       val sparkSession = isLocalMode match {
         case true =>
           SparkSession.builder
@@ -238,11 +238,11 @@ object SparderEnv extends Logging {
             //if user defined other master in kylin.properties,
             // it will get overwrite later in org.apache.spark.sql.KylinSession.KylinBuilder.initSparkConf
             .withExtensions { ext =>
-            ext.injectPlannerStrategy(_ => KylinSourceStrategy)
-            ext.injectPlannerStrategy(_ => LayoutFileSourceStrategy)
-            ext.injectPostHocResolutionRule(ReplaceLocationRule)
-            ext.injectOptimizerRule(_ => new ConvertInnerJoinToSemiJoin())
-          }
+              ext.injectPlannerStrategy(_ => KylinSourceStrategy)
+              ext.injectPlannerStrategy(_ => LayoutFileSourceStrategy)
+              ext.injectPostHocResolutionRule(ReplaceLocationRule)
+              ext.injectOptimizerRule(_ => new ConvertInnerJoinToSemiJoin())
+            }
             .enableHiveSupport()
             .getOrCreateKylinSession()
       }
@@ -343,7 +343,7 @@ object SparderEnv extends Logging {
 
   }
 
-  def getHadoopConfiguration(): /**/Configuration = {
+  def getHadoopConfiguration(): /**/ Configuration = {
     var configuration = HadoopUtil.getCurrentConfiguration
     spark.conf.getAll.filter(item => item._1.startsWith("fs.")).foreach(item => configuration.set(item._1, item._2))
     configuration