You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ku...@apache.org on 2021/09/20 10:47:59 UTC

[carbondata] branch master updated: [CARBONDATA-4288][CARBONDATA-4289] Fix various issues with Index Server caching mechanism.

This is an automated email from the ASF dual-hosted git repository.

kunalkapoor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new ce860d0  [CARBONDATA-4288][CARBONDATA-4289] Fix various issues with Index Server caching mechanism.
ce860d0 is described below

commit ce860d0431f41e0594cee848149cdc344f728dd3
Author: Vikram Ahuja <vi...@gmail.com>
AuthorDate: Fri Sep 17 12:54:01 2021 +0530

    [CARBONDATA-4288][CARBONDATA-4289] Fix various issues with Index Server caching mechanism.
    
    Why is this PR needed?
    There are 2 issues in the Index Server flow:
    When a main table has an SI table, with prepriming disabled and the index server
    enabled, a new load to the main table and SI table puts the cache for the main table in the
    index server. The cache is also loaded again when a select query is fired. This issue happens because
    during load to SI table, getSplits is called on the main table segment which is in Insert In
    Progress state. Index server considers this segment as a legacy segment because its index
    size = 0 and does not put its entry in the tableToExecutor mapping. In the getSplits method
    isRefreshNeeded is false the first time getSplits is called. During the select query, in
    getSplits method isRefreshNeeded is true and the previous loaded entry is removed from the
    driver but since there is no entry for that table in tableToExecutor mapping, the previous
    cache value becomes dead cache and always stays in the index server. The newly loaded cache
    is loaded to a new executor and 2 copies of cache for the same segment are being maintained.
    Concurrent select queries to the index server shows wrong cache values in the Index server.
    
    What changes were proposed in this PR?
    The following changes are proposed to the index server code:
    Removing cache object from the index server in case the segment is INSERT IN PROGRESS and
    in the case of legacy segment adding the value in tableToExecutor mapping so that the cache
    is also removed from the executor side.
    Concurrent queries were able to add duplicate cache values to other executors. Changed logic
    of assign executors method so that concurrent queries are not able to add cache for same segment
    in other executors
    
    This closes #4219
---
 .../indexserver/DistributedPruneRDD.scala          | 11 ++++
 .../indexserver/DistributedRDDUtils.scala          | 76 +++++++++++++++-------
 .../indexserver/DistributedRDDUtilsTest.scala      | 26 +++++++-
 3 files changed, 88 insertions(+), 25 deletions(-)

diff --git a/integration/spark/src/main/scala/org/apache/carbondata/indexserver/DistributedPruneRDD.scala b/integration/spark/src/main/scala/org/apache/carbondata/indexserver/DistributedPruneRDD.scala
index dac47ff..bd0b5c9 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/indexserver/DistributedPruneRDD.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/indexserver/DistributedPruneRDD.scala
@@ -115,6 +115,17 @@ private[indexserver] class DistributedPruneRDD(@transient private val ss: SparkS
       // scalastyle:on
       service.shutdownNow()
       val LOGGER = LogServiceFactory.getLogService(classOf[DistributedPruneRDD].getName)
+      // remove the cache of Inprogress segments, this case is required during the case of
+      // loading to SI. We do get splits of main table in case of Inprogress segment. No
+      // need to load it to the cache.
+      val inProgressSegments = indexInputFormat.getValidSegments.asScala.collect {
+        case seg if DistributedRDDUtils
+          .isSegmentInProgress(indexInputFormat, seg.getSegmentNo) => seg.getSegmentNo
+      }
+      if (inProgressSegments.nonEmpty) {
+        IndexStoreManager.getInstance().clearInvalidSegments(indexInputFormat.getCarbonTable,
+          inProgressSegments.asJava)
+      }
       LOGGER.info(s"Time taken to collect ${ inputSplits.size } blocklets : " +
                   (System.currentTimeMillis() - startTime))
       val cacheSize = if (CacheProvider.getInstance().getCarbonCache != null) {
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/indexserver/DistributedRDDUtils.scala b/integration/spark/src/main/scala/org/apache/carbondata/indexserver/DistributedRDDUtils.scala
index 1994e15..41db8f1 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/indexserver/DistributedRDDUtils.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/indexserver/DistributedRDDUtils.scala
@@ -32,7 +32,7 @@ import org.apache.carbondata.core.indexstore.ExtendedBlockletWrapper
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.readcommitter.{LatestFilesReadCommittedScope, TableStatusReadCommittedScope}
-import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager
+import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentUpdateStatusManager}
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.events.{IndexServerLoadEvent, OperationContext, OperationListenerBus}
 import org.apache.carbondata.hadoop.api.{CarbonInputFormat, CarbonTableInputFormat}
@@ -108,7 +108,25 @@ object DistributedRDDUtils {
           val wrapper: IndexInputSplit = legacySegment
             .asInstanceOf[IndexInputSplitWrapper].getDistributable
           val executor = validExecutorIds(index % validExecutorIds.length)
-          wrapper.setLocations(Array("executor_" + executor))
+          // Below code is used to support concurrent queries on same segment going to
+          // the same executor for caching. Putting a new HashMap in tableToExecutorMapping
+          // and whichever query is able to put entry in existingSegmentMapping decides the
+          // executor which will be used later on. For one thread oldMapping will be null and
+          // for the second it will be the executor returned which will be used.
+          tableToExecutorMapping.putIfAbsent(tableUniqueName,
+            new ConcurrentHashMap[String, String]())
+          val existingSegmentMapping = tableToExecutorMapping.get(tableUniqueName)
+          val oldMapping = existingSegmentMapping.putIfAbsent(wrapper.getSegment.getSegmentNo,
+            s"${executor}")
+          if (oldMapping == null) {
+            val newSegmentMapping = new ConcurrentHashMap[String, String]()
+            newSegmentMapping.put(wrapper.getSegment.getSegmentNo, s"${executor}")
+            tableToExecutorMapping.putIfAbsent(tableUniqueName, newSegmentMapping)
+            wrapper.setLocations(Array("executor_" + executor))
+          } else {
+            wrapper.setLocations(Array("executor_" + existingSegmentMapping
+              .get(wrapper.getSegment.getSegmentNo)))
+          }
           legacySegment
       }
     } else { Seq() }
