You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by aj...@apache.org on 2020/06/28 02:34:51 UTC

[carbondata] branch master updated: [CARBONDATA-3870] Optimize global lock to object lock for CarbonEnv.init

This is an automated email from the ASF dual-hosted git repository.

ajantha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 10aa6e9  [CARBONDATA-3870] Optimize global lock to object lock for CarbonEnv.init
10aa6e9 is described below

commit 10aa6e9906b74905043b50cc6422e1ef42b894f5
Author: QiangCai <qi...@qq.com>
AuthorDate: Wed Jun 24 15:19:26 2020 +0800

    [CARBONDATA-3870] Optimize global lock to object lock for CarbonEnv.init
    
    Why is this PR needed?
    The global lock in CarbonEnv.init hurts the performance of concurrent queries that use multiple SparkSessions.
    
    What changes were proposed in this PR?
    Optimize the global lock to an object-level lock for CarbonEnv.init.
    The global lock is now used only to create the CarbonEnv object (it no longer covers init).
    Does this PR introduce any user interface change?
    No
    Is any new testcase added?
    No. Derby supports only one connection at a time, so we can't create multiple sessions that connect to the same Derby metastore.
    
    This closes #3805
---
 .../scala/org/apache/spark/sql/CarbonEnv.scala     | 94 ++++++++++++----------
 1 file changed, 51 insertions(+), 43 deletions(-)

diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonEnv.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
index 5062a43..67de334 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
@@ -62,7 +62,11 @@ class CarbonEnv {
 
   var initialized = false
 
-  def init(sparkSession: SparkSession): Unit = {
+  def init(sparkSession: SparkSession): Unit = this.synchronized {
+    // after locking, check initialized at first
+    if (initialized) {
+      return
+    }
     val properties = CarbonProperties.getInstance()
     var storePath = properties.getProperty(CarbonCommonConstants.STORE_LOCATION)
     if (storePath == null) {
@@ -96,43 +100,38 @@ class CarbonEnv {
     // added for handling timeseries function like hour, minute, day, month, year
     sparkSession.udf.register(MVFunctions.TIME_SERIES_FUNCTION, new TimeSeriesFunction)
 
-    // acquiring global level lock so global configuration will be updated by only one thread
-    CarbonEnv.carbonEnvMap.synchronized {
-      if (!initialized) {
-        // update carbon session parameters , preserve thread parameters
-        val currentThreadSesssionInfo = ThreadLocalSessionInfo.getCarbonSessionInfo
-        carbonSessionInfo = new CarbonSessionInfo()
-        // We should not corrupt the information in carbonSessionInfo object which is at the
-        // session level. Instead create a new object and in that set the user specified values in
-        // thread/session params
-        val threadLevelCarbonSessionInfo = new CarbonSessionInfo()
-        if (currentThreadSesssionInfo != null) {
-          threadLevelCarbonSessionInfo.setThreadParams(currentThreadSesssionInfo.getThreadParams)
-        }
-        ThreadLocalSessionInfo.setCarbonSessionInfo(threadLevelCarbonSessionInfo)
-        ThreadLocalSessionInfo.setConfigurationToCurrentThread(sparkSession
-          .sessionState.newHadoopConf())
-        val config = new CarbonSQLConf(sparkSession)
-        if (sparkSession.conf.getOption(CarbonCommonConstants.ENABLE_UNSAFE_SORT).isEmpty) {
-          config.addDefaultCarbonParams()
-        }
-        // add session params after adding DefaultCarbonParams
-        config.addDefaultCarbonSessionParams()
-        carbonMetaStore = {
-          // trigger event for CarbonEnv create
-          val operationContext = new OperationContext
-          val carbonEnvInitPreEvent: CarbonEnvInitPreEvent =
-            CarbonEnvInitPreEvent(sparkSession, carbonSessionInfo, storePath)
-          OperationListenerBus.getInstance.fireEvent(carbonEnvInitPreEvent, operationContext)
-
-          CarbonMetaStoreFactory.createCarbonMetaStore(sparkSession.conf)
-        }
-        CarbonProperties.getInstance
-          .addNonSerializableProperty(CarbonCommonConstants.IS_DRIVER_INSTANCE, "true")
-        initialized = true
-      }
+    // update carbon session parameters , preserve thread parameters
+    val currentThreadSesssionInfo = ThreadLocalSessionInfo.getCarbonSessionInfo
+    carbonSessionInfo = new CarbonSessionInfo()
+    // We should not corrupt the information in carbonSessionInfo object which is at the
+    // session level. Instead create a new object and in that set the user specified values in
+    // thread/session params
+    val threadLevelCarbonSessionInfo = new CarbonSessionInfo()
+    if (currentThreadSesssionInfo != null) {
+      threadLevelCarbonSessionInfo.setThreadParams(currentThreadSesssionInfo.getThreadParams)
     }
+    ThreadLocalSessionInfo.setCarbonSessionInfo(threadLevelCarbonSessionInfo)
+    ThreadLocalSessionInfo.setConfigurationToCurrentThread(
+      sparkSession.sessionState.newHadoopConf())
+    val config = new CarbonSQLConf(sparkSession)
+    if (sparkSession.conf.getOption(CarbonCommonConstants.ENABLE_UNSAFE_SORT).isEmpty) {
+      config.addDefaultCarbonParams()
+    }
+    // add session params after adding DefaultCarbonParams
+    config.addDefaultCarbonSessionParams()
+    carbonMetaStore = {
+      // trigger event for CarbonEnv create
+      val operationContext = new OperationContext
+      val carbonEnvInitPreEvent: CarbonEnvInitPreEvent =
+        CarbonEnvInitPreEvent(sparkSession, carbonSessionInfo, storePath)
+      OperationListenerBus.getInstance.fireEvent(carbonEnvInitPreEvent, operationContext)
+      CarbonMetaStoreFactory.createCarbonMetaStore(sparkSession.conf)
+    }
+    CarbonProperties.getInstance
+      .addNonSerializableProperty(CarbonCommonConstants.IS_DRIVER_INSTANCE, "true")
     Profiler.initialize(sparkSession.sparkContext)
+    CarbonToSparkAdapter.addSparkSessionListener(sparkSession)
+    initialized = true
     LOGGER.info("Initialize CarbonEnv completed...")
   }
 }
@@ -147,14 +146,23 @@ object CarbonEnv {
   val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
 
   def getInstance(sparkSession: SparkSession): CarbonEnv = {
-      var carbonEnv: CarbonEnv = carbonEnvMap.get(sparkSession)
-      if (carbonEnv == null) {
-        carbonEnv = new CarbonEnv
-        carbonEnv.init(sparkSession)
-        CarbonToSparkAdapter.addSparkSessionListener(sparkSession)
-        carbonEnvMap.put(sparkSession, carbonEnv)
+    var carbonEnv: CarbonEnv = carbonEnvMap.get(sparkSession)
+    if (carbonEnv == null) {
+      // if carbonEnv is null, need to create create CarbonEnv
+      // use global level lock, it will be fine for only creating CarbonEnv object
+      CarbonEnv.carbonEnvMap.synchronized {
+        // need to double check whether CarbonEnv exists or not
+        carbonEnv = carbonEnvMap.get(sparkSession)
+        if (carbonEnv == null) {
+          carbonEnv = new CarbonEnv
+          carbonEnvMap.put(sparkSession, carbonEnv)
+        }
       }
-      carbonEnv
+    }
+    if (!carbonEnv.initialized) {
+      carbonEnv.init(sparkSession)
+    }
+    carbonEnv
   }
 
   /**