You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2017/11/13 22:11:40 UTC

[14/49] carbondata git commit: [CARBONDATA-1669] Clean up code in CarbonDataRDDFactory

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0578ba0f/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 1ad25c3..47d7c95 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -34,19 +34,17 @@ import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, FileSplit}
 import org.apache.spark.{SparkEnv, SparkException, TaskContext}
 import org.apache.spark.rdd.{DataLoadCoalescedRDD, DataLoadPartitionCoalescer, NewHadoopRDD, RDD}
 import org.apache.spark.sql.{CarbonEnv, DataFrame, Row, SQLContext}
-import org.apache.spark.sql.execution.command.{AlterTableModel, CompactionModel, ExecutionErrors, UpdateTableModel}
+import org.apache.spark.sql.execution.command.{CompactionModel, ExecutionErrors, UpdateTableModel}
 import org.apache.spark.sql.hive.DistributionUtil
-import org.apache.spark.util.SparkUtil
 
 import org.apache.carbondata.common.constants.LoggerAction
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.datamap.DataMapStoreManager
 import org.apache.carbondata.core.datastore.block.{Distributable, TableBlockInfo}
 import org.apache.carbondata.core.dictionary.server.DictionaryServer
 import org.apache.carbondata.core.locks.{CarbonLockFactory, ICarbonLock, LockUsage}
-import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier, ColumnarFormatVersion}
-import org.apache.carbondata.core.metadata.datatype.{DataType, DataTypes}
+import org.apache.carbondata.core.metadata.{CarbonTableIdentifier, ColumnarFormatVersion}
+import org.apache.carbondata.core.metadata.datatype.DataTypes
 import org.apache.carbondata.core.metadata.schema.partition.PartitionType
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.mutate.CarbonUpdateUtil
@@ -61,7 +59,7 @@ import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat
 import org.apache.carbondata.processing.loading.csvinput.StringArrayWritable
 import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException
 import org.apache.carbondata.processing.loading.exception.NoRetryException
-import org.apache.carbondata.processing.loading.model.CarbonLoadModel
+import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
 import org.apache.carbondata.processing.loading.sort.SortScopeOptions
 import org.apache.carbondata.processing.merger.{CarbonCompactionUtil, CarbonDataMergerUtil, CompactionType}
 import org.apache.carbondata.processing.util.{CarbonDataProcessorUtil, CarbonLoaderUtil}
