You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2018/03/08 16:55:28 UTC
[27/54] [abbrv] carbondata git commit: [CARBONDATA-2159] Remove
carbon-spark dependency in store-sdk module
http://git-wip-us.apache.org/repos/asf/carbondata/blob/89cfd8e0/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala
index 8d394db..e69de29 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala
@@ -1,610 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.spark.util
-
-import java.text.SimpleDateFormat
-import java.util
-import java.util.{Date, List, Locale}
-
-import scala.collection.{immutable, mutable}
-import scala.collection.JavaConverters._
-import scala.collection.mutable.ArrayBuffer
-
-import org.apache.commons.lang3.StringUtils
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path
-import org.apache.hadoop.mapred.JobConf
-import org.apache.hadoop.mapreduce.{TaskAttemptID, TaskType}
-import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, FileSplit}
-import org.apache.hadoop.mapreduce.task.{JobContextImpl, TaskAttemptContextImpl}
-import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
-import org.apache.spark.sql.execution.datasources.{FilePartition, FileScanRDD, PartitionedFile}
-import org.apache.spark.sql.util.CarbonException
-import org.apache.spark.sql.util.SparkSQLUtil.sessionState
-
-import org.apache.carbondata.common.constants.LoggerAction
-import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
-import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants}
-import org.apache.carbondata.core.indexstore.PartitionSpec
-import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, LockUsage}
-import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, SegmentStatusManager}
-import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
-import org.apache.carbondata.processing.loading.constants.DataLoadProcessorConstants
-import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat
-import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
-import org.apache.carbondata.processing.util.{CarbonLoaderUtil, DeleteLoadFolders, TableOptionConstant}
-import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
-import org.apache.carbondata.spark.load.DataLoadProcessBuilderOnSpark.LOGGER
-import org.apache.carbondata.spark.load.ValidateUtil
-import org.apache.carbondata.spark.rdd.SerializableConfiguration
-
-/**
- * the util object of data loading
- */
-object DataLoadingUtil {
-
- val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
-
- /**
- * get data loading options and initialise default value
- */
- def getDataLoadingOptions(
- carbonProperty: CarbonProperties,
- options: immutable.Map[String, String]): mutable.Map[String, String] = {
- val optionsFinal = scala.collection.mutable.Map[String, String]()
- optionsFinal.put("delimiter", options.getOrElse("delimiter", ","))
- optionsFinal.put("quotechar", options.getOrElse("quotechar", "\""))
- optionsFinal.put("fileheader", options.getOrElse("fileheader", ""))
- optionsFinal.put("commentchar", options.getOrElse("commentchar", "#"))
- optionsFinal.put("columndict", options.getOrElse("columndict", null))
-
- optionsFinal.put("escapechar",
- CarbonLoaderUtil.getEscapeChar(options.getOrElse("escapechar", "\\")))
-
- optionsFinal.put(
- "serialization_null_format",
- options.getOrElse("serialization_null_format", "\\N"))
-
- optionsFinal.put(
- "bad_records_logger_enable",
- options.getOrElse(
- "bad_records_logger_enable",
- carbonProperty.getProperty(
- CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE,
- CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE_DEFAULT)))
-
- val badRecordActionValue = carbonProperty.getProperty(
- CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION,
- CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT)
-
- optionsFinal.put(
- "bad_records_action",
- options.getOrElse(
- "bad_records_action",
- carbonProperty.getProperty(
- CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_ACTION,
- badRecordActionValue)))
-
- optionsFinal.put(
- "is_empty_data_bad_record",
- options.getOrElse(
- "is_empty_data_bad_record",
- carbonProperty.getProperty(
- CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD,
- CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD_DEFAULT)))
-
- optionsFinal.put(
- "skip_empty_line",
- options.getOrElse(
- "skip_empty_line",
- carbonProperty.getProperty(
- CarbonLoadOptionConstants.CARBON_OPTIONS_SKIP_EMPTY_LINE)))
-
- optionsFinal.put("all_dictionary_path", options.getOrElse("all_dictionary_path", ""))
-
- optionsFinal.put(
- "complex_delimiter_level_1",
- options.getOrElse("complex_delimiter_level_1", "$"))
-
- optionsFinal.put(
- "complex_delimiter_level_2",
- options.getOrElse("complex_delimiter_level_2", ":"))
-
- optionsFinal.put(
- "dateformat",
- options.getOrElse(
- "dateformat",
- carbonProperty.getProperty(
- CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT,
- CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT_DEFAULT)))
-
- optionsFinal.put(
- "timestampformat",
- options.getOrElse(
- "timestampformat",
- carbonProperty.getProperty(
- CarbonLoadOptionConstants.CARBON_OPTIONS_TIMESTAMPFORMAT,
- CarbonLoadOptionConstants.CARBON_OPTIONS_TIMESTAMPFORMAT_DEFAULT)))
-
- optionsFinal.put(
- "global_sort_partitions",
- options.getOrElse(
- "global_sort_partitions",
- carbonProperty.getProperty(
- CarbonLoadOptionConstants.CARBON_OPTIONS_GLOBAL_SORT_PARTITIONS,
- null)))
-
- optionsFinal.put("maxcolumns", options.getOrElse("maxcolumns", null))
-
- optionsFinal.put(
- "batch_sort_size_inmb",
- options.getOrElse(
- "batch_sort_size_inmb",
- carbonProperty.getProperty(
- CarbonLoadOptionConstants.CARBON_OPTIONS_BATCH_SORT_SIZE_INMB,
- carbonProperty.getProperty(
- CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB,
- CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB_DEFAULT))))
-
- optionsFinal.put(
- "bad_record_path",
- options.getOrElse(
- "bad_record_path",
- carbonProperty.getProperty(
- CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH,
- carbonProperty.getProperty(
- CarbonCommonConstants.CARBON_BADRECORDS_LOC,
- CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL))))
-
- val useOnePass = options.getOrElse(
- "single_pass",
- carbonProperty.getProperty(
- CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS,
- CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS_DEFAULT)).trim.toLowerCase match {
- case "true" =>
- true
- case "false" =>
- // when single_pass = false and if either all_dictionary_path
- // or columnDict is configured then do not allow the load
- if (StringUtils.isNotEmpty(optionsFinal("all_dictionary_path")) ||
- StringUtils.isNotEmpty(optionsFinal("columndict"))) {
- throw new MalformedCarbonCommandException(
- "Can not use all_dictionary_path or columndict without single_pass.")
- } else {
- false
- }
- case illegal =>
- val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
- LOGGER.error(s"Can't use single_pass, because illegal syntax found: [$illegal] " +
- "Please set it as 'true' or 'false'")
- false
- }
- optionsFinal.put("single_pass", useOnePass.toString)
- optionsFinal
- }
-
- /**
- * check whether using default value or not
- */
- private def checkDefaultValue(value: String, default: String) = {
- if (StringUtils.isEmpty(value)) {
- default
- } else {
- value
- }
- }
-
- /**
- * build CarbonLoadModel for data loading
- * @param table CarbonTable object containing all metadata information for the table
- * like table name, table path, schema, etc
- * @param options Load options from user input
- * @return a new CarbonLoadModel instance
- */
- def buildCarbonLoadModelJava(
- table: CarbonTable,
- options: java.util.Map[String, String]
- ): CarbonLoadModel = {
- val carbonProperty: CarbonProperties = CarbonProperties.getInstance
- val optionsFinal = getDataLoadingOptions(carbonProperty, options.asScala.toMap)
- optionsFinal.put("sort_scope", "no_sort")
- if (!options.containsKey("fileheader")) {
- val csvHeader = table.getCreateOrderColumn(table.getTableName)
- .asScala.map(_.getColName).mkString(",")
- optionsFinal.put("fileheader", csvHeader)
- }
- val model = new CarbonLoadModel()
- buildCarbonLoadModel(
- table = table,
- carbonProperty = carbonProperty,
- options = options.asScala.toMap,
- optionsFinal = optionsFinal,
- carbonLoadModel = model,
- hadoopConf = null) // we have provided 'fileheader', so it can be null
-
- // set default values
- model.setTimestampformat(CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
- model.setDateFormat(CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT)
- model.setUseOnePass(options.asScala.getOrElse("onepass", "false").toBoolean)
- model.setDictionaryServerHost(options.asScala.getOrElse("dicthost", null))
- model.setDictionaryServerPort(options.asScala.getOrElse("dictport", "-1").toInt)
- model
- }
-
- /**
- * build CarbonLoadModel for data loading
- * @param table CarbonTable object containing all metadata information for the table
- * like table name, table path, schema, etc
- * @param carbonProperty Carbon property instance
- * @param options Load options from user input
- * @param optionsFinal Load options that populated with default values for optional options
- * @param carbonLoadModel The output load model
- * @param hadoopConf hadoopConf is needed to read CSV header if there 'fileheader' is not set in
- * user provided load options
- */
- def buildCarbonLoadModel(
- table: CarbonTable,
- carbonProperty: CarbonProperties,
- options: immutable.Map[String, String],
- optionsFinal: mutable.Map[String, String],
- carbonLoadModel: CarbonLoadModel,
- hadoopConf: Configuration,
- partition: Map[String, Option[String]] = Map.empty,
- isDataFrame: Boolean = false): Unit = {
- carbonLoadModel.setTableName(table.getTableName)
- carbonLoadModel.setDatabaseName(table.getDatabaseName)
- carbonLoadModel.setTablePath(table.getTablePath)
- carbonLoadModel.setTableName(table.getTableName)
- val dataLoadSchema = new CarbonDataLoadSchema(table)
- // Need to fill dimension relation
- carbonLoadModel.setCarbonDataLoadSchema(dataLoadSchema)
- val sort_scope = optionsFinal("sort_scope")
- val single_pass = optionsFinal("single_pass")
- val bad_records_logger_enable = optionsFinal("bad_records_logger_enable")
- val bad_records_action = optionsFinal("bad_records_action")
- var bad_record_path = optionsFinal("bad_record_path")
- val global_sort_partitions = optionsFinal("global_sort_partitions")
- val timestampformat = optionsFinal("timestampformat")
- val dateFormat = optionsFinal("dateformat")
- val delimeter = optionsFinal("delimiter")
- val complex_delimeter_level1 = optionsFinal("complex_delimiter_level_1")
- val complex_delimeter_level2 = optionsFinal("complex_delimiter_level_2")
- val all_dictionary_path = optionsFinal("all_dictionary_path")
- val column_dict = optionsFinal("columndict")
- ValidateUtil.validateDateTimeFormat(timestampformat, "TimestampFormat")
- ValidateUtil.validateDateTimeFormat(dateFormat, "DateFormat")
- ValidateUtil.validateSortScope(table, sort_scope)
-
- if (bad_records_logger_enable.toBoolean ||
- LoggerAction.REDIRECT.name().equalsIgnoreCase(bad_records_action)) {
- if (!CarbonUtil.isValidBadStorePath(bad_record_path)) {
- CarbonException.analysisException("Invalid bad records location.")
- }
- bad_record_path = CarbonUtil.checkAndAppendHDFSUrl(bad_record_path)
- }
- carbonLoadModel.setBadRecordsLocation(bad_record_path)
-
- ValidateUtil.validateGlobalSortPartitions(global_sort_partitions)
- carbonLoadModel.setEscapeChar(checkDefaultValue(optionsFinal("escapechar"), "\\"))
- carbonLoadModel.setQuoteChar(checkDefaultValue(optionsFinal("quotechar"), "\""))
- carbonLoadModel.setCommentChar(checkDefaultValue(optionsFinal("commentchar"), "#"))
-
- // if there isn't file header in csv file and load sql doesn't provide FILEHEADER option,
- // we should use table schema to generate file header.
- var fileHeader = optionsFinal("fileheader")
- val headerOption = options.get("header")
- if (headerOption.isDefined) {
- // whether the csv file has file header
- // the default value is true
- val header = try {
- headerOption.get.toBoolean
- } catch {
- case ex: IllegalArgumentException =>
- throw new MalformedCarbonCommandException(
- "'header' option should be either 'true' or 'false'. " + ex.getMessage)
- }
- if (header) {
- if (fileHeader.nonEmpty) {
- throw new MalformedCarbonCommandException(
- "When 'header' option is true, 'fileheader' option is not required.")
- }
- } else {
- if (fileHeader.isEmpty) {
- fileHeader = table.getCreateOrderColumn(table.getTableName)
- .asScala.map(_.getColName).mkString(",")
- }
- }
- }
-
- carbonLoadModel.setTimestampformat(timestampformat)
- carbonLoadModel.setDateFormat(dateFormat)
- carbonLoadModel.setDefaultTimestampFormat(carbonProperty.getProperty(
- CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
- CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT))
-
- carbonLoadModel.setDefaultDateFormat(carbonProperty.getProperty(
- CarbonCommonConstants.CARBON_DATE_FORMAT,
- CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT))
-
- carbonLoadModel.setSerializationNullFormat(
- TableOptionConstant.SERIALIZATION_NULL_FORMAT.getName + "," +
- optionsFinal("serialization_null_format"))
-
- carbonLoadModel.setBadRecordsLoggerEnable(
- TableOptionConstant.BAD_RECORDS_LOGGER_ENABLE.getName + "," + bad_records_logger_enable)
-
- carbonLoadModel.setBadRecordsAction(
- TableOptionConstant.BAD_RECORDS_ACTION.getName + "," + bad_records_action.toUpperCase)
-
- carbonLoadModel.setIsEmptyDataBadRecord(
- DataLoadProcessorConstants.IS_EMPTY_DATA_BAD_RECORD + "," +
- optionsFinal("is_empty_data_bad_record"))
-
- carbonLoadModel.setSkipEmptyLine(optionsFinal("skip_empty_line"))
-
- carbonLoadModel.setSortScope(sort_scope)
- carbonLoadModel.setBatchSortSizeInMb(optionsFinal("batch_sort_size_inmb"))
- carbonLoadModel.setGlobalSortPartitions(global_sort_partitions)
- carbonLoadModel.setUseOnePass(single_pass.toBoolean)
-
- if (delimeter.equalsIgnoreCase(complex_delimeter_level1) ||
- complex_delimeter_level1.equalsIgnoreCase(complex_delimeter_level2) ||
- delimeter.equalsIgnoreCase(complex_delimeter_level2)) {
- CarbonException.analysisException(s"Field Delimiter and Complex types delimiter are same")
- } else {
- carbonLoadModel.setComplexDelimiterLevel1(complex_delimeter_level1)
- carbonLoadModel.setComplexDelimiterLevel2(complex_delimeter_level2)
- }
- // set local dictionary path, and dictionary file extension
- carbonLoadModel.setAllDictPath(all_dictionary_path)
- carbonLoadModel.setCsvDelimiter(CarbonUtil.unescapeChar(delimeter))
- carbonLoadModel.setCsvHeader(fileHeader)
- carbonLoadModel.setColDictFilePath(column_dict)
-
- val ignoreColumns = new util.ArrayList[String]()
- if (!isDataFrame) {
- ignoreColumns.addAll(partition.filter(_._2.isDefined).keys.toList.asJava)
- }
- carbonLoadModel.setCsvHeaderColumns(
- CommonUtil.getCsvHeaderColumns(carbonLoadModel, hadoopConf, ignoreColumns))
-
- val validatedMaxColumns = CommonUtil.validateMaxColumns(
- carbonLoadModel.getCsvHeaderColumns,
- optionsFinal("maxcolumns"))
-
- carbonLoadModel.setMaxColumns(validatedMaxColumns.toString)
- if (null == carbonLoadModel.getLoadMetadataDetails) {
- CommonUtil.readLoadMetadataDetails(carbonLoadModel)
- }
- }
-
- private def isLoadDeletionRequired(metaDataLocation: String): Boolean = {
- val details = SegmentStatusManager.readLoadMetadata(metaDataLocation)
- if (details != null && details.nonEmpty) for (oneRow <- details) {
- if ((SegmentStatus.MARKED_FOR_DELETE == oneRow.getSegmentStatus ||
- SegmentStatus.COMPACTED == oneRow.getSegmentStatus ||
- SegmentStatus.INSERT_IN_PROGRESS == oneRow.getSegmentStatus ||
- SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS == oneRow.getSegmentStatus) &&
- oneRow.getVisibility.equalsIgnoreCase("true")) {
- return true
- }
- }
- false
- }
-
- def deleteLoadsAndUpdateMetadata(
- isForceDeletion: Boolean,
- carbonTable: CarbonTable,
- specs: util.List[PartitionSpec]): Unit = {
- if (isLoadDeletionRequired(carbonTable.getMetadataPath)) {
- val absoluteTableIdentifier = carbonTable.getAbsoluteTableIdentifier
-
- val (details, updationRequired) =
- isUpdationRequired(
- isForceDeletion,
- carbonTable,
- absoluteTableIdentifier)
-
-
- if (updationRequired) {
- val carbonTableStatusLock =
- CarbonLockFactory.getCarbonLockObj(
- absoluteTableIdentifier,
- LockUsage.TABLE_STATUS_LOCK
- )
- var locked = false
- var updationCompletionStaus = false
- try {
- // Update load metadata file after cleaning deleted nodes
- locked = carbonTableStatusLock.lockWithRetries()
- if (locked) {
- LOGGER.info("Table status lock has been successfully acquired.")
- // Again read status and check to verify updation required or not.
- val (details, updationRequired) =
- isUpdationRequired(
- isForceDeletion,
- carbonTable,
- absoluteTableIdentifier)
- if (!updationRequired) {
- return
- }
- // read latest table status again.
- val latestMetadata = SegmentStatusManager
- .readLoadMetadata(carbonTable.getMetadataPath)
-
- // update the metadata details from old to new status.
- val latestStatus = CarbonLoaderUtil
- .updateLoadMetadataFromOldToNew(details, latestMetadata)
-
- CarbonLoaderUtil.writeLoadMetadata(absoluteTableIdentifier, latestStatus)
- } else {
- val dbName = absoluteTableIdentifier.getCarbonTableIdentifier.getDatabaseName
- val tableName = absoluteTableIdentifier.getCarbonTableIdentifier.getTableName
- val errorMsg = "Clean files request is failed for " +
- s"$dbName.$tableName" +
- ". Not able to acquire the table status lock due to other operation " +
- "running in the background."
- LOGGER.audit(errorMsg)
- LOGGER.error(errorMsg)
- throw new Exception(errorMsg + " Please try after some time.")
- }
- updationCompletionStaus = true
- } finally {
- if (locked) {
- CarbonLockUtil.fileUnlock(carbonTableStatusLock, LockUsage.TABLE_STATUS_LOCK)
- }
- }
- if (updationCompletionStaus) {
- DeleteLoadFolders
- .physicalFactAndMeasureMetadataDeletion(absoluteTableIdentifier,
- carbonTable.getMetadataPath, isForceDeletion, specs)
- }
- }
- }
- }
-
- private def isUpdationRequired(isForceDeletion: Boolean,
- carbonTable: CarbonTable,
- absoluteTableIdentifier: AbsoluteTableIdentifier): (Array[LoadMetadataDetails], Boolean) = {
- val details = SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath)
- // Delete marked loads
- val isUpdationRequired =
- DeleteLoadFolders.deleteLoadFoldersFromFileSystem(
- absoluteTableIdentifier,
- isForceDeletion,
- details,
- carbonTable.getMetadataPath
- )
- (details, isUpdationRequired)
- }
-
- /**
- * creates a RDD that does reading of multiple CSV files
- */
- def csvFileScanRDD(
- spark: SparkSession,
- model: CarbonLoadModel,
- hadoopConf: Configuration
- ): RDD[InternalRow] = {
- // 1. partition
- val defaultMaxSplitBytes = sessionState(spark).conf.filesMaxPartitionBytes
- val openCostInBytes = sessionState(spark).conf.filesOpenCostInBytes
- val defaultParallelism = spark.sparkContext.defaultParallelism
- CommonUtil.configureCSVInputFormat(hadoopConf, model)
- hadoopConf.set(FileInputFormat.INPUT_DIR, model.getFactFilePath)
- val jobConf = new JobConf(hadoopConf)
- SparkHadoopUtil.get.addCredentials(jobConf)
- val jobContext = new JobContextImpl(jobConf, null)
- val inputFormat = new CSVInputFormat()
- val rawSplits = inputFormat.getSplits(jobContext).toArray
- val splitFiles = rawSplits.map { split =>
- val fileSplit = split.asInstanceOf[FileSplit]
- PartitionedFile(
- InternalRow.empty,
- fileSplit.getPath.toString,
- fileSplit.getStart,
- fileSplit.getLength,
- fileSplit.getLocations)
- }.sortBy(_.length)(implicitly[Ordering[Long]].reverse)
- val totalBytes = splitFiles.map(_.length + openCostInBytes).sum
- val bytesPerCore = totalBytes / defaultParallelism
-
- val maxSplitBytes = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore))
- LOGGER.info(s"Planning scan with bin packing, max size: $maxSplitBytes bytes, " +
- s"open cost is considered as scanning $openCostInBytes bytes.")
-
- val partitions = new ArrayBuffer[FilePartition]
- val currentFiles = new ArrayBuffer[PartitionedFile]
- var currentSize = 0L
-
- def closePartition(): Unit = {
- if (currentFiles.nonEmpty) {
- val newPartition =
- FilePartition(
- partitions.size,
- currentFiles.toArray.toSeq)
- partitions += newPartition
- }
- currentFiles.clear()
- currentSize = 0
- }
-
- splitFiles.foreach { file =>
- if (currentSize + file.length > maxSplitBytes) {
- closePartition()
- }
- // Add the given file to the current partition.
- currentSize += file.length + openCostInBytes
- currentFiles += file
- }
- closePartition()
-
- // 2. read function
- val serializableConfiguration = new SerializableConfiguration(jobConf)
- val readFunction = new (PartitionedFile => Iterator[InternalRow]) with Serializable {
- override def apply(file: PartitionedFile): Iterator[InternalRow] = {
- new Iterator[InternalRow] {
- val hadoopConf = serializableConfiguration.value
- val jobTrackerId: String = {
- val formatter = new SimpleDateFormat("yyyyMMddHHmmss", Locale.US)
- formatter.format(new Date())
- }
- val attemptId = new TaskAttemptID(jobTrackerId, 0, TaskType.MAP, 0, 0)
- val hadoopAttemptContext = new TaskAttemptContextImpl(hadoopConf, attemptId)
- val inputSplit =
- new FileSplit(new Path(file.filePath), file.start, file.length, file.locations)
- var finished = false
- val inputFormat = new CSVInputFormat()
- val reader = inputFormat.createRecordReader(inputSplit, hadoopAttemptContext)
- reader.initialize(inputSplit, hadoopAttemptContext)
-
- override def hasNext: Boolean = {
- if (!finished) {
- if (reader != null) {
- if (reader.nextKeyValue()) {
- true
- } else {
- finished = true
- reader.close()
- false
- }
- } else {
- finished = true
- false
- }
- } else {
- false
- }
- }
-
- override def next(): InternalRow = {
- new GenericInternalRow(reader.getCurrentValue.get().asInstanceOf[Array[Any]])
- }
- }
- }
- }
- new FileScanRDD(spark, readFunction, partitions)
- }
-
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/89cfd8e0/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
index 2bd4f45..fb9ecac 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
@@ -321,7 +321,7 @@ object GlobalDictionaryUtil {
carbonLoadModel.getSerializationNullFormat.split(CarbonCommonConstants.COMMA, 2)(1)
// get load count
if (null == carbonLoadModel.getLoadMetadataDetails) {
- CommonUtil.readLoadMetadataDetails(carbonLoadModel)
+ carbonLoadModel.readAndSetLoadMetadataDetails()
}
val absoluteTableIdentifier = AbsoluteTableIdentifier.from(carbonLoadModel.getTablePath, table)
DictionaryLoadModel(
http://git-wip-us.apache.org/repos/asf/carbondata/blob/89cfd8e0/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
index 84215fd..b1e4083 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
@@ -34,6 +34,7 @@ import org.apache.spark.sql.util.CarbonException
import org.apache.spark.util.PartitionUtils
import org.apache.carbondata.common.constants.LoggerAction
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.metadata.datatype.DataTypes
@@ -42,7 +43,6 @@ import org.apache.carbondata.core.metadata.schema.partition.PartitionType
import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
import org.apache.carbondata.processing.util.CarbonLoaderUtil
-import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
import org.apache.carbondata.spark.util.{CommonUtil, DataTypeConverterUtil}
/**
http://git-wip-us.apache.org/repos/asf/carbondata/blob/89cfd8e0/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 349c436..2c4d604 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -153,7 +153,7 @@ object CarbonDataRDDFactory {
if (compactionModel.compactionType != CompactionType.IUD_UPDDEL_DELTA) {
// update the updated table status. For the case of Update Delta Compaction the Metadata
// is filled in LoadModel, no need to refresh.
- CommonUtil.readLoadMetadataDetails(carbonLoadModel)
+ carbonLoadModel.readAndSetLoadMetadataDetails()
}
val compactionThread = new Thread {
@@ -274,7 +274,7 @@ object CarbonDataRDDFactory {
loadModel.setTableName(table.getCarbonTableIdentifier.getTableName)
loadModel.setDatabaseName(table.getCarbonTableIdentifier.getDatabaseName)
loadModel.setTablePath(table.getTablePath)
- CommonUtil.readLoadMetadataDetails(loadModel)
+ loadModel.readAndSetLoadMetadataDetails()
val loadStartTime = CarbonUpdateUtil.readCurrentTime()
loadModel.setFactTimeStamp(loadStartTime)
loadModel
http://git-wip-us.apache.org/repos/asf/carbondata/blob/89cfd8e0/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
index 1210b92..231b748 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
@@ -76,7 +76,7 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel,
}
// scan again and determine if anything is there to merge again.
- CommonUtil.readLoadMetadataDetails(carbonLoadModel)
+ carbonLoadModel.readAndSetLoadMetadataDetails()
segList = carbonLoadModel.getLoadMetadataDetails
// in case of major compaction we will scan only once and come out as it will keep
// on doing major for the new loads also.
http://git-wip-us.apache.org/repos/asf/carbondata/blob/89cfd8e0/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
index 7d70534..adf5e04 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
@@ -35,13 +35,13 @@ import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CarbonException
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
import org.apache.carbondata.core.metadata.schema.SchemaEvolutionEntry
import org.apache.carbondata.core.metadata.schema.table.TableInfo
import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
import org.apache.carbondata.spark.CarbonOption
-import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
import org.apache.carbondata.spark.util.CarbonScalaUtil
import org.apache.carbondata.streaming.{CarbonStreamException, StreamSinkFactory}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/89cfd8e0/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
index 0eb7ad5..6508777 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
@@ -23,10 +23,10 @@ import org.apache.spark.sql.execution.command._
import org.apache.spark.sql.execution.command.preaaggregate.CreatePreAggregateTableCommand
import org.apache.spark.sql.execution.command.timeseries.TimeSeriesUtil
+import org.apache.carbondata.common.exceptions.sql.{MalformedCarbonCommandException, MalformedDataMapCommandException}
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.metadata.schema.datamap.DataMapProvider
import org.apache.carbondata.core.metadata.schema.datamap.DataMapProvider._
-import org.apache.carbondata.spark.exception.{MalformedCarbonCommandException, MalformedDataMapCommandException}
/**
* Below command class will be used to create datamap on table
http://git-wip-us.apache.org/repos/asf/carbondata/blob/89cfd8e0/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
index 2675036..dcc71a2 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
@@ -27,6 +27,7 @@ import org.apache.spark.sql.execution.command.AtomicRunnableCommand
import org.apache.spark.sql.execution.command.preaaggregate.PreAggregateUtil
import org.apache.spark.sql.execution.command.table.CarbonDropTableCommand
+import org.apache.carbondata.common.exceptions.sql.{MalformedCarbonCommandException, NoSuchDataMapException}
import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
import org.apache.carbondata.core.datamap.DataMapStoreManager
import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, LockUsage}
@@ -34,7 +35,6 @@ import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.events._
-import org.apache.carbondata.spark.exception.{MalformedCarbonCommandException, NoSuchDataMapException}
/**
* Drops the datamap and any related tables associated with the datamap
http://git-wip-us.apache.org/repos/asf/carbondata/blob/89cfd8e0/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
index acc3358..e47c500 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
@@ -21,18 +21,20 @@ import java.io.{File, IOException}
import scala.collection.JavaConverters._
-import org.apache.spark.sql.{CarbonEnv, Row, SparkSession, SQLContext}
+import org.apache.spark.sql.{CarbonEnv, Row, SQLContext, SparkSession}
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.execution.command.{AlterTableModel, AtomicRunnableCommand, CarbonMergerMapping, CompactionModel, DataCommand}
+import org.apache.spark.sql.execution.command.{AlterTableModel, AtomicRunnableCommand, CarbonMergerMapping, CompactionModel}
import org.apache.spark.sql.hive.{CarbonRelation, CarbonSessionCatalog}
import org.apache.spark.sql.optimizer.CarbonFilters
import org.apache.spark.sql.util.CarbonException
import org.apache.spark.util.AlterTableUtil
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.exception.ConcurrentOperationException
import org.apache.carbondata.core.locks.{CarbonLockFactory, LockUsage}
import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo}
import org.apache.carbondata.core.mutate.CarbonUpdateUtil
@@ -44,7 +46,6 @@ import org.apache.carbondata.events._
import org.apache.carbondata.processing.loading.events.LoadEvents.LoadMetadataEvent
import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
import org.apache.carbondata.processing.merger.{CarbonDataMergerUtil, CompactionType}
-import org.apache.carbondata.spark.exception.{ConcurrentOperationException, MalformedCarbonCommandException}
import org.apache.carbondata.spark.rdd.CarbonDataRDDFactory
import org.apache.carbondata.spark.util.CommonUtil
import org.apache.carbondata.streaming.StreamHandoffRDD
@@ -103,7 +104,6 @@ case class CarbonAlterTableCompactionCommand(
.fireEvent(alterTableCompactionExceptionEvent, operationContext)
compactionException = operationContext.getProperty("compactionException").toString
}
-
if (compactionException.equalsIgnoreCase("true") && null == compactionType) {
throw new MalformedCarbonCommandException(
"Unsupported alter operation on carbon table")
@@ -159,8 +159,7 @@ case class CarbonAlterTableCompactionCommand(
operationContext: OperationContext): Unit = {
val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getName)
val compactionType = CompactionType.valueOf(alterTableModel.compactionType.toUpperCase)
- val compactionSize: Long = CarbonDataMergerUtil
- .getCompactionSize(compactionType, carbonLoadModel)
+ val compactionSize = CarbonDataMergerUtil.getCompactionSize(compactionType, carbonLoadModel)
if (CompactionType.IUD_UPDDEL_DELTA == compactionType) {
if (alterTableModel.segmentUpdateStatusManager.isDefined) {
carbonLoadModel.setSegmentUpdateStatusManager(
@@ -175,7 +174,7 @@ case class CarbonAlterTableCompactionCommand(
val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
if (null == carbonLoadModel.getLoadMetadataDetails) {
- CommonUtil.readLoadMetadataDetails(carbonLoadModel)
+ carbonLoadModel.readAndSetLoadMetadataDetails()
}
if (compactionType == CompactionType.STREAMING) {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/89cfd8e0/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
index a42031d..70134a6 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
@@ -46,6 +46,7 @@ import org.apache.spark.storage.StorageLevel
import org.apache.spark.unsafe.types.UTF8String
import org.apache.spark.util.{CarbonReflectionUtils, CausedBy, FileUtils}
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants}
import org.apache.carbondata.core.datamap.DataMapStoreManager
@@ -70,12 +71,15 @@ import org.apache.carbondata.processing.exception.DataLoadingException
import org.apache.carbondata.processing.loading.TableProcessingOperations
import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadMetadataEvent, LoadTablePostExecutionEvent, LoadTablePreExecutionEvent}
import org.apache.carbondata.processing.loading.exception.NoRetryException
+import org.apache.carbondata.processing.loading.model.{CarbonLoadModel, CarbonLoadModelBuilder, LoadOption}
+import org.apache.carbondata.processing.util.CarbonLoaderUtil
import org.apache.carbondata.processing.loading.model.CarbonLoadModel
import org.apache.carbondata.processing.loading.sort.SortScopeOptions
import org.apache.carbondata.processing.util.{CarbonDataProcessorUtil, CarbonLoaderUtil}
import org.apache.carbondata.spark.dictionary.provider.SecureDictionaryServiceProvider
import org.apache.carbondata.spark.dictionary.server.SecureDictionaryServer
-import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
+import org.apache.carbondata.spark.rdd.{CarbonDataRDDFactory, CarbonDropPartitionRDD}
+import org.apache.carbondata.spark.util.{CarbonScalaUtil, CommonUtil, GlobalDictionaryUtil}
import org.apache.carbondata.spark.load.DataLoadProcessorStepOnSpark
import org.apache.carbondata.spark.rdd.CarbonDataRDDFactory
import org.apache.carbondata.spark.util.{CarbonScalaUtil, DataLoadingUtil, GlobalDictionaryUtil, SparkDataTypeConverterImpl}
@@ -174,7 +178,7 @@ case class CarbonLoadDataCommand(
val carbonLoadModel = new CarbonLoadModel()
try {
val tableProperties = table.getTableInfo.getFactTable.getTableProperties
- val optionsFinal = DataLoadingUtil.getDataLoadingOptions(carbonProperty, options)
+ val optionsFinal = LoadOption.fillOptionWithDefaultValue(options.asJava)
optionsFinal.put("sort_scope", tableProperties.asScala.getOrElse("sort_scope",
carbonProperty.getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE,
carbonProperty.getProperty(CarbonCommonConstants.LOAD_SORT_SCOPE,
@@ -189,10 +193,8 @@ case class CarbonLoadDataCommand(
carbonLoadModel.setAggLoadRequest(
internalOptions.getOrElse(CarbonCommonConstants.IS_INTERNAL_LOAD_CALL, "false").toBoolean)
carbonLoadModel.setSegmentId(internalOptions.getOrElse("mergedSegmentName", ""))
- DataLoadingUtil.buildCarbonLoadModel(
- table,
- carbonProperty,
- options,
+ new CarbonLoadModelBuilder(table).build(
+ options.asJava,
optionsFinal,
carbonLoadModel,
hadoopConf,
@@ -221,7 +223,7 @@ case class CarbonLoadDataCommand(
carbonLoadModel,
factPath,
dataFrame.isDefined,
- optionsFinal.asJava,
+ optionsFinal,
options.asJava,
isOverwriteTable)
operationContext.setProperty("isOverwrite", isOverwriteTable)
@@ -229,6 +231,7 @@ case class CarbonLoadDataCommand(
// First system has to partition the data first and then call the load data
LOGGER.info(s"Initiating Direct Load for the Table : ($dbName.$tableName)")
// Clean up the old invalid segment data before creating a new entry for new load.
+ SegmentStatusManager.deleteLoadsAndUpdateMetadata(table, false)
DataLoadingUtil.deleteLoadsAndUpdateMetadata(
isForceDeletion = false,
table,
http://git-wip-us.apache.org/repos/asf/carbondata/blob/89cfd8e0/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
index a37d6dc..f074285 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
@@ -22,13 +22,13 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.command._
import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.carbondata.common.exceptions.ConcurrentOperationException
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, LockUsage}
import org.apache.carbondata.core.mutate.CarbonUpdateUtil
import org.apache.carbondata.core.statusmanager.SegmentStatusManager
import org.apache.carbondata.events.{DeleteFromTablePostEvent, DeleteFromTablePreEvent, OperationContext, OperationListenerBus}
import org.apache.carbondata.processing.loading.FailureCauses
-import org.apache.carbondata.spark.exception.ConcurrentOperationException
/**
* IUD update delete and compaction framework.
http://git-wip-us.apache.org/repos/asf/carbondata/blob/89cfd8e0/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
index 4886676..5165342 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
@@ -25,6 +25,7 @@ import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.types.ArrayType
import org.apache.spark.storage.StorageLevel
+import org.apache.carbondata.common.exceptions.ConcurrentOperationException
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datamap.Segment
@@ -34,7 +35,6 @@ import org.apache.carbondata.core.statusmanager.SegmentStatusManager
import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.events.{OperationContext, OperationListenerBus, UpdateTablePostEvent, UpdateTablePreEvent}
import org.apache.carbondata.processing.loading.FailureCauses
-import org.apache.carbondata.spark.exception.ConcurrentOperationException
private[sql] case class CarbonProjectForUpdateCommand(
plan: LogicalPlan,
http://git-wip-us.apache.org/repos/asf/carbondata/blob/89cfd8e0/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/IUDCommonUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/IUDCommonUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/IUDCommonUtil.scala
index 5817d88..220d75d 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/IUDCommonUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/IUDCommonUtil.scala
@@ -23,9 +23,9 @@ import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan}
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.hive.HiveSessionCatalog
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
/**
* Util for IUD common function
http://git-wip-us.apache.org/repos/asf/carbondata/blob/89cfd8e0/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
index 4d0a4c5..c836584 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
@@ -35,7 +35,6 @@ import org.apache.carbondata.core.metadata.schema.datamap.DataMapProvider
import org.apache.carbondata.core.metadata.schema.table.AggregationDataMapSchema
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager}
-import org.apache.carbondata.spark.util.DataLoadingUtil
/**
* Below command class will be used to create pre-aggregate table
@@ -179,11 +178,7 @@ case class CreatePreAggregateTableCommand(
// This will be used to check if the parent table has any segments or not. If not then no
// need to fire load for pre-aggregate table. Therefore reading the load details for PARENT
// table.
- DataLoadingUtil.deleteLoadsAndUpdateMetadata(isForceDeletion = false,
- parentTable,
- CarbonFilters.getCurrentPartitions(sparkSession,
- TableIdentifier(parentTable.getTableName,
- Some(parentTable.getDatabaseName))).map(_.asJava).orNull)
+ SegmentStatusManager.deleteLoadsAndUpdateMetadata(parentTable, false)
val loadAvailable = SegmentStatusManager.readLoadMetadata(parentTable.getMetadataPath)
if (loadAvailable.exists(load => load.getSegmentStatus == SegmentStatus.INSERT_IN_PROGRESS ||
load.getSegmentStatus == SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS)) {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/89cfd8e0/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
index ebf87d2..a5d256c 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
@@ -34,6 +34,7 @@ import org.apache.spark.sql.hive.CarbonRelation
import org.apache.spark.sql.parser.CarbonSpark2SqlParser
import org.apache.spark.sql.types.DataType
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, LockUsage}
@@ -42,7 +43,6 @@ import org.apache.carbondata.core.metadata.schema.table.{AggregationDataMapSchem
import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
import org.apache.carbondata.core.util.CarbonUtil
import org.apache.carbondata.format.TableInfo
-import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
import org.apache.carbondata.spark.util.CommonUtil
/**
http://git-wip-us.apache.org/repos/asf/carbondata/blob/89cfd8e0/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
index 9fde675..cf77e0f 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
@@ -25,6 +25,8 @@ import org.apache.spark.sql.execution.command.{AlterTableRenameModel, MetadataCo
import org.apache.spark.sql.hive.{CarbonRelation, CarbonSessionCatalog}
import org.apache.spark.util.AlterTableUtil
+import org.apache.carbondata.common.exceptions.ConcurrentOperationException
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datamap.DataMapStoreManager
@@ -37,7 +39,6 @@ import org.apache.carbondata.core.util.CarbonUtil
import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.events.{AlterTableRenamePostEvent, AlterTableRenamePreEvent, OperationContext, OperationListenerBus}
import org.apache.carbondata.format.SchemaEvolutionEntry
-import org.apache.carbondata.spark.exception.{ConcurrentOperationException, MalformedCarbonCommandException}
private[sql] case class CarbonAlterTableRenameCommand(
alterTableRenameModel: AlterTableRenameModel)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/89cfd8e0/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeSeriesUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeSeriesUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeSeriesUtil.scala
index 45767da..2b35416 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeSeriesUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeSeriesUtil.scala
@@ -18,12 +18,12 @@ package org.apache.spark.sql.execution.command.timeseries
import org.apache.spark.sql.execution.command.{DataMapField, Field}
+import org.apache.carbondata.common.exceptions.sql.{MalformedCarbonCommandException, MalformedDataMapCommandException}
import org.apache.carbondata.core.metadata.datatype.DataTypes
import org.apache.carbondata.core.metadata.schema.datamap.DataMapProvider.TIMESERIES
import org.apache.carbondata.core.metadata.schema.datamap.Granularity
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.preagg.TimeSeriesUDF
-import org.apache.carbondata.spark.exception.{MalformedCarbonCommandException, MalformedDataMapCommandException}
/**
* Utility class for time series to keep
http://git-wip-us.apache.org/repos/asf/carbondata/blob/89cfd8e0/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala
index b4d3bea..61a31a5 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala
@@ -50,6 +50,9 @@ import org.apache.carbondata.hadoop.api.{CarbonOutputCommitter, CarbonTableOutpu
import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat.CarbonRecordWriter
import org.apache.carbondata.hadoop.internal.ObjectArrayWritable
import org.apache.carbondata.hadoop.util.ObjectSerializationUtil
+import org.apache.carbondata.processing.loading.csvinput.StringArrayWritable
+import org.apache.carbondata.processing.loading.model.{CarbonLoadModel, CarbonLoadModelBuilder, LoadOption}
+import org.apache.carbondata.spark.util.{CarbonScalaUtil, Util}
import org.apache.carbondata.processing.loading.model.CarbonLoadModel
import org.apache.carbondata.spark.util.{CarbonScalaUtil, DataLoadingUtil, Util}
@@ -87,7 +90,7 @@ with Serializable {
TableIdentifier(options("tableName"), options.get("dbName")))(sparkSession)
val model = new CarbonLoadModel
val carbonProperty = CarbonProperties.getInstance()
- val optionsFinal = DataLoadingUtil.getDataLoadingOptions(carbonProperty, options)
+ val optionsFinal = LoadOption.fillOptionWithDefaultValue(options.asJava)
val tableProperties = table.getTableInfo.getFactTable.getTableProperties
optionsFinal.put("sort_scope", tableProperties.asScala.getOrElse("sort_scope",
carbonProperty.getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_SORT_SCOPE,
@@ -102,14 +105,11 @@ with Serializable {
val optionsLocal = new mutable.HashMap[String, String]()
optionsLocal ++= options
optionsLocal += (("header", "false"))
- DataLoadingUtil.buildCarbonLoadModel(
- table,
- carbonProperty,
- optionsLocal.toMap,
+ new CarbonLoadModelBuilder(table).build(
+ optionsLocal.toMap.asJava,
optionsFinal,
model,
- conf
- )
+ conf)
model.setUseOnePass(options.getOrElse("onepass", "false").toBoolean)
model.setDictionaryServerHost(options.getOrElse("dicthost", null))
model.setDictionaryServerPort(options.getOrElse("dictport", "-1").toInt)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/89cfd8e0/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
index dcbce84..ec20ec2 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
@@ -32,10 +32,10 @@ import org.apache.spark.sql.execution.datasources.RefreshTable
import org.apache.spark.sql.hive.CarbonRelation
import org.apache.spark.util.{CarbonReflectionUtils, FileUtils}
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.processing.merger.CompactionType
-import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
/**
* Carbon strategies for ddl commands
http://git-wip-us.apache.org/repos/asf/carbondata/blob/89cfd8e0/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/StreamingTableStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/StreamingTableStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/StreamingTableStrategy.scala
index 608ec60..7028dcf 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/StreamingTableStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/StreamingTableStrategy.scala
@@ -25,7 +25,7 @@ import org.apache.spark.sql.execution.command.AlterTableRenameCommand
import org.apache.spark.sql.execution.command.mutation.{CarbonProjectForDeleteCommand, CarbonProjectForUpdateCommand}
import org.apache.spark.sql.execution.command.schema.{CarbonAlterTableAddColumnCommand, CarbonAlterTableDataTypeChangeCommand, CarbonAlterTableDropColumnCommand}
-import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
/**
* Strategy for streaming table, like blocking unsupported operation
http://git-wip-us.apache.org/repos/asf/carbondata/blob/89cfd8e0/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala
index 7ca34af..27c7d17 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala
@@ -24,9 +24,9 @@ import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.execution.command._
import org.apache.spark.sql.execution.command.table.CarbonDropTableCommand
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, SessionParams}
-import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
case class CarbonDropDatabaseCommand(command: DropDatabaseCommand)
extends RunnableCommand {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/89cfd8e0/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
index 3896ce9..2905b60 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
@@ -36,9 +36,9 @@ import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.util.CarbonException
import org.apache.spark.util.CarbonReflectionUtils
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.spark.CarbonOption
-import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
import org.apache.carbondata.spark.util.{CarbonScalaUtil, CommonUtil}
/**
http://git-wip-us.apache.org/repos/asf/carbondata/blob/89cfd8e0/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
index ad6d0c7..ef4836e 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
@@ -32,10 +32,10 @@ import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.util.CarbonException
import org.apache.spark.util.CarbonReflectionUtils
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
import org.apache.carbondata.hadoop.util.SchemaReader
import org.apache.carbondata.spark.CarbonOption
-import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
import org.apache.carbondata.spark.util.CommonUtil
/**
http://git-wip-us.apache.org/repos/asf/carbondata/blob/89cfd8e0/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
index bc36e9c..aaa87a3 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
@@ -28,6 +28,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.hive.{CarbonRelation, CarbonSessionCatalog, HiveExternalCatalog}
import org.apache.spark.sql.hive.HiveExternalCatalog._
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datastore.impl.FileFactory
@@ -39,7 +40,6 @@ import org.apache.carbondata.core.util.CarbonUtil
import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.core.util.path.CarbonTablePath.getNewTablePath
import org.apache.carbondata.format.{SchemaEvolutionEntry, TableInfo}
-import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
object AlterTableUtil {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/89cfd8e0/integration/spark2/src/main/scala/org/apache/spark/util/TableAPIUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/TableAPIUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/util/TableAPIUtil.scala
index bc62902..a8094b6 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/TableAPIUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/TableAPIUtil.scala
@@ -19,10 +19,10 @@ package org.apache.spark.util
import org.apache.spark.sql.{CarbonEnv, SparkSession}
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
/**
* table api util
http://git-wip-us.apache.org/repos/asf/carbondata/blob/89cfd8e0/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionState.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionState.scala b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionState.scala
index e82b485..f033a8e 100644
--- a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionState.scala
+++ b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionState.scala
@@ -16,9 +16,6 @@
*/
package org.apache.spark.sql.hive
-
-import scala.collection.generic.SeqFactory
-
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.spark.sql._
@@ -46,10 +43,8 @@ import org.apache.spark.sql.types.DecimalType
import org.apache.spark.util.CarbonReflectionUtils
import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.datamap.DataMapStoreManager
-import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
import org.apache.carbondata.spark.util.CarbonScalaUtil
/**
http://git-wip-us.apache.org/repos/asf/carbondata/blob/89cfd8e0/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/segmentreading/TestSegmentReading.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/segmentreading/TestSegmentReading.scala b/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/segmentreading/TestSegmentReading.scala
index c676b01..5ade510 100644
--- a/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/segmentreading/TestSegmentReading.scala
+++ b/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/segmentreading/TestSegmentReading.scala
@@ -4,7 +4,7 @@ import org.apache.spark.sql.Row
import org.apache.spark.sql.test.util.QueryTest
import org.scalatest.BeforeAndAfterAll
-import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
/**
* Created by rahul on 19/9/17.
http://git-wip-us.apache.org/repos/asf/carbondata/blob/89cfd8e0/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala
index 1d41ddc..3298009 100644
--- a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala
+++ b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala
@@ -25,7 +25,7 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.core.util.path.CarbonTablePath
-import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
+import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel, CarbonLoadModelBuilder, LoadOption}
import org.apache.carbondata.processing.util.TableOptionConstant
/**
@@ -63,7 +63,7 @@ class AllDictionaryTestCase extends Spark2QueryTest with BeforeAndAfterAll {
CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT))
carbonLoadModel.setCsvHeaderColumns(
- CommonUtil.getCsvHeaderColumns(carbonLoadModel, FileFactory.getConfiguration))
+ LoadOption.getCsvHeaderColumns(carbonLoadModel, FileFactory.getConfiguration))
// Create table and metadata folders if not exist
val metadataDirectoryPath = CarbonTablePath.getMetadataPath(table.getTablePath)
val fileType = FileFactory.getFileType(metadataDirectoryPath)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/89cfd8e0/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala
index 7ca0b56..43d8c03 100644
--- a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala
+++ b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala
@@ -23,14 +23,14 @@ import org.apache.spark.sql.hive.CarbonRelation
import org.apache.spark.sql.{CarbonEnv, SparkSession}
import org.scalatest.BeforeAndAfterAll
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.processing.exception.DataLoadingException
-import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
+import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel, CarbonLoadModelBuilder, LoadOption}
import org.apache.carbondata.processing.util.TableOptionConstant
-import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
/**
* test case for external column dictionary generation
@@ -176,7 +176,7 @@ class ExternalColumnDictionaryTestCase extends Spark2QueryTest with BeforeAndAft
CarbonCommonConstants.CARBON_DATE_FORMAT,
CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT))
carbonLoadModel.setCsvHeaderColumns(
- CommonUtil.getCsvHeaderColumns(carbonLoadModel, FileFactory.getConfiguration))
+ LoadOption.getCsvHeaderColumns(carbonLoadModel, FileFactory.getConfiguration))
carbonLoadModel.setMaxColumns("100")
// Create table and metadata folders if not exist
val metadataDirectoryPath = CarbonTablePath.getMetadataPath(table.getTablePath)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/89cfd8e0/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
index 7dc6275..3b599fc 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
@@ -32,14 +32,12 @@ import org.apache.spark.sql.streaming.{ProcessingTime, StreamingQuery}
import org.apache.spark.sql.test.util.QueryTest
import org.scalatest.BeforeAndAfterAll
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.statusmanager.{FileFormat, SegmentStatus}
import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.core.util.path.CarbonTablePath
-import org.apache.carbondata.spark.exception.{MalformedCarbonCommandException, ProcessMetaDataException}
-import org.apache.carbondata.core.util.path.CarbonTablePath
-import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
import org.apache.carbondata.spark.exception.ProcessMetaDataException
class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/89cfd8e0/integration/spark2/src/test/scala/org/apache/spark/carbondata/bucketing/TableBucketingTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/bucketing/TableBucketingTestCase.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/bucketing/TableBucketingTestCase.scala
index 9da7244..65a006b 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/bucketing/TableBucketingTestCase.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/bucketing/TableBucketingTestCase.scala
@@ -21,11 +21,11 @@ import org.apache.spark.sql.common.util.Spark2QueryTest
import org.apache.spark.sql.execution.exchange.ShuffleExchange
import org.scalatest.BeforeAndAfterAll
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
import org.apache.carbondata.core.metadata.CarbonMetadata
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
class TableBucketingTestCase extends Spark2QueryTest with BeforeAndAfterAll {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/89cfd8e0/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/AddColumnTestCases.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/AddColumnTestCases.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/AddColumnTestCases.scala
index ac10b9a..995f041 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/AddColumnTestCases.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/AddColumnTestCases.scala
@@ -26,9 +26,9 @@ import org.apache.spark.sql.common.util.Spark2QueryTest
import org.apache.spark.sql.test.TestQueryExecutor
import org.scalatest.BeforeAndAfterAll
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.spark.exception.{MalformedCarbonCommandException, ProcessMetaDataException}
class AddColumnTestCases extends Spark2QueryTest with BeforeAndAfterAll {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/89cfd8e0/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
index 24b25b9..bf1dfd1 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
@@ -18,14 +18,16 @@
package org.apache.carbondata.processing.loading.model;
import java.io.Serializable;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import org.apache.carbondata.core.dictionary.service.DictionaryServiceProvider;
import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager;
-
+import org.apache.carbondata.core.util.path.CarbonTablePath;
public class CarbonLoadModel implements Serializable {
@@ -797,6 +799,7 @@ public class CarbonLoadModel implements Serializable {
this.skipEmptyLine = skipEmptyLine;
}
+
public boolean isPartitionLoad() {
return isPartitionLoad;
}
@@ -812,4 +815,13 @@ public class CarbonLoadModel implements Serializable {
public void setDataWritePath(String dataWritePath) {
this.dataWritePath = dataWritePath;
}
+
+ /**
+ * Read segments metadata from table status file and set it to this load model object
+ */
+ public void readAndSetLoadMetadataDetails() {
+ String metadataPath = CarbonTablePath.getMetadataPath(tablePath);
+ LoadMetadataDetails[] details = SegmentStatusManager.readLoadMetadata(metadataPath);
+ setLoadMetadataDetails(Arrays.asList(details));
+ }
}