You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ak...@apache.org on 2020/08/18 09:07:04 UTC
[carbondata] branch master updated: [CARBONDATA-3919] Improve
concurrent query performance
This is an automated email from the ASF dual-hosted git repository.
akashrn5 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new 1cca062 [CARBONDATA-3919] Improve concurrent query performance
1cca062 is described below
commit 1cca06233b957aa6bf7874991248f5bce0670131
Author: ajantha-bhat <aj...@gmail.com>
AuthorDate: Wed Jul 22 16:43:15 2020 +0530
[CARBONDATA-3919] Improve concurrent query performance
Why is this PR needed?
1. When 500 queries were executed concurrently, the whole checkIfRefreshIsNeeded method was
synchronized, so only one thread could run it at a time. However, synchronization is
required only when the schema has been modified and the table must be removed from the
metadata cache, not for the whole function.
2. TokenCache.obtainTokensForNamenodes was causing a performance bottleneck for concurrent
queries, so the call was removed.
What changes were proposed in this PR?
1. Synchronize only the table-removal part. In a cluster, the total time for 500 concurrent
queries improved from 10 seconds to 3 seconds.
2. Stop calling the TokenCache.obtainTokensForNamenodes API.
This closes #3858
---
.../carbondata/hadoop/api/CarbonInputFormat.java | 4 ----
.../apache/spark/sql/hive/CarbonFileMetastore.scala | 20 ++++++++++++++++----
2 files changed, 16 insertions(+), 8 deletions(-)
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
index 130e0d9..557fbfa 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
@@ -84,7 +84,6 @@ import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.security.TokenCache;
import org.apache.log4j.Logger;
/**
@@ -472,9 +471,6 @@ public abstract class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
QueryStatisticsRecorder recorder = CarbonTimeStatisticsFactory.createDriverRecorder();
QueryStatistic statistic = new QueryStatistic();
- // get tokens for all the required FileSystem for table path
- TokenCache.obtainTokensForNamenodes(job.getCredentials(),
- new Path[] { new Path(carbonTable.getTablePath()) }, job.getConfiguration());
List<ExtendedBlocklet> prunedBlocklets =
getPrunedBlocklets(job, carbonTable, expression, segmentIds, invalidSegments,
segmentsToBeRefreshed);
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
index c9f78b5..b16579e 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
@@ -67,7 +67,7 @@ private object CarbonFileMetastore {
final val tableModifiedTimeStore = new ConcurrentHashMap[String, Long]()
def checkIfRefreshIsNeeded(absoluteTableIdentifier: AbsoluteTableIdentifier,
- localTimeStamp: Long): Boolean = synchronized {
+ localTimeStamp: Long): Boolean = {
val schemaFilePath = CarbonTablePath.getSchemaFilePath(absoluteTableIdentifier.getTablePath)
val schemaCarbonFile = FileFactory.getCarbonFile(schemaFilePath)
if (schemaCarbonFile.exists()) {
@@ -81,9 +81,21 @@ private object CarbonFileMetastore {
case None => true
}
if (isSchemaModified) {
- CarbonMetadata.getInstance().removeTable(absoluteTableIdentifier
- .getCarbonTableIdentifier.getTableUniqueName)
- IndexStoreManager.getInstance().clearIndex(absoluteTableIdentifier)
+ if (CarbonMetadata.getInstance()
+ .getCarbonTable(absoluteTableIdentifier
+ .getCarbonTableIdentifier
+ .getTableUniqueName) != null) {
+ synchronized {
+ if (CarbonMetadata.getInstance()
+ .getCarbonTable(absoluteTableIdentifier
+ .getCarbonTableIdentifier
+ .getTableUniqueName) != null) {
+ CarbonMetadata.getInstance().removeTable(absoluteTableIdentifier
+ .getCarbonTableIdentifier.getTableUniqueName)
+ IndexStoreManager.getInstance().clearIndex(absoluteTableIdentifier)
+ }
+ }
+ }
true
} else {
localTimeStamp != newTime