Posted to commits@carbondata.apache.org by ja...@apache.org on 2018/02/27 03:28:59 UTC

[39/49] carbondata git commit: [CARBONDATA-2159] Remove carbon-spark dependency in store-sdk module
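
In outline, the hunks below move the data-loading helpers out of the carbon-spark layer and onto processing/core APIs, so that the store-sdk module no longer needs a carbon-spark dependency. As far as these hunks show, the recurring call-site changes are:

  CommonUtil.readLoadMetadataDetails(model)           ->  model.readAndSetLoadMetadataDetails()
  DataLoadingUtil.getDataLoadingOptions(props, opts)  ->  LoadOption.fillOptionWithDefaultValue(opts.asJava)
  DataLoadingUtil.buildCarbonLoadModel(...)           ->  new CarbonLoadModelBuilder(table).build(...)
  DataLoadingUtil.deleteLoadsAndUpdateMetadata(...)   ->  SegmentStatusManager.deleteLoadsAndUpdateMetadata(table, false)
  org.apache.carbondata.spark.exception.*             ->  org.apache.carbondata.common.exceptions(.sql).*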

http://git-wip-us.apache.org/repos/asf/carbondata/blob/06a71586/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala
index 8d394db..e69de29 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala
@@ -1,610 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.spark.util
-
-import java.text.SimpleDateFormat
-import java.util
-import java.util.{Date, List, Locale}
-
-import scala.collection.{immutable, mutable}
-import scala.collection.JavaConverters._
-import scala.collection.mutable.ArrayBuffer
-
-import org.apache.commons.lang3.StringUtils
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path
-import org.apache.hadoop.mapred.JobConf
-import org.apache.hadoop.mapreduce.{TaskAttemptID, TaskType}
-import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, FileSplit}
-import org.apache.hadoop.mapreduce.task.{JobContextImpl, TaskAttemptContextImpl}
-import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
-import org.apache.spark.sql.execution.datasources.{FilePartition, FileScanRDD, PartitionedFile}
-import org.apache.spark.sql.util.CarbonException
-import org.apache.spark.sql.util.SparkSQLUtil.sessionState
-
-import org.apache.carbondata.common.constants.LoggerAction
-import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
-import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants}
-import org.apache.carbondata.core.indexstore.PartitionSpec
-import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, LockUsage}
-import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, SegmentStatusManager}
-import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
-import org.apache.carbondata.processing.loading.constants.DataLoadProcessorConstants
-import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat
-import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
-import org.apache.carbondata.processing.util.{CarbonLoaderUtil, DeleteLoadFolders, TableOptionConstant}
-import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
-import org.apache.carbondata.spark.load.DataLoadProcessBuilderOnSpark.LOGGER
-import org.apache.carbondata.spark.load.ValidateUtil
-import org.apache.carbondata.spark.rdd.SerializableConfiguration
-
-/**
- * Utility object for data loading
- */
-object DataLoadingUtil {
-
-  val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
-
-  /**
-   * get data loading options and initialise default values
-   */
-  def getDataLoadingOptions(
-      carbonProperty: CarbonProperties,
-      options: immutable.Map[String, String]): mutable.Map[String, String] = {
-    val optionsFinal = scala.collection.mutable.Map[String, String]()
-    optionsFinal.put("delimiter", options.getOrElse("delimiter", ","))
-    optionsFinal.put("quotechar", options.getOrElse("quotechar", "\""))
-    optionsFinal.put("fileheader", options.getOrElse("fileheader", ""))
-    optionsFinal.put("commentchar", options.getOrElse("commentchar", "#"))
-    optionsFinal.put("columndict", options.getOrElse("columndict", null))
-
-    optionsFinal.put("escapechar",
-      CarbonLoaderUtil.getEscapeChar(options.getOrElse("escapechar", "\\")))
-
-    optionsFinal.put(
-      "serialization_null_format",
-      options.getOrElse("serialization_null_format", "\\N"))
-
-    optionsFinal.put(
-      "bad_records_logger_enable",
-      options.getOrElse(
-        "bad_records_logger_enable",
-        carbonProperty.getProperty(
-          CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE,
-          CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE_DEFAULT)))
-
-    val badRecordActionValue = carbonProperty.getProperty(
-      CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION,
-      CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT)
-
-    optionsFinal.put(
-      "bad_records_action",
-      options.getOrElse(
-        "bad_records_action",
-        carbonProperty.getProperty(
-          CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_ACTION,
-          badRecordActionValue)))
-
-    optionsFinal.put(
-      "is_empty_data_bad_record",
-      options.getOrElse(
-        "is_empty_data_bad_record",
-        carbonProperty.getProperty(
-          CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD,
-          CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD_DEFAULT)))
-
-    optionsFinal.put(
-      "skip_empty_line",
-      options.getOrElse(
-        "skip_empty_line",
-        carbonProperty.getProperty(
-          CarbonLoadOptionConstants.CARBON_OPTIONS_SKIP_EMPTY_LINE)))
-
-    optionsFinal.put("all_dictionary_path", options.getOrElse("all_dictionary_path", ""))
-
-    optionsFinal.put(
-      "complex_delimiter_level_1",
-      options.getOrElse("complex_delimiter_level_1", "$"))
-
-    optionsFinal.put(
-      "complex_delimiter_level_2",
-      options.getOrElse("complex_delimiter_level_2", ":"))
-
-    optionsFinal.put(
-      "dateformat",
-      options.getOrElse(
-        "dateformat",
-        carbonProperty.getProperty(
-          CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT,
-          CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT_DEFAULT)))
-
-    optionsFinal.put(
-      "timestampformat",
-      options.getOrElse(
-        "timestampformat",
-        carbonProperty.getProperty(
-          CarbonLoadOptionConstants.CARBON_OPTIONS_TIMESTAMPFORMAT,
-          CarbonLoadOptionConstants.CARBON_OPTIONS_TIMESTAMPFORMAT_DEFAULT)))
-
-    optionsFinal.put(
-      "global_sort_partitions",
-      options.getOrElse(
-        "global_sort_partitions",
-        carbonProperty.getProperty(
-          CarbonLoadOptionConstants.CARBON_OPTIONS_GLOBAL_SORT_PARTITIONS,
-          null)))
-
-    optionsFinal.put("maxcolumns", options.getOrElse("maxcolumns", null))
-
-    optionsFinal.put(
-      "batch_sort_size_inmb",
-      options.getOrElse(
-        "batch_sort_size_inmb",
-        carbonProperty.getProperty(
-          CarbonLoadOptionConstants.CARBON_OPTIONS_BATCH_SORT_SIZE_INMB,
-          carbonProperty.getProperty(
-            CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB,
-            CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB_DEFAULT))))
-
-    optionsFinal.put(
-      "bad_record_path",
-      options.getOrElse(
-        "bad_record_path",
-        carbonProperty.getProperty(
-          CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH,
-          carbonProperty.getProperty(
-            CarbonCommonConstants.CARBON_BADRECORDS_LOC,
-            CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL))))
-
-    val useOnePass = options.getOrElse(
-      "single_pass",
-      carbonProperty.getProperty(
-        CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS,
-        CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS_DEFAULT)).trim.toLowerCase match {
-      case "true" =>
-        true
-      case "false" =>
-        // when single_pass = false, if either all_dictionary_path
-        // or columndict is configured then do not allow the load
-        if (StringUtils.isNotEmpty(optionsFinal("all_dictionary_path")) ||
-            StringUtils.isNotEmpty(optionsFinal("columndict"))) {
-          throw new MalformedCarbonCommandException(
-            "Can not use all_dictionary_path or columndict without single_pass.")
-        } else {
-          false
-        }
-      case illegal =>
-        val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
-        LOGGER.error(s"Can't use single_pass, because illegal syntax found: [$illegal] " +
-                     "Please set it as 'true' or 'false'")
-        false
-    }
-    optionsFinal.put("single_pass", useOnePass.toString)
-    optionsFinal
-  }
-
-  /**
-   * use the default value if the given value is empty
-   */
-  private def checkDefaultValue(value: String, default: String) = {
-    if (StringUtils.isEmpty(value)) {
-      default
-    } else {
-      value
-    }
-  }
-
-  /**
-   * build CarbonLoadModel for data loading
-   * @param table CarbonTable object containing all metadata information for the table
-   *              like table name, table path, schema, etc
-   * @param options Load options from user input
-   * @return a new CarbonLoadModel instance
-   */
-  def buildCarbonLoadModelJava(
-      table: CarbonTable,
-      options: java.util.Map[String, String]
-  ): CarbonLoadModel = {
-    val carbonProperty: CarbonProperties = CarbonProperties.getInstance
-    val optionsFinal = getDataLoadingOptions(carbonProperty, options.asScala.toMap)
-    optionsFinal.put("sort_scope", "no_sort")
-    if (!options.containsKey("fileheader")) {
-      val csvHeader = table.getCreateOrderColumn(table.getTableName)
-        .asScala.map(_.getColName).mkString(",")
-      optionsFinal.put("fileheader", csvHeader)
-    }
-    val model = new CarbonLoadModel()
-    buildCarbonLoadModel(
-      table = table,
-      carbonProperty = carbonProperty,
-      options = options.asScala.toMap,
-      optionsFinal = optionsFinal,
-      carbonLoadModel = model,
-      hadoopConf = null)  // we have provided 'fileheader', so it can be null
-
-    // set default values
-    model.setTimestampformat(CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
-    model.setDateFormat(CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT)
-    model.setUseOnePass(options.asScala.getOrElse("onepass", "false").toBoolean)
-    model.setDictionaryServerHost(options.asScala.getOrElse("dicthost", null))
-    model.setDictionaryServerPort(options.asScala.getOrElse("dictport", "-1").toInt)
-    model
-  }
-
-  /**
-   * build CarbonLoadModel for data loading
-   * @param table CarbonTable object containing all metadata information for the table
-   *              like table name, table path, schema, etc
-   * @param carbonProperty Carbon property instance
-   * @param options Load options from user input
-   * @param optionsFinal Load options that populated with default values for optional options
-   * @param carbonLoadModel The output load model
- * @param hadoopConf hadoopConf is needed to read the CSV header if 'fileheader' is not set in
- *                   the user-provided load options
-   */
-  def buildCarbonLoadModel(
-      table: CarbonTable,
-      carbonProperty: CarbonProperties,
-      options: immutable.Map[String, String],
-      optionsFinal: mutable.Map[String, String],
-      carbonLoadModel: CarbonLoadModel,
-      hadoopConf: Configuration,
-      partition: Map[String, Option[String]] = Map.empty,
-      isDataFrame: Boolean = false): Unit = {
-    carbonLoadModel.setTableName(table.getTableName)
-    carbonLoadModel.setDatabaseName(table.getDatabaseName)
-    carbonLoadModel.setTablePath(table.getTablePath)
-    carbonLoadModel.setTableName(table.getTableName)
-    val dataLoadSchema = new CarbonDataLoadSchema(table)
-    // Need to fill dimension relation
-    carbonLoadModel.setCarbonDataLoadSchema(dataLoadSchema)
-    val sort_scope = optionsFinal("sort_scope")
-    val single_pass = optionsFinal("single_pass")
-    val bad_records_logger_enable = optionsFinal("bad_records_logger_enable")
-    val bad_records_action = optionsFinal("bad_records_action")
-    var bad_record_path = optionsFinal("bad_record_path")
-    val global_sort_partitions = optionsFinal("global_sort_partitions")
-    val timestampformat = optionsFinal("timestampformat")
-    val dateFormat = optionsFinal("dateformat")
-    val delimeter = optionsFinal("delimiter")
-    val complex_delimeter_level1 = optionsFinal("complex_delimiter_level_1")
-    val complex_delimeter_level2 = optionsFinal("complex_delimiter_level_2")
-    val all_dictionary_path = optionsFinal("all_dictionary_path")
-    val column_dict = optionsFinal("columndict")
-    ValidateUtil.validateDateTimeFormat(timestampformat, "TimestampFormat")
-    ValidateUtil.validateDateTimeFormat(dateFormat, "DateFormat")
-    ValidateUtil.validateSortScope(table, sort_scope)
-
-    if (bad_records_logger_enable.toBoolean ||
-        LoggerAction.REDIRECT.name().equalsIgnoreCase(bad_records_action)) {
-      if (!CarbonUtil.isValidBadStorePath(bad_record_path)) {
-        CarbonException.analysisException("Invalid bad records location.")
-      }
-      bad_record_path = CarbonUtil.checkAndAppendHDFSUrl(bad_record_path)
-    }
-    carbonLoadModel.setBadRecordsLocation(bad_record_path)
-
-    ValidateUtil.validateGlobalSortPartitions(global_sort_partitions)
-    carbonLoadModel.setEscapeChar(checkDefaultValue(optionsFinal("escapechar"), "\\"))
-    carbonLoadModel.setQuoteChar(checkDefaultValue(optionsFinal("quotechar"), "\""))
-    carbonLoadModel.setCommentChar(checkDefaultValue(optionsFinal("commentchar"), "#"))
-
-    // if the csv file has no header and the load sql doesn't provide the FILEHEADER option,
-    // we should use the table schema to generate the file header.
-    var fileHeader = optionsFinal("fileheader")
-    val headerOption = options.get("header")
-    if (headerOption.isDefined) {
-      // whether the csv file has file header
-      // the default value is true
-      val header = try {
-        headerOption.get.toBoolean
-      } catch {
-        case ex: IllegalArgumentException =>
-          throw new MalformedCarbonCommandException(
-            "'header' option should be either 'true' or 'false'. " + ex.getMessage)
-      }
-      if (header) {
-        if (fileHeader.nonEmpty) {
-          throw new MalformedCarbonCommandException(
-            "When 'header' option is true, 'fileheader' option is not required.")
-        }
-      } else {
-        if (fileHeader.isEmpty) {
-          fileHeader = table.getCreateOrderColumn(table.getTableName)
-            .asScala.map(_.getColName).mkString(",")
-        }
-      }
-    }
-
-    carbonLoadModel.setTimestampformat(timestampformat)
-    carbonLoadModel.setDateFormat(dateFormat)
-    carbonLoadModel.setDefaultTimestampFormat(carbonProperty.getProperty(
-      CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
-      CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT))
-
-    carbonLoadModel.setDefaultDateFormat(carbonProperty.getProperty(
-      CarbonCommonConstants.CARBON_DATE_FORMAT,
-      CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT))
-
-    carbonLoadModel.setSerializationNullFormat(
-        TableOptionConstant.SERIALIZATION_NULL_FORMAT.getName + "," +
-        optionsFinal("serialization_null_format"))
-
-    carbonLoadModel.setBadRecordsLoggerEnable(
-        TableOptionConstant.BAD_RECORDS_LOGGER_ENABLE.getName + "," + bad_records_logger_enable)
-
-    carbonLoadModel.setBadRecordsAction(
-        TableOptionConstant.BAD_RECORDS_ACTION.getName + "," + bad_records_action.toUpperCase)
-
-    carbonLoadModel.setIsEmptyDataBadRecord(
-        DataLoadProcessorConstants.IS_EMPTY_DATA_BAD_RECORD + "," +
-        optionsFinal("is_empty_data_bad_record"))
-
-    carbonLoadModel.setSkipEmptyLine(optionsFinal("skip_empty_line"))
-
-    carbonLoadModel.setSortScope(sort_scope)
-    carbonLoadModel.setBatchSortSizeInMb(optionsFinal("batch_sort_size_inmb"))
-    carbonLoadModel.setGlobalSortPartitions(global_sort_partitions)
-    carbonLoadModel.setUseOnePass(single_pass.toBoolean)
-
-    if (delimeter.equalsIgnoreCase(complex_delimeter_level1) ||
-        complex_delimeter_level1.equalsIgnoreCase(complex_delimeter_level2) ||
-        delimeter.equalsIgnoreCase(complex_delimeter_level2)) {
-      CarbonException.analysisException(s"Field Delimiter and Complex types delimiter are same")
-    } else {
-      carbonLoadModel.setComplexDelimiterLevel1(complex_delimeter_level1)
-      carbonLoadModel.setComplexDelimiterLevel2(complex_delimeter_level2)
-    }
-    // set local dictionary path, and dictionary file extension
-    carbonLoadModel.setAllDictPath(all_dictionary_path)
-    carbonLoadModel.setCsvDelimiter(CarbonUtil.unescapeChar(delimeter))
-    carbonLoadModel.setCsvHeader(fileHeader)
-    carbonLoadModel.setColDictFilePath(column_dict)
-
-    val ignoreColumns = new util.ArrayList[String]()
-    if (!isDataFrame) {
-      ignoreColumns.addAll(partition.filter(_._2.isDefined).keys.toList.asJava)
-    }
-    carbonLoadModel.setCsvHeaderColumns(
-      CommonUtil.getCsvHeaderColumns(carbonLoadModel, hadoopConf, ignoreColumns))
-
-    val validatedMaxColumns = CommonUtil.validateMaxColumns(
-      carbonLoadModel.getCsvHeaderColumns,
-      optionsFinal("maxcolumns"))
-
-    carbonLoadModel.setMaxColumns(validatedMaxColumns.toString)
-    if (null == carbonLoadModel.getLoadMetadataDetails) {
-      CommonUtil.readLoadMetadataDetails(carbonLoadModel)
-    }
-  }
-
-  private def isLoadDeletionRequired(metaDataLocation: String): Boolean = {
-    val details = SegmentStatusManager.readLoadMetadata(metaDataLocation)
-    if (details != null && details.nonEmpty) for (oneRow <- details) {
-      if ((SegmentStatus.MARKED_FOR_DELETE == oneRow.getSegmentStatus ||
-           SegmentStatus.COMPACTED == oneRow.getSegmentStatus ||
-           SegmentStatus.INSERT_IN_PROGRESS == oneRow.getSegmentStatus ||
-           SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS == oneRow.getSegmentStatus) &&
-          oneRow.getVisibility.equalsIgnoreCase("true")) {
-        return true
-      }
-    }
-    false
-  }
-
-  def deleteLoadsAndUpdateMetadata(
-      isForceDeletion: Boolean,
-      carbonTable: CarbonTable,
-      specs: util.List[PartitionSpec]): Unit = {
-    if (isLoadDeletionRequired(carbonTable.getMetadataPath)) {
-      val absoluteTableIdentifier = carbonTable.getAbsoluteTableIdentifier
-
-      val (details, updationRequired) =
-        isUpdationRequired(
-          isForceDeletion,
-          carbonTable,
-          absoluteTableIdentifier)
-
-
-      if (updationRequired) {
-        val carbonTableStatusLock =
-          CarbonLockFactory.getCarbonLockObj(
-            absoluteTableIdentifier,
-            LockUsage.TABLE_STATUS_LOCK
-          )
-        var locked = false
-        var updationCompletionStaus = false
-        try {
-          // Update the load metadata file after cleaning deleted nodes
-          locked = carbonTableStatusLock.lockWithRetries()
-          if (locked) {
-            LOGGER.info("Table status lock has been successfully acquired.")
-            // Read the status again to verify whether updation is still required.
-            val (details, updationRequired) =
-              isUpdationRequired(
-                isForceDeletion,
-                carbonTable,
-                absoluteTableIdentifier)
-            if (!updationRequired) {
-              return
-            }
-            // read latest table status again.
-            val latestMetadata = SegmentStatusManager
-              .readLoadMetadata(carbonTable.getMetadataPath)
-
-            // update the metadata details from old to new status.
-            val latestStatus = CarbonLoaderUtil
-              .updateLoadMetadataFromOldToNew(details, latestMetadata)
-
-            CarbonLoaderUtil.writeLoadMetadata(absoluteTableIdentifier, latestStatus)
-          } else {
-            val dbName = absoluteTableIdentifier.getCarbonTableIdentifier.getDatabaseName
-            val tableName = absoluteTableIdentifier.getCarbonTableIdentifier.getTableName
-            val errorMsg = "Clean files request is failed for " +
-                           s"$dbName.$tableName" +
-                           ". Not able to acquire the table status lock due to other operation " +
-                           "running in the background."
-            LOGGER.audit(errorMsg)
-            LOGGER.error(errorMsg)
-            throw new Exception(errorMsg + " Please try after some time.")
-          }
-          updationCompletionStaus = true
-        } finally {
-          if (locked) {
-            CarbonLockUtil.fileUnlock(carbonTableStatusLock, LockUsage.TABLE_STATUS_LOCK)
-          }
-        }
-        if (updationCompletionStaus) {
-          DeleteLoadFolders
-            .physicalFactAndMeasureMetadataDeletion(absoluteTableIdentifier,
-              carbonTable.getMetadataPath, isForceDeletion, specs)
-        }
-      }
-    }
-  }
-
-  private def isUpdationRequired(isForceDeletion: Boolean,
-      carbonTable: CarbonTable,
-      absoluteTableIdentifier: AbsoluteTableIdentifier): (Array[LoadMetadataDetails], Boolean) = {
-    val details = SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath)
-    // Delete marked loads
-    val isUpdationRequired =
-      DeleteLoadFolders.deleteLoadFoldersFromFileSystem(
-        absoluteTableIdentifier,
-        isForceDeletion,
-        details,
-        carbonTable.getMetadataPath
-      )
-    (details, isUpdationRequired)
-  }
-
-  /**
-   * creates a RDD that does reading of multiple CSV files
-   */
-  def csvFileScanRDD(
-      spark: SparkSession,
-      model: CarbonLoadModel,
-      hadoopConf: Configuration
-  ): RDD[InternalRow] = {
-    // 1. partition
-    val defaultMaxSplitBytes = sessionState(spark).conf.filesMaxPartitionBytes
-    val openCostInBytes = sessionState(spark).conf.filesOpenCostInBytes
-    val defaultParallelism = spark.sparkContext.defaultParallelism
-    CommonUtil.configureCSVInputFormat(hadoopConf, model)
-    hadoopConf.set(FileInputFormat.INPUT_DIR, model.getFactFilePath)
-    val jobConf = new JobConf(hadoopConf)
-    SparkHadoopUtil.get.addCredentials(jobConf)
-    val jobContext = new JobContextImpl(jobConf, null)
-    val inputFormat = new CSVInputFormat()
-    val rawSplits = inputFormat.getSplits(jobContext).toArray
-    val splitFiles = rawSplits.map { split =>
-      val fileSplit = split.asInstanceOf[FileSplit]
-      PartitionedFile(
-        InternalRow.empty,
-        fileSplit.getPath.toString,
-        fileSplit.getStart,
-        fileSplit.getLength,
-        fileSplit.getLocations)
-    }.sortBy(_.length)(implicitly[Ordering[Long]].reverse)
-    val totalBytes = splitFiles.map(_.length + openCostInBytes).sum
-    val bytesPerCore = totalBytes / defaultParallelism
-
-    val maxSplitBytes = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
-    LOGGER.info(s"Planning scan with bin packing, max size: $maxSplitBytes bytes, " +
-                s"open cost is considered as scanning $openCostInBytes bytes.")
-
-    val partitions = new ArrayBuffer[FilePartition]
-    val currentFiles = new ArrayBuffer[PartitionedFile]
-    var currentSize = 0L
-
-    def closePartition(): Unit = {
-      if (currentFiles.nonEmpty) {
-        val newPartition =
-          FilePartition(
-            partitions.size,
-            currentFiles.toArray.toSeq)
-        partitions += newPartition
-      }
-      currentFiles.clear()
-      currentSize = 0
-    }
-
-    splitFiles.foreach { file =>
-      if (currentSize + file.length > maxSplitBytes) {
-        closePartition()
-      }
-      // Add the given file to the current partition.
-      currentSize += file.length + openCostInBytes
-      currentFiles += file
-    }
-    closePartition()
-
-    // 2. read function
-    val serializableConfiguration = new SerializableConfiguration(jobConf)
-    val readFunction = new (PartitionedFile => Iterator[InternalRow]) with Serializable {
-      override def apply(file: PartitionedFile): Iterator[InternalRow] = {
-        new Iterator[InternalRow] {
-          val hadoopConf = serializableConfiguration.value
-          val jobTrackerId: String = {
-            val formatter = new SimpleDateFormat("yyyyMMddHHmmss", Locale.US)
-            formatter.format(new Date())
-          }
-          val attemptId = new TaskAttemptID(jobTrackerId, 0, TaskType.MAP, 0, 0)
-          val hadoopAttemptContext = new TaskAttemptContextImpl(hadoopConf, attemptId)
-          val inputSplit =
-            new FileSplit(new Path(file.filePath), file.start, file.length, file.locations)
-          var finished = false
-          val inputFormat = new CSVInputFormat()
-          val reader = inputFormat.createRecordReader(inputSplit, hadoopAttemptContext)
-          reader.initialize(inputSplit, hadoopAttemptContext)
-
-          override def hasNext: Boolean = {
-            if (!finished) {
-              if (reader != null) {
-                if (reader.nextKeyValue()) {
-                  true
-                } else {
-                  finished = true
-                  reader.close()
-                  false
-                }
-              } else {
-                finished = true
-                false
-              }
-            } else {
-              false
-            }
-          }
-
-          override def next(): InternalRow = {
-            new GenericInternalRow(reader.getCurrentValue.get().asInstanceOf[Array[Any]])
-          }
-        }
-      }
-    }
-    new FileScanRDD(spark, readFunction, partitions)
-  }
-
-}
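
The deleted csvFileScanRDD above planned its partitions with a greedy bin-packing pass: splits are sorted by descending length, a partition is closed once adding the next file would exceed maxSplitBytes, and every file charges openCostInBytes of scheduling overhead on top of its length. A standalone sketch of that loop (SplitFile and packFiles are illustrative names, not CarbonData APIs):

case class SplitFile(path: String, length: Long)

def packFiles(
    files: Seq[SplitFile],
    maxSplitBytes: Long,
    openCostInBytes: Long): Seq[Seq[SplitFile]] = {
  val partitions = scala.collection.mutable.ArrayBuffer[Seq[SplitFile]]()
  val current = scala.collection.mutable.ArrayBuffer[SplitFile]()
  var currentSize = 0L

  def closePartition(): Unit = {
    if (current.nonEmpty) {
      partitions += current.toList
    }
    current.clear()
    currentSize = 0L
  }

  // largest files first, so smaller files fill the remaining space
  files.sortBy(-_.length).foreach { file =>
    if (currentSize + file.length > maxSplitBytes) {
      closePartition()
    }
    currentSize += file.length + openCostInBytes
    current += file
  }
  closePartition()
  partitions.toList
}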

http://git-wip-us.apache.org/repos/asf/carbondata/blob/06a71586/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
index 2bd4f45..fb9ecac 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
@@ -321,7 +321,7 @@ object GlobalDictionaryUtil {
       carbonLoadModel.getSerializationNullFormat.split(CarbonCommonConstants.COMMA, 2)(1)
     // get load count
     if (null == carbonLoadModel.getLoadMetadataDetails) {
-      CommonUtil.readLoadMetadataDetails(carbonLoadModel)
+      carbonLoadModel.readAndSetLoadMetadataDetails()
     }
     val absoluteTableIdentifier = AbsoluteTableIdentifier.from(carbonLoadModel.getTablePath, table)
     DictionaryLoadModel(

http://git-wip-us.apache.org/repos/asf/carbondata/blob/06a71586/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
index 84215fd..b1e4083 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
@@ -34,6 +34,7 @@ import org.apache.spark.sql.util.CarbonException
 import org.apache.spark.util.PartitionUtils
 
 import org.apache.carbondata.common.constants.LoggerAction
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.metadata.datatype.DataTypes
@@ -42,7 +43,6 @@ import org.apache.carbondata.core.metadata.schema.partition.PartitionType
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
 import org.apache.carbondata.processing.util.CarbonLoaderUtil
-import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 import org.apache.carbondata.spark.util.{CommonUtil, DataTypeConverterUtil}
 
 /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/06a71586/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 8d3110a..6f8155c 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -153,7 +153,7 @@ object CarbonDataRDDFactory {
     if (compactionModel.compactionType != CompactionType.IUD_UPDDEL_DELTA) {
       // update the updated table status. For the case of Update Delta Compaction the Metadata
       // is filled in LoadModel, no need to refresh.
-      CommonUtil.readLoadMetadataDetails(carbonLoadModel)
+      carbonLoadModel.readAndSetLoadMetadataDetails()
     }
 
     val compactionThread = new Thread {
@@ -274,7 +274,7 @@ object CarbonDataRDDFactory {
     loadModel.setTableName(table.getCarbonTableIdentifier.getTableName)
     loadModel.setDatabaseName(table.getCarbonTableIdentifier.getDatabaseName)
     loadModel.setTablePath(table.getTablePath)
-    CommonUtil.readLoadMetadataDetails(loadModel)
+    loadModel.readAndSetLoadMetadataDetails()
     val loadStartTime = CarbonUpdateUtil.readCurrentTime()
     loadModel.setFactTimeStamp(loadStartTime)
     loadModel

http://git-wip-us.apache.org/repos/asf/carbondata/blob/06a71586/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
index e1bef9c..47d918e 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
@@ -76,7 +76,7 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel,
       }
 
       // scan again and determine if anything is there to merge again.
-      CommonUtil.readLoadMetadataDetails(carbonLoadModel)
+      carbonLoadModel.readAndSetLoadMetadataDetails()
       segList = carbonLoadModel.getLoadMetadataDetails
       // in case of major compaction we will scan only once and come out as it will keep
       // on doing major for the new loads also.

http://git-wip-us.apache.org/repos/asf/carbondata/blob/06a71586/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
index 7d70534..adf5e04 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
@@ -35,13 +35,13 @@ import org.apache.spark.sql.streaming.OutputMode
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.util.CarbonException
 
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
 import org.apache.carbondata.core.metadata.schema.SchemaEvolutionEntry
 import org.apache.carbondata.core.metadata.schema.table.TableInfo
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
 import org.apache.carbondata.spark.CarbonOption
-import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 import org.apache.carbondata.spark.util.CarbonScalaUtil
 import org.apache.carbondata.streaming.{CarbonStreamException, StreamSinkFactory}
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/06a71586/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
index 0fd5437..4e1ae56 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
@@ -23,10 +23,10 @@ import org.apache.spark.sql.execution.command._
 import org.apache.spark.sql.execution.command.preaaggregate.CreatePreAggregateTableCommand
 import org.apache.spark.sql.execution.command.timeseries.TimeSeriesUtil
 
+import org.apache.carbondata.common.exceptions.sql.{MalformedCarbonCommandException, MalformedDataMapCommandException}
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.metadata.schema.datamap.DataMapProvider
 import org.apache.carbondata.core.metadata.schema.datamap.DataMapProvider._
-import org.apache.carbondata.spark.exception.{MalformedCarbonCommandException, MalformedDataMapCommandException}
 
 /**
  * Below command class will be used to create datamap on table

http://git-wip-us.apache.org/repos/asf/carbondata/blob/06a71586/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
index 2675036..dcc71a2 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
@@ -27,6 +27,7 @@ import org.apache.spark.sql.execution.command.AtomicRunnableCommand
 import org.apache.spark.sql.execution.command.preaaggregate.PreAggregateUtil
 import org.apache.spark.sql.execution.command.table.CarbonDropTableCommand
 
+import org.apache.carbondata.common.exceptions.sql.{MalformedCarbonCommandException, NoSuchDataMapException}
 import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
 import org.apache.carbondata.core.datamap.DataMapStoreManager
 import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, LockUsage}
@@ -34,7 +35,6 @@ import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
 import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.events._
-import org.apache.carbondata.spark.exception.{MalformedCarbonCommandException, NoSuchDataMapException}
 
 /**
  * Drops the datamap and any related tables associated with the datamap

http://git-wip-us.apache.org/repos/asf/carbondata/blob/06a71586/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
index 4645f98..75de1fe 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
@@ -21,15 +21,17 @@ import java.io.{File, IOException}
 
 import scala.collection.JavaConverters._
 
-import org.apache.spark.sql.{CarbonEnv, Row, SparkSession, SQLContext}
+import org.apache.spark.sql.{CarbonEnv, Row, SQLContext, SparkSession}
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.execution.command.{AlterTableModel, AtomicRunnableCommand, CarbonMergerMapping, CompactionModel, DataCommand}
+import org.apache.spark.sql.execution.command.{AlterTableModel, AtomicRunnableCommand, CarbonMergerMapping, CompactionModel}
 import org.apache.spark.sql.hive.{CarbonRelation, CarbonSessionCatalog}
 import org.apache.spark.sql.optimizer.CarbonFilters
 import org.apache.spark.sql.util.CarbonException
 import org.apache.spark.util.AlterTableUtil
 
+import org.apache.carbondata.common.exceptions.ConcurrentOperationException
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.impl.FileFactory
@@ -44,7 +46,6 @@ import org.apache.carbondata.events._
 import org.apache.carbondata.processing.loading.events.LoadEvents.LoadMetadataEvent
 import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
 import org.apache.carbondata.processing.merger.{CarbonDataMergerUtil, CompactionType}
-import org.apache.carbondata.spark.exception.{ConcurrentOperationException, MalformedCarbonCommandException}
 import org.apache.carbondata.spark.rdd.CarbonDataRDDFactory
 import org.apache.carbondata.spark.util.CommonUtil
 import org.apache.carbondata.streaming.StreamHandoffRDD
@@ -90,7 +91,16 @@ case class CarbonAlterTableCompactionCommand(
   }
 
   override def processData(sparkSession: SparkSession): Seq[Row] = {
-    operationContext.setProperty("compactionException", "true")
+    val LOGGER: LogService =
+      LogServiceFactory.getLogService(this.getClass.getName)
+    val isLoadInProgress = SegmentStatusManager.checkIfAnyLoadInProgressForTable(table)
+    if (isLoadInProgress) {
+      val message = "Cannot run data loading and compaction on same table concurrently. " +
+                    "Please wait for load to finish"
+      LOGGER.error(message)
+      throw new ConcurrentOperationException(message)
+    }
+
     var compactionType: CompactionType = null
     var compactionException = "true"
     try {
@@ -103,7 +113,6 @@ case class CarbonAlterTableCompactionCommand(
           .fireEvent(alterTableCompactionExceptionEvent, operationContext)
         compactionException = operationContext.getProperty("compactionException").toString
     }
-
     if (compactionException.equalsIgnoreCase("true") && null == compactionType) {
       throw new MalformedCarbonCommandException(
         "Unsupported alter operation on carbon table")
@@ -159,8 +168,7 @@ case class CarbonAlterTableCompactionCommand(
       operationContext: OperationContext): Unit = {
     val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getName)
     val compactionType = CompactionType.valueOf(alterTableModel.compactionType.toUpperCase)
-    val compactionSize: Long = CarbonDataMergerUtil
-      .getCompactionSize(compactionType, carbonLoadModel)
+    val compactionSize = CarbonDataMergerUtil.getCompactionSize(compactionType, carbonLoadModel)
     if (CompactionType.IUD_UPDDEL_DELTA == compactionType) {
       if (alterTableModel.segmentUpdateStatusManager.isDefined) {
         carbonLoadModel.setSegmentUpdateStatusManager(
@@ -175,7 +183,7 @@ case class CarbonAlterTableCompactionCommand(
     val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
 
     if (null == carbonLoadModel.getLoadMetadataDetails) {
-      CommonUtil.readLoadMetadataDetails(carbonLoadModel)
+      carbonLoadModel.readAndSetLoadMetadataDetails()
     }
 
     if (compactionType == CompactionType.STREAMING) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/06a71586/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
index 5dbd383..4806a6f 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
@@ -46,6 +46,7 @@ import org.apache.spark.storage.StorageLevel
 import org.apache.spark.unsafe.types.UTF8String
 import org.apache.spark.util.{CarbonReflectionUtils, CausedBy, FileUtils}
 
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
 import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants}
 import org.apache.carbondata.core.datamap.DataMapStoreManager
@@ -70,12 +71,15 @@ import org.apache.carbondata.processing.exception.DataLoadingException
 import org.apache.carbondata.processing.loading.TableProcessingOperations
 import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadMetadataEvent, LoadTablePostExecutionEvent, LoadTablePreExecutionEvent}
 import org.apache.carbondata.processing.loading.exception.NoRetryException
+import org.apache.carbondata.processing.loading.model.{CarbonLoadModel, CarbonLoadModelBuilder, LoadOption}
+import org.apache.carbondata.processing.util.CarbonLoaderUtil
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel
 import org.apache.carbondata.processing.loading.sort.SortScopeOptions
 import org.apache.carbondata.processing.util.{CarbonDataProcessorUtil, CarbonLoaderUtil}
 import org.apache.carbondata.spark.dictionary.provider.SecureDictionaryServiceProvider
 import org.apache.carbondata.spark.dictionary.server.SecureDictionaryServer
-import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
+import org.apache.carbondata.spark.rdd.{CarbonDataRDDFactory, CarbonDropPartitionRDD}
+import org.apache.carbondata.spark.util.{CarbonScalaUtil, CommonUtil, GlobalDictionaryUtil}
 import org.apache.carbondata.spark.load.DataLoadProcessorStepOnSpark
 import org.apache.carbondata.spark.rdd.CarbonDataRDDFactory
 import org.apache.carbondata.spark.util.{CarbonScalaUtil, DataLoadingUtil, GlobalDictionaryUtil, SparkDataTypeConverterImpl}
@@ -174,7 +178,7 @@ case class CarbonLoadDataCommand(
     val carbonLoadModel = new CarbonLoadModel()
     try {
       val tableProperties = table.getTableInfo.getFactTable.getTableProperties
-      val optionsFinal = DataLoadingUtil.getDataLoadingOptions(carbonProperty, options)
+      val optionsFinal = LoadOption.fillOptionWithDefaultValue(options.asJava)
       optionsFinal.put("sort_scope", tableProperties.asScala.getOrElse("sort_scope",
         carbonProperty.getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE,
           carbonProperty.getProperty(CarbonCommonConstants.LOAD_SORT_SCOPE,
@@ -189,10 +193,8 @@ case class CarbonLoadDataCommand(
       carbonLoadModel.setAggLoadRequest(
         internalOptions.getOrElse(CarbonCommonConstants.IS_INTERNAL_LOAD_CALL, "false").toBoolean)
       carbonLoadModel.setSegmentId(internalOptions.getOrElse("mergedSegmentName", ""))
-      DataLoadingUtil.buildCarbonLoadModel(
-        table,
-        carbonProperty,
-        options,
+      new CarbonLoadModelBuilder(table).build(
+        options.asJava,
         optionsFinal,
         carbonLoadModel,
         hadoopConf,
@@ -221,7 +223,7 @@ case class CarbonLoadDataCommand(
             carbonLoadModel,
             factPath,
             dataFrame.isDefined,
-            optionsFinal.asJava,
+            optionsFinal,
             options.asJava,
             isOverwriteTable)
         operationContext.setProperty("isOverwrite", isOverwriteTable)
@@ -229,6 +231,7 @@ case class CarbonLoadDataCommand(
         // First system has to partition the data first and then call the load data
         LOGGER.info(s"Initiating Direct Load for the Table : ($dbName.$tableName)")
         // Clean up the old invalid segment data before creating a new entry for new load.
+        SegmentStatusManager.deleteLoadsAndUpdateMetadata(table, false)
         DataLoadingUtil.deleteLoadsAndUpdateMetadata(
           isForceDeletion = false,
           table,
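
Taken together with the CarbonDataRDDFactory hunk above, the replacement load-model preparation can be sketched end to end. A minimal sketch, assuming a resolved CarbonTable; it uses only the calls that appear complete in this diff, and omits CarbonLoadModelBuilder.build because its trailing arguments are truncated in the hunk above:

import scala.collection.JavaConverters._
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.statusmanager.SegmentStatusManager
import org.apache.carbondata.processing.loading.model.{CarbonLoadModel, LoadOption}

def prepareLoadModel(table: CarbonTable, options: Map[String, String]): CarbonLoadModel = {
  // option defaults now come from LoadOption instead of DataLoadingUtil
  val optionsFinal = LoadOption.fillOptionWithDefaultValue(options.asJava)
  val model = new CarbonLoadModel()
  model.setDatabaseName(table.getCarbonTableIdentifier.getDatabaseName)
  model.setTableName(table.getCarbonTableIdentifier.getTableName)
  model.setTablePath(table.getTablePath)
  // the metadata refresh moved from CommonUtil onto the model itself
  model.readAndSetLoadMetadataDetails()
  // clean up invalid segments before creating the new load entry
  SegmentStatusManager.deleteLoadsAndUpdateMetadata(table, false)
  // optionsFinal would then feed new CarbonLoadModelBuilder(table).build(...)
  model
}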

http://git-wip-us.apache.org/repos/asf/carbondata/blob/06a71586/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
index a37d6dc..f074285 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
@@ -22,13 +22,13 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.command._
 import org.apache.spark.sql.execution.datasources.LogicalRelation
 
+import org.apache.carbondata.common.exceptions.ConcurrentOperationException
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, LockUsage}
 import org.apache.carbondata.core.mutate.CarbonUpdateUtil
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager
 import org.apache.carbondata.events.{DeleteFromTablePostEvent, DeleteFromTablePreEvent, OperationContext, OperationListenerBus}
 import org.apache.carbondata.processing.loading.FailureCauses
-import org.apache.carbondata.spark.exception.ConcurrentOperationException
 
 /**
  * IUD update delete and compaction framework.

http://git-wip-us.apache.org/repos/asf/carbondata/blob/06a71586/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
index 4886676..5165342 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
@@ -25,6 +25,7 @@ import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.types.ArrayType
 import org.apache.spark.storage.StorageLevel
 
+import org.apache.carbondata.common.exceptions.ConcurrentOperationException
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datamap.Segment
@@ -34,7 +35,6 @@ import org.apache.carbondata.core.statusmanager.SegmentStatusManager
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.events.{OperationContext, OperationListenerBus, UpdateTablePostEvent, UpdateTablePreEvent}
 import org.apache.carbondata.processing.loading.FailureCauses
-import org.apache.carbondata.spark.exception.ConcurrentOperationException
 
 private[sql] case class CarbonProjectForUpdateCommand(
     plan: LogicalPlan,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/06a71586/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/IUDCommonUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/IUDCommonUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/IUDCommonUtil.scala
index 5817d88..220d75d 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/IUDCommonUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/IUDCommonUtil.scala
@@ -23,9 +23,9 @@ import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan}
 import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.hive.HiveSessionCatalog
 
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 
 /**
  * Util for IUD common function

http://git-wip-us.apache.org/repos/asf/carbondata/blob/06a71586/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
index 4d0a4c5..c836584 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
@@ -35,7 +35,6 @@ import org.apache.carbondata.core.metadata.schema.datamap.DataMapProvider
 import org.apache.carbondata.core.metadata.schema.table.AggregationDataMapSchema
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager}
-import org.apache.carbondata.spark.util.DataLoadingUtil
 
 /**
  * Below command class will be used to create pre-aggregate table
@@ -179,11 +178,7 @@ case class CreatePreAggregateTableCommand(
     // This will be used to check if the parent table has any segments or not. If not then no
     // need to fire load for pre-aggregate table. Therefore reading the load details for PARENT
     // table.
-    DataLoadingUtil.deleteLoadsAndUpdateMetadata(isForceDeletion = false,
-      parentTable,
-      CarbonFilters.getCurrentPartitions(sparkSession,
-      TableIdentifier(parentTable.getTableName,
-        Some(parentTable.getDatabaseName))).map(_.asJava).orNull)
+    SegmentStatusManager.deleteLoadsAndUpdateMetadata(parentTable, false)
     val loadAvailable = SegmentStatusManager.readLoadMetadata(parentTable.getMetadataPath)
     if (loadAvailable.exists(load => load.getSegmentStatus == SegmentStatus.INSERT_IN_PROGRESS ||
       load.getSegmentStatus == SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS)) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/06a71586/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
index 9df3241..adce70e 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
@@ -34,6 +34,7 @@ import org.apache.spark.sql.hive.CarbonRelation
 import org.apache.spark.sql.parser.CarbonSpark2SqlParser
 import org.apache.spark.sql.types.DataType
 
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, LockUsage}
@@ -42,7 +43,6 @@ import org.apache.carbondata.core.metadata.schema.table.{AggregationDataMapSchem
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
 import org.apache.carbondata.core.util.CarbonUtil
 import org.apache.carbondata.format.TableInfo
-import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 import org.apache.carbondata.spark.util.CommonUtil
 
 /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/06a71586/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
index 870c140..5f8eb12 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
@@ -23,6 +23,8 @@ import org.apache.spark.sql.execution.command.{AlterTableRenameModel, MetadataCo
 import org.apache.spark.sql.hive.{CarbonRelation, CarbonSessionCatalog}
 import org.apache.spark.util.AlterTableUtil
 
+import org.apache.carbondata.common.exceptions.ConcurrentOperationException
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datamap.DataMapStoreManager
@@ -35,7 +37,6 @@ import org.apache.carbondata.core.util.CarbonUtil
 import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.events.{AlterTableRenamePostEvent, AlterTableRenamePreEvent, OperationContext, OperationListenerBus}
 import org.apache.carbondata.format.SchemaEvolutionEntry
-import org.apache.carbondata.spark.exception.{ConcurrentOperationException, MalformedCarbonCommandException}
 
 private[sql] case class CarbonAlterTableRenameCommand(
     alterTableRenameModel: AlterTableRenameModel)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/06a71586/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeSeriesUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeSeriesUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeSeriesUtil.scala
index 45767da..2b35416 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeSeriesUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeSeriesUtil.scala
@@ -18,12 +18,12 @@ package org.apache.spark.sql.execution.command.timeseries
 
 import org.apache.spark.sql.execution.command.{DataMapField, Field}
 
+import org.apache.carbondata.common.exceptions.sql.{MalformedCarbonCommandException, MalformedDataMapCommandException}
 import org.apache.carbondata.core.metadata.datatype.DataTypes
 import org.apache.carbondata.core.metadata.schema.datamap.DataMapProvider.TIMESERIES
 import org.apache.carbondata.core.metadata.schema.datamap.Granularity
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.preagg.TimeSeriesUDF
-import org.apache.carbondata.spark.exception.{MalformedCarbonCommandException, MalformedDataMapCommandException}
 
 /**
  * Utility class for time series to keep

http://git-wip-us.apache.org/repos/asf/carbondata/blob/06a71586/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala
index b4d3bea..61a31a5 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala
@@ -50,6 +50,7 @@ import org.apache.carbondata.hadoop.api.{CarbonOutputCommitter, CarbonTableOutpu
 import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat.CarbonRecordWriter
 import org.apache.carbondata.hadoop.internal.ObjectArrayWritable
 import org.apache.carbondata.hadoop.util.ObjectSerializationUtil
-import org.apache.carbondata.processing.loading.model.CarbonLoadModel
-import org.apache.carbondata.spark.util.{CarbonScalaUtil, DataLoadingUtil, Util}
+import org.apache.carbondata.processing.loading.csvinput.StringArrayWritable
+import org.apache.carbondata.processing.loading.model.{CarbonLoadModel, CarbonLoadModelBuilder, LoadOption}
+import org.apache.carbondata.spark.util.{CarbonScalaUtil, Util}
 
@@ -87,7 +90,7 @@ with Serializable {
       TableIdentifier(options("tableName"), options.get("dbName")))(sparkSession)
     val model = new CarbonLoadModel
     val carbonProperty = CarbonProperties.getInstance()
-    val optionsFinal = DataLoadingUtil.getDataLoadingOptions(carbonProperty, options)
+    val optionsFinal = LoadOption.fillOptionWithDefaultValue(options.asJava)
     val tableProperties = table.getTableInfo.getFactTable.getTableProperties
     optionsFinal.put("sort_scope", tableProperties.asScala.getOrElse("sort_scope",
       carbonProperty.getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE,
@@ -102,14 +105,11 @@ with Serializable {
     val optionsLocal = new mutable.HashMap[String, String]()
     optionsLocal ++= options
     optionsLocal += (("header", "false"))
-    DataLoadingUtil.buildCarbonLoadModel(
-      table,
-      carbonProperty,
-      optionsLocal.toMap,
+    new CarbonLoadModelBuilder(table).build(
+      optionsLocal.toMap.asJava,
       optionsFinal,
       model,
-      conf
-    )
+      conf)
     model.setUseOnePass(options.getOrElse("onepass", "false").toBoolean)
     model.setDictionaryServerHost(options.getOrElse("dicthost", null))
     model.setDictionaryServerPort(options.getOrElse("dictport", "-1").toInt)
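The hunks above carry the substantive change in this file: option resolution moves from DataLoadingUtil to LoadOption, and model construction to CarbonLoadModelBuilder. A minimal sketch of the new wiring, assuming a CarbonTable `table`, user options `options: Map[String, String]` and a Hadoop Configuration `conf` are in scope (signatures follow the calls in this patch):

    import scala.collection.JavaConverters._
    import org.apache.carbondata.processing.loading.model.{CarbonLoadModel, CarbonLoadModelBuilder, LoadOption}

    // Resolve the user-supplied options against carbon defaults, then let
    // the builder fill the load model from table metadata and the options.
    val model = new CarbonLoadModel
    val optionsFinal = LoadOption.fillOptionWithDefaultValue(options.asJava)
    new CarbonLoadModelBuilder(table).build(options.asJava, optionsFinal, model, conf)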

http://git-wip-us.apache.org/repos/asf/carbondata/blob/06a71586/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
index f69ccc1..59ff58e 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
@@ -32,10 +32,10 @@ import org.apache.spark.sql.execution.datasources.RefreshTable
 import org.apache.spark.sql.hive.CarbonRelation
 import org.apache.spark.util.{CarbonReflectionUtils, FileUtils}
 
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.processing.merger.CompactionType
-import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 
 /**
  * Carbon strategies for ddl commands

http://git-wip-us.apache.org/repos/asf/carbondata/blob/06a71586/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/StreamingTableStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/StreamingTableStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/StreamingTableStrategy.scala
index 608ec60..7028dcf 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/StreamingTableStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/StreamingTableStrategy.scala
@@ -25,7 +25,7 @@ import org.apache.spark.sql.execution.command.AlterTableRenameCommand
 import org.apache.spark.sql.execution.command.mutation.{CarbonProjectForDeleteCommand, CarbonProjectForUpdateCommand}
 import org.apache.spark.sql.execution.command.schema.{CarbonAlterTableAddColumnCommand, CarbonAlterTableDataTypeChangeCommand, CarbonAlterTableDropColumnCommand}
 
-import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 
 /**
  * Strategy for streaming table, like blocking unsupported operation

http://git-wip-us.apache.org/repos/asf/carbondata/blob/06a71586/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala
index 7ca34af..27c7d17 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala
@@ -24,9 +24,9 @@ import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.execution.command._
 import org.apache.spark.sql.execution.command.table.CarbonDropTableCommand
 
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, SessionParams}
-import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 
 case class CarbonDropDatabaseCommand(command: DropDatabaseCommand)
   extends RunnableCommand {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/06a71586/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
index 7addd26..a0f80f0 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
@@ -36,9 +36,9 @@ import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
 import org.apache.spark.sql.util.CarbonException
 import org.apache.spark.util.CarbonReflectionUtils
 
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.spark.CarbonOption
-import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 import org.apache.carbondata.spark.util.CommonUtil
 
 /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/06a71586/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
index ad6d0c7..ef4836e 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
@@ -32,10 +32,10 @@ import org.apache.spark.sql.types.StructField
 import org.apache.spark.sql.util.CarbonException
 import org.apache.spark.util.CarbonReflectionUtils
 
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
 import org.apache.carbondata.hadoop.util.SchemaReader
 import org.apache.carbondata.spark.CarbonOption
-import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 import org.apache.carbondata.spark.util.CommonUtil
 
 /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/06a71586/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
index bc36e9c..aaa87a3 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
@@ -28,6 +28,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.hive.{CarbonRelation, CarbonSessionCatalog, HiveExternalCatalog}
 import org.apache.spark.sql.hive.HiveExternalCatalog._
 
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.impl.FileFactory
@@ -39,7 +40,6 @@ import org.apache.carbondata.core.util.CarbonUtil
 import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.core.util.path.CarbonTablePath.getNewTablePath
 import org.apache.carbondata.format.{SchemaEvolutionEntry, TableInfo}
-import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 
 object AlterTableUtil {
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/06a71586/integration/spark2/src/main/scala/org/apache/spark/util/TableAPIUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/TableAPIUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/util/TableAPIUtil.scala
index bc62902..a8094b6 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/TableAPIUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/TableAPIUtil.scala
@@ -19,10 +19,10 @@ package org.apache.spark.util
 
 import org.apache.spark.sql.{CarbonEnv, SparkSession}
 
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 
 /**
  * table api util

http://git-wip-us.apache.org/repos/asf/carbondata/blob/06a71586/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionState.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionState.scala b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionState.scala
index b9425d6..1e4d001 100644
--- a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionState.scala
+++ b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionState.scala
@@ -44,10 +44,10 @@ import org.apache.spark.sql.parser.{CarbonHelperSqlAstBuilder, CarbonSpark2SqlPa
 import org.apache.spark.sql.types.DecimalType
 import org.apache.spark.util.CarbonReflectionUtils
 
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.core.datamap.DataMapStoreManager
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
+import org.apache.carbondata.core.util.{CarbonProperties, ThreadLocalSessionInfo}
-import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 import org.apache.carbondata.spark.util.CarbonScalaUtil
 
 /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/06a71586/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/segmentreading/TestSegmentReading.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/segmentreading/TestSegmentReading.scala b/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/segmentreading/TestSegmentReading.scala
index c676b01..5ade510 100644
--- a/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/segmentreading/TestSegmentReading.scala
+++ b/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/segmentreading/TestSegmentReading.scala
@@ -4,7 +4,7 @@ import org.apache.spark.sql.Row
 import org.apache.spark.sql.test.util.QueryTest
 import org.scalatest.BeforeAndAfterAll
 
-import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 
 /**
  * Created by rahul on 19/9/17.

http://git-wip-us.apache.org/repos/asf/carbondata/blob/06a71586/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala
index 1d41ddc..3298009 100644
--- a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala
+++ b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala
@@ -25,7 +25,7 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.core.util.path.CarbonTablePath
-import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
+import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel, CarbonLoadModelBuilder, LoadOption}
 import org.apache.carbondata.processing.util.TableOptionConstant
 
 /**
@@ -63,7 +63,7 @@ class AllDictionaryTestCase extends Spark2QueryTest with BeforeAndAfterAll {
       CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
       CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT))
     carbonLoadModel.setCsvHeaderColumns(
-      CommonUtil.getCsvHeaderColumns(carbonLoadModel, FileFactory.getConfiguration))
+      LoadOption.getCsvHeaderColumns(carbonLoadModel, FileFactory.getConfiguration))
     // Create table and metadata folders if not exist
     val metadataDirectoryPath = CarbonTablePath.getMetadataPath(table.getTablePath)
     val fileType = FileFactory.getFileType(metadataDirectoryPath)
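As the replaced call above shows, CSV header resolution also moves from CommonUtil to LoadOption with an unchanged call shape. A hedged sketch, assuming `carbonLoadModel` already carries the CSV path and delimiter:

    import org.apache.carbondata.core.datastore.impl.FileFactory
    import org.apache.carbondata.processing.loading.model.LoadOption

    // Derive the header columns for the load and cache them on the model.
    carbonLoadModel.setCsvHeaderColumns(
      LoadOption.getCsvHeaderColumns(carbonLoadModel, FileFactory.getConfiguration))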

http://git-wip-us.apache.org/repos/asf/carbondata/blob/06a71586/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala
index 7ca0b56..43d8c03 100644
--- a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala
+++ b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala
@@ -23,14 +23,14 @@ import org.apache.spark.sql.hive.CarbonRelation
 import org.apache.spark.sql.{CarbonEnv, SparkSession}
 import org.scalatest.BeforeAndAfterAll
 
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.processing.exception.DataLoadingException
-import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
+import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel, CarbonLoadModelBuilder, LoadOption}
 import org.apache.carbondata.processing.util.TableOptionConstant
-import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 
 /**
  * test case for external column dictionary generation
@@ -176,7 +176,7 @@ class ExternalColumnDictionaryTestCase extends Spark2QueryTest with BeforeAndAft
       CarbonCommonConstants.CARBON_DATE_FORMAT,
       CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT))
     carbonLoadModel.setCsvHeaderColumns(
-      CommonUtil.getCsvHeaderColumns(carbonLoadModel, FileFactory.getConfiguration))
+      LoadOption.getCsvHeaderColumns(carbonLoadModel, FileFactory.getConfiguration))
     carbonLoadModel.setMaxColumns("100")
     // Create table and metadata folders if not exist
     val metadataDirectoryPath = CarbonTablePath.getMetadataPath(table.getTablePath)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/06a71586/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
index f1c27ae..9d9f01d 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
@@ -32,14 +32,12 @@ import org.apache.spark.sql.streaming.{ProcessingTime, StreamingQuery}
 import org.apache.spark.sql.test.util.QueryTest
 import org.scalatest.BeforeAndAfterAll
 
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.statusmanager.{FileFormat, SegmentStatus}
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.core.util.path.CarbonTablePath
-import org.apache.carbondata.spark.exception.{MalformedCarbonCommandException, ProcessMetaDataException}
-import org.apache.carbondata.core.util.path.CarbonTablePath
-import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 import org.apache.carbondata.spark.exception.ProcessMetaDataException
 
 class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/06a71586/integration/spark2/src/test/scala/org/apache/spark/carbondata/bucketing/TableBucketingTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/bucketing/TableBucketingTestCase.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/bucketing/TableBucketingTestCase.scala
index 9da7244..65a006b 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/bucketing/TableBucketingTestCase.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/bucketing/TableBucketingTestCase.scala
@@ -21,11 +21,11 @@ import org.apache.spark.sql.common.util.Spark2QueryTest
 import org.apache.spark.sql.execution.exchange.ShuffleExchange
 import org.scalatest.BeforeAndAfterAll
 
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.core.metadata.CarbonMetadata
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 
 class TableBucketingTestCase extends Spark2QueryTest with BeforeAndAfterAll {
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/06a71586/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/AddColumnTestCases.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/AddColumnTestCases.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/AddColumnTestCases.scala
index ac10b9a..995f041 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/AddColumnTestCases.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/AddColumnTestCases.scala
@@ -26,9 +26,9 @@ import org.apache.spark.sql.common.util.Spark2QueryTest
 import org.apache.spark.sql.test.TestQueryExecutor
 import org.scalatest.BeforeAndAfterAll
 
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.spark.exception.{MalformedCarbonCommandException, ProcessMetaDataException}
 
 class AddColumnTestCases extends Spark2QueryTest with BeforeAndAfterAll {
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/06a71586/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
index 638ad39..890492e 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
@@ -18,14 +18,16 @@
 package org.apache.carbondata.processing.loading.model;
 
 import java.io.Serializable;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 
 import org.apache.carbondata.core.dictionary.service.DictionaryServiceProvider;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
 import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
 import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager;
-
+import org.apache.carbondata.core.util.path.CarbonTablePath;
 
 public class CarbonLoadModel implements Serializable {
 
@@ -868,6 +870,7 @@ public class CarbonLoadModel implements Serializable {
     this.skipEmptyLine = skipEmptyLine;
   }
 
+
   public boolean isPartitionLoad() {
     return isPartitionLoad;
   }
@@ -883,4 +886,13 @@ public class CarbonLoadModel implements Serializable {
   public void setDataWritePath(String dataWritePath) {
     this.dataWritePath = dataWritePath;
   }
+
+  /**
+   * Read segments metadata from table status file and set it to this load model object
+   */
+  public void readAndSetLoadMetadataDetails() {
+    String metadataPath = CarbonTablePath.getMetadataPath(tablePath);
+    LoadMetadataDetails[] details = SegmentStatusManager.readLoadMetadata(metadataPath);
+    setLoadMetadataDetails(Arrays.asList(details));
+  }
 }
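A hedged usage sketch for the new helper: given a model whose table path is set, it reads the segment entries from the table status file and caches them on the model (the path below is illustrative; setTablePath is assumed to be the existing setter on CarbonLoadModel):

    val model = new CarbonLoadModel
    model.setTablePath("/tmp/store/default/t1")  // illustrative path
    // Reads Metadata/tablestatus under the table path and caches the
    // parsed LoadMetadataDetails entries on the model.
    model.readAndSetLoadMetadataDetails()
    val segments = model.getLoadMetadataDetails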