You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ku...@apache.org on 2020/01/27 11:27:45 UTC
[carbondata] branch master updated:
[CARBONDATA-3586][CARBONDATA-3587][CARBONDATA-3595]Adding valid segments
into segments to be refreshed map before inserting segments to index server
This is an automated email from the ASF dual-hosted git repository.
kunalkapoor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new 7a0b592 [CARBONDATA-3586][CARBONDATA-3587][CARBONDATA-3595]Adding valid segments into segments to be refreshed map before inserting segments to index server
7a0b592 is described below
commit 7a0b5926b52f4b5aad2e89e2016ecadd25fe5467
Author: Vikram Ahuja <vi...@gmail.com>
AuthorDate: Mon Nov 18 11:43:30 2019 +0530
[CARBONDATA-3586][CARBONDATA-3587][CARBONDATA-3595]Adding valid segments into segments
to be refreshed map before inserting segments to index server
Modification reason:
After a select query the cache is doubled, and drop metacache does not remove the
cache in Spark 2.1 Carbon for the Index Server.
Modification Content:
The pre-primed segments were treated as invalid segments during the select query
because DataMapStoreManager had no information about them, so they were sent to a
different executor during the select query, which in turn doubled the cache. The valid
segments are now added to the map before pre-priming so that they are treated as valid
during the select query, and the query goes to the same Index Server executor as during load.
This closes #3466
---
.../carbondata/indexserver/DataMapJobs.scala | 1 +
.../indexserver/DistributedRDDUtils.scala | 35 +++++++++++++---------
.../carbondata/indexserver/IndexServer.scala | 12 +++++---
.../command/mutation/DeleteExecution.scala | 6 ++--
.../processing/datamap/DataMapWriterListener.java | 3 ++
5 files changed, 36 insertions(+), 21 deletions(-)
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DataMapJobs.scala b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DataMapJobs.scala
index 6d8a467..4986cba 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DataMapJobs.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DataMapJobs.scala
@@ -131,6 +131,7 @@ class EmbeddedDataMapJob extends AbstractDataMapJob {
}
override def executeCountJob(dataMapFormat: DistributableDataMapFormat): java.lang.Long = {
+ dataMapFormat.setFallbackJob()
IndexServer.getCount(dataMapFormat).get()
}
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DistributedRDDUtils.scala b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DistributedRDDUtils.scala
index ebcfeea..854f976 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DistributedRDDUtils.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DistributedRDDUtils.scala
@@ -28,11 +28,12 @@ import org.apache.spark.Partition
import org.apache.spark.sql.SparkSession
import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.datamap.{DataMapDistributable, Segment}
+import org.apache.carbondata.core.datamap.{DataMapDistributable, DataMapStoreManager, Segment}
import org.apache.carbondata.core.datamap.dev.expr.DataMapDistributableWrapper
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-import org.apache.carbondata.core.readcommitter.TableStatusReadCommittedScope
+import org.apache.carbondata.core.readcommitter.{LatestFilesReadCommittedScope, TableStatusReadCommittedScope}
+import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager
import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.events.{IndexServerLoadEvent, OperationContext, OperationListenerBus}
@@ -358,23 +359,29 @@ object DistributedRDDUtils {
conf: Configuration,
segmentId: List[String]): Unit = {
if (carbonTable.isTransactionalTable) {
- val readCommittedScope = new TableStatusReadCommittedScope(AbsoluteTableIdentifier.from(
- carbonTable.getTablePath), conf)
- val validSegments: Seq[Segment] = segmentId.map {
- segmentToPrime =>
- val loadDetailsForCurrentSegment = readCommittedScope
- .getSegmentList.find(_.getLoadName.equalsIgnoreCase(segmentToPrime)).get
-
- new Segment(segmentToPrime,
- loadDetailsForCurrentSegment.getSegmentFile,
- readCommittedScope,
- loadDetailsForCurrentSegment)
- }
val indexServerEnabled = CarbonProperties.getInstance().isDistributedPruningEnabled(
carbonTable.getDatabaseName, carbonTable.getTableName)
val prePrimingEnabled = CarbonProperties.getInstance().isIndexServerPrePrimingEnabled
if (indexServerEnabled && prePrimingEnabled) {
LOGGER.info(s" Loading segments for the table: ${ carbonTable.getTableName } in the cache")
+ val readCommittedScope = new TableStatusReadCommittedScope(AbsoluteTableIdentifier.from(
+ carbonTable.getTablePath), conf)
+ val validSegments: Seq[Segment] = segmentId.map {
+ segmentToPrime =>
+ val loadDetailsForCurrentSegment = readCommittedScope
+ .getSegmentList.find(_.getLoadName.equalsIgnoreCase(segmentToPrime)).get
+
+ new Segment(segmentToPrime,
+ loadDetailsForCurrentSegment.getSegmentFile,
+ readCommittedScope,
+ loadDetailsForCurrentSegment)
+ }
+ val updateStatusManager =
+ new SegmentUpdateStatusManager(carbonTable, readCommittedScope.getSegmentList)
+ // Adding valid segments to segments to be refreshed, so that the select query
+ // goes in the same executor.
+ DataMapStoreManager.getInstance
+ .getSegmentsToBeRefreshed(carbonTable, updateStatusManager, validSegments.toList.asJava)
val indexServerLoadEvent: IndexServerLoadEvent =
IndexServerLoadEvent(
sparkSession,
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/IndexServer.scala b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/IndexServer.scala
index fc2df80..3042571 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/IndexServer.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/IndexServer.scala
@@ -124,22 +124,26 @@ object IndexServer extends ServerInterface {
def getCount(request: DistributableDataMapFormat): LongWritable = {
doAs {
val sparkSession = SparkSQLUtil.getSparkSession
+ var currentUser: String = null
+ if (!request.isFallbackJob) {
+ currentUser = Server.getRemoteUser.getShortUserName
+ }
lazy val getCountTask = {
if (!request.isFallbackJob) {
sparkSession.sparkContext.setLocalProperty("spark.jobGroup.id", request.getTaskGroupId)
sparkSession.sparkContext
.setLocalProperty("spark.job.description", request.getTaskGroupDesc)
}
+ // Fire Generic Event like ACLCheck..etc
+ val indexServerEvent = IndexServerEvent(sparkSession, request.getCarbonTable,
+ currentUser)
+ OperationListenerBus.getInstance().fireEvent(indexServerEvent, operationContext)
val splits = new DistributedCountRDD(sparkSession, request).collect()
if (!request.isFallbackJob) {
DistributedRDDUtils.updateExecutorCacheSize(splits.map(_._1).toSet)
}
new LongWritable(splits.map(_._2.toLong).sum)
}
- // Fire Generic Event like ACLCheck..etc
- val indexServerEvent = IndexServerEvent(sparkSession, request.getCarbonTable,
- Server.getRemoteUser.getShortUserName)
- OperationListenerBus.getInstance().fireEvent(indexServerEvent, operationContext)
if (request.ifAsyncCall) {
submitAsyncTask(getCountTask)
new LongWritable(0)
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
index d744e96..608fdbe 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
@@ -418,13 +418,13 @@ object DeleteExecution {
def reloadDistributedSegmentCache(carbonTable: CarbonTable, deletedSegments: Seq[Segment],
operationContext: OperationContext)(sparkSession: SparkSession): Unit = {
if (carbonTable.isTransactionalTable) {
- val readCommittedScope = new TableStatusReadCommittedScope(AbsoluteTableIdentifier.from(
- carbonTable.getTablePath), FileFactory.getConfiguration)
- deletedSegments.foreach(_.setReadCommittedScope(readCommittedScope))
val indexServerEnabled = CarbonProperties.getInstance().isDistributedPruningEnabled(
carbonTable.getDatabaseName, carbonTable.getTableName)
val prePrimingEnabled = CarbonProperties.getInstance().isIndexServerPrePrimingEnabled()
if (indexServerEnabled && prePrimingEnabled) {
+ val readCommittedScope = new TableStatusReadCommittedScope(AbsoluteTableIdentifier.from(
+ carbonTable.getTablePath), FileFactory.getConfiguration)
+ deletedSegments.foreach(_.setReadCommittedScope(readCommittedScope))
LOGGER.info(s"Loading segments for table: ${ carbonTable.getTableName } in the cache")
val indexServerLoadEvent: IndexServerLoadEvent =
IndexServerLoadEvent(
diff --git a/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java b/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
index 0aa69f5..bdcd624 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
@@ -62,6 +62,9 @@ public class DataMapWriterListener {
*/
public void registerAllWriter(CarbonTable carbonTable, String segmentId,
String taskNo, SegmentProperties segmentProperties) {
+ // clear cache in executor side
+ DataMapStoreManager.getInstance()
+ .clearDataMaps(carbonTable.getTableId());
List<TableDataMap> tableIndices;
try {
tableIndices = DataMapStoreManager.getInstance().getAllDataMap(carbonTable);