Posted to commits@carbondata.apache.org by aj...@apache.org on 2020/11/27 11:28:10 UTC
[carbondata] branch master updated: [CARBONDATA-4056] Added global sort for data files merge operation in SI segments.
This is an automated email from the ASF dual-hosted git repository.
ajantha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new e7aec95 [CARBONDATA-4056] Added global sort for data files merge operation in SI segments.
e7aec95 is described below
commit e7aec9585e90271751fbd89867d71607c06396cd
Author: Karan980 <ka...@gmail.com>
AuthorDate: Tue Nov 24 17:01:34 2020 +0530
[CARBONDATA-4056] Added global sort for data files merge operation in SI segments.
Why is this PR needed?
Earlier, global sort was not supported during the data file merge operation of SI segments. So if an SI was created with global sort and carbon.si.segment.merge was set to true, the merge combined the data files in the SI segments but broke the globally sorted order of the data.
What changes were proposed in this PR?
Added global sort for data files merge operation in SI segments.
Does this PR introduce any user interface change?
No
Is any new testcase added?
Yes
This closes #4022
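For context, the user-facing scenario exercised by the new tests looks roughly like the sketch below. This is a minimal sketch, assuming a Spark session with CarbonData where sql stands for sparkSession.sql; the table and index names are illustrative and the statements mirror the ones used in the added test cases.

  // enable merging of small data files in SI segments
  CarbonProperties.getInstance()
    .addProperty(CarbonCommonConstants.CARBON_SI_SEGMENT_MERGE, "true")
  // main table and its secondary index both use global sort
  sql("CREATE TABLE t(id INT, name STRING, city STRING, age INT) STORED AS carbondata " +
    "TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='GLOBAL_SORT')")
  sql("CREATE INDEX t_index1 ON TABLE t(name) AS 'carbondata' " +
    "properties('SORT_SCOPE'='GLOBAL_SORT')")
  // with this change, merging the SI segment data files keeps them globally sorted;
  // for already loaded segments the merge can also be triggered explicitly:
  sql("REFRESH INDEX t_index1 ON TABLE t")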
---
.../CarbonDataFileMergeTestCaseOnSI.scala | 53 ++++++
.../secondaryindex/rdd/CarbonSIRebuildRDD.scala | 47 +----
.../secondaryindex/rdd/SecondaryIndexCreator.scala | 27 ++-
.../secondaryindex/util/SecondaryIndexUtil.scala | 189 ++++++++++++++++++++-
4 files changed, 251 insertions(+), 65 deletions(-)
diff --git a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/mergedata/CarbonDataFileMergeTestCaseOnSI.scala b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/mergedata/CarbonDataFileMergeTestCaseOnSI.scala
index 4f1a40e..a8f298c 100644
--- a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/mergedata/CarbonDataFileMergeTestCaseOnSI.scala
+++ b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/mergedata/CarbonDataFileMergeTestCaseOnSI.scala
@@ -29,6 +29,7 @@ import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFi
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.spark.testsuite.secondaryindex.TestSecondaryIndexUtils.isFilterPushedDownToSI
class CarbonDataFileMergeTestCaseOnSI
extends QueryTest with BeforeAndAfterEach with BeforeAndAfterAll {
@@ -234,6 +235,58 @@ class CarbonDataFileMergeTestCaseOnSI
CarbonCommonConstants.DEFAULT_ENABLE_AUTO_LOAD_MERGE)
}
+ test("Verify data file merge in SI segments with sort scope as gloabl sort and" +
+ "CARBON_SI_SEGMENT_MERGE property is enabled") {
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.CARBON_SI_SEGMENT_MERGE, "true")
+ sql("DROP TABLE IF EXISTS nonindexmerge")
+ sql(
+ """
+ | CREATE TABLE nonindexmerge(id INT, name STRING, city STRING, age INT)
+ | STORED AS carbondata
+ | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='GLOBAL_SORT')
+ """.stripMargin)
+ sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE nonindexmerge OPTIONS('header'='false', " +
+ s"'GLOBAL_SORT_PARTITIONS'='100')")
+ sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE nonindexmerge OPTIONS('header'='false', " +
+ s"'GLOBAL_SORT_PARTITIONS'='100')")
+ sql("CREATE INDEX nonindexmerge_index1 on table nonindexmerge (name) AS 'carbondata' " +
+ "properties('table_blocksize'='1', 'SORT_SCOPE'='GLOBAL_SORT')")
+ val df1 = sql("""Select * from nonindexmerge where name='n16000'""")
+ .queryExecution.sparkPlan
+ assert(isFilterPushedDownToSI(df1))
+ assert(getDataFileCount("nonindexmerge_index1", "0") < 15)
+ assert(getDataFileCount("nonindexmerge_index1", "1") < 15)
+ CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_SI_SEGMENT_MERGE,
+ CarbonCommonConstants.CARBON_SI_SEGMENT_MERGE_DEFAULT)
+ }
+
+ test("Verify REFRESH INDEX command with sort scope as global sort") {
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.CARBON_SI_SEGMENT_MERGE, "false")
+ sql("DROP TABLE IF EXISTS nonindexmerge")
+ sql(
+ """
+ | CREATE TABLE nonindexmerge(id INT, name STRING, city STRING, age INT)
+ | STORED AS carbondata
+ | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='GLOBAL_SORT')
+ """.stripMargin)
+ sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE nonindexmerge OPTIONS('header'='false', " +
+ s"'GLOBAL_SORT_PARTITIONS'='100')")
+ sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE nonindexmerge OPTIONS('header'='false', " +
+ s"'GLOBAL_SORT_PARTITIONS'='100')")
+ sql("CREATE INDEX nonindexmerge_index1 on table nonindexmerge (name) AS 'carbondata' " +
+ "properties('table_blocksize'='1', 'SORT_SCOPE'='GLOBAL_SORT')")
+ sql("REFRESH INDEX nonindexmerge_index1 ON TABLE nonindexmerge").collect()
+ val df1 = sql("""Select * from nonindexmerge where name='n16000'""")
+ .queryExecution.sparkPlan
+ assert(isFilterPushedDownToSI(df1))
+ assert(getDataFileCount("nonindexmerge_index1", "0") < 15)
+ assert(getDataFileCount("nonindexmerge_index1", "1") < 15)
+ CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_SI_SEGMENT_MERGE,
+ CarbonCommonConstants.CARBON_SI_SEGMENT_MERGE_DEFAULT)
+ }
+
private def getDataFileCount(tableName: String, segment: String): Int = {
val table = CarbonEnv.getCarbonTable(None, tableName)(sqlContext.sparkSession)
val path = CarbonTablePath
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/CarbonSIRebuildRDD.scala b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/CarbonSIRebuildRDD.scala
index 9af9847..e076a5d 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/CarbonSIRebuildRDD.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/CarbonSIRebuildRDD.scala
@@ -23,30 +23,22 @@ import java.util.Collections
import scala.collection.JavaConverters._
-import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.{Partition, TaskContext}
-import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.sql.{CarbonEnv, SparkSession}
+import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.command.CarbonMergerMapping
-import org.apache.spark.sql.hive.CarbonRelation
-import org.apache.spark.sql.index.CarbonIndexUtil
import org.apache.spark.sql.secondaryindex.util.SecondaryIndexUtil
import org.apache.spark.sql.util.CarbonException
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.converter.SparkDataTypeConverterImpl
-import org.apache.carbondata.core.constants.{CarbonCommonConstants, SortScopeOptions}
+import org.apache.carbondata.core.constants.SortScopeOptions
import org.apache.carbondata.core.datastore.block.{SegmentProperties, TaskBlockInfo}
-import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter}
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier}
import org.apache.carbondata.core.metadata.blocklet.DataFileFooter
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.scan.result.iterator.RawResultIterator
import org.apache.carbondata.core.util.{CarbonUtil, DataTypeUtil}
-import org.apache.carbondata.core.util.path.CarbonTablePath
-import org.apache.carbondata.core.util.path.CarbonTablePath.DataFileUtil
import org.apache.carbondata.hadoop.{CarbonInputSplit, CarbonMultiBlockSplit}
import org.apache.carbondata.hadoop.api.CarbonInputFormat
import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil
@@ -106,8 +98,8 @@ class CarbonSIRebuildRDD[K, V](
// take the merge size as the block size
val mergeSize =
- getTableBlockSizeInMb(carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable)(ss) * 1024 *
- 1024
+ SecondaryIndexUtil.getTableBlockSizeInMb(carbonLoadModel
+ .getCarbonDataLoadSchema.getCarbonTable)(ss) * 1024 * 1024
val resultSplits: java.util.List[CarbonSparkPartition] = new java.util.ArrayList()
splitsGroupedMySegment.foreach { entry =>
@@ -135,37 +127,6 @@ class CarbonSIRebuildRDD[K, V](
resultSplits.asScala.toArray
}
- /**
- * Get the table block size from the index table, if not found in SI table, check main table
- * If main table also not set with table block size then fall back to default block size set
- *
- */
- def getTableBlockSizeInMb(indexTable: CarbonTable)(sparkSession: SparkSession): Long = {
- var tableBlockSize: String = null
- var tableProperties = indexTable.getTableInfo.getFactTable.getTableProperties
- if (null != tableProperties) {
- tableBlockSize = tableProperties.get(CarbonCommonConstants.TABLE_BLOCKSIZE)
- }
- if (null == tableBlockSize) {
- val metaStore = CarbonEnv.getInstance(sparkSession)
- .carbonMetaStore
- val mainTable = metaStore
- .lookupRelation(Some(indexTable.getDatabaseName),
- CarbonIndexUtil.getParentTableName(indexTable))(sparkSession)
- .asInstanceOf[CarbonRelation]
- .carbonTable
- tableProperties = mainTable.getTableInfo.getFactTable.getTableProperties
- if (null != tableProperties) {
- tableBlockSize = tableProperties.get(CarbonCommonConstants.TABLE_BLOCKSIZE)
- }
- if (null == tableBlockSize) {
- tableBlockSize = CarbonCommonConstants.TABLE_BLOCK_SIZE_DEFAULT
- }
- }
- tableBlockSize.toLong
- }
-
-
override def internalCompute(theSplit: Partition,
context: TaskContext): Iterator[((K, V), String)] = {
val queryStartTime = System.currentTimeMillis()
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/SecondaryIndexCreator.scala b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/SecondaryIndexCreator.scala
index e611f8b..2b2f7c0 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/SecondaryIndexCreator.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/SecondaryIndexCreator.scala
@@ -214,20 +214,6 @@ object SecondaryIndexCreator {
} else {
null
}
-
- def findCarbonScanRDD(rdd: RDD[_], currentSegmentFileName: String): Unit = {
- rdd match {
- case carbonScanRDD: CarbonScanRDD[_] =>
- carbonScanRDD.setValidateSegmentToAccess(false)
- if (currentSegmentFileName != null) {
- carbonScanRDD.setCurrentSegmentFileName(currentSegmentFileName)
- }
- case others =>
- others.dependencies
- .foreach { x => findCarbonScanRDD(x.rdd, currentSegmentFileName) }
- }
- }
-
findCarbonScanRDD(dataFrame.rdd, currentSegmentFileName)
// accumulator to collect segment metadata
val segmentMetaDataAccumulator = sc.sparkSession.sqlContext
@@ -498,6 +484,19 @@ object SecondaryIndexCreator {
}
}
+ def findCarbonScanRDD(rdd: RDD[_], currentSegmentFileName: String): Unit = {
+ rdd match {
+ case carbonScanRDD: CarbonScanRDD[_] =>
+ carbonScanRDD.setValidateSegmentToAccess(false)
+ if (currentSegmentFileName != null) {
+ carbonScanRDD.setCurrentSegmentFileName(currentSegmentFileName)
+ }
+ case others =>
+ others.dependencies
+ .foreach { x => findCarbonScanRDD(x.rdd, currentSegmentFileName) }
+ }
+ }
+
/**
* will return the copy object of the existing object
*
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/util/SecondaryIndexUtil.scala b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/util/SecondaryIndexUtil.scala
index 832c33d..3506aac 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/util/SecondaryIndexUtil.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/util/SecondaryIndexUtil.scala
@@ -21,38 +21,51 @@ import java.util
import java.util.{Collections, Comparator}
import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
import scala.util.control.Breaks
+import org.apache.hadoop.mapred.JobConf
+import org.apache.hadoop.mapreduce.Job
import org.apache.log4j.Logger
+import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.rdd.CarbonMergeFilesRDD
import org.apache.spark.sql.{CarbonEnv, SparkSession, SQLContext}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.execution.command.{CarbonMergerMapping, CompactionCallableModel}
+import org.apache.spark.sql.hive.CarbonRelation
+import org.apache.spark.sql.index.CarbonIndexUtil
import org.apache.spark.sql.optimizer.CarbonFilters
-import org.apache.spark.sql.secondaryindex.rdd.CarbonSIRebuildRDD
+import org.apache.spark.sql.secondaryindex.rdd.{CarbonSIRebuildRDD, SecondaryIndexCreator}
import org.apache.spark.sql.util.SparkSQLUtil
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.constants.SortScopeOptions.SortScope
import org.apache.carbondata.core.datastore.block.{TableBlockInfo, TaskBlockInfo}
+import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter}
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.index.{IndexStoreManager, Segment}
import org.apache.carbondata.core.locks.CarbonLockUtil
import org.apache.carbondata.core.metadata.SegmentFileStore
import org.apache.carbondata.core.metadata.blocklet.DataFileFooter
-import org.apache.carbondata.core.metadata.datatype.DataType
+import org.apache.carbondata.core.metadata.datatype.{DataType, StructField, StructType}
import org.apache.carbondata.core.metadata.encoder.Encoding
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.segmentmeta.SegmentMetaDataInfo
import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatusManager}
import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.core.util.path.CarbonTablePath.DataFileUtil
import org.apache.carbondata.hadoop.CarbonInputSplit
+import org.apache.carbondata.hadoop.api.{CarbonInputFormat, CarbonTableOutputFormat}
+import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil
import org.apache.carbondata.indexserver.IndexServer
+import org.apache.carbondata.processing.loading.FailureCauses
import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
import org.apache.carbondata.processing.merger.{CarbonDataMergerUtil, CompactionType}
import org.apache.carbondata.processing.util.CarbonLoaderUtil
import org.apache.carbondata.spark.MergeResultImpl
+import org.apache.carbondata.spark.load.DataLoadProcessBuilderOnSpark
object SecondaryIndexUtil {
@@ -176,15 +189,16 @@ object SecondaryIndexUtil {
val mergedSegments: util.Set[LoadMetadataDetails] = new util.HashSet[LoadMetadataDetails]()
var rebuiltSegments: Set[String] = Set[String]()
val segmentIdToLoadStartTimeMap: util.Map[String, String] = new util.HashMap()
-
try {
- val mergeStatus =
- new CarbonSIRebuildRDD(
- sc.sparkSession,
+ val mergeStatus = if (SortScope.GLOBAL_SORT == indexCarbonTable.getSortScope &&
+ !indexCarbonTable.getSortColumns.isEmpty) {
+ mergeSISegmentDataFiles(sc.sparkSession, carbonLoadModel, carbonMergerMapping)
+ } else {
+ new CarbonSIRebuildRDD(sc.sparkSession,
new MergeResultImpl(),
carbonLoadModel,
- carbonMergerMapping
- ).collect
+ carbonMergerMapping).collect
+ }
if (null != mergeStatus && mergeStatus.length == 0) {
finalMergeStatus = true
} else {
@@ -206,6 +220,12 @@ object SecondaryIndexUtil {
carbonLoadModel.getFactTimeStamp,
validSegmentsToUse.toList.asJava,
indexCarbonTable)
+ if (SortScope.GLOBAL_SORT == indexCarbonTable.getSortScope &&
+ !indexCarbonTable.getSortColumns.isEmpty) {
+ deleteOldCarbonDataFiles(carbonLoadModel.getFactTimeStamp,
+ validSegmentsToUse.toList.asJava,
+ indexCarbonTable)
+ }
mergedSegments.asScala.map { seg =>
val file = SegmentFileStore.writeSegmentFile(
indexCarbonTable,
@@ -616,4 +636,157 @@ object SecondaryIndexUtil {
identifiedSegments
}
+ /**
+ * This method deletes the old carbondata files.
+ */
+ private def deleteOldCarbonDataFiles(factTimeStamp: Long,
+ validSegments: util.List[Segment],
+ indexCarbonTable: CarbonTable): Unit = {
+ validSegments.asScala.foreach { segment =>
+ val segmentPath = CarbonTablePath.getSegmentPath(indexCarbonTable.getTablePath,
+ segment.getSegmentNo)
+ val dataFiles = FileFactory.getCarbonFile(segmentPath).listFiles(new CarbonFileFilter {
+ override def accept(file: CarbonFile): Boolean = {
+ file.getName.endsWith(CarbonTablePath.CARBON_DATA_EXT)
+ }})
+ dataFiles.foreach(dataFile =>
+ if (DataFileUtil.getTimeStampFromFileName(dataFile.getAbsolutePath).toLong < factTimeStamp) {
+ dataFile.delete()
+ })
+ }
+ }
+
+ def mergeSISegmentDataFiles(sparkSession: SparkSession,
+ carbonLoadModel: CarbonLoadModel,
+ carbonMergerMapping: CarbonMergerMapping): Array[((String, Boolean), String)] = {
+ val validSegments = carbonMergerMapping.validSegments.toList
+ val indexCarbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
+ val absoluteTableIdentifier = indexCarbonTable.getAbsoluteTableIdentifier
+ val jobConf: JobConf = new JobConf(FileFactory.getConfiguration)
+ SparkHadoopUtil.get.addCredentials(jobConf)
+ val job: Job = new Job(jobConf)
+ val format = CarbonInputFormatUtil.createCarbonInputFormat(absoluteTableIdentifier, job)
+ CarbonInputFormat.setTableInfo(job.getConfiguration, indexCarbonTable.getTableInfo)
+ val proj = indexCarbonTable.getCreateOrderColumn
+ .asScala
+ .map(_.getColName)
+ .filterNot(_.equalsIgnoreCase(CarbonCommonConstants.POSITION_REFERENCE)).toSet
+ var mergeStatus = ArrayBuffer[((String, Boolean), String)]()
+ val mergeSize = getTableBlockSizeInMb(indexCarbonTable)(sparkSession) * 1024 * 1024
+ val header = indexCarbonTable.getCreateOrderColumn.asScala.map(_.getColName).toArray
+ val outputModel = getLoadModelForGlobalSort(sparkSession, indexCarbonTable)
+ CarbonIndexUtil.initializeSILoadModel(outputModel, header)
+ outputModel.setFactTimeStamp(carbonLoadModel.getFactTimeStamp)
+ val segmentMetaDataAccumulator = sparkSession.sqlContext
+ .sparkContext
+ .collectionAccumulator[Map[String, SegmentMetaDataInfo]]
+ validSegments.foreach { segment =>
+ outputModel.setSegmentId(segment.getSegmentNo)
+ val dataFrame = SecondaryIndexCreator.dataFrameOfSegments(sparkSession,
+ indexCarbonTable,
+ proj.mkString(","),
+ Array(segment.getSegmentNo))
+ SecondaryIndexCreator.findCarbonScanRDD(dataFrame.rdd, null)
+ val segList : java.util.List[Segment] = new util.ArrayList[Segment]()
+ segList.add(segment)
+ CarbonInputFormat.setSegmentsToAccess(job.getConfiguration, segList)
+ CarbonInputFormat.setValidateSegmentsToAccess(job.getConfiguration, false)
+ val splits = format.getSplits(job)
+ val carbonInputSplits = splits.asScala.map(_.asInstanceOf[CarbonInputSplit])
+ outputModel.setGlobalSortPartitions(identifyGlobalSortPartitions(carbonInputSplits.asJava,
+ mergeSize))
+ DataLoadProcessBuilderOnSpark.loadDataUsingGlobalSort(sparkSession,
+ Option(dataFrame),
+ outputModel,
+ SparkSQLUtil.sessionState(sparkSession).newHadoopConf(),
+ segmentMetaDataAccumulator)
+ .map{ row =>
+ ((row._1, FailureCauses.NONE == row._2._2.failureCauses), segment.getSegmentNo)}
+ .foreach(status => mergeStatus += status)
+ }
+ mergeStatus.toArray
+ }
+
+ /**
+ * Creates a CarbonLoadModel for the global sort merge of SI segment data files
+ */
+ def getLoadModelForGlobalSort(sparkSession: SparkSession,
+ carbonTable: CarbonTable): CarbonLoadModel = {
+ val conf = SparkSQLUtil.sessionState(sparkSession).newHadoopConf()
+ CarbonTableOutputFormat.setDatabaseName(conf, carbonTable.getDatabaseName)
+ CarbonTableOutputFormat.setTableName(conf, carbonTable.getTableName)
+ CarbonTableOutputFormat.setCarbonTable(conf, carbonTable)
+ val fieldList = carbonTable.getCreateOrderColumn
+ .asScala
+ .map { column =>
+ new StructField(column.getColName, column.getDataType)
+ }
+ CarbonTableOutputFormat.setInputSchema(conf, new StructType(fieldList.asJava))
+ val loadModel = CarbonTableOutputFormat.getLoadModel(conf)
+ loadModel
+ }
+
+ /**
+ * Gets the table block size from the index table. If it is not set on the SI table, check the
+ * main table; if the main table also does not set it, fall back to the default block size.
+ *
+ */
+ def getTableBlockSizeInMb(indexTable: CarbonTable)(sparkSession: SparkSession): Long = {
+ var tableBlockSize: String = null
+ var tableProperties = indexTable.getTableInfo.getFactTable.getTableProperties
+ if (null != tableProperties) {
+ tableBlockSize = tableProperties.get(CarbonCommonConstants.TABLE_BLOCKSIZE)
+ }
+ if (null == tableBlockSize) {
+ val metaStore = CarbonEnv.getInstance(sparkSession)
+ .carbonMetaStore
+ val mainTable = metaStore
+ .lookupRelation(Some(indexTable.getDatabaseName),
+ CarbonIndexUtil.getParentTableName(indexTable))(sparkSession)
+ .asInstanceOf[CarbonRelation]
+ .carbonTable
+ tableProperties = mainTable.getTableInfo.getFactTable.getTableProperties
+ if (null != tableProperties) {
+ tableBlockSize = tableProperties.get(CarbonCommonConstants.TABLE_BLOCKSIZE)
+ }
+ if (null == tableBlockSize) {
+ tableBlockSize = CarbonCommonConstants.TABLE_BLOCK_SIZE_DEFAULT
+ }
+ }
+ tableBlockSize.toLong
+ }
+
+ /**
+ * Identifies the number of global sort partitions used for the SI segment data file merge load.
+ * For example: if the block size is 1MB and there are 6 splits with sizes
+ * (100KB, 200KB, 300KB, 400KB, 500KB, 600KB), then
+ * (100KB + 200KB + 300KB + 400KB) >= 1MB gives the first partition and
+ * (500KB + 600KB) >= 1MB gives the second partition.
+ *
+ */
+ def identifyGlobalSortPartitions(splits: util.List[CarbonInputSplit], mergeSize: Long):
+ String = {
+ var partitions: Long = 0L
+ var totalSize: Long = 0L
+ // sort the splits based on the block size and then make groups based on the threshold
+ Collections.sort(splits, new Comparator[CarbonInputSplit]() {
+ def compare(split1: CarbonInputSplit, split2: CarbonInputSplit): Int = {
+ (split1.getLength - split2.getLength).toInt
+ }
+ })
+ for (i <- 0 until splits.size()) {
+ val block = splits.get(i)
+ val blockFileSize = block.getLength
+ totalSize += blockFileSize
+ if (totalSize >= mergeSize) {
+ partitions = partitions + 1
+ totalSize = 0L
+ }
+ }
+ if (totalSize > 0) {
+ partitions = partitions + 1
+ }
+ partitions.toString
+ }
+
}
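
As an aside, the grouping arithmetic in identifyGlobalSortPartitions can be illustrated on its own. The sketch below is illustrative only and uses no Carbon classes; it assumes the split sizes from the comment above and, to match that comment's arithmetic, treats 1MB as 1000KB.

  // illustrative only: count groups of ascending split sizes whose running total reaches mergeSize
  def countPartitions(splitSizes: Seq[Long], mergeSize: Long): Long = {
    var partitions = 0L
    var totalSize = 0L
    splitSizes.sorted.foreach { size =>
      totalSize += size
      if (totalSize >= mergeSize) {
        partitions += 1
        totalSize = 0L
      }
    }
    // any leftover splits that did not reach the threshold form one last partition
    if (totalSize > 0) partitions += 1
    partitions
  }
  // sizes in KB: 100 + 200 + 300 + 400 >= 1000 -> first partition; 500 + 600 >= 1000 -> second
  countPartitions(Seq(100L, 200L, 300L, 400L, 500L, 600L), 1000L)  // returns 2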