Posted to commits@carbondata.apache.org by ku...@apache.org on 2020/01/27 11:27:45 UTC

[carbondata] branch master updated: [CARBONDATA-3586][CARBONDATA-3587][CARBONDATA-3595]Adding valid segments into segments to be refreshed map before inserting segments to index server

This is an automated email from the ASF dual-hosted git repository.

kunalkapoor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 7a0b592  [CARBONDATA-3586][CARBONDATA-3587][CARBONDATA-3595]Adding valid segments into segments to be refreshed map before inserting segments to index server
7a0b592 is described below

commit 7a0b5926b52f4b5aad2e89e2016ecadd25fe5467
Author: Vikram Ahuja <vi...@gmail.com>
AuthorDate: Mon Nov 18 11:43:30 2019 +0530

    [CARBONDATA-3586][CARBONDATA-3587][CARBONDATA-3595]Adding valid segments into segments
    to be refreshed map before inserting segments to index server
    
    Modification reason:
    After a select query the cache size was doubled, and DROP METACACHE did not
    remove the cache in Carbon on Spark 2.1 when the Index Server was used.

    Modification content:
    Pre-primed segments were treated as invalid segments during a select query,
    because DataMapStoreManager had no information about them, so the pruning
    for them was sent to a different executor, which in turn doubled the cache.
    The valid segments are therefore added to the segments-to-be-refreshed map
    before pre-priming, so that they are treated as valid segments during the
    select query and the query lands on the same Index Server executor that was
    used during load.
    
    This closes #3466
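
For illustration, a minimal sketch of the fix (registerForRefresh is an
illustrative name, not code from this commit; the real change lives inline in
DistributedRDDUtils, as the diff below shows): before the pre-priming event is
fired, the freshly loaded segments are registered in DataMapStoreManager's
segments-to-be-refreshed map.

    import scala.collection.JavaConverters._

    import org.apache.carbondata.core.datamap.{DataMapStoreManager, Segment}
    import org.apache.carbondata.core.metadata.schema.table.CarbonTable
    import org.apache.carbondata.core.readcommitter.TableStatusReadCommittedScope
    import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager

    // Register the just-loaded segments as "to be refreshed" so the next
    // select query treats them as valid and is routed to the same Index
    // Server executor that cached them during load.
    def registerForRefresh(
        carbonTable: CarbonTable,
        readCommittedScope: TableStatusReadCommittedScope,
        validSegments: Seq[Segment]): Unit = {
      val updateStatusManager =
        new SegmentUpdateStatusManager(carbonTable, readCommittedScope.getSegmentList)
      DataMapStoreManager.getInstance
        .getSegmentsToBeRefreshed(carbonTable, updateStatusManager,
          validSegments.toList.asJava)
    }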
---
 .../carbondata/indexserver/DataMapJobs.scala       |  1 +
 .../indexserver/DistributedRDDUtils.scala          | 35 +++++++++++++---------
 .../carbondata/indexserver/IndexServer.scala       | 12 +++++---
 .../command/mutation/DeleteExecution.scala         |  6 ++--
 .../processing/datamap/DataMapWriterListener.java  |  3 ++
 5 files changed, 36 insertions(+), 21 deletions(-)

diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DataMapJobs.scala b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DataMapJobs.scala
index 6d8a467..4986cba 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DataMapJobs.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DataMapJobs.scala
@@ -131,6 +131,7 @@ class EmbeddedDataMapJob extends AbstractDataMapJob {
   }
 
   override def executeCountJob(dataMapFormat: DistributableDataMapFormat): java.lang.Long = {
+    dataMapFormat.setFallbackJob()
     IndexServer.getCount(dataMapFormat).get()
   }
 
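
Note on the EmbeddedDataMapJob change above: the embedded job executes the
count in-process rather than over the Index Server RPC, so the request is
marked as a fallback job first; getCount (see the IndexServer.scala diff
below) then skips the RPC-only steps such as the remote-user lookup. The
changed method, with the intent spelled out in a comment:

    override def executeCountJob(dataMapFormat: DistributableDataMapFormat): java.lang.Long = {
      // Mark in-process execution so getCount skips steps that only make
      // sense for a real RPC call (remote user, job-group properties,
      // executor cache-size accounting).
      dataMapFormat.setFallbackJob()
      IndexServer.getCount(dataMapFormat).get()
    }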
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DistributedRDDUtils.scala b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DistributedRDDUtils.scala
index ebcfeea..854f976 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DistributedRDDUtils.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/DistributedRDDUtils.scala
@@ -28,11 +28,12 @@ import org.apache.spark.Partition
 import org.apache.spark.sql.SparkSession
 
 import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.datamap.{DataMapDistributable, Segment}
+import org.apache.carbondata.core.datamap.{DataMapDistributable, DataMapStoreManager, Segment}
 import org.apache.carbondata.core.datamap.dev.expr.DataMapDistributableWrapper
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-import org.apache.carbondata.core.readcommitter.TableStatusReadCommittedScope
+import org.apache.carbondata.core.readcommitter.{LatestFilesReadCommittedScope, TableStatusReadCommittedScope}
+import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.events.{IndexServerLoadEvent, OperationContext, OperationListenerBus}
 
@@ -358,23 +359,29 @@ object DistributedRDDUtils {
       conf: Configuration,
       segmentId: List[String]): Unit = {
     if (carbonTable.isTransactionalTable) {
-      val readCommittedScope = new TableStatusReadCommittedScope(AbsoluteTableIdentifier.from(
-        carbonTable.getTablePath), conf)
-      val validSegments: Seq[Segment] = segmentId.map {
-        segmentToPrime =>
-          val loadDetailsForCurrentSegment = readCommittedScope
-            .getSegmentList.find(_.getLoadName.equalsIgnoreCase(segmentToPrime)).get
-
-          new Segment(segmentToPrime,
-            loadDetailsForCurrentSegment.getSegmentFile,
-            readCommittedScope,
-            loadDetailsForCurrentSegment)
-      }
       val indexServerEnabled = CarbonProperties.getInstance().isDistributedPruningEnabled(
         carbonTable.getDatabaseName, carbonTable.getTableName)
       val prePrimingEnabled = CarbonProperties.getInstance().isIndexServerPrePrimingEnabled
       if (indexServerEnabled && prePrimingEnabled) {
         LOGGER.info(s" Loading segments for the table: ${ carbonTable.getTableName } in the cache")
+        val readCommittedScope = new TableStatusReadCommittedScope(AbsoluteTableIdentifier.from(
+          carbonTable.getTablePath), conf)
+        val validSegments: Seq[Segment] = segmentId.map {
+          segmentToPrime =>
+            val loadDetailsForCurrentSegment = readCommittedScope
+              .getSegmentList.find(_.getLoadName.equalsIgnoreCase(segmentToPrime)).get
+
+            new Segment(segmentToPrime,
+              loadDetailsForCurrentSegment.getSegmentFile,
+              readCommittedScope,
+              loadDetailsForCurrentSegment)
+        }
+        val updateStatusManager =
+          new SegmentUpdateStatusManager(carbonTable, readCommittedScope.getSegmentList)
+        // Adding valid segments to segments to be refreshed, so that the select query
+        // goes in the same executor.
+        DataMapStoreManager.getInstance
+          .getSegmentsToBeRefreshed(carbonTable, updateStatusManager, validSegments.toList.asJava)
         val indexServerLoadEvent: IndexServerLoadEvent =
           IndexServerLoadEvent(
             sparkSession,
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/IndexServer.scala b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/IndexServer.scala
index fc2df80..3042571 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/IndexServer.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/indexserver/IndexServer.scala
@@ -124,22 +124,26 @@ object IndexServer extends ServerInterface {
   def getCount(request: DistributableDataMapFormat): LongWritable = {
     doAs {
       val sparkSession = SparkSQLUtil.getSparkSession
+      var currentUser: String = null
+      if (!request.isFallbackJob) {
+        currentUser = Server.getRemoteUser.getShortUserName
+      }
       lazy val getCountTask = {
         if (!request.isFallbackJob) {
           sparkSession.sparkContext.setLocalProperty("spark.jobGroup.id", request.getTaskGroupId)
           sparkSession.sparkContext
             .setLocalProperty("spark.job.description", request.getTaskGroupDesc)
         }
+        // Fire Generic Event like ACLCheck..etc
+        val indexServerEvent = IndexServerEvent(sparkSession, request.getCarbonTable,
+          currentUser)
+        OperationListenerBus.getInstance().fireEvent(indexServerEvent, operationContext)
         val splits = new DistributedCountRDD(sparkSession, request).collect()
         if (!request.isFallbackJob) {
           DistributedRDDUtils.updateExecutorCacheSize(splits.map(_._1).toSet)
         }
         new LongWritable(splits.map(_._2.toLong).sum)
       }
-      // Fire Generic Event like ACLCheck..etc
-      val indexServerEvent = IndexServerEvent(sparkSession, request.getCarbonTable,
-        Server.getRemoteUser.getShortUserName)
-      OperationListenerBus.getInstance().fireEvent(indexServerEvent, operationContext)
       if (request.ifAsyncCall) {
         submitAsyncTask(getCountTask)
         new LongWritable(0)
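
Two details in the getCount change are worth noting. First,
Server.getRemoteUser resolves the caller from the Hadoop IPC call context,
which does not exist for an in-process fallback job, so the lookup is now
guarded. Second, the user name is captured eagerly, outside the lazy task,
presumably because an async call evaluates the task later on a different
thread where that call context is gone. A condensed sketch (elisions marked
with ...):

    // Capture the RPC caller before the (possibly async) task runs.
    var currentUser: String = null
    if (!request.isFallbackJob) {
      currentUser = Server.getRemoteUser.getShortUserName
    }
    lazy val getCountTask = {
      // Fire generic events (e.g. ACL check) with the captured user,
      // then run the distributed count and sum the per-split results.
      // ...
    }
    if (request.ifAsyncCall) {
      submitAsyncTask(getCountTask) // evaluated later, off this thread
      new LongWritable(0)
    } else {
      getCountTask
    }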
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
index d744e96..608fdbe 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
@@ -418,13 +418,13 @@ object DeleteExecution {
   def reloadDistributedSegmentCache(carbonTable: CarbonTable, deletedSegments: Seq[Segment],
       operationContext: OperationContext)(sparkSession: SparkSession): Unit = {
     if (carbonTable.isTransactionalTable) {
-      val readCommittedScope = new TableStatusReadCommittedScope(AbsoluteTableIdentifier.from(
-        carbonTable.getTablePath), FileFactory.getConfiguration)
-      deletedSegments.foreach(_.setReadCommittedScope(readCommittedScope))
       val indexServerEnabled = CarbonProperties.getInstance().isDistributedPruningEnabled(
         carbonTable.getDatabaseName, carbonTable.getTableName)
       val prePrimingEnabled = CarbonProperties.getInstance().isIndexServerPrePrimingEnabled()
       if (indexServerEnabled && prePrimingEnabled) {
+        val readCommittedScope = new TableStatusReadCommittedScope(AbsoluteTableIdentifier.from(
+          carbonTable.getTablePath), FileFactory.getConfiguration)
+        deletedSegments.foreach(_.setReadCommittedScope(readCommittedScope))
         LOGGER.info(s"Loading segments for table: ${ carbonTable.getTableName } in the cache")
         val indexServerLoadEvent: IndexServerLoadEvent =
           IndexServerLoadEvent(
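
The DeleteExecution change follows the same pattern as DistributedRDDUtils:
the TableStatusReadCommittedScope (which reads the table status file) is now
built only inside the indexServerEnabled && prePrimingEnabled branch, so
deletes on deployments without the Index Server skip that work entirely:

    // Guard pattern: read the table status only when pre-priming will fire.
    if (indexServerEnabled && prePrimingEnabled) {
      val readCommittedScope = new TableStatusReadCommittedScope(
        AbsoluteTableIdentifier.from(carbonTable.getTablePath),
        FileFactory.getConfiguration)
      deletedSegments.foreach(_.setReadCommittedScope(readCommittedScope))
      // ... fire IndexServerLoadEvent for the deleted segments ...
    }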
diff --git a/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java b/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
index 0aa69f5..bdcd624 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
@@ -62,6 +62,9 @@ public class DataMapWriterListener {
    */
   public void registerAllWriter(CarbonTable carbonTable, String segmentId,
       String taskNo, SegmentProperties segmentProperties) {
+    // clear cache in executor side
+    DataMapStoreManager.getInstance()
+        .clearDataMaps(carbonTable.getTableId());
     List<TableDataMap> tableIndices;
     try {
       tableIndices = DataMapStoreManager.getInstance().getAllDataMap(carbonTable);
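
Finally, DataMapWriterListener now clears the executor-side datamap cache for
the table before registering writers for a new load, presumably so that
entries left over from earlier loads on that executor cannot inflate the
cache the Index Server later accounts for. The call, shown here in Scala for
consistency with the sketches above (the actual change is the Java hunk
above):

    // Drop cached datamaps for this table on the executor before a load.
    DataMapStoreManager.getInstance().clearDataMaps(carbonTable.getTableId())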