Posted to commits@carbondata.apache.org by ku...@apache.org on 2020/09/10 12:40:26 UTC

[carbondata] branch master updated: [CARBONDATA-3923] support global sort for SI

This is an automated email from the ASF dual-hosted git repository.

kunalkapoor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 91f7252  [CARBONDATA-3923] support global sort for SI
91f7252 is described below

commit 91f72520b0f96c6b0d641f812688352881b1dbde
Author: ajantha-bhat <aj...@gmail.com>
AuthorDate: Thu Jun 4 18:27:04 2020 +0530

    [CARBONDATA-3923] support global sort for SI
    
    Why is this PR needed?
    Secondary index tables are always created with local sort. Global sort
    should also be supported, as it can improve SI lookup performance.
    
    What changes were proposed in this PR?
    Support the sort_scope and global_sort_partitions properties in the
    create index command (see the usage sketch below).
    
    This closes #3787
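
    Usage sketch (a minimal example based on the test cases added in this
    commit; table and index names are illustrative):

        sql("create table maintable (name string, id string, country string) " +
            "stored as carbondata")
        sql("create index maintable_index on table maintable(id, country) as 'carbondata' " +
            "properties('sort_scope'='global_sort', 'global_sort_partitions'='3')")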
---
 docs/index/secondary-index-guide.md                |   3 +-
 .../secondaryindex/TestSIWithSecondryIndex.scala   |  58 +++++
 .../spark/load/DataLoadProcessBuilderOnSpark.scala |   8 +-
 .../carbondata/spark/rdd/CarbonScanRDD.scala       |  17 ++
 .../apache/carbondata/spark/util/CommonUtil.scala  |  21 ++
 .../strategy/CarbonLateDecodeStrategy.scala        |  57 ++++-
 .../apache/spark/sql/index/CarbonIndexUtil.scala   |  25 +-
 .../spark/sql/parser/CarbonSpark2SqlParser.scala   |   8 +-
 .../sql/secondaryindex/command/SILoadCommand.scala |  18 ++
 .../spark/sql/secondaryindex/load/Compactor.scala  |   7 +-
 .../secondaryindex/rdd/SecondaryIndexCreator.scala | 276 ++++++++++++++++-----
 .../org/apache/spark/util/AlterTableUtil.scala     |  23 +-
 12 files changed, 426 insertions(+), 95 deletions(-)

diff --git a/docs/index/secondary-index-guide.md b/docs/index/secondary-index-guide.md
index 503230c..e115260 100644
--- a/docs/index/secondary-index-guide.md
+++ b/docs/index/secondary-index-guide.md
@@ -84,7 +84,8 @@ EXPLAIN SELECT a from maintable where c = 'cd';
   'carbondata'
   PROPERTIES('table_blocksize'='1')
   ```
- 
+  **NOTE**:
+  * Supported properties are: table_blocksize, column_meta_cache, cache_level, carbon.column.compressor, sort_scope and global_sort_partitions.
  
 #### How SI tables are selected
 
diff --git a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithSecondryIndex.scala b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithSecondryIndex.scala
index 713047e..3986783 100644
--- a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithSecondryIndex.scala
+++ b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithSecondryIndex.scala
@@ -86,6 +86,64 @@ class TestSIWithSecondryIndex extends QueryTest with BeforeAndAfterAll {
       .contains("Alter table drop column operation failed:"))
   }
 
+  test("test create secondary index global sort after insert") {
+    sql("drop table if exists table1")
+    sql("create table table1 (name string, id string, country string) stored as carbondata")
+    sql("insert into table1 select 'xx', '2', 'china' union all select 'xx', '1', 'india'")
+    sql("create index table1_index on table table1(id, country) as 'carbondata' properties" +
+        "('sort_scope'='global_sort', 'Global_sort_partitions'='3')")
+    checkAnswerWithoutSort(sql("select id, country from table1_index"),
+      Seq(Row("1", "india"), Row("2", "china")))
+    // check for valid sort_scope
+    checkExistence(sql("describe formatted table1_index"), true, "Sort Scope global_sort")
+    // check for invalid sort_scope
+    assert(intercept[MalformedCarbonCommandException](sql(
+      "create index index_2 on table table1(id, country) as 'carbondata' properties" +
+      "('sort_scope'='tim_sort', 'Global_sort_partitions'='3')"))
+      .getMessage
+      .contains("Invalid SORT_SCOPE tim_sort"))
+    // check for invalid global_sort_partitions
+    assert(intercept[MalformedCarbonCommandException](sql(
+      "create index index_2 on table table1(id, country) as 'carbondata' properties" +
+      "('sort_scope'='global_sort', 'Global_sort_partitions'='-1')"))
+      .getMessage
+      .contains("Table property global_sort_partitions : -1 is invalid"))
+    sql("drop index table1_index on table1")
+    sql("drop table table1")
+  }
+
+  test("test create secondary index global sort before insert") {
+    sql("drop table if exists table1")
+    sql("create table table1 (name string, id string, country string) stored as carbondata")
+    sql("create index table1_index on table table1(id, country) as 'carbondata' properties" +
+        "('sort_scope'='global_sort', 'Global_sort_partitions'='3')")
+    sql("insert into table1 select 'xx', '2', 'china' union all select 'xx', '1', 'india'")
+    checkAnswerWithoutSort(sql("select id, country from table1_index"),
+      Seq(Row("1", "india"), Row("2", "china")))
+    // check for valid sort_scope
+    checkExistence(sql("describe formatted table1_index"), true, "Sort Scope global_sort")
+    sql("drop index table1_index on table1")
+    sql("drop table table1")
+  }
+
+  test("test array<string> and string as index columns on secondary index with global sort") {
+    sql("drop table if exists complextable")
+    sql(
+      "create table complextable (id string, country array<string>, name string) stored as " +
+      "carbondata")
+    sql("insert into complextable select 1, array('china', 'us'), 'b' union all select 2, array" +
+        "('pak', 'india', 'china'), 'v' ")
+    sql("drop index if exists complextable_index_1 on complextable")
+    sql("create index complextable_index_1 on table complextable(country, name) as 'carbondata' properties" +
+        "('sort_scope'='global_sort', 'Global_sort_partitions'='3')")
+    checkAnswerWithoutSort(sql("select country,name from complextable_index_1"),
+      Seq(Row("china", "b"), Row("china", "v"), Row("india", "v"), Row("pak", "v"), Row("us", "b")))
+    // check for valid sort_scope
+    checkExistence(sql("describe formatted complextable_index_1"), true, "Sort Scope global_sort")
+    sql("drop index complextable_index_1 on complextable")
+    sql("drop table complextable")
+  }
+
   test("Test secondry index data count") {
     checkAnswer(sql("select count(*) from si_altercolumn")
       ,Seq(Row(1)))
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
index eb6c82c..c4b8924 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
@@ -288,19 +288,19 @@ object DataLoadProcessBuilderOnSpark {
   private def updateLoadStatus(model: CarbonLoadModel, partialSuccessAccum: LongAccumulator
   ): Array[(String, (LoadMetadataDetails, ExecutionErrors))] = {
     // Update status
+    val loadMetadataDetails = new LoadMetadataDetails()
+    loadMetadataDetails.setLoadName(model.getSegmentId)
     if (partialSuccessAccum.value != 0) {
       val uniqueLoadStatusId = model.getTableName + CarbonCommonConstants.UNDERSCORE +
         "Partial_Success"
-      val loadMetadataDetails = new LoadMetadataDetails()
       loadMetadataDetails.setSegmentStatus(SegmentStatus.LOAD_PARTIAL_SUCCESS)
-      val executionErrors = new ExecutionErrors(FailureCauses.NONE, "")
+      val executionErrors = ExecutionErrors(FailureCauses.NONE, "")
       executionErrors.failureCauses = FailureCauses.BAD_RECORDS
       Array((uniqueLoadStatusId, (loadMetadataDetails, executionErrors)))
     } else {
       val uniqueLoadStatusId = model.getTableName + CarbonCommonConstants.UNDERSCORE + "Success"
-      val loadMetadataDetails = new LoadMetadataDetails()
       loadMetadataDetails.setSegmentStatus(SegmentStatus.SUCCESS)
-      val executionErrors = new ExecutionErrors(FailureCauses.NONE, "")
+      val executionErrors = ExecutionErrors(FailureCauses.NONE, "")
       Array((uniqueLoadStatusId, (loadMetadataDetails, executionErrors)))
     }
   }
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
index 5c60640..2669a5f 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
@@ -95,6 +95,11 @@ class CarbonScanRDD[T: ClassTag](
 
   private var readCommittedScope: ReadCommittedScope = _
 
+  // By default, the segments to access are validated before running the query.
+  // When set to false, segment validation is skipped and the query runs directly
+  // on the configured segments (used by the SI global sort flow).
+  private var validateSegmentToAccess: Boolean = true
+
   @transient val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
 
   override def internalGetPartitions: Array[Partition] = {
@@ -123,6 +128,14 @@ class CarbonScanRDD[T: ClassTag](
       if (null != segmentsToAccess) {
         CarbonInputFormat
           .setSegmentsToAccess(job.getConfiguration, segmentsToAccess.toList.asJava)
+        // The input segments obtained from the main table are already set, so there is no need to validate.
+        CarbonInputFormat.setValidateSegmentsToAccess(job.getConfiguration, false)
+      } else {
+        if (!validateSegmentToAccess) {
+          // set to false for the SI global sort flow
+          CarbonInputFormat
+            .setValidateSegmentsToAccess(job.getConfiguration, validateSegmentToAccess)
+        }
       }
       // get splits
       getSplitsStartTime = System.currentTimeMillis()
@@ -784,4 +797,8 @@ class CarbonScanRDD[T: ClassTag](
   def setReadCommittedScope(readCommittedScope: ReadCommittedScope): Unit = {
     this.readCommittedScope = readCommittedScope
   }
+
+  def setValidateSegmentToAccess(needValidate: Boolean): Unit = {
+    validateSegmentToAccess = needValidate
+  }
 }
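
The validateSegmentToAccess flag above is consumed by the SI global-sort load path in
SecondaryIndexCreator further below. A minimal, hypothetical caller-side sketch (the
scanRdd value is assumed to come from an already-built query plan):

    // the segments to access were already set explicitly, so skip validation on the scan RDD
    scanRdd.setValidateSegmentToAccess(false)
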
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
index 695f607..1f1cd32 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
@@ -25,6 +25,7 @@ import java.util.UUID
 import java.util.regex.{Matcher, Pattern}
 
 import scala.collection.JavaConverters._
+import scala.collection.mutable
 import scala.collection.mutable.Map
 import scala.math.BigDecimal.RoundingMode
 
@@ -801,6 +802,26 @@ object CommonUtil {
     }
   }
 
+  def validateGlobalSortPartitions(propertiesMap: mutable.Map[String, String]): Unit = {
+    if (propertiesMap.contains("global_sort_partitions")) {
+      val globalSortPartitionsProp = propertiesMap("global_sort_partitions")
+      var pass = false
+      try {
+        val globalSortPartitions = Integer.parseInt(globalSortPartitionsProp)
+        if (globalSortPartitions > 0) {
+          pass = true
+        }
+      } catch {
+        case _ =>
+      }
+      if (!pass) {
+        throw new MalformedCarbonCommandException(
+          s"Table property global_sort_partitions : ${ globalSortPartitionsProp }" +
+          s" is invalid")
+      }
+    }
+  }
+
   def validateSortColumns(
       sortKey: Array[String],
       fields: Seq[(String, String)],
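
A quick sketch of how the new validateGlobalSortPartitions check behaves (property values
taken from the test cases added in this commit):

    import scala.collection.mutable

    // a positive integer value passes validation
    CommonUtil.validateGlobalSortPartitions(mutable.Map("global_sort_partitions" -> "3"))

    // non-positive or non-numeric values throw MalformedCarbonCommandException,
    // e.g. "Table property global_sort_partitions : -1 is invalid"
    CommonUtil.validateGlobalSortPartitions(mutable.Map("global_sort_partitions" -> "-1"))
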
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
index b3061d2..a53cf50 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
@@ -81,6 +81,21 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
   def apply(plan: LogicalPlan): Seq[SparkPlan] = {
     val transformedPlan = makeDeterministic(plan)
     transformedPlan match {
+      case GlobalLimit(IntegerLiteral(limit),
+      LocalLimit(IntegerLiteral(limitValue),
+      _@PhysicalOperation(projects, filters, l: LogicalRelation))) if l.relation
+        .isInstanceOf[CarbonDatasourceHadoopRelation] =>
+        val relation = l.relation.asInstanceOf[CarbonDatasourceHadoopRelation]
+        GlobalLimitExec(limit, LocalLimitExec(limitValue, pruneFilterProject(
+          l,
+          projects.filterNot(_.name.equalsIgnoreCase(CarbonCommonConstants.POSITION_ID)),
+          filters,
+          (a, f, p) =>
+            setVectorReadSupport(
+              l,
+              a,
+              relation.buildScan(a.map(_.name).toArray, filters, projects, f, p))
+        ))) :: Nil
       case PhysicalOperation(projects, filters, l: LogicalRelation)
         if l.relation.isInstanceOf[CarbonDatasourceHadoopRelation] =>
         val relation = l.relation.asInstanceOf[CarbonDatasourceHadoopRelation]
@@ -89,7 +104,7 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
         try {
           pruneFilterProject(
             l,
-            projects.filterNot(_.name.equalsIgnoreCase(CarbonCommonConstants.POSITION_ID)),
+            projects,
             filters,
             (a, f, p) =>
               setVectorReadSupport(
@@ -597,8 +612,44 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
       handledSet: AttributeSet,
       newProjectList: Seq[Attribute],
       updatedProjects: Seq[Expression]): (Seq[Attribute], Seq[Expression]) = {
-    ((projectsAttr.to[mutable.LinkedHashSet] ++ filterSet -- handledSet)
-       .map(relation.attributeMap).toSeq ++ newProjectList, updatedProjects)
+    val sparkSession = SparkSession.getActiveSession.get
+    val pushDownJoinEnabled = sparkSession.sparkContext.getConf
+      .getBoolean("spark.carbon.pushdown.join.as.filter", defaultValue = true)
+
+    // The positionId column can be added in two cases:
+    // case 1: SI pushdown, where the rewritten SI plan adds the positionId column
+    // case 2: the user explicitly requested the positionId column through the getPositionId() UDF
+    // The positionId column should be removed only in case 1, as it is added internally.
+    // The code below handles case 2; the getPositionId() UDF is used almost exclusively for testing.
+    val isPositionIDRequested = relation.catalogTable match {
+      case Some(table) =>
+        val tblProperties = CarbonEnv.getCarbonTable(table.identifier)(sparkSession).getTableInfo
+          .getFactTable
+          .getTableProperties
+        val isPosIDRequested = if (tblProperties.containsKey("isPositionIDRequested")) {
+          val flag = java.lang.Boolean.parseBoolean(tblProperties.get("isPositionIDRequested"))
+          tblProperties.remove("isPositionIDRequested")
+          flag
+        } else {
+          false
+        }
+        isPosIDRequested
+      case _ => false
+    }
+    // remove positionId col only if pushdown is enabled and
+    // positionId col is not requested in the query
+    if (pushDownJoinEnabled && !isPositionIDRequested) {
+      ((projectsAttr.to[scala.collection.mutable.LinkedHashSet] ++ filterSet -- handledSet)
+         .map(relation.attributeMap).toSeq ++ newProjectList
+         .filterNot(attr => attr.name
+           .equalsIgnoreCase(CarbonCommonConstants.POSITION_ID)), updatedProjects
+        .filterNot(attr => attr.isInstanceOf[AttributeReference] &&
+                           attr.asInstanceOf[AttributeReference].name
+                             .equalsIgnoreCase(CarbonCommonConstants.POSITION_ID)))
+    } else {
+      ((projectsAttr.to[scala.collection.mutable.LinkedHashSet] ++ filterSet -- handledSet)
+         .map(relation.attributeMap).toSeq ++ newProjectList, updatedProjects)
+    }
   }
 
   private def getDataSourceScan(relation: LogicalRelation,
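
Case 2 in the comment above corresponds to a query that projects the position id explicitly
through the getPositionId() UDF, which is used almost only in tests. A hypothetical example
(table name is illustrative):

    // explicit positionId projection (case 2): the column must not be pruned away
    sql("select getPositionId(), id from maintable").show()
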
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/index/CarbonIndexUtil.scala b/integration/spark/src/main/scala/org/apache/spark/sql/index/CarbonIndexUtil.scala
index 0592808..c7f61e4 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/index/CarbonIndexUtil.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/index/CarbonIndexUtil.scala
@@ -38,16 +38,17 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.compression.CompressorFactory
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, ICarbonLock, LockUsage}
-import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
 import org.apache.carbondata.core.metadata.index.IndexType
 import org.apache.carbondata.core.metadata.schema.indextable.IndexMetadata
 import org.apache.carbondata.core.metadata.schema.indextable.IndexTableInfo
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.mutate.CarbonUpdateUtil
 import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, SegmentStatusManager}
+import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.core.util.path.CarbonTablePath
-import org.apache.carbondata.format.TableInfo
+import org.apache.carbondata.processing.loading.constants.DataLoadProcessorConstants
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel
+import org.apache.carbondata.processing.util.TableOptionConstant
 
 /**
  * Carbon Index util
@@ -297,6 +298,8 @@ object CarbonIndexUtil {
       segmentIdToLoadStartTimeMapping = scala.collection.mutable
         .Map((carbonLoadModel.getSegmentId, carbonLoadModel.getFactTimeStamp))
     }
+    val header = indexTable.getCreateOrderColumn.asScala.map(_.getColName).toArray
+    initializeSILoadModel(carbonLoadModel, header)
     val secondaryIndexModel = if (isLoadToFailedSISegments) {
       SecondaryIndexModel(
         sparkSession.sqlContext,
@@ -587,4 +590,22 @@ object CarbonIndexUtil {
     })
   }
 
+  def initializeSILoadModel(carbonLoadModel: CarbonLoadModel,
+      header: Array[String]): Unit = {
+    carbonLoadModel.setSerializationNullFormat(
+      TableOptionConstant.SERIALIZATION_NULL_FORMAT.getName + ",\\N")
+    carbonLoadModel.setBadRecordsLoggerEnable(
+      TableOptionConstant.BAD_RECORDS_LOGGER_ENABLE.getName + ",false")
+    carbonLoadModel.setBadRecordsAction(
+      TableOptionConstant.BAD_RECORDS_ACTION.getName + ",force")
+    carbonLoadModel.setIsEmptyDataBadRecord(
+      DataLoadProcessorConstants.IS_EMPTY_DATA_BAD_RECORD + ",false")
+    carbonLoadModel.setTimestampFormat(CarbonProperties.getInstance()
+      .getProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
+        CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT))
+    carbonLoadModel.setDateFormat(CarbonProperties.getInstance()
+      .getProperty(CarbonCommonConstants.CARBON_DATE_FORMAT,
+        CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT))
+    carbonLoadModel.setCsvHeaderColumns(header)
+  }
 }
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala b/integration/spark/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
index 855b989..feb5775 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
@@ -191,6 +191,10 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
             table.database, indexName.toLowerCase, tableColumns, properties)
           CarbonSparkSqlParserUtil.validateColumnCompressorProperty(
             properties.getOrElse(CarbonCommonConstants.COMPRESSOR, null))
+          // validate sort scope
+          CommonUtil.validateSortScope(properties)
+          // validate global_sort_partitions
+          CommonUtil.validateGlobalSortPartitions(properties)
           CarbonCreateSecondaryIndexCommand(
             indexModel,
             properties,
@@ -640,7 +644,9 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
     val supportedPropertiesForIndexTable = Seq("TABLE_BLOCKSIZE",
       "COLUMN_META_CACHE",
       "CACHE_LEVEL",
-      CarbonCommonConstants.COMPRESSOR.toUpperCase)
+      CarbonCommonConstants.COMPRESSOR.toUpperCase,
+      "SORT_SCOPE",
+      "GLOBAL_SORT_PARTITIONS")
     tableProperties.foreach { property =>
       if (!supportedPropertiesForIndexTable.contains(property._1.toUpperCase)) {
         val errorMessage = "Unsupported Table property in index creation: " + property._1.toString
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/command/SILoadCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/command/SILoadCommand.scala
index be36c8e..361e915 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/command/SILoadCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/command/SILoadCommand.scala
@@ -22,6 +22,7 @@ import scala.collection.JavaConverters._
 import org.apache.spark.sql.{CarbonEnv, Row, SparkSession, SQLContext}
 import org.apache.spark.sql.execution.command.RunnableCommand
 import org.apache.spark.sql.hive.CarbonRelation
+import org.apache.spark.sql.index.CarbonIndexUtil
 import org.apache.spark.sql.secondaryindex.load.CarbonInternalLoaderUtil
 import org.apache.spark.sql.secondaryindex.rdd.SecondaryIndexCreator
 
@@ -71,6 +72,18 @@ private[sql] case class LoadDataForSecondaryIndex(indexModel: IndexModel) extend
       carbonLoadModel.setTableName(relation.carbonTable.getTableName)
       carbonLoadModel.setDatabaseName(relation.carbonTable.getDatabaseName)
       carbonLoadModel.setTablePath(relation.carbonTable.getTablePath)
+      val sortScope = relation.carbonTable.getTableInfo.getFactTable
+        .getTableProperties
+        .get(CarbonCommonConstants.SORT_SCOPE)
+      if (sortScope != null) {
+        carbonLoadModel.setSortScope(sortScope)
+      }
+      val globalSortPartitions = relation.carbonTable.getTableInfo.getFactTable
+        .getTableProperties
+        .get("global_sort_partitions")
+      if (globalSortPartitions != null) {
+        carbonLoadModel.setGlobalSortPartitions(globalSortPartitions)
+      }
       var columnCompressor: String = relation.carbonTable.getTableInfo.getFactTable
         .getTableProperties
         .get(CarbonCommonConstants.COMPRESSOR)
@@ -78,6 +91,11 @@ private[sql] case class LoadDataForSecondaryIndex(indexModel: IndexModel) extend
         columnCompressor = CompressorFactory.getInstance.getCompressor.getName
       }
       carbonLoadModel.setColumnCompressor(columnCompressor)
+      val indexCarbonTable = CarbonEnv.getCarbonTable(Some(carbonLoadModel.getDatabaseName),
+        indexModel.indexName)(sparkSession)
+      val header = indexCarbonTable.getCreateOrderColumn.asScala
+        .map(_.getColName).toArray
+      CarbonIndexUtil.initializeSILoadModel(carbonLoadModel, header)
       createSecondaryIndex(sparkSession, indexModel, carbonLoadModel)
     } catch {
       case ex: Exception =>
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/load/Compactor.scala b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/load/Compactor.scala
index 7f4ae00..d0bdbc0 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/load/Compactor.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/load/Compactor.scala
@@ -22,7 +22,7 @@ import scala.collection.JavaConverters._
 import scala.collection.mutable.ListBuffer
 
 import org.apache.spark.rdd.CarbonMergeFilesRDD
-import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.{CarbonEnv, SQLContext}
 import org.apache.spark.sql.index.CarbonIndexUtil
 import org.apache.spark.sql.secondaryindex.command.{IndexModel, SecondaryIndexModel}
 import org.apache.spark.sql.secondaryindex.events.LoadTableSIPostExecutionEvent
@@ -74,6 +74,11 @@ object Compactor {
         carbonLoadModel.getTableName,
         indexColumns,
         index.getKey)
+      val indexCarbonTable = CarbonEnv.getCarbonTable(Some(carbonLoadModel.getDatabaseName),
+        index.getKey)(sqlContext.sparkSession)
+      val header = indexCarbonTable.getCreateOrderColumn.asScala
+        .map(_.getColName).toArray
+      CarbonIndexUtil.initializeSILoadModel(carbonLoadModel, header)
       val secondaryIndexModel = SecondaryIndexModel(sqlContext,
         carbonLoadModel,
         carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable,
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/SecondaryIndexCreator.scala b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/SecondaryIndexCreator.scala
index cc0bdd9..a6f3120 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/SecondaryIndexCreator.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/SecondaryIndexCreator.scala
@@ -23,9 +23,12 @@ import java.util.concurrent.Callable
 import scala.collection.JavaConverters._
 import scala.collection.mutable.ListBuffer
 
-import org.apache.spark.rdd.CarbonMergeFilesRDD
-import org.apache.spark.sql.{CarbonEnv, SQLContext}
-import org.apache.spark.sql.hive.CarbonRelation
+import org.apache.spark.rdd.{CarbonMergeFilesRDD, RDD}
+import org.apache.spark.sql.{functions, CarbonEnv, CarbonUtils, DataFrame, SparkSession, SQLContext}
+import org.apache.spark.sql.catalyst.analysis.{UnresolvedAlias, UnresolvedFunction}
+import org.apache.spark.sql.catalyst.expressions.Alias
+import org.apache.spark.sql.catalyst.plans.logical.Project
+import org.apache.spark.sql.execution.command.ExecutionErrors
 import org.apache.spark.sql.index.CarbonIndexUtil
 import org.apache.spark.sql.secondaryindex.command.SecondaryIndexModel
 import org.apache.spark.sql.secondaryindex.events.{LoadTableSIPostExecutionEvent, LoadTableSIPreExecutionEvent}
@@ -38,14 +41,17 @@ import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.locks.{CarbonLockFactory, ICarbonLock, LockUsage}
 import org.apache.carbondata.core.metadata.{CarbonTableIdentifier, SegmentFileStore}
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager}
+import org.apache.carbondata.core.segmentmeta.SegmentMetaDataInfo
+import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, SegmentStatusManager}
 import org.apache.carbondata.core.util.{CarbonProperties, ThreadLocalSessionInfo}
 import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.events.{OperationContext, OperationListenerBus}
+import org.apache.carbondata.hadoop.api.CarbonTableInputFormat
 import org.apache.carbondata.indexserver.DistributedRDDUtils
-import org.apache.carbondata.processing.loading.TableProcessingOperations
-import org.apache.carbondata.processing.loading.model.CarbonLoadModel
+import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
 import org.apache.carbondata.processing.util.CarbonLoaderUtil
+import org.apache.carbondata.spark.load.DataLoadProcessBuilderOnSpark
+import org.apache.carbondata.spark.rdd.CarbonScanRDD
 
 /**
  * This class is aimed at creating secondary index for specified segments
@@ -152,68 +158,181 @@ object SecondaryIndexCreator {
           LOGGER.info("spark.dynamicAllocation.maxExecutors property is set to =" + execInstance)
         }
       }
-      var futureObjectList = List[java.util.concurrent.Future[Array[(String, Boolean)]]]()
-      for (eachSegment <- validSegmentList) {
-        val segId = eachSegment
-        futureObjectList :+= executorService.submit(new Callable[Array[(String, Boolean)]] {
-          @throws(classOf[Exception])
-          override def call(): Array[(String, Boolean)] = {
-            ThreadLocalSessionInfo.getOrCreateCarbonSessionInfo().getNonSerializableExtraInfo
-              .put("carbonConf", SparkSQLUtil.sessionState(sc.sparkSession).newHadoopConf())
-            var eachSegmentSecondaryIndexCreationStatus: Array[(String, Boolean)] = Array.empty
-            CarbonLoaderUtil.checkAndCreateCarbonDataLocation(segId, indexCarbonTable)
-            val carbonLoadModel = getCopyObject(secondaryIndexModel)
-            carbonLoadModel
-              .setFactTimeStamp(secondaryIndexModel.segmentIdToLoadStartTimeMapping(eachSegment))
-            carbonLoadModel.setTablePath(secondaryIndexModel.carbonTable.getTablePath)
-            val secondaryIndexCreationStatus = new CarbonSecondaryIndexRDD(sc.sparkSession,
-              new SecondaryIndexCreationResultImpl,
-              carbonLoadModel,
-              secondaryIndexModel.secondaryIndex,
-              segId, execInstance, indexCarbonTable, forceAccessSegment, isCompactionCall).collect()
+      var successSISegments: List[String] = List()
+      var failedSISegments: List[String] = List()
+      val sort_scope = indexCarbonTable.getTableInfo.getFactTable.getTableProperties
+        .get("sort_scope")
+      if (sort_scope != null && sort_scope.equalsIgnoreCase("global_sort")) {
+        val mainTable = secondaryIndexModel.carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
+        var futureObjectList = List[java.util.concurrent.Future[Array[(String,
+          (LoadMetadataDetails, ExecutionErrors))]]]()
+        for (eachSegment <- validSegmentList) {
+          futureObjectList :+= executorService
+            .submit(new Callable[Array[(String, (LoadMetadataDetails, ExecutionErrors))]] {
+              @throws(classOf[Exception])
+              override def call(): Array[(String, (LoadMetadataDetails, ExecutionErrors))] = {
+                val carbonLoadModel = getCopyObject(secondaryIndexModel)
+                // to load as a global sort SI segment, we need to query the
+                // main table along with the position reference projection
+                val projections = indexCarbonTable.getCreateOrderColumn
+                  .asScala
+                  .map(_.getColName)
+                  .filterNot(_.equalsIgnoreCase(CarbonCommonConstants.POSITION_REFERENCE)).toSet
+                val explodeColumn = mainTable.getCreateOrderColumn.asScala
+                  .filter(x => x.getDataType.isComplexType &&
+                               projections.contains(x.getColName))
+                var dataFrame = dataFrameOfSegments(sc.sparkSession,
+                  mainTable,
+                  projections.mkString(","),
+                  Array(eachSegment))
+                // flatten the complex SI
+                if (explodeColumn.nonEmpty) {
+                  val columns = dataFrame.schema.map { x =>
+                    if (x.name.equals(explodeColumn.head.getColName)) {
+                      functions.explode_outer(functions.col(x.name))
+                    } else {
+                      functions.col(x.name)
+                    }
+                  }
+                  dataFrame = dataFrame.select(columns: _*)
+                }
+                val dataLoadSchema = new CarbonDataLoadSchema(indexCarbonTable)
+                carbonLoadModel.setCarbonDataLoadSchema(dataLoadSchema)
+                carbonLoadModel.setTableName(indexCarbonTable.getTableName)
+                carbonLoadModel.setDatabaseName(indexCarbonTable.getDatabaseName)
+                carbonLoadModel.setTablePath(indexCarbonTable.getTablePath)
+                carbonLoadModel.setFactTimeStamp(secondaryIndexModel
+                  .segmentIdToLoadStartTimeMapping(eachSegment))
+                carbonLoadModel.setSegmentId(eachSegment)
+                var result: Array[(String, (LoadMetadataDetails, ExecutionErrors))] = null
+                try {
+                  val configuration = FileFactory.getConfiguration
+                  configuration.set(CarbonTableInputFormat.INPUT_SEGMENT_NUMBERS, eachSegment)
+                  def findCarbonScanRDD(rdd: RDD[_]): Unit = {
+                    rdd match {
+                      case carbonScanRDD: CarbonScanRDD[_] =>
+                        carbonScanRDD.setValidateSegmentToAccess(false)
+                      case others =>
+                        others.dependencies.foreach {x => findCarbonScanRDD(x.rdd)}
+                    }
+                  }
+                  findCarbonScanRDD(dataFrame.rdd)
+                  // accumulator to collect segment metadata
+                  val segmentMetaDataAccumulator = sc.sparkSession.sqlContext
+                    .sparkContext
+                    .collectionAccumulator[Map[String, SegmentMetaDataInfo]]
+                  // TODO: use the new insert into flow; prepare RDD[InternalRow] instead of a DataFrame
+                  result = DataLoadProcessBuilderOnSpark.loadDataUsingGlobalSort(
+                    sc.sparkSession,
+                    Some(dataFrame),
+                    carbonLoadModel,
+                    hadoopConf = configuration, segmentMetaDataAccumulator)
+                }
+                SegmentFileStore
+                  .writeSegmentFile(indexCarbonTable,
+                    eachSegment,
+                    String.valueOf(carbonLoadModel.getFactTimeStamp))
+                segmentToLoadStartTimeMap
+                  .put(eachSegment, String.valueOf(carbonLoadModel.getFactTimeStamp))
+                result
+              }
+            })
+        }
+        val segmentSecondaryIndexCreationStatus = futureObjectList.filter(_.get().length > 0)
+          .groupBy(a => a.get().head._2._1.getSegmentStatus)
+        val hasSuccessSegments =
+          segmentSecondaryIndexCreationStatus.contains(SegmentStatus.LOAD_PARTIAL_SUCCESS) ||
+          segmentSecondaryIndexCreationStatus.contains(SegmentStatus.SUCCESS)
+        val hasFailedSegments = segmentSecondaryIndexCreationStatus
+          .contains(SegmentStatus.MARKED_FOR_DELETE)
+        if (hasSuccessSegments) {
+          successSISegments =
+            segmentSecondaryIndexCreationStatus(SegmentStatus.SUCCESS).collect {
+              case segments: java.util.concurrent.Future[Array[(String, (LoadMetadataDetails,
+                ExecutionErrors))]] =>
+                segments.get().head._2._1.getLoadName
+            }
+        }
+        if (hasFailedSegments) {
+          // If the call is from compaction, we need to fail the main table compaction as well.
+          // If the load is called from SILoadEventListener, which handles the corresponding main
+          // table segment, then a failed SI load must also fail the main table load, so throw an
+          // exception.
+          // If the load is called from SI creation or SILoadEventListenerForFailedSegments, there
+          // is no need to fail; just mark the segment as marked for delete, so that the next load
+          // to the main table will take care of it.
+          if (isCompactionCall || !isLoadToFailedSISegments) {
+            throw new Exception("Secondary index creation failed")
+          } else {
+            failedSISegments =
+              segmentSecondaryIndexCreationStatus(SegmentStatus.MARKED_FOR_DELETE).collect {
+                case segments: java.util.concurrent.Future[Array[(String, (LoadMetadataDetails,
+                  ExecutionErrors))]] =>
+                  segments.get().head._1
+              }
+          }
+        }
+      } else {
+        var futureObjectList = List[java.util.concurrent.Future[Array[(String, Boolean)]]]()
+        for (eachSegment <- validSegmentList) {
+          val segId = eachSegment
+          futureObjectList :+= executorService.submit(new Callable[Array[(String, Boolean)]] {
+            @throws(classOf[Exception])
+            override def call(): Array[(String, Boolean)] = {
+              ThreadLocalSessionInfo.getOrCreateCarbonSessionInfo().getNonSerializableExtraInfo
+                .put("carbonConf", SparkSQLUtil.sessionState(sc.sparkSession).newHadoopConf())
+              var eachSegmentSecondaryIndexCreationStatus: Array[(String, Boolean)] = Array.empty
+              CarbonLoaderUtil.checkAndCreateCarbonDataLocation(segId, indexCarbonTable)
+              val carbonLoadModel = getCopyObject(secondaryIndexModel)
+              carbonLoadModel
+                .setFactTimeStamp(secondaryIndexModel.segmentIdToLoadStartTimeMapping(eachSegment))
+              carbonLoadModel.setTablePath(secondaryIndexModel.carbonTable.getTablePath)
+              val secondaryIndexCreationStatus = new CarbonSecondaryIndexRDD(sc.sparkSession,
+                new SecondaryIndexCreationResultImpl,
+                carbonLoadModel,
+                secondaryIndexModel.secondaryIndex,
+                segId, execInstance, indexCarbonTable, forceAccessSegment).collect()
               SegmentFileStore
                 .writeSegmentFile(indexCarbonTable,
                   segId,
                   String.valueOf(carbonLoadModel.getFactTimeStamp))
-            segmentToLoadStartTimeMap.put(segId, carbonLoadModel.getFactTimeStamp.toString)
-            if (secondaryIndexCreationStatus.length > 0) {
-              eachSegmentSecondaryIndexCreationStatus = secondaryIndexCreationStatus
+              segmentToLoadStartTimeMap.put(segId, carbonLoadModel.getFactTimeStamp.toString)
+              if (secondaryIndexCreationStatus.length > 0) {
+                eachSegmentSecondaryIndexCreationStatus = secondaryIndexCreationStatus
+              }
+              eachSegmentSecondaryIndexCreationStatus
             }
-            eachSegmentSecondaryIndexCreationStatus
-          }
-        })
-      }
-
-      val segmentSecondaryIndexCreationStatus = futureObjectList.filter(_.get().length > 0)
-        .groupBy(a => a.get().head._2)
-      val hasSuccessSegments = segmentSecondaryIndexCreationStatus.contains("true".toBoolean)
-      val hasFailedSegments = segmentSecondaryIndexCreationStatus.contains("false".toBoolean)
-      var successSISegments: List[String] = List()
-      var failedSISegments: List[String] = List()
-      if (hasSuccessSegments) {
-        successSISegments =
-          segmentSecondaryIndexCreationStatus("true".toBoolean).collect {
-            case segments: java.util.concurrent.Future[Array[(String, Boolean)]] =>
-              segments.get().head._1
-          }
-      }
-
-      if (hasFailedSegments) {
-        // if the call is from compaction, we need to fail the main table compaction also, and if
-        // the load is called from SILoadEventListener, which is for corresponding main table
-        // segment, then if SI load fails, we need to fail main table load also, so throw exception,
-        // if load is called from SI creation or SILoadEventListenerForFailedSegments, no need to
-        // fail, just make the segment as marked for delete, so that next load to main table will
-        // take care
-        if (isCompactionCall || !isLoadToFailedSISegments) {
-          throw new Exception("Secondary index creation failed")
-        } else {
-          failedSISegments =
-            segmentSecondaryIndexCreationStatus("false".toBoolean).collect {
+          })
+        }
+        val segmentSecondaryIndexCreationStatus = futureObjectList.filter(_.get().length > 0)
+          .groupBy(a => a.get().head._2)
+        val hasSuccessSegments = segmentSecondaryIndexCreationStatus.contains("true".toBoolean)
+        val hasFailedSegments = segmentSecondaryIndexCreationStatus.contains("false".toBoolean)
+        if (hasSuccessSegments) {
+          successSISegments =
+            segmentSecondaryIndexCreationStatus("true".toBoolean).collect {
               case segments: java.util.concurrent.Future[Array[(String, Boolean)]] =>
                 segments.get().head._1
             }
         }
+        if (hasFailedSegments) {
+          // If the call is from compaction, we need to fail the main table compaction as well.
+          // If the load is called from SILoadEventListener, which handles the corresponding main
+          // table segment, then a failed SI load must also fail the main table load, so throw an
+          // exception.
+          // If the load is called from SI creation or SILoadEventListenerForFailedSegments, there
+          // is no need to fail; just mark the segment as marked for delete, so that the next load
+          // to the main table will take care of it.
+          if (isCompactionCall || !isLoadToFailedSISegments) {
+            throw new Exception("Secondary index creation failed")
+          } else {
+            failedSISegments =
+              segmentSecondaryIndexCreationStatus("false".toBoolean).collect {
+                case segments: java.util.concurrent.Future[Array[(String, Boolean)]] =>
+                  segments.get().head._1
+              }
+          }
+        }
       }
       // what and all segments the load failed, only for those need make status as marked
       // for delete, remaining let them be SUCCESS
@@ -379,15 +498,20 @@ object SecondaryIndexCreator {
     copyObj.setDatabaseName(carbonLoadModel.getDatabaseName)
     copyObj.setLoadMetadataDetails(carbonLoadModel.getLoadMetadataDetails)
     copyObj.setCarbonDataLoadSchema(carbonLoadModel.getCarbonDataLoadSchema)
-
+    copyObj.setSerializationNullFormat(carbonLoadModel.getSerializationNullFormat)
+    copyObj.setBadRecordsLoggerEnable(carbonLoadModel.getBadRecordsLoggerEnable)
+    copyObj.setBadRecordsAction(carbonLoadModel.getBadRecordsAction)
+    copyObj.setIsEmptyDataBadRecord(carbonLoadModel.getIsEmptyDataBadRecord)
     val indexTable = CarbonEnv.getCarbonTable(
       Some(carbonLoadModel.getDatabaseName),
       secondaryIndexModel.secondaryIndex.indexName)(secondaryIndexModel.sqlContext.sparkSession)
-
+    copyObj.setCsvHeaderColumns(carbonLoadModel.getCsvHeaderColumns)
     copyObj.setColumnCompressor(
       CarbonIndexUtil.getCompressorForIndexTable(
         indexTable, carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable))
     copyObj.setFactTimeStamp(carbonLoadModel.getFactTimeStamp)
+    copyObj.setTimestampFormat(carbonLoadModel.getTimestampFormat)
+    copyObj.setDateFormat(carbonLoadModel.getDateFormat)
     copyObj
   }
 
@@ -428,4 +552,34 @@ object SecondaryIndexCreator {
     }
     threadPoolSize
   }
+
+  def dataFrameOfSegments(
+      sparkSession: SparkSession,
+      carbonTable: CarbonTable,
+      projections: String,
+      segments: Array[String]): DataFrame = {
+    try {
+      CarbonUtils.threadSet(
+        CarbonCommonConstants.CARBON_INPUT_SEGMENTS + carbonTable.getDatabaseName +
+        CarbonCommonConstants.POINT + carbonTable.getTableName, segments.mkString(","))
+      val logicalPlan = sparkSession.sql(
+        s"select $projections from ${ carbonTable.getDatabaseName }.${
+          carbonTable.getTableName}").queryExecution.logical
+      val positionId = UnresolvedAlias(Alias(UnresolvedFunction("getPositionId",
+        Seq.empty, isDistinct = false), CarbonCommonConstants.POSITION_ID)())
+      val newLogicalPlan = logicalPlan.transform {
+        case p: Project =>
+          Project(p.projectList :+ positionId, p.child)
+      }
+      carbonTable.getTableInfo
+        .getFactTable
+        .getTableProperties.put("isPositionIDRequested", "true")
+      SparkSQLUtil.execute(newLogicalPlan, sparkSession)
+    } finally {
+      CarbonUtils
+        .threadUnset(CarbonCommonConstants.CARBON_INPUT_SEGMENTS +
+                     carbonTable.getDatabaseName + CarbonCommonConstants.POINT +
+                     carbonTable.getTableName)
+    }
+  }
 }
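
Condensed sketch of the per-segment global-sort SI load added above (names are taken from
this commit; error handling and the complex-column explode step are omitted):

    // query the main table segment with the index columns plus the positionId projection
    val df = dataFrameOfSegments(sparkSession, mainTable,
      projections.mkString(","), Array(segmentId))

    // the input segment is fixed explicitly, so segment validation is disabled on the
    // underlying CarbonScanRDD (findCarbonScanRDD walks df.rdd dependencies)
    findCarbonScanRDD(df.rdd)

    // write the SI segment through the common global sort load path
    val result = DataLoadProcessBuilderOnSpark.loadDataUsingGlobalSort(
      sparkSession, Some(df), carbonLoadModel,
      hadoopConf = configuration, segmentMetaDataAccumulator)
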
diff --git a/integration/spark/src/main/scala/org/apache/spark/util/AlterTableUtil.scala b/integration/spark/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
index 9f29382..be4e424 100644
--- a/integration/spark/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
@@ -414,7 +414,7 @@ object AlterTableUtil {
       // validate the range column properties
       validateRangeColumnProperties(carbonTable, lowerCasePropertiesMap)
 
-      validateGlobalSortPartitions(carbonTable, lowerCasePropertiesMap)
+      CommonUtil.validateGlobalSortPartitions(lowerCasePropertiesMap)
 
       // validate the Sort Scope and Sort Columns
       validateSortScopeAndSortColumnsProperties(carbonTable,
@@ -629,27 +629,6 @@ object AlterTableUtil {
     }
   }
 
-  def validateGlobalSortPartitions(carbonTable: CarbonTable,
-      propertiesMap: mutable.Map[String, String]): Unit = {
-    if (propertiesMap.get("global_sort_partitions").isDefined) {
-      val globalSortPartitionsProp = propertiesMap.get("global_sort_partitions").get
-      var pass = false
-      try {
-        val globalSortPartitions = Integer.parseInt(globalSortPartitionsProp)
-        if (globalSortPartitions > 0) {
-          pass = true
-        }
-      } catch {
-        case _ =>
-      }
-      if (!pass) {
-        throw new MalformedCarbonCommandException(
-          s"Table property global_sort_partitions : ${ globalSortPartitionsProp }" +
-          s" is invalid")
-      }
-    }
-  }
-
   def validateSortScopeAndSortColumnsProperties(carbonTable: CarbonTable,
                                                 propertiesMap: mutable.Map[String, String],
                                                 tblPropertiesMap: mutable.Map[String, String]