Posted to commits@carbondata.apache.org by aj...@apache.org on 2020/11/27 11:28:10 UTC

[carbondata] branch master updated: [CARBONDATA-4056] Added global sort for data files merge operation in SI segments.

This is an automated email from the ASF dual-hosted git repository.

ajantha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new e7aec95  [CARBONDATA-4056] Added global sort for data files merge operation in SI segments.
e7aec95 is described below

commit e7aec9585e90271751fbd89867d71607c06396cd
Author: Karan980 <ka...@gmail.com>
AuthorDate: Tue Nov 24 17:01:34 2020 +0530

    [CARBONDATA-4056] Added global sort for data files merge operation in SI segments.
    
    Why is this PR needed?
    Earlier, global sort was not supported during the data files merge operation of SI segments. So if an SI was created with global sort and carbon.si.segment.merge was set to true, the merge combined the data files in the SI segments but broke the globally sorted order of the data.
    
    What changes were proposed in this PR?
    Added global sort for data files merge operation in SI segments.
    
    Does this PR introduce any user interface change?
    No
    
    Is any new testcase added?
    Yes
    
    This closes #4022
---
 .../CarbonDataFileMergeTestCaseOnSI.scala          |  53 ++++++
 .../secondaryindex/rdd/CarbonSIRebuildRDD.scala    |  47 +----
 .../secondaryindex/rdd/SecondaryIndexCreator.scala |  27 ++-
 .../secondaryindex/util/SecondaryIndexUtil.scala   | 189 ++++++++++++++++++++-
 4 files changed, 251 insertions(+), 65 deletions(-)
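
For orientation, the core of the change is a dispatch in SecondaryIndexUtil: when the SI table
uses GLOBAL_SORT and has sort columns, the data files of each segment are re-loaded through
Spark's global sort flow instead of being merged by the existing CarbonSIRebuildRDD. A condensed
sketch of that dispatch (paraphrasing the hunk further below, not the literal patch):

    val mergeStatus =
      if (SortScope.GLOBAL_SORT == indexCarbonTable.getSortScope &&
          !indexCarbonTable.getSortColumns.isEmpty) {
        // new path: re-load the segment's data files with a global sort
        mergeSISegmentDataFiles(sc.sparkSession, carbonLoadModel, carbonMergerMapping)
      } else {
        // existing path: merge the data files locally via CarbonSIRebuildRDD
        new CarbonSIRebuildRDD(sc.sparkSession, new MergeResultImpl(),
          carbonLoadModel, carbonMergerMapping).collect
      }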

diff --git a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/mergedata/CarbonDataFileMergeTestCaseOnSI.scala b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/mergedata/CarbonDataFileMergeTestCaseOnSI.scala
index 4f1a40e..a8f298c 100644
--- a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/mergedata/CarbonDataFileMergeTestCaseOnSI.scala
+++ b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/mergedata/CarbonDataFileMergeTestCaseOnSI.scala
@@ -29,6 +29,7 @@ import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFi
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.spark.testsuite.secondaryindex.TestSecondaryIndexUtils.isFilterPushedDownToSI
 
 class CarbonDataFileMergeTestCaseOnSI
   extends QueryTest with BeforeAndAfterEach with BeforeAndAfterAll {
@@ -234,6 +235,58 @@ class CarbonDataFileMergeTestCaseOnSI
         CarbonCommonConstants.DEFAULT_ENABLE_AUTO_LOAD_MERGE)
   }
 
+  test("Verify data file merge in SI segments with sort scope as gloabl sort and" +
+    "CARBON_SI_SEGMENT_MERGE property is enabled") {
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_SI_SEGMENT_MERGE, "true")
+    sql("DROP TABLE IF EXISTS nonindexmerge")
+    sql(
+      """
+        | CREATE TABLE nonindexmerge(id INT, name STRING, city STRING, age INT)
+        | STORED AS carbondata
+        | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='GLOBAL_SORT')
+      """.stripMargin)
+    sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE nonindexmerge OPTIONS('header'='false', " +
+      s"'GLOBAL_SORT_PARTITIONS'='100')")
+    sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE nonindexmerge OPTIONS('header'='false', " +
+      s"'GLOBAL_SORT_PARTITIONS'='100')")
+    sql("CREATE INDEX nonindexmerge_index1 on table nonindexmerge (name) AS 'carbondata' " +
+        "properties('table_blocksize'='1', 'SORT_SCOPE'='GLOBAL_SORT')")
+    val df1 = sql("""Select * from nonindexmerge where name='n16000'""")
+      .queryExecution.sparkPlan
+    assert(isFilterPushedDownToSI(df1))
+    assert(getDataFileCount("nonindexmerge_index1", "0") < 15)
+    assert(getDataFileCount("nonindexmerge_index1", "1") < 15)
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_SI_SEGMENT_MERGE,
+      CarbonCommonConstants.CARBON_SI_SEGMENT_MERGE_DEFAULT)
+  }
+
+  test("Verify REFRESH INDEX command with sort scope as global sort") {
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_SI_SEGMENT_MERGE, "false")
+    sql("DROP TABLE IF EXISTS nonindexmerge")
+    sql(
+      """
+        | CREATE TABLE nonindexmerge(id INT, name STRING, city STRING, age INT)
+        | STORED AS carbondata
+        | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='GLOBAL_SORT')
+      """.stripMargin)
+    sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE nonindexmerge OPTIONS('header'='false', " +
+      s"'GLOBAL_SORT_PARTITIONS'='100')")
+    sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE nonindexmerge OPTIONS('header'='false', " +
+      s"'GLOBAL_SORT_PARTITIONS'='100')")
+    sql("CREATE INDEX nonindexmerge_index1 on table nonindexmerge (name) AS 'carbondata' " +
+      "properties('table_blocksize'='1', 'SORT_SCOPE'='GLOBAL_SORT')")
+    sql("REFRESH INDEX nonindexmerge_index1 ON TABLE nonindexmerge").collect()
+    val df1 = sql("""Select * from nonindexmerge where name='n16000'""")
+      .queryExecution.sparkPlan
+    assert(isFilterPushedDownToSI(df1))
+    assert(getDataFileCount("nonindexmerge_index1", "0") < 15)
+    assert(getDataFileCount("nonindexmerge_index1", "1") < 15)
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_SI_SEGMENT_MERGE,
+      CarbonCommonConstants.CARBON_SI_SEGMENT_MERGE_DEFAULT)
+  }
+
   private def getDataFileCount(tableName: String, segment: String): Int = {
     val table = CarbonEnv.getCarbonTable(None, tableName)(sqlContext.sparkSession)
     val path = CarbonTablePath
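
As the two new tests above exercise, there are two ways to trigger the merge of SI segment data
files (which, with this patch, now preserves the global sort order): enable
carbon.si.segment.merge so the merge runs as part of the SI load, or run REFRESH INDEX on demand.
A minimal usage sketch, assuming the nonindexmerge table and nonindexmerge_index1 index created
in the tests above:

    // merge the SI segment data files automatically when the SI segments are loaded
    CarbonProperties.getInstance()
      .addProperty(CarbonCommonConstants.CARBON_SI_SEGMENT_MERGE, "true")

    // or keep the property at its default and merge the existing segments on demand
    sql("REFRESH INDEX nonindexmerge_index1 ON TABLE nonindexmerge").collect()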
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/CarbonSIRebuildRDD.scala b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/CarbonSIRebuildRDD.scala
index 9af9847..e076a5d 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/CarbonSIRebuildRDD.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/CarbonSIRebuildRDD.scala
@@ -23,30 +23,22 @@ import java.util.Collections
 
 import scala.collection.JavaConverters._
 
-import org.apache.hadoop.mapred.JobConf
 import org.apache.hadoop.mapreduce.Job
 import org.apache.spark.{Partition, TaskContext}
-import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.sql.{CarbonEnv, SparkSession}
+import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.execution.command.CarbonMergerMapping
-import org.apache.spark.sql.hive.CarbonRelation
-import org.apache.spark.sql.index.CarbonIndexUtil
 import org.apache.spark.sql.secondaryindex.util.SecondaryIndexUtil
 import org.apache.spark.sql.util.CarbonException
 
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.converter.SparkDataTypeConverterImpl
-import org.apache.carbondata.core.constants.{CarbonCommonConstants, SortScopeOptions}
+import org.apache.carbondata.core.constants.SortScopeOptions
 import org.apache.carbondata.core.datastore.block.{SegmentProperties, TaskBlockInfo}
-import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter}
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier}
 import org.apache.carbondata.core.metadata.blocklet.DataFileFooter
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.scan.result.iterator.RawResultIterator
 import org.apache.carbondata.core.util.{CarbonUtil, DataTypeUtil}
-import org.apache.carbondata.core.util.path.CarbonTablePath
-import org.apache.carbondata.core.util.path.CarbonTablePath.DataFileUtil
 import org.apache.carbondata.hadoop.{CarbonInputSplit, CarbonMultiBlockSplit}
 import org.apache.carbondata.hadoop.api.CarbonInputFormat
 import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil
@@ -106,8 +98,8 @@ class CarbonSIRebuildRDD[K, V](
 
     // take the merge size as the block size
     val mergeSize =
-    getTableBlockSizeInMb(carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable)(ss) * 1024 *
-     1024
+    SecondaryIndexUtil.getTableBlockSizeInMb(carbonLoadModel
+      .getCarbonDataLoadSchema.getCarbonTable)(ss) * 1024 * 1024
 
     val resultSplits: java.util.List[CarbonSparkPartition] = new java.util.ArrayList()
     splitsGroupedMySegment.foreach { entry =>
@@ -135,37 +127,6 @@ class CarbonSIRebuildRDD[K, V](
     resultSplits.asScala.toArray
   }
 
-  /**
-   * Get the table block size from the index table, if not found in SI table, check main table
-   * If main table also not set with table block size then fall back to default block size set
-   *
-   */
-  def getTableBlockSizeInMb(indexTable: CarbonTable)(sparkSession: SparkSession): Long = {
-    var tableBlockSize: String = null
-    var tableProperties = indexTable.getTableInfo.getFactTable.getTableProperties
-    if (null != tableProperties) {
-      tableBlockSize = tableProperties.get(CarbonCommonConstants.TABLE_BLOCKSIZE)
-    }
-    if (null == tableBlockSize) {
-      val metaStore = CarbonEnv.getInstance(sparkSession)
-        .carbonMetaStore
-      val mainTable = metaStore
-        .lookupRelation(Some(indexTable.getDatabaseName),
-          CarbonIndexUtil.getParentTableName(indexTable))(sparkSession)
-        .asInstanceOf[CarbonRelation]
-        .carbonTable
-      tableProperties = mainTable.getTableInfo.getFactTable.getTableProperties
-      if (null != tableProperties) {
-        tableBlockSize = tableProperties.get(CarbonCommonConstants.TABLE_BLOCKSIZE)
-      }
-      if (null == tableBlockSize) {
-        tableBlockSize = CarbonCommonConstants.TABLE_BLOCK_SIZE_DEFAULT
-      }
-    }
-    tableBlockSize.toLong
-  }
-
-
   override def internalCompute(theSplit: Partition,
     context: TaskContext): Iterator[((K, V), String)] = {
     val queryStartTime = System.currentTimeMillis()
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/SecondaryIndexCreator.scala b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/SecondaryIndexCreator.scala
index e611f8b..2b2f7c0 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/SecondaryIndexCreator.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/SecondaryIndexCreator.scala
@@ -214,20 +214,6 @@ object SecondaryIndexCreator {
                   } else {
                     null
                   }
-
-                  def findCarbonScanRDD(rdd: RDD[_], currentSegmentFileName: String): Unit = {
-                    rdd match {
-                      case carbonScanRDD: CarbonScanRDD[_] =>
-                        carbonScanRDD.setValidateSegmentToAccess(false)
-                        if (currentSegmentFileName != null) {
-                          carbonScanRDD.setCurrentSegmentFileName(currentSegmentFileName)
-                        }
-                      case others =>
-                        others.dependencies
-                          .foreach { x => findCarbonScanRDD(x.rdd, currentSegmentFileName) }
-                    }
-                  }
-
                   findCarbonScanRDD(dataFrame.rdd, currentSegmentFileName)
                   // accumulator to collect segment metadata
                   val segmentMetaDataAccumulator = sc.sparkSession.sqlContext
@@ -498,6 +484,19 @@ object SecondaryIndexCreator {
     }
   }
 
+  def findCarbonScanRDD(rdd: RDD[_], currentSegmentFileName: String): Unit = {
+    rdd match {
+      case carbonScanRDD: CarbonScanRDD[_] =>
+        carbonScanRDD.setValidateSegmentToAccess(false)
+        if (currentSegmentFileName != null) {
+          carbonScanRDD.setCurrentSegmentFileName(currentSegmentFileName)
+        }
+      case others =>
+        others.dependencies
+          .foreach { x => findCarbonScanRDD(x.rdd, currentSegmentFileName) }
+    }
+  }
+
   /**
    * will return the copy object of the existing object
    *
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/util/SecondaryIndexUtil.scala b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/util/SecondaryIndexUtil.scala
index 832c33d..3506aac 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/util/SecondaryIndexUtil.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/util/SecondaryIndexUtil.scala
@@ -21,38 +21,51 @@ import java.util
 import java.util.{Collections, Comparator}
 
 import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
 import scala.util.control.Breaks
 
+import org.apache.hadoop.mapred.JobConf
+import org.apache.hadoop.mapreduce.Job
 import org.apache.log4j.Logger
+import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.rdd.CarbonMergeFilesRDD
 import org.apache.spark.sql.{CarbonEnv, SparkSession, SQLContext}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.execution.command.{CarbonMergerMapping, CompactionCallableModel}
+import org.apache.spark.sql.hive.CarbonRelation
+import org.apache.spark.sql.index.CarbonIndexUtil
 import org.apache.spark.sql.optimizer.CarbonFilters
-import org.apache.spark.sql.secondaryindex.rdd.CarbonSIRebuildRDD
+import org.apache.spark.sql.secondaryindex.rdd.{CarbonSIRebuildRDD, SecondaryIndexCreator}
 import org.apache.spark.sql.util.SparkSQLUtil
 
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.constants.SortScopeOptions.SortScope
 import org.apache.carbondata.core.datastore.block.{TableBlockInfo, TaskBlockInfo}
+import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter}
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.index.{IndexStoreManager, Segment}
 import org.apache.carbondata.core.locks.CarbonLockUtil
 import org.apache.carbondata.core.metadata.SegmentFileStore
 import org.apache.carbondata.core.metadata.blocklet.DataFileFooter
-import org.apache.carbondata.core.metadata.datatype.DataType
+import org.apache.carbondata.core.metadata.datatype.{DataType, StructField, StructType}
 import org.apache.carbondata.core.metadata.encoder.Encoding
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.segmentmeta.SegmentMetaDataInfo
 import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatusManager}
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
 import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.core.util.path.CarbonTablePath.DataFileUtil
 import org.apache.carbondata.hadoop.CarbonInputSplit
+import org.apache.carbondata.hadoop.api.{CarbonInputFormat, CarbonTableOutputFormat}
+import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil
 import org.apache.carbondata.indexserver.IndexServer
+import org.apache.carbondata.processing.loading.FailureCauses
 import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
 import org.apache.carbondata.processing.merger.{CarbonDataMergerUtil, CompactionType}
 import org.apache.carbondata.processing.util.CarbonLoaderUtil
 import org.apache.carbondata.spark.MergeResultImpl
+import org.apache.carbondata.spark.load.DataLoadProcessBuilderOnSpark
 
 object SecondaryIndexUtil {
 
@@ -176,15 +189,16 @@ object SecondaryIndexUtil {
     val mergedSegments: util.Set[LoadMetadataDetails] = new util.HashSet[LoadMetadataDetails]()
     var rebuiltSegments: Set[String] = Set[String]()
     val segmentIdToLoadStartTimeMap: util.Map[String, String] = new util.HashMap()
-
     try {
-      val mergeStatus =
-        new CarbonSIRebuildRDD(
-          sc.sparkSession,
+      val mergeStatus = if (SortScope.GLOBAL_SORT == indexCarbonTable.getSortScope &&
+      !indexCarbonTable.getSortColumns.isEmpty) {
+        mergeSISegmentDataFiles(sc.sparkSession, carbonLoadModel, carbonMergerMapping)
+      } else {
+        new CarbonSIRebuildRDD(sc.sparkSession,
           new MergeResultImpl(),
           carbonLoadModel,
-          carbonMergerMapping
-        ).collect
+          carbonMergerMapping).collect
+      }
       if (null != mergeStatus && mergeStatus.length == 0) {
         finalMergeStatus = true
       } else {
@@ -206,6 +220,12 @@ object SecondaryIndexUtil {
             carbonLoadModel.getFactTimeStamp,
             validSegmentsToUse.toList.asJava,
             indexCarbonTable)
+          if (SortScope.GLOBAL_SORT == indexCarbonTable.getSortScope &&
+            !indexCarbonTable.getSortColumns.isEmpty) {
+            deleteOldCarbonDataFiles(carbonLoadModel.getFactTimeStamp,
+              validSegmentsToUse.toList.asJava,
+              indexCarbonTable)
+          }
           mergedSegments.asScala.map { seg =>
             val file = SegmentFileStore.writeSegmentFile(
               indexCarbonTable,
@@ -616,4 +636,157 @@ object SecondaryIndexUtil {
     identifiedSegments
   }
 
+  /**
+   * Deletes the carbondata files of the given SI segments that were written before the
+   * current fact timestamp, i.e. the pre-merge files replaced by the global sort merge.
+   */
+  private def deleteOldCarbonDataFiles(factTimeStamp: Long,
+              validSegments: util.List[Segment],
+              indexCarbonTable: CarbonTable): Unit = {
+    validSegments.asScala.foreach { segment =>
+      val segmentPath = CarbonTablePath.getSegmentPath(indexCarbonTable.getTablePath,
+        segment.getSegmentNo)
+      val dataFiles = FileFactory.getCarbonFile(segmentPath).listFiles(new CarbonFileFilter {
+        override def accept(file: CarbonFile): Boolean = {
+          file.getName.endsWith(CarbonTablePath.CARBON_DATA_EXT)
+        }})
+      dataFiles.foreach(dataFile =>
+      if (DataFileUtil.getTimeStampFromFileName(dataFile.getAbsolutePath).toLong < factTimeStamp) {
+        dataFile.delete()
+      })
+    }
+  }
+
+  def mergeSISegmentDataFiles(sparkSession: SparkSession,
+      carbonLoadModel: CarbonLoadModel,
+      carbonMergerMapping: CarbonMergerMapping): Array[((String, Boolean), String)] = {
+    val validSegments = carbonMergerMapping.validSegments.toList
+    val indexCarbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
+    val absoluteTableIdentifier = indexCarbonTable.getAbsoluteTableIdentifier
+    val jobConf: JobConf = new JobConf(FileFactory.getConfiguration)
+    SparkHadoopUtil.get.addCredentials(jobConf)
+    val job: Job = new Job(jobConf)
+    val format = CarbonInputFormatUtil.createCarbonInputFormat(absoluteTableIdentifier, job)
+    CarbonInputFormat.setTableInfo(job.getConfiguration, indexCarbonTable.getTableInfo)
+    val proj = indexCarbonTable.getCreateOrderColumn
+      .asScala
+      .map(_.getColName)
+      .filterNot(_.equalsIgnoreCase(CarbonCommonConstants.POSITION_REFERENCE)).toSet
+    var mergeStatus = ArrayBuffer[((String, Boolean), String)]()
+    val mergeSize = getTableBlockSizeInMb(indexCarbonTable)(sparkSession) * 1024 * 1024
+    val header = indexCarbonTable.getCreateOrderColumn.asScala.map(_.getColName).toArray
+    val outputModel = getLoadModelForGlobalSort(sparkSession, indexCarbonTable)
+    CarbonIndexUtil.initializeSILoadModel(outputModel, header)
+    outputModel.setFactTimeStamp(carbonLoadModel.getFactTimeStamp)
+    val segmentMetaDataAccumulator = sparkSession.sqlContext
+      .sparkContext
+      .collectionAccumulator[Map[String, SegmentMetaDataInfo]]
+    validSegments.foreach { segment =>
+      outputModel.setSegmentId(segment.getSegmentNo)
+      val dataFrame = SecondaryIndexCreator.dataFrameOfSegments(sparkSession,
+        indexCarbonTable,
+        proj.mkString(","),
+        Array(segment.getSegmentNo))
+      SecondaryIndexCreator.findCarbonScanRDD(dataFrame.rdd, null)
+      val segList : java.util.List[Segment] = new util.ArrayList[Segment]()
+      segList.add(segment)
+      CarbonInputFormat.setSegmentsToAccess(job.getConfiguration, segList)
+      CarbonInputFormat.setValidateSegmentsToAccess(job.getConfiguration, false)
+      val splits = format.getSplits(job)
+      val carbonInputSplits = splits.asScala.map(_.asInstanceOf[CarbonInputSplit])
+      outputModel.setGlobalSortPartitions(identifyGlobalSortPartitions(carbonInputSplits.asJava,
+        mergeSize))
+      DataLoadProcessBuilderOnSpark.loadDataUsingGlobalSort(sparkSession,
+        Option(dataFrame),
+        outputModel,
+        SparkSQLUtil.sessionState(sparkSession).newHadoopConf(),
+        segmentMetaDataAccumulator)
+        .map{ row =>
+          ((row._1, FailureCauses.NONE == row._2._2.failureCauses), segment.getSegmentNo)}
+        .foreach(status => mergeStatus += status)
+    }
+    mergeStatus.toArray
+  }
+
+  /**
+   * Creates the CarbonLoadModel used for the global-sort load of the SI segment data files merge
+   */
+  def getLoadModelForGlobalSort(sparkSession: SparkSession,
+      carbonTable: CarbonTable): CarbonLoadModel = {
+    val conf = SparkSQLUtil.sessionState(sparkSession).newHadoopConf()
+    CarbonTableOutputFormat.setDatabaseName(conf, carbonTable.getDatabaseName)
+    CarbonTableOutputFormat.setTableName(conf, carbonTable.getTableName)
+    CarbonTableOutputFormat.setCarbonTable(conf, carbonTable)
+    val fieldList = carbonTable.getCreateOrderColumn
+      .asScala
+      .map { column =>
+        new StructField(column.getColName, column.getDataType)
+      }
+    CarbonTableOutputFormat.setInputSchema(conf, new StructType(fieldList.asJava))
+    val loadModel = CarbonTableOutputFormat.getLoadModel(conf)
+    loadModel
+  }
+
+  /**
+   * Gets the table block size from the SI table. If it is not set on the SI table, the main
+   * table is checked; if the main table does not set it either, the default block size is used.
+   */
+  def getTableBlockSizeInMb(indexTable: CarbonTable)(sparkSession: SparkSession): Long = {
+    var tableBlockSize: String = null
+    var tableProperties = indexTable.getTableInfo.getFactTable.getTableProperties
+    if (null != tableProperties) {
+      tableBlockSize = tableProperties.get(CarbonCommonConstants.TABLE_BLOCKSIZE)
+    }
+    if (null == tableBlockSize) {
+      val metaStore = CarbonEnv.getInstance(sparkSession)
+        .carbonMetaStore
+      val mainTable = metaStore
+        .lookupRelation(Some(indexTable.getDatabaseName),
+          CarbonIndexUtil.getParentTableName(indexTable))(sparkSession)
+        .asInstanceOf[CarbonRelation]
+        .carbonTable
+      tableProperties = mainTable.getTableInfo.getFactTable.getTableProperties
+      if (null != tableProperties) {
+        tableBlockSize = tableProperties.get(CarbonCommonConstants.TABLE_BLOCKSIZE)
+      }
+      if (null == tableBlockSize) {
+        tableBlockSize = CarbonCommonConstants.TABLE_BLOCK_SIZE_DEFAULT
+      }
+    }
+    tableBlockSize.toLong
+  }
+
+  /**
+   * Identifies the number of global sort partitions to use for the SI segment data file
+   * merge load. For example, if the block size is 1MB and there are 6 splits with sizes
+   * (100KB, 200KB, 300KB, 400KB, 500KB, 600KB), then
+   * (100KB + 200KB + 300KB + 400KB) >= 1MB gives the first partition and
+   * (500KB + 600KB) >= 1MB gives the second partition.
+   */
+  def identifyGlobalSortPartitions(splits: util.List[CarbonInputSplit], mergeSize: Long):
+  String = {
+    var partitions: Long = 0L
+    var totalSize: Long = 0L
+    // sort the splits based on the block size and then make groups based on the threshold
+    Collections.sort(splits, new Comparator[CarbonInputSplit]() {
+      def compare(split1: CarbonInputSplit, split2: CarbonInputSplit): Int = {
+        java.lang.Long.compare(split1.getLength, split2.getLength)
+      }
+    })
+    for (i <- 0 until splits.size()) {
+      val block = splits.get(i)
+      val blockFileSize = block.getLength
+      totalSize += blockFileSize
+      if (totalSize >= mergeSize) {
+        partitions = partitions + 1
+        totalSize = 0L
+      }
+    }
+    if (totalSize > 0) {
+      partitions = partitions + 1
+    }
+    partitions.toString
+  }
+
 }
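
For reference, the grouping performed by identifyGlobalSortPartitions can be reproduced on plain
sizes. The snippet below is a hypothetical, self-contained illustration (not part of the patch)
that applies the same accumulate-until-threshold rule to the split sizes from the comment above:

    object GlobalSortPartitionExample {
      // same rule as identifyGlobalSortPartitions: walk the sizes in ascending order and
      // start a new partition whenever the running total reaches the merge-size threshold
      def countPartitions(splitSizes: Seq[Long], mergeSize: Long): Long = {
        var partitions = 0L
        var totalSize = 0L
        for (size <- splitSizes.sorted) {
          totalSize += size
          if (totalSize >= mergeSize) {
            partitions += 1
            totalSize = 0L
          }
        }
        if (totalSize > 0) partitions += 1
        partitions
      }

      def main(args: Array[String]): Unit = {
        val kb = 1024L
        val sizes = Seq(100L, 200L, 300L, 400L, 500L, 600L).map(_ * kb)
        // with a 1 MB (1024 KB) merge size this yields 2 global sort partitions
        println(countPartitions(sizes, 1024 * kb))
      }
    }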