You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2018/02/27 03:28:58 UTC

[38/49] carbondata git commit: [CARBONDATA-2159] Remove carbon-spark dependency in store-sdk module

http://git-wip-us.apache.org/repos/asf/carbondata/blob/06a71586/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java
new file mode 100644
index 0000000..fbb93b6
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.loading.model;
+
+import java.io.IOException;
+import java.text.SimpleDateFormat;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.carbondata.common.Maps;
+import org.apache.carbondata.common.Strings;
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.common.constants.LoggerAction;
+import org.apache.carbondata.common.exceptions.sql.InvalidLoadOptionException;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.processing.loading.constants.DataLoadProcessorConstants;
+import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat;
+import org.apache.carbondata.processing.loading.sort.SortScopeOptions;
+import org.apache.carbondata.processing.util.TableOptionConstant;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Builder for {@link CarbonLoadModel}
+ */
+@InterfaceAudience.Developer
+public class CarbonLoadModelBuilder {
+
+  private CarbonTable table;
+
+  public CarbonLoadModelBuilder(CarbonTable table) {
+    this.table = table;
+  }
+
+  /**
+   * build CarbonLoadModel for data loading
+   * @param options Load options from user input
+   * @return a new CarbonLoadModel instance
+   */
+  public CarbonLoadModel build(
+      Map<String, String> options) throws InvalidLoadOptionException, IOException {
+    Map<String, String> optionsFinal = LoadOption.fillOptionWithDefaultValue(options);
+    optionsFinal.put("sort_scope", "no_sort");
+    if (!options.containsKey("fileheader")) {
+      List<CarbonColumn> csvHeader = table.getCreateOrderColumn(table.getTableName());
+      String[] columns = new String[csvHeader.size()];
+      for (int i = 0; i < columns.length; i++) {
+        columns[i] = csvHeader.get(i).getColName();
+      }
+      optionsFinal.put("fileheader", Strings.mkString(columns, ","));
+    }
+    CarbonLoadModel model = new CarbonLoadModel();
+
+    // we have provided 'fileheader', so it hadoopConf can be null
+    build(options, optionsFinal, model, null);
+
+    // set default values
+    model.setTimestampformat(CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT);
+    model.setDateFormat(CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT);
+    model.setUseOnePass(Boolean.parseBoolean(Maps.getOrDefault(options, "onepass", "false")));
+    model.setDictionaryServerHost(Maps.getOrDefault(options, "dicthost", null));
+    try {
+      model.setDictionaryServerPort(Integer.parseInt(Maps.getOrDefault(options, "dictport", "-1")));
+    } catch (NumberFormatException e) {
+      throw new InvalidLoadOptionException(e.getMessage());
+    }
+    return model;
+  }
+
+  /**
+   * build CarbonLoadModel for data loading
+   * @param options Load options from user input
+   * @param optionsFinal Load options that populated with default values for optional options
+   * @param carbonLoadModel The output load model
+   * @param hadoopConf hadoopConf is needed to read CSV header if there 'fileheader' is not set in
+   *                   user provided load options
+   */
+  public void build(
+      Map<String, String> options,
+      Map<String, String> optionsFinal,
+      CarbonLoadModel carbonLoadModel,
+      Configuration hadoopConf) throws InvalidLoadOptionException, IOException {
+    carbonLoadModel.setTableName(table.getTableName());
+    carbonLoadModel.setDatabaseName(table.getDatabaseName());
+    carbonLoadModel.setTablePath(table.getTablePath());
+    carbonLoadModel.setTableName(table.getTableName());
+    CarbonDataLoadSchema dataLoadSchema = new CarbonDataLoadSchema(table);
+    // Need to fill dimension relation
+    carbonLoadModel.setCarbonDataLoadSchema(dataLoadSchema);
+    String sort_scope = optionsFinal.get("sort_scope");
+    String single_pass = optionsFinal.get("single_pass");
+    String bad_records_logger_enable = optionsFinal.get("bad_records_logger_enable");
+    String bad_records_action = optionsFinal.get("bad_records_action");
+    String bad_record_path = optionsFinal.get("bad_record_path");
+    String global_sort_partitions = optionsFinal.get("global_sort_partitions");
+    String timestampformat = optionsFinal.get("timestampformat");
+    String dateFormat = optionsFinal.get("dateformat");
+    String delimeter = optionsFinal.get("delimiter");
+    String complex_delimeter_level1 = optionsFinal.get("complex_delimiter_level_1");
+    String complex_delimeter_level2 = optionsFinal.get("complex_delimiter_level_2");
+    String all_dictionary_path = optionsFinal.get("all_dictionary_path");
+    String column_dict = optionsFinal.get("columndict");
+    validateDateTimeFormat(timestampformat, "TimestampFormat");
+    validateDateTimeFormat(dateFormat, "DateFormat");
+    validateSortScope(sort_scope);
+
+    if (Boolean.parseBoolean(bad_records_logger_enable) ||
+        LoggerAction.REDIRECT.name().equalsIgnoreCase(bad_records_action)) {
+      bad_record_path = CarbonUtil.checkAndAppendHDFSUrl(bad_record_path);
+      if (!CarbonUtil.isValidBadStorePath(bad_record_path)) {
+        throw new InvalidLoadOptionException("Invalid bad records location.");
+      }
+    }
+    carbonLoadModel.setBadRecordsLocation(bad_record_path);
+
+    validateGlobalSortPartitions(global_sort_partitions);
+    carbonLoadModel.setEscapeChar(checkDefaultValue(optionsFinal.get("escapechar"), "\\"));
+    carbonLoadModel.setQuoteChar(checkDefaultValue(optionsFinal.get("quotechar"), "\""));
+    carbonLoadModel.setCommentChar(checkDefaultValue(optionsFinal.get("commentchar"), "#"));
+
+    // if there isn't file header in csv file and load sql doesn't provide FILEHEADER option,
+    // we should use table schema to generate file header.
+    String fileHeader = optionsFinal.get("fileheader");
+    String headerOption = options.get("header");
+    if (headerOption != null) {
+      if (!headerOption.equalsIgnoreCase("true") &&
+          !headerOption.equalsIgnoreCase("false")) {
+        throw new InvalidLoadOptionException(
+            "'header' option should be either 'true' or 'false'.");
+      }
+      // whether the csv file has file header, the default value is true
+      if (Boolean.valueOf(headerOption)) {
+        if (!StringUtils.isEmpty(fileHeader)) {
+          throw new InvalidLoadOptionException(
+              "When 'header' option is true, 'fileheader' option is not required.");
+        }
+      } else {
+        if (StringUtils.isEmpty(fileHeader)) {
+          List<CarbonColumn> columns = table.getCreateOrderColumn(table.getTableName());
+          String[] columnNames = new String[columns.size()];
+          for (int i = 0; i < columnNames.length; i++) {
+            columnNames[i] = columns.get(i).getColName();
+          }
+          fileHeader = Strings.mkString(columnNames, ",");
+        }
+      }
+    }
+
+    carbonLoadModel.setTimestampformat(timestampformat);
+    carbonLoadModel.setDateFormat(dateFormat);
+    carbonLoadModel.setDefaultTimestampFormat(
+        CarbonProperties.getInstance().getProperty(
+            CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
+            CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT));
+
+    carbonLoadModel.setDefaultDateFormat(
+        CarbonProperties.getInstance().getProperty(
+            CarbonCommonConstants.CARBON_DATE_FORMAT,
+            CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT));
+
+    carbonLoadModel.setSerializationNullFormat(
+        TableOptionConstant.SERIALIZATION_NULL_FORMAT.getName() + "," +
+            optionsFinal.get("serialization_null_format"));
+
+    carbonLoadModel.setBadRecordsLoggerEnable(
+        TableOptionConstant.BAD_RECORDS_LOGGER_ENABLE.getName() + "," + bad_records_logger_enable);
+
+    carbonLoadModel.setBadRecordsAction(
+        TableOptionConstant.BAD_RECORDS_ACTION.getName() + "," + bad_records_action.toUpperCase());
+
+    carbonLoadModel.setIsEmptyDataBadRecord(
+        DataLoadProcessorConstants.IS_EMPTY_DATA_BAD_RECORD + "," +
+            optionsFinal.get("is_empty_data_bad_record"));
+
+    carbonLoadModel.setSkipEmptyLine(optionsFinal.get("skip_empty_line"));
+
+    carbonLoadModel.setSortScope(sort_scope);
+    carbonLoadModel.setBatchSortSizeInMb(optionsFinal.get("batch_sort_size_inmb"));
+    carbonLoadModel.setGlobalSortPartitions(global_sort_partitions);
+    carbonLoadModel.setUseOnePass(Boolean.parseBoolean(single_pass));
+
+    if (delimeter.equalsIgnoreCase(complex_delimeter_level1) ||
+        complex_delimeter_level1.equalsIgnoreCase(complex_delimeter_level2) ||
+        delimeter.equalsIgnoreCase(complex_delimeter_level2)) {
+      throw new InvalidLoadOptionException("Field Delimiter and Complex types delimiter are same");
+    } else {
+      carbonLoadModel.setComplexDelimiterLevel1(
+          CarbonUtil.delimiterConverter(complex_delimeter_level1));
+      carbonLoadModel.setComplexDelimiterLevel2(
+          CarbonUtil.delimiterConverter(complex_delimeter_level2));
+    }
+    // set local dictionary path, and dictionary file extension
+    carbonLoadModel.setAllDictPath(all_dictionary_path);
+    carbonLoadModel.setCsvDelimiter(CarbonUtil.unescapeChar(delimeter));
+    carbonLoadModel.setCsvHeader(fileHeader);
+    carbonLoadModel.setColDictFilePath(column_dict);
+    carbonLoadModel.setCsvHeaderColumns(
+        LoadOption.getCsvHeaderColumns(carbonLoadModel, hadoopConf));
+
+    int validatedMaxColumns = validateMaxColumns(
+        carbonLoadModel.getCsvHeaderColumns(),
+        optionsFinal.get("maxcolumns"));
+
+    carbonLoadModel.setMaxColumns(String.valueOf(validatedMaxColumns));
+    carbonLoadModel.readAndSetLoadMetadataDetails();
+  }
+
+  private int validateMaxColumns(String[] csvHeaders, String maxColumns)
+      throws InvalidLoadOptionException {
+    /*
+    User configures both csvheadercolumns, maxcolumns,
+      if csvheadercolumns >= maxcolumns, give error
+      if maxcolumns > threashold, give error
+    User configures csvheadercolumns
+      if csvheadercolumns >= maxcolumns(default) then maxcolumns = csvheadercolumns+1
+      if csvheadercolumns >= threashold, give error
+    User configures nothing
+      if csvheadercolumns >= maxcolumns(default) then maxcolumns = csvheadercolumns+1
+      if csvheadercolumns >= threashold, give error
+     */
+    int columnCountInSchema = csvHeaders.length;
+    int maxNumberOfColumnsForParsing = 0;
+    Integer maxColumnsInt = getMaxColumnValue(maxColumns);
+    if (maxColumnsInt != null) {
+      if (columnCountInSchema >= maxColumnsInt) {
+        throw new InvalidLoadOptionException(
+            "csv headers should be less than the max columns " + maxColumnsInt);
+      } else if (maxColumnsInt > CSVInputFormat.THRESHOLD_MAX_NUMBER_OF_COLUMNS_FOR_PARSING) {
+        throw new InvalidLoadOptionException(
+            "max columns cannot be greater than the threshold value: " +
+                CSVInputFormat.THRESHOLD_MAX_NUMBER_OF_COLUMNS_FOR_PARSING);
+      } else {
+        maxNumberOfColumnsForParsing = maxColumnsInt;
+      }
+    } else if (columnCountInSchema >= CSVInputFormat.THRESHOLD_MAX_NUMBER_OF_COLUMNS_FOR_PARSING) {
+      throw new InvalidLoadOptionException(
+          "csv header columns should be less than max threashold: " +
+              CSVInputFormat.THRESHOLD_MAX_NUMBER_OF_COLUMNS_FOR_PARSING);
+    } else if (columnCountInSchema >= CSVInputFormat.DEFAULT_MAX_NUMBER_OF_COLUMNS_FOR_PARSING) {
+      maxNumberOfColumnsForParsing = columnCountInSchema + 1;
+    } else {
+      maxNumberOfColumnsForParsing = CSVInputFormat.DEFAULT_MAX_NUMBER_OF_COLUMNS_FOR_PARSING;
+    }
+    return maxNumberOfColumnsForParsing;
+  }
+
+  private Integer getMaxColumnValue(String maxColumn) {
+    return (maxColumn == null) ? null : Integer.parseInt(maxColumn);
+  }
+
+  /**
+   * validates both timestamp and date for illegal values
+   */
+  private void validateDateTimeFormat(String dateTimeLoadFormat, String dateTimeLoadOption)
+      throws InvalidLoadOptionException {
+    // allowing empty value to be configured for dateformat option.
+    if (dateTimeLoadFormat != null && !dateTimeLoadFormat.trim().equalsIgnoreCase("")) {
+      try {
+        new SimpleDateFormat(dateTimeLoadFormat);
+      } catch (IllegalArgumentException e) {
+        throw new InvalidLoadOptionException(
+            "Error: Wrong option: " + dateTimeLoadFormat + " is provided for option "
+                + dateTimeLoadOption);
+      }
+    }
+  }
+
+  private void validateSortScope(String sortScope) throws InvalidLoadOptionException {
+    if (sortScope != null) {
+      // Don't support use global sort on partitioned table.
+      if (table.getPartitionInfo(table.getTableName()) != null &&
+          sortScope.equalsIgnoreCase(SortScopeOptions.SortScope.GLOBAL_SORT.toString())) {
+        throw new InvalidLoadOptionException("Don't support use global sort on partitioned table.");
+      }
+    }
+  }
+
+  private void validateGlobalSortPartitions(String globalSortPartitions)
+      throws InvalidLoadOptionException {
+    if (globalSortPartitions != null) {
+      try {
+        int num = Integer.parseInt(globalSortPartitions);
+        if (num <= 0) {
+          throw new InvalidLoadOptionException("'GLOBAL_SORT_PARTITIONS' should be greater than 0");
+        }
+      } catch (NumberFormatException e) {
+        throw new InvalidLoadOptionException(e.getMessage());
+      }
+    }
+  }
+
+  /**
+   * check whether using default value or not
+   */
+  private String checkDefaultValue(String value, String defaultValue) {
+    if (StringUtils.isEmpty(value)) {
+      return defaultValue;
+    } else {
+      return value;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/06a71586/processing/src/main/java/org/apache/carbondata/processing/loading/model/LoadOption.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/model/LoadOption.java b/processing/src/main/java/org/apache/carbondata/processing/loading/model/LoadOption.java
new file mode 100644
index 0000000..8ec93a9
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/model/LoadOption.java
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.loading.model;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.carbondata.common.Maps;
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.common.exceptions.sql.InvalidLoadOptionException;
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.constants.CarbonLoadOptionConstants;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException;
+import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
+import org.apache.carbondata.processing.util.CarbonLoaderUtil;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Provide utilities to populate loading options
+ */
+@InterfaceAudience.Developer
+public class LoadOption {
+
+  private static LogService LOG = LogServiceFactory.getLogService(LoadOption.class.getName());
+
+  /**
+   * Based on the input options, fill and return data loading options with default value
+   */
+  public static Map<String, String> fillOptionWithDefaultValue(
+      Map<String, String> options) throws InvalidLoadOptionException {
+    Map<String, String> optionsFinal = new HashMap<>();
+    optionsFinal.put("delimiter", Maps.getOrDefault(options, "delimiter", ","));
+    optionsFinal.put("quotechar", Maps.getOrDefault(options, "quotechar", "\""));
+    optionsFinal.put("fileheader", Maps.getOrDefault(options, "fileheader", ""));
+    optionsFinal.put("commentchar", Maps.getOrDefault(options, "commentchar", "#"));
+    optionsFinal.put("columndict", Maps.getOrDefault(options, "columndict", null));
+
+    optionsFinal.put(
+        "escapechar",
+        CarbonLoaderUtil.getEscapeChar(Maps.getOrDefault(options,"escapechar", "\\")));
+
+    optionsFinal.put(
+        "serialization_null_format",
+        Maps.getOrDefault(options, "serialization_null_format", "\\N"));
+
+    optionsFinal.put(
+        "bad_records_logger_enable",
+        Maps.getOrDefault(
+            options,
+            "bad_records_logger_enable",
+            CarbonProperties.getInstance().getProperty(
+                CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE,
+                CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE_DEFAULT)));
+
+    String badRecordActionValue = CarbonProperties.getInstance().getProperty(
+        CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION,
+        CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT);
+
+    optionsFinal.put(
+        "bad_records_action",
+        Maps.getOrDefault(
+            options,
+            "bad_records_action",
+            CarbonProperties.getInstance().getProperty(
+                CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_ACTION,
+                badRecordActionValue)));
+
+    optionsFinal.put(
+        "is_empty_data_bad_record",
+        Maps.getOrDefault(
+            options,
+            "is_empty_data_bad_record",
+            CarbonProperties.getInstance().getProperty(
+                CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD,
+                CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD_DEFAULT)));
+
+    optionsFinal.put(
+        "skip_empty_line",
+        Maps.getOrDefault(
+            options,
+            "skip_empty_line",
+            CarbonProperties.getInstance().getProperty(
+                CarbonLoadOptionConstants.CARBON_OPTIONS_SKIP_EMPTY_LINE)));
+
+    optionsFinal.put(
+        "all_dictionary_path",
+        Maps.getOrDefault(options, "all_dictionary_path", ""));
+
+    optionsFinal.put(
+        "complex_delimiter_level_1",
+        Maps.getOrDefault(options,"complex_delimiter_level_1", "\\$"));
+
+    optionsFinal.put(
+        "complex_delimiter_level_2",
+        Maps.getOrDefault(options, "complex_delimiter_level_2", "\\:"));
+
+    optionsFinal.put(
+        "dateformat",
+        Maps.getOrDefault(
+            options,
+            "dateformat",
+            CarbonProperties.getInstance().getProperty(
+                CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT,
+                CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT_DEFAULT)));
+
+    optionsFinal.put(
+        "timestampformat",
+        Maps.getOrDefault(
+            options,
+            "timestampformat",
+            CarbonProperties.getInstance().getProperty(
+                CarbonLoadOptionConstants.CARBON_OPTIONS_TIMESTAMPFORMAT,
+                CarbonLoadOptionConstants.CARBON_OPTIONS_TIMESTAMPFORMAT_DEFAULT)));
+
+    optionsFinal.put(
+        "global_sort_partitions",
+        Maps.getOrDefault(
+            options,
+            "global_sort_partitions",
+            CarbonProperties.getInstance().getProperty(
+                CarbonLoadOptionConstants.CARBON_OPTIONS_GLOBAL_SORT_PARTITIONS,
+                null)));
+
+    optionsFinal.put("maxcolumns", Maps.getOrDefault(options, "maxcolumns", null));
+
+    optionsFinal.put(
+        "batch_sort_size_inmb",
+        Maps.getOrDefault(
+            options,
+            "batch_sort_size_inmb",
+            CarbonProperties.getInstance().getProperty(
+                CarbonLoadOptionConstants.CARBON_OPTIONS_BATCH_SORT_SIZE_INMB,
+                CarbonProperties.getInstance().getProperty(
+                    CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB,
+                    CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB_DEFAULT))));
+
+    optionsFinal.put(
+        "bad_record_path",
+        Maps.getOrDefault(
+            options,
+            "bad_record_path",
+            CarbonProperties.getInstance().getProperty(
+                CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH,
+                CarbonProperties.getInstance().getProperty(
+                    CarbonCommonConstants.CARBON_BADRECORDS_LOC,
+                    CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL))));
+
+    String useOnePass = Maps.getOrDefault(
+        options,
+        "single_pass",
+        CarbonProperties.getInstance().getProperty(
+            CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS,
+            CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS_DEFAULT)).trim().toLowerCase();
+
+    boolean singlePass;
+
+    if (useOnePass.equalsIgnoreCase("true")) {
+      singlePass = true;
+    } else {
+      // when single_pass = false  and if either alldictionarypath
+      // or columnDict is configured the do not allow load
+      if (StringUtils.isNotEmpty(optionsFinal.get("all_dictionary_path")) ||
+          StringUtils.isNotEmpty(optionsFinal.get("columndict"))) {
+        throw new InvalidLoadOptionException(
+            "Can not use all_dictionary_path or columndict without single_pass.");
+      } else {
+        singlePass = false;
+      }
+    }
+
+    optionsFinal.put("single_pass", String.valueOf(singlePass));
+    return optionsFinal;
+  }
+
+  /**
+   * Return CSV header field names
+   */
+  public static String[] getCsvHeaderColumns(
+      CarbonLoadModel carbonLoadModel,
+      Configuration hadoopConf) throws IOException {
+    String delimiter;
+    if (StringUtils.isEmpty(carbonLoadModel.getCsvDelimiter())) {
+      delimiter = CarbonCommonConstants.COMMA;
+    } else {
+      delimiter = CarbonUtil.delimiterConverter(carbonLoadModel.getCsvDelimiter());
+    }
+    String csvFile = null;
+    String csvHeader = carbonLoadModel.getCsvHeader();
+    String[] csvColumns;
+    if (StringUtils.isBlank(csvHeader)) {
+      // read header from csv file
+      csvFile = carbonLoadModel.getFactFilePath().split(",")[0];
+      csvHeader = CarbonUtil.readHeader(csvFile, hadoopConf);
+      if (StringUtils.isBlank(csvHeader)) {
+        throw new CarbonDataLoadingException("First line of the csv is not valid.");
+      }
+      String[] headers = csvHeader.toLowerCase().split(delimiter);
+      csvColumns = new String[headers.length];
+      for (int i = 0; i < csvColumns.length; i++) {
+        csvColumns[i] = headers[i].replaceAll("\"", "").trim();
+      }
+    } else {
+      String[] headers = csvHeader.toLowerCase().split(CarbonCommonConstants.COMMA);
+      csvColumns = new String[headers.length];
+      for (int i = 0; i < csvColumns.length; i++) {
+        csvColumns[i] = headers[i].trim();
+      }
+    }
+
+    if (!CarbonDataProcessorUtil.isHeaderValid(carbonLoadModel.getTableName(), csvColumns,
+        carbonLoadModel.getCarbonDataLoadSchema())) {
+      if (csvFile == null) {
+        LOG.error("CSV header in DDL is not proper."
+            + " Column names in schema and CSV header are not the same.");
+        throw new CarbonDataLoadingException(
+            "CSV header in DDL is not proper. Column names in schema and CSV header are "
+                + "not the same.");
+      } else {
+        LOG.error(
+            "CSV header in input file is not proper. Column names in schema and csv header are not "
+                + "the same. Input file : " + CarbonUtil.removeAKSK(csvFile));
+        throw new CarbonDataLoadingException(
+            "CSV header in input file is not proper. Column names in schema and csv header are not "
+                + "the same. Input file : " + CarbonUtil.removeAKSK(csvFile));
+      }
+    }
+    return csvColumns;
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/06a71586/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
index e9bd3b8..6876355 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
@@ -16,14 +16,9 @@
  */
 package org.apache.carbondata.processing.util;
 
-import java.io.BufferedWriter;
-import java.io.DataOutputStream;
 import java.io.IOException;
-import java.io.OutputStreamWriter;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
-import java.nio.charset.Charset;
-import java.text.SimpleDateFormat;
 import java.util.*;
 
 import org.apache.carbondata.common.logging.LogService;
@@ -40,9 +35,6 @@ import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.datastore.impl.FileFactory.FileType;
-import org.apache.carbondata.core.fileoperations.AtomicFileOperations;
-import org.apache.carbondata.core.fileoperations.AtomicFileOperationsImpl;
-import org.apache.carbondata.core.fileoperations.FileWriteOperation;
 import org.apache.carbondata.core.locks.CarbonLockUtil;
 import org.apache.carbondata.core.locks.ICarbonLock;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
@@ -58,9 +50,8 @@ import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
 import org.apache.carbondata.processing.merger.NodeBlockRelation;
 import org.apache.carbondata.processing.merger.NodeMultiBlockRelation;
-import static org.apache.carbondata.core.enums.EscapeSequences.*;
 
-import com.google.gson.Gson;
+import static org.apache.carbondata.core.enums.EscapeSequences.*;
 
 
 public final class CarbonLoaderUtil {
@@ -344,48 +335,6 @@ public final class CarbonLoaderUtil {
     loadMetadataDetails.setLoadStartTime(loadStartTime);
   }
 
-  public static void writeLoadMetadata(AbsoluteTableIdentifier identifier,
-      List<LoadMetadataDetails> listOfLoadFolderDetails) throws IOException {
-    String dataLoadLocation = CarbonTablePath.getTableStatusFilePath(identifier.getTablePath());
-
-    DataOutputStream dataOutputStream;
-    Gson gsonObjectToWrite = new Gson();
-    BufferedWriter brWriter = null;
-
-    AtomicFileOperations writeOperation =
-        new AtomicFileOperationsImpl(dataLoadLocation, FileFactory.getFileType(dataLoadLocation));
-
-    try {
-      dataOutputStream = writeOperation.openForWrite(FileWriteOperation.OVERWRITE);
-      brWriter = new BufferedWriter(new OutputStreamWriter(dataOutputStream,
-              Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET)));
-
-      String metadataInstance = gsonObjectToWrite.toJson(listOfLoadFolderDetails.toArray());
-      brWriter.write(metadataInstance);
-    } finally {
-      try {
-        if (null != brWriter) {
-          brWriter.flush();
-        }
-      } catch (Exception e) {
-        LOGGER.error("error in  flushing ");
-
-      }
-      CarbonUtil.closeStreams(brWriter);
-      writeOperation.close();
-    }
-
-  }
-
-  public static String readCurrentTime() {
-    SimpleDateFormat sdf = new SimpleDateFormat(CarbonCommonConstants.CARBON_TIMESTAMP);
-    String date = null;
-
-    date = sdf.format(new Date());
-
-    return date;
-  }
-
   public static boolean isValidEscapeSequence(String escapeChar) {
     return escapeChar.equalsIgnoreCase(NEW_LINE.getName()) ||
         escapeChar.equalsIgnoreCase(CARRIAGE_RETURN.getName()) ||
@@ -514,17 +463,6 @@ public final class CarbonLoaderUtil {
   }
 
   /**
-   * This method will divide the blocks among the nodes as per the data locality
-   *
-   * @param blockInfos
-   * @return
-   */
-  public static Map<String, List<Distributable>> nodeBlockMapping(List<Distributable> blockInfos) {
-    // -1 if number of nodes has to be decided based on block location information
-    return nodeBlockMapping(blockInfos, -1);
-  }
-
-  /**
    * the method returns the number of required executors
    *
    * @param blockInfos
@@ -899,25 +837,6 @@ public final class CarbonLoaderUtil {
     CarbonUtil.checkAndCreateFolder(segmentFolder);
   }
 
-  /**
-   * This will update the old table status details before clean files to the latest table status.
-   * @param oldList
-   * @param newList
-   * @return
-   */
-  public static List<LoadMetadataDetails> updateLoadMetadataFromOldToNew(
-      LoadMetadataDetails[] oldList, LoadMetadataDetails[] newList) {
-
-    List<LoadMetadataDetails> newListMetadata =
-        new ArrayList<LoadMetadataDetails>(Arrays.asList(newList));
-    for (LoadMetadataDetails oldSegment : oldList) {
-      if ("false".equalsIgnoreCase(oldSegment.getVisibility())) {
-        newListMetadata.get(newListMetadata.indexOf(oldSegment)).setVisibility("false");
-      }
-    }
-    return newListMetadata;
-  }
-
   /*
    * This method will add data size and index size into tablestatus for each segment. And also
    * returns the size of the segment.

http://git-wip-us.apache.org/repos/asf/carbondata/blob/06a71586/processing/src/main/java/org/apache/carbondata/processing/util/DeleteLoadFolders.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/DeleteLoadFolders.java b/processing/src/main/java/org/apache/carbondata/processing/util/DeleteLoadFolders.java
deleted file mode 100644
index c00cc86..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/util/DeleteLoadFolders.java
+++ /dev/null
@@ -1,210 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.util;
-
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
-import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
-import org.apache.carbondata.core.datastore.impl.FileFactory;
-import org.apache.carbondata.core.indexstore.PartitionSpec;
-import org.apache.carbondata.core.locks.CarbonLockFactory;
-import org.apache.carbondata.core.locks.ICarbonLock;
-import org.apache.carbondata.core.locks.LockUsage;
-import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
-import org.apache.carbondata.core.metadata.SegmentFileStore;
-import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
-import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
-import org.apache.carbondata.core.statusmanager.SegmentStatus;
-import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
-import org.apache.carbondata.core.util.path.CarbonTablePath;
-
-public final class DeleteLoadFolders {
-
-  private static final LogService LOGGER =
-      LogServiceFactory.getLogService(DeleteLoadFolders.class.getName());
-
-  private DeleteLoadFolders() {
-
-  }
-
-  /**
-   * returns segment path
-   *
-   * @param identifier
-   * @param oneLoad
-   * @return
-   */
-  private static String getSegmentPath(AbsoluteTableIdentifier identifier,
-      LoadMetadataDetails oneLoad) {
-    String segmentId = oneLoad.getLoadName();
-    return CarbonTablePath.getSegmentPath(identifier.getTablePath(), segmentId);
-  }
-
-  public static void physicalFactAndMeasureMetadataDeletion(
-      AbsoluteTableIdentifier absoluteTableIdentifier, String metadataPath, boolean isForceDelete,
-      List<PartitionSpec> specs) {
-    LoadMetadataDetails[] currentDetails = SegmentStatusManager.readLoadMetadata(metadataPath);
-    for (LoadMetadataDetails oneLoad : currentDetails) {
-      if (checkIfLoadCanBeDeletedPhysically(oneLoad, isForceDelete)) {
-        try {
-          if (oneLoad.getSegmentFile() != null) {
-            SegmentFileStore
-                .deleteSegment(absoluteTableIdentifier.getTablePath(), oneLoad.getSegmentFile(),
-                    specs);
-          } else {
-            String path = getSegmentPath(absoluteTableIdentifier, oneLoad);
-            boolean status = false;
-            if (FileFactory.isFileExist(path, FileFactory.getFileType(path))) {
-              CarbonFile file = FileFactory.getCarbonFile(path, FileFactory.getFileType(path));
-              CarbonFile[] filesToBeDeleted = file.listFiles(new CarbonFileFilter() {
-
-                @Override public boolean accept(CarbonFile file) {
-                  return (CarbonTablePath.isCarbonDataFile(file.getName()) ||
-                      CarbonTablePath.isCarbonIndexFile(file.getName()));
-                }
-              });
-
-              //if there are no fact and msr metadata files present then no need to keep
-              //entry in metadata.
-              if (filesToBeDeleted.length == 0) {
-                status = true;
-              } else {
-
-                for (CarbonFile eachFile : filesToBeDeleted) {
-                  if (!eachFile.delete()) {
-                    LOGGER.warn("Unable to delete the file as per delete command " + eachFile
-                        .getAbsolutePath());
-                    status = false;
-                  } else {
-                    status = true;
-                  }
-                }
-              }
-              // need to delete the complete folder.
-              if (status) {
-                if (!file.delete()) {
-                  LOGGER.warn("Unable to delete the folder as per delete command " + file
-                      .getAbsolutePath());
-                }
-              }
-
-            } else {
-              LOGGER.warn("Files are not found in segment " + path
-                  + " it seems, files are already being deleted");
-            }
-
-          }
-        } catch (IOException e) {
-          LOGGER.warn("Unable to delete the file as per delete command " + oneLoad.getLoadName());
-        }
-      }
-    }
-  }
-
-  private static boolean checkIfLoadCanBeDeleted(LoadMetadataDetails oneLoad,
-      boolean isForceDelete) {
-    if ((SegmentStatus.MARKED_FOR_DELETE == oneLoad.getSegmentStatus() ||
-        SegmentStatus.COMPACTED == oneLoad.getSegmentStatus() ||
-        SegmentStatus.INSERT_IN_PROGRESS == oneLoad.getSegmentStatus() ||
-        SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS == oneLoad.getSegmentStatus())
-        && oneLoad.getVisibility().equalsIgnoreCase("true")) {
-      if (isForceDelete) {
-        return true;
-      }
-      long deletionTime = oneLoad.getModificationOrdeletionTimesStamp();
-
-      return CarbonUpdateUtil.isMaxQueryTimeoutExceeded(deletionTime);
-
-    }
-
-    return false;
-  }
-
-  private static boolean checkIfLoadCanBeDeletedPhysically(LoadMetadataDetails oneLoad,
-      boolean isForceDelete) {
-    if ((SegmentStatus.MARKED_FOR_DELETE == oneLoad.getSegmentStatus() ||
-        SegmentStatus.COMPACTED == oneLoad.getSegmentStatus())) {
-      if (isForceDelete) {
-        return true;
-      }
-      long deletionTime = oneLoad.getModificationOrdeletionTimesStamp();
-
-      return CarbonUpdateUtil.isMaxQueryTimeoutExceeded(deletionTime);
-
-    }
-
-    return false;
-  }
-
-  private static LoadMetadataDetails getCurrentLoadStatusOfSegment(String segmentId,
-      String metadataPath) {
-    LoadMetadataDetails[] currentDetails = SegmentStatusManager.readLoadMetadata(metadataPath);
-    for (LoadMetadataDetails oneLoad : currentDetails) {
-      if (oneLoad.getLoadName().equalsIgnoreCase(segmentId)) {
-        return oneLoad;
-      }
-    }
-    return null;
-  }
-
-  public static boolean deleteLoadFoldersFromFileSystem(
-      AbsoluteTableIdentifier absoluteTableIdentifier, boolean isForceDelete,
-      LoadMetadataDetails[] details, String metadataPath) {
-    boolean isDeleted = false;
-    if (details != null && details.length != 0) {
-      for (LoadMetadataDetails oneLoad : details) {
-        if (checkIfLoadCanBeDeleted(oneLoad, isForceDelete)) {
-          ICarbonLock segmentLock = CarbonLockFactory.getCarbonLockObj(absoluteTableIdentifier,
-              CarbonTablePath.addSegmentPrefix(oneLoad.getLoadName()) + LockUsage.LOCK);
-          try {
-            if (oneLoad.getSegmentStatus() == SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS
-                || oneLoad.getSegmentStatus() == SegmentStatus.INSERT_IN_PROGRESS) {
-              if (segmentLock.lockWithRetries(1, 5)) {
-                LOGGER.info("Info: Acquired segment lock on segment:" + oneLoad.getLoadName());
-                LoadMetadataDetails currentDetails =
-                    getCurrentLoadStatusOfSegment(oneLoad.getLoadName(), metadataPath);
-                if (currentDetails != null && checkIfLoadCanBeDeleted(currentDetails,
-                    isForceDelete)) {
-                  oneLoad.setVisibility("false");
-                  isDeleted = true;
-                  LOGGER.info("Info: Deleted the load " + oneLoad.getLoadName());
-                }
-              } else {
-                LOGGER.info("Info: Load in progress for segment" + oneLoad.getLoadName());
-                return isDeleted;
-              }
-            } else {
-              oneLoad.setVisibility("false");
-              isDeleted = true;
-              LOGGER.info("Info: Deleted the load " + oneLoad.getLoadName());
-            }
-          } finally {
-            segmentLock.unlock();
-            LOGGER.info("Info: Segment lock on segment:" + oneLoad.getLoadName() + " is released");
-          }
-        }
-      }
-    }
-    return isDeleted;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/06a71586/store/sdk/pom.xml
----------------------------------------------------------------------
diff --git a/store/sdk/pom.xml b/store/sdk/pom.xml
index 6663683..51d2cf9 100644
--- a/store/sdk/pom.xml
+++ b/store/sdk/pom.xml
@@ -21,7 +21,7 @@
   <dependencies>
     <dependency>
       <groupId>org.apache.carbondata</groupId>
-      <artifactId>carbondata-spark-common</artifactId>
+      <artifactId>carbondata-hadoop</artifactId>
       <version>${project.version}</version>
     </dependency>
     <dependency>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/06a71586/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
index 51ca09c..e06200a 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
@@ -28,6 +28,7 @@ import java.util.Objects;
 import org.apache.carbondata.common.Strings;
 import org.apache.carbondata.common.annotations.InterfaceAudience;
 import org.apache.carbondata.common.annotations.InterfaceStability;
+import org.apache.carbondata.common.exceptions.sql.InvalidLoadOptionException;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.metadata.CarbonMetadata;
 import org.apache.carbondata.core.metadata.converter.SchemaConverter;
@@ -40,7 +41,7 @@ import org.apache.carbondata.core.metadata.schema.table.TableSchemaBuilder;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.core.writer.ThriftWriter;
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
-import org.apache.carbondata.spark.util.DataLoadingUtil;
+import org.apache.carbondata.processing.loading.model.CarbonLoadModelBuilder;
 
 /**
  * Biulder for {@link CarbonWriter}
@@ -94,9 +95,9 @@ public class CarbonWriterBuilder {
   }
 
   /**
-   * Build a {@link CSVCarbonWriter}, which accepts row in CSV format
+   * Build a {@link CarbonWriter}, which accepts row in CSV format
    */
-  public CarbonWriter buildWriterForCSVInput() throws IOException {
+  public CarbonWriter buildWriterForCSVInput() throws IOException, InvalidLoadOptionException {
     Objects.requireNonNull(schema, "schema should not be null");
     Objects.requireNonNull(path, "path should not be null");
 
@@ -113,7 +114,7 @@ public class CarbonWriterBuilder {
   }
 
   /**
-   * Build a {@link AvroCarbonWriter}, which accepts Avro object
+   * Build a {@link CarbonWriter}, which accepts Avro object
    * @return
    * @throws IOException
    */
@@ -184,11 +185,13 @@ public class CarbonWriterBuilder {
   /**
    * Build a {@link CarbonLoadModel}
    */
-  private CarbonLoadModel buildLoadModel(CarbonTable table) {
+  private CarbonLoadModel buildLoadModel(CarbonTable table)
+      throws InvalidLoadOptionException, IOException {
     Map<String, String> options = new HashMap<>();
     if (sortColumns != null) {
       options.put("sort_columns", Strings.mkString(sortColumns, ","));
     }
-    return DataLoadingUtil.buildCarbonLoadModelJava(table, options);
+    CarbonLoadModelBuilder builder = new CarbonLoadModelBuilder(table);
+    return builder.build(options);
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/06a71586/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterSuite.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterSuite.java b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterSuite.java
index 531ec7c..aca2b2d 100644
--- a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterSuite.java
+++ b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterSuite.java
@@ -77,7 +77,7 @@ public class CSVCarbonWriterSuite {
         writer.write(new String[]{"robot" + i, String.valueOf(i), String.valueOf((double) i / 2)});
       }
       writer.close();
-    } catch (IOException e) {
+    } catch (Exception e) {
       e.printStackTrace();
       Assert.fail(e.getMessage());
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/06a71586/streaming/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala b/streaming/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala
index 6316d84..bc7b042 100644
--- a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala
+++ b/streaming/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala
@@ -34,10 +34,9 @@ import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.events.{OperationContext, OperationListenerBus}
 import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadTablePostExecutionEvent, LoadTablePreExecutionEvent}
-import org.apache.carbondata.processing.loading.model.CarbonLoadModel
+import org.apache.carbondata.processing.loading.model.{CarbonLoadModel, CarbonLoadModelBuilder, LoadOption}
 import org.apache.carbondata.spark.dictionary.provider.SecureDictionaryServiceProvider
 import org.apache.carbondata.spark.dictionary.server.SecureDictionaryServer
-import org.apache.carbondata.spark.util.DataLoadingUtil
 import org.apache.carbondata.streaming.segment.StreamSegment
 
 /**
@@ -209,17 +208,15 @@ object StreamSinkFactory {
       segmentId: String): CarbonLoadModel = {
     val carbonProperty: CarbonProperties = CarbonProperties.getInstance()
     carbonProperty.addProperty("zookeeper.enable.lock", "false")
-    val optionsFinal = DataLoadingUtil.getDataLoadingOptions(carbonProperty, parameters)
+    val optionsFinal = LoadOption.fillOptionWithDefaultValue(parameters.asJava)
     optionsFinal.put("sort_scope", "no_sort")
     if (parameters.get("fileheader").isEmpty) {
       optionsFinal.put("fileheader", carbonTable.getCreateOrderColumn(carbonTable.getTableName)
         .asScala.map(_.getColName).mkString(","))
     }
     val carbonLoadModel = new CarbonLoadModel()
-    DataLoadingUtil.buildCarbonLoadModel(
-      carbonTable,
-      carbonProperty,
-      parameters,
+    new CarbonLoadModelBuilder(carbonTable).build(
+      parameters.asJava,
       optionsFinal,
       carbonLoadModel,
       hadoopConf)