You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2018/02/12 08:07:07 UTC
[1/3] carbondata git commit: [CARBONDATA-2159] Remove carbon-spark
dependency in store-sdk module
Repository: carbondata
Updated Branches:
refs/heads/carbonstore a848ccff8 -> 0d50f6546
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java
new file mode 100644
index 0000000..fbb93b6
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.loading.model;
+
+import java.io.IOException;
+import java.text.SimpleDateFormat;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.carbondata.common.Maps;
+import org.apache.carbondata.common.Strings;
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.common.constants.LoggerAction;
+import org.apache.carbondata.common.exceptions.sql.InvalidLoadOptionException;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.processing.loading.constants.DataLoadProcessorConstants;
+import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat;
+import org.apache.carbondata.processing.loading.sort.SortScopeOptions;
+import org.apache.carbondata.processing.util.TableOptionConstant;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Builder for {@link CarbonLoadModel}
+ */
+@InterfaceAudience.Developer
+public class CarbonLoadModelBuilder {
+
+ private CarbonTable table;
+
+ public CarbonLoadModelBuilder(CarbonTable table) {
+ this.table = table;
+ }
+
+ /**
+ * build CarbonLoadModel for data loading
+ * @param options Load options from user input
+ * @return a new CarbonLoadModel instance
+ */
+ public CarbonLoadModel build(
+ Map<String, String> options) throws InvalidLoadOptionException, IOException {
+ Map<String, String> optionsFinal = LoadOption.fillOptionWithDefaultValue(options);
+ optionsFinal.put("sort_scope", "no_sort");
+ if (!options.containsKey("fileheader")) {
+ List<CarbonColumn> csvHeader = table.getCreateOrderColumn(table.getTableName());
+ String[] columns = new String[csvHeader.size()];
+ for (int i = 0; i < columns.length; i++) {
+ columns[i] = csvHeader.get(i).getColName();
+ }
+ optionsFinal.put("fileheader", Strings.mkString(columns, ","));
+ }
+ CarbonLoadModel model = new CarbonLoadModel();
+
+ // we have provided 'fileheader', so it hadoopConf can be null
+ build(options, optionsFinal, model, null);
+
+ // set default values
+ model.setTimestampformat(CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT);
+ model.setDateFormat(CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT);
+ model.setUseOnePass(Boolean.parseBoolean(Maps.getOrDefault(options, "onepass", "false")));
+ model.setDictionaryServerHost(Maps.getOrDefault(options, "dicthost", null));
+ try {
+ model.setDictionaryServerPort(Integer.parseInt(Maps.getOrDefault(options, "dictport", "-1")));
+ } catch (NumberFormatException e) {
+ throw new InvalidLoadOptionException(e.getMessage());
+ }
+ return model;
+ }
+
+ /**
+ * build CarbonLoadModel for data loading
+ * @param options Load options from user input
+ * @param optionsFinal Load options that populated with default values for optional options
+ * @param carbonLoadModel The output load model
+ * @param hadoopConf hadoopConf is needed to read CSV header if there 'fileheader' is not set in
+ * user provided load options
+ */
+ public void build(
+ Map<String, String> options,
+ Map<String, String> optionsFinal,
+ CarbonLoadModel carbonLoadModel,
+ Configuration hadoopConf) throws InvalidLoadOptionException, IOException {
+ carbonLoadModel.setTableName(table.getTableName());
+ carbonLoadModel.setDatabaseName(table.getDatabaseName());
+ carbonLoadModel.setTablePath(table.getTablePath());
+ carbonLoadModel.setTableName(table.getTableName());
+ CarbonDataLoadSchema dataLoadSchema = new CarbonDataLoadSchema(table);
+ // Need to fill dimension relation
+ carbonLoadModel.setCarbonDataLoadSchema(dataLoadSchema);
+ String sort_scope = optionsFinal.get("sort_scope");
+ String single_pass = optionsFinal.get("single_pass");
+ String bad_records_logger_enable = optionsFinal.get("bad_records_logger_enable");
+ String bad_records_action = optionsFinal.get("bad_records_action");
+ String bad_record_path = optionsFinal.get("bad_record_path");
+ String global_sort_partitions = optionsFinal.get("global_sort_partitions");
+ String timestampformat = optionsFinal.get("timestampformat");
+ String dateFormat = optionsFinal.get("dateformat");
+ String delimeter = optionsFinal.get("delimiter");
+ String complex_delimeter_level1 = optionsFinal.get("complex_delimiter_level_1");
+ String complex_delimeter_level2 = optionsFinal.get("complex_delimiter_level_2");
+ String all_dictionary_path = optionsFinal.get("all_dictionary_path");
+ String column_dict = optionsFinal.get("columndict");
+ validateDateTimeFormat(timestampformat, "TimestampFormat");
+ validateDateTimeFormat(dateFormat, "DateFormat");
+ validateSortScope(sort_scope);
+
+ if (Boolean.parseBoolean(bad_records_logger_enable) ||
+ LoggerAction.REDIRECT.name().equalsIgnoreCase(bad_records_action)) {
+ bad_record_path = CarbonUtil.checkAndAppendHDFSUrl(bad_record_path);
+ if (!CarbonUtil.isValidBadStorePath(bad_record_path)) {
+ throw new InvalidLoadOptionException("Invalid bad records location.");
+ }
+ }
+ carbonLoadModel.setBadRecordsLocation(bad_record_path);
+
+ validateGlobalSortPartitions(global_sort_partitions);
+ carbonLoadModel.setEscapeChar(checkDefaultValue(optionsFinal.get("escapechar"), "\\"));
+ carbonLoadModel.setQuoteChar(checkDefaultValue(optionsFinal.get("quotechar"), "\""));
+ carbonLoadModel.setCommentChar(checkDefaultValue(optionsFinal.get("commentchar"), "#"));
+
+ // if there isn't file header in csv file and load sql doesn't provide FILEHEADER option,
+ // we should use table schema to generate file header.
+ String fileHeader = optionsFinal.get("fileheader");
+ String headerOption = options.get("header");
+ if (headerOption != null) {
+ if (!headerOption.equalsIgnoreCase("true") &&
+ !headerOption.equalsIgnoreCase("false")) {
+ throw new InvalidLoadOptionException(
+ "'header' option should be either 'true' or 'false'.");
+ }
+ // whether the csv file has file header, the default value is true
+ if (Boolean.valueOf(headerOption)) {
+ if (!StringUtils.isEmpty(fileHeader)) {
+ throw new InvalidLoadOptionException(
+ "When 'header' option is true, 'fileheader' option is not required.");
+ }
+ } else {
+ if (StringUtils.isEmpty(fileHeader)) {
+ List<CarbonColumn> columns = table.getCreateOrderColumn(table.getTableName());
+ String[] columnNames = new String[columns.size()];
+ for (int i = 0; i < columnNames.length; i++) {
+ columnNames[i] = columns.get(i).getColName();
+ }
+ fileHeader = Strings.mkString(columnNames, ",");
+ }
+ }
+ }
+
+ carbonLoadModel.setTimestampformat(timestampformat);
+ carbonLoadModel.setDateFormat(dateFormat);
+ carbonLoadModel.setDefaultTimestampFormat(
+ CarbonProperties.getInstance().getProperty(
+ CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
+ CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT));
+
+ carbonLoadModel.setDefaultDateFormat(
+ CarbonProperties.getInstance().getProperty(
+ CarbonCommonConstants.CARBON_DATE_FORMAT,
+ CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT));
+
+ carbonLoadModel.setSerializationNullFormat(
+ TableOptionConstant.SERIALIZATION_NULL_FORMAT.getName() + "," +
+ optionsFinal.get("serialization_null_format"));
+
+ carbonLoadModel.setBadRecordsLoggerEnable(
+ TableOptionConstant.BAD_RECORDS_LOGGER_ENABLE.getName() + "," + bad_records_logger_enable);
+
+ carbonLoadModel.setBadRecordsAction(
+ TableOptionConstant.BAD_RECORDS_ACTION.getName() + "," + bad_records_action.toUpperCase());
+
+ carbonLoadModel.setIsEmptyDataBadRecord(
+ DataLoadProcessorConstants.IS_EMPTY_DATA_BAD_RECORD + "," +
+ optionsFinal.get("is_empty_data_bad_record"));
+
+ carbonLoadModel.setSkipEmptyLine(optionsFinal.get("skip_empty_line"));
+
+ carbonLoadModel.setSortScope(sort_scope);
+ carbonLoadModel.setBatchSortSizeInMb(optionsFinal.get("batch_sort_size_inmb"));
+ carbonLoadModel.setGlobalSortPartitions(global_sort_partitions);
+ carbonLoadModel.setUseOnePass(Boolean.parseBoolean(single_pass));
+
+ if (delimeter.equalsIgnoreCase(complex_delimeter_level1) ||
+ complex_delimeter_level1.equalsIgnoreCase(complex_delimeter_level2) ||
+ delimeter.equalsIgnoreCase(complex_delimeter_level2)) {
+ throw new InvalidLoadOptionException("Field Delimiter and Complex types delimiter are same");
+ } else {
+ carbonLoadModel.setComplexDelimiterLevel1(
+ CarbonUtil.delimiterConverter(complex_delimeter_level1));
+ carbonLoadModel.setComplexDelimiterLevel2(
+ CarbonUtil.delimiterConverter(complex_delimeter_level2));
+ }
+ // set local dictionary path, and dictionary file extension
+ carbonLoadModel.setAllDictPath(all_dictionary_path);
+ carbonLoadModel.setCsvDelimiter(CarbonUtil.unescapeChar(delimeter));
+ carbonLoadModel.setCsvHeader(fileHeader);
+ carbonLoadModel.setColDictFilePath(column_dict);
+ carbonLoadModel.setCsvHeaderColumns(
+ LoadOption.getCsvHeaderColumns(carbonLoadModel, hadoopConf));
+
+ int validatedMaxColumns = validateMaxColumns(
+ carbonLoadModel.getCsvHeaderColumns(),
+ optionsFinal.get("maxcolumns"));
+
+ carbonLoadModel.setMaxColumns(String.valueOf(validatedMaxColumns));
+ carbonLoadModel.readAndSetLoadMetadataDetails();
+ }
+
+ private int validateMaxColumns(String[] csvHeaders, String maxColumns)
+ throws InvalidLoadOptionException {
+ /*
+ User configures both csvheadercolumns, maxcolumns,
+ if csvheadercolumns >= maxcolumns, give error
+ if maxcolumns > threashold, give error
+ User configures csvheadercolumns
+ if csvheadercolumns >= maxcolumns(default) then maxcolumns = csvheadercolumns+1
+ if csvheadercolumns >= threashold, give error
+ User configures nothing
+ if csvheadercolumns >= maxcolumns(default) then maxcolumns = csvheadercolumns+1
+ if csvheadercolumns >= threashold, give error
+ */
+ int columnCountInSchema = csvHeaders.length;
+ int maxNumberOfColumnsForParsing = 0;
+ Integer maxColumnsInt = getMaxColumnValue(maxColumns);
+ if (maxColumnsInt != null) {
+ if (columnCountInSchema >= maxColumnsInt) {
+ throw new InvalidLoadOptionException(
+ "csv headers should be less than the max columns " + maxColumnsInt);
+ } else if (maxColumnsInt > CSVInputFormat.THRESHOLD_MAX_NUMBER_OF_COLUMNS_FOR_PARSING) {
+ throw new InvalidLoadOptionException(
+ "max columns cannot be greater than the threshold value: " +
+ CSVInputFormat.THRESHOLD_MAX_NUMBER_OF_COLUMNS_FOR_PARSING);
+ } else {
+ maxNumberOfColumnsForParsing = maxColumnsInt;
+ }
+ } else if (columnCountInSchema >= CSVInputFormat.THRESHOLD_MAX_NUMBER_OF_COLUMNS_FOR_PARSING) {
+ throw new InvalidLoadOptionException(
+ "csv header columns should be less than max threashold: " +
+ CSVInputFormat.THRESHOLD_MAX_NUMBER_OF_COLUMNS_FOR_PARSING);
+ } else if (columnCountInSchema >= CSVInputFormat.DEFAULT_MAX_NUMBER_OF_COLUMNS_FOR_PARSING) {
+ maxNumberOfColumnsForParsing = columnCountInSchema + 1;
+ } else {
+ maxNumberOfColumnsForParsing = CSVInputFormat.DEFAULT_MAX_NUMBER_OF_COLUMNS_FOR_PARSING;
+ }
+ return maxNumberOfColumnsForParsing;
+ }
+
+ private Integer getMaxColumnValue(String maxColumn) {
+ return (maxColumn == null) ? null : Integer.parseInt(maxColumn);
+ }
+
+ /**
+ * validates both timestamp and date for illegal values
+ */
+ private void validateDateTimeFormat(String dateTimeLoadFormat, String dateTimeLoadOption)
+ throws InvalidLoadOptionException {
+ // allowing empty value to be configured for dateformat option.
+ if (dateTimeLoadFormat != null && !dateTimeLoadFormat.trim().equalsIgnoreCase("")) {
+ try {
+ new SimpleDateFormat(dateTimeLoadFormat);
+ } catch (IllegalArgumentException e) {
+ throw new InvalidLoadOptionException(
+ "Error: Wrong option: " + dateTimeLoadFormat + " is provided for option "
+ + dateTimeLoadOption);
+ }
+ }
+ }
+
+ private void validateSortScope(String sortScope) throws InvalidLoadOptionException {
+ if (sortScope != null) {
+ // Don't support use global sort on partitioned table.
+ if (table.getPartitionInfo(table.getTableName()) != null &&
+ sortScope.equalsIgnoreCase(SortScopeOptions.SortScope.GLOBAL_SORT.toString())) {
+ throw new InvalidLoadOptionException("Don't support use global sort on partitioned table.");
+ }
+ }
+ }
+
+ private void validateGlobalSortPartitions(String globalSortPartitions)
+ throws InvalidLoadOptionException {
+ if (globalSortPartitions != null) {
+ try {
+ int num = Integer.parseInt(globalSortPartitions);
+ if (num <= 0) {
+ throw new InvalidLoadOptionException("'GLOBAL_SORT_PARTITIONS' should be greater than 0");
+ }
+ } catch (NumberFormatException e) {
+ throw new InvalidLoadOptionException(e.getMessage());
+ }
+ }
+ }
+
+ /**
+ * check whether using default value or not
+ */
+ private String checkDefaultValue(String value, String defaultValue) {
+ if (StringUtils.isEmpty(value)) {
+ return defaultValue;
+ } else {
+ return value;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/processing/src/main/java/org/apache/carbondata/processing/loading/model/LoadOption.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/model/LoadOption.java b/processing/src/main/java/org/apache/carbondata/processing/loading/model/LoadOption.java
new file mode 100644
index 0000000..8ec93a9
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/model/LoadOption.java
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.loading.model;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.carbondata.common.Maps;
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.common.exceptions.sql.InvalidLoadOptionException;
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.constants.CarbonLoadOptionConstants;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException;
+import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
+import org.apache.carbondata.processing.util.CarbonLoaderUtil;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Provide utilities to populate loading options
+ */
+@InterfaceAudience.Developer
+public class LoadOption {
+
+ private static LogService LOG = LogServiceFactory.getLogService(LoadOption.class.getName());
+
+ /**
+ * Based on the input options, fill and return data loading options with default value
+ */
+ public static Map<String, String> fillOptionWithDefaultValue(
+ Map<String, String> options) throws InvalidLoadOptionException {
+ Map<String, String> optionsFinal = new HashMap<>();
+ optionsFinal.put("delimiter", Maps.getOrDefault(options, "delimiter", ","));
+ optionsFinal.put("quotechar", Maps.getOrDefault(options, "quotechar", "\""));
+ optionsFinal.put("fileheader", Maps.getOrDefault(options, "fileheader", ""));
+ optionsFinal.put("commentchar", Maps.getOrDefault(options, "commentchar", "#"));
+ optionsFinal.put("columndict", Maps.getOrDefault(options, "columndict", null));
+
+ optionsFinal.put(
+ "escapechar",
+ CarbonLoaderUtil.getEscapeChar(Maps.getOrDefault(options,"escapechar", "\\")));
+
+ optionsFinal.put(
+ "serialization_null_format",
+ Maps.getOrDefault(options, "serialization_null_format", "\\N"));
+
+ optionsFinal.put(
+ "bad_records_logger_enable",
+ Maps.getOrDefault(
+ options,
+ "bad_records_logger_enable",
+ CarbonProperties.getInstance().getProperty(
+ CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE,
+ CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE_DEFAULT)));
+
+ String badRecordActionValue = CarbonProperties.getInstance().getProperty(
+ CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION,
+ CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT);
+
+ optionsFinal.put(
+ "bad_records_action",
+ Maps.getOrDefault(
+ options,
+ "bad_records_action",
+ CarbonProperties.getInstance().getProperty(
+ CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_ACTION,
+ badRecordActionValue)));
+
+ optionsFinal.put(
+ "is_empty_data_bad_record",
+ Maps.getOrDefault(
+ options,
+ "is_empty_data_bad_record",
+ CarbonProperties.getInstance().getProperty(
+ CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD,
+ CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD_DEFAULT)));
+
+ optionsFinal.put(
+ "skip_empty_line",
+ Maps.getOrDefault(
+ options,
+ "skip_empty_line",
+ CarbonProperties.getInstance().getProperty(
+ CarbonLoadOptionConstants.CARBON_OPTIONS_SKIP_EMPTY_LINE)));
+
+ optionsFinal.put(
+ "all_dictionary_path",
+ Maps.getOrDefault(options, "all_dictionary_path", ""));
+
+ optionsFinal.put(
+ "complex_delimiter_level_1",
+ Maps.getOrDefault(options,"complex_delimiter_level_1", "\\$"));
+
+ optionsFinal.put(
+ "complex_delimiter_level_2",
+ Maps.getOrDefault(options, "complex_delimiter_level_2", "\\:"));
+
+ optionsFinal.put(
+ "dateformat",
+ Maps.getOrDefault(
+ options,
+ "dateformat",
+ CarbonProperties.getInstance().getProperty(
+ CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT,
+ CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT_DEFAULT)));
+
+ optionsFinal.put(
+ "timestampformat",
+ Maps.getOrDefault(
+ options,
+ "timestampformat",
+ CarbonProperties.getInstance().getProperty(
+ CarbonLoadOptionConstants.CARBON_OPTIONS_TIMESTAMPFORMAT,
+ CarbonLoadOptionConstants.CARBON_OPTIONS_TIMESTAMPFORMAT_DEFAULT)));
+
+ optionsFinal.put(
+ "global_sort_partitions",
+ Maps.getOrDefault(
+ options,
+ "global_sort_partitions",
+ CarbonProperties.getInstance().getProperty(
+ CarbonLoadOptionConstants.CARBON_OPTIONS_GLOBAL_SORT_PARTITIONS,
+ null)));
+
+ optionsFinal.put("maxcolumns", Maps.getOrDefault(options, "maxcolumns", null));
+
+ optionsFinal.put(
+ "batch_sort_size_inmb",
+ Maps.getOrDefault(
+ options,
+ "batch_sort_size_inmb",
+ CarbonProperties.getInstance().getProperty(
+ CarbonLoadOptionConstants.CARBON_OPTIONS_BATCH_SORT_SIZE_INMB,
+ CarbonProperties.getInstance().getProperty(
+ CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB,
+ CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB_DEFAULT))));
+
+ optionsFinal.put(
+ "bad_record_path",
+ Maps.getOrDefault(
+ options,
+ "bad_record_path",
+ CarbonProperties.getInstance().getProperty(
+ CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH,
+ CarbonProperties.getInstance().getProperty(
+ CarbonCommonConstants.CARBON_BADRECORDS_LOC,
+ CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL))));
+
+ String useOnePass = Maps.getOrDefault(
+ options,
+ "single_pass",
+ CarbonProperties.getInstance().getProperty(
+ CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS,
+ CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS_DEFAULT)).trim().toLowerCase();
+
+ boolean singlePass;
+
+ if (useOnePass.equalsIgnoreCase("true")) {
+ singlePass = true;
+ } else {
+ // when single_pass = false and if either alldictionarypath
+ // or columnDict is configured the do not allow load
+ if (StringUtils.isNotEmpty(optionsFinal.get("all_dictionary_path")) ||
+ StringUtils.isNotEmpty(optionsFinal.get("columndict"))) {
+ throw new InvalidLoadOptionException(
+ "Can not use all_dictionary_path or columndict without single_pass.");
+ } else {
+ singlePass = false;
+ }
+ }
+
+ optionsFinal.put("single_pass", String.valueOf(singlePass));
+ return optionsFinal;
+ }
+
+ /**
+ * Return CSV header field names
+ */
+ public static String[] getCsvHeaderColumns(
+ CarbonLoadModel carbonLoadModel,
+ Configuration hadoopConf) throws IOException {
+ String delimiter;
+ if (StringUtils.isEmpty(carbonLoadModel.getCsvDelimiter())) {
+ delimiter = CarbonCommonConstants.COMMA;
+ } else {
+ delimiter = CarbonUtil.delimiterConverter(carbonLoadModel.getCsvDelimiter());
+ }
+ String csvFile = null;
+ String csvHeader = carbonLoadModel.getCsvHeader();
+ String[] csvColumns;
+ if (StringUtils.isBlank(csvHeader)) {
+ // read header from csv file
+ csvFile = carbonLoadModel.getFactFilePath().split(",")[0];
+ csvHeader = CarbonUtil.readHeader(csvFile, hadoopConf);
+ if (StringUtils.isBlank(csvHeader)) {
+ throw new CarbonDataLoadingException("First line of the csv is not valid.");
+ }
+ String[] headers = csvHeader.toLowerCase().split(delimiter);
+ csvColumns = new String[headers.length];
+ for (int i = 0; i < csvColumns.length; i++) {
+ csvColumns[i] = headers[i].replaceAll("\"", "").trim();
+ }
+ } else {
+ String[] headers = csvHeader.toLowerCase().split(CarbonCommonConstants.COMMA);
+ csvColumns = new String[headers.length];
+ for (int i = 0; i < csvColumns.length; i++) {
+ csvColumns[i] = headers[i].trim();
+ }
+ }
+
+ if (!CarbonDataProcessorUtil.isHeaderValid(carbonLoadModel.getTableName(), csvColumns,
+ carbonLoadModel.getCarbonDataLoadSchema())) {
+ if (csvFile == null) {
+ LOG.error("CSV header in DDL is not proper."
+ + " Column names in schema and CSV header are not the same.");
+ throw new CarbonDataLoadingException(
+ "CSV header in DDL is not proper. Column names in schema and CSV header are "
+ + "not the same.");
+ } else {
+ LOG.error(
+ "CSV header in input file is not proper. Column names in schema and csv header are not "
+ + "the same. Input file : " + CarbonUtil.removeAKSK(csvFile));
+ throw new CarbonDataLoadingException(
+ "CSV header in input file is not proper. Column names in schema and csv header are not "
+ + "the same. Input file : " + CarbonUtil.removeAKSK(csvFile));
+ }
+ }
+ return csvColumns;
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
index c2f4501..6843946 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
@@ -16,14 +16,9 @@
*/
package org.apache.carbondata.processing.util;
-import java.io.BufferedWriter;
-import java.io.DataOutputStream;
import java.io.IOException;
-import java.io.OutputStreamWriter;
import java.net.InetAddress;
import java.net.UnknownHostException;
-import java.nio.charset.Charset;
-import java.text.SimpleDateFormat;
import java.util.*;
import org.apache.carbondata.common.logging.LogService;
@@ -39,9 +34,6 @@ import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.datastore.impl.FileFactory.FileType;
-import org.apache.carbondata.core.fileoperations.AtomicFileOperations;
-import org.apache.carbondata.core.fileoperations.AtomicFileOperationsImpl;
-import org.apache.carbondata.core.fileoperations.FileWriteOperation;
import org.apache.carbondata.core.locks.CarbonLockUtil;
import org.apache.carbondata.core.locks.ICarbonLock;
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
@@ -57,9 +49,8 @@ import org.apache.carbondata.core.util.path.CarbonTablePath;
import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
import org.apache.carbondata.processing.merger.NodeBlockRelation;
import org.apache.carbondata.processing.merger.NodeMultiBlockRelation;
-import static org.apache.carbondata.core.enums.EscapeSequences.*;
-import com.google.gson.Gson;
+import static org.apache.carbondata.core.enums.EscapeSequences.*;
public final class CarbonLoaderUtil {
@@ -287,49 +278,6 @@ public final class CarbonLoaderUtil {
loadMetadataDetails.setLoadStartTime(loadStartTime);
}
- public static void writeLoadMetadata(AbsoluteTableIdentifier identifier,
- List<LoadMetadataDetails> listOfLoadFolderDetails) throws IOException {
- String dataLoadLocation = CarbonTablePath.getTableStatusFilePath(identifier.getTablePath());
-
- DataOutputStream dataOutputStream;
- Gson gsonObjectToWrite = new Gson();
- BufferedWriter brWriter = null;
-
- AtomicFileOperations writeOperation =
- new AtomicFileOperationsImpl(dataLoadLocation, FileFactory.getFileType(dataLoadLocation));
-
- try {
-
- dataOutputStream = writeOperation.openForWrite(FileWriteOperation.OVERWRITE);
- brWriter = new BufferedWriter(new OutputStreamWriter(dataOutputStream,
- Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET)));
-
- String metadataInstance = gsonObjectToWrite.toJson(listOfLoadFolderDetails.toArray());
- brWriter.write(metadataInstance);
- } finally {
- try {
- if (null != brWriter) {
- brWriter.flush();
- }
- } catch (Exception e) {
- LOGGER.error("error in flushing ");
-
- }
- CarbonUtil.closeStreams(brWriter);
- writeOperation.close();
- }
-
- }
-
- public static String readCurrentTime() {
- SimpleDateFormat sdf = new SimpleDateFormat(CarbonCommonConstants.CARBON_TIMESTAMP);
- String date = null;
-
- date = sdf.format(new Date());
-
- return date;
- }
-
public static boolean isValidEscapeSequence(String escapeChar) {
return escapeChar.equalsIgnoreCase(NEW_LINE.getName()) ||
escapeChar.equalsIgnoreCase(CARRIAGE_RETURN.getName()) ||
@@ -445,17 +393,6 @@ public final class CarbonLoaderUtil {
}
/**
- * This method will divide the blocks among the nodes as per the data locality
- *
- * @param blockInfos
- * @return
- */
- public static Map<String, List<Distributable>> nodeBlockMapping(List<Distributable> blockInfos) {
- // -1 if number of nodes has to be decided based on block location information
- return nodeBlockMapping(blockInfos, -1);
- }
-
- /**
* the method returns the number of required executors
*
* @param blockInfos
@@ -830,25 +767,6 @@ public final class CarbonLoaderUtil {
CarbonUtil.checkAndCreateFolder(segmentFolder);
}
- /**
- * This will update the old table status details before clean files to the latest table status.
- * @param oldList
- * @param newList
- * @return
- */
- public static List<LoadMetadataDetails> updateLoadMetadataFromOldToNew(
- LoadMetadataDetails[] oldList, LoadMetadataDetails[] newList) {
-
- List<LoadMetadataDetails> newListMetadata =
- new ArrayList<LoadMetadataDetails>(Arrays.asList(newList));
- for (LoadMetadataDetails oldSegment : oldList) {
- if ("false".equalsIgnoreCase(oldSegment.getVisibility())) {
- newListMetadata.get(newListMetadata.indexOf(oldSegment)).setVisibility("false");
- }
- }
- return newListMetadata;
- }
-
/*
* This method will add data size and index size into tablestatus for each segment. And also
* returns the size of the segment.
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/processing/src/main/java/org/apache/carbondata/processing/util/DeleteLoadFolders.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/DeleteLoadFolders.java b/processing/src/main/java/org/apache/carbondata/processing/util/DeleteLoadFolders.java
deleted file mode 100644
index 1fdce32..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/util/DeleteLoadFolders.java
+++ /dev/null
@@ -1,200 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.util;
-
-import java.io.IOException;
-
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
-import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
-import org.apache.carbondata.core.datastore.impl.FileFactory;
-import org.apache.carbondata.core.locks.CarbonLockFactory;
-import org.apache.carbondata.core.locks.ICarbonLock;
-import org.apache.carbondata.core.locks.LockUsage;
-import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
-import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
-import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
-import org.apache.carbondata.core.statusmanager.SegmentStatus;
-import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
-import org.apache.carbondata.core.util.path.CarbonTablePath;
-
-public final class DeleteLoadFolders {
-
- private static final LogService LOGGER =
- LogServiceFactory.getLogService(DeleteLoadFolders.class.getName());
-
- private DeleteLoadFolders() {
-
- }
-
- /**
- * returns segment path
- *
- * @param identifier
- * @param oneLoad
- * @return
- */
- private static String getSegmentPath(AbsoluteTableIdentifier identifier,
- LoadMetadataDetails oneLoad) {
- String segmentId = oneLoad.getLoadName();
- return CarbonTablePath.getSegmentPath(identifier.getTablePath(), segmentId);
- }
-
- public static void physicalFactAndMeasureMetadataDeletion(
- AbsoluteTableIdentifier absoluteTableIdentifier, String metadataPath, boolean isForceDelete) {
- LoadMetadataDetails[] currentDetails = SegmentStatusManager.readLoadMetadata(metadataPath);
- for (LoadMetadataDetails oneLoad : currentDetails) {
- if (checkIfLoadCanBeDeletedPhysically(oneLoad, isForceDelete)) {
- String path = getSegmentPath(absoluteTableIdentifier, oneLoad);
- boolean status = false;
- try {
- if (FileFactory.isFileExist(path, FileFactory.getFileType(path))) {
- CarbonFile file = FileFactory.getCarbonFile(path, FileFactory.getFileType(path));
- CarbonFile[] filesToBeDeleted = file.listFiles(new CarbonFileFilter() {
-
- @Override public boolean accept(CarbonFile file) {
- return (CarbonTablePath.isCarbonDataFile(file.getName())
- || CarbonTablePath.isCarbonIndexFile(file.getName())
- || CarbonTablePath.isPartitionMapFile(file.getName()));
- }
- });
-
- //if there are no fact and msr metadata files present then no need to keep
- //entry in metadata.
- if (filesToBeDeleted.length == 0) {
- status = true;
- } else {
-
- for (CarbonFile eachFile : filesToBeDeleted) {
- if (!eachFile.delete()) {
- LOGGER.warn("Unable to delete the file as per delete command " + eachFile
- .getAbsolutePath());
- status = false;
- } else {
- status = true;
- }
- }
- }
- // need to delete the complete folder.
- if (status) {
- if (!file.delete()) {
- LOGGER.warn(
- "Unable to delete the folder as per delete command " + file.getAbsolutePath());
- }
- }
-
- } else {
- LOGGER.warn("Files are not found in segment " + path
- + " it seems, files are already being deleted");
- }
- } catch (IOException e) {
- LOGGER.warn("Unable to delete the file as per delete command " + path);
- }
- }
- }
- }
-
- private static boolean checkIfLoadCanBeDeleted(LoadMetadataDetails oneLoad,
- boolean isForceDelete) {
- if ((SegmentStatus.MARKED_FOR_DELETE == oneLoad.getSegmentStatus() ||
- SegmentStatus.COMPACTED == oneLoad.getSegmentStatus() ||
- SegmentStatus.INSERT_IN_PROGRESS == oneLoad.getSegmentStatus() ||
- SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS == oneLoad.getSegmentStatus())
- && oneLoad.getVisibility().equalsIgnoreCase("true")) {
- if (isForceDelete) {
- return true;
- }
- long deletionTime = oneLoad.getModificationOrdeletionTimesStamp();
-
- return CarbonUpdateUtil.isMaxQueryTimeoutExceeded(deletionTime);
-
- }
-
- return false;
- }
-
- private static boolean checkIfLoadCanBeDeletedPhysically(LoadMetadataDetails oneLoad,
- boolean isForceDelete) {
- if ((SegmentStatus.MARKED_FOR_DELETE == oneLoad.getSegmentStatus() ||
- SegmentStatus.COMPACTED == oneLoad.getSegmentStatus())) {
- if (isForceDelete) {
- return true;
- }
- long deletionTime = oneLoad.getModificationOrdeletionTimesStamp();
-
- return CarbonUpdateUtil.isMaxQueryTimeoutExceeded(deletionTime);
-
- }
-
- return false;
- }
-
- private static LoadMetadataDetails getCurrentLoadStatusOfSegment(String segmentId,
- String metadataPath) {
- LoadMetadataDetails[] currentDetails = SegmentStatusManager.readLoadMetadata(metadataPath);
- for (LoadMetadataDetails oneLoad : currentDetails) {
- if (oneLoad.getLoadName().equalsIgnoreCase(segmentId)) {
- return oneLoad;
- }
- }
- return null;
- }
-
- public static boolean deleteLoadFoldersFromFileSystem(
- AbsoluteTableIdentifier absoluteTableIdentifier, boolean isForceDelete,
- LoadMetadataDetails[] details, String metadataPath) {
- boolean isDeleted = false;
- if (details != null && details.length != 0) {
- for (LoadMetadataDetails oneLoad : details) {
- if (checkIfLoadCanBeDeleted(oneLoad, isForceDelete)) {
- ICarbonLock segmentLock = CarbonLockFactory.getCarbonLockObj(absoluteTableIdentifier,
- CarbonTablePath.addSegmentPrefix(oneLoad.getLoadName()) + LockUsage.LOCK);
- try {
- if (oneLoad.getSegmentStatus() == SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS
- || oneLoad.getSegmentStatus() == SegmentStatus.INSERT_IN_PROGRESS) {
- if (segmentLock.lockWithRetries(1, 5)) {
- LOGGER.info("Info: Acquired segment lock on segment:" + oneLoad.getLoadName());
- LoadMetadataDetails currentDetails =
- getCurrentLoadStatusOfSegment(oneLoad.getLoadName(), metadataPath);
- if (currentDetails != null && checkIfLoadCanBeDeleted(currentDetails,
- isForceDelete)) {
- oneLoad.setVisibility("false");
- isDeleted = true;
- LOGGER.info("Info: Deleted the load " + oneLoad.getLoadName());
- }
- } else {
- LOGGER.info("Info: Load in progress for segment" + oneLoad.getLoadName());
- return isDeleted;
- }
- } else {
- oneLoad.setVisibility("false");
- isDeleted = true;
- LOGGER.info("Info: Deleted the load " + oneLoad.getLoadName());
- }
- } finally {
- segmentLock.unlock();
- LOGGER.info("Info: Segment lock on segment:" + oneLoad.getLoadName() + " is released");
- }
- }
- }
- }
- return isDeleted;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/store/sdk/pom.xml
----------------------------------------------------------------------
diff --git a/store/sdk/pom.xml b/store/sdk/pom.xml
index 6663683..51d2cf9 100644
--- a/store/sdk/pom.xml
+++ b/store/sdk/pom.xml
@@ -21,7 +21,7 @@
<dependencies>
<dependency>
<groupId>org.apache.carbondata</groupId>
- <artifactId>carbondata-spark-common</artifactId>
+ <artifactId>carbondata-hadoop</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
index 51ca09c..e06200a 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
@@ -28,6 +28,7 @@ import java.util.Objects;
import org.apache.carbondata.common.Strings;
import org.apache.carbondata.common.annotations.InterfaceAudience;
import org.apache.carbondata.common.annotations.InterfaceStability;
+import org.apache.carbondata.common.exceptions.sql.InvalidLoadOptionException;
import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.metadata.CarbonMetadata;
import org.apache.carbondata.core.metadata.converter.SchemaConverter;
@@ -40,7 +41,7 @@ import org.apache.carbondata.core.metadata.schema.table.TableSchemaBuilder;
import org.apache.carbondata.core.util.path.CarbonTablePath;
import org.apache.carbondata.core.writer.ThriftWriter;
import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
-import org.apache.carbondata.spark.util.DataLoadingUtil;
+import org.apache.carbondata.processing.loading.model.CarbonLoadModelBuilder;
/**
* Builder for {@link CarbonWriter}
@@ -94,9 +95,9 @@ public class CarbonWriterBuilder {
}
/**
- * Build a {@link CSVCarbonWriter}, which accepts row in CSV format
+ * Build a {@link CarbonWriter}, which accepts row in CSV format
*/
- public CarbonWriter buildWriterForCSVInput() throws IOException {
+ public CarbonWriter buildWriterForCSVInput() throws IOException, InvalidLoadOptionException {
Objects.requireNonNull(schema, "schema should not be null");
Objects.requireNonNull(path, "path should not be null");
@@ -113,7 +114,7 @@ public class CarbonWriterBuilder {
}
/**
- * Build a {@link AvroCarbonWriter}, which accepts Avro object
+ * Build a {@link CarbonWriter}, which accepts Avro object
* @return
* @throws IOException
*/
@@ -184,11 +185,13 @@ public class CarbonWriterBuilder {
/**
* Build a {@link CarbonLoadModel}
*/
- private CarbonLoadModel buildLoadModel(CarbonTable table) {
+ private CarbonLoadModel buildLoadModel(CarbonTable table)
+ throws InvalidLoadOptionException, IOException {
Map<String, String> options = new HashMap<>();
if (sortColumns != null) {
options.put("sort_columns", Strings.mkString(sortColumns, ","));
}
- return DataLoadingUtil.buildCarbonLoadModelJava(table, options);
+ CarbonLoadModelBuilder builder = new CarbonLoadModelBuilder(table);
+ return builder.build(options);
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterSuite.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterSuite.java b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterSuite.java
index 531ec7c..aca2b2d 100644
--- a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterSuite.java
+++ b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterSuite.java
@@ -77,7 +77,7 @@ public class CSVCarbonWriterSuite {
writer.write(new String[]{"robot" + i, String.valueOf(i), String.valueOf((double) i / 2)});
}
writer.close();
- } catch (IOException e) {
+ } catch (Exception e) {
e.printStackTrace();
Assert.fail(e.getMessage());
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/streaming/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala b/streaming/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala
index c417fbe..843a9d1 100644
--- a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala
+++ b/streaming/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala
@@ -34,10 +34,9 @@ import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.events.{OperationContext, OperationListenerBus}
import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadTablePostExecutionEvent, LoadTablePreExecutionEvent}
-import org.apache.carbondata.processing.loading.model.CarbonLoadModel
+import org.apache.carbondata.processing.loading.model.{CarbonLoadModel, CarbonLoadModelBuilder, LoadOption}
import org.apache.carbondata.spark.dictionary.provider.SecureDictionaryServiceProvider
import org.apache.carbondata.spark.dictionary.server.SecureDictionaryServer
-import org.apache.carbondata.spark.util.DataLoadingUtil
import org.apache.carbondata.streaming.segment.StreamSegment
/**
@@ -208,17 +207,15 @@ object StreamSinkFactory {
segmentId: String): CarbonLoadModel = {
val carbonProperty: CarbonProperties = CarbonProperties.getInstance()
carbonProperty.addProperty("zookeeper.enable.lock", "false")
- val optionsFinal = DataLoadingUtil.getDataLoadingOptions(carbonProperty, parameters)
+ val optionsFinal = LoadOption.fillOptionWithDefaultValue(parameters.asJava)
optionsFinal.put("sort_scope", "no_sort")
if (parameters.get("fileheader").isEmpty) {
optionsFinal.put("fileheader", carbonTable.getCreateOrderColumn(carbonTable.getTableName)
.asScala.map(_.getColName).mkString(","))
}
val carbonLoadModel = new CarbonLoadModel()
- DataLoadingUtil.buildCarbonLoadModel(
- carbonTable,
- carbonProperty,
- parameters,
+ new CarbonLoadModelBuilder(carbonTable).build(
+ parameters.asJava,
optionsFinal,
carbonLoadModel,
hadoopConf)
[2/3] carbondata git commit: [CARBONDATA-2159] Remove carbon-spark
dependency in store-sdk module
Posted by ja...@apache.org.
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala
deleted file mode 100644
index 9c5ab69..0000000
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala
+++ /dev/null
@@ -1,451 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.spark.util
-
-import scala.collection.{immutable, mutable}
-import scala.collection.JavaConverters._
-
-import org.apache.commons.lang3.StringUtils
-import org.apache.hadoop.conf.Configuration
-import org.apache.spark.sql.util.CarbonException
-
-import org.apache.carbondata.common.constants.LoggerAction
-import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
-import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants}
-import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, LockUsage}
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager}
-import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
-import org.apache.carbondata.processing.loading.constants.DataLoadProcessorConstants
-import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
-import org.apache.carbondata.processing.util.{CarbonLoaderUtil, DeleteLoadFolders, TableOptionConstant}
-import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
-import org.apache.carbondata.spark.load.ValidateUtil
-
-/**
- * the util object of data loading
- */
-object DataLoadingUtil {
-
- val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
-
- /**
- * get data loading options and initialise default value
- */
- def getDataLoadingOptions(
- carbonProperty: CarbonProperties,
- options: immutable.Map[String, String]): mutable.Map[String, String] = {
- val optionsFinal = scala.collection.mutable.Map[String, String]()
- optionsFinal.put("delimiter", options.getOrElse("delimiter", ","))
- optionsFinal.put("quotechar", options.getOrElse("quotechar", "\""))
- optionsFinal.put("fileheader", options.getOrElse("fileheader", ""))
- optionsFinal.put("commentchar", options.getOrElse("commentchar", "#"))
- optionsFinal.put("columndict", options.getOrElse("columndict", null))
-
- optionsFinal.put("escapechar",
- CarbonLoaderUtil.getEscapeChar(options.getOrElse("escapechar", "\\")))
-
- optionsFinal.put(
- "serialization_null_format",
- options.getOrElse("serialization_null_format", "\\N"))
-
- optionsFinal.put(
- "bad_records_logger_enable",
- options.getOrElse(
- "bad_records_logger_enable",
- carbonProperty.getProperty(
- CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE,
- CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE_DEFAULT)))
-
- val badRecordActionValue = carbonProperty.getProperty(
- CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION,
- CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT)
-
- optionsFinal.put(
- "bad_records_action",
- options.getOrElse(
- "bad_records_action",
- carbonProperty.getProperty(
- CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_ACTION,
- badRecordActionValue)))
-
- optionsFinal.put(
- "is_empty_data_bad_record",
- options.getOrElse(
- "is_empty_data_bad_record",
- carbonProperty.getProperty(
- CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD,
- CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD_DEFAULT)))
-
- optionsFinal.put(
- "skip_empty_line",
- options.getOrElse(
- "skip_empty_line",
- carbonProperty.getProperty(
- CarbonLoadOptionConstants.CARBON_OPTIONS_SKIP_EMPTY_LINE)))
-
- optionsFinal.put("all_dictionary_path", options.getOrElse("all_dictionary_path", ""))
-
- optionsFinal.put(
- "complex_delimiter_level_1",
- options.getOrElse("complex_delimiter_level_1", "\\$"))
-
- optionsFinal.put(
- "complex_delimiter_level_2",
- options.getOrElse("complex_delimiter_level_2", "\\:"))
-
- optionsFinal.put(
- "dateformat",
- options.getOrElse(
- "dateformat",
- carbonProperty.getProperty(
- CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT,
- CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT_DEFAULT)))
-
- optionsFinal.put(
- "timestampformat",
- options.getOrElse(
- "timestampformat",
- carbonProperty.getProperty(
- CarbonLoadOptionConstants.CARBON_OPTIONS_TIMESTAMPFORMAT,
- CarbonLoadOptionConstants.CARBON_OPTIONS_TIMESTAMPFORMAT_DEFAULT)))
-
- optionsFinal.put(
- "global_sort_partitions",
- options.getOrElse(
- "global_sort_partitions",
- carbonProperty.getProperty(
- CarbonLoadOptionConstants.CARBON_OPTIONS_GLOBAL_SORT_PARTITIONS,
- null)))
-
- optionsFinal.put("maxcolumns", options.getOrElse("maxcolumns", null))
-
- optionsFinal.put(
- "batch_sort_size_inmb",
- options.getOrElse(
- "batch_sort_size_inmb",
- carbonProperty.getProperty(
- CarbonLoadOptionConstants.CARBON_OPTIONS_BATCH_SORT_SIZE_INMB,
- carbonProperty.getProperty(
- CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB,
- CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB_DEFAULT))))
-
- optionsFinal.put(
- "bad_record_path",
- options.getOrElse(
- "bad_record_path",
- carbonProperty.getProperty(
- CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH,
- carbonProperty.getProperty(
- CarbonCommonConstants.CARBON_BADRECORDS_LOC,
- CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL))))
-
- val useOnePass = options.getOrElse(
- "single_pass",
- carbonProperty.getProperty(
- CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS,
- CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS_DEFAULT)).trim.toLowerCase match {
- case "true" =>
- true
- case "false" =>
- // when single_pass = false and if either alldictionarypath
- // or columnDict is configured the do not allow load
- if (StringUtils.isNotEmpty(optionsFinal("all_dictionary_path")) ||
- StringUtils.isNotEmpty(optionsFinal("columndict"))) {
- throw new MalformedCarbonCommandException(
- "Can not use all_dictionary_path or columndict without single_pass.")
- } else {
- false
- }
- case illegal =>
- val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
- LOGGER.error(s"Can't use single_pass, because illegal syntax found: [$illegal] " +
- "Please set it as 'true' or 'false'")
- false
- }
- optionsFinal.put("single_pass", useOnePass.toString)
- optionsFinal
- }
-
- /**
- * check whether using default value or not
- */
- private def checkDefaultValue(value: String, default: String) = {
- if (StringUtils.isEmpty(value)) {
- default
- } else {
- value
- }
- }
-
- /**
- * build CarbonLoadModel for data loading
- * @param table CarbonTable object containing all metadata information for the table
- * like table name, table path, schema, etc
- * @param options Load options from user input
- * @return a new CarbonLoadModel instance
- */
- def buildCarbonLoadModelJava(
- table: CarbonTable,
- options: java.util.Map[String, String]
- ): CarbonLoadModel = {
- val carbonProperty: CarbonProperties = CarbonProperties.getInstance
- val optionsFinal = getDataLoadingOptions(carbonProperty, options.asScala.toMap)
- optionsFinal.put("sort_scope", "no_sort")
- if (!options.containsKey("fileheader")) {
- val csvHeader = table.getCreateOrderColumn(table.getTableName)
- .asScala.map(_.getColName).mkString(",")
- optionsFinal.put("fileheader", csvHeader)
- }
- val model = new CarbonLoadModel()
- buildCarbonLoadModel(
- table = table,
- carbonProperty = carbonProperty,
- options = options.asScala.toMap,
- optionsFinal = optionsFinal,
- carbonLoadModel = model,
- hadoopConf = null) // we have provided 'fileheader', so it can be null
-
- // set default values
- model.setTimestampformat(CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
- model.setDateFormat(CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT)
- model.setUseOnePass(options.asScala.getOrElse("onepass", "false").toBoolean)
- model.setDictionaryServerHost(options.asScala.getOrElse("dicthost", null))
- model.setDictionaryServerPort(options.asScala.getOrElse("dictport", "-1").toInt)
- model
- }
-
- /**
- * build CarbonLoadModel for data loading
- * @param table CarbonTable object containing all metadata information for the table
- * like table name, table path, schema, etc
- * @param carbonProperty Carbon property instance
- * @param options Load options from user input
- * @param optionsFinal Load options that populated with default values for optional options
- * @param carbonLoadModel The output load model
- * @param hadoopConf hadoopConf is needed to read CSV header if there 'fileheader' is not set in
- * user provided load options
- */
- def buildCarbonLoadModel(
- table: CarbonTable,
- carbonProperty: CarbonProperties,
- options: immutable.Map[String, String],
- optionsFinal: mutable.Map[String, String],
- carbonLoadModel: CarbonLoadModel,
- hadoopConf: Configuration): Unit = {
- carbonLoadModel.setTableName(table.getTableName)
- carbonLoadModel.setDatabaseName(table.getDatabaseName)
- carbonLoadModel.setTablePath(table.getTablePath)
- carbonLoadModel.setTableName(table.getTableName)
- val dataLoadSchema = new CarbonDataLoadSchema(table)
- // Need to fill dimension relation
- carbonLoadModel.setCarbonDataLoadSchema(dataLoadSchema)
- val sort_scope = optionsFinal("sort_scope")
- val single_pass = optionsFinal("single_pass")
- val bad_records_logger_enable = optionsFinal("bad_records_logger_enable")
- val bad_records_action = optionsFinal("bad_records_action")
- var bad_record_path = optionsFinal("bad_record_path")
- val global_sort_partitions = optionsFinal("global_sort_partitions")
- val timestampformat = optionsFinal("timestampformat")
- val dateFormat = optionsFinal("dateformat")
- val delimeter = optionsFinal("delimiter")
- val complex_delimeter_level1 = optionsFinal("complex_delimiter_level_1")
- val complex_delimeter_level2 = optionsFinal("complex_delimiter_level_2")
- val all_dictionary_path = optionsFinal("all_dictionary_path")
- val column_dict = optionsFinal("columndict")
- ValidateUtil.validateDateTimeFormat(timestampformat, "TimestampFormat")
- ValidateUtil.validateDateTimeFormat(dateFormat, "DateFormat")
- ValidateUtil.validateSortScope(table, sort_scope)
-
- if (bad_records_logger_enable.toBoolean ||
- LoggerAction.REDIRECT.name().equalsIgnoreCase(bad_records_action)) {
- bad_record_path = CarbonUtil.checkAndAppendHDFSUrl(bad_record_path)
- if (!CarbonUtil.isValidBadStorePath(bad_record_path)) {
- CarbonException.analysisException("Invalid bad records location.")
- }
- }
- carbonLoadModel.setBadRecordsLocation(bad_record_path)
-
- ValidateUtil.validateGlobalSortPartitions(global_sort_partitions)
- carbonLoadModel.setEscapeChar(checkDefaultValue(optionsFinal("escapechar"), "\\"))
- carbonLoadModel.setQuoteChar(checkDefaultValue(optionsFinal("quotechar"), "\""))
- carbonLoadModel.setCommentChar(checkDefaultValue(optionsFinal("commentchar"), "#"))
-
- // if there isn't file header in csv file and load sql doesn't provide FILEHEADER option,
- // we should use table schema to generate file header.
- var fileHeader = optionsFinal("fileheader")
- val headerOption = options.get("header")
- if (headerOption.isDefined) {
- // whether the csv file has file header
- // the default value is true
- val header = try {
- headerOption.get.toBoolean
- } catch {
- case ex: IllegalArgumentException =>
- throw new MalformedCarbonCommandException(
- "'header' option should be either 'true' or 'false'. " + ex.getMessage)
- }
- if (header) {
- if (fileHeader.nonEmpty) {
- throw new MalformedCarbonCommandException(
- "When 'header' option is true, 'fileheader' option is not required.")
- }
- } else {
- if (fileHeader.isEmpty) {
- fileHeader = table.getCreateOrderColumn(table.getTableName)
- .asScala.map(_.getColName).mkString(",")
- }
- }
- }
-
- carbonLoadModel.setTimestampformat(timestampformat)
- carbonLoadModel.setDateFormat(dateFormat)
- carbonLoadModel.setDefaultTimestampFormat(carbonProperty.getProperty(
- CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
- CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT))
-
- carbonLoadModel.setDefaultDateFormat(carbonProperty.getProperty(
- CarbonCommonConstants.CARBON_DATE_FORMAT,
- CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT))
-
- carbonLoadModel.setSerializationNullFormat(
- TableOptionConstant.SERIALIZATION_NULL_FORMAT.getName + "," +
- optionsFinal("serialization_null_format"))
-
- carbonLoadModel.setBadRecordsLoggerEnable(
- TableOptionConstant.BAD_RECORDS_LOGGER_ENABLE.getName + "," + bad_records_logger_enable)
-
- carbonLoadModel.setBadRecordsAction(
- TableOptionConstant.BAD_RECORDS_ACTION.getName + "," + bad_records_action.toUpperCase)
-
- carbonLoadModel.setIsEmptyDataBadRecord(
- DataLoadProcessorConstants.IS_EMPTY_DATA_BAD_RECORD + "," +
- optionsFinal("is_empty_data_bad_record"))
-
- carbonLoadModel.setSkipEmptyLine(optionsFinal("skip_empty_line"))
-
- carbonLoadModel.setSortScope(sort_scope)
- carbonLoadModel.setBatchSortSizeInMb(optionsFinal("batch_sort_size_inmb"))
- carbonLoadModel.setGlobalSortPartitions(global_sort_partitions)
- carbonLoadModel.setUseOnePass(single_pass.toBoolean)
-
- if (delimeter.equalsIgnoreCase(complex_delimeter_level1) ||
- complex_delimeter_level1.equalsIgnoreCase(complex_delimeter_level2) ||
- delimeter.equalsIgnoreCase(complex_delimeter_level2)) {
- CarbonException.analysisException(s"Field Delimiter and Complex types delimiter are same")
- } else {
- carbonLoadModel.setComplexDelimiterLevel1(
- CarbonUtil.delimiterConverter(complex_delimeter_level1))
- carbonLoadModel.setComplexDelimiterLevel2(
- CarbonUtil.delimiterConverter(complex_delimeter_level2))
- }
- // set local dictionary path, and dictionary file extension
- carbonLoadModel.setAllDictPath(all_dictionary_path)
- carbonLoadModel.setCsvDelimiter(CarbonUtil.unescapeChar(delimeter))
- carbonLoadModel.setCsvHeader(fileHeader)
- carbonLoadModel.setColDictFilePath(column_dict)
- carbonLoadModel.setCsvHeaderColumns(
- CommonUtil.getCsvHeaderColumns(carbonLoadModel, hadoopConf))
-
- val validatedMaxColumns = CommonUtil.validateMaxColumns(
- carbonLoadModel.getCsvHeaderColumns,
- optionsFinal("maxcolumns"))
-
- carbonLoadModel.setMaxColumns(validatedMaxColumns.toString)
- if (null == carbonLoadModel.getLoadMetadataDetails) {
- CommonUtil.readLoadMetadataDetails(carbonLoadModel)
- }
- }
-
- private def isLoadDeletionRequired(metaDataLocation: String): Boolean = {
- val details = SegmentStatusManager.readLoadMetadata(metaDataLocation)
- if (details != null && details.nonEmpty) for (oneRow <- details) {
- if ((SegmentStatus.MARKED_FOR_DELETE == oneRow.getSegmentStatus ||
- SegmentStatus.COMPACTED == oneRow.getSegmentStatus ||
- SegmentStatus.INSERT_IN_PROGRESS == oneRow.getSegmentStatus ||
- SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS == oneRow.getSegmentStatus) &&
- oneRow.getVisibility.equalsIgnoreCase("true")) {
- return true
- }
- }
- false
- }
-
- def deleteLoadsAndUpdateMetadata(
- isForceDeletion: Boolean,
- carbonTable: CarbonTable): Unit = {
- if (isLoadDeletionRequired(carbonTable.getMetadataPath)) {
- val details = SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath)
- val absoluteTableIdentifier = carbonTable.getAbsoluteTableIdentifier
- val carbonTableStatusLock =
- CarbonLockFactory.getCarbonLockObj(
- absoluteTableIdentifier,
- LockUsage.TABLE_STATUS_LOCK
- )
-
- // Delete marked loads
- val isUpdationRequired =
- DeleteLoadFolders.deleteLoadFoldersFromFileSystem(
- absoluteTableIdentifier,
- isForceDeletion,
- details,
- carbonTable.getMetadataPath
- )
-
- var updationCompletionStaus = false
-
- if (isUpdationRequired) {
- try {
- // Update load metadata file after cleaning deleted nodes
- if (carbonTableStatusLock.lockWithRetries()) {
- LOGGER.info("Table status lock has been successfully acquired.")
-
- // read latest table status again.
- val latestMetadata = SegmentStatusManager
- .readLoadMetadata(carbonTable.getMetadataPath)
-
- // update the metadata details from old to new status.
- val latestStatus = CarbonLoaderUtil
- .updateLoadMetadataFromOldToNew(details, latestMetadata)
-
- CarbonLoaderUtil.writeLoadMetadata(absoluteTableIdentifier, latestStatus)
- } else {
- val dbName = absoluteTableIdentifier.getCarbonTableIdentifier.getDatabaseName
- val tableName = absoluteTableIdentifier.getCarbonTableIdentifier.getTableName
- val errorMsg = "Clean files request is failed for " +
- s"$dbName.$tableName" +
- ". Not able to acquire the table status lock due to other operation " +
- "running in the background."
- LOGGER.audit(errorMsg)
- LOGGER.error(errorMsg)
- throw new Exception(errorMsg + " Please try after some time.")
- }
- updationCompletionStaus = true
- } finally {
- CarbonLockUtil.fileUnlock(carbonTableStatusLock, LockUsage.TABLE_STATUS_LOCK)
- }
- if (updationCompletionStaus) {
- DeleteLoadFolders
- .physicalFactAndMeasureMetadataDeletion(absoluteTableIdentifier,
- carbonTable.getMetadataPath, isForceDeletion)
- }
- }
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
index bbf345c..bcb7bd9 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
@@ -321,7 +321,7 @@ object GlobalDictionaryUtil {
carbonLoadModel.getSerializationNullFormat.split(CarbonCommonConstants.COMMA, 2)(1)
// get load count
if (null == carbonLoadModel.getLoadMetadataDetails) {
- CommonUtil.readLoadMetadataDetails(carbonLoadModel)
+ carbonLoadModel.readAndSetLoadMetadataDetails()
}
val absoluteTableIdentifier = AbsoluteTableIdentifier.from(carbonLoadModel.getTablePath, table)
DictionaryLoadModel(
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
index 3da603b..2dcff81 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
@@ -34,6 +34,7 @@ import org.apache.spark.sql.util.CarbonException
import org.apache.spark.util.PartitionUtils
import org.apache.carbondata.common.constants.LoggerAction
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.metadata.datatype.DataTypes
@@ -42,7 +43,6 @@ import org.apache.carbondata.core.metadata.schema.partition.PartitionType
import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
import org.apache.carbondata.processing.util.CarbonLoaderUtil
-import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
import org.apache.carbondata.spark.util.{CommonUtil, DataTypeConverterUtil}
/**
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index f47c9bc..c7dd553 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -163,7 +163,7 @@ object CarbonDataRDDFactory {
if (compactionModel.compactionType != CompactionType.IUD_UPDDEL_DELTA) {
// update the updated table status. For the case of Update Delta Compaction the Metadata
// is filled in LoadModel, no need to refresh.
- CommonUtil.readLoadMetadataDetails(carbonLoadModel)
+ carbonLoadModel.readAndSetLoadMetadataDetails()
}
val compactionThread = new Thread {
@@ -282,7 +282,7 @@ object CarbonDataRDDFactory {
loadModel.setTableName(table.getCarbonTableIdentifier.getTableName)
loadModel.setDatabaseName(table.getCarbonTableIdentifier.getDatabaseName)
loadModel.setTablePath(table.getTablePath)
- CommonUtil.readLoadMetadataDetails(loadModel)
+ loadModel.readAndSetLoadMetadataDetails()
val loadStartTime = CarbonUpdateUtil.readCurrentTime()
loadModel.setFactTimeStamp(loadStartTime)
loadModel
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
index ddc8586..1e19111 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
@@ -77,7 +77,7 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel,
}
// scan again and determine if anything is there to merge again.
- CommonUtil.readLoadMetadataDetails(carbonLoadModel)
+ carbonLoadModel.readAndSetLoadMetadataDetails()
segList = carbonLoadModel.getLoadMetadataDetails
// in case of major compaction we will scan only once and come out as it will keep
// on doing major for the new loads also.
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
index e61b636..5dcca6d 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
@@ -35,13 +35,13 @@ import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CarbonException
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
import org.apache.carbondata.core.metadata.schema.SchemaEvolutionEntry
import org.apache.carbondata.core.metadata.schema.table.TableInfo
import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
import org.apache.carbondata.spark.CarbonOption
-import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
import org.apache.carbondata.streaming.{CarbonStreamException, StreamSinkFactory}
/**
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
index c4d32b4..e93ab25 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
@@ -22,9 +22,9 @@ import org.apache.spark.sql.execution.command._
import org.apache.spark.sql.execution.command.preaaggregate.CreatePreAggregateTableCommand
import org.apache.spark.sql.execution.command.timeseries.TimeSeriesUtil
+import org.apache.carbondata.common.exceptions.sql.{MalformedCarbonCommandException, MalformedDataMapCommandException}
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.metadata.schema.datamap.DataMapProvider._
-import org.apache.carbondata.spark.exception.{MalformedCarbonCommandException, MalformedDataMapCommandException}
/**
* Below command class will be used to create datamap on table
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
index 1fa2494..e5db286 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
@@ -27,6 +27,7 @@ import org.apache.spark.sql.execution.command.AtomicRunnableCommand
import org.apache.spark.sql.execution.command.preaaggregate.PreAggregateUtil
import org.apache.spark.sql.execution.command.table.CarbonDropTableCommand
+import org.apache.carbondata.common.exceptions.sql.{MalformedCarbonCommandException, NoSuchDataMapException}
import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
import org.apache.carbondata.core.datamap.DataMapStoreManager
import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, LockUsage}
@@ -34,7 +35,6 @@ import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.events._
-import org.apache.carbondata.spark.exception.{MalformedCarbonCommandException, NoSuchDataMapException}
/**
* Drops the datamap and any related tables associated with the datamap
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
index febb83e..18c1339 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
@@ -24,12 +24,13 @@ import scala.collection.JavaConverters._
import org.apache.spark.sql.{CarbonEnv, Row, SparkSession, SQLContext}
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.execution.command.{AlterTableModel, AtomicRunnableCommand, CarbonMergerMapping, CompactionModel, DataCommand}
+import org.apache.spark.sql.execution.command.{AlterTableModel, AtomicRunnableCommand, CarbonMergerMapping, CompactionModel}
import org.apache.spark.sql.hive.{CarbonRelation, CarbonSessionCatalog}
import org.apache.spark.sql.optimizer.CarbonFilters
import org.apache.spark.sql.util.CarbonException
import org.apache.spark.util.AlterTableUtil
+import org.apache.carbondata.common.exceptions.ConcurrentOperationException
import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datastore.impl.FileFactory
@@ -44,7 +45,6 @@ import org.apache.carbondata.events.{AlterTableCompactionPostEvent, AlterTableCo
import org.apache.carbondata.processing.loading.events.LoadEvents.LoadMetadataEvent
import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
import org.apache.carbondata.processing.merger.{CarbonDataMergerUtil, CompactionType}
-import org.apache.carbondata.spark.exception.ConcurrentOperationException
import org.apache.carbondata.spark.rdd.CarbonDataRDDFactory
import org.apache.carbondata.spark.util.CommonUtil
import org.apache.carbondata.streaming.StreamHandoffRDD
@@ -89,8 +89,6 @@ case class CarbonAlterTableCompactionCommand(
override def processData(sparkSession: SparkSession): Seq[Row] = {
val LOGGER: LogService =
LogServiceFactory.getLogService(this.getClass.getName)
- val tableName = alterTableModel.tableName.toLowerCase
- val databaseName = alterTableModel.dbName.getOrElse(sparkSession.catalog.currentDatabase)
val isLoadInProgress = SegmentStatusManager.checkIfAnyLoadInProgressForTable(table)
if (isLoadInProgress) {
val message = "Cannot run data loading and compaction on same table concurrently. " +
@@ -146,8 +144,7 @@ case class CarbonAlterTableCompactionCommand(
operationContext: OperationContext): Unit = {
val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getName)
val compactionType = CompactionType.valueOf(alterTableModel.compactionType.toUpperCase)
- val compactionSize: Long = CarbonDataMergerUtil
- .getCompactionSize(compactionType, carbonLoadModel)
+ val compactionSize = CarbonDataMergerUtil.getCompactionSize(compactionType, carbonLoadModel)
if (CompactionType.IUD_UPDDEL_DELTA == compactionType) {
if (alterTableModel.segmentUpdateStatusManager.isDefined) {
carbonLoadModel.setSegmentUpdateStatusManager(
@@ -162,7 +159,7 @@ case class CarbonAlterTableCompactionCommand(
val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
if (null == carbonLoadModel.getLoadMetadataDetails) {
- CommonUtil.readLoadMetadataDetails(carbonLoadModel)
+ carbonLoadModel.readAndSetLoadMetadataDetails()
}
if (compactionType == CompactionType.STREAMING) {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
index c7b59d4..f105778 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
@@ -47,6 +47,7 @@ import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.unsafe.types.UTF8String
import org.apache.spark.util.{CarbonReflectionUtils, CausedBy, FileUtils}
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants}
import org.apache.carbondata.core.datamap.DataMapStoreManager
@@ -68,13 +69,12 @@ import org.apache.carbondata.processing.loading.TableProcessingOperations
import org.apache.carbondata.processing.loading.csvinput.{CSVInputFormat, StringArrayWritable}
import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadMetadataEvent, LoadTablePostExecutionEvent, LoadTablePreExecutionEvent}
import org.apache.carbondata.processing.loading.exception.NoRetryException
-import org.apache.carbondata.processing.loading.model.CarbonLoadModel
+import org.apache.carbondata.processing.loading.model.{CarbonLoadModel, CarbonLoadModelBuilder, LoadOption}
import org.apache.carbondata.processing.util.CarbonLoaderUtil
import org.apache.carbondata.spark.dictionary.provider.SecureDictionaryServiceProvider
import org.apache.carbondata.spark.dictionary.server.SecureDictionaryServer
-import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
import org.apache.carbondata.spark.rdd.{CarbonDataRDDFactory, CarbonDropPartitionCommitRDD, CarbonDropPartitionRDD}
-import org.apache.carbondata.spark.util.{CarbonScalaUtil, CommonUtil, DataLoadingUtil, GlobalDictionaryUtil}
+import org.apache.carbondata.spark.util.{CarbonScalaUtil, CommonUtil, GlobalDictionaryUtil}
case class CarbonLoadDataCommand(
databaseNameOp: Option[String],
@@ -148,7 +148,7 @@ case class CarbonLoadDataCommand(
val carbonLoadModel = new CarbonLoadModel()
try {
val tableProperties = table.getTableInfo.getFactTable.getTableProperties
- val optionsFinal = DataLoadingUtil.getDataLoadingOptions(carbonProperty, options)
+ val optionsFinal = LoadOption.fillOptionWithDefaultValue(options.asJava)
optionsFinal.put("sort_scope", tableProperties.asScala.getOrElse("sort_scope",
carbonProperty.getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE,
carbonProperty.getProperty(CarbonCommonConstants.LOAD_SORT_SCOPE,
@@ -163,10 +163,8 @@ case class CarbonLoadDataCommand(
carbonLoadModel.setAggLoadRequest(
internalOptions.getOrElse(CarbonCommonConstants.IS_INTERNAL_LOAD_CALL, "false").toBoolean)
carbonLoadModel.setSegmentId(internalOptions.getOrElse("mergedSegmentName", ""))
- DataLoadingUtil.buildCarbonLoadModel(
- table,
- carbonProperty,
- options,
+ new CarbonLoadModelBuilder(table).build(
+ options.asJava,
optionsFinal,
carbonLoadModel,
hadoopConf
@@ -183,7 +181,7 @@ case class CarbonLoadDataCommand(
carbonLoadModel,
factPath,
dataFrame.isDefined,
- optionsFinal.asJava,
+ optionsFinal,
options.asJava,
isOverwriteTable)
operationContext.setProperty("isOverwrite", isOverwriteTable)
@@ -191,7 +189,7 @@ case class CarbonLoadDataCommand(
// First system has to partition the data first and then call the load data
LOGGER.info(s"Initiating Direct Load for the Table : ($dbName.$tableName)")
// Clean up the old invalid segment data before creating a new entry for new load.
- DataLoadingUtil.deleteLoadsAndUpdateMetadata(isForceDeletion = false, table)
+ SegmentStatusManager.deleteLoadsAndUpdateMetadata(table, false)
// add the start entry for the new load in the table status file
if (updateModel.isEmpty && !table.isHivePartitionTable) {
CarbonLoaderUtil.readAndUpdateLoadProgressInTableMeta(carbonLoadModel, isOverwriteTable)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
index 874d416..50c5eca 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
@@ -22,13 +22,13 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.command._
import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.carbondata.common.exceptions.ConcurrentOperationException
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, LockUsage}
import org.apache.carbondata.core.mutate.CarbonUpdateUtil
import org.apache.carbondata.core.statusmanager.SegmentStatusManager
import org.apache.carbondata.events.{DeleteFromTablePostEvent, DeleteFromTablePreEvent, OperationContext, OperationListenerBus}
import org.apache.carbondata.processing.loading.FailureCauses
-import org.apache.carbondata.spark.exception.ConcurrentOperationException
/**
* IUD update delete and compaction framework.
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
index 2f12bef..bbea15b 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
@@ -24,6 +24,7 @@ import org.apache.spark.sql.execution.command.management.CarbonLoadDataCommand
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.storage.StorageLevel
+import org.apache.carbondata.common.exceptions.ConcurrentOperationException
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, LockUsage}
@@ -32,7 +33,6 @@ import org.apache.carbondata.core.statusmanager.SegmentStatusManager
import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.events.{OperationContext, OperationListenerBus, UpdateTablePostEvent, UpdateTablePreEvent}
import org.apache.carbondata.processing.loading.FailureCauses
-import org.apache.carbondata.spark.exception.ConcurrentOperationException
private[sql] case class CarbonProjectForUpdateCommand(
plan: LogicalPlan,
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/IUDCommonUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/IUDCommonUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/IUDCommonUtil.scala
index 5817d88..220d75d 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/IUDCommonUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/IUDCommonUtil.scala
@@ -23,9 +23,9 @@ import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan}
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.hive.HiveSessionCatalog
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
/**
* Util for IUD common function
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
index fed4235..bf72325 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
@@ -33,7 +33,6 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.metadata.schema.table.AggregationDataMapSchema
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager}
-import org.apache.carbondata.spark.util.DataLoadingUtil
/**
* Below command class will be used to create pre-aggregate table
@@ -182,7 +181,7 @@ case class CreatePreAggregateTableCommand(
// This will be used to check if the parent table has any segments or not. If not then no
// need to fire load for pre-aggregate table. Therefore reading the load details for PARENT
// table.
- DataLoadingUtil.deleteLoadsAndUpdateMetadata(isForceDeletion = false, parentTable)
+ SegmentStatusManager.deleteLoadsAndUpdateMetadata(parentTable, false)
val loadAvailable = SegmentStatusManager.readLoadMetadata(parentTable.getMetadataPath)
if (loadAvailable.exists(load => load.getSegmentStatus == SegmentStatus.INSERT_IN_PROGRESS ||
load.getSegmentStatus == SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS)) {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
index feef7a1..7e3b80e 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
@@ -34,6 +34,7 @@ import org.apache.spark.sql.hive.CarbonRelation
import org.apache.spark.sql.parser.CarbonSpark2SqlParser
import org.apache.spark.sql.types.DataType
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, LockUsage}
@@ -42,7 +43,6 @@ import org.apache.carbondata.core.metadata.schema.table.{AggregationDataMapSchem
import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
import org.apache.carbondata.core.util.CarbonUtil
import org.apache.carbondata.format.TableInfo
-import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
import org.apache.carbondata.spark.util.CommonUtil
/**
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
index 7a56dbf..fc780cb 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
@@ -25,6 +25,7 @@ import org.apache.spark.sql.execution.command.{AlterTableRenameModel, MetadataCo
import org.apache.spark.sql.hive.{CarbonRelation, CarbonSessionCatalog}
import org.apache.spark.util.AlterTableUtil
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datamap.DataMapStoreManager
@@ -37,7 +38,6 @@ import org.apache.carbondata.core.util.CarbonUtil
import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.events.{AlterTableRenamePostEvent, AlterTableRenamePreEvent, OperationContext, OperationListenerBus}
import org.apache.carbondata.format.SchemaEvolutionEntry
-import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
private[sql] case class CarbonAlterTableRenameCommand(
alterTableRenameModel: AlterTableRenameModel)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeSeriesUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeSeriesUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeSeriesUtil.scala
index 987d4fe..9e0cee5 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeSeriesUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeSeriesUtil.scala
@@ -18,12 +18,12 @@ package org.apache.spark.sql.execution.command.timeseries
import org.apache.spark.sql.execution.command.{DataMapField, Field}
+import org.apache.carbondata.common.exceptions.sql.{MalformedCarbonCommandException, MalformedDataMapCommandException}
import org.apache.carbondata.core.metadata.datatype.DataTypes
import org.apache.carbondata.core.metadata.schema.datamap.DataMapProvider.TIMESERIES
import org.apache.carbondata.core.metadata.schema.datamap.Granularity
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.preagg.TimeSeriesUDF
-import org.apache.carbondata.spark.exception.{MalformedCarbonCommandException, MalformedDataMapCommandException}
/**
* Utility class for time series to keep
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala
index 945f47f..072216c 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala
@@ -46,8 +46,8 @@ import org.apache.carbondata.hadoop.api.{CarbonOutputCommitter, CarbonTableOutpu
import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat.CarbonRecordWriter
import org.apache.carbondata.hadoop.util.ObjectSerializationUtil
import org.apache.carbondata.processing.loading.csvinput.StringArrayWritable
-import org.apache.carbondata.processing.loading.model.CarbonLoadModel
-import org.apache.carbondata.spark.util.{CarbonScalaUtil, DataLoadingUtil, Util}
+import org.apache.carbondata.processing.loading.model.{CarbonLoadModel, CarbonLoadModelBuilder, LoadOption}
+import org.apache.carbondata.spark.util.{CarbonScalaUtil, Util}
class CarbonFileFormat
extends FileFormat
@@ -82,7 +82,7 @@ with Serializable {
TableIdentifier(options("tableName"), options.get("dbName")))(sparkSession)
val model = new CarbonLoadModel
val carbonProperty = CarbonProperties.getInstance()
- val optionsFinal = DataLoadingUtil.getDataLoadingOptions(carbonProperty, options)
+ val optionsFinal = LoadOption.fillOptionWithDefaultValue(options.asJava)
val tableProperties = table.getTableInfo.getFactTable.getTableProperties
optionsFinal.put("sort_scope", tableProperties.asScala.getOrElse("sort_scope",
carbonProperty.getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE,
@@ -97,14 +97,11 @@ with Serializable {
val optionsLocal = new mutable.HashMap[String, String]()
optionsLocal ++= options
optionsLocal += (("header", "false"))
- DataLoadingUtil.buildCarbonLoadModel(
- table,
- carbonProperty,
- optionsLocal.toMap,
+ new CarbonLoadModelBuilder(table).build(
+ optionsLocal.toMap.asJava,
optionsFinal,
model,
- conf
- )
+ conf)
model.setUseOnePass(options.getOrElse("onepass", "false").toBoolean)
model.setDictionaryServerHost(options.getOrElse("dicthost", null))
model.setDictionaryServerPort(options.getOrElse("dictport", "-1").toInt)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
index b174b94..b36d7c0 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
@@ -32,10 +32,10 @@ import org.apache.spark.sql.execution.datasources.RefreshTable
import org.apache.spark.sql.hive.CarbonRelation
import org.apache.spark.util.{CarbonReflectionUtils, FileUtils}
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.processing.merger.CompactionType
-import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
/**
* Carbon strategies for ddl commands
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/StreamingTableStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/StreamingTableStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/StreamingTableStrategy.scala
index 608ec60..7028dcf 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/StreamingTableStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/StreamingTableStrategy.scala
@@ -25,7 +25,7 @@ import org.apache.spark.sql.execution.command.AlterTableRenameCommand
import org.apache.spark.sql.execution.command.mutation.{CarbonProjectForDeleteCommand, CarbonProjectForUpdateCommand}
import org.apache.spark.sql.execution.command.schema.{CarbonAlterTableAddColumnCommand, CarbonAlterTableDataTypeChangeCommand, CarbonAlterTableDropColumnCommand}
-import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
/**
* Strategy for streaming table, like blocking unsupported operation
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala
index 7ca34af..27c7d17 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala
@@ -24,9 +24,9 @@ import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.execution.command._
import org.apache.spark.sql.execution.command.table.CarbonDropTableCommand
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, SessionParams}
-import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
case class CarbonDropDatabaseCommand(command: DropDatabaseCommand)
extends RunnableCommand {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
index 4045478..542115e 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
@@ -36,9 +36,9 @@ import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.util.CarbonException
import org.apache.spark.util.CarbonReflectionUtils
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.spark.CarbonOption
-import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
import org.apache.carbondata.spark.util.CommonUtil
/**
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
index ad6d0c7..ef4836e 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
@@ -32,10 +32,10 @@ import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.util.CarbonException
import org.apache.spark.util.CarbonReflectionUtils
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
import org.apache.carbondata.hadoop.util.SchemaReader
import org.apache.carbondata.spark.CarbonOption
-import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
import org.apache.carbondata.spark.util.CommonUtil
/**
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
index bc36e9c..aaa87a3 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
@@ -28,6 +28,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.hive.{CarbonRelation, CarbonSessionCatalog, HiveExternalCatalog}
import org.apache.spark.sql.hive.HiveExternalCatalog._
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datastore.impl.FileFactory
@@ -39,7 +40,6 @@ import org.apache.carbondata.core.util.CarbonUtil
import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.core.util.path.CarbonTablePath.getNewTablePath
import org.apache.carbondata.format.{SchemaEvolutionEntry, TableInfo}
-import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
object AlterTableUtil {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark2/src/main/scala/org/apache/spark/util/TableAPIUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/TableAPIUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/util/TableAPIUtil.scala
index bc62902..a8094b6 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/TableAPIUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/TableAPIUtil.scala
@@ -19,10 +19,10 @@ package org.apache.spark.util
import org.apache.spark.sql.{CarbonEnv, SparkSession}
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
/**
* table api util
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionState.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionState.scala b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionState.scala
index 3c151f0..a0dd7d9 100644
--- a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionState.scala
+++ b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionState.scala
@@ -43,10 +43,10 @@ import org.apache.spark.sql.parser.{CarbonHelperSqlAstBuilder, CarbonSpark2SqlPa
import org.apache.spark.sql.types.DecimalType
import org.apache.spark.util.CarbonReflectionUtils
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
import org.apache.carbondata.core.datamap.DataMapStoreManager
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
import org.apache.carbondata.core.util.{CarbonProperties, ThreadLocalSessionInfo}
-import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
import org.apache.carbondata.spark.util.CarbonScalaUtil
/**
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/segmentreading/TestSegmentReading.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/segmentreading/TestSegmentReading.scala b/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/segmentreading/TestSegmentReading.scala
index c676b01..5ade510 100644
--- a/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/segmentreading/TestSegmentReading.scala
+++ b/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/segmentreading/TestSegmentReading.scala
@@ -4,7 +4,7 @@ import org.apache.spark.sql.Row
import org.apache.spark.sql.test.util.QueryTest
import org.scalatest.BeforeAndAfterAll
-import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
/**
* Created by rahul on 19/9/17.
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala
index 71c5477..0b37e46 100644
--- a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala
+++ b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala
@@ -25,7 +25,7 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.core.util.path.CarbonTablePath
-import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
+import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel, CarbonLoadModelBuilder, LoadOption}
import org.apache.carbondata.processing.util.TableOptionConstant
/**
@@ -63,7 +63,7 @@ class AllDictionaryTestCase extends Spark2QueryTest with BeforeAndAfterAll {
CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT))
carbonLoadModel.setCsvHeaderColumns(
- CommonUtil.getCsvHeaderColumns(carbonLoadModel, FileFactory.getConfiguration))
+ LoadOption.getCsvHeaderColumns(carbonLoadModel, FileFactory.getConfiguration))
// Create table and metadata folders if not exist
val metadataDirectoryPath = CarbonTablePath.getMetadataPath(table.getTablePath)
val fileType = FileFactory.getFileType(metadataDirectoryPath)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala
index c0e1781..6149e82 100644
--- a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala
+++ b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala
@@ -23,14 +23,14 @@ import org.apache.spark.sql.hive.CarbonRelation
import org.apache.spark.sql.{CarbonEnv, SparkSession}
import org.scalatest.BeforeAndAfterAll
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.processing.exception.DataLoadingException
-import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
+import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel, CarbonLoadModelBuilder, LoadOption}
import org.apache.carbondata.processing.util.TableOptionConstant
-import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
/**
* test case for external column dictionary generation
@@ -176,7 +176,7 @@ class ExternalColumnDictionaryTestCase extends Spark2QueryTest with BeforeAndAft
CarbonCommonConstants.CARBON_DATE_FORMAT,
CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT))
carbonLoadModel.setCsvHeaderColumns(
- CommonUtil.getCsvHeaderColumns(carbonLoadModel, FileFactory.getConfiguration))
+ LoadOption.getCsvHeaderColumns(carbonLoadModel, FileFactory.getConfiguration))
carbonLoadModel.setMaxColumns("100")
// Create table and metadata folders if not exist
val metadataDirectoryPath = CarbonTablePath.getMetadataPath(table.getTablePath)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
index 2552ca8..afb34b9 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
@@ -31,12 +31,12 @@ import org.apache.spark.sql.test.util.QueryTest
import org.apache.spark.sql.types.StructType
import org.scalatest.BeforeAndAfterAll
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.statusmanager.{FileFormat, SegmentStatus}
import org.apache.carbondata.core.util.path.CarbonTablePath
-import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark2/src/test/scala/org/apache/spark/carbondata/bucketing/TableBucketingTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/bucketing/TableBucketingTestCase.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/bucketing/TableBucketingTestCase.scala
index 9da7244..65a006b 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/bucketing/TableBucketingTestCase.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/bucketing/TableBucketingTestCase.scala
@@ -21,11 +21,11 @@ import org.apache.spark.sql.common.util.Spark2QueryTest
import org.apache.spark.sql.execution.exchange.ShuffleExchange
import org.scalatest.BeforeAndAfterAll
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
import org.apache.carbondata.core.metadata.CarbonMetadata
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
class TableBucketingTestCase extends Spark2QueryTest with BeforeAndAfterAll {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/AddColumnTestCases.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/AddColumnTestCases.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/AddColumnTestCases.scala
index d36dd26..b035834 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/AddColumnTestCases.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/AddColumnTestCases.scala
@@ -26,9 +26,9 @@ import org.apache.spark.sql.common.util.Spark2QueryTest
import org.apache.spark.sql.test.TestQueryExecutor
import org.scalatest.BeforeAndAfterAll
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
class AddColumnTestCases extends Spark2QueryTest with BeforeAndAfterAll {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
index fef2da6..31c5b27 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
@@ -18,14 +18,16 @@
package org.apache.carbondata.processing.loading.model;
import java.io.Serializable;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import org.apache.carbondata.core.dictionary.service.DictionaryServiceProvider;
import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager;
-
+import org.apache.carbondata.core.util.path.CarbonTablePath;
public class CarbonLoadModel implements Serializable {
@@ -785,4 +787,13 @@ public class CarbonLoadModel implements Serializable {
public void setSkipEmptyLine(String skipEmptyLine) {
this.skipEmptyLine = skipEmptyLine;
}
+
+ /**
+ * Read segments metadata from table status file and set it to this load model object
+ */
+ public void readAndSetLoadMetadataDetails() {
+ String metadataPath = CarbonTablePath.getMetadataPath(tablePath);
+ LoadMetadataDetails[] details = SegmentStatusManager.readLoadMetadata(metadataPath);
+ setLoadMetadataDetails(Arrays.asList(details));
+ }
}
[3/3] carbondata git commit: [CARBONDATA-2159] Remove carbon-spark
dependency in store-sdk module
Posted by ja...@apache.org.
[CARBONDATA-2159] Remove carbon-spark dependency in store-sdk module
To make assembling JAR of store-sdk module, it should not depend on carbon-spark module
This closes #1970
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/0d50f654
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/0d50f654
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/0d50f654
Branch: refs/heads/carbonstore
Commit: 0d50f65461ae3855db66f44fa06e01174de50ccd
Parents: a848ccf
Author: Jacky Li <ja...@qq.com>
Authored: Sun Feb 11 21:37:04 2018 +0800
Committer: Jacky Li <ja...@qq.com>
Committed: Mon Feb 12 16:06:49 2018 +0800
----------------------------------------------------------------------
.../java/org/apache/carbondata/common/Maps.java | 39 ++
.../org/apache/carbondata/common/Strings.java | 3 +
.../ConcurrentOperationException.java | 56 +++
.../exceptions/TableStatusLockException.java | 34 ++
.../sql/InvalidLoadOptionException.java | 33 ++
.../sql/MalformedCarbonCommandException.java | 75 +++
.../sql/MalformedDataMapCommandException.java | 37 ++
.../exceptions/sql/NoSuchDataMapException.java | 39 ++
.../statusmanager/SegmentStatusManager.java | 124 +++++
.../carbondata/core/util/DeleteLoadFolders.java | 200 ++++++++
.../preaggregate/TestPreAggCreateCommand.scala | 2 +-
.../preaggregate/TestPreAggregateDrop.scala | 2 +-
.../timeseries/TestTimeSeriesCreateTable.scala | 2 +-
.../timeseries/TestTimeSeriesDropSuite.scala | 2 +-
.../TestTimeseriesTableSelection.scala | 2 +-
.../TestDataLoadWithColumnsMoreThanSchema.scala | 3 +-
.../dataload/TestGlobalSortDataLoad.scala | 2 +-
.../TestLoadDataWithDiffTimestampFormat.scala | 2 +-
.../TestLoadDataWithFileHeaderException.scala | 11 +-
...ataWithMalformedCarbonCommandException.scala | 3 +-
.../testsuite/dataload/TestLoadOptions.scala | 2 +-
.../dataload/TestTableLevelBlockSize.scala | 4 +-
.../testsuite/datamap/TestDataMapCommand.scala | 2 +-
.../dataretention/DataRetentionTestCase.scala | 2 +-
.../spark/testsuite/datetype/DateTypeTest.scala | 2 +-
.../testsuite/sortcolumns/TestSortColumns.scala | 3 +-
integration/spark-common/pom.xml | 5 -
.../exception/ConcurrentOperationException.java | 44 --
.../MalformedCarbonCommandException.java | 69 ---
.../MalformedDataMapCommandException.java | 32 --
.../spark/exception/NoSuchDataMapException.java | 33 --
.../org/apache/carbondata/api/CarbonStore.scala | 6 +-
.../spark/CarbonColumnValidator.scala | 8 +-
.../carbondata/spark/load/ValidateUtil.scala | 71 ---
.../carbondata/spark/rdd/CarbonMergerRDD.scala | 6 +-
.../carbondata/spark/util/CommonUtil.scala | 56 +--
.../carbondata/spark/util/DataLoadingUtil.scala | 451 -------------------
.../spark/util/GlobalDictionaryUtil.scala | 2 +-
.../spark/sql/catalyst/CarbonDDLSqlParser.scala | 2 +-
.../spark/rdd/CarbonDataRDDFactory.scala | 4 +-
.../spark/rdd/CarbonTableCompactor.scala | 2 +-
.../org/apache/spark/sql/CarbonSource.scala | 2 +-
.../datamap/CarbonCreateDataMapCommand.scala | 2 +-
.../datamap/CarbonDropDataMapCommand.scala | 2 +-
.../CarbonAlterTableCompactionCommand.scala | 11 +-
.../management/CarbonLoadDataCommand.scala | 18 +-
.../CarbonProjectForDeleteCommand.scala | 2 +-
.../CarbonProjectForUpdateCommand.scala | 2 +-
.../command/mutation/IUDCommonUtil.scala | 2 +-
.../CreatePreAggregateTableCommand.scala | 3 +-
.../preaaggregate/PreAggregateUtil.scala | 2 +-
.../schema/CarbonAlterTableRenameCommand.scala | 2 +-
.../command/timeseries/TimeSeriesUtil.scala | 2 +-
.../datasources/CarbonFileFormat.scala | 15 +-
.../sql/execution/strategy/DDLStrategy.scala | 2 +-
.../strategy/StreamingTableStrategy.scala | 2 +-
.../execution/command/CarbonHiveCommands.scala | 2 +-
.../sql/parser/CarbonSpark2SqlParser.scala | 2 +-
.../spark/sql/parser/CarbonSparkSqlParser.scala | 2 +-
.../org/apache/spark/util/AlterTableUtil.scala | 2 +-
.../org/apache/spark/util/TableAPIUtil.scala | 2 +-
.../spark/sql/hive/CarbonSessionState.scala | 2 +-
.../segmentreading/TestSegmentReading.scala | 2 +-
.../spark/util/AllDictionaryTestCase.scala | 4 +-
.../util/ExternalColumnDictionaryTestCase.scala | 6 +-
.../TestStreamingTableOperation.scala | 2 +-
.../bucketing/TableBucketingTestCase.scala | 2 +-
.../vectorreader/AddColumnTestCases.scala | 2 +-
.../loading/model/CarbonLoadModel.java | 13 +-
.../loading/model/CarbonLoadModelBuilder.java | 322 +++++++++++++
.../processing/loading/model/LoadOption.java | 251 +++++++++++
.../processing/util/CarbonLoaderUtil.java | 84 +---
.../processing/util/DeleteLoadFolders.java | 200 --------
store/sdk/pom.xml | 2 +-
.../sdk/file/CarbonWriterBuilder.java | 15 +-
.../sdk/file/CSVCarbonWriterSuite.java | 2 +-
.../streaming/StreamSinkFactory.scala | 11 +-
77 files changed, 1330 insertions(+), 1146 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/common/src/main/java/org/apache/carbondata/common/Maps.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/carbondata/common/Maps.java b/common/src/main/java/org/apache/carbondata/common/Maps.java
new file mode 100644
index 0000000..14fc329
--- /dev/null
+++ b/common/src/main/java/org/apache/carbondata/common/Maps.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.common;
+
+import java.util.Map;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+
+@InterfaceAudience.Developer
+public class Maps {
+
+ /**
+ * Return value if key is contained in the map, else return defaultValue.
+ * This is added to avoid JDK 8 dependency
+ */
+ public static <K, V> V getOrDefault(Map<K, V> map, K key, V defaultValue) {
+ V value = map.get(key);
+ if (value != null) {
+ return value;
+ } else {
+ return defaultValue;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/common/src/main/java/org/apache/carbondata/common/Strings.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/carbondata/common/Strings.java b/common/src/main/java/org/apache/carbondata/common/Strings.java
index 23288dd..08fdc3c 100644
--- a/common/src/main/java/org/apache/carbondata/common/Strings.java
+++ b/common/src/main/java/org/apache/carbondata/common/Strings.java
@@ -19,6 +19,9 @@ package org.apache.carbondata.common;
import java.util.Objects;
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+
+@InterfaceAudience.Developer
public class Strings {
/**
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/common/src/main/java/org/apache/carbondata/common/exceptions/ConcurrentOperationException.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/carbondata/common/exceptions/ConcurrentOperationException.java b/common/src/main/java/org/apache/carbondata/common/exceptions/ConcurrentOperationException.java
new file mode 100644
index 0000000..a14d161
--- /dev/null
+++ b/common/src/main/java/org/apache/carbondata/common/exceptions/ConcurrentOperationException.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.common.exceptions;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.common.annotations.InterfaceStability;
+
+/**
+ * This exception will be thrown when executing concurrent operations that
+ * are not supported in carbon.
+ *
+ * For example, when INSERT OVERWRITE is executing, other operations are not
+ * allowed, so this exception will be thrown
+ */
+@InterfaceAudience.User
+@InterfaceStability.Stable
+public class ConcurrentOperationException extends Exception {
+
+ /**
+ * The Error message.
+ */
+ private String msg;
+
+ /**
+ * Constructor
+ *
+ * @param msg The error message for this exception.
+ */
+ public ConcurrentOperationException(String msg) {
+ super(msg);
+ this.msg = msg;
+ }
+
+ /**
+ * getMessage
+ */
+ public String getMessage() {
+ return this.msg;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/common/src/main/java/org/apache/carbondata/common/exceptions/TableStatusLockException.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/carbondata/common/exceptions/TableStatusLockException.java b/common/src/main/java/org/apache/carbondata/common/exceptions/TableStatusLockException.java
new file mode 100644
index 0000000..89cfd46
--- /dev/null
+++ b/common/src/main/java/org/apache/carbondata/common/exceptions/TableStatusLockException.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.common.exceptions;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.common.annotations.InterfaceStability;
+
+/**
+ * This exception will be thrown when acquiring the lock for table status metadata fails,
+ * or the retry times out
+ */
+@InterfaceAudience.User
+@InterfaceStability.Evolving
+public class TableStatusLockException extends RuntimeException {
+
+ public TableStatusLockException(String msg) {
+ super(msg);
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/common/src/main/java/org/apache/carbondata/common/exceptions/sql/InvalidLoadOptionException.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/carbondata/common/exceptions/sql/InvalidLoadOptionException.java b/common/src/main/java/org/apache/carbondata/common/exceptions/sql/InvalidLoadOptionException.java
new file mode 100644
index 0000000..41b2434
--- /dev/null
+++ b/common/src/main/java/org/apache/carbondata/common/exceptions/sql/InvalidLoadOptionException.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.common.exceptions.sql;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.common.annotations.InterfaceStability;
+
+/**
+ * This exception will be thrown when a load option is invalid in a SQL
+ * load statement (LOAD DATA, INSERT INTO)
+ */
+@InterfaceAudience.User
+@InterfaceStability.Stable
+public class InvalidLoadOptionException extends MalformedCarbonCommandException {
+ public InvalidLoadOptionException(String msg) {
+ super(msg);
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/common/src/main/java/org/apache/carbondata/common/exceptions/sql/MalformedCarbonCommandException.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/carbondata/common/exceptions/sql/MalformedCarbonCommandException.java b/common/src/main/java/org/apache/carbondata/common/exceptions/sql/MalformedCarbonCommandException.java
new file mode 100644
index 0000000..5fe3ce8
--- /dev/null
+++ b/common/src/main/java/org/apache/carbondata/common/exceptions/sql/MalformedCarbonCommandException.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.common.exceptions.sql;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.common.annotations.InterfaceStability;
+
+/**
+ * This exception will be thrown if any validation fails while parsing a
+ * SQL statement.
+ */
+@InterfaceAudience.User
+@InterfaceStability.Stable
+public class MalformedCarbonCommandException extends Exception {
+
+ /**
+ * default serial version ID.
+ */
+ private static final long serialVersionUID = 1L;
+
+ /**
+ * The Error message.
+ */
+ private String msg = "";
+
+ /**
+ * Constructor
+ *
+ * @param msg The error message for this exception.
+ */
+ public MalformedCarbonCommandException(String msg) {
+ super(msg);
+ this.msg = msg;
+ }
+
+ /**
+ * Constructor
+ *
+ * @param msg The error message for this exception.
+ */
+ public MalformedCarbonCommandException(String msg, Throwable t) {
+ super(msg, t);
+ this.msg = msg;
+ }
+
+ /**
+ * getLocalizedMessage
+ */
+ @Override
+ public String getLocalizedMessage() {
+ return super.getLocalizedMessage();
+ }
+
+ /**
+ * getMessage
+ */
+ public String getMessage() {
+ return this.msg;
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/common/src/main/java/org/apache/carbondata/common/exceptions/sql/MalformedDataMapCommandException.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/carbondata/common/exceptions/sql/MalformedDataMapCommandException.java b/common/src/main/java/org/apache/carbondata/common/exceptions/sql/MalformedDataMapCommandException.java
new file mode 100644
index 0000000..7c25b2c
--- /dev/null
+++ b/common/src/main/java/org/apache/carbondata/common/exceptions/sql/MalformedDataMapCommandException.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.common.exceptions.sql;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.common.annotations.InterfaceStability;
+
+/**
+ * This exception will be thrown when a DataMap-related SQL statement is invalid
+ */
+@InterfaceAudience.User
+@InterfaceStability.Stable
+public class MalformedDataMapCommandException extends MalformedCarbonCommandException {
+ /**
+ * default serial version ID.
+ */
+ private static final long serialVersionUID = 1L;
+
+ public MalformedDataMapCommandException(String msg) {
+ super(msg);
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/common/src/main/java/org/apache/carbondata/common/exceptions/sql/NoSuchDataMapException.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/carbondata/common/exceptions/sql/NoSuchDataMapException.java b/common/src/main/java/org/apache/carbondata/common/exceptions/sql/NoSuchDataMapException.java
new file mode 100644
index 0000000..7ab9048
--- /dev/null
+++ b/common/src/main/java/org/apache/carbondata/common/exceptions/sql/NoSuchDataMapException.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.common.exceptions.sql;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.common.annotations.InterfaceStability;
+
+/**
+ * This exception will be thrown if the datamap is not found when executing a
+ * datamap-related SQL statement
+ */
+@InterfaceAudience.User
+@InterfaceStability.Stable
+public class NoSuchDataMapException extends MalformedCarbonCommandException {
+
+ /**
+ * default serial version ID.
+ */
+ private static final long serialVersionUID = 1L;
+
+ public NoSuchDataMapException(String dataMapName, String tableName) {
+ super("Datamap with name " + dataMapName + " does not exist under table " + tableName);
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
index c613735..2e73aef 100755
--- a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
@@ -30,6 +30,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+import org.apache.carbondata.common.exceptions.TableStatusLockException;
import org.apache.carbondata.common.logging.LogService;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
@@ -46,6 +47,7 @@ import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.DeleteLoadFolders;
import org.apache.carbondata.core.util.path.CarbonTablePath;
import com.google.gson.Gson;
@@ -708,4 +710,126 @@ public class SegmentStatusManager {
}
}
+ private static boolean isLoadDeletionRequired(String metaDataLocation) {
+ LoadMetadataDetails[] details = SegmentStatusManager.readLoadMetadata(metaDataLocation);
+ if (details != null && details.length > 0) {
+ for (LoadMetadataDetails oneRow : details) {
+ if ((SegmentStatus.MARKED_FOR_DELETE == oneRow.getSegmentStatus()
+ || SegmentStatus.COMPACTED == oneRow.getSegmentStatus()
+ || SegmentStatus.INSERT_IN_PROGRESS == oneRow.getSegmentStatus()
+ || SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS == oneRow.getSegmentStatus())
+ && oneRow.getVisibility().equalsIgnoreCase("true")) {
+ return true;
+ }
+ }
+ }
+ return false;
+ }
+
+ /**
+ * This will merge the visibility flags from the old table status details into the
+ * latest table status details, before the clean-files operation.
+ * @param oldList
+ * @param newList
+ * @return
+ */
+ public static List<LoadMetadataDetails> updateLoadMetadataFromOldToNew(
+ LoadMetadataDetails[] oldList, LoadMetadataDetails[] newList) {
+
+ List<LoadMetadataDetails> newListMetadata =
+ new ArrayList<LoadMetadataDetails>(Arrays.asList(newList));
+ for (LoadMetadataDetails oldSegment : oldList) {
+ if ("false".equalsIgnoreCase(oldSegment.getVisibility())) {
+ newListMetadata.get(newListMetadata.indexOf(oldSegment)).setVisibility("false");
+ }
+ }
+ return newListMetadata;
+ }
+
+ private static void writeLoadMetadata(AbsoluteTableIdentifier identifier,
+ List<LoadMetadataDetails> listOfLoadFolderDetails) throws IOException {
+ String dataLoadLocation = CarbonTablePath.getTableStatusFilePath(identifier.getTablePath());
+
+ DataOutputStream dataOutputStream;
+ Gson gsonObjectToWrite = new Gson();
+ BufferedWriter brWriter = null;
+
+ AtomicFileOperations writeOperation =
+ new AtomicFileOperationsImpl(dataLoadLocation, FileFactory.getFileType(dataLoadLocation));
+
+ try {
+
+ dataOutputStream = writeOperation.openForWrite(FileWriteOperation.OVERWRITE);
+ brWriter = new BufferedWriter(new OutputStreamWriter(dataOutputStream,
+ Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET)));
+
+ String metadataInstance = gsonObjectToWrite.toJson(listOfLoadFolderDetails.toArray());
+ brWriter.write(metadataInstance);
+ } finally {
+ try {
+ if (null != brWriter) {
+ brWriter.flush();
+ }
+ } catch (Exception e) {
+ LOG.error("error in flushing ");
+
+ }
+ CarbonUtil.closeStreams(brWriter);
+ writeOperation.close();
+ }
+ }
+
+ public static void deleteLoadsAndUpdateMetadata(
+ CarbonTable carbonTable,
+ boolean isForceDeletion) throws IOException {
+ if (isLoadDeletionRequired(carbonTable.getMetadataPath())) {
+ LoadMetadataDetails[] details =
+ SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath());
+ AbsoluteTableIdentifier identifier = carbonTable.getAbsoluteTableIdentifier();
+ ICarbonLock carbonTableStatusLock = CarbonLockFactory.getCarbonLockObj(
+ identifier, LockUsage.TABLE_STATUS_LOCK);
+
+ // Delete marked loads
+ boolean isUpdationRequired = DeleteLoadFolders.deleteLoadFoldersFromFileSystem(
+ identifier, isForceDeletion, details, carbonTable.getMetadataPath());
+
+ boolean updationCompletionStatus = false;
+
+ if (isUpdationRequired) {
+ try {
+ // Update load metadata file after cleaning deleted nodes
+ if (carbonTableStatusLock.lockWithRetries()) {
+ LOG.info("Table status lock has been successfully acquired.");
+
+ // read latest table status again.
+ LoadMetadataDetails[] latestMetadata =
+ SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath());
+
+ // update the metadata details from old to new status.
+ List<LoadMetadataDetails> latestStatus =
+ updateLoadMetadataFromOldToNew(details, latestMetadata);
+
+ writeLoadMetadata(identifier, latestStatus);
+ } else {
+ String dbName = identifier.getCarbonTableIdentifier().getDatabaseName();
+ String tableName = identifier.getCarbonTableIdentifier().getTableName();
+ String errorMsg = "Clean files request is failed for " +
+ dbName + "." + tableName +
+ ". Not able to acquire the table status lock due to other operation " +
+ "running in the background.";
+ LOG.audit(errorMsg);
+ LOG.error(errorMsg);
+ throw new TableStatusLockException(errorMsg + " Please try after some time.");
+ }
+ updationCompletionStatus = true;
+ } finally {
+ CarbonLockUtil.fileUnlock(carbonTableStatusLock, LockUsage.TABLE_STATUS_LOCK);
+ if (updationCompletionStatus) {
+ DeleteLoadFolders.physicalFactAndMeasureMetadataDeletion(
+ identifier, carbonTable.getMetadataPath(), isForceDeletion);
+ }
+ }
+ }
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/core/src/main/java/org/apache/carbondata/core/util/DeleteLoadFolders.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/DeleteLoadFolders.java b/core/src/main/java/org/apache/carbondata/core/util/DeleteLoadFolders.java
new file mode 100644
index 0000000..ba4c4fc
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/util/DeleteLoadFolders.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.util;
+
+import java.io.IOException;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.locks.CarbonLockFactory;
+import org.apache.carbondata.core.locks.ICarbonLock;
+import org.apache.carbondata.core.locks.LockUsage;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
+import org.apache.carbondata.core.statusmanager.SegmentStatus;
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+
+public final class DeleteLoadFolders {
+
+ private static final LogService LOGGER =
+ LogServiceFactory.getLogService(DeleteLoadFolders.class.getName());
+
+ private DeleteLoadFolders() {
+
+ }
+
+ /**
+ * returns segment path
+ *
+ * @param identifier
+ * @param oneLoad
+ * @return
+ */
+ private static String getSegmentPath(AbsoluteTableIdentifier identifier,
+ LoadMetadataDetails oneLoad) {
+ String segmentId = oneLoad.getLoadName();
+ return CarbonTablePath.getSegmentPath(identifier.getTablePath(), segmentId);
+ }
+
+ public static void physicalFactAndMeasureMetadataDeletion(
+ AbsoluteTableIdentifier absoluteTableIdentifier, String metadataPath, boolean isForceDelete) {
+ LoadMetadataDetails[] currentDetails = SegmentStatusManager.readLoadMetadata(metadataPath);
+ for (LoadMetadataDetails oneLoad : currentDetails) {
+ if (checkIfLoadCanBeDeletedPhysically(oneLoad, isForceDelete)) {
+ String path = getSegmentPath(absoluteTableIdentifier, oneLoad);
+ boolean status = false;
+ try {
+ if (FileFactory.isFileExist(path, FileFactory.getFileType(path))) {
+ CarbonFile file = FileFactory.getCarbonFile(path, FileFactory.getFileType(path));
+ CarbonFile[] filesToBeDeleted = file.listFiles(new CarbonFileFilter() {
+
+ @Override public boolean accept(CarbonFile file) {
+ return (CarbonTablePath.isCarbonDataFile(file.getName())
+ || CarbonTablePath.isCarbonIndexFile(file.getName())
+ || CarbonTablePath.isPartitionMapFile(file.getName()));
+ }
+ });
+
+ // if there are no fact and measure metadata files present, then there is
+ // no need to keep the entry in metadata.
+ if (filesToBeDeleted.length == 0) {
+ status = true;
+ } else {
+
+ for (CarbonFile eachFile : filesToBeDeleted) {
+ if (!eachFile.delete()) {
+ LOGGER.warn("Unable to delete the file as per delete command " + eachFile
+ .getAbsolutePath());
+ status = false;
+ } else {
+ status = true;
+ }
+ }
+ }
+ // need to delete the complete folder.
+ if (status) {
+ if (!file.delete()) {
+ LOGGER.warn(
+ "Unable to delete the folder as per delete command " + file.getAbsolutePath());
+ }
+ }
+
+ } else {
+ LOGGER.warn("Files are not found in segment " + path
+ + " it seems, files are already being deleted");
+ }
+ } catch (IOException e) {
+ LOGGER.warn("Unable to delete the file as per delete command " + path);
+ }
+ }
+ }
+ }
+
+ private static boolean checkIfLoadCanBeDeleted(LoadMetadataDetails oneLoad,
+ boolean isForceDelete) {
+ if ((SegmentStatus.MARKED_FOR_DELETE == oneLoad.getSegmentStatus() ||
+ SegmentStatus.COMPACTED == oneLoad.getSegmentStatus() ||
+ SegmentStatus.INSERT_IN_PROGRESS == oneLoad.getSegmentStatus() ||
+ SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS == oneLoad.getSegmentStatus())
+ && oneLoad.getVisibility().equalsIgnoreCase("true")) {
+ if (isForceDelete) {
+ return true;
+ }
+ long deletionTime = oneLoad.getModificationOrdeletionTimesStamp();
+
+ return CarbonUpdateUtil.isMaxQueryTimeoutExceeded(deletionTime);
+
+ }
+
+ return false;
+ }
+
+ private static boolean checkIfLoadCanBeDeletedPhysically(LoadMetadataDetails oneLoad,
+ boolean isForceDelete) {
+ if ((SegmentStatus.MARKED_FOR_DELETE == oneLoad.getSegmentStatus() ||
+ SegmentStatus.COMPACTED == oneLoad.getSegmentStatus())) {
+ if (isForceDelete) {
+ return true;
+ }
+ long deletionTime = oneLoad.getModificationOrdeletionTimesStamp();
+
+ return CarbonUpdateUtil.isMaxQueryTimeoutExceeded(deletionTime);
+
+ }
+
+ return false;
+ }
+
+ private static LoadMetadataDetails getCurrentLoadStatusOfSegment(String segmentId,
+ String metadataPath) {
+ LoadMetadataDetails[] currentDetails = SegmentStatusManager.readLoadMetadata(metadataPath);
+ for (LoadMetadataDetails oneLoad : currentDetails) {
+ if (oneLoad.getLoadName().equalsIgnoreCase(segmentId)) {
+ return oneLoad;
+ }
+ }
+ return null;
+ }
+
+ public static boolean deleteLoadFoldersFromFileSystem(
+ AbsoluteTableIdentifier absoluteTableIdentifier, boolean isForceDelete,
+ LoadMetadataDetails[] details, String metadataPath) {
+ boolean isDeleted = false;
+ if (details != null && details.length != 0) {
+ for (LoadMetadataDetails oneLoad : details) {
+ if (checkIfLoadCanBeDeleted(oneLoad, isForceDelete)) {
+ ICarbonLock segmentLock = CarbonLockFactory.getCarbonLockObj(absoluteTableIdentifier,
+ CarbonTablePath.addSegmentPrefix(oneLoad.getLoadName()) + LockUsage.LOCK);
+ try {
+ if (oneLoad.getSegmentStatus() == SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS
+ || oneLoad.getSegmentStatus() == SegmentStatus.INSERT_IN_PROGRESS) {
+ if (segmentLock.lockWithRetries(1, 5)) {
+ LOGGER.info("Info: Acquired segment lock on segment:" + oneLoad.getLoadName());
+ LoadMetadataDetails currentDetails =
+ getCurrentLoadStatusOfSegment(oneLoad.getLoadName(), metadataPath);
+ if (currentDetails != null && checkIfLoadCanBeDeleted(currentDetails,
+ isForceDelete)) {
+ oneLoad.setVisibility("false");
+ isDeleted = true;
+ LOGGER.info("Info: Deleted the load " + oneLoad.getLoadName());
+ }
+ } else {
+ LOGGER.info("Info: Load in progress for segment" + oneLoad.getLoadName());
+ return isDeleted;
+ }
+ } else {
+ oneLoad.setVisibility("false");
+ isDeleted = true;
+ LOGGER.info("Info: Deleted the load " + oneLoad.getLoadName());
+ }
+ } finally {
+ segmentLock.unlock();
+ LOGGER.info("Info: Segment lock on segment:" + oneLoad.getLoadName() + " is released");
+ }
+ }
+ }
+ }
+ return isDeleted;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala
index 23132de..b3bf93d 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala
@@ -26,10 +26,10 @@ import org.apache.spark.sql.hive.CarbonRelation
import org.apache.spark.sql.test.util.QueryTest
import org.scalatest.BeforeAndAfterAll
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
import org.apache.carbondata.core.metadata.encoder.Encoding
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.metadata.schema.datamap.DataMapProvider.TIMESERIES
-import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
class TestPreAggCreateCommand extends QueryTest with BeforeAndAfterAll {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateDrop.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateDrop.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateDrop.scala
index 1138adf..db0fb3f 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateDrop.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateDrop.scala
@@ -20,7 +20,7 @@ package org.apache.carbondata.integration.spark.testsuite.preaggregate
import org.apache.spark.sql.test.util.QueryTest
import org.scalatest.BeforeAndAfterAll
-import org.apache.carbondata.spark.exception.NoSuchDataMapException
+import org.apache.carbondata.common.exceptions.sql.NoSuchDataMapException
class TestPreAggregateDrop extends QueryTest with BeforeAndAfterAll {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeSeriesCreateTable.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeSeriesCreateTable.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeSeriesCreateTable.scala
index 0ca7cb9..97aa056 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeSeriesCreateTable.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeSeriesCreateTable.scala
@@ -19,8 +19,8 @@ package org.apache.carbondata.integration.spark.testsuite.timeseries
import org.apache.spark.sql.test.util.QueryTest
import org.scalatest.BeforeAndAfterAll
+import org.apache.carbondata.common.exceptions.sql.{MalformedCarbonCommandException, MalformedDataMapCommandException}
import org.apache.carbondata.core.metadata.schema.datamap.DataMapProvider.TIMESERIES
-import org.apache.carbondata.spark.exception.{MalformedDataMapCommandException, MalformedCarbonCommandException}
class TestTimeSeriesCreateTable extends QueryTest with BeforeAndAfterAll {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeSeriesDropSuite.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeSeriesDropSuite.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeSeriesDropSuite.scala
index 545c4de..5fe21e8 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeSeriesDropSuite.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeSeriesDropSuite.scala
@@ -19,7 +19,7 @@ package org.apache.carbondata.integration.spark.testsuite.timeseries
import org.apache.spark.sql.test.util.QueryTest
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
-import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
class TestTimeSeriesDropSuite extends QueryTest with BeforeAndAfterAll with BeforeAndAfterEach {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeseriesTableSelection.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeseriesTableSelection.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeseriesTableSelection.scala
index 3065952..3f140df 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeseriesTableSelection.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeseriesTableSelection.scala
@@ -24,8 +24,8 @@ import org.apache.spark.sql.test.util.QueryTest
import org.apache.spark.util.SparkUtil4Test
import org.scalatest.BeforeAndAfterAll
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
import org.apache.carbondata.core.metadata.schema.datamap.DataMapProvider.TIMESERIES
-import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
class TestTimeseriesTableSelection extends QueryTest with BeforeAndAfterAll {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestDataLoadWithColumnsMoreThanSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestDataLoadWithColumnsMoreThanSchema.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestDataLoadWithColumnsMoreThanSchema.scala
index 1532328..4e5ebbb 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestDataLoadWithColumnsMoreThanSchema.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestDataLoadWithColumnsMoreThanSchema.scala
@@ -18,9 +18,10 @@
package org.apache.carbondata.spark.testsuite.dataload
import org.scalatest.BeforeAndAfterAll
-import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
import org.apache.spark.sql.test.util.QueryTest
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
+
/**
* This class will test data load in which number of columns in data are more than
* the number of columns in schema
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala
index 5e5eed5..aacd28b 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala
@@ -23,13 +23,13 @@ import org.apache.commons.io.FileUtils
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
import org.apache.spark.sql.Row
import org.apache.spark.sql.execution.BatchedDataSourceScanExec
import org.apache.spark.sql.test.TestQueryExecutor.projectPath
import org.apache.spark.sql.test.util.QueryTest
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore
import org.apache.carbondata.core.metadata.CarbonMetadata
import org.apache.carbondata.spark.rdd.CarbonScanRDD
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithDiffTimestampFormat.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithDiffTimestampFormat.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithDiffTimestampFormat.scala
index ec6fff1..c06d782 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithDiffTimestampFormat.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithDiffTimestampFormat.scala
@@ -25,10 +25,10 @@ import org.scalatest.BeforeAndAfterAll
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
import org.apache.spark.sql.test.util.QueryTest
import org.apache.carbondata.common.constants.LoggerAction
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
class TestLoadDataWithDiffTimestampFormat extends QueryTest with BeforeAndAfterAll {
val bad_records_action = CarbonProperties.getInstance()
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithFileHeaderException.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithFileHeaderException.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithFileHeaderException.scala
index 7700ed5..edcdd51 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithFileHeaderException.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithFileHeaderException.scala
@@ -20,6 +20,9 @@ package org.apache.carbondata.spark.testsuite.dataload
import org.apache.spark.sql.test.util.QueryTest
import org.scalatest.BeforeAndAfterAll
+import org.apache.carbondata.common.exceptions.sql.InvalidLoadOptionException
+import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException
+
class TestLoadDataWithFileHeaderException extends QueryTest with BeforeAndAfterAll{
override def beforeAll {
sql("DROP TABLE IF EXISTS t3")
@@ -32,7 +35,7 @@ class TestLoadDataWithFileHeaderException extends QueryTest with BeforeAndAfterA
}
test("test load data both file and ddl without file header exception") {
- val e = intercept[Exception] {
+ val e = intercept[CarbonDataLoadingException] {
sql(
s"""LOAD DATA LOCAL INPATH '$resourcesPath/source_without_header.csv' into table t3""")
}
@@ -41,7 +44,7 @@ class TestLoadDataWithFileHeaderException extends QueryTest with BeforeAndAfterA
}
test("test load data ddl provided wrong file header exception") {
- val e = intercept[Exception] {
+ val e = intercept[CarbonDataLoadingException] {
sql(
s"""
LOAD DATA LOCAL INPATH '$resourcesPath/source_without_header.csv' into table t3
@@ -52,7 +55,7 @@ class TestLoadDataWithFileHeaderException extends QueryTest with BeforeAndAfterA
}
test("test load data with wrong header , but without fileheader") {
- val e = intercept[Exception] {
+ val e = intercept[InvalidLoadOptionException] {
sql(
s"""
LOAD DATA LOCAL INPATH '$resourcesPath/source.csv' into table t3
@@ -63,7 +66,7 @@ class TestLoadDataWithFileHeaderException extends QueryTest with BeforeAndAfterA
}
test("test load data with wrong header and fileheader") {
- val e = intercept[Exception] {
+ val e = intercept[InvalidLoadOptionException] {
sql(
s"""
LOAD DATA LOCAL INPATH '$resourcesPath/source_without_header.csv' into table t3
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithMalformedCarbonCommandException.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithMalformedCarbonCommandException.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithMalformedCarbonCommandException.scala
index 1851705..6759049 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithMalformedCarbonCommandException.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithMalformedCarbonCommandException.scala
@@ -18,9 +18,10 @@
package org.apache.carbondata.spark.testsuite.dataload
import org.scalatest.BeforeAndAfterAll
-import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
import org.apache.spark.sql.test.util.QueryTest
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
+
class TestLoadDataWithMalformedCarbonCommandException extends QueryTest with BeforeAndAfterAll {
override def beforeAll {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadOptions.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadOptions.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadOptions.scala
index d2c7e63..4ec9335 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadOptions.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadOptions.scala
@@ -21,7 +21,7 @@ import org.apache.spark.sql.Row
import org.apache.spark.sql.test.util.QueryTest
import org.scalatest.BeforeAndAfterAll
-import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
class TestLoadOptions extends QueryTest with BeforeAndAfterAll{
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestTableLevelBlockSize.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestTableLevelBlockSize.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestTableLevelBlockSize.scala
index a77b210..f6a049a 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestTableLevelBlockSize.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestTableLevelBlockSize.scala
@@ -19,11 +19,13 @@ package org.apache.carbondata.spark.testsuite.dataload
import org.apache.spark.sql.Row
import org.scalatest.BeforeAndAfterAll
+
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
import org.apache.spark.sql.test.util.QueryTest
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
+
/**
* Test Class for table block size
*
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala
index 5170c43..37007ed 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala
@@ -23,11 +23,11 @@ import org.apache.spark.sql.Row
import org.apache.spark.sql.test.util.QueryTest
import org.scalatest.BeforeAndAfterAll
+import org.apache.carbondata.common.exceptions.sql.MalformedDataMapCommandException
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.metadata.CarbonMetadata
import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.core.util.path.CarbonTablePath
-import org.apache.carbondata.spark.exception.MalformedDataMapCommandException
class TestDataMapCommand extends QueryTest with BeforeAndAfterAll {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataretention/DataRetentionTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataretention/DataRetentionTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataretention/DataRetentionTestCase.scala
index 7c82f75..a70584b 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataretention/DataRetentionTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataretention/DataRetentionTestCase.scala
@@ -29,9 +29,9 @@ import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata, CarbonTableIdentifier}
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
import org.apache.spark.sql.test.util.QueryTest
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
/**
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datetype/DateTypeTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datetype/DateTypeTest.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datetype/DateTypeTest.scala
index e2df07c..b9b01f8 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datetype/DateTypeTest.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datetype/DateTypeTest.scala
@@ -16,10 +16,10 @@
*/
package org.apache.carbondata.spark.testsuite.datetype
-import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
import org.apache.spark.sql.test.util.QueryTest
import org.scalatest.BeforeAndAfterAll
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
class DateTypeTest extends QueryTest with BeforeAndAfterAll{
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/sortcolumns/TestSortColumns.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/sortcolumns/TestSortColumns.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/sortcolumns/TestSortColumns.scala
index 7c288b3..dd1aab8 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/sortcolumns/TestSortColumns.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/sortcolumns/TestSortColumns.scala
@@ -21,10 +21,11 @@ import org.scalatest.BeforeAndAfterAll
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
import org.apache.spark.sql.test.util.QueryTest
import org.apache.spark.util.SparkUtil4Test
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
+
class TestSortColumns extends QueryTest with BeforeAndAfterAll {
override def beforeAll {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark-common/pom.xml
----------------------------------------------------------------------
diff --git a/integration/spark-common/pom.xml b/integration/spark-common/pom.xml
index d40e213..e80593b 100644
--- a/integration/spark-common/pom.xml
+++ b/integration/spark-common/pom.xml
@@ -36,11 +36,6 @@
<dependencies>
<dependency>
<groupId>org.apache.carbondata</groupId>
- <artifactId>carbondata-processing</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.carbondata</groupId>
<artifactId>carbondata-hadoop</artifactId>
<version>${project.version}</version>
</dependency>
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark-common/src/main/java/org/apache/carbondata/spark/exception/ConcurrentOperationException.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/exception/ConcurrentOperationException.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/exception/ConcurrentOperationException.java
deleted file mode 100644
index 1f3c07d..0000000
--- a/integration/spark-common/src/main/java/org/apache/carbondata/spark/exception/ConcurrentOperationException.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.spark.exception;
-
-public class ConcurrentOperationException extends Exception {
-
- /**
- * The Error message.
- */
- private String msg = "";
-
- /**
- * Constructor
- *
- * @param msg The error message for this exception.
- */
- public ConcurrentOperationException(String msg) {
- super(msg);
- this.msg = msg;
- }
-
- /**
- * getMessage
- */
- public String getMessage() {
- return this.msg;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark-common/src/main/java/org/apache/carbondata/spark/exception/MalformedCarbonCommandException.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/exception/MalformedCarbonCommandException.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/exception/MalformedCarbonCommandException.java
deleted file mode 100644
index 9f441d3..0000000
--- a/integration/spark-common/src/main/java/org/apache/carbondata/spark/exception/MalformedCarbonCommandException.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.spark.exception;
-
-// After parsing carbon query successfully , if any validation fails then
-// use MalformedCarbonCommandException
-public class MalformedCarbonCommandException extends Exception {
-
-
- /**
- * default serial version ID.
- */
- private static final long serialVersionUID = 1L;
-
- /**
- * The Error message.
- */
- private String msg = "";
-
- /**
- * Constructor
- *
- * @param msg The error message for this exception.
- */
- public MalformedCarbonCommandException(String msg) {
- super(msg);
- this.msg = msg;
- }
-
- /**
- * Constructor
- *
- * @param msg The error message for this exception.
- */
- public MalformedCarbonCommandException(String msg, Throwable t) {
- super(msg, t);
- this.msg = msg;
- }
-
- /**
- * getLocalizedMessage
- */
- @Override
- public String getLocalizedMessage() {
- return super.getLocalizedMessage();
- }
-
- /**
- * getMessage
- */
- public String getMessage() {
- return this.msg;
- }
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark-common/src/main/java/org/apache/carbondata/spark/exception/MalformedDataMapCommandException.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/exception/MalformedDataMapCommandException.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/exception/MalformedDataMapCommandException.java
deleted file mode 100644
index a05d8e6..0000000
--- a/integration/spark-common/src/main/java/org/apache/carbondata/spark/exception/MalformedDataMapCommandException.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.spark.exception;
-
-/**
- * Throw exception when using illegal argument
- */
-public class MalformedDataMapCommandException extends MalformedCarbonCommandException {
- /**
- * default serial version ID.
- */
- private static final long serialVersionUID = 1L;
-
- public MalformedDataMapCommandException(String msg) {
- super(msg);
- }
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark-common/src/main/java/org/apache/carbondata/spark/exception/NoSuchDataMapException.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/exception/NoSuchDataMapException.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/exception/NoSuchDataMapException.java
deleted file mode 100644
index 959e70d..0000000
--- a/integration/spark-common/src/main/java/org/apache/carbondata/spark/exception/NoSuchDataMapException.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.spark.exception;
-
-/**
- * if the dataMap does not exist, carbon should throw NoSuchDataMapException
- */
-public class NoSuchDataMapException extends MalformedCarbonCommandException {
-
- /**
- * default serial version ID.
- */
- private static final long serialVersionUID = 1L;
-
- public NoSuchDataMapException(String dataMapName, String tableName) {
- super("Datamap with name " + dataMapName + " does not exist under table " + tableName);
- }
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
index b98bddf..b89d49d 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
@@ -26,6 +26,7 @@ import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.util.CarbonException
import org.apache.spark.unsafe.types.UTF8String
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datastore.impl.FileFactory
@@ -34,8 +35,6 @@ import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, PartitionMa
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.mutate.CarbonUpdateUtil
import org.apache.carbondata.core.statusmanager.SegmentStatusManager
-import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
-import org.apache.carbondata.spark.util.DataLoadingUtil
object CarbonStore {
private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
@@ -138,8 +137,7 @@ object CarbonStore {
carbonCleanFilesLock =
CarbonLockUtil
.getLockObject(absoluteTableIdentifier, LockUsage.CLEAN_FILES_LOCK, errorMsg)
- DataLoadingUtil.deleteLoadsAndUpdateMetadata(
- isForceDeletion = true, carbonTable)
+ SegmentStatusManager.deleteLoadsAndUpdateMetadata(carbonTable, true)
CarbonUpdateUtil.cleanUpDeltaFiles(carbonTable, true)
currentTablePartitions match {
case Some(partitions) =>
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonColumnValidator.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonColumnValidator.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonColumnValidator.scala
index ad624ee..578138f 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonColumnValidator.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonColumnValidator.scala
@@ -16,12 +16,12 @@
*/
package org.apache.carbondata.spark
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
-import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
- /**
- * Carbon column validator
- */
+/**
+ * Carbon column validator
+ */
class CarbonColumnValidator extends ColumnValidator {
def validateColumns(allColumns: Seq[ColumnSchema]) {
allColumns.foreach { columnSchema =>
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/ValidateUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/ValidateUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/ValidateUtil.scala
deleted file mode 100644
index dfda92c..0000000
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/ValidateUtil.scala
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.spark.load
-
-import java.text.SimpleDateFormat
-
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-import org.apache.carbondata.processing.loading.sort.SortScopeOptions
-import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
-
-object ValidateUtil {
-
- /**
- * validates both timestamp and date for illegal values
- *
- * @param dateTimeLoadFormat
- * @param dateTimeLoadOption
- */
- def validateDateTimeFormat(dateTimeLoadFormat: String, dateTimeLoadOption: String): Unit = {
- // allowing empty value to be configured for dateformat option.
- if (dateTimeLoadFormat != null && dateTimeLoadFormat.trim != "") {
- try {
- new SimpleDateFormat(dateTimeLoadFormat)
- } catch {
- case _: IllegalArgumentException =>
- throw new MalformedCarbonCommandException(s"Error: Wrong option: $dateTimeLoadFormat is" +
- s" provided for option $dateTimeLoadOption")
- }
- }
- }
-
- def validateSortScope(carbonTable: CarbonTable, sortScope: String): Unit = {
- if (sortScope != null) {
- // Don't support use global sort on partitioned table.
- if (carbonTable.getPartitionInfo(carbonTable.getTableName) != null &&
- sortScope.equalsIgnoreCase(SortScopeOptions.SortScope.GLOBAL_SORT.toString)) {
- throw new MalformedCarbonCommandException("Don't support use global sort on partitioned " +
- "table.")
- }
- }
- }
-
- def validateGlobalSortPartitions(globalSortPartitions: String): Unit = {
- if (globalSortPartitions != null) {
- try {
- val num = globalSortPartitions.toInt
- if (num <= 0) {
- throw new MalformedCarbonCommandException("'GLOBAL_SORT_PARTITIONS' should be greater " +
- "than 0.")
- }
- } catch {
- case e: NumberFormatException => throw new MalformedCarbonCommandException(e.getMessage)
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
index fa126fc..d2c059c 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
@@ -310,11 +310,11 @@ class CarbonMergerRDD[K, V](
val splits = format.getSplits(job)
// keep on assigning till last one is reached.
- if (null != splits && splits.size > 0) splitsOfLastSegment =
- splits.asScala
+ if (null != splits && splits.size > 0) {
+ splitsOfLastSegment = splits.asScala
.map(_.asInstanceOf[CarbonInputSplit])
.filter { split => FileFormat.COLUMNAR_V3.equals(split.getFileFormat) }.toList.asJava
-
+ }
carbonInputSplits ++:= splits.asScala.map(_.asInstanceOf[CarbonInputSplit]).filter{ entry =>
val blockInfo = new TableBlockInfo(entry.getPath.toString,
entry.getStart, entry.getSegmentId,
http://git-wip-us.apache.org/repos/asf/carbondata/blob/0d50f654/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
index 90a4223..8cb8205 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
@@ -36,6 +36,7 @@ import org.apache.spark.sql.types.{MetadataBuilder, StringType}
import org.apache.spark.sql.util.CarbonException
import org.apache.spark.util.FileUtils
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datastore.impl.FileFactory
@@ -53,8 +54,7 @@ import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat
import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException
import org.apache.carbondata.processing.loading.model.CarbonLoadModel
-import org.apache.carbondata.processing.util.{CarbonDataProcessorUtil}
-import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
+import org.apache.carbondata.processing.util.CarbonDataProcessorUtil
import org.apache.carbondata.spark.rdd.CarbonMergeFilesRDD
object CommonUtil {
@@ -632,13 +632,6 @@ object CommonUtil {
parsedPropertyValueString
}
-
- def readLoadMetadataDetails(model: CarbonLoadModel): Unit = {
- val metadataPath = model.getCarbonDataLoadSchema.getCarbonTable.getMetadataPath
- val details = SegmentStatusManager.readLoadMetadata(metadataPath)
- model.setLoadMetadataDetails(new util.ArrayList[LoadMetadataDetails](details.toList.asJava))
- }
-
def configureCSVInputFormat(configuration: Configuration,
carbonLoadModel: CarbonLoadModel): Unit = {
CSVInputFormat.setCommentCharacter(configuration, carbonLoadModel.getCommentChar)
@@ -680,48 +673,6 @@ object CommonUtil {
}
}
- def getCsvHeaderColumns(
- carbonLoadModel: CarbonLoadModel,
- hadoopConf: Configuration): Array[String] = {
- val delimiter = if (StringUtils.isEmpty(carbonLoadModel.getCsvDelimiter)) {
- CarbonCommonConstants.COMMA
- } else {
- CarbonUtil.delimiterConverter(carbonLoadModel.getCsvDelimiter)
- }
- var csvFile: String = null
- var csvHeader: String = carbonLoadModel.getCsvHeader
- val csvColumns = if (StringUtils.isBlank(csvHeader)) {
- // read header from csv file
- csvFile = carbonLoadModel.getFactFilePath.split(",")(0)
- csvHeader = CarbonUtil.readHeader(csvFile, hadoopConf)
- if (StringUtils.isBlank(csvHeader)) {
- throw new CarbonDataLoadingException("First line of the csv is not valid.")
- }
- csvHeader.toLowerCase().split(delimiter).map(_.replaceAll("\"", "").trim)
- } else {
- csvHeader.toLowerCase.split(CarbonCommonConstants.COMMA).map(_.trim)
- }
-
- if (!CarbonDataProcessorUtil.isHeaderValid(carbonLoadModel.getTableName, csvColumns,
- carbonLoadModel.getCarbonDataLoadSchema)) {
- if (csvFile == null) {
- LOGGER.error("CSV header in DDL is not proper."
- + " Column names in schema and CSV header are not the same.")
- throw new CarbonDataLoadingException(
- "CSV header in DDL is not proper. Column names in schema and CSV header are "
- + "not the same.")
- } else {
- LOGGER.error(
- "CSV header in input file is not proper. Column names in schema and csv header are not "
- + "the same. Input file : " + CarbonUtil.removeAKSK(csvFile))
- throw new CarbonDataLoadingException(
- "CSV header in input file is not proper. Column names in schema and csv header are not "
- + "the same. Input file : " + CarbonUtil.removeAKSK(csvFile))
- }
- }
- csvColumns
- }
-
def validateMaxColumns(csvHeaders: Array[String], maxColumns: String): Int = {
/*
User configures both csvheadercolumns, maxcolumns,
@@ -862,8 +813,7 @@ object CommonUtil {
try {
val carbonTable = CarbonMetadata.getInstance
.getCarbonTable(identifier.getCarbonTableIdentifier.getTableUniqueName)
- DataLoadingUtil.deleteLoadsAndUpdateMetadata(
- isForceDeletion = true, carbonTable)
+ SegmentStatusManager.deleteLoadsAndUpdateMetadata(carbonTable, true)
} catch {
case _: Exception =>
LOGGER.warn(s"Error while cleaning table " +