@@ -77,153 +75,6 @@ object CarbonDataRDDFactory {
 
   private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
 
-  def alterTableForCompaction(sqlContext: SQLContext,
-      alterTableModel: AlterTableModel,
-      carbonLoadModel: CarbonLoadModel,
-      storeLocation: String): Unit = {
-    var compactionSize: Long = 0
-    var compactionType: CompactionType = CompactionType.MINOR_COMPACTION
-    if (alterTableModel.compactionType.equalsIgnoreCase("major")) {
-      compactionSize = CarbonDataMergerUtil.getCompactionSize(CompactionType.MAJOR_COMPACTION)
-      compactionType = CompactionType.MAJOR_COMPACTION
-    } else if (alterTableModel.compactionType
-      .equalsIgnoreCase(CompactionType.IUD_UPDDEL_DELTA_COMPACTION.toString)) {
-      compactionType = CompactionType.IUD_UPDDEL_DELTA_COMPACTION
-      if (alterTableModel.segmentUpdateStatusManager.get != None) {
-        carbonLoadModel
-          .setSegmentUpdateStatusManager(alterTableModel.segmentUpdateStatusManager.get)
-
-        carbonLoadModel
-          .setLoadMetadataDetails(alterTableModel.segmentUpdateStatusManager.get
-            .getLoadMetadataDetails.toList.asJava)
-      }
-    } else if (alterTableModel.compactionType.
-      equalsIgnoreCase(CompactionType.SEGMENT_INDEX_COMPACTION.toString)) {
-      compactionType = CompactionType.SEGMENT_INDEX_COMPACTION
-    } else {
-      compactionType = CompactionType.MINOR_COMPACTION
-    }
-
-    LOGGER.audit(s"Compaction request received for table " +
-        s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
-    val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
-
-    if (null == carbonLoadModel.getLoadMetadataDetails) {
-      CommonUtil.readLoadMetadataDetails(carbonLoadModel)
-    }
-    if (compactionType == CompactionType.SEGMENT_INDEX_COMPACTION) {
-      // Just launch job to merge index and return
-      CommonUtil.mergeIndexFiles(sqlContext.sparkContext,
-        carbonLoadModel.getLoadMetadataDetails.asScala.map(_.getLoadName),
-        carbonLoadModel.getStorePath,
-        carbonTable)
-      return
-    }
-    // reading the start time of data load.
-    val loadStartTime : Long =
-    if (alterTableModel.factTimeStamp.isEmpty) {
-      CarbonUpdateUtil.readCurrentTime
-    } else {
-      alterTableModel.factTimeStamp.get
-    }
-    carbonLoadModel.setFactTimeStamp(loadStartTime)
-
-    val isCompactionTriggerByDDl = true
-    val compactionModel = CompactionModel(compactionSize,
-      compactionType,
-      carbonTable,
-      isCompactionTriggerByDDl
-    )
-
-    val isConcurrentCompactionAllowed = CarbonProperties.getInstance()
-        .getProperty(CarbonCommonConstants.ENABLE_CONCURRENT_COMPACTION,
-          CarbonCommonConstants.DEFAULT_ENABLE_CONCURRENT_COMPACTION
-        )
-        .equalsIgnoreCase("true")
-
-    // if system level compaction is enabled then only one compaction can run in the system
-    // if any other request comes at this time then it will create a compaction request file.
-    // so that this will be taken up by the compaction process which is executing.
-    if (!isConcurrentCompactionAllowed) {
-      LOGGER.info("System level compaction lock is enabled.")
-      handleCompactionForSystemLocking(sqlContext,
-        carbonLoadModel,
-        storeLocation,
-        compactionType,
-        carbonTable,
-        compactionModel
-      )
-    } else {
-      // normal flow of compaction
-      val lock = CarbonLockFactory
-          .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier,
-            LockUsage.COMPACTION_LOCK
-          )
-
-      if (lock.lockWithRetries()) {
-        LOGGER.info("Acquired the compaction lock for table" +
-            s" ${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
-        try {
-          startCompactionThreads(sqlContext,
-            carbonLoadModel,
-            storeLocation,
-            compactionModel,
-            lock
-          )
-        } catch {
-          case e: Exception =>
-            LOGGER.error(s"Exception in start compaction thread. ${ e.getMessage }")
-            lock.unlock()
-            throw e
-        }
-      } else {
-        LOGGER.audit("Not able to acquire the compaction lock for table " +
-            s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
-        LOGGER.error(s"Not able to acquire the compaction lock for table" +
-            s" ${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
-        sys.error("Table is already locked for compaction. Please try after some time.")
-      }
-    }
-  }
-
-  def alterTableSplitPartition(sqlContext: SQLContext,
-      partitionId: String,
-      carbonLoadModel: CarbonLoadModel,
-      oldPartitionIdList: List[Int]): Unit = {
-    LOGGER.audit(s"Add partition request received for table " +
-         s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
-    try {
-      startSplitThreads(sqlContext,
-        carbonLoadModel,
-        partitionId,
-        oldPartitionIdList)
-    } catch {
-      case e: Exception =>
-        LOGGER.error(s"Exception in start splitting partition thread. ${ e.getMessage }")
-        throw e
-    }
-  }
-
-  def alterTableDropPartition(sqlContext: SQLContext,
-      partitionId: String,
-      carbonLoadModel: CarbonLoadModel,
-      dropWithData: Boolean,
-      oldPartitionIds: List[Int]): Unit = {
-    LOGGER.audit(s"Drop partition request received for table " +
-                 s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
-    try {
-      startDropThreads(sqlContext,
-        carbonLoadModel,
-        partitionId,
-        dropWithData,
-        oldPartitionIds)
-    } catch {
-      case e: Exception =>
-        LOGGER.error(s"Exception in start dropping partition thread. ${ e.getMessage }")
-        throw e
-    }
-  }
-
   def handleCompactionForSystemLocking(sqlContext: SQLContext,
       carbonLoadModel: CarbonLoadModel,
       storeLocation: String,
@@ -288,15 +139,17 @@ object CarbonDataRDDFactory {
 
     val compactionThread = new Thread {
       override def run(): Unit = {
-
         try {
           // compaction status of the table which is triggered by the user.
           var triggeredCompactionStatus = false
           var exception: Exception = null
           try {
-            DataManagementFunc.executeCompaction(carbonLoadModel: CarbonLoadModel,
-              compactionModel: CompactionModel,
-              executor, sqlContext, storeLocation
+            DataManagementFunc.executeCompaction(
+              carbonLoadModel,
+              compactionModel,
+              executor,
+              sqlContext,
+              storeLocation
             )
             triggeredCompactionStatus = true
           } catch {
@@ -305,18 +158,18 @@ object CarbonDataRDDFactory {
               exception = e
           }
           // continue in case of exception also, check for all the tables.
-          val isConcurrentCompactionAllowed = CarbonProperties.getInstance()
-              .getProperty(CarbonCommonConstants.ENABLE_CONCURRENT_COMPACTION,
-                CarbonCommonConstants.DEFAULT_ENABLE_CONCURRENT_COMPACTION
-              ).equalsIgnoreCase("true")
+          val isConcurrentCompactionAllowed = CarbonProperties.getInstance().getProperty(
+            CarbonCommonConstants.ENABLE_CONCURRENT_COMPACTION,
+            CarbonCommonConstants.DEFAULT_ENABLE_CONCURRENT_COMPACTION
+          ).equalsIgnoreCase("true")
 
           if (!isConcurrentCompactionAllowed) {
             LOGGER.info("System level compaction lock is enabled.")
             val skipCompactionTables = ListBuffer[CarbonTableIdentifier]()
-            var tableForCompaction = CarbonCompactionUtil
-              .getNextTableToCompact(CarbonEnv.getInstance(sqlContext.sparkSession).carbonMetastore
+            var tableForCompaction = CarbonCompactionUtil.getNextTableToCompact(
+              CarbonEnv.getInstance(sqlContext.sparkSession).carbonMetastore
                 .listAllTables(sqlContext.sparkSession).toArray,
-                skipCompactionTables.toList.asJava)
+              skipCompactionTables.toList.asJava)
             while (null != tableForCompaction) {
               LOGGER.info("Compaction request has been identified for table " +
                   s"${ tableForCompaction.getDatabaseName }." +
@@ -325,11 +178,10 @@ object CarbonDataRDDFactory {
               val metadataPath = table.getMetaDataFilepath
               val compactionType = CarbonCompactionUtil.determineCompactionType(metadataPath)
 
-              val newCarbonLoadModel = new CarbonLoadModel()
-              DataManagementFunc.prepareCarbonLoadModel(table, newCarbonLoadModel)
+              val newCarbonLoadModel = prepareCarbonLoadModel(table)
 
               val compactionSize = CarbonDataMergerUtil
-                  .getCompactionSize(CompactionType.MAJOR_COMPACTION)
+                .getCompactionSize(CompactionType.MAJOR_COMPACTION)
 
               val newcompactionModel = CompactionModel(compactionSize,
                 compactionType,
@@ -386,137 +238,25 @@ object CarbonDataRDDFactory {
     compactionThread.run()
   }
 
-  case class SplitThread(sqlContext: SQLContext,
-      carbonLoadModel: CarbonLoadModel,
-      executor: ExecutorService,
-      segmentId: String,
-      partitionId: String,
-      oldPartitionIdList: List[Int]) extends Thread {
-      override def run(): Unit = {
-        var triggeredSplitPartitionStatus = false
-        var exception: Exception = null
-        try {
-          DataManagementFunc.executePartitionSplit(sqlContext,
-            carbonLoadModel, executor, segmentId, partitionId, oldPartitionIdList)
-          triggeredSplitPartitionStatus = true
-        } catch {
-          case e: Exception =>
-            LOGGER.error(s"Exception in partition split thread: ${ e.getMessage } }")
-          exception = e
-        }
-        if (triggeredSplitPartitionStatus == false) {
-          throw new Exception("Exception in split partition " + exception.getMessage)
-        }
-      }
-  }
-
-  case class dropPartitionThread(sqlContext: SQLContext,
-      carbonLoadModel: CarbonLoadModel,
-      executor: ExecutorService,
-      segmentId: String,
-      partitionId: String,
-      dropWithData: Boolean,
-      oldPartitionIds: List[Int]) extends Thread {
-    override def run(): Unit = {
-      try {
-        DataManagementFunc.executeDroppingPartition(sqlContext, carbonLoadModel, executor,
-          segmentId, partitionId, dropWithData, oldPartitionIds)
-      } catch {
-        case e: Exception =>
-          LOGGER.error(s"Exception in dropping partition thread: ${ e.getMessage } }")
-      }
-    }
-  }
-
-  def startSplitThreads(sqlContext: SQLContext,
-      carbonLoadModel: CarbonLoadModel,
-      partitionId: String,
-      oldPartitionIdList: List[Int]): Unit = {
-    val numberOfCores = CarbonProperties.getInstance()
-      .getProperty(CarbonCommonConstants.NUM_CORES_ALT_PARTITION,
-        CarbonCommonConstants.DEFAULT_NUMBER_CORES)
-    val executor : ExecutorService = Executors.newFixedThreadPool(numberOfCores.toInt)
-    try {
-      val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
-      val absoluteTableIdentifier = carbonTable.getAbsoluteTableIdentifier
-      val segmentStatusManager = new SegmentStatusManager(absoluteTableIdentifier)
-      val validSegments = segmentStatusManager.getValidAndInvalidSegments.getValidSegments.asScala
-      val threadArray: Array[SplitThread] = new Array[SplitThread](validSegments.size)
-      var i = 0
-      validSegments.foreach { segmentId =>
-        threadArray(i) = SplitThread(sqlContext, carbonLoadModel, executor,
-          segmentId, partitionId, oldPartitionIdList)
-        threadArray(i).start()
-        i += 1
-      }
-      threadArray.foreach {
-        thread => thread.join()
-      }
-      val identifier = AbsoluteTableIdentifier.from(carbonLoadModel.getStorePath,
-        carbonLoadModel.getDatabaseName, carbonLoadModel.getTableName)
-      val refresher = DataMapStoreManager.getInstance().getTableSegmentRefresher(identifier)
-      refresher.refreshSegments(validSegments.asJava)
-    } catch {
-      case e: Exception =>
-        LOGGER.error(s"Exception when split partition: ${ e.getMessage }")
-      throw e
-    } finally {
-      executor.shutdown()
-      try {
-        CarbonLoaderUtil.deletePartialLoadDataIfExist(carbonLoadModel, false)
-      } catch {
-        case e: Exception =>
-          LOGGER.error(s"Exception in add/split partition thread while deleting partial load file" +
-                       s" ${ e.getMessage }")
-      }
-    }
-  }
-
-  def startDropThreads(sqlContext: SQLContext,
-      carbonLoadModel: CarbonLoadModel,
-      partitionId: String,
-      dropWithData: Boolean,
-      oldPartitionIds: List[Int]): Unit = {
-    val numberOfCores = CarbonProperties.getInstance()
-    .getProperty(CarbonCommonConstants.NUM_CORES_ALT_PARTITION,
-      CarbonCommonConstants.DEFAULT_NUMBER_CORES)
-    val executor : ExecutorService = Executors.newFixedThreadPool(numberOfCores.toInt)
-    try {
-      val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
-      val absoluteTableIdentifier = carbonTable.getAbsoluteTableIdentifier
-      val segmentStatusManager = new SegmentStatusManager(absoluteTableIdentifier)
-      val validSegments = segmentStatusManager.getValidAndInvalidSegments.getValidSegments.asScala
-      val threadArray: Array[Thread] = new Array[Thread](validSegments.size)
-      var i = 0
-      for (segmentId: String <- validSegments) {
-        threadArray(i) = dropPartitionThread(sqlContext, carbonLoadModel, executor,
-            segmentId, partitionId, dropWithData, oldPartitionIds)
-        threadArray(i).start()
-        i += 1
-      }
-      for (thread <- threadArray) {
-        thread.join()
-      }
-      val identifier = AbsoluteTableIdentifier.from(carbonLoadModel.getStorePath,
-        carbonLoadModel.getDatabaseName, carbonLoadModel.getTableName)
-      val refresher = DataMapStoreManager.getInstance().getTableSegmentRefresher(identifier)
-      refresher.refreshSegments(validSegments.asJava)
-    } catch {
-      case e: Exception =>
-        LOGGER.error(s"Exception when dropping partition: ${ e.getMessage }")
-    } finally {
-      executor.shutdown()
-      try {
-        CarbonLoaderUtil.deletePartialLoadDataIfExist(carbonLoadModel, false)
-      } catch {
-        case e: Exception =>
-          LOGGER.error(s"Exception in dropping partition thread while deleting partial load file" +
-                       s" ${ e.getMessage }")
-      }
-    }
+  private def prepareCarbonLoadModel(
+      table: CarbonTable
+  ): CarbonLoadModel = {
+    val loadModel = new CarbonLoadModel
+    loadModel.setTableName(table.getFactTableName)
+    val dataLoadSchema = new CarbonDataLoadSchema(table)
+    // Need to fill dimension relation
+    loadModel.setCarbonDataLoadSchema(dataLoadSchema)
+    loadModel.setTableName(table.getCarbonTableIdentifier.getTableName)
+    loadModel.setDatabaseName(table.getCarbonTableIdentifier.getDatabaseName)
+    loadModel.setStorePath(table.getStorePath)
+    CommonUtil.readLoadMetadataDetails(loadModel)
+    val loadStartTime = CarbonUpdateUtil.readCurrentTime()
+    loadModel.setFactTimeStamp(loadStartTime)
+    loadModel
   }
 
-  def loadCarbonData(sqlContext: SQLContext,
+  def loadCarbonData(
+      sqlContext: SQLContext,
       carbonLoadModel: CarbonLoadModel,
       storePath: String,
       columnar: Boolean,
@@ -524,425 +264,91 @@ object CarbonDataRDDFactory {
       result: Option[DictionaryServer],
       overwriteTable: Boolean,
       dataFrame: Option[DataFrame] = None,
-      updateModel: Option[UpdateTableModel] = None): Unit = {
+      updateModel: Option[UpdateTableModel] = None
+  ): Unit = {
     val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
     // for handling of the segment Merging.
-    def handleSegmentMerging(): Unit = {
-      LOGGER.info(s"compaction need status is" +
-          s" ${ CarbonDataMergerUtil.checkIfAutoLoadMergingRequired() }")
-      if (CarbonDataMergerUtil.checkIfAutoLoadMergingRequired()) {
-        LOGGER.audit(s"Compaction request received for table " +
-            s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
-        val compactionSize = 0
-        val isCompactionTriggerByDDl = false
-        val compactionModel = CompactionModel(compactionSize,
-          CompactionType.MINOR_COMPACTION,
-          carbonTable,
-          isCompactionTriggerByDDl
-        )
-        var storeLocation = ""
-        val configuredStore = Util.getConfiguredLocalDirs(SparkEnv.get.conf)
-        if (null != configuredStore && configuredStore.nonEmpty) {
-          storeLocation = configuredStore(Random.nextInt(configuredStore.length))
-        }
-        if (storeLocation == null) {
-          storeLocation = System.getProperty("java.io.tmpdir")
-        }
-        storeLocation = storeLocation + "/carbonstore/" + System.nanoTime()
-
-        val isConcurrentCompactionAllowed = CarbonProperties.getInstance()
-            .getProperty(CarbonCommonConstants.ENABLE_CONCURRENT_COMPACTION,
-              CarbonCommonConstants.DEFAULT_ENABLE_CONCURRENT_COMPACTION
-            )
-            .equalsIgnoreCase("true")
-
-        if (!isConcurrentCompactionAllowed) {
-
-          handleCompactionForSystemLocking(sqlContext,
-            carbonLoadModel,
-            storeLocation,
-            CompactionType.MINOR_COMPACTION,
-            carbonTable,
-            compactionModel
-          )
-        } else {
-          val lock = CarbonLockFactory
-              .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier,
-                LockUsage.COMPACTION_LOCK
-              )
 
-          if (lock.lockWithRetries()) {
-            LOGGER.info("Acquired the compaction lock.")
-            try {
-              startCompactionThreads(sqlContext,
-                carbonLoadModel,
-                storeLocation,
-                compactionModel,
-                lock
-              )
-            } catch {
-              case e: Exception =>
-                LOGGER.error(s"Exception in start compaction thread. ${ e.getMessage }")
-                lock.unlock()
-                throw e
-            }
-          } else {
-            LOGGER.audit("Not able to acquire the compaction lock for table " +
-                s"${ carbonLoadModel.getDatabaseName }.${
-                  carbonLoadModel
-                      .getTableName
-                }")
-            LOGGER.error("Not able to acquire the compaction lock for table " +
-                s"${ carbonLoadModel.getDatabaseName }.${
-                  carbonLoadModel
-                      .getTableName
-                }")
-          }
-        }
-      }
+    LOGGER.audit(s"Data load request has been received for table" +
+                 s" ${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
+    // Check if any load need to be deleted before loading new data
+    DataManagementFunc.deleteLoadsAndUpdateMetadata(carbonLoadModel.getDatabaseName,
+      carbonLoadModel.getTableName, storePath, isForceDeletion = false, carbonTable)
+    var status: Array[(String, (LoadMetadataDetails, ExecutionErrors))] = null
+    var res: Array[List[(String, (LoadMetadataDetails, ExecutionErrors))]] = null
+
+    // create new segment folder  in carbon store
+    if (updateModel.isEmpty) {
+      CarbonLoaderUtil.checkAndCreateCarbonDataLocation(storePath,
+        carbonLoadModel.getSegmentId, carbonTable)
     }
-
-    def updateStatus(status: Array[(String, (LoadMetadataDetails, ExecutionErrors))],
-        loadStatus: String) = {
-      val metadataDetails = if (status != null && status(0) != null) {
-        status(0)._2._1
-      } else {
-        new LoadMetadataDetails
-      }
-      CarbonLoaderUtil
-        .populateNewLoadMetaEntry(metadataDetails,
-          loadStatus,
-          carbonLoadModel.getFactTimeStamp,
-          true)
-      val success = CarbonLoaderUtil.recordLoadMetadata(metadataDetails,
-        carbonLoadModel, false, overwriteTable)
-      if (!success) {
-        val errorMessage = "Dataload failed due to failure in table status updation."
-        LOGGER.audit("Data load is failed for " +
-                     s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
-        LOGGER.error("Dataload failed due to failure in table status updation.")
-        throw new Exception(errorMessage)
-      } else if (!carbonLoadModel.isRetentionRequest) {
-        // TODO : Handle it
-        LOGGER.info("********Database updated**********")
-      }
-    }
-
+    var loadStatus = CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS
+    var errorMessage: String = "DataLoad failure"
+    var executorMessage: String = ""
+    val isSortTable = carbonTable.getNumberOfSortColumns > 0
+    val sortScope = CarbonDataProcessorUtil.getSortScope(carbonLoadModel.getSortScope)
     try {
-      LOGGER.audit(s"Data load request has been received for table" +
-          s" ${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
-      // Check if any load need to be deleted before loading new data
-      DataManagementFunc.deleteLoadsAndUpdateMetadata(carbonLoadModel.getDatabaseName,
-        carbonLoadModel.getTableName, storePath, false, carbonTable)
-      var blocksGroupBy: Array[(String, Array[BlockDetails])] = null
-      var status: Array[(String, (LoadMetadataDetails, ExecutionErrors))] = null
-      var res: Array[List[(String, (LoadMetadataDetails, ExecutionErrors))]] = null
-
-      def loadDataFile(): Unit = {
-        /*
-         * when data load handle by node partition
-         * 1)clone the hadoop configuration,and set the file path to the configuration
-         * 2)use org.apache.hadoop.mapreduce.lib.input.TextInputFormat to get splits,size info
-         * 3)use CarbonLoaderUtil.nodeBlockMapping to get mapping info of node and block,
-         *   for locally writing carbondata files(one file one block) in nodes
-         * 4)use NewCarbonDataLoadRDD to load data and write to carbondata files
-         */
-        val hadoopConfiguration = new Configuration(sqlContext.sparkContext.hadoopConfiguration)
-        // FileUtils will skip file which is no csv, and return all file path which split by ','
-        val filePaths = carbonLoadModel.getFactFilePath
-        hadoopConfiguration.set(FileInputFormat.INPUT_DIR, filePaths)
-        hadoopConfiguration.set(FileInputFormat.INPUT_DIR_RECURSIVE, "true")
-        hadoopConfiguration.set("io.compression.codecs",
-          """org.apache.hadoop.io.compress.GzipCodec,
-             org.apache.hadoop.io.compress.DefaultCodec,
-             org.apache.hadoop.io.compress.BZip2Codec""".stripMargin)
-
-        CommonUtil.configSplitMaxSize(sqlContext.sparkContext, filePaths, hadoopConfiguration)
-
-        val inputFormat = new org.apache.hadoop.mapreduce.lib.input.TextInputFormat
-        val jobContext = new Job(hadoopConfiguration)
-        val rawSplits = inputFormat.getSplits(jobContext).toArray
-        val blockList = rawSplits.map { inputSplit =>
-          val fileSplit = inputSplit.asInstanceOf[FileSplit]
-          new TableBlockInfo(fileSplit.getPath.toString,
-            fileSplit.getStart, "1",
-            fileSplit.getLocations, fileSplit.getLength, ColumnarFormatVersion.V1, null
-          ).asInstanceOf[Distributable]
-        }
-        // group blocks to nodes, tasks
-        val startTime = System.currentTimeMillis
-        val activeNodes = DistributionUtil
-            .ensureExecutorsAndGetNodeList(blockList, sqlContext.sparkContext)
-        val nodeBlockMapping =
-          CarbonLoaderUtil
-              .nodeBlockMapping(blockList.toSeq.asJava, -1, activeNodes.toList.asJava).asScala
-              .toSeq
-        val timeElapsed: Long = System.currentTimeMillis - startTime
-        LOGGER.info("Total Time taken in block allocation: " + timeElapsed)
-        LOGGER.info(s"Total no of blocks: ${ blockList.length }, " +
-            s"No.of Nodes: ${nodeBlockMapping.size}")
-        var str = ""
-        nodeBlockMapping.foreach(entry => {
-          val tableBlock = entry._2
-          str = str + "#Node: " + entry._1 + " no.of.blocks: " + tableBlock.size()
-          tableBlock.asScala.foreach(tableBlockInfo =>
-            if (!tableBlockInfo.getLocations.exists(hostentry =>
-              hostentry.equalsIgnoreCase(entry._1)
-            )) {
-              str = str + " , mismatch locations: " + tableBlockInfo.getLocations
-                  .foldLeft("")((a, b) => a + "," + b)
-            }
-          )
-          str = str + "\n"
-        }
-        )
-        LOGGER.info(str)
-        blocksGroupBy = nodeBlockMapping.map { entry =>
-          val blockDetailsList =
-            entry._2.asScala.map { distributable =>
-              val tableBlock = distributable.asInstanceOf[TableBlockInfo]
-              new BlockDetails(new Path(tableBlock.getFilePath),
-                tableBlock.getBlockOffset, tableBlock.getBlockLength, tableBlock.getLocations
-              )
-            }.toArray
-          (entry._1, blockDetailsList)
-        }.toArray
-
-        status = new NewCarbonDataLoadRDD(
-          sqlContext.sparkContext,
-          new DataLoadResultImpl(),
+      if (updateModel.isDefined) {
+        res = loadDataFrameForUpdate(
+          sqlContext,
+          dataFrame,
           carbonLoadModel,
-          blocksGroupBy
-        ).collect()
-      }
-
-      def loadDataFrame(): Unit = {
-        try {
-          val rdd = dataFrame.get.rdd
-
-          val nodeNumOfData = rdd.partitions.flatMap[String, Array[String]]{ p =>
-            DataLoadPartitionCoalescer.getPreferredLocs(rdd, p).map(_.host)
-          }.distinct.size
-          val nodes = DistributionUtil.ensureExecutorsByNumberAndGetNodeList(nodeNumOfData,
-            sqlContext.sparkContext)
-          val newRdd = new DataLoadCoalescedRDD[Row](rdd, nodes.toArray.distinct)
-
-          status = new NewDataFrameLoaderRDD(sqlContext.sparkContext,
-            new DataLoadResultImpl(),
-            carbonLoadModel,
-            newRdd).collect()
-
-        } catch {
-          case ex: Exception =>
-            LOGGER.error(ex, "load data frame failed")
-            throw ex
-        }
-      }
-
-      def loadDataFrameForUpdate(): Unit = {
-        val segmentUpdateParallelism = CarbonProperties.getInstance().getParallelismForSegmentUpdate
-
-        def triggerDataLoadForSegment(key: String, taskNo: Int,
-            iter: Iterator[Row]): Iterator[(String, (LoadMetadataDetails, ExecutionErrors))] = {
-          val rddResult = new updateResultImpl()
-          val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
-          val resultIter = new Iterator[(String, (LoadMetadataDetails, ExecutionErrors))] {
-            var partitionID = "0"
-            val loadMetadataDetails = new LoadMetadataDetails
-            val executionErrors = new ExecutionErrors(FailureCauses.NONE, "")
-            var uniqueLoadStatusId = ""
-            try {
-              val segId = key
-              val index = taskNo
-              uniqueLoadStatusId = carbonLoadModel.getTableName +
-                                   CarbonCommonConstants.UNDERSCORE +
-                                   (index + "_0")
-
-              // convert timestamp
-              val timeStampInLong = updateModel.get.updatedTimeStamp + ""
-              loadMetadataDetails.setPartitionCount(partitionID)
-              loadMetadataDetails.setLoadName(segId)
-              loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_FAILURE)
-              carbonLoadModel.setPartitionId(partitionID)
-              carbonLoadModel.setSegmentId(segId)
-              carbonLoadModel.setTaskNo(String.valueOf(index))
-              carbonLoadModel.setFactTimeStamp(updateModel.get.updatedTimeStamp)
-
-              // During Block Spill case Increment of File Count and proper adjustment of Block
-              // naming is only done when AbstractFactDataWriter.java : initializeWriter get
-              // CarbondataFileName as null. For handling Block Spill not setting the
-              // CarbondataFileName in case of Update.
-              // carbonLoadModel.setCarbondataFileName(newBlockName)
-
-              // storeLocation = CarbonDataLoadRDD.initialize(carbonLoadModel, index)
-              loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS)
-              UpdateDataLoad.DataLoadForUpdate(segId,
-                index,
-                iter,
-                carbonLoadModel,
-                loadMetadataDetails)
-            } catch {
-              case e: NoRetryException =>
-                loadMetadataDetails
-                  .setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS)
-                executionErrors.failureCauses = FailureCauses.BAD_RECORDS
-                executionErrors.errorMsg = e.getMessage
-                LOGGER.info("Bad Record Found")
-              case e: Exception =>
-                LOGGER.info("DataLoad failure")
-                LOGGER.error(e)
-                throw e
-            }
-
-            var finished = false
-
-            override def hasNext: Boolean = !finished
-
-            override def next(): (String, (LoadMetadataDetails, ExecutionErrors)) = {
-              finished = true
-              rddResult
-                .getKey(uniqueLoadStatusId,
-                  (loadMetadataDetails, executionErrors))
+          updateModel,
+          carbonTable)
+        res.foreach { resultOfSeg =>
+          resultOfSeg.foreach { resultOfBlock =>
+            if (resultOfBlock._2._1.getLoadStatus.equalsIgnoreCase(
+              CarbonCommonConstants.STORE_LOADSTATUS_FAILURE)) {
+              loadStatus = CarbonCommonConstants.STORE_LOADSTATUS_FAILURE
+              if (resultOfBlock._2._2.failureCauses == FailureCauses.NONE) {
+                updateModel.get.executorErrors.failureCauses = FailureCauses.EXECUTOR_FAILURE
+                updateModel.get.executorErrors.errorMsg = "Failure in the Executor."
+              } else {
+                updateModel.get.executorErrors = resultOfBlock._2._2
+              }
+            } else if (resultOfBlock._2._1.getLoadStatus.equalsIgnoreCase(
+              CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS)) {
+              loadStatus = CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS
+              updateModel.get.executorErrors.failureCauses = resultOfBlock._2._2.failureCauses
+              updateModel.get.executorErrors.errorMsg = resultOfBlock._2._2.errorMsg
             }
           }
-          resultIter
-        }
-
-        val updateRdd = dataFrame.get.rdd
-
-        // return directly if no rows to update
-        val noRowsToUpdate = updateRdd.isEmpty()
-        if (noRowsToUpdate) {
-          res = Array[List[(String, (LoadMetadataDetails, ExecutionErrors))]]()
-          return
-        }
-
-        // splitting as (key, value) i.e., (segment, updatedRows)
-        val keyRDD = updateRdd.map(row =>
-            (row.get(row.size - 1).toString, Row(row.toSeq.slice(0, row.size - 1): _*)))
-
-        val loadMetadataDetails = SegmentStatusManager.readLoadMetadata(
-          carbonTable.getMetaDataFilepath)
-        val segmentIds = loadMetadataDetails.map(_.getLoadName)
-        val segmentIdIndex = segmentIds.zipWithIndex.toMap
-        val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonLoadModel.getStorePath,
-          carbonTable.getCarbonTableIdentifier)
-        val segmentId2maxTaskNo = segmentIds.map { segId =>
-          (segId, CarbonUpdateUtil.getLatestTaskIdForSegment(segId, carbonTablePath))
-        }.toMap
-
-        class SegmentPartitioner(segIdIndex: Map[String, Int], parallelism: Int)
-          extends org.apache.spark.Partitioner {
-          override def numPartitions: Int = segmentIdIndex.size * parallelism
-
-          override def getPartition(key: Any): Int = {
-            val segId = key.asInstanceOf[String]
-            // partitionId
-            segmentIdIndex(segId) * parallelism + Random.nextInt(parallelism)
-          }
-        }
-
-        val partitionByRdd = keyRDD.partitionBy(new SegmentPartitioner(segmentIdIndex,
-          segmentUpdateParallelism))
-
-        // because partitionId=segmentIdIndex*parallelism+RandomPart and RandomPart<parallelism,
-        // so segmentIdIndex=partitionId/parallelism, this has been verified.
-        res = partitionByRdd.map(_._2).mapPartitions { partition =>
-          val partitionId = TaskContext.getPartitionId()
-          val segIdIndex = partitionId / segmentUpdateParallelism
-          val randomPart = partitionId - segIdIndex * segmentUpdateParallelism
-          val segId = segmentIds(segIdIndex)
-          val newTaskNo = segmentId2maxTaskNo(segId) + randomPart + 1
-
-          List(triggerDataLoadForSegment(segId, newTaskNo, partition).toList).toIterator
-        }.collect()
-      }
-
-      def loadDataForPartitionTable(): Unit = {
-        try {
-          val rdd = repartitionInputData(sqlContext, dataFrame, carbonLoadModel)
-          status = new PartitionTableDataLoaderRDD(sqlContext.sparkContext,
-            new DataLoadResultImpl(),
-            carbonLoadModel,
-            rdd).collect()
-        } catch {
-          case ex: Exception =>
-            LOGGER.error(ex, "load data failed for partition table")
-            throw ex
         }
-      }
-      // create new segment folder  in carbon store
-      if (updateModel.isEmpty) {
-        CarbonLoaderUtil.checkAndCreateCarbonDataLocation(storePath,
-          carbonLoadModel.getSegmentId, carbonTable)
-      }
-      var loadStatus = CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS
-      var errorMessage: String = "DataLoad failure"
-      var executorMessage: String = ""
-      val isSortTable = carbonTable.getNumberOfSortColumns > 0
-      val sortScope = CarbonDataProcessorUtil.getSortScope(carbonLoadModel.getSortScope)
-      try {
-        if (updateModel.isDefined) {
-          loadDataFrameForUpdate()
-        } else if (carbonTable.getPartitionInfo(carbonTable.getFactTableName) != null) {
-          loadDataForPartitionTable()
+      } else {
+        status = if (carbonTable.getPartitionInfo(carbonTable.getFactTableName) != null) {
+          loadDataForPartitionTable(sqlContext, dataFrame, carbonLoadModel)
         } else if (isSortTable && sortScope.equals(SortScopeOptions.SortScope.GLOBAL_SORT)) {
-          LOGGER.audit("Using global sort for loading.")
-          status = DataLoadProcessBuilderOnSpark.loadDataUsingGlobalSort(sqlContext.sparkContext,
+          DataLoadProcessBuilderOnSpark.loadDataUsingGlobalSort(sqlContext.sparkContext,
             dataFrame, carbonLoadModel)
         } else if (dataFrame.isDefined) {
-          loadDataFrame()
+          loadDataFrame(sqlContext, dataFrame, carbonLoadModel)
         } else {
-          loadDataFile()
+          loadDataFile(sqlContext, carbonLoadModel)
         }
-        if (updateModel.isDefined) {
-
-          res.foreach(resultOfSeg => resultOfSeg.foreach(
-            resultOfBlock => {
-              if (resultOfBlock._2._1.getLoadStatus
-                .equalsIgnoreCase(CarbonCommonConstants.STORE_LOADSTATUS_FAILURE)) {
-                loadStatus = CarbonCommonConstants.STORE_LOADSTATUS_FAILURE
-                if (resultOfBlock._2._2.failureCauses == FailureCauses.NONE) {
-                  updateModel.get.executorErrors.failureCauses = FailureCauses.EXECUTOR_FAILURE
-                  updateModel.get.executorErrors.errorMsg = "Failure in the Executor."
-                }
-                else {
-                  updateModel.get.executorErrors = resultOfBlock._2._2
-                }
-              } else if (resultOfBlock._2._1.getLoadStatus
-                .equalsIgnoreCase(CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS)) {
-                loadStatus = CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS
-                updateModel.get.executorErrors.failureCauses = resultOfBlock._2._2.failureCauses
-                updateModel.get.executorErrors.errorMsg = resultOfBlock._2._2.errorMsg
-              }
-            }
-          ))
-
-        } else {
-          CommonUtil.mergeIndexFiles(sqlContext.sparkContext,
-            Seq(carbonLoadModel.getSegmentId), storePath, carbonTable)
-          val newStatusMap = scala.collection.mutable.Map.empty[String, String]
-          if (status.nonEmpty) {
-            status.foreach { eachLoadStatus =>
-              val state = newStatusMap.get(eachLoadStatus._1)
-              state match {
-                case Some(CarbonCommonConstants.STORE_LOADSTATUS_FAILURE) =>
-                  newStatusMap.put(eachLoadStatus._1, eachLoadStatus._2._1.getLoadStatus)
-                case Some(CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS)
-                  if eachLoadStatus._2._1.getLoadStatus ==
-                     CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS =>
-                  newStatusMap.put(eachLoadStatus._1, eachLoadStatus._2._1.getLoadStatus)
-                case _ =>
-                  newStatusMap.put(eachLoadStatus._1, eachLoadStatus._2._1.getLoadStatus)
-              }
+        CommonUtil.mergeIndexFiles(sqlContext.sparkContext,
+          Seq(carbonLoadModel.getSegmentId), storePath, carbonTable)
+        val newStatusMap = scala.collection.mutable.Map.empty[String, String]
+        if (status.nonEmpty) {
+          status.foreach { eachLoadStatus =>
+            val state = newStatusMap.get(eachLoadStatus._1)
+            state match {
+              case Some(CarbonCommonConstants.STORE_LOADSTATUS_FAILURE) =>
+                newStatusMap.put(eachLoadStatus._1, eachLoadStatus._2._1.getLoadStatus)
+              case Some(CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS)
+                if eachLoadStatus._2._1.getLoadStatus ==
+                   CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS =>
+                newStatusMap.put(eachLoadStatus._1, eachLoadStatus._2._1.getLoadStatus)
+              case _ =>
+                newStatusMap.put(eachLoadStatus._1, eachLoadStatus._2._1.getLoadStatus)
             }
+          }
 
           newStatusMap.foreach {
             case (key, value) =>
               if (value == CarbonCommonConstants.STORE_LOADSTATUS_FAILURE) {
                 loadStatus = CarbonCommonConstants.STORE_LOADSTATUS_FAILURE
               } else if (value == CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS &&
-                  !loadStatus.equals(CarbonCommonConstants.STORE_LOADSTATUS_FAILURE)) {
+                         !loadStatus.equals(CarbonCommonConstants.STORE_LOADSTATUS_FAILURE)) {
                 loadStatus = CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS
               }
           }
@@ -955,163 +361,402 @@ object CarbonDataRDDFactory {
           loadStatus = partitionStatus
         }
       }
-      } catch {
-        case ex: Throwable =>
-          loadStatus = CarbonCommonConstants.STORE_LOADSTATUS_FAILURE
-          ex match {
-            case sparkException: SparkException =>
-              if (sparkException.getCause.isInstanceOf[DataLoadingException] ||
-                  sparkException.getCause.isInstanceOf[CarbonDataLoadingException]) {
-                executorMessage = sparkException.getCause.getMessage
-                errorMessage = errorMessage + ": " + executorMessage
-              }
-            case _ =>
-              if (ex.getCause != null) {
-                executorMessage = ex.getCause.getMessage
-                errorMessage = errorMessage + ": " + executorMessage
-              }
-          }
-          LOGGER.info(errorMessage)
-          LOGGER.error(ex)
-      }
-      // handle the status file updation for the update cmd.
-      if (updateModel.isDefined) {
-
-        if (loadStatus == CarbonCommonConstants.STORE_LOADSTATUS_FAILURE) {
-          // updateModel.get.executorErrors.errorMsg = errorMessage
-          if (updateModel.get.executorErrors.failureCauses == FailureCauses.NONE) {
-            updateModel.get.executorErrors.failureCauses = FailureCauses.EXECUTOR_FAILURE
-            if (null != executorMessage && !executorMessage.isEmpty) {
-              updateModel.get.executorErrors.errorMsg = executorMessage
-            } else {
-              updateModel.get.executorErrors.errorMsg = "Update failed as the data load has failed."
+    } catch {
+      case ex: Throwable =>
+        loadStatus = CarbonCommonConstants.STORE_LOADSTATUS_FAILURE
+        ex match {
+          case sparkException: SparkException =>
+            if (sparkException.getCause.isInstanceOf[DataLoadingException] ||
+                sparkException.getCause.isInstanceOf[CarbonDataLoadingException]) {
+              executorMessage = sparkException.getCause.getMessage
+              errorMessage = errorMessage + ": " + executorMessage
             }
-          }
-          return
-        } else if (loadStatus == CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS &&
-                   updateModel.get.executorErrors.failureCauses == FailureCauses.BAD_RECORDS &&
-                   carbonLoadModel.getBadRecordsAction.split(",")(1) == LoggerAction.FAIL.name) {
-          return
-        } else {
-          // in success case handle updation of the table status file.
-          // success case.
-          val segmentDetails = new util.HashSet[String]()
-
-          var resultSize = 0
-
-          res.foreach(resultOfSeg => {
-            resultSize = resultSize + resultOfSeg.size
-            resultOfSeg.foreach(
-            resultOfBlock => {
-              segmentDetails.add(resultOfBlock._2._1.getLoadName)
+          case _ =>
+            if (ex.getCause != null) {
+              executorMessage = ex.getCause.getMessage
+              errorMessage = errorMessage + ": " + executorMessage
             }
-          )}
-          )
-
-          // this means that the update doesnt have any records to update so no need to do table
-          // status file updation.
-          if (resultSize == 0) {
-            LOGGER.audit("Data update is successful with 0 rows updation for " +
-                         s"${carbonLoadModel.getDatabaseName}.${carbonLoadModel.getTableName}")
-            return
-          }
-
-          if (
-            CarbonUpdateUtil
-              .updateTableMetadataStatus(segmentDetails,
-                carbonTable,
-                updateModel.get.updatedTimeStamp + "",
-                true,
-                new util.ArrayList[String](0))) {
-            LOGGER.audit("Data update is successful for " +
-                         s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
+        }
+        LOGGER.info(errorMessage)
+        LOGGER.error(ex)
+    }
+    // handle the status file updation for the update cmd.
+    if (updateModel.isDefined) {
+      if (loadStatus == CarbonCommonConstants.STORE_LOADSTATUS_FAILURE) {
+        if (updateModel.get.executorErrors.failureCauses == FailureCauses.NONE) {
+          updateModel.get.executorErrors.failureCauses = FailureCauses.EXECUTOR_FAILURE
+          if (null != executorMessage && !executorMessage.isEmpty) {
+            updateModel.get.executorErrors.errorMsg = executorMessage
+          } else {
+            updateModel.get.executorErrors.errorMsg = "Update failed as the data load has failed."
           }
-          else {
-            val errorMessage = "Data update failed due to failure in table status updation."
-            LOGGER.audit("Data update is failed for " +
-                         s"${carbonLoadModel.getDatabaseName}.${carbonLoadModel.getTableName}")
-            LOGGER.error("Data update failed due to failure in table status updation.")
-            updateModel.get.executorErrors.errorMsg = errorMessage
-            updateModel.get.executorErrors.failureCauses = FailureCauses
-              .STATUS_FILE_UPDATION_FAILURE
-            return
+        }
+        return
+      } else if (loadStatus == CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS &&
+                 updateModel.get.executorErrors.failureCauses == FailureCauses.BAD_RECORDS &&
+                 carbonLoadModel.getBadRecordsAction.split(",")(1) == LoggerAction.FAIL.name) {
+        return
+      } else {
+        // in success case handle updation of the table status file.
+        // success case.
+        val segmentDetails = new util.HashSet[String]()
+        var resultSize = 0
+        res.foreach { resultOfSeg =>
+          resultSize = resultSize + resultOfSeg.size
+          resultOfSeg.foreach { resultOfBlock =>
+            segmentDetails.add(resultOfBlock._2._1.getLoadName)
           }
-
         }
 
-        return
+        // this means that the update doesnt have any records to update so no need to do table
+        // status file updation.
+        if (resultSize == 0) {
+          LOGGER.audit("Data update is successful with 0 rows updation for " +
+                       s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
+          return
+        }
+        if (CarbonUpdateUtil.updateTableMetadataStatus(
+          segmentDetails,
+          carbonTable,
+          updateModel.get.updatedTimeStamp + "",
+          true,
+          new util.ArrayList[String](0))) {
+          LOGGER.audit("Data update is successful for " +
+                       s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
+        } else {
+          val errorMessage = "Data update failed due to failure in table status updation."
+          LOGGER.audit("Data update is failed for " +
+                       s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
+          LOGGER.error("Data update failed due to failure in table status updation.")
+          updateModel.get.executorErrors.errorMsg = errorMessage
+          updateModel.get.executorErrors.failureCauses = FailureCauses
+            .STATUS_FILE_UPDATION_FAILURE
+          return
+        }
       }
-      if (loadStatus == CarbonCommonConstants.STORE_LOADSTATUS_FAILURE) {
+      return
+    }
+    if (loadStatus == CarbonCommonConstants.STORE_LOADSTATUS_FAILURE) {
+      // update the load entry in table status file for changing the status to failure
+      CommonUtil.updateTableStatusForFailure(carbonLoadModel)
+      LOGGER.info("********starting clean up**********")
+      CarbonLoaderUtil.deleteSegment(carbonLoadModel, carbonLoadModel.getSegmentId.toInt)
+      LOGGER.info("********clean up done**********")
+      LOGGER.audit(s"Data load is failed for " +
+                   s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
+      LOGGER.warn("Cannot write load metadata file as data load failed")
+      throw new Exception(errorMessage)
+    } else {
+      // check if data load fails due to bad record and throw data load failure due to
+      // bad record exception
+      if (loadStatus == CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS &&
+          status(0)._2._2.failureCauses == FailureCauses.BAD_RECORDS &&
+          carbonLoadModel.getBadRecordsAction.split(",")(1) == LoggerAction.FAIL.name) {
         // update the load entry in table status file for changing the status to failure
         CommonUtil.updateTableStatusForFailure(carbonLoadModel)
         LOGGER.info("********starting clean up**********")
         CarbonLoaderUtil.deleteSegment(carbonLoadModel, carbonLoadModel.getSegmentId.toInt)
         LOGGER.info("********clean up done**********")
         LOGGER.audit(s"Data load is failed for " +
-            s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
+                     s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
+        throw new Exception(status(0)._2._2.errorMsg)
+      }
+      // if segment is empty then fail the data load
+      if (!CarbonLoaderUtil.isValidSegment(carbonLoadModel, carbonLoadModel.getSegmentId.toInt)) {
+        // update the load entry in table status file for changing the status to failure
+        CommonUtil.updateTableStatusForFailure(carbonLoadModel)
+        LOGGER.info("********starting clean up**********")
+        CarbonLoaderUtil.deleteSegment(carbonLoadModel, carbonLoadModel.getSegmentId.toInt)
+        LOGGER.info("********clean up done**********")
+        LOGGER.audit(s"Data load is failed for " +
+                     s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }" +
+                     " as there is no data to load")
         LOGGER.warn("Cannot write load metadata file as data load failed")
-        throw new Exception(errorMessage)
+        throw new Exception("No Data to load")
+      }
+      writeDictionary(carbonLoadModel, result, writeAll = false)
+      updateTableStatus(status, carbonLoadModel, loadStatus, overwriteTable)
+
+      if (CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS.equals(loadStatus)) {
+        LOGGER.audit("Data load is partially successful for " +
+                     s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
       } else {
-        // check if data load fails due to bad record and throw data load failure due to
-        // bad record exception
-        if (loadStatus == CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS &&
-            status(0)._2._2.failureCauses == FailureCauses.BAD_RECORDS &&
-            carbonLoadModel.getBadRecordsAction.split(",")(1) == LoggerAction.FAIL.name) {
-          // update the load entry in table status file for changing the status to failure
-          CommonUtil.updateTableStatusForFailure(carbonLoadModel)
-          LOGGER.info("********starting clean up**********")
-          CarbonLoaderUtil.deleteSegment(carbonLoadModel, carbonLoadModel.getSegmentId.toInt)
-          LOGGER.info("********clean up done**********")
-          LOGGER.audit(s"Data load is failed for " +
-                       s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
-          throw new Exception(status(0)._2._2.errorMsg)
-        }
-        // if segment is empty then fail the data load
-        if (!CarbonLoaderUtil.isValidSegment(carbonLoadModel, carbonLoadModel.getSegmentId.toInt)) {
-          // update the load entry in table status file for changing the status to failure
-          CommonUtil.updateTableStatusForFailure(carbonLoadModel)
-          LOGGER.info("********starting clean up**********")
-          CarbonLoaderUtil.deleteSegment(carbonLoadModel, carbonLoadModel.getSegmentId.toInt)
-          LOGGER.info("********clean up done**********")
-          LOGGER.audit(s"Data load is failed for " +
-                       s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }" +
-                       " as there is no data to load")
-          LOGGER.warn("Cannot write load metadata file as data load failed")
-          throw new Exception("No Data to load")
-        }
-        writeDictionary(carbonLoadModel, result, false)
-        updateStatus(status, loadStatus)
+        LOGGER.audit("Data load is successful for " +
+                     s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
+      }
+      try {
+        // compaction handling
+        handleSegmentMerging(sqlContext, carbonLoadModel, carbonTable)
+      } catch {
+        case e: Exception =>
+          throw new Exception(
+            "Dataload is success. Auto-Compaction has failed. Please check logs.")
+      }
+    }
+  }
 
-        if (CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS.equals(loadStatus)) {
-          LOGGER.audit("Data load is partially successful for " +
-                       s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
-        } else {
-          LOGGER.audit("Data load is successful for " +
-                       s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
+  /**
+   * If data load is triggered by UPDATE query, this func will execute the update
+   * TODO: move it to a separate update command
+   */
+  private def loadDataFrameForUpdate(
+      sqlContext: SQLContext,
+      dataFrame: Option[DataFrame],
+      carbonLoadModel: CarbonLoadModel,
+      updateModel: Option[UpdateTableModel],
+      carbonTable: CarbonTable
+  ): Array[List[(String, (LoadMetadataDetails, ExecutionErrors))]] = {
+    val segmentUpdateParallelism = CarbonProperties.getInstance().getParallelismForSegmentUpdate
+
+    val updateRdd = dataFrame.get.rdd
+
+    // return directly if no rows to update
+    val noRowsToUpdate = updateRdd.isEmpty()
+    if (noRowsToUpdate) {
+      Array[List[(String, (LoadMetadataDetails, ExecutionErrors))]]()
+    } else {
+      // splitting as (key, value) i.e., (segment, updatedRows)
+      val keyRDD = updateRdd.map(row =>
+        (row.get(row.size - 1).toString, Row(row.toSeq.slice(0, row.size - 1): _*)))
+
+      val loadMetadataDetails = SegmentStatusManager.readLoadMetadata(
+        carbonTable.getMetaDataFilepath)
+      val segmentIds = loadMetadataDetails.map(_.getLoadName)
+      val segmentIdIndex = segmentIds.zipWithIndex.toMap
+      val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonLoadModel.getStorePath,
+        carbonTable.getCarbonTableIdentifier)
+      val segmentId2maxTaskNo = segmentIds.map { segId =>
+        (segId, CarbonUpdateUtil.getLatestTaskIdForSegment(segId, carbonTablePath))
+      }.toMap
+
+      class SegmentPartitioner(segIdIndex: Map[String, Int], parallelism: Int)
+        extends org.apache.spark.Partitioner {
+        override def numPartitions: Int = segmentIdIndex.size * parallelism
+
+        override def getPartition(key: Any): Int = {
+          val segId = key.asInstanceOf[String]
+          // partitionId
+          segmentIdIndex(segId) * parallelism + Random.nextInt(parallelism)
         }
+      }
+
+      val partitionByRdd = keyRDD.partitionBy(
+        new SegmentPartitioner(segmentIdIndex, segmentUpdateParallelism))
+
+      // because partitionId=segmentIdIndex*parallelism+RandomPart and RandomPart<parallelism,
+      // so segmentIdIndex=partitionId/parallelism, this has been verified.
+      partitionByRdd.map(_._2).mapPartitions { partition =>
+        val partitionId = TaskContext.getPartitionId()
+        val segIdIndex = partitionId / segmentUpdateParallelism
+        val randomPart = partitionId - segIdIndex * segmentUpdateParallelism
+        val segId = segmentIds(segIdIndex)
+        val newTaskNo = segmentId2maxTaskNo(segId) + randomPart + 1
+        List(triggerDataLoadForSegment(carbonLoadModel, updateModel, segId, newTaskNo, partition)
+          .toList).toIterator
+      }.collect()
+    }
+  }
+
+  /**
+   * TODO: move it to a separate update command
+   */
+  private def triggerDataLoadForSegment(
+      carbonLoadModel: CarbonLoadModel,
+      updateModel: Option[UpdateTableModel],
+      key: String,
+      taskNo: Int,
+      iter: Iterator[Row]
+  ): Iterator[(String, (LoadMetadataDetails, ExecutionErrors))] = {
+    val rddResult = new updateResultImpl()
+    val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
+    val resultIter = new Iterator[(String, (LoadMetadataDetails, ExecutionErrors))] {
+      var partitionID = "0"
+      val loadMetadataDetails = new LoadMetadataDetails
+      val executionErrors = ExecutionErrors(FailureCauses.NONE, "")
+      var uniqueLoadStatusId = ""
+      try {
+        val segId = key
+        val index = taskNo
+        uniqueLoadStatusId = carbonLoadModel.getTableName +
+                             CarbonCommonConstants.UNDERSCORE +
+                             (index + "_0")
+
+        loadMetadataDetails.setPartitionCount(partitionID)
+        loadMetadataDetails.setLoadName(segId)
+        loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_FAILURE)
+        carbonLoadModel.setPartitionId(partitionID)
+        carbonLoadModel.setSegmentId(segId)
+        carbonLoadModel.setTaskNo(String.valueOf(index))
+        carbonLoadModel.setFactTimeStamp(updateModel.get.updatedTimeStamp)
+
+        loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS)
+        UpdateDataLoad.DataLoadForUpdate(segId,
+          index,
+          iter,
+          carbonLoadModel,
+          loadMetadataDetails)
+      } catch {
+        case e: NoRetryException =>
+          loadMetadataDetails
+            .setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS)
+          executionErrors.failureCauses = FailureCauses.BAD_RECORDS
+          executionErrors.errorMsg = e.getMessage
+          LOGGER.info("Bad Record Found")
+        case e: Exception =>
+          LOGGER.info("DataLoad failure")
+          LOGGER.error(e)
+          throw e
+      }
+
+      var finished = false
+
+      override def hasNext: Boolean = !finished
+
+      override def next(): (String, (LoadMetadataDetails, ExecutionErrors)) = {
+        finished = true
+        rddResult
+          .getKey(uniqueLoadStatusId,
+            (loadMetadataDetails, executionErrors))
+      }
+    }
+    resultIter
+  }
+
+  /**
+   * Trigger to write dictionary files
+   */
+  private def writeDictionary(carbonLoadModel: CarbonLoadModel,
+      result: Option[DictionaryServer], writeAll: Boolean): Unit = {
+    // write dictionary file
+    val uniqueTableName: String = s"${ carbonLoadModel.getDatabaseName }_${
+      carbonLoadModel.getTableName
+    }"
+    result match {
+      case Some(server) =>
         try {
-          // compaction handling
-          handleSegmentMerging()
+          server.writeTableDictionary(carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
+            .getCarbonTableIdentifier.getTableId)
         } catch {
-          case e: Exception =>
-            throw new Exception(
-              "Dataload is success. Auto-Compaction has failed. Please check logs.")
+          case _: Exception =>
+            LOGGER.error(s"Error while writing dictionary file for $uniqueTableName")
+            throw new Exception("Dataload failed due to error while writing dictionary file!")
+        }
+      case _ =>
+    }
+  }
+
+  /**
+   * Trigger compaction after data load
+   */
+  private def handleSegmentMerging(
+      sqlContext: SQLContext,
+      carbonLoadModel: CarbonLoadModel,
+      carbonTable: CarbonTable
+  ): Unit = {
+    LOGGER.info(s"compaction need status is" +
+                s" ${ CarbonDataMergerUtil.checkIfAutoLoadMergingRequired() }")
+    if (CarbonDataMergerUtil.checkIfAutoLoadMergingRequired()) {
+      LOGGER.audit(s"Compaction request received for table " +
+                   s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
+      val compactionSize = 0
+      val isCompactionTriggerByDDl = false
+      val compactionModel = CompactionModel(compactionSize,
+        CompactionType.MINOR_COMPACTION,
+        carbonTable,
+        isCompactionTriggerByDDl
+      )
+      var storeLocation = ""
+      val configuredStore = Util.getConfiguredLocalDirs(SparkEnv.get.conf)
+      if (null != configuredStore && configuredStore.nonEmpty) {
+        storeLocation = configuredStore(Random.nextInt(configuredStore.length))
+      }
+      if (storeLocation == null) {
+        storeLocation = System.getProperty("java.io.tmpdir")
+      }
+      storeLocation = storeLocation + "/carbonstore/" + System.nanoTime()
+
+      val isConcurrentCompactionAllowed = CarbonProperties.getInstance().getProperty(
+        CarbonCommonConstants.ENABLE_CONCURRENT_COMPACTION,
+        CarbonCommonConstants.DEFAULT_ENABLE_CONCURRENT_COMPACTION
+      ).equalsIgnoreCase("true")
+
+      if (!isConcurrentCompactionAllowed) {
+        handleCompactionForSystemLocking(sqlContext,
+          carbonLoadModel,
+          storeLocation,
+          CompactionType.MINOR_COMPACTION,
+          carbonTable,
+          compactionModel
+        )
+      } else {
+        val lock = CarbonLockFactory.getCarbonLockObj(
+          carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier,
+          LockUsage.COMPACTION_LOCK)
+
+        if (lock.lockWithRetries()) {
+          LOGGER.info("Acquired the compaction lock.")
+          try {
+            startCompactionThreads(sqlContext,
+              carbonLoadModel,
+              storeLocation,
+              compactionModel,
+              lock
+            )
+          } catch {
+            case e: Exception =>
+              LOGGER.error(s"Exception in start compaction thread. ${ e.getMessage }")
+              lock.unlock()
+              throw e
+          }
+        } else {
+          LOGGER.audit("Not able to acquire the compaction lock for table " +
+                       s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName}")
+          LOGGER.error("Not able to acquire the compaction lock for table " +
+                       s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName}")
         }
       }
     }
+  }
 
+  /**
+   * Update table status file after data loading
+   */
+  private def updateTableStatus(
+      status: Array[(String, (LoadMetadataDetails, ExecutionErrors))],
+      carbonLoadModel: CarbonLoadModel,
+      loadStatus: String,
+      overwriteTable: Boolean
+  ): Unit = {
+    val metadataDetails = if (status != null && status(0) != null) {
+      status(0)._2._1
+    } else {
+      new LoadMetadataDetails
+    }
+    CarbonLoaderUtil.populateNewLoadMetaEntry(
+      metadataDetails,
+      loadStatus,
+      carbonLoadModel.getFactTimeStamp,
+      true)
+    val success = CarbonLoaderUtil.recordLoadMetadata(metadataDetails, carbonLoadModel, false,
+      overwriteTable)
+    if (!success) {
+      val errorMessage = "Dataload failed due to failure in table status updation."
+      LOGGER.audit("Data load is failed for " +
+                   s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
+      LOGGER.error("Dataload failed due to failure in table status updation.")
+      throw new Exception(errorMessage)
+    } else if (!carbonLoadModel.isRetentionRequest) {
+      // TODO : Handle it
+      LOGGER.info("********Database updated**********")
+    }
   }
 
 
   /**
    * repartition the input data for partition table.
-   *
-   * @param sqlContext
-   * @param dataFrame
-   * @param carbonLoadModel
-   * @return
    */
-  private def repartitionInputData(sqlContext: SQLContext,
+  private def repartitionInputData(
+      sqlContext: SQLContext,
       dataFrame: Option[DataFrame],
       carbonLoadModel: CarbonLoadModel): RDD[Row] = {
     val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
@@ -1132,7 +777,7 @@ object CarbonDataRDDFactory {
       throw new DataLoadingException("Partition column not found.")
     }
 
-    val dateFormatMap = CarbonDataProcessorUtil.getDateFormatMap(carbonLoadModel.getDateFormat())
+    val dateFormatMap = CarbonDataProcessorUtil.getDateFormatMap(carbonLoadModel.getDateFormat)
     val specificFormat = Option(dateFormatMap.get(partitionColumn.toLowerCase))
     val timeStampFormat = if (specificFormat.isDefined) {
       new SimpleDateFormat(specificFormat.get)
@@ -1159,7 +804,7 @@ object CarbonDataRDDFactory {
         carbonLoadModel.getSerializationNullFormat.split(CarbonCommonConstants.COMMA, 2)(1)
       dataFrame.get.rdd.map { row =>
         if (null != row && row.length > partitionColumnIndex &&
-          null != row.get(partitionColumnIndex)) {
+            null != row.get(partitionColumnIndex)) {
           (CarbonScalaUtil.getString(row.get(partitionColumnIndex), serializationNullFormat,
             delimiterLevel1, delimiterLevel2, timeStampFormat, dateFormat), row)
         } else {
@@ -1214,23 +859,142 @@ object CarbonDataRDDFactory {
     }
   }
 
-  private def writeDictionary(carbonLoadModel: CarbonLoadModel,
-      result: Option[DictionaryServer], writeAll: Boolean) = {
-    // write dictionary file
-    val uniqueTableName: String = s"${ carbonLoadModel.getDatabaseName }_${
-      carbonLoadModel.getTableName
-    }"
-    result match {
-      case Some(server) =>
-        try {
-          server.writeTableDictionary(carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
-            .getCarbonTableIdentifier.getTableId)
-        } catch {
-          case _: Exception =>
-            LOGGER.error(s"Error while writing dictionary file for $uniqueTableName")
-            throw new Exception("Dataload failed due to error while writing dictionary file!")
+  /**
+   * Execute load process for partition table
+   */
+  private def loadDataForPartitionTable(
+      sqlContext: SQLContext,
+      dataFrame: Option[DataFrame],
+      carbonLoadModel: CarbonLoadModel
+  ): Array[(String, (LoadMetadataDetails, ExecutionErrors))] = {
+    try {
+      val rdd = repartitionInputData(sqlContext, dataFrame, carbonLoadModel)
+      new PartitionTableDataLoaderRDD(
+        sqlContext.sparkContext,
+        new DataLoadResultImpl(),
+        carbonLoadModel,
+        rdd
+      ).collect()
+    } catch {
+      case ex: Exception =>
+        LOGGER.error(ex, "load data failed for partition table")
+        throw ex
+    }
+  }
+
+  /**
+   * Execute load process to load from input dataframe
+   */
+  private def loadDataFrame(
+      sqlContext: SQLContext,
+      dataFrame: Option[DataFrame],
+      carbonLoadModel: CarbonLoadModel
+  ): Array[(String, (LoadMetadataDetails, ExecutionErrors))] = {
+    try {
+      val rdd = dataFrame.get.rdd
+
+      val nodeNumOfData = rdd.partitions.flatMap[String, Array[String]] { p =>
+        DataLoadPartitionCoalescer.getPreferredLocs(rdd, p).map(_.host)
+      }.distinct.length
+      val nodes = DistributionUtil.ensureExecutorsByNumberAndGetNodeList(
+        nodeNumOfData,
+        sqlContext.sparkContext)
+      val newRdd = new DataLoadCoalescedRDD[Row](rdd, nodes.toArray.distinct)
+
+      new NewDataFrameLoaderRDD(
+        sqlContext.sparkContext,
+        new DataLoadResultImpl(),
+        carbonLoadModel,
+        newRdd
+      ).collect()
+    } catch {
+      case ex: Exception =>
+        LOGGER.error(ex, "load data frame failed")
+        throw ex
+    }
+  }
+
+  /**
+   * Execute load process to load from input file path specified in `carbonLoadModel`
+   */
+  private def loadDataFile(
+      sqlContext: SQLContext,
+      carbonLoadModel: CarbonLoadModel
+  ): Array[(String, (LoadMetadataDetails, ExecutionErrors))] = {
+    /*
+     * when data load handle by node partition
+     * 1)clone the hadoop configuration,and set the file path to the configuration
+     * 2)use org.apache.hadoop.mapreduce.lib.input.TextInputFormat to get splits,size info
+     * 3)use CarbonLoaderUtil.nodeBlockMapping to get mapping info of node and block,
+     *   for locally writing carbondata files(one file one block) in nodes
+     * 4)use NewCarbonDataLoadRDD to load data and write to carbondata files
+     */
+    val hadoopConfiguration = new Configuration(sqlContext.sparkContext.hadoopConfiguration)
+    // FileUtils will skip file which is no csv, and return all file path which split by ','
+    val filePaths = carbonLoadModel.getFactFilePath
+    hadoopConfiguration.set(FileInputFormat.INPUT_DIR, filePaths)
+    hadoopConfiguration.set(FileInputFormat.INPUT_DIR_RECURSIVE, "true")
+    hadoopConfiguration.set("io.compression.codecs",
+      """org.apache.hadoop.io.compress.GzipCodec,
+             org.apache.hadoop.io.compress.DefaultCodec,
+             org.apache.hadoop.io.compress.BZip2Codec""".stripMargin)
+
+    CommonUtil.configSplitMaxSize(sqlContext.sparkContext, filePaths, hadoopConfiguration)
+
+    val inputFormat = new org.apache.hadoop.mapreduce.lib.input.TextInputFormat
+    val jobContext = new Job(hadoopConfiguration)
+    val rawSplits = inputFormat.getSplits(jobContext).toArray
+    val blockList = rawSplits.map { inputSplit =>
+      val fileSplit = inputSplit.asInstanceOf[FileSplit]
+      new TableBlockInfo(fileSplit.getPath.toString,
+        fileSplit.getStart, "1",
+        fileSplit.getLocations, fileSplit.getLength, ColumnarFormatVersion.V1, null
+      ).asInstanceOf[Distributable]
+    }
+    // group blocks to nodes, tasks
+    val startTime = System.currentTimeMillis
+    val activeNodes = DistributionUtil
+      .ensureExecutorsAndGetNodeList(blockList, sqlContext.sparkContext)
+    val nodeBlockMapping =
+      CarbonLoaderUtil
+        .nodeBlockMapping(blockList.toSeq.asJava, -1, activeNodes.toList.asJava).asScala
+        .toSeq
+    val timeElapsed: Long = System.currentTimeMillis - startTime
+    LOGGER.info("Total Time taken in block allocation: " + timeElapsed)
+    LOGGER.info(s"Total no of blocks: ${ blockList.length }, " +
+                s"No.of Nodes: ${nodeBlockMapping.size}")
+    var str = ""
+    nodeBlockMapping.foreach { entry =>
+      val tableBlock = entry._2
+      str = str + "#Node: " + entry._1 + " no.of.blocks: " + tableBlock.size()
+      tableBlock.asScala.foreach(tableBlockInfo =>
+        if (!tableBlockInfo.getLocations.exists(hostentry =>
+          hostentry.equalsIgnoreCase(entry._1)
+        )) {
+          str = str + " , mismatch locations: " + tableBlockInfo.getLocations
+            .foldLeft("")((a, b) => a + "," + b)
         }
-      case _ =>
+      )
+      str = str + "\n"
     }
+    LOGGER.info(str)
+    val blocksGroupBy: Array[(String, Array[BlockDetails])] = nodeBlockMapping.map { entry =>
+      val blockDetailsList =
+        entry._2.asScala.map(distributable => {
+          val tableBlock = distributable.asInstanceOf[TableBlockInfo]
+          new BlockDetails(new Path(tableBlock.getFilePath),
+            tableBlock.getBlockOffset, tableBlock.getBlockLength, tableBlock.getLocations
+          )
+        }).toArray
+      (entry._1, blockDetailsList)
+    }.toArray
+
+    new NewCarbonDataLoadRDD(
+      sqlContext.sparkContext,
+      new DataLoadResultImpl(),
+      carbonLoadModel,
+      blocksGroupBy
+    ).collect()
   }
+
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0578ba0f/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
index 6815629..bf86aca 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
@@ -17,7 +17,6 @@
 
 package org.apache.spark.sql
 
-import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.logical.{UnaryNode, _}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0578ba0f/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/AlterTableCompactionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/AlterTableCompactionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/AlterTableCompactionCommand.scala
index 52df2a4..f87e734 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/AlterTableCompactionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/AlterTableCompactionCommand.scala
@@ -17,15 +17,21 @@
 
 package org.apache.spark.sql.execution.command.management
 
-import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
-import org.apache.spark.sql.execution.command.{AlterTableModel, DataProcessCommand, RunnableCommand}
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.{CarbonEnv, Row, SparkSession, SQLContext}
+import org.apache.spark.sql.execution.command.{AlterTableModel, CompactionModel, DataProcessCommand, RunnableCommand}
 import org.apache.spark.sql.hive.CarbonRelation
 
 import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
 import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.locks.{CarbonLockFactory, LockUsage}
+import org.apache.carbondata.core.mutate.CarbonUpdateUtil
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
+import org.apache.carbondata.processing.merger.{CarbonDataMergerUtil, CompactionType}
 import org.apache.carbondata.spark.rdd.CarbonDataRDDFactory
+import org.apache.carbondata.spark.util.CommonUtil
 
 /**
  * Command for the compaction in alter table command
@@ -34,12 +40,14 @@ case class AlterTableCompactionCommand(
     alterTableModel: AlterTableModel)
   extends RunnableCommand with DataProcessCommand {
 
+  private val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getName)
+
   override def run(sparkSession: SparkSession): Seq[Row] = {
     processData(sparkSession)
   }
 
   override def processData(sparkSession: SparkSession): Seq[Row] = {
-    val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getName)
+
     val tableName = alterTableModel.tableName.toLowerCase
     val databaseName = alterTableModel.dbName.getOrElse(sparkSession.catalog.currentDatabase)
     val relation =
@@ -71,8 +79,7 @@ case class AlterTableCompactionCommand(
       )
     storeLocation = storeLocation + "/carbonstore/" + System.nanoTime()
     try {
-      CarbonDataRDDFactory
-        .alterTableForCompaction(sparkSession.sqlContext,
+      alterTableForCompaction(sparkSession.sqlContext,
           alterTableModel,
           carbonLoadModel,
           storeLocation
@@ -87,4 +94,111 @@ case class AlterTableCompactionCommand(
     }
     Seq.empty
   }
+
+  private def alterTableForCompaction(sqlContext: SQLContext,
+      alterTableModel: AlterTableModel,
+      carbonLoadModel: CarbonLoadModel,
+      storeLocation: String): Unit = {
+    var compactionSize: Long = 0
+    var compactionType: CompactionType = CompactionType.MINOR_COMPACTION
+    if (alterTableModel.compactionType.equalsIgnoreCase("major")) {
+      compactionSize = CarbonDataMergerUtil.getCompactionSize(CompactionType.MAJOR_COMPACTION)
+      compactionType = CompactionType.MAJOR_COMPACTION
+    } else if (alterTableModel.compactionType.equalsIgnoreCase(
+      CompactionType.IUD_UPDDEL_DELTA_COMPACTION.toString)) {
+      compactionType = CompactionType.IUD_UPDDEL_DELTA_COMPACTION
+      if (alterTableModel.segmentUpdateStatusManager.isDefined) {
+        carbonLoadModel.setSegmentUpdateStatusManager(
+          alterTableModel.segmentUpdateStatusManager.get)
+        carbonLoadModel.setLoadMetadataDetails(
+          alterTableModel.segmentUpdateStatusManager.get.getLoadMetadataDetails.toList.asJava)
+      }
+    } else if (alterTableModel.compactionType.equalsIgnoreCase(
+      CompactionType.SEGMENT_INDEX_COMPACTION.toString)) {
+      compactionType = CompactionType.SEGMENT_INDEX_COMPACTION
+    } else {
+      compactionType = CompactionType.MINOR_COMPACTION
+    }
+
+    LOGGER.audit(s"Compaction request received for table " +
+                 s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
+    val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
+
+    if (null == carbonLoadModel.getLoadMetadataDetails) {
+      CommonUtil.readLoadMetadataDetails(carbonLoadModel)
+    }
+    if (compactionType == CompactionType.SEGMENT_INDEX_COMPACTION) {
+      // Just launch job to merge index and return
+      CommonUtil.mergeIndexFiles(sqlContext.sparkContext,
+        carbonLoadModel.getLoadMetadataDetails.asScala.map(_.getLoadName),
+        carbonLoadModel.getStorePath,
+        carbonTable)
+      return
+    }
+    // reading the start time of data load.
+    val loadStartTime : Long =
+      if (alterTableModel.factTimeStamp.isEmpty) {
+        CarbonUpdateUtil.readCurrentTime
+      } else {
+        alterTableModel.factTimeStamp.get
+      }
+    carbonLoadModel.setFactTimeStamp(loadStartTime)
+
+    val isCompactionTriggerByDDl = true
+    val compactionModel = CompactionModel(compactionSize,
+      compactionType,
+      carbonTable,
+      isCompactionTriggerByDDl
+    )
+
+    val isConcurrentCompactionAllowed = CarbonProperties.getInstance()
+      .getProperty(CarbonCommonConstants.ENABLE_CONCURRENT_COMPACTION,
+        CarbonCommonConstants.DEFAULT_ENABLE_CONCURRENT_COMPACTION
+      )
+      .equalsIgnoreCase("true")
+
+    // if system level compaction is enabled then only one compaction can run in the system
+    // if any other request comes at this time then it will create a compaction request file.
+    // so that this will be taken up by the compaction process which is executing.
+    if (!isConcurrentCompactionAllowed) {
+      LOGGER.info("System level compaction lock is enabled.")
+      CarbonDataRDDFactory.handleCompactionForSystemLocking(sqlContext,
+        carbonLoadModel,
+        storeLocation,
+        compactionType,
+        carbonTable,
+        compactionModel
+      )
+    } else {
+      // normal flow of compaction
+      val lock = CarbonLockFactory
+        .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier,
+          LockUsage.COMPACTION_LOCK
+        )
+
+      if (lock.lockWithRetries()) {
+        LOGGER.info("Acquired the compaction lock for table" +
+                    s" ${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
+        try {
+          CarbonDataRDDFactory.startCompactionThreads(sqlContext,
+            carbonLoadModel,
+            storeLocation,
+            compactionModel,
+            lock
+          )
+        } catch {
+          case e: Exception =>
+            LOGGER.error(s"Exception in start compaction thread. ${ e.getMessage }")
+            lock.unlock()
+            throw e
+        }
+      } else {
+        LOGGER.audit("Not able to acquire the compaction lock for table " +
+                     s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
+        LOGGER.error(s"Not able to acquire the compaction lock for table" +
+                     s" ${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
+        sys.error("Table is already locked for compaction. Please try after some time.")
+      }
+    }
+  }
 }