You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2017/11/14 17:50:28 UTC

[06/18] carbondata git commit: [CARBONDATA-1627] One job failed among 100 jobs while performing select operations from 100 different threads

[CARBONDATA-1627] One job failed among 100 jobs while performing select operations from 100 different threads

This issue was observed when launching 100 jobs through JMeter: one job would fail because multiple threads could attempt to initialize the variable concurrently. Even though the call is made through a lazy initializer, under heavy multi-threaded load several threads could still race to initialize it. To avoid this, a synchronized block was added so that only one thread can perform the initialization; afterwards, other threads can no longer enter that block.

This closes #1457


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/796905fb
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/796905fb
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/796905fb

Branch: refs/heads/fgdatamap
Commit: 796905fb15f6657a3218be7bee99a9f726c57db0
Parents: 2f0959a
Author: kushalsaha <ku...@gmail.com>
Authored: Sat Nov 11 16:55:06 2017 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Tue Nov 14 16:44:05 2017 +0530

----------------------------------------------------------------------
 .../scala/org/apache/spark/sql/CarbonEnv.scala  | 64 ++++++++++----------
 1 file changed, 33 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/796905fb/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
index b69ef2f..1ee7650 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
@@ -55,39 +55,41 @@ class CarbonEnv {
     // added for handling preaggregate table creation. when user will fire create ddl for
     // create table we are adding a udf so no need to apply PreAggregate rules.
     sparkSession.udf.register("preAgg", () => "")
-    if (!initialized) {
-      // update carbon session parameters , preserve thread parameters
-      val currentThreadSesssionInfo = ThreadLocalSessionInfo.getCarbonSessionInfo
-      carbonSessionInfo = new CarbonSessionInfo()
-      sessionParams = carbonSessionInfo.getSessionParams
-      if (currentThreadSesssionInfo != null) {
-        carbonSessionInfo.setThreadParams(currentThreadSesssionInfo.getThreadParams)
-      }
-      carbonSessionInfo.setSessionParams(sessionParams)
-      ThreadLocalSessionInfo.setCarbonSessionInfo(carbonSessionInfo)
-      val config = new CarbonSQLConf(sparkSession)
-      if(sparkSession.conf.getOption(CarbonCommonConstants.ENABLE_UNSAFE_SORT) == None) {
-        config.addDefaultCarbonParams()
-      }
-      // add session params after adding DefaultCarbonParams
-      config.addDefaultCarbonSessionParams()
-      carbonMetastore = {
-        val properties = CarbonProperties.getInstance()
-        storePath = properties.getProperty(CarbonCommonConstants.STORE_LOCATION)
-        if (storePath == null) {
-          storePath = sparkSession.conf.get("spark.sql.warehouse.dir")
-          properties.addProperty(CarbonCommonConstants.STORE_LOCATION, storePath)
+    synchronized {
+      if (!initialized) {
+        // update carbon session parameters , preserve thread parameters
+        val currentThreadSesssionInfo = ThreadLocalSessionInfo.getCarbonSessionInfo
+        carbonSessionInfo = new CarbonSessionInfo()
+        sessionParams = carbonSessionInfo.getSessionParams
+        if (currentThreadSesssionInfo != null) {
+          carbonSessionInfo.setThreadParams(currentThreadSesssionInfo.getThreadParams)
         }
-        LOGGER.info(s"carbon env initial: $storePath")
-        // trigger event for CarbonEnv init
-        val carbonEnvInitPreEvent: CarbonEnvInitPreEvent =
-          CarbonEnvInitPreEvent(sparkSession, storePath)
-        OperationListenerBus.getInstance.fireEvent(carbonEnvInitPreEvent)
-
-        CarbonMetaStoreFactory.createCarbonMetaStore(sparkSession.conf)
+        carbonSessionInfo.setSessionParams(sessionParams)
+        ThreadLocalSessionInfo.setCarbonSessionInfo(carbonSessionInfo)
+        val config = new CarbonSQLConf(sparkSession)
+        if (sparkSession.conf.getOption(CarbonCommonConstants.ENABLE_UNSAFE_SORT) == None) {
+          config.addDefaultCarbonParams()
+        }
+        // add session params after adding DefaultCarbonParams
+        config.addDefaultCarbonSessionParams()
+        carbonMetastore = {
+          val properties = CarbonProperties.getInstance()
+          storePath = properties.getProperty(CarbonCommonConstants.STORE_LOCATION)
+          if (storePath == null) {
+            storePath = sparkSession.conf.get("spark.sql.warehouse.dir")
+            properties.addProperty(CarbonCommonConstants.STORE_LOCATION, storePath)
+          }
+          LOGGER.info(s"carbon env initial: $storePath")
+          // trigger event for CarbonEnv init
+          val carbonEnvInitPreEvent: CarbonEnvInitPreEvent =
+            CarbonEnvInitPreEvent(sparkSession, storePath)
+          OperationListenerBus.getInstance.fireEvent(carbonEnvInitPreEvent)
+
+          CarbonMetaStoreFactory.createCarbonMetaStore(sparkSession.conf)
+        }
+        CarbonProperties.getInstance.addProperty(CarbonCommonConstants.IS_DRIVER_INSTANCE, "true")
+        initialized = true
       }
-      CarbonProperties.getInstance.addProperty(CarbonCommonConstants.IS_DRIVER_INSTANCE, "true")
-      initialized = true
     }
   }
 }