You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ku...@apache.org on 2019/06/26 09:12:25 UTC
[carbondata] branch master updated: [CARBONDATA-3449] Synchronize
the initialization of listeners in case of concuurent scenarios
This is an automated email from the ASF dual-hosted git repository.
kumarvishal09 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new b98e183 [CARBONDATA-3449] Synchronize the initialization of listeners in case of concuurent scenarios
b98e183 is described below
commit b98e183f1546f577880c414b1c1649264ff2fd7d
Author: manishnalla1994 <ma...@gmail.com>
AuthorDate: Sat Jun 22 11:27:41 2019 +0530
[CARBONDATA-3449] Synchronize the initialization of listeners in case of concuurent scenarios
Problem: Initialization of listeners in case of concurrent scenarios is not synchronized.
Solution: Changed the function to a val due to which the synchronization will be handled by scala and init will only occur once.
This closes #3304
---
.../main/java/org/apache/carbondata/events/OperationListenerBus.java | 2 +-
.../org/apache/spark/sql/hive/CarbonInMemorySessionState.scala | 2 +-
.../org/apache/spark/sql/hive/CarbonSessionState.scala | 2 +-
.../spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala | 5 +++--
.../main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala | 2 +-
5 files changed, 7 insertions(+), 6 deletions(-)
diff --git a/core/src/main/java/org/apache/carbondata/events/OperationListenerBus.java b/core/src/main/java/org/apache/carbondata/events/OperationListenerBus.java
index 5f9a05c..3f652e2 100644
--- a/core/src/main/java/org/apache/carbondata/events/OperationListenerBus.java
+++ b/core/src/main/java/org/apache/carbondata/events/OperationListenerBus.java
@@ -53,7 +53,7 @@ public class OperationListenerBus {
* @param eventClass
* @param operationEventListener
*/
- public OperationListenerBus addListener(Class<? extends Event> eventClass,
+ public synchronized OperationListenerBus addListener(Class<? extends Event> eventClass,
OperationEventListener operationEventListener) {
String eventType = eventClass.getName();
diff --git a/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonInMemorySessionState.scala b/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonInMemorySessionState.scala
index e286fba..5dfb16d 100644
--- a/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonInMemorySessionState.scala
+++ b/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonInMemorySessionState.scala
@@ -146,7 +146,7 @@ class InMemorySessionCatalog(
}
// Initialize all listeners to the Operation bus.
- CarbonEnv.initListeners()
+ CarbonEnv.init
def getThriftTableInfo(tablePath: String): TableInfo = {
val tableMetadataFile = CarbonTablePath.getSchemaFilePath(tablePath)
diff --git a/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonSessionState.scala b/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonSessionState.scala
index 08cf3cc..f991a78 100644
--- a/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonSessionState.scala
+++ b/integration/spark2/src/main/commonTo2.2And2.3/org/apache/spark/sql/hive/CarbonSessionState.scala
@@ -83,7 +83,7 @@ class CarbonHiveSessionCatalog(
}
// Initialize all listeners to the Operation bus.
- CarbonEnv.initListeners()
+ CarbonEnv.init
override def lookupRelation(name: TableIdentifier): LogicalPlan = {
val rtnRelation = super.lookupRelation(name)
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
index 094d298..e7a6d65 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
@@ -149,9 +149,10 @@ object CarbonEnv {
* Method
* 1. To initialize Listeners to their respective events in the OperationListenerBus
* 2. To register common listeners
- *
+ * 3. Only initialize once for all the listeners in case of concurrent scenarios we have given
+ * val, as val initializes once
*/
- def init(sparkSession: SparkSession): Unit = {
+ val init = {
initListeners
}
diff --git a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala
index 5caa4dd..26f778e 100644
--- a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala
+++ b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala
@@ -108,7 +108,7 @@ class CarbonHiveSessionCatalog(
}
// Initialize all listeners to the Operation bus.
- CarbonEnv.init(sparkSession)
+ CarbonEnv.init
/**
* This method will invalidate carbonrelation from cache if carbon table is updated in