You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ku...@apache.org on 2019/06/26 09:12:25 UTC

[carbondata] branch master updated: [CARBONDATA-3449] Synchronize the initialization of listeners in case of concuurent scenarios

This is an automated email from the ASF dual-hosted git repository.

kumarvishal09 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new b98e183  [CARBONDATA-3449] Synchronize the initialization of listeners in case of concuurent scenarios
b98e183 is described below

commit b98e183f1546f577880c414b1c1649264ff2fd7d
Author: manishnalla1994 <ma...@gmail.com>
AuthorDate: Sat Jun 22 11:27:41 2019 +0530

    [CARBONDATA-3449] Synchronize the initialization of listeners in case of concuurent scenarios
    
    Problem: Initialization of listeners in case of concurrent scenarios is not synchronized.
    
    Solution: Changed the function to a val due to which the synchronization will be handled by scala and init will only occur once.
    
    This closes #3304
---
 .../main/java/org/apache/carbondata/events/OperationListenerBus.java | 2 +-
 .../org/apache/spark/sql/hive/CarbonInMemorySessionState.scala       | 2 +-
 .../org/apache/spark/sql/hive/CarbonSessionState.scala               | 2 +-
 .../spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala       | 5 +++--
 .../main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala | 2 +-
 5 files changed, 7 insertions(+), 6 deletions(-)

diff --git a/core/src/main/java/org/apache/carbondata/events/OperationListenerBus.java b/core/src/main/java/org/apache/carbondata/events/OperationListenerBus.java
index 5f9a05c..3f652e2 100644
--- a/core/src/main/java/org/apache/carbondata/events/OperationListenerBus.java
+++ b/core/src/main/java/org/apache/carbondata/events/OperationListenerBus.java
@@ -53,7 +53,7 @@ public class OperationListenerBus {
    * @param eventClass
    * @param operationEventListener
    */
-  public OperationListenerBus addListener(Class<? extends Event> eventClass,
+  public synchronized OperationListenerBus addListener(Class<? extends Event> eventClass,
       OperationEventListener operationEventListener) {
 
     String eventType = eventClass.getName();
diff --git a/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonInMemorySessionState.scala b/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonInMemorySessionState.scala
index e286fba..5dfb16d 100644
--- a/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonInMemorySessionState.scala
+++ b/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonInMemorySessionState.scala
@@ -146,7 +146,7 @@ class InMemorySessionCatalog(
   }
 
   // Initialize all listeners to the Operation bus.
-  CarbonEnv.initListeners()
+  CarbonEnv.init
 
   def getThriftTableInfo(tablePath: String): TableInfo = {
     val tableMetadataFile = CarbonTablePath.getSchemaFilePath(tablePath)
diff --git a/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonSessionState.scala b/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonSessionState.scala
index 08cf3cc..f991a78 100644
--- a/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonSessionState.scala
+++ b/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonSessionState.scala
@@ -83,7 +83,7 @@ class CarbonHiveSessionCatalog(
   }
 
   // Initialize all listeners to the Operation bus.
-  CarbonEnv.initListeners()
+  CarbonEnv.init
 
   override def lookupRelation(name: TableIdentifier): LogicalPlan = {
     val rtnRelation = super.lookupRelation(name)
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
index 094d298..e7a6d65 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
@@ -149,9 +149,10 @@ object CarbonEnv {
    * Method
    * 1. To initialize Listeners to their respective events in the OperationListenerBus
    * 2. To register common listeners
-   *
+   * 3. Only initialize once for all the listeners in case of concurrent scenarios we have given
+   * val, as val initializes once
    */
-  def init(sparkSession: SparkSession): Unit = {
+  val init = {
     initListeners
   }
 
diff --git a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala
index 5caa4dd..26f778e 100644
--- a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala
+++ b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala
@@ -108,7 +108,7 @@ class CarbonHiveSessionCatalog(
   }
 
   // Initialize all listeners to the Operation bus.
-  CarbonEnv.init(sparkSession)
+  CarbonEnv.init
 
   /**
    * This method will invalidate carbonrelation from cache if carbon table is updated in