You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ku...@apache.org on 2021/09/20 10:47:59 UTC
[carbondata] branch master updated:
[CARBONDATA-4288][CARBONDATA-4289] Fix various issues with Index Server
caching mechanism.
This is an automated email from the ASF dual-hosted git repository.
kunalkapoor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new ce860d0 [CARBONDATA-4288][CARBONDATA-4289] Fix various issues with Index Server caching mechanism.
ce860d0 is described below
commit ce860d0431f41e0594cee848149cdc344f728dd3
Author: Vikram Ahuja <vi...@gmail.com>
AuthorDate: Fri Sep 17 12:54:01 2021 +0530
[CARBONDATA-4288][CARBONDATA-4289] Fix various issues with Index Server caching mechanism.
Why is this PR needed?
There are 2 issues in the Index Server flow:
In case when there is a main table with a SI table with prepriming disabled and index serve
enabled, new load to main table and SI table put the cache for the main table in the index
server. Cache is also getting again when a select query is fired. This issue happens because
during load to SI table, getSplits is called on the main table segment which is in Insert In
Progress state. Index server considers this segment as a legacy segment because it's index
size = 0 and does not put it's entry in the tableToExecutor mapping. In the getsplits method
isRefreshneeded is false the first time getSplits is called. During the select query, in
getSplits method isRefreshNeeded is true and the previous loaded entry is removed from the
driver but since there is no entry for that table in tableToExecutor mapping, the previous
cache value becomes dead cache and always stays in the index server. The newly loaded cache
is loaded to a new executor and 2 copies of cache for the same segment is being mantained.
Concurrent select queries to the index server shows wrong cache values in the Index server.
What changes were proposed in this PR?
The following changes are proposed to the index server code:
Removing cache object from the index server in case the segment is INSERT IN PROGRESS and
in the case of legacy segment adding the value in tabeToExecutor mappping so that the cache
is also removed from the executor side.
Concurrent queries were able adding duplicate cache values to other executors. Changed logic
of assign executors method so that concurrent queries are not able to add cache for same segment
in other executors
This closes #4219
---
.../indexserver/DistributedPruneRDD.scala | 11 ++++
.../indexserver/DistributedRDDUtils.scala | 76 +++++++++++++++-------
.../indexserver/DistributedRDDUtilsTest.scala | 26 +++++++-
3 files changed, 88 insertions(+), 25 deletions(-)
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/indexserver/DistributedPruneRDD.scala b/integration/spark/src/main/scala/org/apache/carbondata/indexserver/DistributedPruneRDD.scala
index dac47ff..bd0b5c9 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/indexserver/DistributedPruneRDD.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/indexserver/DistributedPruneRDD.scala
@@ -115,6 +115,17 @@ private[indexserver] class DistributedPruneRDD(@transient private val ss: SparkS
// scalastyle:on
service.shutdownNow()
val LOGGER = LogServiceFactory.getLogService(classOf[DistributedPruneRDD].getName)
+ // remove the cache of Inprogress segments, this case is required during the case of
+ // loading to SI. We do get splits of main table in case of Inprogress segment. No
+ // need to load it to the cache.
+ val inProgressSegments = indexInputFormat.getValidSegments.asScala.collect {
+ case seg if DistributedRDDUtils
+ .isSegmentInProgress(indexInputFormat, seg.getSegmentNo) => seg.getSegmentNo
+ }
+ if (inProgressSegments.nonEmpty) {
+ IndexStoreManager.getInstance().clearInvalidSegments(indexInputFormat.getCarbonTable,
+ inProgressSegments.asJava)
+ }
LOGGER.info(s"Time taken to collect ${ inputSplits.size } blocklets : " +
(System.currentTimeMillis() - startTime))
val cacheSize = if (CacheProvider.getInstance().getCarbonCache != null) {
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/indexserver/DistributedRDDUtils.scala b/integration/spark/src/main/scala/org/apache/carbondata/indexserver/DistributedRDDUtils.scala
index 1994e15..41db8f1 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/indexserver/DistributedRDDUtils.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/indexserver/DistributedRDDUtils.scala
@@ -32,7 +32,7 @@ import org.apache.carbondata.core.indexstore.ExtendedBlockletWrapper
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.readcommitter.{LatestFilesReadCommittedScope, TableStatusReadCommittedScope}
-import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager
+import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentUpdateStatusManager}
import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.events.{IndexServerLoadEvent, OperationContext, OperationListenerBus}
import org.apache.carbondata.hadoop.api.{CarbonInputFormat, CarbonTableInputFormat}
@@ -108,7 +108,25 @@ object DistributedRDDUtils {
val wrapper: IndexInputSplit = legacySegment
.asInstanceOf[IndexInputSplitWrapper].getDistributable
val executor = validExecutorIds(index % validExecutorIds.length)
- wrapper.setLocations(Array("executor_" + executor))
+ // Below code is used to support concurrent queries on same segment going to
+ // the same executor for caching. Putting a new HashMap in tableToExecutorMapping
+ // and whichever query is able to put entry in existingSegmentMapping decides the
+ // executor which will be used later on. For one thread oldMapping will be null and
+ // for the second it will be the executor returned which will be used.
+ tableToExecutorMapping.putIfAbsent(tableUniqueName,
+ new ConcurrentHashMap[String, String]())
+ val existingSegmentMapping = tableToExecutorMapping.get(tableUniqueName)
+ val oldMapping = existingSegmentMapping.putIfAbsent(wrapper.getSegment.getSegmentNo,
+ s"${executor}")
+ if (oldMapping == null) {
+ val newSegmentMapping = new ConcurrentHashMap[String, String]()
+ newSegmentMapping.put(wrapper.getSegment.getSegmentNo, s"${executor}")
+ tableToExecutorMapping.putIfAbsent(tableUniqueName, newSegmentMapping)
+ wrapper.setLocations(Array("executor_" + executor))
+ } else {
+ wrapper.setLocations(Array("executor_" + existingSegmentMapping
+ .get(wrapper.getSegment.getSegmentNo)))
+ }
legacySegment
}
} else { Seq() }
@@ -164,6 +182,15 @@ object DistributedRDDUtils {
}
}
+ def isSegmentInProgress(request: IndexInputFormat, segment: String): Boolean = {
+ request.getReadCommittedScope.getSegmentList.find(_.getLoadName
+ .equalsIgnoreCase(segment)) match {
+ case Some(value) => value.getSegmentStatus.equals(SegmentStatus.INSERT_IN_PROGRESS) || value
+ .getSegmentStatus.equals(SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS)
+ case None => false
+ }
+ }
+
/**
* Remove the table mapping from index server when the table is dropped.
*/
@@ -313,31 +340,32 @@ object DistributedRDDUtils {
case None => throw new RuntimeException("Could not find any alive executors.")
}
}
- val existingExecutorMapping = executorToCacheSizeMapping.get(newHost)
- if (existingExecutorMapping != null) {
- val existingSize = existingExecutorMapping.get(newExecutor)
- if (existingSize != null) {
- existingExecutorMapping.put(newExecutor, existingSize + segment.getIndexSize
- .toInt)
- } else {
- existingExecutorMapping.put(newExecutor, segment.getIndexSize
- .toInt)
- }
- } else {
- val newExecutorMapping = new ConcurrentHashMap[String, Long]()
- newExecutorMapping.put(newExecutor, segment.getIndexSize)
- executorToCacheSizeMapping.put(newHost, newExecutorMapping)
- }
+ tableToExecutorMapping.putIfAbsent(tableUniqueName, new ConcurrentHashMap[String, String]())
val existingSegmentMapping = tableToExecutorMapping.get(tableUniqueName)
- if (existingSegmentMapping == null) {
- val newSegmentMapping = new ConcurrentHashMap[String, String]()
- newSegmentMapping.put(segment.getSegmentNo, s"${newHost}_$newExecutor")
- tableToExecutorMapping.putIfAbsent(tableUniqueName, newSegmentMapping)
+ val oldMapping = existingSegmentMapping.putIfAbsent(segment.getSegmentNo,
+ s"${ newHost }_$newExecutor")
+ if (oldMapping == null) {
+ updateCacheSize(newHost, newExecutor, segment)
+ s"executor_${newHost}_$newExecutor"
} else {
- existingSegmentMapping.putIfAbsent(segment.getSegmentNo, s"${newHost}_$newExecutor")
- tableToExecutorMapping.putIfAbsent(tableUniqueName, existingSegmentMapping)
+ s"executor_$oldMapping"
+ }
+ }
+ }
+
+ private def updateCacheSize(host: String, executor: String, segment: Segment) = {
+ val existingExecutorMapping = executorToCacheSizeMapping.get(host)
+ if (existingExecutorMapping != null) {
+ val existingSize = existingExecutorMapping.get(executor)
+ var totalSize = segment.getIndexSize
+ if (existingSize != null) {
+ totalSize += existingSize
}
- s"executor_${newHost}_$newExecutor"
+ existingExecutorMapping.put(executor, totalSize.toInt)
+ } else {
+ val newExecutorMapping = new ConcurrentHashMap[String, Long]()
+ newExecutorMapping.put(executor, segment.getIndexSize)
+ executorToCacheSizeMapping.put(host, newExecutorMapping)
}
}
diff --git a/integration/spark/src/test/scala/org/apache/indexserver/DistributedRDDUtilsTest.scala b/integration/spark/src/test/scala/org/apache/indexserver/DistributedRDDUtilsTest.scala
index ec75441..bc7ba13 100644
--- a/integration/spark/src/test/scala/org/apache/indexserver/DistributedRDDUtilsTest.scala
+++ b/integration/spark/src/test/scala/org/apache/indexserver/DistributedRDDUtilsTest.scala
@@ -17,7 +17,7 @@
package org.apache.indexserver
-import java.util.concurrent.ConcurrentHashMap
+import java.util.concurrent.{ConcurrentHashMap, Executors}
import scala.collection.JavaConverters._
@@ -226,4 +226,28 @@ class DistributedRDDUtilsTest extends FunSuite with BeforeAndAfterEach {
assert(FileFactory.isFileExist(indexServerTempFolder))
assert(FileFactory.isFileExist(tmpPathAnother))
}
+
+ test("test concurrent assigning of executors") {
+ executorCache.clear()
+ tableCache.clear()
+ // val executorsList: Map[String, Seq[String]]
+ val executorList = Map("EX1" -> Seq("1"), "EX2" -> Seq("2"))
+ val seg = new Segment("5")
+ seg.setIndexSize(10)
+ val executorService = Executors.newFixedThreadPool(8)
+ for (num <- 1 to 8) {
+ executorService.submit(
+ new Runnable {
+ override def run(): Unit = {
+ DistributedRDDUtils.assignExecutor("tablename", seg, executorList)
+ }
+ }).get()
+ }
+ executorService.shutdownNow()
+ assert(executorCache.size() == 1)
+ assert(executorCache.entrySet().iterator().next().getValue
+ .entrySet().iterator().next().getValue == 10)
+ executorCache.clear()
+ tableCache.clear()
+ }
}