You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ak...@apache.org on 2020/08/18 09:07:04 UTC

[carbondata] branch master updated: [CARBONDATA-3919] Improve concurrent query performance

This is an automated email from the ASF dual-hosted git repository.

akashrn5 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 1cca062  [CARBONDATA-3919] Improve concurrent query performance
1cca062 is described below

commit 1cca06233b957aa6bf7874991248f5bce0670131
Author: ajantha-bhat <aj...@gmail.com>
AuthorDate: Wed Jul 22 16:43:15 2020 +0530

    [CARBONDATA-3919] Improve concurrent query performance
    
    Why is this PR needed?
    1. When 500 queries were executed concurrently, the whole checkIfRefreshIsNeeded
    method was synchronized, so only one thread could work at a time.
    Synchronization is actually required only when the schema has been modified and
    the table has to be removed, not for the whole function.
    2. TokenCache.obtainTokensForNamenodes was causing a performance bottleneck for
    concurrent queries, so the call was removed.
    
    What changes were proposed in this PR?
    1. Synchronize only the part that removes the table. The total time for 500
    concurrent queries improved from 10 seconds to 3 seconds on a cluster.
    2. Avoid calling TokenCache.obtainTokensForNamenodes.
    
    This closes #3858
---
 .../carbondata/hadoop/api/CarbonInputFormat.java     |  4 ----
 .../apache/spark/sql/hive/CarbonFileMetastore.scala  | 20 ++++++++++++++++----
 2 files changed, 16 insertions(+), 8 deletions(-)

diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
index 130e0d9..557fbfa 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
@@ -84,7 +84,6 @@ import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.security.TokenCache;
 import org.apache.log4j.Logger;
 
 /**
@@ -472,9 +471,6 @@ public abstract class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
     QueryStatisticsRecorder recorder = CarbonTimeStatisticsFactory.createDriverRecorder();
     QueryStatistic statistic = new QueryStatistic();
 
-    // get tokens for all the required FileSystem for table path
-    TokenCache.obtainTokensForNamenodes(job.getCredentials(),
-        new Path[] { new Path(carbonTable.getTablePath()) }, job.getConfiguration());
     List<ExtendedBlocklet> prunedBlocklets =
         getPrunedBlocklets(job, carbonTable, expression, segmentIds, invalidSegments,
             segmentsToBeRefreshed);
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
index c9f78b5..b16579e 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
@@ -67,7 +67,7 @@ private object CarbonFileMetastore {
   final val tableModifiedTimeStore = new ConcurrentHashMap[String, Long]()
 
   def checkIfRefreshIsNeeded(absoluteTableIdentifier: AbsoluteTableIdentifier,
-      localTimeStamp: Long): Boolean = synchronized {
+      localTimeStamp: Long): Boolean = {
     val schemaFilePath = CarbonTablePath.getSchemaFilePath(absoluteTableIdentifier.getTablePath)
     val schemaCarbonFile = FileFactory.getCarbonFile(schemaFilePath)
     if (schemaCarbonFile.exists()) {
@@ -81,9 +81,21 @@ private object CarbonFileMetastore {
         case None => true
       }
       if (isSchemaModified) {
-        CarbonMetadata.getInstance().removeTable(absoluteTableIdentifier
-          .getCarbonTableIdentifier.getTableUniqueName)
-        IndexStoreManager.getInstance().clearIndex(absoluteTableIdentifier)
+        if (CarbonMetadata.getInstance()
+              .getCarbonTable(absoluteTableIdentifier
+                .getCarbonTableIdentifier
+                .getTableUniqueName) != null) {
+          synchronized {
+            if (CarbonMetadata.getInstance()
+                  .getCarbonTable(absoluteTableIdentifier
+                    .getCarbonTableIdentifier
+                    .getTableUniqueName) != null) {
+              CarbonMetadata.getInstance().removeTable(absoluteTableIdentifier
+                .getCarbonTableIdentifier.getTableUniqueName)
+              IndexStoreManager.getInstance().clearIndex(absoluteTableIdentifier)
+            }
+          }
+        }
         true
       } else {
         localTimeStamp != newTime