@@ -164,6 +182,15 @@ object DistributedRDDUtils {
     }
   }
 
+  def isSegmentInProgress(request: IndexInputFormat, segment: String): Boolean = {
+    request.getReadCommittedScope.getSegmentList.find(_.getLoadName
+      .equalsIgnoreCase(segment)) match {
+      case Some(value) => value.getSegmentStatus.equals(SegmentStatus.INSERT_IN_PROGRESS) || value
+        .getSegmentStatus.equals(SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS)
+      case None => false
+    }
+  }
+
   /**
    * Remove the table mapping from index server when the table is dropped.
    */
@@ -313,31 +340,32 @@ object DistributedRDDUtils {
           case None => throw new RuntimeException("Could not find any alive executors.")
         }
       }
-      val existingExecutorMapping = executorToCacheSizeMapping.get(newHost)
-      if (existingExecutorMapping != null) {
-        val existingSize = existingExecutorMapping.get(newExecutor)
-        if (existingSize != null) {
-          existingExecutorMapping.put(newExecutor, existingSize + segment.getIndexSize
-            .toInt)
-        } else {
-          existingExecutorMapping.put(newExecutor, segment.getIndexSize
-            .toInt)
-        }
-      } else {
-        val newExecutorMapping = new ConcurrentHashMap[String, Long]()
-        newExecutorMapping.put(newExecutor, segment.getIndexSize)
-        executorToCacheSizeMapping.put(newHost, newExecutorMapping)
-      }
+      tableToExecutorMapping.putIfAbsent(tableUniqueName, new ConcurrentHashMap[String, String]())
       val existingSegmentMapping = tableToExecutorMapping.get(tableUniqueName)
-      if (existingSegmentMapping == null) {
-        val newSegmentMapping = new ConcurrentHashMap[String, String]()
-        newSegmentMapping.put(segment.getSegmentNo, s"${newHost}_$newExecutor")
-        tableToExecutorMapping.putIfAbsent(tableUniqueName, newSegmentMapping)
+      val oldMapping = existingSegmentMapping.putIfAbsent(segment.getSegmentNo,
+        s"${ newHost }_$newExecutor")
+      if (oldMapping == null) {
+        updateCacheSize(newHost, newExecutor, segment)
+        s"executor_${newHost}_$newExecutor"
       } else {
-        existingSegmentMapping.putIfAbsent(segment.getSegmentNo, s"${newHost}_$newExecutor")
-        tableToExecutorMapping.putIfAbsent(tableUniqueName, existingSegmentMapping)
+        s"executor_$oldMapping"
+      }
+    }
+  }
+
+  private def updateCacheSize(host: String, executor: String, segment: Segment) = {
+    val existingExecutorMapping = executorToCacheSizeMapping.get(host)
+    if (existingExecutorMapping != null) {
+      val existingSize = existingExecutorMapping.get(executor)
+      var totalSize = segment.getIndexSize
+      if (existingSize != null) {
+        totalSize += existingSize
       }
-      s"executor_${newHost}_$newExecutor"
+      existingExecutorMapping.put(executor, totalSize.toInt)
+    } else {
+      val newExecutorMapping = new ConcurrentHashMap[String, Long]()
+      newExecutorMapping.put(executor, segment.getIndexSize)
+      executorToCacheSizeMapping.put(host, newExecutorMapping)
     }
   }
 
diff --git a/integration/spark/src/test/scala/org/apache/indexserver/DistributedRDDUtilsTest.scala b/integration/spark/src/test/scala/org/apache/indexserver/DistributedRDDUtilsTest.scala
index ec75441..bc7ba13 100644
--- a/integration/spark/src/test/scala/org/apache/indexserver/DistributedRDDUtilsTest.scala
+++ b/integration/spark/src/test/scala/org/apache/indexserver/DistributedRDDUtilsTest.scala
@@ -17,7 +17,7 @@
 
 package org.apache.indexserver
 
-import java.util.concurrent.ConcurrentHashMap
+import java.util.concurrent.{ConcurrentHashMap, Executors}
 
 import scala.collection.JavaConverters._
 
@@ -226,4 +226,28 @@ class DistributedRDDUtilsTest extends FunSuite with BeforeAndAfterEach {
     assert(FileFactory.isFileExist(indexServerTempFolder))
     assert(FileFactory.isFileExist(tmpPathAnother))
   }
+
+  test("test concurrent assigning of executors") {
+    executorCache.clear()
+    tableCache.clear()
+    // val executorsList: Map[String, Seq[String]]
+    val executorList = Map("EX1" -> Seq("1"), "EX2" -> Seq("2"))
+    val seg = new Segment("5")
+    seg.setIndexSize(10)
+    val executorService = Executors.newFixedThreadPool(8)
+    for (num <- 1 to 8) {
+      executorService.submit(
+        new Runnable {
+          override def run(): Unit = {
+            DistributedRDDUtils.assignExecutor("tablename", seg, executorList)
+          }
+        }).get()
+    }
+    executorService.shutdownNow()
+    assert(executorCache.size() == 1)
+    assert(executorCache.entrySet().iterator().next().getValue
+      .entrySet().iterator().next().getValue == 10)
+    executorCache.clear()
+    tableCache.clear()
+  }
 }