Posted to commits@carbondata.apache.org by ja...@apache.org on 2018/02/12 08:07:07 UTC

[1/3] carbondata git commit: [CARBONDATA-2159] Remove carbon-spark dependency in store-sdk module

Repository: carbondata
Updated Branches:
  refs/heads/carbonstore a848ccff8 -> 0d50f6546


http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java
new file mode 100644
index 0000000..fbb93b6
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.loading.model;
+
+import java.io.IOException;
+import java.text.SimpleDateFormat;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.carbondata.common.Maps;
+import org.apache.carbondata.common.Strings;
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.common.constants.LoggerAction;
+import org.apache.carbondata.common.exceptions.sql.InvalidLoadOptionException;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.processing.loading.constants.DataLoadProcessorConstants;
+import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat;
+import org.apache.carbondata.processing.loading.sort.SortScopeOptions;
+import org.apache.carbondata.processing.util.TableOptionConstant;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Builder for {@link CarbonLoadModel}
+ */
+@InterfaceAudience.Developer
+public class CarbonLoadModelBuilder {
+
+  private CarbonTable table;
+
+  public CarbonLoadModelBuilder(CarbonTable table) {
+    this.table = table;
+  }
+
+  /**
+   * build CarbonLoadModel for data loading
+   * @param options Load options from user input
+   * @return a new CarbonLoadModel instance
+   */
+  public CarbonLoadModel build(
+      Map<String, String> options) throws InvalidLoadOptionException, IOException {
+    Map<String, String> optionsFinal = LoadOption.fillOptionWithDefaultValue(options);
+    optionsFinal.put("sort_scope", "no_sort");
+    if (!options.containsKey("fileheader")) {
+      List<CarbonColumn> csvHeader = table.getCreateOrderColumn(table.getTableName());
+      String[] columns = new String[csvHeader.size()];
+      for (int i = 0; i < columns.length; i++) {
+        columns[i] = csvHeader.get(i).getColName();
+      }
+      optionsFinal.put("fileheader", Strings.mkString(columns, ","));
+    }
+    CarbonLoadModel model = new CarbonLoadModel();
+
+    // we have provided 'fileheader', so hadoopConf can be null
+    build(options, optionsFinal, model, null);
+
+    // set default values
+    model.setTimestampformat(CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT);
+    model.setDateFormat(CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT);
+    model.setUseOnePass(Boolean.parseBoolean(Maps.getOrDefault(options, "onepass", "false")));
+    model.setDictionaryServerHost(Maps.getOrDefault(options, "dicthost", null));
+    try {
+      model.setDictionaryServerPort(Integer.parseInt(Maps.getOrDefault(options, "dictport", "-1")));
+    } catch (NumberFormatException e) {
+      throw new InvalidLoadOptionException(e.getMessage());
+    }
+    return model;
+  }
+
+  /**
+   * build CarbonLoadModel for data loading
+   * @param options Load options from user input
+   * @param optionsFinal Load options populated with default values for optional options
+   * @param carbonLoadModel The output load model
+   * @param hadoopConf needed to read the CSV header if 'fileheader' is not set in the
+   *                   user-provided load options
+   */
+  public void build(
+      Map<String, String> options,
+      Map<String, String> optionsFinal,
+      CarbonLoadModel carbonLoadModel,
+      Configuration hadoopConf) throws InvalidLoadOptionException, IOException {
+    carbonLoadModel.setTableName(table.getTableName());
+    carbonLoadModel.setDatabaseName(table.getDatabaseName());
+    carbonLoadModel.setTablePath(table.getTablePath());
+    carbonLoadModel.setTableName(table.getTableName());
+    CarbonDataLoadSchema dataLoadSchema = new CarbonDataLoadSchema(table);
+    // Need to fill dimension relation
+    carbonLoadModel.setCarbonDataLoadSchema(dataLoadSchema);
+    String sort_scope = optionsFinal.get("sort_scope");
+    String single_pass = optionsFinal.get("single_pass");
+    String bad_records_logger_enable = optionsFinal.get("bad_records_logger_enable");
+    String bad_records_action = optionsFinal.get("bad_records_action");
+    String bad_record_path = optionsFinal.get("bad_record_path");
+    String global_sort_partitions = optionsFinal.get("global_sort_partitions");
+    String timestampformat = optionsFinal.get("timestampformat");
+    String dateFormat = optionsFinal.get("dateformat");
+    String delimeter = optionsFinal.get("delimiter");
+    String complex_delimeter_level1 = optionsFinal.get("complex_delimiter_level_1");
+    String complex_delimeter_level2 = optionsFinal.get("complex_delimiter_level_2");
+    String all_dictionary_path = optionsFinal.get("all_dictionary_path");
+    String column_dict = optionsFinal.get("columndict");
+    validateDateTimeFormat(timestampformat, "TimestampFormat");
+    validateDateTimeFormat(dateFormat, "DateFormat");
+    validateSortScope(sort_scope);
+
+    if (Boolean.parseBoolean(bad_records_logger_enable) ||
+        LoggerAction.REDIRECT.name().equalsIgnoreCase(bad_records_action)) {
+      bad_record_path = CarbonUtil.checkAndAppendHDFSUrl(bad_record_path);
+      if (!CarbonUtil.isValidBadStorePath(bad_record_path)) {
+        throw new InvalidLoadOptionException("Invalid bad records location.");
+      }
+    }
+    carbonLoadModel.setBadRecordsLocation(bad_record_path);
+
+    validateGlobalSortPartitions(global_sort_partitions);
+    carbonLoadModel.setEscapeChar(checkDefaultValue(optionsFinal.get("escapechar"), "\\"));
+    carbonLoadModel.setQuoteChar(checkDefaultValue(optionsFinal.get("quotechar"), "\""));
+    carbonLoadModel.setCommentChar(checkDefaultValue(optionsFinal.get("commentchar"), "#"));
+
+    // if the CSV file has no header and the LOAD SQL does not provide the FILEHEADER option,
+    // we should use the table schema to generate the file header.
+    String fileHeader = optionsFinal.get("fileheader");
+    String headerOption = options.get("header");
+    if (headerOption != null) {
+      if (!headerOption.equalsIgnoreCase("true") &&
+          !headerOption.equalsIgnoreCase("false")) {
+        throw new InvalidLoadOptionException(
+            "'header' option should be either 'true' or 'false'.");
+      }
+      // 'header' indicates whether the CSV file has a header line; the default is true
+      if (Boolean.valueOf(headerOption)) {
+        if (!StringUtils.isEmpty(fileHeader)) {
+          throw new InvalidLoadOptionException(
+              "When 'header' option is true, 'fileheader' option is not required.");
+        }
+      } else {
+        if (StringUtils.isEmpty(fileHeader)) {
+          List<CarbonColumn> columns = table.getCreateOrderColumn(table.getTableName());
+          String[] columnNames = new String[columns.size()];
+          for (int i = 0; i < columnNames.length; i++) {
+            columnNames[i] = columns.get(i).getColName();
+          }
+          fileHeader = Strings.mkString(columnNames, ",");
+        }
+      }
+    }
+
+    carbonLoadModel.setTimestampformat(timestampformat);
+    carbonLoadModel.setDateFormat(dateFormat);
+    carbonLoadModel.setDefaultTimestampFormat(
+        CarbonProperties.getInstance().getProperty(
+            CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
+            CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT));
+
+    carbonLoadModel.setDefaultDateFormat(
+        CarbonProperties.getInstance().getProperty(
+            CarbonCommonConstants.CARBON_DATE_FORMAT,
+            CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT));
+
+    carbonLoadModel.setSerializationNullFormat(
+        TableOptionConstant.SERIALIZATION_NULL_FORMAT.getName() + "," +
+            optionsFinal.get("serialization_null_format"));
+
+    carbonLoadModel.setBadRecordsLoggerEnable(
+        TableOptionConstant.BAD_RECORDS_LOGGER_ENABLE.getName() + "," + bad_records_logger_enable);
+
+    carbonLoadModel.setBadRecordsAction(
+        TableOptionConstant.BAD_RECORDS_ACTION.getName() + "," + bad_records_action.toUpperCase());
+
+    carbonLoadModel.setIsEmptyDataBadRecord(
+        DataLoadProcessorConstants.IS_EMPTY_DATA_BAD_RECORD + "," +
+            optionsFinal.get("is_empty_data_bad_record"));
+
+    carbonLoadModel.setSkipEmptyLine(optionsFinal.get("skip_empty_line"));
+
+    carbonLoadModel.setSortScope(sort_scope);
+    carbonLoadModel.setBatchSortSizeInMb(optionsFinal.get("batch_sort_size_inmb"));
+    carbonLoadModel.setGlobalSortPartitions(global_sort_partitions);
+    carbonLoadModel.setUseOnePass(Boolean.parseBoolean(single_pass));
+
+    if (delimeter.equalsIgnoreCase(complex_delimeter_level1) ||
+        complex_delimeter_level1.equalsIgnoreCase(complex_delimeter_level2) ||
+        delimeter.equalsIgnoreCase(complex_delimeter_level2)) {
+      throw new InvalidLoadOptionException("Field Delimiter and Complex types delimiter are same");
+    } else {
+      carbonLoadModel.setComplexDelimiterLevel1(
+          CarbonUtil.delimiterConverter(complex_delimeter_level1));
+      carbonLoadModel.setComplexDelimiterLevel2(
+          CarbonUtil.delimiterConverter(complex_delimeter_level2));
+    }
+    // set local dictionary path, and dictionary file extension
+    carbonLoadModel.setAllDictPath(all_dictionary_path);
+    carbonLoadModel.setCsvDelimiter(CarbonUtil.unescapeChar(delimeter));
+    carbonLoadModel.setCsvHeader(fileHeader);
+    carbonLoadModel.setColDictFilePath(column_dict);
+    carbonLoadModel.setCsvHeaderColumns(
+        LoadOption.getCsvHeaderColumns(carbonLoadModel, hadoopConf));
+
+    int validatedMaxColumns = validateMaxColumns(
+        carbonLoadModel.getCsvHeaderColumns(),
+        optionsFinal.get("maxcolumns"));
+
+    carbonLoadModel.setMaxColumns(String.valueOf(validatedMaxColumns));
+    carbonLoadModel.readAndSetLoadMetadataDetails();
+  }
+
+  private int validateMaxColumns(String[] csvHeaders, String maxColumns)
+      throws InvalidLoadOptionException {
+    /*
+    User configures both csvheadercolumns and maxcolumns:
+      if csvheadercolumns >= maxcolumns, give error
+      if maxcolumns > threshold, give error
+    User configures csvheadercolumns only:
+      if csvheadercolumns >= maxcolumns(default) then maxcolumns = csvheadercolumns + 1
+      if csvheadercolumns >= threshold, give error
+    User configures nothing:
+      if csvheadercolumns >= maxcolumns(default) then maxcolumns = csvheadercolumns + 1
+      if csvheadercolumns >= threshold, give error
+     */
+    int columnCountInSchema = csvHeaders.length;
+    int maxNumberOfColumnsForParsing = 0;
+    Integer maxColumnsInt = getMaxColumnValue(maxColumns);
+    if (maxColumnsInt != null) {
+      if (columnCountInSchema >= maxColumnsInt) {
+        throw new InvalidLoadOptionException(
+            "csv headers should be less than the max columns " + maxColumnsInt);
+      } else if (maxColumnsInt > CSVInputFormat.THRESHOLD_MAX_NUMBER_OF_COLUMNS_FOR_PARSING) {
+        throw new InvalidLoadOptionException(
+            "max columns cannot be greater than the threshold value: " +
+                CSVInputFormat.THRESHOLD_MAX_NUMBER_OF_COLUMNS_FOR_PARSING);
+      } else {
+        maxNumberOfColumnsForParsing = maxColumnsInt;
+      }
+    } else if (columnCountInSchema >= CSVInputFormat.THRESHOLD_MAX_NUMBER_OF_COLUMNS_FOR_PARSING) {
+      throw new InvalidLoadOptionException(
+          "csv header columns should be less than max threashold: " +
+              CSVInputFormat.THRESHOLD_MAX_NUMBER_OF_COLUMNS_FOR_PARSING);
+    } else if (columnCountInSchema >= CSVInputFormat.DEFAULT_MAX_NUMBER_OF_COLUMNS_FOR_PARSING) {
+      maxNumberOfColumnsForParsing = columnCountInSchema + 1;
+    } else {
+      maxNumberOfColumnsForParsing = CSVInputFormat.DEFAULT_MAX_NUMBER_OF_COLUMNS_FOR_PARSING;
+    }
+    return maxNumberOfColumnsForParsing;
+  }
+
+  private Integer getMaxColumnValue(String maxColumn) {
+    return (maxColumn == null) ? null : Integer.parseInt(maxColumn);
+  }
+
+  /**
+   * validates both timestamp and date for illegal values
+   */
+  private void validateDateTimeFormat(String dateTimeLoadFormat, String dateTimeLoadOption)
+      throws InvalidLoadOptionException {
+    // allowing empty value to be configured for dateformat option.
+    if (dateTimeLoadFormat != null && !dateTimeLoadFormat.trim().equalsIgnoreCase("")) {
+      try {
+        new SimpleDateFormat(dateTimeLoadFormat);
+      } catch (IllegalArgumentException e) {
+        throw new InvalidLoadOptionException(
+            "Error: Wrong option: " + dateTimeLoadFormat + " is provided for option "
+                + dateTimeLoadOption);
+      }
+    }
+  }
+
+  private void validateSortScope(String sortScope) throws InvalidLoadOptionException {
+    if (sortScope != null) {
+      // Global sort is not supported on partitioned tables.
+      if (table.getPartitionInfo(table.getTableName()) != null &&
+          sortScope.equalsIgnoreCase(SortScopeOptions.SortScope.GLOBAL_SORT.toString())) {
+        throw new InvalidLoadOptionException("Don't support use global sort on partitioned table.");
+      }
+    }
+  }
+
+  private void validateGlobalSortPartitions(String globalSortPartitions)
+      throws InvalidLoadOptionException {
+    if (globalSortPartitions != null) {
+      try {
+        int num = Integer.parseInt(globalSortPartitions);
+        if (num <= 0) {
+          throw new InvalidLoadOptionException("'GLOBAL_SORT_PARTITIONS' should be greater than 0");
+        }
+      } catch (NumberFormatException e) {
+        throw new InvalidLoadOptionException(e.getMessage());
+      }
+    }
+  }
+
+  /**
+   * Return the given value, or the default value if the given value is empty
+   */
+  private String checkDefaultValue(String value, String defaultValue) {
+    if (StringUtils.isEmpty(value)) {
+      return defaultValue;
+    } else {
+      return value;
+    }
+  }
+}
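
For reference, a minimal usage sketch of the new builder API follows (not part of
the patch). It assumes a fully initialized CarbonTable obtained elsewhere; the
option keys are the ones handled by build() above, and any omitted options fall
back to the defaults from LoadOption.fillOptionWithDefaultValue(). The value
passed for bad_records_action is illustrative.

    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;

    import org.apache.carbondata.common.exceptions.sql.InvalidLoadOptionException;
    import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
    import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
    import org.apache.carbondata.processing.loading.model.CarbonLoadModelBuilder;

    public class LoadModelSketch {
      // Builds a CarbonLoadModel with mostly-default options; 'fileheader' is
      // derived from the table schema inside build() when not supplied.
      static CarbonLoadModel modelFor(CarbonTable table, String badRecordsAction)
          throws IOException, InvalidLoadOptionException {
        Map<String, String> options = new HashMap<>();
        options.put("delimiter", ",");                        // same as the default
        options.put("bad_records_action", badRecordsAction);  // e.g. "REDIRECT" (illustrative)
        return new CarbonLoadModelBuilder(table).build(options);
      }
    }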

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/processing/src/main/java/org/apache/carbondata/processing/loading/model/LoadOption.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/model/LoadOption.java b/processing/src/main/java/org/apache/carbondata/processing/loading/model/LoadOption.java
new file mode 100644
index 0000000..8ec93a9
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/model/LoadOption.java
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.loading.model;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.carbondata.common.Maps;
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.common.exceptions.sql.InvalidLoadOptionException;
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.constants.CarbonLoadOptionConstants;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException;
+import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
+import org.apache.carbondata.processing.util.CarbonLoaderUtil;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Provide utilities to populate loading options
+ */
+@InterfaceAudience.Developer
+public class LoadOption {
+
+  private static LogService LOG = LogServiceFactory.getLogService(LoadOption.class.getName());
+
+  /**
+   * Based on the input options, fill and return data loading options with default values
+   */
+  public static Map<String, String> fillOptionWithDefaultValue(
+      Map<String, String> options) throws InvalidLoadOptionException {
+    Map<String, String> optionsFinal = new HashMap<>();
+    optionsFinal.put("delimiter", Maps.getOrDefault(options, "delimiter", ","));
+    optionsFinal.put("quotechar", Maps.getOrDefault(options, "quotechar", "\""));
+    optionsFinal.put("fileheader", Maps.getOrDefault(options, "fileheader", ""));
+    optionsFinal.put("commentchar", Maps.getOrDefault(options, "commentchar", "#"));
+    optionsFinal.put("columndict", Maps.getOrDefault(options, "columndict", null));
+
+    optionsFinal.put(
+        "escapechar",
+        CarbonLoaderUtil.getEscapeChar(Maps.getOrDefault(options,"escapechar", "\\")));
+
+    optionsFinal.put(
+        "serialization_null_format",
+        Maps.getOrDefault(options, "serialization_null_format", "\\N"));
+
+    optionsFinal.put(
+        "bad_records_logger_enable",
+        Maps.getOrDefault(
+            options,
+            "bad_records_logger_enable",
+            CarbonProperties.getInstance().getProperty(
+                CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE,
+                CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE_DEFAULT)));
+
+    String badRecordActionValue = CarbonProperties.getInstance().getProperty(
+        CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION,
+        CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT);
+
+    optionsFinal.put(
+        "bad_records_action",
+        Maps.getOrDefault(
+            options,
+            "bad_records_action",
+            CarbonProperties.getInstance().getProperty(
+                CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_ACTION,
+                badRecordActionValue)));
+
+    optionsFinal.put(
+        "is_empty_data_bad_record",
+        Maps.getOrDefault(
+            options,
+            "is_empty_data_bad_record",
+            CarbonProperties.getInstance().getProperty(
+                CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD,
+                CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD_DEFAULT)));
+
+    optionsFinal.put(
+        "skip_empty_line",
+        Maps.getOrDefault(
+            options,
+            "skip_empty_line",
+            CarbonProperties.getInstance().getProperty(
+                CarbonLoadOptionConstants.CARBON_OPTIONS_SKIP_EMPTY_LINE)));
+
+    optionsFinal.put(
+        "all_dictionary_path",
+        Maps.getOrDefault(options, "all_dictionary_path", ""));
+
+    optionsFinal.put(
+        "complex_delimiter_level_1",
+        Maps.getOrDefault(options,"complex_delimiter_level_1", "\\$"));
+
+    optionsFinal.put(
+        "complex_delimiter_level_2",
+        Maps.getOrDefault(options, "complex_delimiter_level_2", "\\:"));
+
+    optionsFinal.put(
+        "dateformat",
+        Maps.getOrDefault(
+            options,
+            "dateformat",
+            CarbonProperties.getInstance().getProperty(
+                CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT,
+                CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT_DEFAULT)));
+
+    optionsFinal.put(
+        "timestampformat",
+        Maps.getOrDefault(
+            options,
+            "timestampformat",
+            CarbonProperties.getInstance().getProperty(
+                CarbonLoadOptionConstants.CARBON_OPTIONS_TIMESTAMPFORMAT,
+                CarbonLoadOptionConstants.CARBON_OPTIONS_TIMESTAMPFORMAT_DEFAULT)));
+
+    optionsFinal.put(
+        "global_sort_partitions",
+        Maps.getOrDefault(
+            options,
+            "global_sort_partitions",
+            CarbonProperties.getInstance().getProperty(
+                CarbonLoadOptionConstants.CARBON_OPTIONS_GLOBAL_SORT_PARTITIONS,
+                null)));
+
+    optionsFinal.put("maxcolumns", Maps.getOrDefault(options, "maxcolumns", null));
+
+    optionsFinal.put(
+        "batch_sort_size_inmb",
+        Maps.getOrDefault(
+            options,
+            "batch_sort_size_inmb",
+            CarbonProperties.getInstance().getProperty(
+                CarbonLoadOptionConstants.CARBON_OPTIONS_BATCH_SORT_SIZE_INMB,
+                CarbonProperties.getInstance().getProperty(
+                    CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB,
+                    CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB_DEFAULT))));
+
+    optionsFinal.put(
+        "bad_record_path",
+        Maps.getOrDefault(
+            options,
+            "bad_record_path",
+            CarbonProperties.getInstance().getProperty(
+                CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH,
+                CarbonProperties.getInstance().getProperty(
+                    CarbonCommonConstants.CARBON_BADRECORDS_LOC,
+                    CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL))));
+
+    String useOnePass = Maps.getOrDefault(
+        options,
+        "single_pass",
+        CarbonProperties.getInstance().getProperty(
+            CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS,
+            CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS_DEFAULT)).trim().toLowerCase();
+
+    boolean singlePass;
+
+    if (useOnePass.equalsIgnoreCase("true")) {
+      singlePass = true;
+    } else {
+      // when single_pass = false, if either all_dictionary_path
+      // or columndict is configured, do not allow the load
+      if (StringUtils.isNotEmpty(optionsFinal.get("all_dictionary_path")) ||
+          StringUtils.isNotEmpty(optionsFinal.get("columndict"))) {
+        throw new InvalidLoadOptionException(
+            "Can not use all_dictionary_path or columndict without single_pass.");
+      } else {
+        singlePass = false;
+      }
+    }
+
+    optionsFinal.put("single_pass", String.valueOf(singlePass));
+    return optionsFinal;
+  }
+
+  /**
+   * Return CSV header field names
+   */
+  public static String[] getCsvHeaderColumns(
+      CarbonLoadModel carbonLoadModel,
+      Configuration hadoopConf) throws IOException {
+    String delimiter;
+    if (StringUtils.isEmpty(carbonLoadModel.getCsvDelimiter())) {
+      delimiter = CarbonCommonConstants.COMMA;
+    } else {
+      delimiter = CarbonUtil.delimiterConverter(carbonLoadModel.getCsvDelimiter());
+    }
+    String csvFile = null;
+    String csvHeader = carbonLoadModel.getCsvHeader();
+    String[] csvColumns;
+    if (StringUtils.isBlank(csvHeader)) {
+      // read header from csv file
+      csvFile = carbonLoadModel.getFactFilePath().split(",")[0];
+      csvHeader = CarbonUtil.readHeader(csvFile, hadoopConf);
+      if (StringUtils.isBlank(csvHeader)) {
+        throw new CarbonDataLoadingException("First line of the csv is not valid.");
+      }
+      String[] headers = csvHeader.toLowerCase().split(delimiter);
+      csvColumns = new String[headers.length];
+      for (int i = 0; i < csvColumns.length; i++) {
+        csvColumns[i] = headers[i].replaceAll("\"", "").trim();
+      }
+    } else {
+      String[] headers = csvHeader.toLowerCase().split(CarbonCommonConstants.COMMA);
+      csvColumns = new String[headers.length];
+      for (int i = 0; i < csvColumns.length; i++) {
+        csvColumns[i] = headers[i].trim();
+      }
+    }
+
+    if (!CarbonDataProcessorUtil.isHeaderValid(carbonLoadModel.getTableName(), csvColumns,
+        carbonLoadModel.getCarbonDataLoadSchema())) {
+      if (csvFile == null) {
+        LOG.error("CSV header in DDL is not proper."
+            + " Column names in schema and CSV header are not the same.");
+        throw new CarbonDataLoadingException(
+            "CSV header in DDL is not proper. Column names in schema and CSV header are "
+                + "not the same.");
+      } else {
+        LOG.error(
+            "CSV header in input file is not proper. Column names in schema and csv header are not "
+                + "the same. Input file : " + CarbonUtil.removeAKSK(csvFile));
+        throw new CarbonDataLoadingException(
+            "CSV header in input file is not proper. Column names in schema and csv header are not "
+                + "the same. Input file : " + CarbonUtil.removeAKSK(csvFile));
+      }
+    }
+    return csvColumns;
+  }
+}
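
A short sketch of the option-filling contract (not part of the patch):
fillOptionWithDefaultValue() merges user options over property-derived defaults,
and rejects dictionary options when single_pass is false, as the code above
shows. The dictionary path below is illustrative, and the single_pass default
depends on carbon properties, so it is set explicitly here.

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.carbondata.common.exceptions.sql.InvalidLoadOptionException;
    import org.apache.carbondata.processing.loading.model.LoadOption;

    public class LoadOptionSketch {
      public static void main(String[] args) throws InvalidLoadOptionException {
        Map<String, String> user = new HashMap<>();
        Map<String, String> merged = LoadOption.fillOptionWithDefaultValue(user);
        // defaults are filled in for keys the user did not set
        System.out.println(merged.get("delimiter"));   // ","
        System.out.println(merged.get("quotechar"));   // "\""

        // columndict (or all_dictionary_path) without single_pass=true is rejected
        user.put("single_pass", "false");
        user.put("columndict", "/path/to/column.dict");  // illustrative path
        try {
          LoadOption.fillOptionWithDefaultValue(user);
        } catch (InvalidLoadOptionException e) {
          System.out.println(e.getMessage());
        }
      }
    }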

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
index c2f4501..6843946 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
@@ -16,14 +16,9 @@
  */
 package org.apache.carbondata.processing.util;
 
-import java.io.BufferedWriter;
-import java.io.DataOutputStream;
 import java.io.IOException;
-import java.io.OutputStreamWriter;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
-import java.nio.charset.Charset;
-import java.text.SimpleDateFormat;
 import java.util.*;
 
 import org.apache.carbondata.common.logging.LogService;
@@ -39,9 +34,6 @@ import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.datastore.impl.FileFactory.FileType;
-import org.apache.carbondata.core.fileoperations.AtomicFileOperations;
-import org.apache.carbondata.core.fileoperations.AtomicFileOperationsImpl;
-import org.apache.carbondata.core.fileoperations.FileWriteOperation;
 import org.apache.carbondata.core.locks.CarbonLockUtil;
 import org.apache.carbondata.core.locks.ICarbonLock;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
@@ -57,9 +49,8 @@ import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
 import org.apache.carbondata.processing.merger.NodeBlockRelation;
 import org.apache.carbondata.processing.merger.NodeMultiBlockRelation;
-import static org.apache.carbondata.core.enums.EscapeSequences.*;
 
-import com.google.gson.Gson;
+import static org.apache.carbondata.core.enums.EscapeSequences.*;
 
 
 public final class CarbonLoaderUtil {
@@ -287,49 +278,6 @@ public final class CarbonLoaderUtil {
     loadMetadataDetails.setLoadStartTime(loadStartTime);
   }
 
-  public static void writeLoadMetadata(AbsoluteTableIdentifier identifier,
-      List<LoadMetadataDetails> listOfLoadFolderDetails) throws IOException {
-    String dataLoadLocation = CarbonTablePath.getTableStatusFilePath(identifier.getTablePath());
-
-    DataOutputStream dataOutputStream;
-    Gson gsonObjectToWrite = new Gson();
-    BufferedWriter brWriter = null;
-
-    AtomicFileOperations writeOperation =
-        new AtomicFileOperationsImpl(dataLoadLocation, FileFactory.getFileType(dataLoadLocation));
-
-    try {
-
-      dataOutputStream = writeOperation.openForWrite(FileWriteOperation.OVERWRITE);
-      brWriter = new BufferedWriter(new OutputStreamWriter(dataOutputStream,
-              Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET)));
-
-      String metadataInstance = gsonObjectToWrite.toJson(listOfLoadFolderDetails.toArray());
-      brWriter.write(metadataInstance);
-    } finally {
-      try {
-        if (null != brWriter) {
-          brWriter.flush();
-        }
-      } catch (Exception e) {
-        LOGGER.error("error in  flushing ");
-
-      }
-      CarbonUtil.closeStreams(brWriter);
-      writeOperation.close();
-    }
-
-  }
-
-  public static String readCurrentTime() {
-    SimpleDateFormat sdf = new SimpleDateFormat(CarbonCommonConstants.CARBON_TIMESTAMP);
-    String date = null;
-
-    date = sdf.format(new Date());
-
-    return date;
-  }
-
   public static boolean isValidEscapeSequence(String escapeChar) {
     return escapeChar.equalsIgnoreCase(NEW_LINE.getName()) ||
         escapeChar.equalsIgnoreCase(CARRIAGE_RETURN.getName()) ||
@@ -445,17 +393,6 @@ public final class CarbonLoaderUtil {
   }
 
   /**
-   * This method will divide the blocks among the nodes as per the data locality
-   *
-   * @param blockInfos
-   * @return
-   */
-  public static Map<String, List<Distributable>> nodeBlockMapping(List<Distributable> blockInfos) {
-    // -1 if number of nodes has to be decided based on block location information
-    return nodeBlockMapping(blockInfos, -1);
-  }
-
-  /**
    * the method returns the number of required executors
    *
    * @param blockInfos
@@ -830,25 +767,6 @@ public final class CarbonLoaderUtil {
     CarbonUtil.checkAndCreateFolder(segmentFolder);
   }
 
-  /**
-   * This will update the old table status details before clean files to the latest table status.
-   * @param oldList
-   * @param newList
-   * @return
-   */
-  public static List<LoadMetadataDetails> updateLoadMetadataFromOldToNew(
-      LoadMetadataDetails[] oldList, LoadMetadataDetails[] newList) {
-
-    List<LoadMetadataDetails> newListMetadata =
-        new ArrayList<LoadMetadataDetails>(Arrays.asList(newList));
-    for (LoadMetadataDetails oldSegment : oldList) {
-      if ("false".equalsIgnoreCase(oldSegment.getVisibility())) {
-        newListMetadata.get(newListMetadata.indexOf(oldSegment)).setVisibility("false");
-      }
-    }
-    return newListMetadata;
-  }
-
   /*
    * This method will add data size and index size into tablestatus for each segment. And also
    * returns the size of the segment.

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/processing/src/main/java/org/apache/carbondata/processing/util/DeleteLoadFolders.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/DeleteLoadFolders.java b/processing/src/main/java/org/apache/carbondata/processing/util/DeleteLoadFolders.java
deleted file mode 100644
index 1fdce32..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/util/DeleteLoadFolders.java
+++ /dev/null
@@ -1,200 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.util;
-
-import java.io.IOException;
-
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
-import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
-import org.apache.carbondata.core.datastore.impl.FileFactory;
-import org.apache.carbondata.core.locks.CarbonLockFactory;
-import org.apache.carbondata.core.locks.ICarbonLock;
-import org.apache.carbondata.core.locks.LockUsage;
-import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
-import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
-import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
-import org.apache.carbondata.core.statusmanager.SegmentStatus;
-import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
-import org.apache.carbondata.core.util.path.CarbonTablePath;
-
-public final class DeleteLoadFolders {
-
-  private static final LogService LOGGER =
-      LogServiceFactory.getLogService(DeleteLoadFolders.class.getName());
-
-  private DeleteLoadFolders() {
-
-  }
-
-  /**
-   * returns segment path
-   *
-   * @param identifier
-   * @param oneLoad
-   * @return
-   */
-  private static String getSegmentPath(AbsoluteTableIdentifier identifier,
-      LoadMetadataDetails oneLoad) {
-    String segmentId = oneLoad.getLoadName();
-    return CarbonTablePath.getSegmentPath(identifier.getTablePath(), segmentId);
-  }
-
-  public static void physicalFactAndMeasureMetadataDeletion(
-      AbsoluteTableIdentifier absoluteTableIdentifier, String metadataPath, boolean isForceDelete) {
-    LoadMetadataDetails[] currentDetails = SegmentStatusManager.readLoadMetadata(metadataPath);
-    for (LoadMetadataDetails oneLoad : currentDetails) {
-      if (checkIfLoadCanBeDeletedPhysically(oneLoad, isForceDelete)) {
-        String path = getSegmentPath(absoluteTableIdentifier, oneLoad);
-        boolean status = false;
-        try {
-          if (FileFactory.isFileExist(path, FileFactory.getFileType(path))) {
-            CarbonFile file = FileFactory.getCarbonFile(path, FileFactory.getFileType(path));
-            CarbonFile[] filesToBeDeleted = file.listFiles(new CarbonFileFilter() {
-
-              @Override public boolean accept(CarbonFile file) {
-                return (CarbonTablePath.isCarbonDataFile(file.getName())
-                    || CarbonTablePath.isCarbonIndexFile(file.getName())
-                    || CarbonTablePath.isPartitionMapFile(file.getName()));
-              }
-            });
-
-            //if there are no fact and msr metadata files present then no need to keep
-            //entry in metadata.
-            if (filesToBeDeleted.length == 0) {
-              status = true;
-            } else {
-
-              for (CarbonFile eachFile : filesToBeDeleted) {
-                if (!eachFile.delete()) {
-                  LOGGER.warn("Unable to delete the file as per delete command " + eachFile
-                      .getAbsolutePath());
-                  status = false;
-                } else {
-                  status = true;
-                }
-              }
-            }
-            // need to delete the complete folder.
-            if (status) {
-              if (!file.delete()) {
-                LOGGER.warn(
-                    "Unable to delete the folder as per delete command " + file.getAbsolutePath());
-              }
-            }
-
-          } else {
-            LOGGER.warn("Files are not found in segment " + path
-                + " it seems, files are already being deleted");
-          }
-        } catch (IOException e) {
-          LOGGER.warn("Unable to delete the file as per delete command " + path);
-        }
-      }
-    }
-  }
-
-  private static boolean checkIfLoadCanBeDeleted(LoadMetadataDetails oneLoad,
-      boolean isForceDelete) {
-    if ((SegmentStatus.MARKED_FOR_DELETE == oneLoad.getSegmentStatus() ||
-        SegmentStatus.COMPACTED == oneLoad.getSegmentStatus() ||
-        SegmentStatus.INSERT_IN_PROGRESS == oneLoad.getSegmentStatus() ||
-        SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS == oneLoad.getSegmentStatus())
-        && oneLoad.getVisibility().equalsIgnoreCase("true")) {
-      if (isForceDelete) {
-        return true;
-      }
-      long deletionTime = oneLoad.getModificationOrdeletionTimesStamp();
-
-      return CarbonUpdateUtil.isMaxQueryTimeoutExceeded(deletionTime);
-
-    }
-
-    return false;
-  }
-
-  private static boolean checkIfLoadCanBeDeletedPhysically(LoadMetadataDetails oneLoad,
-      boolean isForceDelete) {
-    if ((SegmentStatus.MARKED_FOR_DELETE == oneLoad.getSegmentStatus() ||
-        SegmentStatus.COMPACTED == oneLoad.getSegmentStatus())) {
-      if (isForceDelete) {
-        return true;
-      }
-      long deletionTime = oneLoad.getModificationOrdeletionTimesStamp();
-
-      return CarbonUpdateUtil.isMaxQueryTimeoutExceeded(deletionTime);
-
-    }
-
-    return false;
-  }
-
-  private static LoadMetadataDetails getCurrentLoadStatusOfSegment(String segmentId,
-      String metadataPath) {
-    LoadMetadataDetails[] currentDetails = SegmentStatusManager.readLoadMetadata(metadataPath);
-    for (LoadMetadataDetails oneLoad : currentDetails) {
-      if (oneLoad.getLoadName().equalsIgnoreCase(segmentId)) {
-        return oneLoad;
-      }
-    }
-    return null;
-  }
-
-  public static boolean deleteLoadFoldersFromFileSystem(
-      AbsoluteTableIdentifier absoluteTableIdentifier, boolean isForceDelete,
-      LoadMetadataDetails[] details, String metadataPath) {
-    boolean isDeleted = false;
-    if (details != null && details.length != 0) {
-      for (LoadMetadataDetails oneLoad : details) {
-        if (checkIfLoadCanBeDeleted(oneLoad, isForceDelete)) {
-          ICarbonLock segmentLock = CarbonLockFactory.getCarbonLockObj(absoluteTableIdentifier,
-              CarbonTablePath.addSegmentPrefix(oneLoad.getLoadName()) + LockUsage.LOCK);
-          try {
-            if (oneLoad.getSegmentStatus() == SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS
-                || oneLoad.getSegmentStatus() == SegmentStatus.INSERT_IN_PROGRESS) {
-              if (segmentLock.lockWithRetries(1, 5)) {
-                LOGGER.info("Info: Acquired segment lock on segment:" + oneLoad.getLoadName());
-                LoadMetadataDetails currentDetails =
-                    getCurrentLoadStatusOfSegment(oneLoad.getLoadName(), metadataPath);
-                if (currentDetails != null && checkIfLoadCanBeDeleted(currentDetails,
-                    isForceDelete)) {
-                  oneLoad.setVisibility("false");
-                  isDeleted = true;
-                  LOGGER.info("Info: Deleted the load " + oneLoad.getLoadName());
-                }
-              } else {
-                LOGGER.info("Info: Load in progress for segment" + oneLoad.getLoadName());
-                return isDeleted;
-              }
-            } else {
-              oneLoad.setVisibility("false");
-              isDeleted = true;
-              LOGGER.info("Info: Deleted the load " + oneLoad.getLoadName());
-            }
-          } finally {
-            segmentLock.unlock();
-            LOGGER.info("Info: Segment lock on segment:" + oneLoad.getLoadName() + " is released");
-          }
-        }
-      }
-    }
-    return isDeleted;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/store/sdk/pom.xml
----------------------------------------------------------------------
diff --git a/store/sdk/pom.xml b/store/sdk/pom.xml
index 6663683..51d2cf9 100644
--- a/store/sdk/pom.xml
+++ b/store/sdk/pom.xml
@@ -21,7 +21,7 @@
   <dependencies>
     <dependency>
       <groupId>org.apache.carbondata</groupId>
-      <artifactId>carbondata-spark-common</artifactId>
+      <artifactId>carbondata-hadoop</artifactId>
       <version>${project.version}</version>
     </dependency>
     <dependency>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
index 51ca09c..e06200a 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
@@ -28,6 +28,7 @@ import java.util.Objects;
 import org.apache.carbondata.common.Strings;
 import org.apache.carbondata.common.annotations.InterfaceAudience;
 import org.apache.carbondata.common.annotations.InterfaceStability;
+import org.apache.carbondata.common.exceptions.sql.InvalidLoadOptionException;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.metadata.CarbonMetadata;
 import org.apache.carbondata.core.metadata.converter.SchemaConverter;
@@ -40,7 +41,7 @@ import org.apache.carbondata.core.metadata.schema.table.TableSchemaBuilder;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.core.writer.ThriftWriter;
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
-import org.apache.carbondata.spark.util.DataLoadingUtil;
+import org.apache.carbondata.processing.loading.model.CarbonLoadModelBuilder;
 
 /**
  * Builder for {@link CarbonWriter}
@@ -94,9 +95,9 @@ public class CarbonWriterBuilder {
   }
 
   /**
-   * Build a {@link CSVCarbonWriter}, which accepts row in CSV format
+   * Build a {@link CarbonWriter}, which accepts rows in CSV format
    */
-  public CarbonWriter buildWriterForCSVInput() throws IOException {
+  public CarbonWriter buildWriterForCSVInput() throws IOException, InvalidLoadOptionException {
     Objects.requireNonNull(schema, "schema should not be null");
     Objects.requireNonNull(path, "path should not be null");
 
@@ -113,7 +114,7 @@ public class CarbonWriterBuilder {
   }
 
   /**
-   * Build a {@link AvroCarbonWriter}, which accepts Avro object
+   * Build a {@link CarbonWriter}, which accepts Avro objects
    * @return
    * @throws IOException
    */
@@ -184,11 +185,13 @@ public class CarbonWriterBuilder {
   /**
    * Build a {@link CarbonLoadModel}
    */
-  private CarbonLoadModel buildLoadModel(CarbonTable table) {
+  private CarbonLoadModel buildLoadModel(CarbonTable table)
+      throws InvalidLoadOptionException, IOException {
     Map<String, String> options = new HashMap<>();
     if (sortColumns != null) {
       options.put("sort_columns", Strings.mkString(sortColumns, ","));
     }
-    return DataLoadingUtil.buildCarbonLoadModelJava(table, options);
+    CarbonLoadModelBuilder builder = new CarbonLoadModelBuilder(table);
+    return builder.build(options);
   }
 }
\ No newline at end of file
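
With this change buildWriterForCSVInput() (via buildLoadModel()) can throw
InvalidLoadOptionException in addition to IOException, so SDK callers need a
wider catch. A sketch follows, assuming a CarbonWriterBuilder already
configured with a schema and output path (its configuration methods are
outside this hunk); the row values are illustrative.

    import org.apache.carbondata.sdk.file.CarbonWriter;
    import org.apache.carbondata.sdk.file.CarbonWriterBuilder;

    public class WriterSketch {
      // Writes one row through the SDK path; 'builder' is assumed to be
      // fully configured before this method is called.
      static void writeRows(CarbonWriterBuilder builder) {
        try {
          CarbonWriter writer = builder.buildWriterForCSVInput();
          writer.write(new String[]{"robot0", "0", "0.0"});
          writer.close();
        } catch (Exception e) {
          // buildWriterForCSVInput() now throws InvalidLoadOptionException in
          // addition to IOException; the test suite above widened its catch
          // from IOException for the same reason
          throw new RuntimeException(e);
        }
      }
    }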

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterSuite.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterSuite.java b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterSuite.java
index 531ec7c..aca2b2d 100644
--- a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterSuite.java
+++ b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterSuite.java
@@ -77,7 +77,7 @@ public class CSVCarbonWriterSuite {
         writer.write(new String[]{"robot" + i, String.valueOf(i), String.valueOf((double) i / 2)});
       }
       writer.close();
-    } catch (IOException e) {
+    } catch (Exception e) {
       e.printStackTrace();
       Assert.fail(e.getMessage());
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/streaming/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala b/streaming/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala
index c417fbe..843a9d1 100644
--- a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala
+++ b/streaming/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala
@@ -34,10 +34,9 @@ import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.events.{OperationContext, OperationListenerBus}
 import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadTablePostExecutionEvent, LoadTablePreExecutionEvent}
-import org.apache.carbondata.processing.loading.model.CarbonLoadModel
+import org.apache.carbondata.processing.loading.model.{CarbonLoadModel, CarbonLoadModelBuilder, LoadOption}
 import org.apache.carbondata.spark.dictionary.provider.SecureDictionaryServiceProvider
 import org.apache.carbondata.spark.dictionary.server.SecureDictionaryServer
-import org.apache.carbondata.spark.util.DataLoadingUtil
 import org.apache.carbondata.streaming.segment.StreamSegment
 
 /**
@@ -208,17 +207,15 @@ object StreamSinkFactory {
       segmentId: String): CarbonLoadModel = {
     val carbonProperty: CarbonProperties = CarbonProperties.getInstance()
     carbonProperty.addProperty("zookeeper.enable.lock", "false")
-    val optionsFinal = DataLoadingUtil.getDataLoadingOptions(carbonProperty, parameters)
+    val optionsFinal = LoadOption.fillOptionWithDefaultValue(parameters.asJava)
     optionsFinal.put("sort_scope", "no_sort")
     if (parameters.get("fileheader").isEmpty) {
       optionsFinal.put("fileheader", carbonTable.getCreateOrderColumn(carbonTable.getTableName)
         .asScala.map(_.getColName).mkString(","))
     }
     val carbonLoadModel = new CarbonLoadModel()
-    DataLoadingUtil.buildCarbonLoadModel(
-      carbonTable,
-      carbonProperty,
-      parameters,
+    new CarbonLoadModelBuilder(carbonTable).build(
+      parameters.asJava,
       optionsFinal,
       carbonLoadModel,
       hadoopConf)
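
The streaming path uses the two-step form of the API: fill defaults first, then
the four-argument build(). The Scala above calls the same Java methods; an
equivalent Java sketch follows, assuming the table, user parameters, and Hadoop
configuration are available from the caller.

    import java.io.IOException;
    import java.util.Map;

    import org.apache.carbondata.common.exceptions.sql.InvalidLoadOptionException;
    import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
    import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
    import org.apache.carbondata.processing.loading.model.CarbonLoadModelBuilder;
    import org.apache.carbondata.processing.loading.model.LoadOption;
    import org.apache.hadoop.conf.Configuration;

    public class StreamingModelSketch {
      // Mirrors StreamSinkFactory: defaults filled first, then the output
      // model populated in place by the four-argument build().
      static CarbonLoadModel forStreaming(CarbonTable table,
          Map<String, String> parameters, Configuration hadoopConf)
          throws IOException, InvalidLoadOptionException {
        Map<String, String> optionsFinal = LoadOption.fillOptionWithDefaultValue(parameters);
        optionsFinal.put("sort_scope", "no_sort");  // streaming ingest is unsorted
        CarbonLoadModel model = new CarbonLoadModel();
        new CarbonLoadModelBuilder(table).build(parameters, optionsFinal, model, hadoopConf);
        return model;
      }
    }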


[2/3] carbondata git commit: [CARBONDATA-2159] Remove carbon-spark dependency in store-sdk module

Posted by ja...@apache.org.
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala
deleted file mode 100644
index 9c5ab69..0000000
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala
+++ /dev/null
@@ -1,451 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.spark.util
-
-import scala.collection.{immutable, mutable}
-import scala.collection.JavaConverters._
-
-import org.apache.commons.lang3.StringUtils
-import org.apache.hadoop.conf.Configuration
-import org.apache.spark.sql.util.CarbonException
-
-import org.apache.carbondata.common.constants.LoggerAction
-import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
-import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants}
-import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, LockUsage}
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager}
-import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
-import org.apache.carbondata.processing.loading.constants.DataLoadProcessorConstants
-import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
-import org.apache.carbondata.processing.util.{CarbonLoaderUtil, DeleteLoadFolders, TableOptionConstant}
-import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
-import org.apache.carbondata.spark.load.ValidateUtil
-
-/**
- * the util object of data loading
- */
-object DataLoadingUtil {
-
-  val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
-
-  /**
-   * get data loading options and initialise default value
-   */
-  def getDataLoadingOptions(
-      carbonProperty: CarbonProperties,
-      options: immutable.Map[String, String]): mutable.Map[String, String] = {
-    val optionsFinal = scala.collection.mutable.Map[String, String]()
-    optionsFinal.put("delimiter", options.getOrElse("delimiter", ","))
-    optionsFinal.put("quotechar", options.getOrElse("quotechar", "\""))
-    optionsFinal.put("fileheader", options.getOrElse("fileheader", ""))
-    optionsFinal.put("commentchar", options.getOrElse("commentchar", "#"))
-    optionsFinal.put("columndict", options.getOrElse("columndict", null))
-
-    optionsFinal.put("escapechar",
-      CarbonLoaderUtil.getEscapeChar(options.getOrElse("escapechar", "\\")))
-
-    optionsFinal.put(
-      "serialization_null_format",
-      options.getOrElse("serialization_null_format", "\\N"))
-
-    optionsFinal.put(
-      "bad_records_logger_enable",
-      options.getOrElse(
-        "bad_records_logger_enable",
-        carbonProperty.getProperty(
-          CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE,
-          CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE_DEFAULT)))
-
-    val badRecordActionValue = carbonProperty.getProperty(
-      CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION,
-      CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT)
-
-    optionsFinal.put(
-      "bad_records_action",
-      options.getOrElse(
-        "bad_records_action",
-        carbonProperty.getProperty(
-          CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_ACTION,
-          badRecordActionValue)))
-
-    optionsFinal.put(
-      "is_empty_data_bad_record",
-      options.getOrElse(
-        "is_empty_data_bad_record",
-        carbonProperty.getProperty(
-          CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD,
-          CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD_DEFAULT)))
-
-    optionsFinal.put(
-      "skip_empty_line",
-      options.getOrElse(
-        "skip_empty_line",
-        carbonProperty.getProperty(
-          CarbonLoadOptionConstants.CARBON_OPTIONS_SKIP_EMPTY_LINE)))
-
-    optionsFinal.put("all_dictionary_path", options.getOrElse("all_dictionary_path", ""))
-
-    optionsFinal.put(
-      "complex_delimiter_level_1",
-      options.getOrElse("complex_delimiter_level_1", "\\$"))
-
-    optionsFinal.put(
-      "complex_delimiter_level_2",
-      options.getOrElse("complex_delimiter_level_2", "\\:"))
-
-    optionsFinal.put(
-      "dateformat",
-      options.getOrElse(
-        "dateformat",
-        carbonProperty.getProperty(
-          CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT,
-          CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT_DEFAULT)))
-
-    optionsFinal.put(
-      "timestampformat",
-      options.getOrElse(
-        "timestampformat",
-        carbonProperty.getProperty(
-          CarbonLoadOptionConstants.CARBON_OPTIONS_TIMESTAMPFORMAT,
-          CarbonLoadOptionConstants.CARBON_OPTIONS_TIMESTAMPFORMAT_DEFAULT)))
-
-    optionsFinal.put(
-      "global_sort_partitions",
-      options.getOrElse(
-        "global_sort_partitions",
-        carbonProperty.getProperty(
-          CarbonLoadOptionConstants.CARBON_OPTIONS_GLOBAL_SORT_PARTITIONS,
-          null)))
-
-    optionsFinal.put("maxcolumns", options.getOrElse("maxcolumns", null))
-
-    optionsFinal.put(
-      "batch_sort_size_inmb",
-      options.getOrElse(
-        "batch_sort_size_inmb",
-        carbonProperty.getProperty(
-          CarbonLoadOptionConstants.CARBON_OPTIONS_BATCH_SORT_SIZE_INMB,
-          carbonProperty.getProperty(
-            CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB,
-            CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB_DEFAULT))))
-
-    optionsFinal.put(
-      "bad_record_path",
-      options.getOrElse(
-        "bad_record_path",
-        carbonProperty.getProperty(
-          CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH,
-          carbonProperty.getProperty(
-            CarbonCommonConstants.CARBON_BADRECORDS_LOC,
-            CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL))))
-
-    val useOnePass = options.getOrElse(
-      "single_pass",
-      carbonProperty.getProperty(
-        CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS,
-        CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS_DEFAULT)).trim.toLowerCase match {
-      case "true" =>
-        true
-      case "false" =>
-        // when single_pass = false  and if either alldictionarypath
-        // or columnDict is configured the do not allow load
-        if (StringUtils.isNotEmpty(optionsFinal("all_dictionary_path")) ||
-            StringUtils.isNotEmpty(optionsFinal("columndict"))) {
-          throw new MalformedCarbonCommandException(
-            "Can not use all_dictionary_path or columndict without single_pass.")
-        } else {
-          false
-        }
-      case illegal =>
-        val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
-        LOGGER.error(s"Can't use single_pass, because illegal syntax found: [$illegal] " +
-                     "Please set it as 'true' or 'false'")
-        false
-    }
-    optionsFinal.put("single_pass", useOnePass.toString)
-    optionsFinal
-  }
-
-  /**
-   * check whether using default value or not
-   */
-  private def checkDefaultValue(value: String, default: String) = {
-    if (StringUtils.isEmpty(value)) {
-      default
-    } else {
-      value
-    }
-  }
-
-  /**
-   * build CarbonLoadModel for data loading
-   * @param table CarbonTable object containing all metadata information for the table
-   *              like table name, table path, schema, etc
-   * @param options Load options from user input
-   * @return a new CarbonLoadModel instance
-   */
-  def buildCarbonLoadModelJava(
-      table: CarbonTable,
-      options: java.util.Map[String, String]
-  ): CarbonLoadModel = {
-    val carbonProperty: CarbonProperties = CarbonProperties.getInstance
-    val optionsFinal = getDataLoadingOptions(carbonProperty, options.asScala.toMap)
-    optionsFinal.put("sort_scope", "no_sort")
-    if (!options.containsKey("fileheader")) {
-      val csvHeader = table.getCreateOrderColumn(table.getTableName)
-        .asScala.map(_.getColName).mkString(",")
-      optionsFinal.put("fileheader", csvHeader)
-    }
-    val model = new CarbonLoadModel()
-    buildCarbonLoadModel(
-      table = table,
-      carbonProperty = carbonProperty,
-      options = options.asScala.toMap,
-      optionsFinal = optionsFinal,
-      carbonLoadModel = model,
-      hadoopConf = null)  // we have provided 'fileheader', so it can be null
-
-    // set default values
-    model.setTimestampformat(CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
-    model.setDateFormat(CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT)
-    model.setUseOnePass(options.asScala.getOrElse("onepass", "false").toBoolean)
-    model.setDictionaryServerHost(options.asScala.getOrElse("dicthost", null))
-    model.setDictionaryServerPort(options.asScala.getOrElse("dictport", "-1").toInt)
-    model
-  }
-
-  /**
-   * build CarbonLoadModel for data loading
-   * @param table CarbonTable object containing all metadata information for the table
-   *              like table name, table path, schema, etc
-   * @param carbonProperty Carbon property instance
-   * @param options Load options from user input
-   * @param optionsFinal Load options populated with default values for unspecified options
-   * @param carbonLoadModel The output load model
-   * @param hadoopConf hadoopConf is needed to read the CSV header when 'fileheader' is not set
-   *                   in the user-provided load options
-   */
-  def buildCarbonLoadModel(
-      table: CarbonTable,
-      carbonProperty: CarbonProperties,
-      options: immutable.Map[String, String],
-      optionsFinal: mutable.Map[String, String],
-      carbonLoadModel: CarbonLoadModel,
-      hadoopConf: Configuration): Unit = {
-    carbonLoadModel.setTableName(table.getTableName)
-    carbonLoadModel.setDatabaseName(table.getDatabaseName)
-    carbonLoadModel.setTablePath(table.getTablePath)
-    carbonLoadModel.setTableName(table.getTableName)
-    val dataLoadSchema = new CarbonDataLoadSchema(table)
-    // Need to fill dimension relation
-    carbonLoadModel.setCarbonDataLoadSchema(dataLoadSchema)
-    val sort_scope = optionsFinal("sort_scope")
-    val single_pass = optionsFinal("single_pass")
-    val bad_records_logger_enable = optionsFinal("bad_records_logger_enable")
-    val bad_records_action = optionsFinal("bad_records_action")
-    var bad_record_path = optionsFinal("bad_record_path")
-    val global_sort_partitions = optionsFinal("global_sort_partitions")
-    val timestampformat = optionsFinal("timestampformat")
-    val dateFormat = optionsFinal("dateformat")
-    val delimeter = optionsFinal("delimiter")
-    val complex_delimeter_level1 = optionsFinal("complex_delimiter_level_1")
-    val complex_delimeter_level2 = optionsFinal("complex_delimiter_level_2")
-    val all_dictionary_path = optionsFinal("all_dictionary_path")
-    val column_dict = optionsFinal("columndict")
-    ValidateUtil.validateDateTimeFormat(timestampformat, "TimestampFormat")
-    ValidateUtil.validateDateTimeFormat(dateFormat, "DateFormat")
-    ValidateUtil.validateSortScope(table, sort_scope)
-
-    if (bad_records_logger_enable.toBoolean ||
-        LoggerAction.REDIRECT.name().equalsIgnoreCase(bad_records_action)) {
-      bad_record_path = CarbonUtil.checkAndAppendHDFSUrl(bad_record_path)
-      if (!CarbonUtil.isValidBadStorePath(bad_record_path)) {
-        CarbonException.analysisException("Invalid bad records location.")
-      }
-    }
-    carbonLoadModel.setBadRecordsLocation(bad_record_path)
-
-    ValidateUtil.validateGlobalSortPartitions(global_sort_partitions)
-    carbonLoadModel.setEscapeChar(checkDefaultValue(optionsFinal("escapechar"), "\\"))
-    carbonLoadModel.setQuoteChar(checkDefaultValue(optionsFinal("quotechar"), "\""))
-    carbonLoadModel.setCommentChar(checkDefaultValue(optionsFinal("commentchar"), "#"))
-
-    // if the csv file has no file header and the load sql does not provide the FILEHEADER
-    // option, we should use the table schema to generate the file header.
-    var fileHeader = optionsFinal("fileheader")
-    val headerOption = options.get("header")
-    if (headerOption.isDefined) {
-      // whether the csv file has a file header;
-      // the default value is true
-      val header = try {
-        headerOption.get.toBoolean
-      } catch {
-        case ex: IllegalArgumentException =>
-          throw new MalformedCarbonCommandException(
-            "'header' option should be either 'true' or 'false'. " + ex.getMessage)
-      }
-      if (header) {
-        if (fileHeader.nonEmpty) {
-          throw new MalformedCarbonCommandException(
-            "When 'header' option is true, 'fileheader' option is not required.")
-        }
-      } else {
-        if (fileHeader.isEmpty) {
-          fileHeader = table.getCreateOrderColumn(table.getTableName)
-            .asScala.map(_.getColName).mkString(",")
-        }
-      }
-    }
-
-    carbonLoadModel.setTimestampformat(timestampformat)
-    carbonLoadModel.setDateFormat(dateFormat)
-    carbonLoadModel.setDefaultTimestampFormat(carbonProperty.getProperty(
-      CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
-      CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT))
-
-    carbonLoadModel.setDefaultDateFormat(carbonProperty.getProperty(
-      CarbonCommonConstants.CARBON_DATE_FORMAT,
-      CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT))
-
-    carbonLoadModel.setSerializationNullFormat(
-        TableOptionConstant.SERIALIZATION_NULL_FORMAT.getName + "," +
-        optionsFinal("serialization_null_format"))
-
-    carbonLoadModel.setBadRecordsLoggerEnable(
-        TableOptionConstant.BAD_RECORDS_LOGGER_ENABLE.getName + "," + bad_records_logger_enable)
-
-    carbonLoadModel.setBadRecordsAction(
-        TableOptionConstant.BAD_RECORDS_ACTION.getName + "," + bad_records_action.toUpperCase)
-
-    carbonLoadModel.setIsEmptyDataBadRecord(
-        DataLoadProcessorConstants.IS_EMPTY_DATA_BAD_RECORD + "," +
-        optionsFinal("is_empty_data_bad_record"))
-
-    carbonLoadModel.setSkipEmptyLine(optionsFinal("skip_empty_line"))
-
-    carbonLoadModel.setSortScope(sort_scope)
-    carbonLoadModel.setBatchSortSizeInMb(optionsFinal("batch_sort_size_inmb"))
-    carbonLoadModel.setGlobalSortPartitions(global_sort_partitions)
-    carbonLoadModel.setUseOnePass(single_pass.toBoolean)
-
-    if (delimeter.equalsIgnoreCase(complex_delimeter_level1) ||
-        complex_delimeter_level1.equalsIgnoreCase(complex_delimeter_level2) ||
-        delimeter.equalsIgnoreCase(complex_delimeter_level2)) {
-      CarbonException.analysisException(s"Field Delimiter and Complex types delimiter are same")
-    } else {
-      carbonLoadModel.setComplexDelimiterLevel1(
-        CarbonUtil.delimiterConverter(complex_delimeter_level1))
-      carbonLoadModel.setComplexDelimiterLevel2(
-        CarbonUtil.delimiterConverter(complex_delimeter_level2))
-    }
-    // set dictionary file paths and csv parsing options
-    carbonLoadModel.setAllDictPath(all_dictionary_path)
-    carbonLoadModel.setCsvDelimiter(CarbonUtil.unescapeChar(delimeter))
-    carbonLoadModel.setCsvHeader(fileHeader)
-    carbonLoadModel.setColDictFilePath(column_dict)
-    carbonLoadModel.setCsvHeaderColumns(
-      CommonUtil.getCsvHeaderColumns(carbonLoadModel, hadoopConf))
-
-    val validatedMaxColumns = CommonUtil.validateMaxColumns(
-      carbonLoadModel.getCsvHeaderColumns,
-      optionsFinal("maxcolumns"))
-
-    carbonLoadModel.setMaxColumns(validatedMaxColumns.toString)
-    if (null == carbonLoadModel.getLoadMetadataDetails) {
-      CommonUtil.readLoadMetadataDetails(carbonLoadModel)
-    }
-  }
-
-  private def isLoadDeletionRequired(metaDataLocation: String): Boolean = {
-    val details = SegmentStatusManager.readLoadMetadata(metaDataLocation)
-    if (details != null && details.nonEmpty) for (oneRow <- details) {
-      if ((SegmentStatus.MARKED_FOR_DELETE == oneRow.getSegmentStatus ||
-           SegmentStatus.COMPACTED == oneRow.getSegmentStatus ||
-           SegmentStatus.INSERT_IN_PROGRESS == oneRow.getSegmentStatus ||
-           SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS == oneRow.getSegmentStatus) &&
-          oneRow.getVisibility.equalsIgnoreCase("true")) {
-        return true
-      }
-    }
-    false
-  }
-
-  def deleteLoadsAndUpdateMetadata(
-      isForceDeletion: Boolean,
-      carbonTable: CarbonTable): Unit = {
-    if (isLoadDeletionRequired(carbonTable.getMetadataPath)) {
-      val details = SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath)
-      val absoluteTableIdentifier = carbonTable.getAbsoluteTableIdentifier
-      val carbonTableStatusLock =
-        CarbonLockFactory.getCarbonLockObj(
-          absoluteTableIdentifier,
-          LockUsage.TABLE_STATUS_LOCK
-        )
-
-      // Delete marked loads
-      val isUpdationRequired =
-        DeleteLoadFolders.deleteLoadFoldersFromFileSystem(
-          absoluteTableIdentifier,
-          isForceDeletion,
-          details,
-          carbonTable.getMetadataPath
-        )
-
-      var updationCompletionStaus = false
-
-      if (isUpdationRequired) {
-        try {
-          // Update the load metadata file after cleaning deleted nodes
-          if (carbonTableStatusLock.lockWithRetries()) {
-            LOGGER.info("Table status lock has been successfully acquired.")
-
-            // read latest table status again.
-            val latestMetadata = SegmentStatusManager
-              .readLoadMetadata(carbonTable.getMetadataPath)
-
-            // update the metadata details from old to new status.
-            val latestStatus = CarbonLoaderUtil
-              .updateLoadMetadataFromOldToNew(details, latestMetadata)
-
-            CarbonLoaderUtil.writeLoadMetadata(absoluteTableIdentifier, latestStatus)
-          } else {
-            val dbName = absoluteTableIdentifier.getCarbonTableIdentifier.getDatabaseName
-            val tableName = absoluteTableIdentifier.getCarbonTableIdentifier.getTableName
-            val errorMsg = "Clean files request is failed for " +
-                           s"$dbName.$tableName" +
-                           ". Not able to acquire the table status lock due to other operation " +
-                           "running in the background."
-            LOGGER.audit(errorMsg)
-            LOGGER.error(errorMsg)
-            throw new Exception(errorMsg + " Please try after some time.")
-          }
-          updationCompletionStaus = true
-        } finally {
-          CarbonLockUtil.fileUnlock(carbonTableStatusLock, LockUsage.TABLE_STATUS_LOCK)
-        }
-        if (updationCompletionStaus) {
-          DeleteLoadFolders
-            .physicalFactAndMeasureMetadataDeletion(absoluteTableIdentifier,
-              carbonTable.getMetadataPath, isForceDeletion)
-        }
-      }
-    }
-  }
-
-}
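
The DataLoadingUtil object removed above is superseded by the new CarbonLoadModelBuilder
and LoadOption classes in the processing module (see the call-site updates below). A
minimal sketch of the replacement sequence, assuming a resolved CarbonTable named `table`
and a Hadoop Configuration named `hadoopConf` are already in scope:

    import scala.collection.JavaConverters._

    import org.apache.carbondata.processing.loading.model.{CarbonLoadModel, CarbonLoadModelBuilder, LoadOption}

    // user-supplied load options, using the same keys handled above
    val options = Map("delimiter" -> ",", "fileheader" -> "id,name")
    // fill every unspecified option with its default value
    val optionsFinal = LoadOption.fillOptionWithDefaultValue(options.asJava)
    // populate the model from the table metadata and the merged options
    val model = new CarbonLoadModel()
    new CarbonLoadModelBuilder(table).build(options.asJava, optionsFinal, model, hadoopConf)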

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
index bbf345c..bcb7bd9 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
@@ -321,7 +321,7 @@ object GlobalDictionaryUtil {
       carbonLoadModel.getSerializationNullFormat.split(CarbonCommonConstants.COMMA, 2)(1)
     // get load count
     if (null == carbonLoadModel.getLoadMetadataDetails) {
-      CommonUtil.readLoadMetadataDetails(carbonLoadModel)
+      carbonLoadModel.readAndSetLoadMetadataDetails()
     }
     val absoluteTableIdentifier = AbsoluteTableIdentifier.from(carbonLoadModel.getTablePath, table)
     DictionaryLoadModel(
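
This pattern recurs throughout the commit: a load model now reads its own segment
metadata instead of delegating to CommonUtil. The guarded call, as used at each
updated call site:

    // lazily populate the model's segment metadata when not yet set
    if (null == carbonLoadModel.getLoadMetadataDetails) {
      carbonLoadModel.readAndSetLoadMetadataDetails()
    }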

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
index 3da603b..2dcff81 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
@@ -34,6 +34,7 @@ import org.apache.spark.sql.util.CarbonException
 import org.apache.spark.util.PartitionUtils
 
 import org.apache.carbondata.common.constants.LoggerAction
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.metadata.datatype.DataTypes
@@ -42,7 +43,6 @@ import org.apache.carbondata.core.metadata.schema.partition.PartitionType
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
 import org.apache.carbondata.processing.util.CarbonLoaderUtil
-import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 import org.apache.carbondata.spark.util.{CommonUtil, DataTypeConverterUtil}
 
 /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index f47c9bc..c7dd553 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -163,7 +163,7 @@ object CarbonDataRDDFactory {
     if (compactionModel.compactionType != CompactionType.IUD_UPDDEL_DELTA) {
       // update the updated table status. For the case of Update Delta Compaction the Metadata
       // is filled in LoadModel, no need to refresh.
-      CommonUtil.readLoadMetadataDetails(carbonLoadModel)
+      carbonLoadModel.readAndSetLoadMetadataDetails()
     }
 
     val compactionThread = new Thread {
@@ -282,7 +282,7 @@ object CarbonDataRDDFactory {
     loadModel.setTableName(table.getCarbonTableIdentifier.getTableName)
     loadModel.setDatabaseName(table.getCarbonTableIdentifier.getDatabaseName)
     loadModel.setTablePath(table.getTablePath)
-    CommonUtil.readLoadMetadataDetails(loadModel)
+    loadModel.readAndSetLoadMetadataDetails()
     val loadStartTime = CarbonUpdateUtil.readCurrentTime()
     loadModel.setFactTimeStamp(loadStartTime)
     loadModel

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
index ddc8586..1e19111 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
@@ -77,7 +77,7 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel,
       }
 
       // scan again and determine if anything is there to merge again.
-      CommonUtil.readLoadMetadataDetails(carbonLoadModel)
+      carbonLoadModel.readAndSetLoadMetadataDetails()
       segList = carbonLoadModel.getLoadMetadataDetails
       // in case of major compaction we will scan only once and come out as it will keep
       // on doing major for the new loads also.

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
index e61b636..5dcca6d 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
@@ -35,13 +35,13 @@ import org.apache.spark.sql.streaming.OutputMode
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.util.CarbonException
 
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
 import org.apache.carbondata.core.metadata.schema.SchemaEvolutionEntry
 import org.apache.carbondata.core.metadata.schema.table.TableInfo
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
 import org.apache.carbondata.spark.CarbonOption
-import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 import org.apache.carbondata.streaming.{CarbonStreamException, StreamSinkFactory}
 
 /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
index c4d32b4..e93ab25 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
@@ -22,9 +22,9 @@ import org.apache.spark.sql.execution.command._
 import org.apache.spark.sql.execution.command.preaaggregate.CreatePreAggregateTableCommand
 import org.apache.spark.sql.execution.command.timeseries.TimeSeriesUtil
 
+import org.apache.carbondata.common.exceptions.sql.{MalformedCarbonCommandException, MalformedDataMapCommandException}
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.metadata.schema.datamap.DataMapProvider._
-import org.apache.carbondata.spark.exception.{MalformedCarbonCommandException, MalformedDataMapCommandException}
 
 /**
  * Below command class will be used to create datamap on table

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
index 1fa2494..e5db286 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
@@ -27,6 +27,7 @@ import org.apache.spark.sql.execution.command.AtomicRunnableCommand
 import org.apache.spark.sql.execution.command.preaaggregate.PreAggregateUtil
 import org.apache.spark.sql.execution.command.table.CarbonDropTableCommand
 
+import org.apache.carbondata.common.exceptions.sql.{MalformedCarbonCommandException, NoSuchDataMapException}
 import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
 import org.apache.carbondata.core.datamap.DataMapStoreManager
 import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, LockUsage}
@@ -34,7 +35,6 @@ import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
 import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.events._
-import org.apache.carbondata.spark.exception.{MalformedCarbonCommandException, NoSuchDataMapException}
 
 /**
  * Drops the datamap and any related tables associated with the datamap

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
index febb83e..18c1339 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
@@ -24,12 +24,13 @@ import scala.collection.JavaConverters._
 import org.apache.spark.sql.{CarbonEnv, Row, SparkSession, SQLContext}
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.execution.command.{AlterTableModel, AtomicRunnableCommand, CarbonMergerMapping, CompactionModel, DataCommand}
+import org.apache.spark.sql.execution.command.{AlterTableModel, AtomicRunnableCommand, CarbonMergerMapping, CompactionModel}
 import org.apache.spark.sql.hive.{CarbonRelation, CarbonSessionCatalog}
 import org.apache.spark.sql.optimizer.CarbonFilters
 import org.apache.spark.sql.util.CarbonException
 import org.apache.spark.util.AlterTableUtil
 
+import org.apache.carbondata.common.exceptions.ConcurrentOperationException
 import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.impl.FileFactory
@@ -44,7 +45,6 @@ import org.apache.carbondata.events.{AlterTableCompactionPostEvent, AlterTableCo
 import org.apache.carbondata.processing.loading.events.LoadEvents.LoadMetadataEvent
 import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
 import org.apache.carbondata.processing.merger.{CarbonDataMergerUtil, CompactionType}
-import org.apache.carbondata.spark.exception.ConcurrentOperationException
 import org.apache.carbondata.spark.rdd.CarbonDataRDDFactory
 import org.apache.carbondata.spark.util.CommonUtil
 import org.apache.carbondata.streaming.StreamHandoffRDD
@@ -89,8 +89,6 @@ case class CarbonAlterTableCompactionCommand(
   override def processData(sparkSession: SparkSession): Seq[Row] = {
     val LOGGER: LogService =
       LogServiceFactory.getLogService(this.getClass.getName)
-    val tableName = alterTableModel.tableName.toLowerCase
-    val databaseName = alterTableModel.dbName.getOrElse(sparkSession.catalog.currentDatabase)
     val isLoadInProgress = SegmentStatusManager.checkIfAnyLoadInProgressForTable(table)
     if (isLoadInProgress) {
       val message = "Cannot run data loading and compaction on same table concurrently. " +
@@ -146,8 +144,7 @@ case class CarbonAlterTableCompactionCommand(
       operationContext: OperationContext): Unit = {
     val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getName)
     val compactionType = CompactionType.valueOf(alterTableModel.compactionType.toUpperCase)
-    val compactionSize: Long = CarbonDataMergerUtil
-      .getCompactionSize(compactionType, carbonLoadModel)
+    val compactionSize = CarbonDataMergerUtil.getCompactionSize(compactionType, carbonLoadModel)
     if (CompactionType.IUD_UPDDEL_DELTA == compactionType) {
       if (alterTableModel.segmentUpdateStatusManager.isDefined) {
         carbonLoadModel.setSegmentUpdateStatusManager(
@@ -162,7 +159,7 @@ case class CarbonAlterTableCompactionCommand(
     val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
 
     if (null == carbonLoadModel.getLoadMetadataDetails) {
-      CommonUtil.readLoadMetadataDetails(carbonLoadModel)
+      carbonLoadModel.readAndSetLoadMetadataDetails()
     }
 
     if (compactionType == CompactionType.STREAMING) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
index c7b59d4..f105778 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
@@ -47,6 +47,7 @@ import org.apache.spark.sql.types.{StringType, StructField, StructType}
 import org.apache.spark.unsafe.types.UTF8String
 import org.apache.spark.util.{CarbonReflectionUtils, CausedBy, FileUtils}
 
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
 import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants}
 import org.apache.carbondata.core.datamap.DataMapStoreManager
@@ -68,13 +69,12 @@ import org.apache.carbondata.processing.loading.TableProcessingOperations
 import org.apache.carbondata.processing.loading.csvinput.{CSVInputFormat, StringArrayWritable}
 import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadMetadataEvent, LoadTablePostExecutionEvent, LoadTablePreExecutionEvent}
 import org.apache.carbondata.processing.loading.exception.NoRetryException
-import org.apache.carbondata.processing.loading.model.CarbonLoadModel
+import org.apache.carbondata.processing.loading.model.{CarbonLoadModel, CarbonLoadModelBuilder, LoadOption}
 import org.apache.carbondata.processing.util.CarbonLoaderUtil
 import org.apache.carbondata.spark.dictionary.provider.SecureDictionaryServiceProvider
 import org.apache.carbondata.spark.dictionary.server.SecureDictionaryServer
-import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 import org.apache.carbondata.spark.rdd.{CarbonDataRDDFactory, CarbonDropPartitionCommitRDD, CarbonDropPartitionRDD}
-import org.apache.carbondata.spark.util.{CarbonScalaUtil, CommonUtil, DataLoadingUtil, GlobalDictionaryUtil}
+import org.apache.carbondata.spark.util.{CarbonScalaUtil, CommonUtil, GlobalDictionaryUtil}
 
 case class CarbonLoadDataCommand(
     databaseNameOp: Option[String],
@@ -148,7 +148,7 @@ case class CarbonLoadDataCommand(
     val carbonLoadModel = new CarbonLoadModel()
     try {
       val tableProperties = table.getTableInfo.getFactTable.getTableProperties
-      val optionsFinal = DataLoadingUtil.getDataLoadingOptions(carbonProperty, options)
+      val optionsFinal = LoadOption.fillOptionWithDefaultValue(options.asJava)
       optionsFinal.put("sort_scope", tableProperties.asScala.getOrElse("sort_scope",
         carbonProperty.getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE,
           carbonProperty.getProperty(CarbonCommonConstants.LOAD_SORT_SCOPE,
@@ -163,10 +163,8 @@ case class CarbonLoadDataCommand(
       carbonLoadModel.setAggLoadRequest(
         internalOptions.getOrElse(CarbonCommonConstants.IS_INTERNAL_LOAD_CALL, "false").toBoolean)
       carbonLoadModel.setSegmentId(internalOptions.getOrElse("mergedSegmentName", ""))
-      DataLoadingUtil.buildCarbonLoadModel(
-        table,
-        carbonProperty,
-        options,
+      new CarbonLoadModelBuilder(table).build(
+        options.asJava,
         optionsFinal,
         carbonLoadModel,
         hadoopConf
@@ -183,7 +181,7 @@ case class CarbonLoadDataCommand(
             carbonLoadModel,
             factPath,
             dataFrame.isDefined,
-            optionsFinal.asJava,
+            optionsFinal,
             options.asJava,
             isOverwriteTable)
         operationContext.setProperty("isOverwrite", isOverwriteTable)
@@ -191,7 +189,7 @@ case class CarbonLoadDataCommand(
         // First system has to partition the data first and then call the load data
         LOGGER.info(s"Initiating Direct Load for the Table : ($dbName.$tableName)")
         // Clean up the old invalid segment data before creating a new entry for new load.
-        DataLoadingUtil.deleteLoadsAndUpdateMetadata(isForceDeletion = false, table)
+        SegmentStatusManager.deleteLoadsAndUpdateMetadata(table, false)
         // add the start entry for the new load in the table status file
         if (updateModel.isEmpty && !table.isHivePartitionTable) {
           CarbonLoaderUtil.readAndUpdateLoadProgressInTableMeta(carbonLoadModel, isOverwriteTable)
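
Stale-segment cleanup has likewise moved from DataLoadingUtil into core, next to the
rest of the segment status handling. A sketch of the relocated call; judging from the
old signature, the boolean argument is isForceDeletion:

    import org.apache.carbondata.core.statusmanager.SegmentStatusManager

    // remove invalid segments before registering the new load entry;
    // false means no force deletion, matching the old isForceDeletion = false call sites
    SegmentStatusManager.deleteLoadsAndUpdateMetadata(table, false)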

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
index 874d416..50c5eca 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
@@ -22,13 +22,13 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.command._
 import org.apache.spark.sql.execution.datasources.LogicalRelation
 
+import org.apache.carbondata.common.exceptions.ConcurrentOperationException
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, LockUsage}
 import org.apache.carbondata.core.mutate.CarbonUpdateUtil
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager
 import org.apache.carbondata.events.{DeleteFromTablePostEvent, DeleteFromTablePreEvent, OperationContext, OperationListenerBus}
 import org.apache.carbondata.processing.loading.FailureCauses
-import org.apache.carbondata.spark.exception.ConcurrentOperationException
 
 /**
  * IUD update delete and compaction framework.

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
index 2f12bef..bbea15b 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
@@ -24,6 +24,7 @@ import org.apache.spark.sql.execution.command.management.CarbonLoadDataCommand
 import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.storage.StorageLevel
 
+import org.apache.carbondata.common.exceptions.ConcurrentOperationException
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, LockUsage}
@@ -32,7 +33,6 @@ import org.apache.carbondata.core.statusmanager.SegmentStatusManager
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.events.{OperationContext, OperationListenerBus, UpdateTablePostEvent, UpdateTablePreEvent}
 import org.apache.carbondata.processing.loading.FailureCauses
-import org.apache.carbondata.spark.exception.ConcurrentOperationException
 
 private[sql] case class CarbonProjectForUpdateCommand(
     plan: LogicalPlan,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/IUDCommonUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/IUDCommonUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/IUDCommonUtil.scala
index 5817d88..220d75d 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/IUDCommonUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/IUDCommonUtil.scala
@@ -23,9 +23,9 @@ import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan}
 import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.hive.HiveSessionCatalog
 
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 
 /**
  * Util for IUD common function

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
index fed4235..bf72325 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
@@ -33,7 +33,6 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.metadata.schema.table.AggregationDataMapSchema
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager}
-import org.apache.carbondata.spark.util.DataLoadingUtil
 
 /**
  * Below command class will be used to create pre-aggregate table
@@ -182,7 +181,7 @@ case class CreatePreAggregateTableCommand(
     // This will be used to check if the parent table has any segments or not. If not then no
     // need to fire load for pre-aggregate table. Therefore reading the load details for PARENT
     // table.
-    DataLoadingUtil.deleteLoadsAndUpdateMetadata(isForceDeletion = false, parentTable)
+    SegmentStatusManager.deleteLoadsAndUpdateMetadata(parentTable, false)
     val loadAvailable = SegmentStatusManager.readLoadMetadata(parentTable.getMetadataPath)
     if (loadAvailable.exists(load => load.getSegmentStatus == SegmentStatus.INSERT_IN_PROGRESS ||
       load.getSegmentStatus == SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS)) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
index feef7a1..7e3b80e 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
@@ -34,6 +34,7 @@ import org.apache.spark.sql.hive.CarbonRelation
 import org.apache.spark.sql.parser.CarbonSpark2SqlParser
 import org.apache.spark.sql.types.DataType
 
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, LockUsage}
@@ -42,7 +43,6 @@ import org.apache.carbondata.core.metadata.schema.table.{AggregationDataMapSchem
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
 import org.apache.carbondata.core.util.CarbonUtil
 import org.apache.carbondata.format.TableInfo
-import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 import org.apache.carbondata.spark.util.CommonUtil
 
 /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
index 7a56dbf..fc780cb 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
@@ -25,6 +25,7 @@ import org.apache.spark.sql.execution.command.{AlterTableRenameModel, MetadataCo
 import org.apache.spark.sql.hive.{CarbonRelation, CarbonSessionCatalog}
 import org.apache.spark.util.AlterTableUtil
 
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datamap.DataMapStoreManager
@@ -37,7 +38,6 @@ import org.apache.carbondata.core.util.CarbonUtil
 import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.events.{AlterTableRenamePostEvent, AlterTableRenamePreEvent, OperationContext, OperationListenerBus}
 import org.apache.carbondata.format.SchemaEvolutionEntry
-import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 
 private[sql] case class CarbonAlterTableRenameCommand(
     alterTableRenameModel: AlterTableRenameModel)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeSeriesUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeSeriesUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeSeriesUtil.scala
index 987d4fe..9e0cee5 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeSeriesUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeSeriesUtil.scala
@@ -18,12 +18,12 @@ package org.apache.spark.sql.execution.command.timeseries
 
 import org.apache.spark.sql.execution.command.{DataMapField, Field}
 
+import org.apache.carbondata.common.exceptions.sql.{MalformedCarbonCommandException, MalformedDataMapCommandException}
 import org.apache.carbondata.core.metadata.datatype.DataTypes
 import org.apache.carbondata.core.metadata.schema.datamap.DataMapProvider.TIMESERIES
 import org.apache.carbondata.core.metadata.schema.datamap.Granularity
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.preagg.TimeSeriesUDF
-import org.apache.carbondata.spark.exception.{MalformedCarbonCommandException, MalformedDataMapCommandException}
 
 /**
  * Utility class for time series to keep

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala
index 945f47f..072216c 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala
@@ -46,8 +46,8 @@ import org.apache.carbondata.hadoop.api.{CarbonOutputCommitter, CarbonTableOutpu
 import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat.CarbonRecordWriter
 import org.apache.carbondata.hadoop.util.ObjectSerializationUtil
 import org.apache.carbondata.processing.loading.csvinput.StringArrayWritable
-import org.apache.carbondata.processing.loading.model.CarbonLoadModel
-import org.apache.carbondata.spark.util.{CarbonScalaUtil, DataLoadingUtil, Util}
+import org.apache.carbondata.processing.loading.model.{CarbonLoadModel, CarbonLoadModelBuilder, LoadOption}
+import org.apache.carbondata.spark.util.{CarbonScalaUtil, Util}
 
 class CarbonFileFormat
   extends FileFormat
@@ -82,7 +82,7 @@ with Serializable {
       TableIdentifier(options("tableName"), options.get("dbName")))(sparkSession)
     val model = new CarbonLoadModel
     val carbonProperty = CarbonProperties.getInstance()
-    val optionsFinal = DataLoadingUtil.getDataLoadingOptions(carbonProperty, options)
+    val optionsFinal = LoadOption.fillOptionWithDefaultValue(options.asJava)
     val tableProperties = table.getTableInfo.getFactTable.getTableProperties
     optionsFinal.put("sort_scope", tableProperties.asScala.getOrElse("sort_scope",
       carbonProperty.getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE,
@@ -97,14 +97,11 @@ with Serializable {
     val optionsLocal = new mutable.HashMap[String, String]()
     optionsLocal ++= options
     optionsLocal += (("header", "false"))
-    DataLoadingUtil.buildCarbonLoadModel(
-      table,
-      carbonProperty,
-      optionsLocal.toMap,
+    new CarbonLoadModelBuilder(table).build(
+      optionsLocal.toMap.asJava,
       optionsFinal,
       model,
-      conf
-    )
+      conf)
     model.setUseOnePass(options.getOrElse("onepass", "false").toBoolean)
     model.setDictionaryServerHost(options.getOrElse("dicthost", null))
     model.setDictionaryServerPort(options.getOrElse("dictport", "-1").toInt)
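
For context, the onepass/dicthost/dictport options consumed above arrive as DataFrame
writer options. A hypothetical invocation (the "carbondata" short name is an assumption
based on CarbonSource; the option keys are taken from this file):

    // hedged usage sketch: enable single-pass loading with an external
    // dictionary server while writing a DataFrame through this format
    df.write
      .format("carbondata")
      .option("tableName", "t1")
      .option("onepass", "true")
      .option("dicthost", "localhost")
      .option("dictport", "2030")
      .save()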

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
index b174b94..b36d7c0 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
@@ -32,10 +32,10 @@ import org.apache.spark.sql.execution.datasources.RefreshTable
 import org.apache.spark.sql.hive.CarbonRelation
 import org.apache.spark.util.{CarbonReflectionUtils, FileUtils}
 
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.processing.merger.CompactionType
-import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 
 /**
  * Carbon strategies for ddl commands

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/StreamingTableStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/StreamingTableStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/StreamingTableStrategy.scala
index 608ec60..7028dcf 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/StreamingTableStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/StreamingTableStrategy.scala
@@ -25,7 +25,7 @@ import org.apache.spark.sql.execution.command.AlterTableRenameCommand
 import org.apache.spark.sql.execution.command.mutation.{CarbonProjectForDeleteCommand, CarbonProjectForUpdateCommand}
 import org.apache.spark.sql.execution.command.schema.{CarbonAlterTableAddColumnCommand, CarbonAlterTableDataTypeChangeCommand, CarbonAlterTableDropColumnCommand}
 
-import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 
 /**
  * Strategy for streaming table, like blocking unsupported operation

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala
index 7ca34af..27c7d17 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala
@@ -24,9 +24,9 @@ import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.execution.command._
 import org.apache.spark.sql.execution.command.table.CarbonDropTableCommand
 
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, SessionParams}
-import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 
 case class CarbonDropDatabaseCommand(command: DropDatabaseCommand)
   extends RunnableCommand {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
index 4045478..542115e 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
@@ -36,9 +36,9 @@ import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
 import org.apache.spark.sql.util.CarbonException
 import org.apache.spark.util.CarbonReflectionUtils
 
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.spark.CarbonOption
-import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 import org.apache.carbondata.spark.util.CommonUtil
 
 /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
index ad6d0c7..ef4836e 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
@@ -32,10 +32,10 @@ import org.apache.spark.sql.types.StructField
 import org.apache.spark.sql.util.CarbonException
 import org.apache.spark.util.CarbonReflectionUtils
 
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
 import org.apache.carbondata.hadoop.util.SchemaReader
 import org.apache.carbondata.spark.CarbonOption
-import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 import org.apache.carbondata.spark.util.CommonUtil
 
 /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
index bc36e9c..aaa87a3 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
@@ -28,6 +28,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.hive.{CarbonRelation, CarbonSessionCatalog, HiveExternalCatalog}
 import org.apache.spark.sql.hive.HiveExternalCatalog._
 
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.impl.FileFactory
@@ -39,7 +40,6 @@ import org.apache.carbondata.core.util.CarbonUtil
 import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.core.util.path.CarbonTablePath.getNewTablePath
 import org.apache.carbondata.format.{SchemaEvolutionEntry, TableInfo}
-import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 
 object AlterTableUtil {
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark2/src/main/scala/org/apache/spark/util/TableAPIUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/TableAPIUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/util/TableAPIUtil.scala
index bc62902..a8094b6 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/TableAPIUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/TableAPIUtil.scala
@@ -19,10 +19,10 @@ package org.apache.spark.util
 
 import org.apache.spark.sql.{CarbonEnv, SparkSession}
 
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 
 /**
  * table api util

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionState.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionState.scala b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionState.scala
index 3c151f0..a0dd7d9 100644
--- a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionState.scala
+++ b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionState.scala
@@ -43,10 +43,10 @@ import org.apache.spark.sql.parser.{CarbonHelperSqlAstBuilder, CarbonSpark2SqlPa
 import org.apache.spark.sql.types.DecimalType
 import org.apache.spark.util.CarbonReflectionUtils
 
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.core.datamap.DataMapStoreManager
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
 import org.apache.carbondata.core.util.{CarbonProperties, ThreadLocalSessionInfo}
-import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 import org.apache.carbondata.spark.util.CarbonScalaUtil
 
 /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/segmentreading/TestSegmentReading.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/segmentreading/TestSegmentReading.scala b/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/segmentreading/TestSegmentReading.scala
index c676b01..5ade510 100644
--- a/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/segmentreading/TestSegmentReading.scala
+++ b/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/segmentreading/TestSegmentReading.scala
@@ -4,7 +4,7 @@ import org.apache.spark.sql.Row
 import org.apache.spark.sql.test.util.QueryTest
 import org.scalatest.BeforeAndAfterAll
 
-import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 
 /**
  * Created by rahul on 19/9/17.

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala
index 71c5477..0b37e46 100644
--- a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala
+++ b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala
@@ -25,7 +25,7 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.core.util.path.CarbonTablePath
-import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
+import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel, CarbonLoadModelBuilder, LoadOption}
 import org.apache.carbondata.processing.util.TableOptionConstant
 
 /**
@@ -63,7 +63,7 @@ class AllDictionaryTestCase extends Spark2QueryTest with BeforeAndAfterAll {
       CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
       CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT))
     carbonLoadModel.setCsvHeaderColumns(
-      CommonUtil.getCsvHeaderColumns(carbonLoadModel, FileFactory.getConfiguration))
+      LoadOption.getCsvHeaderColumns(carbonLoadModel, FileFactory.getConfiguration))
     // Create table and metadata folders if not exist
     val metadataDirectoryPath = CarbonTablePath.getMetadataPath(table.getTablePath)
     val fileType = FileFactory.getFileType(metadataDirectoryPath)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala
index c0e1781..6149e82 100644
--- a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala
+++ b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala
@@ -23,14 +23,14 @@ import org.apache.spark.sql.hive.CarbonRelation
 import org.apache.spark.sql.{CarbonEnv, SparkSession}
 import org.scalatest.BeforeAndAfterAll
 
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.processing.exception.DataLoadingException
-import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
+import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel, CarbonLoadModelBuilder, LoadOption}
 import org.apache.carbondata.processing.util.TableOptionConstant
-import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 
 /**
  * test case for external column dictionary generation
@@ -176,7 +176,7 @@ class ExternalColumnDictionaryTestCase extends Spark2QueryTest with BeforeAndAft
       CarbonCommonConstants.CARBON_DATE_FORMAT,
       CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT))
     carbonLoadModel.setCsvHeaderColumns(
-      CommonUtil.getCsvHeaderColumns(carbonLoadModel, FileFactory.getConfiguration))
+      LoadOption.getCsvHeaderColumns(carbonLoadModel, FileFactory.getConfiguration))
     carbonLoadModel.setMaxColumns("100")
     // Create table and metadata folders if not exist
     val metadataDirectoryPath = CarbonTablePath.getMetadataPath(table.getTablePath)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
index 2552ca8..afb34b9 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
@@ -31,12 +31,12 @@ import org.apache.spark.sql.test.util.QueryTest
 import org.apache.spark.sql.types.StructType
 import org.scalatest.BeforeAndAfterAll
 
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.statusmanager.{FileFormat, SegmentStatus}
 import org.apache.carbondata.core.util.path.CarbonTablePath
-import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 
 class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark2/src/test/scala/org/apache/spark/carbondata/bucketing/TableBucketingTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/bucketing/TableBucketingTestCase.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/bucketing/TableBucketingTestCase.scala
index 9da7244..65a006b 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/bucketing/TableBucketingTestCase.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/bucketing/TableBucketingTestCase.scala
@@ -21,11 +21,11 @@ import org.apache.spark.sql.common.util.Spark2QueryTest
 import org.apache.spark.sql.execution.exchange.ShuffleExchange
 import org.scalatest.BeforeAndAfterAll
 
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.core.metadata.CarbonMetadata
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 
 class TableBucketingTestCase extends Spark2QueryTest with BeforeAndAfterAll {
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/AddColumnTestCases.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/AddColumnTestCases.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/AddColumnTestCases.scala
index d36dd26..b035834 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/AddColumnTestCases.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/AddColumnTestCases.scala
@@ -26,9 +26,9 @@ import org.apache.spark.sql.common.util.Spark2QueryTest
 import org.apache.spark.sql.test.TestQueryExecutor
 import org.scalatest.BeforeAndAfterAll
 
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 
 class AddColumnTestCases extends Spark2QueryTest with BeforeAndAfterAll {
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
index fef2da6..31c5b27 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
@@ -18,14 +18,16 @@
 package org.apache.carbondata.processing.loading.model;
 
 import java.io.Serializable;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 
 import org.apache.carbondata.core.dictionary.service.DictionaryServiceProvider;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
 import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
 import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager;
-
+import org.apache.carbondata.core.util.path.CarbonTablePath;
 
 public class CarbonLoadModel implements Serializable {
 
@@ -785,4 +787,13 @@ public class CarbonLoadModel implements Serializable {
   public void setSkipEmptyLine(String skipEmptyLine) {
     this.skipEmptyLine = skipEmptyLine;
   }
+
+  /**
+   * Read segment metadata from the table status file and set it on this load model object
+   */
+  public void readAndSetLoadMetadataDetails() {
+    String metadataPath = CarbonTablePath.getMetadataPath(tablePath);
+    LoadMetadataDetails[] details = SegmentStatusManager.readLoadMetadata(metadataPath);
+    setLoadMetadataDetails(Arrays.asList(details));
+  }
 }
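
A hedged usage sketch for the new readAndSetLoadMetadataDetails() helper: it
assumes only that the model's tablePath was set beforehand (the path below is
illustrative, and the getter/setter pair is assumed from the class's style):

  // Sketch: refresh segment metadata on an already-populated load model
  CarbonLoadModel loadModel = new CarbonLoadModel();
  loadModel.setTablePath("/tmp/carbon/store/default/t1");  // hypothetical path
  // reads tablestatus from <tablePath>/Metadata and sets it on the model
  loadModel.readAndSetLoadMetadataDetails();
  List<LoadMetadataDetails> segments = loadModel.getLoadMetadataDetails();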


[3/3] carbondata git commit: [CARBONDATA-2159] Remove carbon-spark dependency in store-sdk module

Posted by ja...@apache.org.
[CARBONDATA-2159] Remove carbon-spark dependency in store-sdk module

To allow assembling a standalone JAR of the store-sdk module, it should not depend on the carbon-spark module

This closes #1970


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/0d50f654
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/0d50f654
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/0d50f654

Branch: refs/heads/carbonstore
Commit: 0d50f65461ae3855db66f44fa06e01174de50ccd
Parents: a848ccf
Author: Jacky Li <ja...@qq.com>
Authored: Sun Feb 11 21:37:04 2018 +0800
Committer: Jacky Li <ja...@qq.com>
Committed: Mon Feb 12 16:06:49 2018 +0800

----------------------------------------------------------------------
 .../java/org/apache/carbondata/common/Maps.java |  39 ++
 .../org/apache/carbondata/common/Strings.java   |   3 +
 .../ConcurrentOperationException.java           |  56 +++
 .../exceptions/TableStatusLockException.java    |  34 ++
 .../sql/InvalidLoadOptionException.java         |  33 ++
 .../sql/MalformedCarbonCommandException.java    |  75 +++
 .../sql/MalformedDataMapCommandException.java   |  37 ++
 .../exceptions/sql/NoSuchDataMapException.java  |  39 ++
 .../statusmanager/SegmentStatusManager.java     | 124 +++++
 .../carbondata/core/util/DeleteLoadFolders.java | 200 ++++++++
 .../preaggregate/TestPreAggCreateCommand.scala  |   2 +-
 .../preaggregate/TestPreAggregateDrop.scala     |   2 +-
 .../timeseries/TestTimeSeriesCreateTable.scala  |   2 +-
 .../timeseries/TestTimeSeriesDropSuite.scala    |   2 +-
 .../TestTimeseriesTableSelection.scala          |   2 +-
 .../TestDataLoadWithColumnsMoreThanSchema.scala |   3 +-
 .../dataload/TestGlobalSortDataLoad.scala       |   2 +-
 .../TestLoadDataWithDiffTimestampFormat.scala   |   2 +-
 .../TestLoadDataWithFileHeaderException.scala   |  11 +-
 ...ataWithMalformedCarbonCommandException.scala |   3 +-
 .../testsuite/dataload/TestLoadOptions.scala    |   2 +-
 .../dataload/TestTableLevelBlockSize.scala      |   4 +-
 .../testsuite/datamap/TestDataMapCommand.scala  |   2 +-
 .../dataretention/DataRetentionTestCase.scala   |   2 +-
 .../spark/testsuite/datetype/DateTypeTest.scala |   2 +-
 .../testsuite/sortcolumns/TestSortColumns.scala |   3 +-
 integration/spark-common/pom.xml                |   5 -
 .../exception/ConcurrentOperationException.java |  44 --
 .../MalformedCarbonCommandException.java        |  69 ---
 .../MalformedDataMapCommandException.java       |  32 --
 .../spark/exception/NoSuchDataMapException.java |  33 --
 .../org/apache/carbondata/api/CarbonStore.scala |   6 +-
 .../spark/CarbonColumnValidator.scala           |   8 +-
 .../carbondata/spark/load/ValidateUtil.scala    |  71 ---
 .../carbondata/spark/rdd/CarbonMergerRDD.scala  |   6 +-
 .../carbondata/spark/util/CommonUtil.scala      |  56 +--
 .../carbondata/spark/util/DataLoadingUtil.scala | 451 -------------------
 .../spark/util/GlobalDictionaryUtil.scala       |   2 +-
 .../spark/sql/catalyst/CarbonDDLSqlParser.scala |   2 +-
 .../spark/rdd/CarbonDataRDDFactory.scala        |   4 +-
 .../spark/rdd/CarbonTableCompactor.scala        |   2 +-
 .../org/apache/spark/sql/CarbonSource.scala     |   2 +-
 .../datamap/CarbonCreateDataMapCommand.scala    |   2 +-
 .../datamap/CarbonDropDataMapCommand.scala      |   2 +-
 .../CarbonAlterTableCompactionCommand.scala     |  11 +-
 .../management/CarbonLoadDataCommand.scala      |  18 +-
 .../CarbonProjectForDeleteCommand.scala         |   2 +-
 .../CarbonProjectForUpdateCommand.scala         |   2 +-
 .../command/mutation/IUDCommonUtil.scala        |   2 +-
 .../CreatePreAggregateTableCommand.scala        |   3 +-
 .../preaaggregate/PreAggregateUtil.scala        |   2 +-
 .../schema/CarbonAlterTableRenameCommand.scala  |   2 +-
 .../command/timeseries/TimeSeriesUtil.scala     |   2 +-
 .../datasources/CarbonFileFormat.scala          |  15 +-
 .../sql/execution/strategy/DDLStrategy.scala    |   2 +-
 .../strategy/StreamingTableStrategy.scala       |   2 +-
 .../execution/command/CarbonHiveCommands.scala  |   2 +-
 .../sql/parser/CarbonSpark2SqlParser.scala      |   2 +-
 .../spark/sql/parser/CarbonSparkSqlParser.scala |   2 +-
 .../org/apache/spark/util/AlterTableUtil.scala  |   2 +-
 .../org/apache/spark/util/TableAPIUtil.scala    |   2 +-
 .../spark/sql/hive/CarbonSessionState.scala     |   2 +-
 .../segmentreading/TestSegmentReading.scala     |   2 +-
 .../spark/util/AllDictionaryTestCase.scala      |   4 +-
 .../util/ExternalColumnDictionaryTestCase.scala |   6 +-
 .../TestStreamingTableOperation.scala           |   2 +-
 .../bucketing/TableBucketingTestCase.scala      |   2 +-
 .../vectorreader/AddColumnTestCases.scala       |   2 +-
 .../loading/model/CarbonLoadModel.java          |  13 +-
 .../loading/model/CarbonLoadModelBuilder.java   | 322 +++++++++++++
 .../processing/loading/model/LoadOption.java    | 251 +++++++++++
 .../processing/util/CarbonLoaderUtil.java       |  84 +---
 .../processing/util/DeleteLoadFolders.java      | 200 --------
 store/sdk/pom.xml                               |   2 +-
 .../sdk/file/CarbonWriterBuilder.java           |  15 +-
 .../sdk/file/CSVCarbonWriterSuite.java          |   2 +-
 .../streaming/StreamSinkFactory.scala           |  11 +-
 77 files changed, 1330 insertions(+), 1146 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/common/src/main/java/org/apache/carbondata/common/Maps.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/carbondata/common/Maps.java b/common/src/main/java/org/apache/carbondata/common/Maps.java
new file mode 100644
index 0000000..14fc329
--- /dev/null
+++ b/common/src/main/java/org/apache/carbondata/common/Maps.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.common;
+
+import java.util.Map;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+
+@InterfaceAudience.Developer
+public class Maps {
+
+  /**
+   * Return the value if the key is contained in the map, else return defaultValue.
+   * This is added to avoid a JDK 8 dependency
+   */
+  public static <K, V> V getOrDefault(Map<K, V> map, K key, V defaultValue) {
+    V value = map.get(key);
+    if (value != null) {
+      return value;
+    } else {
+      return defaultValue;
+    }
+  }
+}
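
Maps.getOrDefault mirrors java.util.Map#getOrDefault from JDK 8 so the
load-option handling keeps compiling on JDK 7. A minimal sketch of the
intended use (the option names and defaults are illustrative only):

  import java.util.HashMap;
  import java.util.Map;
  import org.apache.carbondata.common.Maps;

  Map<String, String> options = new HashMap<String, String>();
  options.put("delimiter", "|");
  // falls back to the default when the user did not supply the option
  String delimiter = Maps.getOrDefault(options, "delimiter", ",");   // "|"
  String quoteChar = Maps.getOrDefault(options, "quotechar", "\"");  // "\""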

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/common/src/main/java/org/apache/carbondata/common/Strings.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/carbondata/common/Strings.java b/common/src/main/java/org/apache/carbondata/common/Strings.java
index 23288dd..08fdc3c 100644
--- a/common/src/main/java/org/apache/carbondata/common/Strings.java
+++ b/common/src/main/java/org/apache/carbondata/common/Strings.java
@@ -19,6 +19,9 @@ package org.apache.carbondata.common;
 
 import java.util.Objects;
 
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+
+@InterfaceAudience.Developer
 public class Strings {
 
   /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/common/src/main/java/org/apache/carbondata/common/exceptions/ConcurrentOperationException.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/carbondata/common/exceptions/ConcurrentOperationException.java b/common/src/main/java/org/apache/carbondata/common/exceptions/ConcurrentOperationException.java
new file mode 100644
index 0000000..a14d161
--- /dev/null
+++ b/common/src/main/java/org/apache/carbondata/common/exceptions/ConcurrentOperationException.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.common.exceptions;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.common.annotations.InterfaceStability;
+
+/**
+ * This exception will be thrown when executing concurrent operations that
+ * are not supported in carbon.
+ *
+ * For example, while an INSERT OVERWRITE is executing, other operations are
+ * not allowed, so this exception will be thrown.
+ */
+@InterfaceAudience.User
+@InterfaceStability.Stable
+public class ConcurrentOperationException extends Exception {
+
+  /**
+   * The Error message.
+   */
+  private String msg;
+
+  /**
+   * Constructor
+   *
+   * @param msg The error message for this exception.
+   */
+  public ConcurrentOperationException(String msg) {
+    super(msg);
+    this.msg = msg;
+  }
+
+  /**
+   * getMessage
+   */
+  public String getMessage() {
+    return this.msg;
+  }
+
+}
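
The Javadoc above describes the contract: an operation that conflicts with a
running one (for example anything issued while INSERT OVERWRITE is executing)
fails fast with this checked exception. A hedged sketch of a call site; the
command object is purely illustrative:

  try {
    compactionCommand.run();  // hypothetical operation that may conflict
  } catch (ConcurrentOperationException e) {
    // surface the message to the user, or retry once the other operation ends
    System.err.println(e.getMessage());
  }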

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/common/src/main/java/org/apache/carbondata/common/exceptions/TableStatusLockException.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/carbondata/common/exceptions/TableStatusLockException.java b/common/src/main/java/org/apache/carbondata/common/exceptions/TableStatusLockException.java
new file mode 100644
index 0000000..89cfd46
--- /dev/null
+++ b/common/src/main/java/org/apache/carbondata/common/exceptions/TableStatusLockException.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.common.exceptions;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.common.annotations.InterfaceStability;
+
+/**
+ * This exception will be thrown when acquiring the lock on the table status
+ * metadata fails, or the retry times out.
+ */
+@InterfaceAudience.User
+@InterfaceStability.Evolving
+public class TableStatusLockException extends RuntimeException {
+
+  public TableStatusLockException(String msg) {
+    super(msg);
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/common/src/main/java/org/apache/carbondata/common/exceptions/sql/InvalidLoadOptionException.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/carbondata/common/exceptions/sql/InvalidLoadOptionException.java b/common/src/main/java/org/apache/carbondata/common/exceptions/sql/InvalidLoadOptionException.java
new file mode 100644
index 0000000..41b2434
--- /dev/null
+++ b/common/src/main/java/org/apache/carbondata/common/exceptions/sql/InvalidLoadOptionException.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.common.exceptions.sql;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.common.annotations.InterfaceStability;
+
+/**
+ * This exception will be thrown when a load option is invalid in a SQL
+ * loading statement (LOAD DATA, INSERT INTO).
+ */
+@InterfaceAudience.User
+@InterfaceStability.Stable
+public class InvalidLoadOptionException extends MalformedCarbonCommandException {
+  public InvalidLoadOptionException(String msg) {
+    super(msg);
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/common/src/main/java/org/apache/carbondata/common/exceptions/sql/MalformedCarbonCommandException.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/carbondata/common/exceptions/sql/MalformedCarbonCommandException.java b/common/src/main/java/org/apache/carbondata/common/exceptions/sql/MalformedCarbonCommandException.java
new file mode 100644
index 0000000..5fe3ce8
--- /dev/null
+++ b/common/src/main/java/org/apache/carbondata/common/exceptions/sql/MalformedCarbonCommandException.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.common.exceptions.sql;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.common.annotations.InterfaceStability;
+
+/**
+ * This exception will be thrown if any validation fails while parsing
+ * a SQL statement.
+ */
+@InterfaceAudience.User
+@InterfaceStability.Stable
+public class MalformedCarbonCommandException extends Exception {
+
+  /**
+   * default serial version ID.
+   */
+  private static final long serialVersionUID = 1L;
+
+  /**
+   * The Error message.
+   */
+  private String msg = "";
+
+  /**
+   * Constructor
+   *
+   * @param msg The error message for this exception.
+   */
+  public MalformedCarbonCommandException(String msg) {
+    super(msg);
+    this.msg = msg;
+  }
+
+  /**
+   * Constructor
+   *
+   * @param msg The error message for this exception.
+   */
+  public MalformedCarbonCommandException(String msg, Throwable t) {
+    super(msg, t);
+    this.msg = msg;
+  }
+
+  /**
+   * getLocalizedMessage
+   */
+  @Override
+  public String getLocalizedMessage() {
+    return super.getLocalizedMessage();
+  }
+
+  /**
+   * getMessage
+   */
+  public String getMessage() {
+    return this.msg;
+  }
+}
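
MalformedCarbonCommandException is now the root of the user-facing SQL
exception family in the common module; InvalidLoadOptionException above and
the DataMap exceptions below all extend it. A hedged sketch showing that a
single catch covers the whole family (the builder call follows the
CarbonLoadModelBuilder.build signature; table and options are illustrative):

  try {
    // may throw InvalidLoadOptionException, a subclass of this exception
    CarbonLoadModel model = new CarbonLoadModelBuilder(table).build(options);
  } catch (MalformedCarbonCommandException e) {
    // one catch covers InvalidLoadOptionException and the DataMap exceptions
    System.err.println("Invalid command: " + e.getMessage());
  } catch (IOException e) {
    e.printStackTrace();  // build() also declares IOException
  }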

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/common/src/main/java/org/apache/carbondata/common/exceptions/sql/MalformedDataMapCommandException.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/carbondata/common/exceptions/sql/MalformedDataMapCommandException.java b/common/src/main/java/org/apache/carbondata/common/exceptions/sql/MalformedDataMapCommandException.java
new file mode 100644
index 0000000..7c25b2c
--- /dev/null
+++ b/common/src/main/java/org/apache/carbondata/common/exceptions/sql/MalformedDataMapCommandException.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.common.exceptions.sql;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.common.annotations.InterfaceStability;
+
+/**
+ * This exception will be thrown when a DataMap related SQL statement is invalid
+ */
+@InterfaceAudience.User
+@InterfaceStability.Stable
+public class MalformedDataMapCommandException extends MalformedCarbonCommandException {
+  /**
+   * default serial version ID.
+   */
+  private static final long serialVersionUID = 1L;
+
+  public MalformedDataMapCommandException(String msg) {
+    super(msg);
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/common/src/main/java/org/apache/carbondata/common/exceptions/sql/NoSuchDataMapException.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/carbondata/common/exceptions/sql/NoSuchDataMapException.java b/common/src/main/java/org/apache/carbondata/common/exceptions/sql/NoSuchDataMapException.java
new file mode 100644
index 0000000..7ab9048
--- /dev/null
+++ b/common/src/main/java/org/apache/carbondata/common/exceptions/sql/NoSuchDataMapException.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.common.exceptions.sql;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.common.annotations.InterfaceStability;
+
+/**
+ * This exception will be thrown if the datamap is not found when executing a
+ * datamap related SQL statement
+ */
+@InterfaceAudience.User
+@InterfaceStability.Stable
+public class NoSuchDataMapException extends MalformedCarbonCommandException {
+
+  /**
+   * default serial version ID.
+   */
+  private static final long serialVersionUID = 1L;
+
+  public NoSuchDataMapException(String dataMapName, String tableName) {
+    super("Datamap with name " + dataMapName + " does not exist under table " + tableName);
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
index c613735..2e73aef 100755
--- a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
@@ -30,6 +30,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 
+import org.apache.carbondata.common.exceptions.TableStatusLockException;
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
@@ -46,6 +47,7 @@ import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
 import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.DeleteLoadFolders;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 
 import com.google.gson.Gson;
@@ -708,4 +710,126 @@ public class SegmentStatusManager {
     }
   }
 
+  private static boolean isLoadDeletionRequired(String metaDataLocation) {
+    LoadMetadataDetails[] details = SegmentStatusManager.readLoadMetadata(metaDataLocation);
+    if (details != null && details.length > 0) {
+      for (LoadMetadataDetails oneRow : details) {
+        if ((SegmentStatus.MARKED_FOR_DELETE == oneRow.getSegmentStatus()
+            || SegmentStatus.COMPACTED == oneRow.getSegmentStatus()
+            || SegmentStatus.INSERT_IN_PROGRESS == oneRow.getSegmentStatus()
+            || SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS == oneRow.getSegmentStatus())
+            && oneRow.getVisibility().equalsIgnoreCase("true")) {
+          return true;
+        }
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Carry the visibility flags from the table status details read before the
+   * clean-files operation over to the latest table status details.
+   * @param oldList table status details read before the operation
+   * @param newList latest table status details read after it
+   * @return the new list, with entries hidden in oldList kept hidden
+   */
+  public static List<LoadMetadataDetails> updateLoadMetadataFromOldToNew(
+      LoadMetadataDetails[] oldList, LoadMetadataDetails[] newList) {
+
+    List<LoadMetadataDetails> newListMetadata =
+        new ArrayList<LoadMetadataDetails>(Arrays.asList(newList));
+    for (LoadMetadataDetails oldSegment : oldList) {
+      if ("false".equalsIgnoreCase(oldSegment.getVisibility())) {
+        newListMetadata.get(newListMetadata.indexOf(oldSegment)).setVisibility("false");
+      }
+    }
+    return newListMetadata;
+  }
+
+  private static void writeLoadMetadata(AbsoluteTableIdentifier identifier,
+      List<LoadMetadataDetails> listOfLoadFolderDetails) throws IOException {
+    String dataLoadLocation = CarbonTablePath.getTableStatusFilePath(identifier.getTablePath());
+
+    DataOutputStream dataOutputStream;
+    Gson gsonObjectToWrite = new Gson();
+    BufferedWriter brWriter = null;
+
+    AtomicFileOperations writeOperation =
+        new AtomicFileOperationsImpl(dataLoadLocation, FileFactory.getFileType(dataLoadLocation));
+
+    try {
+
+      dataOutputStream = writeOperation.openForWrite(FileWriteOperation.OVERWRITE);
+      brWriter = new BufferedWriter(new OutputStreamWriter(dataOutputStream,
+          Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET)));
+
+      String metadataInstance = gsonObjectToWrite.toJson(listOfLoadFolderDetails.toArray());
+      brWriter.write(metadataInstance);
+    } finally {
+      try {
+        if (null != brWriter) {
+          brWriter.flush();
+        }
+      } catch (Exception e) {
+        LOG.error("Error in flushing table status file");
+
+      }
+      CarbonUtil.closeStreams(brWriter);
+      writeOperation.close();
+    }
+  }
+
+  public static void deleteLoadsAndUpdateMetadata(
+      CarbonTable carbonTable,
+      boolean isForceDeletion) throws IOException {
+    if (isLoadDeletionRequired(carbonTable.getMetadataPath())) {
+      LoadMetadataDetails[] details =
+          SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath());
+      AbsoluteTableIdentifier identifier = carbonTable.getAbsoluteTableIdentifier();
+      ICarbonLock carbonTableStatusLock = CarbonLockFactory.getCarbonLockObj(
+          identifier, LockUsage.TABLE_STATUS_LOCK);
+
+      // Delete marked loads
+      boolean isUpdationRequired = DeleteLoadFolders.deleteLoadFoldersFromFileSystem(
+          identifier, isForceDeletion, details, carbonTable.getMetadataPath());
+
+      boolean updationCompletionStatus = false;
+
+      if (isUpdationRequired) {
+        try {
+          // Update the load metadata file after cleaning the deleted segments
+          if (carbonTableStatusLock.lockWithRetries()) {
+            LOG.info("Table status lock has been successfully acquired.");
+
+            // read latest table status again.
+            LoadMetadataDetails[] latestMetadata =
+                SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath());
+
+            // update the metadata details from old to new status.
+            List<LoadMetadataDetails> latestStatus =
+                updateLoadMetadataFromOldToNew(details, latestMetadata);
+
+            writeLoadMetadata(identifier, latestStatus);
+          } else {
+            String dbName = identifier.getCarbonTableIdentifier().getDatabaseName();
+            String tableName = identifier.getCarbonTableIdentifier().getTableName();
+            String errorMsg = "Clean files request is failed for " +
+                dbName + "." + tableName +
+                ". Not able to acquire the table status lock due to other operation " +
+                "running in the background.";
+            LOG.audit(errorMsg);
+            LOG.error(errorMsg);
+            throw new TableStatusLockException(errorMsg + " Please try after some time.");
+          }
+          updationCompletionStatus = true;
+        } finally {
+          CarbonLockUtil.fileUnlock(carbonTableStatusLock, LockUsage.TABLE_STATUS_LOCK);
+          if (updationCompletionStatus) {
+            DeleteLoadFolders.physicalFactAndMeasureMetadataDeletion(
+                identifier, carbonTable.getMetadataPath(), isForceDeletion);
+          }
+        }
+      }
+    }
+  }
+
 }
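
deleteLoadsAndUpdateMetadata is the new single entry point for the clean-files
flow: it hides deletable segments, rewrites tablestatus under the table status
lock, and only then deletes the physical folders. A minimal sketch of the
expected call site, assuming the CarbonTable has already been resolved
elsewhere (for example from the metastore):

  static void cleanFiles(CarbonTable carbonTable) throws IOException {
    // isForceDeletion=false honours the max query timeout before the
    // physical deletion of MARKED_FOR_DELETE / COMPACTED segments
    SegmentStatusManager.deleteLoadsAndUpdateMetadata(carbonTable, false);
  }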

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/core/src/main/java/org/apache/carbondata/core/util/DeleteLoadFolders.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/DeleteLoadFolders.java b/core/src/main/java/org/apache/carbondata/core/util/DeleteLoadFolders.java
new file mode 100644
index 0000000..ba4c4fc
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/util/DeleteLoadFolders.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.util;
+
+import java.io.IOException;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.locks.CarbonLockFactory;
+import org.apache.carbondata.core.locks.ICarbonLock;
+import org.apache.carbondata.core.locks.LockUsage;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
+import org.apache.carbondata.core.statusmanager.SegmentStatus;
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+
+public final class DeleteLoadFolders {
+
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(DeleteLoadFolders.class.getName());
+
+  private DeleteLoadFolders() {
+
+  }
+
+  /**
+   * returns segment path
+   *
+   * @param identifier
+   * @param oneLoad
+   * @return
+   */
+  private static String getSegmentPath(AbsoluteTableIdentifier identifier,
+      LoadMetadataDetails oneLoad) {
+    String segmentId = oneLoad.getLoadName();
+    return CarbonTablePath.getSegmentPath(identifier.getTablePath(), segmentId);
+  }
+
+  public static void physicalFactAndMeasureMetadataDeletion(
+      AbsoluteTableIdentifier absoluteTableIdentifier, String metadataPath, boolean isForceDelete) {
+    LoadMetadataDetails[] currentDetails = SegmentStatusManager.readLoadMetadata(metadataPath);
+    for (LoadMetadataDetails oneLoad : currentDetails) {
+      if (checkIfLoadCanBeDeletedPhysically(oneLoad, isForceDelete)) {
+        String path = getSegmentPath(absoluteTableIdentifier, oneLoad);
+        boolean status = false;
+        try {
+          if (FileFactory.isFileExist(path, FileFactory.getFileType(path))) {
+            CarbonFile file = FileFactory.getCarbonFile(path, FileFactory.getFileType(path));
+            CarbonFile[] filesToBeDeleted = file.listFiles(new CarbonFileFilter() {
+
+              @Override public boolean accept(CarbonFile file) {
+                return (CarbonTablePath.isCarbonDataFile(file.getName())
+                    || CarbonTablePath.isCarbonIndexFile(file.getName())
+                    || CarbonTablePath.isPartitionMapFile(file.getName()));
+              }
+            });
+
+            // if there are no fact and measure metadata files present then there
+            // is no need to keep the entry in metadata.
+            if (filesToBeDeleted.length == 0) {
+              status = true;
+            } else {
+
+              for (CarbonFile eachFile : filesToBeDeleted) {
+                if (!eachFile.delete()) {
+                  LOGGER.warn("Unable to delete the file as per delete command " + eachFile
+                      .getAbsolutePath());
+                  status = false;
+                } else {
+                  status = true;
+                }
+              }
+            }
+            // need to delete the complete folder.
+            if (status) {
+              if (!file.delete()) {
+                LOGGER.warn(
+                    "Unable to delete the folder as per delete command " + file.getAbsolutePath());
+              }
+            }
+
+          } else {
+            LOGGER.warn("Files are not found in segment " + path
+                + "; it seems the files were already deleted");
+          }
+        } catch (IOException e) {
+          LOGGER.warn("Unable to delete the file as per delete command " + path);
+        }
+      }
+    }
+  }
+
+  private static boolean checkIfLoadCanBeDeleted(LoadMetadataDetails oneLoad,
+      boolean isForceDelete) {
+    if ((SegmentStatus.MARKED_FOR_DELETE == oneLoad.getSegmentStatus() ||
+        SegmentStatus.COMPACTED == oneLoad.getSegmentStatus() ||
+        SegmentStatus.INSERT_IN_PROGRESS == oneLoad.getSegmentStatus() ||
+        SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS == oneLoad.getSegmentStatus())
+        && oneLoad.getVisibility().equalsIgnoreCase("true")) {
+      if (isForceDelete) {
+        return true;
+      }
+      long deletionTime = oneLoad.getModificationOrdeletionTimesStamp();
+
+      return CarbonUpdateUtil.isMaxQueryTimeoutExceeded(deletionTime);
+
+    }
+
+    return false;
+  }
+
+  private static boolean checkIfLoadCanBeDeletedPhysically(LoadMetadataDetails oneLoad,
+      boolean isForceDelete) {
+    if ((SegmentStatus.MARKED_FOR_DELETE == oneLoad.getSegmentStatus() ||
+        SegmentStatus.COMPACTED == oneLoad.getSegmentStatus())) {
+      if (isForceDelete) {
+        return true;
+      }
+      long deletionTime = oneLoad.getModificationOrdeletionTimesStamp();
+
+      return CarbonUpdateUtil.isMaxQueryTimeoutExceeded(deletionTime);
+
+    }
+
+    return false;
+  }
+
+  private static LoadMetadataDetails getCurrentLoadStatusOfSegment(String segmentId,
+      String metadataPath) {
+    LoadMetadataDetails[] currentDetails = SegmentStatusManager.readLoadMetadata(metadataPath);
+    for (LoadMetadataDetails oneLoad : currentDetails) {
+      if (oneLoad.getLoadName().equalsIgnoreCase(segmentId)) {
+        return oneLoad;
+      }
+    }
+    return null;
+  }
+
+  public static boolean deleteLoadFoldersFromFileSystem(
+      AbsoluteTableIdentifier absoluteTableIdentifier, boolean isForceDelete,
+      LoadMetadataDetails[] details, String metadataPath) {
+    boolean isDeleted = false;
+    if (details != null && details.length != 0) {
+      for (LoadMetadataDetails oneLoad : details) {
+        if (checkIfLoadCanBeDeleted(oneLoad, isForceDelete)) {
+          ICarbonLock segmentLock = CarbonLockFactory.getCarbonLockObj(absoluteTableIdentifier,
+              CarbonTablePath.addSegmentPrefix(oneLoad.getLoadName()) + LockUsage.LOCK);
+          try {
+            if (oneLoad.getSegmentStatus() == SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS
+                || oneLoad.getSegmentStatus() == SegmentStatus.INSERT_IN_PROGRESS) {
+              if (segmentLock.lockWithRetries(1, 5)) {
+                LOGGER.info("Info: Acquired segment lock on segment:" + oneLoad.getLoadName());
+                LoadMetadataDetails currentDetails =
+                    getCurrentLoadStatusOfSegment(oneLoad.getLoadName(), metadataPath);
+                if (currentDetails != null && checkIfLoadCanBeDeleted(currentDetails,
+                    isForceDelete)) {
+                  oneLoad.setVisibility("false");
+                  isDeleted = true;
+                  LOGGER.info("Info: Deleted the load " + oneLoad.getLoadName());
+                }
+              } else {
+                LOGGER.info("Info: Load in progress for segment " + oneLoad.getLoadName());
+                return isDeleted;
+              }
+            } else {
+              oneLoad.setVisibility("false");
+              isDeleted = true;
+              LOGGER.info("Info: Deleted the load " + oneLoad.getLoadName());
+            }
+          } finally {
+            segmentLock.unlock();
+            LOGGER.info("Info: Segment lock on segment:" + oneLoad.getLoadName() + " is released");
+          }
+        }
+      }
+    }
+    return isDeleted;
+  }
+
+}
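
Note the two-phase design above: deleteLoadFoldersFromFileSystem only flips
the visibility flag in the in-memory details (a logical delete), and the
physical files are removed by physicalFactAndMeasureMetadataDeletion once the
rewritten tablestatus has been persisted. A hedged sketch, assuming
identifier, details and metadataPath are in scope as in
SegmentStatusManager.deleteLoadsAndUpdateMetadata above:

  // phase 1 (logical): hide deletable segments in the in-memory details
  boolean changed = DeleteLoadFolders.deleteLoadFoldersFromFileSystem(
      identifier, false /* isForceDelete */, details, metadataPath);
  if (changed) {
    // caller persists the updated tablestatus here, then...
    // phase 2 (physical): remove data/index files of the hidden segments
    DeleteLoadFolders.physicalFactAndMeasureMetadataDeletion(
        identifier, metadataPath, false /* isForceDelete */);
  }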

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala
index 23132de..b3bf93d 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala
@@ -26,10 +26,10 @@ import org.apache.spark.sql.hive.CarbonRelation
 import org.apache.spark.sql.test.util.QueryTest
 import org.scalatest.BeforeAndAfterAll
 
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.core.metadata.encoder.Encoding
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.metadata.schema.datamap.DataMapProvider.TIMESERIES
-import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 
 class TestPreAggCreateCommand extends QueryTest with BeforeAndAfterAll {
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateDrop.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateDrop.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateDrop.scala
index 1138adf..db0fb3f 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateDrop.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateDrop.scala
@@ -20,7 +20,7 @@ package org.apache.carbondata.integration.spark.testsuite.preaggregate
 import org.apache.spark.sql.test.util.QueryTest
 import org.scalatest.BeforeAndAfterAll
 
-import org.apache.carbondata.spark.exception.NoSuchDataMapException
+import org.apache.carbondata.common.exceptions.sql.NoSuchDataMapException
 
 class TestPreAggregateDrop extends QueryTest with BeforeAndAfterAll {
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeSeriesCreateTable.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeSeriesCreateTable.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeSeriesCreateTable.scala
index 0ca7cb9..97aa056 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeSeriesCreateTable.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeSeriesCreateTable.scala
@@ -19,8 +19,8 @@ package org.apache.carbondata.integration.spark.testsuite.timeseries
 import org.apache.spark.sql.test.util.QueryTest
 import org.scalatest.BeforeAndAfterAll
 
+import org.apache.carbondata.common.exceptions.sql.{MalformedCarbonCommandException, MalformedDataMapCommandException}
 import org.apache.carbondata.core.metadata.schema.datamap.DataMapProvider.TIMESERIES
-import org.apache.carbondata.spark.exception.{MalformedDataMapCommandException, MalformedCarbonCommandException}
 
 class TestTimeSeriesCreateTable extends QueryTest with BeforeAndAfterAll {
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeSeriesDropSuite.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeSeriesDropSuite.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeSeriesDropSuite.scala
index 545c4de..5fe21e8 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeSeriesDropSuite.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeSeriesDropSuite.scala
@@ -19,7 +19,7 @@ package org.apache.carbondata.integration.spark.testsuite.timeseries
 import org.apache.spark.sql.test.util.QueryTest
 import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
 
-import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 
 class TestTimeSeriesDropSuite extends QueryTest with BeforeAndAfterAll with BeforeAndAfterEach {
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeseriesTableSelection.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeseriesTableSelection.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeseriesTableSelection.scala
index 3065952..3f140df 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeseriesTableSelection.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeseriesTableSelection.scala
@@ -24,8 +24,8 @@ import org.apache.spark.sql.test.util.QueryTest
 import org.apache.spark.util.SparkUtil4Test
 import org.scalatest.BeforeAndAfterAll
 
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.core.metadata.schema.datamap.DataMapProvider.TIMESERIES
-import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 
 class TestTimeseriesTableSelection extends QueryTest with BeforeAndAfterAll {
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestDataLoadWithColumnsMoreThanSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestDataLoadWithColumnsMoreThanSchema.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestDataLoadWithColumnsMoreThanSchema.scala
index 1532328..4e5ebbb 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestDataLoadWithColumnsMoreThanSchema.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestDataLoadWithColumnsMoreThanSchema.scala
@@ -18,9 +18,10 @@
 package org.apache.carbondata.spark.testsuite.dataload
 
 import org.scalatest.BeforeAndAfterAll
-import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 import org.apache.spark.sql.test.util.QueryTest
 
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
+
 /**
  * This class will test data load in which number of columns in data are more than
  * the number of columns in schema

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala
index 5e5eed5..aacd28b 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala
@@ -23,13 +23,13 @@ import org.apache.commons.io.FileUtils
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.execution.BatchedDataSourceScanExec
 import org.apache.spark.sql.test.TestQueryExecutor.projectPath
 import org.apache.spark.sql.test.util.QueryTest
 import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
 
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore
 import org.apache.carbondata.core.metadata.CarbonMetadata
 import org.apache.carbondata.spark.rdd.CarbonScanRDD

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithDiffTimestampFormat.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithDiffTimestampFormat.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithDiffTimestampFormat.scala
index ec6fff1..c06d782 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithDiffTimestampFormat.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithDiffTimestampFormat.scala
@@ -25,10 +25,10 @@ import org.scalatest.BeforeAndAfterAll
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 import org.apache.spark.sql.test.util.QueryTest
 
 import org.apache.carbondata.common.constants.LoggerAction
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 
 class TestLoadDataWithDiffTimestampFormat extends QueryTest with BeforeAndAfterAll {
   val bad_records_action = CarbonProperties.getInstance()

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithFileHeaderException.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithFileHeaderException.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithFileHeaderException.scala
index 7700ed5..edcdd51 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithFileHeaderException.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithFileHeaderException.scala
@@ -20,6 +20,9 @@ package org.apache.carbondata.spark.testsuite.dataload
 import org.apache.spark.sql.test.util.QueryTest
 import org.scalatest.BeforeAndAfterAll
 
+import org.apache.carbondata.common.exceptions.sql.InvalidLoadOptionException
+import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException
+
 class TestLoadDataWithFileHeaderException extends QueryTest with BeforeAndAfterAll{
   override def beforeAll {
     sql("DROP TABLE IF EXISTS t3")
@@ -32,7 +35,7 @@ class TestLoadDataWithFileHeaderException extends QueryTest with BeforeAndAfterA
   }
 
   test("test load data both file and ddl without file header exception") {
-    val e = intercept[Exception] {
+    val e = intercept[CarbonDataLoadingException] {
       sql(
         s"""LOAD DATA LOCAL INPATH '$resourcesPath/source_without_header.csv' into table t3""")
     }
@@ -41,7 +44,7 @@ class TestLoadDataWithFileHeaderException extends QueryTest with BeforeAndAfterA
   }
 
   test("test load data ddl provided wrong file header exception") {
-    val e = intercept[Exception] {
+    val e = intercept[CarbonDataLoadingException] {
       sql(
         s"""
            LOAD DATA LOCAL INPATH '$resourcesPath/source_without_header.csv' into table t3
@@ -52,7 +55,7 @@ class TestLoadDataWithFileHeaderException extends QueryTest with BeforeAndAfterA
   }
 
   test("test load data with wrong header , but without fileheader") {
-    val e = intercept[Exception] {
+    val e = intercept[InvalidLoadOptionException] {
       sql(
         s"""
            LOAD DATA LOCAL INPATH '$resourcesPath/source.csv' into table t3
@@ -63,7 +66,7 @@ class TestLoadDataWithFileHeaderException extends QueryTest with BeforeAndAfterA
   }
 
   test("test load data with wrong header and fileheader") {
-    val e = intercept[Exception] {
+    val e = intercept[InvalidLoadOptionException] {
       sql(
         s"""
          LOAD DATA LOCAL INPATH '$resourcesPath/source_without_header.csv' into table t3

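Besides the import additions, the hunks above narrow intercept[Exception] to the concrete types CarbonDataLoadingException and InvalidLoadOptionException. Since intercept[T] fails the test unless the block throws a T (or a subtype), the narrowed form also fails when the wrong error path fires, rather than passing on any failure. A sketch of the pattern as it would sit inside one of these QueryTest cases (the option value and the asserted message text are assumptions, not taken from this patch):

    import org.apache.carbondata.common.exceptions.sql.InvalidLoadOptionException

    val e = intercept[InvalidLoadOptionException] {
      sql(
        s"""
           LOAD DATA LOCAL INPATH '$resourcesPath/source.csv' into table t3
           options('fileheader'='no_such_column')
         """)
    }
    // Asserting on the message pins the failure down further; the exact
    // wording checked here is illustrative.
    assert(e.getMessage.toLowerCase.contains("fileheader"))
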
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithMalformedCarbonCommandException.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithMalformedCarbonCommandException.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithMalformedCarbonCommandException.scala
index 1851705..6759049 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithMalformedCarbonCommandException.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithMalformedCarbonCommandException.scala
@@ -18,9 +18,10 @@
 package org.apache.carbondata.spark.testsuite.dataload
 
 import org.scalatest.BeforeAndAfterAll
-import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 import org.apache.spark.sql.test.util.QueryTest
 
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
+
 class TestLoadDataWithMalformedCarbonCommandException extends QueryTest with BeforeAndAfterAll {
 
   override def beforeAll {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadOptions.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadOptions.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadOptions.scala
index d2c7e63..4ec9335 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadOptions.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadOptions.scala
@@ -21,7 +21,7 @@ import org.apache.spark.sql.Row
 import org.apache.spark.sql.test.util.QueryTest
 import org.scalatest.BeforeAndAfterAll
 
-import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 
 class TestLoadOptions extends QueryTest with BeforeAndAfterAll{
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestTableLevelBlockSize.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestTableLevelBlockSize.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestTableLevelBlockSize.scala
index a77b210..f6a049a 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestTableLevelBlockSize.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestTableLevelBlockSize.scala
@@ -19,11 +19,13 @@ package org.apache.carbondata.spark.testsuite.dataload
 
 import org.apache.spark.sql.Row
 import org.scalatest.BeforeAndAfterAll
+
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 import org.apache.spark.sql.test.util.QueryTest
 
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
+
 /**
   * Test Class for table block size
   *

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala
index 5170c43..37007ed 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala
@@ -23,11 +23,11 @@ import org.apache.spark.sql.Row
 import org.apache.spark.sql.test.util.QueryTest
 import org.scalatest.BeforeAndAfterAll
 
+import org.apache.carbondata.common.exceptions.sql.MalformedDataMapCommandException
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.metadata.CarbonMetadata
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.core.util.path.CarbonTablePath
-import org.apache.carbondata.spark.exception.MalformedDataMapCommandException
 
 class TestDataMapCommand extends QueryTest with BeforeAndAfterAll {
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataretention/DataRetentionTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataretention/DataRetentionTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataretention/DataRetentionTestCase.scala
index 7c82f75..a70584b 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataretention/DataRetentionTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataretention/DataRetentionTestCase.scala
@@ -29,9 +29,9 @@ import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata, CarbonTableIdentifier}
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 import org.apache.spark.sql.test.util.QueryTest
 
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 
 /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datetype/DateTypeTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datetype/DateTypeTest.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datetype/DateTypeTest.scala
index e2df07c..b9b01f8 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datetype/DateTypeTest.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datetype/DateTypeTest.scala
@@ -16,10 +16,10 @@
  */
 package org.apache.carbondata.spark.testsuite.datetype
 
-import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 import org.apache.spark.sql.test.util.QueryTest
 import org.scalatest.BeforeAndAfterAll
 
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 
 class DateTypeTest extends QueryTest with BeforeAndAfterAll{
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/sortcolumns/TestSortColumns.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/sortcolumns/TestSortColumns.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/sortcolumns/TestSortColumns.scala
index 7c288b3..dd1aab8 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/sortcolumns/TestSortColumns.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/sortcolumns/TestSortColumns.scala
@@ -21,10 +21,11 @@ import org.scalatest.BeforeAndAfterAll
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 import org.apache.spark.sql.test.util.QueryTest
 import org.apache.spark.util.SparkUtil4Test
 
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
+
 class TestSortColumns extends QueryTest with BeforeAndAfterAll {
 
   override def beforeAll {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark-common/pom.xml
----------------------------------------------------------------------
diff --git a/integration/spark-common/pom.xml b/integration/spark-common/pom.xml
index d40e213..e80593b 100644
--- a/integration/spark-common/pom.xml
+++ b/integration/spark-common/pom.xml
@@ -36,11 +36,6 @@
   <dependencies>
     <dependency>
       <groupId>org.apache.carbondata</groupId>
-      <artifactId>carbondata-processing</artifactId>
-      <version>${project.version}</version>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.carbondata</groupId>
       <artifactId>carbondata-hadoop</artifactId>
       <version>${project.version}</version>
     </dependency>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark-common/src/main/java/org/apache/carbondata/spark/exception/ConcurrentOperationException.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/exception/ConcurrentOperationException.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/exception/ConcurrentOperationException.java
deleted file mode 100644
index 1f3c07d..0000000
--- a/integration/spark-common/src/main/java/org/apache/carbondata/spark/exception/ConcurrentOperationException.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.spark.exception;
-
-public class ConcurrentOperationException extends Exception {
-
-  /**
-   * The Error message.
-   */
-  private String msg = "";
-
-  /**
-   * Constructor
-   *
-   * @param msg The error message for this exception.
-   */
-  public ConcurrentOperationException(String msg) {
-    super(msg);
-    this.msg = msg;
-  }
-
-  /**
-   * getMessage
-   */
-  public String getMessage() {
-    return this.msg;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark-common/src/main/java/org/apache/carbondata/spark/exception/MalformedCarbonCommandException.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/exception/MalformedCarbonCommandException.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/exception/MalformedCarbonCommandException.java
deleted file mode 100644
index 9f441d3..0000000
--- a/integration/spark-common/src/main/java/org/apache/carbondata/spark/exception/MalformedCarbonCommandException.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.spark.exception;
-
-// After parsing carbon query successfully , if any validation fails then
-// use MalformedCarbonCommandException
-public class MalformedCarbonCommandException extends Exception {
-
-
-  /**
-   * default serial version ID.
-   */
-  private static final long serialVersionUID = 1L;
-
-  /**
-   * The Error message.
-   */
-  private String msg = "";
-
-  /**
-   * Constructor
-   *
-   * @param msg The error message for this exception.
-   */
-  public MalformedCarbonCommandException(String msg) {
-    super(msg);
-    this.msg = msg;
-  }
-
-  /**
-   * Constructor
-   *
-   * @param msg The error message for this exception.
-   */
-  public MalformedCarbonCommandException(String msg, Throwable t) {
-    super(msg, t);
-    this.msg = msg;
-  }
-
-  /**
-   * getLocalizedMessage
-   */
-  @Override
-  public String getLocalizedMessage() {
-    return super.getLocalizedMessage();
-  }
-
-  /**
-   * getMessage
-   */
-  public String getMessage() {
-    return this.msg;
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark-common/src/main/java/org/apache/carbondata/spark/exception/MalformedDataMapCommandException.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/exception/MalformedDataMapCommandException.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/exception/MalformedDataMapCommandException.java
deleted file mode 100644
index a05d8e6..0000000
--- a/integration/spark-common/src/main/java/org/apache/carbondata/spark/exception/MalformedDataMapCommandException.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.spark.exception;
-
-/**
- * Throw exception when using illegal argument
- */
-public class MalformedDataMapCommandException extends MalformedCarbonCommandException {
-  /**
-   * default serial version ID.
-   */
-  private static final long serialVersionUID = 1L;
-
-  public MalformedDataMapCommandException(String msg) {
-    super(msg);
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark-common/src/main/java/org/apache/carbondata/spark/exception/NoSuchDataMapException.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/exception/NoSuchDataMapException.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/exception/NoSuchDataMapException.java
deleted file mode 100644
index 959e70d..0000000
--- a/integration/spark-common/src/main/java/org/apache/carbondata/spark/exception/NoSuchDataMapException.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.spark.exception;
-
-/**
- * if the dataMap does not exist, carbon should throw NoSuchDataMapException
- */
-public class NoSuchDataMapException extends MalformedCarbonCommandException {
-
-  /**
-   * default serial version ID.
-   */
-  private static final long serialVersionUID = 1L;
-
-  public NoSuchDataMapException(String dataMapName, String tableName) {
-    super("Datamap with name " + dataMapName + " does not exist under table " + tableName);
-  }
-}

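The deletions above remove the spark-local exception hierarchy, in which MalformedDataMapCommandException and NoSuchDataMapException both extended MalformedCarbonCommandException. Assuming the relocated classes under org.apache.carbondata.common.exceptions.sql keep that shape (this patch does not show their new sources), a handler for the base type still covers the datamap subtypes:

    import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException

    // Hypothetical helper: one case catches the base command exception and,
    // assuming the hierarchy survived the move, both datamap subtypes too.
    def describeCommandFailure(run: () => Unit): Option[String] = {
      try {
        run()
        None
      } catch {
        case e: MalformedCarbonCommandException => Some(e.getMessage)
      }
    }
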
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
index b98bddf..b89d49d 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
@@ -26,6 +26,7 @@ import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.util.CarbonException
 import org.apache.spark.unsafe.types.UTF8String
 
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.impl.FileFactory
@@ -34,8 +35,6 @@ import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, PartitionMa
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.mutate.CarbonUpdateUtil
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager
-import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
-import org.apache.carbondata.spark.util.DataLoadingUtil
 
 object CarbonStore {
   private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
@@ -138,8 +137,7 @@ object CarbonStore {
         carbonCleanFilesLock =
           CarbonLockUtil
             .getLockObject(absoluteTableIdentifier, LockUsage.CLEAN_FILES_LOCK, errorMsg)
-        DataLoadingUtil.deleteLoadsAndUpdateMetadata(
-          isForceDeletion = true, carbonTable)
+        SegmentStatusManager.deleteLoadsAndUpdateMetadata(carbonTable, true)
         CarbonUpdateUtil.cleanUpDeltaFiles(carbonTable, true)
         currentTablePartitions match {
           case Some(partitions) =>

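CarbonStore.cleanFiles now calls SegmentStatusManager directly instead of going through the removed DataLoadingUtil wrapper; per the hunk above, the argument order is (carbonTable, isForceDeletion). A minimal sketch of the call as this patch uses it:

    import org.apache.carbondata.core.metadata.schema.table.CarbonTable
    import org.apache.carbondata.core.statusmanager.SegmentStatusManager

    // Force-delete expired/stale loads and update the table status metadata,
    // mirroring the replacement call in CarbonStore.cleanFiles above.
    def forceCleanSegments(carbonTable: CarbonTable): Unit = {
      SegmentStatusManager.deleteLoadsAndUpdateMetadata(carbonTable, true)
    }
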
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonColumnValidator.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonColumnValidator.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonColumnValidator.scala
index ad624ee..578138f 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonColumnValidator.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonColumnValidator.scala
@@ -16,12 +16,12 @@
  */
 package org.apache.carbondata.spark
 
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
-import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 
- /**
-  * Carbon column validator
-  */
+/**
+ * Carbon column validator
+ */
 class CarbonColumnValidator extends ColumnValidator {
   def validateColumns(allColumns: Seq[ColumnSchema]) {
     allColumns.foreach { columnSchema =>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/ValidateUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/ValidateUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/ValidateUtil.scala
deleted file mode 100644
index dfda92c..0000000
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/ValidateUtil.scala
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.spark.load
-
-import java.text.SimpleDateFormat
-
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-import org.apache.carbondata.processing.loading.sort.SortScopeOptions
-import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
-
-object ValidateUtil {
-
-  /**
-   * validates both timestamp and date for illegal values
-   *
-   * @param dateTimeLoadFormat
-   * @param dateTimeLoadOption
-   */
-  def validateDateTimeFormat(dateTimeLoadFormat: String, dateTimeLoadOption: String): Unit = {
-    // allowing empty value to be configured for dateformat option.
-    if (dateTimeLoadFormat != null && dateTimeLoadFormat.trim != "") {
-      try {
-        new SimpleDateFormat(dateTimeLoadFormat)
-      } catch {
-        case _: IllegalArgumentException =>
-          throw new MalformedCarbonCommandException(s"Error: Wrong option: $dateTimeLoadFormat is" +
-                                                    s" provided for option $dateTimeLoadOption")
-      }
-    }
-  }
-
-  def validateSortScope(carbonTable: CarbonTable, sortScope: String): Unit = {
-    if (sortScope != null) {
-      // Don't support use global sort on partitioned table.
-      if (carbonTable.getPartitionInfo(carbonTable.getTableName) != null &&
-          sortScope.equalsIgnoreCase(SortScopeOptions.SortScope.GLOBAL_SORT.toString)) {
-        throw new MalformedCarbonCommandException("Don't support use global sort on partitioned " +
-          "table.")
-      }
-    }
-  }
-
-  def validateGlobalSortPartitions(globalSortPartitions: String): Unit = {
-    if (globalSortPartitions != null) {
-      try {
-        val num = globalSortPartitions.toInt
-        if (num <= 0) {
-          throw new MalformedCarbonCommandException("'GLOBAL_SORT_PARTITIONS' should be greater " +
-            "than 0.")
-        }
-      } catch {
-        case e: NumberFormatException => throw new MalformedCarbonCommandException(e.getMessage)
-      }
-    }
-  }
-}

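ValidateUtil is deleted from spark-common along with its three checks; given this commit's direction, the load-option validation presumably now lives alongside the load-model builder in the processing module. For reference, the date-format check relies on SimpleDateFormat rejecting an illegal pattern at construction time; a sketch equivalent to the deleted validateDateTimeFormat:

    import java.text.SimpleDateFormat

    import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException

    // Same behavior as the deleted ValidateUtil.validateDateTimeFormat:
    // SimpleDateFormat throws IllegalArgumentException for a bad pattern,
    // which is surfaced as the user-facing command exception.
    def validateDateTimeFormat(format: String, optionName: String): Unit = {
      if (format != null && format.trim.nonEmpty) {
        try {
          new SimpleDateFormat(format)
        } catch {
          case _: IllegalArgumentException =>
            throw new MalformedCarbonCommandException(
              s"Error: Wrong option: $format is provided for option $optionName")
        }
      }
    }
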
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
index fa126fc..d2c059c 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
@@ -310,11 +310,11 @@ class CarbonMergerRDD[K, V](
       val splits = format.getSplits(job)
 
       // keep on assigning till last one is reached.
-      if (null != splits && splits.size > 0) splitsOfLastSegment =
-        splits.asScala
+      if (null != splits && splits.size > 0) {
+        splitsOfLastSegment = splits.asScala
           .map(_.asInstanceOf[CarbonInputSplit])
           .filter { split => FileFormat.COLUMNAR_V3.equals(split.getFileFormat) }.toList.asJava
-
+      }
       carbonInputSplits ++:= splits.asScala.map(_.asInstanceOf[CarbonInputSplit]).filter{ entry =>
         val blockInfo = new TableBlockInfo(entry.getPath.toString,
           entry.getStart, entry.getSegmentId,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
index 90a4223..8cb8205 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
@@ -36,6 +36,7 @@ import org.apache.spark.sql.types.{MetadataBuilder, StringType}
 import org.apache.spark.sql.util.CarbonException
 import org.apache.spark.util.FileUtils
 
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.impl.FileFactory
@@ -53,8 +54,7 @@ import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat
 import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel
-import org.apache.carbondata.processing.util.{CarbonDataProcessorUtil}
-import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
+import org.apache.carbondata.processing.util.CarbonDataProcessorUtil
 import org.apache.carbondata.spark.rdd.CarbonMergeFilesRDD
 
 object CommonUtil {
@@ -632,13 +632,6 @@ object CommonUtil {
     parsedPropertyValueString
   }
 
-
-  def readLoadMetadataDetails(model: CarbonLoadModel): Unit = {
-    val metadataPath = model.getCarbonDataLoadSchema.getCarbonTable.getMetadataPath
-    val details = SegmentStatusManager.readLoadMetadata(metadataPath)
-    model.setLoadMetadataDetails(new util.ArrayList[LoadMetadataDetails](details.toList.asJava))
-  }
-
   def configureCSVInputFormat(configuration: Configuration,
       carbonLoadModel: CarbonLoadModel): Unit = {
     CSVInputFormat.setCommentCharacter(configuration, carbonLoadModel.getCommentChar)
@@ -680,48 +673,6 @@ object CommonUtil {
     }
   }
 
-  def getCsvHeaderColumns(
-      carbonLoadModel: CarbonLoadModel,
-      hadoopConf: Configuration): Array[String] = {
-    val delimiter = if (StringUtils.isEmpty(carbonLoadModel.getCsvDelimiter)) {
-      CarbonCommonConstants.COMMA
-    } else {
-      CarbonUtil.delimiterConverter(carbonLoadModel.getCsvDelimiter)
-    }
-    var csvFile: String = null
-    var csvHeader: String = carbonLoadModel.getCsvHeader
-    val csvColumns = if (StringUtils.isBlank(csvHeader)) {
-      // read header from csv file
-      csvFile = carbonLoadModel.getFactFilePath.split(",")(0)
-      csvHeader = CarbonUtil.readHeader(csvFile, hadoopConf)
-      if (StringUtils.isBlank(csvHeader)) {
-        throw new CarbonDataLoadingException("First line of the csv is not valid.")
-      }
-      csvHeader.toLowerCase().split(delimiter).map(_.replaceAll("\"", "").trim)
-    } else {
-      csvHeader.toLowerCase.split(CarbonCommonConstants.COMMA).map(_.trim)
-    }
-
-    if (!CarbonDataProcessorUtil.isHeaderValid(carbonLoadModel.getTableName, csvColumns,
-        carbonLoadModel.getCarbonDataLoadSchema)) {
-      if (csvFile == null) {
-        LOGGER.error("CSV header in DDL is not proper."
-                     + " Column names in schema and CSV header are not the same.")
-        throw new CarbonDataLoadingException(
-          "CSV header in DDL is not proper. Column names in schema and CSV header are "
-          + "not the same.")
-      } else {
-        LOGGER.error(
-          "CSV header in input file is not proper. Column names in schema and csv header are not "
-          + "the same. Input file : " + CarbonUtil.removeAKSK(csvFile))
-        throw new CarbonDataLoadingException(
-          "CSV header in input file is not proper. Column names in schema and csv header are not "
-          + "the same. Input file : " + CarbonUtil.removeAKSK(csvFile))
-      }
-    }
-    csvColumns
-  }
-
   def validateMaxColumns(csvHeaders: Array[String], maxColumns: String): Int = {
     /*
     User configures both csvheadercolumns, maxcolumns,
@@ -862,8 +813,7 @@ object CommonUtil {
                 try {
                   val carbonTable = CarbonMetadata.getInstance
                     .getCarbonTable(identifier.getCarbonTableIdentifier.getTableUniqueName)
-                  DataLoadingUtil.deleteLoadsAndUpdateMetadata(
-                    isForceDeletion = true, carbonTable)
+                  SegmentStatusManager.deleteLoadsAndUpdateMetadata(carbonTable, true)
                 } catch {
                   case _: Exception =>
                     LOGGER.warn(s"Error while cleaning table " +