Posted to commits@carbondata.apache.org by ja...@apache.org on 2018/03/02 08:02:03 UTC
[42/50] [abbrv] carbondata git commit: [CARBONDATA-2159] Remove carbon-spark dependency in store-sdk module
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c5740b19/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java
new file mode 100644
index 0000000..fbb93b6
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.loading.model;
+
+import java.io.IOException;
+import java.text.SimpleDateFormat;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.carbondata.common.Maps;
+import org.apache.carbondata.common.Strings;
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.common.constants.LoggerAction;
+import org.apache.carbondata.common.exceptions.sql.InvalidLoadOptionException;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.processing.loading.constants.DataLoadProcessorConstants;
+import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat;
+import org.apache.carbondata.processing.loading.sort.SortScopeOptions;
+import org.apache.carbondata.processing.util.TableOptionConstant;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Builder for {@link CarbonLoadModel}
+ */
+@InterfaceAudience.Developer
+public class CarbonLoadModelBuilder {
+
+ private CarbonTable table;
+
+ public CarbonLoadModelBuilder(CarbonTable table) {
+ this.table = table;
+ }
+
+ /**
+ * Build a CarbonLoadModel for data loading
+ * @param options Load options from user input
+ * @return a new CarbonLoadModel instance
+ */
+ public CarbonLoadModel build(
+ Map<String, String> options) throws InvalidLoadOptionException, IOException {
+ Map<String, String> optionsFinal = LoadOption.fillOptionWithDefaultValue(options);
+ optionsFinal.put("sort_scope", "no_sort");
+ if (!options.containsKey("fileheader")) {
+ List<CarbonColumn> csvHeader = table.getCreateOrderColumn(table.getTableName());
+ String[] columns = new String[csvHeader.size()];
+ for (int i = 0; i < columns.length; i++) {
+ columns[i] = csvHeader.get(i).getColName();
+ }
+ optionsFinal.put("fileheader", Strings.mkString(columns, ","));
+ }
+ CarbonLoadModel model = new CarbonLoadModel();
+
+ // we have provided 'fileheader', so hadoopConf can be null
+ build(options, optionsFinal, model, null);
+
+ // set default values
+ model.setTimestampformat(CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT);
+ model.setDateFormat(CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT);
+ model.setUseOnePass(Boolean.parseBoolean(Maps.getOrDefault(options, "onepass", "false")));
+ model.setDictionaryServerHost(Maps.getOrDefault(options, "dicthost", null));
+ try {
+ model.setDictionaryServerPort(Integer.parseInt(Maps.getOrDefault(options, "dictport", "-1")));
+ } catch (NumberFormatException e) {
+ throw new InvalidLoadOptionException(e.getMessage());
+ }
+ return model;
+ }
+
+ /**
+ * Build a CarbonLoadModel for data loading
+ * @param options Load options from user input
+ * @param optionsFinal Load options populated with default values for optional options
+ * @param carbonLoadModel The output load model
+ * @param hadoopConf needed to read the CSV header when 'fileheader' is not set in the
+ * user-provided load options
+ */
+ public void build(
+ Map<String, String> options,
+ Map<String, String> optionsFinal,
+ CarbonLoadModel carbonLoadModel,
+ Configuration hadoopConf) throws InvalidLoadOptionException, IOException {
+ carbonLoadModel.setTableName(table.getTableName());
+ carbonLoadModel.setDatabaseName(table.getDatabaseName());
+ carbonLoadModel.setTablePath(table.getTablePath());
+ carbonLoadModel.setTableName(table.getTableName());
+ CarbonDataLoadSchema dataLoadSchema = new CarbonDataLoadSchema(table);
+ // Need to fill dimension relation
+ carbonLoadModel.setCarbonDataLoadSchema(dataLoadSchema);
+ String sort_scope = optionsFinal.get("sort_scope");
+ String single_pass = optionsFinal.get("single_pass");
+ String bad_records_logger_enable = optionsFinal.get("bad_records_logger_enable");
+ String bad_records_action = optionsFinal.get("bad_records_action");
+ String bad_record_path = optionsFinal.get("bad_record_path");
+ String global_sort_partitions = optionsFinal.get("global_sort_partitions");
+ String timestampformat = optionsFinal.get("timestampformat");
+ String dateFormat = optionsFinal.get("dateformat");
+ String delimiter = optionsFinal.get("delimiter");
+ String complex_delimiter_level1 = optionsFinal.get("complex_delimiter_level_1");
+ String complex_delimiter_level2 = optionsFinal.get("complex_delimiter_level_2");
+ String all_dictionary_path = optionsFinal.get("all_dictionary_path");
+ String column_dict = optionsFinal.get("columndict");
+ validateDateTimeFormat(timestampformat, "TimestampFormat");
+ validateDateTimeFormat(dateFormat, "DateFormat");
+ validateSortScope(sort_scope);
+
+ if (Boolean.parseBoolean(bad_records_logger_enable) ||
+ LoggerAction.REDIRECT.name().equalsIgnoreCase(bad_records_action)) {
+ bad_record_path = CarbonUtil.checkAndAppendHDFSUrl(bad_record_path);
+ if (!CarbonUtil.isValidBadStorePath(bad_record_path)) {
+ throw new InvalidLoadOptionException("Invalid bad records location.");
+ }
+ }
+ carbonLoadModel.setBadRecordsLocation(bad_record_path);
+
+ validateGlobalSortPartitions(global_sort_partitions);
+ carbonLoadModel.setEscapeChar(checkDefaultValue(optionsFinal.get("escapechar"), "\\"));
+ carbonLoadModel.setQuoteChar(checkDefaultValue(optionsFinal.get("quotechar"), "\""));
+ carbonLoadModel.setCommentChar(checkDefaultValue(optionsFinal.get("commentchar"), "#"));
+
+ // if the csv file has no header and the LOAD sql does not provide the FILEHEADER option,
+ // we should use the table schema to generate the file header.
+ String fileHeader = optionsFinal.get("fileheader");
+ String headerOption = options.get("header");
+ if (headerOption != null) {
+ if (!headerOption.equalsIgnoreCase("true") &&
+ !headerOption.equalsIgnoreCase("false")) {
+ throw new InvalidLoadOptionException(
+ "'header' option should be either 'true' or 'false'.");
+ }
+ // whether the csv file has a header; the default value is true
+ if (Boolean.valueOf(headerOption)) {
+ if (!StringUtils.isEmpty(fileHeader)) {
+ throw new InvalidLoadOptionException(
+ "When 'header' option is true, 'fileheader' option is not required.");
+ }
+ } else {
+ if (StringUtils.isEmpty(fileHeader)) {
+ List<CarbonColumn> columns = table.getCreateOrderColumn(table.getTableName());
+ String[] columnNames = new String[columns.size()];
+ for (int i = 0; i < columnNames.length; i++) {
+ columnNames[i] = columns.get(i).getColName();
+ }
+ fileHeader = Strings.mkString(columnNames, ",");
+ }
+ }
+ }
+
+ carbonLoadModel.setTimestampformat(timestampformat);
+ carbonLoadModel.setDateFormat(dateFormat);
+ carbonLoadModel.setDefaultTimestampFormat(
+ CarbonProperties.getInstance().getProperty(
+ CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
+ CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT));
+
+ carbonLoadModel.setDefaultDateFormat(
+ CarbonProperties.getInstance().getProperty(
+ CarbonCommonConstants.CARBON_DATE_FORMAT,
+ CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT));
+
+ carbonLoadModel.setSerializationNullFormat(
+ TableOptionConstant.SERIALIZATION_NULL_FORMAT.getName() + "," +
+ optionsFinal.get("serialization_null_format"));
+
+ carbonLoadModel.setBadRecordsLoggerEnable(
+ TableOptionConstant.BAD_RECORDS_LOGGER_ENABLE.getName() + "," + bad_records_logger_enable);
+
+ carbonLoadModel.setBadRecordsAction(
+ TableOptionConstant.BAD_RECORDS_ACTION.getName() + "," + bad_records_action.toUpperCase());
+
+ carbonLoadModel.setIsEmptyDataBadRecord(
+ DataLoadProcessorConstants.IS_EMPTY_DATA_BAD_RECORD + "," +
+ optionsFinal.get("is_empty_data_bad_record"));
+
+ carbonLoadModel.setSkipEmptyLine(optionsFinal.get("skip_empty_line"));
+
+ carbonLoadModel.setSortScope(sort_scope);
+ carbonLoadModel.setBatchSortSizeInMb(optionsFinal.get("batch_sort_size_inmb"));
+ carbonLoadModel.setGlobalSortPartitions(global_sort_partitions);
+ carbonLoadModel.setUseOnePass(Boolean.parseBoolean(single_pass));
+
+ if (delimiter.equalsIgnoreCase(complex_delimiter_level1) ||
+ complex_delimiter_level1.equalsIgnoreCase(complex_delimiter_level2) ||
+ delimiter.equalsIgnoreCase(complex_delimiter_level2)) {
+ throw new InvalidLoadOptionException("Field delimiter and complex types delimiter are the same");
+ } else {
+ carbonLoadModel.setComplexDelimiterLevel1(
+ CarbonUtil.delimiterConverter(complex_delimiter_level1));
+ carbonLoadModel.setComplexDelimiterLevel2(
+ CarbonUtil.delimiterConverter(complex_delimiter_level2));
+ }
+ // set dictionary file paths and CSV parsing options on the load model
+ carbonLoadModel.setAllDictPath(all_dictionary_path);
+ carbonLoadModel.setCsvDelimiter(CarbonUtil.unescapeChar(delimiter));
+ carbonLoadModel.setCsvHeader(fileHeader);
+ carbonLoadModel.setColDictFilePath(column_dict);
+ carbonLoadModel.setCsvHeaderColumns(
+ LoadOption.getCsvHeaderColumns(carbonLoadModel, hadoopConf));
+
+ int validatedMaxColumns = validateMaxColumns(
+ carbonLoadModel.getCsvHeaderColumns(),
+ optionsFinal.get("maxcolumns"));
+
+ carbonLoadModel.setMaxColumns(String.valueOf(validatedMaxColumns));
+ carbonLoadModel.readAndSetLoadMetadataDetails();
+ }
+
+ private int validateMaxColumns(String[] csvHeaders, String maxColumns)
+ throws InvalidLoadOptionException {
+ /*
User configures both csvheadercolumns and maxcolumns:
if csvheadercolumns >= maxcolumns, raise an error
if maxcolumns > threshold, raise an error
User configures only csvheadercolumns:
if csvheadercolumns >= maxcolumns (default), then maxcolumns = csvheadercolumns + 1
if csvheadercolumns >= threshold, raise an error
User configures nothing:
if csvheadercolumns >= maxcolumns (default), then maxcolumns = csvheadercolumns + 1
if csvheadercolumns >= threshold, raise an error
+ */
+ int columnCountInSchema = csvHeaders.length;
+ int maxNumberOfColumnsForParsing = 0;
+ Integer maxColumnsInt = getMaxColumnValue(maxColumns);
+ if (maxColumnsInt != null) {
+ if (columnCountInSchema >= maxColumnsInt) {
+ throw new InvalidLoadOptionException(
+ "csv headers should be less than the max columns " + maxColumnsInt);
+ } else if (maxColumnsInt > CSVInputFormat.THRESHOLD_MAX_NUMBER_OF_COLUMNS_FOR_PARSING) {
+ throw new InvalidLoadOptionException(
+ "max columns cannot be greater than the threshold value: " +
+ CSVInputFormat.THRESHOLD_MAX_NUMBER_OF_COLUMNS_FOR_PARSING);
+ } else {
+ maxNumberOfColumnsForParsing = maxColumnsInt;
+ }
+ } else if (columnCountInSchema >= CSVInputFormat.THRESHOLD_MAX_NUMBER_OF_COLUMNS_FOR_PARSING) {
+ throw new InvalidLoadOptionException(
+ "csv header columns should be less than max threashold: " +
+ CSVInputFormat.THRESHOLD_MAX_NUMBER_OF_COLUMNS_FOR_PARSING);
+ } else if (columnCountInSchema >= CSVInputFormat.DEFAULT_MAX_NUMBER_OF_COLUMNS_FOR_PARSING) {
+ maxNumberOfColumnsForParsing = columnCountInSchema + 1;
+ } else {
+ maxNumberOfColumnsForParsing = CSVInputFormat.DEFAULT_MAX_NUMBER_OF_COLUMNS_FOR_PARSING;
+ }
+ return maxNumberOfColumnsForParsing;
+ }
+
+ private Integer getMaxColumnValue(String maxColumn) {
+ return (maxColumn == null) ? null : Integer.parseInt(maxColumn);
+ }
+
+ /**
+ * Validates both timestamp and date formats for illegal values
+ */
+ private void validateDateTimeFormat(String dateTimeLoadFormat, String dateTimeLoadOption)
+ throws InvalidLoadOptionException {
+ // allowing empty value to be configured for dateformat option.
+ if (dateTimeLoadFormat != null && !dateTimeLoadFormat.trim().equalsIgnoreCase("")) {
+ try {
+ new SimpleDateFormat(dateTimeLoadFormat);
+ } catch (IllegalArgumentException e) {
+ throw new InvalidLoadOptionException(
+ "Error: Wrong option: " + dateTimeLoadFormat + " is provided for option "
+ + dateTimeLoadOption);
+ }
+ }
+ }
+
+ private void validateSortScope(String sortScope) throws InvalidLoadOptionException {
+ if (sortScope != null) {
+ // Global sort is not supported on partitioned tables.
+ if (table.getPartitionInfo(table.getTableName()) != null &&
+ sortScope.equalsIgnoreCase(SortScopeOptions.SortScope.GLOBAL_SORT.toString())) {
+ throw new InvalidLoadOptionException("Don't support use global sort on partitioned table.");
+ }
+ }
+ }
+
+ private void validateGlobalSortPartitions(String globalSortPartitions)
+ throws InvalidLoadOptionException {
+ if (globalSortPartitions != null) {
+ try {
+ int num = Integer.parseInt(globalSortPartitions);
+ if (num <= 0) {
+ throw new InvalidLoadOptionException("'GLOBAL_SORT_PARTITIONS' should be greater than 0");
+ }
+ } catch (NumberFormatException e) {
+ throw new InvalidLoadOptionException(e.getMessage());
+ }
+ }
+ }
+
+ /**
+ * Returns the default value if the given value is empty; otherwise returns the value itself
+ */
+ private String checkDefaultValue(String value, String defaultValue) {
+ if (StringUtils.isEmpty(value)) {
+ return defaultValue;
+ } else {
+ return value;
+ }
+ }
+}
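For reviewers who want to try the new builder: a minimal usage sketch, assuming a
CarbonTable instance is already in scope. The option keys follow the defaults handled
above; the caller class and the "force" action value are illustrative assumptions,
not part of this patch.

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
    import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
    import org.apache.carbondata.processing.loading.model.CarbonLoadModelBuilder;

    // Hypothetical caller, not part of this commit.
    public class LoadModelExample {
      static CarbonLoadModel newModel(CarbonTable table) throws Exception {
        Map<String, String> options = new HashMap<>();
        options.put("delimiter", "|");              // overrides the "," default
        options.put("bad_records_action", "force"); // upper-cased by the builder
        // Missing options are filled via LoadOption.fillOptionWithDefaultValue;
        // a bad value surfaces as InvalidLoadOptionException.
        return new CarbonLoadModelBuilder(table).build(options);
      }
    }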
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c5740b19/processing/src/main/java/org/apache/carbondata/processing/loading/model/LoadOption.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/model/LoadOption.java b/processing/src/main/java/org/apache/carbondata/processing/loading/model/LoadOption.java
new file mode 100644
index 0000000..8ec93a9
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/model/LoadOption.java
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.loading.model;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.carbondata.common.Maps;
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.common.exceptions.sql.InvalidLoadOptionException;
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.constants.CarbonLoadOptionConstants;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException;
+import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
+import org.apache.carbondata.processing.util.CarbonLoaderUtil;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Provide utilities to populate loading options
+ */
+@InterfaceAudience.Developer
+public class LoadOption {
+
+ private static final LogService LOG = LogServiceFactory.getLogService(LoadOption.class.getName());
+
+ /**
+ * Based on the input options, fill and return the data loading options with default values
+ */
+ public static Map<String, String> fillOptionWithDefaultValue(
+ Map<String, String> options) throws InvalidLoadOptionException {
+ Map<String, String> optionsFinal = new HashMap<>();
+ optionsFinal.put("delimiter", Maps.getOrDefault(options, "delimiter", ","));
+ optionsFinal.put("quotechar", Maps.getOrDefault(options, "quotechar", "\""));
+ optionsFinal.put("fileheader", Maps.getOrDefault(options, "fileheader", ""));
+ optionsFinal.put("commentchar", Maps.getOrDefault(options, "commentchar", "#"));
+ optionsFinal.put("columndict", Maps.getOrDefault(options, "columndict", null));
+
+ optionsFinal.put(
+ "escapechar",
+ CarbonLoaderUtil.getEscapeChar(Maps.getOrDefault(options, "escapechar", "\\")));
+
+ optionsFinal.put(
+ "serialization_null_format",
+ Maps.getOrDefault(options, "serialization_null_format", "\\N"));
+
+ optionsFinal.put(
+ "bad_records_logger_enable",
+ Maps.getOrDefault(
+ options,
+ "bad_records_logger_enable",
+ CarbonProperties.getInstance().getProperty(
+ CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE,
+ CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE_DEFAULT)));
+
+ String badRecordActionValue = CarbonProperties.getInstance().getProperty(
+ CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION,
+ CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT);
+
+ optionsFinal.put(
+ "bad_records_action",
+ Maps.getOrDefault(
+ options,
+ "bad_records_action",
+ CarbonProperties.getInstance().getProperty(
+ CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_ACTION,
+ badRecordActionValue)));
+
+ optionsFinal.put(
+ "is_empty_data_bad_record",
+ Maps.getOrDefault(
+ options,
+ "is_empty_data_bad_record",
+ CarbonProperties.getInstance().getProperty(
+ CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD,
+ CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD_DEFAULT)));
+
+ optionsFinal.put(
+ "skip_empty_line",
+ Maps.getOrDefault(
+ options,
+ "skip_empty_line",
+ CarbonProperties.getInstance().getProperty(
+ CarbonLoadOptionConstants.CARBON_OPTIONS_SKIP_EMPTY_LINE)));
+
+ optionsFinal.put(
+ "all_dictionary_path",
+ Maps.getOrDefault(options, "all_dictionary_path", ""));
+
+ optionsFinal.put(
+ "complex_delimiter_level_1",
+ Maps.getOrDefault(options,"complex_delimiter_level_1", "\\$"));
+
+ optionsFinal.put(
+ "complex_delimiter_level_2",
+ Maps.getOrDefault(options, "complex_delimiter_level_2", "\\:"));
+
+ optionsFinal.put(
+ "dateformat",
+ Maps.getOrDefault(
+ options,
+ "dateformat",
+ CarbonProperties.getInstance().getProperty(
+ CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT,
+ CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT_DEFAULT)));
+
+ optionsFinal.put(
+ "timestampformat",
+ Maps.getOrDefault(
+ options,
+ "timestampformat",
+ CarbonProperties.getInstance().getProperty(
+ CarbonLoadOptionConstants.CARBON_OPTIONS_TIMESTAMPFORMAT,
+ CarbonLoadOptionConstants.CARBON_OPTIONS_TIMESTAMPFORMAT_DEFAULT)));
+
+ optionsFinal.put(
+ "global_sort_partitions",
+ Maps.getOrDefault(
+ options,
+ "global_sort_partitions",
+ CarbonProperties.getInstance().getProperty(
+ CarbonLoadOptionConstants.CARBON_OPTIONS_GLOBAL_SORT_PARTITIONS,
+ null)));
+
+ optionsFinal.put("maxcolumns", Maps.getOrDefault(options, "maxcolumns", null));
+
+ optionsFinal.put(
+ "batch_sort_size_inmb",
+ Maps.getOrDefault(
+ options,
+ "batch_sort_size_inmb",
+ CarbonProperties.getInstance().getProperty(
+ CarbonLoadOptionConstants.CARBON_OPTIONS_BATCH_SORT_SIZE_INMB,
+ CarbonProperties.getInstance().getProperty(
+ CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB,
+ CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB_DEFAULT))));
+
+ optionsFinal.put(
+ "bad_record_path",
+ Maps.getOrDefault(
+ options,
+ "bad_record_path",
+ CarbonProperties.getInstance().getProperty(
+ CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH,
+ CarbonProperties.getInstance().getProperty(
+ CarbonCommonConstants.CARBON_BADRECORDS_LOC,
+ CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL))));
+
+ String useOnePass = Maps.getOrDefault(
+ options,
+ "single_pass",
+ CarbonProperties.getInstance().getProperty(
+ CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS,
+ CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS_DEFAULT)).trim().toLowerCase();
+
+ boolean singlePass;
+
+ if (useOnePass.equalsIgnoreCase("true")) {
+ singlePass = true;
+ } else {
+ // when single_pass = false, do not allow the load if either
+ // all_dictionary_path or columndict is configured
+ if (StringUtils.isNotEmpty(optionsFinal.get("all_dictionary_path")) ||
+ StringUtils.isNotEmpty(optionsFinal.get("columndict"))) {
+ throw new InvalidLoadOptionException(
+ "Can not use all_dictionary_path or columndict without single_pass.");
+ } else {
+ singlePass = false;
+ }
+ }
+
+ optionsFinal.put("single_pass", String.valueOf(singlePass));
+ return optionsFinal;
+ }
+
+ /**
+ * Return CSV header field names
+ */
+ public static String[] getCsvHeaderColumns(
+ CarbonLoadModel carbonLoadModel,
+ Configuration hadoopConf) throws IOException {
+ String delimiter;
+ if (StringUtils.isEmpty(carbonLoadModel.getCsvDelimiter())) {
+ delimiter = CarbonCommonConstants.COMMA;
+ } else {
+ delimiter = CarbonUtil.delimiterConverter(carbonLoadModel.getCsvDelimiter());
+ }
+ String csvFile = null;
+ String csvHeader = carbonLoadModel.getCsvHeader();
+ String[] csvColumns;
+ if (StringUtils.isBlank(csvHeader)) {
+ // read header from csv file
+ csvFile = carbonLoadModel.getFactFilePath().split(",")[0];
+ csvHeader = CarbonUtil.readHeader(csvFile, hadoopConf);
+ if (StringUtils.isBlank(csvHeader)) {
+ throw new CarbonDataLoadingException("First line of the csv is not valid.");
+ }
+ String[] headers = csvHeader.toLowerCase().split(delimiter);
+ csvColumns = new String[headers.length];
+ for (int i = 0; i < csvColumns.length; i++) {
+ csvColumns[i] = headers[i].replaceAll("\"", "").trim();
+ }
+ } else {
+ String[] headers = csvHeader.toLowerCase().split(CarbonCommonConstants.COMMA);
+ csvColumns = new String[headers.length];
+ for (int i = 0; i < csvColumns.length; i++) {
+ csvColumns[i] = headers[i].trim();
+ }
+ }
+
+ if (!CarbonDataProcessorUtil.isHeaderValid(carbonLoadModel.getTableName(), csvColumns,
+ carbonLoadModel.getCarbonDataLoadSchema())) {
+ if (csvFile == null) {
+ LOG.error("CSV header in DDL is not proper."
+ + " Column names in schema and CSV header are not the same.");
+ throw new CarbonDataLoadingException(
+ "CSV header in DDL is not proper. Column names in schema and CSV header are "
+ + "not the same.");
+ } else {
+ LOG.error(
+ "CSV header in input file is not proper. Column names in schema and csv header are not "
+ + "the same. Input file : " + CarbonUtil.removeAKSK(csvFile));
+ throw new CarbonDataLoadingException(
+ "CSV header in input file is not proper. Column names in schema and csv header are not "
+ + "the same. Input file : " + CarbonUtil.removeAKSK(csvFile));
+ }
+ }
+ return csvColumns;
+ }
+}
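A quick illustration of the defaulting behaviour of fillOptionWithDefaultValue (a
sketch; the expected defaults are read off the code above and can be overridden by
carbon properties in a real deployment):

    Map<String, String> user = new HashMap<>();
    user.put("delimiter", "|");
    Map<String, String> filled = LoadOption.fillOptionWithDefaultValue(user);
    // user-supplied values win; everything else falls back to a default
    assert "|".equals(filled.get("delimiter"));
    assert "\"".equals(filled.get("quotechar"));
    assert "false".equals(filled.get("single_pass"));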
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c5740b19/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
index e9bd3b8..6876355 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
@@ -16,14 +16,9 @@
*/
package org.apache.carbondata.processing.util;
-import java.io.BufferedWriter;
-import java.io.DataOutputStream;
import java.io.IOException;
-import java.io.OutputStreamWriter;
import java.net.InetAddress;
import java.net.UnknownHostException;
-import java.nio.charset.Charset;
-import java.text.SimpleDateFormat;
import java.util.*;
import org.apache.carbondata.common.logging.LogService;
@@ -40,9 +35,6 @@ import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.datastore.impl.FileFactory.FileType;
-import org.apache.carbondata.core.fileoperations.AtomicFileOperations;
-import org.apache.carbondata.core.fileoperations.AtomicFileOperationsImpl;
-import org.apache.carbondata.core.fileoperations.FileWriteOperation;
import org.apache.carbondata.core.locks.CarbonLockUtil;
import org.apache.carbondata.core.locks.ICarbonLock;
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
@@ -58,9 +50,8 @@ import org.apache.carbondata.core.util.path.CarbonTablePath;
import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
import org.apache.carbondata.processing.merger.NodeBlockRelation;
import org.apache.carbondata.processing.merger.NodeMultiBlockRelation;
-import static org.apache.carbondata.core.enums.EscapeSequences.*;
-import com.google.gson.Gson;
+import static org.apache.carbondata.core.enums.EscapeSequences.*;
public final class CarbonLoaderUtil {
@@ -344,48 +335,6 @@ public final class CarbonLoaderUtil {
loadMetadataDetails.setLoadStartTime(loadStartTime);
}
- public static void writeLoadMetadata(AbsoluteTableIdentifier identifier,
- List<LoadMetadataDetails> listOfLoadFolderDetails) throws IOException {
- String dataLoadLocation = CarbonTablePath.getTableStatusFilePath(identifier.getTablePath());
-
- DataOutputStream dataOutputStream;
- Gson gsonObjectToWrite = new Gson();
- BufferedWriter brWriter = null;
-
- AtomicFileOperations writeOperation =
- new AtomicFileOperationsImpl(dataLoadLocation, FileFactory.getFileType(dataLoadLocation));
-
- try {
- dataOutputStream = writeOperation.openForWrite(FileWriteOperation.OVERWRITE);
- brWriter = new BufferedWriter(new OutputStreamWriter(dataOutputStream,
- Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET)));
-
- String metadataInstance = gsonObjectToWrite.toJson(listOfLoadFolderDetails.toArray());
- brWriter.write(metadataInstance);
- } finally {
- try {
- if (null != brWriter) {
- brWriter.flush();
- }
- } catch (Exception e) {
- LOGGER.error("error in flushing ");
-
- }
- CarbonUtil.closeStreams(brWriter);
- writeOperation.close();
- }
-
- }
-
- public static String readCurrentTime() {
- SimpleDateFormat sdf = new SimpleDateFormat(CarbonCommonConstants.CARBON_TIMESTAMP);
- String date = null;
-
- date = sdf.format(new Date());
-
- return date;
- }
-
public static boolean isValidEscapeSequence(String escapeChar) {
return escapeChar.equalsIgnoreCase(NEW_LINE.getName()) ||
escapeChar.equalsIgnoreCase(CARRIAGE_RETURN.getName()) ||
@@ -514,17 +463,6 @@ public final class CarbonLoaderUtil {
}
/**
- * This method will divide the blocks among the nodes as per the data locality
- *
- * @param blockInfos
- * @return
- */
- public static Map<String, List<Distributable>> nodeBlockMapping(List<Distributable> blockInfos) {
- // -1 if number of nodes has to be decided based on block location information
- return nodeBlockMapping(blockInfos, -1);
- }
-
- /**
* the method returns the number of required executors
*
* @param blockInfos
@@ -899,25 +837,6 @@ public final class CarbonLoaderUtil {
CarbonUtil.checkAndCreateFolder(segmentFolder);
}
- /**
- * This will update the old table status details before clean files to the latest table status.
- * @param oldList
- * @param newList
- * @return
- */
- public static List<LoadMetadataDetails> updateLoadMetadataFromOldToNew(
- LoadMetadataDetails[] oldList, LoadMetadataDetails[] newList) {
-
- List<LoadMetadataDetails> newListMetadata =
- new ArrayList<LoadMetadataDetails>(Arrays.asList(newList));
- for (LoadMetadataDetails oldSegment : oldList) {
- if ("false".equalsIgnoreCase(oldSegment.getVisibility())) {
- newListMetadata.get(newListMetadata.indexOf(oldSegment)).setVisibility("false");
- }
- }
- return newListMetadata;
- }
-
/*
* This method will add data size and index size into tablestatus for each segment. And also
* returns the size of the segment.
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c5740b19/processing/src/main/java/org/apache/carbondata/processing/util/DeleteLoadFolders.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/DeleteLoadFolders.java b/processing/src/main/java/org/apache/carbondata/processing/util/DeleteLoadFolders.java
deleted file mode 100644
index c00cc86..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/util/DeleteLoadFolders.java
+++ /dev/null
@@ -1,210 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.util;
-
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
-import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
-import org.apache.carbondata.core.datastore.impl.FileFactory;
-import org.apache.carbondata.core.indexstore.PartitionSpec;
-import org.apache.carbondata.core.locks.CarbonLockFactory;
-import org.apache.carbondata.core.locks.ICarbonLock;
-import org.apache.carbondata.core.locks.LockUsage;
-import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
-import org.apache.carbondata.core.metadata.SegmentFileStore;
-import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
-import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
-import org.apache.carbondata.core.statusmanager.SegmentStatus;
-import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
-import org.apache.carbondata.core.util.path.CarbonTablePath;
-
-public final class DeleteLoadFolders {
-
- private static final LogService LOGGER =
- LogServiceFactory.getLogService(DeleteLoadFolders.class.getName());
-
- private DeleteLoadFolders() {
-
- }
-
- /**
- * returns segment path
- *
- * @param identifier
- * @param oneLoad
- * @return
- */
- private static String getSegmentPath(AbsoluteTableIdentifier identifier,
- LoadMetadataDetails oneLoad) {
- String segmentId = oneLoad.getLoadName();
- return CarbonTablePath.getSegmentPath(identifier.getTablePath(), segmentId);
- }
-
- public static void physicalFactAndMeasureMetadataDeletion(
- AbsoluteTableIdentifier absoluteTableIdentifier, String metadataPath, boolean isForceDelete,
- List<PartitionSpec> specs) {
- LoadMetadataDetails[] currentDetails = SegmentStatusManager.readLoadMetadata(metadataPath);
- for (LoadMetadataDetails oneLoad : currentDetails) {
- if (checkIfLoadCanBeDeletedPhysically(oneLoad, isForceDelete)) {
- try {
- if (oneLoad.getSegmentFile() != null) {
- SegmentFileStore
- .deleteSegment(absoluteTableIdentifier.getTablePath(), oneLoad.getSegmentFile(),
- specs);
- } else {
- String path = getSegmentPath(absoluteTableIdentifier, oneLoad);
- boolean status = false;
- if (FileFactory.isFileExist(path, FileFactory.getFileType(path))) {
- CarbonFile file = FileFactory.getCarbonFile(path, FileFactory.getFileType(path));
- CarbonFile[] filesToBeDeleted = file.listFiles(new CarbonFileFilter() {
-
- @Override public boolean accept(CarbonFile file) {
- return (CarbonTablePath.isCarbonDataFile(file.getName()) ||
- CarbonTablePath.isCarbonIndexFile(file.getName()));
- }
- });
-
- //if there are no fact and msr metadata files present then no need to keep
- //entry in metadata.
- if (filesToBeDeleted.length == 0) {
- status = true;
- } else {
-
- for (CarbonFile eachFile : filesToBeDeleted) {
- if (!eachFile.delete()) {
- LOGGER.warn("Unable to delete the file as per delete command " + eachFile
- .getAbsolutePath());
- status = false;
- } else {
- status = true;
- }
- }
- }
- // need to delete the complete folder.
- if (status) {
- if (!file.delete()) {
- LOGGER.warn("Unable to delete the folder as per delete command " + file
- .getAbsolutePath());
- }
- }
-
- } else {
- LOGGER.warn("Files are not found in segment " + path
- + " it seems, files are already being deleted");
- }
-
- }
- } catch (IOException e) {
- LOGGER.warn("Unable to delete the file as per delete command " + oneLoad.getLoadName());
- }
- }
- }
- }
-
- private static boolean checkIfLoadCanBeDeleted(LoadMetadataDetails oneLoad,
- boolean isForceDelete) {
- if ((SegmentStatus.MARKED_FOR_DELETE == oneLoad.getSegmentStatus() ||
- SegmentStatus.COMPACTED == oneLoad.getSegmentStatus() ||
- SegmentStatus.INSERT_IN_PROGRESS == oneLoad.getSegmentStatus() ||
- SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS == oneLoad.getSegmentStatus())
- && oneLoad.getVisibility().equalsIgnoreCase("true")) {
- if (isForceDelete) {
- return true;
- }
- long deletionTime = oneLoad.getModificationOrdeletionTimesStamp();
-
- return CarbonUpdateUtil.isMaxQueryTimeoutExceeded(deletionTime);
-
- }
-
- return false;
- }
-
- private static boolean checkIfLoadCanBeDeletedPhysically(LoadMetadataDetails oneLoad,
- boolean isForceDelete) {
- if ((SegmentStatus.MARKED_FOR_DELETE == oneLoad.getSegmentStatus() ||
- SegmentStatus.COMPACTED == oneLoad.getSegmentStatus())) {
- if (isForceDelete) {
- return true;
- }
- long deletionTime = oneLoad.getModificationOrdeletionTimesStamp();
-
- return CarbonUpdateUtil.isMaxQueryTimeoutExceeded(deletionTime);
-
- }
-
- return false;
- }
-
- private static LoadMetadataDetails getCurrentLoadStatusOfSegment(String segmentId,
- String metadataPath) {
- LoadMetadataDetails[] currentDetails = SegmentStatusManager.readLoadMetadata(metadataPath);
- for (LoadMetadataDetails oneLoad : currentDetails) {
- if (oneLoad.getLoadName().equalsIgnoreCase(segmentId)) {
- return oneLoad;
- }
- }
- return null;
- }
-
- public static boolean deleteLoadFoldersFromFileSystem(
- AbsoluteTableIdentifier absoluteTableIdentifier, boolean isForceDelete,
- LoadMetadataDetails[] details, String metadataPath) {
- boolean isDeleted = false;
- if (details != null && details.length != 0) {
- for (LoadMetadataDetails oneLoad : details) {
- if (checkIfLoadCanBeDeleted(oneLoad, isForceDelete)) {
- ICarbonLock segmentLock = CarbonLockFactory.getCarbonLockObj(absoluteTableIdentifier,
- CarbonTablePath.addSegmentPrefix(oneLoad.getLoadName()) + LockUsage.LOCK);
- try {
- if (oneLoad.getSegmentStatus() == SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS
- || oneLoad.getSegmentStatus() == SegmentStatus.INSERT_IN_PROGRESS) {
- if (segmentLock.lockWithRetries(1, 5)) {
- LOGGER.info("Info: Acquired segment lock on segment:" + oneLoad.getLoadName());
- LoadMetadataDetails currentDetails =
- getCurrentLoadStatusOfSegment(oneLoad.getLoadName(), metadataPath);
- if (currentDetails != null && checkIfLoadCanBeDeleted(currentDetails,
- isForceDelete)) {
- oneLoad.setVisibility("false");
- isDeleted = true;
- LOGGER.info("Info: Deleted the load " + oneLoad.getLoadName());
- }
- } else {
- LOGGER.info("Info: Load in progress for segment" + oneLoad.getLoadName());
- return isDeleted;
- }
- } else {
- oneLoad.setVisibility("false");
- isDeleted = true;
- LOGGER.info("Info: Deleted the load " + oneLoad.getLoadName());
- }
- } finally {
- segmentLock.unlock();
- LOGGER.info("Info: Segment lock on segment:" + oneLoad.getLoadName() + " is released");
- }
- }
- }
- }
- return isDeleted;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c5740b19/store/sdk/pom.xml
----------------------------------------------------------------------
diff --git a/store/sdk/pom.xml b/store/sdk/pom.xml
index 6663683..51d2cf9 100644
--- a/store/sdk/pom.xml
+++ b/store/sdk/pom.xml
@@ -21,7 +21,7 @@
<dependencies>
<dependency>
<groupId>org.apache.carbondata</groupId>
- <artifactId>carbondata-spark-common</artifactId>
+ <artifactId>carbondata-hadoop</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c5740b19/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
index 51ca09c..e06200a 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
@@ -28,6 +28,7 @@ import java.util.Objects;
import org.apache.carbondata.common.Strings;
import org.apache.carbondata.common.annotations.InterfaceAudience;
import org.apache.carbondata.common.annotations.InterfaceStability;
+import org.apache.carbondata.common.exceptions.sql.InvalidLoadOptionException;
import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.metadata.CarbonMetadata;
import org.apache.carbondata.core.metadata.converter.SchemaConverter;
@@ -40,7 +41,7 @@ import org.apache.carbondata.core.metadata.schema.table.TableSchemaBuilder;
import org.apache.carbondata.core.util.path.CarbonTablePath;
import org.apache.carbondata.core.writer.ThriftWriter;
import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
-import org.apache.carbondata.spark.util.DataLoadingUtil;
+import org.apache.carbondata.processing.loading.model.CarbonLoadModelBuilder;
/**
* Builder for {@link CarbonWriter}
@@ -94,9 +95,9 @@ public class CarbonWriterBuilder {
}
/**
- * Build a {@link CSVCarbonWriter}, which accepts row in CSV format
+ * Build a {@link CarbonWriter}, which accepts rows in CSV format
*/
- public CarbonWriter buildWriterForCSVInput() throws IOException {
+ public CarbonWriter buildWriterForCSVInput() throws IOException, InvalidLoadOptionException {
Objects.requireNonNull(schema, "schema should not be null");
Objects.requireNonNull(path, "path should not be null");
@@ -113,7 +114,7 @@ public class CarbonWriterBuilder {
}
/**
- * Build a {@link AvroCarbonWriter}, which accepts Avro object
+ * Build a {@link CarbonWriter}, which accepts Avro objects
* @return
* @throws IOException
*/
@@ -184,11 +185,13 @@ public class CarbonWriterBuilder {
/**
* Build a {@link CarbonLoadModel}
*/
- private CarbonLoadModel buildLoadModel(CarbonTable table) {
+ private CarbonLoadModel buildLoadModel(CarbonTable table)
+ throws InvalidLoadOptionException, IOException {
Map<String, String> options = new HashMap<>();
if (sortColumns != null) {
options.put("sort_columns", Strings.mkString(sortColumns, ","));
}
- return DataLoadingUtil.buildCarbonLoadModelJava(table, options);
+ CarbonLoadModelBuilder builder = new CarbonLoadModelBuilder(table);
+ return builder.build(options);
}
}
\ No newline at end of file
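One consequence for SDK users: buildWriterForCSVInput now also declares
InvalidLoadOptionException, so catch blocks keyed on IOException alone no longer
compile. A sketch of the adjusted call site (the builder chain is assumed from the
SDK test suite; schema and path are placeholders):

    try {
      CarbonWriter writer = CarbonWriter.builder()
          .withSchema(schema)
          .outputPath(path)
          .buildWriterForCSVInput(); // now throws InvalidLoadOptionException too
      writer.write(new String[]{"robot0", "0", "0.0"});
      writer.close();
    } catch (IOException | InvalidLoadOptionException e) {
      // a rejected load option (e.g. an invalid sort_columns value) lands here,
      // alongside ordinary I/O failures
    }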
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c5740b19/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterSuite.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterSuite.java b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterSuite.java
index 531ec7c..aca2b2d 100644
--- a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterSuite.java
+++ b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterSuite.java
@@ -77,7 +77,7 @@ public class CSVCarbonWriterSuite {
writer.write(new String[]{"robot" + i, String.valueOf(i), String.valueOf((double) i / 2)});
}
writer.close();
- } catch (IOException e) {
+ } catch (Exception e) {
e.printStackTrace();
Assert.fail(e.getMessage());
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c5740b19/streaming/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala b/streaming/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala
index 6316d84..bc7b042 100644
--- a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala
+++ b/streaming/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala
@@ -34,10 +34,9 @@ import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.events.{OperationContext, OperationListenerBus}
import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadTablePostExecutionEvent, LoadTablePreExecutionEvent}
-import org.apache.carbondata.processing.loading.model.CarbonLoadModel
+import org.apache.carbondata.processing.loading.model.{CarbonLoadModel, CarbonLoadModelBuilder, LoadOption}
import org.apache.carbondata.spark.dictionary.provider.SecureDictionaryServiceProvider
import org.apache.carbondata.spark.dictionary.server.SecureDictionaryServer
-import org.apache.carbondata.spark.util.DataLoadingUtil
import org.apache.carbondata.streaming.segment.StreamSegment
/**
@@ -209,17 +208,15 @@ object StreamSinkFactory {
segmentId: String): CarbonLoadModel = {
val carbonProperty: CarbonProperties = CarbonProperties.getInstance()
carbonProperty.addProperty("zookeeper.enable.lock", "false")
- val optionsFinal = DataLoadingUtil.getDataLoadingOptions(carbonProperty, parameters)
+ val optionsFinal = LoadOption.fillOptionWithDefaultValue(parameters.asJava)
optionsFinal.put("sort_scope", "no_sort")
if (parameters.get("fileheader").isEmpty) {
optionsFinal.put("fileheader", carbonTable.getCreateOrderColumn(carbonTable.getTableName)
.asScala.map(_.getColName).mkString(","))
}
val carbonLoadModel = new CarbonLoadModel()
- DataLoadingUtil.buildCarbonLoadModel(
- carbonTable,
- carbonProperty,
- parameters,
+ new CarbonLoadModelBuilder(carbonTable).build(
+ parameters.asJava,
optionsFinal,
carbonLoadModel,
hadoopConf)
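For reference, the same two-step flow looks like this from plain Java (a sketch;
table, parameters, and hadoopConf are assumed to be in scope):

    Map<String, String> optionsFinal = LoadOption.fillOptionWithDefaultValue(parameters);
    optionsFinal.put("sort_scope", "no_sort");
    CarbonLoadModel model = new CarbonLoadModel();
    // the 4-arg overload reads the CSV header via hadoopConf when 'fileheader' is absent
    new CarbonLoadModelBuilder(table).build(parameters, optionsFinal, model, hadoopConf);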