Posted to commits@carbondata.apache.org by ra...@apache.org on 2016/11/30 07:51:41 UTC
[03/14] incubator-carbondata git commit: rebase
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
deleted file mode 100644
index 9a4e209..0000000
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
+++ /dev/null
@@ -1,875 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.spark.util
-
-import java.io.{FileNotFoundException, IOException}
-import java.nio.charset.Charset
-import java.util.regex.Pattern
-
-import scala.collection.JavaConverters._
-import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
-import scala.language.implicitConversions
-import scala.util.control.Breaks.{break, breakable}
-
-import org.apache.commons.lang3.{ArrayUtils, StringUtils}
-import org.apache.spark.Accumulator
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql._
-import org.apache.spark.sql.hive.CarbonMetastoreCatalog
-import org.apache.spark.util.FileUtils
-
-import org.apache.carbondata.common.factory.CarbonCommonFactory
-import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.cache.dictionary.Dictionary
-import org.apache.carbondata.core.carbon.{CarbonDataLoadSchema, CarbonTableIdentifier}
-import org.apache.carbondata.core.carbon.metadata.datatype.DataType
-import org.apache.carbondata.core.carbon.metadata.encoder.Encoding
-import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonDimension
-import org.apache.carbondata.core.carbon.path.CarbonStorePath
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.datastorage.store.impl.FileFactory
-import org.apache.carbondata.core.reader.CarbonDictionaryReader
-import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
-import org.apache.carbondata.core.writer.CarbonDictionaryWriter
-import org.apache.carbondata.processing.etl.DataLoadingException
-import org.apache.carbondata.processing.model.CarbonLoadModel
-import org.apache.carbondata.spark.CarbonSparkFactory
-import org.apache.carbondata.spark.load.CarbonLoaderUtil
-import org.apache.carbondata.spark.rdd._
-
-/**
- * An object that provides methods to generate a global dictionary from CSV files.
- */
-object GlobalDictionaryUtil {
- private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
-
- /**
- * The default separator to use if none is supplied in the load options.
- */
- val DEFAULT_SEPARATOR: Char = ','
- /**
- * The default quote character to use if none is supplied in the load options.
- */
- val DEFAULT_QUOTE_CHARACTER: Char = '"'
-
- /**
- * find the columns for which a global dictionary needs to be generated.
- *
- * @param dimensions dimension list of schema
- * @param headers column headers
- * @param columns column list of csv file
- */
- def pruneDimensions(dimensions: Array[CarbonDimension],
- headers: Array[String],
- columns: Array[String]): (Array[CarbonDimension], Array[String]) = {
- val dimensionBuffer = new ArrayBuffer[CarbonDimension]
- val columnNameBuffer = new ArrayBuffer[String]
- val dimensionsWithDict = dimensions.filter(hasEncoding(_, Encoding.DICTIONARY,
- Encoding.DIRECT_DICTIONARY))
- dimensionsWithDict.foreach { dim =>
- breakable {
- headers.zipWithIndex.foreach { h =>
- if (dim.getColName.equalsIgnoreCase(h._1)) {
- dimensionBuffer += dim
- columnNameBuffer += columns(h._2)
- break
- }
- }
- }
- }
- (dimensionBuffer.toArray, columnNameBuffer.toArray)
- }
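
To make the matching rule concrete, here is a pure-Scala sketch of the same case-insensitive header lookup, with hypothetical dimension and column names (CarbonDimension is elided; plain strings stand in for it):

    // Dictionary dimensions are matched to CSV columns by case-insensitive
    // header name; unmatched dimensions are simply dropped.
    val dictDimensionNames = Array("country", "city")        // hypothetical
    val headers            = Array("ID", "Country", "City")  // hypothetical
    val csvColumns         = Array("_c0", "_c1", "_c2")      // hypothetical
    val matched = dictDimensionNames.flatMap { dim =>
      headers.zipWithIndex
        .find { case (h, _) => h.equalsIgnoreCase(dim) }
        .map { case (_, i) => (dim, csvColumns(i)) }
    }
    // matched == Array(("country", "_c1"), ("city", "_c2"))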
-
- /**
- * check whether a CarbonDimension uses the given encoding; for complex
- * dimensions, any child using it counts
- *
- * @param dimension carbonDimension
- * @param encoding the encoding to check for
- * @param excludeEncoding the encoding to exclude, or null
- */
- def hasEncoding(dimension: CarbonDimension,
- encoding: Encoding,
- excludeEncoding: Encoding): Boolean = {
- if (dimension.isComplex()) {
- val children = dimension.getListOfChildDimensions
- children.asScala.exists(hasEncoding(_, encoding, excludeEncoding))
- } else {
- dimension.hasEncoding(encoding) &&
- (excludeEncoding == null || !dimension.hasEncoding(excludeEncoding))
- }
- }
-
- def gatherDimensionByEncoding(carbonLoadModel: CarbonLoadModel,
- dimension: CarbonDimension,
- encoding: Encoding,
- excludeEncoding: Encoding,
- dimensionsWithEncoding: ArrayBuffer[CarbonDimension],
- forPreDefDict: Boolean) {
- if (dimension.isComplex) {
- val children = dimension.getListOfChildDimensions.asScala
- children.foreach { c =>
- gatherDimensionByEncoding(carbonLoadModel, c, encoding, excludeEncoding,
- dimensionsWithEncoding, forPreDefDict)
- }
- } else {
- if (dimension.hasEncoding(encoding) &&
- (excludeEncoding == null || !dimension.hasEncoding(excludeEncoding))) {
- if ((forPreDefDict && carbonLoadModel.getPredefDictFilePath(dimension) != null) ||
- (!forPreDefDict && carbonLoadModel.getPredefDictFilePath(dimension) == null)) {
- dimensionsWithEncoding += dimension
- }
- }
- }
- }
-
- def getPrimDimensionWithDict(carbonLoadModel: CarbonLoadModel,
- dimension: CarbonDimension,
- forPreDefDict: Boolean): Array[CarbonDimension] = {
- val dimensionsWithDict = new ArrayBuffer[CarbonDimension]
- gatherDimensionByEncoding(carbonLoadModel, dimension, Encoding.DICTIONARY,
- Encoding.DIRECT_DICTIONARY,
- dimensionsWithDict, forPreDefDict)
- dimensionsWithDict.toArray
- }
-
- /**
- * invoke CarbonDictionaryWriter to write dictionary to file.
- *
- * @param model instance of DictionaryLoadModel
- * @param columnIndex the index of current column in column list
- * @param iter distinct value list of dictionary
- */
- def writeGlobalDictionaryToFile(model: DictionaryLoadModel,
- columnIndex: Int,
- iter: Iterator[String]): Unit = {
- val dictService = CarbonCommonFactory.getDictionaryService
- val writer: CarbonDictionaryWriter = dictService.getDictionaryWriter(
- model.table,
- model.columnIdentifier(columnIndex),
- model.hdfsLocation
- )
- try {
- while (iter.hasNext) {
- writer.write(iter.next)
- }
- } finally {
- writer.close()
- }
- }
-
- /**
- * read global dictionary from cache
- */
- def readGlobalDictionaryFromCache(model: DictionaryLoadModel): HashMap[String, Dictionary] = {
- val dictMap = new HashMap[String, Dictionary]
- model.primDimensions.zipWithIndex.filter(f => model.dictFileExists(f._2)).foreach { m =>
- val dict = CarbonLoaderUtil.getDictionary(model.table,
- m._1.getColumnIdentifier, model.hdfsLocation,
- m._1.getDataType
- )
- dictMap.put(m._1.getColumnId, dict)
- }
- dictMap
- }
-
- /**
- * invoke CarbonDictionaryReader to read dictionary from files.
- *
- * @param model carbon dictionary load model
- */
- def readGlobalDictionaryFromFile(model: DictionaryLoadModel): HashMap[String, HashSet[String]] = {
- val dictMap = new HashMap[String, HashSet[String]]
- val dictService = CarbonCommonFactory.getDictionaryService
- for (i <- model.primDimensions.indices) {
- val set = new HashSet[String]
- if (model.dictFileExists(i)) {
- val reader: CarbonDictionaryReader = dictService.getDictionaryReader(model.table,
- model.columnIdentifier(i), model.hdfsLocation
- )
- val values = reader.read
- if (values != null) {
- for (j <- 0 until values.size) {
- set.add(new String(values.get(j),
- Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET)))
- }
- }
- }
- dictMap.put(model.primDimensions(i).getColumnId, set)
- }
- dictMap
- }
-
- def generateParserForChildrenDimension(dim: CarbonDimension,
- format: DataFormat,
- mapColumnValuesWithId:
- HashMap[String, HashSet[String]],
- generic: GenericParser): Unit = {
- val children = dim.getListOfChildDimensions.asScala
- for (i <- children.indices) {
- generateParserForDimension(Some(children(i)), format.cloneAndIncreaseIndex,
- mapColumnValuesWithId) match {
- case Some(childDim) =>
- generic.addChild(childDim)
- case None =>
- }
- }
- }
-
- def generateParserForDimension(dimension: Option[CarbonDimension],
- format: DataFormat,
- mapColumnValuesWithId: HashMap[String, HashSet[String]]): Option[GenericParser] = {
- dimension match {
- case None =>
- None
- case Some(dim) =>
- dim.getDataType match {
- case DataType.ARRAY =>
- val arrDim = ArrayParser(dim, format)
- generateParserForChildrenDimension(dim, format, mapColumnValuesWithId, arrDim)
- Some(arrDim)
- case DataType.STRUCT =>
- val stuDim = StructParser(dim, format)
- generateParserForChildrenDimension(dim, format, mapColumnValuesWithId, stuDim)
- Some(stuDim)
- case _ =>
- Some(PrimitiveParser(dim, mapColumnValuesWithId.get(dim.getColumnId)))
- }
- }
- }
-
- def createDataFormat(delimiters: Array[String]): DataFormat = {
- if (ArrayUtils.isNotEmpty(delimiters)) {
- val patterns = delimiters.map { d =>
- Pattern.compile(if (d == null) {
- ""
- } else {
- d
- })
- }
- DataFormat(delimiters, 0, patterns)
- } else {
- null
- }
- }
-
- def isHighCardinalityColumn(columnCardinality: Int,
- rowCount: Long,
- model: DictionaryLoadModel): Boolean = {
- (columnCardinality > model.highCardThreshold) &&
- (rowCount > 0) &&
- (columnCardinality.toDouble / rowCount * 100 > model.rowCountPercentage)
- }
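
A quick worked example of this heuristic, with illustrative thresholds (not Carbon's defaults): take highCardThreshold = 10000 and rowCountPercentage = 80.0.

    // Standalone sketch of the check above with hypothetical thresholds.
    def isHighCard(cardinality: Int, rowCount: Long,
        threshold: Int = 10000, percentage: Double = 80.0): Boolean = {
      cardinality > threshold &&
      rowCount > 0 &&
      cardinality.toDouble / rowCount * 100 > percentage
    }
    isHighCard(900000, 1000000L)  // true: 90% of rows are distinct
    isHighCard(500,    1000000L)  // false: below the absolute threshold
    isHighCard(20000,  1000000L)  // false: only 2% of rows are distinct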
-
- /**
- * create an instance of DictionaryLoadModel
- *
- * @param carbonLoadModel carbon load model
- * @param table CarbonTableIdentifier
- * @param dimensions column list
- * @param hdfsLocation store location in HDFS
- * @param dictfolderPath path of dictionary folder
- */
- def createDictionaryLoadModel(carbonLoadModel: CarbonLoadModel,
- table: CarbonTableIdentifier,
- dimensions: Array[CarbonDimension],
- hdfsLocation: String,
- dictfolderPath: String,
- forPreDefDict: Boolean): DictionaryLoadModel = {
- val primDimensionsBuffer = new ArrayBuffer[CarbonDimension]
- val isComplexes = new ArrayBuffer[Boolean]
- for (i <- dimensions.indices) {
- val dims = getPrimDimensionWithDict(carbonLoadModel, dimensions(i), forPreDefDict)
- for (j <- dims.indices) {
- primDimensionsBuffer += dims(j)
- isComplexes += dimensions(i).isComplex
- }
- }
- val carbonTablePath = CarbonStorePath.getCarbonTablePath(hdfsLocation, table)
- val primDimensions = primDimensionsBuffer.map { x => x }.toArray
- val dictDetail = CarbonSparkFactory.getDictionaryDetailService().
- getDictionaryDetail(dictfolderPath, primDimensions, table, hdfsLocation)
- val dictFilePaths = dictDetail.dictFilePaths
- val dictFileExists = dictDetail.dictFileExists
- val columnIdentifier = dictDetail.columnIdentifiers
- val hdfsTempLocation = CarbonProperties.getInstance.
- getProperty(CarbonCommonConstants.HDFS_TEMP_LOCATION, System.getProperty("java.io.tmpdir"))
- val lockType = CarbonProperties.getInstance
- .getProperty(CarbonCommonConstants.LOCK_TYPE, CarbonCommonConstants.CARBON_LOCK_TYPE_HDFS)
- val zookeeperUrl = CarbonProperties.getInstance.getProperty(CarbonCommonConstants.ZOOKEEPER_URL)
- // load the high-cardinality identification configuration
- val highCardIdentifyEnable = CarbonProperties.getInstance().getProperty(
- CarbonCommonConstants.HIGH_CARDINALITY_IDENTIFY_ENABLE,
- CarbonCommonConstants.HIGH_CARDINALITY_IDENTIFY_ENABLE_DEFAULT).toBoolean
- val highCardThreshold = CarbonProperties.getInstance().getProperty(
- CarbonCommonConstants.HIGH_CARDINALITY_THRESHOLD,
- CarbonCommonConstants.HIGH_CARDINALITY_THRESHOLD_DEFAULT).toInt
- val rowCountPercentage = CarbonProperties.getInstance().getProperty(
- CarbonCommonConstants.HIGH_CARDINALITY_IN_ROW_COUNT_PERCENTAGE,
- CarbonCommonConstants.HIGH_CARDINALITY_IN_ROW_COUNT_PERCENTAGE_DEFAULT).toDouble
-
- val serializationNullFormat =
- carbonLoadModel.getSerializationNullFormat.split(CarbonCommonConstants.COMMA, 2)(1)
- // read the load metadata details if not already available
- if (null == carbonLoadModel.getLoadMetadataDetails) {
- CarbonDataRDDFactory.readLoadMetadataDetails(carbonLoadModel, hdfsLocation)
- }
- new DictionaryLoadModel(table,
- dimensions,
- hdfsLocation,
- dictfolderPath,
- dictFilePaths,
- dictFileExists,
- isComplexes.toArray,
- primDimensions,
- carbonLoadModel.getDelimiters,
- highCardIdentifyEnable,
- highCardThreshold,
- rowCountPercentage,
- columnIdentifier,
- carbonLoadModel.getLoadMetadataDetails.size() == 0,
- hdfsTempLocation,
- lockType,
- zookeeperUrl,
- serializationNullFormat)
- }
-
- /**
- * load CSV files into a DataFrame using the "com.databricks.spark.csv" datasource
- *
- * @param sqlContext SQLContext
- * @param carbonLoadModel carbon data load model
- */
- def loadDataFrame(sqlContext: SQLContext,
- carbonLoadModel: CarbonLoadModel): DataFrame = {
- val df = sqlContext.read
- .format("com.databricks.spark.csv.newapi")
- .option("header", {
- if (StringUtils.isEmpty(carbonLoadModel.getCsvHeader)) {
- "true"
- } else {
- "false"
- }
- })
- .option("delimiter", {
- if (StringUtils.isEmpty(carbonLoadModel.getCsvDelimiter)) {
- "" + DEFAULT_SEPARATOR
- } else {
- carbonLoadModel.getCsvDelimiter
- }
- })
- .option("parserLib", "univocity")
- .option("escape", carbonLoadModel.getEscapeChar)
- .option("ignoreLeadingWhiteSpace", "false")
- .option("ignoreTrailingWhiteSpace", "false")
- .option("codec", "gzip")
- .option("quote", {
- if (StringUtils.isEmpty(carbonLoadModel.getQuoteChar)) {
- "" + DEFAULT_QUOTE_CHARACTER
- } else {
- carbonLoadModel.getQuoteChar
- }
- })
- .option("comment", carbonLoadModel.getCommentChar)
- .load(carbonLoadModel.getFactFilePath)
- df
- }
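
For comparison, a minimal standalone read with the same defaults, against the stock spark-csv package rather than Carbon's forked "newapi" variant (path and options illustrative):

    import org.apache.spark.sql.{DataFrame, SQLContext}

    // Hypothetical standalone equivalent: comma delimiter, double-quote
    // character and the univocity parser, as defaulted above.
    def readCsv(sqlContext: SQLContext, path: String): DataFrame = {
      sqlContext.read
        .format("com.databricks.spark.csv")
        .option("header", "true")
        .option("delimiter", ",")
        .option("quote", "\"")
        .option("parserLib", "univocity")
        .load(path)
    }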
-
- private def updateTableMetadata(carbonLoadModel: CarbonLoadModel,
- sqlContext: SQLContext,
- model: DictionaryLoadModel,
- noDictDimension: Array[CarbonDimension]): Unit = {
-
- val carbonTablePath = CarbonStorePath.getCarbonTablePath(model.hdfsLocation,
- model.table)
- val schemaFilePath = carbonTablePath.getSchemaFilePath
-
- // read TableInfo
- val tableInfo = CarbonMetastoreCatalog.readSchemaFileToThriftTable(schemaFilePath)
-
- // modify TableInfo
- val columns = tableInfo.getFact_table.getTable_columns
- for (i <- 0 until columns.size) {
- if (noDictDimension.exists(x => columns.get(i).getColumn_id.equals(x.getColumnId))) {
- columns.get(i).encoders.remove(org.apache.carbondata.format.Encoding.DICTIONARY)
- }
- }
-
- // write TableInfo
- CarbonMetastoreCatalog.writeThriftTableToSchemaFile(schemaFilePath, tableInfo)
-
- // update Metadata
- val catalog = CarbonEnv.getInstance(sqlContext).carbonCatalog
- catalog.updateMetadataByThriftTable(schemaFilePath, tableInfo,
- model.table.getDatabaseName, model.table.getTableName, carbonLoadModel.getStorePath)
-
- // update CarbonDataLoadSchema
- val carbonTable = catalog.lookupRelation1(Option(model.table.getDatabaseName),
- model.table.getTableName)(sqlContext).asInstanceOf[CarbonRelation].tableMeta.carbonTable
- carbonLoadModel.setCarbonDataLoadSchema(new CarbonDataLoadSchema(carbonTable))
-
- }
-
- /**
- * check whether the global dictionary has been generated successfully
- *
- * @param status per-column result: (column index, load status, whether the
- * column was identified as high cardinality)
- */
- private def checkStatus(carbonLoadModel: CarbonLoadModel,
- sqlContext: SQLContext,
- model: DictionaryLoadModel,
- status: Array[(Int, String, Boolean)]) = {
- var result = false
- val noDictionaryColumns = new ArrayBuffer[CarbonDimension]
- val tableName = model.table.getTableName
- status.foreach { x =>
- val columnName = model.primDimensions(x._1).getColName
- if (CarbonCommonConstants.STORE_LOADSTATUS_FAILURE.equals(x._2)) {
- result = true
- LOGGER.error(s"table:$tableName column:$columnName generate global dictionary file failed")
- }
- if (x._3) {
- noDictionaryColumns += model.primDimensions(x._1)
- }
- }
- if (noDictionaryColumns.nonEmpty) {
- updateTableMetadata(carbonLoadModel, sqlContext, model, noDictionaryColumns.toArray)
- }
- if (result) {
- LOGGER.error("generate global dictionary files failed")
- throw new Exception("Failed to generate global dictionary files")
- } else {
- LOGGER.info("generate global dictionary successfully")
- }
- }
-
- /**
- * get external columns and their dictionary file paths
- *
- * @param colDictFilePath external column dict file path
- * @param table table identifier
- * @param dimensions dimension columns
- */
- private def setPredefinedColumnDictPath(carbonLoadModel: CarbonLoadModel,
- colDictFilePath: String,
- table: CarbonTableIdentifier,
- dimensions: Array[CarbonDimension]) = {
- val colFileMapArray = colDictFilePath.split(",")
- for (colPathMap <- colFileMapArray) {
- val colPathMapTrim = colPathMap.trim
- val colNameWithPath = colPathMapTrim.split(":")
- if (colNameWithPath.length == 1) {
- LOGGER.error("the format of external column dictionary should be " +
- "columnName:columnPath, please check")
- throw new DataLoadingException("the format of predefined column dictionary" +
- " should be columnName:columnPath, please check")
- }
- setPredefineDict(carbonLoadModel, dimensions, table, colNameWithPath(0),
- FileUtils.getPaths(colPathMapTrim.substring(colNameWithPath(0).length + 1)))
- }
- }
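
The COLUMNDICT value parsed here is a comma-separated list of columnName:columnPath pairs; only the first colon of each entry separates name from path, so scheme-qualified paths survive intact. A pure-Scala sketch with hypothetical input:

    val colDictFilePath = "country:/dicts/country.csv,city:hdfs://nn/dicts/city.csv"
    val pairs = colDictFilePath.split(",").map(_.trim).map { entry =>
      val name = entry.split(":")(0)
      (name, entry.substring(name.length + 1))  // keep any later ':' in the path
    }
    // pairs == Array(("country", "/dicts/country.csv"),
    //                ("city", "hdfs://nn/dicts/city.csv"))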
-
- /**
- * set a predefined dictionary for a dimension
- *
- * @param dimensions all the dimensions
- * @param table carbon table identifier
- * @param colName user specified column name for predefined dict
- * @param colDictPath column dictionary file path
- * @param parentDimName parent dimension for complex types
- */
- def setPredefineDict(carbonLoadModel: CarbonLoadModel,
- dimensions: Array[CarbonDimension],
- table: CarbonTableIdentifier,
- colName: String,
- colDictPath: String,
- parentDimName: String = "") {
- val middleDimName = colName.split("\\.")(0)
- val dimParent = parentDimName + {
- colName match {
- case "" => colName
- case _ =>
- if (parentDimName.isEmpty) middleDimName else "." + middleDimName
- }
- }
- // check whether the column exists
- val preDictDimensionOption = dimensions.filter(
- _.getColName.equalsIgnoreCase(dimParent))
- if (preDictDimensionOption.length == 0) {
- LOGGER.error(s"Column $dimParent is not a key column " +
- s"in ${ table.getDatabaseName }.${ table.getTableName }")
- throw new DataLoadingException(s"Column $dimParent is not a key column. " +
- s"Only key column can be part of dictionary " +
- s"and used in COLUMNDICT option.")
- }
- val preDictDimension = preDictDimensionOption(0)
- if (preDictDimension.isComplex) {
- val children = preDictDimension.getListOfChildDimensions.asScala.toArray
- // for Array, user sets ArrayField: path, while ArrayField has a child Array.val
- val currentColName = {
- preDictDimension.getDataType match {
- case DataType.ARRAY =>
- if (children(0).isComplex) {
- "val." + colName.substring(middleDimName.length + 1)
- } else {
- "val"
- }
- case _ => colName.substring(middleDimName.length + 1)
- }
- }
- setPredefineDict(carbonLoadModel, children, table, currentColName,
- colDictPath, dimParent)
- } else {
- carbonLoadModel.setPredefDictMap(preDictDimension, colDictPath)
- }
- }
-
- /**
- * use external dimension column to generate global dictionary
- *
- * @param colDictFilePath external column dict file path
- * @param table table identifier
- * @param dimensions dimension column
- * @param carbonLoadModel carbon load model
- * @param sqlContext spark sql context
- * @param hdfsLocation store location on hdfs
- * @param dictFolderPath generated global dict file path
- */
- private def generatePredefinedColDictionary(colDictFilePath: String,
- table: CarbonTableIdentifier,
- dimensions: Array[CarbonDimension],
- carbonLoadModel: CarbonLoadModel,
- sqlContext: SQLContext,
- hdfsLocation: String,
- dictFolderPath: String) = {
- // set pre defined dictionary column
- setPredefinedColumnDictPath(carbonLoadModel, colDictFilePath, table, dimensions)
- val dictLoadModel = createDictionaryLoadModel(carbonLoadModel, table, dimensions,
- hdfsLocation, dictFolderPath, forPreDefDict = true)
- // new RDD to achieve distributed column dict generation
- val extInputRDD = new CarbonColumnDictGenerateRDD(carbonLoadModel, dictLoadModel,
- sqlContext.sparkContext, table, dimensions, hdfsLocation, dictFolderPath)
- .partitionBy(new ColumnPartitioner(dictLoadModel.primDimensions.length))
- val statusList = new CarbonGlobalDictionaryGenerateRDD(extInputRDD, dictLoadModel).collect()
- // check result status
- checkStatus(carbonLoadModel, sqlContext, dictLoadModel, statusList)
- }
-
- /** generate dimension parsers
- *
- * @param model dictionary load model
- * @param distinctValuesList buffer collecting (column index, distinct value set) pairs
- * @return dimensionParsers
- */
- def createDimensionParsers(model: DictionaryLoadModel,
- distinctValuesList: ArrayBuffer[(Int, HashSet[String])]): Array[GenericParser] = {
- // local combine set
- val dimNum = model.dimensions.length
- val primDimNum = model.primDimensions.length
- val columnValues = new Array[HashSet[String]](primDimNum)
- val mapColumnValuesWithId = new HashMap[String, HashSet[String]]
- for (i <- 0 until primDimNum) {
- columnValues(i) = new HashSet[String]
- distinctValuesList += ((i, columnValues(i)))
- mapColumnValuesWithId.put(model.primDimensions(i).getColumnId, columnValues(i))
- }
- val dimensionParsers = new Array[GenericParser](dimNum)
- for (j <- 0 until dimNum) {
- dimensionParsers(j) = GlobalDictionaryUtil.generateParserForDimension(
- Some(model.dimensions(j)),
- GlobalDictionaryUtil.createDataFormat(model.delimiters),
- mapColumnValuesWithId).get
- }
- dimensionParsers
- }
-
- /**
- * parse and validate one record of a dictionary file
- *
- * @param x one line of the dictionary file, expected as "columnIndex,value"
- * @param accum accumulator counting bad records
- * @param csvFileColumns column names of the csv file
- */
- private def parseRecord(x: String, accum: Accumulator[Int],
- csvFileColumns: Array[String]): (String, String) = {
- val tokens = x.split("" + DEFAULT_SEPARATOR)
- var columnName: String = ""
- var value: String = ""
- // such as "," , "", throw ex
- if (tokens.isEmpty) {
- LOGGER.error("Read a bad dictionary record: " + x)
- accum += 1
- } else if (tokens.size == 1) {
- // such as "1", "jone", throw ex
- if (!x.contains(",")) {
- accum += 1
- } else {
- try {
- columnName = csvFileColumns(tokens(0).toInt)
- } catch {
- case ex: Exception =>
- LOGGER.error("Read a bad dictionary record: " + x)
- accum += 1
- }
- }
- } else {
- try {
- columnName = csvFileColumns(tokens(0).toInt)
- value = tokens(1)
- } catch {
- case ex: Exception =>
- LOGGER.error("Read a bad dictionary record: " + x)
- accum += 1
- }
- }
- (columnName, value)
- }
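
Each line of an all-dictionary file is expected as "<columnIndex>,<value>", where the index points into the CSV header. A condensed sketch of the same parse (hypothetical helper, not the Carbon API):

    // Returns None for bad records, which parseRecord counts via the accumulator.
    def parse(line: String, columns: Array[String]): Option[(String, String)] = {
      val tokens = line.split(",", 2)
      if (tokens.length == 2) {
        scala.util.Try((columns(tokens(0).toInt), tokens(1))).toOption
      } else {
        None
      }
    }
    val csvFileColumns = Array("id", "country", "city")  // hypothetical header
    parse("1,china", csvFileColumns)  // Some(("country", "china"))
    parse("jone", csvFileColumns)     // None: no comma
    parse("9,x", csvFileColumns)      // None: index out of range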
-
- /**
- * read the local dictionary files and prune to the required columns
- *
- * @param sqlContext spark sql context
- * @param csvFileColumns column names of the csv file
- * @param requireColumns columns that need a global dictionary
- * @param allDictionaryPath path of the dictionary files
- * @return allDictionaryRdd
- */
- private def readAllDictionaryFiles(sqlContext: SQLContext,
- csvFileColumns: Array[String],
- requireColumns: Array[String],
- allDictionaryPath: String,
- accumulator: Accumulator[Int]) = {
- var allDictionaryRdd: RDD[(String, Iterable[String])] = null
- try {
- // read local dictionary file, and split into (columnIndex, columnValue)
- val basicRdd = sqlContext.sparkContext.textFile(allDictionaryPath)
- .map(x => parseRecord(x, accumulator, csvFileColumns)).persist()
-
- // group by column index, and filter required columns
- val requireColumnsList = requireColumns.toList
- allDictionaryRdd = basicRdd
- .groupByKey()
- .filter(x => requireColumnsList.contains(x._1))
- } catch {
- case ex: Exception =>
- LOGGER.error("Read dictionary files failed. Caused by: " + ex.getMessage)
- throw ex
- }
- allDictionaryRdd
- }
-
- /**
- * validate local dictionary files
- *
- * @param allDictionaryPath path (or "/path/*.ext" pattern) of the dictionary files
- * @return true if at least one non-empty dictionary file exists
- */
- private def validateAllDictionaryPath(allDictionaryPath: String): Boolean = {
- val fileType = FileFactory.getFileType(allDictionaryPath)
- val filePath = FileFactory.getCarbonFile(allDictionaryPath, fileType)
- // file path suffix pattern, e.g. "/path/*.dictionary"
- if (filePath.getName.startsWith("*")) {
- val dictExt = filePath.getName.substring(1)
- if (filePath.getParentFile.exists()) {
- val listFiles = filePath.getParentFile.listFiles()
- if (listFiles.exists(file =>
- file.getName.endsWith(dictExt) && file.getSize > 0)) {
- true
- } else {
- LOGGER.warn("No dictionary files found or empty dictionary files! " +
- "Won't generate new dictionary.")
- false
- }
- } else {
- throw new FileNotFoundException(
- "The given dictionary file path is not found!")
- }
- } else {
- if (filePath.exists()) {
- if (filePath.getSize > 0) {
- true
- } else {
- LOGGER.warn("No dictionary files found or empty dictionary files! " +
- "Won't generate new dictionary.")
- false
- }
- } else {
- throw new FileNotFoundException(
- "The given dictionary file path is not found!")
- }
- }
- }
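
The ALL_DICTIONARY_PATH accepted above therefore comes in two shapes; a sketch of both (paths hypothetical):

    // Shape 1: suffix pattern over one directory; at least one non-empty
    // match must exist under the parent directory.
    val patternStyle = "/user/dicts/*.dictionary"
    // Shape 2: a single dictionary file, which must exist and be non-empty.
    val singleFile   = "/user/dicts/all.dictionary"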
-
- /**
- * get file headers from fact file
- *
- * @param carbonLoadModel
- * @return headers
- */
- private def getHeaderFormFactFile(carbonLoadModel: CarbonLoadModel): Array[String] = {
- var headers: Array[String] = null
- val factFile: String = carbonLoadModel.getFactFilePath.split(",")(0)
- val readLine = CarbonUtil.readHeader(factFile)
-
- if (null != readLine) {
- val delimiter = if (StringUtils.isEmpty(carbonLoadModel.getCsvDelimiter)) {
- "" + DEFAULT_SEPARATOR
- } else {
- carbonLoadModel.getCsvDelimiter
- }
- headers = readLine.toLowerCase().split(delimiter)
- } else {
- LOGGER.error("Not found file header! Please set fileheader")
- throw new IOException("Failed to get file header")
- }
- headers
- }
-
- /**
- * generate global dictionary with SQLContext and CarbonLoadModel
- *
- * @param sqlContext sql context
- * @param carbonLoadModel carbon load model
- */
- def generateGlobalDictionary(sqlContext: SQLContext,
- carbonLoadModel: CarbonLoadModel,
- storePath: String,
- dataFrame: Option[DataFrame] = None): Unit = {
- try {
- val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
- val carbonTableIdentifier = carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier
- // create dictionary folder if not exists
- val carbonTablePath = CarbonStorePath.getCarbonTablePath(storePath, carbonTableIdentifier)
- val dictfolderPath = carbonTablePath.getMetadataDirectoryPath
- // columns which need to generate global dictionary file
- val dimensions = carbonTable.getDimensionByTableName(
- carbonTable.getFactTableName).asScala.toArray
- // generate global dict from pre defined column dict file
- carbonLoadModel.initPredefDictMap()
-
- val allDictionaryPath = carbonLoadModel.getAllDictPath
- if (StringUtils.isEmpty(allDictionaryPath)) {
- LOGGER.info("Generate global dictionary from source data files!")
- // load data by using dataSource com.databricks.spark.csv
- var df = dataFrame.getOrElse(loadDataFrame(sqlContext, carbonLoadModel))
- var headers = if (StringUtils.isEmpty(carbonLoadModel.getCsvHeader)) {
- df.columns
- } else {
- carbonLoadModel.getCsvHeader.split("" + DEFAULT_SEPARATOR)
- }
- headers = headers.map(headerName => headerName.trim)
- val colDictFilePath = carbonLoadModel.getColDictFilePath
- if (colDictFilePath != null) {
- // generate predefined dictionary
- generatePredefinedColDictionary(colDictFilePath, carbonTableIdentifier,
- dimensions, carbonLoadModel, sqlContext, storePath, dictfolderPath)
- }
- if (headers.length > df.columns.length) {
- val msg = "The number of columns in the file header do not match the " +
- "number of columns in the data file; Either delimiter " +
- "or fileheader provided is not correct"
- LOGGER.error(msg)
- throw new DataLoadingException(msg)
- }
- // use fact file to generate global dict
- val (requireDimension, requireColumnNames) = pruneDimensions(dimensions,
- headers, df.columns)
- if (requireDimension.nonEmpty) {
- // select column to push down pruning
- df = df.select(requireColumnNames.head, requireColumnNames.tail: _*)
- val model = createDictionaryLoadModel(carbonLoadModel, carbonTableIdentifier,
- requireDimension, storePath, dictfolderPath, false)
- // combine distinct value in a block and partition by column
- val inputRDD = new CarbonBlockDistinctValuesCombineRDD(df.rdd, model)
- .partitionBy(new ColumnPartitioner(model.primDimensions.length))
- // generate global dictionary files
- val statusList = new CarbonGlobalDictionaryGenerateRDD(inputRDD, model).collect()
- // check result status
- checkStatus(carbonLoadModel, sqlContext, model, statusList)
- } else {
- LOGGER.info("No column found for generating global dictionary in source data files")
- }
- // generate global dict from dimension file
- if (carbonLoadModel.getDimFolderPath != null) {
- val fileMapArray = carbonLoadModel.getDimFolderPath.split(",")
- for (fileMap <- fileMapArray) {
- val dimTableName = fileMap.split(":")(0)
- var dimDataframe = loadDataFrame(sqlContext, carbonLoadModel)
- val (requireDimensionForDim, requireColumnNamesForDim) =
- pruneDimensions(dimensions, dimDataframe.columns, dimDataframe.columns)
- if (requireDimensionForDim.length >= 1) {
- dimDataframe = dimDataframe.select(requireColumnNamesForDim.head,
- requireColumnNamesForDim.tail: _*)
- val modelforDim = createDictionaryLoadModel(carbonLoadModel, carbonTableIdentifier,
- requireDimensionForDim, storePath, dictfolderPath, false)
- val inputRDDforDim = new CarbonBlockDistinctValuesCombineRDD(
- dimDataframe.rdd, modelforDim)
- .partitionBy(new ColumnPartitioner(modelforDim.primDimensions.length))
- val statusListforDim = new CarbonGlobalDictionaryGenerateRDD(
- inputRDDforDim, modelforDim).collect()
- checkStatus(carbonLoadModel, sqlContext, modelforDim, statusListforDim)
- } else {
- LOGGER.info(s"No columns in dimension table $dimTableName " +
- "to generate global dictionary")
- }
- }
- }
- } else {
- LOGGER.info("Generate global dictionary from dictionary files!")
- val isNonempty = validateAllDictionaryPath(allDictionaryPath)
- if (isNonempty) {
- var headers = if (StringUtils.isEmpty(carbonLoadModel.getCsvHeader)) {
- getHeaderFormFactFile(carbonLoadModel)
- } else {
- carbonLoadModel.getCsvHeader.toLowerCase.split("" + DEFAULT_SEPARATOR)
- }
- headers = headers.map(headerName => headerName.trim)
- // prune to the dimension columns present in the CSV file header
- val (requireDimension, requireColumnNames) = pruneDimensions(dimensions, headers, headers)
- if (requireDimension.nonEmpty) {
- val model = createDictionaryLoadModel(carbonLoadModel, carbonTableIdentifier,
- requireDimension, storePath, dictfolderPath, false)
- // check if dictionary files contains bad record
- val accumulator = sqlContext.sparkContext.accumulator(0)
- // read local dictionary file, and group by key
- val allDictionaryRdd = readAllDictionaryFiles(sqlContext, headers,
- requireColumnNames, allDictionaryPath, accumulator)
- // read exist dictionary and combine
- val inputRDD = new CarbonAllDictionaryCombineRDD(allDictionaryRdd, model)
- .partitionBy(new ColumnPartitioner(model.primDimensions.length))
- // generate global dictionary files
- val statusList = new CarbonGlobalDictionaryGenerateRDD(inputRDD, model).collect()
- // check result status
- checkStatus(carbonLoadModel, sqlContext, model, statusList)
- // if the dictionary contains wrong format record, throw ex
- if (accumulator.value > 0) {
- throw new DataLoadingException("Data Loading failure, dictionary values are " +
- "not in correct format!")
- }
- } else {
- LOGGER.info("have no column need to generate global dictionary")
- }
- }
- }
- } catch {
- case ex: Exception =>
- LOGGER.error(ex, "generate global dictionary failed")
- throw ex
- }
- }
-}
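
Taken together, the entry point above is typically called once per load, before the actual data load encodes dimension values. A minimal sketch of a caller, assuming an already configured CarbonLoadModel and a CarbonContext-backed SQLContext:

    import org.apache.spark.sql.SQLContext
    import org.apache.carbondata.processing.model.CarbonLoadModel
    import org.apache.carbondata.spark.util.GlobalDictionaryUtil

    def prepareDictionaries(sqlContext: SQLContext,
        loadModel: CarbonLoadModel, storePath: String): Unit = {
      // Builds or extends the global dictionary from the fact file, from
      // predefined column dictionaries (COLUMNDICT), or from an
      // all-dictionary path, depending on the load model's settings.
      GlobalDictionaryUtil.generateGlobalDictionary(sqlContext, loadModel, storePath)
    }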
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark/src/main/scala/org/apache/spark/rdd/DataLoadCoalescedRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/rdd/DataLoadCoalescedRDD.scala b/integration/spark/src/main/scala/org/apache/spark/rdd/DataLoadCoalescedRDD.scala
deleted file mode 100644
index 3acde94..0000000
--- a/integration/spark/src/main/scala/org/apache/spark/rdd/DataLoadCoalescedRDD.scala
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.rdd
-
-import scala.reflect.ClassTag
-
-import org.apache.spark._
-
-case class DataLoadPartitionWrap[T: ClassTag](rdd: RDD[T], partition: Partition)
-
-class DataLoadCoalescedRDD[T: ClassTag](
- @transient var prev: RDD[T],
- nodeList: Array[String])
- extends RDD[DataLoadPartitionWrap[T]](prev.context, Nil) with Logging {
-
- override def getPartitions: Array[Partition] = {
- new DataLoadPartitionCoalescer(prev, nodeList).run
- }
-
- override def compute(split: Partition,
- context: TaskContext): Iterator[DataLoadPartitionWrap[T]] = {
-
- new Iterator[DataLoadPartitionWrap[T]] {
- val iter = split.asInstanceOf[CoalescedRDDPartition].parents.iterator
- def hasNext = iter.hasNext
- def next: DataLoadPartitionWrap[T] = {
- DataLoadPartitionWrap(firstParent[T], iter.next())
- }
- }
- }
-
- override def getDependencies: Seq[Dependency[_]] = {
- Seq(new NarrowDependency(prev) {
- def getParents(id: Int): Seq[Int] =
- partitions(id).asInstanceOf[CoalescedRDDPartition].parentsIndices
- })
- }
-
- override def clearDependencies() {
- super.clearDependencies()
- prev = null
- }
-
- /**
- * Returns the preferred machine for the partition. If the partition is of type
- * CoalescedRDDPartition, the preferred machine will be the one most parent splits prefer.
- * @param partition
- * @return the machine most preferred by the partition
- */
- override def getPreferredLocations(partition: Partition): Seq[String] = {
- partition.asInstanceOf[CoalescedRDDPartition].preferredLocation.toSeq
- }
-}
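
A minimal sketch of driving the coalesced RDD above (hypothetical node list; each task sees wrappers naming its assigned parent partitions rather than the rows themselves):

    import org.apache.spark.SparkContext
    import org.apache.spark.rdd.{DataLoadCoalescedRDD, RDD}

    def coalescePerNode(sc: SparkContext, input: RDD[String]): Unit = {
      val nodes = Array("host1", "host2")  // hypothetical executor hosts
      val coalesced = new DataLoadCoalescedRDD[String](input, nodes)
      // Each wrapper carries (parent RDD, parent partition); the consumer
      // decides when to actually iterate each parent partition locally.
      coalesced.foreachPartition { wraps =>
        wraps.foreach(w => println(w.partition.index))
      }
    }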
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark/src/main/scala/org/apache/spark/rdd/DataLoadPartitionCoalescer.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/rdd/DataLoadPartitionCoalescer.scala b/integration/spark/src/main/scala/org/apache/spark/rdd/DataLoadPartitionCoalescer.scala
deleted file mode 100644
index 8e0971c..0000000
--- a/integration/spark/src/main/scala/org/apache/spark/rdd/DataLoadPartitionCoalescer.scala
+++ /dev/null
@@ -1,363 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.rdd
-
-import scala.collection.mutable
-import scala.collection.mutable.ArrayBuffer
-import scala.collection.mutable.HashMap
-import scala.collection.mutable.HashSet
-import scala.collection.mutable.LinkedHashSet
-
-import org.apache.spark.Logging
-import org.apache.spark.Partition
-import org.apache.spark.scheduler.TaskLocation
-
-/**
- * DataLoadPartitionCoalescer
- * Repartitions the partitions of an RDD into fewer partitions, one partition per node.
- * example:
- * blk_hst host1 host2 host3 host4 host5
- * block1 host1 host2 host3
- * block2 host2 host4 host5
- * block3 host3 host4 host5
- * block4 host1 host2 host4
- * block5 host1 host3 host4
- * block6 host1 host2 host5
- * -------------------------------------------------------
- * 1. sort host by number of blocks
- * -------------------------------------------------------
- * host3: block1 block3 block5
- * host5: block2 block3 block6
- * host1: block1 block4 block5 block6
- * host2: block1 block2 block4 block6
- * host4: block2 block3 block4 block5
- * -------------------------------------------------------
- * 2. sort blocks of each host
- * new partitions are before old partitions
- * -------------------------------------------------------
- * host3: block1 block3 block5
- * host5: block2 block6+block3
- * host1: block4+block1 block5 block6
- * host2: block1 block2 block4 block6
- * host4: block2 block3 block4 block5
- * -------------------------------------------------------
- * 3. assign blocks to host
- * -------------------------------------------------------
- * step1: host3 choose block1, remove from host1, host2
- * step2: host5 choose block2, remove from host2, host4
- * step3: host1 choose block4, .....
- * -------------------------------------------------------
- * result:
- * host3: block1 block5
- * host5: block2
- * host1: block4
- * host2: block6
- * host4: block3
- */
-class DataLoadPartitionCoalescer(prev: RDD[_], nodeList: Array[String]) extends Logging {
-
- val prevPartitions = prev.partitions
- var numOfParts = Math.max(1, Math.min(nodeList.length, prevPartitions.length))
- // host => partition id list
- val hostMapPartitionIds = new HashMap[String, LinkedHashSet[Int]]
- // partition id => host list
- val partitionIdMapHosts = new HashMap[Int, ArrayBuffer[String]]
- val noLocalityPartitions = new ArrayBuffer[Int]
- var noLocality = true
- /**
- * assign a task location for a partition
- */
- private def getLocation(index: Int): Option[String] = {
- if (index < nodeList.length) {
- Some(nodeList(index))
- } else {
- None
- }
- }
-
- /**
- * collect partitions to each node
- */
- private def groupByNode(): Unit = {
- // initialize hostMapPartitionIds
- nodeList.foreach { node =>
- val map = new LinkedHashSet[Int]
- hostMapPartitionIds.put(node, map)
- }
- // collect partitions for each node
- val tmpNoLocalityPartitions = new ArrayBuffer[Int]
- prevPartitions.foreach { p =>
- val locs = DataLoadPartitionCoalescer.getPreferredLocs(prev, p)
- if (locs.isEmpty) {
- // if a partition has no location, add to noLocalityPartitions
- tmpNoLocalityPartitions += p.index
- } else {
- // add partition to hostMapPartitionIds and partitionIdMapHosts
- locs.foreach { loc =>
- val host = loc.host
- hostMapPartitionIds.get(host) match {
- // if the location of the partition is not in node list,
- // will add this partition to noLocalityPartitions
- case None => tmpNoLocalityPartitions += p.index
- case Some(ids) =>
- noLocality = false
- ids += p.index
- partitionIdMapHosts.get(p.index) match {
- case None =>
- val hosts = new ArrayBuffer[String]
- hosts += host
- partitionIdMapHosts.put(p.index, hosts)
- case Some(hosts) =>
- hosts += host
- }
- }
- }
- }
- }
-
- // keep only the candidates that ended up with no locality at all
- tmpNoLocalityPartitions.distinct.foreach {index =>
- partitionIdMapHosts.get(index) match {
- case None => noLocalityPartitions += index
- case Some(_) =>
- }
- }
- }
-
- /**
- * sort host and partitions
- */
- private def sortHostAndPartitions(hostMapPartitionIdsSeq: Seq[(String, LinkedHashSet[Int])]) = {
- val oldPartitionIdSet = new HashSet[Int]
- // sort host by number of partitions
- hostMapPartitionIdsSeq.sortBy(_._2.size).map { loc =>
- // order: newPartitionIds + oldPartitionIds
- val sortedPartitionIdSet = new LinkedHashSet[Int]
- var newPartitionIds = new ArrayBuffer[Int]
- var oldPartitionIds = new ArrayBuffer[Int]
- loc._2.foreach { p =>
- if (oldPartitionIdSet.contains(p)) {
- oldPartitionIds += p
- } else {
- newPartitionIds += p
- oldPartitionIdSet.add(p)
- }
- }
- // sort and add new partitions
- newPartitionIds.sortBy(x => x).foreach(sortedPartitionIdSet.add(_))
- // sort and add old partitions
- oldPartitionIds.sortBy(x => x).foreach(sortedPartitionIdSet.add(_))
- // update hostMapPartitionIds
- hostMapPartitionIds.put(loc._1, sortedPartitionIdSet)
- (loc._1, sortedPartitionIdSet)
- }.toArray
- }
-
- /**
- * assign locality partition to each host
- */
- private def assignPartitonNodeLocality(
- noEmptyHosts: Seq[(String, LinkedHashSet[Int])]): Array[ArrayBuffer[Int]] = {
- val localityResult = new Array[ArrayBuffer[Int]](noEmptyHosts.length)
- for (i <- 0 until localityResult.length) {
- localityResult(i) = new ArrayBuffer[Int]
- }
- val noEmptyHostSet = new HashSet[String]
- noEmptyHosts.foreach {loc => noEmptyHostSet.add(loc._1)}
-
- var hostIndex = 0
- while (noEmptyHostSet.nonEmpty) {
- val hostEntry = noEmptyHosts(hostIndex)
- if (noEmptyHostSet.contains(hostEntry._1)) {
- if (hostEntry._2.nonEmpty) {
- var partitionId = hostEntry._2.iterator.next
- localityResult(hostIndex) += partitionId
- // remove from sortedParts
- partitionIdMapHosts.get(partitionId) match {
- case Some(locs) =>
- locs.foreach { loc =>
- hostMapPartitionIds.get(loc) match {
- case Some(parts) =>
- parts.remove(partitionId)
- }
- }
- }
- } else {
- noEmptyHostSet.remove(hostEntry._1)
- }
- }
-
- hostIndex = hostIndex + 1
- if (hostIndex == noEmptyHosts.length) {
- hostIndex = 0
- }
- }
- localityResult
- }
-
- /**
- * assign no locality partitions to each host
- */
- private def assignPartitionNoLocality(emptyHosts: mutable.Buffer[String],
- noEmptyHosts: mutable.Buffer[String],
- localityResult: mutable.Buffer[ArrayBuffer[Int]]): Array[ArrayBuffer[Int]] = {
- val noLocalityResult = new Array[ArrayBuffer[Int]](emptyHosts.length)
- logInfo(s"non empty host: ${noEmptyHosts.length}, empty host: ${emptyHosts.length}")
- val avgNumber = prevPartitions.length / (noEmptyHosts.length + emptyHosts.length)
- for (i <- 0 until noLocalityResult.length) {
- noLocalityResult(i) = new ArrayBuffer[Int]
- }
- var noLocalityPartitionIndex = 0
- if (noLocalityPartitions.nonEmpty) {
- if (emptyHosts.nonEmpty) {
- // first, assign the average number of partitions to each empty node
- for (i <- 0 until avgNumber) {
- noLocalityResult.foreach { partitionIds =>
- if (noLocalityPartitionIndex < noLocalityPartitions.length) {
- partitionIds += noLocalityPartitions(noLocalityPartitionIndex)
- noLocalityPartitionIndex = noLocalityPartitionIndex + 1
- }
- }
- }
- }
- // still have no locality partitions
- // assign to all hosts
- if (noLocalityPartitionIndex < noLocalityPartitions.length) {
- var partIndex = 0
- for (i <- noLocalityPartitionIndex until noLocalityPartitions.length) {
- if (partIndex < localityResult.length) {
- localityResult(partIndex) += noLocalityPartitions(i)
- } else {
- noLocalityResult(partIndex - localityResult.length) += noLocalityPartitions(i)
- }
- partIndex = partIndex + 1
- if (partIndex == localityResult.length + noLocalityResult.length) {
- partIndex = 0
- }
- }
- }
- }
- noLocalityResult
- }
-
- /**
- * no locality repartition
- */
- private def repartitionNoLocality(): Array[Partition] = {
- // no locality repartition
- logInfo("no locality partition")
- val prevPartIndexs = new Array[ArrayBuffer[Int]](numOfParts)
- for (i <- 0 until numOfParts) {
- prevPartIndexs(i) = new ArrayBuffer[Int]
- }
- for (i <- 0 until prevPartitions.length) {
- prevPartIndexs(i % numOfParts) += prevPartitions(i).index
- }
- prevPartIndexs.filter(_.nonEmpty).zipWithIndex.map { x =>
- new CoalescedRDDPartition(x._2, prev, x._1.toArray, getLocation(x._2))
- }
- }
-
- private def repartitionLocality(): Array[Partition] = {
- logInfo("locality partition")
- val hostMapPartitionIdsSeq = hostMapPartitionIds.toSeq
- // empty host seq
- val emptyHosts = hostMapPartitionIdsSeq.filter(_._2.isEmpty).map(_._1).toBuffer
- // non empty host array
- var tempNoEmptyHosts = hostMapPartitionIdsSeq.filter(_._2.nonEmpty)
-
- // 1. do locality repartition
- // sort host and partitions
- tempNoEmptyHosts = sortHostAndPartitions(tempNoEmptyHosts)
- // assign locality partition to non empty hosts
- val templocalityResult = assignPartitonNodeLocality(tempNoEmptyHosts)
- // collect non empty hosts and empty hosts
- val noEmptyHosts = mutable.Buffer[String]()
- val localityResult = mutable.Buffer[ArrayBuffer[Int]]()
- for(index <- 0 until templocalityResult.size) {
- if (templocalityResult(index).isEmpty) {
- emptyHosts += tempNoEmptyHosts(index)._1
- } else {
- noEmptyHosts += tempNoEmptyHosts(index)._1
- localityResult += templocalityResult(index)
- }
- }
- // 2. do no locality repartition
- // assign no locality partitions to all hosts
- val noLocalityResult = assignPartitionNoLocality(emptyHosts, noEmptyHosts, localityResult)
-
- // 3. generate CoalescedRDDPartition
- (0 until localityResult.length + noLocalityResult.length).map { index =>
- val ids = if (index < localityResult.length) {
- localityResult(index).toArray
- } else {
- noLocalityResult(index - localityResult.length).toArray
- }
- val loc = if (index < localityResult.length) {
- Some(noEmptyHosts(index))
- } else {
- Some(emptyHosts(index - localityResult.length))
- }
- logInfo(s"CoalescedRDDPartition ${index}, ${ids.length}, ${loc} ")
- new CoalescedRDDPartition(index, prev, ids, loc)
- }.filter(_.parentsIndices.nonEmpty).toArray
-
- }
-
- def run(): Array[Partition] = {
- // 1. group partitions by node
- groupByNode()
- logInfo(s"partition: ${prevPartitions.length}, no locality: ${noLocalityPartitions.length}")
- val partitions = if (noLocality) {
- // 2.A no locality partition
- repartitionNoLocality()
- } else {
- // 2.B locality partition
- repartitionLocality()
- }
- DataLoadPartitionCoalescer.checkPartition(prevPartitions, partitions)
- partitions
- }
-}
-
-object DataLoadPartitionCoalescer {
- def getPreferredLocs(prev: RDD[_], p: Partition): Seq[TaskLocation] = {
- prev.context.getPreferredLocs(prev, p.index)
- }
-
- def getParentsIndices(p: Partition): Array[Int] = {
- p.asInstanceOf[CoalescedRDDPartition].parentsIndices
- }
-
- def checkPartition(prevParts: Array[Partition], parts: Array[Partition]): Unit = {
- val prevPartIds = new ArrayBuffer[Int]
- parts.foreach{ p =>
- prevPartIds ++= DataLoadPartitionCoalescer.getParentsIndices(p)
- }
- // every parent partition must be assigned exactly once.
- assert(prevPartIds.size == prevParts.size)
- val prevPartIdsMap = prevPartIds.map{ id =>
- (id, id)
- }.toMap
- prevParts.foreach{ p =>
- prevPartIdsMap.get(p.index) match {
- case None => assert(false, "partition " + p.index + " not found")
- case Some(_) =>
- }
- }
- }
-}
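
The coalescer is normally driven from DataLoadCoalescedRDD.getPartitions, but used standalone the whole planning step is one call (node list hypothetical):

    import org.apache.spark.Partition
    import org.apache.spark.rdd.{DataLoadPartitionCoalescer, RDD}

    // Produces at most one CoalescedRDDPartition per node, each carrying
    // the parent partition ids assigned to that node by steps 1-3 above.
    def planPartitions(input: RDD[_], nodes: Array[String]): Array[Partition] =
      new DataLoadPartitionCoalescer(input, nodes).run()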
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
index cece5e6..d394bfa 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
@@ -17,15 +17,15 @@
package org.apache.spark.sql
-import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
-import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
+import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
import org.apache.spark.sql.catalyst.plans.logical.{UnaryNode, _}
import org.apache.spark.sql.hive.HiveContext
-import org.apache.spark.sql.optimizer.{CarbonAliasDecoderRelation, CarbonDecoderRelation}
+import org.apache.spark.sql.optimizer.CarbonDecoderRelation
import org.apache.spark.sql.types._
+import org.apache.carbondata.spark.CarbonAliasDecoderRelation
+
/**
* Top command
*/
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark/src/main/scala/org/apache/spark/sql/CarbonContext.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonContext.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonContext.scala
index ba97083..de43319 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonContext.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonContext.scala
@@ -57,8 +57,10 @@ class CarbonContext(
CarbonContext.addInstance(sc, this)
CodeGenerateFactory.init(sc.version)
+ CarbonEnv.init(this)
var lastSchemaUpdatedTime = System.currentTimeMillis()
+ val hiveClientInterface = metadataHive
protected[sql] override lazy val conf: SQLConf = new CarbonSQLConf
@@ -66,7 +68,7 @@ class CarbonContext(
override lazy val catalog = {
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.STORE_LOCATION, storePath)
- new CarbonMetastoreCatalog(this, storePath, metadataHive, queryId) with OverrideCatalog
+ new CarbonMetastore(this, storePath, metadataHive, queryId) with OverrideCatalog
}
@transient
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
index ca0ad58..bb1c4ef 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
@@ -31,8 +31,6 @@ import org.apache.spark._
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.mapreduce.SparkHadoopMapReduceUtil
import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.execution.command.Partitioner
-import org.apache.spark.sql.hive.{DistributionUtil, TableMeta}
import org.apache.spark.sql.sources.{Filter, HadoopFsRelation, OutputWriterFactory}
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.SerializableConfiguration
@@ -43,9 +41,8 @@ import org.apache.carbondata.hadoop.{CarbonInputFormat, CarbonInputSplit, Carbon
import org.apache.carbondata.hadoop.util.{CarbonInputFormatUtil, SchemaReader}
import org.apache.carbondata.scan.expression.logical.AndExpression
import org.apache.carbondata.spark.{CarbonFilters, CarbonOption}
+import org.apache.carbondata.spark.merger.TableMeta
import org.apache.carbondata.spark.readsupport.SparkRowReadSupportImpl
-import org.apache.carbondata.spark.util.CarbonScalaUtil.CarbonSparkUtil
-
private[sql] case class CarbonDatasourceHadoopRelation(
sqlContext: SQLContext,
@@ -71,15 +68,7 @@ private[sql] case class CarbonDatasourceHadoopRelation(
carbonTable.getDatabaseName,
carbonTable.getFactTableName,
CarbonSparkUtil.createSparkMeta(carbonTable),
- TableMeta(absIdentifier.getCarbonTableIdentifier,
- paths.head,
- carbonTable,
- Partitioner(options.partitionClass,
- Array(""),
- options.partitionCount.toInt,
- DistributionUtil.getNodeList(sqlContext.sparkContext)
- )
- ),
+ new TableMeta(absIdentifier.getCarbonTableIdentifier, paths.head, carbonTable),
None
)(sqlContext)
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceRelation.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceRelation.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceRelation.scala
index a06d5cb..d898c4f 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceRelation.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceRelation.scala
@@ -23,12 +23,11 @@ import scala.collection.JavaConverters._
import scala.language.implicitConversions
import org.apache.hadoop.fs.Path
-import org.apache.spark._
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.hive.{CarbonMetaData, CarbonMetastoreTypes, TableMeta}
+import org.apache.spark.sql.hive.{CarbonMetaData, CarbonMetastoreTypes}
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types.{DataType, StructType}
@@ -37,6 +36,7 @@ import org.apache.carbondata.core.carbon.path.CarbonStorePath
import org.apache.carbondata.core.datastorage.store.impl.FileFactory
import org.apache.carbondata.lcm.status.SegmentStatusManager
import org.apache.carbondata.spark.{CarbonOption, _}
+import org.apache.carbondata.spark.merger.TableMeta
/**
* Carbon relation provider compliant to data source api.
@@ -135,8 +135,8 @@ private[sql] case class CarbonDatasourceRelation(
extends BaseRelation with Serializable {
lazy val carbonRelation: CarbonRelation = {
- CarbonEnv.getInstance(context)
- .carbonCatalog.lookupRelation1(tableIdentifier, None)(sqlContext)
+ CarbonEnv.get
+ .carbonMetastore.lookupRelation1(tableIdentifier, None)(sqlContext)
.asInstanceOf[CarbonRelation]
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
index f4fe900..397d479 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
@@ -23,8 +23,8 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.errors.attachTree
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.execution.{SparkPlan, UnaryNode}
-import org.apache.spark.sql.hive.{CarbonMetastoreCatalog, CarbonMetastoreTypes}
-import org.apache.spark.sql.optimizer.{CarbonAliasDecoderRelation, CarbonDecoderRelation}
+import org.apache.spark.sql.hive.{CarbonMetastore, CarbonMetastoreTypes}
+import org.apache.spark.sql.optimizer.CarbonDecoderRelation
import org.apache.spark.sql.types._
import org.apache.carbondata.core.cache.{Cache, CacheProvider, CacheType}
@@ -35,10 +35,10 @@ import org.apache.carbondata.core.carbon.metadata.encoder.Encoding
import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonDimension
import org.apache.carbondata.core.carbon.querystatistics._
import org.apache.carbondata.core.util.{CarbonTimeStatisticsFactory, DataTypeUtil}
+import org.apache.carbondata.spark.CarbonAliasDecoderRelation
/**
- * It decodes the data.
- *
+ * It decodes the dictionary key to value
*/
case class CarbonDictionaryDecoder(
relations: Seq[CarbonDecoderRelation],
@@ -48,7 +48,6 @@ case class CarbonDictionaryDecoder(
(@transient sqlContext: SQLContext)
extends UnaryNode {
-
override def otherCopyArgs: Seq[AnyRef] = sqlContext :: Nil
override val output: Seq[Attribute] = {
@@ -149,11 +148,9 @@ case class CarbonDictionaryDecoder(
override def canProcessSafeRows: Boolean = true
-
-
override def doExecute(): RDD[InternalRow] = {
attachTree(this, "execute") {
- val storePath = sqlContext.catalog.asInstanceOf[CarbonMetastoreCatalog].storePath
+ val storePath = sqlContext.catalog.asInstanceOf[CarbonMetastore].storePath
val queryId = sqlContext.getConf("queryId", System.nanoTime() + "")
val absoluteTableIdentifiers = relations.map { relation =>
val carbonTable = relation.carbonRelation.carbonRelation.metaData.carbonTable
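The reworded doc comment above describes what this operator does: a late decode of dictionary surrogate keys back into the original column values. A toy, self-contained sketch of that mapping (dictionary entries and rows are made up for illustration; the real operator resolves a Dictionary per decoded column through the dictionary cache imported above):

    // Hypothetical dictionary: surrogate Int keys to original String values.
    val dictionary = Map(1 -> "shenzhen", 2 -> "bangalore")
    val encodedRows = Seq(Seq(1), Seq(2), Seq(1))
    val decodedRows = encodedRows.map(row => row.map(dictionary))
    // decodedRows: Seq(Seq("shenzhen"), Seq("bangalore"), Seq("shenzhen"))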
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonEnv.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
index be77954..6cfbd5f 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
@@ -17,51 +17,31 @@
package org.apache.spark.sql
-import org.apache.spark.SparkContext
-import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
-import org.apache.spark.sql.hive.{CarbonMetastoreCatalog, HiveContext}
-
-import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.spark.sql.hive.CarbonMetastore
/**
* Carbon Environment for unified context
*/
-case class CarbonEnv(hiveContext: HiveContext, carbonCatalog: CarbonMetastoreCatalog)
+case class CarbonEnv(carbonMetastore: CarbonMetastore)
object CarbonEnv {
- private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
- var carbonEnv: CarbonEnv = _
- def getInstance(sqlContext: SQLContext): CarbonEnv = {
- if (carbonEnv == null) {
- carbonEnv =
- CarbonEnv(sqlContext.asInstanceOf[CarbonContext],
- sqlContext.asInstanceOf[CarbonContext].catalog)
+ @volatile private var carbonEnv: CarbonEnv = _
+
+ var initialized = false
+
+ def init(sqlContext: SQLContext): Unit = {
+ if (!initialized) {
+ val cc = sqlContext.asInstanceOf[CarbonContext]
+ val catalog = new CarbonMetastore(cc, cc.storePath, cc.hiveClientInterface, "")
+ carbonEnv = CarbonEnv(catalog)
+ initialized = true
}
- carbonEnv
}
- /**
- *
- * Requesting the extra executors other than the existing ones.
- *
- * @param sc
- * @param numExecutors
- * @return
- */
- final def ensureExecutors(sc: SparkContext, numExecutors: Int): Boolean = {
- sc.schedulerBackend match {
- case b: CoarseGrainedSchedulerBackend =>
- val requiredExecutors = numExecutors - b.numExistingExecutors
- LOGGER.info(s"number of executors is =$numExecutors existing executors are =" +
- s"${ b.numExistingExecutors }")
- if (requiredExecutors > 0) {
- b.requestExecutors(requiredExecutors)
- }
- true
- case _ =>
- false
- }
+ def get: CarbonEnv = {
+ if (initialized) carbonEnv
+ else throw new RuntimeException("CarbonEnv not initialized")
}
}
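Note that init guards on a plain initialized var while carbonEnv itself is @volatile, so two threads racing through a first call could both pass the check. A lock-protected variant, sketched here for illustration only (not part of this commit, reusing the CarbonEnv, CarbonContext and CarbonMetastore names from this file), closes that window:

    object CarbonEnvSketch {
      @volatile private var carbonEnv: CarbonEnv = _

      // synchronized so concurrent first callers cannot both construct a metastore
      def init(sqlContext: SQLContext): Unit = synchronized {
        if (carbonEnv == null) {
          val cc = sqlContext.asInstanceOf[CarbonContext]
          val catalog = new CarbonMetastore(cc, cc.storePath, cc.hiveClientInterface, "")
          carbonEnv = CarbonEnv(catalog)
        }
      }

      def get: CarbonEnv =
        if (carbonEnv != null) carbonEnv
        else throw new RuntimeException("CarbonEnv not initialized")
    }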
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark/src/main/scala/org/apache/spark/sql/CarbonScan.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonScan.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonScan.scala
index 6580c4f..3fe4f22 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonScan.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonScan.scala
@@ -27,7 +27,7 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.execution.LeafNode
-import org.apache.spark.sql.hive.CarbonMetastoreCatalog
+import org.apache.spark.sql.hive.CarbonMetastore
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.util.CarbonProperties
@@ -36,14 +36,14 @@ import org.apache.carbondata.spark.CarbonFilters
import org.apache.carbondata.spark.rdd.CarbonScanRDD
case class CarbonScan(
- var attributesRaw: Seq[Attribute],
+ var columnProjection: Seq[Attribute],
relationRaw: CarbonRelation,
dimensionPredicatesRaw: Seq[Expression],
useUnsafeCoversion: Boolean = true)(@transient val ocRaw: SQLContext) extends LeafNode {
val carbonTable = relationRaw.metaData.carbonTable
val selectedDims = scala.collection.mutable.MutableList[QueryDimension]()
val selectedMsrs = scala.collection.mutable.MutableList[QueryMeasure]()
- @transient val carbonCatalog = ocRaw.catalog.asInstanceOf[CarbonMetastoreCatalog]
+ @transient val carbonCatalog = ocRaw.catalog.asInstanceOf[CarbonMetastore]
val attributesNeedToDecode = new java.util.LinkedHashSet[AttributeReference]()
val unprocessedExprs = new ArrayBuffer[Expression]()
@@ -79,21 +79,21 @@ case class CarbonScan(
private def processExtraAttributes(plan: CarbonQueryPlan) {
if (attributesNeedToDecode.size() > 0) {
- val attributeOut = new ArrayBuffer[Attribute]() ++ attributesRaw
+ val attributeOut = new ArrayBuffer[Attribute]() ++ columnProjection
attributesNeedToDecode.asScala.foreach { attr =>
- if (!attributesRaw.exists(_.name.equalsIgnoreCase(attr.name))) {
+ if (!columnProjection.exists(_.name.equalsIgnoreCase(attr.name))) {
attributeOut += attr
}
}
- attributesRaw = attributeOut
+ columnProjection = attributeOut
}
val dimensions = carbonTable.getDimensionByTableName(carbonTable.getFactTableName)
val measures = carbonTable.getMeasureByTableName(carbonTable.getFactTableName)
val dimAttr = new Array[Attribute](dimensions.size())
val msrAttr = new Array[Attribute](measures.size())
- attributesRaw.foreach { attr =>
+ columnProjection.foreach { attr =>
val carbonDimension =
carbonTable.getDimensionByName(carbonTable.getFactTableName, attr.name)
if(carbonDimension != null) {
@@ -107,10 +107,10 @@ case class CarbonScan(
}
}
- attributesRaw = dimAttr.filter(f => f != null) ++ msrAttr.filter(f => f != null)
+ columnProjection = dimAttr.filter(f => f != null) ++ msrAttr.filter(f => f != null)
var queryOrder: Integer = 0
- attributesRaw.foreach { attr =>
+ columnProjection.foreach { attr =>
val carbonDimension =
carbonTable.getDimensionByName(carbonTable.getFactTableName, attr.name)
if (carbonDimension != null) {
@@ -137,22 +137,11 @@ case class CarbonScan(
}
def inputRdd: CarbonScanRDD[Array[Any]] = {
-
- val conf = new Configuration()
- val absoluteTableIdentifier = carbonTable.getAbsoluteTableIdentifier
-
- // setting queryid
- buildCarbonPlan.setQueryId(ocRaw.getConf("queryId", System.nanoTime() + ""))
-
- val tableCreationTime = carbonCatalog
- .getTableCreationTime(relationRaw.databaseName, relationRaw.tableName)
- val schemaLastUpdatedTime = carbonCatalog
- .getSchemaLastUpdatedTime(relationRaw.databaseName, relationRaw.tableName)
new CarbonScanRDD(
ocRaw.sparkContext,
- attributesRaw,
+ columnProjection,
buildCarbonPlan.getFilterExpression,
- absoluteTableIdentifier,
+ carbonTable.getAbsoluteTableIdentifier,
carbonTable
)
}
@@ -180,7 +169,7 @@ case class CarbonScan(
}
def output: Seq[Attribute] = {
- attributesRaw
+ columnProjection
}
}
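processExtraAttributes above makes two passes over the projection: it first appends any decode-only attributes missing from columnProjection (matched case-insensitively), then rebuilds the list as dimensions followed by measures in table order. A condensed sketch of the first pass, with placeholder column names:

    // Merge decode-only columns into the projection, case-insensitively.
    val projection = Seq("name", "salary")        // hypothetical projection
    val needDecode = Seq("CITY", "NAME")          // hypothetical decode set
    val merged = projection ++ needDecode.filterNot(c =>
      projection.exists(_.equalsIgnoreCase(c)))
    // merged: Seq("name", "salary", "CITY")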
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSparkUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSparkUtil.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSparkUtil.scala
new file mode 100644
index 0000000..4320598
--- /dev/null
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSparkUtil.scala
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.hive.{CarbonMetaData, DictionaryMap}
+
+import org.apache.carbondata.core.carbon.metadata.encoder.Encoding
+import org.apache.carbondata.core.carbon.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.util.CarbonUtil
+
+case class TransformHolder(rdd: Any, metaData: CarbonMetaData)
+
+object CarbonSparkUtil {
+
+ def createSparkMeta(carbonTable: CarbonTable): CarbonMetaData = {
+ val dimensionsAttr = carbonTable.getDimensionByTableName(carbonTable.getFactTableName)
+ .asScala.map(x => x.getColName) // TODO(wf): may be a problem
+ val measureAttr = carbonTable.getMeasureByTableName(carbonTable.getFactTableName)
+ .asScala.map(x => x.getColName)
+ val dictionary =
+ carbonTable.getDimensionByTableName(carbonTable.getFactTableName).asScala.map { f =>
+ (f.getColName.toLowerCase,
+ f.hasEncoding(Encoding.DICTIONARY) && !f.hasEncoding(Encoding.DIRECT_DICTIONARY) &&
+ !CarbonUtil.hasComplexDataType(f.getDataType))
+ }
+ CarbonMetaData(dimensionsAttr, measureAttr, carbonTable, DictionaryMap(dictionary.toMap))
+ }
+}
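The DictionaryMap built by createSparkMeta marks a column as lazily decodable only when all three conditions hold: it is dictionary encoded, it is not direct-dictionary encoded, and its data type is not complex. A condensed sketch of that predicate using a hypothetical column descriptor (the real check reads Encoding flags off CarbonDimension):

    case class Col(name: String, dictionary: Boolean,
        directDictionary: Boolean, complex: Boolean)

    def lazyDecodable(c: Col): Boolean =
      c.dictionary && !c.directDictionary && !c.complex

    val cols = Seq(Col("city", true, false, false), Col("ts", true, true, false))
    val dictionaryMap = cols.map(c => c.name.toLowerCase -> lazyDecodable(c)).toMap
    // dictionaryMap: Map("city" -> true, "ts" -> false)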
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala
index 62f1d4c..47fa82e 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala
@@ -428,7 +428,7 @@ class CarbonSqlParser() extends AbstractSparkSQLParser {
throw new MalformedCarbonCommandException("Invalid table properties")
}
// prepare table model of the collected tokens
- val tableModel: tableModel = prepareTableModel(ifNotExistPresent,
+ val tableModel: TableModel = prepareTableModel(ifNotExistPresent,
dbName,
tableName,
fields,
@@ -502,7 +502,7 @@ class CarbonSqlParser() extends AbstractSparkSQLParser {
protected def prepareTableModel(ifNotExistPresent: Boolean, dbName: Option[String]
, tableName: String, fields: Seq[Field],
partitionCols: Seq[PartitionerField],
- tableProperties: Map[String, String]): tableModel
+ tableProperties: Map[String, String]): TableModel
= {
val (dims: Seq[Field], noDictionaryDims: Seq[String]) = extractDimColsAndNoDictionaryFields(
@@ -530,11 +530,10 @@ class CarbonSqlParser() extends AbstractSparkSQLParser {
// get no inverted index columns from table properties.
val noInvertedIdxCols = extractNoInvertedIndexColumns(fields, tableProperties)
- val partitioner: Option[Partitioner] = getPartitionerObject(partitionCols, tableProperties)
// validate the tableBlockSize from table properties
CommonUtil.validateTableBlockSize(tableProperties)
- tableModel(
+ TableModel(
ifNotExistPresent,
dbName.getOrElse(CarbonCommonConstants.DATABASE_DEFAULT_NAME),
dbName,
@@ -544,7 +543,6 @@ class CarbonSqlParser() extends AbstractSparkSQLParser {
msrs.map(f => normalizeType(f)),
Option(noDictionaryDims),
Option(noInvertedIdxCols),
- partitioner,
groupCols,
Some(colProps))
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark/src/main/scala/org/apache/spark/sql/catalyst/CarbonTableIdentifierImplicit.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/catalyst/CarbonTableIdentifierImplicit.scala b/integration/spark/src/main/scala/org/apache/spark/sql/catalyst/CarbonTableIdentifierImplicit.scala
deleted file mode 100644
index 7884f9d..0000000
--- a/integration/spark/src/main/scala/org/apache/spark/sql/catalyst/CarbonTableIdentifierImplicit.scala
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.catalyst
-
-import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
-
-/**
- * Implicit functions for [TableIdentifier]
- */
-object CarbonTableIdentifierImplicit {
- def apply(tableName: String): TableIdentifier = new TableIdentifier(tableName)
-
- implicit def toTableIdentifier(tableIdentifier: Seq[String]): TableIdentifier = {
- tableIdentifier match {
- case Seq(dbName, tableName) => TableIdentifier(tableName, Some(dbName))
- case Seq(tableName) => TableIdentifier(tableName, None)
- case _ => throw new NoSuchTableException
- }
- }
-
- implicit def toSequence(tableIdentifier: TableIdentifier): Seq[String] = {
- tableIdentifier.database match {
- case Some(dbName) => Seq(dbName, tableIdentifier.table)
- case _ => Seq(tableIdentifier.table)
- }
- }
-}
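With these implicit conversions removed, call sites presumably construct TableIdentifier explicitly. A minimal sketch of the direct construction the implicits used to hide (IllegalArgumentException stands in for the original NoSuchTableException):

    import org.apache.spark.sql.catalyst.TableIdentifier

    def toTableIdentifier(parts: Seq[String]): TableIdentifier = parts match {
      case Seq(dbName, tableName) => TableIdentifier(tableName, Some(dbName))
      case Seq(tableName)         => TableIdentifier(tableName, None)
      case _ => throw new IllegalArgumentException("expected [dbName,] tableName")
    }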