Posted to commits@carbondata.apache.org by ch...@apache.org on 2016/06/30 17:42:29 UTC
[42/50] [abbrv] incubator-carbondata git commit: Merge remote-tracking branch 'carbon_master/master' into apache/master
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/7f722186/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceRelation.scala
----------------------------------------------------------------------
diff --cc integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceRelation.scala
index bbc9f25,9f534c1..4d39eb2
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceRelation.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceRelation.scala
@@@ -52,16 -51,14 +52,18 @@@ class CarbonSourc
override def createRelation(
sqlContext: SQLContext,
parameters: Map[String, String]): BaseRelation = {
- parameters.get("path") match {
- case Some(path) => CarbonDatasourceHadoopRelation(sqlContext, Array(path), parameters)
- case _ =>
- val options = new CarbonOption(parameters)
- val tableIdentifier = options.tableIdentifier.split("""\.""").toSeq
- val ident = tableIdentifier match {
- case Seq(name) => TableIdentifier(name, None)
- case Seq(db, name) => TableIdentifier(name, Some(db))
- }
- CarbonDatasourceRelation(ident, None)(sqlContext)
+ if (parameters.get("tablePath") != None) {
+ val options = new CarbonOption(parameters)
+ val tableIdentifier = options.tableIdentifier.split("""\.""").toSeq
- CarbonDatasourceRelation(tableIdentifier, None)(sqlContext)
++ val ident = tableIdentifier match {
++ case Seq(name) => TableIdentifier(name, None)
++ case Seq(db, name) => TableIdentifier(name, Some(db))
++ }
++ CarbonDatasourceRelation(ident, None)(sqlContext)
+ } else if (parameters.get("path") != None) {
+ CarbonDatasourceHadoopRelation(sqlContext, Array(parameters.get("path").get), parameters)
+ } else {
+ sys.error("Carbon table path not found")
}
}
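
The merged createRelation above dispatches in a fixed order: a "tablePath" option selects the metastore-backed CarbonDatasourceRelation (parsing "db.table" or "table" into a TableIdentifier), a bare "path" falls back to CarbonDatasourceHadoopRelation, and anything else fails fast. A minimal sketch of that dispatch, using strings and a simplified TableIdentifier as stand-ins for the real Spark/Carbon types (names and return type here are illustrative, not the real API):

  // Simplified stand-in for org.apache.spark.sql.catalyst.TableIdentifier.
  case class TableIdentifier(table: String, database: Option[String])

  def createRelation(parameters: Map[String, String]): String = {
    if (parameters.contains("tablePath")) {
      // "db.table" or "table" -> TableIdentifier(name, Option(db))
      val ident = parameters.getOrElse("tableName", "").split("""\.""").toSeq match {
        case Seq(name)     => TableIdentifier(name, None)
        case Seq(db, name) => TableIdentifier(name, Some(db))
      }
      s"CarbonDatasourceRelation($ident)"
    } else if (parameters.contains("path")) {
      s"CarbonDatasourceHadoopRelation(${parameters("path")})"
    } else {
      sys.error("Carbon table path not found")
    }
  }

  assert(createRelation(Map("tablePath" -> "/store/db/t1", "tableName" -> "db.t1"))
    .contains("TableIdentifier(t1,Some(db))"))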
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/7f722186/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
----------------------------------------------------------------------
diff --cc integration/spark/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
index 05149eb,ce43c4f..8a9f1c9
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
@@@ -29,11 -29,10 +29,11 @@@ import org.apache.spark.unsafe.types.UT
import org.carbondata.core.cache.{Cache, CacheProvider, CacheType}
import org.carbondata.core.cache.dictionary.{Dictionary, DictionaryColumnUniqueIdentifier}
- import org.carbondata.core.carbon.{AbsoluteTableIdentifier, CarbonTableIdentifier}
+ import org.carbondata.core.carbon.{AbsoluteTableIdentifier, CarbonTableIdentifier, ColumnIdentifier}
import org.carbondata.core.carbon.metadata.datatype.DataType
import org.carbondata.core.carbon.metadata.encoder.Encoding
-import org.carbondata.query.carbon.util.DataTypeUtil
+import org.carbondata.core.carbon.metadata.schema.table.column.CarbonDimension
+import org.carbondata.scan.util.DataTypeUtil
/**
* It decodes the data.
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/7f722186/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala
----------------------------------------------------------------------
diff --cc integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala
index 4209c5a,b340884..406b025
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala
@@@ -1325,13 -1186,12 +1186,13 @@@ class CarbonSqlParser(
}
protected lazy val segmentId: Parser[String] =
- ( numericLit ^^ { u => u } |
- elem("decimal", _.isInstanceOf[lexical.FloatLit]) ^^ (_.chars)
- )
+ numericLit ^^ { u => u } |
+ elem("decimal", p => {
+ p.getClass.getSimpleName.equals("FloatLit") ||
+ p.getClass.getSimpleName.equals("DecimalLit") } ) ^^ (_.chars)
protected lazy val deleteLoadsByID: Parser[LogicalPlan] =
- DELETE ~> (LOAD|SEGMENT) ~> repsep(segmentId, ",") ~ (FROM ~> (CUBE | TABLE) ~>
+ DELETE ~> (LOAD|SEGMENT) ~> repsep(segmentId, ",") ~ (FROM ~> TABLE ~>
(ident <~ ".").? ~ ident) <~
opt(";") ^^ {
case loadids ~ cube => cube match {
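
The segmentId rule above is widened to admit decimal tokens as well as integers, by testing the lexer token's class name against "FloatLit" or "DecimalLit", so a statement such as DELETE SEGMENT 1,2.5 FROM TABLE t now parses. The real rule is a Scala parser combinator over CarbonSqlParser's custom lexical; the regex below is only a stand-in illustrating which segment ids the widened rule accepts:

  // Regex stand-in for the widened segmentId rule: integers and decimals.
  val segmentIdPattern = """\d+(\.\d+)?"""
  def isSegmentId(token: String): Boolean = token.matches(segmentIdPattern)

  assert(isSegmentId("1"))     // numericLit branch
  assert(isSegmentId("2.5"))   // FloatLit/DecimalLit branch
  assert(!isSegmentId("abc"))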
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/7f722186/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --cc integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index cb94f72,75abe0e..b8afcdf
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@@ -29,8 -29,10 +29,11 @@@ import scala.util.Rando
import org.apache.spark.SparkEnv
import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.TableIdentifier
+ import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
-import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Cast, Literal}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
+ import org.apache.spark.sql.catalyst.util.DateTimeUtils
+ import org.apache.spark.sql.catalyst.util.DateTimeUtils.SQLTimestamp
import org.apache.spark.sql.execution.{RunnableCommand, SparkPlan}
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.types.TimestampType
@@@ -1244,7 -1269,8 +1266,8 @@@ private[sql] case class CreateCube(cm:
try {
sqlContext.sql(
s"""CREATE TABLE $dbName.$tbName USING org.apache.spark.sql.CarbonSource""" +
- s""" OPTIONS (tableName "$dbName.$tbName", tablePath "$cubePath") """).collect
- s""" OPTIONS (cubename "$dbName.$tbName", tablePath "$tablePath") """)
++ s""" OPTIONS (tableName "$dbName.$tbName", tablePath "$tablePath") """)
+ .collect
} catch {
case e: Exception =>
@@@ -1290,21 -1317,23 +1314,21 @@@ private[sql] case class DeleteLoadsById
// validate load ids first
validateLoadIds
+ val dbName = getDB.getDatabaseName(schemaNameOp, sqlContext)
+ val identifier = TableIdentifier(tableName, Option(dbName))
val relation = CarbonEnv.getInstance(sqlContext).carbonCatalog.lookupRelation1(
- Option(schemaName),
- tableName,
- None)(sqlContext).asInstanceOf[CarbonRelation]
+ identifier, None)(sqlContext).asInstanceOf[CarbonRelation]
if (relation == null) {
- LOGGER.audit(s"The delete load by Id is failed. Table $dbName.$tableName does not exist")
- LOGGER.audit(s"Delete load by Id is failed. Table $schemaName.$tableName does not exist")
- sys.error(s"Table $schemaName.$tableName does not exist")
++ LOGGER.audit(s"Delete load by Id is failed. Table $dbName.$tableName does not exist")
+ sys.error(s"Table $dbName.$tableName does not exist")
}
- val carbonTable = CarbonMetadata.getInstance().getCarbonTable(schemaName + '_' + tableName)
+ val carbonTable = CarbonMetadata.getInstance().getCarbonTable(dbName + '_' + tableName)
if (null == carbonTable) {
- CarbonEnv.getInstance(sqlContext).carbonCatalog.lookupRelation1(
- Option(schemaName),
- tableName,
- None)(sqlContext).asInstanceOf[CarbonRelation]
+ CarbonEnv.getInstance(sqlContext).carbonCatalog
+ .lookupRelation1(identifier, None)(sqlContext).asInstanceOf[CarbonRelation]
}
val path = carbonTable.getMetaDataFilepath
@@@ -1356,20 -1375,29 +1370,26 @@@ private[sql] case class DeleteLoadsByLo
def run(sqlContext: SQLContext): Seq[Row] = {
- val schemaName = getDB.getDatabaseName(schemaNameOp, sqlContext)
- LOGGER.audit(s"Delete load by load date request has been received for $schemaName.$tableName")
-
- val relation = CarbonEnv.getInstance(sqlContext).carbonCatalog.lookupRelation1(
- Option(schemaName),
- tableName,
- None
- )(sqlContext).asInstanceOf[CarbonRelation]
+ LOGGER.audit("The delete load by load date request has been received.")
+ val dbName = getDB.getDatabaseName(schemaNameOp, sqlContext)
+ val identifier = TableIdentifier(tableName, Option(dbName))
+ val relation = CarbonEnv.getInstance(sqlContext).carbonCatalog
+ .lookupRelation1(identifier, None)(sqlContext).asInstanceOf[CarbonRelation]
if (relation == null) {
LOGGER
- .audit(s"The delete load by load date is failed. Table $dbName.$tableName does not " +
- .audit(s"Delete load by load date is failed. Table $schemaName.$tableName does not " +
++ .audit(s"Delete load by load date is failed. Table $dbName.$tableName does not " +
s"exist")
- sys.error(s"Table $schemaName.$tableName does not exist")
+ sys.error(s"Table $dbName.$tableName does not exist")
}
+ val timeObj = Cast(Literal(loadDate), TimestampType).eval()
+ if(null == timeObj) {
+ val errorMessage = "Error: Invalid load start time format " + loadDate
+ throw new MalformedCarbonCommandException(errorMessage)
+ }
+
var carbonTable = org.carbondata.core.carbon.metadata.CarbonMetadata.getInstance()
- .getCarbonTable(schemaName + '_' + tableName)
+ .getCarbonTable(dbName + '_' + tableName)
var segmentStatusManager = new SegmentStatusManager(carbonTable.getAbsoluteTableIdentifier)
if (null == carbonTable) {
@@@ -1378,9 -1409,14 +1398,14 @@@
}
var path = carbonTable.getMetaDataFilepath()
-
- var invalidLoadTimestamps = segmentStatusManager.updateDeletionStatus(loadDate, path).asScala
- LOGGER.audit("The delete load by Id is successfull.")
+ var invalidLoadTimestamps = segmentStatusManager
+ .updateDeletionStatus(loadDate, path, timeObj.asInstanceOf[java.lang.Long]).asScala
+ if(invalidLoadTimestamps.isEmpty) {
- LOGGER.audit(s"Delete load by load date is successfull for $schemaName.$tableName.")
++ LOGGER.audit(s"Delete load by load date is successfull for $dbName.$tableName.")
+ }
+ else {
+ sys.error("Delete load by load date is failed. No matching load found.")
+ }
Seq.empty
}
@@@ -1742,9 -1818,7 +1768,7 @@@ private[sql] case class DropTableComman
if (carbonLock.lockWithRetries()) {
logInfo("Successfully able to get the table metadata file lock")
} else {
- LOGGER.audit(
- s"Dropping table with Database name [$dbName] and Table name [$tableName] " +
- s"failed as the Table is locked")
- LOGGER.audit(s"Dropping table $schemaName.$cubeName failed as the Table is locked")
++ LOGGER.audit(s"Dropping table $dbName.$tableName failed as the Table is locked")
sys.error("Table is locked for updation. Please try after some time")
}
@@@ -1877,7 -1976,9 +1901,9 @@@ private[sql] case class DescribeCommand
override def run(sqlContext: SQLContext): Seq[Row] = {
val relation = CarbonEnv.getInstance(sqlContext).carbonCatalog
- .lookupRelation2(tblIdentifier, None)(sqlContext).asInstanceOf[CarbonRelation]
+ .lookupRelation1(tblIdentifier)(sqlContext).asInstanceOf[CarbonRelation]
+ val mapper = new ObjectMapper()
+ val colProps = StringBuilder.newBuilder
var results: Seq[(String, String, String)] = child.schema.fields.map { field =>
val comment = if (relation.metaData.dims.contains(field.name)) {
val dimension = relation.metaData.carbonTable.getDimensionByName(
@@@ -1966,17 -2085,16 +2010,17 @@@ private[sql] case class DeleteLoadByDat
def run(sqlContext: SQLContext): Seq[Row] = {
- LOGGER.audit("The delete load by date request has been received.")
- val schemaName = getDB.getDatabaseName(schemaNameOp, sqlContext)
- LOGGER.audit(s"The delete load by date request has been received for $schemaName.$cubeName")
+ val dbName = getDB.getDatabaseName(schemaNameOp, sqlContext)
++ LOGGER.audit(s"The delete load by date request has been received for $dbName.$cubeName")
+ val identifier = TableIdentifier(cubeName, Option(dbName))
val relation = CarbonEnv.getInstance(sqlContext).carbonCatalog
- .lookupRelation1(Some(schemaName), cubeName, None)(sqlContext).asInstanceOf[CarbonRelation]
+ .lookupRelation1(identifier)(sqlContext).asInstanceOf[CarbonRelation]
var level: String = ""
var carbonTable = org.carbondata.core.carbon.metadata.CarbonMetadata
- .getInstance().getCarbonTable(schemaName + '_' + cubeName)
+ .getInstance().getCarbonTable(dbName + '_' + cubeName)
if (relation == null) {
- LOGGER.audit(s"The delete load by date is failed. Table $schemaName.$cubeName does not exist")
- sys.error(s"Table $schemaName.$cubeName does not exist")
+ LOGGER.audit(s"The delete load by date is failed. Table $dbName.$cubeName does not exist")
+ sys.error(s"Table $dbName.$cubeName does not exist")
}
val matches: Seq[AttributeReference] = relation.dimensionsAttr.filter(
@@@ -1986,8 -2104,8 +2030,8 @@@
if (matches.isEmpty) {
LOGGER.audit(
"The delete load by date is failed. " +
- "Table $dbName.$cubeName does not contain date field " + dateField)
- s"Table $schemaName.$cubeName does not contain date field :" + dateField)
- sys.error(s"Table $schemaName.$cubeName does not contain date field " + dateField)
++ "Table $dbName.$cubeName does not contain date field :" + dateField)
+ sys.error(s"Table $dbName.$cubeName does not contain date field " + dateField)
}
else {
level = matches.asJava.get(0).name
@@@ -2007,7 -2125,7 +2051,7 @@@
actualColName,
dateValue,
relation.cubeMeta.partitioner)
- LOGGER.audit("The delete load by date is successfull.")
- LOGGER.audit(s"The delete load by date $dateValue is successful for $schemaName.$cubeName.")
++ LOGGER.audit(s"The delete load by date $dateValue is successful for $dbName.$cubeName.")
Seq.empty
}
}
@@@ -2020,15 -2138,14 +2064,15 @@@ private[sql] case class CleanFiles
val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
def run(sqlContext: SQLContext): Seq[Row] = {
- LOGGER.audit("The clean files request has been received.")
- val schemaName = getDB.getDatabaseName(schemaNameOp, sqlContext)
- LOGGER.audit(s"Clean files request has been received for $schemaName.$cubeName")
+ val dbName = getDB.getDatabaseName(schemaNameOp, sqlContext)
++ LOGGER.audit(s"The clean files request has been received for $dbName.$cubeName")
+ val identifier = TableIdentifier(cubeName, Option(dbName))
val relation = CarbonEnv.getInstance(sqlContext).carbonCatalog
- .lookupRelation1(Some(schemaName), cubeName, None)(sqlContext).
+ .lookupRelation1(identifier)(sqlContext).
asInstanceOf[CarbonRelation]
if (relation == null) {
- LOGGER.audit(s"Clean files request is failed. Table $schemaName.$cubeName does not exist")
- sys.error(s"Table $schemaName.$cubeName does not exist")
+ LOGGER.audit(s"The clean files request is failed. Table $dbName.$cubeName does not exist")
+ sys.error(s"Table $dbName.$cubeName does not exist")
}
val carbonLoadModel = new CarbonLoadModel()
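
One behavioural change worth noting in the DeleteLoadsByLoadDate hunk above: the load date is now validated up front via Cast(Literal(loadDate), TimestampType).eval(), which yields null for an unparsable string, and a MalformedCarbonCommandException is raised before any deletion work starts. A dependency-free sketch of the same yes/no check (SimpleDateFormat and the pattern below are stand-ins, not what Spark's Cast uses internally):

  import java.text.{ParseException, SimpleDateFormat}

  // Returns Some(epoch millis) for a parsable load date, None otherwise,
  // mirroring eval() returning null for a bad format.
  def parseLoadDate(loadDate: String): Option[Long] = {
    val fmt = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
    fmt.setLenient(false)
    try Some(fmt.parse(loadDate).getTime)
    catch { case _: ParseException => None }
  }

  assert(parseLoadDate("2016-06-30 17:42:29").isDefined)
  assert(parseLoadDate("not-a-date").isEmpty)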
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/7f722186/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetastoreCatalog.scala
----------------------------------------------------------------------
diff --cc integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetastoreCatalog.scala
index d87e132,f88b74e..aeae761
--- a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetastoreCatalog.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetastoreCatalog.scala
@@@ -113,14 -112,30 +113,6 @@@ class CarbonMetastoreCatalog(hive: Hive
val metadata = loadMetadata(storePath)
- lazy val useUniquePath = if ("true".equalsIgnoreCase(CarbonProperties.getInstance().
- getProperty(
- CarbonCommonConstants.CARBON_UNIFIED_STORE_PATH,
- CarbonCommonConstants.CARBON_UNIFIED_STORE_PATH_DEFAULT))) {
- true
- } else {
- false
- def lookupRelation1(
- databaseName: Option[String],
- tableName: String,
- alias: Option[String] = None)(sqlContext: SQLContext): LogicalPlan = {
- val db = databaseName match {
- case Some(name) => name
- case _ => null
- }
- if (db == null) {
- lookupRelation2(Seq(tableName), alias)(sqlContext)
- } else {
- lookupRelation2(Seq(db, tableName), alias)(sqlContext)
- }
- }
-
- override def lookupRelation(tableIdentifier: Seq[String],
- alias: Option[String] = None): LogicalPlan = {
- try {
- super.lookupRelation(tableIdentifier, alias)
- } catch {
- case s: java.lang.Exception =>
- lookupRelation2(tableIdentifier, alias)(hive.asInstanceOf[SQLContext])
- }
-- }
def getCubeCreationTime(schemaName: String, cubeName: String): Long = {
val cubeMeta = metadata.cubesMeta.filter(
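
The catalog hunk above deletes the old three-argument lookupRelation1 and the lookupRelation override; callers throughout carbonTableSchema.scala now build a catalyst TableIdentifier first and pass it instead. The before/after calling convention, sketched with a stub catalog (TableIdentifier here is a simplified stand-in for org.apache.spark.sql.catalyst.TableIdentifier):

  case class TableIdentifier(table: String, database: Option[String])

  class StubCatalog {
    // old: lookupRelation1(Option(schemaName), tableName, None)(sqlContext)
    // new: lookupRelation1(identifier, None)(sqlContext)
    def lookupRelation1(identifier: TableIdentifier,
        alias: Option[String] = None): String =
      s"relation for ${identifier.database.getOrElse("default")}.${identifier.table}"
  }

  val identifier = TableIdentifier("carbon_table", Option("default"))
  assert(new StubCatalog().lookupRelation1(identifier) == "relation for default.carbon_table")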
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/7f722186/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonStrategies.scala
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/7f722186/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/7f722186/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonScanRDD.scala
----------------------------------------------------------------------
diff --cc integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonScanRDD.scala
index 0de2d1a,0000000..e7f131d
mode 100644,000000..100644
--- a/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonScanRDD.scala
+++ b/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonScanRDD.scala
@@@ -1,218 -1,0 +1,221 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.carbondata.spark.rdd
+
+import java.util
+
+import scala.collection.JavaConverters._
+import scala.reflect.ClassTag
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.mapreduce.Job
+import org.apache.spark.{Logging, Partition, SparkContext, TaskContext}
+import org.apache.spark.rdd.RDD
+
+import org.carbondata.common.CarbonIterator
+import org.carbondata.common.logging.LogServiceFactory
+import org.carbondata.core.carbon.datastore.block.TableBlockInfo
+import org.carbondata.hadoop.{CarbonInputFormat, CarbonInputSplit}
+import org.carbondata.scan.executor.QueryExecutorFactory
+import org.carbondata.scan.expression.Expression
+import org.carbondata.scan.model.QueryModel
+import org.carbondata.scan.result.BatchResult
+import org.carbondata.scan.result.iterator.ChunkRowIterator
+import org.carbondata.spark.RawValue
+import org.carbondata.spark.load.CarbonLoaderUtil
+import org.carbondata.spark.util.QueryPlanUtil
+
+class CarbonSparkPartition(rddId: Int, val idx: Int,
+ val locations: Array[String],
+ val tableBlockInfos: util.List[TableBlockInfo])
+ extends Partition {
+
+ override val index: Int = idx
+
+ // val serializableHadoopSplit = new SerializableWritable[Array[String]](locations)
+ override def hashCode(): Int = {
+ 41 * (41 + rddId) + idx
+ }
+}
+
+ /**
+ * This RDD is used to perform query on CarbonData file. Before sending tasks to scan
+ * CarbonData file, this RDD will leverage CarbonData's index information to do CarbonData file
+ * level filtering in driver side.
+ */
+class CarbonScanRDD[V: ClassTag](
+ sc: SparkContext,
+ queryModel: QueryModel,
+ filterExpression: Expression,
+ keyClass: RawValue[V],
+ @transient conf: Configuration,
+ cubeCreationTime: Long,
+ schemaLastUpdatedTime: Long,
+ baseStoreLocation: String)
+ extends RDD[V](sc, Nil) with Logging {
+
+ val defaultParallelism = sc.defaultParallelism
+
+ override def getPartitions: Array[Partition] = {
+ val startTime = System.currentTimeMillis()
+ val (carbonInputFormat: CarbonInputFormat[Array[Object]], job: Job) =
+ QueryPlanUtil.createCarbonInputFormat(queryModel.getAbsoluteTableIdentifier)
+
+ val result = new util.ArrayList[Partition](defaultParallelism)
+ val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
+ // set filter resolver tree
+ try {
- val filterResolver = carbonInputFormat
- .getResolvedFilter(job.getConfiguration, filterExpression)
-
- CarbonInputFormat.setFilterPredicates(job.getConfiguration, filterResolver)
- queryModel.setFilterExpressionResolverTree(filterResolver)
++ // before applying filter check whether segments are available in the table.
++ val splits = carbonInputFormat.getSplits(job)
++ if (!splits.isEmpty) {
++ var filterResolver = carbonInputFormat
++ .getResolvedFilter(job.getConfiguration, filterExpression)
++ CarbonInputFormat.setFilterPredicates(job.getConfiguration, filterResolver)
++ queryModel.setFilterExpressionResolverTree(filterResolver)
++ }
+ }
+ catch {
+ case e: Exception =>
+ LOGGER.error(e)
+ sys.error("Exception occurred in query execution :: " + e.getMessage)
+ }
+ // get splits
+ val splits = carbonInputFormat.getSplits(job)
+ if (!splits.isEmpty) {
+ val carbonInputSplits = splits.asScala.map(_.asInstanceOf[CarbonInputSplit])
+
+ val blockList = carbonInputSplits.map(inputSplit =>
+ new TableBlockInfo(inputSplit.getPath.toString,
+ inputSplit.getStart, inputSplit.getSegmentId,
+ inputSplit.getLocations, inputSplit.getLength
+ )
+ )
+ if (blockList.nonEmpty) {
+ // group blocks to nodes, tasks
+ val nodeBlockMapping =
+ CarbonLoaderUtil.nodeBlockTaskMapping(blockList.asJava, -1, defaultParallelism)
+
+ var i = 0
+ // Create Spark Partition for each task and assign blocks
+ nodeBlockMapping.asScala.foreach { entry =>
+ entry._2.asScala.foreach { blocksPerTask =>
+ if (blocksPerTask.size() != 0) {
+ result.add(new CarbonSparkPartition(id, i, Seq(entry._1).toArray, blocksPerTask))
+ i += 1
+ }
+ }
+ }
+ val noOfBlocks = blockList.size
+ val noOfNodes = nodeBlockMapping.size
+ val noOfTasks = result.size()
+ logInfo(s"Identified no.of.Blocks: $noOfBlocks,"
+ + s"parallelism: $defaultParallelism , " +
+ s"no.of.nodes: $noOfNodes, no.of.tasks: $noOfTasks"
+ )
+ logInfo("Time taken to identify Blocks to scan : " +
+ (System.currentTimeMillis() - startTime)
+ )
+ result.asScala.foreach { r =>
+ val cp = r.asInstanceOf[CarbonSparkPartition]
+ logInfo(s"Node : " + cp.locations.toSeq.mkString(",")
+ + ", No.Of Blocks : " + cp.tableBlockInfos.size()
+ )
+ }
+ } else {
+ logInfo("No blocks identified to scan")
+ val nodesPerBlock = new util.ArrayList[TableBlockInfo]()
+ result.add(new CarbonSparkPartition(id, 0, Seq("").toArray, nodesPerBlock))
+ }
+ }
+ else {
+ logInfo("No valid segments found to scan")
+ val nodesPerBlock = new util.ArrayList[TableBlockInfo]()
+ result.add(new CarbonSparkPartition(id, 0, Seq("").toArray, nodesPerBlock))
+ }
+ result.toArray(new Array[Partition](result.size()))
+ }
+
+ override def compute(thepartition: Partition, context: TaskContext): Iterator[V] = {
+ val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
+ val iter = new Iterator[V] {
+ var rowIterator: CarbonIterator[Array[Any]] = _
+ var queryStartTime: Long = 0
+ try {
+ val carbonSparkPartition = thepartition.asInstanceOf[CarbonSparkPartition]
+ if(!carbonSparkPartition.tableBlockInfos.isEmpty) {
+ queryModel.setQueryId(queryModel.getQueryId + "_" + carbonSparkPartition.idx)
+ // fill table block info
+ queryModel.setTableBlockInfos(carbonSparkPartition.tableBlockInfos)
+ queryStartTime = System.currentTimeMillis
+
+ val carbonPropertiesFilePath = System.getProperty("carbon.properties.filepath", null)
+ logInfo("*************************" + carbonPropertiesFilePath)
+ if (null == carbonPropertiesFilePath) {
+ System.setProperty("carbon.properties.filepath",
+ System.getProperty("user.dir") + '/' + "conf" + '/' + "carbon.properties")
+ }
+ // execute query
+ rowIterator = new ChunkRowIterator(
+ QueryExecutorFactory.getQueryExecutor(queryModel).execute(queryModel).
+ asInstanceOf[CarbonIterator[BatchResult]]).asInstanceOf[CarbonIterator[Array[Any]]]
+
+ }
+ } catch {
+ case e: Exception =>
+ LOGGER.error(e)
+ if (null != e.getMessage) {
+ sys.error("Exception occurred in query execution :: " + e.getMessage)
+ } else {
+ sys.error("Exception occurred in query execution.Please check logs.")
+ }
+ }
+
+ var havePair = false
+ var finished = false
+
+ override def hasNext: Boolean = {
+ if (!finished && !havePair) {
+ finished = (null == rowIterator) || (!rowIterator.hasNext)
+ havePair = !finished
+ }
+ !finished
+ }
+
+ override def next(): V = {
+ if (!hasNext) {
+ throw new java.util.NoSuchElementException("End of stream")
+ }
+ havePair = false
+ keyClass.getValue(rowIterator.next())
+ }
+
+ logInfo("********************** Total Time Taken to execute the query in Carbon Side: " +
+ (System.currentTimeMillis - queryStartTime)
+ )
+ }
+ iter
+ }
+
+ /**
+ * Get the preferred locations where to launch this task.
+ */
+ override def getPreferredLocations(partition: Partition): Seq[String] = {
+ val theSplit = partition.asInstanceOf[CarbonSparkPartition]
+ theSplit.locations.filter(_ != "localhost")
+ }
+}
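
The compute() implementation above guards its row iterator with a finished/havePair flag pair so that hasNext stays idempotent however many times callers invoke it between next() calls, and so a null rowIterator (a partition with no blocks) simply reads as empty. The same guard, extracted as a minimal sketch over any underlying iterator:

  // hasNext may be called any number of times between next() calls;
  // the flags ensure the underlying iterator is probed at most once.
  def guarded[A](underlying: Iterator[A]): Iterator[A] = new Iterator[A] {
    private var havePair = false
    private var finished = false

    override def hasNext: Boolean = {
      if (!finished && !havePair) {
        finished = (underlying == null) || !underlying.hasNext
        havePair = !finished
      }
      !finished
    }

    override def next(): A = {
      if (!hasNext) throw new java.util.NoSuchElementException("End of stream")
      havePair = false
      underlying.next()
    }
  }

  assert(guarded(Iterator(1, 2, 3)).toList == List(1, 2, 3))
  assert(!guarded(Iterator[Int]()).hasNext)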
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/7f722186/integration/spark/src/main/scala/org/carbondata/spark/util/GlobalDictionaryUtil.scala
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/7f722186/integration/spark/src/test/scala/org/carbondata/spark/testsuite/allqueries/AllDataTypesTestCaseAggregate.scala
----------------------------------------------------------------------
diff --cc integration/spark/src/test/scala/org/carbondata/spark/testsuite/allqueries/AllDataTypesTestCaseAggregate.scala
index b6bda0c,4a02975..d13a9df
--- a/integration/spark/src/test/scala/org/carbondata/spark/testsuite/allqueries/AllDataTypesTestCaseAggregate.scala
+++ b/integration/spark/src/test/scala/org/carbondata/spark/testsuite/allqueries/AllDataTypesTestCaseAggregate.scala
@@@ -40,19 -40,15 +40,19 @@@ class AllDataTypesTestCaseAggregate ext
val currentDirectory = new File(this.getClass.getResource("/").getPath + "/../../")
.getCanonicalPath
-    sql("create cube Carbon_automation_test dimensions(imei string,deviceInformationId integer,MAC string,deviceColor string,device_backColor string,modelId string,marketName string,AMSize string,ROMSize string,CUPAudit string,CPIClocked string,series string,productionDate timestamp,bomCode string,internalModels string, deliveryTime string, channelsId string, channelsName string , deliveryAreaId string, deliveryCountry string, deliveryProvince string, deliveryCity string,deliveryDistrict string, deliveryStreet string, oxSingleNumber string, ActiveCheckTime string, ActiveAreaId string, ActiveCountry string, ActiveProvince string, Activecity string, ActiveDistrict string, ActiveStreet string, ActiveOperatorId string, Active_releaseId string, Active_EMUIVersion string, Active_operaSysVersion string, Active_BacVerNumber string, Active_BacFlashVer string, Active_webUIVersion string, Active_webUITypeCarrVer string,Active_webTypeDataVerNumber string, Active_operatorsVersion string, Active_phonePADPartitionedVersions string, Latest_YEAR integer, Latest_MONTH integer, Latest_DAY integer, Latest_HOUR string, Latest_areaId string, Latest_country string, Latest_province string, Latest_city string, Latest_district string, Latest_street string, Latest_releaseId string, Latest_EMUIVersion string, Latest_operaSysVersion string, Latest_BacVerNumber string, Latest_BacFlashVer string, Latest_webUIVersion string, Latest_webUITypeCarrVer string, Latest_webTypeDataVerNumber string, Latest_operatorsVersion string, Latest_phonePADPartitionedVersions string, Latest_operatorId string, gamePointDescription string) measures(gamePointId integer,contractNumber integer) OPTIONS (PARTITIONER [CLASS = 'org.carbondata.spark.partition.api.impl.SampleDataPartitionerImpl' ,COLUMNS= (imei) , PARTITION_COUNT=2] )");
+    sql("create table Carbon_automation_test (imei string,deviceInformationId int,MAC string,deviceColor string,device_backColor string,modelId string,marketName string,AMSize string,ROMSize string,CUPAudit string,CPIClocked string,series string,productionDate timestamp,bomCode string,internalModels string, deliveryTime string, channelsId string, channelsName string , deliveryAreaId string, deliveryCountry string, deliveryProvince string, deliveryCity string,deliveryDistrict string, deliveryStreet string, oxSingleNumber string, ActiveCheckTime string, ActiveAreaId string, ActiveCountry string, ActiveProvince string, Activecity string, ActiveDistrict string, ActiveStreet string, ActiveOperatorId string, Active_releaseId string, Active_EMUIVersion string, Active_operaSysVersion string, Active_BacVerNumber string, Active_BacFlashVer string, Active_webUIVersion string, Active_webUITypeCarrVer string,Active_webTypeDataVerNumber string, Active_operatorsVersion string, Active_phonePADPartitionedVersions string, Latest_YEAR int, Latest_MONTH int, Latest_DAY int, Latest_HOUR string, Latest_areaId string, Latest_country string, Latest_province string, Latest_city string, Latest_district string, Latest_street string, Latest_releaseId string, Latest_EMUIVersion string, Latest_operaSysVersion string, Latest_BacVerNumber string, Latest_BacFlashVer string, Latest_webUIVersion string, Latest_webUITypeCarrVer string, Latest_webTypeDataVerNumber string, Latest_operatorsVersion string, Latest_phonePADPartitionedVersions string, Latest_operatorId string, gamePointDescription string, gamePointId int,contractNumber int) STORED BY 'org.apache.carbondata.format' TBLPROPERTIES('DICTIONARY_EXCLUDE'='Latest_MONTH,Latest_DAY,deviceInformationId')");
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
-    sql("LOAD DATA FACT FROM'"+currentDirectory+"/src/test/resources/100_olap.csv' INTO Cube Carbon_automation_test partitionData(DELIMITER ',' ,QUOTECHAR '\"', FILEHEADER 'imei,deviceInformationId,MAC,deviceColor,device_backColor,modelId,marketName,AMSize,ROMSize,CUPAudit,CPIClocked,series,productionDate,bomCode,internalModels,deliveryTime,channelsId,channelsName,deliveryAreaId,deliveryCountry,deliveryProvince,deliveryCity,deliveryDistrict,deliveryStreet,oxSingleNumber,contractNumber,ActiveCheckTime,ActiveAreaId,ActiveCountry,ActiveProvince,Activecity,ActiveDistrict,ActiveStreet,ActiveOperatorId,Active_releaseId,Active_EMUIVersion,Active_operaSysVersion,Active_BacVerNumber,Active_BacFlashVer,Active_webUIVersion,Active_webUITypeCarrVer,Active_webTypeDataVerNumber,Active_operatorsVersion,Active_phonePADPartitionedVersions,Latest_YEAR,Latest_MONTH,Latest_DAY,Latest_HOUR,Latest_areaId,Latest_country,Latest_province,Latest_city,Latest_district,Latest_street,Latest_releaseId,Latest_EMUIVersion,Latest_operaSysVersion,Latest_BacVerNumber,Latest_BacFlashVer,Latest_webUIVersion,Latest_webUITypeCarrVer,Latest_webTypeDataVerNumber,Latest_operatorsVersion,Latest_phonePADPartitionedVersions,Latest_operatorId,gamePointId,gamePointDescription')");
+    sql("LOAD DATA LOCAL INPATH '"+currentDirectory+"/src/test/resources/100_olap.csv' INTO table Carbon_automation_test options('DELIMITER'= ',' ,'QUOTECHAR'= '\"', 'FILEHEADER'= 'imei,deviceInformationId,MAC,deviceColor,device_backColor,modelId,marketName,AMSize,ROMSize,CUPAudit,CPIClocked,series,productionDate,bomCode,internalModels,deliveryTime,channelsId,channelsName,deliveryAreaId,deliveryCountry,deliveryProvince,deliveryCity,deliveryDistrict,deliveryStreet,oxSingleNumber,contractNumber,ActiveCheckTime,ActiveAreaId,ActiveCountry,ActiveProvince,Activecity,ActiveDistrict,ActiveStreet,ActiveOperatorId,Active_releaseId,Active_EMUIVersion,Active_operaSysVersion,Active_BacVerNumber,Active_BacFlashVer,Active_webUIVersion,Active_webUITypeCarrVer,Active_webTypeDataVerNumber,Active_operatorsVersion,Active_phonePADPartitionedVersions,Latest_YEAR,Latest_MONTH,Latest_DAY,Latest_HOUR,Latest_areaId,Latest_country,Latest_province,Latest_city,Latest_district,Latest_street,Latest_releaseId,Latest_EMUIVersion,Latest_operaSysVersion,Latest_BacVerNumber,Latest_BacFlashVer,Latest_webUIVersion,Latest_webUITypeCarrVer,Latest_webTypeDataVerNumber,Latest_operatorsVersion,Latest_phonePADPartitionedVersions,Latest_operatorId,gamePointId,gamePointDescription')");
+    sql("create table if not exists Carbon_automation_hive (imei string,deviceInformationId int,MAC string,deviceColor string,device_backColor string,modelId string,marketName string,AMSize string,ROMSize string,CUPAudit string,CPIClocked string,series string,productionDate timestamp,bomCode string,internalModels string, deliveryTime string, channelsId string, channelsName string , deliveryAreaId string, deliveryCountry string, deliveryProvince string, deliveryCity string,deliveryDistrict string, deliveryStreet string, oxSingleNumber string, ActiveCheckTime string, ActiveAreaId string, ActiveCountry string, ActiveProvince string, Activecity string, ActiveDistrict string, ActiveStreet string, ActiveOperatorId string, Active_releaseId string, Active_EMUIVersion string, Active_operaSysVersion string, Active_BacVerNumber string, Active_BacFlashVer string, Active_webUIVersion string, Active_webUITypeCarrVer string,Active_webTypeDataVerNumber string, Active_operatorsVersion string, Active_phonePADPartitionedVersions string, Latest_YEAR int, Latest_MONTH int, Latest_DAY int, Latest_HOUR string, Latest_areaId string, Latest_country string, Latest_province string, Latest_city string, Latest_district string, Latest_street string, Latest_releaseId string, Latest_EMUIVersion string, Latest_operaSysVersion string, Latest_BacVerNumber string, Latest_BacFlashVer string, Latest_webUIVersion string, Latest_webUITypeCarrVer string, Latest_webTypeDataVerNumber string, Latest_operatorsVersion string, Latest_phonePADPartitionedVersions string, Latest_operatorId string, gamePointDescription string, gamePointId int,contractNumber int) row format delimited fields terminated by ','");
+ sql("LOAD DATA LOCAL INPATH '"+currentDirectory+"/src/test/resources/100_olap.csv' INTO table Carbon_automation_hive ");
}
override def afterAll {
- sql("drop cube Carbon_automation_test")
- sql("drop cube Carbon_automation_hive")
+ sql("drop table Carbon_automation_test")
++ sql("drop table Carbon_automation_hive")
+
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "dd-MM-yyyy")
}
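
The test migration above swaps the legacy "create cube ... dimensions(...) measures(...)" and "LOAD DATA FACT FROM ... partitionData(...)" statements for standard DDL ("create table ... STORED BY", "LOAD DATA LOCAL INPATH ... options(...)") and adds a plain Hive table for result comparison. The shape of the change on a toy two-column schema (sql here is a println stub and the table name is invented; the real statements above carry roughly sixty columns):

  def sql(statement: String): Unit = println(statement)

  // old: sql("create cube Carbon_toy dimensions(imei string) measures(gamePointId integer) ...")
  sql("create table Carbon_toy (imei string, gamePointId int) " +
    "STORED BY 'org.apache.carbondata.format'")
  // old: sql("LOAD DATA FACT FROM '/tmp/100_olap.csv' INTO Cube Carbon_toy partitionData(...)")
  sql("LOAD DATA LOCAL INPATH '/tmp/100_olap.csv' INTO table Carbon_toy " +
    "options('DELIMITER'= ',', 'QUOTECHAR'= '\"')")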
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/7f722186/integration/spark/src/test/scala/org/carbondata/spark/util/GlobalDictionaryUtilTestCase.scala
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/7f722186/processing/src/main/java/org/carbondata/lcm/locks/LocalFileLock.java
----------------------------------------------------------------------
diff --cc processing/src/main/java/org/carbondata/lcm/locks/LocalFileLock.java
index 0a0fe62,0000000..a1cf40a
mode 100644,000000..100644
--- a/processing/src/main/java/org/carbondata/lcm/locks/LocalFileLock.java
+++ b/processing/src/main/java/org/carbondata/lcm/locks/LocalFileLock.java
@@@ -1,162 -1,0 +1,162 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.carbondata.lcm.locks;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.channels.FileChannel;
+import java.nio.channels.FileLock;
+import java.nio.channels.OverlappingFileLockException;
+
+import org.carbondata.common.logging.LogService;
+import org.carbondata.common.logging.LogServiceFactory;
+import org.carbondata.core.datastorage.store.impl.FileFactory;
+
+/**
+ * This class handles the file locking in the local file system.
+ * This will be handled using the file channel lock API.
+ */
+public class LocalFileLock extends AbstractCarbonLock {
+ /**
+ * location is the location of the lock file.
+ */
+ private String location;
+
+ /**
+ * lockUsage will determine the lock folder. so that similar locks will try to acquire
+ * same lock file.
+ */
+ private LockUsage lockUsage;
+
+ /**
+ * fileOutputStream of the local lock file
+ */
+ private FileOutputStream fileOutputStream;
+
+ /**
+ * channel is the FileChannel of the lock file.
+ */
+ private FileChannel channel;
+
+ /**
+ * fileLock NIO FileLock Object
+ */
+ private FileLock fileLock;
+
+ public static final String tmpPath;
+
+ private String cubeName;
+
+ private String schemaName;
+
+ /**
+ * LOGGER for logging the messages.
+ */
+ private static final LogService LOGGER =
+ LogServiceFactory.getLogService(LocalFileLock.class.getName());
+
+ static {
+ tmpPath = System.getProperty("java.io.tmpdir");
+ }
+
+ /**
+ * @param location
+ * @param lockUsage
+ */
+ public LocalFileLock(String location, LockUsage lockUsage) {
+ this.lockUsage = lockUsage;
+ location = location.replace("\\", "/");
+ String tempStr = location.substring(0, location.lastIndexOf('/'));
++ cubeName = tempStr.substring(tempStr.lastIndexOf('/') + 1, tempStr.length());
++ tempStr = tempStr.substring(0, tempStr.lastIndexOf('/'));
+ schemaName = tempStr.substring(tempStr.lastIndexOf('/') + 1, tempStr.length());
-
- cubeName = location.substring(location.lastIndexOf('/') + 1, location.length());
+ this.location =
+ tmpPath + File.separator + schemaName + File.separator + cubeName + File.separator
+ + this.lockUsage;
+ initRetry();
+ }
+
+ /**
+ * Lock API for locking of the file channel of the lock file.
+ *
+ * @return
+ */
+ @Override public boolean lock() {
+ try {
+ String schemaFolderPath = tmpPath + File.separator + schemaName;
+ String cubeFolderPath = schemaFolderPath + File.separator + cubeName;
+ // create dir with schema name in tmp location.
+ if (!FileFactory.isFileExist(schemaFolderPath, FileFactory.getFileType(tmpPath))) {
+ FileFactory.mkdirs(schemaFolderPath, FileFactory.getFileType(tmpPath));
+ }
+
+ // create dir with cube name in tmp location.
+ if (!FileFactory.isFileExist(cubeFolderPath, FileFactory.getFileType(tmpPath))) {
+ FileFactory.mkdirs(cubeFolderPath, FileFactory.getFileType(tmpPath));
+ }
+ if (!FileFactory.isFileExist(location, FileFactory.getFileType(location))) {
+ FileFactory.createNewLockFile(location, FileFactory.getFileType(location));
+ }
+
+ fileOutputStream = new FileOutputStream(location);
+ channel = fileOutputStream.getChannel();
+ try {
+ fileLock = channel.tryLock();
+ } catch (OverlappingFileLockException e) {
+ return false;
+ }
+ if (null != fileLock) {
+ return true;
+ } else {
+ return false;
+ }
+ } catch (IOException e) {
+ return false;
+ }
+
+ }
+
+ /**
+ * Unlock API for unlocking of the acquired lock.
+ *
+ * @return
+ */
+ @Override public boolean unlock() {
+ boolean status;
+ try {
+ if (null != fileLock) {
+ fileLock.release();
+ }
+ status = true;
+ } catch (IOException e) {
+ status = false;
+ } finally {
+ if (null != fileOutputStream) {
+ try {
+ fileOutputStream.close();
+ } catch (IOException e) {
+ LOGGER.error(e.getMessage());
+ }
+ }
+ }
+ return status;
+ }
+
+}
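
The constructor fix above changes how LocalFileLock derives names from the lock location: the cube name is now the last directory segment and the schema name the one before it, where previously the lock file name itself was mistaken for the cube name. The same parsing, as a small Scala sketch (the example path is illustrative):

  // location ends with .../<schemaName>/<cubeName>/<lockFile>
  def schemaAndCube(location: String): (String, String) = {
    val normalized = location.replace("\\", "/")
    val cubeDir = normalized.substring(0, normalized.lastIndexOf('/')) // drop lock file
    val cubeName = cubeDir.substring(cubeDir.lastIndexOf('/') + 1)
    val schemaDir = cubeDir.substring(0, cubeDir.lastIndexOf('/'))
    val schemaName = schemaDir.substring(schemaDir.lastIndexOf('/') + 1)
    (schemaName, cubeName)
  }

  assert(schemaAndCube("/store/default/t1/meta.lock") == ("default", "t1"))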
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/7f722186/processing/src/main/java/org/carbondata/lcm/locks/LockUsage.java
----------------------------------------------------------------------
diff --cc processing/src/main/java/org/carbondata/lcm/locks/LockUsage.java
index e3eab01,0000000..3b03b1f
mode 100644,000000..100644
--- a/processing/src/main/java/org/carbondata/lcm/locks/LockUsage.java
+++ b/processing/src/main/java/org/carbondata/lcm/locks/LockUsage.java
@@@ -1,29 -1,0 +1,30 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.carbondata.lcm.locks;
+
+/**
+ * This enum is used to define the usecase of the lock.
+ * Each enum value is one specific lock case.
+ */
+public enum LockUsage {
+ METADATA_LOCK,
- COMPACTION_LOCK;
++ COMPACTION_LOCK,
++ TABLE_STATUS_LOCK;
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/7f722186/processing/src/main/java/org/carbondata/lcm/status/SegmentStatusManager.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/7f722186/processing/src/main/java/org/carbondata/processing/mdkeygen/MDKeyGenStep.java
----------------------------------------------------------------------
diff --cc processing/src/main/java/org/carbondata/processing/mdkeygen/MDKeyGenStep.java
index dcd5e19,473840b..3dd64be
--- a/processing/src/main/java/org/carbondata/processing/mdkeygen/MDKeyGenStep.java
+++ b/processing/src/main/java/org/carbondata/processing/mdkeygen/MDKeyGenStep.java
@@@ -37,9 -39,10 +39,8 @@@ import org.carbondata.core.carbon.metad
import org.carbondata.core.carbon.path.CarbonStorePath;
import org.carbondata.core.carbon.path.CarbonTablePath;
import org.carbondata.core.constants.CarbonCommonConstants;
-import org.carbondata.core.file.manager.composite.FileData;
-import org.carbondata.core.file.manager.composite.FileManager;
-import org.carbondata.core.file.manager.composite.IFileManagerComposite;
+import org.carbondata.core.datastorage.store.columnar.ColumnGroupModel;
import org.carbondata.core.keygenerator.KeyGenException;
- import org.carbondata.core.keygenerator.factory.KeyGeneratorFactory;
import org.carbondata.core.util.CarbonProperties;
import org.carbondata.core.util.CarbonUtil;
import org.carbondata.core.util.CarbonUtilException;
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/7f722186/processing/src/main/java/org/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/7f722186/processing/src/main/java/org/carbondata/processing/store/writer/CarbonFactDataWriterImplForIntIndexAndAggBlock.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/7f722186/processing/src/main/java/org/carbondata/processing/surrogatekeysgenerator/csvbased/CarbonCSVBasedSeqGenStep.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/7f722186/processing/src/main/java/org/carbondata/processing/surrogatekeysgenerator/csvbased/FileStoreSurrogateKeyGenForCSV.java
----------------------------------------------------------------------