Posted to commits@carbondata.apache.org by ra...@apache.org on 2016/11/30 07:51:42 UTC
[04/14] incubator-carbondata git commit: rebase
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
deleted file mode 100644
index c91cec0..0000000
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
+++ /dev/null
@@ -1,558 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.spark.rdd
-
-import java.io.{DataInputStream, InputStreamReader}
-import java.nio.charset.Charset
-import java.text.SimpleDateFormat
-import java.util.regex.Pattern
-
-import scala.collection.mutable
-import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
-import scala.util.control.Breaks.{break, breakable}
-
-import au.com.bytecode.opencsv.CSVReader
-import org.apache.commons.lang3.{ArrayUtils, StringUtils}
-import org.apache.spark._
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.Row
-
-import org.apache.carbondata.common.factory.CarbonCommonFactory
-import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.carbon.{CarbonTableIdentifier, ColumnIdentifier}
-import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonDimension
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.datastorage.store.impl.FileFactory
-import org.apache.carbondata.core.util.{CarbonProperties, CarbonTimeStatisticsFactory, CarbonUtil}
-import org.apache.carbondata.lcm.locks.{CarbonLockFactory, LockUsage}
-import org.apache.carbondata.processing.model.CarbonLoadModel
-import org.apache.carbondata.spark.load.CarbonLoaderUtil
-import org.apache.carbondata.spark.tasks.{DictionaryWriterTask, SortIndexWriterTask}
-import org.apache.carbondata.spark.util.CarbonScalaUtil
-import org.apache.carbondata.spark.util.GlobalDictionaryUtil
-import org.apache.carbondata.spark.util.GlobalDictionaryUtil._
-
-/**
- * A partitioner that partitions by column.
- *
- * @constructor create a partitioner
- * @param numParts the number of partitions
- */
-class ColumnPartitioner(numParts: Int) extends Partitioner {
- override def numPartitions: Int = numParts
-
- override def getPartition(key: Any): Int = key.asInstanceOf[Int]
-}
-
-trait GenericParser {
- val dimension: CarbonDimension
-
- def addChild(child: GenericParser): Unit
-
- def parseString(input: String): Unit
-}
-
-case class DictionaryStats(distinctValues: java.util.List[String],
- dictWriteTime: Long, sortIndexWriteTime: Long)
-
-case class PrimitiveParser(dimension: CarbonDimension,
- setOpt: Option[HashSet[String]]) extends GenericParser {
- val (hasDictEncoding, set: HashSet[String]) = setOpt match {
- case None => (false, new HashSet[String])
- case Some(x) => (true, x)
- }
-
- def addChild(child: GenericParser): Unit = {
- }
-
- def parseString(input: String): Unit = {
- if (hasDictEncoding && input != null) {
- set.add(input)
- }
- }
-}
-
-case class ArrayParser(dimension: CarbonDimension, format: DataFormat) extends GenericParser {
- var children: GenericParser = _
-
- def addChild(child: GenericParser): Unit = {
- children = child
- }
-
- def parseString(input: String): Unit = {
- if (StringUtils.isNotEmpty(input)) {
- val splits = format.getSplits(input)
- if (ArrayUtils.isNotEmpty(splits)) {
- splits.foreach { s =>
- children.parseString(s)
- }
- }
- }
- }
-}
-
-case class StructParser(dimension: CarbonDimension,
- format: DataFormat) extends GenericParser {
- val children = new ArrayBuffer[GenericParser]
-
- def addChild(child: GenericParser): Unit = {
- children += child
- }
-
- def parseString(input: String): Unit = {
- if (StringUtils.isNotEmpty(input)) {
- val splits = format.getSplits(input)
- val len = Math.min(children.length, splits.length)
- for (i <- 0 until len) {
- children(i).parseString(splits(i))
- }
- }
- }
-}
-
-case class DataFormat(delimiters: Array[String],
- var delimiterIndex: Int,
- patterns: Array[Pattern]) extends Serializable {
- self =>
- def getSplits(input: String): Array[String] = {
- // -1 in case after splitting the last column is empty, the surrogate key has to be generated
- // for the empty value too
- patterns(delimiterIndex).split(input, -1)
- }
-
- def cloneAndIncreaseIndex: DataFormat = {
- DataFormat(delimiters, Math.min(delimiterIndex + 1, delimiters.length - 1), patterns)
- }
-}
-
-/**
- * A case class packaging the attributes needed for dictionary loading.
- */
-case class DictionaryLoadModel(table: CarbonTableIdentifier,
- dimensions: Array[CarbonDimension],
- hdfsLocation: String,
- dictfolderPath: String,
- dictFilePaths: Array[String],
- dictFileExists: Array[Boolean],
- isComplexes: Array[Boolean],
- primDimensions: Array[CarbonDimension],
- delimiters: Array[String],
- highCardIdentifyEnable: Boolean,
- highCardThreshold: Int,
- rowCountPercentage: Double,
- columnIdentifier: Array[ColumnIdentifier],
- isFirstLoad: Boolean,
- hdfsTempLocation: String,
- lockType: String,
- zooKeeperUrl: String,
- serializationNullFormat: String) extends Serializable
-
-case class ColumnDistinctValues(values: Array[String], rowCount: Long) extends Serializable
-
-/**
- * An RDD to combine all dictionary distinct values.
- *
- * @constructor create a RDD with RDD[(String, Iterable[String])]
- * @param prev the input RDD[(String, Iterable[String])]
- * @param model a model packaging the load info
- */
-class CarbonAllDictionaryCombineRDD(
- prev: RDD[(String, Iterable[String])],
- model: DictionaryLoadModel)
- extends RDD[(Int, ColumnDistinctValues)](prev) {
-
- override def getPartitions: Array[Partition] = {
- firstParent[(String, Iterable[String])].partitions
- }
-
- override def compute(split: Partition, context: TaskContext
- ): Iterator[(Int, ColumnDistinctValues)] = {
- val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
-
- val distinctValuesList = new ArrayBuffer[(Int, HashSet[String])]
- /*
- * for all-dictionary loading, every column needs encoding and the
- * isHighCardinalityColumn check, so there is no need to calculate the row count
- */
- val rowCount = 0L
- try {
- val dimensionParsers =
- GlobalDictionaryUtil.createDimensionParsers(model, distinctValuesList)
- val dimNum = model.dimensions.length
- // Map[dimColName -> dimColNameIndex]
- val columnIndexMap = new HashMap[String, Int]()
- for (j <- 0 until dimNum) {
- columnIndexMap.put(model.dimensions(j).getColName, j)
- }
-
- var row: (String, Iterable[String]) = null
- val rddIter = firstParent[(String, Iterable[String])].iterator(split, context)
- // generate block distinct value set
- while (rddIter.hasNext) {
- row = rddIter.next()
- if (row != null) {
- columnIndexMap.get(row._1) match {
- case Some(index) =>
- for (record <- row._2) {
- dimensionParsers(index).parseString(record)
- }
- case None =>
- }
- }
- }
- } catch {
- case ex: Exception =>
- LOGGER.error(ex)
- throw ex
- }
-
- distinctValuesList.map { iter =>
- val valueList = iter._2.toArray
- (iter._1, ColumnDistinctValues(valueList, rowCount))
- }.iterator
- }
-}
-
-/**
- * An RDD to combine distinct values within a block.
- *
- * @constructor create a RDD with RDD[Row]
- * @param prev the input RDD[Row]
- * @param model a model packaging the load info
- */
-class CarbonBlockDistinctValuesCombineRDD(
- prev: RDD[Row],
- model: DictionaryLoadModel)
- extends RDD[(Int, ColumnDistinctValues)](prev) {
-
- override def getPartitions: Array[Partition] = firstParent[Row].partitions
-
- override def compute(split: Partition,
- context: TaskContext): Iterator[(Int, ColumnDistinctValues)] = {
- val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
- CarbonTimeStatisticsFactory.getLoadStatisticsInstance.recordLoadCsvfilesToDfTime()
- val distinctValuesList = new ArrayBuffer[(Int, HashSet[String])]
- var rowCount = 0L
- try {
- val dimensionParsers =
- GlobalDictionaryUtil.createDimensionParsers(model, distinctValuesList)
- val dimNum = model.dimensions.length
- var row: Row = null
- val rddIter = firstParent[Row].iterator(split, context)
- val formatString = CarbonProperties.getInstance().getProperty(CarbonCommonConstants
- .CARBON_TIMESTAMP_FORMAT, CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
- val format = new SimpleDateFormat(formatString)
- // generate block distinct value set
- while (rddIter.hasNext) {
- row = rddIter.next()
- if (row != null) {
- rowCount += 1
- for (i <- 0 until dimNum) {
- dimensionParsers(i).parseString(CarbonScalaUtil.getString(row.get(i),
- model.serializationNullFormat, model.delimiters(0), model.delimiters(1), format))
- }
- }
- }
- CarbonTimeStatisticsFactory.getLoadStatisticsInstance.recordLoadCsvfilesToDfTime()
- } catch {
- case ex: Exception =>
- LOGGER.error(ex)
- throw ex
- }
-
- distinctValuesList.map { iter =>
- val valueList = iter._2.toArray
- (iter._1, ColumnDistinctValues(valueList, rowCount))
- }.iterator
- }
-}
-
-/**
- * An RDD to generate a dictionary file for each column
- *
- * @constructor create an RDD with RDD[(Int, ColumnDistinctValues)]
- * @param prev the input RDD[(Int, ColumnDistinctValues)]
- * @param model a model packaging the load info
- */
-class CarbonGlobalDictionaryGenerateRDD(
- prev: RDD[(Int, ColumnDistinctValues)],
- model: DictionaryLoadModel)
- extends RDD[(Int, String, Boolean)](prev) {
-
- override def getPartitions: Array[Partition] = firstParent[(Int, ColumnDistinctValues)].partitions
-
- override def compute(split: Partition, context: TaskContext): Iterator[(Int, String, Boolean)] = {
- val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
- var status = CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS
- var isHighCardinalityColumn = false
- val iter = new Iterator[(Int, String, Boolean)] {
- var dictionaryForDistinctValueLookUp:
- org.apache.carbondata.core.cache.dictionary.Dictionary = _
- var dictionaryForSortIndexWriting: org.apache.carbondata.core.cache.dictionary.Dictionary = _
- var dictionaryForDistinctValueLookUpCleared: Boolean = false
- val pathService = CarbonCommonFactory.getPathService
- val carbonTablePath = pathService.getCarbonTablePath(model.hdfsLocation, model.table)
- if (StringUtils.isNotBlank(model.hdfsTempLocation )) {
- CarbonProperties.getInstance.addProperty(CarbonCommonConstants.HDFS_TEMP_LOCATION,
- model.hdfsTempLocation)
- }
- if (StringUtils.isNotBlank(model.lockType)) {
- CarbonProperties.getInstance.addProperty(CarbonCommonConstants.LOCK_TYPE,
- model.lockType)
- }
- if (StringUtils.isNotBlank(model.zooKeeperUrl)) {
- CarbonProperties.getInstance.addProperty(CarbonCommonConstants.ZOOKEEPER_URL,
- model.zooKeeperUrl)
- }
- val dictLock = CarbonLockFactory
- .getCarbonLockObj(carbonTablePath.getRelativeDictionaryDirectory,
- model.columnIdentifier(split.index).getColumnId + LockUsage.LOCK)
- var isDictionaryLocked = false
- // generate distinct value list
- try {
- val t1 = System.currentTimeMillis
- val valuesBuffer = new mutable.HashSet[String]
- val rddIter = firstParent[(Int, ColumnDistinctValues)].iterator(split, context)
- var rowCount = 0L
- CarbonTimeStatisticsFactory.getLoadStatisticsInstance.recordDicShuffleAndWriteTime()
- breakable {
- while (rddIter.hasNext) {
- val distinctValueList = rddIter.next()._2
- valuesBuffer ++= distinctValueList.values
- rowCount += distinctValueList.rowCount
- // check high cardinality
- if (model.isFirstLoad && model.highCardIdentifyEnable
- && !model.isComplexes(split.index)
- && model.dimensions(split.index).isColumnar) {
- isHighCardinalityColumn = GlobalDictionaryUtil.isHighCardinalityColumn(
- valuesBuffer.size, rowCount, model)
- if (isHighCardinalityColumn) {
- break
- }
- }
- }
- }
- val combineListTime = System.currentTimeMillis() - t1
- if (isHighCardinalityColumn) {
- LOGGER.info(s"column ${ model.table.getTableUniqueName }." +
- s"${
- model.primDimensions(split.index)
- .getColName
- } is high cardinality column")
- } else {
- isDictionaryLocked = dictLock.lockWithRetries()
- if (isDictionaryLocked) {
- logInfo(s"Successfully able to get the dictionary lock for ${
- model.primDimensions(split.index).getColName
- }")
- } else {
- sys
- .error(s"Dictionary file ${
- model.primDimensions(split.index).getColName
- } is locked for update. Please try again after some time")
- }
- val t2 = System.currentTimeMillis
- val fileType = FileFactory.getFileType(model.dictFilePaths(split.index))
- model.dictFileExists(split.index) = FileFactory
- .isFileExist(model.dictFilePaths(split.index), fileType)
- dictionaryForDistinctValueLookUp = if (model.dictFileExists(split.index)) {
- CarbonLoaderUtil.getDictionary(model.table,
- model.columnIdentifier(split.index),
- model.hdfsLocation,
- model.primDimensions(split.index).getDataType
- )
- } else {
- null
- }
- val dictCacheTime = System.currentTimeMillis - t2
- val t3 = System.currentTimeMillis()
- val dictWriteTask = new DictionaryWriterTask(valuesBuffer,
- dictionaryForDistinctValueLookUp,
- model,
- split.index)
- // execute dictionary writer task to get distinct values
- val distinctValues = dictWriteTask.execute()
- val dictWriteTime = System.currentTimeMillis() - t3
- val t4 = System.currentTimeMillis()
- // if new data came then rewrite the sort index file
- if (distinctValues.size() > 0) {
- val sortIndexWriteTask = new SortIndexWriterTask(model,
- split.index,
- dictionaryForDistinctValueLookUp,
- distinctValues)
- sortIndexWriteTask.execute()
- }
- val sortIndexWriteTime = System.currentTimeMillis() - t4
- CarbonTimeStatisticsFactory.getLoadStatisticsInstance.recordDicShuffleAndWriteTime()
- // After sortIndex writing, update dictionaryMeta
- dictWriteTask.updateMetaData()
- // clear the value buffer after writing dictionary data
- valuesBuffer.clear
- CarbonUtil.clearDictionaryCache(dictionaryForDistinctValueLookUp)
- dictionaryForDistinctValueLookUpCleared = true
- LOGGER.info(s"\n columnName: ${ model.primDimensions(split.index).getColName }" +
- s"\n columnId: ${ model.primDimensions(split.index).getColumnId }" +
- s"\n new distinct values count: ${ distinctValues.size() }" +
- s"\n combine lists: $combineListTime" +
- s"\n create dictionary cache: $dictCacheTime" +
- s"\n sort list, distinct and write: $dictWriteTime" +
- s"\n write sort info: $sortIndexWriteTime")
- }
- } catch {
- case ex: Exception =>
- LOGGER.error(ex)
- throw ex
- } finally {
- if (!dictionaryForDistinctValueLookUpCleared) {
- CarbonUtil.clearDictionaryCache(dictionaryForDistinctValueLookUp)
- }
- CarbonUtil.clearDictionaryCache(dictionaryForSortIndexWriting)
- if (dictLock != null && isDictionaryLocked) {
- if (dictLock.unlock()) {
- logInfo(s"Dictionary ${
- model.primDimensions(split.index).getColName
- } Unlocked Successfully.")
- } else {
- logError(s"Unable to unlock Dictionary ${
- model.primDimensions(split.index).getColName
- }")
- }
- }
- }
- var finished = false
-
- override def hasNext: Boolean = {
-
- if (!finished) {
- finished = true
- finished
- } else {
- !finished
- }
- }
-
- override def next(): (Int, String, Boolean) = {
- (split.index, status, isHighCardinalityColumn)
- }
- }
-
- iter
- }
-
-}
-
-/**
- * Partition for a column having a predefined dictionary
- *
- * @param id partition id
- * @param dimension current carbon dimension
- */
-class CarbonColumnDictPatition(id: Int, dimension: CarbonDimension)
- extends Partition {
- override val index: Int = id
- val preDefDictDimension = dimension
-}
-
-
-/**
- * Use external column dict to generate global dictionary
- *
- * @param carbonLoadModel carbon load model
- * @param sparkContext spark context
- * @param table carbon table identifier
- * @param dimensions carbon dimensions having predefined dict
- * @param hdfsLocation carbon base store path
- * @param dictFolderPath path of dictionary folder
- */
-class CarbonColumnDictGenerateRDD(carbonLoadModel: CarbonLoadModel,
- dictionaryLoadModel: DictionaryLoadModel,
- sparkContext: SparkContext,
- table: CarbonTableIdentifier,
- dimensions: Array[CarbonDimension],
- hdfsLocation: String,
- dictFolderPath: String)
- extends RDD[(Int, ColumnDistinctValues)](sparkContext, Nil) {
-
- override def getPartitions: Array[Partition] = {
- val primDimensions = dictionaryLoadModel.primDimensions
- val primDimLength = primDimensions.length
- val result = new Array[Partition](primDimLength)
- for (i <- 0 until primDimLength) {
- result(i) = new CarbonColumnDictPatition(i, primDimensions(i))
- }
- result
- }
-
- override def compute(split: Partition, context: TaskContext)
- : Iterator[(Int, ColumnDistinctValues)] = {
- val theSplit = split.asInstanceOf[CarbonColumnDictPatition]
- val primDimension = theSplit.preDefDictDimension
- // read the column dict data
- val preDefDictFilePath = carbonLoadModel.getPredefDictFilePath(primDimension)
- var csvReader: CSVReader = null
- var inputStream: DataInputStream = null
- var colDictData: java.util.Iterator[Array[String]] = null
- try {
- inputStream = FileFactory.getDataInputStream(preDefDictFilePath,
- FileFactory.getFileType(preDefDictFilePath))
- csvReader = new CSVReader(new InputStreamReader(inputStream, Charset.defaultCharset),
- carbonLoadModel.getCsvDelimiter.charAt(0))
- // read the column data to list iterator
- colDictData = csvReader.readAll.iterator
- } catch {
- case ex: Exception =>
- logError(s"Error in reading pre-defined " +
- s"dictionary file:${ ex.getMessage }")
- throw ex
- } finally {
- if (csvReader != null) {
- try {
- csvReader.close()
- } catch {
- case ex: Exception =>
- logError(s"Error in closing csvReader of " +
- s"pre-defined dictionary file:${ ex.getMessage }")
- }
- }
- if (inputStream != null) {
- try {
- inputStream.close()
- } catch {
- case ex: Exception =>
- logError(s"Error in closing inputStream of " +
- s"pre-defined dictionary file:${ ex.getMessage }")
- }
- }
- }
- val mapIdWithSet = new HashMap[String, HashSet[String]]
- val columnValues = new HashSet[String]
- val distinctValues = (theSplit.index, columnValues)
- mapIdWithSet.put(primDimension.getColumnId, columnValues)
- // use parser to generate new dict value
- val dimensionParser = GlobalDictionaryUtil.generateParserForDimension(
- Some(primDimension),
- createDataFormat(carbonLoadModel.getDelimiters),
- mapIdWithSet).get
- // parse the column data
- while (colDictData.hasNext) {
- dimensionParser.parseString(colDictData.next()(0))
- }
- Array((distinctValues._1,
- ColumnDistinctValues(distinctValues._2.toArray, 0L))).iterator
- }
-}
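
The split limit of -1 in DataFormat.getSplits above is easy to miss; the following is a minimal standalone sketch (plain java.util.regex, not part of this commit, with the dollar sign as an assumed complex-type delimiter) of the behaviour it relies on:

import java.util.regex.Pattern

object SplitLimitSketch {
  def main(args: Array[String]): Unit = {
    val pattern = Pattern.compile(Pattern.quote("$"))
    // the default limit drops trailing empty strings
    println(pattern.split("a$b$").toList)      // List(a, b)
    // limit -1 keeps them, so an empty last column still gets a surrogate key
    println(pattern.split("a$b$", -1).toList)  // three elements, the last one empty
  }
}
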
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
deleted file mode 100644
index 249f2cd..0000000
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
+++ /dev/null
@@ -1,344 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.spark.rdd
-
-import java.util
-import java.util.{Collections, List}
-
-import scala.collection.JavaConverters._
-import scala.collection.mutable
-import scala.util.Random
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.mapred.JobConf
-import org.apache.hadoop.mapreduce.Job
-import org.apache.spark._
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.CarbonEnv
-import org.apache.spark.sql.execution.command.{CarbonMergerMapping, NodeInfo}
-import org.apache.spark.sql.hive.DistributionUtil
-
-import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.carbon.{AbsoluteTableIdentifier, CarbonTableIdentifier}
-import org.apache.carbondata.core.carbon.datastore.block.{Distributable, SegmentProperties, TaskBlockInfo}
-import org.apache.carbondata.core.carbon.metadata.blocklet.DataFileFooter
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, CarbonUtilException}
-import org.apache.carbondata.hadoop.{CarbonInputFormat, CarbonInputSplit, CarbonMultiBlockSplit}
-import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil
-import org.apache.carbondata.integration.spark.merger.{CarbonCompactionExecutor, CarbonCompactionUtil, RowResultMerger}
-import org.apache.carbondata.processing.model.CarbonLoadModel
-import org.apache.carbondata.processing.util.CarbonDataProcessorUtil
-import org.apache.carbondata.scan.result.iterator.RawResultIterator
-import org.apache.carbondata.spark.MergeResult
-import org.apache.carbondata.spark.load.CarbonLoaderUtil
-import org.apache.carbondata.spark.splits.TableSplit
-
-
-class CarbonMergerRDD[K, V](
- sc: SparkContext,
- result: MergeResult[K, V],
- carbonLoadModel: CarbonLoadModel,
- carbonMergerMapping: CarbonMergerMapping,
- confExecutorsTemp: String)
- extends RDD[(K, V)](sc, Nil) {
-
- sc.setLocalProperty("spark.scheduler.pool", "DDL")
- sc.setLocalProperty("spark.job.interruptOnCancel", "true")
-
- var storeLocation: String = null
- val storePath = carbonMergerMapping.storePath
- val metadataFilePath = carbonMergerMapping.metadataFilePath
- val mergedLoadName = carbonMergerMapping.mergedLoadName
- val databaseName = carbonMergerMapping.databaseName
- val factTableName = carbonMergerMapping.factTableName
- val tableId = carbonMergerMapping.tableId
-
- override def compute(theSplit: Partition, context: TaskContext): Iterator[(K, V)] = {
- val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
- val iter = new Iterator[(K, V)] {
-
- carbonLoadModel.setTaskNo(String.valueOf(theSplit.index))
- val tempLocationKey: String = CarbonCommonConstants
- .COMPACTION_KEY_WORD + '_' + carbonLoadModel
- .getDatabaseName + '_' + carbonLoadModel
- .getTableName + '_' + carbonLoadModel.getTaskNo
-
- // this property determines whether the temp location for carbon is inside the
- // container temp dir or the yarn application directory.
- val carbonUseLocalDir = CarbonProperties.getInstance()
- .getProperty("carbon.use.local.dir", "false")
-
- if (carbonUseLocalDir.equalsIgnoreCase("true")) {
-
- val storeLocations = CarbonLoaderUtil.getConfiguredLocalDirs(SparkEnv.get.conf)
- if (null != storeLocations && storeLocations.nonEmpty) {
- storeLocation = storeLocations(Random.nextInt(storeLocations.length))
- }
- if (storeLocation == null) {
- storeLocation = System.getProperty("java.io.tmpdir")
- }
- } else {
- storeLocation = System.getProperty("java.io.tmpdir")
- }
- storeLocation = storeLocation + '/' + System.nanoTime() + '/' + theSplit.index
- CarbonProperties.getInstance().addProperty(tempLocationKey, storeLocation)
- LOGGER.info(s"Temp storeLocation taken is $storeLocation")
- var mergeStatus = false
- var mergeNumber = ""
- var exec: CarbonCompactionExecutor = null
- try {
- val carbonSparkPartition = theSplit.asInstanceOf[CarbonSparkPartition]
-
- // get the destination segment properties sent from the driver; these belong to the last segment.
-
- val segmentProperties = new SegmentProperties(
- carbonMergerMapping.maxSegmentColumnSchemaList.asJava,
- carbonMergerMapping.maxSegmentColCardinality)
-
- // sorting the table block info List.
- val splitList = carbonSparkPartition.split.value.getAllSplits
- val tableBlockInfoList = CarbonInputSplit.createBlocks(splitList)
-
- Collections.sort(tableBlockInfoList)
-
- val segmentMapping: java.util.Map[String, TaskBlockInfo] =
- CarbonCompactionUtil.createMappingForSegments(tableBlockInfoList)
-
- val dataFileMetadataSegMapping: java.util.Map[String, List[DataFileFooter]] =
- CarbonCompactionUtil.createDataFileFooterMappingForSegments(tableBlockInfoList)
-
- carbonLoadModel.setStorePath(storePath)
-
- exec = new CarbonCompactionExecutor(segmentMapping, segmentProperties, databaseName,
- factTableName, storePath, carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable,
- dataFileMetadataSegMapping
- )
-
- // fire a query and get the results.
- var result2: util.List[RawResultIterator] = null
- try {
- result2 = exec.processTableBlocks()
- } catch {
- case e: Throwable =>
- if (null != exec) {
- exec.finish()
- }
- LOGGER.error(e)
- if (null != e.getMessage) {
- sys.error(s"Exception occurred in query execution :: ${ e.getMessage }")
- } else {
- sys.error("Exception occurred in query execution.Please check logs.")
- }
- }
- mergeNumber = mergedLoadName
- .substring(mergedLoadName.lastIndexOf(CarbonCommonConstants.LOAD_FOLDER) +
- CarbonCommonConstants.LOAD_FOLDER.length(), mergedLoadName.length()
- )
-
- val tempStoreLoc = CarbonDataProcessorUtil.getLocalDataFolderLocation(databaseName,
- factTableName,
- carbonLoadModel.getTaskNo,
- "0",
- mergeNumber,
- true
- )
-
- carbonLoadModel.setSegmentId(mergeNumber)
- carbonLoadModel.setPartitionId("0")
- val merger =
- new RowResultMerger(result2,
- databaseName,
- factTableName,
- segmentProperties,
- tempStoreLoc,
- carbonLoadModel,
- carbonMergerMapping.maxSegmentColCardinality
- )
- mergeStatus = merger.mergerSlice()
-
- } catch {
- case e: Exception =>
- LOGGER.error(e)
- throw e
- } finally {
- // delete temp location data
- val newSlice = CarbonCommonConstants.LOAD_FOLDER + mergeNumber
- try {
- val isCompactionFlow = true
- CarbonLoaderUtil
- .deleteLocalDataLoadFolderLocation(carbonLoadModel, isCompactionFlow)
- } catch {
- case e: Exception =>
- LOGGER.error(e)
- }
- if (null != exec) {
- exec.finish
- }
- }
-
- var finished = false
-
- override def hasNext: Boolean = {
- if (!finished) {
- finished = true
- finished
- } else {
- !finished
- }
- }
-
- override def next(): (K, V) = {
- finished = true
- result.getKey(0, mergeStatus)
- }
-
- }
- iter
- }
-
- override def getPreferredLocations(split: Partition): Seq[String] = {
- val theSplit = split.asInstanceOf[CarbonSparkPartition]
- theSplit.split.value.getLocations.filter(_ != "localhost")
- }
-
- override def getPartitions: Array[Partition] = {
- val startTime = System.currentTimeMillis()
- val absoluteTableIdentifier: AbsoluteTableIdentifier = new AbsoluteTableIdentifier(
- storePath, new CarbonTableIdentifier(databaseName, factTableName, tableId)
- )
- val jobConf: JobConf = new JobConf(new Configuration)
- val job: Job = new Job(jobConf)
- val format = CarbonInputFormatUtil.createCarbonInputFormat(absoluteTableIdentifier, job)
- var defaultParallelism = sparkContext.defaultParallelism
- val result = new util.ArrayList[Partition](defaultParallelism)
-
- // mapping of the node and block list.
- var nodeBlockMapping: util.Map[String, util.List[Distributable]] = new
- util.HashMap[String, util.List[Distributable]]
-
- val noOfBlocks = 0
- var carbonInputSplits = mutable.Seq[CarbonInputSplit]()
-
- // for each valid segment.
- for (eachSeg <- carbonMergerMapping.validSegments) {
-
- // restrict the input format to the current segment
- job.getConfiguration.set(CarbonInputFormat.INPUT_SEGMENT_NUMBERS, eachSeg)
-
- // get splits
- val splits = format.getSplits(job)
- carbonInputSplits ++:= splits.asScala.map(_.asInstanceOf[CarbonInputSplit])
- }
-
- // prepare the details required to extract the segment properties using last segment.
- if (null != carbonInputSplits && carbonInputSplits.nonEmpty) {
- val carbonInputSplit = carbonInputSplits.last
- var dataFileFooter: DataFileFooter = null
-
- try {
- dataFileFooter = CarbonUtil.readMetadatFile(carbonInputSplit.getPath.toString(),
- carbonInputSplit.getStart, carbonInputSplit.getLength)
- } catch {
- case e: CarbonUtilException =>
- logError("Exception in preparing the data file footer for compaction " + e.getMessage)
- throw e
- }
-
- carbonMergerMapping.maxSegmentColCardinality = dataFileFooter.getSegmentInfo
- .getColumnCardinality
- carbonMergerMapping.maxSegmentColumnSchemaList = dataFileFooter.getColumnInTable.asScala
- .toList
- }
- // send complete list of blocks to the mapping util.
- nodeBlockMapping = CarbonLoaderUtil.nodeBlockMapping(
- carbonInputSplits.map(_.asInstanceOf[Distributable]).asJava, -1)
-
- val confExecutors = confExecutorsTemp.toInt
- val requiredExecutors = if (nodeBlockMapping.size > confExecutors) {
- confExecutors
- } else { nodeBlockMapping.size() }
- CarbonEnv.ensureExecutors(sparkContext, requiredExecutors)
- logInfo("No.of Executors required=" + requiredExecutors +
- " , spark.executor.instances=" + confExecutors +
- ", no.of.nodes where data present=" + nodeBlockMapping.size())
- var nodes = DistributionUtil.getNodeList(sparkContext)
- var maxTimes = 30
- while (nodes.length < requiredExecutors && maxTimes > 0) {
- Thread.sleep(500)
- nodes = DistributionUtil.getNodeList(sparkContext)
- maxTimes = maxTimes - 1
- }
- logInfo("Time taken to wait for executor allocation is =" + ((30 - maxTimes) * 500) + "millis")
- defaultParallelism = sparkContext.defaultParallelism
- var i = 0
-
- val nodeTaskBlocksMap = new util.HashMap[String, util.List[NodeInfo]]()
-
- // Create Spark Partition for each task and assign blocks
- nodeBlockMapping.asScala.foreach { case (nodeName, blockList) =>
- val taskBlockList = new util.ArrayList[NodeInfo](0)
- nodeTaskBlocksMap.put(nodeName, taskBlockList)
- var blockletCount = 0
- blockList.asScala.foreach { taskInfo =>
- val blocksPerNode = taskInfo.asInstanceOf[CarbonInputSplit]
- blockletCount = blockletCount + blocksPerNode.getNumberOfBlocklets
- taskBlockList.add(
- NodeInfo(blocksPerNode.taskId, blocksPerNode.getNumberOfBlocklets))
- }
- if (blockletCount != 0) {
- val multiBlockSplit = new CarbonMultiBlockSplit(absoluteTableIdentifier,
- carbonInputSplits.asJava, nodeName)
- result.add(new CarbonSparkPartition(id, i, multiBlockSplit))
- i += 1
- }
- }
-
- // print the node info along with task and number of blocks for the task.
-
- nodeTaskBlocksMap.asScala.foreach((entry: (String, List[NodeInfo])) => {
- logInfo(s"for the node ${ entry._1 }")
- for (elem <- entry._2.asScala) {
- logInfo("Task ID is " + elem.TaskId + "no. of blocks is " + elem.noOfBlocks)
- }
- })
-
- val noOfNodes = nodes.length
- val noOfTasks = result.size
- logInfo(s"Identified no.of.Blocks: $noOfBlocks," +
- s"parallelism: $defaultParallelism , no.of.nodes: $noOfNodes, no.of.tasks: $noOfTasks")
- logInfo("Time taken to identify Blocks to scan : " + (System.currentTimeMillis() - startTime))
- for (j <- 0 until result.size ) {
- val multiBlockSplit = result.get(j).asInstanceOf[CarbonSparkPartition].split.value
- val splitList = multiBlockSplit.getAllSplits
- logInfo(s"Node: ${multiBlockSplit.getLocations.mkString(",")}, No.Of Blocks: " +
- s"${CarbonInputSplit.createBlocks(splitList).size}")
- }
- result.toArray(new Array[Partition](result.size))
- }
-
-}
-
-class CarbonLoadPartition(rddId: Int, val idx: Int, @transient val tableSplit: TableSplit)
- extends Partition {
-
- override val index: Int = idx
- val serializableHadoopSplit = new SerializableWritable[TableSplit](tableSplit)
-
- override def hashCode(): Int = 41 * (41 + rddId) + idx
-}
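
The bounded wait in getPartitions above (30 attempts, 500 ms apart) is a plain polling loop; below is a hedged standalone sketch of the same pattern, where the node-discovery callback is an assumed stand-in for DistributionUtil.getNodeList:

object ExecutorWaitSketch {
  // Poll for nodes until enough are available or the attempts run out (~15 seconds here).
  def waitForNodes(required: Int, currentNodes: () => Seq[String],
      maxAttempts: Int = 30, intervalMs: Long = 500): Seq[String] = {
    var nodes = currentNodes()
    var attemptsLeft = maxAttempts
    while (nodes.length < required && attemptsLeft > 0) {
      Thread.sleep(intervalMs)
      nodes = currentNodes()
      attemptsLeft -= 1
    }
    nodes
  }

  def main(args: Array[String]): Unit = {
    // simulated discovery: one more node shows up on every poll
    var discovered = List("node-0")
    val nodes = waitForNodes(3, () => { discovered = s"node-${discovered.size}" :: discovered; discovered })
    println(nodes)
  }
}
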
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
index e8d7399..cae99d1 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
@@ -47,7 +47,7 @@ class CarbonSparkPartition(
val rddId: Int,
val idx: Int,
@transient val multiBlockSplit: CarbonMultiBlockSplit)
- extends Partition {
+ extends Partition {
val split = new SerializableWritable[CarbonMultiBlockSplit](multiBlockSplit)
@@ -67,9 +67,9 @@ class CarbonScanRDD[V: ClassTag](
filterExpression: Expression,
identifier: AbsoluteTableIdentifier,
@transient carbonTable: CarbonTable)
- extends RDD[V](sc, Nil)
- with CarbonHadoopMapReduceUtil
- with Logging {
+ extends RDD[V](sc, Nil)
+ with CarbonHadoopMapReduceUtil
+ with Logging {
private val queryId = sparkContext.getConf.get("queryId", System.nanoTime() + "")
private val jobTrackerId: String = {
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala
deleted file mode 100644
index 6c6076e..0000000
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.spark.rdd
-
-import scala.collection.JavaConverters._
-
-import org.apache.spark.sql.execution.command.{CarbonMergerMapping, CompactionCallableModel}
-
-import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.carbon.{AbsoluteTableIdentifier, CarbonTableIdentifier}
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.lcm.status.SegmentStatusManager
-import org.apache.carbondata.spark.MergeResultImpl
-import org.apache.carbondata.spark.load.CarbonLoaderUtil
-import org.apache.carbondata.spark.merger.CarbonDataMergerUtil
-
-/**
- * Compactor class which handled the compaction cases.
- */
-object Compactor {
-
- val logger = LogServiceFactory.getLogService(Compactor.getClass.getName)
-
- def triggerCompaction(compactionCallableModel: CompactionCallableModel): Unit = {
-
- val storePath = compactionCallableModel.storePath
- val storeLocation = compactionCallableModel.storeLocation
- val carbonTable = compactionCallableModel.carbonTable
- val kettleHomePath = compactionCallableModel.kettleHomePath
- val cubeCreationTime = compactionCallableModel.cubeCreationTime
- val loadsToMerge = compactionCallableModel.loadsToMerge
- val sc = compactionCallableModel.sqlContext
- val carbonLoadModel = compactionCallableModel.carbonLoadModel
- val compactionType = compactionCallableModel.compactionType
-
- val startTime = System.nanoTime()
- val mergedLoadName = CarbonDataMergerUtil.getMergedLoadName(loadsToMerge)
- var finalMergeStatus = false
- val schemaName: String = carbonLoadModel.getDatabaseName
- val factTableName = carbonLoadModel.getTableName
- val validSegments: Array[String] = CarbonDataMergerUtil
- .getValidSegments(loadsToMerge).split(',')
- val mergeLoadStartTime = CarbonLoaderUtil.readCurrentTime()
- val carbonMergerMapping = CarbonMergerMapping(storeLocation,
- storePath,
- carbonTable.getMetaDataFilepath,
- mergedLoadName,
- kettleHomePath,
- cubeCreationTime,
- schemaName,
- factTableName,
- validSegments,
- carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier.getTableId,
- maxSegmentColCardinality = null,
- maxSegmentColumnSchemaList = null
- )
- carbonLoadModel.setStorePath(carbonMergerMapping.storePath)
- carbonLoadModel.setLoadMetadataDetails(
- SegmentStatusManager.readLoadMetadata(carbonTable.getMetaDataFilepath).toList.asJava)
- var execInstance = "1"
- // in case of non-dynamic executor allocation, the number of executors is fixed.
- if (sc.sparkContext.getConf.contains("spark.executor.instances")) {
- execInstance = sc.sparkContext.getConf.get("spark.executor.instances")
- logger.info(s"spark.executor.instances property is set to = $execInstance")
- } // in case of dynamic executor allocation, take the configured maximum number of executors.
- else if (sc.sparkContext.getConf.contains("spark.dynamicAllocation.enabled")) {
- if (sc.sparkContext.getConf.get("spark.dynamicAllocation.enabled").trim
- .equalsIgnoreCase("true")) {
- execInstance = sc.sparkContext.getConf.get("spark.dynamicAllocation.maxExecutors")
- logger.info(s"spark.dynamicAllocation.maxExecutors property is set to = $execInstance")
- }
- }
-
- val mergeStatus = new CarbonMergerRDD(
- sc.sparkContext,
- new MergeResultImpl(),
- carbonLoadModel,
- carbonMergerMapping,
- execInstance
- ).collect
-
- if (mergeStatus.length == 0) {
- finalMergeStatus = false
- } else {
- finalMergeStatus = mergeStatus.forall(_._2)
- }
-
- if (finalMergeStatus) {
- val endTime = System.nanoTime()
- logger.info(s"time taken to merge $mergedLoadName is ${ endTime - startTime }")
- if (!CarbonDataMergerUtil
- .updateLoadMetadataWithMergeStatus(loadsToMerge, carbonTable.getMetaDataFilepath,
- mergedLoadName, carbonLoadModel, mergeLoadStartTime, compactionType
- )) {
- logger.audit(s"Compaction request failed for table ${ carbonLoadModel.getDatabaseName }." +
- s"${ carbonLoadModel.getTableName }")
- logger.error(s"Compaction request failed for table ${ carbonLoadModel.getDatabaseName }." +
- s"${ carbonLoadModel.getTableName }")
- throw new Exception(s"Compaction failed to update metadata for table" +
- s" ${ carbonLoadModel.getDatabaseName }." +
- s"${ carbonLoadModel.getTableName }")
- } else {
- logger.audit(s"Compaction request completed for table " +
- s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
- logger.info("Compaction request completed for table ${ carbonLoadModel.getDatabaseName } " +
- s".${ carbonLoadModel.getTableName }")
- }
- } else {
- logger.audit("Compaction request failed for table ${ carbonLoadModel.getDatabaseName } " +
- s".${ carbonLoadModel.getTableName }"
- )
- logger.error("Compaction request failed for table ${ carbonLoadModel.getDatabaseName } " +
- s".${ carbonLoadModel.getTableName }")
- throw new Exception("Compaction Failure in Merger Rdd.")
- }
- }
-}
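
The final status computed above collapses the per-task results collected from CarbonMergerRDD; the sketch below shows that rule in isolation, assuming the (taskId, success) tuple shape produced via MergeResultImpl:

object MergeStatusSketch {
  // An empty result means no task reported back and counts as failure;
  // otherwise every task must have reported success.
  def overallStatus(mergeStatus: Array[(Int, Boolean)]): Boolean =
    mergeStatus.nonEmpty && mergeStatus.forall(_._2)

  def main(args: Array[String]): Unit = {
    println(overallStatus(Array.empty))                      // false
    println(overallStatus(Array((0, true), (1, true))))      // true
    println(overallStatus(Array((0, true), (1, false))))     // false
  }
}
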
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
index 3a393ed..67d1ce0 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
@@ -51,7 +51,6 @@ class NewCarbonDataLoadRDD[K, V](
sc: SparkContext,
result: DataLoadResult[K, V],
carbonLoadModel: CarbonLoadModel,
- partitioner: Partitioner,
loadCount: Integer,
blocksGroupBy: Array[(String, Array[BlockDetails])],
isTableSplitPartition: Boolean)
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark/src/main/scala/org/apache/carbondata/spark/tasks/DictionaryWriterTask.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/tasks/DictionaryWriterTask.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/tasks/DictionaryWriterTask.scala
deleted file mode 100644
index e23b58d..0000000
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/tasks/DictionaryWriterTask.scala
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.spark.tasks
-
-import java.io.IOException
-
-import scala.collection.mutable
-
-import org.apache.carbondata.common.factory.CarbonCommonFactory
-import org.apache.carbondata.core.cache.dictionary.Dictionary
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.util.DataTypeUtil
-import org.apache.carbondata.core.writer.CarbonDictionaryWriter
-import org.apache.carbondata.spark.rdd.DictionaryLoadModel
-
-/**
- * This task sorts the distinct values of a column and writes new ones to the dictionary file
- *
- * @param valuesBuffer distinct values collected for the column
- * @param dictionary existing dictionary used to check for already written values
- * @param model dictionary load model
- * @param columnIndex index of the column being processed
- * @param writer dictionary writer, assigned in execute()
- */
-class DictionaryWriterTask(valuesBuffer: mutable.HashSet[String],
- dictionary: Dictionary,
- model: DictionaryLoadModel, columnIndex: Int,
- var writer: CarbonDictionaryWriter = null) {
-
- /**
- * execute the task
- *
- * @return distinctValueList and time taken to write
- */
- def execute(): java.util.List[String] = {
- val values = valuesBuffer.toArray
- java.util.Arrays.sort(values, Ordering[String])
- val dictService = CarbonCommonFactory.getDictionaryService
- writer = dictService.getDictionaryWriter(
- model.table,
- model.columnIdentifier(columnIndex),
- model.hdfsLocation)
- val distinctValues: java.util.List[String] = new java.util.ArrayList()
-
- try {
- if (!model.dictFileExists(columnIndex)) {
- writer.write(CarbonCommonConstants.MEMBER_DEFAULT_VAL)
- distinctValues.add(CarbonCommonConstants.MEMBER_DEFAULT_VAL)
- }
-
- if (values.length >= 1) {
- if (model.dictFileExists(columnIndex)) {
- for (value <- values) {
- val parsedValue = DataTypeUtil.normalizeColumnValueForItsDataType(value,
- model.primDimensions(columnIndex))
- if (null != parsedValue && dictionary.getSurrogateKey(parsedValue) ==
- CarbonCommonConstants.INVALID_SURROGATE_KEY) {
- writer.write(parsedValue)
- distinctValues.add(parsedValue)
- }
- }
-
- } else {
- for (value <- values) {
- val parsedValue = DataTypeUtil.normalizeColumnValueForItsDataType(value,
- model.primDimensions(columnIndex))
- if (null != parsedValue) {
- writer.write(parsedValue)
- distinctValues.add(parsedValue)
- }
- }
- }
- }
- } catch {
- case ex: IOException =>
- throw ex
- } finally {
- if (null != writer) {
- writer.close()
- }
- }
- distinctValues
- }
-
- /**
- * update dictionary metadata
- */
- def updateMetaData() {
- if (null != writer) {
- writer.commit()
- }
- }
-}
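
The filtering rule in execute() above only appends values whose surrogate-key lookup fails; the sketch below is standalone, with a plain Map standing in for the Dictionary cache and an assumed sentinel value in place of CarbonCommonConstants.INVALID_SURROGATE_KEY:

object DictionaryFilterSketch {
  // Assumed sentinel for "value not found"; the real constant lives in CarbonCommonConstants.
  val InvalidSurrogateKey = 0

  def newValues(values: Seq[String], existing: Map[String, Int]): Seq[String] =
    values.sorted.filter(v => existing.getOrElse(v, InvalidSurrogateKey) == InvalidSurrogateKey)

  def main(args: Array[String]): Unit = {
    val existing = Map("UK" -> 1, "US" -> 2)
    // only DE and FR are new and would be appended to the dictionary file
    println(newValues(Seq("US", "DE", "FR", "UK"), existing))  // List(DE, FR)
  }
}
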
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark/src/main/scala/org/apache/carbondata/spark/tasks/SortIndexWriterTask.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/tasks/SortIndexWriterTask.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/tasks/SortIndexWriterTask.scala
deleted file mode 100644
index d552331..0000000
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/tasks/SortIndexWriterTask.scala
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.spark.tasks
-
-import org.apache.carbondata.common.factory.CarbonCommonFactory
-import org.apache.carbondata.core.cache.dictionary.Dictionary
-import org.apache.carbondata.core.writer.sortindex.{CarbonDictionarySortIndexWriter, CarbonDictionarySortInfo, CarbonDictionarySortInfoPreparator}
-import org.apache.carbondata.spark.rdd.DictionaryLoadModel
-
-/**
- * This task writes the sort index file for a column
- *
- * @param model dictionary load model
- * @param index index of the column being processed
- * @param dictionary dictionary used to prepare the sort info
- * @param distinctValues newly added distinct values
- * @param carbonDictionarySortIndexWriter sort index writer, created in execute() when there are new values
- */
-class SortIndexWriterTask(model: DictionaryLoadModel,
- index: Int,
- dictionary: Dictionary,
- distinctValues: java.util.List[String],
- var carbonDictionarySortIndexWriter: CarbonDictionarySortIndexWriter = null) {
- def execute() {
- try {
- if (distinctValues.size() > 0) {
- val preparator: CarbonDictionarySortInfoPreparator = new CarbonDictionarySortInfoPreparator
- val dictService = CarbonCommonFactory.getDictionaryService
- val dictionarySortInfo: CarbonDictionarySortInfo =
- preparator.getDictionarySortInfo(distinctValues, dictionary,
- model.primDimensions(index).getDataType)
- carbonDictionarySortIndexWriter =
- dictService.getDictionarySortIndexWriter(model.table, model.columnIdentifier(index),
- model.hdfsLocation)
- carbonDictionarySortIndexWriter.writeSortIndex(dictionarySortInfo.getSortIndex)
- carbonDictionarySortIndexWriter
- .writeInvertedSortIndex(dictionarySortInfo.getSortIndexInverted)
- }
- } finally {
- if (null != carbonDictionarySortIndexWriter) {
- carbonDictionarySortIndexWriter.close()
- }
- }
- }
-}
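
The two indexes written above are easiest to see with a toy example; the following is a conceptual sketch only (illustrative key and position conventions, not the CarbonDictionarySortInfoPreparator implementation):

object SortIndexSketch {
  def main(args: Array[String]): Unit = {
    // surrogate keys assigned in arrival order: delhi -> 1, agra -> 2, bombay -> 3
    val members = Array("delhi", "agra", "bombay")
    val sortedWithKey = members.zipWithIndex.map { case (v, i) => (v, i + 1) }.sortBy(_._1)
    // sort index: the surrogate key sitting at each sorted position
    val sortIndex = sortedWithKey.map(_._2)
    // inverted sort index: the sorted position of each surrogate key
    val inverted = new Array[Int](sortIndex.length)
    sortIndex.zipWithIndex.foreach { case (key, pos) => inverted(key - 1) = pos }
    println(sortIndex.toList)  // List(2, 3, 1)
    println(inverted.toList)   // List(2, 0, 1)
  }
}
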
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
deleted file mode 100644
index d91a012..0000000
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
+++ /dev/null
@@ -1,219 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.spark.util
-
-import java.io.File
-import java.text.SimpleDateFormat
-
-import scala.collection.JavaConverters._
-
-import org.apache.spark.sql._
-import org.apache.spark.sql.hive.{CarbonMetaData, DictionaryMap}
-import org.apache.spark.sql.types._
-
-import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.carbon.metadata.datatype.{DataType => CarbonDataType}
-import org.apache.carbondata.core.carbon.metadata.encoder.Encoding
-import org.apache.carbondata.core.carbon.metadata.schema.table.CarbonTable
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.datastorage.store.impl.FileFactory
-import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
-
-object CarbonScalaUtil {
- def convertSparkToCarbonDataType(
- dataType: org.apache.spark.sql.types.DataType): CarbonDataType = {
- dataType match {
- case StringType => CarbonDataType.STRING
- case ShortType => CarbonDataType.SHORT
- case IntegerType => CarbonDataType.INT
- case LongType => CarbonDataType.LONG
- case DoubleType => CarbonDataType.DOUBLE
- case FloatType => CarbonDataType.FLOAT
- case DateType => CarbonDataType.DATE
- case BooleanType => CarbonDataType.BOOLEAN
- case TimestampType => CarbonDataType.TIMESTAMP
- case ArrayType(_, _) => CarbonDataType.ARRAY
- case StructType(_) => CarbonDataType.STRUCT
- case NullType => CarbonDataType.NULL
- case _ => CarbonDataType.DECIMAL
- }
- }
-
- def convertSparkToCarbonSchemaDataType(dataType: String): String = {
- dataType match {
- case CarbonCommonConstants.STRING_TYPE => CarbonCommonConstants.STRING
- case CarbonCommonConstants.INTEGER_TYPE => CarbonCommonConstants.INTEGER
- case CarbonCommonConstants.BYTE_TYPE => CarbonCommonConstants.INTEGER
- case CarbonCommonConstants.SHORT_TYPE => CarbonCommonConstants.SHORT
- case CarbonCommonConstants.LONG_TYPE => CarbonCommonConstants.NUMERIC
- case CarbonCommonConstants.DOUBLE_TYPE => CarbonCommonConstants.NUMERIC
- case CarbonCommonConstants.FLOAT_TYPE => CarbonCommonConstants.NUMERIC
- case CarbonCommonConstants.DECIMAL_TYPE => CarbonCommonConstants.NUMERIC
- case CarbonCommonConstants.DATE_TYPE => CarbonCommonConstants.STRING
- case CarbonCommonConstants.BOOLEAN_TYPE => CarbonCommonConstants.STRING
- case CarbonCommonConstants.TIMESTAMP_TYPE => CarbonCommonConstants.TIMESTAMP
- case anyType => anyType
- }
- }
-
- def convertCarbonToSparkDataType(dataType: CarbonDataType): types.DataType = {
- dataType match {
- case CarbonDataType.STRING => StringType
- case CarbonDataType.SHORT => ShortType
- case CarbonDataType.INT => IntegerType
- case CarbonDataType.LONG => LongType
- case CarbonDataType.DOUBLE => DoubleType
- case CarbonDataType.BOOLEAN => BooleanType
- case CarbonDataType.DECIMAL => DecimalType.SYSTEM_DEFAULT
- case CarbonDataType.TIMESTAMP => TimestampType
- }
- }
-
- def updateDataType(
- currentDataType: org.apache.spark.sql.types.DataType): org.apache.spark.sql.types.DataType = {
- currentDataType match {
- case decimal: DecimalType =>
- val scale = currentDataType.asInstanceOf[DecimalType].scale
- DecimalType(DecimalType.MAX_PRECISION, scale)
- case _ =>
- currentDataType
- }
- }
-
- case class TransformHolder(rdd: Any, mataData: CarbonMetaData)
-
- object CarbonSparkUtil {
-
- def createSparkMeta(carbonTable: CarbonTable): CarbonMetaData = {
- val dimensionsAttr = carbonTable.getDimensionByTableName(carbonTable.getFactTableName)
- .asScala.map(x => x.getColName) // wf : may be problem
- val measureAttr = carbonTable.getMeasureByTableName(carbonTable.getFactTableName)
- .asScala.map(x => x.getColName)
- val dictionary =
- carbonTable.getDimensionByTableName(carbonTable.getFactTableName).asScala.map { f =>
- (f.getColName.toLowerCase,
- f.hasEncoding(Encoding.DICTIONARY) && !f.hasEncoding(Encoding.DIRECT_DICTIONARY) &&
- !CarbonUtil.hasComplexDataType(f.getDataType))
- }
- CarbonMetaData(dimensionsAttr, measureAttr, carbonTable, DictionaryMap(dictionary.toMap))
- }
- }
-
- def getKettleHome(sqlContext: SQLContext): String = {
- var kettleHomePath = sqlContext.getConf("carbon.kettle.home", null)
- if (null == kettleHomePath) {
- kettleHomePath = CarbonProperties.getInstance.getProperty("carbon.kettle.home")
- }
- if (null == kettleHomePath) {
- val carbonHome = System.getenv("CARBON_HOME")
- if (null != carbonHome) {
- kettleHomePath = carbonHome + "/processing/carbonplugins"
- }
- }
- if (kettleHomePath != null) {
- val sparkMaster = sqlContext.sparkContext.getConf.get("spark.master").toLowerCase()
- // get the spark master; if it is local, the kettle home may need to be corrected
- // e.g. with --master local, the executor runs on the local machine
- if (sparkMaster.startsWith("local")) {
- val kettleHomeFileType = FileFactory.getFileType(kettleHomePath)
- val kettleHomeFile = FileFactory.getCarbonFile(kettleHomePath, kettleHomeFileType)
- // check whether the carbon.kettle.home path exists
- if (!kettleHomeFile.exists()) {
- // get the path of this class
- // e.g: file:/srv/bigdata/install/spark/sparkJdbc/carbonlib/carbon-
- // xxx.jar!/org/carbondata/spark/rdd/
- var jarFilePath = this.getClass.getResource("").getPath
- val endIndex = jarFilePath.indexOf(".jar!") + 4
- // get the jar file path
- // e.g: file:/srv/bigdata/install/spark/sparkJdbc/carbonlib/carbon-*.jar
- jarFilePath = jarFilePath.substring(0, endIndex)
- val jarFileType = FileFactory.getFileType(jarFilePath)
- val jarFile = FileFactory.getCarbonFile(jarFilePath, jarFileType)
- // get the parent folder of the jar file
- // e.g:file:/srv/bigdata/install/spark/sparkJdbc/carbonlib
- val carbonLibPath = jarFile.getParentFile.getPath
- // find the kettle home under the previous folder
- // e.g:file:/srv/bigdata/install/spark/sparkJdbc/carbonlib/carbonplugins
- kettleHomePath = carbonLibPath + File.separator + CarbonCommonConstants.KETTLE_HOME_NAME
- val logger = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
- logger.error(s"carbon.kettle.home path is not exists, reset it as $kettleHomePath")
- val newKettleHomeFileType = FileFactory.getFileType(kettleHomePath)
- val newKettleHomeFile = FileFactory.getCarbonFile(kettleHomePath, newKettleHomeFileType)
- // check if the found kettle home exists
- if (!newKettleHomeFile.exists()) {
- sys.error("Kettle home not found. Failed to reset carbon.kettle.home")
- }
- }
- }
- } else {
- sys.error("carbon.kettle.home is not set")
- }
- kettleHomePath
- }
-
- def getString(value: Any,
- serializationNullFormat: String,
- delimiterLevel1: String,
- delimiterLevel2: String,
- format: SimpleDateFormat,
- level: Int = 1): String = {
- if (value == null) {
- serializationNullFormat
- } else {
- value match {
- case s: String => s
- case d: java.math.BigDecimal => d.toPlainString
- case i: java.lang.Integer => i.toString
- case d: java.lang.Double => d.toString
- case t: java.sql.Timestamp => format format t
- case d: java.sql.Date => format format d
- case b: java.lang.Boolean => b.toString
- case s: java.lang.Short => s.toString
- case f: java.lang.Float => f.toString
- case bs: Array[Byte] => new String(bs)
- case s: scala.collection.Seq[Any] =>
- val delimiter = if (level == 1) {
- delimiterLevel1
- } else {
- delimiterLevel2
- }
- val builder = new StringBuilder()
- s.foreach { x =>
- builder.append(getString(x, serializationNullFormat, delimiterLevel1,
- delimiterLevel2, format, level + 1)).append(delimiter)
- }
- builder.substring(0, builder.length - 1)
- case m: scala.collection.Map[Any, Any] =>
- throw new Exception("Unsupported data type: Map")
- case r: org.apache.spark.sql.Row =>
- val delimiter = if (level == 1) {
- delimiterLevel1
- } else {
- delimiterLevel2
- }
- val builder = new StringBuilder()
- for (i <- 0 until r.length) {
- builder.append(getString(r(i), serializationNullFormat, delimiterLevel1,
- delimiterLevel2, format, level + 1)).append(delimiter)
- }
- builder.substring(0, builder.length - 1)
- case other => other.toString
- }
- }
- }
-}
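
For reference, the removed getString helper above flattens a nested Spark value into a single CSV field, using the level-1 delimiter for the outermost collection and the level-2 delimiter for anything nested inside it. A minimal, self-contained sketch of that flattening logic (simplified: the configurable null format, Timestamp/Date formatting and Row support are omitted, and the '$' / ':' delimiters are illustrative defaults, not values taken from this commit):

// Sketch of the level-based delimiter logic used by the removed getString.
object FlattenSketch {
  def flatten(value: Any, l1: String = "$", l2: String = ":", level: Int = 1): String = {
    value match {
      case null => "\\N" // stand-in for the configurable serializationNullFormat
      case s: Seq[_] =>
        // level 1 joins with the outer delimiter, every deeper level with the inner one
        val delimiter = if (level == 1) l1 else l2
        s.map(e => flatten(e, l1, l2, level + 1)).mkString(delimiter)
      case other => other.toString
    }
  }

  def main(args: Array[String]): Unit = {
    // an array<array<int>>-style value: inner elements joined with ':', outer with '$'
    println(flatten(Seq(Seq(1, 2), Seq(3)))) // prints 1:2$3
  }
}
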
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
deleted file mode 100644
index d5051cf..0000000
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
+++ /dev/null
@@ -1,251 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.spark.util
-
-import java.util
-import java.util.UUID
-
-import scala.collection.mutable.Map
-
-import org.apache.spark.sql.execution.command.{ColumnProperty, Field}
-
-import org.apache.carbondata.core.carbon.metadata.datatype.DataType
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
-
-object CommonUtil {
- def validateColumnGroup(colGroup: String, noDictionaryDims: Seq[String],
- msrs: Seq[Field], retrievedColGrps: Seq[String], dims: Seq[Field]) {
- val colGrpCols = colGroup.split(',').map(_.trim)
- colGrpCols.foreach { x =>
- // if the column is a no-dictionary column
- if (noDictionaryDims.contains(x)) {
- throw new MalformedCarbonCommandException(
- "Column group is not supported for no dictionary columns:" + x)
- } else if (msrs.exists(msr => msr.column.equals(x))) {
- // if the column is a measure
- throw new MalformedCarbonCommandException("Column group is not supported for measures:" + x)
- } else if (foundIndExistingColGrp(x)) {
- throw new MalformedCarbonCommandException("Column is available in other column group:" + x)
- } else if (isComplex(x, dims)) {
- throw new MalformedCarbonCommandException(
- "Column group doesn't support Complex column:" + x)
- } else if (isTimeStampColumn(x, dims)) {
- throw new MalformedCarbonCommandException(
- "Column group doesn't support Timestamp datatype:" + x)
- }
- // if the column is not a valid dimension
- else if (!dims.exists(dim => dim.column.equalsIgnoreCase(x))) {
- throw new MalformedCarbonCommandException(
- "column in column group is not a valid column: " + x
- )
- }
- }
- // check if given column is present in other groups
- def foundIndExistingColGrp(colName: String): Boolean = {
- retrievedColGrps.foreach { colGrp =>
- if (colGrp.split(",").contains(colName)) {
- return true
- }
- }
- false
- }
-
- }
-
-
- def isTimeStampColumn(colName: String, dims: Seq[Field]): Boolean = {
- dims.foreach { dim =>
- if (dim.column.equalsIgnoreCase(colName)) {
- if (dim.dataType.isDefined && null != dim.dataType.get &&
- "timestamp".equalsIgnoreCase(dim.dataType.get)) {
- return true
- }
- }
- }
- false
- }
-
- def isComplex(colName: String, dims: Seq[Field]): Boolean = {
- dims.foreach { x =>
- if (x.children.isDefined && null != x.children.get && x.children.get.nonEmpty) {
- val children = x.children.get
- if (x.column.equals(colName)) {
- return true
- } else {
- children.foreach { child =>
- val fieldName = x.column + "." + child.column
- if (fieldName.equalsIgnoreCase(colName)) {
- return true
- }
- }
- }
- }
- }
- false
- }
-
- def getColumnProperties(column: String,
- tableProperties: Map[String, String]): Option[util.List[ColumnProperty]] = {
- val fieldProps = new util.ArrayList[ColumnProperty]()
- val columnPropertiesStartKey = CarbonCommonConstants.COLUMN_PROPERTIES + "." + column + "."
- tableProperties.foreach {
- case (key, value) =>
- if (key.startsWith(columnPropertiesStartKey)) {
- fieldProps.add(ColumnProperty(key.substring(columnPropertiesStartKey.length(),
- key.length()), value))
- }
- }
- if (fieldProps.isEmpty) {
- None
- } else {
- Some(fieldProps)
- }
- }
-
- def validateTblProperties(tableProperties: Map[String, String], fields: Seq[Field]): Boolean = {
- val itr = tableProperties.keys
- var isValid: Boolean = true
- tableProperties.foreach {
- case (key, value) =>
- if (!validateFields(key, fields)) {
- isValid = false
- throw new MalformedCarbonCommandException(s"Invalid table property: $key")
- }
- }
- isValid
- }
-
- def validateFields(key: String, fields: Seq[Field]): Boolean = {
- var isValid: Boolean = false
- fields.foreach { field =>
- if (field.children.isDefined && field.children.get != null) {
- field.children.foreach(fields => {
- fields.foreach(complexfield => {
- val column = if ("val" == complexfield.column) {
- field.column
- } else {
- field.column + "." + complexfield.column
- }
- if (validateColumnProperty(key, column)) {
- isValid = true
- }
- }
- )
- }
- )
- } else {
- if (validateColumnProperty(key, field.column)) {
- isValid = true
- }
- }
-
- }
- isValid
- }
-
- def validateColumnProperty(key: String, column: String): Boolean = {
- if (!key.startsWith(CarbonCommonConstants.COLUMN_PROPERTIES)) {
- return true
- }
- val columnPropertyKey = CarbonCommonConstants.COLUMN_PROPERTIES + "." + column + "."
- if (key.startsWith(columnPropertyKey)) {
- true
- } else {
- false
- }
- }
-
- /**
- * @param colGrps column groups as given in the table properties
- * @param dims dimension fields in schema order
- * @return column groups rearranged into schema order
- */
- def arrangeColGrpsInSchemaOrder(colGrps: Seq[String], dims: Seq[Field]): Seq[String] = {
- def sortByIndex(colGrp1: String, colGrp2: String) = {
- val firstCol1 = colGrp1.split(",")(0)
- val firstCol2 = colGrp2.split(",")(0)
- val dimIndex1: Int = getDimIndex(firstCol1, dims)
- val dimIndex2: Int = getDimIndex(firstCol2, dims)
- dimIndex1 < dimIndex2
- }
- val sortedColGroups: Seq[String] = colGrps.sortWith(sortByIndex)
- sortedColGroups
- }
-
- /**
- * @param colName column to look up
- * @param dims dimension fields to search
- * @return index of the given column in dims, or -1 if it is not present
- */
- def getDimIndex(colName: String, dims: Seq[Field]): Int = {
- var index: Int = -1
- dims.zipWithIndex.foreach { h =>
- if (h._1.column.equalsIgnoreCase(colName)) {
- index = h._2.toInt
- }
- }
- index
- }
-
- /**
- * This method validates the table block size specified by the user
- *
- * @param tableProperties table properties specified by the user
- */
- def validateTableBlockSize(tableProperties: Map[String, String]): Unit = {
- var tableBlockSize: Integer = 0
- if (tableProperties.get(CarbonCommonConstants.TABLE_BLOCKSIZE).isDefined) {
- val blockSizeStr: String =
- parsePropertyValueStringInMB(tableProperties(CarbonCommonConstants.TABLE_BLOCKSIZE))
- try {
- tableBlockSize = Integer.parseInt(blockSizeStr)
- } catch {
- case e: NumberFormatException =>
- throw new MalformedCarbonCommandException("Invalid table_blocksize value found: " +
- s"$blockSizeStr, only integer values from 1 MB to " +
- s"2048 MB are supported.")
- }
- if (tableBlockSize < CarbonCommonConstants.BLOCK_SIZE_MIN_VAL ||
- tableBlockSize > CarbonCommonConstants.BLOCK_SIZE_MAX_VAL) {
- throw new MalformedCarbonCommandException("Invalid table_blocksize value found: " +
- s"$blockSizeStr, only integer values from 1 MB to " +
- s"2048 MB are supported.")
- }
- tableProperties.put(CarbonCommonConstants.TABLE_BLOCKSIZE, blockSizeStr)
- }
- }
-
- /**
- * This method parses a property value string of the form 'XX MB' or 'XX M' into 'XX'
- *
- * @param propertyValueString property value, optionally suffixed with 'MB' or 'M'
- */
- def parsePropertyValueStringInMB(propertyValueString: String): String = {
- var parsedPropertyValueString: String = propertyValueString
- if (propertyValueString.trim.toLowerCase.endsWith("mb")) {
- parsedPropertyValueString = propertyValueString.trim.toLowerCase
- .substring(0, propertyValueString.trim.toLowerCase.lastIndexOf("mb")).trim
- }
- if (propertyValueString.trim.toLowerCase.endsWith("m")) {
- parsedPropertyValueString = propertyValueString.trim.toLowerCase
- .substring(0, propertyValueString.trim.toLowerCase.lastIndexOf("m")).trim
- }
- parsedPropertyValueString
- }
-
-}
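
As a quick reference for the block-size handling deleted above: validateTableBlockSize first normalises the TABLE_BLOCKSIZE value through parsePropertyValueStringInMB (so '512 MB', '512mb' and '512m' all become '512') and then rejects anything that is not an integer in the 1 to 2048 MB range. A standalone sketch of that behaviour, with the BLOCK_SIZE_MIN_VAL/BLOCK_SIZE_MAX_VAL bounds hard-coded for illustration:

// Simplified, self-contained sketch of the removed block-size normalisation and validation.
object BlockSizeSketch {
  // 'XX MB', 'XX mb', 'XX m' or plain 'XX' are all normalised to 'XX'
  def parseMB(value: String): String = {
    val v = value.trim.toLowerCase
    if (v.endsWith("mb")) v.dropRight(2).trim
    else if (v.endsWith("m")) v.dropRight(1).trim
    else v
  }

  def validate(value: String): Int = {
    val parsed = parseMB(value)
    val size = try { parsed.toInt } catch {
      case _: NumberFormatException =>
        sys.error(s"Invalid table_blocksize value found: $parsed")
    }
    require(size >= 1 && size <= 2048,
      s"Invalid table_blocksize value found: $size, only integer values from 1 MB to 2048 MB are supported")
    size
  }

  def main(args: Array[String]): Unit = {
    println(validate("512 MB")) // 512
    println(validate("64m"))    // 64
  }
}
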
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark/src/main/scala/org/apache/carbondata/spark/util/DataTypeConverterUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/util/DataTypeConverterUtil.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/util/DataTypeConverterUtil.scala
deleted file mode 100644
index aa8fcd5..0000000
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/util/DataTypeConverterUtil.scala
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.spark.util
-
-import org.apache.carbondata.core.carbon.metadata.datatype.DataType
-
-object DataTypeConverterUtil {
- def convertToCarbonType(dataType: String): DataType = {
- dataType.toLowerCase match {
- case "string" => DataType.STRING
- case "int" => DataType.INT
- case "integer" => DataType.INT
- case "tinyint" => DataType.SHORT
- case "short" => DataType.SHORT
- case "long" => DataType.LONG
- case "bigint" => DataType.LONG
- case "numeric" => DataType.DOUBLE
- case "double" => DataType.DOUBLE
- case "decimal" => DataType.DECIMAL
- case "timestamp" => DataType.TIMESTAMP
- case "array" => DataType.ARRAY
- case "struct" => DataType.STRUCT
- case _ => sys.error(s"Unsupported data type: $dataType")
- }
- }
-
- def convertToString(dataType: DataType): String = {
- dataType match {
- case DataType.STRING => "string"
- case DataType.SHORT => "smallint"
- case DataType.INT => "int"
- case DataType.LONG => "bigint"
- case DataType.DOUBLE => "double"
- case DataType.DECIMAL => "decimal"
- case DataType.TIMESTAMP => "timestamp"
- case DataType.ARRAY => "array"
- case DataType.STRUCT => "struct"
- }
- }
-}
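
A hedged usage example for the converter deleted above, assuming the pre-rebase classes org.apache.carbondata.spark.util.DataTypeConverterUtil and org.apache.carbondata.core.carbon.metadata.datatype.DataType are still available on the classpath:

import org.apache.carbondata.core.carbon.metadata.datatype.DataType
import org.apache.carbondata.spark.util.DataTypeConverterUtil

object ConverterExample {
  def main(args: Array[String]): Unit = {
    // SQL type names are lower-cased before matching, so lookup is case-insensitive;
    // an unknown name fails via sys.error
    val carbonType: DataType = DataTypeConverterUtil.convertToCarbonType("BigInt") // DataType.LONG
    val sqlName: String = DataTypeConverterUtil.convertToString(DataType.SHORT)    // "smallint"
    println(s"$carbonType -> $sqlName")
  }
}

Note the asymmetry visible in the diff: convertToString renders DataType.SHORT as "smallint", a spelling convertToCarbonType does not itself accept (it recognises only "tinyint" and "short"), so the two methods are not exact inverses.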