Posted to commits@carbondata.apache.org by ra...@apache.org on 2017/10/31 07:00:18 UTC

[20/22] carbondata git commit: [CARBONDATA-1628][Streaming] Refactor LoadTableCommand to reuse code for streaming ingest in the future

[CARBONDATA-1628][Streaming] Refactor LoadTableCommand to reuse code for streaming ingest in the future

Refactor LoadTableCommand so that its data-loading code can be reused for streaming ingest in the future

This closes #1439
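
As a rough illustration of the intended reuse, a future streaming ingest path could build its CarbonLoadModel through the extracted DataLoadingUtil instead of duplicating the option handling inside LoadTableCommand. The sketch below is hypothetical and based only on the two method signatures this patch introduces; note that getDataLoadingOptions does not set sort_scope (LoadTableCommand resolves it from table properties), so a streaming caller must supply it:

    import org.apache.carbondata.core.metadata.schema.table.CarbonTable
    import org.apache.carbondata.core.util.CarbonProperties
    import org.apache.carbondata.processing.loading.model.CarbonLoadModel
    import org.apache.carbondata.spark.util.DataLoadingUtil

    // hypothetical entry point for streaming ingest, not part of this patch
    def prepareStreamingLoadModel(
        table: CarbonTable,
        options: Map[String, String]): CarbonLoadModel = {
      val carbonProperty = CarbonProperties.getInstance()
      // resolve explicit load options against session properties and defaults
      val optionsFinal = DataLoadingUtil.getDataLoadingOptions(carbonProperty, options)
      // sort_scope comes from table properties in LoadTableCommand, so the
      // caller has to provide it here (LOCAL_SORT is assumed for the sketch)
      optionsFinal.put("sort_scope", "LOCAL_SORT")
      val carbonLoadModel = new CarbonLoadModel()
      DataLoadingUtil.buildCarbonLoadModel(
        table, carbonProperty, options, optionsFinal, carbonLoadModel)
      carbonLoadModel
    }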


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/5936e7fb
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/5936e7fb
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/5936e7fb

Branch: refs/heads/fgdatamap
Commit: 5936e7fb25c9d5125a5863aa7648cffe5a1d8f7c
Parents: 4d70a21
Author: QiangCai <qi...@qq.com>
Authored: Fri Oct 27 16:06:06 2017 +0800
Committer: Jacky Li <ja...@qq.com>
Committed: Mon Oct 30 14:55:00 2017 +0530

----------------------------------------------------------------------
 .../carbondata/spark/util/DataLoadingUtil.scala | 300 ++++++++++++
 .../command/management/LoadTableCommand.scala   | 464 ++++++-------------
 2 files changed, 452 insertions(+), 312 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/5936e7fb/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala
new file mode 100644
index 0000000..445fdbb
--- /dev/null
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.util
+
+import scala.collection.immutable
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.commons.lang3.StringUtils
+
+import org.apache.carbondata.common.constants.LoggerAction
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants}
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
+import org.apache.carbondata.processing.loading.constants.DataLoadProcessorConstants
+import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
+import org.apache.carbondata.processing.util.TableOptionConstant
+import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
+import org.apache.carbondata.spark.load.ValidateUtil
+
+/**
+ * utility object for data loading
+ */
+object DataLoadingUtil {
+
+  /**
+   * get data loading options, initialising defaults for any option not specified
+   */
+  def getDataLoadingOptions(
+      carbonProperty: CarbonProperties,
+      options: immutable.Map[String, String]): mutable.Map[String, String] = {
+    val optionsFinal = scala.collection.mutable.Map[String, String]()
+    optionsFinal.put("delimiter", options.getOrElse("delimiter", ","))
+    optionsFinal.put("quotechar", options.getOrElse("quotechar", "\""))
+    optionsFinal.put("fileheader", options.getOrElse("fileheader", ""))
+    optionsFinal.put("escapechar", options.getOrElse("escapechar", "\\"))
+    optionsFinal.put("commentchar", options.getOrElse("commentchar", "#"))
+    optionsFinal.put("columndict", options.getOrElse("columndict", null))
+
+    optionsFinal.put(
+      "serialization_null_format",
+      options.getOrElse("serialization_null_format", "\\N"))
+
+    optionsFinal.put(
+      "bad_records_logger_enable",
+      options.getOrElse(
+        "bad_records_logger_enable",
+        carbonProperty.getProperty(
+          CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE,
+          CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE_DEFAULT)))
+
+    val badRecordActionValue = carbonProperty.getProperty(
+      CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION,
+      CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT)
+
+    optionsFinal.put(
+      "bad_records_action",
+      options.getOrElse(
+        "bad_records_action",
+        carbonProperty.getProperty(
+          CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_ACTION,
+          badRecordActionValue)))
+
+    optionsFinal.put(
+      "is_empty_data_bad_record",
+      options.getOrElse(
+        "is_empty_data_bad_record",
+        carbonProperty.getProperty(
+          CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD,
+          CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD_DEFAULT)))
+
+    optionsFinal.put("all_dictionary_path", options.getOrElse("all_dictionary_path", ""))
+
+    optionsFinal.put(
+      "complex_delimiter_level_1",
+      options.getOrElse("complex_delimiter_level_1", "\\$"))
+
+    optionsFinal.put(
+      "complex_delimiter_level_2",
+      options.getOrElse("complex_delimiter_level_2", "\\:"))
+
+    optionsFinal.put(
+      "dateformat",
+      options.getOrElse(
+        "dateformat",
+        carbonProperty.getProperty(
+          CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT,
+          CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT_DEFAULT)))
+
+    optionsFinal.put(
+      "global_sort_partitions",
+      options.getOrElse(
+        "global_sort_partitions",
+        carbonProperty.getProperty(
+          CarbonLoadOptionConstants.CARBON_OPTIONS_GLOBAL_SORT_PARTITIONS,
+          null)))
+
+    optionsFinal.put("maxcolumns", options.getOrElse("maxcolumns", null))
+
+    optionsFinal.put(
+      "batch_sort_size_inmb",
+      options.getOrElse(
+        "batch_sort_size_inmb",
+        carbonProperty.getProperty(
+          CarbonLoadOptionConstants.CARBON_OPTIONS_BATCH_SORT_SIZE_INMB,
+          carbonProperty.getProperty(
+            CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB,
+            CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB_DEFAULT))))
+
+    optionsFinal.put(
+      "bad_record_path",
+      options.getOrElse(
+        "bad_record_path",
+        carbonProperty.getProperty(
+          CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH,
+          carbonProperty.getProperty(
+            CarbonCommonConstants.CARBON_BADRECORDS_LOC,
+            CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL))))
+
+    val useOnePass = options.getOrElse(
+      "single_pass",
+      carbonProperty.getProperty(
+        CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS,
+        CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS_DEFAULT)).trim.toLowerCase match {
+      case "true" =>
+        true
+      case "false" =>
+        // when single_pass = false and either all_dictionary_path
+        // or columndict is configured, do not allow the load
+        if (StringUtils.isNotEmpty(optionsFinal("all_dictionary_path")) ||
+            StringUtils.isNotEmpty(optionsFinal("columndict"))) {
+          throw new MalformedCarbonCommandException(
+            "Can not use all_dictionary_path or columndict without single_pass.")
+        } else {
+          false
+        }
+      case illegal =>
+        val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+        LOGGER.error(s"Can't use single_pass, because illegal syntax found: [$illegal] " +
+                     "Please set it as 'true' or 'false'")
+        false
+    }
+    optionsFinal.put("single_pass", useOnePass.toString)
+    optionsFinal
+  }
+
+  /**
+   * return the default if the given value is empty
+   */
+  private def checkDefaultValue(value: String, default: String) = {
+    if (StringUtils.isEmpty(value)) {
+      default
+    } else {
+      value
+    }
+  }
+
+  /**
+   * build CarbonLoadModel for data loading
+   */
+  def buildCarbonLoadModel(
+      table: CarbonTable,
+      carbonProperty: CarbonProperties,
+      options: immutable.Map[String, String],
+      optionsFinal: mutable.Map[String, String],
+      carbonLoadModel: CarbonLoadModel): Unit = {
+    carbonLoadModel.setTableName(table.getFactTableName)
+    carbonLoadModel.setDatabaseName(table.getDatabaseName)
+    carbonLoadModel.setStorePath(table.getStorePath)
+    carbonLoadModel.setTableName(table.getFactTableName)
+    val dataLoadSchema = new CarbonDataLoadSchema(table)
+    // Need to fill dimension relation
+    carbonLoadModel.setCarbonDataLoadSchema(dataLoadSchema)
+    val sort_scope = optionsFinal("sort_scope")
+    val single_pass = optionsFinal("single_pass")
+    val bad_records_logger_enable = optionsFinal("bad_records_logger_enable")
+    val bad_records_action = optionsFinal("bad_records_action")
+    val bad_record_path = optionsFinal("bad_record_path")
+    val global_sort_partitions = optionsFinal("global_sort_partitions")
+    val dateFormat = optionsFinal("dateformat")
+    val delimeter = optionsFinal("delimiter")
+    val complex_delimeter_level1 = optionsFinal("complex_delimiter_level_1")
+    val complex_delimeter_level2 = optionsFinal("complex_delimiter_level_2")
+    val all_dictionary_path = optionsFinal("all_dictionary_path")
+    val column_dict = optionsFinal("columndict")
+    ValidateUtil.validateDateFormat(dateFormat, table, table.getFactTableName)
+    ValidateUtil.validateSortScope(table, sort_scope)
+
+    if (bad_records_logger_enable.toBoolean ||
+        LoggerAction.REDIRECT.name().equalsIgnoreCase(bad_records_action)) {
+      if (!CarbonUtil.isValidBadStorePath(bad_record_path)) {
+        sys.error("Invalid bad records location.")
+      }
+    }
+    carbonLoadModel.setBadRecordsLocation(bad_record_path)
+
+    ValidateUtil.validateGlobalSortPartitions(global_sort_partitions)
+    carbonLoadModel.setEscapeChar(checkDefaultValue(optionsFinal("escapechar"), "\\"))
+    carbonLoadModel.setQuoteChar(checkDefaultValue(optionsFinal("quotechar"), "\""))
+    carbonLoadModel.setCommentChar(checkDefaultValue(optionsFinal("commentchar"), "#"))
+
+    // if the CSV file has no header and the load SQL does not provide the FILEHEADER option,
+    // use the table schema to generate the file header.
+    var fileHeader = optionsFinal("fileheader")
+    val headerOption = options.get("header")
+    if (headerOption.isDefined) {
+      // whether the CSV file has a header;
+      // the default value is true
+      val header = try {
+        headerOption.get.toBoolean
+      } catch {
+        case ex: IllegalArgumentException =>
+          throw new MalformedCarbonCommandException(
+            "'header' option should be either 'true' or 'false'. " + ex.getMessage)
+      }
+      if (header) {
+        if (fileHeader.nonEmpty) {
+          throw new MalformedCarbonCommandException(
+            "When 'header' option is true, 'fileheader' option is not required.")
+        }
+      } else {
+        if (fileHeader.isEmpty) {
+          fileHeader = table.getCreateOrderColumn(table.getFactTableName)
+            .asScala.map(_.getColName).mkString(",")
+        }
+      }
+    }
+
+    carbonLoadModel.setDateFormat(dateFormat)
+    carbonLoadModel.setDefaultTimestampFormat(carbonProperty.getProperty(
+      CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
+      CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT))
+
+    carbonLoadModel.setDefaultDateFormat(carbonProperty.getProperty(
+      CarbonCommonConstants.CARBON_DATE_FORMAT,
+      CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT))
+
+    carbonLoadModel.setSerializationNullFormat(
+        TableOptionConstant.SERIALIZATION_NULL_FORMAT.getName + "," +
+        optionsFinal("serialization_null_format"))
+
+    carbonLoadModel.setBadRecordsLoggerEnable(
+        TableOptionConstant.BAD_RECORDS_LOGGER_ENABLE.getName + "," + bad_records_logger_enable)
+
+    carbonLoadModel.setBadRecordsAction(
+        TableOptionConstant.BAD_RECORDS_ACTION.getName + "," + bad_records_action)
+
+    carbonLoadModel.setIsEmptyDataBadRecord(
+        DataLoadProcessorConstants.IS_EMPTY_DATA_BAD_RECORD + "," +
+        optionsFinal("is_empty_data_bad_record"))
+
+    carbonLoadModel.setSortScope(sort_scope)
+    carbonLoadModel.setBatchSortSizeInMb(optionsFinal("batch_sort_size_inmb"))
+    carbonLoadModel.setGlobalSortPartitions(global_sort_partitions)
+    carbonLoadModel.setUseOnePass(single_pass.toBoolean)
+
+    if (delimeter.equalsIgnoreCase(complex_delimeter_level1) ||
+        complex_delimeter_level1.equalsIgnoreCase(complex_delimeter_level2) ||
+        delimeter.equalsIgnoreCase(complex_delimeter_level2)) {
+      sys.error(s"Field Delimiter & Complex types delimiter are same")
+    } else {
+      carbonLoadModel.setComplexDelimiterLevel1(
+        CarbonUtil.delimiterConverter(complex_delimeter_level1))
+      carbonLoadModel.setComplexDelimiterLevel2(
+        CarbonUtil.delimiterConverter(complex_delimeter_level2))
+    }
+    // set dictionary file path and CSV parsing options
+    carbonLoadModel.setAllDictPath(all_dictionary_path)
+    carbonLoadModel.setCsvDelimiter(CarbonUtil.unescapeChar(delimeter))
+    carbonLoadModel.setCsvHeader(fileHeader)
+    carbonLoadModel.setColDictFilePath(column_dict)
+    carbonLoadModel.setDirectLoad(true)
+    carbonLoadModel.setCsvHeaderColumns(CommonUtil.getCsvHeaderColumns(carbonLoadModel))
+
+    val validatedMaxColumns = CommonUtil.validateMaxColumns(
+      carbonLoadModel.getCsvHeaderColumns,
+      optionsFinal("maxcolumns"))
+
+    carbonLoadModel.setMaxColumns(validatedMaxColumns.toString)
+    if (null == carbonLoadModel.getLoadMetadataDetails) {
+      CommonUtil.readLoadMetadataDetails(carbonLoadModel)
+    }
+  }
+}
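
Every option in the new getDataLoadingOptions resolves with the same precedence: an explicit LOAD option wins, otherwise the corresponding CARBON_OPTIONS_* session property applies, otherwise the system-level property or hard-coded default. A small hedged sketch of that precedence, using bad_records_action (it assumes REDIRECT and FAIL remain valid action values):

    import org.apache.carbondata.core.constants.CarbonLoadOptionConstants
    import org.apache.carbondata.core.util.CarbonProperties
    import org.apache.carbondata.spark.util.DataLoadingUtil

    val carbonProperty = CarbonProperties.getInstance()
    // session-level setting for bad record handling
    carbonProperty.addProperty(
      CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_ACTION, "REDIRECT")

    // an explicit LOAD option overrides the session property
    val withOption = DataLoadingUtil.getDataLoadingOptions(
      carbonProperty, Map("bad_records_action" -> "FAIL"))
    assert(withOption("bad_records_action") == "FAIL")

    // with no explicit option, the session property wins over the default
    val fromProperty = DataLoadingUtil.getDataLoadingOptions(
      carbonProperty, Map.empty[String, String])
    assert(fromProperty("bad_records_action") == "REDIRECT")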

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5936e7fb/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/LoadTableCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/LoadTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/LoadTableCommand.scala
index 9018f7b..630ee27 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/LoadTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/LoadTableCommand.scala
@@ -27,7 +27,6 @@ import org.apache.spark.sql.execution.command.{DataLoadTableFileMapping, DataPro
 import org.apache.spark.sql.hive.CarbonRelation
 import org.apache.spark.util.{CausedBy, FileUtils}
 
-import org.apache.carbondata.common.constants.LoggerAction
 import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
 import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants}
 import org.apache.carbondata.core.datastore.impl.FileFactory
@@ -39,14 +38,11 @@ import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
 import org.apache.carbondata.core.util.path.CarbonStorePath
 import org.apache.carbondata.format
 import org.apache.carbondata.processing.exception.DataLoadingException
-import org.apache.carbondata.processing.loading.constants.DataLoadProcessorConstants
 import org.apache.carbondata.processing.loading.exception.NoRetryException
 import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
-import org.apache.carbondata.processing.util.TableOptionConstant
 import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
-import org.apache.carbondata.spark.load.ValidateUtil
 import org.apache.carbondata.spark.rdd.{CarbonDataRDDFactory, DictionaryLoadModel}
-import org.apache.carbondata.spark.util.{CommonUtil, GlobalDictionaryUtil}
+import org.apache.carbondata.spark.util.{CommonUtil, DataLoadingUtil, GlobalDictionaryUtil}
 
 case class LoadTableCommand(
     databaseNameOp: Option[String],
@@ -60,89 +56,6 @@ case class LoadTableCommand(
     updateModel: Option[UpdateTableModel] = None)
   extends RunnableCommand with DataProcessCommand {
 
-  private def getFinalOptions(carbonProperty: CarbonProperties):
-  scala.collection.mutable.Map[String, String] = {
-    val optionsFinal = scala.collection.mutable.Map[String, String]()
-    optionsFinal.put("delimiter", options.getOrElse("delimiter", ","))
-    optionsFinal.put("quotechar", options.getOrElse("quotechar", "\""))
-    optionsFinal.put("fileheader", options.getOrElse("fileheader", ""))
-    optionsFinal.put("escapechar", options.getOrElse("escapechar", "\\"))
-    optionsFinal.put("commentchar", options.getOrElse("commentchar", "#"))
-    optionsFinal.put("columndict", options.getOrElse("columndict", null))
-    optionsFinal
-      .put("serialization_null_format", options.getOrElse("serialization_null_format", "\\N"))
-    optionsFinal.put("bad_records_logger_enable", options.getOrElse("bad_records_logger_enable",
-      carbonProperty
-        .getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE,
-          CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE_DEFAULT)))
-    val badRecordActionValue = carbonProperty
-      .getProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION,
-        CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT)
-    optionsFinal.put("bad_records_action", options.getOrElse("bad_records_action", carbonProperty
-      .getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_ACTION,
-        badRecordActionValue)))
-    optionsFinal
-      .put("is_empty_data_bad_record", options.getOrElse("is_empty_data_bad_record", carbonProperty
-        .getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD,
-          CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD_DEFAULT)))
-    optionsFinal.put("all_dictionary_path", options.getOrElse("all_dictionary_path", ""))
-    optionsFinal
-      .put("complex_delimiter_level_1", options.getOrElse("complex_delimiter_level_1", "\\$"))
-    optionsFinal
-      .put("complex_delimiter_level_2", options.getOrElse("complex_delimiter_level_2", "\\:"))
-    optionsFinal.put("dateformat", options.getOrElse("dateformat",
-      carbonProperty.getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT,
-        CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT_DEFAULT)))
-
-    optionsFinal.put("global_sort_partitions", options.getOrElse("global_sort_partitions",
-      carbonProperty
-        .getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_GLOBAL_SORT_PARTITIONS, null)))
-
-    optionsFinal.put("maxcolumns", options.getOrElse("maxcolumns", null))
-
-    optionsFinal.put("batch_sort_size_inmb", options.getOrElse("batch_sort_size_inmb",
-      carbonProperty.getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_BATCH_SORT_SIZE_INMB,
-        carbonProperty.getProperty(CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB,
-          CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB_DEFAULT))))
-
-    optionsFinal.put("bad_record_path", options.getOrElse("bad_record_path",
-      carbonProperty.getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH,
-        carbonProperty.getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC,
-          CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL))))
-
-    val useOnePass = options.getOrElse("single_pass",
-      carbonProperty.getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS,
-        CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS_DEFAULT)).trim.toLowerCase match {
-      case "true" =>
-        true
-      case "false" =>
-        // when single_pass = false  and if either alldictionarypath
-        // or columnDict is configured the do not allow load
-        if (StringUtils.isNotEmpty(optionsFinal("all_dictionary_path")) ||
-            StringUtils.isNotEmpty(optionsFinal("columndict"))) {
-          throw new MalformedCarbonCommandException(
-            "Can not use all_dictionary_path or columndict without single_pass.")
-        } else {
-          false
-        }
-      case illegal =>
-        val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
-        LOGGER.error(s"Can't use single_pass, because illegal syntax found: [" + illegal + "] " +
-                     "Please set it as 'true' or 'false'")
-        false
-    }
-    optionsFinal.put("single_pass", useOnePass.toString)
-    optionsFinal
-  }
-
-  private def checkDefaultValue(value: String, default: String) = {
-    if (StringUtils.isEmpty(value)) {
-      default
-    } else {
-      value
-    }
-  }
-
   override def run(sparkSession: SparkSession): Seq[Row] = {
     processData(sparkSession)
   }
@@ -158,7 +71,6 @@ case class LoadTableCommand(
     }
 
     val dbName = databaseNameOp.getOrElse(sparkSession.catalog.currentDatabase)
-
     val relation = CarbonEnv.getInstance(sparkSession).carbonMetastore
       .lookupRelation(Option(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation]
     if (relation == null) {
@@ -172,7 +84,7 @@ case class LoadTableCommand(
 
     val carbonProperty: CarbonProperties = CarbonProperties.getInstance()
     carbonProperty.addProperty("zookeeper.enable.lock", "false")
-    val optionsFinal = getFinalOptions(carbonProperty)
+    val optionsFinal = DataLoadingUtil.getDataLoadingOptions(carbonProperty, options)
 
     val tableProperties = relation.tableMeta.carbonTable.getTableInfo
       .getFactTable.getTableProperties
@@ -183,133 +95,26 @@ case class LoadTableCommand(
           CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT))))
 
     try {
+      val table = relation.tableMeta.carbonTable
+      val carbonLoadModel = new CarbonLoadModel()
       val factPath = if (dataFrame.isDefined) {
         ""
       } else {
         FileUtils.getPaths(
           CarbonUtil.checkAndAppendHDFSUrl(factPathFromUser))
       }
-      val carbonLoadModel = new CarbonLoadModel()
-      carbonLoadModel.setTableName(relation.tableMeta.carbonTableIdentifier.getTableName)
-      carbonLoadModel.setDatabaseName(relation.tableMeta.carbonTableIdentifier.getDatabaseName)
-      carbonLoadModel.setStorePath(relation.tableMeta.carbonTable.getStorePath)
-
-      val table = relation.tableMeta.carbonTable
-      carbonLoadModel.setTableName(table.getFactTableName)
-      val dataLoadSchema = new CarbonDataLoadSchema(table)
-      // Need to fill dimension relation
-      carbonLoadModel.setCarbonDataLoadSchema(dataLoadSchema)
-
-      val partitionLocation = relation.tableMeta.storePath + "/partition/" +
-                              relation.tableMeta.carbonTableIdentifier.getDatabaseName + "/" +
-                              relation.tableMeta.carbonTableIdentifier.getTableName + "/"
-      val columnar = sparkSession.conf.get("carbon.is.columnar.storage", "true").toBoolean
-      val sort_scope = optionsFinal("sort_scope")
-      val single_pass = optionsFinal("single_pass")
-      val bad_records_logger_enable = optionsFinal("bad_records_logger_enable")
-      val bad_records_action = optionsFinal("bad_records_action")
-      val bad_record_path = optionsFinal("bad_record_path")
-      val global_sort_partitions = optionsFinal("global_sort_partitions")
-      val dateFormat = optionsFinal("dateformat")
-      val delimeter = optionsFinal("delimiter")
-      val complex_delimeter_level1 = optionsFinal("complex_delimiter_level_1")
-      val complex_delimeter_level2 = optionsFinal("complex_delimiter_level_2")
-      val all_dictionary_path = optionsFinal("all_dictionary_path")
-      val column_dict = optionsFinal("columndict")
-      ValidateUtil.validateDateFormat(dateFormat, table, tableName)
-      ValidateUtil.validateSortScope(table, sort_scope)
-
-      if (bad_records_logger_enable.toBoolean ||
-          LoggerAction.REDIRECT.name().equalsIgnoreCase(bad_records_action)) {
-        if (!CarbonUtil.isValidBadStorePath(bad_record_path)) {
-          sys.error("Invalid bad records location.")
-        }
-      }
-      carbonLoadModel.setBadRecordsLocation(bad_record_path)
-
-      ValidateUtil.validateGlobalSortPartitions(global_sort_partitions)
-      carbonLoadModel.setEscapeChar(checkDefaultValue(optionsFinal("escapechar"), "\\"))
-      carbonLoadModel.setQuoteChar(checkDefaultValue(optionsFinal("quotechar"), "\""))
-      carbonLoadModel.setCommentChar(checkDefaultValue(optionsFinal("commentchar"), "#"))
-
-      // if there isn't file header in csv file and load sql doesn't provide FILEHEADER option,
-      // we should use table schema to generate file header.
-      var fileHeader = optionsFinal("fileheader")
-      val headerOption = options.get("header")
-      if (headerOption.isDefined) {
-        // whether the csv file has file header
-        // the default value is true
-        val header = try {
-          headerOption.get.toBoolean
-        } catch {
-          case ex: IllegalArgumentException =>
-            throw new MalformedCarbonCommandException(
-              "'header' option should be either 'true' or 'false'. " + ex.getMessage)
-        }
-        if (header) {
-          if (fileHeader.nonEmpty) {
-            throw new MalformedCarbonCommandException(
-              "When 'header' option is true, 'fileheader' option is not required.")
-          }
-        } else {
-          if (fileHeader.isEmpty) {
-            fileHeader = table.getCreateOrderColumn(table.getFactTableName)
-              .asScala.map(_.getColName).mkString(",")
-          }
-        }
-      }
-
-      carbonLoadModel.setDateFormat(dateFormat)
-      carbonLoadModel.setDefaultTimestampFormat(carbonProperty.getProperty(
-        CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
-        CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT))
-      carbonLoadModel.setDefaultDateFormat(carbonProperty.getProperty(
-        CarbonCommonConstants.CARBON_DATE_FORMAT,
-        CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT))
-      carbonLoadModel
-        .setSerializationNullFormat(
-          TableOptionConstant.SERIALIZATION_NULL_FORMAT.getName + "," +
-          optionsFinal("serialization_null_format"))
-      carbonLoadModel
-        .setBadRecordsLoggerEnable(
-          TableOptionConstant.BAD_RECORDS_LOGGER_ENABLE.getName + "," + bad_records_logger_enable)
-      carbonLoadModel
-        .setBadRecordsAction(
-          TableOptionConstant.BAD_RECORDS_ACTION.getName + "," + bad_records_action)
-      carbonLoadModel
-        .setIsEmptyDataBadRecord(
-          DataLoadProcessorConstants.IS_EMPTY_DATA_BAD_RECORD + "," +
-          optionsFinal("is_empty_data_bad_record"))
-      carbonLoadModel.setSortScope(sort_scope)
-      carbonLoadModel.setBatchSortSizeInMb(optionsFinal("batch_sort_size_inmb"))
-      carbonLoadModel.setGlobalSortPartitions(global_sort_partitions)
-      carbonLoadModel.setUseOnePass(single_pass.toBoolean)
-      if (delimeter.equalsIgnoreCase(complex_delimeter_level1) ||
-          complex_delimeter_level1.equalsIgnoreCase(complex_delimeter_level2) ||
-          delimeter.equalsIgnoreCase(complex_delimeter_level2)) {
-        sys.error(s"Field Delimiter & Complex types delimiter are same")
-      } else {
-        carbonLoadModel.setComplexDelimiterLevel1(
-          CarbonUtil.delimiterConverter(complex_delimeter_level1))
-        carbonLoadModel.setComplexDelimiterLevel2(
-          CarbonUtil.delimiterConverter(complex_delimeter_level2))
-      }
-      // set local dictionary path, and dictionary file extension
-      carbonLoadModel.setAllDictPath(all_dictionary_path)
-
-      val partitionStatus = CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS
-      try {
+      carbonLoadModel.setFactFilePath(factPath)
+      DataLoadingUtil.buildCarbonLoadModel(
+        table,
+        carbonProperty,
+        options,
+        optionsFinal,
+        carbonLoadModel
+      )
+
+      try {
         // First system has to partition the data first and then call the load data
         LOGGER.info(s"Initiating Direct Load for the Table : ($dbName.$tableName)")
-        carbonLoadModel.setFactFilePath(factPath)
-        carbonLoadModel.setCsvDelimiter(CarbonUtil.unescapeChar(delimeter))
-        carbonLoadModel.setCsvHeader(fileHeader)
-        carbonLoadModel.setColDictFilePath(column_dict)
-        carbonLoadModel.setDirectLoad(true)
-        carbonLoadModel.setCsvHeaderColumns(CommonUtil.getCsvHeaderColumns(carbonLoadModel))
-        val validatedMaxColumns = CommonUtil.validateMaxColumns(carbonLoadModel.getCsvHeaderColumns,
-          optionsFinal("maxcolumns"))
-        carbonLoadModel.setMaxColumns(validatedMaxColumns.toString)
         GlobalDictionaryUtil.updateTableMetadataFunc = updateTableMetadata
         val storePath = relation.tableMeta.storePath
         // add the start entry for the new load in the table status file
@@ -320,11 +125,9 @@ case class LoadTableCommand(
         if (isOverwriteTable) {
           LOGGER.info(s"Overwrite of carbon table with $dbName.$tableName is in progress")
         }
-        if (null == carbonLoadModel.getLoadMetadataDetails) {
-          CommonUtil.readLoadMetadataDetails(carbonLoadModel)
-        }
         if (carbonLoadModel.getLoadMetadataDetails.isEmpty && carbonLoadModel.getUseOnePass &&
-            StringUtils.isEmpty(column_dict) && StringUtils.isEmpty(all_dictionary_path)) {
+            StringUtils.isEmpty(carbonLoadModel.getColDictFilePath) &&
+            StringUtils.isEmpty(carbonLoadModel.getAllDictPath)) {
           LOGGER.info(s"Cannot use single_pass=true for $dbName.$tableName during the first load")
           LOGGER.audit(s"Cannot use single_pass=true for $dbName.$tableName during the first load")
           carbonLoadModel.setUseOnePass(false)
@@ -337,111 +140,21 @@ case class LoadTableCommand(
         if (!FileFactory.isFileExist(metadataDirectoryPath, fileType)) {
           FileFactory.mkdirs(metadataDirectoryPath, fileType)
         }
+        val partitionStatus = CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS
+        val columnar = sparkSession.conf.get("carbon.is.columnar.storage", "true").toBoolean
         if (carbonLoadModel.getUseOnePass) {
-          val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
-          val carbonTableIdentifier = carbonTable.getAbsoluteTableIdentifier
-            .getCarbonTableIdentifier
-          val carbonTablePath = CarbonStorePath
-            .getCarbonTablePath(storePath, carbonTableIdentifier)
-          val dictFolderPath = carbonTablePath.getMetadataDirectoryPath
-          val dimensions = carbonTable.getDimensionByTableName(
-            carbonTable.getFactTableName).asScala.toArray
-          val colDictFilePath = carbonLoadModel.getColDictFilePath
-          if (!StringUtils.isEmpty(colDictFilePath)) {
-            carbonLoadModel.initPredefDictMap()
-            // generate predefined dictionary
-            GlobalDictionaryUtil
-              .generatePredefinedColDictionary(colDictFilePath, carbonTableIdentifier,
-                dimensions, carbonLoadModel, sparkSession.sqlContext, storePath, dictFolderPath)
-          }
-          if (!StringUtils.isEmpty(all_dictionary_path)) {
-            carbonLoadModel.initPredefDictMap()
-            GlobalDictionaryUtil
-              .generateDictionaryFromDictionaryFiles(sparkSession.sqlContext,
-                carbonLoadModel,
-                storePath,
-                carbonTableIdentifier,
-                dictFolderPath,
-                dimensions,
-                all_dictionary_path)
-          }
-          // dictionaryServerClient dictionary generator
-          val dictionaryServerPort = carbonProperty
-            .getProperty(CarbonCommonConstants.DICTIONARY_SERVER_PORT,
-              CarbonCommonConstants.DICTIONARY_SERVER_PORT_DEFAULT)
-          val sparkDriverHost = sparkSession.sqlContext.sparkContext.
-            getConf.get("spark.driver.host")
-          carbonLoadModel.setDictionaryServerHost(sparkDriverHost)
-          // start dictionary server when use one pass load and dimension with DICTIONARY
-          // encoding is present.
-          val allDimensions = table.getAllDimensions.asScala.toList
-          val createDictionary = allDimensions.exists {
-            carbonDimension => carbonDimension.hasEncoding(Encoding.DICTIONARY) &&
-                               !carbonDimension.hasEncoding(Encoding.DIRECT_DICTIONARY)
-          }
-          val server: Option[DictionaryServer] = if (createDictionary) {
-            val dictionaryServer = DictionaryServer
-              .getInstance(dictionaryServerPort.toInt, carbonTable)
-            carbonLoadModel.setDictionaryServerPort(dictionaryServer.getPort)
-            sparkSession.sparkContext.addSparkListener(new SparkListener() {
-              override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd) {
-                dictionaryServer.shutdown()
-              }
-            })
-            Some(dictionaryServer)
-          } else {
-            None
-          }
-          CarbonDataRDDFactory.loadCarbonData(sparkSession.sqlContext,
+          loadDataUsingOnePass(
+            sparkSession,
+            carbonProperty,
             carbonLoadModel,
-            relation.tableMeta.storePath,
             columnar,
-            partitionStatus,
-            server,
-            isOverwriteTable,
-            dataFrame,
-            updateModel)
+            partitionStatus)
         } else {
-          val (dictionaryDataFrame, loadDataFrame) = if (updateModel.isDefined) {
-            val fields = dataFrame.get.schema.fields
-            import org.apache.spark.sql.functions.udf
-            // extracting only segment from tupleId
-            val getSegIdUDF = udf((tupleId: String) =>
-              CarbonUpdateUtil.getRequiredFieldFromTID(tupleId, TupleIdEnum.SEGMENT_ID))
-            // getting all fields except tupleId field as it is not required in the value
-            var otherFields = fields.toSeq
-              .filter(field => !field.name
-                .equalsIgnoreCase(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID))
-              .map(field => new Column(field.name))
-
-            // extract tupleId field which will be used as a key
-            val segIdColumn = getSegIdUDF(new Column(UnresolvedAttribute
-              .quotedString(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID))).
-              as(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_SEGMENTID)
-            // use dataFrameWithoutTupleId as dictionaryDataFrame
-            val dataFrameWithoutTupleId = dataFrame.get.select(otherFields: _*)
-            otherFields = otherFields :+ segIdColumn
-            // use dataFrameWithTupleId as loadDataFrame
-            val dataFrameWithTupleId = dataFrame.get.select(otherFields: _*)
-            (Some(dataFrameWithoutTupleId), Some(dataFrameWithTupleId))
-          } else {
-            (dataFrame, dataFrame)
-          }
-
-          GlobalDictionaryUtil.generateGlobalDictionary(
-            sparkSession.sqlContext,
-            carbonLoadModel,
-            relation.tableMeta.storePath,
-            dictionaryDataFrame)
-          CarbonDataRDDFactory.loadCarbonData(sparkSession.sqlContext,
+          loadData(
+            sparkSession,
             carbonLoadModel,
-            relation.tableMeta.storePath,
             columnar,
-            partitionStatus,
-            None,
-            isOverwriteTable,
-            loadDataFrame,
-            updateModel)
+            partitionStatus)
         }
       } catch {
         case CausedBy(ex: NoRetryException) =>
@@ -454,6 +167,9 @@ case class LoadTableCommand(
       } finally {
         // Once the data load is successful delete the unwanted partition files
         try {
+          val partitionLocation = table.getStorePath + "/partition/" +
+                                  table.getDatabaseName + "/" +
+                                  table.getFactTableName + "/"
           val fileType = FileFactory.getFileType(partitionLocation)
           if (FileFactory.isFileExist(partitionLocation, fileType)) {
             val file = FileFactory
@@ -480,6 +196,130 @@ case class LoadTableCommand(
     Seq.empty
   }
 
+  private def loadDataUsingOnePass(
+      sparkSession: SparkSession,
+      carbonProperty: CarbonProperties,
+      carbonLoadModel: CarbonLoadModel,
+      columnar: Boolean,
+      partitionStatus: String): Unit = {
+    val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
+    val carbonTableIdentifier = carbonTable.getAbsoluteTableIdentifier
+      .getCarbonTableIdentifier
+    val carbonTablePath = CarbonStorePath
+      .getCarbonTablePath(carbonLoadModel.getStorePath, carbonTableIdentifier)
+    val dictFolderPath = carbonTablePath.getMetadataDirectoryPath
+    val dimensions = carbonTable.getDimensionByTableName(
+      carbonTable.getFactTableName).asScala.toArray
+    val colDictFilePath = carbonLoadModel.getColDictFilePath
+    if (!StringUtils.isEmpty(colDictFilePath)) {
+      carbonLoadModel.initPredefDictMap()
+      // generate predefined dictionary
+      GlobalDictionaryUtil.generatePredefinedColDictionary(
+        colDictFilePath,
+        carbonTableIdentifier,
+        dimensions,
+        carbonLoadModel,
+        sparkSession.sqlContext,
+        carbonLoadModel.getStorePath,
+        dictFolderPath)
+    }
+    if (!StringUtils.isEmpty(carbonLoadModel.getAllDictPath)) {
+      carbonLoadModel.initPredefDictMap()
+      GlobalDictionaryUtil
+        .generateDictionaryFromDictionaryFiles(sparkSession.sqlContext,
+          carbonLoadModel,
+          carbonLoadModel.getStorePath,
+          carbonTableIdentifier,
+          dictFolderPath,
+          dimensions,
+          carbonLoadModel.getAllDictPath)
+    }
+    // dictionary generation through the dictionary server
+    val dictionaryServerPort = carbonProperty
+      .getProperty(CarbonCommonConstants.DICTIONARY_SERVER_PORT,
+        CarbonCommonConstants.DICTIONARY_SERVER_PORT_DEFAULT)
+    val sparkDriverHost = sparkSession.sqlContext.sparkContext.
+      getConf.get("spark.driver.host")
+    carbonLoadModel.setDictionaryServerHost(sparkDriverHost)
+    // start the dictionary server when one-pass load is used and a dimension
+    // with DICTIONARY encoding is present.
+    val allDimensions =
+      carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.getAllDimensions.asScala.toList
+    val createDictionary = allDimensions.exists {
+      carbonDimension => carbonDimension.hasEncoding(Encoding.DICTIONARY) &&
+                         !carbonDimension.hasEncoding(Encoding.DIRECT_DICTIONARY)
+    }
+    val server: Option[DictionaryServer] = if (createDictionary) {
+      val dictionaryServer = DictionaryServer
+        .getInstance(dictionaryServerPort.toInt, carbonTable)
+      carbonLoadModel.setDictionaryServerPort(dictionaryServer.getPort)
+      sparkSession.sparkContext.addSparkListener(new SparkListener() {
+        override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd) {
+          dictionaryServer.shutdown()
+        }
+      })
+      Some(dictionaryServer)
+    } else {
+      None
+    }
+    CarbonDataRDDFactory.loadCarbonData(sparkSession.sqlContext,
+      carbonLoadModel,
+      carbonLoadModel.getStorePath,
+      columnar,
+      partitionStatus,
+      server,
+      isOverwriteTable,
+      dataFrame,
+      updateModel)
+  }
+
+  private def loadData(
+      sparkSession: SparkSession,
+      carbonLoadModel: CarbonLoadModel,
+      columnar: Boolean,
+      partitionStatus: String): Unit = {
+    val (dictionaryDataFrame, loadDataFrame) = if (updateModel.isDefined) {
+      val fields = dataFrame.get.schema.fields
+      import org.apache.spark.sql.functions.udf
+      // extract only the segment id from the tupleId
+      val getSegIdUDF = udf((tupleId: String) =>
+        CarbonUpdateUtil.getRequiredFieldFromTID(tupleId, TupleIdEnum.SEGMENT_ID))
+      // get all fields except the tupleId field, as it is not required in the value
+      var otherFields = fields.toSeq
+        .filter(field => !field.name
+          .equalsIgnoreCase(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID))
+        .map(field => new Column(field.name))
+
+      // extract tupleId field which will be used as a key
+      val segIdColumn = getSegIdUDF(new Column(UnresolvedAttribute
+        .quotedString(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID))).
+        as(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_SEGMENTID)
+      // use dataFrameWithoutTupleId as dictionaryDataFrame
+      val dataFrameWithoutTupleId = dataFrame.get.select(otherFields: _*)
+      otherFields = otherFields :+ segIdColumn
+      // use dataFrameWithTupleId as loadDataFrame
+      val dataFrameWithTupleId = dataFrame.get.select(otherFields: _*)
+      (Some(dataFrameWithoutTupleId), Some(dataFrameWithTupleId))
+    } else {
+      (dataFrame, dataFrame)
+    }
+
+    GlobalDictionaryUtil.generateGlobalDictionary(
+      sparkSession.sqlContext,
+      carbonLoadModel,
+      carbonLoadModel.getStorePath,
+      dictionaryDataFrame)
+    CarbonDataRDDFactory.loadCarbonData(sparkSession.sqlContext,
+      carbonLoadModel,
+      carbonLoadModel.getStorePath,
+      columnar,
+      partitionStatus,
+      None,
+      isOverwriteTable,
+      loadDataFrame,
+      updateModel)
+  }
+
   private def updateTableMetadata(
       carbonLoadModel: CarbonLoadModel,
       sqlContext: SQLContext,